一、Flink简介
Apache Flink 是一个开源的流处理框架,用于在无边界和有边界的数据流上进行有状态的计算。它能够在所有常见的集群环境中运行,并能以内存速度和任意规模进行计算。Flink 支持高吞吐、低延迟的高性能计算,并且具有精确一次的状态一致性保障。它提供了丰富的 API,包括 DataStream API、DataSet API、Table API 以及 SQL on Stream & Batch Data。Flink 能够处理无界和有界数据集,并且具有强大的状态管理和事件时间处理能力。
二、作业并行度的设置
在 Apache Flink 中,**并行度**是指任务在执行时能够同时运行的最大线程数或任务数。它定义了作业在集群中并行执行的程度,是影响作业吞吐量和资源利用率的关键因素。
设置并行度
一个 task 的并行度可以从多个层次指定:
算子层次
单个算子、数据源和数据接收器的并行度可以通过调用
setParallelism()
方法来指定。如下所示:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
.flatMap(new LineSplitter())
.keyBy(value -> value.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum(1).setParallelism(5);
wordCounts.print();
env.execute("Word Count Example");
执行环境层次
如此节所描述,Flink 程序运行在执行环境的上下文中。执行环境为所有执行的算子、数据源、数据接收器 (data sink) 定义了一个默认的并行度。可以显式配置算子层次的并行度去覆盖执行环境的并行度。
可以通过调用
setParallelism()
方法指定执行环境的默认并行度。如果想以并行度
3
来执行所有的算子、数据源和数据接收器。可以在执行环境上设置默认并行度,如下所示:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);
DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();
env.execute("Word Count Example");
客户端层次
将作业提交到 Flink 时可在客户端设定其并行度。客户端可以是 Java 或 Scala 程序,Flink 的命令行接口(CLI)就是一种典型的客户端。
在 CLI 客户端中,可以通过
-p
参数指定并行度,例如:
./bin/flink run -p 10 ../examples/*WordCount-java*.jar
在 Java/Scala 程序中,可以通过如下方式指定并行度:
try {
PackagedProgram program = new PackagedProgram(file, args);
InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
Configuration config = new Configuration();
Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());
// set the parallelism to 10 here
client.run(program, 10, true);
} catch (ProgramInvocationException e) {
e.printStackTrace();
}
系统层次
可以通过设置
./conf/flink-conf.yaml
文件中的
parallelism.default
参数,在系统层次来指定所有执行环境的默认并行度。你可以通过查阅配置文档获取更多细节。
设置最大并行度
最大并行度可以在所有设置并行度的地方进行设定(客户端和系统层次除外)。与调用
setParallelism()
方法修改并行度相似,你可以通过调用
setMaxParallelism()
方法来设定最大并行度。
默认的最大并行度等于将
operatorParallelism + (operatorParallelism / 2)
值四舍五入到大于等于该值的一个整型值,并且这个整型值是
2
的幂次方,注意默认最大并行度下限为
128
,上限为
32768
。
注意 为最大并行度设置一个非常大的值将会降低性能,因为一些 state backends 需要维持内部的数据结构,而这些数据结构将会随着 key-groups 的数目而扩张(key-group 是状态重新分配的最小单元)。
三、Key Groups说明
在 Apache Flink 中,Key Groups 是一个重要的概念,它们是 Keyed State 的原子单位,用于在并行实例之间分配和管理状态。以下是关于 Key Groups 的一些核心理解:
- Key Groups 的定义:Key Groups 是对 Key 进行分组的方式,它们将无限的 Key 通过特定的算法分成有限的组。每个 Key 都会被分配到一个 Key Group 中,这个过程是通过 Key 的哈希值对 Key Groups 的数量进行取模来完成的。例如,如果有一个 Key,它的哈希码是 11,而最大并行度设置为 10,那么这个 Key 会被分配到 Key Group 1 中(因为 11 % 10 = 1)。
- Key Groups 的作用:Key Groups 主要有两个作用。首先,它们确保 Key 能够均匀分散到每个并行算子上,这是通过将 Key Groups 平均分配到现有的并行度上来实现的。其次,当集群重启或者扩容后,Key Groups 确保新进入的数据能够找到之前的状态,这对于状态的恢复和一致性至关重要。
- Key Groups 与并行度:每个并行实例(Sub-Task)都会处理一个或多个 Key Groups。Key Groups 的总数与作业的最大并行度相等。这意味着,如果作业的最大并行度是 10,那么就有 10 个 Key Groups,索引从 0 到 9。
- 最大并行度限制:在 Flink 中,最大并行度是一个重要的配置,它定义了作业能够达到的最大并行度。这个值一旦在作业的第一次启动后设置,就不应该无代价地改变,因为 Key Groups 的数量与最大并行度相等,改变这个值可能会导致状态无法正确映射,从而影响作业的恢复。
- Key Groups 的分配算法:Key Groups 到并行算子的分配算法尽量保证均匀分配。例如,如果有 8 个 Key Groups 和 3 个并行算子,那么前两个算子会分配到 3 个 Key Groups,第三个算子分配到 2 个 Key Groups。这种分配方式确保了负载均衡。
- Key Groups 与状态管理:在状态管理方面,Key Groups 允许 Flink 在进行 Checkpoint 时,将状态数据按照 Key Groups 进行分组保存。这样,在作业重启时,可以根据 Key Groups 快速恢复状态。
- 使用注意事项:在设置最大并行度时,应该考虑到状态的大小和 Checkpoint 的性能。增加最大并行度会导致更多的 Key Groups,这会增加状态元数据的大小,从而可能影响 Checkpoint 的性能。因此,应该在满足业务需求的前提下,尽可能设置一个较小的最大并行度。
综上所述,Key Groups 在 Flink 中是实现高效状态管理和并行处理的关键机制。正确理解和使用 Key Groups 对于优化 Flink 作业的性能和资源利用率非常重要。
版权归原作者 似水_逆行 所有, 如有侵权,请联系我们删除。