0


Flink作业执行之 3.StreamGraph

Flink作业执行之 3.StreamGraph

1. StreamGraphGenerator

在前文了解Transformation和StreamOperator后。接下来Transformation将转换成StreamGraph,即作业的逻辑拓扑结构。

env.execute()

方法中调用

getStreamGraph

方法生成

StreamGraph

实例。

StreamGraph

StreamGraphGenerator

负责生成。

StreamGraphGenerator

实例中封装了前面生成的Transformation集合。

privateStreamGraphgetStreamGraph(List<Transformation<?>> transformations){synchronizeClusterDatasetStatus();// 根据Transformation生成StreamGraphGenerator,然后再生成StreamGraphreturngetStreamGraphGenerator(transformations).generate();}// 创建StreamGraphGenerator实例privateStreamGraphGeneratorgetStreamGraphGenerator(List<Transformation<?>> transformations){// ...returnnewStreamGraphGenerator(// 传入transformations集合newArrayList<>(transformations), config, checkpointCfg, configuration).setStateBackend(defaultStateBackend).setChangelogStateBackendEnabled(changelogStateBackendEnabled).setSavepointDir(defaultSavepointDirectory).setChaining(isChainingEnabled).setUserArtifacts(cacheFile).setTimeCharacteristic(timeCharacteristic).setDefaultBufferTimeout(bufferTimeout).setSlotSharingGroupResource(slotSharingGroupResources);}
generate

方法核心逻辑如下,首先创建一个空的StreamGraph实例。然后通过遍历transformations集合,依次调用

transform

方法完成StreamGraph中节点和边实例的创建,并将节点和边加入到StreamGraph中。

publicStreamGraphgenerate(){// 先实例化一个空的StreamGraph
    streamGraph =newStreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);// ...for(Transformation<?> transformation : transformations){// 依次处理transformationtransform(transformation);}finalStreamGraph builtStreamGraph = streamGraph;// ...return builtStreamGraph;}

一个作业中生成的StreamGraph和Transformation实例数量而言,一个任务会生成多个Transformation实例,单个Transformation实例中仅包含直接上游实例。但一个任务只会生成一个StreamGraph实例,StreamGraph是一个完整的图的表示,其中包含了图中全部的节点和边。

2. TransformationTranslator

TransformationTranslator

负责根据执行模式将给定的

Transformation

转换为其运行时实现,即StreamGraph。其接口中定义了批和流处理模式下的方法。

publicinterfaceTransformationTranslator<OUT,TextendsTransformation<OUT>>{// 批模式Collection<Integer>translateForBatch(finalT transformation,finalContext context);// 流模式Collection<Integer>translateForStreaming(finalT transformation,finalContext context);}

在StreamGraphGenerator实例的创建过程中会通过静态代码块生成如下

Transformation

TransformationTranslator

的映射关系。包含了

Transformation

子类中除

FeedbackTransformation

CoFeedbackTransformation

之外的其他剩余子类,共计16个值。

FeedbackTransformation

CoFeedbackTransformation

未提供TransformationTranslator的实现,需要单独处理。

static{Map<Class<?extendsTransformation>,TransformationTranslator<?,?extendsTransformation>>
            tmp =newHashMap<>();
    tmp.put(OneInputTransformation.class,newOneInputTransformationTranslator<>());
    tmp.put(TwoInputTransformation.class,newTwoInputTransformationTranslator<>());
    tmp.put(MultipleInputTransformation.class,newMultiInputTransformationTranslator<>());
    tmp.put(KeyedMultipleInputTransformation.class,newMultiInputTransformationTranslator<>());
    tmp.put(SourceTransformation.class,newSourceTransformationTranslator<>());
    tmp.put(SinkTransformation.class,newSinkTransformationTranslator<>());
    tmp.put(LegacySinkTransformation.class,newLegacySinkTransformationTranslator<>());
    tmp.put(LegacySourceTransformation.class,newLegacySourceTransformationTranslator<>());
    tmp.put(UnionTransformation.class,newUnionTransformationTranslator<>());
    tmp.put(PartitionTransformation.class,newPartitionTransformationTranslator<>());
    tmp.put(SideOutputTransformation.class,newSideOutputTransformationTranslator<>());
    tmp.put(ReduceTransformation.class,newReduceTransformationTranslator<>());
    tmp.put(TimestampsAndWatermarksTransformation.class,newTimestampsAndWatermarksTransformationTranslator<>());
    tmp.put(BroadcastStateTransformation.class,newBroadcastStateTransformationTranslator<>());
    tmp.put(KeyedBroadcastStateTransformation.class,newKeyedBroadcastStateTransformationTranslator<>());
    tmp.put(CacheTransformation.class,newCacheTransformationTranslator<>());// 将映射关系保存在成员属性中
    translatorMap =Collections.unmodifiableMap(tmp);}

3. StreamGraph

StreamGraph表示Flink执行图,描述了作业的逻辑拓扑结构,并以DAG的形式描述作业中算子之间的上下游连接关系。

StreamGraph

实现了

Pipeline

接口,接口中没有任何内容,仅为了表示

DataStream

中的

StreamGraph

DataSet

中的

Plan

都属于

Pipeline

类型。
在这里插入图片描述

StreamGraph

表示DAG,DAG中节点和边分别使用

StreamNode

StreamEdge

类表示。

三者的UML关系如下
在这里插入图片描述

StreamGraph

中将全部的

StreamNode

节点保存在其集合属性中,同时单独指定了Source节点和sink节点,相关属性如下

// 全部节点数据,key=节点id,即transformation的idprivateMap<Integer,StreamNode> streamNodes;// 表示Source的节点idprivateSet<Integer> sources;// 表示sink的节点idprivateSet<Integer> sinks;privateSet<Integer> expandedSinks;// 旁路输出的节点信息privateMap<Integer,Tuple2<Integer,OutputTag>> virtualSideOutputNodes;// 虚拟节点信息,key = 新生成的虚拟节点id,tuple3为虚拟节点信息.f0=此虚拟节点的上游节点idprivateMap<Integer,Tuple3<Integer,StreamPartitioner<?>,StreamExchangeMode>> virtualPartitionNodes;

一个节点最基础的信息有:节点id/名称、入/出边信息、工作内容。
在这里插入图片描述
上述基础信息维护在以下属性中。其中operatorFactory和jobVertexClass属性表示节点工作内容。

// 节点idprivatefinalint id;// 并行度privateint parallelism;privateint maxParallelism;// 节点名称privatefinalString operatorName;// 工作内容:算子信息privateStreamOperatorFactory<?> operatorFactory;// 节点入边privateList<StreamEdge> inEdges =newArrayList<StreamEdge>();// 节点出边privateList<StreamEdge> outEdges =newArrayList<StreamEdge>();// 工作内容:StreamTask实例,表示该节点所属的StreamTask子类型。privatefinalClass<?extendsTaskInvokable> jobVertexClass;

StreamEdge中表示边基本信息的属性字段如下。

// 边idprivatefinalString edgeId;// 边连接的上游节点id,即StreamNode.idprivatefinalint sourceId;// 边连接的下游节点idprivatefinalint targetId;// 上游节点名称privatefinalString sourceOperatorName;// 下游节点名称privatefinalString targetOperatorName;

4. 生成StreamGraph

对实现了TransformationTranslator接口的16种Transformation而言(上述静态代码内容),Transformation转换过程大致如下。

首先从Transformation中获取id、name、输入类型(即上游Transformation中的输出类型,Source没有)、输出类型、StreamOperatorFactory实例等内容作为节点和边实例中基础信息。

Class<? extends TaskInvokable> vertexClass

信息在具体的TransformationTranslator子类中进行指定。

然后通过StreamGraph中

addNode

方法,生成StreamNode实例并将该实例加入到

Map<Integer, StreamNode> streamNodes

,如果是Source则将节点id加入到

Set<Integer> sources

,如果是sink则将节点id加入到

Set<Integer> sinks

4.1. 生成节点

addNode

方法如下

protectedStreamNodeaddNode(Integer vertexID,// transformation id@NullableString slotSharingGroup,@NullableString coLocationGroup,Class<?extendsTaskInvokable> vertexClass,// StreanTask实例StreamOperatorFactory<?> operatorFactory,// transformation中的工厂实例String operatorName){// transformation name,如果是Source或sink,则分别拼接"Source: "或"Sink: "前缀// ...// 生成节点实例StreamNode vertex =newStreamNode(
                    vertexID,
                    slotSharingGroup,
                    coLocationGroup,
                    operatorFactory,
                    operatorName,
                    vertexClass);// 将节点添加到map
    streamNodes.put(vertexID, vertex);return vertex;}

节点id和名称直接取自Transformation的id和名称。如果是Source或sink,则分别拼接"Source: "或"Sink: "前缀。
节点工作内容来自Transformation中的StreamOperatorFactory实例。

生成节点实例后,根据Transformation中的并行度,设置节点的并行度。如果Transformation中未设置并行度时,获取配置中默认的并行度。

注意,此时的节点并不包含边属性。

4.2. 设置节点的边

节点可能存在入边和出边,根据节点是否存在上游决定是否需要设置入边信息,完成当前节点的入边设置同时,将该边设置为相应上游节点的出边。每个节点的出边由下游节点触发设置
Source作为头节点,不存在上游,因此source节点不存在设置边的操作。

当节点存在上游节点时,通过StreamGraph中

addEdge

方法完成节点边的设置。如果存在多个上游,则循环调用addEdge方法。

publicvoidaddEdge(Integer upStreamVertexID,// 上游节点idInteger downStreamVertexID,// 当前节点idint typeNumber,// 只有co-task任务才会涉及到,多条入边的序号IntermediateDataSetID intermediateDataSetId){// 注意在这里调用时, partitioner、outputTag、exchangeMode传null值addEdgeInternal(
            upStreamVertexID,
            downStreamVertexID,
            typeNumber,null,// 注意newArrayList<String>(),null,// 注意null,// 注意
            intermediateDataSetId);}privatevoidaddEdgeInternal(Integer upStreamVertexID,Integer downStreamVertexID,int typeNumber,StreamPartitioner<?> partitioner,List<String> outputNames,OutputTag outputTag,StreamExchangeMode exchangeMode,IntermediateDataSetID intermediateDataSetId){if(virtualSideOutputNodes.containsKey(upStreamVertexID)){// 上游节点是旁路输出节点时int virtualId = upStreamVertexID;
        upStreamVertexID = virtualSideOutputNodes.get(virtualId).f0;if(outputTag ==null){
            outputTag = virtualSideOutputNodes.get(virtualId).f1;}// 递归调用addEdgeInternal(
                upStreamVertexID,
                downStreamVertexID,
                typeNumber,
                partitioner,null,
                outputTag,
                exchangeMode,
                intermediateDataSetId);}elseif(virtualPartitionNodes.containsKey(upStreamVertexID)){// 上游节点是虚拟节点时int virtualId = upStreamVertexID;// 上游(虚拟)节点的父节点id
        upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;if(partitioner ==null){// 获取了虚拟节点的partitioner
            partitioner = virtualPartitionNodes.get(virtualId).f1;}// 获取了虚拟节点的数据exchangeMode
        exchangeMode = virtualPartitionNodes.get(virtualId).f2;// 递归调用addEdgeInternal(
                upStreamVertexID,
                downStreamVertexID,
                typeNumber,
                partitioner,
                outputNames,
                outputTag,
                exchangeMode,
                intermediateDataSetId);}else{// 创建边实例createActualEdge(
                upStreamVertexID,
                downStreamVertexID,
                typeNumber,
                partitioner,
                outputTag,
                exchangeMode,
                intermediateDataSetId);}}
