


  1. 服务端源码 :实现Kafka Broker的核心功能,包括日志存储、控制器、协调器、元数据管理及状态机管理、延迟机制、消费者组管理、高并发网络架构模型实现等。
  2. Java客户端源码 :实现了Producer和Consumer与Broker的交互机制,以及通用组件支撑代码。
  3. Connect源码 :用来构建异构数据双向流式同步服务。
  4. Stream源码 :用来实现实时流处理相关功能。
  5. Raft源码 :实现了Raft一致性协议。
  6. Admin模块 :Kafka的管理员模块,操作和管理其topic,partition相关,包含创建,删除topic,或者拓展分区等。
  7. Api模块 :负责数据交互,客户端与服务端交互数据的编码与解码。
  8. Client模块 :包含Producer读取Kafka Broker元数据信息的类,如topic和分区,以及leader。
  9. Cluster模块 :包含Broker、Cluster、Partition、Replica等实体类。
  10. Common模块 :包含各种异常类以及错误验证。
  11. Consumer模块 :消费者处理模块,负责客户端消费者数据和逻辑处理。
  12. Controller模块 :负责中央控制器的选举,分区的Leader选举,Replica的分配或重新分配,分区和副本的扩容等。
  13. Coordinator模块 :负责管理部分consumer group和他们的offset。
  14. Javaapi模块 :提供Java语言的Producer和Consumer的API接口。
  15. Log模块 :负责Kafka文件存储,读写所有Topic消息数据。
  16. Message模块 :封装多条数据组成数据集或压缩数据集。
  17. Metrics模块 :负责内部状态监控。
  18. Network模块 :处理客户端连接,网络事件模块。
  19. Producer模块 :生产者细节实现,包括同步和异步消息发送。
  20. Security模块 :负责Kafka的安全验证和管理。
  21. Serializer模块 :序列化和反序列化消息内容。
  22. Server模块 :涉及Leader和Offset的checkpoint,动态配置,延时创建和删除Topic,Leader选举,Admin和Replica管理等。
  23. Tools模块 :包含多种工具,如导出consumer offset值,LogSegments信息,Topic的log位置信息,Zookeeper上的offset值等。
  24. Utils模块 :包含各种工具类,如Json,ZkUtils,线程池工具类,KafkaScheduler公共调度器类等。




//副本管理器/* start replica manager */
replicaManager = createReplicaManager(isShuttingDown)


