0


spark学习

Spark
注意:所有的案例,我使用的端口号是3307,因为我安装了两个版本的mysql,一般mysql数据库的端口是3306

scala特性

1.面向对象特性

2.函数式编程

3.静态类型

4.扩展性

5.并发性

6.兼容java类库

scala入口函数

1.object代表所有的静态域,类似java中的static
2.def是定义函数(方法)的关键字
3.main是主方法的方法名
4.args代表参数名
5.Array[String]代表参数是字符串类型数组
6.Unit代表该方法无返回值
object demo3 {def main(args: Array[String]):Unit={
      println("Hello world!")}}

条件语句

语法格式

// 单条件if(条件){
  满足条件的输出
}else{
  不满足条件的输出
}
// 多条件if(条件一){
  满足条件一的输出
}elseif(条件二){
  满足条件二的输出
}...else{
  不满足条件的输出
}

案例

object demo3 {def main(args: Array[String]):Unit={var num3 =77var num4 =66
  println("num3是:"+num3)
  println("num4是:"+num4)val temp = num3
  num3 = num4
  num4 = temp
  println("num3现在是:"+num3)
  println("num4现在是:"+num4)
  println("________________________________________")val number =Random.nextInt(100)
  println(s"你的成绩是:${number}")if(number<60){
    println("很可惜,你没有及格!")}elseif(number>=60| number <90){
    println("你的成绩及格了!")}else{
    println("恭喜你的成绩是优秀!")}}}

循环

for循环

for循环语法格式

for(值 <- 起始值 to 终止值){
 循环体
}

冒泡排序

importscala.util.Random
object maopao {def main(args: Array[String]):Unit={val arr1 =new Array[Int](10)// 定义一个空的数组// 循环10次,将获取的随机数放入到arr1数组内for(n <-0 until 10){// 生成100以内的随机数
          arr1(n)= Random.nextInt(100)}// 冒泡排序// 外层循环可以减1,内层循环可以减i减1for(i <-0 until  9){for(j <-0 until  10-i -1){if(arr1(j)< arr1(j+1)){val temp = arr1(j)
                  arr1(j)= arr1(j+1)
                  arr1(j+1)= temp
              }}}
      println(arr1.mkString(","))}}

案例

object for_demo {def main(args: Array[String]):Unit={for(i <-1 to 100){
    println(s"这是第${i}句我爱你!")}var sum =0for(n <-1 to 100){
    sum = sum + n
  }
  println(s"1-100的和是:${sum}")}}

类型转换

对类型进行转化使用(.to目标类型)

val num =12.5555val num1 = num.toInt
  println(num1)val str = num.toString
  println(str)

元组

元组可以通过(元组名._1)来获取元组中的第一个元素

案例

object tuple {def jisuan(num1:Int,num2:Int,fuhao:String):Int={var result =0if(fuhao.equals("+")){
    result = num1+num2
  }elseif(fuhao.equals("-")){
    result = num1-num2
  }elseif(fuhao.equals("*")){
    result = num1*num2
  }else{
    result = num1/num2
  }
  result
}def main(args: Array[String]):Unit={val tp =("tom",19,"今天你吃饭了吗?",13.3333)
  println(s"元组中的第一个元素是:${tp._1}")
  println(s"${tp._1}今年${tp._2}了!")val num1 =65val num2 =88
  println(s"${num1}与${num2}做加法运算的结果是:${jisuan(num1,num2,"+")}")
  println(s"${num1}与${num2}做减法运算的结果是:${jisuan(num1,num2,"-")}")
  println(s"${num1}与${num2}做乘法运算的结果是:${jisuan(num1,num2,"*")}")
  println(s"${num1}与${num2}做除法运算的结果是:${jisuan(num1,num2,"/")}")}}

List集合

在这里插入图片描述

在这里插入图片描述

List输出方式

// 第一种
println(list1)// 第二种
println(list1.mkString(","))// 以逗号的形式进行拼接// 第三种
list1.foreach(println)

案例:

importscala.collection.mutable.ListBuffer
importscala.util.Random

