0


Iceberg从入门到精通系列之二十二:Spark DDL

Iceberg从入门到精通系列之二十二:Spark DDL

一、Spark DDL

要在 Spark 中使用 Iceberg,请首先配置 Spark 目录。 Iceberg 使用 Apache Spark 的 DataSourceV2 API 来实现数据源和目录。

二、Spark DDL-创建表

Spark 3 可以使用 USINGiceberg 子句在任何 Iceberg 目录中创建表:

CREATETABLE prod.db.sample (
    id bigintCOMMENT'unique id',data string)USING iceberg

Iceberg会将Spark中的列类型转换为对应的Iceberg类型。详细信息请查看创建表的类型兼容性部分。

  • PARTITIONED BY(分区表达式)来配置分区
  • LOCATION ‘(完全限定-uri)’ 设置表位置
  • COMMENT ‘表文档’ 设置表描述
  • TBLPROPERTIES(‘key’=‘value’, …) 设置表配置

创建命令还可以使用 USING 子句设置默认格式。仅 SparkCatalog 支持此功能,因为 Spark 对内置目录的 USING 子句的处理方式不同。

三、Spark DDL-PARTITIONED BY

要创建分区表,请使用 PARTITIONED BY:

CREATE TABLE prod.db.sample (id bigint,
    data string,
    category string)
USING iceberg
PARTITIONED BY (category)

PARTITIONED BY 子句支持转换表达式来创建隐藏分区。

CREATE TABLE prod.db.sample (id bigint,
    data string,
    category string,
    ts timestamp)
USING iceberg
PARTITIONED BY (bucket(16, id), days(ts), category)

支持的转换有:

  • Year(ts):按年份分区
  • Month(ts):按月分区
  • day(ts) 或 date(ts):相当于 dateint 分区
  • hour(ts) 或 date_hour(ts):相当于 dateint 和 hour 分区
  • bucket(N, col):按哈希值 mod N 个桶进行分区
  • truncate(L, col):按截断为 L 的值进行分区 - 字符串被截断为给定长度- 整数和长整型截断为 bin: truncate(10, i) 生成分区 0, 10, 20, 30,

注意:为了兼容性,还支持年(ts)、月(ts)、日(ts)和小时(ts)的旧语法。

四、Spark DDL-CREATE TABLE … AS SELECT

使用 SparkCatalog 时,Iceberg 支持 CTAS 作为原子操作。支持 CTAS,但在使用 SparkSessionCatalog 时不是原子的。

CREATETABLE prod.db.sample
USING iceberg
ASSELECT...

新创建的表不会继承 SELECT 中源表的分区规范和表属性,您可以使用 CTAS 中的 PARTITIONED BY 和 TBLPROPERTIES 来声明新表的分区规范和表属性。

CREATETABLE prod.db.sample
USING iceberg
PARTITIONED BY(part)
TBLPROPERTIES ('key'='value')ASSELECT...

五、Spark DDL-REPLACE TABLE … AS SELECT

使用 SparkCatalog 时,Iceberg 支持 RTAS 作为原子操作。支持 RTAS,但在使用 SparkSessionCatalog 时不是原子的。

原子表替换使用 SELECT 查询的结果创建新快照,但保留表历史记录。

REPLACETABLE prod.db.sample
USING iceberg
ASSELECT...
REPLACETABLE prod.db.sample
USING iceberg
PARTITIONED BY(part)
TBLPROPERTIES ('key'='value')ASSELECT...
CREATEORREPLACETABLE prod.db.sample
USING iceberg
ASSELECT...

六、Spark DDL-DROP TABLE

删除表行为在 0.14 中发生了变化。

在 0.14 之前,运行 DROP TABLE 将从目录中删除表并删除表内容。

从 0.14 开始,DROP TABLE 只会从目录中删除表。为了删除表内容,应使用 DROP TABLE PURGE。

DROP TABLE

要从目录中删除表,请运行:

DROPTABLE prod.db.sample

DROP TABLE PURGE
要从目录中删除表并删除表的内容,请运行:

DROPTABLE prod.db.sample PURGE

七、Spark DDL-ALTER TABLE

Iceberg 在 Spark 3 中拥有完整的 ALTER TABLE 支持,包括:

  • 重命名表
  • 设置或删除表属性
  • 添加、删除和重命名列
  • 添加、删除和重命名嵌套字段
  • 重新排序顶级列和嵌套结构字段
  • 扩大 int、float 和decimal 字段的类型
  • 将必需的列设置为可选

此外,SQL 扩展可用于添加对分区演化和设置表的写入顺序的支持

1.ALTER TABLE … RENAME TO

ALTERTABLE prod.db.sample RENAMETO prod.db.new_name

