引言
在我们实际项目中需要对消息消费的
高可用
做保证,首先需要做到的就是消息的重试机制,设想一下以下场景:
当库存服务处理上游服务发过来的订单消息时,此时服务宕机了,或者网络不可用了,那我这个消息是应该算消费成功还是消费失败呢?
显然,我们肯定要对处理不成功的消息进行重试,那么如果消费不成功的话,就要无限次数的重试吗?
答案是否,因为当代码逻辑有Bug或者上游的消息内容是错误的时候,无论再重试多少次也不会成功,此时应该将消息抛弃掉。
所以
消费的最佳实践
是:消息处理失败需要重试,且重试需要指定最大次数以及重试的时间间隔,当消息重试次数耗尽时,对消息做持久化处理。
那么该如何实现这个功能呢?由此引出今天的主题:
针对Spring-RabbitMq,如何实现重试机制
。
功能实现
这一功能其实并不需要我们自己实现,Spring-RabbitMq已经为我们做好了,我们只需要进行一些配置。
重试机制的配置同样依赖
RabbitListenerContainerFactory
,对这个不熟悉的可以先阅读上一篇博客
【Spring-RabbitMq配置一个消费端接入多个vhost】。
配置MethodInterceptor
spring-rabbitmq包下的
org.springframework.amqp.rabbit.config.RetryInterceptorBuilder
,该类是一个建造者,通过他可以配置重试机制,在我的项目中指定了最多重试5次,并且重试间隔是:重试初始间隔8秒.2倍递增,不超过60秒。
/**
* 重试机制
* 重试初始间隔8秒.2倍递增,不超过60秒
* 最多重试5次
*
* @return 重试模板
*/publicMethodInterceptorcreateInterceptor(){returnRetryInterceptorBuilder.stateless().maxAttempts(8).backOffOptions(8000,2,180*1000)// 指定消息重试达到最大次数后的回调.recoverer(messageRecoverer()).build();// RejectAndDontRequeueRecoverer: 重试最大次数后消息会一直处于 UnAcked 状态,当断开连接时,消息会转为 Ready,继续等待被消费}
配置重试次数耗尽后的策略MessageRecoverer
让我们看
org.springframework.amqp.rabbit.retry.MessageRecoverer
,这是一个函数式接口,在消息重试次数耗尽后,会我们执行这个方法。
packageorg.springframework.amqp.rabbit.retry;importorg.springframework.amqp.core.Message;/**
* @author Dave Syer
* @author Gary Russell
*
*/@FunctionalInterfacepublicinterfaceMessageRecoverer{/**
* Callback for message that was consumed but failed all retry attempts.
*
* @param message the message to recover
* @param cause the cause of the error
*/voidrecover(Message message,Throwable cause);}
在我的项目中,次数耗尽后,我们会把消息内容记录到日志中,并发出企业微信告警。
这里主要是结合具体业务,把
消息转发到死信交换机
或者
持久化到数据库
都是可以的。
/**
* 消息达到最大重试次数后的回调
*
* @return 已消费但所有重试失败的消息的回调。
*/privateMessageRecoverermessageRecoverer(){return(message, cause)->{String messageContent =newString(message.getBody(),StandardCharsets.UTF_8);
log.error("消息达到最大重试次数,从队列删除,消息:{},message:{}",newString(message.getBody(),StandardCharsets.UTF_8),JSON.toJSONString(message));
log.error("消息达到最大重试次数,异常",cause);try{String errorMsg =Optional.ofNullable(cause.getMessage()).map(msg -> msg.length()>100? msg.substring(0,100): msg).orElse("");String notifyContent = weChatWarnMessageUtil.buildMqWarnMessage(StrUtil.format(MESSAGE_RECOVER_MSG, message.getMessageProperties().getConsumerQueue(),
messageContent, errorMsg),"MQ消费异常");
weChatWarnMessageUtil.send(notifyContent,WeChatNotifyContentType.MARKDOWM);}catch(Exception e){
log.info("企业微信告警失败", e);}thrownewAmqpRejectAndDontRequeueException(null,true, cause);};}
最后,像上篇内容说的一样,配置好RabbitListenerContainerFactory,再与队列绑定即可。
版权归原作者 有答案直接发给我 所有, 如有侵权,请联系我们删除。