object list_demo {def main(args: Array[String]):Unit={val list1 = ListBuffer[Int]()// 可变的for(i <-0 until 15){
          list1+=Random.nextInt(100)}
      println(list1)//     println(list1.mkString("."))//     list1.foreach(println)val list2 = list1.toList // 不可变
      println(list2)
      println(list1.length)
      println(list1.contains(38))// 集合中是否包含元素38
      println(list1.indexOf(22))// 查找集合中元素22的下标val list3 = list1.filter(_%2!=0)
      println(list3)val list4 = list1.sortBy(x=>x)// 排序
      println(list4)
      println(list1.take(4))// 获取前四个元素
      println(list1.sum)// 对集合中的元素进行求和
      println(list1.max)// 取出集合中的最大值
      println(list1.min)// 取出集合中的最小值
      println(list1.mkString("_"))}}

Map集合

在这里插入图片描述

案例

object map_demo {def main(args: Array[String]):Unit={val map =new mutable.HashMap[String,Int]()
      map.put("tom",86)
      map.put("alice",99)
      map.put("frank",96)
      map.put("rose",97)
      println(map.size)
      println(map.contains("tom"))
      println(map.get("tom").get)
      println(map.get("rose").getOrElse(0))// 如果没有该元素,则返回默认值0
      map.remove("tom")
      println(map)for(i <- map.keys){
          println(s"${i}的成绩是:${map.get(i).get}分!")}}}

使用spark读取数据库

importorg.apache.spark.sql.SparkSession

object demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("mydemo1").master("local").getOrCreate()val dataframe=spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3307/gold").option("user","root").option("password","123456").option("dbtable","user_info").load()
      dataframe.printSchema()
      dataframe.show(3)
      spark.close()}}

RDD

RDD(弹性分布式数据集) 简介

在这里插入图片描述

RDD五大特征(RDD的算子分为转换算子和行动算子)

1.分区列表

2.每个分区都有一个计算函数

3.依赖于其他RDD

4.(Key,Value)数据类型的RDD分区表

5.每个分区都有一个优先位置列表

spark程序的基本框架

importorg.apache.spark.sql.SparkSession

object adult_demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("adult_demo").master("local").getOrCreate()val sc=spark.sparkContext
  sc.setLogLevel("ERROR")// 设置只输出错误日志// spark数据分析相关操作
  spark.stop()}}

1.从文件系统中加载数据创建RDD

使用.textFile("路径“)获取
// 1.从文件系统中得到RDDimportorg.apache.spark.sql.SparkSession

object rdd_distinct {def main(args: Array[String]):Unit={val spark = SparkSession.builder().appName("rdd_distinct").master("local").getOrCreate()val sc = spark.sparkContext
  sc.setLogLevel("ERROR")val rdd = sc.textFile("src/main/java/demo/data/data.txt")
  rdd.foreach(println)
  spark.stop()}}
// 2.从HDFS文件系统中得到RDDimportorg.apache.spark.sql.SparkSession

object rdd_distinct {def main(args: Array[String]):Unit={val spark = SparkSession.builder().appName("rdd_distinct").master("local").getOrCreate()val sc = spark.sparkContext
  sc.setLogLevel("ERROR")val rdd = sc.textFile("hdfs:///data/data.txt")// /data/data.txt是HDFS文件系统的路径
  rdd.foreach(println)
  spark.stop()}}

转换算子

在这里插入图片描述

在这里插入图片描述

案例练习素材

案例 (对数据进行去重)

importorg.apache.spark.sql.SparkSession

object rdd_distinct {def main(args: Array[String]):Unit={val spark = SparkSession.builder().appName("rdd_distinct").master("local").getOrCreate()val sc = spark.sparkContext
  sc.setLogLevel("ERROR")val rdd = sc.textFile("src/main/java/demo/data/data.txt")
  println("原数据是:")
  rdd.foreach(println)val count1=rdd.count()val rdd1=rdd.distinct()val count2=rdd1.count()
  println("现数据是:")
  rdd1.foreach(println)
  println(s"一共删除了${count1-count2}条!")
  spark.stop()}}

在这里插入图片描述

案例练习素材

案例 (对数据进行筛选,只保留性别为男的数据)

importorg.apache.spark.sql.SparkSession

object rdd_distinct {def main(args: Array[String]):Unit={val spark = SparkSession.builder().appName("rdd_distinct").master("local").getOrCreate()val sc = spark.sparkContext
  sc.setLogLevel("ERROR")val rdd = sc.textFile("src/main/java/demo/data/data.txt")
  println("原数据是:")
  rdd.foreach(println)val rdd1=rdd.filter(x=>{val arr1=x.split(",")val sex=arr1(3)if(sex.equals("男")){true}else{false}})
  rdd1.foreach(println)
  spark.stop()}}

在这里插入图片描述

案例练习素材

案例 (年龄大于18时,在数据后添加已成年,小于时添加未成年)

importorg.apache.spark.sql.SparkSession

object adult_demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("adult_demo").master("local").getOrCreate()val sc=spark.sparkContext
      sc.setLogLevel("ERROR")val rdd=sc.textFile("src/main/java/demo/data/data.txt")val rdd1=rdd.map(x=>{val arr=x.split(",")val age=arr(2)var result=""if(age.toInt <18){
              result = arr(0)+","+arr(1)+","+arr(2)+","+arr(3)+","+arr(4)+","+"未成年"}else{
              result = arr(0)+","+arr(1)+","+arr(2)+","+arr(3)+","+arr(4)+","+"已成年"}
          result
      })
      rdd1.foreach(println)
      spark.stop()}}

