SpringBoot2.3整合RabbitMQ实现延迟消费消息

1.源码获取地址

文章末尾有源代码地址
https://www.sunnyblog.top/detail.html?id=1265257400324063232
本章节主要实现消息的延迟消费,在学习延迟消费之前必须先了解RabbitMQ两个基本概念,消息的TTL和死信Exchange,通过这两者的组合来实现消息的延迟消费。
不想看原理讲解的,直接通过标题6看代码实现

2.消息的TTL(Time To Live)

消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。

3.死信交换器 Dead Letter Exchanges

  • 一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。
  • 一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
  • 上面的消息的TTL到了,消息过期了
  • 队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。 死信交换器(Dead Letter Exchange)其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。

4.实现延迟消费原理

file
  • 大概原理:首先发送消息到死信队列,死信队列设置ttl过期时间,到期之后会自动将消息发送到一般队列实现消息的消费
  • 实现步骤如下
  • 创建死信交换器
  • 创建死信队列
  • 将死信队列与死信交换机绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)消息的发送方在向 Exchange发送消息时,也必须指定消息的RoutingKey。Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的Routing key完全一致,才会接收到消息.
  • 创建正常交换器
  • 创建正常队列
  • 将正常队列绑定到正常交换器

5.基于案例实现消息的延迟消费

这里我们以最熟悉的12306购票为例进行案例场景的分析,12306购票步骤如下:

  • 首先登录12306根据日期和起点站等条件进行抢票下订单
  • 抢到票订单处于未支付状态,并提示支付时间30分钟内


    file
  • 这里就可以使用到延时队列,在下订单完成的时候将订单号发送到MQ的死信队列,并设置30分钟过期,30分钟以后死信队列的数据会转发到正常队列,从正常队列中获取到下订单的订单号,然后我们根据订单号查询订单的支付状态,如果已经支付我们不做任何操作,如果未支付取消订单,关闭支付状态,将票回滚到票池供其他用户购买

6.代码实现

  • 在RabbitMQConfig中创建队列、交换机以及绑定关系

      @Configuration
      public class RabbitMQConfig {
    
              /**
               * 测试发送消息到MQ
               * @return
               */
              @Bean
              public Queue testHello() {
                      return new Queue(SysConstant.QUEUE_TEST_HELLO);
              }
    
    
              /**
               * 死信交换机
               * @return
               */
              @Bean
              public DirectExchange sysOrderDelayExchange() {
                      return new DirectExchange(SysConstant.SYS_ORDER_DELAY_EXCHANGE);
              }
    
              /**
               * 死信队列
               * @return
               */
              @Bean
              public Queue sysOrderDelayQueue() {
                      Map<String, Object> map = new HashMap<String, Object>(16);
                      map.put("x-dead-letter-exchange",SysConstant.SYS_ORDER_RECEIVE_EXCHANGE); //指定死信送往的交换机
                      map.put("x-dead-letter-routing-key", SysConstant.SYS_ORDER_RECEIVE_KEY); //指定死信的routingkey
                      return new Queue(SysConstant.SYS_ORDER_DELAY_QUEUE, true, false, false, map);
              }
    
              /**
               * 给死信队列绑定死信交换机
               * @return
               */
              @Bean
              public Binding sysOrderDelayBinding() {
                      return BindingBuilder.bind(sysOrderDelayQueue()).to(sysOrderDelayExchange()).with(SysConstant.SYS_ORDER_DELAY_KEY);
              }
    
              /**
               * 死信接收交换机,用于接收死信队列的消息
               * @return
               */
              @Bean
              public DirectExchange sysOrderReceiveExchange() {
                      return new DirectExchange(SysConstant.SYS_ORDER_RECEIVE_EXCHANGE);
              }
    
              /**
               * 死信接收队列
               * @return
               */
              @Bean
              public Queue sysOrderReceiveQueue() {
                      return new Queue(SysConstant.SYS_ORDER_RECEIVE_QUEUE);
              }
    
              /**
               * 死信接收交换机绑定接收死信队列消费队列
               * @return
               */
              @Bean
              public Binding sysOrdeReceiveBinding() {
                      return BindingBuilder.bind(sysOrderReceiveQueue()).to(sysOrderReceiveExchange()).with(SysConstant.SYS_ORDER_RECEIVE_KEY);
              }
      }
    
  •   发送延时消息到死信交换器方法
    
              @Service
              public class MsgService {
    
                      @Autowired
                      private RabbitTemplate rabbitTemplate;
                      /**
                       * 发送延时消息到mq
                       * @param exchange 死信交换机
                       * @param routeKey 路由key
                       * @param data 发送数据
                       * @param delayTime 过期时间,单位毫秒
                       */
                      public void sendDelayMsgToMQ(String exchange, String routeKey, String data,int delayTime) {
                              rabbitTemplate.convertAndSend(exchange, routeKey, data, message -> {
                                      message.getMessageProperties().setExpiration(delayTime + "");
                                      return message;
                              });
                      }
              }
    
  • 监听队列消息ReceiveMsgListener类

       /**
               * 获取到的延时消息
               * 这里接收到消息进行对应的业务处理(例如:取消订单,关闭支付,回滚库存等 ...)
               * @param msg
               */
              @RabbitListener(queues = SysConstant.SYS_ORDER_RECEIVE_QUEUE)
              @RabbitHandler
              public void getdelayMsg(String msg) {
                      log.info("MQ接收消息时间:{},消息内容:{}", DateUtil.formatDateTime(DateUtil.date()),msg);
                      log.info("------->这里实现订单关闭、支付关闭、回滚库存业务逻辑...");
              }
    
  •           创建Controller向队列发送消息,设置过期时间10秒
    
                      @RestController
                      @RequestMapping("mq")
                      @Slf4j
                      public class MQController {
    
                              @Autowired
                              private MsgService msgService;
    
                              @GetMapping("sendMsg")
                              public String sendMsg() {
                                      log.info("发送延时消息时间:" + DateUtil.formatDateTime(DateUtil.date()));
    
                                      OrderInfo orderInfo = new OrderInfo();
                                      orderInfo.setOrderId(IdUtil.fastSimpleUUID());
                                      orderInfo.setOrderState("待支付");
                                      orderInfo.setPayMoney(999.88);
                                      msgService.sendDelayMsgToMQ(SysConstant.SYS_ORDER_DELAY_EXCHANGE,SysConstant.SYS_ORDER_DELAY_KEY, JSONUtil.toJsonStr(orderInfo),10*1000);//1分钟
                                      return JSONUtil.toJsonStr("发送延时消息成功");
                              }
                      }
    
  • 启动服务,可以看到MQ中创建对应的队列和交换器

file

file
  • 控制台日志可以看到发送消息与消费消息间隔时间是10s
file

7.更多MQ技术文档获取

https://www.sunnyblog.top/index.html?tagId=1264009609236971520

详细开发技术文档尽在 点击这里查看技术文档 ;更多技术文章: https://www.sunnyblog.top;任何疑问加QQ群咨询:534073451

?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,128评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,316评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,737评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,283评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,384评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,458评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,467评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,251评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,688评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,980评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,155评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,818评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,492评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,142评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,382评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,020评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,044评论 2 352