0


Spark SQL

一.SparkSql

  • SparkSQL 可以简化 RDD 的开发,提高开发效率.提供了 2 个编程抽象,类似 Spark Core 中的 RDD ➢ DataFrame ➢ DataSet

1.SparkSQL 特点

➢ 易整合
无缝的整合了 SQL 查询和 Spark 编程
➢ 统一的数据访问
使用相同的方式连接不同的数据源
➢ 兼容 Hive
在已有的仓库上直接运行 SQL 或者 HiveQL
➢ 标准数据连接
通过 JDBC 或者 ODBC 来连接

DataFrame和DataSet

  1. DataFrameDataFrame也是一种基于RDD的分布式数据集, RDD的区别在于DataFrame中有数据的原信息
  2. DataFrame可以理解为传统数据库中的一张二维表格,每一列都有列名和类型
  3. DataSetDataSet也是分布式数据集,对DataFrame的一个扩展,相当于传统JDBC中的ResultSet

2.SparkSQL 核心编程

新的起点

  1. SparkCore中需要创建上下文环境SparkContext
  2. SparkSqlSparkCore的封装, 不仅仅是功能上的封装,上下文件环境也封装了
  3. 老版本中称为 SQLContext 用于Spark自己的查询 HiveContext 用于Hive连接的查询
  4. 新版本中称为 SparkSession SQLContext HiveContext的组成 , 所以他们的API是通用的
  5. 同时 SparkSession也可以直接获取到SparkContext对象

在这里插入图片描述

3.DataFrame的创建和使用

  1. 三个概念:
  2. 数据: RDD中只关心数据 比如:(1,"jack",20) 并不关心每个字段的汉字
  3. 结构:DataFrame关心 数据+结构 比如:{"id":1,"name":"jack","age":20} 关心每个字段数据的类型
  4. 类型:DataSet关系 数据+结构+类型 比如:DataSet[Person]Person是我们定义好的类, 既有类型+字段+数据

3.1 创建DataFrame

  1. 从数据源中创建
  2. scala>var df = spark.read.json("data/info.json")
  3. df:org.apache.spark.sql.DataFrame=[ age: bigint , id: bigint ]
  4. RDD中转换(后续章节补充)
  5. HiveTable查询返回(后续章节补充)

在这里插入图片描述

