0


Spark SQL

一.SparkSql

  • SparkSQL 可以简化 RDD 的开发,提高开发效率.提供了 2 个编程抽象,类似 Spark Core 中的 RDD ➢ DataFrame ➢ DataSet

1.SparkSQL 特点

➢ 易整合
无缝的整合了 SQL 查询和 Spark 编程
➢ 统一的数据访问
使用相同的方式连接不同的数据源
➢ 兼容 Hive
在已有的仓库上直接运行 SQL 或者 HiveQL
➢ 标准数据连接
通过 JDBC 或者 ODBC 来连接

DataFrame和DataSet

➢ DataFrameDataFrame也是一种基于RDD的分布式数据集, 与RDD的区别在于DataFrame中有数据的原信息
    DataFrame可以理解为传统数据库中的一张二维表格,每一列都有列名和类型
    
 ➢ DataSetDataSet也是分布式数据集,对DataFrame的一个扩展,相当于传统JDBC中的ResultSet

2.SparkSQL 核心编程

新的起点

    在SparkCore中需要创建上下文环境SparkContext
    而SparkSql对SparkCore的封装, 不仅仅是功能上的封装,上下文件环境也封装了
        老版本中称为 SQLContext 用于Spark自己的查询 和 HiveContext 用于Hive连接的查询
        新版本中称为 SparkSession 是 SQLContext 和 HiveContext的组成 , 所以他们的API是通用的
        同时 SparkSession也可以直接获取到SparkContext对象

在这里插入图片描述

3.DataFrame的创建和使用

    三个概念:
        数据:  RDD中只关心数据  比如:(1,"jack",20)  并不关心每个字段的汉字
        结构:DataFrame关心 数据+结构  比如:{"id":1,"name":"jack","age":20} 关心每个字段数据的类型
        类型:DataSet关系 数据+结构+类型 比如:DataSet[Person]Person是我们定义好的类, 既有类型+字段+数据

3.1 创建DataFrame

➢ 从数据源中创建
    scala>var df = spark.read.json("data/info.json")
            df:org.apache.spark.sql.DataFrame=[ age: bigint , id: bigint ]
➢ 从RDD中转换(后续章节补充)
➢ 从HiveTable查询返回(后续章节补充)

在这里插入图片描述

3.2 使用DataFrame

使用DataFrame有两个方式,分别是 SQL语法和DSL语法

➢ SQL语法
    1. 通过 "临时视图" 来使用,所以先创建视图
    2. 通过 sparkSession对象执行sql进行数据查询
    scala> df.createOrReplaceTempView("user")//创建临时视图
    scala>var viewdf = spark.sql("select id,name,age from user")//通过spark执行sql
    viewdf:org.apache.spark.sql.DataFrame=[id: bigint, name: string]//执行sql返回的还是DF
    scala> viewdf.show  //展示DF中的数据
    scala> spark.sql("select id,name,age from user").show  //也可以直接查询sql并展示+---+-----+---+| id| name|age|+---+-----+---+|1|jack1|18||2|jack2|28||3|jack3|38|+---+-----+---+
    
    注意: 
        df.createOrReplaceTempView 只能创建当前会话有效的临时视图
        df.createOrReplaceGlobalTempView 能创建所有会话都有效的临时视图
        使用时 需要在视图名前面加上  global_temp.视图名
        
