0


Flink|《Flink 官方文档 - Operations - 批处理 shuffle》学习笔记

学习文档:《Flink 官方文档 - Operations - Batch - Batch Shuffle》

学习笔记如下:


Flink DataStream API 和 Table / SQL 都支持通过批处理执行模式处理有界输入。在批处理模式下,Flin 提供了两种网络交换模式:

  • Blocking Shuffle(默认):持久化所有的中间数据,只有当数据完全产出后才能被消费
  • Hybrid Shuffle:下一代的批处理数据交换模式,它会更加智能地持久化数据,并且允许在数据生产的同时进行消费(该特性目前是实验行的,且存在一些已知的限制)

Blocking Shuffle

blocking shuffle 将数据持久化到存储中,然后下游任务通过网络获取这些数据。这种交换减少了执行作业所需的资源,因为它不需要同时运行上游和下游任务。

Flink 提供了两种类型的 blocking shuffles:hash shuffle 和 sort shuffle。

Hash Shuffle

在 Flink 1.14 及更低的版本中,Hash Shuffle 是 blocking shuffle 的默认实现,它为每个下游任务将每个上游任务的结果以单独文件的方式保存在 TaskManager 的本地磁盘上。当下游任务运行时,会向上游的 TaskManager 请求分片,TaskManager 在读取文件之后通过网络传输给下游任务。

Hash Shuffle 为读写文件提供了不同的机制:

  • file:通过标准文件 IO 写文件,读取和传输文件需要通过 Netty 的 FileRegion。FileRegion 依靠系统调用 sendfile 来减少数据拷贝和内存消耗。
  • mmap:通过系统调用 mmap 来读写文件。
  • auto:通过标准文件 IO 写文件,对于文件读取,在 32 位机器上降级到 file 选项并且在 64 位机器上使用 mmap。这是为了避免在 32 位机器上 Java 实现 mmap 的文件大小限制。

可以通过修改 TaskManager 参数选择不同的机制。【实验性】

mmap

使用的内存补计算进已有配置的内存限制中,但是一些资源的管理框架比如 YARN 将追中这块内存使用,并且如果容器使用内存超过阈值会被杀掉。

Hash Shuffle 在小规模运行的固态硬盘的任务情况下效果显著,但是仍然有一些问题:

  • 如果任务的规模庞大将会创建很多文件,并且要求同时对这些文件进行大量的写操作。
  • 在机械磁盘情况下,当大量的下游任务同时读取数据,可能会导致随机读写问题
Sort Shuffle

Sort Shuffle 是 Flink 1.13 版本引入的另一种 blocking shuffle 实现,它在 1.15 版本成为默认。不同于 Hash Shuffle,Sort Shuffle 将每个分区结果写入到一个文件。当多个下游任务同时读取结果分片,数据文件只会被打开一次并分享给所有的读请求。因此,集群可以使用更少的资源。

Sort Shuffle 通过写更少的文件和尽可能线性的读取文件,令 Sort Shuffle 在机械硬盘情况下可以获得比 Hash Shuffle 更好的性能。另外,Sort Shuffle 可以额外管理的内存作为读数据缓存并不依赖 sendfile 或 mmap 机制。

当使用 Sort Bloacking Shuffle 的时候需要考虑如下配置:

  • taskmanager.network.sort-shuffle.min-buffers:该选项控制数据写缓存的大小。对于大规模的任务而言,可能需要调大这个配置,正常来说几百兆内存就足够了。这部分内存是从网络内存分配的,所以要增大这个配置值,还需要调整 taskmanager.memory.network.fractiontaskmanager.memory.network.mintaskmanager.memory.network.max 这几个参数来增大总的网络内存以避免出现网络内存不足的情况。
  • taskmanager.memory.framework.off-heap.batch-shuffle.size:该选项控制数据读取缓存大小。对于大规模的任务而言,可能需要调大这个值,正常几百兆内存就足够了。这部分内存是从框架堆外内存中切分出来的,所以想要增大这个配置值,还需要调整 taskmanager.memory.framework.off-heap.size 来增大框架堆外内存以避免出现直接内存溢出的错误。

目前 Sort Shuffle 只通过分区索引来排序而不是记录本身,也就是说 sort 可能被 当成数据聚类算法使用。

