0


大数据 - Spark系列《八》- 闭包引用

Spark系列文章:

大数据 - Spark系列《一》- 从Hadoop到Spark:大数据计算引擎的演进-CSDN博客

大数据 - Spark系列《二》- 关于Spark在Idea中的一些常用配置-CSDN博客

大数据 - Spark系列《三》- 加载各种数据源创建RDD-CSDN博客

大数据 - Spark系列《四》- Spark分布式运行原理-CSDN博客

大数据 - Spark系列《五》- Spark常用算子-CSDN博客

大数据 - Spark系列《六》- RDD详解-CSDN博客

大数据 - Spark系列《七》- 分区器详解-CSDN博客


8.1.🐶闭包引用的原理

1. 闭包引用的概念

  • 算子函数中引用了一个算子外部的变量 , 这个变量就是闭包变量 ;
  • 这些引用会随着任务的序列化而被发送到各个 Executor 上,并在 Executor 上被反序列化。
  • 闭包变量定义在Driver端 ,使用在任务实例端 , 变量需要序列化

2. 闭包引用的副本

  • 在 Executor 上执行的任务中的闭包对象是完全独立的。
  • 修改任务中的闭包对象不会影响到 Driver 端的原对象,因为它们是独立的副本。

3. 🧀实例代码1

对于在 RDD 算子函数中引用的外部对象,其修改仅影响到任务执行所在的 Executor 上的局部副本,而不会影响到 Driver 端的原对象。

package com.doit.day0217

import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{DriverManager, ResultSet}
import org.apache.log4j.{Level, Logger}

/**
 * @日期: 2024/2/19
 * @Author: Wang NaPao
 * @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343
 * @Tips: 和我一起学习吧
 * @Description:
 */

object Test06 {
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象,并设置应用程序名称和运行模式
    val conf = new SparkConf()
      .setAppName("Starting...") // 设置应用程序名称
      .setMaster("local[*]") // 设置运行模式为本地模式

    // 创建SparkContext对象,并传入SparkConf对象
    val sc = new SparkContext(conf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 1)
    var cnt=0  // cnt,在Driver端的jvm中
    rdd.foreach(x=>{
      cnt+=1  // 此处的cnt是在worker端反序列化出的Task中,与driver端已无联系
      println(x)
    })
    println(cnt) // 此处打印的是driver端的cnt,依然是0

    // 关闭SparkContext对象
    sc.stop()
  }
}

4. 🧀实例代码2

Spark 的转换操作是惰性求值的,只有在调用结果算子(如

foreach

count

collect

等)时才会触发实际的作业执行。

因此

cnt

的取值为调用结果算子之前的值

package com.doit.day0217
import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{DriverManager, ResultSet}
import org.apache.log4j.{Level, Logger}
/**
 * @日期: 2024/2/19
 * @Author: Wang NaPao
 * @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343
 * @Tips: 和我一起学习吧
 * @Description:
 */

object Test07 {
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象,并设置应用程序名称和运行模式
    val conf = new SparkConf()
      .setAppName("Starting...") // 设置应用程序名称
      .setMaster("local[*]") // 设置运行模式为本地模式

    // 创建SparkContext对象,并传入SparkConf对象
    val sc = new SparkContext(conf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 1)
    var cnt=0  // cnt,在Driver端的jvm中

    //算子中函数在远端执行
    //函数中引用的遍历 闭包引用,将闭包变量复制一个副本发送到远端,你对闭包变量的所有操作操作的是副本
    val rdd1 = rdd.map(x => {
      cnt += 1 // 只有在调用结果算子时,才会runjob,所以这里取进来的cnt为10
      println("-----" + cnt)
      x*10
    })

    cnt=10
    println(cnt) // 此处打印的是driver端的cnt,依然是0

    rdd1.foreach(println)
    // 关闭SparkContext对象
    sc.stop()
  }
}

8.2 闭包引用的应用场景

可以使用闭包引用避免****shuffle

在我们之前讲解join算子的案例中,需要将两个数据集进行连接,通常会触发 shuffle 操作,这会带来一定的性能开销。

我们可以将user数据集转换成hashmap,通过闭包引用传入进去,此时可以避免shuffle

package com.doit.day0217

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.io.Source

/**
 * 示例代码:演示了如何使用闭包变量避免shuffle
 */
