0


如何使用RabbitMq来实现死信队列

  • 首先什么是MQ消息中间件

    1. 全称MessageQueue,主要是⽤于程序和程序直接通信,异步+解耦
    2. 使⽤场景:
    3. 核⼼应⽤
    4. 解耦:订单系统-》物流系统
    5. 异步:⽤户注册-》发送邮件,初始化信息
    6. 削峰:秒杀、⽇志处理
    7. 跨平台 、多语⾔
    8. 分布式事务、最终⼀致性
    9. RPC调⽤上下游对接,数据源变动->通知下属
    10. 总而言之 可以对各种场景进行异步校验,应用广泛可以保持链路的完整性
  • 交换机类型

  • Direct Exchange 定向将⼀个队列绑定到交换机上,要求该消息与⼀个特 定的路由键完全匹配 例⼦:如果⼀个队列绑定到该交换机上要求路由键 “aabb”,则只有被标记为“aabb”的消息才被转发, 不会转发aabb.cc,也不会转发gg.aabb,只会转发 aabb 处理路由健

  • Fanout Exchange ⼴播 只需要简单的将队列绑定到交换机上,⼀个发送到 交换机的消息都会被转发到与该交换机绑定的所有 队列上。很像⼦⽹⼴播,每台⼦⽹内的主机都获得 了⼀份复制的消息 Fanout交换机转发消息是最快的,⽤于发布订阅, ⼴播形式,中⽂是扇形不处理路由健

  • Topic Exchange 通配符 主题交换机是⼀种发布/订阅的模式,结合了直连交 换机与扇形交换机的特点 将路由键和某模式进⾏匹配。此时队列需要绑定要 ⼀个模式上 符号“#”匹配⼀个或多个词,符号“*”匹配不多不少⼀ 个词 例⼦:因此“abc.#”能够匹配到“abc.def.ghi”,但是 “abc.*” 只会匹配到“abc.def”。

  • Headers Exchanges(少⽤) 根据发送的消息内容中的headers属性进⾏匹配, 在 绑定Queue与Exchange时指定⼀组键值对 当消息发送到RabbitMQ时会取到该消息的headers 与Exchange绑定时指定的键值对进⾏匹配; 如果完全匹配则消息会路由到该队列,否则不会路 由到该队列 不处理路由键

  • 什么是TTL time to live 消息存活时间 如果消息在存活时间内未被消费,则会别清除 RabbitMQ⽀持两种ttl设置 单独消息进⾏配置ttl 整个队列进⾏配置ttl(居多)

  • 什么是rabbitmq的死信队列 没有被及时消费的消息存放的队列

  • 什么是rabbitmq的死信交换机 Dead Letter Exchange(死信交换机,缩写:DLX)当 消息成为死信后,会被重新发送到另⼀个交换机,这个 交换机就是DLX死信交换机。

    1. 下面就是实现的大概流程: ![](https://i-blog.csdnimg.cn/direct/03ae7cf2329440178fe78886925715d2.png)

例子:

RabbitMq不自带死信队列,那么我们创建好交换机,延迟队列设置过期时间为15秒,对延迟队列和交换机以及死信队列(普通队列)和交换机进行绑定 ,同时设置交换机为Topic主题交换机。随后消费者通过RabbitMqListener进行监听,同时使用ACK进行标志的确认。

  1. @Configuration
  2. @Data
  3. public class RabbitMqConfig {
  4. /**
  5. * 交换机
  6. */
  7. @Value("${mqconfig.coupon_event_exchange}")
  8. private String eventExchange;
  9. /**
  10. * 第⼀个队列延迟队列,
  11. */
  12. @Value("${mqconfig.coupon_release_delay_queue}")
  13. private String couponReleaseDelayQueue;
  14. /**
  15. * 第⼀个队列的路由key
  16. * 进⼊队列的路由key
  17. */
  18. @Value("${mqconfig.coupon_release_delay_routing_key}")
  19. private String couponReleaseDelayRoutingKey;
  20. /**
  21. * 第⼆个队列,被监听恢复库存的队列
  22. */
  23. @Value("${mqconfig.coupon_release_queue}")
  24. private String couponReleaseQueue;
  25. /**
  26. * 第⼆个队列的路由key
  27. * <p>
  28. * 即进⼊死信队列的路由key
  29. */
  30. @Value("${mqconfig.coupon_release_routing_key}")
  31. private String couponReleaseRoutingKey;
  32. /**
  33. * 过期时间
  34. */
  35. @Value("${mqconfig.ttl}")
  36. private Integer ttl;
  37. /**
  38. * 消息转换器
  39. *
  40. * @return
  41. */
  42. @Bean
  43. public MessageConverter messageConvertor() {
  44. return new Jackson2JsonMessageConverter();
  45. }
  46. /**
  47. * 交换机
  48. *
  49. * @return
  50. */
  51. @Bean
  52. public TopicExchange exchange() {
  53. return new TopicExchange(eventExchange, true, false);
  54. }
  55. /**
  56. * 创建延迟队列
  57. *
  58. * @return
  59. */
  60. @Bean
  61. public Queue couponReleaseDelayQueue() {
  62. Map<String, Object> args = new HashMap<>();
  63. args.put("x-message-ttl", ttl);
  64. args.put("x-dead-letter-exchange", eventExchange);
  65. args.put("x-dead-letter-routing-key", couponReleaseRoutingKey);
  66. return new Queue(couponReleaseDelayQueue, true, false, false, args);
  67. }
  68. /**
  69. * 创建死信队列 普通队列 用于监听
  70. *
  71. * @return
  72. */
  73. @Bean
  74. public Queue couponReleaseQueue() {
  75. return new Queue(couponReleaseQueue, true, false, false);
  76. }
  77. /**
  78. * 绑定延迟队列和交换机
  79. *
  80. * @return
  81. */
  82. @Bean
  83. public Binding couponReleaseDelayBinding() {
  84. return new Binding(couponReleaseDelayQueue, Binding.DestinationType.QUEUE,
  85. eventExchange,couponReleaseDelayRoutingKey,null);
  86. }
  87. /**
  88. * 绑定死信队列队列和交换机
  89. * @return
  90. */
  91. @Bean
  92. public Binding couponReleaseDeadBinding() {
  93. return new Binding(couponReleaseQueue, Binding.DestinationType.QUEUE,
  94. eventExchange, couponReleaseRoutingKey, null);
  95. }
  96. }

新建测试:

  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest(classes = CouponApplication.class)
  3. @Slf4j
  4. public class MQTest {
  5. @Autowired
  6. private RabbitTemplate rabbitTemplate1;
  7. @Test
  8. public void send(){
  9. rabbitTemplate1.convertAndSend("coupon.event.exchange","coupon.release.delay.routing.key","5qeqweqw");
  10. }
  11. }

http://127.0.0.1:15672/#进入RabbitMq后台进行查看

配置文件配置如下:

  1. #消息队列
  2. rabbitmq:
  3. host: 127.0.0.1
  4. port: 5672
  5. virtual-host: /
  6. password: guest
  7. username: guest
  8. #开启⼿动确认消息
  9. listener:
  10. simple:
  11. acknowledge-mode: manual
  12. mqconfig:
  13. #延迟队列,不能被监听消费
  14. coupon_release_delay_queue: coupon.release.delay.queue
  15. #延迟队列的消息过期后转发的队列
  16. coupon_release_queue: coupon.release.queue
  17. #交换机
  18. coupon_event_exchange: coupon.event.exchange
  19. #进⼊延迟队列的路由key
  20. coupon_release_delay_routing_key: coupon.release.delay.routing.key
  21. #消息过期,进⼊释放死信队列的key
  22. coupon_release_routing_key: coupon.release.routing.key
  23. #消息过期时间,毫秒,测试改为15秒
  24. ttl: 15000

监听者进行监听,同时使用ACK进行标志的确认。代码如下:

  1. @Slf4j
  2. @Component
  3. @RabbitListener(queues = "${mqconfig.coupon_release_queue}")
  4. public class CouponMQListener {
  5. @Autowired
  6. private CouponRecordService couponRecordService;
  7. @Autowired
  8. private RedissonClient redissonClient;
  9. @RabbitHandler
  10. public void ReleaseCouponRecord(CouponRecordMessage recordMessage, Message message, Channel channel) throws IOException {
  11. log.info("收到传递的消息:{}",recordMessage);
  12. long tag = message.getMessageProperties().getDeliveryTag();
  13. boolean flag = couponRecordService.ReleaseCouponRecord(recordMessage);
  14. // RLock lock = redissonClient.getLock("coupon:lock:release:" + recordMessage.getTaskId());
  15. // lock.lock();
  16. try{
  17. if(flag){
  18. channel.basicAck(tag,false);
  19. log.info("释放优惠券成功:{}",recordMessage);
  20. }else {
  21. log.error("释放优惠券失败:{}",recordMessage);
  22. channel.basicReject(tag,true);
  23. }
  24. } catch (Exception e) {
  25. log.error("释放优惠券异常:{}",e.getMessage());
  26. channel.basicReject(tag,true);
  27. }
  28. // finally {
  29. // lock.unlock();
  30. // }
  31. }

tag后面的参数是是否重入队列。可以通过下面的网页使用配置的账号和密码进行查看

http://127.0.0.1:15672/#/

标签: rabbitmq 分布式 java

本文转载自: https://blog.csdn.net/qq_62627234/article/details/143979753
版权归原作者 努力的程序猿2 所有, 如有侵权,请联系我们删除。

“如何使用RabbitMq来实现死信队列”的评论:

还没有评论