1. ExecutionGraph Overview
When a JobGraph is converted into an ExecutionGraph, the main work is splitting each JobVertex according to its operator parallelism: every JobVertex is expanded into multiple ExecutionVertex instances, and each ExecutionVertex's output is received by an IntermediateResultPartition object. The IntermediateResultPartitions produced by all ExecutionVertexes of the same ExecutionJobVertex together form one IntermediateResult. From Flink 1.13 on (the change landed after 1.12), the ExecutionEdge class is gone; it was replaced by ConsumedPartitionGroup and ConsumerVertexGroup.
Flink's ExecutionGraph supports two distribution patterns, pointwise (one-to-one) and all-to-all. In the all-to-all case, the old model linked every downstream ExecutionVertex to every upstream partition individually, producing O(n²) ExecutionEdge objects, which consumed a large amount of memory for large-scale jobs.
Because all ExecutionVertexes within one ExecutionJobVertex are derived from the same JobVertex split by parallelism, the IntermediateResultPartitions that receive their output are structurally identical; likewise, all ExecutionVertexes of the downstream ExecutionJobVertex connected to those partitions are structurally identical. Flink therefore groups by ExecutionVertex and by IntermediateResultPartition: all ExecutionVertexes belonging to the same ExecutionJobVertex form one ConsumerVertexGroup, and all IntermediateResultPartitions feeding that ExecutionJobVertex form one ConsumedPartitionGroup.
With ExecutionEdge replaced by ConsumedPartitionGroup and ConsumerVertexGroup, all identically structured partitions connect to the same downstream ConsumerVertexGroup.
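A minimal, self-contained sketch of that idea (the ConsumedPartitionGroup/ConsumerVertexGroup types here are simple stand-ins, not Flink's classes): instead of one edge object per producer-consumer pair, each side holds one shared reference to a group.

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class GroupingSketch {
    // Stand-ins for Flink's groups: each is just a shared, immutable list of IDs.
    record ConsumedPartitionGroup(List<Integer> partitionIds) {}
    record ConsumerVertexGroup(List<Integer> vertexIds) {}

    public static void main(String[] args) {
        int producers = 1000, consumers = 1000;

        // Old model: one ExecutionEdge object per producer-consumer pair.
        long edgeObjects = (long) producers * consumers;   // 1,000,000 objects

        // New model: ONE group of all partitions, shared by every consumer,
        // and ONE group of all consumer vertices, shared by every partition.
        ConsumedPartitionGroup partitions = new ConsumedPartitionGroup(
                IntStream.range(0, producers).boxed().collect(Collectors.toList()));
        ConsumerVertexGroup vertices = new ConsumerVertexGroup(
                IntStream.range(0, consumers).boxed().collect(Collectors.toList()));

        // Each consumer references the same partition group, each partition the
        // same vertex group: O(n) references instead of O(n^2) edge objects.
        long groupReferences = producers + consumers;      // 2,000 references

        System.out.printf("edges: %d -> group refs: %d (%s, %s)%n",
                edgeObjects, groupReferences,
                partitions.partitionIds().size() + " partitions",
                vertices.vertexIds().size() + " vertices");
    }
}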
2. JobMaster Initialization
Taking session mode as an example:
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
    checkNotNull(streamGraph, "StreamGraph cannot be null.");
    checkNotNull(
            configuration.get(DeploymentOptions.TARGET),
            "No execution.target specified in your configuration file.");

    final PipelineExecutorFactory executorFactory =
            executorServiceLoader.getExecutorFactory(configuration);

    checkNotNull(
            executorFactory,
            "Cannot find compatible factory for specified execution.target (=%s)",
            configuration.get(DeploymentOptions.TARGET));

    CompletableFuture<JobClient> jobClientFuture =
            executorFactory
                    .getExecutor(configuration)
                    // step into the execute() method
                    .execute(streamGraph, configuration, userClassloader);

    try {
        JobClient jobClient = jobClientFuture.get();
        jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
        return jobClient;
    } catch (ExecutionException executionException) {
        final Throwable strippedException =
                ExceptionUtils.stripExecutionException(executionException);
        jobListeners.forEach(
                jobListener -> jobListener.onJobSubmitted(null, strippedException));

        throw new FlinkException(
                String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
                strippedException);
    }
}
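The executorServiceLoader lookup above resolves a PipelineExecutorFactory whose name matches the execution.target setting. A rough sketch of that SPI pattern using plain java.util.ServiceLoader (the Factory interface here is hypothetical, not Flink's):

import java.util.ServiceLoader;
import java.util.stream.StreamSupport;

public class FactoryLookup {
    // Hypothetical SPI interface; implementations would be listed in
    // META-INF/services/FactoryLookup$Factory on the classpath.
    public interface Factory {
        String name(); // e.g. "remote", "yarn-per-job"
    }

    static Factory forTarget(String executionTarget) {
        ServiceLoader<Factory> loader = ServiceLoader.load(Factory.class);
        return StreamSupport.stream(loader.spliterator(), false)
                .filter(f -> f.name().equals(executionTarget))
                .findFirst()
                .orElseThrow(() -> new IllegalStateException(
                        "Cannot find compatible factory for " + executionTarget));
    }
}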
For session mode the concrete implementation is AbstractSessionClusterExecutor; for per-job mode it is AbstractJobClusterExecutor.
@Override
public CompletableFuture<JobClient> execute(
        @Nonnull final Pipeline pipeline,
        @Nonnull final Configuration configuration,
        @Nonnull final ClassLoader userCodeClassloader)
        throws Exception {
    // build the StreamGraph into a JobGraph
    final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);

    try (final ClusterDescriptor<ClusterID> clusterDescriptor =
            clusterClientFactory.createClusterDescriptor(configuration)) {
        final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
        checkState(clusterID != null);

        // Create the ClusterClientProvider for the RestClusterClient.
        // Initializing the RestClusterClient creates a RestClient, which in turn
        // creates a Netty client. The Netty client that submits the job is the
        // RestClusterClient; the server receiving it is the Netty server started
        // by the WebMonitorEndpoint inside the JobManager.
        final ClusterClientProvider<ClusterID> clusterClientProvider =
                clusterDescriptor.retrieve(clusterID);
        ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();

        // Submit via the RestClusterClient: its internal Netty client sends the
        // request -- step into submitJob()
        return clusterClient
                .submitJob(jobGraph)
                .thenApplyAsync(
                        FunctionUtils.uncheckedFunction(
                                jobId -> {
                                    ClientUtils.waitUntilJobInitializationFinished(
                                            () -> clusterClient.getJobStatus(jobId).get(),
                                            () -> clusterClient.requestJobResult(jobId).get(),
                                            userCodeClassloader);
                                    return jobId;
                                }))
                .thenApplyAsync(
                        jobID ->
                                (JobClient)
                                        new ClusterClientJobClientAdapter<>(
                                                clusterClientProvider, jobID, userCodeClassloader))
                .whenCompleteAsync((ignored1, ignored2) -> clusterClient.close());
    }
}
public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
    // persist the JobGraph into a jobGraphFile
    CompletableFuture<java.nio.file.Path> jobGraphFileFuture =
            CompletableFuture.supplyAsync(
                    () -> {
                        try {
                            // The file is named flink-jobgraph*.bin. This is what is
                            // actually shipped to the Flink cluster: the WebMonitor's
                            // JobSubmitHandler receives the request and deserializes
                            // this file first when handling it.
                            final java.nio.file.Path jobGraphFile =
                                    Files.createTempFile("flink-jobgraph", ".bin");
                            try (ObjectOutputStream objectOut =
                                    new ObjectOutputStream(
                                            Files.newOutputStream(jobGraphFile))) {
                                objectOut.writeObject(jobGraph);
                            }
                            return jobGraphFile;
                        } catch (IOException e) {
                            throw new CompletionException(
                                    new FlinkException("Failed to serialize JobGraph.", e));
                        }
                    },
                    executorService);

    // once the JobGraph is persisted, add the jobGraphFile to the upload list
    CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture =
            jobGraphFileFuture.thenApply(
                    jobGraphFile -> {
                        List<String> jarFileNames = new ArrayList<>(8);
                        List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames =
                                new ArrayList<>(8);
                        Collection<FileUpload> filesToUpload = new ArrayList<>(8);
                        // add the jobGraphFile to the files to upload
                        filesToUpload.add(
                                new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
                        for (Path jar : jobGraph.getUserJars()) {
                            jarFileNames.add(jar.getName());
                            // upload the jars the job needs
                            filesToUpload.add(
                                    new FileUpload(
                                            Paths.get(jar.toUri()),
                                            RestConstants.CONTENT_TYPE_JAR));
                        }

                        for (Map.Entry<String, DistributedCache.DistributedCacheEntry>
                                artifacts : jobGraph.getUserArtifacts().entrySet()) {
                            final Path artifactFilePath =
                                    new Path(artifacts.getValue().filePath);
                            try {
                                // Only local artifacts need to be uploaded.
                                if (!artifactFilePath.getFileSystem().isDistributedFS()) {
                                    artifactFileNames.add(
                                            new JobSubmitRequestBody.DistributedCacheFile(
                                                    artifacts.getKey(),
                                                    artifactFilePath.getName()));
                                    filesToUpload.add(
                                            new FileUpload(
                                                    Paths.get(artifacts.getValue().filePath),
                                                    RestConstants.CONTENT_TYPE_BINARY));
                                }
                            } catch (IOException e) {
                                throw new CompletionException(
                                        new FlinkException(
                                                "Failed to get the FileSystem of artifact "
                                                        + artifactFilePath + ".",
                                                e));
                            }
                        }

                        // build the submission request body, containing the JobGraph
                        // file name plus the jar and artifact dependencies
                        final JobSubmitRequestBody requestBody =
                                new JobSubmitRequestBody(
                                        jobGraphFile.getFileName().toString(),
                                        jarFileNames,
                                        artifactFileNames);

                        // return a Tuple2 of requestBody and filesToUpload
                        return Tuple2.of(
                                requestBody, Collections.unmodifiableCollection(filesToUpload));
                    });

    final CompletableFuture<JobSubmitResponseBody> submissionFuture =
            requestFuture.thenCompose(
                    requestAndFileUploads ->
                            sendRetriableRequest(
                                    JobSubmitHeaders.getInstance(),
                                    EmptyMessageParameters.getInstance(),
                                    requestAndFileUploads.f0,
                                    requestAndFileUploads.f1,
                                    isConnectionProblemOrServiceUnavailable()));

    submissionFuture
            .thenCombine(jobGraphFileFuture, (ignored, jobGraphFile) -> jobGraphFile)
            .thenAccept(
                    jobGraphFile -> {
                        try {
                            Files.delete(jobGraphFile);
                        } catch (IOException e) {
                            LOG.warn("Could not delete temporary file {}.", jobGraphFile, e);
                        }
                    });

    return submissionFuture
            .thenApply(ignore -> jobGraph.getJobID())
            .exceptionally(
                    (Throwable throwable) -> {
                        throw new CompletionException(
                                new JobSubmissionException(
                                        jobGraph.getJobID(),
                                        "Failed to submit JobGraph.",
                                        ExceptionUtils.stripCompletionException(throwable)));
                    });
}
// deserialize the JobGraph and hand it to the Dispatcher
protected CompletableFuture<JobSubmitResponseBody> handleRequest(
        @Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request,
        @Nonnull DispatcherGateway gateway)
        throws RestHandlerException {
    // get the serialized jobGraphFile from the request body
    final Collection<File> uploadedFiles = request.getUploadedFiles();
    final Map<String, Path> nameToFile =
            uploadedFiles.stream()
                    .collect(Collectors.toMap(File::getName, Path::fromLocalFile));

    if (uploadedFiles.size() != nameToFile.size()) {
        throw new RestHandlerException(
                String.format(
                        "The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
                        uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
                        nameToFile.size(),
                        uploadedFiles.size()),
                HttpResponseStatus.BAD_REQUEST);
    }

    // get the request body
    final JobSubmitRequestBody requestBody = request.getRequestBody();
    if (requestBody.jobGraphFileName == null) {
        throw new RestHandlerException(
                String.format(
                        "The %s field must not be omitted or be null.",
                        JobSubmitRequestBody.FIELD_NAME_JOB_GRAPH),
                HttpResponseStatus.BAD_REQUEST);
    }

    // deserialize the jobGraphFile back into the JobGraph the client submitted
    CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);

    // the job's jars
    Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);

    // the job's dependency artifacts
    Collection<Tuple2<String, Path>> artifacts =
            getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);

    // upload the JobGraph, the job jars, and the dependencies to the BlobServer
    CompletableFuture<JobGraph> finalizedJobGraphFuture =
            uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);

    // hand the finalized JobGraph over to the Dispatcher: the gateway is the
    // Dispatcher's proxy object -- step into gateway.submitJob()
    CompletableFuture<Acknowledge> jobSubmissionFuture =
            finalizedJobGraphFuture.thenCompose(
                    jobGraph -> gateway.submitJob(jobGraph, timeout));

    return jobSubmissionFuture.thenCombine(
            jobGraphFuture,
            (ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
}

private CompletableFuture<JobGraph> loadJobGraph(
        JobSubmitRequestBody requestBody, Map<String, Path> nameToFile)
        throws MissingFileException {
    final Path jobGraphFile =
            getPathAndAssertUpload(requestBody.jobGraphFileName, FILE_TYPE_JOB_GRAPH, nameToFile);

    return CompletableFuture.supplyAsync(
            () -> {
                JobGraph jobGraph;
                try (ObjectInputStream objectIn =
                        new ObjectInputStream(jobGraphFile.getFileSystem().open(jobGraphFile))) {
                    jobGraph = (JobGraph) objectIn.readObject();
                } catch (Exception e) {
                    throw new CompletionException(
                            new RestHandlerException(
                                    "Failed to deserialize JobGraph.",
                                    HttpResponseStatus.BAD_REQUEST,
                                    e));
                }
                return jobGraph;
            },
            executor);
}
The Dispatcher receives the JobGraph, then initializes and starts the JobMaster
Step into gateway.submitJob(); the concrete implementation is submitJob() in the Dispatcher class:
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
    log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());

    try {
        // reject duplicate job IDs
        if (isDuplicateJob(jobGraph.getJobID())) {
            return FutureUtils.completedExceptionally(
                    new DuplicateJobSubmissionException(jobGraph.getJobID()));
        } else if (isPartialResourceConfigured(jobGraph)) {
            return FutureUtils.completedExceptionally(
                    new JobSubmissionException(
                            jobGraph.getJobID(),
                            "Currently jobs is not supported if parts of the vertices have "
                                    + "resources configured. The limitation will be removed in future versions."));
        } else {
            // Submit the job. All jars the JobGraph needs have been uploaded by
            // now; the JobGraph will be used to build the ExecutionGraph when the
            // JobMaster starts -- step into internalSubmitJob()
            return internalSubmitJob(jobGraph);
        }
    } catch (FlinkException e) {
        return FutureUtils.completedExceptionally(e);
    }
}
Step into internalSubmitJob(jobGraph):
private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
    log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());

    final CompletableFuture<Acknowledge> persistAndRunFuture =
            // first persist, then run (the JobMaster) -- step into this::persistAndRunJob
            waitForTerminatingJob(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
                    .thenApply(ignored -> Acknowledge.get());

    return persistAndRunFuture.handleAsync(
            (acknowledge, throwable) -> {
                if (throwable != null) {
                    cleanUpJobData(jobGraph.getJobID(), true);

                    ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(throwable);
                    final Throwable strippedThrowable =
                            ExceptionUtils.stripCompletionException(throwable);
                    log.error("Failed to submit job {}.", jobGraph.getJobID(), strippedThrowable);
                    throw new CompletionException(
                            new JobSubmissionException(
                                    jobGraph.getJobID(),
                                    "Failed to submit job.",
                                    strippedThrowable));
                } else {
                    return acknowledge;
                }
            },
            ioExecutor);
}
private void persistAndRunJob(JobGraph jobGraph) throws Exception {
    // The server persists the JobGraph to a file system such as HDFS, gets a
    // handle back, and stores the handle's state in ZooKeeper. The leader also
    // starts a JobGraphStore service when the Dispatcher starts; any unfinished
    // JobGraphs found there are recovered first.
    // jobGraphWriter = DefaultJobGraphStore
    jobGraphWriter.putJobGraph(jobGraph);
    runJob(jobGraph, ExecutionType.SUBMISSION);
}

private void runJob(JobGraph jobGraph, ExecutionType executionType) throws Exception {
    Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
    long initializationTimestamp = System.currentTimeMillis();
    // Create the JobManagerRunner launcher, which internally builds a
    // DefaultJobMasterServiceProcessFactory and creates the JobMaster instance;
    // while creating the JobMaster, the JobGraph is built into the ExecutionGraph.
    JobManagerRunner jobManagerRunner = createJobManagerRunner(jobGraph, initializationTimestamp);

    // A Flink cluster contains two master-worker pairs:
    // 1. resource management: ResourceManager + TaskExecutor
    // 2. job execution:       JobMaster + StreamTask
    runningJobs.put(jobGraph.getJobID(), jobManagerRunner);

    final JobID jobId = jobGraph.getJobID();

    final CompletableFuture<CleanupJobState> cleanupJobStateFuture =
            jobManagerRunner
                    .getResultFuture()
                    .handleAsync(
                            (jobManagerRunnerResult, throwable) -> {
                                Preconditions.checkState(
                                        runningJobs.get(jobId) == jobManagerRunner,
                                        "The job entry in runningJobs must be bound to the lifetime of the JobManagerRunner.");

                                if (jobManagerRunnerResult != null) {
                                    return handleJobManagerRunnerResult(
                                            jobManagerRunnerResult, executionType);
                                } else {
                                    return jobManagerRunnerFailed(jobId, throwable);
                                }
                            },
                            getMainThreadExecutor());

    final CompletableFuture<Void> jobTerminationFuture =
            cleanupJobStateFuture
                    .thenApply(cleanupJobState -> removeJob(jobId, cleanupJobState))
                    .thenCompose(Function.identity());

    FutureUtils.assertNoException(jobTerminationFuture);
    registerJobManagerRunnerTerminationFuture(jobId, jobTerminationFuture);
}
JobManagerRunner createJobManagerRunner(JobGraph jobGraph, long initializationTimestamp)
        throws Exception {
    final RpcService rpcService = getRpcService();

    // Build the JobManagerRunner, which wraps a DefaultJobMasterServiceProcessFactory.
    // That factory will build and start the JobMaster once leader election completes.
    JobManagerRunner runner =
            jobManagerRunnerFactory.createJobManagerRunner(
                    jobGraph,
                    configuration,
                    rpcService,
                    highAvailabilityServices,
                    heartbeatServices,
                    jobManagerSharedServices,
                    new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                    fatalErrorHandler,
                    initializationTimestamp);
    // Start the JobMaster leader election; once it succeeds, the JobMaster is
    // created in ZooKeeperLeaderElectionDriver's isLeader()
    runner.start();
    return runner;
}
public JobManagerRunner createJobManagerRunner(
        JobGraph jobGraph,
        Configuration configuration,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        JobManagerSharedServices jobManagerServices,
        JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
        FatalErrorHandler fatalErrorHandler,
        long initializationTimestamp)
        throws Exception {

    checkArgument(jobGraph.getNumberOfVertices() > 0, "The given job is empty");

    final JobMasterConfiguration jobMasterConfiguration =
            JobMasterConfiguration.fromConfiguration(configuration);

    final RunningJobsRegistry runningJobsRegistry =
            highAvailabilityServices.getRunningJobsRegistry();

    // get the election service in preparation for the JobMaster leader election
    final LeaderElectionService jobManagerLeaderElectionService =
            highAvailabilityServices.getJobManagerLeaderElectionService(jobGraph.getJobID());

    final SlotPoolServiceSchedulerFactory slotPoolServiceSchedulerFactory =
            DefaultSlotPoolServiceSchedulerFactory.fromConfiguration(
                    configuration, jobGraph.getJobType());

    if (jobMasterConfiguration.getConfiguration().get(JobManagerOptions.SCHEDULER_MODE)
            == SchedulerExecutionMode.REACTIVE) {
        Preconditions.checkState(
                slotPoolServiceSchedulerFactory.getSchedulerType()
                        == JobManagerOptions.SchedulerType.Adaptive,
                "Adaptive Scheduler is required for reactive mode");
    }

    final ShuffleMaster<?> shuffleMaster =
            ShuffleServiceLoader.loadShuffleServiceFactory(configuration)
                    .createShuffleMaster(configuration);

    final LibraryCacheManager.ClassLoaderLease classLoaderLease =
            jobManagerServices
                    .getLibraryCacheManager()
                    .registerClassLoaderLease(jobGraph.getJobID());

    final ClassLoader userCodeClassLoader =
            classLoaderLease
                    .getOrResolveClassLoader(
                            jobGraph.getUserJarBlobKeys(), jobGraph.getClasspaths())
                    .asClassLoader();

    // build the DefaultJobMasterServiceFactory, wrapping the base services the
    // JobMaster needs at startup
    final DefaultJobMasterServiceFactory jobMasterServiceFactory =
            new DefaultJobMasterServiceFactory(
                    jobManagerServices.getScheduledExecutorService(),
                    rpcService,
                    jobMasterConfiguration,
                    jobGraph,
                    highAvailabilityServices,
                    slotPoolServiceSchedulerFactory,
                    jobManagerServices,
                    heartbeatServices,
                    jobManagerJobMetricGroupFactory,
                    fatalErrorHandler,
                    userCodeClassLoader,
                    shuffleMaster,
                    initializationTimestamp);

    final DefaultJobMasterServiceProcessFactory jobMasterServiceProcessFactory =
            new DefaultJobMasterServiceProcessFactory(
                    jobGraph.getJobID(),
                    jobGraph.getName(),
                    jobGraph.getCheckpointingSettings(),
                    initializationTimestamp,
                    jobMasterServiceFactory);

    return new JobMasterServiceLeadershipRunner(
            jobMasterServiceProcessFactory,
            jobManagerLeaderElectionService,
            runningJobsRegistry,
            classLoaderLease,
            fatalErrorHandler);
}
JobMaster leader election
After the election succeeds, the isLeader() callback of the current class is invoked; look directly at onGrantLeadership():
@Override
public void isLeader() {
    leaderElectionEventHandler.onGrantLeadership();
}
@Override@GuardedBy("lock")publicvoidonGrantLeadership(){synchronized(lock){if(running){
issuedLeaderSessionID =UUID.randomUUID();clearConfirmedLeaderInformation();if(LOG.isDebugEnabled()){LOG.debug("Grant leadership to contender {} with session ID {}.",
leaderContender.getDescription(),
issuedLeaderSessionID);}/**
leaderContender有四种情况
1.Dispatcher=DefaultDispatcherRuuner
2.JobMaster=JobMasterServiceLeadershipRunner
3.ResourceManager=ResourceManager
4.WebmonitorEndpoint=WebmonitorEndpoint
*/
leaderContender.grantLeadership(issuedLeaderSessionID);}else{if(LOG.isDebugEnabled()){LOG.debug("Ignoring the grant leadership notification since the {} has "+"already been closed.",
leaderElectionDriver);}}}}
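The pattern above is worth isolating: a lock-guarded running flag plus a fresh session UUID per grant, so stale election callbacks arriving after shutdown are ignored. A compact sketch with illustrative names (not Flink's):

import java.util.UUID;
import java.util.function.Consumer;

public class LeadershipGuard {
    private final Object lock = new Object();
    private boolean running = true;
    private UUID issuedSessionId;

    // Called by the election backend (e.g. a ZooKeeper driver) when we win.
    void onGrantLeadership(Consumer<UUID> contender) {
        synchronized (lock) {
            if (!running) {
                return; // already closed: ignore the stale notification
            }
            issuedSessionId = UUID.randomUUID(); // new fencing token per grant
            contender.accept(issuedSessionId);
        }
    }

    void close() {
        synchronized (lock) {
            running = false;
        }
    }
}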
Choose the JobMasterServiceLeadershipRunner implementation:
@Override
public void grantLeadership(UUID leaderSessionID) {
    runIfStateRunning(
            () -> startJobMasterServiceProcessAsync(leaderSessionID),
            "starting a new JobMasterServiceProcess");
}
Step into startJobMasterServiceProcessAsync():
@GuardedBy("lock")privatevoidstartJobMasterServiceProcessAsync(UUID leaderSessionId){
sequentialOperation =
sequentialOperation.thenRun(// 校验leader状态()->runIfValidLeader(
leaderSessionId,ThrowingRunnable.unchecked(// 创建JobMaster并启动()->verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(
leaderSessionId)),"verify job scheduling status and create JobMasterServiceProcess"));handleAsyncOperationError(sequentialOperation,"Could not start the job manager.");}
@GuardedBy("lock")privatevoidverifyJobSchedulingStatusAndCreateJobMasterServiceProcess(UUID leaderSessionId)throwsFlinkException{finalRunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus =getJobSchedulingStatus();if(jobSchedulingStatus ==RunningJobsRegistry.JobSchedulingStatus.DONE){jobAlreadyDone();}else{// 创建JobMaster并启动createNewJobMasterServiceProcess(leaderSessionId);}}
@GuardedBy("lock")privatevoidcreateNewJobMasterServiceProcess(UUID leaderSessionId)throwsFlinkException{Preconditions.checkState(jobMasterServiceProcess.closeAsync().isDone());LOG.debug("Create new JobMasterServiceProcess because we were granted leadership under {}.",
leaderSessionId);try{// 状态注册,标识当前job为Running状态
runningJobsRegistry.setJobRunning(getJobID());}catch(IOException e){thrownewFlinkException(String.format("Failed to set the job %s to running in the running jobs registry.",getJobID()),
e);}// 创建并启动JobMaster 进入create()
jobMasterServiceProcess = jobMasterServiceProcessFactory.create(leaderSessionId);forwardIfValidLeader(
leaderSessionId,
jobMasterServiceProcess.getJobMasterGatewayFuture(),
jobMasterGatewayFuture,"JobMasterGatewayFuture from JobMasterServiceProcess");forwardResultFuture(leaderSessionId, jobMasterServiceProcess.getResultFuture());confirmLeadership(leaderSessionId, jobMasterServiceProcess.getLeaderAddressFuture());}
@Override
public JobMasterServiceProcess create(UUID leaderSessionId) {
    // see the constructor of DefaultJobMasterServiceProcess
    return new DefaultJobMasterServiceProcess(
            jobId,
            leaderSessionId,
            jobMasterServiceFactory,
            cause -> createArchivedExecutionGraph(JobStatus.FAILED, cause));
}
public DefaultJobMasterServiceProcess(
        JobID jobId,
        UUID leaderSessionId,
        JobMasterServiceFactory jobMasterServiceFactory,
        Function<Throwable, ArchivedExecutionGraph> failedArchivedExecutionGraphFactory) {
    this.jobId = jobId;
    this.leaderSessionId = leaderSessionId;
    // create the JobMaster
    this.jobMasterServiceFuture =
            jobMasterServiceFactory.createJobMasterService(leaderSessionId, this);

    jobMasterServiceFuture.whenComplete(
            (jobMasterService, throwable) -> {
                if (throwable != null) {
                    final JobInitializationException jobInitializationException =
                            new JobInitializationException(
                                    jobId, "Could not start the JobMaster.", throwable);

                    LOG.debug(
                            "Initialization of the JobMasterService for job {} under leader id {} failed.",
                            jobId,
                            leaderSessionId,
                            jobInitializationException);

                    resultFuture.complete(
                            JobManagerRunnerResult.forInitializationFailure(
                                    new ExecutionGraphInfo(
                                            failedArchivedExecutionGraphFactory.apply(
                                                    jobInitializationException)),
                                    jobInitializationException));
                } else {
                    registerJobMasterServiceFutures(jobMasterService);
                }
            });
}
@Override
public CompletableFuture<JobMasterService> createJobMasterService(
        UUID leaderSessionId, OnCompletionActions onCompletionActions) {

    return CompletableFuture.supplyAsync(
            FunctionUtils.uncheckedSupplier(
                    // build the JobMaster -- step into internalCreateJobMasterService()
                    () -> internalCreateJobMasterService(leaderSessionId, onCompletionActions)),
            executor);
}
JobMaster initialization and startup
private JobMasterService internalCreateJobMasterService(
        UUID leaderSessionId, OnCompletionActions onCompletionActions) throws Exception {

    final JobMaster jobMaster =
            new JobMaster(
                    rpcService,
                    JobMasterId.fromUuidOrNull(leaderSessionId),
                    jobMasterConfiguration,
                    ResourceID.generate(),
                    jobGraph,
                    haServices,
                    slotPoolServiceSchedulerFactory,
                    jobManagerSharedServices,
                    heartbeatServices,
                    jobManagerJobMetricGroupFactory,
                    onCompletionActions,
                    fatalErrorHandler,
                    userCodeClassloader,
                    shuffleMaster,
                    lookup ->
                            new JobMasterPartitionTrackerImpl(
                                    jobGraph.getJobID(), shuffleMaster, lookup),
                    new DefaultExecutionDeploymentTracker(),
                    DefaultExecutionDeploymentReconciler::new,
                    initializationTimestamp);

    // JobMaster extends RpcEndpoint, so its onStart() method is called back once
    // initialization completes
    jobMaster.start();

    return jobMaster;
}
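The start()-triggers-onStart() lifecycle can be sketched as a toy endpoint (this is not Flink's RpcEndpoint, whose start message is actually dispatched asynchronously; the sketch only illustrates the hook shape):

public class LifecycleSketch {
    // Toy endpoint lifecycle: start() transitions the endpoint to running and
    // then invokes the onStart() hook that subclasses override.
    abstract static class Endpoint {
        private volatile boolean running;

        final void start() {
            running = true;
            onStart(); // subclass hook, invoked once the endpoint is live
        }

        protected abstract void onStart();
    }

    public static void main(String[] args) {
        Endpoint master = new Endpoint() {
            @Override
            protected void onStart() {
                System.out.println("connect to ResourceManager, start scheduling...");
            }
        };
        master.start();
    }
}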
Step into the JobMaster constructor:
/*
 * The JobMaster:
 * 1. registers with the ResourceManager and keeps a heartbeat connection to it
 * 2. parses the JobGraph into an ExecutionGraph (as the earlier figures show,
 *    the ExecutionGraph is the parallelized version of the JobGraph)
 * 3. requests slots from the ResourceManager (one slot runs one StreamTask)
 * 4. dispatches the tasks and monitors their status
 * 5. maintains the heartbeats between the JobMaster and the StreamTasks
 * 6. also performs the related ZooKeeper operations
 */
public JobMaster(
        RpcService rpcService,
        JobMasterId jobMasterId,
        JobMasterConfiguration jobMasterConfiguration,
        ResourceID resourceId,
        JobGraph jobGraph,
        HighAvailabilityServices highAvailabilityService,
        SlotPoolServiceSchedulerFactory slotPoolServiceSchedulerFactory,
        JobManagerSharedServices jobManagerSharedServices,
        HeartbeatServices heartbeatServices,
        JobManagerJobMetricGroupFactory jobMetricGroupFactory,
        OnCompletionActions jobCompletionActions,
        FatalErrorHandler fatalErrorHandler,
        ClassLoader userCodeLoader,
        ShuffleMaster<?> shuffleMaster,
        PartitionTrackerFactory partitionTrackerFactory,
        ExecutionDeploymentTracker executionDeploymentTracker,
        ExecutionDeploymentReconciler.Factory executionDeploymentReconcilerFactory,
        long initializationTimestamp)
        throws Exception {

    // start the RPC service
    super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME), jobMasterId);

    final ExecutionDeploymentReconciliationHandler executionStateReconciliationHandler =
            new ExecutionDeploymentReconciliationHandler() {

                @Override
                public void onMissingDeploymentsOf(
                        Collection<ExecutionAttemptID> executionAttemptIds, ResourceID host) {
                    log.debug(
                            "Failing deployments {} due to no longer being deployed.",
                            executionAttemptIds);
                    for (ExecutionAttemptID executionAttemptId : executionAttemptIds) {
                        schedulerNG.updateTaskExecutionState(
                                new TaskExecutionState(
                                        executionAttemptId,
                                        ExecutionState.FAILED,
                                        new FlinkException(
                                                String.format(
                                                        "Execution %s is unexpectedly no longer running on task executor %s.",
                                                        executionAttemptId, host))));
                    }
                }

                @Override
                public void onUnknownDeploymentsOf(
                        Collection<ExecutionAttemptID> executionAttemptIds, ResourceID host) {
                    log.debug(
                            "Canceling left-over deployments {} on task executor {}.",
                            executionAttemptIds,
                            host);
                    for (ExecutionAttemptID executionAttemptId : executionAttemptIds) {
                        Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerInfo =
                                registeredTaskManagers.get(host);
                        if (taskManagerInfo != null) {
                            taskManagerInfo.f1.cancelTask(executionAttemptId, rpcTimeout);
                        }
                    }
                }
            };

    this.executionDeploymentTracker = executionDeploymentTracker;
    this.executionDeploymentReconciler =
            executionDeploymentReconcilerFactory.create(executionStateReconciliationHandler);

    this.jobMasterConfiguration = checkNotNull(jobMasterConfiguration);
    this.resourceId = checkNotNull(resourceId);
    // keep the JobGraph on the JobMaster
    this.jobGraph = checkNotNull(jobGraph);
    this.rpcTimeout = jobMasterConfiguration.getRpcTimeout();
    this.highAvailabilityServices = checkNotNull(highAvailabilityService);
    this.blobWriter = jobManagerSharedServices.getBlobWriter();
    this.scheduledExecutorService = jobManagerSharedServices.getScheduledExecutorService();
    this.jobCompletionActions = checkNotNull(jobCompletionActions);
    this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
    this.userCodeLoader = checkNotNull(userCodeLoader);
    this.initializationTimestamp = initializationTimestamp;
    this.retrieveTaskManagerHostName =
            jobMasterConfiguration
                    .getConfiguration()
                    .getBoolean(JobManagerOptions.RETRIEVE_TASK_MANAGER_HOSTNAME);

    final String jobName = jobGraph.getName();
    final JobID jid = jobGraph.getJobID();

    log.info("Initializing job {} ({}).", jobName, jid);

    // retriever for the ResourceManager leader address
    resourceManagerLeaderRetriever =
            highAvailabilityServices.getResourceManagerLeaderRetriever();

    // create the SlotPoolService, responsible for managing this job's slots
    // (requesting, releasing, etc.)
    this.slotPoolService =
            checkNotNull(slotPoolServiceSchedulerFactory).createSlotPoolService(jid);

    this.registeredTaskManagers = new HashMap<>(4);
    this.partitionTracker =
            checkNotNull(partitionTrackerFactory)
                    .create(
                            resourceID -> {
                                Tuple2<TaskManagerLocation, TaskExecutorGateway>
                                        taskManagerInfo =
                                                registeredTaskManagers.get(resourceID);
                                if (taskManagerInfo == null) {
                                    return Optional.empty();
                                }
                                return Optional.of(taskManagerInfo.f1);
                            });

    this.shuffleMaster = checkNotNull(shuffleMaster);

    this.jobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
    this.jobStatusListener = new JobManagerJobStatusListener();

    // this is where the JobGraph is turned into the ExecutionGraph
    this.schedulerNG =
            createScheduler(
                    slotPoolServiceSchedulerFactory,
                    executionDeploymentTracker,
                    jobManagerJobMetricGroup,
                    jobStatusListener);

    this.heartbeatServices = checkNotNull(heartbeatServices);
    this.taskManagerHeartbeatManager = NoOpHeartbeatManager.getInstance();
    this.resourceManagerHeartbeatManager = NoOpHeartbeatManager.getInstance();

    this.resourceManagerConnection = null;
    this.establishedResourceManagerConnection = null;

    this.accumulators = new HashMap<>();
}
Step into createScheduler():
private SchedulerNG createScheduler(
        SlotPoolServiceSchedulerFactory slotPoolServiceSchedulerFactory,
        ExecutionDeploymentTracker executionDeploymentTracker,
        JobManagerJobMetricGroup jobManagerJobMetricGroup,
        JobStatusListener jobStatusListener)
        throws Exception {
    final SchedulerNG scheduler =
            // step into createScheduler()
            slotPoolServiceSchedulerFactory.createScheduler(
                    log,
                    jobGraph,
                    scheduledExecutorService,
                    jobMasterConfiguration.getConfiguration(),
                    slotPoolService,
                    scheduledExecutorService,
                    userCodeLoader,
                    highAvailabilityServices.getCheckpointRecoveryFactory(),
                    rpcTimeout,
                    blobWriter,
                    jobManagerJobMetricGroup,
                    jobMasterConfiguration.getSlotRequestTimeout(),
                    shuffleMaster,
                    partitionTracker,
                    executionDeploymentTracker,
                    initializationTimestamp,
                    getMainThreadExecutor(),
                    fatalErrorHandler,
                    jobStatusListener);

    return scheduler;
}
In the SlotPoolServiceSchedulerFactory interface:
SchedulerNG createScheduler(
        Logger log,
        JobGraph jobGraph,
        ScheduledExecutorService scheduledExecutorService,
        Configuration configuration,
        SlotPoolService slotPoolService,
        ScheduledExecutorService executorService,
        ClassLoader userCodeLoader,
        CheckpointRecoveryFactory checkpointRecoveryFactory,
        Time rpcTimeout,
        BlobWriter blobWriter,
        JobManagerJobMetricGroup jobManagerJobMetricGroup,
        Time slotRequestTimeout,
        ShuffleMaster<?> shuffleMaster,
        JobMasterPartitionTracker partitionTracker,
        ExecutionDeploymentTracker executionDeploymentTracker,
        long initializationTimestamp,
        ComponentMainThreadExecutor mainThreadExecutor,
        FatalErrorHandler fatalErrorHandler,
        JobStatusListener jobStatusListener)
        throws Exception;
public SchedulerNG createScheduler(
        Logger log,
        JobGraph jobGraph,
        ScheduledExecutorService scheduledExecutorService,
        Configuration configuration,
        SlotPoolService slotPoolService,
        ScheduledExecutorService executorService,
        ClassLoader userCodeLoader,
        CheckpointRecoveryFactory checkpointRecoveryFactory,
        Time rpcTimeout,
        BlobWriter blobWriter,
        JobManagerJobMetricGroup jobManagerJobMetricGroup,
        Time slotRequestTimeout,
        ShuffleMaster<?> shuffleMaster,
        JobMasterPartitionTracker partitionTracker,
        ExecutionDeploymentTracker executionDeploymentTracker,
        long initializationTimestamp,
        ComponentMainThreadExecutor mainThreadExecutor,
        FatalErrorHandler fatalErrorHandler,
        JobStatusListener jobStatusListener)
        throws Exception {
    // step into createInstance()
    return schedulerNGFactory.createInstance(
            log,
            jobGraph,
            scheduledExecutorService,
            configuration,
            slotPoolService,
            executorService,
            userCodeLoader,
            checkpointRecoveryFactory,
            rpcTimeout,
            blobWriter,
            jobManagerJobMetricGroup,
            slotRequestTimeout,
            shuffleMaster,
            partitionTracker,
            executionDeploymentTracker,
            initializationTimestamp,
            mainThreadExecutor,
            fatalErrorHandler,
            jobStatusListener);
}
Step into schedulerNGFactory.createInstance() and choose the DefaultSchedulerFactory implementation:
public SchedulerNG createInstance(
        final Logger log,
        final JobGraph jobGraph,
        final Executor ioExecutor,
        final Configuration jobMasterConfiguration,
        final SlotPoolService slotPoolService,
        final ScheduledExecutorService futureExecutor,
        final ClassLoader userCodeLoader,
        final CheckpointRecoveryFactory checkpointRecoveryFactory,
        final Time rpcTimeout,
        final BlobWriter blobWriter,
        final JobManagerJobMetricGroup jobManagerJobMetricGroup,
        final Time slotRequestTimeout,
        final ShuffleMaster<?> shuffleMaster,
        final JobMasterPartitionTracker partitionTracker,
        final ExecutionDeploymentTracker executionDeploymentTracker,
        long initializationTimestamp,
        final ComponentMainThreadExecutor mainThreadExecutor,
        final FatalErrorHandler fatalErrorHandler,
        final JobStatusListener jobStatusListener)
        throws Exception {

    final SlotPool slotPool =
            slotPoolService
                    .castInto(SlotPool.class)
                    .orElseThrow(
                            () ->
                                    new IllegalStateException(
                                            "The DefaultScheduler requires a SlotPool."));

    final DefaultSchedulerComponents schedulerComponents =
            createSchedulerComponents(
                    jobGraph.getJobType(),
                    jobGraph.isApproximateLocalRecoveryEnabled(),
                    jobMasterConfiguration,
                    slotPool,
                    slotRequestTimeout);

    final RestartBackoffTimeStrategy restartBackoffTimeStrategy =
            RestartBackoffTimeStrategyFactoryLoader.createRestartBackoffTimeStrategyFactory(
                            jobGraph.getSerializedExecutionConfig()
                                    .deserializeValue(userCodeLoader)
                                    .getRestartStrategy(),
                            jobMasterConfiguration,
                            jobGraph.isCheckpointingEnabled())
                    .create();
    log.info(
            "Using restart back off time strategy {} for {} ({}).",
            restartBackoffTimeStrategy,
            jobGraph.getName(),
            jobGraph.getJobID());

    final ExecutionGraphFactory executionGraphFactory =
            new DefaultExecutionGraphFactory(
                    jobMasterConfiguration,
                    userCodeLoader,
                    executionDeploymentTracker,
                    futureExecutor,
                    ioExecutor,
                    rpcTimeout,
                    jobManagerJobMetricGroup,
                    blobWriter,
                    shuffleMaster,
                    partitionTracker);

    // DefaultScheduler's constructor; step into its superclass constructor
    return new DefaultScheduler(
            log,
            jobGraph,
            ioExecutor,
            jobMasterConfiguration,
            schedulerComponents.getStartUpAction(),
            new ScheduledExecutorServiceAdapter(futureExecutor),
            userCodeLoader,
            checkpointRecoveryFactory,
            jobManagerJobMetricGroup,
            schedulerComponents.getSchedulingStrategyFactory(),
            FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration),
            restartBackoffTimeStrategy,
            new DefaultExecutionVertexOperations(),
            new ExecutionVertexVersioner(),
            schedulerComponents.getAllocatorFactory(),
            initializationTimestamp,
            mainThreadExecutor,
            jobStatusListener,
            executionGraphFactory);
}
DefaultScheduler's constructor delegates to its superclass SchedulerBase:
public SchedulerBase(
        final Logger log,
        final JobGraph jobGraph,
        final Executor ioExecutor,
        final Configuration jobMasterConfiguration,
        final ClassLoader userCodeLoader,
        final CheckpointRecoveryFactory checkpointRecoveryFactory,
        final JobManagerJobMetricGroup jobManagerJobMetricGroup,
        final ExecutionVertexVersioner executionVertexVersioner,
        long initializationTimestamp,
        final ComponentMainThreadExecutor mainThreadExecutor,
        final JobStatusListener jobStatusListener,
        final ExecutionGraphFactory executionGraphFactory)
        throws Exception {

    this.log = checkNotNull(log);
    this.jobGraph = checkNotNull(jobGraph);
    this.executionGraphFactory = executionGraphFactory;

    this.jobManagerJobMetricGroup = checkNotNull(jobManagerJobMetricGroup);
    this.executionVertexVersioner = checkNotNull(executionVertexVersioner);
    this.mainThreadExecutor = mainThreadExecutor;

    this.checkpointsCleaner = new CheckpointsCleaner();
    this.completedCheckpointStore =
            SchedulerUtils.createCompletedCheckpointStoreIfCheckpointingIsEnabled(
                    jobGraph,
                    jobMasterConfiguration,
                    userCodeLoader,
                    checkNotNull(checkpointRecoveryFactory),
                    log);
    this.checkpointIdCounter =
            SchedulerUtils.createCheckpointIDCounterIfCheckpointingIsEnabled(
                    jobGraph, checkNotNull(checkpointRecoveryFactory));

    // Conversion of the JobGraph into the ExecutionGraph.
    // No JobGraph parameter is needed here: it is already a member of this instance.
    this.executionGraph =
            createAndRestoreExecutionGraph(
                    completedCheckpointStore,
                    checkpointsCleaner,
                    checkpointIdCounter,
                    initializationTimestamp,
                    mainThreadExecutor,
                    jobStatusListener);

    registerShutDownCheckpointServicesOnExecutionGraphTermination(executionGraph);

    this.schedulingTopology = executionGraph.getSchedulingTopology();

    stateLocationRetriever =
            executionVertexId ->
                    getExecutionVertex(executionVertexId).getPreferredLocationBasedOnState();
    inputsLocationsRetriever =
            new ExecutionGraphToInputsLocationsRetrieverAdapter(executionGraph);

    this.kvStateHandler = new KvStateHandler(executionGraph);
    this.executionGraphHandler =
            new ExecutionGraphHandler(executionGraph, log, ioExecutor, this.mainThreadExecutor);
    this.operatorCoordinatorHandler =
            new DefaultOperatorCoordinatorHandler(executionGraph, this::handleGlobalFailure);
    operatorCoordinatorHandler.initializeOperatorCoordinators(this.mainThreadExecutor);

    this.exceptionHistory =
            new BoundedFIFOQueue<>(
                    jobMasterConfiguration.getInteger(WebOptions.MAX_EXCEPTION_HISTORY_SIZE));
}
The key call in that constructor is the one below; step into createAndRestoreExecutionGraph():

// Conversion of the JobGraph into the ExecutionGraph.
// No JobGraph parameter is needed because it is already a member of this instance.
this.executionGraph =
        createAndRestoreExecutionGraph(
                completedCheckpointStore,
                checkpointsCleaner,
                checkpointIdCounter,
                initializationTimestamp,
                mainThreadExecutor,
                jobStatusListener);
private ExecutionGraph createAndRestoreExecutionGraph(
        CompletedCheckpointStore completedCheckpointStore,
        CheckpointsCleaner checkpointsCleaner,
        CheckpointIDCounter checkpointIdCounter,
        long initializationTimestamp,
        ComponentMainThreadExecutor mainThreadExecutor,
        JobStatusListener jobStatusListener)
        throws Exception {

    // create or restore the ExecutionGraph -- step into the factory's
    // createAndRestoreExecutionGraph method
    final ExecutionGraph newExecutionGraph =
            executionGraphFactory.createAndRestoreExecutionGraph(
                    jobGraph,
                    completedCheckpointStore,
                    checkpointsCleaner,
                    checkpointIdCounter,
                    TaskDeploymentDescriptorFactory.PartitionLocationConstraint.fromJobType(
                            jobGraph.getJobType()),
                    initializationTimestamp,
                    new DefaultVertexAttemptNumberStore(),
                    computeVertexParallelismStore(jobGraph),
                    log);

    newExecutionGraph.setInternalTaskFailuresListener(
            new UpdateSchedulerNgOnInternalFailuresListener(this));
    newExecutionGraph.registerJobStatusListener(jobStatusListener);
    newExecutionGraph.start(mainThreadExecutor);

    return newExecutionGraph;
}
createAndRestoreExecutionGraph is declared in the ExecutionGraphFactory interface:
ExecutionGraph createAndRestoreExecutionGraph(
        JobGraph jobGraph,
        CompletedCheckpointStore completedCheckpointStore,
        CheckpointsCleaner checkpointsCleaner,
        CheckpointIDCounter checkpointIdCounter,
        TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint,
        long initializationTimestamp,
        VertexAttemptNumberStore vertexAttemptNumberStore,
        VertexParallelismStore vertexParallelismStore,
        Logger log)
        throws Exception;
The concrete implementation:
@Override
public ExecutionGraph createAndRestoreExecutionGraph(
        JobGraph jobGraph,
        CompletedCheckpointStore completedCheckpointStore,
        CheckpointsCleaner checkpointsCleaner,
        CheckpointIDCounter checkpointIdCounter,
        TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint,
        long initializationTimestamp,
        VertexAttemptNumberStore vertexAttemptNumberStore,
        VertexParallelismStore vertexParallelismStore,
        Logger log)
        throws Exception {

    ExecutionDeploymentListener executionDeploymentListener =
            new ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker);
    ExecutionStateUpdateListener executionStateUpdateListener =
            (execution, newState) -> {
                if (newState.isTerminal()) {
                    executionDeploymentTracker.stopTrackingDeploymentOf(execution);
                }
            };

    // convert the JobGraph into the ExecutionGraph -- step into buildGraph()
    final ExecutionGraph newExecutionGraph =
            DefaultExecutionGraphBuilder.buildGraph(
                    jobGraph,
                    configuration,
                    futureExecutor,
                    ioExecutor,
                    userCodeClassLoader,
                    completedCheckpointStore,
                    checkpointsCleaner,
                    checkpointIdCounter,
                    rpcTimeout,
                    jobManagerJobMetricGroup,
                    blobWriter,
                    log,
                    shuffleMaster,
                    jobMasterPartitionTracker,
                    partitionLocationConstraint,
                    executionDeploymentListener,
                    executionStateUpdateListener,
                    initializationTimestamp,
                    vertexAttemptNumberStore,
                    vertexParallelismStore);

    // restore the ExecutionGraph
    final CheckpointCoordinator checkpointCoordinator =
            newExecutionGraph.getCheckpointCoordinator();

    if (checkpointCoordinator != null) {
        // check whether we find a valid checkpoint
        if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
                new HashSet<>(newExecutionGraph.getAllVertices().values()))) {

            // check whether we can restore from a savepoint
            tryRestoreExecutionGraphFromSavepoint(
                    newExecutionGraph, jobGraph.getSavepointRestoreSettings());
        }
    }

    return newExecutionGraph;
}
Step into DefaultExecutionGraphBuilder.buildGraph().
Initializing the empty ExecutionGraph shell
public static DefaultExecutionGraph buildGraph(
        JobGraph jobGraph,
        Configuration jobManagerConfig,
        ScheduledExecutorService futureExecutor,
        Executor ioExecutor,
        ClassLoader classLoader,
        CompletedCheckpointStore completedCheckpointStore,
        CheckpointsCleaner checkpointsCleaner,
        CheckpointIDCounter checkpointIdCounter,
        Time rpcTimeout,
        MetricGroup metrics,
        BlobWriter blobWriter,
        Logger log,
        ShuffleMaster<?> shuffleMaster,
        JobMasterPartitionTracker partitionTracker,
        TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint,
        ExecutionDeploymentListener executionDeploymentListener,
        ExecutionStateUpdateListener executionStateUpdateListener,
        long initializationTimestamp,
        VertexAttemptNumberStore vertexAttemptNumberStore,
        VertexParallelismStore vertexParallelismStore)
        throws JobExecutionException, JobException {

    checkNotNull(jobGraph, "job graph cannot be null");

    final String jobName = jobGraph.getName();
    final JobID jobId = jobGraph.getJobID();

    final JobInformation jobInformation =
            new JobInformation(
                    jobId,
                    jobName,
                    jobGraph.getSerializedExecutionConfig(),
                    jobGraph.getJobConfiguration(),
                    jobGraph.getUserJarBlobKeys(),
                    jobGraph.getClasspaths());

    final int maxPriorAttemptsHistoryLength =
            jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);

    final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory =
            PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(
                    jobManagerConfig);

    // create a new execution graph, if none exists so far
    final DefaultExecutionGraph executionGraph;
    try {
        // initialize the empty ExecutionGraph shell
        executionGraph =
                new DefaultExecutionGraph(
                        jobInformation,
                        futureExecutor,
                        ioExecutor,
                        rpcTimeout,
                        maxPriorAttemptsHistoryLength,
                        classLoader,
                        blobWriter,
                        partitionReleaseStrategyFactory,
                        shuffleMaster,
                        partitionTracker,
                        partitionLocationConstraint,
                        executionDeploymentListener,
                        executionStateUpdateListener,
                        initializationTimestamp,
                        vertexAttemptNumberStore,
                        vertexParallelismStore);
    } catch (IOException e) {
        throw new JobException("Could not create the ExecutionGraph.", e);
    }

    // set the basic properties
    try {
        // render the JobGraph as JSON
        executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
    } catch (Throwable t) {
        log.warn("Cannot create JSON plan for job", t);
        // give the graph an empty plan
        executionGraph.setJsonPlan("{}");
    }

    // initialize the vertices that have a master initialization hook
    // file output formats create directories here, input formats create splits

    final long initMasterStart = System.nanoTime();
    log.info("Running initialization on master for job {} ({}).", jobName, jobId);

    // walk all vertices of the JobGraph and check each has an invokable class
    for (JobVertex vertex : jobGraph.getVertices()) {
        String executableClass = vertex.getInvokableClassName();
        if (executableClass == null || executableClass.isEmpty()) {
            throw new JobSubmissionException(
                    jobId,
                    "The vertex " + vertex.getID() + " (" + vertex.getName()
                            + ") has no invokable class.");
        }

        try {
            vertex.initializeOnMaster(classLoader);
        } catch (Throwable t) {
            throw new JobExecutionException(
                    jobId,
                    "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(),
                    t);
        }
    }

    log.info(
            "Successfully ran initialization on master in {} ms.",
            (System.nanoTime() - initMasterStart) / 1_000_000);

    // topologically sort the job vertices and attach the graph to the existing one
    List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
    if (log.isDebugEnabled()) {
        log.debug(
                "Adding {} vertices from job graph {} ({}).",
                sortedTopology.size(),
                jobName,
                jobId);
    }
    // the most important step: generate the ExecutionJobVertexes and parallelize,
    // i.e. create multiple ExecutionVertexes per vertex according to the parallelism
    executionGraph.attachJobGraph(sortedTopology);

    if (log.isDebugEnabled()) {
        log.debug("Successfully created execution graph from job graph {} ({}).", jobName, jobId);
    }

    // configure the state checkpointing:
    // parse the checkpoint settings and build the checkpoint-related components
    if (isCheckpointingEnabled(jobGraph)) {
        JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();

        // Maximum number of remembered checkpoints
        int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);

        CheckpointStatsTracker checkpointStatsTracker =
                new CheckpointStatsTracker(
                        historySize,
                        snapshotSettings.getCheckpointCoordinatorConfiguration(),
                        metrics);

        // load the state backend from the application settings
        final StateBackend applicationConfiguredBackend;
        final SerializedValue<StateBackend> serializedAppConfigured =
                snapshotSettings.getDefaultStateBackend();

        if (serializedAppConfigured == null) {
            applicationConfiguredBackend = null;
        } else {
            try {
                applicationConfiguredBackend =
                        serializedAppConfigured.deserializeValue(classLoader);
            } catch (IOException | ClassNotFoundException e) {
                throw new JobExecutionException(
                        jobId, "Could not deserialize application-defined state backend.", e);
            }
        }

        final StateBackend rootBackend;
        try {
            rootBackend =
                    StateBackendLoader.fromApplicationOrConfigOrDefault(
                            applicationConfiguredBackend, jobManagerConfig, classLoader, log);
        } catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
            throw new JobExecutionException(
                    jobId, "Could not instantiate configured state backend", e);
        }

        // load the checkpoint storage from the application settings
        final CheckpointStorage applicationConfiguredStorage;
        final SerializedValue<CheckpointStorage> serializedAppConfiguredStorage =
                snapshotSettings.getDefaultCheckpointStorage();

        if (serializedAppConfiguredStorage == null) {
            applicationConfiguredStorage = null;
        } else {
            try {
                applicationConfiguredStorage =
                        serializedAppConfiguredStorage.deserializeValue(classLoader);
            } catch (IOException | ClassNotFoundException e) {
                throw new JobExecutionException(
                        jobId,
                        "Could not deserialize application-defined checkpoint storage.",
                        e);
            }
        }

        final CheckpointStorage rootStorage;
        try {
            rootStorage =
                    CheckpointStorageLoader.load(
                            applicationConfiguredStorage,
                            null,
                            rootBackend,
                            jobManagerConfig,
                            classLoader,
                            log);
        } catch (IllegalConfigurationException | DynamicCodeLoadingException e) {
            throw new JobExecutionException(
                    jobId, "Could not instantiate configured checkpoint storage", e);
        }

        // instantiate the user-defined checkpoint hooks
        final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks =
                snapshotSettings.getMasterHooks();
        final List<MasterTriggerRestoreHook<?>> hooks;

        if (serializedHooks == null) {
            hooks = Collections.emptyList();
        } else {
            final MasterTriggerRestoreHook.Factory[] hookFactories;
            try {
                hookFactories = serializedHooks.deserializeValue(classLoader);
            } catch (IOException | ClassNotFoundException e) {
                throw new JobExecutionException(
                        jobId, "Could not instantiate user-defined checkpoint hooks", e);
            }

            final Thread thread = Thread.currentThread();
            final ClassLoader originalClassLoader = thread.getContextClassLoader();
            thread.setContextClassLoader(classLoader);

            try {
                hooks = new ArrayList<>(hookFactories.length);
                for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
                    hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));
                }
            } finally {
                thread.setContextClassLoader(originalClassLoader);
            }
        }

        final CheckpointCoordinatorConfiguration chkConfig =
                snapshotSettings.getCheckpointCoordinatorConfiguration();

        executionGraph.enableCheckpointing(
                chkConfig,
                hooks,
                checkpointIdCounter,
                completedCheckpointStore,
                rootBackend,
                rootStorage,
                checkpointStatsTracker,
                checkpointsCleaner);
    }

    // create all the metrics for the Execution Graph
    metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(executionGraph));
    metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(executionGraph));
    metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(executionGraph));

    return executionGraph;
}
Summary of ExecutionGraph initialization:
1. Read the JobGraph to get the job's basic information
2. Initialize an empty ExecutionGraph object
3. Render the JobGraph as JSON
4. Check that every vertex in the JobGraph has an invokable class
5. Add the JobGraph's vertices to a list in topological order (see the sketch after this list)
6. Generate the ExecutionVertexes from the JobVertexes and their operator parallelism
7. Parse the checkpoint settings and build the checkpoint-related components
8. Build the monitoring metrics for the ExecutionGraph
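Step 5 relies on a topological order over the vertices, sources first. A rough sketch of such an ordering with Kahn's algorithm (a generic graph, not Flink's implementation of getVerticesSortedTopologicallyFromSources):

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

public class TopoSortSketch {
    // vertices 0..n-1; edges.get(i) = downstream vertices of i
    static List<Integer> sortedFromSources(int n, Map<Integer, List<Integer>> edges) {
        int[] inDegree = new int[n];
        edges.values().forEach(ds -> ds.forEach(d -> inDegree[d]++));

        Deque<Integer> sources = new ArrayDeque<>();
        for (int v = 0; v < n; v++) {
            if (inDegree[v] == 0) sources.add(v); // start from the source vertices
        }

        List<Integer> order = new ArrayList<>(n);
        while (!sources.isEmpty()) {
            int v = sources.poll();
            order.add(v);
            for (int d : edges.getOrDefault(v, List.of())) {
                if (--inDegree[d] == 0) sources.add(d); // all inputs placed first
            }
        }
        return order; // every vertex appears after all of its upstream vertices
    }

    public static void main(String[] args) {
        // source(0) -> map(1) -> sink(2)
        System.out.println(sortedFromSources(3, Map.of(0, List.of(1), 1, List.of(2))));
    }
}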
The most essential work is converting the JobGraph's JobVertexes into ExecutionVertexes; step into the following call:
// the most important step: generate the ExecutionJobVertexes and parallelize,
// creating multiple ExecutionVertexes per vertex according to the parallelism
executionGraph.attachJobGraph(sortedTopology);
Step into attachJobGraph():
@Override
public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {

    assertRunningInJobMasterMainThread();

    LOG.debug(
            "Attaching {} topologically sorted vertices to existing job graph with {} "
                    + "vertices and {} intermediate results.",
            topologiallySorted.size(),
            tasks.size(),
            intermediateResults.size());

    final ArrayList<ExecutionJobVertex> newExecJobVertices =
            new ArrayList<>(topologiallySorted.size());
    final long createTimestamp = System.currentTimeMillis();

    // iterate over the JobGraph's vertices
    for (JobVertex jobVertex : topologiallySorted) {

        if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
            this.isStoppable = false;
        }

        VertexParallelismInformation parallelismInfo =
                parallelismStore.getParallelismInfo(jobVertex.getID());

        // create the execution job vertex and attach it to the graph:
        // one ExecutionJobVertex per JobVertex -- see the ExecutionJobVertex
        // constructor for the build process
        ExecutionJobVertex ejv =
                new ExecutionJobVertex(
                        this,
                        jobVertex,
                        maxPriorAttemptsHistoryLength,
                        rpcTimeout,
                        createTimestamp,
                        parallelismInfo,
                        initialAttemptCounts.getAttemptCounts(jobVertex.getID()));

        // in newer Flink versions (>= 1.13) ExecutionEdge is gone, replaced by
        // ConsumedPartitionGroup and ConsumerVertexGroup
        ejv.connectToPredecessors(this.intermediateResults);

        ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
        if (previousTask != null) {
            throw new JobException(
                    String.format(
                            "Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
                            jobVertex.getID(), ejv, previousTask));
        }

        for (IntermediateResult res : ejv.getProducedDataSets()) {
            IntermediateResult previousDataSet =
                    this.intermediateResults.putIfAbsent(res.getId(), res);
            if (previousDataSet != null) {
                throw new JobException(
                        String.format(
                                "Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
                                res.getId(), res, previousDataSet));
            }
        }

        this.verticesInCreationOrder.add(ejv);
        this.numVerticesTotal += ejv.getParallelism();
        newExecJobVertices.add(ejv);
    }

    registerExecutionVerticesAndResultPartitions(this.verticesInCreationOrder);

    // the topology assigning should happen before notifying new vertices to failoverStrategy
    executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);

    partitionReleaseStrategy =
            partitionReleaseStrategyFactory.createInstance(getSchedulingTopology());
}
1. Iterate over all JobVertexes in the JobGraph
2. Create one ExecutionJobVertex for each JobVertex, expanding it by its parallelism (a toy model follows this list)
3. Build the ConsumedPartitionGroup and ConsumerVertexGroup for each JobVertex
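A toy model of step 2, expanding one JobVertex into parallelism-many subtasks (hypothetical stub classes mirroring the taskVertices array and the "name (subtask/parallelism)" naming built in the constructors below):

import java.util.Arrays;

public class ParallelismExpansion {
    record JobVertexStub(String name, int parallelism) {}
    record ExecutionVertexStub(String name, int subTaskIndex, int parallelism) {
        @Override
        public String toString() {
            // Flink-style subtask naming: "name (subtask/parallelism)"
            return String.format("%s (%d/%d)", name, subTaskIndex + 1, parallelism);
        }
    }

    static ExecutionVertexStub[] expand(JobVertexStub jv) {
        // one ExecutionVertex per degree of parallelism
        ExecutionVertexStub[] taskVertices = new ExecutionVertexStub[jv.parallelism()];
        for (int i = 0; i < jv.parallelism(); i++) {
            taskVertices[i] = new ExecutionVertexStub(jv.name(), i, jv.parallelism());
        }
        return taskVertices;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(expand(new JobVertexStub("Flat Map", 3))));
        // -> [Flat Map (1/3), Flat Map (2/3), Flat Map (3/3)]
    }
}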
Building the ExecutionVertex
public ExecutionJobVertex(
        InternalExecutionGraphAccessor graph,
        JobVertex jobVertex,
        int maxPriorAttemptsHistoryLength,
        Time timeout,
        long createTimestamp,
        VertexParallelismInformation parallelismInfo,
        SubtaskAttemptNumberStore initialAttemptCounts)
        throws JobException {

    if (graph == null || jobVertex == null) {
        throw new NullPointerException();
    }

    this.graph = graph;
    this.jobVertex = jobVertex;

    // the operator's parallelism information
    this.parallelismInfo = parallelismInfo;

    // verify that our parallelism is not higher than the maximum parallelism
    if (this.parallelismInfo.getParallelism() > this.parallelismInfo.getMaxParallelism()) {
        throw new JobException(
                String.format(
                        "Vertex %s's parallelism (%s) is higher than the max parallelism (%s). Please lower the parallelism or increase the max parallelism.",
                        jobVertex.getName(),
                        this.parallelismInfo.getParallelism(),
                        this.parallelismInfo.getMaxParallelism()));
    }

    this.resourceProfile =
            ResourceProfile.fromResourceSpec(jobVertex.getMinResources(), MemorySize.ZERO);

    this.taskVertices = new ExecutionVertex[this.parallelismInfo.getParallelism()];

    this.inputs = new ArrayList<>(jobVertex.getInputs().size());

    // take the sharing group
    this.slotSharingGroup = checkNotNull(jobVertex.getSlotSharingGroup());
    this.coLocationGroup = jobVertex.getCoLocationGroup();

    // create the intermediate results:
    // size the empty IntermediateResult array by the number of
    // IntermediateDataSets produced by the JobVertex
    this.producedDataSets =
            new IntermediateResult[jobVertex.getNumberOfProducedIntermediateDataSets()];

    // build one IntermediateResult per IntermediateDataSet
    for (int i = 0; i < jobVertex.getProducedDataSets().size(); i++) {
        final IntermediateDataSet result = jobVertex.getProducedDataSets().get(i);

        this.producedDataSets[i] =
                new IntermediateResult(
                        result.getId(),
                        this,
                        this.parallelismInfo.getParallelism(),
                        result.getResultType());
    }

    // create all task vertices:
    // one ExecutionVertex per degree of parallelism
    for (int i = 0; i < this.parallelismInfo.getParallelism(); i++) {
        ExecutionVertex vertex =
                new ExecutionVertex(
                        this,
                        i,
                        producedDataSets,
                        timeout,
                        createTimestamp,
                        maxPriorAttemptsHistoryLength,
                        initialAttemptCounts.getAttemptCount(i));

        this.taskVertices[i] = vertex;
    }

    // sanity check for the double referencing between intermediate result partitions and
    // execution vertices
    for (IntermediateResult ir : this.producedDataSets) {
        if (ir.getNumberOfAssignedPartitions() != this.parallelismInfo.getParallelism()) {
            throw new RuntimeException(
                    "The intermediate result's partitions were not correctly assigned.");
        }
    }

    final List<SerializedValue<OperatorCoordinator.Provider>> coordinatorProviders =
            getJobVertex().getOperatorCoordinators();
    if (coordinatorProviders.isEmpty()) {
        this.operatorCoordinators = Collections.emptyList();
    } else {
        final ArrayList<OperatorCoordinatorHolder> coordinators =
                new ArrayList<>(coordinatorProviders.size());
        try {
            for (final SerializedValue<OperatorCoordinator.Provider> provider :
                    coordinatorProviders) {
                coordinators.add(
                        OperatorCoordinatorHolder.create(
                                provider, this, graph.getUserClassLoader()));
            }
        } catch (Exception | LinkageError e) {
            IOUtils.closeAllQuietly(coordinators);
            throw new JobException(
                    "Cannot instantiate the coordinator for operator " + getName(), e);
        }
        this.operatorCoordinators = Collections.unmodifiableList(coordinators);
    }

    // set up the input splits, if the vertex has any
    try {
        @SuppressWarnings("unchecked")
        InputSplitSource<InputSplit> splitSource =
                (InputSplitSource<InputSplit>) jobVertex.getInputSplitSource();

        if (splitSource != null) {
            Thread currentThread = Thread.currentThread();
            ClassLoader oldContextClassLoader = currentThread.getContextClassLoader();
            currentThread.setContextClassLoader(graph.getUserClassLoader());
            try {
                inputSplits =
                        splitSource.createInputSplits(this.parallelismInfo.getParallelism());

                if (inputSplits != null) {
                    splitAssigner = splitSource.getInputSplitAssigner(inputSplits);
                }
            } finally {
                currentThread.setContextClassLoader(oldContextClassLoader);
            }
        } else {
            inputSplits = null;
        }
    } catch (Throwable t) {
        throw new JobException(
                "Creating the input splits caused an error: " + t.getMessage(), t);
    }
}
1. Get the parallelism information
2. Initialize the IntermediateResult array from the number of IntermediateDataSets of the JobVertex
3. Build one IntermediateResult for each IntermediateDataSet of the JobGraph
4. Create as many ExecutionVertexes as the parallelism
5. Split the input according to the number of ExecutionVertexes
Look at the per-parallelism creation of the ExecutionVertexes:
for (int i = 0; i < this.parallelismInfo.getParallelism(); i++) {
    ExecutionVertex vertex =
            new ExecutionVertex(
                    this,
                    i,
                    producedDataSets,
                    timeout,
                    createTimestamp,
                    maxPriorAttemptsHistoryLength,
                    initialAttemptCounts.getAttemptCount(i));

    this.taskVertices[i] = vertex;
}
Look at the ExecutionVertex constructor:
public ExecutionVertex(
        ExecutionJobVertex jobVertex,
        int subTaskIndex,
        IntermediateResult[] producedDataSets,
        Time timeout,
        long createTimestamp,
        int maxPriorExecutionHistoryLength,
        int initialAttemptCount) {

    this.jobVertex = jobVertex;
    this.subTaskIndex = subTaskIndex;
    this.executionVertexId = new ExecutionVertexID(jobVertex.getJobVertexId(), subTaskIndex);
    this.taskNameWithSubtask =
            String.format(
                    "%s (%d/%d)",
                    jobVertex.getJobVertex().getName(),
                    subTaskIndex + 1,
                    jobVertex.getParallelism());

    this.resultPartitions = new LinkedHashMap<>(producedDataSets.length, 1);

    for (IntermediateResult result : producedDataSets) {
        IntermediateResultPartition irp =
                new IntermediateResultPartition(
                        result,
                        this,
                        subTaskIndex,
                        getExecutionGraphAccessor().getEdgeManager());
        result.setPartition(subTaskIndex, irp);

        resultPartitions.put(irp.getPartitionId(), irp);
    }

    this.priorExecutions = new EvictingBoundedList<>(maxPriorExecutionHistoryLength);

    /*
     * ExecutionVertex and Execution:
     * The JobMaster wraps everything needed to deploy a Task into one object and
     * sends it to the TaskExecutor; the TaskExecutor wraps what it receives into
     * a Task object.
     *
     * Deployment:
     * Once the JobMaster has obtained a slot on the right node, it wraps the
     * information needed to deploy the Task into an Execution and calls
     * Execution.deploy(), which internally issues an RPC request carrying that
     * information to the TaskExecutor. The TaskExecutor wraps the received
     * information into a Task object and starts it -- that completes the deployment.
     */
    this.currentExecution =
            new Execution(
                    getExecutionGraphAccessor().getFutureExecutor(),
                    this,
                    initialAttemptCount,
                    createTimestamp,
                    timeout);

    getExecutionGraphAccessor().registerExecution(currentExecution);

    this.timeout = timeout;
    this.inputSplits = new ArrayList<>();
}
The ExecutionVertex constructor mainly does two things:
1. Partitions the IntermediateResult by parallelism, producing the IntermediateResultPartitions
2. Wraps the information a Task needs into an Execution
Once the JobMaster has obtained a slot on the appropriate node, it wraps all the information needed to deploy the Task into an Execution and calls its deploy() method. Internally, deploy() issues an RPC request that ships this information to the TaskExecutor, which wraps it into a Task object and starts it; that completes the deployment.
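A schematic sketch of that deploy-over-RPC handshake (all types here are simplified stand-ins for Flink's Execution, TaskExecutorGateway, and Task):

import java.util.concurrent.CompletableFuture;

public class DeploySketch {
    // Stand-in for the deployment descriptor the JobMaster ships over RPC.
    record TaskDescriptor(String taskName, int subtask) {}

    // Stand-in for the TaskExecutor's RPC gateway.
    interface TaskExecutorGateway {
        CompletableFuture<Void> submitTask(TaskDescriptor descriptor);
    }

    // Stand-in for Execution: holds what is needed to deploy one subtask.
    record Execution(TaskDescriptor descriptor) {
        CompletableFuture<Void> deploy(TaskExecutorGateway gateway) {
            // the real deploy() sends an RPC; here the gateway stub plays that role
            return gateway.submitTask(descriptor);
        }
    }

    public static void main(String[] args) {
        // "TaskExecutor side": wrap the descriptor into a task and start it
        TaskExecutorGateway gateway =
                descriptor ->
                        CompletableFuture.runAsync(
                                () -> System.out.println("started task " + descriptor));

        new Execution(new TaskDescriptor("Flat Map", 0)).deploy(gateway).join();
    }
}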
Building the ConsumedPartitionGroup and ConsumerVertexGroup
Their initialization happens inside executionGraph.attachJobGraph(); step into the ejv.connectToPredecessors(this.intermediateResults) call.
public void connectToPredecessors(
        Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets)
        throws JobException {

    // the JobVertex's inputs
    List<JobEdge> inputs = jobVertex.getInputs();

    if (LOG.isDebugEnabled()) {
        LOG.debug(
                String.format(
                        "Connecting ExecutionJobVertex %s (%s) to %d predecessors.",
                        jobVertex.getID(), jobVertex.getName(), inputs.size()));
    }

    for (int num = 0; num < inputs.size(); num++) {
        JobEdge edge = inputs.get(num);

        if (LOG.isDebugEnabled()) {
            if (edge.getSource() == null) {
                LOG.debug(
                        String.format(
                                "Connecting input %d of vertex %s (%s) to intermediate result referenced via ID %s.",
                                num,
                                jobVertex.getID(),
                                jobVertex.getName(),
                                edge.getSourceId()));
            } else {
                LOG.debug(
                        String.format(
                                "Connecting input %d of vertex %s (%s) to intermediate result referenced via predecessor %s (%s).",
                                num,
                                jobVertex.getID(),
                                jobVertex.getName(),
                                edge.getSource().getProducer().getID(),
                                edge.getSource().getProducer().getName()));
            }
        }

        // fetch the intermediate result via ID. if it does not exist, then it either has not
        // been created, or the order
        // in which this method is called for the job vertices is not a topological order
        IntermediateResult ires = intermediateDataSets.get(edge.getSourceId());
        if (ires == null) {
            throw new JobException(
                    "Cannot connect this job graph to the previous graph. No previous intermediate result found for ID "
                            + edge.getSourceId());
        }

        this.inputs.add(ires);

        // build the ConsumedPartitionGroup and ConsumerVertexGroup
        EdgeManagerBuildUtil.connectVertexToResult(this, ires, edge.getDistributionPattern());
    }
}
1. Get the JobVertex's inputs
2. From them, obtain the JobVertex's input edges (JobEdge)
3. For every input edge, look up the corresponding IntermediateResult in intermediateDataSets
4. From each IntermediateResult, build the ConsumedPartitionGroup and ConsumerVertexGroup
Step into EdgeManagerBuildUtil.connectVertexToResult():
static void connectVertexToResult(
        ExecutionJobVertex vertex,
        IntermediateResult intermediateResult,
        DistributionPattern distributionPattern) {

    switch (distributionPattern) {
        case POINTWISE:
            connectPointwise(vertex.getTaskVertices(), intermediateResult);
            break;
        case ALL_TO_ALL:
            connectAllToAll(vertex.getTaskVertices(), intermediateResult);
            break;
        default:
            throw new IllegalArgumentException("Unrecognized distribution pattern.");
    }
}
The switch handles the two distribution patterns, POINTWISE and ALL_TO_ALL; take all-to-all as the example:
private static void connectAllToAll(
        ExecutionVertex[] taskVertices, IntermediateResult intermediateResult) {

    // iterate over the IntermediateResultPartitions to build one ConsumedPartitionGroup
    ConsumedPartitionGroup consumedPartitions =
            ConsumedPartitionGroup.fromMultiplePartitions(
                    Arrays.stream(intermediateResult.getPartitions())
                            .map(IntermediateResultPartition::getPartitionId)
                            .collect(Collectors.toList()));
    // connect the ConsumedPartitionGroup to every ExecutionVertex
    for (ExecutionVertex ev : taskVertices) {
        ev.addConsumedPartitionGroup(consumedPartitions);
    }

    // build the downstream ExecutionVertexes into one ConsumerVertexGroup
    ConsumerVertexGroup vertices =
            ConsumerVertexGroup.fromMultipleVertices(
                    Arrays.stream(taskVertices)
                            .map(ExecutionVertex::getID)
                            .collect(Collectors.toList()));
    // link every IntermediateResultPartition to the downstream ConsumerVertexGroup
    for (IntermediateResultPartition partition : intermediateResult.getPartitions()) {
        partition.addConsumers(vertices);
    }
}
To summarize:
1. Iterate over all IntermediateResultPartitions of the IntermediateResult and build them into one ConsumedPartitionGroup object
2. Connect all downstream ExecutionVertexes of this IntermediateResult to the ConsumedPartitionGroup built from it
3. Put all downstream ExecutionVertexes of this IntermediateResult into one ConsumerVertexGroup object
4. Connect all IntermediateResultPartitions of this IntermediateResult to that ConsumerVertexGroup (the arithmetic sketch below shows the payoff)
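The payoff is easy to quantify: with producer and consumer parallelism n, the pre-1.13 all-to-all wiring stored n × n ExecutionEdge objects, while the grouped form stores one group per side plus n references to each. A tiny sketch of the arithmetic:

public class EdgeCountSketch {
    public static void main(String[] args) {
        int n = 10_000; // parallelism of both producer and consumer

        // old model: one ExecutionEdge per (partition, consumer) pair
        long executionEdges = (long) n * n;    // 100,000,000 objects

        // new model: 1 ConsumedPartitionGroup + 1 ConsumerVertexGroup,
        // referenced n times each
        long groupObjects = 2;
        long groupReferences = 2L * n;         // 20,000 references

        System.out.printf(
                "edges: %,d -> groups: %,d (+%,d references)%n",
                executionEdges, groupObjects, groupReferences);
    }
}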