0


【技术分享】四、RabbitMQ “延时队列”

前言

延时的含义为 等待一段时间,应用到RabbitMQ 消息 发布/订阅 模型中的概念就是,拿到消息后不想立即消费,等待一段时间再执行。
ex:

  1. 定时任务:十分钟后执行某种操作。
  2. 批量发送短信:用户量过大,一次性发送短信卡死,可以将几万条消息分布在10分钟内随机发送完成。

一、RabbitMQ “延时队列”概念

延迟队列存储的对象是对应的延迟消息,所谓“延迟消息”是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。

RabbitMQ 中并没有延时队列的概念,是通过 延时交换机与 死信队列实现。

二、实现RabbitMQ “延时队列”两种方式

1. 利用两个特性:TTL + DLX [A队列过期->转发给B队列] (此种方式有缺陷)

TTL,全称Time To Live,消息过期时间设置。若没有队列进行消息消费,此消息过期后将被丢弃。
RabbitMq只会检查第一个消息是否过期,如果过期则丢到死信队列。
ex:若有两条消息,第一个消息延迟20秒执行,第二个消息延迟10秒执行,但RabbitMq只会检测队首第一条消息的过期时间。这样就会造成第二条消息延迟30秒执行。

DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。

2. 采用 RabbitMq延时插件rabbitmq_delayed_message_exchange的方式。

为了解决 “队列阻塞”问题,新的插件发布了,再消息粒度上实现 消息过期控制。

三、RabbitMQ “延时队列”项目应用

1. 引入pom文件,并配置yml

<dependencies><!--RabbitMQ 依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- web相关依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--  json相关依赖--><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency>

2. 下载插件

插件下载官方链接: rabbitmq_delayed_message_exchange
在这里插入图片描述

安装指南(以linux环境为例)
  1. 将rabbitmq_delayed_message_exchange-3.9.0.ez上传指定目录下使用unzip解压即可 安装目录: /rabbitmq/plugins在这里插入图片描述
  2. 完成第一步解压后,执行以下图中安装操作 开启插件:
    export PATH=$PATH:/opt/middle/rabbitmq/erlang/bin
    cd /opt/middle/rabbitmq/sbin
    ./rabbitmq-pluginsenablerabbitmq_delayed_message_exchange
  1. 查询安装状态 ./rabbitmq-plugins list
./rabbitmq-plugins list

3. 配置资源信息

importlombok.RequiredArgsConstructor;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.CustomExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.rabbit.connection.CachingConnectionFactory;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.amqp.support.converter.Jackson2JsonMessageConverter;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.beans.factory.config.ConfigurableBeanFactory;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importorg.springframework.context.annotation.Scope;importjava.util.HashMap;importjava.util.Map;/**
 * @author liuzz
 * @date 2023/5/18 0018下午 4:09
 */@Configuration("relevancyRabbitMqConfig")@RequiredArgsConstructor(onConstructor =@__(@Autowired))publicclassRelevancyRabbitMqConfig{privatefinalCachingConnectionFactory factory;/**
     * rabbitmq 下发计划延时队列
     **/publicstaticfinalString RELEVANCY_DELAYED_EXCHANGE ="saas.cbs.relevancy.delayed.exchange";/**
     * rabbitmq 下发延时队列订阅路由key
     **/publicstaticfinalString RELEVANCY_DELAYED_ROUTINGKEY ="saas.cbs.relevancy.delayed.routingkey";/**
     * rabbitmq 下发延时队列
     **/publicstaticfinalString RELEVANCY_DELAYED_QUEUE ="saas.cbs.relevancy.delayed.queue";@Bean("relevancyRabbitTemplate")@Scope(value =ConfigurableBeanFactory.SCOPE_PROTOTYPE)publicRabbitTemplaterabbitTemplate(){
        factory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);RabbitTemplate rabbitTemplate =newRabbitTemplate(factory);//开启发送失败退回
        rabbitTemplate.setMandatory(true);//消息转换器
        rabbitTemplate.setMessageConverter(newJackson2JsonMessageConverter());return rabbitTemplate;}//插件版本 -- 实现延迟队列@Bean("relevancyDelayedQueue")publicQueuerelevancyDelayedQueue(){returnnewQueue(RelevancyDelayedConstant.RELEVANCY_DELAYED_QUEUE);}//定义延时交换机 -- 插件版本//指定交换器类型为 x-delayed-message //设置属性 x-delayed-type @Bean("relevancyDelayedExchange")publicCustomExchangerelevancyDelayedExchange(){Map<String,Object> args =newHashMap<>();
        args.put("x-delayed-type","direct");returnnewCustomExchange(RelevancyDelayedConstant.RELEVANCY_DELAYED_EXCHANGE,"x-delayed-message",true,false, args);}/**
    * 绑定延时队列与交换机信息
    */@BeanpublicBindingbindingNotify(@Qualifier("relevancyDelayedQueue")Queue relevancyDelayedQueue,@Qualifier("relevancyDelayedExchange")CustomExchange relevancyDelayedExchange){returnBindingBuilder.bind(relevancyDelayedQueue).to(relevancyDelayedExchange).with(RelevancyDelayedConstant.RELEVANCY_DELAYED_ROUTINGKEY).noargs();}}

4. 发送消息至交换机

@Slf4j@ComponentpublicclassRelevancyExecuteMqConsume{@Autowired@Qualifier("relevancyRabbitTemplate")RabbitTemplate rabbitTemplate;/**
     * @Desc: 发送下发计划过期MQ
     * @param relevancyFrsMqSendMsgBo
     * @param finalExpirationTime
     **/publicvoidsendSnapshotPlanMsg(RelevancyFrsMqSendMsgBo relevancyFrsMqSendMsgBo,Integer finalExpirationTime){MessagePostProcessor messagePostProcessor =newMessagePostProcessor(){@OverridepublicMessagepostProcessMessage(Message message)throwsAmqpException{//1.设置message的信息
                message.getMessageProperties().setDelay(finalExpirationTime);//消息的过期时间//2.返回该消息return message;}};
        rabbitTemplate.convertAndSend(RelevancyDelayedConstant.RELEVANCY_DELAYED_EXCHANGE,RelevancyDelayedConstant.RELEVANCY_DELAYED_ROUTINGKEY,relevancyFrsMqSendMsgBo,messagePostProcessor);}}

5. 死信队列消费

@Slf4j@ComponentpublicclassRelevancyExecuteMqConsume{@Autowired@Qualifier("relevancyRabbitTemplate")RabbitTemplate rabbitTemplate;@RabbitListener(bindings ={@QueueBinding(value =@Queue(RelevancyDelayedConstant.RELEVANCY_DELAYED_QUEUE),
                    exchange =@Exchange(name =RelevancyDelayedConstant.RELEVANCY_DELAYED_EXCHANGE,type ="x-delayed-message"),
                    key=RelevancyDelayedConstant.RELEVANCY_DELAYED_ROUTINGKEY)})publicvoidprocess(Message message,Channel channel){//消费数据}}
标签: rabbitmq 分布式 java

本文转载自: https://blog.csdn.net/qq_35511685/article/details/131522162
版权归原作者 我是程序猿boxing 所有, 如有侵权,请联系我们删除。

“【技术分享】四、RabbitMQ “延时队列””的评论:

还没有评论