0


Spark中读parquet文件是怎么实现的

背景

最近在整理了一下 spark对Parquet的写文件的过程,也是为了更好的理解和调优Spark相关的任务,
因为对于Spark来说,任何一个事情都不是独立的存在的,比如说parquet文件的rowgroup设置的大小对读写的影响,以及parquet写之前排序对读parquet的影响,以及向量化读取等等
本文基于Spark 3.5

分析

我们以

FileSourceScanExec的doExecute方法

为切口进行分析:

  protected override def doExecute(): RDD[InternalRow] = {
    val numOutputRows = longMetric("numOutputRows")
    if (needsUnsafeRowConversion) {
      inputRDD.mapPartitionsWithIndexInternal { (index, iter) =>
        val toUnsafe = UnsafeProjection.create(schema)
        toUnsafe.initialize(index)
        iter.map { row =>
          numOutputRows += 1
          toUnsafe(row)
        }
      }
    } else {
      inputRDD.mapPartitionsInternal { iter =>
        iter.map { row =>
          numOutputRows += 1
          row
        }
      }
    }
  }

这里的

needsUnsafeRowConversion

判断如果是

ParquetSource

,且配置了

spark.sql.parquet.enableVectorizedReader

为‘true’(默认就是true),则会进行unsafeRow的转换,当然这里的好处就是节约内存以及能够减少GC
对于

inputRDD

来说,就是创建了读取

parquet

的RDD:
具体的见:

ParquetFileFormat.buildReaderWithPartitionValues