2.ALTER TABLE … SET TBLPROPERTIES

ALTERTABLE prod.db.sample SET TBLPROPERTIES ('read.split.target-size'='268435456')

Iceberg 使用表属性来控制表行为。

UNSET 用于删除属性:

ALTERTABLE prod.db.sample UNSET TBLPROPERTIES ('read.split.target-size')

SET TBLPROPERTIES 还可以用来设置表注释(描述):

ALTERTABLE prod.db.sample SET TBLPROPERTIES ('comment'='A table comment.')

3.ALTER TABLE … ADD COLUMN

要将列添加到 Iceberg,请使用 ADD COLUMNS 子句和 ALTER TABLE:

ALTERTABLE prod.db.sample
ADDCOLUMNS(
    new_column string comment'new_column docs')

可以同时添加多列,以逗号分隔。

应使用完整的列名称来标识嵌套列:

-- create a struct columnALTERTABLE prod.db.sample
ADDCOLUMNpoint struct<x: double, y: double>;-- add a field to the structALTERTABLE prod.db.sample
ADDCOLUMNpoint.z double
-- create a nested array column of structALTERTABLE prod.db.sample
ADDCOLUMN points array<struct<x: double, y: double>>;-- add a field to the struct within an array. Using keyword 'element' to access the array's element column.ALTERTABLE prod.db.sample
ADDCOLUMN points.element.z double
-- create a map column of struct key and struct valueALTERTABLE prod.db.sample
ADDCOLUMN points map<struct<x: int>, struct<a: int>>;-- add a field to the value struct in a map. Using keyword 'value' to access the map's value column.ALTERTABLE prod.db.sample
ADDCOLUMN points.value.b int

注意:不允许通过添加列来更改映射“键”列。只能更新地图值。

通过添加 FIRST 或 AFTER 子句在任意位置添加列:

ALTERTABLE prod.db.sample
ADDCOLUMN new_column bigintAFTER other_column
ALTERTABLE prod.db.sample
ADDCOLUMN nested.new_column bigintFIRST

4.ALTER TABLE … RENAME COLUMN

Iceberg 允许重命名任何字段。要重命名字段,请使用 RENAME COLUMN:

ALTERTABLE prod.db.sample RENAMECOLUMNdataTO payload
ALTERTABLE prod.db.sample RENAMECOLUMN location.lat TO latitude

请注意,嵌套重命名命令仅重命名叶字段。上述命令将 location.lat 重命名为 location.latitude

5.ALTER TABLE … ALTER COLUMN

更改列用于扩大类型、使字段可选、设置注释以及重新排序字段。

如果更新是安全的,Iceberg 允许更新列类型。安全更新是:

  • int to bigint
  • float to double
  • decimal(P,S) to decimal(P2,S) when P2 > P (scale cannot change)
ALTERTABLE prod.db.sample ALTERCOLUMN measurement TYPEdouble

要从结构中添加或删除列,请使用带有嵌套列名称的 ADD COLUMN 或 DROP COLUMN。

列注释也可以使用 ALTER COLUMN 进行更新:

ALTERTABLE prod.db.sample ALTERCOLUMN measurement TYPEdoubleCOMMENT'unit is bytes per second'ALTERTABLE prod.db.sample ALTERCOLUMN measurement COMMENT'unit is kilobytes per second'

Iceberg 允许使用 FIRST 和 AFTER 子句对顶级列或结构中的列进行重新排序:

ALTERTABLE prod.db.sample ALTERCOLUMN col FIRSTALTERTABLE prod.db.sample ALTERCOLUMN nested.col AFTER other_col

可以使用 DROP NOT NULL 更改不可为空列的可为空性:

ALTERTABLE prod.db.sample ALTERCOLUMN id DROPNOTNULL

无法使用 SET NOT NULL 将可空列更改为不可空列,因为 Iceberg 不知道是否存在具有空值的现有数据。

ALTER COLUMN 不用于更新结构类型。使用 ADD COLUMN 和 DROP COLUMN 添加或删除结构字段。

6.ALTER TABLE … DROP COLUMN

要删除列,请使用 ALTER TABLE … DROP COLUMN:

ALTERTABLE prod.db.sample DROPCOLUMN id
ALTERTABLE prod.db.sample DROPCOLUMNpoint.z

ALTER TABLE SQL extensions

使用 Iceberg SQL 扩展时,这些命令在 Spark 3 中可用。

7.ALTER TABLE … ADD PARTITION FIELD

Iceberg 支持使用 ADD PARTITION FIELD 将新的分区字段添加到规范中:

ALTERTABLE prod.db.sample ADDPARTITION FIELD catalog 

还支持分区转换:

