RabbitMQ 高级特性
5.1 消息可靠性🔥🔥🔥
消息从发送,到消费者接收,会经理多个过程,如下所示:
其中的每一步都可能导致消息丢失,常见的丢失原因包括:
- 发送时丢失: - 生产者发送的消息未送达 exchange- 消息到达 exchange 后未到达 queue
- MQ 宕机,queue 将消息丢失
- consumer 接收到消息后未消费就宕机
针对这些问题,RabbitMQ 分别给出了解决方案:
- 生产者确认机制
- mq 持久化
- 消费者确认机制
- 失败重试机制
我们以一个 Demo 进行演示:
5.1.1 生产者消息确认
RabbitMQ 提供了 publisher confirm 机制来避免消息发送到 MQ 过程中丢失。这种机制必须给每个消息指定一个唯一ID。消息发送到 MQ 以后,会返回一个结果给发送者,表示消息是否处理成功。
返回结果有两种方式:
- publisher-confirm,发送者确认- 消息成功投递到交换机,返回 ack- 消息未投递到交换机,返回 nack
- publisher-return,发送者回执- 消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。
注意:确认机制发送消息时,需要给每个消息设置一个全局唯一 id,以区分不同消息,避免 ack 冲突
修改 publisher 服务中的配置:
spring:rabbitmq:publisher-confirm-type: correlated
publisher-returns:truetemplate:mandatory:true
说明:
- publish-confirm-type:开启 publisher-confirm,这里支持两种类型: - simple:同步等待 confirm 结果,直到超时- correlated:异步回调,定义 ConfirmCallback ,MQ 返回结果时会回调这个 ConfirmCallback
- publish-returns:开启 publish-return 功能,同样是基于callback机制,不过是定义ReturnCallback
- template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
定义回调函数
// 设置发送者确认回调函数
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* @param correlationData 自定义的数据 一般是消息的 UUID
* @param b 是否确认 true:消息发送到 exchange 中 false:消息未发送到 exchange 中
* @param s 原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
log.info("发送确认回调触发 消息的ID===> {}", correlationData.getId());
if (b) {
log.info("消息成功发送到交换机中!!!");
} else {
log.error("消息发送到交换机中失败!!!,原因:{}", s);
// 可以重发
}
}
});
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 只要这个方法被调用,代表消息没能正确路由到队列,被 mq 返还回来了
* @param message 返回的消息
* @param i 回复状态码
* @param s 回复内容
* @param s1 交换机
* @param s2 路由 key
*/
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
i, s, s1, s2, message.toString());
// 如果有业务需要,可以重发
}
});
5.1.2 消息持久化
生产者确认可以确保消息投递到RabbitMQ的队列中,但是消息发送到RabbitMQ以后,如果突然宕机,也可能导致消息丢失。
要想确保消息在RabbitMQ中安全保存,必须开启消息持久化机制。
- 交换机持久化
- 队列持久化
- 消息持久化
默认情况下,由 SpringAMQP 声明的交换机都是持久化的
@BeanpublicFanoutExchangefanoutExchange(){// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除returnnewFanoutExchange("fanout.exchange",true,false);}
由 SpringAMQP 声明的队列都是持久化的
@BeanpublicQueuequeue(){returnnewQueue("fanout.queue");}
利用 SpringAMQP 发送消息时,可以设置消息的属性(MessageProperties),指定 delivery-mode:
- 1:非持久化
- 2:持久化
默认情况下,SpringAMQP 发出的任何消息都是持久化的,不用特意指定
@TestpublicvoidtestSendDurableMessage()throwsInterruptedException{// 1.消息体Message message =MessageBuilder.withBody("hello, spring amqp!".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();// 2.发送消息
rabbitTemplate.convertAndSend("simple.queue", message);}
5.1.3 消费者消息确认
设想这样的场景:
- RabbitMQ 投递消息给消费者
- 消费者获取消息后,返回 ACK 给 RabbitMQ
- RabbitMQ 删除消息
- 消费者宕机,消息尚未处理
这样,消息就丢失了。因此消费者返回 ACK 的时机非常重要
而 SpringAMQP 则允许配置三种确认模式:
- manual:手动 ack,需要在业务代码结束后,调用 ap i发送 ack。
- auto:自动 ack,由 spring 监测 listener 代码是否出现异常,没有异常则返回 ack;抛出异常则返回nack
- none:关闭 ack,MQ 假定消费者获取消息后会成功处理,因此消息投递后立即被删除
由此可知:
- none模式下,消息投递是不可靠的,可能丢失
- auto模式类似事务机制,出现异常时返回nack,消息回滚到mq;没有异常,返回ack
- manual:自己根据业务情况,判断什么时候该ack
一般,我们都是使用默认的auto即可
手动ack:
spring:rabbitmq:listener:simple:acknowledge-mode: manual # 手动ack
@RabbitListener(
bindings ={@QueueBinding(
value =@Queue,
exchange =@Exchange(value ="boot-topic-exchange",type ="topic"),
key ={"black.*.#"})})publicvoidgetMessage3(String msg,Channel channel,Message message)throwsIOException{System.out.println("接收到消息3:"+ msg);// 手动 ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}
auto 模式
首先我们修改消费者的 yml 配置文件
spring:rabbitmq:listener:simple:acknowledge-mode: auto # 自动ack
在异常位置打断点,再次发送消息,程序卡在断点时,可以发现此时消息状态为unack(未确定状态)
抛出异常后,因为Spring会自动返回nack,所以消息恢复至Ready状态,并且没有被RabbitMQ删除:
5.1.4 消费失败重试机制
当消费者出现异常后,消息会不断 requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次 requeue,无限循环,导致 mq 的消息处理飙升,带来不必要的压力,我们怎么办?
我们可以利用 Spring 的 retry 机制(本地重试),在消费者出现异常时利用本地重试,而不是无限制的 requeue 到 mq 队列。
修改 consumer 服务的 application.yml 文件,添加内容
spring:rabbitmq:listener:simple:retry:enabled:true# 开启消费者失败重试initial-interval: 1000ms # 初始的失败等待时长为1秒multiplier:1# 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts:3# 最大重试次数stateless:true# true无状态;false有状态。如果业务中包含事务,这里改为false
重启 consumer 服务,重复之前的测试。可以发现:
- 在重试3次后,SpringAMQP会抛出异常 AmqpRejectAndDontRequeueException,说明本地重试触发了
- 查看 RabbitMQ 控制台,发现消息被删除了,说明最后 SpringAMQP 返回的是ack,mq删除消息了
由上述的发现可得知,开启本地重试后,最终消息还是会丢失,这个我们需要怎么解决?
这个时候我们可以自定义失败策略
在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由 Spring 内部机制决定的,
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有 MessageRecovery 接口来处理,它包含三种不同的实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject,丢弃消息。默认就是这种方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack,消息重新入队,这种也类似无限循环
- ** RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机 **
比较优雅的一种处理方案是 RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。
在 consumer 中定义处理失败消息的交换机和队列
@BeanpublicDirectExchangeerrorMessageExchange(){returnnewDirectExchange("error.direct");}@BeanpublicQueueerrorQueue(){returnnewQueue("error.queue",true);}@BeanpublicBindingerrorBinding(Queue errorQueue,DirectExchange errorMessageExchange){returnBindingBuilder.bind(errorQueue).to(errorMessageExchange).with("error");}
定义一个 RepublishMessageRecoverer,关联队列和交换机
@BeanpublicMessageRecovererrepublishMessageRecoverer(RabbitTemplate rabbitTemplate){returnnewRepublishMessageRecoverer(rabbitTemplate,"error.direct","error");}
5.2 死信交换机
5.2.1 什么是死信交换机
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为false
- 消息是一个过期消息,超时无人消费
- 要投递的队列消息满了,无法投递
如果这个包含死信的队列配置了
dead-letter-exchange
属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,检查DLX)
另外,队列将死信投递给死信交换机时,必须知道两个信息:
- 死信交换机名称
- 死信交换机与死信队列绑定的RoutingKey
这样才能确保投递的消息能到达死信交换机,并且正确的路由到死信队列
在失败重试策略中,默认的 RejectAndDontRequeueRecoverer 会在本地重试次数耗尽后,发送 reject 给RabbitMQ,消息变成死信,被丢弃。
我们可以给 simple.queue 添加一个死信交换机,给死信交换机绑定一个队列。这样消息变成死信后也不会丢弃,而是最终投递到死信交换机,路由到与死信交换机绑定的队列,这也是一种防止信息丢失的方法,不过我们经常用的是失败策略,而不用死信交换机,原因是因为配置麻烦
@BeanpublicQueuesimpleQueue(){// 配置死信交换机returnQueueBuilder.durable("simple.queue").deadLetterExchange("dl.exchange").deadLetterRoutingKey("dl").build();}@BeanpublicQueuedlQueue(){returnnewQueue("dl.queue");}@BeanpublicDirectExchangedlExchange(){returnnewDirectExchange("dl.exchange");}@BeanpublicBindingdlBinding(){returnBindingBuilder.bind(dlQueue()).to(dlExchange()).with("dl");}
5.2.2 TTL
一个队列中的消息如果超时未消费,则会变为死信,超时分为两种情况
- 消息所在的队列设置了超时时间
- 消息本身设置了超时时间
/**
* 基于注解方式声明一组死信交换机和队列
*/@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="dl.queue"),
exchange =@Exchange(name ="dl.direct"),
key ="dl"))publicvoidlistenDlQueue(String msg){
log.info("接收到 ttl.queue的延迟消息:{}", msg);}@BeanpublicDirectExchangettlExchange(){returnnewDirectExchange("ttl.direct");}@BeanpublicQueuettlQueue(){returnQueueBuilder.durable("ttl.queue").ttl(10000)// 设置队列的超时时间 10s.deadLetterExchange("dl.direct")// 指定死信交换机.deadLetterRoutingKey("dl")// 指定死信 RoutingKey.build();}@BeanpublicBindingttlBinding(){returnBindingBuilder.bind(ttlQueue()).to(ttlExchange()).with("ttl");}@TestpublicvoidtestTTLMsg(){// 创建消息Message message =MessageBuilder.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8)).setExpiration("5000")// 设置消息的过期时间 5s.build();CorrelationData correlationData =newCorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("ttl.direct","ttl", message, correlationData);}
总结:
消息超时的两种方式是?
- 给队列设置ttl属性,进入队列后超过ttl时间的消息变为死信
- 给消息设置ttl属性,队列接收到消息超过ttl时间后变为死信
如何实现发送一个消息20秒后消费者才收到消息?
- 给消息的目标队列指定死信交换机
- 将消费者监听的队列绑定到死信交换机
- 发送消息时给消息设置超时时间为20秒
5.2.3 延迟队列
上面的死信交换机和 TTL 在我们项目中一般不使用,我们一般使用延迟队列来进行实现延迟发送效果
因为延迟队列的需求非常多,所以RabbitMQ的官方也推出了一个插件,原生支持延迟队列效果。
这个插件就是DelayExchange插件。参考RabbitMQ的插件列表页面:https://www.rabbitmq.com/community-plugins.html
在使用的时候我们首先需要安装,这里不再演示
DelayExchange 原理
DelayExchange需要将一个交换机声明为 delayed 类型。当我们发送消息到 delayExchange 时,流程如下:
- 接收消息
- 判断消息是否具备 x-delay 属性
- 如果有 x-delay 属性,说明是延迟消息,持久化到硬盘,读取 x-delay 值,作为延迟时间
- 返回routing not found结果给消息发送者
- x-delay 时间到期后,重新投递消息到指定队列
使用 DelayExchange
- 首先声明 DelayExchange 交换机
注解声明(推荐)
@RabbitListener(bindings =@QueueBinding(
value =@Queue("delay.queue"),// 队列
exchange =@Exchange(value ="delay.direct", delayed ="true"),// Dealay 交换机
key ="delay"))publicvoidlistenDelayQueue(String msg){
log.info("接收到 delay.queue的延迟消息:{}", msg);}
基于 Bean 方式
- 发送消息
发送消息时,一定要携带 x-delay 属性,指定延迟的时间:
@TestpublicvoidtestDelayMsg(){// 创建消息Message message =MessageBuilder.withBody("hello, ttl message".getBytes(StandardCharsets.UTF_8)).setHeader("x-delay",10000).build();CorrelationData correlationData =newCorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("delay.direct","delay", message, correlationData);}
5.3 惰性队列
5.3.1 消息堆积问题🔥🔥🔥
当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。之后发送的消息就会成为死信,可能会被丢弃,这就是消息堆积问题
解决消息堆积有三种种思路:
- 增加更多消费者,提高消费速度
- 在消费者内开启线程池加快消息处理速度
- 扩大队列容积,提高堆积上限
5.3.2 惰性队列
从 RabbitMQ 的 3.6.0 版本开始,就增加了 Lazy Queues 的概念,也就是惰性队列。惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
惰性队列的优点有哪些?
- 基于磁盘存储,消息上限高
- 没有间歇性的page-out,性能比较稳定
惰性队列的缺点有哪些?
- 基于磁盘存储,消息时效性会降低
- 性能受限于磁盘IO
基于命令行设置lazy-queue
而要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。可以通过命令行将一个运行中的队列修改为惰性队列:
rabbitmqctl set_policy Lazy "^simple.queue$"'{"queue-mode":"lazy"}' --apply-to queues
命令解读:
rabbitmqctl
:RabbitMQ的命令行工具set_policy
:添加一个策略Lazy
:策略名称,可以自定义"^lazy-queue$"
:用正则表达式匹配队列的名字'{"queue-mode":"lazy"}'
:设置队列模式为lazy模式--apply-to queues
:策略的作用对象,是所有的队列
基于@Bean声明lazy-queue
@BeanpublicQueuelazyQueue(){returnQueueBuilder.durable("lazy.queue").lazy()// 开启 x-queue-mode 为 lazy.build();}
基于@RabbitListener声明LazyQueue
@RabbitListener(queuesToDeclare =@Queue(
value ="lazy.queue",
durable ="true",
arguments =@Argument(name ="x-queue-mode", value ="lazy")// 惰性队列))publicvoidlistenLazyQueue(String msg){
log.info("接收到 lazy.queue的消息:{}", msg);}
5.4 MQ 集群
集群搭建,这部分一般开发不会搭建,而是运维搭建,了解即可,但是我们需要知道 MQ 的集群以及特点
RabbitMQ的是基于Erlang语言编写,而Erlang又是一个面向并发的语言,天然支持集群模式。RabbitMQ的集群有两种模式:
- 普通模式:普通模式集群提高了并发能力,但是不进行数据同步,每个MQ都有自己的队列、数据信息(其它元数据信息如交换机等会同步)。例如我们有2个MQ:mq1,和mq2,如果你的消息在mq1,而你连接到了mq2,那么mq2会去mq1拉取消息,然后返回给你。如果mq1宕机,消息就会丢失。
- 镜像模式:与普通模式不同,队列会在各个mq的镜像节点之间同步,因此你连接到任何一个镜像节点,均可获取到消息,提高了数据的可用性。而且如果一个节点宕机,并不会导致数据丢失。不过,这种方式增加了数据同步的带宽消耗。
镜像集群虽然支持主从,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险。因此在RabbitMQ的3.8版本以后,推出了新的功能:仲裁队列来代替镜像集群,底层采用Raft协议确保主从的数据一致性。
5.4.1 普通集群
普通集群,或者叫标准集群(classic cluster),具备下列特征
- 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息。不包含队列中的消息
- 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
- 队列所在节点宕机,队列中的消息就会丢失
5.4.2 镜像集群
在普通集群中,一旦创建队列的主机宕机,队列就会不可用。不具备高可用能力。如果要解决这个问题,必须使用官方提供的镜像集群方案
官方文档地址:https://www.rabbitmq.com/ha.html
镜像集群:本质是主从模式,具备下面的特征
- 交换机、队列、队列中的消息会在各个mq的镜像节点之间同步备份。
- 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点。
- 一个队列的主节点可能是另一个队列的镜像节点
- 所有操作都是主节点完成,然后同步给镜像节点
- 主宕机后,镜像节点会替代成新的主节点(如果在主从同步完成前,主就已经宕机,可能出现数据丢失)
5.4.3 仲裁队列
仲裁队列:仲裁队列是3.8版本以后才有的新功能,用来替代镜像队列,具备下列特征
- 与镜像队列一样,都是主从模式,支持主从数据同步
- 使用非常简单,没有复杂的配置
- 主从同步基于Raft协议,强一致
用 Java 代码创建仲裁队列
@BeanpublicQueuequorumQueue(){returnQueueBuilder.durable("quorum.queue")// 持久化.quorum()// 仲裁队列.build();}
SpringAMQP 连接 MQ 集群
spring:rabbitmq:addresses: 192.168.80.128:8071, 192.168.80.128:8072, 192.168.80.128:8073username: muziteng
password:806823virtual-host: /
注意,这里用 address 来代替 host、port 方式
更多知识在我的语雀知识库:https://www.yuque.com/ambition-bcpii/muziteng
版权归原作者 lili要努力 所有, 如有侵权,请联系我们删除。