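get_json_object(string json_string, string path):
Well suited to parsing JSON whose outermost layer is an object ({}).
The first argument is the JSON value, i.e. the column that holds the JSON text.
The second argument is a JSONPath expression: $ stands for the root of the JSON document, and the rest of the path locates the data.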
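As a rough illustration of how the path argument is resolved, the plain-Python sketch below walks a JSON document one step at a time. The lookup helper is hypothetical and handles only .key and [index] steps; it is an analogy for the behaviour described here, not Spark's implementation. It assumes that a missing path yields NULL (None here) and that nested objects or arrays come back as JSON text.

```python
import json
import re


def lookup(json_string, path):
    """Hypothetical helper: resolve a simple JSONPath made of .key and [index] steps."""
    if not path.startswith("$"):
        return None
    node = json.loads(json_string)
    # Each step is either ".name" (first group) or "[number]" (second group).
    for name, index in re.findall(r"\.([^.\[\]]+)|\[(\d+)\]", path[1:]):
        if name:
            if not isinstance(node, dict) or name not in node:
                return None  # path does not exist
            node = node[name]
        else:
            position = int(index)
            if not isinstance(node, list) or position >= len(node):
                return None  # index out of range or not an array
            node = node[position]
    # Strings are returned as-is; anything else is re-serialized as JSON text.
    return node if isinstance(node, str) else json.dumps(node)


doc = (
    '{"timestamp": "2021-03-23T06:45:11.460Z",'
    ' "metadata": {"type": "doc"},'
    ' "service_ip": [{"ip_a": "100.100.89.09"}, {"ip_b": "100.100.89.10"}]}'
)
print(lookup(doc, "$.timestamp"))
print(lookup(doc, "$.metadata.type"))
print(lookup(doc, "$.service_ip[0].ip_a"))
```

Because a nested array is returned as text, the result of a path such as $.service_ip can be passed on to another JSON function.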
from_json(string json_string, string struct):
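Well suited to parsing a JSON array ([]) whose elements share a uniform structure.
The struct argument is the schema of the JSON; it can be obtained with schema_of_json: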
select schema_of_json('[{"text":"a"},{"b":"a"}, {"c":1}]')
-- ARRAY<STRUCT<`b`: STRING, `c`: BIGINT, `text`: STRING>>
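The schema above contains three fields even though no single element has all three, which suggests the field sets of the elements are merged. The plain-Python sketch below reproduces that merged schema string for flat elements. The infer_array_schema helper is hypothetical and heavily simplified (no nested values, no handling of conflicting types); it is not how Spark infers schemas.

```python
import json


def infer_array_schema(json_string):
    """Hypothetical helper: union the keys of all array elements, sorted by name."""
    fields = {}
    for element in json.loads(json_string):
        for key, value in element.items():
            # bool is tested before int because bool is a subclass of int in Python.
            if isinstance(value, bool):
                fields[key] = "BOOLEAN"
            elif isinstance(value, int):
                fields[key] = "BIGINT"
            elif isinstance(value, float):
                fields[key] = "DOUBLE"
            else:
                fields[key] = "STRING"  # simplification: everything else is a string
    body = ", ".join(f"`{name}`: {fields[name]}" for name in sorted(fields))
    return f"ARRAY<STRUCT<{body}>>"


schema = infer_array_schema('[{"text":"a"},{"b":"a"}, {"c":1}]')
print(schema)
```

With a merged schema like this, a field that is absent from a given element is read back as NULL when the array is parsed with from_json.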
Examples:
1. Parsing a single JSON object with get_json_object:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").enableHiveSupport().appName("test").getOrCreate()
spark.sql("""
with data_table as (select '{
"timestamp": "2021-03-23T06:45:11.460Z",
"metadata": {
"beat": "filebeat",
"type": "doc",
"version": "6.6.1",
"topic": "gateway_track_log"
},
"service_port": "1111",
"service_name": "gateway",
"service_ip": [{"ip_a":"100.100.89.09"},{"ip_b":"100.100.89.10"}],
"center_name": "open"
}' as col)
select get_json_object(col,'$.timestamp') as `timestamp`,
get_json_object(col,'$.metadata.type') as metadata_type,
get_json_object(col,'$.service_ip[0].ip_a') as service_ip_a
from data_table
""").show()
'''Sample output:
+--------------------+-------------+-------------+
| timestamp|metadata_type| service_ip_a|
+--------------------+-------------+-------------+
|2021-03-23T06:45:...| doc|100.100.89.09|
+--------------------+-------------+-------------+
'''
2. Using from_json to read the value of a given key from a given element of a JSON array:
spark.sql("""
select from_json('[{"action_id":"favor_add",
"item":"2",
"item_type":"sku_id",
"ts":1592123787234},
{"action_id":"cart_add",
"item":"2",
"item_type":"sku_id",
"ts":1592123789884}]','ARRAY<STRUCT<action_id: STRING,item: STRING,item_type: STRING,ts:BIGINT>>')[0]['ts'] as q
""").show()
'''Sample output:
+-------------+
| q|
+-------------+
|1592123787234|
+-------------+
'''
3. Using explode to expand the ARRAY produced by from_json into one row per element:
spark.sql("""
select explode(from_json('[{"action_id":"favor_add",
"item":"2",
"item_type":"sku_id",
"ts":1592123787234},
{"action_id":"cart_add",
"item":"2",
"item_type":"sku_id",
"ts":1592123789884}]','ARRAY<STRUCT<action_id: STRING,item: STRING,item_type: STRING,ts:BIGINT>>') ) as p
""").show()
'''Sample output (note that the keys are not printed):
+--------------------+
| p|
+--------------------+
|[favor_add, 2, sk...|
|[cart_add, 2, sku...|
+--------------------+
'''
4. Using explode with from_json and extracting each key as its own column:
spark.sql("""
select table1.p.action_id as action_id,
table1.p.item as item,
table1.p.item_type as item_type,
table1.p.ts as ts
from
( select explode(from_json('[{"action_id":"favor_add",
"item":"2",
"item_type":"sku_id",
"ts":1592123787234},
{"action_id":"cart_add",
"item":"2",
"item_type":"sku_id",
"ts":1592123789884}]','ARRAY<STRUCT<action_id: STRING,item: STRING,item_type: STRING,ts:BIGINT>>') ) as p ) table1
""").show()
'''Sample output:
+---------+----+---------+-------------+
|action_id|item|item_type| ts|
+---------+----+---------+-------------+
|favor_add| 2| sku_id|1592123787234|
| cart_add| 2| sku_id|1592123789884|
+---------+----+---------+-------------+
'''
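To make the two steps in examples 3 and 4 concrete, here is a plain-Python analogy using only the standard json module. It is a sketch of the semantics, not Spark code: json.loads plays the role of from_json, iterating over the list plays the role of explode, and reading keys from each element corresponds to table1.p.action_id and the other field accesses. The variable names are illustrative.

```python
import json

events_json = (
    '[{"action_id":"favor_add","item":"2","item_type":"sku_id","ts":1592123787234},'
    ' {"action_id":"cart_add","item":"2","item_type":"sku_id","ts":1592123789884}]'
)
columns = ["action_id", "item", "item_type", "ts"]

# from_json: JSON text -> list of structs (dicts here).
parsed = json.loads(events_json)

# Example 2: index into the array, then read one key.
first_ts = parsed[0]["ts"]

# Examples 3 and 4: one output row per array element, one column per key.
rows = [tuple(element.get(column) for column in columns) for element in parsed]

for row in rows:
    print(row)
```

Using element.get mirrors the way a field that is missing from an element would come back as NULL rather than raising an error.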
5. Combined use: read JSON data from a table, pull out the data portion with get_json_object, then use explode + from_json to shape the result:
spark.sql("""
with data_table as (select '{
"timestamp": "2021-03-23T06:45:11.460Z",
"metadata": {
"beat": "filebeat",
"type": "doc",
"version": "6.6.1",
"topic": "gateway_track_log"
},
"service_port": "1111",
"service_name": "gateway",
"service_ip": [{"ip_a":"100.100.89.09","ip_b":"100.100.89.10"},
{"ip_a":"100.100.89.11","ip_b":"100.100.89.12"}],
"center_name": "open"
}' as col
union
select '{
"timestamp": "2021-03-24T06:45:11.460Z",
"metadata": {
"beat": "filebeat",
"type": "doc",
"version": "6.6.1",
"topic": "gateway_track_log"
},
"service_port": "1112",
"service_name": "gateway",
"service_ip": [{"ip_a":"100.100.89.13","ip_b":"100.100.89.14"},
{"ip_a":"100.100.89.15","ip_b":"100.100.89.16"}],
"center_name": "open"
}' as col)
select t1.timestamp as timestamp,
t1.ip0_a as ip0_a,
t1.multi.ip_a as ip_a,
t1.multi.ip_b as ip_b
from
(select t0.timestamp,
from_json(t0.ip_json,'ARRAY<STRUCT<ip_a: STRING,ip_b: STRING>>')[0]['ip_a'] as ip0_a,
explode(from_json(t0.ip_json,'ARRAY<STRUCT<ip_a: STRING,ip_b: STRING>>')) as multi
from
(select get_json_object(col,'$.timestamp') as timestamp,
get_json_object(col,'$.service_ip') as ip_json from data_table ) t0 ) t1
""").show()
'''Sample output:
+--------------------+-------------+-------------+-------------+
| timestamp| ip0_a| ip_a| ip_b|
+--------------------+-------------+-------------+-------------+
|2021-03-23T06:45:...|100.100.89.09|100.100.89.09|100.100.89.10|
|2021-03-23T06:45:...|100.100.89.09|100.100.89.11|100.100.89.12|
|2021-03-24T06:45:...|100.100.89.13|100.100.89.13|100.100.89.14|
|2021-03-24T06:45:...|100.100.89.13|100.100.89.15|100.100.89.16|
+--------------------+-------------+-------------+-------------+
'''
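The flow of example 5 can be traced step by step in plain Python. This is only an analogy for the SQL above, written with the standard json module and trimmed input records (only the timestamp and service_ip keys are kept); the comments name the Spark function each line stands in for. It shows why ip0_a repeats on every exploded row: it is computed once per input record, before the array is expanded.

```python
import json

records = [
    '{"timestamp": "2021-03-23T06:45:11.460Z", "service_ip": ['
    '{"ip_a":"100.100.89.09","ip_b":"100.100.89.10"},'
    '{"ip_a":"100.100.89.11","ip_b":"100.100.89.12"}]}',
    '{"timestamp": "2021-03-24T06:45:11.460Z", "service_ip": ['
    '{"ip_a":"100.100.89.13","ip_b":"100.100.89.14"},'
    '{"ip_a":"100.100.89.15","ip_b":"100.100.89.16"}]}',
]

rows = []
for col in records:
    doc = json.loads(col)
    timestamp = doc["timestamp"]             # get_json_object(col, '$.timestamp')
    ip_json = json.dumps(doc["service_ip"])  # get_json_object(col, '$.service_ip') yields JSON text
    ips = json.loads(ip_json)                # from_json(ip_json, 'ARRAY<STRUCT<...>>')
    ip0_a = ips[0]["ip_a"]                   # from_json(...)[0]['ip_a']
    for multi in ips:                        # explode(...) as multi
        rows.append((timestamp, ip0_a, multi["ip_a"], multi["ip_b"]))

for row in rows:
    print(row)
```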
Copyright belongs to the original author, wyp111. If this infringes your rights, please contact us for removal.