0


RDD转换为DataFrame

** spark官方提供了两种方法实现从RDD转换到DataFrame。第一种方法是利用反射机制来推断包含特定类型对象的Schema,这种方式适用于对已知的数据结构的RDD转换;第二种方法通过编程接口构造一个 Schema ,并将其应用在已知的RDD数据中。**

(一)反射机制推断Schema

  1. Windows系统下开发Scala 代码,可以使用本地环境测试,因此首先需要在本地磁

盘准备文本数据文件,这里将HD FS中的/spark/person.txt文件下载到本地D:/spark person.txt路径下。从文件4-1可!以看出,当前数据文件共3列,可以非常容易地分析出这3列分别是编号、姓名、年龄。但是计算机无法像人一样直观地感受字段的实际含义,因此需要通过反射机制来推断包含特定类型对象的Schema信息。

接下来打开IDEA开发工具,创建名为 spark01 的Maven工程,讲解实现反射机制推断Schema的开发工具。

1、添加 Spark SQL 依赖,代码如下:

<dependency>

<groupId>org.apache.spark</groupId>

<artifactId>spark-sql_2.11</artifactId>

<version>2.3.2</version>

</dependency>

2、编写代码:

文件名:CaseClassSchema.scala

  1. package cn.itcast
  2. import org.apache.spark.SparkContext
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.sql.{DataFrame, Row,SparkSession}
  5. //定义样例类
  6. case class Person(id:Int,name:String,age:Int)
  7. object CaseClassSchema {
  8. def main(args: Array[String]): Unit = {
  9. //构建SparkSession
  10. val spark : SparkSession = SparkSession.builder()
  11. .appName("CaseClassSchema")
  12. .master("local[2]")
  13. .getOrCreate()
  14. //获取SparkContext
  15. val sc : SparkContext = spark.sparkContext
  16. //设置日志打印级别
  17. sc.setLogLevel("WARN")
  18. //读取文件
  19. val data:RDD[Array[String]]=
  20. sc.textFile("D://spark//person.txt").map(x=>x.split(" "))
  21. //将RDD与样例类关联
  22. val personRdd : RDD[Person] = data.map(x=>Person(x(0).toInt,x(1),x(2).toInt))
  23. //获取DataFrame
  24. //手动导入隐式转换
  25. import spark.implicits._
  26. val personDF : DataFrame = personRdd.toDF
  27. //------------DSL风格操作开始----------
  28. // 显示DataFrame的数据,默认显示20行
  29. personDF.show()
  30. //显示DataFrame的schema信息
  31. personDF.printSchema()
  32. //统计DataFrame中年龄大于30岁的人
  33. println(personDF.filter($"age">30).count())
  34. //-----------------DSL风格操作结束------------
  35. //----------------SQL风格操作开始-------------
  36. //将DataFrame注册成表
  37. personDF.createOrReplaceTempView("t_person")
  38. spark.sql("select * from t_person").show()
  39. spark.sql("select * from t_person where name='kuli'").show()
  40. //---------------------SQL风格操作结束--------------------
  41. //关闭资源操作
  42. sc.stop()
  43. spark.stop()
  44. }
  45. }

运行结果:

(二)编程方式定义Schema

  1. case类不能提前定义的时候,就需要采用编程方式定义Schema信息,定义DataFrame主要包含3个步骤,具体如下:

(1)创建一个Row对象结构的RDD;(2)基于StructType类型创建Schema;

(3)通过SparkSession提供的createDataFrame(()方法来拼接Schema。

根据上述步骤,创建 SparkSqlSchema.scala文件,使用编程方式定义Schema信息的具体代码如文件所示。

文件名:SparkSqlSchema.scala

  1. package cn.itcast
  2. import org.apache.spark.SparkContext
  3. import org.apache.spark.rdd.RDD
  4. import org.apache.spark.sql.{DataFrame, Row, SparkSession}
  5. import org.apache.spark.sql.types.{StructType, StructField, StringType,IntegerType}
  6. object SparkSqlSchema {
  7. def main(args: Array[String]): Unit = {
  8. // 创建SparkSeeion
  9. val spark : SparkSession = SparkSession.builder()
  10. .appName("SparkSqlSchema")
  11. .master("local[2]")
  12. .getOrCreate()
  13. // 获取sparkContext对象
  14. val sc : SparkContext = spark.sparkContext
  15. //设置日志打印级别
  16. sc.setLogLevel("WARN")
  17. //加载数据
  18. val dataRDD : RDD[String] = sc.textFile("D://spark//person.txt")
  19. // 切分每一行
  20. val dataArrayRDD : RDD[Array[String]] = dataRDD.map(_.split(" "))
  21. //加载数据到Row对象中
  22. val personRDD : RDD[Row] = dataArrayRDD.map(x=>Row(x(0).toInt,x(1),x(2).toInt))
  23. //创建Schema
  24. val schema : StructType = StructType(Seq(
  25. StructField("id",IntegerType,false),
  26. StructField("name",StringType,false),
  27. StructField("age",IntegerType,false)
  28. ))
  29. //利用personRDD与Schema创建DataFrame
  30. val personDF : DataFrame = spark.createDataFrame(personRDD,schema)
  31. //DSL操作显示DataFrame的数据结果
  32. personDF.show()
  33. //将DataFrame注册成表
  34. personDF.createOrReplaceTempView("t_person")
  35. //sql语句操作
  36. spark.sql("select * from t_person").show()
  37. //关闭资源
  38. sc.stop()
  39. spark.stop()
  40. }
  41. }

运行结果:


本文转载自: https://blog.csdn.net/m0_59839948/article/details/124902790
版权归原作者 鄙人阿彬 所有, 如有侵权,请联系我们删除。

“RDD转换为DataFrame”的评论:

还没有评论