0


Flink|《Flink 官方文档 - 部署 - 内存配置 - 网络缓冲调优》学习笔记

学习文档:《Flink 官方文档 - 部署 - 内存配置 - 网络缓冲调优》

学习笔记如下:


概述

Flink 中每条消息都会被放到网络缓冲(network buffer) 中,并以此为最小单位发送到下一个 subtask。 Flink 在传输过程的输入端和输出端使用了网络缓冲队列,即每个 subtask 都有一个输入队列来接收数据和一个输出队列来发送数据,拥有更多的中间缓存数据可以使 Flink 提供更高、更富有弹性的吞吐量,但是也会增加快照时间。

  • 对于对齐的 checkpoints,checkpoint barrier 会随着网络缓冲数据在 job graph 中流动;因此缓冲数据越多,checkpoint barrier的流动时间就越长
  • 对于不对齐的 checkpoint,缓冲数据越多,需要持久化到 checkpoint 中的数据越多,checkpoint 也就会越大

缓冲消胀机制(Buffer Debloating)

Flink 1.14 之前配置缓冲数据量的方法:

  • 指定缓冲数量
  • 指定缓冲大小

Flink 1.14 之后引入缓冲消胀功能来解决这个问题。缓冲消胀功能会计算 subtask 可能达到的最大吞吐(始终保持繁忙状态时),并根据计算结果调整缓冲数据量,从而使得数据的消费时间达到配置值。使用缓冲消胀机制的配置:

  • taskmanager.network.memory.buffer-debloat.enabledtrue:开启缓冲消胀机制
  • taskmanager.network.memory.buffer-debloat.targetduration 类型的值:指定消费缓冲数据的目标时间(默认值应该能满足大多数场景)

如果缓冲消胀功能的消费时间预测不准,会导致以下问题:

  • 缓冲数据不足,影响整体吞吐量
  • 缓冲数据太多,影响 checkpoint barrier 推进或非对齐的 checkpoint 大小

如果 Job 的负载经常波动,可以长治调整以下设置:

  • taskmanager.network.memory.buffer-debloat.period:这是缓冲区大小重算的最小时间周期。周期越小,缓冲消胀机制的反应时间就越快,但是必要的计算会消耗更多的CPU。
  • 调整样本数:样本数越少,缓冲消胀机制的反应时间就越快,但是当吞吐量突然飙升或者下降时,缓冲消胀机制计算的最佳缓冲数据量会更容易出错。 - taskmanager.network.memory.buffer-debloat.samples:计算平均吞吐量的采样数。- taskmanager.network.memory.buffer-debloat.period:计算平均吞吐量的采集样本的频率。
  • taskmanager.network.memory.buffer-debloat.threshold-percentages:防止缓冲区大小频繁改变的优化

可以使用以下指标来监控当前的缓冲区大小:

  • estimatedTimeToConsumeBuffersMs:消费所有输入通道(input channel)中数据的总时间。
  • debloatedBufferSize:当前的缓冲区大小。
限制

当前,有一些场景还没有自动地被缓冲消胀机制处理。

  • 有多个输入流,或者有一个合并的输入的场景:因为吞吐计算和缓冲消胀发生在 subtask 层面,所以可能会导致低吞吐的输入有太多缓冲数据,而高吞吐输入的缓冲区数量太少而不能维持当前吞吐。当不同的输入吞吐差别比较大时,这种现象会更加明显。
  • 缓冲区的大小和数量未实际改变:当前,缓冲消胀仅在使用的缓冲区大小中设置上限,实际的缓冲区大小和个数保持不变。因此,当前的缓冲消胀机制不会减少作业的内存使用,需要手动减少缓冲区的大小或者个数。
  • 高并行度:目前,缓冲消胀机制在高并行度(超过 200)时可能无法正常执行。如果出现吞吐量降低或检查点时间高于预期,则建议将浮动缓冲区数量(taskmanager.network.memory.floating-buffers-per-gate)从默认值增加到至少等于并行度的数量。问题发生的并行度的实际值因作业而异,但通常应超过几百个。

