0


网易三面:说说Kafka的Follower是如何拉取Leader消息的?

搞懂AbstractFetcherThread的processPartitionData、truncate、buildFetch等方法,就掌握了拉取线程的处理逻辑。串联起这三个方法的doWork方法就能完整理解Follower副本应用拉取线程(即ReplicaFetcherThread线程),从Leader副本获取消息并处理的流程了。

AbstractFetcherThread#doWork

doWork,AbstractFetcherThread的核心方法,线程的主逻辑运行方法:

AbstractFetcherThread线程只要一直处运行状态,就会不断重复这俩操作。

  • 为何AbstractFetcherThread线程要不断尝试截断?因为分区的Leader可能随时变化。每当有新Leader产生,Follower副本就必须主动执行截断,将自己的本地日志裁剪成与Leader一模一样的消息序列,甚至,Leader副本也要执行截断,将LEO调整到分区高水位处。

maybeTruncate

先对分区状态进行分组。既然是做截断,则该方法操作的就只能是处于【截断中】状态的分区。

Leader Epoch机制,替换高水位值在日志截断中的作用:

  • 当分区存在Leader Epoch值,将副本的本地日志截断到Leader Epoch对应的最新位移值处,truncateToEpochEndOffsets实现
  • 若分区不存在对应Leader Epoch记录,则仍使用原来高水位机制,调用truncateToHighWatermark将日志调整到高水位值处

truncateToHighWatermark

  • 先遍历给定的所有分区
  • 依次为每个分区获取当前高水位值,并保存在分区读取状态类
  • 调用doTruncate执行日志截断
  • 等给定的所有分区都执行对应操作后,更新这组分区的分区读取状态

doTruncate调用抽象方法truncate,而truncate实现在ReplicaFetcherThread。

maybeFetch

第1步,为partitionStates中的分区构造FetchRequest.Builder对象,之后调用其build方法创建FetchRequest请求对象。这里的partitionStates保存要去获取消息的一组分区及对应状态信息。该步的输出结果是两个对象:

  • ReplicaFetch,要读取的分区核心信息+ FetchRequest.Builder对象。核心信息指要读取哪个分区,从哪个位置开始读,最多读多少字节等
  • 一组出错分区

第2步,处理出错分区:将这组分区加入到有序Map末尾,等待后续重试。若发现当前无可读取分区,会阻塞等待一段时间

第3步,发送FETCH请求给对应Leader副本,并处理相应Response,即processFetchRequest要做的事。

processFetchRequest

img

搞清processFetchRequest的核心逻辑,就能明白拉取线程是如何执行拉取动作:

  1. 调用fetchFromLeader给Leader发送FETCH请求,并阻塞等待Response的返回,然后更新FETCH请求发送速率的监控指标
  2. 拿到Response后,从中取出分区的核心信息: - 比较要读取的位移值==当前AbstractFetcherThread线程缓存的、该分区下一条待读取的位移值- 当前分区是否处于可获取状态

若不满足这俩条件,说明该Request可能是个之前等待了许久都未处理的请求,就不用处理了。

相反,若满足这两个条件且:

  • Response无错误,会提取Response的Leader Epoch值,然后交由子类实现具体的Response处理:调用processPartitionData。之后将该分区放置在有序Map的末尾以保证公平性
  • Response有错误,调用对应错误的定制化处理逻辑,将出错分区加入出错分区列表
  1. 调用handlePartitionsWithErrors统一处理上一步处理过程中出现的错误分区

子类:ReplicaFetcherThread

ReplicaFetcherThread继承自AbstractFetcherThread,是Follower副本端创建的线程,用于向Leader副本拉取消息数据。

类定义及字段

ReplicaFetcherThread的定义代码有些长,但构造器中大部分字段都解析过了。现在,只需学习ReplicaFetcherThread类的字段:

消息获相关字段:

都是FETCH请求的参数,主要控制Follower副本拉取Leader副本消息的行为,如:

  • 一次请求到底能获取多少字节数据
  • 或当未达到累积阈值时,FETCH请求等待多长时间等

