前言
Apache Hudi bootstrap源码简要走读,不了解Hudi bootstrap的可以参考:利用Hudi Bootstrap转化现有Hive表的parquet/orc文件为Hudi表
版本
Hudi 0.12.0
Spark 2.4.4
入口
val bootstrapDF = spark.emptyDataFrame
bootstrapDF.write.format("hudi").options(extraOpts).option(DataSourceWriteOptions.OPERATION.key,DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL).......save(basePath)
根据文章:Hudi Spark源码学习总结-df.write.format(“hudi”).save可知,
save
方法会走到DefaultSource.createRelation
/**
 * Entry point for `df.write.format("hudi").save(...)`: routes a save either to the
 * bootstrap path or to a regular write, then returns an empty relation (Hudi writes
 * have no meaningful relation to hand back to Spark).
 */
override def createRelation(sqlContext: SQLContext,
                            mode: SaveMode,
                            optParams: Map[String, String],
                            df: DataFrame): BaseRelation = {
  // Strip Hudi meta columns first so the writer only sees user data columns.
  val dfWithoutMetaCols = df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*)
  if (optParams.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)) {
    // OPERATION == bootstrap -> take the bootstrap path
    HoodieSparkSqlWriter.bootstrap(sqlContext, mode, optParams, dfWithoutMetaCols)
  } else {
    HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols)
  }
  new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
}
它会判断OPERATION是否为BOOTSTRAP_OPERATION_OPT_VAL,这里为true,所以会调用HoodieSparkSqlWriter.bootstrap
HoodieSparkSqlWriter.bootstrap
这里首先获取一些参数,然后判断表是否存在,如果不存在证明是第一次写,需要设置一些配置参数,然后进行初始化:
HoodieTableMetaClient.initTable
,接着调用
writeClient.bootstrap
/**
 * Bootstrap an existing (non-Hudi) dataset into a Hudi table.
 *
 * Flow: validate/merge options -> resolve table name/type and the bootstrap base path ->
 * derive an Avro schema from the incoming DataFrame (or a null schema when it is empty) ->
 * initialize the table's .hoodie metadata if the table does not exist yet ->
 * delegate the actual bootstrap to the write client -> run meta sync.
 *
 * Returns the meta-sync success flag (or false when SaveMode.Ignore hits an existing table).
 */
