概述
在 RabbitMQ 中实现延迟消息通常需要借助插件(如 RabbitMQ 延迟队列插件),因为 RabbitMQ 本身不原生支持延迟消息。
延迟消息的一个典型场景是,当消息发布到队列后,等待一段时间再由消费者消费。这可以通过配置 TTL(Time-To-Live)和死信队列(DLX, Dead Letter Exchange)实现,或者通过 RabbitMQ 的延迟插件实现。
安装插件
下载地址
直接点击下载,然后将下载后的文件直接放入在 plugins 目录中:
启动插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
检查是否成功
打开可视化面板,访问 http://localhost:15672/#/ 账号密码都是 guest
发现新增了一个延迟队列类型 x-delayed-message:
延迟消息实现步骤
1. 创建一个延迟交换机
RabbitMQ 延迟插件允许我们使用一种特殊的交换机类型
x-delayed-message
,可以设置延迟时间。
2. 发送延迟消息
通过设置消息属性中的
x-delay
来定义延迟时间。
3. 消费消息
消费者在消息到达指定的延迟时间后可以消费。
代码示例
1. 发送延迟消息的生产者代码
const amqp =require('amqplib');asyncfunctionsendDelayedMessage(){const exchangeName ='delayed_exchange';const routingKey ='my_routing_key';const delayTime =5000;// 延迟 5 秒// 连接到 RabbitMQ 服务器const connection =await amqp.connect('amqp://localhost');const channel =await connection.createChannel();// 声明延迟交换机await channel.assertExchange(exchangeName,'x-delayed-message',{durable:true,arguments:{'x-delayed-type':'direct',// 交换机的基础类型},});const message ='Hello, this is a delayed message!';// 发送带有延迟的消息
channel.publish(exchangeName, routingKey, Buffer.from(message),{headers:{'x-delay': delayTime,// 设置延迟时间},});
console.log(`[x] Sent delayed message: "${message}" with delay: ${delayTime}ms`);// 关闭连接setTimeout(()=>{
connection.close();},1000);}sendDelayedMessage().catch(console.error);
2. 消费延迟消息的消费者代码
const amqp =require('amqplib');asyncfunctionconsumeDelayedMessage(){const exchangeName ='delayed_exchange';const queueName ='delayed_queue';const routingKey ='my_routing_key';// 连接到 RabbitMQ 服务器const connection =await amqp.connect('amqp://localhost');const channel =await connection.createChannel();// 声明队列并绑定到交换机await channel.assertQueue(queueName,{durable:true});await channel.bindQueue(queueName, exchangeName, routingKey);
console.log('[*] Waiting for messages in delayed queue. To exit press CTRL+C');// 消费消息
channel.consume(queueName,(msg)=>{if(msg !==null){
console.log(`[x] Received delayed message: "${msg.content.toString()}"`);
channel.ack(msg);// 手动确认消息}});}consumeDelayedMessage().catch(console.error);
- 生产者部分:- 使用
x-delayed-message
交换机,它允许消息在交换机中保留一段时间(通过x-delay
属性),再发布到相应的队列。- 通过设置消息属性headers: { 'x-delay': delayTime }
来指定延迟的时间。 - 消费者部分:- 声明一个队列并将其绑定到延迟交换机,消费者从队列中接收消息。- 当消息的延迟时间到达后,消息被投递到队列并由消费者处理。
总结
使用延迟插件可以简化 RabbitMQ 中延迟消息的实现。
通过
x-delayed-message
交换机和
x-delay
属性,开发者可以灵活地控制消息的延迟发送时间。这种方式常用于需要延迟执行某些任务的场景,例如订单超时处理、延迟通知、预约外卖时延迟预约消息推送到商家等。
版权归原作者 秀秀_heo 所有, 如有侵权,请联系我们删除。