概念
Spring Cloud Stream是构建消息驱动的微服务应用程序的框架。Spring Cloud Stream基于Spring Boot建立独立的生产级Spring应用程序,并使用Spring Integration提供与消息代理的连接。它提供了来自几家供应商的中间件的意见配置,介绍了持久发布订阅语义,消费者组和分区的概念。
用途
为消息中间件提供统一管理通道,便于灵活更替,接入
参考资料
http://08643.cn/p/bf992c23c381
https://blog.csdn.net/lzwglory/article/details/78295343
基础知识
资料:https://blog.csdn.net/weixin_38399962/article/details/82192340
Middleware未RocketMQ RabbitMQ等中间件
编码流程
开启Middleware(Kafka)
创建通道并与Binder绑定(@EnableBinding)
编写操作通道的代码
在配置文件上配置目的地,组,Middleware的地址,端口等等
问题
坑1.stream生成的exchang默认是topic模式。就是按照前缀匹配,发送消息给对应的队列
坑2.默认消息异常之后,都会往死消息队列里面写,然而异常是放到一个header里面去的。默认消息队列支持的最大frame_max 是128kb,超过这个大小,服务器就主动给你关闭连接,然后把你的消息会不断的重试
坑3.看到国内好多博客,使用@Input和@output都是用MessageChannel,这是不对的。@Output对MessageChannel,@Input对应SubscribableChannel 。切记!
坑4.我使用的stream版本是1.2.1,springboot版本时1.5.6。没有办法使用routingkey属性,即在spring.cloud.stream.rabbit这个属性无法显示。应该是我的stream版本偏低吧。遇到这种情况,大家果断换新版本,或者使用自带的ampq来实现吧
坑5.instanceName设置问题
instanceName为区分集群机器使用,假设不设置采用默认,集群时效会发到同一台机器
具体解释:
默认情况下不需要设置instanceName,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
如果同一个jvm中,不同的producer需要往不同的rocketmq集群发送消息,需要设置不同的instanceName
原因如下:如果不设置instanceName,那么会使用ip@pid作为producer唯一标识,那么会导致多个producer内部只有一个MQClientInstance(与mq交互)实例,从而导致只往一个集群发消息。