前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站:https://www.captainai.net/dongkelun
前言
接上篇文章:Hudi源码 | Insert源码分析总结(一)(整体流程),继续进行Apache Hudi Insert源码分析总结,本文主要分析上文提到的
WorkloadProfile
版本
Hudi 0.9.0
入口
入口在上篇文章中讲到的
BaseJavaCommitActionExecutor
的
execute
WorkloadProfile profile =null;if(isWorkloadProfileNeeded()){// 始终为true// 构建WorkloadProfile,构建WorkloadProfile的目的主要是为给getPartitioner使用// WorkloadProfile包含了分区路径对应的insert/upsert数量以及upsert数据对应的文件位置信息// 数量信息是为了分桶,或者说是为了分几个文件,这里涉及了小文件合并、文件大小等原理// 位置信息是为了获取要更新的文件// 对于upsert数据,我们复用原来的fileId// 对于insert数据,我们生成新的fileId,如果record数比较多,则分多个文件写
profile =newWorkloadProfile(buildProfile(inputRecords));LOG.info("Workload profile :"+ profile);try{// 将WorkloadProfile元数据信息持久化到.inflight文件中,.commit.request->.commit.inflight.// 这一步主要是为了mor表的rollback,rollback时可以从.inflight文件中读取对应的元数据信息saveWorkloadProfileMetadataToInflight(profile, instantTime);}catch(Exception e){HoodieTableMetaClient metaClient = table.getMetaClient();HoodieInstant inflightInstant =newHoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);try{if(!metaClient.getFs().exists(newPath(metaClient.getMetaPath(), inflightInstant.getFileName()))){thrownewHoodieCommitException("Failed to commit "+ instantTime +" unable to save inflight metadata ", e);}}catch(IOException ex){LOG.error("Check file exists failed");thrownewHoodieCommitException("Failed to commit "+ instantTime +" unable to save inflight metadata ", ex);}}}
WorkloadProfile
首先看一下
WorkloadProfile
的构造函数,看看需要哪些参数。它有两个构造函数,一个只有一个参数:
Pair<HashMap<String, WorkloadStat>, WorkloadStat>
profile,另外一个相比于第一个只是多了一个写操作类型:WriteOperationType。对于profile,它的left是分区统计信息,right是全局统计信息。统计信息是通过WorkloadStat实现的。
publicWorkloadProfile(Pair<HashMap<String,WorkloadStat>,WorkloadStat> profile){this.partitionPathStatMap = profile.getLeft();this.globalStat = profile.getRight();}publicWorkloadProfile(Pair<HashMap<String,WorkloadStat>,WorkloadStat> profile,WriteOperationType operationType){this(profile);this.operationType = operationType;}
buildProfile
buildProfile
就是将
inputRecords
构造成
WorkloadProfile
所需要的
profile
。首先初始化一个分区统计信息
partitionPathStatMap
和全局统计信息
globalStat
。然后将inputRecords通过map和groupingBy得到每个分区路径对应的文件位置信息和记录数量:partitionLocationCounts。其中文件位置信息是通过record.getCurrentLocation得到的,保存在HoodieRecordLocation中。而record中位置信息是通过上篇文章提到的
tag
方法通过读取索引信息得到的。不过tag方法只有upsert/delete等才会调用,对于insert方法是不会触发的,也就是这里的record中的location都为空。然后遍历partitionLocationCounts.entrySet(),其实就是按照分区执行,获取分区路径、记录数、文件位置。如果partitionPathStatMap没有该分区,则将该分区放进去,并且初始化value即WorkloadStat。接着判断文件信息是否存在,如果文件位置信息存在,则代表是update(有对应的历史数据),对于update,对应的分区下的WorkloadStat调用addUpdates,全局WorkloadStat调用addUpdates。如果文件位置信息不存在,则代表是insert数据(新数据),对于insert,对应分区下的WorkloadStat调用addInserts,使insert数加上对应的记录数,全局WorkloadStat中的insert数也加上对应的记录数,最后返回WorkloadProfile所需要的Pair.of(partitionPathStatMap, globalStat)
protectedPair<HashMap<String,WorkloadStat>,WorkloadStat>buildProfile(List<HoodieRecord<T>> inputRecords){// 分区路径,WorkloadStatHashMap<String,WorkloadStat> partitionPathStatMap =newHashMap<>();// 全局WorkloadStatWorkloadStat globalStat =newWorkloadStat();// 返回(分区路径,文件位置信息),记录数// 也就是partitionLocationCounts:分区路径、文件位置、记录数Map<Pair<String,Option<HoodieRecordLocation>>,Long> partitionLocationCounts = inputRecords
.stream().map(record ->Pair.of(// (分区路径,文件位置信息),recordPair.of(record.getPartitionPath(),Option.ofNullable(record.getCurrentLocation())), record))// 根据分区路径groupBy,统计每个分区对应的数量.collect(Collectors.groupingBy(Pair::getLeft,Collectors.counting()));// 遍历partitionLocationCounts(k,v)for(Map.Entry<Pair<String,Option<HoodieRecordLocation>>,Long> e : partitionLocationCounts.entrySet()){// 分区路径String partitionPath = e.getKey().getLeft();// 记录数Long count = e.getValue();// 文件位置HoodieRecordLocationOption<HoodieRecordLocation> locOption = e.getKey().getRight();// 如果partitionPathStatMap没有该分区,则将该分区放进去,并且初始化value WorkloadStatif(!partitionPathStatMap.containsKey(partitionPath)){
partitionPathStatMap.put(partitionPath,newWorkloadStat());}// 如果文件位置信息存在,则代表是update// 获取文件位置信息是在前面的tag方法中,对于insert方法,不需要tagif(locOption.isPresent()){// update// 对应分区下的WorkloadStat调用addUpdates
partitionPathStatMap.get(partitionPath).addUpdates(locOption.get(), count);// 全局WorkloadStat调用addUpdates
globalStat.addUpdates(locOption.get(), count);}else{// 否则是insert// insert// 对应分区下的WorkloadStat调用addInserts,使insert数+count
partitionPathStatMap.get(partitionPath).addInserts(count);// 全局WorkloadStat中的insert数+count
globalStat.addInserts(count);}}// 返回分区统计信息和全局统计信息returnPair.of(partitionPathStatMap, globalStat);}
WorkloadStat
主要是看一下它的addInserts和addUpdates方法。addInserts方法很简单,就是将numInsert更新为numInserts加上对应的记录数。addUpdates稍微复杂一点,主要是更新updateLocationToCount,updateLocationToCount保存的是(fileId,(instantTime,记录数))。主要逻辑:如果updateLocationToCount中没有该fileId,则直接将fileId,(instantTime,记录数)放进updateLocationToCount,如果有的话,则更新该fileId对应的value。value为pair,将value的left即instantTime更新为location.getInstantTime()。将value的right即记录数更新为numUpdates + accNumUpdates。其中numUpdates为参数即本次记录数,accNumUpdates是已经存在的累计数量。最后再将numUpdates更新numUpdates+对应的记录数。(numUpdates在后面并没有用到)
总结一下,WorkloadStat的作用主要记录insert累计数和update累计数。不过update需要以fileId为维度进行累计,这是因为update有明确要更新的fileId,而insert是没有的。
privatelong numInserts =0L;privatelong numUpdates =0L;// fileId,(instantTime,记录数)privateHashMap<String,Pair<String,Long>> updateLocationToCount;/**
* numInserts数初始值0,addInserts将numInserts加上对应的记录数
*/publiclongaddInserts(long numInserts){returnthis.numInserts += numInserts;}/**
* accNumUpdates初始值0
* 如果updateLocationToCount中有对应的fileId,
* 则先获取updateLocationToCount对应的fileId对应的记录数赋值给accNumUpdates
* 最后将updateLocationToCount
*/publiclongaddUpdates(HoodieRecordLocation location,long numUpdates){// 初始值0long accNumUpdates =0;// 如果已经存在了对应的fileIdif(updateLocationToCount.containsKey(location.getFileId())){// 将updateLocationToCount中对应的数取出来赋值给accNumUpdates
accNumUpdates = updateLocationToCount.get(location.getFileId()).getRight();}// 更新updateLocationToCount中该fileId对应的value。value为pair,将value的left即instantTime更新为location.getInstantTime()// 将value的right即记录数更新为numUpdates + accNumUpdates
updateLocationToCount.put(
location.getFileId(),Pair.of(location.getInstantTime(), numUpdates + accNumUpdates));// numUpdates初始值为0,每次调用addUpdates都将numUpdates加上对应的record数returnthis.numUpdates += numUpdates;}
saveWorkloadProfileMetadataToInflight
上篇文章讲到将WorkloadProfile元数据信息持久化到.inflight文件中,我们来看一下是如何持久化的。主要逻辑就是遍历profile中的分区,获取对应的WorkloadStat,然后将对应的partitionPath、numInserts/numUpdates、fileId、instantTime放到HoodieWriteStat中,再将HoodieWriteStat放到HoodieCommitMetadata中,最后调用activeTimeline.transitionRequestedToInflight将HoodieCommitMetadata转成json持久化到
.inflight
中
// 将WorkloadProfile元数据信息持久化到.inflight文件中,.commit.request->.commit.inflight.// 这一步主要是为了mor表的rollback,rollback时可以从.inflight文件中读取对应的元数据信息saveWorkloadProfileMetadataToInflight(profile, instantTime);voidsaveWorkloadProfileMetadataToInflight(WorkloadProfile profile,String instantTime)throwsHoodieCommitException{try{HoodieCommitMetadata metadata =newHoodieCommitMetadata();// 按照分区路径遍历
profile.getPartitionPaths().forEach(path ->{// 获取对应分区的WorkloadStatWorkloadStat partitionStat = profile.getWorkloadStat(path);// 创建一个新的HoodieWriteStat,先进行insertHoodieWriteStat insertStat =newHoodieWriteStat();// 将WorkloadStat中的numInserts赋值给insertStat
insertStat.setNumInserts(partitionStat.getNumInserts());// insertStat的fileId为空
insertStat.setFileId("");// prevCommit为null
insertStat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);// 将path和insertStat添加到metadata中
metadata.addWriteStat(path, insertStat);// 接着进行update的逻辑
partitionStat.getUpdateLocationToCount().forEach((key, value)->{// 创建一个新的HoodieWriteStatHoodieWriteStat writeStat =newHoodieWriteStat();// 设置fileId
writeStat.setFileId(key);// TODO : Write baseCommitTime is possible here ?// prevCommit设为WorkloadStat中的instantTime
writeStat.setPrevCommit(value.getKey());// 设置更新数
writeStat.setNumUpdateWrites(value.getValue());// 将path和writeStat添加到metadata中
metadata.addWriteStat(path, writeStat);});});// 设置操作类型
metadata.setOperationType(operationType);HoodieActiveTimeline activeTimeline = table.getActiveTimeline();String commitActionType =getCommitActionType();HoodieInstant requested =newHoodieInstant(State.REQUESTED, commitActionType, instantTime);// 将.request转为.inflight,其实就是创建一个新的.inflight,将metadata转成json持久化到.inflight
activeTimeline.transitionRequestedToInflight(requested,Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)),
config.shouldAllowMultiWriteOnSameInstant());}catch(IOException io){thrownewHoodieCommitException("Failed to commit "+ instantTime +" unable to save inflight metadata ", io);}}
总结
关于
WorkloadProfile
的分析一共就这么多,主要是统计
record
中每个分区路径对应的insert/upsert数量以及upsert数据对应的
fileId
和
instantTime
,先持久化到
.inflight
文件中,然后给后面的
getPartitioner
使用。关于
WorkloadProfile
统计的这些信息是如何在
getPartitioner
中使用的,我们放在下篇文章中分析。
注释代码
github: https://github.com/dongkelun/hudi/tree/0.9.0-learning-comments
gitee: https://gitee.com/dongkelun/hudi/tree/0.9.0-learning-comments
相关阅读
- 开源经验分享 | 如何从一名小白成为Apache Hudi Contributor
- Hudi源码 | Insert源码分析总结(一)(整体流程)
- Hudi Java Client总结|读取Hive写Hudi代码示例
- Hudi源码|bootstrap源码分析总结(写Hudi)
- Hudi Clean Policy 清理策略实现分析
- Hudi Clean 清理文件实现分析
- Hudi查询类型/视图总结
- Hudi preCombinedField 总结(二)-源码分析
- Hudi Spark SQL源码学习总结-Create Table
- Hudi Spark SQL源码学习总结-CTAS
- Hudi Spark源码学习总结-df.write.format(“hudi”).save
- Hudi Spark源码学习总结-spark.read.format(“hudi”).load
- Hudi Spark SQL源码学习总结-select(查询)
版权归原作者 董可伦 所有, 如有侵权,请联系我们删除。