Overview
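The Kafka source tree consists of several modules, each responsible for different functionality. The core modules and their roles are summarized below:
- Server-side source: implements the core Kafka broker functionality, including log storage, the controller, coordinators, metadata and state-machine management, the delayed-operation mechanism, consumer group management, and the high-concurrency network architecture.
- Java client source: implements how producers and consumers interact with brokers, plus shared supporting components.
- Connect source: used to build bidirectional streaming data synchronization between heterogeneous systems.
- Streams source: implements real-time stream processing.
- Raft source: implements the Raft consensus protocol.
- Admin module: Kafka's administration module for managing topics and partitions, e.g. creating and deleting topics or expanding partitions.
- Api module: handles data exchange, i.e. encoding and decoding the data exchanged between clients and servers.
- Client module: contains the classes the producer uses to read broker metadata such as topics, partitions, and leaders.
- Cluster module: contains entity classes such as Broker, Cluster, Partition, and Replica.
- Common module: contains the various exception classes and error validation.
- Consumer module: the consumer module, responsible for client-side consumer data and logic.
- Controller module: handles election of the central controller, partition leader election, replica assignment and reassignment, and expansion of partitions and replicas.
- Coordinator module: manages a subset of consumer groups and their offsets.
- Javaapi module: provides the Java producer and consumer API interfaces.
- Log module: handles Kafka's file storage, reading and writing the message data of all topics.
- Message module: wraps multiple records into a message set or a compressed message set.
- Metrics module: monitors internal state.
- Network module: handles client connections and network events.
- Producer module: producer implementation details, including synchronous and asynchronous sending.
- Security module: handles Kafka's security authentication and management.
- Serializer module: serializes and deserializes message contents.
- Server module: covers leader and offset checkpointing, dynamic configuration, delayed topic creation and deletion, leader election, and admin and replica management.
- Tools module: contains various tools, e.g. for exporting consumer offsets, LogSegments information, topic log positions, and offsets stored in ZooKeeper.
- Utils module: contains assorted utility classes such as Json, ZkUtils, thread-pool utilities, and the shared KafkaScheduler.
Together, these modules make up Kafka's overall architecture and allow it to provide a high-throughput, highly available message queue service.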
The Kafka source branch analyzed below is 1.0.2.
When KafkaServer starts, it calls replicaManager.startup():
// replica manager
/* start replica manager */
replicaManager = createReplicaManager(isShuttingDown)
replicaManager.startup()
replicaManager.startup() registers two scheduled tasks:
def startup() {
  // start ISR expiration thread
  // A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR
  // Periodically check whether any replica in a topic-partition's ISR has lagged or hung and must be removed from the ISR
  scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
  // Check whether ISR changes need to be propagated. If any topic-partition's ISR has changed, this method
  // writes to the corresponding ZooKeeper node, which in turn triggers the controller to act.
  scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _, period = 2500L, unit = TimeUnit.MILLISECONDS)
  val haltBrokerOnFailure = config.interBrokerProtocolVersion < KAFKA_1_0_IV0
  logDirFailureHandler = new LogDirFailureHandler("LogDirFailureHandler", haltBrokerOnFailure)
  logDirFailureHandler.start()
}

// Iterate over every Partition instance on this broker and check whether any replica in its ISR must be removed
private def maybeShrinkIsr(): Unit = {
  trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
  nonOfflinePartitionsIterator.foreach(_.maybeShrinkIsr(config.replicaLagTimeMaxMs))
}

/**
 * This function periodically runs to see if ISR needs to be propagated. It propagates ISR when:
 * 1. There is ISR change not propagated yet.
 * 2. There is no ISR Change in the last five seconds, or it has been more than 60 seconds since the last ISR propagation.
 * This allows an occasional ISR change to be propagated within a few seconds, and avoids overwhelming controller and
 * other brokers when large amount of ISR change occurs.
 */
// Write the topic-partitions whose ISR changed (isrChangeSet) to ZooKeeper via ReplicationUtils.propagateIsrChanges();
// only then does the controller learn which topic-partitions' ISR has changed.
def maybePropagateIsrChanges() {
  val now = System.currentTimeMillis()
  isrChangeSet synchronized {
    // some topic-partition's ISR change still needs to be propagated
    if (isrChangeSet.nonEmpty &&
      // no ISR change in the last 5 s
      (lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBlackOut < now ||
        // or no ISR propagation in the last 60 s
        lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) {
      // create an ISR change notification in ZooKeeper
      ReplicationUtils.propagateIsrChanges(zkUtils, isrChangeSet)
      // clear isrChangeSet, which records the topic-partitions whose ISR changed
      isrChangeSet.clear()
      // record the time of this ISR propagation
      lastIsrPropagationMs.set(now)
    }
  }
}
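The two scheduling rules above reduce to simple arithmetic. The sketch below is a hypothetical, self-contained model; `IsrSchedulingSketch` and its methods are illustrative names, not Kafka APIs. It shows why a stuck follower can stay in the ISR for up to about 1.5 × replicaLagTimeMaxMs, because the shrink check only runs every replicaLagTimeMaxMs / 2. It also encodes the propagation condition using the 5 s blackout and 60 s interval stated in the comment.

```scala
// Hypothetical model of the two ReplicaManager scheduling rules; not Kafka code.
object IsrSchedulingSketch {
  // Assumed to match the 5 s / 60 s values described in the source comment.
  val IsrChangePropagationBlackOutMs: Long = 5000L
  val IsrChangePropagationIntervalMs: Long = 60000L

  // "isr-expiration" runs every replicaLagTimeMaxMs / 2.
  def shrinkCheckPeriodMs(replicaLagTimeMaxMs: Long): Long = replicaLagTimeMaxMs / 2

  // A follower becomes removable once it has lagged for more than replicaLagTimeMaxMs,
  // but the check fires only once per period, so in the worst case it is noticed one
  // period later: replicaLagTimeMaxMs + replicaLagTimeMaxMs / 2 (about 1.5x).
  def worstCaseRemovalDelayMs(replicaLagTimeMaxMs: Long): Long =
    replicaLagTimeMaxMs + shrinkCheckPeriodMs(replicaLagTimeMaxMs)

  // Mirrors the condition in maybePropagateIsrChanges(): there must be pending changes,
  // and either no change for 5 s or no propagation for 60 s.
  def shouldPropagate(pendingChanges: Boolean, now: Long,
                      lastIsrChangeMs: Long, lastIsrPropagationMs: Long): Boolean =
    pendingChanges &&
      (lastIsrChangeMs + IsrChangePropagationBlackOutMs < now ||
        lastIsrPropagationMs + IsrChangePropagationIntervalMs < now)
}
```

With replicaLagTimeMaxMs = 10000, the worst-case removal delay is 15000 ms. A burst of ISR changes keeps resetting the 5 s quiet window, and the 60 s interval is the upper bound that still forces a propagation.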
The methods in the Partition class that the maybeShrinkIsr scheduled task relies on:
// Check whether each replica in the ISR needs to be removed from the ISR
def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
  val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
    // First check whether the local replica is this partition's leader. This only runs on the leader;
    // if the local replica is not the leader, skip.
    leaderReplicaIfLocal match {
      case Some(leaderReplica) =>
        // Walk every ISR replica except the leader and find those that must be removed
        // (replicas that have lagged for more than maxLagMs)
        val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
        if (outOfSyncReplicas.nonEmpty) {
          val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
          assert(newInSyncReplicas.nonEmpty)
          info("Shrinking ISR from %s to %s".format(inSyncReplicas.map(_.brokerId).mkString(","),
            newInSyncReplicas.map(_.brokerId).mkString(",")))
          // update ISR in zk and in cache
          // With the new ISR computed, updateIsr() writes it to ZooKeeper and internally calls
          // ReplicaManager.recordIsrChange() to tell ReplicaManager that this topic-partition's ISR changed.
          // (Although the ISR in ZooKeeper has changed at this point, the controller is not yet aware of it.)
          updateIsr(newInSyncReplicas)
          // we may need to increment high watermark since ISR could be down to 1
          // update metrics
          replicaManager.isrShrinkRate.mark()
          // Because the ISR changed, maybeIncrementLeaderHW() checks whether this partition's HW can advance.
          // A smaller ISR may let the HW increase, since a replica with high replication lag may have just been removed.
          maybeIncrementLeaderHW(leaderReplica)
        } else {
          false
        }
      case None => false // do nothing if no longer leader
    }
  }
  // some delayed operations may be unblocked after HW changed
  if (leaderHWIncremented)
    tryCompleteDelayedRequests()
}

def getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long): Set[Replica] = {
  /**
   * there are two cases that will be handled here -
   * 1. Stuck followers: If the leo of the replica hasn't been updated for maxLagMs ms,
   *                     the follower is stuck and should be removed from the ISR
   * 2. Slow followers: If the replica has not read up to the leo within the last maxLagMs ms,
   *                    then the follower is lagging and should be removed from the ISR
   * Both these cases are handled by checking the lastCaughtUpTimeMs which represents
   * the last time when the replica was fully caught up. If either of the above conditions
   * is violated, that replica is considered to be out of sync
   *
   **/
  val candidateReplicas = inSyncReplicas - leaderReplica

  val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
  if (laggingReplicas.nonEmpty)
    debug("Lagging replicas are %s".format(laggingReplicas.map(_.brokerId).mkString(",")))

  laggingReplicas
}

// Write the new ISR to ZooKeeper
private def updateIsr(newIsr: Set[Replica]) {
  val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(_.brokerId).toList, zkVersion)
  // perform the update
  val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partitionId,
    newLeaderAndIsr, controllerEpoch, zkVersion)

  // written to ZooKeeper successfully
  if (updateSucceeded) {
    // tell replicaManager that this partition's ISR change needs to be propagated
    replicaManager.recordIsrChange(topicPartition)
    inSyncReplicas = newIsr
    zkVersion = newVersion
    trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))
  } else {
    replicaManager.failedIsrUpdatesRate.mark()
    info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))
  }
}

/**
 * Check and maybe increment the high watermark of the partition;
 * this function can be triggered when
 *
 * 1. Partition ISR changed
 * 2. Any replica's LEO changed
 *
 * The HW is determined by the smallest log end offset among all replicas that are in sync or are considered caught-up.
 * This way, if a replica is considered caught-up, but its log end offset is smaller than HW, we will wait for this
 * replica to catch up to the HW before advancing the HW. This helps the situation when the ISR only includes the
 * leader replica and a follower tries to catch up. If we don't wait for the follower when advancing the HW, the
 * follower's log end offset may keep falling behind the HW (determined by the leader's log end offset) and therefore
 * will never be added to ISR.
 *
 * Returns true if the HW was incremented, and false otherwise.
 * Note There is no need to acquire the leaderIsrUpdate lock here
 * since all callers of this private API acquire that lock
 */
// Check and possibly advance the leader replica's high watermark (HW), the offset up to which messages are
// committed and visible to consumers.
// The HW may advance when: 1. the partition's ISR changes; 2. any replica's LEO changes.
// The new HW is the smallest LEO among the ISR replicas and the replicas considered caught up. Caught-up replicas
// are included so that the leader waits for a catching-up follower to reach the HW; otherwise that follower
// might never get the chance to catch up.
private def maybeIncrementLeaderHW(leaderReplica: Replica, curTime: Long = time.milliseconds): Boolean = {
  // Collect the LEOs of ISR replicas and of replicas considered caught up
  // (last caught up within replica.lag.time.max.ms)
  val allLogEndOffsets = assignedReplicas.filter { replica =>
    curTime - replica.lastCaughtUpTimeMs <= replicaManager.config.replicaLagTimeMaxMs || inSyncReplicas.contains(replica)
  }.map(_.logEndOffset)
  // candidate new HW
  val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
  val oldHighWatermark = leaderReplica.highWatermark

  // Ensure that the high watermark increases monotonically. We also update the high watermark when the new
  // offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment.
  if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
    (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
    leaderReplica.highWatermark = newHighWatermark
    debug(s"High watermark updated to $newHighWatermark")
    true
  } else {
    debug(s"Skipping update high watermark since new hw $newHighWatermark is not larger than old hw $oldHighWatermark." +
      s"All LEOs are ${allLogEndOffsets.mkString(",")}")
    false
  }
}
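The interaction between shrinking the ISR and advancing the HW is easier to see with concrete numbers. The model below is a simplified, hypothetical sketch. `IsrShrinkSketch` and `ReplicaState` are made-up names, offsets are plain `Long`s, and the segment-roll tie-break (`onOlderSegment`) from the real code is omitted. It mirrors two rules: an ISR follower is out of sync when `now - lastCaughtUpTimeMs > maxLagMs`, and the new HW is the minimum LEO over replicas that are in the ISR or were caught up recently, applied only if it is larger than the old HW.

```scala
// Simplified, hypothetical model of Partition.getOutOfSyncReplicas and
// maybeIncrementLeaderHW. Offsets are plain Longs; the segment tie-break is omitted.
object IsrShrinkSketch {
  final case class ReplicaState(brokerId: Int, logEndOffset: Long, lastCaughtUpTimeMs: Long)

  // ISR members other than the leader that have not caught up within maxLagMs.
  def outOfSyncReplicas(isr: Set[ReplicaState], leaderId: Int,
                        now: Long, maxLagMs: Long): Set[ReplicaState] =
    isr.filter(r => r.brokerId != leaderId && now - r.lastCaughtUpTimeMs > maxLagMs)

  // New HW = min LEO over assigned replicas that are in the ISR or caught up recently.
  // Returns Some(newHw) only when it is larger than oldHw (the HW never moves back).
  // Assumes the leader is in isrIds, so the candidate set is never empty.
  def maybeIncrementHw(assigned: Set[ReplicaState], isrIds: Set[Int], oldHw: Long,
                       now: Long, maxLagMs: Long): Option[Long] = {
    val candidates = assigned.filter { r =>
      now - r.lastCaughtUpTimeMs <= maxLagMs || isrIds.contains(r.brokerId)
    }
    val newHw = candidates.map(_.logEndOffset).min
    if (newHw > oldHw) Some(newHw) else None
  }
}
```

For example, take a leader and follower 2 at LEO 100 and a stuck follower 3 at LEO 40 whose last catch-up was 15 s ago, with maxLagMs = 10000. Follower 3 is reported as out of sync. While it is still in the ISR the HW is pinned at 40. Once the ISR shrinks to {1, 2}, the HW can jump to 100, which is what the comment "a smaller ISR may let the HW increase" describes.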
In ReplicaManager.fetchMessages(), if the Fetch request comes from a follower replica, updateFollowerLogReadResults() is called to update the leader's view of that remote replica:
/**
 * Update the follower's fetch state in the leader based on the last fetch request and update `readResult`,
 * if the follower replica is not recognized to be one of the assigned replicas. Do not update
 * `readResult` otherwise, so that log start/end offset and high watermark is consistent with
 * records in fetch response. Log start/end offset and high watermark may change not only due to
 * this fetch request, e.g., rolling new log segment and removing old log segment may move log
 * start offset further than the last offset in the fetched records. The followers will get the
 * updated leader's state in the next fetch response.
 */
// Find this broker's Partition object and call its updateReplicaLogReadResult() method
// to update the replica's LEO and fetch-time information.
private def updateFollowerLogReadResults(replicaId: Int,
                                         readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)] = {
  debug(s"Recording follower broker $replicaId log end offsets: $readResults")
  readResults.map { case (topicPartition, readResult) =>
    var updatedReadResult = readResult
    nonOfflinePartition(topicPartition) match {
      case Some(partition) =>
        partition.getReplica(replicaId) match {
          case Some(replica) =>
            // update the replica's state
            partition.updateReplicaLogReadResult(replica, readResult)
          case None =>
            warn(s"Leader $localBrokerId failed to record follower $replicaId's position " +
              s"${readResult.info.fetchOffsetMetadata.messageOffset} since the replica is not recognized to be " +
              s"one of the assigned replicas ${partition.assignedReplicas.map(_.brokerId).mkString(",")} " +
              s"for partition $topicPartition. Empty records will be returned for this partition.")
            updatedReadResult = readResult.withEmptyFetchInfo
        }
      case None =>
        warn(s"While recording the replica LEO, the partition $topicPartition hasn't been created.")
    }
    topicPartition -> updatedReadResult
  }
}
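updateReplicaLogReadResult() ultimately delegates to Replica.updateLogReadResult(), which is not shown here. That method also maintains lastCaughtUpTimeMs, the field getOutOfSyncReplicas() depends on. The sketch below is a hedged model of that rule, based on my reading of the 1.0.x Replica class. `CaughtUpSketch` and `FollowerState` are hypothetical, immutable stand-ins for `kafka.cluster.Replica`, so verify against the actual Replica.scala before relying on the details. The idea: if the fetch reached the leader's current LEO, the follower is caught up as of this fetch. If it only reached the LEO the leader had at the previous fetch, it counts as caught up as of that previous fetch time.

```scala
// Hedged model of how the leader might track a follower's lastCaughtUpTimeMs,
// following my reading of Replica.updateLogReadResult in Kafka 1.0.x.
// FollowerState is a hypothetical, immutable stand-in for kafka.cluster.Replica.
object CaughtUpSketch {
  final case class FollowerState(logEndOffset: Long,
                                 lastCaughtUpTimeMs: Long,
                                 lastFetchLeaderLogEndOffset: Long,
                                 lastFetchTimeMs: Long)

  // fetchOffset: offset the follower asked for (its LEO);
  // leaderLeo: leader's LEO when this fetch was served; fetchTimeMs: time of this fetch.
  def onFetch(s: FollowerState, fetchOffset: Long, leaderLeo: Long, fetchTimeMs: Long): FollowerState = {
    val caughtUpTime =
      if (fetchOffset >= leaderLeo)
        // Reached the leader's current LEO: caught up as of this fetch.
        math.max(s.lastCaughtUpTimeMs, fetchTimeMs)
      else if (fetchOffset >= s.lastFetchLeaderLogEndOffset)
        // Reached what the leader had at the previous fetch: caught up as of that earlier time.
        math.max(s.lastCaughtUpTimeMs, s.lastFetchTimeMs)
      else
        s.lastCaughtUpTimeMs
    // Record the new LEO plus the leader LEO and time of this fetch for the next comparison.
    FollowerState(fetchOffset, caughtUpTime, leaderLeo, fetchTimeMs)
  }
}
```

This is why a follower that keeps fetching but never closes the gap still ends up out of sync: its lastCaughtUpTimeMs stops advancing even though lastFetchTimeMs keeps moving.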
This in turn calls the following methods in the Partition class:
/**
 * Update the follower's state in the leader based on the last fetch request. See
 * [[kafka.cluster.Replica#updateLogReadResult]] for details.
 *
 * @return true if the leader's log start offset or high watermark have been updated
 */
def updateReplicaLogReadResult(replica: Replica, logReadResult: LogReadResult): Boolean = {
  val replicaId = replica.brokerId
  // No need to calculate low watermark if there is no delayed DeleteRecordsRequest
  val oldLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L
  // Update the replica's state: its LEO, lastFetchLeaderLogEndOffset and lastFetchTimeMs
  replica.updateLogReadResult(logReadResult)
  val newLeaderLW = if (replicaManager.delayedDeleteRecordsPurgatory.delayed > 0) lowWatermarkIfLeader else -1L
  // check if the LW of the partition has incremented
  // since the replica's logStartOffset may have incremented
  val leaderLWIncremented = newLeaderLW > oldLeaderLW
  // check if we need to expand ISR to include this replica
  // if it is not in the ISR yet
  // (i.e. whether a replica outside the ISR now meets the conditions to join it)
  val leaderHWIncremented = maybeExpandIsr(replicaId, logReadResult)

  val result = leaderLWIncremented || leaderHWIncremented
  // some delayed operations may be unblocked after HW or LW changed
  if (result)
    tryCompleteDelayedRequests()

  debug(s"Recorded replica $replicaId log end offset (LEO) position ${logReadResult.info.fetchOffsetMetadata.messageOffset}.")
  result
}

/**
 * Check and maybe expand the ISR of the partition.
 * A replica will be added to ISR if its LEO >= current hw of the partition.
 *
 * Technically, a replica shouldn't be in ISR if it hasn't caught up for longer than replicaLagTimeMaxMs,
 * even if its log end offset is >= HW. However, to be consistent with how the follower determines
 * whether a replica is in-sync, we only check HW.
 *
 * This function can be triggered when a replica's LEO has incremented.
 *
 * @return true if the high watermark has been updated
 */
// Check whether this Partition's ISR should be expanded: a replica whose LEO is >= the HW is added to the ISR
def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult): Boolean = {
  inWriteLock(leaderIsrUpdateLock) {
    // check if this replica needs to be added to the ISR
    leaderReplicaIfLocal match {
      case Some(leaderReplica) =>
        val replica = getReplica(replicaId).get
        val leaderHW = leaderReplica.highWatermark
        // the replica is not in the ISR
        if (!inSyncReplicas.contains(replica) &&
          // but is in the assigned replicas (AR)
          assignedReplicas.map(_.brokerId).contains(replicaId) &&
          // and its LEO is >= the HW: add it to the ISR
          replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
          val newInSyncReplicas = inSyncReplicas + replica
          info(s"Expanding ISR from ${inSyncReplicas.map(_.brokerId).mkString(",")} " +
            s"to ${newInSyncReplicas.map(_.brokerId).mkString(",")}")
          // update ISR in ZK and cache
          updateIsr(newInSyncReplicas)
          replicaManager.isrExpandRate.mark()
        }

        // check if the HW of the partition can now be incremented
        // since the replica may already be in the ISR and its LEO has just incremented
        maybeIncrementLeaderHW(leaderReplica, logReadResult.fetchTimeMs)

      case None => false // nothing to do if no longer leader
    }
  }
}
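The expansion test in maybeExpandIsr() is a three-part predicate. The sketch below models it over broker ids. `IsrExpandSketch` and `maybeExpand` are hypothetical names, and offsets are plain `Long`s instead of `LogOffsetMetadata`. A follower joins the ISR only if it is assigned to the partition, is not already in the ISR, and its LEO has reached the leader's high watermark. The comparison is against the HW, not the leader's LEO.

```scala
// Hypothetical model of the expansion check in Partition.maybeExpandIsr.
object IsrExpandSketch {
  // Returns the (possibly) expanded ISR, as a set of broker ids.
  def maybeExpand(isr: Set[Int], assigned: Set[Int],
                  replicaId: Int, replicaLeo: Long, leaderHw: Long): Set[Int] =
    if (!isr.contains(replicaId) &&    // not already in the ISR
        assigned.contains(replicaId) && // but part of the assigned replicas (AR)
        replicaLeo - leaderHw >= 0)     // and its LEO has reached the HW
      isr + replicaId
    else
      isr
}
```

Checking against the HW rather than the leader's LEO pairs with maybeIncrementLeaderHW(). A caught-up follower holds the HW back until it reaches it, so the follower is not left chasing a target that keeps moving.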
For the UpdateMetadata request sent by the controller, the entry point is KafkaApis.handleUpdateMetadataRequest():
// Handle an UpdateMetadata request from the controller
def handleUpdateMetadataRequest(request: RequestChannel.Request) {
  val correlationId = request.header.correlationId
  val updateMetadataRequest = request.body[UpdateMetadataRequest]

  if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
    // Ask replicaManager to update the metadata; it returns the partitions to be deleted
    val deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)
    if (deletedPartitions.nonEmpty) {
      // Have the GroupCoordinator remove state related to the deleted partitions
      groupCoordinator.handleDeletedPartitions(deletedPartitions)
    }

    if (adminManager.hasDelayedTopicOperations) {
      updateMetadataRequest.partitionStates.keySet.asScala.map(_.topic).foreach { topic =>
        adminManager.tryCompleteDelayedTopicOperations(topic)
      }
    }
    sendResponseExemptThrottle(request, new UpdateMetadataResponse(Errors.NONE))
  } else {
    sendResponseMaybeThrottle(request, _ => new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED))
  }
}
The implementation of ReplicaManager.maybeUpdateMetadataCache():
// The controller sends this request to every broker so that each one updates its own metadata
def maybeUpdateMetadataCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = {
  replicaStateChangeLock synchronized {
    // The request comes from a stale controller: throw an exception
    if (updateMetadataRequest.controllerEpoch < controllerEpoch) {
      val stateControllerEpochErrorMessage = s"Received update metadata request with correlation id $correlationId " +
        s"from an old controller ${updateMetadataRequest.controllerId} with epoch ${updateMetadataRequest.controllerEpoch}. " +
        s"Latest known controller epoch is $controllerEpoch"
      stateChangeLogger.warn(stateControllerEpochErrorMessage)
      throw new ControllerMovedException(stateChangeLogger.messageWithPrefix(stateControllerEpochErrorMessage))
    } else {
      // Update the metadata cache, then return the list of topic-partitions to delete
      val deletedPartitions = metadataCache.updateCache(correlationId, updateMetadataRequest)
      controllerEpoch = updateMetadataRequest.controllerEpoch
      deletedPartitions
    }
  }
}
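The epoch check above is a fencing rule. A request whose controller epoch is older than the latest one this broker has seen comes from a deposed controller and is rejected. Otherwise the broker applies it and adopts the request's epoch. The sketch below models only that rule. `EpochFencingSketch`, `Broker`, and `StaleControllerException` are hypothetical names that stand in for ReplicaManager and `ControllerMovedException`, and the metadata update itself is left out.

```scala
// Hypothetical model of controller-epoch fencing in maybeUpdateMetadataCache.
object EpochFencingSketch {
  final class StaleControllerException(msg: String) extends RuntimeException(msg)

  final class Broker(private var controllerEpoch: Int) {
    def currentEpoch: Int = controllerEpoch

    // Rejects requests from an older controller epoch; otherwise adopts the request's epoch.
    def onUpdateMetadata(requestEpoch: Int, controllerId: Int): Unit = synchronized {
      if (requestEpoch < controllerEpoch)
        throw new StaleControllerException(
          s"old controller $controllerId with epoch $requestEpoch; latest known epoch is $controllerEpoch")
      // (the real code updates the metadata cache at this point)
      controllerEpoch = requestEpoch
    }
  }
}
```

Equal epochs are accepted, because the same controller sends many UpdateMetadata requests during its term. Only a strictly older epoch is fenced off.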
This method calls metadataCache.updateCache() to update the metadata cache, which returns the list of topic-partitions to delete:
// This method returns the deleted TopicPartitions received from UpdateMetadataRequest
// Update the local metadata and return the topic-partitions to be deleted
def updateCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = {
  inWriteLock(partitionMetadataLock) {
    controllerId = updateMetadataRequest.controllerId match {
      case id if id < 0 => None
      case id => Some(id)
    }
    // Clear aliveNodes and aliveBrokers, then repopulate them with the latest state
    aliveNodes.clear()
    aliveBrokers.clear()
    updateMetadataRequest.liveBrokers.asScala.foreach { broker =>
      // `aliveNodes` is a hot path for metadata requests for large clusters, so we use java.util.HashMap which
      // is a bit faster than scala.collection.mutable.HashMap. When we drop support for Scala 2.10, we could
      // move to `AnyRefMap`, which has comparable performance.
      val nodes = new java.util.HashMap[ListenerName, Node]
      val endPoints = new mutable.ArrayBuffer[EndPoint]
      broker.endPoints.asScala.foreach { ep =>
        endPoints += EndPoint(ep.host, ep.port, ep.listenerName, ep.securityProtocol)
        nodes.put(ep.listenerName, new Node(broker.id, ep.host, ep.port))
      }
      aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack))
      aliveNodes(broker.id) = nodes.asScala
    }

    val deletedPartitions = new mutable.ArrayBuffer[TopicPartition]
    updateMetadataRequest.partitionStates.asScala.foreach { case (tp, info) =>
      val controllerId = updateMetadataRequest.controllerId
      val controllerEpoch = updateMetadataRequest.controllerEpoch
      // Topic-partitions being deleted are removed from the cache and collected as this method's return value
      if (info.basePartitionState.leader == LeaderAndIsr.LeaderDuringDelete) {
        // remove from the cache
        removePartitionInfo(tp.topic, tp.partition)
        stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata " +
          s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
        deletedPartitions += tp
      } else {
        // All other topic-partitions: create or update the cached state
        addOrUpdatePartitionInfo(tp.topic, tp.partition, info)
        stateChangeLogger.trace(s"Cached leader info $info for partition $tp in response to " +
          s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
      }
    }
    deletedPartitions
  }
}
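The partition loop in updateCache() is a small upsert-or-delete pass keyed on a sentinel leader value. The sketch below models just that loop with a mutable map. `MetadataCacheSketch`, its case classes, and `applyPartitionStates` are hypothetical. The value -2 for `LeaderDuringDelete` is my assumption about `LeaderAndIsr.LeaderDuringDelete` in 1.0.x and should be checked against the source.

```scala
import scala.collection.mutable

// Hypothetical model of the partition loop in MetadataCache.updateCache.
object MetadataCacheSketch {
  // Assumed to match LeaderAndIsr.LeaderDuringDelete in Kafka 1.0.x.
  val LeaderDuringDelete: Int = -2

  final case class TopicPartition(topic: String, partition: Int)
  final case class PartitionState(leader: Int, isr: List[Int])

  // Applies the request's partition states to the cache and returns the deleted partitions.
  def applyPartitionStates(cache: mutable.Map[TopicPartition, PartitionState],
                           states: Seq[(TopicPartition, PartitionState)]): Seq[TopicPartition] = {
    val deleted = mutable.ArrayBuffer.empty[TopicPartition]
    states.foreach { case (tp, state) =>
      if (state.leader == LeaderDuringDelete) {
        cache.remove(tp) // drop the partition from the cache
        deleted += tp    // report it so the caller can clean up (e.g. the group coordinator)
      } else {
        cache(tp) = state // create or update the cached partition state
      }
    }
    deleted.toList
  }
}
```

Returning the deleted partitions, rather than handling them inside the cache, keeps the cache free of coordinator concerns. KafkaApis passes the list to groupCoordinator.handleDeletedPartitions().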
Copyright belongs to the original author, mhHao. If this infringes your rights, please contact us and it will be removed.