Flink JobMaster源码启动入口从Dispatcher.runJob()方法处开始,下面让我们一起进入到JobMaster的源码分析中。
- Dispatcher.runJob()源码
/** * 启动jobmanagerrunner * 这是启动jobmaster并且调度任务执行的主要方法,后面代码都是围绕核心进行各种设置,此处代码内部实现jobmaster创建,jobgraph到executiongraph的转化 * 以及注册和心跳服务处理 */jobManagerRunner.start();
- 继续追踪堆栈
jobManagerRunner.start();leaderElectionService.start(this);contender.grantLeadership(HighAvailabilityServices.DEFAULT_LEADER_ID);startJobMasterServiceProcessAsync(leaderSessionID)verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(leaderSessionId)createNewJobMasterServiceProcess(leaderSessionId);jobMasterServiceProcess = jobMasterServiceProcessFactory.create(leaderSessionId);return new DefaultJobMasterServiceProcess( jobId, leaderSessionId, jobMasterServiceFactory, cause -> createArchivedExecutionGraph(JobStatus.FAILED, cause));this.jobMasterServiceFuture = jobMasterServiceFactory.createJobMasterService(leaderSessionId, this);internalCreateJobMasterService(leaderSessionId, onCompletionActions)
上面属于源码跳转,这里保留关键流程堆栈信息 - 在
internalCreateJobMasterService()
方法内部终于看到了jobamaster核心源码了,如下:// 1 创建jobmaster服务,在创建jobmaster的过程中执行jobgraph到executiongraph的转换操作finalJobMaster jobMaster =newJobMaster( rpcService,JobMasterId.fromUuidOrNull(leaderSessionId), jobMasterConfiguration,ResourceID.generate(), jobGraph, haServices, slotPoolServiceSchedulerFactory, jobManagerSharedServices, heartbeatServices, jobManagerJobMetricGroupFactory, onCompletionActions, fatalErrorHandler, userCodeClassloader, shuffleMaster, lookup ->newJobMasterPartitionTrackerImpl(jobGraph.getJobID(), shuffleMaster, lookup),newDefaultExecutionDeploymentTracker(),DefaultExecutionDeploymentReconciler::new,BlocklistUtils.loadBlocklistHandlerFactory(jobMasterConfiguration.getConfiguration()), initializationTimestamp );// 2 启动jobmaster服务,在内部调用了rpcServer.start();因此会跳转到JobMaster的onStart()方法执行;jobMaster.start();
在new JobMaster()
内执行了jobgraph到executiongraph的转换操作,这块内容见ExecutionGraph源码分析;下面让我们追踪start()
方法,跟踪jobmaster启动过程中都做了哪些具体操作。 - 进入JobMaster.onStrart()方法
startJobExecution();
- startJobExecution()源码如下:
privatevoidstartJobExecution()throwsException{validateRunsInMainThread();JobShuffleContext context =newJobShuffleContextImpl(jobGraph.getJobID(),this); shuffleMaster.registerJob(context);// 启动jobmaster服务,注册心跳和创建监听服务// 向rm注册,维持心跳服务(两个心跳服务:1-和rm的心跳 2-和tm的心跳)// 启动slotpool服务// 申请创建TaskManager startJobMasterServices();// 开始调度startScheduling();}
这里启动jobmaster服务,注册心跳同时创建了监听服务,在jobmaster内部创建了slotpool,用于维护整个任务的资源。TaskManager启动过程详见TaskManager启动源码解析 - 下面追踪
startScheduling();``````startScheduling();schedulerNG.startScheduling();startSchedulingInternal();schedulingStrategy.startScheduling();maybeScheduleRegions(sourceRegions);SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(schedulingTopology, regionsToSchedule).forEach(this::scheduleRegion);schedulerOperations.allocateSlotsAndDeploy(regionVerticesSorted.get(region));executionDeployer.allocateSlotsAndDeploy(executionsToDeploy, requiredVersionByVertex);
- allocateSlotsAndDeploy()源码:
publicvoidallocateSlotsAndDeploy(finalList<Execution> executionsToDeploy,// 5个执行器finalMap<ExecutionVertexID,ExecutionVertexVersion> requiredVersionByVertex // 版本信息){validateExecutionStates(executionsToDeploy);transitionToScheduled(executionsToDeploy);// 1-申请资源/** * 这段代码的作用是根据调度策略为需要部署的任务分配执行 slot,生成 ExecutionSlotAssignment 列表。 * * 在 Flink 中,执行 slot 是 Flink 集群中资源分配的基本单位,用于承载任务的执行。调度器根据任务的资源需求和调度策略,将任务分配给不同的 slot 进行执行。 * * 具体来说,这个方法会将需要部署的任务集合 executionsToDeploy 传入 allocateSlotsFor 方法中,该方法会按照预定义的调度策略为每个任务分配执行 slot。执行 slot 的分配策略一般包括以下几个方面的考虑: * * 任务资源需求,如 CPU、内存、IO 等 * 集群资源情况,如空闲的 slot 数量、其他任务占用的 slot 等 * 任务优先级和调度策略,如 FIFO、FAIR、PRIORITY 等 * 分配执行 slot 后,该方法会返回一个 List,其中包含了为每个任务分配的执行 slot 的信息,包括任务 ID、分配的 slot ID 等。这些信息将被传递给后续的部署流程,用于将任务分配到相应的 slot 上进行执行。 */finalList<ExecutionSlotAssignment> executionSlotAssignments =allocateSlotsFor(executionsToDeploy);/** * 获取部署句柄地址,这里返回结果封装了执行器,资源类和版本3个参数的类的集合(共5个元素) */finalList<ExecutionDeploymentHandle> deploymentHandles =createDeploymentHandles(executionsToDeploy, requiredVersionByVertex, executionSlotAssignments);// 2-调度执行waitForAllSlotsAndDeploy(deploymentHandles);}
- 继续
waitForAllSlotsAndDeploy(deploymentHandles);assignAllResourcesAndRegisterProducedPartitions(deploymentHandles).handle(deployAll(deploymentHandles))FutureUtils.assertNoException(slotAssigned.handle(deployOrHandleError(deploymentHandle)));deployTaskSafe(execution);executionOperations.deploy(execution);execution.deploy();final TaskDeploymentDescriptor deployment = TaskDeploymentDescriptorFactory.fromExecution(this) .createDeploymentDescriptor( slot.getAllocationId(), taskRestore, producedPartitions.values() );taskManagerGateway.submitTask(deployment, rpcTimeout)taskExecutorGateway.submitTask(tdd, jobMasterId, timeout);
- 启动task
Task task = new Task( jobInformation, taskInformation, tdd.getExecutionAttemptId(), tdd.getAllocationId(), tdd.getProducedPartitions(), // 输出结果集 tdd.getInputGates(), // 上游输入数据门户 memoryManager, taskExecutorServices.getIOManager(), taskExecutorServices.getShuffleEnvironment(), taskExecutorServices.getKvStateService(), taskExecutorServices.getBroadcastVariableManager(), taskExecutorServices.getTaskEventDispatcher(), externalResourceInfoProvider, taskStateManager, taskManagerActions, inputSplitProvider, checkpointResponder, taskOperatorEventGateway, aggregateManager, classLoaderHandle, fileCache, taskManagerConfiguration, taskMetricGroup, partitionStateChecker, getRpcService().getScheduledExecutor() );task.startTaskThread();
之后会跳转到Task.run()开始执行Task。
本文转载自: https://blog.csdn.net/wen811651208/article/details/138203485
版权归原作者 GawynKing 所有, 如有侵权,请联系我们删除。
版权归原作者 GawynKing 所有, 如有侵权,请联系我们删除。