1.生产者重连
有的时候由于网络波动,可能会出现客户端连接RabbitMQ失败的情况。通过配置我们可以开启连接失败后的重连机制
# Spring配置信息spring:# Rabbitmq配置rabbitmq:# 设置RabbitMQ连接超时时间connection-timeout: 2s
template:retry:# 开启超时重试机制enabled:true# 失败后的初始等待时间initial-interval: 1000ms
# 失败后下次的等待时长倍数,下次等待时间 = initial-interval * multipliermultiplier:1# 最大重试次数max-attempts:3
注:当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的,会影响业务性能。如果对于业务性能有要求,建议禁用重试机制,如果一定要使用,需合理配置等待时长和重试次数,也可以考虑使用异步线程来执行发送消息的代码
2.生产者确认
RabbitMQ提供了Publisher Confirm和Publisher Return两种确认机制。开启确认机制后,在RabbitMQ成功收到消息后会返回确认消息给生产者
(1)RabbitMQ返回的结果情况
- 消息投递到了RabbitMQ,但是路由失败。此时会通过Publisher Return返回路由异常原因,然后返回ACK,告知投递成功(造成原因:代码有问题或者交换机的配置有问题)
- 临时消息(非持久化)投递到了RabbitMQ,并且成功入队,返回ACK,告知投递成功
- 持久消息投递到了RabbitMQ,并且入队完成持久化,返回ACK,告知投递成功
- 其他情况都会返回NACK,告知投递失败
(2)生产者确认代码实现
添加配置文件信息
# Spring配置信息spring:# Rabbitmq配置rabbitmq:# 开启publisher confirm机制 none:关闭publisher confirm机制 simple:同步阻塞等待MQ的回执消息 correlated:MQ异步回调方式返回回执消息publisher-confirm-type: correlated
# 开启publisher return机制publisher-returns:true
编写Publisher Return回调函数(注:每个RabbitTemplate只能配置一个ReturnCallback)
importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.BeansException;importorg.springframework.context.ApplicationContext;importorg.springframework.context.ApplicationContextAware;importorg.springframework.context.annotation.Configuration;@Slf4j@ConfigurationpublicclassCommonConfigimplementsApplicationContextAware{@OverridepublicvoidsetApplicationContext(ApplicationContext applicationContext)throwsBeansException{//获取RabbitTemplateRabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);//设置ReturnCallback
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{
log.info("消息返回:{},{},{},{},{}",
message, replyCode, replyText, exchange, routingKey);});}}
编写ConfirmCallback(注:在每一个消息发送时候单独指定)
importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Component;importorg.springframework.util.concurrent.ListenableFutureCallback;@Slf4j@ComponentpublicclassProducer{@AutowiredRabbitTemplate rabbitTemplate;publicvoidsendMessage(Object message){// 1.创建CorrelationData对象CorrelationData correlationData =newCorrelationData(randomUUID().toString());// 2.给Future添加ConfirmCallback
correlationData.getFuture().addCallback(newListenableFutureCallback<CorrelationData.Confirm>(){@OverridepublicvoidonFailure(Throwable ex){// Future发生异常时的处理逻辑,基本不会触发
log.error("handle message ack fail",ex);}@OverridepublicvoidonSuccess(CorrelationData.Confirm result){// Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){// result.isAck(),boolean类型,true代表收到ack,false代表收到nack
log.debug("发送消息成功,收到 ack!");}else{// result.getReason(),String类型,返回nack时的异常原因
log.error("发送消息失败,收到 nack,reason : {}", result.getReason());}}});// 发送消息
rabbitTemplate.convertAndSend("交换机名称","routingKey值", message,correlationData);}}
3.如何处理生产者的确认消息
(1)生产者确认需要额外的网络和系统资源开销,尽量不要使用
(2)如果一定要使用,无需开启Publisher-Return机制,因为一般路由失败是自己的业务问题
(3)对于nack消息可以有限次数重试,依然失败则记录异常信息
版权归原作者 Java界第一深情 所有, 如有侵权,请联系我们删除。