1.什么是消息队列?
首先,我们来看看什么是消息队列,维基百科里的解释翻译过来如下:
队列提供了一种异步通信协议,这意味着消息的发送者和接收者不需要同时与消息保持联系,发送者发送的消息会存储在队列中,直到接收者拿到它。
一般我们把消息的发送者称为生产者,消息的接收者称为消费者;注意定义中的那两个字“异步”,通常生产者的生产速度和消费者的消费速度是不相等的;如果两个程序始终保持同步沟通,那势必会有一方存在空等时间;如果两个程序一持续运行的话,消费者的平均速度一定要大于生产者,不然队列囤积会越来越多;当然,如果消费者没有时效性需求的话,也可以把消息囤积在队列中,集中消费。
2.why 消息队列---消息队列的应用场景
消息队列的一些应用场景: 异步处理、解耦、削峰、提速、(广播)
- 异步处理 : 将一些实时性要求不是很强的业务异步处理。
- 解耦 : 消息队列将消息生产和订阅分离,可以实现应用解耦。
- 削峰: 通过在应用前端以消息队列接收请求来达到削峰的目的。请求超过队列长度直接不处理重定向至错误页面。
流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛
应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。
消息队列在秒杀活动中的应用:
1. 可以控制活动的人数
2.可以缓解短时间内高流量压垮应用
3 用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面
4 秒杀业务根据消息队列中的请求信息,再做后续处理
- 提速:消息队列应用下,消息生产应用只管生产,不需要等待消费完就能处理其他事情。实际上起到了提速的作用。
3. 常见的消息队列及其实现
常见的消息队列中间件 Active MQ,Rabbit MQ,Zero MQ,Kafka
这里介绍下kafka
4 Kafka
Kafka 最初是 linkedin 用于日志处理的分布式消息队列,同时支持离线和在线日志处理。kafka 对消息保存时根据 Topic 进行归类,发送消息者成为 Producer,消息接受者成为 Consumer,此外 kafka 集群由多个 kafka 实例组成,每个实例 (server) 称为 broker。无论是 kafka 集群,还是 producer 和 consumer 都依赖于 zookeeper 来保证系统可用性,为集群保存一些 meta 信息。
kafka基本概念
Broker : Kafka集群包含一个或多个服务器,这种服务器被称为broker
Topic : 每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
Partition : Parition是物理上的概念,每个Topic包含一个或多个Partition.
Producer : 消息生产者,负责发布消息到Kafka broker
Consumer :消息消费者,向Kafka broker读取消息的客户端。
-
Consumer Group : 每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。
如上图所示,一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
Topic
Topic是生产者生产、消费者消费的队列标识。一个Topic由一个或多个partition组成,每个partition可以单独存在一个broker上,消费者可以往任一partition发送消息,以此实现生产的分布式,任一partition都可以被且只被一个消费者消息,以此实现消费的分布式;因此partition的设计提供了分布式的基础。
Partition
partition 特点1:顺序写磁盘
上图我们可以看到 partition每条消息都被append到该partition中,是顺序写磁盘,因此效率非常高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。
上图中如果只有一个消费者组时就等同与P2P模型(单播),当存在多个消费者组时就是PUB/SUB模型(多播)。
partition 特点2:消费线程消费topic下所有partition
当启动一个consumer group去消费一个topic的时候,无论topic里面有多个少个partition,无论我们consumer group里面配置了多少个consumer thread,这个consumer group下面的所有consumer thread一定会消费全部的partition;即便这个consumer group下只有一个consumer thread,那么这个consumer thread也会去消费所有的partition。因此,最优的设计就是,consumer group下的consumer thread的数量等于partition数量,这样效率是最高的。
这样做的好处是可以加快消费的速度。
partition 特点3:保留已消费的消息
对于传统的message queue而言,一般会删除已经被消费的消息,而Kafka集群会保留所有的消息,无论其被消费与否。当然,因为磁盘限制,不可能永久保留所有数据(实际上也没必要),因此Kafka提供两种策略去删除旧数据。一是基于时间,二是基于partition文件大小。例如可以通过配置$KAFKA_HOME/config/server.properties,让Kafka删除一周前的数据,也可通过配置让Kafka在partition文件超过1GB时删除旧数据,如下所示。
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
# By default the log cleaner is disabled and the log retention policy will default to
#just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs
#can then be marked for log compaction.
log.cleaner.enable=false
partition 特点4:Kafka读取特定消息的时间复杂度为O(1)
Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除文件与Kafka性能无关,选择怎样的删除策略只与磁盘以及具体的需求有关。
partition 特点5:由consumer group维护offset
Kafka会为每一个consumer group保留一些metadata信息–当前消费的消息的position,也即offset。这个offset由consumer控制。正常情况下consumer会在消费完一条消息后线性增加这个offset。当然,consumer也可将offset设成一个较小的值,重新消费一些消息。因为offet由consumer控制,所以Kafka broker是无状态的,它不需要标记哪些消息被哪些consumer过,不需要通过broker去保证同一个consumer group只有一个consumer能消费某一条消息,因此也就不需要锁机制,这也为Kafka的高吞吐率提供了有力保障。
参考链接:
kafka:一个分布式消息系统
Kafka剖析(一):Kafka背景及架构介绍
消息队列使用的四种场景介绍
kafka 数据可靠性深度解读