0


RabbitMQ保证消息可靠性

文章目录

生产者消息确认

基础概念

RabbitMQ 提供了 publisher confirm 机制来避免消息发送到 MQ 过程中丢失。消息发送到 MQ 以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:

1.publisher-confirm,发送者确认

① 消息成功投递到交换机,返回 ack
② 消息未投递到交换机,返回 nack

2.publisher-return,发送者回执

消息投递到交换机了,但是没有路由到队列

在使用确认机制发送消息时,需要给每个消息设置一个全局唯一 id,以区分不同消息,避免 ack 冲突

代码实现

1.在 publisher 的配置文件 application.yml 中添加如下配置:

spring:rabbitmq:publisher-confirm-type: correlated
    publisher-returns:truetemplate:mandatory:true

配置说明:

(1)publish-confirm-type:开启 publisher-confirm,这里支持两种类型:

① simple:同步等待 confirm 结果,直到超时
② correlated:异步回调,定义 ConfirmCallback,MQ 返回结果时会回调这个 ConfirmCallback

(2)publish-returns:开启 publish-return 功能,同样是基于 callback 机制,不过是定义 ReturnCallback

(3)template.mandatory:定义消息路由失败时的策略。true:调用ReturnCallback;false:直接丢弃消息

2.在 publisher 中编写配置类,配置 ConfirmCallback 和 ReturnCallback。实际上每个 RabbitTemplate 只能配置一个 ReturnCallback,而可以配置多个 ConfirmCallback(ConfirmCallback 可以直接在消息中配置,不同的消息可以有不同的 ConfirmCallback),这里为了简便,就只配置一个 ConfirmCallback

@Configuration@Slf4jpublicclassRabbitConfigimplementsApplicationContextAware{@OverridepublicvoidsetApplicationContext(ApplicationContext applicationContext)throwsBeansException{RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 配置ConfirmCallback
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{
            log.info("ConfirmCallback");if(ack){
                log.info("消息成功投递到交换机:{}", correlationData);}else{
                log.error("消息投递到交换机失败:{}", correlationData);
                log.error("失败原因:{}", cause);}});// 配置 ReturnCallback
        rabbitTemplate.setReturnsCallback((returnedMessage ->{
            log.error("ReturnCallback");
            log.error("消息路由到队列失败");
            log.error("响应码:{}", returnedMessage.getReplyCode());
            log.error("失败原因;{}", returnedMessage.getReplyText());
            log.error("交换机;{}", returnedMessage.getExchange());
            log.error("路由Key:{}", returnedMessage.getRoutingKey());
            log.error("消息:{}", returnedMessage.getMessage());}));}}

3.在 consumer 中编写配置类,声明一个不绑定队列的独立交换机和一个绑定了一个队列的交换机

