0


Hudi源码 | Insert源码分析总结(二)(WorkloadProfile)

前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站:https://www.captainai.net/dongkelun

前言

接上篇文章:Hudi源码 | Insert源码分析总结(一)(整体流程),继续进行Apache Hudi Insert源码分析总结,本文主要分析上文提到的

WorkloadProfile

版本

Hudi 0.9.0

入口

入口在上篇文章中讲到的

BaseJavaCommitActionExecutor

execute
WorkloadProfile profile =null;if(isWorkloadProfileNeeded()){// 始终为true// 构建WorkloadProfile,构建WorkloadProfile的目的主要是为给getPartitioner使用// WorkloadProfile包含了分区路径对应的insert/upsert数量以及upsert数据对应的文件位置信息// 数量信息是为了分桶,或者说是为了分几个文件,这里涉及了小文件合并、文件大小等原理// 位置信息是为了获取要更新的文件// 对于upsert数据,我们复用原来的fileId// 对于insert数据,我们生成新的fileId,如果record数比较多,则分多个文件写
      profile =newWorkloadProfile(buildProfile(inputRecords));LOG.info("Workload profile :"+ profile);try{// 将WorkloadProfile元数据信息持久化到.inflight文件中,.commit.request->.commit.inflight.// 这一步主要是为了mor表的rollback,rollback时可以从.inflight文件中读取对应的元数据信息saveWorkloadProfileMetadataToInflight(profile, instantTime);}catch(Exception e){HoodieTableMetaClient metaClient = table.getMetaClient();HoodieInstant inflightInstant =newHoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);try{if(!metaClient.getFs().exists(newPath(metaClient.getMetaPath(), inflightInstant.getFileName()))){thrownewHoodieCommitException("Failed to commit "+ instantTime +" unable to save inflight metadata ", e);}}catch(IOException ex){LOG.error("Check file exists failed");thrownewHoodieCommitException("Failed to commit "+ instantTime +" unable to save inflight metadata ", ex);}}}

WorkloadProfile

首先看一下

WorkloadProfile

的构造函数,看看需要哪些参数。它有两个构造函数,一个只有一个参数:

Pair<HashMap<String, WorkloadStat>, WorkloadStat>

profile,另外一个相比于第一个只是多了一个写操作类型:WriteOperationType。对于profile,它的left是分区统计信息,right是全局统计信息。统计信息是通过WorkloadStat实现的。

publicWorkloadProfile(Pair<HashMap<String,WorkloadStat>,WorkloadStat> profile){this.partitionPathStatMap = profile.getLeft();this.globalStat = profile.getRight();}publicWorkloadProfile(Pair<HashMap<String,WorkloadStat>,WorkloadStat> profile,WriteOperationType operationType){this(profile);this.operationType = operationType;}

buildProfile

buildProfile

就是将

inputRecords

构造成

WorkloadProfile

所需要的

profile

。首先初始化一个分区统计信息

partitionPathStatMap

和全局统计信息

globalStat

。然后将inputRecords通过map和groupingBy得到每个分区路径对应的文件位置信息和记录数量:partitionLocationCounts。其中文件位置信息是通过record.getCurrentLocation得到的,保存在HoodieRecordLocation中。而record中位置信息是通过上篇文章提到的

tag

方法通过读取索引信息得到的。不过tag方法只有upsert/delete等才会调用,对于insert方法是不会触发的,也就是这里的record中的location都为空。然后遍历partitionLocationCounts.entrySet(),其实就是按照分区执行,获取分区路径、记录数、文件位置。如果partitionPathStatMap没有该分区,则将该分区放进去,并且初始化value即WorkloadStat。接着判断文件信息是否存在,如果文件位置信息存在,则代表是update(有对应的历史数据),对于update,对应的分区下的WorkloadStat调用addUpdates,全局WorkloadStat调用addUpdates。如果文件位置信息不存在,则代表是insert数据(新数据),对于insert,对应分区下的WorkloadStat调用addInserts,使insert数加上对应的记录数,全局WorkloadStat中的insert数也加上对应的记录数,最后返回WorkloadProfile所需要的Pair.of(partitionPathStatMap, globalStat)

protectedPair<HashMap<String,WorkloadStat>,WorkloadStat>buildProfile(List<HoodieRecord<T>> inputRecords){// 分区路径,WorkloadStatHashMap<String,WorkloadStat> partitionPathStatMap =newHashMap<>();// 全局WorkloadStatWorkloadStat globalStat =newWorkloadStat();// 返回(分区路径,文件位置信息),记录数// 也就是partitionLocationCounts:分区路径、文件位置、记录数Map<Pair<String,Option<HoodieRecordLocation>>,Long> partitionLocationCounts = inputRecords
        .stream().map(record ->Pair.of(// (分区路径,文件位置信息),recordPair.of(record.getPartitionPath(),Option.ofNullable(record.getCurrentLocation())), record))// 根据分区路径groupBy,统计每个分区对应的数量.collect(Collectors.groupingBy(Pair::getLeft,Collectors.counting()));// 遍历partitionLocationCounts(k,v)for(Map.Entry<Pair<String,Option<HoodieRecordLocation>>,Long> e : partitionLocationCounts.entrySet()){// 分区路径String partitionPath = e.getKey().getLeft();// 记录数Long count = e.getValue();// 文件位置HoodieRecordLocationOption<HoodieRecordLocation> locOption = e.getKey().getRight();// 如果partitionPathStatMap没有该分区,则将该分区放进去,并且初始化value WorkloadStatif(!partitionPathStatMap.containsKey(partitionPath)){
        partitionPathStatMap.put(partitionPath,newWorkloadStat());}// 如果文件位置信息存在,则代表是update// 获取文件位置信息是在前面的tag方法中,对于insert方法,不需要tagif(locOption.isPresent()){// update// 对应分区下的WorkloadStat调用addUpdates
        partitionPathStatMap.get(partitionPath).addUpdates(locOption.get(), count);// 全局WorkloadStat调用addUpdates
        globalStat.addUpdates(locOption.get(), count);}else{// 否则是insert// insert// 对应分区下的WorkloadStat调用addInserts,使insert数+count
        partitionPathStatMap.get(partitionPath).addInserts(count);// 全局WorkloadStat中的insert数+count
        globalStat.addInserts(count);}}// 返回分区统计信息和全局统计信息returnPair.of(partitionPathStatMap, globalStat);}

WorkloadStat

主要是看一下它的addInserts和addUpdates方法。addInserts方法很简单,就是将numInsert更新为numInserts加上对应的记录数。addUpdates稍微复杂一点,主要是更新updateLocationToCount,updateLocationToCount保存的是(fileId,(instantTime,记录数))。主要逻辑:如果updateLocationToCount中没有该fileId,则直接将fileId,(instantTime,记录数)放进updateLocationToCount,如果有的话,则更新该fileId对应的value。value为pair,将value的left即instantTime更新为location.getInstantTime()。将value的right即记录数更新为numUpdates + accNumUpdates。其中numUpdates为参数即本次记录数,accNumUpdates是已经存在的累计数量。最后再将numUpdates更新numUpdates+对应的记录数。(numUpdates在后面并没有用到)

总结一下,WorkloadStat的作用主要记录insert累计数和update累计数。不过update需要以fileId为维度进行累计,这是因为update有明确要更新的fileId,而insert是没有的。

privatelong numInserts =0L;privatelong numUpdates =0L;// fileId,(instantTime,记录数)privateHashMap<String,Pair<String,Long>> updateLocationToCount;/**
   * numInserts数初始值0,addInserts将numInserts加上对应的记录数
   */publiclongaddInserts(long numInserts){returnthis.numInserts += numInserts;}/**
   *  accNumUpdates初始值0
   * 如果updateLocationToCount中有对应的fileId,
   * 则先获取updateLocationToCount对应的fileId对应的记录数赋值给accNumUpdates
   * 最后将updateLocationToCount
   */publiclongaddUpdates(HoodieRecordLocation location,long numUpdates){// 初始值0long accNumUpdates =0;// 如果已经存在了对应的fileIdif(updateLocationToCount.containsKey(location.getFileId())){// 将updateLocationToCount中对应的数取出来赋值给accNumUpdates
      accNumUpdates = updateLocationToCount.get(location.getFileId()).getRight();}// 更新updateLocationToCount中该fileId对应的value。value为pair,将value的left即instantTime更新为location.getInstantTime()// 将value的right即记录数更新为numUpdates + accNumUpdates
    updateLocationToCount.put(
        location.getFileId(),Pair.of(location.getInstantTime(), numUpdates + accNumUpdates));// numUpdates初始值为0,每次调用addUpdates都将numUpdates加上对应的record数returnthis.numUpdates += numUpdates;}

saveWorkloadProfileMetadataToInflight

上篇文章讲到将WorkloadProfile元数据信息持久化到.inflight文件中,我们来看一下是如何持久化的。主要逻辑就是遍历profile中的分区,获取对应的WorkloadStat,然后将对应的partitionPath、numInserts/numUpdates、fileId、instantTime放到HoodieWriteStat中,再将HoodieWriteStat放到HoodieCommitMetadata中,最后调用activeTimeline.transitionRequestedToInflight将HoodieCommitMetadata转成json持久化到

.inflight

// 将WorkloadProfile元数据信息持久化到.inflight文件中,.commit.request->.commit.inflight.// 这一步主要是为了mor表的rollback,rollback时可以从.inflight文件中读取对应的元数据信息saveWorkloadProfileMetadataToInflight(profile, instantTime);voidsaveWorkloadProfileMetadataToInflight(WorkloadProfile profile,String instantTime)throwsHoodieCommitException{try{HoodieCommitMetadata metadata =newHoodieCommitMetadata();// 按照分区路径遍历
      profile.getPartitionPaths().forEach(path ->{// 获取对应分区的WorkloadStatWorkloadStat partitionStat = profile.getWorkloadStat(path);// 创建一个新的HoodieWriteStat,先进行insertHoodieWriteStat insertStat =newHoodieWriteStat();// 将WorkloadStat中的numInserts赋值给insertStat
        insertStat.setNumInserts(partitionStat.getNumInserts());// insertStat的fileId为空
        insertStat.setFileId("");// prevCommit为null
        insertStat.setPrevCommit(HoodieWriteStat.NULL_COMMIT);// 将path和insertStat添加到metadata中
        metadata.addWriteStat(path, insertStat);// 接着进行update的逻辑
        partitionStat.getUpdateLocationToCount().forEach((key, value)->{// 创建一个新的HoodieWriteStatHoodieWriteStat writeStat =newHoodieWriteStat();// 设置fileId
          writeStat.setFileId(key);// TODO : Write baseCommitTime is possible here ?// prevCommit设为WorkloadStat中的instantTime
          writeStat.setPrevCommit(value.getKey());// 设置更新数
          writeStat.setNumUpdateWrites(value.getValue());// 将path和writeStat添加到metadata中
          metadata.addWriteStat(path, writeStat);});});// 设置操作类型
      metadata.setOperationType(operationType);HoodieActiveTimeline activeTimeline = table.getActiveTimeline();String commitActionType =getCommitActionType();HoodieInstant requested =newHoodieInstant(State.REQUESTED, commitActionType, instantTime);// 将.request转为.inflight,其实就是创建一个新的.inflight,将metadata转成json持久化到.inflight
      activeTimeline.transitionRequestedToInflight(requested,Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)),
          config.shouldAllowMultiWriteOnSameInstant());}catch(IOException io){thrownewHoodieCommitException("Failed to commit "+ instantTime +" unable to save inflight metadata ", io);}}