网络缓冲生命周期

在 Flink 中,每个输入和输出流对应一个本地缓冲区池,每个缓冲区池的缓冲区数的计算公式如下:

      channels 
     
    
      × 
     
    
      taskmanager.network.memory.buffers-per-channel 
     
    
      + 
     
    
      taskmanager.network.memory.floating-buffers-per-gate 
     
    
   
     \texttt{channels} \times \texttt{taskmanager.network.memory.buffers-per-channel} + \texttt{taskmanager.network.memory.floating-buffers-per-gate} 
    
   
 channels×taskmanager.network.memory.buffers-per-channel+taskmanager.network.memory.floating-buffers-per-gate

每个缓冲区(Buffer)的大小可以通过

taskmanager.memory.segment-size

来设置。

输入网络缓冲

缓冲区池不一定总能达到目标缓冲区数。Flink 有一个阈值

taskmanager.network.memory.read-buffer.required-per-gate.max

用于控制 Flink 在无法获取到缓冲区时是否会失败。在目标缓冲区数中,小于阈值的部分被称为必须(required)缓冲区,剩余的部分是可选(optional)缓冲区。如果无法获得必须缓冲区,会导致任务失败;如果无法获得可选缓冲区,任务不会失败,但可能会降低性能。

对于流作业,这个阈值的默认值是

Integer.MAX_VALUE

;对于批作业,这个阈值的默认值是 1000。通常来说,这个阈值越小,出现 “网络缓冲区数量不足” 异常的可能性越小,但导致作业静默地性能下降的可能性越大。

输出网络缓冲

输出缓冲区池只有一种类型的缓冲区被所有的 subpartitions 共享。为了避免过多的数据倾斜,每个 subpartition 的缓冲区数量可以通过

taskmanager.network.memory.max-buffers-per-channel

来限制。

独占缓冲区和流动缓冲区:输出缓冲区池的独占缓冲区和流动缓冲区只被当作推荐值,如果没有足够的缓冲区,每个输出 subpartition 可以只使用一个独占缓冲区而没有流动缓冲区。

透支缓冲区(Overdraft buffers)

每个 subtask 输出数据时可以至多请求

taskmanager.network.memory.max-overdraft-buffers-per-gate

(默认 5)个额外的透支缓冲区(overdraft buffers)。只有当前 subtask 被下游 subtasks 反压且当前 subtask 需要请求超过 1 个网络缓冲区(network buffer)才能完成当前的操作时,透支缓冲区才会被使用。

可能使用透支缓冲区的场景包括:

  • 序列化非常大的 records,不能放到单个网络缓冲区中
  • 类似 flatMap 的算子在处理单个 record 时产生了过多的 records
  • 周期性地或某些事件触发产生大量 records 的算子(例如 WindowOperator 的触发)。

在这些情况下,如果没有透支缓冲区,Flink 的 subtask 线程会被阻塞在反压,从而阻止例如 Unaligned Checkpoint 的完成。 为了缓解这种情况,增加了透支缓冲区的概念。这些透支缓冲区是可选的,Flink 可以仅仅使用常规的缓冲区逐渐取得进展,也就是 说

0

taskmanager.network.memory.max-overdraft-buffers-per-gate

可以接受的配置值。

设置缓冲区的大小和数量

独占缓冲区和流动缓冲区的默认配置应该足以应对最大吞吐。

如果想要最小化缓冲数据量,那么可以将独占缓冲区设置为 0,同时减小内存段的大小。

选择缓冲区的大小

在往下游 subtask 发送数据部分时,缓冲区通过汇集 record 来优化网络开销。下游 subtask 应该在接收到完整的 record 后才开始处理它。

如果缓冲区太小,或缓冲区刷新得过于频繁(通过

execution.buffer-timeout

参数配置),则可能导致吞吐量降低,因每个缓冲区的开销明显高于每个记录的开销。

