RabbitMQ消息确认的本质也就是为了解决RabbitMQ消息丢失问题,因为哪怕我们做了RabbitMQ持久化,其实也并不能保证解决我们的消息丢失问题
RabbitMQ的消息确认有两种
- 第一种是消息发送确认。这种是用来确认生产者将消息发送给交换器,交换器传递给队列的过程中,消息是否成功投递。发送确认分为两步,一是确认是否到达交换器,二是确认是否到达队列。
- 第二种是消费接收确认。这种是确认消费者是否成功消费了队列中的消息。
1.消息发送确认(生产者)
正常情况下,生产者会通过交换机发送消息至队列中,再由消费者来进行消费,但是其实RabbitMQ在接收到消息后,还需要一段时间消息才能存入磁盘,并且其实也不是每条消息都会存入磁盘,可能仅仅只保存到cache中,这时,如果RabbitMQ正巧发生崩溃,消息则就会丢失,所以为了避免该情况的发生,我们引入了生产者确认机制,rabbitmq对此提供了两种方式:
方法一:Confirm模式
通过设置生产者Channel为comfirm模式,该Channel上发布的所有消息都会被指派一个唯一ID(每次从1开始累加),当消息到达生产者指定的消息队列后,broker会返回一个确认给生产者(包含之前的ID),这样生产者就能知道哪条消息成功发送了。
代码段:
public void sendQueue(String appId, String handleUserId, List<String> deviceIds) {
List<Object> list = new ArrayList<>();
JSONObject jsonObject = new JSONObject();
jsonObject.put(DeviceConstant.COMMAND, DELETE);
jsonObject.put(DeviceConstant.BODY, list );
String topicExchange = RabbitMqConstant.EXCHANGE_TOPIC_DATA;
String routingKey = RabbitMqConstant.ROUTING_KEY_LOCAL_DATA;
//rabbitTemplate.convertAndSend(topicExchange, routingKey, jsonObject.toJSONString());
try {
Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(false);
channel.confirmSelect();
channel.basicPublish(topicExchange, routingKey, null, jsonObject.toJSONString().getBytes());
channel.addConfirmListener(new ConfirmListener() {
//消息失败处理
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
log.info("sendQueue-ack-confirm-fail==>exchange:{}--routingkey:{}--deliveryTag:{}--multiple:{}--message:{}", topicExchange, routingKey, deliveryTag, multiple, jsonObject);
try {
Thread.sleep(3000l);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//重发
channel.basicPublish(topicExchange, routingKey, null, jsonObject.toJSONString().getBytes());
}
//消息成功处理
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
log.info("sendQueue-ack-confirm-successs==>exchange:{}--routingkey:{}--deliveryTag:{}--multiple:{}", topicExchange, routingKey, deliveryTag, multiple);
}
});
} catch (Exception e) {
log.error("sendQueue-ack-发送消息失败:{}", ExceptionUtils.getStackTrace(e));
}
}
方法二:手动确认,ConfirmCallback、returnCallback
代码段:
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.stereotype.Component;
import java.util.UUID;
/**
* RabbitMq 生产者ACK
*/
@Slf4j
@Component
public class RabbitMqProducerAck implements RabbitTemplate.ConfirmCallback , RabbitTemplate.ReturnCallback{
@Autowired
private RabbitTemplate rabbitTemplatenew;
/**
* @param message
*/
public void send(String topicName, String routingKey, String message){
//设置由于网络问题导致的连接Rabbitmq失败的重试策略
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));
//发送之前可以先把消息保存到数据库
rabbitTemplatenew.setEncoding("UTF-8");
rabbitTemplatenew.setMandatory(true);
rabbitTemplatenew.setConfirmCallback(this);// 指定 ConfirmCallback
rabbitTemplatenew.setReturnCallback(this);// 指定 ReturnCallback
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
log.info("rabbitMqProducerAck-confirm-sender==>{}----exchange:{}--routingkey:{}", correlationData.getId(), topicName, routingKey, message);
this.rabbitTemplatenew.convertAndSend(topicName, routingKey, message, correlationData);
try {
Thread.sleep(100);//线程休眠,为了不让方法直接结束,回调函数无法正常回调confirm方法
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
message=null;//强引用设置为null,便于gc回收
}
}
/**
* @param correlationData
* @param ack
* @param cause
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("rabbitMqProducerAck-confirm-successs==>消息回调confirm函数:{},ack:{},cause:{}", JSONObject.toJSONString(correlationData), ack, cause);
}
/**
* @param message
* @param replyCode
* @param replyText
* @param exchange
* @param routingKey
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("rabbitMqProducerAck-confirm-fail==>消息使用的交换器 exchange : {}--消息使用的路由键 routing :{}--消息主体 message : {}-replyCode : {}-replyText: {}", exchange, routingKey, message.getBody(),replyCode,replyText);
try {
Thread.sleep(3000l);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
//从新发送
this.send(exchange, routingKey, new String(message.getBody()));
}
}
2.消息接收确认(消费者)
消息接收确认机制,分为消息自动确认模式和消息手动确认模式,当消息确认后,我们队列中的消息将会移除
那这两种模式要如何选择呢?
- 如果消息不太重要,丢失也没有影响,那么自动ACK会比较方便。好处就是可以提高吞吐量,缺点就是会丢失消息
- 如果消息非常重要,不容丢失,则建议手动ACK,正常情况都是更建议使用手动ACK。虽然可以解决消息不会丢失的问题,但是可能会造成消费者过载
**1):rabbitmq消费者默认情况下是自动确认,不再多说
2):手动确认方式:**
@RabbitHandler
@RabbitListener(queues = RabbitMqConstant.xxx , concurrency = "1-1")
public void receiveQueueCommonLocal(Channel channel, Message message) {
String messageBody = new String(message.getBody());
//System.out.println("messageBody===>"+messageBody);
try {
//todo 业务逻辑
/*手动确认成功
* 参数:
* deliveryTag:该消息的index
* multiple:是否批量处理.true:将一次性ack所有小于deliveryTag的消息
* **/
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
} catch (Exception e) {
e.printStackTrace();
log.error("receiveQueueCommonLocal=====>ERROR:{}--josn:{}", ExceptionUtil.getMessage(e), messageBody);
try {
//手动确认回滚 拒绝deliveryTag对应的消息,第二个参数是否requeue,true则重新入队列,否则丢弃或者进入死信队列。
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
}
}
版权归原作者 你是我的小丫小太阳 所有, 如有侵权,请联系我们删除。