0


RabbitMQ详解

RabbitMQ详解

文章目录

一、AMQP 和 JMS

1.1 JMS

JMS 是由 Sun 公司早期提出的消息标准,是一个 Java 平台中关于面向消息中间件的 API,旨在为 Java 应用提供统一的消息操作。

它已经成为 Java EE(Java Enterprise Edition)的一部分。其担任的角色类似于 JDBC,开发人员只需要根据相应的接口就可以和实现了JMS的服务进行通信。

JMS包含的角色如下:
RolesDescriptionProvider(提供者)实现了 JMS 接口的消息服务器Client(客户端)生产或消费消息的应用Producer(生产者)消息生产者Consumer(消费者)消息消费者Message(消息)生产者和消费者传递的数据内容Queue(队列)生产者存在待消费的消息的地方Topic(主题)发布订阅模式下使用的 topic 模式
JMS 提供了两种消息模型,分别是 P2P(点对点)和 P/S(发布订阅)模型。

P2P
在这里插入图片描述

  • 使用队列作为消息通信载体
  • 每个队列可以有多个消费者,但每条消息只有一个消费者,即消息一旦被消费就被移出队列

P/S

  • 使用主题作为消息通信载体
  • 发布者发布一条消息,消息会通过主题传递给所有的订阅者,类似于广播。在这里插入图片描述

1.2 AMQP

AMQP(Advanced message queuing protocol)是一种协议,更准确的说是一种 binary wire-level protocol(链接协议)。这是其和 JMS 的本质差别,AMQP 不从 API 层进行限定,而是直接定义网络交换的数据格式。

其协议使得实现了 AMQP 的 Provider 天然性就是跨平台的。意味着我们可以使用 Java 的 AMQP Provider的同时使用一个 Python 的 Producer。可以将其与 Http 进行类比,不关心实现的语言,只要按照约定的数据格式发送报文,不同语言的 Client 和 Server 可以无障碍连接。

AMQP 在消息的生产者以及传递信息的队列之间引入了一种间接的机制:Exchange,实现了生产者和队列的解耦。生产者将信息发布到一个 Exchange 上,Exchange 会绑定一个或多个队列上,它负责将信息路由到队列上。

AMQP 包含的角色如下:
RolesDescriptionPublisher(生产者)消息发送端Consumer(消费者)消息接收端Connection(连接)一个 TCP 长连接,生产者和消费者先与 RabbitMQ 建立连接Channel(信道)在 Connection 的基础上建立的虚拟连接,解决多线程问题Broker(中间件)实现 AMQP 实体服务,如 RabbitMQVirtual Host(虚拟主机)不同用户访问同一个 Broker 时,可以创建自己的 Virtual HostExchange(交换机)根据不同的分发规则将消息分发到不同的 QueueQueue(队列)用来存放消息的队列Binding(绑定)Exchange 和 Queue之间的关联,取两者的多对多关系
AMQP 四种不同的 Exchange
Exchange概念Direct Exchange消息的 routing key 与 Binding 的 routing key 直接匹配时,消息到此队列Topic Exchange消息的 routing key 与 Binding 的 routing key 符合通配符匹配时,消息到此队列Headers Exchange消息的 headers 与 Binding 的 headers 符合匹配时,消息到此队列Fanout Exchange消息会到所有队列上,无论消息的 routing key 和 headers 是什么
在这里插入图片描述

1.3 两者对比

JMSAMQP定义Java API协议跨语言否是跨平台否是传输类型P2P(端到端)和P/S(发布订阅)direct exchange
fanout exchange
topic change
headers exchange消息类型StreamMessage(Java原始值的数据流)
MapMessage(一套名称-值对)
TextMessage(一个字符串对象)
ObjectMessage(一个序列化的 Java对象)
BytesMessage(一个字节的数据流)
Message (只有消息头和属性)byte[](二进制)实现ActiveMQRabbitMQ

二、RabbitMQ消息模型

2.1 基本消息模型

在这里插入图片描述
生产者将消息发送到队列,消费者从队列中获取消息,队列是存储消息的缓冲区。

队列只能存储在队列中,它只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。多个生产者可以发消息到同一个队列,多个消费者可以从一个队列接收数据。

消息一旦被消费者接收,队列中的消息就会被删除。如果消费者领取消息后,还没执行操作就挂掉了呢?或者抛出了异常?消息消费失败,但是 RabbitMQ 无从得知,这样消息就丢失了!因此,RabbitMQ 有一个ACK 机制。当消费者获取消息后,会向 RabbitMQ 发送回执 ACK,告知消息已经被接收。不过这种回执ACK 分两种情况:

