0


大数据 - Spark系列《九》- 广播变量

** Spark系列文章:**

大数据 - Spark系列《一》- 从Hadoop到Spark:大数据计算引擎的演进-CSDN博客

大数据 - Spark系列《二》- 关于Spark在Idea中的一些常用配置-CSDN博客

大数据 - Spark系列《三》- 加载各种数据源创建RDD-CSDN博客

大数据 - Spark系列《四》- Spark分布式运行原理-CSDN博客

大数据 - Spark系列《五》- Spark常用算子-CSDN博客

大数据 - Spark系列《六》- RDD详解-CSDN博客

大数据 - Spark系列《七》- 分区器详解-CSDN博客

大数据 - Spark系列《八》- 闭包引用-CSDN博客

9.1 🥙简介

广播变量(Broadcast Variables)是 Spark 中用于在集群中共享数据的一种机制。*它允许在整个集群中缓存一个只读*的变量,从而在每个节点(****executor)**上只复制一份数据,而不是每个任务都复制一份。这样可以大大减少网络传输的数据量,提高了任务的执行效率。

广播变量被保存在本地,并会把广播变量的值切分成多个数据块进行保存。广播变量数据块的默认大小是4M,数据块太大或太小都不利于数据的传输。

就是说,

当Job的某个阶段的Task使用到一个闭包变量, 且一个Executor中可能会运行相同类型的任务时

如果你的应用需要向所有节点发送一个较大的只读查询表,广播变量非常适合。

9.2 特点

1)广播变量会在每个executor中保留一份副本,而不是为每个Task保留一份副本。

2)Spark使用了BitTorrent协议进行广播变量的分发,这样可以减少通信成本,该分发算法在后面章节有介绍。

9.3 🥙使用场景

  • 共享较大的只读数据: 广播变量通常用于在任务之间共享较大的只读数据,例如机器学习模型参数、配置信息等。
  • 减少数据传输量: 通过在集群中广播变量的方式,可以避免在每个任务中复制大量的数据,减少了网络传输的数据量,提高了任务的执行效率。

注意事项

1)能不能将一个RDD使用 广播变量广播出去?

不能,因为RDD是不存储数据的。可以将RDD的计算结果广播出去。

2)广播变量只能在Driver端定义,不能在Executor端定义。

3)在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。

9.4 使用方法

1. 🥙****创建广播变量

使用 SparkContext 的 broadcast 方法将要广播的数据包装成广播变量。

val broadcastVar = sc.broadcast(data)

2. 🥙****访问广播变量

在任务中通过 value 属性来访问广播变量的值。

broadcastVar.value

