0


Spark SQL实战(07)-Data Sources

1 概述

Spark SQL通过DataFrame接口支持对多种数据源进行操作。

DataFrame可使用关系型变换进行操作,也可用于创建临时视图。将DataFrame注册为临时视图可以让你对其数据运行SQL查询。

本节介绍使用Spark数据源加载和保存数据的一般方法,并进一步介绍可用于内置数据源的特定选项。

数据源关键操作:

  • load
  • save

2 大数据作业基本流程

input 业务逻辑 output
不管是使用MR/Hive/Spark/Flink/Storm。

Spark能处理多种数据源的数据,而且这些数据源可以是在不同地方:

  • file/HDFS/S3/OSS/COS/RDBMS
  • json/ORC/Parquet/JDBC
object DataSourceApp {def main(args: Array[String]):Unit={val spark: SparkSession = SparkSession.builder().master("local").getOrCreate()
    
    text(spark)// json(spark)// common(spark)// parquet(spark)// convert(spark)// jdbc(spark)
    jdbc2(spark)
    spark.stop()}}

3 text数据源读写

读取文本文件的 API,SparkSession.read.text()

参数:

  • path:读取文本文件的路径。可以是单个文件、文件夹或者包含通配符的文件路径。
  • wholetext:如果为 True,则将整个文件读取为一条记录;否则将每行读取为一条记录。
  • lineSep:如果指定,则使用指定的字符串作为行分隔符。
  • pathGlobFilter:用于筛选文件的通配符模式。
  • recursiveFileLookup:是否递归查找子目录中的文件。
  • allowNonExistingFiles:是否允许读取不存在的文件。
  • allowEmptyFiles:是否允许读取空文件。

返回一个 DataFrame 对象,其中每行是文本文件中的一条记录。

def text(spark: SparkSession):Unit={importspark.implicits._

  val textDF: DataFrame = spark.read.text("/Users/javaedge/Downloads/sparksql-train/data/people.txt")val result: Dataset[(String,String)]= textDF.map(x =>{val splits: Array[String]= x.getString(0).split(",")(splits(0).trim, splits(1).trim)})

编译无问题,运行时报错:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Text data source supports only a single column, and you have 2 columns.;

思考下,如何使用text方式,输出多列的值?

修正后

val result: Dataset[String]= textDF.map(x =>{val splits: Array[String]= x.getString(0).split(",")
  splits(0).trim
})

result.write.text("out")

继续报错:

Exception in thread "main" org.apache.spark.sql.AnalysisException: path file:/Users/javaedge/Downloads/sparksql-train/out already exists.;

回想Hadoop中MapReduce的输出:

  • 第一次0K
  • 第二次也会报错输出目录已存在

这关系到 Spark 中的 mode

SaveMode

Spark SQL中,使用DataFrame或Dataset的write方法将数据写入外部存储系统时,使用“SaveMode”参数指定如何处理已存在的数据。

SaveMode有四种取值:

  1. SaveMode.ErrorIfExists:如果目标路径已经存在,则会引发异常
  2. SaveMode.Append:将数据追加到现有数据
  3. SaveMode.Overwrite:覆盖现有数据
  4. SaveMode.Ignore:若目标路径已经存在,则不执行任何操作

所以,修正如下:

result.write.mode(SaveMode.overwrite).text("out")

4 JSON 数据源

// JSONdef json(spark: SparkSession):Unit={importspark.implicits._

  val jsonDF: DataFrame = spark.read.json("/Users/javaedge/Downloads/sparksql-train/data/people.json")

  jsonDF.show()// 只要age>20的数据
  jsonDF.filter("age > 20").select("name").write.mode(SaveMode.Overwrite).json("out")
  
output:
+----+-------+| age|   name|+----+-------+|null|Michael||30|   Andy||19| Justin|+----+-------+

嵌套 JSON

// 嵌套 JSONval jsonDF2: DataFrame = spark.read.json("/Users/javaedge/Downloads/sparksql-train/data/people2.json")
jsonDF2.show()

jsonDF2.select($"name",
  $"age",
  $"info.work".as("work"),
  $"info.home".as("home")).write.mode("overwrite").json("out")

output:
+---+-------------------+----+|age|               info|name|+---+-------------------+----+|30|[shenzhen, beijing]|  PK|+---+-------------------+----+

5 标准写法

// 标准API写法privatedef common(spark: SparkSession):Unit={importspark.implicits._

  val textDF: DataFrame = spark.read.format("text").load("/Users/javaedge/Downloads/sparksql-train/data/people.txt")val jsonDF: DataFrame = spark.read.format("json").load("/Users/javaedge/Downloads/sparksql-train/data/people.json")
  textDF.show()
  println("~~~~~~~~")
  jsonDF.show()

  jsonDF.write.format("json").mode("overwrite").save("out")}