自动 ACK:消息一旦被接收,消费者自动发送 ACK

手动 ACK:消息接收后,不会发送 ACK,需要手动调用

注意:

虽然图中没有出现 Exchange(交换机),但是本质上信息还是通过交换机传递的。

在不声明交换机的情况下,RabbitMQ 使用的是默认的交换机

2.2 工作消息模型

在这里插入图片描述

工作消息模型主要思想就是避免执行资源密集型任务时,必须等待它执行完成。

一个生产发送消息到队列,允许有多个消费者接收消息,但是一条消息只会被一个消费者获取。

如果多个消费者处理消息的时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置 prefetchCount 来限制 Queue 每次发送给每个消费者的消息数,比如我们设置 prefetchCount=1,则 Queue 每次给每个消费者发送一条消息;消费者处理完这条消息后 Queue 会再给该消费者发送一条消息。

2.3 订阅模型

2.3.1 Fanout 订阅模型

Fanout 又称广播。

在广播模式下:

  • 消费者监听自己的队列
  • 每个队列都有绑定的交换机
  • 生产者只声明交换机,发送的消息也只发送到交换机
  • 交换机把消息发送给绑定到该交换机下的所有队列
  • 消费者绑定队列和交换机时不再定义路由,只要和监听队列就会收到消息在这里插入图片描述

2.3.2 Direct 订阅模型

相较于广播模型,Direct 模型添加一个功能:只能订阅一部分消息。

例如在某些场景下,多个消费者需要分别监听生产者发布的部分消息,这时就需要 Direct 类型的交换机。

在该模型下,队形与交换机的绑定必须要指定一个 routing key(路由),同时生产者在向交换机发送消息时也必须指定消息的 routing key。

交换机收到消息之后会获取消息的路由,并在与自己绑定的队列中寻找绑定路由与消息的路由完全一致的队列,将消息传递给该队列。
在这里插入图片描述

2.3.3 Topic 订阅模型

相较于 Direct 模型,Topic 模型添加一个功能:更细致化的路由匹配规则。

路由一般是由一个或多个单词组成,单词之间以 “.” 分割。例如 good.insert、good.insert.success 等。

通配符规则:

‘#’:匹配一个或多个单词

‘*’:只能匹配一个单词

在这里插入图片描述

三、消息持久化

消息者的 ACK 机制可以防止消费者丢失消息。但是如果消息还没有被消息时 MQ 服务宕机了怎么办呢?

RabbitMQ 支持消息持久化,而将消息持仓化就意味着交换机和队列都要持久化。

3.1 交换机持久化

/**
 * 声明交换机
 *
 * @param exchange   交换机名
 * @param type       交换机类型 DIRECT、FANOUT、TOPIC、HEADERS
 * @param durable    是否持久化
 * @param autoDelete 自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除
 * @param internal   是否被 RabbitMQ 内部使用
 * @param arguments  自定义参数
 * @return
 */DeclareOkexchangeDeclare(String exchange,BuiltinExchangeType type,boolean durable,boolean autoDelete,boolean internal,Map<String,Object> arguments);

参数解析:boolean internal 是否被 RabbitMQ 内部使用

Internal 的意思是内部的意思,在交换机这里设置为“Yes”之后,表示当前 Exchange 是 RabbitMQ 内部使用,用户所创建的 Queue 不会消费该类型交换机下的消息,既然是为了 RabbitMQ 系统所用,作为用户,我们就没有必要创建该类型的 Exchange,当然默认也是选择 No。

如果对底层 MQ 进行封装的时候可以使用。

参数解析:Map<String, Object> arguments 自定义参数

key 如下,value可以自己定义:

  • x-message-ttl 发送到队列的消息在丢弃之前可以存活多长时间(毫秒)。
  • x-expires 队列在被自动删除(毫秒)之前可以使用多长时间。
  • x-max-length 队列在开始从头部删除之前可以包含多少就绪消息。
  • x-max-length-bytes 队列在开始从头部删除之前可以包含的就绪消息的总体大小。
  • x-dead-letter-exchange 设置队列溢出行为。这决定了在达到队列的最大长度时消息会发生什么。有效值为drop-head或reject-publish。交换的可选名称,如果消息被拒绝或过期,将重新发布这些名称。
  • x-dead-letter-routing-key 可选的替换路由密钥,用于在消息以字母为单位时使用。如果未设置,将使用消息的原始路由密钥。
  • x-max-priority 队列支持的最大优先级数;如果未设置,队列将不支持消息优先级。
  • x-queue-mode 将队列设置为延迟模式,在磁盘上保留尽可能多的消息以减少内存使用;如果未设置,队列将保留内存缓存以尽快传递消息。
  • x-queue-master-locator 将队列设置为主位置模式,确定在节点集群上声明时队列主机所在的规则。

