😊 @ 作者: 一恍过去
💖 @ 主页: https://blog.csdn.net/zhuocailing3390
🎊 @ 社区: Java技术栈交流
🎉 @ 主题: RabbitMQ实现死信队列
⏱️ @ 创作时间: 2023年07月19日
目录
目录
1、概述
概述:
producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,所有的死信就组成了死信队列。
应用场景: 为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。
产生原因:
- 消息 TTL 过期(单位)
- 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
- 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.
死信队列框架图:
2、代码演示
步骤:
- 生产者: 正常发送消息;
- 消费者(消费死信队列消息): 正常声明交换机、队列,为了让消息进入死信队列,需要将正常交换机与死信交换机进行绑定,这样消息在过期、拒接接收、 队列在达到最大长度时,消息就可以进入到死信队列;核心代码:
arguments.put(" x-dead-letter-exchange","dlx.exchange");
- 消费者(消费生产者消息): 声明死信交换机、队列,从死信队列中获取数据;
代码参考: https://gitee.com/lhzlx/rabbit-simple-demo.git
1、生产者
代码路径:
lhz.dlx
类:
Producer
publicclassProducer{/**
* 设置队列名称
*/privatestaticfinalStringNORMAL_ROUTING_KEY="normal";privatestaticfinalStringNORMAL_EXCHANGE="normal_exchange";publicstaticvoidmain(String[] args)throwsException{// 获取ChannelChannel channel =RabbitMqUtils.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);AMQP.BasicProperties properties =null;for(int i =1; i <11; i++){String message ="info"+ i;
channel.basicPublish(NORMAL_EXCHANGE,NORMAL_ROUTING_KEY, properties, message.getBytes());}System.out.println("消息发送完毕");}}
2、消费者(消费死信队列消息)
为了让进入到
死信队列
中的消息,不会一直累计,通过一个特殊的
消费者
,将
死信队列
中的消息进行消费;
代码路径:
lhz.dlx
类:
Consumer02
publicclassConsumer02{/**
* 设置队列名称
*/privatestaticfinalStringROUTING_KEY="dlx";privatestaticfinalStringDEAD_EXCHANGE="dead_exchange";publicstaticvoidmain(String[] args)throwsException{// 获取ChannelChannel channel =RabbitMqUtils.getChannel();
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);// 声明死信队列String deadQueue ="dead-queue";
channel.queueDeclare(deadQueue,false,false,false,null);
channel.queueBind(deadQueue,DEAD_EXCHANGE,ROUTING_KEY);System.out.println("Consumer02等待接收死信队列消息.....");DeliverCallback deliverCallback =(consumerTag, delivery)->{String message =newString(delivery.getBody(),StandardCharsets.UTF_8);System.out.println("Consumer02接收死信队列的消息"+ message);};
channel.basicConsume(deadQueue,true, deliverCallback, consumerTag ->{});}}
3、消费者(消费生产者消息)
3.1、队列达到最大长度
步骤:
- 删除之前模拟TTL的normal-queue、dead-queue;
Consumer01
,设置max-length参数,启动之后关闭该消费者,模拟其接收不到消息;- 正常启动生产者发送消息;
- 消息进入死信队列后,启动
Consumer02
,它消费死信队列里面的消息; - 预期结果:
生产者代码:
与
1、生产者
中代码一致;
消费者代码:
代码路径:
lhz.dlx
类:
Consumer01
publicclassConsumer01{// 路由privatestaticfinalStringROUTING_KEY="dlx";privatestaticfinalStringNORMAL_ROUTING_KEY="normal";//普通交换机名称privatestaticfinalStringNORMAL_EXCHANGE="normal_exchange";//死信交换机名称privatestaticfinalStringDEAD_EXCHANGE="dead_exchange";publicstaticvoidmain(String[] args)throwsException{// 获取ChannelChannel channel =RabbitMqUtils.getChannel();// 声明死信和普通交换机 类型为 direct
channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);// 声明死信队列String deadQueue ="dead-queue";
channel.queueDeclare(deadQueue,false,false,false,null);// 死信队列绑定死信交换机与 routingkey
channel.queueBind(deadQueue,DEAD_EXCHANGE,ROUTING_KEY);//正常队列绑定死信队列信息Map<String,Object> params =newHashMap<>();//正常队列设置死信交换机 参数 key是固定值
params.put("x-dead-letter-exchange",DEAD_EXCHANGE);//正常队列设置死信 routing-key 参数 key是固定值
params.put("x-dead-letter-routing-key",ROUTING_KEY);// TODO 如果是演示`队列达到最大长度`的情况,需要设置最大队列数量
params.put("x-max-length",6);// 声明正常队列String normalQueue ="normal-queue";
channel.queueDeclare(normalQueue,false,false,false, params);
channel.queueBind(normalQueue,NORMAL_EXCHANGE,NORMAL_ROUTING_KEY);System.out.println("Consumer01等待接收消息.....");DeliverCallback deliverCallback =(consumerTag, delivery)->{String message =newString(delivery.getBody(),StandardCharsets.UTF_8);System.out.println("Consumer01接收到消息"+ message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};
channel.basicConsume(normalQueue,false, deliverCallback, consumerTag ->{});}}
3.2、消息被拒
步骤:
- 删除之前模拟时产生的normal-queue、dead-queue;
Consumer01
,设置为消息手动确认并且拒绝接收某条消息,然后启动(不进行关闭
);- 当被拒绝的消息进入死信队列后,启动
Consumer02
它消费死信队列里面的消息; - 预期结果:生产者代码:
与
1、生产者
中代码一致;
消费者代码:
代码路径:
lhz.dlx
类:
Consumer01
publicclassConsumer01{// 路由privatestaticfinalStringROUTING_KEY="dlx";privatestaticfinalStringNORMAL_ROUTING_KEY="normal";//普通交换机名称privatestaticfinalStringNORMAL_EXCHANGE="normal_exchange";//死信交换机名称privatestaticfinalStringDEAD_EXCHANGE="dead_exchange";publicstaticvoidmain(String[] args)throwsException{// 获取ChannelChannel channel =RabbitMqUtils.getChannel();// 声明死信和普通交换机 类型为 direct
channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);// 声明死信队列String deadQueue ="dead-queue";
channel.queueDeclare(deadQueue,false,false,false,null);// 死信队列绑定死信交换机与 routingkey
channel.queueBind(deadQueue,DEAD_EXCHANGE,ROUTING_KEY);//正常队列绑定死信队列信息Map<String,Object> params =newHashMap<>();//正常队列设置死信交换机 参数 key是固定值
params.put("x-dead-letter-exchange",DEAD_EXCHANGE);//正常队列设置死信 routing-key 参数 key是固定值
params.put("x-dead-letter-routing-key",ROUTING_KEY);// 声明正常队列String normalQueue ="normal-queue";
channel.queueDeclare(normalQueue,false,false,false, params);
channel.queueBind(normalQueue,NORMAL_EXCHANGE,NORMAL_ROUTING_KEY);System.out.println("Consumer01等待接收消息.....");DeliverCallback deliverCallback =(consumerTag, delivery)->{String message =newString(delivery.getBody(),StandardCharsets.UTF_8);// TODO 模拟拒绝接收进入死信队列if(message.equals("info5")){System.out.println("Consumer01接收到消息"+ message +"并拒绝签收该消息");//requeue设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
channel.basicReject(delivery.getEnvelope().getDeliveryTag(),false);}else{System.out.println("Consumer01接收到消息"+ message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);}};
channel.basicConsume(normalQueue,false, deliverCallback, consumerTag ->{});}}
3.3、消息 TTL 过期
步骤:
- 删除之前模拟时产生的normal-queue、dead-queue;
- 先
Consumer01
启动之后关闭该消费者 模拟其接收不到消息; - 启动生产者发送消息(设置了10STTL过期时间);
- 观察rabbitmq控制台,消息进入死信队列后,启动
Consumer02
消费死信队列里面的消息; - 预期结果:
生产者代码:
// 在《1、生产者》基础上加上以下代码:
properties =newAMQP.BasicProperties().builder().expiration("10000").build();
消费者代码:
代码路径:
lhz.dlx
类:
Producer
publicclassConsumer01{// 路由privatestaticfinalStringROUTING_KEY="dlx";privatestaticfinalStringNORMAL_ROUTING_KEY="normal";//普通交换机名称privatestaticfinalStringNORMAL_EXCHANGE="normal_exchange";//死信交换机名称privatestaticfinalStringDEAD_EXCHANGE="dead_exchange";publicstaticvoidmain(String[] args)throwsException{// 获取ChannelChannel channel =RabbitMqUtils.getChannel();// 声明死信和普通交换机 类型为 direct
channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);// 声明死信队列String deadQueue ="dead-queue";
channel.queueDeclare(deadQueue,false,false,false,null);// 死信队列绑定死信交换机与 routingkey
channel.queueBind(deadQueue,DEAD_EXCHANGE,ROUTING_KEY);//正常队列绑定死信队列信息Map<String,Object> params =newHashMap<>();//正常队列设置死信交换机 参数 key是固定值
params.put("x-dead-letter-exchange",DEAD_EXCHANGE);//正常队列设置死信 routing-key 参数 key是固定值
params.put("x-dead-letter-routing-key",ROUTING_KEY);// 声明正常队列String normalQueue ="normal-queue";
channel.queueDeclare(normalQueue,false,false,false, params);
channel.queueBind(normalQueue,NORMAL_EXCHANGE,NORMAL_ROUTING_KEY);System.out.println("Consumer01等待接收消息.....");DeliverCallback deliverCallback =(consumerTag, delivery)->{String message =newString(delivery.getBody(),StandardCharsets.UTF_8);System.out.println("Consumer01接收到消息"+ message);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);};
channel.basicConsume(normalQueue,false, deliverCallback, consumerTag ->{});}}
4、源码地址
版权归原作者 一恍过去 所有, 如有侵权,请联系我们删除。