1、Spark 背景
在
Spark
出现以前,
Hadoop
框架在大数据领域可谓是德高望重,拥有分布式存储之王
HDFS
和 通用的调度系统
YARN
以及分布式计算框架
MapReduce
。MR 分布式计算的通用逻辑图如下所示:
由上图可知,
MapReduce
提供了两类计算抽象,分别是 Map 和 Reduce。Map 通常封装的是业务数据转换的逻辑,Map 计算结束后,把数据存到
HDFS
,通过数据分发,也就是 Shuffle ,Reduce 端进行数据聚合操作,最后把计算结果存放到
HDFS
。在这个过程中,
Hadoop
采用
HDFS
为计算抽象之间的数据接口来规避廉价磁盘引入的系统稳定性问题,采用
YARN
来完成分布式资源调度从而充分利用廉价的硬件资源。
上面的流程看起来很丝滑, Hadoop 的三个组件搭配,几乎可以实现大部分批处理任务。但是,往往一个任务中的数据量会很大,需要大量的 Map 和 Reduce,也就是说,上述的逻辑计算流程会被重复执行多次,就会使得数据不断的落盘,不断的 shuffle ,导致性能变差。
为了解决这个问题,2009 年 Spark 应运而生,加州伯克利分校的 AMP 实验室等人提出了基于内存的分布式计算引擎 -- Spark Core。
当然,Spark 远不止 Spark Core 这么简单,Spark Core 只是它的基石,它的模块大致可以分为以下几个部分:
2、什么是 RDD
RDD(Resilient Distributed Datasets),全称是“弹性分布式数据集”。从名字来看,完全不知道这是啥,官方给的解释是:RDD 是一种抽象,是 Spark 对于分布式数据集的抽象,它包含了所有内存中以及磁盘中的分布式数据。小林给你打个不恰当的比方,在理解 RDD 时可以将其类比成数组。数组是某种同类数据的集合,只不过数组的数据分布在单进程内,而 RDD 可以跨进程、跨节点分布,即
Spark
将分布式存储的数据集作了一层数据模型的抽象,称为 RDD。
为什么要提出 RDD 这样一个概念?
我们反观
MapReduce
的计算过程,其 Map 逻辑和 Reduce 计算逻辑之间,需要依靠 HDFS 作为数据接口。Map 将计算的中间结果以文件的形式存到 HDFS,Reduce 从 HDFS 读取数据后进行聚合,把最终结果再次存到 HDFS 上。
可以看出,Map 和 Reduce 因为其中间存在 HDFS 这样的数据接口,使得 Map 与 Reduce 之间计算不能流畅的衔接,HDFS 数据又是使用副本机制实现高可用,多副本便会带来更多的磁盘 IO 和网络 IO。所以,
Spark
提出了 RDD 的数据模型,将所有中间环节所产生的数据文件以某种统一的方式归纳、抽象出来,使得 Map 与 Redue 不需要 HDFS,从而上下游计算便可以更好的衔接在一起,减少了数据的落盘、发包与收包的动作。
3、RDD 特性
要想深度理解 RDD ,那必须绕不开 RDD 的五大特性。
- dependencies:表明了上下游 RDD 之间的依赖。任何一个 RDD 都不是凭空产生的,它是由上一个 RDD,通过执行某种 compute 的计算逻辑得到的。我们习惯把上一个 RDD 简称为当前 RDD 的父 RDD,术语叫做依赖。
- compute:计算函数。为 RDD 之间转换提供了某种计算逻辑。
- partitions:数据分片。在分布式计算中,一份非常大的数据集通常会按照某种规则分成很多份,散落在集群中的不同节点上,我们把某个节点上的数据称为数据分片。
- partitioner:划分数据分片的规则。
- preferredLocations:该属性表明的是数据分片的物理位置偏好。在 Spark 任务计算时,会根据数据分片的存放位置进行调度,优先调度本地数据。一般位置偏好有这么几种:- 本地内存:数据分片存在当前计算节点的内存,可直接访问- 本地磁盘:数据分片在当前计算节点的磁盘中有副本,可就地访问- 本机架磁盘:跨节点- 其他机架磁盘:跨机架- 无所谓:跨机房
给大家做个总结,各种类型的 RDD 有五大特性:
特性名特性类型特性含义dependencies变量刻画了 RDD 的依赖compute方法刻画了生成该 RDD 的计算规则partitions变量刻画了集群中该 RDD 所有的数据分片partitioner方法切分数据集的规则或方法preferredLocations变量刻画了数据分片物理位置偏好
4、RDD 弹性的体现
我们继续细品 RDD 的全称,弹性分布式数据集。那么,RDD 弹性到底体现在哪里?哈哈,不要被弹性这个词给搞懵了,弹性无非说的就是 RDD 的容错,考虑到checkpoint 机制还么有讲解,目前,小林从 RDD 的特性的角度来看。
- 每个 RDD 都有 dependencies 属性,该 dependencies 记录了父 RDD 的相关元数据信息。一个计算任务中,每个 RDD 的依赖关系便构成了一个血统,父 RDD 通过算子得到子 RDD。当 RDD 存在异常时,Spark 任务便可以根据血统来重新生成异常 RDD。
- 另外从 RDD 的数据分片属性来看,数据分片的数量可以调整,体现了 RDD 的弹性。
5、WordCount举例
object WordCount {
def main(args:Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local[*]").setAppName("WordCount")
val sc = new SparkContext(conf)
//读取文件内容
val lineRDD:RDD[String] = sc.textFile("./data/words.txt",2)
//以行为单位进行分词
val wordsRDD: RDD[String] = lineRDD.flatMap(line=>{line.split(" ")})
//把 RDD 元素转换为(key,value) 的形式
val kvRDD: RDD[(String, Int)] = wordsRDD.map(word=>{new Tuple2(word,1)})
//把相同的单词作为一组,并计算
val resultRDD: RDD[(String, Int)] = kvRDD.reduceByKey((v1, v2)=>{v1+v2})
//获取位置排在前面 3 位的词汇,注意这里没有排序不是词频前三
resultRDD.count()
}
}
看到上述这段程序,有个需要注意的点:
local[*]
,local 模式就是 Spark 单机的运行模式,其它部署模式后面我们都会讲到。local 一共有以下三种方式设置:
- local:所有计算都运行在一个线程当中,一般 IDEA 本地测试使用这种方式
- local[k]:指定使用几个线程来运行计算,比如local[4]就是运行4个worker线程。通常我们的cpu有几个core,就指定几个线程,最大化利用 cpu 的计算能力
- local[*]:这种模式直接帮你按照cpu最多cores来设置线程数。
5.1、WordCount Job 逻辑执行图
阅读完这段程序后,我们头脑中 Job 的逻辑执行流程应该是这样的:
所谓的 Job 逻辑执行图,就是 RDD 上下游的数据依赖图。
- 首先,通过调用
textFile
生成lineRDD
,然后调用flatMap
算子把lineRDD
转换为wordsRDD
; - 然后,为了后续的聚合运算,调用
map
算子转换成 (key,value) 型kvRDD
; - 之后调用
reduceByKey
做分组聚合,将(key,value)中的 value 进行单词计数。 - 最后代码中还有一行
resultRDD.count()
。当程序执行到count()
时,才会触发 Job 开始。触发后,会先在每个 parktition 上执行计数,然后将每个分区的执行结果发送到Driver
,最后在Driver
端进行 sum 求和。
总的来说,Job 的逻辑执行图描述的是 Job 的数据流,整个 Job 会经过哪些转换(transformation ),中间生成哪些 RDD,以及 RDD 之间的依赖关系。RDD 代表的是分布式数据形态,RDD 到另一个 RDD 之间的转换,本质上是数据形态上的转换。
5.2、WordCount Job 物理执行图
Job 的逻辑执行图还只是刻画了数据上的依赖关系,实际的 task 执行图会更加复杂。学过 Hadoop 的都知道,Hadoop 的整个数据流是固定的,一个进行 Map 处理,一个进行 Reduce 聚合处理,我们只需要把我们自己的计算逻辑分别写在 map( ) 和 reduce( ) 函数即可。
而 Spark 的数据依赖更加灵活,很难将数据依赖流和物理 task 执行统一在一起。因此,Spark 将 Job 的数据流和具体的 task 执行流分开,并设计了一种算法,将逻辑执行图转换成具体的物理执行图,具体算法下一篇再作讨论。
针对 WordCount Job,其物理执行的 DAG 图如下:
从上述的物理执行图可以看出,这个 WordCount 只产生了一个 Job,由 action(
resultRDD.count()
)触发产生。
- 整个 Job 分成了两个 Stage,分别为紫色和蓝色两个框框(这里不明白 Stage 概念没关系,下一篇会介绍这个概念)。
- 其中 Stage1 (也就是紫色) 包含了 2 个 task ,每个 task 负责从 words.txt 读取部分数据,然后并将各个 RDD 通过 Transformation 算子进行转换,最后将各个分区的转换结果写入本地磁盘。
- Stage0(也就是蓝色)同样包含了 2 个 task,我们可将其称为 ResultTask,每个 task 首先 shuffle 自己要处理的数据,边 fetch 数据边进行聚合操作,最后进行
count()
计算得到 result,这里的 result 是每个分区内包含多少条 records。 - task 执行完后,Driver 收集每个 task 的执行结果,进行
sum()
。 - WordCount Job 结束。
可以得知,Job 的物理执行图比较复杂。这里值得提醒的一点是,每个 application 中并不是只包含一个 Job ,可能会包含多个 Job,每个 Job 包含多个 Stage,每个 Stage 包含多个 Task。具体 Job 的个数可以通过 Action 算子界定,即有多少个 Action 算子,就会产生多少个 Job。
注意:
- 在 RDD 编程模型中分为两类算子:Transformation 和 Action 算子,我们通过调用 Transformation 算子去定义并描述数据形态的转换,然后调用 Action 类算子触发执行,将计算结果收集或者存放到磁盘。
- Spark 这种编程模型,势必会把 Job 切分成两个环节:- 基于不同数据形态之间的转换,构建 DAG (有向无环图)- 通过 Action 类算子,以回溯的方式去触发执行这个 DAG 。
也就是说,在调用 Action 类算子之前,整个任务并不会立即执行,只有当调用到 Action 类型算子,之前的转换算子才会执行。这种计算模式我们称之为 “延迟计算”,又名惰性计算(lazy evaluation)。
版权归原作者 000X000 所有, 如有侵权,请联系我们删除。