0


Hudi源码|bootstrap源码分析总结(写Hudi)

前言

Apache Hudi bootstrap源码简要走读,不了解Hudi bootstrap的可以参考:利用Hudi Bootstrap转化现有Hive表的parquet/orc文件为Hudi表

版本

Hudi 0.12.0
Spark 2.4.4

入口

// Bootstrap is triggered by writing an empty DataFrame with OPERATION set to the bootstrap value.
// Further options (e.g. the bootstrap base path that HoodieSparkSqlWriter.bootstrap requires)
// are elided here.
val bootstrapDF = spark.emptyDataFrame
bootstrapDF.write.format("hudi")
  .options(extraOpts)
  .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.BOOTSTRAP_OPERATION_OPT_VAL)
  // ... other options elided ...
  .save(basePath)

根据文章《Hudi Spark源码学习总结-df.write.format("hudi").save》可知,save方法会走到DefaultSource.createRelation:

  /**
   * Spark datasource write entry point. Drops the Hudi meta columns from the incoming DataFrame,
   * then routes to HoodieSparkSqlWriter.bootstrap when OPERATION is the bootstrap operation and
   * to HoodieSparkSqlWriter.write otherwise.
   *
   * @return an empty relation carrying the schema of the DataFrame without meta columns
   */
  override def createRelation(sqlContext: SQLContext,
                              mode: SaveMode,
                              optParams: Map[String, String],
                              df: DataFrame): BaseRelation = {
    val dfWithoutMetaCols = df.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala: _*)

    if (optParams.get(OPERATION.key).contains(BOOTSTRAP_OPERATION_OPT_VAL)) {
      HoodieSparkSqlWriter.bootstrap(sqlContext, mode, optParams, dfWithoutMetaCols)
    } else {
      HoodieSparkSqlWriter.write(sqlContext, mode, optParams, dfWithoutMetaCols)
    }
    new HoodieEmptyRelation(sqlContext, dfWithoutMetaCols.schema)
  }

它会判断OPERATION是否为BOOTSTRAP_OPERATION_OPT_VAL,这里为true,所以会调用HoodieSparkSqlWriter.bootstrap

HoodieSparkSqlWriter.bootstrap

