0


第三部分:Spark调优篇

第一部分:Spark基础篇_奔跑者-辉的博客-CSDN博客

第一部分:Spark基础篇_奔跑者-辉的博客-CSDN博客

第三部分:Spark调优篇_奔跑者-辉的博客-CSDN博客


1 常规性能调优

常规性能调优一: 最优资源配置

Spark性能调优的第一步,就是为任务分配更多的资源,在一定范围内,增加资源的分配与性能的提升是成正比的,实现了最优的资源配置后,在此基础上再考虑进行后面论述的性能调优策略。

资源的分配在使用脚本提交Spark任务时进行指定: 参数值可以如下表

名称

说明

--num-executors

配置Executor的数量 50~100

--driver-memory

配置Driver内存(影响不大) 1~5G

--executor-memory

配置每个Executor的内存大小 6~10G

--executor-cores

配置每个Executor的CPU core数量 3

常规性能调优二: RDD调优

①RDD复用

在对RDD进行算子时,要避免相同计算逻辑之下对RDD进行重复的计算;

②RDD持久化
通过持久化将公共RDD的数据缓存到内存/磁盘中,之后对于公共RDD的计算都会从内存/磁盘中直接获取RDD数据即可;

③RDD尽可能早的filter操作

获取到初始RDD后,应该考虑尽早地过滤掉不需要的数据,进而减少对内存的占用,从而提升Spark作业的运行效率。

常规性能调优三:并行度调节

Spark作业中的并行度指各个stage的task的数量。

理想的并行度设置,应该是让并行度与资源相匹配,简单来说就是在资源允许的前提下,并行度要设置的尽可能大,达到可以充分利用集群资源。合理的设置并行度,可以提升整个Spark作业的性能和运行速度。

Spark官方推荐,task数量应该设置为Spark作业总CPU core数量的2~3倍。

常规性能调优四: 广播变量

当算子函数中使用到外部变量时,需要用到广播变量; 广播变量会保证每个Executor的内存中,只驻留1份变量副本,而Executor中的task执行时共享该Executor中的那份变量副本。这样的话,可以大大减少变量副本的数量,减少网络传输性能开销,减少对Executor内存的占用的开销,提高spark的效率;

常规性能调优五:Kryo序列化

默认情况下,Spark使用Java的序列化机制 , Java序列化机制的效率不高,序列化速度慢并且序列化后的数据所占用的空间依然较大。

Kryo序列化机制比Java序列化机制性能提高10倍左右,Spark之所以没有默认使用Kryo作为序列化类库,是因为它不支持所有对象的序列化,同时Kryo需要用户在使用前注册需要序列化的类型,不够方便,但从Spark 2.0.0版本开始,简单类型、简单类型数组、字符串类型的Shuffling RDDs 已经默认使用Kryo序列化方式了。

2 算子调优

算子调优一:调节mapPartitions

普通的map算子对RDD中的每一个元素进行操作,而mapPartitions算子对RDD中每一个分区进行操作。但是如果使用mapPartitions算子,但数据量非常大时,function一次处理一个分区的数据,如果一旦内存不足,此时无法回收内存,就可能会OOM,即内存溢出。

因此,mapPartitions算子适用于数据量不是特别大的时候,此时使用mapPartitions算子对性能的提升效果还是不错的。(当数据量很大的时候,一旦使用mapPartitions算子,就会直接OOM);

算子调优二:foreachPartition优化数据库操作

如果使用foreach算子完成数据库的操作,由于foreach算子是遍历RDD的每条数据,因此,每条数据都会建立一个数据库连接,这是对资源的极大浪费; 在生产环境中,通常使用foreachPartition算子来完成对1个数据库的写入,通过foreachPartition算子,可以优化写数据库的性能。

算子调优三:filter与coalesce的配合使用

在Spark任务中我们经常会使用filter算子完成RDD中数据的过滤,但是一旦进过filter过滤后,每个分区的数据量有可能会存在较大差异,如图下图所示:

