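Flink Source Code Series: Overview
This article is based on Flink 1.17 and records my study of the Flink source code. These are personal study notes only; if you spot a mistake, corrections are welcome.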
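Flink job submission source code: flow walkthrough
Keep this diagram at hand while reading the flow below
- CliFrontend: the client
- YarnJobClusterEntrypoint: the entry point executed by the AM (ApplicationMaster)
- YarnTaskExecutorRunner: the entry class of the TaskManager in YARN mode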
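1. CliFrontend (client)
Submit command
The job is submitted in Flink on YARN per-job mode. The flink script shows that, once a program is submitted, it looks up the CliFrontend class
CliFrontend: main method entry point
Within it: the parseAndRun method
Within it: inside run, called from parseAndRun:
While obtaining the effective configuration:
While executing the program:
Executing the user code: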
env.execute() ->StreamExecutionEnvironment.execute()
Obtain the StreamGraph (StreamExecutionEnvironment class)
Obtain the PipelineExecutor and execute
Generate the JobGraph (YarnJobClusterExecutor class)
Part 1:
Create the YarnClient and build the YarnClusterDescriptor (YarnClusterClientFactory class)
Part 2:
Start the AM (ApplicationMaster)
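The "obtain the PipelineExecutor" step above can be pictured as a lookup keyed by the deployment target. The following is a minimal sketch in plain Java, not Flink's code: `ExecutorLookupSketch`, `ExecutorFactory`, `forTarget` and `select` are hypothetical names, and the idea that the choice is driven by an `execution.target` setting with the value `yarn-per-job` is an assumption to verify against the Flink version you are reading.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Toy model only: none of these types are Flink classes.
class ExecutorLookupSketch {

    /** Hypothetical factory that knows which deployment target it can serve. */
    interface ExecutorFactory {
        boolean isCompatibleWith(Map<String, String> config);

        String name();
    }

    /** Builds a factory that accepts exactly one target name (case-insensitive). */
    static ExecutorFactory forTarget(String target) {
        return new ExecutorFactory() {
            @Override
            public boolean isCompatibleWith(Map<String, String> config) {
                // Assumed config key; a missing key simply means "not compatible".
                return target.equalsIgnoreCase(config.get("execution.target"));
            }

            @Override
            public String name() {
                return target;
            }
        };
    }

    /** Returns the name of the first factory that accepts the configuration, if any. */
    static Optional<String> select(List<ExecutorFactory> factories, Map<String, String> config) {
        return factories.stream()
                .filter(factory -> factory.isCompatibleWith(config))
                .map(ExecutorFactory::name)
                .findFirst();
    }

    public static void main(String[] args) {
        List<ExecutorFactory> factories = List.of(forTarget("local"), forTarget("yarn-per-job"));
        Map<String, String> config = Map.of("execution.target", "yarn-per-job");
        System.out.println("selected executor: " + select(factories, config).orElse("none"));
    }
}
```

In this sketch the client never names a concrete executor class; it only carries a configuration value, which is why the same user code can be sent to different deployment targets.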
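2. The ApplicationMaster has already been started by the time the cluster is deployed
3. ApplicationMaster execution process
YarnJobClusterEntrypoint: startup entry point, main method
Factory construction and object creation
Object creation:
Create the ResourceManager
Create and start the Dispatcher; start the ResourceManager
Within it: create and start the Dispatcher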
- DefaultDispatcherRunnerFactory -> createDispatcherRunner
- DefaultDispatcherRunner -> create
- DefaultDispatcherRunner -> start
- StandaloneLeaderElectionService -> start
- DefaultDispatcherRunner -> grantLeadership
- DefaultDispatcherRunner -> startNewDispatcherLeaderProcess
- AbstractDispatcherLeaderProcess -> start
- AbstractDispatcherLeaderProcess -> startInternal
- JobDispatcherLeaderProcess -> onStart
- DefaultDispatcherGatewayServiceFactory -> create
- DefaultDispatcherGatewayServiceFactory
- Dispatcher -> onStart
Within it: starting the Dispatcher service mainly performs registration
Within it: start the JobMaster
- Dispatcher -> startCleanupRetries
- Dispatcher -> runCleanupRetry
- Dispatcher -> runJob
- JobMasterServiceLeadershipRunner -> start
- StandaloneLeaderElectionService -> start
- JobMasterServiceLeadershipRunner -> grantLeadership
- JobMasterServiceLeadershipRunner -> startJobMasterServiceProcessAsync
- JobMasterServiceLeadershipRunner -> verifyJobSchedulingStatusAndCreateJobMasterServiceProcess
- JobMasterServiceLeadershipRunner -> createNewJobMasterServiceProcess
- DefaultJobMasterServiceProcessFactory -> create
- DefaultJobMasterServiceProcess -> DefaultJobMasterServiceProcess
- DefaultJobMasterServiceFactory -> createJobMasterService
- DefaultJobMasterServiceFactory -> internalCreateJobMasterService
- Start
Within it: start the ResourceManager
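Both chains above (Dispatcher and JobMaster) go through `StandaloneLeaderElectionService -> start` and then straight into `grantLeadership`. A minimal sketch of that pattern follows, assuming that in a non-HA setup the single contender is granted leadership immediately. All types here (`Contender`, `StandaloneElection`, `Runner`) are hypothetical stand-ins that only imitate the shape of the call chain; they are not Flink's classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Toy model only: these types imitate the call chain, not Flink's real classes.
class LeaderElectionSketch {

    /** Hypothetical component that waits to be told it is the leader. */
    interface Contender {
        void grantLeadership(UUID leaderSessionId);
    }

    /** Hypothetical election service for a setup with exactly one contender. */
    static final class StandaloneElection {
        static final UUID DEFAULT_LEADER_ID = new UUID(0L, 0L);

        void start(Contender contender) {
            // No real election takes place, so leadership is granted right away.
            contender.grantLeadership(DEFAULT_LEADER_ID);
        }
    }

    /** Hypothetical runner that records each step it passes through. */
    static final class Runner implements Contender {
        final List<String> trace = new ArrayList<>();
        private final String name;
        private final StandaloneElection election;

        Runner(String name, StandaloneElection election) {
            this.name = name;
            this.election = election;
        }

        void start() {
            trace.add(name + ".start");
            election.start(this);
        }

        @Override
        public void grantLeadership(UUID leaderSessionId) {
            trace.add(name + ".grantLeadership");
            startLeaderProcess(leaderSessionId);
        }

        private void startLeaderProcess(UUID leaderSessionId) {
            // The real work (creating the dispatcher or job master) would begin here.
            trace.add(name + ".startLeaderProcess");
        }
    }

    /** Runs the start sequence for one runner and returns the recorded steps. */
    static List<String> run(String name) {
        Runner runner = new Runner(name, new StandaloneElection());
        runner.start();
        return runner.trace;
    }

    public static void main(String[] args) {
        run("DispatcherRunner").forEach(System.out::println);
    }
}
```

The point of the callback is that the runner code stays the same whether leadership arrives immediately, as in this sketch, or later through a real election.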
- ResourceManagerServiceImpl -> grantLeadership
- ResourceManagerServiceImpl -> startNewLeaderResourceManager
- ResourceManagerServiceImpl -> startResourceManagerIfIsLeader
- ResourceManager -> start
- ResourceManager -> onStart
- ResourceManager -> startResourceManagerServices
initialize():
Create and start the clients for YARN's ResourceManager and NodeManager
Within it: start the JobMaster
JobMaster -> onStart
This is where the JobMaster is actually started
Establish the connection and begin requesting resources
- StandaloneLeaderRetrievalService -> start
- JobMaster -> notifyOfNewResourceManagerLeader
- JobMaster -> reconnectToResourceManager
- JobMaster -> tryConnectToResourceManager
- JobMaster -> connectToResourceManager
- RegisteredRpcConnection -> start
On successful registration, onRegistrationSuccess() is called and slots are requested from the ResourceManager
- JobMaster -> onRegistrationSuccess
- JobMaster -> establishResourceManagerConnection
- DeclarativeSlotPoolService -> connectToResourceManager
- ResourceManager -> declareRequiredResources
- FineGrainedSlotManager -> processResourceRequirements
Within it: notifyResourceRequirements
- DefaultResourceTracker -> notifyResourceRequirements
- JobScopedResourceTracker -> notifyResourceRequirements
Within it: checkResourceRequirementsWithDelay
- FineGrainedSlotManager -> checkResourceRequirementsWithDelay
- FineGrainedSlotManager -> checkResourceRequirements
Within it: tryFulfillRequirements
tryFulfilledRequirementWithResource
Within it: resources are allocated via allocateSlotsAccordingTo
Within it: declareNeededResourcesWithDelay
- FineGrainedSlotManager -> declareNeededResourcesWithDelay
- FineGrainedSlotManager -> declareNeededResources
- ActiveResourceManager -> declareResourceNeeded
- ActiveResourceManager -> declareResourceNeeded
- ActiveResourceManager -> checkResourceDeclarations
- ActiveResourceManager -> requestNewWorker
- YarnResourceManagerDriver -> requestResource
- YarnResourceManagerDriver -> addContainerRequest
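The declare-then-check steps in the chain above can be read as "compare what is needed with what already exists, and request the difference". The sketch below illustrates only that idea under simplifying assumptions: `WorkerDeclarationSketch` and `checkResourceDeclaration` are hypothetical names, a single worker type is assumed, and whatever the real code does about surplus workers is left out.

```java
// Toy model only: a simplified picture of requesting the missing workers.
class WorkerDeclarationSketch {

    /**
     * Compares the declared number of workers with the workers that already exist
     * (registered or still pending) and issues one request per missing worker.
     *
     * @return the number of new worker requests that were issued
     */
    static int checkResourceDeclaration(
            int declaredNeeded, int existingWorkers, Runnable requestNewWorker) {
        int missing = Math.max(declaredNeeded - existingWorkers, 0);
        for (int i = 0; i < missing; i++) {
            // In the chain above, this is where a container request would be sent to YARN.
            requestNewWorker.run();
        }
        return missing;
    }

    public static void main(String[] args) {
        int[] containerRequests = {0};
        int issued = checkResourceDeclaration(3, 1, () -> containerRequests[0]++);
        System.out.println("new worker requests issued: " + issued);
    }
}
```

Because the check works on a declared total rather than on individual requests, calling it again with the same numbers after the workers exist issues nothing new.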
Once containers have been requested, the callback YarnResourceManagerDriver -> onContainersAllocated is invoked
- YarnResourceManagerDriver -> onContainersOfPriorityAllocated
- YarnResourceManagerDriver -> startTaskExecutorInContainerAsync
- YarnResourceManagerDriver -> createTaskExecutorLaunchContext
- Utils.createTaskExecutorContext
This is what allows the main method of YarnTaskExecutorRunner to be started
- TaskManagerRunner -> runTaskManagerProcessSecurely
- TaskManagerRunner -> runTaskManagerProcessSecurely
- TaskManagerRunner -> runTaskManager
- TaskExecutorToServiceAdapter -> start
The TaskExecutor onStart method:
- TaskExecutor -> onStart
- TaskExecutor -> startTaskExecutorServices
Within it: resourceManagerLeaderRetriever.start
- ResourceManagerLeaderListener -> notifyLeaderAddress
- TaskExecutor -> notifyOfNewResourceManagerLeader
- TaskExecutor -> reconnectToResourceManager
- TaskExecutor -> tryConnectToResourceManager
- TaskExecutor -> connectToResourceManager
Within it: newRegistration.startRegistration(); // todo invokeRegistration
- RetryingRegistration -> startRegistration
- RetryingRegistration -> register
- RetryingRegistration -> invokeOperation
- TaskExecutorToResourceManagerConnection -> invokeRegistration
- ResourceManager -> registerTaskExecutor, then registerTaskExecutorInternal
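The registration chain above retries until the ResourceManager accepts the TaskExecutor. A loose, synchronous sketch of such a retry loop follows. It is not Flink's RetryingRegistration, which works asynchronously; `RegistrationTarget`, `Outcome`, `register`, the attempt limit and the doubling of the timeout up to a cap are all illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: a synchronous retry loop with a growing, capped timeout.
class RetryingRegistrationSketch {

    /** Hypothetical registration target; returns true when the attempt is accepted. */
    interface RegistrationTarget {
        boolean tryRegister(int attempt, long timeoutMillis);
    }

    /** Result of the loop: success flag plus the timeout used for each attempt. */
    static final class Outcome {
        final boolean success;
        final List<Long> timeoutsUsed;

        Outcome(boolean success, List<Long> timeoutsUsed) {
            this.success = success;
            this.timeoutsUsed = timeoutsUsed;
        }
    }

    /** Tries to register up to maxAttempts times, doubling the timeout up to the cap. */
    static Outcome register(
            RegistrationTarget target,
            long initialTimeoutMillis,
            long maxTimeoutMillis,
            int maxAttempts) {
        List<Long> timeouts = new ArrayList<>();
        long timeout = initialTimeoutMillis;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            timeouts.add(timeout);
            if (target.tryRegister(attempt, timeout)) {
                // This is the point where a success callback would be invoked.
                return new Outcome(true, timeouts);
            }
            timeout = Math.min(timeout * 2, maxTimeoutMillis);
        }
        return new Outcome(false, timeouts);
    }

    public static void main(String[] args) {
        // Hypothetical target that only accepts the third attempt.
        Outcome outcome = register((attempt, timeout) -> attempt == 3, 100L, 300L, 5);
        System.out.println("success=" + outcome.success + ", timeouts=" + outcome.timeoutsUsed);
    }
}
```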
On successful registration, onRegistrationSuccess() is called and the TaskExecutor reports its slots to the ResourceManager, which then drives slot allocation
- TaskExecutorToResourceManagerConnection -> onRegistrationSuccess
- ResourceManagerRegistrationListener -> onRegistrationSuccess
- ResourceManagerRegistrationListener -> establishResourceManagerConnection
- ResourceManager -> sendSlotReport
- FineGrainedSlotManager -> registerTaskManager, which goes on to allocateSlot
- DefaultSlotStatusSyncer -> allocateSlot
- TaskExecutor(!!!) -> requestSlot
- TaskExecutor -> requestSlot
Within it: allocateSlotForJob:
- TaskExecutor -> allocateSlotForJob
- TaskExecutor -> allocateSlot
- TaskSlotTableImpl -> allocateSlot
Within it: offerSlotsToJobManager
- TaskExecutor -> internalOfferSlotsToJobManager
- JobMaster -> offerSlots
- DeclarativeSlotPoolService -> offerSlots
- DefaultDeclarativeSlotPool -> offerSlots
- DefaultDeclarativeSlotPool -> internalOfferSlots
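The last hop above, where the TaskExecutor offers slots and the JobMaster's slot pool decides which to keep, can be sketched as matching offers against the requirements that are still outstanding. This is a deliberately reduced model: `SlotOfferSketch` and `offerSlots` are hypothetical, slots are plain string ids, and requirements are a simple count rather than resource profiles.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: accepts offered slots while requirements are still unmet.
class SlotOfferSketch {

    /**
     * Accepts offered slots in order until the outstanding requirement is covered.
     *
     * @param offeredSlotIds ids of the slots offered by a task executor
     * @param outstandingRequirements number of slots the job still needs
     * @return the ids of the accepted slots; the rest are implicitly rejected
     */
    static List<String> offerSlots(List<String> offeredSlotIds, int outstandingRequirements) {
        List<String> accepted = new ArrayList<>();
        for (String slotId : offeredSlotIds) {
            if (accepted.size() < outstandingRequirements) {
                accepted.add(slotId);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        List<String> accepted = offerSlots(List.of("slot-0", "slot-1", "slot-2"), 2);
        System.out.println("accepted slots: " + accepted);
    }
}
```

Returning the accepted subset lets the offering side learn which slots it can hand back or reuse, which is the role the return value of offerSlots plays in the chain above.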
Copyright belongs to the original author, 必吃早饭选手.