在这里插入图片描述

在这里插入图片描述

词频统计素材

案例 (词频统计)

importorg.apache.spark.sql.SparkSession

object count_demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("count_demo").master("local").getOrCreate()val sc=spark.sparkContext
  sc.setLogLevel("ERROR")val rdd=sc.textFile("src/main/java/demo/data/data1.txt")//    rdd.foreach(println)val rdd1=rdd.flatMap(x=>{// 将数据以空为划分,将数据拆分为一列val word=x.split(" ")// 返回每一个元素
    word
  }).map(x=>{// 让数据以(元素,1)的形式返回(x,1)})
  println("原数据:")
  rdd1.foreach(println)
  println("---------------------------------------")val rdd2=rdd1.groupByKey().map(x=>{// 此时的x为返回的wordval word=x._1
    val num=x._2.size
    (word,num)})
  println("统计后的数据:")
  rdd2.foreach(println)
  spark.stop()}}

行动算子

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

importorg.apache.spark.sql.SparkSession

object demo5 {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("adult_demo").master("local").getOrCreate()val sc=spark.sparkContext
      sc.setLogLevel("ERROR")val rdd=sc.textFile("src/main/java/demo/data/data.txt")
      println(rdd.first())
      println(rdd.take(3))
      spark.stop()}}

在这里插入图片描述

importorg.apache.spark.sql.SparkSession

object demo5 {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("adult_demo").master("local").getOrCreate()val sc=spark.sparkContext
      sc.setLogLevel("ERROR")val rdd=sc.textFile("src/main/java/demo/data/data.txt")
      rdd.collect().foreach(println)
      spark.stop()}}

在这里插入图片描述

RDD综合应用案例素材

RDD综合应用

//  1.案例分析//  数据由(序号,姓名,性别,车牌号,身份证号码,违章原因,扣分,罚款,违章时间)组成//  要求://  1) 去除重复的数据//  2) 去除不合理的数据(包括://    1》姓名为空的数据//    2》身份证为空的数据//    3》违章数据有空的数据//  )//  3) 格式统一化处理 (包括://    1》将日期格式统一为XX年-XX月-XX日 XX时XX分,秒数不要//    2》新增一列 星期几//    3》新增一列 年份//    4》将车牌照中间的空格改为-,如 辽T-EKT75//    5》从身份证号码中提取出生日,单独形成一列,格式为 年-月-日//  )//  4) 数据分析 (包括://    1》统计2019年间各种违章类型对应的违章总数//    1》使用spark统计分析哪个年纪是最容易出现违章的,取统计数据的前三名//    1》使用spark统计各个月份中男、女违章占比//    1》统计各个年份中,男、女违章占比,要求输出:年份、男性违章次数、占比、女性违章次数、占比//  )packagedemoimportorg.apache.spark.sql.SparkSession

importjava.text.SimpleDateFormat