根据经验,通常不建议增加缓冲区的大小或缓冲区的超时时间,除非在实际工作中观察到网络瓶颈,即下游 Operator 空闲,上游 Operator 背压,上游的输出缓冲区队列已满,下游的输入队列为空。

如果缓冲区太大,会导致:

  • 内存使用高
  • 大量的 checkpoint 数据量(针对非对齐的 checkpoints)
  • 漫长的 checkpoint 时间(针对对齐的 checkpoints)
  • execution.buffer-timeout 较小时内存分配使用率会比较低,因为缓冲区还没被塞满数据就被发送下去了
选择缓冲区的数量

缓冲区的数量通过

taskmanager.network.memory.buffers-per-channel

taskmanager.network.memory.floating-buffers-per-gate

配置的。

为了最好的吞吐率,建议使用独占缓冲区和流动缓冲区的默认值。如果缓冲数据量存在问题,则建议打开缓冲消胀。

可以人工地调整网络缓冲区的个数,但是需要注意:

  1. 根据期待的吞吐量来调整缓冲区的数量。协调数据传输量(大约两个节点之间的两个往返消息)。延迟也取决于您的网络。 使用 buffer 往返时间(大概 1ms 在正常的本地网络中),缓冲区大小和期待的吞吐,可以通过下面的公式计算维持吞吐所需要的缓冲区数量:

       number_of_buffers 
      
     
       = 
      
     
       expected_throughput 
      
     
       × 
      
     
       buffer_roundtrip 
      
     
       / 
      
     
       buffer_size 
      
     
    
      \texttt{number\_of\_buffers} = \texttt{expected\_throughput} \times \texttt{buffer\_roundtrip} / \texttt{buffer\_size} 
     
    

    number_of_buffers=expected_throughput×buffer_roundtrip/buffer_size

比如,期待吞吐为 320MB/s,往返延迟为 1ms,内存段为默认大小,为了维持吞吐需要使用10个活跃的缓冲区:

      number_of_buffers 
     
    
      = 
     
    
      320 
     
    
      M 
     
    
      B 
     
    
      / 
     
    
      s 
     
    
      × 
     
    
      1 
     
    
      m 
     
    
      s 
     
    
      / 
     
    
      32 
     
    
      K 
     
    
      B 
     
    
      = 
     
    
      10 
     
    
   
     \texttt{number\_of\_buffers} = 320MB/s \times 1ms / 32KB = 10 
    
   
 number_of_buffers=320MB/s×1ms/32KB=10
  1. 流动缓冲区的目的是为了处理数据倾斜。理想情况下,流动缓冲区的数量(默认8个)和每个通道独占缓冲区的数量(默认2个)能够使网络吞吐量饱和。但这并不总是可行和必要的,所有 subtask 中只有一个通道被使用也是非常罕见的。
  2. 独占缓冲区的目的是提供一个流畅的吞吐量。当一个缓冲区在传输数据时,另一个缓冲区被填充。当吞吐量比较高时,独占缓冲区的数量是决定 Flink 中缓冲数据的主要因素。

当低吞吐量下出现反压时,应该考虑减少独占缓冲区。

总结

可以通过开启缓冲消胀机制来简化 Flink 网络的内存配置调整。

如果缓冲消胀机制不能起作用,则可以关闭缓冲消胀机制并且人工地配置内存段的大小和缓冲区个数。此时推荐:

  • 使用默认值以获得最大吞吐
  • 减少内存段大小、独占缓冲区的数量来加快 checkpoint 并减少网络栈消耗的内存量

本文转载自: https://blog.csdn.net/Changxing_J/article/details/135671926
版权归原作者 长行 所有, 如有侵权,请联系我们删除。

“Flink|《Flink 官方文档 - 部署 - 内存配置 - 网络缓冲调优》学习笔记”的评论:

还没有评论