保证消息可靠性
一、生产者可靠性
1、生产者重连机制(防止网络波动)
spring:rabbitmq:connection-timeout: 1s #设置MQ的连接超时时间template:retry:enabled:true#开启超时重试机制(默认是false)initial-interval: 1000ms #失败后的初始等待时间multiplier:1#失败后下次的等待时长倍数,下次等待时长= initial-interval * multipliermax-attempts:3#最大重试次数
2、生产者确认机制
Publisher Return 确认机制
消息投递到MQ但是MQ路由失败,MQ返回路由失败原因
spring:rabbitmq:publisher-returns:true# 开启publisher return机制
@Slf4j@AllArgsConstructor@ConfigurationpublicclassMqConfig{privatefinalRabbitTemplate rabbitTemplate;@PostConstructpublicvoidinit(){
rabbitTemplate.setReturnsCallback(newRabbitTemplate.ReturnsCallback(){@OverridepublicvoidreturnedMessage(ReturnedMessage returned){
log.error("触发return callback,");
log.debug("exchange: {}", returned.getExchange());
log.debug("routingKey: {}", returned.getRoutingKey());
log.debug("message: {}", returned.getMessage());
log.debug("replyCode: {}", returned.getReplyCode());
log.debug("replyText: {}", returned.getReplyText());}});}}
Publisher Confirm 确认机制
临时消息投递到了MQ且入队成功,返回ACK
持久消息投递到了MQ且入队完成持久化,返回ACK
消息投递异常,返回NACK
spring:rabbitmq:publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
publisher-confirm-type
的三种类型
- none 关闭confirm机制
- simple 同步阻塞等待MQ回执消息
- correlated MQ异步回调返回回执消息
@TestvoidtestPublisherConfirm()throwsInterruptedException{CorrelationData cd =newCorrelationData();
cd.getFuture().addCallback(newListenableFutureCallback<CorrelationData.Confirm>(){@OverridepublicvoidonFailure(Throwable ex){// Future发生异常时的处理逻辑,基本不会触发
log.error("handle message ack fail", ex);}@OverridepublicvoidonSuccess(CorrelationData.Confirm result){// Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){
log.debug("发送消息成功,收到 ack!");}else{
log.error("发送消息失败,收到 nack, reason : {}", result.getReason());//TODO 重试发送}}});// 3.发送消息
rabbitTemplate.convertAndSend("hmall.direct","red1","hello", cd);}
二、MQ 可靠性
1、数据持久化
交换机、队列持久化
默认创建时就是持久化的(Durability = Durable)
消息持久化
RabbitTemplate 的
convertAndSend()
方法发送的消息默认就是持久化的(
delivery mode
= 2)
如果非要发送一个非持久化的消息,需要在调用 rabbitTemplate.convertAndSend() 方法时,显式地设置消息的 MessageProperties,并将 deliveryMode 设置为 1 (非持久化)
2、Lazy Queue 惰性队列
Lazy Queue是一种以惰性模式运行的队列,它尽可能地将消息存储在磁盘上,而不是内存中。只有当消费者需要消费消息时,这些消息才会被加载到内存中,效率比传统队列高。
3.12版本后,所有队列都是Lazy Queue模式,无法更改。
三、消费者可靠性
1、消费者确认机制
消费者回执消息类型
ack
消费者处理成功,RabbitMQ 将从队列中删除消息nack
消费者处理失败,RabbitMQ 需再次投递消息reject
消费者拒绝处理,RabbitMQ 将从队列中删除消息
SpringAMQP 消息监听器的三种确认模式
none
不处理。即消费者收到消息后立刻返回ack,消息会丢失,非常不安全。manual
手动模式。业务代码手动调用api发送 ack 或 reject,存在业务入侵,但更灵活。auto
自动模式(默认)。通过 AOP 对消息处理方法做环绕增强,正常返回ack,出现业务异常返回nack,出现消息处理或校验异常返回reject
spring:rabbitmq:listener:simple:acknowledge-mode: auto
2、失败重试机制
消费者处理消息出现异常时利用本地重试,而不是无限的requeue到mq,让mq重新投递给消费者
spring:rabbitmq:listener:simple:retry:enabled:true# 开启消费者失败重试(默认是关闭的)initial-interval: 1000ms # 初始的失败等待时长为1秒multiplier:1# 下次失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmax-attempts:3# 最大重试次数stateless:true# true无状态;false有状态。如果业务中包含事务,这里改为false
开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现。
失败消息处理策略
- RejectAndDontRequeueRecoverer:默认实现,重试耗尽后直接reject,丢弃消息。
- ImmediateRequeueMessageRecoverer:重试耗尽后返回nack,消息重新入队
- RepublishMessageRecoverer:重试耗尽后将失败消息投递到指定的交换机
以第三种失败消息处理策略为例,配置方式如下:
@BeanpublicMessageRecovererrepublishMessageRecoverer(RabbitTemplate rabbitTemplate){returnnewRepublishMessageRecoverer(rabbitTemplate,"error.direct","error");}
3、业务幂等性
由于存在各种确认和重试机制,消费者有重复消费消息的可能性,因此要保证业务的幂等性。
保证业务幂等性的方式如下:
- 方案一:发送消息时生成唯一消息ID,投递给消费者,消费者接收到消息,业务处理成功后将消息ID保存到数据库,下次根据消息ID去数据库查询判断是否已处理,如果已处理则放弃处理。
@BeanpublicMessageConvertermessageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jjmc =newJackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jjmc.setCreateMessageIds(true);return jjmc;}
- 方案二:结合业务逻辑,基于业务本身做判断。
版权归原作者 BLUcoding 所有, 如有侵权,请联系我们删除。