object trffic_demo {def main(args: Array[String]):Unit={// 得到spark上下文对象val spark=SparkSession.builder().appName("trrfic_demo").master("local").getOrCreate()val sc=spark.sparkContext
      sc.setLogLevel("ERROR")// 设置只输出错误日志// 使用spark应用读取csv文件val rdd=sc.textFile("src/main/csv/trffic.csv")//    rdd.foreach(println)// 定义一个常量用来统计数据个数val count1=rdd.count()// 对相同的数据进行去重val rdd1=rdd.distinct()// 定义一个常量用来统计去重后的数据个数val count2=rdd1.count()
      println(s"共删除${count1-count2}条重复数据数据!")
      println("---------------------------------------------------------------------")
      println("去重后的数据是:")// 打印前20条数据
      rdd1.take(20).foreach(println)//    rdd1.saveAsTextFile("trffic/trffic1")
      println(s"去重后还有${count2}条数据!")// 声明三个累加器,分别用来统计去除的含有缺失值的数据个数val name_counter=sc.longAccumulator("name")val person_counter=sc.longAccumulator("person")val trffic_counter=sc.longAccumulator("tiffic_data")// 对去重后的数据进行筛选val rdd2=rdd1.filter(x=>{// 对每一行数据,以分号进行分割val arr1=x.split(";")// 定义一个常量name用来表示姓名val name =arr1(1)// 定义一个常量person用来表示身份证号val person=arr1(4)// 定义一个常量trffic用来表示违章的原因val trffic=arr1(5)// 定义一个常量score用来表示扣的分数val score=arr1(6)// 定义一个常量money用来表示罚款金额val money=arr1(7)// 使用条件判断,当姓名为空时,姓名累加器加1if(name.equals("")){
              name_counter.add(1)false}// 当身份证号为空时,身份证号累加器加1elseif(person.equals("")){
              person_counter.add(1)false}// 当违章原因、扣分情况、罚款金额有任意一项为空时,违章累加器加1elseif(trffic.equals("")|| score.equals("")|| money.equals("")){
              trffic_counter.add(1)false}// 如果数据没有缺失值时,则正常返回值else{true}})
      println(s"进行数据清洗后的数据总数是:${rdd2.count()}")
      println(s"姓名为空的数据个数是:${name_counter.value}")
      println(s"身份证号为空的数据个数是:${person_counter.value}")
      println(s"违章数据为空的数据个数是:${trffic_counter.value}")val sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")val sdf1=new SimpleDateFormat("yyyy-MM-dd HH:mm")val weeks=Array("星期日","星期一","星期二","星期三","星期四","星期五","星期六")val rdd3=rdd2.map(x=>{val arr2=x.split(";")val date=arr2(8)// 违章的时间val car_number=arr2(3)// 车牌号val person=arr2(4)// 身份证号val trffic_date=sdf.parse(date)val week=trffic_date.getDay
          val year=trffic_date.getYear+1900val week1=weeks(week)val date1=sdf1.format(trffic_date)val car_num=car_number.replace(" ","-")val birthday=person.substring(6,14)val result=arr2(0)+";"+arr2(1)+";"+arr2(2)+";"+car_num+";"+arr2(4)+";"+arr2(5)+";"+arr2(6)+";"+arr2(7)+";"+date1+";"+week1+";"+year+";"+birthday
          result
      })
      rdd3.take(20).foreach(println)//  4) 数据分析 (包括://    1》统计2019年间各种违章类型对应的违章总数//    1》使用spark统计分析哪个年纪是最容易出现违章的,取统计数据的前三名//    1》使用spark统计各个月份中男、女违章占比//    1》统计各个年份中,男、女违章占比,要求输出:年份、男性违章次数、占比、女性违章次数、占比//  )val rdd4=rdd3.filter(x=>{val arr3=x.split(";")val year=arr3(10)if(year.equals("2019")){true}else{false}}).flatMap(x=>{val arr4=x.split(";")val reason = arr4(5)
          Seq((reason))}).map(x=>{(x,1)}).groupByKey().map(x=>{val reason=x._1
          val num=x._2.size
          (reason,num)})val rdd5=rdd3.flatMap(x=>{val arr5=x.split(";")val born=arr5(11)val age=2024-born.substring(0,4).toInt
          Seq((age))}).map(x=>{(x,1)}).groupByKey().map(x=>{val age=x._1
          val num=x._2.size
          (age,num)})val rdd6=rdd3.flatMap(x=>{val arr6=x.split(";")val month=arr6(8).substring(5,7)val sex=arr6(2)
          Seq((month,sex))}).groupByKey().map(x=>{val month=x._1
          var nan_count=0var nv_count=0for(i <- x._2){if(i.equals("男")){
                  nan_count+=1}else{
                  nv_count+=1}}val nan_rate=(nan_count/x._2.size.toDouble).formatted("%.2f")val nv_rate=(nv_count/x._2.size.toDouble).formatted("%.2f")(month,nan_rate,nv_rate)})val rdd7=rdd3.flatMap(x=>{val arr7=x.split(";")val year=arr7(10)val sex=arr7(2)
          Seq((year,sex))}).groupByKey().map(x=>{val year=x._1
          var nan_count=0var nv_count=0for(i <- x._2){if(i.equals("男")){
                  nan_count+=1}else{
                  nv_count+=1}}val nan_rate=(nan_count/x._2.size.toDouble).formatted("%.2f")val nv_rate=(nv_count/x._2.size.toDouble).formatted("%.2f")//      println(s"${year}年,男性违章次数:${nan_count},占比为:${nan_rate},女性违章次数:${nv_count},占比为:${nv_rate}")(year,nan_count,nan_rate,nv_count,nv_rate)}).sortBy(_._1)
      println("-----------------------------------")
      println("2019年间各种违章类型对应的违章总数数据如下:")
      rdd4.foreach(println)
      println("-----------------------------------")
      println("最容易出现违章的年龄前三名如下:")
      rdd5.sortBy(-_._2).take(3).foreach(println)
      println("-----------------------------------")
      println("各个月份中男、女违章占比如下:")
      rdd6.sortBy(_._1).foreach(println)
      println("-----------------------------------")
      println("各个年份中,男、女违章占比如下:")
      rdd7.foreach(println)}}

