0


RabbitMQ实现死信队列

😊 @ 作者: 一恍过去

💖 @ 主页: https://blog.csdn.net/zhuocailing3390

🎊 @ 社区: Java技术栈交流

🎉 @ 主题: RabbitMQ实现死信队列

⏱️ @ 创作时间: 2023年07月19日

在这里插入图片描述

目录

目录

1、概述

概述:
producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,所有的死信就组成了死信队列。

应用场景: 为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。

产生原因:

  • 消息 TTL 过期(单位)
  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.

死信队列框架图:
在这里插入图片描述

2、代码演示

步骤:

  • 生产者: 正常发送消息;
  • 消费者(消费死信队列消息): 正常声明交换机、队列,为了让消息进入死信队列,需要将正常交换机与死信交换机进行绑定,这样消息在过期、拒接接收、 队列在达到最大长度时,消息就可以进入到死信队列;核心代码:arguments.put(" x-dead-letter-exchange","dlx.exchange");
  • 消费者(消费生产者消息): 声明死信交换机、队列,从死信队列中获取数据;

代码参考: https://gitee.com/lhzlx/rabbit-simple-demo.git

1、生产者

代码路径:

lhz.dlx

类:

Producer
publicclassProducer{/**
     * 设置队列名称
     */privatestaticfinalStringNORMAL_ROUTING_KEY="normal";privatestaticfinalStringNORMAL_EXCHANGE="normal_exchange";publicstaticvoidmain(String[] args)throwsException{// 获取ChannelChannel channel =RabbitMqUtils.getChannel();

        channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);AMQP.BasicProperties properties =null;for(int i =1; i <11; i++){String message ="info"+ i;
            channel.basicPublish(NORMAL_EXCHANGE,NORMAL_ROUTING_KEY, properties, message.getBytes());}System.out.println("消息发送完毕");}}

2、消费者(消费死信队列消息)

为了让进入到

死信队列

中的消息,不会一直累计,通过一个特殊的

消费者

,将

死信队列

中的消息进行消费;

代码路径:

lhz.dlx

类:

Consumer02
publicclassConsumer02{/**
     * 设置队列名称
     */privatestaticfinalStringROUTING_KEY="dlx";privatestaticfinalStringDEAD_EXCHANGE="dead_exchange";publicstaticvoidmain(String[] args)throwsException{// 获取ChannelChannel channel =RabbitMqUtils.getChannel();
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);// 声明死信队列String deadQueue ="dead-queue";
        channel.queueDeclare(deadQueue,false,false,false,null);
        channel.queueBind(deadQueue,DEAD_EXCHANGE,ROUTING_KEY);System.out.println("Consumer02等待接收死信队列消息.....");DeliverCallback deliverCallback =(consumerTag, delivery)->{String message =newString(delivery.getBody(),StandardCharsets.UTF_8);System.out.println("Consumer02接收死信队列的消息"+ message);};
        channel.basicConsume(deadQueue,true, deliverCallback, consumerTag ->{});}}

3、消费者(消费生产者消息)

3.1、队列达到最大长度

步骤:

  • 删除之前模拟TTL的normal-queue、dead-queue;
  • Consumer01,设置max-length参数,启动之后关闭该消费者,模拟其接收不到消息;
  • 正常启动生产者发送消息;
  • 消息进入死信队列后,启动 Consumer02 ,它消费死信队列里面的消息;
  • 预期结果:在这里插入图片描述

生产者代码:

1、生产者

中代码一致;

消费者代码:

代码路径:

lhz.dlx

类:

Consumer01
publicclassConsumer01{// 路由privatestaticfinalStringROUTING_KEY="dlx";privatestaticfinalStringNORMAL_ROUTING_KEY="normal";//普通交换机名称privatestaticfinalStringNORMAL_EXCHANGE="normal_exchange";//死信交换机名称privatestaticfinalStringDEAD_EXCHANGE="dead_exchange";publicstaticvoidmain(String[] args)throwsException{// 获取ChannelChannel channel =RabbitMqUtils.getChannel();// 声明死信和普通交换机   类型为  direct
        channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);// 声明死信队列String deadQueue ="dead-queue";
        channel.queueDeclare(deadQueue,false,false,false,null);// 死信队列绑定死信交换机与  routingkey
        channel.queueBind(deadQueue,DEAD_EXCHANGE,ROUTING_KEY);//正常队列绑定死信队列信息Map<String,Object> params =newHashMap<>();//正常队列设置死信交换机   参数  key是固定值
        params.put("x-dead-letter-exchange",DEAD_EXCHANGE);//正常队列设置死信  routing-key 参数  key是固定值
        params.put("x-dead-letter-routing-key",ROUTING_KEY);// TODO 如果是演示`队列达到最大长度`的情况,需要设置最大队列数量
        params.put("x-max-length",6);// 声明正常队列String normalQueue ="normal-queue";
        channel.queueDeclare(normalQueue,false,false,false, params);
        channel.queueBind(normalQueue,NORMAL_EXCHANGE,NORMAL_ROUTING_KEY);System.out.println("Consumer01等待接收消息.....");DeliverCallback deliverCallback =(consumerTag, delivery)->{String message =newString(delivery.getBody(),StandardCharsets.UTF_8);System.out.println("Consumer01接收到消息"+ message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};
        channel.basicConsume(normalQueue,false, deliverCallback, consumerTag ->{});}}