3.2 使用DataFrame

  1. 使用DataFrame有两个方式,分别是 SQL语法和DSL语法
  2. SQL语法
  3. 1. 通过 "临时视图" 来使用,所以先创建视图
  4. 2. 通过 sparkSession对象执行sql进行数据查询
  5. scala> df.createOrReplaceTempView("user")//创建临时视图
  6. scala>var viewdf = spark.sql("select id,name,age from user")//通过spark执行sql
  7. viewdf:org.apache.spark.sql.DataFrame=[id: bigint, name: string]//执行sql返回的还是DF
  8. scala> viewdf.show //展示DF中的数据
  9. scala> spark.sql("select id,name,age from user").show //也可以直接查询sql并展示+---+-----+---+| id| name|age|+---+-----+---+|1|jack1|18||2|jack2|28||3|jack3|38|+---+-----+---+
  10. 注意:
  11. df.createOrReplaceTempView 只能创建当前会话有效的临时视图
  12. df.createOrReplaceGlobalTempView 能创建所有会话都有效的临时视图
  13. 使用时 需要在视图名前面加上 global_temp.视图名
  14. DSL语法
  15. DSL称为 Domain-SpecificLanguage 特定领域语言
  16. 这是 DataFrame中管理结构化数据的API ,通过DataFrame就可以调用这些API
  17. scala> df.printSchema
  18. root
  19. |-- age:long(nullable =true)|-- id:long(nullable =true)|-- name: string (nullable =true)
  20. scala> df.select("name")
  21. res20:org.apache.spark.sql.DataFrame=[name: string]//基本查询
  22. scala> df.select("name").show
  23. +-----+| name|+-----+|jack1||jack2||jack3|+-----+//列运算
  24. scala> df.select($"age"+1).show
  25. scala> df.select('age +1).show
  26. +---------+|(age +1)|+---------+|19||29||39|+---------+//取别名
  27. scala> df.select('name,'age +1 as "aa").show
  28. +-----+---+| name| aa|+-----+---+|jack1|19||jack2|29||jack3|39|+-----+---+//统计函数
  29. scala> df.select(avg("age") as "平均年龄").show
  30. +--------+|平均年龄|+--------+|48.0|+--------+//条件过滤
  31. scala> df.filter('age >25).show
  32. +---+---+-----+|age| id| name|+---+---+-----+|28|2|jack2||38|3|jack3|+---+---+-----+//组合+聚合函数
  33. scala> df.groupBy("id").count.show
  34. +---+-----+| id|count|+---+-----+|1|1||3|1||2|1|+---+-----+

3.3 DataFrame转换

  1. RDD DF 转换需要导入 隐式函数
  2. importspark.implicits._
  3. 这里的spark SparkSession的对象名,因此需要创建好SparkSession对象之后导入,并且该对象必须是val常量
  4. 1. RDD ==> DF ,缺少结构,即字段名
  5. scala>var rdd = spark.sparkContext.makeRDD(List(1,2,3))
  6. rdd: org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[129] at makeRDD at <console>:23
  7. scala>var df = rdd.toDF("id")
  8. df:org.apache.spark.sql.DataFrame=[id:int]
  9. scala> df.show
  10. +---+| id|+---+|1||2||3|+---+2. DF ===> RDD DF内部封装了RDD 直接获取即可
  11. 删除结构后,DF中每一行 就会变成一个Row对象
  12. 通过Row对象的get(index) 或者 getAs[Type](index)方法获取Row对象中的数据
  13. scala>var df = spark.read.json("data/info.json");
  14. df:org.apache.spark.sql.DataFrame=[age: bigint, id: bigint]
  15. scala>var rdd = df.rdd
  16. rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
  17. scala>var arr = rdd.collect
  18. arr:Array[org.apache.spark.sql.Row]=Array([18,1,jack1],[28,2,jack2],[38,3,jack3])
  19. scala>arr(0)
  20. res62:org.apache.spark.sql.Row=[18,1,jack1]
  21. scala>arr(0).get(0)
  22. res63:Any=18
  23. scala>arr(0).getAs[String](2)
  24. res67:String= jack1

3.4 DataSet

  • DataSet 即有数据,又有结构,也有类型, DataFrame其实一个特殊的DataSet,其类型是DataSet[Row]
  1. 通过Seq或者List 可以把集合直接转成DS
  2. 1. 通过基本类型的集合
  3. scala>var ds =List(1,2,3).toDS
  4. ds:org.apache.spark.sql.Dataset[Int]=[value:int]
  5. scala>var ds =List(1.1,2.2).toDS
  6. ds:org.apache.spark.sql.Dataset[Double]=[value:double]2.通过已定义类型的集合
  7. scala>caseclassUser(age:Int,name:String)
  8. defined classUser
  9. scala>var ds =List(User(10,"jack"),User(20,"rose")).toDS
  10. ds:org.apache.spark.sql.Dataset[User]=[age:int, name: string]

3.5 DataSet的转换

  1. 在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet1. RDD ==> DS 缺少结构和类型
  2. a. RDD DS 我们一般可以映射成 具体具体类型的RDD之后再转DS
  3. scala>caseclassUser(name:String, age:Int)
  4. defined classUser
  5. scala> sc.makeRDD(List(("zhangsan",30),("lisi",49))).map(t=>User(t._1,t._2)).toDS
  6. res11:org.apache.spark.sql.Dataset[User]=[name: string, age:int]
  7. b. 也可以直接ToDS
  8. scala> sc.makeRDD(List(("zhangsan",30),("lisi",49))).toDS
  9. res11:org.apache.spark.sql.Dataset[(String,Int)]=[_1:String, _2:Int]2.DS ==> RDD DS内部封装了RDD 直接获取即可,且获取出来的RDD也是带有类型的
  10. scala>var ds =List(User("aa",11),User("bb",22)).toDS
  11. ds:org.apache.spark.sql.Dataset[User]=[name: string, age:int]
  12. scala>var rdd = ds.rdd
  13. rdd: org.apache.spark.rdd.RDD[User]

3.5 DataFrame和DataSet的转换

  1. DataFrame==>DataSet 需要一个类型
  2. scala>caseclassUser(name:String, age:Int)
  3. defined classUser
  4. scala> val df = sc.makeRDD(List(("zhangsan",30),("lisi",49))).toDF("name","age")
  5. df:org.apache.spark.sql.DataFrame=[name: string, age:int]
  6. scala> val ds = df.as[User]
  7. ds:org.apache.spark.sql.Dataset[User]=[name: string, age:int]
  8. DataSet==>DataFrame 删除类型 即变成 DataSet[Row]
  9. scala> val df = ds.toDF
  10. df:org.apache.spark.sql.DataFrame=[name: string, age:int]

3.6 RDD、DataFrame、DataSet

  1. DF ==rdd==> RDD [ ROW ]
  2. DS ==rdd==> RDD [Type]
  3. rdd 如果是DF 那么泛型是ROW 如果是DS泛型就是DS的泛型
  4. RDD ==toDF==> DF [ ROW ]
  5. DS ==toDF==> DF [ ROW ]
  6. DF 无论如何DF没有类型 所以都是ROW
  7. DF ==as[Type]==> DS[Type]
  8. RDD ====> DS [ RDD的泛型 ]
  9. DS 如果是DF 那么泛型是Type 如果是RDD泛型就是RDD的泛型

4. IDEA 开发 SparkSQL

  1. object Spark_SQL_Start {def main(args: Array[String]):Unit={//1.创建SparkSessionval spark: SparkSession = SparkSession.builder().config(new SparkConf().setMaster("local[*]").setAppName("start01")).getOrCreate()importspark.implicits._
  2. //2.DF的创建和使用val df: DataFrame = spark.read.json("datas/info.json").cache()//SQL
  3. df.createOrReplaceTempView("User")
  4. spark.sql("select * from User").show()//DSL
  5. df.select("name").show()
  6. df.groupBy("id").count().show()
  7. println("-----------------------")//3.DF ==> DS DF ==> RDDval ds: Dataset[User]= df.as[User]
  8. ds.show()val rdd: RDD[Row]= df.rdd
  9. rdd.collect().foreach(println)
  10. println("-----------------------")//4.RDD ==> DF RDD ==> DSval rdd1: RDD[(Int,String)]= spark.sparkContext.makeRDD(Seq((10,"tom"),(20,"jack")))var df1: DataFrame = rdd1.toDF("id","name")
  11. df1.show()val ds1: Dataset[User]= rdd1.map(t=>User(t._1,t._2)).toDS()
  12. ds1.show()
  13. println("-----------------------")//5.DS==>RDD DS==>DFval rdd2: RDD[User]= ds1.rdd
  14. rdd2.collect().foreach(println)val df2: DataFrame = ds1.toDF()
  15. df2.show()//6.关闭
  16. spark.close()}caseclass User(id:Long,name:String)}

5. UDF

  • 执行spark.sql时 可以使用用户自定义函数,实现自己想要的功能
  • 通过spark.udf.register注册函数即可使用
  1. object Spark_Sql_UDF{
  2. def main(args:Array[String]):Unit={
  3. val spark:SparkSession=SparkSession.builder().config(newSparkConf().setAppName("UDF").setMaster("local[*]")).getOrCreate()importspark.implicits._
  4. //注册用户自定义函数
  5. spark.udf.register("getWithName",(x)=>{"Name:"+x})//创建DF
  6. val df:DataFrame= spark.read.format("json").load("datas/info.json")//创建临时表
  7. df.createOrReplaceTempView("user")//使用sql自定义函数
  8. spark.sql("select getWithName(name),id from user").show()
  9. spark.close()}}
  10. info.json:{"id":10,"name":"jack"}{"id":20,"name":"rose"}{"id":30,"name":"tom"}
  11. console:|getWithName(name)| id|+-----------------+---+|Name:jack|10||Name:rose|20||Name:tom|30|+-----------------+---+

5. UDAF

  • 用户自定义聚合函数
  • 通过 - 继承 UserDefinedAggregateFunction 弱类型的聚合函数 ( Spark3.0之前 )- 继承 Aggregator 强类型的聚合函数 ( Spark3.0 )
  1. 需求: 自定义求平均值函数 avgAge
  2. 通过RDD实现
  3. val rdd: RDD[(String,Int)]= spark.sparkContext.makeRDD(Seq(("jack",10),("rose",20),("tom",30)))val ageOneRdd: RDD[(Int,Int)]= rdd.map {case(_, age)=>(age,1)}val ageCount:(Int,Int)= ageOneRdd.reduce((t1, t2)=>(t1._1 + t2._1, t1._2 + t2._2))
  4. println(ageCount._1 / ageCount._2)
  5. 通过累加器实现
  6. val ageAcc =new AgeAccumulator
  7. spark.sparkContext.register(ageAcc)val rdd: RDD[(String,Int)]= spark.sparkContext.makeRDD(Seq(("jack",10),("rose",20),("tom",30)))
  8. rdd.foreach {case(_, age)=> ageAcc.add(age)}val ageCount:(Int,Int)= ageAcc.value
  9. println(ageCount._1 / ageCount._2)class AgeAccumulator extends AccumulatorV2[Int,(Int,Int)]{privatevar ageSum:Int=0privatevar ageCnt:Int=0overridedef isZero:Boolean= ageSum ==0&& ageCnt ==0overridedef copy(): AccumulatorV2[Int,(Int,Int)]=new AgeAccumulator
  10. overridedef reset():Unit={
  11. ageSum =0
  12. ageCnt =0}overridedef add(age:Int):Unit={
  13. ageSum += age
  14. ageCnt +=1}overridedef merge(other: AccumulatorV2[Int,(Int,Int)]):Unit={
  15. ageSum += other.value._1
  16. ageCnt += other.value._2
  17. }overridedef value:(Int,Int)=(ageSum, ageCnt)}
  18. 通过 继承 UDAF (Spark3.0之前) 抽象类实现自定义聚合函数
  19. //创建DFval rdd: RDD[(String,Int)]= spark.sparkContext.makeRDD(Seq(("jack",10),("rose",20),("tom",30)))val df: DataFrame = rdd.toDF("name","age")//创建自定义集合函数对象val ageUDAF =new AgeUDAF
  20. //注册UDAF
  21. spark.udf.register("ageAVG", ageUDAF)//创建临时表
  22. df.createOrReplaceTempView("user")//执行sql
  23. spark.sql("select ageAVG(age) from user").show()class AgeUDAF extends UserDefinedAggregateFunction {//聚合函数输入的数据类型overridedef inputSchema: StructType ={
  24. StructType(
  25. Array(
  26. StructField("age", IntegerType)))}//计算过程的缓冲区overridedef bufferSchema: StructType ={
  27. StructType(
  28. Array(
  29. StructField("ageSum", LongType),
  30. StructField("ageCnt", LongType)))}//聚合函数返回值类型overridedef dataType: DataType = DoubleType
  31. // 稳定性:对于相同的输入是否一直返回相同的输出overridedef deterministic:Boolean=true// 函数缓冲区初始化overridedef initialize(buffer: MutableAggregationBuffer):Unit={
  32. buffer(0)=0L
  33. buffer(1)=0L}//累加计算overridedef update(buffer: MutableAggregationBuffer, input: Row):Unit={
  34. buffer(0)= buffer.getLong(0)+ input.getInt(0)
  35. buffer(1)= buffer.getLong(1)+1}//合并overridedef merge(buffer1: MutableAggregationBuffer, buffer2: Row):Unit={
  36. buffer1(0)= buffer1.getLong(0)+ buffer2.getLong(0)
  37. buffer1(1)= buffer1.getLong(1)+ buffer2.getLong(1)}//计算结果overridedef evaluate(buffer: Row):Double= buffer.getLong(0).toDouble / buffer.getLong(1)}
  38. 通过继承 Aggregate (Spark3.0) 自定义强类型聚合函数
  39. //创建强类型UDAF对象val ageAggr =new AgeAggregator
  40. //注册强类型的UDAF,需要使用functions.udaf进行函数转换
  41. spark.udf.register("ageAggr", functions.udaf(ageAggr))//创建rddval rdd: RDD[(String,Int)]= spark.sparkContext.makeRDD(Seq(("jack",10),("rose",20),("tom",30)))//转成DFval df: DataFrame = rdd.toDF("name","age")//创建临时表
  42. df.createOrReplaceTempView("user")//执行sql 直接使用聚合函数
  43. spark.sql("select ageAggr(age) from user").show()class AgeAggregator extends Aggregator[Int,(Long,Long),Double]{//缓冲区 初始值overridedef zero:(Long,Long)=(0,0)//输入age到缓冲区计算overridedef reduce(buff:(Long,Long), age:Int):(Long,Long)={(buff._1 + age, buff._2 +1)}//合并多个缓冲区overridedef merge(buff1:(Long,Long), buff2:(Long,Long)):(Long,Long)={(buff1._1 + buff2._1, buff1._2 + buff2._2)}//计算结果overridedef finish(buff:(Long,Long)):Double={
  44. buff._1.toDouble / buff._2
  45. }//输入编码,自定义对象Encoders.product 其他Encoders.scalaXxxoverridedef bufferEncoder: Encoder[(Long,Long)]= Encoders.tuple(Encoders.scalaLong, Encoders.scalaLong)//输出编码,自定义对象Encoders.product 其他Encoders.scalaXxxoverridedef outputEncoder: Encoder[Double]= Encoders.scalaDouble
  46. }

