0


spark中RDD的累加器的详细解释

累加器 (accumulators):累加器支持在所有不同节点之间进行累加计算(比如计数或者求和)。

示例代码:

import org.apache.spark.rdd.RDD
import org.apache.spark.{Accumulator, SparkConf, SparkContext}

  object accumulator$ {
    def main(args: Array[String]): Unit = {
      val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
      val sc: SparkContext = new SparkContext(conf)
      sc.setLogLevel("WARN")
      var counter1: Int = 0;
      var data = Seq(1,2,3)

      data.foreach(x => counter1 += x )
      println(counter1)//6
      
      println("+++++++++++++++++++++++++")

      var counter2: Int = 0;
      val dataRDD: RDD[Int] = sc.parallelize(data)
      dataRDD.foreach(x => counter2 += x)
      println(counter2)//0

      val counter3: Accumulator[Int] = sc.accumulator(0)
      dataRDD.foreach(x => counter3 += x)
      println(counter3)//6
    }
  }

备注:此代码取材网络,这里便于学习使用

代码解释:

1.val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")

以下是对这段代码的分析: 一、整体功能 这段代码是使用 Spark 的 Java/Scala 语言编写的配置部分。它创建了一个 SparkConf 对象,并设置了应用程序的名称为“wc”以及设置了主节点为“local[]”。 二、具体解释 1. val conf: SparkConf = new SparkConf(): - 这行代码声明一个名为conf的变量,类型为SparkConf,并通过调用构造函数创建一个新的 Spark 配置对象。 2. .setAppName("wc"): - 设置应用程序的名称为“wc”。应用程序名称在 Spark 的监控界面和日志中用于标识特定的作业。 3. .setMaster("local[*]"): - 设置 Spark 主节点的地址为“local[]”。这表示在本地以多线程的方式运行 Spark 作业,其中星号“*”表示使用本地机器上尽可能多的核心。

这个配置通常用于在本地开发和测试 Spark 应用程序,因为它允许快速迭代和调试,而无需连接到一个分布式的 Spark 集群。如果你想要在真正的分布式环境中运行 Spark 作业,需要将setMaster方法设置为适当的集群管理器地址,例如“spark://master:7077”(对于 Spark 独立集群)或“yarn-client”(对于在 YARN 上运行)等。

2.val sc: SparkContext = new SparkContext(conf) sc.setLogLevel("WARN")

以下是对这段代码的分析:

一、整体功能:这段代码在创建了一个 SparkConf 对象(在前面的代码中创建)后,使用这个配置对象创建了一个 SparkContext,并设置了日志级别为“WARN”。

二、具体解释*:1. val sc: SparkContext = new SparkContext(conf): - 这里声明一个名为sc的变量,类型为SparkContext,并使用前面创建的conf配置对象作为参数来构造一个 SparkContext。SparkContext 是 Spark 应用程序的入口点,它负责与 Spark 集群进行交互,提交作业并管理资源。

  1. sc.setLogLevel("WARN"): 这行代码设置 Spark 的日志级别为“WARN”。这意味着只有警告级别及以上的日志消息会被输出。通过设置日志级别,可以减少不必要的日志输出,使日志更加简洁,便于关注重要的信息。 可以根据具体的需求调整日志级别。

常见的日志级别还有“INFO”(显示信息级别日志)、“ERROR”(只显示错误级别日志)等。不同的日志级别可以帮助在开发和调试过程中更好地理解应用程序的运行情况,同时在生产环境中可以避免过多的日志输出影响性能。

3.var counter1: Int = 0; var data = Seq(1,2,3) data.foreach(x => counter1 += x ) println(counter1)//6 println("+++++++++++++++++++++++++")

以下是对这段代码的分析:

一、整体功能: 这段代码首先定义了一个整数变量 counter1 并初始化为 0。然后创建了一个包含整数序列 Seq(1, 2, 3)。接着使用 foreach 循环遍历这个序列,并将序列中的每个元素累加到 counter1 中。最后,打印出 counter1 的值,接着打印一个分隔字符串。

