0


RabbitMQ,手动ACK情况下,消费消息的时候出现异常,如何手动ACK或NACK

环境信息

Spring Boot:2.0.8.RELEASE

Spring Cloud:2.0.4.RELEASE

RabbitMQ,用的是spring-boot-starter-amqp:2.0.8.RELEASE

问题背景

RabbitMQ,使用的是消息手动确认模式

spring.rabbitmq.listener.simple.acknowledge-mode=manual

在处理消息出现异常之后,根据情况手动进行ACK或者NACK处理。

常用异常处理机制

使用AOP拦截消息处理方法,统一进行日志的打印和异常的处理:

自定义注解@MqConsumer

可以标记在类上或者方法上

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface MqConsumer {
    String value() default "";
}

AOP拦截器RabbitInterceptor

拦截器,拦截使用@MqConsumer的类或者方法。

在拦截器里打印了消息内容、耗时等,并根据情况手动ACK或者NACK

这里也可以进行异常下的重试或者存表等处理

package com.xxx.mq.interceptor;

import com.rabbitmq.client.Channel;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.amqp.core.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Component;

@Aspect
@Component
public class RabbitInterceptor {
    private static final Log log = LogFactory.getLog(RabbitInterceptor.class);

    @Value("${spring.rabbitmq.listener.simple.acknowledge-mode:auto}")
    private String acknowledgeMode;

    @Pointcut("@within(com.xxx.mq.support.MqConsumer) || @annotation(com.xxx.mq.support.MqConsumer)")
    public void consumerPointCut() {
    }

    @Around("consumerPointCut()")
    public Object consumerListenerAround(ProceedingJoinPoint joinPoint) throws Throwable {
        String className = joinPoint.getTarget().getClass().getSimpleName();
        String methodName = joinPoint.getSignature().getName();
        Object[] args = joinPoint.getArgs();
        Channel channel = null;
        Message amqpMessage = null;
        String correlationId = "";
        long deliveryTag = -1L;
        for (Object arg : args) {
            if (arg instanceof Message) {
                amqpMessage = Message.class.cast(arg);
                deliveryTag = amqpMessage.getMessageProperties().getDeliveryTag();
                correlationId = amqpMessage.getMessageProperties().getCorrelationId();
            } else if (arg instanceof org.springframework.messaging.Message<?>) {
                org.springframework.messaging.Message message = org.springframework.messaging.Message.class.cast(arg);
                deliveryTag = (long) message.getHeaders().get("amqp_deliveryTag");
                correlationId = (String) message.getHeaders().get("amqp_correlationId");
            } else if (arg instanceof Channel) {
                channel = (Channel) arg;
            }
        }
        if (log.isInfoEnabled()) {
            log.info("MQ_HDL > {}.{}(), parameters: {}", className, methodName, args);
        }
        long start = System.nanoTime();
        Object obj = null;

        if ("auto".equalsIgnoreCase(acknowledgeMode)) {
            obj = joinPoint.proceed(args);
        } else {
            if (channel == null) {
                throw new RuntimeException("手动确认消息,方法参数需要有Channel");
            }
            try {
                obj = joinPoint.proceed(args);
                // 手动签收
                channel.basicAck(deliveryTag, false);
            } catch (Exception e) {
                // 是否重新投递到队列
                channel.basicNack(deliveryTag, false, false);
                throw e;
            }
        }
        if (log.isInfoEnabled()) {
            log.info("MQ_HDL < [" + (System.nanoTime() - start) / 1000000 + "]ms");
        }
        return obj;
    }

}

消费方RabbitConsumer

在消费方里,类上标记了@MqConsumer注解,在方法上配置了监听队列的名称,以及异常处理类rabbitListenerErrorHandlerImpl

@Component
@MqConsumer
public class RabbitConsumer {
    Log log = LogFactory.getLog(RabbitConsumer.class);

