0


Spring RabbitMQ那些事(2-两种方式实现延时消息订阅)

目录

一、序言

业务开发中有很多延时操作的场景,比如最常见的

超时订单自动关闭

延时异步处理

,我们常用的实现方式有:

  • 定时任务轮询(有延时)。
  • 借助Redission的延时队列
  • Redis的key过期事件通知机制(需开启key过期事件通知,对Redis有性能损耗)。
  • RocketMQ中定时消息推送(支持的时间间隔固定,不支持自定义)。
  • RabbitMQ中的死信队列和延迟消息交换机

其中用的最多的也是借助

Redisson实现的数据结构延迟队列

RabbitMQ中的死信队列来实现

,今天我们通过RabbitMQ死信队列和延迟消息交换机(新特性)来实现延时消息推送。


二、死信交换机和消息TTL实现延迟消息

1、死信队列介绍

这种方式主要通过结合消息过期和私信交换机来实现延迟消息推送,首先先了解下哪些消息会进入死信队列:

  • 被消费者nack(negatively acknowleged)的消息。
  • TTL过期后未被消费的消息。
  • 超过队列长度限制后被丢弃的消息。

备注:更多信息请参考RabbitMQ中的 Dead Letter Exchange。

2、代码示例

(1) 死信交换机配置

@ConfigurationprotectedstaticclassDeadLetterExchangeConfig{@BeanpublicQueuedeadLetterQueue(){returnQueueBuilder.durable("dead-letter-queue").build();}@BeanpublicDirectExchangedeadLetterExchange(){returnExchangeBuilder.directExchange("dead-letter-exchange").build();}@BeanpublicBindingbindQueueToDeadLetterExchange(Queue deadLetterQueue,DirectExchange deadLetterExchange){returnBindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dead-letter-routing-key");}@BeanpublicQueuenormalQueue(){returnQueueBuilder.durable("normal-queue").deadLetterExchange("dead-letter-exchange").deadLetterRoutingKey("dead-letter-routing-key").build();}}

(2) 消息生产者

@Slf4j@Component@RequiredArgsConstructorpublicclassRabbitMqProducer{privatefinalRabbitTemplate rabbitTemplate;publicvoidsendMsgToDeadLetterExchange(String body,int timeoutInMillSeconds){
        log.info("开始发送消息到dead letter exchange 消息体:{}, 消息延迟:{}ms, 当前时间:{}", body, timeoutInMillSeconds,LocalDateTime.now());MessageProperties messageProperties =MessagePropertiesBuilder.newInstance().setExpiration(String.valueOf(timeoutInMillSeconds)).build();Message message =MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();
        rabbitTemplate.send("normal-queue", message);}}

(3) 消息消费者

@Slf4j@ComponentpublicclassRabbitMqConsumer{@RabbitListener(queues ="dead-letter-queue")publicvoidhandleMsgFromDeadLetterQueue(String msg){
        log.info("Message received from dead-letter-queue, message body: {}, current time:{}", msg,LocalDateTime.now());}}

3、测试用例

@RestController@RequiredArgsConstructorpublicclassRabbitMsgController{privatefinalRabbitMqProducer rabbitMqProducer;@RequestMapping("/exchange/dead-letter")publicResponseEntity<String>sendMsgToDeadLetterExchange(String body,int timeout){
        rabbitMqProducer.sendMsgToDeadLetterExchange(body, timeout);returnResponseEntity.ok("消息发送到死信交换机成功");}}

浏览器访问

http://localhost:8080/exchange/dead-letter?body=hello&timeout=5000

,可以看到消息被延迟5s处理。

2023-11-2611:50:33.041INFO19152---[nio-8080-exec-7]c.u.r.i.producer.RabbitMqProducer: 开始发送消息到dead letter exchange 消息体:hello, 消息延迟:5000ms, 当前时间:2023-11-26T11:50:33.0412023-11-2611:50:38.054INFO19152---[ntContainer#4-4]c.u.r.i.consumer.RabbitMqConsumer:Message received from dead-letter-queue, message body: hello, current time:2023-11-26T11:50:38.054

三、延迟消息交换机实现延迟消息

上面通过消息TTL和死信交换机实现延迟消息的解决方案是由一个叫

James Carr

的人提出来的,后来RabbitMQ提供了一个开箱即用的解决方案,通过延时消息插件来实现。

该插件以前被当做是试验性产品,但是现在已经可以投产使用了。(PS:2015年4月16号就已经有该插件文档)

Spring AMQP中,同样提供了对该延时消息插件的支持,并且在RabbitMQ 3.6.0版本就已经测试通过。

1、安装延时消息插件

该延时消息插件为社区插件,因此需要自己手动下载安装的RabbMQ版本对应的插件,下载地址:RabbitMQ延时消息插件releases。

我安装的RabbitMQ版本为

3.9.9

,3.9.0版本的插件对所有

3.9.x

版本的RabbitMQ都支持。

在这里插入图片描述
下载完后把

.ez

结尾的插件复制RabbitMQ的插件目录下,插件目录为

/usr/lib/rabbitmq/plugins 

在这里插入图片描述
通过命令

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

安装该插件,通过命令

rabbitmq-plugins list

查看插件列表,可以看到该延时消息插件已经成功安装。

在这里插入图片描述

2、代码示例

(1) 延时消息交换机配置

@ConfigurationprotectedstaticclassDelayedMsgExchangePluginConfig{@BeanpublicQueuedelayedQueue(){returnQueueBuilder.durable("delayed-queue").build();}@BeanpublicDirectExchangedelayedExchange(){returnExchangeBuilder.directExchange("delayed-exchange").delayed().build();}@BeanpublicBindingbindDelayedQueueToDelayedChange(Queue delayedQueue,DirectExchange delayedExchange){returnBindingBuilder.bind(delayedQueue).to(delayedExchange).with("delayed-routing-key");}}

备注:延时交换机的类型可以为DirectExchage、TopicExcahgeFanoutExchange,这些都支持。

(2) 消息生产者

@Slf4j@Component@RequiredArgsConstructorpublicclassRabbitMqProducer{privatefinalRabbitTemplate rabbitTemplate;publicvoidsendDelayedMsg(String body,int timeoutInMillSeconds){
        log.info("开始发送消息到delayed-exchange 消息体:{}, 消息延迟:{}ms, 当前时间:{}", body, timeoutInMillSeconds,LocalDateTime.now());MessageProperties messageProperties =newMessageProperties();
        messageProperties.setDelay(timeoutInMillSeconds);Message message =MessageBuilder.withBody(body.getBytes(StandardCharsets.UTF_8)).andProperties(messageProperties).build();
        rabbitTemplate.send("delayed-exchange","delayed-routing-key", message);}}

(3) 消息消费者

@Slf4j@ComponentpublicclassRabbitMqConsumer{@RabbitListener(queues ="delayed-queue")publicvoidhandleMsgFromDelayedQueue(String msg){
        log.info("Message received from delayed-queue, message body: {}, current time:{}", msg,LocalDateTime.now());}}

3、测试用例

@RestController@RequiredArgsConstructorpublicclassRabbitMsgController{privatefinalRabbitMqProducer rabbitMqProducer;@RequestMapping("/exchange/delayed")publicResponseEntity<String>sendMsgToHeadersExchange(String body,int timeout){
        rabbitMqProducer.sendDelayedMsg(body, timeout);returnResponseEntity.ok("消息发送到延迟交换机成功");}}

浏览器访问

http://localhost:8080/exchange/dead-letter?body=hello&timeout=5000

,可以看到消息被延迟5s处理。

2023-11-2613:02:07.816INFO26524---[nio-8080-exec-3] c.u.r.i.producer.RabbitMqProducer        : 开始发送消息到delayed-exchange 消息体:Hello,消息延迟:5000ms,当前时间:2023-11-26T13:02:07.8162023-11-2613:02:12.830INFO26524---[ntContainer#5-5] c.u.r.i.consumer.RabbitMqConsumer        : Message received from delayed-queue, message body: Hello, current time:2023-11-26T13:02:12.829

四、两种实现方式优缺点

1、延时消息插件

  • 优点:配置更加简单,少配置1个过期消息接收队列,且语义更明确,容易定位消息出入口。
  • 缺点:延时消息插件对RabbitMQ版本有要求,只有RabbitMQ 3.8.x及以上版本支持。

2、TLL&死信交换机

  • 优点:基本适用于所有RabbitMQ版本。
  • 缺点:配置相对来说复杂一些,还有就是我们最开始提到的,不只是TTL过期的消息才会进入死信队列,还有超出队列限制nack的消息也会进入死信队列,触发的条件没那么纯粹。

在这里插入图片描述


本文转载自: https://blog.csdn.net/lingbomanbu_lyl/article/details/134468974
版权归原作者 凌波漫步& 所有, 如有侵权,请联系我们删除。

“Spring RabbitMQ那些事(2-两种方式实现延时消息订阅)”的评论:

还没有评论