0


spark第四章:SparkSQL基本操作

系列文章目录

spark第一章:环境安装
spark第二章:sparkcore实例
spark第三章:工程化代码
spark第四章:SparkSQL基本操作


文章目录

前言

接下来我们学习SparkSQL他和Hql有些相似。Hql是将操作装换成MR,SparkSQL也是,不过是使用Spark引擎来操作,效率更高一些


一、添加pom

  1. <dependency>
  2. <groupId>org.apache.spark</groupId>
  3. <artifactId>spark-sql_2.12</artifactId>
  4. <version>3.2.3</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>mysql</groupId>
  8. <artifactId>mysql-connector-java</artifactId>
  9. <version>5.1.47</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.apache.spark</groupId>
  13. <artifactId>spark-hive_2.12</artifactId>
  14. <version>3.2.3</version>

以上是这次博客需要的所有依赖,一次性全加上。

二、常用操作

在这里插入图片描述
一共这么多,挨个讲解一下

1.类型转换

SparkSQL中有三种常用的类型,RDD之前说过就不说了。
DataFrame
Spark SQL 的 DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式。DataFrame API 既有 transformation 操作也有 action 操作。
DSL 语法
DataFrame 提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。可以在 Scala, Java, Python 和 R 中使用 DSL,使用 DSL 语法风格不必去创建临时视图了

SparkSql_Basic.scala

  1. package com.atguigu.bigdata.spark.sql
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
  5. object SparkSql_Basic {
  6. def main(args: Array[String]): Unit ={
  7. // 创建SparkSQL的运行环境
  8. val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
  9. val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()import spark.implicits._
  10. // val df: DataFrame = spark.read.json("datas/user.json")
  11. // df.show()
  12. //DataFrame => SQL
  13. // df.createOrReplaceTempView("user")
  14. // spark.sql("select age from user").show()
  15. //DtaFrame => DSL
  16. // 在使用DataFrame时,如何涉及到转换操作,需要引入转换规则
  17. // df.select("age","username").show()
  18. // df.select($"age"+1).show()
  19. // df.select('age+1).show()
  20. // DataSet
  21. // DataFrame 是特定泛型的DataSet
  22. // val seq: Seq[Int]= Seq(1, 2, 3, 4)
  23. // val ds: Dataset[Int]= seq.toDS()
  24. // ds.show()
  25. // RDD <=>DataFrame
  26. val rdd=spark.sparkContext.makeRDD(List((1,"zhangsan",30),(2,"lisi",40)))
  27. val df: DataFrame = rdd.toDF("id", "name", "age")
  28. val rowRDD: RDD[Row]= df.rdd
  29. // DataFrame <=> DatsSet
  30. val ds: Dataset[User]= df.as[User]
  31. val df1: DataFrame = ds.toDF()
  32. // RDD <=> DataSet
  33. val ds1: Dataset[User]= rdd.map {case(id, name, age)=>{
  34. User(id, name, age)}}.toDS()
  35. val userRDD: RDD[User]= ds1.rdd
  36. // 关闭环境
  37. spark.close()}case class User(id:Int,name:String,age:Int)}

2.连接mysql

SparkSQL提供了多种数据接口,我们可以通过JDBC连接Mysql数据库,我们先随便在数据库里边写点东西。
在这里插入图片描述
SparkSql_JDBC.scala

  1. package com.atguigu.bigdata.spark.sql
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
  4. object SparkSql_JDBC {
  5. def main(args: Array[String]): Unit ={
  6. // 创建SparkSQL的运行环境
  7. val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
  8. val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()import spark.implicits._
  9. val df: DataFrame = spark.read
  10. .format("jdbc")
  11. .option("url", "jdbc:mysql://hadoop102:3306/spark-sql")
  12. .option("driver", "com.mysql.jdbc.Driver")
  13. .option("user", "root")
  14. .option("password", "000000")
  15. .option("dbtable", "user")
  16. .option("useSSL","false")
  17. .load()
  18. df.show
  19. df.write
  20. .format("jdbc")
  21. .option("url", "jdbc:mysql://hadoop102:3306/spark-sql")
  22. .option("driver", "com.mysql.jdbc.Driver")
  23. .option("user", "root")
  24. .option("password", "000000")
  25. .option("dbtable", "user1")
  26. .option("useSSL","false")
  27. .mode(SaveMode.Append)
  28. .save()
  29. // 关闭环境
  30. spark.close()}}