object Test08 {
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象,并设置应用程序名称和运行模式
    val conf = new SparkConf()
      .setAppName("Starting...") // 设置应用程序名称
      .setMaster("local[*]") // 设置运行模式为本地模式

    // 创建SparkContext对象,并传入SparkConf对象
    val sc = new SparkContext(conf)

    // 加载用户数据,将其转换为Map
    val user = Source.fromFile("data/join/user.txt")
    val userMap: Map[String, String] = user.getLines().map(line => {
      val arr = line.split(",")
      (arr(0), arr(1))
    }).toMap // 将用户数据转换为Map,存储在内存中 一般数据集控制大小为1G以内

    // 加载订单数据
    val orders = sc.textFile("data/join/orders.txt")

    // 使用闭包变量userMap避免shuffle,将用户名直接关联到订单数据中
    val rdd2 = orders.map(iter => {
      val arr2 = iter.split(",")
      val name = userMap.getOrElse(arr2(4), "unknown") // 使用闭包变量userMap关联订单数据中的用户ID
      (arr2(0), arr2(1), arr2(2), arr2(3), arr2(4), name)
    })

    // 打印处理后的订单数据
    rdd2.foreach(println)

    // 关闭SparkContext对象
    sc.stop()
  }
}

如果需要让每个task都用有一份“只读”的“小量”数据,比如一个字典,则可以利用闭包引用;

如果共享的这分只读数据比较大,则应该使用“广播变量”效率更高!

🍠Source.fromFile

sc.textFile

的辨析

**1. 使用 ****
Source.fromFile

**** 读取数据:**

  • 何时使用:- 当数据量较小,可以完全装载到内存中并进行处理时,适合使用 Source.fromFile 方法读取数据。例如,对于文件大小在几十兆到几百兆之间的数据集,可以考虑使用该方法。- 特点:- 将文件内容一次性读取到内存中,适合对数据进行全量处理。- 读取的数据直接转换为内存中的集合类型(如Map),方便进行后续处理。- 注意事项:- 对于较大的数据集,可能会导致内存溢出,因此在处理大规模数据时需要谨慎使用。
**2. 使用 ****
sc.textFile

**** 读取数据:**

  • 何时使用:- 当数据量较大,无法一次性装载到内存中进行处理时,应该使用 Spark 的 sc.textFile 方法读取数据。例如,对于几百兆到几十亿的大型数据集,应该使用该方法。
  • 特点:- Spark 的 sc.textFile 方法将文件分布式地加载到集群中的各个节点上,并返回一个分布式数据集(RDD)。- 可以有效地处理大规模数据,具有良好的扩展性和性能。
  • 使用方式:- 使用 sc.textFile 方法加载文件后,可以使用各种 Spark 的转换操作对数据进行处理,如 map、filter、reduceByKey 等。
  • 注意事项:- 使用 sc.textFile 方法加载的数据集是分布式的,无法直接转换为集合类型,因此需要结合 Spark 的各种操作进行处理。

8.3 🐶闭包引用的注意事项

1.🥙序列化检查

闭包引用的对象,必须实现序列化接口,否则会导致task序列化失败,从而快速报错

   class Per extends Serializable {
      val id:Int = 0
    }
    val p = new Per()
    val rd = sc.makeRDD(1 to 10, 2)
    // 闭包引用的对象,必须实现序列化接口
    rd.map((_,p)).foreach(println)

2. 🥙“副本”数量

如果闭包引用的是普通对象,则每个task中都有一份“copy”

如果闭包引用的是一个object对象(单例对象),则其实在整个executor中只有一份,如下

Yarn :resourcemanager nodemanager

Nodemanager 提供计算资源 : container对资源的隔离

    object Per extends Serializable {
      val id:Int = 0
    }
    val p = Per
    val rd = sc.makeRDD(1 to 10, 2)
    rd.map((_,p)).foreach(println)

如果使用闭包引用object对象,有可能产生线程安全问题(因为有多个task线程共享这个对象);

看似闭包,实非闭包

下面的代码,其实并不是“闭包引用”