方法,涉及到的代码多,所以只解读关键的几个部分:

  • fileRooter的读取 val fileFooter = if (enableVectorizedReader) { // When there are vectorized reads, we can avoid reading the footer twice by reading // all row groups in advance and filter row groups according to filters that require // push down (no need to read the footer metadata again). ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.WITH_ROW_GROUPS) } else { ParquetFooterReader.readFooter(sharedConf, file, ParquetFooterReader.SKIP_ROW_GROUPS) }

这里

enableVectorizedReader

如果是true的话, fileFooter 只会得到所属Task的FileMetaData信息,其中只包括了所属Task的需要读取的parquet RowGroups,具体的数据流如下:

ParquetFooterReader.readFooter
   ||
   \/
readFooter
   ||
   \/
fileReader.getFooter
   ||
   \/
readFooter(file, options, f, converter)
   ||
   \/
converter.readParquetMetadata
   ||
   \/
filter.accept(new MetadataFilterVisitor)
filter.accept(new MetadataFilterVisitor

就会根据对应的

filter

类型进行不同的操作:

FileMetaDataAndRowGroupOffsetInfo fileMetaDataAndRowGroupInfo = filter.accept(new MetadataFilterVisitor<FileMetaDataAndRowGroupOffsetInfo, IOException>() {
      @Override
      public FileMetaDataAndRowGroupOffsetInfo visit(NoFilter filter) throws IOException {
        FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
        return new FileMetaDataAndRowGroupOffsetInfo(fileMetadata, generateRowGroupOffsets(fileMetadata));
      }

      @Override
      public FileMetaDataAndRowGroupOffsetInfo visit(SkipMetadataFilter filter) throws IOException {
        FileMetaData fileMetadata = readFileMetaData(from, true, footerDecryptor, encryptedFooterAAD);
        return new FileMetaDataAndRowGroupOffsetInfo(fileMetadata, generateRowGroupOffsets(fileMetadata));
      }

      @Override
      public FileMetaDataAndRowGroupOffsetInfo visit(OffsetMetadataFilter filter) throws IOException {
        FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
        // We must generate the map *before* filtering because it modifies `fileMetadata`.
        Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata);
        FileMetaData filteredFileMetadata = filterFileMetaDataByStart(fileMetadata, filter);
        return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, rowGroupToRowIndexOffsetMap);
      }

      @Override
      public FileMetaDataAndRowGroupOffsetInfo visit(RangeMetadataFilter filter) throws IOException {
        FileMetaData fileMetadata = readFileMetaData(from, footerDecryptor, encryptedFooterAAD);
        // We must generate the map *before* filtering because it modifies `fileMetadata`.
        Map<RowGroup, Long> rowGroupToRowIndexOffsetMap = generateRowGroupOffsets(fileMetadata);
        FileMetaData filteredFileMetadata = filterFileMetaDataByMidpoint(fileMetadata, filter);
        return new FileMetaDataAndRowGroupOffsetInfo(filteredFileMetadata, rowGroupToRowIndexOffsetMap);
      }
    });
  • 如果是 ParquetFooterReader.SKIP_ROW_GROUPS ,则是走的SkipMetadataFilter这条filter,则只会拿出rowgroup的信息和rowgrups的的行数
  • 如果是 enableVectorizedReader,也就是会走 RangeMetadataFilter这个Filter,则会调用filterFileMetaDataByMidpoint,该方法会根据Task分配的数据是否覆盖了Rowgroups的中点来纳入到该task的读取的数据中来,具体的可以见:Spark-读取Parquet-为什么task数量会多于Row Group的数量
  • vectorizedReader的创建`````` vectorizedReader.initialize(split, hadoopAttemptContext, Option.apply(fileFooter)) logDebug(s"Appending $partitionSchema ${file.partitionValues}") vectorizedReader.initBatch(partitionSchema, file.partitionValues) if (returningBatch) { vectorizedReader.enableReturningBatches() } // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy. iter.asInstanceOf[Iterator[InternalRow]]- vectorizedReader.initialize 重要的点是这个主要是涉及到 parquet messageType到 ParquetColumn的转换,主要是ParquetToSparkSchemaConverter converter = new ParquetToSparkSchemaConverter(configuration)这个的配置- vectorizedReader.initBatch 这里面主要涉及到了根据memModeOFF_HEAP还是ON_HEAP模式来构造不同的ColumnVector,其中 如果是ON_HEAP,则会创建OnHeapColumnVector,用jvm数据的形式存储 如果是OFF_HEAP,则会创建OffHeapColumnVector,这里涉及到的对象都是都是用unsafe api来操作,这里涉及到一个有意思的点: Platform.putByte(null, data + rowId, value); Platform.putInt(null, data + 4L * rowId, value)也就是说 无论是put什么 里面的第一个参数是为null,这个其实在Unsafed方法 putInt(Object o, long offset, int x)类中有提到 Fetches a value from a given Java variable. More specifically, fetches a field or array element within the given object o at the given offset, or (if o is null) from the memory address whose numerical value is the given offset.也就是说如果传入的第一个参数为null,则会以offset作为地址,而在OffHeapColumnVector中对应的put当法中涉及到的offset就是data这个变量会在OffHeapColumnVector构造函数中的reserveInternal方法中赋值,这其中涉及到unsafe.allocateMemory方法会返回分配的内存地址- 具体迭代获取InternalRow 这里的迭代获取主要是通过 vectorizedReader.getCurrentValue方法实现的,也就是会返回columnarBatch,但是这里的columnarBatch赋值是通过vectorizedReader.nextKeyValue方法实现的,该方法会被RecordReaderIterator.hasNext调用,vectorizedReader.nextKeyValue的数据流如下: VectorizedParquetRecordReader.nextBatch || \/ checkEndOfRowGroup => 初始化 PageReadStore pages = reader.readNextRowGroup(); || || \/ \/ columnReader.readBatch(num, leafCv.getValueVector() initColumnReader(pages, cv); // columnVectors 设置ParquetColumnVector 里面包括了rowgroup里的所有page || \/ readPage || \/ pageReader.readPage() || \/ decompressor.decompress // 之类会进行解压 ``````decompressor.decompressdecompressorChunk.readAllPagesdescriptor.metadata.getCodec()传进来的,也就是从元数据里面读取的 具体的向量化的读取,细节比较多,包括批量读取definition levelsrepetition levels等,这些读者自行分析注意:为什么 FileSourceScanExec中inputRDDs返回的类型是RDD[InternalRow] ,而vectorizedReader.getCurrentValue返回的类型是ColumnarBatch 也能运行,那是因为 我们在运行的时候,会有ColumnarToRow,他最终调用的是FileSourceScanExec.doExecuteColumnar,如下图:在这里插入图片描述

jvm

会对

Iterator[InternalRow]

进行类型擦除,也就是说所有Iterator[InternalRow]在编译的时候会编译成Iterator[Object],会在运行时获取真正的类型

`FileScanRDD` 中的`compute方法` 最后有个
iterator.asInstanceOf[Iterator[InternalRow]] // This is an erasure hack.

标签: spark 大数据

本文转载自: https://blog.csdn.net/monkeyboy_tech/article/details/136461718
版权归原作者 鸿乃江边鸟 所有, 如有侵权,请联系我们删除。

“Spark中读parquet文件是怎么实现的”的评论:

还没有评论