一、Spark的优势
1.1 DAG计算模型
Spark是一个分布式的大数据计算框架,提供了以**RDD**和**DAG**为核心的计算模型,Spark相比中Hadoop传统的MapReduce最重要的一个优化点得益于他的DAG有向无环图,DAG基于宽窄依赖来进行Stage的划分,数据在执行到一些重分区算子时,才会发生shuffle,进行数据的重组与落盘,这意味着对于非keyby的一些计算逻辑,Spark可以连续在内存中执行多个类map方法,最终在shuffle后落盘一次即可,对于相同的计算任务,Spark相比MR中间数据shuffle的次数大大减少。
1.2 缓存与容错
当然,Spark如此快速的原因不仅仅由于上面一点,Spark提供了RDD缓存和**Checkpoint**检查点的相关机制,可以将中间计算结果保留在缓存或持久化到磁盘,对于一些重复的计算步骤,可以直接读取中间结果进行向下计算,同时任务失败时也可以从上次检查点的步骤恢复,不用重新计算整个链路。![](https://i-blog.csdnimg.cn/direct/e4a9718e3f604c529f2f0d197d038230.png)
上图为Spark执行CheckPoint的整个过程,感兴趣的读者可以阅读《Spark源码(四)Checkpoint原理》。
1.3 线程级调度
此外,Spark一个任务的最小计算单元是一个Task,每个Stage的Task个数=**当前Stage最后一个RDD的分区数量**,每个Task携带着当前RDD一个分区的数据信息和计算逻辑,**DAGScheduler**这些Task装入一个**TaskSet**中交给**TaskSechduler**,TaskScheduler会先为TaskSet创建**TaskSetManager**,并将TaskSetManager添加到**SchedulableBuilder**中(调度器)的任务池**(Pool)**,调度器可以选择FIFO和FAIR两种,随后,根据调度算法(**比较Job、Stage的顺序,Task的优先级、本地化级别等**),决定了TaskSetManager的出队列顺序,将Task任务序列化后发往对应的Executor,在Executor的线程池启动一个线程执行计算任务。
因此Spark的Task是一个**线程**级别的计算任务,多个线程共享这个Executor节点的计算核和其他资源,而MapReduce是进程级的,两个核心MapTask和ReduceTask都是一个**jvm进程**,进程的初始化和资源开销要远大于线程,Spark相比MR就显得十分的轻量和快速。
1.4 Shuffle优化
最后,Spark针对Shuffle也做了优化策略,提供了**HashShuffleManager**和**SortShuffleManager**两种Shuffle实现,其中SortShuffle主要是为了解决HashShuffle的**小文件**问题,在对数据进行分区重组的同时进行排序,同时生成一份索引文件,记录数据的**分区**和**偏移量**,极大的减少了中间生成文件的数量和下游的读取次数但是当数据巨大时,分布式排序的性能也是一个问题,因此Spark又提供了一种by-pass机制,在下游分区数量较小时(默认是200),可以免去排序的过程,得益于这两种shuffle策略的动态权衡,Spark在Shuffle上也是相对优于MapReduce的。
二、Spark中的性能瓶颈
2.1 何为基于内存
在讲Spark的性能瓶颈之前,首先思考一个问题,我们知道在计算机系统中,计算操作都是在内存中进行的,内存的读写速度非常快,通常在纳秒级别。它用于存储当前正在处理的数据和指令,以便CPU可以快速访问和处理,那既然这样,为什么经常说Spark是一个基于内存的计算框架呢?下面两点是我的理解:
(1) Spark支持将RDD数据集Persist或者Cache到内存里去,如果将频繁使用的数据集缓存到内存中,则可以大大减少IO、网络传输以及重新计算的代价,尤其是对于Shuffle后的数据集进行缓存,可显著提高应用运行速度,这也是在第三节中会列举的一种优化策略。
(2) 基于pipeline的计算执行策略,此策略可以实现对内存中数据的重用最大化,减少了大量中间结果的IO操作,此优势可相较于Hadoop的MapReduce而言,在一个Stage内部,各个计算操作则可以在内存中Pipeline执行完成,不会涉及到中间结果在磁盘和内存中换入换出的问题,从而减少了大量的IO,计算速度自然会快很多。对pipline的形象理解可以参考:Spark之pipeline机制_spark pipeline setparent-CSDN博客
内存是**十分宝贵**的计算资源,由于Spark对内存的**持续性要求**很高,大量的数据需要**长时间的在占用内存**进行计算,因此内存问题常常会是Spark和任务失败的主要原因,除了内存之外,作为一个分布式框架,Spark的性能瓶颈会下下面几点产生,这也是我们在使用常见的大数据框架(hive、flink、kafka等)的常见问题
2.2 磁盘IO
Spark在计算时会对文件系统上的数据进行频繁的读取和写出操作,在计算过程中,IO的消耗主要集中在Shuffle阶段,我们先暂且不讨论Shuffle的重组和排序等过程,单独讨论对数据文件的读取和写出操作,如:
val data = sc.textFile("...").count()
这样一个stage,就是简单的从hdfs中读取文本数据计算有多少行。几乎没有cpu操作,shuffle的量也很小,主要的操作在DISK IO上,因此,这个stage耗时最长的部分就是在磁盘读写上。我总结了一下,在利用Spark处理数据时,与磁盘的交互主要主要出现在如下几个位置:
(1) Spark直接读取磁盘中的数据进行处理。
(2)在Shuffle过程中,会首先将数据写入内存缓冲区,缓冲区填满会溢写到磁盘产生临时文件。
(3)溢出的临时文件会进行归并,最终形成Shuffle的中间结果文件存储到磁盘(ShuffleWrite)。
(4)Shuffle完成后的下游分区读取对应磁盘中的中间结果文件进行后续处理(ShuffleRead)。
(5)调用persit() 方法时候,如果缓存级别是,则会把内存中装不下的数据放到磁盘,或者直接放到磁盘;调用checkpoint()方法时,也会将RDD持久化到磁盘。
(6)在执行BroadCast操作时,如果被广播的Executor节点内存不足,也会将广播数据写入磁盘。
(7)Spark将结果数据(经处理后)写出到磁盘的目标路径。
上述几个过程,在数据量巨大,或者次数偏多的情况下,会严重拖慢整个任务的进度。
2.3 网络IO
Spark是一个分布式的计算框架,不同节点之间要通过**RPC**(Remote Procedure Call,远程过程调用)高效的网络通信,Spark在2.0.0版本后废除了Akka,完全使用Netty作为Spark的通信工具。 不过Spark 基于 Netty 新的 RPC框架还是借鉴了 Akka 的中的设计,它是基于 Actor模型,各个组件可以认为是一个 个独立的实体,各个实体之间通过消息来进行通信,其中重要的几个组件如下:
(1)RpcEnv:是Spark的RPC通信环境上下文对象,一般通过RpcEnvFactory创建,服务端和客户端都可以使用它来做通信,它负责管理 RPC 的生命周期、创建和查找 RPC 端点、接收RpcEndPoint和RpcEndPointEef的注册处理消息传递等,是 Spark RPC 系统的核心组件。
(2)RpcEndPoint:是一个可以相应请求的服务,表示一个 RPC 端点,其中有 receive() 方法用来接收客户端发送过来的信息,也有 receiveAndReply() 方法用来接收并应答,应答通过 RpcContext 回调。
(3)RpcEndpointRef:是 Spark 中的一个抽象类,是对一个RpcEndPoint 的引用,持有远程 RpcEndPoint 的地址名称等,提供了** send()** 方法和 ask() 方法用于发送请求。
这是Spark内部帮我们搭好的通信框架,下面总结一下Spark网络通信发生的几个位置,这里从内部通信和外部通信两个方面来介绍:
内部:
(1)driver 和 master 的通信,比如 driver 会向 master 发送 RegisterApplication 消息。
**(2)master **和 **worker 的通信,比如 worker 会向 master 上报 worker 上运行 Executor 信息。(3)executor **和 **driver **的的通信, executor 运行在 worker 上, spark 的 tasks 被分发到运行在各个executor中的线程池调度执行。
(4)executor 中, executor 需要通过向 driver 发送任务运行结果。
**(5)worker **和 **worker **的通信, task 运行期间需要从其他地方 fetch 数据,这些数据是由运行在其他 worker上的executor 上的 task 产生,因此需要到 worker 上 fetch 数据。
外部:包括不限于其他数据库、HDFS、Hive、Yarn等,这里主要通过介绍Yarn-Cluster模式中提交的通信来举例。
上图为Spark执行CheckPoint的整个过程,感兴趣的读者可以阅读《Spark源码(二)Yarn模式Spark任务提交》。
(1)用户提交spark-submit后,提交节点和ResourceMnanager进行通信,申请启动ApplicationMaster。
(2)ApplicationMaster内启动Driver线程后,向AM注册自己。
(3)Driver向AM发送请求,AM向RM发送请求,申请资源,选择NM节点启动。CoarseGrainExecutorBackend(Executor),启动后向Driver注册自己。
(4) 计算任务的调度和执行同上
总的来说,网络IO主要发生在如下两个位置:1. **汇集信息**,例如 task 变化信息, executor 状态变化信息。2. **传输数据**, spark shuffle (也就是 reduce 从上游 map 的输出中汇集输入数据)阶段存在大量的数据传输。当我们的计算任务较为复杂时,分区数量较多,节点间通信频繁,对网络带宽是一个巨大的考验,如果**网络带宽**资源不足,亦或集群的**网络空间布局**不是很合理,很可能会造成网络请求超时,最终导致Task或整个Job失败。
2.4 计算和存储资源
系统资源是影响程序性能的最直接因素,在Spark中,比较重要的当属内存与CPU,内存直接影响着Spark作业的性能(不仅Executor,Driver端内存也很重要),充足的内存可以减少磁盘读写,加快数据处理速度,同时也影响着作业的稳定性和并行度。内存不足会导致频繁的磁盘交换,严重影响性能。CPU则是驱动计算的引擎,在并行计算中扮演着关键角色。在Spark中,Job的实际并行度是由于Executor的个数和Executor分配的core(CPU核)数决定的(num executors * executor cores),与RDD的分区个数无关。
高性能的CPU可以支持更多并发任务,加快作业执行速度。同时,合理分配CPU资源可以优化作业的负载均衡,提高整体性能。因此,在设计和优化Spark集群时,充分考虑内存和CPU的配置很重要,它们直接决定了作业的执行效率和整体性能。
2.5 Shuffle
Shuffle是分布式计算中常见的也是最耗时的操作之一,上游的数据按照**分区规则**,**重组洗牌**后发往下游节点执行聚合等计算操作,在Shuffle中也**必然**伴随着磁盘与网络IO,因此上面叙述的问题在Shuffle中同样会产生,**并且是最容易最集中会产生的**,因为Shuffle过程会产生大量的中间文件、上下游分区节点的通信、数据的Read和Write。
在Spark中,Shuffle有**HashShuffle**与**SortShuffle**两种,关于这两种Shuffle的底层实现,在这里就不做赘述了,可以参考:Spark Shuffle机制-腾讯云开发者社区-腾讯云。需要明确的点是,SortShuffle解决了HashShuffle的**大量小文件**问题,在一定程度上减少了**磁盘IO**,但是在大数据场景下,分布式排序是一个特别复杂切耗时的过程,会**持续占用**CPU资源,在极端场景下,非by-pass机制的SortShuffle的性能甚至可能逊于普通的HashShuffle,造成性能瓶颈,这也是**by-pass**机制产生的原因**之一**,使得Spark在一定场景下可以**避免**大规模的数据排序。
如果经过Shuffle过后,数据分布不均匀,某些key的数据量非常巨大,这些key经Shuffle后全部被分到同一分区,就会产生大数据处理中的常见问题 ---- **数据倾斜。**
产生数据倾斜并不一定会导致任务报错,但会使得倾斜分区的处理时长远超过其他Task,进而拖慢整个任务,当然如果这个Task处理的数据量异常巨大,超过了**Executor**的内存配置,就会导致OOM
经上面分析,Spark在Shuffle中产生的性能问题有如下几点
(1)磁盘和网络IO
(2)CPU和内存负载(计算资源)
(3)数据充分区与分布式排序的复杂计算损耗(计算过程)
可见,之前分析的三点原因在Shuffle中都可能会遇到,因此Shuffle是我们在使用Spark进行大数据开发过程中最棘手的点,Shuffle的性能极大的影响着整个任务的处理进度。
三、总结
调优是一个抽丝剥茧、层层深入、的过程,需要业务视角和技术视角的权衡。Spark调优需要全面考虑数据、任务和系统资源等多个方面的因素,需要不断实践和总结经验,才能达到最佳的调优效果。调优并非一蹴而就的过程,但是通过持续的努力和实践,可以使我们逐渐提升对Spark调优的理解和技能,对Spark的具体调优方法见下一章《Spark调优(二)Spark的调优策略》。
版权归原作者 祺嘉朱 所有, 如有侵权,请联系我们删除。