

Big Data Learning - Spark


1.Spark-core

1.Demo1WordCount

package com.shujia.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/*
  RDD: resilient distributed dataset
 */
object Demo1WordCount {
  def main(args: Array[String]): Unit = {
    // 1. Build the Spark environment
    // 1.1 Create the configuration object
    val conf: SparkConf = new SparkConf()
    // 1.2 Set the run mode (local, Standalone, Mesos, YARN);
    //     "local" runs with a single worker thread, local[2] would use two, local[*] all available cores
    conf.setMaster("local")
    // 1.3 Give the Spark job a name
    conf.setAppName("wc")

    // 2. Create the Spark runtime context
    val sparkContext: SparkContext = new SparkContext(conf)

    // 3. Read the file data
    val wordsLine: RDD[String] = sparkContext.textFile("spark/data/words.txt")

    // 4. Split each line on the "|" separator
    val words: RDD[String] = wordsLine.flatMap(_.split("\\|"))
    val wordsTuple2: RDD[(String, Int)] = words.map((_, 1))
    val wordsTuple2Group: RDD[(String, Iterable[(String, Int)])] = wordsTuple2.groupBy(_._1)
    val wordCount: RDD[(String, Int)] = wordsTuple2Group.map((kv: (String, Iterable[(String, Int)])) => (kv._1, kv._2.size))

    // Save the final result to a local directory
    wordCount.saveAsTextFile("spark/data/word_count")
  }
}
2.Demo2Partition

package com.shujia.core

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

/**
 * When records with the same key are pulled to the same partition during a Spark job, that process is called a shuffle.
 * Note: Spark's shuffle follows the same principle as MapReduce's shuffle; both write shuffle data to disk.
 *
 * RDD: resilient distributed dataset
 * Resilient: the data an RDD computes over can be large or small
 * Distributed: the data can be spread across multiple servers; an RDD's partitions come from blocks, and those blocks can live on different DataNodes
 * Dataset: an RDD stores no data itself, it is only the computation logic; when a job is triggered, data flows through the RDDs
 */
object Demo2Partition {
  def main(args: Array[String]): Unit = {
    // 1. Build the Spark environment
    // 1.1 Create the configuration object
    val conf: SparkConf = new SparkConf()
    // 1.2 Set the run mode (local, Standalone, Mesos, YARN)
    conf.setMaster("local")
    // 1.3 Give the Spark job a name
    conf.setAppName("wc")

    // 2. Create the Spark runtime context
    val sparkContext: SparkContext = new SparkContext(conf)

    // 3. Read the file data
    // val wordsLine: RDD[String] = sparkContext.textFile("spark/data/ws/*", minPartitions = 7)
    val wordsLine: RDD[String] = sparkContext.textFile("spark/data/ws/*")
    println(s"wordsLine RDD partition count: ${wordsLine.getNumPartitions}")

    // 4. Split each line on the "|" separator
    val words: RDD[String] = wordsLine.flatMap(_.split("\\|"))
    println(s"words RDD partition count: ${words.getNumPartitions}")

    val wordsTuple2: RDD[(String, Int)] = words.map((_, 1))
    println(s"wordsTuple2 RDD partition count: ${wordsTuple2.getNumPartitions}")

    // The partition count can be set separately on operators that produce a shuffle
    val wordsTuple2Group: RDD[(String, Iterable[(String, Int)])] = wordsTuple2.groupBy(_._1, 5)
    println(s"wordsTuple2Group RDD partition count: ${wordsTuple2Group.getNumPartitions}")

    val wordCount: RDD[(String, Int)] = wordsTuple2Group.map((kv: (String, Iterable[(String, Int)])) => (kv._1, kv._2.size))
    println(s"wordCount RDD partition count: ${wordCount.getNumPartitions}")

    wordCount.saveAsTextFile("spark/data/word_count2")
  }
}
  • The 5 key properties of an RDD (a must-know for interviews!!)
  • 1) An RDD is made up of partitions: when a file is read, the RDD gets as many partitions as the file has blocks.
  • Note: by default, all RDDs in a job have the same number of partitions, both before and after a shuffle; it is decided when the data is first loaded.

  • 2) Functions actually operate on the RDD's partitions; each partition is processed by one task, so the number of partitions equals the total number of tasks.
  • Note: in Spark these functions are called operators (transformation operators: RDD -> RDD; action operators: RDD -> some other data type).

  • 3) RDDs have dependencies on each other: the data of a later RDD depends on the computed result of the previous RDD, and data flows through the RDDs like water.
  • Note:
  • 3.1 There are two kinds of dependencies between RDDs:
  • **a. Narrow dependency: a partition of the later RDD depends on exactly one partition of the previous RDD (1-to-1).**
  • **b. Wide dependency: a partition of the later RDD is built from multiple partitions of the previous RDD (1-to-many, i.e. a shuffle).**
  • **3.2 Because of these dependencies the job is divided into stages: sumNum(stage) = Num(wide dependencies) + 1.**
  • **3.3 Under a narrow dependency the partition count cannot change (it is inherited from the first RDD); under a wide dependency the partition count can be set on the operator that produces the shuffle (see the sketch after this list).**

  • 4) Partitioner-based operators can only be applied to key-value RDDs, e.g. groupByKey, reduceByKey.
  • 5) Spark provides preferred locations for each task: move the computation to the data rather than the data to the computation.
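To make points 1) to 3) concrete, here is a minimal sketch; it is my own illustration rather than part of the original notes, and the object name DependencySketch, the inline numbers and the chosen partition counts are assumptions.

package com.shujia.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Sketch: narrow dependencies keep the partition count, wide (shuffle) dependencies may change it.
object DependencySketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("dependency sketch")
    val sc = new SparkContext(conf)

    // 4 partitions are requested explicitly here instead of being derived from file blocks
    val nums: RDD[Int] = sc.parallelize(1 to 100, numSlices = 4)

    val doubled: RDD[Int] = nums.map(_ * 2)          // narrow: 1-to-1, no shuffle
    val byMod: RDD[(Int, Iterable[Int])] = doubled.map(n => (n % 3, n))
      .groupByKey(numPartitions = 2)                 // wide: shuffle, partition count set here

    println(nums.getNumPartitions)     // 4
    println(doubled.getNumPartitions)  // 4, inherited through the narrow dependency
    println(byMod.getNumPartitions)    // 2, set on the shuffle operator
    // The single shuffle introduces one stage boundary, so this job has 2 stages.

    sc.stop()
  }
}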
3.Demo3Map
package com.shujia.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/*
  map operator: a transformation operator

  A Spark job is triggered by its final action operator; without an action operator the job does not run at all.

  RDDs are lazily evaluated.
 */
object Demo3Map {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("map算子演示")
    val context = new SparkContext(conf)
    //====================================================
    val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")

    /**
     * map: takes the records of the RDD one by one, passes each to the map function, and returns a new RDD;
     * map does not change the total number of records.
     */
    val splitRDD: RDD[List[String]] = studentRDD.map((s: String) => {
      println("============数加防伪码================")
      s.split(",").toList
    })

    // splitRDD.foreach(println)
  }
}
4.Demo4Filter
package com.shujia.core

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Demo4Filter {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("filter算子演示")
    val context = new SparkContext(conf)
    //====================================================
    val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")

    /**
     * filter: takes the records of the RDD one by one and passes them to the function behind filter;
     * if the function returns true the record is kept, otherwise it is dropped.
     *
     * filter usually reduces the number of records.
     */
    val filterRDD: RDD[String] = studentRDD.filter((s: String) => {
      val strings: Array[String] = s.split(",")
      "男".equals(strings(3))
    })

    filterRDD.foreach(println)
  }
}
5.Demo5flatMap
package com.shujia.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo5flatMap {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("flatMap算子演示")
    val context = new SparkContext(conf)
    //====================================================
    val linesRDD: RDD[String] = context.textFile("spark/data/words.txt")

    /**
     * flatMap: takes the records of the RDD one by one and passes them to the function;
     * the function must return a collection, which is then flattened into a new RDD.
     */
    val wordsRDD: RDD[String] = linesRDD.flatMap((line: String) => line.split("\\|"))

    wordsRDD.foreach(println)
  }
}
6.Demo6sample
package com.shujia.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo6sample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("sample算子演示")
    val context = new SparkContext(conf)
    //====================================================
    val studentRDD: RDD[String] = context.textFile("spark/data/students.csv")

    /**
     * sample: draws a sample of the data from the previous RDD.
     *
     * The sampled fraction is not exact; it fluctuates around the requested ratio.
     * For example, sampling 10% of 1000 records returns roughly 100 records.
     */
    val sampleRDD: RDD[String] = studentRDD.sample(withReplacement = true, 0.1)
    sampleRDD.foreach(println)
  }
}
7.Demo7GroupBy
packagecom.shujia.coreimportorg.apache.spark.{SparkConf, SparkContext}importorg.apache.spark.rdd.RDD

object Demo7GroupBy {def main(args: Array[String]):Unit={val conf =new SparkConf()
    conf.setMaster("local")
    conf.setAppName("groupBy算子演示")val context =new SparkContext(conf)//====================================================val studentRDD: RDD[String]= context.textFile("spark/data/students.csv")val splitRDD: RDD[Array[String]]= studentRDD.map((s:String)=> s.split(","))//需求:求出每个班级平均年龄//使用模式匹配的方式取出班级和年龄val clazzWithAgeRDD: RDD[(String,Int)]= splitRDD.map {case Array(_, _, age:String, _, clazz:String)=>(clazz, age.toInt)}/**
     *  groupBy:按照指定的字段进行分组,返回的是一个键是分组字段,值是一个存放原本数据的迭代器的键值对 返回的是kv格式的RDD
     *
     *  key: 是分组字段
     *  value: 是spark中的迭代器
     *  迭代器中的数据,不是完全被加载到内存中计算,迭代器只能迭代一次
     *
     *  groupBy会产生shuffle
     *///按照班级进行分组//val stringToStudents: Map[String, List[Student]] = stuList.groupBy((s: Student) => s.clazz)val kvRDD: RDD[(String, Iterable[(String,Int)])]= clazzWithAgeRDD.groupBy(_._1)val clazzAvgAgeRDD: RDD[(String,Double)]= kvRDD.map {case(clazz:String, itr: Iterable[(String,Int)])=>//CompactBuffer((理科二班,21), (理科二班,23), (理科二班,21), (理科二班,23), (理科二班,21), (理科二班,21), (理科二班,24))//CompactBuffer(21,23,21,23,21,21,24)val allAge: Iterable[Int]= itr.map((kv:(String,Int))=> kv._2)val avgAge:Double= allAge.sum.toDouble / allAge.size
        (clazz, avgAge)}
    clazzAvgAgeRDD.foreach(println)while(true){}}}
8.Demo8GroupByKey
packagecom.shujia.coreimportorg.apache.spark.{SparkConf, SparkContext}importorg.apache.spark.rdd.RDD

object Demo8GroupByKey {def main(args: Array[String]):Unit={val conf =new SparkConf()
    conf.setMaster("local")
    conf.setAppName("groupByKey算子演示")val context =new SparkContext(conf)//====================================================val studentRDD: RDD[String]= context.textFile("spark/data/students.csv")val splitRDD: RDD[Array[String]]= studentRDD.map((s:String)=> s.split(","))//需求:求出每个班级平均年龄//使用模式匹配的方式取出班级和年龄val clazzWithAgeRDD: RDD[(String,Int)]= splitRDD.map {case Array(_, _, age:String, _, clazz:String)=>(clazz, age.toInt)}/**
     *  groupByKey: 按照键进行分组,将value值构成迭代器返回
     *  将来你在spark中看到RDD[(xx, xxx)] 这样的RDD就是kv键值对类型的RDD
     *  只有kv类型键值对RDD才可以调用groupByKey算子
     *
     */val kvRDD: RDD[(String, Iterable[Int])]= clazzWithAgeRDD.groupByKey()val clazzAvgAgeRDD: RDD[(String,Double)]= kvRDD.map {case(clazz:String, ageItr: Iterable[Int])=>(clazz, ageItr.sum.toDouble / ageItr.size)}
    clazzAvgAgeRDD.foreach(println)while(true){}}}
The difference between groupBy and groupByKey (a common Spark interview question; a comparison sketch follows below):
* 1. In code: any RDD can call groupBy, but only a key-value RDD can call groupByKey.
* 2. The RDD produced by groupByKey has a simpler structure, which makes downstream processing easier.
* 3. groupByKey performs better and runs faster, because it sends less data through the shuffle than groupBy does.
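A minimal comparison sketch, my own addition rather than part of the original notes: both branches compute the average age per class from the same key-value RDD. The object name and the inline data are assumptions.

package com.shujia.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object GroupByVsGroupByKeySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("groupBy vs groupByKey"))

    val clazzAgeRDD: RDD[(String, Int)] = sc.parallelize(Seq(
      ("理科一班", 22), ("理科一班", 24), ("文科一班", 21), ("文科一班", 23)))

    // groupBy(_._1): the whole (clazz, age) tuple travels through the shuffle,
    // so every element of the iterator still carries the key.
    val byGroupBy: RDD[(String, Double)] = clazzAgeRDD
      .groupBy(_._1)                       // RDD[(String, Iterable[(String, Int)])]
      .map { case (clazz, itr) =>
        val ages = itr.map(_._2)
        (clazz, ages.sum.toDouble / ages.size)
      }

    // groupByKey(): only the values are shuffled and grouped, which gives a
    // simpler result structure and a smaller shuffle.
    val byGroupByKey: RDD[(String, Double)] = clazzAgeRDD
      .groupByKey()                        // RDD[(String, Iterable[Int])]
      .mapValues(ages => ages.sum.toDouble / ages.size)

    byGroupBy.foreach(println)
    byGroupByKey.foreach(println)
    sc.stop()
  }
}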
9.Demo9ReduceByKey
packagecom.shujia.coreimportorg.apache.spark.{SparkConf, SparkContext}importorg.apache.spark.rdd.RDD

object Demo9ReduceByKey {def main(args: Array[String]):Unit={val conf =new SparkConf()
    conf.setMaster("local")
    conf.setAppName("reduceByKey算子演示")val context =new SparkContext(conf)//====================================================val studentRDD: RDD[String]= context.textFile("spark/data/students.csv")val splitRDD: RDD[Array[String]]= studentRDD.map((s:String)=> s.split(","))//求每个班级的人数val clazzKVRDD: RDD[(String,Int)]= splitRDD.map {case Array(_, _, _, _, clazz:String)=>(clazz,1)}/**
     * 利用groupByKey实现
     *///    val kvRDD: RDD[(String, Iterable[Int])] = clazzKVRDD.groupByKey()//    val clazzAvgAgeRDD: RDD[(String, Double)] = kvRDD.map {//      case (clazz: String, n: Iterable[Int]) =>//        (clazz, n.sum)//    }//    clazzAvgAgeRDD.foreach(println)/**
     * 利用reduceByKey实现:按照键key对value值直接进行聚合,需要传入聚合的方式
     * reduceByKey算子也是只有kv类型的RDD才能调用
     *
     *
     */val countRDD: RDD[(String,Int)]= clazzKVRDD.reduceByKey((x:Int, y:Int)=> x + y)
    countRDD.foreach(println)//    clazzKVRDD.groupByKey()//      .map(kv=>(kv._1,kv._2.sum))//      .foreach(println)while(true){}/**
    
     */}}
  • The difference between reduceByKey and groupByKey (a sketch follows below):
    - 1. reduceByKey does a map-side pre-aggregation (combine) that groupByKey does not; the pre-aggregated shuffle data is much smaller, so reduceByKey performs better than groupByKey.
    - 2. In terms of flexibility, reduceByKey is less flexible than groupByKey; for example, reduceByKey alone cannot compute a variance, while groupByKey keeps all the values per key and can.
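A minimal sketch of that trade-off, added by me and not taken from the original notes: reduceByKey for a simple associative aggregation, groupByKey when all values per key are needed. The object name and the inline data are assumptions.

package com.shujia.core

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ReduceByKeyVsGroupByKeySketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("reduceByKey vs groupByKey"))

    val scores: RDD[(String, Double)] = sc.parallelize(Seq(
      ("文科一班", 88.0), ("文科一班", 92.0), ("理科一班", 75.0), ("理科一班", 81.0)))

    // reduceByKey: pre-aggregates on the map side, so only partial sums go through the shuffle.
    val sumPerClazz: RDD[(String, Double)] = scores.reduceByKey(_ + _)

    // groupByKey: keeps every value per key, so statistics such as a variance
    // can still be computed afterwards.
    val variancePerClazz: RDD[(String, Double)] = scores.groupByKey().mapValues { vs =>
      val mean = vs.sum / vs.size
      vs.map(v => (v - mean) * (v - mean)).sum / vs.size
    }

    sumPerClazz.foreach(println)
    variancePerClazz.foreach(println)
    sc.stop()
  }
}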
10.Demo10Union
packagecom.shujia.coreimportorg.apache.spark.rdd.RDD
importorg.apache.spark.{SparkConf, SparkContext}object Demo10Union {def main(args: Array[String]):Unit={val conf =new SparkConf()
    conf.setMaster("local")
    conf.setAppName("Union算子演示")val context =new SparkContext(conf)//====================================================val w1RDD: RDD[String]= context.textFile("spark/data/ws/w1.txt")// 1val w2RDD: RDD[String]= context.textFile("spark/data/ws/w2.txt")// 1/**
     *  union:上下合并两个RDD,前提是两个RDD中的数据类型要一致,合并后不会对结果进行去重
     *
     *  注:这里的合并只是逻辑层面上的合并,物理层面其实是没有合并
     */val unionRDD: RDD[String]= w1RDD.union(w2RDD)
    println(unionRDD.getNumPartitions)// 2

    unionRDD.foreach(println)while(true){}}}
11.Demo11Join
packagecom.shujia.coreimportorg.apache.spark.rdd.RDD
importorg.apache.spark.{SparkConf, SparkContext}object Demo11Join {def main(args: Array[String]):Unit={val conf =new SparkConf()
    conf.setMaster("local")
    conf.setAppName("Join算子演示")val context =new SparkContext(conf)//====================================================//两个kv类型的RDD之间的关联//通过scala中的集合构建RDDval rdd1: RDD[(String,String)]= context.parallelize(
      List(("1001","尚平"),("1002","丁义杰"),("1003","徐昊宇"),("1004","包旭"),("1005","朱大牛"),("1006","汪权")))val rdd2: RDD[(String,String)]= context.parallelize(
      List(("1001","崩坏"),("1002","原神"),("1003","王者"),("1004","修仙"),("1005","学习"),("1007","敲代码")))/**
     *  内连接:join
     *  左连接:leftJoin
     *  右连接:rightJoin
     *  全连接:fullJoin
     *///内连接//    val innerJoinRDD: RDD[(String, (String, String))] = rdd1.join(rdd2)//    //加工一下RDD//    val innerJoinRDD2: RDD[(String, String, String)] = innerJoinRDD.map {//      case (id: String, (name: String, like: String)) => (id, name, like)//    }//    innerJoinRDD2.foreach(println)//左连接val leftJoinRDD: RDD[(String,(String, Option[String]))]= rdd1.leftOuterJoin(rdd2)//加工一下RDDval leftJoinRDD2: RDD[(String,String,String)]= leftJoinRDD.map {case(id:String,(name:String, Some(like)))=>(id, name, like)case(id:String,(name:String, None))=>(id, name,"无爱好")}
    leftJoinRDD2.foreach(println)

    println("=================================")//右连接自己试//TODO:自己试右连接//全连接val fullJoinRDD: RDD[(String,(Option[String], Option[String]))]= rdd1.fullOuterJoin(rdd2)//加工一下RDDval fullJoinRDD2: RDD[(String,String,String)]= fullJoinRDD.map {case(id:String,(Some(name), Some(like)))=>(id, name, like)case(id:String,(Some(name), None))=>(id, name,"无爱好")case(id:String,(None, Some(like)))=>(id,"无姓名", like)}
    fullJoinRDD2.foreach(println)}}
12.Demo12Student
packagecom.shujia.coreimportorg.apache.spark.rdd.RDD
importorg.apache.spark.{SparkConf, SparkContext}object Demo12Student {def main(args: Array[String]):Unit={val conf =new SparkConf()
    conf.setMaster("local")
    conf.setAppName("Join算子演示")val context =new SparkContext(conf)//====================================================//需求:统计总分年级排名前10的学生的各科分数//读取分数文件数据val scoreRDD: RDD[(String,String,String)]= context.textFile("spark/data/score.txt")// 读取数据文件.map((s:String)=> s.split(","))// 切分数据.filter((arr: Array[String])=> arr.length ==3)// 过滤掉脏数据.map {//整理数据,进行模式匹配取出数据case Array(sid:String, subject_id:String, score:String)=>(sid, subject_id, score)}//计算每个学生的总分val sumScoreWithSidRDD: RDD[(String,Int)]= scoreRDD.map {case(sid:String, _:String, score:String)=>(sid, score.toInt)}.reduceByKey((x:Int, y:Int)=> x + y)//按照总分排序val sumScoreTop10: Array[(String,Int)]= sumScoreWithSidRDD.sortBy(-_._2).take(10)//取出前10的学生学号val ids: Array[String]= sumScoreTop10.map(_._1)//取出每个学生各科分数val top10StuScore: RDD[(String,String,String)]= scoreRDD.filter {case(id:String, _, _)=> ids.contains(id)}

    top10StuScore.foreach(println)}}
13.Demo13MapValues
packagecom.shujia.coreimportorg.apache.spark.{SparkConf, SparkContext}importorg.apache.spark.rdd.RDD

object Demo13MapValues {def main(args: Array[String]):Unit={val conf =new SparkConf()
    conf.setMaster("local")
    conf.setAppName("Join算子演示")val context =new SparkContext(conf)//====================================================//需求:统计总分年级排名前10的学生的各科分数//读取分数文件数据val scoreRDD: RDD[(String,String,String)]= context.textFile("spark/data/score.txt")// 读取数据文件.map((s:String)=> s.split(","))// 切分数据.filter((arr: Array[String])=> arr.length ==3)// 过滤掉脏数据.map {//整理数据,进行模式匹配取出数据case Array(sid:String, subject_id:String, score:String)=>(sid, subject_id, score)}//计算每个学生的总分val sumScoreWithSidRDD: RDD[(String,Int)]= scoreRDD.map {case(sid:String, _:String, score:String)=>(sid, score.toInt)}.reduceByKey((x:Int, y:Int)=> x + y)/**
     * mapValues算子:也是作用在kv类型的RDD上
     * 主要的作用键不变,处理值
     */val resRDD: RDD[(String,Int)]= sumScoreWithSidRDD.mapValues(_ +1000)
    resRDD.foreach(println)//等同于val res2RDD: RDD[(String,Int)]= sumScoreWithSidRDD.map((kv:(String,Int))=>(kv._1, kv._2 +1000))}}
14.Demo14mapPartition
packagecom.shujia.coreimportorg.apache.spark.{SparkConf, SparkContext}importorg.apache.spark.rdd.RDD

object Demo14mapPartition {def main(args: Array[String]):Unit={val conf =new SparkConf()
    conf.setMaster("local")
    conf.setAppName("mapPartition算子演示")val context =new SparkContext(conf)//====================================================//需求:统计总分年级排名前10的学生的各科分数//读取分数文件数据val scoreRDD: RDD[String]= context.textFile("spark/data/ws/*")// 读取数据文件

    println(scoreRDD.getNumPartitions)/**
     *  mapPartition: 主要作用是一次处理一个分区的数据,将一个分区的数据一个一个传给后面的函数进行处理
     *
     *  迭代器中存放的是一个分区的数据
     *///    val mapPartitionRDD: RDD[String] = scoreRDD.mapPartitions((itr: Iterator[String]) => {////      println(s"====================当前处理的分区====================")//      //这里写的逻辑是作用在一个分区上的所有数据//      val words: Iterator[String] = itr.flatMap(_.split("\\|"))//      words//    })//    mapPartitionRDD.foreach(println)

    scoreRDD.mapPartitionsWithIndex{case(index:Int,itr: Iterator[String])=>
        println(s"当前所处理的分区编号是:${index}")
        itr.flatMap(_.split("\\|"))}.foreach(println)}}
15.Demo15Actions
packagecom.shujia.coreimportorg.apache.spark.rdd.RDD
importorg.apache.spark.{SparkConf, SparkContext}object Demo15Actions {def main(args: Array[String]):Unit={val conf =new SparkConf()
    conf.setMaster("local")
    conf.setAppName("Action算子演示")val context =new SparkContext(conf)//====================================================val studentRDD: RDD[String]= context.textFile("spark/data/students.csv")/**
     * 转换算子:transformation 将一个RDD转换成另一个RDD,转换算子是懒执行的,需要一个action算子触发执行
     *
     * 行动算子(操作算子):action算子,触发任务执行。一个action算子就会触发一次任务执行
     */
    println("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$")val studentsRDD: RDD[(String,String,String,String,String)]= studentRDD.map(_.split(",")).map {case Array(id:String, name:String, age:String, gender:String, clazz:String)=>
          println("**************************** 数加防伪码 ^_^ ********************************")(id, name, age, gender, clazz)}
    println("$$$$$$$$$$$$$$$$$$$$$$***__***$$$$$$$$$$$$$$$$$$$$$$$$$")// foreach其实就是一个action算子//    studentsRDD.foreach(println)//    println("="*100)//    studentsRDD.foreach(println)//    while (true){////    }/**
     *  collect()行动算子 主要作用是将RDD转成scala中的数据结构
     *
     */val tuples: Array[(String,String,String,String,String)]= studentsRDD.collect()}}
16.Demo16Catch
packagecom.shujia.coreimportorg.apache.spark.{SparkConf, SparkContext}importorg.apache.spark.rdd.RDD
importorg.apache.spark.storage.StorageLevel

object Demo16Catch {def main(args: Array[String]):Unit={val conf =new SparkConf()
    conf.setMaster("local")
    conf.setAppName("Action算子演示")val context =new SparkContext(conf)//设置checkpoint路径,将来对应的是HDFS上的路径
    context.setCheckpointDir("spark/data/checkpoint")//====================================================val linesRDD: RDD[String]= context.textFile("spark/data/students.csv")val splitRDD: RDD[Array[String]]= linesRDD.map(_.split(","))//处理数据val studentsRDD: RDD[(String,String,String,String,String)]= splitRDD.map {case Array(id:String, name:String, age:String, gender:String, clazz:String)=>(id, name, age, gender, clazz)}//对studentsRDD进行缓存/**
     * 特点带来的问题:既然叫做缓存,所以在程序运行过程中无论是只放内存还是磁盘内存一起使用,一旦程序结束,缓存数据全部丢失。
     *
     * spark针对上面的场景提供了一个解决方案:可以将RDD运行时的数据永久持久化在HDFS上,这个方案叫做checkpoint,需要在spark环境中设置checkpoint的路径
     *///    studentsRDD.cache() //默认情况下,是将数据缓存在内存中//    studentsRDD.persist(StorageLevel.MEMORY_AND_DISK)
      studentsRDD.checkpoint()//统计每个班级的人数val clazzKVRDD: RDD[(String,Int)]= studentsRDD.map {case(_, _, _, _, clazz:String)=>(clazz,1)}val clazzNumRDD: RDD[(String,Int)]= clazzKVRDD.reduceByKey(_ + _)
    clazzNumRDD.saveAsTextFile("spark/data/clazz_num")//统计性别的人数val genderKVRDD: RDD[(String,Int)]= studentsRDD.map {case(_, _, _, gender:String, _)=>(gender,1)}val genderNumRDD: RDD[(String,Int)]= genderKVRDD.reduceByKey(_ + _)
    genderNumRDD.saveAsTextFile("spark/data/gender_num")////    while (true){////    }}}
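To summarize the cache / persist / checkpoint discussion inside Demo16Catch, here is a compact sketch. It is my own summary rather than part of the original notes; studentsRDD and context stand for the values defined in that demo, and the three calls are alternatives, not a sequence to run on the same RDD.

// Persistence options for an RDD reused by more than one job (assumed names from Demo16Catch).
import org.apache.spark.storage.StorageLevel

// 1. cache(): shorthand for persist(StorageLevel.MEMORY_ONLY); the cached data
//    only lives as long as the application and is lost when it stops.
studentsRDD.cache()

// 2. persist(): same idea, but the storage level (memory, disk, both, serialized,
//    replicated) is chosen explicitly.
studentsRDD.persist(StorageLevel.MEMORY_AND_DISK)

// 3. checkpoint(): writes the RDD's data under the directory given to
//    context.setCheckpointDir (usually an HDFS path), so it survives the application;
//    it is materialized when the first action on the RDD runs.
context.setCheckpointDir("spark/data/checkpoint")
studentsRDD.checkpoint()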
17.Demo17SparkStandaloneSubmit
packagecom.shujia.coreimportorg.apache.spark.rdd.RDD
importorg.apache.spark.{SparkConf, SparkContext}object Demo17SparkStandaloneSubmit {def main(args: Array[String]):Unit={val conf =new SparkConf()/**
     * 如果将来在linux集群中运行,这里就不需要设置setMaster
     *///    conf.setMaster("local")val sparkContext =new SparkContext(conf)val linesRDD: RDD[String]= sparkContext.parallelize(List("java,hello,world","hello,scala,spark","java,hello,spark"))val wordRDD: RDD[String]= linesRDD.flatMap(_.split(","))val wordKVRDD: RDD[(String,Int)]= wordRDD.map((_,1))val countRDD: RDD[(String,Int)]= wordKVRDD.reduceByKey(_ + _)

    countRDD.foreach(println)/**
     *  将项目打包放到spark集群中使用standalone模式运行
     * standalone client
     * spark-submit --class com.shujia.core.Demo17SparkStandaloneSubmit --master spark://master:7077 --executor-memory 512m --total-executor-cores 1 spark-1.0.jar 100
     *
     * standalone cluster
     * spark-submit --class com.shujia.core.Demo17SparkStandaloneSubmit --master spark://master:7077 --executor-memory 512m --total-executor-cores 1 --deploy-mode cluster spark-1.0.jar 100
     *
     */}}
18.Demo18SparkYarnSubmit
packagecom.shujia.coreimportorg.apache.hadoop.conf.Configuration
importorg.apache.hadoop.fs.{FileSystem, Path}importorg.apache.spark.rdd.RDD
importorg.apache.spark.{SparkConf, SparkContext}/**
 *  因为是提交到yarn上,可以对hdfs上的数据进行读写
 */object Demo18SparkYarnSubmit {def main(args: Array[String]):Unit={val conf =new SparkConf()/**
     *  提交到yarn上运行,这个参数依旧不用设置
     *///    conf.setMaster("local")
    conf.setAppName("yarn submit")val context =new SparkContext(conf)//读取hdfs上数据val linesRDD: RDD[String]= context.textFile("/bigdata29/data/students.csv")
    println("="*100)
    println(s"分区数为:${linesRDD.getNumPartitions}")
    println("="*100)val classKVRDD: RDD[(String,Int)]= linesRDD.map((line:String)=>{val clazz:String= line.split(",")(4)(clazz,1)})//统计班级人数val clazzNumRDD: RDD[(String,Int)]= classKVRDD.reduceByKey(_ + _)//整理一下要写到结果文件中的数据格式val resRDD: RDD[String]= clazzNumRDD.map((kv:(String,Int))=>s"${kv._1}\t${kv._2}")//删除已经存在的路径val hadoopConf =new Configuration()val fileSystem: FileSystem = FileSystem.get(hadoopConf)//判断路径是否存在if(fileSystem.exists(new Path("/bigdata29/sparkout1"))){
      fileSystem.delete(new Path("/bigdata29/sparkout1"),true)}//将RDD中的数据保存到HDFS上的文件中
    resRDD.saveAsTextFile("/bigdata29/sparkout1")/**
     * spark-submit --class com.shujia.core.Demo18SparkYarnSubmit --master yarn --deploy-mode client  spark-1.0.jar
     */}}
19.Demo19PI
packagecom.shujia.coreimportorg.apache.spark.rdd.RDD
importorg.apache.spark.{SparkConf, SparkContext}importscala.util.Random

object Demo19PI {def main(args: Array[String]):Unit={val conf =new SparkConf()/**
     * 提交到yarn上运行,这个参数依旧不用设置
     *///    conf.setMaster("local")
    conf.setAppName("yarn submit")val context =new SparkContext(conf)//设置生成点的个数 10000val list: Range.Inclusive =0 to 1000000000//将scala的序列集合变成rddval rangeRDD: RDD[Int]= context.parallelize(list)//随机生成正方形内的点val dianRDD: RDD[(Double,Double)]= rangeRDD.map((i:Int)=>{val x:Double= Random.nextDouble()*2-1val y:Double= Random.nextDouble()*2-1(x, y)})//    println(dianRDD.count())//取出圆中点的个数val yuanZuoRDD: RDD[(Double,Double)]= dianRDD.filter {case(x:Double, y:Double)=>
        x * x + y * y <1}//    println(yuanZuoRDD.count())//计算PI
    println("="*100)
    println(s"PI的值为:${(yuanZuoRDD.count().toDouble / dianRDD.count())*4}")
    println("="*100)/**
     * spark-submit --class com.shujia.core.Demo19PI --master yarn --deploy-mode client  spark-1.0.jar
     */}}
20.Demo20Accumulator
packagecom.shujia.coreimportorg.apache.spark.{SparkConf, SparkContext}importorg.apache.spark.rdd.RDD
importorg.apache.spark.util.LongAccumulator

object Demo20Accumulator {def main(args: Array[String]):Unit={val conf =new SparkConf()
    conf.setMaster("local")
    conf.setAppName("map算子演示")val context =new SparkContext(conf)//====================================================val studentRDD: RDD[String]= context.textFile("spark/data/students.csv")val scoreRDD: RDD[String]= context.textFile("spark/data/score.txt")//    var count = 0//    studentRDD.foreach((line:String)=>{//      count+=1//      println("-------------------------")//      println(count)//      println("-------------------------")//    })//    println(s"count的值为:${count}")/**
     * 累加器
     *
     * 由SparkContext来创建
     * 注意:
     *  1、因为累加器的执行实在RDD中执行的,而RDD是在Executor中执行的,而要想在Executor中执行就得有一个action算子触发任务调度
     *  2、sparkRDD中无法使用其他的RDD
     *  3、SparkContext无法在RDD内部使用,因为SparkContext对象无法进行序列化,不能够通过网络发送到Executor中
     *///    val accumulator: LongAccumulator = context.longAccumulator//    studentRDD.foreach((line:String)=>{//      accumulator.add(1)//    })//    studentRDD.map((line:String)=>{//      accumulator.add(1)//    }).collect()//    println(s"accumulator的值为:${accumulator.value}")//    val value: RDD[RDD[(String, String)]] = studentRDD.map((stuLine: String) => {//      scoreRDD.map((scoreLine: String) => {//        val strings: Array[String] = scoreLine.split(",")//        val strings1: Array[String] = stuLine.split(",")//        val str1: String = strings.mkString("|")//        val str2: String = strings1.mkString("|")//        (str1, str2)//      })//    })//    value.foreach(println)//    val value: RDD[RDD[String]] = studentRDD.map((stuLine: String) => {//      val scoreRDD: RDD[String] = context.textFile("spark/data/score.txt")//      scoreRDD//    })//    value.foreach(println)}}
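A minimal runnable version of the accumulator idea that is commented out above; this is my own sketch, and context and studentRDD refer to the values defined in Demo20Accumulator.

// Accumulator sketch (assumed: context and studentRDD from Demo20Accumulator).
import org.apache.spark.util.LongAccumulator

val accumulator: LongAccumulator = context.longAccumulator("lineCount")

// add() runs inside the Executors, so an action (here foreach) is required to
// trigger it; the Driver then reads the merged result through .value.
studentRDD.foreach((_: String) => accumulator.add(1))

println(s"accumulator value: ${accumulator.value}")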
21.Demo21Broadcast
packagecom.shujia.coreimportorg.apache.spark.broadcast.Broadcast
importorg.apache.spark.rdd.RDD
importorg.apache.spark.{SparkConf, SparkContext}importscala.io.Source

object Demo21Broadcast {def main(args: Array[String]):Unit={val conf =new SparkConf()
    conf.setMaster("local")
    conf.setAppName("广播变量演示")val context =new SparkContext(conf)//====================================================//使用Scala的方式读取学生数据文件,将其转换以学号作为键的map集合,属于在Driver端的一个变量val studentsMap: Map[String,String]= Source.fromFile("spark/data/students.csv").getLines().toList
      .map((line:String)=>{val infos: Array[String]= line.split(",")val stuInfo:String= infos.mkString(",")(infos(0), stuInfo)}).toMap

    val scoresRDD: RDD[String]= context.textFile("spark/data/score.txt")/**
     * 将studentsMap变成一个广播变量,让每一个将来需要执行关联的Executor中都有一份studentsMap数据
     * 避免了每次Task任务拉取都要附带一个副本,拉取的速度变快了,执行速度也就变快了
     *
     * 广播大变量
     */val studentsMapBroadcast: Broadcast[Map[String,String]]= context.broadcast(studentsMap)/**
     * 将Spark读取的分数RDD与外部变量学生Map集合进行关联
     * 循环遍历scoresRDD,将学号一样的学生信息关联起来
     *///    val resMapRDD: RDD[(String, String)] = scoresRDD.map((score: String) => {//      val id: String = score.split(",")(0)//      //使用学号到学生map集合中获取学生信息//      val studentInfo: String = studentsMap.getOrElse(id, "无学生信息")//      (score, studentInfo)//    })//    resMapRDD.foreach(println)/**
     * 使用广播变量进行关联
     */val resMapRDD: RDD[(String,String)]= scoresRDD.map((score:String)=>{val id:String= score.split(",")(0)val stuMap: Map[String,String]= studentsMapBroadcast.value
      //使用学号到学生map集合中获取学生信息val studentInfo:String= stuMap.getOrElse(id,"无学生信息")(score, studentInfo)})
    resMapRDD.foreach(println)}}

2.Spark-sql

Demo1WordCount
packagecom.shujia.sqlimportorg.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}object Demo1WordCount {def main(args: Array[String]):Unit={/**
     * 在新版本的spark中,如果想要编写spark sql的话,需要使用新的spark入口类:SparkSession
     */val sparkSession: SparkSession = SparkSession.builder().master("local").appName("wc spark sql").getOrCreate()/**
     * spark sql和spark core的核心数据类型不太一样
     *
     * 1、读取数据构建一个DataFrame,相当于一张表
     */val linesDF: DataFrame = sparkSession.read
      .format("csv")//指定读取数据的格式.schema("line STRING")//指定列的名和列的类型,多个列之间使用,分割.option("sep","\n")//指定分割符,csv格式读取默认是英文逗号.load("spark/data/words.txt")// 指定要读取数据的位置,可以使用相对路径//    println(linesDF)//    linesDF.show() //查看DF中的数据内容(表内容)//    linesDF.printSchema() //查看DF表结构/**
     * 2、DF本身是无法直接在上面写sql的,需要将DF注册成一个视图,才可以写sql数据分析
     */
    linesDF.createOrReplaceTempView("lines")// 起一个表名,后面的sql语句可以做查询分析/**
     * 3、可以编写sql语句 (统计单词的数量)
     * spark sql是完全兼容hive sql
     */val resDF: DataFrame = sparkSession.sql("""
        |select
        |t1.word as word,
        |count(1) as counts
        |from
        |(select
        | explode(split(line,'\\|')) as word from lines) t1
        | group by t1.word
        |""".stripMargin)/**
     * 4、将计算的结果DF保存到HDFS上
     */val resDS: Dataset[Row]= resDF.repartition(1)
    resDS.write
      .format("csv")//指定输出数据文件格式.option("sep","\t")// 指定列之间的分隔符.mode(SaveMode.Overwrite)// 使用SaveMode枚举类,设置为覆盖写.save("spark/data/sqlout1")// 指定输出的文件夹}}
Demo2DSLWordCount
packagecom.shujia.sqlimportorg.apache.spark.sql.{DataFrame, SaveMode, SparkSession}object Demo2DSLWordCount {def main(args: Array[String]):Unit={/**
     * 在新版本的spark中,如果想要编写spark sql的话,需要使用新的spark入口类:SparkSession
     */val sparkSession: SparkSession = SparkSession.builder().master("local").appName("wc spark sql").getOrCreate()/**
     * spark sql和spark core的核心数据类型不太一样
     *
     * 1、读取数据构建一个DataFrame,相当于一张表
     */val linesDF: DataFrame = sparkSession.read
      .format("csv")//指定读取数据的格式.schema("line STRING")//指定列的名和列的类型,多个列之间使用,分割.option("sep","\n")//指定分割符,csv格式读取默认是英文逗号.load("spark/data/words.txt")// 指定要读取数据的位置,可以使用相对路径/**
     * DSL: 类SQL语法 api  介于代码和纯sql之间的一种api
     *
     * spark在DSL语法api中,将纯sql中的函数都使用了隐式转换变成一个scala中的函数
     * 如果想要在DSL语法中使用这些函数,需要导入隐式转换
     *
     *///导入Spark sql中所有的sql隐式转换函数importorg.apache.spark.sql.functions._
    //导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理importsparkSession.implicits._

//    linesDF.select(explode(split($"line","\\|")) as "word")//      .groupBy($"word")//      .count().show()val resultDF: DataFrame = linesDF.select(explode(split($"line","\\|")) as "word").groupBy($"word").agg(count($"word") as "counts")/**
     * 保存数据
     */
    resultDF
      .repartition(1).write
      .format("csv").option("sep","\t").mode(SaveMode.Overwrite).save("spark/data/sqlout2")}}
Demo3DSLAPI
packagecom.shujia.sqlimportorg.apache.spark.sql.expressions.Window
importorg.apache.spark.sql.{DataFrame, SparkSession}object Demo3DSLAPI {def main(args: Array[String]):Unit={/**
     * 在新版本的spark中,如果想要编写spark sql的话,需要使用新的spark入口类:SparkSession
     */val sparkSession: SparkSession = SparkSession.builder().master("local").appName("dsl语法api演示").config("spark.sql.shuffle.partitions",1)//默认分区的数量是200个.getOrCreate()//导入Spark sql中所有的sql隐式转换函数importorg.apache.spark.sql.functions._
    //导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理importsparkSession.implicits._

    /**
     * DSL api
     *///新版本的读取方式,读取一个json数据,不需要手动指定列名//    val stuDF1: DataFrame = sparkSession.read//      .json("spark/data/students.json")//以前版本读取方式,灵活度要高一些val stuDF: DataFrame = sparkSession.read
      .format("json").load("spark/data/students.json")//    stuDF2.show(100, truncate = false) //传入展示总条数,并完全显示数据内容/**
     * select 函数:选择数据【字段】,和纯sql语句中select意思基本是一样,在数据的前提上选择要留下的列
     *
     *///根据字段的名字选择要查询的字段//    stuDF.select("id","name","age").show(1)//    //根据字段的名字选择要查询的字段,selectExpr 可以传入表达式字符串形式//    stuDF.selectExpr("id","name","age","age + 1 as new_age").show()//使用隐式转换中的$函数将字段变成一个对象//    stuDF.select($"id",$"name",$"age").show(10)//使用对象做处理//    stuDF.select($"id",$"name",$"age" + 1  as "new_age").show(10)//可以在select中使用sql的函数//下面的操作等同于sql:select id,name,age+1 as new_age,substring(clazz,0,2) as km from lines;//    stuDF.select($"id",$"name",$"age" + 1  as "new_age",substring($"clazz",0,2) as "km").show(10)/**
     * where 函数:过滤数据
     *///直接将sql中where语句以字符串的形式传参//    stuDF.where("gender='女' and age=23").show()//使用$列对象的形式过滤// =!= 不等于// === 等于//    stuDF.where($"gender" === "女" and $"age" === 23).show()//    stuDF.where($"gender" =!= "男" and $"age" === 23).show()//过滤文科的学生//    stuDF.where(substring($"clazz", 0, 2) === "文科").show()/**
     * groupBy 分组函数
     * agg 聚合函数
     * 分组聚合要在一起使用
     * 分组聚合之后的结果DF中只会包含分组字段和聚合字段
     * select中无法出现不是分组的字段
     *///根据班级分组,求每个班级的人数和平均年龄//    stuDF.groupBy($"clazz")//      .agg(count($"clazz") as "number",round(avg($"age"),2) as "avg_age").show()/**
     * orderBy: 排序
     *///    stuDF.groupBy($"clazz")//          .agg(count($"clazz") as "number")//          .orderBy($"number").show()/**
     * join: 表关联
     */val scoreDF: DataFrame = sparkSession.read
      .format("csv").option("sep",",").schema("id STRING,subject_id STRING,score INT").load("spark/data/score.txt")//    scoreDF.show()//关联场景1:所关联的字段名字不一样的时候//    stuDF.join(scoreDF, $"id" === $"sid", "inner").show()//关联场景2:所关联的字段名字一样的时候//    stuDF.join(scoreDF,"id").show()/**
     * 开窗函数
     * 统计每个班级总分前3的学生
     *
     * 开窗不会改变总条数的,会以新增一列的形式加上开窗的结果
     * withColumn 新增一列
     */val joinStuAndScoreWithIDDF: DataFrame = stuDF.join(scoreDF,"id")
    joinStuAndScoreWithIDDF.groupBy($"id", $"clazz")//根据学号和班级一起分组.agg(sum($"score") as "sumScore")//计算总分.withColumn("rn", row_number() over Window.partitionBy($"clazz").orderBy($"sumScore".desc))//.select($"id", $"clazz", $"sumScore", row_number() over Window.partitionBy($"clazz").orderBy($"sumScore".desc) as "rn").where($"rn"<=3).show()}}
Demo4DataSourceAPI
packagecom.shujia.sqlimportorg.apache.spark.sql.{DataFrame, SparkSession}object Demo4DataSourceAPI {def main(args: Array[String]):Unit={/**
     * 在新版本的spark中,如果想要编写spark sql的话,需要使用新的spark入口类:SparkSession
     */val sparkSession: SparkSession = SparkSession.builder().master("local").appName("dsl语法api演示").config("spark.sql.shuffle.partitions",1)//默认分区的数量是200个.getOrCreate()//导入Spark sql中所有的sql隐式转换函数//导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理/**
     * 读取csv格式的数据,默认是以英文逗号分割的
     *
     *///    val stuCsvDF: DataFrame = sparkSession.read//      .format("csv")//      .schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")//      .option("sep", ",")//      .load("spark/data/students.csv")//    //求每个班级的人数,保存到文件中//    stuCsvDF.groupBy($"clazz")//      .agg(count($"clazz") as "number")//      .write//      .format("csv")//      .option("sep",",")//      .mode(SaveMode.Overwrite)//      .save("spark/data/souceout1")/**
     * 读取json数据格式,因为json数据有键值对。会自动地将键作为列名,值作为列值,不需要手动设置表结构
     */val stuJsonDF: DataFrame = sparkSession.read
      .format("json").load("spark/data/students2.json")//    //统计每个性别的人数//    stuJsonDF.groupBy($"gender")//      .agg(count($"gender") as "number")//      .write//      .format("json")//      .mode(SaveMode.Overwrite)//      .save("spark/data/jsonout1")/**
     *  parquet
     *  压缩的比例是由【信息熵】来决定的
     *///    stuJsonDF.write//      .format("parquet")//      .mode(SaveMode.Overwrite)//      .save("spark/data/parquetout2")//读取parquet格式文件的时候,也是不需要手动指定表结构//    val stuParquetDF: DataFrame = sparkSession.read//      .format("parquet")//      .load("spark/data/parquetout1/part-00000-c5917bb6-172b-49bd-a90c-90b7f09b69d6-c000.snappy.parquet")//    stuParquetDF.show()/**
     * 读取数据库中的数据,mysql
     *
     */val jdDF: DataFrame = sparkSession.read
      .format("jdbc").option("url","jdbc:mysql://192.168.220.100:3306").option("dbtable","bigdata29.jd_goods").option("user","root").option("password","123456").load()

    jdDF.show(10,truncate =false)}}
Demo5RDDToDF
packagecom.shujia.sqlimportorg.apache.spark.SparkContext
importorg.apache.spark.rdd.RDD
importorg.apache.spark.sql.{DataFrame, Row, SparkSession}object Demo5RDDToDF {def main(args: Array[String]):Unit={/**
     * 在新版本的spark中,如果想要编写spark sql的话,需要使用新的spark入口类:SparkSession
     */val sparkSession: SparkSession = SparkSession.builder().master("local").appName("RDD和DF互相转换演示").config("spark.sql.shuffle.partitions",1)//默认分区的数量是200个.getOrCreate()//导入Spark sql中所有的sql隐式转换函数//导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理,如果需要做RDD和DF之间的转换importsparkSession.implicits._

    /**
     * 使用SparkContext读取数据封装成RDD
     *
     * SparkSession包含了SparkContext
     *///使用SparkSession获取SparkContextval sc: SparkContext = sparkSession.sparkContext
    val linesRDD: RDD[String]= sc.textFile("spark/data/students.csv")val studentsRDD: RDD[(String,String,Int,String,String)]= linesRDD.map((line:String)=> line.split(",")).map {//1500100001,施笑槐,22,女,文科六班case Array(id:String, name:String, age:String, gender:String, clazz:String)=>(id, name, age.toInt, gender, clazz)}/**
     * RDD转DF
     */val studentsDF: DataFrame = studentsRDD.toDF("id","name","age","gender","clazz")
    studentsDF.createOrReplaceTempView("students")val resultDF: DataFrame = sparkSession.sql("""
        |select
        |clazz,
        |count(1) as number
        |from
        |students
        |group by clazz
        |""".stripMargin)/**
     * 在Row的数据类型中 所有整数类型统一为Long  小数类型统一为Double
     * 转RDD
     */val studentsRDD2: RDD[Row]= resultDF.rdd
//    studentsRDD2.map((row:Row)=>{//      val clazz: String = row.getAs[String]("clazz")//      val number: Long = row.getAs[Long]("number")//      (clazz,number)//    }).foreach(println)

    studentsRDD2.map{case Row(clazz:String,number:Long)=>(clazz,number)}.foreach(println)}}
Demo6Window
packagecom.shujia.sqlimportorg.apache.spark.sql.expressions.Window
importorg.apache.spark.sql.{DataFrame, SparkSession}/**
 * Window functions
 * Aggregate window functions: sum, count, avg, min, max
 * Ordering window functions: row_number, rank, dense_rank, lag (look back one row), lead (look ahead one row)
 */object Demo6Window {def main(args: Array[String]):Unit={//创建SparkSession对象/**
     * 在新版本的spark中,如果想要编写spark sql的话,需要使用新的spark入口类:SparkSession
     */val sparkSession: SparkSession = SparkSession.builder().master("local").appName("开窗函数DSL API演示").config("spark.sql.shuffle.partitions",1)//默认分区的数量是200个.getOrCreate()//导入Spark sql中所有的sql隐式转换函数importorg.apache.spark.sql.functions._
    //导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理,如果需要做RDD和DF之间的转换importsparkSession.implicits._

    //学生表val studentsDF: DataFrame = sparkSession.read
      .format("csv").option("sep",",").schema("id STRING,name STRING,age INT,gender STRING,clazz STRING").load("spark/data/students.csv")//成绩表val scoresDF: DataFrame = sparkSession.read
      .format("csv").option("sep",",").schema("sid STRING,subject_id STRING,score INT").load("spark/data/score.txt")//科目表val subjectDF: DataFrame = sparkSession.read
      .format("csv").option("sep",",").schema("subject_id STRING,subject_name STRING,subject_sum_score INT").load("spark/data/subject.csv")//将学生数据与成绩数据进行关联val joinDF: DataFrame = studentsDF.join(scoresDF, $"id"=== $"sid")//    joinDF.show(10)/**
     * 1、统计总分年级排名前十学生各科的分数
     *
     * 未排序之前,是将开窗中所有数据一起聚合得到一个结果
     * 若排序了,依次从上到下聚合得到一个结果
     *
     */
    joinDF
      // sum(score) over(partition by id ) as sumScore.withColumn("sumScore", sum($"score") over Window.partitionBy($"id")).orderBy($"sumScore".desc).limit(60)//.show(60)/**
     * 3、统计每科都及格的学生
     */
    scoresDF
      .join(subjectDF,"subject_id").where($"score">= $"subject_sum_score"*0.6)//统计学生及格的科目数.withColumn("jiGeCounts", count($"sid") over Window.partitionBy($"sid")).where($"jiGeCounts"===6)//      .show(100)/**
     * 2、统计总分大于年级平均分的学生
     */
    joinDF
      //计算每个学生的总分,新增一列.withColumn("sumScore", sum($"score") over Window.partitionBy($"id")).withColumn("avgScore", avg($"sumScore") over Window.partitionBy(substring($"clazz",0,2))).where($"sumScore"> $"avgScore")//      .show(200)/**
     * 统计每个班级的每个名次之间的分数差
     *
     */
    joinDF
      .groupBy($"id", $"clazz").agg(sum($"score") as "sumScore")//开窗,班级开窗,总分降序排序,排个名次.withColumn("rn", row_number() over Window.partitionBy($"clazz").orderBy($"sumScore".desc))//开窗,取出前一名的总分.withColumn("front_score", lag($"sumScore",1,750) over Window.partitionBy($"clazz").orderBy($"sumScore".desc)).withColumn("cha",$"front_score"- $"sumScore").show(100)}}
Demo7BurksTest1
packagecom.shujia.sqlimportorg.apache.spark.sql.expressions.Window
importorg.apache.spark.sql.{Column, DataFrame, SparkSession}object Demo7BurksTest1 {def main(args: Array[String]):Unit={//创建SparkSession对象/**
     * 在新版本的spark中,如果想要编写spark sql的话,需要使用新的spark入口类:SparkSession
     */val sparkSession: SparkSession = SparkSession.builder().master("local").appName("公司营收额数据需求演示").config("spark.sql.shuffle.partitions",1)//默认分区的数量是200个.getOrCreate()//导入Spark sql中所有的sql隐式转换函数importorg.apache.spark.sql.functions._
    //导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理,如果需要做RDD和DF之间的转换importsparkSession.implicits._

    //读取数据val burksDF: DataFrame = sparkSession.read
      .format("csv").option("sep",",").schema("burk STRING,year STRING"+",tsl01 DOUBLE,tsl02 DOUBLE,tsl03 DOUBLE,tsl04 DOUBLE"+",tsl05 DOUBLE,tsl06 DOUBLE,tsl07 DOUBLE,tsl08 DOUBLE"+",tsl09 DOUBLE,tsl10 DOUBLE,tsl11 DOUBLE,tsl12 DOUBLE").load("spark/data/burks.txt")/**
     * 1、统计每个公司每年按月累计收入  行转列 --> sum窗口函数
     * 输出结果
     * 公司代码,年度,月份,当月收入,累计收入
     *
     *///纯sql的方式实现//    burksDF.createOrReplaceTempView("burks")//    sparkSession.sql(//      """//        |select//        |t1.burk as burk,//        |t1.year as year,//        |t1.month as month,//        |t1.tsl as tsl,//        |sum(t1.tsl) over(partition by t1.burk,t1.year order by t1.month) as leijia//        |from//        |(select//        |  burk,//        |  year,//        |  month,//        |  tsl//        | from//        |  burks//        |  lateral view explode(map(1,tsl01,2,tsl02,3,tsl03,4,tsl04,5,tsl05,6,tsl06,7,tsl07,8,tsl08,9,tsl09,10,tsl10,11,tsl11,12,tsl12)) T as month,tsl//        |) t1//        |""".stripMargin).show()// 使用DSL API实现val m: Column = map(
      expr("1"), $"tsl01",
      expr("2"), $"tsl02",
      expr("3"), $"tsl03",
      expr("4"), $"tsl04",
      expr("5"), $"tsl05",
      expr("6"), $"tsl06",
      expr("7"), $"tsl07",
      expr("8"), $"tsl08",
      expr("9"), $"tsl09",
      expr("10"), $"tsl10",
      expr("11"), $"tsl11",
      expr("12"), $"tsl12")

    burksDF
      .select($"burk",$"year",explode(m) as Array("month","tsl"))//按月累计.withColumn("leijia",sum($"tsl") over Window.partitionBy($"burk",$"year").orderBy($"month")).show()/**
     * 2、统计每个公司当月比上年同期增长率  行转列 --> lag窗口函数
     * 公司代码,年度,月度,增长率(当月收入/上年当月收入 - 1)
     */}}
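Requirement 2 in the comment above is left unimplemented in the notes. One possible DSL sketch, my own and not the original solution, reuses burksDF, the map column m and the imports from Demo7BurksTest1, and applies lag over a window partitioned by company and month and ordered by year.

// Possible sketch for requirement 2 (assumed to run inside Demo7BurksTest1's main).
burksDF
  .select($"burk", $"year", explode(m) as Array("month", "tsl"))
  // revenue of the same month in the previous year, per company
  .withColumn("last_year_tsl",
    lag($"tsl", 1) over Window.partitionBy($"burk", $"month").orderBy($"year"))
  // growth rate = current month revenue / same month last year - 1
  .withColumn("growth_rate", round($"tsl" / $"last_year_tsl" - 1, 4))
  .select($"burk", $"year", $"month", $"growth_rate")
  .show()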
Demo8SubmitYarn
package com.shujia.sql

import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object Demo8SubmitYarn {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    import sparkSession.implicits._
    import org.apache.spark.sql.functions._

    val stuendsDF: DataFrame = sparkSession.read
      .format("csv")
      .option("sep", ",")
      .schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")
      .load("/bigdata29/spark_in/data/student")

    val genderCountsDF: DataFrame = stuendsDF
      .groupBy($"gender")
      .agg(count($"gender") as "counts")

    genderCountsDF.write
      .format("csv")
      .option("sep", ",")
      .mode(SaveMode.Overwrite)
      .save("/bigdata29/spark_out/out2")
  }
}
Demo9SparkOnHive
packagecom.shujia.sqlimportorg.apache.spark.sql.SparkSession

object Demo9SparkOnHive {def main(args: Array[String]):Unit={val sparkSession: SparkSession = SparkSession.builder().master("local").appName("spark读取hive数据").enableHiveSupport().config("spark.sql.shuffle.partitions",1).getOrCreate()importsparkSession.implicits._
    importorg.apache.spark.sql.functions._
    sparkSession.sql("use bigdata29")

    sparkSession.sql("select clazz,count(1) as counts from students group by clazz").show()}}
Demo10Student
packagecom.shujia.sqlimportorg.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession}/**
 * 1、行列转换
 *
 * 表1
 * 姓名,科目,分数
 * name,item,score
 * 张三,数学,33
 * 张三,英语,77
 * 李四,数学,66
 * 李四,英语,78
 *
 *
 * 表2
 * 姓名,数学,英语
 * name,math,english
 * 张三,33,77
 * 李四,66,78
 *
 * 1、将表1转化成表2
 * 2、将表2转化成表1
 */object Demo10Student {def main(args: Array[String]):Unit={//创建SparkSession对象/**
     * 在新版本的spark中,如果想要编写spark sql的话,需要使用新的spark入口类:SparkSession
     */val sparkSession: SparkSession = SparkSession.builder().master("local").appName("行列互相转换演示").config("spark.sql.shuffle.partitions",1)//默认分区的数量是200个.getOrCreate()//导入Spark sql中所有的sql隐式转换函数importorg.apache.spark.sql.functions._
    //导入另一个隐式转换,后面可以直接使用$函数引用字段进行处理,如果需要做RDD和DF之间的转换importsparkSession.implicits._

    /**
     * 列转行
     */val tb1DF: DataFrame = sparkSession.read
      .format("csv").option("sep",",").schema("name STRING,item STRING,score INT").load("/bigdata29/tb1.txt")/*
    张三,数学,33
    张三,英语,77
    李四,数学,66
    李四,英语,78

    张三,33,77
    李四,66,78
     *///    val res1DF: DataFrame = tb1DF.groupBy($"name")//      .agg(//        sum(when($"item" === "数学", $"score").otherwise(0)) as "math",//        sum(when($"item" === "英语", $"score").otherwise(0)) as "english")//    res1DF.write//      .format("csv")//      .option("sep",",")//      .mode(SaveMode.Overwrite)//      .save("/bigdata29/out7")//    res1DF.show()/**
     * 行转列
     *///    val tb2DF: DataFrame = sparkSession.read//      .format("csv")//      .option("sep", ",")//      .schema("name STRING,math STRING,english INT")//      .load("/bigdata29/out7/*")////    val m: Column = map(//      expr("'数学'"), $"math",//      expr("'语文'"), $"english"//    )//    tb2DF.select($"name",explode(m) as Array("item","score")).show()}}
Demo11UDF
packagecom.shujia.sqlimportorg.apache.spark.sql.expressions.UserDefinedFunction
importorg.apache.spark.sql.{DataFrame, SparkSession}object Demo11UDF {def main(args: Array[String]):Unit={val sparkSession: SparkSession = SparkSession.builder().master("local").appName("udf函数演示").config("spark.sql.shuffle.partitions",1).getOrCreate()importorg.apache.spark.sql.functions._
    /**
     * 1、在使用DSL的时候使用自定义函数
     */val studentsDF: DataFrame = sparkSession.read
      .format("csv").option("sep",",").schema("id STRING,name STRING,age INT,gender STRING,clazz STRING").load("spark/data/students.csv")//    studentsDF.show()//编写自定义函数//udf中编写的是scala代码val shujia_fun1: UserDefinedFunction = udf((str:String)=>"数加:"+ str)//    studentsDF.select(shujia_fun1($"clazz")).show()/**
     * 1、使用SQL语句中使用自定函数
     */
    studentsDF.createOrReplaceTempView("students")//将自定义的函数变量注册成一个函数
    sparkSession.udf.register("shujia_str",shujia_fun1)
    sparkSession.sql("""
        |select clazz,shujia_str(clazz) as new_clazz from students
        |""".stripMargin).show()}}
Demo12ShuJiaStr
package com.shujia.sql

import org.apache.hadoop.hive.ql.exec.UDF

class Demo12ShuJiaStr extends UDF {
  def evaluate(str: String): String = {
    "shujia: " + str
  }
}

/**
 * 1. Package the class into a jar and put it in Spark's jars directory on Linux.
 * 2. Open the spark-sql client.
 * 3. Create a function from the UDF class in the uploaded jar:
 * create function shujia_str as 'com.shujia.sql.Demo12ShuJiaStr';
 * A usage sketch follows below.
 */
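A small usage sketch, my own addition: once the function has been created, a Hive-enabled SparkSession such as the one in Demo9SparkOnHive can call it like a built-in SQL function; the bigdata29.students table is the one used there.

// Usage sketch (assumes the create function statement above was run, and reuses
// the Hive-enabled sparkSession and bigdata29 database from Demo9SparkOnHive).
sparkSession.sql("use bigdata29")
sparkSession.sql(
  """
    |select clazz, shujia_str(clazz) as new_clazz
    |from students
    |""".stripMargin).show()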
Demo13SheBao
packagecom.shujia.sqlimportorg.apache.spark.sql.expressions.Window
importorg.apache.spark.sql.{DataFrame, SparkSession}object Demo13SheBao {def main(args: Array[String]):Unit={val sparkSession: SparkSession = SparkSession.builder().master("local").appName("作业社保演示").config("spark.sql.shuffle.partitions",1).getOrCreate()importorg.apache.spark.sql.functions._
    importsparkSession.implicits._

    //读取数据val sheBaoDF: DataFrame = sparkSession.read
      .format("csv").option("sep",",").schema("id STRING,burk STRING,sdate STRING").load("spark/data/shebao.txt")//统计每个员工的工作经历
    sheBaoDF
      //取出员工上一个月所在公司.withColumn("last_burk", lag($"burk",1) over Window.partitionBy($"id").orderBy($"sdate"))//.show()//在每一行后新增一列,标记列,如果换工作了,标记1 否则标记0.withColumn("flag", when($"burk"=== $"last_burk",0).otherwise(1))//.show()//以用户开窗,将后面的flag值加起来.withColumn("tmp",sum($"flag") over Window.partitionBy($"id").orderBy($"sdate"))//.show().groupBy($"id",$"burk",$"tmp").agg(min($"sdate") as "start_date",max($"sdate") as "end_start").show(100)}}
packagecom.shujia.sqlimportorg.apache.spark.sql.expressions.Window
importorg.apache.spark.sql.{DataFrame, Row, SparkSession}object Demo14MaYi {def main(args: Array[String]):Unit={val sparkSession: SparkSession = SparkSession.builder().master("local").appName("作业社保演示").config("spark.sql.shuffle.partitions",1).getOrCreate()importorg.apache.spark.sql.functions._
    importsparkSession.implicits._

    //用户碳排放量表val ant_user_low_carbon: DataFrame = sparkSession.read
      .format("csv").option("sep","\t").schema("user_id STRING,data_dt STRING,low_carbon DOUBLE").load("spark/data/ant_user_low_carbon.txt")val ant_plant_carbon: DataFrame = sparkSession.read
      .format("csv").option("sep","\t").schema("plant_id STRING,plant_name STRING,plant_carbon DOUBLE").load("spark/data/ant_plant_carbon.txt")//        ant_user_low_carbon.show()//        ant_plant_carbon.show()/**
     * 蚂蚁森林植物申领统计
     * 问题:假设2017年1月1日开始记录低碳数据(user_low_carbon),假设2017年10月1日之前满足申领条件的用户都申领了一颗p004-胡杨,
     * 剩余的能量全部用来领取“p002-沙柳” 。
     * 统计在10月1日累计申领“p002-沙柳” 排名前10的用户信息;以及他比后一名多领了几颗沙柳。
     * 得到的统计结果如下表样式:
     *///因为用户能量表与植物能量表没有关联字段,如果想要在sql中使用胡杨以及沙柳的能量的话,需要先单独从植物能量表将胡杨以及沙柳的能量取出来,使用变量接收val huYangCarbon:Double= ant_plant_carbon
      //过滤条件是胡杨.where($"plant_name"==="胡杨")//选择能量的一列.select($"plant_carbon")//转rdd.rdd
      //转scala数组.collect().map {case Row(plant_carbon:Double)=> plant_carbon
      }.head

    val shaLiuCarbon:Double= ant_plant_carbon
      //过滤条件是胡杨.where($"plant_name"==="沙柳")//选择能量的一列.select($"plant_carbon")//转rdd.rdd
      //转scala数组.collect().map {case Row(plant_carbon:Double)=> plant_carbon
      }.head

    println(s"胡杨所需的碳排放量:${huYangCarbon},沙柳所需要的碳排放量:${shaLiuCarbon}")
    println("---------------------------------------------------------------------------")
    ant_user_low_carbon
      //取出2017年1月1日到2017年10月1日.where($"data_dt">="2017/1/1" and $"data_dt"<="2017/10/1")//按照用户分组,求用户的总的排放量.groupBy($"user_id").agg(sum($"low_carbon") as "sum_low_carbon")//判断能量是否满足一个胡杨的能量,如果满足直接减去胡杨的能量,得到剩余的能量.withColumn("shengYu_carbon", when($"sum_low_carbon">= huYangCarbon, $"sum_low_carbon"- huYangCarbon).otherwise($"sum_low_carbon"))//计算剩余的能量可以领取多少个沙柳.withColumn("number", floor($"shengYu_carbon"/ shaLiuCarbon))//.show()//获取后一名用户的领取沙柳的个数.withColumn("lead_number", lead($"number",1,0) over Window.orderBy($"number".desc))//.show().withColumn("duo", $"number"- $"lead_number").limit(10)//.show()/**
     * 蚂蚁森林低碳用户排名分析
     * 问题:查询user_low_carbon表中每日流水记录,条件为:
     * 用户在2017年,连续三天(或以上)的天数里,
     * 每天减少碳排放(low_carbon)都超过100g的用户低碳流水。
     * 需要查询返回满足以上条件的user_low_carbon表中的记录流水。
     * 例如用户u_002符合条件的记录如下,因为2017/1/2~2017/1/5连续四天的碳排放量之和都大于等于100g:
     *///("user_id STRING,data_dt STRING,low_carbon DOUBLE")
    ant_user_low_carbon
      //过滤出2017年的数据.where(substring($"data_dt",0,4)==="2017")//计算用户每天积攒的能量 用户id和日期一起分组.groupBy($"user_id", $"data_dt").agg(sum($"low_carbon") as "sum_low_carbon")//过滤掉能量小于等于100的.where($"sum_low_carbon">100)//开窗有,以用户开窗,以日期升序排序,编号 row_number.withColumn("rn", row_number() over Window.partitionBy($"user_id").orderBy($"data_dt"))//使用日期减去排序.withColumn("flag_dt", date_sub(regexp_replace($"data_dt","/","-"), $"rn"))//计算连续的天数.withColumn("lianxu_days", count($"data_dt") over Window.partitionBy($"user_id", $"flag_dt"))//过滤出连续3天以上的数据.where($"lianxu_days">=3)//关联流水记录表,得到每个符合条件的数据.join(ant_user_low_carbon,List("user_id","data_dt")).select($"user_id",$"data_dt",$"low_carbon").show(100)}}
Test
packagecom.shujia.sqlimportorg.apache.spark.sql.expressions.UserDefinedFunction
importorg.apache.spark.sql.{DataFrame, SparkSession}object Test {def main(args: Array[String]):Unit={//1、创建Spark sql环境val spark: SparkSession = SparkSession
      .builder().master("local[2]").appName("sql").config("spark.sql.shuffle.partitions",1)//默认在集群中时200个.getOrCreate()importorg.apache.spark.sql.functions._

    //定义UDFval str_split: UserDefinedFunction = udf((line:String)=>{"数加:"+line
    })//    val value2: Broadcast[UserDefinedFunction] = spark.sparkContext.broadcast(str_split)//    spark.udf.register("str_split", str_split)/**
     * 1、在使用DSL的时候使用自定义函数
     */val studentsDF: DataFrame = spark.read
      .format("csv").option("sep",",").schema("id STRING,name STRING,age INT,gender STRING,clazz STRING").load("spark/data/students.csv")//在使用DSL时,使用自定义函数//    studentsDF.select(str_split($"clazz")).show()
    studentsDF.createOrReplaceTempView("lines")

    spark.udf.register("str_split",(line:String)=>"数加:"+line)

    spark.sql("""
        |select str_split(clazz) from lines
        |""".stripMargin).show()/**
     * 在 sql中使用自定义函数
     *///    studentsDF.createOrReplaceTempView("lines")////    //注册自定义函数//    spark.udf.register("str_split", (line: String) => "数加:"+line)////    spark.sql(//      """//        |select * from//        |lines//        |lateral view explode(str_split(line,',')) T as word//        |//        |""".stripMargin).show()}}

3.Spark-streaming

Demo1WordCount
package com.shujia.streaming

import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Durations, StreamingContext}

object Demo1WordCount {
  def main(args: Array[String]): Unit = {
    /**
     * spark core: SparkContext --> core data type: RDD
     * spark sql: SparkSession --> core data type: DataFrame
     * spark streaming: StreamingContext --> core data type: DStream (built on RDDs)
     *
     * A Spark Streaming environment is created with a StreamingContext object.
     */
    val conf = new SparkConf()
    conf.setMaster("local[2]")
    conf.setAppName("spark streaming 单词统计案例")
    val sparkContext = new SparkContext(conf)
    val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))

    val lineDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)
    val flatMapDS: DStream[String] = lineDS.flatMap(_.split(" "))
    val wordsKVDS: DStream[(String, Int)] = flatMapDS.map((_, 1))
    val resultDS: DStream[(String, Int)] = wordsKVDS.reduceByKey((x: Int, y: Int) => x + y)
    resultDS.print()

    streamingContext.start()
    streamingContext.awaitTermination()
    streamingContext.stop()
  }
}
Demo2UpdateStateByKey
packagecom.shujia.streamingimportorg.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}importorg.apache.spark.{SparkConf, SparkContext}importorg.apache.spark.streaming.{Durations, StreamingContext}object Demo2UpdateStateByKey {def main(args: Array[String]):Unit={/**
     * spark core: SparkContext  --> 核心数据类型 RDD
     * spark sql: SparkSession --> 核心数据类型 DataFrame
     * spark streaming: StreamingContext --> 核心数据类型 DStream(RDD)
     *
     * 创建SparkStreaming的环境需要使用StreamingContext对象
     *///spark streaming依赖于spark core的环境//因为spark streaming核心数据类型底层封装的是RDDval conf =new SparkConf()
    conf.setMaster("local[2]")
    conf.setAppName("spark streaming 单词统计案例1")val sparkContext =new SparkContext(conf)// def this(sparkContext: SparkContext, batchDuration: Duration) //传入一个spark core 环境对象以及一个接收时间的范围val streamingContext =new StreamingContext(sparkContext, Durations.seconds(5))//设置checkpoint路径
    streamingContext.checkpoint("spark/data/stream_state_wc_checkpoint")/**
     * 因为sparkstreaming是一个近实时的计算引擎,整个程序写完运行的状态,一直运行,一直接收数据,除非异常或者手动停止
     *
     * 1、目前在spark使用nc工具监控一个端口号中源源不断产生的数据
     * 2、yum install -y nc 下载安装nc工具
     * 3、nc -lk xxxx
     */val lineDS: ReceiverInputDStream[String]= streamingContext.socketTextStream("master",12345)val flatMapDS: DStream[String]= lineDS.flatMap(_.split(" "))val wordsKVDS: DStream[(String,Int)]= flatMapDS.map((_,1))val res2DS: DStream[(String,Int)]= wordsKVDS.updateStateByKey((seq: Seq[Int], option: Option[Int])=>{val currentCount:Int= seq.sum
      val count:Int= option.getOrElse(0)
      Option(currentCount + count)})
    res2DS.print()

    streamingContext.start()
    streamingContext.awaitTermination()
    streamingContext.stop()}}
Demo3ReduceByKeyAndWindow
packagecom.shujia.streamingimportorg.apache.spark.sql.SparkSession
importorg.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}importorg.apache.spark.{SparkConf, SparkContext}importorg.apache.spark.streaming.{Durations, StreamingContext}object Demo3ReduceByKeyAndWindow {def main(args: Array[String]):Unit={/**
     * 旧版本创建sparkContext的方式
     *///    val conf = new SparkConf()//    conf.setMaster("local[2]")//    conf.setAppName("spark streaming 单词统计案例1")//    val sparkContext = new SparkContext(conf)//    // def this(sparkContext: SparkContext, batchDuration: Duration) //传入一个spark core 环境对象以及一个接收时间的范围//    val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))/**
     * 新版本中推荐使用SparkSession对象来获取
     */val sparkSession: SparkSession = SparkSession.builder().master("local[2]").appName("窗口演示").config("spark.sql.shuffle.partitions",1).getOrCreate()val sparkContext: SparkContext = sparkSession.sparkContext

    //创建SparkStreaming对象val streamingContext =new StreamingContext(sparkContext, Durations.seconds(5))val linesDS: ReceiverInputDStream[String]= streamingContext.socketTextStream("master",12345)//将数据封装成kv格式,将来可以调用kv类型的算子来进行操作val wordsKVDS: DStream[(String,Int)]= linesDS.flatMap(_.split(" ")).map((_,1))//1、如果只需要计算当前批次的数据,直接reduceByKey//2、如果要从最开始的数据计算,使用有状态算子,updateStateByKey和checkpoint配合使用//3、如果要计算最近的时间段内的数据,使用窗口类算子,reduceByKeyAndWindow/**
     * 滑动窗口
     *///    val res1DS: DStream[(String, Int)] = wordsKVDS.reduceByKeyAndWindow(//      (x: Int, y: Int) => x + y,//      Durations.seconds(15), // 设置窗口的大小//      Durations.seconds(5)) // 设置滑动的大小/**
     *  滚动窗口
     */val res1DS: DStream[(String,Int)]= wordsKVDS.reduceByKeyAndWindow((x:Int, y:Int)=> x + y,
      Durations.seconds(10),// 设置窗口的大小
      Durations.seconds(10))// 设置滑动的大小

    res1DS.print()

    streamingContext.start()
    streamingContext.awaitTermination()
    streamingContext.stop()}}
Demo4DStreamToRDD
package com.shujia.streaming

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}

object Demo4DStreamToRDD {
  def main(args: Array[String]): Unit = {
    /**
     * Create the SparkSession
     */
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local[2]")
      .appName("DS2RDD demo")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()
    import sparkSession.implicits._
    import org.apache.spark.sql.functions._

    val sparkContext: SparkContext = sparkSession.sparkContext
    val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))

    /**
     * Read data through spark streaming and get a DStream
     */
    //hello hello world java hello hadoop
    val linesDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)

    val new_linesDS: DStream[String] = linesDS.window(Durations.seconds(15), Durations.seconds(5))

    /**
     * A DStream is backed by RDDs: each batch of received data is wrapped into one RDD,
     * and every batch receives different data.
     */
    new_linesDS.foreachRDD((rdd: RDD[String]) => {
      println("===================================================")
      println("processing the current batch of data.....")
      println("===================================================")

      // the rdd can be processed directly here
//      rdd.flatMap(_.split(" "))
//        .map((_, 1))
//        .groupBy(_._1)
//        .map((kv: (String, Iterable[(String, Int)])) => (kv._1, kv._2.size))
//        .foreach(println)

      /**
       * Since the rdd is available here and an rdd can be converted to a DataFrame,
       * each batch in spark streaming can also be analysed with SQL.
       */
      //def toDF(colNames: String*)
      val linesDF: DataFrame = rdd.toDF("line")
      linesDF.createOrReplaceTempView("words")
      sparkSession.sql(
        """
          |select
          |t1.word as word,
          |count(1) as number
          |from
          |(select
          | explode(split(line,' ')) as word
          |from
          |words) t1
          |group by t1.word
          |""".stripMargin).show()
    })

    streamingContext.start()
    streamingContext.awaitTermination()
    streamingContext.stop()
  }
}
Demo5RDDToDStream
package com.shujia.streaming

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}

object Demo5RDDToDStream {
  def main(args: Array[String]): Unit = {
    /**
     * Create the SparkSession
     */
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local[2]")
      .appName("DS2RDD demo")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()
    import sparkSession.implicits._
    import org.apache.spark.sql.functions._

    val sparkContext: SparkContext = sparkSession.sparkContext
    val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))

    /**
     * Read data through spark streaming and get a DStream
     */
    //hello hello world java hello hadoop
    val linesDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)

    //in_rdd:  hello hello world java hello hadoop
    //out_rdd:
    //  hello 3
    //  world 1
    //  java 1
    //  hadoop 1
    val resDS: DStream[(String, Long)] = linesDS.transform((rdd: RDD[String]) => {
      /**
       * Processing the rdd yields a new rdd, which is returned.
       */
//      val resultRDD: RDD[(String, Int)] = rdd.flatMap(_.split(" "))
//        .map((_, 1))
//        .groupBy(_._1)
//        .map((kv: (String, Iterable[(String, Int)])) => (kv._1, kv._2.size))
//      resultRDD

      //def toDF(colNames: String*)
      val linesDF: DataFrame = rdd.toDF("line")
      linesDF.createOrReplaceTempView("words")
      val resRDD: RDD[(String, Long)] = sparkSession.sql(
        """
          |select
          |t1.word as word,
          |count(1) as number
          |from
          |(select
          | explode(split(line,' ')) as word
          |from
          |words) t1
          |group by t1.word
          |""".stripMargin).rdd
        .map((row: Row) => {
          (row.getAs[String](0), row.getAs[Long](1))
        })
      resRDD
    })

    resDS.print()

    streamingContext.start()
    streamingContext.awaitTermination()
    streamingContext.stop()
  }
}
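A small sketch (reusing linesDS and the imports from Demo5) of the practical difference between the two bridges to RDD code: transform is a transformation that returns a new DStream and still needs an output operation, while foreachRDD is itself an output operation and returns Unit.

    // transform: returns a DStream, so an output operation such as print() is still required
    val upperDS: DStream[String] = linesDS.transform((rdd: RDD[String]) => rdd.map(_.toUpperCase))
    upperDS.print()

    // foreachRDD: already an output operation, nothing else needs to be called on the result
    linesDS.foreachRDD((rdd: RDD[String]) => {
      rdd.take(10).foreach(println)
    })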
Demo6Submit
package com.shujia.streaming

import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Durations, StreamingContext}

object Demo6Submit {
  def main(args: Array[String]): Unit = {
    // no master is set here: it is supplied by spark-submit when the jar is run on the cluster
    val sparkSession: SparkSession = SparkSession.builder()
      .appName("run via submit command")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()
    val sparkContext: SparkContext = sparkSession.sparkContext
    val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))

    val linesDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)

    linesDS
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    streamingContext.start()
    streamingContext.awaitTermination()
    streamingContext.stop()
  }
}
Demo7SaveToFile
package com.shujia.streaming

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}

object Demo7SaveToFile {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local[2]")
      .appName("run via submit command")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()
    val sparkContext: SparkContext = sparkSession.sparkContext
    val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))

    val linesDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)

    val resultDS: DStream[(String, Int)] = linesDS
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .transform((rdd: RDD[(String, Int)]) => {
        println("=======================")
        println("processing a batch of data")
        rdd
      })

    // the target path is a directory; the file names are generated by the system, only the suffix can be chosen
    // every batch rolls out a new result directory
    resultDS.saveAsTextFiles("spark/data/streams/stream", "txt")

    streamingContext.start()
    streamingContext.awaitTermination()
    streamingContext.stop()
  }
}
Demo8SaveToMysql
package com.shujia.streaming

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.spark.streaming.dstream.ReceiverInputDStream

import java.sql.{Connection, DriverManager, PreparedStatement}

object Demo8SaveToMysql {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local[2]")
      .appName("run via submit command")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()
    val sparkContext: SparkContext = sparkSession.sparkContext
    val streamingContext = new StreamingContext(sparkContext, Durations.seconds(5))

    val linesDS: ReceiverInputDStream[String] = streamingContext.socketTextStream("master", 12345)

    linesDS.foreachRDD((rdd: RDD[String]) => {
      println("------------processing a batch of data-------------------")
      println(s"number of partitions in this batch: ${rdd.getNumPartitions}")

      /**
       * foreachPartition runs the logic once per partition
       */
      rdd.foreachPartition((itr: Iterator[String]) => {
        println("------------shujia watermark-------------------")
        // create the database connection once per partition
        //1. register the driver
        Class.forName("com.mysql.jdbc.Driver")
        //2. get the connection object
        val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata29?useUnicode=true&characterEncoding=utf-8&useSSL=false", "root", "123456")
        //3. get the statement object
        val ps: PreparedStatement = conn.prepareStatement("insert into student values(?,?,?,?,?)")

        itr.foreach((line: String) => {
          println("....processing one record....")
          //1500100019,娄曦之,24,男,理科三班
          val infos: Array[String] = line.split(",")
          val id: Int = infos(0).toInt
          val name: String = infos(1)
          val age: Int = infos(2).toInt
          val gender: String = infos(3)
          val clazz: String = infos(4)
          // bind the parameters of the prepared statement
          ps.setInt(1, id)
          ps.setString(2, name)
          ps.setInt(3, age)
          ps.setString(4, gender)
          ps.setString(5, clazz)
          // execute the sql statement
          ps.executeUpdate()
        })

        // release resources
        ps.close()
        conn.close()
        println()
      })
    })

    streamingContext.start()
    streamingContext.awaitTermination()
    streamingContext.stop()
  }
}
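The per-record executeUpdate above is easy to follow but issues one round trip to MySQL per record. A common alternative, shown here only as a sketch that would replace the rdd.foreachPartition block in Demo8, is to queue the inserts with the standard JDBC addBatch/executeBatch calls and send them once per partition.

    // a sketch only: batched variant of the foreachPartition body in Demo8
    rdd.foreachPartition((itr: Iterator[String]) => {
      Class.forName("com.mysql.jdbc.Driver")
      val conn: Connection = DriverManager.getConnection(
        "jdbc:mysql://master:3306/bigdata29?useUnicode=true&characterEncoding=utf-8&useSSL=false", "root", "123456")
      val ps: PreparedStatement = conn.prepareStatement("insert into student values(?,?,?,?,?)")

      itr.foreach((line: String) => {
        val infos: Array[String] = line.split(",")
        ps.setInt(1, infos(0).toInt)
        ps.setString(2, infos(1))
        ps.setInt(3, infos(2).toInt)
        ps.setString(4, infos(3))
        ps.setString(5, infos(4))
        ps.addBatch()              // queue the insert instead of executing it immediately
      })

      ps.executeBatch()            // one batched round trip per partition
      ps.close()
      conn.close()
    })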
Demo9CardCondition
package com.shujia.streaming

import com.alibaba.fastjson.{JSON, JSONObject}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Durations, StreamingContext}

import java.sql.{Connection, DriverManager, PreparedStatement}
import java.text.SimpleDateFormat
import java.util.Date

object Demo9CardCondition {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local[2]")
      .appName("run via submit command")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()
    val sparkContext: SparkContext = sparkSession.sparkContext
    val sparkStreaming: StreamingContext = new StreamingContext(sparkContext, Durations.seconds(5))

    /**
     * Requirement: monitor the traffic at each checkpoint (card).
     * The input is checkpoint camera data such as:
     * {"car":"皖A9A7N2","city_code":"340500","county_code":"340522","card":117988031603010,"camera_id":"00001","orientation":"西南","road_id":34052055,"time":1614711895,"speed":36.38}
     */
    val InfoDS: ReceiverInputDStream[String] = sparkStreaming.socketTextStream("master", 12345)

    /**
     * Option 1: set the window on the source DStream.
     * Compute the checkpoint statistics of the last 15 seconds, every 5 seconds.
     */
    // sliding window
    val carJsonDS: DStream[String] = InfoDS.window(Durations.seconds(15), Durations.seconds(5))

    /**
     * 1. Parse the received json data with the fastjson library.
     *
     * In json, a value wrapped in double quotes is a string; a bare integer is usually read with getLong
     * and a decimal with getDouble.
     */
    val cardAndSpeedDS: DStream[(Long, (Double, Int))] = carJsonDS.map((line: String) => {
      // parse the string into a json object with fastjson
      val jSONObject: JSONObject = JSON.parseObject(line)
      // only the checkpoint id and the speed are needed for this requirement
      val cardId: Long = jSONObject.getLong("card")
      val carSpeed: Double = jSONObject.getDouble("speed")
      (cardId, (carSpeed, 1)) // assume every record is a different car
    })

    /**
     * 2. Compute the average speed and the number of cars per checkpoint in real time.
     * Option 2: set the window when calling reduceByKeyAndWindow on the DStream.
     *
     * Note: the reduce function only accumulates sums and counts (it has to stay associative);
     * the average is computed afterwards, otherwise intermediate averages would be averaged again.
     */
    //    cardAndSpeedDS.reduceByKeyAndWindow()
    val cardSumDS: DStream[(Long, (Double, Int))] = cardAndSpeedDS.reduceByKey((kv1: (Double, Int), kv2: (Double, Int)) => {
      // summing the 1s of the same checkpoint gives the number of cars in this window
      val carNumber: Int = kv1._2 + kv2._2
      // summing the speeds of the same checkpoint gives the total speed in this window
      val sumSpeed: Double = kv1._1 + kv2._1
      (sumSpeed, carNumber)
    })

    // total speed / number of cars = average speed of the checkpoint in this window
    val cardConditionDS: DStream[(Long, (Double, Int))] = cardSumDS.mapValues((t: (Double, Int)) => (t._1 / t._2, t._2))

    /**
     * Save the result into the database
     */
    cardConditionDS.foreachRDD((rdd: RDD[(Long, (Double, Int))]) => {
      rdd.foreachPartition((itr: Iterator[(Long, (Double, Int))]) => {
        println("------------shujia watermark-------------------")
        // create the database connection once per partition
        //1. register the driver
        Class.forName("com.mysql.jdbc.Driver")
        //2. get the connection object
        val conn: Connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata29?useUnicode=true&characterEncoding=utf-8&useSSL=false", "root", "123456")
        //3. get the statement object
        val ps: PreparedStatement = conn.prepareStatement("insert into card_condition values(?,?,?,?)")
        // processing time of this batch
        val piCiTime: String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())

        itr.foreach((f: (Long, (Double, Int))) => {
          val cardId: Long = f._1
          val avgSpeed: Double = f._2._1
          val carNumber: Int = f._2._2
          ps.setLong(1, cardId)
          ps.setDouble(2, avgSpeed)
          ps.setInt(3, carNumber)
          ps.setString(4, piCiTime)
          ps.executeUpdate()
        })

        // release resources
        ps.close()
        conn.close()
        println()
      })
    })

    sparkStreaming.start()
    sparkStreaming.awaitTermination()
    sparkStreaming.stop()
  }
}

4.Spark-opt

Demo1Cache
package com.shujia.opt

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object Demo1Cache {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local")
      .appName("cache")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()
    val sparkContext: SparkContext = sparkSession.sparkContext

    val studentsRDD: RDD[String] = sparkContext.textFile("spark/data/students.csv")

    /**
     * Avoid recomputing the same RDD: caching a reused RDD avoids recomputing
     * the whole lineage in front of it.
     *
     * The same idea applies to rdd, df and sql.
     */
    // cache the RDD that is reused
    //    studentsRDD.cache() // the default level is MEMORY_ONLY
    studentsRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)

    /**
     * Count the number of students per class
     */
    studentsRDD
      .map((line: String) => (line.split(",")(4), 1))
      .reduceByKey(_ + _)
      .saveAsTextFile("spark/data/opt/clazz_num")

    /**
     * Count the number of students per gender
     *
     * The first job that touches studentsRDD reads the full data set; when the second job
     * uses studentsRDD it reads the data from the cache instead.
     */
    studentsRDD
      .map((line: String) => (line.split(",")(3), 1))
      .reduceByKey(_ + _)
      .saveAsTextFile("spark/data/opt/gender_num")

    while (true) {} // keep the application alive so the web UI can be inspected
  }
}
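Once a cached RDD is no longer reused, its blocks can be released explicitly; a minimal sketch, assuming it runs after the two jobs in Demo1Cache:

    // release the cached blocks of studentsRDD once no further job reuses it
    studentsRDD.unpersist()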
Demo2AggregateByKey
package com.shujia.opt

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object Demo2AggregateByKey {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local")
      .appName("cache")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()
    val sparkContext: SparkContext = sparkSession.sparkContext

    val studentsRDD: RDD[String] = sparkContext.textFile("spark/data/students.csv")
    val clazzKVRDD: RDD[(String, Int)] = studentsRDD.map((line: String) => (line.split(",")(4), 1))

    /**
     * reduceByKey: group-and-aggregate; it also combines on the map side, but the same function
     * is used on both the map side and the reduce side.
     *
     * aggregateByKey: group-and-aggregate that lets the map-side (pre-aggregation) logic and the
     * reduce-side logic be set separately:
     *   zeroValue: the initial value
     *   seqOp: (U, V) => U   map-side aggregation
     *   combOp: (U, U) => U  reduce-side aggregation
     *
     * Use this operator when the map-side aggregation logic differs from the reduce-side logic.
     */
    //aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
//    clazzKVRDD.aggregateByKey(0)(
//      (u: Int, v: Int) => u + v,    // map-side aggregation
//      (u1: Int, u2: Int) => u1 + u2 // reduce-side aggregation
//    ).foreach(println)

    //reduceByKey(func: (V, V) => V)
    clazzKVRDD.reduceByKey(_ + _).foreach(println)
  }
}
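A case where the map-side and reduce-side logic genuinely differ is computing an average per key: the map side folds raw values into a (sum, count) pair, while the reduce side merges partial (sum, count) pairs. A minimal sketch, assuming a hypothetical scoreKVRDD: RDD[(String, Int)] of (clazz, score) pairs and the imports of Demo2AggregateByKey:

    // a sketch only: scoreKVRDD is a hypothetical RDD[(String, Int)] of (clazz, score) pairs
    val avgRDD: RDD[(String, Double)] = scoreKVRDD
      .aggregateByKey((0, 0))(                                   // zeroValue: (sum, count)
        (u: (Int, Int), v: Int) => (u._1 + v, u._2 + 1),         // map side: fold one raw score into (sum, count)
        (u1: (Int, Int), u2: (Int, Int)) => (u1._1 + u2._1, u1._2 + u2._2) // reduce side: merge partial (sum, count)
      )
      .mapValues((t: (Int, Int)) => t._1.toDouble / t._2)        // sum / count = average per key

    avgRDD.foreach(println)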
Demo3MapPartition
package com.shujia.opt

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

import java.text.SimpleDateFormat
import java.util.Date

object Demo3MapPartition {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local")
      .appName("cache")
      .config("spark.sql.shuffle.partitions", 2)
      .getOrCreate()
    val sparkContext: SparkContext = sparkSession.sparkContext

    val dataRDD: RDD[String] = sparkContext.textFile("spark/data/ant_user_low_carbon.txt")
    val kvRDD: RDD[(String, String, String)] = dataRDD.mapPartitions((itr: Iterator[String]) => {
      itr.map((line: String) => {
        val infos: Array[String] = line.split("\t")
        (infos(0), infos(1), infos(2))
      })
    })

    // map processes the RDD one record at a time
//    val resRDD: RDD[(String, Long, String)] = kvRDD.map((kv: (String, String, String)) => {
//      // this statement is inside map, so a SimpleDateFormat object is created for every single record
//      val sdf = new SimpleDateFormat("yyyy/MM/dd")
//      println("----------------created a SimpleDateFormat object----------------")
//
//      val dateObj: Date = sdf.parse(kv._2)
//      val ts: Long = dateObj.getTime
//      (kv._1, ts, kv._3)
//    })
//
//    resRDD.foreach(println)

    // mapPartitions processes one whole partition at a time
    val resRDD2: RDD[(String, Long, String)] = kvRDD.mapPartitions((itr: Iterator[(String, String, String)]) => {
      /**
       * Convert the date field into a timestamp; the SimpleDateFormat object is created once per partition.
       */
      val sdf = new SimpleDateFormat("yyyy/MM/dd")
      println("----------------created a SimpleDateFormat object----------------")
      itr.map((kv: (String, String, String)) => {
        val dateObj: Date = sdf.parse(kv._2)
        val ts: Long = dateObj.getTime
        (kv._1, ts, kv._3)
      })
    })

    resRDD2.foreach(println)
  }
}
Demo4Coalesce
package com.shujia.opt

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * Repartitioning
 *
 * repartition
 *
 * coalesce
 *
 * Interview question: how do you change the number of partitions of an RDD, and what is the
 * difference between the two operators?
 */
object Demo4Coalesce {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local")
      .appName("cache")
      //      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()
    val sparkContext: SparkContext = sparkSession.sparkContext

    val studentsRDD: RDD[String] = sparkContext.textFile("spark/data/students.csv") // 1000 records

    // initially the number of partitions is determined by the number of block splits
    println(s"studentsRDD has ${studentsRDD.getNumPartitions} partitions")

    /**
     * repartition: repartitions the RDD and returns a new RDD.
     * It can either increase or decrease the number of partitions.
     *
     * Whether to increase or decrease depends on the available resources:
     * with enough resources, more partitions mean more tasks running in parallel and a faster job.
     *
     * If one partition of the parent RDD feeds exactly one partition of the child RDD, there is no shuffle;
     * if one parent partition feeds several child partitions, that is a shuffle.
     * Increasing partitions with repartition produces a shuffle, so the data is redistributed and
     * loses its original order. Decreasing partitions with repartition also produces a shuffle.
     */
//    val students2RDD: RDD[String] = studentsRDD.repartition(10)
//    students2RDD.foreach(println)
//    println(s"students2RDD has ${students2RDD.getNumPartitions} partitions")
//
//    val students3RDD: RDD[String] = students2RDD.repartition(1)
//    println(s"students3RDD has ${students3RDD.getNumPartitions} partitions")
//    students3RDD.foreach(println)

    /**
     * coalesce: repartitions as well, increasing or decreasing the number of partitions.
     *
     * It takes two arguments: numPartitions: Int, the target number of partitions,
     * and shuffle: Boolean = false, whether to shuffle.
     *
     * Increasing partitions with coalesce requires shuffle = true.
     * Decreasing partitions can be done with or without a shuffle; merging small files is the
     * typical case for coalesce without a shuffle.
     */
    val studentsRDD2: RDD[String] = studentsRDD.coalesce(10, shuffle = true)
    println(s"studentsRDD2 has ${studentsRDD2.getNumPartitions} partitions")
    studentsRDD2.foreach(println)

    val studentRDD3: RDD[String] = studentsRDD2.coalesce(2, shuffle = false)
    println(s"studentRDD3 has ${studentRDD3.getNumPartitions} partitions")
    studentRDD3.foreach(println)

    while (true) {} // keep the application alive so the web UI can be inspected
  }
}
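For the interview question above it helps to remember that repartition(n) is simply coalesce(n, shuffle = true); a short sketch against the studentsRDD from Demo4Coalesce:

    // the two lines below are equivalent: repartition always shuffles
    val a: RDD[String] = studentsRDD.repartition(10)
    val b: RDD[String] = studentsRDD.coalesce(10, shuffle = true)

    // without a shuffle, coalesce can only merge existing partitions (it cannot increase their number),
    // which is why it is the cheap way to reduce partitions or merge small files
    val c: RDD[String] = studentsRDD.coalesce(1, shuffle = false)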
Demo5Coalesce2
package com.shujia.opt

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object Demo5Coalesce2 {
  def main(args: Array[String]): Unit = {
    /**
     * Merge small files with coalesce
     */
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local")
      .appName("cache")
      //      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()
    val sparkContext: SparkContext = sparkSession.sparkContext

    val studentsRDD: RDD[String] = sparkContext.textFile("spark/data/studentsinfo/*") // 1000 records
    // initially the number of partitions is determined by the number of block splits
    println(s"studentsRDD has ${studentsRDD.getNumPartitions} partitions")

    // merge the small files into a single partition without a shuffle
    val students2RDD: RDD[String] = studentsRDD.coalesce(1, shuffle = false)
    println(s"students2RDD has ${students2RDD.getNumPartitions} partitions")

    students2RDD.saveAsTextFile("spark/data/studentsinfo2")
  }
}
Demo6MapJoin
package com.shujia.opt

import org.apache.spark.sql.{DataFrame, SparkSession}

object Demo6MapJoin {
  def main(args: Array[String]): Unit = {
    /**
     * Broadcast (map-side) join
     */
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local")
      .appName("cache")
      .config("spark.sql.shuffle.partitions", 1)
      .getOrCreate()

    val studentsDF: DataFrame = sparkSession.read
      .format("csv")
      .option("sep", ",")
      .schema("id STRING,name STRING,age INT,gender STRING,clazz STRING")
      .load("spark/data/students.csv")

    val scoresDF: DataFrame = sparkSession.read
      .format("csv")
      .option("sep", ",")
      .schema("id STRING,subject_id STRING,score INT")
      .load("spark/data/score.txt")

    /**
     * In local mode the SQL optimizer may already broadcast the small table, which is the same idea as a map join.
     *
     * When running on a cluster, the broadcast can be requested explicitly:
     * in DSL code, a hint marks the table to be broadcast.
     *
     * The usual pattern is big table join small table, broadcasting the small table with the hint;
     * "small" generally means less than about 1 GB of data.
     *
     * Two jobs are started:
     * the first job pulls the small table to the Driver and broadcasts it from there;
     * the second job joins against the broadcast data inside the Executors.
     */
    val resDF: DataFrame = scoresDF.join(studentsDF.hint("broadcast"), "id")

    resDF.show()

    while (true) {} // keep the application alive so the web UI can be inspected
  }
}
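The same broadcast request can also be written with the broadcast function from org.apache.spark.sql.functions instead of a string hint; a minimal sketch using the studentsDF and scoresDF defined above:

    import org.apache.spark.sql.functions.broadcast

    // equivalent to the "broadcast" hint: mark the small DataFrame for broadcasting
    val resDF2: DataFrame = scoresDF.join(broadcast(studentsDF), "id")
    resDF2.show()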
Demo7Kryo
package com.shujia.opt

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

/**
 * Kryo is a serialization framework that Spark supports specifically to speed up its own computation.
 *
 * Serialization can improve performance and reduces the memory the cached data occupies.
 */
case class Student(id: String, name: String, age: Int, gender: String, clazz: String)

object Demo7Kryo {
  def main(args: Array[String]): Unit = {
    val sparkSession: SparkSession = SparkSession.builder()
      .master("local")
      .appName("cache")
      .config("spark.sql.shuffle.partitions", 1)
      // switch the serializer to Kryo
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // custom registrator class that tells Kryo which classes to serialize
      .config("spark.kryo.registrator", "com.shujia.opt.Demo8KryoRegistrator")
      .getOrCreate()
    val sparkContext: SparkContext = sparkSession.sparkContext

    val studentsRDD: RDD[Student] = sparkContext.textFile("spark/data/students.csv")
      .map((line: String) => {
        val infos: Array[String] = line.split(",")
        Student(infos(0), infos(1), infos(2).toInt, infos(3), infos(4))
      })

    /**
     * Cached size without serialization: 238.3 KiB
     * With the default Java serialization: 55.7 KiB
     * With Kryo serialization: 43.0 KiB
     */
    // cache the RDD that is reused
    //    studentsRDD.cache() // the default level is MEMORY_ONLY
    //    studentsRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)
    //    studentsRDD.persist(StorageLevel.MEMORY_ONLY)
    studentsRDD.persist(StorageLevel.MEMORY_AND_DISK_SER)

    /**
     * Count the number of students per class
     */
    studentsRDD
      .map((stu: Student) => (stu.clazz, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile("spark/data/opt/clazz_num")

    /**
     * Count the number of students per gender
     *
     * The first job that touches studentsRDD reads the full data set; when the second job
     * uses studentsRDD it reads the data from the cache instead.
     */
    studentsRDD
      .map((stu: Student) => (stu.gender, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile("spark/data/opt/gender_num")

    while (true) {} // keep the application alive so the web UI can be inspected
  }
}
Demo8KryoRegistrator
package com.shujia.opt

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

/**
 * A custom registrator class that tells Kryo which classes to serialize
 */
class Demo8KryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    /**
     * Register the classes to serialize inside this overridden method
     */
    kryo.register(classOf[Student])
    kryo.register(classOf[String])
    kryo.register(classOf[Int])
  }
}
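If a separate registrator class feels heavy, classes can also be registered directly on the SparkConf with registerKryoClasses; the sketch below shows that variant only (the demos themselves use the spark.kryo.registrator config shown above, and Student refers to the case class from Demo7Kryo).

    import org.apache.spark.SparkConf

    // register the classes to serialize directly on the SparkConf instead of a KryoRegistrator class
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("kryo register")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[Student]))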
Demo9FilterKey
package com.shujia.opt

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo9FilterKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("app")
    val sc: SparkContext = new SparkContext(conf)

    val lines: RDD[String] = sc.textFile("spark/data/ws/*")

    println("number of partitions of the first RDD: " + lines.getNumPartitions)

    val countRDD: RDD[(String, Int)] = lines
      .flatMap(_.split("\\|"))
      .map((_, 1))
      .groupByKey() // this produces a shuffle
      .map((x: (String, Iterable[Int])) => (x._1, x._2.toList.sum))

    println("number of partitions after the aggregation: " + countRDD.getNumPartitions)
    //    countRDD.foreach(println)

    /**
     * Sample the keys, then filter out the keys that cause the skew and do not matter for the business.
     */
    val wordRDD: RDD[(String, Int)] = lines
      .flatMap(_.split(","))
      .map((_, 1))

//    val top1: Array[(String, Int)] = wordRDD
//      .sample(true, 0.1)
//      .reduceByKey(_ + _)
//      .sortBy(-_._2)
//      .take(1)
//    // the key that causes the skew
//    val key: String = top1(0)._1

    // filter out the key that causes the skew
    wordRDD
      .filter(t => !"null".equals(t._1))
      .groupByKey()
      .map(x => (x._1, x._2.toList.sum))
      .foreach(println)

    while (true) {}
  }
}
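A runnable version of the sampling step that is commented out in Demo9FilterKey, kept as a sketch: it estimates the most frequent key on a sample and drops it before aggregating, which is only acceptable when that key does not matter for the business.

    // a sketch only: estimate the most frequent key on a sample, then drop it before the aggregation
    val top1: Array[(String, Int)] = wordRDD
      .sample(withReplacement = true, fraction = 0.1) // look at roughly 10% of the data
      .reduceByKey(_ + _)
      .sortBy(-_._2)                                  // most frequent key first
      .take(1)
    val skewKey: String = top1(0)._1

    wordRDD
      .filter(t => !skewKey.equals(t._1))             // drop the skewed key (only if the business allows it)
      .reduceByKey(_ + _)
      .foreach(println)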
Demo10DoubleReduce
package com.shujia.opt

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.util.Random

object Demo10DoubleReduce {
  /**
   * Two-stage aggregation (salting).
   * Generally suitable when the business logic is not too complex.
   */
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setMaster("local")
      .setAppName("app")
    val sc: SparkContext = new SparkContext(conf)

    val lines: RDD[String] = sc.textFile("data/word")

    val wordRDD: RDD[String] = lines
      .flatMap(_.split(","))
      .filter(!_.equals(""))

/*    wordRDD
      .map((_, 1))
      .groupByKey()
      .map(kv => (kv._1, kv._2.size))
      .foreach(println)*/

    // pre-aggregation (map-side combine) already helps against data skew
/*    wordRDD
      .map((_, 1))
      .reduceByKey(_ + _)
      .foreach(println)*/

    // prefix every key with a random number below 5
    wordRDD
      .map(word => {
        val pix: Int = Random.nextInt(5)
        (pix + "-" + word, 1)
      })
      .groupByKey() // first aggregation, on the salted keys
      .map(t => (t._1, t._2.toList.sum))
      .map(t => {
        // strip the random prefix
        (t._1.split("-")(1), t._2)
      })
      .groupByKey() // second aggregation, on the original keys
      .map(t => (t._1, t._2.toList.sum))
      .foreach(println)

    while (true) {}
  }
}
Demo11DoubleJoin
package com.shujia.opt

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Demo11DoubleJoin {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("app").setMaster("local")
    val sc = new SparkContext(conf)

    val dataList1 = List(("java", 1), ("shujia", 2), ("shujia", 3), ("shujia", 1), ("shujia", 1))
    val dataList2 = List(("java", 100), ("java", 99), ("shujia", 88), ("shujia", 66))

    val RDD1: RDD[(String, Int)] = sc.parallelize(dataList1)
    val RDD2: RDD[(String, Int)] = sc.parallelize(dataList2)

    // sample the keys to find the skewed one
    val sampleRDD: RDD[(String, Int)] = RDD1.sample(false, 1.0)

    // skewedKey: the key that causes the skew ("shujia" here)
    val skewedKey: String = sampleRDD.map(x => (x._1, 1))
      .reduceByKey(_ + _)
      .map(x => (x._2, x._1))
      .sortByKey(ascending = false)
      .take(1)(0)._2

    // the part of each RDD that contains the skewed key
    val skewedRDD1: RDD[(String, Int)] = RDD1.filter(tuple => {
      tuple._1.equals(skewedKey)
    })
    // the part without the skewed key
    val commonRDD1: RDD[(String, Int)] = RDD1.filter(tuple => {
      !tuple._1.equals(skewedKey)
    })

    val skewedRDD2: RDD[(String, Int)] = RDD2.filter(tuple => {
      tuple._1.equals(skewedKey)
    })
    val commonRDD2: RDD[(String, Int)] = RDD2.filter(tuple => {
      !tuple._1.equals(skewedKey)
    })

    // use a map join for the skewed key
    // note: toMap keeps only one value per key, so duplicate keys in the small RDD are collapsed
    val skewedMap: Map[String, Int] = skewedRDD2.collect().toMap

    val bro: Broadcast[Map[String, Int]] = sc.broadcast(skewedMap)

    val resultRDD1: RDD[(String, (Int, Int))] = skewedRDD1.map(kv => {
      val word: String = kv._1
      val i: Int = bro.value.getOrElse(word, 0)
      (word, (kv._2, i))
    })

    // the RDDs without the skewed key are joined normally
    val resultRDD2: RDD[(String, (Int, Int))] = commonRDD1.join(commonRDD2)

    // concatenate the two results
    resultRDD1.union(resultRDD2).foreach(println)
  }
}

Reposted from: https://blog.csdn.net/qq_63548396/article/details/139637588
The copyright belongs to the original author @追求. In case of infringement, please contact us to have it removed.
