前言
延时的含义为 等待一段时间,应用到RabbitMQ 消息 发布/订阅 模型中的概念就是,拿到消息后不想立即消费,等待一段时间再执行。
ex:
- 定时任务:十分钟后执行某种操作。
- 批量发送短信:用户量过大,一次性发送短信卡死,可以将几万条消息分布在10分钟内随机发送完成。
一、RabbitMQ “延时队列”概念
延迟队列存储的对象是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
RabbitMQ 中并没有延时队列的概念,是通过 延时交换机与 死信队列实现。
二、实现RabbitMQ “延时队列”两种方式
1. 利用两个特性:TTL + DLX [A队列过期->转发给B队列] (此种方式有缺陷)
TTL,全称Time To Live,消息过期时间设置。若没有队列进行消息消费,此消息过期后将被丢弃。
但RabbitMq只会检查第一个消息是否过期,如果过期则丢到死信队列。
ex:若有两条消息,第一个消息延迟20秒执行,第二个消息延迟10秒执行,但RabbitMq只会检测队首第一条消息的过期时间。这样就会造成第二条消息延迟30秒执行。
DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
2. 采用 RabbitMq延时插件rabbitmq_delayed_message_exchange的方式。
为了解决 “队列阻塞”问题,新的插件发布了,再消息粒度上实现 消息过期控制。
三、RabbitMQ “延时队列”项目应用
1. 引入pom文件,并配置yml
<dependencies><!--RabbitMQ 依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- web相关依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!-- json相关依赖--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency>
2. 下载插件
插件下载官方链接: rabbitmq_delayed_message_exchange
安装指南(以linux环境为例)
- 将rabbitmq_delayed_message_exchange-3.9.0.ez上传指定目录下使用unzip解压即可 安装目录: /rabbitmq/plugins
- 完成第一步解压后,执行以下图中安装操作 开启插件:
export PATH=$PATH:/opt/middle/rabbitmq/erlang/bin
cd /opt/middle/rabbitmq/sbin
./rabbitmq-pluginsenablerabbitmq_delayed_message_exchange
- 查询安装状态 ./rabbitmq-plugins list
./rabbitmq-plugins list
3. 配置资源信息
importlombok.RequiredArgsConstructor;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.CustomExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.amqp.support.converter.Jackson2JsonMessageConverter;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.beans.factory.config.ConfigurableBeanFactory;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.context.annotation.Scope;importjava.util.HashMap;importjava.util.Map;/**
* @author liuzz
* @date 2023/5/18 0018下午 4:09
*/@Configuration("relevancyRabbitMqConfig")@RequiredArgsConstructor(onConstructor =@__(@Autowired))publicclassRelevancyRabbitMqConfig{privatefinalCachingConnectionFactory factory;/**
* rabbitmq 下发计划延时队列
**/publicstaticfinalString RELEVANCY_DELAYED_EXCHANGE ="saas.cbs.relevancy.delayed.exchange";/**
* rabbitmq 下发延时队列订阅路由key
**/publicstaticfinalString RELEVANCY_DELAYED_ROUTINGKEY ="saas.cbs.relevancy.delayed.routingkey";/**
* rabbitmq 下发延时队列
**/publicstaticfinalString RELEVANCY_DELAYED_QUEUE ="saas.cbs.relevancy.delayed.queue";@Bean("relevancyRabbitTemplate")@Scope(value =ConfigurableBeanFactory.SCOPE_PROTOTYPE)publicRabbitTemplaterabbitTemplate(){
factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);RabbitTemplate rabbitTemplate =newRabbitTemplate(factory);//开启发送失败退回
rabbitTemplate.setMandatory(true);//消息转换器
rabbitTemplate.setMessageConverter(newJackson2JsonMessageConverter());return rabbitTemplate;}//插件版本 -- 实现延迟队列@Bean("relevancyDelayedQueue")publicQueuerelevancyDelayedQueue(){returnnewQueue(RelevancyDelayedConstant.RELEVANCY_DELAYED_QUEUE);}//定义延时交换机 -- 插件版本//指定交换器类型为 x-delayed-message //设置属性 x-delayed-type @Bean("relevancyDelayedExchange")publicCustomExchangerelevancyDelayedExchange(){Map<String,Object> args =newHashMap<>();
args.put("x-delayed-type","direct");returnnewCustomExchange(RelevancyDelayedConstant.RELEVANCY_DELAYED_EXCHANGE,"x-delayed-message",true,false, args);}/**
* 绑定延时队列与交换机信息
*/@BeanpublicBindingbindingNotify(@Qualifier("relevancyDelayedQueue")Queue relevancyDelayedQueue,@Qualifier("relevancyDelayedExchange")CustomExchange relevancyDelayedExchange){returnBindingBuilder.bind(relevancyDelayedQueue).to(relevancyDelayedExchange).with(RelevancyDelayedConstant.RELEVANCY_DELAYED_ROUTINGKEY).noargs();}}
4. 发送消息至交换机
@Slf4j@ComponentpublicclassRelevancyExecuteMqConsume{@Autowired@Qualifier("relevancyRabbitTemplate")RabbitTemplate rabbitTemplate;/**
* @Desc: 发送下发计划过期MQ
* @param relevancyFrsMqSendMsgBo
* @param finalExpirationTime
**/publicvoidsendSnapshotPlanMsg(RelevancyFrsMqSendMsgBo relevancyFrsMqSendMsgBo,Integer finalExpirationTime){MessagePostProcessor messagePostProcessor =newMessagePostProcessor(){@OverridepublicMessagepostProcessMessage(Message message)throwsAmqpException{//1.设置message的信息
message.getMessageProperties().setDelay(finalExpirationTime);//消息的过期时间//2.返回该消息return message;}};
rabbitTemplate.convertAndSend(RelevancyDelayedConstant.RELEVANCY_DELAYED_EXCHANGE,RelevancyDelayedConstant.RELEVANCY_DELAYED_ROUTINGKEY,relevancyFrsMqSendMsgBo,messagePostProcessor);}}
5. 死信队列消费
@Slf4j@ComponentpublicclassRelevancyExecuteMqConsume{@Autowired@Qualifier("relevancyRabbitTemplate")RabbitTemplate rabbitTemplate;@RabbitListener(bindings ={@QueueBinding(value =@Queue(RelevancyDelayedConstant.RELEVANCY_DELAYED_QUEUE),
exchange =@Exchange(name =RelevancyDelayedConstant.RELEVANCY_DELAYED_EXCHANGE,type ="x-delayed-message"),
key=RelevancyDelayedConstant.RELEVANCY_DELAYED_ROUTINGKEY)})publicvoidprocess(Message message,Channel channel){//消费数据}}
版权归原作者 我是程序猿boxing 所有, 如有侵权,请联系我们删除。