1. 生产者重试机制
spring:
rabbitmq:
connection-timeout:1s # 设置MQ的连接超时时间
template:
retry:
enabled:true # 开启超时重试机制
initial-interval:1000ms # 失败后的初始等待时间
multiplier:1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts:3 # 最大重试次数
2. 生产者确认机制
1. 配置
这里publisher-confirm-type有三种模式可选:
- none:关闭confirm机制
- simple:同步阻塞等待MQ的回执
- correlated:MQ异步回调返回回执
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
publisher-returns:true # 开启publisher return机制
2.定义ReturnCallback
每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:
importlombok.AllArgsConstructor;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.ReturnedMessage;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.context.annotation.Configuration;importjavax.annotation.PostConstruct;@Slf4j@AllArgsConstructor@ConfigurationpublicclassMqConfig{privatefinalRabbitTemplate rabbitTemplate;@PostConstructpublicvoidinit(){
rabbitTemplate.setReturnsCallback(newRabbitTemplate.ReturnsCallback(){@OverridepublicvoidreturnedMessage(ReturnedMessage returned){
log.error("触发return callback,");
log.debug("exchange: {}", returned.getExchange());
log.debug("routingKey: {}", returned.getRoutingKey());
log.debug("message: {}", returned.getMessage());
log.debug("replyCode: {}", returned.getReplyCode());
log.debug("replyText: {}", returned.getReplyText());}});}}
3.定义ConfirmCallback
由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:
@TestvoidtestPublisherConfirm(){// 1.创建CorrelationDataCorrelationData cd =newCorrelationData();// 2.给Future添加ConfirmCallback
cd.getFuture().addCallback(newListenableFutureCallback<CorrelationData.Confirm>(){@OverridepublicvoidonFailure(Throwable ex){// 2.1.Future发生异常时的处理逻辑,基本不会触发
log.error("send message fail", ex);}@OverridepublicvoidonSuccess(CorrelationData.Confirm result){// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容if(result.isAck()){// result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
log.debug("发送消息成功,收到 ack!");}else{// result.getReason(),String类型,返回nack时的异常描述
log.error("发送消息失败,收到 nack, reason : {}", result.getReason());}}});// 3.发送消息
rabbitTemplate.convertAndSend("hmall.direct","q","hello", cd);}
版权归原作者 CRE_MO 所有, 如有侵权,请联系我们删除。