0


Spark SQL 血缘解析方案

背景

项目背景建设数据中台,往往数据开发人员首先需要能够通过有效的途径检索到所需要的数据,然后根据检索的数据模型进行业务加工然后得到一些中间模型,最后再通过数据抽取工具或者OLAP分析工具直接将数据仓库中加工好的公共模型输出到应用层。这里我不在去介绍数据仓库为何需要分层以及该如何分层,这个逻辑已经有很多厂商在业务中实践过,这里就不再赘述,本次主要需要解决的事数据链路加工血缘采集的方案。本着知识积累的原则记录一下方案。

Hive DDL采集和血缘

目前这个是最简单的,如果没有特殊的需求,可以直接对Apache Atlas中的hive hook进行裁剪,最终可以得到业务所需的血缘采集插件,一般可以到字段级别血缘。

Spark SQL血缘采集

目前针对Spark SQL血缘采集,首先DDL元数据采集依旧使用Apache Atlas中的hive Hook,因为即使使用Spark操作Hive也是最终链接的是hive的metastore数据库。现在主要解决的是Spark SQL计算中如何记录下血缘信息:

spark.sql.queryExecutionListeners    za.co.absa.spline.harvester.listener.SplineQueryExecutionListener
spark.spline.lineageDispatcher    kafka
spark.spline.lineageDispatcher.kafka.topic    linkis_spark_lineage
spark.spline.lineageDispatcher.kafka.producer.bootstrap.servers    localhost:9092
# 添加额外属性,适合多租户场景下的血缘采集
spark.spline.postProcessingFilter    userExtraMeta
spark.spline.postProcessingFilter.userExtraMeta.className    za.co.absa.spline.harvester.postprocessing.metadata.MetadataCollectingFilter
spark.spline.postProcessingFilter.userExtraMeta.rules    {
   \"executionPlan\":{
   \"extra\":{
   \"companyCode\":\"1200202023020320\"\\,\"originQuery\":{
   \"$js\":\"session.conf().get('sql'\\,'')\"}}}}

到这里就可以启动Spark SQL客户端查看效果,例如小编执行如下sql

CREATETABLE test.t_order (
  id INT,
  uid INT,
  amount INT,
  price DOUBLE,
  c_time TIMESTAMP);CREATETABLE test.t_user (
  uid INT,
  name STRING,
  age INT);CREATETABLE test.t_order_detail (
  id INT,
  name STRING,
  cost DOUBLE,
  c_time TIMESTAMP);setsql=insertinto t_order_detail select o.id,u.name,(o.amount * o.price)as cost,o.c_time from t_user u leftjoin t_order o on o.uid=u.uid;insertinto t_order_detail select o.id,u.name,(o.amount * o.price)as cost,o.c_time from t_user u leftjoin t_order o on o.uid=u.uid;

消费kafka的topic

linkis_spark_lineage

可以消费到如下数据:

{
   "id":"49a81e8e-51f2-5a05-96c3-bc22a1bc3f81","name":"SparkSQL::10.253.30.205","operations":{
   "write":{
   "outputSource":"file://ZBMac-C02CW08SM:8020/Users/jiangzhongzhou/Software/bigdata2.0/spark-3.5.0-bin-hadoop-3.2.x/spark-warehouse/test.db/t_order_detail","append":true,"id":"op-0","name":"InsertIntoHiveTable","childIds":["op-1"],"params":{
   "table":{
   "identifier":{
   "table":"t_order_detail","database":"test"},"storage":"Storage(Location: file:/Users/jiangzhongzhou/Software/bigdata2.0/spark-3.5.0-bin-hadoop-3.2.x/spark-warehouse/test.db/t_order_detail, Serde Library: org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, InputFormat: org.apache.hadoop.mapred.TextInputFormat, OutputFormat: org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, Storage Properties: [serialization.format=1])"}},"extra":{
   "destinationType":"hive"}},"reads":[{
   "inputSources":["file://ZBMac-C02CW08SM:8020/Users/jiangzhongzhou/Software/bigdata2.0/spark-3.5.0-bin-hadoop-3.2.x/spark-warehouse/test.db/t_user"],"id":"op-5",
标签: spark sql 大数据

本文转载自: https://blog.csdn.net/weixin_38231448/article/details/139883924
版权归原作者 麦田里的守望者· 所有, 如有侵权,请联系我们删除。

“Spark SQL 血缘解析方案”的评论:

还没有评论