前言
Apache Hudi insert源码分析总结,以Java Client为例,不了解Hudi Java Client的可以参考:Hudi Java Client总结|读取Hive写Hudi代码示例。
以Java Client为例的原因:1、自己生产上用的Java Client,相比于Spark客户端更熟悉一点。
2、Java Client和Spark、Flink客户端核心逻辑是一样的。不同的是比如Spark的入口是DF和SQL,多了一层API封装。
3、Java Client更贴近源码,可以直接分析核心逻辑。不用剖析Spark、Flink源码。对Sprk、Flink源码不熟悉的更容易上手。
4、等分析完Java Client源码后,有时间的话我会再总结一下Spark客户端的源码,这样大家会更容易理解。
版本
Hudi 0.9.0
备注:其实每个版本核心代码都差不多,之所以使用0.9.0,一个是因为对于Java Client,我用0.9.0用的比较多,相比于使用最新版可以节省不少时间,另一个原因是,之前总结的Java Client的源码也是基于0.9.0。比如Hudi Clean Policy 清理策略实现分析和Hudi Clean 清理文件实现分析
initTable
首先是通过initTable初始化Hudi表,可以看出来主要就是根据我们配置的一些参数,创建
.hoodie
元数据目录,然后将这些参数持久化到
hoodier.properties
文件中,具体的细节可以自己研究。
publicHoodieTableMetaClientinitTable(Configuration configuration,String basePath)throwsIOException{returnHoodieTableMetaClient.initTableAndGetMetaClient(configuration, basePath,build());}/**
* Helper method to initialize a given path as a hoodie table with configs passed in as Properties.
*
* @return Instance of HoodieTableMetaClient
*/publicstaticHoodieTableMetaClientinitTableAndGetMetaClient(Configuration hadoopConf,String basePath,Properties props)throwsIOException{LOG.info("Initializing "+ basePath +" as hoodie table "+ basePath);Path basePathDir =newPath(basePath);finalFileSystem fs =FSUtils.getFs(basePath, hadoopConf);if(!fs.exists(basePathDir)){
fs.mkdirs(basePathDir);}Path metaPathDir =newPath(basePath,METAFOLDER_NAME);if(!fs.exists(metaPathDir)){
fs.mkdirs(metaPathDir);}// if anything other than default archive log folder is specified, create that tooString archiveLogPropVal =newHoodieConfig(props).getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER);if(!StringUtils.isNullOrEmpty(archiveLogPropVal)){Path archiveLogDir =newPath(metaPathDir, archiveLogPropVal);if(!fs.exists(archiveLogDir)){
fs.mkdirs(archiveLogDir);}}// Always create temporaryFolder which is needed for finalizeWrite for Hoodie tablesfinalPath temporaryFolder =newPath(basePath,HoodieTableMetaClient.TEMPFOLDER_NAME);if(!fs.exists(temporaryFolder)){
fs.mkdirs(temporaryFolder);}// Always create auxiliary folder which is needed to track compaction workloads (stats and any metadata in future)finalPath auxiliaryFolder =newPath(basePath,HoodieTableMetaClient.AUXILIARYFOLDER_NAME);if(!fs.exists(auxiliaryFolder)){
fs.mkdirs(auxiliaryFolder);}initializeBootstrapDirsIfNotExists(hadoopConf, basePath, fs);HoodieTableConfig.createHoodieProperties(fs, metaPathDir, props);// We should not use fs.getConf as this might be different from the original configuration// used to create the fs in unit testsHoodieTableMetaClient metaClient =HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(basePath).build();LOG.info("Finished initializing Table of type "+ metaClient.getTableConfig().getTableType()+" from "+ basePath);return metaClient;}
HoodieWriteConfig
这里的配置是写数据时使用的配置,上面initTable的配置是持久化文件的配置,当然这俩配置要保持一致(实际上Spark客户端就是保持一致的)。可以看到有Schema、表名、payload、索引、clean、文件大小等一些参数。熟悉这些参数后就可以进行调优了。
Properties indexProperties =newProperties();
indexProperties.put(BLOOM_INDEX_FILTER_DYNAMIC_MAX_ENTRIES.key(),150000);// 1000万总体时间提升1分钟HoodieWriteConfig cfg =HoodieWriteConfig.newBuilder().withPath(tablePath).withSchema(writeSchema.toString()).withParallelism(parallelism, parallelism).withDeleteParallelism(parallelism).forTable(tableName).withWritePayLoad(payloadClassName).withPayloadConfig(HoodiePayloadConfig.newBuilder().withPayloadOrderingField(orderingField).build()).withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM)// .bloomIndexPruneByRanges(false) // 1000万总体时间提升1分钟.bloomFilterFPP(0.000001)// 1000万总体时间提升3分钟.fromProperties(indexProperties).build()).withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(150,200).compactionSmallFileSize(Long.parseLong(smallFileLimit)).approxRecordSize(Integer.parseInt(recordSizeEstimate)).retainCommits(100).build()).withStorageConfig(HoodieStorageConfig.newBuilder().parquetMaxFileSize(Long.parseLong(maxFileSize)).build()).build();
HoodieJavaWriteClient
创建writeClient
HoodieJavaWriteClient writeClient =newHoodieJavaWriteClient<>(newHoodieJavaEngineContext(hadoopConf), cfg)
startCommit
String newCommitTime = writeClient.startCommit();
具体的实现在其父类
AbstractHoodieWriteClient
中。首先调用
rollbackFailedWrites
执行
rollback
操作,关于
rollback
分析本文先不讲。然后通过
HoodieActiveTimeline.createNewInstantTime()
创建一个新的instantTime。最后创建
metaClient
,通过metaClient.getActiveTimeline().createNewInstant生成
.commit.request
文件
publicStringstartCommit(){// 首先调用rollbackFailedWrites执行rollback操作CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),HoodieTimeline.COMMIT_ACTION,()->rollbackFailedWrites());// 生成新的instantTimeString instantTime =HoodieActiveTimeline.createNewInstantTime();// 创建metaClientHoodieTableMetaClient metaClient =createMetaClient(true);startCommit(instantTime, metaClient.getCommitActionType(), metaClient);return instantTime;}privatevoidstartCommit(String instantTime,String actionType,HoodieTableMetaClient metaClient){LOG.info("Generate a new instant time: "+ instantTime +" action: "+ actionType);// if there are pending compactions, their instantTime must not be greater than that of this instant time
metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending ->ValidationUtils.checkArgument(HoodieTimeline.compareTimestamps(latestPending.getTimestamp(),HoodieTimeline.LESSER_THAN, instantTime),"Latest pending compaction instant time must be earlier than this instant time. Latest Compaction :"+ latestPending +", Ingesting at "+ instantTime));if(config.getFailedWritesCleanPolicy().isLazy()){this.heartbeatClient.start(instantTime);}// 创建.commit.request
metaClient.getActiveTimeline().createNewInstant(newHoodieInstant(HoodieInstant.State.REQUESTED, actionType,
instantTime));}
generateRecord
主要是构造writeClient写数据所需的数据结构writeRecords:List<HoodieRecord>,具体的可以参考我之前分享的文章。
client.insert(writeRecords, newCommitTime)
首先获取table,这里返回HoodieJavaCopyOnWriteTable,接着验证一下schema和历史数据的兼容性。然后通过preWrite执行写之前的一些步骤,比如设置操作类型,接着调用table.insert执行完整的写数据操作,返回result。最后调用postWrite执行archive、clean等操作返回WriteStatuses。
publicList<WriteStatus>insert(List<HoodieRecord<T>> records,String instantTime){// 首先获取table,这里的table为HoodieJavaCopyOnWriteTableHoodieTable<T,List<HoodieRecord<T>>,List<HoodieKey>,List<WriteStatus>> table =getTableAndInitCtx(WriteOperationType.INSERT, instantTime);// 验证schema
table.validateUpsertSchema();// 写之前的一些步骤,比如设置操作类型preWrite(instantTime,WriteOperationType.INSERT, table.getMetaClient());// 调用table.insert执行写数据操作,返回resultHoodieWriteMetadata<List<WriteStatus>> result = table.insert(context, instantTime, records);if(result.getIndexLookupDuration().isPresent()){
metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());}// 调用postWrite返回WriteStatusesreturnpostWrite(result, instantTime, table);}
postWrite
我们先看一下postWrite的逻辑,首先判断是否已经commit生成了.commit文件,如果是的话,则执行archive、clean,也就是archive、clean等操作是在写操作完成、生成.commit文件之后进行的。
/**
* 判断是否已经commit生成了.commit文件,如果是的话,则执行archive、clean
*/protectedList<WriteStatus>postWrite(HoodieWriteMetadata<List<WriteStatus>> result,String instantTime,HoodieTable<T,List<HoodieRecord<T>>,List<HoodieKey>,List<WriteStatus>> hoodieTable){if(result.getIndexLookupDuration().isPresent()){
metrics.updateIndexMetrics(getOperationType().name(), result.getIndexUpdateDuration().get().toMillis());}// commit是否已经提交,这里主要考虑是否设置了自动提交,hoodie.auto.commit默认true// 如果不是自动提交的话,那么我们需要手动执行clean等操作,然后手动commit// 所以这里默认为true// isCommitted代表着已经生成了.commit文件,也就是写操作成功了,也就是通过table.insert已经完成了整个的写操作if(result.isCommitted()){// Perform post commit operations.if(result.getFinalizeDuration().isPresent()){
metrics.updateFinalizeWriteMetrics(result.getFinalizeDuration().get().toMillis(),
result.getWriteStats().get().size());}// postCommit主要是执行archive、clean等操作。也就是archive、clean等操作是在写操作完成,生成.commit文件之后进行的。postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime,Option.empty());emitCommitMetrics(instantTime, result.getCommitMetadata().get(), hoodieTable.getMetaClient().getCommitActionType());}return result.getWriteStatuses();}
table.insert
先调用JavaInsertCommitActionExecutor.execute接着调用JavaWriteHelper.newInstance().write
publicHoodieWriteMetadata<List<WriteStatus>>insert(HoodieEngineContext context,String instantTime,List<HoodieRecord<T>> records){returnnewJavaInsertCommitActionExecutor<>(context, config,this, instantTime, records).execute();}publicHoodieWriteMetadata<List<WriteStatus>>execute(){returnJavaWriteHelper.newInstance().write(instantTime, inputRecords, context, table,
config.shouldCombineBeforeInsert(), config.getInsertShuffleParallelism(),this,false);}
JavaWriteHelper.write
它的write方法是在其父类
AbstractWriteHelper
中实现的,首先首先判断是否需要去重(通过配置项hoodie.combine.before.insert配置是否需要去重),insert默认不需要去重(upsert/delete默认需要)。如果需要去重的话调用方法
combineOnCondition
先进行去重。
然后判断是否需要tag, tag的作用主要是利用文件中保存的索引信息(默认布隆索引),判断records中的数据哪些是新增数据,哪些是更新数据,对于更新的数据,还要添加上对应的文件位置信息,方便后面更新时查找对应的parquet文件。由于这里为insert所以不需要tag,这也是insert和upsert一个比较大的区别。
我们后面分析upsert源码时,会专门分析tag怎么实现的,本文先略过。然后通过调用executor.execute执行写操作,返回result,这里的executor为JavaInsertCommitActionExecutor。
publicHoodieWriteMetadata<O>write(String instantTime,I inputRecords,HoodieEngineContext context,HoodieTable<T,I,K,O> table,boolean shouldCombine,int shuffleParallelism,BaseCommitActionExecutor<T,I,K,O,R> executor,boolean performTagging){try{// De-dupe/merge if needed// 如果开启了去重,则先去重,insert默认不去重// 配置项hoodie.combine.before.insertI dedupedRecords =combineOnCondition(shouldCombine, inputRecords, shuffleParallelism, table);Instant lookupBegin =Instant.now();I taggedRecords = dedupedRecords;if(performTagging){// 是否需要tag,insert为false// perform index loop up to get existing location of records// tag的作用主要是利用文件中保存的索引信息(默认布隆索引),判断records中的数据哪些是新增数据,哪些是更新数据// 对于更新的数据,还要添加上对应的文件位置信息,方便后面更新时查找对应的parquet文件
taggedRecords =tag(dedupedRecords, context, table);}Duration indexLookupDuration =Duration.between(lookupBegin,Instant.now());// 通过调用executor.execute执行写操作,返回result。这里的executor为JavaInsertCommitActionExecutorHoodieWriteMetadata<O> result = executor.execute(taggedRecords);
result.setIndexLookupDuration(indexLookupDuration);return result;}catch(Throwable e){if(e instanceofHoodieUpsertException){throw(HoodieUpsertException) e;}thrownewHoodieUpsertException("Failed to upsert for commit time "+ instantTime, e);}}publicbooleanshouldCombineBeforeInsert(){returngetBoolean(COMBINE_BEFORE_INSERT);}publicstaticfinalConfigProperty<String>COMBINE_BEFORE_INSERT=ConfigProperty.key("hoodie.combine.before.insert").defaultValue("false").withDocumentation("When inserted records share same key, controls whether they should be first combined (i.e de-duplicated) before"+" writing to storage.");
JavaInsertCommitActionExecutor.execute
JavaInsertCommitActionExecutor.execute实际上调用的父类
BaseJavaCommitActionExecutor
的
execute
首先通过buildProfile构建WorkloadProfile,构建WorkloadProfile的目的主要是为给getPartitioner使用。WorkloadProfile包含了分区路径对应的insert/upsert数量以及upsert数据对应的文件位置信息。数量信息是为了分桶,或者说是为了分几个文件,这里涉及了小文件合并、文件大小等原理,位置信息是为了获取要更新的文件,也就是对应的fileId。对于upsert数据,我们复用原来的fileId。对于insert数据,我们生成新的fileId,如果record数比较多,则分多个文件写。然后将WorkloadProfile元数据信息持久化到.inflight文件中,.commit.request->.commit.inflight。这一步主要是为了mor表的rollback,rollback时可以从.inflight文件中读取对应的元数据信息。然后通过getPartitioner根据WorkloadProfile获取partitioner,接着调用partition方法返回partitionedRecords(<桶号,对应的HoodieRecord>),一个桶对应一个文件 fileId。最后再遍历partitionedRecords,也就是每个桶执行一次写操作handleInsertPartition/handleUpsertPartition,最后调用BoundedInMemoryExecutor.execute,利用生产者消费者模式写数据,关于如何通过生产者消费者模式写数据,我已经在Hudi源码|bootstrap源码分析总结(写Hudi)分析过bootstrap的源码了,原理一样,不同的是实现类不一样,感兴趣的可以看看。
关于tag(索引相关)、WorkloadProfile、getPartitioner、handleInsertPartition/handleUpsertPartition本文讲个大概,可能有不准确的地方,大家可以先结合17张图带你彻底理解Hudi Upsert原理进行学习,至于具体的源码分析,限于篇幅及个人精力,本文先不涉及,会放在后面的文章单独讲解,对于本文可能不准确的地方,也会在后面的文章中更新。
publicHoodieWriteMetadata<List<WriteStatus>>execute(List<HoodieRecord<T>> inputRecords){HoodieWriteMetadata<List<WriteStatus>> result =newHoodieWriteMetadata<>();WorkloadProfile profile =null;if(isWorkloadProfileNeeded()){// 始终为true// 构建WorkloadProfile,构建WorkloadProfile的目的主要是为给getPartitioner使用// WorkloadProfile包含了分区路径对应的insert/upsert数量以及upsert数据对应的文件位置信息// 数量信息是为了分桶,或者说是为了分几个文件,这里涉及了小文件合并、文件大小等原理// 位置信息是为了获取要更新的文件// 对于upsert数据,我们复用原来的fileId// 对于insert数据,我们生成新的fileId,如果record数比较多,则分多个文件写
profile =newWorkloadProfile(buildProfile(inputRecords));LOG.info("Workload profile :"+ profile);try{// 将WorkloadProfile元数据信息持久化到.inflight文件中,.commit.request->.commit.inflight.// 这一步主要是为了mor表的rollback,rollback时可以从.inflight文件中读取对应的元数据信息saveWorkloadProfileMetadataToInflight(profile, instantTime);}catch(Exception e){HoodieTableMetaClient metaClient = table.getMetaClient();HoodieInstant inflightInstant =newHoodieInstant(HoodieInstant.State.INFLIGHT, metaClient.getCommitActionType(), instantTime);try{if(!metaClient.getFs().exists(newPath(metaClient.getMetaPath(), inflightInstant.getFileName()))){thrownewHoodieCommitException("Failed to commit "+ instantTime +" unable to save inflight metadata ", e);}}catch(IOException ex){LOG.error("Check file exists failed");thrownewHoodieCommitException("Failed to commit "+ instantTime +" unable to save inflight metadata ", ex);}}}// 根据WorkloadProfile获取partitionerfinalPartitioner partitioner =getPartitioner(profile);// <桶号,对应的HoodieRecord>,一个桶对应一个文件 fileIdMap<Integer,List<HoodieRecord<T>>> partitionedRecords =partition(inputRecords, partitioner);List<WriteStatus> writeStatuses =newLinkedList<>();// forEach,每个桶执行一次写操作handleInsertPartition/handleUpsertPartition// 最终通过BoundedInMemoryExecutor.execute 生产者消费者模式写数据
partitionedRecords.forEach((partition, records)->{// 是否更新、删除if(WriteOperationType.isChangingRecords(operationType)){handleUpsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);}else{handleInsertPartition(instantTime, partition, records.iterator(), partitioner).forEachRemaining(writeStatuses::addAll);}});updateIndex(writeStatuses, result);// commit生成.commit文件,.commit文件的生成标记着写数据的完成updateIndexAndCommitIfNeeded(writeStatuses, result);return result;}
commit
上面通过handleInsertPartition/handleUpsertPartition实际上已经完成了数据的写入。但是最后还要生成.commit元数据文件,代表一次commit的完成,否则如果没有生成commit的话,比如只有.commit.request或者commit.inflight,这样在查询时候不会查到本地写数据生成的文件,而且下次写数据时会触发rollback来处理。
这里索引相关的先不看,commit调用链
updateIndexAndCommitIfNeeded
->
commitOnAutoCommit
->
autoCommit
->
commit
->
commit
,最终在BaseJavaCommitActionExecutor的commit方法中通过activeTimeline.saveAsComplete生成.commit文件。
前面讲过了,在生成.commit文件后,会调用postWrite方法触发archive、clean等操作。实际上archive、clean等操作的失败,不影响本次写数据的成功。比如clean失败了,可以下次再clean就可以了。所以当commit完成后,如果clean失败了,这样对于有失败机制的集成工具,比如我们使用的Apache NIFI,是不能将本批次数据放进失败队列的。PS:当本次commit不成功时,我们需要放进失败队列,目的是防止数据丢失。其实我们可以自己写代码继承JavaClient类,将postWrite方法和table.insert分开。这样便于判断是写数据失败还是clean失败,以后我会分享相关代码实现。
publicvoidupdateIndexAndCommitIfNeeded(List<WriteStatus> writeStatuses,HoodieWriteMetadata result){Instant indexStartTime =Instant.now();// Update the index backList<WriteStatus> statuses = table.getIndex().updateLocation(writeStatuses, context, table);
result.setIndexUpdateDuration(Duration.between(indexStartTime,Instant.now()));
result.setWriteStatuses(statuses);
result.setPartitionToReplaceFileIds(getPartitionToReplacedFileIds(result));commitOnAutoCommit(result);}protectedvoidcommitOnAutoCommit(HoodieWriteMetadata result){// validate commit action before committing resultrunPrecommitValidators(result);// validate commit action before committing resultif(config.shouldAutoCommit()){LOG.info("Auto commit enabled: Committing "+ instantTime);autoCommit(extraMetadata, result);}else{LOG.info("Auto commit disabled for "+ instantTime);}}protectedvoidautoCommit(Option<Map<String,String>> extraMetadata,HoodieWriteMetadata<O> result){this.txnManager.beginTransaction(Option.of(newHoodieInstant(State.INFLIGHT,HoodieTimeline.COMMIT_ACTION, instantTime)),
lastCompletedTxn.isPresent()?Option.of(lastCompletedTxn.get().getLeft()):Option.empty());try{TransactionUtils.resolveWriteConflictIfAny(table,this.txnManager.getCurrentTransactionOwner(),
result.getCommitMetadata(), config,this.txnManager.getLastCompletedTransactionOwner());commit(extraMetadata, result);}finally{this.txnManager.endTransaction();}}// BaseJavaCommitActionExecutorprotectedvoidcommit(Option<Map<String,String>> extraMetadata,HoodieWriteMetadata<List<WriteStatus>> result){commit(extraMetadata, result, result.getWriteStatuses().stream().map(WriteStatus::getStat).collect(Collectors.toList()));}protectedvoidcommit(Option<Map<String,String>> extraMetadata,HoodieWriteMetadata<List<WriteStatus>> result,List<HoodieWriteStat> writeStats){String actionType =getCommitActionType();LOG.info("Committing "+ instantTime +", action Type "+ actionType);
result.setCommitted(true);
result.setWriteStats(writeStats);// Finalize writefinalizeWrite(instantTime, writeStats, result);try{LOG.info("Committing "+ instantTime +", action Type "+getCommitActionType());HoodieActiveTimeline activeTimeline = table.getActiveTimeline();HoodieCommitMetadata metadata =CommitUtils.buildMetadata(writeStats, result.getPartitionToReplaceFileIds(),
extraMetadata, operationType,getSchemaToStoreInCommit(),getCommitActionType());// 通过activeTimeline.saveAsComplete生成.commit文件
activeTimeline.saveAsComplete(newHoodieInstant(true,getCommitActionType(), instantTime),Option.of(metadata.toJsonString().getBytes(StandardCharsets.UTF_8)));LOG.info("Committed "+ instantTime);
result.setCommitMetadata(Option.of(metadata));}catch(IOException e){thrownewHoodieCommitException("Failed to complete commit "+ config.getBasePath()+" at time "+ instantTime,
e);}}
handleInsertPartition
附
handleInsertPartition
到生产者消费者模式调用链,
handleInsertPartition
->
handleUpsertPartition
->
handleInsert
->
JavaLazyInsertIterable.computeNext
->
BoundedInMemoryExecutor.execute
protectedIterator<List<WriteStatus>>handleInsertPartition(String instantTime,Integer partition,Iterator recordItr,Partitioner partitioner){returnhandleUpsertPartition(instantTime, partition, recordItr, partitioner);}protectedIterator<List<WriteStatus>>handleUpsertPartition(String instantTime,Integer partition,Iterator recordItr,Partitioner partitioner){JavaUpsertPartitioner javaUpsertPartitioner =(JavaUpsertPartitioner) partitioner;BucketInfo binfo = javaUpsertPartitioner.getBucketInfo(partition);BucketType btype = binfo.bucketType;try{if(btype.equals(BucketType.INSERT)){returnhandleInsert(binfo.fileIdPrefix, recordItr);}elseif(btype.equals(BucketType.UPDATE)){returnhandleUpdate(binfo.partitionPath, binfo.fileIdPrefix, recordItr);}else{thrownewHoodieUpsertException("Unknown bucketType "+ btype +" for partition :"+ partition);}}catch(Throwable t){String msg ="Error upserting bucketType "+ btype +" for partition :"+ partition;LOG.error(msg, t);thrownewHoodieUpsertException(msg, t);}}publicIterator<List<WriteStatus>>handleInsert(String idPfx,Iterator<HoodieRecord<T>> recordItr){// This is needed since sometimes some buckets are never picked in getPartition() and end up with 0 recordsif(!recordItr.hasNext()){LOG.info("Empty partition");returnCollections.singletonList((List<WriteStatus>)Collections.EMPTY_LIST).iterator();}returnnewJavaLazyInsertIterable<>(recordItr,true, config, instantTime, table, idPfx,
taskContextSupplier,newCreateHandleFactory<>());}// JavaLazyInsertIterableprotectedList<WriteStatus>computeNext(){// Executor service used for launching writer thread.BoundedInMemoryExecutor<HoodieRecord<T>,HoodieInsertValueGenResult<HoodieRecord>,List<WriteStatus>> bufferedIteratorExecutor =null;try{finalSchema schema =newSchema.Parser().parse(hoodieConfig.getSchema());
bufferedIteratorExecutor =newBoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(),newIteratorBasedQueueProducer<>(inputItr),Option.of(getInsertHandler()),getTransformFunction(schema));// bufferedIteratorExecutor.execute通过生产者消费者模型实现写数据 finalList<WriteStatus> result = bufferedIteratorExecutor.execute();assert result !=null&&!result.isEmpty()&&!bufferedIteratorExecutor.isRemaining();return result;}catch(Exception e){thrownewHoodieException(e);}finally{if(null!= bufferedIteratorExecutor){
bufferedIteratorExecutor.shutdownNow();}}}
总结
本文以Java Client为例,对Apache Hudi insert源码进行了整体逻辑的分析总结,希望能对大家有所帮助。由于精力有限,对于文中提到的WorkloadProfile、getPartitioner、handleInsertPartition/handleUpsertPartition等,我会在后面的文章再进行总结。并且等insert相关源码分析完后,会再进行upsert的源码分析。可能有些地方不够准确,还请大家多多指正,让我们共同进步。
注释代码
github: https://github.com/dongkelun/hudi/tree/0.9.0-learning-comments
gitee: https://gitee.com/dongkelun/hudi/tree/0.9.0-learning-comments
相关阅读
- Hudi Java Client总结|读取Hive写Hudi代码示例
- Hudi源码|bootstrap源码分析总结(写Hudi)
- Hudi Clean Policy 清理策略实现分析
- Hudi Clean 清理文件实现分析
- Hudi查询类型/视图总结
- Hudi preCombinedField 总结(二)-源码分析
- Hudi Spark SQL源码学习总结-Create Table
- Hudi Spark SQL源码学习总结-CTAS
- Hudi Spark源码学习总结-df.write.format(“hudi”).save
- Hudi Spark源码学习总结-spark.read.format(“hudi”).load
- Hudi Spark SQL源码学习总结-select(查询)
版权归原作者 董可伦 所有, 如有侵权,请联系我们删除。