0


大数据基础编程、实验和教程案例(实验七)

你好# 大数据基础编程、实验和教程案例(实验七)

14.5 实验五:MapReduce 初级编程实践

本实验对应第 9 章的内容。

14.7.1 实验目的

(1)掌握使用 Spark 访问本地文件和 HDFS 文件的方法
(2)掌握 Spark 应用程序的编写、编译和运行方法

14.7.2 实验平台

操作系统LinuxHadoop版本3.1.3Spark 版本2.4.0

14.7.3 实验步骤

1.Spark读取文件系统的数据

(1)在 spark-shell 中读取 Linux 系统本地文件“/home/hadoop/test.txt”,然后统计出文件的行数;

 cd /usr/local/spark
./bin/spark-shell

scala>val textFile=sc.textFile("file:///home/hadoop/test.txt")
scala>textFile.count()

在这里插入图片描述
(2)在 spark-shell 中读取 HDFS 系统文件“/user/hadoop/test.txt”(如果该文件不存在,请先创建),然后,统计出文件的行数;

scala>val textFile=sc.textFile("hdfs://localhost:9000/user/hadoop/test.txt")
scala>textFile.count()

在这里插入图片描述
(3)编写独立应用程序(推荐使用 Scala 语言),读取 HDFS 系统文件“/user/hadoop/test.txt”(如果该文件不存在,请先创建),然后,统计出文件的行数;通过 sbt 工具将整个应用程序编译打包成 JAR 包,并将生成的 JAR 包通过 spark-submit 提交到 Spark 中运行命令。
使用 hadoop 用户名登录 Linux 系统,打开一个终端,在 Linux 终端中,执行如下命令创建一个文件夹 sparkapp 作为应用程序根目录:

cd ~ 
mkdir ./sparkapp 
mkdir -p ./sparkapp/src/main/scala 

需要注意的是,为了能够使用 sbt 对 Scala 应用程序进行编译打包,需要把应用程序代码存放在应用程序根目录下的“src/main/scala” 目录下。下面使用 vim 编辑器在“~/sparkapp/src/main/scala”下建立一个名为 SimpleApp.scala 的 Scala 代码文件,命令如下:

cd ~
vim ./sparkapp/src/main/scala/SimpleApp.scala

然后,在 SimpleApp.scala 代码文件中输入以下代码:

/* SimpleApp.scala */importorg.apache.spark.SparkContextimportorg.apache.spark.SparkContext._
importorg.apache.spark.SparkConf
object SimpleApp{
 def main(args:Array[String]){
 val logFile =" hdfs://localhost:9000/user/hadoop/test.txt"
 val conf =newSparkConf().setAppName("Simple Application")
 val sc =newSparkContext(conf)
 val logData = sc.textFile(logFile,2)
 val num = logData.count()printf("The num of this file is %d", num)}}

下面使用 sbt 对 Scala 程序进行编译打包。
SimpleApp.scala 程序依赖于 Spark API,因此,需要通过 sbt 进行编译打包以后才能运行。 首先,需要使用 vim 编辑器在“~/sparkapp”目录下新建文件 simple.sbt,命令如下:

cd ~
vim ./sparkapp/simple.sbt

simple.sbt 文件用于声明该独立应用程序的信息以及与 Spark 的依赖关系(实际上,只要扩展名使用.sbt,文件名可以不用 simple,可以自己随意命名,比如 mysimple.sbt)。需要在 simple.sbt 文件中输入以下内容:

name :="Simple Project"
version :="1.0"
scalaVersion :="2.11.12"
libraryDependencies +="org.apache.spark"%%"spark-core"%"2.4.0"

为了保证 sbt 能够正常运行,先执行如下命令检查整个应用程序的文件结构:

cd ~/sparkapp
find .

在这里插入图片描述
执行结果
在这里插入图片描述
生成的 JAR 包的位置为“~/sparkapp/target/scala-2.11/simple-project_2.11-1.0.jar”。对于前面 sbt 打包得到的应用程序 JAR 包,可以通过 spark-submit 提交到 Spark 中运行
在这里插入图片描述