➢ DSL语法
    DSL称为 Domain-SpecificLanguage 特定领域语言
    这是 DataFrame中管理结构化数据的API ,通过DataFrame就可以调用这些API

    scala> df.printSchema
            root
             |-- age:long(nullable =true)|-- id:long(nullable =true)|-- name: string (nullable =true)
    scala> df.select("name")
            res20:org.apache.spark.sql.DataFrame=[name: string]//基本查询
    scala> df.select("name").show
            +-----+| name|+-----+|jack1||jack2||jack3|+-----+//列运算         
    scala> df.select($"age"+1).show
    scala> df.select('age +1).show
            +---------+|(age +1)|+---------+|19||29||39|+---------+//取别名
    scala> df.select('name,'age +1 as "aa").show
            +-----+---+| name| aa|+-----+---+|jack1|19||jack2|29||jack3|39|+-----+---+//统计函数
    scala> df.select(avg("age") as "平均年龄").show
            +--------+|平均年龄|+--------+|48.0|+--------+//条件过滤
    scala> df.filter('age >25).show
            +---+---+-----+|age| id| name|+---+---+-----+|28|2|jack2||38|3|jack3|+---+---+-----+//组合+聚合函数
    scala> df.groupBy("id").count.show
            +---+-----+| id|count|+---+-----+|1|1||3|1||2|1|+---+-----+

3.3 DataFrame转换

➢ RDD 与 DF 转换需要导入 隐式函数
    importspark.implicits._
    这里的spark是 SparkSession的对象名,因此需要创建好SparkSession对象之后导入,并且该对象必须是val常量
    
    1. RDD ==> DF ,缺少结构,即字段名
        scala>var rdd = spark.sparkContext.makeRDD(List(1,2,3))
                rdd: org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[129] at makeRDD at <console>:23
        scala>var df = rdd.toDF("id")
                df:org.apache.spark.sql.DataFrame=[id:int]
        scala> df.show
                +---+| id|+---+|1||2||3|+---+2. DF ===> RDD DF内部封装了RDD 直接获取即可
            删除结构后,DF中每一行 就会变成一个Row对象
            通过Row对象的get(index) 或者 getAs[Type](index)方法获取Row对象中的数据
            
    scala>var df = spark.read.json("data/info.json");
            df:org.apache.spark.sql.DataFrame=[age: bigint, id: bigint]
    scala>var rdd = df.rdd
            rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] 
    scala>var arr = rdd.collect
            arr:Array[org.apache.spark.sql.Row]=Array([18,1,jack1],[28,2,jack2],[38,3,jack3])
    scala>arr(0)
            res62:org.apache.spark.sql.Row=[18,1,jack1]
    scala>arr(0).get(0)
            res63:Any=18
    scala>arr(0).getAs[String](2)
            res67:String= jack1         

3.4 DataSet

  • DataSet 即有数据,又有结构,也有类型, DataFrame其实一个特殊的DataSet,其类型是DataSet[Row]
➢ 通过Seq或者List 可以把集合直接转成DS
    1. 通过基本类型的集合
     scala>var ds =List(1,2,3).toDS
            ds:org.apache.spark.sql.Dataset[Int]=[value:int]

    scala>var ds =List(1.1,2.2).toDS
            ds:org.apache.spark.sql.Dataset[Double]=[value:double]2.通过已定义类型的集合
    scala>caseclassUser(age:Int,name:String)
            defined classUser

    scala>var ds =List(User(10,"jack"),User(20,"rose")).toDS
            ds:org.apache.spark.sql.Dataset[User]=[age:int, name: string]

3.5 DataSet的转换

    在实际使用的时候,很少用到把序列转换成DataSet,更多的是通过RDD来得到DataSet1. RDD ==> DS  缺少结构和类型
        a. RDD 转DS 我们一般可以映射成 具体具体类型的RDD之后再转DS 
    scala>caseclassUser(name:String, age:Int)
            defined classUser
    scala> sc.makeRDD(List(("zhangsan",30),("lisi",49))).map(t=>User(t._1,t._2)).toDS
            res11:org.apache.spark.sql.Dataset[User]=[name: string, age:int]
        b. 也可以直接ToDS
    scala> sc.makeRDD(List(("zhangsan",30),("lisi",49))).toDS
            res11:org.apache.spark.sql.Dataset[(String,Int)]=[_1:String, _2:Int]2.DS ==> RDD  DS内部封装了RDD 直接获取即可,且获取出来的RDD也是带有类型的
    scala>var ds =List(User("aa",11),User("bb",22)).toDS
            ds:org.apache.spark.sql.Dataset[User]=[name: string, age:int]
    scala>var rdd = ds.rdd
            rdd: org.apache.spark.rdd.RDD[User]

3.5 DataFrame和DataSet的转换

➢ DataFrame==>DataSet  需要一个类型    
    scala>caseclassUser(name:String, age:Int)
            defined classUser
    scala> val df = sc.makeRDD(List(("zhangsan",30),("lisi",49))).toDF("name","age")
            df:org.apache.spark.sql.DataFrame=[name: string, age:int]
    scala> val ds = df.as[User]
            ds:org.apache.spark.sql.Dataset[User]=[name: string, age:int]

        
➢ DataSet==>DataFrame  删除类型 即变成 DataSet[Row]
    scala> val df = ds.toDF
            df:org.apache.spark.sql.DataFrame=[name: string, age:int]

3.6 RDD、DataFrame、DataSet

    DF ==rdd==> RDD [ ROW ] 
    DS ==rdd==> RDD [Type]
    转rdd 如果是DF 那么泛型是ROW 如果是DS泛型就是DS的泛型
    
    RDD ==toDF==> DF [ ROW ]
    DS   ==toDF==> DF [ ROW ]
    转DF 无论如何DF没有类型 所以都是ROW

    DF   ==as[Type]==> DS[Type]
    RDD ====> DS [ RDD的泛型 ]
    转DS 如果是DF 那么泛型是Type 如果是RDD泛型就是RDD的泛型

4. IDEA 开发 SparkSQL

object Spark_SQL_Start {def main(args: Array[String]):Unit={//1.创建SparkSessionval spark: SparkSession = SparkSession.builder().config(new SparkConf().setMaster("local[*]").setAppName("start01")).getOrCreate()importspark.implicits._
        //2.DF的创建和使用val df: DataFrame = spark.read.json("datas/info.json").cache()//SQL
        df.createOrReplaceTempView("User")
        spark.sql("select * from User").show()//DSL
        df.select("name").show()
        df.groupBy("id").count().show()
        println("-----------------------")//3.DF ==> DS  DF ==> RDDval ds: Dataset[User]= df.as[User]
        ds.show()val rdd: RDD[Row]= df.rdd
        rdd.collect().foreach(println)
        println("-----------------------")//4.RDD ==> DF  RDD ==> DSval rdd1: RDD[(Int,String)]= spark.sparkContext.makeRDD(Seq((10,"tom"),(20,"jack")))var df1: DataFrame = rdd1.toDF("id","name")
        df1.show()val ds1: Dataset[User]= rdd1.map(t=>User(t._1,t._2)).toDS()
        ds1.show()
        println("-----------------------")//5.DS==>RDD  DS==>DFval rdd2: RDD[User]= ds1.rdd
        rdd2.collect().foreach(println)val df2: DataFrame = ds1.toDF()
        df2.show()//6.关闭
        spark.close()}caseclass User(id:Long,name:String)}

5. UDF

  • 执行spark.sql时 可以使用用户自定义函数,实现自己想要的功能
  • 通过spark.udf.register注册函数即可使用
object Spark_Sql_UDF{
    def main(args:Array[String]):Unit={
        val spark:SparkSession=SparkSession.builder().config(newSparkConf().setAppName("UDF").setMaster("local[*]")).getOrCreate()importspark.implicits._
        //注册用户自定义函数
        spark.udf.register("getWithName",(x)=>{"Name:"+x})//创建DF
        val df:DataFrame= spark.read.format("json").load("datas/info.json")//创建临时表 
        df.createOrReplaceTempView("user")//使用sql自定义函数
        spark.sql("select getWithName(name),id from user").show()
        spark.close()}}
➢ info.json:{"id":10,"name":"jack"}{"id":20,"name":"rose"}{"id":30,"name":"tom"}
➢ console:|getWithName(name)| id|+-----------------+---+|Name:jack|10||Name:rose|20||Name:tom|30|+-----------------+---+

5. UDAF

  • 用户自定义聚合函数
  • 通过 - 继承 UserDefinedAggregateFunction 弱类型的聚合函数 ( Spark3.0之前 )- 继承 Aggregator 强类型的聚合函数 ( Spark3.0 )
    需求: 自定义求平均值函数 avgAge
    
➢ 通过RDD实现
        val rdd: RDD[(String,Int)]= spark.sparkContext.makeRDD(Seq(("jack",10),("rose",20),("tom",30)))val ageOneRdd: RDD[(Int,Int)]= rdd.map {case(_, age)=>(age,1)}val ageCount:(Int,Int)= ageOneRdd.reduce((t1, t2)=>(t1._1 + t2._1, t1._2 + t2._2))
        println(ageCount._1 / ageCount._2)
➢ 通过累加器实现
        val ageAcc =new AgeAccumulator
        spark.sparkContext.register(ageAcc)val rdd: RDD[(String,Int)]= spark.sparkContext.makeRDD(Seq(("jack",10),("rose",20),("tom",30)))
        rdd.foreach {case(_, age)=> ageAcc.add(age)}val ageCount:(Int,Int)= ageAcc.value
        println(ageCount._1 / ageCount._2)class AgeAccumulator extends AccumulatorV2[Int,(Int,Int)]{privatevar ageSum:Int=0privatevar ageCnt:Int=0overridedef isZero:Boolean= ageSum ==0&& ageCnt ==0overridedef copy(): AccumulatorV2[Int,(Int,Int)]=new AgeAccumulator
        overridedef reset():Unit={
            ageSum =0
            ageCnt =0}overridedef add(age:Int):Unit={
            ageSum += age
            ageCnt +=1}overridedef merge(other: AccumulatorV2[Int,(Int,Int)]):Unit={
            ageSum += other.value._1
            ageCnt += other.value._2
        }overridedef value:(Int,Int)=(ageSum, ageCnt)}

➢ 通过 继承 UDAF (Spark3.0之前) 抽象类实现自定义聚合函数
        //创建DFval rdd: RDD[(String,Int)]= spark.sparkContext.makeRDD(Seq(("jack",10),("rose",20),("tom",30)))val df: DataFrame = rdd.toDF("name","age")//创建自定义集合函数对象val ageUDAF =new AgeUDAF
        //注册UDAF
        spark.udf.register("ageAVG", ageUDAF)//创建临时表
        df.createOrReplaceTempView("user")//执行sql
        spark.sql("select ageAVG(age) from user").show()class AgeUDAF extends UserDefinedAggregateFunction {//聚合函数输入的数据类型overridedef inputSchema: StructType ={
            StructType(
                Array(
                    StructField("age", IntegerType)))}//计算过程的缓冲区overridedef bufferSchema: StructType ={
            StructType(
                Array(
                    StructField("ageSum", LongType),
                    StructField("ageCnt", LongType)))}//聚合函数返回值类型overridedef dataType: DataType = DoubleType
        // 稳定性:对于相同的输入是否一直返回相同的输出overridedef deterministic:Boolean=true// 函数缓冲区初始化overridedef initialize(buffer: MutableAggregationBuffer):Unit={
            buffer(0)=0L
            buffer(1)=0L}//累加计算overridedef update(buffer: MutableAggregationBuffer, input: Row):Unit={
            buffer(0)= buffer.getLong(0)+ input.getInt(0)
            buffer(1)= buffer.getLong(1)+1}//合并overridedef merge(buffer1: MutableAggregationBuffer, buffer2: Row):Unit={
            buffer1(0)= buffer1.getLong(0)+ buffer2.getLong(0)
            buffer1(1)= buffer1.getLong(1)+ buffer2.getLong(1)}//计算结果overridedef evaluate(buffer: Row):Double= buffer.getLong(0).toDouble / buffer.getLong(1)}        

