RabbitMQ学习笔记

本文作者:陈进坚
个人博客:https://jian1098.github.io
CSDN博客:https://blog.csdn.net/c_jian
简书:http://08643.cn/u/8ba9ac5706b6
联系方式:jian1098@qq.com

安装


windows

下载安装ErLang

https://www.erlang.org/downloads下载安装程序,一直按提示安装即可

配置环境变量

1.在用变量新建变量名为ERLANG_HOME,变量值为C:\Program Files\erl-23.1的变量,变量值根据自己的安装路径填

2.在PATH中添加%ERLANG_HOME%\bin,保存

3.在cmd命令行中输入erl并回车,看到版本号说明erlang安装成功

$ erl
Eshell V11.1  (abort with ^G)
下载安装RabbitMQ

https://www.rabbitmq.com/download.html下载安装程序,一直按提示安装即可

安装RabbitMQ-Plugins

进入RabbitMQ安装目录/sbin目录下,打开cmd命令行输入下面命令并回车

rabbitmq-plugins enable rabbitmq_management

然后再输入下面点命令并回车,会看到一堆信息

rabbitmqctl status
启动RabbitMQ

进入RabbitMQ安装目录/sbin目录下,双击rabbitmq-server.bat启动,然后在浏览器打开http://localhost:15672即可打开web管理界面,默认用户名和密码都是guest。在开始菜单也有RabbitMQ Service - start启动和RabbitMQ Service - stop结束快捷方式,RabbitMQ默认运行在本机的 5672端口。

Linux

消息队列


创建队列

先创建连接,然后设置队列属性创建队列,golang代码如下:

    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    failOnError(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    failOnError(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "hello", // name
        false,   // durable ,RabbitMQ重启后数据会消失,要持久化必须开启这个参数
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    failOnError(err, "Failed to declare a queue")

生产消息

定义字符数组,然后调用Publish()方法发布消息,需要设置消息的具体参数,其中Body为消息内容,golang代码如下:

        body := "Hello World!"
        err = ch.Publish(
            "",     // exchange 交换机名称
            q.Name, // routing key
            false,  // mandatory
            false,  // immediate
            amqp.Publishing {
                ContentType: "text/plain",
                Body:        []byte(body),
            })
        failOnError(err, "Failed to publish a message")

消费消息

队列中的消息由消费者进行消费,同样需要设置消费参数然后调用Consume()方法,golang代码如下:

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack 自动确认消息,如果改为false,在下面逻辑业务结束后需要调用d.Ack(false)
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    failOnError2(err, "Failed to register a consumer")

消息的遍历

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            //d.Ack(false)  //手动消息确认时需要调用
        }
    }()

消息确认


当处理一个比较耗时得任务的时候,你也许想知道消费者(consumers)是否运行到一半就挂掉。当前的代码中,当消息被RabbitMQ发送给消费者(consumers)之后,马上就会在内存中移除。这种情况,你只要把一个工作者(worker)停止,正在处理的消息就会丢失。同时,所有发送到这个工作者的还没有处理的消息都会丢失。我们不想丢失任何任务消息。如果一个工作者(worker)挂掉了,我们希望任务会重新发送给其他的工作者(worker)。

为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会释放并删除这条消息。

如果消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被完全处理,然后重新发送给其他消费者(consumer)。这样,即使工作者(workers)偶尔的挂掉,也不会丢失消息。

在golang代码中实现

msgs, err := ch.Consume(
  q.Name, // queue
  "",     // consumer
  false,  // auto-ack   修改这个值
  false,  // exclusive
  false,  // no-local
  false,  // no-wait
  nil,    // args
)
failOnError(err, "Failed to register a consumer")

forever := make(chan bool)

go func() {
  for d := range msgs {
    log.Printf("Received a message: %s", d.Body)
    dot_count := bytes.Count(d.Body, []byte("."))
    t := time.Duration(dot_count)
    time.Sleep(t * time.Second)
    log.Printf("Done")
    d.Ack(false)    //手动确认
  }
}()

