0


RabbitMQ消息的链路跟踪

背景

TraceId能标记一次请求的调用链路,在我们排查问题的时候十分重要。系统引入MQ后,MQ消息默认不带TraceId,所以消息发送和处理的链路就断了。下面分享如何对业务逻辑无感的方式,将TraceId带到消费端。

难点

RabbitMQ的Message对象可以在属性上设置头信息,所以携带TraceId的位置有了,问题是怎么无感的方式设置和获取TraceId?

Spring RabbitMQ拦截器

在Spring里使用RabbitMQ本身没有拦截器,但是有一个消息处理器,可以在发送和接收消息之前对消息进行处理。里面有3个重载的方法,对原始消息进行转换。我们可以借助这个处理器,在Message对象里加上TraceId。

public interface MessagePostProcessor {

    /**
     * Change (or replace) the message.
     * @param message the message.
     * @return the message.
     * @throws AmqpException an exception.
     */
    Message postProcessMessage(Message message) throws AmqpException;

    /**
     * Change (or replace) the message and/or change its correlation data. Only applies to
     * outbound messages.
     * @param message the message.
     * @param correlation the correlation data.
     * @return the message.
     * @since 1.6.7
     */
    default Message postProcessMessage(Message message, Correlation correlation) {
        return postProcessMessage(message);
    }

    /**
     * Change (or replace) the message and/or change its correlation data. Only applies to
     * outbound messages.
     * @param message the message.
     * @param correlation the correlation data.
     * @param exchange the exchange to which the message is to be sent.
     * @param routingKey the routing key.
     * @return the message.
     * @since 2.3.4
     */
    default Message postProcessMessage(Message message, Correlation correlation, String exchange, String routingKey) {
        return postProcessMessage(message, correlation);
    }

}

发送消息时,携带TraceId

Spring默认使用RabbitTemplate来发送消息,RabbitTemplate的send方法在发送消息之前,会调用beforePublishPostProcessors来处理Message对象。beforePublishPostProcessors集合里存的就是MessagePostProcessor对象,它会在发消息之前执行:

public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message,
            boolean mandatory, @Nullable CorrelationData correlationData) throws IOException {
    // ...
    // 核心代码
    if (this.beforePublishPostProcessors != null) {
        for (MessagePostProcessor processor : this.beforePublishPostProcessors) {
            messageToUse = processor.postProcessMessage(messageToUse, correlationData, exch, rKey);
        }
    }
    // ...

    sendToRabbit(channel, exch, rKey, mandatory, messageToUse);

    // ...
}

我们要做的是在注册RabbitTemplate的时候加上处理TraceId的逻辑

@Bean
public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
    RabbitTemplate template = new RabbitTemplate();
    configurer.configure(template, connectionFactory);

    template.setBeforePublishPostProcessors(new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 这个方法没调用,调用的是下面那个方法
            return message;
        }

        @Override
        public Message postProcessMessage(Message message, Correlation correlation, String exchange, String routingKey) {
            String requestId = MDC.get(MdcConstants.REQUESTID);
            if (StringUtils.isEmpty(requestId)) {
                requestId = StringUtils.random();
            }
            message.getMessageProperties().setHeader(RabbitMQConstants.HEADER_REQUESTID, requestId);
            return message;
        }
    });
    return template;
}

消费消息时,获取TraceId

当我们通过@RabbitListener注册consumer时,Spring会通过org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#mainLoop方法,不断从consumer队列里拿到消息。在把消息交给@RabbitListener标注的对象前,也会对Message对象进行处理。这里的处理器是存在org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#afterReceivePostProcessors属性上。

private void doExecuteListener(Channel channel, Object data) {
    if (data instanceof Message) {
        Message message = (Message) data;
        if (this.afterReceivePostProcessors != null) {
            for (MessagePostProcessor processor : this.afterReceivePostProcessors) {
                message = processor.postProcessMessage(message);
                if (message == null) {
                    throw new ImmediateAcknowledgeAmqpException(
                            "Message Post Processor returned 'null', discarding message");
                }
            }
        }
        // ...

        invokeListener(channel, message);
    }
    else {
        invokeListener(channel, data);
    }
}

怎么设置SimpleMessageListenerContainer#afterReceivePostProcessors的值?

SimpleMessageListenerContainer对象是由SimpleRabbitListenerContainerFactory工厂对象创建,在创建SimpleMessageListenerContainer对象时,会把工厂里的属性拷贝过来,afterReceivePostProcessors就是通过工厂拷过来。所以我们直接设置SimpleRabbitListenerContainerFactory的afterReceivePostProcessors值就可以。

@Bean(name = {"rabbitListenerContainerFactory"})
@ConditionalOnProperty(
        prefix = "spring.rabbitmq.listener",
        name = {"type"},
        havingValue = "simple",
        matchIfMissing = true
)
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory, ObjectProvider<ContainerCustomizer<SimpleMessageListenerContainer>> simpleContainerCustomizer) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    configurer.configure(factory, connectionFactory);
    simpleContainerCustomizer.ifUnique(factory::setContainerCustomizer);

    factory.setAfterReceivePostProcessors(message -> {
        Object requestId = message.getMessageProperties().getHeader(RabbitMQConstants.HEADER_REQUESTID);
        if (StringUtils.isEmpty(requestId)) {
            requestId = StringUtils.random();
        }
        MDC.put(MdcConstants.REQUESTID, String.valueOf(requestId));
        return message;
    });
    return factory;
}

这样,我们就能在@RabbitListener的消费逻辑里拿到TraceId。

标签: rabbitmq 分布式

本文转载自: https://blog.csdn.net/xsgnzb/article/details/133385943
版权归原作者 李昂的数字之旅 所有, 如有侵权,请联系我们删除。

“RabbitMQ消息的链路跟踪”的评论:

还没有评论