1. 消息确认机制
生产者发送消息之后, 到达消费端之后, 可能会有以下情况:
(1)消息处理成功
(2)消息处理失败
RabbitMQ向消费者发送消息之后, 就会把这条消息删掉, 那么第二种情况, 就会造成消息丢失。
那么如何确保消费端已经成功接收了, 并正确处理了呢?
为了保证消息从队列可靠地到达消费者,RabbitMQ提供了消息确认机制(message acknowledgement)。
消费者在订阅队列时,可以指定autoAck参数,根据这个参数设置,消息确认机制分为以下两种:
- 自动确认:当autoAck等于true时,RabbitMQ 会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息,自动确认模式适合对于消息可靠性要求不高的场景。
- 手动确认:当autoAck等于false时,RabbitMO会等待消费者显式地调用Basic.Ack命令,回复确认信号后才从内存(或者磁盘)中移除消息,这种模式适合对消息可靠性要求比较高的场景。
当autoAck参数置为false,对于RabbitMO服务端而言,队列中的消息分成了两个部分:
一是等待投递给消费者的消息。
二是已经投递给消费者,但是还没有收到消费者确认信号的消息。
如果RabbitM一直没有收到消费者的确认信号,并且消费此消息的消费者已经断开连接。
则RabbitM会安排该消息重新进入队列,等待投递给下一个消费者,当然也有可能还是原来的那个消费。
从RabbitMQ的Web管理平台上, 也可以看到当前队列中Ready状态和Unacked状态的
Ready: 等待投递给消费者的消息数。
Unacked: 已经投递给消费者, 但是未收到消费者确认信号的消息。
2. 手动确认消息
消费者在收到消息之后,可以选择确认,也可以选择直接拒绝或者跳过,RabbitMQ也提供了不同的确认应答的方式,消费者客户端可以调用与其对应的channel的相关方法,共有以下三种:
(1)肯定确认
Channel.basicAck(long deliveryTag, boolean multiple)
RabbitMo已知道该消息并且成功的处理消息.可以将其丢弃了
参数说明:
deliveryTag:消息的唯一标识,它是一个单调递增的64位的长整型值。
multiple:是否批量确认。
deliveryTag 是RabbitMQ中消息确认机制的一个重要组成部分,它确保了消息传递的可靠性和顺序性。
(2)否定确认
Channel.basicReject(long deliveryTag, boolean requeue)
参数说明:
requeue:表示拒绝后,这条消息如何处理。
如果requeue参数设置为true,则RabbitMQ会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者。
如果requeue参数设置为false,则RabbitMQ会把消息从队列中移除,而不会把它发送给新的消者。
Channel.basicNack(long deliveryTag, boolean multiple, boolean requeue)
Basic.Reject命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用Basic.Nack这个命令。
3. 代码示例
Spring-AMQP 对消息确认机制提供了三种策略:
public enum AcknowledgeMode {
NONE,
MANUAL,
AUTO;
}
AcknowledgeMode.NONE
这种模式下,消息一旦投递给消费者,不管消费者是否成功处理了消息,RabbitMO就会自动确认消息,从RabbitMQ队列中移除消息。如果消费者处理消息失败,消息可能会丢失。
AcknowledgeMode.AUTO(默认)
这种模式下,消费者在消息处理成功时会自动确认消息,但如果处理过程中抛出了异常,则不会确认消息。
AcknowledgeMode.MANUAL
手动确认模式下,消费者必须在成功处理消息后显式调用basicAck方法来确认消息。
如果消息未被确认,RabbitMO会认为消息尚未被成功处理,并且会在消费者可用时重新投递该消息。
这种模式提高了消息处理的可靠性,因为即使消费者处理消息后失败,消息也不会丢失,而是可以被重新处理。
(1) AcknowledgeMode.NONE
配置确认机制
spring:
rabbitmq:
# 消息监听配置
listener:
simple:
acknowledge-mode: none
发送消息
交换机,队列配置
@Configuration
public class RabbitMQConfig {
@Bean("ackQueue")
public Queue ackQueue() {
return QueueBuilder.durable(Constant.ACK_QUEUE).build();
}
@Bean("ackExchange")
public Exchange ackExchange() {
return ExchangeBuilder.directExchange(Constant.ACK_EXCHANGE).durable(true).build();
}
/**
* 绑定ack队列和ack交换机
*
* @param ackExchange
* @param ackQueue
* @return
*/
@Bean("ackBinding")
public Binding ackBinding(@Qualifier("ackExchange") Exchange ackExchange, @Qualifier("ackQueue") Queue ackQueue) {
return BindingBuilder.bind(ackQueue).to(ackExchange).with("ack").noargs();
}
}
通过接口发送消息
package com.example.orderservice.controller;
import com.example.orderservice.Constant;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class AckController {
@Autowired
private RabbitTemplate rabbitTemplate;
@RequestMapping("/ack")
public String ack() {
rabbitTemplate.convertAndSend(Constant.ACK_EXCHANGE, "ack", "ack test...");
return "发送成功";
}
}
消费端逻辑
package com.example.materialflowservice.listener;
import com.example.orderservice.Constant;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.nio.charset.StandardCharsets;
@Slf4j
@Component
public class AckListener {
/**
* 监听消息
*
* @param message
*/
@RabbitListener(queues = Constant.ACK_QUEUE)
public void listener(Message message, Channel channel) {
log.info("接收到消息: {}, deliverTag: {}", new String(message.getBody(), StandardCharsets.UTF_8), message.getMessageProperties().getDeliveryTag());
//int num = 3 / 0;
log.info("业务处理完成");
}
}
正常情况下是能接收到消息的
将 int num = 3 / 0 的注释放开
可以看到报异常了
但消息没有保留,依然被处理掉了。
这种情况就可能会导致我们的消息丢失。
(2)AcknowledgeMode.AUTO(默认)
根据deliverTag可以看出,消息是一直在不停重试的
并且还是保留在队列当中的,并没有丢弃
(3)AcknowledgeMode.MANUAL
消费者逻辑
package com.example.materialflowservice.listener;
import com.example.orderservice.Constant;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@Slf4j
@Component
public class AckListener {
/**
* 监听消息
*
* @param message
*/
@RabbitListener(queues = Constant.ACK_QUEUE)
public void listener(Message message, Channel channel) throws Exception {
try {
log.info("接收到消息: {}, deliverTag: {}", new String(message.getBody(), StandardCharsets.UTF_8), message.getMessageProperties().getDeliveryTag());
//int num = 3 / 0;
log.info("业务处理完成");
// 确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 拒绝消息,重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);
}
}
}
此时是正常情况,消息被正常签收。
将 int num = 3 / 0 的注释放开之后,再次运行
可以看出一直在进行重新入队操作。
如果我们将确认消息注释掉:
// channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
可以看到Unacked变为1了,未确认状态。
当我们把basicNack第三个参数设为false:
// 拒绝消息,禁止重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, false);
可以看到消息被丢弃了。
以上就是Spring-AMQP 对消息确认机制提供了三种策略。
版权归原作者 白开水不加冰 所有, 如有侵权,请联系我们删除。