Direct exchange
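The routing algorithm of a direct exchange is simple: a message goes to the queues whose binding key exactly matches the routing key of the message.
In the setup described here, the direct exchange X has two queues bound to it. The first queue is bound with the binding key orange. The second queue has two bindings, one with the binding key black and the other with green.
With these bindings, a message published to the exchange with the routing key orange is routed to queue Q1. Messages with the routing key black or green go to Q2. All other messages are discarded.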
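The matching rule can be modeled without a broker. The following is a minimal in-memory sketch, not RabbitMQ's implementation: the class `DirectExchangeSketch` and its `bind` and `route` methods are hypothetical names introduced for illustration, queues are represented only by their names, and `purple` is an arbitrary example of an unbound routing key.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of a direct exchange (illustration only,
// not part of the RabbitMQ client API).
final class DirectExchangeSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new LinkedHashMap<>();

    // Record a binding between this exchange and a queue.
    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new LinkedHashSet<>()).add(queueName);
    }

    // Return the queues whose binding key equals the routing key exactly.
    // An empty list means the message would be discarded.
    List<String> route(String routingKey) {
        return new ArrayList<>(bindings.getOrDefault(routingKey, Collections.emptySet()));
    }

    public static void main(String[] args) {
        DirectExchangeSketch x = new DirectExchangeSketch();
        x.bind("Q1", "orange");
        x.bind("Q2", "black");
        x.bind("Q2", "green");

        System.out.println(x.route("orange")); // [Q1]
        System.out.println(x.route("black"));  // [Q2]
        System.out.println(x.route("green"));  // [Q2]
        System.out.println(x.route("purple")); // [] (no matching binding)
    }
}
```

Keying the map by binding key makes the exact-match lookup a single `get`, which mirrors how the rule is stated: there is no pattern matching, only equality.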
Multiple bindings
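It is perfectly legal to bind multiple queues with the same binding key. In our example we could add a binding between X and Q1 with the binding key black. In that case the direct exchange behaves like a fanout exchange for that key and broadcasts the message to all matching queues: a message with the routing key black is delivered to both Q1 and Q2.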
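A small broker-free sketch of this case, assuming each binding is modeled as a (queue, binding key) pair. The class `MultipleBindingsSketch` and the method `queuesFor` are hypothetical names, not RabbitMQ API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model: each binding is a {queue, binding key} pair.
final class MultipleBindingsSketch {
    // Bindings on exchange X, including the extra black binding for Q1.
    static final List<String[]> BINDINGS = Arrays.asList(
            new String[] {"Q1", "orange"},
            new String[] {"Q1", "black"},  // the additional binding
            new String[] {"Q2", "black"},
            new String[] {"Q2", "green"});

    // Collect every queue that has a binding key equal to the routing key.
    static List<String> queuesFor(String routingKey) {
        List<String> matched = new ArrayList<>();
        for (String[] binding : BINDINGS) {
            if (binding[1].equals(routingKey) && !matched.contains(binding[0])) {
                matched.add(binding[0]);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        System.out.println(queuesFor("black"));  // [Q1, Q2]
        System.out.println(queuesFor("orange")); // [Q1]
    }
}
```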
Emitting logs
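We keep using the logging demo from before; the difference is that we replace the fanout exchange with a direct one. We use the log severity as the routing key.
As before, we first declare an exchange: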
    channel.exchangeDeclare(EXCHANGE_NAME, "direct");
Then we publish the message:
    channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
Subscribing
Receiving messages works the same way as in the previous tutorial, except that we create a new binding for each severity we are interested in.
    String queueName = channel.queueDeclare().getQueue();

    for (String severity : argv) {
        channel.queueBind(queueName, EXCHANGE_NAME, severity);
    }
The complete code:
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class EmitLogDirect {

        private static final String EXCHANGE_NAME = "direct_logs";

        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            // try-with-resources closes the channel and the connection on exit
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

                String severity = getSeverity(argv);
                String message = getMessage(argv);

                // The severity is used as the routing key
                channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
            }
        }

        // First argument is the severity; defaults to "info"
        private static String getSeverity(String[] strings) {
            if (strings.length < 1)
                return "info";
            return strings[0];
        }

        // Remaining arguments form the message; defaults to "Hello World!"
        private static String getMessage(String[] strings) {
            if (strings.length < 2)
                return "Hello World!";
            return joinStrings(strings, " ", 1);
        }

        private static String joinStrings(String[] strings, String delimiter, int startIndex) {
            int length = strings.length;
            if (length == 0) return "";
            if (length <= startIndex) return "";
            StringBuilder words = new StringBuilder(strings[startIndex]);
            for (int i = startIndex + 1; i < length; i++) {
                words.append(delimiter).append(strings[i]);
            }
            return words.toString();
        }
    }
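The helper methods in EmitLogDirect derive the routing key and the message body from the command line: the first argument is the severity (default info) and the remaining arguments are joined with spaces (default Hello World!). The sketch below reproduces that behavior without a broker so it can be tried in isolation. It uses `String.join` in place of the hand-written joinStrings; the class name `EmitLogArgsSketch`, the method names, and the sample arguments are assumptions made for illustration.

```java
import java.util.Arrays;

// Hypothetical stand-alone version of the argument handling in EmitLogDirect.
final class EmitLogArgsSketch {
    // First argument is the severity; fall back to "info" when absent.
    static String severityOf(String[] args) {
        return args.length < 1 ? "info" : args[0];
    }

    // Arguments after the first form the message; fall back to "Hello World!".
    static String messageOf(String[] args) {
        if (args.length < 2) {
            return "Hello World!";
        }
        return String.join(" ", Arrays.copyOfRange(args, 1, args.length));
    }

    public static void main(String[] ignored) {
        String[] sample = {"error", "Run.", "Run."};
        System.out.println(severityOf(sample) + " -> " + messageOf(sample));
        // error -> Run. Run.

        String[] empty = {};
        System.out.println(severityOf(empty) + " -> " + messageOf(empty));
        // info -> Hello World!
    }
}
```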
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;

    public class ReceiveLogsDirect {

        private static final String EXCHANGE_NAME = "direct_logs";

        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            // The connection stays open so the consumer keeps receiving messages
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            channel.exchangeDeclare(EXCHANGE_NAME, "direct");
            // Server-named queue used only by this consumer
            String queueName = channel.queueDeclare().getQueue();

            if (argv.length < 1) {
                System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
                System.exit(1);
            }

            // One binding per severity passed on the command line
            for (String severity : argv) {
                channel.queueBind(queueName, EXCHANGE_NAME, severity);
            }
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" +
                        delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
            };
            // autoAck = true; the cancel callback is a no-op
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
        }
    }
Spring AMQP
We use Spring Boot to bootstrap and configure the Spring AMQP project (code repository link). As before, the configuration class looks like this:
    import org.springframework.amqp.core.AnonymousQueue;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class RabbitMQConfig {

        @Bean
        public DirectExchange direct() {
            return new DirectExchange("routing-direct");
        }

        // First queue: bound with orange and black
        @Bean
        public Queue autoDeleteQueue1() {
            return new AnonymousQueue();
        }

        @Bean
        public Binding binding1a(DirectExchange direct, Queue autoDeleteQueue1) {
            return BindingBuilder.bind(autoDeleteQueue1)
                    .to(direct)
                    .with("orange");
        }

        @Bean
        public Binding binding1b(DirectExchange direct, Queue autoDeleteQueue1) {
            return BindingBuilder.bind(autoDeleteQueue1)
                    .to(direct)
                    .with("black");
        }

        // Second queue: bound with green and black
        @Bean
        public Queue autoDeleteQueue2() {
            return new AnonymousQueue();
        }

        @Bean
        public Binding binding2a(DirectExchange direct, Queue autoDeleteQueue2) {
            return BindingBuilder.bind(autoDeleteQueue2)
                    .to(direct)
                    .with("green");
        }

        @Bean
        public Binding binding2b(DirectExchange direct, Queue autoDeleteQueue2) {
            return BindingBuilder.bind(autoDeleteQueue2)
                    .to(direct)
                    .with("black");
        }
    }
Message sender:
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    @Service
    public class RoutingSenderServiceImpl implements RoutingSenderService {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        @Autowired
        private DirectExchange direct;

        private final String[] keys = {"orange", "black", "green"};

        // Publishes the same message once per routing key
        @Override
        public void send(String message) {
            for (int i = 0; i < keys.length; i++) {
                String key = keys[i];
                System.out.println("Loop iteration " + i + ": sending message");

                rabbitTemplate.convertAndSend(direct.getName(), key, message);
                System.out.println(" *********************send() sent: " + message
                        + " exchange is: " + direct.getName()
                        + " routing key is: " + key);
            }
        }
    }
Message receiver:
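    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    @Component
    public class RoutingReceiver {

        // Consumes from the queue bound with orange and black
        @RabbitListener(queues = "#{autoDeleteQueue1.name}")
        public void receive1(String in) {
            System.out.println("receive1: " + in);
        }

        // Consumes from the queue bound with green and black
        @RabbitListener(queues = "#{autoDeleteQueue2.name}")
        public void receive2(String in) {
            System.out.println("receive2: " + in);
        }
    }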
Test it with an HTTP client using the following request:
    # Routing test
    GET {{routing_host}}/amqp/send?message= 你好 routing and routing
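The bindings in RabbitMQConfig determine which listener should receive each of the three messages that send publishes. The sketch below works this out in memory. It is a derivation from the bindings, not output captured from the running application, and the class `ExpectedDeliveriesSketch` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, broker-free model of the bindings declared in RabbitMQConfig.
final class ExpectedDeliveriesSketch {
    // listener method -> binding keys of the queue it consumes from
    static Map<String, List<String>> listenerBindings() {
        Map<String, List<String>> bindings = new LinkedHashMap<>();
        bindings.put("receive1", Arrays.asList("orange", "black")); // binding1a, binding1b
        bindings.put("receive2", Arrays.asList("green", "black"));  // binding2a, binding2b
        return bindings;
    }

    // Listeners whose queue has a binding key equal to the routing key.
    static List<String> listenersFor(String routingKey) {
        List<String> listeners = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : listenerBindings().entrySet()) {
            if (entry.getValue().contains(routingKey)) {
                listeners.add(entry.getKey());
            }
        }
        return listeners;
    }

    public static void main(String[] args) {
        // The same keys, in the same order, that RoutingSenderServiceImpl uses.
        for (String key : new String[] {"orange", "black", "green"}) {
            System.out.println(key + " -> " + listenersFor(key));
        }
        // orange -> [receive1]
        // black -> [receive1, receive2]
        // green -> [receive2]
    }
}
```

Under these assumptions, one request should lead to four deliveries in total: two to receive1 (orange, black) and two to receive2 (black, green).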