这里代码里将auto-ack设置为false,并且在业务结束后调用d.Ack(false)手动确认即可,工作者(worker)挂掉之后,重启RabbitMQ所有没有响应的消息都会重新发送。如果忘记确认RabbitMQ将会占用越来越多的内存,因为它无法释放任何未经消息的消息 为了排除这种错误,你可以使用rabbitmqctl命令,输出messages_unacknowledged字段:

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

Windows上执行:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

数据持久化


如果你没有特意告诉RabbitMQ,那么在它退出或者崩溃的时候,将会丢失所有队列和消息。为了确保信息不会丢失,有两个事情是需要注意的:我们必须把“队列”和“消息”设为持久化。

队列持久化

在golang代码中实现

q, err := ch.QueueDeclare(
  "hello",      // name
  true,         // durable  持久化设置
  false,        // delete when unused
  false,        // exclusive
  false,        // no-wait
  nil,          // arguments
)
failOnError(err, "Failed to declare a queue")

这里设置durable参数为true,此时,已经确保即使RabbitMQ重新启动,task_queue队列也不会丢失。如果已经存在一个名为"hello"的非持久化队列,重新改为持久化的话会报错,必须修改name或者清空队列。

消息持久化

在golang代码中实现

err = ch.Publish(
  "",           // exchange
  q.Name,       // routing key
  false,        // mandatory
  false,
  amqp.Publishing {
    DeliveryMode: amqp.Persistent,
    ContentType:  "text/plain",
    Body:         []byte(body),
})

通过设置amqp.Publishingamqp.Persistent属性即可。

注意:将消息设为持久化并不能完全保证不会丢失。以上代码只是告诉了RabbitMq要把消息存到硬盘,但从RabbitMq收到消息到保存之间还是有一个很小的间隔时间。因为RabbitMq并不是所有的消息都使用fsync(2)——它有可能只是保存到缓存中,并不一定会写到硬盘中。并不能保证真正的持久化,但已经足够应付我们的简单工作队列。如果您需要更强的保证,那么您可以使用使用后面的发布订阅功能。

公平调度


如果有多个消费者,并且有的消费者处理消息比较繁忙,有的处理消息比较轻松,可以设置计数器,让RabbitMQ一次只向一个worker发送一条消息?;痪浠八?,在处理并确认前一个消息之前,不要向正在工作人员发送新消息。

在golang代码中实现

err = ch.Qos(
  1,     // prefetch count
  0,     // prefetch size
  false, // global
)
failOnError(err, "Failed to set QoS")

这里置预取计数值为1即可。

订阅发布


创建交换机

发布者(producer)不会直接发送任何消息给队列,只需要把消息发送给一个交换机(exchange)。交换机非常简单,它一边从发布者方接收消息,一边把消息推送到队列。交换机必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过交换机类型(exchange type)来定义的。有几个可供选择的交换机类型:direct, topic, headersfanout。

显示交换机列表

sudo rabbitmqctl list_exchanges

在golang代码中创建交换机

err = ch.ExchangeDeclare(
  "logs",   // name
  "fanout", // type
  true,     // durable
  false,    // auto-deleted
  false,    // internal
  false,    // no-wait
  nil,      // arguments
)

name为交换机名称,type为交换机类型。前面的教程中我们对交换机一无所知,但仍然能够发送消息到队列中。因为我们使用了命名为空字符串("")的匿名交换机。如果要发送消息到指定队列中,在调用Publish()生产消息时指定exchange字段即可,不过该指定的交换机必须是创建好的。

绑定交换机

我们已经创建了一个fanout交换机和一个队列。现在我们需要告诉交换机如何发送消息给我们的队列。交换机和队列之间的联系我们称之为绑定(binding)。golang代码实现:

err = ch.QueueBind(
  q.Name, // queue name 队列名
  "",     // routing key
  "logs", // exchange   交换机名
  false,
  nil,
)

绑定好交易机之后生产者将消息存放在指定队列,在调用Consume()方法消费时只要指定queue队列名就能获取指定队列消息。

