

[Note] Kafka producer asynchronous sends may still block


Problem Description

Kafka is a widely used message middleware. In Spring Boot projects, KafkaTemplate is used as the producer to send messages. Sometimes, to avoid holding up the main business flow, messages are sent asynchronously, as shown below.

    @Slf4j
    @Component
    public class KafkaSender {

        @Resource
        private KafkaTemplate<String, String> kafkaTemplate;

        public void sendAsync(String topic, String message) {
            kafkaTemplate.send(topic, message).addCallback(
                    sendResult -> log.info("Send success"),
                    e -> log.error("Send failed", e));
        }
    }

The assumption was that with asynchronous sending the main business flow could never be affected. In practice, however, it turned out that on the very first send, if the connection to the Kafka broker fails, the main thread calling sendAsync() blocks for a long time. This was unexpected.

Cause Analysis

Tracing the source code shows that when the Kafka producer sends its first message, it tries to fetch the cluster metadata (Metadata) from the broker (see the waitOnMetadata() method of KafkaProducer). If the broker connection fails, the producer blocks there, retrying in a loop until it times out (the timeout is defined by max.block.ms).

    /**
     * Wait for cluster metadata including partitions for the given topic to be available.
     * @param topic The topic we want metadata for
     * @param partition A specific partition expected to exist in metadata, or null if there's no preference
     * @param nowMs The current time in ms
     * @param maxWaitMs The maximum time in ms for waiting on the metadata
     * @return The cluster containing topic metadata and the amount of time we waited in ms
     * @throws TimeoutException if metadata could not be refreshed within {@code max.block.ms}
     * @throws KafkaException for all Kafka-related exceptions, including the case where this method is called after producer close
     */
    private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) throws InterruptedException {
        // add topic to metadata topic list if it is not there already and reset expiry
        Cluster cluster = metadata.fetch();

        if (cluster.invalidTopics().contains(topic))
            throw new InvalidTopicException(topic);

        metadata.add(topic, nowMs);

        Integer partitionsCount = cluster.partitionCountForTopic(topic);
        // Return cached metadata if we have it, and if the record's partition is either undefined
        // or within the known partition range
        if (partitionsCount != null && (partition == null || partition < partitionsCount))
            return new ClusterAndWaitTime(cluster, 0);

        long remainingWaitMs = maxWaitMs;
        long elapsed = 0;

        // Issue metadata requests until we have metadata for the topic and the requested partition,
        // or until maxWaitTimeMs is exceeded. This is necessary in case the metadata
        // is stale and the number of partitions for this topic has increased in the meantime.
        do {
            if (partition != null) {
                log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
            } else {
                log.trace("Requesting metadata update for topic {}.", topic);
            }
            metadata.add(topic, nowMs + elapsed);
            int version = metadata.requestUpdateForTopic(topic);
            sender.wakeup();
            try {
                metadata.awaitUpdate(version, remainingWaitMs);
            } catch (TimeoutException ex) {
                // Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
                throw new TimeoutException(
                        String.format("Topic %s not present in metadata after %d ms.", topic, maxWaitMs));
            }
            cluster = metadata.fetch();
            elapsed = time.milliseconds() - nowMs;
            if (elapsed >= maxWaitMs) {
                throw new TimeoutException(partitionsCount == null ?
                        String.format("Topic %s not present in metadata after %d ms.",
                                topic, maxWaitMs) :
                        String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
                                partition, topic, partitionsCount, maxWaitMs));
            }
            metadata.maybeThrowExceptionForTopic(topic);
            remainingWaitMs = maxWaitMs - elapsed;
            partitionsCount = cluster.partitionCountForTopic(topic);
        } while (partitionsCount == null || (partition != null && partition >= partitionsCount));

        return new ClusterAndWaitTime(cluster, elapsed);
    }

In other words, the Kafka producer must obtain the Metadata before it can send a message. With asynchronous sending, the transmission of the message itself is non-blocking, but fetching the Metadata is blocking. If the Metadata can never be obtained, for example because the broker connection fails or the topic has not been created, the main thread blocks for a long time, as the sketch below illustrates.
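The following is a minimal sketch of that behavior, assuming a Spring Boot setup where no broker is reachable at the configured address; the class name FirstSendBlockingDemo and the topic name demo-topic are hypothetical, introduced only for illustration. It measures how long the supposedly "asynchronous" first send keeps the caller waiting.

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;

    import javax.annotation.Resource;

    @Slf4j
    @Component
    public class FirstSendBlockingDemo {

        @Resource
        private KafkaTemplate<String, String> kafkaTemplate;

        public void demo() {
            long start = System.currentTimeMillis();
            try {
                // The first send triggers waitOnMetadata(); with an unreachable broker
                // it keeps retrying until max.block.ms expires.
                kafkaTemplate.send("demo-topic", "hello");
            } catch (Exception e) {
                // Depending on the client version, the failure may surface here
                // or via the returned future / callback instead.
                log.error("Send failed", e);
            }
            // With the default settings this prints a value close to 60000 ms.
            log.info("send() returned after {} ms", System.currentTimeMillis() - start);
        }
    }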

Solution

The fix is straightforward. If sending to Kafka is not business-critical, a dedicated thread pool can be created just for the send work, so that sendAsync() is guaranteed to execute asynchronously and the main business flow is never held up. Note that the pool size and the work-queue length must be reasonably bounded to avoid an OOM when too many blocked tasks pile up; depending on the situation, DiscardPolicy can be chosen as the rejection policy (see the sketch below).
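A minimal sketch of that idea follows; the pool size of 2, the queue capacity of 1000, and the keep-alive time are illustrative values only, to be tuned for the actual workload.

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Component;

    import javax.annotation.Resource;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    @Slf4j
    @Component
    public class KafkaSender {

        @Resource
        private KafkaTemplate<String, String> kafkaTemplate;

        // Bounded pool and queue so that blocked send tasks cannot pile up without limit;
        // DiscardPolicy silently drops new tasks once the queue is full, which is acceptable
        // here because sending to Kafka is assumed to be non-critical.
        private final ExecutorService sendExecutor = new ThreadPoolExecutor(
                2, 2, 60L, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1000),
                new ThreadPoolExecutor.DiscardPolicy());

        public void sendAsync(String topic, String message) {
            // Even if the worker thread blocks on Metadata, the caller returns immediately.
            sendExecutor.execute(() ->
                    kafkaTemplate.send(topic, message).addCallback(
                            sendResult -> log.info("Send success"),
                            e -> log.error("Send failed", e)));
        }
    }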

In addition, max.block.ms can be set explicitly to cap the maximum time spent blocking on Metadata (the default is 60000 ms):

    spring:
      kafka:
        producer:
          properties:
            max.block.ms: 1000
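For a plain KafkaProducer used outside Spring, the same property can be set when the producer is built. A minimal sketch, assuming a broker at localhost:9092 (the address, the 1000 ms value, and the class name ProducerFactoryDemo are placeholders):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ProducerFactoryDemo {

        public static KafkaProducer<String, String> newProducer() {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // Caps blocking on metadata (and on a full buffer) at 1 second
            // instead of the default 60000 ms.
            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
            return new KafkaProducer<>(props);
        }
    }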

In fact, during asynchronous sending, besides blocking on missing Metadata, the call can also block because the producer's message buffer pool is full (see: Kafka Producer 异步发送消息居然也会阻塞?). The timeout for both kinds of blocking is defined by max.block.ms; a small sketch of the second case follows.
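The sketch below is hypothetical: the tiny 64 KB buffer.memory, the topic name, and the broker address are artificial values chosen only so that the accumulator fills up quickly and the blocking becomes visible.

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class BufferFullDemo {

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "65536");  // 64 KB, deliberately tiny
            props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                for (int i = 0; i < 100_000; i++) {
                    // Once the accumulator is full (e.g. the broker is slow or unreachable),
                    // this "async" call blocks for up to max.block.ms waiting for free space.
                    producer.send(new ProducerRecord<>("demo-topic", "payload-" + i));
                }
            }
        }
    }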

Summary

The Kafka producer's asynchronous send methods (e.g. kafkaTemplate.send() in Spring Boot) look asynchronous but can in fact block. Because the Metadata must be fetched before a message can be sent, a persistent failure to obtain it (possible causes include a failed broker connection or a topic that has not been created) leads to a long block. This contradicts the usual intuition about "asynchronous" sending and deserves particular attention.

Tags: kafka, java, middleware

Reposted from: https://blog.csdn.net/weixin_43952265/article/details/128754720
Copyright belongs to the original author, Season-0209. If there is any infringement, please contact us for removal.
