延迟消息有两种实现方案:
1,基于死信队列
2,集成延迟插件
1. 基于死信实现延迟消息
使用RabbitMQ来实现延迟消息必须先了解RabbitMQ的两个概念:消息的TTL和死信Exchange,通过这两者的组合来实现延迟队列
1.1 消息的TTL(Time To Live)
消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。
如何设置TTL:
我们创建一个队列queue.temp,在Arguments 中添加x-message-ttl 为5000 (单位是毫秒),那所在压在这个队列的消息在5秒后会消失。
1.2 死信交换机 Dead Letter Exchanges
一个消息在满足如下条件下,会进死信路由,记住这里是路由而不是队列,一个路由可以对应很多队列。
(1)一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。
(2)上面的消息的TTL到了,消息过期了。
(3)队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上
Dead Letter Exchange其实就是一种普通的exchange,和创建其他exchange没有两样。只是在某一个设置Dead Letter Exchange的队列中有消息过期了,会自动触发消息的转发,发送到Dead Letter Exchange中去。
我们现在可以测试一下延迟队列。
(1)创建死信队列
(2)创建交换机
(3)建立交换器与队列之间的绑定
(4)创建队列
1.3 代码实现
1.3.1 添加配置类
importorg.springframework.amqp.core.DirectExchange;importorg.springframework.amqp.core.Queue;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@ConfigurationpublicclassDeadLetterMqConfig{// 声明一些变量publicstaticfinalString exchange_dead ="exchange.dead";publicstaticfinalString routing_dead_1 ="routing.dead.1";publicstaticfinalString routing_dead_2 ="routing.dead.2";publicstaticfinalString queue_dead_1 ="queue.dead.1";publicstaticfinalString queue_dead_2 ="queue.dead.2";// 定义交换机@BeanpublicDirectExchangeexchange(){returnnewDirectExchange(exchange_dead,true,false,null);}@BeanpublicQueuequeue1(){// 设置如果队列一 出现问题,则通过参数转到exchange_dead,routing_dead_2 上!HashMap<String,Object> map =newHashMap<>();// 参数绑定 此处的key 固定值,不能随意写
map.put("x-dead-letter-exchange",exchange_dead);
map.put("x-dead-letter-routing-key",routing_dead_2);// 设置延迟时间
map.put("x-message-ttl",10*1000);// 队列名称,是否持久化,是否独享、排外的【true:只可以在本次连接中访问】,是否自动删除,队列的其他属性参数returnnewQueue(queue_dead_1,tr8
`ue,false,false,map);}@BeanpublicBindingbinding(){// 将队列一 通过routing_dead_1 key 绑定到exchange_dead 交换机上returnBindingBuilder.bind(queue1()).to(exchange()).with(routing_dead_1);}// 这个队列二就是一个普通队列@BeanpublicQueuequeue2(){returnnewQueue(queue_dead_2,true,false,false,null);}// 设置队列二的绑定规则@BeanpublicBindingbinding2(){// 将队列二通过routing_dead_2 key 绑定到exchange_dead交换机上!returnBindingBuilder.bind(queue2()).to(exchange()).with(routing_dead_2);}}
1.3.2 配置发送消息
@RestController@RequestMapping("/mq")@Slf4jpublicclassMqController{@AutowiredprivateRabbitTemplate rabbitTemplate;@AutowiredprivateRabbitService rabbitService;@GetMapping("sendDeadLettle")publicResultsendDeadLettle(){SimpleDateFormat sdf =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");this.rabbitTemplate.convertAndSend(DeadLetterMqConfig.exchange_dead,DeadLetterMqConfig.routing_dead_1,"ok");System.out.println(sdf.format(newDate())+" Delay sent.");returnResult.ok();}}
1.3.3消息接收方
@Component@ConfigurationpublicclassDeadLetterReceiver{@RabbitListener(queues =DeadLetterMqConfig.queue_dead_2)publicvoidget(String msg){System.out.println("Receive:"+ msg);SimpleDateFormat sdf =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("Receive queue_dead_2: "+ sdf.format(newDate())+" Delay rece."+ msg);}}
2. 基于延迟插件实现延迟消息
Rabbitmq实现了一个插件x-delay-message来实现延时队列
2.1 插件安装
- 首先我们将刚下载下来的rabbitmq_delayed_message_exchange-3.8.0.ez文件上传到RabbitMQ所在服务器,下载地址:https://www.rabbitmq.com/community-plugins.html
- 切换到插件所在目录,执行 docker cp rabbitmq_delayed_message_exchange-3.8.0.ez rabbitmq:/plugins 命令,将刚插件拷贝到容器内plugins目录下
- 执行 docker exec -it rabbitmq /bin/bash 命令进入到容器内部,并 cd plugins 进入plugins目录
- 执行 ls -l|grep delay 命令查看插件是否copy成功
- 在容器内plugins目录下,执行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 命令启用插件
- exit命令退出RabbitMQ容器内部,然后执行 docker restart rabbitmq 命令重启RabbitMQ容器
2.2 代码实现
配置队列
@ConfigurationpublicclassDelayedMqConfig{publicstaticfinalString exchange_delay ="exchange.delay";publicstaticfinalString routing_delay ="routing.delay";publicstaticfinalString queue_delay_1 ="queue.delay.1";/**
* 队列不要在RabbitListener上面做绑定,否则不会成功,如队列2,必须在此绑定
*
* @return
*/@BeanpublicQueuedelayQeue1(){// 第一个参数是创建的queue的名字,第二个参数是是否支持持久化returnnewQueue(queue_delay_1,true);}@BeanpublicCustomExchangedelayExchange(){Map<String,Object> args =newHashMap<String,Object>();
args.put("x-delayed-type","direct");returnnewCustomExchange(exchange_delay,"x-delayed-message",true,false, args);}@BeanpublicBindingdelayBbinding1(){returnBindingBuilder.bind(delayQeue1()).to(delayExchange()).with(routing_delay).noargs();}}
发送消息
@GetMapping("sendDelay")publicResultsendDelay(){SimpleDateFormat sdf =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");this.rabbitTemplate.convertAndSend(DelayedMqConfig.exchange_delay,DelayedMqConfig.routing_delay, sdf.format(newDate()),newMessagePostProcessor(){@OverridepublicMessagepostProcessMessage(Message message)throwsAmqpException{
message.getMessageProperties().setDelay(10*1000);System.out.println(sdf.format(newDate())+" Delay sent.");return message;}});returnResult.ok();}
接收消息
@ComponentpublicclassDelayReceiver{@RabbitListener(queues =DelayedMqConfig.queue_delay_1)publicvoidget(String msg){SimpleDateFormat sdf =newSimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("Receive queue_delay_1: "+ sdf.format(newDate())+" Delay rece."+ msg);}}
启动测试即可
版权归原作者 ㏒灵韵№ 所有, 如有侵权,请联系我们删除。