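Preface
An earlier article on JobGraph construction analyzed how the JobGraph is built; this article analyzes how the ExecutionGraph is built. The JobManager (JobMaster) generates the ExecutionGraph from the JobGraph. The ExecutionGraph is the parallelized version of the JobGraph and the core data structure of the scheduling layer.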
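Main abstractions in the ExecutionGraph
1. ExecutionJobVertex: corresponds one-to-one to a JobVertex in the JobGraph. Each ExecutionJobVertex has as many ExecutionVertex instances as its parallelism.
2. ExecutionVertex: represents one parallel subtask of an ExecutionJobVertex. Its inputs are ExecutionEdge instances and its outputs are IntermediateResultPartition instances.
3. IntermediateResult: corresponds one-to-one to an IntermediateDataSet in the JobGraph. An IntermediateResult contains multiple IntermediateResultPartition instances; their number equals the parallelism of the producing operator.
4. IntermediateResultPartition: represents one output partition of an ExecutionVertex. Its producer is an ExecutionVertex and its consumers are a number of ExecutionEdge instances.
5. ExecutionEdge: represents one input of an ExecutionVertex. Its source is an IntermediateResultPartition and its target is an ExecutionVertex; each edge has exactly one source and exactly one target.
6. Execution: one attempt to execute an ExecutionVertex. When a failure occurs or data has to be recomputed, an ExecutionVertex may have several Executions, each uniquely identified by its ExecutionAttemptID. Task deployment and task status updates between the JM and the TM use the ExecutionAttemptID to determine the message recipient.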
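To make the relationship between these abstractions more concrete, here is a minimal sketch of how one logical vertex could be expanded into parallel subtasks, each producing one partition per result and tracking its own execution attempts. All class names, the `expand` method, and the attempt ID format are hypothetical stand-ins chosen for illustration; they are not Flink's real classes or ID scheme.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy model of the concepts above. These are NOT Flink's real classes. */
final class ExecutionGraphModelSketch {

    /** One output partition; it is produced by exactly one subtask. */
    static final class Partition {
        final String resultId;
        final int partitionIndex;

        Partition(String resultId, int partitionIndex) {
            this.resultId = resultId;
            this.partitionIndex = partitionIndex;
        }
    }

    /** One parallel subtask; every (re)deployment of it is a separate attempt. */
    static final class Subtask {
        final String vertexName;
        final int subtaskIndex;
        final List<Partition> produced = new ArrayList<>();
        final List<String> attemptIds = new ArrayList<>();

        Subtask(String vertexName, int subtaskIndex) {
            this.vertexName = vertexName;
            this.subtaskIndex = subtaskIndex;
        }

        /** Registers a new attempt. The ID format is made up for this sketch. */
        String startNewAttempt() {
            String id = vertexName + "-" + subtaskIndex + "-attempt-" + attemptIds.size();
            attemptIds.add(id);
            return id;
        }
    }

    /** Expands one logical vertex into `parallelism` subtasks. */
    static List<Subtask> expand(String vertexName, int parallelism, List<String> resultIds) {
        List<Subtask> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            Subtask subtask = new Subtask(vertexName, i);
            for (String resultId : resultIds) {
                // Subtask i produces partition i of every result of this vertex.
                subtask.produced.add(new Partition(resultId, i));
            }
            subtasks.add(subtask);
        }
        return subtasks;
    }

    public static void main(String[] args) {
        List<Subtask> subtasks = expand("map", 3, Collections.singletonList("result-0"));
        Subtask first = subtasks.get(0);
        first.startNewAttempt(); // initial deployment
        first.startNewAttempt(); // redeployment after a failure
        System.out.println(
                subtasks.size() + " subtasks; attempts of subtask 0: " + first.attemptIds);
    }
}
```

The sketch keeps the number of partitions per result equal to the parallelism of the producing vertex, which mirrors item 3 in the list.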
Core source-code entry point
ExecutionGraph executionGraph = SchedulerBase.createAndRestoreExecutionGraph(
        completedCheckpointStore,
        checkpointsCleaner,
        checkpointIdCounter,
        initializationTimestamp,
        mainThreadExecutor,
        jobStatusListener,
        vertexParallelismStore);
The SchedulerBase class has two member fields: one is the JobGraph and the other is the ExecutionGraph. When an instance of DefaultScheduler, a subclass of SchedulerBase, is created, the ExecutionGraph is generated inside the SchedulerBase constructor.
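The pattern described here, a base class that holds both graphs and derives the second one in its constructor, can be sketched roughly as follows. `JobGraphStub`, `ExecutionGraphStub`, `GraphFactory`, and the two scheduler classes are hypothetical simplifications introduced only for this illustration; the real Flink constructors take many more arguments.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch of a scheduler base class that derives its execution graph in the constructor. */
final class SchedulerSketch {

    /** Stand-in for JobGraph: only the parallelism of each logical vertex. */
    static final class JobGraphStub {
        final List<Integer> vertexParallelisms;

        JobGraphStub(List<Integer> vertexParallelisms) {
            this.vertexParallelisms = vertexParallelisms;
        }
    }

    /** Stand-in for ExecutionGraph: only the total number of parallel subtasks. */
    static final class ExecutionGraphStub {
        final int totalSubtasks;

        ExecutionGraphStub(int totalSubtasks) {
            this.totalSubtasks = totalSubtasks;
        }
    }

    /** Hypothetical factory hook, so that subclasses do not build the graph themselves. */
    interface GraphFactory {
        ExecutionGraphStub createAndRestore(JobGraphStub jobGraph);
    }

    abstract static class SchedulerBaseSketch {
        final JobGraphStub jobGraph;
        final ExecutionGraphStub executionGraph;

        SchedulerBaseSketch(JobGraphStub jobGraph, GraphFactory factory) {
            this.jobGraph = jobGraph;
            // The graph is derived here, so it exists before any subclass logic runs.
            this.executionGraph = factory.createAndRestore(jobGraph);
        }
    }

    static final class DefaultSchedulerSketch extends SchedulerBaseSketch {
        DefaultSchedulerSketch(JobGraphStub jobGraph, GraphFactory factory) {
            super(jobGraph, factory);
        }
    }

    /** Toy factory: the "execution graph" is just the sum of all parallelisms. */
    static ExecutionGraphStub buildGraph(JobGraphStub jobGraph) {
        int total = 0;
        for (int parallelism : jobGraph.vertexParallelisms) {
            total += parallelism;
        }
        return new ExecutionGraphStub(total);
    }

    public static void main(String[] args) {
        JobGraphStub jobGraph = new JobGraphStub(Arrays.asList(2, 3));
        DefaultSchedulerSketch scheduler =
                new DefaultSchedulerSketch(jobGraph, SchedulerSketch::buildGraph);
        System.out.println("total subtasks: " + scheduler.executionGraph.totalSubtasks);
    }
}
```

Building the graph in the base constructor means every scheduler subclass can rely on both fields being set before its own initialization code runs.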
Core source-code flow (indentation indicates call nesting):
DefaultExecutionGraphFactory.createAndRestoreExecutionGraph()
    ExecutionGraph newExecutionGraph = createExecutionGraph(...)
        DefaultExecutionGraphBuilder.buildGraph(jobGraph, ....)
            // Create the ExecutionGraph object
            executionGraph = (prior != null) ? prior : new ExecutionGraph(...)
            // Generate the JSON representation of the JobGraph
            executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
            // Key step: build the ExecutionGraph from the JobGraph
            executionGraph.attachJobGraph(sortedTopology);
                // Iterate over the JobVertex instances and parallelize them into ExecutionVertex instances
                for (JobVertex jobVertex : topologiallySorted) {
                    // Each JobVertex maps to one ExecutionJobVertex
                    ExecutionJobVertex ejv = new ExecutionJobVertex(jobGraph, jobVertex);
                    ejv.connectToPredecessors(this.intermediateResults);
                        List<JobEdge> inputs = jobVertex.getInputs();
                        for (int num = 0; num < inputs.size(); num++) {
                            JobEdge edge = inputs.get(num);
                            IntermediateResult ires = intermediateDataSets.get(edgeID);
                            this.inputs.add(ires);
                            // Connect each ExecutionVertex according to the parallelism
                            for (int i = 0; i < parallelism; i++) {
                                ExecutionVertex ev = taskVertices[i];
                                ev.connectSource(num, ires, edge, consumerIndex);
                            }
                        }
                }
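The innermost loop of this flow wires every parallel consumer subtask to the partitions of an upstream result. As a rough illustration, the sketch below assumes an all-to-all connection, where each consumer subtask reads every upstream partition; other connection patterns are not covered. The `Edge` class and `connectAllToAll` method are hypothetical names for this example and do not reproduce Flink's `connectSource` logic.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch of all-to-all wiring between upstream partitions and consumer subtasks. */
final class ConnectSketch {

    /** One edge: exactly one source partition and exactly one target subtask. */
    static final class Edge {
        final int inputNum;
        final int sourcePartition;
        final int targetSubtask;

        Edge(int inputNum, int sourcePartition, int targetSubtask) {
            this.inputNum = inputNum;
            this.sourcePartition = sourcePartition;
            this.targetSubtask = targetSubtask;
        }

        @Override
        public String toString() {
            return "input " + inputNum + ": partition " + sourcePartition
                    + " -> subtask " + targetSubtask;
        }
    }

    /** Creates one edge per (consumer subtask, upstream partition) pair. */
    static List<Edge> connectAllToAll(int inputNum, int numPartitions, int consumerParallelism) {
        List<Edge> edges = new ArrayList<>();
        for (int subtask = 0; subtask < consumerParallelism; subtask++) {
            for (int partition = 0; partition < numPartitions; partition++) {
                edges.add(new Edge(inputNum, partition, subtask));
            }
        }
        return edges;
    }

    public static void main(String[] args) {
        // Upstream result with 2 partitions, consumer vertex with parallelism 3.
        for (Edge edge : connectAllToAll(0, 2, 3)) {
            System.out.println(edge);
        }
    }
}
```

With this pattern the number of edges for one input is the upstream partition count multiplied by the consumer parallelism, which is why high-parallelism all-to-all jobs produce large graphs.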
The detailed code of DefaultExecutionGraphBuilder is as follows:
public class DefaultExecutionGraphBuilder {

    public static DefaultExecutionGraph buildGraph(
            JobGraph jobGraph,
            Configuration jobManagerConfig,
            ScheduledExecutorService futureExecutor,
            Executor ioExecutor) {

        final String jobName = jobGraph.getName();
        final JobID jobId = jobGraph.getJobID();
        final JobInformation jobInformation = new JobInformation(...);

        // create a new execution graph, if none exists so far
        final DefaultExecutionGraph executionGraph;
        executionGraph = new DefaultExecutionGraph(....);

        // set the basic properties
        executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));

        // initialize the vertices that have a master initialization hook
        // file output formats create directories here, input formats create splits
        for (JobVertex vertex : jobGraph.getVertices()) {
            String executableClass = vertex.getInvokableClassName();
            vertex.initializeOnMaster(
                    new SimpleInitializeOnMasterContext(
                            classLoader,
                            vertexParallelismStore
                                    .getParallelismInfo(vertex.getID())
                                    .getParallelism()));
        }

        // topologically sort the job vertices and attach the graph to the existing one
        List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
        executionGraph.attachJobGraph(sortedTopology);

        // configure the state checkpointing
        if (isDynamicGraph) {
            // dynamic graph does not support checkpointing so we skip it
            log.warn("Skip setting up checkpointing for a job with dynamic graph.");
        } else if (isCheckpointingEnabled(jobGraph)) {
            JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();

            // load the state backend from the application settings
            final StateBackend applicationConfiguredBackend;
            final SerializedValue<StateBackend> serializedAppConfigured =
                    snapshotSettings.getDefaultStateBackend();

            if (serializedAppConfigured == null) {
                applicationConfiguredBackend = null;
            } else {
                try {
                    applicationConfiguredBackend =
                            serializedAppConfigured.deserializeValue(classLoader);
                } catch (IOException | ClassNotFoundException e) {
                    throw new JobExecutionException(
                            jobId, "Could not deserialize application-defined state backend.", e);
                }
            }

            final StateBackend rootBackend =
                    StateBackendLoader.fromApplicationOrConfigOrDefault(
                            applicationConfiguredBackend,
                            snapshotSettings.isChangelogStateBackendEnabled(),
                            jobManagerConfig,
                            classLoader,
                            log);

            // load the checkpoint storage from the application settings
            final CheckpointStorage applicationConfiguredStorage;
            final SerializedValue<CheckpointStorage> serializedAppConfiguredStorage =
                    snapshotSettings.getDefaultCheckpointStorage();

            if (serializedAppConfiguredStorage == null) {
                applicationConfiguredStorage = null;
            } else {
                // (exception handling for deserializeValue is omitted in this excerpt)
                applicationConfiguredStorage =
                        serializedAppConfiguredStorage.deserializeValue(classLoader);
            }

            final CheckpointStorage rootStorage;
            try {
                rootStorage =
                        CheckpointStorageLoader.load(
                                applicationConfiguredStorage,
                                null,
                                rootBackend,
                                jobManagerConfig,
                                classLoader,
                                log);
            } catch (IllegalConfigurationException | DynamicCodeLoadingException e) {
                throw new JobExecutionException(
                        jobId, "Could not instantiate configured checkpoint storage", e);
            }

            // instantiate the user-defined checkpoint hooks
            final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks =
                    snapshotSettings.getMasterHooks();
            final List<MasterTriggerRestoreHook<?>> hooks;

            if (serializedHooks == null) {
                hooks = Collections.emptyList();
            } else {
                final MasterTriggerRestoreHook.Factory[] hookFactories;
                try {
                    hookFactories = serializedHooks.deserializeValue(classLoader);
                } catch (IOException | ClassNotFoundException e) {
                    throw new JobExecutionException(
                            jobId, "Could not instantiate user-defined checkpoint hooks", e);
                }

                // create the hooks with the user-code class loader as context class loader
                final Thread thread = Thread.currentThread();
                final ClassLoader originalClassLoader = thread.getContextClassLoader();
                thread.setContextClassLoader(classLoader);

                try {
                    hooks = new ArrayList<>(hookFactories.length);
                    for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
                        hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));
                    }
                } finally {
                    thread.setContextClassLoader(originalClassLoader);
                }
            }

            final CheckpointCoordinatorConfiguration chkConfig =
                    snapshotSettings.getCheckpointCoordinatorConfiguration();
            String changelogStorage = jobManagerConfig.getString(STATE_CHANGE_LOG_STORAGE);

            executionGraph.enableCheckpointing(
                    chkConfig,
                    hooks,
                    checkpointIdCounter,
                    completedCheckpointStore,
                    rootBackend,
                    rootStorage,
                    checkpointStatsTrackerFactory.get(),
                    checkpointsCleaner,
                    jobManagerConfig.getString(STATE_CHANGE_LOG_STORAGE));
        }

        return executionGraph;
    }
}
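Before `attachJobGraph` is called, the builder sorts the job vertices topologically so that every producer is processed before its consumers. The following is a minimal sketch of such a sort using Kahn's algorithm, a standard technique chosen here for illustration; it is not a copy of Flink's `getVerticesSortedTopologicallyFromSources`, and the string vertex names, the `consumersOf` map, and the exception type are assumptions of this example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch of a source-first topological sort (Kahn's algorithm). */
final class TopologicalSortSketch {

    /** Sorts vertices so that each producer appears before all of its consumers. */
    static List<String> sortFromSources(Map<String, List<String>> consumersOf) {
        // Count the incoming edges of every vertex.
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (String vertex : consumersOf.keySet()) {
            inDegree.put(vertex, 0);
        }
        for (List<String> consumers : consumersOf.values()) {
            for (String consumer : consumers) {
                inDegree.merge(consumer, 1, Integer::sum);
            }
        }

        // Vertices without inputs are the sources; they can be emitted first.
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey());
            }
        }

        List<String> sorted = new ArrayList<>();
        while (!ready.isEmpty()) {
            String vertex = ready.poll();
            sorted.add(vertex);
            for (String consumer : consumersOf.getOrDefault(vertex, Collections.emptyList())) {
                // A consumer becomes ready once all of its producers were emitted.
                if (inDegree.merge(consumer, -1, Integer::sum) == 0) {
                    ready.add(consumer);
                }
            }
        }

        if (sorted.size() != inDegree.size()) {
            throw new IllegalStateException("The graph contains a cycle.");
        }
        return sorted;
    }

    /** Small diamond-shaped example graph: source -> {map, filter} -> sink. */
    static Map<String, List<String>> exampleGraph() {
        Map<String, List<String>> graph = new LinkedHashMap<>();
        graph.put("source", Arrays.asList("map", "filter"));
        graph.put("map", Collections.singletonList("sink"));
        graph.put("filter", Collections.singletonList("sink"));
        graph.put("sink", Collections.<String>emptyList());
        return graph;
    }

    public static void main(String[] args) {
        System.out.println(sortFromSources(exampleGraph()));
    }
}
```

Processing vertices in this order is what allows each ExecutionJobVertex to find the IntermediateResult objects of its predecessors already registered when it connects to them.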
Copyright belongs to the original author, shandongwill. In case of infringement, please contact us for removal.