Stream概述

消息驱动

屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型

SpringCloud Stream

SpringCloud Stream是一个构建消息驱动微服务的框架。应用程序通过inputs或者outputs来与SpringCloudStream中binder对象交互。通过配置来binding,而SpringCloudStream的binder对象负责与消息中间件交互。

通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。SpringCloudStream为一些供应商的消息中间件产品提供个性化的自动配置实现,引用了发布-订阅、消费组、分区三个核心概念。目前仅RabbitMQ和Kafka

设计思想

标准MQ

  • 生产者/消费者之间靠消息媒介传递信息内容(Message)
  • 消息必须走特定的通道(MessageChannel)
  • 消息通道里的消息谁负责发谁负责处理(消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅)

为什么使用CloudStream

这些中间件的差异性导致实际项目开发造成困扰,如果使用两个消息队列的其中一种,后面的业务需求,想往另外一种消息队列进行迁移,此时一大堆东西要重写做,因为它跟系统耦合,而Cloud Stream给我们提供了一种解耦的方式

stream如何统一底层差异

通过定义绑定器作为中间层,实现应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现

Cloud Stream标准流程套路

Binder:很方便的连接中间件,屏蔽差异

Channel:通道,在消息通讯系统中实现存储和转发的媒介,通过Channel对队列进行配置

Souce和Sink:从Stream发布消息就是输出,接受消息就是输入

编码API和常用注解

组成 说明
Middleware 中间件,目前仅支持RabbitMQ和Kafka
Binder Binder是应用与消息中间件之间的封装,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应Kafka的topic、RabbitMQ的exchange)
@Input 注解标识输入通道,通过该输入通道接受到的消息进入应用程序
@Output 注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener 监听队列,用于消费者的队列的消息接收
@EanableBinding 指信道channel和exchange绑定在一起

消息驱动—生产者

1.新建cloud-stream-rabbitmq-provider-8801

2.引入pom

3.引入yml

4.添加主启动

5.处理业务类

发送消息接口

1
2
3
4
public interface IMessageProvider {

String send();
}

发送消息接口实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@EnableBinding(Source.class) // 消息推送管道
public class MessageProviderImpl implements IMessageProvider {

@Resource
private MessageChannel output; // 消息发送管道

@Override
public String send() {

String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("************serial = " + serial);
return null;

}
}

控制层

1
2
3
4
5
6
7
8
9
10
11
@RestController
public class SendMessageController {

@Resource
private IMessageProvider messageProvider;

@GetMapping("/sendMessage")
public String sendMessage() {
return messageProvider.send();
}
}

6.测试

  1. 启动7001
  2. 启动rabbitmq(http://localhost:15672/)
  3. 启动8801(http://localhost:8801/sendMessage)

消息驱动—消费者

1.新建cloud-stream-rabbitmq-consumer-8802

2.引入pom

3.配置yml

4.添加主启动

5.处理业务

1
2
3
4
5
6
7
8
9
10
11
12
@Component
@EnableBinding(Sink.class)
public class ReceiveMessage {

@Value("${server.port}")
private String serverPort;

@StreamListener(Sink.INPUT)
public void input(Message<String> message) {
System.out.println("消费者1号,接收到的消息 " + message.getPayload() + "\tport\t" + serverPort);
}
}

6.测试

测试8801发送,8802接收消息

分组消费与持久化

拷贝一份8802作为新的消费者,8801发送消息,8802和8803接收,出现重复消费和消息持久化问题

重复消费问题

原因:默认分组group是不同的,不同分组可以重复消费

解决方案:自定义配置分组,分同一个组,解决重复消费问题

使用分组和持久化属性group解决

在上述场景下,订单系统做集群部署,都会从RabbitMQ中获取订单信息,如果一个订单同时被两个服务获取,那么就会造成数据错误,为了避免该情况,可以使用Stream中的消息分组解决。在Stream中处于同一个组中的多个消费者是竞争关系,能保证消息只会被其中一个应用消费一次,不同组可以全面消费(重复消息),同一个组内会发生竞争关系,只有一个可以消费

解决

原理 : 微服务应用宝放置于同一个group中,就能保证消息只会被其中一个应用消费一次。不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费

多数情况,生产者发生消息给某具体微服务时只希望被消费一次,上述启动的两个应用实例,虽然同属一个应用,但该消息出现被重复消费两次的情况。为了解决该问题,在SpringCloud中提出消费组的概念

8802/8803实现轮询分组,每次只有一个消费者。8801模块的发的消息只能被两者之一接收到,这样避免重复消费

因此将8802和8803归为相同组group: groupA

消息持久化