0


Spark SQL----Parquet文件

Spark SQL----Parquet文件

Parquet是一种列式(columnar)格式,许多其他数据处理系统都支持它。Spark SQL支持读取和写入Parquet文件,该文件自动保留原始数据的模式。读取Parquet文件时,出于兼容性原因,所有列都会自动转换为可为null。

一、以编程方式加载数据

使用这个例子的数据:

peopleDF = spark.read.json("examples/src/main/resources/people.json")# DataFrames can be saved as Parquet files, maintaining the schema information.
peopleDF.write.parquet("people.parquet")# Read in the Parquet file created above.# Parquet files are self-describing so the schema is preserved.# The result of loading a parquet file is also a DataFrame.
parquetFile = spark.read.parquet("people.parquet")# Parquet files can also be used to create a temporary view and then used in SQL statements.
parquetFile.createOrReplaceTempView("parquetFile")
teenagers = spark.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
teenagers.show()# +------+# |  name|# +------+# |Justin|# +------+

在Spark repo的“examples/src/main/python/sql/datasource.py”中找到完整的示例代码。

二、分区发现

表分区是像Hive这样的系统中常用的优化方法。在分区表中,数据通常存储在不同的目录中,分区列值编码在每个分区目录的路径中。所有内置的文件源(包括Text/CSV/JSON/ORC/Parquet)都能够自动发现和推断分区信息。例如,我们可以使用以下目录结构将以前使用的所有人口数据存储到一个分区表中,其中有两个额外的列,即gender和country作为分区列:

path
└── to
    └── table
        ├── gender=male
        │   ├── ...
        │   │
        │   ├── country=US
        │   │   └── data.parquet
        │   ├── country=CN
        │   │   └── data.parquet
        │   └── ...
        └── gender=female
            ├── ...
            │
            ├── country=US
            │   └── data.parquet
            ├── country=CN
            │   └── data.parquet
            └── ...

通过将path/to/table传递给SparkSession.read.parquet或SparkSession.read.load,Spark SQL将自动从路径中提取分区信息。现在,返回的DataFrame的schema变为:

root
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- gender: string (nullable = true)
|-- country: string (nullable = true)

请注意,分区列的数据类型是自动推断出来的。目前支持数字数据类型、日期、时间戳和字符串类型。有时,用户可能不想自动推断分区列的数据类型。对于这些用例,可以通过spark.sql.sources.partitionColumnTypeInference.enabled配置自动类型推断,默认值为true。当类型推断被禁用时,字符串类型将用于分区列。
从Spark 1.6.0开始,分区发现默认只查找给定路径下的分区。对于上面的示例,如果用户将path/to/table/gender=male传递给SparkSession.read.parquet 或 SparkSession.read.load ,则gender将不会被视为分区列。如果用户需要指定分区发现应该开始的基本路径,他们可以在数据源选项中设置basePath。例如,当path/to/table/gender=male是数据的路径,并且用户将basePath设置为path/to/table/时,gender将是一个分区列。

三、Schema合并

与Protocol Buffer、Avro和Thrift一样,Parquet也支持schema进化(evolution)。用户可以从一个简单的schema开始,然后根据需要逐渐向该schema添加更多列。这样,用户最终可能会得到多个具有不同但相互兼容模式的Parquet文件。Parquet数据源现在能够自动检测这种情况并合并所有这些文件的schema。
由于schema合并是一项相对昂贵的操作,而且在大多数情况下不是必要的,因此我们从1.5.0开始默认关闭它。你可以通过以下方式启用它

  1. 读取Parquet文件时,将数据源选项mergeSchema设置为true(如以下示例所示),或者
  2. 将全局SQL选项spark.sql.parquet.mergeSchema设置为true。
from pyspark.sql import Row

# spark is from the previous example.# Create a simple DataFrame, stored into a partition directory
sc = spark.sparkContext