def startup(){// start ISR expiration thread// A follower can lag behind leader for up to config.replicaLagTimeMaxMs x 1.5 before it is removed from ISR//周期性检查 topic-partition 的 isr 是否有 replica 因为延迟或 hang 住需要从 isr 中移除;
    scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = config.replicaLagTimeMaxMs /2, unit = TimeUnit.MILLISECONDS)//判断是不是需要对 isr 进行更新,如果有 topic-partition 的 isr 发生了变动需要进行更新,那么这个方法就会被调用,它会触发 zk 的相应节点,进而触发 controller 进行相应的操作。
    scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _, period =2500L, unit = TimeUnit.MILLISECONDS)val haltBrokerOnFailure = config.interBrokerProtocolVersion < KAFKA_1_0_IV0
    logDirFailureHandler =new LogDirFailureHandler("LogDirFailureHandler", haltBrokerOnFailure)
    logDirFailureHandler.start()}//遍历本节点所有的 Partition 实例,来检查它们 isr 中的 replica 是否需要从 isr 中移除privatedef maybeShrinkIsr():Unit={
    trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
   * This function periodically runs to see if ISR needs to be propagated. It propagates ISR when:
   * 1. There is ISR change not propagated yet.
   * 2. There is no ISR Change in the last five seconds, or it has been more than 60 seconds since the last ISR propagation.
   * This allows an occasional ISR change to be propagated within a few seconds, and avoids overwhelming controller and
   * other brokers when large amount of ISR change occurs.
   *///将那些 isr 变动的 topic-partition 列表(isrChangeSet)通过 ReplicationUtils 的 propagateIsrChanges() 方法更新 zk 上,// 这时候 Controller 才能知道哪些 topic-partition 的 isr 发生了变动。def maybePropagateIsrChanges(){val now = System.currentTimeMillis()
    isrChangeSet synchronized {//有 topic-partition 的 isr 需要更新if(isrChangeSet.nonEmpty &&// 5s 内没有触发过isr change(lastIsrChangeMs.get()+ ReplicaManager.IsrChangePropagationBlackOut < now ||//或60s内没有没有触发isr propagation
          lastIsrPropagationMs.get()+ ReplicaManager.IsrChangePropagationInterval < now)){//在 zk 创建 isr 变动的提醒
        ReplicationUtils.propagateIsrChanges(zkUtils, isrChangeSet)//清空 isrChangeSet,它记录着 isr 变动的 topic-partition 信息
        isrChangeSet.clear()//更新触发isr propagation的时间


//检查这个 isr 中的每个 replica 是否需要从isr列表移除def maybeShrinkIsr(replicaMaxLagTimeMs:Long){val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock){//先判断本地的replica是不是这个 partition 的 leader副本,这个操作只会在 leader副本 上进行,如果不是 leader 直接跳过
      leaderReplicaIfLocal match{case Some(leaderReplica)=>//遍历除 leader 外 isr 的所有 replica,找到那些满足条件(落后超过 maxLagMs 时间的副本)需要从 isr 中移除的 replicaval outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)if(outOfSyncReplicas.nonEmpty){val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
            info("Shrinking ISR from %s to %s".format(inSyncReplicas.map(_.brokerId).mkString(","),
              newInSyncReplicas.map(_.brokerId).mkString(",")))// update ISR in zk and in cache//得到了新的 isr 列表,调用 updateIsr() 将新的 isr 更新到 zk 上,并且这个方法内部又调用了 ReplicaManager 的 recordIsrChange() 方法// 来告诉 ReplicaManager 当前这个 topic-partition 的 isr 发生了变化// (可以看出,zk 上这个 topic-partition 的 isr 信息虽然变化了,但是实际上 controller 还是无法感知的)
            updateIsr(newInSyncReplicas)// we may need to increment high watermark since ISR could be down to 1//更新 metrics
            replicaManager.isrShrinkRate.mark()//因为 isr 发生了变动,所以这里会通过 maybeIncrementLeaderHW() 方法来检查一下这个 partition 的 HW 是否需要增加。//因为isr减少了,因此hw值可能增加(可能isr中同步延迟高的副本已经被移除了)
            maybeIncrementLeaderHW(leaderReplica)}else{false}case None =>false// do nothing if no longer leader}}// some delayed operations may be unblocked after HW changedif(leaderHWIncremented)
      tryCompleteDelayedRequests()}def getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs:Long): Set[Replica]={/**
     * there are two cases that will be handled here -
     * 1. Stuck followers: If the leo of the replica hasn't been updated for maxLagMs ms,
     *                     the follower is stuck and should be removed from the ISR
     * 2. Slow followers: If the replica has not read up to the leo within the last maxLagMs ms,
     *                    then the follower is lagging and should be removed from the ISR
     * Both these cases are handled by checking the lastCaughtUpTimeMs which represents
     * the last time when the replica was fully caught up. If either of the above conditions
     * is violated, that replica is considered to be out of sync
     **/val candidateReplicas = inSyncReplicas - leaderReplica

    val laggingReplicas = candidateReplicas.filter(r =>(time.milliseconds - r.lastCaughtUpTimeMs)> maxLagMs)if(laggingReplicas.nonEmpty)
      debug("Lagging replicas are %s".format(laggingReplicas.map(_.brokerId).mkString(",")))

  }//更新isr信息到zkprivatedef updateIsr(newIsr: Set[Replica]){val newLeaderAndIsr =new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(_.brokerId).toList, zkVersion)//执行更新操作val(updateSucceeded,newVersion)= ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partitionId,
      newLeaderAndIsr, controllerEpoch, zkVersion)// 成功更新到 zk 上if(updateSucceeded){//告诉 replicaManager 这个 partition 的 isr 需要更新
      inSyncReplicas = newIsr
      zkVersion = newVersion
      trace("ISR updated to [%s] and zkVersion updated to [%d]".format(newIsr.mkString(","), zkVersion))}else{
      info("Cached zkVersion [%d] not equal to that in zookeeper, skip updating ISR".format(zkVersion))}}/**
   * Check and maybe increment the high watermark of the partition;
   * this function can be triggered when
   * 1. Partition ISR changed
   * 2. Any replica's LEO changed
   * The HW is determined by the smallest log end offset among all replicas that are in sync or are considered caught-up.
   * This way, if a replica is considered caught-up, but its log end offset is smaller than HW, we will wait for this
   * replica to catch up to the HW before advancing the HW. This helps the situation when the ISR only includes the
   * leader replica and a follower tries to catch up. If we don't wait for the follower when advancing the HW, the
   * follower's log end offset may keep falling behind the HW (determined by the leader's log end offset) and therefore
   * will never be added to ISR.
   * Returns true if the HW was incremented, and false otherwise.
   * Note There is no need to acquire the leaderIsrUpdate lock here
   * since all callers of this private API acquire that lock
   *///检查并可能更新 Leader 副本的高水位(High Watermark,HW),即消费者可以看到的该分区中最大的已提交的偏移量(offset)//可能增加hw的情况:1.Partition ISR 变动; 2. 任何副本的 LEO 改变;//在获取 HW 时,是从 isr 和认为能追得上的副本中选择最小的 LEO,之所以也要从能追得上的副本中选择,是为了等待 follower 追上 HW,否则可能没机会追上了privatedef maybeIncrementLeaderHW(leaderReplica: Replica, curTime:Long= time.milliseconds):Boolean={// 获取 isr 以及能够追上 isr (认为最近一次 fetch 的时间在 replica.lag.time.max.time 之内) 副本的 LEO 信息。val allLogEndOffsets = assignedReplicas.filter { replica =>
      curTime - replica.lastCaughtUpTimeMs <= replicaManager.config.replicaLagTimeMaxMs || inSyncReplicas.contains(replica)}.map(_.logEndOffset)//新的hw值val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)val oldHighWatermark = leaderReplica.highWatermark

    // Ensure that the high watermark increases monotonically. We also update the high watermark when the new// offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment.if(oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||(oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))){
      leaderReplica.highWatermark = newHighWatermark
      debug(s"High watermark updated to $newHighWatermark")true}else{
      debug(s"Skipping update high watermark since new hw $newHighWatermark is not larger than old hw $oldHighWatermark."+s"All LEOs are ${allLogEndOffsets.mkString(",")}")false}}

