

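Flink job submission flow: source code walkthrough

Overview of the Flink source code series

This article is based on Flink 1.17. It is a set of personal study notes on the Flink source code; corrections are welcome.

Flink job submission flow: step-by-step analysis

Refer to the flow diagram while reading the steps below.
[Figure: overall submission flow diagram]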

  1. CliFrontend: the client
  2. YarnJobClusterEntrypoint: the entry point executed by the ApplicationMaster (AM)
  3. YarnTaskExecutorRunner: the TaskManager entry class in YARN mode

1. CliFrontend: the client

Submit command
The job is submitted in Flink on YARN per-job mode. The flink script shows that a submitted program is handed to the CliFrontend class.

CliFrontend: main method entry point

Within it: the parseAndRun method

Within run, which parseAndRun calls, the client:

  1. Gets the effective configuration
  2. Executes the program
  3. Executes the user code: env.execute() -> StreamExecutionEnvironment.execute()

Get the StreamGraph: StreamExecutionEnvironment class

Get the PipelineExecutor and execute

Generate the JobGraph: YarnJobClusterExecutor class

Within it, part 1:
Create the yarnClient and build the YarnClusterDescriptor: YarnClusterClientFactory class

Within it, part 2:
Start the AM (ApplicationMaster)
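
The client-side steps above depend on the environment picking a PipelineExecutor that matches the configured deployment target. The following is a minimal sketch of that kind of lookup in plain Java, with no Flink dependency. ToyExecutor, ToyExecutorFactory, and ToyExecutorLoader are hypothetical stand-ins and not Flink classes; only the option key `execution.target` and the target name `yarn-per-job` come from Flink itself.

```java
import java.util.List;
import java.util.Map;

class ToyExecutorLoader {
    static final String TARGET_KEY = "execution.target";

    /** Hypothetical stand-in for an executor that deploys a pipeline. */
    interface ToyExecutor {
        String execute(String pipelineName);
    }

    /** Hypothetical stand-in for a factory that knows which target it serves. */
    interface ToyExecutorFactory {
        boolean isCompatibleWith(Map<String, String> configuration);

        ToyExecutor getExecutor();
    }

    /** Builds a factory that matches exactly one target name. */
    static ToyExecutorFactory factoryFor(String target) {
        return new ToyExecutorFactory() {
            @Override
            public boolean isCompatibleWith(Map<String, String> configuration) {
                return target.equals(configuration.get(TARGET_KEY));
            }

            @Override
            public ToyExecutor getExecutor() {
                return pipelineName -> pipelineName + " deployed via " + target;
            }
        };
    }

    /** Returns the single factory compatible with the configuration, or fails. */
    static ToyExecutorFactory select(
            List<ToyExecutorFactory> factories, Map<String, String> configuration) {
        ToyExecutorFactory match = null;
        for (ToyExecutorFactory factory : factories) {
            if (factory.isCompatibleWith(configuration)) {
                if (match != null) {
                    throw new IllegalStateException("Multiple compatible executors");
                }
                match = factory;
            }
        }
        if (match == null) {
            throw new IllegalStateException(
                    "No executor for target " + configuration.get(TARGET_KEY));
        }
        return match;
    }

    /** Looks up the executor for the given target and runs the pipeline with it. */
    static String submit(String target, String pipelineName) {
        List<ToyExecutorFactory> factories =
                List.of(factoryFor("local"), factoryFor("yarn-per-job"));
        Map<String, String> configuration = Map.of(TARGET_KEY, target);
        return select(factories, configuration).getExecutor().execute(pipelineName);
    }

    public static void main(String[] args) {
        System.out.println(submit("yarn-per-job", "wordcount"));
    }
}
```

Keeping the selection behind a factory interface is what lets the same user code run unchanged against different deployment targets: only the configuration value changes.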

2. The ApplicationMaster has already been started by the time the cluster is deployed

3. ApplicationMaster execution

YarnJobClusterEntrypoint: startup entry point, the main method

Factory construction and object creation

Object creation:
Create the resourceManager

Create and start the dispatcher; start the resourceManager

Within it: creating and starting the dispatcher

  1. DefaultDispatcherRunnerFactory -> createDispatcherRunner
  2. DefaultDispatcherRunner -> create
  3. DefaultDispatcherRunner -> start
  4. StandaloneLeaderElectionService -> start
  5. DefaultDispatcherRunner -> grantLeadership
  6. DefaultDispatcherRunner -> startNewDispatcherLeaderProcess
  7. AbstractDispatcherLeaderProcess -> start
  8. AbstractDispatcherLeaderProcess -> startInternal
  9. JobDispatcherLeaderProcess -> onStart
  10. DefaultDispatcherGatewayServiceFactory -> create
  11. DefaultDispatcherGatewayServiceFactory
  12. Dispatcher -> onStart
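
The call chain above follows a callback pattern: the runner registers with a leader election service and only starts its real work once leadership is granted. The following is a minimal sketch of that pattern in plain Java. All types here (Contender, ImmediateElectionService, ToyDispatcherRunner) are hypothetical stand-ins, and the sketch assumes a setup without high availability, where the only contender is granted leadership as soon as it starts.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

class ToyLeaderElection {

    /** Hypothetical contender interface: called back when leadership is granted. */
    interface Contender {
        void grantLeadership(UUID leaderSessionId);
    }

    /** Toy election service: the only contender wins immediately. */
    static class ImmediateElectionService {
        void start(Contender contender) {
            contender.grantLeadership(new UUID(0L, 0L));
        }
    }

    /** Toy runner: does no real work until it has been granted leadership. */
    static class ToyDispatcherRunner implements Contender {
        final List<String> events = new ArrayList<>();

        void start(ImmediateElectionService electionService) {
            events.add("runner started");
            electionService.start(this);
        }

        @Override
        public void grantLeadership(UUID leaderSessionId) {
            events.add("leadership granted");
            startNewLeaderProcess();
        }

        private void startNewLeaderProcess() {
            events.add("leader process started");
            events.add("dispatcher started");
        }
    }

    /** Runs the toy flow once and returns the recorded events in order. */
    static List<String> run() {
        ToyDispatcherRunner runner = new ToyDispatcherRunner();
        runner.start(new ImmediateElectionService());
        return runner.events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The later chains for the jobMaster and the resourceManager in this article also begin with a start call followed by grantLeadership, so the same sketch can be read against them.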

Within it: starting the dispatcher service mainly performs registration
Within it: starting the jobMaster

  1. Dispatcher -> startCleanupRetries
  2. Dispatcher -> runCleanupRetry
  3. Dispatcher -> runJob
  4. JobMasterServiceLeadershipRunner -> start
  5. StandaloneLeaderElectionService -> start
  6. JobMasterServiceLeadershipRunner -> grantLeadership
  7. JobMasterServiceLeadershipRunner -> startJobMasterServiceProcessAsync
  8. JobMasterServiceLeadershipRunner -> verifyJobSchedulingStatusAndCreateJobMasterServiceProcess
  9. JobMasterServiceLeadershipRunner -> createNewJobMasterServiceProcess
  10. DefaultJobMasterServiceProcessFactory -> create
  11. DefaultJobMasterServiceProcess -> DefaultJobMasterServiceProcess
  12. DefaultJobMasterServiceFactory -> createJobMasterService
  13. DefaultJobMasterServiceFactory -> internalCreateJobMasterService
  14. Start the JobMaster

Within it: starting the resourceManager

  1. ResourceManagerServiceImpl -> grantLeadership
  2. ResourceManagerServiceImpl -> startNewLeaderResourceManager
  3. ResourceManagerServiceImpl -> startResourceManagerIfIsLeader
  4. resourceManager -> start
  5. ResourceManager -> onStart
  6. ResourceManager -> startResourceManagerServices

initialize():
Create the clients for YARN's ResourceManager and NodeManager, and start them

Within it: starting the jobMaster
JobMaster -> onStart

Actually starting the jobMaster

Establish the connection and start requesting resources

  1. StandaloneLeaderRetrievalService -> start
  2. JobMaster -> notifyOfNewResourceManagerLeader
  3. JobMaster -> reconnectToResourceManager
  4. JobMaster -> tryConnectToResourceManager
  5. JobMaster -> connectToResourceManager
  6. RegisteredRpcConnection -> start

After registration succeeds, onRegistrationSuccess() is called and the JobMaster requests slots from the ResourceManager

  1. JobMaster -> onRegistrationSuccess
  2. JobMaster -> establishResourceManagerConnection
  3. DeclarativeSlotPoolService -> connectToResourceManager
  4. ResourceManager -> declareRequiredResources
  5. FineGrainedSlotManager -> processResourceRequirements

Within it: notifyResourceRequirements

  1. DefaultResourceTracker -> notifyResourceRequirements
  2. JobScopedResourceTracker -> notifyResourceRequirements

Within it: checkResourceRequirementsWithDelay

  1. FineGrainedSlotManager -> checkResourceRequirementsWithDelay
  2. FineGrainedSlotManager -> checkResourceRequirements

Within it: tryFulfillRequirements

tryFulfilledRequirementWithResource

Within it: allocating resources with allocateSlotsAccordingTo

Within it: declareNeededResourcesWithDelay

  1. FineGrainedSlotManager -> declareNeededResourcesWithDelay
  2. FineGrainedSlotManager -> declareNeededResources
  3. ActiveResourceManager -> declareResourceNeeded
  4. ActiveResourceManager -> declareResourceNeeded
  5. ActiveResourceManager -> checkResourceDeclarations
  6. ActiveResourceManager -> requestNewWorker
  7. YarnResourceManagerDriver -> requestResource
  8. YarnResourceManagerDriver -> addContainerRequest
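
The chain above turns declared slot requirements into requests for new workers. The following is a deliberately simplified sketch of the arithmetic behind that idea, in plain Java. It assumes every worker offers the same number of identical slots, which is an assumption made here for illustration: the real slot manager matches on resource profiles through an allocation strategy. ToyResourceDeclaration and its method names are hypothetical.

```java
class ToyResourceDeclaration {

    /** Slots still missing once the available slots are used up (never negative). */
    static int missingSlots(int requiredSlots, int availableSlots) {
        return Math.max(0, requiredSlots - availableSlots);
    }

    /**
     * Number of new workers to request, assuming uniform workers.
     * Slots of workers that were requested but have not registered yet count as available.
     */
    static int workersToRequest(
            int requiredSlots, int freeSlots, int pendingWorkers, int slotsPerWorker) {
        if (slotsPerWorker <= 0) {
            throw new IllegalArgumentException("slotsPerWorker must be positive");
        }
        int available = freeSlots + pendingWorkers * slotsPerWorker;
        int missing = missingSlots(requiredSlots, available);
        return (missing + slotsPerWorker - 1) / slotsPerWorker; // ceiling division
    }

    public static void main(String[] args) {
        // 5 slots required, 1 free, no pending workers, 2 slots per worker
        System.out.println(workersToRequest(5, 1, 0, 2));
    }
}
```

Counting pending workers as available capacity is what keeps a repeated requirement check from requesting the same container twice.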

After the containers are requested, the callback functions run and lead to the TaskExecutor launch context:

  1. YarnResourceManagerDriver -> onContainersAllocated
  2. YarnResourceManagerDriver -> onContainersOfPriorityAllocated
  3. YarnResourceManagerDriver -> startTaskExecutorInContainerAsync
  4. YarnResourceManagerDriver -> createTaskExecutorLaunchContext
  5. Utils.createTaskExecutorContext
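
The callback steps above can be pictured as matching newly allocated containers against worker requests that are still waiting. The following is a minimal sketch of that matching in plain Java, using CompletableFuture for the pending requests. ToyContainerCallback, its methods, and the container ids are hypothetical; the sketch assumes requests are served in first-in, first-out order and that containers nobody is waiting for are handed back.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

class ToyContainerCallback {
    /** Pending worker requests, each completed with the id of the container serving it. */
    private final Queue<CompletableFuture<String>> pendingRequests = new ArrayDeque<>();

    /** Registers a request for one worker and returns a future for its container id. */
    CompletableFuture<String> requestResource() {
        CompletableFuture<String> future = new CompletableFuture<>();
        pendingRequests.add(future);
        return future;
    }

    /** Callback: serves pending requests in order and returns the excess containers. */
    List<String> onContainersAllocated(List<String> containerIds) {
        List<String> excess = new ArrayList<>();
        for (String containerId : containerIds) {
            CompletableFuture<String> request = pendingRequests.poll();
            if (request == null) {
                excess.add(containerId); // nobody is waiting for this container
            } else {
                request.complete(containerId);
            }
        }
        return excess;
    }

    /** Two requests, three allocated containers: two launches and one release. */
    static List<String> demo() {
        ToyContainerCallback driver = new ToyContainerCallback();
        List<String> log = new ArrayList<>();
        driver.requestResource().thenAccept(id -> log.add("launch TaskExecutor in " + id));
        driver.requestResource().thenAccept(id -> log.add("launch TaskExecutor in " + id));
        List<String> excess =
                driver.onContainersAllocated(List.of("container_1", "container_2", "container_3"));
        for (String id : excess) {
            log.add("release " + id);
        }
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```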

The launch context names the entry class, so the container can start the main method of YarnTaskExecutorRunner

  1. TaskManagerRunner -> runTaskManagerProcessSecurely
  2. TaskManagerRunner -> runTaskManagerProcessSecurely
  3. TaskManagerRunner -> runTaskManager
  4. TaskExecutorToServiceAdapter -> start

TaskExecutor onStart method:

  1. TaskExecutor -> onStart
  2. TaskExecutor -> startTaskExecutorServices

Within it: resourceManagerLeaderRetriever.start

  1. ResourceManagerLeaderListener -> notifyLeaderAddress
  2. TaskExecutor -> notifyOfNewResourceManagerLeader
  3. TaskExecutor -> reconnectToResourceManager
  4. TaskExecutor -> tryConnectToResourceManager
  5. TaskExecutor -> connectToResourceManager

Within it: newRegistration.startRegistration(); // todo invokeRegistration

  1. RetryingRegistration -> startRegistration
  2. RetryingRegistration -> register
  3. RetryingRegistration -> invokeOperation
  4. TaskExecutorToResourceManagerConnection -> invokeRegistration
  5. ResourceManager -> registerTaskExecutor -> registerTaskExecutorInternal
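
The registration chain above is a retry loop: an attempt is sent, and if it does not succeed, another attempt follows with a longer timeout. The following is a minimal synchronous sketch of that behaviour in plain Java. ToyRetryingRegistration and its methods are hypothetical, the doubling-with-cap schedule is a simplification, and the bounded attempt count is an assumption added here so the example terminates.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntPredicate;

class ToyRetryingRegistration {

    /** Timeouts for successive attempts: doubled each time, capped at maxTimeoutMillis. */
    static List<Long> attemptTimeouts(
            long initialTimeoutMillis, long maxTimeoutMillis, int attempts) {
        List<Long> timeouts = new ArrayList<>();
        long timeout = initialTimeoutMillis;
        for (int i = 0; i < attempts; i++) {
            timeouts.add(timeout);
            timeout = Math.min(timeout * 2, maxTimeoutMillis);
        }
        return timeouts;
    }

    /**
     * Repeats the registration attempt until it succeeds.
     * Returns the 1-based number of the successful attempt, or -1 if all attempts fail.
     */
    static int register(IntPredicate attemptSucceeds, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (attemptSucceeds.test(attempt)) {
                return attempt;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println(attemptTimeouts(100L, 500L, 5));
        // Hypothetical target that only accepts the third attempt
        System.out.println(register(attempt -> attempt >= 3, 5));
    }
}
```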

After registration succeeds, onRegistrationSuccess() is called and the TaskExecutor reports its slots to the ResourceManager

  1. TaskExecutorToResourceManagerConnection -> onRegistrationSuccess
  2. ResourceManagerRegistrationListener -> onRegistrationSuccess
  3. ResourceManagerRegistrationListener -> establishResourceManagerConnection
  4. ResourceManager -> sendSlotReport
  5. FineGrainedSlotManager -> registerTaskManager, which performs allocateSlot

  1. DefaultSlotStatusSyncer -> allocateSlot
  2. TaskExecutor(!!!) -> requestSlot
  3. TaskExecutor -> requestSlot

Within it: allocateSlotForJob:

  1. TaskExecutor -> allocateSlotForJob
  2. TaskExecutor -> allocateSlot
  3. TaskSlotTableImpl -> allocateSlot

Within it: offerSlotsToJobManager

  1. TaskExecutor -> internalOfferSlotsToJobManager
  2. JobMaster -> offerSlots
  3. DeclarativeSlotPoolService -> offerSlots
  4. DefaultDeclarativeSlotPool -> offerSlots
  5. DefaultDeclarativeSlotPool -> internalOfferSlots
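
The final chain above is an offer and accept exchange: the TaskExecutor offers its slots, and the slot pool on the JobMaster side accepts the ones it still needs. The following is a minimal sketch of that exchange in plain Java. ToySlotPool is hypothetical and assumes all slots are interchangeable, whereas the real pool matches offers against resource profiles.

```java
import java.util.ArrayList;
import java.util.List;

class ToySlotPool {
    private int outstandingRequirements;

    ToySlotPool(int requiredSlots) {
        this.outstandingRequirements = requiredSlots;
    }

    /** Accepts offered slots until the requirement is met; returns the accepted slot ids. */
    List<String> offerSlots(List<String> offeredSlotIds) {
        List<String> accepted = new ArrayList<>();
        for (String slotId : offeredSlotIds) {
            if (outstandingRequirements == 0) {
                break; // remaining offers are not accepted
            }
            accepted.add(slotId);
            outstandingRequirements--;
        }
        return accepted;
    }

    /** Number of required slots that have not been covered by accepted offers yet. */
    int outstanding() {
        return outstandingRequirements;
    }

    public static void main(String[] args) {
        ToySlotPool pool = new ToySlotPool(2);
        System.out.println(pool.offerSlots(List.of("slot-1", "slot-2", "slot-3")));
        System.out.println(pool.outstanding());
    }
}
```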

This article is reposted from: https://blog.csdn.net/xiaochaochoa/article/details/135699334
Copyright belongs to the original author, 必吃早饭选手. In case of infringement, please contact us for removal.
