消息可靠性
生产者确认机制
提出问题:消息投递过程中,生产者——> MQ ——> 消费者 中间会出现消息丢失问题,导致信息没有及时同步
先梳理一下流程
生产者生产个消息 ——> 建立连接——>通道传递进mq交换机——>交换机传给队列——>消费者拉取数据消费
1.生产者生产完消息,相当于写好代码,写错了自己改,然后建立连接投递,连接建立不成功会再建立,这里不用操心,如果在投递过程中消息丢失了,生产者发送了,消费者没收到,这要是情侣铁定闹分手,有可能是网络波动造成的也有可能没有给了一个不存在的交换机,所以mq官方需要处理这些问题,这就交给他们了,但是我们要了解其中的处理方案
发现问题:消息没到交换机或者到了交换机没到队列,这两种情况基本一致,因为交换机没有存储信息能力
理论解决方案:没到mq的队列就丢失了,连给队列持久化保存到硬盘的机会都没得,那就只能重新传一份。
mq的实践解决方案:生产者确认机制,首先需要给每个信息绑定全局唯一的ID,信息可能有上百万条,谁知道哪个丢失了,然后生产者发生了一个带有ID的信息,连接一旦建立,交换机这边是否接收到了这条ID,接收到了给生产者发一个ack,让他先别发了,没接收到也得发一个nack,如果路由到队列后,则发生一个回执ack,说我任务完成了
消息持久化
前面的生产者确认机制已经确保消息可靠的传输到队列,如果RabbitMQ宕机的话,依旧会导致小心丢失,由于MQ是内存存储,所以需要持久化保存到硬盘
这里面涉及到了三个持久化
交换机持久化,队列持久化,消息持久化
1.交换机默认是非持久化的,mq重启就会丢失,SpringAMQP中可以通过代码指定交换机持久化,默认情况下,由SpringAMQP声明的交换机都是持久化的。
@Bean public DirectExchange simpleExchange(){ // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 return new DirectExchange("simple.direct", true, false); }
2.队列持久化
队列默认是非持久化的,mq重启就会丢失,SpringAMQP中可以通过代码指定队列持久化,默认情况下,由SpringAMQP声明的队列都是持久化的。
@Bean public Queue simpleQueue(){ // 使用QueueBuilder构建队列,durable就是持久化的 return QueueBuilder.durable("simple.queue").build(); }
3.消息持久化
消息持久化是把生产者生产的信息需要持久化,保存到硬盘里
默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意指定。
消费者消息确认
RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。
而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。
这里面涉及一点,消息是存储在MQ当中,消费者拉取完之后,MQ删除消息的时机是哪,如果消费者拉取完之后,没消费呢,自己挂了,又怎么办
结合以上两点,最可靠的就是消费者消费完了,通知一声我完事了,然后MQ把消息删除,防止占用内存
MQ提供了三种机制
- manual:手动ack,需要在业务代码结束后,调用api发送ack。
- auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
- none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除
很显然none模式下,消费者获取完消息之后,服务一旦宕机,消息也就丢失了,除非能确保自己的服务非常稳定才使用
auto模式类似于事务的回滚,消息一旦长时间没被消费会抛出异常,导致回滚,MQ消息不会删除
消费失败重试机制
当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:
解决方案:本地重试
可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
修改consumer服务的application.yml文件,添加内容:
spring: rabbitmq: listener: simple: retry: enabled: true # 开启消费者失败重试 initial-interval: 1000 # 初识的失败等待时长为1秒 multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval max-attempts: 3 # 最大重试次数 stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
重启消费者服务,重复之前的测试。可以发现:
- 在重试3次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException,说明本地重试触发了
- 查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是ack,mq删除消息了
结论:
- 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
- 重试达到最大次数后,Spring会返回ack,消息会被丢弃
失败策略
在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。这个队列就是死信队列
Rabbitmq的可靠性保证
通过以下四点保证
- 开启生产者确认机制,确保生产者的消息能到达队列
- 开启持久化功能,确保消息未消费前在队列中不会丢失
- 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
- 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理
版权归原作者 沉默是缙 所有, 如有侵权,请联系我们删除。