6. 数据的加载和保存

通用的加载和保存方式

  1. SparkSQL 提供了通用的保存数据和加载数据的方式. 默认的保存和加载数据的格式都是parquet
  2. scala> spark.read.load("parquet path") df.write.save("parquet path")
  3. 我们可以通过设置不同的参数,来指定不同的数据源格式,读取和保存数据均可
  4. scala> spark.read.format("json")[.option("key","value")].load("filepath")
  5. format("...") 指定读取数据源的格式, 包括 "csv""jdbc""json""orc""parquet" "textFile"
  6. option("key","value") 如果formatjdbc, 那么使用多个option传递JDBC参数
  7. 也有一些简化的方法,用于特定的文件读取,从而省略format调用
  8. spark.read.json("json file path") spark.read.cvs("cvs file path")
  9. scala> spark.read.
  10. csv jdbc load options parquet table textFile
  11. format json option orc schema text
  12. 保存操作:
  13. df.write.mode("SaveMode String").save("parquet file path")
  14. 除了和读取操作一样的参数之外, 另有一个模式,表示保存时的状态
  15. SaveMode.ErrorIfExists(default)==>"error"(默认的) 如果文件已经存在则抛出异常
  16. SaveMode.Append ==>"append" 如果文件已经存在则追加(会有多个文件生成)
  17. SaveMode.Overwrite ==>"overwrite" 如果文件已经存在则覆盖(把以前的文件删除)
  18. SaveMode.Ignore ==>"ignore" 如果文件已经存在则忽略(不生成新文件,保留原来的文件 )
  19. scala> df.write.mode("append").json("/output")

