

How Flink 1.19 Generates the JobGraph


Overview

This post explains how a Flink program is converted into a JobGraph. Since every step of the underlying implementation is complex and extensive, this post covers only the overall flow; the details will be filled in later posts.

Sample Program

Here is a very simple Flink program I prepared; using this job as the example, we will walk through how Flink generates its JobGraph.

source->map->flatMap->keyBy->sum->sink, six operators in total

package org.example.demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

// A minimal pipeline: source -> map -> flatMap -> keyBy -> sum -> sink.
public class SocketWordCountDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Source: a bounded single-element source (fromData is new in Flink 1.19).
        DataStreamSource<String> socketTextStream = env.fromData("data");
        // Map: lower-case each record.
        SingleOutputStreamOperator<String> mapStream = socketTextStream.map(String::toLowerCase).setParallelism(5);
        // FlatMap: emit (word, 1) pairs.
        SingleOutputStreamOperator<Tuple2<String, Integer>> flatMapStream = mapStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                out.collect(Tuple2.of(value, 1));
            }
        }).setParallelism(5);
        // KeyBy: partition by the word; this inserts a hash exchange.
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = flatMapStream.keyBy(x -> x.f0);
        // Sum: aggregate the counts per key.
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream = keyedStream.sum(1).setParallelism(5);
        // Sink: print the results.
        resultStream.print("result:").setParallelism(5);
        env.execute();
    }
}

Overall Flow

1. By the time the program reaches the execute() call, every operator we wrote has already been stored in the transformations list of the StreamExecutionEnvironment object (a quick way to inspect it is sketched below).
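
A minimal debugging sketch, assuming the @Internal getTransformations() accessor on StreamExecutionEnvironment (if your version hides it, reach the transformations field via reflection instead):

// Dump the Transformations collected by the DataStream API calls above,
// right before execute() turns them into a StreamGraph.
for (org.apache.flink.api.dag.Transformation<?> t : env.getTransformations()) {
    System.out.println(t.getId() + ": " + t.getName()
            + " (parallelism=" + t.getParallelism() + ")");
}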

2. getStreamGraph() converts the Transformations stored in transformations into StreamNodes and finally produces the StreamGraph; a small sketch of what it yields follows.
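
A minimal sketch that materializes the StreamGraph the same way execute() does and lists its nodes (note that the no-argument getStreamGraph() consumes the recorded transformations by default):

// Build the StreamGraph and list its StreamNodes.
org.apache.flink.streaming.api.graph.StreamGraph streamGraph = env.getStreamGraph();
for (org.apache.flink.streaming.api.graph.StreamNode node : streamGraph.getStreamNodes()) {
    System.out.println(node.getId() + ": " + node.getOperatorName());
}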

3. Step into execute(streamGraph).

4. Step into executeAsync(streamGraph).

5. This in turn calls executor.execute().

6. Then getJobGraph().

7. Step into FlinkPipelineTranslationUtil.getJobGraph().

8. Continue into the translateToJobGraph() method.

9. Then click into streamGraph.getJobGraph(). This is how the underlying code is structured, layer after layer of delegation, so be patient; the whole chain is condensed below.
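
To save some clicking, here is the delegation chain condensed into one view. The intermediate helper names are taken from my reading of the 1.19 client sources and may differ slightly in other versions:

// Condensed call chain (1.19 client sources, simplified):
// executor.execute(pipeline, configuration, userCodeClassLoader)
//   -> PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassLoader)
//   -> FlinkPipelineTranslationUtil.getJobGraph(classLoader, pipeline, conf, defaultParallelism)
//   -> StreamGraphTranslator.translateToJobGraph(streamGraph, conf, defaultParallelism)
//   -> streamGraph.getJobGraph(classLoader, jobID)
//   -> StreamingJobGraphGenerator.createJobGraph(classLoader, streamGraph, jobID)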

10. The createJobGraph() method, whose parameters are the class loader, the StreamGraph, and the jobId; its rough shape is shown below.
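
For orientation, the static entry point looks roughly like this in the 1.19 source (simplified; see StreamingJobGraphGenerator for the real body):

public static JobGraph createJobGraph(
        ClassLoader userClassLoader, StreamGraph streamGraph, @Nullable JobID jobID) {
    // ... constructs a StreamingJobGraphGenerator and delegates to its
    // private createJobGraph() instance method, shown further below ...
}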

11. Step into the constructor of the StreamingJobGraphGenerator object.

12. Here you can clearly see that a JobGraph object is created.

13. From the debugger, though, many of this JobGraph's fields are still unset; it is just an empty shell at this point.

14. Step back out: createJobGraph() is then called to populate the JobGraph's fields, and only this step truly builds the JobGraph. Let's step into this method:

    private JobGraph createJobGraph() {
        preValidate();
        jobGraph.setJobType(streamGraph.getJobType());
        jobGraph.setDynamic(streamGraph.isDynamic());

        jobGraph.enableApproximateLocalRecovery(
                streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());

        // Generate deterministic hashes for the nodes in order to identify them across
        // submission iff they didn't change.
        Map<Integer, byte[]> hashes =
                defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

        // Generate legacy version hashes for backwards compatibility
        List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
        for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
            legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
        }

        setChaining(hashes, legacyHashes);

        if (jobGraph.isDynamic()) {
            setVertexParallelismsForDynamicGraphIfNecessary();
        }

        // Note that we set all the non-chainable outputs configuration here because the
        // "setVertexParallelismsForDynamicGraphIfNecessary" may affect the parallelism of job
        // vertices and partition-reuse
        final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs =
                new HashMap<>();
        setAllOperatorNonChainedOutputsConfigs(opIntermediateOutputs);
        setAllVertexNonChainedOutputsConfigs(opIntermediateOutputs);

        setPhysicalEdges();

        markSupportingConcurrentExecutionAttempts();

        validateHybridShuffleExecuteInBatchMode();

        setSlotSharingAndCoLocation();

        setManagedMemoryFraction(
                Collections.unmodifiableMap(jobVertices),
                Collections.unmodifiableMap(vertexConfigs),
                Collections.unmodifiableMap(chainedConfigs),
                id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
                id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());

        configureCheckpointing();

        jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());

        final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =
                JobGraphUtils.prepareUserArtifactEntries(
                        streamGraph.getUserArtifacts().stream()
                                .collect(Collectors.toMap(e -> e.f0, e -> e.f1)),
                        jobGraph.getJobID());

        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
                distributedCacheEntries.entrySet()) {
            jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
        }

        // set the ExecutionConfig last when it has been finalized
        try {
            jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
        } catch (IOException e) {
            throw new IllegalConfigurationException(
                    "Could not serialize the ExecutionConfig."
                            + "This indicates that non-serializable types (like custom serializers) were registered");
        }
        jobGraph.setJobConfiguration(streamGraph.getJobConfiguration());

        addVertexIndexPrefixInVertexName();

        setVertexDescription();

        // Wait for the serialization of operator coordinators and stream config.
        try {
            FutureUtils.combineAll(
                            vertexConfigs.values().stream()
                                    .map(
                                            config ->
                                                    config.triggerSerializationAndReturnFuture(
                                                            serializationExecutor))
                                    .collect(Collectors.toList()))
                    .get();

            waitForSerializationFuturesAndUpdateJobVertices();
        } catch (Exception e) {
            throw new FlinkRuntimeException("Error in serialization.", e);
        }

        if (!streamGraph.getJobStatusHooks().isEmpty()) {
            jobGraph.setJobStatusHooks(streamGraph.getJobStatusHooks());
        }

        return jobGraph;
    }

15. That is a lot of code, so I copied it out above; you can clearly see it assigning the JobGraph's fields, after which the JobGraph is fully built.

16. Of all this code, the most important single line is setChaining(). As the name suggests, this method sets up the operator chains: inside it, the overall structure of the JobGraph is built and operator tasks are merged. Since it is fairly involved, I won't go into it here; a dedicated post will follow. To build some intuition in the meantime, a simplified model of the chaining decision is sketched below.
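
A heavily simplified, self-contained model of the chainability check (the real logic is StreamingJobGraphGenerator#isChainable and friends, which also consult chaining strategies and the exchange mode; Node and Edge below are hypothetical stand-ins for StreamNode/StreamEdge, Java 17 syntax):

// Simplified model of the operator-chaining decision; not Flink API.
final class ChainingModel {
    // Hypothetical stand-ins for StreamNode / StreamEdge.
    record Node(int parallelism, String slotSharingGroup, boolean chainingEnabled) {}
    record Edge(Node upstream, Node downstream, boolean forwardPartitioner, int downstreamInEdges) {}

    static boolean isChainable(Edge e) {
        return e.downstreamInEdges() == 1                  // downstream must have exactly one input
                && e.forwardPartitioner()                  // records forwarded, not redistributed
                && e.upstream().parallelism() == e.downstream().parallelism()
                && e.upstream().slotSharingGroup().equals(e.downstream().slotSharingGroup())
                && e.upstream().chainingEnabled();         // chaining not disabled on the node
    }

    public static void main(String[] args) {
        Node map = new Node(5, "default", true);
        Node flatMap = new Node(5, "default", true);
        Node sum = new Node(5, "default", true);
        // map -> flatMap: forward edge, equal parallelism => chainable
        System.out.println(isChainable(new Edge(map, flatMap, true, 1)));   // true
        // flatMap -> sum: keyBy inserts a hash partitioner => chain breaks
        System.out.println(isChainable(new Edge(flatMap, sum, false, 1)));  // false
    }
}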

Summary

The overall creation flow of the JobGraph is Transformations -> StreamGraph -> JobGraph. The sketch below ties the whole chain together.
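
A hedged end-to-end sketch: drop this into the demo's main() in place of env.execute() to materialize the JobGraph by hand and list its JobVertices. The two-argument getJobGraph matches the 1.19 signature from step 10, and the snippet needs imports for StreamGraph (org.apache.flink.streaming.api.graph) plus JobGraph and JobVertex (org.apache.flink.runtime.jobgraph):

// Build the JobGraph by hand and inspect the chained vertices.
StreamGraph streamGraph = env.getStreamGraph();
JobGraph jobGraph =
        streamGraph.getJobGraph(Thread.currentThread().getContextClassLoader(), null);
for (JobVertex vertex : jobGraph.getVertices()) {
    System.out.println(vertex.getID() + ": " + vertex.getName()
            + " (parallelism=" + vertex.getParallelism() + ")");
}

For the sample job this should print three vertices (assuming default chaining): the source (parallelism 1, so it cannot chain with the parallelism-5 map), map chained with flatMap, and sum chained with the print sink, since the keyBy hash exchange breaks the chain in between.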

Tags: flink

This article is reposted from: https://blog.csdn.net/m0_73904819/article/details/140827052
Copyright belongs to the original author, BigDataLover520. In case of infringement, please contact us for removal.
