0


RabbitMQ消息可靠性投递与ACK确认机制

1.RabbitMQ的消息可靠性投递

  • 什么是消息的可靠性投递 - 保证消息百分百发送到消息队列中去- 保证MQ节点成功接收消息- 消息发送端需要接收到MQ服务端接收到消息的确认应答- 完善的消息补偿机制,发送失败的消息可以再感知并二次处理
  • RabbitMQ消息投递路径 - 生产者–>交换机–>队列–>消费者- 通过两个节点控制消息的可靠性投递 - 生产者到交换机:通过confirmCallback- 交换机到队列:通过returnCallback
  • 建议 - 开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互,RabbitMQ整体效率变低,吞吐量下降严重,不是很重要的消息不建议使用消息确认机制

2.RabbitMQ消息可靠性投递confirmCallback实战

  • 生产者到交换机- 通过confirmCallback- 生产者投递消息后,如果Broker收到消息后,会给生产者一个ACK。生产者通过ACK可以确认这条消息是否正常发送到Broker,这种方式是消息可靠性投递的核心
  • 开启confirmCallback配置spring.rabbitmq.publisher-confirm-type=correlated
  • 消息发送测试packagecom.gen;importcom.gen.config.RabbitMQConfig;importorg.junit.jupiter.api.Test;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTestclassGenRabbitmqApplicationTests{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestvoidtestConfirmCallback(){this.rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{System.out.println(correlationData);System.out.println(cause);if(ack){System.out.println("发送成功");}else{System.out.println("发送失败");}});this.rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new","您有新订单!!!");// 模拟消息投递失败// this.rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME+"1", "order.new", "您有新订单!!!");}}

3.RabbitMQ消息可靠性投递returnCallback实战

  • 交换机到队列- 通过returnCallback- 消息从交换机发送到对应队列失败时触发- 两种模式 - 交换机到队列不成功,则丢弃消息(默认)- 交换机到队列不成功,返回给消息生产者,触发returnCallback
  • 配置文件开启配置# 开启returnCallback配置spring.rabbitmq.publisher-returns=true# 修改交换机投递到队列失败的策略,true交换机处理消息到路由失败会返回给生产者spring.rabbitmq.template.mandatory=true
  • 消息发送测试packagecom.gen;importcom.gen.config.RabbitMQConfig;importorg.junit.jupiter.api.Test;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTestclassGenRabbitmqApplicationTests{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestvoidtestReturnCallback(){this.rabbitTemplate.setReturnsCallback((returned)->{System.out.println(returned);});this.rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new","您有新订单!!!");// 模拟消息转发队列失败// this.rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "gen.order.new", "您有新订单!!!");}}

4.RabbitMQ消息确认机制ACK

  • 背景:消费者从Broker中监听消息,需要确保消息被合理处理

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  • RabbitMQ的ACK介绍- 消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除- 消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中- 只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除- 消息的ACK确认机制默认是打开的,消息如未被进行ACK的消息确认机制,这条消息被锁定Unacked
  • 确认方式- 自动确认(默认)- 手动确认manual
  • 配置文件开启手动确认spring.rabbitmq.listener.simple.acknowledge-mode=manual
  • 消费者代码packagecom.gen.listener;importcom.gen.config.RabbitMQConfig;importcom.rabbitmq.client.Channel;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjava.io.IOException;@Component@RabbitListener(queues =RabbitMQConfig.QUEUE_NAME)publicclassOrderMQListener{@RabbitHandlerpublicvoidorderConsumer(String msg,Message message,Channel channel)throwsIOException{System.out.println(msg);System.out.println(message);System.out.println(channel);long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println(deliveryTag);// 成功确认,消费成功 channel.basicAck(deliveryTag,false);// 拒绝后重新入队// channel.basicNack(deliveryTag, false,true);}}
  • deliveryTag介绍:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加
  • basicNack和basicReject介绍- basicReject一次只能拒绝接收一个消息,可以设置是否重新入队requeue- basicNack方法可以支持一次0个或者多个消息的拒收,可以设置是否重新入队requeue
  • 人工审核异常消息- 设置重试阈值,超过后确认消费成功,记录消息,人工处理
标签: rabbitmq

本文转载自: https://blog.csdn.net/2302_76363587/article/details/136169238
版权归原作者 水宝的滚动歌词 所有, 如有侵权,请联系我们删除。

“RabbitMQ消息可靠性投递与ACK确认机制”的评论:

还没有评论