这里首先获取一些参数,然后判断表是否存在。如果不存在,说明是第一次写,需要设置一些配置参数,然后通过HoodieTableMetaClient.initTable进行初始化,接着调用writeClient.bootstrap:

  /**
   * Bootstrap path of HoodieSparkSqlWriter.
   *
   * Validates the options and table config. When the table does not exist yet, it initializes the
   * table via HoodieTableMetaClient.initTable. It then runs writeClient.bootstrap and finally meta sync.
   *
   * @return false when mode is Ignore and the table already exists; otherwise the meta sync result
   */
  def bootstrap(sqlContext: SQLContext,
                mode: SaveMode,
                optParams: Map[String, String],
                df: DataFrame,
                hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty,
                hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty): Boolean = {

    assert(optParams.get("path").exists(!StringUtils.isNullOrEmpty(_)), "'path' must be set")
    val path = optParams("path")
    val basePath = new Path(path)
    val sparkContext = sqlContext.sparkContext
    val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
    // The table exists iff its meta folder exists under the base path
    tableExists = fs.exists(new Path(basePath, HoodieTableMetaClient.METAFOLDER_NAME))
    val tableConfig = getHoodieTableConfig(sparkContext, path, hoodieTableConfigOpt)
    validateTableConfig(sqlContext.sparkSession, optParams, tableConfig, mode == SaveMode.Overwrite)

    val (parameters, hoodieConfig) = mergeParamsAndGetHoodieConfig(optParams, tableConfig, mode)
    val tableName = hoodieConfig.getStringOrThrow(HoodieWriteConfig.TBL_NAME, s"'${HoodieWriteConfig.TBL_NAME.key}' must be set.")
    val tableType = hoodieConfig.getStringOrDefault(TABLE_TYPE)
    val bootstrapBasePath = hoodieConfig.getStringOrThrow(BASE_PATH,
      s"'${BASE_PATH.key}' is required for '${BOOTSTRAP_OPERATION_OPT_VAL}'" + " operation'")
    val bootstrapIndexClass = hoodieConfig.getStringOrDefault(INDEX_CLASS_NAME)

    // Writer schema: derived from the DataFrame when it has columns, otherwise the null schema
    var schema: String = null
    if (df.schema.nonEmpty) {
      val (structName, namespace) = AvroConversionUtils.getAvroRecordNameAndNamespace(tableName)
      schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, namespace).toString
    } else {
      schema = HoodieAvroUtils.getNullSchema.toString
    }

    if (mode == SaveMode.Ignore && tableExists) {
      log.warn(s"hoodie table at $basePath already exists. Ignoring & not performing actual writes.")
      if (!hoodieWriteClient.isEmpty) {
        hoodieWriteClient.get.close()
      }
      false
    } else {
      // Handle various save modes
      handleSaveModes(sqlContext.sparkSession, mode, basePath, tableConfig, tableName, WriteOperationType.BOOTSTRAP, fs)

      if (!tableExists) {
        // The table does not exist yet (first write): gather table properties
        val archiveLogFolder = hoodieConfig.getStringOrDefault(HoodieTableConfig.ARCHIVELOG_FOLDER)
        val partitionColumns = HoodieWriterUtils.getPartitionColumns(parameters)
        val recordKeyFields = hoodieConfig.getString(DataSourceWriteOptions.RECORDKEY_FIELD)
        val keyGenProp = hoodieConfig.getString(HoodieTableConfig.KEY_GENERATOR_CLASS_NAME)
        val populateMetaFields = java.lang.Boolean.parseBoolean(parameters.getOrElse(
          HoodieTableConfig.POPULATE_META_FIELDS.key(),
          String.valueOf(HoodieTableConfig.POPULATE_META_FIELDS.defaultValue())))
        val baseFileFormat = hoodieConfig.getStringOrDefault(HoodieTableConfig.BASE_FILE_FORMAT)
        val useBaseFormatMetaFile = java.lang.Boolean.parseBoolean(parameters.getOrElse(
          HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.key(),
          String.valueOf(HoodieTableConfig.PARTITION_METAFILE_USE_BASE_FORMAT.defaultValue())))

        // With the properties configured, initialize the Hudi table
        HoodieTableMetaClient.withPropertyBuilder()
          .setTableType(HoodieTableType.valueOf(tableType))
          .setTableName(tableName)
          .setRecordKeyFields(recordKeyFields)
          .setArchiveLogFolder(archiveLogFolder)
          .setPayloadClassName(hoodieConfig.getStringOrDefault(PAYLOAD_CLASS_NAME))
          .setPreCombineField(hoodieConfig.getStringOrDefault(PRECOMBINE_FIELD, null))
          .setBootstrapIndexClass(bootstrapIndexClass)
          .setBaseFileFormat(baseFileFormat)
          .setBootstrapBasePath(bootstrapBasePath)
          .setPartitionFields(partitionColumns)
          .setPopulateMetaFields(populateMetaFields)
          .setKeyGeneratorClassProp(keyGenProp)
          .setHiveStylePartitioningEnable(hoodieConfig.getBoolean(HIVE_STYLE_PARTITIONING))
          .setUrlEncodePartitioning(hoodieConfig.getBoolean(URL_ENCODE_PARTITIONING))
          .setCommitTimezone(HoodieTimelineTimeZone.valueOf(hoodieConfig.getStringOrDefault(HoodieTableConfig.TIMELINE_TIMEZONE)))
          .setPartitionMetafileUseBaseFormat(useBaseFormatMetaFile)
          .initTable(sparkContext.hadoopConfiguration, path)
      }

      val jsc = new JavaSparkContext(sqlContext.sparkContext)
      val writeClient = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc,
        schema, path, tableName, mapAsJavaMap(parameters)))
      try {
        writeClient.bootstrap(org.apache.hudi.common.util.Option.empty())
      } finally {
        // The client is closed whether or not bootstrap succeeds
        writeClient.close()
      }
      val metaSyncSuccess = metaSync(sqlContext.sparkSession, hoodieConfig, basePath, df.schema)
      metaSyncSuccess
    }
  }

writeClient.bootstrap

这里的writeClient为SparkRDDWriteClient,它会调用HoodieTable的bootstrap方法。我们这里使用的表类型为COW,所以对应的HoodieTable为HoodieSparkCopyOnWriteTable:

initTable(WriteOperationType.UPSERT,Option.ofNullable(HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS)).bootstrap(context, extraMetadata);publicstatic<TextendsHoodieRecordPayload>HoodieSparkTable<T>create(HoodieWriteConfig config,HoodieSparkEngineContext context,HoodieTableMetaClient metaClient){HoodieSparkTable<T> hoodieSparkTable;switch(metaClient.getTableType()){case COPY_ON_WRITE:
        hoodieSparkTable =newHoodieSparkCopyOnWriteTable<>(config, context, metaClient);break;case MERGE_ON_READ:
        hoodieSparkTable =newHoodieSparkMergeOnReadTable<>(config, context, metaClient);break;default:thrownewHoodieException("Unsupported table type :"+ metaClient.getTableType());}return hoodieSparkTable;}

HoodieSparkCopyOnWriteTable.bootstrap

/**
 * COW table bootstrap: delegates to SparkBootstrapCommitActionExecutor.execute().
 */
public HoodieBootstrapWriteMetadata<HoodieData<WriteStatus>> bootstrap(HoodieEngineContext context,
                                                                       Option<Map<String, String>> extraMetadata) {
  return new SparkBootstrapCommitActionExecutor((HoodieSparkEngineContext) context, config, this, extraMetadata)
      .execute();
}

SparkBootstrapCommitActionExecutor.execute

