0


Flink系列-作业并行度的设置

一、Flink简介

    Apache Flink 是一个开源的流处理框架,用于在无边界和有边界的数据流上进行有状态的计算。它能够在所有常见的集群环境中运行,并能以内存速度和任意规模进行计算。Flink 支持高吞吐、低延迟的高性能计算,并且具有精确一次的状态一致性保障。它提供了丰富的 API,包括 DataStream API、DataSet API、Table API 以及 SQL on Stream & Batch Data。Flink 能够处理无界和有界数据集,并且具有强大的状态管理和事件时间处理能力。

二、作业并行度的设置

    在 Apache Flink 中,**并行度**是指任务在执行时能够同时运行的最大线程数或任务数。它定义了作业在集群中并行执行的程度,是影响作业吞吐量和资源利用率的关键因素。

设置并行度

一个 task 的并行度可以从多个层次指定:

算子层次

单个算子、数据源和数据接收器的并行度可以通过调用

setParallelism()

方法来指定。如下所示:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = text
    .flatMap(new LineSplitter())
    .keyBy(value -> value.f0)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .sum(1).setParallelism(5);

wordCounts.print();

env.execute("Word Count Example");

执行环境层次

如此节所描述,Flink 程序运行在执行环境的上下文中。执行环境为所有执行的算子、数据源、数据接收器 (data sink) 定义了一个默认的并行度。可以显式配置算子层次的并行度去覆盖执行环境的并行度。

可以通过调用

setParallelism()

方法指定执行环境的默认并行度。如果想以并行度

3

来执行所有的算子、数据源和数据接收器。可以在执行环境上设置默认并行度,如下所示:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(3);

DataStream<String> text = [...]
DataStream<Tuple2<String, Integer>> wordCounts = [...]
wordCounts.print();

env.execute("Word Count Example");

客户端层次

将作业提交到 Flink 时可在客户端设定其并行度。客户端可以是 Java 或 Scala 程序,Flink 的命令行接口(CLI)就是一种典型的客户端。

在 CLI 客户端中,可以通过

-p

参数指定并行度,例如:

./bin/flink run -p 10 ../examples/*WordCount-java*.jar

在 Java/Scala 程序中,可以通过如下方式指定并行度:

try {
    PackagedProgram program = new PackagedProgram(file, args);
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
    Configuration config = new Configuration();

    Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());

    // set the parallelism to 10 here
    client.run(program, 10, true);

} catch (ProgramInvocationException e) {
    e.printStackTrace();
}

系统层次

可以通过设置

./conf/flink-conf.yaml

文件中的

parallelism.default

参数,在系统层次来指定所有执行环境的默认并行度。你可以通过查阅配置文档获取更多细节。

设置最大并行度

最大并行度可以在所有设置并行度的地方进行设定(客户端和系统层次除外)。与调用

setParallelism()

方法修改并行度相似,你可以通过调用

setMaxParallelism()

方法来设定最大并行度。

默认的最大并行度等于将

operatorParallelism + (operatorParallelism / 2)

值四舍五入到大于等于该值的一个整型值,并且这个整型值是

2

的幂次方,注意默认最大并行度下限为

128

,上限为

32768

注意 为最大并行度设置一个非常大的值将会降低性能,因为一些 state backends 需要维持内部的数据结构,而这些数据结构将会随着 key-groups 的数目而扩张(key-group 是状态重新分配的最小单元)。

三、Key Groups说明

在 Apache Flink 中,Key Groups 是一个重要的概念,它们是 Keyed State 的原子单位,用于在并行实例之间分配和管理状态。以下是关于 Key Groups 的一些核心理解:

  1. Key Groups 的定义:Key Groups 是对 Key 进行分组的方式,它们将无限的 Key 通过特定的算法分成有限的组。每个 Key 都会被分配到一个 Key Group 中,这个过程是通过 Key 的哈希值对 Key Groups 的数量进行取模来完成的。例如,如果有一个 Key,它的哈希码是 11,而最大并行度设置为 10,那么这个 Key 会被分配到 Key Group 1 中(因为 11 % 10 = 1)。
  2. Key Groups 的作用:Key Groups 主要有两个作用。首先,它们确保 Key 能够均匀分散到每个并行算子上,这是通过将 Key Groups 平均分配到现有的并行度上来实现的。其次,当集群重启或者扩容后,Key Groups 确保新进入的数据能够找到之前的状态,这对于状态的恢复和一致性至关重要。
  3. Key Groups 与并行度:每个并行实例(Sub-Task)都会处理一个或多个 Key Groups。Key Groups 的总数与作业的最大并行度相等。这意味着,如果作业的最大并行度是 10,那么就有 10 个 Key Groups,索引从 0 到 9。
  4. 最大并行度限制:在 Flink 中,最大并行度是一个重要的配置,它定义了作业能够达到的最大并行度。这个值一旦在作业的第一次启动后设置,就不应该无代价地改变,因为 Key Groups 的数量与最大并行度相等,改变这个值可能会导致状态无法正确映射,从而影响作业的恢复。
  5. Key Groups 的分配算法:Key Groups 到并行算子的分配算法尽量保证均匀分配。例如,如果有 8 个 Key Groups 和 3 个并行算子,那么前两个算子会分配到 3 个 Key Groups,第三个算子分配到 2 个 Key Groups。这种分配方式确保了负载均衡。
  6. Key Groups 与状态管理:在状态管理方面,Key Groups 允许 Flink 在进行 Checkpoint 时,将状态数据按照 Key Groups 进行分组保存。这样,在作业重启时,可以根据 Key Groups 快速恢复状态。
  7. 使用注意事项:在设置最大并行度时,应该考虑到状态的大小和 Checkpoint 的性能。增加最大并行度会导致更多的 Key Groups,这会增加状态元数据的大小,从而可能影响 Checkpoint 的性能。因此,应该在满足业务需求的前提下,尽可能设置一个较小的最大并行度。

综上所述,Key Groups 在 Flink 中是实现高效状态管理和并行处理的关键机制。正确理解和使用 Key Groups 对于优化 Flink 作业的性能和资源利用率非常重要。

标签: flink 大数据

本文转载自: https://blog.csdn.net/qq_43462685/article/details/142132618
版权归原作者 似水_逆行 所有, 如有侵权,请联系我们删除。

“Flink系列-作业并行度的设置”的评论:

还没有评论