一:死信队列的介绍:
Some messages stored in RabbitMQ queues will expire or be negatively acknowledged by consumers. Instead of silently dropping them, RabbitMQ can be configured to "dead letter" them instead, that is to republish those messages to a special-purpose exchange.
1.1死信队列的引出
以上的内容就是说明死信队列是为了解决死信的问题,什么是死信呢:就是消息在发送之后可能因为种种原因没有被消费的消息就变成了死信。我们为了不将死信直接删除,就设置了死信队列。
1.2死信如何产生:
1.消息被消费者拒绝
2.发送的消息带有时间,时间到了还是没有被消费
3.队列有生存时间,队列生存时间到了,它里面未来得及被消费的消息
4.队列到达最大长度之后,进入的消息
1.3图解死信队列与普通交换机和队列的关系、

二.Java代码使用死信队列
2.1创建项目
小编使用的是java8 + SpringBoot2.4.2
2.2注入Exchange 和Queue
@Configuration
public class Dead_Config {
public static final String QUEUE_NAME="normal_queue";
public static final String EXCHANGE_NAME="normal_exchange";
public static final String NORMAL_ROUTING_KEY = "normal.#";
public static final String DEAD_NAME="dead_queue";
public static final String DEAD_EXCHANGE="dead_exchange";
public static final String DEAD_ROUTING_KEY = "dead.#";
@Bean
public Exchange normalExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
}
@Bean
public Queue normalQueue(){
return QueueBuilder.durable(QUEUE_NAME)
.deadLetterExchange(DEAD_EXCHANGE) //是死信队列连接DEAD_EXCHANGE交换机
.deadLetterRoutingKey("dead.abc")
.build();
}
@Bean
public Binding normalBinding(Exchange normalExchange, Queue normalQueue){
return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();
}
@Bean
public Queue deadQueue(){
return QueueBuilder.durable(DEAD_NAME).build();
}
@Bean
public Exchange deadExchange(){
return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build();
}
@Bean
public Binding deadBinding(Queue deadQueue,Exchange deadExchange){
return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
}
}
2.2消息被消费者拒绝
@Component
public class Consumer {
/**
* 测试效果一 :消息被消费者拒绝 消费者执行NACK或者是request设置为false
* @param msg
* @param channel
* @param message
*/
@RabbitListener(queues = Dead_Config.QUEUE_NAME)
public void consumer(String msg, Channel channel, Message message) throws IOException {
//(1)消费者设置NACK
//关闭自动ACK
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
//(2)
channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
}
}
2.3:为消息设置过期时间
/**
* 为消息设置过期时间
*/
@Test
public void p1(){
rabbitTemplate.convertAndSend(Dead_Config.EXCHANGE_NAME, "normal.abc", "这是一个普通的消息", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("5000");//消息在5秒后过期
return message;
}
});
System.out.println("消息发送成功");
}
2.4为队列设置有效期ttl
@Bean
public Queue normalQueue(){
return QueueBuilder.durable(QUEUE_NAME)
.deadLetterExchange(DEAD_EXCHANGE) //是死信队列连接DEAD_EXCHANGE交换机
.deadLetterRoutingKey("dead.abc")
.ttl(10000)
.build();
}
2.5设置对列的消息数
@Bean
public Queue normalQueue(){
return QueueBuilder.durable(QUEUE_NAME)
.deadLetterExchange(DEAD_EXCHANGE) //是死信队列连接DEAD_EXCHANGE交换机
.deadLetterRoutingKey("dead.abc")
//.ttl(10000)
.maxLength(2)
.build();
}
注:会把代码里面的重要的点注释,难于理解的代码会特别讲解
三.结果展示
3.1为创建前
没有队列和交换机 注意我们创建的队列名是normal_queue和dead_queue,交换机的名字是normal_exchange和dead_exchange.

3.2测试效果一 :消息被消费者拒绝
消费者执行NACK或者是request设置为false

经过我们的拒绝之后,我们的消息出现在死信队列里面

将之后的效果省略,都是大同小异
版权归原作者 逸Y 仙X 所有, 如有侵权,请联系我们删除。