0


Spark作业提交

一.作业提交

1.1 作业提交模式

spark作业提交使用spark-submit命令,作业提交模式有cluster和client两种。

  • 在 cluster 模式下,Spark Drvier 在应用程序的 Master 进程内运行,该进程由群集上的 YARN 管理,提交作业的客户端可以在启动应用程序后关闭;
  • 在 client 模式下,Spark Drvier 在提交作业的客户端进程中运行,Master 进程仅用于从 YARN 请求资源。

1.2 集群角色介绍

角色****作用Master管理集群和节点,不参与计算。Driver
一个Spark作业运行时包括一个Driver进程,也是作业的主进程,具有main函数,并且有SparkContext的实例,是程序的入口点。

负责向集群申请资源,向master注册信息,负责了作业的调度,负责作业的解析、生成Stage并调度Task到Executor上。包括DAGScheduler,TaskScheduler。
Woker
主要功能:

管理当前节点内存,CPU的使用状况,

接收master分配过来的资源指令,通过ExecutorRunner启动程序分配任务

worker就类似于包工头,管理分配新进程,做计算的服务,相当于process服务。

需要注意的是:

Worker会向Master汇报当前信息。‌在分布式计算框架中,‌Worker节点通过心跳机制向Master节点报告其状态和资源使用情况。‌尽管Worker心跳给Master的主要信息只有Work ID,‌它不会通过心跳方式发送资源信息,‌但在出现故障时,‌会发送资源信息给Master。‌Worker节点不会直接运行代码,‌而是Executor节点负责运行具体的应用程序写的业务逻辑代码。‌这种设计使得Master节点在分配任务时能够知道Worker的工作负载情况,‌从而更好地管理和调度资源.
SparkContext控制整个application的生命周期,包括DAG sheduler和task scheduler等组件。Container
1)Container作为资源分配和调度的基本单位,其中封装了的资源如内存,CPU,磁盘,网络带宽等。 目前yarn仅仅封装内存和CPU

  2)Container由ApplicationMaster向ResourceManager申请的,由ResouceManager中的资源调度器异步分配给ApplicationMaster

  3)Container的运行是由ApplicationMaster向资源所在的NodeManager发起的,Container运行时需提供内部执行的任务命令.

二.作业提交流程

2.1 作业提交流程

07c3191982cd44fea67b3d4d11e116f9.png

1)客户端client向ResouceManager提交Application,ResouceManager接受Application,并根据集群资源状况选取一个node来启动Application的任务调度器driver(ApplicationMaster)

2)ResouceManager找到那个node,命令其该node上的nodeManager来启动一个新的JVM进程运行程序的driver(ApplicationMaster)部分,driver(ApplicationMaster)启动时会首先向ResourceManager注册,说明由自己来负责当前程序的运行。

3)driver(ApplicationMaster)开始下载相关jar包等各种资源,基于下载的jar等信息决定向ResourceManager申请具体的资源内容。

4)ResouceManager接受到driver(ApplicationMaster)提出的申请后,会最大化的满足资源分配请求,并发送资源的元数据信息给driver(ApplicationMaster);

5)driver(ApplicationMaster)收到发过来的资源元数据信息后会根据元数据信息发指令给具体机器上的NodeManager,让其启动具体的container。

6)NodeManager收到driver发来的指令,启动container,container启动后必须向driver(ApplicationMaster)注册。

7)driver(ApplicationMaster)收到container的注册,开始进行任务的调度和计算,直到任务完成。

  补充:如果ResourceManager第一次没有能够满足driver(ApplicationMaster)的资源请求,后续发现有空闲的资源,会主动向driver(ApplicationMaster)发送可用资源的元数据信息以提供更多的资源用于当前程序的运行。

  ApplicationMaster负责销毁使用完之后的Container。

  spark on yarn Cluster 模式下,driver 位于ApplicationMaster进程中,该进程负责申请资源,还负责监控程序、资源的动态情况。

2.2 SparkContext组件

