背景
TraceId能标记一次请求的调用链路,在我们排查问题的时候十分重要。系统引入MQ后,MQ消息默认不带TraceId,所以消息发送和处理的链路就断了。下面分享如何对业务逻辑无感的方式,将TraceId带到消费端。
难点
RabbitMQ的Message对象可以在属性上设置头信息,所以携带TraceId的位置有了,问题是怎么无感的方式设置和获取TraceId?
Spring RabbitMQ拦截器
在Spring里使用RabbitMQ本身没有拦截器,但是有一个消息处理器,可以在发送和接收消息之前对消息进行处理。里面有3个重载的方法,对原始消息进行转换。我们可以借助这个处理器,在Message对象里加上TraceId。
public interface MessagePostProcessor {
/**
* Change (or replace) the message.
* @param message the message.
* @return the message.
* @throws AmqpException an exception.
*/
Message postProcessMessage(Message message) throws AmqpException;
/**
* Change (or replace) the message and/or change its correlation data. Only applies to
* outbound messages.
* @param message the message.
* @param correlation the correlation data.
* @return the message.
* @since 1.6.7
*/
default Message postProcessMessage(Message message, Correlation correlation) {
return postProcessMessage(message);
}
/**
* Change (or replace) the message and/or change its correlation data. Only applies to
* outbound messages.
* @param message the message.
* @param correlation the correlation data.
* @param exchange the exchange to which the message is to be sent.
* @param routingKey the routing key.
* @return the message.
* @since 2.3.4
*/
default Message postProcessMessage(Message message, Correlation correlation, String exchange, String routingKey) {
return postProcessMessage(message, correlation);
}
}
发送消息时,携带TraceId
Spring默认使用RabbitTemplate来发送消息,RabbitTemplate的send方法在发送消息之前,会调用beforePublishPostProcessors来处理Message对象。beforePublishPostProcessors集合里存的就是MessagePostProcessor对象,它会在发消息之前执行:
public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message,
boolean mandatory, @Nullable CorrelationData correlationData) throws IOException {
// ...
// 核心代码
if (this.beforePublishPostProcessors != null) {
for (MessagePostProcessor processor : this.beforePublishPostProcessors) {
messageToUse = processor.postProcessMessage(messageToUse, correlationData, exch, rKey);
}
}
// ...
sendToRabbit(channel, exch, rKey, mandatory, messageToUse);
// ...
}
我们要做的是在注册RabbitTemplate的时候加上处理TraceId的逻辑
@Bean
public RabbitTemplate rabbitTemplate(RabbitTemplateConfigurer configurer, ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate();
configurer.configure(template, connectionFactory);
template.setBeforePublishPostProcessors(new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 这个方法没调用,调用的是下面那个方法
return message;
}
@Override
public Message postProcessMessage(Message message, Correlation correlation, String exchange, String routingKey) {
String requestId = MDC.get(MdcConstants.REQUESTID);
if (StringUtils.isEmpty(requestId)) {
requestId = StringUtils.random();
}
message.getMessageProperties().setHeader(RabbitMQConstants.HEADER_REQUESTID, requestId);
return message;
}
});
return template;
}
消费消息时,获取TraceId
当我们通过@RabbitListener注册consumer时,Spring会通过org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#mainLoop方法,不断从consumer队列里拿到消息。在把消息交给@RabbitListener标注的对象前,也会对Message对象进行处理。这里的处理器是存在org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer#afterReceivePostProcessors属性上。
private void doExecuteListener(Channel channel, Object data) {
if (data instanceof Message) {
Message message = (Message) data;
if (this.afterReceivePostProcessors != null) {
for (MessagePostProcessor processor : this.afterReceivePostProcessors) {
message = processor.postProcessMessage(message);
if (message == null) {
throw new ImmediateAcknowledgeAmqpException(
"Message Post Processor returned 'null', discarding message");
}
}
}
// ...
invokeListener(channel, message);
}
else {
invokeListener(channel, data);
}
}
怎么设置SimpleMessageListenerContainer#afterReceivePostProcessors的值?
SimpleMessageListenerContainer对象是由SimpleRabbitListenerContainerFactory工厂对象创建,在创建SimpleMessageListenerContainer对象时,会把工厂里的属性拷贝过来,afterReceivePostProcessors就是通过工厂拷过来。所以我们直接设置SimpleRabbitListenerContainerFactory的afterReceivePostProcessors值就可以。
@Bean(name = {"rabbitListenerContainerFactory"})
@ConditionalOnProperty(
prefix = "spring.rabbitmq.listener",
name = {"type"},
havingValue = "simple",
matchIfMissing = true
)
public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory, ObjectProvider<ContainerCustomizer<SimpleMessageListenerContainer>> simpleContainerCustomizer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
simpleContainerCustomizer.ifUnique(factory::setContainerCustomizer);
factory.setAfterReceivePostProcessors(message -> {
Object requestId = message.getMessageProperties().getHeader(RabbitMQConstants.HEADER_REQUESTID);
if (StringUtils.isEmpty(requestId)) {
requestId = StringUtils.random();
}
MDC.put(MdcConstants.REQUESTID, String.valueOf(requestId));
return message;
});
return factory;
}
这样,我们就能在@RabbitListener的消费逻辑里拿到TraceId。
版权归原作者 李昂的数字之旅 所有, 如有侵权,请联系我们删除。