3.2 队列持久化

/**
 * 声明队列
 *
 * @param queue      队列名
 * @param durable    是否持久化
 * @param exclusive  排他队列
 * @param autoDelete 自动删除
 * @param arguments  自定义参数
 * @return
 */DeclareOkqueueDeclare(String queue,boolean durable,boolean exclusive,boolean autoDelete,Map<String,Object> arguments);

参数解析:boolean exclusive 排他队列

如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。

  • 排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个连接创 建的排他队列的。
  • “首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的 排他队列的,这个与普通队列不同。
  • 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动 删除的。

这种队列适用于只限于一个客户端发送读取消息的应用场景。

参数解析:Map<String, Object> arguments 自定义参数

  • x-max-length:消息条数限制,该参数是非负整数值。先进先出原则,超过10条后面的消息会顶替前面的消息。
  • x-max-length-bytes:消息容量限制,该参数是非负整数值。该参数和 x-max-length 目的一样限制队列的容量,但是这个是靠队列大小(bytes)来达到限制。
  • x-message-ttl:消息存活时间,该参数是非负整数值。创建 Queue 时设置该参数可指定消息在该 Queue 中待多久,可根据 x-dead-letter-routing-key 和 x-dead-letter-exchange 生成可延迟的死信队列。
  • x-max-priority:消息优先级,创建 Queue 时声明优先级队列 。该参数应该是一个整数,表示队列应该支持的最大优先级。建议使用1到10之间。目前使用更多的优先级将消耗更多的资源(Erlang进程)。
  • x-expires:存活时间,该 Queue 会在 x-expires 到期后直接消息,哪怕里面有未消费的消息。
  • x-dead-letter-exchange 和 x-dead-letter-routing-key:在 x-message-ttl 时间到期后把消息放到x-dead-letter-routing-key 和 x-dead-letter-exchange 指定的队列中达到延迟队列的目的。

3.3 签收机制

3.3.1 生产者签收机制

RabbitMQ中对生产者提供了两种签收机制:事务机制和 Confirm 机制。

事务机制

RabbitMQ 中与事务机制有关的方法有三个:txSelect(),txCommit() 以及 txRollback()。

  • txSelect 用于将当前 channel 设置成 transaction 模式
  • txCommit 用于提交事务
  • txRollback 用于回滚事

在通过 txSelect 开启事务之后,我们便可以发布消息给 broker 代理服务器了,如果
txCommit 提交成功了,则消息一定到达了 broker 了,如果在 txCommit 执行之前 broker
异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过 txRollback 回滚事务了。

代码如下:

try{// 开启事务模式
    channel.txSelect();// 发送消息
    channel.basicPublish();// 模拟异常int num =1/0;// 事务提交
    channel.txCommit();}catch(Exception e){// 事务回滚
    channel.txRollback();
    e.printStackTrack();}
Confirm 机制

通过将 Channel 设置成 Confirm 模式来实现。需要注意,事务方式和 Confirm 两种模式不能共存。

生产者将信道设置成 Confirm 模式,一旦信道进入 Confirm 模式,所有在该信道上面发布的消息都将会被指派一个唯一的 ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了。

如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出。

broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。

在编程中我们可以选择下面的几种编程方式:

  • 普通 confirm 模式:每发送一条消息后,调用 waitForConfirms() 方法,等待服务器端 confirm。实际上是一种串行 confirm 了。
  • 批量 confirm 模式:每发送一批消息后,调用 waitForConfirms() 方法,等待服务器端confirm。一旦出现 confirm 返回 false 或者超时的情况时,客户端需要将这一批次的消息全部重发。
  • 异步 confirm 模式:提供一个回调方法,服务端 confirm 了一条或者多条消息后 Client 端会回调这个方法。

3.3.2 消费者签收机制

对于消费者的消息签收有有自动和手动两种:自动签收和手动签收。

自动签收

不会管消息的处理即其它问题。类似于签收快递,自动签收则是将快递放入快递柜,至于后续,不管你是否接收,以及快递是否有问题(程序执行不通过),都不会有所反应。

手动签收

是由程序确认是否签收。即自己本人签收快递,即细致检查(程序处理)后,如果没有问题,则会确定签收,若有问题,可以拒签。


本文转载自: https://blog.csdn.net/qq_42647711/article/details/127881662
版权归原作者 蓝带915 所有, 如有侵权,请联系我们删除。

“RabbitMQ详解”的评论:

还没有评论