一、RDD是什么
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。**RDD是spark core的底层核心**。
Dataset:
- RDD 可以不保存具体数据, 只保留创建自己的必备信息, 例如依赖和计算函数;
- RDD 也可以缓存起来, 相当于存储具体数据。
Distributed:
RDD 支持分区, 可以运行在集群中。
Resilient:
- RDD 支持高效的容错;
- RDD 中的数据即可以缓存在内存中, 也可以缓存在磁盘中, 也可以缓存在外部存储中。
1.RDD的特点:
- 弹性 - 容错的弹性:数据丢失可以自动恢复;- 存储的弹性:内存与磁盘的自动切换;- 计算的弹性:计算出错重试机制;- 分片的弹性:可根据需要重新分片。
- 分布式:数据存储在集群不同节点上/计算分布式。
- 数据集: RDD封装了计算逻辑,并不保存数据。
- 数据抽象: RDD是一个抽象类,需要子类具体实现。
- 不可变: RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD里面封装计算逻辑。
- 可分区、并行计算。
二、RDD 为什么会出现
在 RDD 出现之前, 当时 MapReduce 是比较主流的, 而 MapReduce 如何执行迭代计算的任务呢?
多个 MapReduce 任务之间没有基于内存的数据共享方式, 只能通过磁盘来进行共享,这种方式明显比较低效。
RDD 如何解决迭代计算非常低效的问题呢?
在 Spark 中, 其实最终 Job3 从逻辑上的计算过程是:
Job3 = (Job1.map).filter
, 整个过程是共享内存的, 而不需要将中间结果存放在可靠的分布式文件系统中。
这种方式可以在保证容错的前提下, 提供更多的灵活, 更快的执行速度, RDD 在执行迭代型任务时候的表现可以通过下面代码体现:
// 线性回归
val points = sc.textFile(...)
.map(...)
.persist(...)
val w = randomValue
for (i <- 1 to 10000) {
val gradient = points.map(p => p.x * (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y)
.reduce(_ + _)
w -= gradient
}
在这个例子中, 进行了大致 10000 次迭代, 如果在 MapReduce 中实现, 可能需要运行很多 Job, 每个 Job 之间都要通过 HDFS 共享结果, 谁快谁慢一窥便知。
三、结合案例深入了解RDD
需求:
- 给定一个网站的访问记录, 俗称 Access log
- 计算其中出现的独立 IP, 以及其访问的次数
val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]")
val sc = new SparkContext(config)
val result = sc.textFile("dataset/access_log_sample.txt")
.map(item => (item.split(" ")(0), 1))
.filter(item => StringUtils.isNotBlank(item._1))
.reduceByKey((curr, agg) => curr + agg)
.sortBy(item => item._2, false)
.take(10)
result.foreach(item => println(item))
针对这个小案例, 我们问出互相关联但是又方向不同的六个问题:
1.假设要针对整个网站的历史数据进行处理, 量有 1T, 如何处理?
放在集群中, 利用集群多台计算机来并行处理。
2.如何放在集群中运行?
简单来讲, 并行计算就是同时使用多个计算资源解决一个问题, 有如下四个要点
- 要解决的问题必须可以分解为多个可以并发计算的部分;
- 每个部分要可以在不同处理器上被同时执行;
- 需要一个共享内存的机制;
- 需要一个总体上的协作机制来进行调度。
3.如果放在集群中的话, 可能要对整个计算任务进行分解, 如何分解?
概述:
- 对于 HDFS 中的文件, 是分为不同的 Block 的;
- 在进行计算的时候, 就可以按照 Block 来划分, 每一个 Block 对应一个不同的计算单元。
扩展:
RDD
并没有真实的存放数据, 数据是从 HDFS 中读取的, 在计算的过程中读取即可;RDD
至少是需要可以 分片 的, 因为HDFS中的文件就是分片的,RDD
分片的意义在于表示对源数据集每个分片的计算,RDD
可以分片也意味着 可以并行计算。
4.移动数据不如移动计算是一个基础的优化, 如何做到?
每一个计算单元需要记录其存储单元的位置, 尽量调度过去。
5.在集群中运行, 需要很多节点之间配合, 出错的概率也更高, 出错了怎么办?
RDD1 → RDD2 → RDD3 这个过程中, RDD2 出错了, 有两种办法可以解决:
- 缓存 RDD2 的数据, 直接恢复 RDD2, 类似 HDFS 的备份机制;
- 记录 RDD2 的依赖关系, 通过其父级的 RDD 来恢复 RDD2, 这种方式会少很多数据的交互和保存。
如何通过父级 RDD 来恢复?
- 记录 RDD2 的父亲是 RDD1;
- 记录 RDD2 的计算函数, 例如记录
RDD2 = RDD1.map(…)
,map(…)
就是计算函数; - 当 RDD2 计算出错的时候, 可以通过父级 RDD 和计算函数来恢复 RDD2。
6.假如任务特别复杂, 流程特别长, 有很多 RDD 之间有依赖关系, 如何优化?
上面提到了可以使用依赖关系来进行容错, 但是如果依赖关系特别长的时候, 这种方式其实也比较低效, 这个时候就应该使用另外一种方式, 也就是记录数据集的状态。
在 Spark 中有两个手段可以做到:
- 缓存
- Checkpoint
四、RDD的五大属性
/**
* A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
* partitioned collection of elements that can be operated on in parallel. This class contains the
* basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
* [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
* pairs, such as `groupByKey` and `join`;
* [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
* Doubles; and
* [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
* can be saved as SequenceFiles.
* All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)])
* through implicit.
*
* Internally, each RDD is characterized by five main properties:
*
* - A list of partitions
* - A function for computing each split
* - A list of dependencies on other RDDs
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
* an HDFS file)
*
* All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
* to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
* reading data from a new storage system) by overriding these functions. Please refer to the
* <a href="http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf">Spark paper</a>
* for more details on RDD internals.
*/
从上面源码中,可以得到RDD的五大属性:
1.分区列表( a list of partitions)
Spark RDD是被分区的,每一个分区都会被一个计算任务(Task)处理,分区数决定了并行计算的数量,RDD的并行度默认从父RDD传给子RDD。默认情况下,一个HDFS上的数据分片就是一个 partiton,RDD分片数决定了并行计算的力度,可以在创建RDD时指定RDD分片个数(分区)。
如果不指定分区数量,当RDD从集合创建时,则默认分区数量为该程序所分配到的资源的CPU核数(每个Core可以承载2~4个 partition),如果是从HDFS文件创建,默认为文件的 Block数。
/**
* Implemented by subclasses to return the set of partitions in this RDD. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*
* The partitions in this array must satisfy the following property:
* `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
*/
protected def getPartitions: Array[Partition]
2.每一个分区都有一个计算函数( a function for computing each split)
每个分区都会有计算函数, Spark的RDD的计算函数是以分片为基本单位的,每个RDD都会实现 compute函数,对具体的分片进行计算,不需要保存每次计算的结果。RDD中的分片是并行的,所以是分布式并行计算。
有一点非常重要,就是由于RDD有前后依赖关系,遇到宽依赖关系,如reduceByKey等这些操作时划分成 Stage, Stage内部的操作都是通过 Pipeline进行的,在具体处理数据时它会通过 Blockmanager来获取相关的数据,因为具体的 split要从外界读数据,也要把具体的计算结果写入外界,所以用了一个管理器,具体的 split都会映射成 BlockManager的Block,而具体的split会被函数处理,函数处理的具体形式是以任务的形式进行的。
/**
* :: DeveloperApi ::
* Implemented by subclasses to compute a given partition.
*/
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
3.依赖关系( a list of dependencies on other RDDS)
由于RDD每次转换都会生成新的RDD,所以RDD会形成类似流水线一样的前后依赖关系,当然宽依赖就不类似于流水线了,宽依赖后面的RDD具体的数据分片会依赖前面所有的RDD的所有数据分片,这个时候数据分片就不进行内存中的 Pipeline,一般都是跨机器的,因为有前后的依赖关系,所以当有分区的数据丢失时, Spark会通过依赖关系进行重新计算,从而计算出丢失的数据,而不是对RDD所有的分区进行重新计算。
RDD之间的依赖有两种:窄依赖( Narrow Dependency)和宽依赖( Wide Dependency)。
窄依赖(Narrow Dependency)
窄依赖是指每个父RDD的一个Partition最多被子RDD的一个Partition所使用,例如map、filter、union等操作都会产生窄依赖;
对于窄依赖,由于partition依赖关系的确定性,partition的转换处理就可以在同一个线程里完成,这种转换不会引起shuffle操作,速度快!
宽依赖(Wide Dependency)
宽依赖是指一个父RDD的Partition会被多个子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操作都会产生宽依赖;
这种转换会引起shuffle操作,速度慢!
Shuffle是MapReduce框架中的一个特定的phase,介于Map phase和Reduce phase之间,当Map的输出结果要被Reduce使用时,输出结果需要按key哈希,并且分发到每一个Reducer上去,这个过程就是shuffle。由于shuffle涉及到了磁盘的读写和网络的传输,因此shuffle性能的高低直接影响到了整个程序的运行效率。
/**
* Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
* be called once, so it is safe to implement a time-consuming computation in it.
*/
protected def getDependencies: Seq[Dependency[_]] = deps
4.key- value数据类型的RDD分区器( a Partitioner for key- alue RDDS)
一个Partitioner,即RDD的分区函数(可选项),Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
当前Spark中实现了两种类型的分区函数,
- 基于哈希的HashPartitioner,(key.hashcode % 分区数= 分区号)。它是默认值
- 基于范围的RangePartitioner。
什么会有Partitioner?
- 只有对于key-value的RDD(RDD[(String, Int)]),并且产生shuffle,才会有Partitioner;
- 非key-value的RDD(RDD[String])的Parititioner的值是None。
Option类型:可以表示有值或者没有值,它有2个子类:
- Some:表示封装了值
- None:表示没有值
/** Optionally overridden by subclasses to specify how they are partitioned. */
@transient val partitioner: Option[Partitioner] = None
5每个分区都有一个优先位置列表,即首选位置( a list of preferred locations to compute each split on)
存储每个切片优先(preferred location)位置的列表。 比如对于一个 HDFS 文件来说, 这个列表保存的就是每个 Partition 所在文件块的位置.。按照“移动数据不如移动计算”的理念, Spark 在进行任务调度的时候, 会尽可能地将计算任务分配到其所要处理数据块的存储位置。
/**
* Optionally overridden by subclasses to specify placement preferences.
*/
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
版权归原作者 YaoYong_BigData 所有, 如有侵权,请联系我们删除。