0


(增加细粒度资源管理)深入理解flink的task slot相关概念

【背景】

之前对flink的task slot的理解太浅了,重新捋一下相关知识点

为什么需要Task Slot

我们知道,flink中每个TaskManager都是一个 JVM 进程,可以在单独的线程中执行一个或多个 subtask(线程)。但是TaskManager 的计算资源是有限的,并不是所有任务都可以放在同一个 TaskManager 上并行执行。并行的任务越多,每个线程的资源就会越少。为了控制并发量,即限制一个 TaskManager 能同时接受多少个 task,我们需要在 TaskManager 上对每个任务运行所占用的资源做出明确的划分,这就是所谓的task slot(任务槽)。

Task Slot是什么

每个(task slot)其实表示了 TaskManager 拥有计算资源的一个固定大小的子集。 这些资源就是用来独立执行一个subtask的。因此,ResourceManager在做资源分配管理的时候,最小的单位就是slot。

具体来说,有 n 个 slot 的 TaskManager,会将其托管内存 1/n 用于每个 slot。分配资源意味着 subtask 不会与其他作业的 subtask 竞争内存,内存是隔离的。

注意此处没有 CPU 隔离,因此不同slot中的subtask会共享CPU。

通过调整 task slot 的数量,用户可以定义 subtask 如何互相隔离。如果每个 TaskManager 只有一个 slot,这意味着这个slot里面的 task 组将独享 JVM 。如果TaskManager包含多个Slot,那么同一个TaskManager中多个Slot内的task可以共享JVM资源,比如共享TCP连接(通过多路复用)、心跳信息、部分数据结构等,从而减少了每个 task 的开销。

TCP 多路复用允许单个服务器进程同时处理多个 TCP 连接。这通过使用一个或多个多路复用器来实现,多路复用器是一种内核机制,它可以监视多个文件描述符(如套接字)的活动。

官方建议将Slot数目设置为TaskManager下可用的CPU核心数,那么平均下来,每个Slot都能获得1个CPU核心(因为slot只隔离内存不隔离CPU,所以无法强制每个slot独占某个cpu核心)。但考虑到超线程,可以让slotNumber=2*cpuCore.

比如我们有2个 TaskManager ,每个TaskManager 设置了3个slot,那么slot总数是6 ,在执行任务的时候,如下图:

Task Slot 有哪些资源?

Task Manager 中有固定数量的 Slot ,Slot 的具体数量由配置决定。同一 Task Manager 上 Slot 之间没有差别,每一个 Slot 都一样大,即资源一样多。

每个 slot 都能跑由多个连续 task 组成的一个 pipeline,比如 MapFunction 的第n个并行实例和 ReduceFunction 的第n个并行实例可以组成一个 pipeline。通过动态的对槽的大小和数量的调整,就可以把任务的执行较好的并行起来。

slot隔离主要是对内存的隔离,CPU本身是不做隔离的,CPU在不同的slot之间是可以共享的。

slot内存是平均分配的,比如机器上有16G内存,如果划分4个slot的话,那每个slot就是4G内存了。如果每个slot内存太小的话,任务就执行不下去了,内存直接被撑爆这个是完全有可能的。

JobManager拿到任务执行计划后,它如何确定到底需要多少个slot:只要看整个作业里面,并行度最高的那个算子设置的并行度就可以了,只要满足它的需求,别的就都能满足了。

SlotSharing

为了更高效地使用资源,Flink默认允许同一个Job中不同Task的SubTask运行在同一个Slot中,这就是SlotSharing,例如一个作业从Source到Sink的所有子任务都可以放置在一个Slot中,这样数据交换成本更低。

为什么SlotSharing可以提升资源利用率呢?因为不同subtask的内存使用量是不同的,它们被分配到不同的slot后,会导致有的slot内存使用大,有的slot内存使用小。如果不允许slotsharding,slot资源就没有得到充分的利用。

但是slotsharding注意以下描述中的几个关键条件:

1.必须是同一个Job。slot是给Job分配的资源,目的就是隔离各个Job,如果跨Job共享,隔离就失效了;

2.必须是不同Task的Subtask,其实就是让不同操作共享资源,让资源消耗尽可能平均。

3.默认是允许sharing的。

举个例子,下图中两个TaskManager节点共有6个slot,5个SubTask,其中sink的并行度为1,另外两个SubTask的并行度为2。此时由于Subtask(虚线圆角框,5个)少于Slot个数(6个),所以每个Subtask独占一个Slot,没有SlotSharing(想想也是,如果此时还进行slotsharing,就是明明有空余的slot,却让多少subtask去挤破头抢同一个slot的资源,不合理)