二、具体解释:

  1. var counter1: Int = 0;: - 定义一个可变的整数变量 counter1,初始值为 0。

  2. var data = Seq(1,2,3): - 创建一个不可变的序列 data,包含三个整数元素 1、2 和 3。

  3. data.foreach(x => counter1 += x ): - 使用 foreach 方法遍历 data 序列中的每个元素。对于每个元素 x,执行 counter1 += x,即将当前元素的值加到 counter1 上。

  4. println(counter1): - 打印 counter1 的值,此时 counter1 为序列中所有元素的总和,即 6。 5. println("+++++++++++++++++++++++++"): - 打印一个分隔字符串,用于在输出中进行区分。 这种方式在单线程环境下是可行的,但在分布式环境下(如 Spark 集群中),这种累加方式可能会出现问题,因为不同的执行单元可能会同时尝试修改 counter1,导致结果不准确。在分布式环境中,应该使用累加器等机制来进行安全的累加操作。

4.var counter2: Int = 0; val dataRDD: RDD[Int] = sc.parallelize(data) //分布式集合的[1,2,3] dataRDD.foreach(x => counter2 += x) println(counter2)

以下是对这段代码的分析:

一、整体功能:这段代码首先定义了一个整数变量 counter2 并初始化为 0。接着,将本地的序列 data(假设是 Seq(1,2,3))转换为分布式弹性数据集(RDD)dataRDD。然后尝试使用 foreach 方法遍历这个分布式 RDD,并将每个元素累加到 counter2。最后打印 counter2 的值,但结果为 0。

二、具体解释:

  1. var counter2: Int = 0;: - 定义一个可变的整数变量 counter2,初始值为 0。

  2. val dataRDD: RDD[Int] = sc.parallelize(data): - 使用 SparkContextparallelize 方法将本地的序列 data 转换为分布式的 RDD。这个 RDD 可以在分布式环境中进行并行处理。

  3. dataRDD.foreach(x => counter2 += x): - 尝试使用 foreach 方法遍历分布式 RDD 的每个元素,并将元素值累加到 counter2。然而,在分布式环境下,foreach 操作是在各个工作节点上并行执行的,并且对外部变量的修改通常是在各个任务的本地执行环境中进行的,不会将修改反映回驱动程序。

  4. println(counter2): - 打印 counter2 的值,由于在分布式环境下对 counter2 的累加操作没有正确地反映回驱动程序,所以仍然是初始值 0。 在 Spark 中,如果要在分布式环境下进行累加操作,可以使用累加器(Accumulator)来实现正确的累加效果。累加器可以确保在分布式计算中对一个变量的累加操作是安全和正确的。

5.val counter3: Accumulator[Int] = sc.accumulator(0) dataRDD.foreach(x => counter3 += x) println(counter3)//6

以下是对这段代码的分析: :

一、整体功能:这段代码首先创建了一个名为 counter3 的累加器,初始值为 0。然后使用分布式 RDD(dataRDD,假设包含元素 [1, 2, 3])的 foreach 方法遍历每个元素,并将元素值累加到累加器 counter3 中。最后打印累加器的值,得到正确的累加结果 6。

二、具体解释:

  1. val counter3: Accumulator[Int] = sc.accumulator(0): - 创建一个类型为 Accumulator[Int] 的累加器 counter3,并初始化为 0。累加器是 Spark 中用于在分布式计算中安全地进行累加操作的特殊变量。

  2. dataRDD.foreach(x => counter3 += x): - 使用 foreach 方法遍历分布式 RDD dataRDD 的每个元素。对于每个元素 x,执行 counter3 += x,即将元素值加到累加器 counter3 上。在分布式环境下,这个操作会在各个工作节点上并行执行。 - 累加器的特殊之处在于,它们可以确保在分布式计算中对变量的累加操作是安全的,并且最终的结果会正确地反映回驱动程序。

  3. println(counter3): - 打印累加器 counter3 的值。由于累加器的正确工作机制,这里会得到正确的累加结果 6,即分布式 RDD 中所有元素的总和。 使用累加器可以有效地在 Spark 的分布式计算中进行累加、计数等操作,确保结果的准确性。

备注:此篇文章为平时学习笔记,以供参考,若有补充,还请大佬不吝赐教


本文转载自: https://blog.csdn.net/qq_73823758/article/details/143052683
版权归原作者 翻斗花园龙爷爷 所有, 如有侵权,请联系我们删除。

“spark中RDD的累加器的详细解释”的评论:

还没有评论