

Building the ExecutionGraph in Flink

1. ExecutionGraph Overview


When a JobGraph is converted into an ExecutionGraph, the main work is to split each JobVertex according to the operator parallelism: every JobVertex becomes multiple ExecutionVertex instances, and the output of each ExecutionVertex is received by an IntermediateResultPartition object. The IntermediateResultPartitions produced by the parallel instances of one JobVertex together form an IntermediateResult. Since Flink 1.13 the ExecutionEdge no longer exists; it was replaced by ConsumedPartitionGroup and ConsumerVertexGroup.
In the ExecutionGraph there are two connection patterns, pointwise (one-to-one) and all-to-all. When an upstream node connects all-to-all, every downstream ExecutionVertex used to reference every upstream partition through its own ExecutionEdge, which requires O(n²) edge objects and consumes a large amount of memory for large-scale jobs.
Because all ExecutionVertex instances inside one ExecutionJobVertex are derived from the same JobVertex by its parallelism, the IntermediateResultPartitions that receive their output are structurally identical; likewise, all ExecutionVertex instances of the downstream ExecutionJobVertex connected to those partitions are structurally identical. Flink therefore groups them: all ExecutionVertices belonging to the same ExecutionJobVertex form one ConsumerVertexGroup, and all IntermediateResultPartitions serving as inputs of that ExecutionJobVertex form one ConsumedPartitionGroup.
With ExecutionEdge replaced by ConsumedPartitionGroup and ConsumerVertexGroup, all partitions with the same structure connect to the same downstream ConsumerVertexGroup.
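
To make the memory effect concrete, here is a minimal runnable sketch. The ConsumedPartitionGroup/ConsumerVertexGroup classes below are simplified stand-ins, not the real Flink types; the point is that one shared group object per side replaces the per-pair edge objects.

import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the real Flink classes: one group object per side
// is shared by every consumer/producer instead of materializing each edge.
public class GroupingSketch {
    static class ConsumedPartitionGroup {
        final List<Integer> partitionIds;
        ConsumedPartitionGroup(List<Integer> ids) { this.partitionIds = ids; }
    }

    static class ConsumerVertexGroup {
        final List<Integer> vertexIds;
        ConsumerVertexGroup(List<Integer> ids) { this.vertexIds = ids; }
    }

    public static void main(String[] args) {
        int n = 1000; // upstream IntermediateResultPartitions
        int m = 1000; // downstream ExecutionVertices

        List<Integer> partitions = new ArrayList<>();
        for (int i = 0; i < n; i++) partitions.add(i);
        List<Integer> vertices = new ArrayList<>();
        for (int i = 0; i < m; i++) vertices.add(i);

        // Since Flink 1.13: one shared group per side of an all-to-all edge.
        ConsumedPartitionGroup partitionGroup = new ConsumedPartitionGroup(partitions);
        ConsumerVertexGroup vertexGroup = new ConsumerVertexGroup(vertices);

        // Before 1.13, an all-to-all connection stored one ExecutionEdge per pair.
        System.out.println("ExecutionEdge objects before 1.13: " + (long) n * m); // 1,000,000
        System.out.println("Group entries since 1.13: " + (n + m));               // 2,000
    }
}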

2. JobMaster Initialization

Take session mode as an example.

public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
    checkNotNull(streamGraph, "StreamGraph cannot be null.");
    checkNotNull(
            configuration.get(DeploymentOptions.TARGET),
            "No execution.target specified in your configuration file.");

    final PipelineExecutorFactory executorFactory =
            executorServiceLoader.getExecutorFactory(configuration);
    checkNotNull(
            executorFactory,
            "Cannot find compatible factory for specified execution.target (=%s)",
            configuration.get(DeploymentOptions.TARGET));

    CompletableFuture<JobClient> jobClientFuture =
            executorFactory
                    .getExecutor(configuration)
                    // step into execute()
                    .execute(streamGraph, configuration, userClassloader);

    try {
        JobClient jobClient = jobClientFuture.get();
        jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
        return jobClient;
    } catch (ExecutionException executionException) {
        final Throwable strippedException =
                ExceptionUtils.stripExecutionException(executionException);
        jobListeners.forEach(
                jobListener -> jobListener.onJobSubmitted(null, strippedException));
        throw new FlinkException(
                String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
                strippedException);
    }
}

The session-mode implementation is AbstractSessionClusterExecutor; the per-job implementation is AbstractJobClusterExecutor.
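
A minimal sketch, assuming a standalone session cluster: the execution.target configuration value is what the SPI lookup in executeAsync() above uses to pick the PipelineExecutorFactory. The value "remote" resolves to the session-mode executor, while per-job targets such as "yarn-per-job" resolve to per-job executors. With the session target chosen, execution lands in AbstractSessionClusterExecutor#execute():

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;

public class TargetSketch {
    public static void main(String[] args) {
        // execution.target drives the PipelineExecutorFactory SPI lookup;
        // "remote" selects the session-mode executor.
        Configuration configuration = new Configuration();
        configuration.set(DeploymentOptions.TARGET, "remote");
        System.out.println(configuration.get(DeploymentOptions.TARGET));
    }
}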

@Override
public CompletableFuture<JobClient> execute(
        @Nonnull final Pipeline pipeline,
        @Nonnull final Configuration configuration,
        @Nonnull final ClassLoader userCodeClassloader)
        throws Exception {
    // here the StreamGraph is turned into a JobGraph
    final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);

    try (final ClusterDescriptor<ClusterID> clusterDescriptor =
            clusterClientFactory.createClusterDescriptor(configuration)) {
        final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
        checkState(clusterID != null);

        // create the ClusterClientProvider for the RestClusterClient;
        // initializing the RestClusterClient creates a RestClient, which in turn
        // starts a Netty client. The Netty client that submits the job is the
        // RestClusterClient; the server receiving it is the Netty server started
        // by the WebMonitorEndpoint inside the JobManager.
        final ClusterClientProvider<ClusterID> clusterClientProvider =
                clusterDescriptor.retrieve(clusterID);
        ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();

        // the submission happens here via the RestClusterClient
        return clusterClient
                // the RestClient's internal Netty client does the submission;
                // step into submitJob()
                .submitJob(jobGraph)
                .thenApplyAsync(
                        FunctionUtils.uncheckedFunction(
                                jobId -> {
                                    ClientUtils.waitUntilJobInitializationFinished(
                                            () -> clusterClient.getJobStatus(jobId).get(),
                                            () -> clusterClient.requestJobResult(jobId).get(),
                                            userCodeClassloader);
                                    return jobId;
                                }))
                .thenApplyAsync(
                        jobID ->
                                (JobClient)
                                        new ClusterClientJobClientAdapter<>(
                                                clusterClientProvider,
                                                jobID,
                                                userCodeClassloader))
                .whenCompleteAsync((ignored1, ignored2) -> clusterClient.close());
    }
}
public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
    CompletableFuture<java.nio.file.Path> jobGraphFileFuture =
            // persist the JobGraph into a jobGraphFile
            CompletableFuture.supplyAsync(
                    () -> {
                        try {
                            // the persisted file is named flink-jobgraph.bin; this is the
                            // file that actually gets submitted to the Flink cluster. The
                            // WebMonitor (JobSubmitHandler) receives the request and
                            // deserializes this file first when handling it.
                            final java.nio.file.Path jobGraphFile =
                                    Files.createTempFile("flink-jobgraph", ".bin");
                            try (ObjectOutputStream objectOut =
                                    new ObjectOutputStream(
                                            Files.newOutputStream(jobGraphFile))) {
                                objectOut.writeObject(jobGraph);
                            }
                            return jobGraphFile;
                        } catch (IOException e) {
                            throw new CompletionException(
                                    new FlinkException("Failed to serialize JobGraph.", e));
                        }
                    },
                    executorService);

    // once the JobGraph has been persisted, add the jobGraphFile to the upload list
    CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture =
            jobGraphFileFuture.thenApply(
                    jobGraphFile -> {
                        List<String> jarFileNames = new ArrayList<>(8);
                        List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames =
                                new ArrayList<>(8);
                        Collection<FileUpload> filesToUpload = new ArrayList<>(8);
                        // add the jobGraphFile to the files to upload
                        filesToUpload.add(
                                new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
                        for (Path jar : jobGraph.getUserJars()) {
                            jarFileNames.add(jar.getName());
                            // upload the jars the job needs
                            filesToUpload.add(
                                    new FileUpload(
                                            Paths.get(jar.toUri()),
                                            RestConstants.CONTENT_TYPE_JAR));
                        }
                        for (Map.Entry<String, DistributedCache.DistributedCacheEntry>
                                artifacts : jobGraph.getUserArtifacts().entrySet()) {
                            final Path artifactFilePath =
                                    new Path(artifacts.getValue().filePath);
                            try {
                                // Only local artifacts need to be uploaded.
                                if (!artifactFilePath.getFileSystem().isDistributedFS()) {
                                    artifactFileNames.add(
                                            new JobSubmitRequestBody.DistributedCacheFile(
                                                    artifacts.getKey(),
                                                    artifactFilePath.getName()));
                                    filesToUpload.add(
                                            new FileUpload(
                                                    Paths.get(artifacts.getValue().filePath),
                                                    RestConstants.CONTENT_TYPE_BINARY));
                                }
                            } catch (IOException e) {
                                throw new CompletionException(
                                        new FlinkException(
                                                "Failed to get the FileSystem of artifact "
                                                        + artifactFilePath + ".",
                                                e));
                            }
                        }
                        // build the submission request body: the JobGraph file plus the
                        // jar and artifact dependencies
                        final JobSubmitRequestBody requestBody =
                                new JobSubmitRequestBody(
                                        jobGraphFile.getFileName().toString(),
                                        jarFileNames,
                                        artifactFileNames);
                        // return a Tuple2 of requestBody and filesToUpload
                        return Tuple2.of(
                                requestBody,
                                Collections.unmodifiableCollection(filesToUpload));
                    });

    final CompletableFuture<JobSubmitResponseBody> submissionFuture =
            requestFuture.thenCompose(
                    requestAndFileUploads ->
                            sendRetriableRequest(
                                    JobSubmitHeaders.getInstance(),
                                    EmptyMessageParameters.getInstance(),
                                    requestAndFileUploads.f0,
                                    requestAndFileUploads.f1,
                                    isConnectionProblemOrServiceUnavailable()));

    submissionFuture
            .thenCombine(jobGraphFileFuture, (ignored, jobGraphFile) -> jobGraphFile)
            .thenAccept(
                    jobGraphFile -> {
                        try {
                            Files.delete(jobGraphFile);
                        } catch (IOException e) {
                            LOG.warn("Could not delete temporary file {}.", jobGraphFile, e);
                        }
                    });

    return submissionFuture
            .thenApply(ignore -> jobGraph.getJobID())
            .exceptionally(
                    (Throwable throwable) -> {
                        throw new CompletionException(
                                new JobSubmissionException(
                                        jobGraph.getJobID(),
                                        "Failed to submit JobGraph.",
                                        ExceptionUtils.stripCompletionException(throwable)));
                    });
}
// deserialize to get the JobGraph and hand it over to the Dispatcher
protected CompletableFuture<JobSubmitResponseBody> handleRequest(
        @Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request,
        @Nonnull DispatcherGateway gateway)
        throws RestHandlerException {
    // get the serialized jobGraphFile from the request body
    final Collection<File> uploadedFiles = request.getUploadedFiles();
    final Map<String, Path> nameToFile =
            uploadedFiles.stream()
                    .collect(Collectors.toMap(File::getName, Path::fromLocalFile));

    if (uploadedFiles.size() != nameToFile.size()) {
        throw new RestHandlerException(
                String.format(
                        "The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
                        uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
                        nameToFile.size(),
                        uploadedFiles.size()),
                HttpResponseStatus.BAD_REQUEST);
    }

    // get the request body
    final JobSubmitRequestBody requestBody = request.getRequestBody();
    if (requestBody.jobGraphFileName == null) {
        throw new RestHandlerException(
                String.format(
                        "The %s field must not be omitted or be null.",
                        JobSubmitRequestBody.FIELD_NAME_JOB_GRAPH),
                HttpResponseStatus.BAD_REQUEST);
    }

    // deserialize the jobGraphFile back into the JobGraph the client submitted
    CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);

    // get the job's jars
    Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);

    // get the job's dependency jars
    Collection<Tuple2<String, Path>> artifacts =
            getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);

    // upload the JobGraph, the job jars, and the dependency jars to the BlobServer
    CompletableFuture<JobGraph> finalizedJobGraphFuture =
            uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);

    // hand the finalized JobGraph over to the Dispatcher
    CompletableFuture<Acknowledge> jobSubmissionFuture =
            finalizedJobGraphFuture.thenCompose(
                    // JobSubmitHandler delegates to the Dispatcher; gateway is the
                    // Dispatcher's proxy object — step into gateway.submitJob()
                    jobGraph -> gateway.submitJob(jobGraph, timeout));

    return jobSubmissionFuture.thenCombine(
            jobGraphFuture,
            (ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
}

private CompletableFuture<JobGraph> loadJobGraph(
        JobSubmitRequestBody requestBody, Map<String, Path> nameToFile)
        throws MissingFileException {
    final Path jobGraphFile =
            getPathAndAssertUpload(
                    requestBody.jobGraphFileName, FILE_TYPE_JOB_GRAPH, nameToFile);

    return CompletableFuture.supplyAsync(
            () -> {
                JobGraph jobGraph;
                try (ObjectInputStream objectIn =
                        new ObjectInputStream(
                                jobGraphFile.getFileSystem().open(jobGraphFile))) {
                    jobGraph = (JobGraph) objectIn.readObject();
                } catch (Exception e) {
                    throw new CompletionException(
                            new RestHandlerException(
                                    "Failed to deserialize JobGraph.",
                                    HttpResponseStatus.BAD_REQUEST,
                                    e));
                }
                return jobGraph;
            },
            executor);
}

The Dispatcher receives the JobGraph, then initializes and starts the JobMaster

Step into gateway.submitJob(); the concrete implementation is submitJob() in the Dispatcher class.

public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
    log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());

    try {
        // check whether the jobId is a duplicate
        if (isDuplicateJob(jobGraph.getJobID())) {
            return FutureUtils.completedExceptionally(
                    new DuplicateJobSubmissionException(jobGraph.getJobID()));
        } else if (isPartialResourceConfigured(jobGraph)) {
            return FutureUtils.completedExceptionally(
                    new JobSubmissionException(
                            jobGraph.getJobID(),
                            "Currently jobs is not supported if parts of the vertices have "
                                    + "resources configured. The limitation will be removed in future versions."));
        } else {
            // submit the job; all jars the JobGraph needs are uploaded by now.
            // The JobGraph will be used to build the ExecutionGraph when the
            // JobMaster starts — step into internalSubmitJob()
            return internalSubmitJob(jobGraph);
        }
    } catch (FlinkException e) {
        return FutureUtils.completedExceptionally(e);
    }
}

Step into internalSubmitJob(jobGraph):

private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
    log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());

    final CompletableFuture<Acknowledge> persistAndRunFuture =
            // persist first, then run (the JobMaster) — this::persistAndRunJob
            waitForTerminatingJob(jobGraph.getJobID(), jobGraph,
                            // step into this::persistAndRunJob
                            this::persistAndRunJob)
                    .thenApply(ignored -> Acknowledge.get());

    return persistAndRunFuture.handleAsync(
            (acknowledge, throwable) -> {
                if (throwable != null) {
                    cleanUpJobData(jobGraph.getJobID(), true);
                    ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(throwable);
                    final Throwable strippedThrowable =
                            ExceptionUtils.stripCompletionException(throwable);
                    log.error("Failed to submit job {}.", jobGraph.getJobID(), strippedThrowable);
                    throw new CompletionException(
                            new JobSubmissionException(
                                    jobGraph.getJobID(),
                                    "Failed to submit job.",
                                    strippedThrowable));
                } else {
                    return acknowledge;
                }
            },
            ioExecutor);
}

private void persistAndRunJob(JobGraph jobGraph) throws Exception {
    // the server persists the JobGraph to a file system such as HDFS, gets back a
    // handle, and stores the handle's state in ZooKeeper. The leader also starts a
    // JobGraphStore service together with the Dispatcher; any unfinished JobGraphs
    // found there are recovered first. jobGraphWriter = DefaultJobGraphStore
    jobGraphWriter.putJobGraph(jobGraph);
    runJob(jobGraph, ExecutionType.SUBMISSION);
}

private void runJob(JobGraph jobGraph, ExecutionType executionType) throws Exception {
    Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
    long initializationTimestamp = System.currentTimeMillis();

    // create the JobManagerRunner launcher, which internally builds a
    // DefaultJobMasterServiceProcessFactory and creates the JobMaster instance;
    // while creating the JobMaster the JobGraph is turned into an ExecutionGraph
    JobManagerRunner jobManagerRunner =
            createJobManagerRunner(jobGraph, initializationTimestamp);

    // a Flink cluster contains two leader/worker hierarchies:
    // 1. resource management: ResourceManager + TaskExecutor
    // 2. job execution: JobMaster + StreamTask
    runningJobs.put(jobGraph.getJobID(), jobManagerRunner);

    final JobID jobId = jobGraph.getJobID();

    final CompletableFuture<CleanupJobState> cleanupJobStateFuture =
            jobManagerRunner
                    .getResultFuture()
                    .handleAsync(
                            (jobManagerRunnerResult, throwable) -> {
                                Preconditions.checkState(
                                        runningJobs.get(jobId) == jobManagerRunner,
                                        "The job entry in runningJobs must be bound to the lifetime of the JobManagerRunner.");
                                if (jobManagerRunnerResult != null) {
                                    return handleJobManagerRunnerResult(
                                            jobManagerRunnerResult, executionType);
                                } else {
                                    return jobManagerRunnerFailed(jobId, throwable);
                                }
                            },
                            getMainThreadExecutor());

    final CompletableFuture<Void> jobTerminationFuture =
            cleanupJobStateFuture
                    .thenApply(cleanupJobState -> removeJob(jobId, cleanupJobState))
                    .thenCompose(Function.identity());

    FutureUtils.assertNoException(jobTerminationFuture);
    registerJobManagerRunnerTerminationFuture(jobId, jobTerminationFuture);
}
JobManagerRunner createJobManagerRunner(JobGraph jobGraph, long initializationTimestamp)
        throws Exception {
    final RpcService rpcService = getRpcService();

    // build the JobManagerRunner, which wraps a DefaultJobMasterServiceProcessFactory;
    // that factory builds and starts the JobMaster once leader election completes
    JobManagerRunner runner =
            jobManagerRunnerFactory.createJobManagerRunner(
                    jobGraph,
                    configuration,
                    rpcService,
                    highAvailabilityServices,
                    heartbeatServices,
                    jobManagerSharedServices,
                    new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                    fatalErrorHandler,
                    initializationTimestamp);

    // start the JobMaster leader election; once it succeeds, the JobMaster is
    // created in ZooKeeperLeaderElectionDriver's isLeader()
    runner.start();
    return runner;
}
public JobManagerRunner createJobManagerRunner(
        JobGraph jobGraph,
        Configuration configuration,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        JobManagerSharedServices jobManagerServices,
        JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
        FatalErrorHandler fatalErrorHandler,
        long initializationTimestamp)
        throws Exception {

    checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");

    final JobMasterConfiguration jobMasterConfiguration =
            JobMasterConfiguration.fromConfiguration(configuration);

    final RunningJobsRegistry runningJobsRegistry =
            highAvailabilityServices.getRunningJobsRegistry();

    // get the election service, preparing the JobMaster leader election
    final LeaderElectionService jobManagerLeaderElectionService =
            highAvailabilityServices.getJobManagerLeaderElectionService(jobGraph.getJobID());

    final SlotPoolServiceSchedulerFactory slotPoolServiceSchedulerFactory =
            DefaultSlotPoolServiceSchedulerFactory.fromConfiguration(
                    configuration, jobGraph.getJobType());

    if (jobMasterConfiguration.getConfiguration().get(JobManagerOptions.SCHEDULER_MODE)
            == SchedulerExecutionMode.REACTIVE) {
        Preconditions.checkState(
                slotPoolServiceSchedulerFactory.getSchedulerType()
                        == JobManagerOptions.SchedulerType.Adaptive,
                "Adaptive Scheduler is required for reactive mode");
    }

    final ShuffleMaster<?> shuffleMaster =
            ShuffleServiceLoader.loadShuffleServiceFactory(configuration)
                    .createShuffleMaster(configuration);

    final LibraryCacheManager.ClassLoaderLease classLoaderLease =
            jobManagerServices
                    .getLibraryCacheManager()
                    .registerClassLoaderLease(jobGraph.getJobID());

    final ClassLoader userCodeClassLoader =
            classLoaderLease
                    .getOrResolveClassLoader(
                            jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths())
                    .asClassLoader();

    // build the DefaultJobMasterServiceFactory, which wraps the basic services
    // the JobMaster needs to start
    final DefaultJobMasterServiceFactory jobMasterServiceFactory =
            new DefaultJobMasterServiceFactory(
                    jobManagerServices.getScheduledExecutorService(),
                    rpcService,
                    jobMasterConfiguration,
                    jobGraph,
                    highAvailabilityServices,
                    slotPoolServiceSchedulerFactory,
                    jobManagerServices,
                    heartbeatServices,
                    jobManagerJobMetricGroupFactory,
                    fatalErrorHandler,
                    userCodeClassLoader,
                    shuffleMaster,
                    initializationTimestamp);

    final DefaultJobMasterServiceProcessFactory jobMasterServiceProcessFactory =
            new DefaultJobMasterServiceProcessFactory(
                    jobGraph.getJobID(),
                    jobGraph.getName(),
                    jobGraph.getCheckpointingSettings(),
                    initializationTimestamp,
                    jobMasterServiceFactory);

    return new JobMasterServiceLeadershipRunner(
            jobMasterServiceProcessFactory,
            jobManagerLeaderElectionService,
            runningJobsRegistry,
            classLoaderLease,
            fatalErrorHandler);
}

JobMaster Leader Election

After the election succeeds, the isLeader() method of the current class is called back; go straight to onGrantLeadership().

@Override
public void isLeader() {
    leaderElectionEventHandler.onGrantLeadership();
}

@Override
@GuardedBy("lock")
public void onGrantLeadership() {
    synchronized (lock) {
        if (running) {
            issuedLeaderSessionID = UUID.randomUUID();
            clearConfirmedLeaderInformation();

            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "Grant leadership to contender {} with session ID {}.",
                        leaderContender.getDescription(),
                        issuedLeaderSessionID);
            }

            /*
                leaderContender has four possibilities:
                1. Dispatcher         = DefaultDispatcherRunner
                2. JobMaster          = JobMasterServiceLeadershipRunner
                3. ResourceManager    = ResourceManager
                4. WebMonitorEndpoint = WebMonitorEndpoint
            */
            leaderContender.grantLeadership(issuedLeaderSessionID);
        } else {
            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "Ignoring the grant leadership notification since the {} has "
                                + "already been closed.",
                        leaderElectionDriver);
            }
        }
    }
}

Choose the JobMasterServiceLeadershipRunner implementation:

@Override
public void grantLeadership(UUID leaderSessionID) {
    runIfStateRunning(
            () -> startJobMasterServiceProcessAsync(leaderSessionID),
            "starting a new JobMasterServiceProcess");
}

Step into startJobMasterServiceProcessAsync():

@GuardedBy("lock")privatevoidstartJobMasterServiceProcessAsync(UUID leaderSessionId){
        sequentialOperation =
                sequentialOperation.thenRun(// 校验leader状态()->runIfValidLeader(
                                        leaderSessionId,ThrowingRunnable.unchecked(// 创建JobMaster并启动()->verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(
                                                                leaderSessionId)),"verify job scheduling status and create JobMasterServiceProcess"));handleAsyncOperationError(sequentialOperation,"Could not start the job manager.");}
@GuardedBy("lock")privatevoidverifyJobSchedulingStatusAndCreateJobMasterServiceProcess(UUID leaderSessionId)throwsFlinkException{finalRunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus =getJobSchedulingStatus();if(jobSchedulingStatus ==RunningJobsRegistry.JobSchedulingStatus.DONE){jobAlreadyDone();}else{// 创建JobMaster并启动createNewJobMasterServiceProcess(leaderSessionId);}}
@GuardedBy("lock")privatevoidcreateNewJobMasterServiceProcess(UUID leaderSessionId)throwsFlinkException{Preconditions.checkState(jobMasterServiceProcess.closeAsync().isDone());LOG.debug("Create new JobMasterServiceProcess because we were granted leadership under {}.",
                leaderSessionId);try{// 状态注册,标识当前job为Running状态
            runningJobsRegistry.setJobRunning(getJobID());}catch(IOException e){thrownewFlinkException(String.format("Failed to set the job %s to running in the running jobs registry.",getJobID()),
                    e);}// 创建并启动JobMaster 进入create()
                jobMasterServiceProcess = jobMasterServiceProcessFactory.create(leaderSessionId);forwardIfValidLeader(
                leaderSessionId,
                jobMasterServiceProcess.getJobMasterGatewayFuture(),
                jobMasterGatewayFuture,"JobMasterGatewayFuture from JobMasterServiceProcess");forwardResultFuture(leaderSessionId, jobMasterServiceProcess.getResultFuture());confirmLeadership(leaderSessionId, jobMasterServiceProcess.getLeaderAddressFuture());}
@Override
public JobMasterServiceProcess create(UUID leaderSessionId) {
    // look at the constructor of DefaultJobMasterServiceProcess
    return new DefaultJobMasterServiceProcess(
            jobId,
            leaderSessionId,
            jobMasterServiceFactory,
            cause -> createArchivedExecutionGraph(JobStatus.FAILED, cause));
}

public DefaultJobMasterServiceProcess(
        JobID jobId,
        UUID leaderSessionId,
        JobMasterServiceFactory jobMasterServiceFactory,
        Function<Throwable, ArchivedExecutionGraph> failedArchivedExecutionGraphFactory) {
    this.jobId = jobId;
    this.leaderSessionId = leaderSessionId;
    // create the JobMaster
    this.jobMasterServiceFuture =
            jobMasterServiceFactory.createJobMasterService(leaderSessionId, this);

    jobMasterServiceFuture.whenComplete(
            (jobMasterService, throwable) -> {
                if (throwable != null) {
                    final JobInitializationException jobInitializationException =
                            new JobInitializationException(
                                    jobId, "Could not start the JobMaster.", throwable);

                    LOG.debug(
                            "Initialization of the JobMasterService for job {} under leader id {} failed.",
                            jobId,
                            leaderSessionId,
                            jobInitializationException);

                    resultFuture.complete(
                            JobManagerRunnerResult.forInitializationFailure(
                                    new ExecutionGraphInfo(
                                            failedArchivedExecutionGraphFactory.apply(
                                                    jobInitializationException)),
                                    jobInitializationException));
                } else {
                    registerJobMasterServiceFutures(jobMasterService);
                }
            });
}

@Override
public CompletableFuture<JobMasterService> createJobMasterService(
        UUID leaderSessionId, OnCompletionActions onCompletionActions) {
    return CompletableFuture.supplyAsync(
            FunctionUtils.uncheckedSupplier(
                    // builds the JobMaster internally — step into
                    // internalCreateJobMasterService()
                    () -> internalCreateJobMasterService(leaderSessionId, onCompletionActions)),
            executor);
}

JobMaster Initialization and Startup

private JobMasterService internalCreateJobMasterService(
        UUID leaderSessionId, OnCompletionActions onCompletionActions) throws Exception {

    final JobMaster jobMaster =
            new JobMaster(
                    rpcService,
                    JobMasterId.fromUuidOrNull(leaderSessionId),
                    jobMasterConfiguration,
                    ResourceID.generate(),
                    jobGraph,
                    haServices,
                    slotPoolServiceSchedulerFactory,
                    jobManagerSharedServices,
                    heartbeatServices,
                    jobManagerJobMetricGroupFactory,
                    onCompletionActions,
                    fatalErrorHandler,
                    userCodeClassloader,
                    shuffleMaster,
                    lookup ->
                            new JobMasterPartitionTrackerImpl(
                                    jobGraph.getJobID(), shuffleMaster, lookup),
                    new DefaultExecutionDeploymentTracker(),
                    DefaultExecutionDeploymentReconciler::new,
                    initializationTimestamp);

    // JobMaster extends RpcEndpoint, so its onStart() method is called back
    // once initialization completes
    jobMaster.start();

    return jobMaster;
}

Step into the JobMaster constructor:

/*
    1. Register with the ResourceManager and maintain the heartbeat connection
    2. Parse the JobGraph into the ExecutionGraph — as the earlier graphs show,
       the ExecutionGraph is the parallelized version of the JobGraph
    3. The JobMaster requests slots from the ResourceManager (one slot runs one StreamTask)
    4. Dispatch the tasks to run and monitor their states
    5. Maintain the heartbeat between the JobMaster and the StreamTasks
    6. The JobMaster also performs the related ZooKeeper operations
*/
public JobMaster(
        RpcService rpcService,
        JobMasterId jobMasterId,
        JobMasterConfiguration jobMasterConfiguration,
        ResourceID resourceId,
        JobGraph jobGraph,
        HighAvailabilityServices highAvailabilityService,
        SlotPoolServiceSchedulerFactory slotPoolServiceSchedulerFactory,
        JobManagerSharedServices jobManagerSharedServices,
        HeartbeatServices heartbeatServices,
        JobManagerJobMetricGroupFactory jobMetricGroupFactory,
        OnCompletionActions jobCompletionActions,
        FatalErrorHandler fatalErrorHandler,
        ClassLoader userCodeLoader,
        ShuffleMaster<?> shuffleMaster,
        PartitionTrackerFactory partitionTrackerFactory,
        ExecutionDeploymentTracker executionDeploymentTracker,
        ExecutionDeploymentReconciler.Factory executionDeploymentReconcilerFactory,
        long initializationTimestamp)
        throws Exception {

    // start the RPC service
    super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME), jobMasterId);

    final ExecutionDeploymentReconciliationHandler executionStateReconciliationHandler =
            new ExecutionDeploymentReconciliationHandler() {

                @Override
                public void onMissingDeploymentsOf(
                        Collection<ExecutionAttemptID> executionAttemptIds, ResourceID host) {
                    log.debug(
                            "Failing deployments {} due to no longer being deployed.",
                            executionAttemptIds);
                    for (ExecutionAttemptID executionAttemptId : executionAttemptIds) {
                        schedulerNG.updateTaskExecutionState(
                                new TaskExecutionState(
                                        executionAttemptId,
                                        ExecutionState.FAILED,
                                        new FlinkException(
                                                String.format(
                                                        "Execution %s is unexpectedly no longer running on task executor %s.",
                                                        executionAttemptId, host))));
                    }
                }

                @Override
                public void onUnknownDeploymentsOf(
                        Collection<ExecutionAttemptID> executionAttemptIds, ResourceID host) {
                    log.debug(
                            "Canceling left-over deployments {} on task executor {}.",
                            executionAttemptIds,
                            host);
                    for (ExecutionAttemptID executionAttemptId : executionAttemptIds) {
                        Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerInfo =
                                registeredTaskManagers.get(host);
                        if (taskManagerInfo != null) {
                            taskManagerInfo.f1.cancelTask(executionAttemptId, rpcTimeout);
                        }
                    }
                }
            };

    this.executionDeploymentTracker = executionDeploymentTracker;
    this.executionDeploymentReconciler =
            executionDeploymentReconcilerFactory.create(executionStateReconciliationHandler);

    this.jobMasterConfiguration = checkNotNull(jobMasterConfiguration);
    this.resourceId = checkNotNull(resourceId);
    // keep the JobGraph inside the JobMaster
    this.jobGraph = checkNotNull(jobGraph);
    this.rpcTimeout = jobMasterConfiguration.getRpcTimeout();
    this.highAvailabilityServices = checkNotNull(highAvailabilityService);
    this.blobWriter = jobManagerSharedServices.getBlobWriter();
    this.scheduledExecutorService = jobManagerSharedServices.getScheduledExecutorService();
    this.jobCompletionActions = checkNotNull(jobCompletionActions);
    this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
    this.userCodeLoader = checkNotNull(userCodeLoader);
    this.initializationTimestamp = initializationTimestamp;
    this.retrieveTaskManagerHostName =
            jobMasterConfiguration
                    .getConfiguration()
                    .getBoolean(JobManagerOptions.RETRIEVE_TASK_MANAGER_HOSTNAME);

    final String jobName = jobGraph.getName();
    final JobID jid = jobGraph.getJobID();

    log.info("Initializing job {} ({}).", jobName, jid);

    // retriever for the ResourceManager leader address
    resourceManagerLeaderRetriever =
            highAvailabilityServices.getResourceManagerLeaderRetriever();

    // create the SlotPoolService: manages this job's slots (requesting, releasing, etc.)
    this.slotPoolService =
            checkNotNull(slotPoolServiceSchedulerFactory).createSlotPoolService(jid);

    this.registeredTaskManagers = new HashMap<>(4);
    this.partitionTracker =
            checkNotNull(partitionTrackerFactory)
                    .create(
                            resourceID -> {
                                Tuple2<TaskManagerLocation, TaskExecutorGateway>
                                        taskManagerInfo =
                                                registeredTaskManagers.get(resourceID);
                                if (taskManagerInfo == null) {
                                    return Optional.empty();
                                }
                                return Optional.of(taskManagerInfo.f1);
                            });

    this.shuffleMaster = checkNotNull(shuffleMaster);

    this.jobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
    this.jobStatusListener = new JobManagerJobStatusListener();

    // internally turns the JobGraph into the ExecutionGraph
    this.schedulerNG =
            createScheduler(
                    slotPoolServiceSchedulerFactory,
                    executionDeploymentTracker,
                    jobManagerJobMetricGroup,
                    jobStatusListener);

    this.heartbeatServices = checkNotNull(heartbeatServices);
    this.taskManagerHeartbeatManager = NoOpHeartbeatManager.getInstance();
    this.resourceManagerHeartbeatManager = NoOpHeartbeatManager.getInstance();

    this.resourceManagerConnection = null;
    this.establishedResourceManagerConnection = null;

    this.accumulators = new HashMap<>();
}

Step into createScheduler():

private SchedulerNG createScheduler(
        SlotPoolServiceSchedulerFactory slotPoolServiceSchedulerFactory,
        ExecutionDeploymentTracker executionDeploymentTracker,
        JobManagerJobMetricGroup jobManagerJobMetricGroup,
        JobStatusListener jobStatusListener)
        throws Exception {
    final SchedulerNG scheduler =
            // step into createScheduler()
            slotPoolServiceSchedulerFactory.createScheduler(
                    log,
                    jobGraph,
                    scheduledExecutorService,
                    jobMasterConfiguration.getConfiguration(),
                    slotPoolService,
                    scheduledExecutorService,
                    userCodeLoader,
                    highAvailabilityServices.getCheckpointRecoveryFactory(),
                    rpcTimeout,
                    blobWriter,
                    jobManagerJobMetricGroup,
                    jobMasterConfiguration.getSlotRequestTimeout(),
                    shuffleMaster,
                    partitionTracker,
                    executionDeploymentTracker,
                    initializationTimestamp,
                    getMainThreadExecutor(),
                    fatalErrorHandler,
                    jobStatusListener);
    return scheduler;
}

In the SlotPoolServiceSchedulerFactory interface:

SchedulerNG createScheduler(
        Logger log,
        JobGraph jobGraph,
        ScheduledExecutorService scheduledExecutorService,
        Configuration configuration,
        SlotPoolService slotPoolService,
        ScheduledExecutorService executorService,
        ClassLoader userCodeLoader,
        CheckpointRecoveryFactory checkpointRecoveryFactory,
        Time rpcTimeout,
        BlobWriter blobWriter,
        JobManagerJobMetricGroup jobManagerJobMetricGroup,
        Time slotRequestTimeout,
        ShuffleMaster<?> shuffleMaster,
        JobMasterPartitionTracker partitionTracker,
        ExecutionDeploymentTracker executionDeploymentTracker,
        long initializationTimestamp,
        ComponentMainThreadExecutor mainThreadExecutor,
        FatalErrorHandler fatalErrorHandler,
        JobStatusListener jobStatusListener)
        throws Exception;
public SchedulerNG createScheduler(
        Logger log,
        JobGraph jobGraph,
        ScheduledExecutorService scheduledExecutorService,
        Configuration configuration,
        SlotPoolService slotPoolService,
        ScheduledExecutorService executorService,
        ClassLoader userCodeLoader,
        CheckpointRecoveryFactory checkpointRecoveryFactory,
        Time rpcTimeout,
        BlobWriter blobWriter,
        JobManagerJobMetricGroup jobManagerJobMetricGroup,
        Time slotRequestTimeout,
        ShuffleMaster<?> shuffleMaster,
        JobMasterPartitionTracker partitionTracker,
        ExecutionDeploymentTracker executionDeploymentTracker,
        long initializationTimestamp,
        ComponentMainThreadExecutor mainThreadExecutor,
        FatalErrorHandler fatalErrorHandler,
        JobStatusListener jobStatusListener)
        throws Exception {
    // step into createInstance()
    return schedulerNGFactory.createInstance(
            log,
            jobGraph,
            scheduledExecutorService,
            configuration,
            slotPoolService,
            executorService,
            userCodeLoader,
            checkpointRecoveryFactory,
            rpcTimeout,
            blobWriter,
            jobManagerJobMetricGroup,
            slotRequestTimeout,
            shuffleMaster,
            partitionTracker,
            executionDeploymentTracker,
            initializationTimestamp,
            mainThreadExecutor,
            fatalErrorHandler,
            jobStatusListener);
}

Step into schedulerNGFactory.createInstance() and choose the DefaultSchedulerFactory implementation:

public SchedulerNG createInstance(
        final Logger log,
        final JobGraph jobGraph,
        final Executor ioExecutor,
        final Configuration jobMasterConfiguration,
        final SlotPoolService slotPoolService,
        final ScheduledExecutorService futureExecutor,
        final ClassLoader userCodeLoader,
        final CheckpointRecoveryFactory checkpointRecoveryFactory,
        final Time rpcTimeout,
        final BlobWriter blobWriter,
        final JobManagerJobMetricGroup jobManagerJobMetricGroup,
        final Time slotRequestTimeout,
        final ShuffleMaster<?> shuffleMaster,
        final JobMasterPartitionTracker partitionTracker,
        final ExecutionDeploymentTracker executionDeploymentTracker,
        long initializationTimestamp,
        final ComponentMainThreadExecutor mainThreadExecutor,
        final FatalErrorHandler fatalErrorHandler,
        final JobStatusListener jobStatusListener)
        throws Exception {

    final SlotPool slotPool =
            slotPoolService
                    .castInto(SlotPool.class)
                    .orElseThrow(
                            () ->
                                    new IllegalStateException(
                                            "The DefaultScheduler requires a SlotPool."));

    final DefaultSchedulerComponents schedulerComponents =
            createSchedulerComponents(
                    jobGraph.getJobType(),
                    jobGraph.isApproximateLocalRecoveryEnabled(),
                    jobMasterConfiguration,
                    slotPool,
                    slotRequestTimeout);

    final RestartBackoffTimeStrategy restartBackoffTimeStrategy =
            RestartBackoffTimeStrategyFactoryLoader.createRestartBackoffTimeStrategyFactory(
                            jobGraph.getSerializedExecutionConfig()
                                    .deserializeValue(userCodeLoader)
                                    .getRestartStrategy(),
                            jobMasterConfiguration,
                            jobGraph.isCheckpointingEnabled())
                    .create();
    log.info(
            "Using restart back off time strategy {} for {} ({}).",
            restartBackoffTimeStrategy,
            jobGraph.getName(),
            jobGraph.getJobID());

    final ExecutionGraphFactory executionGraphFactory =
            new DefaultExecutionGraphFactory(
                    jobMasterConfiguration,
                    userCodeLoader,
                    executionDeploymentTracker,
                    futureExecutor,
                    ioExecutor,
                    rpcTimeout,
                    jobManagerJobMetricGroup,
                    blobWriter,
                    shuffleMaster,
                    partitionTracker);

    // the DefaultScheduler constructor, which calls into the parent constructor
    return new DefaultScheduler(
            log,
            jobGraph,
            ioExecutor,
            jobMasterConfiguration,
            schedulerComponents.getStartUpAction(),
            new ScheduledExecutorServiceAdapter(futureExecutor),
            userCodeLoader,
            checkpointRecoveryFactory,
            jobManagerJobMetricGroup,
            schedulerComponents.getSchedulingStrategyFactory(),
            FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration),
            restartBackoffTimeStrategy,
            new DefaultExecutionVertexOperations(),
            new ExecutionVertexVersioner(),
            schedulerComponents.getAllocatorFactory(),
            initializationTimestamp,
            mainThreadExecutor,
            jobStatusListener,
            executionGraphFactory);
}

The DefaultScheduler constructor delegates to the constructor of its parent class, SchedulerBase:

public SchedulerBase(
        final Logger log,
        final JobGraph jobGraph,
        final Executor ioExecutor,
        final Configuration jobMasterConfiguration,
        final ClassLoader userCodeLoader,
        final CheckpointRecoveryFactory checkpointRecoveryFactory,
        final JobManagerJobMetricGroup jobManagerJobMetricGroup,
        final ExecutionVertexVersioner executionVertexVersioner,
        long initializationTimestamp,
        final ComponentMainThreadExecutor mainThreadExecutor,
        final JobStatusListener jobStatusListener,
        final ExecutionGraphFactory executionGraphFactory)
        throws Exception {

    this.log = checkNotNull(log);
    this.jobGraph = checkNotNull(jobGraph);
    this.executionGraphFactory = executionGraphFactory;

    this.jobManagerJobMetricGroup = checkNotNull(jobManagerJobMetricGroup);
    this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
    this.mainThreadExecutor = mainThreadExecutor;

    this.checkpointsCleaner = new CheckpointsCleaner();
    this.completedCheckpointStore =
            SchedulerUtils.createCompletedCheckpointStoreIfCheckpointingIsEnabled(
                    jobGraph,
                    jobMasterConfiguration,
                    userCodeLoader,
                    checkNotNull(checkpointRecoveryFactory),
                    log);
    this.checkpointIdCounter =
            SchedulerUtils.createCheckpointIDCounterIfCheckpointingIsEnabled(
                    jobGraph, checkNotNull(checkpointRecoveryFactory));

    // the JobGraph-to-ExecutionGraph conversion; there is no JobGraph parameter
    // here because the JobGraph is already a member variable of this instance
    this.executionGraph =
            createAndRestoreExecutionGraph(
                    completedCheckpointStore,
                    checkpointsCleaner,
                    checkpointIdCounter,
                    initializationTimestamp,
                    mainThreadExecutor,
                    jobStatusListener);

    registerShutDownCheckpointServicesOnExecutionGraphTermination(executionGraph);

    this.schedulingTopology = executionGraph.getSchedulingTopology();

    stateLocationRetriever =
            executionVertexId ->
                    getExecutionVertex(executionVertexId).getPreferredLocationBasedOnState();
    inputsLocationsRetriever =
            new ExecutionGraphToInputsLocationsRetrieverAdapter(executionGraph);

    this.kvStateHandler = new KvStateHandler(executionGraph);
    this.executionGraphHandler =
            new ExecutionGraphHandler(executionGraph, log, ioExecutor, this.mainThreadExecutor);

    this.operatorCoordinatorHandler =
            new DefaultOperatorCoordinatorHandler(executionGraph, this::handleGlobalFailure);
    operatorCoordinatorHandler.initializeOperatorCoordinators(this.mainThreadExecutor);

    this.exceptionHistory =
            new BoundedFIFOQueue<>(
                    jobMasterConfiguration.getInteger(WebOptions.MAX_EXCEPTION_HISTORY_SIZE));
}
The key call inside this constructor is the conversion itself:

// the JobGraph-to-ExecutionGraph conversion; no JobGraph parameter is needed
// because the JobGraph is already a member variable of the instance
this.executionGraph =
        createAndRestoreExecutionGraph(
                completedCheckpointStore,
                checkpointsCleaner,
                checkpointIdCounter,
                initializationTimestamp,
                mainThreadExecutor,
                jobStatusListener);
private ExecutionGraph createAndRestoreExecutionGraph(
        CompletedCheckpointStore completedCheckpointStore,
        CheckpointsCleaner checkpointsCleaner,
        CheckpointIDCounter checkpointIdCounter,
        long initializationTimestamp,
        ComponentMainThreadExecutor mainThreadExecutor,
        JobStatusListener jobStatusListener)
        throws Exception {

    // create or restore the ExecutionGraph — step into createAndRestoreExecutionGraph()
    final ExecutionGraph newExecutionGraph =
            executionGraphFactory.createAndRestoreExecutionGraph(
                    jobGraph,
                    completedCheckpointStore,
                    checkpointsCleaner,
                    checkpointIdCounter,
                    TaskDeploymentDescriptorFactory.PartitionLocationConstraint.fromJobType(
                            jobGraph.getJobType()),
                    initializationTimestamp,
                    new DefaultVertexAttemptNumberStore(),
                    computeVertexParallelismStore(jobGraph),
                    log);

    newExecutionGraph.setInternalTaskFailuresListener(
            new UpdateSchedulerNgOnInternalFailuresListener(this));
    newExecutionGraph.registerJobStatusListener(jobStatusListener);
    newExecutionGraph.start(mainThreadExecutor);

    return newExecutionGraph;
}

Step into createAndRestoreExecutionGraph(), declared in the ExecutionGraphFactory interface:

ExecutionGraph createAndRestoreExecutionGraph(
        JobGraph jobGraph,
        CompletedCheckpointStore completedCheckpointStore,
        CheckpointsCleaner checkpointsCleaner,
        CheckpointIDCounter checkpointIdCounter,
        TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint,
        long initializationTimestamp,
        VertexAttemptNumberStore vertexAttemptNumberStore,
        VertexParallelismStore vertexParallelismStore,
        Logger log)
        throws Exception;

The concrete implementation:

@Override
public ExecutionGraph createAndRestoreExecutionGraph(
        JobGraph jobGraph,
        CompletedCheckpointStore completedCheckpointStore,
        CheckpointsCleaner checkpointsCleaner,
        CheckpointIDCounter checkpointIdCounter,
        TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint,
        long initializationTimestamp,
        VertexAttemptNumberStore vertexAttemptNumberStore,
        VertexParallelismStore vertexParallelismStore,
        Logger log)
        throws Exception {

    ExecutionDeploymentListener executionDeploymentListener =
            new ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker);
    ExecutionStateUpdateListener executionStateUpdateListener =
            (execution, newState) -> {
                if (newState.isTerminal()) {
                    executionDeploymentTracker.stopTrackingDeploymentOf(execution);
                }
            };

    // turn the JobGraph into an ExecutionGraph — step into buildGraph()
    final ExecutionGraph newExecutionGraph =
            DefaultExecutionGraphBuilder.buildGraph(
                    jobGraph,
                    configuration,
                    futureExecutor,
                    ioExecutor,
                    userCodeClassLoader,
                    completedCheckpointStore,
                    checkpointsCleaner,
                    checkpointIdCounter,
                    rpcTimeout,
                    jobManagerJobMetricGroup,
                    blobWriter,
                    log,
                    shuffleMaster,
                    jobMasterPartitionTracker,
                    partitionLocationConstraint,
                    executionDeploymentListener,
                    executionStateUpdateListener,
                    initializationTimestamp,
                    vertexAttemptNumberStore,
                    vertexParallelismStore);

    // restore the ExecutionGraph
    final CheckpointCoordinator checkpointCoordinator =
            newExecutionGraph.getCheckpointCoordinator();

    if (checkpointCoordinator != null) {
        // check whether we find a valid checkpoint
        if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
                new HashSet<>(newExecutionGraph.getAllVertices().values()))) {

            // check whether we can restore from a savepoint
            tryRestoreExecutionGraphFromSavepoint(
                    newExecutionGraph, jobGraph.getSavepointRestoreSettings());
        }
    }

    return newExecutionGraph;
}

Step into DefaultExecutionGraphBuilder.buildGraph().

Initializing the empty ExecutionGraph shell

public static DefaultExecutionGraph buildGraph(
        JobGraph jobGraph,
        Configuration jobManagerConfig,
        ScheduledExecutorService futureExecutor,
        Executor ioExecutor,
        ClassLoader classLoader,
        CompletedCheckpointStore completedCheckpointStore,
        CheckpointsCleaner checkpointsCleaner,
        CheckpointIDCounter checkpointIdCounter,
        Time rpcTimeout,
        MetricGroup metrics,
        BlobWriter blobWriter,
        Logger log,
        ShuffleMaster<?> shuffleMaster,
        JobMasterPartitionTracker partitionTracker,
        TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint,
        ExecutionDeploymentListener executionDeploymentListener,
        ExecutionStateUpdateListener executionStateUpdateListener,
        long initializationTimestamp,
        VertexAttemptNumberStore vertexAttemptNumberStore,
        VertexParallelismStore vertexParallelismStore)
        throws JobExecutionException, JobException {

    checkNotNull(jobGraph, "job graph cannot be null");

    final String jobName = jobGraph.getName();
    final JobID jobId = jobGraph.getJobID();

    final JobInformation jobInformation =
            new JobInformation(
                    jobId,
                    jobName,
                    jobGraph.getSerializedExecutionConfig(),
                    jobGraph.getJobConfiguration(),
                    jobGraph.getUserJarBlobKeys(),
                    jobGraph.getClasspaths());

    final int maxPriorAttemptsHistoryLength =
            jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);

    final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory =
            PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(
                    jobManagerConfig);

    // create a new execution graph, if none exists so far
    final DefaultExecutionGraph executionGraph;
    try {
        // initialize the empty ExecutionGraph shell
        executionGraph =
                new DefaultExecutionGraph(
                        jobInformation,
                        futureExecutor,
                        ioExecutor,
                        rpcTimeout,
                        maxPriorAttemptsHistoryLength,
                        classLoader,
                        blobWriter,
                        partitionReleaseStrategyFactory,
                        shuffleMaster,
                        partitionTracker,
                        partitionLocationConstraint,
                        executionDeploymentListener,
                        executionStateUpdateListener,
                        initializationTimestamp,
                        vertexAttemptNumberStore,
                        vertexParallelismStore);
    } catch (IOException e) {
        throw new JobException("Could not create the ExecutionGraph.", e);
    }

    // set the basic properties
    try {
        // turn the JobGraph into its JSON form
        executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
    } catch (Throwable t) {
        log.warn("Cannot create JSON plan for job", t);
        // give the graph an empty plan
        executionGraph.setJsonPlan("{}");
    }

    // initialize the vertices that have a master initialization hook
    // file output formats create directories here, input formats create splits
    final long initMasterStart = System.nanoTime();
    log.info("Running initialization on master for job {} ({}).", jobName, jobId);

    // iterate over all vertices in the JobGraph and verify each has an invokable class
    for (JobVertex vertex : jobGraph.getVertices()) {
        String executableClass = vertex.getInvokableClassName();
        if (executableClass == null || executableClass.isEmpty()) {
            throw new JobSubmissionException(
                    jobId,
                    "The vertex " + vertex.getID() + " (" + vertex.getName()
                            + ") has no invokable class.");
        }

        try {
            vertex.initializeOnMaster(classLoader);
        } catch (Throwable t) {
            throw new JobExecutionException(
                    jobId,
                    "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(),
                    t);
        }
    }

    log.info(
            "Successfully ran initialization on master in {} ms.",
            (System.nanoTime() - initMasterStart) / 1_000_000);

    // topologically sort the job vertices and attach the graph to the existing one
    List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
    if (log.isDebugEnabled()) {
        log.debug(
                "Adding {} vertices from job graph {} ({}).",
                sortedTopology.size(),
                jobName,
                jobId);
    }

    // the most important step: generate the ExecutionJobVertices and parallelize
    // them, creating multiple ExecutionVertices according to the parallelism
    executionGraph.attachJobGraph(sortedTopology);

    if (log.isDebugEnabled()) {
        log.debug(
                "Successfully created execution graph from job graph {} ({}).", jobName, jobId);
    }

    // configure the state checkpointing:
    // parse the checkpoint parameters and build the checkpoint-related components
    if (isCheckpointingEnabled(jobGraph)) {
        JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();

        // Maximum number of remembered checkpoints
        int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);

        CheckpointStatsTracker checkpointStatsTracker =
                new CheckpointStatsTracker(
                        historySize,
                        snapshotSettings.getCheckpointCoordinatorConfiguration(),
                        metrics);

        // load the state backend from the application settings
        final StateBackend applicationConfiguredBackend;
        final SerializedValue<StateBackend> serializedAppConfigured =
                snapshotSettings.getDefaultStateBackend();

        if (serializedAppConfigured == null) {
            applicationConfiguredBackend = null;
        } else {
            try {
                applicationConfiguredBackend =
                        serializedAppConfigured.deserializeValue(classLoader);
            } catch (IOException | ClassNotFoundException e) {
                throw new JobExecutionException(
                        jobId, "Could not deserialize application-defined state backend.", e);
            }
        }

        final StateBackend rootBackend;
        try {
            rootBackend =
                    StateBackendLoader.fromApplicationOrConfigOrDefault(
                            applicationConfiguredBackend, jobManagerConfig, classLoader, log);
        } catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
            throw new JobExecutionException(
                    jobId, "Could not instantiate configured state backend", e);
        }

        // load the checkpoint storage from the application settings
        final CheckpointStorage applicationConfiguredStorage;
        final SerializedValue<CheckpointStorage> serializedAppConfiguredStorage =
                snapshotSettings.getDefaultCheckpointStorage();

        if (serializedAppConfiguredStorage == null) {
            applicationConfiguredStorage = null;
        } else {
            try {
                applicationConfiguredStorage =
                        serializedAppConfiguredStorage.deserializeValue(classLoader);
            } catch (IOException | ClassNotFoundException e) {
                throw new JobExecutionException(
                        jobId,
                        "Could not deserialize application-defined checkpoint storage.",
                        e);
            }
        }

        final CheckpointStorage rootStorage;
        try {
            rootStorage =
                    CheckpointStorageLoader.load(
                            applicationConfiguredStorage,
                            null,
                            rootBackend,
                            jobManagerConfig,
                            classLoader,
                            log);
        } catch (IllegalConfigurationException | DynamicCodeLoadingException e) {
            throw new JobExecutionException(
                    jobId, "Could not instantiate configured checkpoint storage", e);
        }

        // instantiate the user-defined checkpoint hooks
        final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks =
                snapshotSettings.getMasterHooks();
        final List<MasterTriggerRestoreHook<?>> hooks;

        if (serializedHooks == null) {
            hooks = Collections.emptyList();
        } else {
            final MasterTriggerRestoreHook.Factory[] hookFactories;
            try {
                hookFactories = serializedHooks.deserializeValue(classLoader);
            } catch (IOException | ClassNotFoundException e) {
                throw new JobExecutionException(
                        jobId, "Could not instantiate user-defined checkpoint hooks", e);
            }

            final Thread thread = Thread.currentThread();
            final ClassLoader originalClassLoader = thread.getContextClassLoader();
            thread.setContextClassLoader(classLoader);

            try {
                hooks = new ArrayList<>(hookFactories.length);
                for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
                    hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));
                }
            } finally {
                thread.setContextClassLoader(originalClassLoader);
            }
        }

        final CheckpointCoordinatorConfiguration chkConfig =
                snapshotSettings.getCheckpointCoordinatorConfiguration();

        executionGraph.enableCheckpointing(
                chkConfig,
                hooks,
                checkpointIdCounter,
                completedCheckpointStore,
                rootBackend,
                rootStorage,
                checkpointStatsTracker,
                checkpointsCleaner);
    }

    // create all the metrics for the Execution Graph
    metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(executionGraph));
    metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(executionGraph));
    metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(executionGraph));

    return executionGraph;
}

Summary of ExecutionGraph initialization:

1. Get the job information from the JobGraph.
2. Initialize an empty ExecutionGraph object.
3. Turn the JobGraph into its JSON form.
4. Verify that every vertex in the JobGraph has an invokable class.
5. Add the JobGraph's vertices into a topologically sorted list.
6. Generate the ExecutionVertices from each JobVertex according to the operator parallelism (see the sketch after this list).
7. Parse the checkpoint parameters and build the checkpoint-related components.
8. Register the monitoring metrics for the ExecutionGraph.
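
For step 6, a minimal sketch of the resulting vertex count — the helper below is hypothetical, not a Flink API, but it mirrors how attachJobGraph() accumulates numVerticesTotal += ejv.getParallelism():

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;

public class VertexCountSketch {
    // Hypothetical helper: the total ExecutionVertex count is the sum of all
    // JobVertex parallelisms, e.g. source(p=1) -> map(p=4) -> sink(p=2)
    // yields 3 ExecutionJobVertices and 7 ExecutionVertices.
    public static int countExecutionVertices(JobGraph jobGraph) {
        int total = 0;
        for (JobVertex vertex : jobGraph.getVertices()) {
            total += vertex.getParallelism();
        }
        return total;
    }
}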
The core of all this is the conversion of the JobGraph's JobVertices into ExecutionVertices; let's step into the following code:

// the most important step: generate the ExecutionJobVertices and parallelize
// them, creating multiple ExecutionVertices according to the parallelism
executionGraph.attachJobGraph(sortedTopology);

Step into attachJobGraph():

@Override
public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {

    assertRunningInJobMasterMainThread();

    LOG.debug(
            "Attaching {} topologically sorted vertices to existing job graph with {} "
                    + "vertices and {} intermediate results.",
            topologiallySorted.size(),
            tasks.size(),
            intermediateResults.size());

    final ArrayList<ExecutionJobVertex> newExecJobVertices =
            new ArrayList<>(topologiallySorted.size());
    final long createTimestamp = System.currentTimeMillis();

    // iterate over the JobGraph's vertex list
    for (JobVertex jobVertex : topologiallySorted) {

        if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
            this.isStoppable = false;
        }

        VertexParallelismInformation parallelismInfo =
                parallelismStore.getParallelismInfo(jobVertex.getID());

        // create the execution job vertex and attach it to the graph:
        // one ExecutionJobVertex per JobVertex — step into the
        // ExecutionJobVertex constructor to see how it is built
        ExecutionJobVertex ejv =
                new ExecutionJobVertex(
                        this,
                        jobVertex,
                        maxPriorAttemptsHistoryLength,
                        rpcTimeout,
                        createTimestamp,
                        parallelismInfo,
                        initialAttemptCounts.getAttemptCounts(jobVertex.getID()));

        // in newer Flink versions (>= 1.13) ExecutionEdge is gone, replaced by
        // ConsumedPartitionGroup and ConsumerVertexGroup
        ejv.connectToPredecessors(this.intermediateResults);

        ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
        if (previousTask != null) {
            throw new JobException(
                    String.format(
                            "Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
                            jobVertex.getID(), ejv, previousTask));
        }

        for (IntermediateResult res : ejv.getProducedDataSets()) {
            IntermediateResult previousDataSet =
                    this.intermediateResults.putIfAbsent(res.getId(), res);
            if (previousDataSet != null) {
                throw new JobException(
                        String.format(
                                "Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
                                res.getId(), res, previousDataSet));
            }
        }

        this.verticesInCreationOrder.add(ejv);
        this.numVerticesTotal += ejv.getParallelism();
        newExecJobVertices.add(ejv);
    }

    registerExecutionVerticesAndResultPartitions(this.verticesInCreationOrder);

    // the topology assigning should happen before notifying new vertices to failoverStrategy
    executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);

    partitionReleaseStrategy =
            partitionReleaseStrategyFactory.createInstance(getSchedulingTopology());
}

1. Iterate over all JobVertex instances in the JobGraph
2. Create one ExecutionJobVertex for each JobVertex
3. Build the ConsumedPartitionGroup and ConsumerVertexGroup for each JobVertex
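One precondition is worth spelling out: attachJobGraph only works if the vertex list is topologically sorted, because each ejv.connectToPredecessors(...) call looks up an upstream IntermediateResult that an earlier loop iteration must already have registered via putIfAbsent. The caller prepares the list with a JobGraph helper; a minimal sketch of the call site:

// producers always precede consumers in this list; otherwise connectToPredecessors
// fails with "Cannot connect this job graph to the previous graph ..."
List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
executionGraph.attachJobGraph(sortedTopology);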

Building the ExecutionJobVertex and its ExecutionVertex instances

public ExecutionJobVertex(
        InternalExecutionGraphAccessor graph,
        JobVertex jobVertex,
        int maxPriorAttemptsHistoryLength,
        Time timeout,
        long createTimestamp,
        VertexParallelismInformation parallelismInfo,
        SubtaskAttemptNumberStore initialAttemptCounts)
        throws JobException {

    if (graph == null || jobVertex == null) {
        throw new NullPointerException();
    }

    this.graph = graph;
    this.jobVertex = jobVertex;

    // obtain the operator's parallelism information
    this.parallelismInfo = parallelismInfo;

    // verify that our parallelism is not higher than the maximum parallelism
    if (this.parallelismInfo.getParallelism() > this.parallelismInfo.getMaxParallelism()) {
        throw new JobException(
                String.format(
                        "Vertex %s's parallelism (%s) is higher than the max parallelism (%s). Please lower the parallelism or increase the max parallelism.",
                        jobVertex.getName(),
                        this.parallelismInfo.getParallelism(),
                        this.parallelismInfo.getMaxParallelism()));
    }

    this.resourceProfile =
            ResourceProfile.fromResourceSpec(jobVertex.getMinResources(), MemorySize.ZERO);

    this.taskVertices = new ExecutionVertex[this.parallelismInfo.getParallelism()];

    this.inputs = new ArrayList<>(jobVertex.getInputs().size());

    // take the sharing group
    this.slotSharingGroup = checkNotNull(jobVertex.getSlotSharingGroup());
    this.coLocationGroup = jobVertex.getCoLocationGroup();

    // create the intermediate results:
    // size the empty IntermediateResult array by the number of
    // IntermediateDataSets produced by this JobVertex
    this.producedDataSets =
            new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];

    // build one IntermediateResult per IntermediateDataSet
    for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {
        final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);

        this.producedDataSets[i] =
                new IntermediateResult(
                        result.getId(),
                        this,
                        this.parallelismInfo.getParallelism(),
                        result.getResultType());
    }

    // create all task vertices:
    // one ExecutionVertex per parallel subtask
    for (int i = 0; i < this.parallelismInfo.getParallelism(); i++) {
        ExecutionVertex vertex =
                new ExecutionVertex(
                        this,
                        i,
                        producedDataSets,
                        timeout,
                        createTimestamp,
                        maxPriorAttemptsHistoryLength,
                        initialAttemptCounts.getAttemptCount(i));

        this.taskVertices[i] = vertex;
    }

    // sanity check for the double referencing between intermediate result partitions and
    // execution vertices
    for (IntermediateResult ir : this.producedDataSets) {
        if (ir.getNumberOfAssignedPartitions() != this.parallelismInfo.getParallelism()) {
            throw new RuntimeException(
                    "The intermediate result's partitions were not correctly assigned.");
        }
    }

    final List<SerializedValue<OperatorCoordinator.Provider>> coordinatorProviders =
            getJobVertex().getOperatorCoordinators();
    if (coordinatorProviders.isEmpty()) {
        this.operatorCoordinators = Collections.emptyList();
    } else {
        final ArrayList<OperatorCoordinatorHolder> coordinators =
                new ArrayList<>(coordinatorProviders.size());
        try {
            for (final SerializedValue<OperatorCoordinator.Provider> provider :
                    coordinatorProviders) {
                coordinators.add(
                        OperatorCoordinatorHolder.create(
                                provider, this, graph.getUserClassLoader()));
            }
        } catch (Exception | LinkageError e) {
            IOUtils.closeAllQuietly(coordinators);
            throw new JobException(
                    "Cannot instantiate the coordinator for operator " + getName(), e);
        }
        this.operatorCoordinators = Collections.unmodifiableList(coordinators);
    }

    // set up the input splits, if the vertex has any
    try {
        @SuppressWarnings("unchecked")
        InputSplitSource<InputSplit> splitSource =
                (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();

        if (splitSource != null) {
            Thread currentThread = Thread.currentThread();
            ClassLoader oldContextClassLoader = currentThread.getContextClassLoader();
            currentThread.setContextClassLoader(graph.getUserClassLoader());
            try {
                inputSplits =
                        splitSource.createInputSplits(this.parallelismInfo.getParallelism());

                if (inputSplits != null) {
                    splitAssigner = splitSource.getInputSplitAssigner(inputSplits);
                }
            } finally {
                currentThread.setContextClassLoader(oldContextClassLoader);
            }
        } else {
            inputSplits = null;
        }
    } catch (Throwable t) {
        throw new JobException(
                "Creating the input splits caused an error: " + t.getMessage(), t);
    }
}

1. Obtain the parallelism information
2. Size the empty IntermediateResult array by the number of IntermediateDataSets of the JobVertex
3. Iterate over the JobVertex's IntermediateDataSets and build one IntermediateResult for each
4. Create as many ExecutionVertex instances as the parallelism
5. Split the input according to the number of ExecutionVertex instances
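As a concrete illustration of the sizes involved (plain arithmetic, not Flink API; the numbers are made up for the example):

// a hypothetical JobVertex with parallelism 4 that produces one IntermediateDataSet
int parallelism = 4;
int producedDataSetCount = 1;

int executionVertices   = parallelism;                         // 4 ExecutionVertex (taskVertices.length)
int intermediateResults = producedDataSetCount;                // 1 IntermediateResult
int resultPartitions    = producedDataSetCount * parallelism;  // 4 IntermediateResultPartitions in total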

Look at how the matching number of ExecutionVertex instances is created according to the parallelism:

for (int i = 0; i < this.parallelismInfo.getParallelism(); i++) {
    ExecutionVertex vertex =
            new ExecutionVertex(
                    this,
                    i,
                    producedDataSets,
                    timeout,
                    createTimestamp,
                    maxPriorAttemptsHistoryLength,
                    initialAttemptCounts.getAttemptCount(i));

    this.taskVertices[i] = vertex;
}

Look at the ExecutionVertex constructor:

public ExecutionVertex(
        ExecutionJobVertex jobVertex,
        int subTaskIndex,
        IntermediateResult[] producedDataSets,
        Time timeout,
        long createTimestamp,
        int maxPriorExecutionHistoryLength,
        int initialAttemptCount) {

    this.jobVertex = jobVertex;
    this.subTaskIndex = subTaskIndex;
    this.executionVertexId = new ExecutionVertexID(jobVertex.getJobVertexId(), subTaskIndex);
    this.taskNameWithSubtask =
            String.format(
                    "%s (%d/%d)",
                    jobVertex.getJobVertex().getName(),
                    subTaskIndex + 1,
                    jobVertex.getParallelism());

    this.resultPartitions = new LinkedHashMap<>(producedDataSets.length, 1);

    for (IntermediateResult result : producedDataSets) {
        IntermediateResultPartition irp =
                new IntermediateResultPartition(
                        result,
                        this,
                        subTaskIndex,
                        getExecutionGraphAccessor().getEdgeManager());
        result.setPartition(subTaskIndex, irp);

        resultPartitions.put(irp.getPartitionId(), irp);
    }

    this.priorExecutions = new EvictingBoundedList<>(maxPriorExecutionHistoryLength);

    /* ExecutionVertex and Execution:
       the JobMaster packs everything needed to deploy a Task into one object
       and sends it to the TaskExecutor; on receipt, the TaskExecutor wraps
       that object into a Task.

       Deployment:
       once the JobMaster has obtained a slot on the corresponding node, it
       wraps the information needed to deploy the Task into an Execution and
       calls Execution#deploy(); inside deploy(), an RPC request ships this
       information to the TaskExecutor, which wraps it into a Task object and
       starts it — that completes the deployment.
    */
    this.currentExecution =
            new Execution(
                    getExecutionGraphAccessor().getFutureExecutor(),
                    this,
                    initialAttemptCount,
                    createTimestamp,
                    timeout);

    getExecutionGraphAccessor().registerExecution(currentExecution);

    this.timeout = timeout;
    this.inputSplits = new ArrayList<>();
}

The ExecutionVertex constructor mainly does two things:

1. Partition each IntermediateResult according to the parallelism, creating one IntermediateResultPartition per subtask

2. Wrap the information the Task will need into an Execution

Once the JobMaster has obtained a slot on the corresponding node, it wraps everything needed to deploy the Task into an Execution and calls the Execution's deploy() method. Inside deploy(), an RPC request ships this information to the TaskExecutor; on receiving it, the TaskExecutor wraps it into a Task object and starts the Task, which completes the deployment. A trimmed sketch of this path follows.
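TaskDeploymentDescriptor, TaskManagerGateway and submitTask are real runtime types and methods, but the body below is abbreviated to the bare hand-off, and the descriptor construction is collapsed into a placeholder helper — read this as an outline of Execution#deploy() rather than its actual code:

// outline of Execution#deploy(), heavily abbreviated
public void deploy() throws JobException {
    // the slot this Execution has been assigned by the scheduler
    final LogicalSlot slot = assignedResource;

    // pack everything the subtask needs (job/vertex IDs, serialized task
    // information, input gate and result partition descriptors, ...);
    // createDeploymentDescriptor() is a placeholder for the real construction
    final TaskDeploymentDescriptor deployment = createDeploymentDescriptor(slot);

    // the RPC hand-off: the TaskExecutor receives the descriptor, wraps it
    // into a Task object and starts it, which completes the deployment
    final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
    taskManagerGateway.submitTask(deployment, rpcTimeout);
}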

Building the ConsumedPartitionGroup and ConsumerVertexGroup

The ConsumedPartitionGroup and ConsumerVertexGroup are initialized from within executionGraph.attachJobGraph(); step into the ejv.connectToPredecessors(this.intermediateResults) call.

public void connectToPredecessors(
        Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets)
        throws JobException {

    // obtain the inputs of this JobVertex
    List<JobEdge> inputs = jobVertex.getInputs();

    if (LOG.isDebugEnabled()) {
        LOG.debug(
                String.format(
                        "Connecting ExecutionJobVertex %s (%s) to %d predecessors.",
                        jobVertex.getID(), jobVertex.getName(), inputs.size()));
    }

    for (int num = 0; num < inputs.size(); num++) {
        JobEdge edge = inputs.get(num);

        if (LOG.isDebugEnabled()) {
            if (edge.getSource() == null) {
                LOG.debug(
                        String.format(
                                "Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.",
                                num,
                                jobVertex.getID(),
                                jobVertex.getName(),
                                edge.getSourceId()));
            } else {
                LOG.debug(
                        String.format(
                                "Connecting input %d of vertex %s (%s) to intermediate result referenced via predecessor %s (%s).",
                                num,
                                jobVertex.getID(),
                                jobVertex.getName(),
                                edge.getSource().getProducer().getID(),
                                edge.getSource().getProducer().getName()));
            }
        }

        // fetch the intermediate result via ID. if it does not exist, then it either has not
        // been created, or the order
        // in which this method is called for the job vertices is not a topological order
        IntermediateResult ires = intermediateDataSets.get(edge.getSourceId());
        if (ires == null) {
            throw new JobException(
                    "Cannot connect this job graph to the previous graph. No previous intermediate result found for ID "
                            + edge.getSourceId());
        }

        this.inputs.add(ires);

        // build the ConsumedPartitionGroup and ConsumerVertexGroup
        EdgeManagerBuildUtil.connectVertexToResult(this, ires, edge.getDistributionPattern());
    }
}

1. Obtain the inputs of the JobVertex
2. From those inputs, obtain the JobVertex's input edges (JobEdge)
3. For each input edge, look up the corresponding intermediate result (IntermediateResult) in intermediateDataSets
4. Build the ConsumedPartitionGroup and ConsumerVertexGroup from the IntermediateResult

Step into EdgeManagerBuildUtil.connectVertexToResult():

static void connectVertexToResult(
        ExecutionJobVertex vertex,
        IntermediateResult intermediateResult,
        DistributionPattern distributionPattern) {

    switch (distributionPattern) {
        case POINTWISE:
            connectPointwise(vertex.getTaskVertices(), intermediateResult);
            break;
        case ALL_TO_ALL:
            connectAllToAll(vertex.getTaskVertices(), intermediateResult);
            break;
        default:
            throw new IllegalArgumentException("Unrecognized distribution pattern.");
    }
}

The switch distinguishes the two distribution patterns, POINTWISE and ALL_TO_ALL (plus a default branch that rejects anything else). Take all-to-all as the example:

private static void connectAllToAll(
        ExecutionVertex[] taskVertices, IntermediateResult intermediateResult) {

    // iterate over the IntermediateResultPartitions to build the ConsumedPartitionGroup
    ConsumedPartitionGroup consumedPartitions =
            ConsumedPartitionGroup.fromMultiplePartitions(
                    Arrays.stream(intermediateResult.getPartitions())
                            .map(IntermediateResultPartition::getPartitionId)
                            .collect(Collectors.toList()));

    // connect the ConsumedPartitionGroup to every consuming ExecutionVertex
    for (ExecutionVertex ev : taskVertices) {
        ev.addConsumedPartitionGroup(consumedPartitions);
    }

    // collect the downstream ExecutionVertices into a ConsumerVertexGroup
    ConsumerVertexGroup vertices =
            ConsumerVertexGroup.fromMultipleVertices(
                    Arrays.stream(taskVertices)
                            .map(ExecutionVertex::getID)
                            .collect(Collectors.toList()));

    // link every IntermediateResultPartition to the downstream ConsumerVertexGroup
    for (IntermediateResultPartition partition : intermediateResult.getPartitions()) {
        partition.addConsumers(vertices);
    }
}

Summary:
1. Iterate over all IntermediateResultPartitions of the IntermediateResult and collect them into a single ConsumedPartitionGroup object
2. Connect every downstream ExecutionVertex of this IntermediateResult to the ConsumedPartitionGroup built from it
3. Put all downstream ExecutionVertices of this IntermediateResult into a single ConsumerVertexGroup object
4. Connect every IntermediateResultPartition of this IntermediateResult to that ConsumerVertexGroup
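This grouping is exactly what removes the O(n²) edge objects: an all-to-all connection between n producers and m consumers now stores one ConsumedPartitionGroup and one ConsumerVertexGroup instead of n × m ExecutionEdges. For contrast, here is a simplified sketch of the POINTWISE branch for the special case of equal upstream and downstream parallelism; the real connectPointwise also maps ranges of partitions to vertices when the parallelisms differ, and the method name below is our own:

// simplified 1:1 case of the POINTWISE pattern: subtask i consumes only
// partition i, so every partition/vertex pair forms its own singleton group
private static void connectPointwiseOneToOne(
        ExecutionVertex[] taskVertices, IntermediateResult intermediateResult) {

    final IntermediateResultPartition[] partitions = intermediateResult.getPartitions();

    for (int i = 0; i < taskVertices.length; i++) {
        ExecutionVertex ev = taskVertices[i];
        IntermediateResultPartition partition = partitions[i];

        // this vertex consumes exactly one partition
        ev.addConsumedPartitionGroup(
                ConsumedPartitionGroup.fromSinglePartition(partition.getPartitionId()));

        // this partition is consumed by exactly one vertex
        partition.addConsumers(ConsumerVertexGroup.fromSingleVertex(ev.getID()));
    }
}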

This article is reproduced from: https://blog.csdn.net/m0_43437171/article/details/136111562
Copyright belongs to the original author 前兄如后背; in case of infringement, please contact us for removal.
