Rocketmq源码分析01:搭建源码调试环境

Rocketmq源码分析01:搭建源码调试环境

转载地址:https://mp.weixin.qq.com/s/waDzMr4rOaC_NlxSqF0YCg

1. 基本架构

RocketMQ架构上主要分为四部分,如下图所示:

图片
  • Producer:消息发布的角色,支持分布式集群方式部署。Producer通过MQ的负载均衡??檠≡裣嘤Φ?code>Broker集群队列进行消息投递,投递的过程支持快速失败并且低延迟。

  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费,它提供实时消息订阅机制,可以满足大多数用户的需求。

  • NameServerNameServer是一个非常简单的Topic路由注册中心,其角色类似Dubbo中的zookeeper,支持Broker的动态注册与发现。主要包括两个功能:

    NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,Broker仍然可以向其它NameServer同步其路由信息,Producer,Consumer仍然可以动态感知Broker的路由的信息。

    • Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
    • 路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后ProducerConumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。
  • BrokerServerBroker主要负责消息的存储、投递和查询以及服务高可用保证,为了实现这些功能,Broker包含了以下几个重要子??椋?/p>

    • Client Manager:负责管理客户端(Producer/Consumer)和维护ConsumerTopic订阅信息
    • Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
    • HA Service:高可用服务,提供Master BrokerSlave Broker之间的数据同步功能。
    • Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。
    • Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
图片

2. 获取源码

rocketMq项目的github仓库为https://github.com/apache/rocketmq.git,由于网络原因,我们并不会直接使用github仓库,而是将其导入到gitee上,只需在gitee创建新仓库时,选择导入已有仓库即可:

图片

导入到gitee后,就可以进行checkout了,本文对应的gitee仓库为https://gitee.com/funcy/rocketmq.git

checkout源码到本地后,默认是master分支,本人习惯基于tag创建自己的分支,然后在自己的分支上进行分析,rocketMqtag如下:

图片

最新版本是4.8.0,我们将基于此tag创建新分支,使用的命令如下:

# 切换到 rocketmq-all-4.8.0
git checkout rocketmq-all-4.8.0
# 基于 rocketmq-all-4.8.0 创建自己的分析,名称为 rocketmq-all-4.8.0-LEARN
git checkout -b rocketmq-all-4.8.0-LEARN
# 将 rocketmq-all-4.8.0-LEARN 分支推送到远程仓库
git push -u origin rocketmq-all-4.8.0-LEARN

接下来,我们所有的操作都是在rocketmq-all-4.8.0-LEARN分支上进行了。

3. 本地启动

拿到代码后,我们就开始进行本地启动了,没错,就是在idea中进行启动。

3.1 复制conf目录

在启动项目前,我们需要进行一些配置,rocketMq项目的配置文件位于rocketmq/distribution??橄碌?code>conf目录中,直接整个复制到rocketmq目录下:

图片

也不需要改动,复制出来就行了,这些配置的内容后面分析源码时再讲解吧。

3.2 启动nameServer

nameServer的主类为org.apache.rocketmq.namesrv.NamesrvStartup

图片

如果我们直接运行main()方法,会报错:

图片

报错信息已经很明确了,需要我们配置ROCKETMQ_HOME目录,我们在idea中进行配置即可:

打开配置界面:

图片

填写ROCKETMQ_HOME配置:

图片

这里我填写的是ROCKETMQ_HOME=/Users/chengyan/IdeaProjects/myproject/rocketmq,这个ROCKETMQ_HOME路径就是conf文件夹所在的目录。

填写好后,就可以启动了:

图片

3.3 启动broker

broker的主类为org.apache.rocketmq.broker.BrokerStartup,启动方式与nameServer很相似,启动前也要配置ROCKETMQ_HOME路径:

图片

Idea 老版本配置


image.png

相比于nameServer,这里多配置了启动参数:

-n localhost:9876 autoCreateTopicEnable=true

这个启动参数是指定nameServer的地址,以及开启自动创建topic的功能。

配置完成之后就可以启动了:

图片

3.4 启动管理后台

rocketMq的管理后台在另一个仓库https://github.com/apache/rocketmq-externals,除了后台,这个仓库还包含了许多的其他模块:

图片

我们并不需要分析这个项目,源码本可以不必下载,但我在找这个项目的release版本时,发现并没有提供已编译好的jar包,需要自己构建代码,因此我就再次下载了这个代码源码。当然,由于网络的原因,这个项目的源码也被我导入到了gitee上,地址为https://gitee.com/funcy/rocketmq-externals.git.

这个项目的代码我们并不分析,因此直接在master分支上操作即可,

管理后台项目为rocketmq-console,主类为org.apache.rocketmq.console.App

图片

在启动前,我们需要修改下application.properties的配置,找到rocketmq.config.namesrvAddr配置,添加nameServer的ip与端口,这里我们连接的是本地应用,直接填写localhost:9876

...
rocketmq.config.namesrvAddr=localhost:9876
...

启动,结果如下:

图片

访问http://localhost:8080,结果如下:

图片

可以看到broker已经出现在cluster列表中了,这就表明启动成功了。

4. 收发消息测试

rocketMq项目的example??橄掠写罅康牟馐允纠?,我们选择其一进行消息收发测试。

4.1 启动Consumer

我们先找到org.apache.rocketmq.example.simple.PushConsumer,代码如下:

public class PushConsumer {

    public static void main(String[] args) 
            throws InterruptedException, MQClientException {
        String nameServer = "localhost:9876";
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        consumer.setNamesrvAddr(nameServer);
        consumer.subscribe("TopicTest", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //wrong time format 2017_0422_221800
        consumer.setConsumeTimestamp("20181109221800");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, 
                    ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", 
                    Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

这个Consumer监听的topicTopicTest,后面我们就会往这个topic发送消息。另外,需要注意nameServer的配置,我们是在本地启动的nameServer,因此这里配置的是localhost:9876。

运行main()方法,结果如下:

图片

4.2 启动Producer

我们找到 org.apache.rocketmq.example.simple.Producer 类,代码如下:

public class Producer {

    public static void main(String[] args) 
            throws MQClientException, InterruptedException {
        String nameServer = "localhost:9876";
        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.setNamesrvAddr(nameServer);
        producer.start();

        for (int i = 0; i < 10; i++)
            try {
                {
                    Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

        producer.shutdown();
    }
}

同样地,这里使用的是的nameServer地址是localhost:9876,topicTopicTest,运行,结果如下:

图片

再回过头看看PushConsumer的控制台:

图片

可以看到,Producer发送消息成功了,PushConsumer也成功获取到消息了。

4.3 异常分析

如图所示:

图片

如果出现异常:

org.apache.rocketmq.client.exception.MQClientException: 
No route info of this topic: TopicTest

这表明当前broker中没有TopicTesttopic,这时我们可以手动创建topic,也可以在启动时指定autoCreateTopicEnable=true.

如果是按上面步骤进行的,请确认下org.apache.rocketmq.broker.BrokerStartup是否配置启动参数

-n localhost:9876 autoCreateTopicEnable=true

配置方式就按3.3节的方式配置就行了。

5. 总结

本文主要介绍了rocketMq的基本架构,通过源码展示了rocketMq的启动方式,最后通过rocketMq项目下example??橹械牟馐源胝故玖讼⒌氖辗⒐?。

总的来说,本文还是在准备源码分析的环境,下篇文章开始,我们就正式开始rocketMq的源码分析了。

最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,992评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,212评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,535评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,197评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,310评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,383评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,409评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,191评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,621评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,910评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,084评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,763评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,403评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,083评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,318评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,946评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,967评论 2 351