Spring Cloud Stream 进阶配置——高可用(二)——死信队列

前言

前文 Spring Cloud Stream 进阶配置——高可用(一)——失败重试 介绍了 失败重试 机制如何保障消息被正确消费,对于短暂性故障,消费失败后重试,可以得到有效解决;但是如果是诸如程序问题导致消费失败的情况,短时间内(未修复bug之前),当重试次数消耗完之后,消息则会被丢弃。

对于无关紧要的消息,丢了也就丢了,但如果是类似账单这种敏感数据,一旦丢了,老板就要找你谈话了。

针对上面所述场景,rabbitmq 有对应的方案,即 死信队列。

死信队列

何为死信

在开始了解死信队列之前,我们需要知道什么死信,从字面看就是“死掉了的信息(消息)”,不过这是相对于队列来说的,在消息所在的队列看来,没有意义、没有价值的消息,就应该丢弃,任其消亡。那么问题来了,队列是怎么界定这类消息的?这里,不得不先说明一下哪类消息属于死信,有如下几种情况:

  • 消费者通过 basic.reject 拒绝确认消息;
  • 消费者使用 basic.nack 否定确认消息,并参数 requeue 设置为 false;
  • 消息在队列中停留时间超过 ttl,即未能被及时消费;
  • 消息体过大,超过队列所允许的消息体大??;

对于前2种情况,都是消费者主动放弃消息,而后面2种,则因为队列的自我?;せ票欢恿形耷榈囟?。不过,这几种情况都有一个共同点,如果再保留这些死信,很大可能会影响整个队列的正常工作,因为这些都属于消费者不疼,队列不爱的消息,所以只好选择从队列踢掉。

死信的归宿

而死信队列则是死信的归宿,也可以将它比做死信的回收站(下文会揭秘为什么)。死信队列其实也是一个普通队列,可以被消费者订阅,当消息成为死信后,会被投递到与原队列 “绑定” 的队列,该队列就是死信队列。

死信交换机

我们都知道,发布者在发布消息后,需要经过消息交换机,根据特定的路由,才能被正确投递到目标队列。而死信要被投递到死信队列,那肯定还需要一个消息交换机,该交换机为 Dead Letters Exchanges(DLX),即 死信交换机。当然,死信交换机也是一个普通的消息交换机,可以通过正常的声明方式去创建。

思考

这里先抛出一个问题,既然死信队列、死信交换机都是普通的队列、消息交换机,在可视化界面怎么去区分死信队列与其他队列、死信交换机与其他交换机。

声明死信队列

ps: 这里的主题是如何在 SpringCloud Stream 中使用死信队列,其他声明方式(如原生SDK)不在本文讨论范围。

Spring Cloud Stream 声明死信队列非常简单,简单到只需要一个配置就能搞定,这里不得不说 Spring BootSpring Cloud 的设计思想是真厉害。

“开启” 死信队列的相关配置为:spring.cloud.stream.bindings.<channelName>.consumer.autoBindDlq,该配置的作用为:是否自动声明死信队列(DLQ)并将其绑定到死信交换机(DLX)。默认是false。

上面的 开启 用了双引号,为什么呢?配置 autoBindDlq 翻译一下就能大概猜到原因了,因为这个开关,当为 true 时,开启的是 自动声明死信队列,并将其绑定到死信交换机。所以,我们也是可以自己手动创建的。

示例

以下代码可在 源码 查看。

配置

spring:
  cloud:
    stream:
      bindings:
        packetUplinkOutput:
          destination: packetUplinkDlxTopic
          content-type: application/json
          binder: rabbit

        packetUplinkInput:
          destination: packetUplinkDlxTopic
          content-type: application/json
          group: ${spring.application.name}.dlx
          binder: rabbit
      rabbit:
        bindings:
          packetUplinkInput:
            consumer:
              ttl: 20000 # 默认不做限制,即无限。消息在队列中最大的存活时间。当消息滞留超过ttl时,会被当成消费失败消息,即会被转发到死信队列或丢弃.
              # DLQ相关
              autoBindDlq: true # 是否自动声明死信队列(DLQ)并将其绑定到死信交换机(DLX)。默认是false。

ps: 上文提到有几种情况,消息会变成死信,而上面使用的配置是通过设置队列的 ttl,即消息在队列中存活的最大时间为 20s。因为这是制造死信最简单粗暴的方法。