➢ 通过继承 Aggregate (Spark3.0) 自定义强类型聚合函数
        //创建强类型UDAF对象val ageAggr =new AgeAggregator
        //注册强类型的UDAF,需要使用functions.udaf进行函数转换
        spark.udf.register("ageAggr", functions.udaf(ageAggr))//创建rddval rdd: RDD[(String,Int)]= spark.sparkContext.makeRDD(Seq(("jack",10),("rose",20),("tom",30)))//转成DFval df: DataFrame = rdd.toDF("name","age")//创建临时表
        df.createOrReplaceTempView("user")//执行sql 直接使用聚合函数
        spark.sql("select ageAggr(age) from user").show()class AgeAggregator extends Aggregator[Int,(Long,Long),Double]{//缓冲区 初始值overridedef zero:(Long,Long)=(0,0)//输入age到缓冲区计算overridedef reduce(buff:(Long,Long), age:Int):(Long,Long)={(buff._1 + age, buff._2 +1)}//合并多个缓冲区overridedef merge(buff1:(Long,Long), buff2:(Long,Long)):(Long,Long)={(buff1._1 + buff2._1, buff1._2 + buff2._2)}//计算结果overridedef finish(buff:(Long,Long)):Double={
            buff._1.toDouble / buff._2
        }//输入编码,自定义对象Encoders.product  其他Encoders.scalaXxxoverridedef bufferEncoder: Encoder[(Long,Long)]= Encoders.tuple(Encoders.scalaLong, Encoders.scalaLong)//输出编码,自定义对象Encoders.product  其他Encoders.scalaXxxoverridedef outputEncoder: Encoder[Double]= Encoders.scalaDouble
    }

