Spark
注意:所有的案例,我使用的端口号是3307,因为我安装了两个版本的mysql,一般mysql数据库的端口是3306
scala特性
1.面向对象特性
2.函数式编程
3.静态类型
4.扩展性
5.并发性
6.兼容java类库
scala入口函数
1.object代表所有的静态域,类似java中的static
2.def是定义函数(方法)的关键字
3.main是主方法的方法名
4.args代表参数名
5.Array[String]代表参数是字符串类型数组
6.Unit代表该方法无返回值
object demo3 {def main(args: Array[String]):Unit={ println("Hello world!")}}
条件语句
语法格式
// 单条件if(条件){ 满足条件的输出 }else{ 不满足条件的输出 }
// 多条件if(条件一){ 满足条件一的输出 }elseif(条件二){ 满足条件二的输出 }...else{ 不满足条件的输出 }
案例
object demo3 {def main(args: Array[String]):Unit={var num3 =77var num4 =66 println("num3是:"+num3) println("num4是:"+num4)val temp = num3 num3 = num4 num4 = temp println("num3现在是:"+num3) println("num4现在是:"+num4) println("________________________________________")val number =Random.nextInt(100) println(s"你的成绩是:${number}")if(number<60){ println("很可惜,你没有及格!")}elseif(number>=60| number <90){ println("你的成绩及格了!")}else{ println("恭喜你的成绩是优秀!")}}}
循环
for循环
for循环语法格式
for(值 <- 起始值 to 终止值){ 循环体 }
冒泡排序
importscala.util.Random object maopao {def main(args: Array[String]):Unit={val arr1 =new Array[Int](10)// 定义一个空的数组// 循环10次,将获取的随机数放入到arr1数组内for(n <-0 until 10){// 生成100以内的随机数 arr1(n)= Random.nextInt(100)}// 冒泡排序// 外层循环可以减1,内层循环可以减i减1for(i <-0 until 9){for(j <-0 until 10-i -1){if(arr1(j)< arr1(j+1)){val temp = arr1(j) arr1(j)= arr1(j+1) arr1(j+1)= temp }}} println(arr1.mkString(","))}}
案例
object for_demo {def main(args: Array[String]):Unit={for(i <-1 to 100){ println(s"这是第${i}句我爱你!")}var sum =0for(n <-1 to 100){ sum = sum + n } println(s"1-100的和是:${sum}")}}
类型转换
对类型进行转化使用(.to目标类型)
val num =12.5555val num1 = num.toInt println(num1)val str = num.toString println(str)
元组
元组可以通过(元组名._1)来获取元组中的第一个元素
案例
object tuple {def jisuan(num1:Int,num2:Int,fuhao:String):Int={var result =0if(fuhao.equals("+")){ result = num1+num2 }elseif(fuhao.equals("-")){ result = num1-num2 }elseif(fuhao.equals("*")){ result = num1*num2 }else{ result = num1/num2 } result }def main(args: Array[String]):Unit={val tp =("tom",19,"今天你吃饭了吗?",13.3333) println(s"元组中的第一个元素是:${tp._1}") println(s"${tp._1}今年${tp._2}了!")val num1 =65val num2 =88 println(s"${num1}与${num2}做加法运算的结果是:${jisuan(num1,num2,"+")}") println(s"${num1}与${num2}做减法运算的结果是:${jisuan(num1,num2,"-")}") println(s"${num1}与${num2}做乘法运算的结果是:${jisuan(num1,num2,"*")}") println(s"${num1}与${num2}做除法运算的结果是:${jisuan(num1,num2,"/")}")}}
List集合
List输出方式
// 第一种 println(list1)// 第二种 println(list1.mkString(","))// 以逗号的形式进行拼接// 第三种 list1.foreach(println)
案例:
importscala.collection.mutable.ListBuffer importscala.util.Random object list_demo {def main(args: Array[String]):Unit={val list1 = ListBuffer[Int]()// 可变的for(i <-0 until 15){ list1+=Random.nextInt(100)} println(list1)// println(list1.mkString("."))// list1.foreach(println)val list2 = list1.toList // 不可变 println(list2) println(list1.length) println(list1.contains(38))// 集合中是否包含元素38 println(list1.indexOf(22))// 查找集合中元素22的下标val list3 = list1.filter(_%2!=0) println(list3)val list4 = list1.sortBy(x=>x)// 排序 println(list4) println(list1.take(4))// 获取前四个元素 println(list1.sum)// 对集合中的元素进行求和 println(list1.max)// 取出集合中的最大值 println(list1.min)// 取出集合中的最小值 println(list1.mkString("_"))}}
Map集合
案例
object map_demo {def main(args: Array[String]):Unit={val map =new mutable.HashMap[String,Int]() map.put("tom",86) map.put("alice",99) map.put("frank",96) map.put("rose",97) println(map.size) println(map.contains("tom")) println(map.get("tom").get) println(map.get("rose").getOrElse(0))// 如果没有该元素,则返回默认值0 map.remove("tom") println(map)for(i <- map.keys){ println(s"${i}的成绩是:${map.get(i).get}分!")}}}
使用spark读取数据库
importorg.apache.spark.sql.SparkSession object demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("mydemo1").master("local").getOrCreate()val dataframe=spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3307/gold").option("user","root").option("password","123456").option("dbtable","user_info").load() dataframe.printSchema() dataframe.show(3) spark.close()}}
RDD
RDD(弹性分布式数据集) 简介
RDD五大特征(RDD的算子分为转换算子和行动算子)
1.分区列表
2.每个分区都有一个计算函数
3.依赖于其他RDD
4.(Key,Value)数据类型的RDD分区表
5.每个分区都有一个优先位置列表
spark程序的基本框架
importorg.apache.spark.sql.SparkSession object adult_demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("adult_demo").master("local").getOrCreate()val sc=spark.sparkContext sc.setLogLevel("ERROR")// 设置只输出错误日志// spark数据分析相关操作 spark.stop()}}
1.从文件系统中加载数据创建RDD
使用.textFile("路径“)获取
// 1.从文件系统中得到RDDimportorg.apache.spark.sql.SparkSession object rdd_distinct {def main(args: Array[String]):Unit={val spark = SparkSession.builder().appName("rdd_distinct").master("local").getOrCreate()val sc = spark.sparkContext sc.setLogLevel("ERROR")val rdd = sc.textFile("src/main/java/demo/data/data.txt") rdd.foreach(println) spark.stop()}}
// 2.从HDFS文件系统中得到RDDimportorg.apache.spark.sql.SparkSession object rdd_distinct {def main(args: Array[String]):Unit={val spark = SparkSession.builder().appName("rdd_distinct").master("local").getOrCreate()val sc = spark.sparkContext sc.setLogLevel("ERROR")val rdd = sc.textFile("hdfs:///data/data.txt")// /data/data.txt是HDFS文件系统的路径 rdd.foreach(println) spark.stop()}}
转换算子
案例练习素材
案例 (对数据进行去重)
importorg.apache.spark.sql.SparkSession object rdd_distinct {def main(args: Array[String]):Unit={val spark = SparkSession.builder().appName("rdd_distinct").master("local").getOrCreate()val sc = spark.sparkContext sc.setLogLevel("ERROR")val rdd = sc.textFile("src/main/java/demo/data/data.txt") println("原数据是:") rdd.foreach(println)val count1=rdd.count()val rdd1=rdd.distinct()val count2=rdd1.count() println("现数据是:") rdd1.foreach(println) println(s"一共删除了${count1-count2}条!") spark.stop()}}
案例练习素材
案例 (对数据进行筛选,只保留性别为男的数据)
importorg.apache.spark.sql.SparkSession object rdd_distinct {def main(args: Array[String]):Unit={val spark = SparkSession.builder().appName("rdd_distinct").master("local").getOrCreate()val sc = spark.sparkContext sc.setLogLevel("ERROR")val rdd = sc.textFile("src/main/java/demo/data/data.txt") println("原数据是:") rdd.foreach(println)val rdd1=rdd.filter(x=>{val arr1=x.split(",")val sex=arr1(3)if(sex.equals("男")){true}else{false}}) rdd1.foreach(println) spark.stop()}}
案例练习素材
案例 (年龄大于18时,在数据后添加已成年,小于时添加未成年)
importorg.apache.spark.sql.SparkSession object adult_demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("adult_demo").master("local").getOrCreate()val sc=spark.sparkContext sc.setLogLevel("ERROR")val rdd=sc.textFile("src/main/java/demo/data/data.txt")val rdd1=rdd.map(x=>{val arr=x.split(",")val age=arr(2)var result=""if(age.toInt <18){ result = arr(0)+","+arr(1)+","+arr(2)+","+arr(3)+","+arr(4)+","+"未成年"}else{ result = arr(0)+","+arr(1)+","+arr(2)+","+arr(3)+","+arr(4)+","+"已成年"} result }) rdd1.foreach(println) spark.stop()}}
词频统计素材
案例 (词频统计)
importorg.apache.spark.sql.SparkSession object count_demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("count_demo").master("local").getOrCreate()val sc=spark.sparkContext sc.setLogLevel("ERROR")val rdd=sc.textFile("src/main/java/demo/data/data1.txt")// rdd.foreach(println)val rdd1=rdd.flatMap(x=>{// 将数据以空为划分,将数据拆分为一列val word=x.split(" ")// 返回每一个元素 word }).map(x=>{// 让数据以(元素,1)的形式返回(x,1)}) println("原数据:") rdd1.foreach(println) println("---------------------------------------")val rdd2=rdd1.groupByKey().map(x=>{// 此时的x为返回的wordval word=x._1 val num=x._2.size (word,num)}) println("统计后的数据:") rdd2.foreach(println) spark.stop()}}
行动算子
importorg.apache.spark.sql.SparkSession object demo5 {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("adult_demo").master("local").getOrCreate()val sc=spark.sparkContext sc.setLogLevel("ERROR")val rdd=sc.textFile("src/main/java/demo/data/data.txt") println(rdd.first()) println(rdd.take(3)) spark.stop()}}
importorg.apache.spark.sql.SparkSession object demo5 {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("adult_demo").master("local").getOrCreate()val sc=spark.sparkContext sc.setLogLevel("ERROR")val rdd=sc.textFile("src/main/java/demo/data/data.txt") rdd.collect().foreach(println) spark.stop()}}
RDD综合应用案例素材
RDD综合应用
// 1.案例分析// 数据由(序号,姓名,性别,车牌号,身份证号码,违章原因,扣分,罚款,违章时间)组成// 要求:// 1) 去除重复的数据// 2) 去除不合理的数据(包括:// 1》姓名为空的数据// 2》身份证为空的数据// 3》违章数据有空的数据// )// 3) 格式统一化处理 (包括:// 1》将日期格式统一为XX年-XX月-XX日 XX时XX分,秒数不要// 2》新增一列 星期几// 3》新增一列 年份// 4》将车牌照中间的空格改为-,如 辽T-EKT75// 5》从身份证号码中提取出生日,单独形成一列,格式为 年-月-日// )// 4) 数据分析 (包括:// 1》统计2019年间各种违章类型对应的违章总数// 1》使用spark统计分析哪个年纪是最容易出现违章的,取统计数据的前三名// 1》使用spark统计各个月份中男、女违章占比// 1》统计各个年份中,男、女违章占比,要求输出:年份、男性违章次数、占比、女性违章次数、占比// )packagedemoimportorg.apache.spark.sql.SparkSession importjava.text.SimpleDateFormat object trffic_demo {def main(args: Array[String]):Unit={// 得到spark上下文对象val spark=SparkSession.builder().appName("trrfic_demo").master("local").getOrCreate()val sc=spark.sparkContext sc.setLogLevel("ERROR")// 设置只输出错误日志// 使用spark应用读取csv文件val rdd=sc.textFile("src/main/csv/trffic.csv")// rdd.foreach(println)// 定义一个常量用来统计数据个数val count1=rdd.count()// 对相同的数据进行去重val rdd1=rdd.distinct()// 定义一个常量用来统计去重后的数据个数val count2=rdd1.count() println(s"共删除${count1-count2}条重复数据数据!") println("---------------------------------------------------------------------") println("去重后的数据是:")// 打印前20条数据 rdd1.take(20).foreach(println)// rdd1.saveAsTextFile("trffic/trffic1") println(s"去重后还有${count2}条数据!")// 声明三个累加器,分别用来统计去除的含有缺失值的数据个数val name_counter=sc.longAccumulator("name")val person_counter=sc.longAccumulator("person")val trffic_counter=sc.longAccumulator("tiffic_data")// 对去重后的数据进行筛选val rdd2=rdd1.filter(x=>{// 对每一行数据,以分号进行分割val arr1=x.split(";")// 定义一个常量name用来表示姓名val name =arr1(1)// 定义一个常量person用来表示身份证号val person=arr1(4)// 定义一个常量trffic用来表示违章的原因val trffic=arr1(5)// 定义一个常量score用来表示扣的分数val score=arr1(6)// 定义一个常量money用来表示罚款金额val money=arr1(7)// 使用条件判断,当姓名为空时,姓名累加器加1if(name.equals("")){ name_counter.add(1)false}// 当身份证号为空时,身份证号累加器加1elseif(person.equals("")){ person_counter.add(1)false}// 当违章原因、扣分情况、罚款金额有任意一项为空时,违章累加器加1elseif(trffic.equals("")|| score.equals("")|| money.equals("")){ trffic_counter.add(1)false}// 如果数据没有缺失值时,则正常返回值else{true}}) println(s"进行数据清洗后的数据总数是:${rdd2.count()}") println(s"姓名为空的数据个数是:${name_counter.value}") println(s"身份证号为空的数据个数是:${person_counter.value}") println(s"违章数据为空的数据个数是:${trffic_counter.value}")val sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")val sdf1=new SimpleDateFormat("yyyy-MM-dd HH:mm")val weeks=Array("星期日","星期一","星期二","星期三","星期四","星期五","星期六")val rdd3=rdd2.map(x=>{val arr2=x.split(";")val date=arr2(8)// 违章的时间val car_number=arr2(3)// 车牌号val person=arr2(4)// 身份证号val trffic_date=sdf.parse(date)val week=trffic_date.getDay val year=trffic_date.getYear+1900val week1=weeks(week)val date1=sdf1.format(trffic_date)val car_num=car_number.replace(" ","-")val birthday=person.substring(6,14)val result=arr2(0)+";"+arr2(1)+";"+arr2(2)+";"+car_num+";"+arr2(4)+";"+arr2(5)+";"+arr2(6)+";"+arr2(7)+";"+date1+";"+week1+";"+year+";"+birthday result }) rdd3.take(20).foreach(println)// 4) 数据分析 (包括:// 1》统计2019年间各种违章类型对应的违章总数// 1》使用spark统计分析哪个年纪是最容易出现违章的,取统计数据的前三名// 1》使用spark统计各个月份中男、女违章占比// 1》统计各个年份中,男、女违章占比,要求输出:年份、男性违章次数、占比、女性违章次数、占比// )val rdd4=rdd3.filter(x=>{val arr3=x.split(";")val year=arr3(10)if(year.equals("2019")){true}else{false}}).flatMap(x=>{val arr4=x.split(";")val reason = arr4(5) Seq((reason))}).map(x=>{(x,1)}).groupByKey().map(x=>{val reason=x._1 val num=x._2.size (reason,num)})val rdd5=rdd3.flatMap(x=>{val arr5=x.split(";")val born=arr5(11)val age=2024-born.substring(0,4).toInt Seq((age))}).map(x=>{(x,1)}).groupByKey().map(x=>{val age=x._1 val num=x._2.size (age,num)})val rdd6=rdd3.flatMap(x=>{val arr6=x.split(";")val month=arr6(8).substring(5,7)val sex=arr6(2) Seq((month,sex))}).groupByKey().map(x=>{val month=x._1 var nan_count=0var nv_count=0for(i <- x._2){if(i.equals("男")){ nan_count+=1}else{ nv_count+=1}}val nan_rate=(nan_count/x._2.size.toDouble).formatted("%.2f")val nv_rate=(nv_count/x._2.size.toDouble).formatted("%.2f")(month,nan_rate,nv_rate)})val rdd7=rdd3.flatMap(x=>{val arr7=x.split(";")val year=arr7(10)val sex=arr7(2) Seq((year,sex))}).groupByKey().map(x=>{val year=x._1 var nan_count=0var nv_count=0for(i <- x._2){if(i.equals("男")){ nan_count+=1}else{ nv_count+=1}}val nan_rate=(nan_count/x._2.size.toDouble).formatted("%.2f")val nv_rate=(nv_count/x._2.size.toDouble).formatted("%.2f")// println(s"${year}年,男性违章次数:${nan_count},占比为:${nan_rate},女性违章次数:${nv_count},占比为:${nv_rate}")(year,nan_count,nan_rate,nv_count,nv_rate)}).sortBy(_._1) println("-----------------------------------") println("2019年间各种违章类型对应的违章总数数据如下:") rdd4.foreach(println) println("-----------------------------------") println("最容易出现违章的年龄前三名如下:") rdd5.sortBy(-_._2).take(3).foreach(println) println("-----------------------------------") println("各个月份中男、女违章占比如下:") rdd6.sortBy(_._1).foreach(println) println("-----------------------------------") println("各个年份中,男、女违章占比如下:") rdd7.foreach(println)}}
DataFrame
简介
DataFrame可以看作是分布式的Row对象的集合,
在二维表数据集的每一列都带有名称和类型,这就是Schema元信息,
这使得Spark框架可获取更多数据结构信息,
从而对在DataFrame背后的数据源以及作用于DataFrame之上数据变换进行针对性的优化,
最终达到提升计算效率。
读取文件
例如:packagedemoimportorg.apache.spark.sql.SparkSession object read_text_demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("read_text_demo").master("local").getOrCreate()val df=spark.read.text("src/main/java/demo/data/data2.txt") df.show() df.printSchema() spark.stop()}}
例如:packagedemoimportorg.apache.spark.sql.SparkSession object read_csv_demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("read_csv_demo").master("local").getOrCreate()// 以分号为分割val df=spark.read.option("delimiter",",").option("header",true).csv("src/main/java/demo/data/data2.txt") df.show() df.printSchema() spark.stop()}}
JSON文件素材
例如:
packagedemoimportorg.apache.spark.sql.SparkSession object read_json_demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("read_json_demo").master("local").getOrCreate()val df=spark.read.option("multiline",true).json("src/main/java/demo/data/person.json") df.show() df.printSchema() spark.stop()}}
例如:
packagedemoimportorg.apache.spark.sql.SparkSession object read_mysql_demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("read_mysql_demo").master("local").getOrCreate()val df=spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3307/gold").option("user","root").option("password","123456").option("dbtable","user_info").load() df.printSchema() df.show(10) spark.stop()}}
例如:
packagedemoimportorg.apache.spark.sql.SparkSession object dataframe_demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("dataframe_demo").master("local").getOrCreate()val sc=spark.sparkContext sc.setLogLevel("ERROR")importspark.implicits._ // 1;景中婵;女;渝B AHYB4;362426198102291976;未礼让行人;6;100;2018-12-20 12:21:48val df=spark.read.option("header",false).option("delimiter",";").csv("src/main/csv/trffic.csv").toDF("序号","姓名","性别","车牌","身份证号","违章原因","扣分","罚款金额","日期")// df.show() println(df.count())val df1=df.distinct()// df1.show(df1.count().toInt,false) df1.show() println(df1.count())val df3=df.dropDuplicates("序号") df3.show()val df4=df3.filter("`罚款金额`>1000")val df5=df3.where("`罚款金额`>1000") df4.show() df5.show()val df6=df.drop("日期") df6.show() df.printSchema() spark.stop()}}
origin_url=images%2Fimg_25.png&pos_id=img-tDPHiPL0-1733380825271)
例如:packagedemoimportorg.apache.spark.sql.SparkSession importorg.apache.spark.sql.functions._ object dataframe_group_demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("dataframe_group_demo").master("local").getOrCreate()// 78723;麻玉红;女;川U-WMZX5;110101195606198927;压线行驶;3;100;2015-06-01 18:50;星期一;2015;19560619val sc=spark.sparkContext sc.setLogLevel("ERROR")val df=spark.read.option("header",false).option("delimiter",";").csv("src/trffic/trffic1/part-00000").toDF("序号","姓名","性别","车牌","身份证号","违章原因","扣分","罚款金额","日期","星期","违章年份","出生日期").withColumn("扣分",col("扣分").cast("int")).withColumn("序号",col("序号").cast("int")).withColumn("性别",when(col("性别").equalTo("男"),"1").otherwise("0")).withColumn("日期1",lit("20241105")) df.show()val df1=df.groupBy("性别").count() df1.show()val df2=df.groupBy("违章原因").max("扣分") df2.show() df.orderBy("序号").show()}}
Spark-SQL简介
在实际情况下,开发工程师并不了解Scala语言,也不了解Spark常用API,但又非常想要使用Spark框架提供的强大的数据分析能力。Spark的开发工程师们考虑到了这个问题,利用SQL语言的语法简洁、学习门槛低以及在编程语言普及程度和流行程度高等诸多优势,从而开发了SparkSQL模块,通过Spark SQL,开发人员能够通过使用SQL语句,实现对结构化数据的处理。
Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象结构叫做DataFrame的数据模型(即带有Schema信息的RDD) , Spark SQL作为分布式SQL查询引擎,让用户可以通过SQL、
DataFrames API和Datasets API三种方式实现对结构化数据的处理。Spark SQL是Spark用来处理结构化数据的一个模块,它提供了2个编程抽象:DataFrame 和DataSet, 并且作为分布式SQL查询引擎的作用
Spark SQL是将SQL转换成RDD,然后提交到集群执行,执行效率非常快
Spark SQL可以看作是一个转换层,向下对接各种不同的结构化数据源,向上提供不同的数据访问方式Spark SQL要比Hive执行的速度要快,原因在于Spark SQL不用通过MapReduce来执行程序,减少了执行 的复杂性
Spark SQL可以将数据转化为RDD,大大提高了执行的效率
Spark SQL主要提供了以下三个功能:
- Spark SQL可从各种结构化数据源中读取数据,进行数据分析。
- Spark SQL包含行业标准的JDBC和ODBC连接方式,因此它不局限于在Spark程序内使用SQL语句进行查询。
- SparkSQL可以无缝地将SQL查询与Spark程序进行结合,它能够将结构化数据作为Spark中的分布式数据集(RDD)进行查询。
Spark SQL是建立在DataFrame基础之上的,需要先获取对应的DataFrame数据集,然后才能以SQL的方 式对DataFrame进行操纵
医院统计素材
Spark-SQL案例
使用Spark—SQL实现下列数据统计(将结果存储到mysql对应表中):
统计各个医院就诊总次数,并按降序排列数据
统计各个医院在每一年的看诊人数,按年份升序、就诊人数降序排列统计各个医院下男女就诊人数和占比
packagedemoimportorg.apache.spark.sql.{SaveMode, SparkSession}importorg.apache.spark.sql.functions._ object hospital_sql_demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("hospital_sql_demo").master("local").getOrCreate() spark.sparkContext.setLogLevel("ERROR")val df=spark.read.option("header",true).csv("src/main/csv/hospital.csv").toDF("id","name","sex","born","hospital","room","type","money","date","score") df.createOrReplaceTempView("tb_hospital")val data=spark.sql(""" |select * from tb_hospital |""".stripMargin) data.show()// 统计各个医院就诊总次数,并按降序排列数据val result=spark.sql(""" |select hospital,count(*) as people from tb_hospital |where hospital is not null |group by hospital |order by people desc |""".stripMargin) result.show()// 统计各个医院在每一年的看诊人数,按年份升序、就诊人数降序排列统计各个医院下男女就诊人数和占比val result1=spark.sql(""" |select year(born) as year,hospital,count(*) as people_num from tb_hospital |where hospital is not null |group by year,hospital |order by year,people_num desc |""".stripMargin) result1.show() result1.write.mode(SaveMode.Overwrite).format("jdbc").option("url","jdbc:mysql://localhost:3307/gold?useUnicode=true&characterEncoding=utf-8").option("user","root").option("password","123456").option("dbtable","hospital_info").save()val result2=spark.sql(""" |select hospital,sum(case when sex="男" then 1 else 0 end) as nan_count, |round(sum(case when sex="男" then 1 else 0 end)/count(*),2) as nan_rate, |sum(case when sex="女" then 1 else 0 end) as nv_count, |round(sum(case when sex="女" then 1 else 0 end)/count(*),2) as nv_rate from tb_hospital |where hospital is not null |group by hospital |""".stripMargin) result2.write.mode(SaveMode.Overwrite).format("jdbc").option("url","jdbc:mysql://localhost:3307/gold?useUnicode=true&characterEncoding=utf-8").option("user","root").option("password","123456").option("dbtable","hospital_rate_info").save() result2.show() spark.stop()}}
学生信息处理素材
Spark数据分析计算综合案例
具体需求如下:
加载数据集成为Dataframe,并将第一行标题作为DataFrame的列名 对数据集进行数据清洗,具体清洗操作如下:
a)剔除重复的行数,并记录去除的数量
b)清理掉专业为空,NULL的数据,记录总行数
c)清理掉生日在1980年1月1日以前的数据,记录总行数
d)清洗掉性别是“未知”的数据,并记录去除的行数e)查看文件的总行数和清洗的总行数
将清洗后的数据打印top10查看效果
具体需求如下:
在清洗后的数据基础之上,对数据进行格式化操作,具体要求如下:
将生日列和入学日期转为标准日期格式yyyy—MM—dd
新增一列年龄,且为整数类型新增一列入学年份在最后
将清洗后的数据打印top10查看效果数据分析和计算具体需求如下:
在格式化后的数据基础之上,对数据进行计算,具体要求如下:
使用spark统计各个学校的男女比率,输出格式如下,如:学校、男比率、女比率使用spark统计各个专业的平均年龄,按年龄降序排列
使用spark统计各个年、月出生的学生数量,按年、月均升序打印统计各个学校的总人数,按人数降序排列
packagedemoimportorg.apache.spark.sql.SparkSession object spark_demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("spark_demo").master("local").getOrCreate() spark.sparkContext.setLogLevel("ERROR")val df=spark.read.option("header",true).csv("src/main/csv/students.csv")// 具体需求如下:// 加载数据集成为Dataframe,并将第一行标题作为DataFrame的列名 对数据集进行数据清洗,具体清洗操作如下:// a)剔除重复的行数,并记录去除的数量// b)清理掉专业为空,NULL的数据,记录总行数// c)清理掉生日在1980年1月1日以前的数据,记录总行数// d)清洗掉性别是“未知”的数据,并记录去除的行数// e)查看文件的总行数和清洗的总行数// 将清洗后的数据打印top10查看效果val count=df.count()val df3=df.distinct()val count1=df3.count()val df1=df3.filter("s_specialty!='NULL'")val count2=df1.count()val df2=df1.filter("s_birthday>'1980-1-1'")val count3=df2.count()val df4=df2.filter("s_sex!='未知'")val count4=df4.count() df2.show() println("----------------------------------------------") println(s"清除了重复的${count-count1}条数据!") println(s"清理掉专业为空,NULL的${count1-count2}条数据!") println(s"清理掉生日在1980年1月1日以前的${count2-count3}条数据!") println(s"清洗掉性别是“未知”的${count3-count4}条数据!") println(s"原文件的总行数有${count}条!") println(s"清洗的总行数有${count-count4}条!") df4.show(10) df4.coalesce(1).write.option("header",true).csv("src/main/java/demo/data/student.csv") spark.stop()}}
packagedemoimportorg.apache.spark.sql.SparkSession importorg.apache.spark.sql.functions._ object spark1_demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("spark1_demo").master("local").getOrCreate() spark.sparkContext.setLogLevel("ERROR")val df=spark.read.csv("src/main/java/demo/data/student.csv/part-00000-33e46848-61a8-4661-8840-6f00c7453156-c000.csv").toDF("s_id","s_name","s_sex","s_birthday","s_class","s_specialty","s_school","s_register_time")// df.show()// 具体需求如下:// 在清洗后的数据基础之上,对数据进行格式化操作,具体要求如下:// 将生日列和入学日期转为标准日期格式yyyy—MM—dd// 新增一列年龄,且为整数类型新增一列入学年份在最后// 将清洗后的数据打印top10查看效果val result=df.withColumn("s_birthday",date_format(col("s_birthday"),"yyyy-MM-dd")).withColumn("s_register_time",date_format(col("s_register_time"),"yyyy-MM-dd")).withColumn("s_age",year(current_date())-year(col("s_birthday"))).withColumn("s_study_year",year(col("s_register_time"))) result.write.option("header",true).csv("src/main/java/demo/data/student1.csv") result.show(10) spark.stop()}}
packagedemoimportorg.apache.spark.sql.SparkSession object spark2_demo {def main(args: Array[String]):Unit={val spark=SparkSession.builder().appName("spark2_demo").master("local").getOrCreate() spark.sparkContext.setLogLevel("ERROR")val df=spark.read.option("header",true).csv("src/main/java/demo/data/student1.csv/part-00000-ad09c4e7-f319-4099-bc71-100036c6d5f6-c000.csv") df.createTempView("tb_student")val result=spark.sql(""" |select s_school,round(sum(case when s_sex="男" then 1 else 0 end)/count(*),2) as nan_rate, |round(sum(case when s_sex="女" then 1 else 0 end)/count(*),2) as nv_rate from tb_student |group by s_school |""".stripMargin) result.write.format("jdbc").option("url","jdbc:mysql://localhost:3307/studentdb?useUnicode=true&characterEncoding=utf-8").option("user","root").option("password","123456").option("dbtable","tb_age_rate").save()val result1=spark.sql(""" |select s_specialty,round(avg(s_age),2) as avg_age from tb_student |group by s_specialty |order by avg_age |""".stripMargin) result1.write.format("jdbc").option("url","jdbc:mysql://localhost:3307/studentdb?useUnicode=true&characterEncoding=utf-8").option("user","root").option("password","123456").option("dbtable","tb_specialty").save()val result2=spark.sql(""" |select year(s_birthday) as year,month(s_birthday) as month,count(*) as people from tb_student |group by year,month |order by year,month |""".stripMargin) result2.write.format("jdbc").option("url","jdbc:mysql://localhost:3307/studentdb?useUnicode=true&characterEncoding=utf-8").option("user","root").option("password","123456").option("dbtable","tb_birthday_infos").save()val result3=spark.sql(""" |select s_school,count(*) as people from tb_student |group by s_school |order by people desc |""".stripMargin) result3.write.format("jdbc").option("url","jdbc:mysql://localhost:3307/studentdb?useUnicode=true&characterEncoding=utf-8").option("user","root").option("password","123456").option("dbtable","tb_school_nums").save() result.show() result1.show() result2.show() result3.show()// df.show()}}
版权归原作者 。。。d(ŐдŐ๑) 所有, 如有侵权,请联系我们删除。