Deep Dive into Flink StreamGraph: The Secrets of Building a Stream-Processing Topology


Author: 不正经小新 (CSDN Rising Star Creator in the backend field | Alibaba Cloud Expert Blogger)


Flink StreamGraph


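What is StreamGraph

StreamGraph is the class that represents a streaming topology. It contains all the information needed to build the job graph for execution.

Put simply, it is the graph generated directly from the code we write, and it represents the topology of the program.

The StreamGraph class diagram shows that this class holds everything needed to run the job, such as the state backend, the JobType (streaming or batch), and the checkpoint configuration. Today we stay at the high level and do not go into every detail: the big picture first, then a gradual deeper dive~🥹🥹🥹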

[Figure: StreamGraph class diagram]

Reading the code

Code listing: getStreamGraph method

    public StreamGraph getStreamGraph() {
        return getStreamGraph(true);
    }

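As you can see, a default argument of true is passed here. Its job is to clear the previously registered transformations.

Why clear them? To prevent the same operations from being executed again when execute is called multiple times.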
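To make the effect of the flag concrete, here is a minimal toy sketch in plain Java. It does not use Flink: ToyEnvironment, addOperator, registeredCount and the list-based "graph" are hypothetical stand-ins assumed only for illustration. It shows that building the graph with true empties the registered list, so a second build does not see the same operations again.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an execution environment; this is not Flink's API.
class ToyEnvironment {
    private final List<String> transformations = new ArrayList<>();

    void addOperator(String name) {
        transformations.add(name);
    }

    // Same shape as getStreamGraph(): delegate with clearTransformations = true.
    List<String> getStreamGraph() {
        return getStreamGraph(true);
    }

    // Builds a "graph" (here just a copy of the list) and optionally clears the registry.
    List<String> getStreamGraph(boolean clearTransformations) {
        List<String> graph = new ArrayList<>(transformations);
        if (clearTransformations) {
            transformations.clear();
        }
        return graph;
    }

    int registeredCount() {
        return transformations.size();
    }

    public static void main(String[] args) {
        ToyEnvironment env = new ToyEnvironment();
        env.addOperator("source");
        env.addOperator("map");
        System.out.println(env.getStreamGraph());  // [source, map]
        System.out.println(env.registeredCount()); // 0, the registry was cleared
    }
}
```

With clearTransformations set to false, the same operators would still be registered and would show up again in the next graph.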

Code listing: getStreamGraphGenerator method

    private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> transformations) {
        if (transformations.size() <= 0) {
            throw new IllegalStateException(
                    "No operators defined in streaming topology. Cannot execute.");
        }

        // Synchronize the cached file to config option PipelineOptions.CACHED_FILES because the
        // field cachedFile haven't been migrated to configuration.
        if (!getCachedFiles().isEmpty()) {
            configuration.set(
                    PipelineOptions.CACHED_FILES,
                    DistributedCache.parseStringFromCachedFiles(getCachedFiles()));
        }

        // We copy the transformation so that newly added transformations cannot intervene with the
        // stream graph generation.
        return new StreamGraphGenerator(
                        new ArrayList<>(transformations), config, checkpointCfg, configuration)
                .setStateBackend(defaultStateBackend)
                .setTimeCharacteristic(getStreamTimeCharacteristic())
                .setSlotSharingGroupResource(slotSharingGroupResources);
    }

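getStreamGraphGenerator creates a StreamGraphGenerator object, which is then used to build the stream-processing topology graph.

Here is what the method does:

  • Check whether the transformations list is empty: if it is, an exception is thrown right away (easy to demonstrate)
  • Synchronize cached files into the configuration: if there are cached files, they are written into the PipelineOptions.CACHED_FILES option. This is needed because the cachedFile field has not yet been migrated to the configuration. (Call it a small optimization~)
  • Create and return the StreamGraphGenerator object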
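The first and last steps above can be sketched in plain Java. This is a simplified illustration, not Flink code: ToyGraphGenerator and ToyGeneratorFactory are hypothetical names, and transformations are reduced to strings. It shows the empty-topology check and why the generator receives a copy of the list rather than the live list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified generator; it only keeps the list it was given.
class ToyGraphGenerator {
    final List<String> transformations;

    ToyGraphGenerator(List<String> transformations) {
        this.transformations = transformations;
    }
}

class ToyGeneratorFactory {
    // Reject an empty topology, then hand over a copy instead of the live list.
    static ToyGraphGenerator create(List<String> registered) {
        if (registered.isEmpty()) {
            throw new IllegalStateException(
                    "No operators defined in streaming topology. Cannot execute.");
        }
        return new ToyGraphGenerator(new ArrayList<>(registered));
    }

    public static void main(String[] args) {
        List<String> registered = new ArrayList<>(Arrays.asList("source", "map"));
        ToyGraphGenerator generator = create(registered);

        // Added after the generator was created: the generator's copy is unaffected.
        registered.add("sink");
        System.out.println(generator.transformations); // [source, map]

        try {
            create(new ArrayList<>());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The copy is what keeps later additions to the environment from interfering with a graph generation that is already under way.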

Code listing: generate()

    public StreamGraph generate() {
        // Pass in the configuration
        streamGraph =
                new StreamGraph(
                        configuration, executionConfig, checkpointConfig, savepointRestoreSettings);
        // Decide whether to run in batch mode
        shouldExecuteInBatchMode = shouldExecuteInBatchMode();
        // Configure the StreamGraph
        configureStreamGraph(streamGraph);

        // Transformations that have already been transformed
        alreadyTransformed = new IdentityHashMap<>();

        // Iterate over all transformations
        for (Transformation<?> transformation : transformations) {
            transform(transformation);
        }

        // Set the slot sharing group resources
        streamGraph.setSlotSharingGroupResource(slotSharingGroupResources);

        setFineGrainedGlobalStreamExchangeMode(streamGraph);

        // Convert to a LineageGraph
        LineageGraph lineageGraph = LineageGraphUtils.convertToLineageGraph(transformations);
        streamGraph.setLineageGraph(lineageGraph);

        // Iterate over all StreamNodes in the streamGraph and check whether any input edge
        // of a node requires unaligned checkpoints to be disabled. If so, set
        // supportsUnalignedCheckpoints to false on all input edges of that node.
        for (StreamNode node : streamGraph.getStreamNodes()) {
            if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) {
                for (StreamEdge edge : node.getInEdges()) {
                    edge.setSupportsUnalignedCheckpoints(false);
                }
            }
        }

        // Clean up and return
        final StreamGraph builtStreamGraph = streamGraph;

        alreadyTransformed.clear();
        alreadyTransformed = null;
        streamGraph = null;

        return builtStreamGraph;
    }
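The unaligned-checkpoint loop near the end of generate() follows an "any match disables all" rule per node. Below is a small plain-Java sketch of just that rule. ToyInEdge and ToyUnalignedCheck are hypothetical names, and the predicate is passed in as a parameter because the real condition inside shouldDisableUnalignedCheckpointing is not shown in the listing.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical edge with only the flag that matters for this sketch.
class ToyInEdge {
    final String name;
    boolean supportsUnalignedCheckpoints = true;

    ToyInEdge(String name) {
        this.name = name;
    }
}

class ToyUnalignedCheck {
    // For each node: if any input edge matches, disable the flag on all of its input edges.
    static void apply(List<List<ToyInEdge>> inEdgesPerNode, Predicate<ToyInEdge> shouldDisable) {
        for (List<ToyInEdge> inEdges : inEdgesPerNode) {
            if (inEdges.stream().anyMatch(shouldDisable)) {
                for (ToyInEdge edge : inEdges) {
                    edge.supportsUnalignedCheckpoints = false;
                }
            }
        }
    }

    public static void main(String[] args) {
        ToyInEdge a1 = new ToyInEdge("needs-aligned");
        ToyInEdge a2 = new ToyInEdge("plain");
        ToyInEdge b1 = new ToyInEdge("plain");

        // Node A has two input edges, node B has one.
        apply(
                Arrays.asList(Arrays.asList(a1, a2), Arrays.asList(b1)),
                edge -> edge.name.equals("needs-aligned"));

        System.out.println(a2.supportsUnalignedCheckpoints); // false, same node as a1
        System.out.println(b1.supportsUnalignedCheckpoints); // true, node B is untouched
    }
}
```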

Next, let's focus on the transform method.

Before reading it, let's take a look at which elements the transformations list contains.

[Figure: the elements of the transformations list]

Code listing: transform

    private Collection<Integer> transform(Transformation<?> transform) {
        // If it has already been transformed, return the cached result directly
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        LOG.debug("Transforming " + transform);

        if (transform.getMaxParallelism() <= 0) {
            // if the max parallelism hasn't been set, then first use the job wide max parallelism
            // from the ExecutionConfig.
            int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
            if (globalMaxParallelismFromConfig > 0) {
                transform.setMaxParallelism(globalMaxParallelismFromConfig);
            }
        }

        // (some code omitted)

        // call at least once to trigger exceptions about MissingTypeInfo
        transform.getOutputType();

        // Look up the translator implementation for this transform's class
        @SuppressWarnings("unchecked")
        final TransformationTranslator<?, Transformation<?>> translator =
                (TransformationTranslator<?, Transformation<?>>)
                        translatorMap.get(transform.getClass());

        Collection<Integer> transformedIds;
        if (translator != null) {
            // Take a different path depending on the concrete type of the transform
            transformedIds = translate(translator, transform);
        } else {
            transformedIds = legacyTransform(transform);
        }

        // need this check because the iterate transformation adds itself before
        // transforming the feedback edges
        if (!alreadyTransformed.containsKey(transform)) {
            alreadyTransformed.put(transform, transformedIds);
        }

        return transformedIds;
    }
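The core pattern in transform is a memoized, parents-first recursion. The sketch below reproduces only that pattern in plain Java; ToyTransformation, ToyTranslator and createdNodes are hypothetical names, and the translator lookup is left out. It assumes each transformation yields exactly one node ID, which is a simplification.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical toy transformation: an id plus its upstream inputs.
class ToyTransformation {
    final int id;
    final List<ToyTransformation> inputs;

    ToyTransformation(int id, ToyTransformation... inputs) {
        this.id = id;
        this.inputs = Arrays.asList(inputs);
    }
}

class ToyTranslator {
    // Keyed by object identity, like the IdentityHashMap created in generate().
    private final Map<ToyTransformation, Collection<Integer>> alreadyTransformed =
            new IdentityHashMap<>();

    // Records the order in which nodes are actually created.
    final List<Integer> createdNodes = new ArrayList<>();

    Collection<Integer> transform(ToyTransformation t) {
        if (alreadyTransformed.containsKey(t)) {
            return alreadyTransformed.get(t);
        }
        // Parents first: the recursion makes sure upstream nodes exist before this one.
        for (ToyTransformation input : t.inputs) {
            transform(input);
        }
        createdNodes.add(t.id);
        Collection<Integer> ids = Collections.singleton(t.id);
        alreadyTransformed.put(t, ids);
        return ids;
    }

    public static void main(String[] args) {
        ToyTransformation source = new ToyTransformation(1);
        ToyTransformation map = new ToyTransformation(2, source);
        ToyTransformation sink = new ToyTransformation(3, map);

        ToyTranslator translator = new ToyTranslator();
        translator.transform(map);
        translator.transform(sink); // map is served from the cache here
        System.out.println(translator.createdNodes); // [1, 2, 3]
    }
}
```

Even though the source is never passed to transform directly, it is reached through the inputs of map, and each transformation is turned into a node only once.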

Now let's step into the translate method and see what happens.

Code listing: translate method. We start with the flow for SourceTransformation.

    private Collection<Integer> translate(
            final TransformationTranslator<?, Transformation<?>> translator,
            final Transformation<?> transform) {
        checkNotNull(translator);
        checkNotNull(transform);

        // Collect, for each parent Transformation, the set of node IDs it maps to
        // (this recursively calls transform where needed)
        final List<Collection<Integer>> allInputIds = getParentInputIds(transform.getInputs());

        // the recursive call might have already transformed this
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        final String slotSharingGroup =
                determineSlotSharingGroup(
                        transform.getSlotSharingGroup().isPresent()
                                ? transform.getSlotSharingGroup().get().getName()
                                : null,
                        allInputIds.stream()
                                .flatMap(Collection::stream)
                                .collect(Collectors.toList()));

        final TransformationTranslator.Context context =
                new ContextImpl(this, streamGraph, slotSharingGroup, configuration);

        return shouldExecuteInBatchMode
                ? translator.translateForBatch(transform, context)
                // Translate the given Transformation into its runtime implementation for
                // streaming execution mode and configure it
                : translator.translateForStreaming(transform, context);
    }

Following the breakpoint, we land in the translateInternal method of the SourceTransformationTranslator class.

Code listing: translateInternal method

    private Collection<Integer> translateInternal(
            final SourceTransformation<OUT, SplitT, EnumChkT> transformation,
            final Context context,
            boolean emitProgressiveWatermarks) {
        checkNotNull(transformation);
        checkNotNull(context);

        final StreamGraph streamGraph = context.getStreamGraph();
        final String slotSharingGroup = context.getSlotSharingGroup();
        final int transformationId = transformation.getId();
        final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();

        SourceOperatorFactory<OUT> operatorFactory =
                new SourceOperatorFactory<>(
                        transformation.getSource(),
                        transformation.getWatermarkStrategy(),
                        emitProgressiveWatermarks);

        // Set the chaining strategy, usually ALWAYS
        operatorFactory.setChainingStrategy(transformation.getChainingStrategy());
        operatorFactory.setCoordinatorListeningID(transformation.getCoordinatorListeningID());

        // Add the source
        streamGraph.addSource(
                transformationId,
                slotSharingGroup,
                transformation.getCoLocationGroupKey(),
                operatorFactory,
                null,
                transformation.getOutputType(),
                "Source: " + transformation.getName());

        // Resolve and set the parallelism
        final int parallelism =
                transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
                        ? transformation.getParallelism()
                        : executionConfig.getParallelism();

        streamGraph.setParallelism(
                transformationId, parallelism, transformation.isParallelismConfigured());
        streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());
        streamGraph.setSupportsConcurrentExecutionAttempts(
                transformationId, transformation.isSupportsConcurrentExecutionAttempts());

        return Collections.singleton(transformationId);
    }
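The parallelism fallback in this listing is a simple "use the operator's own value unless it is unset" rule. Here is a tiny plain-Java sketch of it; ToyParallelism and resolve are hypothetical names, and the sentinel value -1 is an assumption made for this sketch rather than something shown in the listing.

```java
// Hypothetical helper that mirrors the ternary in translateInternal.
class ToyParallelism {
    // Assumed sentinel meaning "parallelism not set on this transformation".
    static final int PARALLELISM_DEFAULT = -1;

    // Use the transformation's own parallelism if set, otherwise the job-wide value.
    static int resolve(int transformationParallelism, int jobParallelism) {
        return transformationParallelism != PARALLELISM_DEFAULT
                ? transformationParallelism
                : jobParallelism;
    }

    public static void main(String[] args) {
        System.out.println(resolve(PARALLELISM_DEFAULT, 4)); // 4, falls back to the job value
        System.out.println(resolve(2, 4));                   // 2, the explicit value wins
    }
}
```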

Let's look at the addSource method.

It does not contain much logic itself; the core is in the addOperator method.

Code listing: addOperator

    private <IN, OUT> void addOperator(
            Integer vertexID,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            StreamOperatorFactory<OUT> operatorFactory,
            TypeInformation<IN> inTypeInfo,
            TypeInformation<OUT> outTypeInfo,
            String operatorName,
            Class<? extends TaskInvokable> invokableClass) {

        // Explained below
        addNode(
                vertexID,
                slotSharingGroup,
                coLocationGroup,
                invokableClass,
                operatorFactory,
                operatorName);

        // Set the serializers for the input and output types
        setSerializers(vertexID, createSerializer(inTypeInfo), null, createSerializer(outTypeInfo));

        // Set the output type
        if (operatorFactory.isOutputTypeConfigurable() && outTypeInfo != null) {
            // sets the output type which must be known at StreamGraph creation time
            operatorFactory.setOutputType(outTypeInfo, executionConfig);
        }

        if (operatorFactory.isInputTypeConfigurable()) {
            operatorFactory.setInputType(inTypeInfo, executionConfig);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Vertex: {}", vertexID);
        }
    }

The first thing addOperator does is call addNode, which creates a StreamNode.

A StreamNode holds an operator of the program together with all of its properties.

Code listing: addNode method

    protected StreamNode addNode(
            Integer vertexID,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            Class<? extends TaskInvokable> vertexClass,
            @Nullable StreamOperatorFactory<?> operatorFactory,
            String operatorName) {

        if (streamNodes.containsKey(vertexID)) {
            throw new RuntimeException("Duplicate vertexID " + vertexID);
        }

        // Node ID, slot sharing group, co-location group, operator factory,
        // operator name, and the task invokable class
        StreamNode vertex =
                new StreamNode(
                        vertexID,
                        slotSharingGroup,
                        coLocationGroup,
                        operatorFactory,
                        operatorName,
                        vertexClass);

        streamNodes.put(vertexID, vertex);

        return vertex;
    }
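The registry behaviour of addNode can be sketched without Flink as a map keyed by vertex ID that refuses duplicates. ToyNodeRegistry and nodeCount are hypothetical names, and a node is reduced to its operator name for brevity.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified node registry; a node is just its operator name here.
class ToyNodeRegistry {
    private final Map<Integer, String> streamNodes = new HashMap<>();

    // Registers a node under its ID and rejects an ID that is already taken.
    String addNode(Integer vertexID, String operatorName) {
        if (streamNodes.containsKey(vertexID)) {
            throw new RuntimeException("Duplicate vertexID " + vertexID);
        }
        streamNodes.put(vertexID, operatorName);
        return operatorName;
    }

    int nodeCount() {
        return streamNodes.size();
    }

    public static void main(String[] args) {
        ToyNodeRegistry graph = new ToyNodeRegistry();
        graph.addNode(1, "Source: numbers");
        graph.addNode(2, "Map");
        try {
            graph.addNode(2, "Filter");
        } catch (RuntimeException e) {
            System.out.println(e.getMessage()); // Duplicate vertexID 2
        }
        System.out.println(graph.nodeCount()); // 2
    }
}
```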

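The overall flow for SourceTransformation is fairly simple.

Now let's look at OneInputTransformation, which is somewhat more complex and also more commonly used.

The earlier steps are the same; this time we end up in the translateInternal method of the OneInputTransformationTranslator class.

A OneInputTransformation has a single input, which points to the transformation before it.

Stepping into translateInternal, the first part of the method is similar to the one in SourceTransformationTranslator.

The key part is the following block: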

    for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {
        streamGraph.addEdge(inputId, transformationId, 0);
    }

    if (transformation instanceof PhysicalTransformation) {
        streamGraph.setSupportsConcurrentExecutionAttempts(
                transformationId,
                ((PhysicalTransformation<OUT>) transformation)
                        .isSupportsConcurrentExecutionAttempts());
    }

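Step into the addEdge method, which leads to addEdgeInternal.

On the first pass, execution takes the following path:

Code listing: createActualEdge method (reached from addEdgeInternal)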

    private void createActualEdge(
            Integer upStreamVertexID,
            Integer downStreamVertexID,
            int typeNumber,
            StreamPartitioner<?> partitioner,
            OutputTag outputTag,
            StreamExchangeMode exchangeMode,
            IntermediateDataSetID intermediateDataSetId) {
        // Get the upstream and downstream nodes
        StreamNode upstreamNode = getStreamNode(upStreamVertexID);
        StreamNode downstreamNode = getStreamNode(downStreamVertexID);

        // Partitioner
        // If no partitioner was specified and the parallelism of upstream and downstream
        // operator matches use forward partitioning, use rebalance otherwise.
        if (partitioner == null
                && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
            partitioner =
                    dynamic ? new ForwardForUnspecifiedPartitioner<>() : new ForwardPartitioner<>();
        } else if (partitioner == null) {
            partitioner = new RebalancePartitioner<Object>();
        }

        // (some code removed)

        if (exchangeMode == null) {
            exchangeMode = StreamExchangeMode.UNDEFINED;
        }

        int uniqueId = getStreamEdges(upstreamNode.getId(), downstreamNode.getId()).size();

        StreamEdge edge =
                new StreamEdge(
                        upstreamNode,
                        downstreamNode,
                        typeNumber,
                        partitioner,
                        outputTag,
                        exchangeMode,
                        uniqueId,
                        intermediateDataSetId);

        // Connect the upstream and downstream nodes
        getStreamNode(edge.getSourceId()).addOutEdge(edge);
        getStreamNode(edge.getTargetId()).addInEdge(edge);
    }

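Explanation

  • virtualSideOutputNodes and virtualPartitionNodes are both virtual nodes.
  • Virtual nodes never appear in the StreamGraph itself. When an edge is added and the node is virtual, the upstream node is looked up recursively until a non-virtual node is found.
  • partitioner: if none is specified and the upstream and downstream parallelism are equal, a ForwardPartitioner is used; if none is specified and the parallelism differs, a RebalancePartitioner is used. If a forward partitioner is set but the parallelism differs (older versions simply threw an exception here), a partitioner of type ForwardForConsecutiveHashPartitioner is replaced by its internal hashPartitioner; otherwise an UnsupportedOperationException is thrown.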
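Two of the rules above, virtual-node resolution and the default partitioner choice, can be sketched in plain Java. This is a toy model and not Flink's implementation: ToyEdgeRules, addVirtualNode, resolveUpstream and defaultPartitioner are hypothetical names, partitioners are represented by strings, and the lookup uses a loop where the text describes recursion.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model of two edge-building rules.
class ToyEdgeRules {
    // Virtual node id -> id of the node it wraps.
    private final Map<Integer, Integer> virtualNodes = new HashMap<>();

    void addVirtualNode(int virtualId, int originalId) {
        virtualNodes.put(virtualId, originalId);
    }

    // Follow virtual nodes upstream until a real (non-virtual) node id is reached.
    int resolveUpstream(int id) {
        while (virtualNodes.containsKey(id)) {
            id = virtualNodes.get(id);
        }
        return id;
    }

    // Default choice when no partitioner was specified by the user.
    static String defaultPartitioner(int upstreamParallelism, int downstreamParallelism) {
        return upstreamParallelism == downstreamParallelism ? "FORWARD" : "REBALANCE";
    }

    public static void main(String[] args) {
        ToyEdgeRules rules = new ToyEdgeRules();
        rules.addVirtualNode(10, 1);  // virtual node 10 wraps real node 1
        rules.addVirtualNode(11, 10); // virtual node 11 wraps virtual node 10

        System.out.println(rules.resolveUpstream(11)); // 1
        System.out.println(defaultPartitioner(4, 4));  // FORWARD
        System.out.println(defaultPartitioner(4, 2));  // REBALANCE
    }
}
```

The edge in the real graph always connects non-virtual nodes, which is why the lookup keeps walking upstream until it leaves the virtual-node map.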


This article is reposted from: https://blog.csdn.net/m0_46833224/article/details/143698131
Copyright belongs to the original author, 不正经小新. In case of infringement, please contact us for removal.