def bootstrap(sqlContext: SQLContext,
              mode: SaveMode,
              optParams: Map[String, String],
              df: DataFrame,
              hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
              hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty): Boolean = {

  assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
  val path = optParams("path")
  val basePath = new Path(path)
  val sparkContext = sqlContext.sparkContext
  val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
  // Presence of the .hoodie folder decides whether this is the first write.
  tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
  val tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
  validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)

  val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
  val tableName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME,
    s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
  val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE)
  // Source dataset location is mandatory for the bootstrap operation.
  val bootstrapBasePath = hoodieConfig.getStringOrThrow(BASE_PATH,
    s"'${BASE_PATH.key}' is required for '${BOOTSTRAP_OPERATION_OPT_VAL}'" + " operation'")
  val bootstrapIndexClass = hoodieConfig.getStringOrDefault(INDEX_CLASS_NAME)

  var schema: String = null
  if (df.schema.nonEmpty) {
    val (structName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
    schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, namespace).toString
  } else {
    // Empty DataFrame (the common bootstrap case): use a null schema placeholder.
    schema = HoodieAvroUtils.getNullSchema.toString
  }

  if (mode == SaveMode.Ignore && tableExists) {
    log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
    if (!hoodieWriteClient.isEmpty) {
      hoodieWriteClient.get.close()
    }
    false
  } else {
    // Handle various save modes
    handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tableName,
      WriteOperationType.BOOTSTRAP, fs)
    if (!tableExists) {
      // First write: gather table-level properties and initialize the Hudi table.
      val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
      val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
      val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
      val keyGenProp = hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
      val populateMetaFields = java.lang.Boolean.parseBoolean(parameters.getOrElse(
        HoodieTableConfig.POPULATE_META_FIELDS.key(),
        String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())))
      val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
      val useBaseFormatMetaFile = java.lang.Boolean.parseBoolean(parameters.getOrElse(
        HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
        String.valueOf(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue())))

      // Apply the configuration and initialize the Hudi table metadata.
      HoodieTableMetaClient.withPropertyBuilder()
        .setTableType(HoodieTableType.valueOf(tableType))
        .setTableName(tableName)
        .setRecordKeyFields(recordKeyFields)
        .setArchiveLogFolder(archiveLogFolder)
        .setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS_NAME))
        .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null))
        .setBootstrapIndexClass(bootstrapIndexClass)
        .setBaseFileFormat(baseFileFormat)
        .setBootstrapBasePath(bootstrapBasePath)
        .setPartitionFields(partitionColumns)
        .setPopulateMetaFields(populateMetaFields)
        .setKeyGeneratorClassProp(keyGenProp)
        .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
        .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
        .setCommitTimezone(HoodieTimelineTimeZone.valueOf(
          hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
        .setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile)
        .initTable(sparkContext.hadoopConfiguration, path)
    }

    val jsc = new JavaSparkContext(sqlContext.sparkContext)
    val writeClient = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(
      jsc, schema, path, tableName, mapAsJavaMap(parameters)))
    try {
      // Delegate the actual bootstrap work to SparkRDDWriteClient.
      writeClient.bootstrap(org.apache.hudi.common.util.Option.empty())
    } finally {
      writeClient.close()
    }
    val metaSyncSuccess = metaSync(sqlContext.sparkSession, hoodieConfig, basePath, df.schema)
    metaSyncSuccess
  }
}
writeClient.bootstrap
这里的writeClient为
SparkRDDWriteClient
,然后调用HoodieTable的
bootstrap
,我们这里使用表类型为COW,所以为
HoodieSparkCopyOnWriteTable
initTable(WriteOperationType.UPSERT,Option.ofNullable(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS)).bootstrap(context, extraMetadata);publicstatic<TextendsHoodieRecordPayload>HoodieSparkTable<T>create(HoodieWriteConfig config,HoodieSparkEngineContext context,HoodieTableMetaClient metaClient){HoodieSparkTable<T> hoodieSparkTable;switch(metaClient.getTableType()){case COPY_ON_WRITE:
hoodieSparkTable =newHoodieSparkCopyOnWriteTable<>(config, context, metaClient);break;case MERGE_ON_READ:
hoodieSparkTable =newHoodieSparkMergeOnReadTable<>(config, context, metaClient);break;default:thrownewHoodieException("Unsupported table type :"+ metaClient.getTableType());}return hoodieSparkTable;}
HoodieSparkCopyOnWriteTable.bootstrap
publicHoodieBootstrapWriteMetadata<HoodieData<WriteStatus>>bootstrap(HoodieEngineContext context,Option<Map<String,String>> extraMetadata){returnnewSparkBootstrapCommitActionExecutor((HoodieSparkEngineContext) context, config,this, extraMetadata).execute();}
SparkBootstrapCommitActionExecutor.execute
这里首先通过listAndProcessSourcePartitions返回mode和对应的分区,其中mode有两种METADATA_ONLY和FULL_RECORD,然后对于METADATA_ONLY对应的分区路径执行metadataBootstrap,FULL_RECORD对应的分区路径执行fullBootstrap,从这里可以看出两点:1、通过listAndProcessSourcePartitions返回的mode值判断是进行METADATA_ONLY还是FULL_RECORD 2、具体的逻辑分别在metadataBootstrap,fullBootstrap。那么我们分别来看一下,首先看一下listAndProcessSourcePartitions是如何返回mode的
@OverridepublicHoodieBootstrapWriteMetadata<HoodieData<WriteStatus>>execute(){validate();try{HoodieTableMetaClient metaClient = table.getMetaClient();Option<HoodieInstant> completedInstant =
metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();ValidationUtils.checkArgument(!completedInstant.isPresent(),"Active Timeline is expected to be empty for bootstrap to be performed. "+"If you want to re-bootstrap, please rollback bootstrap first !!");// 返回 mode和对应的分区,其中mode有两种METADATA_ONLY和FULL_RECORD Map<BootstrapMode,List<Pair<String,List<HoodieFileStatus>>>> partitionSelections =listAndProcessSourcePartitions();// First run metadata bootstrap which will auto commit// 首先运行metadataBootstrap,如果partitionSelections中有METADATA_ONLY则继续执行metadataBootstrap的逻辑,没有的话,什么都不执行,直接返回Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> metadataResult =metadataBootstrap(partitionSelections.get(BootstrapMode.METADATA_ONLY));// if there are full bootstrap to be performed, perform that too// 然后运行fullBootstrap,如果partitionSelections中有FULL_RECORD则继续执行fullBootstrap的逻辑,没有的话,什么都不执行,直接返回Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrapResult =fullBootstrap(partitionSelections.get(BootstrapMode.FULL_RECORD));// Delete the marker directory for the instantWriteMarkersFactory.get(config.getMarkersType(), table, instantTime).quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());returnnewHoodieBootstrapWriteMetadata(metadataResult, fullBootstrapResult);}catch(IOException ioe){thrownewHoodieIOException(ioe.getMessage(), ioe);}}
listAndProcessSourcePartitions
这里的主要实现是selector.select,这里的select是通过MODE_SELECTOR_CLASS_NAME(hoodie.bootstrap.mode.selector)配置的,默认值为MetadataOnlyBootstrapModeSelector,我们的例子中FULL_RECORD设置的为FullRecordBootstrapModeSelector,让我们分别看一下他们具体的实现
privateMap<BootstrapMode,List<Pair<String,List<HoodieFileStatus>>>>listAndProcessSourcePartitions()throwsIOException{List<Pair<String,List<HoodieFileStatus>>> folders =BootstrapUtils.getAllLeafFoldersWithFiles(
table.getMetaClient(), bootstrapSourceFileSystem, config.getBootstrapSourceBasePath(), context);
LOG.info("Fetching Bootstrap Schema !!");HoodieBootstrapSchemaProvider sourceSchemaProvider =newHoodieSparkBootstrapSchemaProvider(config);
bootstrapSchema = sourceSchemaProvider.getBootstrapSchema(context, folders).toString();
LOG.info("Bootstrap Schema :"+ bootstrapSchema);BootstrapModeSelector selector =(BootstrapModeSelector)ReflectionUtils.loadClass(config.getBootstrapModeSelectorClass(), config);Map<BootstrapMode,List<String>> result = selector.select(folders);Map<String,List<HoodieFileStatus>> partitionToFiles = folders.stream().collect(Collectors.toMap(Pair::getKey,Pair::getValue));// Ensure all partitions are accounted forValidationUtils.checkArgument(partitionToFiles.keySet().equals(
result.values().stream().flatMap(Collection::stream).collect(Collectors.toSet())));return result.entrySet().stream().map(e ->Pair.of(e.getKey(), e.getValue().stream().map(p ->Pair.of(p, partitionToFiles.get(p))).collect(Collectors.toList()))).collect(Collectors.toMap(Pair::getKey,Pair::getValue));}
selector.select
MetadataOnlyBootstrapModeSelector和FullRecordBootstrapModeSelector都是UniformBootstrapModeSelector的子类,区别是bootstrapMode不一样,它们的select方法是在父类UniformBootstrapModeSelector实现的
publicclassMetadataOnlyBootstrapModeSelectorextendsUniformBootstrapModeSelector{publicMetadataOnlyBootstrapModeSelector(HoodieWriteConfig bootstrapConfig){super(bootstrapConfig,BootstrapMode.METADATA_ONLY);}}publicclassFullRecordBootstrapModeSelectorextendsUniformBootstrapModeSelector{publicFullRecordBootstrapModeSelector(HoodieWriteConfig bootstrapConfig){super(bootstrapConfig,BootstrapMode.FULL_RECORD);}}
UniformBootstrapModeSelector.select
很显然上面的mode的返回值和bootstrapMode是对应的,所以当MODE_SELECTOR_CLASS_NAME为MetadataOnlyBootstrapModeSelector和FullRecordBootstrapModeSelector时,它们的mode值是唯一的,要么执行metadata的逻辑要么执行full的逻辑,那么有没有两种模式同时会运行的情况呢,答案是有的。
publicMap<BootstrapMode,List<String>>select(List<Pair<String,List<HoodieFileStatus>>> partitions){return partitions.stream().map(p ->Pair.of(bootstrapMode, p)).collect(Collectors.groupingBy(Pair::getKey,Collectors.mapping(x -> x.getValue().getKey(),Collectors.toList())));}
BootstrapRegexModeSelector
BootstrapRegexModeSelector我们在之前的文章中讲过:首先有配置:hoodie.bootstrap.mode.selector.regex.mode 默认值
METADATA_ONLY
、hoodie.bootstrap.mode.selector.regex默认值
.*
但是如果不是默认值的话,比如上面的2020/08/2[0-9],假设我们有分区”2020/08/10,2020/08/10/11,2020/08/20,2020/08/21”,那么匹配成功的2020/08/20和2020/08/21对应的类型为METADATA_ONLY,匹配不成功的2020/08/10和2020/08/10/11则为FULL_RECORD。而至于我的为啥都是FULL_RECORD,原因是regex设置错误,我设置的是2022/10/0[0-9],但实际的分区值为2022-10-08和2022-10-09(分隔符不一样),而如果用默认的.*的话,则全部能匹配上,也就都是METADATA_ONLY(默认情况)
publicBootstrapRegexModeSelector(HoodieWriteConfig writeConfig){super(writeConfig);this.pattern =Pattern.compile(writeConfig.getBootstrapModeSelectorRegex());this.bootstrapModeOnMatch = writeConfig.getBootstrapModeForRegexMatch();// defaultMode和bootstrapModeOnMatch对立this.defaultMode =BootstrapMode.FULL_RECORD.equals(bootstrapModeOnMatch)?BootstrapMode.METADATA_ONLY :BootstrapMode.FULL_RECORD;
LOG.info("Default Mode :"+ defaultMode +", on Match Mode :"+ bootstrapModeOnMatch);}@OverridepublicMap<BootstrapMode,List<String>>select(List<Pair<String,List<HoodieFileStatus>>> partitions){return partitions.stream()// 匹配上的话,值为bootstrapModeOnMatch,默认为METADATA_ONLY,否则为defaultMode,也就是另外一种类型`FULL_RECORD`// bootstrapModeOnMatch 和 defaultMode是对立的.map(p ->Pair.of(pattern.matcher(p.getKey()).matches()? bootstrapModeOnMatch : defaultMode, p.getKey())).collect(Collectors.groupingBy(Pair::getKey,Collectors.mapping(Pair::getValue,Collectors.toList())));}
关于BootstrapModeSelector的实现一共只有上面讲的这三种,下面让我们来看一下metadataBootstrap,fullBootstrap
metadataBootstrap
这里首先创建keyGenerator,然后获取bootstrapPaths,核心逻辑在于后面的getMetadataHandler(config, table, partitionFsPair.getRight().getRight()).runMetadataBootstrap,其中getMetadataHandler我们在之前的文章中讲过了,根据文件类型返回ParquetBootstrapMetadataHandler或者OrcBootstrapMetadataHandler,我们这里返回ParquetBootstrapMetadataHandler
/**
 * Metadata-only bootstrap driver: builds a key generator and partition-path
 * translator, expands partitions into (srcPartition, (translatedPartition, file))
 * tuples, then runs one metadata bootstrap handler per source file in parallel.
 */
private HoodieData<BootstrapWriteStatus> runMetadataBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitions) {
  if (null == partitions || partitions.isEmpty()) {
    return context.emptyHoodieData();
  }

  TypedProperties properties = new TypedProperties();
  properties.putAll(config.getProps());

  KeyGeneratorInterface keyGenerator;
  try {
    keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(properties);
  } catch (IOException e) {
    throw new HoodieKeyGeneratorException("Init keyGenerator failed ", e);
  }

  BootstrapPartitionPathTranslator translator = (BootstrapPartitionPathTranslator) ReflectionUtils.loadClass(
      config.getBootstrapPartitionPathTranslatorClass(), properties);

  // Flatten to one entry per source file, carrying the translated partition path.
  List<Pair<String, Pair<String, HoodieFileStatus>>> bootstrapPaths = partitions.stream()
      .flatMap(p -> {
        String translatedPartitionPath = translator.getBootstrapTranslatedPath(p.getKey());
        return p.getValue().stream()
            .map(f -> Pair.of(p.getKey(), Pair.of(translatedPartitionPath, f)));
      })
      .collect(Collectors.toList());

  context.setJobStatus(this.getClass().getSimpleName(), "Bootstrap metadata table: " + config.getTableName());
  return context.parallelize(bootstrapPaths, config.getBootstrapParallelism())
      .map(partitionFsPair ->
          getMetadataHandler(config, table, partitionFsPair.getRight().getRight())
              .runMetadataBootstrap(partitionFsPair.getLeft(),
                  partitionFsPair.getRight().getLeft(), keyGenerator));
}
ParquetBootstrapMetadataHandler.runMetadataBootstrap
ParquetBootstrapMetadataHandler的runMetadataBootstrap是在其父类BaseBootstrapMetadataHandler中实现的,这里的核心逻辑在executeBootstrap
publicBootstrapWriteStatusrunMetadataBootstrap(String srcPartitionPath,String partitionPath,KeyGeneratorInterface keyGenerator){Path sourceFilePath =FileStatusUtils.toPath(srcFileStatus.getPath());HoodieBootstrapHandle<?,?,?,?> bootstrapHandle =newHoodieBootstrapHandle(config,HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
table, partitionPath,FSUtils.createNewFileIdPfx(), table.getTaskContextSupplier());try{Schema avroSchema =getAvroSchema(sourceFilePath);List<String> recordKeyColumns = keyGenerator.getRecordKeyFieldNames().stream().map(HoodieAvroUtils::getRootLevelFieldName).collect(Collectors.toList());Schema recordKeySchema =HoodieAvroUtils.generateProjectionSchema(avroSchema, recordKeyColumns);
LOG.info("Schema to be used for reading record Keys :"+ recordKeySchema);AvroReadSupport.setAvroReadSchema(table.getHadoopConf(), recordKeySchema);AvroReadSupport.setRequestedProjection(table.getHadoopConf(), recordKeySchema);executeBootstrap(bootstrapHandle, sourceFilePath, keyGenerator, partitionPath, avroSchema);}catch(Exception e){thrownewHoodieException(e.getMessage(), e);}BootstrapWriteStatus writeStatus =(BootstrapWriteStatus) bootstrapHandle.writeStatuses().get(0);BootstrapFileMapping bootstrapFileMapping =newBootstrapFileMapping(
config.getBootstrapSourceBasePath(), srcPartitionPath, partitionPath,
srcFileStatus, writeStatus.getFileId());
writeStatus.setBootstrapSourceFileMapping(bootstrapFileMapping);return writeStatus;}
executeBootstrap
executeBootstrap在ParquetBootstrapMetadataHandler,首先创建一个ParquetReader,然后将reader封装成ParquetReaderIterator,作为BoundedInMemoryExecutor的参数构造wrapper,然后执行wrapper.execute()
voidexecuteBootstrap(HoodieBootstrapHandle<?,?,?,?> bootstrapHandle,Path sourceFilePath,KeyGeneratorInterface keyGenerator,String partitionPath,Schema avroSchema)throwsException{BoundedInMemoryExecutor<GenericRecord,HoodieRecord,Void> wrapper =null;ParquetReader<IndexedRecord> reader =AvroParquetReader.<IndexedRecord>builder(sourceFilePath).withConf(table.getHadoopConf()).build();try{
wrapper =newBoundedInMemoryExecutor<GenericRecord,HoodieRecord,Void>(config.getWriteBufferLimitBytes(),newParquetReaderIterator(reader),newBootstrapRecordConsumer(bootstrapHandle), inp ->{String recKey = keyGenerator.getKey(inp).getRecordKey();GenericRecord gr =newGenericData.Record(HoodieAvroUtils.RECORD_KEY_SCHEMA);
gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey);BootstrapRecordPayload payload =newBootstrapRecordPayload(gr);HoodieRecord rec =newHoodieAvroRecord(newHoodieKey(recKey, partitionPath), payload);return rec;}, table.getPreExecuteRunnable());
wrapper.execute();}catch(Exception e){thrownewHoodieException(e);}finally{
reader.close();if(null!= wrapper){
wrapper.shutdownNow();
wrapper.awaitTermination();}
bootstrapHandle.close();}}
wrapper.execute()
这里是一个生产者-消费者模型,可以参考生产者-消费者模型在Hudi中的应用
publicEexecute(){try{startProducers();Future<E> future =startConsumer();// Wait for consumer to be donereturn future.get();}catch(InterruptedException ie){shutdownNow();Thread.currentThread().interrupt();thrownewHoodieException(ie);}catch(Exception e){thrownewHoodieException(e);}}
startProducers
我们主要看一下producer.produce,这里的producer为Collections.singletonList(new IteratorBasedQueueProducer<>(new ParquetReaderIterator(reader))),所以这里的produce方法为IteratorBasedQueueProducer.produce
publicExecutorCompletionService<Boolean>startProducers(){// Latch to control when and which producer thread will close the queuefinalCountDownLatch latch =newCountDownLatch(producers.size());finalExecutorCompletionService<Boolean> completionService =newExecutorCompletionService<Boolean>(producerExecutorService);
producers.stream().map(producer ->{return completionService.submit(()->{try{
preExecuteRunnable.run();
producer.produce(queue);}catch(Throwable e){
LOG.error("error producing records", e);
queue.markAsFailed(e);throw e;}finally{synchronized(latch){
latch.countDown();if(latch.getCount()==0){// Mark production as done so that consumer will be able to exit
queue.close();}}}returntrue;});}).collect(Collectors.toList());return completionService;}
IteratorBasedQueueProducer.produce
其中queue.insertRecord的逻辑是先执行transformFunction.apply返回payload,然后将payload放到队列里,关于transformFunction我们放到后面分析,先看inputIterator.hasNext()和inputIterator.next(),这里的inputIterator为ParquetReaderIterator
publicvoidproduce(BoundedInMemoryQueue<I,?> queue)throwsException{
LOG.info("starting to buffer records");while(inputIterator.hasNext()){
queue.insertRecord(inputIterator.next());}
LOG.info("finished buffering records");}// BoundedInMemoryQueue.insertRecordpublicvoidinsertRecord(I t)throwsException{// If already closed, throw exceptionif(isWriteDone.get()){thrownewIllegalStateException("Queue closed for enqueueing new entries");}// We need to stop queueing if queue-reader has failed and exited.throwExceptionIfFailed();
rateLimiter.acquire();// We are retrieving insert value in the record queueing thread to offload computation// around schema validation// and record creation to it.finalO payload = transformFunction.apply(t);adjustBufferSizeIfNeeded(payload);
queue.put(Option.of(payload));}
ParquetReaderIterator.next
可以看到hasNext的逻辑是判断next是否为null,而next,则返回parquetReader.read(),parquetReader.read()在类ParquetReader中实现,这已经是parquet的源码了,我们就不往下分析了,总之生产者的逻辑是读取parquet的内容放到队列里供消费者使用,接下来看一下消费者
publicbooleanhasNext(){try{// To handle when hasNext() is called multiple times for idempotency and/or the first timeif(this.next ==null){this.next = parquetReader.read();}returnthis.next !=null;}catch(Exception e){FileIOUtils.closeQuietly(parquetReader);thrownewHoodieException("unable to read next record from parquet file ", e);}}// 这里的T为IndexedRecord@OverridepublicTnext(){try{// To handle case when next() is called before hasNext()if(this.next ==null){if(!hasNext()){thrownewHoodieException("No more records left to read from parquet file");}}T retVal =this.next;this.next = parquetReader.read();return retVal;}catch(Exception e){FileIOUtils.closeQuietly(parquetReader);thrownewHoodieException("unable to read next record from parquet file ", e);}}
startConsumer
这里主要是consumer.consume,这里的consumer为new BootstrapRecordConsumer(bootstrapHandle)
/**
 * Submits the (optional) consumer to its executor; when no consumer is configured,
 * returns an already-completed future holding null.
 */
private Future<E> startConsumer() {
  return consumer.map(consumer -> {
    return consumerExecutorService.submit(() -> {
      LOG.info("starting consumer thread");
      preExecuteRunnable.run();
      try {
        E result = consumer.consume(queue);
        LOG.info("Queue Consumption is done; notifying producer threads");
        return result;
      } catch (Exception e) {
        // Mark the queue failed so blocked producers unblock and abort too.
        LOG.error("error consuming records", e);
        queue.markAsFailed(e);
        throw e;
      }
    });
  }).orElse(CompletableFuture.completedFuture(null));
}
BootstrapRecordConsumer.consume
它的consume方法是在父类BoundedInMemoryQueueConsumer中实现的,首先通过queue.iterator().next()从队列queue里获取数据,再调用consumeOneRecord,
publicOconsume(BoundedInMemoryQueue<?,I> queue)throwsException{Iterator<I> iterator = queue.iterator();while(iterator.hasNext()){consumeOneRecord(iterator.next());}// Notifies donefinish();returngetResult();}
BootstrapRecordConsumer.consumeOneRecord
这里的record和Payload是啥呢?这就要看我们上面提到的transformFunction了,他是在上面的方法executeBootstrap中定义的
protectedvoidconsumeOneRecord(HoodieRecordrecord){try{
bootstrapHandle.write(record,((HoodieRecordPayload)record.getData()).getInsertValue(bootstrapHandle.getWriterSchemaWithMetaFields()));}catch(IOException e){thrownewHoodieIOException(e.getMessage(), e);}}
publicOption<IndexedRecord>getInsertValue(Schema schema){returnOption.ofNullable(record);}
transformFunction
这里的inp应该为queue.iterator().next()返回的IndexedRecord,首先从IndexedRecord中获取recKey,然后将recKey作为hudi的主键元数据字段放到GenericRecord中,然后将gr作为BootstrapRecordPayload的record构造BootstrapRecordPayload,最后返回new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload),所以上面的record即为下面的rec,而上面的HoodieRecordPayload是通过record.getData()获取的,也就是HoodieAvroRecord中的data,即为下面的BootstrapRecordPayload
// transformFunction: maps a source IndexedRecord to a skeleton HoodieRecord whose
// payload carries only the record key in RECORD_KEY_METADATA_FIELD.
inp -> {
  String recKey = keyGenerator.getKey(inp).getRecordKey();
  GenericRecord gr = new GenericData.Record(HoodieAvroUtils.RECORD_KEY_SCHEMA);
  gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey);
  BootstrapRecordPayload payload = new BootstrapRecordPayload(gr);
  HoodieRecord rec = new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload);
  return rec;
}
BootstrapRecordPayload.getInsertValue
这里的逻辑也很简单, 直接返回record,在上面的transformFunction中可知,record为GenericRecord,只有一个字段RECORD_KEY_METADATA_FIELD,这样我们知道了getInsertValue返回值,就可以继续看一下后面的write方法了
publicOption<IndexedRecord>getInsertValue(Schema schema){returnOption.ofNullable(record);}
HoodieBootstrapHandle.write
HoodieBootstrapHandle的write方法是在父类HoodieCreateHandle中实现的,这里的核心逻辑是通过fileWriter将avroRecord写到对应的parquet中,具体的实现也是在parquet源码中,我们知道这里的Record信息只有主键信息和分区信息,这就是为什么metadata为啥只会生成主键、页脚的基本框架文件,不会重写全部数据的原因了,关于METADATA_ONLY我们就分析到这里了,接着看一下FULL_RECORD
publicvoidwrite(HoodieRecordrecord,Option<IndexedRecord> avroRecord){Option recordMetadata =((HoodieRecordPayload)record.getData()).getMetadata();if(HoodieOperation.isDelete(record.getOperation())){
avroRecord =Option.empty();}try{if(avroRecord.isPresent()){if(avroRecord.get().equals(IGNORE_RECORD)){return;}// Convert GenericRecord to GenericRecord with hoodie commit metadata in schemaif(preserveMetadata){
fileWriter.writeAvro(record.getRecordKey(),rewriteRecordWithMetadata((GenericRecord) avroRecord.get(), path.getName()));}else{
fileWriter.writeAvroWithMetadata(record.getKey(),rewriteRecord((GenericRecord) avroRecord.get()));}// update the new location of record, so we know where to find it nextrecord.unseal();record.setNewLocation(newHoodieRecordLocation(instantTime, writeStatus.getFileId()));record.seal();
recordsWritten++;
insertRecordsWritten++;}else{
recordsDeleted++;}
writeStatus.markSuccess(record, recordMetadata);// deflate record payload after recording success. This will help users access payload as a// part of marking// record successful.record.deflate();}catch(Throwable t){// Not throwing exception from here, since we don't want to fail the entire job// for a single record
writeStatus.markFailure(record, t, recordMetadata);
LOG.error("Error writing record "+record, t);}}
SparkBootstrapCommitActionExecutor.fullBootstrap
首先通过反射构造inputProvider,它是通过FULL_BOOTSTRAP_INPUT_PROVIDER_CLASS_NAME(hoodie.bootstrap.full.input.provider)配置的,默认值为SparkParquetBootstrapDataProvider,然后由inputProvider.generateInputRecords读取源表的parquet文件返回inputRecordsRDD,这里返回的是全部的内容,最后将inputRecordsRDD作为参数,由getBulkInsertActionExecutor(HoodieJavaRDD.of(inputRecordsRDD)).execute(),最后由SparkBulkInsertHelper.bulkInsert按照普通的写Hudi的逻辑写到目标表中,也就是以Hudi表的形式执行数据的完整复制/重写。关于bulkInsert的源码逻辑,由于比较多,限于篇幅和精力以及能力的原因,本文就不深入介绍了,有机会的话我会再单独分享一篇的。
protectedOption<HoodieWriteMetadata<HoodieData<WriteStatus>>>fullBootstrap(List<Pair<String,List<HoodieFileStatus>>> partitionFilesList){if(null== partitionFilesList || partitionFilesList.isEmpty()){returnOption.empty();}TypedProperties properties =newTypedProperties();
properties.putAll(config.getProps());FullRecordBootstrapDataProvider inputProvider =(FullRecordBootstrapDataProvider)ReflectionUtils.loadClass(config.getFullBootstrapInputProvider(),
properties, context);JavaRDD<HoodieRecord> inputRecordsRDD =(JavaRDD<HoodieRecord>) inputProvider.generateInputRecords("bootstrap_source", config.getBootstrapSourceBasePath(),
partitionFilesList);// Start Full BootstrapfinalHoodieInstant requested =newHoodieInstant(State.REQUESTED, table.getMetaClient().getCommitActionType(),HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS);
table.getActiveTimeline().createNewInstant(requested);// Setup correct schema and run bulk insert.returnOption.of(getBulkInsertActionExecutor(HoodieJavaRDD.of(inputRecordsRDD)).execute());}
在父类SparkFullBootstrapDataProviderBase
@OverridepublicJavaRDD<HoodieRecord>generateInputRecords(String tableName,String sourceBasePath,List<Pair<String,List<HoodieFileStatus>>> partitionPathsWithFiles){String[] filePaths = partitionPathsWithFiles.stream().map(Pair::getValue).flatMap(f -> f.stream().map(fs ->FileStatusUtils.toPath(fs.getPath()).toString())).toArray(String[]::new);Dataset inputDataset = sparkSession.read().format(getFormat()).load(filePaths);try{KeyGenerator keyGenerator =HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);String structName = tableName +"_record";String namespace ="hoodie."+ tableName;
RDD<GenericRecord> genericRecords =HoodieSparkUtils.createRdd(inputDataset, structName, namespace,false,Option.empty());return genericRecords.toJavaRDD().map(gr ->{String orderingVal =HoodieAvroUtils.getNestedFieldValAsString(
gr, props.getString("hoodie.datasource.write.precombine.field"),false, props.getBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())));try{returnDataSourceUtils.createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr),
props.getString("hoodie.datasource.write.payload.class"));}catch(IOException ioe){thrownewHoodieIOException(ioe.getMessage(), ioe);}});}catch(IOException ioe){thrownewHoodieIOException(ioe.getMessage(), ioe);}}
getBulkInsertActionExecutor
返回SparkBulkInsertCommitActionExecutor
/**
 * Builds the bulk-insert executor used by full bootstrap, overriding the write
 * config's schema with the bootstrap schema derived from the source files.
 */
