0


消息队列MQ详解(Kafka、RabbitMQ、RocketMQ、ActiveMQ等)

文章目录

概述

消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。当今市面上有很多主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,炙手可热的Kafka,阿里巴巴自主开发RocketMQ等。

消息中间件的优势(异步削峰解耦)

使用消息队列可用6字概括:解耦、异步、削峰
解耦:将消息写入消息队列,需要消息的时候自己从消息队列中订阅,从而原系统不需要做任何修改。
异步:将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度
削峰:原系统慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。
应用解耦、流量削峰、异步处理、消息通讯、远程调用
系统解耦
交互系统之间没有直接的调用关系,只是通过消息传输,故系统侵入性不强,耦合度低。
提高系统响应时间
例如原来的一套逻辑,完成支付可能涉及先修改订单状态、计算会员积分、通知物流配送几个逻辑才能完成;通过MQ架构设计,就可将紧急重要(需要立刻响应)的业务放到该调用方法中,响应要求不高的使用消息队列,放到MQ队列中,供消费者处理。
为大数据处理架构提供服务
通过消息作为整合,大数据的背景下,消息队列还与实时处理架构整合,为数据处理提供性能支持。
Java消息服务——JMS
Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
JMS中的P2P和Pub/Sub消息模式:点对点(point to point, queue)与发布订阅(publish/subscribe,topic)最初是由JMS定义的。这两种模式主要区别或解决的问题就是发送到队列的消息能否重复消费(多订阅)。

消息队列的缺点

一致性 幂等性
MQ引入成本
增加引入复杂度 、只能保证最终一致性
系统可用性降低
系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?
系统复杂度提高
MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何
保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
一致性问题
A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理
失败。如何保证消息数据处理的一致性?

消息中间件模式分类

1.点对点
PTP点对点:使用queue作为通信载体
在这里插入图片描述

