本文作者:陈进坚
个人博客:https://jian1098.github.io
CSDN博客:https://blog.csdn.net/c_jian
简书:http://08643.cn/u/8ba9ac5706b6
联系方式:jian1098@qq.com
安装
windows
下载安装ErLang
到https://www.erlang.org/downloads
下载安装程序,一直按提示安装即可
配置环境变量
1.在用变量新建变量名为ERLANG_HOME
,变量值为C:\Program Files\erl-23.1
的变量,变量值根据自己的安装路径填
2.在PATH
中添加%ERLANG_HOME%\bin
,保存
3.在cmd命令行中输入erl
并回车,看到版本号说明erlang
安装成功
$ erl
Eshell V11.1 (abort with ^G)
下载安装RabbitMQ
到https://www.rabbitmq.com/download.html
下载安装程序,一直按提示安装即可
安装RabbitMQ-Plugins
进入RabbitMQ安装目录/sbin
目录下,打开cmd命令行输入下面命令并回车
rabbitmq-plugins enable rabbitmq_management
然后再输入下面点命令并回车,会看到一堆信息
rabbitmqctl status
启动RabbitMQ
进入RabbitMQ安装目录/sbin
目录下,双击rabbitmq-server.bat
启动,然后在浏览器打开http://localhost:15672
即可打开web管理界面,默认用户名和密码都是guest
。在开始菜单也有RabbitMQ Service - start
启动和RabbitMQ Service - stop
结束快捷方式,RabbitMQ默认运行在本机的 5672
端口。
Linux
略
消息队列
创建队列
先创建连接,然后设置队列属性创建队列,golang代码如下:
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
q, err := ch.QueueDeclare(
"hello", // name
false, // durable ,RabbitMQ重启后数据会消失,要持久化必须开启这个参数
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
生产消息
定义字符数组,然后调用Publish()方法发布消息,需要设置消息的具体参数,其中Body
为消息内容,golang代码如下:
body := "Hello World!"
err = ch.Publish(
"", // exchange 交换机名称
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing {
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
消费消息
队列中的消息由消费者进行消费,同样需要设置消费参数然后调用Consume()方法,golang代码如下:
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack 自动确认消息,如果改为false,在下面逻辑业务结束后需要调用d.Ack(false)
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError2(err, "Failed to register a consumer")
消息的遍历
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
//d.Ack(false) //手动消息确认时需要调用
}
}()
消息确认
当处理一个比较耗时得任务的时候,你也许想知道消费者(consumers)是否运行到一半就挂掉。当前的代码中,当消息被RabbitMQ发送给消费者(consumers)之后,马上就会在内存中移除。这种情况,你只要把一个工作者(worker)停止,正在处理的消息就会丢失。同时,所有发送到这个工作者的还没有处理的消息都会丢失。我们不想丢失任何任务消息。如果一个工作者(worker)挂掉了,我们希望任务会重新发送给其他的工作者(worker)。
为了防止消息丢失,RabbitMQ提供了消息响应(acknowledgments)。消费者会通过一个ack(响应),告诉RabbitMQ已经收到并处理了某条消息,然后RabbitMQ就会释放并删除这条消息。
如果消费者(consumer)挂掉了,没有发送响应,RabbitMQ就会认为消息没有被完全处理,然后重新发送给其他消费者(consumer)。这样,即使工作者(workers)偶尔的挂掉,也不会丢失消息。
在golang代码中实现
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack 修改这个值
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false) //手动确认
}
}()
这里代码里将auto-ack
设置为false
,并且在业务结束后调用d.Ack(false)
手动确认即可,工作者(worker)挂掉之后,重启RabbitMQ所有没有响应的消息都会重新发送。如果忘记确认RabbitMQ将会占用越来越多的内存,因为它无法释放任何未经消息的消息 为了排除这种错误,你可以使用rabbitmqctl
命令,输出messages_unacknowledged
字段:
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Windows上执行:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
数据持久化
如果你没有特意告诉RabbitMQ,那么在它退出或者崩溃的时候,将会丢失所有队列和消息。为了确保信息不会丢失,有两个事情是需要注意的:我们必须把“队列”和“消息”设为持久化。
队列持久化
在golang代码中实现
q, err := ch.QueueDeclare(
"hello", // name
true, // durable 持久化设置
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
这里设置durable
参数为true
,此时,已经确保即使RabbitMQ重新启动,task_queue
队列也不会丢失。如果已经存在一个名为"hello"的非持久化队列,重新改为持久化的话会报错,必须修改name
或者清空队列。
消息持久化
在golang代码中实现
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing {
DeliveryMode: amqp.Persistent,
ContentType: "text/plain",
Body: []byte(body),
})
通过设置amqp.Publishing
的amqp.Persistent
属性即可。
注意:将消息设为持久化并不能完全保证不会丢失。以上代码只是告诉了RabbitMq要把消息存到硬盘,但从RabbitMq收到消息到保存之间还是有一个很小的间隔时间。因为RabbitMq并不是所有的消息都使用fsync(2)——它有可能只是保存到缓存中,并不一定会写到硬盘中。并不能保证真正的持久化,但已经足够应付我们的简单工作队列。如果您需要更强的保证,那么您可以使用使用后面的发布订阅功能。
公平调度
如果有多个消费者,并且有的消费者处理消息比较繁忙,有的处理消息比较轻松,可以设置计数器,让RabbitMQ一次只向一个worker发送一条消息?;痪浠八?,在处理并确认前一个消息之前,不要向正在工作人员发送新消息。
在golang代码中实现
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")
这里置预取计数值为1即可。
订阅发布
创建交换机
发布者(producer)不会直接发送任何消息给队列,只需要把消息发送给一个交换机(exchange)。交换机非常简单,它一边从发布者方接收消息,一边把消息推送到队列。交换机必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过交换机类型(exchange type)来定义的。有几个可供选择的交换机类型:direct
, topic
, headers
和fanout
。
显示交换机列表
sudo rabbitmqctl list_exchanges
在golang代码中创建交换机
err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
name
为交换机名称,type
为交换机类型。前面的教程中我们对交换机一无所知,但仍然能够发送消息到队列中。因为我们使用了命名为空字符串("")的匿名交换机。如果要发送消息到指定队列中,在调用Publish()生产消息时指定exchange
字段即可,不过该指定的交换机必须是创建好的。
绑定交换机
我们已经创建了一个fanout
交换机和一个队列。现在我们需要告诉交换机如何发送消息给我们的队列。交换机和队列之间的联系我们称之为绑定(binding)。golang代码实现:
err = ch.QueueBind(
q.Name, // queue name 队列名
"", // routing key
"logs", // exchange 交换机名
false,
nil,
)
绑定好交易机之后生产者将消息存放在指定队列,在调用Consume()方法消费时只要指定queue
队列名就能获取指定队列消息。
路由
在上面绑定交换机的设置中可以看到有一个参数routing key
是空的,这个参数就是路由
绑定路由
绑定(binding)是指交换机(exchange)和队列(queue)的关系??梢约虻ダ斫馕赫飧龆恿校╭ueue)对这个交换机(exchange)的消息感兴趣。绑定的时候可以带上一个额外的routing_key
参数。为了避免与Channel.Publish
的参数混淆,我们把它叫做绑定键binding key
。绑定键的意义取决于交换机(exchange)的类型。我们之前使用过fanout
交换机会忽略这个值,使用direct
交换机来代替这个值会生效。
每个队列可以有多个绑定键,每个交换机可以设置相同的或多个绑定键到各个队列中。原理如下图
其中P为生产者,X为交换机,amqp为队列,C为消费者
订阅路由
绑定好路由之后就可以订阅指定路由的队列,在消费队列的队列绑定QueueBind()方法中指定routing key
的名称即可,golang代码实现:
for _, s := range os.Args[1:] {
log.Printf("Binding queue %s to exchange %s with routing key %s",
q.Name, "logs_direct", s)
err = ch.QueueBind(
q.Name, // queue name
s, // routing key 指定路由名称
"logs_direct", // exchange
false,
nil)
failOnError(err, "Failed to bind a queue")
}
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto ack
false, // exclusive
false, // no local
false, // no wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
主题交换机
发送到topic
交换机的消息不可以携带随意routing_key
,它的routing_key必须是一个由.
分隔开的词语列表。这些单词随便是什么都可以,但是最好是跟携带它们的消息有关系的词汇。
topic
交换机背后的逻辑跟direct
交换机很相似 —— 一个携带着特定routing_key的消息会被topic交换机投递给绑定键与之想匹配的队列。但是它的binding key和routing_key有两个特殊应用方式:
-
*
(星号) 用来表示一个单词. -
#
(井号) 用来表示任意数量(零个或多个)单词。
下边用图说明:
这个例子里,我们发送的所有消息都是用来描述小动物的。发送的消息所携带的路由键是由三个单词所组成的,这三个单词被两个.
分割开。路由键里的第一个单词描述的是动物的手脚的利索程度,第二个单词是动物的颜色,第三个是动物的种类。所以它看起来是这样的: <celerity>.<colour>.<species>
。
我们创建了三个绑定:Q1的绑定键为 *.orange.*
,Q2的绑定键为 *.*.rabbit
和 lazy.#
。
这三个绑定键被可以总结为:
- Q1 对所有的桔黄色动物都感兴趣。
- Q2 则是对所有的兔子和所有懒惰的动物感兴趣。
一个携带有 quick.orange.rabbit
的消息将会被分别投递给这两个队列。携带着 lazy.orange.elephant
的消息同样也会给两个队列都投递过去。另一方面携带有 quick.orange.fox
的消息会投递给第一个队列,携带有 lazy.brown.fox
的消息会投递给第二个队列。携带有 lazy.pink.rabbit
的消息只会被投递给第二个队列一次,即使它同时匹配第二个队列的两个绑定。携带着 quick.brown.fox
的消息不会投递给任何一个队列。
如果我们违反约定,发送了一个携带有一个单词或者四个单词("orange"
or "quick.orange.male.rabbit"
)的消息时,发送的消息不会投递给任何一个队列,而且会丢失掉。
但是另一方面,即使 "lazy.orange.male.rabbit"
有四个单词,他还是会匹配最后一个绑定,并且被投递到第二个队列中。