Author: 不正经小新, CSDN Rising Star Creator (backend) | Alibaba Cloud Expert Blogger
Flink StreamGraph
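What is StreamGraph
StreamGraph is the class that represents the streaming topology. It contains all the information needed to build the job graph for execution.
Put simply, it is the graph generated directly from the code we write, and it represents the topology of the program.
From the StreamGraph class diagram, you can see that this class contains everything needed to run the job, such as the state backend, the JobType (streaming or batch), the checkpoint configuration, and so on. Today we look at it from a high level without going into every detail: the big picture first, then a deeper dive step by step.
Reading the code
Code listing: getStreamGraph method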
    public StreamGraph getStreamGraph() {
        return getStreamGraph(true);
    }
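As you can see, a default argument of true is passed here. Its effect is to clear the previously registered transformations.
Why clear them? To prevent the same operations from being executed again when execute is called multiple times.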
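To make the effect of that flag concrete, here is a minimal sketch. MiniEnv, its string-based "transformations", and its method names are hypothetical stand-ins and not Flink classes; only the clear-after-generate idea comes from the text above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an execution environment; not a Flink class.
class MiniEnv {
    private final List<String> transformations = new ArrayList<>();

    void addOperator(String name) {
        transformations.add(name);
    }

    // Returns a snapshot of the registered operators (our "graph") and,
    // when clearTransformations is true, forgets them afterwards.
    List<String> getStreamGraph(boolean clearTransformations) {
        List<String> graph = new ArrayList<>(transformations);
        if (clearTransformations) {
            transformations.clear();
        }
        return graph;
    }

    int registeredCount() {
        return transformations.size();
    }
}
```

With clearTransformations set to true, a second call only sees operators registered after the first call, so an earlier job is not silently built into the next one.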
Code listing: getStreamGraphGenerator method
    private StreamGraphGenerator getStreamGraphGenerator(List<Transformation<?>> transformations) {
        if (transformations.size() <= 0) {
            throw new IllegalStateException(
                    "No operators defined in streaming topology. Cannot execute.");
        }

        // Synchronize the cached file to config option PipelineOptions.CACHED_FILES because the
        // field cachedFile haven't been migrated to configuration.
        if (!getCachedFiles().isEmpty()) {
            configuration.set(
                    PipelineOptions.CACHED_FILES,
                    DistributedCache.parseStringFromCachedFiles(getCachedFiles()));
        }

        // We copy the transformation so that newly added transformations cannot intervene with the
        // stream graph generation.
        return new StreamGraphGenerator(
                        new ArrayList<>(transformations), config, checkpointCfg, configuration)
                .setStateBackend(defaultStateBackend)
                .setTimeCharacteristic(getStreamTimeCharacteristic())
                .setSlotSharingGroupResource(slotSharingGroupResources);
    }
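getStreamGraphGenerator creates a StreamGraphGenerator object, which is then used to build the streaming topology graph.
Here is what the method does:
- Checks whether the transformations list is empty: if it is, an exception is thrown right away (this is easy to demonstrate).
- Synchronizes cached files to the configuration: if there are cached files, they are written to the PipelineOptions.CACHED_FILES option. This is needed because the cachedFile field has not been migrated to the configuration yet. (A small optimization, you could say.)
- Creates and returns the StreamGraphGenerator object, passing it a copy of the transformations list.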
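The empty check and the defensive copy from the steps above can be sketched roughly like this. MiniGenerator is a hypothetical, heavily simplified class that only models those two behaviors, with strings standing in for transformations.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified generator; it only models the empty check and the copy.
class MiniGenerator {
    private final List<String> transformations;

    private MiniGenerator(List<String> transformations) {
        this.transformations = transformations;
    }

    static MiniGenerator create(List<String> transformations) {
        if (transformations.isEmpty()) {
            throw new IllegalStateException(
                    "No operators defined in streaming topology. Cannot execute.");
        }
        // Copy the list so that operators added later cannot affect this generator.
        return new MiniGenerator(new ArrayList<>(transformations));
    }

    int size() {
        return transformations.size();
    }
}
```

Because the generator works on its own copy, adding another operator to the caller's list after create returns does not change what the generator sees.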
Code listing: generate()
    public StreamGraph generate() {
        // Pass in the configuration
        streamGraph =
                new StreamGraph(
                        configuration, executionConfig, checkpointConfig, savepointRestoreSettings);
        // Decide whether to execute in batch mode
        shouldExecuteInBatchMode = shouldExecuteInBatchMode();
        // Configure the StreamGraph
        configureStreamGraph(streamGraph);

        // Transformations that have already been transformed
        alreadyTransformed = new IdentityHashMap<>();

        // Iterate over all Transformations
        for (Transformation<?> transformation : transformations) {
            transform(transformation);
        }

        // Set the slot sharing group resources
        streamGraph.setSlotSharingGroupResource(slotSharingGroupResources);

        setFineGrainedGlobalStreamExchangeMode(streamGraph);

        // Convert to a LineageGraph
        LineageGraph lineageGraph = LineageGraphUtils.convertToLineageGraph(transformations);
        streamGraph.setLineageGraph(lineageGraph);

        // Iterate over every StreamNode in the streamGraph and check whether
        // unaligned checkpoints must be disabled for any of its input edges.
        // If so, set supportsUnalignedCheckpoints to false on the node's input edges.
        for (StreamNode node : streamGraph.getStreamNodes()) {
            if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) {
                for (StreamEdge edge : node.getInEdges()) {
                    edge.setSupportsUnalignedCheckpoints(false);
                }
            }
        }

        // Clean up and return
        final StreamGraph builtStreamGraph = streamGraph;

        alreadyTransformed.clear();
        alreadyTransformed = null;
        streamGraph = null;

        return builtStreamGraph;
    }
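One detail in the loop near the end of generate() is easy to miss: if any single input edge of a node requires it, unaligned checkpoints are switched off for all input edges of that node. The sketch below models just that rule. The Edge class and its mustDisable field are hypothetical; mustDisable stands in for whatever shouldDisableUnalignedCheckpointing decides.

```java
import java.util.List;

// Hypothetical mini model of the "any edge disables all in-edges" rule.
class UnalignedCheckpointFlags {
    static class Edge {
        // Stand-in for the result of shouldDisableUnalignedCheckpointing(edge).
        final boolean mustDisable;
        boolean supportsUnalignedCheckpoints = true;

        Edge(boolean mustDisable) {
            this.mustDisable = mustDisable;
        }
    }

    // inEdges are all input edges of one node.
    static void disableIfNeeded(List<Edge> inEdges) {
        if (inEdges.stream().anyMatch(e -> e.mustDisable)) {
            for (Edge edge : inEdges) {
                edge.supportsUnalignedCheckpoints = false;
            }
        }
    }
}
```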
Next, let's focus on the transform method.
Before that, let's take a quick look at which elements the transformations list contains.
Code listing: transform
    private Collection<Integer> transform(Transformation<?> transform) {
        // If it has already been transformed, return the cached result directly
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        LOG.debug("Transforming " + transform);

        if (transform.getMaxParallelism() <= 0) {
            // if the max parallelism hasn't been set, then first use the job wide max parallelism
            // from the ExecutionConfig.
            int globalMaxParallelismFromConfig = executionConfig.getMaxParallelism();
            if (globalMaxParallelismFromConfig > 0) {
                transform.setMaxParallelism(globalMaxParallelismFromConfig);
            }
        }

        // (some code omitted)

        // call at least once to trigger exceptions about MissingTypeInfo
        transform.getOutputType();

        // Look up the concrete translator for this transform's class
        @SuppressWarnings("unchecked")
        final TransformationTranslator<?, Transformation<?>> translator =
                (TransformationTranslator<?, Transformation<?>>)
                        translatorMap.get(transform.getClass());

        Collection<Integer> transformedIds;
        if (translator != null) {
            // Handle the transform according to its concrete type
            transformedIds = translate(translator, transform);
        } else {
            transformedIds = legacyTransform(transform);
        }

        // need this check because the iterate transformation adds itself before
        // transforming the feedback edges
        if (!alreadyTransformed.containsKey(transform)) {
            alreadyTransformed.put(transform, transformedIds);
        }

        return transformedIds;
    }
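The lookup through translatorMap is a plain "dispatch by concrete class" pattern with a fallback. Here is a minimal sketch of that pattern; TranslatorLookup, the three marker classes, and the returned strings are all hypothetical and only illustrate the lookup, not Flink's real translators.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical mini model of "look up a translator by the transformation's class".
class TranslatorLookup {
    static class SourceT {}

    static class MapT {}

    static class UnknownT {}

    private static final Map<Class<?>, Function<Object, String>> TRANSLATORS = new HashMap<>();

    static {
        TRANSLATORS.put(SourceT.class, t -> "translated source");
        TRANSLATORS.put(MapT.class, t -> "translated map");
    }

    static String transform(Object transformation) {
        // The key is the exact runtime class of the transformation.
        Function<Object, String> translator = TRANSLATORS.get(transformation.getClass());
        if (translator != null) {
            return translator.apply(transformation);
        }
        // No registered translator: fall back to a legacy path.
        return "legacy transform";
    }
}
```

Since the map key is the exact runtime class, a type that is not registered ends up on the fallback path, which corresponds to the legacyTransform branch in the listing.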
Next, let's step into the translate method.
Code listing: translate method. We start with the flow for SourceTransformation.
    private Collection<Integer> translate(
            final TransformationTranslator<?, Transformation<?>> translator,
            final Transformation<?> transform) {
        checkNotNull(translator);
        checkNotNull(transform);

        // Get the node ID collection for each parent Transformation of the given transform
        // (this recursively calls transform on the parents, if there are any)
        final List<Collection<Integer>> allInputIds = getParentInputIds(transform.getInputs());

        // the recursive call might have already transformed this
        if (alreadyTransformed.containsKey(transform)) {
            return alreadyTransformed.get(transform);
        }

        final String slotSharingGroup =
                determineSlotSharingGroup(
                        transform.getSlotSharingGroup().isPresent()
                                ? transform.getSlotSharingGroup().get().getName()
                                : null,
                        allInputIds.stream()
                                .flatMap(Collection::stream)
                                .collect(Collectors.toList()));

        final TransformationTranslator.Context context =
                new ContextImpl(this, streamGraph, slotSharingGroup, configuration);

        return shouldExecuteInBatchMode
                ? translator.translateForBatch(transform, context)
                // Translate the given Transformation into its runtime implementation
                // for streaming execution mode and configure it
                : translator.translateForStreaming(transform, context);
    }
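The combination of "transform the parents first" and "skip anything already transformed" is what keeps this recursion cheap. Below is a small sketch of that idea. MiniTranslator and Node are hypothetical; the only parts borrowed from the listings are the identity-keyed alreadyTransformed map and the parents-first order.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical mini model of memoized, parents-first graph translation.
class MiniTranslator {
    static class Node {
        final int id;
        final List<Node> inputs;

        Node(int id, Node... inputs) {
            this.id = id;
            this.inputs = List.of(inputs);
        }
    }

    // Keyed by object identity, like alreadyTransformed in the listings above.
    private final Map<Node, Collection<Integer>> alreadyTransformed = new IdentityHashMap<>();

    // Records the order in which nodes were actually translated.
    final List<Integer> visitOrder = new ArrayList<>();

    Collection<Integer> transform(Node node) {
        if (alreadyTransformed.containsKey(node)) {
            return alreadyTransformed.get(node);
        }
        // Translate all parents first (the recursive step).
        for (Node input : node.inputs) {
            transform(input);
        }
        visitOrder.add(node.id);
        Collection<Integer> ids = Collections.singleton(node.id);
        alreadyTransformed.put(node, ids);
        return ids;
    }
}
```

Calling transform on a sink node translates its whole upstream chain exactly once; calling it again on any node of that chain just returns the cached IDs.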
Following the breakpoint, we arrive at the translateInternal method of the SourceTransformationTranslator class.
Code listing: translateInternal method
    private Collection<Integer> translateInternal(
            final SourceTransformation<OUT, SplitT, EnumChkT> transformation,
            final Context context,
            boolean emitProgressiveWatermarks) {
        checkNotNull(transformation);
        checkNotNull(context);

        final StreamGraph streamGraph = context.getStreamGraph();
        final String slotSharingGroup = context.getSlotSharingGroup();
        final int transformationId = transformation.getId();
        final ExecutionConfig executionConfig = streamGraph.getExecutionConfig();

        SourceOperatorFactory<OUT> operatorFactory =
                new SourceOperatorFactory<>(
                        transformation.getSource(),
                        transformation.getWatermarkStrategy(),
                        emitProgressiveWatermarks);

        // Set the chaining strategy, usually ALWAYS
        operatorFactory.setChainingStrategy(transformation.getChainingStrategy());
        operatorFactory.setCoordinatorListeningID(transformation.getCoordinatorListeningID());

        // Add the source
        streamGraph.addSource(
                transformationId,
                slotSharingGroup,
                transformation.getCoLocationGroupKey(),
                operatorFactory,
                null,
                transformation.getOutputType(),
                "Source: " + transformation.getName());

        // Determine and set the parallelism
        final int parallelism =
                transformation.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT
                        ? transformation.getParallelism()
                        : executionConfig.getParallelism();

        streamGraph.setParallelism(
                transformationId, parallelism, transformation.isParallelismConfigured());
        streamGraph.setMaxParallelism(transformationId, transformation.getMaxParallelism());

        streamGraph.setSupportsConcurrentExecutionAttempts(
                transformationId, transformation.isSupportsConcurrentExecutionAttempts());

        return Collections.singleton(transformationId);
    }
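The parallelism part of the listing boils down to "use the operator's own setting if it has one, otherwise use the job-wide value". A tiny sketch of that fallback follows; ParallelismResolver is a hypothetical helper, and the sentinel value -1 mirrors ExecutionConfig.PARALLELISM_DEFAULT.

```java
// Hypothetical helper mirroring the parallelism fallback in the listing above.
class ParallelismResolver {
    // Sentinel meaning "not set"; mirrors ExecutionConfig.PARALLELISM_DEFAULT (-1).
    static final int PARALLELISM_DEFAULT = -1;

    static int resolve(int transformationParallelism, int jobParallelism) {
        return transformationParallelism != PARALLELISM_DEFAULT
                ? transformationParallelism
                : jobParallelism;
    }
}
```

For example, resolve(ParallelismResolver.PARALLELISM_DEFAULT, 4) falls back to the job-wide 4, while resolve(2, 4) keeps the operator's own 2.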
Let's look at the addSource method.
It contains little logic of its own; the core is in the addOperator method.
Code listing: addOperator
    private <IN, OUT> void addOperator(
            Integer vertexID,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            StreamOperatorFactory<OUT> operatorFactory,
            TypeInformation<IN> inTypeInfo,
            TypeInformation<OUT> outTypeInfo,
            String operatorName,
            Class<? extends TaskInvokable> invokableClass) {

        // Explained below
        addNode(
                vertexID,
                slotSharingGroup,
                coLocationGroup,
                invokableClass,
                operatorFactory,
                operatorName);

        // Set the serializers for the input and output types
        setSerializers(vertexID, createSerializer(inTypeInfo), null, createSerializer(outTypeInfo));

        // Set the output type
        if (operatorFactory.isOutputTypeConfigurable() && outTypeInfo != null) {
            // sets the output type which must be know at StreamGraph creation time
            operatorFactory.setOutputType(outTypeInfo, executionConfig);
        }

        if (operatorFactory.isInputTypeConfigurable()) {
            operatorFactory.setInputType(inTypeInfo, executionConfig);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Vertex: {}", vertexID);
        }
    }
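The first step inside addOperator is addNode, which creates a StreamNode.
A StreamNode represents an operator in the program together with all of its properties.
Code listing: addNode method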
    protected StreamNode addNode(
            Integer vertexID,
            @Nullable String slotSharingGroup,
            @Nullable String coLocationGroup,
            Class<? extends TaskInvokable> vertexClass,
            @Nullable StreamOperatorFactory<?> operatorFactory,
            String operatorName) {

        if (streamNodes.containsKey(vertexID)) {
            throw new RuntimeException("Duplicate vertexID " + vertexID);
        }

        // Node ID, slot sharing group, co-location group, operator factory,
        // operator name, and task invokable class
        StreamNode vertex =
                new StreamNode(
                        vertexID,
                        slotSharingGroup,
                        coLocationGroup,
                        operatorFactory,
                        operatorName,
                        vertexClass);

        streamNodes.put(vertexID, vertex);

        return vertex;
    }
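The duplicate check at the top of addNode is worth a quick illustration: each vertex ID may be registered only once. In the sketch below, MiniNodeRegistry is a hypothetical class that stores plain operator names instead of StreamNode objects.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical mini model of the node registry; values are just operator names.
class MiniNodeRegistry {
    private final Map<Integer, String> streamNodes = new HashMap<>();

    String addNode(Integer vertexID, String operatorName) {
        if (streamNodes.containsKey(vertexID)) {
            throw new RuntimeException("Duplicate vertexID " + vertexID);
        }
        streamNodes.put(vertexID, operatorName);
        return operatorName;
    }

    int size() {
        return streamNodes.size();
    }
}
```

Registering a second node under an ID that is already in use fails fast instead of silently overwriting the first node.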
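The overall flow for SourceTransformation is relatively simple.
Now let's look at OneInputTransformation, which is somewhat more complex and more commonly used.
The earlier steps are the same; this time we go through OneInputTransformationTranslator and into its translateInternal method.
A OneInputTransformation has a single input, which points to the transformation before it.
Inside translateInternal, the first part of the logic is similar to SourceTransformationTranslator.
The important part is the following: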
    for (Integer inputId : context.getStreamNodeIds(parentTransformations.get(0))) {
        streamGraph.addEdge(inputId, transformationId, 0);
    }

    if (transformation instanceof PhysicalTransformation) {
        streamGraph.setSupportsConcurrentExecutionAttempts(
                transformationId,
                ((PhysicalTransformation<OUT>) transformation)
                        .isSupportsConcurrentExecutionAttempts());
    }
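From here we step into the addEdge method and then into addEdgeInternal.
On the first pass, the logic that runs is the following:
Code listing: createActualEdge method (called from addEdgeInternal)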
    private void createActualEdge(
            Integer upStreamVertexID,
            Integer downStreamVertexID,
            int typeNumber,
            StreamPartitioner<?> partitioner,
            OutputTag outputTag,
            StreamExchangeMode exchangeMode,
            IntermediateDataSetID intermediateDataSetId) {
        // Get the upstream and downstream nodes
        StreamNode upstreamNode = getStreamNode(upStreamVertexID);
        StreamNode downstreamNode = getStreamNode(downStreamVertexID);

        // Partitioner
        // If no partitioner was specified and the parallelism of upstream and downstream
        // operator matches use forward partitioning, use rebalance otherwise.
        if (partitioner == null
                && upstreamNode.getParallelism() == downstreamNode.getParallelism()) {
            partitioner =
                    dynamic ? new ForwardForUnspecifiedPartitioner<>() : new ForwardPartitioner<>();
        } else if (partitioner == null) {
            partitioner = new RebalancePartitioner<Object>();
        }

        // (some code removed)

        if (exchangeMode == null) {
            exchangeMode = StreamExchangeMode.UNDEFINED;
        }

        int uniqueId = getStreamEdges(upstreamNode.getId(), downstreamNode.getId()).size();

        StreamEdge edge =
                new StreamEdge(
                        upstreamNode,
                        downstreamNode,
                        typeNumber,
                        partitioner,
                        outputTag,
                        exchangeMode,
                        uniqueId,
                        intermediateDataSetId);

        // Connect the upstream and downstream nodes
        getStreamNode(edge.getSourceId()).addOutEdge(edge);
        getStreamNode(edge.getTargetId()).addInEdge(edge);
    }
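The default partitioner choice in the listing above can be restated as a small decision function. This is only a sketch: PartitionerChooser is hypothetical, partitioners are represented by plain strings, and the dynamic flag (which switches to ForwardForUnspecifiedPartitioner) is deliberately left out.

```java
// Hypothetical helper that restates the default partitioner choice with strings.
class PartitionerChooser {
    static String choose(String specified, int upstreamParallelism, int downstreamParallelism) {
        if (specified != null) {
            // An explicitly specified partitioner is not replaced by a default.
            return specified;
        }
        // No partitioner given: forward if the parallelism matches, rebalance otherwise.
        return upstreamParallelism == downstreamParallelism ? "FORWARD" : "REBALANCE";
    }
}
```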
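Explanation
- virtualSideOutputNodes and virtualPartitionNodes both hold virtual nodes.
- Virtual nodes never appear in the StreamGraph itself. When an edge is added and the node is a virtual node, the upstream node is looked up recursively until a non-virtual node is found.
- partitioner:
  - If none was specified and the upstream and downstream parallelism are equal, a ForwardPartitioner is used. If none was specified and the parallelism differs, a RebalancePartitioner is used.
  - If the partitioner is a forward partitioner but the upstream and downstream parallelism differ (older versions simply threw an exception here): when the partitioner is a ForwardForConsecutiveHashPartitioner, it is replaced by its inner hashPartitioner; otherwise an UnsupportedOperationException is thrown.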
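The "keep walking upstream until you hit a real node" rule for virtual nodes can be sketched as follows. VirtualNodeResolver is hypothetical and much simpler than the real thing: it keeps one map from a virtual node ID to its upstream ID and ignores the extra data (output tags, partitioners) that virtual nodes carry.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical mini model: each virtual node ID maps to its upstream node ID.
class VirtualNodeResolver {
    private final Map<Integer, Integer> virtualNodes = new HashMap<>();

    void addVirtualNode(int virtualId, int upstreamId) {
        virtualNodes.put(virtualId, upstreamId);
    }

    // Follows virtual nodes upstream until a non-virtual (real) node is reached.
    int resolve(int nodeId) {
        Integer upstream = virtualNodes.get(nodeId);
        return upstream == null ? nodeId : resolve(upstream);
    }
}
```

If node 3 is a virtual node on top of virtual node 2, which in turn sits on real node 1, then an edge that starts at 3 ends up attached to node 1.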
Copyright belongs to the original author, 不正经小新. In case of infringement, please contact us for removal.