Spark算子--Scala版本
第1关 Spark算子--Scala版本
编程要求
根据提示,在右侧编辑器
begin-end
处补充代码,输出每个元素及其长度并去重。
测试说明
平台会对你编写的代码进行测试:
预期输出:
(an,2)` `(dog,3)` `(cat,3)
开始你的任务吧,祝你成功!
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object EduCoder1 {
def main(args: Array[String]): Unit = {
val conf =new SparkConf().setAppName("educoder1").setMaster("local")
val sc=new SparkContext(conf)
val rdd = sc.parallelize(List("dog","an","cat","an","cat"))
/********** Begin **********/
//第一步:通过获取rdd中每个元素的长度创建新的rdd1
val rdd1=rdd.map(x=>x.length)
//第二步:通过zip把rdd1和rdd组合创建rdd2
val rdd2=rdd.zip(rdd1)
//第三步:去重
val rdd3=rdd2.distinct()
//第四步:输出结果
rdd3.foreach(println)
/********** End **********/
sc.stop()
}
}
第2关:转换算子之flatMap和filter算子
编程要求
根据提示,在右侧编辑器
begin-end
处补充代码,输出个数大于一的单词。
测试说明
平台会对你编写的代码进行测试:
所给文件内容如下:
hello,world,hello,sparkgood,nice,good,do
预期输出:
(hello,2)
(good,2)
开始你的任务吧,祝你成功!
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object EduCoder2 {
def main(args: Array[String]): Unit = {
val conf =new SparkConf().setAppName("educoder2").setMaster("local")
val sc=new SparkContext(conf)
val rdd=sc.textFile("file:///root/step3_fils")
/********** Begin **********/
//对所给数据创建的rdd切割分词
val rdd1=rdd.flatMap(t=>t.split(","))
//每个单词计数为1
val rdd2= rdd1.map(t=>(t,1))
//对相同单词个数进行累加
val rdd3=rdd2.reduceByKey(_+_)
//过滤出单词个数大于一个的
val rdd4= rdd3.filter(t=>t._2>1)
//输出结果
rdd4.foreach(println)
/********** End **********/
sc.stop()
}
}
第3关:转换算子之reduceBykey和mapValues算子
编程要求
根据提示,在右侧编辑器
begin-end
处补充代码,某商店上午卖出
10
本 spark 书籍,每本
50
元,
4
本 Hadoop 书籍,每本
40
元,下午卖出
20
本 spark 书籍,每本
40
元,
10
本 Hadoop 书籍,每本
30
元。
现要求求出这两本书这一天销售的平均价格。
数据如下:
spark,10,50spark,40,25hadoop,5,40hadoop,10,25
测试说明
平台会对你编写的代码进行测试:
预期输出:
(spark,30)` `(hadoop,30)
开始你的任务吧,祝你成功!
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object EduCoder3 {
def main(args: Array[String]): Unit = {
val conf =new SparkConf().setAppName("educoder3").setMaster("local")
val sc=new SparkContext(conf)
/********** Begin **********/
//通过给定数据通过序列化方式创建rdd
val rdd = sc.parallelize(List(("spark",(10,50)),("hadoop",(5,40)),("hadoop",(10,25)),("spark",(40,25))))
//求出一天收入总和以及出售本数
val rdd2 = rdd.reduceByKey((x,y) => ((x._1*x._2)+(y._1*y._2), x._1+y._1))
//求出每本平均售价
val rdd3 = rdd2.mapValues(x => x._1 / x._2)
//输出结果
rdd3.foreach(println)
/********** End **********/
sc.stop
}
}
第4关:转化算子之groupByKey和sortByKey
编程要求
根据提示,在右侧编辑器
begin-end
处补充代码,对每人所学书籍本数分组并排序输出。
测试说明
平台会对你编写的代码进行测试:
所给数据说明:
("Bob","spark")
Bob
:人名
spark
:所学书籍
预期输出:
(Bob,3)(Candy,1)(Lily,1)
开始你的任务吧,祝你成功!
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object EduCoder4 {
def main(args: Array[String]): Unit = {
val conf =new SparkConf().setAppName("educoder4").setMaster("local")
val sc=new SparkContext(conf)
val rdd = sc.parallelize(List(("Bob","spark"),("Lily","hadoop"),("Candy","hive"),("Bob","hbase"),("Bob","hive")))
/********** Begin **********/
//根据姓名对所学书籍分组
val rdd1= rdd.groupByKey()
//求出每个人的书籍本数
val rdd2= rdd1.mapValues(t=>t.toList.size)
//根据姓名排序
val rdd3= rdd2.sortByKey()
//打印结果
rdd3.foreach(println)
/********** End **********/
sc.stop()
}
}
第5关:常见行动算子
编程要求
根据提示,在右侧编辑器
begin-end
处补充代码输出正确答案。
测试说明
预期输出:
4
dog
sun
an
dogsunancat
dog
sun
an
cat
开始你的任务吧,祝你成功!
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object EduCoder5 {
def main(args: Array[String]): Unit = {
val conf =new SparkConf().setAppName("educoder5").setMaster("local")
val sc=new SparkContext(conf)
val rdd = sc.parallelize(List("dog","sun","an","cat"))
/********** Begin **********/
//返回所给rdd的元素个数并输出
val r1=rdd.count()
println(r1)
//返回rdd的前三个元素并输出
val rdd1=rdd.take(3)
rdd1.foreach(println)
//累加rdd的所有元素并输出结果
val r2=rdd.reduce(_+_)
println(r2)
//收集所有元素并且输出
rdd.collect().foreach(println)
/********** End **********/
sc.stop()
}
}
第6关:算子的综合使用案例
编程要求
有一份数据格式如下的文档:
日期,姓名,app,下载渠道,地区,版本号
2017-08-14,Lily,Facebook,360 Shop,NewYork,v1.0 2017-08-14,Bob,Facebook,Amazon Appstore,NewYork,v1.2 2017-08-14,Lily,Facebook,360 Shop,Washington,v1.2 2017-08-14,Lily,Facebook,Google Play Store,Washington,v2.0 2017-08-14,Candy,YouTube,app store,Chicago,v1.8 2017-08-14,Lily,Facebook,Google Play Store,Washington,v2.0 2017-08-14,Candy,YouTube,app store,Chicago,v1.9 2017-08-15,Candy,YouTube,app store,Chicago,v2.0 2017-08-15,Candy,YouTube,app store,Chicago,v2.3 2017-08-15,Lily,Facebook,360 Shop,NewYork,v2.0 2017-08-15,Bob,Facebook,Amazon Appstore,NewYork,v1.2 2017-08-15,Bob,Facebook,Amazon Appstore,NewYork,v1.5 2017-08-15,Candy,YouTube,app store,Chicago,v2.9
需求: 不考虑地区,列出版本升级情况。
结果格式: 日期,姓名,app,下载渠道,升级前版本,升级后版本。
例: 数据:
2017-08-14,Lily,Facebook,360 Shop,NewYork,v1.0 2017-08-14,Lily,Facebook,360 Shop,Washington,v1.2 2017-08-14,Lily,Facebook,360 Shop,NewYork,v2.0
结果:
(2017-08-14,Lily,Facebook,360 Shop,v1.0,v1.2) (2017-08-14,Lily,Facebook,360 Shop,v1.2,v2.0)
测试说明
本实训目前是基于
Spark
单机模式的运行方式,完成整个评测流程所需时间较长,请耐心等待!
开始你的任务吧,祝你成功!
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object EduCoder {
def main(args: Array[String]): Unit = {
val conf =new SparkConf().setAppName("educoder").setMaster("local")
val sc=new SparkContext(conf)
val line=sc.textFile("file:///root/step1_fils")
/********** Begin **********/
//根据需求,去除城市字段
val rdd1 = line.map(t => {
val arr = t.split(",")
((arr(0), arr(1), arr(2), arr(3)), arr(5))
})
//按key分组,key是除城市字段和版本号字段``以外的所有字段,value是版本号
val rdd2=rdd1.groupByKey()
//过滤版本号重复的``(例:(v2.0,v2.0))以及版本号只有一个的(例(v1.0))
val rdd3=rdd2.mapValues(t=>t.toList.distinct).filter(t=>t._2.length>1)
//拆分重新组合 例:(key,(v2.0,v2.5,v3.0))拆分成(key,(v2.0,v2.5),((key,(v2.5,v3.0)))
val rdd4= rdd3.mapValues(t => {
val tai = t.tail
t.zip(tai)
})
//按需求整理输出格式(例:(2017-08-14,Lily,Facebook,360 Shop,v1.2,v2.0))
val rdd5= rdd4.flatMap(t => {
t._2.map(tp => {
(t._1._1, t._1._2, t._1._3, t._1._4, tp._1, tp._2)
})
})
//执行foreach操作,打印出结果
rdd5.foreach(println)
/********** End **********/
sc.stop()
}
}
版权归原作者 4师傅 所有, 如有侵权,请联系我们删除。