6. 数据的加载和保存

通用的加载和保存方式

SparkSQL 提供了通用的保存数据和加载数据的方式. 默认的保存和加载数据的格式都是parquet
    scala> spark.read.load("parquet path") 和 df.write.save("parquet path")

我们可以通过设置不同的参数,来指定不同的数据源格式,读取和保存数据均可
    scala>  spark.read.format("json")[.option("key","value")].load("filepath")
    
➢ format("...")  指定读取数据源的格式, 包括 "csv"、"jdbc"、"json"、"orc"、"parquet" 和 "textFile"。

➢ option("key","value")  如果format是jdbc, 那么使用多个option传递JDBC参数

➢ 也有一些简化的方法,用于特定的文件读取,从而省略format调用
    spark.read.json("json file path") spark.read.cvs("cvs file path")
    scala> spark.read.
                csv      jdbc   load     options   parquet   table   textFile   
                format   json   option   orc       schema    text 
    
保存操作:
    df.write.mode("SaveMode String").save("parquet file path")
    除了和读取操作一样的参数之外, 另有一个模式,表示保存时的状态
    SaveMode.ErrorIfExists(default)==>"error"(默认的) 如果文件已经存在则抛出异常
    SaveMode.Append  ==>"append" 如果文件已经存在则追加(会有多个文件生成)
    SaveMode.Overwrite  ==>"overwrite" 如果文件已经存在则覆盖(把以前的文件删除)
    SaveMode.Ignore  ==>"ignore" 如果文件已经存在则忽略(不生成新文件,保留原来的文件 )
    
    scala> df.write.mode("append").json("/output")