代码

消息模型
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PacketModel {
    /**
     * 设备 eui
     */
    private String devEui;

    /**
     * 数据
     */
    private String data;

    // 省略其他字段
}
测试用例
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("dlq")
@EnableBinding({ScasDlqTest.MessageSink.class, ScasDlqTest.MessageSource.class})
public class ScasDlqTest {

    @Autowired
    private PacketUplinkProducer packetUplinkProducer;

    private Random random = new Random();
    private List<String> devEuis = new ArrayList<>(10);

    @PostConstruct
    private void initDevEuis() {
        devEuis.add("10001");
        devEuis.add("10002");
        devEuis.add("10003");
        devEuis.add("10004");
        devEuis.add("10005");
        devEuis.add("10006");
        devEuis.add("10007");
        devEuis.add("10008");
        devEuis.add("10009");
        devEuis.add("10010");
    }

    /**
     *
     */
    @Test
    public void test() throws InterruptedException {
        for (int i = 0; i < 100; i++) {
            String devEui = getDevEuis();
            packetUplinkProducer.publish(new PacketModel(devEui, UUID.randomUUID().toString()));
        }

        Thread.sleep(1000000);

    }

    private String getDevEuis() {
        return devEuis.get(random.nextInt(10));
    }

    @Component
    public static class PacketUplinkProducer {

        @Autowired
        private MessageSource messageSource;

        public void publish(PacketModel model) {
            log.info("发布上行数据包消息. model: [{}].", model);
            messageSource.packetUplinkOutput().send(MessageBuilder.withPayload(model).build());
        }

    }

    @Component
    public static class PacketUplinkHandler {

        @StreamListener("packetUplinkInput")
        public void handle(PacketModel model) throws InterruptedException {
            Thread.sleep(1000);
            log.info("消费上行数据包消息. model: [{}].", model);
        }

    }

    public interface MessageSink {

        @Input("packetUplinkInput")
        SubscribableChannel packetUplinkInput();

    }

    public interface MessageSource {

        @Output("packetUplinkOutput")
        MessageChannel packetUplinkOutput();

    }

}

运行测试用例

运行测试用例后,访问 Rabbitmq可视化页面 可以看到类似下图的页面:

死信队列1

因为目标队列的消费者 1s 才消费一条消息,而队列的 ttl 只有 20s,所以差不多 20s 后,再刷新页面,可以看到:

死信队列2

可以看到,队列中的待消费消息为80条,而我们一共发布了100条,消费力为1条/s,20s后,未消费的消息全部进入死信队列,所以80条对得上。

验证死信被丢弃

为了验证只有创建死信队列并绑定到死信交换机,死信才不会被丢弃,可以将 autoBindDlq 改成 false,再跑一次,20s 后,看目标队列是不是没有消息。不过,需要先把目标队列删除,不然会出现如下错误:

2019-08-19 17:37:17.545 ERROR 26146 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'packetUplinkDlxTopic.scas-data-collection.dlx' in vhost '/': received none but current is the value 'DLX' of type 'longstr', class-id=50, method-id=10)
2019-08-19 17:37:18.552 ERROR 26146 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'packetUplinkDlxTopic.scas-data-collection.dlx' in vhost '/': received none but current is the value 'DLX' of type 'longstr', class-id=50, method-id=10)
2019-08-19 17:37:20.557 ERROR 26146 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'packetUplinkDlxTopic.scas-data-collection.dlx' in vhost '/': received none but current is the value 'DLX' of type 'longstr', class-id=50, method-id=10)
2019-08-19 17:37:24.566 ERROR 26146 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'packetUplinkDlxTopic.scas-data-collection.dlx' in vhost '/': received none but current is the value 'DLX' of type 'longstr', class-id=50, method-id=10)
2019-08-19 17:37:29.575 ERROR 26146 --- [ 127.0.0.1:5672] o.s.a.r.c.CachingConnectionFactory       : Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-dead-letter-exchange' for queue 'packetUplinkDlxTopic.scas-data-collection.dlx' in vhost '/': received none but current is the value 'DLX' of type 'longstr', class-id=50, method-id=10)
...
删除队列
删除队列
验证结果

重新跑一次测试用例,20s后,可以看到:


死信被丢弃后

