📒博客首页:崇尚学技术的科班人
🏇小肖来了
🍣今天给大家带来的文章是《万字 +图片解析死信队列和死信实战演练》🍣
🍣有的小伙伴可能会问死信队列有啥用?你看了这篇文章就知道了🍣
🍣希望各位小伙伴们能够耐心的读完这篇文章🍣
🙏博主也在学习阶段,如若发现问题,请告知,非常感谢🙏
💗同时也非常感谢各位小伙伴们的支持💗
文章目录
1、死信队列
1.1、概念
- 死信:就是
无法被消费的消息
。由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。 - 应用场景:保证订单业务的消息数据不丢失,当消息发生异常时,将消息投入死信队列中。比如说:用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
1.2、死信来源
- 消息
TTL
过期 - 队列达到最大长度(队列满了,无法再添加数据到队列中)。
- 消息被拒绝并且
requeue = false
1.3、死信实战
1.3.1、代码架构图
1.3.2、TTL过期情况
1. 消费者01
importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.DeliverCallback;importcom.xiao.utils.RabbitmqUtil;importjava.util.HashMap;importjava.util.Map;publicclassConsumer01{publicstaticfinalString DEAD_EXCHANGE ="dead_exchange";publicstaticfinalString NORMAL_EXCHANGE ="normal_exchange";publicstaticfinalString DEAD_QUEUE ="dead_queue";publicstaticfinalString NORMAL_QUEUE ="normal_queue";/**
* 死信实战
* 消费者01
* @param args
* @throws Exception
*/publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitmqUtil.getChannel();// 死信交换机
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);// 普通交换机
channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);Map<String,Object> map =newHashMap<>();
map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
map.put("x-dead-letter-routing-key","lisi");
map.put("x-message-ttl",10000);// 普通队列
channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);// 死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);// 队列绑定
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");System.out.println("等待接收消息......");DeliverCallback deliverCallback =(var1, var2)->{System.out.println("Consumer01控制台接收到的消息是:"+newString(var2.getBody(),"UTF-8"));};
channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,var1->{});}}
- 最为复杂的就是
消费者01
,它需要进行 死信交换机绑定死信队列、普通交换机绑定普通队列、普通队列绑定死信交换机。 - 我们为了让消息不被消费,我们需要制造假死现象,也就是
关闭消费者01
。
2. 消费者02
importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.DeliverCallback;importcom.xiao.utils.RabbitmqUtil;importjava.util.HashMap;importjava.util.Map;publicclassConsumer02{/**
* 消费者02
*/publicstaticfinalString DEAD_QUEUE ="dead_queue";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitmqUtil.getChannel();System.out.println("等待接收消息......");DeliverCallback deliverCallback =(var1, var2)->{System.out.println("Consumer02控制台接收到的消息是:"+newString(var2.getBody(),"UTF-8"));};// 只需要面向 死信队列消费消息,里面的关系绑定已经由C1完成
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,var1->{});}}
3. 生产者
importcom.rabbitmq.client.AMQP;importcom.rabbitmq.client.Channel;importcom.xiao.utils.RabbitmqUtil;importjava.nio.charset.StandardCharsets;publicclassProducer{publicstaticfinalString NORMAL_EXCHANGE ="normal_exchange";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitmqUtil.getChannel();// 单位是毫秒AMQP.BasicProperties properties =newAMQP.BasicProperties().builder().expiration("10000").build();for(int i =1; i <11; i ++){String message ="info"+ i;// 只需要向交换机里面发送消息,里面的关系绑定已经由C1完成
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());}}}
4. 测试结果
- 所有的消息在超过过期时间之后,全部转移到了死信队列中。
1.3.3、队列达到最大长度情况
1. 消费者01
publicclassConsumer01{publicstaticfinalString DEAD_EXCHANGE ="dead_exchange";publicstaticfinalString NORMAL_EXCHANGE ="normal_exchange";publicstaticfinalString DEAD_QUEUE ="dead_queue";publicstaticfinalString NORMAL_QUEUE ="normal_queue";/**
* 死信实战
* 消费者01
* @param args
* @throws Exception
*/publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitmqUtil.getChannel();// 死信交换机
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);// 普通交换机
channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);Map<String,Object> map =newHashMap<>();
map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
map.put("x-dead-letter-routing-key","lisi");
map.put("x-max-length",6);//map.put("x-message-ttl",10000);// 普通队列
channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);// 死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);// 队列绑定
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");System.out.println("等待接收消息......");DeliverCallback deliverCallback =(var1, var2)->{System.out.println("Consumer01控制台接收到的消息是:"+newString(var2.getBody(),"UTF-8"));};
channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,var1->{});}}
- 这里我们将过期时间参数改为了
队列最大长度
- 我们为了让消息不被消费和观察到明显现象,我们需要制造假死现象,也就是
关闭消费者01
。
2. 消费者02
importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.DeliverCallback;importcom.xiao.utils.RabbitmqUtil;importjava.util.HashMap;importjava.util.Map;publicclassConsumer02{/**
* 消费者02
*/publicstaticfinalString DEAD_QUEUE ="dead_queue";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitmqUtil.getChannel();System.out.println("等待接收消息......");DeliverCallback deliverCallback =(var1, var2)->{System.out.println("Consumer02控制台接收到的消息是:"+newString(var2.getBody(),"UTF-8"));};// 只需要面向 死信队列消费消息,里面的关系绑定已经由C1完成
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,var1->{});}}
- 消费者02和TTL过期情况下的一模一样
3. 生产者
importcom.rabbitmq.client.AMQP;importcom.rabbitmq.client.Channel;importcom.xiao.utils.RabbitmqUtil;importjava.nio.charset.StandardCharsets;publicclassProducer{publicstaticfinalString NORMAL_EXCHANGE ="normal_exchange";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitmqUtil.getChannel();// 单位是毫秒AMQP.BasicProperties properties =newAMQP.BasicProperties().builder().expiration("10000").build();for(int i =1; i <11; i ++){String message ="info"+ i;// 只需要向交换机里面发送消息,里面的关系绑定已经由C1完成
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());}}}
- 我们将对应的设置过期时间注释掉
4. 测试结果
- 如果我们启动消费者01会报错,那是因为我们所创建的队列已经存在,我们需要把普通队列删除,因为只有它的参数发生了改变。
- 因为我们设置了普通队列的最大长度6,所以当超过了最大长度的消息都会被作为死信。
1.3.4、消息被拒情况
1. 消费者01
importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.DeliverCallback;importcom.xiao.utils.RabbitmqUtil;importjava.util.HashMap;importjava.util.Map;publicclassConsumer01{publicstaticfinalString DEAD_EXCHANGE ="dead_exchange";publicstaticfinalString NORMAL_EXCHANGE ="normal_exchange";publicstaticfinalString DEAD_QUEUE ="dead_queue";publicstaticfinalString NORMAL_QUEUE ="normal_queue";/**
* 死信实战
* 消费者01
* @param args
* @throws Exception
*/publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitmqUtil.getChannel();// 死信交换机
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);// 普通交换机
channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);Map<String,Object> map =newHashMap<>();
map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
map.put("x-dead-letter-routing-key","lisi");//map.put("x-max-length",6);//map.put("x-message-ttl",10000);// 普通队列
channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);// 死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);// 队列绑定
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");System.out.println("等待接收消息......");DeliverCallback deliverCallback =(var1, var2)->{String msg =newString(var2.getBody(),"UTF-8");if(msg.equals("info5")){System.out.println("Consumer01控制台接收到的消息是:"+ msg +": 此消息被拒");
channel.basicReject(var2.getEnvelope().getDeliveryTag(),false);}else{System.out.println("Consumer01控制台接收到的消息是:"+ msg);
channel.basicAck(var2.getEnvelope().getDeliveryTag(),false);}};
channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,var1->{});}}
- 这里我们将
队列最大长度
注释掉 - 我们
还需要开启手动应答
,因为不开启就不会存在消息被拒 的问题。
2. 消费者02
importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.DeliverCallback;importcom.xiao.utils.RabbitmqUtil;importjava.util.HashMap;importjava.util.Map;publicclassConsumer02{/**
* 消费者02
*/publicstaticfinalString DEAD_QUEUE ="dead_queue";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitmqUtil.getChannel();System.out.println("等待接收消息......");DeliverCallback deliverCallback =(var1, var2)->{System.out.println("Consumer02控制台接收到的消息是:"+newString(var2.getBody(),"UTF-8"));};// 只需要面向 死信队列消费消息,里面的关系绑定已经由C1完成
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,var1->{});}}
- 消费者02和队列达到最大长度情况下的一模一样
3. 生产者
importcom.rabbitmq.client.AMQP;importcom.rabbitmq.client.Channel;importcom.xiao.utils.RabbitmqUtil;importjava.nio.charset.StandardCharsets;publicclassProducer{publicstaticfinalString NORMAL_EXCHANGE ="normal_exchange";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitmqUtil.getChannel();// 单位是毫秒AMQP.BasicProperties properties =newAMQP.BasicProperties().builder().expiration("10000").build();for(int i =1; i <11; i ++){String message ="info"+ i;// 只需要向交换机里面发送消息,里面的关系绑定已经由C1完成
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());}}}
- 生产者和队列达到最大长度情况下的一模一样
4. 测试结果
- 测试之前我们需要将队列中的消息消费掉,并且需要将普通队列删除。
- 可见只有
"info5"
被作为死信。
2、总结
- 如果觉得这篇文章对你有帮助的话,请给我一个五星好评呗。评论地址,感谢铁汁的支持!!!
版权归原作者 崇尚学技术的科班人 所有, 如有侵权,请联系我们删除。