0


RabbitMQ 延时消息实现

延时消息实现的两种方式

  1. 设置队列过期时间:延迟队列消息过期 + 死信队列,所有消息过期时间一致
  2. 设置消息的过期时间:此种方式下有缺陷,MQ只会判断队列第一条消息是否过期,会导致消息的阻塞 需要额外安装 rabbitmq_delayed_message_exchange 插件才能解决此问题

前置条件:导入Spring 集成RabbitMQ MAEVN,并书写MQ相关配置

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.2.5.RELEASE</version></dependency>

2. 设置队列过期时间:延迟队列消息过期 + 死信队列

推送消息至延迟队列 -> 消息过期自动推送到死信队列 -> 消费死信队列

2.1. MQ配置信息

2.1.1. 自定义队列配置

…/bootstrap.yml

# rabbitmq自定义配置rabbitmq:ttlExchange: medical_dev_ttl_topic_change
  ttlKey: dev_ttl
  ttlQueue: medical.dev.ttl.topic.queue
  delayExpireTime:600ttlQueueSize:10000deadExchange: medical_dev_dead_topic_change
  deadKey: dev_dead
  deadQueue: medical.dev.dead.topic.queue
2.1.2. 读取自定义MQ配置信息
/**
 * amqp配置文件
 */@Data@Component@ConfigurationProperties(prefix ="rabbitmq")publicclassMyConfigProperties{/**
     * 延迟队列
     */publicString ttlExchange;publicString ttlKey;publicString ttlQueue;privateInteger delayExpireTime;publicInteger ttlQueueSize;/**
     * 死信队列
     */publicString deadExchange;publicString deadKey;publicString deadQueue;}

2.2. 配置文件自动生成队列

2.2.1. 延迟队列
importcom.awsa.site.mq.MyConfigProperties;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.core.TopicExchange;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjavax.annotation.Resource;importjava.util.HashMap;/**
 * 延迟队列配置文件
 * 
 * @author mingAn.xie
 */@ConfigurationpublicclassRabbitMQConfigTTL{@ResourceMyConfigProperties myConfigProperties;// 1: 声明交换机@BeanpublicTopicExchangettlTopicExchange(){returnnewTopicExchange(myConfigProperties.getTtlExchange());}// 2: 声明队列// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。@BeanpublicQueuettlTopicduanxinQueue(){HashMap<String,Object> args =newHashMap<>();// 给队列设置消息过期时间:毫秒值
        args.put("x-message-ttl", mqConfigProperties.getDelayExpireTime()*1000);// 设置队列最大长度
        args.put("x-max-length", myConfigProperties.getTtlQueueSize());// 设置死信队列交换机名称// 当消息在一个队列中变成死信后,它能就发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列被称之为死信队列// 编程死信队列的原因:消息被拒绝,消息过期,队列达到最大长度
        args.put("x-dead-letter-exchange", myConfigProperties.getDeadExchange());// 设置死信队列路由key
        args.put("x-dead-letter-routing-key", myConfigProperties.getDeadKey());returnnewQueue(myConfigProperties.getTtlQueue(),true,false,false, args);}// 3: 绑定对用关系@BeanpublicBindingttlTopicsmsBinding(){returnBindingBuilder.bind(ttlTopicduanxinQueue()).to(ttlTopicExchange()).with(myConfigProperties.getTtlKey());}}
