0


RabbitMQ死信队列详解:原理、配置与实战

RabbitMQ 是一个强大的消息代理,它支持多种消息传递模式。在实际应用中,我们经常会遇到一些消息无法被正常处理的情况,这些消息需要特别处理。为了应对这种情况,RabbitMQ 引入了死信队列(Dead Letter Queue, DLQ)概念。


🧑 博主简介:现任阿里巴巴嵌入式技术专家,15年工作经验,深耕嵌入式+人工智能领域,精通嵌入式领域开发、技术管理、简历招聘面试。CSDN优质创作者,提供产品测评、学习辅导、简历面试辅导、毕设辅导、项目开发、C/C++/Java/Python/Linux/AI等方面的服务,如有需要请站内私信或者联系任意文章底部的的VX名片(ID:

gylzbk

💬 博主粉丝群介绍:① 群内初中生、高中生、本科生、研究生、博士生遍布,可互相学习,交流困惑。② 热榜top10的常客也在群里,也有数不清的万粉大佬,可以交流写作技巧,上榜经验,涨粉秘籍。③ 群内也有职场精英,大厂大佬,可交流技术、面试、找工作的经验。④ 进群免费赠送写作秘籍一份,助你由写作小白晋升为创作大佬。⑤ 进群赠送CSDN评论防封脚本,送真活跃粉丝,助你提升文章热度。有兴趣的加文末联系方式,备注自己的CSDN昵称,拉你进群,互相学习共同进步。

在这里插入图片描述

RabbitMQ死信队列详解:原理、配置与实战

在这里插入图片描述

1. 什么是死信队列?

死信队列是用来处理无法被正常消费的消息的队列。当消息在原始队列中无法被正常处理时,会被重新路由到死信队列中。导致消息进入死信队列的常见原因有:

  1. 消息被拒绝(Rejection):消费者使用 basic.rejectbasic.nackrequeue 参数被设置为 false
  2. 消息过期(TTL Expiration):消息的生存时间(TTL)到期。
  3. 队列长度限制(Queue Length Limit):队列达到最大长度限制。

2. 配置死信队列

在 RabbitMQ 中,我们可以通过设置队列的

x-dead-letter-exchange

x-dead-letter-routing-key

参数来配置死信队列。

2.1 创建交换机和队列

首先,我们需要创建一个用于接收死信消息的交换机和队列。

# 创建死信交换机
rabbitmqadmin declare exchange name=dlx type=direct

# 创建死信队列
rabbitmqadmin declare queue name=dlq

# 将死信队列绑定到死信交换机
rabbitmqadmin declare binding source=dlx destination=dlq routing_key=dlq_routing_key

2.2 配置原始队列

接下来,我们需要配置原始队列,使其支持死信功能。

# 创建原始队列并配置死信交换机和路由键
rabbitmqadmin declare queue name=original_queue arguments='{"x-dead-letter-exchange":"dlx", "x-dead-letter-routing-key":"dlq_routing_key"}'

3. 使用死信队列

3.1 生产者发送消息

生产者发送消息到原始队列。

importcom.rabbitmq.client.ConnectionFactory;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.Channel;publicclassProducer{privatefinalstaticStringQUEUE_NAME="original_queue";publicstaticvoidmain(String[] argv)throwsException{ConnectionFactory factory =newConnectionFactory();
        factory.setHost("localhost");try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);String message ="Hello, World!";
            channel.basicPublish("",QUEUE_NAME,null, message.getBytes());System.out.println(" [x] Sent '"+ message +"'");}}}

3.2 消费者处理消息

消费者从原始队列中消费消息,并在无法处理时将消息拒绝,从而使消息进入死信队列。

importcom.rabbitmq.client.*;publicclassConsumer{privatefinalstaticStringQUEUE_NAME="original_queue";publicstaticvoidmain(String[] argv)throwsException{ConnectionFactory factory =newConnectionFactory();
        factory.setHost("localhost");try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback =(consumerTag, delivery)->{String message =newString(delivery.getBody(),"UTF-8");System.out.println(" [x] Received '"+ message +"'");// 模拟处理失败,拒绝消息
                channel.basicReject(delivery.getEnvelope().getDeliveryTag(),false);};
            channel.basicConsume(QUEUE_NAME,false, deliverCallback, consumerTag ->{});}}}

