标题基于RabbitMQ实现定时任务
1.首先确保项目安装了 rabbitMQ 的相关依赖,打开 pom.xml 文件添加依赖
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
2.application.properties 文件配置
#rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
3.创建 RabbitMQConfig 类
这里使用的是 rabbitmq 的延时消息插件所以在声明交换机的时候需将其 type 声明为 “x-delayed-message“
@ConfigurationpublicclassRabbitMQConfig{// 声明 1 个交换机 1个路由key 1个队列//延迟交换机publicstaticfinalStringDELAY_EXCHANGE_NAME="delay.exchange";//延迟队列publicstaticfinalStringDELAY_QUEUE_NAME="delay.queue";//路由keypublicstaticfinalStringDELAY_QUEUE_ROUTING_KEY="delay.queue.routing.key";//声明延迟队列@Bean("delayQueue")publicQueuedelayQueue(){returnnewQueue(DELAY_QUEUE_NAME);}//声明延迟交换机@Bean("delayExchange")publicCustomExchangedelayExchange(){Map<String,Object> args =newHashMap<>();
args.put("x-delayed-type","direct");returnnewCustomExchange(DELAY_EXCHANGE_NAME,"x-delayed-message",true,false,args);}//声明延迟队列的绑定关系@BeanpublicBindingdelayBinding(@Qualifier("delayQueue")Queue queue,@Qualifier("delayExchange")CustomExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUE_ROUTING_KEY).noargs();}}
4.创建消息的发布者 DelayMessageProducer
@ComponentpublicclassDelayMessageProducer{@ResourceprotectedRabbitTemplate rabbitTemplate;/**
*
* @param message 消息体
* @param delayTime 延时时间
*/publicvoidsend(String message,long delayTime){
rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME,DELAY_QUEUE_ROUTING_KEY,message,msg ->{// msg.getMessageProperties().setDelay(delayTime);
msg.getMessageProperties().getHeaders().put("x-delay",delayTime);return msg;});}}
5.创建消息的消费者 DelayMessageConsumer
@Component@Slf4jpublicclassDelayMessageConsumer{//监听 DELAY_QUEUE_NAME 队列@RabbitListener(queues =DELAY_QUEUE_NAME)publicvoidreceive(Message message){// 获取消息String msg =newString(message.getBody());// 记录日志
log.info("延迟队列收到消息:{}", msg);// 具体的消费逻辑}
6.由于 rabbitmq 的延时时间有上限,超过则立刻消费,如何解决?
1.创建消息时判断传入的延时时间是否大于延时时间最大值,大于则在消息体中传入个 false 参数,小于等于则传入 true 参数,示例代码:
@Component@Slf4jpublicclassPublishMessage{protectedstaticfinalintMAX_DELAY_SEGMENT=Integer.MAX_VALUE;@ResourceprivateDelayMessageProducer producer;publicbooleanpullMessage(String id,Date startAt,Date endAt){Date now =newDate();boolean flat =false;//判断执行时间是否在当前时间之后,调用接口传入执行时间,消息体if(startAt !=null&& startAt.compareTo(now)>0){long delayTime = startAt.getTime()- now.getTime();if(delayTime >MAX_DELAY_SEGMENT){
log.info("开始延时时间超出,进行分段发布");
producer.send(id +","+ startAt +",false",MAX_DELAY_SEGMENT);}else{
producer.send(id +","+ startAt +",true", delayTime);}
flat =true;}}
2.消费者拿出数据时,判断该参数是 true 还是 false,如果是 true 则直接执行消费逻辑,如果为 false 则需判断执行时间减去当前时间是否还大于延时时间最大值,如果还大于延时最大时间,继续传参 false ,等待下次消费,示例代码:
@RabbitListener(queues =DELAY_QUEUE_NAME)publicvoidreceive(Message message){// 获取消息String msg =newString(message.getBody());// 记录日志
log.info("延迟队列收到消息:{}", msg);//获取消息id 和 执行时间 ,参数String[] split = msg.split(",");String id = split[0];String time = split[1];DateTime dateTime =DateUtil.parseCST(time);Boolean flat =Boolean.valueOf(split[2]);if(flat){// 消费 notice 消息consumeNotice(id,dateTime);}else{long delayTime = dateTime.getTime()-System.currentTimeMillis();// 如果还大于延时最大时间,继续传参 false ,等待下次消费if(delayTime >MAX_DELAY_SEGMENT){
producer.send(id +","+ delayTime +",false",MAX_DELAY_SEGMENT);}else{// log.info("消息:{}",id + "," + time + ",true");
producer.send(id +","+ time +",true", delayTime);}}}
版权归原作者 qq_53639759 所有, 如有侵权,请联系我们删除。