8 消息驱动 Stream
Stream概述
消息驱动
屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型
SpringCloud Stream
SpringCloud Stream是一个构建消息驱动微服务的框架。应用程序通过inputs或者outputs来与SpringCloudStream中binder对象交互。通过配置来binding,而SpringCloudStream的binder对象负责与消息中间件交互。
通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。SpringCloudStream为一些供应商的消息中间件产品提供个性化的自动配置实现,引用了发布-订阅、消费组、分区三个核心概念。目前仅RabbitMQ和Kafka
设计思想
标准MQ
- 生产者/消费者之间靠消息媒介传递信息内容(Message)
- 消息必须走特定的通道(MessageChannel)
- 消息通道里的消息谁负责发谁负责处理(消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅)
为什么使用CloudStream
这些中间件的差异性导致实际项目开发造成困扰,如果使用两个消息队列的其中一种,后面的业务需求,想往另外一种消息队列进行迁移,此时一大堆东西要重写做,因为它跟系统耦合,而Cloud Stream给我们提供了一种解耦的方式
stream如何统一底层差异
通过定义绑定器作为中间层,实现应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现
Cloud Stream标准流程套路
Binder:很方便的连接中间件,屏蔽差异
Channel:通道,在消息通讯系统中实现存储和转发的媒介,通过Channel对队列进行配置
Souce和Sink:从Stream发布消息就是输出,接受消息就是输入
编码API和常用注解
组成 | 说明 |
---|---|
Middleware | 中间件,目前仅支持RabbitMQ和Kafka |
Binder | Binder是应用与消息中间件之间的封装,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应Kafka的topic、RabbitMQ的exchange) |
@Input | 注解标识输入通道,通过该输入通道接受到的消息进入应用程序 |
@Output | 注解标识输出通道,发布的消息将通过该通道离开应用程序 |
@StreamListener | 监听队列,用于消费者的队列的消息接收 |
@EanableBinding | 指信道channel和exchange绑定在一起 |
消息驱动—生产者
1.新建cloud-stream-rabbitmq-provider-8801
2.引入pom
3.引入yml
4.添加主启动
5.处理业务类
发送消息接口
1 | public interface IMessageProvider { |
发送消息接口实现
1 | // 消息推送管道 |
控制层
1 |
|
6.测试
- 启动7001
- 启动rabbitmq(http://localhost:15672/)
- 启动8801(http://localhost:8801/sendMessage)
消息驱动—消费者
1.新建cloud-stream-rabbitmq-consumer-8802
2.引入pom
3.配置yml
4.添加主启动
5.处理业务
1 |
|
6.测试
测试8801发送,8802接收消息
分组消费与持久化
拷贝一份8802作为新的消费者,8801发送消息,8802和8803接收,出现重复消费和消息持久化问题
重复消费问题
原因:默认分组group是不同的,不同分组可以重复消费
解决方案:自定义配置分组,分同一个组,解决重复消费问题
使用分组和持久化属性group解决
在上述场景下,订单系统做集群部署,都会从RabbitMQ中获取订单信息,如果一个订单同时被两个服务获取,那么就会造成数据错误,为了避免该情况,可以使用Stream中的消息分组解决。在Stream中处于同一个组中的多个消费者是竞争关系,能保证消息只会被其中一个应用消费一次,不同组可以全面消费(重复消息),同一个组内会发生竞争关系,只有一个可以消费
解决
原理 : 微服务应用宝放置于同一个group中,就能保证消息只会被其中一个应用消费一次。不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费
多数情况,生产者发生消息给某具体微服务时只希望被消费一次,上述启动的两个应用实例,虽然同属一个应用,但该消息出现被重复消费两次的情况。为了解决该问题,在SpringCloud中提出消费组的概念
8802/8803实现轮询分组,每次只有一个消费者。8801模块的发的消息只能被两者之一接收到,这样避免重复消费
因此将8802和8803归为相同组group: groupA
消息持久化