3.3 处理死信队列中的消息

创建一个新的消费者,从死信队列中消费死信消息。

importcom.rabbitmq.client.*;publicclassDLQConsumer{privatefinalstaticStringQUEUE_NAME="dlq";publicstaticvoidmain(String[] argv)throwsException{ConnectionFactory factory =newConnectionFactory();
        factory.setHost("localhost");try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);System.out.println(" [*```java
]Waitingfor messages in DLQ. To exit press CTRL+C");DeliverCallback deliverCallback =(consumerTag, delivery)->{String message =newString(delivery.getBody(),"UTF-8");System.out.println(" [x] Received in DLQ: '"+ message +"'");// 此处可以对死信消息进行处理,例如记录日志或重新处理};
            channel.basicConsume(QUEUE_NAME,true, deliverCallback, consumerTag ->{});}}}

4. 死信队列的高级配置

4.1 设置消息TTL(Time-To-Live)

我们可以为队列或消息设置TTL,以控制消息的生存时间。当消息的TTL到期后,消息将被转移到死信队列。

为队列设置TTL

rabbitmqadmin declare queue name=original_queue arguments='{"x-dead-letter-exchange":"dlx", "x-dead-letter-routing-key":"dlq_routing_key", "x-message-ttl":60000}'

为消息设置TTL

importcom.rabbitmq.client.AMQP;importcom.rabbitmq.client.ConnectionFactory;importcom.rabbitmq.client.Connection;importcom.rabbitmq.client.Channel;publicclassProducerWithTTL{privatefinalstaticStringQUEUE_NAME="original_queue";publicstaticvoidmain(String[] argv)throwsException{ConnectionFactory factory =newConnectionFactory();
        factory.setHost("localhost");try(Connection connection = factory.newConnection();Channel channel = connection.createChannel()){
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);String message ="Hello, World with TTL!";AMQP.BasicProperties properties =newAMQP.BasicProperties.Builder().expiration("60000")// 设置消息的TTL为60000毫秒(1分钟).build();
            channel.basicPublish("",QUEUE_NAME, properties, message.getBytes());System.out.println(" [x] Sent '"+ message +"' with TTL");}}}

4.2 设置队列长度限制

我们可以为队列设置最大长度限制,当队列中的消息数量超过此限制时,最早的消息将被转移到死信队列。

rabbitmqadmin declare queue name=original_queue arguments='{"x-dead-letter-exchange":"dlx", "x-dead-letter-routing-key":"dlq_routing_key", "x-max-length":10}'

5. 监控和管理死信队列

为了确保死信队列能够正常工作并及时处理死信消息,我们可以使用RabbitMQ管理控制台或命令行工具进行监控和管理。

5.1 RabbitMQ管理控制台

RabbitMQ提供了一个Web管理控制台,可以方便地查看队列、交换机和消息的状态。你可以通过以下步骤访问管理控制台:

  1. 打开浏览器,访问http://localhost:15672
  2. 输入用户名和密码(默认用户名和密码均为guest)。
  3. 在管理控制台中查看和管理死信队列。

5.2 RabbitMQ命令行工具

RabbitMQ提供了一些命令行工具,可以用于管理和监控队列。例如:

# 查看队列信息
rabbitmqctl list_queues

# 查看交换机信息
rabbitmqctl list_exchanges

# 查看队列中的消息
rabbitmqadmin get queue=dlq

6. 总结

死信队列是RabbitMQ中处理无法正常消费消息的重要机制。通过配置死信交换机和死信队列,我们可以确保消息在无法处理时不会丢失,而是能够被重新路由到专门的队列进行处理。本文详细介绍了配置和使用死信队列的方法,并提供了示例代码来帮助你更好地理解和应用这一机制。、

标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/g310773517/article/details/140955376
版权归原作者 I'mAlex 所有, 如有侵权,请联系我们删除。

“RabbitMQ死信队列详解:原理、配置与实战”的评论:

还没有评论