如果没有声明死信队列,那么死信一旦产生,就会直接被丢弃,也找不回来了。

如何使用死信队列

死信在进入死信队列后,如果没有类似重新消费的逻辑,那跟被直接丢弃没啥区别,甚至还占用磁盘空间。下面介绍2种体现死信队列价值的操作与实现。

1. 重新发布到目标队列

在每个队列的详情页中,有一个 Move Messages 分栏,如下图所示:

move messages(未启动插件)

上图中圈中的提示,是因为没有启动 rabbitmq_shovel、 rabbitmq_shovel_management 这2个插件,启动命令为: rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management。

如果插件已启动,看到的界面如下:


move messages(插件已启动)

进入死信队列的详情页,将目标队列 packetUplinkDlxTopic.scas-data-collection.dlx 拷贝到 Move Messages 表单的 Destination queue 输入框中,然后点击 Move Messages 按钮??梢钥吹饺缦拢?br>

move messages to target queue

re-consume dead-letters

死信队列的所有消息全部被重新投递到目标队列,看到这里,可以确定的是:通过 Move Messages 功能,是可以将死信重新投递到原队列,而且也可以被正常重新消费。不过可以预见的是,再过20s,又有60条消息变成死信。

ps: 揭秘一下上文的埋点——死信队列比作死信的回收站。其实,看到这里,大家应该大致能理解这句话了,消息在变成死信,这在队列看来,就是我不要这些消息了,可以把它们丢了,所以就进入死信队列这一回收站,而在特定时机,比如机器、环境稳定了,又可以重新发布到原来的队列,即对应回收站的恢复文件功能。所以将死信队列比作死信的回收站,在这种情况下还是可以理解的。

2. 定义死信队列的消费逻辑

上文提到,死信队列其实是一个普通的队列,那么我们直接订阅该死信队列是不是就可以正常消费死信了?答案是肯定的。接下来,使用 spring-rabbitmq 的注解 @RabbitListener 定义死信队列的处理逻辑,代码如下(直接追加在测试用例类即可):

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("dlq")
@EnableBinding({ScasDlqTest.MessageSink.class, ScasDlqTest.MessageSource.class})
public class ScasDlqTest {
    // 省略其他代码

    /**
     * 原队列名称
     */
    private static final String ORIGINAL_QUEUE = "packetUplinkDlxTopic.scas-data-collection.dlx";
    /**
     * 死信队列名称. 由于没有自定义, 所以根据 spring cloud stream 死信队列名称生成规则, 在原队列名称后追加 '.dlq'.
     */
    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";
    /**
     * 死信队列交换机. 默认为: {@link RabbitCommonProperties#DEAD_LETTER_EXCHANGE}, 值为 "DLX".
     */
    private static final String DLX = RabbitCommonProperties.DEAD_LETTER_EXCHANGE;
    /**
     * 死信交换机将死信路由到死信队列的 routing-key. 由于没有自定义, 所以根据 spring cloud stream 死信队列名称生成规则,
     * routing-key为原队列的名称.
     */
    private static final String routingKey = "packetUplinkDlxTopic.scas-data-collection";

    /**
     * 死信队列的处理逻辑
     * @param failedMessage
     */
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(DLQ)
                    , exchange = @Exchange(DLX)
                    , key = routingKey
            ),
            concurrency = "1-5"
    )
    public void handleDlq(Message failedMessage) throws InterruptedException {
        Thread.sleep(10);
        log.info("进入 [上行数据包队列] 的死信队列. 完整消息: {};", failedMessage);
        log.info("body: {}", (PacketModel) JSON.parseObject(failedMessage.getBody(), PacketModel.class));
    }
    
}

代码中各个变量都说的很清楚,这里就不赘述了,直接重新启动测试用例(可以考虑先把死信队列删掉,因为里边还有之前遗留的死信),20s 后,可以看到控制台打印如下:


消费死信

消费完成后,2个队列中也都没有堆积的消息,如下:


死信消被费后

当然,上面示例中,只是加死信打印出来,而实战中,则需要根据具体业务自定义死信处理逻辑,比如,发送邮件、序列化到数据库等。