Blocking Shuffle 的选择
  • 对于在固态硬盘上运行的小规模任务而言,两者都可以
  • 对于在机械硬盘上运行的大规模任务而言,Sort Shuffle 更为合适

在 Sort Shuffle 和 Hash Shuffle 之间切换,需要配置这个参数:

taskmanager.network.sort-shuffle.min-parallelism

这个参数根据消费者 Task 的并发选择当前 Task 使用 Hash Shuffle 或 Sort Shuffle;如果并发小于配置值则使用 Hash Shuffle,否则使用 Sort Shuffle。对于 1.15 以下版本,它的默认值是

Integer.MAX_VALUE

,即默认使用 Hash Shuffle;对于 1.15 及以上版本,它的默认值是 1,即默认使用 Sort Shuffle。

Hybrid Shuffle【实验性】

Hybrid Shuffle 是下一代的批数据交换方法。它结合了 Blocking Shuffle 和 Pipelined Shuffle(流模式)的优势:

  • 与 Blocking Shuffle 一样,它不需要上游任务和下游任务同时运行,允许使用更少的资源执行任务
  • 与 Pipelined Shuffle 一样,它不需要下游任务在上游任务结束之后才能允许,当资源充足时减少了作业的总体运行时间
  • 它通过提供不同的溢出策略,在保留较少数据和在失败时重新启动较少任务之间适应自定义偏好

要使用 Hybrid Shuffle 模式,你将配置

execution.batch-shuffle-mode

设置为:

  • ALL_EXCHANGES_HYBRID_FULL :完全溢出策略(full spilling strategy)
  • ALL_EXCHANGES_HYBRID_SELECTIVE :选择性溢出策略(Selective spilling strategy)
溢出策略(Spilling Strategy)

Hybrid Shuffle 提供了两种溢出策略:

  • 选择性溢出策略(Selective spilling strategy):当前仅当下游任务没有及时消费时保留数据。这减少了保存数据的数量,但代价是如果发生故障,上游任务需要重新启动以再现完整的中间结果。
  • 完全溢出策略(full spilling strategy):无论数据是否被下游任务消费,都保留全部数据。如果失败,可以重新使用持久化的完整中间数据,不需要重新启动上游任务。
数据消费约束

Hybrid Shuffle 将生产者和消费者之间的分区数据的消费约束分为以下三种情况:

  • ALL_PRODUCERS_FINISHED:只有当所有生产者都已经完成时,HyBrid 分区数据才可以被消费
  • ONLY_FINISHED_PRODUCERS:只有当生产者已经生产完成时,该生产者的 HyBrid 分区数据才可以被消费
  • UNFINISHED_PRODUCERS:生产者海没有生产完成时,该生产者的 ByBrid 分区就可以被消费

这可以通过

jobmanager.partition.hybrid.partition-data-consume-constraint

来配置。

  • 对于 AdaptiveBatchScheduler:默认约束是 UNFINISHED_PRODUCERS 以执行类似于 pipined 的 shuffle 模式。如果该值设置为 ALL_PRODUCERS_FINISHEDONLY_FINISHED_PRODUCERS,性能可能会降低。
  • 如果 SpeculativeExecution 被开启,默认约束是 ONLY_FINISHED_PRODUCERS,相较于 blocking shuffle,可以带来一些性能的提升。但是由于生产者和消费者有机会同时运行,可能会导致执行更多的推测性执行,故障的成本也会增加。如果你想恢复到与 blocking shuffle 相同的行为,你可以将它的值设置为 ALL_PRODUCERS_FINISHED。需要注意的是,这个模式不支持 UNFINISHED_PRODUCERS
远程存储支持

Hybrid Shuffle 支持将 shuffle 数据存储到远端存储中。远端存储的路径可以被配置到

taskmanager.network.hybrid-shuffle.remote.path

中。这个特性支持不同的远端存储系统。

需要注意的是,远端存储系统仅支持新 Hybrid Shuffle 模式。

Hybrid Shuffle 的新模式的旧模式

在 Flink 1.18 中,为了解决一些 issues,重构了 Hybrid Shuffle。相较于原有的架构,它有了一些新的优势,包括是用更少的 network memory 以及支持远程存储。

