什么是死信队列?
在消息队列中,执行异步任务时,通常是将消息生产者发布的消息存储在队列中,由消费者从队列中获取并处理这些消息。但是,在某些情况下,消息可能无法正常地被处理和消耗,例如:格式错误、设备故障等,这些未成功处理的消息就被称为“死信”。
为了避免这些未成功处理的消息导致程序异常或对系统造成影响,我们需要使用
死信队列
(Dead Letter Queue)。当我们设置死信队列后,所有无法成功处理的消息将被捕获并重定向到指定的死信交换机中。消费者可以从该交换机中读取并处理这些“死信”。
死信队列的优点
使用死信队列有以下优点:
- 提高系统可靠性:避免因未处理的死信而导致程序异常,提高系统的可靠性。
- 实现延迟消息:可以通过设置TTL时间,将超时未消费的消息转移到死信队列中,实现延迟消息的功能。
- 防止滥用:当某些生产者恶意发送低质量的消息或进行滥用时,可以通过丢弃或重定向死信消息来防止滥用和恶意攻击。
死信队列的应用场景
死信队列在以下场景下是非常有用的:
- 消息格式错误:当消息格式错误时,可能会导致消费者无法正确地解析或处理该消息,这个问题通常与生产者的代码有关。为了避免消息失效,并提高系统可靠性,我们可以使用死信队列。
- 消费者故障:另一个常见的场景是消息处理者无法正确地处理或响应到推入到队列中的消息,例如消费者创建了一个协程并在逻辑执行完成后未正确地关闭该协程。由于该协程始终处于打开状态,它将一直阻止该消费者对其他消息进行正确消费。为了避免这种消息挂起并影响其他消息的正常处理,可以将其加入死信中心。
死信队列的实现方式
下面通过RabbitMQ和Spring Boot,演示如何实现死信队列。
RabbitMQ实现
创建交换机和队列
import pika
defmain():
credentials = pika.PlainCredentials('guest','guest')
parameters = pika.ConnectionParameters('localhost',5672,'/', credentials)# 创建死信交换机
dlx_exchnage_name ='my-dlx-exchange'with pika.BlockingConnection(parameters)as connection:
channel = connection.channel()
channel.exchange_declare(exchange=dlx_exchnage_name, exchange_type='fanout', durable=True)# 创建死信队列和交换机
dead_letter_queue_name ='dead-letter-queue'with pika.BlockingConnection(parameters)as connection:
channel = connection.channel()
channel.queue_declare(queue=dead_letter_queue_name, durable=True)
channel.queue_bind(queue=dead_letter_queue_name, exchange=dlx_exchnage_name)# 创建消息队列,并将其绑定到死信队列上
queue_name ="job-queue"
arguments ={"x-dead-letter-exchange": dlx_exchnage_name}with pika.BlockingConnection(parameters)as connection:
channel = connection.channel()
channel.queue_declare(queue=queue_name, durable=True,
arguments=arguments)
channel.queue_bind(exchange='', queue=queue_name, routing_key=queue_name)print("Queue is created")if __name__ =='__main__':
main()
以上代码创建了两个队列,一个是
my-dlx-exchange
,一个是
dead-letter-queue
。同时创建另外一个名为
job-queue
的队列,它绑定了
dead-letter-exchange
这个交换机。
在发送消息时,需要提供一些属性来指定该队列应采取哪些步骤来防止该类丢失的消息。 这里我们可以使用
x-dead-letter-exchange
和
x-message-ttl
两个特殊属性,告诉RabbitMQ,如果消息在某段时间内无法正确处理,则将其放入死信队列中。
发送和接收消息
import pika
defsend_message():
credentials = pika.PlainCredentials('guest','guest')
parameters = pika.ConnectionParameters('localhost',5672,'/', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()# 发送一个消息5秒后过期,并且未被消费端确认
queue_name ="job-queue"
properties = pika.BasicProperties(delivery_mode=2,
expiration="5000")
channel.basic_publish(exchange='', routing_key=queue_name, body='Hello World!', properties=properties)
channel.close()
connection.close()defreceive_message():
credentials = pika.PlainCredentials('guest','guest')
parameters = pika.ConnectionParameters('localhost',5672,'/', credentials)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
dlx_exchnage_name ='my-dlx-exchange'defcallback(ch, method, properties, body):print("Receivedmessage: %r"% body)
ch.basic_ack(delivery_tag=method.delivery_tag)# 消费来自job-queue的消息
queue_name ="job-queue"
channel.basic_consume(queue_name, callback)
channel.start_consuming()if __name__ =='__main__':
send_message()
receive_message()
以上代码是一个简单的生产者和消费者。
send_message()
函数发送一个消息,并且未被消费端确认;
receive_message()
函数从
job-queue
中接收消息。
Spring Boot实现
在Spring Boot中,我们可以使用RabbitMQ来实现死信队列。
添加依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
配置文件
spring:rabbitmq:host: localhost
port:5672username: guest
password: guest
创建死信交换机、队列和绑定
@ConfigurationpublicclassRabbitConfig{// 死信交换机publicstaticfinalString DLX_EXCHANGE_NAME ="my-dlx-exchange";// 死信队列publicstaticfinalString DEAD_LETTER_QUEUE_NAME ="dead-letter-queue";// 业务处理队列publicstaticfinalString PROCESS_QUEUE_NAME ="process-queue";@BeanpublicTopicExchangedlxExchange(){returnnewTopicExchange(DLX_EXCHANGE_NAME);}@BeanpublicQueuedeadLetterQueue(){returnQueueBuilder.durable(DEAD_LETTER_QUEUE_NAME).withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME).build();}@BeanpublicQueueprocessQueue(){returnQueueBuilder.durable(PROCESS_QUEUE_NAME).withArgument("x-message-ttl",5000).withArgument("x-dead-letter-exchange", DLX_EXCHANGE_NAME).build();}@BeanpublicBindingdlxBinding(Queue deadLetterQueue,TopicExchange dlxExchange){returnBindingBuilder.bind(deadLetterQueue).to(dlxExchange).with("#");}}
以上代码创建了
my-dlx-exchange
死信交换机和
dead-letter-queue
死信队列。同时创建一个
process-queue
业务处理队列,该队列设置了消息的生存时间为5s,并在该时间内未被消费者消费,则将该消息转移到死信队列中。
发送和接收消息
@ComponentpublicclassMessageSender{@AutowiredprivateRabbitTemplate rabbitTemplate;publicvoidsend(String message){
rabbitTemplate.convertAndSend("",RabbitConfig.PROCESS_QUEUE_NAME, message);}}@ComponentpublicclassMessageReceiver{@RabbitListener(queues ="process-queue")publicvoidreceive(String message)throwsException{System.out.println("Received message: "+ message);Thread.sleep(10000);}}
以上代码是一个简单的生产者和消费者。生产者使用
RabbitTemplate
来发送消息到
process-queue
队列中;消费者通过使用注解
@RabbitListener
监听
process-queue
队列的消息并进行处理。
结语
通过本篇文章,我们详细介绍了什么是死信队列、死信队列的优点、应用场景以及如何实现死信队列。通过RabbitMQ和Spring Boot的实现方式,不难看出死信队列在项目中的重要性和实际价值。在工程实践中,我们可以根据具体业务需求,结合技术选型,灵活运用死信队列来提高系统的可靠性和稳定性。
版权归原作者 DevCorner 所有, 如有侵权,请联系我们删除。