0


大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(正在更新!)

章节内容

上节完成的内容如下:

  • Spark Super Word Count 程序 Scala语言编写
  • 将数据写入MySQL、不写入MySQL等编码方式
  • 代码的详细解释与结果

在这里插入图片描述

背景介绍

这涉及到进程通信,是需要序列化的,可以简单的认为:SparkContext代表Drive

在实际的开发过程中会自定一些RDD的操作,此时需要注意的是:

  • 初始化工作是Driver端进行的
  • 实际运行程序是Executor端进行的

测试代码

遇到问题

class MyClass1(x:Int){val num = x
}object SerializableDemo {def main (args: Array[String]):Unit={val conf =new SparkConf().setAppName("SerializableDemo").setMaster("local[*]")val sc =new SparkContext(conf)
    sc.setLogLevel("WARN")val rdd1 = sc.makeRDD(1 to 20)def add1(x:Int)= x +10val add2 = add1 _

    // 过程和方法 都具备序列化的能力
    rdd1.map(add1(_)).foreach(println)
    rdd1.map(add2(_)).foreach(println)// 普通的类不具备序列化能力val object1 =new MyClass1(10)// 报错 提示无法序列化// rdd1.map(x => object1.num + x).foreach(println)}}

解决方案1

caseclass MyClass2(num:Int)val object2 = MyClass2(20)
rdd1.map(x => object2.num + x).foreach(println)

解决方案2

class MyClass3(x:Int)extends Serializable {val num = x
}val object3 =new MyClass3(30)
rdd1.map(x => object3.num + x).foreach(println)

解决方案3

class MyClass1(x:Int){val num = x
}lazyval object4 =new MyClass1(40)
rdd1.map(x => object4.num + x).foreach(println)

完整代码

packageicu.wzkimportorg.apache.spark.{SparkConf, SparkContext}class MyClass1(x:Int){val num = x
}caseclass MyClass2(num:Int)class MyClass3(x:Int)extends Serializable {val num = x
}object SerializableDemo {def main (args: Array[String]):Unit={val conf =new SparkConf().setAppName("SerializableDemo").setMaster("local[*]")val sc =new SparkContext(conf)
    sc.setLogLevel("WARN")val rdd1 = sc.makeRDD(1 to 20)def add1(x:Int)= x +10val add2 = add1 _

    // 过程和方法 都具备序列化的能力
    rdd1.map(add1(_)).foreach(println)
    rdd1.map(add2(_)).foreach(println)// 普通的类不具备序列化能力val object1 =new MyClass1(10)// 报错 提示无法序列化// rdd1.map(x => object1.num + x).foreach(println)// 解决方案1 使用 case classval object2 = MyClass2(20)
    rdd1.map(x => object2.num + x).foreach(println)// 解决方案2 实现 Serializableval object3 =new MyClass3(30)
    rdd1.map(x => object3.num + x).foreach(println)// 解决方法3 延迟创建lazyval object4 =new MyClass1(40)
    rdd1.map(x => object4.num + x).foreach(println)

    sc.stop()}}

注意事项

  • 如果在方法、函数的定义中引入了不可序列化的对象,也会导致任务不能够序列化
  • 延迟创建的解决方案比较简单,且实用性广

RDD依赖关系

基本概念

RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建RDD的一系列Lineage(血统)记录下来,以便恢复丢失的分区。
RDD的Lineage会记录RDD的元数据信息和转换行为,当该RDD的部分分区数据丢失时,可根据这些信息来重新运算和恢复丢失的数据分区。

在这里插入图片描述
RDD和它的依赖的父RDDs的关系有两种不同的类型:

  • 窄依赖(narrow dependency):1:1或n:1
  • 宽依赖(wide dependency):n:m 意味着有 shuflle

在这里插入图片描述
RDD任务切分中间分为:Driver program、Job、Stage(TaskSet) 和 Task

  • Driver program:初始化一个SparkContext即生成一个Spark应用
  • Job:一个Action算子就会生成一个Job
  • Stage:根据RDD之间的依赖关系不同将Job划分成不同的Stage,遇到一个宽依赖则划分一个Stage
  • Task:Stage是一个TaskSet,将Stage划分的结果发送到不同的Executor执行即为一个Task
  • Task是Spark中任务调度的最小单位,每个Stage包含许多Task,这些Task执行的计算逻辑是相同的,计算的数据是不同的
  • DriverProgram -> Job -> Stage -> Task 每一层都是 1 对 N 的关系

再回WordCount

代码部分

你可以用代码执行,也可以在 SparkShell 中执行。

packageicu.wzkimportorg.apache.spark.{SparkConf, SparkContext}object ReWordCount {def main(args: Array[String]):Unit={val conf =new SparkConf().setAppName("SparkFindFriends").setMaster("local[*]")val sc =new SparkContext(conf)
    sc.setLogLevel("WARN")val rdd1 = sc.textFile("goodtbl.java")val rdd2 = rdd1.flatMap(_.split("\\+"))val rdd3 = rdd2.map((_,1))val rdd4 = rdd3.reduceByKey(_ + _)val rdd5 = rdd4.sortByKey()
    rdd5.count()// 查看RDD的血缘关系
    rdd1.toDebugString
    rdd5.toDebugString

    // 查看依赖
    rdd1.dependencies
    rdd1.dependencies(0).rdd

    rdd5.dependencies
    rdd5.dependencies(0).rdd
    
    sc.stop()}}

提出问题

上面的WordCount中,一共有几个Job,几个Stage,几个Task?
在这里插入图片描述
答案:1个Job,3个Stage,6个Task

RDD持久化/缓存

基本概念

涉及到的算子:persis、cache、unpersist 都是 Transformation

  • 缓存是将计算结果写入不同的介质,用户定义可定义存储级别(存储级别定义了缓存存储的介质,目前支持内存、堆外内存、磁盘)
  • 通过缓存,Spark避免了RDD上的重复计算,能够极大提升计算的速度。
  • RDD持久化或缓存,是Spark最重要的特征之一,Spark构建迭代算法和快速交互式查询的关键所在
  • Spark非常快的原因之一就是在内存、缓存中持久化,当持久化一个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此数据集进行其他动作(Action),这使得后续更加迅速
  • 使用 persist() 方法将一个RDD标记为持久化,之所以说“标记持久化”是因为使用persist()的地方,并不会马上计算生成RDD并把它持久化,而是要等遇到第一个行动操作出发真正计算后,才会把计算结果进行持久化。在这里插入图片描述
  • 一般情况下,如果多个动作需要用到某个RDD,而它的计算代价又比较高,那么就应该把这个RDD缓存起来
  • 缓存有可能丢失,或者存储在内存由于空间不足而被删除,RDD的缓存的容错机制保证了即使缓存丢失也可以完成计算。通过基于RDD的一系列的转换,丢失的数据会被重算。
  • RDD各个Partition是相对独立的,因为只需要计算丢失的部分即可,而不是需要重算全部的Partition

持久化级别

使用 cache() 方法时,会调用 persist(MEMORY_ONLY),即

cache()== persist(StorageLevel.Memory.ONLY)

对于其他的存储级别,如下图:

  • MEMORY_ONLY
  • MEMORY_AND_DISK
  • MEMORY_ONLY_SER
  • MEMORY_AND_DISK_SER
  • DISK_ONLY
  • DISK_ONLY_2

在这里插入图片描述

标签: 大数据 spark 缓存

本文转载自: https://blog.csdn.net/w776341482/article/details/141310424
版权归原作者 武子康 所有, 如有侵权,请联系我们删除。

“大数据-89 Spark 集群 RDD 编程-高阶 编写代码、RDD依赖关系、RDD持久化/缓存”的评论:

还没有评论