这里首先通过listAndProcessSourcePartitions返回mode和对应的分区,其中mode有两种:METADATA_ONLY和FULL_RECORD。然后对METADATA_ONLY对应的分区路径执行metadataBootstrap,对FULL_RECORD对应的分区路径执行fullBootstrap。从这里可以看出两点:1、通过listAndProcessSourcePartitions返回的mode值判断是进行METADATA_ONLY还是FULL_RECORD;2、具体的逻辑分别在metadataBootstrap和fullBootstrap中。下面我们分别来看一下,首先看一下listAndProcessSourcePartitions是如何返回mode的

@OverridepublicHoodieBootstrapWriteMetadata<HoodieData<WriteStatus>>execute(){validate();try{HoodieTableMetaClient metaClient = table.getMetaClient();Option<HoodieInstant> completedInstant =
          metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant();ValidationUtils.checkArgument(!completedInstant.isPresent(),"Active Timeline is expected to be empty for bootstrap to be performed. "+"If you want to re-bootstrap, please rollback bootstrap first !!");// 返回 mode和对应的分区,其中mode有两种METADATA_ONLY和FULL_RECORD       Map<BootstrapMode,List<Pair<String,List<HoodieFileStatus>>>> partitionSelections =listAndProcessSourcePartitions();// First run metadata bootstrap which will auto commit// 首先运行metadataBootstrap,如果partitionSelections中有METADATA_ONLY则继续执行metadataBootstrap的逻辑,没有的话,什么都不执行,直接返回Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> metadataResult =metadataBootstrap(partitionSelections.get(BootstrapMode.METADATA_ONLY));// if there are full bootstrap to be performed, perform that too// 然后运行fullBootstrap,如果partitionSelections中有FULL_RECORD则继续执行fullBootstrap的逻辑,没有的话,什么都不执行,直接返回Option<HoodieWriteMetadata<HoodieData<WriteStatus>>> fullBootstrapResult =fullBootstrap(partitionSelections.get(BootstrapMode.FULL_RECORD));// Delete the marker directory for the instantWriteMarkersFactory.get(config.getMarkersType(), table, instantTime).quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());returnnewHoodieBootstrapWriteMetadata(metadataResult, fullBootstrapResult);}catch(IOException ioe){thrownewHoodieIOException(ioe.getMessage(), ioe);}}

listAndProcessSourcePartitions

这里的主要实现是selector.select。selector是通过MODE_SELECTOR_CLASS_NAME(hoodie.bootstrap.mode.selector)配置的,默认值为MetadataOnlyBootstrapModeSelector;在我们FULL_RECORD的例子中,设置的是FullRecordBootstrapModeSelector。让我们分别看一下它们具体的实现

privateMap<BootstrapMode,List<Pair<String,List<HoodieFileStatus>>>>listAndProcessSourcePartitions()throwsIOException{List<Pair<String,List<HoodieFileStatus>>> folders =BootstrapUtils.getAllLeafFoldersWithFiles(
            table.getMetaClient(), bootstrapSourceFileSystem, config.getBootstrapSourceBasePath(), context);

    LOG.info("Fetching Bootstrap Schema !!");HoodieBootstrapSchemaProvider sourceSchemaProvider =newHoodieSparkBootstrapSchemaProvider(config);
    bootstrapSchema = sourceSchemaProvider.getBootstrapSchema(context, folders).toString();
    LOG.info("Bootstrap Schema :"+ bootstrapSchema);BootstrapModeSelector selector =(BootstrapModeSelector)ReflectionUtils.loadClass(config.getBootstrapModeSelectorClass(), config);Map<BootstrapMode,List<String>> result = selector.select(folders);Map<String,List<HoodieFileStatus>> partitionToFiles = folders.stream().collect(Collectors.toMap(Pair::getKey,Pair::getValue));// Ensure all partitions are accounted forValidationUtils.checkArgument(partitionToFiles.keySet().equals(
        result.values().stream().flatMap(Collection::stream).collect(Collectors.toSet())));return result.entrySet().stream().map(e ->Pair.of(e.getKey(), e.getValue().stream().map(p ->Pair.of(p, partitionToFiles.get(p))).collect(Collectors.toList()))).collect(Collectors.toMap(Pair::getKey,Pair::getValue));}

selector.select

MetadataOnlyBootstrapModeSelector和FullRecordBootstrapModeSelector都是UniformBootstrapModeSelector的子类,区别是bootstrapMode不一样,它们的select方法是在父类UniformBootstrapModeSelector实现的

publicclassMetadataOnlyBootstrapModeSelectorextendsUniformBootstrapModeSelector{publicMetadataOnlyBootstrapModeSelector(HoodieWriteConfig bootstrapConfig){super(bootstrapConfig,BootstrapMode.METADATA_ONLY);}}publicclassFullRecordBootstrapModeSelectorextendsUniformBootstrapModeSelector{publicFullRecordBootstrapModeSelector(HoodieWriteConfig bootstrapConfig){super(bootstrapConfig,BootstrapMode.FULL_RECORD);}}

UniformBootstrapModeSelector.select

很显然上面的mode返回值和bootstrapMode是对应的。所以当MODE_SELECTOR_CLASS_NAME为MetadataOnlyBootstrapModeSelector或FullRecordBootstrapModeSelector时,它们的mode值是唯一的,要么执行metadata的逻辑,要么执行full的逻辑。那么有没有两种模式同时运行的情况呢?答案是有的。