2.2.2. 死信队列
importcom.awsa.site.mq.MyConfigProperties;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.core.TopicExchange;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjavax.annotation.Resource;/**
 * 死信队列配置文件
 * 
 * @author mingAn.xie
 */@ConfigurationpublicclassRabbitMQConfigDead{@ResourceMyConfigProperties myConfigProperties;// 1: 声明交换机@BeanpublicTopicExchangedeadTopicExchange(){returnnewTopicExchange(myConfigProperties.getDeadExchange());}// 2: 声明队列// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。@BeanpublicQueuedeadTopicduanxinQueue(){returnnewQueue(myConfigProperties.getDeadQueue(),true);}// 3: 绑定对用关系@BeanpublicBindingdeadTopicsmsBinding(){returnBindingBuilder.bind(deadTopicduanxinQueue()).to(deadTopicExchange()).with(myConfigProperties.getDeadKey());}}

2.3. 生产者推送消息

importcom.awsa.site.mq.MyConfigProperties;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.stereotype.Component;importjavax.annotation.Resource;/**
 * RabbitMQ生产者推送消息类
 * 
 * @author xiemingan
 */@Component@Slf4jpublicclassRabbitmqProducer{@ResourceprivateRabbitTemplate rabbitTemplate;@ResourceprivateMyConfigProperties myConfigProperties;/**
     * @param pushMessage 推送消息体
     */publicvoidpushTtlMessage(String pushMessage){// 推送消息至交换机,并指定路由key
        rabbitTemplate.convertAndSend(myConfigProperties.getTtlExchange(), myConfigProperties.getTtlKey(), pushMessage);
        log.info("MQ消息推送队列, exchange: {}, key: {}, message: {}", myConfigProperties.getTtlExchange(), myConfigProperties.getTtlKey(), pushMessage);}}

2.4. 消费者处理消息

importlombok.extern.log4j.Log4j2;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importorg.springframework.util.StringUtils;/**
 * @author mingAn.xie
 */@Log4j2@ComponentpublicclassRabbitmqConsumer{/**
     * 消费死信队列
     * @param message 消息体
     */@RabbitListener(queues ="${rabbitmq.deadQueue}")publicvoidpushMessages(Message message){String body =newString(message.getBody()).trim();if(StringUtils.isEmpty(body)){return;}
        log.info("MQ消息消费, RabbitmqConsumer.pushMessages() : {}", body);}}

3. 设置消息的过期时间

设置交换机类型为

x-delayed-type

,推送消息至交换机,直连队列消费

3.1. 安装插件

rabbitmq_delayed_message_exchange

前言:这里默认使用环境为

Liunx 

系统

Docker 

安装

RabbitMQ

具体可以参考这篇文章:Docker 安装 RabbitMQ 挂载配置文件

安装插件版本需要与RabbitMQ版本一致,否则可能会导致安装失败,可先进入RabbitMQ容器中查看其他插件版本

插件各版本地址: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

  • 这里以最新版本 v3.13.0 举例
# 下载插件wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez

# 将插件复制进容器中: rabbitmq_xxxxxxdockercp rabbitmq_delayed_message_exchange-3.13.0.ez rabbitmq_xxxxxx:/plugins

# 进入容器: rabbitmq_xxxxxxdockerexec-it rabbitmq_xxxxxx bashcd plugins

# 查询插件列表, 此处可看到插件的版本
rabbitmq-plugins list

# 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 交换机类型中出现 x-delayed-type 表示安装成功

在这里插入图片描述

3.2. MQ配置信息

3.2.1. 自定义队列配置

…/bootstrap.yml

#mq队列自定义配置rabbitmq:saveTaskTtlExchange: ey240001_pro_save_task_ttl_topic_exchange
  saveTaskTtlKey: ey240001_pro_save_task_ttl
  saveTaskTtlQueue: ey240001.pro.save.task.ttl.topic.queue
  saveTaskTtlQueueSize:10000
3.2.2. 读取自定义MQ配置信息
/**
 * amqp配置文件
 *
 * @author mingAn.xie
 */@Data@Component@ConfigurationProperties(prefix ="rabbitmq")publicclassMyConfigProperties{/**
     * 任务待办生成延时队列
     */publicString saveTaskTtlExchange;publicString saveTaskTtlKey;publicString saveTaskTtlQueue;publicInteger saveTaskTtlQueueSize;}

3.3. 配置文件生成

x-delayed-type

交换机

importcom.awsa.site.mq.MyConfigProperties;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjavax.annotation.Resource;importjava.util.HashMap;importjava.util.Map;/**
 * x-delayed-type 交换机延迟队列配置
 * 
 * @author mingAn.xie
 */@ConfigurationpublicclassRabbitMQConfigSaveTaskTtl{@ResourceMyConfigProperties myConfigProperties;// 1: 声明交换机@BeanpublicCustomExchangesaveTaskTopicExchange(){Map<String,Object> args =newHashMap<>();// 设置延迟队列插件类型:按过期时间消费
        args.put("x-delayed-type","direct");// 参数:name 交换机名称,type 交换机类型,durable 是否持久化,autoDelete 是否自动删除,arguments 参数returnnewCustomExchange(myConfigProperties.getSaveTaskTtlExchange(),"x-delayed-message",true,false, args);}// 2: 声明队列// durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。@BeanpublicQueuesaveTaskTopicduanxinQueue(){returnnewQueue(myConfigProperties.getSaveTaskTtlQueue(),true,false,false);}// 3: 绑定对用关系@BeanpublicBindingsaveTaskTopicsmsBinding(){returnBindingBuilder.bind(saveTaskTopicduanxinQueue()).to(saveTaskTopicExchange()).with(myConfigProperties.getSaveTaskTtlKey()).noargs();}}

3.4. 生产者推送消息

importcom.awsa.site.mq.MyConfigProperties;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.MessagePostProcessor;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.stereotype.Component;importjavax.annotation.Resource;/**
 * 生产者推送消息类
 * 
 * @author xiemingan
 */@Component@Slf4jpublicclassRabbitmqProducer{@ResourceprivateRabbitTemplate rabbitTemplate;@ResourceprivateMyConfigProperties myConfigProperties;/**
     * @param pushMessage 推送消息体
     * @param ttlTime     延时时间(毫秒值)
     */publicvoidpushTtlMessage(String pushMessage,long ttlTime){
        ttlTime = ttlTime <=0?1000: ttlTime;// 3.1.推送MQ延迟消息队列long finalTtlTime = ttlTime;MessagePostProcessor messagePostProcessor = message ->{// 设置延迟时间
            message.getMessageProperties().setDelay((int) finalTtlTime);return message;};
        rabbitTemplate.convertAndSend(myConfigProperties.getSaveTaskTtlExchange(), myConfigProperties.getSaveTaskTtlKey(), pushMessage, messagePostProcessor);
        log.info("MQ消息推送队列, exchange: {}, key: {}, message: {}, ttlTime: {}", myConfigProperties.getSaveTaskTtlExchange(), myConfigProperties.getSaveTaskTtlKey(), pushMessage, ttlTime);}}

3.5. 消费者处理消息

importlombok.extern.log4j.Log4j2;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importorg.springframework.util.StringUtils;/**
 * @author mingAn.xie
 */@Log4j2@ComponentpublicclassRabbitmqConsumer{/**
     * 消费延时消息
     * @param message 消息体
     */@RabbitListener(queues ="${rabbitmq.saveTaskTtlQueue}")publicvoidpushMessages(Message message){String body =newString(message.getBody()).trim();if(StringUtils.isEmpty(body)){return;}
        log.info("MQ延迟消息消费, RabbitmqConsumer.pushMessages() : {}", body);}}
标签: rabbitmq java

本文转载自: https://blog.csdn.net/weixin_46759279/article/details/137142279
版权归原作者 Pole丶逐 所有, 如有侵权,请联系我们删除。

“RabbitMQ 延时消息实现”的评论:

还没有评论