7. MySQL数据读取和写入

➢ spark-shell 命令行连接

1. 添加mysql驱动到spark的jars目录下
    2. scala> val jdbcDF = spark.read.format("jdbc").options(Map("url" ->"jdbc:mysql://localhost:3306/mysql", "driver" ->"com.mysql.jdbc.Driver", "dbtable" ->"plugin", "user" ->"root", "password" ->"1234")).load()

➢ scala代码连接

1.导入mysql的驱动依赖
    <dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.27</version></dependency>2.读取mysql数据
    spark.read.format("jdbc").options(Map("url"->"jdbc:mysql://192.168.189.90:3306/mysql","driver"->"com.mysql.cj.jdbc.Driver","user"->"root","password"->"1234","dbtable"->"help_topic")).load().show()

    spark.read.format("jdbc").option("url","jdbc:mysql://192.168.189.90:3306/mysql").option("driver","com.mysql.cj.jdbc.Driver").option("user","root").option("password","1234").option("dbtable","plugin").load().show

    val props: Properties =new Properties()
    props.setProperty("user","root")
    props.setProperty("password","1234")
    spark.read.jdbc("jdbc:mysql://node0:3306/mysql","plugin", props).show()3.写入数据到mysql
    val df: DataFrame = spark.sparkContext.makeRDD(List(1,2,3)).toDF("id")
    df.write.format("jdbc").options(Map("url"->"jdbc:mysql://192.168.189.90:3306/test","driver"->"com.mysql.cj.jdbc.Driver","user"->"root","password"->"1234","dbtable"->"ids")).mode(SaveMode.Append)//默认的模式是 表存在则报错,这里指定追加 表存在不会保存.save()