3.2、消息被拒

步骤:

  • 删除之前模拟时产生的normal-queue、dead-queue;
  • Consumer01,设置为消息手动确认并且拒绝接收某条消息,然后启动(不进行关闭);
  • 当被拒绝的消息进入死信队列后,启动 Consumer02 它消费死信队列里面的消息;
  • 预期结果:在这里插入图片描述生产者代码:

1、生产者

中代码一致;

消费者代码:

代码路径:

lhz.dlx

类:

Consumer01
publicclassConsumer01{// 路由privatestaticfinalStringROUTING_KEY="dlx";privatestaticfinalStringNORMAL_ROUTING_KEY="normal";//普通交换机名称privatestaticfinalStringNORMAL_EXCHANGE="normal_exchange";//死信交换机名称privatestaticfinalStringDEAD_EXCHANGE="dead_exchange";publicstaticvoidmain(String[] args)throwsException{// 获取ChannelChannel channel =RabbitMqUtils.getChannel();// 声明死信和普通交换机   类型为  direct
        channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);// 声明死信队列String deadQueue ="dead-queue";
        channel.queueDeclare(deadQueue,false,false,false,null);// 死信队列绑定死信交换机与  routingkey
        channel.queueBind(deadQueue,DEAD_EXCHANGE,ROUTING_KEY);//正常队列绑定死信队列信息Map<String,Object> params =newHashMap<>();//正常队列设置死信交换机   参数  key是固定值
        params.put("x-dead-letter-exchange",DEAD_EXCHANGE);//正常队列设置死信  routing-key 参数  key是固定值
        params.put("x-dead-letter-routing-key",ROUTING_KEY);// 声明正常队列String normalQueue ="normal-queue";
        channel.queueDeclare(normalQueue,false,false,false, params);
        channel.queueBind(normalQueue,NORMAL_EXCHANGE,NORMAL_ROUTING_KEY);System.out.println("Consumer01等待接收消息.....");DeliverCallback deliverCallback =(consumerTag, delivery)->{String message =newString(delivery.getBody(),StandardCharsets.UTF_8);// TODO 模拟拒绝接收进入死信队列if(message.equals("info5")){System.out.println("Consumer01接收到消息"+ message +"并拒绝签收该消息");//requeue设置为  false 代表拒绝重新入队   该队列如果配置了死信交换机将发送到死信队列中
                channel.basicReject(delivery.getEnvelope().getDeliveryTag(),false);}else{System.out.println("Consumer01接收到消息"+ message);
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);}};
        channel.basicConsume(normalQueue,false, deliverCallback, consumerTag ->{});}}

3.3、消息 TTL 过期

步骤:

  • 删除之前模拟时产生的normal-queue、dead-queue;
  • Consumer01 启动之后关闭该消费者 模拟其接收不到消息;
  • 启动生产者发送消息(设置了10STTL过期时间);
  • 观察rabbitmq控制台,消息进入死信队列后,启动 Consumer02 消费死信队列里面的消息;
  • 预期结果:在这里插入图片描述

生产者代码:

// 在《1、生产者》基础上加上以下代码:
properties =newAMQP.BasicProperties().builder().expiration("10000").build();

在这里插入图片描述

消费者代码:

代码路径:

lhz.dlx

类:

Producer
publicclassConsumer01{// 路由privatestaticfinalStringROUTING_KEY="dlx";privatestaticfinalStringNORMAL_ROUTING_KEY="normal";//普通交换机名称privatestaticfinalStringNORMAL_EXCHANGE="normal_exchange";//死信交换机名称privatestaticfinalStringDEAD_EXCHANGE="dead_exchange";publicstaticvoidmain(String[] args)throwsException{// 获取ChannelChannel channel =RabbitMqUtils.getChannel();// 声明死信和普通交换机   类型为  direct
        channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);// 声明死信队列String deadQueue ="dead-queue";
        channel.queueDeclare(deadQueue,false,false,false,null);// 死信队列绑定死信交换机与  routingkey
        channel.queueBind(deadQueue,DEAD_EXCHANGE,ROUTING_KEY);//正常队列绑定死信队列信息Map<String,Object> params =newHashMap<>();//正常队列设置死信交换机   参数  key是固定值
        params.put("x-dead-letter-exchange",DEAD_EXCHANGE);//正常队列设置死信  routing-key 参数  key是固定值
        params.put("x-dead-letter-routing-key",ROUTING_KEY);// 声明正常队列String normalQueue ="normal-queue";
        channel.queueDeclare(normalQueue,false,false,false, params);
        channel.queueBind(normalQueue,NORMAL_EXCHANGE,NORMAL_ROUTING_KEY);System.out.println("Consumer01等待接收消息.....");DeliverCallback deliverCallback =(consumerTag, delivery)->{String message =newString(delivery.getBody(),StandardCharsets.UTF_8);System.out.println("Consumer01接收到消息"+ message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};
        channel.basicConsume(normalQueue,false, deliverCallback, consumerTag ->{});}}

4、源码地址

代码参考: https://gitee.com/lhzlx/rabbit-simple-demo.git

在这里插入图片描述

标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/zhuocailing3390/article/details/131776619
版权归原作者 一恍过去 所有, 如有侵权,请联系我们删除。

“RabbitMQ实现死信队列”的评论:

还没有评论