0


RabbitMQ:延迟队列

✨ RabbitMQ:延迟队列

📃个人主页:不断前进的皮卡丘
🌞博客描述:梦想也许遥不可及,但重要的是追梦的过程,用博客记录自己的成长,记录自己一步一步向上攀登的印记
🔥个人专栏:消息中间件

1.延迟队列基本介绍

一般队列中的元素总是希望能够早点被取出来进行处理,但是延迟队列中的元素则是希望可以在指定时间内被取出和处理,延迟队列中的元素都是带有时间属性的。延迟队列就是用来存放需要在指定时间被处理的元素的队列
在这里插入图片描述

延迟队列就是想要消息延迟一段时间后被处理,TTL可以让消息在延迟一段时间后变成死信。变成死信的消息都会被投递到死信队列中,这样的话,只要消费者一直消费死信队列里面的消息就可以了,因为里面的消息都是希望被马上处理的消息
生产者生产一条延时消息,根据需要延时时间的不同,通过不同的routing key把消息路由到不同的延迟队列,每一个队列都设置了不同的TTL属性,并且绑定在同一个死信交换机中,消息过期了以后,根据routing key的不同,又会被路由到不同的死信队列中,消费者只需要监听对应的死信队列进行处理就可以了。注意:不要造成重复消费

2.延迟队列使用场景

下面的场景需要使用延迟队列

  1. 订单在十分钟内没有支付就自动取消
  2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
  3. 账单在一周内没有支付,就会自动结算
  4. 用户注册成功以后,如果三天内没有登录就进行短信题提醒
  5. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  6. 预定会议以后,需要提前十分钟通知各个参会人员参加会议。

3.Spring Boot集成RabbitMQ

3.1创建项目,引入依赖

在这里插入图片描述

相关依赖

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies>

3.2application.properties配置文件

# RabbitMQ/配置
#服务器地址
spring.rabbitmq.host=服务器地址
#服务端口号
spring.rabbitmq.port=5672
#虚拟主机名称
spring.rabbitmq.virtual-host=/myhost
#用户名
spring.rabbitmq.username=admin
#密码
spring.rabbitmq.password=123456

3.3 队列TTL-代码结构图

在这里插入图片描述

3.4MQ配置类