createActualEdge

方法完成边的创建并将边添加到上下游节点中。

privatevoidcreateActualEdge(Integer upStreamVertexID,Integer downStreamVertexID,int typeNumber,StreamPartitioner<?> partitioner,OutputTag outputTag,StreamExchangeMode exchangeMode,IntermediateDataSetID intermediateDataSetId){StreamNode upstreamNode =getStreamNode(upStreamVertexID);StreamNode downstreamNode =getStreamNode(downStreamVertexID);// 设置数据分区
    partitioner =...// 算子之间的数据交换模式if(exchangeMode ==null){
        exchangeMode =StreamExchangeMode.UNDEFINED;}int uniqueId =getStreamEdges(upstreamNode.getId(), downstreamNode.getId()).size();// 生成边实例StreamEdge edge =newStreamEdge(
                    upstreamNode,
                    downstreamNode,
                    typeNumber,
                    partitioner,
                    outputTag,
                    exchangeMode,
                    uniqueId,
                    intermediateDataSetId);// 最后将生成的边分别添加到上游节点的List<StreamEdge> outEdges和当前节点的List<StreamEdge>getStreamNode(edge.getSourceId()).addOutEdge(edge);getStreamNode(edge.getTargetId()).addInEdge(edge);}

5. WordCount实例的StreamGraph