API

Follower副本拉取线程要做的最重要的三件事:

  • 处理拉取的消息
  • 构建拉取消息的请求
  • 执行截断日志操作

processPartitionData

AbstractFetcherThread线程从Leader副本拉取回消息后,要调用processPartitionData执行后续动作:

img

processPartitionData中的process就是写入Follower副本本地日志。因此,该方法的主体逻辑就是调用分区对象Partition的appendRecordsToFollowerOrFutureReplica写入获取到的消息。沿着这个写入方法追踪,就会发现它调用appendAsFollower。

仅写入日志还不够,还要做一些更新。如更新Follower副本的高水位值:将FETCH请求Response中包含的高水位值作为新的高水位值,还要尝试更新Follower副本的Log Start Offset值。

为何Log Start Offset值也可能变化?因为Leader的Log Start Offset可能发生变化,如用户手动执行删除消息的操作。Follower副本的日志要和Leader保持严格一致,因此,若Leader的该值发生变化,Follower自然也要发生变化。

此外还会更新其他一些统计指标值,最后将写入结果返回。

buildFetch

构建发送给Leader副本所在Broker的FETCH请求:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ccGd2xLJ-1641571213281)(/Users/apple/Library/Application Support/typora-user-images/image-20220107235217950.png)]

img

构造FETCH请求的Builder对象然后返回。有Builder对象,就能构造出FETCH请求,仅需调用builder.build()。

该方法的一个副产品是汇总出错分区,调用方后续可统一处理这些出错分区。

构造Builder的过程中,会用到ReplicaFetcherThread类定义的那些与消息获取相关的字段,如maxWait、minBytes和maxBytes。

truncate

对给定分区执行日志截断操作:

overridedef truncate(
  tp: TopicPartition, 
  offsetTruncationState: OffsetTruncationState):Unit={// 拿到分区对象val partition = replicaMgr.nonOfflinePartition(tp).get
  //拿到分区本地日志 val log = partition.localLogOrException
  // 执行截断操作,截断到的位置由offsetTruncationState的offset指定
  partition.truncateTo(offsetTruncationState.offset, isFuture =false)if(offsetTruncationState.offset < log.highWatermark)
    warn(s"Truncating $tp to offset ${offsetTruncationState.offset} below high watermark "+
      s"${log.highWatermark}")if(offsetTruncationState.truncationCompleted)
    replicaMgr.replicaAlterLogDirsManager
      .markPartitionsForTruncation(brokerConfig.brokerId, tp,
      offsetTruncationState.offset)}

利用给定的offsetTruncationState的offset值,对给定分区的本地日志进行截断操作。该操作由Partition对象的truncateTo方法完成,但实际上底层调用的是Log#truncateTo:将日志截断到小于给定值的最大位移值处。

总结

AbstractFetcherThread线程的doWork完整了拉取线程要执行的逻辑,即日志截断(truncate)+日志获取(buildFetch)+日志处理(processPartitionData),而其子类ReplicaFetcherThread是真正实现这3个方法:Follower副本利用ReplicaFetcherThread线程实时地从Leader副本拉取消息并写入到本地日志,从而实现了与Leader副本之间的同步。

要点:

  • doWork方法:拉取线程工作入口方法,联结所有重要的子功能方法,如执行截断操作,获取Leader副本消息以及写入本地日志。
  • truncate方法:根据Leader副本返回的位移值和Epoch值执行本地日志的截断操作。
  • buildFetch方法:为一组特定分区构建FetchRequest对象所需的数据结构。
  • processPartitionData方法:处理从Leader副本获取到的消息,主要是写入到本地日志中。

Follower副本正是利用它来获取对应分区Partition对象的,然后依靠该对象执行消息写入。

标签: kafka java 分布式

本文转载自: https://blog.csdn.net/qq_33589510/article/details/122374591
版权归原作者 JavaEdge. 所有, 如有侵权,请联系我们删除。

“网易三面:说说Kafka的Follower是如何拉取Leader消息的?”的评论:

还没有评论