为了解决filter过滤后每个分区数据量不均匀的情况,我们可以使用coalesce进行重分区操作.

算子调优四:repartition解决SparkSQL低并行度问题

为了解决Spark SQL无法手动设置并行度和task数量的问题, 我们可以使用repartition算子,以提高Spark SQL并行度和task的数量;

算子调优五:reduceByKey本地聚合

使用reduceByKey对性能的提升如下:

(1) 本地聚合后,在map端的数据量变少,减少了磁盘IO,也减少了对磁盘空间的占用;

(2) 本地聚合后,下一个stage拉取的数据量变少,减少了网络传输的数据量;

(3) 本地聚合后,在reduce端进行数据缓存的内存占用减少;

(4) 本地聚合后,在reduce端进行聚合的数据量减少。

3 Shuffle调优

Shuffle调优一:调节map端缓冲区大小

在Spark任务运行过程中,如果shuffle的map端处理的数据量比较大,但是map端缓冲的大小是固定的,可能会出现map端缓冲数据频繁溢写到磁盘文件中的情况,使得性能非常低下,通过调节map端缓冲的大小,可以避免频繁的磁盘IO操作,进而提升Spark任务的整体性能。

map端缓冲的默认配置是32KB,如果每个task处理640KB的数据,那么会发生640/32 = 20次溢写,如果每个task处理64000KB的数据,机会发生64000/32=2000此溢写,这对于性能的影响是非常严重的。32kb[按2倍左右调]====>64KB;

Shuffle调优二:调节reduce端拉取数据缓冲区大小

Spark Shuffle过程中,shuffle reduce task的buffer缓冲区大小决定了reduce task每次能够缓冲的数据量,也就是每次能够拉取的数据量,如果内存资源较为充足,适当增加拉取数据缓冲区的大小,可以减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。

reduce端数据拉取缓冲区的大小可以通过spark.reducer.maxSizeInFlight参数进行设置,默认为48MB,可优化调为96M ;

Shuffle调优三:调节reduce端拉取数据重试次数

Spark Shuffle过程中,reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试。对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如6次),以避免由于网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。

reduce端拉取数据重试次数可以通过spark.shuffle.io.maxRetries参数进行设置,该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败,默认为3次.

Shuffle调优四:调节reduce端拉取数据等待间隔

Spark Shuffle过程中,reduce task拉取属于自己的数据时,如果因为网络异常等原因导致失败会自动进行重试,在一次失败后,会等待一定的时间间隔再进行重试,可以通过加大间隔时长(比如60s),以增加shuffle操作的稳定性。

reduce端拉取数据等待间隔可以通过spark.shuffle.io.retryWait参数进行设置,默认值为5s.

Shuffle调优五:调节SortShuffle排序操作阈值

对于SortShuffleManager,如果shuffle reduce task的数量小于某一阈值则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。

当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量,那么此时map-side就不会进行排序了,减少了排序的性能开销,但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。

SortShuffleManager排序阈值的设置可以通过spark.shuffle.sort. bypassMergeThreshold这一参数进行设置,默认值为200;

4 JVM调优

JVM调优一:降低cache操作的内存占比

①静态内存管理机制 [老版本--->现在已经不需要手动调节]

根据Spark静态内存管理机制,堆内存被划分为了两块,Storage和Execution。Storage主要用于缓存RDD数据和broadcast数据,Execution主要用于缓存在shuffle过程中产生的中间数据,Storage占系统内存的60%,Execution占系统内存的20%,并且两者完全独立。

在Spark UI中可以查看每个stage的运行情况,包括每个task的运行时间、gc时间等等,如果发现gc太频繁,时间太长,就可以考虑调节Storage的内存占比,让task执行算子函数式,有更多的内存可以使用。

②统一内存管理机制 [动态内存管理1.6版本开始引进--->2016.1月推出]

