Spark SQL
一、Spark SQL概述
Spark SQL属于Spark计算框架的一部分,是专门负责结构化数据的处理计算框架,Spark SQL提供了两种数据抽象:DataFrame、Dataset,都是基于RDD之上的一种高级数据抽象,在RDD基础之上增加了一个schema表结构。
DataFrame是以前旧版本的数据抽象(untyped类型的数据抽象),Dataset是新版本的数据抽象(typed有类型的数据抽象),新版本当中DataFrame底层就是Dataset[Row]。
Spark SQL特点
- 易整合
- 统一的数据访问方式
- 兼容Hive
- 标准的数据库连接
二、准备Spark SQL的编程环境
1、创建Spark SQL的编程项目,scala语言支持的
2、引入编程依赖
spark-core_2.12
hadoop-hdfs
spark-sql_2.12
spark-hive_2.12
hadoop的有一个依赖jackson版本和scala2.12版本冲突了,Spark依赖中也有这个依赖,但是默认使用的是pom.xml先引入的那个依赖,把hadoop中jackson依赖排除了即可。
<projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.kang</groupId><artifactId>spark-sql-study</artifactId><version>1.0-SNAPSHOT</version><packaging>jar</packaging><name>spark-sql-study</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>3.1.4</version><exclusions><exclusion><groupId>com.fasterxml.jackson.module</groupId><artifactId>*</artifactId></exclusion><exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>*</artifactId></exclusion></exclusions></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.1.1</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.12</artifactId><version>3.1.1</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.18</version></dependency><!-- spark sql on hive--><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.12</artifactId><version>3.1.1</version></dependency></dependencies></project>
三、Spark SQL程序编程的入口
1、SQLContext:只能做SQL编程,无法操作Hive以及使用HQL操作。
2、HiveContext:专门提供用来操作和Hive相关的编程。
3、SparkSession:全新的Spark SQL程序执行入口,把SQLContext和HiveContext功能全部整合了,SparkSession底层封装了一个SparkContext,而且SparkSession可以开启Hive的支持。
packagestudyimportorg.apache.spark.SparkConf
importorg.apache.spark.rdd.RDD
importorg.apache.spark.sql.{DataFrame, SparkSession}/**
* Spark SQL的基本案例执行
*/object Demo01 {def main(args: Array[String]):Unit={/**
* 1、创建Spark SQL的程序编程入口
*/val sparkConf:SparkConf =new SparkConf()val sc:SparkSession = SparkSession.builder().appName("test").master("local[*]").config(sparkConf).getOrCreate()importsc.implicits._
/**
* 2、创建DataFrame或者Dataset数据抽象
*/val rdd:RDD[(String,Int)]= sc.sparkContext.makeRDD(Array(("zs",20),("ls",30)))val df:DataFrame = rdd.toDF("name","age")
df.printSchema()
df.show()
sc.stop()}}
四、DataFrame的创建
- 1、使用隐式转换函数从RDD、Scala集合创建DataFrame toDF() toDF(columnName*)- 机制:如果集合或者RDD的类型不是Bean,而且再toDF没有传入任何的列名,那么Spark会默认按照列的个数给生成随机的列名,但是如果类型是一个Bean类型,那么toDF产生的随机列名就是bean的属性名。-
packagecreate.methon1importorg.apache.spark.SparkConfimportorg.apache.spark.sql.{DataFrame, SparkSession}/** * 1、通过隐式转换函数从Scala集合创建DataFrame * 如果使用隐式转换函数 那么必须引入spark定义的隐式转换函数代码 * sparksession的对象名.implicits._ */object Demo01 {def main(args: Array[String]):Unit={val sparkConf:SparkConf =new SparkConf()val ss:SparkSession = SparkSession.builder().appName("seq to df").master("local[*]").config(sparkConf).getOrCreate()//隐式转换必须导入隐式转换函数类importss.implicits._ /** * 从集合创建DataFrame * 集合一般都是T类型的 T类型如果是Scala自带类型,toDF后面需要跟列名,不跟列名也可以 * 集合必须是Seq类型的 而且必须显示的声明为Seq类型 */val array:Seq[(String,Int)]= Array(("zs",20),("ls",30))val df:DataFrame = array.toDF("name","age") df.printSchema() df.show()val array1:Seq[Student]= Array(Student("zs",21),Student("ls",25))val df1:DataFrame = array1.toDF() df1.printSchema() df1.show() ss.stop()}}
-packagecreate.methon1importorg.apache.spark.SparkConfimportorg.apache.spark.rdd.RDDimportorg.apache.spark.sql.{DataFrame, SparkSession}/** * 1、通过隐式转换函数从RDD创建DataFrame * 如果使用隐式转换函数 那么必须引入spark定义的隐式转换函数代码 * sparksession的对象名.implicits._ */object Demo02 {def main(args: Array[String]):Unit={val sparkConf:SparkConf =new SparkConf()val ss:SparkSession = SparkSession.builder().appName("seq to df").master("local[*]").config(sparkConf).getOrCreate()//隐式转换必须导入隐式转换函数类importss.implicits._ /** * 从RDD创建DataFrame */val array:Seq[(String,Int)]= Array(("zs",20),("ls",30))val rdd:RDD[(String,Int)]= ss.sparkContext.makeRDD(array)val df:DataFrame = rdd.toDF() df.printSchema() df.show()val array1:Seq[Student]= Array(Student("zs",21),Student("ls",25))val rdd1:RDD[Student]= ss.sparkContext.makeRDD(array1)val df1:DataFrame = rdd1.toDF() df1.printSchema() df1.show() ss.stop()}}
-packagecreate.methon1caseclass Student(name:String,age:Int)
- 2、通过SparkSession自带的createDataFrame函数从集合或者RDD中创建DataFrame—使用并不多-
packagecreate.methon2importorg.apache.spark.SparkConfimportorg.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}importorg.apache.spark.sql.{DataFrame, Row, SparkSession}/** * createDataFrame函数从集合中创建DataFrame */object Demo01 {def main(args: Array[String]):Unit={val sparkConf: SparkConf =new SparkConf()val ss: SparkSession = SparkSession.builder().appName("seq to df").master("local[*]").config(sparkConf).getOrCreate()/** * 1、通过Scala的seq集合创建DataFrame 列名是自动生成的 */val array:Seq[(String,Int)]= Array(("zs",20),("ls",30))val df:DataFrame = ss.createDataFrame(array) df.printSchema() df.show()val array1:Seq[Student]= Array(Student("zs",20),Student("ls",30))val df1:DataFrame = ss.createDataFrame(array1) df1.printSchema() df1.show()/** * 2、从java集合中创建DataFrame,如果是Java集合,必须传入一个BeanClass * 同时如果Java集合中存放的数据类型是Row类型,那么必须传入StructType指定row的结构 * * java集合中如果使用BeanClass构建DaraFrame,要求Java集合中存放的数据类型也必须是Bean的类型 * BeanClass必须有getter和setter方法 */val list: java.util.List[Student]= java.util.Arrays.asList(Student("ls",20),Student("zs",30))val df2 = ss.createDataFrame(list,classOf[Student]) df2.printSchema() df2.show()/** * 3、java集合的类型为row类型 */val list1: java.util.List[Row]= java.util.Arrays.asList(Row("ls",20),Row("zs",30))val df3 = ss.createDataFrame(list1,StructType(java.util.Arrays.asList(StructField("name",DataTypes.StringType),StructField("age",DataTypes.IntegerType)))) df3.printSchema() df3.show() ss.stop()}}
-packagecreate.method2importcreate.methon2.Studentimportorg.apache.spark.SparkConfimportorg.apache.spark.rdd.RDDimportorg.apache.spark.sql.types.{DataTypes, StructField, StructType}importorg.apache.spark.sql.{DataFrame, Row, SparkSession}importjava.util/** * createDataFrame函数从RDD中创建DataFrame(操作手法完全一致的) */object Demo02 {def main(args: Array[String]):Unit={val sparkConf: SparkConf =new SparkConf()val ss: SparkSession = SparkSession.builder().config(sparkConf).appName("seq to df").master("local[*]").getOrCreate()/** * 1、通过Scala的seq集合创建DataFrame 列名是自动生成的 */val array:Seq[(String,Int)]= Array(("zs",20),("ls",30))val rdd:RDD[(String,Int)]= ss.sparkContext.makeRDD(array)val df:DataFrame = ss.createDataFrame(rdd) df.printSchema() df.show()val array1: Seq[Student]= Array(Student("zs",20))val rdd1:RDD[Student]= ss.sparkContext.makeRDD(array1)val df1: DataFrame = ss.createDataFrame(rdd1,classOf[Student]) df1.printSchema() df1.show()/** * 3、java集合的类型为row类型 */val array2:Array[Row]= Array(Row("zs",20),Row("ww",30))val rdd2:RDD[Row]= ss.sparkContext.makeRDD(array2)val df3 = ss.createDataFrame(rdd2, StructType(Array(StructField("name", DataTypes.StringType), StructField("age", DataTypes.IntegerType)))) df3.printSchema() df3.show() ss.stop()}}
-packagecreate.methon2importscala.beans.BeanPropertycaseclass Student(@BeanPropertyvar name:String,@BeanPropertyvar age:Int)
- 3、从Spark SQL支持的数据源创建DataFrame(HDFS、Hive、JSON文件、CSV文件等等):使用频率最高的- 外部存储HDFS中读取数据成为DataFrame- ss.read.format(“jsonxx”).load(“path”) 不太好用- ss.read.option(key,value).option(…).csv/json(path)- 从jdbc支持的数据库创建DataFrame- ss.read.jdbc(url,table,properties)
packagecreate.methon3importorg.apache.spark.SparkConfimportorg.apache.spark.sql.{DataFrame, SparkSession}importjava.util.Properties/** * 从外部存储读取数据成为DataFrame */object Demo01 {def main(args: Array[String]):Unit={val sparkConf:SparkConf =new SparkConf()val ss:SparkSession = SparkSession.builder().appName("storage to df").master("local[*]").getOrCreate()/** * 从csv文件读取数据成为DataFrame */val df:DataFrame = ss.read.option("header","true").format("csv").load("file:///D://Desktop/Student.csv") df.printSchema() df.show()/** * 读取模式有三种: * permissive:默认的 * dropMalformed * failfast */val df1:DataFrame = ss.read.option("header","true").format("csv").option("mode","permissive").csv("file:///D://Desktop/Student.csv") df1.printSchema() df1.show()/** * 从json文件创建DataFrame * json文件中要求一个json对象独占一行 */val df2:DataFrame = ss.read.option("mode","dropMalformed").json("file:///D://Desktop/Student.json") df2.printSchema() df2.show()/** * 从普通的文本文档创建DataFrame---不太实用 */val df3 = ss.read.text("file:///D://Desktop/Student.csv") df3.printSchema() df3.show()/** * 从JDBC可以连接的数据库(rdbms、Hive)创建DataFrame */val prop:Properties =new Properties() prop.setProperty("user","root") prop.setProperty("password","root")val df4 = ss.read.jdbc("jdbc:mysql://localhost:3306/spark?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8","student",prop) df4.printSchema() df4.show() ss.stop()}}
- 读取Hive数据成为DataFrame- 1、通过SparkSession开启Hive的支持- 2、引入spark-hive的编程依赖- 3、通过ss.sql()-packagecreate.methon3importorg.apache.spark.SparkConfimportorg.apache.spark.sql.{DataFrame, SparkSession}/** * 连接Hive创建DataFrame: * 1、jdbc方式(基本的操作只能查询表中的所有字段 所有数据) * 2、Spark SQL On Hive:用Hive作为数据存储,用Spark直连Hive 操作Hive中的数据 * 不是使用JDBC的方式,而是使用的Hive的元数据库来完成的 * 两步操作:(1)需要把Hive的配置文件放到项目的resources目录下,如果在集群环境下,我们需要把hive的配置文件放到spark的conf目录下,(2)需要开启SparkSession的hive支持 */object Demo02 {def main(args: Array[String]):Unit={val sparkConf:SparkConf =new SparkConf()val sparkSession:SparkSession = SparkSession.builder().appName("spark sql on hive").master("local[*]").config(sparkConf).enableHiveSupport().getOrCreate()/** * 从Hive中读取数据创建DataFrame */val df:DataFrame = sparkSession.sql("select * from project.ods_user_behavior_origin") df.printSchema() df.show()//新建数据表 sparkSession.sql("create table test (name string,age int,sex string) row format delimited fields terminated by '*'") sparkSession.stop()}}
- 4、从其他的DataFrame转换的来
五、DataFrame的编程风格
- 通过代码来操作计算DataFrame中数据
- DSL编程风格- DataFrame和Dataset提供了一系列的API操作,API说白了就是Spark SQL中算子操作,可以通过算子操作来获取DataFrame或者Dataset中的数据。- 转换算子- RDD具备的算子DataFrame基本上都可以使用。- DataFrame还增加了一些和SQL操作有关的算子: selectExpr、where/filter、groupBy、orderBy/sort、limit、join操作算子算子概念limit获得指定前n行数据并形成新的 dataframewhere、filter条件过滤select根据传入的 string 类型字段名,获取指定字段的值,以 DataFrame 类型返回join按指定的列进行合并两个dataframegroupBy按指定字段进行分组,后面可加聚合函数对分组后的数据进行操作orderBy、sort按指定字段排序selectExpr对指定字段进行特殊处理,可以对指定字段调用 UDF 函数或者指定别名;selectExpr 传入 string 类型的参数,返回 DataFrame 对象。- 行动算子- RDD具备的行动算子DataFrame和Dataset也都具备一些- collect/collectAsList:不建议使用,尤其是数据量特别庞大的情况下- foreach/foreachPartition- 获取结果集的一部分数据- first/take(n)/head(n)/takeAsList(n)/tail(n)- 获取的返回值类型就是Dataset存储的数据类型- printSchema:获取DataFrame或者Dataset的表结构的- show()/show(num,truncate:boolean)/show(num,truncate:Int)/show(num,truncate:Int,ver:boolean)- 保存输出的算子- 文件系统- df/ds.write.mode(SaveMode).csv/json/parquet/orc/text(path–目录)- text纯文本文档要求DataFrame和Dataset的结果集只有一列 而且列必须是String类型- JDBC支持的数据库- df/ds.write.mode().jdbc- foreach|foreachPartition-
packageopratorimportorg.apache.spark.SparkConfimportorg.apache.spark.sql.{Dataset, SaveMode, SparkSession}importjava.util.Propertiesobject Demo03 {def main(args: Array[String]):Unit={val sparkConf: SparkConf =new SparkConf()val ss: SparkSession = SparkSession.builder().appName("action").master("local[*]").config(sparkConf).enableHiveSupport().getOrCreate()importss.implicits._ /** * 创建DataFrame */val array:Seq[(String,Int,String)]= Array(("zs",20,"man"),("ls",30,"woman"),("ww",40,"man"),("ml",50,"woman"))val dataset:Dataset[(String,Int,String)]= array.toDS()// dataset.show()/** * 保存到MySQL当中 JDBC连接保存 */val prop =new Properties() prop.setProperty("user","root") prop.setProperty("password","root") dataset.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://localhost:3306/spark?serverTimezone=Asia/Shanghai","Student",prop) ss.stop()}}
- 执行前- 执行后- Hive- df/ds.write.mode().saveAsTable(“库名.表名”)- 1、保证hive支持开启的- 2、保存的数据底层在HDFS上以parquet文件格式保存的-dataset.write.mode(SaveMode.Append).saveAsTable("default.demo")
- - SQL编程风格- 1、将创建的DataFrame加载为一个临时表格- 2、然后通过ss.sql(sql语句)进行数据的查询
packageopratorimportorg.apache.spark.SparkConfimportorg.apache.spark.sql.{DataFrame, SparkSession}object Demo01 {def main(args: Array[String]):Unit={val sparkConf:SparkConf =new SparkConf()val ss:SparkSession = SparkSession.builder().appName("spark sql on hive").enableHiveSupport().master("local[*]").getOrCreate()/** * 从Hive中读取数据创建DataFrame */val df:DataFrame = ss.sql("select * from project.ods_user_behavior_origin") df.createTempView("test_spark_sql")val df1 = ss.sql("select ip_addr,parse_url(request_url,'HOST') as host,age from test_spark_sql") df1.show() df.selectExpr("ip_addr","parse_url(request_url,'HOST') as host").show() df.select("age","ip_addr").where("age>40").show() ss.stop()}}
六、DataSet的创建和使用
Dataset有类型,DataFrame无类型的。
创建
- 1、隐式转换,toDS()-
packagecreatedatasetimportorg.apache.spark.SparkConfimportorg.apache.spark.sql.{Dataset, SparkSession}importscala.beans.BeanPropertycaseclass Student(@BeanPropertyvar name:String,@BeanPropertyvar age:Int)object Demo01 {def main(args: Array[String]):Unit={val sparkConf:SparkConf =new SparkConf()val sparkSession:SparkSession = SparkSession.builder().appName("createds").config(sparkConf).master("local[*]").getOrCreate()importsparkSession.implicits._ /** * 通过隐式转换从集合或者rdd创建Dataset */val array:Seq[(String,Int)]= Array(("zs",20),("ls",30))val ds:Dataset[(String,Int)]= array.toDS() ds.printSchema() ds.show()val array1:Seq[Student]= Array(Student("zs",30),Student("ls",20))val ds1:Dataset[Student]= array1.toDS() ds1.printSchema() ds1.show() sparkSession.stop()}}
- - 2、通过SparkSession的createDataset函数创建-
/** * 通过SparkSession的createDataset函数创建 */val rdd:RDD[Student]= sparkSession.sparkContext.makeRDD(array1)val ds2:Dataset[Student]= sparkSession.createDataset(rdd)ds2.show()
- - 3、通过DataFrame转换得到Dataset df.as[类型-Bean对象必须有getter、setter方法] 也是需要隐式转换的-
/** * 通过DataFrame转换得到Dataset */val df:DataFrame = sparkSession.createDataFrame(rdd, classOf[Student])val ds3:Dataset[Student]= df.as[Student]ds3.show()
-
七、Spark SQL的函数操作
Spark SQL基本上常见的MySQL、Hive中函数都是支持的。
packagefunctionimportorg.apache.spark.SparkConf
importorg.apache.spark.sql.{DataFrame, SparkSession}object Demo01 {def main(args: Array[String]):Unit={val sparkConf:SparkConf =new SparkConf()val ss:SparkSession = SparkSession.builder().appName("function").master("local[*]").enableHiveSupport().config(sparkConf).getOrCreate()importss.implicits._
val array:Seq[(Int,String,Int)]= Array((1,"zs",80),(1,"ls",90),(1,"ww",65),(1,"ml",70),(2,"zsf",70),(2,"zwj",67),(2,"qf",76),(2,"dy",80))val df:DataFrame = array.toDF("classId","studentName","score")
df.createOrReplaceTempView("student_score_temp")
ss.sql("select *,row_number() over(partition by classId order by score desc) as class_rank from student_score_temp").show()
ss.stop()}}
ss.sql("select * from (select *,row_number() over(partition by classId order by score desc) as class_rank from student_score_temp) as temp where temp.class_rank < 2").show()
val array: Seq[(String,String)]= Array(("zs","play,eat,drink"),("ls","play,game,run"))val df: DataFrame = array.toDF("name","hobby")
df.createOrReplaceTempView("temp")/**
* zs play,eat,drink
* ls play,game,run
* zs play
* zs eat
*/
ss.sql("select temp.name,a.bobby from temp lateral view explode(split(hobby,',')) a as bobby").show()
自定义函数
- ss.udf.register(name,函数)
packagefunctionimportorg.apache.spark.SparkConf
importorg.apache.spark.sql.expressions.Aggregator
importorg.apache.spark.sql.{DataFrame, Encoder, Encoders, SparkSession}object Demo02 {def main(args: Array[String]):Unit={val sparkConf: SparkConf =new SparkConf()val ss: SparkSession = SparkSession.builder().appName("createMyFunction").master("local[*]").config(sparkConf).enableHiveSupport().getOrCreate()importss.implicits._
ss.udf.register("my_length",(name:String)=>{name.length})val array: Seq[(String,String)]= Array(("zs","play,eat,drink"),("ls","play,game,run"))val df: DataFrame = array.toDF("name","hobby")
df.selectExpr("my_length(hobby)").show()
ss.udf.register("my_avg",new My())val array1: Seq[(String,Int)]= Array(("zs",20),("ls",30))val df1: DataFrame = array1.toDF("name","score")
df1.selectExpr("my_avg(score)").show()
ss.stop()}}class My_AVG extends Aggregator[Int,(Int,Int),java.lang.Double]{/**
* 设置初始值的 是缓冲区的初始值
* @return
*/overridedef zero:(Int,Int)=(0,0)/**
* 当输入一个结果之后,缓冲区如何对输入的结果进行计算
*
* @param b 缓冲区
* @param a 输入的某一个值
* @return
*/overridedef reduce(b:(Int,Int), a:Int):(Int,Int)={(b._1+a,b._2+1)}/**
* 分区之间的合并
*
* @param b1
* @param b2
* @return
*/overridedef merge(b1:(Int,Int), b2:(Int,Int)):(Int,Int)={(b1._1+b2._1,b1._2+b2._2)}/**
* 最后的结果
*
* @param reduction
* @return
*/overridedef finish(reduction:(Int,Int)): java.lang.Double={
reduction._1.toDouble / reduction._2
}overridedef bufferEncoder: Encoder[(Int,Int)]= Encoders.product[(Int,Int)]overridedef outputEncoder: Encoder[java.lang.Double]= Encoders.DOUBLE
}----------------------------------------------------------------------------------------------------------------------------------------------------------------packagefunctionimportorg.apache.spark.sql.Row
importorg.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}importorg.apache.spark.sql.types.{DataType, DataTypes, StructField, StructType}class My extends UserDefinedAggregateFunction{overridedef inputSchema: StructType = StructType(Array(StructField("score",DataTypes.IntegerType)))overridedef bufferSchema: StructType = StructType(Array(StructField("sum",DataTypes.IntegerType),StructField("count",DataTypes.IntegerType)))overridedef dataType: DataType = DataTypes.DoubleType
overridedef deterministic:Boolean=trueoverridedef initialize(buffer: MutableAggregationBuffer):Unit={
buffer(0)=0
buffer(1)=0}overridedef update(buffer: MutableAggregationBuffer, input: Row):Unit={
buffer(0)= buffer.getInt(0)+input.getInt(0)
buffer(1)= buffer.getInt(1)+1}overridedef merge(buffer1: MutableAggregationBuffer, buffer2: Row):Unit={
buffer1(0)= buffer1.getInt(0)+ buffer2.getInt(0)
buffer1(1)= buffer1.getInt(1)+ buffer2.getInt(1)}overridedef evaluate(buffer: Row):Any={
buffer.getInt(0).toDouble/buffer.getInt(1)}}
版权归原作者 Augenstern K 所有, 如有侵权,请联系我们删除。