0


spark--JSON数据的处理

3、spark–JSON数据的处理

3.1 介绍

JSON数据

  • Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrameSpark SQL能够自动将JSON数据集以结构化的形式加载为一个DataFrame
  • This conversion can be done using SparkSession.read.json on a JSON file读取一个JSON文件可以用SparkSession.read.json方法

从JSON到DataFrame

  • 指定DataFrame的schema1,通过反射自动推断,适合静态数据2,程序指定,适合程序运行中动态生成的数据

加载json数据

#使用内部的schema
jsonDF = spark.read.json("xxx.json")
jsonDF = spark.read.format('json').load('xxx.json')#指定schema
jsonDF = spark.read.schema(jsonSchema).json('xxx.json')

嵌套结构的JSON

  • 重要的方法1,get_json_object2,get_json3,explode

3.2 实践

3.1 静态json数据的读取和操作

无嵌套结构的json数据

from pyspark.sql import SparkSession
spark =  SparkSession.builder.appName('json_demo').getOrCreate()
sc = spark.sparkContext

# ==========================================#                无嵌套结构的json# ==========================================
jsonString =["""{ "id" : "01001", "city" : "AGAWAM",  "pop" : 15338, "state" : "MA" }""","""{ "id" : "01002", "city" : "CUSHMAN", "pop" : 36963, "state" : "MA" }"""]

从json字符串数组得到DataFrame

# 从json字符串数组得到rdd有两种方法# 1. 转换为rdd,再从rdd到DataFrame# 2. 直接利用spark.createDataFrame(),见后面例子

jsonRDD = sc.parallelize(jsonString)# stringJSONRDD
jsonDF =  spark.read.json(jsonRDD)# convert RDD into DataFrame
jsonDF.printSchema()
jsonDF.show()

直接从文件生成DataFrame

# -- 直接从文件生成DataFrame#只有被压缩后的json文件内容,才能被spark-sql正确读取,否则格式化后的数据读取会出现问题
jsonDF = spark.read.json("xxx.json")# or# jsonDF = spark.read.format('json').load('xxx.json')

jsonDF.printSchema()
jsonDF.show()

jsonDF.filter(jsonDF.pop>4000).show(10)#依照已有的DataFrame,创建一个临时的表(相当于mysql数据库中的一个表),这样就可以用纯sql语句进行数据操作
jsonDF.createOrReplaceTempView("tmp_table")

resultDF = spark.sql("select * from tmp_table where pop>4000")
resultDF.show(10)

3.2 动态json数据的读取和操作

指定DataFrame的Schema

3.1节中的例子为通过反射自动推断schema,适合静态数据

下面我们来讲解如何进行程序指定schema

没有嵌套结构的json

jsonString =["""{ "id" : "01001", "city" : "AGAWAM",  "pop" : 15338, "state" : "MA" }""","""{ "id" : "01002", "city" : "CUSHMAN", "pop" : 36963, "state" : "MA" }"""]

jsonRDD = sc.parallelize(jsonString)from pyspark.sql.types import*#定义结构类型#StructType:schema的整体结构,表示JSON的对象结构#XXXStype:指的是某一列的数据类型
jsonSchema = StructType() \
  .add("id", StringType(),True) \
  .add("city", StringType()) \
  .add("pop", LongType()) \
  .add("state",StringType())

jsonSchema = StructType() \
  .add("id", LongType(),True) \
  .add("city", StringType()) \
  .add("pop", DoubleType()) \
  .add("state",StringType())

reader = spark.read.schema(jsonSchema)

jsonDF = reader.json(jsonRDD)
jsonDF.printSchema()
jsonDF.show()

带有嵌套结构的json

from pyspark.sql.types import*
jsonSchema = StructType([
    StructField("id", StringType(),True),
    StructField("city", StringType(),True),
    StructField("loc", ArrayType(DoubleType())),
    StructField("pop", LongType(),True),
    StructField("state", StringType(),True)])

reader = spark.read.schema(jsonSchema)
jsonDF = reader.json('data/nest.json')
jsonDF.printSchema()
jsonDF.show(2)
jsonDF.filter(jsonDF.pop>4000).show(10)
标签: spark json 大数据

本文转载自: https://blog.csdn.net/weixin_43894652/article/details/128682573
版权归原作者 Echo-Niu 所有, 如有侵权,请联系我们删除。

“spark--JSON数据的处理”的评论:

还没有评论