0


RabbitMQ延迟队列


💌 介绍

顾名思义:首先它要具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。

💒 使用场景

  • 预支付订单创建成功后,30分钟后还没有支付,自动取消订单,修改订单状态
  • 用户注册成功后,如果3天没有登录则进行短信提醒
  • 优惠券过期前发送短信进行提醒
  • ....

以上场景都可以用延时队列来完成


🏳‍🌈 模拟案例

需求:生产者发布消息,10秒、60秒后消费者拿到消息进行消费

📕 准备工作

导入RabbitMQ依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

配置RabbitMQ连接相关信息

  1. #MySQL
  2. spring:
  3. rabbitmq:
  4. host: 127.0.0.1
  5. port: 5672
  6. username: xxxx
  7. password: xxx
  8. server:
  9. port: 8087

🏴 写法一(死信队列TTL)

生产者生产消息——>到交换机分发给对应的队列(A10秒过期,B60秒过期)——>过期后到死信交换机——>消费者进行消费(执行顺序如下图)

RabbitMQ配置文件

  1. import org.springframework.amqp.core.*;
  2. import org.springframework.beans.factory.annotation.Qualifier;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import java.util.HashMap;
  6. /**
  7. * @author 小影
  8. * @create: 2022/8/18 10:26
  9. * @describe:mq配置 如示例图配置:2交换机、4队列、4路由key
  10. */
  11. @Configuration
  12. public class RabbitMQConfiguration {
  13. // 延迟交换机
  14. public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
  15. // 延迟队列
  16. public static final String DELAY_QUEUE_NAME_A = "delay.queue.a";
  17. public static final String DELAY_QUEUE_NAME_B = "delay.queue.b";
  18. // 延迟队列路由key
  19. public static final String DELAY_QUEUE_ROUTING_KEY_A = "delay.routingKey.a";
  20. public static final String DELAY_QUEUE_ROUTING_KEY_B = "delay.routingKey.b";
  21. // 死信交换机
  22. public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";
  23. // 死信队列
  24. public static final String DEAD_LETTER_QUEUE_NAME_A = "dead.letter.queue.a";
  25. public static final String DEAD_LETTER_QUEUE_NAME_B = "dead.letter.queue.b";
  26. // 私信队列路由key
  27. public static final String DEAD_LETTER_ROUTING_KEY_A = "dead.letter.delay_10s.routingkey.a";
  28. public static final String DEAD_LETTER_ROUTING_KEY_B = "dead.letter.delay_60s.routingkey.b";
  29. // 声明延迟交换机
  30. @Bean("delayExchange")
  31. public DirectExchange delayExchange() {
  32. return new DirectExchange(DELAY_EXCHANGE_NAME);
  33. }
  34. // 声明死信交换机
  35. @Bean("deadLetterExchange")
  36. public DirectExchange deadLetterExchange() {
  37. return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
  38. }
  39. // 声明延迟队列A,延迟10s,并且绑定到对应的死信交换机
  40. @Bean("delayQueueA")
  41. public Queue delayQueueA() {
  42. HashMap<String, Object> args = new HashMap<>();
  43. // 声明队列绑定的死信交换机
  44. args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
  45. // 声明队列的属性路由key
  46. args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_A);
  47. // 声明队列的消息TTL存活时间
  48. args.put("x-message-ttl", 10000);
  49. return QueueBuilder.durable(DELAY_QUEUE_NAME_A).withArguments(args).build();
  50. }
  51. // 声明延迟队列B,延迟60s,并且绑定到对应的死信交换机
  52. @Bean("delayQueueB")
  53. public Queue delayQueueB() {
  54. HashMap<String, Object> args = new HashMap<>();
  55. // 声明队列绑定的死信交换机
  56. args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
  57. // 声明队列的属性路由key
  58. args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_B);
  59. // 声明队列的消息TTL存活时间
  60. args.put("x-message-ttl", 60000);
  61. return QueueBuilder.durable(DELAY_QUEUE_NAME_B).withArguments(args).build();
  62. }
  63. // 声明死信队列A,用于接收延迟10S的消息
  64. @Bean("deadLetterQueueA")
  65. public Queue deadLetterQueueA() {
  66. return new Queue(DEAD_LETTER_QUEUE_NAME_A);
  67. }
  68. // 声明死信队列B,用于接收延迟60S的消息
  69. @Bean("deadLetterQueueB")
  70. public Queue deadLetterQueueB() {
  71. return new Queue(DEAD_LETTER_QUEUE_NAME_B);
  72. }
  73. // 设置延迟队列A的绑定关系
  74. @Bean
  75. public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,
  76. @Qualifier("delayExchange") DirectExchange exchange) {
  77. return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY_A);
  78. }
  79. // 设置延迟队列B的绑定关系
  80. @Bean
  81. public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue,
  82. @Qualifier("delayExchange") DirectExchange exchange) {
  83. return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY_B);
  84. }
  85. // 设置死信队列A的绑定关系
  86. @Bean
  87. public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
  88. @Qualifier("deadLetterExchange") DirectExchange exchange) {
  89. return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY_A);
  90. }
  91. // 设置死信队列B的绑定关系
  92. @Bean
  93. public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
  94. @Qualifier("deadLetterExchange") DirectExchange exchange) {
  95. return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY_B);
  96. }
  97. }

