0


【SQL解析】- SQL血缘分析实现篇01

文章目录

前言

在数据中台中,通常我们面对的是海量的基于数仓的ETL、取数、建模、业务调用等等的数据操作任务,面对错综复杂的调度依赖关系,当出现问题需要快速追溯数据链路、以及对热点资源的盘点治理,人工进行整理效率太低。所以目前一般的数据平台都会实现一个数据地图(任务/SQL维度的血缘)的产品帮助开发/运维更好的进行数据治理。

今天主要分享下SQL维度的血缘实现,对于一个数据开发任务一般对应一个脚本开发任务,离线的可能就是HiveSQL脚本,实时的可能就是Imapla;presto;SparkSQL;FlinkSQL等等脚本任务,我们除了可以在任务的维度进行血缘关系捕获(一般有调度平台可以维护任务的依赖关系),还可以基于离线或者实时hook捕获对应引擎(比如hive;presto;flink等)执行的SQL脚本来进行sql血缘的分析(具体的场景看公司的需求)。

目前来讲,Hive;Presto;Flink等SQL模块大部分的解析引擎都使用了ANTLR进行AST树的生成,然后进行一系列的优化,再生成对应引擎可执行的操作任务,那么我们是不是可以直接复用他们解析的这块功能?答案肯定是可以的,比如Apache Altas产品就是大量使用Hook插件,复用这些计算引擎提供的SQL解析与血缘分析的接口服务,但是有没有缺点呢?肯定是有的,大量的Hook插件对于计算引擎的侵入以及性能的影响肯定是有的。

得益于现在很多的开源框架,对于SQL AST的生成这块都有好几个框架进行了实现,使得我们不必再局限于各个计算引擎本身所自带的SQL解析之中。比如现在的Druid(官方号称是比antlr;javacc等快10-100倍数)、gudusoft、Antlr、javacc等等都是比较优秀的解析框架,接下来,我随便分享几个。。。

Hive自带的解析模块

Hive源码中parser模块,主要就是基于antrl文件(.g)词法(比如Lexer.g)语法(比如HiveParser)等文件,生成各种解析以及AST访问遍历的实现类。antlr具体的实现后续单独讨论。
在这里插入图片描述

对于Hive血缘解析的这块可以参考以下这个血缘的Hook (Altas就是类似这种方式实现的血缘采集)
在这里插入图片描述
看以下已有的方法:生成边;创建目标节点/来源节点;获取边以及节点:
在这里插入图片描述

边(关系);来源表;目标表; sql表达式;表类型。。

在这里插入图片描述
在这里插入图片描述
上述方法对于字段级别表级别的血缘数据采集已经比较完善了,而且代码的话可以直接复用或者借鉴自己实现一个采集的血缘HOOK。开源社区也有单独将Hive解析部分抠出来的项目,大家自行找下,hook使用非本篇重点这里不再多讲…

gudusoft 解析方案

gudusoft有一个商业化的应用产品叫SqlFlow

在这里插入图片描述

注意的几个问题:

  1. 这个产品是商业产品,它是有使用期限的,好像默认是90天
  2. 如果你要测试SQL,需要先登陆(可能需要和谐上网才能注册账号,这个如果不会的可以留言我发给你工具) 官方的声明如下: https://www.sqlparser.com/download.php

在这里插入图片描述
在这里插入图片描述

其实不要被官方说明唬住了,相应的jar包我们还是能用的且不收费,比如 gudusoft.gsqlparser-2.5.1.9.jar(稍后我传上去),这个需要我们将解析的结果进行提取,或者用它的工具方法进行解析等等(完全免费)。
先看下sqlflow的解析功能吧

1.支持的解析功能调研

1.1 从select语句中直接解析血缘关系(也能支持被函数处理的select 字段)

能支持的粒度: 表级别与字段级别(包括函数)

-- case 1 : mysql
SELECT `user`.name,
       `order`.price
