0


RabbitMQ如何保证消息可靠性

1、RabbitMQ消息丢失的可能性

如下图是消息从生产者到消费者的关系图。通过图片我们可以分析,消息从生产者,MQ,消费者这三个环节任一个都有可能丢失。那么下面我们就从这三点进行分析。

1.1 生产者消息丢失场景
  • 生产者发送消息时连接MQ失败
  • 生产发生消息到达MQ后未找到Exchange
  • 生产者发送消息到达MQ的 Exchange 后,未找到合适的 Queue
  • 消息到达MQ后,处理消息的进程发生异常
1.2 MQ导致消息丢失
  • 消息到达MQ,保存到队列后,尚未消费就突然宕机
1.3 消费者丢失
  • 消息接收后尚未处理突然宕机
  • 消息接收后处理过程中抛出异常

综上,我们要解决消息丢失问题,保证MQ的可靠性,就必须从3个方面入手:

  • 确保生产者一定把消息发送到MQ
  • 确保MQ不会将消息弄丢
  • 确保消费者一定要处理消息

2、如何保证生产者消息的可靠性

2.1 生产者重试机制

生产者发送消息时,出现了网络故障,导致与MQ的连接中断。为了解决这个问题,SpringAMQP提供的消息发送时的重试机制。当 RabbitTemplate与MQ连接超时后,多次重试。

我们可以在生产者对应的yml配置中配置:

spring:
rabbitmq:
 connection-timeout: 1s # 设置MQ的连接超时时间
 template:
  retry:
   enabled: true # 开启超时重试机制
   initial-interval: 1000ms # 失败后的初始等待时间
   multiplier: 2 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval *multiplier
   max-attempts: 3 # 最大重试次数

我这边故意把URL地址写错:

spring:
  rabbitmq:
    host: 601.204.203.40

我们可以发现总共重试了3次。如图所示:

但是SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。如果对于业务性能有要求,建议禁用重试机制。

2.2 生产者确认机制

其实一般我们生产者与MQ网络连接比较稳定,所以基本上不用考虑第一种场景。但是还有一些到达MQ之后可能会丢失的场景,比如:

  • 生产者发送的消息到达MQ没有找到Exchange
  • 生产者发送的消息到达MQ找到Exchange,但是没有找到Queue
  • MQ内部处理消息进程异常

基于上面几种情况,RabbitMQ提供了生产者消息确认机制,包括 Publisher Confirm 和 Publisher Return 两种。在开启确认机制的情况下,当生产者发送消息给MQ后,MQ会根据消息处理的情况返回不同的回执。具体主要有以下几种情况:

  • 当消息发送到MQ上,但是路由失败了,会返回会通过Publisher Return返回返回信息。同时会返回ack确认信息,表示投递成功。
  • 当非持久化消息发送到MQ上,并且入队成功,会返回ACK确认信息,表示投递成功。
  • 当持久化消息发送到MQ上,入队成功并且持久化到磁盘,会返回ACK确认信息,表示投递成功。
  • 其它情况都会返回NACK,告知投递失败

其中 ack 和 nack 属于Publisher Confirm机制, ack 是投递成功; nack 是投递失败。而 return 则属于Publisher Return机制。默认情况,这两种都是关闭的,需要通过配置开启。

2.3 实现生产者确认
2.3.1 配置yml开启生产者确认

我们在生产者对应yml配置中加入

spring:
rabbitmq:
 publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
 publisher-returns: true # 开启publisher return机制

其中publisher-confirm-type一共有三种模式:

  • none :关闭confirm机制
  • simple :同步阻塞等待MQ的回执
  • correlated :MQ异步回调返回回执

一般我们都是开启correlated模式。

2.3.2 定义ReturnCallback

每个 RabbitTemplate 只能配置一个 ReturnCallback,我们可以定义一个配置类统一配置。下面我们在生产者中定义配置类ReturnsCallbackConfig:

package com.chenwen.producer.config;
​
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Configuration;
​
import javax.annotation.PostConstruct;
​
@Slf4j
@AllArgsConstructor
@Configuration
public class ReturnsCallbackConfig {
    private final RabbitTemplate rabbitTemplate;
​
    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.error("触发return callback,");
                log.debug("交换机exchange: {}", returned.getExchange());
                log.debug("路由键routingKey: {}", returned.getRoutingKey());
                log.debug("message: {}", returned.getMessage());
                log.debug("replyCode: {}", returned.getReplyCode());
                log.debug("replyText: {}", returned.getReplyText());
            }
        });
    }
}
2.3.3 定义ConfirmCallback

因为每个消息处理逻辑不同,所以我们需要每个消息单独定义ConfirmCallback。其实简单来说,就是是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数CorrelationData。

CorrelationData中包含两个核心的东西:

  • id :消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆
  • SettableListenableFuture :回执结果的Future对象

将来MQ的回执就会通过这个 Future 来返回,我们可以提前给 CorrelationData 中的 Future 添加回调函数来处理消息回执:

下面我们定义一个测试生产者ConfirmCallback方法:

@Test
    void testProducerConfirmCallback() throws InterruptedException {
        // 创建CorrelationData
        CorrelationData cd = new CorrelationData(UUID.randomUUID().toString());
       cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
           @Override
           public void onFailure(Throwable ex) {
               log.error("消息回调失败", ex);
           }
​
           @Override
           public void onSuccess(CorrelationData.Confirm result) {
               log.info("收到confirm callback回执");
               if (result.isAck()) {
                   log.info("消息发送成功,收到ack");
               } else {
                   // 消息发送失败
                   log.error("消息发送失败,收到nack, 原因:{}", result.getReason());
               }
           }
       });
        rabbitTemplate.convertAndSend("test.queue", "chenwen", "hello", cd);
    }

rabbitTemplate.convertAndSend("test.direct", "chenwen1", "hello", cd);

目前存在交换机test.direct,并且正确的路由键是chenwen。首先我这边故意将路由键我写错成chenwen1。执行测试方法,通过控制台日志可以看到,路由失败后,会通过Publisher Return返回异常信息,并且会返回ACK。

我们修改成功正确的路由键chenwen,执行测试方法,可以看不会返回Publisher Return信息,只返回了ACK。

注意:

开启生产者确认模式比较消耗MQ性能,一般不建议开启。我们分析一下几种场景:

  • 路由失败:这个其实是人为因素。由于我们编程错误导致的。
  • 交换机名称错误:同样是编程错误导致
  • MQ内部故障:这种需要处理,但概率往往较低。所以一般只有对消息可靠性极高的场景才需要开启,这种的我们只需要开启Publisher Confirm模式通过处理nack就可以。

3、MQ消息可靠性

MQ的可靠性,其实就是当消息到达MQ,还没有被消费者消费,MQ就由于某些情况出现重启,导致的消息丢失。主要是这几个方面:

  • 交换机Exchange持久化
  • 队列Queue持久化
  • 消息本身的持久化

下面我们就以控制台展示的为例子:

3.1 Exchange交换机持久化

Durability 就是表示设置交换机持久化的参数, Durable 就是持久化模式, Transient 就是临时模式。

3.2 Queues队列持久化

同样队列持久化可以在控制台Queues那边设置,根据Durability设置,Durable 就是持久化模式, Transient 就是临时模式。

3.3 消息的持久化

根据Delivery mode参数设置成2,就是持久化。

注意:如果开启消息持久化,并且也开启了生产者确认模式。需要等消息持久化到磁盘,才会发送ACK回执,来保证消息的可靠性。

不过出于性能考虑,为了减少IO次数,发送到MQ的消息并不是逐条持久化的,而是每隔一段时间批量持久化。一般间隔在100毫秒左右,这就会导致ACK有一定的延迟,因此建议生产者确认全部采用异步方式。

3.4 LazyQueue

在默认的情况下,生产者发送消息是存放在内存中,以提高收发消息的效率。但是由于某些情况会导致消息堆积。比如:

  • 消费者消费者宕机或出现网络故障
  • 生产者生产消息过快,超过了消费者处理消息的能力
  • 消费者处理业务发生了堵塞

一旦消息堆积,会导致占用的内存越来越大,直到触发内存预警。此时的RabbitMQ会将内存上的消息持久化到磁盘中。这个行为成为PageOut 。 PageOut会耗费一段时间,并且会阻塞队列进程。所以此时RabbitMQ不会再处理新的消息,生产者的所有的请求都会被阻塞。

RabbitMQ为了解决这个问题,从3.6.0版本开始,就增加了Lazy Queues的模式,也就是惰性队列。惰性队列的特性主要有如下:

  • 接收到消息后直接存入磁盘而非内存
  • 消费者要消费消息时才会从磁盘中读取并加载到内存(也就是懒加载)
  • 支持数百万条的消息存储

而在3.12版本之后,LazyQueue已经成为所有队列的默认格式。因此官方推荐升级MQ为3.12版本或者所有队列都设置为LazyQueue模式。

4、消费者的可靠性

当RabbitMQ向消费者投递消息的时候,可能由于某些因素会导致消息丢失,比如:

  • 消息的投递过程中出现网络故障
  • 消费者接受到消息后突然宕机
  • 消息者已经接受到消息了,但是由于消费者处理报错导致异常
  • ...............

一旦发生上面几种情况,都会导致消息丢失。那我RabbitMQ肯定需要知道消费者处理消息的状态,如果失败了,可以再次进行投递。下面我们就来学习一下消费者如何进行消息确认机制的。

4.1 消费者确认机制

消费者处理消息之后,应该向RabbitMQ发送一个回执。告知RabbitMQ自己的消息处理状态。主要有三个:

  • ack:处理消息成功,RabbitMQ从队列中把消息删除
  • nack:处理消息失败,RabbitMQ需要重新投递消息
  • reject:消息处理失败并拒绝该消息,并且RabbitMQ会从队列中删除该消息