2.对于两个输入文件 A 和 B,编写 Spark 独立应用程序(推荐使用 Scala 语言),对两个文件进行合并,并剔除其中重复的内容,得到一个新文件 C。

下面是输入文件和输出文件的一个样例,供参考。
输入文件 A 的样例如下:

20170101 x
20170102 y
20170103 x
20170104 y
20170105 z
20170106 z

输入文件 B 的样例如下:

20170101 y
20170102 y
20170103 x
20170104 z
20170105 y

根据输入的文件 A 和 B 合并得到的输出文件 C 的样例如下:

20170101 x
20170101 y
20170102 y
20170103 x
20170104 y
20170104 z
20170105 y
20170105 z
20170106 z

(1)假设当前目录为/usr/local/spark/mycode/remdup,在当前目录下新建一个目录 mkdir -p src/main/scala,然后在目录/usr/local/spark/mycode/remdup/src/main/scala 下新建一个remdup.scala,复制下面代码

importorg.apache.spark.SparkContextimportorg.apache.spark.SparkContext._
importorg.apache.spark.SparkConfimportorg.apache.spark.HashPartitioner
 
object RemDup{
    def main(args:Array[String]){
        val conf =newSparkConf().setAppName("RemDup")
        val sc =newSparkContext(conf)
        val dataFile ="file:///home/charles/data"
        val data = sc.textFile(dataFile,2)
        val res = data.filter(_.trim().length>0).map(line=>(line.trim,"")).partitionBy(newHashPartitioner(1)).groupByKey().sortByKey().keys
        res.saveAsTextFile("result")}}

(2)在目录/usr/local/spark/mycode/remdup 目录下新建 simple.sbt,复制下面代码:

name :="Simple Project"
version :="1.0"
scalaVersion :="2.11.12"
libraryDependencies +="org.apache.spark"%%"spark-core"%"2.4.0"

(3)在目录/usr/local/spark/mycode/remdup 下执行下面命令打包程序

sudo /usr/local/sbt/sbt package

(4)最后在目录/usr/local/spark/mycode/remdup 下执行下面命令提交程序

/usr/local/spark/bin/spark-submit --class"RemDup"/usr/local/spark/mycode/remdup/target/scala-2.11/simple-project_2.11-1.0.jar

查看结果
在这里插入图片描述

在这里插入图片描述

3.编写独立应用程序实现求平均值问题

每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写 Spark 独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。
Algorithm成绩的样例如下:

小明 92
小红 87
小新 82
小丽 90

Database成绩的样例如下:

小明 95
小红 81
小新 89
小丽 85

Python成绩的样例如下:

小明 83
小红 82
小新 94
小丽 91

平均成绩的样例如下:

小明 89.67
小红 83.67
小新 88.33
小丽 88.67

准备工作
1.进入到mycode目录,新建RemDup目录(没有mycode目录可以新建一个)
再进入到RemDup目录中去
在这里插入图片描述

2.新建datas目录,写入文件algorithm、database、python:
在这里插入图片描述
写入文件:
在这里插入图片描述
Algorithm

小明 92
小红 87
小新 82
小丽 90

Database

小明 95
小红 81
小新 89
小丽 85

Python

小明 82
小红 83
小新 94
小丽 91

题目
书上是 avgscore ->AvgScore

(1)假设当前目录为/usr/local/spark/mycode/AvgScore,在当前目录下新建一个目录src/main/scala,然后在目 录/usr/local/spark/mycode/AvgScore/src/main/scala 下新建一个AvgScore.scala。复制如下代码:
在这里插入图片描述