FROM   `user`,
       `order`
WHERE  `user`.id = `order`.uid

解析字段血缘结果:
name(`user`) -> name(RS-1)
price(`order`) -> price(RS-1)

在这里插入图片描述

支持show function:
在这里插入图片描述

-- case 2 :mysql
SELECT `user`.name,
       pow(floor(`order`.price),2) as calculated_price
FROM   `user`,
       `order`
WHERE  `user`.id = `order`.uid

解析字段血缘结果(包含中间被函数转换的过程):
name(`user`) -> name(RS-1)
price(`order`) -> floor -> pow -> calculated_price(RS-1)

在这里插入图片描述

1.2 包含子查询的select语句

SELECT temp.name, max(temp.price) as price 
FROM (
  SELECT `user`.name,
         `order`.price
  FROM   `user`,
         `order`
  WHERE  user.id = order.uid 
 ) as temp
GROUP BY temp.name

解析字段血缘结果:
name(`user`) -> name(result of temp) -> name(RS-1)
price(`order`) -> price(result of temp) -> price(RS-1)

1.3 复杂语句的支持

支持多种

在这里插入图片描述
在这里插入图片描述

2. 支持引擎调研

源码里支持的种类如下: 已覆盖 Hive、Mysql、SparkSQL、Presto、impala这些常用的SQL引擎

  dbvaccess,
    dbvansi,
    dbvathena,
    dbvazuresql,
    dbvbigquery,
    dbvcouchbase,
    dbvdax,
    dbvdb2,
    dbvexasol,
    dbvfirebird,
    dbvgeneric,
    dbvgreenplum,
    dbvhana,
    dbvhive,
    dbvimpala,
    dbvinformix,
    dbvmdx,
    dbvmysql,
    dbvmssql,
    dbvnetezza,
    dbvodbc,
    dbvopenedge,
    dbvoracle,
    dbvpostgresql,
    dbvpresto,
    dbvredshift,
    dbvsnowflake,
    dbvsoql,
    dbvsparksql,
    dbvsybase,
    dbvteradata,
    dbvtrino,
    dbvvertica;

3. SQL测试案例

4. 调研结论

  1. 目前大部分引擎的适配其实做的都OK,识别率挺准的
  2. 解析的粒度可以精确到字段,函数
  3. 对于SELECT * 这种字段缺失的,它其实也能处理,前提是会员(需要连接到元数据库进行提取)
  4. 目前开源社区里其实已经分享了很多的实现解析案例,我们可以直接进行借鉴
  5. 开源的jar包解析的效果,我测试了下和sqlflow解析效果(识别率)一样

java版, python版demo都有, 本篇先不展开,有兴趣的可以自己下载研究下
在这里插入图片描述

5. 代码demo

5.1 自己解析

自己解析需要自己去遍历AST树,在每一层的信息也需要自己去保存,上下层之间的关系也需要自己维护,直接写比较麻烦点(后面优化下代码设计其实也还可以):

import gudusoft.gsqlparser.EDbVendor;
import gudusoft.gsqlparser.TGSqlParser;
import gudusoft.gsqlparser.TStatementList;
import gudusoft.gsqlparser.nodes.TExpression;
import gudusoft.gsqlparser.nodes.TResultColumn;
import gudusoft.gsqlparser.nodes.TResultColumnList;
import gudusoft.gsqlparser.stmt.TCreateTableSqlStatement;
import gudusoft.gsqlparser.stmt.TSelectSqlStatement;

import java.util.LinkedList;
import java.util.stream.StreamSupport;

/**
 * @author pushkin
 * @version v1.0.0
 * @date 2022/6/1 16:20
 * <p>
 * Modification History:
 * Date         Author          Version            Description
 * ------------------------------------------------------------
 */