/**
 * Maps every partition to the single configured bootstrapMode.
 *
 * @return bootstrapMode -> all partition paths (at most one key)
 */
public Map<BootstrapMode, List<String>> select(List<Pair<String, List<HoodieFileStatus>>> partitions) {
  return partitions.stream()
      .map(p -> Pair.of(bootstrapMode, p))
      .collect(Collectors.groupingBy(Pair::getKey,
          Collectors.mapping(x -> x.getValue().getKey(), Collectors.toList())));
}

BootstrapRegexModeSelector

BootstrapRegexModeSelector我们在之前的文章中讲过:首先有两个配置,hoodie.bootstrap.mode.selector.regex.mode(默认值为METADATA_ONLY)和hoodie.bootstrap.mode.selector.regex(默认值为.*)。

但是如果不是默认值的话,比如之前文章中用到的2020/08/2[0-9],假设我们有分区“2020/08/10,2020/08/10/11,2020/08/20,2020/08/21”,那么匹配成功的2020/08/20和2020/08/21对应的类型为METADATA_ONLY,匹配不成功的2020/08/10和2020/08/10/11则为FULL_RECORD。至于我的例子中为什么全都是FULL_RECORD,原因是regex设置错误:我设置的是2022/10/0[0-9],但实际的分区值为2022-10-08和2022-10-09(分隔符不一样)。而如果用默认的.*,则全部分区都能匹配上,也就都是METADATA_ONLY(默认情况)

publicBootstrapRegexModeSelector(HoodieWriteConfig writeConfig){super(writeConfig);this.pattern =Pattern.compile(writeConfig.getBootstrapModeSelectorRegex());this.bootstrapModeOnMatch = writeConfig.getBootstrapModeForRegexMatch();// defaultMode和bootstrapModeOnMatch对立this.defaultMode =BootstrapMode.FULL_RECORD.equals(bootstrapModeOnMatch)?BootstrapMode.METADATA_ONLY :BootstrapMode.FULL_RECORD;
    LOG.info("Default Mode :"+ defaultMode +", on Match Mode :"+ bootstrapModeOnMatch);}@OverridepublicMap<BootstrapMode,List<String>>select(List<Pair<String,List<HoodieFileStatus>>> partitions){return partitions.stream()// 匹配上的话,值为bootstrapModeOnMatch,默认为METADATA_ONLY,否则为defaultMode,也就是另外一种类型`FULL_RECORD`// bootstrapModeOnMatch 和 defaultMode是对立的.map(p ->Pair.of(pattern.matcher(p.getKey()).matches()? bootstrapModeOnMatch : defaultMode, p.getKey())).collect(Collectors.groupingBy(Pair::getKey,Collectors.mapping(Pair::getValue,Collectors.toList())));}

关于BootstrapModeSelector的实现一共只有上面讲的这三种,下面让我们来看一下metadataBootstrap,fullBootstrap

metadataBootstrap

这里首先创建keyGenerator,然后获取bootstrapPaths,核心逻辑在于后面的getMetadataHandler(config, table, partitionFsPair.getRight().getRight()).runMetadataBootstrap,其中getMetadataHandler我们在之前的文章中讲过了,根据文件类型返回ParquetBootstrapMetadataHandler或者OrcBootstrapMetadataHandler,我们这里返回ParquetBootstrapMetadataHandler

/**
 * Metadata-only bootstrap of the given partitions.
 *
 * Every source file is paired with its source partition and translated partition path. Each
 * file is then processed in parallel by the metadata handler chosen for its file type.
 *
 * @return one BootstrapWriteStatus per source file; empty data when partitions is null or empty
 */
private HoodieData<BootstrapWriteStatus> runMetadataBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitions) {
  if (null == partitions || partitions.isEmpty()) {
    return context.emptyHoodieData();
  }

  TypedProperties properties = new TypedProperties();
  properties.putAll(config.getProps());

  KeyGeneratorInterface keyGenerator;
  try {
    keyGenerator = HoodieSparkKeyGeneratorFactory.createKeyGenerator(properties);
  } catch (IOException e) {
    throw new HoodieKeyGeneratorException("Init keyGenerator failed ", e);
  }

  BootstrapPartitionPathTranslator translator = (BootstrapPartitionPathTranslator) ReflectionUtils.loadClass(
      config.getBootstrapPartitionPathTranslatorClass(), properties);

  // Each element: (source partition path, (translated partition path, source file))
  List<Pair<String, Pair<String, HoodieFileStatus>>> bootstrapPaths = partitions.stream()
      .flatMap(p -> {
        String translatedPartitionPath = translator.getBootstrapTranslatedPath(p.getKey());
        return p.getValue().stream().map(f -> Pair.of(p.getKey(), Pair.of(translatedPartitionPath, f)));
      })
      .collect(Collectors.toList());

  context.setJobStatus(this.getClass().getSimpleName(), "Bootstrap metadata table: " + config.getTableName());
  return context.parallelize(bootstrapPaths, config.getBootstrapParallelism())
      .map(partitionFsPair -> getMetadataHandler(config, table, partitionFsPair.getRight().getRight())
          .runMetadataBootstrap(partitionFsPair.getLeft(), partitionFsPair.getRight().getLeft(), keyGenerator));
}