8.Spark连接Hive

  • Apache Hive 可以理解为一个Hadoop上的SQL引擎数据库
  • SparkSQL在编译时可以加入Hive的支持也可以不加入(我们下载的Hive二进制包是包含Hive支持的) - 那么就可以 访问Hive表、UDF (用户自定义函数) 以及 Hive 查询语言(HiveQL/HQL)等- 由于Spark包含了Hive 所以连接Hive有两种方式, - 一是连接自带的内嵌Hive(不需要任何配置)- 二是连接外部的Hive 需要简单的配置

1.连接内嵌的Hive

    连接内嵌Hive什么都不需要做, 默认使用derby作为元数据库,使用本地文件系统作为数据仓库
    执行两个命令://执行查看数据库sql, 会自动在 $spark_home下生成metastore_db元数据库信息
        scala> spark.sql("show tables").show
        //执行创建表操作,或者插入数据操作,会自动生成并在$spark_home/spark-warehouse 存储数据
        scala> spark.sql("create table test(id int)")
        spark.sql("insert into test values(1),(2)")

2.连接外部的Hive

  • Spark 使用hive 数据仓库, - 将hive-site.xml文件 拷贝到spark的conf目录下- 添加mysql驱动包到spark的jars目录下- 如果spark访问不到hdfs, 需要将core-site.xml和hdfs-site.xml也拷贝到spark的conf目录下 [经测试是不需要拷贝这两个文件的,也不知道spark是如何找到hadoop的,hive是通过$HADOOP_HOME找到的]- 如果hive-site.xml配置了元数据服务器,需要启动元数据服务器- 重写启动spark-shell即可自动切换到hive数据仓库
scala> spark.sql("show tables").show
        +--------+--------------------+-----------+|database|           tableName|isTemporary|+--------+--------------------+-----------+| default|                  aa|false|| default|         live_events|false|| default|        login_events|false|| default|order_amount_by_p...|      false|| default|        order_detail|false|| default|    page_view_events|false|| default|      payment_detail|false|| default|        product_info|false|| default|      promotion_info|false|| default|       province_info|false|+--------+--------------------+-----------+

2.使用Spark SQL CLI

    确保Spark可以连接外部的hive之后,.就可以是用spark-sql直接连接hive进行操作
    [zhyp@node0 spark-local]$ bin/spark-sql
    spark-sql>showtables;default aa      falsedefault live_events     falsedefault login_events    falsedefault order_amount_by_province        falsedefault order_detail    falsedefault page_view_events        falsedefault payment_detail  falsedefault product_info    falsedefault promotion_info  falsedefault province_info   falseTime taken: 2.433 seconds, Fetched 10row(s)