@ConfigurationpublicclassConfirmConfig{// 独立交换机,不绑定队列@BeanpublicFanoutExchangeindependentExchange(){returnnewFanoutExchange("independent.exchange");}// 绑定一个队列的交换机@BeanpublicFanoutExchangecarryExchange(){returnnewFanoutExchange("carry.exchange");}@BeanpublicQueuecarryQueue(){returnnewQueue("carry.queue");}@BeanpublicBindingexchangeBinding(Queue carryQueue,FanoutExchange carryExchange){returnBindingBuilder.bind(carryQueue).to(carryExchange);}}

4.编写测试类进行测试

考虑以下三种情况:

① 消息发送到不存在的交换机
② 消息发送到存在的交换机,但是交换机没有绑定队列,即消息路由失败
③ 消息成功投递到队列

(1)消息发送到不存在的交换机

@SpringBootTestpublicclassReliabilityTests{@AutowiredprivateRabbitTemplate rabbitTemplate;// 将消息发送给不存在的交换机@TestpublicvoidtestSend2NonExistedExchange(){String message ="hello xllz";String exchangeName ="non-exist-exchange";CorrelationData correlationData =newCorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(exchangeName,"", message, correlationData);}}

image-20240706162613605

可以看到消息投递到交换机失败,触发了 ConfirmCallback,并且返回了 nack

(2)消息发送到存在的交换机,但是交换机没有绑定队列,即消息路由失败

@SpringBootTestpublicclassReliabilityTests{@AutowiredprivateRabbitTemplate rabbitTemplate;// 将消息发送到存在的交换机,但是交换机不绑定队列@TestpublicvoidtestSend2NonExistedQueue(){String message ="hello xllz";String exchangeName ="independent.exchange";CorrelationData correlationData =newCorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(exchangeName,"", message, correlationData);}}

image-20240706162906353

可以看到消息成功投递到交换机,但是路由到队列失败,同时触发了 ConfirmCallback 和 ReturnCallback,其中 ConfirmCallback 返回了 ack

(3)消息成功投递到队列

@SpringBootTestpublicclassReliabilityTests{@AutowiredprivateRabbitTemplate rabbitTemplate;// 消息投递到存在的交换机和队列@TestpublicvoidtestSend2ExistedExchangeAndQueue(){String message ="hello xllz";String exchangeName ="carry.exchange";CorrelationData correlationData =newCorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(exchangeName,"", message, correlationData);}}

image-20240706163340794

可以看到触发了 ConfirmCallback,但没有触发 ReturnCallback,且在 ConfirmCallback 中返回了 ack

持久化

基础概念

MQ 默认是内存存储消息,开启持久化功能可以确保缓存在 MQ 中的消息不丢失。主要分为三个部分的持久化:交换机持久化、队列持久化和消息持久化。在 SpringAMQP 中,交换机、队列和消息都是默认持久化的,不过我们可以显示设置持久化

代码实现

1.交换机持久化

@ConfigurationpublicclassDurableConfig{@BeanpublicDirectExchangedurableExchange(){// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除returnnewDirectExchange("durable.exchange",true,false);}}

image-20240706193242363

2.队列持久化

@ConfigurationpublicclassDurableConfig{@BeanpublicQueuedurableQueue(){// 两个参数:队列名称、是否持久化returnnewQueue("durable.queue",true);}}

image-20240706193308903

3.消息持久化

@SpringBootTestpublicclassReliabilityTests{@AutowiredprivateRabbitTemplate rabbitTemplate;// 投递持久化消息@TestpublicvoidtestSendDurableMessage(){String exchangeName ="durable.exchange";Message message =MessageBuilder.withBody("hello xllz".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
        rabbitTemplate.convertAndSend(exchangeName,"key", message);}}

消费者消息确认

基础概念

RabbitMQ 支持消费者确认机制,即:消费者处理消息后可以向 MQ 发送 ack 回执,MQ 收到 ack 回执后才会删除该消息。而 SpringAMQP 则允许配置三种确认模式:

manual:手动 ack,需要在业务代码结束后,调用 api 发送 ack
auto(推荐):自动 ack,由 spring 监测 listener 代码是否出现异常,没有异常则返回 ack;抛出异常则返回 nack
none:关闭 ack,MQ 假定消费者获取消息后会成功处理,因此消息投递后立即被删除

手动 ack 常用的 API 如下:

  1. channel.basicAck(long deliveryTag, boolean multiple)
    

    :用于确认消息已经被成功处理。

    deliveryTag
    

    是消息的唯一标识符,由 RabbitMQ 生成,可以从

    Message
    

    对象的

    getMessageProperties().getDeliveryTag()
    

    方法获取;

    multiple
    

    表示是否批量确认,true 表示确认所有小于等于当前delivery tag 的消息;如果为 false,则只确认当前消息

  2. channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
    

    :用于拒绝消息。

    deliveryTag
    

    是消息的唯一标识符;

    multiple
    

    表示是否批量拒绝;

    requeue
    

    表示是否重新入队,true 则将消息重新放入队列,false 则丢弃消息

  3. channel.basicReject(long deliveryTag, boolean requeue)
    

    :类似于

    basicNack
    

    ,但只能拒绝单条消息。

    deliveryTag
    

    是消息的唯一标识符;

    requeue
    

    表示是否重新入队

代码实现

自动确认

1.在 consumer 编写配置文件 application.yml

spring:rabbitmq:listener:simple:prefetch:1acknowledge-mode: auto # none,关闭ack;manual,手动ack;auto:自动ack

2.在 consumer 配置类声明一个队列

@ConfigurationpublicclassConfirmConfig{@BeanpublicQueueautoQueue(){returnnewQueue("auto.queue");}}

3.在 consumer 中声明监听方法

@ComponentpublicclassConfirmListener{@RabbitListener(queues ="auto.queue")publicvoidlistenAutoQueue(String msg){System.out.println("接收到 auto queue 的消息"+ msg +",开始处理...");Thread.sleep(5000);System.out.println("消息"+ msg +",处理完成...");}}

4.在 publisher 中编写测试类方法发送消息

@SpringBootTestpublicclassReliabilityTests{// 测试 auto ack@TestpublicvoidtestSendMsg2AutoQueue(){String queueName ="auto.queue";for(int i =1; i <=10; i++){String message ="第"+ i +"条消息,时间:"+LocalDateTime.now();
            rabbitTemplate.convertAndSend(queueName, message);}}}

测试结果:

image-20240706201825487

由于

prefetch=1

,即消费者只会预取一条消息,因此 unacked 值为1,ready 值对应 MQ 中还未被队列取走的消息数量

5.修改 consumer 监听方法和 publisher 发送方法,测试发送异常消息

consumer:

@ComponentpublicclassConfirmListener{@RabbitListener(queues ="auto.queue")publicvoidlistenAutoQueue(String msg)throwsInterruptedException{if(msg.equals("error")){thrownewRuntimeException("消息异常");}System.out.println("接收到 auto queue 的消息"+ msg +",开始处理...");Thread.sleep(5000);System.out.println("消息"+ msg +",处理完成...");}}

publisher:

@SpringBootTestpublicclassReliabilityTests{@AutowiredprivateRabbitTemplate rabbitTemplate;// 发送异常消息,测试 auto ack@TestpublicvoidtestSendErrorMsg2AutoQueue(){String queueName ="auto.queue";for(int i =1; i <=10; i++){String message = i ==5?"error":"success";
            rabbitTemplate.convertAndSend(queueName, message);}}}

测试结果:

image-20240706203153850

image-20240706203203534

可以看到,消费者会一直发送 nack,之后消息会不断重复进入队头,导致之后的消息无法被正常消费

手动确认

1.在 consumer 的配置文件 application.yml

spring:rabbitmq:listener:simple:prefetch:1acknowledge-mode: manual # none,关闭ack;manual,手动ack;auto:自动ack

2.在 consumer 配置类声明一个队列

@ConfigurationpublicclassConfirmConfig{@BeanpublicQueuemanualQueue(){returnnewQueue("manual.queue");}}

3.在 consumer 中声明监听方法

@ComponentpublicclassConfirmListener{@RabbitListener(queues ="manual.queue", ackMode ="MANUAL")publicvoidlistenManualQueue(Message message,Channel channel)throwsException{try{String body =newString(message.getBody(),StandardCharsets.UTF_8);System.out.println("接收到 manual queue 的消息"+ body +",处理中...");Thread.sleep(5000);// acklong deliveryTag = message.getMessageProperties().getDeliveryTag();
            channel.basicAck(deliveryTag,false);System.out.println("消息"+ body +"处理完成...");}catch(Exception e){// nacklong deliveryTag = message.getMessageProperties().getDeliveryTag();
            channel.basicNack(deliveryTag,false,true);System.out.println("消息处理失败,"+ e.getMessage());}}}

4.在 publisher 中编写测试类方法发送消息

@SpringBootTestpublicclassReliabilityTests{// 测试 manual ack@TestpublicvoidtestSendMsg2ManualQueue(){String queueName ="manual.queue";for(int i =1; i <=10; i++){String message ="第"+ i +"条消息,时间:"+LocalDateTime.now();
            rabbitTemplate.convertAndSend(queueName, message);}}}

测试结果:

image-20240706210342467

这里如果发送了异常消息,结果和自动确认一致,因此手动确认和自动确认本质上机制是一样的,只是确认方式不同,一个由 spring 自动确认,一个由业务代码调用 api 手动确认

失败重试机制

基础概念

消费者确认机制选用 auto 时,当消费者出现异常后,消息会不断 requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致 MQ 的消息处理飙升,带来不必要的压力

我们可以利用 Spring 的 retry 机制,在消费者出现异常时利用本地重试,而不是无限制的 requeue 到 MQ 队列

消费者失败消息处理策略:

在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有 MessageRecoverer 接口来处理,它包含三种不同的实现

① **

RejectAndDontRequeueRecoverer

**:重试耗尽后,直接 reject,丢弃消息。默认就是这种方式
② **

ImmediateRequeueMessageRecoverer

**:重试耗尽后,返回 nack,消息重新入队
③ **

RepublishMessageRecoverer

**(推荐):重试耗尽后,将失败消息投递到指定的交换机。此时就可以在这个交换机后绑定一个专门处理失败消息的队列,并可以让一个新的消费者来监听队列,通过告诉管理员或其他方式来处理这些失败消息

代码实现

这里以

RepublishMessageRecoverer

的失败重试机制作为示例

1.在 consumer 中编写配置文件 application.yml

spring:rabbitmq:listener:simple:retry:enabled:true# 开启消费者失败重试initial-interval:1000# 初始的失败等待时长为1秒multiplier:3# 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts:4# 最大重试次数stateless:true# true无状态;false有状态。如果业务中包含事务,这里改为false

2.在 consumer 中编写配置类,声明交换机、队列及绑定关系

@ConfigurationpublicclassRetryConfig{@BeanpublicDirectExchangeerrorExchange(){returnnewDirectExchange("error.exchange");}@BeanpublicQueueerrorQueue(){returnnewQueue("error.queue");}@BeanpublicBindingerrorBinding(DirectExchange errorExchange,Queue errorQueue){returnBindingBuilder.bind(errorQueue).to(errorExchange).with("error");}@BeanpublicDirectExchangeretryExchange(){returnnewDirectExchange("retry.exchange");}@BeanpublicQueueretryQueue(){returnnewQueue("retry.queue");}@BeanpublicBindingretryBinding(DirectExchange retryExchange,Queue retryQueue){returnBindingBuilder.bind(retryQueue).to(retryExchange).with("retry");}}

3.在 consumer 定义

RepublishMessageRecoverer
@ConfigurationpublicclassRetryConfig{@BeanpublicMessageRecovererrepublishMessageRecover(RabbitTemplate rabbitTemplate){// 第一个参数是rabbitTemplate,第二个参数是异常消息投递的交换机,第三个参数是异常消息的routingKeyreturnnewRepublishMessageRecoverer(rabbitTemplate,"error.exchange","error");}}

4.在 consumer 编写监听类方法

@ComponentpublicclassRetryListener{@RabbitListener(queues ="retry.queue")publicvoidlistenRetryQueue(String msg){System.out.println(LocalDateTime.now()+",接收到消息"+ msg);if(msg.equals("error")){thrownewRuntimeException("异常消息");}}@RabbitListener(queues ="error.queue")publicvoidlistenErrorQueue(String msg){System.out.println(LocalDateTime.now()+",接收到异常消息"+ msg);}}

5.在 publisher 编写测试类方法,发送异常消息

@SpringBootTestpublicclassRetryTests{@AutowiredprivateRabbitTemplate rabbitTemplate;// 发送异常消息@TestpublicvoidtestSendErrorMsg(){String message ="error";String exchangeName ="retry.exchange";
        rabbitTemplate.convertAndSend(exchangeName,"retry", message);}}

测试结果:

image-20240706214244080

可以看到,消息一共重试了 4 次,最后消息被发送到了一个专门的异常交换机中

总结

确保 RabbitMQ 消息的可靠性一共分为四步:

1.开启生产者确认机制,确保生产者的消息能到达队列

2.开启持久化功能,确保消息未消费前在队列中不会丢失

3.开启消费者确认机制为 auto,由 Spring 确认消息处理成功后完成 ack

4.开启消费者失败重试机制,并设置 MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理

标签: rabbitmq Java 后端

本文转载自: https://blog.csdn.net/Vendetta_A_A/article/details/140236301
版权归原作者 小璐乱撞xllz 所有, 如有侵权,请联系我们删除。

“RabbitMQ保证消息可靠性”的评论:

还没有评论