//实际运行代码//导入必要的 Spark 库:importorg.apache.spark.SparkContextimportorg.apache.spark.SparkContext._
importorg.apache.spark.SparkConfimportorg.apache.spark.HashPartitioner//定义应用程序对象和入口点:
object AvgScore{
    def main(args:Array[String]){//设置 Spark 应用程序的配置:
        val conf =newSparkConf().setAppName("AvgScore")
        val sc =newSparkContext(conf)//定义数据文件的路径并加载数据:
        val dataFile ="file:///usr/local/spark/mycode/AvgScore/datas"/*
    这里使用 sc.textFile() 方法加载一个文本文件,
            3 参数指定每个分区包含的行数。
*/
        val data = sc.textFile(dataFile,3)//定义一个计算平均分的函数,并应用到数据集上:
        val res = data

    //filter() 方法通过 trim() 函数过滤空行.filter(_.trim().length >0)/*
    map() 方法转换数据集,每行数据被转换成一个元组,包含学生姓名和成绩。
    split() 方法用于将行拆分为数组
    trim() 方法用于去除多余的空格,并将成绩转换为整数。
    */.map(line =>(line.split(" ")(0).trim(), line.split(" ")(1).trim().toInt))//partitionBy() 方法将结果分区。.partitionBy(newHashPartitioner(1))//groupByKey() 方法将数据集按键(学生姓名)分组。.groupByKey()/*map() 方法遍历分组,计算平均成绩,并格式化为两位小数。
    函数返回一个新的元组,包含学生姓名和平均成绩。*//*
    下面这段代码是针对一个键值对RDD进行操作,其中每个键对应一个浮点数数组。
    代码中的.map()函数将每个键值对映射为一个新的键值对,
    其中新的值是原始数组的平均值(保留两位小数)。
    首先定义了一个变量n和一个变量sum,分别用于计算数组元素的数量和总和。
    然后,使用for循环遍历数组中的每个元素,将其加入sum中,并将n加1。
    接下来,计算平均值avg,并使用f"$avg%1.2f"格式化为保留两位小数的字符串,
    最后将键值对的值更新为这个字符串转换为Double类型的结果。
    最终,代码返回一个新的键值对,其中键与原始RDD中的键相同,
    而值则为该键对应的数组的平均值(保留两位小数)。
    */.map(x =>{var n =0var sum =0.0for(i <- x._2){
            sum = sum + i
            n = n +1}
        val avg = sum / n
        val format = f"$avg%1.2f".toDouble
        (x._1, format)})/*
    将结果保存到输出文件:
    这里使用 saveAsTextFile() 方法将结果保存到一个文本文件。
    Spark 会自动将数据保存在多个分区中。
    */
       res.saveAsTextFile("file:///usr/local/spark/mycode/AvgScore/result")}}

(2)在/usr/local/spark/mycode/AvgScore目录下新建simple.sbt,复制如下代码:

name :="Simple Project"
version :="1.0"
scalaVersion :="2.11.12"
libraryDependencies +="org.apache.spark"%%"spark-core"%"2.4.0"

这里我先查看scala 和spark版本并修改

scala 2.11.12 => 2.11.6

spark 2.4.0 => 2.4.0
在这里插入图片描述
在这里插入图片描述
修改好内容如下:
在这里插入图片描述
(3)在/usr/local/spark/mycode/AvgScore目录下执行如下命令打包程序:

sudo /usr/local/sbt/sbt package

在这里插入图片描述
结果如下:
在这里插入图片描述
(4)在/usr/local/spark/mycode/AvgScore 目录下执行如下命令提交程序:

/usr/local/spark/bin/spark-submit --class"AvgScore"/usr/local/spark/mycode/avgscore/target/scala-2.11/simple-project_2.11-1.0.jar

在这里插入图片描述
(5)在/usr/local/spark/mycode/AvgScore/result 目录下即可得到结果文件。
在这里插入图片描述
补充:
scala运行版本和查看版本不一致,让人思考simple.sbt底下用哪个版本合适

实验结果过后表明,两个版本都可以
在这里插入图片描述
scala2.11.12 底下的运行结果
在这里插入图片描述
查看结果
在这里插入图片描述


本文转载自: https://blog.csdn.net/weixin_63180557/article/details/131235288
版权归原作者 Gala8227 所有, 如有侵权,请联系我们删除。

“大数据基础编程、实验和教程案例(实验七)”的评论:

还没有评论