WordCount示例中,按照DataStream的转换流程将得到如下关系的Transformation信息。因此StreamGraph将由如下Transformation得到。
在这里插入图片描述
前文提到Transformation分为物理和虚拟两大类,物理类别将会生成节点,而虚拟类别将生成边。上述生成的5个Transformation中PartitionTransformation属于虚拟类别,而其余4个均数据物理类别。既然虚拟类别将生成边,那么其处理方式定然与其他4个节点有所不同。

5.1. 虚拟节点

在PartitionTransformationTranslator中

translateInternal

方法中,将调用StreamGraph中的

addVirtualPartitionNode

方法,将PartitionTransformation加入到表示虚拟节点集合中。并没有生成节点的操作。

privateCollection<Integer>translateInternal(finalPartitionTransformation<OUT> transformation,finalContext context,boolean supportsBatchExchange){checkNotNull(transformation);checkNotNull(context);finalStreamGraph streamGraph = context.getStreamGraph();// 上游Transformation,在本示例中为OneInputTransformation,tId=2finalTransformation<?> input =...List<Integer> resultIds =newArrayList<>();StreamExchangeMode exchangeMode =...;for(Integer inputId : context.getStreamNodeIds(input)){// 当前作业中已生成5个Transformation实例,因此下一个自增id为6finalint virtualId =Transformation.getNewNodeId();// 加入虚拟节点集合中
        streamGraph.addVirtualPartitionNode(// inputId即上游id=2,virtualId=6
                inputId, virtualId, transformation.getPartitioner(), exchangeMode);
        resultIds.add(virtualId);}// 最后将新生成的ids返回return resultIds;}// StreamGraph中的addVirtualPartitionNode方法publicvoidaddVirtualPartitionNode(Integer originalId,Integer virtualId,StreamPartitioner<?> partitioner,StreamExchangeMode exchangeMode){
    virtualPartitionNodes.put(virtualId,newTuple3<>(originalId, partitioner, exchangeMode));}