说明:
消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息。
消息被消费以后,queue中不再存储,所以消息消费者不可能消费到已经被消费的消息。 Queue支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费
![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/b57d97f84fe040bca039ca3cc1ac4412.png

2 发布/订阅
Pub/Sub发布订阅(广播):使用topic作为通信载体
在这里插入图片描述

说明:
消息生产者(发布)将消息发布到topic中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到topic的消息会被所有订阅者消费。
queue实现了负载均衡,将producer生产的消息发送到消息队列中,由多个消费者消费。但一个消息只能被一个消费者接受,当没有消费者可用时,这个消息会被保存直到有一个可用的消费者。
topic实现了发布和订阅,当你发布一个消息,所有订阅这个topic的服务都能得到这个消息,所以从1到N个订阅者都能得到一个消息的拷贝。

消息队列使用场景和应用场景

1、异步处理
将一个请求链路中的非核心流程,拆分出来,异步处理,减少主流程链路的处理逻辑,缩短RT,提升吞吐量。
有些业务不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
如:注册新用户发短信通知
2、削峰填谷
避免流量暴涨,打垮下游系统,前面会加个消息队列,平滑流量冲击。比如:秒杀活动。生活中像电源适配器也是这个原理。
3、应用解耦
降低工程间的强依赖程度,针对异构系统进行适配。在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。通过消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,当应用发生变化时,可以独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
两个应用,通过消息系统间接建立关系,避免一个系统宕机后对另一个系统的影响,提升系统的可用性。如:下单异步扣减库存
4、消息通讯
内置了高效的通信机制,可用于消息通讯。如:点对点消息队列、聊天室
5. 冗余
有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
6扩展性
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。便于分布式扩容。
7 过载保护
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量无法提取预知;如果以为了能处理这类瞬间峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
8可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
9 顺序保证
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。
10 缓冲
在任何重要的系统中,都会有需要不同的处理时间的元素。消息队列通过一个缓冲层来帮助任务最高效率的执行,该缓冲有助于控制和优化数据流经过系统的速度。以调节系统响应时间。
11 数据流处理
分布式系统产生的海量数据流,如:业务日志、监控数据、用户行为等,针对这些数据流进行实时或批量采集汇总,然后进行大数据分析是当前互联网的必备技术,通过消息队列完成此类数据收集是最好的选择

消息中间件常用协议

AMQP协议
AMQP即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。
优点:可靠、通用
MQTT协议
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当做传感器和致动器(比如通过Twitter让房屋联网)的通信协议。
优点:格式简洁、占用带宽小、移动端通信、PUSH、嵌入式系统
STOMP协议
STOMP(Streaming Text Orientated Message Protocol)是流文本定向消息协议,是一种为MOM(Message Oriented Middleware,面向消息的中间件)设计的简单文本协议。STOMP提供一个可互操作的连接格式,允许客户端与任意STOMP消息代理(Broker)进行交互。
优点:命令模式(非topic\queue模式)
XMPP协议
XMPP(可扩展消息处理现场协议,Extensible Messaging and Presence Protocol)是基于可扩展标记语言(XML)的协议,多用于即时消息(IM)以及在线现场探测。适用于服务器之间的准即时操作。核心是基于XML流传输,这个协议可能最终允许因特网用户向因特网上的其他任何人发送即时消息,即使其操作系统和浏览器不同。
优点:通用公开、兼容性强、可扩展、安全性高,但XML编码格式占用带宽大
其他基于TCP/IP自定义的协议
有些特殊框架(如:redis、kafka、zeroMq等)根据自身需要未严格遵循MQ规范,而是基于TCP\IP自行封装了一套协议,通过网络socket接口进行传输,实现了MQ的功能。

消息中间件的组成

Broker
消息服务器,作为server提供消息核心服务
Producer
消息生产者,业务的发起方,负责生产消息传输给broker,
Consumer
消息消费者,业务的处理方,负责从broker获取消息并进行业务逻辑处理
Topic
主题,发布订阅模式下的消息统一汇集地,不同生产者向topic发送消息,由MQ服务器分发到不同的订阅者,实现消息的 广播
Queue
队列,PTP模式下,特定生产者向特定queue发送消息,消费者订阅特定的queue完成指定消息的接收
Message
消息体,根据不同通信协议定义的固定格式进行编码的数据包,来封装业务数据,实现消息的传输

如何实现高吞吐量

1、消息的批量处理
2、消息压缩,节省传输带宽和存储空间
3、零拷贝
4、磁盘的顺序写入
5、page cache 页缓存,由操作系统异步将缓存中的数据刷到磁盘,以及高效的内存读取
6、分区设计,一个逻辑topic下面挂载N个分区,每个分区可以对应不同的机器消费消息,并发设计。

  1. 优化消息生产者:通过批量发送消息、异步发送、压缩数据等方式来提高消息生产者的性能和吞吐量。避免频繁的网络连接和消息发送操作,尽量减少生产者与消息队列之间的通信开销。
  2. 优化消息消费者:使用多线程或多进程方式来并发处理消息,提高消息消费者的并发性和处理能力。根据实际情况调整消费者的线程数或进程数,合理分配资源,避免出现瓶颈。
  3. 批量处理消息:将多条消息打包成一个批次进行处理,减少网络传输的开销和消息队列的调度开销。合理设置批处理的大小,以在保证系统稳定性的前提下最大化地提高吞吐量。
  4. 水平扩展:通过增加消息队列的节点(Broker)数量或实例数来水平扩展系统的处理能力。通过负载均衡机制将消息流量均匀地分发到不同的节点上,提高系统的整体吞吐量和并发性能。
  5. 优化存储和传输:选择高性能的存储介质(如固态硬盘),使用高效的压缩算法来减少存储和传输开销。优化网络配置和协议选择,减少网络延迟和带宽消耗,提高数据传输的效率。
  6. 预取和缓存:使用预取机制来提前加载消费者可能需要的消息,减少每次消费时的等待时间。设置适当的缓存机制,提高消息的读取和写入性能,减少对底层存储系统的频繁访问。
  7. 监控和调优:建立监控系统,实时监测消息队列系统的性能指标、负载情况和瓶颈点。通过监控数据进行调优,识别系统的瓶颈,针对性地进行优化和改进,以提高系统的吞吐量和性能。
  8. 合理设计消息模型:根据实际业务需求,合理设计消息模型,避免不必要的复杂性和冗余数据。简化消息结构和内容,减少消息的大小和处理成本,提高系统的整体吞吐量。 通过综合考虑以上因素,并根据具体的业务需求和场景进行调优和优化,可以有效提高消息队列系统的吞吐量和性能。

MQ 如何避免消息堆积

1.产生背景: 生产者投递消息的速率与我们消费者消费的速率完全不匹配。
2.生产者投递消息的速率>消费者消费的速率
导致我们消息会堆积在我们 mq 服务器端中,没有及时的被消费者消费 所以就会产生消息堆积的问题
3.注意的是:rabbitmq 消费者我们的消息消费如果成功的话 消息会被立即删除(自动ack)
kafka 或者 rocketmq 消息消费如果成功的话,消息是不会立即被删除。
4.解决办法:
A.提高消费者消费的速率;(对我们的消费者实现集群)
B.消费者应该批量形式获取消息 减少网络传输的次数;

消息堆积如何处理

主要是消息的消费速度跟不上生产速度,从而导致消息堆积。解决思路:
1.可能是刚上线的业务,或者大促活动,流量评估不到位,这时需要增加消费组的机器数量,提升整体消费能力
2.也可能是消费端的问题,正常情况,一条消息处理需要10ms,但是优化不到位或者线上bug,现在要500ms,那么消费端的整体处理速度会下降50倍。这时,我们就要针对性的排查业务代码。例如数据库的一条sql没有命中索引,导致单条消息处理耗时拉长,进而导致消息堆积

如果 bug 导致几百万消息持续积压几小时。有如何处理呢? 需要解决bug,临时紧急扩容,大概思路如下:
1.先修复 consumer 消费者慢问题,以确保其恢复消费速度,然后将现有consumer 都停掉。
2.新建一个 topic,partition 原来 10 倍,临时立好原先 10 倍 queue数量。
3.然后写一个临时分发数据consumer 程序,这个程序部署上去消费积压数据,消费之后不做耗时处理,直接均匀轮询写入临时立好10 倍数量queue。
4.接着临时征用 10 倍机器来部署 consumer,每一批 consumer 消费一个临时queue 数据。这种做法相当于临时将 queue 资源和 consumer 资源扩大10 倍,以正常 10 倍速度来消费数据。
5.等快速消费完积压数据之后,得恢复原先部署架构,重新用原先consumer 机器来消费消息。

如何解决消息队列的延时以及过期失效问题?消息队列满了以后该怎么处理?有几百万消息持续积压几小时,怎么办?

消息积压处理办法:临时紧急扩容先修复 consumer 的问题,确保其恢复消费速度,然后将现有 cnosumer 都停掉。新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。 MQ中消息失效:假设你用的是 RabbitMQ,RabbtiMQ 是可以设置过期时间的,也就是 TTL。如果消息在 queue 中积压超过一定的时间就会被 RabbitMQ 给清理掉,这个数据就没了。那这就是第二个坑了。 这就不是说数据会大量积压在 mq 里,而是大量的数据会直接搞丢。我们可以采取一个方案,就是批量重导,这个我们之前线上也有类似的场景干过。就是大量积压的时候,我们当时就直接丢弃数据了,然后等过了高峰期以后,比如大家一起喝咖啡熬夜到晚上12点以后,用户都睡觉了。这个时候我们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,然后重新灌入 mq 里面去,把白天丢的数据给他补回来。也只能是这样了。假设 1 万个订单积压 在 mq 里面,没有处理,其中 1000 个订单都丢了,你只能手动写程序把那 1000 个订单给查出来,手动发到 mq 里去再补一次。 mq消息队列块满了:如果消息积压在 mq 里,你很长时间都没有处理掉,此时导致 mq 都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉所有的消息。然后走第二个方案,到了晚上再补数据吧。

消息队列MQ技术选型

ActiveMQ、RabbitMQ、Kafka、RocketMQ 这几种, 在架构技术选型的时候一般根据业务的需求选择合适的中间件: 比如中小型公司,低吞吐量的一般用 ActiveMQ、RabbitMQ 较为合适, 大数据高吞吐量的大型公司一般选用 Kafka 和 RocketMQ。
一般的业务系统要引入 MQ,最早大家都用 ActiveMQ,但是现在确实大家用的不多了,没经过大规模吞吐量场景的验证,社区也不是很活跃,所以大家还是算了吧,我个人不推荐用这个了;
后来大家开始用 RabbitMQ,但是确实 erlang 语言阻止了大量的 Java 工程师去深入研究和掌控它,对公司而言,几乎处于不可控的状态,但是确实人家是开源的,比较稳定的支持,活跃度也高;
不过现在确实越来越多的公司会去用 RocketMQ,确实很不错,毕竟是阿里出品,但社区可能有突然黄掉的风险(目前RocketMQ 已捐给 Apache,但 GitHub 上的活跃度其实不算高)对自己公司技术实力有绝对自信的,推荐RocketMQ,否则回去老老实实用 RabbitMQ 吧,人家有活跃的开源社区,绝对不会黄
所以中小型公司,技术实力较为一般,技术挑战不是特别高,用 RabbitMQ 是不错的选择;大型公司,基础架构研发实力较强,用 RocketMQ 是很好的选择。
如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。
ActiveMQ、RabbitMQ、Kafka、RocketMQ 这几种, 在架构技术选型的时候一般根据业务的需求选择合适的中间件: 比如中小型公司,低吞吐量的一般用 ActiveMQ、RabbitMQ 较为合适, 大数据高吞吐量的大型公司一般选用 Kafka 和 RocketMQ。
对于吞吐量来说kafka和RocketMQ支撑高吞吐,ActiveMQ和RabbitMQ比他们低一个数量级。对于延迟量来说RabbitMQ是最低的。

讲下 Kafka、RabbitMQ、RocketMQ 之间的区别是什么

性能
消息中间件的性能主要衡量吞吐量,Kafka 的吞吐量比 RabbitMQ 要高出 1~2 个数级,
RabbitMQ 的单机 QPS 在万级别,Kafka 的单机 QPS 能够达到百万级别。RocketMQ 单机 写入 TPS 单实例约 7 万条/秒,单机部署 3 个 Broker,可以跑到最高 12 万条/秒,消息大小 10 个字节,Kafka 如果开启幂等、事务等功能,性能也会有所降低。
数据可靠性
Kafka 与 RabbitMQ 都具备多副本机制,数据可靠性较高。RocketMQ 支持异步实时刷盘, 同步刷盘,同步 Replication,异步 Replication。
服务可用性
Kafka 采用集群部署,分区与多副本的设计,使得单节点宕机对服务无影响,且支持消息容量 的线性提升。RabbitMQ 支持集群部署,集群节点数量有多种规格。RocketMQ 是分布式架 构,可用性高。
功能
Kafka 与 RabbitMQ 都是比较主流的两款消息中间件,具备消息传递的基本功能,但在一些 特殊的功能方面存在差异,RocketMQ 在阿里集团内部有大量的应用在使用。

消息队列的在各种场景下如何选型

优先级队列;队列设置最大的优先级,之后每条消息设置对应的优先级,队列根据消息优 先级进行消费,(在有可能队列堆积的情况才有意义);应用场景:不同业务消息推送。

  1. 延迟队列:消息发送后,并不想让消费者立即拿到消息,等待特定的事件后,消费者才能 拿到并消费;应用场景:订单系统中订单支付 30 分钟内没有支付成功,那么将这个订单 进行异常处理;远程操作智能设备在指定时间进行工作等。(rabbit 中没有延迟队列,但 可以借助死信队 列与 TTL 设置来完成) 2.死信队列:当消息在一个队列中变成死信之后,它能被重新被发送到另一个交换器(DLX 交换器)中,绑定 DLX 的队列就称为死信队列。
  2. 重试队列:消费端,一直不回传消费的结果,rocketmq 认为消息没收到,consumer 下 一次拉取,broker 依然会发送该消息(有次数限制)。重试队列其实可以看成是一种回 退队列,具体指消费端消费消息失败时,为防止消息无故丢失而重新将消息回滚到 Broker 中。 4.消费模式: 推模式:对于 kafka 而言,由 Broker 主动推送消息至消费端,实时性较好, 不过需要一定的流 制机制来确保服务端推送过来的消息不会压垮消费端。拉模式:对于 kafka 而言,消费端主动向 Broker 端请求拉取(一般是定时或者定量)消息,实时性较推模 式差,但是可以根据自身的处理能力而控制拉取的消息量。
  3. 消息回溯:重置消息 offset(如:kafka、rokcetMq) 一般消息在消费完成之后就被处 理了,之后再也不能消费到该条消息。消息回溯正好相反,是指消息在消费完成之后,还 能消费到之前被消费掉的消息。对于消息而言,经常面临的问题是“消息丢失”,至于是 真正由于消息中间件的缺陷丢失还是由于使用方的误用而丢失一般很难追查,如果消息中 间件本身具备消息回溯功能的话,可以通过回溯消费复现“丢失的”消息 进而查出问题的 源头之所在。消息回溯的作用远不止与此,比如还有索引恢复、本地缓存重建,有些业务 补偿方案也可以采用回溯的方式来实现。
  4. 消息堆积:流量削峰是消息中间件的一个非常重要的功能,而这个功能其实得益于其消息 堆积能力。从某种意义上来讲,如果一个消息中间件不具备消息堆积的能力,那么就不能 把它看做是一个合格的消息中间件。消息堆积分内存式堆积和磁盘式堆积。
  5. 消息持久化:持久化确保 MQ 的使用不只是一个部分场景的辅助工具,而是让 MQ 能像 数据库一样存储核心的数据。有些功能是默认不开启的,需要进行配置。
  6. 多租户: 也可以称为多重租赁技术,是一种软件架构技术,主要用来实现多用户的环境下 公用相同的系统或程序组件,并且仍可以确保各用户间数据的隔离性。RabbitMQ 就能够 支持多租户技术,每一个租户表示为一个 vhost,其本质上是一个独立的小型 RabbitMQ 服务器,又有自己独立 的队列、交换器及绑定关系等,并且它拥有自己独立的权限。 vhost 就像是物理机中的虚拟机 一样,它们在各个实例间提供逻辑上的分离,为不同程序 安全保密地允许数据,它既能将同一 个 RabbitMQ 中的众多客户区分开,又可以避免队 列和交换器等命名冲突。
  7. 跨语言支持: 对很多公司而言,其技术栈体系中会有多种编程语言,如 C/C++、JAVA、 Go、PHP 等,消息 中间件本身具备应用解耦的特性,如果能够进一步的支持多客户端语 言,那么就可以将此特性 的效能扩大。跨语言的支持力度也可以从侧面反映出一个消息中间件的流行程度。 10.消息顺序消息:先进先出、 逐条进行消费顾名思义,消息顺序性是指保证消息有序。这个 功能有个很常见的应用场景就是 CDC(Change Data Chapture),以 MySQL 为例,如 果其传输的 binlog 的顺序出错,比如原本是先对一条数据加 1,然后再乘以 2,发送错序 之后就变成了先乘以 2 后加 1 了,造成了数据不一致。
  8. 安全机制: 在 Kafka 0.9 版本之后就开始增加了身份认证和权限控制两种安全机制。身份 认证是指客户端与服务端连接进行身份认证,包括客户端与 Broker 之间、Broker 与 Broker 之间、Broker 与 ZooKeeper 之间的连接认证,目前支持 SSL、SASL 等认证机 制。权限控制是指对客户端的读写操作进行权限控制,包括对消息或 Kafka 集群操作权限 控制。权限控制是可插拔的,并支持与外部的授权服务进行集成。对于 RabbitMQ 而言,其同样提供身份认证(TLS/SSL、SASL)和 权限控制(读写操作)的安全机制。
  9. 事务支持: 事务本身是一个并不陌生的词汇,事务是由事务开始(Begin Transaction) 和事务结束(End Transaction)之间执行的全体操作组成。支持事务的消息中间件并不 在少数,Kafka 和 RabbitMQ 都支持,不过此两者的事务是指生产者发生消息的事务,要 么发送成功,要么发送失败。消息中间件可以作为用来实现分布式事务的一种手段,但其 本身并不提供全局分布式事务的功能。

RabbitMQ 和 Kafka 的显著区别

RabbitMQ 是一个消息代理中间件,而 Apache Kafka 是一个分布式流处理平台。这种差异可能看起来只是语义上的,但它会带来严重的影响,影响我们方便地实现各种系统功能。
例如 Kafka 最适合处理流数据,在同一主题同一分区内保证消息顺序,而 RabbitMQ 对流中消息的顺序只提供基本的保证。
不过 RabbitMQ 内置了对重试逻辑和死信交换的支持,而 Kafka 将此类逻辑实现留给了用户。

redis

使用C语言开发的一个Key-Value的NoSQL数据库,开发维护很活跃,虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10K,Redis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis。

ZeroMQ

号称最快的消息队列系统,专门为高吞吐量/低延迟的场景开发,在金融界的应用中经常使用,偏重于实时数据通信场景。ZMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,开发成本高。因此ZeroMQ具有一个独特的非中间件的模式,更像一个socket library,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序本身就是使用ZeroMQ API完成逻辑服务的角色。但是ZeroMQ仅提供非持久性的队列,如果down机,数据将会丢失。如:Twitter的Storm中使用ZeroMQ作为数据流的传输。
ZeroMQ套接字是与传输层无关的:ZeroMQ套接字对所有传输层协议定义了统一的API接口。默认支持 进程内(inproc) ,进程间(IPC) ,多播,TCP协议,在不同的协议之间切换只要简单的改变连接字符串的前缀。可以在任何时候以最小的代价从进程间的本地通信切换到分布式下的TCP通信。ZeroMQ在背后处理连接建立,断开和重连逻辑。
特性:
● 无锁的队列模型:对于跨线程间的交互(用户端和session)之间的数据交换通道pipe,采用无锁的队列算法CAS;在pipe的两端注册有异步事件,在读或者写消息到pipe的时,会自动触发读写事件。
● 批量处理的算法:对于批量的消息,进行了适应性的优化,可以批量的接收和发送消息。
● 多核下的线程绑定,无须CPU切换:区别于传统的多线程并发模式,信号量或者临界区,zeroMQ充分利用多核的优势,每个核绑定运行一个工作者线程,避免多线程之间的CPU切换开销。

消息队列中间件如何设计

设计消息队列中间件时需要考虑以下因素:

  1. 可靠性:消息队列中间件应该具备高可靠性,能够确保消息的可靠传递和持久化存储。需要考虑如何处理消息丢失、重复消费等情况。

  2. 性能:消息队列中间件应该具备高性能,能够支撑系统的并发需求。需要考虑消息的处理速度、吞吐量、延迟等指标。

  3. 可扩展性:消息队列中间件应该支持水平扩展,能够方便地扩展集群规模以应对不断增长的消息量和并发请求。

  4. 消息顺序性:有些场景下要求消息按照发送顺序进行处理,因此需要考虑如何确保消息的顺序性。

  5. 消息持久化:消息队列中间件应该支持消息的持久化存储,以防止消息丢失或系统故障导致的消息丢失。

  6. 监控和管理:消息队列中间件应该提供监控和管理功能,能够实时监控消息队列的状态,并提供适当的管理接口。

  7. 安全性:消息队列中间件应该具备安全性,包括数据加密、访问控制、身份认证等功能,以保护消息的安全性和完整性。

  8. 支持多种消息传输协议:消息队列中间件应该支持多种消息传输协议,如AMQP、MQTT、STOMP等,以满足不同场景下的需求。

  9. 与其他系统的集成:考虑消息队列中间件与其他系统的集成,如与数据库、缓存、日志系统等的集成,以实现更多功能和提升系统的整体效率。 在设计消息队列中间件时,需要综合考虑以上因素,并根据实际业务需求和系统架构选择适合的消息队列中间件或进行定制开发。

  10. 远程过程调用

  11. 面向消息:利用搞笑的消息传递机制进行平台无关的数据交流,并给予数据通信来进行分布式系统的集成,有一下三个特点: i) 通讯程序可以在不同的时间运行 ii) 通讯可以一对一、一对多、多对一甚至是上述多种方式的混合 iii) 程序将消息放入消息队列会从小吸毒列中取出消息来进行通讯

  12. 对象请求代理:提供不同形式的通讯服务包括同步、排队、订阅发布、广播等。可构筑各种框架如:事物处理监控器、分布数据访问、对象事务管理器 OTM 等。

  13. 事物处理监控有一下功能: a) 进程管理,包括启动 server 进程、分配任务、监控其执行并对负载进行平衡 b) 事务管理,保证在其监控下的事务处理的原子性、一致性、独立性和持久性 c) 通讯管理,为 client 和 server 之间提供多种通讯机制,包括请求响应、会话、排队、订阅发布和广播等

  14. 首先消息队列整体流程,producer 发送消息给broker,broker存储好,broker 再发送给consumer 消费,consumer 回复消费确认等。

  15. producer 发送消息给 broker,broker 发消息给 consumer 消 费,那就需要两次RPC 了,RPC 如何设计呢?可以参考开源框架 Dubbo,你可以说说服务发现、序列化协议等等

  16. broker 考虑如何持久化呢,放文件系统还数据库呢,会不会消息堆积呢,消息堆积如何处理呢。

  17. 消费关系如何保存呢? 点对点还是广播方式呢?广播关系又如何维护呢?zk 还config server

  18. 消息可靠性如何保证呢?如果消息重复了,如何幂等处理呢?

  19. 消息队列高可用如何设计呢? 可以参考 Kafka 高可用保障机制。多副本 ->leader & follower -> broker 挂了重新选举 leader 即可对外服务。

  20. 消息事务特性,与本地业务同个事务,本地消息落库;消息投递到服务端,本地才删除;定时任务扫描本地消息库,补偿发送。

  21. MQ 得伸缩性和可扩展性,如果消息积压或者资源不够时,如何支持快速扩容,提高吞吐?可以参照一下 Kafka 设计理念,broker -> topic -> partition,每个partition 放一个机器,就存一部分数据。如果现在资源不够了,简单啊,给topic 增加 partition,然后做数据迁移,增加机器,不就可以存放更多数据,提供更高吞吐量了?

标签: kafka rabbitmq rocketmq

本文转载自: https://blog.csdn.net/Fireworkit/article/details/136261543
版权归原作者 思静语 所有, 如有侵权,请联系我们删除。

“消息队列MQ详解(Kafka、RabbitMQ、RocketMQ、ActiveMQ等)”的评论:

还没有评论