此配置文件的代码关系图如下

生产者

  1. import static com.ying.demo.config.RabbitMQConfiguration.DELAY_EXCHANGE_NAME;
  2. import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY_A;
  3. import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY_B;
  4. /**
  5. * @author 小影
  6. * @create: 2022/8/18 11:13
  7. * @describe:延迟消息生产者
  8. */
  9. @Component
  10. public class DelayMessageProducer {
  11. @Resource
  12. private RabbitTemplate rabbitTemplate;
  13. public void send(String message,int type) {
  14. switch (type){
  15. case 1: // 10s的消息
  16. // param:队列名称、路由key、消息
  17. rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY_A, message);
  18. break;
  19. case 2:// 60s的消息
  20. rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY_B, message);
  21. break;
  22. }
  23. }
  24. }

消费者

  1. import com.rabbitmq.client.Channel;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import java.time.LocalDateTime;
  7. import static com.ying.demo.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_NAME_A;
  8. import static com.ying.demo.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_NAME_B;
  9. /**
  10. * @author 小影
  11. * @create: 2022/8/18 11:19
  12. * @describe:死信消费者
  13. */
  14. @Slf4j
  15. @Component
  16. public class DeadLetterQueueConsumer {
  17. /**
  18. * 监听私信队列A
  19. * @param message
  20. * @param channel 作手动回执、确认
  21. */
  22. @RabbitListener(queues = DEAD_LETTER_QUEUE_NAME_A)
  23. public void receiveA(Message message, Channel channel) {
  24. String msg = new String(message.getBody());
  25. log.info("当前时间:{},死信队列A收到消息:{}", LocalDateTime.now(),msg);
  26. }
  27. /**
  28. * 监听私信队列B
  29. * @param message
  30. * @param channel 作手动回执、确认
  31. */
  32. @RabbitListener(queues = DEAD_LETTER_QUEUE_NAME_B)
  33. public void receiveB(Message message, Channel channel) {
  34. String msg = new String(message.getBody());
  35. log.info("当前时间:{},死信队列B收到消息:{}", LocalDateTime.now(),msg);
  36. }
  37. }

测试

  1. @Slf4j
  2. @RestController
  3. @RequestMapping("rabbitmq")
  4. public class RabbitMqController {
  5. @Resource
  6. private DelayMessageProducer producer;
  7. @GetMapping("send")
  8. public void send(String message, Integer type) {
  9. log.info("当前时间:{},消息:{},延迟类型:{}", LocalDateTime.now(), message, Objects.requireNonNull(type));
  10. producer.send(message, type);
  11. }
  12. }

