0


【Spring-RabbitMq】设置消费重试次数

引言

在我们实际项目中需要对消息消费的

高可用

做保证,首先需要做到的就是消息的重试机制,设想一下以下场景:

当库存服务处理上游服务发过来的订单消息时,此时服务宕机了,或者网络不可用了,那我这个消息是应该算消费成功还是消费失败呢?

显然,我们肯定要对处理不成功的消息进行重试,那么如果消费不成功的话,就要无限次数的重试吗?

答案是否,因为当代码逻辑有Bug或者上游的消息内容是错误的时候,无论再重试多少次也不会成功,此时应该将消息抛弃掉。

所以

消费的最佳实践

是:消息处理失败需要重试,且重试需要指定最大次数以及重试的时间间隔,当消息重试次数耗尽时,对消息做持久化处理。

那么该如何实现这个功能呢?由此引出今天的主题:

针对Spring-RabbitMq,如何实现重试机制

功能实现

这一功能其实并不需要我们自己实现,Spring-RabbitMq已经为我们做好了,我们只需要进行一些配置。

重试机制的配置同样依赖

RabbitListenerContainerFactory

,对这个不熟悉的可以先阅读上一篇博客
【Spring-RabbitMq配置一个消费端接入多个vhost】。

配置MethodInterceptor

spring-rabbitmq包下的

org.springframework.amqp.rabbit.config.RetryInterceptorBuilder

,该类是一个建造者,通过他可以配置重试机制,在我的项目中指定了最多重试5次,并且重试间隔是:重试初始间隔8秒.2倍递增,不超过60秒。

/**
     * 重试机制
     * 重试初始间隔8秒.2倍递增,不超过60秒
     * 最多重试5次
     *
     * @return 重试模板
     */publicMethodInterceptorcreateInterceptor(){returnRetryInterceptorBuilder.stateless().maxAttempts(8).backOffOptions(8000,2,180*1000)// 指定消息重试达到最大次数后的回调.recoverer(messageRecoverer()).build();// RejectAndDontRequeueRecoverer: 重试最大次数后消息会一直处于 UnAcked 状态,当断开连接时,消息会转为 Ready,继续等待被消费}

配置重试次数耗尽后的策略MessageRecoverer

让我们看

org.springframework.amqp.rabbit.retry.MessageRecoverer

,这是一个函数式接口,在消息重试次数耗尽后,会我们执行这个方法。

packageorg.springframework.amqp.rabbit.retry;importorg.springframework.amqp.core.Message;/**
 * @author Dave Syer
 * @author Gary Russell
 *
 */@FunctionalInterfacepublicinterfaceMessageRecoverer{/**
     * Callback for message that was consumed but failed all retry attempts.
     *
     * @param message the message to recover
     * @param cause the cause of the error
     */voidrecover(Message message,Throwable cause);}

在我的项目中,次数耗尽后,我们会把消息内容记录到日志中,并发出企业微信告警。

这里主要是结合具体业务,把

消息转发到死信交换机

或者

持久化到数据库

都是可以的。

/**
     * 消息达到最大重试次数后的回调
     *
     * @return 已消费但所有重试失败的消息的回调。
     */privateMessageRecoverermessageRecoverer(){return(message, cause)->{String messageContent =newString(message.getBody(),StandardCharsets.UTF_8);
            log.error("消息达到最大重试次数,从队列删除,消息:{},message:{}",newString(message.getBody(),StandardCharsets.UTF_8),JSON.toJSONString(message));
            log.error("消息达到最大重试次数,异常",cause);try{String errorMsg =Optional.ofNullable(cause.getMessage()).map(msg -> msg.length()>100? msg.substring(0,100): msg).orElse("");String notifyContent = weChatWarnMessageUtil.buildMqWarnMessage(StrUtil.format(MESSAGE_RECOVER_MSG, message.getMessageProperties().getConsumerQueue(),
                        messageContent, errorMsg),"MQ消费异常");
                weChatWarnMessageUtil.send(notifyContent,WeChatNotifyContentType.MARKDOWM);}catch(Exception e){
                log.info("企业微信告警失败", e);}thrownewAmqpRejectAndDontRequeueException(null,true, cause);};}

最后,像上篇内容说的一样,配置好RabbitListenerContainerFactory,再与队列绑定即可。
在这里插入图片描述


本文转载自: https://blog.csdn.net/u014135956/article/details/141093652
版权归原作者 有答案直接发给我 所有, 如有侵权,请联系我们删除。

“【Spring-RabbitMq】设置消费重试次数”的评论:

还没有评论