文章目录
概要
在 Flink 中,StreamGraph 是数据流的逻辑表示,它描述了如何在 Flink 作业中执行数据流转换。StreamGraph 是 Flink 运行时生成执行计划的基础。
使用 DataStream API 开发的应用程序,首先被转换为 Transformation,再被映射为 StreamGraph;随后在客户端完成 StreamGraph 到 JobGraph 的转换。JobGraph 提交到 Flink 集群后,由集群负责将 JobGraph 转换为 ExecutionGraph,之后进入调度执行阶段。
StreamGraph 核心对象
- StreamNode:StreamNode 是 StreamGraph 中的节点,从 Transformation 转换而来,可以简单理解为一个 StreamNode 表示一个算子。从逻辑上说,StreamGraph 中的 StreamNode 分为实体和虚拟两种。StreamNode 可以有多个输入,也可以有多个输出。实体的 StreamNode 最终会变成物理算子,虚拟的 StreamNode 则会附着在 StreamEdge 上。
- StreamEdge:StreamEdge 是 StreamGraph 中的边,用来连接两个 StreamNode。一个 StreamNode 可以有多条出边和入边;StreamEdge 中包含了旁路输出、分区器、字段筛选输出等信息。
StreamGraph 生成过程
StreamGraph 在 Flink 客户端(FlinkClient)中生成:客户端在提交作业时调用 Flink 应用的 main 方法,用户编写的业务逻辑被组装成 Transformation 流水线;最后调用 StreamExecutionEnvironment.execute() 时,开始触发 StreamGraph 的构建。
StreamGraph 在 Flink 作业提交之前生成,生成 StreamGraph 的入口是 StreamExecutionEnvironment 中的 getStreamGraph() 方法:
@InternalpublicStreamGraphgetStreamGraph(){returnthis.getStreamGraph(this.getJobName());}@InternalpublicStreamGraphgetStreamGraph(String jobName){returnthis.getStreamGraph(jobName,true);}@InternalpublicStreamGraphgetStreamGraph(String jobName,boolean clearTransformations){StreamGraph streamGraph =this.getStreamGraphGenerator().setJobName(jobName).generate();if(clearTransformations){this.transformations.clear();}return streamGraph;}privateStreamGraphGeneratorgetStreamGraphGenerator(){if(this.transformations.size()<=0){thrownewIllegalStateException("No operators defined in streaming topology. Cannot execute.");}else{RuntimeExecutionMode executionMode =(RuntimeExecutionMode)this.configuration.get(ExecutionOptions.RUNTIME_MODE);return(newStreamGraphGenerator(this.transformations,this.config,this.checkpointCfg,this.getConfiguration())).setRuntimeExecutionMode(executionMode).setStateBackend(this.defaultStateBackend).setChaining(this.isChainingEnabled).setUserArtifacts(this.cacheFile).setTimeCharacteristic(this.timeCharacteristic).setDefaultBufferTimeout(this.bufferTimeout);}}
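StreamGraph 实际上是在 StreamGraphGenerator 中生成的:从 SinkTransformation(输出端)开始,沿输入向前追溯到 SourceTransformation(输入端)。生成过程一边遍历一边构建 StreamGraph:对每个 Transformation,先递归转换它的上游输入(见 getParentInputIds),再交给对应的 TransformationTranslator 处理;已经转换过的 Transformation 会记录在 alreadyTransformed 中,避免重复转换。具体如下面的代码清单所示: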
/**
 * Translates a list of {@link Transformation}s into a {@link StreamGraph}.
 *
 * <p>Each transformation is looked up in {@code translatorMap} by its exact runtime class. If a
 * translator is registered, the transformation's inputs are transformed first and the translator
 * is then invoked. Otherwise the transformation goes through {@link #legacyTransform}, which
 * supports only feedback (iteration) transformations. Results are memoized in
 * {@code alreadyTransformed}, so each transformation is translated at most once per
 * {@link #generate()} call.
 *
 * <p>NOTE(review): this listing appears to be a decompiled, abbreviated excerpt.
 * <ul>
 *   <li>The constructor assigns fields that are not declared here: chaining, timeCharacteristic,
 *       jobName, savepointRestoreSettings, defaultBufferTimeout, runtimeExecutionMode,
 *       executionConfig, checkpointConfig, configuration.</li>
 *   <li>The methods use members that are not shown: LOG, shouldExecuteInBatchMode,
 *       configureStreamGraph, getNewIterationNodeId, ContextImpl, DEFAULT_TIME_CHARACTERISTIC.</li>
 * </ul>
 */
@Internal
public class StreamGraphGenerator {
    // The transformations to translate; the constructor stores the caller's list reference (no copy).
    private final List<Transformation<?>> transformations;
    private StateBackend stateBackend;
    // Transformation class -> translator handling it; filled in the static initializer below.
    private static final Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>> translatorMap;
    // Reset to 0 in the static initializer; presumably advanced by getNewIterationNodeId() (not shown).
    protected static Integer iterationIdCounter;
    // The graph under construction; only non-null while generate() is running.
    private StreamGraph streamGraph;
    // Memo: transformation -> ids returned for it; only non-null while generate() is running.
    private Map<Transformation<?>, Collection<Integer>> alreadyTransformed;

    /** Creates a generator that uses an empty {@link Configuration}. */
    public StreamGraphGenerator(List<Transformation<?>> transformations, ExecutionConfig executionConfig, CheckpointConfig checkpointConfig) {
        this(transformations, executionConfig, checkpointConfig, new Configuration());
    }

    /**
     * Initializes the default settings and stores the given inputs.
     *
     * <p>Defaults: chaining enabled, job name "Flink Streaming Job", no savepoint restore, default
     * buffer timeout -1, STREAMING runtime mode.
     *
     * @throws NullPointerException (via Preconditions.checkNotNull) if transformations,
     *     executionConfig or configuration is null
     */
    public StreamGraphGenerator(List<Transformation<?>> transformations, ExecutionConfig executionConfig, CheckpointConfig checkpointConfig, ReadableConfig configuration) {
        this.chaining = true;
        this.timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC;
        this.jobName = "Flink Streaming Job";
        this.savepointRestoreSettings = SavepointRestoreSettings.none();
        this.defaultBufferTimeout = -1L;
        this.runtimeExecutionMode = RuntimeExecutionMode.STREAMING;
        this.transformations = (List) Preconditions.checkNotNull(transformations);
        this.executionConfig = (ExecutionConfig) Preconditions.checkNotNull(executionConfig);
        // The checkpoint config is copied rather than shared.
        this.checkpointConfig = new CheckpointConfig(checkpointConfig);
        this.configuration = (ReadableConfig) Preconditions.checkNotNull(configuration);
    }

    /**
     * Builds a new {@link StreamGraph} by transforming every entry of {@code transformations} in
     * list order.
     *
     * <p>Per-run state ({@code streamGraph}, {@code alreadyTransformed}) is cleared before
     * returning, so the generator does not keep a reference to the returned graph.
     *
     * @return the generated stream graph
     */
    public StreamGraph generate() {
        this.streamGraph = new StreamGraph(this.executionConfig, this.checkpointConfig, this.savepointRestoreSettings);
        this.shouldExecuteInBatchMode = this.shouldExecuteInBatchMode(this.runtimeExecutionMode);
        this.configureStreamGraph(this.streamGraph);
        this.alreadyTransformed = new HashMap();
        Iterator var1 = this.transformations.iterator();
        while (var1.hasNext()) {
            Transformation<?> transformation = (Transformation) var1.next();
            this.transform(transformation);
        }
        StreamGraph builtStreamGraph = this.streamGraph;
        this.alreadyTransformed.clear();
        this.alreadyTransformed = null;
        this.streamGraph = null;
        return builtStreamGraph;
    }

    /**
     * Transforms one transformation (and, recursively, its inputs), memoizing the result.
     *
     * @param transform the transformation to translate
     * @return the ids produced for this transformation, or the cached ids if it was already
     *     transformed
     * @throws IllegalStateException from legacyTransform for unsupported transformation types
     */
    private Collection<Integer> transform(Transformation<?> transform) {
        if (this.alreadyTransformed.containsKey(transform)) {
            return (Collection) this.alreadyTransformed.get(transform);
        } else {
            LOG.debug("Transforming " + transform);
            // Inherit the global max parallelism when none is set on the transformation and the
            // execution config provides a positive value.
            if (transform.getMaxParallelism() <= 0) {
                int globalMaxParallelismFromConfig = this.executionConfig.getMaxParallelism();
                if (globalMaxParallelismFromConfig > 0) {
                    transform.setMaxParallelism(globalMaxParallelismFromConfig);
                }
            }
            // Return value is discarded; presumably called so type-information problems surface
            // here, before translation. TODO confirm.
            transform.getOutputType();
            // Lookup is by exact runtime class, so subclasses of a registered type are not matched.
            TransformationTranslator<?, Transformation<?>> translator = (TransformationTranslator) translatorMap.get(transform.getClass());
            Collection transformedIds;
            if (translator != null) {
                transformedIds = this.translate(translator, transform);
            } else {
                transformedIds = this.legacyTransform(transform);
            }
            // The recursive calls above (e.g. iteration handling) may already have registered this
            // transformation; keep that earlier entry.
            if (!this.alreadyTransformed.containsKey(transform)) {
                this.alreadyTransformed.put(transform, transformedIds);
            }
            return transformedIds;
        }
    }

    /**
     * Handles transformations that have no registered translator.
     *
     * <p>Only FeedbackTransformation and CoFeedbackTransformation are supported. After the node(s)
     * are created, this method applies to the transformation's id, in this order:
     * <ol>
     *   <li>buffer timeout (own value if >= 0, else the default)</li>
     *   <li>UID and user-provided hash</li>
     *   <li>the auto-generated-UID check</li>
     *   <li>resources</li>
     *   <li>managed-memory weights</li>
     * </ol>
     *
     * @throws IllegalStateException for any other transformation type, or when auto-generated
     *     UIDs are disabled and a PhysicalTransformation has neither a UID nor a hash
     */
    private Collection<Integer> legacyTransform(Transformation<?> transform) {
        Collection transformedIds;
        if (transform instanceof FeedbackTransformation) {
            transformedIds = this.transformFeedback((FeedbackTransformation) transform);
        } else {
            if (!(transform instanceof CoFeedbackTransformation)) {
                throw new IllegalStateException("Unknown transformation: " + transform);
            }
            transformedIds = this.transformCoFeedback((CoFeedbackTransformation) transform);
        }
        if (transform.getBufferTimeout() >= 0L) {
            this.streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
        } else {
            this.streamGraph.setBufferTimeout(transform.getId(), this.defaultBufferTimeout);
        }
        if (transform.getUid() != null) {
            this.streamGraph.setTransformationUID(transform.getId(), transform.getUid());
        }
        if (transform.getUserProvidedNodeHash() != null) {
            this.streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
        }
        // NOTE(review): this check runs after the graph has already been modified above.
        if (!this.streamGraph.getExecutionConfig().hasAutoGeneratedUIDsEnabled()
                && transform instanceof PhysicalTransformation
                && transform.getUserProvidedNodeHash() == null
                && transform.getUid() == null) {
            throw new IllegalStateException("Auto generated UIDs have been disabled but no UID or hash has been assigned to operator " + transform.getName());
        } else {
            if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
                this.streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
            }
            this.streamGraph.setManagedMemoryUseCaseWeights(transform.getId(), transform.getManagedMemoryOperatorScopeUseCaseWeights(), transform.getManagedMemorySlotScopeUseCases());
            return transformedIds;
        }
    }

    /**
     * Translates an iteration head: transforms its single input, creates an iteration
     * source/sink node pair, and wires every feedback edge into the iteration sink.
     *
     * @return the input's ids plus the iteration source id
     * @throws UnsupportedOperationException in BATCH execution mode
     * @throws IllegalStateException if the iteration has no feedback edges or not exactly one input
     */
    private <T> Collection<Integer> transformFeedback(FeedbackTransformation<T> iterate) {
        if (this.shouldExecuteInBatchMode) {
            throw new UnsupportedOperationException("Iterations are not supported in BATCH execution mode. If you want to execute such a pipeline, please set the '" + ExecutionOptions.RUNTIME_MODE.key() + "'=" + RuntimeExecutionMode.STREAMING.name());
        } else if (iterate.getFeedbackEdges().size() <= 0) {
            throw new IllegalStateException("Iteration " + iterate + " does not have any feedback edges.");
        } else {
            List<Transformation<?>> inputs = iterate.getInputs();
            Preconditions.checkState(inputs.size() == 1);
            Transformation<?> input = (Transformation) inputs.get(0);
            List<Integer> resultIds = new ArrayList();
            Collection<Integer> inputIds = this.transform(input);
            resultIds.addAll(inputIds);
            // Transforming the input may already have reached this iteration; reuse that result.
            if (this.alreadyTransformed.containsKey(iterate)) {
                return (Collection) this.alreadyTransformed.get(iterate);
            } else {
                Tuple2<StreamNode, StreamNode> itSourceAndSink = this.streamGraph.createIterationSourceAndSink(
                        iterate.getId(), getNewIterationNodeId(), getNewIterationNodeId(), iterate.getWaitTime(),
                        iterate.getParallelism(), iterate.getMaxParallelism(), iterate.getMinResources(), iterate.getPreferredResources());
                StreamNode itSource = (StreamNode) itSourceAndSink.f0;
                StreamNode itSink = (StreamNode) itSourceAndSink.f1;
                // Source: output serializer only. Sink: input serializer only.
                this.streamGraph.setSerializers(itSource.getId(), (TypeSerializer) null, (TypeSerializer) null, iterate.getOutputType().createSerializer(this.executionConfig));
                this.streamGraph.setSerializers(itSink.getId(), iterate.getOutputType().createSerializer(this.executionConfig), (TypeSerializer) null, (TypeSerializer) null);
                resultIds.add(itSource.getId());
                // Register before transforming the feedback edges. A feedback path that leads back
                // to this iteration then hits the memo in transform() instead of recursing again.
                this.alreadyTransformed.put(iterate, resultIds);
                List<Integer> allFeedbackIds = new ArrayList();
                Iterator var10 = iterate.getFeedbackEdges().iterator();
                while (var10.hasNext()) {
                    Transformation<T> feedbackEdge = (Transformation) var10.next();
                    Collection<Integer> feedbackIds = this.transform(feedbackEdge);
                    allFeedbackIds.addAll(feedbackIds);
                    Iterator var13 = feedbackIds.iterator();
                    while (var13.hasNext()) {
                        Integer feedbackId = (Integer) var13.next();
                        this.streamGraph.addEdge(feedbackId, itSink.getId(), 0);
                    }
                }
                String slotSharingGroup = this.determineSlotSharingGroup((String) null, allFeedbackIds);
                // NOTE(review): with a null specifiedGroup, determineSlotSharingGroup as listed
                // below never returns null, so this fallback appears unreachable.
                if (slotSharingGroup == null) {
                    slotSharingGroup = "SlotSharingGroup-" + iterate.getId();
                }
                // Source and sink share a slot sharing group.
                itSink.setSlotSharingGroup(slotSharingGroup);
                itSource.setSlotSharingGroup(slotSharingGroup);
                return resultIds;
            }
        }
    }

    /**
     * Translates a co-iteration: creates an iteration source/sink node pair and wires every
     * feedback edge into the iteration sink.
     *
     * <p>Differences from transformFeedback, as shown in this listing:
     * <ul>
     *   <li>no input is transformed;</li>
     *   <li>an empty feedback-edge list is not rejected;</li>
     *   <li>only the iteration source id is returned.</li>
     * </ul>
     *
     * @return a singleton containing the iteration source id
     * @throws UnsupportedOperationException in BATCH execution mode
     */
    private <F> Collection<Integer> transformCoFeedback(CoFeedbackTransformation<F> coIterate) {
        if (this.shouldExecuteInBatchMode) {
            throw new UnsupportedOperationException("Iterations are not supported in BATCH execution mode. If you want to execute such a pipeline, please set the '" + ExecutionOptions.RUNTIME_MODE.key() + "'=" + RuntimeExecutionMode.STREAMING.name());
        } else {
            Tuple2<StreamNode, StreamNode> itSourceAndSink = this.streamGraph.createIterationSourceAndSink(
                    coIterate.getId(), getNewIterationNodeId(), getNewIterationNodeId(), coIterate.getWaitTime(),
                    coIterate.getParallelism(), coIterate.getMaxParallelism(), coIterate.getMinResources(), coIterate.getPreferredResources());
            StreamNode itSource = (StreamNode) itSourceAndSink.f0;
            StreamNode itSink = (StreamNode) itSourceAndSink.f1;
            this.streamGraph.setSerializers(itSource.getId(), (TypeSerializer) null, (TypeSerializer) null, coIterate.getOutputType().createSerializer(this.executionConfig));
            this.streamGraph.setSerializers(itSink.getId(), coIterate.getOutputType().createSerializer(this.executionConfig), (TypeSerializer) null, (TypeSerializer) null);
            Collection<Integer> resultIds = Collections.singleton(itSource.getId());
            // Registered before the feedback edges are transformed (same reason as in transformFeedback).
            this.alreadyTransformed.put(coIterate, resultIds);
            List<Integer> allFeedbackIds = new ArrayList();
            Iterator var7 = coIterate.getFeedbackEdges().iterator();
            while (var7.hasNext()) {
                Transformation<F> feedbackEdge = (Transformation) var7.next();
                Collection<Integer> feedbackIds = this.transform(feedbackEdge);
                allFeedbackIds.addAll(feedbackIds);
                Iterator var10 = feedbackIds.iterator();
                while (var10.hasNext()) {
                    Integer feedbackId = (Integer) var10.next();
                    this.streamGraph.addEdge(feedbackId, itSink.getId(), 0);
                }
            }
            String slotSharingGroup = this.determineSlotSharingGroup((String) null, allFeedbackIds);
            itSink.setSlotSharingGroup(slotSharingGroup);
            itSource.setSlotSharingGroup(slotSharingGroup);
            return Collections.singleton(itSource.getId());
        }
    }

    /**
     * Transforms all inputs first (depth-first), then invokes the translator for this
     * transformation. Uses the batch variant when shouldExecuteInBatchMode is set, otherwise the
     * streaming variant.
     *
     * @return the ids returned by the translator, or the cached ids if transforming the inputs
     *     already produced an entry for this transformation
     * @throws NullPointerException if translator or transform is null
     */
    private Collection<Integer> translate(TransformationTranslator<?, Transformation<?>> translator, Transformation<?> transform) {
        Preconditions.checkNotNull(translator);
        Preconditions.checkNotNull(transform);
        List<Collection<Integer>> allInputIds = this.getParentInputIds(transform.getInputs());
        if (this.alreadyTransformed.containsKey(transform)) {
            return (Collection) this.alreadyTransformed.get(transform);
        } else {
            // Slot sharing group: explicitly set group, otherwise derived from all input ids.
            String slotSharingGroup = this.determineSlotSharingGroup(transform.getSlotSharingGroup(), (Collection) allInputIds.stream().flatMap(Collection::stream).collect(Collectors.toList()));
            Context context = new StreamGraphGenerator.ContextImpl(this, this.streamGraph, slotSharingGroup, this.configuration);
            return this.shouldExecuteInBatchMode ? translator.translateForBatch(transform, context) : translator.translateForStreaming(transform, context);
        }
    }

    /**
     * Transforms each parent transformation in iteration order.
     *
     * @param parentTransformations the inputs; may be null
     * @return one id collection per parent, or an empty list when the argument is null
     */
    private List<Collection<Integer>> getParentInputIds(@Nullable Collection<Transformation<?>> parentTransformations) {
        List<Collection<Integer>> allInputIds = new ArrayList();
        if (parentTransformations == null) {
            return allInputIds;
        } else {
            Iterator var3 = parentTransformations.iterator();
            while (var3.hasNext()) {
                Transformation<?> transformation = (Transformation) var3.next();
                allInputIds.add(this.transform(transformation));
            }
            return allInputIds;
        }
    }

    /**
     * Chooses a slot sharing group.
     *
     * <p>Returns {@code specifiedGroup} if it is non-null. Otherwise it walks {@code inputIds}:
     * <ul>
     *   <li>the first non-null group found becomes the candidate;</li>
     *   <li>if a later input reports a different group (including null), returns "default";</li>
     *   <li>if no input has a non-null group, also returns "default".</li>
     * </ul>
     */
    private String determineSlotSharingGroup(String specifiedGroup, Collection<Integer> inputIds) {
        if (specifiedGroup != null) {
            return specifiedGroup;
        } else {
            String inputGroup = null;
            Iterator var4 = inputIds.iterator();
            while (var4.hasNext()) {
                int id = (Integer) var4.next();
                String inputGroupCandidate = this.streamGraph.getSlotSharingGroup(id);
                if (inputGroup == null) {
                    inputGroup = inputGroupCandidate;
                } else if (!inputGroup.equals(inputGroupCandidate)) {
                    return "default";
                }
            }
            return inputGroup == null ? "default" : inputGroup;
        }
    }

    // Registers the built-in translators. FeedbackTransformation and CoFeedbackTransformation have
    // no entry here, so transform() routes them to legacyTransform().
    static {
        DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
        Map<Class<? extends Transformation>, TransformationTranslator<?, ? extends Transformation>> tmp = new HashMap();
        tmp.put(OneInputTransformation.class, new OneInputTransformationTranslator());
        tmp.put(TwoInputTransformation.class, new TwoInputTransformationTranslator());
        tmp.put(MultipleInputTransformation.class, new MultiInputTransformationTranslator());
        tmp.put(KeyedMultipleInputTransformation.class, new MultiInputTransformationTranslator());
        tmp.put(SourceTransformation.class, new SourceTransformationTranslator());
        tmp.put(SinkTransformation.class, new SinkTransformationTranslator());
        tmp.put(LegacySinkTransformation.class, new LegacySinkTransformationTranslator());
        tmp.put(LegacySourceTransformation.class, new LegacySourceTransformationTranslator());
        tmp.put(UnionTransformation.class, new UnionTransformationTranslator());
        tmp.put(PartitionTransformation.class, new PartitionTransformationTranslator());
        tmp.put(SideOutputTransformation.class, new SideOutputTransformationTranslator());
        tmp.put(ReduceTransformation.class, new ReduceTransformationTranslator());
        tmp.put(TimestampsAndWatermarksTransformation.class, new TimestampsAndWatermarksTransformationTranslator());
        tmp.put(BroadcastStateTransformation.class, new BroadcastStateTransformationTranslator());
        tmp.put(KeyedBroadcastStateTransformation.class, new KeyedBroadcastStateTransformationTranslator());
        translatorMap = Collections.unmodifiableMap(tmp);
        iterationIdCounter = 0;
    }
}
版权归原作者 shandongwill 所有, 如有侵权,请联系我们删除。