Topic exchange
发送到 topic exchange 的消息不能有任意的 routing_key—它必须是一个用点分隔的单词列表。这些单词可以是任何内容,但通常会指定与信息相关的一些特征。
A few valid routing key examples, there can be as many words in the routing key as you like, up to the limit of 255 bytes.
stock.usd.nyse、nyse.vyw、quick.orange.rabbit。路由密钥的字数不限,最多 255 字节。
binding key 的形式也必须相同。 topic exchange 背后的逻辑与直接交换类似—使用特定routing key发送的消息将被传送到所有使用匹配binding key 绑定的队列。不过,binding key 有两种重要的特殊情况:

我们要发送的消息都是描述动物的。发送消息时将使用由三个单词(两个点)组成的routing key。路由键中的第一个词描述速度,第二个词描述颜色,第三个词描述物种:<speed>.<colour>.<species>。
These bindings can be summarised as:
- Q1 is interested in all the orange animals.
- Q2 wants to hear everything about rabbits, and everything about lazy animals.
routing key设置为quick.orange.rabbit的消息将同时送到两个队列。消息 lazy.orange.elephant也会同时进入两个队列。另一方面,quick.orange.fox只会发送到第一个队列,而 azy.brown.fox只会发送到第二个队列。
尽管 lazy.pink.rabbit匹配了两个绑定,但它只会被送到第二个队列一次。quick.brown.fox不匹配任何绑定,因此会被丢弃。
如果我们违反规则,发送包含一个或四个单词的信息,如orange或 quick.orange.new.rabbit,这些信息与任何绑定都不匹配,因此会丢失。
另一方面,lazy.orange.new.rabbit虽然有四个单词,但它将与最后一个绑定匹配,并被传送到第二个队列。
Topic exchange
当队列使用 #(散列)binding key绑定时,无论路由密钥如何,它都将接收所有信息,和fanout exchange一样
如果绑定中没有使用特殊字符 *(星号)和#(散列),和 direct exchange一样
EmitLogTopic
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String routingKey = getRouting(argv); String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); } }
private static String getRouting(String[] strings) { if (strings.length < 1) return "anonymous.info"; return strings[0]; }
private static String getMessage(String[] strings) { if (strings.length < 2) return "Hello World!"; return joinStrings(strings, " ", 1); }
private static String joinStrings(String[] strings, String delimiter, int startIndex) { int length = strings.length; if (length == 0) return ""; if (length < startIndex) return ""; StringBuilder words = new StringBuilder(strings[startIndex]); for (int i = startIndex + 1; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); } }
|
ReceiveLogsTopic
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String queueName = channel.queueDeclare().getQueue();
if (argv.length < 1) { System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); }
for (String bindingKey : argv) { channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); }
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
|
Spring AMQP

我们将使用 Spring Boot 来引导和配置 Spring AMQP 项目,代码仓库链接,与往常一样,配置类如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| @Configuration public class RabbitMQConfig {
@Bean public TopicExchange topic() { return new TopicExchange("topics-topic"); }
@Bean public Queue autoDeleteQueue1() { return new AnonymousQueue(); }
@Bean public Binding binding1a(TopicExchange topic, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1) .to(topic) .with("*.orange.*"); }
@Bean public Binding binding1b(TopicExchange topic, Queue autoDeleteQueue1) { return BindingBuilder.bind(autoDeleteQueue1) .to(topic) .with("*.*.rabbit"); }
@Bean public Queue autoDeleteQueue2() { return new AnonymousQueue(); }
@Bean public Binding binding2a(TopicExchange topic, Queue autoDeleteQueue2) { return BindingBuilder.bind(autoDeleteQueue2) .to(topic) .with("lazy.#"); }
}
|
发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @Service public class TopicsSenderServiceImpl implements TopicsSenderService {
@Autowired private RabbitTemplate rabbitTemplate;
@Autowired private TopicExchange topic;
private final String[] keys = {"quick.orange.rabbit", "lazy.orange.elephant", "quick.orange.fox", "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};
@Override public void send(String message) {
for (int i = 0; i < keys.length; i++) { String key = keys[i]; System.out.println("第 " + i + "次循环发消息");
rabbitTemplate.convertAndSend(topic.getName(), key, message); System.out.println(" *********************send() 发送了 :" + message + " exchange is: " + topic.getName() + "routing key is: " + key); }
} }
|
接收消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| @Component public class TopicsReceiver {
@RabbitListener(queues = "#{autoDeleteQueue1.name}") public void receive1(String in) {
System.out.println("receive1: " + in); }
@RabbitListener(queues = "#{autoDeleteQueue2.name}") public void receive2(String in) {
System.out.println("receive2: " + in); } }
|
使用http client 测试结果如下
1 2
| # Topics 测试 GET {{routing_host}}/amqp/send?message= 你好 topics exchange
|