ParquetBootstrapMetadataHandler.runMetadataBootstrap

ParquetBootstrapMetadataHandler的runMetadataBootstrap是在其父类BaseBootstrapMetadataHandler中实现的,这里的核心逻辑在executeBootstrap

publicBootstrapWriteStatusrunMetadataBootstrap(String srcPartitionPath,String partitionPath,KeyGeneratorInterface keyGenerator){Path sourceFilePath =FileStatusUtils.toPath(srcFileStatus.getPath());HoodieBootstrapHandle<?,?,?,?> bootstrapHandle =newHoodieBootstrapHandle(config,HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
        table, partitionPath,FSUtils.createNewFileIdPfx(), table.getTaskContextSupplier());try{Schema avroSchema =getAvroSchema(sourceFilePath);List<String> recordKeyColumns = keyGenerator.getRecordKeyFieldNames().stream().map(HoodieAvroUtils::getRootLevelFieldName).collect(Collectors.toList());Schema recordKeySchema =HoodieAvroUtils.generateProjectionSchema(avroSchema, recordKeyColumns);
      LOG.info("Schema to be used for reading record Keys :"+ recordKeySchema);AvroReadSupport.setAvroReadSchema(table.getHadoopConf(), recordKeySchema);AvroReadSupport.setRequestedProjection(table.getHadoopConf(), recordKeySchema);executeBootstrap(bootstrapHandle, sourceFilePath, keyGenerator, partitionPath, avroSchema);}catch(Exception e){thrownewHoodieException(e.getMessage(), e);}BootstrapWriteStatus writeStatus =(BootstrapWriteStatus) bootstrapHandle.writeStatuses().get(0);BootstrapFileMapping bootstrapFileMapping =newBootstrapFileMapping(
        config.getBootstrapSourceBasePath(), srcPartitionPath, partitionPath,
        srcFileStatus, writeStatus.getFileId());
    writeStatus.setBootstrapSourceFileMapping(bootstrapFileMapping);return writeStatus;}

executeBootstrap

executeBootstrap在ParquetBootstrapMetadataHandler,首先创建一个ParquetReader,然后将reader封装成ParquetReaderIterator,作为BoundedInMemoryExecutor的参数构造wrapper,然后执行wrapper.execute()

voidexecuteBootstrap(HoodieBootstrapHandle<?,?,?,?> bootstrapHandle,Path sourceFilePath,KeyGeneratorInterface keyGenerator,String partitionPath,Schema avroSchema)throwsException{BoundedInMemoryExecutor<GenericRecord,HoodieRecord,Void> wrapper =null;ParquetReader<IndexedRecord> reader =AvroParquetReader.<IndexedRecord>builder(sourceFilePath).withConf(table.getHadoopConf()).build();try{
      wrapper =newBoundedInMemoryExecutor<GenericRecord,HoodieRecord,Void>(config.getWriteBufferLimitBytes(),newParquetReaderIterator(reader),newBootstrapRecordConsumer(bootstrapHandle), inp ->{String recKey = keyGenerator.getKey(inp).getRecordKey();GenericRecord gr =newGenericData.Record(HoodieAvroUtils.RECORD_KEY_SCHEMA);
        gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey);BootstrapRecordPayload payload =newBootstrapRecordPayload(gr);HoodieRecord rec =newHoodieAvroRecord(newHoodieKey(recKey, partitionPath), payload);return rec;}, table.getPreExecuteRunnable());
      wrapper.execute();}catch(Exception e){thrownewHoodieException(e);}finally{
      reader.close();if(null!= wrapper){
        wrapper.shutdownNow();
        wrapper.awaitTermination();}
      bootstrapHandle.close();}}

wrapper.execute()

这里是一个生产者-消费者模型,可以参考生产者-消费者模型在Hudi中的应用

