一、Shuffle原理
1、spark shuffle介绍
- 什么时候会发生shuffle,当使用下列算子时,就会发生shuffle: - 重新分区的算子:coalesce, repartition;- byKey类型的算子:reduceByKey,groupByKey,aggregateByKey,foldByKey, combineByKey,sortByKey;- join类型的算子join,leftOuterJoin,cogroup 等
- Spark在DAG调度阶段将job划分成多个stage,上游stage做map操作,下游stage做reduce操作,其本质还是MR计算架构。
- Shuffle是连接map和reduce之间的桥梁,它将map的输出对应到reduce的输⼊,这期间涉及到序列化和反序列化、跨节点⽹络IO和磁盘读写IO等,所以说shuffle是整个应⽤过程特别昂贵的阶段。
以reduceByKey为例,spark的shuffle实现⼤致如下图所⽰,在DAG阶段以shuffle为界,划分stage,上游stage做map task,每个map task将计算结果数据分成多份,每⼀份对应到下游stage的每个partition中,并将其临时写到磁盘,该过程就叫做shuffle write;下游stage叫做reduce task,每个reduce task通过⽹络拉取指定分区结果数据,该过程叫做shuffle read, 最后完成reduce的业务逻辑。
在map阶段,除了map的业务逻辑外,还有shuffle write的过程,这个过程涉及序列化、磁盘IO等耗时操作;
在reduce阶段,除了reduce的业务逻辑外,还有shuffle read过程,这个过程涉及到⽹络IO、反序列化等耗时操作。
所以整个shuffle过程是极其昂贵的。
2、Hadoop shuffle介绍
- 在MapReduce框架,Shuffle是连接Map和Reduce之间的桥梁,Map阶段通过shuffle读取数据并输出到对应的Reduce,而Reduce阶段负责从Map端拉取数据并进行计算。
- 在整个shuffle过程中,往往伴随着大量的磁盘和网络I/O。
- 所以,Hadoop中的shuffle性能的高低也会直接决定了整个程序的性能高低。
具体Shuffle过程详解,如下:
1)MapTask收集我们的map()方法输出的kv对,放到内存缓冲区中
2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
3)多个溢出文件会被合并成大的溢出文件
4)在溢出过程及合并的过程中,都要调用Partitioner进行分区和针对key进行排序
5)ReduceTask根据自己的分区号,去各个MapTask机器上取相应的结果分区数据
6)ReduceTask会取到同一个分区的来自不同MapTask的结果文件,ReduceTask会将这些文件再进行合并(归并排序)
7)合并成大文件后,Shuffle的过程也就结束了,后面进入ReduceTask的逻辑运算过程
3、两者shuffle的比较
1. 从逻辑角度来讲,Shuffle 过程就是一个 GroupByKey 的过程,两者没有本质区别,但在实现上有区别。
- MapReduce 为了方便 GroupBy 存在于不同 partition中的 key/value records,就提前对 key 进行排序。
- Spark 认为很多应用不需要对 key 排序,就默认没有在 GroupBy 的过程中对 key 排序。
2. 从数据流角度讲,两者有差别。
- MapReduce 只能从一个 Map Stage shuffle 数据,即整个过程只有一个shuffle:Map -> Reduce
- Spark 可以从多个 Map Stages shuffle 数据,即整个过程可以有多个shuffle:Map -> Shuffle -> Map -> Shuffle -> Reduce
补充说明:
在Hadoop MapReduce中,Map和Reduce阶段是串行执行的。在Map阶段,输入数据被分成多个splits,并在不同的Mapper任务间并行处理。在Map阶段结束后,它会将输出按照指定的Partitioner分区规则分发到不同的Reducer任务上进行进一步处理。这个过程中只有一个Shuffle操作,即将数据从Map任务传输到Reduce任务。 而在Spark中,它的计算模型基于弹性分布式数据集(RDD)。Spark可以在不同的Stage中执行多个Map操作,并且可以在各个Stage之间进行洗牌(Shuffle)操作。Spark的洗牌操作可以将数据重新分区并分发到后续阶段的任务上进行处理。 这意味着Spark在执行计算任务时,可以在多个阶段之间进行Shuffle操作,从而更灵活地处理数据。相反,MapReduce只能在一个任务阶段进行Shuffle,数据的流动是串行的。
3. 从数据 fetch 与数据计算的重叠粒度来讲,两者有细微区别。
- MapReduce 是粗粒度的,reducer fetch 到的数据先被放到中休息,当 shuffle buffer 快满时,才对它们进行 combine()。
- 而 Spark 是细粒度的 ,可以即时将 fetch 到的数据与 HashMap 中相同 key 的数据进行 aggregate。
5. 从性能角度来讲
- Spark考虑的更全面,Spark 针对不同类型的操作、不同类型的参数,会使用不同的 shuffle write 方式。
- MapReduce 的 shuffle 方式单一。
4、shuffle的历程
Spark-1.6 之前默认的shuffle方式是hash。
在 spark-1.6版本之后使用Sort-Base Shuffle,因为Hash Shuffle存在的不足所以就替换了Hash Shuffle。
Spark2.0之后, 从源码中完全移除了HashShuffle.
1) 未优化的Hash shuffle
为了方便分析假设前提:每个 Executor 只有 1 个CPU core,也就是说,无论这个 Executor 上分配多少个 task 线程,同一时间都只能执行一个 task 线程。
如上图中有 3个 Reducer,从 Task 开始那边各自把自己进行 Hash 计算(分区器:hash/numreduce取模),分类出3个不同的类别,每个 Task 都分成3种类别的数据,想把不同的数据汇聚然后计算出最终的结果,所以Reducer 会在每个 Task 中把属于自己类别的数据收集过来,汇聚成一个同类别的大集合,每1个 Task 输出3份本地文件,这里有4个 Mapper Tasks,所以总共输出了4个 Tasks x 3个分类文件 = 12个本地小文件。
** 小文件=task数reducetask数*
缺点:
- map 任务的中间结果首先存入内存(缓存), 然后才写入磁盘. 这对于内存的开销很大, 当一个节点上 map 任务的输出结果集很大时, 很容易导致内存紧张, 发生 OOM
- 生成很多的小文件. 假设有 M 个 MapTask, 有 N 个 ReduceTask, 则会创建 M * N 个小文件, 磁盘 I/O 将成为性能瓶颈.
2)优化的Hash shuffle
优化的 HashShuffle 过程就是启用合并机制,合并机制就是复用buffer,开启合并机制的配置是spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制。通常来说,如果我们使用HashShuffleManager,那么都建议开启这个选项。
这里还是有 4 个Tasks,数据类别还是分成 3 种类型,因为Hash算法会根据你的 Key 进行分类,在同一个进程中,无论是有多少过Task,都会把同样的Key放在同一个Buffer里,然后把Buffer中的数据写入以Core数量为单位的本地文件中,(一个Core只有一种类型的Key的数据),每1个Task所在的进程中,分别写入共同进程中的3份本地文件,这里有4个Mapper Tasks,所以总共输出是 2个Cores x 3个分类文件 = 6个本地小文件。
**小文件=CPU数*reducetask数**
3)普通的sort shuffle
在该模式下,数据会先写入一个数据结构,reduceByKey 写入 Map,一边通过 Map 局部聚合,一边写入内存。Join 算子写入 ArrayList 直接写入内存中。然后需要判断是否达到阈值,如果达到就会将内存数据结构的数据写入到磁盘,清空内存数据结构。
在溢写磁盘前,先根据 key 进行排序,排序过后的数据,会分批写入到磁盘文件中。默认批次为 10000 条,数据会以每批一万条写入到磁盘文件。写入磁盘文件通过缓冲区溢写的方式,每次溢写都会产生一个磁盘文件,也就是说一个 Task 过程会产生多个临时文件。
最后在每个 Task 中,将所有的临时文件合并,这就是merge过程,此过程将所有临时文件读取出来,一次写入到最终文件。意味着一个Task的所有数据都在这一个文件中。同时单独写一份索引文件,标识下游各个Task的数据在文件中的索引,start offset和end offset。
**小文件=task数**
4)bypass sort shuffle
bypass运行机制的触发条件如下(必须同时满足):
1、shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值,默认为200。2、不是聚合类的shuffle算子(没有预聚合)(比如groupByKey)。
此时 task 会为每个 reduce 端的 task 都创建一个临时磁盘文件,并将数据按 key 进行 hash 然后根据key 的 hash 值,将 key 写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。
该过程的磁盘写机制其实跟未经优化的 HashShuffleManager 是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的Hash Shuffle Manager来说,shuffle read的性能会更好。 而该机制与普通Sort Shuffle Manager运行机制的不同在于:不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。
**小文件=task数**
5)总结
未优化的Hash shuffle
每个下游生成一个单独的文件来存放数据,小文件过多,磁盘IO将会是性能瓶颈。
map 任务的中间结果首先存入内存(缓存), 然后才写入磁盘,很容易导致内存紧张, 发生 OOM。
优化的Hash shuffle启用合并机制,复用buffer,减少小文件的数量,但是不明显。普通的sort shuffle每个task只写一个排序了的文件和索引文件,极大的减少了产生文件的数量。bypass sort shuffle每个task只写一个文件和索引文件,节省了排序的开销和产生文件的数量。
二、Shuffle调优
1、程序调优
⾸先,尽量减少shuffle次数;
//两次shuffle
rdd.map().repartition(1000).reduceByKey(_+_,3000)
//⼀次shuffle
Rdd.map().repartition(3000).reduceByKey(_+_)
然后必要时主动shuffle,通常⽤于改变并⾏度,提⾼后续分布式运⾏速度;
rdd.repartition(largerNumPartition).map()
最后,使⽤treeReduce&treeAggregate替换reduce&aggregate。
数据量较⼤时,reduce&aggregate⼀次性聚合,shuffle量太⼤,⽽treeReduce&treeAggregate是分批聚合,更为保险。
2、参数调优
参数默认值参数说明调优建议
spark.shuffle.file.buffer
(map task到buffer到磁盘)
32K该参数⽤于设置shuffle write task的BufferedOutputStream的buffer缓冲⼤⼩。将数据写到磁盘⽂件之前,
会先写⼊buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。如果作业可⽤的内存资源较为充⾜的话,可以适当增加这个参数的⼤⼩(⽐如64k),从⽽减少shuffle
write过程中溢写磁盘⽂件的次数,也就可以减少磁盘IO次数,进⽽提升性能。在实践中发现,合理调节该参数,性能会有1到5%的提升。
spark.reducer.maxSizeFlight
(reduce task去磁盘拉取数据)
48M该参数⽤于设置shuffle read task的buffer缓冲⼤⼩,⽽这个buffer缓冲决定了每次能够拉取多少数据。如果作业可⽤的内存资源较为充⾜的话,可以增加这个参数的⼤⼩(⽐如96M),从⽽减少拉取数据的次
数,也就可以减少⽹络传输的次数,进⽽提升性能。在实践中发现,合理调节该参数,性能会有1到5%的提升。
Spark.shuffle.io.maxRetries
(拉取失败的最大重试数)
3shuffle read task从shuffle write task所在节点拉取属于⾃⼰的数据时,如果因为⽹络异常导致拉取失败,时会⾃动进⾏重试的。该参数就代表了可以重试的最⼤次数,如果在指定次数内拉取属于还是没有成功,就可能会导致作业执⾏失败。对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最⼤次数(⽐如6次),可以避免由于JVM
的full gc或者⽹络不稳定等因素导致的数据拉取失败。在实践中发现,对于超⼤数据量(数⼗亿到上百亿)的shuffle
过程,调节该参数可以⼤幅度提升稳定性。
Spark.shuffle.io.retryWait
(拉取失败时重试拉取的间隔)
5sshuffle read task从shuffle write task所在节点拉取属于⾃⼰的数据时,如果拉取失败了每次重试拉取数据
的等待时间间隔,默认是5s。建议加⼤时间间隔时长,⽐如60s,以增加shuffle操作的稳定性
spark.shuffle.memoryFraction
(分配给read进行聚合操作的内存比例)
0.2该参数代表了executor内存中,分配给shuffle read task进⾏聚合操作的内存⽐例,默认是20%如果内存充⾜,⽽且很少使⽤持久化操作,建议调⾼和这个⽐例,给shuffle read的聚合操作更多内存,以避免由于内存不⾜导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%。
Spark.shuffle.manager
(shuffle 机制)
sort该参数⽤于设置shuffleManager的类型。Spark1.5以后有三个可选项:hash、sort和tungsten-
sort。Tungsten-sort与sort类似,但是使⽤了tungsten计划中的堆外内存管理机制,内存使⽤效率提⾼。由于sort shuffle Manager 默认会对数据进⾏排序,因此如果你的业务逻辑中需要该排序机制的话,则使⽤默认的sort Shuffle Manager 就可以;但是如果你的业务逻辑不需要对数据进⾏排序,那么建议参考后⾯的⼏个参数调优,通过bypass机制或优化的hash Shuffle Manager来避免排序操作,同时提供较好的磁盘读写性能。Spark.shuffle.sort.bypassMergeThreshold200当shuffle Manager为sort shuffle Manager时,如果shuffle read task的数量⼩于这个阈值,则shuffle write过程中不会进⾏排序操作,⽽是直接按照未经优化的hash Shuffle Manager的⽅式去写数据,但是最后会将每个task产⽣的所有临时磁盘⽂件都合并成⼀个⽂件,并会创建单独的索引⽂件。当你使⽤sort Shuffle Manager时,如果的确不需要排序操作,那么建议将这个参数调⼤⼀些,⼤于shuffle
read task的数量,那么此时就会⾃动启⽤bypass机制,map-side就不会进⾏排序,减少了排序的性能开销。但是这种⽅式下,依然会产⽣⼤量的磁盘⽂件,因此shuffle write性能有待提⾼.Spark.shuffle.consolidateFilesfalse如果使⽤hash Shuffle Manager,该参数有效。如果设置为true,那么就会开启consilidate机制,会⼤幅度
合并shuflle write的输出⽂件,对于shuffle read task数量特别多的情况下,这种⽅法可以极⼤地减少磁盘IO开销,提升性能。如果的确不需要sort Hash Shuffle的排序机制,那么除了使⽤bypass机制,还可以尝试将spark.shuffle.manager参数⼿动调节为hash,使⽤hash Shuffle Manager,同时开启consolidate机制。在实践中尝试
过,发现其性能⽐开启了bypass机制的sortshuffleManager要⾼出10%到30%。
路漫漫其修远兮,吾将上下而求索。 ---屈原
版权归原作者 烟雨蒋楠 所有, 如有侵权,请联系我们删除。