0


Spark--Spark SQL结构化数据文件处理知识总结(第五章)

一、Spark SQL的概述

1.1 Spark SQL的简介

Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象结构叫做DataFrame的数据模型(即带有Schema信息的RDD),Spark SQL作为分布式SQL查询引擎,让用户可以通过SQL、DataFrames API和Datasets API三种方式实现对结构化数据的处理。

Spark SQL主要提供了以下三个功能:

  1. Spark SQL可从各种结构化数据源中读取数据,进行数据分析。
  2. Spark SQL包含行业标准的JDBC和ODBC连接方式,因此它不局限于在Spark程序内使用SQL语句进行查询。
  3. Spark SQL可以无缝地将SQL查询与Spark程序进行结合,它能够将结构化数据作为Spark中的分布式数据集(RDD)进行查询。

Hive 是将 SQL 转为 MapReduce。

SparkSQL 可以理解成是将 SQL 解析成:“RDD + 优化” 再执行

在学习Spark SQL前,需要了解数据分类
定义特点举例结构化数据有固定的 Schema有预定义的 Schema关系型数据库的表半结构化数据没有固定的 Schema,但是有结构没有固定的 Schema,有结构信息,数据一般是自描述的指一些有结构的文件格式,例如 JSON非结构化数据没有固定 Schema,也没有结构没有固定 Schema,也没有结构指图片/音频之类的格式
总结:

  • RDD 主要用于处理非结构化数据 、半结构化数据、结构化;
  • SparkSQL 是一个既支持 SQL 又支持命令式数据处理的工具;
  • SparkSQL 主要用于处理结构化数据(较为规范的半结构化数据也可以处理)

1.2 Spark SQL架构

Spark SQL架构与Hive架构相比,把底层的MapReduce执行引擎更改为Spark,还修改了Catalyst优化器,Spark SQL快速的计算效率得益于Catalyst优化器。从HiveQL被解析成语法抽象树起,执行计划生成和优化的工作全部交给Spark SQL的Catalyst优化器进行负责和管理。

Spark要想很好地支持SQL,需要完成解析(Parser)、优化(Optimizer)、执行(Execution)三大过程。

Catalyst优化器在执行计划生成和优化的工作时,离不开内部的五大组件。

  1. SqlParse:完成SQL语法解析功能,目前只提供了一个简单的SQL解析器。
  2. Analyze:主要完成绑定工作,将不同来源的Unresolved LogicalPlan和元数据进行绑定,生成Resolved LogicalPlan。
  3. Optimizer:对Resolved Lo;gicalPlan进行优化,生成OptimizedLogicalPlan。
  4. Planner:将LogicalPlan转换成PhysicalPlan。
  5. CostModel:主要根据过去的性能统计数据,选择最佳的物理执行计划

二、DataFrame概述

2.1DataFrame简介

  • Spark SQL使用的数据抽象并非是RDD,而是DataFrame。
  • 在Spark 1.3.0版本之前,DataFrame被称为SchemaRDD。
  • DataFrame使Spark具备处理大规模结构化数据的能力。
  • 在Spark中,DataFrame是一种以RDD为基础的分布式数据集。
  • DataFrame的结构类似传统数据库的二维表格,可以从很多数据源中创建,如结构化文件、外部数据库、Hive表等数据源。

DataFrame可以看作是分布式的Row对象的集合,在二维表数据集的每一列都带有名称和类型,这就是Schema元信息,这使得Spark框架可获取更多数据结构信息,从而对在DataFrame背后的数据源以及作用于DataFrame之上数据变换进行针对性的优化,最终达到提升计算效率。

2.2 Spark SQL数据抽象

2.2.1 DataFrame 和 DataSet

Spark SQL数据抽象可以分为两类:

  1. DataFrame:DataFrame 是一种以 RDD 为基础的分布式数据集,类似于传统数据库的二维表格,带有 Schema 元信息(可以理解为数据库的列名和类型)。DataFrame = RDD + 泛型 + SQL 的操作 + 优化
  2. DataSet:DataSet是DataFrame的进一步发展,它比RDD保存了更多的描述信息,概念上等同于关系型数据库中的二维表,它保存了类型信息,是强类型的,提供了编译时类型检查。调用 Dataset 的方法先会生成逻辑计划,然后被 spark 的优化器进行优化,最终生成物理计划,然后提交到集群中运行!DataFrame = Dateset[Row]

2.3 DataFrame的创建

创建DataFrame的两种基本方式:

  1. 已存在的RDD调用toDF()方法转换得到DataFrame。
  2. 通过Spark读取数据源直接创建DataFrame。

若使用SparkSession方式创建DataFrame,可以使用spark.read从不同类型的文件中加载数据创建DataFrame。spark.read的具体操作,在创建Dataframe之前,为了支持RDD转换成Dataframe及后续的SQL操作,需要导入import.spark.implicits._包启用隐式转换。若使用SparkSession方式创建Dataframe,可以使用spark.read操作,从不同类型的文件中加载数据创建DataFrame。

2.3.1 数据准备

在HDFS文件系统中的/spark目录中有一个person.txt文件,内容如下:

2.3.2通过文件直接创建DataFrame

我们通过Spark读取数据源的方式进行创建DataFrame

2.3.3 RDD直接转换为DataFrame

2.4 DataFrame的常用操作

Dataframe提供了两种谮法风格,即DSL风格语法和SQL风格语法,二者在功能上并无区别,仅仅是根据用户习惯自定义选择操作方式。接下来,我们通过两种语法风格,分讲解Dstaframe操作的具体方法:

2.4.1 DSL风格操作

