介绍
概要:RabbitMQ中间件的引入对于整个系统来说是一把双刃剑,在对系统进行解耦的同时也降低了消息的可靠性,但是对于某些系统来说我们又必须保证我们的消息是不会丢失的,因此rabbitmq给提供了以下一些功能来保证消息的可靠性,本文我们主要讲解消息可靠性中的 发送端确认机制 以及 消费端确认机制,需要保证消费者、RabbitMQ自己和消费者都不能丢消息
RabbitMQ通过 publisher confirm 机制来实现的消息发送端确认。生产者将信道设置成confirm(确认)模式,一旦信道进入confirm 模式,所有在该信道上⾯面发布的消息都会被指派一个唯一的ID(从 1 开始),一旦消息被投递到所有匹配的队列之后 (如果消息和队列是持久化的,那么确认消息会在消息持久化后发出),RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这样生产者就知道消息已经正确送达了。
在生产者向rabbitmq发送消息的整个流程中,生产者首先是要将消息发送给 交换机,然后交换机 根据指定的 路由键 把消息路由到指定的消息队列中,然后消费者从对应的消息队列对消息进行消费,因此我们要实现生产端的消息确认就需要保证 消息发送到交换机 以及 交换机路由消息到队列 的时候消息是不会丢失的
1、导入依赖
<dependencies><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies>
2、配置文件
server:port:7001#RabbitMQspring:rabbitmq:username: admin
password: admin
host: 192.168.132.128
port:5672virtual-host: /
publisher-returns:truelistener:simple:concurrency:10#消费者数量max-concurrency:10#最大消费者的数量prefetch:1#限流(消费者每次从队列获取的消息数量)auto-startup:true#启动时自动启动容器acknowledge-mode: manual #手动ack# retry:# enabled: true# max-attempts: 3 # 重试次数# max-interval: 10000 # 重试最大间隔时间# initial-interval: 2000 # 重试初始间隔时间# multiplier: 2 # 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间publisher-confirm-type: correlated # 开启确认机制/老版 publisher-confirms: true
3、创建交换机
1、Direct交换机模式
packageedu.hunan.rabbitmq.config;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassDirectConfig{//声明注册Direct交换机模式@BeanpublicDirectExchangedirectExchange(){returnnewDirectExchange("direct_exchange");}//声明队列@BeanpublicQueuedirectQueue(){/**
* 参数详解
* durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
* exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
* autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
* 一般设置一下队列的持久化就好,其余两个就是默认false
*/returnnewQueue("direct_queue",true);}@BeanpublicQueuedirectQueue2(){/**
* 参数详解
* durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
* exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
* autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
* 一般设置一下队列的持久化就好,其余两个就是默认false
*/returnnewQueue("direct_queue2",true);}//交换机和队列绑定@BeanpublicBindingdirectBinding(){returnBindingBuilder.bind(directQueue()).to(directExchange()).with("direct1");}@BeanpublicBindingdirectBinding2(){returnBindingBuilder.bind(directQueue2()).to(directExchange()).with("direct2");}}
2、FanOut交换机模式
packageedu.hunan.rabbitmq.config;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.FanoutExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassFanOutConfig{//声明注册FanOut交换机模式@BeanpublicFanoutExchangefanoutExchange(){returnnewFanoutExchange("fanout_exchange");}//声明队列@BeanpublicQueuefanoutQueue(){/**
* 参数详解
* durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效
* exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable
* autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。
* 一般设置一下队列的持久化就好,其余两个就是默认false
*/returnnewQueue("fanout_queue",true);}//交换机和队列绑定@BeanpublicBindingfanoutBinding(){returnBindingBuilder.bind(fanoutQueue()).to(fanoutExchange());}}
4、初始化回调方法
packageedu.hunan.rabbitmq.service;importorg.springframework.amqp.core.ReturnedMessage;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;importjavax.annotation.PostConstruct;@ServicepublicclassRabbitMQService{@AutowiredprivateRabbitTemplate rabbitTemplate;/**
* 被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,并且只会被服务器执行一次。
*/@PostConstructpublicvoidinit(){/**
* 消息投递到交换机后触发回调
* 使用该功能需要开启确认,spring-boot中配置如下:
* publisher-confirm-type: correlated # 开启确认机制/老版 publisher-confirms: true
*/
rabbitTemplate.setConfirmCallback(newRabbitTemplate.ConfirmCallback(){@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){if(ack){System.err.println("消息投递成功,消息已确认->"+cause+"\t"+correlationData);}else{/**
* 如果消息投递失败需要设置重发 如果一直重发失败投递到死信队列中/数据库 进行手动排查
* 重发数据库+定时任务实现
*/System.err.println("消息投递失败,消息未确认->"+cause+"\t"+correlationData);}}});/**
* 通过实现ReturnsCallback接口
* 如果消息从交换机投递到队列中失败时触发
* 比如根据发送消息指定Routingkey找不到队列时触发
* 使用该功能需要开启确认,spring-boot中配置如下:
* spring.rabbitmq.publisher-returns = true
*/
rabbitTemplate.setReturnsCallback(newRabbitTemplate.ReturnsCallback(){@OverridepublicvoidreturnedMessage(ReturnedMessage returned){//需要将接收失败的保存到DB中并且手动排错System.err.println("队列接收消息失败,消息被退回"+returned);}});}}
5、生产者
packageedu.hunan.rabbitmq.service;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;/**
* 生产者
*/@ServicepublicclassDirectServiceImpl{@AutowiredprivateRabbitTemplate rabbitTemplate;publicvoidmake(String message){//交换机名称String exchangeName ="direct_exchange";//如果有多个队列 通过routingkey 投递到指定的队列String routingkey ="direct1";//投递到交换机中System.err.println("生产者准备开始投递消息");
rabbitTemplate.convertAndSend(exchangeName,routingkey,message);}}
6、消费者
packageedu.hunan.rabbitmq.service;importcom.rabbitmq.client.Channel;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitHandler;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.messaging.handler.annotation.Headers;importorg.springframework.messaging.handler.annotation.Payload;importorg.springframework.stereotype.Service;importjava.io.IOException;/**
* 消费者
*/@Service@RabbitListener(queues ="direct_queue")publicclassConsumerDirectService{@RabbitHandlerpublicvoidreviceMessage(@PayloadStringMessage2,@HeadersChannel channel,Message message)throwsIOException,InterruptedException{System.err.println("测试异步不影响前端返回");/**
* 接收失败设置重试 拒绝当前消息,并把消息返回原队列 都设置false表示拒绝重发 消息则丢失
* 方式1 使用自带的重试机制并且控制重试次数 重试机制不能用try/catch否则会死循环 而是将异常抛出
* 触发重试机制需要消费者抛出异常,而不能try/catch捕捉异常,不然会死循环。
* 方式2 将接收失败的消息使用死行队列接盘 死行队列+try/catch
*/try{//处理消息System.err.println("消费者消息接收成功-》"+Message2);// 确认消息已经消费成功
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}catch(Exception e){System.err.println("消费处理异常"+userId+"\t"+e);// 拒绝当前消息,并把消息返回原队列 都设置false表示拒绝重发/**
* 接收失败设置重试 拒绝当前消息,并把消息返回原队列 都设置false表示拒绝重发 消息则丢失
* 方式1 使用自带的重试机制并且控制重试次数 重试机制不能用try/catch否则会死循环 而是将异常抛出
* 触发重试机制需要消费者抛出异常,而不能try/catch捕捉异常,不然会死循环。
* 方式2 将接收失败的消息使用死信队列接盘 死信队列+try/catch
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);}}}
版权归原作者 .小罗同学 所有, 如有侵权,请联系我们删除。