处理完成PartitionTransformation之后,StreamGraph实例中的虚拟节点集合中

Map<Integer, Tuple3<Integer, StreamPartitioner<?>, StreamExchangeMode>> virtualPartitionNodes

中便存在了元素。
接下来处理ReduceTransformation,其上游节点是虚拟节点,因此在生成边时,在

addEdgeInternal

方法中将会执行上游节点是虚拟节点时得逻辑分支。

还记得前面提到的

addEdgeInternal

方法中存在3个逻辑判断吗?

privatevoidaddEdgeInternal(Integer upStreamVertexID,Integer downStreamVertexID,int typeNumber,StreamPartitioner<?> partitioner,List<String> outputNames,OutputTag outputTag,StreamExchangeMode exchangeMode,IntermediateDataSetID intermediateDataSetId){if(virtualSideOutputNodes.containsKey(upStreamVertexID)){// 上游节点是旁路输出节点时// ...}elseif(virtualPartitionNodes.containsKey(upStreamVertexID)){// 上游节点是虚拟节点时// 本实例中ReduceTransformation的上游节点为虚拟节点,因此将会执行这段逻辑。int virtualId = upStreamVertexID;// 6,为什么是6在介绍PartitionTransformationTranslator处理逻辑时有解释// 上游(虚拟)节点的父节点
        upStreamVertexID = virtualPartitionNodes.get(virtualId).f0;// 2,即OneInputTransformation的idif(partitioner ==null){// 获取了虚拟节点的partitioner
            partitioner = virtualPartitionNodes.get(virtualId).f1;}// 获取了虚拟节点的exchangeMode
        exchangeMode = virtualPartitionNodes.get(virtualId).f2;// 递归调用时,PartitionTransformation从上下游中消失了,仅仅从PartitionTransformation中获取了partitioner和exchangeMode信息。addEdgeInternal(
                upStreamVertexID,// 2
                downStreamVertexID,// 4
                typeNumber,
                partitioner,
                outputNames,
                outputTag,
                exchangeMode,
                intermediateDataSetId);}else{// 生成边信息// ...}}

原始的Transformation关系中,ReduceTransformation的上游是PartitionTransformationT(tId=3),从前面PartitionTransformationTranslator处理逻辑中已知,PartitionTransformation并未真正生成节点,而是加入到了表示虚拟节点集合中,因此获PartitionTransformation的上游节点即OneInputTransformation(tId=2),作为ReduceTransformation在StreamGraph的父节点。

最终得到的StreamGraph示意图如下图所示(省略并行度信息)。
在这里插入图片描述
当作业中存在旁路输出时,处理方式与虚拟节点类似,不在赘述。

6. 一点理解

试着理解下为什么要将Transformation转成StreamGraph?
最初设计者的设计和初衷不得而知,以下纯粹个人理解。

Transformation到StreamGraph转换可以看作是链表结构到图结构的转换。

Transformation是类似于单向链表的结构,并且还是指向上游的逆向链表,从其中任何一个Transformation开始只能获取其上游数据。必须遍历全部的Transformation实例后,才能得到完成的作业信息。
Transformation结构中和上游是嵌套关系,这样多个实例中都最终指向同一个上游,处理关系时存在冗余。
但是Transformation的好处是生成方便。每次DataStream转换时,十分清楚的知道上游是谁,直接将上游实例传递到当前实例中即可。

StreamGraph是图的结构。可以使用图的处理方式快速处理节点关系。同时也更接近最终的作业执行拓扑结构。

标签: flink 大数据

本文转载自: https://blog.csdn.net/qq_33446500/article/details/139679000
版权归原作者 有数的编程笔记 所有, 如有侵权,请联系我们删除。

“Flink作业执行之 3.StreamGraph”的评论:

还没有评论