死信队列
无法被正常消费的消息就称为死信,存储这种消息的队列成为死信队列,便于开发者分析和处理这些“死信”,以便找出问题所在,而不是直接丢弃这些消息。
死信来源
- 消息 TTL 过期
TTL是 Time To Live 的缩写, 也就是生存时间
- 队列达到最大长度
队列满了,无法再添加数据到 MQ 中
- 消息被拒绝
(basic.reject 或 basic.nack) 并且 requeue = false
实现方式
为正常的交换机配置一个死信交换机,当消息无法正常消费时,正常交换机自动把消息转发给死信交换机。
验证TTL过期
消费者代码如下,启动之后马上停止运行(模拟消费者的消费消息失败)
importcom.rabbitmq.client.BuiltinExchangeType;importcom.rabbitmq.client.Channel;importcom.rabbitmq.client.DeliverCallback;importjava.nio.charset.StandardCharsets;importjava.util.HashMap;importjava.util.Map;publicclassConsumer01{//普通交换机的名称publicstaticfinalString NORMAL_EXCHANGE ="normal_exchange";//死信交换机的名称publicstaticfinalString DEAD_EXCHANGE ="dead_exchange";//普通队列的名称publicstaticfinalString NORMAL_QUEUE ="normal_queue";//死信队列的名称publicstaticfinalString DEAD_QUEUE ="dead_queue";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMQUtil.getChannel();// 声明死信和普通交换机,类型为direct
channel.exchangeDeclare(NORMAL_EXCHANGE,BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);Map<String,Object> arguments =newHashMap<>();// 正常队列关联死信交换机,意味着正常队列消费异常的消息会转发到关联的这个死信交换机
arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
arguments.put("x-dead-letter-routing-key","routingKeyTwo");// 声明普通队列
channel.queueDeclare(NORMAL_QUEUE,false,false,false, arguments);// 声明死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);// 绑定普通交换机及其队列
channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE,"routingKeyOne");// 绑定死信交换机及其死信队列
channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE,"routingKeyTwo");System.out.println("等待接收消息...");DeliverCallback deliverCallback =(consumerTag, message)->{System.out.println("Consumer01接受的消息是:"+newString(message.getBody(),StandardCharsets.UTF_8));};
channel.basicConsume(NORMAL_QUEUE,true, deliverCallback, consumerTag ->{});}}
生产者代码如下,主要生产消息,并给消息设置TTL。
importcom.rabbitmq.client.AMQP;importcom.rabbitmq.client.Channel;publicclassDeadProducer{// 普通交换机的名称publicstaticfinalString NORMAL_EXCHANGE ="normal_exchange";publicstaticvoidmain(String[] args)throwsException{Channel channel =RabbitMQUtil.getChannel();// 死信消息 设置TTL(live to time)单位是msAMQP.BasicProperties properties =newAMQP.BasicProperties().builder().expiration("10000").build();for(int i =1; i <=10; i++){String message ="message"+ i;
channel.basicPublish(NORMAL_EXCHANGE,"routingKeyOne", properties, message.getBytes());}}}
测试结果如下
- 启动消费者停止后,会创建出对应的交换机和队列,此时正常队列中没有消息。
- 启动生产者后,可以看到正常队列中有消息,在10s后,消息陆续转发到死信队列中
- 可以看到最终消息来到了死信队列,因为没有消费者处理死信队列,所有消息会一直堆积在这。
验证队列达到最大长度
消费者代码中,对普通队列参数加上以下代码,设置普通队列的最大长度。(启动之后关闭该消费者,模拟其接收不到消息)
// 设置正常队列长度的限制,例如发送10个消息,6个为正常,4个为死信
arguments.put("x-max-length",6);
把前面生产者代码中,设置消息TTL这段代码注释掉。(启动生产者)
// AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
测试结果如下
因为正常队列设置了最大长度为6,所有同时收到10个消息后,会把多余的4个当做死信转发到死信队列。
图:正常队列中消息数
图:死信队列中消息数
验证消息被拒
在前面代码的基础上,把消费者代码中,普通队列长度设置的代码注释
// arguments.put("x-max-length", 6);
同时关闭自动确认,使用手动应答。启动消费者(这里不能马上停止),再启动生产者。
DeliverCallback deliverCallback =(consumerTag, message)->{String msgContent =newString(message.getBody(),StandardCharsets.UTF_8);if(msgContent.equals("message5")){System.out.println("Consumer01接受的消息是:"+ msgContent +": 此消息是被C1拒绝的");// requeue 设置为 false 代表拒绝重新入队 该队列如果配置了死信交换机将发送到死信队列中
channel.basicReject(message.getEnvelope().getDeliveryTag(),false);}else{System.out.println("Consumer01接受的消息是:"+ msgContent);
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);}};
channel.basicConsume(NORMAL_QUEUE,false, deliverCallback, consumerTag ->{});
测试结果如下,被拒绝的且不能重新入队的消息称为死信转发到死信队列。
延时队列
延时队列就是用来存放需要在指定时间被处理的元素的队列。比如:订单在十分钟之内未支付则自动取消。
队列和消息可以分别设置TTL
两者同时设置,取最小值。
- 设置队列的TTL属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中)。
- 设置消息的TTL属性,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的。在这种情况下需要注意,如果消息A的TTL大于消息B的TTL,他们先后放入到一个正常队列中,虽然消息B的TTL要小,但不会先放入到死信队列处理。
实现方式
给消息设置TTL,消息过期后会转发到死信交换机,为死信队列配置消费者处理这些消息,就到达延时队列的目的。
死信队列和延时队列区别
死信队列主要存储无法正常消费的消息,便于处理分析。延时队列利用消息TTL产生死信这一特点来控制消息的消费时间。
版权归原作者 @阿秋 所有, 如有侵权,请联系我们删除。