文章目录
对 RabbitMQ 不是很了解的同学,可以看一下我的另一篇博文:RabbitMQ快速入门(MQ的概念、安装RabbitMQ、在 SpringBoot 项目中集成 RabbitMQ )
1. 消息丢失的情况
消息丢失的情况主要有以下三种:
- 生产者向消息代理传递消息的过程中,消息丢失了
- 消息代理( RabbitMQ )把消息弄丢了
- 消费者把消息弄丢了
那怎么保证消息的可靠性呢,我们可以从消息丢失的情况入手——从生产者、消息代理( RabbitMQ )、消费者三个方面来保证消息的可靠性
2. 生产者的可靠性
2.1 生产者重连
由于网络问题,可能会出现客户端连接 RabbitMQ 失败的情况,我们可以通过配置开启连接 RabbitMQ 失败后的重连机制
application.yml(将 host 更改为部署 RabbitMQ 的服务器的地址)
spring:rabbitmq:host: 127.0.0.1
port:5672virtual-host: /blog
username: CaiXuKun
password: T1rhFXMGXIOYCoyi
connection-timeout: 1s # 连接超时时间template:retry:enabled:true# 开启连接超时重试机制initial-interval: 1000ms # 连接失败后的初始等待时间multiplier:1# 连接失败后的等待时长倍数,下次等待时长 = (initial-interval) * multipliermax-attempts:3# 最大重试次数
填写完配置信息后,我们手动停止 RabbitMQ ,模拟生产者连接 RabbitMQ 失败的情况
sudodocker stop rabbitmq
启动测试类
@TestvoidtestSendMessageToQueue(){String queueName ="simple.queue";String msg ="Hello, SpringAMQP!";
rabbitTemplate.convertAndSend(queueName, msg);}
可以在控制台看到,总共有三次重新连接 RabbitMQ 的记录,三次连接都失败后,就直接抛异常了
注意事项:
- 当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率,但 SpringAMOP 提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,线程会被阻塞,影响业务性能
- 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长(比如 200 ms)和重试次数,也 可以考虑使用异步线程来执行发送消息的代码
2.2 生产者确认
RabbitMQ 提供了
Publisher Confirm
和
Publisher Return
两种确认机制。开启确机制认后,如果 MQ 成功收到消息后,会返回确认消息给生产者,返回的结果有以下几种情况:
- 消息投递到了 MQ,但是路由失败,此时会通过 PublisherReturn 机制返回路由异常的原因,然后返回 ACK,告知生产者消息投递成功
- 临时消息投递到了 MQ,并且入队成功,返回 ACK,告知生产者消息投递成功
- 持久消息投递到了MQ,并且入队完成持久化,返回 ACK,告知生产者消息投递成功
- 其它情况都会返回 NACK,告知生产者消息投递失败
2.3 生产者确认机制的代码实现
在 publisher 服务中编写与生产者确认机制有关的配置信息( application.yml 文件)
spring:rabbitmq:publisher-returns:truepublisher-confirm-type: correlated
publisher-confirm-type 有三种模式:
- none:关闭 confirm 机制
- simple:以同步阻塞等待的方式返回 MQ 的回执消息
- correlated:以异步回调方式的方式返回 MQ 的回执消息
每个 RabbitTemplate 只能配置一个 ReturnCallback
在 publisher 模块新增一个名为
RabbitMQConfig
的配置类,并让该类实现
ApplicationContextAware
接口
importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.BeansException;importorg.springframework.context.ApplicationContext;importorg.springframework.context.ApplicationContextAware;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassRabbitMQConfigimplementsApplicationContextAware{@OverridepublicvoidsetApplicationContext(ApplicationContext applicationContext)throwsBeansException{RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 配置回调
rabbitTemplate.setReturnsCallback((returnedMessage)->{System.out.println("收到消息的return callback, "+"exchange = "+ returnedMessage.getExchange()+", "+"routingKey = "+ returnedMessage.getRoutingKey()+", "+"replyCode = "+ returnedMessage.getReplyCode()+", "+"replyText = "+ returnedMessage.getReplyText()+", "+"message = "+ returnedMessage.getMessage());});}}
测试前先运行 RabbitMQ
sudodocker start rabbitmq
在 publisher 模块添加一个测试类,测试 ReturnCallback 的效果
@TestvoidtestConfirmCallback()throwsInterruptedException{CorrelationData correlationData =newCorrelationData();
correlationData.getFuture().whenCompleteAsync((confirm, throwable)->{if(confirm.isAck()){// 消息发送成功System.out.println("消息发送成功,收到ack");}else{// 消息发送失败System.err.println("消息发送失败,收到nack,原因是"+ confirm.getReason());}if(throwable !=null){// 消息回调失败System.err.println("消息回调失败");}});
rabbitTemplate.convertAndSend("blog.direct","red","Hello, confirm callback", correlationData);// 测试方法执行结束后程序就结束了,所以这里需要阻塞线程,否则程序看不到回调结果Thread.sleep(2000);}
发送成功后可以看到消息发送成功的回调信息
如果交换机不存在会怎么样呢,我们故意使用一个不存在的交换机,观察控制台的输出结果
如果 routingKey 不存在会怎么样呢,我们故意使用一个不存在的 routingKey ,观察控制台的输出结果
可以看到,confirmCallback 和 ReturnCallback 都返回了回调信息(
deliveryTag
为
0
表示消息无法路由到队列)
2.4 如何看待和处理生产者的确认信息
- 生产者确认需要额外的网络开销和系统资源开销,尽量不要使用
- 如果一定要使用,无需开启 Publisher-Return 机制,因为路由失败一般是业务出了问题
- 对于返回 nack 的消息,可以尝试重新投递,如果依然失败,则记录异常消息
3. 消息代理(RabbitMQ)的可靠性
在默认情况下,RabbitMQ 会将接收到的信息保存在内存中以降低消息收发的延迟,这样会导致两个问题:
- 一旦 RabbitMQ 宕机,内存中的消息会丢失
- 内存空间是有限的,当消费者处理过慢或者消费者出现故障或时,会导致消息积压,引发 MQ 阻塞( Paged Out 现象)
怎么理解 MQ 阻塞呢,当队列的空间被消息占满了之后,RabbitMQ 会先把老旧的信息存到磁盘,为新消息腾出空间,在这个过程中,整个 MQ 是被阻塞的,也就是说,在 MQ 完成这一系列工作之前,无法处理已有的消息和接收新的消息
我们来测试一下消息丢失的情况,在 RabbitMQ 的控制台中向 simple.queue 队列发送一条信息,发送后重启 RabbitMQ ,模拟 RabbitMQ 宕机后重启的情况
测试前,记得先把监听 simple.queue 队列的代码注释掉
@RabbitListener(queues ="simple.queue")publicvoidlistenSimpleQueue(String message){System.out.println("消费者收到了simple.queue的消息:【"+ message +"】");}
第一步:先发送一条消息
第二步:查看消息的情况
第三步:重启 RabbitMQ ,模拟 RabbitMQ 宕机后重启的情况
sudodocker restart rabbitmq
第四步:查看消息的情况(可以看到,RabbitMQ 重启后,消息丢失了)
3.1 数据持久化
RabbitMQ 实现数据持久化包括 3 个方面:
- 交换机持久化
- 队列持久化
- 消息持久化
注意事项:
- 利用 SpringAMQP 创建的交换机、队列、消息,默认都是持久化的
- 在 RabbitMQ 控制台创建的交换机、队列默认是持久化的,而消息默认是存在内存中( 3.12 版本之前默认存放在内存,3.12 版本及之后默认先存放在磁盘,消费者处理消息时才会将消息取出来放到内存中)
我们来演示一下 RabbitMQ 发生 Paged Out 现象(也就是队列的空间被消息占满了之后,将老旧消息移到磁盘,为新消息腾出空间的情况)
我们编写一个测试类,向 simple.queue 一次性发送一百万条消息
在发送消息之前,先把生产者确认机制关闭,提高消息发送的速度
spring:rabbitmq:publisher-returns:falsepublisher-confirm-type: none
先测试发送非持久化信息
@TestvoidtestPagedOut(){Message message =MessageBuilder.withBody("Hello, paged out".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT).build();for(int i =0; i <1; i++){
rabbitTemplate.convertAndSend("simple.queue", message);}}
测试结果
再测试发送持久化信息
@TestvoidtestPagedOut(){Message message =MessageBuilder.withBody("Hello, paged out".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).build();for(int i =0; i <1; i++){
rabbitTemplate.convertAndSend("simple.queue", message);}}
3.2 LazyQueue( 3.12 版本后所有队列都是 Lazy Queue 模式)
从 RabbitMQ 的
3.6.0
版本开始,增加了 Lazy Queue 的概念,也就是惰性队列,惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认 2048条 )
- 消费者要处理消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储,在 3.12 版本后,所有队列都是 Lazy Queue 模式,无法更改
开启持久化和生产者确认时,RabbitMQ 只有在消息持久化完成后才会给生产者返回 ACK 回执
在 RabbitMQ 的控制台可以看到 RabbitMQ 的版本
在 RabbitMQ 控制台中,要创建一个惰性队列,只需要在声明队列时,指定 x-queue-mode 属性为 lazy 即可
x-queue-mode
在 Java 代码中,要创建一个惰性队列,只需要在声明队列时,指定 x-queue-mode 属性为 lazy 即可
编程式创建
@Beanpublicorg.springframework.amqp.core.QueuelazeQueue(){returnQueueBuilder.durable("lazy.queue1").lazy().build();}
注解式创建
@RabbitListener(queuesToDeclare [email protected](
name ="lazy.queue2",
durable ="true",
arguments =@Argument(
name ="x-queue-mode",
value ="lazy")))publicvoidlistenLazeQueue(String message){System.out.println("消费者收到了 laze.queue2的消息: "+ message);}
4. 消费者的可靠性
4.1 消费者确认机制
为了确认消费者是否成功处理消息,RabbitMQ 提供了消费者确认机制(Consumer Acknowledgement)。处理消息后,消费者应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 消息的处理状态,回执有三种可选值:
- ack:成功处理消息,RabbitMQ 从队列中删除该消息
- nack:消息处理失败,RabbitMQ 需要再次投递消息
- reject:消息处理失败并拒绝该消息,RabbitMQ 从队列中删除该消息
SpringAMQP 已经实现了消息确认功能,并允许我们通过配置文件选择 ACK 的处理方式,有三种方式:
- none:不处理,即消息投递给消费者后立刻 ack,消息会会立刻从 MQ 中删除,非常不安全,不建议使用
- manual:手动模式。需要自己在业务代码中调用 api,发送 ack 或 reject ,存在业务入侵,但更灵活
- auto:自动模式,SpringAMQP 利用 AOP 对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回 ack,当业务出现异常时,会根据异常的类型返回不同结果: - 如果是业务异常,会自动返回 nack- 如果是消息处理或校验异常,自动返回 reject
开启消息确认机制,需要在
application.yml
文件中编写相关的配置
spring:rabbitmq:listener:simple:prefetch:1acknowledge-mode: none
先测试处理模式为 none 的情况,向 simple.queue 队列发送一条消息,同时监听 simple.queue 队列的消息,监听到队列中的消息后手动抛出一个异常
publisher 服务
@TestvoidtestSendMessageToQueue(){String queueName ="simple.queue";String msg ="Hello, SpringAMQP!";
rabbitTemplate.convertAndSend(queueName, msg);}
consumer 服务
@RabbitListener(queues ="simple.queue")publicvoidlistenSimpleQueue(String message){System.out.println("消费者收到了simple.queue的消息:【"+ message +"】");thrownewRuntimeException("故意抛出异常");}
不出意外,程序报错了
但在 RabbitMQ 的控制台可以看到,消息也丢失了
再测试处理模式为 auto 、抛出业务异常(RunTime Exception)的情况
可以看到,控制台一直在报错,报错之后一直在尝试重新发送消息
在 RabbitMQ 的控制台可以看到,simple.queue 一直在收发消息,速率达到了 97 次每秒(状态为 running ,消息的状态为 Unacked )
此时,我们手动关闭 consumer 服务,查看 RabbitMQ 的控制台,可以看到消息恢复到正常的状态了
再来测试处理模式为 auto 、异常类型为 MessageConversionException 的情况
@RabbitListener(queues ="simple.queue")publicvoidlistenSimpleQueue(String message){System.out.println("消费者收到了simple.queue的消息:【"+ message +"】");thrownewMessageConversionException("故意抛出异常");}
在控制台可以看到,消息被拒绝了,而且消息也没有重新发送
查看 RabbitMQ 的控制台,可以发现消息已经从队列中移除了
4.2 失败重试机制
当消费者出现异常后,消息会不断重新入队,重新发送给消费者,然后再次发生异常,再次 requeue(重新入队),陷入 无限循环,给 RabbitMQ 带来不必要的压力
我们可以利用 Spring 提供的 retry 机制,在消费者出现异常时利用本地重试,而不是无限制地重新入队
在 application.yml 配置文件中开启失败重试机制
spring:rabbitmq:listener:simple:prefetch:1acknowledge-mode: auto
retry:enabled:true# 开启消息消费失败重试机制initial-interval: 1000ms # 消息消费失败后的初始等待时间multiplier:1# 消息消费失败后的等待时长倍数,下次等待时长 = (initial-interval) * multipliermax-attempts:3# 最大重试次数stateless:true# true表示无状态,false表示有状态,如果业务中包含事务,需要设置为false
我们将抛出的异常类型改回 RuntimeException
@RabbitListener(queues ="simple.queue")publicvoidlistenSimpleQueue(String message){System.out.println("消费者收到了simple.queue的消息:【"+ message +"】");thrownewRuntimeException("故意抛出异常");}
在控制台可以看出,消息的重新发送次数已经耗尽了
查看 RabbitMQ 的控制台,发现消息也丢失了
正常情况下,消息丢失都不是我们想看到的,该怎么解决这个问题呢
4.3 失败消息的处理策略
开启重试模式后,如果重试次数耗尽后消息依然处理失败,则需要由 MessageRecoverer 接口来处理, MessageRecoverer 有三个实现类:
RejectAndDontRequeueRecoverer
:重试次数耗尽后,直接 reject,丢弃消息,默认就是这种方式ImmediateRequeueMessageRecoverer
:重试次数耗尽后,返回 nack,消息重新入队RepublishMessageRecoverer
:重试耗尽后,将失败消息投递到指定的交换机
我们来演示一下使用 RepublishMessageRecoverer 类的情况
第一步:定义一个名为 blog.error 的交换机、一个名为 error.queue 的队列,并将队列和交换机进行绑定
importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.DirectExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.amqp.rabbit.retry.MessageRecoverer;importorg.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;importorg.springframework.boot.autoconfigure.condition.ConditionalOnProperty;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@Configuration@ConditionalOnProperty(prefix ="spring.rabbitmq.listener.simple.retry", name ="enabled", havingValue ="true")publicclassErrorConfiguration{@BeanpublicDirectExchangeerrorExchange(){returnnewDirectExchange("error.direct",true,false);}@BeanpublicQueueerrorQueue(){returnnewQueue("error.queue",true,false,false);}@BeanpublicBindingerrorBinding(Queue errorQueue,DirectExchange errorExchange){returnBindingBuilder.bind(errorQueue).to(errorExchange).with("error");}}
第二步:将失败处理策略改为 RepublishMessageRecoverer (开起了消费者重试机制才会生效)
@BeanpublicMessageRecoverermessageRecoverer(RabbitTemplate rabbitTemplate){returnnewRepublishMessageRecoverer(rabbitTemplate,"error.direct","error");}
在控制台中可以看到,消息的重试次数耗尽后,消息被放入了 error.queue 队列
在 RabbitMQ 的控制塔也可以看到, error.direct 交换机 和 error.queue 队列成功创建,消息也成功放入了 error.queue 队列
总结:消费者如何保证消息一定被消费?
- 开启消费者确认机制为 auto ,由 Spring 帮我们确认,消息处理成功后返回 ack,异常时返回 nack
- 开启消费者失败重试机制,并设置
MessageRecoverer
,多次重试失败后将消息投递到异常交换机,交由人工处理
4.4 业务幂等性
幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x)),绝对值函数具有幂等性
在程序开发中,幂等是指同一个业务,执行一次或多次对业务状态的影响是一致的
那么有什么方法能够确保业务的幂等性呢
4.4.1 方案一:为每条消息设置一个唯一的 id
给每个消息都设置一个唯一的 id,利用 id 区分是否是重复消息:
- 为每条消息都生成一个唯一的 id,与消息一起投递给消费者
- 消费者接收到消息后处理自己的业务,业务处理成功后将消息 id 保存到数据库
- 如果消费者下次又收到相同消息,先去数据库查询该消息对应的 id 是否存在,如果存在则为重复消息,放弃处理
可以在指定 MessageConverter 的具体类型时,同时为 MessageConverter 设置自动创建一个 messageId
@BeanpublicMessageConverterjacksonMessageConvertor(){Jackson2JsonMessageConverter jackson2JsonMessageConverter =newJackson2JsonMessageConverter();
jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}
发送消息后,在 RabbitMQ 的控制台可以看到,消息的 properties 属性附带了 messageId 信息
但这种方式对业务有一定的侵入性
4.4.2 方案二:结合业务判断
结合业务逻辑,基于业务本身做判断。以支付业务为例:我们要在支付后修改订单状态为已支付,应该在修改订单状态前先查询订单状态,判断状态是否是未支付,只有未支付订单才需要修改,其它状态的订单不做处理
总结:如何保证支付服务与交易服务之间的订单状态一致性?
- 首先,支付服务会正在用户支付成功以后利用 MQ 发送消息通知交易服务,完成订单状态同步
- 其次,为了保证 MQ 消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递和处理的可靠性,同时也开启了MQ的持久化,避免因服务宕机导致消息丢失
- 最后,我们还在交易服务更新订单状态时做了业务幕等判断,避免因消息重复消费导致订单状态异常
4.5 兜底的解决方案
如果交易服务消息处理失败,支付服务和交易服务出现了数据不一致的情况,有没有什么兜底的解决方案?
我们可以在交易服务设置定时任务,定期查询订单支付状态,这样即便 MQ 通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性
5. 延迟消息
5.1 什么是延迟消息
延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才会收到消息
延迟任务:一定时间之后才会执行的任务
5.2 死信交换机
当一个队列中的某条消息满足下列情况之一时,就会成为死信(dead letter):
- 消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false
- 过期消息(达到了队列或消息本身设置的过期时间),消息超时后无人消费
- 要投递的队列消息堆积满了,最早的消息可能成为死信
如果队列通过 dead-letter-exchange 属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中,这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)
利用死信交换机的特点,可以实现发送延迟消息的功能
5.3 延迟消息插件(推荐使用)
5.3.1 下载并安装延迟插件
RabbitMQ 的官方推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后,可以将消息暂存一段时间,时间到了之后再将消息投递到队列中
插件的下载地址:rabbitmq-delayed-message-exchange
下载完插件后,运行以下指令,在输出信息中找到 Mounts ,再找到 RabbitMQ 的插件的安装目录
sudodocker inspect rabbitmq
然后进入 RabbitMQ 的插件的安装目录,将刚才下载的插件上传到该目录下
一般与 docker 相关的目录只有 root 用户才有权限访问,所以我们需要先打开 docker 目录的部分权限(耗时可能较长)
sudochmod +rx -R /var/lib/docker
接着打开
/var/lib/docker/volumes/rabbitmq-plugins/_data
目录的写权限(如果修改权限不生效,请切换到 root 用户执行指令)
sudochmod777 /var/lib/docker/volumes/rabbitmq-plugins/_data
将刚才下载的插件上传到
/var/lib/docker/volumes/rabbitmq-plugins/_data
目录
上传成功后将
/var/lib/docker/volumes/rabbitmq-plugins/_data
目录的权限复原
sudochmod755 /var/lib/docker/volumes/rabbitmq-plugins/_data
最后进入容器内部,运行指令安装插件,安装完成后退出容器内部
sudodockerexec-it rabbitmq bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
exit
看到以下信息,说明插件安装成功了
5.3.2 安装插件时可能遇到的问题
如果你遇到了以下错误,在执行
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
指令前先执行以下指令
chmod400 /var/lib/rabbitmq/.erlang.cookie
5.3.3 在 Java 代码中发送延迟消息
声明延迟交换机
@BeanpublicDirectExchangedelayExchange(){returnExchangeBuilder.directExchange("delay.direct").delayed().build();}
声明队列和延迟交换机,并将队列和延迟交换机绑定在一起
@RabbitListener(bindings =@QueueBinding(
value =@Queue(name ="delay.queue"),
exchange =@Exchange(name ="delay.direct", delayed ="true", type =ExchangeTypes.DIRECT),
key ="delay"))publicvoidlistenDelayQueue(String message){SimpleDateFormat simpleDateFormat =newSimpleDateFormat();
simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");System.out.println("消费者收到了 delay.queue的消息: "+ message +",时间:"+ simpleDateFormat.format(System.currentTimeMillis()));}
编写测试方法,测试发送延迟消息
@TestvoidtestSendDelayMessage(){
rabbitTemplate.convertAndSend("delay.direct","delay","Hello, DelayQueue!",newMessagePostProcessor(){@OverridepublicMessagepostProcessMessage(Message message)throwsAmqpException{
message.getMessageProperties().setDelay(10000);// 毫秒return message;}});SimpleDateFormat simpleDateFormat =newSimpleDateFormat();
simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");System.out.println("发送消息成功!发送时间:"+ simpleDateFormat.format(System.currentTimeMillis()));}
发送延迟消息的本质是在消息头属性中添加 x-delay 属性
5.3.4 延迟消息的原理和缺点
RabbitMQ 的延迟消息是怎么实现的呢?RabbitMQ 会自动维护一个时钟,这个时钟每隔一秒就跳动一次,如果对时钟的精度要求比较高的,可能还要精确到毫秒,甚至纳秒
RabbitMQ 会为发送到交换机的每一条延迟消息创建一个时钟,时钟运行的过程中需要 CPU 不断地进行计算。发送到交换机的延迟消息数越多,RabbitMQ 需要维护的时钟就越多,对 CPU 的占用率就越高(Spring 提供的定时任务的原理也是类似)
定时任务属于 CPU 密集型任务,中间涉及到的计算过程对 CPU 来说压力是很大的,所以说,采用延迟消息会给服务器的 CPU 带来更大的压力。当交换机中有非常多的延迟消息时,对 CPU 的压力就会特别大
所以说,延迟消息适用于延迟时间较短的场景
5.4 取消超时订单
设置 30 分钟后检测订单支付状态实现起来非常简单,但是存在两个问题:
- 如果并发较高,30分钟可能堆积消息过多,对 MQ 压力很大
- 大多数订单在下单后 1 分钟内就会支付,但消息需要在 MQ 中等待30分钟,浪费资源
5.5 发送延迟检测订单的消息
我们定义一个实体类,用于记录延迟消息的内容和延迟消息的延迟时间列表(该实体类也是延迟消息的类型)
importjava.util.ArrayList;importjava.util.Arrays;importjava.util.List;publicclassMultipleDelayMessage<T>{privateT data;privateList<Long> delayMillis;publicMultipleDelayMessage(){}publicMultipleDelayMessage(T data,Long... delayMillis){this.data = data;this.delayMillis =newArrayList<>(Arrays.asList(delayMillis));}publicMultipleDelayMessage(T data,List<Long> delayMillis){this.data = data;this.delayMillis = delayMillis;}publicstatic<T>MultipleDelayMessage<T>of(T data,Long... delayMillis){returnnewMultipleDelayMessage<>(data,newArrayList<>(Arrays.asList(delayMillis)));}publicstatic<T>MultipleDelayMessage<T>of(T data,List<Long> delayMillis){returnnewMultipleDelayMessage<>(data, delayMillis);}publicbooleanhasNextDelay(){return!delayMillis.isEmpty();}publicLongremoveNextDelay(){return delayMillis.remove(0);}publicTgetData(){return data;}publicvoidsetData(T data){this.data = data;}publicList<Long>getDelayMillis(){return delayMillis;}publicvoidsetDelayMillis(List<Long> delayMillis){this.delayMillis = delayMillis;}@OverridepublicStringtoString(){return"MultipleDelayMessage{"+"data="+ data +", delayMillis="+ delayMillis +'}';}}
我们再定义一个发送延迟消息的消息处理器,供所有服务使用
importorg.springframework.amqp.AmqpException;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.core.MessagePostProcessor;publicclassDelayMessagePostProcessorimplementsMessagePostProcessor{privatefinalInteger delay;publicDelayMessagePostProcessor(Integer delay){this.delay = delay;}@OverridepublicMessagepostProcessMessage(Message message)throwsAmqpException{
message.getMessageProperties().setDelay(delay);return message;}}
改造后的发送延迟消息的测试方法
@TestvoidtestSendDelayMessage(){
rabbitTemplate.convertAndSend("delay.direct","delay","Hello, DelayQueue!",newDelayMessagePostProcessor(10000));SimpleDateFormat simpleDateFormat =newSimpleDateFormat();
simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS");System.out.println("发送消息成功!发送时间:"+ simpleDateFormat.format(System.currentTimeMillis()));}
版权归原作者 聂 可 以 所有, 如有侵权,请联系我们删除。