7. MySQL数据读取和写入

➢ spark-shell 命令行连接

  1. 1. 添加mysql驱动到sparkjars目录下
  2. 2. scala> val jdbcDF = spark.read.format("jdbc").options(Map("url" ->"jdbc:mysql://localhost:3306/mysql", "driver" ->"com.mysql.jdbc.Driver", "dbtable" ->"plugin", "user" ->"root", "password" ->"1234")).load()

➢ scala代码连接

  1. 1.导入mysql的驱动依赖
  2. <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.27</version></dependency>2.读取mysql数据
  3. spark.read.format("jdbc").options(Map("url"->"jdbc:mysql://192.168.189.90:3306/mysql","driver"->"com.mysql.cj.jdbc.Driver","user"->"root","password"->"1234","dbtable"->"help_topic")).load().show()
  4. spark.read.format("jdbc").option("url","jdbc:mysql://192.168.189.90:3306/mysql").option("driver","com.mysql.cj.jdbc.Driver").option("user","root").option("password","1234").option("dbtable","plugin").load().show
  5. val props: Properties =new Properties()
  6. props.setProperty("user","root")
  7. props.setProperty("password","1234")
  8. spark.read.jdbc("jdbc:mysql://node0:3306/mysql","plugin", props).show()3.写入数据到mysql
  9. val df: DataFrame = spark.sparkContext.makeRDD(List(1,2,3)).toDF("id")
  10. df.write.format("jdbc").options(Map("url"->"jdbc:mysql://192.168.189.90:3306/test","driver"->"com.mysql.cj.jdbc.Driver","user"->"root","password"->"1234","dbtable"->"ids")).mode(SaveMode.Append)//默认的模式是 表存在则报错,这里指定追加 表存在不会保存.save()

