Click here for the index of the Flink 1.13 source code analysis series
See the related chapter: Flink 1.13 Source Code Analysis — JobManager Startup Process: Starting the WebMonitorEndpoint
Flink 1.13 Source Code Analysis — The Flink Job Submission Process (Part 2)
Preface
In earlier chapters we analyzed the startup of the Flink cluster's master and worker nodes at the source-code level. We often say that Flink turns user code into a high-level abstraction, but what is that abstraction? In my view, it is an abstraction of a programming model for computation applications that handles **any type of data** with **any kind of computation logic** at **any task complexity and data scale**.
In the next few chapters we will analyze the Flink job submission process, the interaction between the Flink JobMaster and the JobManager, and the construction and conversion of the StreamGraph, JobGraph, and ExecutionGraph. This chapter covers the job submission process.
1. Groundwork Before a Flink Job Is Submitted and Run
When we submit a Flink job we run the flink run command, which executes the bin/flink shell script. The following line of that script tells us what Flink's client entry class is:
exec "${JAVA_RUN}" $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
Next, let's look at the main method of the org.apache.flink.client.cli.CliFrontend class:
public static void main(final String[] args) {
EnvironmentInformation.logEnvironmentInfo(LOG, "Command Line Client", args);
// 1. find the configuration directory
// TODO: get the configuration directory
final String configurationDirectory = getConfigurationDirectoryFromEnv();
// 2. load the global configuration
// TODO: parse the flink-conf configuration file
final Configuration configuration =
GlobalConfiguration.loadConfiguration(configurationDirectory);
// 3. load the custom command lines
// TODO: build the objects that parse the command-line args; three kinds of CustomCommandLine are created here
final List<CustomCommandLine> customCommandLines =
loadCustomCommandLines(configuration, configurationDirectory);
int retCode = 31;
try {
final CliFrontend cli = new CliFrontend(configuration, customCommandLines);
// TODO
SecurityUtils.install(new SecurityConfiguration(cli.configuration));
retCode = SecurityUtils.getInstalledContext().runSecured(() -> cli.parseAndRun(args));
} catch (Throwable t) {
final Throwable strippedThrowable =
ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
LOG.error("Fatal error while running command line interface.", strippedThrowable);
strippedThrowable.printStackTrace();
} finally {
System.exit(retCode);
}
}
As we can see, the following work is done here:
1. Obtain the configuration directory
2. Parse the configuration file and build the Configuration object
3. Load the CustomCommandLine implementations into a list (used later to parse the args)
4. Parse the command-line arguments and execute
Let's look at the most important step, parsing the command-line arguments and executing. Step into cli.parseAndRun(args):
public int parseAndRun(String[] args) {
// check for action
// TODO: validate the command arguments
if (args.length < 1) {
CliFrontendParser.printHelp(customCommandLines);
System.out.println("Please specify an action.");
return 1;
}
// get action
// TODO: parse the action from the first argument after `flink`, e.g. for `flink run` the action is run
String action = args[0];
// remove action from parameters
final String[] params = Arrays.copyOfRange(args, 1, args.length);
try {
// do action
switch (action) {
case ACTION_RUN:
// TODO: if the action is run
run(params);
return 0;
case ACTION_RUN_APPLICATION:
runApplication(params);
return 0;
case ...
case "-h":
case "--help":
CliFrontendParser.printHelp(customCommandLines);
return 0;
case "-v":
case "--version":
String version = EnvironmentInformation.getVersion();
String commitID = EnvironmentInformation.getRevisionInformation().commitId;
System.out.print("Version: " + version);
System.out.println(
commitID.equals(EnvironmentInformation.UNKNOWN)
? ""
: ", Commit ID: " + commitID);
return 0;
default:
System.out.printf("\"%s\" is not a valid action.\n", action);
... ...
return 1;
}
} catch (....) {
...
}
}
From the code above we can see that the first command-line argument is parsed here. Taking the run command as an example: when the argument following flink is run, run(params) is called. Let's step in and continue:
protected void run(String[] args) throws Exception {
LOG.info("Running 'run' command.");
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
// TODO: the command-line arguments are actually parsed here
final CommandLine commandLine = getCommandLine(commandOptions, args, true);
// evaluate help flag
// TODO: if the help option (-h) was passed, print the Flink help text for run
if (commandLine.hasOption(HELP_OPTION.getOpt())) {
CliFrontendParser.printHelpForRun(customCommandLines);
return;
}
final CustomCommandLine activeCommandLine =
validateAndGetActiveCommandLine(checkNotNull(commandLine));
// create the ProgramOptions object
final ProgramOptions programOptions = ProgramOptions.create(commandLine);
// TODO: get the job jar and the other dependency jars
final List<URL> jobJars = getJobJarAndDependencies(programOptions);
// TODO: wrap the parsed options into a configuration object
final Configuration effectiveConfiguration =
getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);
LOG.debug("Effective executor configuration: {}", effectiveConfiguration);
// TODO: get the packaged program
try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
// TODO: execute the program
executeProgram(effectiveConfiguration, program);
}
}
Here all of the command-line options are parsed. If -h is present, the help text is printed and the method returns. Otherwise the ProgramOptions object is created from the command line, the job's own jar and its dependency jars are collected, and everything that was parsed is combined into an effective Configuration object. From that, the PackagedProgram representing our job is built and then executed. Let's look at how executeProgram runs this program:
protected void executeProgram(final Configuration configuration, final PackagedProgram program)
throws ProgramInvocationException {
// TODO
ClientUtils.executeProgram(
new DefaultExecutorServiceLoader(), configuration, program, false, false);
}
Let's step further into ClientUtils.executeProgram:
public static void executeProgram(
PipelineExecutorServiceLoader executorServiceLoader,
Configuration configuration,
PackagedProgram program,
boolean enforceSingleJobExecution,
boolean suppressSysout)
throws ProgramInvocationException {
checkNotNull(executorServiceLoader);
final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(userCodeClassLoader);
LOG.info(
"Starting program (detached: {})",
!configuration.getBoolean(DeploymentOptions.ATTACHED));
// TODO: set up the execution environments
ContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
StreamContextEnvironment.setAsContext(
executorServiceLoader,
configuration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
try {
// TODO: the actual submission and execution happens here
program.invokeInteractiveModeForExecution();
} finally {
ContextEnvironment.unsetAsContext();
StreamContextEnvironment.unsetAsContext();
}
} finally {
Thread.currentThread().setContextClassLoader(contextClassLoader);
}
}
In this method, the execution environments are first set up as context based on the configuration object that was passed in, and then program.invokeInteractiveModeForExecution() actually starts executing the job. Let's go into that method:
public void invokeInteractiveModeForExecution() throws ProgramInvocationException {
FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
try {
// TODO: call the main method of the user's application
callMainMethod(mainClass, args);
} finally {
FlinkSecurityManager.unmonitorUserSystemExitForCurrentThread();
}
}
Then into callMainMethod(mainClass, args):
private static void callMainMethod(Class<?> entryClass, String[] args)
throws ProgramInvocationException {
Method mainMethod;
if (!Modifier.isPublic(entryClass.getModifiers())) {
throw new ProgramInvocationException(
"The class " + entryClass.getName() + " must be public.");
}
// TODO: obtain the main Method via reflection
try {
mainMethod = entryClass.getMethod("main", String[].class);
} catch (NoSuchMethodException e) {
throw new ProgramInvocationException(
"The class " + entryClass.getName() + " has no main(String[]) method.");
} catch (Throwable t) {
throw new ProgramInvocationException(
"Could not look up the main(String[]) method from the class "
+ entryClass.getName()
+ ": "
+ t.getMessage(),
t);
}
if (!Modifier.isStatic(mainMethod.getModifiers())) {
throw new ProgramInvocationException(
"The class " + entryClass.getName() + " declares a non-static main method.");
}
if (!Modifier.isPublic(mainMethod.getModifiers())) {
throw new ProgramInvocationException(
"The class " + entryClass.getName() + " declares a non-public main method.");
}
try {
// TODO: invoke the main method
mainMethod.invoke(null, (Object) args);
} catch (IllegalArgumentException e) {
throw new ProgramInvocationException(
"Could not invoke the main method, arguments are not matching.", e);
} catch (IllegalAccessException e) {
throw new ProgramInvocationException(
"Access to the main method was denied: " + e.getMessage(), e);
} catch (InvocationTargetException e) {
Throwable exceptionInMethod = e.getTargetException();
if (exceptionInMethod instanceof Error) {
throw (Error) exceptionInMethod;
} else if (exceptionInMethod instanceof ProgramParametrizationException) {
throw (ProgramParametrizationException) exceptionInMethod;
} else if (exceptionInMethod instanceof ProgramInvocationException) {
throw (ProgramInvocationException) exceptionInMethod;
} else {
throw new ProgramInvocationException(
"The main method caused an error: " + exceptionInMethod.getMessage(),
exceptionInMethod);
}
} catch (Throwable t) {
throw new ProgramInvocationException(
"An error occurred while invoking the program's main method: " + t.getMessage(),
t);
}
}
As we can see, the main method of our program's entry class is obtained here via reflection, and mainMethod.invoke then executes it. With that, the groundwork is complete. Next we look at how, inside our own main method, the logic we wrote gets submitted for execution.
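To make the mechanism concrete, here is a minimal, hedged sketch of what callMainMethod boils down to; the class name and the simplified checks are made up for the example and leave out Flink's exception wrapping:

import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

public class ReflectiveMainLauncher {

    // Look up a public static main(String[]) on the entry class and invoke it,
    // roughly what callMainMethod does without the detailed error handling.
    public static void launch(Class<?> entryClass, String[] args) throws Exception {
        Method main = entryClass.getMethod("main", String[].class);
        if (!Modifier.isStatic(main.getModifiers())) {
            throw new IllegalArgumentException(entryClass.getName() + " declares a non-static main method");
        }
        // The cast to Object passes the String[] as a single argument instead of expanding it as varargs.
        main.invoke(null, (Object) args);
    }
}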
2. Submitting the Flink Job
2.1 Building the StreamExecutionEnvironment
Let's use one of the examples shipped in flink-examples-streaming, which follows the same programming model we use when writing our own Flink jobs. First, the construction of Flink's entry class:
/* TODO
1. initialize the StateBackend
2. parse all checkpoint-related configuration
*/
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Step into getExecutionEnvironment(), and then into the getExecutionEnvironment(Configuration) overload it delegates to:
public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
// TODO: build the StreamExecutionEnvironment
.map(factory -> factory.createExecutionEnvironment(configuration))
.orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
}
Next we follow factory.createExecutionEnvironment, taking the StreamContextEnvironment case. The factory here is the lambda registered by StreamContextEnvironment.setAsContext, which we saw being called earlier in ClientUtils.executeProgram:
public static void setAsContext(
final PipelineExecutorServiceLoader executorServiceLoader,
final Configuration configuration,
final ClassLoader userCodeClassLoader,
final boolean enforceSingleJobExecution,
final boolean suppressSysout) {
StreamExecutionEnvironmentFactory factory =
conf -> {
Configuration mergedConfiguration = new Configuration();
mergedConfiguration.addAll(configuration);
mergedConfiguration.addAll(conf);
// TODO: instantiate the StreamContextEnvironment
return new StreamContextEnvironment(
executorServiceLoader,
mergedConfiguration,
userCodeClassLoader,
enforceSingleJobExecution,
suppressSysout);
};
initializeContextEnvironment(factory);
}
As we can see, the StreamContextEnvironment is instantiated here. Step into its constructor and then into the parent class constructor:
@PublicEvolving
public StreamExecutionEnvironment(
final PipelineExecutorServiceLoader executorServiceLoader,
final Configuration configuration,
final ClassLoader userClassloader) {
this.executorServiceLoader = checkNotNull(executorServiceLoader);
this.configuration = new Configuration(checkNotNull(configuration));
this.userClassloader =
userClassloader == null ? getClass().getClassLoader() : userClassloader;
/*
TODO: configure the various components
1. initialize the StateBackend
2. initialize the checkpoint-related settings
*/
this.configure(this.configuration, this.userClassloader);
}
Then step into this.configure:
@PublicEvolving
public void configure(ReadableConfig configuration, ClassLoader classLoader) {
configuration
.getOptional(StreamPipelineOptions.TIME_CHARACTERISTIC)
.ifPresent(this::setStreamTimeCharacteristic);
// TODO: load the StateBackend
Optional.ofNullable(loadStateBackend(configuration, classLoader))
.ifPresent(this::setStateBackend);
configuration
.getOptional(PipelineOptions.OPERATOR_CHAINING)
.ifPresent(c -> this.isChainingEnabled = c);
configuration
.getOptional(ExecutionOptions.BUFFER_TIMEOUT)
.ifPresent(t -> this.setBufferTimeout(t.toMillis()));
configuration
.getOptional(DeploymentOptions.JOB_LISTENERS)
.ifPresent(listeners -> registerCustomListeners(classLoader, listeners));
configuration
.getOptional(PipelineOptions.CACHED_FILES)
.ifPresent(
f -> {
this.cacheFile.clear();
this.cacheFile.addAll(DistributedCache.parseCachedFilesFromString(f));
});
configuration
.getOptional(ExecutionOptions.RUNTIME_MODE)
.ifPresent(
runtimeMode ->
this.configuration.set(ExecutionOptions.RUNTIME_MODE, runtimeMode));
configuration
.getOptional(ExecutionOptions.SORT_INPUTS)
.ifPresent(
sortInputs ->
this.getConfiguration()
.set(ExecutionOptions.SORT_INPUTS, sortInputs));
configuration
.getOptional(ExecutionOptions.USE_BATCH_STATE_BACKEND)
.ifPresent(
sortInputs ->
this.getConfiguration()
.set(ExecutionOptions.USE_BATCH_STATE_BACKEND, sortInputs));
configuration
.getOptional(PipelineOptions.NAME)
.ifPresent(jobName -> this.getConfiguration().set(PipelineOptions.NAME, jobName));
config.configure(configuration, classLoader);
// TODO: parse and apply the checkpoint-related settings
/*
TODO
1. the checkpoint-related settings are parsed out of the Configuration object and placed into the CheckpointConfig
2. when the operators are later parsed and the StreamGraph is built, this CheckpointConfig is handed to the StreamGraph
3. when the StreamGraph builds the JobGraph, it is passed along again
*/
checkpointCfg.configure(configuration);
}
As we can see, two main pieces of work are done here:
- Load the StateBackend
- Parse and apply the checkpoint-related settings: 1) the settings are parsed out of the Configuration object and placed into the CheckpointConfig; 2) when the operators are later parsed and the StreamGraph is built, this CheckpointConfig is handed to the StreamGraph; 3) when the StreamGraph builds the JobGraph, it is passed along again. The sketch below shows the user-facing side of these settings.
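For reference, a hedged sketch of the user-facing calls whose values end up in this CheckpointConfig; the concrete numbers are illustrative only:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // These calls populate the same CheckpointConfig that configure() fills from the Configuration.
        env.enableCheckpointing(60_000);                                   // checkpoint interval in ms
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(10_000);
        env.getCheckpointConfig().setCheckpointTimeout(120_000);
    }
}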
Let's first look at how the state backend is loaded; step into loadStateBackend(configuration, classLoader):
private StateBackend loadStateBackend(ReadableConfig configuration, ClassLoader classLoader) {
try {
// TODO: read the StateBackend-related settings from the configuration and build the StateBackend
return StateBackendLoader.loadStateBackendFromConfig(configuration, classLoader, null);
} catch (DynamicCodeLoadingException | IOException e) {
throw new WrappingRuntimeException(e);
}
}
Then into StateBackendLoader.loadStateBackendFromConfig:
public static StateBackend loadStateBackendFromConfig(
ReadableConfig config, ClassLoader classLoader, @Nullable Logger logger)
throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {
checkNotNull(config, "config");
checkNotNull(classLoader, "classLoader");
// TODO: read the StateBackend-related configuration
final StateBackend backend =
loadUnwrappedStateBackendFromConfig(config, classLoader, logger);
checkArgument(
!(backend instanceof DelegatingStateBackend),
"expecting non-delegating state backend");
if (config.get(CheckpointingOptions.ENABLE_STATE_CHANGE_LOG) && (backend != null)) {
return loadChangelogStateBackend(backend, classLoader);
} else {
return backend;
}
}
As we can see, the StateBackend-related configuration is read here and the StateBackend object is returned at the end. Step into loadUnwrappedStateBackendFromConfig:
private static StateBackend loadUnwrappedStateBackendFromConfig(
ReadableConfig config, ClassLoader classLoader, @Nullable Logger logger)
throws IllegalConfigurationException, DynamicCodeLoadingException, IOException {
checkNotNull(config, "config");
checkNotNull(classLoader, "classLoader");
final String backendName = config.get(StateBackendOptions.STATE_BACKEND);
if (backendName == null) {
return null;
}
// by default the factory class is the backend name
String factoryClassName = backendName;
switch (backendName.toLowerCase()) {
case MEMORY_STATE_BACKEND_NAME:
MemoryStateBackend backend =
new MemoryStateBackendFactory().createFromConfig(config, classLoader);
if (logger != null) {
logger.warn(
"MemoryStateBackend has been deprecated. Please use 'hashmap' state "
+ "backend instead with JobManagerCheckpointStorage for equivalent "
+ "functionality");
logger.info("State backend is set to job manager {}", backend);
}
return backend;
case FS_STATE_BACKEND_NAME:
if (logger != null) {
logger.warn(
"{} state backend has been deprecated. Please use 'hashmap' state "
+ "backend instead.",
backendName.toLowerCase());
}
// fall through and use the HashMapStateBackend instead which
// utilizes the same HeapKeyedStateBackend runtime implementation.
case HASHMAP_STATE_BACKEND_NAME:
HashMapStateBackend hashMapStateBackend =
new HashMapStateBackendFactory().createFromConfig(config, classLoader);
if (logger != null) {
logger.info("State backend is set to heap memory {}", hashMapStateBackend);
}
return hashMapStateBackend;
// TODO: if 'rocksdb' is configured, use RocksDB
case ROCKSDB_STATE_BACKEND_NAME:
factoryClassName = ROCKSDB_STATE_BACKEND_FACTORY;
// fall through to the 'default' case that uses reflection to load the backend
// that way we can keep RocksDB in a separate module
default:
... ...
return factory.createFromConfig(config, classLoader);
}
}
As we can see, the configured state backend name is matched here. Before Flink 1.13, Flink supported three state backend settings:
- jobmanager
- filesystem
- rocksdb
From Flink 1.13 on, however, only two state backends remain: HashMap and RocksDB.
1. The HashMap backend is the in-memory approach we described earlier. Concretely, the hashmap state backend stores state directly as objects on the TaskManager's JVM heap. Regular state, the data collected in windows, and the triggers are all stored as key-value pairs, so the underlying structure is a hash map, which is where this backend gets its name.
2. HashMapStateBackend works in memory, so reads and writes are very fast; the downside is that the state size is limited by the memory available in the cluster, and an application whose state keeps growing over time will eventually exhaust it. RocksDB stores state on local disk, so it scales with the available disk space, and it is the only state backend that supports incremental checkpoints, which makes it well suited to very large state. The trade-off is that every state access has to be serialized/deserialized and may need to read from disk, so average read/write performance is roughly an order of magnitude slower than HashMapStateBackend.
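As an aside, here is a hedged sketch of how the two remaining backends are typically chosen from user code in 1.13; the class names are Flink's own, but the checkpoint path and the choice of incremental checkpoints are illustrative, and EmbeddedRocksDBStateBackend additionally requires the flink-statebackend-rocksdb dependency:

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class StateBackendSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Heap-based backend: state lives as objects on the TaskManager JVM heap.
        env.setStateBackend(new HashMapStateBackend());

        // RocksDB backend: state lives on local disk, incremental checkpoints are supported.
        // env.setStateBackend(new EmbeddedRocksDBStateBackend(true)); // true = incremental checkpoints

        // In 1.13 the checkpoint storage (where completed checkpoints go) is configured separately.
        env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink/checkpoints");
    }
}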
With that, the construction of the StreamExecutionEnvironment is complete. Next we move on to building the operators.
2.2 Building the Operators
Before diving into the code, let's go over a few concepts, namely how an operator gets converted.
The computation logic we write inside an operator is a Function. The operator's job is to wrap that Function into a StreamOperator, and the StreamOperator is in turn wrapped into a Transformation that is added to the environment's list of transformations. In short: Function => StreamOperator => Transformation. One more object is involved, the DataStream, which we can think of as the carrier of the Function and the bridge connecting one operator to the next. The sketch below illustrates this wrapping for a simple map.
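Here is a minimal sketch of that wrapping, using map as the example. The classes are Flink's own, but assembling them by hand like this is purely illustrative; user code never does it, and the Transformation step is only shown as a comment:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.operators.StreamMap;

public class WrappingSketch {
    public static void main(String[] args) {
        // 1. Function: the user's computation logic
        MapFunction<String, Integer> fn = Integer::valueOf;

        // 2. StreamOperator: the runtime wrapper around the Function
        StreamMap<String, Integer> operator = new StreamMap<>(fn);

        // 3. Transformation: inside DataStream#doTransform the operator (via an operator factory)
        //    is wrapped again, roughly:
        //    new OneInputTransformation<>(inputTransformation, "Map",
        //            SimpleOperatorFactory.of(operator), outTypeInfo, parallelism);
        //    and the result is appended to the environment's transformations list.
        System.out.println(operator.getUserFunction() == fn); // the operator carries the Function
    }
}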
Now to the code. Once the StreamExecutionEnvironment is built, the source is constructed, as in the example:
env.readTextFile(input)
readTextFile is itself an operator, so let's use it to see what happens inside one. Step into readTextFile and we arrive here:
public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
Preconditions.checkArgument(
!StringUtils.isNullOrWhitespaceOnly(filePath),
"The file path must not be null or blank.");
TextInputFormat format = new TextInputFormat(new Path(filePath));
format.setFilesFilter(FilePathFilter.createDefaultFilter());
TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
format.setCharsetName(charsetName);
// TODO
return readFile(format, filePath, FileProcessingMode.PROCESS_ONCE, -1, typeInfo);
}
Let's continue into the readFile method:
@PublicEvolving
public <OUT> DataStreamSource<OUT> readFile(
FileInputFormat<OUT> inputFormat,
String filePath,
FileProcessingMode watchType,
long interval,
TypeInformation<OUT> typeInformation) {
Preconditions.checkNotNull(inputFormat, "InputFormat must not be null.");
Preconditions.checkArgument(
!StringUtils.isNullOrWhitespaceOnly(filePath),
"The file path must not be null or blank.");
inputFormat.setFilePath(filePath);
// TODO
return createFileInput(
inputFormat, typeInformation, "Custom File Source", watchType, interval);
}
Then into createFileInput:
private <OUT> DataStreamSource<OUT> createFileInput(
FileInputFormat<OUT> inputFormat,
TypeInformation<OUT> typeInfo,
String sourceName,
FileProcessingMode monitoringMode,
long interval) {
Preconditions.checkNotNull(inputFormat, "Unspecified file input format.");
Preconditions.checkNotNull(typeInfo, "Unspecified output type information.");
Preconditions.checkNotNull(sourceName, "Unspecified name for the source.");
Preconditions.checkNotNull(monitoringMode, "Unspecified monitoring mode.");
Preconditions.checkArgument(
monitoringMode.equals(FileProcessingMode.PROCESS_ONCE)
|| interval >= ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL,
"The path monitoring interval cannot be less than "
+ ContinuousFileMonitoringFunction.MIN_MONITORING_INTERVAL
+ " ms.");
// TODO: create a Function
ContinuousFileMonitoringFunction<OUT> monitoringFunction =
new ContinuousFileMonitoringFunction<>(
inputFormat, monitoringMode, getParallelism(), interval);
ContinuousFileReaderOperatorFactory<OUT, TimestampedFileInputSplit> factory =
new ContinuousFileReaderOperatorFactory<>(inputFormat);
final Boundedness boundedness =
monitoringMode == FileProcessingMode.PROCESS_ONCE
? Boundedness.BOUNDED
: Boundedness.CONTINUOUS_UNBOUNDED;
// TODO: create the DataStreamSource
// TODO Function => StreamOperator => Transformation
SingleOutputStreamOperator<OUT> source =
// TODO: wrap the Function into a DataStream
addSource(monitoringFunction, sourceName, null, boundedness)
// TODO: run transform(), producing a Transformation that is added to the transformations list
.transform("Split Reader: " + sourceName, typeInfo, factory);
return new DataStreamSource<>(source);
}
At this point the operator conversion steps become visible:
- A Function is created via new ContinuousFileMonitoringFunction
- addSource(...) wraps that Function into a StreamOperator
- A DataStream carries the StreamOperator into transform(), which wraps it into a Transformation and adds it to the environment's transformations list
Let's first look at how the Function is wrapped into a StreamOperator and handed to a DataStream; step into addSource(...):
private <OUT> DataStreamSource<OUT> addSource(
final SourceFunction<OUT> function,
final String sourceName,
@Nullable final TypeInformation<OUT> typeInfo,
final Boundedness boundedness) {
checkNotNull(function);
checkNotNull(sourceName);
checkNotNull(boundedness);
TypeInformation<OUT> resolvedTypeInfo =
getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);
boolean isParallel = function instanceof ParallelSourceFunction;
clean(function);
/*
TODO Note a few key points:
1. StreamSource is itself a StreamOperator
2. StreamSource wraps the Function
3. StreamSource is then held as a member variable of a Transformation
So the relationship is: Function => StreamOperator => Transformation
*/
final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
return new DataStreamSource<>(
this, resolvedTypeInfo, sourceOperator, isParallel, sourceName, boundedness);
}
As we can see, the Function is wrapped into a StreamSource here, and the inheritance hierarchy shows that StreamSource is itself a StreamOperator. The StreamSource is then handed to a DataStreamSource, whose top-level parent class is DataStream.
With the DataStream built, let's look at the transform method; step in:
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(
String operatorName,
TypeInformation<R> outTypeInfo,
OneInputStreamOperatorFactory<T, R> operatorFactory) {
// TODO
return doTransform(operatorName, outTypeInfo, operatorFactory);
}
Continue into doTransform:
protected <R> SingleOutputStreamOperator<R> doTransform(
String operatorName,
TypeInformation<R> outTypeInfo,
StreamOperatorFactory<R> operatorFactory) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();
// TODO: the Transformation is built here
OneInputTransformation<T, R> resultTransform =
new OneInputTransformation<>(
this.transformation,
operatorName,
operatorFactory,
outTypeInfo,
environment.getParallelism());
@SuppressWarnings({"unchecked", "rawtypes"})
SingleOutputStreamOperator<R> returnStream =
new SingleOutputStreamOperator(environment, resultTransform);
// TODO: add the Transformation (which wraps the StreamOperator) to the transformations list
getExecutionEnvironment().addOperator(resultTransform);
return returnStream;
}
As we can see, this method builds the Transformation object, adds it to the environment's transformations list at the end, and wraps it into a DataStream (the returned SingleOutputStreamOperator) that is passed on downstream.
At this point the readTextFile operator is fully built. Let's continue with the next operators:
DataStream<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
// TODO
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
.keyBy(value -> value.f0)
.sum(1);
Let's first go into the flatMap operator:
// TODO: note that the parameter here is a Function; every StreamOperator contains a Function
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper) {
TypeInformation<R> outType =
TypeExtractor.getFlatMapReturnTypes(
clean(flatMapper), getType(), Utils.getCallLocationName(), true);
// TODO
return flatMap(flatMapper, outType);
}
As we can see, a Function object is passed in as the parameter. Let's continue into the overloaded flatMap method:
public <R> SingleOutputStreamOperator<R> flatMap(
FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
// TODO
return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
}
Step into transform:
@PublicEvolving
public <R> SingleOutputStreamOperator<R> transform(
String operatorName,
TypeInformation<R> outTypeInfo,
OneInputStreamOperator<T, R> operator) {
// TODO
return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));
}
Then into doTransform:
protected <R> SingleOutputStreamOperator<R> doTransform(
String operatorName,
TypeInformation<R> outTypeInfo,
StreamOperatorFactory<R> operatorFactory) {
// read the output type of the input Transform to coax out errors about MissingTypeInfo
transformation.getOutputType();
// TODO: the Transformation is built here
OneInputTransformation<T, R> resultTransform =
new OneInputTransformation<>(
this.transformation,
operatorName,
operatorFactory,
outTypeInfo,
environment.getParallelism());
@SuppressWarnings({"unchecked", "rawtypes"})
SingleOutputStreamOperator<R> returnStream =
new SingleOutputStreamOperator(environment, resultTransform);
// TODO: add the Transformation (which wraps the StreamOperator) to the transformations list
getExecutionEnvironment().addOperator(resultTransform);
return returnStream;
}
As we can see, we end up back in the same place. Essentially every operator follows this logic: build a StreamOperator from the Function, wrap it into a Transformation, add it to the transformations list, and return it wrapped in a DataStream that is passed on to the downstream operators. A complete, runnable version of the fragment above is sketched below.
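For reference, a complete, runnable version of the fragment above might look like the following; the lambda stands in for the example's Tokenizer class, and the input elements are made up:

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class TransformationPipelineSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Each call below follows the Function => StreamOperator => Transformation pattern;
        // keyBy is the exception: it only creates a KeyedStream whose partitioning becomes
        // the input of the transformation registered by the following sum().
        DataStream<Tuple2<String, Integer>> counts =
                env.fromElements("to be", "or not to be")
                        .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                            for (String word : line.split(" ")) {
                                out.collect(Tuple2.of(word, 1));
                            }
                        })
                        .returns(Types.TUPLE(Types.STRING, Types.INT))
                        .keyBy(t -> t.f0)
                        .sum(1);

        counts.print();
        env.execute("transformation-sketch");
    }
}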
2.3 The Implementation of env.execute
After this chain of operator calls and conversions, every operator has been added to the environment's transformations list in the form of a Transformation. Now let's look at the implementation of env.execute; step into the method:
public JobExecutionResult execute(String jobName) throws Exception {
Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
// TODO: build the StreamGraph and execute it
return execute(getStreamGraph(jobName));
}
As we can see, a StreamGraph is built here and then executed. I will analyze the StreamGraph construction in detail in the next chapter; for now we stay focused on the job submission flow. Let's look at the execute method:
@Internal
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
// execute the StreamGraph asynchronously
final JobClient jobClient = executeAsync(streamGraph);
try {
final JobExecutionResult jobExecutionResult;
if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
// TODO: block on get() waiting for the job execution result
jobExecutionResult = jobClient.getJobExecutionResult().get();
} else {
jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
}
jobListeners.forEach(
jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
return jobExecutionResult;
} catch (Throwable t) {
// get() on the JobExecutionResult Future will throw an ExecutionException. This
// behaviour was largely not there in Flink versions before the PipelineExecutor
// refactoring so we should strip that exception.
Throwable strippedException = ExceptionUtils.stripExecutionException(t);
jobListeners.forEach(
jobListener -> {
jobListener.onJobExecuted(null, strippedException);
});
ExceptionUtils.rethrowException(strippedException);
// never reached, only make javac happy
return null;
}
}
As we can see, this method executes the StreamGraph asynchronously and later blocks on get() for the result. Let's look at the asynchronous execution and step into executeAsync:
@Internal
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
checkNotNull(streamGraph, "StreamGraph cannot be null.");
checkNotNull(
configuration.get(DeploymentOptions.TARGET),
"No execution.target specified in your configuration file.");
final PipelineExecutorFactory executorFactory =
executorServiceLoader.getExecutorFactory(configuration);
checkNotNull(
executorFactory,
"Cannot find compatible factory for specified execution.target (=%s)",
configuration.get(DeploymentOptions.TARGET));
/*
TODO: submit asynchronously and obtain a future
*/
CompletableFuture<JobClient> jobClientFuture =
executorFactory
.getExecutor(configuration)
.execute(streamGraph, configuration, userClassloader);
try {
// TODO: block waiting for the submission result (the JobClient)
JobClient jobClient = jobClientFuture.get();
jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
return jobClient;
} catch (ExecutionException executionException) {
final Throwable strippedException =
ExceptionUtils.stripExecutionException(executionException);
jobListeners.forEach(
jobListener -> jobListener.onJobSubmitted(null, strippedException));
throw new FlinkException(
String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
strippedException);
}
}
As we can see, the StreamGraph is submitted using asynchronous programming. Let's continue into the execute method and choose the AbstractSessionClusterExecutor implementation:
// TODO: the pipeline parameter here is the StreamGraph
@Override
public CompletableFuture<JobClient> execute(
@Nonnull final Pipeline pipeline,
@Nonnull final Configuration configuration,
@Nonnull final ClassLoader userCodeClassloader)
throws Exception {
// TODO: build the JobGraph from the StreamGraph
final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
/*
TODO: at this point the JobGraph is fully built; next comes the JobGraph submission
*/
// TODO
try (final ClusterDescriptor<ClusterID> clusterDescriptor =
clusterClientFactory.createClusterDescriptor(configuration)) {
final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
checkState(clusterID != null);
/*
TODO ClusterClientProvider: the provider used to create the RestClusterClient
1. internally it instantiates a RestClusterClient
2. when the RestClusterClient is initialized, its member variable RestClient is initialized
3. when the RestClient is initialized, an internal Netty client is initialized as well
TODO the client that submits the job: the Netty client inside the RestClient inside the RestClusterClient
TODO the server that receives the job: the Netty server inside the WebMonitorEndpoint started in the JobManager
*/
final ClusterClientProvider<ClusterID> clusterClientProvider =
clusterDescriptor.retrieve(clusterID);
ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();
/*
TODO submit for execution
1. MiniClusterClient: local execution
2. RestClusterClient: submit to the Flink REST server for processing
*/
return clusterClient
// TODO: submit via the Netty client inside the RestClient
.submitJob(jobGraph)
.thenApplyAsync(
FunctionUtils.uncheckedFunction(
jobId -> {
ClientUtils.waitUntilJobInitializationFinished(
() -> clusterClient.getJobStatus(jobId).get(),
() -> clusterClient.requestJobResult(jobId).get(),
userCodeClassloader);
return jobId;
}))
.thenApplyAsync(
jobID ->
(JobClient)
new ClusterClientJobClientAdapter<>(
clusterClientProvider,
jobID,
userCodeClassloader))
.whenCompleteAsync((ignored1, ignored2) -> clusterClient.close());
}
}
The following work is done here:
1. PipelineExecutorUtils.getJobGraph builds the JobGraph from the StreamGraph.
2. A ClusterDescriptor is created and used to obtain a ClusterClientProvider, from which the ClusterClient that actually performs the submission is built.
Let's first look at how the ClusterClientProvider is built; step into clusterDescriptor.retrieve and choose the StandaloneClusterDescriptor implementation:
@Override
public ClusterClientProvider<StandaloneClusterId> retrieve(
StandaloneClusterId standaloneClusterId) throws ClusterRetrieveException {
return () -> {
try {
// TODO
return new RestClusterClient<>(config, standaloneClusterId);
} catch (Exception e) {
throw new RuntimeException("Couldn't retrieve standalone cluster", e);
}
};
}
This method creates and returns a RestClusterClient. Let's look at its constructor:
private RestClusterClient(
Configuration configuration,
@Nullable RestClient restClient,
T clusterId,
WaitStrategy waitStrategy,
ClientHighAvailabilityServices clientHAServices)
throws Exception {
this.configuration = checkNotNull(configuration);
// TODO: parse the configuration
this.restClusterClientConfiguration =
RestClusterClientConfiguration.fromConfiguration(configuration);
if (restClient != null) {
this.restClient = restClient;
} else {
// TODO: build a RestClient
// TODO: internally this just builds a Netty client
this.restClient =
new RestClient(
restClusterClientConfiguration.getRestClientConfiguration(),
executorService);
}
this.waitStrategy = checkNotNull(waitStrategy);
this.clusterId = checkNotNull(clusterId);
this.clientHAServices = checkNotNull(clientHAServices);
this.webMonitorRetrievalService = clientHAServices.getClusterRestEndpointLeaderRetriever();
this.retryExecutorService =
Executors.newSingleThreadScheduledExecutor(
new ExecutorThreadFactory("Flink-RestClusterClient-Retry"));
// TODO: watch for changes of the WebMonitorEndpoint's address
startLeaderRetrievers();
}
Three main things are done here:
1. Parse the configuration
2. Build the Netty client
3. Watch for changes of the WebMonitorEndpoint's leader address
A hedged sketch of how such a client is typically constructed against a standalone session cluster follows.
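As an illustration of the client constructed here, the following sketch creates a RestClusterClient against a standalone session cluster by hand; the host, port, and use of listJobs are illustrative only:

import org.apache.flink.client.deployment.StandaloneClusterId;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.client.JobStatusMessage;

import java.util.Collection;

public class RestClusterClientSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setString(RestOptions.ADDRESS, "localhost"); // rest.address of the session cluster
        conf.setInteger(RestOptions.PORT, 8081);          // rest.port served by the WebMonitorEndpoint

        RestClusterClient<StandaloneClusterId> client =
                new RestClusterClient<>(conf, StandaloneClusterId.getInstance());
        try {
            // Any REST call goes through the same internal RestClient / Netty client discussed above.
            Collection<JobStatusMessage> jobs = client.listJobs().get();
            jobs.forEach(job -> System.out.println(job.getJobId() + " -> " + job.getJobState()));
        } finally {
            client.close();
        }
    }
}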
With the Netty client in place, let's return to the JobGraph submission and look at this piece of code again:
/*
TODO submit for execution
1. MiniClusterClient: local execution
2. RestClusterClient: submit to the Flink REST server for processing
*/
return clusterClient
// TODO: submit via the Netty client inside the RestClient
.submitJob(jobGraph)
.thenApplyAsync(
FunctionUtils.uncheckedFunction(
jobId -> {
ClientUtils.waitUntilJobInitializationFinished(
() -> clusterClient.getJobStatus(jobId).get(),
() -> clusterClient.requestJobResult(jobId).get(),
userCodeClassloader);
return jobId;
}))
.thenApplyAsync(
jobID ->
(JobClient)
new ClusterClientJobClientAdapter<>(
clusterClientProvider,
jobID,
userCodeClassloader))
.whenCompleteAsync((ignored1, ignored2) -> clusterClient.close());
Step into the submission method clusterClient.submitJob and choose the RestClusterClient implementation. The method is long, so we'll take it apart piece by piece, starting with this code:
CompletableFuture<java.nio.file.Path> jobGraphFileFuture =
CompletableFuture.supplyAsync(
() -> {
try {
final java.nio.file.Path jobGraphFile =
Files.createTempFile("flink-jobgraph", ".bin");
try (ObjectOutputStream objectOut =
new ObjectOutputStream(
Files.newOutputStream(jobGraphFile))) {
objectOut.writeObject(jobGraph);
}
return jobGraphFile;
} catch (IOException e) {
throw new CompletionException(
new FlinkException("Failed to serialize JobGraph.", e));
}
},
executorService);
What this code does is persist the JobGraph into a file whose name has the prefix flink-jobgraph and the suffix .bin. When we submit the JobGraph to the Flink cluster, this file is what is actually submitted; the request is eventually received and handled by the cluster's WebMonitor (JobSubmitHandler). The first thing the JobSubmitHandler does is deserialize the received file back into a JobGraph object. The minimal sketch below mirrors this write/read round trip.
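The following minimal sketch mirrors both ends of that exchange using a plain serializable stand-in for the JobGraph; it illustrates the mechanism only and is not Flink code:

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;

public class JobGraphFileRoundTrip {
    public static void main(String[] args) throws Exception {
        Serializable payload = "stand-in for the JobGraph"; // the real code serializes the JobGraph itself

        // Client side (RestClusterClient.submitJob): write the object into flink-jobgraph*.bin
        Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
        try (ObjectOutputStream out = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
            out.writeObject(payload);
        }

        // Server side (JobSubmitHandler): read the uploaded file back into an object
        try (ObjectInputStream in = new ObjectInputStream(Files.newInputStream(jobGraphFile))) {
            Object restored = in.readObject();
            System.out.println("restored: " + restored);
        }
    }
}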
Let's continue with the next piece of code:
/*
TODO once the persistence is done, add the JobGraph file to the list of files to upload
*/
CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture =
jobGraphFileFuture.thenApply(
jobGraphFile -> {
List<String> jarFileNames = new ArrayList<>(8);
List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames =
new ArrayList<>(8);
Collection<FileUpload> filesToUpload = new ArrayList<>(8);
// TODO: add the JobGraph file to the upload list
filesToUpload.add(
new FileUpload(
jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));
// TODO: add the jars the job needs to the upload list
for (Path jar : jobGraph.getUserJars()) {
jarFileNames.add(jar.getName());
filesToUpload.add(
new FileUpload(
Paths.get(jar.toUri()),
RestConstants.CONTENT_TYPE_JAR));
}
... ...
// TODO: build the request body for the submission; it references the resources, mainly the persisted JobGraph file and the dependency jars
final JobSubmitRequestBody requestBody =
new JobSubmitRequestBody(
jobGraphFile.getFileName().toString(),
jarFileNames,
artifactFileNames);
// TODO: return a Tuple2 holding the requestBody and filesToUpload
return Tuple2.of(
requestBody, Collections.unmodifiableCollection(filesToUpload));
});
In this code, the JobGraph file is added to the list of files to upload, along with the jars the job needs. Finally the submission requestBody is built; it references the required resources, mainly the persisted JobGraph file and the corresponding dependency jars.
Let's continue with the next piece of code:
// TODO: send the request
final CompletableFuture<JobSubmitResponseBody> submissionFuture =
requestFuture.thenCompose(
requestAndFileUploads ->
// TODO: submit
sendRetriableRequest(
JobSubmitHeaders.getInstance(),
EmptyMessageParameters.getInstance(),
requestAndFileUploads.f0,
requestAndFileUploads.f1,
isConnectionProblemOrServiceUnavailable()));
This is where the JobGraph is actually submitted. Step into sendRetriableRequest:
private <
M extends MessageHeaders<R, P, U>,
U extends MessageParameters,
R extends RequestBody,
P extends ResponseBody>
CompletableFuture<P> sendRetriableRequest(
M messageHeaders,
U messageParameters,
R request,
Collection<FileUpload> filesToUpload,
Predicate<Throwable> retryPredicate) {
// TODO: retry mechanism
return retry(
() ->
// TODO: get the address of the WebMonitorEndpoint in the JobManager (master node)
// TODO: submitting the JobGraph from the client really means submitting it to the WebMonitorEndpoint
getWebMonitorBaseUrl()
.thenCompose(
webMonitorBaseUrl -> {
try {
/*
TODO submit the request to the WebMonitorEndpoint; it is ultimately handled by the JobSubmitHandler
the submission goes over HTTP REST
*/
return restClient.sendRequest(
webMonitorBaseUrl.getHost(),
webMonitorBaseUrl.getPort(),
messageHeaders,
messageParameters,
request,
filesToUpload);
} catch (IOException e) {
throw new CompletionException(e);
}
}),
retryPredicate);
}
In this method, the WebMonitorEndpoint's address is obtained first and the job is then submitted over HTTP REST. Let's continue following the submission and step into restClient.sendRequest:
@Override
public <
M extends MessageHeaders<R, P, U>,
U extends MessageParameters,
R extends RequestBody,
P extends ResponseBody>
CompletableFuture<P> sendRequest(
final String targetAddress,
final int targetPort,
final M messageHeaders,
final U messageParameters,
final R request,
final Collection<FileUpload> files)
throws IOException {
if (failHttpRequest.test(messageHeaders, messageParameters, request)) {
return FutureUtils.completedExceptionally(new IOException("expected"));
} else {
// TODO: continue the submission
return super.sendRequest(
targetAddress,
targetPort,
messageHeaders,
messageParameters,
request,
files);
}
}
Then into super.sendRequest:
public <
M extends MessageHeaders<R, P, U>,
U extends MessageParameters,
R extends RequestBody,
P extends ResponseBody>
CompletableFuture<P> sendRequest(
String targetAddress,
int targetPort,
M messageHeaders,
U messageParameters,
R request,
Collection<FileUpload> fileUploads,
RestAPIVersion apiVersion)
throws IOException {
Preconditions.checkNotNull(targetAddress);
Preconditions.checkArgument(
NetUtils.isValidHostPort(targetPort),
"The target port " + targetPort + " is not in the range [0, 65535].");
Preconditions.checkNotNull(messageHeaders);
... ...
... ...
/*
TODO work out the target URL, which determines which handler in the WebMonitorEndpoint will process the request
*/
String versionedHandlerURL =
"/" + apiVersion.getURLVersionPrefix() + messageHeaders.getTargetRestEndpointURL();
String targetUrl = MessageParameters.resolveUrl(versionedHandlerURL, messageParameters);
LOG.debug(
"Sending request of class {} to {}:{}{}",
request.getClass(),
targetAddress,
targetPort,
targetUrl);
// serialize payload
StringWriter sw = new StringWriter();
objectMapper.writeValue(sw, request);
ByteBuf payload =
Unpooled.wrappedBuffer(sw.toString().getBytes(ConfigConstants.DEFAULT_CHARSET));
// TODO: build an HTTP Request object
Request httpRequest =
createRequest(
targetAddress + ':' + targetPort,
targetUrl,
messageHeaders.getHttpMethod().getNettyHttpMethod(),
payload,
fileUploads);
final JavaType responseType;
final Collection<Class<?>> typeParameters = messageHeaders.getResponseTypeParameters();
if (typeParameters.isEmpty()) {
responseType = objectMapper.constructType(messageHeaders.getResponseClass());
} else {
responseType =
objectMapper
.getTypeFactory()
.constructParametricType(
messageHeaders.getResponseClass(),
typeParameters.toArray(new Class<?>[typeParameters.size()]));
}
// TODO: submit the request
return submitRequest(targetAddress, targetPort, httpRequest, responseType);
}
This method does the following:
- Work out the target URL, which determines which handler in the WebMonitorEndpoint will process the request
- Build an HTTP Request object
- Submit the request
Let's follow the submission into submitRequest:
private <P extends ResponseBody> CompletableFuture<P> submitRequest(
String targetAddress, int targetPort, Request httpRequest, JavaType responseType) {
/*
TODO send the request to the Netty server through the Netty client
*/
final ChannelFuture connectFuture = bootstrap.connect(targetAddress, targetPort);
final CompletableFuture<Channel> channelFuture = new CompletableFuture<>();
connectFuture.addListener(
(ChannelFuture future) -> {
if (future.isSuccess()) {
channelFuture.complete(future.channel());
} else {
channelFuture.completeExceptionally(future.cause());
}
});
return channelFuture
.thenComposeAsync(
channel -> {
ClientHandler handler = channel.pipeline().get(ClientHandler.class);
CompletableFuture<JsonResponse> future;
boolean success = false;
try {
if (handler == null) {
throw new IOException(
"Netty pipeline was not properly initialized.");
} else {
// TODO: write the request to the server
httpRequest.writeTo(channel);
future = handler.getJsonFuture();
success = true;
}
} catch (IOException e) {
future =
FutureUtils.completedExceptionally(
new ConnectionException(
"Could not write request.", e));
} finally {
if (!success) {
channel.close();
}
}
return future;
},
executor)
.thenComposeAsync(
(JsonResponse rawResponse) -> parseResponse(rawResponse, responseType),
executor);
}
As we can see, bootstrap.connect is used to connect to the Netty server, and once the connection succeeds httpRequest.writeTo(channel) is called to send the data.
The bootstrap here is the Netty client's bootstrap. When the master node starts, it starts the WebMonitorEndpoint component, which in turn starts a Netty server; when the client submits a job, it does so through the RestClient, whose initialization created the Netty client. Calling submitRequest(...) performs the submission: the Netty client connects to the Netty server and sends the request, which in the end just means writing the request object's data to the server. A stripped-down sketch of this client bootstrap pattern follows.
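The following sketch shows that client-side bootstrap pattern using the plain Netty 4 API; Flink itself uses a shaded copy of Netty and its RestClient installs more handlers than shown here, and the host and port are illustrative:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;

public class MiniRestClientSketch {
    public static void main(String[] args) throws Exception {
        NioEventLoopGroup group = new NioEventLoopGroup(1);
        try {
            Bootstrap bootstrap = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            // HTTP codec + aggregator, roughly what RestClient puts into the pipeline;
                            // RestClient additionally adds its ClientHandler to collect the JSON response.
                            ch.pipeline().addLast(new HttpClientCodec());
                            ch.pipeline().addLast(new HttpObjectAggregator(1024 * 1024));
                        }
                    });
            // bootstrap.connect(...) is the call we saw in submitRequest; the request is then
            // written to the resulting channel (httpRequest.writeTo(channel) in Flink's code).
            ChannelFuture connect = bootstrap.connect("localhost", 8081).sync();
            connect.channel().close().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}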
At this point our job has been handed to the WebMonitorEndpoint on the master node. This chapter did not go into the construction of the StreamGraph and JobGraph; I plan to analyze how these two graphs are built in later chapters.
Summary
During the initialization of the StreamExecutionEnvironment, two main things are done: configuring the StateBackend and configuring checkpointing.
In a Flink application, every operation is ultimately a StreamOperator (source, intermediate, and sink operators), and operators that can be optimized are chained together into an OperatorChain.
The operator conversion chain is: Function => StreamOperator => Transformation => OperatorChain (after parallelization, executed by StreamTasks).
In the env.execute step, a StreamGraph is built from the transformations list we accumulated, converted into a JobGraph, and persisted. The JobGraph file, the dependency jars, and some other settings are then assembled into a request body and sent, via the Netty client inside the RestClient, to the Netty server inside the JobManager's WebMonitorEndpoint, where the URL is resolved and the request is handed to the corresponding handler.