flink源码系列总述
本文基于flink-1.17版本,对于flink源码学习了解,仅作为个人学习笔记,如有错误,欢迎指正。
flink提交流程源码 流程解析
看以下流程时,请及时参考本图
- CliFrontend 客户端
- YarnJobClusterEntrypoint AM执行的入口
- YarnTaskExecutorRunner Yarn模式下TaskManager的入口类
1. CliFrontend 客户端
提交命令
通过flink on yarn per-job模式提交,查看flink脚本可以看到,程序被提交后,会寻找CliFrontend类
CliFrotend n main方法入口

其中: parseAndRun方法
其中:parseAndRun的run中:![[外链图片转存中...(img-XcrKfdwd-1705648822026)]](https://img-blog.csdnimg.cn/direct/40a8d916302d4691b826bc1617cc5f23.png)
获取有效配置中:![[外链图片转存中...(img-RXyd4Ols-1705648822027)]](https://img-blog.csdnimg.cn/direct/78ebe52fa89145cf9ca546597c08a076.png)
执行程序中:![[外链图片转存中...(img-zB0E1XZ9-1705648822027)]](https://img-blog.csdnimg.cn/direct/908f57bb208b40939445f7ec230ae5eb.png)
执行用户代码:
env.execute() ->StreamExecutionEnvironment.execute()
获取StreamGraph StreamExecutionEnvironment类
获取PipelineExecutor执行器 执行
生成jobGraph YarnJobClusterExecutor类
其中1:
创建yarnClient 生成YarnClusterDescriptor YarnClusterClientFactory类![[外链图片转存中...(img-wQfYeuLu-1705648822029)]](https://img-blog.csdnimg.cn/direct/c6752433ec8b4cd0ba6745958dffa0d2.png)
其中2:
启动am ApplicationMaster![[外链图片转存中...(img-iqfHF7tv-1705648822029)]](https://img-blog.csdnimg.cn/direct/d199813b16da470780c1727c1685b8ec.png)
![[外链图片转存中...(img-t0YIgpEu-1705648822029)]](https://img-blog.csdnimg.cn/direct/390c4e0cab2f48a1aa6c9f4c48378aea.png)
2.在部署集群的时候,已经启动了ApplicationMaster
3.ApplicationMaster 执行过程
YarnJobClusterEntrypoint 启动入口 main方法![[外链图片转存中...(img-VwNRfJwZ-1705648822030)]](https://img-blog.csdnimg.cn/direct/715f7ee140414a2d971bed6664f37538.png)
工厂类构建和对象创建
对象创建:
创建resourceManager![[外链图片转存中...(img-Mc5iP1Qy-1705648822030)]](https://img-blog.csdnimg.cn/direct/693ec1fa9b65451d9c347c16118675ef.png)
创建并启动dispatcher 启动resourceManager![[外链图片转存中...(img-8QK5s2E7-1705648822030)]](https://img-blog.csdnimg.cn/direct/560b68bdab26410187d113af0cb516eb.png)
其中:创建并启动dispatcher![[外链图片转存中...(img-ycO6RjSl-1705648822031)]](https://img-blog.csdnimg.cn/direct/afd3d30d50f8490e86dbe50aadfe14b6.png)
- DefaultDispatcherRunnerFactory -> createDispatcherRunner
- DefaultDispatcherRunner -> create
- DefaultDispatcherRunner -> start
- StandaloneLeaderElectionService -> start
- DefaultDispatcherRunner -> grantLeadership
- DefaultDispatcherRunner -> startNewDispatcherLeaderProcess
- AbstractDispatcherLeaderProcess -> start
- AbstractDispatcherLeaderProcess -> startInternal
- JobDispatcherLeaderProcess -> onStart
- DefaultDispatcherGatewayServiceFactory -> create
- DefaultDispatcherGatewayServiceFactory
![[外链图片转存中...(img-wMqKpVun-1705648822031)]](https://img-blog.csdnimg.cn/direct/cc6dbe7ad3034db0a281804a4a9c24fb.png)
- Dispatcher -> onStart
![[外链图片转存中...(img-9YjzheF8-1705648822031)]](https://img-blog.csdnimg.cn/direct/070d6a2529da486cbfeba7d94f113a84.png)
其中:dispatcher服务启动主要进行注册
其中:启动jobMaster
- Dispatcher -> startCleanupRetries
- Dispatcher -> runCleanupRetry
- Dispatcher -> runJob
- JobMasterServiceLeadershipRunner -> start
- StandaloneLeaderElectionService -> start
- JobMasterServiceLeadershipRunner -> grantLeadership
- JobMasterServiceLeadershipRunner -> startJobMasterServiceProcessAsync
- JobMasterServiceLeadershipRunner -> verifyJobSchedulingStatusAndCreateJobMasterServiceProcess
- JobMasterServiceLeadershipRunner -> createNewJobMasterServiceProcess
- DefaultJobMasterServiceProcessFactory -> create
- DefaultJobMasterServiceProcess -> DefaultJobMasterServiceProcess
- DefaultJobMasterServiceFactory -> createJobMasterService
- DefaultJobMasterServiceFactory -> internalCreateJobMasterService
- 启动
![[外链图片转存中...(img-BvoyM7U0-1705648822031)]](https://img-blog.csdnimg.cn/direct/99a040c19f0a488c948830cbce28566e.png)
其中: 启动resourceManager
- ResourceManagerServiceImpl -> grantLeadership
- ResourceManagerServiceImpl -> startNewLeaderResourceManager
- ResourceManagerServiceImpl -> startResourceManagerIfIsLeader
- resourceManager -> start
![[外链图片转存中...(img-KGG3utJ9-1705648822032)]](https://img-blog.csdnimg.cn/direct/09ed6d18491643a39e9d0e08b86c70ec.png)
- ResourceManager -> onStart
- ResourceManager -> startResourceManagerServices
![[外链图片转存中...(img-ewVOPCOx-1705648822032)]](https://img-blog.csdnimg.cn/direct/60edb6ab0957490eabc88b6cdc5906a1.png)
initializate():
创建yarn的reasouceManager和nodeManager的client 并启动![[外链图片转存中...(img-2rBTANHn-1705648822032)]](https://img-blog.csdnimg.cn/direct/ff96eafdeb894c29a8fdfd7410749e60.png)
其中:启动jobMaster
JobMaster -> onStart![[外链图片转存中...(img-UskQ4Xgr-1705648822032)]](https://img-blog.csdnimg.cn/direct/3af9cc9cd2d64e03b7b4375d379382f2.png)
![[外链图片转存中...(img-EcxNNU7m-1705648822032)]](https://img-blog.csdnimg.cn/direct/86737c0616b647249e95078a92075588.png)
真正启动jobMaster![[外链图片转存中...(img-fTOnYDU4-1705648822033)]](https://img-blog.csdnimg.cn/direct/791a438f6b7a4d6397e243fe992d06f3.png)
建立连接,开始请求资源
- StandaloneLeaderRetrievalService -> start
- JobMaster -> notifyOfNewResourceManagerLeader
- JobMaster -> reconnectToResourceManager
- JobMaster -> tryConnectToResourceManager
- JobMaster -> connectToResourceManager
![[外链图片转存中...(img-T0n1qcbR-1705648822033)]](https://img-blog.csdnimg.cn/direct/1ffdbd456c5244459e4d1378f1106243.png)
- RegisteredRpcConnection -> start
![[外链图片转存中...(img-NILsreuz-1705648822033)]](https://img-blog.csdnimg.cn/direct/b3485e77a73a4ba590efc357fe78b73a.png)
![[外链图片转存中...(img-ybOlhge3-1705648822033)]](https://img-blog.csdnimg.cn/direct/875c307994f3425982f7e998ce291e85.png)
![[外链图片转存中...(img-HqZMTtNE-1705648822033)]](https://img-blog.csdnimg.cn/direct/c7cc8fb725bd4d4ca7e91d5fefa2a623.png)
注册成功调用 onRegistrationSuccess(), 向 ResourceManager 进行 slot 的申请
- JobMaster -> onRegistrationSuccess
- JobMaster -> establishResourceManagerConnection
![[外链图片转存中...(img-pZPmzMDo-1705648822034)]](https://img-blog.csdnimg.cn/direct/bee05b30e14744bcb8779dae39feb6e0.png)
- DeclarativeSlotPoolService -> connectToResourceManager
- ResourceManager -> declareRequiredResources
![[外链图片转存中...(img-fGEJZ59q-1705648822034)]](https://img-blog.csdnimg.cn/direct/446fc98b9f804e5e938eb50cf5712a45.png)
- FineGrainedSlotManager -> processResourceRequirements
![[外链图片转存中...(img-XXgbFa1G-1705648822034)]](https://img-blog.csdnimg.cn/direct/7b23cfb63e5e491a84b535d83af51128.png)
其中:notifyResourceRequirements
- DefaultResourceTracker -> notifyResourceRequirements
- JobScopedResourceTracker -> notifyResourceRequirements
![[外链图片转存中...(img-JFP4mUVB-1705648822034)]](https://img-blog.csdnimg.cn/direct/22e7aa5629ea4096a4c97805da7b5207.png)
其中:checkResourceRequirementsWithDelay
- FineGrainedSlotManager -> checkResourceRequirementsWithDelay
- FineGrainedSlotManager -> checkResourceRequirements
![[外链图片转存中...(img-ABizKy9A-1705648822034)]](https://img-blog.csdnimg.cn/direct/794b06f32e2a42618733c9693578d22c.png)
![[外链图片转存中...(img-H8xdnJBC-1705648822035)]](https://img-blog.csdnimg.cn/direct/262c7b42ed7b43eab11400a868f6d0f2.png)
其中 tryFulfillRequirements![[外链图片转存中...(img-FeN4Xt3E-1705648822035)]](https://img-blog.csdnimg.cn/direct/be5f2fe5e29a450b93699888ab46b408.png)
tryFulfilledRequirementWithResource![[外链图片转存中...(img-7ZwYBv11-1705648822035)]](https://img-blog.csdnimg.cn/direct/f8cb5992fd54471eb017b6fec3d80b7f.png)
其中:分配资源allocateSlotsAccordingTo![[外链图片转存中...(img-ycuQbtdR-1705648822035)]](https://img-blog.csdnimg.cn/direct/f1afd3ed27aa42db991d190b4d55e8e5.png)
![[外链图片转存中...(img-9KKKIKIp-1705648822035)]](https://img-blog.csdnimg.cn/direct/74735577c1b84d05a23dd3a2137e4683.png)
其中:declareNeededResourcesWithDelay
- FineGrainedSlotManager -> declareNeededResourcesWithDelay
- FineGrainedSlotManager -> declareNeededResources
- ActiveResourceManager -> declareResourceNeeded
- ActiveResourceManager -> declareResourceNeeded
- ActiveResourceManager -> checkResourceDeclarations
- ActiveResourceManager -> requestNewWorker
![[外链图片转存中...(img-TpRe15Az-1705648822036)]](https://img-blog.csdnimg.cn/direct/6c13dd6177e8455fb9b7bd932d162f34.png)
- YarnResourceManagerDriver -> requestResource
- YarnResourceManagerDriver -> addContainerRequest
![[外链图片转存中...(img-pyZpjmDX-1705648822036)]](https://img-blog.csdnimg.cn/direct/d3b1f413cca14695b3a4cae71bccfe9f.png)
请求容器 进行回调函数YarnResourceManagerDriver ->onContainersAllocated
2. YarnResourceManagerDriver ->onContainersOfPriorityAllocated![[外链图片转存中...(img-tJLHvIcE-1705648822036)]](https://img-blog.csdnimg.cn/direct/8bf2879f48dc4944a99653608b942557.png)
- YarnResourceManagerDriver -> startTaskExecutorInContainerAsync
- YarnResourceManagerDriver -> createTaskExecutorLaunchContext
![[外链图片转存中...(img-8SWeWaO5-1705648822037)]](https://img-blog.csdnimg.cn/direct/6e276b157ec94c1ea1afc80c5321caa5.png)
- Utils.createTaskExecutorContext
![[外链图片转存中...(img-B1zrP3me-1705648822037)]](https://img-blog.csdnimg.cn/direct/eb6484d49ff348bf91b077d9b9965fc6.png)
所以 能够 启动TaskExecutorRunner main方法
- TaskExecutorRunner -> runTaskManagerProcessSecurely
- TaskExecutorRunner -> runTaskManagerProcessSecurely
![[外链图片转存中...(img-b8mbyo3m-1705648822037)]](https://img-blog.csdnimg.cn/direct/8ebdec159ef843cbba614f43e9c52075.png)
- TaskExecutorRunner -> runTaskManager
- TaskExecutorToServiceAdapter -> start
![[外链图片转存中...(img-JKKolBO2-1705648822037)]](https://img-blog.csdnimg.cn/direct/6b7a950701d74a54b8b2be69904dee00.png)
TaskExecutor onstart方法:
- TaskExecutor -> onStart
![[外链图片转存中...(img-2NfQ5kFR-1705648822038)]](https://img-blog.csdnimg.cn/direct/49ca3cdd05664437a39fb1be5493fc22.png)
- TaskExecutor -> startTaskExecutorServices
![[外链图片转存中...(img-S6cZB0qx-1705648822038)]](https://img-blog.csdnimg.cn/direct/e3dea755a8df458c88a6d3ce9f0732d1.png)
其中resourceManagerLeaderRetriever.start
- ResourceManagerLeaderListener -> notifyLeaderAddress
- TaskExecutor -> notifyOfNewResourceManagerLeader
- TaskExecutor -> reconnectToResourceManager
- TaskExecutor -> tryConnectToResourceManager
- TaskExecutor -> connectToResourceManager
![[外链图片转存中...(img-kgysiIBl-1705648822038)]](https://img-blog.csdnimg.cn/direct/bfe430eaff234898bfbdeaa3f055b5f2.png)
![[外链图片转存中...(img-cjtKJsfV-1705648822039)]](https://img-blog.csdnimg.cn/direct/a642e14b9d5548e58084c5dd366595ad.png)
其中:newRegistration.startRegistration(); // todo invokeRegistration
- RetryingRegistration -> startRegistration
- RetryingRegistration -> register
- RetryingRegistration -> invokeOperation
- TaskExecutorToResourceManagerConnection -> invokeRegistration
![[外链图片转存中...(img-gH3eCk1N-1705648822039)]](https://img-blog.csdnimg.cn/direct/d99396b8e1494a71ba0531cc14eac5ef.png)
- registerTaskExecutor registerTaskExecutorInternal
注册成功调用 onRegistrationSuccess(), 向 ResourceManager 进行 slot 的申请
- TaskExecutorToResourceManagerConnection -> onRegistrationSuccess
- ResourceManagerRegistrationListener -> onRegistrationSuccess
- ResourceManagerRegistrationListener -> establishResourceManagerConnection
![[外链图片转存中...(img-WAssamTi-1705648822039)]](https://img-blog.csdnimg.cn/direct/931907ad22604fca85004aca0cad57b6.png)
- ResourceManager -> sendSlotReport
![[外链图片转存中...(img-7xolryvZ-1705648822039)]](https://img-blog.csdnimg.cn/direct/a4f6184cb5454eed9de9863b679a3f15.png)
- FineGrainedSlotManager -> registerTaskManager 进行allocateSlot
![[外链图片转存中...(img-OcU34w3z-1705648822040)]](https://img-blog.csdnimg.cn/direct/f0bd49b9b77240a49418b41e9d786d12.png)
![[外链图片转存中...(img-P3YbfxiK-1705648822040)]](https://img-blog.csdnimg.cn/direct/bfa396d5c7344fc29c35a4d4b93b2b31.png)
- DefaultSlotStatusSyncer -> allocateSlot
- TaskExecutor(!!!) -> requestSlot
![[外链图片转存中...(img-FSCBjZMw-1705648822040)]](https://img-blog.csdnimg.cn/direct/8736cb30df394225b7a9aeffe02d3125.png)
- TaskExecutor -> requestSlot
其中 allocateSlotForJob:
- TaskExecutor -> allocateSlotForJob
- TaskExecutor -> allocateSlot
- TaskSlotTableImpl -> allocateSlot
![[外链图片转存中...(img-8IcyrUGN-1705648822041)]](https://img-blog.csdnimg.cn/direct/c54586ef279f4452ad9fb76eb4247b56.png)
其中:offerSlotsToJobManager
- TaskExecutor -> internalOfferSlotsToJobManager
- JobMaster -> offerSlots
- DeclarativeSlotPoolService -> offerSlots
- TestingDeclarativeSlotPool -> offerSlots
- TestingDeclarativeSlotPool -> internalOfferSlots
![[外链图片转存中...(img-ldrX1KKH-1705648822041)]](https://img-blog.csdnimg.cn/direct/255ca348887e4a1db2981cd7211e9ad7.png)
版权归原作者 必吃早饭选手 所有, 如有侵权,请联系我们删除。