2.使用Spark Beeline

  • Spark Thrift Server 是 基于HiveServer2的另外一个实现 , 完全兼容HiveServer2 并且都是一样的协议和端口 我们使用Spark Thrift Server 取代HiveServer2 和 Hive的metastore服务交互获取元数据
  • Spark Beeline 连接 Spark thrift server的步骤 - 将hive-site.xml文件 拷贝到spark的conf目录下- 如果hive-site.xml配置了元数据服务器,需要启动元数据服务器 - [如果不是用元数据服务器,需要在jars中加入mysql驱动和配置文件中添加连接数据库四大要素]- 启动Spark thrift server 使用beeline连接即可
[zhyp@node0 spark-local]$ sbin/start-thriftserver.sh
    [zhyp@node0 spark-local]$ bin/beeline -u jdbc:hive2://linux1:10000 -n zhyp
    或者
    [zhyp@node0 spark-local]$ bin/beeline
    beeline>!connect jdbc:hive2://node0:10000

3.代码连接Hive

def main(args: Array[String]):Unit={//创建 SparkSession
        System.setProperty("HADOOP_USER_NAME","zhyp")val spark: SparkSession = SparkSession
            .builder().enableHiveSupport()//添加Hive支持,默认是不支持连接Hive,开启后会读取classpath下的hive-site.xml.config("spark.sql.warehouse.dir","hdfs://node0:8020/user/hive/warehouse")//通过spark创建数据库需要写这个地址,因为新建的数据库默认是在本地路径中找/user/hive/warehouse.master("local[*]").appName("sql").getOrCreate()importspark.implicits._

        spark.sql("create table abc(id int)").show()
        spark.sql("insert into abc values(1),(111)").show()
        spark.sql("show tables").show()
        spark.sql("select * from abc").show()

        spark.close()}

代码练习

packagecom.zhyp.spark.sql.startimportorg.apache.spark.SparkConf
importorg.apache.spark.sql.{Dataset, SparkSession}object Spark_SQL_Test01{def main(args: Array[String]):Unit={
        System.setProperty("HADOOP_USER_NAME","zhyp")//1.创建SparkSessionval spark: SparkSession = SparkSession.builder().enableHiveSupport().config("spark.sql.warehouse.dir","hdfs://node0:8020/user/hive/warehouse").config(new SparkConf().setMaster("local[*]").setAppName("start01")).getOrCreate()importspark.implicits._
        spark.sql("create database test")
        spark.sql("use test")
        spark.sql("""
                |CREATE TABLE `user_visit_action`(
                | `date` string,
                | `user_id` bigint,
                | `session_id` string,
                | `page_id` bigint,
                | `action_time` string,
                | `search_keyword` string,
                | `click_category_id` bigint,
                | `click_product_id` bigint,
                | `order_category_ids` string,
                | `order_product_ids` string,
                | `pay_category_ids` string,
                | `pay_product_ids` string,
                | `city_id` bigint)
                |row format delimited fields terminated by '\t';
            """.stripMargin)
        spark.sql("""
                |load data local inpath 'datas/user_visit_action.txt' into table
                |user_visit_action
            """.stripMargin)
        spark.sql("""
                |CREATE TABLE `product_info`(
                | `product_id` bigint,
                | `product_name` string,
                | `extend_info` string)
                |row format delimited fields terminated by '\t'
            """.stripMargin)
        spark.sql("""
                |load data local inpath 'datas/product_info.txt' into table product_info
            """.stripMargin)
        spark.sql("""
                |CREATE TABLE `city_info`(
                | `city_id` bigint,
                | `city_name` string,
                | `area` string)
                |row format delimited fields terminated by '\t'
            """.stripMargin)
        spark.sql("""
                |load data local inpath 'datas/city_info.txt' into table city_info
            """.stripMargin)

        spark.close()}}packagecom.zhyp.spark.sql.startimportorg.apache.spark.SparkConf
