RabbitMQ 是一个基于 AMQP 协议实现的企业级消息系统,想要顺畅的玩耍的前提是得先了解它,本文将主要介绍 rabbitmq 的一些基本知识点
- 特点
- 基本概念
- 消息投递消费的几种姿势
- 事务
- 集群
I. 基本知识点
它是采用 Erlang 语言实现的 AMQP(Advanced Message Queued Protocol)的消息中间件,最初起源于金融系统,用在分布式系统存储转发消息,目前广泛应用于各类系统用于解耦、削峰
1.特点
首先得了解一下 rabbitmq 的特点,看看是否满足我们的系统需求(毕竟学习一个框架也是要不少时间的)
主要特点,大致可以归纳为以下几个
- 可靠性:通过支持消息持久化,支持事务,支持消费和传输的 ack 等来确保可靠性
- 路由机制:支持主流的订阅消费模式,如广播,订阅,headers 匹配等
- 扩展性:多个 RabbitMQ 节点可以组成一个集群,也可以根据实际业务情况动态地扩展集群中节点。
- 高可用性:队列可以在集群中的机器上设置镜像,使得在部分节点出现问题的情况下队仍然可用。
- 多种协议:RabbitMQ 除了原生支持 AMQP 协议,还支持 STOMP,MQTT 等多种消息中间件协议。
- 多语言客户端:RabbitMQ 几乎支持所有常用语言,比如 Jav a、Python、Ruby、PHP、C#、JavaScript 等。
- 管理界面:RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息、集群中的节点等。
- 插件机制:RabbitMQ 提供了许多插件,以实现从多方面进行扩展,当然也可以编写自己的插件。
2. 基本概念
下图为 rabbitmq 的内部结构图
从上图也可以发现几个基本概念(Message, Publisher, Exchange, Binding, Queue, Channel, Consuer, Virtual host)
下面逐一进行说明
a. Message
具体的消息,包含消息头(即附属的配置信息)和消息体(即消息的实体内容)
由发布者,将消息推送到 Exchange,由消费者从 Queue 中获取
b. Publisher
消息生产者,负责将消息发布到交换器(Exchange)
c. Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列
d. Binding
绑定,用于给 Exchange 和 Queue 建立关系,从而决定将这个交换器中的哪些消息,发送到对应的 Queue
e. Queue
消息队列,用来保存消息直到发送给消费者
它是消息的容器,也是消息的终点
一个消息可投入一个或多个队列
消息一直在队列里面,等待消费者连接到这个队列将其取走
f. Connection
连接,内部持有一些 channel,用于和 queue 打交道
g. Channel
信道(通道),MQ 与外部打交道都是通过 Channel 来的,发布消息、订阅队列还是接收消息,这些动作都是通过 Channel 完成;
简单来说就是消息通过 Channel 塞进队列或者流出队列
h. Consumer
消费者,从消息队列中获取消息的主体
i. Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。
虚拟主机是共享相同的身份认证和加密环境的独立服务器域。
每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。
vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 /
可以理解为 db 中的数据库的概念,用于逻辑拆分
j. Broker
消息队列服务器实体
3. 消息投递消费
从前面的内部结构图可以知晓,消息由生产者发布到 Exchange,然后通过路由规则,分发到绑定 queue 上,供消费者获取消息
接下来我们看一下 Exchange 支持的四种策略
a. Direct 策略
消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中
简单来讲,就是
rounting key
与
binding key
完全匹配
- 如果一个队列绑定到交换机要求路由键为
dog
- 只转发
routing key
标记为dog
的消息, - 不会转发
dog.puppy
,也不会转发“dog.guard”等等 - 它是完全匹配、单播的模式
举例说明
Exchange 和两个队列绑定在一起:
- Q1 的 bindingkey 是 orange
- Q2 的 binding key 是 black 和 green.
- 当 Producer 发布一个消息,其
routing key
是orange
时, exchange 会把它放到 Q1 上, 如果是black
或green
就会到 Q2 上, 其余的 Message 被丢弃
注意
- 当有多个队列绑定到同一个 Exchange,且 binding key 相同时,这时消息会分发给所有满足条件的队列
b. Topic 策略
这个策略可以看成是 Direct 策略的升级版,通过
routing key
与
bingding key
的模式匹配方式来分发消息
简单来讲,直接策略是完全精确匹配,而 topic 则支持正则匹配,满足某类指定规则的(如以 xxx 开头的路由键),可以将消息分发过去
#
匹配 0 个或多个单词*
匹配不多不少一个单词
一个更直观的实例如下
Producer 发送消息时需要设置 routing_key,
- Q1 的 binding key 是
*.orange.*
- Q2 是
*.*.rabbit
和lazy.#
: - 发布一个
routing key
为test.orange.mm
消息,则会路由到 Q1;- 注意: 如果是routng key
是test.orange
则无法路由到 Q1,- 因为 Q1 的规则是三个单词,中间一个为 orange,不满足这个规则的都无效 - 发布一个
routing key
为test.qq.rabbit
或者lazy.qq
的消息 都可以分发到 Q2;即路由 key 为三个单词,最后一个为 rabbit 或者不限制单词个数,主要第一个是 lazy 的消息,都可以分发过来 - 如果发布的是一个
test.orange.rabbit
消息,则 Q1 和 Q2 都可以满足- 注意: 这时两个队列都会接受到这个消息
c. Fanout 策略
广播策略,忽略
routing key
和
binding key
,将消息分发给所有绑定在这个 exchange 上的 queue
d. Headers 策略
这个实际上用得不多,它是根据 Message 的一些头部信息来分发过滤 Message,忽略 routing key 的属性,如果 Header 信息和 message 消息的头信息相匹配
II. 消息一致性问题
在进入 rabbitmq 如何保证一致性之前,我们先得理解,什么是消息一致性?
1. 一致性问题
数据的一致性是什么[2]
按照我个人的粗浅理解,我认为的消息一致性,应该包含下面几个
- 生产者,确保消息发布成功- 消息不会丢- 顺序不会乱- 消息不会重复(如重传,导致发布一次,却出现多个消息)
- 消费者,确保消息消费成功- 有序消费- 不重复消费
发送端
为了确保发布者推送的消息不会丢失,我们需要消息持久化
- broker 持久化消息
为了确定消息正确接收
- publisher 需要知道消息投递并成功持久化
2. 持久化
这里的持久化,主要是指将内存中的消息保存到磁盘,避免 mq 宕机导致的内存中消息丢失;然而单纯的持久化,只是保证一致性的其中一个要素,比如 publisher 将消息发送到 exchange,在 broker 持久化的工程中,宕机了导致持久化失败,而 publisher 并不知道持久化失败,这个时候就会出现数据丢失,为了解决这个问题,rabbitmq 提供了事务机制
3. 事务机制
事务机制能够解决生产者与 broker 之间消息确认的问题,只有消息成功被 broker 接受,事务才能提交成功,否则就进行事务回滚操作并进行消息重发。但是使用事务机制会降低 RabbitMQ 的消息吞吐量,不适用于需要发布大量消息的业务场景。
注意,事务是同步的
4. 消息确认机制
RabbitMQ 学习(六)——消息确认机制(Confirm 模式)[3]
消息确认机制,可以区分为生产端和消费端
生产端
- 生产者将信道设置成 Confirm 模式,一旦信道进入 Confirm 模式,所有在该信道上面发布的消息都会被指派一个唯一的 ID(以 confirm.select 为基础从 1 开始计数),
- 一旦消息被投递到所有匹配的队列之后,Broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,
- 如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,
- Broker 回传给生产者的确认消息中 deliver-tag 域包含了确认消息的序列号(此外 Broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理)
Confirm 模式属性异步,publisher 发布一条消息之后,在等信道返回确认的同时,依然可以继续发送下一条消息,所以小概率会出现投递的消息顺序和 broker 中持久化消息顺序不一致的问题
一般从编程角度出发,Confirm 模式有三种姿势
- 普通 Confirm 模式:发送一条消息之后,等到服务器 confirm,然后再发布下一条消息(串行发布)
- 批量 Confirm 模式:发送一批消息之后,等到服务器 confirm,然后再发布下一批消息(如果失败,这一批消息全部重复,所以会有重复问题)
- 异步 Confirm 模式:提供一个回调方法,服务器 confirm 之后,触发回调方法,因此不会阻塞下一条消息的发送
消费端
ACK 机制是消费者从 RabbitMQ 收到消息并处理完成后,反馈给 RabbitMQ,RabbitMQ 收到反馈后才将此消息从队列中删除。
- 如果一个消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有 ACK 反馈,RabbitMQ 会认为这个消息没有正常消费,会将消息重新放入队列中
- 如果在集群的情况下,RabbitMQ 会立即将这个消息推送给这个在线的其他消费者。这种机制保证了在消费者服务端故障的时候,不丢失任何消息和任务
- 消息永远不会从 RabbitMQ 中删除,只有当消费者正确发送 ACK 反馈,RabbitMQ 确认收到后,消息才会从 RabbitMQ 服务器的数据中删除
III. 集群
按照目前的发展趋势,一个不支持集群的中间件基本上是不会有市场的;rabbitmq 也是支持集群的,下面简单的介绍一下常见的 4 种集群架构模式
以下内容来自网上博文,详情请点击右边:RabbitMQ 的 4 种集群架构[4]
1. 主备模式
这个属于常见的集群模式了,但又不太一样
主节点提供读写,备用节点不提供读写。如果主节点挂了,就切换到备用节点,原来的备用节点升级为主节点提供读写服务,当原来的主节点恢复运行后,原来的主节点就变成备用节点
2. 远程模式
远程模式可以实现双活的一种模式,简称 shovel 模式,所谓的 shovel 就是把消息进行不同数据中心的复制工作,可以跨地域的让两个 MQ 集群互联,远距离通信和复制。
- Shovel 就是我们可以把消息进行数据中心的复制工作,我们可以跨地域的让两个 MQ 集群互联。
如上图,有两个异地的 MQ 集群(可以是更多的集群),当用户在地区 1 这里下单了,系统发消息到 1 区的 MQ 服务器,发现 MQ 服务已超过设定的阈值,负载过高,这条消息就会被转到 地区 2 的 MQ 服务器上,由 2 区的去执行后面的业务逻辑,相当于分摊我们的服务压力。
3. 镜像模式
非常经典的 mirror 镜像模式,保证 100% 数据不丢失。在实际工作中也是用得最多的,并且实现非常的简单,一般互联网大厂都会构建这种镜像集群模式。
如上图,用 KeepAlived 做了 HA-Proxy 的高可用,然后有 3 个节点的 MQ 服务,消息发送到主节点上,主节点通过 mirror 队列把数据同步到其他的 MQ 节点,这样来实现其高可靠
4. 多活模式
也是实现异地数据复制的主流模式,因为 shovel 模式配置比较复杂,所以一般来说,实现异地集群的都是采用这种双活 或者 多活模型来实现的。这种模式需要依赖 rabbitMQ 的 federation 插件,可以实现持续的,可靠的 AMQP 数据通信,多活模式在实际配置与应用非常的简单
rabbitMQ 部署架构采用双中心模式(多中心),那么在两套(或多套)数据中心各部署一套 rabbitMQ 集群,各中心的 rabbitMQ 服务除了需要为业务提供正常的消息服务外,中心之间还需要实现部分队列消息共享。
federation 插件是一个不需要构建 cluster ,而在 brokers 之间传输消息的高性能插件,federation 插件可以在 brokers 或者 cluster 之间传输消息,连接的双方可以使用不同的 users 和 virtual hosts,双方也可以使用不同版本的 rabbitMQ 和 erlang。federation 插件使用 AMQP 协议通信,可以接受不连续的传输。federation 不是建立在集群上的,而是建立在单个节点上的,如图上黄色的 rabbit node 3 可以与绿色的 node1、node2、node3 中的任意一个利用 federation 插件进行数据同步。
版权归原作者 程序员xysam 所有, 如有侵权,请联系我们删除。