在这里插入图片描述

在这里插入图片描述

3.UDF函数

这个函数可以对简单的数据进行处理,但是比较局限.
这次我们从json文件读取数据
在这里插入图片描述

  1. {"username":"zhangsan", "age":20}{"username":"lisi", "age":30}{"username":"wangwu", "age":40}

SparkSql_UDF.scala

  1. package com.atguigu.bigdata.spark.sql
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
  5. object SparkSql_UDF {
  6. def main(args: Array[String]): Unit ={
  7. // 创建SparkSQL的运行环境
  8. val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
  9. val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()import spark.implicits._
  10. val df: DataFrame = spark.read.json("datas/user.json")
  11. df.createOrReplaceTempView("user")
  12. spark.udf.register("prefixName",(name:String)=>{"Name:" + name
  13. })
  14. spark.sql("select age ,prefixName(username) from user").show()
  15. // 关闭环境
  16. spark.close()}}

在这里插入图片描述

4.UDAF函数

UDAF函数的处理能力就比UDF强大多了,可以完成一些更复杂的操作.
SparkSql_UDAF1.scala

  1. package com.atguigu.bigdata.spark.sql
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.sql.expressions.Aggregator
  4. import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}import org.apache.spark.sql.{DataFrame, Encoder, Encoders, Row, SparkSession, functions}
  5. object SparkSql_UDAF1 {
  6. def main(args: Array[String]): Unit ={
  7. // 创建SparkSQL的运行环境
  8. val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
  9. val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
  10. val df: DataFrame = spark.read.json("datas/user.json")
  11. df.createOrReplaceTempView("user")
  12. //计算平均年龄
  13. spark.udf.register("ageAvg", functions.udaf(new MyAvgUDAF()))
  14. spark.sql("select ageAvg(age) from user").show()
  15. // 关闭环境
  16. spark.close()}case class Buff( var total:Long,var count:Long)
  17. class MyAvgUDAF extends Aggregator[Long,Buff,Long]{
  18. //初始值
  19. override def zero: Buff ={
  20. Buff(0L,0L)}
  21. //更新缓冲区
  22. override def reduce(buff: Buff, in: Long): Buff ={
  23. buff.total=buff.total+in
  24. buff.count=buff.count+1
  25. buff
  26. }
  27. //合并缓冲区
  28. override def merge(buff1: Buff, buff2: Buff): Buff ={
  29. buff1.total=buff1.total+buff2.total
  30. buff1.count=buff1.count+buff2.count
  31. buff1
  32. }
  33. //计算结果
  34. override def finish(buff: Buff): Long ={
  35. buff.total/buff.count
  36. }
  37. //缓冲区编码操作
  38. override def bufferEncoder: Encoder[Buff]= Encoders.product
  39. //输出的编码操作
  40. override def outputEncoder: Encoder[Long]= Encoders.scalaLong
  41. }}

在这里插入图片描述
还有一种方法,在Spark3已经不被官方推荐了,所以这里就不叙述了.

5.连接hive

首先我们在集群先,启动Hadoop和Hive
然后将jdbc的jar包放到hive的lib文件中
在这里插入图片描述
这个jar包在安装Hive环境时,使用过.
将虚拟机中的hive配置文件,hive-site.xml导出
在这里插入图片描述
放到idea的resource文件夹中,然后最好吧target文件夹删除,因为idea有可能从target中直接读取之前的数据,从而没有扫描hive-site.xml
在这里插入图片描述
我们就做最简单的查询操作
SparkSql_Hive.scala

  1. package com.atguigu.bigdata.spark.sql
  2. import org.apache.spark.SparkConf
  3. import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
  4. object SparkSql_Hive {
  5. def main(args: Array[String]): Unit ={
  6. val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparkSql")
  7. val spark: SparkSession = SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()
  8. spark.sql("show tables").show
  9. // 关闭环境
  10. spark.close()}}

在这里插入图片描述
如果能查询hive中的数据库,代表成功.

总结

SparkSQL的常用操作基本就这些,至于项目吗,下次专门在写一次吧

标签: spark 大数据 scala

本文转载自: https://blog.csdn.net/weixin_50835854/article/details/129789721
版权归原作者 超哥-- 所有, 如有侵权,请联系我们删除。

“spark第四章:SparkSQL基本操作”的评论:

还没有评论