分别请求

http://localhost:8089/rabbitmq/send?message=我是10秒&type=1

http://localhost:8089/rabbitmq/send?message=我是60秒&type=2

如果出现异常:Channel shutdown: channel error; protocol method:#method(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'type' for exchange 'delay.exchange' in vhost '/': received ''x-delayed-message'' but current is 'direct', class-id=40, method-id=10

可能是mq已经存在交换机了先去删掉

弊端:后期要扩展其他不同延时的时间,就需要增加延时的配置,非常麻烦


🏴 写法二 (死信队列TTL)

生产者生产消息(并设置过期时间)——>到交换机分发给延迟队列——>过期后到死信交换机——>消费者进行消费(执行顺序如下图)

RabbitMQ配置文件

  1. import org.springframework.amqp.core.*;
  2. import org.springframework.beans.factory.annotation.Qualifier;
  3. import org.springframework.context.annotation.Bean;
  4. import org.springframework.context.annotation.Configuration;
  5. import java.util.HashMap;
  6. /**
  7. * @author 小影
  8. * @create: 2022/8/18 10:26
  9. * @describe:mq配置 如示例图配置:2交换机、2队列、2路由key
  10. */
  11. @Configuration
  12. public class RabbitMQConfiguration {
  13. // 延迟交换机
  14. public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
  15. // 延迟队列
  16. public static final String DELAY_QUEUE_NAME = "delay.queue";
  17. // 延迟队列路由key
  18. public static final String DELAY_QUEUE_ROUTING_KEY = "delay.routingKey";
  19. // 死信交换机
  20. public static final String DEAD_LETTER_EXCHANGE_NAME = "dead.letter.exchange";
  21. // 死信队列
  22. public static final String DEAD_LETTER_QUEUE_NAME = "dead.letter.queue";
  23. // 私信队列路由key
  24. public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter.routingkey";
  25. // 声明延迟交换机
  26. @Bean("delayExchange")
  27. public DirectExchange delayExchange() {
  28. return new DirectExchange(DELAY_EXCHANGE_NAME);
  29. }
  30. // 声明死信交换机
  31. @Bean("deadLetterExchange")
  32. public DirectExchange deadLetterExchange() {
  33. return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
  34. }
  35. // 声明延迟队列,不设置存活时间,并且绑定到对应的死信交换机
  36. @Bean("delayQueue")
  37. public Queue delayQueue() {
  38. HashMap<String, Object> args = new HashMap<>();
  39. // 声明队列绑定的死信交换机
  40. args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME);
  41. // 声明队列的属性路由key
  42. args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
  43. return QueueBuilder.durable(DELAY_QUEUE_NAME).withArguments(args).build();
  44. }
  45. // 声明死信队列
  46. @Bean("deadLetterQueue")
  47. public Queue deadLetterQueue() {
  48. return new Queue(DEAD_LETTER_QUEUE_NAME);
  49. }
  50. // 设置延迟队列的绑定关系
  51. @Bean
  52. public Binding delayBinding(@Qualifier("delayQueue") Queue queue,
  53. @Qualifier("delayExchange") DirectExchange exchange) {
  54. return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY);
  55. }
  56. // 设置死信队列的绑定关系
  57. @Bean
  58. public Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue,
  59. @Qualifier("deadLetterExchange") DirectExchange exchange) {
  60. return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_ROUTING_KEY);
  61. }
  62. }

生产者

  1. import static com.ying.demo.config.RabbitMQConfiguration.DELAY_EXCHANGE_NAME;
  2. import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY;
  3. /**
  4. * @author 小影
  5. * @create: 2022/8/18 11:13
  6. * @describe:延迟消息生产者
  7. */
  8. @Component
  9. public class DelayMessageProducer {
  10. @Resource
  11. private RabbitTemplate rabbitTemplate;
  12. /**
  13. *
  14. * @param message 消息
  15. * @param delayTime 存活时间
  16. */
  17. public void send(String message,String delayTime) {
  18. // param:延迟交换机,路由KEY,存活时间
  19. rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY, message, msg -> {
  20. msg.getMessageProperties().setExpiration(delayTime);
  21. return msg;
  22. });
  23. }
  24. }

