0


sparkSQL解析json格式数据相关操作

get_json_object(string json_string, string path) : 
适合最外层为{}的json解析
第一个参数是json对象变量,也就是含json的字段
第二个参数使用$作为json变量标识 定位数据位置,按jsonpath的规则

from_json(string json_string, string struct):
适合[]多行结构统一的json解析
其中struct参数,也就是json的schema格式,可由schema_of_json读取
select schema_of_json('[{"text":"a"},{"b":"a"}, {"c":1}]')
---ARRAY<STRUCT<`b`: STRING, `c`: BIGINT, `text`: STRING>>
使用案例:
1、get_json_object解析单层json:
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").enableHiveSupport().appName("test").getOrCreate()

spark.sql("""
with data_table as (select  '{
  "timestamp": "2021-03-23T06:45:11.460Z",
  "metadata": {
    "beat": "filebeat",
    "type": "doc",
    "version": "6.6.1",
    "topic": "gateway_track_log"
  },
  "service_port": "1111",
  "service_name": "gateway",
  "service_ip": [{"ip_a":"100.100.89.09"},{"ip_b":"100.100.89.10"}],
  "center_name": "open"
}' as col) 
select get_json_object(col,'$.timestamp') as `timestamp`,
       get_json_object(col,'$.metadata.type') as metadata_type,
       get_json_object(col,'$.service_ip[0].ip_a') as service_ip_a 
  from data_table
""").show()
'''结果样式:
+--------------------+-------------+-------------+
|           timestamp|metadata_type| service_ip_a|
+--------------------+-------------+-------------+
|2021-03-23T06:45:...|          doc|100.100.89.09|
+--------------------+-------------+-------------+
'''

2、from_json解析多行json的某行某key对应值:

spark.sql("""
   select from_json('[{"action_id":"favor_add",
                       "item":"2",
                       "item_type":"sku_id",
                       "ts":1592123787234},
                      {"action_id":"cart_add",
                       "item":"2",
                       "item_type":"sku_id",
                       "ts":1592123789884}]','ARRAY<STRUCT<action_id: STRING,item: STRING,item_type: STRING,ts:BIGINT>>')[0]['ts'] as q
""").show()
'''结果样式:
+-------------+
|            q|
+-------------+
|1592123787234|
+-------------+
'''

3、explode应用于from_json解析出的多行ARRAY结构的展开

spark.sql("""
select explode(from_json('[{"action_id":"favor_add",
                    "item":"2",
                    "item_type":"sku_id",
                    "ts":1592123787234},
                   {"action_id":"cart_add",
                    "item":"2",
                    "item_type":"sku_id",
                    "ts":1592123789884}]','ARRAY<STRUCT<action_id: STRING,item: STRING,item_type: STRING,ts:BIGINT>>') ) as p

""").show()
'''结果样式:注意key没打印出来
+--------------------+
|                   p|
+--------------------+
|[favor_add, 2, sk...|
|[cart_add, 2, sku...|
+--------------------+
'''

4、explode应用于from_json解析并提取key:

spark.sql("""
            select table1.p.action_id as action_id,
                   table1.p.item as item,
                   table1.p.item_type as item_type,
                   table1.p.ts as ts
            from   
            (      select explode(from_json('[{"action_id":"favor_add",
                    "item":"2",
                    "item_type":"sku_id",
                    "ts":1592123787234},
                   {"action_id":"cart_add",
                    "item":"2",
                    "item_type":"sku_id",
                    "ts":1592123789884}]','ARRAY<STRUCT<action_id: STRING,item: STRING,item_type: STRING,ts:BIGINT>>') ) as p  ) table1

""").show()
'''结果样式:
+---------+----+---------+-------------+
|action_id|item|item_type|           ts|
+---------+----+---------+-------------+
|favor_add|   2|   sku_id|1592123787234|
| cart_add|   2|   sku_id|1592123789884|
+---------+----+---------+-------------+
'''

5、混合使用:从表中提取json数据,并通过get_json_object获取数据部分,explode+from_json解析出结果至指定形式:

spark.sql("""
with data_table as (select  '{
  "timestamp": "2021-03-23T06:45:11.460Z",
  "metadata": {
      "beat": "filebeat",
      "type": "doc",
      "version": "6.6.1",
      "topic": "gateway_track_log"
  },
  "service_port": "1111",
  "service_name": "gateway",
  "service_ip": [{"ip_a":"100.100.89.09","ip_b":"100.100.89.10"},
                 {"ip_a":"100.100.89.11","ip_b":"100.100.89.12"}],
  "center_name": "open"
}' as col  
union
select  '{
  "timestamp": "2021-03-24T06:45:11.460Z",
  "metadata": {
      "beat": "filebeat",
      "type": "doc",
      "version": "6.6.1",
      "topic": "gateway_track_log"
  },
  "service_port": "1112",
  "service_name": "gateway",
  "service_ip": [{"ip_a":"100.100.89.13","ip_b":"100.100.89.14"},
                 {"ip_a":"100.100.89.15","ip_b":"100.100.89.16"}],
  "center_name": "open"
}' as col)  
select t1.timestamp as timestamp,
       t1.ip0_a as ip0_a,
       t1.multi.ip_a as ip_a,
       t1.multi.ip_b as ip_b
from
(select t0.timestamp,
       from_json(t0.ip_json,'ARRAY<STRUCT<ip_a: STRING,ip_b: STRING>>')[0]['ip_a'] as ip0_a,
       explode(from_json(t0.ip_json,'ARRAY<STRUCT<ip_a: STRING,ip_b: STRING>>')) as multi
from
(select get_json_object(col,'$.timestamp') as timestamp,
       get_json_object(col,'$.service_ip') as ip_json from data_table ) t0  ) t1
""").show()
'''结果格式:
+--------------------+-------------+-------------+-------------+
|           timestamp|        ip0_a|         ip_a|         ip_b|
+--------------------+-------------+-------------+-------------+
|2021-03-23T06:45:...|100.100.89.09|100.100.89.09|100.100.89.10|
|2021-03-23T06:45:...|100.100.89.09|100.100.89.11|100.100.89.12|
|2021-03-24T06:45:...|100.100.89.13|100.100.89.13|100.100.89.14|
|2021-03-24T06:45:...|100.100.89.13|100.100.89.15|100.100.89.16|
+--------------------+-------------+-------------+-------------+
'''
标签: spark sql

本文转载自: https://blog.csdn.net/wyp111/article/details/126856484
版权归原作者 wyp111 所有, 如有侵权,请联系我们删除。

“sparkSQL解析json格式数据相关操作”的评论:

还没有评论