当然,里面用到了自定义类型,也还是要注意序列化问题

    class Phone(var brand: String, var price: Double)
    val resRdd = rdd.map(tp => {
      new Phone(tp._1, tp._2) // 这里并没有引用外部的对象,所以不存在f序列化检查失败的问题    })
      //.map(phone => (phone.brand, phone.price))
      //.reduceByKey(_ + _) // 这里有shuffle,但shuffle写出的是 2元组,它能序列化,所以不会报错
      .groupBy(p=>p.brand)  // 这里有shuffle,而且shuffle写出phone对象,它不能序列化,所以报错
      .mapValues(_.size)

8.4 🐶闭包变量的问题

  • 每个任务实例中都会保存一份完整的闭包变量
  • 如果一条节点同时运行当前任务的多个任务实例 , 存储多份闭包变量数据
package com.doit.day0217
import org.apache.spark.rdd.{JdbcRDD, RDD}
import org.apache.spark.{SparkConf, SparkContext}

import java.sql.{DriverManager, ResultSet}
import org.apache.log4j.{Level, Logger}
/**
 * @日期: 2024/2/19
 * @Author: Wang NaPao
 * @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343
 * @Tips: 和我一起学习吧
 * @Description:
 */

object Test07 {
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象,并设置应用程序名称和运行模式
    val conf = new SparkConf()
      .setAppName("Starting...") // 设置应用程序名称
      .setMaster("local[*]") // 设置运行模式为本地模式

    // 创建SparkContext对象,并传入SparkConf对象
    val sc = new SparkContext(conf)

    val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
    var cnt=0  // cnt,在Driver端的jvm中

    //算子中函数在远端执行
    //函数中引用的遍历 闭包引用,将闭包变量复制一个副本发送到远端,你对闭包变量的所有操作操作的是副本
    val rdd1 = rdd.map(x => {
      cnt += 1 // 只有在调用结果算子时,才会runjob,所以这里取进来的cnt为10
      println("-----" + cnt)
      x*10
    })

    cnt=10
    println(cnt) // 此处打印的是driver端的cnt,依然是0

    rdd1.foreach(println)
    // 关闭SparkContext对象
    sc.stop()
  }
}

解决方案: 运行当前任务的节点只存一份数据

🥙BT传输协议:基本原理

BitTorrent(简称BT)是一种用于大规模文件分享的通信协议。它被广泛用于分发大型文件和数据集,例如软件、电影、音乐等。BitTorrent协议的主要原理是将文件分成小块,并且允许用户同时上传和下载这些文件块,从而实现高效的分发。

以下是BitTorrent协议的主要特点和工作原理:

  1. 分布式架构:1. BitTorrent是一种分布式协议,没有单一的中心服务器,所有参与者都可以直接交换文件块。2. 参与者之间通过Tracker服务器或者DHT网络(分布式哈希表)进行通信,用于发现其他参与者并交换文件块的信息。
  2. 分块下载:1. 将文件分成固定大小的块(一般为256KB或512KB)。2. 下载者可以选择下载文件的哪些块,而不是整个文件,从而实现灵活的下载策略。
  3. 种子文件:1. 种子文件(Torrent文件)包含了文件的元数据信息,包括文件名、大小、哈希值等。2. 种子文件可以通过文件共享网站或者其他方式进行传播,从而让其他用户获取文件的元数据信息并加入下载。
  4. 优化的上传下载策略:1. BitTorrent协议实现了一种基于位掩码的上传下载策略,优先下载缺失的文件块,并且优先上传稀缺的文件块,从而提高下载速度和整体的网络效率。
  5. 健壮性和自我修复:1. BitTorrent协议具有较强的健壮性,即使某些参与者离线或者退出下载,其他参与者仍然可以通过其他方式获取丢失的文件块。2. 通过校验和哈希校验值,BitTorrent协议可以检测到下载的文件块是否损坏或者被篡改,并且自动请求重新下载。
  6. Tracker服务器和DHT网络:1. Tracker服务器用于管理下载者和上传者的信息,帮助下载者找到可用的上传者。2. DHT网络是一种分布式哈希表,允许下载者通过哈希值查询其他下载者的IP地址和端口信息,从而实现去中心化的Peer发现。


本文转载自: https://blog.csdn.net/weixin_40968325/article/details/136185979
版权归原作者 王哪跑nn 所有, 如有侵权,请联系我们删除。

“大数据 - Spark系列《八》- 闭包引用”的评论:

还没有评论