0


详解RDD基本概念、RDD五大属性

一、RDD是什么

    RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。**RDD是spark core的底层核心**。

Dataset:

  • RDD 可以不保存具体数据, 只保留创建自己的必备信息, 例如依赖和计算函数;
  • RDD 也可以缓存起来, 相当于存储具体数据。

Distributed

    RDD 支持分区, 可以运行在集群中。

Resilient

  • RDD 支持高效的容错;
  • RDD 中的数据即可以缓存在内存中, 也可以缓存在磁盘中, 也可以缓存在外部存储中。

1.RDD的特点:

  • 弹性 - 容错的弹性:数据丢失可以自动恢复;- 存储的弹性:内存与磁盘的自动切换;- 计算的弹性:计算出错重试机制;- 分片的弹性:可根据需要重新分片。
  • 分布式:数据存储在集群不同节点上/计算分布式。
  • 数据集: RDD封装了计算逻辑,并不保存数据。
  • 数据抽象: RDD是一个抽象类,需要子类具体实现。
  • 不可变: RDD封装了计算逻辑,是不可以改变的,想要改变,只能产生新的RDD,在新的RDD里面封装计算逻辑。
  • 可分区、并行计算。

二、RDD 为什么会出现

在 RDD 出现之前, 当时 MapReduce 是比较主流的, 而 MapReduce 如何执行迭代计算的任务呢?

多个 MapReduce 任务之间没有基于内存的数据共享方式, 只能通过磁盘来进行共享,这种方式明显比较低效。

RDD 如何解决迭代计算非常低效的问题呢?

在 Spark 中, 其实最终 Job3 从逻辑上的计算过程是:

Job3 = (Job1.map).filter

, 整个过程是共享内存的, 而不需要将中间结果存放在可靠的分布式文件系统中。

这种方式可以在保证容错的前提下, 提供更多的灵活, 更快的执行速度, RDD 在执行迭代型任务时候的表现可以通过下面代码体现:

// 线性回归
val points = sc.textFile(...)
    .map(...)
    .persist(...)
