Overview
This post explains how a Flink program is converted into a JobGraph. Every step of the underlying implementation is complex and there is a lot of it, so this post only covers the rough flow; the details will be filled in by later posts.
Data Preparation
Here is a simple Flink program I prepared. Using this job as the example, we will trace how Flink generates the JobGraph.
source -> map -> flatMap -> keyBy -> sum -> sink, six transformations in total
package org.example.demo;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SocketWordCountDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // source
        DataStreamSource<String> socketTextStream = env.fromData("data");

        // map
        SingleOutputStreamOperator<String> mapStream =
                socketTextStream.map(String::toLowerCase).setParallelism(5);

        // flatMap
        SingleOutputStreamOperator<Tuple2<String, Integer>> flatMapStream =
                mapStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        out.collect(Tuple2.of(value, 1));
                    }
                }).setParallelism(5);

        // keyBy
        KeyedStream<Tuple2<String, Integer>, String> keyedStream = flatMapStream.keyBy(x -> x.f0);

        // sum
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultStream =
                keyedStream.sum(1).setParallelism(5);

        // sink
        resultStream.print("result:").setParallelism(5);

        env.execute();
    }
}
Overall Architecture and Flow
1. By the time our program reaches the execute() method, every operator we wrote has already been stored in the transformations list of the StreamExecutionEnvironment object (the sketch below shows one way to peek at that list)
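If you want to verify this yourself, here is a small sketch that reads the environment's transformations field via reflection. The field name comes from the Flink source, but the reflective read and the class name TransformationsPeek are purely my illustration, not a public API:

import java.lang.reflect.Field;
import java.util.List;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TransformationsPeek {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromData("data")
                .map(String::toLowerCase).setParallelism(5)
                .print();

        // Each non-partitioning operation appended one Transformation to the
        // environment's internal list; we read the private field reflectively
        // purely for demonstration.
        Field field = StreamExecutionEnvironment.class.getDeclaredField("transformations");
        field.setAccessible(true);
        List<?> transformations = (List<?>) field.get(env);
        transformations.forEach(System.out::println);
        // Note: keyBy() would not show up here -- it only wraps the stream in a
        // PartitionTransformation that becomes the input of the next operator.
    }
}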
2. getStreamGraph() converts each transformation stored in transformations into a StreamNode and finally produces the StreamGraph (see the sketch below)
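A quick way to look at the resulting StreamGraph without a debugger is env.getExecutionPlan(), which internally builds the StreamGraph and renders it as JSON; each node in the output is a StreamNode. The method is public and stable, though the exact JSON layout differs between Flink versions:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StreamGraphPeek {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromData("data").map(String::toLowerCase).print();

        // Builds a StreamGraph from the transformations list and prints it as
        // JSON -- one entry per StreamNode, plus the edges between them.
        System.out.println(env.getExecutionPlan());
    }
}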
3. Step into execute(streamGraph)
4. Step into executeAsync(streamGraph)
5. Which goes on to call executor.execute()
6. Then getJobGraph()
7. Step into FlinkPipelineTranslationUtil.getJobGraph()
8. Continue into the translateToJobGraph method
9. Continue into streamGraph.getJobGraph(); the underlying code really is like this, one layer calling into the next, so be patient
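Condensed, steps 3 through 9 boil down to the call chain in the comments below (paraphrased from the Flink source; exact class names and signatures vary a little between versions), and you can trigger the same StreamGraph -> JobGraph translation yourself:

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;

public class JobGraphPeek {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromData("data").map(String::toLowerCase).print();

        // execute() -> executeAsync() -> executor.execute() -> getJobGraph()
        //   -> FlinkPipelineTranslationUtil.getJobGraph()
        //   -> StreamGraphTranslator.translateToJobGraph()
        //   -> streamGraph.getJobGraph(...)
        //   -> StreamingJobGraphGenerator.createJobGraph(...)
        // Here we jump straight to the StreamGraph -> JobGraph step; in newer
        // versions getJobGraph() also has overloads taking a ClassLoader and a JobID.
        StreamGraph streamGraph = env.getStreamGraph();
        JobGraph jobGraph = streamGraph.getJobGraph();
        jobGraph.getVertices().forEach(v -> System.out.println(v.getName()));
    }
}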
10. The createJobGraph() method, whose parameters are a class loader, the StreamGraph, and a jobId
11. Step into the constructor of StreamingJobGraphGenerator
12. You can clearly see that a JobGraph object is created here
13. From the debugger, though, many of this JobGraph's fields are still unset; it is only an empty shell (illustrated by the sketch below)
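As a minimal sketch of that "empty shell" (JobGraph is an internal flink-runtime class, so treat this constructor usage as illustration only; it may differ in your version):

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class EmptyJobGraphShell {
    public static void main(String[] args) {
        // A freshly constructed JobGraph carries only a job id and a name; it has
        // no vertices, no checkpoint settings, no ExecutionConfig. createJobGraph()
        // fills all of that in afterwards.
        JobGraph shell = new JobGraph(new JobID(), "demo-job");
        System.out.println(shell.getNumberOfVertices()); // prints 0
    }
}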
14. Step back out: createJobGraph() is then called to assign the JobGraph's fields, and only this step truly creates the JobGraph. Let's step into this method:
private JobGraph createJobGraph() {
    preValidate();
    jobGraph.setJobType(streamGraph.getJobType());
    jobGraph.setDynamic(streamGraph.isDynamic());

    jobGraph.enableApproximateLocalRecovery(
            streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());

    // Generate deterministic hashes for the nodes in order to identify them across
    // submission iff they didn't change.
    Map<Integer, byte[]> hashes =
            defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

    // Generate legacy version hashes for backwards compatibility
    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
    }

    setChaining(hashes, legacyHashes);

    if (jobGraph.isDynamic()) {
        setVertexParallelismsForDynamicGraphIfNecessary();
    }

    // Note that we set all the non-chainable outputs configuration here because the
    // "setVertexParallelismsForDynamicGraphIfNecessary" may affect the parallelism of job
    // vertices and partition-reuse
    final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs =
            new HashMap<>();
    setAllOperatorNonChainedOutputsConfigs(opIntermediateOutputs);
    setAllVertexNonChainedOutputsConfigs(opIntermediateOutputs);

    setPhysicalEdges();

    markSupportingConcurrentExecutionAttempts();

    validateHybridShuffleExecuteInBatchMode();

    setSlotSharingAndCoLocation();

    setManagedMemoryFraction(
            Collections.unmodifiableMap(jobVertices),
            Collections.unmodifiableMap(vertexConfigs),
            Collections.unmodifiableMap(chainedConfigs),
            id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
            id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());

    configureCheckpointing();

    jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());

    final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =
            JobGraphUtils.prepareUserArtifactEntries(
                    streamGraph.getUserArtifacts().stream()
                            .collect(Collectors.toMap(e -> e.f0, e -> e.f1)),
                    jobGraph.getJobID());

    for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
            distributedCacheEntries.entrySet()) {
        jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
    }

    // set the ExecutionConfig last when it has been finalized
    try {
        jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
    } catch (IOException e) {
        throw new IllegalConfigurationException(
                "Could not serialize the ExecutionConfig."
                        + "This indicates that non-serializable types (like custom serializers) were registered");
    }
    jobGraph.setJobConfiguration(streamGraph.getJobConfiguration());

    addVertexIndexPrefixInVertexName();

    setVertexDescription();

    // Wait for the serialization of operator coordinators and stream config.
    try {
        FutureUtils.combineAll(
                        vertexConfigs.values().stream()
                                .map(
                                        config ->
                                                config.triggerSerializationAndReturnFuture(
                                                        serializationExecutor))
                                .collect(Collectors.toList()))
                .get();

        waitForSerializationFuturesAndUpdateJobVertices();
    } catch (Exception e) {
        throw new FlinkRuntimeException("Error in serialization.", e);
    }

    if (!streamGraph.getJobStatusHooks().isEmpty()) {
        jobGraph.setJobStatusHooks(streamGraph.getJobStatusHooks());
    }

    return jobGraph;
}
15. There is quite a lot of code here, so I've copied it out above. You can clearly see it assigning values to the JobGraph's fields; once it finishes, the JobGraph is officially built.
16. Of all this code, the most important line is setChaining(). As the name suggests, it sets up operator chains: inside this method the overall structure of the JobGraph is built and chainable operator tasks are merged. It is quite involved, so I won't go into it here; a dedicated post will cover it. You can still see its effect from the outside, as shown below.
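Even without reading setChaining() itself, the following sketch makes the merging visible (the experiment and the class name ChainingEffectDemo are mine, not Flink's): it builds the same pipeline twice and compares the JobGraph vertex counts with chaining enabled and disabled.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ChainingEffectDemo {

    public static void main(String[] args) throws Exception {
        // With chaining, forward-connected operators of equal parallelism are
        // merged into one JobVertex, while keyBy introduces a hash-partitioned
        // edge that always starts a new vertex. With chaining disabled, every
        // operator keeps its own vertex, so the second count is larger.
        System.out.println("vertices with chaining:    " + countJobVertices(true));
        System.out.println("vertices without chaining: " + countJobVertices(false));
    }

    private static int countJobVertices(boolean chainingEnabled) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        if (!chainingEnabled) {
            env.disableOperatorChaining();
        }
        env.fromData("data")
                .map(String::toLowerCase).setParallelism(5)
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                        out.collect(Tuple2.of(value, 1));
                    }
                }).setParallelism(5)
                .keyBy(x -> x.f0)
                .sum(1).setParallelism(5)
                .print("result:").setParallelism(5);
        // The same StreamGraph -> JobGraph translation the executor performs.
        return env.getStreamGraph().getJobGraph().getNumberOfVertices();
    }
}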
Summary
The overall JobGraph creation pipeline is Transformations -> StreamGraph -> JobGraph.