Kafka心跳包的处理流程如下图所示:
图的右边是kafka心跳在服务端的核心处理流程,而左边主要展示kafka中所有的心跳请求,根据上图得知Kafka触发心跳处理的主要请求分别如下:
- KafkaConsume主动发送心跳包 消费者会以3s的频率向服务端发送心跳包,服务端对应的入口为 KafkaApis的handleHeartbeatRequest方法。
- 消费者加入消费组 在消费端重平衡过程中,客户端主动向其组协调器发起Join_Group(加入消费组)时,组协调器会认为收到一个有效的心跳包,服务端对应的处理入口:KafkaApis的handleJoinGroup方法。
- 消费者获取队列负载结果 在重平衡的第二个阶段,消费组的Leader在计算出分区负载结果后会发给组协调器,消费组中的其他成员需要发生Sync_Group请求获取负载结果,组协调器同样认为收到了一个有效的心跳包。服务端对应的处理入口:KafkaApis的handleSyncGroupRequest。
- 消费者提交位点 消费者组协调器收到消费者提交位点请求,同样可以认定消费者是存活的。位点提交的处理入口:KafkaApis的handlerCommitOffsets方法。
- __consumers_offsets主题的ISR的Leader发生变化如果__consumers_offsets主题中的各个分区Leader发生变化,与特定分区的组协调器需要重新选举,与此组协调器相关的消费者将触发重平衡。
上述任何一种请求,都能表明消费端是存活的,故能有效阻止服务端将客户端端心跳设置为过期,进入下一个心跳检测周期。
上述各个入口,特别是__consumers_offsets的ISR对消费组的影响,后续会专门展开研究,现在我们将重心转移到服务端是如何处理一个心跳包的。
1.2 源码分析Kafka心跳处理机制
===================
从上面的流程图可以得出,Kafka收到一个心跳包后的处理入口为GroupCoordinator的completeAndScheduleNextExpiration方法,核心代码如下图所示:
在介绍该方法之前首先介绍一个该方法的入参含义:
- GroupMetadata group 消费组的元信息。
- MemberMetadata member 消费者的元信息。
- long timeoutMs 心跳超时时间,默认为10s,这个参数是由消费端的session.timeout.ms参数设置,默认为10s。
Step1:为消费组设置唯一标识:groupId + “-” + memberId构成。
Step2:将hearbeatSatisfied设置为true,表示该消费者收到一个有效的心跳包。
Step3:收到一个有效的心跳包,通知定时调度器停止本次的心跳过期检测。
Step4:构建一个DelayedHearbeat,进入下一个心跳检测周期。
接下来将分别对Step3、Step4展开详细介绍。
1.2.1 心跳检测正常处理逻辑
================
在收到一个心跳包时,尝试将本次检测设置成功,具体的实现由DelayedOperation的checkAndComplete方法,代码如下:
Kafka使用一个数据结构来存储需要跟踪的所有消费者,在这里成为Watch机制。
实现要点:根据key获取WatchList,然后从获取的WatchList中内部的ConcurrentMap中再按照Key获取对应与当前消费者对应的Watch。
- 如果没有找到对应消费者的Watch,则直接返回,无需检测,说明已经成功检测。
- 如果找到了对应消费者的Watch,则执行被watch的tryCompleteWatched方法。
Watch的数据结构如下:
接下来重点关注Watches的tryCompleteWatched方法,该方法的详细调用代码如下图所示:
这边先重点介绍一下组协调器判断一次成功的心跳检测的三个标准中满足一个即可(GroupCoordinator的tryCompleteHeartbeat方法):
- 如果消费组的状态处于Dead
- 如果消费组的状态为Pending(消费组在重平衡中)
- hearbeatSatisfied为true,即收到了一个有效的心跳包。
上述代码的实现比较简单,这里就不一一罗列,其核心关键点如下:
- 删除对应的Watch,表示一次心跳检测成功。
- Watchs中存储的对象是DelayedOperation(Kafka延迟类型的父类)的子类,在心跳检测中具体为DelayedHeartbeat。
- 最终执行DelayedOperation的是TimeTask的cancel方法(取消延迟任务),就是从延迟调度中移除自己,表示没有超时,结束本轮的超时检测,具体的存储结构,将在下文详介绍如果开启新一轮心跳检测时再详细讲解。
为了方便大家阅读源码,其主要的调用时序图如下:
1.2.2 开启下一轮心跳检测
===============
1.2.2.1将延迟任务放入时间轮
=================
在接受到一个新的心跳包首先用于清除上一轮设置的延迟任务,然后需要开启一个新的延迟任务,接下来我们将来具体看看Kafka如何开启新一轮心跳检测机制,其本质上是Kafka的延迟(定时)实现原理。代码入口如下图所示:
开启下一轮调度时首先将Member的heartbeatSatisfied设置为false。
其核心思想是创建一个心跳延迟任务DelayedHeartbeat,并对其检测是否完成或者添加Watch,启动心跳延迟或者等待下一个心跳包的到来。
其实看到这里,我们应该能得到一个关于Kafka心跳检测机制的实现思路:
- 开启一个延迟任务,延迟检查时间为心跳过期时间,一旦延迟任务执行,则意外着心跳超时。
- 当收到一个心跳包时,需要取消上一次设置的延迟任务。
- 使用循环使用延迟任务,从而实现类似定时任务的效果。
接下来我们详细探讨一下DelayedOperationPurgatory的tryCompleteElseWatch方法,其代码如下图所示:
Step1:尝试调用DelayedHeartbeat的tryComplete方法,判断是否可以判断完成,这里主要是消费组是否为重平衡或者状态为Dead,如果上述情况不满足,则会返回false,因为在发起下一轮心跳包时已将heartbeatSatisfied设置为false。
Step2:为该消费者添加到Watch中,表示kafka需要跟踪该消费者的心跳。
Step3:再次调用maybeTryComplete方法,再尝试判断是否该心跳检测完成。
Step4:如果没有完成,则该任务延迟任务(DelayedHeartbeat)添加到定时调度中。
接下来将进入到Kafka心跳的核心机制,即延迟任务的实现机制。
最后
现在正是金三银四的春招高潮,前阵子小编一直在搭建自己的网站,并整理了全套的【一线互联网大厂Java核心面试题库+解析】:包括Java基础、异常、集合、并发编程、JVM、Spring全家桶、MyBatis、Redis、数据库、中间件MQ、Dubbo、Linux、Tomcat、ZooKeeper、Netty等等
机制**。
最后
现在正是金三银四的春招高潮,前阵子小编一直在搭建自己的网站,并整理了全套的【一线互联网大厂Java核心面试题库+解析】:包括Java基础、异常、集合、并发编程、JVM、Spring全家桶、MyBatis、Redis、数据库、中间件MQ、Dubbo、Linux、Tomcat、ZooKeeper、Netty等等
[外链图片转存中…(img-VZHEExCB-1725898389445)]
版权归原作者 2401_86637344 所有, 如有侵权,请联系我们删除。