下面我们把并行度改为6,此时如果不支持slot sharding,即仍然让每个subtask独占一个slot,那么会导致的问题是:执行速度很慢的subtask会长时间占用一个slot,导致这个slot里面本来空出来的资源没有被其他subtask使用到(这里假设一个slot资源是大于一个subtask所需要的资源)。

因此,如下图,Subtask的个数(虚线圆角框,13个)多于Slot了(6个),出现了SlotSharing。一个Slot中分配了多个Subtask,特别是最左边的Slot中跑了一个完整的Pipeline。

SlotSharing除了提高了资源利用率,还简化了并行度和Slot之间的关系:*一个Job运行需要的最少的Slot个数就是其中并行度最大的那个Task(如上图的source+map组合的task)的并行度(即6),因为并行度最大的task对应的subtask会有6个,而且这6个subtask是不能放在同一个slot里的,所以至少需要6个slot*

Slot Sharing Group(SSG)

为什么又有一个ssg的概念呢?原因有二。

原因一:在 Flink 中,默认情况下,所有的任务都属于同一个默认的 Slot Sharing Group--default。这意味着所有任务都会尝试占用 TaskManager 上的所有可用 slot。这种方式可以在不需要特别考虑 slot 分配的情况下快速启动应用程序,但可能会导致某些任务无法同时在同一个 TaskManager 上运行,从而影响整个应用程序的性能。

为了优化应用程序的性能,我们可以通过设置不同的 Slot Sharing Group 来控制任务之间的共享关系。例如,我们可以将一些任务分组到同一个 Slot Sharing Group 中,让它们共享同一个 TaskManager 上的 slot(CoLocationGroup可以让指定的多个task运行在同一个slot里面,比如迭代计算中)。

原因二:为了防止同一个 slot 包含太多的 task,或者我们希望把计算逻辑复杂的算子单独使用 slot ,提高计算速度。

其实,SSG就是对 operator 进行分组,同一个 group 的不同 operator task 可以共享同一个 slot(相同operator task是不能共享同一个slot的,因为需要保证资源利用率尽量高)。

要想确定一个未做SlotSharingGroup设置的算子的group是什么,可以根据上游算子的 group 和自身是否设置 group共同确定(也就是说如果下游算子没有设置分组,它继承上游算子的分组);

只有属于同一个slot共享组的子任务,才会开启slot共享;不同组之间的任务是完全隔离的,必须分配到不同的slot上。在这种场景下,总共需要的slot数量,就是各个slot共享组最大并行度的总和。例如,如果不设置SlotSharingGroup,默认所有task在同一个共享组(可以共享所有slot),那么Flink集群需要的任务槽与作业中使用的最高并行度正好相同。但是如下图所示,如果我们强制指定了map的slot共享组为test,那么map和map下游的组为test,map的上游source的共享组为默认的default,此时default组中最大并行度为10,test组中最大并行度为20,那么需要的Slot=10+20=30;

怎么配置SSG

代码配置

我们可以通过 slotSharingGroup() 为不同的 operator 设置不同的group,决定哪些 task 需要被共享:

dataStream.filter(...).slotSharingGroup("groupName");//表示filter的slot共享组为groupName

它是Flink中用来实现slot共享的类,尽可能的允许不同的JobVertices部署在相同的Slot中,但这是一种宽约束,即也可以不在一个 slot,只是尽量做到,不能完全保证。

还可以通过 CoLocationGroup 来决定哪些 task 需要被单独的 slot 使用,CoLocationGroup可以强制将subTasksk放到同一个slot中,是一种硬约束,但是需要注意两点:

  1. 保证把JobVertices的第n个运行实例和其他相同组内的JobVertices第n个实例运作在相同的slot中(所有的并行度相同的subTasks运行在同一个slot );
  2. 主要用于迭代流(训练机器学习模型) ,用来保证迭代头与迭代尾的第i个subtask能被调度到同一个TaskManager上。

页面配置

阿里云中还可以为不同group的slot设置不同的资源:作业资源配置 - 实时计算Flink版 - 阿里云

Task Slot与parallelism

slot 是静态的概念,是指 taskmanager 具有的并发执行能力。比如说一个task manager中slot为2,表示这个task manager中并发执行能力是2

parallelism 是动态的概念,是指程序运行时实际使用的并发能力

我们设置task的并行度不能超过slot的数量,比如我们这里slot数量是9,那么最大的并行度也就是9

例如,在一个作业中,我们设置defaultParallelism=8,jmMem=3G,tmMem=5G,tmCpu=2,tmSlots=2,那么,他所用的taskmanager只有4个(因为并行度为8,而每个taskmanager有两个slot即俩个并发,所以只需要4个taskmanager),所以总共core需要的是2core4=8core,taskmanager总共内存需要的是4G4=16G

