

Slow Kafka startup in Kubernetes

Background

Kafka is deployed as containers on Kubernetes. After a broker restart, clusters with a large number of partitions start up extremely slowly; at some customer sites it took several hours before the cluster was healthy again.

On top of that, the Kafka liveness probe allows at most n minutes; if the service is not up within n minutes, the pod is killed and restarted over and over, and the broker never manages to recover.
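
One way to keep the probe from killing a broker that is still recovering its logs is to give startup a much larger time budget than steady-state liveness, for example with a startupProbe. A minimal sketch, assuming the PLAINTEXT listener is on port 9092 (the port and the timings are illustrative, not taken from the original deployment):

    startupProbe:
      tcpSocket:
        port: 9092
      periodSeconds: 30
      failureThreshold: 360        # up to 3 hours for log recovery before the liveness probe takes over
    livenessProbe:
      tcpSocket:
        port: 9092
      periodSeconds: 10
      failureThreshold: 6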

Looking at the broker startup logs, we can see what it is spending its time on:

  1. 2022-11-10 15:05:43.367 [pool-6-thread-2] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-19, dir=/var/lib/kafka] Loading producer state till offset 152509 with message format version 2
  2. 2022-11-10 15:05:43.368 [pool-6-thread-2] INFO kafka.utils.Logging.info(Logging.scala:66) - [ProducerStateManager partition=test1-19] Loading producer state from snapshot file '/var/lib/kafka/test1-19/00000000000000152509.snapshot'
  3. 2022-11-10 15:05:43.371 [pool-6-thread-2] INFO kafka.utils.Logging.info(Logging.scala:66) - Completed load of Log(dir=/var/lib/kafka/test1-19, topic=test1, partition=19, highWatermark=0, lastStableOffset=0, logStartOffset=0, logEndOffset=152509) with 1 segments in 202021ms (2/80 loaded in /var/lib/kafka)
  4. 2022-11-10 15:05:43.375 [pool-6-thread-6] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-5, dir=/var/lib/kafka] Recovering unflushed segment 0
  5. 2022-11-10 15:05:43.376 [pool-6-thread-6] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-5, dir=/var/lib/kafka] Loading producer state till offset 0 with message format version 2
  6. 2022-11-10 15:05:43.416 [pool-6-thread-2] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-18, dir=/var/lib/kafka] Recovering unflushed segment 0
  7. 2022-11-10 15:05:43.417 [pool-6-thread-2] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-18, dir=/var/lib/kafka] Loading producer state till offset 0 with message format version 2
  8. 2022-11-10 15:05:50.287 [pool-6-thread-9] INFO kafka.utils.Logging.info(Logging.scala:66) - [ProducerStateManager partition=test1-16] Writing producer snapshot at offset 152586
  9. 2022-11-10 15:05:50.506 [pool-6-thread-9] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-16, dir=/var/lib/kafka] Loading producer state till offset 152586 with message format version 2
  10. 2022-11-10 15:05:50.507 [pool-6-thread-9] INFO kafka.utils.Logging.info(Logging.scala:66) - [ProducerStateManager partition=test1-16] Loading producer state from snapshot file '/var/lib/kafka/test1-16/00000000000000152586.snapshot'
  11. 2022-11-10 15:05:50.511 [pool-6-thread-9] INFO kafka.utils.Logging.info(Logging.scala:66) - Completed load of Log(dir=/var/lib/kafka/test1-16, topic=test1, partition=16, highWatermark=0, lastStableOffset=0, logStartOffset=0, logEndOffset=152586) with 1 segments in 209161ms (3/80 loaded in /var/lib/kafka)
  12. 2022-11-10 15:05:50.568 [pool-6-thread-9] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-24, dir=/var/lib/kafka] Recovering unflushed segment 0
  13. 2022-11-10 15:05:50.569 [pool-6-thread-9] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-24, dir=/var/lib/kafka] Loading producer state till offset 0 with message format version 2
  14. 2022-11-10 15:05:55.771 [pool-6-thread-1] INFO kafka.utils.Logging.info(Logging.scala:66) - [ProducerStateManager partition=test1-4] Writing producer snapshot at offset 150762
  15. 2022-11-10 15:05:56.598 [pool-6-thread-1] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-4, dir=/var/lib/kafka] Loading producer state till offset 150762 with message format version 2
  16. 2022-11-10 15:05:56.600 [pool-6-thread-1] INFO kafka.utils.Logging.info(Logging.scala:66) - [ProducerStateManager partition=test1-4] Loading producer state from snapshot file '/var/lib/kafka/test1-4/00000000000000150762.snapshot'
  17. 2022-11-10 15:05:56.604 [pool-6-thread-1] INFO kafka.utils.Logging.info(Logging.scala:66) - Completed load of Log(dir=/var/lib/kafka/test1-4, topic=test1, partition=4, highWatermark=0, lastStableOffset=0, logStartOffset=0, logEndOffset=150762) with 1 segments in 215254ms (4/80 loaded in /var/lib/kafka)
  18. 2022-11-10 15:05:56.678 [pool-6-thread-1] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-6, dir=/var/lib/kafka] Recovering unflushed segment 0
  19. 2022-11-10 15:05:56.679 [pool-6-thread-1] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-6, dir=/var/lib/kafka] Loading producer state till offset 0 with message format version 2
  20. 2022-11-10 15:06:00.545 [pool-6-thread-8] INFO kafka.utils.Logging.info(Logging.scala:66) - [ProducerStateManager partition=test1-10] Writing producer snapshot at offset 151632
  21. 2022-11-10 15:06:00.886 [pool-6-thread-5] INFO kafka.utils.Logging.info(Logging.scala:66) - [ProducerStateManager partition=test1-7] Writing producer snapshot at offset 152082
  22. 2022-11-10 15:06:01.013 [pool-6-thread-8] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-10, dir=/var/lib/kafka] Loading producer state till offset 151632 with message format version 2
  23. 2022-11-10 15:06:01.014 [pool-6-thread-8] INFO kafka.utils.Logging.info(Logging.scala:66) - [ProducerStateManager partition=test1-10] Loading producer state from snapshot file '/var/lib/kafka/test1-10/00000000000000151632.snapshot'
  24. 2022-11-10 15:06:01.019 [pool-6-thread-8] INFO kafka.utils.Logging.info(Logging.scala:66) - Completed load of Log(dir=/var/lib/kafka/test1-10, topic=test1, partition=10, highWatermark=0, lastStableOffset=0, logStartOffset=0, logEndOffset=151632) with 1 segments in 219669ms (5/80 loaded in /var/lib/kafka)

From these log lines we can see that during startup the partitions test1-19, test1-16, test1-4 and test1-10 were loaded by four different threads (pool-6-thread-2, pool-6-thread-9, pool-6-thread-1, pool-6-thread-8), and each load took roughly 200 s.

This broker has 80 partition directories to load in total, and at this point only 5 of them have finished (5/80).
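
The same "(n/80 loaded ...)" counters are also a convenient way to watch recovery progress from the outside; a quick sketch (the pod name is a placeholder):

    kubectl logs <kafka-pod> | grep -c "Completed load of Log"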

For this problem, the solution suggested upstream is to increase the number of recovery threads (num.recovery.threads.per.data.dir) to speed up log recovery; a configuration sketch follows the references below.

  • Due to the increased number of index files, on some brokers with large amount the log segments (e.g. >15K), the log loading process during the broker startup could be longer. Based on our experiment, setting the num.recovery.threads.per.data.dir to one may reduce the log loading time.

Apache Kafka

[KAFKA-6075] Kafka cannot recover after an unclean shutdown on Windows - ASF JIRA
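
For reference, the knob lives in the broker configuration (server.properties, or the corresponding environment variable of the image in use); the value below is only an illustration, and as described next, raising it only helps until the disk saturates:

    # server.properties (sketch): number of log-recovery threads per log directory, default is 1
    num.recovery.threads.per.data.dir=8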

In practice, however, we found that if the number of recovery threads is set to a very large value, say 100, the disk's I/O utilization immediately hits 100% and the container's load average also climbs to a very high level.

In other words, recovery speed is bounded by disk I/O: a node holding 700 GB of log data, loading at 70 MiB/s, needs roughly 10,000 s, i.e. close to 3 hours, to finish loading.
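
To confirm that recovery is disk-bound rather than CPU-bound, it is enough to watch device utilization while the broker is loading logs, for example:

    # run on the node (or in a debug container); %util pinned near 100% means the disk is the bottleneck
    iostat -x 1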

Looking at the relevant source code

    private def loadLogs(): Unit = {
      info(s"Loading logs from log dirs $liveLogDirs")
      val startMs = time.hiResClockMs()
      val threadPools = ArrayBuffer.empty[ExecutorService]
      val offlineDirs = mutable.Set.empty[(String, IOException)]
      val jobs = mutable.Map.empty[File, Seq[Future[_]]]
      var numTotalLogs = 0
      for (dir <- liveLogDirs) {
        val logDirAbsolutePath = dir.getAbsolutePath
        try {
          val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir) // thread pool with num.recovery.threads.per.data.dir threads
          threadPools.append(pool)
          // a marker file records whether the previous shutdown was clean
          val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
          // if the .kafka_cleanshutdown file exists, it is deleted later and the hadCleanShutdown state is remembered; log recovery can then be skipped
          if (cleanShutdownFile.exists) {
            info(s"Skipping recovery for all logs in $logDirAbsolutePath since clean shutdown file was found")
          } else {
            // log recovery itself is being performed by `Log` class during initialization
            info(s"Attempting recovery for all logs in $logDirAbsolutePath since no clean shutdown file was found")
            brokerState.newState(RecoveringFromUncleanShutdown)
          }
          // read the recoveryPoint of every topic-partition directory from the recovery-point-offset-checkpoint file
          var recoveryPoints = Map[TopicPartition, Long]()
          try {
            recoveryPoints = this.recoveryPointCheckpoints(dir).read
          } catch {
            case e: Exception =>
              warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory " +
                s"$logDirAbsolutePath, resetting the recovery checkpoint to 0", e)
          }
          var logStartOffsets = Map[TopicPartition, Long]()
          try {
            logStartOffsets = this.logStartOffsetCheckpoints(dir).read
          } catch {
            case e: Exception =>
              warn(s"Error occurred while reading log-start-offset-checkpoint file of directory " +
                s"$logDirAbsolutePath, resetting to the base offset of the first segment", e)
          }
          // main load/recovery path: loadLog is executed concurrently for all topic-partition directories
          val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(_.isDirectory)
          val numLogsLoaded = new AtomicInteger(0)
          numTotalLogs += logsToLoad.length
          val jobsForDir = logsToLoad.map { logDir =>
            val runnable: Runnable = () => {
              try {
                debug(s"Loading log $logDir")
                val logLoadStartMs = time.hiResClockMs()
                val log = loadLog(logDir, recoveryPoints, logStartOffsets)
                val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
                val currentNumLoaded = numLogsLoaded.incrementAndGet()
                info(s"Completed load of $log with ${log.numberOfSegments} segments in ${logLoadDurationMs}ms " +
                  s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)")
              } catch {
                case e: IOException =>
                  offlineDirs.add((logDirAbsolutePath, e))
                  error(s"Error while loading log dir $logDirAbsolutePath", e)
              }
            }
            runnable
          }
          jobs(cleanShutdownFile) = jobsForDir.map(pool.submit)
        } catch {
          case e: IOException =>
            offlineDirs.add((logDirAbsolutePath, e))
            error(s"Error while loading log dir $logDirAbsolutePath", e)
        }
      }
      try {
        for ((cleanShutdownFile, dirJobs) <- jobs) {
          dirJobs.foreach(_.get)
          try {
            cleanShutdownFile.delete()
          } catch {
            case e: IOException =>
              offlineDirs.add((cleanShutdownFile.getParent, e))
              error(s"Error while deleting the clean shutdown file $cleanShutdownFile", e)
          }
        }
        offlineDirs.foreach { case (dir, e) =>
          logDirFailureChannel.maybeAddOfflineLogDir(dir, s"Error while deleting the clean shutdown file in dir $dir", e)
        }
      } catch {
        case e: ExecutionException =>
          error(s"There was an error in one of the threads during logs loading: ${e.getCause}")
          throw e.getCause
      } finally {
        threadPools.foreach(_.shutdown())
      }
      info(s"Loaded $numTotalLogs logs in ${time.hiResClockMs() - startMs}ms.")
    }

As the code shows, log loading first checks each log directory for the cleanShutdownFile marker and then loads/recovers every partition directory via loadLog(logDir, recoveryPoints, logStartOffsets).

In other words, whether the previous shutdown was clean is the key to whether the broker can start up quickly.
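
A quick way to check what the next startup will do is to look for the marker file itself; with log.dirs=/var/lib/kafka as in the logs above, and assuming the marker name from Log.CleanShutdownFile:

    # present -> the broker will skip recovery for this log dir on the next start
    # missing -> every partition directory will go through recovery
    ls -l /var/lib/kafka/.kafka_cleanshutdown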

When a Pod is restarted, it goes through the normal Kubernetes termination sequence.

For details, see the cnblogs post by du-z: "k8s---pod的优雅退出流程(prestop和terminationGracePeriodSeconds)".

We found that terminationGracePeriodSeconds was still at its default of 30 s. Suspecting that this was too short for the Kafka process to shut down cleanly, we raised the grace period to 300 s and also added a preStop hook:

    # under the Kafka container:
    lifecycle:
      preStop:
        exec:
          command: ["sh", "-ce", "kill -s TERM 1; while $(kill -0 1 2>/dev/null); do sleep 1; done"]
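
Put together, a minimal sketch of where the two settings live in the pod template (container name and image are placeholders, not taken from the actual deployment):

    spec:
      terminationGracePeriodSeconds: 300
      containers:
        - name: kafka                # placeholder
          image: kafka:2.6.2         # placeholder
          lifecycle:
            preStop:
              exec:
                command: ["sh", "-ce", "kill -s TERM 1; while $(kill -0 1 2>/dev/null); do sleep 1; done"]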

After these changes the cluster still failed to perform a clean shutdown, so we went back to the logs:

  1. 2022-11-16 15:03:36.003 [TxnMarkerSenderThread-2] INFO kafka.utils.Logging.info(Logging.scala:66) - [Transaction Marker Channel Manager 2]: Starting
  2. 2022-11-16 15:03:36.003 [main] INFO kafka.utils.Logging.info(Logging.scala:66) - [TransactionCoordinator id=2] Startup complete.
  3. 2022-11-16 15:03:36.043 [ExpirationReaper-2-AlterAcls] INFO kafka.utils.Logging.info(Logging.scala:66) - [ExpirationReaper-2-AlterAcls]: Starting
  4. 2022-11-16 15:03:36.072 [/config/changes-event-process-thread] INFO kafka.utils.Logging.info(Logging.scala:66) - [/config/changes-event-process-thread]: Starting
  5. 2022-11-16 15:03:36.116 [main] INFO kafka.utils.Logging.info(Logging.scala:66) - [SocketServer brokerId=2] Starting socket server acceptors and processors
  6. 2022-11-16 15:03:36.123 [main] INFO kafka.utils.Logging.info(Logging.scala:66) - [SocketServer brokerId=2] Started data-plane acceptor and processor(s) for endpoint : ListenerName(PLAINTEXT)
  7. 2022-11-16 15:03:36.124 [main] INFO kafka.utils.Logging.info(Logging.scala:66) - [SocketServer brokerId=2] Started socket server acceptors and processors
  8. 2022-11-16 15:03:36.132 [main] INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:117) - Kafka version: 2.6.2
  9. 2022-11-16 15:03:36.133 [main] INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:118) - Kafka commitId: da65af02e5856e34
  10. 2022-11-16 15:03:36.133 [main] INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:119) - Kafka startTimeMs: 1668582216125
  11. 2022-11-16 15:03:36.136 [main] INFO kafka.utils.Logging.info(Logging.scala:66) - [KafkaServer id=2] started
  12. 2022-11-16 15:05:36.204 [main-SendThread(dol-zookeeper:12181)] WARN org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1190) - Client session timed out, have not heard from server in 12001ms for sessionid 0x100000463b00004
  13. 2022-11-16 15:05:36.205 [main-SendThread(dol-zookeeper:12181)] INFO org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1238) - Client session timed out, have not heard from server in 12001ms for sessionid 0x100000463b00004, closing socket connection and attempting reconnect
  14. 2022-11-16 15:05:54.650 [SIGTERM handler] INFO org.apache.kafka.common.utils.LoggingSignalHandler$1.invoke(LoggingSignalHandler.java:89) - Terminating process due to signal SIGTERM
  15. 2022-11-16 15:05:54.654 [kafka-shutdown-hook] INFO kafka.utils.Logging.info(Logging.scala:66) - [KafkaServer id=2] shutting down
  16. 2022-11-16 15:05:54.655 [kafka-shutdown-hook] INFO kafka.utils.Logging.info(Logging.scala:66) - [KafkaServer id=2] Starting controlled shutdown
  17. 2022-11-16 15:05:57.328 [main-SendThread(dol-zookeeper:12181)] ERROR org.apache.zookeeper.client.StaticHostProvider.resolve(StaticHostProvider.java:154) - Unable to resolve address: dol-zookeeper:12181
  18. java.net.UnknownHostException: dol-zookeeper: Temporary failure in name resolution
  19. at java.base/java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
  20. at java.base/java.net.InetAddress$PlatformNameService.lookupAllHostAddr(InetAddress.java:929)
  21. at java.base/java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1529)
  22. at java.base/java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:848)
  23. at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1519)
  24. at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1378)
  25. at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1306)
  26. at org.apache.zookeeper.client.StaticHostProvider$1.getAllByName(StaticHostProvider.java:92)
  27. at org.apache.zookeeper.client.StaticHostProvider.resolve(StaticHostProvider.java:147)
  28. at org.apache.zookeeper.client.StaticHostProvider.next(StaticHostProvider.java:375)
  29. at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1137)
  30. 2022-11-16 15:05:57.510 [main-SendThread(dol-zookeeper:12181)] WARN org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1246) - Session 0x100000463b00004 for server dol-zookeeper:12181, unexpected error, closing socket connection and attempting reconnect
  31. java.lang.IllegalArgumentException: Unable to canonicalize address dol-zookeeper:12181 because it's not resolvable
  32. at org.apache.zookeeper.SaslServerPrincipal.getServerPrincipal(SaslServerPrincipal.java:71)
  33. at org.apache.zookeeper.SaslServerPrincipal.getServerPrincipal(SaslServerPrincipal.java:39)
  34. at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1087)
  35. at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1139)
  36. 2022-11-16 15:05:57.613 [kafka-shutdown-hook] INFO kafka.utils.Logging.info(Logging.scala:66) - [ZooKeeperClient Kafka server] Waiting until connected.
  37. 2022-11-16 15:05:58.612 [main-SendThread(dol-zookeeper:12181)] ERROR org.apache.zookeeper.client.StaticHostProvider.resolve(StaticHostProvider.java:154) - Unable to resolve address: dol-zookeeper:12181
  38. java.net.UnknownHostException: dol-zookeeper
  39. at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:797)
  40. at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1519)
  41. at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1378)
  42. at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1306)
  43. at org.apache.zookeeper.client.StaticHostProvider$1.getAllByName(StaticHostProvider.java:92)
  44. at org.apache.zookeeper.client.StaticHostProvider.resolve(StaticHostProvider.java:147)
  45. at org.apache.zookeeper.client.StaticHostProvider.next(StaticHostProvider.java:375)
  46. at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1137)

The broker clearly receives the SIGTERM signal and begins shutting down through its shutdown hook, but it can no longer reach dol-zookeeper through the Service and keeps logging "Waiting until connected.". Let's look at the relevant Kafka code.

    // kafka.server.KafkaServer#shutdown
    def shutdown(): Unit = {
      try {
        info("shutting down")
        if (isStartingUp.get)
          throw new IllegalStateException("Kafka server is still starting up, cannot shut down!")
        // To ensure correct behavior under concurrent calls, we need to check `shutdownLatch` first since it gets updated
        // last in the `if` block. If the order is reversed, we could shutdown twice or leave `isShuttingDown` set to
        // `true` at the end of this method.
        if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
          CoreUtils.swallow(controlledShutdown(), this)
          brokerState.newState(BrokerShuttingDown)
          if (dynamicConfigManager != null)
            CoreUtils.swallow(dynamicConfigManager.shutdown(), this)
          // Stop socket server to stop accepting any more connections and requests.
          // Socket server will be shutdown towards the end of the sequence.
          if (socketServer != null)
            CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
          ...
          if (logManager != null)
            CoreUtils.swallow(logManager.shutdown(), this)
          if (kafkaController != null)
            CoreUtils.swallow(kafkaController.shutdown(), this)
          ...
          // Clear all reconfigurable instances stored in DynamicBrokerConfig
          config.dynamicConfig.clear()
          brokerState.newState(NotRunning)
          startupComplete.set(false)
          isShuttingDown.set(false)
          CoreUtils.swallow(AppInfoParser.unregisterAppInfo(metricsPrefix, config.brokerId.toString, metrics), this)
          shutdownLatch.countDown()
          info("shut down completed")
        }
      }
      catch {
        case e: Throwable =>
          fatal("Fatal error during KafkaServer shutdown.", e)
          isShuttingDown.set(false)
          throw e
      }
    }

The logic that closes the log files cleanly (and writes the clean-shutdown marker) lives in logManager.shutdown(), but the shutdown never gets that far: it blocks on ZooKeeper earlier, inside controlledShutdown().

    private def controlledShutdown(): Unit = {
      def node(broker: Broker): Node = broker.node(config.interBrokerListenerName)
      val socketTimeoutMs = config.controllerSocketTimeoutMs
      ... // (the definition of the inner doControlledShutdown(retries: Int) and its NetworkClient setup are elided here)
        var shutdownSucceeded: Boolean = false
        try {
          var remainingRetries = retries
          var prevController: Broker = null
          var ioException = false
          while (!shutdownSucceeded && remainingRetries > 0) {
            remainingRetries = remainingRetries - 1
            // 1. Find the controller and establish a connection to it.
            // Get the current controller info. This is to ensure we use the most recent info to issue the
            // controlled shutdown request.
            // If the controller id or the broker registration are missing, we sleep and retry (if there are remaining retries)
            zkClient.getControllerId match {
              case Some(controllerId) =>
                zkClient.getBroker(controllerId) match {
                  case Some(broker) =>
                    // if this is the first attempt, if the controller has changed or if an exception was thrown in a previous
                    // attempt, connect to the most recent controller
                    if (ioException || broker != prevController) {
                      ioException = false
                      if (prevController != null)
                        networkClient.close(node(prevController).idString)
                      prevController = broker
                      metadataUpdater.setNodes(Seq(node(prevController)).asJava)
                    }
                  case None =>
                    info(s"Broker registration for controller $controllerId is not available (i.e. the Controller's ZK session expired)")
                }
              case None =>
                info("No controller registered in ZooKeeper")
            }
            // 2. issue a controlled shutdown to the controller
            if (prevController != null) {
              try {
                if (!NetworkClientUtils.awaitReady(networkClient, node(prevController), time, socketTimeoutMs))
                  throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
                // send the controlled shutdown request
                val controlledShutdownApiVersion: Short =
                  if (config.interBrokerProtocolVersion < KAFKA_0_9_0) 0
                  else if (config.interBrokerProtocolVersion < KAFKA_2_2_IV0) 1
                  else if (config.interBrokerProtocolVersion < KAFKA_2_4_IV1) 2
                  else 3
                val controlledShutdownRequest = new ControlledShutdownRequest.Builder(
                  new ControlledShutdownRequestData()
                    .setBrokerId(config.brokerId)
                    .setBrokerEpoch(kafkaController.brokerEpoch),
                  controlledShutdownApiVersion)
                val request = networkClient.newClientRequest(node(prevController).idString, controlledShutdownRequest,
                  time.milliseconds(), true)
                val clientResponse = NetworkClientUtils.sendAndReceive(networkClient, request, time)
                val shutdownResponse = clientResponse.responseBody.asInstanceOf[ControlledShutdownResponse]
                if (shutdownResponse.error == Errors.NONE && shutdownResponse.data.remainingPartitions.isEmpty) {
                  shutdownSucceeded = true
                  info("Controlled shutdown succeeded")
                }
                else {
                  info(s"Remaining partitions to move: ${shutdownResponse.data.remainingPartitions}")
                  info(s"Error from controller: ${shutdownResponse.error}")
                }
              }
              catch {
                case ioe: IOException =>
                  ioException = true
                  warn("Error during controlled shutdown, possibly because leader movement took longer than the " +
                    s"configured controller.socket.timeout.ms and/or request.timeout.ms: ${ioe.getMessage}")
                  // ignore and try again
              }
            }
            if (!shutdownSucceeded) {
              Thread.sleep(config.controlledShutdownRetryBackoffMs)
              warn("Retrying controlled shutdown after the previous attempt failed...")
            }
          }
        }
        finally
          networkClient.close()
        shutdownSucceeded
      }
      if (startupComplete.get() && config.controlledShutdownEnable) {
        // We request the controller to do a controlled shutdown. On failure, we backoff for a configured period
        // of time and try again for a configured number of retries. If all the attempt fails, we simply force
        // the shutdown.
        info("Starting controlled shutdown")
        brokerState.newState(PendingControlledShutdown)
        val shutdownSucceeded = doControlledShutdown(config.controlledShutdownMaxRetries.intValue)
        if (!shutdownSucceeded)
          warn("Proceeding to do an unclean shutdown as all the controlled shutdown attempts failed")
      }
    }

controlledShutdown first queries ZooKeeper for the current controllerId. That request goes through retryRequestUntilConnected, so while ZooKeeper is unreachable it never returns, and the entire shutdown hook is stuck.

    def getControllerId: Option[Int] = {
      val getDataRequest = GetDataRequest(ControllerZNode.path)
      val getDataResponse = retryRequestUntilConnected(getDataRequest)
      getDataResponse.resultCode match {
        case Code.OK => ControllerZNode.decode(getDataResponse.data)
        case Code.NONODE => None
        case _ => throw getDataResponse.resultException.get
      }
    }
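
If there is any doubt about where the hook is stuck, a thread dump of the broker process makes it visible; a sketch, assuming a JDK with jstack is available in the image and Kafka runs as PID 1 (as the preStop hook above assumes):

    # should show the kafka-shutdown-hook thread waiting inside the ZooKeeper client
    jstack 1 | grep -A 20 "kafka-shutdown-hook"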

The reason the ZooKeeper address could not be resolved is that, while the pod was being restarted, Calico removed the pod's workload endpoint, so the pod could no longer make any network requests. (Testing showed this to be a bug in the Calico version in use; upgrading Calico fixed it.)

  1. 2022-11-16 03:55:26.662 [INFO][93] felix/int_dataplane.go 1447: Received *proto.WorkloadEndpointUpdate update from calculation graph msg=id:<orchestrator_id:"k8s" workload_id:"kedacom-project-namespace/dol-kafka-2" endpoint_id:"eth0" > endpoint:<state:"active" name:"cali607076d309b" profile_ids:"kns.kedacom-project-namespace" profile_ids:"ksa.kedacom-project-namespace.default" ipv4_nets:"10.244.133.146/32" >
  2. 2022-11-16 05:07:11.723 [INFO][93] felix/endpoint_mgr.go 476: Re-evaluated workload endpoint status adminUp=false failed=false known=false operUp=false status="" workloadEndpointID=proto.WorkloadEndpointID{OrchestratorId:"k8s", WorkloadId:"namespace/dol-kafka-2", EndpointId:"eth0"}
  3. 2022-11-16 05:07:11.723 [INFO][93] felix/status_combiner.go 58: Storing endpoint status update ipVersion=0x4 status="" workload=proto.WorkloadEndpointID{OrchestratorId:"k8s", WorkloadId:"namespace/dol-kafka-2", EndpointId:"eth0"}
  4. 2022-11-16 05:07:11.724 [INFO][93] felix/conntrack.go 90: Removing conntrack flows ip=10.244.133.146

Because of all this, we tried forcing remainingRetries to 0, so that the ZooKeeper lookup is skipped entirely and logManager can still shut down cleanly. The corresponding broker setting is controlled.shutdown.max.retries=0; see the sketch below.
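
As a broker-level setting this looks like the sketch below; note that controlled.shutdown.enable=false would skip the whole block guarded by config.controlledShutdownEnable in the code above, an even blunter variant of the same trade-off:

    # server.properties (sketch): do not retry the controller handshake on shutdown,
    # so the shutdown hook falls through to logManager.shutdown() even when ZooKeeper is unreachable
    controlled.shutdown.max.retries=0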

After writing test data again and restarting, the cluster came back up within 2 minutes.

    while (!shutdownSucceeded && remainingRetries > 0) {
      remainingRetries = remainingRetries - 1
      // 1. Find the controller and establish a connection to it.

Logically, the trade-off of this change is that a restarting broker no longer performs the controlled-shutdown handshake, so cluster metadata is not updated ahead of the restart; and if the restarting broker is itself the controller, it cannot use this path to notify the other brokers to re-elect a controller.

    metadataUpdater.setNodes(Seq(node(prevController)).asJava)
    ...
    val request = networkClient.newClientRequest(node(prevController).idString, controlledShutdownRequest, time.milliseconds(), true)

Since this workaround is based on personal experience, it may have unknown side effects; apply it with appropriate caution.

Summary

1. Recovery can be accelerated by tuning num.recovery.threads.per.data.dir, but the upper bound is the disk throughput.

2. Unless there is truly no alternative, shut the Kafka cluster down cleanly. An unclean shutdown, such as powering off the host, may force the broker to reload and recover all data files on the next start.

3. Increase terminationGracePeriodSeconds and add a preStop hook so that Kafka has enough time to run its shutdown-hook logic.

4. Setting controlled.shutdown.max.retries=0 works around the case where an unreachable ZooKeeper blocks the shutdown hook. (The unreachable ZooKeeper itself turned out to be a Calico version bug, fixed by upgrading Calico.)


Originally posted at https://blog.csdn.net/DB29T/article/details/127895132 (author: 嘈嘈切切错杂弹).
