0


SparkSQL知识点总结

一、SparkSql的概述

1.1 SparkSql是什么

1. SparkSql 是Spark生态体系中的一个基于SparkCore的SQL处理模块
2. 用途是处理具有结构化的数据文件的
3. 前身叫Shark,由于Shark是基于Hive,而Hive的发展限制了Shark的功能更新,因此该项目的负责人停止项目的发展,将相应的SQl处理功能独立出来,更名为SparkSQL
4. SparkSQL也是基于内存和RDD的
5. 本质是SparkSQL会在底层转成sparkcore程序

在这里插入图片描述

1.2 SparkSQL的特点

1.  融合特点:
    将 SQL 查询与 Spark 程序无缝混合。   可以使用熟悉的sql在 Spark 程序中查询结构化数据
2. 统一的数据访问接口
     可以使用统一的接口来访问各种数据源,比如avro,json,parquet,orc,hive以及jdbc等
3. hive集成
   就是可以使用sparksql直接访问hive的数据
4. 提供了标准的jdbc和odbc连接接口
   可以像其他语言一样,使用jdbc或者odbc连接RDBMS等
5. sparksql内置了优化器策略。列式存储,等加快查询速度。   

1.3 SparkSQL的数据模型

1.3.0 RDD的回顾

1. RDD是弹性分布式数据集,是一个不存储数据的,不可变,可分区的,并行计算的数据集合
2. 有五大特征:
   --分区列表:  每一个RDD在处理数据时,对于流经过来的数据都是进行分区的。数据都是有自己的分区号的。
               换句话说,每一个RDD都是有固定的分区号的,比如有三个分区,就是0,1,2三个号码
   --依赖关系:  当前的RDD存储这依赖关系
                RDD4---算子3-->RDD3--算子2-->RDD2--算子1-->RDD1
   --计算表达式: 接受的就是传入到算子中的匿名函数:比如 _*2     
                map(_*2)
   --可选的分区器:PairRDD才会用到分区器
   --可选的首选位置:用于计算的节点选择。
   
   
简单的说:  整个sparkcore的运算过程如下:
          1.  先通过程序构建有向无环图,每个节点都是一个RDD. 此时RDD没有任何数据,只有属性初始化。
          2.  Task将程序发送到各个节点后,通过行动算子触发真正的程序运算,也就是数据流动。
          3.  所有的RDD其实就是数据在各个节点(RDD)间不断的流动。最终计算出结果,这期间RDD不存储数据,只是临时经过而已。

1.3.1 DataFrame

1. RDD是一个弹性分布式数据集,DataFrame在RDD的基础上加了Schema的概念,这里的Schema就是表头。
   扩展:Schema表示描述数据的数据,即可以认为是元数据,
   DataFrame曾经就有个名字叫SchemaRDD
2. DataFrame在SparkCore基础上了提供了自己的API接口(其实就是各种算子),还提供SQL风格的写法
3. DataFrame可以理解为是一张二维表,      参考下图
4. DataFrame和RDD一样,不能存储数据,

简单来说:DataFrame就是对RDD的再次封装. 只不过多了一个Schema的概念
     
     
     res0:RDD[String] = sc.textFile("D:/data/a.txt")  //获取一个RDD
     res1:DataFrame = res0.toDF() // 将RDD转成DataFrame数据结构
     res1的具体的表结构:[value: string]  //表示表头只有一个字段叫value,类型是String的数据结构
     res2: RDD[Row] = res1.rdd // 将DataFrame转成RDD    
     
     通过上述代码可知:DataFrame就是一个特殊的额RDD.  特殊在泛型为Row的RDD。
     
     其实:就是将每一行的对应的一个String解析成了多个字段,这一行的所有字段加起来构成一个Row对象。

假设RDD中的两行数据长这样.
在这里插入图片描述

那么DataFrame中的数据长这样
在这里插入图片描述

如果处理一个文件,文件的内容如下:

1001,zhangsan,23,f
1002,lisi,24,f
1003,wangwu,23,m

那么RDD的样子:  RDD[String],  每一个String表示一行记录。

DataFrame的样子如下:
            stuid:Int  name:String   age:Int   gender:Chcar
            1001       zhangsan      23         f
            1002       lisi          24         f
            1003       wangwu        23         m
            
            
            value:String
            1001,zhangsan,23,f

1.3.2 DataSet

1. DataSet 也是一种弹性分布式数据集,在RDD的基础上提供了强制类型检测和转换的功能
  -- RDD封装的是多行字符串,而强制类型表示相当于RDD里封装的每一行记录都是一个类型的对象
2. DataSet 提供了自己的API接口,多种算子都和DataFrame一样,只不过在这基础上扩展了一些功能。
3. DataSet 在编译期间会检查类型是否匹配,如果不匹配,直接报错。而DataFrame在编译期间不检查,所以针对于程序员来说,体验度非常差。

假设RDD中的两行数据长这样
在这里插入图片描述

那么Dataset的数据模型可以理解为如下图所示:(每行数据是个Object)
在这里插入图片描述

var rdd1: RDD[String] = sc.textFile("D:....")
var  df:DataFrame[Row] = rdd1.toDF()     // 将RDD中的每一行记录都封装成Row对象

var rdd2: RDD[Student] = sc.makeRDD(List(Student(1001,"zhangsan"),Student(1002,"lisi")))
var  df:DataFrame[Row] = rdd2.toDF()     // 将RDD中的每一个Student又封装成Row对象,
因此 DataFrame的数据结构非常具有局限性。RDD的任何类型都会被描述成Row对象,非常单一。

因此SparkSql引入了DataSet数据模型, DataSet数据模型中的数据可以是任意的自定义类型。有点类似于JDBC里的结果集

String sql = "select ename,empno,sal,job from emp"
PreparedStatement ps = conn.preparestatement(sql)
ResultSet[Emp] rs = ps.executeQuery()
while(rs.next){
    emp.getEmpno()
    emp.getJob()
    emp.getSal()
    emp.getEname()
}

二、SparkSQL的核心编程

2.1 SparkSession的介绍

1. SparkSession是SparkSql的编程入口,就像SparkContext是SparkCore的编程入口一样。
2. 在Spark2.0版本以前,没有SparkSession的概念,而是有两个入口,一个是SqlContext,另一个是HiveContext,HiveContext是SqlContext的子类型,提供了与Hive有关的功能实现,比如row_number开窗函数等。
3. 在Spark2.0版本以后,将两个入口合并成一个入口,就是SparkSession.
4. SparkSession的获取方式有很多种,比如:
    --(1) 使用构建器和SparkConf配合一起获取SparkSession对象
    --(2) 使用构造器直接获取SparkSession对象
    --(3) 通过连接hive获取SparkSession对象
    --(4) 通过SparkContext获取SparkSession对象
    --还有很多方法,可以自行百度学习。

window下的spark-shell

1.  将spark安装包解压到没有中文和空格的路径下。
            D:\Users\uyihsgnaw\Documents\spark-2.2.3-bin-hadoop2.7
2.  配置环境变量,添加到Path
      Path的值:追加路径bin。
         D:\Users\uyihsgnaw\Documents\spark-2.2.3-bin-hadoop2.7\bin
3. 可以直接运行bin下的spark-shell指令         

简单演示一下小案例:


scala> val df = spark.read.json("D:/data/emp.json")
df: org.apache.spark.sql.DataFrame =[age: bigint, id: string ...1 more field]

scala> df.show-- show是行动算子+---+----+---------+|age|  id|     name|+---+----+---------+|23|1001|zhangsqan||24|1002|     lisi|+---+----+---------+

scala> df.select("age").show+---+|age|+---+|23||24|+---+

scala> df.select("name","age").show+---------+---+|     name|age|+---------+---+|zhangsqan|23||     lisi|24|+---------+---+

scala> df.select($"age"+1).show+---------+|(age +1)|+---------+|24||25|+---------+

scala> df.select('age+1).show+---------+|(age +1)|+---------+|24||25|+---------+

scala> df.count
res15: Long =2

2.2 DataFrame的应用

2.2.1 DataFrame的创建方式

1. 读外部设备的文件,返回DataFrame对象
2. 从RDD转换成DataFrame对象
3. 读取Hive中的表,返回DataFrame对象
4. 调用createDataFrame方法,返回DataFrame对象

2.2.2 两种编程风格的介绍

1) SQL风格编程

就是编写sql语句,底层翻译成相关算子进行执行

步骤如下:
步骤1):  获取DataFrame对象,然后使用相关方法描述成一张临时视图名称
    DataFrame的四个方法如下:
    createGlobalTempView:   创建全局临时视图, 意思就是整个SparkSql中都可以使用   
                                  如果已经存在,则提示错误
    createOrReplaceGlobalTempView:   创建或替换全局临时视图
                                           如果已经存在,就替换
    createTempView:   创建当前会话的临时视图,
                             如果已经存在,则提示错误
   createOrReplaceTempView: 创建或替换当前会话的临时视图
                             如果已经存在,就替换
步骤2) 使用SparkSession提供的Sql方法,来编写sql语句

案例演示:

scala> val df = spark.read.json("D:/data/emp.json")
scala> df.show+----+------+-----+------+----------+---------+----+----+|comm|deptno|empno| ename|  hiredate|      job| mgr| sal|+----+------+-----+------+----------+---------+----+----+|null|20|7369| SMITH|1980-12-17|    CLERK|7902|800||300|30|7499| ALLEN|1981-02-20| SALESMAN|7698|1600||500|30|7521|  WARD|1981-02-22| SALESMAN|7698|1250||null|20|7566| JONES|1981-04-02|  MANAGER|7839|2975||1400|30|7654|MARTIN|1981-09-28| SALESMAN|7698|1250||null|30|7698| BLAKE|1981-05-01|  MANAGER|7839|2850|..........

scala> df.createTempView("emp")-- 创建当前会话的临时视图

scala> spark.sql("select * from emp")-- 没有行动算子

scala> spark.sql("select * from emp").show-- 调用show行动算子+----+------+-----+------+----------+---------+----+----+|comm|deptno|empno| ename|  hiredate|      job| mgr| sal|+----+------+-----+------+----------+---------+----+----+|null|20|7369| SMITH|1980-12-17|    CLERK|7902|800||300|30|7499| ALLEN|1981-02-20| SALESMAN|7698|1600||500|30|7521|  WARD|1981-02-22| SALESMAN|7698|1250||null|20|7566| JONES|1981-04-02|  MANAGER|7839|2975||1400|30|7654|MARTIN|1981-09-28| SALESMAN|7698|1250||null|30|7698| BLAKE|1981-05-01|  MANAGER|7839|2850||null|10|7782| CLARK|1981-06-09|  MANAGER|7839|2450||null|20|7788| SCOTT|1987-04-19|  ANALYST|7566|3000||null|10|7839|  KING|1981-11-17|PRESIDENT|null|5000||0|30|7844|TURNER|1981-09-08| SALESMAN|7698|1500||null|20|7876| ADAMS|1987-05-23|    CLERK|7788|1100||null|30|7900| JAMES|1981-12-03|    CLERK|7698|950||null|20|7902|  FORD|1981-12-02|  ANALYST|7566|3000||null|10|7934|MILLER|1982-01-23|    CLERK|7369|1300|+----+------+-----+------+----------+---------+----+----+

