1、什么是 shuffle
shuffle
单从英文意思上来看,是 “洗牌” 的意思,但在大数据分布式计算中指的是集群范围内跨节点、跨进程的数据分发。 也就是 2 个 Stage 之间,数据进行传递的过程就叫做 Shuffle。
那为什么需要 shuffle 呢?
这往往和大数据的很多计算场景相关,比如我们一直在说的 WordCount ,把相同的单词进行计数。因此,需要对数据进行聚合操作,聚合自然要对相同的 key 进行聚合,所以需要通过 shuffle 把各个节点相同的 key 拉到一起,可以结合下图理解一下。
当然,WordCount 是我举的一个小栗子。其实,在 Spark 中,聚合、排序是一个通用的计算场景(都有相应的算子),而且很多时候数据量会很大,因此,必须得设计 shuffle 来支持这些不同类型的计算。
那 shuffle 到底解决了什么问题?
我们已经知道 Shuffle 指的是集群范围内跨节点、跨进程数据分发,其实分发并不是上游 Stage 直接把数据分发给下游,而是先输出一个中间文件,然后下游 Stage 去读取中间文件。
所以,shuffle 要解决的问题就是如何将这些中间文件数据重新组织,使其能够在上下游 Task 之间进行数据传递和计算。
2、shuffle 工作原理
Shuffle
分为
Shuffle Write
和
Shuffle Read
两个阶段,前者主要解决上游 stage 输出数据的分区问题,后者主要解决下游 stage 从上游 stage 获取数据、重新组织、并为后续操作提供数据源的问题。
干说有点晦涩,为了你方便理解,我还是用
WordCount
这个例子来说明。在
WordCount
中,要实现相同的单词进行分组计数,需要引入 shuffle,其流程图如下:
WordCount
中主要是
reduceByKey
产生了 Shuffle ,从上图可知,以 Shuffle 为边界把计算切分成 2 个 Stage, Shuffle 的上游叫做 Map 阶段,Shuffle 的下游叫做 Reduce 阶段。
Map 阶段,每个分区内先把自己的数据做一个初步的 combine,又称为 Map 端聚合;然后不同的单词会根据分区函数计算出来的分区号,被分发到下游对应的分区
最后
Reduce
阶段以单词为 key 执行聚合函数做第二次聚合,这里称为全局聚合,从而实现了单词计数的功能。
上述流程值得注意的是,在 Shuffle 环节,数据不是直接分发到下游,而是先写到中间文件中,然后
Reduce
阶段会去读取这个中间文件的数据。
总的来说,Map 和 Reduce 类似于一个生产者-消费者模型,Map 阶段生产 shuffle 中间文件,reduce 消费 shuffle 中间文件,从而衔接了 shuffle 的上下游,实现了数据的交换。
那么,疑问又来了,什么是 shuffle 中间文件?shuffle 中间文件又是怎样生成的?
DAGScheduler
会从数据依赖图后往前回溯,以 shuffle 为边界切割 job ,生成 Stage,并会为每个 Stage 创建任务集合
TaskSet
,每一个
TaskSet
又都包含多个分布式任务 Task。
在 Map 阶段,每个 Task 都会生成 2 个文件:一个 data 文件,一个 index 文件。
这两个文统称为 Shuffle 中间文件,并且以 Map 端的 Task 粒度生成,有多少个并行的 Task ,
就会生成多少份 Shuffle 中间文件(最后小文件个数:2 x
Map Task
个数)。
shuffle 中间文件到底长啥样?我们已经说了,它是 data 文件和 index 文件的一个统称。我们以
WordCount
为例来说明:
从上图可知,data 文件记录的就是原始的 <key,value> 数据集,index 文件则记录了哪些数据属于下游哪个
Reduce
分区,
标识了目标分区所属数据的起始索引。在图中,为了简洁描述,我只用了 2 条数据示意,实际上更复杂。
我们知道了 shuffle 文件的样子,下面看看它是如何产生的?
3、shuffle write
在 Spark shuffle 机制中,Shuffle Write 主要就是生成 shuffle 中间文件的一个过程。
在 shuffle 中间文件生成的过程中,
Shuffle Writer
主要承担的功能有:数据分区、聚合和排序 3 个功能。
关于数据分区,主要考虑 2 个问题。
第一个问题,如何确定分区个数?
一般来说,分区个数与下游 Stage 的 Reduc Task 个数一致。在 shuffle 时,用户通过指定分区个数:
numPartitions
相应的会有同样个数的 reduce task 来处理相应的数据;如果用户没有自定义,则分区个数默认为 parent RDD 中分区个数的最大值。
第二个问题,对于 map task 输出的 record 如何分区?
公式:
partitionId = Hash(key) % numpartitions
。即每计算出一条 record,计算其 key 的哈希值,并与分区个数取模,则得到该条数据的分区 ID
关于数据聚合,本来聚合应该是
shuffle read
完数据之后要做的,之所以在
shuffle write
时做 combine ,是考虑到数据量大的时候,提前聚合以减少数据量,从而减少网络 IO。
这里要注意的是,
shuffle read
聚合是属于全局聚合,
shuffle write
聚合只针对当前分区做聚合。
关于排序,Spark 采取的是先聚合,再排序。实现主要是设计了一个特殊的数据结构,类似
HashMap+Array
,完成先聚合,再排序的功能。
针对上述三个功能,Spark 设计了一个通用的 Shuffle Write 框架,该框架的执行顺序为:map 输出 record ---> 数据聚合 ---> 排序 ---> 分区。其中数据聚合、排序是可选项。
这里还是使用 WordCount 例子给大家做说明:
如图所示,Spark 采用的实现方法是建立一个类似
HashMap + Array
的内存数据结构,对
map()
输出的 record 进行聚合。
类
HashMap
结构中的 Key 由
“partitionId+Key”
组成, Value 是经过相同 Key combine 的结果。
在图中,map 端聚合是
sum()
函数,那么 Value 中存放的便是多个 record 对应的 Value 相加的结果。聚合完成后,Spark 对类似
HashMap
中的 record 进行排序。
如果需要按 Key 进行排序,那么按
partitionId+Key
进行排序。最后,将排序后的 record 写入一个分区文件中。
如果该数据结构存放不下,则会先扩容为 2 倍大小,如果还存放不下,就将类
HashMap
中的 record 排序后 spill 到磁盘上。
此时,
HashMap
被清空,可以继续对
map()
输出的 record 进行聚合,如果内存再次不够用,那么继续spill到磁盘上,此过程可以重复多次。
当
map()
输出完成以后,将此时
HashMap
中的 reocrd 与磁盘上已排序的 record 进行再次聚合(merge),得到最终的
record
,并输出到相应的分区文件中,该分区文件,便是上文中提到的 Shuffle 中间文件。
好了,我们知道 shuffle 中间文件是如何产生的,接下来我们继续看 reduce task 如何读取 shuffle 中间文件?
4、shuffle read
当
Shuffle Write
输出了 shuffle 中间文件后,就到了
Shuffle Read
阶段。
Shuffle Read
主要需要实现 3 个功能:跨节点拉取数据,聚合和排序。
同样的,Spark 为了支持这三个功能,设计了一个通用的 Shuffle Read 框架,它的计算顺序:跨节点数据拉取 ---> 数据聚合 ---> 排序 ---> 输出。
这里值得注意的点是,对于每一个
Map Task
生成的中间文件,其中生成的分区数量由下游的 Reduce 阶段的 Task 个数决定,并且 index 文件标识了目标分区所属数据的起始索引。
为了讲清楚
Shuffle Read
的细节,小林还是使用
WordCount
例子,给大家揭开
Shuffle Read
的神秘面纱。
如图所示,Reduce Task 不断的从各个
shuffle
中间文件拉取数据,并将数据输出到一个 buffer 中。
获取 record 后,Spark 建立一个类似
HashMap
的内存数据结构(
ExternalAppendOnlyMap
),对 buffer 中的 record 进行聚合,类
HashMap
中的 Key 是 record 中的 Key,类
HashMap
中的 Value 是经过相同聚合函数(
func()
)计算后的结果。
在图中,聚合函数是
sum()
函数,那么 Value 中存放的是多个 record 对应 Value 相加后的结果。之后,按照 Key进行排序,如图所示,则建立一个Array 结构,读取类
HashMap
中的 record,并对 record 按 Key (图中是按 key 的首字母)进行排序,排序完成后,将结果输出或者传递给下一步操作。
如果类
HashMap
存放不下,则会先扩容为两倍大小,如果还存放不下,就将
HashMap
中的 record 排序后 spill 到磁盘上。此时,
HashMap
被清空,可以继续对
buffer
中的 record 进行聚合。如果内存再次不够用,那么继续 spill 到磁盘上,此过程可以重复多次。
当聚合完成以后,将此时
HashMap
中的 reocrd 与磁盘上已排序的 record 进行再次聚合,得到最终的 record,输出到分区文件中,供其它数据操作。
好了,至此小林已经把 Shuffle 的整个全貌都讲清楚了。
版权归原作者 000X000 所有, 如有侵权,请联系我们删除。