0


RabbitMQ实现延迟队列

文章目录

RabbitMQ 实现延迟队列的方式主要有两种:
死信交换机、延迟队列插件
延迟队列的使用场景包括:

① 延迟发送短信
② 用户下单,如果用户在15分钟内未支付,则自动取消
③ 预约工作会议,20分钟后自动通知所有参会人员

死信交换机

基本概念

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter)

① 消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false
② 消息是一个过期消息,超时无人消费
③ 要投递的队列消息堆积满了,最早的消息可能成为死信

如果该队列配置了 dead-letter-exchange 属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)

死信交换机和 RepublishMessageRecoverer 的区别是,死信交换机的消息是通过队列传递给交换机的,而 RepublishMessageRecoverer 是通过消费者传递给交换机的

给队列绑定死信交换机的方式:

1.给队列设置

dead-letter-exchange

属性,指定一个交换机

2.给队列设置

dead-letter-routing-key

属性,设置死信交换机与死信队列的 RoutingKey

TTL:

TTL,也就是 Time-To-Live。如果一个队列中的消息 TTL 结束仍未消费,则会变为死信,TTL 超时分为两种情况:

① 消息所在的队列设置了存活时间
② 消息本身设置了存活时间

若设置了 TTL 和死信交换机,就可以让一个消费者监听死信队列,进而实现消息的延迟投递

如果队列和消息都设置了TTL,那么以较小的值为准

代码实现

TTL

下面实现上图的延迟消息:

1.在 consumer 声明死信交换机和队列,编写监听方法

@ComponentpublicclassDelayListener{@RabbitListener(bindings =@QueueBinding(
            value =@Queue(name ="dl.queue"),
            exchange =@Exchange(name ="dl.direct"),
            key ="dl"))publicvoidlistenDlQueue(String msg){System.out.println(LocalDateTime.now()+":收到 dl.queue 的延迟消息"+ msg);}}

2.在 consumer 声明延迟队列和对应的交换机,在声明队列时配置

ttl

dead-letter-exchange

dead-letter-routing-key

属性

@ConfigurationpublicclassDelayConfig{@BeanpublicDirectExchangettlExchange(){returnnewDirectExchange("ttl.direct");}@BeanpublicQueuettlQueue(){returnQueueBuilder.durable("ttl.queue").ttl(10000)// 10s.deadLetterExchange("dl.direct").deadLetterRoutingKey("dl").build();}@BeanpublicBindingttlBinding(Queue ttlQueue,DirectExchange ttlExchange){returnBindingBuilder.bind(ttlQueue).to(ttlExchange).with("ttl");}}

3.在 publisher 编写测试类方法,发送消息,并设置超时时间

@SpringBootTestpublicclassDelayTests{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSendTTLMessage(){Message message =MessageBuilder.withBody("ttl message".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).setExpiration("5000")// 5s.build();
        rabbitTemplate.convertAndSend("ttl.queue","ttl", message);System.out.println(LocalDateTime.now()+":消息发送完毕");}}

测试结果:

image-20240707114805601

image-20240707114835297

image-20240707114846536

可以看到,实际上消息延迟了 5s,原因是如果队列和消息都设置了延迟时间,那么实际的延迟时间是二者的较小值

延迟队列插件

RabbitMQ 的官方也推出了一个延迟队列插件 DelayExchange,需要先去安装这一插件才可以使用。这里对插件的安装不做介绍,主要介绍 SpringAMQP 如何使用延迟队列插件

DelayExchange 的原理是将消息暂存到延迟交换机中,延迟一定时间再发到队列。其本质还是官方的三种交换机,只是添加了延迟功能。因此使用时只需要声明一个交换机,交换机的类型可以是任意类型,然后设定 **

delayed

** 属性为 true 即可

1.声明延迟交换机,设置 **

delayed

** 属性为true

设置的方式有两种:

(1)基于注解方式

@RabbitListener(bindings =@QueueBinding(
        value =@Queue(name ="delay.queue"),
        exchange =@Exchange(name ="delay.direct", delayed ="true"),
        key ="delay"))publicvoidlistenDelayQueue(String msg){System.out.println(LocalDateTime.now()+":收到 delay.queue 的延迟消息"+ msg);}

(2)基于配置类的方式

@ConfigurationpublicclassDelayConfig{@BeanpublicDirectExchangedelayExchange(){returnExchangeBuilder.directExchange("delay.direct").delayed().durable(true).build();}@BeanpublicQueuedelayQueue(){returnnewQueue("delay.queue");}@BeanpublicBindingdelayBinding(Queue delayQueue,DirectExchange delayExchange){returnBindingBuilder.bind(delayQueue).to(delayExchange).with("delay");}}

3.在 publisher 中编写测试类方法来发送延迟消息,需要注意的是,这里一定要给消息添加一个 header:**

x-delay

**,值为延迟的时间,单位为毫秒

@SpringBootTestpublicclassDelayTests{@AutowiredprivateRabbitTemplate rabbitTemplate;@TestpublicvoidtestSendDelayMessage(){Message message =MessageBuilder.withBody("delay message".getBytes(StandardCharsets.UTF_8)).setHeader("x-delay",5000)// 5s.build();CorrelationData correlationData =newCorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend("delay.direct","delay", message);System.out.println(LocalDateTime.now()+":消息发送完毕");}}

测试结果:

image-20240707143857235

image-20240707143807154

image-20240707143837790

可以看到,消息延迟了 5s 后消费者才收到。但需要注意的是,如果配置了 ReturnCallback,这里会报错,原因是消息没有立即路由到队列,因此报错 NO_ROUTE(消息没有到队列),但实际上消息只是延迟发送了而已

所以需要修改 ReturnCallback 的逻辑,否则一旦 ReturnCallback 中设置了消息重发,那么此时的延迟消息每次都会被重发

@Configuration@Slf4jpublicclassRabbitConfigimplementsApplicationContextAware{@OverridepublicvoidsetApplicationContext(ApplicationContext applicationContext)throwsBeansException{RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class);// 配置 ReturnCallback
        rabbitTemplate.setReturnsCallback((returnedMessage ->{if(returnedMessage.getMessage().getMessageProperties().getReceivedDelay()>0){// 如果为延迟消息,则直接跳过return;}
            log.error("ReturnCallback");
            log.error("消息路由到队列失败");
            log.error("响应码:{}", returnedMessage.getReplyCode());
            log.error("失败原因;{}", returnedMessage.getReplyText());
            log.error("交换机;{}", returnedMessage.getExchange());
            log.error("路由Key:{}", returnedMessage.getRoutingKey());
            log.error("消息:{}", returnedMessage.getMessage());// 这里省略消息重发的逻辑}));}}

总结

RabbitMQ 实现延迟队列的方式主要有两种:死信交换机、延迟队列插件

死信交换机的实现步骤是:

① 声明死信交换机并编写监听方法
② 声明延迟队列和对应的交换机,在声明队列时配置

ttl

dead-letter-exchange

dead-letter-routing-key

属性
③ 发送消息时,设置超时时间(实际上延迟队列的

ttl

和消息的超时时间至少有一个设置了就行)

延迟队列插件的实现步骤是:

① 声明一个交换机,设置 **

delayed

** 属性为true
② 发送消息时,添加 **

x-delay

** 头,值为超时时间


本文转载自: https://blog.csdn.net/Vendetta_A_A/article/details/140246571
版权归原作者 小璐乱撞xllz 所有, 如有侵权,请联系我们删除。

“RabbitMQ实现延迟队列”的评论:

还没有评论