scala> spark.sql("select empno,ename,sal,deptno from emp").show+-----+------+----+------+|empno| ename| sal|deptno|+-----+------+----+------+|7369| SMITH|800|20||7499| ALLEN|1600|30||7521|  WARD|1250|30||7566| JONES|2975|20||7654|MARTIN|1250|30||7698| BLAKE|2850|30||7782| CLARK|2450|10||7788| SCOTT|3000|20||7839|  KING|5000|10||7844|TURNER|1500|30||7876| ADAMS|1100|20||7900| JAMES|950|30||7902|  FORD|3000|20||7934|MILLER|1300|10|+-----+------+----+------+

scala> spark.sql("select empno,ename,sal,deptno from emp where deptno = 20").show+-----+-----+----+------+|empno|ename| sal|deptno|+-----+-----+----+------+|7369|SMITH|800|20||7566|JONES|2975|20||7788|SCOTT|3000|20||7876|ADAMS|1100|20||7902| FORD|3000|20|+-----+-----+----+------+

scala> spark.sql("select deptno,sum(sal),max(sal),min(sal),avg(nvl(sal,0)) from emp group by deptno").show+------+--------+--------+--------+----------------------+|deptno|sum(sal)|max(sal)|min(sal)|avg(nvl(emp.`sal`,0))|+------+--------+--------+--------+----------------------+|10|8750|5000|1300|2916.6666666666665||30|9400|2850|950|1566.6666666666667||20|10875|3000|800|2175.0|+------+--------+--------+--------+----------------------+

scala> spark.sql("select * from emp where sal>(select sal from emp where ename='ALLEN')").show+----+------+-----+-----+----------+---------+----+----+|comm|deptno|empno|ename|  hiredate|      job| mgr| sal|+----+------+-----+-----+----------+---------+----+----+|null|20|7566|JONES|1981-04-02|  MANAGER|7839|2975||null|30|7698|BLAKE|1981-05-01|  MANAGER|7839|2850||null|10|7782|CLARK|1981-06-09|  MANAGER|7839|2450||null|20|7788|SCOTT|1987-04-19|  ANALYST|7566|3000||null|10|7839| KING|1981-11-17|PRESIDENT|null|5000||null|20|7902| FORD|1981-12-02|  ANALYST|7566|3000|+----+------+-----+-----+----------+---------+----+----+

scala> df.createGlobalTempView("t1")-- 创建全局的临时视图-- 注意: 使用全局的临时视图时,访问要使用global_temp.来访问
scala> spark.sql("select * from global_temp.t1 where empno=7369").show+----+------+-----+-----+----------+-----+----+---+|comm|deptno|empno|ename|  hiredate|  job| mgr|sal|+----+------+-----+-----+----------+-----+----+---+|null|20|7369|SMITH|1980-12-17|CLERK|7902|800|+----+------+-----+-----+----------+-----+----+---+

scala> spark.newSession.sql("select * from global_temp.t1").show-- 开启新会话进行访问+----+------+-----+------+----------+---------+----+----+|comm|deptno|empno| ename|  hiredate|      job| mgr| sal|+----+------+-----+------+----------+---------+----+----+|null|20|7369| SMITH|1980-12-17|    CLERK|7902|800||300|30|7499| ALLEN|1981-02-20| SALESMAN|7698|1600||500|30|7521|  WARD|1981-02-22| SALESMAN|7698|1250||null|20|7566| JONES|1981-04-02|  MANAGER|7839|2975||1400|30|7654|MARTIN|1981-09-28| SALESMAN|7698|1250||null|30|7698| BLAKE|1981-05-01|  MANAGER|7839|2850||null|10|7782| CLARK|1981-06-09|  MANAGER|7839|2450||null|20|7788| SCOTT|1987-04-19|  ANALYST|7566|3000||null|10|7839|  KING|1981-11-17|PRESIDENT|null|5000||0|30|7844|TURNER|1981-09-08| SALESMAN|7698|1500||null|20|7876| ADAMS|1987-05-23|    CLERK|7788|1100||null|30|7900| JAMES|1981-12-03|    CLERK|7698|950||null|20|7902|  FORD|1981-12-02|  ANALYST|7566|3000||null|10|7934|MILLER|1982-01-23|    CLERK|7369|1300|+----+------+-----+------+----------+---------+----+----+

2) DSL风格编程

Domain-Special Language的简写,指的是特殊领域的语言风格,换言之就是使用各种算子来分析数据。

案例演示:

scala> val session= spark

scala> df.show+----+------+-----+------+----------+---------+----+----+|comm|deptno|empno| ename|  hiredate|      job| mgr| sal|+----+------+-----+------+----------+---------+----+----+|null|20|7369| SMITH|1980-12-17|    CLERK|7902|800||300|30|7499| ALLEN|1981-02-20| SALESMAN|7698|1600|.........

scala> df.printSchema     --打印df对应的schema
root
 |-- comm: long (nullable = true)|-- deptno: long (nullable = true)|-- empno: long (nullable = true)|-- ename: string (nullable = true)|-- hiredate: string (nullable = true)|-- job: string (nullable = true)|-- mgr: long (nullable = true)|-- sal: long (nullable = true)
 
 
 scala> val session= spark

scala> df.show+----+------+-----+------+----------+---------+----+----+|comm|deptno|empno| ename|  hiredate|      job| mgr| sal|+----+------+-----+------+----------+---------+----+----+|null|20|7369| SMITH|1980-12-17|    CLERK|7902|800||300|30|7499| ALLEN|1981-02-20| SALESMAN|7698|1600||500|30|7521|  WARD|1981-02-22| SALESMAN|7698|1250||null|20|7566| JONES|1981-04-02|  MANAGER|7839|2975||1400|30|7654|MARTIN|1981-09-28| SALESMAN|7698|1250||null|30|7698| BLAKE|1981-05-01|  MANAGER|7839|2850||null|10|7782| CLARK|1981-06-09|  MANAGER|7839|2450||null|20|7788| SCOTT|1987-04-19|  ANALYST|7566|3000||null|10|7839|  KING|1981-11-17|PRESIDENT|null|5000||0|30|7844|TURNER|1981-09-08| SALESMAN|7698|1500||null|20|7876| ADAMS|1987-05-23|    CLERK|7788|1100||null|30|7900| JAMES|1981-12-03|    CLERK|7698|950||null|20|7902|  FORD|1981-12-02|  ANALYST|7566|3000||null|10|7934|MILLER|1982-01-23|    CLERK|7369|1300|+----+------+-----+------+----------+---------+----+----+

scala> df.select("deptno","ename","sal","comm").show--select是一个可变参数的转换算子+------+------+----+----+|deptno| ename| sal|comm|+------+------+----+----+|20| SMITH|800|null||30| ALLEN|1600|300||30|  WARD|1250|500||20| JONES|2975|null||30|MARTIN|1250|1400||30| BLAKE|2850|null||10| CLARK|2450|null||20| SCOTT|3000|null||10|  KING|5000|null||30|TURNER|1500|0||20| ADAMS|1100|null||30| JAMES|950|null||20|  FORD|3000|null||10|MILLER|1300|null|+------+------+----+----+

scala> df.select("deptno","ename").where("deptno=10").show+------+------+|deptno| ename|+------+------+|10| CLARK||10|  KING||10|MILLER|+------+------+

scala> df.select("deptno","ename").where("deptno=10").where("ename='KING'").show+------+-----+|deptno|ename|+------+-----+|10| KING|+------+-----+

scala> df.select("deptno","ename").where("deptno=10 and ename = 'KING'").show+------+-----+|deptno|ename|+------+-----+|10| KING|+------+-----+

scala> df.select("deptno","ename").where("deptno=10").where("ename='KING'").show+------+-----+|deptno|ename|+------+-----+|10| KING|+------+-----+-- 先执行select算子,然后对返回值执行分组查询,然后求聚合函数的操作,-- 因为分组查询的返回值为RelationalGroupedDataset  而它只有右边这些方法agg   avg  count  max   mean   min   pivot   sum
scala> df.select("deptno","ename","sal").groupBy("deptno").sum("sal").show+------+--------+|deptno|sum(sal)|+------+--------+|10|8750||30|9400||20|10875|+------+--------+

scala> df.select("deptno","ename","sal").where("ename=7369").show+------+-----+|deptno|ename|+------+-----+|10| KING|+------+-----+-- 注意:会报错。原因 sum返回的类型是DataFrame类型,而再次调用DataFrame类型时,是没有sum方法的,所以报错
scala> df.select("deptno","ename","sal").groupBy("deptno").sum("sal").sum("deptno").show()

2.2.3 DataFrame与RDD的转换

1)DataFrame=>RDD

scala>val df = session.read.json("D:/data/emp.json")

scala>val rdd1 = df.rdd   -- 返回的RDD[Row]类型

-- 获取Row的数据如下
scala> rdd1.map(row=>{println(row.get(1))}).collect()20303020

scala> rdd1.map(row=>{println(row.get(0))}).collect()null300500null1400nullnullnullnull0nullnullnullnull
res95: Array[Unit]= Array((),(),(),(),(),(),(),(),(),(),(),(),(),())

scala> rdd1.map(row=>{println(row.get(0)+","+row.get(1)+","+row.get(2))}).collect()null,20,7369300,30,7499500,30,7521null,20,75661400,30,7654null,30,7698null,10,7782null,20,7788null,10,78390,30,7844null,20,7876null,30,7900null,20,7902null,10,7934

2)RDD=>DataFrame

scala>var rdd1 = sc.makeRDD(List(1,2,3,4,5,6))
scala>var df = rdd1.toDF
df: org.apache.spark.sql.DataFrame =[value: int]

scala> df.select("value").show
+-----+|value|+-----+|1||2||3||4||5||6|+-----+

scala>var df = rdd1.toDF("id")-- 指定列名
df: org.apache.spark.sql.DataFrame =[id: int]

scala> df.select("id").show
+---+| id|+---+|1||2||3||4||5||6|+---+

scala>var rdd1 = sc.makeRDD(List(("1001","zhangsan","f"),("1002","lisi","f")))
rdd1: org.apache.spark.rdd.RDD[(String,String,String)]= ParallelCollectionRDD[189] at makeRDD at <console>:24

scala>var df = rdd1.toDF("id","name","gender")
df: org.apache.spark.sql.DataFrame =[id: string, name: string ...1 more field]

scala> df.select("*").show
+----+--------+------+|  id|    name|gender|+----+--------+------+|1001|zhangsan|     f||1002|    lisi|     f|+----+--------+------+

2.3 DataSet的应用

2.3.1 DataSet的创建方式

1.  要维护一个样例类
    caseclass Student(id:Int, name:String, gender:Int, age:Int)2.  在内存中维护几个样例类对象,存储到集合中
    var students = List(
        Student(1001,"zhangsan",0,23),
        Student(1002,"lisi",1,24),
        Student(1003,"zhangsi",0,25),
        Student(1004,"zhaosan",1,23))3. 调用createDataset方法获取Dataset对象
    var ds = session.createDataset(students)var ds = session.createDataset[Student](students)

案例测试:

scala>caseclass Student(id:Int, name:String, gender:Int, age:Int)
scala>var students = List(
        Student(1001,"zhangsan",0,23),
        Student(1002,"lisi",1,24),
        Student(1003,"zhangsi",0,25),
        Student(1004,"zhaosan",1,23))

scala>var ds = session.createDataset(students)

scala> ds.groupBy("age").sum().show
+---+-------+-----------+--------+|age|sum(id)|sum(gender)|sum(age)|+---+-------+-----------+--------+|23|2005|1|46||25|1003|0|25||24|1002|1|24|+---+-------+-----------+--------+

