延时队列
RabbitMQ是目前最为流行的消息队列之一,它的高可靠性、高可用性和高性能使得它成为众多应用场景下的首选。在实际应用中,我们经常需要实现延时队列来解决一些业务问题,比如订单超时未支付自动取消等。本文将介绍如何使用RabbitMQ实现延时队列
下面先来解释一下
延时队列(也可以称为延迟队列,其实都是一个意思):
延迟队列存储的对象是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
延时消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才收到消息
延时任务:设置在一定时间之后才执行的任务
死信:
当一个队列中的消息满足下列情况之一时,就会成为死信(dead letter):
1、消费者使用basic.reject或basic.nack声明消息消费失败,并且消息的requeue参数设置为false
2、消息是一个过期消息(达到了队列或消息本身设置的过期时间),超时无人消费
3、要投递的队列消息堆积满了,最早的消息可能成为死信
延时队列可以用于以下场景:
- 订单处理:在电商网站中,订单处理是一个常见的业务流程。如果订单需要立即处理,可以使用RabbitMQ的延时队列来实现延迟处理。例如,可以将订单发送到一个延时队列中,并设置一个延迟时间(例如30分钟),然后在延迟时间到达后,将订单从队列中取出并进行处理。
- 消息推送:在移动应用或Web应用程序中,可以使用RabbitMQ的延时队列来实现消息推送。例如,可以将用户订阅的消息发送到一个延时队列中,并设置一个延迟时间(例如1小时),然后在延迟时间到达后,将消息从队列中取出并推送给用户。
- 定时任务:在分布式系统中,可以使用RabbitMQ的延时队列来实现定时任务。例如,可以将需要定期执行的任务发送到一个延时队列中,并设置一个延迟时间(例如每天),然后在延迟时间到达后,将任务从队列中取出并执行。
- 数据备份:在数据库中,可以使用RabbitMQ的延时队列来实现数据备份。例如,可以将需要备份的数据发送到一个延时队列中,并设置一个延迟时间(例如每天),然后在延迟时间到达后,将数据从队列中取出并进行备份。
- 优惠券发放:您可以设置一个延时队列,将优惠券发放任务添加到队列中,设置一定的延时时间,以保证优惠券在特定时间后才能被消费。
- 动态路由:您可以使用延时队列来实现动态路由的功能,将消息发送到延时队列中,并设置一定的路由规则,以实现消息在特定时间后被路由到不同的目标队列中。
业务场景:
我们通常会在电商网站中(或者app比如:京东,淘宝)进行下单,购买商品,但是我们由于没哟及时支付,会出现订单超时未支付自动取消的情况
下面用一张简单的图片来设计一下业务场景:
那我们该如何去实现延时队列呢,下面用一张图片给大家解释一下
话不多说,上代码!!!
作者在这里只创建了一个交换机,这个交换机可以同时绑定两个队列(有两个队列,一个队列设置了它的ttl(消息过期时间),同时设置了消息过期后的路由交换机和路由的routeKey,如果不设置过期策略那么消息过期之后就会进入死信队列,另外一个队列是普通队列,监听的时候只用去监听普通队列,达到延迟队列的效果。跟上图效果一样,消息通过这个交换机到达设置了过期时间的的队列,这个延迟队列没有消费者进行消费,当消息过期之后,会通过这个交换机路由到正常的队列,然后进行消费)
导入依赖
<dependencies>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-all</artifactId>
<version>5.8.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
配置类
package com.atguigu.gulimall.auth.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* @Author YanShuLing
* @Package:com.atguigu.gulimall.auth.config
* @Project: brook
* @Description TODO
* @name:RabbitMQConfig
* @Date 2024/3/8:9:56
*/
@Configuration
public class RabbitMqConfig {
//创建了一个简单的队列
@Bean
public Queue createOrderReleaseQueue(){
return new Queue("gmall.order.release.queue");
}
//这个是一个延时队列
@Bean
public Queue createOrderDeadQueue(){
Map<String,Object> map = new HashMap<>();
//队列消息的过期时间为十秒
map.put("x-message-ttl",10000);
//交换机
map.put("x-dead-letter-exchange","gmall-order-exchange");
//路由key
map.put("x-dead-letter-routing-key","gmall.order.release.queue");
return new Queue("gmall.order.dead.queue",true,false,false,map);
}
//交换机
@Bean
public Exchange createOrderExchange(){
return new DirectExchange("gmall-order-exchange");
}
//交换机和正常队列绑定
@Bean
public Binding createOrderReleaseBind(){
return new Binding("gmall.order.release.queue",Binding.DestinationType.QUEUE,
"gmall-order-exchange","gmall.order.release.queue",null
);
}
//交换机和延迟队列绑定
@Bean
public Binding createOrderDeadBind(){
return new Binding("gmall.order.dead.queue",Binding.DestinationType.QUEUE,
"gmall-order-exchange","gmall.order.dead.queue",null
);
}
}
生产者(作者写了一个发送验证码的代码):
@PostMapping("/createOrder")
public R createOrder(String mobile){
//生成随机的四位数(验证码)
String code = RandomUtil.randomNumbers(4);
//redis给这个验证码设置过期时间为5分钟
redisTemplate.opsForValue().set("send_sms_"+mobile,code,5, TimeUnit.MINUTES);
String content = StrFormatter.format(Constants.SMS_TEMPLATE,code);
//给这个消息生成一个唯一标识,为了解决消息重复消费问题
String messageId = IdUtil.randomUUID();
//生产者发送消息,第一个参数是路由交换机,第二个参数是路由键,作者设置了跟死信队列一样的
名称,无伤大雅
rabbitRemplate.convertAndSend("gmall-order-exchange","gmall.order.dead.queue",
JSON.toJSONstring(new SmsParamVo(mobile,content,messageId)));
//发送验证码,日志打印
log.info("发送延迟消息给ttl队列,当前时间:{},消息内容:{}",new Date().toString(),content);
// smsService.sendSms(mobile,content);
return R.ok("成功");
}
消费者:用来监听消息
//消费者监听队列为gmall.order.release.queue队列的消息
@RabbitListener(queues = {"gmall.order.release.queue"})
@Component
@Slf4j
public class SmsListener {
private final SmsService smsService;
private final RedisTemplate redisTemplate;
public SmsListener(SmsService smsService, RedisTemplate redisTemplate) {
this.smsService = smsService;
this.redisTemplate = redisTemplate;
}
@RabbitHandler
public void sendSms(String string, Channel channel, Message message){
//消息标签
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
SmsParamVo smsParamVo = JSON.parseObject(string, SmsParamVo.class);
if(redisTemplate.hasKey(smsParamVo.getMsgId())){
//拿到消息的唯一标签,如果是已经消费过的消息,直接拒绝签收
channel.basicReject(deliveryTag,false);
return;
}
//打印日志
log.info("发送延迟消息给ttl队列,当前时间:{},消息内容:{}",new Date().toString(),smsPar
mVo);
//调用发送短信
// smsService.sendSms(smsParamVo.getMobile(),smsParamVo.getContext());
redisTemplate.opsForValue().set(smsParamVo.getMsgId(),smsParamVo.getMsgId(),12, TimeUnit.HOURS);
//确认签收,消息会从队列中删除
channel.basicAck(deliveryTag,false);
} catch (IOException e) {
try {
if(deliveryTag<=3){
//如果是由于某种特殊原因,消息没有发送成功,然后重回队列,
channel.basicNack(deliveryTag,false,true);
}
//当重试次数达到一定的数量,就放进死信队列
channel.basicNack(deliveryTag,false,false);
} catch (IOException ex) {
throw new RuntimeException(ex);
}
throw new RuntimeException(e);
}
}
}
测试发送之前,我们先来到rabbitMq可视化界面观察一下
下面我们来测试一下,作者使用的是Postman
看看后台日志打印,我们可以看到我们已经实现了延迟消息的效果
还有一种方式也可以实现延迟消息
那就是延迟消息插件,RabbitMQ的官方也推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后可以暂存一定时间,到期后再投递到队列
1、前往RabbitMQ官网下载往RabbitMQ添加延迟消息的插件
RabbitMQ官网下载插件的网址:Community Plugins | RabbitMQ
2、下载rabbitmq_delayer_message_exchange插件(注:RabbitMQ是什么版本的,下载的插件就得是什么版本的,得对应上,以下截图为官方文档的对插件版本的要求说明
这里作者的版本是3.9.13所以,作者就下载3.9版本的
选择3.9版本
![](https://img-blog.csdnimg.cn/direct/7d0ed958f5bb4628abfd7c5873ff7908.png)
3、把这个插件传输到服务器上
4、拷贝下载好的插件到容器中
docker cp rabbitmq_delayed_message_exchange-3.9.0.ez rabbitmq:/opt/rabbitmq/plugins/rabbitmq_delayed_message_exchange-3.9.0.ez
可以看到我已经copy到容器内部了
5、安装延迟队列插件
进入RabbitMQ安装目录的目录下
//进入容器内部
docker exec -it rabbitmq /bin/bash
进入安装目录
cd /opt/rabbitmq/plugins
使用如下命令启用延迟插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
如下我们就安装好了,然后我们重启rabbitmq容器
使用 exit 命令退出容器
使用docker restart rabbitmq 重启容器
我们来rabbitmq的可视化界面查看
这样说明我们的延迟插件就安装好啦!
到此就结束啦!希望可以帮到你,可以帮作者点个关注和小心心嘛!你们的支持就是我最大的动力,以后也会努力更新的哦!
版权归原作者 颜书凌 所有, 如有侵权,请联系我们删除。