packagecom.zyh.config;importorg.springframework.amqp.core.*;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.HashMap;importjava.util.Map;/**
 * @author zengyihong
 * @create 2022--10--04 16:44
 */@ConfigurationpublicclassTtlQueueConfiguration{//普通交换机publicstaticfinalString X_EXCHANGE ="X";//普通队列publicstaticfinalString QUEUE_A ="QA";publicstaticfinalString QUEUE_B ="QB";//死信交换机publicstaticfinalString Y_DEAD_LETTER_EXCHANGE ="Y";//死信队列QDpublicstaticfinalString QUEUE_D ="QD";/**
     * 声明普通交换机X
     *
     * @return
     */@BeanpublicDirectExchangexExchange(){returnnewDirectExchange(X_EXCHANGE);}/**
     * 声明队列QA
     *
     * @return
     */@BeanpublicQueuequeueA(){//创建集合保存队列属性Map<String,Object> map =newHashMap<>();//设置该队列绑定的死信交换机名称
        map.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//设置routing key
        map.put("x-dead-letter-routing-key","YD");//设置队列延迟时间 10秒
        map.put("x-message-ttl",10000);//创建队列returnQueueBuilder.durable(QUEUE_A).withArguments(map).build();}/**
     * 把QA队列和交换机X进行绑定
     *
     * @return
     */@BeanpublicBindingqueueA_BindingX(@Qualifier("queueA")Queue queue,@Qualifier("xExchange")DirectExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with("XA");}/**
     * 声明队列QB
     *
     * @return
     */@BeanpublicQueuequeueB(){//创建集合保存队列属性Map<String,Object> map =newHashMap<>();//设置该队列绑定的死信交换机名称
        map.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//设置routing key
        map.put("x-dead-letter-routing-key","YD");//设置队列延迟时间 10秒
        map.put("x-message-ttl",40000);//创建队列returnQueueBuilder.durable(QUEUE_A).withArguments(map).build();}/**
     * 把QB队列和交换机X进行绑定
     *
     * @return
     */@BeanpublicBindingqueueB_BindingX(@Qualifier("queueB")Queue queue,@Qualifier("xExchange")DirectExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with("XB");}/**
     * 声明死信交换机Y
     *
     * @return
     */@BeanpublicDirectExchangeyExchange(){returnnewDirectExchange(Y_DEAD_LETTER_EXCHANGE);}/**
     * 声明死信队列QD
     *
     * @return
     */@BeanpublicQueuequeueD(){returnnewQueue(QUEUE_D);}/**
     * 把死信交换机和死信队列进行绑定
     * @param queue
     * @param exchange
     * @return
     */@BeanpublicBindingdeadLetterBindingQD(@Qualifier("queueD")Queue queue,@Qualifier("yExchange")DirectExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with("YD");}}

3.5生产者代码

@Slf4j@RestController@RequestMapping("/ttl")publicclassSendMessageController{@ResourceprivateRabbitTemplate rabbitTemplate;/**
     * 生产者发送消息
     * @param message
     */@GetMapping("/sendMessage/{message}")publicvoidsendMessage(@PathVariableString message){//记录日志
        log.info("当前时间:{},发送一条信息给两个TTL队列:{}",newDate(),message);//给QA队列发送消息
        rabbitTemplate.convertSendAndReceive("X","XA","消息来自TTL为10秒的队列:"+message);
        rabbitTemplate.convertSendAndReceive("X","XB","消息来自TTL为40秒的队列:"+message);}}

3.6消费者代码

@Slf4j@ComponentpublicclassDeadLetterQueueConsumer{@RabbitListener(queues =TtlQueueConfiguration.QUEUE_D)publicvoidreceiveQD(Message message,Channel channel){//获取消息String msg=newString(message.getBody());
        log.info("当前时间:{},收到死信队列消息:{}",newDate(),msg);}}

3.7测试

在这里插入图片描述

在这里插入图片描述

启动boot项目,在浏览器输入localhost:8080/ttl/sendMessage/Hello
在这里插入图片描述

但是这种方式有一种缺点,现在我们只有TTL为10s和40s的延迟队列,如果我们需要其他延时时间的队列的话,那么我们又得新增其他队列,这样其实并不方便,我们想要的是能够动态设置TTL,这样就不需要为每个TTL设置新的延迟队列了。

4.延迟队列优化

4.1代码结构图

在这里插入图片描述

4.2配置类

在之前写的代码基础上新增一个配置类

packagecom.zyh.config;importorg.springframework.amqp.core.*;importorg.springframework.beans.factory.annotation.Qualifier;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.HashMap;importjava.util.Map;/**
 * @author zengyihong
 * @create 2022--10--05 10:44
 */@ConfigurationpublicclassMessageTtlQueueConfiguration{//死信交换机publicstaticfinalString Y_DEAD_LETTER_EXCHANGE ="Y";//普通队列publicstaticfinalString QUEUE_C ="QC";/**
     * 声明QC队列
     * @return
     */@BeanpublicQueuequeueC(){//创建集合保存队列属性Map<String,Object> map =newHashMap<>();//设置该队列绑定的死信交换机名称
        map.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);//设置routing key
        map.put("x-dead-letter-routing-key","YD");//设置队列延迟时间 10秒
        map.put("x-message-ttl",10000);returnQueueBuilder.durable(QUEUE_C).withArguments(map).build();}/**
     * 把QC队列和正常交换机X进行绑定
     *
     * @return
     */@BeanpublicBindingqueueC_BindingX(@Qualifier("queueC")Queue queue,@Qualifier("xExchange")DirectExchange exchange){returnBindingBuilder.bind(queue).to(exchange).with("XC");}}

4.3生产者

packagecom.zyh.controller;importlombok.Data;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.AmqpException;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.core.MessagePostProcessor;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.GetMapping;importorg.springframework.web.bind.annotation.PathVariable;importorg.springframework.web.bind.annotation.RequestMapping;importorg.springframework.web.bind.annotation.RestController;importjavax.annotation.Resource;importjavax.annotation.Resources;importjava.util.Date;/**
 * @author zengyihong
 * @create 2022--10--04 19:36
 */@Slf4j@RestController@RequestMapping("/ttl")publicclassSendMessageController{@ResourceprivateRabbitTemplate rabbitTemplate;/**
     * 生产者发送消息
     *
     * @param message
     */@GetMapping("/sendMessage/{message}")publicvoidsendMessage(@PathVariableString message){//记录日志
        log.info("当前时间:{},发送一条信息给两个TTL队列:{}",newDate(), message);//给QA队列发送消息
        rabbitTemplate.convertSendAndReceive("X","XA","消息来自TTL为10秒的队列:"+ message);
        rabbitTemplate.convertSendAndReceive("X","XB","消息来自TTL为40秒的队列:"+ message);}/**
     * 生产者发送消息(动态设置有效期)
     *
     * @param message
     */@GetMapping("/sendMessage/{message}/{ttlTime}")publicvoidsendMessage(@PathVariableString message,@PathVariableString ttlTime){MessagePostProcessor messagePostProcessor =newMessagePostProcessor(){@OverridepublicMessagepostProcessMessage(Message message)throwsAmqpException{//设置消息有效期
                message.getMessageProperties().setExpiration(ttlTime);return message;}};//记录日志
        log.info("当前时间:{},发送一条时长{}毫秒信息给队列QC:{}",newDate(),ttlTime, message);//给QC队列发送消息

        rabbitTemplate.convertAndSend("X","XC", message, messagePostProcessor);}}

4.4消费者

@Slf4j@ComponentpublicclassDeadLetterQueueConsumer{@RabbitListener(queues =TtlQueueConfiguration.QUEUE_D)publicvoidreceiveQD(Message message,Channel channel){//获取消息String msg=newString(message.getBody());
        log.info("当前时间:{},收到死信队列消息:{}",newDate(),msg);}}

4.5测试

启动boot项目
image.png
在浏览器输入
http://localhost:8080/ttl/sendMessage/Hello/20000
http://localhost:8080/ttl/sendMessage/你好/2000
image.png

如果在消息属性上设置TTL的方式,那么消息可能不会按时死亡,因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行


本文转载自: https://blog.csdn.net/qq_52797170/article/details/127361706
版权归原作者 不断前进的皮卡丘 所有, 如有侵权,请联系我们删除。

“RabbitMQ:延迟队列”的评论:

还没有评论