Direct exchange
direct exchange路由算法 -消息会进入binding key 与消息routing key完全匹配的队列。
可以看到Direct exchange X 绑定了两个队列。第一个队列的绑定键为orange,第二个队列有两个绑定键,一个绑定键为black,另一个绑定键为green。

在此情况下,如果发布到交换机的消息routing key 为orange,则会被路由到队列 Q1。routing key 为black或green的消息将进入 Q2,其他消息都将被丢弃
Multiple bindings

用同一个binding key绑定多个队列是完全合法的。在我们的例子中,我们可以用binding key black 在 X 和 Q1 之间添加一个绑定。在这种情况下,direct exchange 将像fanout exchange 一样,向所有匹配队列广播消息,routing key为 black 的消息将同时发送到 Q1 和 Q2。
Emitting logs
我们和以往一样继续使用日志Demo,不同的是将fanout换成direct。我们将 the log severity 作为routing key。
和之前一样,我们先声明一个队列
1
   | channel.exchangeDeclare(EXCHANGE_NAME, "direct");
   | 
 
然后发送消息
1
   | channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
   | 
 
Subscribing
接收信息的方式与上一教程相同, 我们将为感兴趣的每个severity创建一个新的绑定。
1 2 3 4 5
   | String queueName = channel.queueDeclare().getQueue();
  for(String severity : argv){   channel.queueBind(queueName, EXCHANGE_NAME, severity); }
   | 
 

完整代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
   | public class EmitLogDirect {
      private static final String EXCHANGE_NAME = "direct_logs";
      public static void main(String[] argv) throws Exception {                  ConnectionFactory factory = new ConnectionFactory();                  factory.setHost("localhost");                  try (Connection connection = factory.newConnection();              Channel channel = connection.createChannel()) {                          channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
              String severity = getSeverity(argv);             String message = getMessage(argv);
              channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));             System.out.println(" [x] Sent '" + severity + "':'" + message + "'");         }     }
      private static String getSeverity(String[] strings) {         if (strings.length < 1)             return "info";         return strings[0];     }
      private static String getMessage(String[] strings) {         if (strings.length < 2)             return "Hello World!";         return joinStrings(strings, " ", 1);     }
      private static String joinStrings(String[] strings, String delimiter, int startIndex) {         int length = strings.length;         if (length == 0) return "";         if (length <= startIndex) return "";         StringBuilder words = new StringBuilder(strings[startIndex]);         for (int i = startIndex + 1; i < length; i++) {             words.append(delimiter).append(strings[i]);         }         return words.toString();     } }
  | 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
   | public class ReceiveLogsDirect {
    private static final String EXCHANGE_NAME = "direct_logs";
    public static void main(String[] argv) throws Exception {            ConnectionFactory factory = new ConnectionFactory();     factory.setHost("localhost");            Connection connection = factory.newConnection();            Channel channel = connection.createChannel();
      channel.exchangeDeclare(EXCHANGE_NAME, "direct");     String queueName = channel.queueDeclare().getQueue();
      if (argv.length < 1) {         System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");         System.exit(1);     }
      for (String severity : argv) {         channel.queueBind(queueName, EXCHANGE_NAME, severity);     }     System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
      DeliverCallback deliverCallback = (consumerTag, delivery) -> {         String message = new String(delivery.getBody(), "UTF-8");         System.out.println(" [x] Received '" +             delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");     };     channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });   } }
  | 
 
Spring AMQP
我们将使用 Spring Boot 来引导和配置 Spring AMQP 项目,代码仓库链接,与往常一样,配置类如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
   | @Configuration public class RabbitMQConfig {
      @Bean     public DirectExchange direct() {          return new DirectExchange("routing-direct");     }
      @Bean      public Queue autoDeleteQueue1() {         return new AnonymousQueue();     }
      @Bean     public Binding binding1a(DirectExchange direct, Queue autoDeleteQueue1) {         return BindingBuilder.bind(autoDeleteQueue1)                 .to(direct)                 .with("orange");     }
      @Bean     public Binding binding1b(DirectExchange direct, Queue autoDeleteQueue1) {         return BindingBuilder.bind(autoDeleteQueue1)                 .to(direct)                 .with("black");     }
      @Bean      public Queue autoDeleteQueue2() {         return new AnonymousQueue();     }
      @Bean     public Binding binding2a(DirectExchange direct, Queue autoDeleteQueue2) {         return BindingBuilder.bind(autoDeleteQueue2)                 .to(direct)                 .with("green");     }
      @Bean     public Binding binding2b(DirectExchange direct, Queue autoDeleteQueue2) {         return BindingBuilder.bind(autoDeleteQueue2)                 .to(direct)                 .with("black");     }
  }
   | 
 
消息发送方:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
   | @Service public class RoutingSenderServiceImpl implements RoutingSenderService {
      @Autowired     private RabbitTemplate rabbitTemplate;
      @Autowired     private DirectExchange direct;
      private final String[] keys = {"orange", "black", "green"};
      @Override     public void send(String message) { 
          for (int i = 0; i < keys.length; i++) {             String key = keys[i];             System.out.println("第 " + i + "次循环发消息");
              rabbitTemplate.convertAndSend(direct.getName(), key, message);             System.out.println(" *********************send() 发送了 :" + message + " exchange is: " + direct.getName() + "routing key is: " + key);         }
      } }
   | 
 
消息接收方:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
   | @Component public class RoutingReceiver {
      @RabbitListener(queues = "#{autoDeleteQueue1.name}")     public void receive1(String in) {
          System.out.println("receive1: " + in);     }
      @RabbitListener(queues = "#{autoDeleteQueue2.name}")     public void receive2(String in) {
          System.out.println("receive2: " + in);     } }
   | 
 
使用http client 测试,结果如下:
1 2
   | # Routing 测试 GET {{routing_host}}/amqp/send?message= 你好  routing and routing
   | 
 