public class DemoD {
    public static void main(String[] args) {
        // 注SQL已脱敏
        String sql1 = "CREATE TABLE tmp.jinpushi_066\n" +
                "AS\n" +
                "SELECT \n" +
                "T1.a1, " +
                "T1.a2, " +
                "T1.a3, " +
                "T1.a4 " +
                ",T2.b1" +
                ",T2.b2 \n" +
                ",T3.b3\n" +
                ",T2.b4\n" +
                ",T1.a5\n" +
                "FROM dwd.jinpushi_01 T1\n" +
                "LEFT JOIN dim.jinpushi_02 T2\n" +
                "ON T1.id = T2.id\n" +
                "LEFT JOIN ods.jinpushi_03 T3\n" +
                "ON T1.code = T3.value\n" +
                "AND T3.tyoe = 12306\n" +
                ";\n" +
                "INSERT OVERWRITE TABLE dwb.jinpushi_05 \n" +
                "SELECT * FROM tmp.jinpushi_066\n" +
                ";";

        TGSqlParser sqlparser = new TGSqlParser(EDbVendor.dbvhive);
        sqlparser.sqltext = sql1;
        int ret = sqlparser.parse();
        if (ret == 0) {
            // 解析出所有语句
            TStatementList stmts = sqlparser.getSqlstatements();

            // 拿到create table语句的实例
            TCreateTableSqlStatement stmt = (TCreateTableSqlStatement) stmts.get(0);

            // 从create table语句的子查询中,拿到select语句的实例,再获取column
            TSelectSqlStatement subquery = stmt.getSubQuery();
            TResultColumnList columns = subquery.getResultColumnList();
            LinkedList<String>[] lineages = new LinkedList[columns.size()];

            for (int i = 0; i < columns.size(); i++) {
                TResultColumn column = columns.getResultColumn(i);
                LinkedList<String> lineage = lineages[i] = new LinkedList<>();
                lineage.addFirst(String.format("%s(%s)", column.getDisplayName(), stmt.getTableName()));
                lineage.addFirst(String.format("%s(RS-1)", column.getDisplayName()));

                String columnName = "";
                if (column.getExpr() != null) {
                    TExpression expr = column.getExpr();
                    while (expr.getFunctionCall() != null) {
                        expr = expr.getFunctionCall().getArgs().getExpression(0);
                    }
                    columnName = expr.toString();
                }

                String[] pair = columnName.split("\\.");
                if (pair.length == 2) {
                    // 有alias,在alias对应的select语句中搜索
                    String prefix = pair[0];
                    String columnDisplayName = pair[1];
                    lineage.addFirst(String.format("%s(%s)", columnDisplayName, prefix));
                    StreamSupport
                            .stream(subquery.tables.spliterator(), false)
                            .filter(t -> t.getAliasClause().toString().equalsIgnoreCase(prefix))
                            .findFirst().ifPresent(table -> {
                                TSelectSqlStatement subquery1 = table.subquery;
                                if (subquery1 != null) {
                                    TResultColumnList resultColumnList = subquery1.getResultColumnList();
                                    resultColumnList.forEach(tableColumn -> {
                                        if (columnDisplayName.equalsIgnoreCase(tableColumn.getDisplayName())) {
                                            if (tableColumn.getExpr().getFunctionCall() == null) {
                                                lineage.addFirst(String.format("%s(%s)", columnDisplayName, table.subquery.tables.getTable(0).getTableName()));
                                            } else {
                                                lineage.addFirst(String.format("%s(%s)",
                                                        tableColumn.getExpr().getFunctionCall().getArgs().getElement(0),
                                                        table.subquery.tables.getTable(0).getTableName()));
                                            }
                                        }
                                    });
                                }
                            });
                } else if (pair.length == 1) {
                    // 没有alias,在所有的select语句中搜索
                    String columnDisplayName = pair[0];
                    StreamSupport
                            .stream(subquery.tables.spliterator(), false)
                            .filter(t -> {
                                if (t.subquery != null) {
                                    for (int j = 0; j < t.subquery.getResultColumnList().size(); j++) {
                                        if (t.subquery.getResultColumnList().getResultColumn(j).getDisplayName().equalsIgnoreCase(columnDisplayName)) {
                                            return true;
                                        }
                                    }
                                }
                                return false;
                            })
                            .findFirst().ifPresent(table -> {
                                lineage.addFirst(String.format("%s(%s)", columnDisplayName, table.getAliasClause()));
                                lineage.addFirst(String.format("%s(%s)", columnDisplayName, table.subquery.tables.getTable(0)));
                            });
                }
            }

            for (LinkedList<String> lineage : lineages) {
                System.out.println(String.join(" -> ", lineage));
            }

        } else {
            System.out.println(sqlparser.getErrormessage());
        }

    }
}

