0


springboot:整合rabbitmq之重试机制

当我们消息消费失败的时候,可以进行重试,
什么情况下会重发消息
1、网络抖动
2、程序抛出异常没有try-catch
RabbitMQ自动补偿机制触发:(多用于调用第三方接口)
1.当我们的消费者在处理我们的消息的时候,程序抛出异常情况下(默认无限次数重试),如果这里的异常try-catch后自己配置的重试机制是不生效的
2.应该对我们的消息重试设置间隔重试时间,比如消费失败最多只能重试5次,间隔3秒(防止重复消费,幂等问题)

如果重试5次,也就是15秒内重试还是失败情况下应该如何处理
1.默认情况下,重试多次还是失败的话,会自动删除该消息(消息可能会丢失)
解决思路:
A:如果重试多次还是失败的情况下,最终存放到死信队列.
B:采用表日志记录,消费失败错误的日志记录 后期人工自动对消息实现补偿.

一、项目准备
依赖

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

配置类

spring.rabbitmq.port=5672
spring.rabbitmq.username=rabbit
spring.rabbitmq.password=123456
spring.rabbitmq.addresses=192.168.23.145
spring.rabbitmq.virtual-host=/rabbit

二、案例重现

# 声明队列
@Configuration
public class HelloWorldConfig {
    @Bean
    public Queue setQueue(){return new Queue("helloWorldqueue");}}
# 生产者
@Slf4j
@RestController
public class ProducerController {

    @Autowired
    private RabbitTemplate rabbitTemplate;//helloWorld 直连模式
    @ApiOperation(value="helloWorld发送接口",notes="直接发送到队列")
    @GetMapping(value="/helloWorldSend")
    public Object helloWorldSend(String message) throws AmqpException, UnsupportedEncodingException {//设置部分请求参数
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);//发消息
        rabbitTemplate.send("helloWorldqueue",new Message(message.getBytes("UTF-8"),messageProperties));return"message sended : "+message;}}# 消费者
@Component
public class ConcumerReceiver {

    private int count = 1;//直连模式的多个消费者,会分到其中一个消费者进行消费。类似task模式
    //通过注入RabbitContainerFactory对象,来设置一些属性,相当于task里的channel.basicQos
    @RabbitListener(queues="helloWorldqueue")
    public void helloWorldReceive(String message){
        System.out.println("当前执行次数:"+ count++);
         System.out.println("异常前,helloWorld模式 received message : "+message);
         int i = 1/0;
        System.out.println("异常后,helloWorld模式 received message : "+message);}}

启动测试:
无限循环报错
停止后,消息重回Ready状态
在这里插入图片描述
三、实现消息重试
实现重试

spring.rabbitmq.listener.simple.retry.enabled= true
spring.rabbitmq.listener.simple.retry.max-attempts= 5
spring.rabbitmq.listener.simple.retry.max-interval= 10000
spring.rabbitmq.listener.simple.retry.initial-interval= 2000
spring.rabbitmq.listener.simple.retry.multiplier= 2

重启测试
第一次执行时间2s,第二次4s,第三次8s,第四次16s,第五次由于设置了最大间隔为10s,所有变成了10s
最后查看retry_a队列,消息没有了,也就是说重试五次失败之后就会移除该消息
移除操作是由日志中的这个类处理:RejectAndDontRequeueRecoverer(拒绝和不要重新排队)
在这里插入图片描述

对重试失败的消息重新排队
使用下 ImmediateRequeueMessageRecoverer 重新排队在HelloWorldConfig中配置

    @Bean
    public MessageRecoverer messageRecoverer(){return new ImmediateRequeueMessageRecoverer();}

重启运行:

可以看出:重试5次之后,返回队列,然后再重试5次,周而复始直到不抛出异常为止,这样还是会影响后续的消息消费
在这里插入图片描述
把重试失败消息放入重试失败队列
接着使用 RepublishMessageRecoverer 重新发布在HelloWorldConfig中配置

@Configuration
public class HelloWorldConfig {

    @Bean
    public Queue setQueue(){return new Queue("helloWorldqueue");}/*@Bean
    public MessageRecoverer messageRecoverer(){return new ImmediateRequeueMessageRecoverer();}*/
    @Bean
    public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){// 需要配置交换机和绑定键
        return new RepublishMessageRecoverer(rabbitTemplate, RETRY_EXCHANGE, RETRY_FAILURE_KEY);}//失败队列
    public static final String RETRY_FAILURE_KEY = "retry.failure.key";//失败交换机
    public static final String RETRY_EXCHANGE = "retry_exchange";

    @Bean
    public Queue setQueueFailure(){return new Queue(RETRY_FAILURE_KEY);}

    @Bean
    public FanoutExchange setFailureExchange(){return new FanoutExchange(RETRY_EXCHANGE);}
    @Bean
    public Binding bindFailureBind(){return BindingBuilder.bind(setQueueFailure()).to(setFailureExchange());}}

在ConcumerReceiver中创建重试失败消息监听

@RabbitListener(queues="retry.failure.key")
    public void retryFailure(String message) throws InterruptedException {
        Thread.sleep(20000);
        System.out.println(" [ 消费者@重试失败号 ] 接收到消息 ==> '"+ message);}

重启,运行结果:

重试5次之后,将消息 Republishing failed message to exchange ‘retry.exchange’ with routing key retry-key 转发到重试失败队列,由重试失败消费者消费
在这里插入图片描述


本文转载自: https://blog.csdn.net/p393975269/article/details/129836229
版权归原作者 p393975269 所有, 如有侵权,请联系我们删除。

“springboot:整合rabbitmq之重试机制”的评论:

还没有评论