8.Spark连接Hive

  • Apache Hive 可以理解为一个Hadoop上的SQL引擎数据库
  • SparkSQL在编译时可以加入Hive的支持也可以不加入(我们下载的Hive二进制包是包含Hive支持的) - 那么就可以 访问Hive表、UDF (用户自定义函数) 以及 Hive 查询语言(HiveQL/HQL)等- 由于Spark包含了Hive 所以连接Hive有两种方式, - 一是连接自带的内嵌Hive(不需要任何配置)- 二是连接外部的Hive 需要简单的配置

1.连接内嵌的Hive

  1. 连接内嵌Hive什么都不需要做, 默认使用derby作为元数据库,使用本地文件系统作为数据仓库
  2. 执行两个命令://执行查看数据库sql, 会自动在 $spark_home下生成metastore_db元数据库信息
  3. scala> spark.sql("show tables").show
  4. //执行创建表操作,或者插入数据操作,会自动生成并在$spark_home/spark-warehouse 存储数据
  5. scala> spark.sql("create table test(id int)")
  6. spark.sql("insert into test values(1),(2)")

2.连接外部的Hive

  • Spark 使用hive 数据仓库, - 将hive-site.xml文件 拷贝到spark的conf目录下- 添加mysql驱动包到spark的jars目录下- 如果spark访问不到hdfs, 需要将core-site.xml和hdfs-site.xml也拷贝到spark的conf目录下 [经测试是不需要拷贝这两个文件的,也不知道spark是如何找到hadoop的,hive是通过$HADOOP_HOME找到的]- 如果hive-site.xml配置了元数据服务器,需要启动元数据服务器- 重写启动spark-shell即可自动切换到hive数据仓库
  1. scala> spark.sql("show tables").show
  2. +--------+--------------------+-----------+|database| tableName|isTemporary|+--------+--------------------+-----------+| default| aa|false|| default| live_events|false|| default| login_events|false|| default|order_amount_by_p...| false|| default| order_detail|false|| default| page_view_events|false|| default| payment_detail|false|| default| product_info|false|| default| promotion_info|false|| default| province_info|false|+--------+--------------------+-----------+