路由


在上面绑定交换机的设置中可以看到有一个参数routing key是空的,这个参数就是路由

绑定路由

绑定(binding)是指交换机(exchange)和队列(queue)的关系??梢约虻ダ斫馕赫飧龆恿校╭ueue)对这个交换机(exchange)的消息感兴趣。绑定的时候可以带上一个额外的routing_key参数。为了避免与Channel.Publish的参数混淆,我们把它叫做绑定键binding key。绑定键的意义取决于交换机(exchange)的类型。我们之前使用过fanout 交换机会忽略这个值,使用direct 交换机来代替这个值会生效。

每个队列可以有多个绑定键,每个交换机可以设置相同的或多个绑定键到各个队列中。原理如下图

image

其中P为生产者,X为交换机,amqp为队列,C为消费者

订阅路由

绑定好路由之后就可以订阅指定路由的队列,在消费队列的队列绑定QueueBind()方法中指定routing key的名称即可,golang代码实现:

for _, s := range os.Args[1:] {
                log.Printf("Binding queue %s to exchange %s with routing key %s",
                        q.Name, "logs_direct", s)
                err = ch.QueueBind(
                        q.Name,        // queue name
                        s,             // routing key   指定路由名称
                        "logs_direct", // exchange
                        false,
                        nil)
                failOnError(err, "Failed to bind a queue")
        }

        msgs, err := ch.Consume(
                q.Name, // queue
                "",     // consumer
                true,   // auto ack
                false,  // exclusive
                false,  // no local
                false,  // no wait
                nil,    // args
        )
        failOnError(err, "Failed to register a consumer")

主题交换机

发送到topic交换机的消息不可以携带随意routing_key,它的routing_key必须是一个由.分隔开的词语列表。这些单词随便是什么都可以,但是最好是跟携带它们的消息有关系的词汇。

topic交换机背后的逻辑跟direct交换机很相似 —— 一个携带着特定routing_key的消息会被topic交换机投递给绑定键与之想匹配的队列。但是它的binding key和routing_key有两个特殊应用方式:

  • * (星号) 用来表示一个单词.
  • # (井号) 用来表示任意数量(零个或多个)单词。

下边用图说明:


None

这个例子里,我们发送的所有消息都是用来描述小动物的。发送的消息所携带的路由键是由三个单词所组成的,这三个单词被两个.分割开。路由键里的第一个单词描述的是动物的手脚的利索程度,第二个单词是动物的颜色,第三个是动物的种类。所以它看起来是这样的: <celerity>.<colour>.<species>

我们创建了三个绑定:Q1的绑定键为 *.orange.*,Q2的绑定键为 *.*.rabbitlazy.#

这三个绑定键被可以总结为:

  • Q1 对所有的桔黄色动物都感兴趣。
  • Q2 则是对所有的兔子所有懒惰的动物感兴趣。

一个携带有 quick.orange.rabbit 的消息将会被分别投递给这两个队列。携带着 lazy.orange.elephant 的消息同样也会给两个队列都投递过去。另一方面携带有 quick.orange.fox 的消息会投递给第一个队列,携带有 lazy.brown.fox 的消息会投递给第二个队列。携带有 lazy.pink.rabbit 的消息只会被投递给第二个队列一次,即使它同时匹配第二个队列的两个绑定。携带着 quick.brown.fox 的消息不会投递给任何一个队列。

如果我们违反约定,发送了一个携带有一个单词或者四个单词("orange" or "quick.orange.male.rabbit")的消息时,发送的消息不会投递给任何一个队列,而且会丢失掉。

但是另一方面,即使 "lazy.orange.male.rabbit" 有四个单词,他还是会匹配最后一个绑定,并且被投递到第二个队列中。

参考文档

http://rabbitmq.mr-ping.com/tutorials_with_golang

?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,100评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,308评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,718评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,275评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,376评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,454评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,464评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,248评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,686评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,974评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,150评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,817评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,484评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,140评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,374评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,012评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,041评论 2 351

推荐阅读更多精彩内容