环境信息
Spring Boot:2.0.8.RELEASE
Spring Cloud:2.0.4.RELEASE
RabbitMQ,用的是spring-boot-starter-amqp:2.0.8.RELEASE
问题背景
RabbitMQ,使用的是消息手动确认模式
spring.rabbitmq.listener.simple.acknowledge-mode=manual
在处理消息出现异常之后,根据情况手动进行ACK或者NACK处理。
常用异常处理机制
使用AOP拦截消息处理方法,统一进行日志的打印和异常的处理:
自定义注解@MqConsumer
可以标记在类上或者方法上
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MqConsumer {
String value() default "";
}
AOP拦截器RabbitInterceptor
拦截器,拦截使用@MqConsumer的类或者方法。
在拦截器里打印了消息内容、耗时等,并根据情况手动ACK或者NACK
这里也可以进行异常下的重试或者存表等处理
package com.xxx.mq.interceptor;
import com.rabbitmq.client.Channel;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Component;
@Aspect
@Component
public class RabbitInterceptor {
private static final Log log = LogFactory.getLog(RabbitInterceptor.class);
@Value("${spring.rabbitmq.listener.simple.acknowledge-mode:auto}")
private String acknowledgeMode;
@Pointcut("@within(com.xxx.mq.support.MqConsumer) || @annotation(com.xxx.mq.support.MqConsumer)")
public void consumerPointCut() {
}
@Around("consumerPointCut()")
public Object consumerListenerAround(ProceedingJoinPoint joinPoint) throws Throwable {
String className = joinPoint.getTarget().getClass().getSimpleName();
String methodName = joinPoint.getSignature().getName();
Object[] args = joinPoint.getArgs();
Channel channel = null;
Message amqpMessage = null;
String correlationId = "";
long deliveryTag = -1L;
for (Object arg : args) {
if (arg instanceof Message) {
amqpMessage = Message.class.cast(arg);
deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
correlationId = amqpMessage.getMessageProperties().getCorrelationId();
} else if (arg instanceof org.springframework.messaging.Message<?>) {
org.springframework.messaging.Message message = org.springframework.messaging.Message.class.cast(arg);
deliveryTag = (long) message.getHeaders().get("amqp_deliveryTag");
correlationId = (String) message.getHeaders().get("amqp_correlationId");
} else if (arg instanceof Channel) {
channel = (Channel) arg;
}
}
if (log.isInfoEnabled()) {
log.info("MQ_HDL > {}.{}(), parameters: {}", className, methodName, args);
}
long start = System.nanoTime();
Object obj = null;
if ("auto".equalsIgnoreCase(acknowledgeMode)) {
obj = joinPoint.proceed(args);
} else {
if (channel == null) {
throw new RuntimeException("手动确认消息,方法参数需要有Channel");
}
try {
obj = joinPoint.proceed(args);
// 手动签收
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
// 是否重新投递到队列
channel.basicNack(deliveryTag, false, false);
throw e;
}
}
if (log.isInfoEnabled()) {
log.info("MQ_HDL < [" + (System.nanoTime() - start) / 1000000 + "]ms");
}
return obj;
}
}
消费方RabbitConsumer
在消费方里,类上标记了@MqConsumer注解,在方法上配置了监听队列的名称,以及异常处理类rabbitListenerErrorHandlerImpl
@Component
@MqConsumer
public class RabbitConsumer {
Log log = LogFactory.getLog(RabbitConsumer.class);
/**
* 这里按照实际情况配置,请求参数
*
* @param msg
* @param amqpMessage
* @param channel
*/
@RabbitListener(queues = {"${tfb.rabbitmq.properties.configs[0].queues[0].name}"}, errorHandler = "rabbitListenerErrorHandlerImpl")
public void receiveQueue1Msg(@Payload String msg, Message amqpMessage, Channel channel) {
// 业务逻辑
// 模拟异常
// int i = 1/0;
}
}
异常处理类 RabbitListenerErrorHandlerImpl
这个类是实现了RabbitListenerErrorHandler,出现异常之后,handle方法的参数里可以获取得到amqp的Message,org.springframework.messaging.Message<?> message, ListenerExecutionFailedException exception,
从而根据具体场景进行处理,比如说异常日志的打印,重试或者存表处理
@Component
public class RabbitListenerErrorHandlerImpl extends ConditionalRejectingErrorHandler implements RabbitListenerErrorHandler {
private static final Log log = LogFactory.getLog(RabbitListenerErrorHandlerImpl.class);
private final FatalExceptionStrategy exceptionStrategy = new DefaultExceptionStrategy();
@Override
public Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message, ListenerExecutionFailedException exception) throws Exception {
log.error("Execution of Rabbit message listener failed. amqpMessag[{}]", amqpMessage, exception);
// 目前这里拿不到channel,但是从spring-amqp 2.1.7开始,可以从message的header里获取到。有了channel就能手动nack或ack
if (!this.causeChainContainsARADRE(exception) && this.exceptionStrategy.isFatal(exception)) {
ThreadCacheUtil.cleanAllThreadCache();
throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", exception);
}
ThreadCacheUtil.cleanAllThreadCache();
return null;
}
}
说明
AOP拦截器和消费方配置的异常处理类,有功能重复的地方,比如异常情况下的处理。可以根据实际情况选择其中一个,或者AOP里不处理异常,在异常处理类那边再统一处理。
不同点
AOP拦截器,需要在执行onMessage具体逻辑的时候才会拦截到。如果消息在进入onMessage具体逻辑之前就报错了,那么无法进入拦截器里的异常处理。
而消费方配置的异常处理类都可以处理得到异常,除非异常在AOP里被拦截并且没有抛出。
SpringBoot结合RabbitMQ异常处理,有多种方式:
1.AbstractRabbitListenerContainerFactory里的ErrorHandler
#setErrorHandler(ErrorHandler errorHandler)
这里的ErrorHandler是Spring框架异常处理接口,参数只有一个简单的Throwable t,因此无法获取到一些具体的内容,比如消息体等,也无法对消息进行持久化、手动ACK或NACK。
/**
* @param errorHandler The error handler.
* @see AbstractMessageListenerContainer#setErrorHandler(org.springframework.util.ErrorHandler)
*/
public void setErrorHandler(ErrorHandler errorHandler) {
this.errorHandler = errorHandler;
}
AbstractRabbitListenerContainerFactory是抽象类,是用于创建消息监听容器的,有两个实现类:
SimpleRabbitListenerContainerFactory
DirectRabbitListenerContainerFactory
2.AbstractMessageListenerContainer里的ErrorHandler
AbstractMessageListenerContainer是个抽象类,常见的实现类有以下两个:
SimpleMessageListenerContainer
DirectMessageListenerContainer
这里的ErrorHandler和AbstractRabbitListenerContainerFactory里的是一样的。
参数只有一个简单的Throwable t,因此无法获取到一些具体的内容,比如消息体等,也无法对消息进行持久化、手动ACK或NACK。
例子:
@Bean
public ErrorHandler errorHandler() {
// 自定义异常实现类
return new MqErrorHandler();
}
@Bean
SimpleMessageListenerContainer containerReset(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setDefaultRequeueRejected(false);
container.setErrorHandler(errorHandler());
container.setMessageConverter(jsonConverter());
container.setQueueNames(getQueueAlert());
container.setMessageListener(listenerAlertAdapter);
return container;
}
3.@RabbitListener里的errorHandler(实际是RabbitListenerErrorHandler)
@RabbitListener
/**
* Set an {@link org.springframework.amqp.rabbit.listener.RabbitListenerErrorHandler}
* to invoke if the listener method throws an exception.
* @return the error handler.
* @since 2.0
*/
String errorHandler() default "";
这里的errorHandler是org.springframework.amqp.rabbit.listener.RabbitListenerErrorHandler
/**
* An error handler which is called when a {code @RabbitListener} method
* throws an exception. This is invoked higher up the stack than the
* listener container's error handler.
*
* @author Gary Russell
* @since 2.0
*
*/
@FunctionalInterface
public interface RabbitListenerErrorHandler {
/**
* Handle the error. If an exception is not thrown, the return value is returned to
* the sender using normal {@code replyTo/@SendTo} semantics.
* @param amqpMessage the raw message received.
* @param message the converted spring-messaging message.
* @param exception the exception the listener threw, wrapped in a
* {@link ListenerExecutionFailedException}.
* @return the return value to be sent to the sender.
* @throws Exception an exception which may be the original or different.
*/
Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message,
ListenerExecutionFailedException exception) throws Exception;
}
handleError(Message amqpMessage, org.springframework.messaging.Message<?> message,
ListenerExecutionFailedException exception)
参数有原生的message,还有转换后的spring-messaging的message,还有包装后的ListenerExecutionFailedException异常信息,于是可以根据这些信息进行消息的处理,包括收到的消息内容是什么,异常信息是什么,以及在这里进行消息的持久化等操作。
如果需要对消息进行手动ACK或NACK,那么就需要获取到Channel才能进行,Channel是和MQ连接的通道,deliveryTag可以从message里获取到。
源码:
Channel接口里的basicAck和basicNack方法。
/**
* Acknowledge one or several received
* messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
* containing the received message being acknowledged.
* @see com.rabbitmq.client.AMQP.Basic.Ack
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
* @param multiple true to acknowledge all messages up to and
* including the supplied delivery tag; false to acknowledge just
* the supplied delivery tag.
* @throws java.io.IOException if an error is encountered
*/
void basicAck(long deliveryTag, boolean multiple) throws IOException;
/**
* Reject one or several received messages.
*
* Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
* or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
* @see com.rabbitmq.client.AMQP.Basic.Nack
* @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
* @param multiple true to reject all messages up to and including
* the supplied delivery tag; false to reject just the supplied
* delivery tag.
* @param requeue true if the rejected message(s) should be requeued rather
* than discarded/dead-lettered
* @throws java.io.IOException if an error is encountered
*/
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;
这里需要注意,spring-amqp版本在2.1.7以前,这里的org.springframework.messaging.Message<?> message里无法获取到Channel,因此无法进行手动ACK、NACK处理,详见Stack Overflow和官方github里的升级记录:
spring boot - How to requeue or reject in RabbitListenerErrorHandler on MANUAL ack mode? - Stack Overflow
Add AmqpHeaders.CHANNEL in error hander · garyrussell/spring-amqp@b314a5f (github.com)
坑的是,本次使用的环境信息里,spring-boot-starter-amqp:2.0.8.RELEASE里包含的spring-amqp版本是2.0.11.RELEASE,不支持!!
实例:
RabbitListenerErrorHandlerImpl.java
@Component
public class RabbitListenerErrorHandlerImpl extends ConditionalRejectingErrorHandler implements RabbitListenerErrorHandler {
private static final Log log = LogFactory.getLog(RabbitListenerErrorHandlerImpl.class);
private final FatalExceptionStrategy exceptionStrategy = new DefaultExceptionStrategy();
@Override
public Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message, ListenerExecutionFailedException exception) throws Exception {
log.error("Execution of Rabbit message listener failed. amqpMessag[{}]", amqpMessage, exception);
// 这里可以根据异常的类型等进行精细的判断,决定是否需要ack,以及是否需要重新投递
//if (!this.causeChainContainsARADRE(exception) && this.exceptionStrategy.isFatal(exception)) {
//}
message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class)
.basicReject(message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class), false);
}
RabbitConsumer.java
@Component
@MqConsumer
@RabbitListener(queues = {"queueName}"}, errorHandler = "rabbitListenerErrorHandlerImpl")
public class RabbitConsumer {
Log log = LogFactory.getLog(RabbitConsumer.class);
@RabbitHandler(isDefault = true)
public void receiveQueue3Msg(@Payload MqRequestDto msg, org.springframework.messaging.Message<?> message, Channel channel, @Headers Map headers) {
log.info(msg);
}
@RabbitHandler()
public void receiveQueue31Msg(@Payload String msg, org.springframework.messaging.Message<?> message, Channel channel, @Headers Map headers) {
log.info(message.getPayload());
}
}
4.RabbitTemplate里的ErrorHandler
/**
* When using a direct reply-to container for request/reply operations, set an error
* handler to be invoked when a reply delivery fails (e.g. due to a late reply).
* @param replyErrorHandler the reply error handler
* @since 2.0.11
* @see #setUseDirectReplyToContainer(boolean)
*/
public void setReplyErrorHandler(ErrorHandler replyErrorHandler) {
this.replyErrorHandler = replyErrorHandler;
}
解决方案
采用上方的@RabbitListener里的errorHandler(实际是RabbitListenerErrorHandler)的方式来处理,并把spring-amqp的版本提升到2.1.7以上
之所以不采用AOP方式,是因为上面提到的异同点:
AOP拦截器,需要在执行onMessage具体逻辑的时候才会拦截到。如果消息在进入@RabbitListener的具体逻辑之前就报错了,那么无法进入拦截器里的异常处理。 而消费方配置的异常处理类都可以处理得到异常,除非异常在AOP里被拦截并且没有抛出。
比如:如果消费方配置错了,导致消息无法进入@RabbitListener的具体处理逻辑,不能成功消费或者拒绝,一直停留在Unacked状态
错误示例:
@RabbitListener(queues = {"queueName"}, errorHandler = "rabbitListenerErrorHandlerImpl")
// 参数里有个MessageProperties,导致消息转换异常,无法进入方法里
public void receiveQueue2Msg(String msg, MessageProperties messageProperties, Channel channel, @Headers Map headers) {
log.info(msg);
// 业务逻辑
}
收到消息之后,抛出异常:
Caused by: org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.eternalinfo.framework.mq.consumer.RabbitConsumer.receiveQueue2Msg(java.lang.String,org.springframework.amqp.core.MessageProperties,com.rabbitmq.client.Channel,java.util.Map)]
Bean [com.eternalinfo.framework.mq.consumer.RabbitConsumer@c088be]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:191)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:126)
... 9 common frames omittedCaused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.springframework.amqp.core.MessageProperties] for GenericMessage
版权归原作者 lzhfdxhxm 所有, 如有侵权,请联系我们删除。