squaresDF = spark.createDataFrame(sc.parallelize(range(1,6)).map(lambda i: Row(single=i, double=i **2)))
squaresDF.write.parquet("data/test_table/key=1")# Create another DataFrame in a new partition directory,# adding a new column and dropping an existing column
cubesDF = spark.createDataFrame(sc.parallelize(range(6,11)).map(lambda i: Row(single=i, triple=i **3)))
cubesDF.write.parquet("data/test_table/key=2")# Read the partitioned table
mergedDF = spark.read.option("mergeSchema","true").parquet("data/test_table")
mergedDF.printSchema()# The final schema consists of all 3 columns in the Parquet files together# with the partitioning column appeared in the partition directory paths.# root#  |-- double: long (nullable = true)#  |-- single: long (nullable = true)#  |-- triple: long (nullable = true)#  |-- key: integer (nullable = true)

四、Hive metastore Parquet表 转换

当从Hive metastore Parquet表读取和写入非分区Hive metastore Parquet表时,Spark SQL将尝试使用自己的Parquet支持,而不是Hive SerDe,以获得更好的性能。此行为由spark.sql.hive.convertMetastoreParquet配置控制,并且在默认情况下处于启用状态。

4.1 Hive/Parquet Schema Reconciliation

从表schema处理的角度来看,Hive和Parquet之间有两个关键区别。

  1. Hive是不区分大小写的,而Parquet不是
  2. Hive认为所有列都可以为Null,而Parquet中的可为空非常显著(significant)

由于这个原因,在将Hive metastore Parquet表转换为Spark SQL Parquet表时,我们必须reconcile Hive metastore schema和Parquet schema。reconciliation规则为:

  1. 无论是否可以为null,两个schema中具有相同名称的字段都必须具有相同的数据类型。reconciled字段应具有Parquet侧的数据类型,以便尊重(respect)可空性。
  2. reconciled schema正好包含Hive metastore schema中定义的字段。 - 仅出现在Parquet schema中的任何字段都会在reconciled schema中被丢弃。- 仅出现在Hive metastore schema中的任何字段都将作为可为null的字段添加到reconciled schema中。

4.2 元数据刷新

Spark SQL缓存Parquet元数据以获得更好的性能。当Hive metastore Parquet表转换启用时,这些转换后的表的metadata也会被缓存。如果这些表是由Hive或其他外部工具更新的,则需要手动刷新它们以确保元数据一致。

# spark is an existing SparkSession
spark.catalog.refreshTable("my_table")

五、Columnar加密

自Spark 3.2以来,Apache Parquet 1.12+支持Parquet表的列式加密。
Parquet使用信封加密实践,其中文件部分用“data encryption keys”(DEK)加密,DEK用“master encryption keys”(MEKs)加密。Parquet为每个加密的文件/列随机生成DEK。MEK是在用户选择的密钥管理服务(KMS)中生成、存储和管理的。Parquet Maven存储库有一个带有模拟KMS实现的jar,该实现只允许使用spark shell运行列加密和解密,而无需部署KMS服务器(下载parquet-hadoop-tests.jar文件并将其放在Spark jar文件夹中):

# Set hadoop configuration properties, e.g. using configuration properties of# the Spark job:# --conf spark.hadoop.parquet.encryption.kms.client.class=\#           "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS"\# --conf spark.hadoop.parquet.encryption.key.list=\#           "keyA:AAECAwQFBgcICQoLDA0ODw== ,  keyB:AAECAAECAAECAAECAAECAA=="\# --conf spark.hadoop.parquet.crypto.factory.class=\#           "org.apache.parquet.crypto.keytools.PropertiesDrivenCryptoFactory"# Write encrypted dataframe files.# Column "square" will be protected with master key "keyA".# Parquet file footers will be protected with master key "keyB"
squaresDF.write\
   .option("parquet.encryption.column.keys","keyA:square")\
   .option("parquet.encryption.footer.key","keyB")\
   .parquet("/path/to/table.parquet.encrypted")# Read encrypted dataframe files
df2 = spark.read.parquet("/path/to/table.parquet.encrypted")

