0


Spark Sql

SparkSQL 是什么

SparkSQL 可以简化 RDD 的开发,提高开发效率,且执行效率非 常快,所以实际工作中,基本上采用的就是 SparkSQL。Spark SQL 为了简化 RDD 的开发, 提高开发效率,提供了 2 个编程抽象,类似 Spark Core 中的 RDD。DataFrame与DataSet

DataFrame

在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格,也很像pyhton中的pandas,DataFrame与RDD的主要区别在于,前者带有schema元信息(更关注于每一列的属性),即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从 API 易用性的角度上看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API 要更加友好,门槛更低。

上图直观地体现了DataFrame和RDD的区别。

左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而右侧的DataFrame却提供了详细的结构信息,使得 Spark SQL 可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。

DataFrame是为数据提供了Schema的视图。可以把它当做数据库中的一张表来对待

DataFrame也是懒执行的,但性能上比RDD要高,主要原因:优化的执行计划,即查询计划通过Spark catalyst optimiser进行优化。比如下面一个例子:

为了说明查询优化,我们来看上图展示的人口数据分析的示例。图中构造了两个

DataFrame,将它们join之后又做了一次filter操作。如果原封不动地执行这个执行计划,最终的执行效率是不高的。因为join是一个代价较大的操作,也可能会产生一个较大的数据集。如果我们能将filter下推到 join下方,先对DataFrame进行过滤,再join过滤后的较小的结果集,便可以有效缩短执行时间。而Spark SQL的查询优化器正是这样做的。简而言之,逻辑查询计划优化就是一个利用基于关系代数的等价变换,将高成本的操作替换为低成本操作的过程。

DataSet

DataSet是分布式数据集合。DataSet是Spark 1.6中添加的一个新抽象,是DataFrame的一个扩展。它提供了RDD的优势(强类型,使用强大的lambda函数的能力)以及Spark SQL优化执行引擎的优点。DataSet也可以使用功能性的转换(操作map,flatMap,filter

等等)。

➢ DataSet是DataFrame API的一个扩展,是SparkSQL最新的数据抽象

➢ 用户友好的API风格,既具有类型安全检查也具有DataFrame的查询优化特性;

➢ 用样例类来对DataSet中定义数据的结构信息,样例类中每个属性的名称直接映射到

DataSet中的字段名称;

➢ DataSet是强类型的。比如可以有DataSet[Car],DataSet[Person]。

➢ DataFrame是DataSet的特列,DataFrame=DataSet[Row] ,所以可以通过as方法将

DataFrame转换为DataSet。Row是一个类型,跟Car、Person这些的类型一样,所有的表结构信息都用Row来表示。获取数据时需要指定顺序

RDD 、 DataFrame 、 DataSet 三者的关系

这三者是可以相互转化的

三者的共性

➢ RDD、DataFrame、DataSet全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利;
➢ 三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算;
➢ 三者有许多共同的函数,如filter,groupby等;
➢ 在对DataFrame和Dataset进行操作许多操作都需要这个包:import spark.implicits._(在创建好SparkSession对象后尽量直接导入)
➢ 三者都会根据 Spark 的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出
➢ 三者都有partition的概念
➢ DataFrame和DataSet均可使用模式匹配获取各个字段的值和类型

三者的区别

  1. RDD
    ➢ RDD一般和spark mllib同时使用
    ➢ RDD不支持sparksql操作
  2. DataFrame
    ➢ 与RDD和Dataset不同,DataFrame每一行的类型固定为Row,每一列的值没法直接访问,只有通过解析才能获取各个字段的值
    ➢ DataFrame与DataSet一般不与 spark mllib 同时使用

DataFrame与DataSet均支持 SparkSQL 的操作,比如select,groupby之类,还能注册临时表/视窗,进行 sql 语句操作
➢ DataFrame与DataSet支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然
3) DataSet
➢ Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同。 DataFrame其实就是DataSet的一个特例 type DataFrame = Dataset[Row]
➢ DataFrame也可以叫Dataset[Row],每一行的类型是Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的getAS方法或者共性中的第七条提到的模式匹配拿出特定字段。而Dataset中,每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息

IDEA 开发 SparkSQL

//创建运行环境
val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("sparksql")
val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()
//执行
//DataFrame  是特定泛型的dataset
val df: DataFrame = spark.read.json("user.json")
  df.show()
//DataFrame>sql
df.createOrReplaceTempView("user")
spark.sql("select * from user").show()
//DataFrame>DSL
df.select("age","username").show()

这三种展示都是同样的结果

//若涉及转换操作,需要引入转换规则
import spark.implicits._
df.select($"age"+1).as("age").show()
df.select('age+2).as("age").show()

  //DataSet
  val seq: Seq[Int] = Seq(1, 2, 3, 4)
  val ds: Dataset[Int] = seq.toDS()
  ds.show()

  //RDD<=>DataFrame
  val rdd=spark.sparkContext.makeRDD(List((1,"zhangsan",30),(2,"lisi",40)))
  val df2: DataFrame = rdd.toDF("id", "name", "age")
  val rowRDD: RDD[Row] = df2.rdd
  //DataFrame<=>DataSet

  val ds1: Dataset[User] = df2.as[User]
  val df3: DataFrame = ds1.toDF()
  println("------df3---------")
  df3.show()

  //RDD<=>DataSet
  val ds3: Dataset[User] = rdd.map {
    case (id, name, age) => {
      User(id, name, age)
    }
  }.toDS()
  println("------ds3---------")
  ds3.show()
  ds3.rdd

  //关闭
  spark.close()
}
case class User(id:BigInt,name:String,age:BigInt)

用户自定义函数

UDF

import spark.implicits._
val df: DataFrame = spark.read.json("user.json")
df.createOrReplaceTempView("user")
//自定义函数
spark.udf.register("prefixName",(name:String)=>{
  "Name: "+name
})
spark.sql("select age,prefixName(username) from user").show()

UDAF

  /**
   * 自定义聚合函数类  计算年龄品均值
   */
  class MyAsUDAF extends UserDefinedAggregateFunction{
    //输入数据结构
    override def inputSchema: StructType = {
      StructType(
        Array(
          StructField("age",LongType)
        )
      )
    }
//缓冲区
    override def bufferSchema: StructType = {
      StructType(
        Array(
            StructField("total", LongType),
            StructField("count", LongType)
        )
      )
    }
    //输出结果的数据类型
    override def dataType: DataType = LongType
    override def deterministic: Boolean =true
    //缓冲区初始化
    override def initialize(buffer: MutableAggregationBuffer): Unit = {
      buffer(0)=0L
      buffer(1)=0L
    }

    override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
      buffer.update(0,buffer.getLong(0)+input.getLong(0))
      buffer.update(1,buffer.getLong(1)+1)

    }

    override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
      buffer1.update(0,buffer1.getLong(0)+ buffer2.getLong(0))
      buffer1.update(1,buffer1.getLong(1)+ buffer2.getLong(1))

    }
//计算平均值
    override def evaluate(buffer: Row): Any = {
      buffer.getLong(0)/buffer.getLong(1)

    }


本文转载自: https://blog.csdn.net/xuingyu/article/details/139132453
版权归原作者 凡人修炼大数据 所有, 如有侵权,请联系我们删除。

“Spark Sql”的评论:

还没有评论