publicEexecute(){try{startProducers();Future<E> future =startConsumer();// Wait for consumer to be donereturn future.get();}catch(InterruptedException ie){shutdownNow();Thread.currentThread().interrupt();thrownewHoodieException(ie);}catch(Exception e){thrownewHoodieException(e);}}

startProducers

我们主要看一下producer.produce,这里的producer为Collections.singletonList(new IteratorBasedQueueProducer<>(new ParquetReaderIterator(reader))),所以这里的produce方法为IteratorBasedQueueProducer.produce

publicExecutorCompletionService<Boolean>startProducers(){// Latch to control when and which producer thread will close the queuefinalCountDownLatch latch =newCountDownLatch(producers.size());finalExecutorCompletionService<Boolean> completionService =newExecutorCompletionService<Boolean>(producerExecutorService);
    producers.stream().map(producer ->{return completionService.submit(()->{try{
          preExecuteRunnable.run();
          producer.produce(queue);}catch(Throwable e){
          LOG.error("error producing records", e);
          queue.markAsFailed(e);throw e;}finally{synchronized(latch){
            latch.countDown();if(latch.getCount()==0){// Mark production as done so that consumer will be able to exit
              queue.close();}}}returntrue;});}).collect(Collectors.toList());return completionService;}

IteratorBasedQueueProducer.produce

其中queue.insertRecord的逻辑是先执行transformFunction.apply返回payload,然后将payload放到队列里,关于transformFunction我们放到后面分析。先看inputIterator.hasNext()和inputIterator.next(),这里的inputIterator为ParquetReaderIterator

publicvoidproduce(BoundedInMemoryQueue<I,?> queue)throwsException{
    LOG.info("starting to buffer records");while(inputIterator.hasNext()){
      queue.insertRecord(inputIterator.next());}
    LOG.info("finished buffering records");}// BoundedInMemoryQueue.insertRecordpublicvoidinsertRecord(I t)throwsException{// If already closed, throw exceptionif(isWriteDone.get()){thrownewIllegalStateException("Queue closed for enqueueing new entries");}// We need to stop queueing if queue-reader has failed and exited.throwExceptionIfFailed();

    rateLimiter.acquire();// We are retrieving insert value in the record queueing thread to offload computation// around schema validation// and record creation to it.finalO payload = transformFunction.apply(t);adjustBufferSizeIfNeeded(payload);
    queue.put(Option.of(payload));}

ParquetReaderIterator.next

可以看到hasNext的逻辑是:如果缓存的next为null,就先调用parquetReader.read()读取一条,然后判断next是否为null。next则返回已缓存的记录,并再次调用parquetReader.read()预读下一条。parquetReader.read()在类ParquetReader中实现,这已经是parquet的源码了,我们就不往下分析了。总之,生产者的逻辑是读取parquet的内容放到队列里供消费者使用,接下来看一下消费者

publicbooleanhasNext(){try{// To handle when hasNext() is called multiple times for idempotency and/or the first timeif(this.next ==null){this.next = parquetReader.read();}returnthis.next !=null;}catch(Exception e){FileIOUtils.closeQuietly(parquetReader);thrownewHoodieException("unable to read next record from parquet file ", e);}}// 这里的T为IndexedRecord@OverridepublicTnext(){try{// To handle case when next() is called before hasNext()if(this.next ==null){if(!hasNext()){thrownewHoodieException("No more records left to read from parquet file");}}T retVal =this.next;this.next = parquetReader.read();return retVal;}catch(Exception e){FileIOUtils.closeQuietly(parquetReader);thrownewHoodieException("unable to read next record from parquet file ", e);}}

startConsumer

这里主要是consumer.consume,这里的consumer为new BootstrapRecordConsumer(bootstrapHandle)

/**
 * Submits the consumer (if any) to the consumer executor. A consumer failure marks the queue as
 * failed and is rethrown.
 *
 * @return the consumer's future, or an already-completed future holding null when there is no consumer
 */
private Future<E> startConsumer() {
  return consumer.map(consumer -> {
    return consumerExecutorService.submit(() -> {
      LOG.info("starting consumer thread");
      preExecuteRunnable.run();
      try {
        E result = consumer.consume(queue);
        LOG.info("Queue Consumption is done; notifying producer threads");
        return result;
      } catch (Exception e) {
        LOG.error("error consuming records", e);
        queue.markAsFailed(e);
        throw e;
      }
    });
  }).orElse(CompletableFuture.completedFuture(null));
}

BootstrapRecordConsumer.consume

它的consume方法是在父类BoundedInMemoryQueueConsumer中实现的:首先通过queue.iterator().next()从队列queue里获取数据,再调用consumeOneRecord。

publicOconsume(BoundedInMemoryQueue<?,I> queue)throwsException{Iterator<I> iterator = queue.iterator();while(iterator.hasNext()){consumeOneRecord(iterator.next());}// Notifies donefinish();returngetResult();}

BootstrapRecordConsumer.consumeOneRecord

这里的record和Payload是啥呢?这就要看我们上面提到的transformFunction了,它是在上面的方法executeBootstrap中定义的

protectedvoidconsumeOneRecord(HoodieRecordrecord){try{
      bootstrapHandle.write(record,((HoodieRecordPayload)record.getData()).getInsertValue(bootstrapHandle.getWriterSchemaWithMetaFields()));}catch(IOException e){thrownewHoodieIOException(e.getMessage(), e);}}
publicOption<IndexedRecord>getInsertValue(Schema schema){returnOption.ofNullable(record);}

transformFunction

这里的inp应该为queue.iterator().next()返回的IndexedRecord,首先从IndexedRecord中获取recKey,然后将recKey作为hudi的主键元数据字段放到GenericRecord中,然后将gr作为BootstrapRecordPayload的record构造BootstrapRecordPayload,最后返回new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload),所以上面的record即为下面的rec,而上面的HoodieRecordPayload是通过record.getData()获取的,也就是HoodieAvroRecord中的data,即为下面的BootstrapRecordPayload

// transformFunction passed to BoundedInMemoryExecutor in executeBootstrap.
// It builds a record that holds only the record key (in the record-key meta field), keyed by
// (recKey, partitionPath).
inp -> {
  String recKey = keyGenerator.getKey(inp).getRecordKey();
  GenericRecord gr = new GenericData.Record(HoodieAvroUtils.RECORD_KEY_SCHEMA);
  gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey);
  BootstrapRecordPayload payload = new BootstrapRecordPayload(gr);
  HoodieRecord rec = new HoodieAvroRecord(new HoodieKey(recKey, partitionPath), payload);
  return rec;
}

BootstrapRecordPayload.getInsertValue

这里的逻辑也很简单, 直接返回record,在上面的transformFunction中可知,record为GenericRecord,只有一个字段RECORD_KEY_METADATA_FIELD,这样我们知道了getInsertValue返回值,就可以继续看一下后面的write方法了

/**
 * BootstrapRecordPayload.getInsertValue: ignores {@code schema} and returns the wrapped record
 * (empty when it is null).
 */
public Option<IndexedRecord> getInsertValue(Schema schema) {
  return Option.ofNullable(record);
}

HoodieBootstrapHandle.write

HoodieBootstrapHandle的write方法是在父类HoodieCreateHandle中实现的,这里的核心逻辑是通过fileWriter将avroRecord写到对应的parquet文件中,具体的实现也在parquet源码中。我们知道这里的Record只包含主键信息和分区信息,这就是为什么METADATA_ONLY模式只会生成包含主键和页脚的基本骨架文件,而不会重写全部数据。关于METADATA_ONLY我们就分析到这里,接着看一下FULL_RECORD

publicvoidwrite(HoodieRecordrecord,Option<IndexedRecord> avroRecord){Option recordMetadata =((HoodieRecordPayload)record.getData()).getMetadata();if(HoodieOperation.isDelete(record.getOperation())){
      avroRecord =Option.empty();}try{if(avroRecord.isPresent()){if(avroRecord.get().equals(IGNORE_RECORD)){return;}// Convert GenericRecord to GenericRecord with hoodie commit metadata in schemaif(preserveMetadata){
          fileWriter.writeAvro(record.getRecordKey(),rewriteRecordWithMetadata((GenericRecord) avroRecord.get(), path.getName()));}else{
          fileWriter.writeAvroWithMetadata(record.getKey(),rewriteRecord((GenericRecord) avroRecord.get()));}// update the new location of record, so we know where to find it nextrecord.unseal();record.setNewLocation(newHoodieRecordLocation(instantTime, writeStatus.getFileId()));record.seal();
        recordsWritten++;
        insertRecordsWritten++;}else{
        recordsDeleted++;}
      writeStatus.markSuccess(record, recordMetadata);// deflate record payload after recording success. This will help users access payload as a// part of marking// record successful.record.deflate();}catch(Throwable t){// Not throwing exception from here, since we don't want to fail the entire job// for a single record
      writeStatus.markFailure(record, t, recordMetadata);
      LOG.error("Error writing record "+record, t);}}

SparkBootstrapCommitActionExecutor.fullBootstrap

首先通过反射构造inputProvider,它是通过FULL_BOOTSTRAP_INPUT_PROVIDER_CLASS_NAME(hoodie.bootstrap.full.input.provider)配置的,默认值为SparkParquetBootstrapDataProvider,然后由inputProvider.generateInputRecords读取源表的parquet文件返回inputRecordsRDD,这里返回的是全部的内容,最后将inputRecordsRDD作为参数,由getBulkInsertActionExecutor(HoodieJavaRDD.of(inputRecordsRDD)).execute(),最后由SparkBulkInsertHelper.bulkInsert按照普通的写Hudi的逻辑写到目标表中,也就是以Hudi表的形式执行数据的完整复制/重写。关于bulkInsert的源码逻辑,由于比较多,限于篇幅和精力以及能力的原因,本文就不深入介绍了,有机会的话我会再单独分享一篇的。

protectedOption<HoodieWriteMetadata<HoodieData<WriteStatus>>>fullBootstrap(List<Pair<String,List<HoodieFileStatus>>> partitionFilesList){if(null== partitionFilesList || partitionFilesList.isEmpty()){returnOption.empty();}TypedProperties properties =newTypedProperties();
    properties.putAll(config.getProps());FullRecordBootstrapDataProvider inputProvider =(FullRecordBootstrapDataProvider)ReflectionUtils.loadClass(config.getFullBootstrapInputProvider(),
            properties, context);JavaRDD<HoodieRecord> inputRecordsRDD =(JavaRDD<HoodieRecord>) inputProvider.generateInputRecords("bootstrap_source", config.getBootstrapSourceBasePath(),
            partitionFilesList);// Start Full BootstrapfinalHoodieInstant requested =newHoodieInstant(State.REQUESTED, table.getMetaClient().getCommitActionType(),HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS);
    table.getActiveTimeline().createNewInstant(requested);// Setup correct schema and run bulk insert.returnOption.of(getBulkInsertActionExecutor(HoodieJavaRDD.of(inputRecordsRDD)).execute());}

generateInputRecords方法在父类SparkFullBootstrapDataProviderBase中实现:

@OverridepublicJavaRDD<HoodieRecord>generateInputRecords(String tableName,String sourceBasePath,List<Pair<String,List<HoodieFileStatus>>> partitionPathsWithFiles){String[] filePaths = partitionPathsWithFiles.stream().map(Pair::getValue).flatMap(f -> f.stream().map(fs ->FileStatusUtils.toPath(fs.getPath()).toString())).toArray(String[]::new);Dataset inputDataset = sparkSession.read().format(getFormat()).load(filePaths);try{KeyGenerator keyGenerator =HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);String structName = tableName +"_record";String namespace ="hoodie."+ tableName;
      RDD<GenericRecord> genericRecords =HoodieSparkUtils.createRdd(inputDataset, structName, namespace,false,Option.empty());return genericRecords.toJavaRDD().map(gr ->{String orderingVal =HoodieAvroUtils.getNestedFieldValAsString(
            gr, props.getString("hoodie.datasource.write.precombine.field"),false, props.getBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue())));try{returnDataSourceUtils.createHoodieRecord(gr, orderingVal, keyGenerator.getKey(gr),
              props.getString("hoodie.datasource.write.payload.class"));}catch(IOException ioe){thrownewHoodieIOException(ioe.getMessage(), ioe);}});}catch(IOException ioe){thrownewHoodieIOException(ioe.getMessage(), ioe);}}

