引言:
什么是延时任务?
在我们生活中比较常业务见场景的如下:
- 红包 24 小时未被查收,需要延迟执退还业务;
- 订单下单之后 30 分钟后,用户如果没有付钱,系统需要自动取消订单
在如上场景中延时周期为固定时段。
另外还有一种延时周期不固定的业务场景如下:
业务场景:
平台方发布预运行计划,预计划中需限定申报截至时间,
参与方需在截止前申报各自实际运行计划给平台后由平台统筹运行计划安排。
需求分析:
由于是预计划,会存在截至时间不确定的因素。如:在某月1日做了当月4日的预计划,4日计划申报截止时间为3日14:00;但当月2日又新增/更改了当月3日的预计划截至时间,并且每天申报截止时间不是固定时间。因此可能会出现后加的延时任务比之前的延时任务先到期的情况。
延时任务常见方案:
参考:延时队列的几种实现方案比较
其中 RabbitMQ(TTL+DLX)死信队列的设计目的是为了存储没有被正常消费的消息,便于排查和重新投递。
死信队列同样也没有对投递时间做出保证,在第一条消息成为死信之前,后面的消息即使过期也不会投递为死信。
为了解决这个问题,在RabbitMQ3.5.7及以后的版本提供了 rabbitmq-delayed-message-exchange 插件来做延时消息任务。
本文采用RabbitMQ 延时任务插件。
除了上文中提到的几种延时任务方案,另还有RocketMQ 延时队列
rocketmq在发送延时消息时,是先把消息按照延迟时间段发送到指定的队列中(把延时时间段相同的消息放到同⼀个队列中,
保证了消息处理的顺序性,可以让同⼀个队列中消息延时时间是相同的,整个RocketMQ中延时消息时按照递增顺序排序,
保证信息处理的先后顺序性。)。
之后,通过⼀个定时器来轮询处理这些队列⾥的信息,判断是否到期。对于到期的消息会发送到相应的处理队列中,进⾏处理。注意 :⽬前RocketMQ只⽀持特定的延时时间段,1s,5s,10s,…2h,不能⽀持任意时间段的延时设置。
RabbitMQ 延时插件使用:
1. 插件安装:
参考:Docker 安装 RabbitMQ 并安装延迟队列插件
2. Demo相关版本说明:
RabbitMQ 3.10.2
Springboot 2.7.0
JDK 1.8
3. 代码配置
3.1 RabbitMQ配置
3.1.1 yml文件
server:port:9888spring:rabbitmq:host:#rabbitmq多租户概念,为用户创建的虚拟机(数据隔离)virtual-host:port:5672username:password:# none:表示禁用发布确认模式,默认值,使用此模式之后,不管消息有没有发送到Broker都不会触发ConfirmCallback回调。# correlated:表示消息成功到达Broker后触发ConfirmCalllBack回调# simple模式下如果消息成功到达Broker后一样会触发ConfirmCalllBack回调,# 发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,# 根据返回结果来判定下一步的逻辑,注意:waitForConfirmsOrDie方法如果返回false则会关闭channel信道,则接下来无法发送消息到broker。publisher-confirm-type: correlated
listener:simple:acknowledge-mode: manual #开启手动确认机制
3.1.2 RabbitmqConfig
/**
* @author diqiang
* @version v1.0
* @date 2022/6/20 22:22
*/@Configuration@Slf4j@RequiredArgsConstructorpublicclassRabbitmqConfig{privatefinalConnectionFactory connectionFactory;@BeanpublicRabbitTemplaterabbitTemplate(){RabbitTemplate rabbitTemplate =newRabbitTemplate(connectionFactory);//异步确认回调
rabbitTemplate.setConfirmCallback(confirmCallback());return rabbitTemplate;}/**
* 异步确认回调
* @return RabbitTemplate.ConfirmCallback
*/@BeanpublicRabbitTemplate.ConfirmCallbackconfirmCallback(){return(correlationData, b, s)->{if(b){
log.info("{}消息发送给mq成功",correlationData);
log.info("---------------------------------");}else{
log.warn("消息{} 发送给mq失败",s);//todo 失败处理逻辑,写库记录及重试机制}};}}
3.2 队列配置
/**
* @author diqiang
* @version v1.0
* @date 2022/6/27 14:38
*/@Configuration@Slf4j@RequiredArgsConstructorpublicclassDelayedQueueConfig{publicstaticfinalString Q_DELAYED ="q.delayed";publicstaticfinalString EX_DELAYED ="ex.delayed";publicstaticfinalString R_DELAYED ="r.delayed";/**
* 声明自定义交换机
* */@BeanpublicCustomExchangedelayedExchange(){Map<String,Object> args =newHashMap<>(1);
args.put("x-delayed-type","direct");returnnewCustomExchange(EX_DELAYED,"x-delayed-message",true,false,args);}/**
* 声明队列
* */@BeanpublicQueuedelayedQueue(){returnnewQueue(Q_DELAYED);}/**
* 绑定交换机和队列
* */@BeanpublicBindingdelayedQueueBinding(@Qualifier("delayedQueue")Queue delayedQueue,@Qualifier("delayedExchange")CustomExchange delayedExchange){returnBindingBuilder.bind(delayedQueue).to(delayedExchange).with(R_DELAYED).noargs();}}
3.3 消息消费者
/**
* @author diqiang
* @version v1.0
* @date 2022/6/27 16:21
*/@Component@Slf4jpublicclassDelayQueueListener{@RabbitListener(queues =DelayedQueueConfig.Q_DELAYED)@RabbitHandlerpublicvoidonMessage(Message message,Channel channel)throwsIOException{
log.info("延时队列消费信息ID:{}",message.getMessageProperties().getMessageId());
log.info("延时队列消费消息内容:{}",newString(message.getBody()));
log.info("---------------------------------");//todo 做业务处理逻辑
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}
3.4 Controller模拟创建延时任务
/**
* @author diqiang
* @version v1.0
* @date 2022/6/22 19:05
*/@RestController@RequiredArgsConstructor@Slf4j@RequestMapping("msg")publicclassMsgController{privatefinalRabbitTemplate rabbitTemplate;/**
* 创建延时消息
* @param msg 消息内容
* @param delay 延时时间(ms)
* @return String
*/@PostMapping("create-delay")publicStringcreateDelay(String msg,Integer delay){String msgId = UUID.randomUUID().toString();MessageProperties messageProperties =newMessageProperties();
messageProperties.setMessageId(msgId);
messageProperties.setCorrelationId(msgId);
messageProperties.setDelay(delay);Message message =newMessage(msg.getBytes(),messageProperties);CorrelationData correlationData =newCorrelationData();
correlationData.setId(msgId);
log.info("消息ID:{}",msgId);
log.info("内容:{}",msg);
rabbitTemplate.convertAndSend(DelayedQueueConfig.EX_DELAYED,DelayedQueueConfig.R_DELAYED,message,correlationData);return"success";}}
注意: 使用rabbitmq-delayed-message-exchange延迟队列插件设置Mandatory=true,消息无法路由。
会报错:NO_ROUTE,但消息仍会被队列消费。
原因:
延时消息是从磁盘读取消息然后发送(后台任务),发送消息的时候无法保证两点:
1、发送时消息路由的队列还存在
2、发送时原连接仍然支持回调方法
消息写磁盘和从磁盘读取消息发送存在时间差,两个时间点的队列和连接情况可能不同。所以不支持Mandatory设置。
3.5 Demo运行结果:
说明:
通过postman发送了两次请求,分别为
msg=此消息是10s延时&delay=10000
和
msg=此消息是5s延时&delay=5000
打印日志如下:
2022-06-28 17:24:23.582 INFO 15808 --- [ XNIO-1 task-1] c.j.r.controller.MsgController : 消息ID:53cd874e-435d-4c39-a137-170541be03e7
2022-06-28 17:24:23.582 INFO 15808 --- [ XNIO-1 task-1] c.j.r.controller.MsgController : 内容:此消息是10s延时
2022-06-28 17:24:23.598 INFO 15808 --- [nectionFactory2] c.j.rabbitmqdemo.config.RabbitmqConfig : CorrelationData [id=53cd874e-435d-4c39-a137-170541be03e7]消息发送给mq成功
2022-06-28 17:24:23.598 INFO 15808 --- [nectionFactory2] c.j.rabbitmqdemo.config.RabbitmqConfig : ---------------------------------
2022-06-28 17:24:24.594 INFO 15808 --- [ XNIO-1 task-1] c.j.r.controller.MsgController : 消息ID:4b513b66-4117-415a-85ed-780cd0144512
2022-06-28 17:24:24.594 INFO 15808 --- [ XNIO-1 task-1] c.j.r.controller.MsgController : 内容:此消息是5s延时
2022-06-28 17:24:24.607 INFO 15808 --- [nectionFactory2] c.j.rabbitmqdemo.config.RabbitmqConfig : CorrelationData [id=4b513b66-4117-415a-85ed-780cd0144512]消息发送给mq成功
2022-06-28 17:24:24.607 INFO 15808 --- [nectionFactory2] c.j.rabbitmqdemo.config.RabbitmqConfig : ---------------------------------
2022-06-28 17:24:29.608 INFO 15808 --- [ntContainer#1-1] c.j.r.listener.DelayQueueListener : 延时队列消费信息ID:4b513b66-4117-415a-85ed-780cd0144512
2022-06-28 17:24:29.608 INFO 15808 --- [ntContainer#1-1] c.j.r.listener.DelayQueueListener : 延时队列消费消息内容:此消息是5s延时
2022-06-28 17:24:29.608 INFO 15808 --- [ntContainer#1-1] c.j.r.listener.DelayQueueListener : ---------------------------------
2022-06-28 17:24:33.600 INFO 15808 --- [ntContainer#1-1] c.j.r.listener.DelayQueueListener : 延时队列消费信息ID:53cd874e-435d-4c39-a137-170541be03e7
2022-06-28 17:24:33.600 INFO 15808 --- [ntContainer#1-1] c.j.r.listener.DelayQueueListener : 延时队列消费消息内容:此消息是10s延时
2022-06-28 17:24:33.600 INFO 15808 --- [ntContainer#1-1] c.j.r.listener.DelayQueueListener : ---------------------------------
注:如有疑问,欢迎留言指正
版权归原作者 qiang_gege 所有, 如有侵权,请联系我们删除。