这2种方案的区别
  • 第一种需要手动人工去操作;而第二种是全自动的,只要有死信,就能立即被消费;
  • 基于上面一点,可以引出开发成本上的区别。第一种基本不用额外的编程;而第二种则需要定义对应死信队列的监听器,才能自定义消费逻辑;
  • 再基于上面一点,可以引出功能、扩展性上的区别。第一种基本没有其他扩展能力;而第二种,因为收到的死信消息体,不仅包含了原消息,还携带了成为死信的原因,比如上面的例子,在日志打印的完整消息中,可以看到 x-first-death-reason=expired,即原因是消息过期了,那我们则可以根据不同的原因再结合具体业务,定制处理逻辑;

死信队列其他配置

spring:
  cloud:
    stream:
      rabbit:
        bindings:
          packetUplinkInput:
            consumer:
              # DLQ相关
              autoBindDlq: true # 是否自动声明死信队列(DLQ)并将其绑定到死信交换机(DLX)。默认是false。
              deadLetterQueueName: 'packetUplinkDlxTopic.scas-data-collection.dlx.dlq' # 默认prefix + destination + group + .dlq。DLQ的名称。
              deadLetterExchange: 'DLX' # 默认prefix + DLX。DLX的名称
              deadLetterRoutingKey: 'packetUplinkDlxTopic.scas-data-collection.dlx' # 默认destination + group
              dlqExpires: 30000 # 队列所有 customer 下线, 且在过期时间段内 queue 没有被重新声明, 多久之后队列会被销毁, 注意, 不管队列内有没有消息. 默认不设置.
              dlqLazy: false # 是否声明为惰性队列(Lazy Queue).默认false
              dlqMaxLength: 100000 # 队列中消息数量的最大限制. 默认不限制
              dlqMaxLengthBytes: 100000000 # 队列所有消息总字节的最大限制. 默认不限制
              dlqMaxPriority: 255 # 队列的消息可以设置的最大优先级. 默认不设置
              dlqTtl: 1000000 # 队列的消息的过期时间. 默认不限制
              republishToDlq: true # 默认false。当为true时,死信队列接收到的消息的headers会更加丰富,多了异常信息和堆栈跟踪。
              republishDeliveryMode: DeliveryMode.PERSISTENT # 默认DeliveryMode.PERSISTENT(持久化)。当republishToDlq为true时,转发的消息的delivery mode

ps:
如果需要验证 republishToDlq 配置的作用,可运行测试用例类 `ScasRepublishToDlqTest,既可看到结果,控制台打印结果类似如下:

死信包含异常栈

总结

消息被队列丢弃后,会变成死信,如果队列不声明死信队列,那么这些消息将被永久丢弃,而如果声明死信队列,则死信会进入死信,死信可以被重新投递回原队列,也可以采用订阅死信队列的方式自定义处理逻辑,因为死信队列其实也是一个普通队列。又因为死信队列是一个普通队列,消费过程中肯定也会产生死信,那么死信队列产生的死信,有该何去何从?所以有了死信队列的死信队列,后续文章继续说明。

扩展

1. 鼠标悬停标签查看队列的header

队列headers

2. 如何看出队列是否声明了死信队列

dlq标志

当队列声明了死信队列,会有上图圈中的2个标签。
DLX: 代表死信会被投递到的死信交换机。悬停该标签,可以看到 x-dead-letter-exchange: DLX,其中 DLX 就是交换机名称;
DLK: 代表死信被投递到死信交换机后,会根据什么路由准确投递到死信队列;悬停该标签,可以看到 x-dead-letter-routing-key: packetUplinkDlxTopic.scas-data-collection.dlx;

3. 建议队列尽可能声明死信队列

死信队列是个好东西,当队列声明了死信队列,可以很大程度上避免消息丢失的情况,所以建议队列都添加 autoBindDlq 配置。可以使用全局默认配置:spring.cloud.stream.default.consumer.autoBindDlq: true,这样所有队列都会应用该配置。

推荐阅读

Spring Cloud Stream 进阶配置——高吞吐量(一)——多消费者
Spring Cloud Stream 进阶配置——高吞吐量(二)——弹性消费者数量
Spring Cloud Stream 进阶配置——高吞吐量(三)——批量预取消息(prefetch)
Spring Cloud Stream 进阶配置——高可用(一)——失败重试

相关链接

Spring Cloud Stream消费失败后的处理策略(三):使用DLQ队列(RabbitMQ)

完!

最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351