getBulkInsertActionExecutor

返回SparkBulkInsertCommitActionExecutor

/**
 * Builds the bulk-insert executor used by fullBootstrap. It runs at FULL_BOOTSTRAP_INSTANT_TS with
 * a write config whose schema is the resolved bootstrap schema.
 */
protected BaseSparkCommitActionExecutor<T> getBulkInsertActionExecutor(HoodieData<HoodieRecord> inputRecordsRDD) {
  return new SparkBulkInsertCommitActionExecutor((HoodieSparkEngineContext) context,
      new HoodieWriteConfig.Builder().withProps(config.getProps()).withSchema(bootstrapSchema).build(),
      table, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS, inputRecordsRDD, Option.empty(), extraMetadata);
}

SparkBulkInsertCommitActionExecutor.execute

执行SparkBulkInsertHelper.bulkInsert

/**
 * SparkBulkInsertCommitActionExecutor.execute: delegates to SparkBulkInsertHelper.bulkInsert
 * with performDedupe = true.
 *
 * @throws HoodieInsertException as-is, or wrapping any other failure
 */
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
  try {
    return SparkBulkInsertHelper.newInstance().bulkInsert(inputRecordsRDD, instantTime, table, config,
        this, true, bulkInsertPartitioner);
  } catch (HoodieInsertException ie) {
    throw ie;
  } catch (Throwable e) {
    throw new HoodieInsertException("Failed to bulk insert for commit time " + instantTime, e);
  }
}