    /**
     * 这里按照实际情况配置,请求参数
     *
     * @param msg
     * @param amqpMessage
     * @param channel
     */
    @RabbitListener(queues = {"${tfb.rabbitmq.properties.configs[0].queues[0].name}"}, errorHandler = "rabbitListenerErrorHandlerImpl")
    public void receiveQueue1Msg(@Payload String msg, Message amqpMessage, Channel channel) {
    // 业务逻辑
    // 模拟异常
    // int i = 1/0;        

    }

}

异常处理类 RabbitListenerErrorHandlerImpl

这个类是实现了RabbitListenerErrorHandler,出现异常之后,handle方法的参数里可以获取得到amqp的Message,org.springframework.messaging.Message<?> message, ListenerExecutionFailedException exception,

从而根据具体场景进行处理,比如说异常日志的打印,重试或者存表处理

@Component
public class RabbitListenerErrorHandlerImpl extends ConditionalRejectingErrorHandler implements RabbitListenerErrorHandler {
    private static final Log log = LogFactory.getLog(RabbitListenerErrorHandlerImpl.class);
    private final FatalExceptionStrategy exceptionStrategy = new DefaultExceptionStrategy();

    @Override
    public Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message, ListenerExecutionFailedException exception) throws Exception {
        log.error("Execution of Rabbit message listener failed. amqpMessag[{}]", amqpMessage, exception);

        // 目前这里拿不到channel,但是从spring-amqp 2.1.7开始,可以从message的header里获取到。有了channel就能手动nack或ack
        if (!this.causeChainContainsARADRE(exception) && this.exceptionStrategy.isFatal(exception)) {
            ThreadCacheUtil.cleanAllThreadCache();
            throw new AmqpRejectAndDontRequeueException("Error Handler converted exception to fatal", exception);
        }
        ThreadCacheUtil.cleanAllThreadCache();

        return null;
    }
}

说明

    AOP拦截器和消费方配置的异常处理类,有功能重复的地方,比如异常情况下的处理。可以根据实际情况选择其中一个,或者AOP里不处理异常,在异常处理类那边再统一处理。

不同点

    AOP拦截器,需要在执行onMessage具体逻辑的时候才会拦截到。如果消息在进入onMessage具体逻辑之前就报错了,那么无法进入拦截器里的异常处理。

    而消费方配置的异常处理类都可以处理得到异常,除非异常在AOP里被拦截并且没有抛出。

SpringBoot结合RabbitMQ异常处理,有多种方式:

1.AbstractRabbitListenerContainerFactory里的ErrorHandler

#setErrorHandler(ErrorHandler errorHandler)

这里的ErrorHandler是Spring框架异常处理接口,参数只有一个简单的Throwable t,因此无法获取到一些具体的内容,比如消息体等,也无法对消息进行持久化、手动ACK或NACK。

    /**
     * @param errorHandler The error handler.
     * @see AbstractMessageListenerContainer#setErrorHandler(org.springframework.util.ErrorHandler)
     */
    public void setErrorHandler(ErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

AbstractRabbitListenerContainerFactory是抽象类,是用于创建消息监听容器的,有两个实现类:

SimpleRabbitListenerContainerFactory
DirectRabbitListenerContainerFactory

2.AbstractMessageListenerContainer里的ErrorHandler

AbstractMessageListenerContainer是个抽象类,常见的实现类有以下两个:
SimpleMessageListenerContainer
DirectMessageListenerContainer

这里的ErrorHandler和AbstractRabbitListenerContainerFactory里的是一样的。

参数只有一个简单的Throwable t,因此无法获取到一些具体的内容,比如消息体等,也无法对消息进行持久化、手动ACK或NACK。

例子:


  @Bean
  public ErrorHandler errorHandler() {
    // 自定义异常实现类
    return new MqErrorHandler();
  }

  @Bean
  SimpleMessageListenerContainer containerReset(ConnectionFactory connectionFactory,
      MessageListenerAdapter listenerAdapter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setDefaultRequeueRejected(false);
    container.setErrorHandler(errorHandler());
    container.setMessageConverter(jsonConverter());
    container.setQueueNames(getQueueAlert());
    container.setMessageListener(listenerAlertAdapter);
    return container;
  }

3.@RabbitListener里的errorHandler(实际是RabbitListenerErrorHandler)

@RabbitListener


    /**
     * Set an {@link org.springframework.amqp.rabbit.listener.RabbitListenerErrorHandler}
     * to invoke if the listener method throws an exception.
     * @return the error handler.
     * @since 2.0
     */
    String errorHandler() default "";

这里的errorHandler是org.springframework.amqp.rabbit.listener.RabbitListenerErrorHandler

/**
 * An error handler which is called when a {code @RabbitListener} method
 * throws an exception. This is invoked higher up the stack than the
 * listener container's error handler.
 *
 * @author Gary Russell
 * @since 2.0
 *
 */
@FunctionalInterface
public interface RabbitListenerErrorHandler {

    /**
     * Handle the error. If an exception is not thrown, the return value is returned to
     * the sender using normal {@code replyTo/@SendTo} semantics.
     * @param amqpMessage the raw message received.
     * @param message the converted spring-messaging message.
     * @param exception the exception the listener threw, wrapped in a
     * {@link ListenerExecutionFailedException}.
     * @return the return value to be sent to the sender.
     * @throws Exception an exception which may be the original or different.
     */
    Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message,
            ListenerExecutionFailedException exception) throws Exception;

}
handleError(Message amqpMessage, org.springframework.messaging.Message<?> message,
      ListenerExecutionFailedException exception)

