0


实验7 Spark初级编程实践

一、实验目的

  • 掌握使用 Spark 访问本地文件和 HDFS 文件的方法
  • 掌握 Spark 应用程序的编写、编译和运行方法

二、实验平台

  • 操作系统:Ubuntu18.04(或 Ubuntu16.04)
  • Spark 版本:2.4.0
  • Hadoop 版本:3.1.3

三、实验内容和要求

1. 安装 Hadoop 和 Spark

进人 Linux 操作系统,完成 Hadoop 伪分布式模式的安装。完成 Hadoop 的安装以后,再安装 Spark (Local 模 式 ) 。具体安装过程 ,可以 参考教材官网(https://dblab.xmu.edu.cn/post/bigdata3/)的“教材配套大数据软件安装和编程实践指南”。

2. Spark 读取文件系统的数据

启动hadoop

  1. cd /usr/local/hadoop
  2. start-all.sh

在这里插入图片描述

(1) 在 spark-shell 中读取 Linux 系统本地文件“/home/hadoop/test.txt”,然后统计出文件的行数;

  1. echo -e "Hello\nThis is a test\nBye!">>~/test.txt

在这里插入图片描述

启动spark-shell:

  1. cd /usr/local/spark
  2. ./bin/spark-shell

在这里插入图片描述
注意此处的spark和Scala的版本
输入命令:

  1. val textFile=sc.textFile("file:///home/hadoop/test.txt")

在这里插入图片描述

  1. textFile.count()

在这里插入图片描述

(2) 在 spark-shell 中读取 HDFS 系统文件“/user/hadoop/test.txt”(如果该文件不存在,请先创建),然后统计出文件的行数;

如果该文件不存在,创建:

  1. hadoop fs -mkdir -p /usr/hadoop

在这里插入图片描述
在终端执行,上传test.txt文件至HDFS中:

  1. /usr/local/hadoop/bin/hdfs dfs -put ~/test.txt

在这里插入图片描述
在Spark执行

  1. val textFile=sc.textFile("hdfs://localhost:9000/user/hadoop/test.txt")
  2. textFile.count()

在这里插入图片描述

(3) 编写独立应用程序(推荐使用 Scala),读取 HDFS 文件“/user/hadoop/test.txt”(如果该文件不存在,请先创建),然后统计出文件的行数;通过 sbt 将整个应用程序编译打包成 JAR 包,并将生成的 JAR 包通过 spark-submit 提交到 Spark 中运行命令。
进入spark安装目录:

  1. cd /usr/local/spark
  2. mkdir mycode && cd mycode

创建HDFStset目录并编写Scala文件:

  1. mkdir -p HDFStest/src/main/scala
  2. vim ./HDFStest/src/main/scala/HDFStest.scala

HDFStest.scala:

  1. /* HDFStest.scala */
  2. import org.apache.spark.SparkContext
  3. import org.apache.spark.SparkContext._
  4. import org.apache.spark.SparkConf
  5. object HDFStest {
  6. def main(args: Array[String]){
  7. val logFile ="hdfs://localhost:9000/user/hadoop/test.txt"
  8. val conf =newSparkConf().setAppName("Simple Application")
  9. val sc =newSparkContext(conf)
  10. val logData = sc.textFile(logFile,2)
  11. val num = logData.count()printf("The num of this file is %d\n", num)}}

进入 HDFStest 目录,创建 simple.sbt:

  1. cd HDFStest
  2. vim simple.sbt

注意这里的 scalaVersion 是我的 Scala 版本(2.11.12),spark-core 是我的 spark 版本(2.4.0)。

  1. name :="A Simple HDFS Test"
  2. version :="1.0"
  3. scalaVersion :="2.11.12"
  4. libraryDependencies +="org.apache.spark"%%"spark-core"%"2.4.0"

在这里插入图片描述

接下来,可以通过如下代码将整个应用程序打包成 JAR:

  1. sbt package
  1. /usr/local/spark/bin/spark-submit --class "HDFStest"/usr/local/spark/mycode/HDFStest/target/scala-2.11/a-simple-hdfs-test_2.11-1.0.jar 2>&1| grep The

3. 编写独立应用程序实现数据去重

对于两个输入文件 A 和 B,编写 Spark 独立应用程序(推荐使用 Scala),对两个文件进行
合并,并剔除其中重复的内容,得到一个新文件 C。下面是输入文件和输出文件的一个样例,
可供参考。
输入文件 A 的样例如下:

  1. 20170101 x
  2. 20170102 y
  3. 20170103 x
  4. 20170104 y
  5. 20170105 z
  6. 20170106 z

输入文件 B 的样例如下:

  1. 20170101 y
  2. 20170102 y
  3. 20170103 x
  4. 20170104 z
  5. 20170105 y

根据输入的文件 A 和 B 合并得到的输出文件 C 的样例如下:

  1. 20170101 x
  2. 20170101 y
  3. 20170102 y
  4. 20170103 x
  5. 20170104 y
  6. 20170104 z
  7. 20170105 y
  8. 20170105 z
  9. 20170106 z
  1. cd /usr/local/spark/mycode
  2. mkdir -p RemDup/src/main/scala
  3. cd RemDup
  1. mkdir datas

写入A数据:

  1. vim ./datas/A

写入B数据:

  1. vim ./datas/B

写入RemDup.scala:

  1. vim ./src/main/scala/RemDup.scala

编写Scale文件

  1. import org.apache.spark.SparkContext
  2. import org.apache.spark.SparkContext._
  3. import org.apache.spark.SparkConf
  4. import org.apache.spark.HashPartitioner
  5. object RemDup {
  6. def main(args: Array[String]){
  7. val conf =newSparkConf().setAppName("RemDup")
  8. val sc =newSparkContext(conf)
  9. val dataFile ="file:///usr/local/spark/mycode/RemDup/datas"
  10. val data = sc.textFile(dataFile,2)
  11. val res = data.filter(_.trim().length>0).map(line=>(line.trim,"")).partitionBy(newHashPartitioner(1)).groupByKey().sortByKey().keys
  12. res.saveAsTextFile("file:///usr/local/spark/mycode/RemDup/result")}}

编写simple.sbt文件:

  1. vim simple.sbt

注意此处的scale版本和spark版本

  1. name :="Remove Duplication"
  2. version :="1.0"
  3. scalaVersion :="2.12.15"
  4. libraryDependencies +="org.apache.spark"%%"spark-core"%"3.2.0"

打包:

  1. sbt package
  1. /usr/local/spark/bin/spark-submit --class "RemDup"/usr/local/spark/mycode/RemDup/target/scala-2.11/remove-duplication_2.11-1.0.jar

在这里插入图片描述

查看结果:

  1. cat result/*

在这里插入图片描述

4. 编写独立应用程序实现求平均值问题

每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写 Spark 独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。下面是输入文件和输出文件的一个样例,供参考。

Algorithm 成绩:

  1. 小明 92
  2. 小红 87
  3. 小新 82
  4. 小丽 90

Database 成绩:

  1. 小明 95
  2. 小红 81
  3. 小新 89
  4. 小丽 85

Python 成绩:

  1. 小明 82
  2. 小红 83
  3. 小新 94
  4. 小丽 91

平均成绩如下:

  1. (小红,83.67)(小新,88.33)(小明,89.67)(小丽,88.67)

进入到 mycode 目录,新建 AvgScore 目录,

  1. cd /usr/local/spark/mycode
  2. mkdir -p AvgScore/src/main/scala
  3. cd AvgScore

在这里插入图片描述

新建 datas 目录,写入文件 algorithm、database、python:

  1. mkdir datas

注意这里 algorithm、database 和 python 文件内容不能有多余的换行符或者空格!

  1. vim ./datas/algorithm
  1. vim ./datas/database
  1. vim ./datas/python

编写 Scala 文件:

  1. vim ./src/main/scala/AvgScore.scala

代码如下:

  1. import org.apache.spark.SparkContext
  2. import org.apache.spark.SparkContext._
  3. import org.apache.spark.SparkConf
  4. import org.apache.spark.HashPartitioner
  5. object AvgScore {
  6. def main(args: Array[String]){
  7. val conf =newSparkConf().setAppName("AvgScore")
  8. val sc =newSparkContext(conf)
  9. val dataFile ="file:///usr/local/spark/mycode/AvgScore/datas"
  10. val data = sc.textFile(dataFile,3)
  11. val res = data.filter(_.trim().length>0).map(line=>(line.split(" ")(0).trim(),line.split(" ")(1).trim().toInt)).partitionBy(newHashPartitioner(1)).groupByKey().map(x =>{
  12. var n =0
  13. var sum =0.0for(i <- x._2){
  14. sum = sum + i
  15. n = n +1}
  16. val avg = sum/n
  17. val format = f"$avg%1.2f".toDouble
  18. (x._1,format)})
  19. res.saveAsTextFile("file:///usr/local/spark/mycode/AvgScore/result")}}

编写 simple.sbt 文件:

  1. vim simple.sbt

内容如下:

  1. name :="Average Score"
  2. version :="1.0"
  3. scalaVersion :="2.11.12"
  4. libraryDependencies +="org.apache.spark"%%"spark-core"%"2.4.0"

在这里插入图片描述

使用如下命令打包:

  1. sbt package

在这里插入图片描述

使用生成的 jar 包:

  1. /usr/local/spark/bin/spark-submit --class "AvgScore"/usr/local/spark/mycode/AvgScore/target/scala-2.11/average-score_2.11-1.0.jar

使用如下命令查看输出:

  1. cat result/*

输出如下:
在这里插入图片描述

四、遇到的问题:

1、 输入/usr/local/sbt/sbt package打包时,显示找不到sbt
2、 vim中无法退出
3、 报错127.0.1.1 to hadoop:9000 failed on connection exception: 拒绝连接
4、 没有文件夹/usr/Hadoop/test.txt

五、解决办法:

1、 将sbt package设置为全局变量,后续打包只需输入sbt package
2、 vim退出方法:esc :wq
3、 未开启hadoop,输入start-all.sh开启hadoop
4、 新建文件夹mkdir -p test.txt

标签: spark hadoop 大数据

本文转载自: https://blog.csdn.net/weixin_51293984/article/details/128076728
版权归原作者 一个很菜的小猪 所有, 如有侵权,请联系我们删除。

“实验7 Spark初级编程实践”的评论:

还没有评论