消费者

  1. import com.rabbitmq.client.Channel;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.springframework.amqp.core.Message;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. import java.time.LocalDateTime;
  7. import static com.ying.demo.config.RabbitMQConfiguration.DEAD_LETTER_QUEUE_NAME;
  8. /**
  9. * @author 小影
  10. * @create: 2022/8/18 11:19
  11. * @describe:死信消费者
  12. */
  13. @Slf4j
  14. @Component
  15. public class DeadLetterQueueConsumer {
  16. /**
  17. * 监听私信队列A
  18. * @param message
  19. * @param channel 作手动回执、确认
  20. */
  21. @RabbitListener(queues = DEAD_LETTER_QUEUE_NAME)
  22. public void receiveA(Message message, Channel channel) {
  23. String msg = new String(message.getBody());
  24. log.info("当前时间:{},死信队列收到消息:{}", LocalDateTime.now(),msg);
  25. }
  26. }

测试

  1. @Slf4j
  2. @RestController
  3. @RequestMapping("rabbitmq")
  4. public class RabbitMqController {
  5. @Resource
  6. private DelayMessageProducer producer;
  7. @GetMapping("send")
  8. public void send(String message, String delayTime) {
  9. log.info("当前时间:{},消息:{},存活时间:{}", LocalDateTime.now(), message, delayTime);
  10. producer.send(message, delayTime);
  11. }
  12. }

分别请求

http://localhost:8089/rabbitmq/send?message=我是60秒&delayTime=60000

http://localhost:8089/rabbitmq/send?message=我是10秒&delayTime=10000

弊端:由于是先进先出的,如果60秒进去了,10秒在进去,10秒结束了,他要等60秒结束,60秒出来10秒才能出来


🚩 写法三 (插件版本-推荐)

安装插件后会生成新的Exchange类型 x-delayed-message ,该类型消息支持延迟投递机制,接收消息后并未立即将消息投递至目标队列,而是存储在mnesia(一个分布式数据库)中,随后检测消息延迟时间,如达到投递时间讲其通过 x-delayed-type 类型标记的交换机投至目标队列。

插件安装

1.进入mq官网社区插件:Community Plugins — RabbitMQ

2.找到rabbitmq_delayed_message_exchange

选择对应版本的ez文件下载

Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub

注:我的MQ是通过yum安装的

1.在系统中查看安装的rabbitmq

  1. rpm -qa |grep rabbitmq

2.查询mq的安装的相关文件目录

  1. rpm -ql rabbitmq-server-3.10.7-1.el8.noarch

翻到最下面发现mnesia的安装目录; mnesia=分布式数据库,看看就好

然后把我们下载的ez安装包解压放到 /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.7/plugins 里面

3.重启RabbitMQ服务

  1. systemctl restart rabbitmq-server.service

4.重启插件

  1. rabbitmq-plugins enable rabbitmq_delayed_message_exchange