参数有原生的message,还有转换后的spring-messaging的message,还有包装后的ListenerExecutionFailedException异常信息,于是可以根据这些信息进行消息的处理,包括收到的消息内容是什么,异常信息是什么,以及在这里进行消息的持久化等操作。

如果需要对消息进行手动ACK或NACK,那么就需要获取到Channel才能进行,Channel是和MQ连接的通道,deliveryTag可以从message里获取到。

源码:

Channel接口里的basicAck和basicNack方法。

    /**
     * Acknowledge one or several received
     * messages. Supply the deliveryTag from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
     * or {@link com.rabbitmq.client.AMQP.Basic.Deliver} method
     * containing the received message being acknowledged.
     * @see com.rabbitmq.client.AMQP.Basic.Ack
     * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
     * @param multiple true to acknowledge all messages up to and
     * including the supplied delivery tag; false to acknowledge just
     * the supplied delivery tag.
     * @throws java.io.IOException if an error is encountered
     */
    void basicAck(long deliveryTag, boolean multiple) throws IOException;

    /**
     * Reject one or several received messages.
     *
     * Supply the <code>deliveryTag</code> from the {@link com.rabbitmq.client.AMQP.Basic.GetOk}
     * or {@link com.rabbitmq.client.AMQP.Basic.GetOk} method containing the message to be rejected.
     * @see com.rabbitmq.client.AMQP.Basic.Nack
     * @param deliveryTag the tag from the received {@link com.rabbitmq.client.AMQP.Basic.GetOk} or {@link com.rabbitmq.client.AMQP.Basic.Deliver}
     * @param multiple true to reject all messages up to and including
     * the supplied delivery tag; false to reject just the supplied
     * delivery tag.
     * @param requeue true if the rejected message(s) should be requeued rather
     * than discarded/dead-lettered
     * @throws java.io.IOException if an error is encountered
     */
    void basicNack(long deliveryTag, boolean multiple, boolean requeue)
            throws IOException;

这里需要注意,spring-amqp版本在2.1.7以前,这里的org.springframework.messaging.Message<?> message里无法获取到Channel,因此无法进行手动ACK、NACK处理,详见Stack Overflow和官方github里的升级记录:

spring boot - How to requeue or reject in RabbitListenerErrorHandler on MANUAL ack mode? - Stack Overflow

Add AmqpHeaders.CHANNEL in error hander · garyrussell/spring-amqp@b314a5f (github.com)

坑的是,本次使用的环境信息里,spring-boot-starter-amqp:2.0.8.RELEASE里包含的spring-amqp版本是2.0.11.RELEASE,不支持!!

实例:

RabbitListenerErrorHandlerImpl.java

