hudi详解
Hudi将流处理引入大数据, 提供新鲜数据,同时比传统批处理效率高一个数据量级。
Apache Hudi是建立在Hadoop/HDFS之上的一个数据存储和处理框架。它可以用于数据湖、数据仓库和流式ETL等场景。Hudi提供了一个统一的数据模型,支持数据的插入、更新和删除操作,同时保证了数据的幂等性和可恢复性。此外,Hudi还提供了基于时间和增量的数据快照和增量查询功能,方便用户进行数据分析和处理。
特性
(1)快速upsert,可插入索引
(2)以原子方式操作数据并具有回滚功能
(3)写入器之和查询之间的快照隔离
(4)savepoint用户数据恢复的保存点
(5)管理文件大小,使用统计数据布局
(6)异步压缩行列数据
(7)具有时间线来追踪元数据血统
(8)通过聚类优化数据集
架构介绍:
Hudi数据模型
Hudi将数据分为三个层次:文件(File)、记录(Record)和时间线(Timeline)。
文件(File):文件是存储在HDFS上的数据单元,包含了一组记录。每个文件都有一个唯一的文件ID,用于标识文件。文件可以追加数据记录,也可以被删除。
记录(Record):记录是文件的最小数据单元,可以是JSON、CSV等格式。每个记录都有一个唯一的键(Key),用于标识记录。记录可以插入、更新和删除。
时间线(Timeline):时间线用于管理文件的版本。每个文件都有一个提交时间(Commit Time),通过时间线可以追溯文件的变更历史。
Timeline
hudi的核心是维护不同时刻在表上执行的所有操作的时间表,提供表的即时视图,同时还有效地支持按时间顺序检索数据。Hudi的时刻由以下组件组成:
(1)Instant action: 在表上执行的操作类型
(2)Instant time: 即时时间,通常是一个时间戳,按照action的开始时间单调递增
(3)State: 时刻的当前状态
hudi保证在时间线上的操作都是基于即时时间的,两者的时间保持一致并且是原子性的。
acion操作包括:
(1)commits: 表示将一批数据原子写入表中
(2)cleans: 清除表中不在需要的旧版本文件的后台活动。
(3)delta_commit:增量提交是指将一批数据原子性写入MergeOnRead类型的表中,其中部分或者所有数据可以写入增量日志中。
(4)compaction: 协调hudi中差异数据结构的后台活动,例如:将更新从基于行的日志文件变成列格式。在内部,压缩的表现为时间轴上的特殊提交。
(5)rollback:表示提交操作不成功且已经回滚,会删除在写入过程中产生的数据
(6)savepoint:将某些文件标记为“已保存”,以便清理程序时不会被清楚。在需要数据恢复的情况下,有助于将数据集还原到时间轴上某个点。
任何时刻都会处于以下state:
(1)requested:表示一个动作已被安排,但尚未启动
(2)inflight:表是当前正在执行操作
(3)completed:表是在时间线上完成了操作
上图显示了hudi表上10:00和10:20之间发生的更新插入,每5分钟一次,将提交元数据留以及其他后台清理/压缩操作在hudi时间轴上。观察的关键点是,提交时间表示数据的到达时间,而实际数据组织则反应了实际时间或事件时间,即数据所反映的从07:00开始的每小时时段。在权衡数据延迟和完整性,这是两个关键概念。
如果有延迟到达的数据(事件时间为9:00的数据在10:20达到,延迟>1小时),可以看到upsert将数据生成到更旧的时间段/文件夹中。在时间轴的帮助下,增量查询可以只提取10:00以后成功提交的新数据,并非高效地只消费更改过的文件,且无需扫描更大的文件范围,例如07:00后的所有时间段。
File Layout
Hudi会在DFS分布式文件系统上的basepath基本路径下组织成目录结构。每张对应的表都会成多个分区,这些分区是包含该分区的数据文件的文件夹,与hive的目录结构非常相似。
在每个分区内,文件被组织成文件组,文件id为唯一标识。每个文件组包含多个切片,其中每个切片包含在某个提交/压缩即时时间生成的基本列文件(parquet文件),以及自生成基本文件以来对基本文件的插入/更新的一组日志文件(*.log)。Hudi采用MVCC设计,其中压缩操作会将日志和基本文件合并成新的文件片,清理操作会将未使用/较旧的文件片删除来回收DFS上的空间。
MVCC(Multi-Version Concurrency Control):多版本并行发控制机制
Multi-Versioning:产生多版本的数据内容,使得读写可以不互相阻塞
Concurrency Control:并发控制,使得并行执行的内容能保持串行化结果
Index
Hudi通过索引机制将映射的给定的hoodie key(record key+partition path)映射到文件id(唯一标示),从而提供高效的upsert操作。记录键和文件组/文件ID之间的这种映射,一旦记录的第一个版本写入文件就永远不会改变。
Table Types& Queries
Hudi表类型定义了如何在DFS上对数据进行索引和布局,以及如何在此类组织上实现上述操作和时间轴活动(即如何写入数据)。同样,查询类型定义了底层数据如何暴露给查询(即如何读取数据)。
Table Types:
(1)Copy on Write:使用列式存储来存储数据(例如:parquet),通过在写入期间执行同步合并来简单地更新和重现文件
(2)Merge on Read:使用列式存储(parquet)+行式文件(arvo)组合存储数据。更新记录到增量文件中,然后进行同步或异步压缩来生成新版本的列式文件。
下面总结了两种表类型之间的权衡
Query Types:
(1)Snapshot Queries:快照查询,在此视图上的查询将看到某个提交和压缩操作的最新快照。对于merge on read的表,它通过即时合并最新文件切片的基本文件和增量文件来展示近乎实时的数据(几分钟)。对于copy on write的表,它提供了对现有parquet表的直接替代,同时提供了upsert/delete和其他写入功能。
(2)Incremental Queries:增量查询,该视图智能看到从某个提交/压缩写入数据集的新数据。该视图有效地提供了chang stream,来支持增量视图
(3)Read Optimized Queries:读优化视图,在此视图上的查询将查看到给定提交或压缩操作中的最新快照。该视图将最新文件切片的列暴露个查询,并保证与非hudi列式数据集相比,具有相同列式查询功能。
下面总结了两种查询的权衡
Copy on Write Table
Copy on Write表中的文件切片仅包含基本/列文件,并且每次提交都会生成新版本的基本文件。换句话说,每次提交操作都会被压缩,以便存储列式数据,因此Write Amplification写入放大非常高(即使只有一个字节的数据被提交修改,我们也需要重写整个列数据文件),而读取数据成本则没有增加,所以这种表适合于做分析工作,读取密集型的操作。
下图说明了copy on write的表是如何工作的
随着数据被写入,对现有文件组的更新会为该文件组生成一个带有提交即时间标记的新切片,而插入分配一个新文件组并写入该文件组第一个切片。这些切片和提交即时时间在上图用同一颜色标识。针对图上右侧sql查询,首先检查时间轴上的最新提交并过滤掉之前的旧数据(根据时间查询最新数据),如上图所示粉色数据在10:10被提交,第一次查询是在10:10之前,所以出现不到粉色数据,第二次查询时间在10:10之后,可以查询到粉色数据(以被提交的数据)。
Copy on Write表从根本上改进表的管理方式
(1)在原有文件上进行自动更新数据,而不是重新刷新整个表/分区
(2)能够只读取修改部分的数据,而不是浪费查询无效数据
(3)严格控制文件大小来保证查询性能(小文件会显著降低查询性能)
Merge on Read Table
Merge on Read表是copy on write的超集,它仍然支持通过仅向用户公开最新的文件切片中的基本/列来对表进行查询优化。用户每次对表文件的upsert操作都会以增量日志的形式进行存储,增量日志会对应每个文件最新的ID来帮助用户完成快照查询。因此这种表类型,能够智能平衡读取和写放大(wa),提供近乎实时的数据。这种表最重要的是压缩器,它用来选择将对应增量日志数据压缩到表的基本文件中,来保持查询时的性能(较大的增量日志文件会影响合并时间和查询时间)
下图说明了该表的工作原理,并显示两种查询类型:快照查询和读取优化查询
(1)如上图所示,现在每一分钟提交一次,这种操作是在别的表里(copy on write table)无法做到的
(2)现在有一个增量日志文件,它保存对基本列文件中记录的传入更新(对表的修改),在图中,增量日志文件包含从10:05到10:10的所有数据。基本列文件仍然使用commit来进行版本控制,因此如果只看基本列文件,那么表的表的布局就像copy on write表一样。
(3)定期压缩过程会协调增量日志文件和基本列文件进行合并,并生成新版本的基本列文件,就如图中10:05所发生的情况一样。
(4)查询表的方式有两种,Read Optimized query和Snapshot query,取决于我们选择是要查询性能还是数据新鲜度
(5)如上图所示,Read Optimized query查询不到10:05之后的数据(查询不到增量日志里的数据),而Snapshot query则可以查询到全量数据(基本列数据+行式的增量日志数据)。
(6)压缩触发是解决所有难题的关键,通过实施压缩策略,会快速缩新分区数据,来保证用户使用Read Optimized query可以查询到X分钟内的数据
Merge on Read Table是直接在DFS上启用近实时(near real-time)处理,而不是将数据复制到外部专用系统中。该表还有些次要的好处,例如通过避免数据的同步合并来减少写入放大(WA)。
写时复制(COW)与读时合并(MOR)区别
写时复制(Copy On Write):此存储类型使客户端能够以列式文件格式(当前为parquet)摄取数据。使用COW存储类型时,任何写入Hudi数据集的新数据都将写入新的parquet文件。更新现有的行将导致重写整个parquet文件(这些parquet文件包含要更新的受影响的行)。因此,所有对此类数据集的写入都受parquet写性能的限制,parquet文件越大,摄取数据所花费的时间就越长。
读时合并(Merge On Read):此存储类型使客户端可以快速将数据摄取为基于行(如avro)的数据格式。使用MOR存储类型时,任何写入Hudi数据集的新数据都将写入新的日志/增量文件,这些文件在内部将数据以avro进行编码。压缩(Compaction)过程(配置为嵌入式或异步)将日志文件格式转换为列式文件格式(parquet)。
两种不同的格式提供了两种不同视图(读优化视图和实时视图),读优化视图取决于列式parquet文件的读取性能,而实时视图取决于列式和/或日志文件的读取性能。
更新现有的行将导致:a)写入从以前通过压缩(Compaction)生成的基础parquet文件对应的日志/增量文件更新;或b)在未进行压缩的情况下写入日志/增量文件的更新。因此,对此类数据集的所有写入均受avro /日志文件写入性能的限制,其速度比parquet快得多(写入时需要复制)。虽然,与列式(parquet)文件相比,读取日志/增量文件需要更高的成本(读取时需要合并)。
使用场景
近实时读取
Hudi在各种读取数据方面也有很多好处,Hudi在DFS分布式存储系统上存储数据强制执行了最小文件大小,这样有助于解决HDFS和存储上的小文件问题,显著的提升了查询性能。并且Hudi每次提交数据的方式都是原子性的,这样也避免了查询时受到部分写入的影响。
将外部各种OLTP的数据源(比如日志数据、数据库、外部源)写入到Hudi中也是一个常见的问题,Hudi存储这些数据,原始数据层的构建也是非常关键。
对应RDBMS这种关系型数据库的数据写入,Hudi提供了Upserts操作来提供增量修改或新增,而不是采用昂贵的且低效的批量加载。使用Debezium或Kafka Connect或Sqoop等工具将数据导入到Hudi对应在DFS上的表是非常常见的一种方案。对于像NoSql这样的数据库(Cassandra / Voldemort / HBase)即使中等规模的数据量会存储十亿行左右,所以使用完全批量加载的方式根本不行,如果读取要跟上数据的高更新的变化量,则需要更有效的方法。
即使对于像Kafka这样的不可变数据源,通常也需要根据DFS上存储的内容对传入事件进行重复数据的删除。Hudi通过使用不同类型的索引来快速解决这个问题。
所有的一切都可以通过Hudi DeltaStreamer工具无缝实现,该工具Hudi官方一直在尝试增加更多数据源,该工具提供一个连续模式,可以异步地自我管理集群/压缩,而不会阻止数据的写入,显著提高数据新鲜度。
Hudi+Debezium+flink+spark同步方案。
数据删除
Hudi还提供了删除存储在数据中的数据的能力,更重要的是提供了处理大量写入放大(wa)的有效方法,这些通过Merge On Read 表类型基于user_id(任何辅助键)随件删除产生的结果。Hudi可以基于日志提供优雅的并发控制,保证数据的写入和读取可以持续发生,因为后台压缩作业分摊数据的重写和强制删除所需要的成本。
Hudi还解锁了数据聚类等特殊功能,允许用户优化数据布局来进行删除。具体来说,用户可以基于user_id(辅助键)对旧的事件日志数据进行聚类,这样评估数据需要删除的数据就可以快速的定位,对于分区则在时间戳上进行了聚类优化,提高查询性能。
分析和存储 湖仓一体
数据的存储和分析一般我们分为两类数据,实时处理和离线批量处理。通常实时处理由Druid、Memsql或clickhouse提供支持,并且有kafka或pulsar提供服务,这种模型非常昂贵。
如果数据会在很晚之后才被写入到数据湖中,那么就需要进行批处理,Hudi也提供对Persto/Spark Sql等交互时Sql引擎的访问,这些引擎可以轻松横向扩展,并在几秒钟诶返回查询结果。
与实时数据集市相比,通过将数据新鲜度缩短到几分钟,Hudi可以为大量数据应用程序提供更高效的替代方案。
增量处理管道 流批一体
在传统数据仓库中,整个数仓的工作流可能会发生数据延迟的问题,比如上游工作流U每小时会在Hive建立分区,每小时结束时使用该处理时间作为event time,提供1小时的数据有效新鲜度,然后下游工作流D在U完成后立即启动,并在接下来一小时内进行自己的处理,将有效延迟增加到了2小时。这样的例子忽略了数据迟到现象,即processing_time和event time并不是同一时间而是分开的,比如移动设备或者传感器间歇性的连接所造成,这种现象并不是异常而是常态,那么在这种情况下,保证正确性的唯一补救措施是每小时一遍一遍得重新处理那些迟到的数据,这样可能会严重危害到整个生态系统的效率。
那么Hudi也提供了解决方案,它提供一种以记录粒度(不是目录/分区)从上有Hudi表中消费数据(包括后期迟到数据)的方法,可以有效地更新/协调后期数据到达,上下游两张表的调度可以更频繁,例如15分钟,并可以在下游表提供30分钟的端延迟。为了实现这一目标,Hudi采纳了spark streaming、Kafka、Flink、Oracle Streaming等技术框架的概念。
版权归原作者 sunxunyong 所有, 如有侵权,请联系我们删除。