Big Data Learning - Spark
1.Spark-core
1.Demo1WordCount
package com.shujia.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/*
RDD: resilient distributed dataset
 */
object Demo1WordCount {
  def main(args: Array[String]): Unit = {
    //1. Create the Spark environment
    //1.1 Create the configuration object
    val conf: SparkConf = new SparkConf()
    //1.2 Specify the run mode (local, Standalone, Mesos, YARN)
    conf.setMaster("local") //local[2] would run with 2 cores; plain local runs with a single thread, local[*] uses all available cores
    //1.3 Give the Spark job a name
    conf.setAppName("wc")

    //2. Create the Spark runtime context object
    val sparkContext: SparkContext = new SparkContext(conf)

    //3. Read the file data
    val wordsLine: RDD[String] = sparkContext.textFile("spark/data/words.txt")

    //4. Split each line on the | separator
    val words: RDD[String] = wordsLine.flatMap(_.split("\\|"))
    val wordsTuple2: RDD[(String, Int)] = words.map((_, 1))
    val wordsTuple2Group: RDD[(String, Iterable[(String, Int)])] = wordsTuple2.groupBy(_._1)
    val wordCount: RDD[(String, Int)] = wordsTuple2Group.map((kv: (String, Iterable[(String, Int)])) => (kv._1, kv._2.size))

    //Save the final result to a local directory
    wordCount.saveAsTextFile("spark/data/word_count")
  }
}
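The groupBy + size combination above works, but the same count is more commonly written with reduceByKey (introduced in Demo9 below). A minimal sketch, reusing the wordsTuple2 RDD from the demo above; the output path is made up for illustration:

val wordCount2: RDD[(String, Int)] = wordsTuple2.reduceByKey((x: Int, y: Int) => x + y) //aggregate the 1s per key directly, with map-side pre-aggregation
wordCount2.saveAsTextFile("spark/data/word_count_reduceByKey") //hypothetical output directory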
2.Demo2Partition
package com.shujia.core

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * During a Spark job, pulling records with the same key into the same partition is called a shuffle.
 * Note: Spark's shuffle and MapReduce's shuffle work on the same principle; both spill to disk.
 *
 * RDD: resilient distributed dataset
 * Resilient: the data an RDD computes over can be very large or very small
 * Distributed: the data can be spread across multiple servers; an RDD's partitions come from blocks, and those blocks come from different DataNodes
 * Dataset: the RDD itself stores no data, it is only computation logic; when a job is triggered, data flows through the RDDs
 */
object Demo2Partition {
  def main(args: Array[String]): Unit = {
    //1. Create the Spark environment
    //1.1 Create the configuration object
    val conf: SparkConf = new SparkConf()
    //1.2 Specify the run mode (local, Standalone, Mesos, YARN)
    conf.setMaster("local") //local[2] would run with 2 cores; plain local runs with a single thread, local[*] uses all available cores
    //1.3 Give the Spark job a name
    conf.setAppName("wc")

    //2. Create the Spark runtime context object
    val sparkContext: SparkContext = new SparkContext(conf)

    //3. Read the file data
    // val wordsLine: RDD[String] = sparkContext.textFile("spark/data/ws/*", minPartitions = 7)
    val wordsLine: RDD[String] = sparkContext.textFile("spark/data/ws/*")
    println(s"wordsLine RDD partition count: ${wordsLine.getNumPartitions}")

    //4. Split each line on the | separator
    val words: RDD[String] = wordsLine.flatMap(_.split("\\|"))
    println(s"words RDD partition count: ${words.getNumPartitions}")

    val wordsTuple2: RDD[(String, Int)] = words.map((_, 1))
    println(s"wordsTuple2 RDD partition count: ${wordsTuple2.getNumPartitions}")

    //On operators that cause a shuffle, the partition count can be set separately
    val wordsTuple2Group: RDD[(String, Iterable[(String, Int)])] = wordsTuple2.groupBy(_._1, 5)
    println(s"wordsTuple2Group RDD partition count: ${wordsTuple2Group.getNumPartitions}")

    val wordCount: RDD[(String, Int)] = wordsTuple2Group.map((kv: (String, Iterable[(String, Int)])) => (kv._1, kv._2.size))
    println(s"wordCount RDD partition count: ${wordCount.getNumPartitions}")

    wordCount.saveAsTextFile("spark/data/word_count2")
  }
}
- The five key properties of an RDD (a must-know interview topic!):
- 1) An RDD is made up of a list of partitions. When reading a file, the RDD gets one partition per block.
- Note: by default every RDD in a job has the same partition count, before and after a shuffle; it is decided when the data is first loaded.
- 2) A function is applied to each partition of the RDD. One partition is processed by one task, so the number of partitions equals the number of tasks.
- Note: in Spark these functions are called operators (transformation operators: RDD -> RDD; action operators: RDD -> other data types).
- 3) RDDs depend on each other; the data of a child RDD depends on the computation result of its parent RDD, and data flows through the RDDs like water.
- Note:
- 3.1 There are two kinds of dependencies between RDDs:
**a. Narrow dependency: a partition of the child RDD depends on exactly one partition of the parent RDD (1-to-1 relation)**
**b. Wide dependency: a partition of the child RDD depends on multiple partitions of the parent RDD (1-to-many relation, shuffle)**
**3.2 Dependencies are what split a job into stages: Num(stages) = Num(wide dependencies) + 1 (see the sketch after this list)**
**3.3 Across a narrow dependency the partition count cannot change; it is fixed by the first RDD. Across a wide dependency the partition count can be set on the shuffle operator.**
- 4) Partitioner-based operators only work on kv-format RDDs, e.g. groupByKey and reduceByKey.
- 5) Spark provides preferred locations for each task: move the computation to the data instead of moving the data.
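A minimal sketch of points 3.1 and 3.2, assuming a SparkContext named sparkContext is already created as in the demos above: Spark exposes each RDD's dependencies, so the narrow/wide distinction and the stage boundaries can be observed directly.

val rdd1 = sparkContext.parallelize(List("a", "b", "a"))
val rdd2 = rdd1.map((_, 1))       //narrow dependency: OneToOneDependency
val rdd3 = rdd2.groupBy(_._1)     //wide dependency: ShuffleDependency, adds one more stage
println(rdd2.dependencies)        //prints a OneToOneDependency
println(rdd3.dependencies)        //prints a ShuffleDependency
println(rdd3.toDebugString)       //lineage string; shuffle boundaries mark where stages split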
3.Demo3Map
packagecom.shujia.coreimportorg.apache.spark.rdd.RDD
importorg.apache.spark.{SparkConf, SparkContext}/*
map算子:转换算子
一个spark作业,由最后的一个action算子来触发执行的,若没有action算子,整个作业不执行
RDD具有懒执行的特点
*/object Demo3Map {def main(args: Array[String]):Unit={val conf =new SparkConf()
conf.setMaster("local")
conf.setAppName("map算子演示")val context =new SparkContext(conf)//====================================================val studentRDD: RDD[String]= context.textFile("spark/data/students.csv")/**
* map算子:将rdd中的数据,一条一条的取出来传入到map函数中,map会返回一个新的rdd,map不会改变总数据条数
*/val splitRDD: RDD[List[String]]= studentRDD.map((s:String)=>{
println("============数加防伪码================")
s.split(",").toList
})// splitRDD.foreach(println)}}
4.Demo4Filter
packagecom.shujia.coreimportorg.apache.spark.{SparkConf, SparkContext}importorg.apache.spark.rdd.RDD
object Demo4Filter {def main(args: Array[String]):Unit={val conf =new SparkConf()
conf.setMaster("local")
conf.setAppName("map算子演示")val context =new SparkContext(conf)//====================================================val studentRDD: RDD[String]= context.textFile("spark/data/students.csv")/**
* filter: 过滤,将RDD中的数据一条一条取出传递给filter后面的函数,如果函数的结果是true,该条数据就保留,否则丢弃
*
* filter一般情况下会减少数据的条数
*/val filterRDD: RDD[String]= studentRDD.filter((s:String)=>{val strings: Array[String]= s.split(",")"男".equals(strings(3))})
filterRDD.foreach(println)}}
5.Demo5flatMap
packagecom.shujia.coreimportorg.apache.spark.rdd.RDD
importorg.apache.spark.{SparkConf, SparkContext}object Demo5flatMap {def main(args: Array[String]):Unit={val conf =new SparkConf()
conf.setMaster("local")
conf.setAppName("flatMap算子演示")val context =new SparkContext(conf)//====================================================val linesRDD: RDD[String]= context.textFile("spark/data/words.txt")/**
* flatMap算子:将RDD中的数据一条一条的取出传递给后面的函数,函数的返回值必须是一个集合。最后会将集合展开构成一个新的RDD
*/val wordsRDD: RDD[String]= linesRDD.flatMap((line:String)=> line.split("\\|"))
wordsRDD.foreach(println)}}
6.Demo6sample
packagecom.shujia.coreimportorg.apache.spark.rdd.RDD
importorg.apache.spark.{SparkConf, SparkContext}object Demo6sample {def main(args: Array[String]):Unit={val conf =new SparkConf()
conf.setMaster("local")
conf.setAppName("flatMap算子演示")val context =new SparkContext(conf)//====================================================val studentRDD: RDD[String]= context.textFile("spark/data/students.csv")/**
* sample算子:从前一个RDD的数据中抽样一部分数据
*
* 抽取的比例不是正好对应的,在抽取的比例上下浮动 比如1000条抽取10% 抽取的结果在100条左右
*/val sampleRDD: RDD[String]= studentRDD.sample(withReplacement =true,0.1)
sampleRDD.foreach(println)}}
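A small addition, assuming the same studentRDD as above: sample also accepts a seed, which makes the (still approximate) sample reproducible between runs; withReplacement = false draws each record at most once.

val sampleFixedRDD: RDD[String] = studentRDD.sample(withReplacement = false, fraction = 0.1, seed = 42L)
sampleFixedRDD.foreach(println)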
7.Demo7GroupBy
packagecom.shujia.coreimportorg.apache.spark.{SparkConf, SparkContext}importorg.apache.spark.rdd.RDD
object Demo7GroupBy {def main(args: Array[String]):Unit={val conf =new SparkConf()
conf.setMaster("local")
conf.setAppName("groupBy算子演示")val context =new SparkContext(conf)//====================================================val studentRDD: RDD[String]= context.textFile("spark/data/students.csv")val splitRDD: RDD[Array[String]]= studentRDD.map((s:String)=> s.split(","))//需求:求出每个班级平均年龄//使用模式匹配的方式取出班级和年龄val clazzWithAgeRDD: RDD[(String,Int)]= splitRDD.map {case Array(_, _, age:String, _, clazz:String)=>(clazz, age.toInt)}/**
* groupBy:按照指定的字段进行分组,返回的是一个键是分组字段,值是一个存放原本数据的迭代器的键值对 返回的是kv格式的RDD
*
* key: 是分组字段
* value: 是spark中的迭代器
* 迭代器中的数据,不是完全被加载到内存中计算,迭代器只能迭代一次
*
* groupBy会产生shuffle
*///按照班级进行分组//val stringToStudents: Map[String, List[Student]] = stuList.groupBy((s: Student) => s.clazz)val kvRDD: RDD[(String, Iterable[(String,Int)])]= clazzWithAgeRDD.groupBy(_._1)val clazzAvgAgeRDD: RDD[(String,Double)]= kvRDD.map {case(clazz:String, itr: Iterable[(String,Int)])=>//CompactBuffer((理科二班,21), (理科二班,23), (理科二班,21), (理科二班,23), (理科二班,21), (理科二班,21), (理科二班,24))//CompactBuffer(21,23,21,23,21,21,24)val allAge: Iterable[Int]= itr.map((kv:(String,Int))=> kv._2)val avgAge:Double= allAge.sum.toDouble / allAge.size
(clazz, avgAge)}
clazzAvgAgeRDD.foreach(println)while(true){}}}
8.Demo8GroupByKey
packagecom.shujia.coreimportorg.apache.spark.{SparkConf, SparkContext}importorg.apache.spark.rdd.RDD
object Demo8GroupByKey {def main(args: Array[String]):Unit={val conf =new SparkConf()
conf.setMaster("local")
conf.setAppName("groupByKey算子演示")val context =new SparkContext(conf)//====================================================val studentRDD: RDD[String]= context.textFile("spark/data/students.csv")val splitRDD: RDD[Array[String]]= studentRDD.map((s:String)=> s.split(","))//需求:求出每个班级平均年龄//使用模式匹配的方式取出班级和年龄val clazzWithAgeRDD: RDD[(String,Int)]= splitRDD.map {case Array(_, _, age:String, _, clazz:String)=>(clazz, age.toInt)}/**
* groupByKey: 按照键进行分组,将value值构成迭代器返回
* 将来你在spark中看到RDD[(xx, xxx)] 这样的RDD就是kv键值对类型的RDD
* 只有kv类型键值对RDD才可以调用groupByKey算子
*
*/val kvRDD: RDD[(String, Iterable[Int])]= clazzWithAgeRDD.groupByKey()val clazzAvgAgeRDD: RDD[(String,Double)]= kvRDD.map {case(clazz:String, ageItr: Iterable[Int])=>(clazz, ageItr.sum.toDouble / ageItr.size)}
clazzAvgAgeRDD.foreach(println)while(true){}}}
The difference between groupBy and groupByKey (a common Spark interview question; see the sketch below)
*1. In code: any RDD can call groupBy, but only kv-type RDDs can call groupByKey.
*2. The RDD produced by groupByKey has a simpler structure (the value iterator holds only the values), which makes later processing easier.
*3. groupByKey performs better and runs faster, because compared with groupBy it shuffles less data.
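A minimal sketch of points 1 and 2, assuming the clazzWithAgeRDD: RDD[(String, Int)] built in the two demos above; the only difference between the results is the shape of the value iterator:

val byGroupBy: RDD[(String, Iterable[(String, Int)])] = clazzWithAgeRDD.groupBy(_._1)  //values keep the whole (clazz, age) tuple
val byGroupByKey: RDD[(String, Iterable[Int])] = clazzWithAgeRDD.groupByKey()          //values keep only the age, so less data is shuffled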
9.Demo9ReduceByKey
packagecom.shujia.coreimportorg.apache.spark.{SparkConf, SparkContext}importorg.apache.spark.rdd.RDD
object Demo9ReduceByKey {def main(args: Array[String]):Unit={val conf =new SparkConf()
conf.setMaster("local")
conf.setAppName("reduceByKey算子演示")val context =new SparkContext(conf)//====================================================val studentRDD: RDD[String]= context.textFile("spark/data/students.csv")val splitRDD: RDD[Array[String]]= studentRDD.map((s:String)=> s.split(","))//求每个班级的人数val clazzKVRDD: RDD[(String,Int)]= splitRDD.map {case Array(_, _, _, _, clazz:String)=>(clazz,1)}/**
* 利用groupByKey实现
*/// val kvRDD: RDD[(String, Iterable[Int])] = clazzKVRDD.groupByKey()// val clazzAvgAgeRDD: RDD[(String, Double)] = kvRDD.map {// case (clazz: String, n: Iterable[Int]) =>// (clazz, n.sum)// }// clazzAvgAgeRDD.foreach(println)/**
* 利用reduceByKey实现:按照键key对value值直接进行聚合,需要传入聚合的方式
* reduceByKey算子也是只有kv类型的RDD才能调用
*
*
*/val countRDD: RDD[(String,Int)]= clazzKVRDD.reduceByKey((x:Int, y:Int)=> x + y)
countRDD.foreach(println)// clazzKVRDD.groupByKey()// .map(kv=>(kv._1,kv._2.sum))// .foreach(println)while(true){}/**
*/}}
- The difference between reduceByKey and groupByKey
- 1. reduceByKey adds a map-side pre-aggregation (combine) step that groupByKey does not have, so much less data is shuffled and performance is better.
- 2. In terms of flexibility, reduceByKey is less flexible than groupByKey.
- For example, reduceByKey alone cannot compute a variance, while groupByKey can (see the sketch below).
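A minimal sketch of the variance example in point 2, assuming a kv RDD clazzWithAgeRDD: RDD[(String, Int)] of (clazz, age) as in the earlier demos: variance needs all of a key's values at once, which groupByKey provides and a single reduceByKey pass does not.

val clazzAgeVariance: RDD[(String, Double)] = clazzWithAgeRDD.groupByKey().mapValues((ages: Iterable[Int]) => {
  val avgAge: Double = ages.sum.toDouble / ages.size
  ages.map((age: Int) => math.pow(age - avgAge, 2)).sum / ages.size //population variance of the ages in one class
})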
10.Demo10Union
packagecom.shujia.coreimportorg.apache.spark.rdd.RDD
importorg.apache.spark.{SparkConf, SparkContext}object Demo10Union {def main(args: Array[String]):Unit={val conf =new SparkConf()
conf.setMaster("local")
conf.setAppName("Union算子演示")val context =new SparkContext(conf)//====================================================val w1RDD: RDD[String]= context.textFile("spark/data/ws/w1.txt")// 1val w2RDD: RDD[String]= context.textFile("spark/data/ws/w2.txt")// 1/**
* union:上下合并两个RDD,前提是两个RDD中的数据类型要一致,合并后不会对结果进行去重
*
* 注:这里的合并只是逻辑层面上的合并,物理层面其实是没有合并
*/val unionRDD: RDD[String]= w1RDD.union(w2RDD)
println(unionRDD.getNumPartitions)// 2
unionRDD.foreach(println)while(true){}}}
11.Demo11Join
packagecom.shujia.coreimportorg.apache.spark.rdd.RDD
importorg.apache.spark.{SparkConf, SparkContext}object Demo11Join {def main(args: Array[String]):Unit={val conf =new SparkConf()
conf.setMaster("local")
conf.setAppName("Join算子演示")val context =new SparkContext(conf)//====================================================//两个kv类型的RDD之间的关联//通过scala中的集合构建RDDval rdd1: RDD[(String,String)]= context.parallelize(
List(("1001","尚平"),("1002","丁义杰"),("1003","徐昊宇"),("1004","包旭"),("1005","朱大牛"),("1006","汪权")))val rdd2: RDD[(String,String)]= context.parallelize(
List(("1001","崩坏"),("1002","原神"),("1003","王者"),("1004","修仙"),("1005","学习"),("1007","敲代码")))/**
* 内连接:join
* 左连接:leftJoin
* 右连接:rightJoin
* 全连接:fullJoin
*///内连接// val innerJoinRDD: RDD[(String, (String, String))] = rdd1.join(rdd2)// //加工一下RDD// val innerJoinRDD2: RDD[(String, String, String)] = innerJoinRDD.map {// case (id: String, (name: String, like: String)) => (id, name, like)// }// innerJoinRDD2.foreach(println)//左连接val leftJoinRDD: RDD[(String,(String, Option[String]))]= rdd1.leftOuterJoin(rdd2)//加工一下RDDval leftJoinRDD2: RDD[(String,String,String)]= leftJoinRDD.map {case(id:String,(name:String, Some(like)))=>(id, name, like)case(id:String,(name:String, None))=>(id, name,"无爱好")}
leftJoinRDD2.foreach(println)
println("=================================")//右连接自己试//TODO:自己试右连接//全连接val fullJoinRDD: RDD[(String,(Option[String], Option[String]))]= rdd1.fullOuterJoin(rdd2)//加工一下RDDval fullJoinRDD2: RDD[(String,String,String)]= fullJoinRDD.map {case(id:String,(Some(name), Some(like)))=>(id, name, like)case(id:String,(Some(name), None))=>(id, name,"无爱好")case(id:String,(None, Some(like)))=>(id,"无姓名", like)}
fullJoinRDD2.foreach(println)}}
12.Demo12Student
packagecom.shujia.coreimportorg.apache.spark.rdd.RDD
importorg.apache.spark.{SparkConf, SparkContext}object Demo12Student {def main(args: Array[String]):Unit={val conf =new SparkConf()
conf.setMaster("local")
conf.setAppName("Join算子演示")val context =new SparkContext(conf)//====================================================//需求:统计总分年级排名前10的学生的各科分数//读取分数文件数据val scoreRDD: RDD[(String,String,String)]= context.textFile("spark/data/score.txt")// 读取数据文件.map((s:String)=> s.split(","))// 切分数据.filter((arr: Array[String])=> arr.length ==3)// 过滤掉脏数据.map {//整理数据,进行模式匹配取出数据case Array(sid:String, subject_id:String, score:String)=>(sid, subject_id, score)}//计算每个学生的总分val sumScoreWithSidRDD: RDD[(String,Int)]= scoreRDD.map {case(sid:String, _:String, score:String)=>(sid, score.toInt)}.reduceByKey((x:Int, y:Int)=> x + y)//按照总分排序val sumScoreTop10: Array[(String,Int)]= sumScoreWithSidRDD.sortBy(-_._2).take(10)//取出前10的学生学号val ids: Array[String]= sumScoreTop10.map(_._1)//取出每个学生各科分数val top10StuScore: RDD[(String,String,String)]= scoreRDD.filter {case(id:String, _, _)=> ids.contains(id)}
top10StuScore.foreach(println)}}
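One hedged improvement sketch, not part of the original demo: ids is a Driver-side array captured in the filter closure, so a copy is shipped with every task; broadcasting it, as Demo21Broadcast does later, sends it to each Executor only once. Assumes an extra import of org.apache.spark.broadcast.Broadcast.

val idsBroadcast: Broadcast[Array[String]] = context.broadcast(ids) //requires import org.apache.spark.broadcast.Broadcast
val top10StuScoreB: RDD[(String, String, String)] = scoreRDD.filter {
  case (id: String, _, _) => idsBroadcast.value.contains(id)
}
top10StuScoreB.foreach(println)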
13.Demo13MapValues
packagecom.shujia.coreimportorg.apache.spark.{SparkConf, SparkContext}importorg.apache.spark.rdd.RDD
object Demo13MapValues {def main(args: Array[String]):Unit={val conf =new SparkConf()
conf.setMaster("local")
conf.setAppName("Join算子演示")val context =new SparkContext(conf)//====================================================//需求:统计总分年级排名前10的学生的各科分数//读取分数文件数据val scoreRDD: RDD[(String,String,String)]= context.textFile("spark/data/score.txt")// 读取数据文件.map((s:String)=> s.split(","))// 切分数据.filter((arr: Array[String])=> arr.length ==3)// 过滤掉脏数据.map {//整理数据,进行模式匹配取出数据case Array(sid:String, subject_id:String, score:String)=>(sid, subject_id, score)}//计算每个学生的总分val sumScoreWithSidRDD: RDD[(String,Int)]= scoreRDD.map {case(sid:String, _:String, score:String)=>(sid, score.toInt)}.reduceByKey((x:Int, y:Int)=> x + y)/**
* mapValues算子:也是作用在kv类型的RDD上
* 主要的作用键不变,处理值
*/val resRDD: RDD[(String,Int)]= sumScoreWithSidRDD.mapValues(_ +1000)
resRDD.foreach(println)//等同于val res2RDD: RDD[(String,Int)]= sumScoreWithSidRDD.map((kv:(String,Int))=>(kv._1, kv._2 +1000))}}
14.Demo14mapPartition
packagecom.shujia.coreimportorg.apache.spark.{SparkConf, SparkContext}importorg.apache.spark.rdd.RDD
object Demo14mapPartition {def main(args: Array[String]):Unit={val conf =new SparkConf()
conf.setMaster("local")
conf.setAppName("mapPartition算子演示")val context =new SparkContext(conf)//====================================================//需求:统计总分年级排名前10的学生的各科分数//读取分数文件数据val scoreRDD: RDD[String]= context.textFile("spark/data/ws/*")// 读取数据文件
println(scoreRDD.getNumPartitions)/**
* mapPartition: 主要作用是一次处理一个分区的数据,将一个分区的数据一个一个传给后面的函数进行处理
*
* 迭代器中存放的是一个分区的数据
*/// val mapPartitionRDD: RDD[String] = scoreRDD.mapPartitions((itr: Iterator[String]) => {//// println(s"====================当前处理的分区====================")// //这里写的逻辑是作用在一个分区上的所有数据// val words: Iterator[String] = itr.flatMap(_.split("\\|"))// words// })// mapPartitionRDD.foreach(println)
scoreRDD.mapPartitionsWithIndex{case(index:Int,itr: Iterator[String])=>
println(s"当前所处理的分区编号是:${index}")
itr.flatMap(_.split("\\|"))}.foreach(println)}}
15.Demo15Actions
packagecom.shujia.coreimportorg.apache.spark.rdd.RDD
importorg.apache.spark.{SparkConf, SparkContext}object Demo15Actions {def main(args: Array[String]):Unit={val conf =new SparkConf()
conf.setMaster("local")
conf.setAppName("Action算子演示")val context =new SparkContext(conf)//====================================================val studentRDD: RDD[String]= context.textFile("spark/data/students.csv")/**
* 转换算子:transformation 将一个RDD转换成另一个RDD,转换算子是懒执行的,需要一个action算子触发执行
*
* 行动算子(操作算子):action算子,触发任务执行。一个action算子就会触发一次任务执行
*/
println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$")val studentsRDD: RDD[(String,String,String,String,String)]= studentRDD.map(_.split(",")).map {case Array(id:String, name:String, age:String, gender:String, clazz:String)=>
println("**************************** 数加防伪码 ^_^ ********************************")(id, name, age, gender, clazz)}
println("$$$$$$$$$$$$$$$$$$$$$$***__***$$$$$$$$$$$$$$$$$$$$$$$$$")// foreach其实就是一个action算子// studentsRDD.foreach(println)// println("="*100)// studentsRDD.foreach(println)// while (true){//// }/**
* collect()行动算子 主要作用是将RDD转成scala中的数据结构
*
*/val tuples: Array[(String,String,String,String,String)]= studentsRDD.collect()}}
16.Demo16Catch
packagecom.shujia.coreimportorg.apache.spark.{SparkConf, SparkContext}importorg.apache.spark.rdd.RDD
importorg.apache.spark.storage.StorageLevel
object Demo16Catch {def main(args: Array[String]):Unit={val conf =new SparkConf()
conf.setMaster("local")
conf.setAppName("Action算子演示")val context =new SparkContext(conf)//设置checkpoint路径,将来对应的是HDFS上的路径
context.setCheckpointDir("spark/data/checkpoint")//====================================================val linesRDD: RDD[String]= context.textFile("spark/data/students.csv")val splitRDD: RDD[Array[String]]= linesRDD.map(_.split(","))//处理数据val studentsRDD: RDD[(String,String,String,String,String)]= splitRDD.map {case Array(id:String, name:String, age:String, gender:String, clazz:String)=>(id, name, age, gender, clazz)}//对studentsRDD进行缓存/**
* 特点带来的问题:既然叫做缓存,所以在程序运行过程中无论是只放内存还是磁盘内存一起使用,一旦程序结束,缓存数据全部丢失。
*
* spark针对上面的场景提供了一个解决方案:可以将RDD运行时的数据永久持久化在HDFS上,这个方案叫做checkpoint,需要在spark环境中设置checkpoint的路径
*/// studentsRDD.cache() //默认情况下,是将数据缓存在内存中// studentsRDD.persist(StorageLevel.MEMORY_AND_DISK)
studentsRDD.checkpoint()//统计每个班级的人数val clazzKVRDD: RDD[(String,Int)]= studentsRDD.map {case(_, _, _, _, clazz:String)=>(clazz,1)}val clazzNumRDD: RDD[(String,Int)]= clazzKVRDD.reduceByKey(_ + _)
clazzNumRDD.saveAsTextFile("spark/data/clazz_num")//统计性别的人数val genderKVRDD: RDD[(String,Int)]= studentsRDD.map {case(_, _, _, gender:String, _)=>(gender,1)}val genderNumRDD: RDD[(String,Int)]= genderKVRDD.reduceByKey(_ + _)
genderNumRDD.saveAsTextFile("spark/data/gender_num")//// while (true){//// }}}
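A small hedged sketch on combining the two mechanisms (common practice, not shown in the demo above): checkpoint() runs as a separate job that recomputes the RDD from the source, so it is usually paired with cache/persist so that the checkpoint job reads the data from memory or local disk instead of recomputing it.

studentsRDD.persist(StorageLevel.MEMORY_AND_DISK) //cache first; this copy is lost when the program ends
studentsRDD.checkpoint()                          //then persist the data permanently under the checkpoint directory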
17.Demo17SparkStandaloneSubmit
packagecom.shujia.coreimportorg.apache.spark.rdd.RDD
importorg.apache.spark.{SparkConf, SparkContext}object Demo17SparkStandaloneSubmit {def main(args: Array[String]):Unit={val conf =new SparkConf()/**
* 如果将来在linux集群中运行,这里就不需要设置setMaster
*/// conf.setMaster("local")val sparkContext =new SparkContext(conf)val linesRDD: RDD[String]= sparkContext.parallelize(List("java,hello,world","hello,scala,spark","java,hello,spark"))val wordRDD: RDD[String]= linesRDD.flatMap(_.split(","))val wordKVRDD: RDD[(String,Int)]= wordRDD.map((_,1))val countRDD: RDD[(String,Int)]= wordKVRDD.reduceByKey(_ + _)
countRDD.foreach(println)/**
* 将项目打包放到spark集群中使用standalone模式运行
* standalone client
* spark-submit --class com.shujia.core.Demo17SparkStandaloneSubmit --master spark://master:7077 --executor-memory 512m --total-executor-cores 1 spark-1.0.jar 100
*
* standalone cluster
* spark-submit --class com.shujia.core.Demo17SparkStandaloneSubmit --master spark://master:7077 --executor-memory 512m --total-executor-cores 1 --deploy-mode cluster spark-1.0.jar 100
*
*/}}
18.Demo18SparkYarnSubmit
packagecom.shujia.coreimportorg.apache.hadoop.conf.Configuration
importorg.apache.hadoop.fs.{FileSystem, Path}importorg.apache.spark.rdd.RDD
importorg.apache.spark.{SparkConf, SparkContext}/**
* 因为是提交到yarn上,可以对hdfs上的数据进行读写
*/object Demo18SparkYarnSubmit {def main(args: Array[String]):Unit={val conf =new SparkConf()/**
* 提交到yarn上运行,这个参数依旧不用设置
*/// conf.setMaster("local")
conf.setAppName("yarn submit")val context =new SparkContext(conf)//读取hdfs上数据val linesRDD: RDD[String]= context.textFile("/bigdata29/data/students.csv")
println("="*100)
println(s"分区数为:${linesRDD.getNumPartitions}")
println("="*100)val classKVRDD: RDD[(String,Int)]= linesRDD.map((line:String)=>{val clazz:String= line.split(",")(4)(clazz,1)})//统计班级人数val clazzNumRDD: RDD[(String,Int)]= classKVRDD.reduceByKey(_ + _)//整理一下要写到结果文件中的数据格式val resRDD: RDD[String]= clazzNumRDD.map((kv:(String,Int))=>s"${kv._1}\t${kv._2}")//删除已经存在的路径val hadoopConf =new Configuration()val fileSystem: FileSystem = FileSystem.get(hadoopConf)//判断路径是否存在if(fileSystem.exists(new Path("/bigdata29/sparkout1"))){
fileSystem.delete(new Path("/bigdata29/sparkout1"),true)}//将RDD中的数据保存到HDFS上的文件中
resRDD.saveAsTextFile("/bigdata29/sparkout1")/**
* spark-submit --class com.shujia.core.Demo18SparkYarnSubmit --master yarn --deploy-mode client spark-1.0.jar
*/}}
19.Demo19PI
packagecom.shujia.coreimportorg.apache.spark.rdd.RDD
importorg.apache.spark.{SparkConf, SparkContext}importscala.util.Random
object Demo19PI {def main(args: Array[String]):Unit={val conf =new SparkConf()/**
* 提交到yarn上运行,这个参数依旧不用设置
*/// conf.setMaster("local")
conf.setAppName("yarn submit")val context =new SparkContext(conf)//设置生成点的个数 10000val list: Range.Inclusive =0 to 1000000000//将scala的序列集合变成rddval rangeRDD: RDD[Int]= context.parallelize(list)//随机生成正方形内的点val dianRDD: RDD[(Double,Double)]= rangeRDD.map((i:Int)=>{val x:Double= Random.nextDouble()*2-1val y:Double= Random.nextDouble()*2-1(x, y)})// println(dianRDD.count())//取出圆中点的个数val yuanZuoRDD: RDD[(Double,Double)]= dianRDD.filter {case(x:Double, y:Double)=>
x * x + y * y <1}// println(yuanZuoRDD.count())//计算PI
println("="*100)
println(s"PI的值为:${(yuanZuoRDD.count().toDouble / dianRDD.count())*4}")
println("="*100)/**
* spark-submit --class com.shujia.core.Demo19PI --master yarn --deploy-mode client spark-1.0.jar
*/}}
20.Demo20Accumulator
packagecom.shujia.coreimportorg.apache.spark.{SparkConf, SparkContext}importorg.apache.spark.rdd.RDD
importorg.apache.spark.util.LongAccumulator
object Demo20Accumulator {def main(args: Array[String]):Unit={val conf =new SparkConf()
conf.setMaster("local")
conf.setAppName("map算子演示")val context =new SparkContext(conf)//====================================================val studentRDD: RDD[String]= context.textFile("spark/data/students.csv")val scoreRDD: RDD[String]= context.textFile("spark/data/score.txt")// var count = 0// studentRDD.foreach((line:String)=>{// count+=1// println("-------------------------")// println(count)// println("-------------------------")// })// println(s"count的值为:${count}")/**
* 累加器
*
* 由SparkContext来创建
* 注意:
* 1、因为累加器的执行实在RDD中执行的,而RDD是在Executor中执行的,而要想在Executor中执行就得有一个action算子触发任务调度
* 2、sparkRDD中无法使用其他的RDD
* 3、SparkContext无法在RDD内部使用,因为SparkContext对象无法进行序列化,不能够通过网络发送到Executor中
*/// val accumulator: LongAccumulator = context.longAccumulator// studentRDD.foreach((line:String)=>{// accumulator.add(1)// })// studentRDD.map((line:String)=>{// accumulator.add(1)// }).collect()// println(s"accumulator的值为:${accumulator.value}")// val value: RDD[RDD[(String, String)]] = studentRDD.map((stuLine: String) => {// scoreRDD.map((scoreLine: String) => {// val strings: Array[String] = scoreLine.split(",")// val strings1: Array[String] = stuLine.split(",")// val str1: String = strings.mkString("|")// val str2: String = strings1.mkString("|")// (str1, str2)// })// })// value.foreach(println)// val value: RDD[RDD[String]] = studentRDD.map((stuLine: String) => {// val scoreRDD: RDD[String] = context.textFile("spark/data/score.txt")// scoreRDD// })// value.foreach(println)}}
21.Demo21Broadcast
packagecom.shujia.coreimportorg.apache.spark.broadcast.Broadcast
importorg.apache.spark.rdd.RDD
importorg.apache.spark.{SparkConf, SparkContext}importscala.io.Source
object Demo21Broadcast {def main(args: Array[String]):Unit={val conf =new SparkConf()
conf.setMaster("local")
conf.setAppName("广播变量演示")val context =new SparkContext(conf)//====================================================//使用Scala的方式读取学生数据文件,将其转换以学号作为键的map集合,属于在Driver端的一个变量val studentsMap: Map[String,String]= Source.fromFile("spark/data/students.csv").getLines().toList
.map((line:String)=>{val infos: Array[String]= line.split(",")val stuInfo:String= infos.mkString(",")(infos(0), stuInfo)}).toMap
val scoresRDD: RDD[String]= context.textFile("spark/data/score.txt")/**
* 将studentsMap变成一个广播变量,让每一个将来需要执行关联的Executor中都有一份studentsMap数据
* 避免了每次Task任务拉取都要附带一个副本,拉取的速度变快了,执行速度也就变快了
*
* 广播大变量
*/val studentsMapBroadcast: Broadcast[Map[String,String]]= context.broadcast(studentsMap)/**
* 将Spark读取的分数RDD与外部变量学生Map集合进行关联
* 循环遍历scoresRDD,将学号一样的学生信息关联起来
*/// val resMapRDD: RDD[(String, String)] = scoresRDD.map((score: String) => {// val id: String = score.split(",")(0)// //使用学号到学生map集合中获取学生信息// val studentInfo: String = studentsMap.getOrElse(id, "无学生信息")// (score, studentInfo)// })// resMapRDD.foreach(println)/**
* 使用广播变量进行关联
*/val resMapRDD: RDD[(String,String)]= scoresRDD.map((score:String)=>{val id:String= score.split(",")(0)val stuMap: Map[String,String]= studentsMapBroadcast.value
//使用学号到学生map集合中获取学生信息val studentInfo:String= stuMap.getOrElse(id,"无学生信息")(score, studentInfo)})
resMapRDD.foreach(println)}}
2.Spark-sql
Demo1WordCount
packagecom.shujia.sqlimportorg.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}object Demo1WordCount {def main(args: Array[String]):Unit={/**
* 在新版本的spark中,如果想要编写spark sql的话,需要使用新的spark入口类:SparkSession
*/val sparkSession: SparkSession = SparkSession.builder().master("local").appName("wc spark sql").getOrCreate()/**
* spark sql和spark core的核心数据类型不太一样
*
* 1、读取数据构建一个DataFrame,相当于一张表
*/val linesDF: DataFrame = sparkSession.read
.format("csv")//指定读取数据的格式.schema("line STRING")//指定列的名和列的类型,多个列之间使用,分割.option("sep","\n")//指定分割符,csv格式读取默认是英文逗号.load("spark/data/words.txt")// 指定要读取数据的位置,可以使用相对路径// println(linesDF)// linesDF.show() //查看DF中的数据内容(表内容)// linesDF.printSchema() //查看DF表结构/**
* 2、DF本身是无法直接在上面写sql的,需要将DF注册成一个视图,才可以写sql数据分析
*/
linesDF.createOrReplaceTempView("lines")// 起一个表名,后面的sql语句可以做查询分析/**
* 3、可以编写sql语句 (统计单词的数量)
* spark sql是完全兼容hive sql
*/val resDF: DataFrame = sparkSession.sql("""
|select
|t1.word as word,
|count(1) as counts
|from
|(select
| explode(split(line,'\\|')) as word from lines) t1
| group by t1.word
|""".stripMargin)/**
* 4、将计算的结果DF保存到HDFS上
*/val resDS: Dataset[Row]= resDF.repartition(1)
resDS.write
.format("csv")//指定输出数据文件格式.option("sep","\t")// 指定列之间的分隔符.mode(SaveMode.Overwrite)// 使用SaveMode枚举类,设置为覆盖写.save("spark/data/sqlout1")// 指定输出的文件夹}}
Demo2DSLWordCount
packagecom.shujia.sqlimportorg.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object Demo2DSLWordCount {def main(args: Array[String]):Unit={/**
* 在新版本的spark中,如果想要编写spark sql的话,需要使用新的spark入口类:SparkSession
*/val sparkSession: SparkSession = SparkSession.builder().master("local").appName("wc spark sql").getOrCreate()/**
* spark sql和spark core的核心数据类型不太一样
*
* 1、读取数据构建一个DataFrame,相当于一张表
*/val linesDF: DataFrame = sparkSession.read
.format("csv")//指定读取数据的格式.schema("line STRING")//指定列的名和列的类型,多个列之间使用,分割.option("sep","\n")//指定分割符,csv格式读取默认是英文逗号.load("spark/data/words.txt")// 指定要读取数据的位置,可以使用相对路径/**
* DSL: 类SQL语法 api 介于代码和纯sql之间的一种api
*
* spark在DSL语法api中,将纯sql中的函数都使用了隐式转换变成一个scala中的函数
* 如果想要在DSL语法中使用这些函数,需要导入隐式转换
*
*///导入Spark sql中所有的sql隐式转换函数importorg.apache.spark.sql.functions._
//导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理importsparkSession.implicits._
// linesDF.select(explode(split($"line","\\|")) as "word")// .groupBy($"word")// .count().show()val resultDF: DataFrame = linesDF.select(explode(split($"line","\\|")) as "word").groupBy($"word").agg(count($"word") as "counts")/**
* 保存数据
*/
resultDF
.repartition(1).write
.format("csv").option("sep","\t").mode(SaveMode.Overwrite).save("spark/data/sqlout2")}}
Demo3DSLAPI
packagecom.shujia.sqlimportorg.apache.spark.sql.expressions.Window
importorg.apache.spark.sql.{DataFrame, SparkSession}object Demo3DSLAPI {def main(args: Array[String]):Unit={/**
* 在新版本的spark中,如果想要编写spark sql的话,需要使用新的spark入口类:SparkSession
*/val sparkSession: SparkSession = SparkSession.builder().master("local").appName("dsl语法api演示").config("spark.sql.shuffle.partitions",1)//默认分区的数量是200个.getOrCreate()//导入Spark sql中所有的sql隐式转换函数importorg.apache.spark.sql.functions._
//导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理importsparkSession.implicits._
/**
* DSL api
*///新版本的读取方式,读取一个json数据,不需要手动指定列名// val stuDF1: DataFrame = sparkSession.read// .json("spark/data/students.json")//以前版本读取方式,灵活度要高一些val stuDF: DataFrame = sparkSession.read
.format("json").load("spark/data/students.json")// stuDF2.show(100, truncate = false) //传入展示总条数,并完全显示数据内容/**
* select 函数:选择数据【字段】,和纯sql语句中select意思基本是一样,在数据的前提上选择要留下的列
*
*///根据字段的名字选择要查询的字段// stuDF.select("id","name","age").show(1)// //根据字段的名字选择要查询的字段,selectExpr 可以传入表达式字符串形式// stuDF.selectExpr("id","name","age","age + 1 as new_age").show()//使用隐式转换中的$函数将字段变成一个对象// stuDF.select($"id",$"name",$"age").show(10)//使用对象做处理// stuDF.select($"id",$"name",$"age" + 1 as "new_age").show(10)//可以在select中使用sql的函数//下面的操作等同于sql:select id,name,age+1 as new_age,substring(clazz,0,2) as km from lines;// stuDF.select($"id",$"name",$"age" + 1 as "new_age",substring($"clazz",0,2) as "km").show(10)/**
* where 函数:过滤数据
*///直接将sql中where语句以字符串的形式传参// stuDF.where("gender='女' and age=23").show()//使用$列对象的形式过滤// =!= 不等于// === 等于// stuDF.where($"gender" === "女" and $"age" === 23).show()// stuDF.where($"gender" =!= "男" and $"age" === 23).show()//过滤文科的学生// stuDF.where(substring($"clazz", 0, 2) === "文科").show()/**
* groupBy 分组函数
* agg 聚合函数
* 分组聚合要在一起使用
* 分组聚合之后的结果DF中只会包含分组字段和聚合字段
* select中无法出现不是分组的字段
*///根据班级分组,求每个班级的人数和平均年龄// stuDF.groupBy($"clazz")// .agg(count($"clazz") as "number",round(avg($"age"),2) as "avg_age").show()/**
* orderBy: 排序
*/// stuDF.groupBy($"clazz")// .agg(count($"clazz") as "number")// .orderBy($"number").show()/**
* join: 表关联
*/val scoreDF: DataFrame = sparkSession.read
.format("csv").option("sep",",").schema("id STRING,subject_id STRING,score INT").load("spark/data/score.txt")// scoreDF.show()//关联场景1:所关联的字段名字不一样的时候// stuDF.join(scoreDF, $"id" === $"sid", "inner").show()//关联场景2:所关联的字段名字一样的时候// stuDF.join(scoreDF,"id").show()/**
* 开窗函数
* 统计每个班级总分前3的学生
*
* 开窗不会改变总条数的,会以新增一列的形式加上开窗的结果
* withColumn 新增一列
*/val joinStuAndScoreWithIDDF: DataFrame = stuDF.join(scoreDF,"id")
joinStuAndScoreWithIDDF.groupBy($"id", $"clazz")//根据学号和班级一起分组.agg(sum($"score") as "sumScore")//计算总分.withColumn("rn", row_number() over Window.partitionBy($"clazz").orderBy($"sumScore".desc))//.select($"id", $"clazz", $"sumScore", row_number() over Window.partitionBy($"clazz").orderBy($"sumScore".desc) as "rn").where($"rn"<=3).show()}}
Demo4DataSourceAPI
packagecom.shujia.sqlimportorg.apache.spark.sql.{DataFrame, SparkSession}object Demo4DataSourceAPI {def main(args: Array[String]):Unit={/**
* 在新版本的spark中,如果想要编写spark sql的话,需要使用新的spark入口类:SparkSession
*/val sparkSession: SparkSession = SparkSession.builder().master("local").appName("dsl语法api演示").config("spark.sql.shuffle.partitions",1)//默认分区的数量是200个.getOrCreate()//导入Spark sql中所有的sql隐式转换函数//导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理/**
* 读取csv格式的数据,默认是以英文逗号分割的
*
*/// val stuCsvDF: DataFrame = sparkSession.read// .format("csv")// .schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")// .option("sep", ",")// .load("spark/data/students.csv")// //求每个班级的人数,保存到文件中// stuCsvDF.groupBy($"clazz")// .agg(count($"clazz") as "number")// .write// .format("csv")// .option("sep",",")// .mode(SaveMode.Overwrite)// .save("spark/data/souceout1")/**
* 读取json数据格式,因为json数据有键值对。会自动地将键作为列名,值作为列值,不需要手动设置表结构
*/val stuJsonDF: DataFrame = sparkSession.read
.format("json").load("spark/data/students2.json")// //统计每个性别的人数// stuJsonDF.groupBy($"gender")// .agg(count($"gender") as "number")// .write// .format("json")// .mode(SaveMode.Overwrite)// .save("spark/data/jsonout1")/**
* parquet
* 压缩的比例是由【信息熵】来决定的
*/// stuJsonDF.write// .format("parquet")// .mode(SaveMode.Overwrite)// .save("spark/data/parquetout2")//读取parquet格式文件的时候,也是不需要手动指定表结构// val stuParquetDF: DataFrame = sparkSession.read// .format("parquet")// .load("spark/data/parquetout1/part-00000-c5917bb6-172b-49bd-a90c-90b7f09b69d6-c000.snappy.parquet")// stuParquetDF.show()/**
* 读取数据库中的数据,mysql
*
*/val jdDF: DataFrame = sparkSession.read
.format("jdbc").option("url","jdbc:mysql://192.168.220.100:3306").option("dbtable","bigdata29.jd_goods").option("user","root").option("password","123456").load()
jdDF.show(10,truncate =false)}}
Demo5RDDToDF
packagecom.shujia.sqlimportorg.apache.spark.SparkContext
importorg.apache.spark.rdd.RDD
importorg.apache.spark.sql.{DataFrame, Row, SparkSession}object Demo5RDDToDF {def main(args: Array[String]):Unit={/**
* 在新版本的spark中,如果想要编写spark sql的话,需要使用新的spark入口类:SparkSession
*/val sparkSession: SparkSession = SparkSession.builder().master("local").appName("RDD和DF互相转换演示").config("spark.sql.shuffle.partitions",1)//默认分区的数量是200个.getOrCreate()//导入Spark sql中所有的sql隐式转换函数//导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理,如果需要做RDD和DF之间的转换importsparkSession.implicits._
/**
* 使用SparkContext读取数据封装成RDD
*
* SparkSession包含了SparkContext
*///使用SparkSession获取SparkContextval sc: SparkContext = sparkSession.sparkContext
val linesRDD: RDD[String]= sc.textFile("spark/data/students.csv")val studentsRDD: RDD[(String,String,Int,String,String)]= linesRDD.map((line:String)=> line.split(",")).map {//1500100001,施笑槐,22,女,文科六班case Array(id:String, name:String, age:String, gender:String, clazz:String)=>(id, name, age.toInt, gender, clazz)}/**
* RDD转DF
*/val studentsDF: DataFrame = studentsRDD.toDF("id","name","age","gender","clazz")
studentsDF.createOrReplaceTempView("students")val resultDF: DataFrame = sparkSession.sql("""
|select
|clazz,
|count(1) as number
|from
|students
|group by clazz
|""".stripMargin)/**
* 在Row的数据类型中 所有整数类型统一为Long 小数类型统一为Double
* 转RDD
*/val studentsRDD2: RDD[Row]= resultDF.rdd
// studentsRDD2.map((row:Row)=>{// val clazz: String = row.getAs[String]("clazz")// val number: Long = row.getAs[Long]("number")// (clazz,number)// }).foreach(println)
studentsRDD2.map{case Row(clazz:String,number:Long)=>(clazz,number)}.foreach(println)}}
Demo6Window
packagecom.shujia.sqlimportorg.apache.spark.sql.expressions.Window
importorg.apache.spark.sql.{DataFrame, SparkSession}/**
 * Window functions
 * Aggregate window functions: sum, count, avg, min, max
 * Ranking/offset window functions: row_number, rank, dense_rank, lag (previous rows), lead (following rows)
*/object Demo6Window {def main(args: Array[String]):Unit={//创建SparkSession对象/**
* 在新版本的spark中,如果想要编写spark sql的话,需要使用新的spark入口类:SparkSession
*/val sparkSession: SparkSession = SparkSession.builder().master("local").appName("开窗函数DSL API演示").config("spark.sql.shuffle.partitions",1)//默认分区的数量是200个.getOrCreate()//导入Spark sql中所有的sql隐式转换函数importorg.apache.spark.sql.functions._
//导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理,如果需要做RDD和DF之间的转换importsparkSession.implicits._
//学生表val studentsDF: DataFrame = sparkSession.read
.format("csv").option("sep",",").schema("id STRING,name STRING,age INT,gender STRING,clazz STRING").load("spark/data/students.csv")//成绩表val scoresDF: DataFrame = sparkSession.read
.format("csv").option("sep",",").schema("sid STRING,subject_id STRING,score INT").load("spark/data/score.txt")//科目表val subjectDF: DataFrame = sparkSession.read
.format("csv").option("sep",",").schema("subject_id STRING,subject_name STRING,subject_sum_score INT").load("spark/data/subject.csv")//将学生数据与成绩数据进行关联val joinDF: DataFrame = studentsDF.join(scoresDF, $"id"=== $"sid")// joinDF.show(10)/**
* 1、统计总分年级排名前十学生各科的分数
*
* 未排序之前,是将开窗中所有数据一起聚合得到一个结果
* 若排序了,依次从上到下聚合得到一个结果
*
*/
joinDF
// sum(score) over(partition by id ) as sumScore.withColumn("sumScore", sum($"score") over Window.partitionBy($"id")).orderBy($"sumScore".desc).limit(60)//.show(60)/**
* 3、统计每科都及格的学生
*/
scoresDF
.join(subjectDF,"subject_id").where($"score">= $"subject_sum_score"*0.6)//统计学生及格的科目数.withColumn("jiGeCounts", count($"sid") over Window.partitionBy($"sid")).where($"jiGeCounts"===6)// .show(100)/**
* 2、统计总分大于年级平均分的学生
*/
joinDF
//计算每个学生的总分,新增一列.withColumn("sumScore", sum($"score") over Window.partitionBy($"id")).withColumn("avgScore", avg($"sumScore") over Window.partitionBy(substring($"clazz",0,2))).where($"sumScore"> $"avgScore")// .show(200)/**
* 统计每个班级的每个名次之间的分数差
*
*/
joinDF
.groupBy($"id", $"clazz").agg(sum($"score") as "sumScore")//开窗,班级开窗,总分降序排序,排个名次.withColumn("rn", row_number() over Window.partitionBy($"clazz").orderBy($"sumScore".desc))//开窗,取出前一名的总分.withColumn("front_score", lag($"sumScore",1,750) over Window.partitionBy($"clazz").orderBy($"sumScore".desc)).withColumn("cha",$"front_score"- $"sumScore").show(100)}}
Demo7BurksTest1
packagecom.shujia.sqlimportorg.apache.spark.sql.expressions.Window
importorg.apache.spark.sql.{Column, DataFrame, SparkSession}object Demo7BurksTest1 {def main(args: Array[String]):Unit={//创建SparkSession对象/**
* 在新版本的spark中,如果想要编写spark sql的话,需要使用新的spark入口类:SparkSession
*/val sparkSession: SparkSession = SparkSession.builder().master("local").appName("公司营收额数据需求演示").config("spark.sql.shuffle.partitions",1)//默认分区的数量是200个.getOrCreate()//导入Spark sql中所有的sql隐式转换函数importorg.apache.spark.sql.functions._
//导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理,如果需要做RDD和DF之间的转换importsparkSession.implicits._
//读取数据val burksDF: DataFrame = sparkSession.read
.format("csv").option("sep",",").schema("burk STRING,year STRING"+",tsl01 DOUBLE,tsl02 DOUBLE,tsl03 DOUBLE,tsl04 DOUBLE"+",tsl05 DOUBLE,tsl06 DOUBLE,tsl07 DOUBLE,tsl08 DOUBLE"+",tsl09 DOUBLE,tsl10 DOUBLE,tsl11 DOUBLE,tsl12 DOUBLE").load("spark/data/burks.txt")/**
* 1、统计每个公司每年按月累计收入 行转列 --> sum窗口函数
* 输出结果
* 公司代码,年度,月份,当月收入,累计收入
*
*///纯sql的方式实现// burksDF.createOrReplaceTempView("burks")// sparkSession.sql(// """// |select// |t1.burk as burk,// |t1.year as year,// |t1.month as month,// |t1.tsl as tsl,// |sum(t1.tsl) over(partition by t1.burk,t1.year order by t1.month) as leijia// |from// |(select// | burk,// | year,// | month,// | tsl// | from// | burks// | lateral view explode(map(1,tsl01,2,tsl02,3,tsl03,4,tsl04,5,tsl05,6,tsl06,7,tsl07,8,tsl08,9,tsl09,10,tsl10,11,tsl11,12,tsl12)) T as month,tsl// |) t1// |""".stripMargin).show()// 使用DSL API实现val m: Column = map(
expr("1"), $"tsl01",
expr("2"), $"tsl02",
expr("3"), $"tsl03",
expr("4"), $"tsl04",
expr("5"), $"tsl05",
expr("6"), $"tsl06",
expr("7"), $"tsl07",
expr("8"), $"tsl08",
expr("9"), $"tsl09",
expr("10"), $"tsl10",
expr("11"), $"tsl11",
expr("12"), $"tsl12")
burksDF
.select($"burk",$"year",explode(m) as Array("month","tsl"))//按月累计.withColumn("leijia",sum($"tsl") over Window.partitionBy($"burk",$"year").orderBy($"month")).show()/**
* 2、统计每个公司当月比上年同期增长率 行转列 --> lag窗口函数
* 公司代码,年度,月度,增长率(当月收入/上年当月收入 - 1)
*/}}
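Requirement 2 is left unimplemented above; here is a hedged sketch under the same assumptions as requirement 1 (the exploded (burk, year, month, tsl) shape and the map column m), using lag over a window partitioned by company and month and ordered by year to fetch the same month of the previous year:

burksDF
  .select($"burk", $"year", explode(m) as Array("month", "tsl"))
  //income of the same month in the previous year
  .withColumn("last_year_tsl", lag($"tsl", 1) over Window.partitionBy($"burk", $"month").orderBy($"year"))
  //growth rate = current month income / same month last year income - 1
  .withColumn("growth_rate", $"tsl" / $"last_year_tsl" - 1)
  .show()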
Demo8SubmitYarn
package com.shujia.sql

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object Demo8SubmitYarn {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local") //remove this line when submitting to YARN (see Demo18SparkYarnSubmit)
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import sparkSession.implicits._
    import org.apache.spark.sql.functions._

    val studentsDF: DataFrame = sparkSession.read
      .format("csv")
      .option("sep", ",")
      .schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")
      .load("/bigdata29/spark_in/data/student")

    val genderCountsDF: DataFrame = studentsDF.groupBy($"gender").agg(count($"gender") as "counts")

    genderCountsDF.write
      .format("csv")
      .option("sep", ",")
      .mode(SaveMode.Overwrite)
      .save("/bigdata29/spark_out/out2")
  }
}
Demo9SparkOnHive
packagecom.shujia.sqlimportorg.apache.spark.sql.SparkSession
object Demo9SparkOnHive {def main(args: Array[String]):Unit={val sparkSession: SparkSession = SparkSession.builder().master("local").appName("spark读取hive数据").enableHiveSupport().config("spark.sql.shuffle.partitions",1).getOrCreate()importsparkSession.implicits._
importorg.apache.spark.sql.functions._
sparkSession.sql("use bigdata29")
sparkSession.sql("select clazz,count(1) as counts from students group by clazz").show()}}
Demo10Student
packagecom.shujia.sqlimportorg.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession}/**
 * 1. Row/column conversion
 *
 * Table 1
 * name, subject, score
 * name,item,score
 * 张三,数学,33
 * 张三,英语,77
 * 李四,数学,66
 * 李四,英语,78
 *
 *
 * Table 2
 * name, math, english
 * name,math,english
 * 张三,33,77
 * 李四,66,78
 *
 * 1. Convert Table 1 into Table 2
 * 2. Convert Table 2 into Table 1
*/object Demo10Student {def main(args: Array[String]):Unit={//创建SparkSession对象/**
* 在新版本的spark中,如果想要编写spark sql的话,需要使用新的spark入口类:SparkSession
*/val sparkSession: SparkSession = SparkSession.builder().master("local").appName("行列互相转换演示").config("spark.sql.shuffle.partitions",1)//默认分区的数量是200个.getOrCreate()//导入Spark sql中所有的sql隐式转换函数importorg.apache.spark.sql.functions._
//导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理,如果需要做RDD和DF之间的转换importsparkSession.implicits._
/**
* 列转行
*/val tb1DF: DataFrame = sparkSession.read
.format("csv").option("sep",",").schema("name STRING,item STRING,score INT").load("/bigdata29/tb1.txt")/*
张三,数学,33
张三,英语,77
李四,数学,66
李四,英语,78
张三,33,77
李四,66,78
*/// val res1DF: DataFrame = tb1DF.groupBy($"name")// .agg(// sum(when($"item" === "数学", $"score").otherwise(0)) as "math",// sum(when($"item" === "英语", $"score").otherwise(0)) as "english")// res1DF.write// .format("csv")// .option("sep",",")// .mode(SaveMode.Overwrite)// .save("/bigdata29/out7")// res1DF.show()/**
* 行转列
*/// val tb2DF: DataFrame = sparkSession.read// .format("csv")// .option("sep", ",")// .schema("name STRING,math STRING,english INT")// .load("/bigdata29/out7/*")//// val m: Column = map(// expr("'数学'"), $"math",// expr("'语文'"), $"english"// )// tb2DF.select($"name",explode(m) as Array("item","score")).show()}}
Demo11UDF
packagecom.shujia.sqlimportorg.apache.spark.sql.expressions.UserDefinedFunction
importorg.apache.spark.sql.{DataFrame, SparkSession}object Demo11UDF {def main(args: Array[String]):Unit={val sparkSession: SparkSession = SparkSession.builder().master("local").appName("udf函数演示").config("spark.sql.shuffle.partitions",1).getOrCreate()importorg.apache.spark.sql.functions._
/**
* 1、在使用DSL的时候使用自定义函数
*/val studentsDF: DataFrame = sparkSession.read
.format("csv").option("sep",",").schema("id STRING,name STRING,age INT,gender STRING,clazz STRING").load("spark/data/students.csv")// studentsDF.show()//编写自定义函数//udf中编写的是scala代码val shujia_fun1: UserDefinedFunction = udf((str:String)=>"数加:"+ str)// studentsDF.select(shujia_fun1($"clazz")).show()/**
* 1、使用SQL语句中使用自定函数
*/
studentsDF.createOrReplaceTempView("students")//将自定义的函数变量注册成一个函数
sparkSession.udf.register("shujia_str",shujia_fun1)
sparkSession.sql("""
|select clazz,shujia_str(clazz) as new_clazz from students
|""".stripMargin).show()}}
Demo12ShuJiaStr
package com.shujia.sql

import org.apache.hadoop.hive.ql.exec.UDF

class Demo12ShuJiaStr extends UDF {
  def evaluate(str: String): String = {
    "shujia: " + str
  }
}

/**
 * 1. Package this class into a jar and put it in Spark's jars directory on the Linux machine
 * 2. Open the spark-sql client
 * 3. Create a function from the UDF class in the uploaded jar:
 * create function shujia_str as 'com.shujia.sql.Demo12ShuJiaStr';
 */
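A hypothetical usage example in the spark-sql client after the create function statement above (it assumes a students table exists there):

select id, name, shujia_str(clazz) as new_clazz from students limit 10;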
Demo13SheBao
packagecom.shujia.sqlimportorg.apache.spark.sql.expressions.Window
importorg.apache.spark.sql.{DataFrame, SparkSession}object Demo13SheBao {def main(args: Array[String]):Unit={val sparkSession: SparkSession = SparkSession.builder().master("local").appName("作业社保演示").config("spark.sql.shuffle.partitions",1).getOrCreate()importorg.apache.spark.sql.functions._
importsparkSession.implicits._
//读取数据val sheBaoDF: DataFrame = sparkSession.read
.format("csv").option("sep",",").schema("id STRING,burk STRING,sdate STRING").load("spark/data/shebao.txt")//统计每个员工的工作经历
sheBaoDF
//取出员工上一个月所在公司.withColumn("last_burk", lag($"burk",1) over Window.partitionBy($"id").orderBy($"sdate"))//.show()//在每一行后新增一列,标记列,如果换工作了,标记1 否则标记0.withColumn("flag", when($"burk"=== $"last_burk",0).otherwise(1))//.show()//以用户开窗,将后面的flag值加起来.withColumn("tmp",sum($"flag") over Window.partitionBy($"id").orderBy($"sdate"))//.show().groupBy($"id",$"burk",$"tmp").agg(min($"sdate") as "start_date",max($"sdate") as "end_start").show(100)}}
packagecom.shujia.sqlimportorg.apache.spark.sql.expressions.Window
importorg.apache.spark.sql.{DataFrame, Row, SparkSession}object Demo14MaYi {def main(args: Array[String]):Unit={val sparkSession: SparkSession = SparkSession.builder().master("local").appName("作业社保演示").config("spark.sql.shuffle.partitions",1).getOrCreate()importorg.apache.spark.sql.functions._
importsparkSession.implicits._
//用户碳排放量表val ant_user_low_carbon: DataFrame = sparkSession.read
.format("csv").option("sep","\t").schema("user_id STRING,data_dt STRING,low_carbon DOUBLE").load("spark/data/ant_user_low_carbon.txt")val ant_plant_carbon: DataFrame = sparkSession.read
.format("csv").option("sep","\t").schema("plant_id STRING,plant_name STRING,plant_carbon DOUBLE").load("spark/data/ant_plant_carbon.txt")// ant_user_low_carbon.show()// ant_plant_carbon.show()/**
* 蚂蚁森林植物申领统计
* 问题:假设2017年1月1日开始记录低碳数据(user_low_carbon),假设2017年10月1日之前满足申领条件的用户都申领了一颗p004-胡杨,
* 剩余的能量全部用来领取“p002-沙柳” 。
* 统计在10月1日累计申领“p002-沙柳” 排名前10的用户信息;以及他比后一名多领了几颗沙柳。
* 得到的统计结果如下表样式:
*///因为用户能量表与植物能量表没有关联字段,如果想要在sql中使用胡杨以及沙柳的能量的话,需要先单独从植物能量表将胡杨以及沙柳的能量取出来,使用变量接收val huYangCarbon:Double= ant_plant_carbon
//过滤条件是胡杨.where($"plant_name"==="胡杨")//选择能量的一列.select($"plant_carbon")//转rdd.rdd
//转scala数组.collect().map {case Row(plant_carbon:Double)=> plant_carbon
}.head
val shaLiuCarbon:Double= ant_plant_carbon
//过滤条件是胡杨.where($"plant_name"==="沙柳")//选择能量的一列.select($"plant_carbon")//转rdd.rdd
//转scala数组.collect().map {case Row(plant_carbon:Double)=> plant_carbon
}.head
println(s"胡杨所需的碳排放量:${huYangCarbon},沙柳所需要的碳排放量:${shaLiuCarbon}")
println("---------------------------------------------------------------------------")
ant_user_low_carbon
//取出2017年1月1日到2017年10月1日.where($"data_dt">="2017/1/1" and $"data_dt"<="2017/10/1")//按照用户分组,求用户的总的排放量.groupBy($"user_id").agg(sum($"low_carbon") as "sum_low_carbon")//判断能量是否满足一个胡杨的能量,如果满足直接减去胡杨的能量,得到剩余的能量.withColumn("shengYu_carbon", when($"sum_low_carbon">= huYangCarbon, $"sum_low_carbon"- huYangCarbon).otherwise($"sum_low_carbon"))//计算剩余的能量可以领取多少个沙柳.withColumn("number", floor($"shengYu_carbon"/ shaLiuCarbon))//.show()//获取后一名用户的领取沙柳的个数.withColumn("lead_number", lead($"number",1,0) over Window.orderBy($"number".desc))//.show().withColumn("duo", $"number"- $"lead_number").limit(10)//.show()/**
* 蚂蚁森林低碳用户排名分析
* 问题:查询user_low_carbon表中每日流水记录,条件为:
* 用户在2017年,连续三天(或以上)的天数里,
* 每天减少碳排放(low_carbon)都超过100g的用户低碳流水。
* 需要查询返回满足以上条件的user_low_carbon表中的记录流水。
* 例如用户u_002符合条件的记录如下,因为2017/1/2~2017/1/5连续四天的碳排放量之和都大于等于100g:
*///("user_id STRING,data_dt STRING,low_carbon DOUBLE")
ant_user_low_carbon
//过滤出2017年的数据.where(substring($"data_dt",0,4)==="2017")//计算用户每天积攒的能量 用户id和日期一起分组.groupBy($"user_id", $"data_dt").agg(sum($"low_carbon") as "sum_low_carbon")//过滤掉能量小于等于100的.where($"sum_low_carbon">100)//开窗有,以用户开窗,以日期升序排序,编号 row_number.withColumn("rn", row_number() over Window.partitionBy($"user_id").orderBy($"data_dt"))//使用日期减去排序.withColumn("flag_dt", date_sub(regexp_replace($"data_dt","/","-"), $"rn"))//计算连续的天数.withColumn("lianxu_days", count($"data_dt") over Window.partitionBy($"user_id", $"flag_dt"))//过滤出连续3天以上的数据.where($"lianxu_days">=3)//关联流水记录表,得到每个符合条件的数据.join(ant_user_low_carbon,List("user_id","data_dt")).select($"user_id",$"data_dt",$"low_carbon").show(100)}}
Test
packagecom.shujia.sqlimportorg.apache.spark.sql.expressions.UserDefinedFunction
importorg.apache.spark.sql.{DataFrame, SparkSession}object Test {def main(args: Array[String]):Unit={//1、创建Spark sql环境val spark: SparkSession = SparkSession
.builder().master("local[2]").appName("sql").config("spark.sql.shuffle.partitions",1)//默认在集群中时200个.getOrCreate()importorg.apache.spark.sql.functions._
//定义UDFval str_split: UserDefinedFunction = udf((line:String)=>{"数加:"+line
})// val value2: Broadcast[UserDefinedFunction] = spark.sparkContext.broadcast(str_split)// spark.udf.register("str_split", str_split)/**
* 1、在使用DSL的时候使用自定义函数
*/val studentsDF: DataFrame = spark.read
.format("csv").option("sep",",").schema("id STRING,name STRING,age INT,gender STRING,clazz STRING").load("spark/data/students.csv")//在使用DSL时,使用自定义函数// studentsDF.select(str_split($"clazz")).show()
studentsDF.createOrReplaceTempView("lines")
spark.udf.register("str_split",(line:String)=>"数加:"+line)
spark.sql("""
|select str_split(clazz) from lines
|""".stripMargin).show()/**
* 在 sql中使用自定义函数
*/// studentsDF.createOrReplaceTempView("lines")//// //注册自定义函数// spark.udf.register("str_split", (line: String) => "数加:"+line)//// spark.sql(// """// |select * from// |lines// |lateral view explode(str_split(line,',')) T as word// |// |""".stripMargin).show()}}
3.Spark-streaming
Demo1WordCount
package com.shujia.streaming

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Durations, StreamingContext}

object Demo1WordCount {
  def main(args: Array[String]): Unit = {
    /**
     * spark core: SparkContext --> core data type RDD
     * spark sql: SparkSession --> core data type DataFrame
     * spark streaming: StreamingContext --> core data type DStream (built on RDDs)
     *
     * The Spark Streaming environment is created with a StreamingContext object
     */
    val conf = new SparkConf()
    conf.setMaster("local[2]")
    conf.setAppName("spark streaming word count example")

    val sparkContext = new SparkContext(conf)
    val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))

    val lineDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)
    val flatMapDS: DStream[String] = lineDS.flatMap(_.split(" "))
    val wordsKVDS: DStream[(String, Int)] = flatMapDS.map((_, 1))
    val resultDS: DStream[(String, Int)] = wordsKVDS.reduceByKey((x: Int, y: Int) => x + y)

    resultDS.print()

    streamingContext.start()
    streamingContext.awaitTermination()
    streamingContext.stop()
  }
}
Demo2UpdateStateByKey
packagecom.shujia.streamingimportorg.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}importorg.apache.spark.{SparkConf, SparkContext}importorg.apache.spark.streaming.{Durations, StreamingContext}object Demo2UpdateStateByKey {def main(args: Array[String]):Unit={/**
* spark core: SparkContext --> 核心数据类型 RDD
* spark sql: SparkSession --> 核心数据类型 DataFrame
* spark streaming: StreamingContext --> 核心数据类型 DStream(RDD)
*
* 创建SparkStreaming的环境需要使用StreamingContext对象
*///spark streaming依赖于spark core的环境//因为spark streaming核心数据类型底层封装的是RDDval conf =new SparkConf()
conf.setMaster("local[2]")
conf.setAppName("spark streaming 单词统计案例1")val sparkContext =new SparkContext(conf)// def this(sparkContext: SparkContext, batchDuration: Duration) //传入一个spark core 环境对象以及一个接收时间的范围val streamingContext =new StreamingContext(sparkContext, Durations.seconds(5))//设置checkpoint路径
streamingContext.checkpoint("spark/data/stream_state_wc_checkpoint")/**
* 因为sparkstreaming是一个近实时的计算引擎,整个程序写完运行的状态,一直运行,一直接收数据,除非异常或者手动停止
*
* 1、目前在spark使用nc工具监控一个端口号中源源不断产生的数据
* 2、yum install -y nc 下载安装nc工具
* 3、nc -lk xxxx
*/val lineDS: ReceiverInputDStream[String]= streamingContext.socketTextStream("master",12345)val flatMapDS: DStream[String]= lineDS.flatMap(_.split(" "))val wordsKVDS: DStream[(String,Int)]= flatMapDS.map((_,1))val res2DS: DStream[(String,Int)]= wordsKVDS.updateStateByKey((seq: Seq[Int], option: Option[Int])=>{val currentCount:Int= seq.sum
val count:Int= option.getOrElse(0)
Option(currentCount + count)})
res2DS.print()
streamingContext.start()
streamingContext.awaitTermination()
streamingContext.stop()}}
Demo3ReduceByKeyAndWindow
packagecom.shujia.streamingimportorg.apache.spark.sql.SparkSession
importorg.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}importorg.apache.spark.{SparkConf, SparkContext}importorg.apache.spark.streaming.{Durations, StreamingContext}object Demo3ReduceByKeyAndWindow {def main(args: Array[String]):Unit={/**
* 旧版本创建sparkContext的方式
*/// val conf = new SparkConf()// conf.setMaster("local[2]")// conf.setAppName("spark streaming 单词统计案例1")// val sparkContext = new SparkContext(conf)// // def this(sparkContext: SparkContext, batchDuration: Duration) //传入一个spark core 环境对象以及一个接收时间的范围// val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))/**
* 新版本中推荐使用SparkSession对象来获取
*/val sparkSession: SparkSession = SparkSession.builder().master("local[2]").appName("窗口演示").config("spark.sql.shuffle.partitions",1).getOrCreate()val sparkContext: SparkContext = sparkSession.sparkContext
//创建SparkStreaming对象val streamingContext =new StreamingContext(sparkContext, Durations.seconds(5))val linesDS: ReceiverInputDStream[String]= streamingContext.socketTextStream("master",12345)//将数据封装成kv格式,将来可以调用kv类型的算子来进行操作val wordsKVDS: DStream[(String,Int)]= linesDS.flatMap(_.split(" ")).map((_,1))//1、如果只需要计算当前批次的数据,直接reduceByKey//2、如果要从最开始的数据计算,使用有状态算子,updateStateByKey和checkpoint配合使用//3、如果要计算最近的时间段内的数据,使用窗口类算子,reduceByKeyAndWindow/**
* 滑动窗口
*/// val res1DS: DStream[(String, Int)] = wordsKVDS.reduceByKeyAndWindow(// (x: Int, y: Int) => x + y,// Durations.seconds(15), // 设置窗口的大小// Durations.seconds(5)) // 设置滑动的大小/**
* 滚动窗口
*/val res1DS: DStream[(String,Int)]= wordsKVDS.reduceByKeyAndWindow((x:Int, y:Int)=> x + y,
Durations.seconds(10),// 设置窗口的大小
Durations.seconds(10))// 设置滑动的大小
res1DS.print()
streamingContext.start()
streamingContext.awaitTermination()
streamingContext.stop()}}
Demo4DStreamToRDD
packagecom.shujia.streamingimportorg.apache.spark.SparkContext
importorg.apache.spark.rdd.RDD
importorg.apache.spark.sql.{DataFrame, SparkSession}importorg.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}importorg.apache.spark.streaming.{Durations, StreamingContext}object Demo4DStreamToRDD {def main(args: Array[String]):Unit={/**
* 创建SparkSession
*/val sparkSession: SparkSession = SparkSession.builder().master("local[2]").appName("DS2RDD演示").config("spark.sql.shuffle.partitions",1).getOrCreate()importsparkSession.implicits._
importorg.apache.spark.sql.functions._
val sparkContext: SparkContext = sparkSession.sparkContext
val streamingContext =new StreamingContext(sparkContext, Durations.seconds(5))/**
* 通过spark streaming读取数据得到一个DS
*///hello hello world java hello hadoopval linesDS: ReceiverInputDStream[String]= streamingContext.socketTextStream("master",12345)val new_linesDS: DStream[String]= linesDS.window(Durations.seconds(15), Durations.seconds(5))/**
* DStream底层也是RDD,每隔一个批次将接收到的数据封装到RDD中
* 每隔一个批次,接收到数据是不一样的
*/
new_linesDS.foreachRDD((rdd: RDD[String])=>{
println("===================================================")
println("正在处理当前批次的数据.....")
println("===================================================")//在这里面直接写处理rdd的代码// rdd.flatMap(_.split(" "))// .map((_, 1))// .groupBy(_._1)// .map((kv: (String, Iterable[(String, Int)])) => (kv._1, kv._2.size))// .foreach(println)/**
* 既然这里可以操作rdd, 又因为rdd可以转df, 那么我就可以在spark streaming中一个批次的数据可以使用使用sql语句来分析
*///def toDF(colNames: String*)val linesDF: DataFrame = rdd.toDF("line")
linesDF.createOrReplaceTempView("words")
sparkSession.sql("""
|select
|t1.word as word,
|count(1) as number
|from
|(select
| explode(split(line,' ')) as word
|from
|words) t1
|group by t1.word
|""".stripMargin).show()})
streamingContext.start()
streamingContext.awaitTermination()
streamingContext.stop()}}
Demo5RDDToDStream
packagecom.shujia.streamingimportorg.apache.spark.{SparkContext, rdd}importorg.apache.spark.rdd.RDD
importorg.apache.spark.sql.{DataFrame, Row, SparkSession}importorg.apache.spark.streaming.{Durations, StreamingContext}importorg.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}object Demo5RDDToDStream {def main(args: Array[String]):Unit={/**
* 创建SparkSession
*/val sparkSession: SparkSession = SparkSession.builder().master("local[2]").appName("DS2RDD演示").config("spark.sql.shuffle.partitions",1).getOrCreate()importsparkSession.implicits._
importorg.apache.spark.sql.functions._
val sparkContext: SparkContext = sparkSession.sparkContext
val streamingContext =new StreamingContext(sparkContext, Durations.seconds(5))/**
* 通过spark streaming读取数据得到一个DS
*///hello hello world java hello hadoopval linesDS: ReceiverInputDStream[String]= streamingContext.socketTextStream("master",12345)//in_rdd:hello hello world java hello hadoop//out_rdd:// hello 3// world 1// java 1// hadoop 1val resDS: DStream[(String,Long)]= linesDS.transform((rdd: RDD[String])=>{/**
* rdd处理之后会得到一个新的rdd进行返回
*/// val resultRDD: RDD[(String, Int)] = rdd.flatMap(_.split(" "))// .map((_, 1))// .groupBy(_._1)// .map((kv: (String, Iterable[(String, Int)])) => (kv._1, kv._2.size))// resultRDD//def toDF(colNames: String*)val linesDF: DataFrame = rdd.toDF("line")
linesDF.createOrReplaceTempView("words")val resRDD: RDD[(String,Long)]= sparkSession.sql("""
|select
|t1.word as word,
|count(1) as number
|from
|(select
| explode(split(line,' ')) as word
|from
|words) t1
|group by t1.word
|""".stripMargin).rdd
.map((row: Row)=>{(row.getAs[String](0), row.getAs[Long](1))})
resRDD
})
resDS.print()
streamingContext.start()
streamingContext.awaitTermination()
streamingContext.stop()}}
Demo6Submit
package com.shujia.streaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Durations, StreamingContext}

object Demo6Submit {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("提交命令执行")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val sparkContext: SparkContext = sparkSession.sparkContext
    val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))

    val linesDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)

    linesDS
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    streamingContext.start()
    streamingContext.awaitTermination()
    streamingContext.stop()
  }
}
Demo7SaveToFile
package com.shujia.streaming

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}

object Demo7SaveToFile {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local[2]")
      .appName("提交命令执行")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val sparkContext: SparkContext = sparkSession.sparkContext
    val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))

    val linesDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)

    val resultDS: DStream[(String, Int)] = linesDS
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .transform((rdd: RDD[(String, Int)]) => {
        println("=======================")
        println("正在处理批次数据")
        rdd
      })

    //The target path is a directory; the file names are generated by the system, only the suffix can be chosen
    //Each batch writes its result into a new output directory, rolling over batch by batch
    resultDS.saveAsTextFiles("spark/data/streams/stream", "txt")

    streamingContext.start()
    streamingContext.awaitTermination()
    streamingContext.stop()
  }
}
Demo8SaveToMysql
package com.shujia.streaming

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream

import java.sql.{Connection, DriverManager, PreparedStatement}

object Demo8SaveToMysql {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local[2]")
      .appName("提交命令执行")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val sparkContext: SparkContext = sparkSession.sparkContext
    val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))

    val linesDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)

    linesDS.foreachRDD((rdd: RDD[String]) => {
      println("------------正在处理一批数据-------------------")
      println(s"该批次的分区数:${rdd.getNumPartitions}")

      /**
       * foreachPartition runs the logic once per partition
       */
      rdd.foreachPartition((itr: Iterator[String]) => {
        println("------------数加 防伪码-------------------")
        //Create the connection to the database
        //1. Register the driver
        Class.forName("com.mysql.jdbc.Driver")
        //2. Get the connection object
        val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata29?useUnicode=true&characterEncoding=utf-8&useSSL=false", "root", "123456")
        //Get the statement object
        val ps: PreparedStatement = conn.prepareStatement("insert into student values(?,?,?,?,?)")

        itr.foreach((line: String) => {
          println("....正在处理一条数据....")
          //1500100019,娄曦之,24,男,理科三班
          val infos: Array[String] = line.split(",")
          val id: Int = infos(0).toInt
          val name: String = infos(1)
          val age: Int = infos(2).toInt
          val gender: String = infos(3)
          val clazz: String = infos(4)
          //Bind the parameters of the prepared statement
          ps.setInt(1, id)
          ps.setString(2, name)
          ps.setInt(3, age)
          ps.setString(4, gender)
          ps.setString(5, clazz)
          //Execute the SQL statement
          ps.executeUpdate()
        })

        //Release resources
        ps.close()
        conn.close()
        println()
      })
    })

    streamingContext.start()
    streamingContext.awaitTermination()
    streamingContext.stop()
  }
}
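Writing row by row issues one executeUpdate round trip per record. If throughput matters, the standard JDBC batch API can be used inside the same foreachPartition. A minimal sketch of the inner loop, under the same table layout and reusing the conn and itr from the demo above (an optional optimisation, not part of the original code):

val ps: PreparedStatement = conn.prepareStatement("insert into student values(?,?,?,?,?)")
itr.foreach((line: String) => {
  val infos: Array[String] = line.split(",")
  ps.setInt(1, infos(0).toInt)
  ps.setString(2, infos(1))
  ps.setInt(3, infos(2).toInt)
  ps.setString(4, infos(3))
  ps.setString(5, infos(4))
  ps.addBatch() // queue the row instead of executing it immediately
})
ps.executeBatch() // send all queued rows to MySQL in one round trip
ps.close()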
Demo9CardCondition
package com.shujia.streaming

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.text.SimpleDateFormat
import java.util.Date

object Demo9CardCondition {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local[2]")
      .appName("提交命令执行")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val sparkContext: SparkContext = sparkSession.sparkContext
    val sparkStreaming: StreamingContext = new StreamingContext(sparkContext, Durations.seconds(5))

    /**
     * Requirement: monitor the traffic passing each checkpoint (卡口)
     * Read the vehicle-monitoring data of the checkpoints
     * {"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117988031603010,"camera_id":"00001","orientation":"西南","road_id":34052055,"time":1614711895,"speed":36.38}
     */
    val InfoDS: ReceiverInputDStream[String] = sparkStreaming.socketTextStream("master", 12345)

    /**
     * Option 1: set the window when the source DStream is created
     * Compute the checkpoint statistics over the last 15 seconds, once every 5 seconds
     */
    //Set the sliding window
    val carJsonDS: DStream[String] = InfoDS.window(Durations.seconds(15), Durations.seconds(5))

    /**
     * 1. Parse the received JSON data with the fastjson library
     *
     * In JSON, a value wrapped in double quotes is a string; an unquoted integer is usually read as a Long,
     * and a decimal is usually read as a Double
     */
    val cardAndSpeedDS: DStream[(Long, (Double, Int))] = carJsonDS.map((line: String) => {
      //Use fastjson to turn the string into a JSON object
      val jSONObject: JSONObject = JSON.parseObject(line)
      //According to the requirement, only the checkpoint id and the speed are needed
      val cardId: Long = jSONObject.getLong("card")
      val carSpeed: Double = jSONObject.getDouble("speed")
      (cardId, (carSpeed, 1)) //Assume every record belongs to a different car
    })

    /**
     * 2. Compute the average speed and the number of cars per checkpoint in real time
     * Option 2: set the window when calling reduceByKeyAndWindow on the DStream
     */
    // cardAndSpeedDS.reduceByKeyAndWindow()
    //reduceByKey only sums here; computing the average inside the reduce function would be
    //wrong once more than two records are merged, so the average is taken in a separate map
    val cardSumDS: DStream[(Long, (Double, Int))] = cardAndSpeedDS.reduceByKey(
      (kv1: (Double, Int), kv2: (Double, Int)) => {
        //Adding up the 1s of the same checkpoint gives the number of cars in this batch
        val carNumber: Int = kv1._2 + kv2._2
        //Adding up the speeds of the same checkpoint gives the total speed of this batch
        val sumSpeed: Double = kv1._1 + kv2._1
        (sumSpeed, carNumber)
      })

    //Total speed / number of cars = average speed of this checkpoint in the batch
    val cardConditionDS: DStream[(Long, (Double, Int))] = cardSumDS.map {
      case (cardId, (sumSpeed, carNumber)) => (cardId, (sumSpeed / carNumber, carNumber))
    }

    /**
     * Save the result to the database
     */
    cardConditionDS.foreachRDD((rdd: RDD[(Long, (Double, Int))]) => {
      rdd.foreachPartition((itr: Iterator[(Long, (Double, Int))]) => {
        println("------------数加 防伪码-------------------")
        //Create the connection to the database
        //1. Register the driver
        Class.forName("com.mysql.jdbc.Driver")
        //2. Get the connection object
        val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata29?useUnicode=true&characterEncoding=utf-8&useSSL=false", "root", "123456")
        //Get the statement object
        val ps: PreparedStatement = conn.prepareStatement("insert into card_condition values(?,?,?,?)")
        //Processing time of this batch
        val piCiTime: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())

        itr.foreach((f: (Long, (Double, Int))) => {
          val cardId: Long = f._1
          val avgSpeed: Double = f._2._1
          val carNumber: Int = f._2._2
          ps.setLong(1, cardId)
          ps.setDouble(2, avgSpeed)
          ps.setInt(3, carNumber)
          ps.setString(4, piCiTime)
          ps.executeUpdate()
        })

        //Release resources
        ps.close()
        conn.close()
        println()
      })
    })

    sparkStreaming.start()
    sparkStreaming.awaitTermination()
    sparkStreaming.stop()
  }
}
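Option 2 from the comments above, setting the window on the aggregation itself, would look roughly like this. A minimal sketch, assuming cardAndSpeedDS had been built from the un-windowed InfoDS (reduceByKeyAndWindow then replaces both window() and reduceByKey()):

val windowedSumDS: DStream[(Long, (Double, Int))] = cardAndSpeedDS.reduceByKeyAndWindow(
  (kv1: (Double, Int), kv2: (Double, Int)) => (kv1._1 + kv2._1, kv1._2 + kv2._2), // sum speeds and counts
  Durations.seconds(15), // window length
  Durations.seconds(5)   // slide interval
)
val windowedAvgDS: DStream[(Long, (Double, Int))] =
  windowedSumDS.map { case (cardId, (sumSpeed, cnt)) => (cardId, (sumSpeed / cnt, cnt)) }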
4.Spark-opt
Demo1Cache
package com.shujia.opt

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object Demo1Cache {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local")
      .appName("cache")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val sparkContext: SparkContext = sparkSession.sparkContext
    val studentsRDD: RDD[String] = sparkContext.textFile("spark/data/students.csv")

    /**
     * Try not to recompute the same RDD: caching a reused RDD avoids recomputing
     * the whole lineage that produced it
     *
     * The same idea applies to rdd / df / sql
     */
    //Cache the RDD that is reused
    // studentsRDD.cache() // defaults to MEMORY_ONLY
    studentsRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)

    /**
     * Count the number of students in each class
     */
    studentsRDD
      .map((line: String) => (line.split(",")(4), 1))
      .reduceByKey(_ + _)
      .saveAsTextFile("spark/data/opt/clazz_num")

    /**
     * Count the number of students of each gender
     *
     * The first job that uses studentsRDD reads the full data set; when the second job
     * uses studentsRDD again, it fetches the data from the cache instead
     */
    studentsRDD
      .map((line: String) => (line.split(",")(3), 1))
      .reduceByKey(_ + _)
      .saveAsTextFile("spark/data/opt/gender_num")

    while (true) {} //keep the application running (e.g. to inspect the web UI)
  }
}
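Once the last job that needs the cached RDD has finished, the cache can be released explicitly. A small addition to the demo above (not in the original code; it would go before the final while loop):

println(studentsRDD.getStorageLevel) // shows the storage level currently in effect
studentsRDD.unpersist()              // free the memory/disk used by the cached RDD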
Demo2AggregateByKey
package com.shujia.opt

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object Demo2AggregateByKey {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local")
      .appName("cache")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val sparkContext: SparkContext = sparkSession.sparkContext
    val studentsRDD: RDD[String] = sparkContext.textFile("spark/data/students.csv")
    val clazzKVRDD: RDD[(String, Int)] = studentsRDD.map((line: String) => (line.split(",")(4), 1))

    /**
     * reduceByKey: groups and aggregates; the same function is applied both for the
     * map-side pre-aggregation (combiner) and for the reduce-side aggregation
     *
     * aggregateByKey: groups and aggregates, but lets the map-side and the reduce-side
     * aggregation be configured separately
     *   zeroValue: the initial value
     *   seqOp: (U, V) => U   map-side aggregation
     *   combOp: (U, U) => U  reduce-side aggregation
     *
     * Use aggregateByKey when the map-side logic and the reduce-side logic differ
     */
    //aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
    // clazzKVRDD.aggregateByKey(0)(
    //   (u: Int, v: Int) => u + v,    // map-side aggregation
    //   (u1: Int, u2: Int) => u1 + u2 // reduce-side aggregation
    // ).foreach(println)

    //reduceByKey(func: (V, V) => V)
    clazzKVRDD.reduceByKey(_ + _).foreach(println)
  }
}
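A case where the map-side and reduce-side logic genuinely differ is computing an average: the map side folds single ages into a (sum, count) accumulator, the reduce side merges accumulators. A minimal sketch, assuming the same students.csv layout (id,name,age,gender,clazz) and reusing the studentsRDD defined above:

val clazzAgeRDD: RDD[(String, Int)] = studentsRDD.map((line: String) => {
  val infos = line.split(",")
  (infos(4), infos(2).toInt) // (class, age)
})

clazzAgeRDD
  .aggregateByKey((0, 0))(
    (acc: (Int, Int), age: Int) => (acc._1 + age, acc._2 + 1),   // map side: accumulate sum and count
    (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2) // reduce side: merge partial accumulators
  )
  .map { case (clazz, (sum, cnt)) => (clazz, sum.toDouble / cnt) }
  .foreach(println)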
Demo3MapPartition
package com.shujia.opt

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

import java.text.SimpleDateFormat
import java.util.Date

object Demo3MapPartition {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local")
      .appName("cache")
      .config("spark.sql.shuffle.partitions", 2)
      .getOrCreate()

    val sparkContext: SparkContext = sparkSession.sparkContext
    val dataRDD: RDD[String] = sparkContext.textFile("spark/data/ant_user_low_carbon.txt")

    val kvRDD: RDD[(String, String, String)] = dataRDD.mapPartitions((itr: Iterator[String]) => {
      itr.map((line: String) => {
        val infos: Array[String] = line.split("\t")
        (infos(0), infos(1), infos(2))
      })
    })

    //map processes the RDD one record at a time
    // val resRDD: RDD[(String, Long, String)] = kvRDD.map((kv: (String, String, String)) => {
    //   //This statement is inside map, so a SimpleDateFormat object is created for every single record
    //   val sdf = new SimpleDateFormat("yyyy/MM/dd")
    //   println("----------------创建了一个SimpleDateFormat对象----------------")
    //   val dateObj: Date = sdf.parse(kv._2)
    //   val ts: Long = dateObj.getTime
    //   (kv._1, ts, kv._3)
    // })
    // resRDD.foreach(println)

    //mapPartitions processes one whole partition at a time
    val resRDD2: RDD[(String, Long, String)] = kvRDD.mapPartitions((itr: Iterator[(String, String, String)]) => {
      /**
       * Convert the date field to a timestamp: one SimpleDateFormat per partition instead of per record
       */
      val sdf = new SimpleDateFormat("yyyy/MM/dd")
      println("----------------创建了一个SimpleDateFormat对象----------------")
      itr.map((kv: (String, String, String)) => {
        val dateObj: Date = sdf.parse(kv._2)
        val ts: Long = dateObj.getTime
        (kv._1, ts, kv._3)
      })
    })

    resRDD2.foreach(println)
  }
}
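To check how records are distributed across the partitions, mapPartitionsWithIndex exposes the partition number alongside the iterator. A minimal sketch, reusing the kvRDD from the demo above:

kvRDD
  .mapPartitionsWithIndex((index: Int, itr: Iterator[(String, String, String)]) => {
    // tag every record with the number of the partition it lives in
    itr.map(record => (index, record))
  })
  .foreach(println)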
Demo4Coalesce
package com.shujia.opt

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * Repartitioning
 *
 * repartition
 *
 * coalesce
 *
 * Interview question: how do you change the number of partitions of an RDD, and what is the
 * difference between the two operators?
 */
object Demo4Coalesce {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local")
      .appName("cache")
      // .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val sparkContext: SparkContext = sparkSession.sparkContext

    val studentsRDD: RDD[String] = sparkContext.textFile("spark/data/students.csv") // 1000 rows
    println(s"studentsRDD的分区数量为:${studentsRDD.getNumPartitions}") //initially the number of partitions depends on the number of blocks

    /**
     * repartition: repartitions the RDD and returns a new RDD
     * repartition can either increase or decrease the number of partitions
     *
     * Whether to increase or decrease depends on the available resources:
     * with enough resources, more partitions mean more tasks and higher parallelism, speeding up the job
     *
     * If each parent partition feeds exactly one child partition the dependency is narrow; if a parent
     * partition is redistributed across several child partitions, a shuffle is required
     * repartition always shuffles and rebuilds the data, so the original order is not preserved,
     * both when increasing and when decreasing the number of partitions
     */
    // val students2RDD: RDD[String] = studentsRDD.repartition(10)
    // students2RDD.foreach(println)
    // println(s"students2RDD的分区数量为:${students2RDD.getNumPartitions}")
    //
    // val students3RDD: RDD[String] = students2RDD.repartition(1)
    // println(s"students3RDD的分区数量为:${students3RDD.getNumPartitions}")
    // students3RDD.foreach(println)

    /**
     * coalesce: repartitions as well, increasing or decreasing the number of partitions
     *
     * It takes two arguments: numPartitions: Int (the target number of partitions) and
     * shuffle: Boolean = false (whether to shuffle)
     *
     * Increasing the number of partitions with coalesce requires a shuffle
     * Decreasing can be done with or without a shuffle; a typical use case is merging
     * small files with coalesce and no shuffle
     */
    val studentsRDD2: RDD[String] = studentsRDD.coalesce(10, shuffle = true)
    println(s"studentsRDD2的分区数量为:${studentsRDD2.getNumPartitions}")
    studentsRDD2.foreach(println)

    val studentRDD3: RDD[String] = studentsRDD2.coalesce(2, shuffle = false)
    println(s"studentRDD3的分区数量为:${studentRDD3.getNumPartitions}")
    studentRDD3.foreach(println)

    while (true) {} //keep the application running (e.g. to inspect the web UI)
  }
}
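repartition is in fact a thin wrapper around coalesce with shuffle enabled, so the two calls below are equivalent. A minimal sketch using the studentsRDD from above:

// repartition(n) simply calls coalesce(n, shuffle = true)
val viaRepartition: RDD[String] = studentsRDD.repartition(10)
val viaCoalesce: RDD[String] = studentsRDD.coalesce(10, shuffle = true)
println(viaRepartition.getNumPartitions) // 10
println(viaCoalesce.getNumPartitions)    // 10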
Demo5Coalesce2
package com.shujia.opt

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object Demo5Coalesce2 {
  def main(args: Array[String]): Unit = {
    /**
     * Merge small files with coalesce
     */
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local")
      .appName("cache")
      // .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val sparkContext: SparkContext = sparkSession.sparkContext

    val studentsRDD: RDD[String] = sparkContext.textFile("spark/data/studentsinfo/*") // 1000 rows
    println(s"studentsRDD的分区数量为:${studentsRDD.getNumPartitions}") //initially one partition per block / small file

    //Merge the small files into a single partition, without a shuffle
    val students2RDD: RDD[String] = studentsRDD.coalesce(1, shuffle = false)
    println(s"students2RDD的分区数量为:${students2RDD.getNumPartitions}")

    students2RDD.saveAsTextFile("spark/data/studentsinfo2")
  }
}
Demo6MapJoin
package com.shujia.opt

import org.apache.spark.sql.{DataFrame, SparkSession}

object Demo6MapJoin {
  def main(args: Array[String]): Unit = {
    /**
     * Broadcast (map-side) join demo
     */
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local")
      .appName("cache")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val studentsDF: DataFrame = sparkSession.read
      .format("csv")
      .option("sep", ",")
      .schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")
      .load("spark/data/students.csv")

    val scoresDF: DataFrame = sparkSession.read
      .format("csv")
      .option("sep", ",")
      .schema("id STRING,subject_id STRING,score INT")
      .load("spark/data/score.txt")

    /**
     * In local mode the SQL optimizer may already pick a broadcast join on its own; the idea is
     * the same as a map join
     *
     * When running on a cluster, the broadcast can be requested manually; in DSL code this is
     * done with a hint on the small table
     *
     * In general, when a large table joins a small table, broadcast the small table via the hint;
     * "small" here usually means less than about 1 GB
     *
     * Two jobs are started:
     * the first job pulls the small table to the Driver and broadcasts it from there,
     * the second job joins against the broadcast data inside the Executors
     */
    val resDF: DataFrame = scoresDF.join(studentsDF.hint("broadcast"), "id")

    resDF.show()

    while (true) {} //keep the application running (e.g. to inspect the web UI)
  }
}
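Besides the hint string, the DataFrame API also exposes a broadcast function, and Spark SQL broadcasts automatically when the small side is below spark.sql.autoBroadcastJoinThreshold (10 MB by default). A minimal sketch of both, reusing the DataFrames and sparkSession from the demo above:

import org.apache.spark.sql.functions.broadcast

// explicit broadcast of the small table
val resDF2: DataFrame = scoresDF.join(broadcast(studentsDF), "id")

// raise the automatic broadcast threshold (value in bytes) for this session
sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)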
Demo7Kryo
package com.shujia.opt

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

/**
 * Kryo is a serialization format supported by Spark specifically to improve performance during
 * Spark computation
 *
 * Serialization speeds up the computation and reduces the amount of memory the data occupies
 */
case class Student(id: String, name: String, age: Int, gender: String, clazz: String)

object Demo7Kryo {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local")
      .appName("cache")
      .config("spark.sql.shuffle.partitions", 1)
      //Use Kryo as the serializer
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      //A custom registrator class that declares what has to be serialized
      .config("spark.kryo.registrator", "com.shujia.opt.Demo8KryoRegistrator")
      .getOrCreate()

    val sparkContext: SparkContext = sparkSession.sparkContext

    val studentsRDD: RDD[Student] = sparkContext.textFile("spark/data/students.csv")
      .map((line: String) => {
        val infos: Array[String] = line.split(",")
        Student(infos(0), infos(1), infos(2).toInt, infos(3), infos(4))
      })

    /**
     * Cached size without serialization: 238.3 KiB
     * With the default Java serialization: 55.7 KiB
     * With Kryo serialization: 43.0 KiB
     */
    //Cache the RDD that is reused
    // studentsRDD.cache() // defaults to MEMORY_ONLY
    // studentsRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
    // studentsRDD.persist(StorageLevel.MEMORY_ONLY)
    studentsRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)

    /**
     * Count the number of students in each class
     */
    studentsRDD
      .map((stu: Student) => (stu.clazz, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile("spark/data/opt/clazz_num")

    /**
     * Count the number of students of each gender
     *
     * The first job that uses studentsRDD reads the full data set; when the second job
     * uses studentsRDD again, it fetches the data from the cache instead
     */
    studentsRDD
      .map((stu: Student) => (stu.gender, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile("spark/data/opt/gender_num")

    while (true) {} //keep the application running (e.g. to inspect the web UI)
  }
}
Demo8KryoRegistrator
package com.shujia.opt

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

/**
 * A custom registrator that tells Kryo which classes have to be serialized
 */
class Demo8KryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    /**
     * Register everything that has to be serialized in this overridden method
     */
    kryo.register(classOf[Student])
    kryo.register(classOf[String])
    kryo.register(classOf[Int])
  }
}
Demo9FilterKey
package com.shujia.opt

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo9FilterKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("app")
    val sc: SparkContext = new SparkContext(conf)

    val lines: RDD[String] = sc.textFile("spark/data/ws/*")
    println("第一个RDD分区数量:" + lines.getNumPartitions)

    val countRDD: RDD[(String, Int)] = lines
      .flatMap(_.split("\\|"))
      .map((_, 1))
      .groupByKey() // this produces a shuffle
      .map((x: (String, Iterable[Int])) => (x._1, x._2.toList.sum))
    println("聚合之后RDD分区的数量" + countRDD.getNumPartitions)
    // countRDD.foreach(println)

    /**
     * Sample the keys, then filter out the keys that cause data skew and do not matter
     * much to the business (see the sampling sketch after this demo)
     */
    val wordRDD: RDD[(String, Int)] = lines
      .flatMap(_.split(","))
      .map((_, 1))

    // val top1: Array[(String, Int)] = wordRDD
    //   .sample(true, 0.1)
    //   .reduceByKey(_ + _)
    //   .sortBy(-_._2)
    //   .take(1)
    // the key that causes the skew
    // val key: String = top1(0)._1

    //Filter out the skewed key
    wordRDD
      .filter(t => !"null".equals(t._1))
      .groupByKey()
      .map(x => (x._1, x._2.toList.sum))
      .foreach(println)

    while (true) {} //keep the application running (e.g. to inspect the web UI)
  }
}
Demo10DoubleReduce
package com.shujia.opt

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Random

object Demo10DoubleReduce {
  /**
   * Two-stage (double) aggregation
   * Usually suitable when the aggregation logic is simple
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local").setAppName("app")
    val sc: SparkContext = new SparkContext(conf)

    val lines: RDD[String] = sc.textFile("data/word")

    val wordRDD: RDD[String] = lines
      .flatMap(_.split(","))
      .filter(!_.equals(""))

    /* wordRDD
      .map((_, 1))
      .groupByKey()
      .map(kv => (kv._1, kv._2.size))
      .foreach(println) */

    //Map-side pre-aggregation already alleviates data skew
    /* wordRDD
      .map((_, 1))
      .reduceByKey(_ + _)
      .foreach(println) */

    //Prefix every key with a random number below 5
    wordRDD
      .map(word => {
        val pix: Int = Random.nextInt(5)
        (pix + "-" + word, 1)
      })
      .groupByKey() //first aggregation
      .map(t => (t._1, t._2.toList.sum))
      .map(t => {
        //strip the random prefix
        (t._1.split("-")(1), t._2)
      })
      .groupByKey() //second aggregation
      .map(t => (t._1, t._2.toList.sum))
      .foreach(println)

    while (true) {} //keep the application running (e.g. to inspect the web UI)
  }
}
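The same two-stage idea is usually written with reduceByKey instead of groupByKey, so each stage also benefits from map-side combining. A minimal sketch over the wordRDD from above:

wordRDD
  .map(word => (Random.nextInt(5) + "-" + word, 1)) // salt: spread one hot key over 5 keys
  .reduceByKey(_ + _)                               // first, partial aggregation per salted key
  .map(t => (t._1.split("-")(1), t._2))             // remove the salt
  .reduceByKey(_ + _)                               // second, final aggregation per real key
  .foreach(println)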
Demo11DoubleJoin
package com.shujia.opt

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo11DoubleJoin {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("app").setMaster("local")
    val sc = new SparkContext(conf)

    val dataList1 = List(("java", 1), ("shujia", 2), ("shujia", 3), ("shujia", 1), ("shujia", 1))
    val dataList2 = List(("java", 100), ("java", 99), ("shujia", 88), ("shujia", 66))

    val RDD1: RDD[(String, Int)] = sc.parallelize(dataList1)
    val RDD2: RDD[(String, Int)] = sc.parallelize(dataList2)

    //Sample to find the skewed key
    val sampleRDD: RDD[(String, Int)] = RDD1.sample(false, 1.0)

    //skewedKey: the key that causes the data skew, here "shujia"
    val skewedKey: String = sampleRDD.map(x => (x._1, 1))
      .reduceByKey(_ + _)
      .map(x => (x._2, x._1))
      .sortByKey(ascending = false)
      .take(1)(0)._2

    //The part of RDD1 that contains the skewed key
    val skewedRDD1: RDD[(String, Int)] = RDD1.filter(tuple => tuple._1.equals(skewedKey))
    //The part of RDD1 without the skewed key
    val commonRDD1: RDD[(String, Int)] = RDD1.filter(tuple => !tuple._1.equals(skewedKey))

    val skewedRDD2: RDD[(String, Int)] = RDD2.filter(tuple => tuple._1.equals(skewedKey))
    val commonRDD2: RDD[(String, Int)] = RDD2.filter(tuple => !tuple._1.equals(skewedKey))

    //Use a map join (broadcast) for the skewed key
    //note: collect().toMap keeps only one value per key, which is enough for this demo
    val skewedMap: Map[String, Int] = skewedRDD2.collect().toMap
    val bro: Broadcast[Map[String, Int]] = sc.broadcast(skewedMap)

    val resultRDD1: RDD[(String, (Int, Int))] = skewedRDD1.map(kv => {
      val word: String = kv._1
      val i: Int = bro.value.getOrElse(word, 0)
      (word, (kv._2, i))
    })

    //The part without data skew is joined normally
    val resultRDD2: RDD[(String, (Int, Int))] = commonRDD1.join(commonRDD2)

    //Concatenate the two results
    resultRDD1.union(resultRDD2).foreach(println)
  }
}
This article is reposted from: https://blog.csdn.net/qq_63548396/article/details/139637588
Copyright belongs to the original author @追求. In case of infringement, please contact us for removal.