0


Spark-序列化、依赖关系、持久化

序列化

闭包检查

  1. 从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor 端执行。那么在 scala 的函数式编程中,就会导致算子内经常会用到算子外的数据,这样就 形成了闭包的效果,如果使用的算子外的数据无法序列化,就意味着无法传值给 Executor 端执行,就会发生错误,所以需要在执行任务计算前,检测闭包内的对象是否可以进行序列 化,这个操作我们称之为闭包检测。Scala2.12 版本后闭包编译方式发生了改变

序列化方法和属性

  1. 从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor 端执行。
  1. object spark_02 {
  2. def main(args: Array[String]): Unit = {
  3. //准备环境
  4. //"*"代表线程的核数 应用程序名称"RDD"
  5. val sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDD")
  6. val sc = new SparkContext(sparkConf)
  7. val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "atguigu"))
  8. //创建查询对象
  9. val search = new Search("h")
  10. //函数传递,打印:ERROR Task not serializable
  11. search.getMatch1(rdd).collect().foreach(println)
  12. println("========================================")
  13. //属性传递,打印:ERROR Task not serializable
  14. search.getMatch2(rdd).collect().foreach(println)
  15. //关闭环境
  16. sc.stop()
  17. }
  18. }
  19. //查询对象
  20. //类的构造参数是类的属性,构造参数需要进行闭包检查(对类进行闭包检查)
  21. class Search(query:String) extends Serializable {
  22. def isMatch(s: String): Boolean = {
  23. s.contains(query)
  24. }
  25. // 函数序列化案例
  26. def getMatch1 (rdd: RDD[String]): RDD[String] = {
  27. rdd.filter(isMatch)
  28. }
  29. // 属性序列化案例
  30. def getMatch2(rdd: RDD[String]): RDD[String] = {
  31. rdd.filter(x => x.contains(query))
  32. }
  33. }

依赖关系

  1. 相邻的两个RDD关系称之为依赖关系

RDD 血缘关系

  1. 多个连续的RDD的依赖关系称之为血缘关系
  2. RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage (血统)记录下来,以便恢复丢失的分区。RDD Lineage 会记录 RDD 的元数据信息和转 换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的 数据分区。
  1. val fileRDD: RDD[String] = sc.textFile("input/1.txt")
  2. println(fileRDD.toDebugString) //打印输出血缘关系
  3. println("----------------------")
  4. val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))
  5. println(wordRDD.toDebugString)
  6. println("----------------------")
  7. val mapRDD: RDD[(String, Int)] = wordRDD.map((_,1))
  8. println(mapRDD.toDebugString)
  9. println("----------------------")
  10. val resultRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_+_)
  11. println(resultRDD.toDebugString)
  12. resultRDD.collect()

RDD 窄依赖

  1. 窄依赖表示每一个父(上游)RDD Partition 最多被子(下游)RDD 的一个 Partition 使用, 窄依赖我们形象的比喻为独生子女。

RDD 宽依赖

  1. 宽依赖表示同一个父(上游)RDD Partition 被多个子(下游)RDD Partition 依赖,会 引起 Shuffle,总结:宽依赖我们形象的比喻为多生。

RDD 任务划分

  1. RDD 任务切分中间分为:ApplicationJobStage Task
  • Application:初始化一个 SparkContext 即生成一个 Application;
  • Job:一个 Action 算子就会生成一个 Job;
  • Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1;
  • Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数。

注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。

RDD 持久化

RDD Cache 缓存

  1. RDD 通过 Cache 或者 Persist 方法将前面的计算结果缓存,默认情况下会把数据以缓存 JVM 的堆内存中。但是并不是这两个方法被调用时立即缓存,而是触发后面的 action 子时,该 RDD 将会被缓存在计算节点的内存中,并供后面重用。
  1. // cache 操作会增加血缘关系,不改变原有的血缘关系
  2. println(wordToOneRdd.toDebugString)
  3. // 数据缓存。
  4. wordToOneRdd.cache()
  5. // 可以更改存储级别
  6. //mapRdd.persist(StorageLevel.MEMORY_AND_DISK_2)
  1. 缓存有可能丢失,或者存储于内存的数据由于内存不足而被删除,RDD 的缓存容错机 制保证了即使缓存丢失也能保证计算的正确执行。通过基于 RDD 的一系列转换,丢失的数 据会被重算,由于 RDD 的各个 Partition 是相对独立的,因此只需要计算丢失的部分即可, 并不需要重算全部 Partition Spark 会自动对一些 Shuffle 操作的中间数据做持久化操作(比如:reduceByKey)。这样 做的目的是为了当一个节点 Shuffle 失败了避免重新计算整个输入。但是,在实际使用的时 候,如果想重用数据,仍然建议调用 persist cache

RDD CheckPoint 检查点

  1. 所谓的检查点其实就是通过将 RDD 中间结果写入磁盘 由于血缘依赖过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果检查点 之后有节点出现问题,可以从检查点开始重做血缘,减少了开销。 RDD 进行 checkpoint 操作并不会马上被执行,必须执行 Action 操作才能触发。
  1. // 设置检查点路径
  2. sc.setCheckpointDir("./checkpoint1")
  3. // 创建一个 RDD,读取指定位置文件:hello atguigu atguigu
  4. val lineRdd: RDD[String] = sc.textFile("input/1.txt")
  5. // 业务逻辑
  6. val wordRdd: RDD[String] = lineRdd.flatMap(line => line.split(" "))
  7. val wordToOneRdd: RDD[(String, Long)] = wordRdd.map {
  8. word => {
  9. (word, System.currentTimeMillis())
  10. }
  11. }
  12. // 增加缓存,避免再重新跑一个 job 做 checkpoint
  13. wordToOneRdd.cache()
  14. // 数据检查点:针对 wordToOneRdd 做检查点计算
  15. wordToOneRdd.checkpoint()
  16. // 触发执行逻辑
  17. wordToOneRdd.collect().foreach(println)

缓存和检查点区别

  1. Cache 缓存只是将数据保存起来,不切断血缘依赖。Checkpoint 检查点切断血缘依赖。
  2. Cache 缓存的数据通常存储在磁盘、内存等地方,可靠性低。Checkpoint 的数据通常存 储在 HDFS 等容错、高可用的文件系统,可靠性高。
  3. 建议对 checkpoint()的 RDD 使用 Cache 缓存,这样 checkpoint 的 job 只需从 Cache 缓存 中读取数据即可,否则需要再从头计算一次 RDD。
标签: spark scala 大数据

本文转载自: https://blog.csdn.net/dafsq/article/details/129466546
版权归原作者 open_test01 所有, 如有侵权,请联系我们删除。

“Spark-序列化、依赖关系、持久化”的评论:

还没有评论