2.使用Spark SQL CLI

  1. 确保Spark可以连接外部的hive之后,.就可以是用spark-sql直接连接hive进行操作
  2. [zhyp@node0 spark-local]$ bin/spark-sql
  3. spark-sql>showtables;default aa falsedefault live_events falsedefault login_events falsedefault order_amount_by_province falsedefault order_detail falsedefault page_view_events falsedefault payment_detail falsedefault product_info falsedefault promotion_info falsedefault province_info falseTime taken: 2.433 seconds, Fetched 10row(s)

2.使用Spark Beeline

  • Spark Thrift Server 是 基于HiveServer2的另外一个实现 , 完全兼容HiveServer2 并且都是一样的协议和端口 我们使用Spark Thrift Server 取代HiveServer2 和 Hive的metastore服务交互获取元数据
  • Spark Beeline 连接 Spark thrift server的步骤 - 将hive-site.xml文件 拷贝到spark的conf目录下- 如果hive-site.xml配置了元数据服务器,需要启动元数据服务器 - [如果不是用元数据服务器,需要在jars中加入mysql驱动和配置文件中添加连接数据库四大要素]- 启动Spark thrift server 使用beeline连接即可
  1. [zhyp@node0 spark-local]$ sbin/start-thriftserver.sh
  2. [zhyp@node0 spark-local]$ bin/beeline -u jdbc:hive2://linux1:10000 -n zhyp
  3. 或者
  4. [zhyp@node0 spark-local]$ bin/beeline
  5. beeline>!connect jdbc:hive2://node0:10000