DataFrame提供了一个领域特定语言(DSL)以方便操作结构化数据,下面将针对DSL操作风格,讲解DataFrame 常用操作示例:

  1. show():查看DataFrame中的具体内容信息
  2. pritSchema0:查看0staFrame的Schema信息
  3. select():查看DataFmame中造取部分列的数据

DSL风格示例:

personDF.select(personDF.col("name")).show
personDF.select(personDF("name")).show
personDF.select(col("name")).show
personDF.select("name").show

2.4.2 SQL风格操作DataFrame

scala> ffDF.registerTempTable("t_ff")
scala> spark.sql("select * from t_ff order by age desc limit 2").show()

SQL 风格示例:

spark.sql("select * from t_person").show

总结:

  • DataFrame 和 DataSet 都可以通过RDD来进行创建;
  • 也可以通过读取普通文本创建–注意:直接读取没有完整的约束,需要通过 RDD+Schema;
  • 通过 josn/parquet 会有完整的约束;
  • 不管是 DataFrame 还是 DataSet 都可以注册成表,之后就可以使用 SQL 进行查询了! 也可以使用 DSL

三、RDD、DataFrame及Dataset的区别

RDD数据的表现形式,如下图。此时RDD数据没有数据类型和元数据信息。

DataFrame数据的表现形式,如下图。此时DataFrame数据中添加Schema元数据信息(列名和数据类型,如ID:String),DataFrame每行类型固定为Row类型,每列的值无法直接访问,只有通过解析才能获取各个字段的值。

Dataset数据的表现形式,序号(3)和(4),其中序号(3)是在RDD每行数据的基础之上,添加一个数据类型(value:String)作为Schema元数据信息。而序号(4)每行数据添加People强数据类型,在Dataset[Person]中里存放了3个字段和属性,Dataset每行数据类型可自定义,一旦定义后,就具有错误检查机制。

四、Dataset概述

4.1 Dataset对象的创建

4.1.1 通过SparkSession中的createDataset来创建Dataset

scala> val ffDs=spark.createDataset(sc.textFile("/spark/ff.txt")  
scala>ffDs.show()

4.1.2 DataFrame通过“as[ElementType]”方法转换得到Dataset

scala> spark. read. text ("/spark/ff.txt").as[String]
scala> spark. read. text ("/spark/ff.txt").as[String].toDF()    

五、RDD转换DataFrame

Spark官方提供了两种方法实现从RDD转换得到DataFrame。

  1. 第一种方法是利用反射机制来推断包含特定类型对象的Schema,这种方式适用于对已知数据结构的RDD转换
  2. 第二种方法通过编程接口构造一个Schema,并将其应用在已知的RDD数据中

5.1 反射机制推断Schema

Windows系统开发Scala代码,可使用本地环境测试(需要先准备本地数据文件)。我们可以很容易的分析出当前数据文件中字段的信息,但计算机无法直观感受字段的实际含义,因此需要通过反射机制来推断包含特定类型对象的Schema信息,实现将RDD转换成DataFrame。

5.1.1 创建Maven工程

依赖

效果

5.1.2 编程方式定义Schema

当Case类不能提前定义Schema时,就需要采用编程方式定义Schema信息,实现RDD转换DataFrame的功能。

六、Spark SQL 多数据源交互

读取文件

读取 json 文件:

spark.read.json("D:\\data\\output\\json").show()

读取 csv 文件:

spark.read.csv("D:\\data\\output\\csv").toDF("id","name","age").show()

读取 parquet 文件:

spark.read.parquet("D:\\data\\output\\parquet").show()

读取 mysql 表:

val prop = new Properties()
    prop.setProperty("user","root")
    prop.setProperty("password","root")
spark.read.jdbc(
"jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop).show()

写入文件

写入 json 文件:

personDF.write.json("D:\\data\\output\\json")

写入 csv 文件:

personDF.write.csv("D:\\data\\output\\csv")

写入 parquet 文件:

personDF.write.parquet("D:\\data\\output\\parquet")

写入 mysql 表:

val prop = new Properties()
    prop.setProperty("user","root")
    prop.setProperty("password","root")
personDF.write.mode(SaveMode.Overwrite).jdbc(
"jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8","person",prop)

七、数据抽象

Spark Streaming 的基础抽象是 DStream(Discretized Stream,离散化数据流,连续不断的数据流),代表持续性的数据流和经过各种 Spark 算子操作后的结果数据流。

可以从以下多个角度深入理解 DStream:

7.1DStream 本质上就是一系列时间上连续的 RDD:

7.2 对 DStream 的数据的进行操作也是按照 RDD 为单位来进行的:

7.3 容错性

底层 RDD 之间存在依赖关系,DStream 直接也有依赖关系,RDD 具有容错性,那么 DStream 也具有容错性。

7.4 准实时性/近实时性

  • Spark Streaming 将流式计算分解成多个 Spark Job,对于每一时间段数据的处理都会经过 Spark DAG 图分解以及 Spark 的任务集的调度过程。
  • 对于目前版本的 Spark Streaming 而言,其最小的 Batch Size 的选取在 0.5~5 秒钟之间。

所以 Spark Streaming 能够满足流式准实时计算场景,对实时性要求非常高的如高频实时交易场景则不太适合。

总结: 简单来说 DStream 就是对 RDD 的封装,你对 DStream 进行操作,就是对 RDD 进行操作。对于 DataFrame/DataSet/DStream 来说本质上都可以理解成 RDD。

标签: spark sql 大数据

本文转载自: https://blog.csdn.net/2302_80385233/article/details/138805941
版权归原作者 番外剧. 所有, 如有侵权,请联系我们删除。

“Spark--Spark SQL结构化数据文件处理知识总结(第五章)”的评论:

还没有评论