运行结果:

a1(T1) -> a1(RS-1) -> a1(tmp.jinpushi_066)
a2(T1) -> a2(RS-1) -> a2(tmp.jinpushi_066)
a3(T1) -> a3(RS-1) -> a3(tmp.jinpushi_066)
a4(T1) -> a4(RS-1) -> a4(tmp.jinpushi_066)
b1(T2) -> b1(RS-1) -> b1(tmp.jinpushi_066)
b2(T2) -> b2(RS-1) -> b2(tmp.jinpushi_066)
b3(T3) -> b3(RS-1) -> b3(tmp.jinpushi_066)
b4(T2) -> b4(RS-1) -> b4(tmp.jinpushi_066)
a5(T1) -> a5(RS-1) -> a5(tmp.jinpushi_066)

5.2 官方解析工具

importgudusoft.gsqlparser.EDbVendor;importgudusoft.gsqlparser.dlineage.DataFlowAnalyzer;importgudusoft.gsqlparser.dlineage.dataflow.model.json.Dataflow;importgudusoft.gsqlparser.dlineage.dataflow.model.xml.dataflow;importgudusoft.gsqlparser.dlineage.util.RemoveDataflowFunction;publicclassTestGSql{publicstaticvoidmain(String[] args){String sql2 ="CREATE TABLE tmp.tmp_table_a AS \n"+"SELECT T1.id, T1.age, T2.name \n"+"FROM dwd.table1 T1 \n"+"LEFT JOIN \n"+"dwd.table2 T2 ON T1.id = T2.id";System.out.println(getSqlLineage(sql2));}privatestaticDataflowgetSqlLineage(String sql){DataFlowAnalyzer dlineage =newDataFlowAnalyzer(sql,EDbVendor.dbvhive,false);
        dlineage.setSqlEnv(null);
        dlineage.setShowJoin(true);
        dlineage.setIgnoreRecordSet(true);
        dlineage.setLinkOrphanColumnToFirstTable(false);
        dlineage.setTextFormat(false);
        dlineage.generateDataFlow();
        dataflow dataFlow = dlineage.getDataFlow();
        dataFlow.getDatabases();
        dataflow dataflow =newRemoveDataflowFunction().removeFunction(dlineage.getDataFlow());Dataflow flow =DataFlowAnalyzer.getSqlflowJSONModel(dataflow);return flow;}}

调试结果:
在这里插入图片描述
表(包括对应的字段):
在这里插入图片描述
关系
在这里插入图片描述
在这里插入图片描述
根据以上的信息,其实我们完全可以从中解析出我们想要的结果(表血缘;字段级别血缘)
唯一不足的是对于select * 我们不太好解析,这块后续我也会提供具体实现方案

Druid血缘解析方案

pass
等待我有时间写吧

自研解析Hathor项目

https://github.com/Shkin1/hathor
等待我有时间写吧

生产环境最强解析方案思路

等待我有时间写吧
pass

标签: sql hive 大数据

本文转载自: https://blog.csdn.net/qq_31557939/article/details/126277212
版权归原作者 Pushkin. 所有, 如有侵权,请联系我们删除。

“【SQL解析】- SQL血缘分析实现篇01”的评论:

还没有评论