SparkBulkInsertHelper.bulkInsert

最后由SparkBulkInsertHelper.bulkInsert按照普通的写Hudi的逻辑写到目标表中

publicHoodieWriteMetadata<HoodieData<WriteStatus>>bulkInsert(finalHoodieData<HoodieRecord<T>> inputRecords,finalString instantTime,finalHoodieTable<T,HoodieData<HoodieRecord<T>>,HoodieData<HoodieKey>,HoodieData<WriteStatus>> table,finalHoodieWriteConfig config,finalBaseCommitActionExecutor<T,HoodieData<HoodieRecord<T>>,HoodieData<HoodieKey>,HoodieData<WriteStatus>,R> executor,finalboolean performDedupe,finalOption<BulkInsertPartitioner> userDefinedBulkInsertPartitioner){HoodieWriteMetadata result =newHoodieWriteMetadata();//transition bulk_insert state to inflight
    table.getActiveTimeline().transitionRequestedToInflight(newHoodieInstant(HoodieInstant.State.REQUESTED,
            executor.getCommitActionType(), instantTime),Option.empty(),
        config.shouldAllowMultiWriteOnSameInstant());BulkInsertPartitioner partitioner = userDefinedBulkInsertPartitioner.orElse(BulkInsertInternalPartitionerFactory.get(config.getBulkInsertSortMode()));// write new filesHoodieData<WriteStatus> writeStatuses =bulkInsert(inputRecords, instantTime, table, config, performDedupe, partitioner,false,
        config.getBulkInsertShuffleParallelism(),newCreateHandleFactory(false));//update index((BaseSparkCommitActionExecutor) executor).updateIndexAndCommitIfNeeded(writeStatuses, result);return result;}

总结

本文简单的对Hudi bootstrap的一些关键的源码逻辑进行了分析,希望能对大家有所帮助。限于精力及能力的原因,有些地方可能不够深入,或者不对的地方,还请大家多多指正,让我们共同进步。


本文转载自: https://blog.csdn.net/dkl12/article/details/127383512
版权归原作者 董可伦 所有, 如有侵权,请联系我们删除。

“Hudi源码|bootstrap源码分析总结(写Hudi)”的评论:

还没有评论