一、背景
1、数据仓库架构
从Hive表出仓到外部系统(ClickHouse、Presto、ES等)带来的复杂性和存储开发等额外代价,尽量减少这种场景出仓的必要性。
痛点:传统 T+1 任务
海量的TB级 T+ 1 任务延迟导致下游数据产出时间不稳定。
任务遇到故障重试恢复代价昂贵
数据架构在处理去重和 exactly-once语义能力方面比较吃力
架构复杂,涉及多个系统协调,靠调度系统来构建任务依赖关系
- 实时架构
在某些场景中,数据的价值随着时间的推移⽽逐渐减少。所以在传统⼤数据离线数仓的基础上,逐渐对数据的实时性提出了更⾼的要求。于是诞⽣了⼤数据实时数仓,并且衍⽣出了两种技术架构Lambda和Kappa。
1、Lambda架构:
从底层的数据源开始,经过Kafka、Flume等数据组件进⾏收集,然后分成两条线进⾏计算:⼀条线是进⼊流式计算平台(例如 Storm、Flink或者SparkStreaming),去计算实时的⼀些指标;另⼀条线进⼊批量数据处理离线计算平台(例如Mapreduce、Hive,Spark SQL),去计算T+1的相关业务指标,这些指标需要隔⽇才能看见。
①为什么Lambda架构要分成两条线计算?
假如整个系统只有⼀个批处理层,会导致⽤户必须等待很久才能获取计算结果,⼀般有⼏个⼩时的延迟。电商数据分析部门只能查看前⼀天的统计分析结果,⽆法获取当前的结果,这对于实时决策来说有⼀个巨⼤的时间鸿沟,很可能导致管理者错过最佳决策时机。
Lambda架构属于较早的⼀种架构⽅式,早期的流处理不如现在这样成熟,在准确性、扩展性和容错性上,流处理层⽆法直接取代批处理层,只能给⽤户提供⼀个近似结果,还不能为⽤户提供⼀个⼀致准确的结果。因此Lambda架构中,出现了批处理和流处理并存的现象。
在 Lambda 架构中,每层都有⾃⼰所肩负的任务。批处理层存储管理主数据集(不可变的数据集)和预先批处理计算好的视图:批处理层使⽤可处理⼤量数据的分布式处理系统预先计算结果。它通过处理所有的已有历史数据来实现数据的准确性。这意味着它是基于完整的数据集来重新计算的,能够修复任何错误,然后更新现有的数据视图。输出通常存储在只读数据库中,更新则完全取代现有的预先计算好的视图。
流处理层会实时处理新来的⼤数据:流处理层通过提供最新数据的实时视图来最⼩化延迟。流处理层所⽣成的数据视图可能不如批处理层最终⽣成的视图那样准确或完整,但它们⼏乎在收到数据后⽴即可⽤。⽽当同样的数据在批处理层处理完成后,在速度层的数据就可以被替代掉了。
②■ Lambda 架构痛点,缺点
同时维护实时平台和离线平台两套引擎,运维成本高
实时离线两个平台需要维护两套框架不同但业务逻辑相同代码,开发成本高
数据有两条不同链路,容易造成数据的不一致性
数据更新成本大,需要重跑链路
Lambda架构经历多年的发展,其优点是稳定,对于实时计算部分的计算成本可控,批量处理可以⽤晚上的时间来整体批量计算,这样把实时计算和离线计算⾼峰分开,这种架构⽀撑了数据⾏业的早期发展,但是它也有⼀些致命缺点,并在⼤数据3.0时代越来越不适应数据分析业务的需求。缺点如下:使⽤两套⼤数据处理引擎:维护两个复杂的分布式系统,成本⾮常⾼。
批量计算在计算窗⼝内⽆法完成:在IOT时代,数据量级越来越⼤,经常发现夜间只有4、5个⼩时的时间窗⼝,已经⽆法完成⽩天20多个⼩时累计的数据,保证早上上班前准时出数据已成为每个⼤数据团队头疼的问题。
数据源变化都要重新开发,开发周期长:每次数据源的格式变化,业务的逻辑变化都需要针对ETL和Streaming做开发修改,整体开发周期很长,业务反应不够迅速。
导致 Lambda 架构的缺点根本原因是要同时维护两套系统架构:批处理层和速度层。我们已经知道,在架构中加⼊批处理层是因为从批处理层得到的结果具有⾼准确性,⽽加⼊速度层是因为它在处理⼤规模数据时具有低延时性。
那我们能不能改进其中某⼀层的架构,让它具有另外⼀层架构的特性呢?例如,改进批处理层的系统让它具有更低的延时性,⼜或者是改进速度层的系统,让它产⽣的数据视图更具准确性和更加接近历史数据呢?另外⼀种在⼤规模数据处理中常⽤的架构——Kappa 架构,便是在这样的思考下诞⽣的。
2、Kappa架构
Kafka的创始⼈Jay Kreps认为在很多场景下,维护⼀套Lambda架构的⼤数据处理平台耗时耗⼒,于是提出在某些场景下,没有必要维护⼀个批处理层,直接使⽤⼀个流处理层即可满⾜需求,即下图所⽰的Kappa架构:
这种架构只关注流式计算,数据以流的⽅式被采集过来,实时计算引擎将计算结果放⼊数据服务层以供查询。可以认为Kappa架构是Lambda架构的⼀个简化版本,只是去除掉了Lambda架构中的离线批处理部分;
Kappa架构的兴起主要有两个原因:Kafka不仅起到消息队列的作⽤,也可以保存更长时间的历史数据,以替代Lambda架构中批处理层数据仓库部分。流处理引擎以⼀个更早的时间作为起点开始消费,起到了批处理的作⽤。
Flink流处理引擎解决了事件乱序下计算结果的准确性问题。Kappa架构相对更简单,实时性更好,所需的计算资源远⼩于Lambda架构,随着实时处理的需求在不断增长,更多的企业开始使⽤Kappa架构。但这不意味着kappa架构能够取代Lambda架构。
Lambda和kappa架构都有各⾃的适⽤领域;例如流处理与批处理分析流程⽐较统⼀,且允许⼀定的容错,⽤Kappa⽐较合适,少量关键指标(例如交易⾦额、业绩统计等)使⽤Lambda架构进⾏批量计算,增加⼀次校对过程。还有⼀些⽐较复杂的场景,批处理与流处理产⽣不同的结果(使⽤不同的机器学习模型,专家系统,或者实时计算难以处理的复杂计算,可能更适合Lambda架构。
Kappa 架构痛点
对消息队列存储要求高,消息队列的回溯能力不及离线存储
消息队列本身对数据存储有时效性,且当前无法使用 OLAP 引擎直接分析消息队列中的数据
全链路依赖消息队列的实时计算可能因为数据的时序性导致结果不正确
二、实时数仓建设需求
是否存在一种存储技术,
支持数据高效的回溯能力,
支持数据的更新,
又能够实现数据的批流读写,
并且还能够实现分钟级到秒级的数据接入
这也是实时数仓建设的迫切需求。实际上是可以通过对 Kappa 架构进行升级,以解决 Kappa 架构中遇到的一些问题,接下来主要分享当前比较火的数据湖技术—Iceberg。
二、数据湖
数据湖的相关理念和技术依然对于 Hadoop 生态的数仓理念和方法有比较大的变革:
从特性角度来看:结构化/非结构化数据往往是区分数据仓库和数据湖的重要特征,但是在网易严选,我们对非结构化数据的分析需求并不强烈。对于日志类数据,我们还是会做一些清洗并将其结构化后存入数据湖。我们不应该教条的理解数据湖 RAW Data 的定义。原始日志经过清洗/结构化后,只要其包括的信息没有变化,它依然是 RAW Data。
从使用角度来看:数据湖强调对原始数据的使用。灵活的使用数据湖数据,对严选的数据体系有两方面的价值:
提升数据开发效率。当我们需要探索数据洞察业务机会的时候,往往需要给数据开发提数据模型建设需求。由于数据探索的不确定性和临时性,几乎不可能为了临时的频繁的数据探索去排期建设数仓模型。同时数据模型建设完毕后,模型本身的能力就决定了我们探索数据价值的上限。
避免信息的丢失。数仓模型的建设往往是为了满足过去或者当前的业务需求,在模型的开发过程中,不可避免的丢失掉对当时认为价值不大的数据信息。但是,我们在构建机器学习模型或者探索数据的时候,往往需要更加充分的信息,而传统的数仓从根本上无法避免这个问题。
数据湖方案能够很好地解决以上两个问题。
从实时性角度来看:Delta/Iceberg/Huid 经常被宣传为数据湖解决方案。从我们的角度来看,这些技术很好的解决了数据湖 ACID 的问题,并且提供了较好的实时性。意味着我们可以通过这些技术以比较实时的方式提供可靠的原始数据访问能力给应用。 在 Delta Lake 这类存储格式出现之前,严选也自己构建了类似的比较高实时性的原始数据访问方案,但由于缺乏 ACID 能力的支持,使得实际落地的时候往往可靠性不足。
Delta/Iceberg/Hudi 也往往被认为是批量一体的存储解决方案。批流一体跟数据湖的概念糅合在一起,给很多大数据行业的同学带来了困扰。我们用 Delta 技术来构建原始数据层,那么如果用 Delta 构建近实时的 DWD 层,是不是也是数据湖?在这里需要做一个概念上的澄清:数据湖关注的是对原始数据高效、灵活的处理,DWD 及其他数仓分层是充分设计的数据模型,它并不符合我们对数据湖的定义和需求。
1. 为什么基于Iceberg可以构建湖仓一体架构?
对比开放的SQL引擎、存储格式如:Presto、Spark、ORC、Parquet和分布式数仓如:ClickHouse、SnowFlake对应层的实现,其实差别不大,开源分布式引擎一直在逐渐补足SQL Runtime和存储层的一些影响性能的高级特性,比如Runtime CodeGen,向量化执行引擎,基于statistic的CBO,索引等等,当前两者最大的一个不同在于对于数据组织的管理能力。
对于数据湖架构来说,数据文件在HDFS的分布组织是由写入任务决定的,而对于分布式数仓来说,数据一般是通过JDBC写入,数据的存储组织方式是由数仓本身决定的,所以数仓可以按照对于查询更加友好的方式组织数据的存储,比如对数据文件定期compact到合适的大小或者对数据进行合理排序和分组,对于大规模的数据来说,数据的优化组织可以大大提高查询的效率。Iceberg、Hudi、DeltaLake等新的表存储格式的出现,最主要的特性就是可以在HDFS上自组织管理表的metadata信息,从而提供了表数据的Snapshot及粗粒度的事务支持能力,基于此,我们可以在开放的查询引擎之外,异步地,透明地对Iceberg、Hudi、DeltaLake格式的数据进行重新的数据组织优化,从而达到了分布式数仓类似的效果。
2. 为什么选择Iceberg?
我们当时详细地调研了 Delta、Hudi、Iceberg 三个开源项目,并写了一篇调研报告。我们发现 Delta 和 Hudi 跟 Spark 的代码路径绑定太深,尤其是写入路径。毕竟当时这两个项目设计之初,都多多少少把 Spark 作为的他们默认的计算引擎了。而Apache Iceberg 的方向非常坚定,宗旨就是要做一个通用化设计的 Table Format。
因此,它完美地解耦了计算引擎和底下的存储系统,便于接入多样化计算引擎和文件格式,可以说正确地完成了数据湖架构中的 Table Format 这一层的实现。我们认为它也更容易成为 Table Format 层的开源事实标准。
另外一方面,Apache Iceberg 正在朝着流批一体的数据湖存储层发展,manifest 和snapshot 的设计,有效地隔离不同 transaction 的变更,非常方便批处理和增量计算。而我们知道 Apache Flink 已经是一个流批一体的计算引擎,可以说这二者的长远规划完美匹配,未来二者将合力打造流批一体的数据湖架构。
最后,我们还发现 Apache Iceberg 这个项目背后的社区资源非常丰富。在国外, Netflix、Apple、Linkedin、Adobe 等公司都有 PB 级别的生产数据运行在 Apache Iceberg 上;在国内,腾讯这样的巨头也有非常庞大的数据跑在 Apache Iceberg 之上,他们最大的一个业务每天有几十T的增量数据写入到 Apache Iceberg。社区成员同样非常资深和多样化,拥有来自其他项目的 7 位 Apache PMC,1 为 VP。
体现在代码和设计的 review 上,就变得非常苛刻,一个稍微大一点的 PR 涉及 100+ 的comment 很常见。在我个人看来,这些都使得 Apache Iceberg 的设计+代码质量比较高。
Iceberg、Hudi以及DeltaLake是基本同时期出现的开源表存储格式项目,整体的功能和定位也是基本相同,网上已经有很多相关对比介绍的文章,这里就不详细比较了,
我们选择Iceberg的主要原因是:Iceberg在三个里面是表存储格式抽象的最好的,包括读写引擎、Table Schema、文件存储格式都是pluggable的,我们可以进行比较灵活的扩展,并保证和开源以及之前版本的兼容性,基于此我们也比较看好该项目的长远发展。
Iceberg 的 table format 介绍
Iceberg 是为分析海量数据准备的,被定义为 table format,table format 介于计算层和存储层之间。
table format 主要用于向下管理在存储系统上的文件,向上为计算层提供一些接口。存储系统上的文件存储都会采用一定的组织形式,譬如读一张 Hive 表的时候,HDFS 文件系统会带一些 partition,数据存储格式、数据压缩格式、数据存储 HDFS 目录的信息等,这些信息都存在 Metastore 上,Metastore 就可以称之为一种文件组织格式。
一个优秀的文件组织格式,如 Iceberg,可以更高效的支持上层的计算层访问磁盘上的文件,做一些 list、rename 或者查找等操作。
Iceberg 的能力总结
基于快照的读写分离和回溯
流批统一的写入和读取
不强绑定计算存储引擎
ACID 语义及数据多版本
表, 模式及分区的变更
2.4 Iceberg 的文件组织格式介绍
下图展示的是 Iceberg 的整个文件组织格式:
首先最上层是 snapshot 模块。Iceberg 里面的 snapshot 是一个用户可读取的基本的数据单位,也就是说用户每次读取一张表里面的所有数据,都是一个snapshot 下的数据。
其次,manifest。一个 snapshot 下面会有多个 manifest,如图 snapshot-0 有两个 manifest,而 snapshot-1 有三个 manifest,每个 manifest 下面会管理一个至多个 DataFiles 文件。
第三,DataFiles。manifest 文件里面存放的就是数据的元信息,我们可以打开 manifest 文件,可以看到里面其实是一行行的 datafiles 文件路径。
从图上看到,snapshot-1 包含了 snapshop-0 的数据,而 snapshot-1 这个时刻写入的数据只有 manifest2,这个能力其实就为我们后面去做增量读取提供了一个很好的支持。
2.5 Iceberg 读写过程介绍
■ Apache Iceberg 读写
首先,如果有一个 write 操作,在写 snapsho-1 的时候,snapshot-1 是虚线框,也就是说此时还没有发生 commit 操作。这时候对 snapshot-1 的读其实是不可读的,因为用户的读只能读到已经 commit 之后的 snapshot。发生 commit 之后才可以读。同理,会有 snapshot-2,snapshot-3。
Iceberg 提供的一个重要能力,就是读写分离能力。在对 snapshot-4 进行写的时候,其实是完全不影响对 snapshot-2 和 snapshot-3 的读。Iceberg 的这个能力对于构建实时数仓是非常重要的能力之一。
同理,读也是可以并发的,可以同时读 s1、s2、s3 的快照数据,这就提供了回溯读到 snapshot-2 或者 snapshot-3 数据的能力。Snapshot-4 写完成之后,会发生一次 commit 操作,这个时候 snapshot-4 变成了实心,此时就可以读了。另外,可以看到 current Snapshot 的指针移到 s4,也就是说默认情况下,用户对一张表的读操作,都是读 current Snapshot 指针所指向的 Snapshot,但不会影响前面的 snapshot 的读操作。
■ Apache Iceberg 增量读
接下来讲一下 Iceberg 的增量读。首先我们知道 Iceberg 的读操作只能基于已经提交完成的 snapshot-1,此时会有一个 snapshot-2,可以看到每个 snapshot 都包含前面 snapshot 的所有数据,如果每次都读全量的数据,整个链路上对计算引擎来说,读取的代价非常高。
如果只希望读到当前时刻新增的数据,这个时候其实就可以根据 Iceberg 的 snapshot 的回溯机制,仅读取 snapshot1 到 snapshot2 的增量数据,也就是紫色这块的数据可以读的。
同理 s3 也是可以只读黄色的这块区域的数据,同时也可以读 s3 到 s1 这块的增量数据,基于 Flink source 的 streaming reader 功能在内部我们已经实现这种增量读取的功能,并且已经在线上运行了。刚才讲到了一个非常重要的问题,既然 Iceberg 已经有了读写分离,并发读,增量读的功能,Iceberg 要跟 Flink 实现对接,那么就必须实现 Iceberg 的 sink。
■ 实时小文件问题
社区现在已经重构了 Flink 里面的 FlinkIcebergSink,提供了 global committee 的功能,我们的架构其实跟社区的架构是保持一致的,曲线框中的这块内容是 FlinkIcebergSink。
在有多个 IcebergStreamWriter 和一个 IcebergFileCommitter 的情况下,上游的数据写到 IcebergStreamWriter 的时候,每个 writer 里面做的事情都是去写 datafiles 文件。
当每个 writer 写完自己当前这一批 datafiles 小文件的时候,就会发送消息给 IcebergFileCommitter,告诉它可以提交了。而 IcebergFileCommitter 收到信息的时,就一次性将 datafiles 的文件提交,进行一次 commit 操作。
commit 操作本身只是对一些原始信息的修改,当数据都已经写到磁盘了,只是让其从不可见变成可见。在这个情况下,Iceberg 只需要用一个 commit 即可完成数据从不可见变成可见的过程。
■ 实时小文件合并
Flink 实时作业一般会长期在集群中运行,为了要保证数据的时效性,一般会把 Iceberg commit 操作的时间周期设成 30 秒或者是一分钟。当 Flink 作业跑一天时,如果是一分钟一次 commit,一天需要 1440 个 commit,如果 Flink 作业跑一个月commit 操作会更多。甚至 snapshot commit 的时间间隔越短,生成的 snapshot 的数量会越多。当流式作业运行后,就会生成大量的小文件。
这个问题如果不解决的话,Iceberg 在 Flink 处理引擎上的 sink 操作就不可用了。我们在内部实现了一个叫做 data compaction operator 的功能,这个 operator 是跟着 Flink sink 一起走的。当 Iceberg 的 FlinkIcebergSink 每完成一次 commit 操作的时候,它都会向下游 FileScanTaskGen 发送消息,告诉 FileScanTaskGen 已经完成了一次 commit。
FileScanTaskGen 里面会有相关的逻辑,能够根据用户的配置或者当前磁盘的特性来进行文件合并任务的生成操作。FileScanTaskGen 发送到 DataFileRewitre 的内容其实就是在 FileScanTaskGen 里面生成的需要合并的文件的列表。同理,因为合并文件是需要一定的耗时操作,所以需要将其进行异步的操作分发到不同的 task rewrite operator 中。
上面讲过的 Iceberg 是有 commit 操作,对于 rewrite 之后的文件需要有一个新的 snapshot 。这里对 Iceberg 来说,也是一个 commit 操作,所以采用一个单并发的像 commit 操作一样的事件。
整条链路下来,小文件的合并目前采用的是 commit 操作,如果 commit 操作后面阻塞了,会影响前面的写入操作,这块我们后面会持续优化。现在我们也在 Iceberg 社区开了一个 design doc 文档在推进,跟社区讨论进行合并的相关工作。
三、Flink+Iceberg 构建实时数仓 流批一体
版权归原作者 四月天03 所有, 如有侵权,请联系我们删除。