总结

关于

WorkloadProfile

的分析一共就这么多,主要是统计

record

中每个分区路径对应的insert/upsert数量以及upsert数据对应的

fileId

instantTime

,先持久化到

.inflight

文件中,然后给后面的

getPartitioner

使用。关于

WorkloadProfile

统计的这些信息是如何在

getPartitioner

中使用的,我们放在下篇文章中分析。

注释代码

github: https://github.com/dongkelun/hudi/tree/0.9.0-learning-comments
gitee: https://gitee.com/dongkelun/hudi/tree/0.9.0-learning-comments

相关阅读

  • 开源经验分享 | 如何从一名小白成为Apache Hudi Contributor
  • Hudi源码 | Insert源码分析总结(一)(整体流程)
  • Hudi Java Client总结|读取Hive写Hudi代码示例
  • Hudi源码|bootstrap源码分析总结(写Hudi)
  • Hudi Clean Policy 清理策略实现分析
  • Hudi Clean 清理文件实现分析
  • Hudi查询类型/视图总结
  • Hudi preCombinedField 总结(二)-源码分析
  • Hudi Spark SQL源码学习总结-Create Table
  • Hudi Spark SQL源码学习总结-CTAS
  • Hudi Spark源码学习总结-df.write.format(“hudi”).save
  • Hudi Spark源码学习总结-spark.read.format(“hudi”).load
  • Hudi Spark SQL源码学习总结-select(查询)

本文转载自: https://blog.csdn.net/dkl12/article/details/127888951
版权归原作者 董可伦 所有, 如有侵权,请联系我们删除。

“Hudi源码 | Insert源码分析总结(二)(WorkloadProfile)”的评论:

还没有评论