点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(已更完)
- Spark(正在更新!)
章节内容
上节我们完成了如下的内容:
- Spark 学习 WordCount 程序
- Scala & Java 的方式分别编写 WordCount 程序
计算圆周率
需求背景
我们要实现一个程序来实现圆周率的计算,将利用下面的公式:
编写代码
packageicu.wzkimportorg.apache.spark.{SparkConf, SparkContext}importscala.math.randomobject SparkPi {def main(args: Array[String]):Unit={var conf =new SparkConf().setAppName("ScalaSparkPi").setMaster("local[*]")val sc =new SparkContext(conf)
sc.setLogLevel("WARN")val slices =if(args.length >0){
args(0).toInt
}else{0}val N =100000000val count = sc.makeRDD(1 to N, slices).map(idx =>{val(x, y)=(random, random)if(x*x + y*y <=1){1}else{0}}).reduce(_ + _)
println(s"Pi is ${4.0* count / N}")}}
代码部分截图如下所示:
代码解释
object SparkPi { … }
这个对象定义了一个 Scala 应用程序的入口。Scala 的 object 关键字用于定义一个单例对象,这意味着 SparkPi 只能有一个实例。
def main(args: Array[String]): Unit = { … }
main 方法是 Scala 应用程序的入口点,类似于 Java 中的 main 方法。args 是传递给程序的命令行参数,类型为 Array[String]。Unit 表示该方法没有返回值。
var conf = new SparkConf().setAppName(“ScalaSparkPi”)
- SparkConf() 用于配置 Spark 应用程序。setAppName(“ScalaSparkPi”) 设置应用程序的名称为 ScalaSparkPi。
- setMaster("local[]") 表示 Spark 应用程序将在本地运行,使用所有可用的 CPU 核心。local[] 是 Spark 中的特殊设置,表示本地模式下使用所有的 CPU 核心。
val sc = new SparkContext(conf)
SparkContext 是 Spark 应用程序的核心,负责与 Spark 集群进行交互。这里通过配置对象 conf 创建了一个新的 SparkContext 实例。
sc.setLogLevel(“WARN”)
设置日志的级别为 “WARN”。这意味着只会记录警告级别及以上的日志信息,减少不必要的日志输出。
val slices = if (args.length > 0) { … }
这段代码用来处理传递给程序的第一个参数,如果有参数传递过来,则将其转换为整数,作为分片数 slices。如果没有参数,则默认值为 0。
val N = 100000000
定义一个常量 N,表示将进行一亿次随机点的生成,以此来估算 \pi 值。
val count = sc.makeRDD(1 to N, slices)
- sc.makeRDD(1 to N, slices) 创建一个包含从 1 到 N 的整数的 RDD(弹性分布式数据集),并将其划分为 slices 个分片进行并行计算。
- map(idx => { … }) 是对 RDD 中的每个元素进行映射操作。对于每个 idx,生成两个随机数 x 和 y,分别表示点的 x 和 y 坐标。
- if (xx + yy <= 1) 判断点 (x, y) 是否在单位圆内。如果在圆内,则返回 1,否则返回 0。
reduce(_ + _)
- reduce(_ + _) 将所有的 1 和 0 相加,得到在单位圆内的点的总数。
println(s"Pi is ${4.0 * count / N}")
- 计算 \pi 的估计值:使用公式 \pi \approx 4 \times (\text{圆内点的数量} / \text{总点数})。
- 输出计算结果。
打包上传
mvn clean package
打包完成上传Jar包:
运行项目
spark-submit --master local[*]--class icu.wzk.SparkPi spark-wordcount-1.0-SNAPSHOT.jar 15
运行等待结果
运行完毕的结果如下:
找共同好友
需求背景
目前有一组数据
100, 200300400500600200, 100300400300, 100200400500400, 100200300500, 100300600, 100
第一列表示用户,后边的数字表示该用户的好友,我们要对上面的这几列进行分析计算,得出共同的好友。
编写代码
方法一
核心思想利用笛卡尔积求两两之间的好友 然后去除多余的数据
packageicu.wzkimportorg.apache.spark.rdd.RDD
importorg.apache.spark.{SparkConf, SparkContext}object FindFriends {def main(args: Array[String]):Unit={val conf =new SparkConf().setAppName("SparkFindFriends").setMaster("local[*]")val sc =new SparkContext(conf)
sc.setLogLevel("WARN")val lines: RDD[String]= sc.textFile(args(0))val friendsRDD: RDD[(String, Array[String])]= lines.map{
line =>val fields: Array[String]= line.split(",")val userId = fields(0).trim
val friends: Array[String]= fields(1).trim.split("\\s+")(userId, friends)}
friendsRDD
.cartesian(friendsRDD).filter({case((id1, _),(id2, _))=> id1 < id2
}).map{case((id1, friends1),(id2, friends2))=>((id1, id2), friends1.intersect(friends2).sorted.toBuffer)}.sortByKey().collect().foreach(println)
sc.stop()}}
方法二
消除笛卡尔积 核心思想是:将数据变形,找到两两的好友,再执行数据的合并
packageicu.wzkimportorg.apache.spark.rdd.RDD
importorg.apache.spark.{SparkConf, SparkContext}object FindFriends2 {def main(args: Array[String]):Unit={val conf =new SparkConf().setAppName("SparkFindFriends").setMaster("local[*]")val sc =new SparkContext(conf)
sc.setLogLevel("WARN")val lines: RDD[String]= sc.textFile(args(0))val friendsRDD: RDD[(String, Array[String])]= lines.map{
line =>val fields: Array[String]= line.split(",")val userId = fields(0).trim
val friends: Array[String]= fields(1).trim.split("\\s+")(userId, friends)}
friendsRDD
.flatMapValues(friends => friends.combinations(2)).map{case(k, v)=>(v.mkString(" & "), Set(k))}.reduceByKey(_ | _).sortByKey().collect().foreach(println)
sc.stop()}}
打包上传
运行项目
方法一
spark-submit --master local[*]--class icu.wzk.FindFriends spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/friends.txt
运行结果如下图:
方法二
spark-submit --master local[*]--class icu.wzk.FindFriends2 spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/friends.txt
运行结果如下图所示:
版权归原作者 武子康 所有, 如有侵权,请联系我们删除。