val w = randomValue
for (i <- 1 to 10000) {
    val gradient = points.map(p => p.x * (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y)
        .reduce(_ + _)
    w -= gradient
}

在这个例子中, 进行了大致 10000 次迭代, 如果在 MapReduce 中实现, 可能需要运行很多 Job, 每个 Job 之间都要通过 HDFS 共享结果, 谁快谁慢一窥便知。

三、结合案例深入了解RDD

需求:

  • 给定一个网站的访问记录, 俗称 Access log
  • 计算其中出现的独立 IP, 以及其访问的次数
val config = new SparkConf().setAppName("ip_ana").setMaster("local[6]")
val sc = new SparkContext(config)

val result = sc.textFile("dataset/access_log_sample.txt")
  .map(item => (item.split(" ")(0), 1))
  .filter(item => StringUtils.isNotBlank(item._1))
  .reduceByKey((curr, agg) => curr + agg)
  .sortBy(item => item._2, false)
  .take(10)

result.foreach(item => println(item))

针对这个小案例, 我们问出互相关联但是又方向不同的六个问题:

1.假设要针对整个网站的历史数据进行处理, 量有 1T, 如何处理?

放在集群中, 利用集群多台计算机来并行处理。

2.如何放在集群中运行?

简单来讲, 并行计算就是同时使用多个计算资源解决一个问题, 有如下四个要点

  • 要解决的问题必须可以分解为多个可以并发计算的部分;
  • 每个部分要可以在不同处理器上被同时执行;
  • 需要一个共享内存的机制;
  • 需要一个总体上的协作机制来进行调度。

3.如果放在集群中的话, 可能要对整个计算任务进行分解, 如何分解?

概述:

  • 对于 HDFS 中的文件, 是分为不同的 Block 的;
  • 在进行计算的时候, 就可以按照 Block 来划分, 每一个 Block 对应一个不同的计算单元。

扩展:

  • RDD 并没有真实的存放数据, 数据是从 HDFS 中读取的, 在计算的过程中读取即可;
  • RDD 至少是需要可以 分片 的, 因为HDFS中的文件就是分片的, RDD 分片的意义在于表示对源数据集每个分片的计算, RDD 可以分片也意味着 可以并行计算。

4.移动数据不如移动计算是一个基础的优化, 如何做到?

每一个计算单元需要记录其存储单元的位置, 尽量调度过去。

5.在集群中运行, 需要很多节点之间配合, 出错的概率也更高, 出错了怎么办?

RDD1 → RDD2 → RDD3 这个过程中, RDD2 出错了, 有两种办法可以解决:

  1. 缓存 RDD2 的数据, 直接恢复 RDD2, 类似 HDFS 的备份机制;
  2. 记录 RDD2 的依赖关系, 通过其父级的 RDD 来恢复 RDD2, 这种方式会少很多数据的交互和保存。

如何通过父级 RDD 来恢复?

  1. 记录 RDD2 的父亲是 RDD1;
  2. 记录 RDD2 的计算函数, 例如记录 RDD2 = RDD1.map(…​), map(…​) 就是计算函数;
  3. 当 RDD2 计算出错的时候, 可以通过父级 RDD 和计算函数来恢复 RDD2。

6.假如任务特别复杂, 流程特别长, 有很多 RDD 之间有依赖关系, 如何优化?

上面提到了可以使用依赖关系来进行容错, 但是如果依赖关系特别长的时候, 这种方式其实也比较低效, 这个时候就应该使用另外一种方式, 也就是记录数据集的状态。

在 Spark 中有两个手段可以做到:

  1. 缓存
  2. Checkpoint

四、RDD的五大属性

/**
 * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
 * partitioned collection of elements that can be operated on in parallel. This class contains the
 * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
 * [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
 * pairs, such as `groupByKey` and `join`;
 * [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of
 * Doubles; and
 * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that
 * can be saved as SequenceFiles.
 * All operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)])
 * through implicit.
 *
 * Internally, each RDD is characterized by five main properties:
 *
 *  - A list of partitions
 *  - A function for computing each split
 *  - A list of dependencies on other RDDs
 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
 *    an HDFS file)
 *
 * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
 * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
 * reading data from a new storage system) by overriding these functions. Please refer to the
 * <a href="http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf">Spark paper</a>
 * for more details on RDD internals.
 */

从上面源码中,可以得到RDD的五大属性:

1.分区列表( a list of partitions)

    Spark RDD是被分区的,每一个分区都会被一个计算任务(Task)处理,分区数决定了并行计算的数量,RDD的并行度默认从父RDD传给子RDD。默认情况下,一个HDFS上的数据分片就是一个 partiton,RDD分片数决定了并行计算的力度,可以在创建RDD时指定RDD分片个数(分区)。

    如果不指定分区数量,当RDD从集合创建时,则默认分区数量为该程序所分配到的资源的CPU核数(每个Core可以承载2~4个 partition),如果是从HDFS文件创建,默认为文件的 Block数。
/**
  * Implemented by subclasses to return the set of partitions in this RDD. This method will only
  * be called once, so it is safe to implement a time-consuming computation in it.
  *
  * The partitions in this array must satisfy the following property:
  *   `rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }`
  */
 protected def getPartitions: Array[Partition]

2.每一个分区都有一个计算函数( a function for computing each split)

    每个分区都会有计算函数, Spark的RDD的计算函数是以分片为基本单位的,每个RDD都会实现 compute函数,对具体的分片进行计算,不需要保存每次计算的结果。RDD中的分片是并行的,所以是分布式并行计算。

    有一点非常重要,就是由于RDD有前后依赖关系,遇到宽依赖关系,如reduceByKey等这些操作时划分成 Stage, Stage内部的操作都是通过 Pipeline进行的,在具体处理数据时它会通过 Blockmanager来获取相关的数据,因为具体的 split要从外界读数据,也要把具体的计算结果写入外界,所以用了一个管理器,具体的 split都会映射成 BlockManager的Block,而具体的split会被函数处理,函数处理的具体形式是以任务的形式进行的。
 /**
  * :: DeveloperApi ::
  * Implemented by subclasses to compute a given partition.
  */
 @DeveloperApi
 def compute(split: Partition, context: TaskContext): Iterator[T]

3.依赖关系( a list of dependencies on other RDDS)

由于RDD每次转换都会生成新的RDD,所以RDD会形成类似流水线一样的前后依赖关系,当然宽依赖就不类似于流水线了,宽依赖后面的RDD具体的数据分片会依赖前面所有的RDD的所有数据分片,这个时候数据分片就不进行内存中的 Pipeline,一般都是跨机器的,因为有前后的依赖关系,所以当有分区的数据丢失时, Spark会通过依赖关系进行重新计算,从而计算出丢失的数据,而不是对RDD所有的分区进行重新计算。

RDD之间的依赖有两种:窄依赖( Narrow Dependency)和宽依赖( Wide Dependency)。

窄依赖(Narrow Dependency)

窄依赖是指每个父RDD的一个Partition最多被子RDD的一个Partition所使用,例如map、filter、union等操作都会产生窄依赖;

对于窄依赖,由于partition依赖关系的确定性,partition的转换处理就可以在同一个线程里完成,这种转换不会引起shuffle操作,速度快!

宽依赖(Wide Dependency)

宽依赖是指一个父RDD的Partition会被多个子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操作都会产生宽依赖;

这种转换会引起shuffle操作,速度慢!

Shuffle是MapReduce框架中的一个特定的phase,介于Map phase和Reduce phase之间,当Map的输出结果要被Reduce使用时,输出结果需要按key哈希,并且分发到每一个Reducer上去,这个过程就是shuffle。由于shuffle涉及到了磁盘的读写和网络的传输,因此shuffle性能的高低直接影响到了整个程序的运行效率。

 /**
  * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
  * be called once, so it is safe to implement a time-consuming computation in it.
  */
 protected def getDependencies: Seq[Dependency[_]] = deps

4.key- value数据类型的RDD分区器( a Partitioner for key- alue RDDS)

一个Partitioner,即RDD的分区函数(可选项),Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

当前Spark中实现了两种类型的分区函数,

  1. 基于哈希的HashPartitioner,(key.hashcode % 分区数= 分区号)。它是默认值
  2. 基于范围的RangePartitioner。

什么会有Partitioner?

  1. 只有对于key-value的RDD(RDD[(String, Int)]),并且产生shuffle,才会有Partitioner;
  2. 非key-value的RDD(RDD[String])的Parititioner的值是None。

Option类型:可以表示有值或者没有值,它有2个子类:

  1. Some:表示封装了值
  2. None:表示没有值
/** Optionally overridden by subclasses to specify how they are partitioned. */
 @transient val partitioner: Option[Partitioner] = None

5每个分区都有一个优先位置列表,即首选位置( a list of preferred locations to compute each split on)

存储每个切片优先(preferred location)位置的列表。 比如对于一个 HDFS 文件来说, 这个列表保存的就是每个 Partition 所在文件块的位置.。按照“移动数据不如移动计算”的理念, Spark 在进行任务调度的时候, 会尽可能地将计算任务分配到其所要处理数据块的存储位置。

/**
  * Optionally overridden by subclasses to specify placement preferences.
  */
 protected def getPreferredLocations(split: Partition): Seq[String] = Nil
标签: 大数据 spark

本文转载自: https://blog.csdn.net/u010147215/article/details/125575053
版权归原作者 YaoYong_BigData 所有, 如有侵权,请联系我们删除。

“详解RDD基本概念、RDD五大属性”的评论:

还没有评论