文章目录
生产者消息确认
基础概念
RabbitMQ 提供了 publisher confirm 机制来避免消息发送到 MQ 过程中丢失。消息发送到 MQ 以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:
1.publisher-confirm,发送者确认
① 消息成功投递到交换机,返回 ack
② 消息未投递到交换机,返回 nack
2.publisher-return,发送者回执
消息投递到交换机了,但是没有路由到队列
在使用确认机制发送消息时,需要给每个消息设置一个全局唯一 id,以区分不同消息,避免 ack 冲突
代码实现
1.在 publisher 的配置文件 application.yml 中添加如下配置:
spring:rabbitmq:publisher-confirm-type: correlated
publisher-returns:truetemplate:mandatory:true
配置说明:
(1)publish-confirm-type:开启 publisher-confirm,这里支持两种类型:
① simple:同步等待 confirm 结果,直到超时
② correlated:异步回调,定义 ConfirmCallback,MQ 返回结果时会回调这个 ConfirmCallback
(2)publish-returns:开启 publish-return 功能,同样是基于 callback 机制,不过是定义 ReturnCallback
(3)template.mandatory:定义消息路由失败时的策略。true:调用ReturnCallback;false:直接丢弃消息
2.在 publisher 中编写配置类,配置 ConfirmCallback 和 ReturnCallback。实际上每个 RabbitTemplate 只能配置一个 ReturnCallback,而可以配置多个 ConfirmCallback(ConfirmCallback 可以直接在消息中配置,不同的消息可以有不同的 ConfirmCallback),这里为了简便,就只配置一个 ConfirmCallback
@Configuration@Slf4jpublicclassRabbitConfigimplementsApplicationContextAware{@OverridepublicvoidsetApplicationContext(ApplicationContext applicationContext)throwsBeansException{RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 配置ConfirmCallback
rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{
log.info("ConfirmCallback");if(ack){
log.info("消息成功投递到交换机:{}", correlationData);}else{
log.error("消息投递到交换机失败:{}", correlationData);
log.error("失败原因:{}", cause);}});// 配置 ReturnCallback
rabbitTemplate.setReturnsCallback((returnedMessage ->{
log.error("ReturnCallback");
log.error("消息路由到队列失败");
log.error("响应码:{}", returnedMessage.getReplyCode());
log.error("失败原因;{}", returnedMessage.getReplyText());
log.error("交换机;{}", returnedMessage.getExchange());
log.error("路由Key:{}", returnedMessage.getRoutingKey());
log.error("消息:{}", returnedMessage.getMessage());}));}}
3.在 consumer 中编写配置类,声明一个不绑定队列的独立交换机和一个绑定了一个队列的交换机
@ConfigurationpublicclassConfirmConfig{// 独立交换机,不绑定队列@BeanpublicFanoutExchangeindependentExchange(){returnnewFanoutExchange("independent.exchange");}// 绑定一个队列的交换机@BeanpublicFanoutExchangecarryExchange(){returnnewFanoutExchange("carry.exchange");}@BeanpublicQueuecarryQueue(){returnnewQueue("carry.queue");}@BeanpublicBindingexchangeBinding(Queue carryQueue,FanoutExchange carryExchange){returnBindingBuilder.bind(carryQueue).to(carryExchange);}}
4.编写测试类进行测试
考虑以下三种情况:
① 消息发送到不存在的交换机
② 消息发送到存在的交换机,但是交换机没有绑定队列,即消息路由失败
③ 消息成功投递到队列
(1)消息发送到不存在的交换机
@SpringBootTestpublicclassReliabilityTests{@AutowiredprivateRabbitTemplate rabbitTemplate;// 将消息发送给不存在的交换机@TestpublicvoidtestSend2NonExistedExchange(){String message ="hello xllz";String exchangeName ="non-exist-exchange";CorrelationData correlationData =newCorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(exchangeName,"", message, correlationData);}}
可以看到消息投递到交换机失败,触发了 ConfirmCallback,并且返回了 nack
(2)消息发送到存在的交换机,但是交换机没有绑定队列,即消息路由失败
@SpringBootTestpublicclassReliabilityTests{@AutowiredprivateRabbitTemplate rabbitTemplate;// 将消息发送到存在的交换机,但是交换机不绑定队列@TestpublicvoidtestSend2NonExistedQueue(){String message ="hello xllz";String exchangeName ="independent.exchange";CorrelationData correlationData =newCorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(exchangeName,"", message, correlationData);}}
可以看到消息成功投递到交换机,但是路由到队列失败,同时触发了 ConfirmCallback 和 ReturnCallback,其中 ConfirmCallback 返回了 ack
(3)消息成功投递到队列
@SpringBootTestpublicclassReliabilityTests{@AutowiredprivateRabbitTemplate rabbitTemplate;// 消息投递到存在的交换机和队列@TestpublicvoidtestSend2ExistedExchangeAndQueue(){String message ="hello xllz";String exchangeName ="carry.exchange";CorrelationData correlationData =newCorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend(exchangeName,"", message, correlationData);}}
可以看到触发了 ConfirmCallback,但没有触发 ReturnCallback,且在 ConfirmCallback 中返回了 ack
持久化
基础概念
MQ 默认是内存存储消息,开启持久化功能可以确保缓存在 MQ 中的消息不丢失。主要分为三个部分的持久化:交换机持久化、队列持久化和消息持久化。在 SpringAMQP 中,交换机、队列和消息都是默认持久化的,不过我们可以显示设置持久化
代码实现
1.交换机持久化
@ConfigurationpublicclassDurableConfig{@BeanpublicDirectExchangedurableExchange(){// 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除returnnewDirectExchange("durable.exchange",true,false);}}
2.队列持久化
@ConfigurationpublicclassDurableConfig{@BeanpublicQueuedurableQueue(){// 两个参数:队列名称、是否持久化returnnewQueue("durable.queue",true);}}
3.消息持久化
@SpringBootTestpublicclassReliabilityTests{@AutowiredprivateRabbitTemplate rabbitTemplate;// 投递持久化消息@TestpublicvoidtestSendDurableMessage(){String exchangeName ="durable.exchange";Message message =MessageBuilder.withBody("hello xllz".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();
rabbitTemplate.convertAndSend(exchangeName,"key", message);}}
消费者消息确认
基础概念
RabbitMQ 支持消费者确认机制,即:消费者处理消息后可以向 MQ 发送 ack 回执,MQ 收到 ack 回执后才会删除该消息。而 SpringAMQP 则允许配置三种确认模式:
① manual:手动 ack,需要在业务代码结束后,调用 api 发送 ack
② auto(推荐):自动 ack,由 spring 监测 listener 代码是否出现异常,没有异常则返回 ack;抛出异常则返回 nack
③ none:关闭 ack,MQ 假定消费者获取消息后会成功处理,因此消息投递后立即被删除
手动 ack 常用的 API 如下:
channel.basicAck(long deliveryTag, boolean multiple)
:用于确认消息已经被成功处理。
deliveryTag
是消息的唯一标识符,由 RabbitMQ 生成,可以从
Message
对象的
getMessageProperties().getDeliveryTag()
方法获取;
multiple
表示是否批量确认,true 表示确认所有小于等于当前delivery tag 的消息;如果为 false,则只确认当前消息
channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
:用于拒绝消息。
deliveryTag
是消息的唯一标识符;
multiple
表示是否批量拒绝;
requeue
表示是否重新入队,true 则将消息重新放入队列,false 则丢弃消息
channel.basicReject(long deliveryTag, boolean requeue)
:类似于
basicNack
,但只能拒绝单条消息。
deliveryTag
是消息的唯一标识符;
requeue
表示是否重新入队
代码实现
自动确认
1.在 consumer 编写配置文件 application.yml
spring:rabbitmq:listener:simple:prefetch:1acknowledge-mode: auto # none,关闭ack;manual,手动ack;auto:自动ack
2.在 consumer 配置类声明一个队列
@ConfigurationpublicclassConfirmConfig{@BeanpublicQueueautoQueue(){returnnewQueue("auto.queue");}}
3.在 consumer 中声明监听方法
@ComponentpublicclassConfirmListener{@RabbitListener(queues ="auto.queue")publicvoidlistenAutoQueue(String msg){System.out.println("接收到 auto queue 的消息"+ msg +",开始处理...");Thread.sleep(5000);System.out.println("消息"+ msg +",处理完成...");}}
4.在 publisher 中编写测试类方法发送消息
@SpringBootTestpublicclassReliabilityTests{// 测试 auto ack@TestpublicvoidtestSendMsg2AutoQueue(){String queueName ="auto.queue";for(int i =1; i <=10; i++){String message ="第"+ i +"条消息,时间:"+LocalDateTime.now();
rabbitTemplate.convertAndSend(queueName, message);}}}
测试结果:
由于
prefetch=1
,即消费者只会预取一条消息,因此 unacked 值为1,ready 值对应 MQ 中还未被队列取走的消息数量
5.修改 consumer 监听方法和 publisher 发送方法,测试发送异常消息
consumer:
@ComponentpublicclassConfirmListener{@RabbitListener(queues ="auto.queue")publicvoidlistenAutoQueue(String msg)throwsInterruptedException{if(msg.equals("error")){thrownewRuntimeException("消息异常");}System.out.println("接收到 auto queue 的消息"+ msg +",开始处理...");Thread.sleep(5000);System.out.println("消息"+ msg +",处理完成...");}}
publisher:
@SpringBootTestpublicclassReliabilityTests{@AutowiredprivateRabbitTemplate rabbitTemplate;// 发送异常消息,测试 auto ack@TestpublicvoidtestSendErrorMsg2AutoQueue(){String queueName ="auto.queue";for(int i =1; i <=10; i++){String message = i ==5?"error":"success";
rabbitTemplate.convertAndSend(queueName, message);}}}
测试结果:
可以看到,消费者会一直发送 nack,之后消息会不断重复进入队头,导致之后的消息无法被正常消费
手动确认
1.在 consumer 的配置文件 application.yml
spring:rabbitmq:listener:simple:prefetch:1acknowledge-mode: manual # none,关闭ack;manual,手动ack;auto:自动ack
2.在 consumer 配置类声明一个队列
@ConfigurationpublicclassConfirmConfig{@BeanpublicQueuemanualQueue(){returnnewQueue("manual.queue");}}
3.在 consumer 中声明监听方法
@ComponentpublicclassConfirmListener{@RabbitListener(queues ="manual.queue", ackMode ="MANUAL")publicvoidlistenManualQueue(Message message,Channel channel)throwsException{try{String body =newString(message.getBody(),StandardCharsets.UTF_8);System.out.println("接收到 manual queue 的消息"+ body +",处理中...");Thread.sleep(5000);// acklong deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag,false);System.out.println("消息"+ body +"处理完成...");}catch(Exception e){// nacklong deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicNack(deliveryTag,false,true);System.out.println("消息处理失败,"+ e.getMessage());}}}
4.在 publisher 中编写测试类方法发送消息
@SpringBootTestpublicclassReliabilityTests{// 测试 manual ack@TestpublicvoidtestSendMsg2ManualQueue(){String queueName ="manual.queue";for(int i =1; i <=10; i++){String message ="第"+ i +"条消息,时间:"+LocalDateTime.now();
rabbitTemplate.convertAndSend(queueName, message);}}}
测试结果:
这里如果发送了异常消息,结果和自动确认一致,因此手动确认和自动确认本质上机制是一样的,只是确认方式不同,一个由 spring 自动确认,一个由业务代码调用 api 手动确认
失败重试机制
基础概念
消费者确认机制选用 auto 时,当消费者出现异常后,消息会不断 requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致 MQ 的消息处理飙升,带来不必要的压力
我们可以利用 Spring 的 retry 机制,在消费者出现异常时利用本地重试,而不是无限制的 requeue 到 MQ 队列
消费者失败消息处理策略:
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有 MessageRecoverer 接口来处理,它包含三种不同的实现
① **
RejectAndDontRequeueRecoverer
**:重试耗尽后,直接 reject,丢弃消息。默认就是这种方式
② **
ImmediateRequeueMessageRecoverer
**:重试耗尽后,返回 nack,消息重新入队
③ **
RepublishMessageRecoverer
**(推荐):重试耗尽后,将失败消息投递到指定的交换机。此时就可以在这个交换机后绑定一个专门处理失败消息的队列,并可以让一个新的消费者来监听队列,通过告诉管理员或其他方式来处理这些失败消息
代码实现
这里以
RepublishMessageRecoverer
的失败重试机制作为示例
1.在 consumer 中编写配置文件 application.yml
spring:rabbitmq:listener:simple:retry:enabled:true# 开启消费者失败重试initial-interval:1000# 初始的失败等待时长为1秒multiplier:3# 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts:4# 最大重试次数stateless:true# true无状态;false有状态。如果业务中包含事务,这里改为false
2.在 consumer 中编写配置类,声明交换机、队列及绑定关系
@ConfigurationpublicclassRetryConfig{@BeanpublicDirectExchangeerrorExchange(){returnnewDirectExchange("error.exchange");}@BeanpublicQueueerrorQueue(){returnnewQueue("error.queue");}@BeanpublicBindingerrorBinding(DirectExchange errorExchange,Queue errorQueue){returnBindingBuilder.bind(errorQueue).to(errorExchange).with("error");}@BeanpublicDirectExchangeretryExchange(){returnnewDirectExchange("retry.exchange");}@BeanpublicQueueretryQueue(){returnnewQueue("retry.queue");}@BeanpublicBindingretryBinding(DirectExchange retryExchange,Queue retryQueue){returnBindingBuilder.bind(retryQueue).to(retryExchange).with("retry");}}
3.在 consumer 定义
RepublishMessageRecoverer
@ConfigurationpublicclassRetryConfig{@BeanpublicMessageRecovererrepublishMessageRecover(RabbitTemplate rabbitTemplate){// 第一个参数是rabbitTemplate,第二个参数是异常消息投递的交换机,第三个参数是异常消息的routingKeyreturnnewRepublishMessageRecoverer(rabbitTemplate,"error.exchange","error");}}
4.在 consumer 编写监听类方法
@ComponentpublicclassRetryListener{@RabbitListener(queues ="retry.queue")publicvoidlistenRetryQueue(String msg){System.out.println(LocalDateTime.now()+",接收到消息"+ msg);if(msg.equals("error")){thrownewRuntimeException("异常消息");}}@RabbitListener(queues ="error.queue")publicvoidlistenErrorQueue(String msg){System.out.println(LocalDateTime.now()+",接收到异常消息"+ msg);}}
5.在 publisher 编写测试类方法,发送异常消息
@SpringBootTestpublicclassRetryTests{@AutowiredprivateRabbitTemplate rabbitTemplate;// 发送异常消息@TestpublicvoidtestSendErrorMsg(){String message ="error";String exchangeName ="retry.exchange";
rabbitTemplate.convertAndSend(exchangeName,"retry", message);}}
测试结果:
可以看到,消息一共重试了 4 次,最后消息被发送到了一个专门的异常交换机中
总结
确保 RabbitMQ 消息的可靠性一共分为四步:
1.开启生产者确认机制,确保生产者的消息能到达队列
2.开启持久化功能,确保消息未消费前在队列中不会丢失
3.开启消费者确认机制为 auto,由 Spring 确认消息处理成功后完成 ack
4.开启消费者失败重试机制,并设置 MessageRecoverer,多次重试失败后将消息投递到异常交换机,交由人工处理
版权归原作者 小璐乱撞xllz 所有, 如有侵权,请联系我们删除。