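Background
Kafka is deployed in containers on K8S. After a Kafka restart, clusters with a large number of partitions can start up extremely slowly; at some sites it took several hours for the cluster to return to normal.
In addition, the Kafka liveness probe allows at most n minutes. If the service is not up within n minutes, Kafka is restarted over and over and never recovers.
The Kafka startup log shows the following: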
2022-11-10 15:05:43.367 [pool-6-thread-2] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-19, dir=/var/lib/kafka] Loading producer state till offset 152509 with message format version 2
2022-11-10 15:05:43.368 [pool-6-thread-2] INFO kafka.utils.Logging.info(Logging.scala:66) - [ProducerStateManager partition=test1-19] Loading producer state from snapshot file '/var/lib/kafka/test1-19/00000000000000152509.snapshot'
2022-11-10 15:05:43.371 [pool-6-thread-2] INFO kafka.utils.Logging.info(Logging.scala:66) - Completed load of Log(dir=/var/lib/kafka/test1-19, topic=test1, partition=19, highWatermark=0, lastStableOffset=0, logStartOffset=0, logEndOffset=152509) with 1 segments in 202021ms (2/80 loaded in /var/lib/kafka)
2022-11-10 15:05:43.375 [pool-6-thread-6] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-5, dir=/var/lib/kafka] Recovering unflushed segment 0
2022-11-10 15:05:43.376 [pool-6-thread-6] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-5, dir=/var/lib/kafka] Loading producer state till offset 0 with message format version 2
2022-11-10 15:05:43.416 [pool-6-thread-2] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-18, dir=/var/lib/kafka] Recovering unflushed segment 0
2022-11-10 15:05:43.417 [pool-6-thread-2] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-18, dir=/var/lib/kafka] Loading producer state till offset 0 with message format version 2
2022-11-10 15:05:50.287 [pool-6-thread-9] INFO kafka.utils.Logging.info(Logging.scala:66) - [ProducerStateManager partition=test1-16] Writing producer snapshot at offset 152586
2022-11-10 15:05:50.506 [pool-6-thread-9] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-16, dir=/var/lib/kafka] Loading producer state till offset 152586 with message format version 2
2022-11-10 15:05:50.507 [pool-6-thread-9] INFO kafka.utils.Logging.info(Logging.scala:66) - [ProducerStateManager partition=test1-16] Loading producer state from snapshot file '/var/lib/kafka/test1-16/00000000000000152586.snapshot'
2022-11-10 15:05:50.511 [pool-6-thread-9] INFO kafka.utils.Logging.info(Logging.scala:66) - Completed load of Log(dir=/var/lib/kafka/test1-16, topic=test1, partition=16, highWatermark=0, lastStableOffset=0, logStartOffset=0, logEndOffset=152586) with 1 segments in 209161ms (3/80 loaded in /var/lib/kafka)
2022-11-10 15:05:50.568 [pool-6-thread-9] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-24, dir=/var/lib/kafka] Recovering unflushed segment 0
2022-11-10 15:05:50.569 [pool-6-thread-9] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-24, dir=/var/lib/kafka] Loading producer state till offset 0 with message format version 2
2022-11-10 15:05:55.771 [pool-6-thread-1] INFO kafka.utils.Logging.info(Logging.scala:66) - [ProducerStateManager partition=test1-4] Writing producer snapshot at offset 150762
2022-11-10 15:05:56.598 [pool-6-thread-1] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-4, dir=/var/lib/kafka] Loading producer state till offset 150762 with message format version 2
2022-11-10 15:05:56.600 [pool-6-thread-1] INFO kafka.utils.Logging.info(Logging.scala:66) - [ProducerStateManager partition=test1-4] Loading producer state from snapshot file '/var/lib/kafka/test1-4/00000000000000150762.snapshot'
2022-11-10 15:05:56.604 [pool-6-thread-1] INFO kafka.utils.Logging.info(Logging.scala:66) - Completed load of Log(dir=/var/lib/kafka/test1-4, topic=test1, partition=4, highWatermark=0, lastStableOffset=0, logStartOffset=0, logEndOffset=150762) with 1 segments in 215254ms (4/80 loaded in /var/lib/kafka)
2022-11-10 15:05:56.678 [pool-6-thread-1] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-6, dir=/var/lib/kafka] Recovering unflushed segment 0
2022-11-10 15:05:56.679 [pool-6-thread-1] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-6, dir=/var/lib/kafka] Loading producer state till offset 0 with message format version 2
2022-11-10 15:06:00.545 [pool-6-thread-8] INFO kafka.utils.Logging.info(Logging.scala:66) - [ProducerStateManager partition=test1-10] Writing producer snapshot at offset 151632
2022-11-10 15:06:00.886 [pool-6-thread-5] INFO kafka.utils.Logging.info(Logging.scala:66) - [ProducerStateManager partition=test1-7] Writing producer snapshot at offset 152082
2022-11-10 15:06:01.013 [pool-6-thread-8] INFO kafka.utils.Logging.info(Logging.scala:66) - [Log partition=test1-10, dir=/var/lib/kafka] Loading producer state till offset 151632 with message format version 2
2022-11-10 15:06:01.014 [pool-6-thread-8] INFO kafka.utils.Logging.info(Logging.scala:66) - [ProducerStateManager partition=test1-10] Loading producer state from snapshot file '/var/lib/kafka/test1-10/00000000000000151632.snapshot'
2022-11-10 15:06:01.019 [pool-6-thread-8] INFO kafka.utils.Logging.info(Logging.scala:66) - Completed load of Log(dir=/var/lib/kafka/test1-10, topic=test1, partition=10, highWatermark=0, lastStableOffset=0, logStartOffset=0, logEndOffset=151632) with 1 segments in 219669ms (5/80 loaded in /var/lib/kafka)
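The log shows that during startup the four partitions test1-19, test1-16, test1-4, and test1-10 were loaded by four threads (pool-6-thread-2, pool-6-thread-9, pool-6-thread-1, pool-6-thread-8), and each load took roughly 200 to 220 s.
There are 80 partition logs to load from /var/lib/kafka, and 5 of them are done so far (5/80).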
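To follow the loading progress without reading every line by hand, the duration and the loaded/total counter can be pulled out of the "Completed load of" messages. The following is a minimal sketch: LoadProgress and parseLoadProgress are hypothetical names introduced here, and the regular expression assumes the message layout shown in the log above.

```scala
// Progress extracted from one "Completed load of Log(...)" line.
final case class LoadProgress(durationMs: Long, loaded: Int, total: Int)

// Matches the tail of the message, e.g. "in 219669ms (5/80 loaded in /var/lib/kafka)".
val CompletedLoad = """in (\d+)ms \((\d+)/(\d+) loaded in""".r

// Returns None for lines that are not "Completed load of" messages.
def parseLoadProgress(line: String): Option[LoadProgress] =
  CompletedLoad.findFirstMatchIn(line).map { m =>
    LoadProgress(m.group(1).toLong, m.group(2).toInt, m.group(3).toInt)
  }
```

Applied to the last line of the log above, this yields a duration of 219669 ms with 5 of 80 logs loaded.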
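For this problem, the officially suggested remedy is to tune the number of recovery threads (num.recovery.threads.per.data.dir) in order to speed up recovery. The Kafka documentation puts it as follows (note that the quoted passage itself suggests a value of one for brokers with a very large number of segments):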
- Due to the increased number of index files, on some brokers with large amount the log segments (e.g. >15K), the log loading process during the broker startup could be longer. Based on our experiment, setting the num.recovery.threads.per.data.dir to one may reduce the log loading time.
Apache Kafka
[KAFKA-6075] Kafka cannot recover after an unclean shutdown on Windows - ASF JIRA
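In practice, however, we found that setting the recovery thread count to a very large value, such as 100, drives disk I/O utilization (ioutil) straight to 100%, and the container's load also stays very high.
Recovery speed is tied directly to disk I/O: a node holding 700 G of data, loading at 70 MiB/s, needs about 3 hours (10,000 s) to finish loading.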
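The estimate can be reproduced with a few lines of arithmetic. This is only a sketch: estimateLoadSeconds and its parameters are illustrative names, not Kafka settings, and it assumes the 700 G figure means GiB and that disk throughput is the only bottleneck.

```scala
// Rough lower bound on load time when disk throughput is the limiting factor.
// dataGiB and throughputMiBps are illustrative inputs, not Kafka configuration.
def estimateLoadSeconds(dataGiB: Double, throughputMiBps: Double): Double = {
  require(throughputMiBps > 0, "throughput must be positive")
  dataGiB * 1024 / throughputMiBps
}

val seconds = estimateLoadSeconds(700, 70) // 10240 seconds
val hours = seconds / 3600                 // a little under 3 hours
```

Adding more recovery threads does not change this bound once the disk is saturated, which matches the behaviour observed with 100 threads.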
Looking at the relevant source code:
private def loadLogs(): Unit = {
info(s"Loading logs from log dirs $liveLogDirs")
val startMs = time.hiResClockMs()
val threadPools = ArrayBuffer.empty[ExecutorService]
val offlineDirs = mutable.Set.empty[(String, IOException)]
val jobs = mutable.Map.empty[File, Seq[Future[_]]]
var numTotalLogs = 0
for (dir <- liveLogDirs) {
val logDirAbsolutePath = dir.getAbsolutePath
try {
val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir) // thread pool with num.recovery.threads.per.data.dir threads
threadPools.append(pool)
// A marker file records whether the previous shutdown was clean.
val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
// If the .kafka_cleanshutdown file exists, log recovery is skipped for this directory; the file is deleted once loading has finished.
if (cleanShutdownFile.exists) {
info(s"Skipping recovery for all logs in $logDirAbsolutePath since clean shutdown file was found")
} else {
// log recovery itself is being performed by `Log` class during initialization
info(s"Attempting recovery for all logs in $logDirAbsolutePath since no clean shutdown file was found")
brokerState.newState(RecoveringFromUncleanShutdown)
}
// Read the recoveryPoint of every topic-partition directory from the recovery-point-offset-checkpoint file
var recoveryPoints = Map[TopicPartition, Long]()
try {
recoveryPoints = this.recoveryPointCheckpoints(dir).read
} catch {
case e: Exception =>
warn(s"Error occurred while reading recovery-point-offset-checkpoint file of directory " +
s"$logDirAbsolutePath, resetting the recovery checkpoint to 0", e)
}
var logStartOffsets = Map[TopicPartition, Long]()
try {
logStartOffsets = this.logStartOffsetCheckpoints(dir).read
} catch {
case e: Exception =>
warn(s"Error occurred while reading log-start-offset-checkpoint file of directory " +
s"$logDirAbsolutePath, resetting to the base offset of the first segment", e)
}
// Main load-and-recover flow: run loadLog concurrently for every topic-partition log
val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(_.isDirectory)
val numLogsLoaded = new AtomicInteger(0)
numTotalLogs += logsToLoad.length
val jobsForDir = logsToLoad.map { logDir =>
val runnable: Runnable = () => {
try {
debug(s"Loading log $logDir")
val logLoadStartMs = time.hiResClockMs()
val log = loadLog(logDir, recoveryPoints, logStartOffsets)
val logLoadDurationMs = time.hiResClockMs() - logLoadStartMs
val currentNumLoaded = numLogsLoaded.incrementAndGet()
info(s"Completed load of $log with ${log.numberOfSegments} segments in ${logLoadDurationMs}ms " +
s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)")
} catch {
case e: IOException =>
offlineDirs.add((logDirAbsolutePath, e))
error(s"Error while loading log dir $logDirAbsolutePath", e)
}
}
runnable
}
jobs(cleanShutdownFile) = jobsForDir.map(pool.submit)
} catch {
case e: IOException =>
offlineDirs.add((logDirAbsolutePath, e))
error(s"Error while loading log dir $logDirAbsolutePath", e)
}
}
try {
for ((cleanShutdownFile, dirJobs) <- jobs) {
dirJobs.foreach(_.get)
try {
cleanShutdownFile.delete()
} catch {
case e: IOException =>
offlineDirs.add((cleanShutdownFile.getParent, e))
error(s"Error while deleting the clean shutdown file $cleanShutdownFile", e)
}
}
offlineDirs.foreach { case (dir, e) =>
logDirFailureChannel.maybeAddOfflineLogDir(dir, s"Error while deleting the clean shutdown file in dir $dir", e)
}
} catch {
case e: ExecutionException =>
error(s"There was an error in one of the threads during logs loading: ${e.getCause}")
throw e.getCause
} finally {
threadPools.foreach(_.shutdown())
}
info(s"Loaded $numTotalLogs logs in ${time.hiResClockMs() - startMs}ms.")
}
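As shown, log loading first checks for the cleanShutdownFile marker; when it is missing, loadLog(logDir, recoveryPoints, logStartOffsets) has to recover the logs.
Whether the broker went through a cleanShutdown (a normal shutdown) is therefore the key to a fast startup.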
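One way to verify whether a shutdown was clean is to look for the marker file in the data directory after the broker has stopped. The sketch below assumes the marker is named .kafka_cleanshutdown, as mentioned in the source comments above; willRecoverOnStartup is a hypothetical helper, not part of Kafka.

```scala
import java.io.File

// Marker file name as referenced in the source comments above.
val CleanShutdownFileName = ".kafka_cleanshutdown"

// True if the marker is missing, i.e. the broker would attempt
// log recovery for this data directory on its next start.
def willRecoverOnStartup(logDir: File): Boolean =
  !new File(logDir, CleanShutdownFileName).exists
```

For the directory in the logs above, the call would be willRecoverOnStartup(new File("/var/lib/kafka")), run against the volume while the broker is stopped.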
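When a Pod is restarted, it goes through the termination sequence.
For details, see "k8s---pod的优雅退出流程(prestop和terminationGracePeriodSeconds)" (the graceful pod termination flow) by du-z on 博客园 (cnblogs).
The cluster's terminationGracePeriodSeconds was at its default of 30 s. We suspected this was too short for the Kafka process to shut down cleanly, so we raised the grace period to 300 s and added a preStop hook: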
spec:
  terminationGracePeriodSeconds: 300  # raised from the 30 s default
  containers:
  - lifecycle:
      preStop:
        exec:
          command: ["sh", "-ce", "kill -s TERM 1; while kill -0 1 2>/dev/null; do sleep 1; done"]
After this change the cluster still could not perform a clean shutdown, so we checked the log:
2022-11-16 15:03:36.003 [TxnMarkerSenderThread-2] INFO kafka.utils.Logging.info(Logging.scala:66) - [Transaction Marker Channel Manager 2]: Starting
2022-11-16 15:03:36.003 [main] INFO kafka.utils.Logging.info(Logging.scala:66) - [TransactionCoordinator id=2] Startup complete.
2022-11-16 15:03:36.043 [ExpirationReaper-2-AlterAcls] INFO kafka.utils.Logging.info(Logging.scala:66) - [ExpirationReaper-2-AlterAcls]: Starting
2022-11-16 15:03:36.072 [/config/changes-event-process-thread] INFO kafka.utils.Logging.info(Logging.scala:66) - [/config/changes-event-process-thread]: Starting
2022-11-16 15:03:36.116 [main] INFO kafka.utils.Logging.info(Logging.scala:66) - [SocketServer brokerId=2] Starting socket server acceptors and processors
2022-11-16 15:03:36.123 [main] INFO kafka.utils.Logging.info(Logging.scala:66) - [SocketServer brokerId=2] Started data-plane acceptor and processor(s) for endpoint : ListenerName(PLAINTEXT)
2022-11-16 15:03:36.124 [main] INFO kafka.utils.Logging.info(Logging.scala:66) - [SocketServer brokerId=2] Started socket server acceptors and processors
2022-11-16 15:03:36.132 [main] INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:117) - Kafka version: 2.6.2
2022-11-16 15:03:36.133 [main] INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:118) - Kafka commitId: da65af02e5856e34
2022-11-16 15:03:36.133 [main] INFO org.apache.kafka.common.utils.AppInfoParser$AppInfo.<init>(AppInfoParser.java:119) - Kafka startTimeMs: 1668582216125
2022-11-16 15:03:36.136 [main] INFO kafka.utils.Logging.info(Logging.scala:66) - [KafkaServer id=2] started
2022-11-16 15:05:36.204 [main-SendThread(dol-zookeeper:12181)] WARN org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1190) - Client session timed out, have not heard from server in 12001ms for sessionid 0x100000463b00004
2022-11-16 15:05:36.205 [main-SendThread(dol-zookeeper:12181)] INFO org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1238) - Client session timed out, have not heard from server in 12001ms for sessionid 0x100000463b00004, closing socket connection and attempting reconnect
2022-11-16 15:05:54.650 [SIGTERM handler] INFO org.apache.kafka.common.utils.LoggingSignalHandler$1.invoke(LoggingSignalHandler.java:89) - Terminating process due to signal SIGTERM
2022-11-16 15:05:54.654 [kafka-shutdown-hook] INFO kafka.utils.Logging.info(Logging.scala:66) - [KafkaServer id=2] shutting down
2022-11-16 15:05:54.655 [kafka-shutdown-hook] INFO kafka.utils.Logging.info(Logging.scala:66) - [KafkaServer id=2] Starting controlled shutdown
2022-11-16 15:05:57.328 [main-SendThread(dol-zookeeper:12181)] ERROR org.apache.zookeeper.client.StaticHostProvider.resolve(StaticHostProvider.java:154) - Unable to resolve address: dol-zookeeper:12181
java.net.UnknownHostException: dol-zookeeper: Temporary failure in name resolution
at java.base/java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
at java.base/java.net.InetAddress$PlatformNameService.lookupAllHostAddr(InetAddress.java:929)
at java.base/java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1529)
at java.base/java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:848)
at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1519)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1378)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1306)
at org.apache.zookeeper.client.StaticHostProvider$1.getAllByName(StaticHostProvider.java:92)
at org.apache.zookeeper.client.StaticHostProvider.resolve(StaticHostProvider.java:147)
at org.apache.zookeeper.client.StaticHostProvider.next(StaticHostProvider.java:375)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1137)
2022-11-16 15:05:57.510 [main-SendThread(dol-zookeeper:12181)] WARN org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1246) - Session 0x100000463b00004 for server dol-zookeeper:12181, unexpected error, closing socket connection and attempting reconnect
java.lang.IllegalArgumentException: Unable to canonicalize address dol-zookeeper:12181 because it's not resolvable
at org.apache.zookeeper.SaslServerPrincipal.getServerPrincipal(SaslServerPrincipal.java:71)
at org.apache.zookeeper.SaslServerPrincipal.getServerPrincipal(SaslServerPrincipal.java:39)
at org.apache.zookeeper.ClientCnxn$SendThread.startConnect(ClientCnxn.java:1087)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1139)
2022-11-16 15:05:57.613 [kafka-shutdown-hook] INFO kafka.utils.Logging.info(Logging.scala:66) - [ZooKeeperClient Kafka server] Waiting until connected.
2022-11-16 15:05:58.612 [main-SendThread(dol-zookeeper:12181)] ERROR org.apache.zookeeper.client.StaticHostProvider.resolve(StaticHostProvider.java:154) - Unable to resolve address: dol-zookeeper:12181
java.net.UnknownHostException: dol-zookeeper
at java.base/java.net.InetAddress$CachedAddresses.get(InetAddress.java:797)
at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1519)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1378)
at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1306)
at org.apache.zookeeper.client.StaticHostProvider$1.getAllByName(StaticHostProvider.java:92)
at org.apache.zookeeper.client.StaticHostProvider.resolve(StaticHostProvider.java:147)
at org.apache.zookeeper.client.StaticHostProvider.next(StaticHostProvider.java:375)
at org.apache.zookeeper.ClientCnxn$SendThread.run(ClientCnxn.java:1137)
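As shown, the broker received SIGTERM normally and began shutting down through the shutdown hook. However, dol-zookeeper could no longer be reached through its Service, and the log printed "Waiting until connected." The relevant Kafka code is shown below.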
kafka.server.KafkaServer#shutdown
def shutdown(): Unit = {
try {
info("shutting down")
if (isStartingUp.get)
throw new IllegalStateException("Kafka server is still starting up, cannot shut down!")
// To ensure correct behavior under concurrent calls, we need to check `shutdownLatch` first since it gets updated
// last in the `if` block. If the order is reversed, we could shutdown twice or leave `isShuttingDown` set to
// `true` at the end of this method.
if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) {
CoreUtils.swallow(controlledShutdown(), this)
brokerState.newState(BrokerShuttingDown)
if (dynamicConfigManager != null)
CoreUtils.swallow(dynamicConfigManager.shutdown(), this)
// Stop socket server to stop accepting any more connections and requests.
// Socket server will be shutdown towards the end of the sequence.
if (socketServer != null)
CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
...
if (logManager != null)
CoreUtils.swallow(logManager.shutdown(), this)
if (kafkaController != null)
CoreUtils.swallow(kafkaController.shutdown(), this)
...
// Clear all reconfigurable instances stored in DynamicBrokerConfig
config.dynamicConfig.clear()
brokerState.newState(NotRunning)
startupComplete.set(false)
isShuttingDown.set(false)
CoreUtils.swallow(AppInfoParser.unregisterAppInfo(metricsPrefix, config.brokerId.toString, metrics), this)
shutdownLatch.countDown()
info("shut down completed")
}
}
catch {
case e: Throwable =>
fatal("Fatal error during KafkaServer shutdown.", e)
isShuttingDown.set(false)
throw e
}
}
The logic that closes the log files cleanly lives in logManager.shutdown(), but the earlier call to controlledShutdown() blocked on ZooKeeper.
private def controlledShutdown(): Unit = {
def node(broker: Broker): Node = broker.node(config.interBrokerListenerName)
val socketTimeoutMs = config.controllerSocketTimeoutMs
...
var shutdownSucceeded: Boolean = false
try {
var remainingRetries = retries
var prevController: Broker = null
var ioException = false
while (!shutdownSucceeded && remainingRetries > 0) {
remainingRetries = remainingRetries - 1
// 1. Find the controller and establish a connection to it.
// Get the current controller info. This is to ensure we use the most recent info to issue the
// controlled shutdown request.
// If the controller id or the broker registration are missing, we sleep and retry (if there are remaining retries)
zkClient.getControllerId match {
case Some(controllerId) =>
zkClient.getBroker(controllerId) match {
case Some(broker) =>
// if this is the first attempt, if the controller has changed or if an exception was thrown in a previous
// attempt, connect to the most recent controller
if (ioException || broker != prevController) {
ioException = false
if (prevController != null)
networkClient.close(node(prevController).idString)
prevController = broker
metadataUpdater.setNodes(Seq(node(prevController)).asJava)
}
case None =>
info(s"Broker registration for controller $controllerId is not available (i.e. the Controller's ZK session expired)")
}
case None =>
info("No controller registered in ZooKeeper")
}
// 2. issue a controlled shutdown to the controller
if (prevController != null) {
try {
if (!NetworkClientUtils.awaitReady(networkClient, node(prevController), time, socketTimeoutMs))
throw new SocketTimeoutException(s"Failed to connect within $socketTimeoutMs ms")
// send the controlled shutdown request
val controlledShutdownApiVersion: Short =
if (config.interBrokerProtocolVersion < KAFKA_0_9_0) 0
else if (config.interBrokerProtocolVersion < KAFKA_2_2_IV0) 1
else if (config.interBrokerProtocolVersion < KAFKA_2_4_IV1) 2
else 3
val controlledShutdownRequest = new ControlledShutdownRequest.Builder(
new ControlledShutdownRequestData()
.setBrokerId(config.brokerId)
.setBrokerEpoch(kafkaController.brokerEpoch),
controlledShutdownApiVersion)
val request = networkClient.newClientRequest(node(prevController).idString, controlledShutdownRequest,
time.milliseconds(), true)
val clientResponse = NetworkClientUtils.sendAndReceive(networkClient, request, time)
val shutdownResponse = clientResponse.responseBody.asInstanceOf[ControlledShutdownResponse]
if (shutdownResponse.error == Errors.NONE && shutdownResponse.data.remainingPartitions.isEmpty) {
shutdownSucceeded = true
info("Controlled shutdown succeeded")
}
else {
info(s"Remaining partitions to move: ${shutdownResponse.data.remainingPartitions}")
info(s"Error from controller: ${shutdownResponse.error}")
}
}
catch {
case ioe: IOException =>
ioException = true
warn("Error during controlled shutdown, possibly because leader movement took longer than the " +
s"configured controller.socket.timeout.ms and/or request.timeout.ms: ${ioe.getMessage}")
// ignore and try again
}
}
if (!shutdownSucceeded) {
Thread.sleep(config.controlledShutdownRetryBackoffMs)
warn("Retrying controlled shutdown after the previous attempt failed...")
}
}
}
finally
networkClient.close()
shutdownSucceeded
}
if (startupComplete.get() && config.controlledShutdownEnable) {
// We request the controller to do a controlled shutdown. On failure, we backoff for a configured period
// of time and try again for a configured number of retries. If all the attempt fails, we simply force
// the shutdown.
info("Starting controlled shutdown")
brokerState.newState(PendingControlledShutdown)
val shutdownSucceeded = doControlledShutdown(config.controlledShutdownMaxRetries.intValue)
if (!shutdownSucceeded)
warn("Proceeding to do an unclean shutdown as all the controlled shutdown attempts failed")
}
}
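controlledShutdown first sends a request to ZooKeeper to fetch the current controllerId. That request goes through retryRequestUntilConnected, which keeps retrying until the connection is back, so the whole shutdown hook gets stuck and cannot proceed.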
def getControllerId: Option[Int] = {
val getDataRequest = GetDataRequest(ControllerZNode.path)
val getDataResponse = retryRequestUntilConnected(getDataRequest)
getDataResponse.resultCode match {
case Code.OK => ControllerZNode.decode(getDataResponse.data)
case Code.NONODE => None
case _ => throw getDataResponse.resultException.get
}
}
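ZooKeeper could not be resolved because, while the pod was being restarted, Calico removed the pod's endpoint, so the pod could no longer make any network requests. (Testing showed this to be a bug in the Calico version in use; upgrading Calico fixed it.)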
2022-11-16 03:55:26.662 [INFO][93] felix/int_dataplane.go 1447: Received *proto.WorkloadEndpointUpdate update from calculation graph msg=id:<orchestrator_id:"k8s" workload_id:"kedacom-project-namespace/dol-kafka-2" endpoint_id:"eth0" > endpoint:<state:"active" name:"cali607076d309b" profile_ids:"kns.kedacom-project-namespace" profile_ids:"ksa.kedacom-project-namespace.default" ipv4_nets:"10.244.133.146/32" >
2022-11-16 05:07:11.723 [INFO][93] felix/endpoint_mgr.go 476: Re-evaluated workload endpoint status adminUp=false failed=false known=false operUp=false status="" workloadEndpointID=proto.WorkloadEndpointID{OrchestratorId:"k8s", WorkloadId:"namespace/dol-kafka-2", EndpointId:"eth0"}
2022-11-16 05:07:11.723 [INFO][93] felix/status_combiner.go 58: Storing endpoint status update ipVersion=0x4 status="" workload=proto.WorkloadEndpointID{OrchestratorId:"k8s", WorkloadId:"namespace/dol-kafka-2", EndpointId:"eth0"}
2022-11-16 05:07:11.724 [INFO][93] felix/conntrack.go 90: Removing conntrack flows ip=10.244.133.146
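For the reasons above, we tried setting remainingRetries to 0 directly, so that no request is made to ZooKeeper and logManager can shut down normally. The corresponding broker setting is controlled.shutdown.max.retries=0.
After writing data again and retesting, the cluster started within 2 minutes.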
while (!shutdownSucceeded && remainingRetries > 0) {
remainingRetries = remainingRetries - 1
// 1. Find the controller and establish a connection to it.
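Why a retry count of 0 avoids the ZooKeeper call can be seen with a stripped-down model of this loop. This is a simplified sketch, not Kafka's code: runControlledShutdown, lookupController (standing in for zkClient.getControllerId) and sendRequest (standing in for the ControlledShutdownRequest round trip) are all hypothetical names.

```scala
// Simplified model of the retry loop in controlled shutdown.
// lookupController stands in for zkClient.getControllerId (the call that can block);
// sendRequest stands in for sending the request to the controller.
def runControlledShutdown(
    maxRetries: Int,
    lookupController: () => Option[Int],
    sendRequest: Int => Boolean
): Boolean = {
  var remainingRetries = maxRetries
  var shutdownSucceeded = false
  while (!shutdownSucceeded && remainingRetries > 0) {
    remainingRetries -= 1
    // With maxRetries = 0 this line is never reached, so no lookup happens.
    shutdownSucceeded = lookupController().exists(sendRequest)
  }
  shutdownSucceeded
}
```

With maxRetries set to 0 the function returns false without ever calling lookupController. In the Kafka code quoted earlier, that result only triggers the "Proceeding to do an unclean shutdown" warning, after which shutdown() continues on to logManager.shutdown().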
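Logically, after this change the node no longer sends a controlled shutdown request when it restarts, so metadata is not updated ahead of the shutdown. If the node is the controller, it cannot notify the other nodes to elect a new controller.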
metadataUpdater.setNodes(Seq(node(prevController)).asJava)
...
val request = networkClient.newClientRequest(node(prevController).idString, controlledShutdownRequest,time.milliseconds(), true)
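This approach comes from personal experience and may have unknown problems. Apply it with discretion.
Summary
1. Tuning num.recovery.threads.per.data.dir can speed up recovery, but the maximum speed is bounded by disk throughput.
2. Barring special circumstances, shut the Kafka cluster down cleanly whenever possible. An unclean shutdown, such as powering off the host directly, may force all data files to be reloaded.
3. Raising the grace period terminationGracePeriodSeconds and adding a preStop hook gives Kafka enough time to run its shutdown hook.
4. Setting controlled.shutdown.max.retries=0 works around the shutdown hook being blocked when ZooKeeper is unreachable. (Testing showed this to be a bug in the Calico version in use; upgrading Calico fixed it.)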
Copyright belongs to the original author, 嘈嘈切切错杂弹. If there is any infringement, please contact us for removal.