DataFrame

简介

DataFrame可以看作是分布式的Row对象的集合,
在二维表数据集的每一列都带有名称和类型,这就是Schema元信息,
这使得Spark框架可获取更多数据结构信息,
从而对在DataFrame背后的数据源以及作用于DataFrame之上数据变换进行针对性的优化,
最终达到提升计算效率。

在这里插入图片描述

读取文件

在这里插入图片描述
例如:

packagedemoimportorg.apache.spark.sql.SparkSession

object read_text_demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("read_text_demo").master("local").getOrCreate()val df=spark.read.text("src/main/java/demo/data/data2.txt")
   df.show()
   df.printSchema()
   spark.stop()}}

在这里插入图片描述
例如:

packagedemoimportorg.apache.spark.sql.SparkSession

object read_csv_demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("read_csv_demo").master("local").getOrCreate()// 以分号为分割val df=spark.read.option("delimiter",",").option("header",true).csv("src/main/java/demo/data/data2.txt")
   df.show()
   df.printSchema()
   spark.stop()}}

在这里插入图片描述

JSON文件素材

例如:

packagedemoimportorg.apache.spark.sql.SparkSession

object read_json_demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("read_json_demo").master("local").getOrCreate()val df=spark.read.option("multiline",true).json("src/main/java/demo/data/person.json")
   df.show()
   df.printSchema()
   spark.stop()}}

在这里插入图片描述

例如:

packagedemoimportorg.apache.spark.sql.SparkSession

object read_mysql_demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("read_mysql_demo").master("local").getOrCreate()val df=spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3307/gold").option("user","root").option("password","123456").option("dbtable","user_info").load()
      df.printSchema()
      df.show(10)
      spark.stop()}}

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

例如:

packagedemoimportorg.apache.spark.sql.SparkSession

object dataframe_demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("dataframe_demo").master("local").getOrCreate()val sc=spark.sparkContext
   sc.setLogLevel("ERROR")importspark.implicits._
   // 1;景中婵;女;渝B AHYB4;362426198102291976;未礼让行人;6;100;2018-12-20 12:21:48val df=spark.read.option("header",false).option("delimiter",";").csv("src/main/csv/trffic.csv").toDF("序号","姓名","性别","车牌","身份证号","违章原因","扣分","罚款金额","日期")//    df.show()
   println(df.count())val df1=df.distinct()//    df1.show(df1.count().toInt,false)
   df1.show()
   println(df1.count())val df3=df.dropDuplicates("序号")
   df3.show()val df4=df3.filter("`罚款金额`>1000")val df5=df3.where("`罚款金额`>1000")
   df4.show()
   df5.show()val df6=df.drop("日期")
   df6.show()
   df.printSchema()
   spark.stop()}}

