一、理论说明
1.1、数据的丢失问题,可能出现在生产者、MQ、消费者中
1、如下图
1.2、生产者弄丢了数据
1、生产者将数据发送到 RabbitMQ 的时候,可能数据就在半路给搞丢了,因为网络问题啥的,都有可能。此时可以选择用RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect ,然后发送消息,如果消息没有成功RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务 channel.txRollback ,然后重试发送消息;如果收到了消息,那么可以提交事务 channel.txCommit
2、但是问题是,RabbitMQ 事务机制(同步)一搞,基本上吞吐量会下来,因为太耗性能。所以一般来说,如果你要确保说 RabbitMQ 的消息别丢,可以开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。
3、如果RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。
4、事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。所以一般在生产者这块避免数据丢失,都是用 confirm 机制的。
1.3、RabbitMQ 弄丢了数据
1、就是 RabbitMQ 自己弄丢了数据,这个你必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。
设置持久化有两个步骤:
- 创建 queue 的时候将其设置为持久化
这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。
- 第二个是发送消息的时候将消息的 deliveryMode 设置为 2
就是将消息设置为持久化,此时 RabbitMQ 就会将消息持久化到磁盘上去。
2、必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复queue,恢复这个 queue 里的数据。注意,哪怕是你给 RabbitMQ 开启了持久化机制,也有一种可能,就是这个消息写到了RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。所以,持久化可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack 了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ack ,你也是可以自己重发的
1.4、消费端弄丢了数据
RabbitMQ 如果丢失了数据,主要是因为你消费的时候,刚消费到,自己还没处理,结果自己的进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。这个时候得用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack ,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。
1.5、总结
二、结合springBoot并保证消息可靠性
1、模拟业务,我们希望会员服务在多个实例的情况下,每个实例只需要收到一次消息
2、项目结构
3、生产者和消费者引入pom依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
4、生产者和消费者配置rabbitmq信息
spring:# rabbitmq 配置信息rabbitmq:host: 127.0.0.1
port:5672username: guest
password: guest
2.1、保证生产者到rabbitmq阶段消息投递的安全
1、RabbitMQ 为我们提供了两种方式用来控制消息的投递可靠性模式。
- confirm 确认模式
- return 退回模式
2、我们先来了解一下 rabbitmq 整个消息投递的路径,如下:
producer—>rabbitmq broker—>exchange—>queue—>consumer
3、消息从 producer 到 exchange 则会返回一个 confirmCallback 。
4、消息从 exchange–>queue 投递失败则会返回一个 returnCallback 。
我们将利用这两个 callback 控制消息的可靠性投递
2.1.1、【生产者】开启confirm和return 配置
1、在配置文件中直接开启即可
spring:# rabbitmq 配置信息rabbitmq:host: 127.0.0.1
port:5672username: guest
password: guest
#1、确保消息从发送端到服务端投递可靠(分为以下两个步骤)#1.1、确认消息已发送到交换机(Exchange) 可以把publisher-confirms: true 替换为 publisher-confirm-type: correlatepublisher-confirm-type: correlated
#1.2、确认消息从交换机中到队列中publisher-returns:true
2.1.2、【生产者】配置交换器,队列,以及routingKey三件套
1、生产者项目结构
2、
DirectRabbitConfig
这里根据前面的需求,我们创建
Direct
类型的交换器即可,至于其他类型的交换器以及区别,后面有说明
packagecn.gxm.producer.config;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.DirectExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;/**
* @author GXM
* @version 1.0.0
* @Description 创建direct类型的交换机
* @createTime 2023年01月03日
*/@Slf4j@ConfigurationpublicclassDirectRabbitConfig{privatestaticfinalStringQUEUE="TestDirectQueue";privatestaticfinalStringEXCHANGE="TestDirectExchange";privatestaticfinalStringROUTING_KEY="TestDirectRouting";/**
* 创建一个名为TestDirectQueue的队列
*
* @return
*/@BeanpublicQueuetestDirectQueue(){// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,有消息者订阅本队列,然后所有消费者都解除订阅此队列,会自动删除。// arguments:队列携带的参数,比如设置队列的死信队列,消息的过期时间等等。returnnewQueue(QUEUE,true);}/**
* 创建一个名为TestDirectExchange的Direct类型的交换机
*
* @return
*/@BeanpublicDirectExchangetestDirectExchange(){// durable:是否持久化,默认是false,持久化交换机。// autoDelete:是否自动删除,交换机先有队列或者其他交换机绑定的时候,然后当该交换机没有队列或其他交换机绑定的时候,会自动删除。// arguments:交换机设置的参数,比如设置交换机的备用交换机(Alternate Exchange),当消息不能被路由到该交换机绑定的队列上时,会自动路由到备用交换机returnnewDirectExchange(EXCHANGE,true,false);}/**
* 绑定交换机和队列
*
* @return
*/@BeanpublicBindingbindingDirect(){//bind队列to交换机中with路由key(routing key)returnBindingBuilder.bind(testDirectQueue()).to(testDirectExchange()).with(ROUTING_KEY);}}
3、
RabbitTemplate
配置配置,这里有一个点非常重要,
rabbitTemplate.setMandatory(true);
必须得设置成true,否则无法回调ReturnsCallback
。- Mandatory:为true时,消息通过交换器无法匹配到队列会返回给生产者 并触发MessageReturn,而为false时,匹配不到会直接被丢弃
packagecn.gxm.producer.config;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.ReturnedMessage;importorg.springframework.amqp.rabbit.connection.ConnectionFactory;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@Slf4j@ConfigurationpublicclassRabbitConfig{@BeanpublicRabbitTemplatecreateRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate =newRabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);//设置消息投递失败的策略,有两种策略:自动删除或返回到客户端。//我们既然要做可靠性,当然是设置为返回到客户端(true是返回客户端,false是自动删除)
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback(newRabbitTemplate.ConfirmCallback(){@Overridepublicvoidconfirm(CorrelationData correlationData,boolean ack,String cause){if(ack){
log.info("ConfirmCallback 关联数据:{},投递成功,确认情况:{}", correlationData, ack);}else{
log.info("ConfirmCallback 关联数据:{},投递失败,确认情况:{},原因:{}", correlationData, ack, cause);}}});
rabbitTemplate.setReturnsCallback(newRabbitTemplate.ReturnsCallback(){@OverridepublicvoidreturnedMessage(ReturnedMessage returnedMessage){
log.info("ReturnsCallback 消息:{},回应码:{},回应信息:{},交换机:{},路由键:{}", returnedMessage.getMessage(), returnedMessage.getReplyCode(), returnedMessage.getReplyText(), returnedMessage.getExchange(), returnedMessage.getRoutingKey());}});return rabbitTemplate;}}
4、写一个
TestController
,用作消息发送,一共发送5条消息,其中一条触发confirmCallback的失败,一条触发returnCallback,但是五条都会触发confirmCallback,他们的关系是并行的.
packagecn.gxm.producer.controller;importcn.gxm.producer.vo.User;importcom.alibaba.fastjson.JSON;importorg.springframework.amqp.rabbit.connection.CorrelationData;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.RestController;importjava.util.UUID;/**
* @author GXM
* @version 1.0.0
* @Description TODO
* @createTime 2023年01月03日
*/@RestControllerpublicclassTestController{@AutowiredRabbitTemplate rabbitTemplate;@GetMapping("/test")publicStringtest(){return"producer ok";}@GetMapping("/push")publicStringpush(){for(int i =1; i <=5; i++){//这个参数是用来做消息的唯一标识//发布消息时使用,存储在消息的headers中User user =newUser(i,"汪涵");// 关联的数据,可以用在消息投递失败的时候,作为一个线索,比如我把当前用户的id放进去,如果user消息投递失败// 我后面可以根据id再找到user,再次投递数据CorrelationData correlationData =newCorrelationData(UUID.randomUUID().toString().concat("-")+ i);if(i ==2){//故意把交换机写错,演示 confirmCallback
rabbitTemplate.convertAndSend("TestDirectExchange_111","TestDirectRouting",JSON.toJSONString(user), correlationData);}elseif(i ==3){//故意把路由键写错,演示 returnCallback
rabbitTemplate.convertAndSend("TestDirectExchange","TestDirectRouting_111",JSON.toJSONString(user), correlationData);}else{//正常发送
rabbitTemplate.convertAndSend("TestDirectExchange","TestDirectRouting",JSON.toJSONString(user), correlationData);}}return"producer push ok";}}
5、开始请求测试,打印日志如下,你会发现,和测试controller的注释写的一样,这里就不再多说了。
2023-01-0316:34:14.912INFO27080---[nio-6072-exec-1]o.a.c.c.C.[Tomcat].[localhost].[/]:InitializingSpringDispatcherServlet 'dispatcherServlet'
2023-01-0316:34:14.912INFO27080---[nio-6072-exec-1]o.s.web.servlet.DispatcherServlet:InitializingServlet 'dispatcherServlet'
2023-01-0316:34:14.913INFO27080---[nio-6072-exec-1]o.s.web.servlet.DispatcherServlet:Completed initialization in 1 ms
2023-01-0316:34:14.975INFO27080---[nio-6072-exec-1]o.s.a.r.c.CachingConnectionFactory:Attemptingtoconnectto:[127.0.0.1:5672]2023-01-0316:34:14.992INFO27080---[nio-6072-exec-1]o.s.a.r.c.CachingConnectionFactory:Creatednew connection: rabbitConnectionFactory#5f95f1e1:0/SimpleConnection@12b7544d[delegate=amqp://[email protected]:5672/, localPort=63178]2023-01-0316:34:15.020ERROR27080---[127.0.0.1:5672]o.s.a.r.c.CachingConnectionFactory:ShutdownSignal: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND- no exchange 'TestDirectExchange_111' in vhost '/',class-id=60, method-id=40)2023-01-0316:34:15.022INFO27080---[nectionFactory2]cn.gxm.producer.config.RabbitConfig:ConfirmCallback 关联数据:CorrelationData[id=0ba3b21e-e4fc-44cf-84d7-17461c8c9c18-1],投递成功,确认情况:true2023-01-0316:34:15.022INFO27080---[nectionFactory3]cn.gxm.producer.config.RabbitConfig:ConfirmCallback 关联数据:CorrelationData[id=54c62476-6847-4193-8f11-2a3efd90c284-2],投递失败,确认情况:false,原因:channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND- no exchange 'TestDirectExchange_111' in vhost '/',class-id=60, method-id=40)2023-01-0316:34:15.023INFO27080---[nectionFactory3]cn.gxm.producer.config.RabbitConfig:ReturnsCallback 消息:(Body:'{"id":3,"name":"汪涵"}' MessageProperties[headers={spring_returned_message_correlation=d91bb792-b6b0-4bea-a909-6eb8d0997b74-3}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]),回应码:312,回应信息:NO_ROUTE,交换机:TestDirectExchange,路由键:TestDirectRouting_1112023-01-0316:34:15.024INFO27080---[nectionFactory2]cn.gxm.producer.config.RabbitConfig:ConfirmCallback 关联数据:CorrelationData[id=d91bb792-b6b0-4bea-a909-6eb8d0997b74-3],投递成功,确认情况:true2023-01-0316:34:15.025INFO27080---[nectionFactory2]cn.gxm.producer.config.RabbitConfig:ConfirmCallback 关联数据:CorrelationData[id=70852973-9d21-49a4-aa07-67ca8f11aed8-4],投递成功,确认情况:true2023-01-0316:34:15.026INFO27080---[nectionFactory2]cn.gxm.producer.config.RabbitConfig:ConfirmCallback 关联数据:CorrelationData[id=9dc23013-5b97-47b4-8ebf-3f842615c2f7-5],投递成功,确认情况:true
6、因为我们此刻是没有启动消费者的,所以,在控制台是能看到三条数据的,id为2和id为3的数据是有问题的,都分别通过
confirmCallback
和
returnCallback
来进行回调了,则具体的补偿措施,可以根据自身的业务来处理。
2.2、保证rabbitmq阶段消息存储的安全
1、这个阶段我们只要保证队列,交换机,以及发送数据的持久化就🆗。
- 队列持久化,第二个持久化的参数为true即可,即使
rabbitmq
重启,这个队列还是存在的
/**
* 创建一个名为TestDirectQueue的队列
*
* @return
*/@BeanpublicQueuetestDirectQueue(){// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,有消息者订阅本队列,然后所有消费者都解除订阅此队列,会自动删除。// arguments:队列携带的参数,比如设置队列的死信队列,消息的过期时间等等。returnnewQueue(QUEUE,true);}
- 交换机持久化,第二个持久化的参数为true即可,即使
rabbitmq
重启,这个交换机还是存在的
/**
* 创建一个名为TestDirectExchange的Direct类型的交换机
*
* @return
*/@BeanpublicDirectExchangetestDirectExchange(){// durable:是否持久化,默认是false,持久化交换机。// autoDelete:是否自动删除,交换机先有队列或者其他交换机绑定的时候,然后当该交换机没有队列或其他交换机绑定的时候,会自动删除。// arguments:交换机设置的参数,比如设置交换机的备用交换机(Alternate Exchange),当消息不能被路由到该交换机绑定的队列上时,会自动路由到备用交换机returnnewDirectExchange(EXCHANGE,true,false);}
- 数据持久化,如果你使用原生方式,设置
deliveryMode
参数为2即可
//消息持久化测试Builder builder =newBuilder();
builder.deliveryMode(2);BasicProperties properties = builder.build();
channel.basicPublish("", queue_name, properties, string.getBytes());
其中针对BasicProperties中的源码信息为:
publicstaticclassBasicPropertiesextendscom.rabbitmq.client.impl.AMQBasicProperties{privateString contentType;//消息类型如:text/plainprivateString contentEncoding;//编码privateMap<String,Object> headers;privateInteger deliveryMode;//1:nonpersistent 不持久 2:persistent 持久privateInteger priority;//优先级privateString correlationId;privateString replyTo;//反馈队列privateString expiration;//expiration到期时间privateString messageId;privateDate timestamp;privateString type;privateString userId;privateString appId;privateString clusterId;...
而我们如果使用springBoot的
RabbitTemplate
,则默认会进行数据持久化,具体springBoot的持久化封装,可以查看文章 Springboot 2.x ——RabbitTemplate为什么会默认消息持久化? 所以,如下发送数据,会默认执行数据持久化
rabbitTemplate.convertAndSend("TestDirectExchange","TestDirectRouting",JSON.toJSONString(user), correlationData);
2.3、保证消费端消费数据
1、根据前面的分析,我们在消费端,只需要手动
ACK
即可,这样就能保证消费端的数据可靠性了
2、消息确认模式有:
- AcknowledgeMode.NONE:自动确认
- AcknowledgeMode.AUTO:根据情况确认
- AcknowledgeMode.MANUAL:手动确认
2.3.1、【消费者】开启手动ack模式
- 默认情况下消息消费者是自动 ack (确认)消息的,如果要手动 ack(确认)则需要修改确认模式为 manual
如果设置了自动应答,又进行手动应答,会出现
double ack
,那么程序会报错。
注意这里使用的是simplet的,其实还有direct的,simple主要包括两种工作模式,direct主要包括四种,根据你的使用模式来选择配置即可
spring:rabbitmq:listener:simple:acknowledge-mode: manual
- 或在 RabbitListenerContainerFactory 中进行开启手动 ack
@BeanpublicRabbitListenerContainerFactory<?>rabbitListenerContainerFactory(ConnectionFactory connectionFactory){SimpleRabbitListenerContainerFactory factory =newSimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(newJackson2JsonMessageConverter());
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//开启手动 ackreturn factory;}
- 最后可以手动确认消息,以下两种方式都可以
@RabbitHandlerpublicvoidprocessMessage2(String message,Channel channel,@Header(AmqpHeaders.DELIVERY_TAG)long tag){System.out.println(message);try{
channel.basicAck(tag,false);// 确认消息}catch(IOException e){
e.printStackTrace();}}
@RabbitListener(queues ="TestDirectQueue")@Component@Slf4jpublicclassDirectConsumer{@RabbitHandlerpublicvoidprocess(Object data,Channel channel,Message message)throwsIOException{
log.info("消费者接受到的消息是:{},消息体为:{}", data, message);//由于配置设置了手动应答,所以这里要进行一个手动应答。注意:如果设置了自动应答,这里又进行手动应答,会出现double ack,那么程序会报错。
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}
- 需要注意的 basicAck 方法需要传递两个参数- deliveryTag(唯一标识 ID):当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel- multiple:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息
2.3.2、【消费者】开启ack后拒绝消息 nack
1、以下示例表示了,如果header中没有
error
字段,我们则否认消息,不接受消息,而一旦该消息被 nack 后,则该消息会一直重新入队列然后一直重新消费,所以这个很重要,我们最好设置重试的次数,不然消息一直nack,一直重复消费,如果消息一直在累加,则会越来越多的无法消费的数据,最终会拖垮rabbitmq。
也可以拒绝该消息,消息会被丢弃,不会重回队列,这样就会避免上述拖垮rabbitmq的情况出现
channel.basicReject((Long)map.get(AmqpHeaders.DELIVERY_TAG),false); //拒绝消息
@RabbitHandlerpublicvoidprocessMessage2(String message,Channel channel,@HeadersMap<String,Object> map){System.out.println(message);if(map.get("error")!=null){System.out.println("错误的消息");try{
channel.basicNack((Long)map.get(AmqpHeaders.DELIVERY_TAG),false,true);//否认消息return;}catch(IOException e){
e.printStackTrace();}}try{
channel.basicAck((Long)map.get(AmqpHeaders.DELIVERY_TAG),false);//确认消息}catch(IOException e){
e.printStackTrace();}}
三、Exchange的三种模式:Direct,Fanout,Topic。
1、可以参考 Springboot+RabbitMq整合使用(含配置详解等)
总结:
在Exchange中,有三种模式:Direct,Fanout,Topic。
- Direct模式只会将消息转发到符合绑定routing key的队列中,如果没有符合routing key的队列,那么消息会丢失。而且Direct发送的消息是唯一的,也就是说再Direct中的一个消息,最后只会发送到一个队列中被消费。
- Fanout模式会无视routing key,会把消息转发到所有绑定到该交换机上的队列中。所以Fanout中的一个消息,会转发到所有的队列中,也就是如果绑定了多个队列,那么一个相同的消息会在多个队列中。
- Topic模式有一套转发的routing key规则,只会把消息转发到符合routing key 的队列中。所以在Topic中的一个消息有可能也会被转发到多个队列中进行消费。
四、注意消息模式是广播还是工作队列
1、消息广播,和我们平时说的“广播”意思差不多,就是希望同一条消息,不同消费者都能分别消费;而队列模式,就是不同消费者共享消费同一个队列的数据,相同消息只能被某一个消费者消费一次。
2、比如,同一个用户的注册消息,会员服务需要监听以发送欢迎短信,营销服务同样需要监听以发送新用户小礼物。但是,会员服务、营销服务都可能有多个实例,我们期望的是同一个用户的消息,可以同时广播给不同的服务(广播模式),但对于同一个服务的不同实例(比如会员服务 1 和会员服务 2),不管哪个实例来处理,处理一次即可(工作队列模式):
3、可以参考文章 异步处理好用,但非常容易用错
版权归原作者 铛铛响 所有, 如有侵权,请联系我们删除。