Topic exchange

发送到 topic exchange 的消息不能有任意的 routing_key它必须是一个用点分隔的单词列表。这些单词可以是任何内容,但通常会指定与信息相关的一些特征。

A few valid routing key examples, there can be as many words in the routing key as you like, up to the limit of 255 bytes.

stock.usd.nysenyse.vywquick.orange.rabbit。路由密钥的字数不限,最多 255 字节。

binding key 的形式也必须相同。 topic exchange 背后的逻辑与直接交换类似—使用特定routing key发送的消息将被传送到所有使用匹配binding key 绑定的队列。不过,binding key 有两种重要的特殊情况:

  • * (star) can substitute for exactly one word.

  • # (hash) can substitute for zero or more words.

我们要发送的消息都是描述动物的。发送消息时将使用由三个单词(两个点)组成的routing key。路由键中的第一个词描述速度,第二个词描述颜色,第三个词描述物种:<speed>.<colour>.<species>

These bindings can be summarised as:

  • Q1 is interested in all the orange animals.
  • Q2 wants to hear everything about rabbits, and everything about lazy animals.

routing key设置为quick.orange.rabbit的消息将同时送到两个队列。消息 lazy.orange.elephant也会同时进入两个队列。另一方面,quick.orange.fox只会发送到第一个队列,而 azy.brown.fox只会发送到第二个队列。

尽管 lazy.pink.rabbit匹配了两个绑定,但它只会被送到第二个队列一次quick.brown.fox不匹配任何绑定,因此会被丢弃。

如果我们违反规则,发送包含一个或四个单词的信息,如orangequick.orange.new.rabbit,这些信息与任何绑定都不匹配,因此会丢失。

另一方面,lazy.orange.new.rabbit虽然有四个单词,但它将与最后一个绑定匹配,并被传送到第二个队列。

Topic exchange

当队列使用 #(散列)binding key绑定时,无论路由密钥如何,它都将接收所有信息,和fanout exchange一样

如果绑定中没有使用特殊字符 *(星号)和#(散列),和 direct exchange一样

EmitLogTopic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class EmitLogTopic {

private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

String routingKey = getRouting(argv);
String message = getMessage(argv);

channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}
}

private static String getRouting(String[] strings) {
if (strings.length < 1)
return "anonymous.info";
return strings[0];
}

private static String getMessage(String[] strings) {
if (strings.length < 2)
return "Hello World!";
return joinStrings(strings, " ", 1);
}

private static String joinStrings(String[] strings, String delimiter, int startIndex) {
int length = strings.length;
if (length == 0) return "";
if (length < startIndex) return "";
StringBuilder words = new StringBuilder(strings[startIndex]);
for (int i = startIndex + 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}

ReceiveLogsTopic

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class ReceiveLogsTopic {

private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();

if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
System.exit(1);
}

for (String bindingKey : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}

Spring AMQP

我们将使用 Spring Boot 来引导和配置 Spring AMQP 项目,代码仓库链接,与往常一样,配置类如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Configuration
public class RabbitMQConfig {

@Bean // TopicExchange
public TopicExchange topic() {
return new TopicExchange("topics-topic");
}

@Bean // 创建临时队列
public Queue autoDeleteQueue1() {
return new AnonymousQueue();
}

@Bean
public Binding binding1a(TopicExchange topic, Queue autoDeleteQueue1) {
return BindingBuilder.bind(autoDeleteQueue1)
.to(topic)
.with("*.orange.*");
}

@Bean
public Binding binding1b(TopicExchange topic, Queue autoDeleteQueue1) {
return BindingBuilder.bind(autoDeleteQueue1)
.to(topic)
.with("*.*.rabbit");
}

@Bean // 创建临时队列
public Queue autoDeleteQueue2() {
return new AnonymousQueue();
}

@Bean
public Binding binding2a(TopicExchange topic, Queue autoDeleteQueue2) {
return BindingBuilder.bind(autoDeleteQueue2)
.to(topic)
.with("lazy.#");
}

}

发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Service
public class TopicsSenderServiceImpl implements TopicsSenderService {

@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private TopicExchange topic;

private final String[] keys = {"quick.orange.rabbit", "lazy.orange.elephant",
"quick.orange.fox", "lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};

@Override
public void send(String message) { // 使用RabbitTemplate向队列发送消息

for (int i = 0; i < keys.length; i++) {
String key = keys[i];
System.out.println("第 " + i + "次循环发消息");

rabbitTemplate.convertAndSend(topic.getName(), key, message);
System.out.println(" *********************send() 发送了 :" + message + " exchange is: " + topic.getName() + "routing key is: " + key);
}

}
}

接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Component
public class TopicsReceiver {

@RabbitListener(queues = "#{autoDeleteQueue1.name}")
public void receive1(String in) {

System.out.println("receive1: " + in);
}

@RabbitListener(queues = "#{autoDeleteQueue2.name}")
public void receive2(String in) {

System.out.println("receive2: " + in);
}
}

使用http client 测试结果如下

1
2
# Topics 测试
GET {{routing_host}}/amqp/send?message= 你好 topics exchange