Flink ExecutionGraph Construction: Source Code Analysis

Preface

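An earlier article on JobGraph construction covered how the JobGraph is built; this article covers how the ExecutionGraph is built. The JobManager (JobMaster) generates the ExecutionGraph from the JobGraph. The ExecutionGraph is the parallelized version of the JobGraph and is the core data structure of the scheduling layer.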


Main abstractions in the ExecutionGraph

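1. ExecutionJobVertex: corresponds one-to-one to a JobVertex in the JobGraph. Each ExecutionJobVertex has as many ExecutionVertex instances as its parallelism.
2. ExecutionVertex: represents one parallel subtask of an ExecutionJobVertex. Its inputs are ExecutionEdges and its output is an IntermediateResultPartition.
3. IntermediateResult: corresponds one-to-one to an IntermediateDataSet in the JobGraph. One IntermediateResult contains multiple IntermediateResultPartitions, and the number of partitions equals the parallelism of the producing operator.
4. IntermediateResultPartition: represents one output partition of an ExecutionVertex. Its producer is an ExecutionVertex and its consumers are one or more ExecutionEdges.
5. ExecutionEdge: represents one input of an ExecutionVertex. Its source is an IntermediateResultPartition and its target is an ExecutionVertex. An edge has exactly one source and exactly one target.
6. Execution: one attempt to execute an ExecutionVertex. When a failure occurs or data has to be recomputed, an ExecutionVertex may be executed several times and therefore have several ExecutionAttemptIDs. Each Execution is uniquely identified by its ExecutionAttemptID. Messages between the JM and the TM about task deployment and task status updates use the ExecutionAttemptID to identify the recipient.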
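The relationships in the list above can be sketched with a small stand-alone model. This is a simplified illustration, not Flink's classes: the names `ParallelVertex`, `Subtask`, `Partition`, `connectAllToAll` and the attempt id format are all hypothetical, and an all-to-all connection pattern is assumed for the edges.

```java
import java.util.ArrayList;
import java.util.List;

public class ExecutionGraphModel {

    /** Stand-in for IntermediateResultPartition: produced by exactly one subtask. */
    public static final class Partition {
        public final Subtask producer;

        Partition(Subtask producer) {
            this.producer = producer;
        }
    }

    /** Stand-in for ExecutionVertex: one parallel subtask. */
    public static final class Subtask {
        public final int index;
        // Each entry plays the role of one ExecutionEdge:
        // one source partition, with this subtask as the target.
        public final List<Partition> inputs = new ArrayList<>();
        public final Partition output = new Partition(this);
        // One entry per execution attempt; the id format is invented for this sketch.
        public final List<String> attemptIds = new ArrayList<>();

        Subtask(int index) {
            this.index = index;
        }

        /** Registers a new attempt (stand-in for an Execution) and returns its id. */
        public String startNewAttempt() {
            String id = "subtask-" + index + "/attempt-" + attemptIds.size();
            attemptIds.add(id);
            return id;
        }
    }

    /** Stand-in for ExecutionJobVertex: the parallel expansion of one job vertex. */
    public static final class ParallelVertex {
        public final String name;
        public final List<Subtask> subtasks = new ArrayList<>();

        public ParallelVertex(String name, int parallelism) {
            this.name = name;
            for (int i = 0; i < parallelism; i++) {
                subtasks.add(new Subtask(i));
            }
        }

        /** Stand-in for IntermediateResult: one partition per producing subtask. */
        public List<Partition> result() {
            List<Partition> partitions = new ArrayList<>();
            for (Subtask s : subtasks) {
                partitions.add(s.output);
            }
            return partitions;
        }
    }

    /** Connects every upstream partition to every downstream subtask; returns the edge count. */
    public static int connectAllToAll(ParallelVertex upstream, ParallelVertex downstream) {
        int edges = 0;
        for (Subtask consumer : downstream.subtasks) {
            for (Partition partition : upstream.result()) {
                consumer.inputs.add(partition);
                edges++;
            }
        }
        return edges;
    }

    public static void main(String[] args) {
        ParallelVertex source = new ParallelVertex("source", 2);
        ParallelVertex map = new ParallelVertex("map", 3);
        System.out.println("edges = " + connectAllToAll(source, map));

        // A retried subtask gets a second attempt with a different id.
        Subtask first = map.subtasks.get(0);
        System.out.println(first.startNewAttempt());
        System.out.println(first.startNewAttempt());
    }
}
```

With a source of parallelism 2 and a map of parallelism 3, the model creates 2 partitions and 6 edges, which shows why the ExecutionGraph grows with parallelism while the JobGraph does not.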

Core entry point in the source code

ExecutionGraph executionGraph = SchedulerBase.createAndRestoreExecutionGraph(
                        completedCheckpointStore,
                        checkpointsCleaner,
                        checkpointIdCounter,
                        initializationTimestamp,
                        mainThreadExecutor,
                        jobStatusListener,
                        vertexParallelismStore);

The SchedulerBase class has two member fields: the JobGraph and the ExecutionGraph. When an instance of DefaultScheduler, a subclass of SchedulerBase, is created, the ExecutionGraph is generated in the SchedulerBase constructor.
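The construction order described above can be illustrated with a minimal sketch. All class names here (`BaseScheduler`, `SimpleScheduler`, `JobGraphStub`, `ExecutionGraphStub`) are hypothetical stand-ins, not Flink types; the only point is that instantiating the subclass runs the base-class constructor, which derives the second field from the first.

```java
public class SchedulerSketch {

    /** Hypothetical stand-in for JobGraph. */
    public static final class JobGraphStub {
        public final String jobName;

        public JobGraphStub(String jobName) {
            this.jobName = jobName;
        }
    }

    /** Hypothetical stand-in for ExecutionGraph. */
    public static final class ExecutionGraphStub {
        public final String jobName;

        ExecutionGraphStub(String jobName) {
            this.jobName = jobName;
        }
    }

    /** Plays the role of SchedulerBase: holds both graphs. */
    public abstract static class BaseScheduler {
        private final JobGraphStub jobGraph;
        private final ExecutionGraphStub executionGraph;

        protected BaseScheduler(JobGraphStub jobGraph) {
            this.jobGraph = jobGraph;
            // The execution graph is built here, in the base-class constructor.
            this.executionGraph = createExecutionGraph(jobGraph);
        }

        private static ExecutionGraphStub createExecutionGraph(JobGraphStub jobGraph) {
            return new ExecutionGraphStub(jobGraph.jobName);
        }

        public JobGraphStub getJobGraph() {
            return jobGraph;
        }

        public ExecutionGraphStub getExecutionGraph() {
            return executionGraph;
        }
    }

    /** Plays the role of DefaultScheduler: only delegates to the base constructor. */
    public static final class SimpleScheduler extends BaseScheduler {
        public SimpleScheduler(JobGraphStub jobGraph) {
            super(jobGraph);
        }
    }

    public static void main(String[] args) {
        SimpleScheduler scheduler = new SimpleScheduler(new JobGraphStub("word-count"));
        // The execution graph already exists once the constructor has returned.
        System.out.println(scheduler.getExecutionGraph().jobName);
    }
}
```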

Core flow of the source code:

DefaultExecutionGraphFactory.createAndRestoreExecutionGraph()
    ExecutionGraph newExecutionGraph = createExecutionGraph(...)
        DefaultExecutionGraphBuilder.buildGraph(jobGraph, ....)
            // Create the ExecutionGraph object
            executionGraph = (prior != null) ? prior : new ExecutionGraph(...)
            // Generate the JSON representation of the JobGraph
            executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
            // Key step: build the ExecutionGraph from the JobGraph
            executionGraph.attachJobGraph(sortedTopology);
                // Iterate over the JobVertices and parallelize them into ExecutionVertices
                for (JobVertex jobVertex : topologiallySorted) {
                    // Each JobVertex maps to one ExecutionJobVertex
                    ExecutionJobVertex ejv = new ExecutionJobVertex(jobGraph, jobVertex);
                    ejv.connectToPredecessors(this.intermediateResults);
                        List<JobEdge> inputs = jobVertex.getInputs();
                        for (int num = 0; num < inputs.size(); num++) {
                            JobEdge edge = inputs.get(num);
                            IntermediateResult ires = intermediateDataSets.get(edgeID);
                            this.inputs.add(ires);
                            // Connect each ExecutionVertex, one per unit of parallelism
                            for (int i = 0; i < parallelism; i++) {
                                ExecutionVertex ev = taskVertices[i];
                                ev.connectSource(num, ires, edge, consumerIndex);
                            }
                        }
                }
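The flow above attaches vertices in topological order because each vertex looks up the intermediate results of its upstream vertices, which must already be registered. The following is a minimal sketch of that ordering constraint under simplified assumptions: `Vertex` and `attach` are hypothetical names, results are tracked only by vertex name, and the error handling is invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AttachOrderSketch {

    /** Hypothetical stand-in for a JobVertex: name, parallelism, names of upstream vertices. */
    public static final class Vertex {
        public final String name;
        public final int parallelism;
        public final List<String> inputs;

        public Vertex(String name, int parallelism, String... inputs) {
            this.name = name;
            this.parallelism = parallelism;
            this.inputs = new ArrayList<>(Arrays.asList(inputs));
        }
    }

    /**
     * Attaches the vertices in the given order and returns the number of subtasks created.
     * Throws IllegalStateException if a vertex is attached before one of its producers.
     */
    public static int attach(List<Vertex> sortedTopology) {
        // Vertex name -> number of result partitions registered so far.
        Map<String, Integer> registeredResults = new HashMap<>();
        int subtasks = 0;
        for (Vertex vertex : sortedTopology) {
            // Connecting to predecessors requires their results to exist already.
            for (String input : vertex.inputs) {
                if (!registeredResults.containsKey(input)) {
                    throw new IllegalStateException(
                            "No result registered for upstream vertex " + input);
                }
            }
            // One subtask and one result partition per unit of parallelism.
            subtasks += vertex.parallelism;
            registeredResults.put(vertex.name, vertex.parallelism);
        }
        return subtasks;
    }

    public static void main(String[] args) {
        List<Vertex> sorted = Arrays.asList(
                new Vertex("source", 2),
                new Vertex("map", 4, "source"),
                new Vertex("sink", 1, "map"));
        System.out.println("subtasks = " + attach(sorted));
    }
}
```

Passing the same vertices with `map` before `source` makes `attach` fail, which is the situation the topological sort prevents.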

The detailed code of DefaultExecutionGraphBuilder is as follows:

public class DefaultExecutionGraphBuilder {

    // Abridged excerpt: names such as classLoader, vertexParallelismStore, isDynamicGraph,
    // log, checkpointIdCounter and completedCheckpointStore come from parameters that are
    // omitted from the signature shown here.
    public static DefaultExecutionGraph buildGraph(
            JobGraph jobGraph,
            Configuration jobManagerConfig,
            ScheduledExecutorService futureExecutor,
            Executor ioExecutor) {

        final String jobName = jobGraph.getName();
        final JobID jobId = jobGraph.getJobID();

        final JobInformation jobInformation = new JobInformation(...);

        // create a new execution graph, if none exists so far
        final DefaultExecutionGraph executionGraph;
        executionGraph = new DefaultExecutionGraph(....);

        // set the basic properties
        executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));

        // initialize the vertices that have a master initialization hook
        // file output formats create directories here, input formats create splits
        for (JobVertex vertex : jobGraph.getVertices()) {
            String executableClass = vertex.getInvokableClassName();
            vertex.initializeOnMaster(
                    new SimpleInitializeOnMasterContext(
                            classLoader,
                            vertexParallelismStore
                                    .getParallelismInfo(vertex.getID())
                                    .getParallelism()));
        }

        // topologically sort the job vertices and attach the graph to the existing one
        List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();

        executionGraph.attachJobGraph(sortedTopology);

        // configure the state checkpointing
        if (isDynamicGraph) {
            // dynamic graph does not support checkpointing so we skip it
            log.warn("Skip setting up checkpointing for a job with dynamic graph.");
        } else if (isCheckpointingEnabled(jobGraph)) {
            JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();

            // load the state backend from the application settings
            final StateBackend applicationConfiguredBackend;
            final SerializedValue<StateBackend> serializedAppConfigured =
                    snapshotSettings.getDefaultStateBackend();

            if (serializedAppConfigured == null) {
                applicationConfiguredBackend = null;
            } else {
                try {
                    applicationConfiguredBackend =
                            serializedAppConfigured.deserializeValue(classLoader);
                } catch (IOException | ClassNotFoundException e) {
                    throw new JobExecutionException(
                            jobId, "Could not deserialize application-defined state backend.", e);
                }
            }

            final StateBackend rootBackend =
                    StateBackendLoader.fromApplicationOrConfigOrDefault(
                            applicationConfiguredBackend,
                            snapshotSettings.isChangelogStateBackendEnabled(),
                            jobManagerConfig,
                            classLoader,
                            log);

            // load the checkpoint storage from the application settings
            final CheckpointStorage applicationConfiguredStorage;
            final SerializedValue<CheckpointStorage> serializedAppConfiguredStorage =
                    snapshotSettings.getDefaultCheckpointStorage();

            if (serializedAppConfiguredStorage == null) {
                applicationConfiguredStorage = null;
            } else {
                try {
                    applicationConfiguredStorage =
                            serializedAppConfiguredStorage.deserializeValue(classLoader);
                } catch (IOException | ClassNotFoundException e) {
                    throw new JobExecutionException(
                            jobId,
                            "Could not deserialize application-defined checkpoint storage.",
                            e);
                }
            }

            final CheckpointStorage rootStorage;
            try {
                rootStorage =
                        CheckpointStorageLoader.load(
                                applicationConfiguredStorage,
                                null,
                                rootBackend,
                                jobManagerConfig,
                                classLoader,
                                log);
            } catch (IllegalConfigurationException | DynamicCodeLoadingException e) {
                throw new JobExecutionException(
                        jobId, "Could not instantiate configured checkpoint storage", e);
            }

            // instantiate the user-defined checkpoint hooks
            final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks =
                    snapshotSettings.getMasterHooks();
            final List<MasterTriggerRestoreHook<?>> hooks;

            if (serializedHooks == null) {
                hooks = Collections.emptyList();
            } else {
                final MasterTriggerRestoreHook.Factory[] hookFactories;
                try {
                    hookFactories = serializedHooks.deserializeValue(classLoader);
                } catch (IOException | ClassNotFoundException e) {
                    throw new JobExecutionException(
                            jobId, "Could not instantiate user-defined checkpoint hooks", e);
                }

                // create the hooks with the user class loader as the context class loader
                final Thread thread = Thread.currentThread();
                final ClassLoader originalClassLoader = thread.getContextClassLoader();
                thread.setContextClassLoader(classLoader);

                try {
                    hooks = new ArrayList<>(hookFactories.length);
                    for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
                        hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));
                    }
                } finally {
                    thread.setContextClassLoader(originalClassLoader);
                }
            }

            final CheckpointCoordinatorConfiguration chkConfig =
                    snapshotSettings.getCheckpointCoordinatorConfiguration();
            String changelogStorage = jobManagerConfig.getString(STATE_CHANGE_LOG_STORAGE);

            executionGraph.enableCheckpointing(
                    chkConfig,
                    hooks,
                    checkpointIdCounter,
                    completedCheckpointStore,
                    rootBackend,
                    rootStorage,
                    checkpointStatsTrackerFactory.get(),
                    checkpointsCleaner,
                    jobManagerConfig.getString(STATE_CHANGE_LOG_STORAGE));
        }

        return executionGraph;
    }
}
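The hook-creation step in the builder temporarily switches the thread's context class loader and restores it in a `finally` block. A minimal stand-alone sketch of that pattern, using only the JDK, might look as follows; the helper name `callWithContextClassLoader` is hypothetical and does not come from Flink.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

public class ContextClassLoaderSketch {

    /**
     * Runs the action with the given class loader installed as the thread's context
     * class loader, and restores the previous one even if the action throws.
     */
    public static <T> T callWithContextClassLoader(ClassLoader classLoader, Supplier<T> action) {
        final Thread thread = Thread.currentThread();
        final ClassLoader original = thread.getContextClassLoader();
        thread.setContextClassLoader(classLoader);
        try {
            return action.get();
        } finally {
            thread.setContextClassLoader(original);
        }
    }

    public static void main(String[] args) {
        // An empty URLClassLoader stands in for the user-code class loader.
        ClassLoader userLoader =
                new URLClassLoader(new URL[0], ContextClassLoaderSketch.class.getClassLoader());
        ClassLoader before = Thread.currentThread().getContextClassLoader();

        ClassLoader seenInside =
                callWithContextClassLoader(
                        userLoader, () -> Thread.currentThread().getContextClassLoader());

        System.out.println("inside saw user loader: " + (seenInside == userLoader));
        System.out.println(
                "restored afterwards: "
                        + (Thread.currentThread().getContextClassLoader() == before));
    }
}
```

Restoring in `finally` matters because user-defined factories may throw; without it, later code on the same thread would keep resolving classes through the user class loader.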

This article is reposted from: https://blog.csdn.net/gwc791224/article/details/136538387
Copyright belongs to the original author, shandongwill. In case of infringement, please contact us for removal.