@Component
public class RabbitListenerErrorHandlerImpl extends ConditionalRejectingErrorHandler implements RabbitListenerErrorHandler {
    private static final Log log = LogFactory.getLog(RabbitListenerErrorHandlerImpl.class);
    private final FatalExceptionStrategy exceptionStrategy = new DefaultExceptionStrategy();

    @Override
    public Object handleError(Message amqpMessage, org.springframework.messaging.Message<?> message, ListenerExecutionFailedException exception) throws Exception {
        log.error("Execution of Rabbit message listener failed. amqpMessag[{}]", amqpMessage, exception);
        

        // 这里可以根据异常的类型等进行精细的判断,决定是否需要ack,以及是否需要重新投递
        //if (!this.causeChainContainsARADRE(exception) && this.exceptionStrategy.isFatal(exception)) {
                    
        //}
        message.getHeaders().get(AmqpHeaders.CHANNEL, Channel.class)
                .basicReject(message.getHeaders().get(AmqpHeaders.DELIVERY_TAG, Long.class), false);

}

RabbitConsumer.java

@Component
@MqConsumer
@RabbitListener(queues = {"queueName}"}, errorHandler = "rabbitListenerErrorHandlerImpl")
public class RabbitConsumer {
    Log log = LogFactory.getLog(RabbitConsumer.class);

    @RabbitHandler(isDefault = true)
    public void receiveQueue3Msg(@Payload MqRequestDto msg, org.springframework.messaging.Message<?> message, Channel channel, @Headers Map headers) {
        log.info(msg);
    }

    @RabbitHandler()
    public void receiveQueue31Msg(@Payload String msg, org.springframework.messaging.Message<?> message, Channel channel, @Headers Map headers) {
        log.info(message.getPayload());
    }

}

4.RabbitTemplate里的ErrorHandler


    /**
     * When using a direct reply-to container for request/reply operations, set an error
     * handler to be invoked when a reply delivery fails (e.g. due to a late reply).
     * @param replyErrorHandler the reply error handler
     * @since 2.0.11
     * @see #setUseDirectReplyToContainer(boolean)
     */
    public void setReplyErrorHandler(ErrorHandler replyErrorHandler) {
        this.replyErrorHandler = replyErrorHandler;
    }

解决方案

    采用上方的@RabbitListener里的errorHandler(实际是RabbitListenerErrorHandler)的方式来处理,并把spring-amqp的版本提升到2.1.7以上

    之所以不采用AOP方式,是因为上面提到的异同点:
    AOP拦截器,需要在执行onMessage具体逻辑的时候才会拦截到。如果消息在进入@RabbitListener的具体逻辑之前就报错了,那么无法进入拦截器里的异常处理。

    而消费方配置的异常处理类都可以处理得到异常,除非异常在AOP里被拦截并且没有抛出。
    比如:如果消费方配置错了,导致消息无法进入@RabbitListener的具体处理逻辑,不能成功消费或者拒绝,一直停留在Unacked状态

错误示例:

    @RabbitListener(queues = {"queueName"}, errorHandler = "rabbitListenerErrorHandlerImpl")
    // 参数里有个MessageProperties,导致消息转换异常,无法进入方法里
    public void receiveQueue2Msg(String msg, MessageProperties messageProperties, Channel channel, @Headers Map headers) {
        log.info(msg);
        // 业务逻辑
    }

收到消息之后,抛出异常:

Caused by: org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.eternalinfo.framework.mq.consumer.RabbitConsumer.receiveQueue2Msg(java.lang.String,org.springframework.amqp.core.MessageProperties,com.rabbitmq.client.Channel,java.util.Map)]
Bean [com.eternalinfo.framework.mq.consumer.RabbitConsumer@c088be]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:191)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:126)
... 9 common frames omitted

Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.springframework.amqp.core.MessageProperties] for GenericMessage


本文转载自: https://blog.csdn.net/lzhfdxhxm/article/details/125643854
版权归原作者 lzhfdxhxm 所有, 如有侵权,请联系我们删除。

“RabbitMQ,手动ACK情况下,消费消息的时候出现异常,如何手动ACK或NACK”的评论:

还没有评论