RabbitMQ发布确认机制确保消息从生产者成功传输到交换机和队列,提高系统可靠性。在Spring Boot项目中,通过配置
publisher-confirm-type
和
publisher-returns
,启用发布确认和消息返回机制。配置
RabbitTemplate
的确认回调和返回回调,可以捕捉消息传输状态,处理不同传输结果。测试场景包括消息无法到达交换机、消息到达交换机但无法到达队列以及消息成功到达队列。通过合理设置和优化,可以确保高并发环境下的消息可靠传输,适用于金融支付、电商系统等对消息传输可靠性要求高的场景。
1. RabbitMQ发布确认机制概述
发布确认(Publisher Confirms)是RabbitMQ提供的一种机制,用于确保消息从生产者发送到RabbitMQ服务器并被成功处理。与事务机制不同,发布确认的性能开销更小,非常适合高吞吐量的场景。发布确认机制提供了两种类型的确认:
- 消息到达交换机(Exchange)后的确认
- 消息从交换机路由到队列(Queue)后的确认
2. 配置文件中添加发布确认相关配置
在Spring Boot项目中,通过配置文件来启用发布确认机制非常方便。以下是需要添加到
application.properties
或
application.yml
中的配置:
# 消息到达交换机后会回调发送者
spring.rabbitmq.publisher-confirm-type=correlated
# 消息无法路由到队列时回调发送者
spring.rabbitmq.publisher-returns=true
配置解释:
publisher-confirm-type:设置为correlated表示使用CorrelationData来关联确认与发送的消息。publisher-returns:设置为true表示启用消息返回机制,当消息无法路由到队列时会触发回调。
3. 发布确认类型
在Spring AMQP中,发布确认类型通过
ConfirmType
枚举类来定义:
publicenumConfirmType{SIMPLE,// 使用 RabbitTemplate#waitForConfirms() 或 waitForConfirmsOrDie()CORRELATED,// 使用 CorrelationData 关联确认与发送的消息NONE// 不启用发布确认}
4. 配置RabbitTemplate
为了使用发布确认机制,需要配置
RabbitTemplate
,包括设置确认回调和返回回调:
@Slf4j@ConfigurationpublicclassRabbitTemplateConfig{@BeanpublicRabbitTemplatecreateRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);// 设置mandatory为true,当找不到队列时,broker会调用basic.return方法将消息返还给生产者
rabbitTemplate.setMandatory(true);// 设置确认回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{if(ack){
log.info("消息已经到达Exchange");}else{
log.info("消息没有到达Exchange");}if(correlationData !=null){
log.info("相关数据:"+ correlationData);}if(cause !=null){
log.info("原因:"+ cause);}});// 设置返回回调
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey)->{
log.info("消息无法到达队列时触发");
log.info("ReturnCallback: "+"消息:"+ message);
log.info("ReturnCallback: "+"回应码:"+ replyCode);
log.info("ReturnCallback: "+"回应信息:"+ replyText);
log.info("ReturnCallback: "+"交换机:"+ exchange);
log.info("ReturnCallback: "+"路由键:"+ routingKey);});return rabbitTemplate;}}
5. 配置测试交换机和队列
为了测试发布确认机制,我们需要配置相应的交换机和队列:
@Slf4j@ConfigurationpublicclassConfirmConfig{@BeanpublicQueueconfirmQueue(){returnnewQueue(Constant.CONFIRM_QUEUE,false);}@BeanDirectExchangeconfirmExchange(){DirectExchange directExchange =newDirectExchange(Constant.CONFIRM_EXCHANGE,false,false);
directExchange.addArgument("alternate-exchange",Constant.CONFIRM_BACKUP_EXCHANGE);return directExchange;}@BeanBindingbindingConfirm(){returnBindingBuilder.bind(confirmQueue()).to(confirmExchange()).with(Constant.CONFIRM_ROUTING_KEY);}@BeanFanoutExchangebackupExchange(){returnnewFanoutExchange(Constant.CONFIRM_BACKUP_EXCHANGE,false,false);}@BeanpublicQueuebackupQueue(){returnnewQueue(Constant.CONFIRM_BACKUP_QUEUE,false);}@BeanpublicQueuewarningQueue(){returnnewQueue(Constant.CONFIRM_WARNING_QUEUE,false);}@BeanBindingbindingConfirmBackup(){returnBindingBuilder.bind(backupQueue()).to(backupExchange());}@BeanBindingbindingConfirmWarning(){returnBindingBuilder.bind(warningQueue()).to(backupExchange());}}
6. 测试场景及分析
6.1 消息无法到达交换机
测试代码:
@AutowiredRabbitTemplate rabbitTemplate;String msg ="一条用于发布确认的消息";@GetMapping("/noExchange")publicvoidnoExchange(){
rabbitTemplate.convertAndSend("noExchange","noExchange", msg);}
配置了
rabbitTemplate.setMandatory(true)
,当消息无法到达交换机时会回调:
ConfirmCallback 消息没有到达ExchangeConfirmCallback 原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND- no exchange 'noExchange' in vhost '/',class-id=60, method-id=40)
6.2 消息到达交换机但无法到达队列
测试代码:
@GetMapping("/toExchange")publicvoidtoExchange(){
rabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE,"xxx.xxx.xxx", msg);}
输出:
ConfirmCallback 消息已经到达Exchange
没有收到无法到达队列的消息,是因为配置了备份队列,消息被路由到了备份队列。
6.3 注掉备份队列再试
修改配置:
@BeanDirectExchangeconfirmExchange(){DirectExchange directExchange =newDirectExchange(Constant.CONFIRM_EXCHANGE,true,false);return directExchange;}
测试结果:
消息无法到达队列时触发
ReturnCallback: 消息:(Body:'一条用于发布确认的消息' MessageProperties[headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])ReturnCallback: 回应码:312ReturnCallback: 回应信息:NO_ROUTEReturnCallback: 交换机:myConfirmExchange
ReturnCallback: 路由键:xxx.xxx.xxx
ConfirmCallback 消息已经到达Exchange
此时,
ConfirmCallback
和
ReturnCallback
都被调用了。
6.4 成功到达队列
测试代码:
@GetMapping("/toQueue")publicvoidtoQueue(){
rabbitTemplate.convertAndSend(Constant.CONFIRM_EXCHANGE,Constant.CONFIRM_ROUTING_KEY, msg);}
输出:
ConfirmCallback 消息已经到达Exchange
7. 发布确认流程
下图展示了RabbitMQ发布确认流程:

8. 深入解析RabbitMQ发布确认机制
8.1 事务机制与发布确认机制的对比
事务机制和发布确认机制都是确保消息可靠投递的手段,但它们在实现和性能方面有明显区别:
- 事务机制:通过
txSelect、txCommit和txRollback实现,性能开销较大,不适合高并发场景。 - 发布确认机制:通过异步确认消息是否成功到达交换机和队列,性能开销小,适合高并发场景。
8.2 发布确认机制的优缺点
优点
- 性能高:相比事务机制,发布确认机制对性能的影响较小。
- 异步处理:使用回调函数处理确认结果,不阻塞消息发送。
- 可靠性高:确保消息成功到达交换机和队列,提高系统可靠性。
缺点
- 实现复杂:需要配置和处理回调函数,增加了代码复杂度。
- 延迟高:确认机制引入了额外的网络延迟。
8.3 发布确认机制的应用场景
- 金融支付系统:确保支付消息的可靠传输,避免重复支付或支付丢失。
- 电商系统:确保订单消息的可靠传输,避免订单丢失或重复处理。
- 日志系统:确保日志消息的可靠传输,避免日志丢
失。
8.4 发布确认机制的最佳实践
- 合理设置超时时间:在高并发场景下,设置合理的超时时间,避免消息发送阻塞。
- 优化回调函数:回调函数中避免复杂逻辑,确保回调处理快速完成。
- 监控和报警:建立监控机制,及时发现和处理消息投递失败问题。
9. 总结
本文详细介绍了RabbitMQ消息的发布确认机制,包括配置、实现及其在不同场景下的表现。通过合理配置和使用发布确认机制,可以有效提高消息传输的可靠性,确保消息在高并发环境下的可靠投递。希望本文能够帮助读者深入理解并应用RabbitMQ的发布确认机制,提高系统的可靠性和性能。
版权归原作者 九转成圣 所有, 如有侵权,请联系我们删除。