消息队列
什么是消息队列(Message Queue,MQ)呢?
首先回忆下生活中在餐馆点餐的场景,当你点完餐之后老板会给一个号牌,每个人都按照自己付款拿到的号牌顺序排队等待叫号。实际上,这里的柜台就充当着消息队列的角色??突У却甙讯┎偷南⒎⑺偷焦裉聪⒍恿兄泻?,从中取餐即消费消息。整个流程实际上是消息被发送到队列中,又成功被消费者消费的过程。
回过头来理解下消息队列,首先需要了解什么是消息,什么是队列。
消息Message
消息是软件对象之间交互和通讯的数据,消息可以是简单的文本字符串也可以是复杂的对象。
消息作为消息队列中最基本的概念,本质上是一段数据,能被一个或多个应用程序所理解,是应用程序之间传递信息的载体。
在消息队列中,把应用程序交由消息队列传输的数据定义为消息,可以定义消息的内容并对消息进行广义的理解。
消息由两部分组成
- 消息描述符(
Message Description
或Message Handler
)
用于描述消息的特征,如消息的优先级、生命周期、消息ID等。 - 消息体(
Message Body
)
即用户数据部分
在消息队列中消息分为两种类型
- 非永久性消息(
non-persistent
)
非永久性消息是存储在内存中,为了提高性能而设计的,当系统掉电或消息队列管理器重新启动时将不可恢复。如果用户对消息的可靠性要求不高而侧重系统的性能表现时,可采用此种类型的消息。 - 永久性消息(
persistent
)
永久性消息是存储在磁盘上并记录数据日志的,具有高可靠性,在网络和系统发生故障等情况下能够确保消息不丢失。
队列Queue
队列是一种特殊的线性表,和栈一样是一种操作受限的线性表,特殊之处在于队列只允许在表的头部front
删除在尾部rear
插入。进行插入操作的一端称为队尾rear
,进行删除操作的一端称为队头front
。
队列是一种先进先出的数据结构,队列可以理解为管道,消息队列也就是以管道的方式做消息传递。
例如:Redis提供的list
数据结构就非常适合做消息队列,Redis中的List
本质上是一个双向链表。zset
有序集合也可以用来做消息队列的容器。
Redis中可以使用自带的publish
和subscribe
命令完成消息推送和消息拉取,进而实现消息队列。但这种方式有一个缺陷是消费者必须一直在线,否则会出现消费遗漏。
使用Redis实现的轻量化的消息队列有三大优势
- Redis已经广泛应用于各大系统无需再次引入其它第三方框架和API
- Redis是基于内存存储的,生产者和消费者的存取速度都非???/li>
- Redis集群的容量可以通过添加实例进行扩展
一个轻量化的消息队列需要满足的条件包括
- 消费顺序保持跟生产顺序一致
- 对于广播消息,某个消费者实例重启后能重新收到消息
- 定时清理所有消费者都已经消费过的数据,防止容量无限增长。
消息队列可以简单理解为将需要传输的数据放入队列中,把数据放入消息队列的一端称为生产者(productor
),从消息队列中取出数据的一端叫做消费者(consumer
)。
消息队列基本结构
-
Broker
消息队列服务器 -
Producer
消息生产者,发送消息到消息队列 -
Consumer
消息消费者,从消息队列中接收消息
也可以将消息队列视为一个存放消息的容器,当需要使用消息时可以从中取出供自己使用。队列的目的是提供路由并保证消息的传递,如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。
简单来说,消息队列是指利用高效可靠的消息传输机制及运行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
消息队列通过提供消息传递和消息排队模型,可以在分布式环境下提供应用解耦、弹性伸缩、冗余存储、流量削峰、异步通信、数据同步等功能,其作为分布式系统架构中的一个重要组件,有着举足轻重的地位。
消息队列是分布式系统中重要的组件,应用于当不需要立即获得结果但并发量又需要进行控制时的场景,主要解决了应用耦合、异步处理、流量削峰等问题。消息队列利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。
消息队列是一种异步的服务将通信方式,适用于无服务器和微服务架构,消息在被处理和删除之前一直存储在队列上。每条消息仅可被一位用户处理一次。消息队列可被用于分离重量级处理、缓冲或批处理工作以及解耦高峰期工作负荷。
借助于消息队列,系统的不同部分可以相互通信并异步执行处理操作。消息队列提供了一个临时存储消息的轻量级缓冲区,以及允许软件组件连接到队列以发送和接收消息的终端节点。这些消息通常较小,可以是请求、恢复、错误消息、明文信息等。
消息队列有什么特性呢?
- 业务无关
一个具有普适性质的消息队列组件不需要考虑上层的业务模型,只做好消息的分发就可以了,上层业务的不同??榉炊枰览迪⒍恿兴ㄒ宓墓娣督型ㄐ拧?/li> - FIFO 先进先出
先投递先到达是消息队列和buffer
的本质区别 - 容灾
对于普适的消息队列组件而言,节点的动态删减和消息的持久化都是支持容灾能力的重要基本特性。 - 性能
消息队列的吞吐量上去了整个系统的内部通信效率也会有提高
推拉模型
消息队列的推拉模型
-
push
推消息模型
消息生产者将消息发送给消息中间件,消息中间件再将消息推送给消费者。
push
模型最大的问题是慢消费,也就是说消费者消费速度比生产者的生产速度慢,将会导致在broker
的堆积,如果这些消息是有用且无法丢掉的就会一直在broker
中保存,而且broker
会不断地给消费者推送消息,消费者reject
或error
之后可能会来回推送。
-
pull
拉消息模型
消费者请求消息中间件并接收消息,消费者从消息中间件拉取消息。
pull
模型中消费者可以按需消费,不用担心自己处理不了的信息来骚扰自己,broker
堆积消息也会相对简单,无需记录每一个要发送消息的状态,只需要维护所有消息的队列和偏移量即可。由于主动权在消费方,消费方无法确切的决定何时去拉去最新的消息,如果一次拉取了消息还可以继续去拉,如果没有拉取则需要等待一段时间重新拉取。
pull
模型最大的问题是消息延迟和忙等,业界比较成熟的做法是从短时间开始(不会对broker
有太大负担),然后指数级增长等待。比如开始等5毫秒,然后10毫秒,然后20毫秒...,直到有消息到来,然后再回到5毫秒。
优缺点
为什么需要消息队列呢?
当系统中出现生产和消费的速度或稳定性等因素不一致的时候就需要消息队列,作为抽象层,弥合双方的差异。
关键在于:解耦、异步、削峰
在高并发环境下,由于来不及同步处理,请求往往会发生阻塞。
例如:大量的插入、更新语句类的请求同时达到数据库将会直接导致无数的行锁表锁,最后甚至会因为请求堆积过多从而触发连接数不足的错误。通过使用消息队列可以异步的处理请求从而缓解系统的压力。
通过异步处理可以提高系统性能
例如:在不使用消息队列的时候,用户请求的数据将会直接写入数据库,在高并发情况下数据库压力倍增,响应速度变慢。使用消息队列之后,用户请求数据发送给消息队列后立即返回,再由消息队列的消费者进程从消息队列中获取数据,异步写入数据库。由于消息队列服务器处理速度快于数据库,因此响应速度得到大幅改善。
通过异步处理将短时间高并发产生的事务消息存储在消息队列中从而削平高峰期的并发事务。
消息队列的特点是什么?
- 消息队列把请求的压力保存一下然后逐渐释放出来,按照自己的节奏来处理。
- 消息队列引入了新的节点,系统的可靠性会受到消息队列节点的影响。
- 消息队列是异步单向的消息,发送消息被设计成无需等待处理的完成。
消息队列有什么缺点呢?
- 系统可用性降低
- 系统复杂性增加
- 数据一致性问题
应用场景
消息队列有哪些应用场景?当需要使用消息队列时首先需要考虑它的必要性,可以使用消息队列的场景很多,最常用的如应用程序松耦合、异步处理模式、发布与订阅、最终一致性、错峰流控、日志缓冲等。反知,如果需要强一致性,关注业务逻辑的处理结构则使用RPC
显得更为合适。
应用耦合
应用系统之间解耦和主要体现在发送者和接收者不必了解对象只需要确认消息,另外发送者接收者不必同时在线。
多应用之间通过消息队列对同一消息进行处理,避免调用接口失败导致整个过程失败。
例如:用户下单后订单系统需要通知库存系统,传统的做法是订单系统调用库存系统的接口,缺点在于如果库存系统无法访问订单减库存将失败,从而导致订单失败。
问题的根源在于订单系统与库存系统紧密耦合,如何解决这个问题呢?引入消息队列,订单系统在用户下单后完成持久化处理,并将消息写入消息队列然后返回下单成功??獯嫦低扯┰南碌ハ?,采用拉/推的方式,获取下单信息,然后根据下单信息进行库存操作。
如果在下单时库存系统不能正常使用,也不会影响正常下单。因为下单后订单系统写入消息队列后就不再关系其他的后续操作了,因此实现了订单系统与库存系统的应用解耦。
异步处理
异步处理的主要目的在于:非核心流程异步化,减少系统响应时间,提高吞吐量
消息发送者可以发送一个消息而无需等待响应,消息发送者将消息发送到一条虚拟的通道(主题或队列)上,消息接收者则订阅或监听该通道。一条消息可能最终转发给一个或多个消息接收者,这些接收者都无需对消息发送者做出同步回应,整个过程都是异步的。
多应用对消息队列中同一消息进行处理,应用间并发处理消息,相比串行处理能减少处理时间。
例如:用户使用应用进行注册,系统需发送注册邮件并验证短信。
对注册的处理有两种方式:串行、并行
-
串行方式:用户注册信息写入数据库后,先发送注册邮件,再发送验证短信,最终发送短信验证码给客户端。
并行方式:用户注册信息写入数据库后,同时发送短信和发送邮件并行处理。发短信和发邮件处理完毕后返回给客户端。
假如三个业务节点每个的使用均为50毫秒,不考虑网络等其他开销,那么串行方式的时间会是150毫秒,并行的方式可能是100毫秒。由于CPU在单位时间内处理请求数量是一定的,假如CPU在1秒内吞吐量是100次,则串行方式下1秒内的CPU可处理的请求是100 * 10 / 150 约7次,并行方式处理的请求数量是100 * 10 / 100约10次。
使用传统方式系统的性能(并发量、吞吐量、响应时间等)存在瓶颈,应该如何解决这个问题呢?
引入消息队列,用户注册信息写入数据库后,写入消息队列后立即返回给客户端。总的响应时间依赖于写入消息队列的时间,由于写入消息队列的时间本身很快基本可忽略不计。
按照之前约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。系统的吞吐量提高到100 * 10 / 50约每秒20QPS,相比串行提高了3倍,相比并行提高了2倍。
限流削峰
流量削峰是消息队列常用场景,应用于秒杀或抢购活动中,避免流量过大导致应用系统性能挂掉的情况。
例如:购物网站开展秒杀活动,由于瞬间访问量过大服务器接收过大会导致流量暴增,相关系统无法处理请求甚至崩溃。加入消息队列后,系统从消息队列中取数据,相当于消息队列做了一次缓冲。
这样做的优势在于请求先写入消息队列而非由业务处理系统直接处理,做了一次缓冲,极大地减少了业务处理系统的压力。其次,队列长度是可以做限制的,也就是说可以控制活动的人数。
事实上,秒杀时后入队的用户无法秒杀到商品,这些请求可以直接被抛弃,返回活动已经结束或商品已经售完等信息。消息队列的长度一旦超过最大限制数量会直接抛弃用户请求或跳转到错误页面。秒杀业务根据消息队列中的请求信息再做后续处理。
消息驱动的系统
消息驱动的系统分为消息队列、消息生产者、消息消费者,生产者负责产生消息,消费者负责对消息进行处理。
例如:用户新上传一批照片,人脸识别系统需要对用户的所有照片进行聚类,聚类完毕后由对账系统重新生成用户的人脸索引以加快查询。这三个子系统间由消息队列连接起来,前一阶段的处理结果放入队列中,后一阶段从队列中获取消息继续处理。这样做的好处在于避免了直接调用下一个系统导致当前系统失败。其次每个子系统对于消息的处理方式可以更为灵活,可以选择收到消息时就处理,也可以划分时间段按不同的处理速度进行处理。
确认机制
消息的确认机制是什么?
消息的ACK(Ackownledge)
确认机制,为了保证消息不丢失,消息队列提供了消息Acknowledge
即ACK
机制,当消费者Consumer
确认消息已经被消费处理后会发送一个ACK
给消息队列,此时消息队列便可以删除这个消息。如果消费者Consumer
宕机或关闭,没有发送ACK
,消息队列将认为这个消息没有被处理,会见这个消息重新发送给其他的消费者重新消费处理。
传输模式
消息队列有哪几种传输模式呢?
消息队列的传输模式可以分为两种,分别是点对点模式(point to point,queue
)和发布/订阅模式(publish/subscribe,topic
)
点对点模式
点对点模型用于消息生产者和消息消费者之间点到点的通信,消息生产者将消息发送到由某个名字标识的特定消费者,这个名字实际上对应消费服务中的一个队列,在消息传递给消费者之前它被存储在这个队列中,队列消息可以存放在内存也可以持久化,以保证在消息服务出现故障时仍然能够传递消息。
消息生产者向一个特定的队列发送消息,消息消费者从该队列中接收消息,消息生产者和消费者可以不同时处于运行状态,每一个成功处理的消息都是由消息消费者签收确认(Acknowledge
,ACK
)。
点对点模式下包含三个角色:消息队列、发送者(生产者)、接收者(消费者)
消息发送者生产消息并发送到消息队列中,然后消息接收者从队列中取出并消费。消息被消费后队列中不再有存储,所以消息接收者不可能消费到已经被消费的消息。
点对点模式的特点在于每个消息只有一个接收者(消费者),也就是说消息一旦被消费就不会在消息队列中存在。另外,发送者和接收者之间是没有依赖性的,发送者发送消息之后,不管有没有接收者在运行,都不会影响到发送者下次发送消息。再者,接收者在成功接收消息之后需向队列应答成功,以便消息队列删除当前接收的消息。
传统的点对点消息中间件通常是由消息队列服务、消息传递服务、消息队列、消息应用程序接口API组成。
其特点在于每个消息只用一个消费者,发送者和接收者之间没有时间依赖,接收者确认消息接收和处理成功。
发布/订阅模式
发布/订阅(pub/sub
)模式中包含三个角色:角色主题(topic
)、发布者(publisher
)、订阅者(subscriber
)
发布/订阅模型支持先一个特定的消息主题生产消息,零个或多个订阅者可能对接收来自特定消息主题的消息感兴趣。这种模型下,发布者和订阅者彼此不知道对象,就好比是匿名公告板。
发布/订阅模式可以被概况为多个消费者可以获得消息,在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription
)以便能够被消费者订阅,订阅者必须保证持续的活动状态并接收消息。
发布订阅消息模型中,支持向一个特定的主题发布消息,零个或多个订阅者接收来自这个消息主题的消息。在这种模型下,发布者和订阅者彼此不知道对象。发布者将消息发送到角色主题中,系统将这些消息传递给多个订阅者。发布/订阅模型的特点在于每个消息可以有多个订阅者,发布者和订阅者之间有时间上的依赖性。针对某个主题的订阅者必须创建一个订阅之后才能消费发布者的消息。为了消费消息,订阅者需要提前订阅该角色主题并保持在线运行。在这种情况下,在订阅者未连接时,发布的消息将在订阅者重新连接时重新发布。
发布订阅的特点在于每个消息可以有多个订阅者,客户端只有订阅后才能接受到消息。发布者和订阅者之间有时间依赖性,接收者和发布者只有建立订阅关系后才能接收到消息。
订阅可分为持久订阅和非持久订阅,持久订阅表示订阅关系建立后消息就不会消失,不管订阅者是否在线。非持久订阅表示订阅者为了接收消息,必须一直在线,当只有一个订阅者时可视为点对点模式。
MQ与RPC
消息队列与RPC的异同点是什么呢?
RPC远程过程调用的结构是消费者(Consumer)调用提供者(Provider)所提供的服务,消息队列的结构是发送者(Sender)将消息发送给队列(Queue),接收者(Receiver)从消息队列中取出消息进行处理。
架构上,远程过程调用RPC和消息队列Message Queue的差异点在于消息队列有一个中间节点Message Queue(broker),可以将消息缓存。
RPC远程过程调用的特点在于同步调用,适用于要等待返回结果或处理结果的场景,RPC也支持异步调用。由于RPC需要等待结果,所以消费者Consumer(Client)会有线程上的消耗。如果以异步的方式使用RPC,消费者Consumer(Client)是没有线程消耗的,但是不能做到像消息一样被缓存,压力会直接传导到服务提供者Provider那里。
RPC的使用场景主要包括
- 希望同步得到结果的场景
- RPC操作基于接口使用简单
- 不希望发送端受限于处理端的速度时可以使用消息队列
重复消费
如何保证消息不会被重复消费呢?如何保证消息队列的幂等性呢?
首先要明白为什么会造成重复消费?其实无论是哪种消息队列,造成重复消费的原因都是类似的。正常情况下,消费者在消费消息完毕后会发送一个确认消息给消息队列,消息队列收到后就知道该消息被消费了,然后会将消息从消息队列中删除。只是不同的消息队列发出的确认消息形式不同。那造成重复消费的原因是什么呢?就是因为网络传输等故障,确认消息没有传递到消息队列,导致消息队列不知道自己已经消费过该消息,会再次将消息分发给其他消费者。
如何解决这个问题呢?需针对不同的业务场景来解决:
- 如果是拿消息做数据库的插入操作,那么给这个消息做一个唯一主键,出现重复消费将导致主键冲突,即可避免数据库出现脏数据。
- 如果是拿消息做Redis的集合操作则不用解决,因为集合操作本身就是幂等操作。
- 如果两种情况都不行,准备一个第三方截止来做消费记录。以Redis为例给消息分配一个全局ID,只要消费过该消息将
<id, message>
以键值对形式存入Redis,消费者消费之前先去Redis中查询是否有消费记录即可。
可靠性
如何保证消费的可靠性传输呢?
对于可靠性传输,消息队列需要从三个角度来分析:生产者弄丢数据、消息队列弄丢数据、消费者弄丢数据
顺序性
如何保证消息的顺序性呢?
队列本身是一种先进先出的数据结构,所以消费消息时也是安装顺序来消费的。但也会出现消息被消费的顺序错误的情况,例如某个消息消费失败或一个队列多个消费者时也会导致消息被消费的顺序错误。
消息堆积
如何处理消息堆积的问题呢?处理消息堆积的方法就是将它保存下来,只是这个存储可以做成很多种方式,如存储在内存中,存储在分布式KV
里,存储在磁盘,存储在数据库等。归根结底主要有持久化和非持久化两种方式。持久化的方式能更大程度地保证消息的可靠性,如断电等不可抗拒的外力因素,理论上能承载更大限度的消息堆积。但并不是每种消息都需要持久化存储的,很多消息对于投递性能的要求大于可靠性的要求,且数据极大如日志。这个时候消息不落地直接暂存内存,尝试几次allover
最终投递出去也未尝不可。
未完待续...