新模式是默认开启的,如果需要切换到旧模式,可以将

taskmanager.network.hybrid-shuffle.enable-new-mode

配置为 false。

不建议使用旧模式,且旧模式将在未来的发行版中移除。

限制

当前,Hybrid Shuffle 模式仍然是实验性的方法,并存在一些已知的限制:

  • 不支持 slot 共享:当前,Flink 强制 Hybrid Shuffle Mode 中的每个 task 都在各自的 slot 上执行;如果 slot 被指定为共享,则会出现错误
  • 不支持动态图的 pipeline 执行:如果启用了自动并行度或自适应批调度器,将会等待上游任务完成后,再决定下游任务的并行度,这意味着围 Hybrid Shuffle 实际上会回退到 Blocking Shuffer(即 ALL_PRODUCERS_FINISHED 约束)

性能调优

Blocking Shuffle
  • 如果使用机器硬盘作为存储设备,那么应该总是使用 Sort Shuffle,因为这可以极大地提升稳定性和性能。从 1.15 开始,Sort Shuffle 已经成为默认实现;对于 1.14 及之前的版本,需要通过将 taskmanager.network.sort-shuffle.min-parallelism 设置为 1 或手动开启 Sort Shuffle。
  • 对于 Sort Shuffle 和 Hash Shuffle 两种实现,都可以考虑开启数据压缩除非数据本身无法压缩。从 1.15 开始,数据压缩时默认开启的。
  • 当使用 Sort Shuffle 时,减少独占网络缓冲区并增加流动网络缓冲区有利于性能提升。对于 1.14 以及更高版本,建议将 taskmanager.network.memory.buffers-per-channel 设置为 0,并将 taskmanager.network.memory.floating-buffers-per-gate 设为一个较大的值,例如 4096。 - 首先这解耦了并发与网络内存使用量,对于大规模作业,这降低了遇到网络缓冲区不足的可能性- 网络缓冲区可以根据需求在不同的数据通道间共享流动,这可以提高网络缓冲区的利用率,进而可以提高性能
  • 增大总的网络内存。目前网络内存的大小是比较保守的。对于大规模作业,为了实现更高的性能,建议将网络内存比例增加到至少 0.2。为了使调整生效,可能需要同时调整网络内存大小下界和网络内存大小上界。
  • 增加数据写出内存。对于大规模作业,如果有充足的空间内存,建议增大数据写出内存大小至少(2 * 并发数)。在增加这个配置后,为避免出现网络缓冲不足,可能还需要增大总的网络内存大小。
  • 增加数据读取内存。对于大规模作业,建议增大数据读取内存到一个较大的值(例如 256M 或 512M)。因为这个内存是从框架的堆外内存切分出来的,因此必须增加相同的内存大小到 taskmanager.memory.framework.off-heap.size 以避免出现直接内存溢出错误。
Hybrid Shuffle
  • 增大总的网络内存。目前网络内存的大小是比较保守的。对于大规模作业,为了实现更高的性能,建议将网络内存比例增加到至少 0.2。为了使调整生效,可能需要同时调整网络内存大小下界和网络内存大小上界。
  • 增加数据写出内存。对于大规模作业,建议增大总内存大小,用于数据写入的内存越大,下游越有机会直接从内存读取数据。如果使用 Legacy Hybrid Shuffle 模式,则需要保证每个 Result Partition 至少能够分配到 numSubpartition + 1 个 buffer,否则可能会遇到 insufficient number of network buffers 错误。
  • 增加数据读取内存。对于大规模作业,建议增大数据读取内存到一个较大的值(例如 256M 或 512M)。因为这个内存是从框架的堆外内存切分出来的,因此必须增加相同的内存大小到 taskmanager.memory.framework.off-heap.size 以避免出现直接内存溢出错误。
  • 当使用 Legacy Hybrid Shuffle 模式时,减少独占网络缓冲区可能会严重影响性能。因此,最好不要将该值设置为 0,并且对于大规模作业可以适当增加该值。需要注意的是,Legacy Hybrid Shuffle 默认会将 taskmanager.network.memory.read-buffer.required-per-gate.max 设置为 Integer.MAX_VALUE,最好不要去调整该配置,否则可能会造成性能的下降。

