1.RabbitMQ的消息可靠性投递
- 什么是消息的可靠性投递 - 保证消息百分百发送到消息队列中去- 保证MQ节点成功接收消息- 消息发送端需要接收到MQ服务端接收到消息的确认应答- 完善的消息补偿机制,发送失败的消息可以再感知并二次处理
- RabbitMQ消息投递路径 - 生产者–>交换机–>队列–>消费者- 通过两个节点控制消息的可靠性投递 - 生产者到交换机:通过confirmCallback- 交换机到队列:通过returnCallback
- 建议 - 开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互,RabbitMQ整体效率变低,吞吐量下降严重,不是很重要的消息不建议使用消息确认机制
2.RabbitMQ消息可靠性投递confirmCallback实战
- 生产者到交换机- 通过confirmCallback- 生产者投递消息后,如果Broker收到消息后,会给生产者一个ACK。生产者通过ACK可以确认这条消息是否正常发送到Broker,这种方式是消息可靠性投递的核心
- 开启confirmCallback配置
spring.rabbitmq.publisher-confirm-type=correlated
- 消息发送测试
packagecom.gen;importcom.gen.config.RabbitMQConfig;importorg.junit.jupiter.api.Test;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTestclassGenRabbitmqApplicationTests{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestvoidtestConfirmCallback(){this.rabbitTemplate.setConfirmCallback((correlationData, ack, cause)->{System.out.println(correlationData);System.out.println(cause);if(ack){System.out.println("发送成功");}else{System.out.println("发送失败");}});this.rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new","您有新订单!!!");// 模拟消息投递失败// this.rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME+"1", "order.new", "您有新订单!!!");}}
3.RabbitMQ消息可靠性投递returnCallback实战
- 交换机到队列- 通过returnCallback- 消息从交换机发送到对应队列失败时触发- 两种模式 - 交换机到队列不成功,则丢弃消息(默认)- 交换机到队列不成功,返回给消息生产者,触发returnCallback
- 配置文件开启配置
# 开启returnCallback配置spring.rabbitmq.publisher-returns=true# 修改交换机投递到队列失败的策略,true交换机处理消息到路由失败会返回给生产者spring.rabbitmq.template.mandatory=true
- 消息发送测试
packagecom.gen;importcom.gen.config.RabbitMQConfig;importorg.junit.jupiter.api.Test;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.boot.test.context.SpringBootTest;@SpringBootTestclassGenRabbitmqApplicationTests{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestvoidtestReturnCallback(){this.rabbitTemplate.setReturnsCallback((returned)->{System.out.println(returned);});this.rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new","您有新订单!!!");// 模拟消息转发队列失败// this.rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "gen.order.new", "您有新订单!!!");}}
4.RabbitMQ消息确认机制ACK
- 背景:消费者从Broker中监听消息,需要确保消息被合理处理
- RabbitMQ的ACK介绍- 消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除- 消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中- 只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除- 消息的ACK确认机制默认是打开的,消息如未被进行ACK的消息确认机制,这条消息被锁定Unacked
- 确认方式- 自动确认(默认)- 手动确认manual
- 配置文件开启手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
- 消费者代码
packagecom.gen.listener;importcom.gen.config.RabbitMQConfig;importcom.rabbitmq.client.Channel;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjava.io.IOException;@Component@RabbitListener(queues =RabbitMQConfig.QUEUE_NAME)publicclassOrderMQListener{@RabbitHandlerpublicvoidorderConsumer(String msg,Message message,Channel channel)throwsIOException{System.out.println(msg);System.out.println(message);System.out.println(channel);long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.println(deliveryTag);// 成功确认,消费成功 channel.basicAck(deliveryTag,false);// 拒绝后重新入队// channel.basicNack(deliveryTag, false,true);}}
- deliveryTag介绍:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加
- basicNack和basicReject介绍- basicReject一次只能拒绝接收一个消息,可以设置是否重新入队requeue- basicNack方法可以支持一次0个或者多个消息的拒收,可以设置是否重新入队requeue
- 人工审核异常消息- 设置重试阈值,超过后确认消费成功,记录消息,人工处理
标签:
rabbitmq
本文转载自: https://blog.csdn.net/2302_76363587/article/details/136169238
版权归原作者 水宝的滚动歌词 所有, 如有侵权,请联系我们删除。
版权归原作者 水宝的滚动歌词 所有, 如有侵权,请联系我们删除。