0


RDD的分区、依赖关系、机制

一.RDD的分区

RDD的分区原则是分区的个数尽量等于集群中的CPU核心(Core)数目。

各种模式下的默认分区数目如下

(1) Local****模式:默认为本地机器的CPU数目,若设置了local[N].则默认为N.

(2) Standalone****或者Yarn模式:在“集群中所有CPU核数总和"和“2”这两者中取较大值作为默认值。

**(3) Mesos **模式:默认的分区数是8.

Spark****框架为RDD提供了两种分区方式,分别是哈希分区(HashPartitioner)和范围分区(RangePartitioner)。

Spark****也支持自定义分区方式,即通过一个自定义的Partitioner对象来控制RDD的分区,从而进一步减少通信开销。

如果想要实现自定义分区,就需要定义一个类,使得这个自定义的类继承org. apache. spark. Partitioner类,并实现其中的3个方法,具体如下:

**(1) def numPartitions:Int:**用于返回创建的分区个数。

**(2) def getPartition(Key:Any):**用于对输人的Key做处理,并返回该Key的分区ID, 分区ID的范围是0~ numPartitions 1.

**(3) equals (other: Any):**用于Spark判断自定义的Partitioner 对象和其他的 Partitioner对象是否相同,从而判断两个RDD的分区方式是否相同。

二.RDD的依赖关系

RDD****之间具有依赖的关系。

RDD****与它所依赖的RDD的依赖关系有两种类型,分别是

窄依赖(narrow dependency)****和宽依赖(wide dependency)

**窄依赖是指父RDD的每一个分区最多被一个子RDD的分区使用,即OneToOneDependencies。窄依赖的表现一般分为两类:第一类表现为一个父RDD的分区对应于一个子RDD的分区;第二类表现为多个父RDD的分区对应于一个子RDD的分区,也就是说,一个父RDD的一个分区不可能对应一个子RDD的多个分区。 **

当RDD执行map、filter、union、和join操作时,都会产生窄依赖。

**宽依赖是指子RDD的每一个分区都会使用所有父RDD的所有分区或多个分区,即One ToManyDependecies。(当RDD进行groupByKey和join操作时,会产生宽依赖) **

join****算子操作既可以属于窄依赖,也可以属于宽依赖。当join算子操作后,分区数量没有变化则为窄依赖(如join with inputs co partitioned,输人协同划分);当join算子操作后,分区数量发生变化则为宽依赖(如join with inputs not corpartitioned,输人非协同划分)。

三.RDD机制

Spark****为RDD提供了两个重要的机制,分别是特久化机制(即缓存机制)和容错机制。

RDD****是采用惰性求值(即每次调用行动算子操作,都会从头开始计算)

RDD****的持久化操作有两种方法,分别是cache()方法和persist()方法。

①persist()方法的存储级别是通过StorageLevel对象(Scala Java、Python)设置的。

②cache()方法的存储级别是使用默认的存储级别(即StorageLevel. MEMORY ONLY(将反序列化的对象存人内存))。

** **持久化RDD的存储级别

MEMORY_ ONLY

默认存储级别。将RDD作为反序列化的Java对象,缓存到JVM中,若内存放不下(内存已满情况),则某些分区将不会被缓存,并且每次需要时都会重新计算

MEMORY_AND_DISK

将RDD作为反序列化的Java对象,缓存到JVM中,若内存放不下(内存已满情况),则将剩余分区存储到磁盘上,并在需要时从磁盘读取

MEMORY_ONLY_SER

将RDD作为序列化的Java对象(每个分区序列化为一个字节数组),比反序列化的Java对象节省空间,但读取时更占CPU

MEMORY_AND_DISK_SER

与MEMORY ONLY SER类似,但是当内存放不下时则溢出到磁盘,而不是每次需要时重新计算它们

DISK_ONLY

仅将RDD分区全部存储到磁盘上

MEMORY_ONLY_2 MEMORY_AND_DISK_2

与上面的级别相同。若加上后缀2,代表的是将每个持久化的数据都复制一份副本,并将副本保存到其他节点上

**OFF_HEAP(**实验性)

与MEMORY ONLY SER类似,但将数据存储在堆外内存中(这需要启用堆外内存)

1、使用persist()方法对RDD进行持久化

定义一个列表list,通过该列表创建一个RDD,然后通过persist持久化操作和算子操作统计RDD中的元素个数以及打印输出RDD中所有的元素。

代码:

import org.apache.spark.storage.StorageLevel

val testlist = List("hadoop","Python","Spark","Java")

val listRDD = sc.parallelize(testlist)

listRDD.persist(StorageLevel.DISK_ONLY)

println(listRDD.count())

println(listRDD.collect().mkString(","))

2、使用cache()方法对RDD进行持久化

代码:

val list = List("hadoop","Python","Spark","Java")

val listRDD = sc.parallelize(list)

listRDD.cache()

println(listRDD.count())

println(listRDD.collect().mkString(","))

三、容错机制

RDD****提供了两种故障恢复的方式,分别是血统(lineage)方式和设置检查点(checkpoint)方式。

①血统方式,主要是根据RDD之间的依赖关系对丢失数据的RDD进行数据恢复。

②设置检查点方式,本质上是将RDD写人磁盘进行存储。


本文转载自: https://blog.csdn.net/m0_57781407/article/details/126381115
版权归原作者 发量不足 所有, 如有侵权,请联系我们删除。

“RDD的分区、依赖关系、机制”的评论:

还没有评论