3.代码连接Hive

  1. def main(args: Array[String]):Unit={//创建 SparkSession
  2. System.setProperty("HADOOP_USER_NAME","zhyp")val spark: SparkSession = SparkSession
  3. .builder().enableHiveSupport()//添加Hive支持,默认是不支持连接Hive,开启后会读取classpath下的hive-site.xml.config("spark.sql.warehouse.dir","hdfs://node0:8020/user/hive/warehouse")//通过spark创建数据库需要写这个地址,因为新建的数据库默认是在本地路径中找/user/hive/warehouse.master("local[*]").appName("sql").getOrCreate()importspark.implicits._
  4. spark.sql("create table abc(id int)").show()
  5. spark.sql("insert into abc values(1),(111)").show()
  6. spark.sql("show tables").show()
  7. spark.sql("select * from abc").show()
  8. spark.close()}

代码练习

  1. packagecom.zhyp.spark.sql.startimportorg.apache.spark.SparkConf
  2. importorg.apache.spark.sql.{Dataset, SparkSession}object Spark_SQL_Test01{def main(args: Array[String]):Unit={
  3. System.setProperty("HADOOP_USER_NAME","zhyp")//1.创建SparkSessionval spark: SparkSession = SparkSession.builder().enableHiveSupport().config("spark.sql.warehouse.dir","hdfs://node0:8020/user/hive/warehouse").config(new SparkConf().setMaster("local[*]").setAppName("start01")).getOrCreate()importspark.implicits._
  4. spark.sql("create database test")
  5. spark.sql("use test")
  6. spark.sql("""
  7. |CREATE TABLE `user_visit_action`(
  8. | `date` string,
  9. | `user_id` bigint,
  10. | `session_id` string,
  11. | `page_id` bigint,
  12. | `action_time` string,
  13. | `search_keyword` string,
  14. | `click_category_id` bigint,
  15. | `click_product_id` bigint,
  16. | `order_category_ids` string,
  17. | `order_product_ids` string,
  18. | `pay_category_ids` string,
  19. | `pay_product_ids` string,
  20. | `city_id` bigint)
  21. |row format delimited fields terminated by '\t';
  22. """.stripMargin)
  23. spark.sql("""
  24. |load data local inpath 'datas/user_visit_action.txt' into table
  25. |user_visit_action
  26. """.stripMargin)
  27. spark.sql("""
  28. |CREATE TABLE `product_info`(
  29. | `product_id` bigint,
  30. | `product_name` string,
  31. | `extend_info` string)
  32. |row format delimited fields terminated by '\t'
  33. """.stripMargin)
  34. spark.sql("""
  35. |load data local inpath 'datas/product_info.txt' into table product_info
  36. """.stripMargin)
  37. spark.sql("""
  38. |CREATE TABLE `city_info`(
  39. | `city_id` bigint,
  40. | `city_name` string,
  41. | `area` string)
  42. |row format delimited fields terminated by '\t'
  43. """.stripMargin)
  44. spark.sql("""
  45. |load data local inpath 'datas/city_info.txt' into table city_info
  46. """.stripMargin)
  47. spark.close()}}packagecom.zhyp.spark.sql.startimportorg.apache.spark.SparkConf
  48. importorg.apache.spark.sql.expressions.Aggregator
  49. importorg.apache.spark.sql.{Encoder, Encoders, SparkSession, functions}importscala.collection.mutableimportscala.collection.mutable.ListBuffer
  50. object Spark_SQL_Test03 {def main(args: Array[String]):Unit={
  51. System.setProperty("HADOOP_USER_NAME","zhyp")//1.创建SparkSessionval spark: SparkSession = SparkSession.builder().enableHiveSupport().config("spark.sql.warehouse.dir","hdfs://node0:8020/user/hive/warehouse").config(new SparkConf().setMaster("local[*]").setAppName("start01")).getOrCreate()//2.执行
  52. spark.udf.register("cityRemark", functions.udaf(new CityRemarkUDAF))
  53. spark.sql("use test")//1.关联三张表 过滤非点击数据
  54. spark.sql("""
  55. |select
  56. |c.area,c.city_name,p.product_name
  57. |from
  58. |user_visit_action a
  59. |join
  60. |city_info c
  61. |on a.city_id = c.city_id
  62. |join
  63. |product_info p
  64. |on a.click_product_id = p.product_id
  65. |where a.click_product_id > -1
  66. """.stripMargin).createOrReplaceTempView("t1")//2.分组 按照地区和商品分组
  67. spark.sql("""
  68. |select
  69. |area,product_name,
  70. |cityRemark(city_name) city_remark,
  71. |count(*) clickCnt
  72. |from t1
  73. |group by area,product_name
  74. """.stripMargin).createOrReplaceTempView("t2")//3.对 同一个地区的各种商品点击量排名
  75. spark.sql("""
  76. |select
  77. |*,
  78. |rank() over(partition by area order by clickCnt desc) rank
  79. |from t2
  80. """.stripMargin).createOrReplaceTempView("t3")//4.取各区域的前三名
  81. spark.sql("""
  82. |select
  83. |*
  84. |from t3
  85. |where rank <= 3
  86. """.stripMargin).show()}caseclass CityAndCntBuff(map:mutable.Map[String,Long])class CityRemarkUDAF extends Aggregator[String, CityAndCntBuff,String]{overridedef zero: CityAndCntBuff = CityAndCntBuff(mutable.Map())overridedef reduce(buff: CityAndCntBuff, city:String): CityAndCntBuff ={val map: mutable.Map[String,Long]= buff.map
  87. map.update(city, map.getOrElse(city,0L)+1L)
  88. buff
  89. }overridedef merge(buff1: CityAndCntBuff, buff2: CityAndCntBuff): CityAndCntBuff ={var map1 = buff1.map;var map2 = buff2.map
  90. map2.foreach({case(city, cnt)=>{
  91. map1.update(city, map1.getOrElse(city,0L)+ cnt)}})
  92. buff1
  93. }overridedef finish(resultBuff: CityAndCntBuff):String={val resultMap: mutable.Map[String,Long]= resultBuff.map
  94. val totalCnt:Long= resultMap.values.reduce(_ + _)val top2: List[(String,Long)]= resultMap.toList.sortBy(_._2)(Ordering.Long.reverse).take(2)val cityBuffer =new ListBuffer[String]var percentSum =0L
  95. top2.foreach({case(city, cnt)=>{val percent:Long= cnt *100/ totalCnt
  96. cityBuffer.append(s"${city}${percent}%")
  97. percentSum += percent
  98. }})if(resultMap.size >2){
  99. cityBuffer.append(s"其他 ${1- percentSum}%")}
  100. cityBuffer.mkString(", ")}overridedef bufferEncoder: Encoder[CityAndCntBuff]= Encoders.product
  101. overridedef outputEncoder: Encoder[String]= Encoders.STRING
  102. }}
标签: spark sql 大数据

本文转载自: https://blog.csdn.net/ytzhyp/article/details/129675538
版权归原作者 ytzhyp 所有, 如有侵权,请联系我们删除。

“Spark SQL”的评论:

还没有评论