protected BaseSparkCommitActionExecutor<T> getBulkInsertActionExecutor(HoodieData<HoodieRecord> inputRecordsRDD) {
  return new SparkBulkInsertCommitActionExecutor(
      (HoodieSparkEngineContext) context,
      new HoodieWriteConfig.Builder().withProps(config.getProps()).withSchema(bootstrapSchema).build(),
      table, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS,
      inputRecordsRDD, Option.empty(), extraMetadata);
}
SparkBulkInsertCommitActionExecutor.execute
执行SparkBulkInsertHelper.bulkInsert
publicHoodieWriteMetadata<HoodieData<WriteStatus>>execute(){try{returnSparkBulkInsertHelper.newInstance().bulkInsert(inputRecordsRDD, instantTime, table, config,this,true, bulkInsertPartitioner);}catch(HoodieInsertException ie){throw ie;}catch(Throwable e){thrownewHoodieInsertException("Failed to bulk insert for commit time "+ instantTime, e);}}
SparkBulkInsertHelper.bulkInsert
最后由SparkBulkInsertHelper.bulkInsert按照普通的写Hudi的逻辑写到目标表中
publicHoodieWriteMetadata<HoodieData<WriteStatus>>bulkInsert(finalHoodieData<HoodieRecord<T>> inputRecords,finalString instantTime,finalHoodieTable<T,HoodieData<HoodieRecord<T>>,HoodieData<HoodieKey>,HoodieData<WriteStatus>> table,finalHoodieWriteConfig config,finalBaseCommitActionExecutor<T,HoodieData<HoodieRecord<T>>,HoodieData<HoodieKey>,HoodieData<WriteStatus>,R> executor,finalboolean performDedupe,finalOption<BulkInsertPartitioner> userDefinedBulkInsertPartitioner){HoodieWriteMetadata result =newHoodieWriteMetadata();//transition bulk_insert state to inflight
table.getActiveTimeline().transitionRequestedToInflight(newHoodieInstant(HoodieInstant.State.REQUESTED,
executor.getCommitActionType(), instantTime),Option.empty(),
config.shouldAllowMultiWriteOnSameInstant());BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElse(BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()));// write new filesHoodieData<WriteStatus> writeStatuses =bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner,false,
config.getBulkInsertShuffleParallelism(),newCreateHandleFactory(false));//update index((BaseSparkCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);return result;}
总结
本文简单的对Hudi bootstrap的一些关键的源码逻辑进行了分析,希望能对大家有所帮助。限于精力及能力的原因,有些地方可能不够深入,或者不对的地方,还请大家多多指正,让我们共同进步。
版权归原作者 董可伦 所有, 如有侵权,请联系我们删除。