在SparkContext中包含了整个框架中很重要的几部分:

  • SparkEnv:Spark的运行环境,Executor会依赖它去执行分配的task,不光Executor中有,同时为了保证本地模式任务也能跑起来,Driver中也有
  • SparkUI:Spark作业的监控页面,底层并没有采用前端技术,纯后端实现,用以对当前SparkJob的监控和调优,可以从页面观察到目前的Executor的jvm信息,每个job的stage划分和task划分,同时还可以观察到每个task处理的数据,用以发现数据是否倾斜
  • DAGScheduler:DAG调度器,是SparkJob调度系统的重要组件之一,负责创建job,根据RDD依赖情况划分stage,提交stage,将作业划分成一个有向无环图
  • TaskScheduler:任务调度器,是SparkJob调度系统的重要组件之一,负责按照调度算法将DAGScheduler创建的task分发至Executor,DAGScheduler是它的前置调度
  • SparkStatusTracker:提供对作业、Stage的监控
  • ConsoleProcessBar:利用SparkStatusTracker提供监控信息,将任务进度以日志的形式打印到终端中
  • HearbeatReceiver:心跳接收器,所有Executor都会定期向它发送心跳信息,用以统计存活的Executor,此信息会一直同步给TaskScheduler,用以保证TaskScheduler去分发task的时候会挑选合适的Executor
  • ContextCleaner:上下文清理器,用异步的方式去清理那些超出应用作用域范围的RDD、ShuffleDependency和Broadcast
  • LiveListenerBus:SparkContext中的事件总线,可以接收各个组件的事件,并且通过异步的方式对事件进行匹配并调用不同的回调方法
  • ShutdownHookManager:关闭时的钩子管理器,用以做一些清理工作,比如资源释放等
  • AppStatusStore:存储Application状态数据,在2.3.0之后的版本引入
  • EventLoggingListener(可选):将事件持久化到存储的监听器,通过spark.eventLog.enabled 进行控制
  • ExecutorAllocationManager(可选):Executor动态分配管理器,根据工作负载状态动态调整Executor的数量,通过属性spark.dynamicAllocation.enabledspark.dynamicAllocation.testing 进行控制

2.3 DAG生成流程

在 Spark 中,DAG(Directed Acyclic Graph,有向无环图)是一种用于表示作业中所有操作的图。它是对整个作业流程的一种逻辑表示,包括了所有的转换操作(Transformations)和最终的行动操作(Actions)。DAG 的生成和使用对于理解 Spark 的执行和优化非常关键。

DAG 简述:

DAG 是一个节点和边组成的图,其中每个节点表示数据集(例如 RDD)上的一个操作,每条边表示数据的依赖关系。由于它是“无环”的,所以不会出现一个操作依赖于自身的情况,保证了作业可以顺利完成。

DAG 的生成过程:

  1. 代码转化为逻辑执行计划:当用户编写 Spark 程序并触发行动操作时,Spark 首先将代码转化为一系列的逻辑转换步骤。这些步骤对应于各种转换操作(如 map、filter、groupBy 等)。
  2. 逻辑计划转化为物理执行计划:接着,Spark 根据这些逻辑步骤创建出物理执行计划。在这个过程中,Spark 优化器(Catalyst)会对逻辑计划进行优化,比如重新排序转换操作,合并可以管道化执行的操作。
  3. 物理执行计划映射为 DAG:最终,物理执行计划会被映射到一个 DAG。在这个 DAG 中,节点代表着不同的 RDDs 或数据分区,边代表着操作,如转换或结果计算。DAG 详细地描述了数据如何通过各种操作被转换和聚合。
  4. DAG 划分为 Stage:DAG 进一步被分解为一系列的 Stage,每个 Stage 都是一个独立的任务集合,可以并行执行。Stage 的划分主要是基于数据的 Shuffle 需求,每次 Shuffle 会形成一个新的 Stage。
  5. 执行:一旦 DAG 和 Stage 划分完成,Spark 会按照顺序调度 Stage 并行执行其中的 Task。如果某个 Stage 执行失败,Spark 可以重新执行该 Stage,而不需要从头开始执行整个 DAG。

通过这种方式,DAG 使得 Spark 在执行大规模并行数据处理时,能够有效地进行任务调度、容错、以及优化执行计划。DAG 的生成和优化是 Spark 能够高效处理大规模数据的关键因素之一。


本文转载自: https://blog.csdn.net/jpfx6008/article/details/141112169
版权归原作者 Tom无敌宇宙猫 所有, 如有侵权,请联系我们删除。

“Spark作业提交”的评论:

还没有评论