0


rabbitmq:retry重试机制和延迟消息的实现

rabbitmq:retry重试机制和延迟消息的实现

在消费者消费消息的时候可能会因为网络等外部原因导致消息处理失败,这个时候如果将消息直接丢弃会导致正常的业务丢失,但是如果是一条本身就有问题的消息,那么这个时候又必须丢弃掉,如果选择用channel.basicNack

channel.basicReject方法让消息重回对了,会导致消费者在不停的消费这条消息,这将是一个致命的问题。

所幸,rabbitmq提供了retry机制来控制消息的重试

在这里插入图片描述

yml配置文件:

spring:rabbitmq:host: IP
    port:5672username: guest
    password: guest
    virtual-host: smallJHost
    # 消费者确认机制相关配置# 开启publisher-confirm,# 这里支持两种类型:simple:同步等待confirm结果,直到超时;# correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallbackpublisher-confirm-type: correlated
    # publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallbackpublisher-returns:true# 定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息template:mandatory:truelistener:simple:# ack机制类型acknowledge-mode: manual
        # 设置预取消息数量prefetch:2# 失败重试retry:# 开启消费者失败重试enabled:true# 初始的失败等待时长为1秒initial-interval:1000# 失败的等待时长倍数,下次等待时长 = multiplier * last-intervalmultiplier:3# 最大重试次数max-attempts:4# true无状态;false有状态。如果业务中包含事务,这里改为falsestateless:true

在RabbitmqConfig中增加如下配置:

packagecom.gitee.small.config;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.Binding;importorg.springframework.amqp.core.BindingBuilder;importorg.springframework.amqp.core.Queue;importorg.springframework.amqp.core.TopicExchange;importorg.springframework.amqp.rabbit.core.RabbitTemplate;importorg.springframework.amqp.rabbit.retry.MessageRecoverer;importorg.springframework.amqp.rabbit.retry.RepublishMessageRecoverer;importorg.springframework.beans.BeansException;importorg.springframework.context.ApplicationContext;importorg.springframework.context.ApplicationContextAware;importorg.springframework.context.annotation.Bean;importorg.springframework.context.annotation.Configuration;@Configuration@Slf4jpublicclassRabbitMQConfigimplementsApplicationContextAware{// 其他队列、交换机、绑定、回调等代码省略,需要的朋友可看我之前的文章、、、@BeanpublicQueueerrorQueue(){returnnewQueue("error");}@BeanpublicTopicExchangeexchange(){returnnewTopicExchange("topicExchange");}@Bean(name ="binding.error")publicBindingbindingExchangeMessage3(){returnBindingBuilder.bind(errorQueue()).to(exchange()).with("error");}/**
     * 定义 MessageRecoverer 将错误消息发送到指定队列
     */@BeanpublicMessageRecovererrepublishMessageRecoverer(RabbitTemplate rabbitTemplate){returnnewRepublishMessageRecoverer(rabbitTemplate,"topicExchange","error");}}

在消费者定义中有一点需要注意,不能直接将异常处理掉,否则是不会将消息发送到error队列的。

packagecom.gitee.small.rabbitmq;importcom.rabbitmq.client.Channel;importlombok.SneakyThrows;importlombok.extern.slf4j.Slf4j;importorg.springframework.amqp.core.ExchangeTypes;importorg.springframework.amqp.core.Message;importorg.springframework.amqp.rabbit.annotation.Exchange;importorg.springframework.amqp.rabbit.annotation.Queue;importorg.springframework.amqp.rabbit.annotation.QueueBinding;importorg.springframework.amqp.rabbit.annotation.RabbitListener;importorg.springframework.stereotype.Component;importjava.util.concurrent.TimeUnit;@Component@Slf4jpublicclassWorkRabbitReceiver{privatestaticInteger index =0;@RabbitListener(bindings =@QueueBinding(
            value =@Queue(name ="监听队列名称"),
            exchange =@Exchange(value ="binding对象beanname", type =ExchangeTypes.TOPIC)))publicvoidprocess(String msg,Channel channel,Message message)throwsException{try{System.out.println(1/0);}catch(Exception e){
            log.error("消息重试");thrownewException();}}}

小结:

  • 消息重试是在本地进行重试,不会回到消息队列中
  • 重试模式下,重试次数耗尽后,如果消息依然失败,为了防止消息被直接丢弃,需要有MessageRecovery 接口来处理,它包含三种不同的实现- RejectAndDontRequeueRecoverer:重试耗尽后,直接 reject,丢弃消息。默认就是这种方式- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机很显然,RepublishMessageRecoverer方案应用最广最合理,本文中也是以此为例
实现延迟队列

消息超时方案:

  • 给队列设置 ttl 属性,进入队列后超过 ttl 时间的消息变为死信
  • 给消息设置 ttl 属性,队列接收到消息超过 ttl 时间后变为死信

本文讲给消息设置超时,因为这个方案更灵活。

  1. 创建死信队列和死信交换机,并将其绑定@BeanpublicDirectExchangedlExchange(){// 声明死信交换机 dl.directreturnnewDirectExchange("dl.direct",true,false);}@BeanpublicQueuedlQueue(){// 声明存储死信的队列 dl.queuereturnnewQueue("dl.queue",true);}@Bean(name ="binding.dl")publicBindingdlBinding(){// 将死信队列 与 死信交换机绑定returnBindingBuilder.bind(dlQueue()).to(dlExchange()).with("dl");}
  2. 指定消息过期时间,向正常消息队列发送消息,一条5秒延时,一条10秒延时privatevoiddeadLetter(){finalMessage message =MessageBuilder.withBody("延迟消息测试".getBytes(StandardCharsets.UTF_8)).setDeliveryMode(MessageDeliveryMode.PERSISTENT).setExpiration("5000").build();finalMessage message2 =MessageBuilder.withBody("延迟消息测试".getBytes()).setDeliveryMode(MessageDeliveryMode.PERSISTENT).setExpiration("10000").build(); rabbitTemplate.convertAndSend("topicExchange","topic.dead", message); rabbitTemplate.convertAndSend("topicExchange","topic.dead", message2);}
  3. 监听死信队列,实现延迟消息具体逻辑/*** 监听死信队列,处理延迟消息*/@RabbitListener(bindings =@QueueBinding( value =@Queue(name ="dl.queue"), exchange =@Exchange(value ="binding.dl", type =ExchangeTypes.TOPIC)))publicvoidprocess(String msg,Channel channel,Message message)throwsIOException{ log.info("延迟消息:{}", msg); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);}

使用场景:

  • 延迟发送短信
  • 用户下单,如果用户在一小时内未支付,自动取消
  • 会议前半小时提醒参会

小结:

  1. 创建一个交换机作为死信交换机并绑定一个队列作为死信队列
  2. 给消息的目标队列设置队列超时时间并指定死信交换机和路由 key
  3. 将消息的目标队列绑定到死信交换机
  4. 消费者监听死信队列获取超时消息

本文转载自: https://blog.csdn.net/qq_41992429/article/details/128707411
版权归原作者 small+ 所有, 如有侵权,请联系我们删除。

“rabbitmq:retry重试机制和延迟消息的实现”的评论:

还没有评论