在 ReplicaManager 的 fetchMessages() 方法中,如果 Fetch 请求是来自副本,那么会调用 updateFollowerLogReadResults() 更新远程副本的信息:

   * Update the follower's fetch state in the leader based on the last fetch request and update `readResult`,
   * if the follower replica is not recognized to be one of the assigned replicas. Do not update
   * `readResult` otherwise, so that log start/end offset and high watermark is consistent with
   * records in fetch response. Log start/end offset and high watermark may change not only due to
   * this fetch request, e.g., rolling new log segment and removing old log segment may move log
   * start offset further than the last offset in the fetched records. The followers will get the
   * updated leader's state in the next fetch response.
   *///在 ReplicaManager 的 fetchMessages() 方法中,如果 Fetch 请求是来自副本,那么会调用 updateFollowerLogReadResults() 更新远程副本的信息//这个方法的作用就是找到本节点这个 Partition 对象,然后调用其 updateReplicaLogReadResult() 方法更新副本的 LEO 信息和拉取时间信息。privatedef updateFollowerLogReadResults(replicaId:Int,
                                           readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)]={
    debug(s"Recording follower broker $replicaId log end offsets: $readResults")
    readResults.map {case(topicPartition, readResult)=>var updatedReadResult = readResult
      nonOfflinePartition(topicPartition)match{case Some(partition)=>
          partition.getReplica(replicaId)match{case Some(replica)=>//更新副本的相关信息
              partition.updateReplicaLogReadResult(replica, readResult)case None =>
              warn(s"Leader $localBrokerId failed to record follower $replicaId's position "+s"${readResult.info.fetchOffsetMetadata.messageOffset} since the replica is not recognized to be "+s"one of the assigned replicas ${partition.assignedReplicas.map(_.brokerId).mkString(",")} "+s"for partition $topicPartition. Empty records will be returned for this partition.")
              updatedReadResult = readResult.withEmptyFetchInfo
          }case None =>
          warn(s"While recording the replica LEO, the partition $topicPartition hasn't been created.")}
      topicPartition -> updatedReadResult


   * Update the the follower's state in the leader based on the last fetch request. See
   * [[kafka.cluster.Replica#updateLogReadResult]] for details.
   * @return true if the leader's log start offset or high watermark have been updated
   */def updateReplicaLogReadResult(replica: Replica, logReadResult: LogReadResult):Boolean={val replicaId = replica.brokerId
    // No need to calculate low watermark if there is no delayed DeleteRecordsRequestval oldLeaderLW =if(replicaManager.delayedDeleteRecordsPurgatory.delayed >0) lowWatermarkIfLeader else-1L// 更新副本的相关信息,这里是更新该副本的 LEO、lastFetchLeaderLogEndOffset 和 lastFetchTimeMs;
    replica.updateLogReadResult(logReadResult)val newLeaderLW =if(replicaManager.delayedDeleteRecordsPurgatory.delayed >0) lowWatermarkIfLeader else-1L// check if the LW of the partition has incremented// since the replica's logStartOffset may have incrementedval leaderLWIncremented = newLeaderLW > oldLeaderLW
    // check if we need to expand ISR to include this replica// if it is not in the ISR yet//如果该副本不在 isr 中, 检查是否需要进行加入到isr,即是否有不在 isr 内的副本满足进入 isr 的条件。val leaderHWIncremented = maybeExpandIsr(replicaId, logReadResult)val result = leaderLWIncremented || leaderHWIncremented
    // some delayed operations may be unblocked after HW or LW changedif(result)

    debug(s"Recorded replica $replicaId log end offset (LEO) position ${logReadResult.info.fetchOffsetMetadata.messageOffset}.")
   * Check and maybe expand the ISR of the partition.
   * A replica will be added to ISR if its LEO >= current hw of the partition.
   * Technically, a replica shouldn't be in ISR if it hasn't caught up for longer than replicaLagTimeMaxMs,
   * even if its log end offset is >= HW. However, to be consistent with how the follower determines
   * whether a replica is in-sync, we only check HW.
   * This function can be triggered when a replica's LEO has incremented.
   * @return true if the high watermark has been updated
   *///检查当前 Partition 是否需要扩充 ISR, 副本的 LEO 大于等于 hw 的副本将会被添加到 isr 中def maybeExpandIsr(replicaId:Int, logReadResult: LogReadResult):Boolean={
    inWriteLock(leaderIsrUpdateLock){// check if this replica needs to be added to the ISR
      leaderReplicaIfLocal match{case Some(leaderReplica)=>val replica = getReplica(replicaId).get
          val leaderHW = leaderReplica.highWatermark
             assignedReplicas.map(_.brokerId).contains(replicaId)&&//replica LEO 大于 HW 的情况下,加入 isr 列表
             replica.logEndOffset.offsetDiff(leaderHW)>=0){val newInSyncReplicas = inSyncReplicas + replica
            info(s"Expanding ISR from ${inSyncReplicas.map(_.brokerId).mkString(",")} "+s"to ${newInSyncReplicas.map(_.brokerId).mkString(",")}")// update ISR in ZK and cache// 更新这个 topic-partition 的 isr 信息到zk
            replicaManager.isrExpandRate.mark()}// check if the HW of the partition can now be incremented// since the replica may already be in the ISR and its LEO has just incremented//检查 HW 是否需要更新//因为replica可能加入到了isr,且其LEO值增加了
          maybeIncrementLeaderHW(leaderReplica, logReadResult.fetchTimeMs)case None =>false// nothing to do if no longer leader}}}

对于Controller发送的Updata-Metadata 请求的处理,入口方法kafkaApis.handleUpdateMetadataRequest():

//处理来自controller的 update-metadata 请求def handleUpdateMetadataRequest(request: RequestChannel.Request){val correlationId = request.header.correlationId
    val updateMetadataRequest = request.body[UpdateMetadataRequest]if(authorize(request.session, ClusterAction, Resource.ClusterResource)){// 调用replicaManager更新 metadata, 并返回需要删除的 Partitionval deletedPartitions = replicaManager.maybeUpdateMetadataCache(correlationId, updateMetadataRequest)if(deletedPartitions.nonEmpty){//调用GroupCoordinator 删除相关 partition 的信息
        updateMetadataRequest.partitionStates.keySet.asScala.map(_.topic).foreach { topic =>
      sendResponseExemptThrottle(request,new UpdateMetadataResponse(Errors.NONE))}else{
      sendResponseMaybeThrottle(request, _ =>new UpdateMetadataResponse(Errors.CLUSTER_AUTHORIZATION_FAILED))}}

ReplicaManager 的 maybeUpdateMetadataCache() 方法实现:

// Controller 向所有的 Broker 发送请求, 让它们去更新各自的 meta 信息def maybeUpdateMetadataCache(correlationId:Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition]={
    replicaStateChangeLock synchronized {//来自过期的 controller请求,抛出异常if(updateMetadataRequest.controllerEpoch < controllerEpoch){val stateControllerEpochErrorMessage =s"Received update metadata request with correlation id $correlationId "+s"from an old controller ${updateMetadataRequest.controllerId} with epoch ${updateMetadataRequest.controllerEpoch}. "+s"Latest known controller epoch is $controllerEpoch"
        stateChangeLogger.warn(stateControllerEpochErrorMessage)thrownew ControllerMovedException(stateChangeLogger.messageWithPrefix(stateControllerEpochErrorMessage))}else{//更新元数据缓存,然后返回需要删除的 topic-partition 列表。val deletedPartitions = metadataCache.updateCache(correlationId, updateMetadataRequest)
        controllerEpoch = updateMetadataRequest.controllerEpoch

这个方法中会调用 metadataCache.updateCache() 方法更新 meta 缓存,然后返回需要删除的 topic-partition 列表:

// This method returns the deleted TopicPartitions received from UpdateMetadataRequest//更新本地的元数据信息,并返回要删除的 topic-partitiondef updateCache(correlationId:Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition]={
      controllerId = updateMetadataRequest.controllerId match{case id if id <0=> None
          case id => Some(id)}//清空 aliveNodes 和 aliveBrokers 记录,并更新成最新的记录
      updateMetadataRequest.liveBrokers.asScala.foreach { broker =>// `aliveNodes` is a hot path for metadata requests for large clusters, so we use java.util.HashMap which// is a bit faster than scala.collection.mutable.HashMap. When we drop support for Scala 2.10, we could// move to `AnyRefMap`, which has comparable performance.val nodes =new java.util.HashMap[ListenerName, Node]val endPoints =new mutable.ArrayBuffer[EndPoint]
        broker.endPoints.asScala.foreach { ep =>
          endPoints += EndPoint(ep.host, ep.port, ep.listenerName, ep.securityProtocol)
          nodes.put(ep.listenerName,new Node(broker.id, ep.host, ep.port))}
        aliveBrokers(broker.id)= Broker(broker.id, endPoints, Option(broker.rack))
        aliveNodes(broker.id)= nodes.asScala
      }val deletedPartitions =new mutable.ArrayBuffer[TopicPartition]
      updateMetadataRequest.partitionStates.asScala.foreach {case(tp, info)=>val controllerId = updateMetadataRequest.controllerId
        val controllerEpoch = updateMetadataRequest.controllerEpoch
        // 对于要删除的 topic-partition,从缓存中删除,并记录下来作为这个方法的返回;if(info.basePartitionState.leader == LeaderAndIsr.LeaderDuringDelete){//从 cache 中删除
          removePartitionInfo(tp.topic, tp.partition)
          stateChangeLogger.trace(s"Deleted partition $tp from metadata cache in response to UpdateMetadata "+s"request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")
          deletedPartitions += tp
        }else{// 对于其他的 topic-partition,执行 updateOrCreate 操作。
          addOrUpdatePartitionInfo(tp.topic, tp.partition, info)
          stateChangeLogger.trace(s"Cached leader info $info for partition $tp in response to "+s"UpdateMetadata request sent by controller $controllerId epoch $controllerEpoch with correlation id $correlationId")}}
标签: kafka linq 分布式

本文转载自: https://blog.csdn.net/hmh13548571896/article/details/140593903
版权归原作者 mhHao 所有, 如有侵权,请联系我们删除。

