一、运行时架构
上一篇我们可以看到Flink的核心组件的Deploy层,该层主要涉及了Flink的部署模式,Flink支持多种部署模式:本地、集群(Standalone/YARN)、云(GCE/EC2)。
Local(本地):单机模式,一般本地开发调试使用,像我们程序写的WordCountStream那个例子,直接运行main方法启动。
Cluster(集群)
Standalone(独立模式):Flink自带集群,自己管理资源调度,生产环境也会有所应用。
YARN(YARN模式):计算资源统一由Hadoop YARN管理,生产环境应用较多。
Cloud(云端):AliCloud Realtime Compute、Amazon EMR、Huawei Cloud Stream Service 等。
我们这里主要来介绍Cluster集群的两种模式Standalone、YARN。
二、YARN集群架构
在讲解Flink集群架构之前,我们先了解一下YARN集群架构,我觉得是很有必要的。YARN集群总体上是经典的主/从(Master/Slave)架构,主要由ResourceManager、NodeManager、ApplicationMaster和Container等几个组件构成。
在这里插入图片描述
2.1 ResourceManager
以后台进程的形式运行,负责对集群资源进行统一管理和任务调度。ResourceManager的主要职责如下:
接收来自客户端的请求。
启动和管理各个应用程序的ApplicationMaster。
接收来自ApplicationMaster的资源申请,并为其分配Container。
管理NodeManager,接收来自NodeManager的资源和节点健康情况汇报。
2.2 NodeManager
集群中每个节点上的资源和任务管理器,以后台进程的形式运行。它会定时向ResourceManager汇报本节点上的资源(内存、CPU)使用情况和各个Container的运行状态,同时会接收并处理来自ApplicationMaster的Container启动/停止等请求。NodeManager不会监视任务,它仅监视Container中的资源使用情况,例如。如果一个Container消耗的内存比最初分配的更多,就会结束该Container。
2.3 Task
应用程序具体执行的任务。一个应用程序可能有多个任务,例如一个MapReduce程序可以有多个Map任务和多个Reduce任务。
2.4 Container
YARN中资源分配的基本单位,封装了CPU和内存资源的一个容器,相当于一个Task运行环境的抽象。从实现上看,Container是一个Java抽象类,定义了资源信息。应用程序的Task将会被发布到Container中运行,从而限定了Task使用的资源量。
一个应用程序所需的Container分为两类:运行ApplicationMaster的Container和运行各类Task的Container。前者是由ResourceManager向内部的资源调度器申请和启动的,后者是由ApplicationMaster向ResourceManager申请的,并由ApplicationMaster请求NodeManager进行启动。
我们可以将Container类比成数据库连接池中的连接,需要的时候进行申请,使用完毕后进行释放,而不需要每次独自创建。
2.5 ApplicationMaster
ApplicationMaster可在Container内运行任何类型的Task。例如,MapReduce ApplicationMaster请求一个容器来启动Map Task或Reduce Task。也可以实现一个自定义的ApplicationMaster来运行特定的Task,以便任何分布式框架都可以受YARN支持,只要实现了相应的ApplicationMaster即可。
我们可以这样认为:ResourceManager管理整个集群,NodeManager管理集群中的单个节点,ApplicationMaster管理单个应用程序(集群中可能同时有多个应用程序在运行,每个应用程序都有各自的ApplicationMaster)。
YARN集群中应用程序的执行流程如下图所示:
客户端提交应用程序(可以是MapReduce程序、Spark程序等)到ResourceManager。
ResourceManager分配用于运行ApplicationMaster的Container,然后与NodeManager通信,要求它在该Container中启动ApplicationMaster。ApplicationMaster启动后,它将负责此应用程序的整个生命周期。
ApplicationMaster向ResourceManager注册(注册后可以通过ResourceManager查看应用程序的运行状态)并请求运行应用程序各个Task所需的Container(资源请求是对一些Container的请求)。如果符合条件,ResourceManager会分配给ApplicationMaster所需的Container(表达为Container ID和主机名)。
ApplicationMaster请求NodeManager使用这些Container来运行应用程序的相应Task(即将Task发布到指定的Container中运行)。
此外,各个运行中的Task会通过RPC协议向ApplicationMaster汇报自己的状态和进度,这样一旦某个Task运行失败,ApplicationMaster就可以对其重新启动。当应用程序运行完成时,ApplicationMaster会向ResourceManager申请注销自己。
在这里插入图片描述
三、Flink Standalone模式
Flink Standalone模式为经典的主从(Master/Slave)架构,资源调度是Flink自己实现的。集群启动后,主节点上会启动一个JobManager进程,类似YARN集群的ResourceManager,因此主节点也称为JobManager节点;各个从节点上会启动一个TaskManager进程,类似YARN集群的NodeManager,因此从节点也称为TaskManager节点。从Flink 1.6版本开始,将主节点上的进程名称改为了StandaloneSessionClusterEntrypoint,从节点的进程名称改为了TaskManagerRunner,在这里为了方便使用,仍然沿用之前版本的称呼,即JobManager和TaskManager。
Client接收到Flink应用程序后,将作业提交给JobManager。JobManager要做的第一件事就是分配Task(任务)所需的资源。完成资源分配后,Task将被JobManager提交给相应的TaskManager,TaskManager会启动线程开始执行。在执行过程中,TaskManager会持续向JobManager汇报状态信息,例如开始执行、进行中或完成等状态。作业执行完成后,结果将通过JobManager发送给Client。
Flink所有组件之间的通信使用的是Akka框架,组件之间的数据交互使用的是Netty框架。
在这里插入图片描述
Client 不是运行时和程序执行的一部分,而是用于准备数据流并将其发送给 JobManager。之后,客户端可以断开连接(分离模式),或保持连接来接收进程报告(附加模式)。客户端可以作为触发执行 Java/Scala 程序的一部分运行,也可以在命令行进程./bin/flink run …中运行。
可以通过多种方式启动 JobManager 和 TaskManager:直接在机器上作为standalone 集群启动、在容器中启动、或者通过YARN等资源框架管理并启动。TaskManager 连接到 JobManagers,宣布自己可用,并被分配工作。
3.1 JobManager
JobManager 具有许多与协调 Flink 应用程序的分布式执行有关的职责:它决定何时调度下一个 task(或一组 task)、对完成的 task 或执行失败做出反应、协调 checkpoint、并且协调从失败中恢复等等。这个进程由三个不同的组件组成:
ResourceManager
ResourceManager 负责 Flink 集群中的资源提供、回收、分配 - 它管理 task slots,这是 Flink 集群中资源调度的单位(请参考TaskManagers)。Flink 为不同的环境和资源提供者(例如 YARN、Kubernetes 和 standalone 部署)实现了对应的 ResourceManager。在 standalone 设置中,ResourceManager 只能分配可用 TaskManager 的 slots,而不能自行启动新的 TaskManager。
Dispatcher
Dispatcher 提供了一个 REST 接口,用来提交 Flink 应用程序执行,并为每个提交的作业启动一个新的 JobMaster。它还运行 Flink WebUI 用来提供作业执行信息。
JobMaster
JobMaster 负责管理单个JobGraph的执行。Flink 集群中可以同时运行多个作业,每个作业都有自己的 JobMaster。
始终至少有一个 JobManager。高可用(HA)设置中可能有多个 JobManager,其中一个始终是 leader,其他的则是 standby。
3.2 TaskManager
TaskManager是Flink集群的工作进程。Task被调度到TaskManager上执行。TaskManager相互通信,只为在后续的Task之间交换数据。
TaskManager的主要作用如下:
接收JobManager分配的任务,负责具体的任务执行。
TaskManager会在同一个JVM进程内以多线程的方式执行任务。· 负责对应任务在每个节点上的资源申请,管理任务的启动、停止、销毁、异常恢复等生命周期。
负责对数据进行缓存。TaskManager之间采用数据流的形式进行数据交互。
3.3 Tasks 和算子链
对于分布式执行,Flink 将算子的 subtasks 链接成 tasks。每个 task 由一个线程执行。将算子链接成 task 是个有用的优化:它减少线程间切换、缓冲的开销,并且减少延迟的同时增加整体吞吐量。链行为是可以配置的。
下图中样例数据流用 5 个 subtask 执行,因此有 5 个并行线程。
在这里插入图片描述
3.4 Task Slots 和资源
每个 worker(TaskManager)都是一个 JVM 进程,可以在单独的线程中执行一个或多个 subtask。为了控制一个 TaskManager 中接受多少个 task,就有了所谓的 task slots(至少一个)。
每个 task slot 代表 TaskManager 中资源的固定子集。例如,具有 3 个 slot 的 TaskManager,会将其托管内存 1/3 用于每个 slot。分配资源意味着 subtask 不会与其他作业的 subtask 竞争托管内存,而是具有一定数量的保留托管内存。注意此处没有 CPU 隔离;当前 slot 仅分离 task 的托管内存。
通过调整 task slot 的数量,用户可以定义 subtask 如何互相隔离。每个 TaskManager 有一个 slot,这意味着每个 task 组都在单独的 JVM 中运行(例如,可以在单独的容器中启动)。具有多个 slot 意味着更多 subtask 共享同一 JVM。同一 JVM 中的 task 共享 TCP 连接(通过多路复用)和心跳信息。它们还可以共享数据集和数据结构,从而减少了每个 task 的开销。
在这里插入图片描述
默认情况下,Flink 允许 subtask 共享 slot,即便它们是不同的 task 的 subtask,只要是来自于同一作业即可。结果就是一个 slot 可以持有整个作业管道。允许 slot 共享有两个主要优点:
Flink 集群所需的 task slot 和作业中使用的最大并行度恰好一样。无需计算程序总共包含多少个 task(具有不同并行度)。
容易获得更好的资源利用。如果没有 slot 共享,非密集 subtask(source/map())将阻塞和密集型 subtask(window) 一样多的资源。通过 slot 共享,我们示例中的基本并行度从 2 增加到 6,可以充分利用分配的资源,同时确保繁重的 subtask 在 TaskManager 之间公平分配。
在这里插入图片描述
四、Flink On YARN模式
Flink On YARN模式遵循YARN的官方规范,YARN只负责资源的管理和调度,运行哪种应用程序由用户自己实现,因此可能在YARN上同时运行MapReduce程序、Spark程序、Flink程序等。YARN很好地对每一个程序实现了资源的隔离,这使得Spark、MapReduce、Flink等可以运行于同一个集群中,共享集群存储资源与计算资源。Flink On YARN模式的运行架构如下图所示。
在这里插入图片描述
当启动一个Client(客户端)会话时,Client首先会上传Flink应用程序JAR包和配置文件到HDFS。
Client向ResourceManager申请用于运行ApplicationMaster的Container。
ResourceManager分配用于运行ApplicationMaster的Container,然后与NodeManager通信,要求它在该Container中启动ApplicationMaster(ApplicationMaster与Flink JobManager运行于同一Container中,这样ApplicationMaster就能知道Flink JobManager的地址)。ApplicationMaster启动后,它将负责此应用程序的整个生命周期。另外,ApplicationMaster还提供了Flink的WebUI服务。
ApplicationMaster向ResourceManager注册(注册后可以通过ResourceManager查看应用程序的运行状态)并请求运行Flink TaskManager所需的Container(资源请求是对一些Container的请求)。如果符合条件,ResourceManager会分配给ApplicationMaster所需的Container(表达为Container ID和主机名)。ApplicationMaster请求NodeManager使用这些Container来运行Flink TaskManager。各个NodeManager从HDFS中下载Flink JAR包和配置文件。至此,Flink相关任务就可以运行了。
此外,各个运行中的Flink TaskManager会通过RPC协议向ApplicationMaster汇报自己的状态和进度
版权归原作者 稚辉君.MCA.P8.JAVA高级架构师 所有, 如有侵权,请联系我们删除。