摘要:本文整理自 Paimon Committer 邹欣宇老师在11月15日 Apache Spark & Paimon Meetup,助力 Lakehouse 架构生产落地上的分享。
文章介绍了 Paimon x Spark 的发展历程,企业搭建 Lakehouse 面临的挑战,Paimon 通过分层元数据架构支持 ACID 事务,主键表通过引入 LSM Tree 结构实现实时场景分钟级时延,且支持通过 Deletion Vector 进一步提升查询性能;在离线场景,提供了 Z-order Sort 和 Index 等能力加速非主键表查询。还详细介绍了 Paimon x Spark 在性能方面进行的系列优化,包含元数据加载优化、查询优化、Scan IO 优化等。
Apache Spark & Paimon 交流钉钉群:91535023202
01
Paimon x Spark 发展历程
Paimon 自 2022年从 Apache Flink 诞生,到进入 Apache 孵化器,再到最终成为 Apache 顶级项目,如今Paimon 已经构建了一个非常完善的生态系统:不仅实现了与多种数据源的无缝对接,还实现了与 Python API、以及 SQL Query 也就是和各式各样引擎的集成。在众多引擎集成当中,除了 Paimon 诞生而来的 Apache Flink 之外,Paimon 与 Apache Spark 之间的集成也同样有效、全面,甚至在某些方面有其独特的能力。
下面就让我们进入 Paimon Spark 的世界。下图展示的是 Paimon x Spark 的发展历程。
自 0.5 版本开始,社区开始大力投入 Paimon x Spark 引擎的对接。这一阶段不仅支持了基础的数据操作如 Insert Into / Overwrite 语句等,还支持了 Streaming API、Scheme Evolution 等,以及支持了主键表的 DML 操作,如 Delete、Update 甚至是 Merge Into 等高级特性。还支持通过扩展 Session Extension 去执行 Call Procedure 来完成各种表管理操作。可以说到了 0.6 版本,Paimon 与 Spark 之间的集成已具备相对成熟的流批处理能力,能够满足绝大多数实际业务场景的需求。
而从 0.7 版本开始,社区开始更加关注 Paimon Spark 的性能优化。例如,引入了通过 Compact Procedure 对数据做 Z-order 以改善数据的 Layout;利用 Analyze Table 获取表统计信息,并将其应用于 CBO 优化等等。同时,我们还针对真实用户场景和 TPC-DS 典型场景进行了大量的查询优化。还支持通过 Deletion Vector 模式进一步加速主键表的查询(这个后面会有一个章节会展开分享)。
在 0.9 版本,还新增 Bucket Join 并补齐了非主键表的各种能力,可以说在 0.9 版本 Paimon 与 Spark 之间的集成无论是在功能还是性能表现上均已生产 ready。
当然,社区并没有因此停下脚步。从 1.0 开始,社区开始对接 Spark 4.0 以及前沿的 Variant Type,可以说 Paimon Spark 社区始终在不断优化和迭代的路上。
02
Paimon:一站式全场景覆盖的数据湖存储
下图展示了从 Warehouse 到 Lakehouse 架构的演进过程。接下来,我将详细阐述搭建 Lakehouse 面临的挑战。
首先要确保支持 ACID 事务,因为 ACID 事务是一切数据正确性、完整性的基石。其次,除了基本的数据摄入外,数据更新也是一个很常见的能力。再者,我们需要对数据有足够强的掌控力,可以对其进行质量管理。最后同样重要的是,整个架构设计必须满足高性能要求。
近年来,为了应对上述挑战,湖格式技术应运而生,其中包括 Hudi、Iceberg、Delta Lake 以及我们今天重点讨论的 Paimon。这些湖格式其实采取一种类似的设计理念:在湖上基于低成本存储介质(如对象存储或块存储)构建一层元数据层。该元数据层以快照形式记录了特定时间点下表内所有文件的状态信息,为实现高效的数据更新与管理提供了基础支持。更重要的是,基于这样的架构设计,可以无缝集成多种高性能计算引擎,比如 Apache Spark 等,进而实现高性能读写。
03
**Paimon x Spark:极致性能优化 **
****
****3.1 ****ACID事务的核心:元数据层
首先是第一个挑战:支持ACID事务。在 Paimon 中,这一能力通过元数据层实现。如下图左侧架构所示,Paimon 元数据架构采用了分层架构,从顶层的 snapshot 到 manifest 再到底层的 data file。这种分层结构不仅支持不同版本间的数据复用,从而显著降低了存储成本;同时,也解锁了许多新的能力,比如可以指定某一个版本做时间旅行。此外,在与 Apache Spark 的集成方面,Paimon 通过扩展了 TVF 实现了增量查询能力。以及,我们还可以借助 Call Procedure 实现各种表管理操作,比如清理不用的版本或回滚到某个特定版本。
除了上述特性外,Paimon 还引入了一些比较有趣的能力。例如 Tag ,允许用户基于某一版本创建 Tag,在执行版本清理时,被标记的版本将不会被删除,除非手动删除 Tag。另外,Paimon 还提供了类似 Git 的 Branch 管理能力,用户可以通过创建新的分支基于新的分支做开发,最后通过 fast-forward procedure 把改动推进到主分支。
****
****3.2 ****数据摄入与更新
接下来介绍第二个挑战,数据摄入与更新。数据摄入比较简单,我们重点关注数据更新。回顾上一章节,我们已经构建了一层元数据层来记录当前快照中的文件列表。当我们需要对特定主键(例如PK=1)的记录进行更新时,一种直观的方法是遍历所有文件,然后找到 PK=1 所在的记录的全部文件,加载至内存中进行修改,最后再写成新的文件。这就是 Copy-on-Write 策略,其主要缺点在于即使仅需修改一条数据也需要重写整个文件,造成显著的写放大问题。另一种方案是把所有更新写入进新的文件中,然后在读时做合并,即所谓的 Merge-on-Read 。而这种方式需要在读时进行复杂合并操作,存在读放大问题。接下来,我们将深入分析 Paimon 是如何应对的,Paimon 目前有主键表和非主键表两类结构。
Paimon 的主键表其实是基于 MOR 的思想设计,如下图 Paimon 主键表结构。在定义好主键字段后,系统会根据该字段值的哈希结果将记录其分配到不同的 bucket 中存储,这样数据的 merge 范围就缩小到了一个 bucket 内。在同一 bucket 内部,Paimon 采用 LSM Tree 的架构组织数据文件。该结构的特点是,在数据写入时会依据主键对数据进行排序然后写入。读取数据时,由于数据已按序排列,只需对不同层级文件做一个简单的 merge sort 即可。
Paimon 在数据写入过程中还会定期执行 compact,以确保读写性能的稳定性。此外,得益于 compact 机制,Paimon 还提供了一些功能,例如支持定义 merge engine 和产生 change log。在与 Spark 集成时,Paimon 支持使用 Delete、Update 及 Merge 等语法对表进行修改。Paimon 还引入了 deletion vector 来加速主键表的查询。因此,主键表作为一种非常适合流处理场景的表结构,具备较高的读写性能,能够有效应对实时数据分析的需求。
在离线处理场景下,Paimon 也推出了非主键表,其架构设计类似传统 Hive 表,将数据按照分区、Bucket写入到文件中。而在此基础上,Paimon 还进行了多项优化。例如,通过 Z-order Sort 调整 layout,或支持创建通用的文件索引,在写入阶段对特定字段建立索引,从而在读取时能够实现高效的数据跳过。因此,非主键表非常适合大规模离线批处理。
值得注意的是,非主键表原本叫 Append 表,因为在其设计之初仅支持写入操作。而在与 Spark 集成时,我们采用了一种轻量级的 Copy-on-Write 机制,使它也能够支持 Delete、Update 及 Merge 等语法。因此,后来 Append 表逐渐改叫非主键表了。此外,我们在非主键表中也同样引入了 Deletion Vector 的能力。
****
****3.3 ****Paimon x Spark 性能优化
第三个挑战是高性能,我会结合 Spark 介绍一下 Paimon x Spark 上做了哪些性能优化。
我们首先从 Spark SQL 的执行链路开始探讨。下图是一个非常经典的 Spark SQL 执行链路。一条 SQL 语句会先被 Spark 的 Parser 解析成一个 Unresolved Logical Plan。随后,Analyzer 会从 Catalog 中获取表的 Schema 或者其它元数据,将 Unresolved Logical Plan 解析成 Resolved Logical Plan。相较于传统的 Hive 或 Parquet 表从 HMS 中获取元数据,Paimon 底层会维护自己的元数据,元数据由 Paimon 直接提供。
接下来进入 Query Optimization 层,在这里 Logical Plan 会基于 Spark 已经固定的规则和表的统计信息,根据 Cost 模型评估后选择生成最优的 Physical Plans 进行执行。对于 Paimon 来说,如果是查询作业,则表现为执行 Table Scan;而写入操作如 Insert / Update 则会完成数据的写入,并提交 Commit 生成一个新的快照。
整个流程中,有 3 个关键阶段会直接影响数据湖格式的读写性能,这也是 Paimon Spark 的优化重点:元数据加载优化、查询优化、Scan IO 优化。
****
元数据加载优化
Paimon 0.9 版本引入了 Caching Catalog 的概念。当一个简单的查询进入到 Paimon 时,会触发 Scan 操作。接着 Paimon 通过 Planning 获取需要的元数据。这一过程中,涉及到读取分层的 Snapshot、Manifest Files 以及相关的 File Meta。最终,这些信息会被组织生成 Splits,并分发给各个 Reader。
在这一过程中,如果元数据文件比较多,同时获取元数据的性能比较差,例如在对象存储场景,I/O 开销较大,元数据的加载过程可能会成为性能瓶颈,对于一些小 Query,元数据加载耗时会比较久。为了解决这个问题,Paimon 0.9 引入了 Caching Catalog ,它能够将已加载过的元数据缓存在 Catalog 中。这样一来,在缓存命中的情况下,第二次查询 Planning 的耗时就可以显著降低。此功能默认开启,并允许用户根据具体需求自定义缓存大小以及过期时间。
查询优化
在 Spark 中,查询优化通常被划分为两大类:基于规则的优化 RBO 和基于成本的优化 CBO。
RBO 优化
RBO 优化可以进一步细分为 Spark 内置优化和自定义扩展优化两部分。Spark 内置优化包括了 Spark 自身提供的多种优化技术,例如经典的 DPP、DSV2 Projection、Predicate、Limit Pushdown 等等。自定义扩展优化则可以引入额外的优化策略。例如 Paimon 就允许用户定义自己的优化规则,以支持更复杂的场景,如标量子查询的优化等等。
CBO 优化
CBO 优化一般是 Spark 利用表的统计信息,基于 Cost 模型选择代价最小的 Plan 作为最终执行 Plan。在传统的 Hive 表中,用户可以通过执行 Analyze 命令将统计信息同步到 HMS。这一点 Paimon 也同样支持,通过调用 Analyze 语句,Paimon 会计算表统计信息以及列统计信息,并且将它们保存在指定的文件里。然后,在查询的时候就可以读取这些统计信息做优化,比如在做 Join 操作时,可以通过 Cost based Join Reorder,根据统计信息选择最合适的 Join 顺序。
下面介绍在 0.9 版本新增的一个查询优化,Bucket Join。Join 是关系型数据库中一个非常常见的操作,也就是将两个表进行关联。而在 Spark 中,Join 一般分为两种。第一种是大表 Join 小表,采用 BHJ( Broadcast Hash Join),这种 Join 方式通常性能比较好。另外一种更加通用的方式就是大表 Join 大表。该场景下 Spark 会采用 Sort Merge Join,如下图所示,Spark 会先做一个Exchange 操作,也就是对数据做Shuffle 确保相同键值的数据位于同一分区,然后对该分区里面的数据进行排序,最后再对两个相同的分区进行 Merge Sort。整个过程中,不论是 Shuffle,还是 Sort,都带来比较大的开销。
为了解决这一性能瓶颈,自 Spark 3.3 起引入了 Bucket Join 机制。而对应到 Paimon,以下图为例,我们定义了两张表,其主键字段类型一致,且 Bucket 个数相同,当我们对这两张表基于主键进行 Join 时,可以直接对两个 Bucket 的数据进行 Join,无需额外进行 Shuffle、Sort 操作,显著提升执行效率。该能力要求 Spark 3.3 版本以上,且需手动开启。
Scan IO 优化
第三个优化方向是 Scan IO 优化,该过程可细分为三个层级。
Partition Pruning
首先第一层是最基本的 Partition Pruning,可以直接选择分区作为过滤条进行分区裁剪,或者利用 DPP ,依据 Spark 运行时生成的 Partition Filter 对分区进行裁剪。
File Pruning
第二层 File Pruning,也就是 Data Skipping,文件级别裁剪。Paimon 的 File 里会记录一些 Meta 比如字段的 Min / Max。查询时会将 Filter 下推到文件级别,根据 Min/Max 做一个过滤。同时 Paimon 还支持创建通用的索引如布隆索引,Bitmap 索引等,以针对特定字段进行查询过滤。此外,Paimon 还支持手动做 Compact 操作,对某一字段或多个字段进行排序,提升查询 Skipping 效率。
Row/Column Filtering
最后一层就是文件内部的 Scan 优化。Paimon 它的底层其实是采用是 parquet 或者 ORC 这样的列式存储。因此它其实天生支持列级别的 Column Pruning,即支持列裁,而 Paimon 还支持把 Filter下推到格式Reader 里,比如对 Parquet 格式实现 Row Group 级别的 Filter 过滤。
Nested Column Pruning
在 0.9 和 1.0 版本中,我们引入了多项新的 Scan IO 优化技术,其中第一项改进是 Nested Column Pruning。以 Parquet 格式为例,对于嵌套类型,如Struct、Map、Array 等,其底层仍然采用列式存储。因此,在处理具有复杂的嵌套结构时,如果仅需访问某一子嵌套的某字段,例如当我们需要从school.address中提取street字段的信息时,通过应用 Nested Column Pruning,可以直接读取目标子列字的数据,而无需读取整个列,从而显著降低 I/O 开销。目前此特性不仅已在 Spark x Paimon 中实现,我们还提供了 Core 层面通用接口,供其他计算引擎对接使用。
Aggregate Push Down
第二项改进是 Aggregate Push Down。如图所示,比如我们有一个以 day 为分区的 T 表,如果我们想去对它做一个 Count 操作,或者加一个分区的 Filter 做 Count。因为 Paimon 的文件元数据里其实已经保存了数据条数。因此其实我们没必要去再去做一个 Scan,我们可以直接查询元数据,然后对元数据做一个累加,快速地返回结果。
Deletion Vector
最后再介绍一个比较通用的优化,Deletion Vector,它的设计之初旨在应对主键表结构面临的挑战。
- 读时 Merge
主键表底层是 LSM 的结构,当遇到具有相同主键的数据分布于不同层级时,必须执行 Merge。尽管数据已经是有序的,但 Merge 还是会产生一定开销和一些连锁问题。
- 非主键字段 Filter 无法下推
举例说明,假设我们有一个 bucket,其中包含 6 个文件。假设主键是 id,现在需要查询 c>10且 id = 1的所有记录。如果我们将 c > 10这个 filter 下推,可能会导致不正确的结果。具体来说,假设 f2 中的统计信息显示 c 的最大值为 7,在这种情况下,根据过滤条件 c > 10,文件 f2 会被直接过滤,而 f5 则会被保留。因为 Paimon 采用 LSM 树结构,通常层级越低的数据越新。因此,两个文件 Merge 时,由于 f2 已被过滤掉,最终只会保留 f5 中的记录,返回 id = 1, c = 15。然而,实际情况是,id = 1, c = 5这条记录在时间上更晚生成,它会覆盖 id = 1, c =15这条记录。因此,最终查询结果应该是没有任何记录返回。由此可见,非主键字段由于 Merge 存在,不能简单地下推到文件层面,否则会导致查询结果不准确。
- 读并发受限于 Bucket 数
此外,LSM 树结构下,我们还做不到并发地 Merge 某个 Bucket,这意味着当 Bucket 个数较小而计算资源非常大的时候,最大并发仍然受限于 Bucket 的个数,导致我们无法充分利用集群资源。
- Native 引擎难以集成
最后一个挑战是 Native 引擎难以集成。以 Databricks 的 Photon 以及开源 Spark + Gluten + Velox 等为代表的 Native 引擎越来越流行。这些 Native 引擎对 Parquet 及 ORC 格式文件的读取集成已经非常完善了,但在文件之前需要 Merge 的场景下,它们之间变得难以适配,尤其是 Paimon 还支持丰富多样的 Merge Engine,这进一步提升了主键表与 Native 引擎集成的困难性。
为了应对这些挑战,我们设计了 Deletion Vector 模式。其核心思想就是标记删除。以一个简单的例子来说明:假设我们需要删除文件中 PK = 2 的记录,此时将生成一个 Deletion Vector 标记 PK = 2 的记录被删除,该文件会通过 Bitmap 记录被删除记录所在行号。测试表明,亿级别下 Bitmap 仍然可以保持较高的 Filter 性能。当新的数据(例如 f2)被写入时,Paimon 底层会通过 Lookup 机制找到历史文件中具有相同 ID 的记录,并为这些记录创建对应的 Deletion Vector,其底层利用 Bitmap 精确记录当前文件内哪些记录被删除,随后将此信息保存在 Index 文件中。
读取的时候,直接读取文件,然后与 Bitmap 对行号进行过滤,得到最终的文件。通过这种非常轻量的Filter 模式,数据之间无需再 Merge,上述问题也就不复存在。我们可以做非主键 Filter 下推,并发也不再受限制。此外,因为 Bitmap 结构可以与 Java、C++ 等语言很好适配,所以这套机制可以与 Native 引擎进行友好对接。我们在开启 Deletion Vector 模式后,针对特定场景下的查询性能提高了 3 至 5 倍;而当我们把 Reader 与 Native 引擎集成后,性能增益更是显著提升。
在完成前面的各项优化措施后,我们进行了一系列性能测试以验证其效果。首先,在 1TB TPC-DS 数据集的场景下,我们比较了多种格式 Load 及99条 Query 总耗时,结果显示 Paimon 的表现最为优异。此外,我们在更大的 100TB TPC-DS 数据集上进行了测试,使用 Spark 与 Paimon 相结合的方式,并将其性能与当前 TPC-DS 榜单第一名即 Databricks Photon + Delta 进行了对比,且是在相同 Core 数,一半内存的条件下,结果显示,Spark + Paimon 能够实现约 50% 的性能提升(该成绩目前已经提交给 TPC-DS 官方进行审核)。
通常来说,TPC-DS 测试包含两个主要阶段:Load 和 Query。除了这两个阶段之外,还会同时跑四条 PT 并行查询的 TT,以及 MT 来考验存储格式对数据更新的支持能力。在 MT 过程中,会执行 Insert 和 Delete 操作。通过采用 Paimon 结合 Deletion Vector,我们能够在执行删除操作时仅针对需要删除的记录生成相应的 Deletion Vector,无需对整个数据进行 Copy-on-Write。这一改进相较于 Databricks Photon + Delta 方案有很大提升,且在执行第二次 PT 时,整体查询性能几乎未受影响,证明 Deletion Vector 在查询方面基本没有回退。
04
湖仓架构下的典型案例
下面简单介绍湖仓架构下的典型案例 - Open Lakehouse 架构。它的特点是以 Paimon 作为统一的底层存储,在这之上可以自由选择喜欢的计算引擎,比如使用 Flink + Spark 进行流批读写,也可以选择使用 StarRocks 做 AD-HOC 查询。这个架构不仅实现了存储统一,还实现了流批一体,而且整个架构是完全开源的,里面所有组件如 Paimon、Spark 等都是开源的。
进一步看一个阿里云的客户案例,展示了该架构的具体实践。以 Paimon 作为统一的存储,用它来构建数仓的 ODS 层、DWD 层以及 DWS 层。通过阿里云 EMR Serverless Spark 做离线 ETL 完成整个数仓链路的搭建,同时也可以通过 StarRocks 做在线查询。这样的组合不仅简化了技术栈,降低了维护成本,同时也为企业提供了强大的数据处理能力和更广泛的业务应用场景。
05
未来展望与规划
最后简单分享下未来规划,之后我们仍将重点关注功能、性能以及易用性提升三个方向。
首先,在功能层面,Paimon 已经在 Core 层面支持了 View,我们会将这一能力扩展到 Spark,还会支持 Spark 4.0 提出的Variant 类型。Variant 类型专门设计用于处理半结构化数据(如JSON格式),相较于传统方法下直接以 String 形式存储,采用 Variant 能够显著加快 Json 解析速度。其次,在性能方面,我们将继续致力于查询优化,特别是 Read /Writer 以及更多 Spark DS V2 优化。最后是易用性方面,支持在查询的时候配置表级别的 Options,这个最新版本已经支持。此外还会集成最新的 Spark 4.0,并且兼容多版本Spark。
阿里云 EMR Serverless Spark 版是一款云原生,专为大规模数据处理和分析而设计的全托管 Serverless 产品。该产品内置 Fusion Engine,100% 兼容开源 Spark 编程接口,相比于开源 Spark 性能提升300%;提供 Notebook 及 SQL 开发、调试、发布、调度、监控诊断等一站式数据开发体验;支持弹性伸缩、按量付费,进一步降低计算成本!欢迎体验!https://x.sm.cn/HLawcSX
如果您在使用 EMR Serverless Spark 版的过程中遇到任何疑问,可加入钉钉群(群号:58570004119)咨询。
点击了解 EMR Serverless Spark~
版权归原作者 Apache Spark中国社区 所有, 如有侵权,请联系我们删除。