目录
当我们在项目中引入了新的中间件之后,数据的风险性就要多一层考虑。那么,RabbitMQ 的消息是怎么知道有没有被消费者消费的呢?生产者又怎么确保自己发送成功了呢?这些问题将在文章中进行解答。
一、简介
1.1 背景
在 MQ 中,消费者和生产者并不直接进行通信,生产者只负责把消息发送到队列,消费者只负责从队列获取消息。
- 消费者从队列 获取到消息后,这条消息就不在队列中了。如果此时消费者所在的信道 因为网络中断没有消费到,那这条消息就 被永远地丢失了。所以,我们希望等待消费者 成功消费掉这条消息之后再删除消息。
- 生产者向交换机 发送消息后,也 不能保证消息准确发送过去了,消息就像 石沉大海 一样,所以 发送消息也需要进行消息确认。
1.2 定义
为了保证消息从队列可靠地到达消费者,RabbitMQ 提供了 消息确认机制(Message Acknowledgement)。
消费者在订阅队列时,可以指定
autoAck
参数:
autoAck=false
:RabbitMQ 会 等待消费者显式地回复确认信号 后才从内存(或磁盘)中移除消息(实际上时先打上删除标记,之后再删除)。autoAck=true
:RabbitMQ 会 自动把发送出去的消息置为确认,然后内存(或磁盘)中删除,而 不管消费者是否真正地消费到了这些消息。
采用消息确认机制后,只要设置
autoAck
参数为
false
,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为 RabbitMQ 会一直等待持有消息知道消费者显式调用
Basic.Ack
命令为止。
对于 RabbitMQ 服务器端而言,当
autoAck
参数为
false
时,队列中的消息分成了两部分:
- 一部分是 等待投递给消费者的消息;
- 另一部分是 已经投递给消费者,但是还没有收到消费者确认信号的消息。
如果 RabbitMQ 服务器端 一直没有收到消费者的确认信息,并且 消费此消息的消费者已经断开连接,则服务器端会安排 该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。
RabbitMQ 不会为未确认的消息设置过期时间,它 判断此消息是否需要重新投递给消费者的唯一依据是该消息连接是否已经断开,这个设计的原因是 RabbitMQ 允许消费者消费一条消息的时间可以很久很久。
1.3 如何查看确认/未确认的消息数?
RabbitMQ 的 Web 管理平台上可以看到当前队列中的 “Ready” 状态和 “Unacknowledged” 状态的消息数:
- Read 状态: 等待投递给消费者的消息数。
- Unacknowledged 状态: 已经投递给消费者但是未收到确认信号的消息树。
二、消息确认机制的分类
RabbitMQ 消息确认机制分为两大类:
消息发送确认
,又分为: - 生产者到交换机的确认;- 交换机到队列的确认。消息接收确认
。
2.1 消息发送确认
RabbitMQ 的消息发送确认有两种实现方式:ConfirmCallback 方法、ReturnCallback 方法。
1)ConfirmCallback方法
ConfirmCallback
是一个回调接口,用于确认消息否是到达交换机中。
配置方式:
spring.rabbitmq.publisher-confirm-type=correlated
它有三个值:
none
:禁用发布确认模式,默认值。correlated
:发布消息成功到交换机后触发回调方法。simple
:经测试有两种效果:一是和 correlated 一样会触发回调方法;二是在发布消息成功后使用 rabbitTemplate 调用 waitForConfirm 或 waitForConfirmsOrDie方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑。要注意的是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker。
2)ReturnCallback方法
ReturnCallback
也是一个回调接口,用于确认消息是否在交换机中路由到了队列。
(该方法可以不使用,因为交换机和队列是在代码里面绑定的,如果消息成功投递到 Broker 后几乎不存在绑定队列失败,除非代码写错了。)
配置方式:
spring.rabbitmq.publisher-returns=true
3)代码实现方式一:统一配置
a.配置类
RabbitDirectConfig.java
importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.DirectExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;importorg.springframework.amqp.rabbit.connection.ConnectionFactory;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;importorg.springframework.amqp.support.converter.Jackson2JsonMessageConverter;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/**
* <p> @Title RabbitDirectConfig
* <p> @Description 直连交换机配置
* Direct Exchange是RabbitMQ默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。
*
* @author ACGkaka
* @date 2023/1/12 15:09
*/@Slf4j@ConfigurationpublicclassRabbitDirectConfig{publicstaticfinalStringDIRECT_EXCHANGE_NAME="TEST_DIRECT_EXCHANGE";publicstaticfinalStringDIRECT_ROUTING_NAME="TEST_DIRECT_ROUTING";publicstaticfinalStringDIRECT_QUEUE_NAME="TEST_DIRECT_QUEUE";@BeanpublicRabbitTemplaterabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);// 设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);//设置message序列化方法
rabbitTemplate.setMessageConverter(newJackson2JsonMessageConverter());// 设置消息发送到交换机(Exchange)回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{if(ack){
log.info(">>>>>>>>>>【INFO】消息发送到交换机(Exchange)成功, 相关数据: {}", correlationData);}else{
log.error(">>>>>>>>>>【ERROR】消息发送到交换机(Exchange)失败, 错误原因: {}, 相关数据: {}", cause, correlationData);}});// 设置消息发送到队列(Queue)回调(经测试,只有失败才会调用)
rabbitTemplate.setReturnsCallback((returnedMessage)->{
log.error(">>>>>>>>>>【ERROR】消息发送到队列(Queue)失败:响应码: {}, 响应信息: {}, 交换机: {}, 路由键: {}, 消息内容: {}",
returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey(), returnedMessage.getMessage());});return rabbitTemplate;}/**
* 消息监听-反序列化
*/@BeanpublicRabbitListenerContainerFactory<?>rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory =newSimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(newJackson2JsonMessageConverter());return factory;}/**
* 队列,命名:testDirectQueue
*
* @return 队列
*/@BeanpublicQueuetestDirectQueue(){// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable。// autoDelete:是否自动删除,当没有生产者或消费者使用此队列,该队列会自动删除。// 一般设置一下队列的持久化就好,其余两个默认falsereturnnewQueue(DIRECT_QUEUE_NAME,true);}/**
* Direct交换机,命名:testDirectExchange
* @return Direct交换机
*/@BeanDirectExchangetestDirectExchange(){returnnewDirectExchange(DIRECT_EXCHANGE_NAME,true,false);}/**
* 绑定 将队列和交换机绑定,并设置用于匹配键:testDirectRouting
* @return 绑定
*/@BeanBindingbindingDirect(){returnBindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with(DIRECT_ROUTING_NAME);}}
a.生产者
SendMessageController.java
importcom.demo.config.RabbitDirectConfig;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RestController;importjava.time.LocalDateTime;importjava.time.format.DateTimeFormatter;importjava.util.HashMap;importjava.util.Map;importjava.util.UUID;/**
* <p> @Title SendMessageController
* <p> @Description 推送消息接口
*
* @author ACGkaka
* @date 2023/1/12 15:23
*/@Slf4j@RestControllerpublicclassSendMessageController{/**
* 使用 RabbitTemplate,这提供了接收/发送等方法。
*/@AutowiredprivateRabbitTemplate rabbitTemplate;@GetMapping("/sendDirectMessage")publicStringsendDirectMessage(){String messageId =String.valueOf(UUID.randomUUID());String messageData ="Hello world.";String createTime =LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String,Object> map =newHashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);// 将消息携带绑定键值:TEST_DIRECT_ROUTING,发送到交换机:TEST_DIRECT_EXCHANGE
rabbitTemplate.convertAndSend(RabbitDirectConfig.DIRECT_EXCHANGE_NAME,RabbitDirectConfig.DIRECT_ROUTING_NAME, map);return"OK";}}
c.消费者
DirectReceiver.java
importcom.demo.config.RabbitDirectConfig;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjava.util.Map;/**
* <p> @Title DirectReceiver
* <p> @Description 直连交换机监听类
*
* @author ACGkaka
* @date 2023/1/12 15:59
*/@Slf4j@ComponentpublicclassDirectReceiver{@RabbitListener(queues =RabbitDirectConfig.DIRECT_QUEUE_NAME)publicvoidprocess(Map<String,Object> testMessage){System.out.println("DirectReceiver消费者收到消息:"+ testMessage.toString());}}
d.测试结果
成功发送时,执行结果:
交换机错误时,执行结果:
路由键错误时,执行结果:
4)代码实现方式二:单独配置
除了在配置类里面统一设置回调方法外,还可以在每次推送消息到队列时,手动使用
CorrelationData
指定回调方法。
@GetMapping("/sendDirectMessage2")publicStringsendDirectMessage2(){String messageId =String.valueOf(UUID.randomUUID());String messageData ="Hello world.";String createTime =LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String,Object> map =newHashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);//生成唯一标识CorrelationData correlationData =newCorrelationData(messageId);//不管成功失败都会调用confirm或者throwable,这是异步调用
correlationData.getFuture().addCallback(
confirm ->{// 设置消息发送到交换机(Exchange)回调if(confirm !=null&& confirm.isAck()){
log.info(">>>>>>>>>>【INFO】发送成功ACK,msgId: {}, message: {}", correlationData.getId(), map);}else{
log.error(">>>>>>>>>>【ERROR】发送失败NACK,msgId: {}, message: {}", correlationData.getId(), map);}},
throwable ->{//发生错误,链接mq异常,mq未打开等...报错回调System.out.println("发送失败throwable = "+ throwable +", id:"+ correlationData.getId());});// 将消息携带绑定键值:TEST_DIRECT_ROUTING,发送到交换机:TEST_DIRECT_EXCHANGE
rabbitTemplate.convertAndSend(RabbitDirectConfig.DIRECT_EXCHANGE_NAME,RabbitDirectConfig.DIRECT_ROUTING_NAME, map, correlationData);return"OK";}
2.2 消息接收确认
消费者确认发生在 监听队列的消费者处理业务失败,如:发生了异常、不符合要求的数据等。这些场景就 需要我们手动处理消息,比如:重新发送消息或者丢弃消息。
RabbitMQ 的
消息确认机制(ACK)
默认是自动确认的。自动确认会 在消息发送给消费者后立即确认,但 存在丢失消息的可能。如果消费端消费逻辑抛出了异常,假如我们使用了事务的回滚,也只是保证了数据的一致性,消息还是丢失了。也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
消息的确认模式有三种:
AcknowledgeMode.NONE
:自动确认。(默认)AcknowledgeMode.AUTO
:根据情况确认。AcknowledgeMode.MANUAL
:手动确认。(推荐)
消费者收到消息后,手动调用 Channel 的
basicAck()
/
basicReject()
/
basicNack()
方法后,RabbitMQ 收到消息后,才认为本次投递完成。
basicAck()
:用于确认当前消息。basicReject()
:用于拒绝当前消息,可以自定义是否重回队列。basicNack()
:用于批量拒绝消息(这是 AMPQ 0-9-1 的 RabbitMQ 扩展)。
1)basicAck() 方法
basicAck()
方法 用于确认当前消息,Channel 类中的方法定义如下:
voidbasicAck(long deliveryTag,boolean multiple)throwsIOException;
参数说明:
- long deliveryTag: 当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel。RabbitMQ 会用
basic.deliver
方法向消费者推送消息,这个方法携带了一个deliveryTag
,它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识ID,是一个单调递增的正整数,deliveryTag
的范围仅限于当前 Channel。 - boolean multiple: 是否批处理,一般为 false,当该参数为 true 时,则可以一次性确认
deliveryTag
小于等于传入值的所有消息。
2)basicReject() 方法
basicReject()
方法 用于明确拒绝当前的消息。RabbitMQ 在 2.0.0 版本开始引入,Channel 类中的方法定义如下:
voidbasicReject(long deliveryTag,boolean requeue)throwsIOException;
参数说明:
- long deliveryTag: 当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel。RabbitMQ 会用
basic.deliver
方法向消费者推送消息,这个方法携带了一个deliveryTag
,它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识ID,是一个单调递增的正整数,deliveryTag
的范围仅限于当前 Channel。 - boolean requeue: 是否重新放回队列。 - 如果参数为 true,则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者。- 如果参数为 false,则 RabbitMQ 会立即把消息从队列中移除,不会把它发送给新的消费者。
3)basicNack() 方法
basicNack()
方法 用于批量拒绝消息。由于 basicReject() 方法一次只能拒绝一条消息,如果想批量拒绝消息,则可以使用 basicNack() 方法。Channel 类中的方法定义如下:
参数说明:
- long deliveryTag: 当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel。RabbitMQ 会用
basic.deliver
方法向消费者推送消息,这个方法携带了一个deliveryTag
,它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识ID,是一个单调递增的正整数,deliveryTag
的范围仅限于当前 Channel。 - boolean multiple: 是否批处理,一般为 false,当该参数为 true 时,则可以一次性确认
deliveryTag
小于等于传入值的所有消息。 - boolean requeue: 是否重新放回队列。 - 如果参数为 true,则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者。- 如果参数为 false,则 RabbitMQ 会立即把消息从队列中移除,不会把它发送给新的消费者。
4)代码实现
a.配置方式一:代码配置
如果我们之前配置了
Jackson2JsonMessageConverter.java
的序列化方式,那么我们可以接着指定消费方的消息确认模式为
AcknowledgeMode.MANUL
。
/**
* 消息监听配置
*/@BeanpublicRabbitListenerContainerFactory<?>rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory =newSimpleRabbitListenerContainerFactory();// 设置连接工厂
factory.setConnectionFactory(connectionFactory);// 设置消息确认模式
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);// 设置反序列化
factory.setMessageConverter(newJackson2JsonMessageConverter());return factory;}
b.配置方式二:配置文件
我们可以直接在
application.yml
中进行如下配置:
# 确认模式,默认auto,自动确认;manual:手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
注意: yaml中指定的是消费端容器的默认配置,如果我们在代码中有自定义注入
RabbitListenerContainerFactory
示例之后,还需要使用默认配置,需要在代码中进行设置,如下所示:
@AutowiredprivateSimpleRabbitListenerContainerFactoryConfigurer configurer;/**
* 消息监听配置
*/@BeanpublicRabbitListenerContainerFactory<?>rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory =newSimpleRabbitListenerContainerFactory();// 设置连接工厂
factory.setConnectionFactory(connectionFactory);// 采用yaml中的配置
configurer.configure(factory, connectionFactory);// 设置反序列化
factory.setMessageConverter(newJackson2JsonMessageConverter());return factory;}
c.生产者
SendMessageController.java
importcom.demo.config.RabbitDirectConfig;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RestController;importjava.time.LocalDateTime;importjava.time.format.DateTimeFormatter;importjava.util.HashMap;importjava.util.Map;importjava.util.UUID;/**
* <p> @Title SendMessageController
* <p> @Description 推送消息接口
*
* @author ACGkaka
* @date 2023/1/12 15:23
*/@Slf4j@RestControllerpublicclassSendMessageController{/**
* 使用 RabbitTemplate,这提供了接收/发送等方法。
*/@AutowiredprivateRabbitTemplate rabbitTemplate;@GetMapping("/sendDirectMessage")publicStringsendDirectMessage(){String messageId =String.valueOf(UUID.randomUUID());String messageData ="Hello world.";String createTime =LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));Map<String,Object> map =newHashMap<>();
map.put("messageId", messageId);
map.put("messageData", messageData);
map.put("createTime", createTime);// 将消息携带绑定键值:TEST_DIRECT_ROUTING,发送到交换机:TEST_DIRECT_EXCHANGE
rabbitTemplate.convertAndSend(RabbitDirectConfig.DIRECT_EXCHANGE_NAME,RabbitDirectConfig.DIRECT_ROUTING_NAME, map);return"OK";}}
d.消费者
DirectReceiver.java
importcom.demo.config.RabbitDirectConfig;importcom.rabbitmq.client.Channel;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjava.io.IOException;importjava.util.Map;/**
* <p> @Title DirectReceiver
* <p> @Description 直连交换机监听类
*
* @author ACGkaka
* @date 2023/1/12 15:59
*/@Slf4j@ComponentpublicclassDirectReceiver{@RabbitListener(queues =RabbitDirectConfig.DIRECT_QUEUE_NAME)publicvoidprocess(Map<String,Object> testMessage,Message message,Channel channel)throwsIOException{try{
log.info("DirectReceiver消费者收到消息: {}", testMessage.toString());// 手动答应消费完成,从队列中删除该消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(Exception e){
log.error("DirectReceiver消费者消费失败,原因: {}", e.getMessage(), e);// 手动答应消费完成,从队列中删除该消息(不重回队列)
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);}}}
e.测试结果
场景一:消费者进行手动确认,生产者推送2条消息:
可以看到,生产者推送2条消息后立马被消费了。
场景二:消费者不进行手动确认,生产者推送2条消息:
虽然消费者消费完毕,但是由于没有进行手动确认,所以2条消息会一直处于
Unacked
状态,直到消费者下线。
关闭 SpringBoot 程序,消费者下线后,消息由
Unacked
状态转为
Ready
状态,等待下一个消费者上线后重新进行消费。
整理完毕,完结撒花~ 🌻
参考地址:
1.RabbitMQ(4):消息确认机制详解,https://juejin.cn/post/7029232312197840904
2.RabbitMQ消息确认机制(ACK),https://blog.csdn.net/pan_junbiao/article/details/112956537
3.RabbitMQ高级,https://blog.csdn.net/hnhroot/article/details/125921527
4.关于rabbitMQ在yml配置手动ack不生效,重复答应的问题,https://blog.csdn.net/love_Saber_Archer/article/details/109111088
版权归原作者 不愿放下技术的小赵 所有, 如有侵权,请联系我们删除。