再次尝试:把某个算子的并行度从8改成16后,会发现整体资源都上升了,变成了:

总共core需要的是2core8=8core,taskmanager总共内存需要的是4G8=32G

这是因为并行度最大的task对应的subtask会有16个,而且这16个subtask是不能放在同一个slot里的,所以至少需要16个slot,也就是说是由最大并行度的task来决定的,此处我们设置了一个task manager划分成两个slot,所以需要8个task manager(如果我们一开始只设置了一个taskmanger只能有一个slot,那么就至少要有16个taskmanager)

一个分配Slot的例子

Flink在调度任务分配Slot的时候遵循两个重要原则:

  1. 同一个Job中的同一分组中的不同Task可以共享同一个Slot;
  2. Flink是按照拓扑顺序依次从Source调度到sink。
  • 假设有两个TM:TM1、TM2,每个TM有3个Slot:S1,S2,S3。假设source/map的并行度为2,keyBy/window/sink的并行度为4,那么调度的顺序依次为source/map[1] ->source/map[2] ->keyBy/window/sink[1]->keyBy/window/sink[2]->keyBy/window/sink[3]->keyBy/window/sink[4]。那么Flink调度任务时(使用默认共享分组):
  1. 首先调度子任务source/map[1]到TM1.S1;
  2. 然后调度子任务source/map[2] ,根据Flink的调度原则:source/map[1] 和source/map[2] 属于同一个Task下的两个SubTask,所以他们不能放到同一个Slot中,所以source/map[2]被调度到TM1.S2;
  3. 然后调度keyBy/window/sink,keyBy/window/sink的子任务会被依次调度到TM1.S1、TM1.S2、TM2.S1、TM2.S2(之所以不分配到TM1.S3,是为了平衡分配到tm1和tm2)。但是如果source/map与keyBy/window/sink属于不同分组,那么keyBy/window/sink会被调度到TM1.S3、TM2.S1、TM2.S2、TM2.S3。

细粒度资源管理(Fine-Grained Resource Management)

1.14版本推出的高级功能,用于提高大型共享集群的资源利用率。

为什么需要细粒度资源管理

Flink 集群执行多种多样的数据处理工作负载。不同的数据处理步骤通常需要不同的资源,如计算资源、内存等。例如,大多数映射函数都比较轻量,而较大的、保留时间较长的窗口函数往往受益于大量内存。默认情况下,Flink 以粗粒度的 Slot 管理资源,一个 Slot 代表 TaskManager 的一个资源切片。tasks 被提前定义部署,通常被分配相同的 slots 而没有每个 slot 包含多少资源的概念。一个 Slot 可以存放流式处理流程中每个算子的一个并发子任务实例,即一个 Slot 可持有一整条处理流程的并发子任务实例。通过 Slot Sharing Group,用户可以影响子任务在 Slot 上的分布。

对于许多jobs,使用粗粒度的资源管理并简单地把所有的tasks放入一个Slot 共享组中运行,就资源利用率而言,也能运行得很好。

  • 对于许多有相同并行度的 tasks 的流作业而言,每个 slot 会包含整个 pipeline。理想情况条件下,所有的 pipelines 应该使用大致相同的资源,这可以容易被满足通过调节相同 slot 的资源。
  • tasks 的资源消耗随时间变化不同。当一个 task 的资源消耗减少,其他的资源可以被另外一个 task 使用,该 task 的消耗增加。这就是被称为“调峰填谷效应”的现象,它降低了所需要的总体需求。

尽管如此,有些情况下使用粗粒度资源管理效果并不好。

  • 当 Slot 比较小时,为每个 Slot 专门申请 TaskManager 的代价是非常高的(JVM 开销、Flink 框架开销等)。Slot Sharing 通过让不同类型的算子共享 Slot,即在轻量的算子(需要较小的 Slot)和重量的算子(需要较大的 Slot)间共享资源,在一定程度上解决了这个问题。然而,这仅在所有算子的并发度相同时有较好的效果,并非总是最优的。此外,有些算子更适合单独运行(例如机器学习中负责训练的算子需要专用的 GPU资源)。

Kubernetes 和 Yarn 往往需要花费一段时间来满足资源请求,特别是在集群负载较高时。对于一些批处理作业,等待资源的时间会降低作业的执行效率。

  • Tasks 会有不同的并行度。有时,这种不同的并行度是不可避免的。例如,象 source/sink/lookup 这些类别的 tasks 的并行度可能被分区数和外部上下游系统的 IO 负载所限制。在这种情况下,拥有更少的 tasks 的 slots 会需要更少的资源相比 tasks 的整个 pipeline。
  • 有时整个pipeline需要的资源可能会太大以致难于与单一的 slot/TaskManager 的场景相适应。在这种情况下,pipeline 需要被分割成多个 SSGs,它们可能不总是有相同的资源需求。
  • 对于批作业,不是所有的 tasks 能够在同时被执行。因此,整个 pipeline 的瞬时资源需求而时间变化。

