这篇文章,主要介绍消息队列RabbitMQ之死信队列。
一、RabbitMQ死信队列
1.1、什么是死信队列
在RabbitMQ中,并没有提供真正意义上的延迟队列,但是RabbitMQ可以设置队列、消息的过期时间,当队列或者消息到达过期时间之后,还没有被消费者消费,那么RabbitMQ会将这些消息放入另外一个队列,这个队列叫做:死信队列,而这个过期的消息就叫做:死信消息。
哪些情况下,消息会变成死信消息???
- 第一种情况:Queue队列已经满了,无法保存新进入的消息,那么这个消息就会被放入死信队列。
- 第二种情况:队列中的消息被消费者拒绝消费了,并且没有设置重新放入Queue队列里面。
- 第三种情况:队列中的消息到了过期时间还没有被消费者消费。
死信队列需要通过一个死信交换机来分发死信消息,死信交换机是Dead Letter Exchange,简称:DLX。死信队列大致原理图如下所示:
1.2、设置过期时间TTL
RabbitMQ可以给Queue队列设置过期时间,也可以给单独的某一条消息Message设置过期时间,过期时间就是指消息存活时间,英文全称是Time To Live,简称:TTL。
- 第一种方式:设置队列过期时间。- RabbitMQ可以给整个Queue队列设置过期时间,设置整个Queue过期时间,也就是设置这个队列中的所有Message消息的过期时间。- 这种方式不太灵活,因为如果每一个消息的延迟时间不一样,那么就需要保存到不同的Queue队列里面,给每一个Queue队列设置不同的过期时间,这样就会导致Queue队列非常的多。- 注意:通过队列的属性【x-message-ttl】设置队列的过期时间,单位是【ms】毫秒。
- 第二种方式:设置单条消息过期时间。- RabbitMQ也支持给一个队列中的每一条Message消息设置过期时间,这种方式就更加灵活啦,同一个队列里面可以保存不同过期时间的消息,减少了队列的数量。- 这种方式也有一个缺点,那就是:每一条消息的过期时间不同,如果队列之前的消息没有被消费掉,那么后面的消息过期时间到了,也不会被剔除掉。- RabbitMQ剔除过期消息是在消费者消费之前判断是否过期的。- 举个例子: - 有A、B两条消息,A消息过期时间是10秒,B消息过期时间是5秒,并且A消息在B消息之前进入队列,5秒之后,A消息还没有被消费者消费掉,此时B消息到了过期时间,但是它不会立即从队列里面剔除,而是会在消费者消费B消息时候,判断B消息是否过期。- 当A消息消费结束之后,开始消费B消息,发现B消息已经过期,并且已经过期5秒,就会将这一条消息转发到死信交换机。- 注意:通过消息的【expiration】属性设置过期时间,单位是【ms】毫秒。
注意:可以同时设置队列和消息的过期时间,RabbitMQ会根据最短的时间来决定消息是否已经过期了。
1.3、配置死信交换机和死信队列(代码配置)
死信交换机和死信队列和普通的交换机、队列没有什么不同,只是叫法不同,在配置死信交换机和死信队列的时候,只需要给具体的业务队列配置死信交换机,然后给这个死信交换机绑定一个死信队列即可。
可以通过代码设置死信交换机和死信队列,也可以通过RabbitMQ提供的管理界面直接配置死信交换机和死信队列。在实际开发中,交换机和队列都是事先创建好的,之后生产者、消费者直接从指定的队列消费消息即可。
(1)设置队列过期时间
在声明队列的时候,给队列设置【x-message-ttl】属性即可。
// 6、指定需要操作的消息队列,如果队列不存在,则会创建
String queueName = "queue_demo_2023";
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", "30000"); // 设置队列的过期时间
channel.queueDeclare(queueName, false, false, false, arguments);
(2)设置单条消息过期时间
在生产者发送消息的时候,通过【AMQP.BasicProperties】设置过期时间属性。
// 6、发送消息
String message = "RabbitMQ死信队列案例";
// TODO 设置消息的属性
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
// 设置消息的过期时间是:30秒
.expiration("30000")
.build();
channel.basicPublish(exchangeName, "demo_key_2023", properties, message.getBytes());
(3)队列设置死信交换机
- 声明队列的时候,通过给队列指定【x-dead-letter-exchange】属性,指定当前队列需要转发的死信交换机。
- 通过【x-dead-letter-routing-key】属性,指定死信交换机绑定的路由键routingKey。
// 声明 Exchange,如果不存在,则会创建
channel.exchangeDeclare("exchange_demo_2023", "direct");
// TODO 声明死信交换机
channel.exchangeDeclare("exchange_dead_2023", "direct");
// 指定需要操作的消息队列,如果队列不存在,则会创建
// TODO 设置队列过期时间【业务队列】
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 10000); // 设置队列的过期时间
// 指定队列绑定的死信交换机名称
arguments.put("x-dead-letter-exchange", "exchange_dead_2023");
// 指定死信交换机的路由键routingKey
arguments.put("x-dead-letter-routing-key", "dead_key_2023");
channel.queueDeclare("queue_demo_2023", false, false, false, arguments);
// TODO 创建死信队列
channel.queueDeclare("queue_dead_2023", false, false, false, null);
// 绑定 Exchange 和 Queue
channel.queueBind("queue_demo_2023", "exchange_demo_2023", "demo_key_2023");
// TODO 死信队列绑定死信交换机
channel.queueBind("queue_dead_2023", "exchange_dead_2023", "dead_key_2023");
(4)配置的基本思路
- 业务队列和业务交换机绑定。
- 死信队列和死信交换机绑定。
- 业务队列和死信交换机,是通过【x-dead-letter-exchange】和【x-dead-letter-routing-key】参数建立绑定关系的。
1.4、配置死信交换机和死信队列(RabbitMQ管理界面配置)
- 第一步:创建业务交换机【busi_exchange_2023】。
- 第二步:创建死信交换机【dead_exchange_2023】。
- 第三步:创建业务队列【busi_queue_2023】。- 设置【x-message-ttl】消息过期时间。- 设置【x-dead-letter-exchange】死信交换机。- 设置【x-dead-letter-routing-key】死信交换机的路由键routingKey。
- 第五步:创建死信队列【dead_queue_2023】。
- 第六步:业务队列【busi_queue_2023】绑定业务交换机【busi_exchange_2023】。
- 第七步:死信队列【dead_queue_2023】绑定死信交换机【dead_exchange_2023】。
- 第八步:通过前面七个步骤,已经配置好了死信队列和死信交换机啦,可以进行测试啦。
- 第九步:查看业务队列和死信队列的变化情况。
到此,RabbitMQ中的死信队列就介绍完啦。
综上,这篇文章结束了,主要介绍消息队列RabbitMQ之死信队列。
版权归原作者 朱友斌 所有, 如有侵权,请联系我们删除。