常见问题处理

Blocking Shuffle
  • Insufficient number of network buffers:这意味着网络内存大小不足以支撑作业运行,需要增加总的网络内存的大小。从 1.15 开始,Sort Shuffle 已经成为默认实现,对于一些场景,Sort Shuffle 可能比 Hash Shuffle 需要更多的网络内存,因此当你的批作业升级到 1.15 以后可能会遇到这个网络内存不足的问题。这种情况下,你只需要增大总的网络内存大小即可。
  • Too many open files:这意味着文件句柄不够用了。如果你使用的是 Hash Shuffle,请切换到 Sort Shuffle;如果你已经在使用 Sort Shuffle,请考虑增加操作系统文件句柄上限并且检查是否是作业代码占用了过多的文件句柄。
  • Connection reset by peer:这通常意味着网络不太稳定或者压力较大,SSL 握手超时等也可能会导致这一问题。如果你使用的是 Hash Shuffle,请切换到 Sort Shuffle;如果你已经在使用 Sort Shuffle,增加网络连接 backlog 可能会有帮助。
  • Network connection timeout:这通常意味着网络不太稳定或者压力较大。可以考虑增大网络连接超时时间或者开启网络连接重试。
  • Socket read/write timeout:这通常意味着网络传输速度较慢或者压力较大。可以考虑增大网络收发缓冲区大小。如果作业运行在 Kubernetes 环境,可以考虑使用 host network。
  • Read buffer request timeout:这个问题只会出现在 Sort Shuffle,它意味着对数据读取缓冲区的激烈竞争。要解决这一问题,可以考虑增加 taskmanager.memory.framework.off-heap.batch-shuffle.sizetaskmanager.memory.framework.off-heap.size
  • No space left on device:这通常意味着磁盘存储空间或者 inodes 被耗尽。可以考虑扩展磁盘存储空间或者做一些数据清理。
  • Out of memory error:如果你使用的是 Hash Shuffle,请切换到 Sort Shuffle。如果你已经在使用 Sort Shuffle,可以考虑增大相应的内存大小,对于堆上内存,可以增大 taskmanager.memory.task.heap.size,对于直接内存,可以增大 taskmanager.memory.task.off-heap.size
  • Container killed by external resource manger:有多种原因可能会导致容器被杀。例如,杀掉一个低优先级容器以释放资源启动高优先级容器,或者容器占用了过多的资源。Hash Shuffle 可能会使用过多的内存而被 YARN 杀掉。如果你使用的是 Hash Shuffle,请切换到 Sort Shuffle;如果你已经在使用 Sort Shuffle,可能需要同时检查 Flink 日志以及资源管理框架的日志以找出容器被杀的根本原因,并且做出相应的修复。
Hybrid Shuffle
  • Insufficient number of network buffers:这意味着网络内存大小不足以支撑作业运行,需要增加总的网络内存的大小。
  • Connection reset by peer:这通常意味着网络不太稳定或者压力较大,SSL 握手超时等也可能会导致这一问题,可以考虑增加网络连接 backlog。
  • Network connection timeout:这通常意味着网络不太稳定或者压力较大。可以考虑增大网络连接超时时间或者开启网络连接重试。
  • Socket read/write timeout:这通常意味着网络传输速度较慢或者压力较大。可以考虑增大网络收发缓冲区大小。如果作业运行在 Kubernetes 环境,可以考虑使用 host network。
  • Read buffer request timeout:这个问题只会出现在 Sort Shuffle,它意味着对数据读取缓冲区的激烈竞争。要解决这一问题,可以考虑增加 taskmanager.memory.framework.off-heap.batch-shuffle.sizetaskmanager.memory.framework.off-heap.size
  • No space left on device:这通常意味着磁盘存储空间或者 inodes 被耗尽。可以考虑扩展磁盘存储空间或者做一些数据清理。

本文转载自: https://blog.csdn.net/Changxing_J/article/details/135835495
版权归原作者 长行 所有, 如有侵权,请联系我们删除。

“Flink|《Flink 官方文档 - Operations - 批处理 shuffle》学习笔记”的评论:

还没有评论