一、目的与要求
1、熟悉Spark的RDD基本操作及键值对操作;
2、熟悉使用RDD编程解决实际具体问题的方法。
二、实验内容
1.给定数据集 data1.txt,包含了某大学计算机系的成绩,数据格式如下所示:
Tom,DataBase,80
Tom,Algorithm,50
Tom,DataStructure,60
Jim,DataBase,90
Jim,Algorithm,60
Jim,DataStructure,80
……
请根据给定的实验数据,在pyspark中通过编程来计算以下内容:
(1)该系总共有多少学生;
先获取每行的姓名字段,再将其用字典统计汇总,最后统计出几个键值对即为学生数量
lines=sc.textFile("file:///home/deeszechyi/data1.txt")
lines.foreach(print)
namecount=lines.map(lambda line:(line.split(",")[0],1))
namecount.foreach(print)
namecount=namecount.reduceByKey(lambda x,y:(x+y))
namecount.foreach(print)
namecount.count()
(2)该系共开设了多少门课程;
可以考虑先使用map函数映射获取课程字段,再用字典统计,方法与第一小问类似
coursecount=lines.map(lambda x:x.split(",")[1])
coursecount.foreach(print)
coursecount=coursecount.map(lambda x:(x,1))
coursecount.foreach(print)
coursecount=coursecount.reduceByKey(lambda x,y:x+y)
coursecount.count()
(3)Tom同学的总成绩平均分是多少;
本题可以考虑使用filter方法过滤姓名字段为Tom的记录,再映射其课程分数
filtered_rdd = score.filter(lambda x: x[0] == "Tom").map(lambda x: int(x[1]))
tom_sum=filtered_rdd.reduce(lambda a,b:a+b)
print(tom_sum)
tom_ave=tom_sum/filtered_rdd.count()
print(tom_ave)
(4)求每名同学的选修的课程门数;
该题可直接映射获取姓名字段,并用字典统计每个姓名出现次数,该次数即代表该同学所选修的课程数。
stu=lines.map(lambda x:x.split(“,”)[0])
stu.foreach(print)
stu=stu.map(lambda x:(x,1)).reduceByKey(lambda a,b:(a+b))
stu.foreach(print)
(5)该系DataBase课程共有多少人选修;
本题对数据集直接映射过滤课程为DataBase的课程
course=lines.map(lambda x:x.split(",")[1])
course=course.filter(lambda x:x=="DataBase")
course.count()
(6)各门课程的平均分是多少;
针对问题(6),考虑使用嵌套形式的数据结构来存储,从该数据集中映射出课程名称和分数,对课程出现次数用字典进行统计:(课程名称, (分数, 1))使用reduceByKey方法将分数和方法加,得到新的数据:(课程名称,(总分数,总人数))
cave=lines.map(lambda x:(x.split(",")[1],(x.split(",")[2],1)))
cave.foreach(print)
cave=cave.reduceByKey(lambda x,y:(int(x[0])+int(y[0]),x[1]+y[1]))
cave.foreach(print)
cave=cave.map(lambda x:(x[0],x[1][0]/x[1][1]))
cave.foreach(print)
(7)使用累加器计算共有多少人选了DataBase这门课。
本题使用map方法映射课程字段并用字典对其进行统计,统计结果使用filter过滤即可
course=lines.map(lambda x:x.split(“,”)[1])
course=course.map(lambda x:(x,1))
course=course.reduceByKey(lambda x,y:(x+y))
course.foreach(print)
DB=course.filter(lambda x:x[0]==’DataBase’).map(lambda x:x[1])
DB.foreach(print)
2.编写独立应用程序实现数据去重
对于两个输入文件A和B,编写Spark独立应用程序,对两个文件进行合并,并剔除其中重复的内容,得到一个新文件C。下面是输入文件和输出文件的一个样例,供参考。
输入文件A的样例如下:
20170101 x
20170102 y
20170103 x
20170104 y
20170105 z
20170106 z
输入文件B的样例如下:
20170101 y
20170102 y
20170103 x
20170104 z
20170105 y
根据输入的文件A和B合并得到的输出文件C的样例如下:
20170101 x
20170101 y
20170102 y
20170103 x
20170104 y
20170104 z
20170105 y
20170105 z
20170106 z
将数据写入文件A和B
创建unique.py文件,代码如下:
from pyspark import SparkContext, SparkConf
conf = SparkConf().setMaster("local").setAppName("Sparkunique")
sc = SparkContext(conf=conf)
linesA = sc.textFile("file:///home/deeszechyi/A.txt")
linesB = sc.textFile("file:///home/deeszechyi/B.txt")
lines = linesA.union(linesB)
lines = lines.distinct()
lines = lines.sortBy(lambda x: x)
lines.repartition(1).saveAsTextFile("file:///home/deeszechyi/C.txt")
该段代码读取A和B文件,将两个文件内容合并去重并按照第一个字段排序,保存到C.txt中
3.编写独立应用程序实现求平均值问题
每个输入文件表示班级学生某个学科的成绩,每行内容由两个字段组成,第一个是学生名字,第二个是学生的成绩;编写Spark独立应用程序求出所有学生的平均成绩,并输出到一个新文件中。下面是输入文件和输出文件的一个样例,供参考。
Algorithm成绩:
小明 92
小红 87
小新 82
小丽 90
Database成绩:
小明 95
小红 81
小新 89
小丽 85
Python成绩:
小明 82
小红 83
小新 94
小丽 91
平均成绩如下:
(小红,83.67)
(小新,88.33)
(小明,89.67)
(小丽,88.67)
创建三个文本文件和一个.py文件
代码如下:
from pyspark import SparkContext,SparkConf
conf=SparkConf().setMaster("local").setAppName("avescore")
sc=SparkContext(conf=conf)
linesA=sc.textFile("file:///home/deeszechyi/Algorithm.txt")
linesB=sc.textFile("file:///home/deeszechyi/Database.txt")
linesC=sc.textFile("file:///home/deeszechyi/Python.txt")
lines=linesA.union(linesB).union(linesC)
uniquelines=lines.distinct()
ave=uniquelines.sortBy(lambda x:x).filter(bool)
ave=ave.map(lambda x:x.split())
ave=ave.map(lambda x:(x[0],(int(x[1]),1)))
ave=ave.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
ave=ave.map(lambda x:(x[0],x[1][0]/x[1][1]))
ave.repartition(1).saveAsTextFile("file:///home/deeszechyi/ave.txt")
该段代码构造了一个复合型数据结构:(姓名,(成绩,1)),与第一题第(6)问相同。
4、运行教材P86第四节中的三个综合实例,对每个Python程序要给出适当的注释。
版权归原作者 DeeSzeChyi 所有, 如有侵权,请联系我们删除。