0


大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka(已更完)
  • Spark(正在更新!)

章节内容

上节我们完成了如下的内容:

  • Spark 学习 WordCount 程序
  • Scala & Java 的方式分别编写 WordCount 程序

在这里插入图片描述

计算圆周率

需求背景

我们要实现一个程序来实现圆周率的计算,将利用下面的公式:
在这里插入图片描述

编写代码

packageicu.wzkimportorg.apache.spark.{SparkConf, SparkContext}importscala.math.randomobject SparkPi {def main(args: Array[String]):Unit={var conf =new SparkConf().setAppName("ScalaSparkPi").setMaster("local[*]")val sc =new SparkContext(conf)
    sc.setLogLevel("WARN")val slices =if(args.length >0){
      args(0).toInt
    }else{0}val N =100000000val count = sc.makeRDD(1 to N, slices).map(idx =>{val(x, y)=(random, random)if(x*x + y*y <=1){1}else{0}}).reduce(_ + _)
    println(s"Pi is ${4.0* count / N}")}}

代码部分截图如下所示:
在这里插入图片描述

代码解释

object SparkPi { … }

这个对象定义了一个 Scala 应用程序的入口。Scala 的 object 关键字用于定义一个单例对象,这意味着 SparkPi 只能有一个实例。

def main(args: Array[String]): Unit = { … }

main 方法是 Scala 应用程序的入口点,类似于 Java 中的 main 方法。args 是传递给程序的命令行参数,类型为 Array[String]。Unit 表示该方法没有返回值。

var conf = new SparkConf().setAppName(“ScalaSparkPi”)

  • SparkConf() 用于配置 Spark 应用程序。setAppName(“ScalaSparkPi”) 设置应用程序的名称为 ScalaSparkPi。
  • setMaster("local[]") 表示 Spark 应用程序将在本地运行,使用所有可用的 CPU 核心。local[] 是 Spark 中的特殊设置,表示本地模式下使用所有的 CPU 核心。

val sc = new SparkContext(conf)

SparkContext 是 Spark 应用程序的核心,负责与 Spark 集群进行交互。这里通过配置对象 conf 创建了一个新的 SparkContext 实例。

sc.setLogLevel(“WARN”)

设置日志的级别为 “WARN”。这意味着只会记录警告级别及以上的日志信息,减少不必要的日志输出。

val slices = if (args.length > 0) { … }

这段代码用来处理传递给程序的第一个参数,如果有参数传递过来,则将其转换为整数,作为分片数 slices。如果没有参数,则默认值为 0。

val N = 100000000

定义一个常量 N,表示将进行一亿次随机点的生成,以此来估算 \pi 值。

val count = sc.makeRDD(1 to N, slices)

  • sc.makeRDD(1 to N, slices) 创建一个包含从 1 到 N 的整数的 RDD(弹性分布式数据集),并将其划分为 slices 个分片进行并行计算。
  • map(idx => { … }) 是对 RDD 中的每个元素进行映射操作。对于每个 idx,生成两个随机数 x 和 y,分别表示点的 x 和 y 坐标。
  • if (xx + yy <= 1) 判断点 (x, y) 是否在单位圆内。如果在圆内,则返回 1,否则返回 0。

reduce(_ + _)

  • reduce(_ + _) 将所有的 1 和 0 相加,得到在单位圆内的点的总数。

println(s"Pi is ${4.0 * count / N}")

  • 计算 \pi 的估计值:使用公式 \pi \approx 4 \times (\text{圆内点的数量} / \text{总点数})。
  • 输出计算结果。

打包上传

mvn clean package

打包完成上传Jar包:
在这里插入图片描述

运行项目

spark-submit --master local[*]--class icu.wzk.SparkPi spark-wordcount-1.0-SNAPSHOT.jar 15

运行等待结果
在这里插入图片描述
运行完毕的结果如下:
在这里插入图片描述

找共同好友

需求背景

目前有一组数据

100, 200300400500600200, 100300400300, 100200400500400, 100200300500, 100300600, 100

第一列表示用户,后边的数字表示该用户的好友,我们要对上面的这几列进行分析计算,得出共同的好友。
在这里插入图片描述

编写代码

方法一

核心思想利用笛卡尔积求两两之间的好友 然后去除多余的数据

packageicu.wzkimportorg.apache.spark.rdd.RDD
importorg.apache.spark.{SparkConf, SparkContext}object FindFriends {def main(args: Array[String]):Unit={val conf =new SparkConf().setAppName("SparkFindFriends").setMaster("local[*]")val sc =new SparkContext(conf)
    sc.setLogLevel("WARN")val lines: RDD[String]= sc.textFile(args(0))val friendsRDD: RDD[(String, Array[String])]= lines.map{
      line =>val fields: Array[String]= line.split(",")val userId = fields(0).trim
        val friends:  Array[String]= fields(1).trim.split("\\s+")(userId, friends)}
    friendsRDD
      .cartesian(friendsRDD).filter({case((id1, _),(id2, _))=> id1 < id2
      }).map{case((id1, friends1),(id2, friends2))=>((id1, id2), friends1.intersect(friends2).sorted.toBuffer)}.sortByKey().collect().foreach(println)
    sc.stop()}}

方法二

消除笛卡尔积 核心思想是:将数据变形,找到两两的好友,再执行数据的合并

packageicu.wzkimportorg.apache.spark.rdd.RDD
importorg.apache.spark.{SparkConf, SparkContext}object FindFriends2 {def main(args: Array[String]):Unit={val conf =new SparkConf().setAppName("SparkFindFriends").setMaster("local[*]")val sc =new SparkContext(conf)
    sc.setLogLevel("WARN")val lines: RDD[String]= sc.textFile(args(0))val friendsRDD: RDD[(String, Array[String])]= lines.map{
      line =>val fields: Array[String]= line.split(",")val userId = fields(0).trim
        val friends:  Array[String]= fields(1).trim.split("\\s+")(userId, friends)}
    friendsRDD
      .flatMapValues(friends => friends.combinations(2)).map{case(k, v)=>(v.mkString(" & "), Set(k))}.reduceByKey(_ | _).sortByKey().collect().foreach(println)
    sc.stop()}}

打包上传

在这里插入图片描述

运行项目

方法一

spark-submit --master local[*]--class icu.wzk.FindFriends spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/friends.txt

运行结果如下图:
在这里插入图片描述

方法二

spark-submit --master local[*]--class icu.wzk.FindFriends2 spark-wordcount-1.0-SNAPSHOT.jar /opt/wzk/friends.txt

运行结果如下图所示:
在这里插入图片描述

标签: 大数据 spark 学习

本文转载自: https://blog.csdn.net/w776341482/article/details/141275559
版权归原作者 武子康 所有, 如有侵权,请联系我们删除。

“大数据-87 Spark 集群 案例学习 Spark Scala 案例 手写计算圆周率、计算共同好友”的评论:

还没有评论