根据Spark统一内存管理机制,堆内存被划分为了两块,Storage和Execution。Storage主要用于缓存数据RDD数据和broadcast数据,Execution主要用于缓存在shuffle过程中产生的中间数据,两者所组成的内存部分称为统一内存,Storage和Execution各占统一内存的50%,由于动态占用机制的实现,shuffle过程需要的内存过大时,会自动占用Storage的内存区域,因此无需手动进行调节。

JVM调优二:调节Executor堆外内存

Executor的堆外内存主要用于程序的共享库、Perm Space、 线程Stack和一些Memory mapping等, 或者类C方式allocate object。
在运行Spark任务的时候,也打开堆外内存,为堆内内存分担些压力;堆外但是堆外内存打开时默认只取内存的10%大小 ,对于任务来说不够用,要把堆外内存提高至少1G以上。

以上参数配置完成后,会避免掉某些JVM OOM的异常问题,同时,可以提升整体Spark作业的性能。

JVM调优三:调节连接等待时长

垃圾回收会导致工作现场全部停止,也就是说,垃圾回收一旦执行,Spark的Executor进程就会停止工作。此时,由于没有响应,无法建立网络连接,会导致网络连接超时。

解决: 连接等待时长可以在spark-submit脚本中进行设置。

5 Spark数据倾斜

Spark中的数据倾斜问题主要指shuffle过程中出现的数据倾斜问题,是由于不同的key对应的数据量不同, 导致的不同task所处理的数据量不同的问题。

例如,reduce点一共要处理100万条数据,第一个和第二个task分别被分配到了1万条数据,计算5分钟内完成,第三个task分配到了98万数据,此时第三个task可能需要10个小时完成,这使得整个Spark作业需要10个小时才能运行完成,这就是数据倾斜所带来的后果。

注意,要区分开数据倾斜与数据量过量这两种情况,数据倾斜是指少数task被分配了绝大多数的数据,因此少数task运行缓慢;数据过量是指所有task被分配的数据量都很大,相差不多,所有task都运行缓慢。

数据倾斜的表现

① Spark作业的大部分task都执行迅速,只有有限的几个task执行的非常慢,此时可能出现了数据倾斜,作业可以运行,但是运行得非常慢;

② Spark作业的大部分task都执行迅速,但是有的task在运行过程中会突然报出OOM,反复执行几次都在某一个task报出OOM错误,此时可能出现了数据倾斜,作业无法正常运行。

定位数据倾斜问题

① 查阅代码中的shuffle算子,例如reduceByKey、countByKey、groupByKey、join等算子,根据代码逻辑判断此处是否会出现数据倾斜;

② 查看Spark作业的log文件,log文件对于错误的记录会精确到代码的某一行,可以根据异常定位到的代码位置来明确错误发生在第几个stage,对应的shuffle算子是哪一个;

解决方案:

**解决方案一: 避免数据源的数据倾斜 **
通过对Hive表中对倾斜的数据进行预处理,以及在进行kafka数据分发时尽量进行平均分配 , 这种方案从根源上解决了数据倾斜问题,实现起来方便;

解决方案二: 避免shuffle过程
为了避免数据倾斜,我们可以考虑避免shuffle过程,如果避免了shuffle过程,那么从根本上就消除了发生数据倾斜问题的可能;

解决方案三:过滤导致倾斜的key
如果在Spark作业中允许丢弃某些数据,那么可以考虑将可能导致数据倾斜的key对应的数据进行过滤,这样,在Spark作业中就不会发生数据倾斜了;

解决方案四:提高shuffle操作中的reduce并行度
提高shuffle过程中的reduce端并行度, 就增加了reduce端task的数量,那么每个task分配到的数据量就会相应减少,由此缓解数据倾斜问题。

标签: spark jvm java

本文转载自: https://blog.csdn.net/index_test/article/details/126729526
版权归原作者 奔跑者-辉 所有, 如有侵权,请联系我们删除。

“第三部分:Spark调优篇”的评论:

还没有评论