metaq是阿里团队的消息中间件,之前也有用过和了解过kafka,据说metaq是基于kafka的源码改过来的,他们之间的区别在哪里,接下来一探究竟。
由此实现一个重要的功能:挡住前端的数据洪峰,保证后端系统的稳定性。
1.支持严格的消息顺序
2.支持Topic与Queue两种模式
3.亿级消息堆积能力
4.比较友好的分布式特性
5.同时支持Push与Pull方式消费消息
究竟metaq是如何支持这5个特性的,带着问题去分析metaq。
2007年,淘宝实施了“五彩石”项目,将交易系统由单机交易升级到了分布式,这个过程中产生了Notify。
2010年,阿里巴巴B2B部门基于ActiveMQ的5.1版本也开发了自己的一款消息引擎,称为Napoli。
2011年,Linkin推出Kafka消息引擎,阿里巴巴在研究了Kafka的整体机制和架构设计之后,基于Kafka的设计使用Java进行了完全重写并推出了MetaQ 1.0版本,主要是用于解决顺序消息和海量堆积的问题,由开源社区killme2008维护。
2012年,阿里巴巴对于MetaQ进行了架构重组升级,开发出了MetaQ 2.0,这时就发现MetaQ原本基于Kafka的架构在阿里巴巴如此庞大的体系下很难进行水平扩展,所以在2012年的时候就开发了RocketMQ 3.0。
2015年,又基于RocketMQ开发了阿里云上的Aliware MQ和Notify 3.0。
2016年,阿里巴巴将RocketMQ的内核引擎捐赠给了Apache基金会。
MetaQ和RocketMQ区别:两者等价,在阿里内部称为MetaQ 3.0,对外称为RocketMQ 3.0。
以上就是RocketMQ的整体发展历史,其实在阿里巴巴内部围绕着RocketMQ内核打造了三款产品,分别是MetaQ、Notify和Aliware MQ。这三者分别采用了不同的模型:
MetaQ主要使用了拉模型,解决了顺序消息和海量堆积问题。
Notify主要使用了推模型,解决了事务消息
而云产品Aliware MQ则是提供了商业化的版本。
NameServer集群:MetaQ 1.x和MetaQ 2.x是依赖ZooKeeper的,由于ZooKeeper功能过重,RocketMQ(即MetaQ 3.x)去掉了对ZooKeeper依赖,采用自己的NameServer。
Broker:消息中转角色,负责存储消息,转发消息。
Consumer:Push Consumer / Pull Consumer。前者向Consumer对象注册一个Listener接口,收到消息后回调Listener接口方法,采用long-polling长轮询实现push;后者主动由Consumer主动拉取信息,同kafka。
Producer:消息生产者。
Message:单位消息
Topic:软分区,对应相同的topic时,生产者对应消费者的分区标识
Tag:消息在topic基础上的二级分类
Message Queue:硬分区,物理上区分topic,一个topic对应多个message queue
Group:Consumer Group,一类 Consumer 的集合名称,这类 Consumer 通常消费一类消息,且消费逻辑一致;Producer Group,一类 Producer 的集合名称,这类 Producer 通常发送一类消息,且发送逻辑一致。
Offset:绝对偏移值,message queue中有两类offset(commitOffset和offset),前者存储在OffsetStore中表示消费到的位置,后者是在PullRequest中为拉取消息位置。
广播消费:Producer 向一些队列轮流发送消息,队列集合称为 Topic,每一个 consumer 实例消费这个 Topic 对应的所有队列。
集群消费:多个 Consumer 实例平均消费这个 topic 对应的队列集合。
Broker以组为单位向Consumer提供消息服务,group中分为master和slave两种角色,master和slave的消息同步后续介绍。然后通过NameServer暴露给Consumer具体通信地址,采用message queue消息队列结构来提供消费接口。针对某一topic情况下,message queue会根据queue id分布在不同的broker上,Consumer的消息消费压力则会分摊在不同的Broker上的message queue,从而达到负载均衡的作用。
负载均衡关系图:
虽然每个topic下面有很多message queue,但是message queue本身并不存储消息。真正的消息存储会写在CommitLog的文件,message queue只是存储CommitLog中对应的位置信息,方便通过message queue找到对应存储在CommitLog的消息。不同的topic,message queue都是写到相同的CommitLog 文件,也就是说CommitLog完全的顺序写,而顺序读写是metaq高吞吐量的基础。
下图为本机启动broker后文件系统截图:
重试队列:%RETRY%+consumergroup,push consumer默认订阅用于消费失败后的重试消费
死信队列:多次(默认16次)消费失败后进入DLQ队列,需要人工处理
定时队列:用于定时和延时消息
ConsumeQueue:?即message queue,根据topic和queueId区分的消息队列,对MappedFileQueue进行封装
CommitLog:?Broker中顺序存储的消息结构,管理消息commit和flush,对MappedFileQueue进行封装
MappedFileQueue:?对~/store/commitlog/中MappedFile封装成文件队列,进行文件大小格式检查,对mappedFile进行管理。
MappedFile:?实际broker数据文件映射成的类,即~/store/commitlog/中00000000000000000000、00000000001073741824等文件,每个文件默认大小上限为1G。
CommitLog负责将Producer的消息写入文件中,写入过程中单例加锁
MappedFile unlockMappedFile = null;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try {
? ? long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
? ? this.beginTimeInLock = beginLockTimestamp;
? ? // Here settings are stored timestamp, in order to ensure an orderly
? ? // global
? ? msg.setStoreTimestamp(beginLockTimestamp);
? ? if (null == mappedFile || mappedFile.isFull()) {
? ? ? ? mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
? ? }
? ? if (null == mappedFile) {
? ? ? ? log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
? ? ? ? beginTimeInLock = 0;
? ? ? ? return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
? ? }
? ? result = mappedFile.appendMessage(msg, this.appendMessageCallback);
? ? ...
} finally {
? ? putMessageLock.unlock();
}
核心消息存储体DefaultMessageStore类,主要包含两个重要成员:
//消息存放的物理主体?
private final CommitLog commitLog?
//根据topic分离的消费队列,记录消息的直接地址?
private final ConcurrentMap> consumeQueueTable
metaq会启动一个定时服务ReputMessageService分别定时调用(间隔1ms)来生成消费者队列和索引文件,两者均会以文件形式落盘。
metaq使用PullMessageProcessor来处理来自Consumer消费消息的请求,然后MessageStore存储体根据group、topic、queueId以及maxMsgNums来从CommitLog消息物理文件中获取对应消息的MappedByteBuffer然后返回消息给Consumer。
metaq文件目录下有两个文件用于持久化消费进度,每次写入 consumerOffset.json,将原内容备份到 consumerOffset.json.bak。
consumerOffset.json:消费进度存储文件
consumerOffset.json.bak:消费进度存储文件备份
Consumer消息完后Broker持久化其消费进度,关键代码如下:
//com.alibaba.rocketmq.broker.offset.ConsumerOffsetManager
private ConcurrentMap> offsetTable =
? ? ? ? new ConcurrentHashMap>(512);
private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
? ? ? ? ConcurrentMap map = this.offsetTable.get(key);
? ? ? ? if (null == map) {
? ? ? ? ? ? map = new ConcurrentHashMap(32);
? ? ? ? ? ? map.put(queueId, offset);
? ? ? ? ? ? this.offsetTable.put(key, map);
? ? ? ? } else {
? ? ? ? ? ? Long storeOffset = map.put(queueId, offset);
? ? ? ? ? ? if (storeOffset != null && offset < storeOffset) {
? ? ? ? ? ? ? ? log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}", clientHost, key, queueId, offset, storeOffset);
? ? ? ? ? ? }
? ? ? ? }
? ? }
GroupCommitService、FlushRealTimeService、CommitRealTimeService均是ServiceThread的子类(ServiceThread线程同步待分析)。FlushRealTimeService和CommitRealTimeService顾名思义前者是实时Flush到磁盘,直接采用FileChannel的force()来flush;后者则是Commit实时,采用FileChannel的write()先写到内存字节缓冲区,然后唤醒flush线程。
同步刷盘是在每条消息都确认落盘了之后才向发送者返回响应;而异步刷盘中,只要消息保存到Broker的内存就向发送者返回响应,Broker会有专门的线程对内存中的消息进行批量存储。所以异步刷盘的策略下,当机器突然掉电时,Broker内存中的消息因无法刷到磁盘导致丢失。
FlushRealTimeService两种flush定时:固定式flush和唤醒式flush,采用固定时间或堵塞等待上一个flush线程完成flush并唤醒。
while (!this.isStopped()) {
? ? ...
? ? if (flushCommitLogTimed) {
? ? ? ? Thread.sleep(interval);
? ? } else {
? ? ? ? this.waitForRunning(interval);
? ? }
? ? ...
? ? CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages);
}
//com.alibaba.rocketmq.store.MappedFile:int flush(final int flushLeastPages)
try {
? ? //We only append data to fileChannel or mappedByteBuffer, never both.
? ? if (writeBuffer != null || this.fileChannel.position() != 0) {
? ? ? ? this.fileChannel.force(false);
? ? } else {
? ? ? ? this.mappedByteBuffer.force();
? ? }
} catch (Throwable e) {
? ? log.error("Error occurred when force data to disk.", e);
}
采用FileChannel.force()来落盘,路径为wrotePosion->flushedPositionwrotePosition。
采用MappedByteBuffer.force()来落盘,路径为wrotePosion->committedPosion->flushedPositionwrotePosition。
其中FileChannel为NIO中流式读写IO,MappedByteBuffer将文件映射成虚拟内存然后对内存直接操作。对10M文件进行读写对比(数据本机跑,参考:FileChannel和 MappedByteBuffer性能对比):
读性能上,FileChannel要优于MappedByteBuffer。写性能上,FileChannel的34ms为直接落盘耗时,MappedByteBuffer还需要等待系统同步。MappedByteBuffer还需要额外的map耗时。总体来说NIO的FileChannel表现更好,这也是MetaQ采用FileChannel原因。
利用Linux文件系统内存cache来提高性能
本地磁盘文件->socket发送,4步骤数据流向:
hard driver -> kernel space ----?[DMA copy]
kernel space -> user space ----?[CPU copy]
user space -> kernel space ----?[CPU copy]
kernel space -> protocol engine ----?[DMA copy]?
左右图的上部分为user和kernel的上下文切换。下部分为数据流向图,其中左图为正常流程中Linux从文件系统读取文件然后通过socket发送的流程,其中共经历了4次内存拷贝;右图为改进版本,通过mmap内存映射将文件映射进内存,绕开了user space和kernel space的二次拷贝,实现zero copy(针对user space)。
//正常读写方式
read(file, tmp_buf, len);
write(socket, tmp_buf, len);?
//mmap读写方式
tmp_buf = mmap(file, len);
write(socket, tmp_buf, len);
采用MesaageId查询消息不需要采用索引服务,从MesaageId可以解析出Broker地址和Commit Log Offset然后拉取消息。
metaq的IndexFile主要参考HashMap的实现,采用拉链法解决哈希冲突,每个slot倒序指向最新的索引即目前索引数量。
Header:记录落Broker时间戳、偏移量,槽位数目和索引个数
SlotTable:数组插槽,插槽位置 = key的Hash值 % 插槽数量,每个槽位记录当前索引总数
Index Linked List:插槽后接的链表结构,记录key的Hash值、物理偏移地址、落盘时间和哈希冲突后上一个索引地址
通过MessageKey检索消息:通过key定位slot,加锁从最大索引值开始倒序查找,比对hash值和落盘时间,返回一致时的物理偏移地址。
定义了三种发送方式:
SYNC:同步发送,发送方线程发送后同步堵塞等待SendResult,若failed则重试下一个broker。
ASYNC:异步发送,超时可抛出Timeout异常。
ONEWAY:单向发送,发送方不等待broker响应也没有回调函数触发,速度快但可靠性弱。
1、查询本地缓存是否存储了TopicPublishInfo,否则从NameServer获取。
2、根据选择策略获取待发送队列。
3、获取消息队列对应的broker实际IP。
4、设置消息Unique ID,zip压缩消息。
5、检查信息合法性,调用NettyClient发送消息
TopicPublishInfo包含队列优先级、消息队列列表、路由信息以及一个线程安全的index坐标。
发送线程以线程独立的方式自增,遍历MessageQueue选择一条待发送的MessageQueue。
volatile作用。
若启动容错策略(默认false)
1、通过ThreadLocalIndex遍历选出一条非faultItme的messageQueue。
2、若无非faultItme的broker,将faultItme按照可用<不可用,失败时长短<失败时长长,恢复时间点早<恢复时间点晚来排序,pickOneAtLeast遍历排序链表的前半部分选出一个broker。即至少不是最差策略 —— bad but not worst。
3、依次轮流写入broker中对应的messageQueue中,发端的负载均衡体现在这里。
异常情况:每次Producer发送失败时,维护并更新一个broker的faultItem的Map,则逻辑认为N秒内不可用,用于容错策略下的比较。
非异常情况下:看源码发现也会归为faultItem节点,延时设置为上一次从发出请求到响应的时长。
容错策略下 pickOneAtLeast逻辑:
? ? public String pickOneAtLeast() {
? ? ? ? final Enumeration elements = this.faultItemTable.elements();
? ? ? ? List tmpList = new LinkedList();
? ? ? ? while (elements.hasMoreElements()) {
? ? ? ? ? ? final FaultItem faultItem = elements.nextElement();
? ? ? ? ? ? tmpList.add(faultItem);
? ? ? ? }
? ? ? ? if (!tmpList.isEmpty()) {
? ? ? ? ? ? Collections.shuffle(tmpList);
? ? ? ? ? ? Collections.sort(tmpList);
? ? ? ? ? ? final int half = tmpList.size() / 2;
? ? ? ? ? ? if (half <= 0) {
? ? ? ? ? ? ? ? return tmpList.get(0).getName();
? ? ? ? ? ? } else {
? ? ? ? ? ? ? ? final int i = this.whichItemWorst.getAndIncrement() % half;
? ? ? ? ? ? ? ? return tmpList.get(i).getName();
? ? ? ? ? ? }
? ? ? ? }
? ? ? ? return null;
? ? }
? ? // For Class FaultItem
? ? public int compareTo(final LatencyFaultToleranceImpl.FaultItem other) {
? ? ? ? if (this.isAvailable() != other.isAvailable()) {
? ? ? ? ? ? if (this.isAvailable())
? ? ? ? ? ? ? ? return -1;
? ? ? ? ? ? if (other.isAvailable())
? ? ? ? ? ? ? ? return 1;
? ? ? ? }
? ? ? ? if (this.currentLatency < other.currentLatency)
? ? ? ? ? ? return -1;
? ? ? ? else if (this.currentLatency > other.currentLatency) {
? ? ? ? ? ? return 1;
? ? ? ? }
? ? ? ? if (this.startTimestamp < other.startTimestamp)
? ? ? ? ? ? return -1;
? ? ? ? else if (this.startTimestamp > other.startTimestamp) {
? ? ? ? ? ? return 1;
? ? ? ? }
? ? ? ? return 0;
? ? }
PushConsumer逻辑(PullConsumer不适用)
黄色箭头代表PullRequest的流向,蓝色箭头代表ConsumeRequest的流向,灰色箭头根据offset拉取消息。
RebalanceService确定consumer拉取的queue,为需要拉取的queue生成一个PullRequest,放入PullRequestQueue中,拉取消息的位置从nextOffset从Broker远程拉取。
PullMessageService不断从PullRequestQueue中消费PullRequest,根据nextOffset去broker拉取消息,若queue已经dropped则更新offset到broker并丢弃此拉消息请求。
PullMessageService异步拉取消息,同时将PullRequest封装在PullCallback中,PullCallback封装在ResponseFuture中,并以自增的请求id为键,ResponseFuture为值放入ResponseTable中。
Broker收到请求,如果offset之后有新的消息会立即发送异步响应;否则等待直到producer有新的消息发送后返回或者超时。如果通信异常或者Broker超时未返回响应,nettyClient会定时清理超时的请求,释放PullRequest回到PullRequestQueue。
用最新的offset更新ResponseFuture里的PullRequest并推送给PullRequestQueue里以进行下一次拉取。批量拉取到的消息分批提交给consumeExecutor线程处理。
第一种方式,是每次进行rebalance之后生成pullResult并调用ConsumeMessageService.submitConsumeRequest,该方式是拉取消息的起点
第二种方式,每次获取PullResult的状态,状态为FOUND则调用ConsumeMessageService.submitConsumeRequest将请求扔给ConsumeExecutor去启用线程消费,然后更新offset重新将pullRequest入队;其他状态如NO_NEW_MSG NO_MATHCHED_MSG则更新offset直接将pullRequest入队。
rebalance的触发情况:
默认waitInterval = 20000ms
启动MQConsumerInner调用rebalanceImmediately()
broker通知consumer,group改变调用rebalanceImmediately()
Consumer采用策略模式AllocateMessageQueueStrategy来定义不同的队列分配机制:
AllocateMessageQueueAveragely:平均分配队列策略,如下图:
A. queue数量小于消费者数量
B. 消费者数量大于queue数量,且queue数量 % 消费者数量 > 0?
B. 消费者数量大于或等于queue数量,且queue数量 % 消费者数量 = 0?
AllocateMessageQueueAveragelyByCircle:环状分配消息队列策略
AllocateMessageQueueByConfig:根据配置分配消息队列策略
AllocateMessageQueueByMachineRoom:
AllocateMessageQueueConsistentHash:一致性哈希算法分配策略
PullMessageService拉取线程不停的读取PullRequestQueue根据request拉取消息,然后将消息丢到ProcessQueue中并新建ConsumeRequest提交到ConsumeService处理, 然后生成下一批的PullRequest丢到PullRequestQueue,形成无限循环。ProcessQueue中以TreeMap形式保存待处理的消息,key为消息对应的offset,并自动进行排序:
private final TreeMap msgTreeMap = new TreeMap();
/**
* Concurrently max span offset.it has no effect on sequential consumption
*/
private int consumeConcurrentlyMaxSpan = 2000;
//metaq拉取流控
if (!this.consumeOrderly) {
? ? if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
? ? ? ? this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
? ? ? ? if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
? ? ? ? ? ? log.warn("the queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, pullRequest={}, flowControlTimes={}",processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), processQueue.getMaxSpan(),pullRequest, queueMaxSpanFlowControlTimes);
? ? ? ? }
? ? ? ? return;
? ? }
}
消息快速拉?。?/b>
metaq中拉取线程不需要等待消费线程的处理,一批消息拉取后未消费完可直接拉取第二批消息(消费成功或消费失败回发成功均会offset前移),拉取前判断msgTreeMap中最大offset值-最小offset值是否超过流控阈值,超过后延时50ms重新拉取。msgTreeMap每次会清除掉消费成功和消费失败回发成功的消息,剩下保存的是消费失败且未回发成功的消息,回发不成功会本地重试且远端offset不会前移(ack卡顿)。
分离拉取offset(PullRequest#nextOffset)和消费offset(OffsetStore#offset)分离拉取和处理进度,提升拉取效率,并根据消费者处理卡主情况做拉取阀值控制。拉取和处理分离,保证不丢数据,提升效率同时代价是重复消息。
offset分类:
switch (this.defaultMQPushConsumer.getMessageModel()) {
? ? case BROADCASTING: this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;
? ? case CLUSTERING: this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;
? ? default: break;
}
广播模式下采用offset本地保存,集群消费模式下采用offset远端保存(即broker端)。对于广播模式,每个queue对应所有的consumer,而consumer消费之间互相独立,故offset保存在consumer本地即可。对于集群模式,consumer之间消费需要broker进行协调,故offset保存在broker端。
offset、queue、consumer关系:
private ConcurrentMap offsetTable =
? ? ? ? new ConcurrentHashMap();
在集群模式下,多个消费者会负载到不同的消费队列上,因为消息消费进度是基于消息队列进行保存的,也就是不同的消费者之间的消费进度保存是不会存在并发的。
offset持久化触发:
定时offset持久化
拉取消息后触发offset持久化
分配消息队列触发offset持久化
更新最小offset:
consumerA拉取了queueA的offset:1-10批量消息,成功消费的消息会被剔除掉,剩2和8未成功消费且回发broker失败则update offset会设置为处理队列中最小的偏移量,来保证消息肯定能被消费成功。但由于外部原因(consumer宕机、新consumer加入等)会触发rebalance,导致queueA对应consumerB,2-10会被重复消费。这也是前面提到的:拉取和处理分离,保证不丢数据,提升效率同时代价是重复消息。
public void processConsumeResult(
? ? final ConsumeConcurrentlyStatus status,
? ? final ConsumeConcurrentlyContext context,
? ? final ConsumeRequest consumeRequest) {
? ? int ackIndex = context.getAckIndex();
? ? if (consumeRequest.getMsgs().isEmpty())
? ? ? ? return;
? ? switch (status) {
? ? ? ? case CONSUME_SUCCESS:
? ? ? ? ? ? if (ackIndex >= consumeRequest.getMsgs().size()) {
? ? ? ? ? ? ? ? ackIndex = consumeRequest.getMsgs().size() - 1;
? ? ? ? ? ? }
? ? ? ? ? ? int ok = ackIndex + 1;
? ? ? ? ? ? int failed = consumeRequest.getMsgs().size() - ok;
? ? ? ? ? ? this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
? ? ? ? ? ? this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
? ? ? ? ? ? break;
? ? ? ? case RECONSUME_LATER:
? ? ? ? ? ? ackIndex = -1;
? ? ? ? ? ? this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),
? ? ? ? ? ? ? ? consumeRequest.getMsgs().size());
? ? ? ? ? ? break;
? ? ? ? default:
? ? ? ? ? ? break;
? ? }
? ? switch (this.defaultMQPushConsumer.getMessageModel()) {
? ? ? ? case BROADCASTING:
? ? ? ? ? ? for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
? ? ? ? ? ? ? ? MessageExt msg = consumeRequest.getMsgs().get(i);
? ? ? ? ? ? ? ? log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
? ? ? ? ? ? }
? ? ? ? ? ? break;
? ? ? ? case CLUSTERING:
? ? ? ? ? ? List msgBackFailed = new ArrayList(consumeRequest.getMsgs().size());
? ? ? ? ? ? for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
? ? ? ? ? ? ? ? MessageExt msg = consumeRequest.getMsgs().get(i);
? ? ? ? ? ? ? ? boolean result = this.sendMessageBack(msg, context);
? ? ? ? ? ? ? ? if (!result) {
? ? ? ? ? ? ? ? ? ? msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
? ? ? ? ? ? ? ? ? ? msgBackFailed.add(msg);
? ? ? ? ? ? ? ? }
? ? ? ? ? ? }
? ? ? ? ? ? if (!msgBackFailed.isEmpty()) {
? ? ? ? ? ? ? ? consumeRequest.getMsgs().removeAll(msgBackFailed);
? ? ? ? ? ? ? ? this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
? ? ? ? ? ? }
? ? ? ? ? ? break;
? ? ? ? default:
? ? ? ? ? ? break;
? ? }
? ? long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
? ? if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
? ? ? ? this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
? ? }
}
广播模式:无论是否消费失败,不发回消息到Broker,只打印Log。
集群模式下:回发不成功会本地重试且远端offset不会前移,消费失败但发回Broker成功远端offset会前移。如果发回Broker 成功,结果因为例如网络异常,导致Consumer以为发回失败,判定消费发回失败会导致消息重复消费。
消费失败回发broker失败:
调用submitConsumeRequestLater延迟重新消费。
消费失败回发broker成功:
broker端修改topic和queueId,将数据写到SCHEDULE_TOPIC对应队列中,最终通过ScheduleMessageService的定时任务来进行处理。定时任务读取这些数据,修改topic为RETRY,交给commitLog存储,ReputService将消息写入RETRY队列中,默认集群模式的consumer会订阅Retry队列,然后消费掉这些消息。而如果一直这样重复消费都持续失败到一定次数(默认16次),就会投递到DLQ死信队列。应用可以监控死信队列来做人工干预。
rebalance导致重复消费:
假设新增消费者前,ConsumerA正在消费MessageQueue-M,消费到第3个offset,这个时候新增了ConsumerB,那么根据集群模式的AllocateMessageQueue的策略,可能MessageQueue-M被分配给了ConsumerB,这个时候ConsumerA由于消费的offset没有实时更新回去,会导致ConsumerB和ConsumerA之前的消费有重叠?;蛘呦咽О芑卮О艿氖焙騬ebalance也会导致重复消费。
发送时消息重复:
MQ Producer 发送消息场景下,消息已成功发送到服务端并完成持久化,此时网络闪断或者客户端宕机导致服务端应答给客户端失败。如果此时 MQ Producer 意识到消息发送失败并尝试再次发送消息,MQ 消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
投递时消息重复:
MQ Consumer 消费消息场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,MQ 服务端将在网络恢复后再次尝试投递之前已被处理过的消息,MQ 消费者后续会收到两条内容相同并且 Message ID 也相同的消息。
全局顺序
对于指定的一个 Topic,所有消息按照严格的先入先出(FIFO)的顺序进行发布和消费。?
分区顺序
对于指定的一个 Topic,所有消息根据 sharding key 进行区块分区。同一个分区内的消息按照严格的 FIFO 顺序进行发布和消费。Sharding key 是顺序消息中用来区分不同分区的关键字段,和普通消息的 Key 是完全不同的概念。?
性能对比:
Topic类型支持事务消息支持定时消息性能
无序消息是是最高
分区消息否否高
全局消息否否一般
发送方向 MQ 服务端发送消息;
MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
发送方开始执行本地事务逻辑。
发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息。
在断网或者是应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达 MQ Server,经过固定时间后 MQ Server 将对该消息发起消息回查。
发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
发送方根据检查得到的本地事务的最终状态再次提交二次确认,MQ Server 仍按照步骤4对半消息进行操作。
事务消息发送对应步骤1、2、3、4,事务消息回查对应步骤5、6、7。
略......
broker有三种角色,ASYNC_MASTER、SYNC_MASTER和SLAVE,几种搭配方式:
ASYNC_MASTER、SLAVE:容许丢消息,但是要broker一直可用,master异步传输CommitLog到slave
SYNC_MASTER、SLAVE:不允许丢消息,master同步传输CommitLog到slave
ASYNC_MASTER:如果只是想简单部署则使用这种方式
在broker集群中每个master相互之间是独立,master之间不会有交互,每个master维护自己的CommitLog、自己的ConsumeQueue,但是每一个master都有可能收到同一个topic下的producer发来的消息