Spark SQL介绍
Spark SQL是一个用于结构化数据处理的Spark组件。所谓结构化数据,是指具有Schema信息的数据,例如JSON、Parquet、Avro、CSV格式的数据。与基础的Spark RDD API不同,Spark SQL提供了对结构化数据的查询和计算接口。
Spark SQL的主要特点:
- 将SQL查询与Spark应用程序无缝组合
Spark SQL允许使用SQL或熟悉的API在Spark程序中查询结构化数据。与Hive不同的是,Hive是将SQL翻译成MapReduce作业,底层是基于MapReduce的;而Spark SQL底层使用的是Spark RDD。
- 可以连接到多种数据源
Spark SQL提供了访问各种数据源的通用方法,数据源包括Hive、Avro、Parquet、ORC、JSON、JDBC等。
- 在现有的数据仓库上运行SQL或HiveQL查询
Spark SQL支持HiveQL语法以及Hive SerDes和UDF (用户自定义函数) ,允许访问现有的Hive仓库。
DataFrame和DataSet
- DataFrame的结构
DataFrame是Spark SQL提供的一个编程抽象,与RDD类似,也是一个分布式的数据集合。但与RDD不同的是,DataFrame的数据都被组织到有名字的列中,就像关系型数据库中的表一样。
DataFrame在RDD的基础上添加了数据描述信息(Schema,即元信息) ,因此看起来更像是一张数据库表。例如,在一个RDD中有3行数据,将该RDD转成DataFrame后,其中的数据可能如图所示:
- DataSet的结构Dataset是一个分布式数据集,是Spark 1.6中添加的一个新的API。相比于RDD, Dataset提供了强类型支持,在RDD的每行数据加了类型约束。 在Spark中,一个DataFrame代表的是一个元素类型为Row的Dataset,即DataFrame只是Dataset[Row]的一个类型别名。
Spark SQL的基本使用
Spark Shell启动时除了默认创建一个名为sc的SparkContext的实例外,还创建了一个名为spark的SparkSession实例,该spark变量可以在Spark Shell中直接使用。
SparkSession只是在SparkContext基础上的封装,应用程序的入口仍然是SparkContext。SparkSession允许用户通过它调用DataFrame和Dataset相关API来编写Spark程序,支持从不同的数据源加载数据,并把数据转换成DataFrame,然后使用SQL语句来操作DataFrame数据。
Spark SQL基本使用案例
在HDFS中有一个文件/input/person.txt,文件内容如下:
现需要使用Spark SQL将该文件中的数据按照年龄降序排列,步骤如下:
进入spark-shell环境
- 加载数据为Dataset
val d1 = spark.read.textFile("hdfs://192.168.121.131:9000/input/person.txt")
d1.show()# 查看d1中的数据内容
从上述代码的结果可以看出,Dataset将文件中的每一行看作一个元素,并且所有元素组成了一列,列名默认为value。
- 给Dataset添加元数据信息 定义一个样例类Person,用于存放数据描述信息,代码如下:
case class Person(id:Int,name:String,age:Int)
注:Scala有一种特殊的类叫做样例类(case class)。默认情况下,样例类一般用于不可变对象(样例类构造参数默认声明为val)。
调用Dataset的map()算子将每一个元素拆分并存入Person类中,代码如下:
val personDataset = d1.map(line=>{
val fields= line.split(",")
val id =fields(0).toInt
val name =fields(1)
val age =fields(2).toInt
Person(id,name,age)
})
personDataset.show()# 查看personDataset中的数据内容
可以看到,personDataset中的数据类似于一张关系型数据库的表。
- 将Dataset转为DataFrame Spark SQL查询的是DataFrame中的数据,因此需要将存有元数据信息的Dataset转为DataFrame。
调用Dataset的toDF()方法,将存有元数据的Dataset转为DataFrame,代码如下:
val pdf = personDataset.toDF()
- 执行SQL查询 在DataFrame上创建一个临时视图v_person,并使用SparkSession对象执行SQL查询,代码如下:
pdf.createTempView("v_person")
val result = spark.sql("select * from v_person order by age desc")
result.show()
版权归原作者 Sunflower461 所有, 如有侵权,请联系我们删除。