3. 不适用广播变量的问题

  def main(args: Array[String]): Unit = {
    val sc: SparkContext = SparkUtil.getSc
    val rdd1: RDD[(String, Int)] = sc.parallelize(List(("a", 1), ("b", 2), ("c", 3)))
    val rdd2: RDD[(String, Int)] = sc.parallelize(List(("a", 11), ("b", 22), ("c", 33)))
    // 数据在进行join的时候会出现shuffle , 我们在编程的时候尽量避免shuffle的产生
    val resRDD1: RDD[(String, (Int, Int))] = rdd1.join(rdd2)

    // 使用这样的方式来实现join逻辑, 这里调用的是map函数是没有shuffle产生的
     // 1 将数据收集到driver端
    val mp: Map[String, Int] = rdd2.collect().toMap
    // map函数的逻辑分布式执行在不同的Task中 , 那么我们的mp数据在每个Task中都存在
    val resRDD2: RDD[(String, (Int, Int))] = rdd1.map {
      case (k, v) => {
        val i: Int = mp.getOrElse(k, -1)
        (k, (v, i))
      }
    }
    resRDD2.foreach(println)
    sc.stop()

4. 🥙使用广播变量

package com.doit.day0217
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.io.Source
/**
 * @日期: 2024/2/19
 * @Author: Wang NaPao
 * @Blog: https://blog.csdn.net/weixin_40968325?spm=1018.2226.3001.5343
 * @Tips: 和我一起学习吧
 * @Description:
 */

object Test09 {
  def main(args: Array[String]): Unit = {
    // 创建SparkConf对象,并设置应用程序名称和运行模式
    val conf = new SparkConf()
      .setAppName("Starting...") // 设置应用程序名称
      .setMaster("local[*]") // 设置运行模式为本地模式

    // 创建SparkContext对象,并传入SparkConf对象
    val sc = new SparkContext(conf)

    // 加载用户数据,将其转换为Map
    val user = Source.fromFile("data/join/user.txt")
    val userMap: Map[String, String] = user.getLines().map(line => {
      val arr = line.split(",")
      (arr(0), arr(1))
    }).toMap // 将用户数据转换为Map,存储在内存中 一般数据集控制大小为1G以内

    //1)--------------------  将map集合变成广播变量
    val bc = sc.broadcast(userMap)
    // 加载订单数据
    val orders = sc.textFile("data/join/orders.txt")

    // 使用闭包变量userMap避免shuffle,将用户名直接关联到订单数据中
    val rdd2 = orders.map(iter => {
      val arr2 = iter.split(",")
      //val name = userMap.getOrElse(arr2(4), "unknown") // 使用闭包变量userMap关联订单数据中的用户ID
      // 2)--------------------  从广播变量中取出数据
      val name = bc.value.getOrElse(arr2(4), "unknown")
      (arr2(0), arr2(1), arr2(2), arr2(3), arr2(4), name)
    })

    // 打印处理后的订单数据
    rdd2.foreach(println)

    // 关闭SparkContext对象
    sc.stop()
  }
}

5. 销毁广播变量

从driver端发送一个RemoveBroadcast消息。在Executor上的BlockManager服务接收该消息,就会把广播变量从BlockManager中删除。若removeFromDriver设置成True,还会从Driver删除该变量的数据。

// 销毁广播变量
bc.unpersist()

9.5 广播变量分发读取机制

1. 创建原理

广播变量的创建发生在Driver端,如图所示,当调用SparkContext#broadcast来创建广播变量时,会把该变量的数据切分成多个数据块,保存到driver端的BlockManger中,使用的存储级别是:MEMORY_AND_DISK_SER。

所以,广播变量的读取也是懒加载的,只有在Executor端需要获取广播变量时才会去获取。此时广播变量的数据只在Driver端存在。

2. 读取原理

1)第1步(红色线1):首先从本Executor的BlockManager中(或本机的其他executor)读取广播变量的数据,若存在就直接获取,并返回。若不存在,则执行2或3。

2)第2步(红色线2):从远端获取数据。先从同一个机架(rack)的主机的Executor端获取。若不能从其他Executor中获取广播变量,则会直接从Driver端获取。

3)第3步(红色线3):从Driver端获取广播变量的状态和位置信息(由于所有的BlockManager slave端都会向Master端汇报数据块状态)。

从以上获取流程可以看出,在执行spark应用时,只要有一个worker节点的Executor从Driver端获取到了广播变量的数据,则其他的Executor就不需要从Driver端获取了。

BT协议

当某个Executor上的某个数据块被删除,可以从其他Executor直接获取该数据块,然后把数据块保存到自己的Executor的BlockManager中。

Executor4中的任务需要使用广播变量,但它只有该变量的b4数据块。此时,它首先从同主机(worker2节点)的中获取数据,获取到数据块;然后分别从不同主机的Executor1和Executor2中读取数据块。此时,Executor4就获取到变量b的全部数据块了,然后把这些数据块在自己的BlockManager中保存一份。此时,其他Executor就可以从Executor4中读取数据了。

当完成这些操作后,各个Executor端的BlockManager(slave端)会向Driver端的BlockManager(master端)汇报数据块的状态。


本文转载自: https://blog.csdn.net/weixin_40968325/article/details/136198917
版权归原作者 王哪跑nn 所有, 如有侵权,请联系我们删除。

“大数据 - Spark系列《九》- 广播变量”的评论:

还没有评论