0


RabbitMQ服务异步通信-高级篇

消息可靠性

生产者确认机制

提出问题:消息投递过程中,生产者——> MQ ——> 消费者 中间会出现消息丢失问题,导致信息没有及时同步

先梳理一下流程

生产者生产个消息 ——> 建立连接——>通道传递进mq交换机——>交换机传给队列——>消费者拉取数据消费

1.生产者生产完消息,相当于写好代码,写错了自己改,然后建立连接投递,连接建立不成功会再建立,这里不用操心,如果在投递过程中消息丢失了,生产者发送了,消费者没收到,这要是情侣铁定闹分手,有可能是网络波动造成的也有可能没有给了一个不存在的交换机,所以mq官方需要处理这些问题,这就交给他们了,但是我们要了解其中的处理方案

发现问题:消息没到交换机或者到了交换机没到队列,这两种情况基本一致,因为交换机没有存储信息能力

理论解决方案:没到mq的队列就丢失了,连给队列持久化保存到硬盘的机会都没得,那就只能重新传一份。

mq的实践解决方案:生产者确认机制,首先需要给每个信息绑定全局唯一的ID,信息可能有上百万条,谁知道哪个丢失了,然后生产者发生了一个带有ID的信息,连接一旦建立,交换机这边是否接收到了这条ID,接收到了给生产者发一个ack,让他先别发了,没接收到也得发一个nack,如果路由到队列后,则发生一个回执ack,说我任务完成了

消息持久化

前面的生产者确认机制已经确保消息可靠的传输到队列,如果RabbitMQ宕机的话,依旧会导致小心丢失,由于MQ是内存存储,所以需要持久化保存到硬盘

这里面涉及到了三个持久化

交换机持久化,队列持久化,消息持久化

1.交换机默认是非持久化的,mq重启就会丢失,SpringAMQP中可以通过代码指定交换机持久化,默认情况下,由SpringAMQP声明的交换机都是持久化的。

@Bean
public DirectExchange simpleExchange(){
    // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
    return new DirectExchange("simple.direct", true, false);
}

2.队列持久化

队列默认是非持久化的,mq重启就会丢失,SpringAMQP中可以通过代码指定队列持久化,默认情况下,由SpringAMQP声明的队列都是持久化的。

@Bean
public Queue simpleQueue(){
    // 使用QueueBuilder构建队列,durable就是持久化的
    return QueueBuilder.durable("simple.queue").build();
}

3.消息持久化

消息持久化是把生产者生产的信息需要持久化,保存到硬盘里

默认情况下,SpringAMQP发出的任何消息都是持久化的,不用特意指定。

消费者消息确认

RabbitMQ是阅后即焚机制,RabbitMQ确认消息被消费者消费后会立刻删除。

而RabbitMQ是通过消费者回执来确认消费者是否成功处理消息的:消费者获取消息后,应该向RabbitMQ发送ACK回执,表明自己已经处理消息。

这里面涉及一点,消息是存储在MQ当中,消费者拉取完之后,MQ删除消息的时机是哪,如果消费者拉取完之后,没消费呢,自己挂了,又怎么办

结合以上两点,最可靠的就是消费者消费完了,通知一声我完事了,然后MQ把消息删除,防止占用内存

MQ提供了三种机制

  • manual:手动ack,需要在业务代码结束后,调用api发送ack。
  • auto:自动ack,由spring监测listener代码是否出现异常,没有异常则返回ack;抛出异常则返回nack
  • none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除

很显然none模式下,消费者获取完消息之后,服务一旦宕机,消息也就丢失了,除非能确保自己的服务非常稳定才使用

auto模式类似于事务的回滚,消息一旦长时间没被消费会抛出异常,导致回滚,MQ消息不会删除

消费失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力:

解决方案:本地重试

可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

修改consumer服务的application.yml文件,添加内容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000 # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

重启消费者服务,重复之前的测试。可以发现:

  • 在重试3次后,SpringAMQP会抛出异常AmqpRejectAndDontRequeueException,说明本地重试触发了
  • 查看RabbitMQ控制台,发现消息被删除了,说明最后SpringAMQP返回的是ack,mq删除消息了

结论:

  • 开启本地重试时,消息处理过程中抛出异常,不会requeue到队列,而是在消费者本地重试
  • 重试达到最大次数后,Spring会返回ack,消息会被丢弃

失败策略

在之前的测试中,达到最大重试次数后,消息会被丢弃,这是由Spring内部机制决定的。

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecovery接口来处理,它包含三种不同的实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的队列,后续由人工集中处理。这个队列就是死信队列

Rabbitmq的可靠性保证

通过以下四点保证

  • 开启生产者确认机制,确保生产者的消息能到达队列
  • 开启持久化功能,确保消息未消费前在队列中不会丢失
  • 开启消费者确认机制为auto,由spring确认消息处理成功后完成ack
  • 开启消费者失败重试机制,并设置MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

本文转载自: https://blog.csdn.net/weixin_46432131/article/details/136498050
版权归原作者 沉默是缙 所有, 如有侵权,请联系我们删除。

“RabbitMQ服务异步通信-高级篇”的评论:

还没有评论