Spark SQL operations for parsing JSON-formatted data

  1. get_json_object(string json_string, string path)
     - Suited for parsing JSON whose outermost layer is an object {}.
     - The first argument is the JSON variable, i.e. the field that contains the JSON string.
     - The second argument uses $ as the root identifier and locates the data following JSONPath rules.
  2. from_json(string json_string, string struct)
     - Suited for parsing a [] array whose rows share a uniform structure.
     - The struct argument is the JSON schema, which can be read off a sample document with schema_of_json (see the sketch after this list):

           select schema_of_json('[{"text":"a"},{"b":"a"}, {"c":1}]')
           --- ARRAY<STRUCT<`b`: STRING, `c`: BIGINT, `text`: STRING>>
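
A minimal runnable sketch of that round trip, assuming a local SparkSession (note that the exact string schema_of_json returns varies slightly across Spark versions):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local").appName("schema_demo").getOrCreate()

    # Derive the DDL schema string from a sample document once...
    schema = spark.sql(
        """select schema_of_json('[{"text":"a"},{"b":"a"}, {"c":1}]') as s"""
    ).first()["s"]

    # ...then reuse it as the second argument of from_json.
    spark.sql(
        "select from_json('[{\"text\":\"a\"},{\"b\":\"a\"},{\"c\":1}]', '"
        + schema + "') as parsed"
    ).show(truncate=False)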
Usage examples:

1、get_json_object parses a single JSON object
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local").enableHiveSupport().appName("test").getOrCreate()
    spark.sql("""
    with data_table as (select '{
    "timestamp": "2021-03-23T06:45:11.460Z",
    "metadata": {
        "beat": "filebeat",
        "type": "doc",
        "version": "6.6.1",
        "topic": "gateway_track_log"
    },
    "service_port": "1111",
    "service_name": "gateway",
    "service_ip": [{"ip_a":"100.100.89.09"},{"ip_b":"100.100.89.10"}],
    "center_name": "open"
    }' as col)
    select get_json_object(col,'$.timestamp') as `timestamp`,
           get_json_object(col,'$.metadata.type') as metadata_type,
           get_json_object(col,'$.service_ip[0].ip_a') as service_ip_a
    from data_table
    """).show()
    '''Result:
    +--------------------+-------------+-------------+
    |           timestamp|metadata_type| service_ip_a|
    +--------------------+-------------+-------------+
    |2021-03-23T06:45:...|          doc|100.100.89.09|
    +--------------------+-------------+-------------+
    '''
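
For comparison, the same fields can be pulled with the DataFrame API. A minimal sketch (the one-row DataFrame and its col column are stand-ins for data_table above):

    from pyspark.sql import functions as F

    # Stand-in for data_table: one row holding a JSON string.
    df = spark.createDataFrame(
        [('{"timestamp": "2021-03-23T06:45:11.460Z", "metadata": {"type": "doc"}}',)],
        ["col"],
    )
    df.select(
        F.get_json_object("col", "$.timestamp").alias("timestamp"),
        F.get_json_object("col", "$.metadata.type").alias("metadata_type"),
    ).show(truncate=False)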

2、from_json extracts the value of one key in one row of a multi-row JSON array:

    spark.sql("""
    select from_json('[{"action_id":"favor_add",
    "item":"2",
    "item_type":"sku_id",
    "ts":1592123787234},
    {"action_id":"cart_add",
    "item":"2",
    "item_type":"sku_id",
    "ts":1592123789884}]','ARRAY<STRUCT<action_id: STRING,item: STRING,item_type: STRING,ts:BIGINT>>')[0]['ts'] as q
    """).show()
    '''Result:
    +-------------+
    |            q|
    +-------------+
    |1592123787234|
    +-------------+
    '''
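
The same lookup via the DataFrame API, with the schema built from Spark types instead of a DDL string (a sketch; the one-row df is again a stand-in):

    from pyspark.sql import functions as F
    from pyspark.sql.types import (ArrayType, LongType, StringType,
                                   StructField, StructType)

    schema = ArrayType(StructType([
        StructField("action_id", StringType()),
        StructField("item", StringType()),
        StructField("item_type", StringType()),
        StructField("ts", LongType()),
    ]))

    df = spark.createDataFrame(
        [('[{"action_id":"favor_add","item":"2","item_type":"sku_id","ts":1592123787234}]',)],
        ["col"],
    )
    # Index into the parsed array, then into the struct field.
    df.select(F.from_json("col", schema)[0]["ts"].alias("q")).show()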

3、explode expands the multi-row ARRAY structure that from_json produces

    spark.sql("""
    select explode(from_json('[{"action_id":"favor_add",
    "item":"2",
    "item_type":"sku_id",
    "ts":1592123787234},
    {"action_id":"cart_add",
    "item":"2",
    "item_type":"sku_id",
    "ts":1592123789884}]','ARRAY<STRUCT<action_id: STRING,item: STRING,item_type: STRING,ts:BIGINT>>') ) as p
    """).show()
    '''Result (note that the struct keys are not printed):
    +--------------------+
    |                   p|
    +--------------------+
    |[favor_add, 2, sk...|
    |[cart_add, 2, sku...|
    +--------------------+
    '''
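
If you want the keys back as column names without the manual extraction shown in the next example, Spark SQL's inline() generator expands an ARRAY<STRUCT> into one row per element and one column per field:

    spark.sql("""
    select inline(from_json('[{"action_id":"favor_add","item":"2","item_type":"sku_id","ts":1592123787234},
    {"action_id":"cart_add","item":"2","item_type":"sku_id","ts":1592123789884}]',
    'ARRAY<STRUCT<action_id: STRING,item: STRING,item_type: STRING,ts:BIGINT>>'))
    """).show()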

4、explode on the from_json result, then extracting each key:

    spark.sql("""
    select table1.p.action_id as action_id,
           table1.p.item as item,
           table1.p.item_type as item_type,
           table1.p.ts as ts
    from
    ( select explode(from_json('[{"action_id":"favor_add",
    "item":"2",
    "item_type":"sku_id",
    "ts":1592123787234},
    {"action_id":"cart_add",
    "item":"2",
    "item_type":"sku_id",
    "ts":1592123789884}]','ARRAY<STRUCT<action_id: STRING,item: STRING,item_type: STRING,ts:BIGINT>>') ) as p ) table1
    """).show()
    '''Result:
    +---------+----+---------+-------------+
    |action_id|item|item_type|           ts|
    +---------+----+---------+-------------+
    |favor_add|   2|   sku_id|1592123787234|
    | cart_add|   2|   sku_id|1592123789884|
    +---------+----+---------+-------------+
    '''
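
The subquery wrapper exists because a generator such as explode cannot sit in the same select list as expressions that reference its output. Hive-style LATERAL VIEW syntax, which Spark SQL also accepts, expresses the same query without nesting (a sketch over the same literal; the (select 1) dummy relation only gives LATERAL VIEW something to attach to):

    spark.sql("""
    select p.action_id, p.item, p.item_type, p.ts
    from (select 1) dummy
    lateral view explode(from_json('[{"action_id":"favor_add","item":"2","item_type":"sku_id","ts":1592123787234},
    {"action_id":"cart_add","item":"2","item_type":"sku_id","ts":1592123789884}]',
    'ARRAY<STRUCT<action_id: STRING,item: STRING,item_type: STRING,ts:BIGINT>>')) t as p
    """).show()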

5、Combined usage: read the JSON from a table, grab the relevant part with get_json_object, then parse it into the target shape with explode + from_json:

    spark.sql("""
    with data_table as (select '{
    "timestamp": "2021-03-23T06:45:11.460Z",
    "metadata": {
        "beat": "filebeat",
        "type": "doc",
        "version": "6.6.1",
        "topic": "gateway_track_log"
    },
    "service_port": "1111",
    "service_name": "gateway",
    "service_ip": [{"ip_a":"100.100.89.09","ip_b":"100.100.89.10"},
    {"ip_a":"100.100.89.11","ip_b":"100.100.89.12"}],
    "center_name": "open"
    }' as col
    union
    select '{
    "timestamp": "2021-03-24T06:45:11.460Z",
    "metadata": {
        "beat": "filebeat",
        "type": "doc",
        "version": "6.6.1",
        "topic": "gateway_track_log"
    },
    "service_port": "1112",
    "service_name": "gateway",
    "service_ip": [{"ip_a":"100.100.89.13","ip_b":"100.100.89.14"},
    {"ip_a":"100.100.89.15","ip_b":"100.100.89.16"}],
    "center_name": "open"
    }' as col)
    select t1.timestamp as timestamp,
           t1.ip0_a as ip0_a,
           t1.multi.ip_a as ip_a,
           t1.multi.ip_b as ip_b
    from
    (select t0.timestamp,
            from_json(t0.ip_json,'ARRAY<STRUCT<ip_a: STRING,ip_b: STRING>>')[0]['ip_a'] as ip0_a,
            explode(from_json(t0.ip_json,'ARRAY<STRUCT<ip_a: STRING,ip_b: STRING>>')) as multi
     from
     (select get_json_object(col,'$.timestamp') as timestamp,
             get_json_object(col,'$.service_ip') as ip_json from data_table ) t0 ) t1
    """).show()
    '''Result:
    +--------------------+-------------+-------------+-------------+
    |           timestamp|        ip0_a|         ip_a|         ip_b|
    +--------------------+-------------+-------------+-------------+
    |2021-03-23T06:45:...|100.100.89.09|100.100.89.09|100.100.89.10|
    |2021-03-23T06:45:...|100.100.89.09|100.100.89.11|100.100.89.12|
    |2021-03-24T06:45:...|100.100.89.13|100.100.89.13|100.100.89.14|
    |2021-03-24T06:45:...|100.100.89.13|100.100.89.15|100.100.89.16|
    +--------------------+-------------+-------------+-------------+
    '''
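
Two things stand out in this pattern: the non-generator columns (timestamp, ip0_a) are simply repeated on every row that explode emits, and from_json is evaluated twice in the inner query. A variant that parses the array once and reuses the parsed column, trimmed to a single JSON literal to stay short (a sketch):

    spark.sql("""
    with data_table as (select '{"timestamp": "2021-03-23T06:45:11.460Z",
    "service_ip": [{"ip_a":"100.100.89.09","ip_b":"100.100.89.10"},
    {"ip_a":"100.100.89.11","ip_b":"100.100.89.12"}]}' as col)
    select t1.timestamp,
           t1.ips[0]['ip_a'] as ip0_a,
           t1.multi.ip_a as ip_a,
           t1.multi.ip_b as ip_b
    from (select t0.timestamp,
                 t0.ips,
                 explode(t0.ips) as multi
          from (select get_json_object(col,'$.timestamp') as timestamp,
                       from_json(get_json_object(col,'$.service_ip'),
                                 'ARRAY<STRUCT<ip_a: STRING,ip_b: STRING>>') as ips
                from data_table) t0) t1
    """).show()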
Tags: spark sql

Reposted from: https://blog.csdn.net/wyp111/article/details/126856484
Copyright belongs to the original author, wyp111.