scala> ds.groupBy("age").max().show
+---+-------+-----------+--------+|age|max(id)|max(gender)|max(age)|+---+-------+-----------+--------+|23|1004|1|23||25|1003|0|25||24|1002|1|24|+---+-------+-----------+--------+

scala> ds.where("age=23").where("id=1001").show
+----+--------+------+---+|  id|    name|gender|age|+----+--------+------+---+|1001|zhangsan|0|23|+----+--------+------+---+

scala>caseclass Cat(id:Int,name:String,color:String)
defined class Cat

scala>val ds = session.createDataset(List(Cat(1,"m1","black"),Cat(2,"m2","black"),Cat(3,"m3","red")))
ds: org.apache.spark.sql.Dataset[Cat]=[id: int, name: string ...1 more field]

scala> ds.groupBy("color").count
res107: org.apache.spark.sql.DataFrame =[color: string, count: bigint]

scala> ds.groupBy("color").count.show
+-----+-----+|color|count|+-----+-----+|  red|1||black|2|+-----+-----+

2.3.2 DataSet与RDD之间的转换

  1. DataSet=>RDD
scala>caseclass Cat(id:Int,name:String,color:String)
defined class Cat

scala>val ds = session.createDataset(List(Cat(1,"m1","black"),Cat(2,"m2","black"),Cat(3,"m3","red")))

scala>var rdd1 = ds.rdd
rdd1: org.apache.spark.rdd.RDD[Cat]= MapPartitionsRDD[229] at rdd at <console>:29

scala> rdd1.foreach{case a:Cat => println(a.id+"\t"+a.name+"\t"+a.color)}3       m3      red
2       m2      black
1       m1      black

2)RDD=>DataSet

scala>var rdd1 = sc.makeRDD(List(1,2,3,4,5,6))

scala>var ds = rdd1.toDS         --toDS()  是无参方法,toDF是可变参数方法
ds: org.apache.spark.sql.Dataset[Int]=[value: int]

scala> ds.select("value").show
+-----+|value|+-----+|1||2||3||4||5||6|+-----+

scala> ds.select("*").show
+-----+|value|+-----+|1||2||3||4||5||6|+-----+

scala>var rdd1 = sc.makeRDD(List(("1001","zhangsan","f"),("1002","lisi","f")))
rdd1: org.apache.spark.rdd.RDD[(String,String,String)]= ParallelCollectionRDD[237] at makeRDD at <console>:24

scala>var ds = rdd1.toDS
ds: org.apache.spark.sql.Dataset[(String,String,String)]=[_1: string, _2: string ...1 more field]

scala> ds.select("*").show
+----+--------+---+|  _1|      _2| _3|+----+--------+---+|1001|zhangsan|  f||1002|    lisi|  f|+----+--------+---+

scala> ds.select("_1","_2").show
+----+--------+|  _1|      _2|+----+--------+|1001|zhangsan||1002|    lisi|+----+--------+

scala>var rdd1 = sc.makeRDD(List(("1001","zhangsan","f"),("1002","lisi","f")))-- 将集合里的每一个元组对象,封装成Cat对象
scala>importorg.apache.spark.rdd.RDD
importorg.apache.spark.rdd.RDD

scala>var rdd2:RDD[Cat]= rdd1.map(ele=>{Cat(ele._1.toInt,ele._2,ele._3)})

scala>var ds = rdd2.toDS
ds: org.apache.spark.sql.Dataset[Cat]=[id: int, name: string ...1 more field]

scala> ds.select("id","name").show
+----+--------+|  id|    name|+----+--------+|1001|zhangsan||1002|    lisi|+----+--------+

2.3.2 DataSet与DataFrame之间的转换

  1. DataSet=>DataFrame
scala> ds
res121: org.apache.spark.sql.Dataset[Cat]=[id: int, name: string ...1 more field]

scala> ds.to
toDF   toJSON   toJavaRDD   toLocalIterator   toString

scala>var df = ds.toDF
df: org.apache.spark.sql.DataFrame =[id: int, name: string ...1 more field]

scala> df.select("id","name").show
+----+--------+|  id|    name|+----+--------+|1001|zhangsan||1002|    lisi|+----+--------+

2)DataFrame=>Dataset

scala>var df = ds.toDF    --注意:df 有id,name,color三个字段
df: org.apache.spark.sql.DataFrame =[id: int, name: string ...1 more field]--定义一个样例类,属性一致
scala>caseclass Dog(id:Int,name:String,color:String)
defined class Dog

scala>var ds = df.as[Dog]-- 转使用as[类型名] 
ds: org.apache.spark.sql.Dataset[Dog]=[id: int, name: string ...1 more field]

三者之间的转换如图所示:
在这里插入图片描述

2.4 IDEA开发SparkSQL

<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>spark_sz2102</artifactId><version>1.0-SNAPSHOT</version><!-- 声明公有的属性 --><properties><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><encoding>UTF-8</encoding><scala.version>2.11.8</scala.version><spark.version>2.2.3</spark.version><hadoop.version>2.7.6</hadoop.version><scala.compat.version>2.11</scala.compat.version></properties><!-- 声明并引入公有的依赖 --><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-sql_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-hive_2.11</artifactId><version>${spark.version}</version></dependency></dependencies></project>

2.4.1 SparkSession的获取方式演示案例

packagecom.qf.sparksql.day02importorg.apache.hadoop.hive.conf.HiveConf
importorg.apache.spark.sql.hive.HiveContext
importorg.apache.spark.{SparkConf, SparkContext}importorg.apache.spark.sql.{DataFrame, SQLContext}/**
 * 研究一下SparkSession(sparksql的编程入口)的获取方式
 * 1:  先研究低版本的SQLContext和HiveContext的获取
 */object Spark_01_GetSparkSession {def main(args: Array[String]):Unit={var conf  =new SparkConf().setMaster("local[*]").setAppName("GetSparkSession")var sparkContext =new SparkContext(conf)/**
         * 旧版本的SQLContext的获取
         */val sqlContext =new SQLContext(sparkContext)val df: DataFrame = sqlContext.read.json("data/emp.json")
        df.show()/**
         * 旧版本的SQLContext的获取,  注意,需要配置hive的连接参数才可以。
         * 如果连接成功,只需要调用hiveContext的table方法访问表即可。
         */val hiveContext =new HiveContext(sparkContext)val df1: DataFrame = hiveContext.table("sz2103.emp")
        df1.show()}}

新版本的SparkSession的获取方式

packagecom.qf.sparksql.day02importorg.apache.spark.sql.hive.HiveContext
importorg.apache.spark.sql.{DataFrame, SQLContext, SparkSession}importorg.apache.spark.{SparkConf, SparkContext}/**
 * 研究一下SparkSession(sparksql的编程入口)的获取方式
 * 2:  获取SparkSession的方式
 */object Spark_02_GetSparkSession_new {def main(args: Array[String]):Unit={var conf  =new SparkConf().setMaster("local[*]").setAppName("GetSparkSession")/**
         * SparkSession 是一个半生对象,可以直接使用,如果想要获取该对象,需要获取构建器对象,调用getOrCreate方法。
         * 注意事项:
         *      1.  必须指定master的配置
         *            --方式1: 可以调用master()来指定
         *            --方式2: 可以调用config(conf:SparkConf),传入spark配置对象来指定
         *                     config有很多重载方法,用于指定配置参数和参数值。如config(key:String,value:*)
         *///val spark: SparkSession = SparkSession.builder().master("local[*]").getOrCreate()//val spark: SparkSession = SparkSession.builder().config(conf).config("topn",10).config("author","michael").conf.getOrCreate()/**
         *  2: 获取连接Hive的SparkSession对象
         */val spark: SparkSession = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()}}

2.4.2 两种编程风格的案例演示

1)SQL风格