importorg.apache.spark.sql.expressions.Aggregator
importorg.apache.spark.sql.{Encoder, Encoders, SparkSession, functions}importscala.collection.mutableimportscala.collection.mutable.ListBuffer
object Spark_SQL_Test03 {def main(args: Array[String]):Unit={
        System.setProperty("HADOOP_USER_NAME","zhyp")//1.创建SparkSessionval spark: SparkSession = SparkSession.builder().enableHiveSupport().config("spark.sql.warehouse.dir","hdfs://node0:8020/user/hive/warehouse").config(new SparkConf().setMaster("local[*]").setAppName("start01")).getOrCreate()//2.执行
        spark.udf.register("cityRemark", functions.udaf(new CityRemarkUDAF))
        spark.sql("use test")//1.关联三张表 过滤非点击数据
        spark.sql("""
                |select
                |c.area,c.city_name,p.product_name
                |from
                |user_visit_action a
                |join
                |city_info c
                |on a.city_id = c.city_id
                |join
                |product_info p
                |on a.click_product_id = p.product_id
                |where a.click_product_id > -1
            """.stripMargin).createOrReplaceTempView("t1")//2.分组 按照地区和商品分组
        spark.sql("""
                |select
                |area,product_name,
                |cityRemark(city_name) city_remark,
                |count(*) clickCnt
                |from t1
                |group by area,product_name
            """.stripMargin).createOrReplaceTempView("t2")//3.对 同一个地区的各种商品点击量排名
        spark.sql("""
                |select
                |*,
                |rank() over(partition by area order by clickCnt desc) rank
                |from t2
            """.stripMargin).createOrReplaceTempView("t3")//4.取各区域的前三名
        spark.sql("""
                |select
                |*
                |from t3
                |where rank <= 3
            """.stripMargin).show()}caseclass CityAndCntBuff(map:mutable.Map[String,Long])class CityRemarkUDAF extends Aggregator[String, CityAndCntBuff,String]{overridedef zero: CityAndCntBuff = CityAndCntBuff(mutable.Map())overridedef reduce(buff: CityAndCntBuff, city:String): CityAndCntBuff ={val map: mutable.Map[String,Long]= buff.map
            map.update(city, map.getOrElse(city,0L)+1L)
            buff
        }overridedef merge(buff1: CityAndCntBuff, buff2: CityAndCntBuff): CityAndCntBuff ={var map1 = buff1.map;var map2 = buff2.map
            map2.foreach({case(city, cnt)=>{
                    map1.update(city, map1.getOrElse(city,0L)+ cnt)}})
            buff1
        }overridedef finish(resultBuff: CityAndCntBuff):String={val resultMap: mutable.Map[String,Long]= resultBuff.map
            val totalCnt:Long= resultMap.values.reduce(_ + _)val top2: List[(String,Long)]= resultMap.toList.sortBy(_._2)(Ordering.Long.reverse).take(2)val cityBuffer =new ListBuffer[String]var percentSum =0L
            top2.foreach({case(city, cnt)=>{val percent:Long= cnt *100/ totalCnt
                    cityBuffer.append(s"${city}${percent}%")
                    percentSum += percent
                }})if(resultMap.size >2){
                cityBuffer.append(s"其他 ${1- percentSum}%")}
            cityBuffer.mkString(", ")}overridedef bufferEncoder: Encoder[CityAndCntBuff]= Encoders.product
        overridedef outputEncoder: Encoder[String]= Encoders.STRING
    }}
标签: spark sql 大数据

本文转载自: https://blog.csdn.net/ytzhyp/article/details/129675538
版权归原作者 ytzhyp 所有, 如有侵权,请联系我们删除。

“Spark SQL”的评论:

还没有评论