在这里插入图片描述

在这里插入图片描述
origin_url=images%2Fimg_25.png&pos_id=img-tDPHiPL0-1733380825271)
例如:

packagedemoimportorg.apache.spark.sql.SparkSession
importorg.apache.spark.sql.functions._

object dataframe_group_demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("dataframe_group_demo").master("local").getOrCreate()//    78723;麻玉红;女;川U-WMZX5;110101195606198927;压线行驶;3;100;2015-06-01 18:50;星期一;2015;19560619val sc=spark.sparkContext
     sc.setLogLevel("ERROR")val df=spark.read.option("header",false).option("delimiter",";").csv("src/trffic/trffic1/part-00000").toDF("序号","姓名","性别","车牌","身份证号","违章原因","扣分","罚款金额","日期","星期","违章年份","出生日期").withColumn("扣分",col("扣分").cast("int")).withColumn("序号",col("序号").cast("int")).withColumn("性别",when(col("性别").equalTo("男"),"1").otherwise("0")).withColumn("日期1",lit("20241105"))
     df.show()val df1=df.groupBy("性别").count()
     df1.show()val df2=df.groupBy("违章原因").max("扣分")
     df2.show()
     df.orderBy("序号").show()}}

Spark-SQL简介

在实际情况下,开发工程师并不了解Scala语言,也不了解Spark常用API,但又非常想要使用Spark框架提供的强大的数据分析能力。Spark的开发工程师们考虑到了这个问题,利用SQL语言的语法简洁、学习门槛低以及在编程语言普及程度和流行程度高等诸多优势,从而开发了SparkSQL模块,通过Spark SQL,开发人员能够通过使用SQL语句,实现对结构化数据的处理。

Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象结构叫做DataFrame的数据模型(即带有Schema信息的RDD) , Spark SQL作为分布式SQL查询引擎,让用户可以通过SQL、
DataFrames API和Datasets API三种方式实现对结构化数据的处理。

Spark SQL是Spark用来处理结构化数据的一个模块,它提供了2个编程抽象:DataFrame 和DataSet, 并且作为分布式SQL查询引擎的作用

Spark SQL是将SQL转换成RDD,然后提交到集群执行,执行效率非常快

Spark SQL可以看作是一个转换层,向下对接各种不同的结构化数据源,向上提供不同的数据访问方式Spark SQL要比Hive执行的速度要快,原因在于Spark SQL不用通过MapReduce来执行程序,减少了执行 的复杂性

Spark SQL可以将数据转化为RDD,大大提高了执行的效率

Spark SQL主要提供了以下三个功能:
  1. Spark SQL可从各种结构化数据源中读取数据,进行数据分析。
  2. Spark SQL包含行业标准的JDBC和ODBC连接方式,因此它不局限于在Spark程序内使用SQL语句进行查询。
  3. SparkSQL可以无缝地将SQL查询与Spark程序进行结合,它能够将结构化数据作为Spark中的分布式数据集(RDD)进行查询。

Spark SQL是建立在DataFrame基础之上的,需要先获取对应的DataFrame数据集,然后才能以SQL的方 式对DataFrame进行操纵

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

医院统计素材

Spark-SQL案例

使用Spark—SQL实现下列数据统计(将结果存储到mysql对应表中):

统计各个医院就诊总次数,并按降序排列数据

统计各个医院在每一年的看诊人数,按年份升序、就诊人数降序排列统计各个医院下男女就诊人数和占比

packagedemoimportorg.apache.spark.sql.{SaveMode, SparkSession}importorg.apache.spark.sql.functions._


