0


Skywalking Kafka Tracing实现

背景

Skywalking默认场景下,Tracing对于消息队列的发送场景,无法将TraceId传递到下游消费者,但对于微服务场景下,是有大量消息队列的业务场景的,这显然无法满足业务预期。

解决方案

Skywalking的官方社区中,有用户提出了该场景问题,Skywalking在补充工具包中,提供了对Kafka的tracing支持。

skywalking kafka problem

代码实现:

<dependency><groupId>org.apache.skywalking</groupId><artifactId>apm-toolkit-kafka</artifactId><version>${skywalking.version}</version></dependency>

对于该工具包,默认情况下,是针对KafkaTemplate进行trace,即如果使用KafkaTemplate发送消息,代码层面无需做任何改动。

如果没有使用KafkaTemplate的场景,toolkit也提供的了注解的支持:

publicclassConsumerThread2extendsThread{@Overridepublicvoidrun(){Properties consumerProperties =newProperties();//...consumerProperties.put()KafkaConsumer<String,String> consumer =newKafkaConsumer<>(consumerProperties);
        consumer.subscribe(topicPattern,newNoOpConsumerRebalanceListener());while(true){if(pollAndInvoke(consumer))break;}
        consumer.close();}@KafkaPollAndInvokeprivatebooleanpollAndInvoke(KafkaConsumer<String,String> consumer){try{Thread.sleep(1000);}catch(InterruptedException e){}ConsumerRecords<String,String> records = consumer.poll(100);if(!records.isEmpty()){OkHttpClient client =newOkHttpClient.Builder().build();Request request =newRequest.Builder().url("http://localhost:8080/kafka-scenario/case/kafka-thread2-ping").build();Response response =null;try{
                response = client.newCall(request).execute();}catch(IOException e){}
            response.body().close();returntrue;}returnfalse;}}

异步线程Tracing

对于Kafka消息的发送,经常会配合异步线程池的场景使用,Tracing的基本原理是基于ThreadLocal进行实现的,那么对于异步场景,是会丢失TraceId,通常的解决方式,是需要手动将主线程的TraceId手动赋值给子线程,但这种方式需要手动代码侵入,并不友好。

幸运的是,Skywalking的toolkit中提供了对于异步线程tracing的支持。

<dependency><groupId>org.apache.skywalking</groupId><artifactId>apm-toolkit-trace</artifactId><version>${skywalking.version}</version></dependency>

推荐用法:

ExecutorService executorService =Executors.newFixedThreadPool(1);
executorService.execute(RunnableWrapper.of(newRunnable(){@Overridepublicvoidrun(){//your code}}));

或者:

@TraceCrossThreadpublicstaticclassMyCallable<String>implementsCallable<String>{@OverridepublicStringcall()throwsException{returnnull;}}...ExecutorService executorService =Executors.newFixedThreadPool(1);
    executorService.submit(newMyCallable());

PS:事实上,RunnableWrapper也是基于@TraceCrossThread实现。

相关文档:
https://skywalking.apache.org/docs/skywalking-java/v8.16.0/en/setup/service-agent/java-agent/application-toolkit-kafka/

https://skyapm.github.io/document-cn-translation-of-skywalking/zh/6.1.0/setup/service-agent/java-agent/Application-toolkit-trace-cross-thread.html

https://blog.51cto.com/knifeedge/5268667

https://blog.csdn.net/lijunwyf/article/details/107954543


本文转载自: https://blog.csdn.net/wtopps/article/details/132447230
版权归原作者 wtopps 所有, 如有侵权,请联系我们删除。

“Skywalking Kafka Tracing实现”的评论:

还没有评论