0


Openlineage数据地图

文章目录


一、什么是Openlineage

Openlineage 是一款数据血缘采集和分析的开源框架。不同的元数据管理系统都有着自己的一套元数据定义,Openlineage的设计相当于在中间架设了一层格式转换,类似于一个转接口。对于元数据的统一来说,Openlineage的出现让使用者无需关注背后的元数据系统是什么,使用的是哪个元数据插件进行监听,直接委托Openlineage进行处理就可以得到标准的元数据。Openlineage的出现让监听数据和下发数据直接解耦,举个例子,Atlas的监听器可以通过Openlineage转换并且插入Marquez(只要实现了Atlas和Marquez的元数据格式转换)。

官方原图
Before
在这里插入图片描述


二、Openlineage 元数据定义

OpenLineage将数据执行的pipeline抽象为三层,分别是Job,Run,Dataset。
Job可以抽象理解为我计划做什么;Run可以理解为这个计划每次的执行;Dateset是每次执行中所消耗和产生的物品。
在这里插入图片描述

2.1 Job Facets

Job (作业)是 生产和消费数据的一个管道流程,例如,如果crontab运行的Python脚本每天执行CREATE TABLE x AS SELECT*FROM y查询,那么Python脚本本身就是Job。
Job Facets含义sourceCodeLocation捕获源代码的位置和版本sourceCode完整的源代码

2.2 Run Facets

Run (运行时)是Job的一个实例,每个Run都会有一个独特的UUID,例如,如果crontab运行一个Python脚本,该脚本每天重复一个查询,那么每天都会有单独的Run重新生成
Run Facets含义nominalTime执行计划的实际时间parent记录父级job 和父级runerrorMessage捕获执行时错误sql如果有sql则捕获sql

2.3 Dataset Facets

DateSet (数据集)是数据的抽象表示,可以是一张表也可以是一个桶的对象,比如每天执行CREATE TABLE x AS SELECT*FROM y查询,那么x和y表就是数据集。
Dateset Facets含义schema数据的schemadataSource数据源lifecycleStateChange数据源生命周期 (create,atler等操作)version数仓定义的数据集版本(Input)dataQualityMetrics输入数据集级别和字段级别的数据质量指标(最大、最小值)(Input)dataQualityAssertions输入数据集或字段运行数据测试的结果(Output)outputStatistics输出数据指标统计(行数或者大小)


三、Marquez

Marquez 是一款遵循OpenLineage标准的元数据管理系统。用于收集、聚合和可视化数据生态系统的元数据。它维护了数据集的消费和生产来源,提供了对作业运行时间和数据集访问频率的全局可见性,数据集生命周期管理的集中化等等。Marquez由WeWork发布并开源。

官网原图
Marquez

元数据具体实践样例
在这里插入图片描述


四、Openlineage 与 Marquez 交互

4.1.部署Docker Marquez

git clone [email protected]:MarquezProject/marquez.git && cd marquez
./docker/up.sh
Marquez 0.21.0 以下版本url解析会存在问题,没有对'/'进行转义,会导致name中包含'/'的实体无法访问

4.2 通过curl请求Marquez

这里分别模拟 请求输入数据(Run开始)和请求结果数据(Run结束)

$ curl -XPOSThttp://localhost:5000/api/v1/lineage \
-H'Content-Type: application/json' \
-d '{"eventType":"START","eventTime":"2020-12-28T19:52:00.001+10:00","run":{"runId":"d46e465b-d358-4d32-83d4-df660ff614dd"},"job":{"namespace":"my-namespace","name":"my-job"},"inputs":[{"namespace":"my-namespace","name":"my-input"}],"producer":"https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client"}'
$ curl -XPOSThttp://localhost:5000/api/v1/lineage \
-H'Content-Type: application/json' \
-d '{"eventType":"COMPLETE","eventTime":"2020-12-28T20:52:00.001+10:00","run":{"runId":"d46e465b-d358-4d32-83d4-df660ff614dd"},"job":{"namespace":"my-namespace","name":"my-job"},"outputs":[{"namespace":"my-namespace","name":"my-output","facets":{"schema":{"_producer":"https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client","_schemaURL":"https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/spec/OpenLineage.json#/definitions/SchemaDatasetFacet","fields":[{"name":"a","type":"VARCHAR"},{"name":"b","type":"VARCHAR"}]}}}],"producer":"https://github.com/OpenLineage/OpenLineage/blob/v1-0-0/client"}'

