前言
在项目中,引入了RabbitMQ这一中间件,必然也需要在业务中增加对数据安全性的一层考虑,来保证RabbitMQ消息的可靠性,否则一个个消息丢失可能导致整个业务的数据出现不一致等问题,对系统带来巨大的影响,消息的可靠性可以主要在三个方面去考虑:生产者消息确认,消费者消息确认,消息持久化,这篇文件说明生产者消息确认的。
一、消息确认流程图
由图可知,消息确认是分为生产者确认和消费者确认的,生产者和MQ之间的消息确认机制为生产者消息确认,MQ和消费者之间的消息确认机制为消费者消息确认
消息丢失的情景有三种情况:
1、发送消息过程中出现网络问题:生产者以为发送成功,但MQ没有收到;(需要生产者消息确认)
2、接收到消息后由于MQ服务器宕机或重启等原因(消息默认存在内存中)导致消息丢失;(需要消息持久化)
3、消费者接收到消息后处理消息出错,没有完成消息的处理,但是自动返回ack(这时候需要开启手动确认模式,消费者消息确认)
二、生产者消息确认
RabbitMQ提供了publisher confirm机制来避免消息投递到MQ过程中丢失。这种机制下每个message都必须要有一个独一无二的ID,来区分开不同的消息,避免ack(消息确认参数)冲突。每当消息发送到MQ成功后,MQ都会返回一个结果给生产者,以保证生产者消息确认。在生产者消息确认时,又有两种返回结果方式(通常两个都要实现)来确保消息投递可靠性,分别为publisher-confirm和publisher-return,以下作出说明。
1、publisher-confirm(发送者确认)
消息成功投递到交换机,返回ack
消息未投递到交换机,返回nack
2、publisher-return(发送者回执)
消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。
三、代码实现
1、修改application.yml 配置
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
publish-confirm-type有三个值,none:禁用发布确认模式,是默认值
**simple**:同步等待confirm结果,直到超时
**
correlated
:**异步回调,定义ConfirmCallback,MQ返回结果时 会回调这个ConfirmCallback
publisher-returns:开启消息失败回调,回调函数ReturnCallback
2、ConfirmCallback函数和ReturnCallback函数
/**
* RabbitMQ配置类
* Created by YHJ on 2022/08/18 12:56
*/
@Configuration
public class RabbitMQConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQConfig.class);
@Autowired
private CachingConnectionFactory cachingConnectionFactory;
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
/**
* TODO RabbitMQ发送者消息确认回调,解决消息可靠性问题
* 消息确认回调,确认消息是否到达broker
* data:消息唯一标识
* ack:确认结果
* cause:失败原因
*/
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
//获得消息的id
String msgId = data.getId();
if (ack) {
LOGGER.info("{}=======>消息发送成功", msgId);
//消息发送成功后,更新数据库消息状态等逻辑
} else {
//信息发送失败,打印日志后,可以根据业务选择是否重发消息
LOGGER.error("{}=======>消息发送失败", msgId);
}
});
/**
* TODO RabbitMQ发送者消息失败回调,解决消息可靠性问题
* 消息失败回调,比如router不到queue时回调
* msg:消息主题
* repCode:响应码
* repText:相应描述
* exchange:交换机
* routingkey:路由键
*/
rabbitTemplate.setReturnsCallback((res) -> {
//若发送失败,打印错误信息,然后可以根据业务选择重发消息
LOGGER.error("{}=======>消息发送queue时失败", res.getMessage().getBody());
});
return rabbitTemplate;
}
// 队列
@Bean
public Queue queue() {
return new Queue("YHJ.queue");
}
// 交换机,路由模式
@Bean
public DirectExchange directExchange() {
return new DirectExchange("YHJ.exchange");
}
// 绑定
@Bean
public Binding binding() {
return BindingBuilder.bind(queue()).to(directExchange()).with("YHJ.key");
}
}
版权归原作者 OHJ小白 所有, 如有侵权,请联系我们删除。