RabbitMQ原生并不支持延时消息,需要我们自行实现。
AMQP协议
- 生产者端发布消息到MQ中的交换机Exchange,每条消息会带一个路由键RoutingKey;
- 交换机与队列queue通过路由键进行绑定binding;
- 消息通过路由键路由到对应的队列queue,队列与交换器没有对应的绑定关系则消息会丢失;
- 消息由队列投递给消费者端进行消费。
交换机类型
- direct:直连交换机
- fanout:广播交换机
- topic:主题交换机
- headers:头交换机,基本不用,由direct替代
延时消息实现方式
消息过期时间+死信队列
普通队列绑定死信交换机,发送消息时设置过期时间,过期时间到了进入死信队列,监听死信队列消费。
存在的问题:消息的过期时间不一致时,使用死信队列可能起不到延时的作用。当发送两条不同过期时间的消息时,先发送的消息1过期时间(20s)大于后发送的消息2过期时间(10s),由于消息的顺序消费,消息2过期后并不会立即重新发布到死信交换机,而是等消息1过期后一起被消费。
使用场景:超时时间一致,如订单超时取消。
延时插件
- 下载延时插件,将插件移至rabbitmq插件目录下,通过命令应用延时插件。
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- 声明一个类型为x-delayed-message的交换机CustomExchange。
- 参数添加x-delayed-type,值为交换机类型的属性,如direct,用于路由键的映射。
- 发送消息时,消息头header设置参数x-delay,值为延迟时间,单位毫秒。
云消息队列RabbitMQ(阿里云)
阿里云消息队列rabbitmq提供了延时消息的原生支持,只需要在消息头header里面增加参数delay,值为延迟时间,单位毫秒。
MessageProperties messageProperties =newMessageProperties();String msgId ="CANCEL:"+ order.getId();//此处设置的msgId才能被会转成rabbitmq client的messageId,发送给broker
messageProperties.setMessageId(msgId);long delay =15*60*1000;// 消息头设置延时时间
messageProperties.setHeader("delay", delay);Message message =newMessage(order.getId().getBytes(), messageProperties);
rabbitTemplate.convertAndSend(ORDER_CANCEL_TOPIC,Q_CANCEL_ORDER, message);
代码使用示例
- pom文件添加amqp依赖。
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
- 配置文件application.yml添加mq配置。
spring:rabbitmq:host: 127.0.0.1
port:5672username: rabbitmq
password: rabbitmq
virtual-host: rabbitmq
listener:simple:acknowledge-mode: manual
- 编写配置类RabbitMQConfig,创建交换机、队列并进行绑定。
@ConfigurationpublicclassRabbitMQConfig{// 普通交换机publicstaticfinalStringORDER_CANCEL_EXCHANGE="order_cancel_exchange";publicstaticfinalStringORDER_CANCEL_QUEUE="order_cancel_queue";// 死信交换机publicstaticfinalStringDEAD_EXCHANGE="dead_exchange";publicstaticfinalStringDEAD_ROUTING_KEY="dead_routing_key";publicstaticfinalStringDEAD_QUEUE="dead_queue";// 延迟插件交换机publicstaticfinalStringDMP_EXCHANGE="dmp_exchange";publicstaticfinalStringDMP_ROUTING_KEY="dmp_routing_key";publicstaticfinalStringDMP_QUEUE="dmp_queue";@BeanpublicExchangeorderCancelExchange(){returnExchangeBuilder.directExchange(ORDER_CANCEL_EXCHANGE).build();}@BeanpublicQueueorderCancelQueue(){returnQueueBuilder.durable(ORDER_CANCEL_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey(DEAD_ROUTING_KEY).build();}@BeanpublicExchangedeadExchange(){returnExchangeBuilder.directExchange(DEAD_EXCHANGE).build();}@BeanpublicQueuedeadQueue(){returnQueueBuilder.durable(DEAD_QUEUE).build();}@BeanpublicBindingorderCancelBinding(Exchange orderCancelExchange,Queue orderCancelQueue){returnBindingBuilder.bind(orderCancelQueue).to(orderCancelExchange).with("").noargs();}@BeanpublicBindingdeadBinding(Exchange deadExchange,Queue deadQueue){returnBindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();}// 延时插件使用@BeanpublicCustomExchangedmpExchange(){Map<String,Object> args =newHashMap<>();
args.put("x-delayed-type","direct");returnnewCustomExchange(DMP_EXCHANGE,"x-delayed-message",true,false, args);}@BeanpublicQueuedmpQueue(){returnQueueBuilder.durable(DMP_QUEUE).build();}@BeanpublicBindingdmpBinding(CustomExchange dmpExchange,Queue dmpQueue){returnBindingBuilder.bind(dmpQueue).to(dmpExchange).with(DMP_ROUTING_KEY).noargs();}}
4.编写消息生产者MsgSender,实现死信队列、延时插件消息发送。
@Component@Slf4jpublicclassMsgSender{@AutowiredprivateRabbitTemplate rabbitTemplate;/**
* 通过死信队列发送消息
* @param message
* @param time
*/publicvoidsend(String message,Integer time){String expireTime =String.valueOf(time *1000);
rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_CANCEL_EXCHANGE,"", message,newMessagePostProcessor(){@OverridepublicMessagepostProcessMessage(Message message)throwsAmqpException{//设置消息的过期时间,是以毫秒为单位的
message.getMessageProperties().setExpiration(expireTime);return message;}});
log.info("死信队列消息:{}发送成功,过期时间:{}秒", message, time);}/**
* 通过延迟插件发送消息
* @param message
* @param time
*/publicvoidsendByPlugin(String message,Integer time){
rabbitTemplate.convertAndSend(RabbitMQConfig.DMP_EXCHANGE,RabbitMQConfig.DMP_ROUTING_KEY, message,newMessagePostProcessor(){@OverridepublicMessagepostProcessMessage(Message message)throwsAmqpException{// 延迟插件只需要在消息的header中添加x-delay属性,值为过期时间,单位毫秒
message.getMessageProperties().setHeader("x-delay", time *1000);return message;}});
log.info("延迟插件消息:{}发送成功,过期时间:{}秒", message, time);}}
- 编写消息消费者MsgListener,监听延时消息进行相应的业务处理。
@Component@Slf4jpublicclassMsgListener{@RabbitListener(queues =RabbitMQConfig.DEAD_QUEUE)publicvoidorderCancel(Message message,Channel channel)throwsIOException{
log.info("使用死信队列,收到消息:{}",newString(message.getBody()));// 手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}@RabbitListener(queues =RabbitMQConfig.DMP_QUEUE)publicvoidorderCancelByPlugin(Message message,Channel channel)throwsIOException{
log.info("使用延迟插件,收到消息:{}",newString(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}}
- 编写测试类MsgController,启动服务发送消息测试。
@RestControllerpublicclassMsgController{@AutowiredpublicMsgSender msgSender;@GetMapping("/send")publicStringsend(@RequestParamString msg,Integer time){
msgSender.send(msg, time);return"success";}@GetMapping("/sendByPlugin")publicStringsendByPlugin(@RequestParamString msg,Integer time){
msgSender.sendByPlugin(msg, time);return"success";}}
1)使用死信队列发送两次消息http://localhost:8081/send?msg=消息A&time=20 和http://localhost:8081/send?msg=消息B&time=10,消息会按发送顺序消费,并没有起到延时的效果。
2)使用延时插件发送两次消息http://localhost:8081/sendByPlugin?msg=消息A&time=20 和http://localhost:8081/sendByPlugin?msg=消息B&time=10,消息会按照延时时间顺序消费。
版权归原作者 τNeverMindζ 所有, 如有侵权,请联系我们删除。