累加器 (accumulators):累加器支持在所有不同节点之间进行累加计算(比如计数或者求和)。
示例代码:
import org.apache.spark.rdd.RDD
import org.apache.spark.{Accumulator, SparkConf, SparkContext}
object accumulator$ {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
sc.setLogLevel("WARN")
var counter1: Int = 0;
var data = Seq(1,2,3)
data.foreach(x => counter1 += x )
println(counter1)//6
println("+++++++++++++++++++++++++")
var counter2: Int = 0;
val dataRDD: RDD[Int] = sc.parallelize(data)
dataRDD.foreach(x => counter2 += x)
println(counter2)//0
val counter3: Accumulator[Int] = sc.accumulator(0)
dataRDD.foreach(x => counter3 += x)
println(counter3)//6
}
}
备注:此代码取材网络,这里便于学习使用
代码解释:
1.val conf: SparkConf = new SparkConf().setAppName("wc").setMaster("local[*]")
以下是对这段代码的分析: 一、整体功能 这段代码是使用 Spark 的 Java/Scala 语言编写的配置部分。它创建了一个 SparkConf 对象,并设置了应用程序的名称为“wc”以及设置了主节点为“local[]”。 二、具体解释 1. val conf: SparkConf = new SparkConf()
: - 这行代码声明一个名为conf
的变量,类型为SparkConf
,并通过调用构造函数创建一个新的 Spark 配置对象。 2. .setAppName("wc")
: - 设置应用程序的名称为“wc”。应用程序名称在 Spark 的监控界面和日志中用于标识特定的作业。 3. .setMaster("local[*]")
: - 设置 Spark 主节点的地址为“local[]”。这表示在本地以多线程的方式运行 Spark 作业,其中星号“*”表示使用本地机器上尽可能多的核心。
这个配置通常用于在本地开发和测试 Spark 应用程序,因为它允许快速迭代和调试,而无需连接到一个分布式的 Spark 集群。如果你想要在真正的分布式环境中运行 Spark 作业,需要将setMaster
方法设置为适当的集群管理器地址,例如“spark://master:7077”(对于 Spark 独立集群)或“yarn-client”(对于在 YARN 上运行)等。
2.val sc: SparkContext = new SparkContext(conf) sc.setLogLevel("WARN")
以下是对这段代码的分析:
一、整体功能:这段代码在创建了一个 SparkConf 对象(在前面的代码中创建)后,使用这个配置对象创建了一个 SparkContext,并设置了日志级别为“WARN”。
二、具体解释*:1. val sc: SparkContext = new SparkContext(conf)
: - 这里声明一个名为sc
的变量,类型为SparkContext
,并使用前面创建的conf
配置对象作为参数来构造一个 SparkContext。SparkContext 是 Spark 应用程序的入口点,它负责与 Spark 集群进行交互,提交作业并管理资源。
sc.setLogLevel("WARN")
: 这行代码设置 Spark 的日志级别为“WARN”。这意味着只有警告级别及以上的日志消息会被输出。通过设置日志级别,可以减少不必要的日志输出,使日志更加简洁,便于关注重要的信息。 可以根据具体的需求调整日志级别。
常见的日志级别还有“INFO”(显示信息级别日志)、“ERROR”(只显示错误级别日志)等。不同的日志级别可以帮助在开发和调试过程中更好地理解应用程序的运行情况,同时在生产环境中可以避免过多的日志输出影响性能。
3.var counter1: Int = 0; var data = Seq(1,2,3) data.foreach(x => counter1 += x ) println(counter1)//6 println("+++++++++++++++++++++++++")
以下是对这段代码的分析:
一、整体功能: 这段代码首先定义了一个整数变量 counter1
并初始化为 0。然后创建了一个包含整数序列 Seq(1, 2, 3)
。接着使用 foreach
循环遍历这个序列,并将序列中的每个元素累加到 counter1
中。最后,打印出 counter1
的值,接着打印一个分隔字符串。
二、具体解释:
var counter1: Int = 0;
: - 定义一个可变的整数变量counter1
,初始值为 0。var data = Seq(1,2,3)
: - 创建一个不可变的序列data
,包含三个整数元素 1、2 和 3。data.foreach(x => counter1 += x )
: - 使用foreach
方法遍历data
序列中的每个元素。对于每个元素x
,执行counter1 += x
,即将当前元素的值加到counter1
上。println(counter1)
: - 打印counter1
的值,此时counter1
为序列中所有元素的总和,即 6。 5.println("+++++++++++++++++++++++++")
: - 打印一个分隔字符串,用于在输出中进行区分。 这种方式在单线程环境下是可行的,但在分布式环境下(如 Spark 集群中),这种累加方式可能会出现问题,因为不同的执行单元可能会同时尝试修改counter1
,导致结果不准确。在分布式环境中,应该使用累加器等机制来进行安全的累加操作。
4.var counter2: Int = 0; val dataRDD: RDD[Int] = sc.parallelize(data) //分布式集合的[1,2,3] dataRDD.foreach(x => counter2 += x) println(counter2)
以下是对这段代码的分析:
一、整体功能:这段代码首先定义了一个整数变量 counter2
并初始化为 0。接着,将本地的序列 data
(假设是 Seq(1,2,3)
)转换为分布式弹性数据集(RDD)dataRDD
。然后尝试使用 foreach
方法遍历这个分布式 RDD,并将每个元素累加到 counter2
。最后打印 counter2
的值,但结果为 0。
二、具体解释:
var counter2: Int = 0;
: - 定义一个可变的整数变量counter2
,初始值为 0。val dataRDD: RDD[Int] = sc.parallelize(data)
: - 使用SparkContext
的parallelize
方法将本地的序列data
转换为分布式的 RDD。这个 RDD 可以在分布式环境中进行并行处理。dataRDD.foreach(x => counter2 += x)
: - 尝试使用foreach
方法遍历分布式 RDD 的每个元素,并将元素值累加到counter2
。然而,在分布式环境下,foreach
操作是在各个工作节点上并行执行的,并且对外部变量的修改通常是在各个任务的本地执行环境中进行的,不会将修改反映回驱动程序。println(counter2)
: - 打印counter2
的值,由于在分布式环境下对counter2
的累加操作没有正确地反映回驱动程序,所以仍然是初始值 0。 在 Spark 中,如果要在分布式环境下进行累加操作,可以使用累加器(Accumulator)来实现正确的累加效果。累加器可以确保在分布式计算中对一个变量的累加操作是安全和正确的。
5.val counter3: Accumulator[Int] = sc.accumulator(0) dataRDD.foreach(x => counter3 += x) println(counter3)//6
以下是对这段代码的分析: :
一、整体功能:这段代码首先创建了一个名为 counter3
的累加器,初始值为 0。然后使用分布式 RDD(dataRDD
,假设包含元素 [1, 2, 3]
)的 foreach
方法遍历每个元素,并将元素值累加到累加器 counter3
中。最后打印累加器的值,得到正确的累加结果 6。
二、具体解释:
val counter3: Accumulator[Int] = sc.accumulator(0)
: - 创建一个类型为Accumulator[Int]
的累加器counter3
,并初始化为 0。累加器是 Spark 中用于在分布式计算中安全地进行累加操作的特殊变量。dataRDD.foreach(x => counter3 += x)
: - 使用foreach
方法遍历分布式 RDDdataRDD
的每个元素。对于每个元素x
,执行counter3 += x
,即将元素值加到累加器counter3
上。在分布式环境下,这个操作会在各个工作节点上并行执行。 - 累加器的特殊之处在于,它们可以确保在分布式计算中对变量的累加操作是安全的,并且最终的结果会正确地反映回驱动程序。println(counter3)
: - 打印累加器counter3
的值。由于累加器的正确工作机制,这里会得到正确的累加结果 6,即分布式 RDD 中所有元素的总和。 使用累加器可以有效地在 Spark 的分布式计算中进行累加、计数等操作,确保结果的准确性。
备注:此篇文章为平时学习笔记,以供参考,若有补充,还请大佬不吝赐教
版权归原作者 翻斗花园龙爷爷 所有, 如有侵权,请联系我们删除。