RocketMQ的消费模式有2种
查看一下源码,在默认情况下,就是集群消费(CLUSTERING)。另一种消费模式,是广播消费(BROADCASTING)。
其实,对于RocketMQ而言,通过ConsumeGroup的机制,实现了天然的消息负载均衡!通俗点来说,RocketMQ中的消息通过ConsumeGroup实现了将消息分发到C1/C2/C3/……的机制,这意味着我们将非常方便的通过加机器来实现水平扩展!
至于消息分发到C1/C2/C3,其实也是可以设置策略的:
默认的分配算法是AllocateMessageQueueAveragely
还有另外一种平均的算法是AllocateMessageQueueAveragelyByCircle,也是平均分摊每一条queue,只是以环状轮流分queue的形式,如下图:
广播消费,类似于ActiveMQ中的发布订阅模式,消息会发给Consume Group中的每一个消费者进行消费。??由于广播模式下要求一条消息需要投递到一个消费组下面所有的消费者实例,所以也就没有消息被分摊消费的说法。
/**
* Consumer,订阅消息
*/
public classConsumer2{??
? public static void main(String[] args) throws InterruptedException, MQClientException {
? ? ? ? DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
? ? ? ? consumer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876");
? ? ? ? consumer.setConsumeMessageBatchMaxSize(10);
? ? ? ? // 设置为广播消费模式? ? ? ??
? ? ? ? consumer.setMessageModel(MessageModel.BROADCASTING);
? ? ? ? consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
? ? ? ? consumer.subscribe("TopicTest", "*");
? ? ? ? consumer.registerMessageListener(new MessageListenerConcurrently() {
? ? ? ? ? ? public ConsumeConcurrentlyStatus consumeMessage(List msgs, ConsumeConcurrentlyContext context) {
? ? ? ? ? ? ? ? try {
? ? ? ? ? ? ? ? ? ? for (MessageExt msg : msgs) {
? ? ? ? ? ? ? ? ? ? ? ? System.out.println(" Receive New Messages: " + msg);
? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? } catch (Exception e) {
? ? ? ? ? ? ? ? ? ? e.printStackTrace();
? ? ? ? ? ? ? ? ? ? return ConsumeConcurrentlyStatus.RECONSUME_LATER;? // 重试? ? ? ? ??
? ? ? ? ? ? ? ? ?}
? ? ? ? ? ? ? ? return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;? ? ? // 成功? ??
? ? ? ? ? ? ? ? }
? ? ? ? });
? ? ? ? consumer.start();
? ? ? ? System.out.println("Consumer Started.");
? ? }
}