object hospital_sql_demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("hospital_sql_demo").master("local").getOrCreate()
   spark.sparkContext.setLogLevel("ERROR")val df=spark.read.option("header",true).csv("src/main/csv/hospital.csv").toDF("id","name","sex","born","hospital","room","type","money","date","score")
   df.createOrReplaceTempView("tb_hospital")val data=spark.sql("""
       |select * from tb_hospital
       |""".stripMargin)
   data.show()//    统计各个医院就诊总次数,并按降序排列数据val result=spark.sql("""
       |select hospital,count(*) as people from tb_hospital
       |where hospital is not null
       |group by hospital
       |order by people desc
       |""".stripMargin)
   result.show()//    统计各个医院在每一年的看诊人数,按年份升序、就诊人数降序排列统计各个医院下男女就诊人数和占比val result1=spark.sql("""
       |select year(born) as year,hospital,count(*) as people_num from tb_hospital
       |where hospital is not null
       |group by year,hospital
       |order by year,people_num desc
       |""".stripMargin)
   result1.show()
   result1.write.mode(SaveMode.Overwrite).format("jdbc").option("url","jdbc:mysql://localhost:3307/gold?useUnicode=true&characterEncoding=utf-8").option("user","root").option("password","123456").option("dbtable","hospital_info").save()val result2=spark.sql("""
       |select hospital,sum(case when sex="男" then 1 else 0 end) as nan_count,
       |round(sum(case when sex="男" then 1 else 0 end)/count(*),2) as nan_rate,
       |sum(case when sex="女" then 1 else 0 end) as nv_count,
       |round(sum(case when sex="女" then 1 else 0 end)/count(*),2) as nv_rate from tb_hospital
       |where hospital is not null
       |group by hospital
       |""".stripMargin)
   result2.write.mode(SaveMode.Overwrite).format("jdbc").option("url","jdbc:mysql://localhost:3307/gold?useUnicode=true&characterEncoding=utf-8").option("user","root").option("password","123456").option("dbtable","hospital_rate_info").save()
   result2.show()
   spark.stop()}}

学生信息处理素材

Spark数据分析计算综合案例

具体需求如下:

加载数据集成为Dataframe,并将第一行标题作为DataFrame的列名 对数据集进行数据清洗,具体清洗操作如下:

a)剔除重复的行数,并记录去除的数量

b)清理掉专业为空,NULL的数据,记录总行数

c)清理掉生日在1980年1月1日以前的数据,记录总行数
d)清洗掉性别是“未知”的数据,并记录去除的行数

e)查看文件的总行数和清洗的总行数

将清洗后的数据打印top10查看效果

具体需求如下:

在清洗后的数据基础之上,对数据进行格式化操作,具体要求如下:

将生日列和入学日期转为标准日期格式yyyy—MM—dd

新增一列年龄,且为整数类型新增一列入学年份在最后
将清洗后的数据打印top10查看效果

数据分析和计算具体需求如下:

在格式化后的数据基础之上,对数据进行计算,具体要求如下:

使用spark统计各个学校的男女比率,输出格式如下,如:学校、男比率、女比率使用spark统计各个专业的平均年龄,按年龄降序排列

使用spark统计各个年、月出生的学生数量,按年、月均升序打印统计各个学校的总人数,按人数降序排列

packagedemoimportorg.apache.spark.sql.SparkSession

object spark_demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("spark_demo").master("local").getOrCreate()
   spark.sparkContext.setLogLevel("ERROR")val df=spark.read.option("header",true).csv("src/main/csv/students.csv")//    具体需求如下://    加载数据集成为Dataframe,并将第一行标题作为DataFrame的列名 对数据集进行数据清洗,具体清洗操作如下://    a)剔除重复的行数,并记录去除的数量//    b)清理掉专业为空,NULL的数据,记录总行数//    c)清理掉生日在1980年1月1日以前的数据,记录总行数//    d)清洗掉性别是“未知”的数据,并记录去除的行数//    e)查看文件的总行数和清洗的总行数//    将清洗后的数据打印top10查看效果val count=df.count()val df3=df.distinct()val count1=df3.count()val df1=df3.filter("s_specialty!='NULL'")val count2=df1.count()val df2=df1.filter("s_birthday>'1980-1-1'")val count3=df2.count()val df4=df2.filter("s_sex!='未知'")val count4=df4.count()
   df2.show()
   println("----------------------------------------------")
   println(s"清除了重复的${count-count1}条数据!")
   println(s"清理掉专业为空,NULL的${count1-count2}条数据!")
   println(s"清理掉生日在1980年1月1日以前的${count2-count3}条数据!")
   println(s"清洗掉性别是“未知”的${count3-count4}条数据!")
   println(s"原文件的总行数有${count}条!")
   println(s"清洗的总行数有${count-count4}条!")
   df4.show(10)
   df4.coalesce(1).write.option("header",true).csv("src/main/java/demo/data/student.csv")
   spark.stop()}}
packagedemoimportorg.apache.spark.sql.SparkSession
importorg.apache.spark.sql.functions._

object spark1_demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("spark1_demo").master("local").getOrCreate()
   spark.sparkContext.setLogLevel("ERROR")val df=spark.read.csv("src/main/java/demo/data/student.csv/part-00000-33e46848-61a8-4661-8840-6f00c7453156-c000.csv").toDF("s_id","s_name","s_sex","s_birthday","s_class","s_specialty","s_school","s_register_time")//    df.show()//    具体需求如下://    在清洗后的数据基础之上,对数据进行格式化操作,具体要求如下://    将生日列和入学日期转为标准日期格式yyyy—MM—dd//    新增一列年龄,且为整数类型新增一列入学年份在最后//    将清洗后的数据打印top10查看效果val result=df.withColumn("s_birthday",date_format(col("s_birthday"),"yyyy-MM-dd")).withColumn("s_register_time",date_format(col("s_register_time"),"yyyy-MM-dd")).withColumn("s_age",year(current_date())-year(col("s_birthday"))).withColumn("s_study_year",year(col("s_register_time")))
   result.write.option("header",true).csv("src/main/java/demo/data/student1.csv")
   result.show(10)
   spark.stop()}}
packagedemoimportorg.apache.spark.sql.SparkSession

object spark2_demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("spark2_demo").master("local").getOrCreate()
   spark.sparkContext.setLogLevel("ERROR")val df=spark.read.option("header",true).csv("src/main/java/demo/data/student1.csv/part-00000-ad09c4e7-f319-4099-bc71-100036c6d5f6-c000.csv")
   df.createTempView("tb_student")val result=spark.sql("""
       |select s_school,round(sum(case when s_sex="男" then 1 else 0 end)/count(*),2) as nan_rate,
       |round(sum(case when s_sex="女" then 1 else 0 end)/count(*),2) as nv_rate from tb_student
       |group by s_school
       |""".stripMargin)
   result.write.format("jdbc").option("url","jdbc:mysql://localhost:3307/studentdb?useUnicode=true&characterEncoding=utf-8").option("user","root").option("password","123456").option("dbtable","tb_age_rate").save()val result1=spark.sql("""
       |select s_specialty,round(avg(s_age),2) as avg_age from tb_student
       |group by s_specialty
       |order by avg_age
       |""".stripMargin)
   result1.write.format("jdbc").option("url","jdbc:mysql://localhost:3307/studentdb?useUnicode=true&characterEncoding=utf-8").option("user","root").option("password","123456").option("dbtable","tb_specialty").save()val result2=spark.sql("""
       |select year(s_birthday) as year,month(s_birthday) as month,count(*) as people from tb_student
       |group by year,month
       |order by year,month
       |""".stripMargin)
   result2.write.format("jdbc").option("url","jdbc:mysql://localhost:3307/studentdb?useUnicode=true&characterEncoding=utf-8").option("user","root").option("password","123456").option("dbtable","tb_birthday_infos").save()val result3=spark.sql("""
       |select s_school,count(*) as people from tb_student
       |group by s_school
       |order by people desc
       |""".stripMargin)
   result3.write.format("jdbc").option("url","jdbc:mysql://localhost:3307/studentdb?useUnicode=true&characterEncoding=utf-8").option("user","root").option("password","123456").option("dbtable","tb_school_nums").save()
   result.show()
   result1.show()
   result2.show()
   result3.show()//    df.show()}}
标签: spark scala 大数据

本文转载自: https://blog.csdn.net/2303_81781665/article/details/144266653
版权归原作者 。。。d(ŐдŐ๑) 所有, 如有侵权,请联系我们删除。

“spark学习”的评论:

还没有评论