一般我们可以使用try catch 成功即返回ack,失败返回nack。但是SpringAMQP帮我们实现了,我们只需要通过配置对应acknowledge-mode参数即可实现。主要有三种模式:

  • none:不处理。即消息投递给消费者后立刻ack,消息会立刻从MQ删除,这种不建议使用
  • manual:手动模式。需要自己在业务代码中调用api,发送 ack 或 reject ,存在业务入侵,但更灵活
  • auto:自动模式。SpringAMQP利用AOP切面对我们方法进行环绕增强。正常执行返回ack,失败则根据异常返回nack或者reject- 如果是业务异常,会自动返回 nack;- 如果是消息处理或校验异常,自动返回 reject ;
spring:
rabbitmq:
 listener:
  simple:
   acknowledge-mode: none # 不做处理
4.1.1首先我们来测试一下acknowledge-mode: none不做处理的场景:

我们先向队列test.queue里面发送一条消息,可以看到test.queue队列现在有一条消息。

消费者这边去监听这个消息,对应代码如下:

@Slf4j
@Component
public class ConsumeMqListener {
    @RabbitListener(queues = "test.queue")
    public void listenWorkQueue2(String msg) throws InterruptedException {
        log.info("spring 消费者接收到消息:【" + msg + "】");
        if (true) {
            throw new MessageConversionException("测试没有开启消费者确认");
        }
        log.info("消息处理完成");
    }
}

dubug断点,还未抛出异常之前,此时我们先去刷下一下UI控制台页面,可以看到test.queue的一条消息已经不存在了。

4.1.2 测试acknowledge-mode: auto自动处理情况下
4.1.2.1 消费者抛出消息异常

现在我们在异常点打上断点,然后看看UI后台消息的状态,我们看到现在消息状态变成了Unacked。

断点执行完之后,我们看到此时UI页面的消息数量为0。说明抛出MessageConversionException异常,是将消息直接reject的。

4.1.2.2 消费者抛出业务异常

我们将消费者内部逻辑抛出RuntimeException异常,在抛出异常之前打上断点,然后观察看UI页面的消息状态也是为Unacked状态。

异常抛出之后我们去看UI页面队列中消息的状态又回到了Ready状态了,这样就可以确保消费者业务异常之后,消息还能够再次投递

5、消费者失败重试机制

5.1 消费者失败重试机制

当消费者出现异常后,消息会不断requeue(重入队)到队列,再重新发送给消费者。如果消费者再次执行依然出错,消息会再次requeue到队列,再次投递,直到消息处理成功为止。极端情况就是消费者一直无法执行成功,那么消息requeue就会无限循环,导致mq的消息处理飙升,带来不必要的压力。为了解决这个问题,Spring又提供了消费者重试机制:在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。

在消费者的application.yml配置下面参数:

spring:
rabbitmq:
 listener:
  simple:
   retry:
    enabled: true # 开启消费者失败重试
    initial-interval: 1000ms # 初识的失败等待时长为1秒
    multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
    max-attempts: 3 # 最大重试次数
    stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

可以看到:

  • 消费者在消息失败之后消息没有重新回到队列,而是在本地重试了三次
  • 在本地重试三次以后,抛出了 AmqpRejectAndDontRequeueException 异常。我们查看UI页面看到消息被删除了,说明返回的消息回执是reject。

5.2 失败处理策略

我们可以看到,当失败重试3次,消息会被从队列中删除。这样对于一些要求消息可靠性比较高的情况下,肯定是不符合的。因此Spring有提供了失败处理的策略。这个策略是由 MessageRecovery 接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试次数耗尽之后,返回reject。直接将消息丢弃,这个是默认模式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回 nack ,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

最好的策略是RepublishMessageRecoverer,重试次数耗尽之后,将消息投递到指定的交换机中,后续由人工来处理。下面我们就来演示一下这个场景,我们在消费者服务这边新增配置类ErrorConfiguration,声明交换机和队列,并绑定:

package com.chenwen.consumer.config;
​
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
​
@Slf4j
@Configuration
@ConditionalOnProperty(name = "spring.rabbitmq.listener.simple.retry.enabled", havingValue = "true")
public class ErrorConfiguration {
​
​
    @Bean
    public DirectExchange errorExchange(){
        return new DirectExchange("error.direct");
    }
​
    @Bean
    public Queue errorQueue(){
        return new Queue("error.queue");
    }
​
    @Bean
    public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange){
        return BindingBuilder.bind(errorQueue).to(errorExchange).with("error");
    }
​
    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
        log.debug("加载RepublishMessageRecoverer");
        return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error");
    }
}

可以看到,当重试3次数耗尽之后,会将MQ信息放在error.queue队列中,此时error.queue队列多了一条数据,后续我们人为去处理,或者单独使用一个监听去处理。

标签: rabbitmq

本文转载自: https://blog.csdn.net/Chenwen666/article/details/136063818
版权归原作者 陈炆 所有, 如有侵权,请联系我们删除。

“RabbitMQ如何保证消息可靠性”的评论:

还没有评论