output:
+-----------+|      value|+-----------+|Michael,29||   Andy,30|| Justin,19|+-----------+~~~~~~~~+----+-------+| age|   name|+----+-------+|null|Michael||30|   Andy||19| Justin|+----+-------+

6 Parquet数据源

6.1 简介

一种列式存储格式,在大数据环境中高效地存储和处理数据。由Hadoop生态系统中的Apache Parquet项目开发的。

6.2 设计目标

支持高效的列式存储和压缩,并提供高性能的读/写能力,以便处理大规模结构化数据。

Parquet可以与许多不同的计算框架一起使用,如Apache Hadoop、Apache Spark、Apache Hive等,因此广泛用于各种大数据应用程序中。

6.3 优点

高性能、节省存储空间、支持多种编程语言和数据类型、易于集成和扩展等。

privatedef parquet(spark: SparkSession):Unit={importspark.implicits._

  val parquetDF: DataFrame = spark.read.parquet("/Users/javaedge/Downloads/sparksql-train/data/users.parquet")
  parquetDF.printSchema()
  parquetDF.show()

  parquetDF.select("name","favorite_numbers").write.mode("overwrite").option("compression","none").parquet("out")
  
output:
root
 |-- name: string (nullable =true)|-- favorite_color: string (nullable =true)|-- favorite_numbers: array (nullable =true)||-- element: integer (containsNull =true)+------+--------------+----------------+|  name|favorite_color|favorite_numbers|+------+--------------+----------------+|Alyssa|null|[3,9,15,20]||   Ben|           red|[]|+------+--------------+----------------+

7convert

方便从一种数据源写到另一种数据源。

存储类型转换:JSON==>Parquet

def convert(spark: SparkSession):Unit={importspark.implicits._

  val jsonDF: DataFrame = spark.read.format("json").load("/Users/javaedge/Downloads/sparksql-train/data/people.json")
  jsonDF.show()

  jsonDF.filter("age>20").write.format("parquet").mode(SaveMode.Overwrite).save("out")

8 JDBC

有些数据是在MySQL,使用Spark处理,肯定要通过Spark读出MySQL的数据。
数据源是text/json,通过Spark处理完后,要将统计结果写入MySQL。

查 DB

写法一

def jdbc(spark: SparkSession):Unit={importspark.implicits._

  val jdbcDF = spark.read
    .format("jdbc").option("url","jdbc:mysql://localhost:3306").option("dbtable","smartrm_monolith.order").option("user","root").option("password","root").load()

  jdbcDF.filter($"order_id">150).show(100)}

写法二

val connectionProperties =new Properties()
connectionProperties.put("user","root")
connectionProperties.put("password","root")val jdbcDF2: DataFrame = spark.read
  .jdbc(url, srcTable, connectionProperties)

jdbcDF2.filter($"order_id">100)

写 DB

val connProps =new Properties()
connProps.put("user","root")
connProps.put("password","root")val jdbcDF: DataFrame = spark.read.jdbc(url, srcTable, connProps)

jdbcDF.filter($"order_id">100).write.jdbc(url,"smartrm_monolith.order_bak", connProps)

若 目标表不存在,会自动帮你创建:

统一配置管理

如何将那么多数据源配置参数统一管理呢?

先引入依赖:

<dependency><groupId>com.typesafe</groupId><artifactId>config</artifactId><version>1.3.3</version></dependency>

配置文件:

读配置的程序:

packagecom.javaedge.bigdata.chapter05importcom.typesafe.config.{Config, ConfigFactory}object ParamsApp {def main(args: Array[String]):Unit={val config: Config = ConfigFactory.load()val url:String= config.getString("db.default.url")
    println(url)}}
privatedef jdbcConfig(spark: SparkSession):Unit={importspark.implicits._

  val config = ConfigFactory.load()val url = config.getString("db.default.url")val user = config.getString("db.default.user")val password = config.getString("db.default.password")val driver = config.getString("db.default.driver")val database = config.getString("db.default.database")val table = config.getString("db.default.table")val sinkTable = config.getString("db.default.sink.table")val connectionProperties =new Properties()
  connectionProperties.put("user", user)
  connectionProperties.put("password", password)val jdbcDF: DataFrame = spark.read.jdbc(url,s"$database.$table", connectionProperties)

  jdbcDF.filter($"order_id">100).show()

写到新表:

jdbcDF.filter($"order_id">158).write.jdbc(url,s"$database.$sinkTable", connectionProperties)

标签: spark sql 大数据

本文转载自: https://blog.csdn.net/qq_33589510/article/details/129772840
版权归原作者 JavaEdge. 所有, 如有侵权,请联系我们删除。

“Spark SQL实战(07)-Data Sources”的评论:

还没有评论