RabbitMQ配置文件

  1. /**
  2. * @author 小影
  3. * @create: 2022/8/18 10:26
  4. * @describe:mq配置 如示例图配置:1交换机、1队列、1路由key
  5. */
  6. @Configuration
  7. public class RabbitMQConfiguration {
  8. // 延迟交换机
  9. public static final String DELAY_EXCHANGE_NAME = "delay.exchange";
  10. // 延迟队列
  11. public static final String DELAY_QUEUE_NAME = "delay.queue";
  12. // 延迟队列路由key
  13. public static final String DELAY_QUEUE_ROUTING_KEY = "delay.routingKey";
  14. // 声明延迟交换机
  15. @Bean("delayExchange")
  16. public CustomExchange delayExchange() {
  17. HashMap<String, Object> args = new HashMap<>();
  18. args.put("x-delayed-type", "direct");
  19. return new CustomExchange(DELAY_EXCHANGE_NAME,"x-delayed-message",true,false,args);
  20. }
  21. // 声明延迟队列
  22. @Bean("delayQueue")
  23. public Queue delayQueue() {
  24. return new Queue(DELAY_QUEUE_NAME);
  25. }
  26. // 设置延迟队列的绑定关系
  27. @Bean
  28. public Binding delayBinding(@Qualifier("delayQueue") Queue queue,
  29. @Qualifier("delayExchange") CustomExchange exchange) {
  30. return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY).noargs();
  31. }
  32. }

生产者

  1. import static com.ying.demo.config.RabbitMQConfiguration.DELAY_EXCHANGE_NAME;
  2. import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_ROUTING_KEY;
  3. /**
  4. * @author 小影
  5. * @create: 2022/8/18 11:13
  6. * @describe:延迟消息生产者
  7. */
  8. @Component
  9. public class DelayMessageProducer {
  10. @Resource
  11. private RabbitTemplate rabbitTemplate;
  12. /**
  13. *
  14. * @param message 消息
  15. * @param delayTime 存活时间
  16. */
  17. public void send(String message,Integer delayTime) {
  18. // param:延迟交换机,路由KEY,存活时间
  19. rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUE_ROUTING_KEY, message, msg -> {
  20. msg.getMessageProperties().setDelay(delayTime);
  21. return msg;
  22. });
  23. }
  24. }

消费者

  1. import static com.ying.demo.config.RabbitMQConfiguration.DELAY_QUEUE_NAME;
  2. /**
  3. * @author 小影
  4. * @create: 2022/8/18 11:19
  5. * @describe:消费者
  6. */
  7. @Slf4j
  8. @Component
  9. public class DeadLetterQueueConsumer {
  10. /*
  11. * 监听私信队列
  12. * @param message
  13. * @param channel 作手动回执、确认
  14. */
  15. @RabbitListener(queues = DELAY_QUEUE_NAME)
  16. public void receiveA(Message message, Channel channel) {
  17. String msg = new String(message.getBody());
  18. log.info("当前时间:{},延迟队列收到消息:{}", LocalDateTime.now(),msg);
  19. }
  20. }

测试

  1. @Slf4j
  2. @RestController
  3. @RequestMapping("rabbitmq")
  4. public class RabbitMqController {
  5. @Resource
  6. private DelayMessageProducer producer;
  7. @GetMapping("send")
  8. public void send(String message, Integer delayTime) {
  9. log.info("当前时间:{},消息:{},存活时间:{}", LocalDateTime.now(), message, delayTime);
  10. producer.send(message, delayTime);
  11. }
  12. }

启动项目查看rabbitmq的可视化界面

如下图此时生成的交换机是x-delayed-message类型的

分别发送:

http://localhost:8089/rabbitmq/send?message=我是60秒&delayTime=60000

http://localhost:8089/rabbitmq/send?message=我是10秒&delayTime=10000

结局并不是60秒先被消费,完成了我们的意愿。

原理:

  1. 交换机里面有个数据库,生产者生产信息把这个信息放入数据库中
  2. 交换机里面的插件就会一直监听这个时间
  3. 时间到了把对应数据取出来,放入队列,让消费者进行消费

👍 延迟队列方法推荐

这是小编在开发学习使用和总结, 这中间或许也存在着不足,希望可以得到大家的理解和建议。如有侵权联系小编!

标签: rabbitmq 分布式 java

本文转载自: https://blog.csdn.net/weixin_46522803/article/details/126521534
版权归原作者 小影~ 所有, 如有侵权,请联系我们删除。

“RabbitMQ延迟队列”的评论:

还没有评论