ALTERTABLE prod.db.sample ADDPARTITION FIELD bucket(16, id)ALTERTABLE prod.db.sample ADDPARTITION FIELD truncate(4,data)ALTERTABLE prod.db.sample ADDPARTITION FIELD year(ts)-- use optional AS keyword to specify a custom name for the partition field ALTERTABLE prod.db.sample ADDPARTITION FIELD bucket(16, id)AS shard

添加分区字段是元数据操作,不会更改任何现有表数据。新数据将使用新分区写入,但现有数据将保留在旧分区布局中。旧数据文件的元数据表中的新分区字段将为空值。

当表的分区发生变化时,动态分区覆盖行为也会发生变化,因为动态覆盖会隐式替换分区。要显式覆盖,请使用新的 DataFrameWriterV2 API。

要通过转换从每日分区迁移到每小时分区,无需删除每日分区字段。保留该字段可确保现有元数据表查询继续工作。
当分区发生变化时,动态分区覆盖行为也会发生变化。例如,如果您按天分区并改为按小时分区,则覆盖将覆盖每小时分区,但不再覆盖天分区。

8.ALTER TABLE … DROP PARTITION FIELD

可以使用 DROP PARTITION FIELD 删除分区字段:

ALTERTABLE prod.db.sample DROPPARTITION FIELD catalog
ALTERTABLE prod.db.sample DROPPARTITION FIELD bucket(16, id)ALTERTABLE prod.db.sample DROPPARTITION FIELD truncate(4,data)ALTERTABLE prod.db.sample DROPPARTITION FIELD year(ts)ALTERTABLE prod.db.sample DROPPARTITION FIELD shard

请注意,尽管删除了分区,但该列仍将存在于表模式中。

删除分区字段是元数据操作,不会更改任何现有表数据。新数据将使用新分区写入,但现有数据将保留在旧分区布局中。

当分区更改时,动态分区覆盖行为将会改变。例如,如果您按天分区并转为按小时分区,则覆盖将覆盖每小时分区,但不再覆盖天分区。
删除分区字段时要小心,因为它会更改元数据表(如文件)的架构,并可能导致元数据查询失败或产生不同的结果。

9.ALTER TABLE … REPLACE PARTITION FIELD

通过使用 REPLACE PARTITION FIELD,可以在单个元数据更新中将分区字段替换为新的分区字段:

ALTERTABLE prod.db.sample REPLACEPARTITION FIELD ts_day WITHday(ts)-- use optional AS keyword to specify a custom name for the new partition field ALTERTABLE prod.db.sample REPLACEPARTITION FIELD ts_day WITHday(ts)AS day_of_ts

10.ALTER TABLE … WRITE ORDERED BY

Iceberg 表可以配置排序顺序,用于自动对某些引擎中写入表的数据进行排序。例如,Spark 中的 MERGE INTO 将使用表排序。

要设置表的写入顺序,请使用 WRITE ORDERED BY:

ALTERTABLE prod.db.sample WRITE ORDERED BY category, id
-- use optional ASC/DEC keyword to specify sort order of each field (default ASC)ALTERTABLE prod.db.sample WRITE ORDERED BY category ASC, id DESC-- use optional NULLS FIRST/NULLS LAST keyword to specify null order of each field (default FIRST)ALTERTABLE prod.db.sample WRITE ORDERED BY category ASC NULLS LAST, id DESC NULLS FIRST

表写入顺序不保证查询的数据顺序。它仅影响数据写入表的方式。

WRITE ORDERED BY 设置全局排序,其中行跨任务排序,就像在 INSERT 命令中使用 ORDER BY 一样:

INSERTINTO prod.db.sample
SELECT id,data, category, ts FROM another_table
ORDERBY ts, category

要在每个任务内而不是跨任务排序,请使用 LOCALLY ORDERED BY:

ALTERTABLE prod.db.sample WRITE LOCALLY ORDERED BY category, id

11.ALTER TABLE … WRITE DISTRIBUTED BY PARTITION

WRITE DISTRIBUTED BY PARTITION 会要求每个分区由一个 writer 处理,默认实现是哈希分布。

ALTERTABLE prod.db.sample WRITEDISTRIBUTEDBYPARTITION

DISTRIBUTED BY PARTITION 和 LOCALLY ORDERED BY 可以一起使用,以按分区分布并在每个任务中本地排序行。

ALTERTABLE prod.db.sample WRITEDISTRIBUTEDBYPARTITION LOCALLY ORDERED BY category, id

12.ALTER TABLE … SET IDENTIFIER FIELDS

Iceberg 支持使用 SET IDENTIFIER FIELDS 将标识符字段设置为规范:

ALTERTABLE prod.db.sample SET IDENTIFIER FIELDS id
-- single columnALTERTABLE prod.db.sample SET IDENTIFIER FIELDS id,data-- multiple columns

标识符字段必须为 NOT NULL,后面的 ALTER 语句将覆盖之前的设置。

13.ALTER TABLE … DROP IDENTIFIER FIELDS

可以使用 DROP IDENTIFIER FIELDS 删除标识符字段:

ALTERTABLE prod.db.sample DROP IDENTIFIER FIELDS id
-- single columnALTERTABLE prod.db.sample DROP IDENTIFIER FIELDS id,data-- multiple columns

请注意,尽管标识符已被删除,但该列仍将存在于表架构中。

分支和标记 DDL

14.ALTER TABLE … CREATE BRANCH

可以通过 CREATE BRANCH 语句使用以下选项创建分支:

  • 如果分支已经存在且 IF NOT EXISTS 则不会失败
  • 如果分支已存在,则使用 CREATE OR REPLACE 更新分支
  • 在快照上创建
  • 创造并保留
-- CREATE audit-branch at current snapshot with default retention.ALTERTABLE prod.db.sample CREATE BRANCH `audit-branch`-- CREATE audit-branch at current snapshot with default retention if it doesn't exist.ALTERTABLE prod.db.sample CREATE BRANCH IFNOTEXISTS`audit-branch`-- CREATE audit-branch at current snapshot with default retention or REPLACE it if it already exists.ALTERTABLE prod.db.sample CREATEORREPLACE BRANCH `audit-branch`-- CREATE audit-branch at snapshot 1234 with default retention.ALTERTABLE prod.db.sample CREATE BRANCH `audit-branch`ASOF VERSION 1234-- CREATE audit-branch at snapshot 1234, retain audit-branch for 31 days, and retain the latest 31 days. The latest 3 snapshot snapshots, and 2 days worth of snapshots. ALTERTABLE prod.db.sample CREATE BRANCH `audit-branch`ASOF VERSION 1234 RETAIN 30 DAYS 
WITHSNAPSHOT RETENTION 3 SNAPSHOTS 2 DAYS

15.ALTER TABLE … CREATE TAG

可以通过 CREATE TAG 语句使用以下选项创建标签:

  • 如果标签已经存在并且使用 IF NOT EXISTS 则不会失败
  • 如果标签已存在,请使用 CREATE OR REPLACE 更新标签
  • 在快照上创建
  • 创造并保留
-- CREATE historical-tag at current snapshot with default retention.ALTERTABLE prod.db.sample CREATE TAG `historical-tag`-- CREATE historical-tag at current snapshot with default retention if it doesn't exist.ALTERTABLE prod.db.sample CREATE TAG IFNOTEXISTS`historical-tag`-- CREATE historical-tag at current snapshot with default retention or REPLACE it if it already exists.ALTERTABLE prod.db.sample CREATEORREPLACE TAG `historical-tag`-- CREATE historical-tag at snapshot 1234 with default retention.ALTERTABLE prod.db.sample CREATE TAG `historical-tag`ASOF VERSION 1234-- CREATE historical-tag at snapshot 1234 and retain it for 1 year. ALTERTABLE prod.db.sample CREATE TAG `historical-tag`ASOF VERSION 1234 RETAIN 365 DAYS

16.ALTER TABLE … REPLACE BRANCH

分支引用的快照可以通过 REPLACE BRANCH sql 进行更新。保留也可以在此声明中更新。

-- REPLACE audit-branch to reference snapshot 4567 and update the retention to 60 days.ALTERTABLE prod.db.sample REPLACE BRANCH `audit-branch`ASOF VERSION 4567 RETAIN 60 DAYS

17.ALTER TABLE … REPLACE TAG

标签引用的快照可以通过 REPLACE TAG sql 进行更新。保留也可以在此声明中更新。

-- REPLACE historical-tag to reference snapshot 4567 and update the retention to 60 days.ALTERTABLE prod.db.sample REPLACE TAG `historical-tag`ASOF VERSION 4567 RETAIN 60 DAYS

18.ALTER TABLE … DROP BRANCH

可以通过 DROP BRANCH sql 删除分支

ALTERTABLE prod.db.sample DROP BRANCH `audit-branch`

19.ALTER TABLE … DROP TAG

可以通过 DROP TAG sql 删除标签

ALTERTABLE prod.db.sample DROP TAG `historical-tag`

本文转载自: https://blog.csdn.net/zhengzaifeidelushang/article/details/135982308
版权归原作者 最笨的羊羊 所有, 如有侵权,请联系我们删除。

“Iceberg从入门到精通系列之二十二:Spark DDL”的评论:

还没有评论