0


RabbitMQ--死信队列

一:死信队列的介绍:

Some messages stored in RabbitMQ queues will expire or be negatively acknowledged by consumers. Instead of silently dropping them, RabbitMQ can be configured to "dead letter" them instead, that is to republish those messages to a special-purpose exchange.

1.1死信队列的引出

以上的内容就是说明死信队列是为了解决死信的问题,什么是死信呢:就是消息在发送之后可能因为种种原因没有被消费的消息就变成了死信。我们为了不将死信直接删除,就设置了死信队列。

1.2死信如何产生:

1.消息被消费者拒绝

2.发送的消息带有时间,时间到了还是没有被消费

3.队列有生存时间,队列生存时间到了,它里面未来得及被消费的消息

4.队列到达最大长度之后,进入的消息

1.3图解死信队列与普通交换机和队列的关系、

二.Java代码使用死信队列

2.1创建项目

小编使用的是java8 + SpringBoot2.4.2

2.2注入Exchange 和Queue

@Configuration
public class Dead_Config {
    public static final String QUEUE_NAME="normal_queue";
    public static final String EXCHANGE_NAME="normal_exchange";
    public static final String NORMAL_ROUTING_KEY = "normal.#";
    public static final String DEAD_NAME="dead_queue";
    public static final String DEAD_EXCHANGE="dead_exchange";
    public static final String DEAD_ROUTING_KEY = "dead.#";
    @Bean
    public Exchange normalExchange(){
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
    }
    @Bean
    public Queue normalQueue(){
        return QueueBuilder.durable(QUEUE_NAME)
                .deadLetterExchange(DEAD_EXCHANGE)  //是死信队列连接DEAD_EXCHANGE交换机
                .deadLetterRoutingKey("dead.abc")
                .build();
    }
    @Bean
    public Binding  normalBinding(Exchange normalExchange, Queue normalQueue){
        return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();
    }
    @Bean
    public Queue deadQueue(){
        return QueueBuilder.durable(DEAD_NAME).build();
    }
    @Bean
    public Exchange deadExchange(){
        return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build();
    }
    @Bean
    public Binding deadBinding(Queue deadQueue,Exchange deadExchange){
        return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
    }
}

2.2消息被消费者拒绝

@Component
public class Consumer {
    /**
     * 测试效果一 :消息被消费者拒绝  消费者执行NACK或者是request设置为false
     * @param msg
     * @param channel
     * @param message
     */
    @RabbitListener(queues = Dead_Config.QUEUE_NAME)
    public void consumer(String msg, Channel channel, Message message) throws IOException {
        //(1)消费者设置NACK
        //关闭自动ACK
        channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
        //(2)
        channel.basicReject(message.getMessageProperties().getDeliveryTag(),false);
    }
}

2.3:为消息设置过期时间

 /**
     * 为消息设置过期时间
     */
    @Test
    public void p1(){
        rabbitTemplate.convertAndSend(Dead_Config.EXCHANGE_NAME, "normal.abc", "这是一个普通的消息", new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("5000");//消息在5秒后过期
                return message;
            }
        });
        System.out.println("消息发送成功");
    }

2.4为队列设置有效期ttl

@Bean
    public Queue normalQueue(){
        return QueueBuilder.durable(QUEUE_NAME)
                .deadLetterExchange(DEAD_EXCHANGE)  //是死信队列连接DEAD_EXCHANGE交换机
                .deadLetterRoutingKey("dead.abc")
                .ttl(10000)
                .build();
    }

2.5设置对列的消息数

@Bean
    public Queue normalQueue(){
        return QueueBuilder.durable(QUEUE_NAME)
                .deadLetterExchange(DEAD_EXCHANGE)  //是死信队列连接DEAD_EXCHANGE交换机
                .deadLetterRoutingKey("dead.abc")
                //.ttl(10000)
                .maxLength(2)
                .build();
    }

注:会把代码里面的重要的点注释,难于理解的代码会特别讲解

三.结果展示

3.1为创建前

没有队列和交换机 注意我们创建的队列名是normal_queue和dead_queue,交换机的名字是normal_exchange和dead_exchange.

3.2测试效果一 :消息被消费者拒绝

消费者执行NACK或者是request设置为false

经过我们的拒绝之后,我们的消息出现在死信队列里面

将之后的效果省略,都是大同小异

标签: rabbitmq 分布式 java

本文转载自: https://blog.csdn.net/hdk5855/article/details/142860978
版权归原作者 逸Y 仙X 所有, 如有侵权,请联系我们删除。

“RabbitMQ--死信队列”的评论:

还没有评论