5.1 KMS 客户端

InMemoryKMS类仅用于Parquet加密功能的说明和简单演示。不应在实际部署中使用它。主加密密钥必须在用户组织中部署的生产级KMS系统中保存和管理。推出带有Parquet加密的Spark需要为KMS服务器实现一个客户端类。Parquet提供了用于开发此类类的插件接口,

publicinterfaceKmsClient{// Wraps a key - encrypts it with the master key.publicStringwrapKey(byte[] keyBytes,String masterKeyIdentifier);// Decrypts (unwraps) a key with the master key.publicbyte[]unwrapKey(String wrappedKey,String masterKeyIdentifier);// Use of initialization parameters is optional.publicvoidinitialize(Configuration configuration,String kmsInstanceID,String kmsInstanceURL,String accessToken);}

开源KMS的此类示例可以在parquet-mr存储库中找到。生产KMS客户端应与组织的安全管理员合作设计,并由具有访问控制管理经验的开发人员构建。一旦创建了这样的类,它就可以通过parquet.encryption.kms.client.class参数传递给应用程序,并由普通Spark用户使用,如上面的加密dataframe 写入/读取示例所示。
注意:默认情况下,Parquet实现了“双信封加密”模式,最大限度地减少了Spark执行器与KMS服务器的交互。在这种模式下,DEK使用“key encryption keys”(KEK,由Parquet随机生成)进行加密。KEK使用KMS中的MEK进行加密;结果和KEK本身被缓存在Spark执行器内存中。对常规信封加密感兴趣的用户可以通过将parquet.encryption.double.wrapping参数设置为false来切换到常规信封加密。有关Parquet加密参数的更多详细信息,请访问parquet-hadoop配置页面。

六、数据源选项

Parquet的数据源选项可以通过以下方式设置:

  • 以下类的option/.options方法 - DataFrameReader- DataFrameWriter- DataStreamReader- DataStreamWriter
  • CREATE TABLE USING DATA_SOURCE处的OPTIONS子句
    Property NameDefaultMeaningScopedatetimeRebaseMode(spark.sql.parquet.datetimeRebaseModeInRead配置的值)datetimeRebaseMode选项允许为DATE、TIMESTAMP_MILLIS、TIMESTAMP_MICROS逻辑类型的值指定从Julian 到Proleptic Gregorian日历的rebase模式。当前支持的模式有: ● EXCEPTION:未能读取两个日历之间不明确的ancient dates/timestamps。 ● CORRECTED: 加载日期/时间戳而不rebase。 ● LEGACY: 执行从Julian 到Proleptic Gregorian公历的ancient dates/timestamps的rebase。readint96RebaseMode(spark.sql.parquet.int96RebaseModeInRead配置的值)int96RebaseMode选项允许指定从Julian到Proleptic Gregorian日历的INT96时间戳的rebase模式。目前支持的模式有: ● EXCEPTION: 在读取两个日历之间不明确的ancient INT96 timestamps时失败。 ● CORRECTED: 加载INT96 时间戳而不rebase。● LEGACY: 执行从Julian 到Proleptic Gregorian日历的ancient时间戳的rebase。readmergeSchema(spark.sql.parquet.mergeSchema配置的值)设置是否应该合并从所有Parquet部分文件收集的schemas。这将覆盖spark.sql.parquet.mergeSchema。readcompressionsnappy保存到文件时使用的压缩编解码器。这可以是已知的不区分大小写的缩写名之一(none、uncompressed、snappy、gzip、lzo、brotli、lz4和zstd)。这将覆盖spark.sql.parquet.compression.codec。write
    其他通用选项可以在通用文件源选项中找到

6.1 配置

Parquet的配置可以使用SparkSession上的setConf方法或使用SQL运行SET key=value命令来完成。
Property NameDefaultMeaningSince Versionspark.sql.parquet.binaryAsStringfalse其他一些生成Parquet的系统,特别是Impala、Hive和旧版本的Spark SQL,在写Parquet schema时不区分二进制数据和字符串。这个flag告诉Spark SQL将二进制数据解释为字符串,以提供与这些系统的兼容性。1.1.1spark.sql.parquet.int96AsTimestamptrue一些parquet生成系统,特别是Impala和Hive,将Timestamp存储到INT96中。这个flag告诉Spark SQL将INT96数据解释为时间戳,以提供与这些系统的兼容性。1.3.0spark.sql.parquet.int96TimestampConversionfalse对于Impala写入的数据,它控制在转换为时间戳时是否应该将时间戳调整应用于INT96数据。这是必要的,因为Impala使用不同时区偏移存储INT96数据,不同于Hive和Spark。2.3.0spark.sql.parquet.outputTimestampTypeINT96设置Spark向Parquet文件写入数据时要使用的Parquet时间戳类型。INT96是Parquet中一种非标准但常用的时间戳类型。TIMESTAMP_MICROS是Parquet中的标准时间戳类型,它存储从Unix epoch开始的微秒数。TIMESTAMP_MILLIS也是标准的,但具有毫秒精度,这意味着Spark必须截断其时间戳值的微秒部分。2.3.0spark.sql.parquet.compression.codecsnappy设置写Parquet文件时使用的压缩编解码器。如果在表特定的选项/属性中指定了compression 或parquet.compression,则优先级将是compression、parquet.compression、spark.sql.parquet.compression.codec。可接受的值包括:none、uncompressed、snappy、gzip、lzo、brotli、lz4、zstd。注意,brotli需要安装BrotliCodec。1.1.1spark.sql.parquet.filterPushdowntrue当设置为true时,启用Parquet过滤器下推(push-down)优化。1.2.0spark.sql.parquet.aggregatePushdownfalse如果为真,聚合将被下推到Parquet进行优化。支持MIN, MAX和COUNT作为聚合表达式。对于MIN/MAX,support boolean,integer,float 和date类型。对于COUNT,支持所有数据类型。如果任何Parquet文件footer缺少统计信息,将抛出异常。3.3.0spark.sql.hive.convertMetastoreParquettrue当设置为false时,Spark SQL将为parquet表使用Hive SerDe,而不是内置的支持。1.1.1spark.sql.parquet.mergeSchemafalse当为true时,Parquet数据源合并从所有数据文件收集的schemas,否则从摘要文件中选择schema,如果没有可用的摘要文件,则从随机数据文件中选择schema。1.5.0spark.sql.parquet.respectSummaryFilesfalse当为true时,我们假设Parquet的所有part-files都与摘要文件一致,并且在合并schema时会忽略它们。否则,如果默认为false,我们将合并所有part-files。这应该被视为专家专用选项,在知道它的确切含义之前不应该启用。1.5.0spark.sql.parquet.writeLegacyFormatfalse如果为true,数据将以Spark 1.4及更早版本的方式写入。例如,十进制值将以Apache Parquet的固定长度字节数组格式编写,其他系统(如Apache Hive和Apache Impala)也使用这种格式。如果为false,则将使用Parquet中较新的格式。例如,小数将以基于int的格式写入。如果Parquet输出用于不支持此较新格式的系统,请设置为true。1.6.0spark.sql.parquet.enableVectorizedReadertrue启用向量化parquet解码。2.0.0spark.sql.parquet.enableNestedColumnVectorizedReadertrue为嵌套列(例如,struct, list, map)启用向量化Parquet解码。需要启用spark.sql.parquet.enableVectorizedReader。3.3.0spark.sql.parquet.recordLevelFilter.enabledfalse如果为true,则使用下推过滤器启用Parquet的native记录级过滤。此配置仅在启用spark.sql.parquet.filterPushdown且不使用向量化reader时有效。通过将spark.sql.parquet.enableVectorizedReader设置为false,可以确保不使用向量化reader。2.3.0spark.sql.parquet.columnarReaderBatchSize4096在parquet向量化reader批处理中要包括的行数。应该小心选择这个数字,以最小化开销并避免读取数据时出现OOM。2.4.0spark.sql.parquet.fieldId.write.enabledtrueField ID 是Parquet schema规范的native字段。启用后,Parquet writers将把Spark schema中的field Id元数据(如果存在)填充到Parquet schema中。3.3.0spark.sql.parquet.fieldId.read.enabledfalse字段ID是Parquet schema规范的native字段。当启用时,Parquet readers将使用请求的Spark schema中的field IDs(如果存在)来查找Parquet字段,而不是使用列名。3.3.0spark.sql.parquet.fieldId.read.ignoreMissingfalse当Parquet文件没有任何field IDs,但Spark read schema正在使用field IDs进行读取时,我们将在启用此flag时静默返回null,否则会出错。3.3.0spark.sql.parquet.timestampNTZ.enabledtrue启用TIMESTAMP_NTZ对Parquet读写的支持。启用后,TIMESTAMP_NTZ值将被写入Parquet时间戳列,注解为isAdjustedToUTC = false ,并以类似的方式推断。禁用时,这些值将读取为TIMESTAMP_LTZ,并且必须转换为TIMESTAMP_LTZ进行写入。3.4.0spark.sql.parquet.datetimeRebaseModeInReadEXCEPTIONDATE, TIMESTAMP_MILLIS, TIMESTAMP_MICROS逻辑类型的值从Julian 到Proleptic Gregorian日历的rebase模式: ● EXCEPTION: 如果Spark看到两个日历之间有不明确的ancient日期/时间戳,则读取失败。 ● CORRECTED: Spark不会按原样进行rebase和读取日期/时间戳。 ● LEGACY: Spark在读取Parquet文件时,将日期/时间戳从传统的混合日历(Julian + Gregorian)rebase为Proleptic Gregorian日历。只有当Parquet文件的写入程序信息(如Spark、Hive)未知时,此配置才有效。3.0.0spark.sql.parquet.datetimeRebaseModeInWriteEXCEPTIONDATE、TIMESTAMP_MILLIS、TIMESTAMP_MICROS逻辑类型的值从Proleptic Gregorian日历到Julian 日历的rebase模式: ● EXCEPTION: 如果Spark看到两个日历之间的ancient日期/时间戳不明确,它将写失败。● CORRECTED:Spark不会按原样rebase并写入日期/时间戳。 ● LEGACY: Spark在写Parquet文件时,会将日期/时间戳从Proleptic Gregorian日历rebase为传统的混合日历(Julian + Gregorian)。3.0.0spark.sql.parquet.int96RebaseModeInReadEXCEPTIONINT96时间戳类型的值从Julian 到Proleptic Gregorian日历的rebase模式:● EXCEPTION: 如果看到两个日历之间有不明确的ancient INT96时间戳,Spark将无法读取。 ● CORRECTED: Spark不会按原样进行rebase和读取日期/时间戳。● LEGACY: Spark在读取Parquet文件时,将INT96时间戳从传统的混合日历(Julian + Gregorian)rebase为Proleptic Gregorian日历。只有当Parquet文件的写入程序信息(如Spark、Hive)未知时,此配置才有效。3.1.0spark.sql.parquet.int96RebaseModeInWriteEXCEPTIONINT96时间戳类型的值从Proleptic Gregorian日历到Julian日历的rebase模式: ● EXCEPTION: 如果Spark看到两个日历之间有不明确的ancient时间戳,则会导致写入失败。 ● CORRECTED: Spark不会按原样rebase并写入日期/时间戳。 ● LEGACY: Spark将在写Parquet文件时将INT96时间戳从Proleptic Gregorian日历rebase为遗留混合(Julian + Gregorian)日历。3.1.0

标签: spark sql 大数据

本文转载自: https://blog.csdn.net/gabriel_wang_sh/article/details/136972265
版权归原作者 85程序员老王 所有, 如有侵权,请联系我们删除。

“Spark SQL----Parquet文件”的评论:

还没有评论