packagecom.qf.sql.day02importorg.apache.spark.SparkConf
importorg.apache.spark.sql.{DataFrame, SparkSession}object _02SQLStyle {def main(args: Array[String]):Unit={var conf =new SparkConf().setMaster("local[*]").setAppName("TwoStyle")val session = SparkSession.builder().config(conf).getOrCreate()//获取DataFrame对象val df: DataFrame = session.read.json("D:/input/user.json")//        df.show()/**
         *    1:SQL风格的演示
         *
         *
         *    -- 创建视图
         *           createTempView
         *           createOrReplaceTempView
         *
         *           createGlobalTempView
         *           createOrReplaceGlobalTempView
         *
         * show算子:行动算子
         *     show():  默认显示前20行,如果字段的值的长度超过20个,则截断显示
         *     show(truncate: Boolean)
         *     show(numRows: Int, truncate: Boolean)
         *     show(numRows: Int, truncate: Int)
         *     show(numRows: Int)
         *
         */

        df.createTempView("user")//        session.sql("select * from user").show()//        session.sql("select age,count(1),avg(age) from user group by age order by age desc").show()//        val sql = "select age,count(1),avg(age) from user group by age order by age limit 2"//        session.sql(sql).show()//注意:sparksql中的sql风格,如果单独指定sql语句,一般使用以下方式,可以进行换行val sql1 ="""
              |select age,
              |count(1),
              |avg(age),
              |max(age) maxAge
              |from user
              |group by age
              |order by age desc
              |limit 2
              |""".stripMargin
        session.sql(sql1).show()}}

2)DSL风格

DSL风格,就是调用DataFame或者使Dataset的算子。  注意返回值类型,是否可以连续调用
packagecom.qf.sql.day02importorg.apache.spark.SparkConf
importorg.apache.spark.sql.{Column, DataFrame, SparkSession}object _03DSLStyle {def main(args: Array[String]):Unit={var conf =new SparkConf().setMaster("local[*]").setAppName("TwoStyle")val session = SparkSession.builder().config(conf).getOrCreate()val df: DataFrame = session.read.json("D:/input/user.json")/**
         * 第二种风格: DSL风格, 即调用算子
         *
         * df的一些算子返回的如果依然是DataFrame,则可以连续调用。 但是要注意返回的DataFrame代表的数据是什么
         *///        df.select("age","username").where("age>30").show()//        df.groupBy("age").max("age").select("age").showimportsession.implicits._

//        df.groupBy("age").max("age").show//        df.select($"age",$"username").where($"age">20).show()//        df.select('age,'username).where('age > 20).show()//        df.select(new Column("age"),'username).where('age > 20).show()// 别名的演示://        df.select('age.as("myage"),'username.as("user")).where('age > 20).show()//        df.groupBy("age").max("age").select($"age".as("age1"),$"max(age)".as("maxAge")).show()//        df.selectExpr("age","username").where("age > 20").show()//        df.selectExpr("age as age1","username as user").where("age > 20").show()//查询时,并计算
        df.select("age + 1").show()
        df.select($"age"+1).show()
        df.selectExpr("age + 1").where("age > 20").show()
        session.stop()}}

3)DSL风格总结:

算子传入的参数,可以概括两种形式:
    第一种传入的是字符串类型: 比如select(col:String,cols:String*)
        注意: 
          -- 不能直接做运算-- 不能使用别名

    第二种传入的是Column类型: 比如select(col:Column,cols:Column*),该类型有以下几种写法
        --1. 使用$符号,  select($"age",$"username")--2. 使用一个单引号,  select(’age,‘username)--3. 使用new Column,select(new Column("age"),new Column("username"))--4. 使用col,select(col("age"),col("username"))--5. 使用DataFrame变量,select(df("age"),df("username"))
        注意: 
        -- 可以直接做运算-- 可以使用别名,需要在select算子中使用-- 需要导入隐式转换操作:import sparkSession.implicits._

2.4.3 DataFrame和Dataset的创建方式

1)DataFrame的创建方式

packagecom.qf.sparksql.day02importorg.apache.spark.SparkConf
importorg.apache.spark.rdd.RDD
importorg.apache.spark.sql.{DataFrame, SparkSession}/**
 * 创建DataFrame的方式有以下几种
 *       1. 读外部设备的文件,返回DataFrame对象
 *       2. 从RDD转换成DataFrame对象
 *       3. 读取Hive中的表,返回DataFrame对象
 *       4. 调用createDataFrame方法,返回DataFrame对象
 */object Spark_06_CreateDataFrame_1 {def main(args: Array[String]):Unit={val conf =new SparkConf().setMaster("local[*]").setAppName("SQLStyle")val sparkSession = SparkSession
          .builder().config(conf)//.enableHiveSupport() // 开启hive支持.getOrCreate()//因为后续的代码,不一定那地方就需要隐式转换操作,所以建议在获取编程入口时,直接写隐式转换importsparkSession.implicits._
        /**
         * 第一种方式:读取外部文件,获取DataFrame
         *///val frame: DataFrame = sparkSession.read.json("data/emp.json")/**
         * 第二种方式:从RDD转换成DataFrame对象
         */val rdd1: RDD[String]= sparkSession.sparkContext.makeRDD(List("zhangsan","lisi","wangwu"))/**
         * toDF():   将rdd转成DataFrame,  默认将rdd的元素转成一列,列名叫value.
         * toDF(cols:String*), 用于指定转成DataFrame的列名。
         *
         *
         *//*val df: DataFrame = rdd1.toDF()
        df.select("value").show()*//* val df: DataFrame = rdd1.toDF("name")
       df.select("name").show()*/val rdd2: RDD[String]= sparkSession.sparkContext.makeRDD(List("1001,zhangsan,23","1002,lisi,24","1003,wangwu,23"))val rdd3: RDD[(String,String,String)]= rdd2.map(line =>{val arr: Array[String]= line.split(",")(arr(0),arr(1),arr(2))})val frame: DataFrame = rdd3.toDF("id","name","age")
        frame.select("*").show()/**
         * 第三种方式:读取Hive中的表,返回DataFrame对象
         *    需要开启hive支持,然后就可以使用SparkSession里提供的table方法读取hive表了
         *///val frame1: DataFrame = sparkSession.table("sz2103.emp")

        sparkSession.stop()}}

第四种方式的情况1:javaBean+反射

packagecom.qf.sparksql.day02importorg.apache.spark.SparkConf
importorg.apache.spark.rdd.RDD
importorg.apache.spark.sql.{DataFrame, SparkSession}/**
 * 创建DataFrame的方式有以下几种
 *       1. 读外部设备的文件,返回DataFrame对象
 *       2. 从RDD转换成DataFrame对象
 *       3. 读取Hive中的表,返回DataFrame对象
 *       4. 调用createDataFrame方法,返回DataFrame对象
 *
 *  这里研究第四种的方式
 *         可以细分为两种情况
 *         第一种:使用javaBean+反射机制     注意,普通的值类型不可以
 *         第二种:动态编程
 */object Spark_07_CreateDataFrame_2 {def main(args: Array[String]):Unit={val conf =new SparkConf().setMaster("local[*]").setAppName("SQLStyle")val sparkSession = SparkSession.builder().config(conf).getOrCreate()//因为后续的代码,不一定哪地方就需要隐式转换操作,所以建议在获取编程入口时,直接写隐式转换importsparkSession.implicits._

        //定义一个集合,存储多个Student类型val list = List(new Student(1001,"zhangsan",23),new Student(1002,"lisi",23),new Student(1003,"wangwu",24),new Student(1004,"zhaoliu",24))/**
         * 调用会话的createDataFrame(java.util.List<_>,beanClass)
         *     第一个参数:java类型的集合
         *     第二个参数:Class对象
         *///该作用是将java类型的集合转成Scala类型的集合importscala.collection.JavaConversions._
        val frame: DataFrame = sparkSession.createDataFrame(list,classOf[Student])
        frame.createTempView("student")val sql ="""
              |select
              |  count(1) `学生个数`,
              |  max(age) `最大年龄`,
              |  min(age) `最小年龄`
              |from student
              |""".stripMargin
        sparkSession.sql(sql).show()

        sparkSession.stop()}}packagecom.qf.sparksql.day02;

public class Student {private int id;privateString name;private int age;

    public Student(){}

    public Student(int id,String name, int age){this.id = id;this.name = name;this.age = age;}

    public int getId(){return id;}

    public void setId(int id){this.id = id;}

    public String getName(){return name;}

    public void setName(String name){this.name = name;}

    public int getAge(){return age;}

    public void setAge(int age){this.age = age;}
    public String toString(){return id+","+name+","+age;}}

第四种方式的情况1:动态编程

packagecom.qf.sparksql.day02importorg.apache.spark.SparkConf
importorg.apache.spark.rdd.RDD
importorg.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}importorg.apache.spark.sql.{DataFrame, Row, SparkSession}/**
 * 创建DataFrame的方式有以下几种
 *       1. 读外部设备的文件,返回DataFrame对象
 *       2. 从RDD转换成DataFrame对象
 *       3. 读取Hive中的表,返回DataFrame对象
 *       4. 调用createDataFrame方法,返回DataFrame对象
 *
 *  这里研究第四种的方式
 *         可以细分为两种情况
 *         第一种:使用javaBean+反射机制     注意,普通的值类型不可以
 *         第二种:动态编程
 */object Spark_08_CreateDataFrame_3 {def main(args: Array[String]):Unit={val conf =new SparkConf().setMaster("local[*]").setAppName("SQLStyle")val sparkSession = SparkSession.builder().config(conf).getOrCreate()//因为后续的代码,不一定哪地方就需要隐式转换操作,所以建议在获取编程入口时,直接写隐式转换/** 使用动态编程的方式获取DataFrame,  其实就是使用StructField和StructType类型以及rowRDD构建DataFrame
         *///构建一个RowRDDval rdd1: RDD[Row]= sparkSession.sparkContext.makeRDD(
            List(
                Row(1001,"zhangsan",23),
                Row(1002,"lisi",25),
                Row(1003,"wangwu",24),
                Row(1004,"zhaoliu",23)))/**
         * StructField(name: String,dataType: DataType, nullable: Boolean = true,metadata: Metadata = Metadata.empty)
         * 是一个用来描述字段的信息的样例类
         *      name:  字段名称
         *      dataType:  字段类型
         *      nullable:   是否可以为null
         *      metadata:  元信息
         */val fields:Array[StructField]= Array(
            StructField("id",IntegerType),
            StructField("name",StringType),
            StructField("age",IntegerType))/**
         * StructType(fields: Array[StructField])
         * 是一个用来构建表头信息的样例类
         *     参数: 描述列的一个集合或数组
         */val schema:StructType =StructType(fields);/**
         * 调用createDataFrame(rowRDD: RDD[Row], schema: StructType)
         *    第一个参数:泛型为Row的RDD
         *    第二个参数:用于规定表头的元数据
         */val df: DataFrame = sparkSession.createDataFrame(rdd1, schema)
        df.show()

        sparkSession.stop()}}

2)Dataset的创建方式

packagecom.qf.sparksql.day02importorg.apache.spark.SparkConf
importorg.apache.spark.rdd.RDD
importorg.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}importorg.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}/**
 * 创建Dataset的方式和创建DataFrame的方式差不多。
 *  1.  从RDD转换而来
 *  2.  调用createDataset方法创建而来
 */object Spark_09_CreateDataset_1 {def main(args: Array[String]):Unit={val conf =new SparkConf().setMaster("local[*]").setAppName("SQLStyle")val sparkSession = SparkSession.builder().config(conf).getOrCreate()importsparkSession.implicits._

        //第一种方式:rdd转换成Dataset/* val rdd1: RDD[Int] = sparkSession.sparkContext.makeRDD(List(1, 2, 3, 4, 5))
        val ds: Dataset[Int] = rdd1.toDS()
        ds.show()*//**
         * 第二种使用createDataset方法
         */val ts = List(Teacher(1001,"lucy",23),
            Teacher(1002,"lily",23),
            Teacher(1003,"Tom",24))val ds: Dataset[Teacher]= sparkSession.createDataset(ts)//需求:按照年龄分组,查询每种年龄的个数
        ds.groupBy("age").count().show()

        sparkSession.stop()}caseclass Teacher(id:Int,name:String,age:Int)}

2.4.4 RDD与DataFrame、Dataset之间的转换

  1. rdd => DF|DS
packagecom.qf.sparksql.day02importorg.apache.spark.SparkConf
importorg.apache.spark.rdd.RDD
importorg.apache.spark.sql.{DataFrame, Dataset, SparkSession}/**
 *  RDD 转  DataFrame  或者是Dataset
 */object Spark_10_RDD_ToDFOrDS {def main(args: Array[String]):Unit={val conf =new SparkConf().setMaster("local[*]").setAppName("SQLStyle")val sparkSession = SparkSession.builder().config(conf).getOrCreate()importsparkSession.implicits._

        val rdd1: RDD[Int]= sparkSession.sparkContext.makeRDD(List(1,2,3,4,5))
        println("--------------RDD===>DataFrame:  RDD的元素只有一列的情况-----------------------")val df1: DataFrame = rdd1.toDF("num")
        df1.show()

        println("--------------RDD===>Dataset:  RDD的元素只有一列的情况-----------------------")val ds: Dataset[Int]= rdd1.toDS()
        ds.show()val rdd2: RDD[(Int,String,Int)]= sparkSession.sparkContext.makeRDD(List((1,"lily",23),(1,"lucy",24),(1,"tom",25)))
        println("--------------RDD===>DataFrame:  RDD的元素只有多列的情况,只能使用元组-----------------------")val df2: DataFrame = rdd2.toDF("id","name","age")
        df2.show()

        println("--------------RDD===>Dataset:  RDD的元素是元组多列的情况下,列名是_1,_2,_3,....... -----------------------")val ds1: Dataset[(Int,String,Int)]= rdd2.toDS()
        ds1.select("_2").show()val rdd3: RDD[Dog]= sparkSession.sparkContext.makeRDD(List(Dog("旺财","白色"), Dog("阿虎","棕色")))
        println("--------------其他自定义类型的RDD===>DataFrame -----------------------")val df3: DataFrame = rdd3.toDF()
        df3.select("color").where("color='白色'").show()

        println("--------------其他自定义类型的RDD===>Dataset -----------------------")val ds3: Dataset[Dog]= rdd3.toDS()
        ds3.where("color='棕色'").select("*").show()

        sparkSession.stop()}caseclass Dog(name:String,color:String)}

2)df=>rdd|ds

packagecom.qf.sparksql.day02importorg.apache.spark.SparkConf
importorg.apache.spark.rdd.RDD
importorg.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}importjava.util.Date

/**
 *  DataFrame 转  RDD  或者是Dataset
 */object Spark_11_DF_ToRDDOrDS {def main(args: Array[String]):Unit={val conf =new SparkConf().setMaster("local[*]").setAppName("SQLStyle")val sparkSession = SparkSession.builder().config(conf).getOrCreate()importsparkSession.implicits._

        val df: DataFrame = sparkSession.read.json("data/emp.json")

        println("--------------DataFrame=>RDD   注意:RDD的泛型为Row-----------------------")val rdd1: RDD[Row]= df.rdd
        //rdd1.foreach(println)
        rdd1.foreach(row=>println(row.get(0)+","+row.get(1)+","+row.get(2)))

        println("--------------DataFrame=>Dataset    注意: 1 需要自定义一个类型与df中的列数以及类型进行匹配,2,使用as[自定义类型]进行转换即可-----------------------")val ds: Dataset[E]= df.as[E]
        ds.show()

        sparkSession.stop()}caseclass E(empno:Long,ename:String,job:String,mgr:Long,hiredate:String,sal:Double,comm:Double,Deptno:Long)}

3)ds=>rdd|df

packagecom.qf.sparksql.day02importorg.apache.spark.SparkConf
importorg.apache.spark.rdd.RDD
importorg.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}/**
 *  或者是Dataset 转  RDD  或者是DataFrame
 */object Spark_12_DS_ToRDDOrDF {def main(args: Array[String]):Unit={val conf =new SparkConf().setMaster("local[*]").setAppName("SQLStyle")val sparkSession = SparkSession.builder().config(conf).getOrCreate()importsparkSession.implicits._

        val emps = List(Employee(1001,"lucy","saleman",1000),
            Employee(1002,"lily","saleman",1001),
            Employee(1003,"john","saleman",1001),
            Employee(1004,"michael","boss",1002))val ds: Dataset[Employee]= sparkSession.createDataset(emps)

        println("------------Dataset =>  RDD    两个数据模型的泛型是一样的-------------------------")val rdd1: RDD[Employee]= ds.rdd
        rdd1.foreach(emp=>println(emp.ename+"\t"+emp.job))

        println("------------Dataset =>  DataFrame   :本质就是将Dataset的泛型转成Row形式 ------------------------")val df: DataFrame = ds.toDF()
        df.where("mgr=1001").select("*").show()

        sparkSession.stop()}caseclass Employee(empno:Long,ename:String,job:String,mgr:Long)}

2.5 常用算子的演示

sparksql里的算子同样分两种类型,一种是转换算子,一种是行动算子。转换算子的作用是ds|df转成另外一种ds或者df;行动算子是用于触发程序执行的。

准备数据:emp.json文件

{"empno":7369,"ename":"SMITH","job":"CLERK","mgr":7902,"hiredate":"1980-12-17","sal":800,"comm":null,"deptno":20}{"empno":7499,"ename":"ALLEN","job":"SALESMAN","mgr":7698,"hiredate":"1981-02-20","sal":1600,"comm":300,"deptno":30}{"empno":7521,"ename":"WARD","job":"SALESMAN","mgr":7698,"hiredate":"1981-02-22","sal":1250,"comm":500,"deptno":30}{"empno":7566,"ename":"JONES","job":"MANAGER","mgr":7839,"hiredate":"1981-04-02","sal":2975,"comm":null,"deptno":20}{"empno":7654,"ename":"MARTIN","job":"SALESMAN","mgr":7698,"hiredate":"1981-09-28","sal":1250,"comm":1400,"deptno":30}{"empno":7698,"ename":"BLAKE","job":"MANAGER","mgr":7839,"hiredate":"1981-05-01","sal":2850,"comm":null,"deptno":30}{"empno":7782,"ename":"CLARK","job":"MANAGER","mgr":7839,"hiredate":"1981-06-09","sal":2450,"comm":null,"deptno":10}{"empno":7788,"ename":"SCOTT","job":"ANALYST","mgr":7566,"hiredate":"1987-04-19","sal":3000,"comm":null,"deptno":20}{"empno":7839,"ename":"KING","job":"PRESIDENT","mgr":null,"hiredate":"1981-11-17","sal":5000,"comm":null,"deptno":10}{"empno":7844,"ename":"TURNER","job":"SALESMAN","mgr":7698,"hiredate":"1981-09-08","sal":1500,"comm":0,"deptno":30}{"empno":7876,"ename":"ADAMS","job":"CLERK","mgr":7788,"hiredate":"1987-05-23","sal":1100,"comm":null,"deptno":20}{"empno":7900,"ename":"JAMES","job":"CLERK","mgr":7698,"hiredate":"1981-12-03","sal":950,"comm":null,"deptno":30}{"empno":7902,"ename":"FORD","job":"ANALYST","mgr":7566,"hiredate":"1981-12-02","sal":3000,"comm":null,"deptno":20}{"empno":7934,"ename":"MILLER","job":"CLERK","mgr":7369,"hiredate":"1982-01-23","sal":1300,"comm":null,"deptno":10}

1)常用的行动算子

--show()   用于展示前N条记录   n默认值使20--collect():   将df的数据转成数组类型,数组的泛型为Row
 --collectAsList():将df的数据转成java的集合类型,集合类型的泛型为Row
 --first|head :  取出df中的第一个row对象
 --take(topn:Int):   取出df中的前n个row,封装到Array里
 --takeAsList(topn:Int): 取出df中的前n个row,封装到java.util.List里
 --printSchema
packagecom.qf.sparksql.day02importorg.apache.spark.SparkConf
importorg.apache.spark.rdd.RDD
importorg.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}importjava.util/**
 *  行动算子的演示:
 */object Spark_13_Action_1 {def main(args: Array[String]):Unit={val conf =new SparkConf().setMaster("local[*]").setAppName("SQLStyle")val sparkSession = SparkSession.builder().config(conf).getOrCreate()val df: DataFrame = sparkSession.read.json("data/emp.json")/**
         *     show算子,用于触发程序的运行,并显示数据
         *      1. show()  : 默认显示前20行
         *      2. show(numRows: Int): Unit   :自定义显示的行数
         *      3. show(truncate: Boolean): Unit  : 自定义是否截取
         *      4. show(numRows: Int, truncate: Boolean): Unit :自定义现实的额行数,自定义是否截取超长的字节数量
         *      5. show(numRows: Int, truncate: Int): Unit   : 自定义显示的行数,每列的显示的最长字节数据量,超出的截断。
         */
        df.show(10,4)/**
         * collect()算子:  搜集算子,将df的数据搜集成数组类型,数组的泛型为Row
         * collectAsList():将df的数据转成java的集合类型,集合类型的泛型为Row
         */val rows: Array[Row]= df.collect()
        rows.foreach(println)val rows1: util.List[Row]= df.collectAsList()
        println(rows1)

        println("-------------first|head :  取出df中的第一个row对象-------------")//val row: Row = df.first()val row: Row = df.head()
        println(row)

        println("-------------take(topn:Int)|head(n:Int):   取出df中的前n个row,封装到Array里-------------")//val rows2: Array[Row] = df.take(2)val rows2: Array[Row]= df.head(2)
        rows2.foreach(println)

        println("-------------takeAsList(topn:Int): 取出df中的前n个row,封装到java.util.List里-------------")val rows3: util.List[Row]= df.takeAsList(2)
        println(rows3)

        println("-------------printSchema-------------")
        df.printSchema()
        sparkSession.stop()}}

2)常用的转换算子

--select():  参数类型有两种类型,一种是字符串,一种是Column类型
        df.select(df("ename"),df("job")).show      <---  Column
        df.select("ename","job").show              <---String--filter()--where(): 用于条件过滤, 注意条件如果是等号的话,一个等号即可,和mysql一样 
             字符串时,要使用单引号
--describe(col:String*),
        用来显示指定字段的分析数据,分析数据有五个,分别使max,min,count,mean,stddev
--selectExpr(expr:String*)    
    df.selectExpr("ename as uname","sal","sal + 1000 as addsal").show
--drop(col:String*)
    排除df中的指定字段
--limit(n:Int) 
    取出前n条记录
--orderBy(col:Column*)|sort(col:Column*)--sortWithinPartitions(col:Column*): 用于分区内排序
--groupBy(col:String*):  
     返回值为RelationalGroupedDataset, 因此只能再次调用聚合算子
--dropDuplicates(col:String*):用于指定字段组合进行去重,同一个组合不能有相同的记录
--union:   两个df做联合操作,  追加
--join:   参数有以下几种
         ds.join(ds,"关联字段","join的类型").show
         ds.join(ds,"关联字段","inner").show
         ds.join(ds,"关联字段","full").show
         ds.join(ds,"关联字段","left").show
--explode: 展开
        scala> df.explode("hiredate","dt"){x:String=> x.split("-")}.show
        +----+------+-----+------+----------+--------+----+----+----+|comm|deptno|empno| ename|  hiredate|     job| mgr| sal|  dt|+----+------+-----+------+----------+--------+----+----+----+|null|20|7369| SMITH|1980-12-17|   CLERK|7902|800|1980||null|20|7369| SMITH|1980-12-17|   CLERK|7902|800|12||null|20|7369| SMITH|1980-12-17|   CLERK|7902|800|17||300|30|7499| ALLEN|1981-02-20|SALESMAN|7698|1600|1981||300|30|7499| ALLEN|1981-02-20|SALESMAN|7698|1600|02||300|30|7499| ALLEN|1981-02-20|SALESMAN|7698|1600|20|+----+------+-----+------+----------+--------+----+----+----+--withColumn : 基于sal新增一列salary
        scala>  df.withColumn("salary", df("sal")).show
        +----+------+-----+------+----------+---------+----+----+------+|comm|deptno|empno| ename|  hiredate|      job| mgr| sal|salary|+----+------+-----+------+----------+---------+----+----+------+|null|20|7369| SMITH|1980-12-17|    CLERK|7902|800|800||300|30|7499| ALLEN|1981-02-20| SALESMAN|7698|1600|1600|+----+------+-----+------+----------+---------+----+----+------+--withColumnRenamed: 给字段重命名
    scala> df.withColumnRenamed("ename","name").show
        +----+------+-----+------+----------+---------+----+----+|comm|deptno|empno|  name|  hiredate|      job| mgr| sal|+----+------+-----+------+----------+---------+----+----+|null|20|7369| SMITH|1980-12-17|    CLERK|7902|800||300|30|7499| ALLEN|1981-02-20| SALESMAN|7698|1600|+----+------+-----+------+----------+---------+----+----+--intersect: 交集
--except:差集

演示1:

packagecom.qf.sparksql.day02importorg.apache.spark.SparkConf
importorg.apache.spark.sql.functions.colimportorg.apache.spark.sql.{Column, DataFrame, Row, SparkSession}importjava.util/**
 *  行动算子的演示:
 */object Spark_14_Transfer_1 {def main(args: Array[String]):Unit={val conf =new SparkConf().setMaster("local[*]").setAppName("SQLStyle")val sparkSession = SparkSession.builder().config(conf).getOrCreate()importsparkSession.implicits._

        val df: DataFrame = sparkSession.read.json("data/emp.json")
        println("-------  select():选择要显示的字段,一种传入字符串类型,一种传入Column类型-------------------")
        df.select("empno","ename","job").show()
        df.select($"empno",'ename,new Column("job"),col("mgr")).show()

        println("-------  filter()  :用于过滤数据的,内部需要传入一个返回boolean类型的表达式-------------------")
        df.filter("empno>7800 and empno <7850").show()
        println("-------  where()  :用于过滤数据的   , 条件中带有字符串,需要使用单引号 -------------------")
        df.where("empno>7800 and  ename like 'K%'").show()

        println("-------  describe(col:String*),用来显示指定字段的分析数据,分析数据有五个,分别使max,min,count,mean,stddev -------------------")
        df.describe("sal","comm").show()

        println("-------  selectExpr(expr:String*)  -------------------")
        df.selectExpr("ename as uname","sal","sal + 1000 as addsal").show

        println("-------  drop(col:String*): 排除df中的指定字段  -------------------")
        df.drop("mgr","hiredate").show()

        println("-------  limit(n:Int) : 取出前n条记录  -------------------")
        df.limit(3).show()

        println("-------  orderBy(col:Column*)|sort(col:Column*)   -------------------")
        df.orderBy($"deptno").show()
        df.orderBy($"deptno".desc).show()
        df.orderBy(-$"deptno").show()
        df.sort($"deptno",-$"sal").show()
        sparkSession.stop()}}

演示2:

packagecom.qf.sparksql.day02importorg.apache.spark.SparkConf
importorg.apache.spark.rdd.RDD
importorg.apache.spark.sql.functions.colimportorg.apache.spark.sql.{Column, DataFrame, SparkSession}/**
 *  行动算子的演示:
 */object Spark_14_Transfer_2 {def main(args: Array[String]):Unit={val conf =new SparkConf().setMaster("local[*]").setAppName("SQLStyle")val sparkSession = SparkSession.builder().config(conf).getOrCreate()importsparkSession.implicits._

        val list = List(
            Animal(1001,"A",3),
            Animal(1002,"B",4),
            Animal(1003,"C",5),
            Animal(1004,"D",4),
            Animal(1005,"E",3),
            Animal(1005,"F",4),
            Animal(1007,"G",2))val rdd1: RDD[Animal]= sparkSession.sparkContext.makeRDD(list,2)val df: DataFrame = rdd1.toDF()
        println("-------  sortWithinPartitions(col:Column*)  : 用于分区内排序-------------------")
        df.sortWithinPartitions(-$"id",-$"age").show()

        println("-------  groupBy(col:String*):  返回值为RelationalGroupedDataset, 因此只能再次调用聚合算子-------------------")
        df.groupBy("age").count().show()

        sparkSession.stop()}caseclass Animal(id:Int,name:String,age:Int)}

演示3:

packagecom.qf.sparksql.day02importorg.apache.spark.SparkConf
importorg.apache.spark.sql.functions.colimportorg.apache.spark.sql.{Column, DataFrame, SparkSession}/**
 *  行动算子的演示:
 */object Spark_14_Transfer_3 {def main(args: Array[String]):Unit={val conf =new SparkConf().setMaster("local[*]").setAppName("SQLStyle")val sparkSession = SparkSession.builder().config(conf).getOrCreate()importsparkSession.implicits._

        val df: DataFrame = sparkSession.read.json("data/emp.json")
        println("------- dropDuplicates(col:String*) :用于指定字段组合进行去重,同一个组合留一条---------------------")
        df.dropDuplicates("deptno","job").show()

        println("------- union:   两个df做联合操作,  追加---------------------")

        println(df.union(df).count())importscala.collection.JavaConversions._
        println("------- join:   参数有以下几种---------------------")val ds = df.as[F]
        ds.join(ds,"deptno").show(100)

        println("------- -withColumn : 新增一列,   需要基于某一列---------------------")

        df.withColumn("newSal",$"sal"+1000).show()

        sparkSession.stop()}caseclass F(empno:Long,ename:String,job:String,mgr:Long,hiredate:String,sal:Double,comm:Double,Deptno:Long)}

扩展:这两个函数是高阶分组函数

假如有一张销售表:
        年       月   日   销售额
        2021     1   1     10
        2021     1   1     1000
        2021     1   1     102
        ............
数据分析人员的需求可能如下:
           select  年  月   日   销售额   from ...group by  年  月  日
           union
           select  年  null 日   销售额   from ...group by  年  日
           union
           select  年  月   null 销售额   from ...group by  年  月 
           union
           select  null 月  日   销售额   from ...group by  月  日
           union 
           select  null 月  null   销售额   from ...group by  月 
           union
           select  年 null null  销售额   from ...group by  年
           union
           select  null null 日   销售额   from ...group by  日
           union
           select  null,null,null,销售额   from ...
针对于上述需求: 有一个高阶函数cube与之对应。
--cube:      cube(A,B,C)
             先三个字段组合分组, 然后两个字段组合分组,再一个字段分组,最后整张表为一组
             
             
在实际生产环境中的合理性:
           select  年  月   日   销售额   from ...group by  年  月  日
           union
           select  年  月   null 销售额   from ...group by  年  月
           union
           select  年 null null  销售额   from ...group by  年
           union
           select  null,null,null,销售额   from ...
针对于上述需求: 有一个高阶函数rollup与之对应。  
--rollup:    rollup(A,B,C)
             先三个字段组合分组,然后依次从后面开始少一个字段分组,
             即再前两个字段分组,然后前一个字段分组,然后整张表为一组
packagecom.qf.sparksql.day03importorg.apache.spark.sql.SparkSession

/**
 * 高阶分组函数的讲解:
 * 1.  cube  :    cube(A,B,C)  先三个字段组合分组, 然后两个字段组合分组,再一个字段分组,最后整张表为一组
 * 2.  rollup:    rollup(A,B,C) 先三个字段组合分组,然后依次从后面开始少一个字段分组,
 *                              即再前两个字段分组,然后前一个字段分组,然后整张表为一组
 */object Spark_01_Transfer_4 {def main(args: Array[String]):Unit={val spark = SparkSession.builder().master("local[*]").appName("高阶分组函数").getOrCreate()importspark.implicits._

        val df = spark.read.json("data/emp.json")
        df.show()

        println("----------------cube(A,B)-----------------------")

        df.cube("deptno","job").sum("sal").show()
        df.groupBy("deptno","job").sum("sal").show()

        println("----------------rollup(A,B)-----------------------")
        df.rollup("deptno","job").sum("sal").show()

        spark.stop()}}

三、sparksql的加载与落地

3.1 SparkSql加载文件

packagecom.qf.sparksql.day03importorg.apache.spark.sql.{DataFrame, SparkSession}object Spark_02_ReadFile {def main(args: Array[String]):Unit={val spark = SparkSession.builder().master("local[*]").appName("readFile").getOrCreate()/**
         * SparkSession想要加载文件时,需要获取DataFrameReader对象,然后调用该对象的load方法加载文件。
         *
         *  load方法,默认加载的是parquet类型的文件。
         */var df = spark.read.load("data/users.parquet")
        df.show()/**
         * 如果想要读取别的类型的数据,需要使用format方法来指定格式
         *///csv格式的文件,默认会使用逗号作为列分隔符进行解析。val df1: DataFrame = spark.read.format("csv").load("data/country.csv")
        df1.toDF("id","name","code").show()val df2: DataFrame = spark.read.format("json").load("data/emp.json")
        df2.show()val df3: DataFrame = spark.read.format("parquet").load("data/sqldf.parquet")
        df3.show()val df4: DataFrame = spark.read.format("orc").load("data/student.orc")
        df4.show()//注意: 读取Text文件时,SparkSql只会将一行的内容解析成一列。val df5: DataFrame = spark.read.format("text").load("data/dailykey.txt")
        df5.show()/**
         * 正常情况下,不能读取avro类型的文件,需要导入其他第三方的jar包才可以。
         *///val df6: DataFrame = spark.read.format("avro").load("data/users.avro")/**
         * option方法的一个应用, 用于指定分隔符
         *
         * csv格式的文件,默认会使用逗号作为列分隔符进行解析。可以使用option("sep","自定义分隔符")来指定
         *    sep:是英文单词separator的简写
         *
         * option方法的另一个应用, 将文件的第一行设置成表头: option("header","true")
         */val df6: DataFrame = spark.read.option("header","true").option("sep","|").format("csv").load("data/location-info.csv")
        df6.show()

        spark.stop()}}

3.2 SparkSql保存文件

packagecom.qf.sparksql.day03importorg.apache.spark.rdd.RDD
importorg.apache.spark.sql.{DataFrame, Row, SparkSession}object Spark_03_SaveFile {def main(args: Array[String]):Unit={val spark = SparkSession.builder().master("local[*]").appName("readFile").getOrCreate()importspark.implicits._

        val frame: DataFrame = spark.read.json("data/emp.json")/**
         * 将DF保存成文件时,应该调用DataFrameWriter的save方法进行保存,可以使用format来指定要保存的数据源格式
         *///保存成csv格式时,如果是null,则默认保存成空字符串
        frame.write.format("csv").save("out/csv")//保存成json格式时,如果是null,则不存储键值对
        frame.write.format("json").save("out/json")
        frame.write.format("orc").save("out/orc")
        frame.write.format("parquet").save("out/parquet")/**
         * 注意: 将df保存成text格式时,只能将df的多列转成一列进行存储,否则报错
         */val rdd: RDD[Row]= frame.rdd
        val rdd2: RDD[String]= rdd.map(row =>{
            row.get(0)+","+ row.get(1)+","+ row.get(2)+","+ row.get(3)+","+ row.get(4)+","+ row.get(5)+","+ row.get(6)+","+ row.get(7)})//将新的RDD2转成DF再进行存储。
        rdd2.toDF().write.format("text").save("out/text")//正常情况下Spark2.0版本,不支持保存成avro格式//frame.write.format("avro").save("out/avro")

        println("-------------SparkSql的存储方式的简化方式-----------------")

        frame.write.json("out1/json")
        frame.write.option("sep","|").csv("out1/csv")
        frame.write.orc("out1/orc")//frame.write.text("out1/text")
        frame.write.parquet("out1/parquet")

        spark.stop()}}

3.3 Spark连接MySql

<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.21</version></dependency>

mysql里的数据准备

DROP TABLE IF EXISTS `emp`;
CREATE TABLE `emp` (
`EMPNO` int(4) NOT NULL,
`ENAME` varchar(10),
`JOB` varchar(9),
`MGR` int(4),
`HIREDATE` date,
`SAL` int(7),
`COMM` int(7),
`DEPTNO` int(2),
PRIMARY KEY (`EMPNO`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;

INSERT INTO `emp` VALUES ('7369','SMITH','CLERK','7902','1980-12-17','800',null,'20');
INSERT INTO `emp` VALUES ('7499','ALLEN','SALESMAN','7698','1981-02-20','1600','300','30');
INSERT INTO `emp` VALUES ('7521','WARD','SALESMAN','7698','1981-02-22','1250','500','30');
INSERT INTO `emp` VALUES ('7566','JONES','MANAGER','7839','1981-04-02','2975',null,'20');
INSERT INTO `emp` VALUES ('7654','MARTIN','SALESMAN','7698','1981-09-28','1250','1400','30');
INSERT INTO `emp` VALUES ('7698','BLAKE','MANAGER','7839','1981-05-01','2850',null,'30');
INSERT INTO `emp` VALUES ('7782','CLARK','MANAGER','7839','1981-06-09','2450',null,'10');
INSERT INTO `emp` VALUES ('7788','SCOTT','ANALYST','7566','1987-04-19','3000',null,'20');
INSERT INTO `emp` VALUES ('7839','KING','PRESIDENT',null,'1981-11-17','5000',null,'10');
INSERT INTO `emp` VALUES ('7844','TURNER','SALESMAN','7698','1981-09-08','1500','0','30');
INSERT INTO `emp` VALUES ('7876','ADAMS','CLERK','7788','1987-05-23','1100',null,'20');
INSERT INTO `emp` VALUES ('7900','JAMES','CLERK','7698','1981-12-03','950',null,'30');
INSERT INTO `emp` VALUES ('7902','FORD','ANALYST','7566','1981-12-03','3000',null,'20');
INSERT INTO `emp` VALUES ('7934','MILLER','CLERK','7782','1982-01-23','1300',null,'10');
packagecom.qf.sparksql.day03importorg.apache.spark.rdd.RDD
importorg.apache.spark.sql.{DataFrame, Row, SparkSession}importjava.util.Properties

/**
 * 使用SparkSql连接Mysql
 */object Spark_04_Jdbc_Mysql {def main(args: Array[String]):Unit={
        write()}/**
     * 将DF对应的数据保存到mysql中,  小贴士: sparksql向mysql里保存数据时,不需要提前建表
     */def write():Unit={val spark = SparkSession.builder().master("local[*]").appName("readFile").getOrCreate()val frame: DataFrame = spark.read.option("sep","|").option("header","true").csv("data/student.csv")val url ="jdbc:mysql://localhost:3306/sz2103?serverTimezone=UTC"val table ="student"val prop =new Properties()
        prop.setProperty("user","root")
        prop.put("password","mmforu")//保存的时候,使用DataFrame对象调用
        frame.write.jdbc(url,table,prop)

        spark.stop()}/**
     * 读取Mysql里的表操作
     */def read():Unit={val spark = SparkSession.builder().master("local[*]").appName("readFile").getOrCreate()/**
         * jdbc(url: String, table: String, properties: Properties)
         * 解析: 作用是连接mysql,读取mysql里的表数据,返回DataFrame
         *     url: 连接mysql的路径:    8.0版本:jdbc:mysql://ip:port/dbname?serverTimezone=UTC
         *                              低版本:jdbc:mysql://ip:port/dbname
         *     table: 数据库里的表名
         *     properties: 用于指定连接mysql的用户名,密码等一个Properties对象
         */val url ="jdbc:mysql://localhost:3306/sz2103?serverTimezone=UTC"val table ="emp"val prop =new Properties()
        prop.setProperty("user","root")
        prop.put("password","mmforu")// 注意:读取的时候,使用会话对象调用val df: DataFrame = spark.read.jdbc(url, table, prop)
        df.createTempView("employee")val sqlText ="""
              |select
              |   deptno,max(sal) maxSal,avg(nvl(sal,0)) avgSal,count(1) totalPeople
              |from employee
              |where deptno = (
              |       select deptno from employee where empno = 7369
              |    )
              |group by deptno
              |""".stripMargin

        spark.sql(sqlText).show()

        spark.stop()}}

保存模式的研究:

packagecom.qf.sparksql.day03importorg.apache.spark.sql.{DataFrame, SaveMode, SparkSession}importjava.util.Properties

/**
 * 使用SparkSql连接Mysql
 */object Spark_05_SaveMode {def main(args: Array[String]):Unit={val spark = SparkSession.builder().master("local[*]").appName("saveMode").getOrCreate()val frame: DataFrame = spark.read.option("sep","|").option("header","true").csv("data/student.csv")val url ="jdbc:mysql://localhost:3306/sz2103?serverTimezone=UTC"val table ="student"val prop =new Properties()
        prop.setProperty("user","root")
        prop.put("password","mmforu")/**
         * 在保存数据时,可以指定保存模式
         * SaveMode.Append              :追加模式,  在原有数据的基础上追加数据,要考虑主键的问题,如果有主键约束,可能报错
         * SaveMode.Overwrite           :覆盖模式,   删除原有的数据,再添加数据
         * SaveMode.ErrorIfExists       :如果存在该表,就报错     默认模式
         * SaveMode.Ignore              :忽略模式,  不添加数据
         */
        frame.write.mode(SaveMode.Ignore).jdbc(url,table,prop)

        spark.stop()}}

3.4 SparkSQL连接Hive

3.4.1 说明

SparkSQL连接hive,指的是spark读hive中的表,转成DF或者是DS,然后使用相关算子计算。 和hive的执行引擎是spark,不是同一个概念

3.4.2 spark命令行连接hive

可以使用spark-sql命令脚本连接hive。 默认连接的是本地的spark环境,这种情况在企业中根本不用

1. 将hive的hive-site.xml拷贝到spark的conf目录下
2. 将mysql的驱动包,拷贝到spark的jars目录下
3. 测试1:使用spark-sql脚本
   ../spark-local/bin/spark-sql
         就可以正常写sql语句了
   spark-sql (default)> select deptno,max(sal) from emp group by deptno;
   
   测试2: 使用beeline工具
         (1) 先启动服务项,  ../spark-local/sbin/start-thriftserver.sh
         (2) 使用beeline指令连接:
                             ../spark-local/bin/beeline -u jdbc:hive2://localhost:10000 -n root

3.4.3 代码连接hive

1)读取hive中的表

packagecom.qf.sparksql.day03importorg.apache.spark.sql.{DataFrame, SaveMode, SparkSession}importjava.util.Properties

/**
 * 使用SparkSql连接hive
 *
 * 正常情况下,要配置连接hive的一系列参数,不过,可以将hive-site.xml拷贝到程序中,注意是否识别的问题,
 * 需要加载mysql的驱动包到idea的classpath下,即在pom.xml里添加mysql的坐标
 *
 * 识别的解决方式:
 *     1.  可以重启idea
 *     2.  可以将hive-site.xml文件拷贝到classes目录下
 */object Spark_06_readFromHive {def main(args: Array[String]):Unit={val spark = SparkSession.builder().master("local[*]").appName("saveMode").enableHiveSupport()//开启hive支持,.getOrCreate()val df: DataFrame = spark.table("sz2103.emp")
        df.createTempView("emp")val sqlText ="""
              |select deptno,
              |    max(sal),
              |    min(sal),
              |    count(1)
              |from emp
              |     group by deptno
              |""".stripMargin
        spark.sql(sqlText).show()

        spark.stop()}}

2)向hive中写数据

packagecom.qf.sparksql.day03importorg.apache.spark.sql.{DataFrame, SaveMode, SparkSession}/**
 * 使用SparkSql连接hive
 *
 * 正常情况下,要配置连接hive的一系列参数,不过,可以将hive-site.xml拷贝到程序中,注意是否识别的问题,
 * 需要加载mysql的驱动包到idea的classpath下,即在pom.xml里添加mysql的坐标
 *
 * 识别的解决方式:
 *     1.  可以重启idea
 *     2.  可以将hive-site.xml文件拷贝到classes目录下
 */object Spark_07_WriteToHive {def main(args: Array[String]):Unit={
        System.setProperty("HADOOP_USER_NAME","root")val spark = SparkSession.builder().master("local[*]").appName("saveMode").enableHiveSupport()//开启hive支持,.getOrCreate()val df: DataFrame = spark.read.option("sep","|").option("header","true").csv("data/student.csv")/**
         * 将df对应的数据写到hive中
         *
         * 注意:
         *    SaveMode.ErrorIfExists    默认模式
         *    SaveMode.Append   追加模式
         *    SaveMode.Overwrite  覆盖模式
         *    SaveMode.Ignore     忽略模式
         */
        df.write.mode(SaveMode.Ignore).saveAsTable("sz2103.student")

        spark.stop()

        readTest

    }def readTest():Unit={val spark = SparkSession.builder().master("local[*]").appName("saveMode").enableHiveSupport()//开启hive支持,.getOrCreate()val df: DataFrame = spark.table("sz2103.student")
        println(df.count())

        spark.stop()}}

四、SparkSQL的自定义函数

4.1 函数的分类情况

1)从功能上进行分类

1) 数值函数
    round(x,[d]):    对x保留d位小数,同时会四舍五入
    floor(x):        获取不大于x的最大整数。
    ceil(x):            获取不小于x的最小整数。
    rand():            获取0到1之间的随机数
    abs(x):            取绝对值
    pow(a, b):        获取a的b次幂
    sin(x):
    cos(x):
    tan():
2) 字符串函数
    length(str):                返回字符串str的长度
    instr(str, substr):        作用等同于str.indexOf(substr), 
    substr(str, pos[, len])|substring(str, pos[, len]):
        从str的pos位置开始,截取子字符串,截取len的长度,如果不传len,截取余下所有。
     substring(str,delim,count):返回的字符串是从头到第count个delim为止。不包括这个delim
     concat(str1, str2,......):        拼接字符串
     concat_ws(separator, str1, str2):使用指定分隔符来拼接字符串
3) 日期函数:
    unix_timestamp|to_unix_timestamp:  
    转时间戳(单位是秒)      注意:指定的是北京时间,但是得到的是本初子午线的时间
    
        spark.sql("select to_unix_timestamp('1970-1-1 0:0:0','yyyy-MM-dd HH:mm:ss')").show
    
    from_unixtime:  时间戳转日期   注意:默认指定的是本初子午线的时间,获取的是东八区的时间
                     所以,如果想要指定北京时间,则需要-28800
        spark.sql("select from_unixtime(0,'yyyy-MM-dd HH:mm:ss')").showcurrent_date(),获取当前的日期,日期格式为标准格式:yyyy-MM-dd
    current_timestamp():获取当前日期的时间戳,格式:yyyy-MM-dd HH:mm:ss.SSS
    add_months(start_date, num_months):返回start_date之后num_months月的日期
        spark.sql("select add_months('2021-10-10',-2)").show
    
    date_add(start_date, num_days):返回start_date之后num_days天的日期
    date_sub(start_date, num_days):返回start_date之前num_days天的日期
        spark.sql("select date_add('2021-10-27',-10)").show
    
    next_day(start_date, day_of_week),返回下一个周几,  第二个参数:需要写星期的简写
        spark.sql("select next_day('2022-2-9','Mon')").show
    
    dayofmonth(date) 返回date对应月份中的第几天
    weekofyear(date) 返回date对应年份中的第几周
    日期分量函数:获取日期中的某一个部分值
    secondminutehourdaymonthyear 
    spark.sql("select second(current_timestamp())").show
    
    date_format(日期,格式化),返回指定格式化时间
    datediff(date1, date2),返回date1和date2之间的差值(单位是天)
    to_date(datetime)返回datetime中的日期部分
    
4) 统计函数:
    max()min()avg()sum()count()index(arr, n),就是arr(n),   获取索引n对应的元素
5) 条件函数:
    if(expression,value1,value2)
        spark.sql("select if(2<1,'ok','no')").showcasewhen.....then....when.....then.....else....endcasewhen相当于多个条件的If分支
         when后写条件, then后写值
         如果都没有满足条件,则走else
      
    比如: 
       casewhen job='MANAGER'then sal+1000when job='SALESMAN'then sal+100else sal+10endcase job when'MANAGER'then sal+1000when'SALESMAN'then sal+100else sal+10end6) 特殊函数:
     array:返回数组
    collect_set:返回一个元素不重复的set集合
    collect_list:返回一个元素可重复的list集合
    split(str, regex):使用regex分隔符将str进行切割,返回一个字符串数组
    explode(array):将一个数组,转化为多行
    cast(type1 as type2):将数据类型type1的数据转化为数据类型type2

2)从定义的角度进行分类

一种是框架中内置函数, 上述函数都是内置的

另外一种就是用户自定义的函数,而用户自定义函数在sparksql中可以分为两类,
    --udf  :用户自定义函数, 通常指的是一对一形式,进入一条记录,出来一条记录
   --udaf  用户自定义聚合函数, 通常指的是多对一形式,进入多条记录,出来一条记录,比如模拟max

4.2 自定义函数案例演示

4.2.1 UDF

案例1)

packagecom.qf.sparksql.day03importorg.apache.spark.sql.{DataFrame, SaveMode, SparkSession}/**
 * sparksql定义udf函数
 */object Spark_08_UDF_1 {def main(args: Array[String]):Unit={val spark = SparkSession.builder().master("local[*]").appName("saveMode").getOrCreate()val df: DataFrame = spark.read.json("data/emp.json")//需求:使用内置的函数length 统计人名长度大于4的员工信息
        df.createTempView("emp")
        spark.sql("select ename,length(ename) from emp where length(ename)>4 ").show()/**
         *  需求: 使用自定义udf函数来完成上述需求
         *
         *  步骤1:  定义一个方法
         *  步骤2:  注册   register(name:String , func: Function)
         *               name: 要注册的函数的昵称
         *               func: 表示函数的逻辑体
         */def f1(content:String):Long={
            content.length
        }
        spark.udf.register("mylen",f1 _)

        spark.sql("select ename,mylen(ename) from emp where mylen(ename)>4 ").show()
        spark.stop()}}

案例2)

packagecom.qf.sparksql.day03importorg.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}importorg.apache.spark.sql.types.{DataType, DoubleType, LongType, StructField, StructType}importorg.apache.spark.sql.{DataFrame, Row, SparkSession}/**
 * sparksql定义udaf函数
 */object Spark_09_UDAF_1 {def main(args: Array[String]):Unit={val spark = SparkSession.builder().master("local[*]").appName("saveMode").getOrCreate()val df: DataFrame = spark.read.json("data/emp.json")//需求:查询每个部门的人数,工资之和,平均工资
        df.createTempView("emp")
        spark.sql("select deptno,count(1),sum(sal),avg(nvl(sal,0)) from emp group by deptno").show()//需求:使用用户自定义分析函数来完成上述的求平均值的需求。/**
         * 步骤1: 先自定义一个类型,实现UserDefinedAggregateFunction
         * 步骤2: 注册
          */
        spark.udf.register("myavg",new MyUDAF)
        spark.sql("select deptno,count(1),sum(sal),myavg(nvl(sal,0)) from emp group by deptno").show()
        spark.stop()}/**
     * UDAF类型,需求继承抽象的UserDefinedAggregateFunction,重写里面的方法
     *
     * 求平均值的原理:    使用一个变量sum来累加所有的值,再使用另一个变量count来累加个数。  平均值: sum/count
     */class MyUDAF extends UserDefinedAggregateFunction{// 用于指定进入函数的数据的类型overridedef inputSchema: StructType = StructType(Array(
            StructField("sal",DoubleType)))// 用于设计计算过程中所需要的数据信息,  而我们的这个需求需要两个变量,一个sum,一个countoverridedef bufferSchema: StructType = StructType(Array(
            StructField("sum",DoubleType),
            StructField("count",LongType)))//函数的返回数据类型overridedef dataType: DataType = DoubleType

        //函数是否稳定overridedef deterministic:Boolean=true//初始化, 初始化需要两个变量的值,这两个变量是一个数组的两个元素,第一个元素充当sum,第二个元素充当countoverridedef initialize(buffer: MutableAggregationBuffer):Unit={
            buffer(0)=0D// sum的初始化
            buffer(1)=0L// count的初始化}//分区内的计算过程中的更新, buffer就是临时存储的数组,input是进入的每一条记录overridedef update(buffer: MutableAggregationBuffer, input: Row):Unit={
            buffer.update(0,buffer.getDouble(0)+input.getDouble(0))//累加和,然后设置回去
            buffer.update(1,buffer.getLong(1)+1)//累加个数}//分区间的合并overridedef merge(buffer1: MutableAggregationBuffer, buffer2: Row):Unit={
            buffer1.update(0,buffer1.getDouble(0)+buffer2.getDouble(0))//累加和,然后设置回去
            buffer1.update(1,buffer1.getLong(1)+buffer2.getLong(1))//累加个数}// 最终的计算overridedef evaluate(buffer: Row):Any={
            buffer.getDouble(0)/buffer.getLong(1)}}}

4.2.2 UDAF

packagecom.qf.sparksql.day03importorg.apache.spark.sql.{DataFrame, SparkSession}/**
 * sparksql定义udf函数
 */object Spark_09_UDF_2 {def main(args: Array[String]):Unit={val spark = SparkSession.builder().master("local[*]").appName("saveMode").getOrCreate()val df: DataFrame = spark.read.json("data/emp.json")//需求:显示每个员工的工资级别。   >4000的为 level4  >2500的为leve3  >1000的为level2  其他的为level1
        df.createTempView("emp")val sqlText ="""
              | select empno,ename,sal,
              | (case when sal>4000 then 'level4'
              |       when sal>2500 then 'level3'
              |       when sal>1000 then 'level2'
              |       else 'level1' end) level
              | from emp
              |""".stripMargin
        spark.sql(sqlText).show()//需求:显示每个员工的工资级别。   >4000的为 level4  >2500的为leve3  >1000的为level2  其他的为level1val sqlText2 ="""
              | select
              | (case when sal>4000 then 'level4'
              |       when sal>2500 then 'level3'
              |       when sal>1000 then 'level2'
              |       else 'level1' end) level, count(1)
              | from emp
              | group by (case when sal>4000 then 'level4'
              |       when sal>2500 then 'level3'
              |       when sal>1000 then 'level2'
              |       else 'level1' end)
              |""".stripMargin
        spark.sql(sqlText2).show()//需求: 使用udf函数来完成上述需求def myfunc(sal:Double):String={if(sal>4000)"level4"elseif(sal>2500)"level3"elseif(sal>1000)"level2"else"level1"}
        spark.udf.register("mylevel",myfunc _)val sqlText3 ="""
              | select mylevel(sal), count(1)
              | from emp
              | group by mylevel(sal)
              |""".stripMargin
        spark.sql(sqlText3).show()
        spark.stop()}}

五、SparkSQL和Spark的优化

5.1 SparkSql的优化

5.1.1 内存参数

spark.sql.inMemoryColumnarStorage.compressedtrue如果假如设置为true,SparkSql会根据统计信息自动的为每个列选择压缩方式进行压缩。spark.sql.inMemoryColumnarStorage.batchSize10000控制列缓存的批量大小。批次大有助于改善内存使用和压缩,但是缓存数据会有OOM的风险。

5.1.2 其他调优参数

Property NameDefaultMeaningspark.sql.files.maxPartitionBytes134217728 (128 MB)获取数据到分区中的最大字节数。spark.sql.files.openCostInBytes4194304 (4 MB)该参数默认4M,表示小于4M的小文件会合并到一个分区中,用于减小小文件,防止太多单个小文件占一个分区情况。spark.sql.broadcastTimeout300广播等待超时时间,单位秒。spark.sql.autoBroadcastJoinThreshold10485760 (10 MB)最大广播表的大小。设置为-1可以禁止该功能。当前统计信息仅支持Hive Metastore表。spark.sql.shuffle.partitions200设置shuffle分区数,默认200。

5.2 Spark的优化机制

5.2.1 spark的优化基础篇

参考文章:https://tech.meituan.com/2016/04/29/spark-tuning-basic.html

开发原则:
原则一:避免创建重复的RDD       
原则二:尽可能复用同一个RDD       原则1和原则2 都要由原则3来配合使用
原则三:对多次使用的RDD进行持久化
                        持久化的级别,要针对于具体应用场景来选择
原则四:尽量避免使用shuffle类算子
原则五:使用map-side预聚合的shuffle操作
原则六:使用高性能的算子
原则七:广播大变量
原则八:使用Kryo优化序列化性能
原则九:优化数据结构

资源参数调优
num-executors
executor-memory
executor-cores
driver-memory
spark.default.parallelism
spark.storage.memoryFraction
spark.shuffle.memoryFraction

案例演示:
./bin/spark-submit \
  --master yarn-cluster \
  --num-executors 100 \
  --executor-memory 6G \
  --executor-cores 4 \
  --driver-memory 1G \
  --conf spark.default.parallelism=1000 \
  --conf spark.storage.memoryFraction=0.5 \
  --conf spark.shuffle.memoryFraction=0.3 \

5.2.2 spark的优化高阶篇

主要就是处理数据倾斜问题

参考文章:https://tech.meituan.com/2016/05/12/spark-tuning-pro.html

解决方案一:使用Hive ETL预处理数据
解决方案二:过滤少数导致倾斜的key
解决方案三:提高shuffle操作的并行度
解决方案四:两阶段聚合(局部聚合+全局聚合)
解决方案五:将reduce join转为map join
解决方案六:采样倾斜key并分拆join操作
解决方案七:使用随机前缀和扩容RDD进行join
解决方案八:多种方案组合使用

实际生产环境中,要具体情况具体分析,采用哪一种,或哪几种方案的组合

5.3 局部聚合+全局聚合解决数据倾斜问题

packagecom.qf.sparksql.day03importorg.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}importorg.apache.spark.sql.types._
importorg.apache.spark.sql.{DataFrame, Row, SparkSession}/**
 * 使用局部聚合+全局聚合來解決数据倾斜的问题
 */object Spark_10_DataSkew {def main(args: Array[String]):Unit={val spark = SparkSession.builder().master("local[*]").appName("data skew").getOrCreate()importspark.implicits._

        var rdd1 = spark.sparkContext.makeRDD(List("a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a,a","b,c,d,e,f","b,b,c,c,d,e,f,g","a,b,a,c,f"))val df: DataFrame = rdd1.toDF("line")
        df.createTempView("temp")

        println("----------------先正常统计每个字符的个数, 发现a特别多,那么下游的RDD对应的Task处理所有的a的时候,时间就会很长,出现数据倾斜-----------------------")val sql ="""
              |select word,count(1)
              |from
              |(
              |select explode(split(line,",")) word
              |from temp) t1
              |group by t1.word
              |
              |""".stripMargin
        spark.sql(sql).show()/**
         * 上述的查询,可能会发生数据倾斜,因为a特别多
         */

        println("----------------先在单词前面拼接随机数字,比如0,1,2,3-----------------------")val sql1 ="""
              |select concat(floor(rand()*4),"-",word)
              |from
              |(
              | select explode(split(line,",")) word
              |from temp) t1
              |
              |""".stripMargin
        spark.sql(sql1).show()

        println("----------------將加上前缀的单词,进行预聚合   统计的时候:可能会出现 3-a:6 2-a:8  1-a:2 0-a:4  此时跟a有关的就4条数据了,远远小于20条  -----------------------")val sql2 ="""
              |
              |select prefix_word,count(1)
              |from(
              |select concat(floor(rand()*4),"-",word) prefix_word
              |from
              |(
              | select explode(split(line,",")) word
              |from temp) t1
              |) t2
              |group by prefix_word
              |
              |""".stripMargin
        spark.sql(sql2).show()

        println("----------------去掉前缀,进行全局聚合-----------------------")val sql3 ="""
              |
              |select substr(prefix_word,instr(prefix_word,"-")+1) w,sum(num)
              |from
              |     (select prefix_word,count(1) num
              |     from(
              |         select concat(floor(rand()*4),"-",word) prefix_word
              |         from(
              |             select explode(split(line,",")) word
              |             from temp) t1
              |         ) t2
              |     group by prefix_word
              |     ) t3
              |group by  w
              |""".stripMargin
        spark.sql(sql3).show()
        spark.stop()}}
标签: hive big data spark

本文转载自: https://blog.csdn.net/weixin_45682261/article/details/125143933
版权归原作者 南潇如梦 所有, 如有侵权,请联系我们删除。

“SparkSQL知识点总结”的评论:

还没有评论