文章目录
0. 什么是消息的可靠性投递
在生产环境中,如果因为一些不明原因导致RabbitMQ重启,RabbitMQ重启过程中是无法接收消息的,那么我们就需要生产者重新发送消息。或者在消息从交换机到队列的过程中出现意外,消息没有正常投放,我们需要消息回到交换机重新投放。
为了解决因为这些种种意外而产生的问题,就需要使用
消息的可靠性投递
。我们需要在三个过程保证消息传输的正常 :
- 消息
从生产者到交换机
的过程 - 消息
从交换机到队列
的过程 - 消息
从队列到消费者
的过程
消息从生产者到交换机的过程中,我们可以使用
confirm机制
来保障消息的可靠性。
消息从交换机到队列的过程中,我们可以使用
return机制
来保障消息的可靠性。
消息从队列到消费者的过程中,我们可以使用
ack
这个参数来保障消息的可靠性。
其中
confirm机制
和
return机制
都是使用回调函数,当消息投放失败后在回调函数中将消息放入redis,用定时任务重新投放。需要我们自己编码。
ack
参数是RabbitMQ实现的,我们需要手动ack。RabbitMQ将一条消息推送给消费者,如果没有接收到ack就会重发。
1. confirm机制
使用confirm机制需要在配置文件中将
rabbitmq.publisher-confirm-type
设置为
correlated
。表示使用confirm机制。
publisher-confirm-type: correlated
publisher-confirm-type
有以下三种值:
simple
:简单的执行ack的判断,在发布消息成功后使用 rabbitTemplate调用waitForConfirms或者waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判断下一步的逻辑。但是要注意的是当 waitForConfirmsOrDie 方法如果返回false则会关闭channel。(同步confirm,性能较差)correlated
:打开消息确认机制, 执行ack的时候还会携带消息的元数据。none
:禁用发布确认模式,默认值。
最后在RabbitTemplate中设置回调函数。这个函数不管是消息发送成功与否都会执行。该函数接收一个接口 :ConfirmCallback,此接口只有一个方法 :
voidconfirm(@NullableCorrelationData correlationData,boolean ack,@NullableString cause);
- correlationData :保存消息id以及相关信息
- ack :交换机是否收到消息,收到为true
- cause :消息接收不到原因,成功接收消息则为null
在这个函数里,可以判断ack,如果为false则代表信息发送失败,可以存入redis,让定时任务扫描redis重新发送消息。(解决方案有很多,这里简单提个建议)
@Slf4j@ConfigurationpublicclassRabbitConfig{@BeanpublicRabbitTemplaterabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{// 打印日志
log.info("ConfirmCallback:"+"相关数据:"+ correlationData);
log.info("ConfirmCallback:"+"确认情况:"+ ack);
log.info("ConfirmCallback:"+"原因:"+ cause);// 存入数据库中定时重发.// ...});return rabbitTemplate;}}
总结 :
confirm机制有两步 :
- 在yml配置文件中将
publisher-confirm-type
设置为correlated
,开启confirm机制publisher-confirm-type: correlated
- 设置回调函数,setConfirmCallback
rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{// 打印日志 log.info("ConfirmCallback:"+"相关数据:"+ correlationData); log.info("ConfirmCallback:"+"确认情况:"+ ack); log.info("ConfirmCallback:"+"原因:"+ cause);// 存入数据库中定时重发.// ...});
2. return机制
在了解return 机制前先了解一个参数 :
mandatory
。
mandatory
是AMQP协议中basic.publish方法中的标识位。
如果交换机根据路由键找不到对应的队列,那么此时它有两个选择 :将消息返回交换机、将消息丢弃,当mandatory的值为true时,消息将返回给交换机;当mandatory的值为false时,消息将被丢弃。
我们想要实现
消息的可靠性投递
,当消息投放错时让消息重新投放,那么我们就需要将
mandatory
的值设置为true。
所以实现return机制有三步 :
想使用return机制,首先要在配置文件中将
publisher-returns
的值设置为
true
,代表开启return机制
publisher-returns:true
接下来将mandatory这个值设置为true,代表如果消息丢失或者出现意外,将消息返回,而不是丢弃。
rabbitTemplate.setMandatory(true);
最后实现
ReturnCallback
这个接口 :
voidreturnedMessage(Message message,int replyCode,String replyText,String exchange,String routingKey);
- message :此条消息
- replyCode :错误编码
- replyText :消息接收失败的原因
- exchange :此条消息的目标交换机
- routingKey :此条消息的路由键
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{// 打印日志
log.info("ReturnCallback:"+"消息:"+ message);
log.info("ReturnCallback:"+"回应码:"+ replyCode);
log.info("ReturnCallback:"+"回应信息:"+ replyText);
log.info("ReturnCallback:"+"交换机:"+ exchange);
log.info("ReturnCallback:"+"路由键:"+ routingKey);// 做其他处理...});
执行到returnCallback函数就代表消息从交换机到队列的过程出现问题,例如
路由键错误
、
网络问题
、
找不到相应队列
…这些情况下可以先打印日志,再使用手动签收机制让交换机重新发送消息。
因为路由键错误这种就是代码写错了,重发消息也没用,只能打印日志等操作来尽可能提醒开发人员,但是网络问题可以重新发送消息。
3. 总结
实现消息的可靠性投递通过confirm机制和return机制。
- 在配置文件中开启这两种机制:
publisher-confirm-type: correlatedpublisher-returns:true
- 注入RabbitTemplate时将mandatory设置为true,代表消息投递异常时将消息返回,而不是丢弃。
- 注入RabbitTemplate时实现ConfirmCallback和ReturnCallback。在里面完成消息投放错误后的保障工作。
@Slf4j@ConfigurationpublicclassRabbitConfig{@BeanpublicRabbitTemplaterabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate(); rabbitTemplate.setConnectionFactory(connectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{// 打印日志 log.info("ConfirmCallback:"+"相关数据:"+ correlationData); log.info("ConfirmCallback:"+"确认情况:"+ ack); log.info("ConfirmCallback:"+"原因:"+ cause);// 存入redis数据库中定时重发.// ...}); rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{// 打印日志 log.info("ReturnCallback:"+"消息:"+ message); log.info("ReturnCallback:"+"回应码:"+ replyCode); log.info("ReturnCallback:"+"回应信息:"+ replyText); log.info("ReturnCallback:"+"交换机:"+ exchange); log.info("ReturnCallback:"+"路由键:"+ routingKey);});return rabbitTemplate;}}
1. 在保证confirm机制和return机制的同时别忘记让消费者手动ack。注意 :> 为什么return机制和ack机制可以重新发送,而confirm机制只能借用外部手段呢(例如redis、xxl-job)?因为return机制触发时消息已经到了交换机,可以通过判断通过mandatory这个参数来确定是重新发送还是丢弃。(只是在发送给队列时出现错误,人家RabbitMQ自己设计的可以重发。)但是confirm机制触发时消息刚从生产者发送给交换机,还没进入RabbitMQ,只能借用外部手段了。
版权归原作者 小何┌ 所有, 如有侵权,请联系我们删除。