延迟队列与SpringBoot实战
概念
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列
TTL介绍
TTL 是什么呢?TTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置TTL 属性的队列,那么这条消息如果在TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的TTL 和消息的TTL,那么较小的那个值将会被使用,有两种方式设置 TTL。
设置TTL
- 消息设置TTL
rabbitTemplate.convertAndSend("X","XC", message +"ttl:"+ ttl, msg ->{ msg.getMessageProperties().setExpiration(ttl);return msg;});
- 队列设置TTL
args.put("x-message-ttl",15000);QueueBuilder.durable(QUEUE_B).withArguments(args).build();
- 如果不设置 TTL,表示消息永远不会过期,如果将 TTL 设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃
代码实战
配置POM
<dependencies><!--RabbitMQ 依赖--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.47</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--swagger--><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger-ui</artifactId><version>2.9.2</version></dependency><!--RabbitMQ 测试依赖--><dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency></dependencies>
配置application
spring.rabbitmq.host=192.168.31.232
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
配置Swagger
packagecom.vmware.config;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importspringfox.documentation.builders.ApiInfoBuilder;importspringfox.documentation.service.ApiInfo;importspringfox.documentation.service.Contact;importspringfox.documentation.spi.DocumentationType;importspringfox.documentation.spring.web.plugins.Docket;importspringfox.documentation.swagger2.annotations.EnableSwagger2;@Configuration@EnableSwagger2publicclassSwaggerConfig{@BeanpublicDocketwebApiConfig(){returnnewDocket(DocumentationType.SWAGGER_2).groupName("webApi").apiInfo(webApiInfo()).select().build();}privateApiInfowebApiInfo(){returnnewApiInfoBuilder().title("rabbitmq 接口文档").description("本文档描述了 rabbitmq 微服务接口定义").version("1.0").contact(newContact("name","url","email")).build();}}
代码架构图
创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交换机 Y,它们的类型都是direct,创建一个死信队列 QD,它们的绑定关系如下
RabbitMQ配置类
packagecom.vmware.config;importorg.springframework.amqp.core.*;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.HashMap;importjava.util.Map;@ConfigurationpublicclassRabbitConfig{//普通交换机privatestaticfinalString X_EXCHANGE ="X";//死信交换机privatestaticfinalString Y_EXCHANGE ="Y";//普通队列AprivatestaticfinalString QUEUE_A ="QA";//普通队列BprivatestaticfinalString QUEUE_B ="QB";//普通队列CprivatestaticfinalString QUEUE_C ="QC";//死信队列DprivatestaticfinalString QUEUE_D ="QD";@Bean("xExchange")publicDirectExchangexExchange(){returnnewDirectExchange(X_EXCHANGE);}@Bean("yExchange")publicDirectExchangeyExchange(){returnnewDirectExchange(Y_EXCHANGE);}@Bean("queueA")publicQueuequeueA(){Map<String,Object> args =newHashMap<>();//设置死信交换机
args.put("x-dead-letter-exchange", Y_EXCHANGE);//设置死信Routing Key
args.put("x-dead-letter-routing-key","YD");//设置超时
args.put("x-message-ttl",10000);//构建队列returnQueueBuilder.durable(QUEUE_A).withArguments(args).build();}@Bean("queueB")publicQueuequeueB(){Map<String,Object> args =newHashMap<>();//设置死信交换机
args.put("x-dead-letter-exchange", Y_EXCHANGE);//设置死心Routing Key
args.put("x-dead-letter-routing-key","YD");//设置超时ttl
args.put("x-message-ttl",15000);//构建队列returnQueueBuilder.durable(QUEUE_B).withArguments(args).build();}@Bean("queueC")publicQueuequeueC(){Map<String,Object> args=newHashMap<>();//设置死信交换机
args.put("x-dead-letter-exchange", Y_EXCHANGE);//设置死信Routing Key
args.put("x-dead-letter-routing-key","YD");//构建队列returnQueueBuilder.durable(QUEUE_C).withArguments(args).build();}@Bean("queueD")publicQueuequeueD(){//构建死信队列DreturnQueueBuilder.durable(QUEUE_D).build();}//绑定普通交换机和队列A@BeanpublicBindingqueueABindingX(){returnBindingBuilder.bind(queueA()).to(xExchange()).with("XA");}//绑定普通交换机与队列B@BeanpublicBindingqueueBBindingX(){returnBindingBuilder.bind(queueB()).to(xExchange()).with("XB");}//绑定普通交换机与队列C@BeanpublicBindingqueueCBindingX(){returnBindingBuilder.bind(queueC()).to(xExchange()).with("XC");}//绑定死信交换机与死信队列@BeanpublicBindingqueueDBindingY(){returnBindingBuilder.bind(queueD()).to(yExchange()).with("YD");}}
生产者
packagecom.vmware.controller;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.*;importjava.util.Date;@RestController@RequestMapping("/ttl")@Slf4jpublicclassSendMsgController{@AutowiredprivateRabbitTemplate rabbitTemplate;/**
* @param message 消息
* @apiNote 生产者代码
*/@GetMapping("/sendMsg/{message}")publicvoidsendMsg(@PathVariableString message){
log.info("当前时间:{},发送消息给两个队列:{}",newDate(), message);
rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10秒的队列"+ message);
rabbitTemplate.convertAndSend("X","XB","消息来自ttl为15秒的队列"+ message);}@GetMapping("/sendMsg/{message}/{ttl}")publicvoidsendMsg(@PathVariableString message,@PathVariableString ttl){
rabbitTemplate.convertAndSend("X","XC", message +"ttl:"+ ttl, msg ->{
msg.getMessageProperties().setExpiration(ttl);return msg;});
log.info("当前时间:{},发送消息:{}给队列:XC,ttl:{}",newDate(), message, ttl);}}
消费者
packagecom.vmware.consumer;importcom.rabbitmq.client.Channel;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjava.util.Date;@Slf4j@ComponentpublicclassDeadLetterQueueConsumer{@RabbitListener(queues ={"QD"})publicvoidreceiveD(Message message,Channel channel){
log.info("当前时间:{} 死信队列收到消息:{}",newDate(), message);}}
存在的问题
当生产者发布消息到延迟队列后,消息只能按顺序被消费者消费,当某一消息阻塞时间很长时则会导致其他消息一同阻塞,不能达到ttl到期优先被延时队列的消费者所消费的效果
优化
下载插件rabbitmq_delayed_message_exchange到rabbit的plugin目录下
- 官网:https://www.rabbitmq.com/community-plugins.html
- ubuntu下载方式
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.2/pluginssudowget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez
- 启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 重启服务
systemctl restart rabbitmq-server
- 安装完成后可以在rabbit交换机页面看到x-delayed-message
基于插件的延时队列代码实战
在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才投递到目标队列中
配置延时队列与交换机
packagecom.vmware.config;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.CustomExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;importjava.util.HashMap;importjava.util.Map;@ConfigurationpublicclassDelayQueueConfig{privatestaticfinalString DELAY_QUEUE_NAME ="delayed.queue";privatestaticfinalString DELAY_EXCHANGE_NAME ="delayed.exchange";privatestaticfinalString DELAY_ROUTING_KEY ="delayed.routingkey";@BeanpublicQueuedelayQueue(){returnnewQueue(DELAY_QUEUE_NAME);}@BeanpublicCustomExchangedelayExchange(){Map<String,Object> args =newHashMap<>();
args.put("x-delayed-type","direct");/**
* 1.交换机名称
* 2.交换机类型:插件类型
* 3.是否持久化
* 4.是否自动删除
*/returnnewCustomExchange(DELAY_EXCHANGE_NAME,"x-delayed-message",true,false,args);}@BeanpublicBindingdelayQueueBindExchange(){returnBindingBuilder.bind(delayQueue()).to(delayExchange()).with(DELAY_ROUTING_KEY).noargs();}}
生产者
packagecom.vmware.controller;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.web.bind.annotation.*;importjava.util.Date;@RestController@RequestMapping("/ttl")@Slf4jpublicclassSendMsgController{@AutowiredprivateRabbitTemplate rabbitTemplate;@GetMapping("/sendDelayMsg/{message}/{delayTime}")publicvoidsendMsg(@PathVariableString message,@PathVariableInteger delayTime){
rabbitTemplate.convertAndSend("delayed.exchange","delayed.routingkey", message, msg ->{
msg.getMessageProperties().setDelay(delayTime);return msg;});
log.info("当前时间:{},发送一条延迟{}毫秒的信息给队列 delayed.queue:{}",newDate(), delayTime, message);}}
消费者
packagecom.vmware.consumer;importcom.rabbitmq.client.Channel;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjava.util.Date;@Slf4j@ComponentpublicclassDeadLetterQueueConsumer{@RabbitListener(queues ={"delayed.queue"})publicvoidreceiveDelayedQueue(Message message){String msg =newString(message.getBody());
log.info("当前时间:{},收到延时队列的消息:{}",newDate(), msg);}}
实际效果
2022-07-19 23:33:18.021 INFO 23040 --- [nio-8080-exec-4] com.vmware.controller.SendMsgController : 当前时间:Tue Jul 1923:33:18 CST 2022,发送一条延迟20000毫秒的信息给队列 delayed.queue:哈哈哈
2022-07-19 23:33:23.349 INFO 23040 --- [nio-8080-exec-5] com.vmware.controller.SendMsgController : 当前时间:Tue Jul 1923:33:23 CST 2022,发送一条延迟2000毫秒的信息给队列 delayed.queue:哈
2022-07-19 23:33:25.332 INFO 23040 --- [ntContainer#0-1] c.v.consumer.DeadLetterQueueConsumer : 当前时间:Tue Jul 19 23:33:25 CST 2022,收到延时队列的消息:哈2022-07-19 23:33:37.830 INFO 23040 --- [ntContainer#0-1] c.v.consumer.DeadLetterQueueConsumer : 当前时间:Tue Jul 19 23:33:37 CST 2022,收到延时队列的消息:哈哈哈
- 可以看到前一条延时消息并没有阻塞到后面的消息
版权归原作者 冰点契约丶 所有, 如有侵权,请联系我们删除。