3、spark–JSON数据的处理
3.1 介绍
JSON数据
- Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrameSpark SQL能够自动将JSON数据集以结构化的形式加载为一个DataFrame
- This conversion can be done using SparkSession.read.json on a JSON file读取一个JSON文件可以用SparkSession.read.json方法
从JSON到DataFrame
- 指定DataFrame的schema1,通过反射自动推断,适合静态数据2,程序指定,适合程序运行中动态生成的数据
加载json数据
#使用内部的schema
jsonDF = spark.read.json("xxx.json")
jsonDF = spark.read.format('json').load('xxx.json')#指定schema
jsonDF = spark.read.schema(jsonSchema).json('xxx.json')
嵌套结构的JSON
- 重要的方法1,get_json_object2,get_json3,explode
3.2 实践
3.1 静态json数据的读取和操作
无嵌套结构的json数据
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('json_demo').getOrCreate()
sc = spark.sparkContext
# ==========================================# 无嵌套结构的json# ==========================================
jsonString =["""{ "id" : "01001", "city" : "AGAWAM", "pop" : 15338, "state" : "MA" }""","""{ "id" : "01002", "city" : "CUSHMAN", "pop" : 36963, "state" : "MA" }"""]
从json字符串数组得到DataFrame
# 从json字符串数组得到rdd有两种方法# 1. 转换为rdd,再从rdd到DataFrame# 2. 直接利用spark.createDataFrame(),见后面例子
jsonRDD = sc.parallelize(jsonString)# stringJSONRDD
jsonDF = spark.read.json(jsonRDD)# convert RDD into DataFrame
jsonDF.printSchema()
jsonDF.show()
直接从文件生成DataFrame
# -- 直接从文件生成DataFrame#只有被压缩后的json文件内容,才能被spark-sql正确读取,否则格式化后的数据读取会出现问题
jsonDF = spark.read.json("xxx.json")# or# jsonDF = spark.read.format('json').load('xxx.json')
jsonDF.printSchema()
jsonDF.show()
jsonDF.filter(jsonDF.pop>4000).show(10)#依照已有的DataFrame,创建一个临时的表(相当于mysql数据库中的一个表),这样就可以用纯sql语句进行数据操作
jsonDF.createOrReplaceTempView("tmp_table")
resultDF = spark.sql("select * from tmp_table where pop>4000")
resultDF.show(10)
3.2 动态json数据的读取和操作
指定DataFrame的Schema
3.1节中的例子为通过反射自动推断schema,适合静态数据
下面我们来讲解如何进行程序指定schema
没有嵌套结构的json
jsonString =["""{ "id" : "01001", "city" : "AGAWAM", "pop" : 15338, "state" : "MA" }""","""{ "id" : "01002", "city" : "CUSHMAN", "pop" : 36963, "state" : "MA" }"""]
jsonRDD = sc.parallelize(jsonString)from pyspark.sql.types import*#定义结构类型#StructType:schema的整体结构,表示JSON的对象结构#XXXStype:指的是某一列的数据类型
jsonSchema = StructType() \
.add("id", StringType(),True) \
.add("city", StringType()) \
.add("pop", LongType()) \
.add("state",StringType())
jsonSchema = StructType() \
.add("id", LongType(),True) \
.add("city", StringType()) \
.add("pop", DoubleType()) \
.add("state",StringType())
reader = spark.read.schema(jsonSchema)
jsonDF = reader.json(jsonRDD)
jsonDF.printSchema()
jsonDF.show()
带有嵌套结构的json
from pyspark.sql.types import*
jsonSchema = StructType([
StructField("id", StringType(),True),
StructField("city", StringType(),True),
StructField("loc", ArrayType(DoubleType())),
StructField("pop", LongType(),True),
StructField("state", StringType(),True)])
reader = spark.read.schema(jsonSchema)
jsonDF = reader.json('data/nest.json')
jsonDF.printSchema()
jsonDF.show(2)
jsonDF.filter(jsonDF.pop>4000).show(10)
版权归原作者 Echo-Niu 所有, 如有侵权,请联系我们删除。