试图以相同的 slots 执行所有的tasks,这样会造成非最优的资源利用率。相同 slot 的资源能够满足最高的资源需求,这对于其他资源需求将是浪费的。当涉及到像 GPU 这样昂贵的外部资源时,这样的浪费将是难以承受的。细粒度资源管理运用不同资源的 slots 提高了资源利用率在种使用场景中。

有了细粒度资源管理,TaskManager 上的 Slot 可以动态改变大小。转换和算子指定所需的资源配置(CPU、内存、磁盘等),由 Flink 的 ResourceManager 和 TaskManager 负责从 TaskManager 的总资源中划分出指定大小的资源切片。你可以将这看做是 Flink 中的一层最小化、轻量化的资源编排。下图展示了细粒度资源管理与目前默认的共享固定大小 Slot 资源管理方式的区别。

工作原理

在一个 TaskManager 中,执行 task 时使用的资源被分割成许多个 slots。 Slot 既是资源调度的基本单元,又是Flink运行时申请资源的基本单元。

对于细粒度资源管理,Slot 资源请求包含用户指定的特定的资源配置文件。Flink 会遵从这些用户指定的资源请求并从 TaskManager 可用的资源中动态地切分出精确匹配的 slot。

Flink之前的资源申请只包含必须指定的 slots,但没有精细化的资源配置,这是一种粗粒度的资源管理.在这种管理方式下, TaskManager 以固定相同的 slots 的个数的方式来满足资源需求,如上图左侧图所示。

对于没有指定资源配置的资源请求,Flink会自动决定资源配置。

举一个细粒度资源管理的例子:

Flink 会精确地从 TaskManager 中切分出匹配的 slot 为指定的 slot 资源请求。如上图第一个方框所示,TaskManager将以总资源的形式被启动,但是没有提前指定 slots,所以是free resource=1core 4GB。

当一个 slot 请求 0.25 Core 和 1GB 的内存,Flink 将会选择一个有足够可用资源的 TaskManger 和创建一个新的已经被资源申请的 slot。如果一个 slot 未被使用,它会将它的资源返回到 TaskManager 中的可用资源中去。

在当前的资源分配策略中,Flink 会遍历所有被注册的 TaskMangers 并选择第一个有充足资源的 TaskManager 来满足 Slot 资源请求。当没有 TaskManager 有足够的资源时,Flink将会从Native Kubernetes或者YARN分配一个新的 TaskManager(在当前的策略中,Flink 会根据 用户配置分配相同规格的TaskManagers,因为 TaskManagers 的规格组合是预定义的。)

  • 集群中可能会存在资源碎片。例如,两个 slots 请求 3GB 堆内存,然而 TaskManager 总共的堆内存是 4GB,Flink 会启动两个 TaskManagers,每个 TaskManager 会有1G的堆内存被浪费。未来,可能会有一种资源分配策略,可以根据 job 的 slot 请求分配异构 TaskManagers(即配置不同的tm),从而缓解资源碎片。
  • 确保配置的 Slot 共享组的资源组成不能大于 TaskManager 的总资源。否则,job 会失败,并抛出异常。

牺牲资源隔离来降低整体内存消耗

Flink 1.17 版本还提供了参数扩大 TaskManager 的 slot 之间共享内存的范围,这种方式可以在 TaskManager 中 slot 内存使用不均匀时提高内存效率。基于此在调整参数后可以以资源隔离为代价来降低整体内存消耗。

请参考了解更多相关信息。这个参数的作用:

每个任务管理器的所有 RocksDB 实例之间共享的固定内存总量(集群级选项)。仅当未配置 'state.backend.rocksdb.memory.management' 和 'state.backend.rocksdb.memory.fixed-per-slot' 时,此选项才会生效。默认值为空(不配置),值是内存值。如果没有配置,那么每个 RocksDB 列族状态都有自己的内存缓存(由列族选项控制)。共享资源的相关选项(例如 write-buffer-ratio)可以在同一级别设置(flink-conf.yaml)。注意,此功能打破了插槽之间的资源隔离

标签: flink 大数据

本文转载自: https://blog.csdn.net/oTianShangDiXia/article/details/136276206
版权归原作者 天上地下 所有, 如有侵权,请联系我们删除。

“(增加细粒度资源管理)深入理解flink的task slot相关概念”的评论:

还没有评论