官网结果图
在这里插入图片描述

4.3 Api && Client

Openlineage 不仅可以通过API传递数据,还支持了python和java的语法库。
Marquez API 定义, Java 语法库,Python语法库
支持 Console、Http和Kafka三种传输方式,采用yaml文件进行客户端的参数读取。
传输协议

4.4 Java 实现简单客户端请求代码

这里直接引用官网http-transport样例

// using http transportOpenLineageClient client =OpenLineageClient.builder().transport(HttpTransport.builder().uri("http://localhost:5000").build()).build();// create one start event for testingRunEvent event =buildEvent(EventType.START,null);// emit the event
            client.emit(event);// another event to COMPLETE the run
            event =buildEvent(EventType.COMPLETE, event.getRun().getRunId());// emit the second COMPLETE event
            client.emit(event);

Event构建样例

// sample code to build eventpublicstaticRunEventbuildEvent(EventType eventType,UUID runId){ZonedDateTime now =ZonedDateTime.now(ZoneId.of("UTC"));URI producer = URI.create("producer");OpenLineage ol =newOpenLineage(producer);if(runId ==null){
            runId = UUID.randomUUID();}// run facetsRunFacets runFacets =
        ol.newRunFacetsBuilder().nominalTime(
                ol.newNominalTimeRunFacetBuilder().nominalStartTime(now).nominalEndTime(now).build()).build();// a run is composed of run id, and run facetsRun run = ol.newRunBuilder().runId(runId).facets(runFacets).build();// job facetsJobFacets jobFacets = ol.newJobFacetsBuilder().build();// jobString name ="jobName";String namespace ="namespace";Job job = ol.newJobBuilder().namespace(namespace).name(name).facets(jobFacets).build();// input datasetList<InputDataset> inputs =Arrays.asList(
            ol.newInputDatasetBuilder().namespace("ins").name("input").facets(
                    ol.newDatasetFacetsBuilder().version(ol.newDatasetVersionDatasetFacet("input-version")).build()).inputFacets(
                    ol.newInputDatasetInputFacetsBuilder().dataQualityMetrics(
                            ol.newDataQualityMetricsInputDatasetFacetBuilder().rowCount(10L).bytes(20L).columnMetrics(
                                    ol.newDataQualityMetricsInputDatasetFacetColumnMetricsBuilder().put("mycol",
                                            ol.newDataQualityMetricsInputDatasetFacetColumnMetricsAdditionalBuilder().count(10D).distinctCount(10L).max(30D).min(5D).nullCount(1L).sum(3000D).quantiles(
                                                    ol.newDataQualityMetricsInputDatasetFacetColumnMetricsAdditionalQuantilesBuilder().put("25",52D).build()).build()).build()).build()).build()).build());// output datasetList<OutputDataset> outputs =Arrays.asList(
                ol.newOutputDatasetBuilder().namespace("ons").name("output").facets(
                        ol.newDatasetFacetsBuilder().version(ol.newDatasetVersionDatasetFacet("output-version")).build()).outputFacets(
                        ol.newOutputDatasetOutputFacetsBuilder().outputStatistics(ol.newOutputStatisticsOutputDatasetFacet(10L,20L)).build()).build());// run state udpate which encapsulates all - with START event in this caseRunEvent runStateUpdate =
        ol.newRunEventBuilder().eventType(eventType).eventTime(now).run(run).job(job).inputs(inputs).outputs(outputs).build();return runStateUpdate;

五、总结

该篇仅作为 Openlineage 的 入门介绍,感觉Openlineage的设计理念十分的超前,但是遵循该协议的元数据系统目前只有Marquez。后续会介绍Openlineage的扩展支持组件,包括Spark以及Flink扩展(代码+实现)。目前博主初步实现和Atlas 元数据地图的实体交互,正在测试,会在后面继续书写系列专题。

参考链接:
https://openlineage.io/
https://marquezproject.github.io/

标签: java big data

本文转载自: https://blog.csdn.net/weixin_43947468/article/details/129593234
版权归原作者 老扎儿 所有, 如有侵权,请联系我们删除。

“Openlineage数据地图”的评论:

还没有评论