官方文档链接(Spark整合Iceberg)
1.Getting Started
Spark 目前是进行 Iceberg 操作最丰富的计算引擎。官方建议从 Spark 开始,以理解 Iceberg 的概念和功能。
The latest version of Iceberg is 1.6.1.(2024年9月24日11:45:55)
在 Spark shell 中使用 Iceberg,需使用
--packages
选项:
spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1
Tips:
如果您想将 Iceberg 包含在 Spark 安装中,请将
iceberg-spark-runtime-3.5_2.12
Jar 添加到 Spark 的 jars 文件夹中。
Adding Catalogs 添加目录
Iceberg 提供了目录功能,使 SQL 命令能够管理表并通过名称加载它们。目录通过以下属性进行配置:
spark.sql.catalog.(catalog_name)
。
创建一个名为
local
的基于路径的目录,用于管理
$PWD/warehouse
下的表,并为 Spark 的内置目录添加对 Iceberg 表的支持:
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1 \--confspark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \--confspark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \--confspark.sql.catalog.spark_catalog.type=hive \--confspark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \--confspark.sql.catalog.local.type=hadoop \--confspark.sql.catalog.local.warehouse=$PWD/warehouse
创建 Iceberg 表
在 Spark 中创建第一个 Iceberg 表,可以使用
spark-sql
shell 或
spark.sql(...)
来运行
CREATE TABLE
命令:
-- local 是上述定义的基于路径的目录CREATETABLElocal.db.table(id bigint,data string)USING iceberg;
Iceberg 目录支持完整的 SQL DDL 命令,包括:
CREATE TABLE ... PARTITIONED BY
CREATE TABLE ... AS SELECT
ALTER TABLE
DROP TABLE
写入数据
创建表后,可以使用
INSERT INTO
向表中插入数据:
INSERTINTOlocal.db.tableVALUES(1,'a'),(2,'b'),(3,'c');INSERTINTOlocal.db.tableSELECT id,dataFROM source WHERE length(data)=1;
Iceberg 还支持行级 SQL 更新,包括
MERGE INTO
和
DELETE FROM
:
MERGEINTOlocal.db.target t
USING(SELECT*FROM updates) u
ON t.id = u.id
WHENMATCHEDTHENUPDATESET t.count = t.count + u.count
WHENNOTMATCHEDTHENINSERT*;
此外,Iceberg 支持通过新的 v2 DataFrame 写入 API 写入 DataFrames:
spark.table("source").select("id","data").writeTo("local.db.table").append()
旧的写入 API 得到支持,但不推荐使用。
读取数据
要使用 SQL 读取数据,可以在
SELECT
查询中使用 Iceberg 表的名称:
SELECTcount(1)as count,dataFROMlocal.db.tableGROUPBYdata;
SQL 也是检查表的推荐方式。要查看表中的所有快照,可以使用快照元数据表:
SELECT*FROMlocal.db.table.snapshots;
输出:
+-------------------------+----------------+-----------+-----------+----------------------------------------------------+
| committed_at | snapshot_id | parent_id | operation | manifest_list |
+-------------------------+----------------+-----------+-----------+----------------------------------------------------+
| 2019-02-08 03:29:51.215 | 57897183625154 | null | append | s3://.../table/metadata/snap-57897183625154-1.avro |
| | | | | |
| ... | ... | ... | ... | ... |
+-------------------------+----------------+-----------+-----------+----------------------------------------------------+
DataFrame 读取也得到了支持,可以通过名称引用表:
val df = spark.table("local.db.table")
df.count()
这就是在 Spark 中使用 Iceberg 创建、写入和读取表的基本步骤。
Spark 和 Iceberg 的类型兼容性
Spark type to Iceberg type
Spark 类型Iceberg 类型备注booleanbooleanshortintegerbyteintegerintegerintegerlonglongfloatfloatdoubledoubledatedatetimestamp带时区的时间戳timestamp_ntz不带时区的时间戳charstringvarcharstringstringstringbinarybinarydecimaldecimalstructstructarraylistmapmap
Tips:
- 数字类型(integer、long、float、double、decimal)在写入时支持提升。例如,可以将 Spark 类型
short
、byte
、integer
或long
写入 Iceberg 类型long
。 - 可以使用 Spark 的 binary 类型写入 Iceberg 固定类型,但会进行长度验证。
Iceberg type to Spark type
Iceberg 类型Spark 类型备注booleanbooleanintegerintegerlonglongfloatfloatdoubledoubledatedatetime不支持带时区的时间戳timestamp不带时区的时间戳timestamp_ntzstringstringuuidstringfixedbinarybinarybinarydecimaldecimalstructstructlistarraymapmap
2.Spark DDL
(1)CREATE TABLE 创建表
Spark 3 can create tables in any Iceberg catalog with the clause
USING iceberg
:
CREATETABLE prod.db.sample (
id bigintNOTNULLCOMMENT'唯一ID',data string)USING iceberg;
Iceberg会将Spark中的列类型转换为相应的Iceberg类型。
创建表的命令(包括CTAS和RTAS)支持一系列Spark创建选项,包括:
- **PARTITIONED BY (partition-expressions)**:配置分区。
- LOCATION ‘(fully-qualified-uri)’:设置表的位置。
- COMMENT ‘table documentation’:设置表描述。
- **TBLPROPERTIES (‘key’=‘value’, …)**:设置表配置。
Tips:
CREATE TABLE ... LIKE ...
语法不受支持。
(2)PARTITIONED BY 分区表的创建
CREATETABLE prod.db.sample (
id bigint,data string,
category string)USING iceberg
PARTITIONED BY(category);
PARTITIONED BY
子句支持转换表达式以创建隐藏分区:
CREATETABLE prod.db.sample (
id bigint,data string,
category string,
ts timestamp)USING iceberg
PARTITIONED BY(bucket(16, id), days(ts), category);
支持的分区转换表达式:
- **year(ts)**:按年分区
- **month(ts)**:按月分区
- day(ts) 或 **date(ts)**:等同于按日期整数分区
- hour(ts) 或 **date_hour(ts)**:等同于按日期整数和小时分区
- **bucket(N, col)**:按哈希值取模N的分区
- **truncate(L, col)**:按截断值分区(字符串按照给定长度截断;整数和长整型按区间截断,例如
truncate(10, i)
生成分区 0, 10, 20, 30, …)
注:为了向后兼容,旧语法
years(ts)
、
months(ts)
、
days(ts)
和
hours(ts)
也被支持。
(3)创建表(CTAS)
Iceberg 支持使用 SparkCatalog 进行原子操作的 CREATE TABLE AS SELECT(CTAS)。在使用 SparkSessionCatalog 时,CTAS 是不原子的。
基本语法:
CREATETABLE prod.db.sample
USING iceberg
ASSELECT...;
- 新创建的表不会继承源表的分区规范和表属性。可以使用
PARTITIONED BY
和TBLPROPERTIES
来声明新表的分区规范和表属性。
示例:
CREATETABLE prod.db.sample
USING iceberg
PARTITIONED BY(part)
TBLPROPERTIES ('key'='value')ASSELECT...;
(4)替换表(RTAS)
- 原子替换表操作会创建一个新快照,保留表的历史记录。
基本语法:
REPLACETABLE prod.db.sample
USING iceberg
ASSELECT...;
示例:
REPLACETABLE prod.db.sample
USING iceberg
PARTITIONED BY(part)
TBLPROPERTIES ('key'='value')ASSELECT...;
(5)创建或替换表
CREATEORREPLACETABLE prod.db.sample
USING iceberg
ASSELECT...;
- 替换表时的影响:- 如果使用
REPLACE TABLE
命令来替换一个表,并且新的查询结果的模式(schema)或分区规范(partition spec)发生了变化,那么原有的模式和分区会被新的内容替换。 - 如何避免修改:- 如果想保持表的现有模式和分区不变,可以使用
INSERT OVERWRITE
命令,而不是REPLACE TABLE
。这样做可以更新表的数据,但不会影响表的结构或分区设置。 - 表属性的处理:- 在
REPLACE TABLE
命令中,如果你定义了新的表属性,这些新属性会与现有的属性合并。- 如果新属性与现有属性相同,则保持不变;如果不同,则会更新现有的属性。
- 使用
REPLACE TABLE
会改变表的结构和分区。 - 使用
INSERT OVERWRITE
可以仅更新数据而不影响结构。 - 新的表属性会与旧的表属性合并,存在冲突时会更新。
(6)删除表
- 在 0.14 之前,运行
DROP TABLE
将从目录中移除表,并删除表内容。 - 从 0.14 开始,
DROP TABLE
仅从目录中移除表。要删除表内容,需使用DROP TABLE PURGE
。
基本语法:
- 删除表:
DROPTABLE prod.db.sample;
- 删除表及其内容:
DROPTABLE prod.db.sample PURGE;
(7)修改表
1.重命名表 ALTER TABLE … RENAME TO:
ALTERTABLE prod.db.sample RENAMETO prod.db.new_name;
2.设置或移除表属性:
- 设置属性 ALTER TABLE … SET TBLPROPERTIES:
ALTERTABLE prod.db.sample SET TBLPROPERTIES ('read.split.target-size'='268435456');
- 移除属性 ALTER TABLE … UNSET TBLPROPERTIES:
ALTERTABLE prod.db.sample UNSET TBLPROPERTIES ('read.split.target-size');
- 设置表注释:
ALTERTABLE prod.db.sample SET TBLPROPERTIES ('comment'='A table comment.');
- 添加、删除和重命名列: - 添加列 ALTER TABLE … ADD COLUMN:
ALTERTABLE prod.db.sample ADDCOLUMNS( new_column string comment'new_column docs');
- 添加嵌套字段:- 创建结构列:
ALTERTABLE prod.db.sample ADDCOLUMNpoint struct<x: double, y: double>;
- 向结构中添加字段:ALTERTABLE prod.db.sample ADDCOLUMNpoint.z double;
- 创建嵌套数组列:ALTERTABLE prod.db.sample ADDCOLUMN points array<struct<x: double, y: double>>;
- 向数组中的结构添加字段:ALTERTABLE prod.db.sample ADDCOLUMN points.element.z double;
- 创建映射列:ALTERTABLE prod.db.sample ADDCOLUMN points map<struct<x: int>, struct<a: int>>;
- 向映射值结构中添加字段:ALTERTABLE prod.db.sample ADDCOLUMN points.value.b int;
- 调整列的位置:- 在其他列后添加新列:
ALTERTABLE prod.db.sample ADDCOLUMN new_column bigintAFTER other_column;
- 在最前面添加新列:ALTERTABLE prod.db.sample ADDCOLUMN nested.new_column bigintFIRST;
- Note: Altering a map ‘key’ column by adding columns is not allowed. Only map values can be updated.
- 修改map的“键”列以添加列是不允许的,只能更新map的“值”。
6.重命名列
RENAME COLUMN
使用
RENAME COLUMN
可以重命名任何字段:
ALTERTABLE prod.db.sample RENAMECOLUMNdataTO payload;ALTERTABLE prod.db.sample RENAMECOLUMN location.lat TO latitude;
在 Iceberg 中,嵌套重命名命令只会影响到最底层的字段(即叶子字段)。
将
location.lat
重命名为
location.latitude
。
location
└─ lat
└─ long
执行命令后,结构变为:
location
└─ latitude
└─ long
这里的
location
仍然是父结构,但
lat
字段被重命名为
latitude
,而其他字段保持不变。这就是嵌套重命名的含义。
7.修改列类型
ALTER COLUMN
使用
ALTER COLUMN
来修改列的类型,前提是这种更新是安全的。安全更新包括:
int
到bigint
float
到double
decimal(P,S)
到decimal(P2,S)
(当P2 > P
时)
ALTERTABLE prod.db.sample ALTERCOLUMN measurement TYPEdouble;
8.更新列注释
ALTERTABLE prod.db.sample ALTERCOLUMN measurement TYPEdoubleCOMMENT'unit is bytes per second';ALTERTABLE prod.db.sample ALTERCOLUMN measurement COMMENT'unit is kilobytes per second';
9. 重新排序列
使用
FIRST
和
AFTER
子句,可以重新排序顶级列或结构中的列:
ALTERTABLE prod.db.sample ALTERCOLUMN col FIRST;ALTERTABLE prod.db.sample ALTERCOLUMN nested.col AFTER other_col;
10. 更改列的可空性
对于非可空列,可以使用
DROP NOT NULL
更改可空性:
ALTERTABLE prod.db.sample ALTERCOLUMN id DROPNOTNULL;
注意:不能通过
SET NOT NULL
将可空列更改为非可空列,因为 Iceberg 无法确保是否存在空值。
11. 删除列
要删除列,可以使用
DROP COLUMN
:
ALTERTABLE prod.db.sample DROPCOLUMN id;ALTERTABLE prod.db.sample DROPCOLUMNpoint.z;
ALTER TABLE SQL 扩展
Iceberg 支持使用
ADD PARTITION FIELD
命令向分区规范中添加新的分区字段:
ALTERTABLE prod.db.sample ADDPARTITION FIELD catalog;-- 身份变换
也可以使用不同的分区变换,例如:
ALTERTABLE prod.db.sample ADDPARTITION FIELD bucket(16, id);ALTERTABLE prod.db.sample ADDPARTITION FIELD truncate(4,data);ALTERTABLE prod.db.sample ADDPARTITION FIELD year(ts);
使用可选的
AS
关键字可以为分区字段指定自定义名称:
ALTERTABLE prod.db.sample ADDPARTITION FIELD bucket(16, id)AS shard;
- 添加分区字段是一个元数据操作,不会更改现有表的数据。新数据将按照新的分区方式写入,但现有数据仍将保持在旧的分区布局中。在元数据表中,旧数据文件的新的分区字段将显示为 null 值。
当表的分区发生变化时,动态分区覆盖行为将发生变化,因为动态覆盖会隐式替换分区。要显式覆盖,请使用新的 DataFrameWriterV2 API。
如果从按天分区迁移到按小时分区,动态分区覆盖行为将有所不同。例如,如果原本是按天分区,改为按小时分区,覆盖操作将只覆盖小时分区,而不再覆盖天分区。
如果需要从日分区迁移到小时分区,建议保留日分区字段,以确保现有元数据表查询能够继续正常工作。
3.Spark Queries
选择表中的所有记录
SELECT*FROM prod.db.table;
prod
是目录,db
是命名空间,table
是表名。
访问元数据表:
可以使用 Iceberg 表名作为命名空间查询元数据表,例如,要读取特定 Iceberg 表的文件元数据:
使用 DataFrame 查询
要将 Iceberg 表加载为 Spark 中的 DataFrame,可以使用以下命令:
val df = spark.table("prod.db.table")
使用此命令可以利用 Spark DataFrame 操作的全范围对 DataFrame 进行操作。
加载 DataFrame 后,可以执行各种操作。例
- 显示 DataFrame:
df.show()
- 过滤记录:
val filteredDf = df.filter($"columnName"==="someValue")
- 分组和聚合:
val aggregatedDf = df.groupBy("columnName").count()
- 写回 Iceberg 表:
filteredDf.write.format("iceberg").mode("append").save("prod.db.table")
Iceberg 表的时间旅行(Time Travel)
从 Spark 3.3 开始,Iceberg 支持在 SQL 查询中使用
TIMESTAMP AS OF
或
VERSION AS OF
子句进行时间旅行。
时间旅行查询示例:
- 基于时间戳的查询- 时间旅行到1986年10月26日01:21:00
SELECT*FROM prod.db.tableTIMESTAMPASOF'1986-10-26 01:21:00';
- 基于快照 ID 的查询- 时间旅行到快照 ID 为 10963874102873 的快照
SELECT*FROM prod.db.table VERSION ASOF10963874102873;
- 基于分支的查询- 时间旅行到
audit-branch
的最新快照SELECT*FROM prod.db.table VERSION ASOF'audit-branch';
- 基于标签的查询- 时间旅行到
historical-snapshot
标签引用的快照SELECT*FROM prod.db.table VERSION ASOF'historical-snapshot';
时间戳也可以以 Unix 时间戳(秒)提供:
- 使用 Unix 时间戳查询
SELECT*FROM prod.db.tableTIMESTAMPASOF499162860;SELECT*FROM prod.db.tableFOR SYSTEM_TIME ASOF499162860;
可以使用类似于元数据表的语法指定分支或标签:
- 指定分支
SELECT*FROM prod.db.table.`branch_audit-branch`;
- 指定标签
SELECT*FROM prod.db.table.`tag_historical-snapshot`;
(包含“-”的标识符无效,因此必须使用反引号转义。)
不同的时间旅行查询可以使用快照的架构或表的架构:
- 使用快照的架构(基于时间戳和快照 ID 的查询)
SELECT*FROM prod.db.tableTIMESTAMPASOF'1986-10-26 01:21:00';SELECT*FROM prod.db.table VERSION ASOF10963874102873;SELECT*FROM prod.db.table VERSION ASOF'historical-snapshot';SELECT*FROM prod.db.table.`tag_historical-snapshot`;
- 使用表的架构(基于分支的查询)
SELECT*FROM prod.db.table VERSION ASOF'audit-branch';SELECT*FROM prod.db.table.`branch_audit-branch`;
Iceberg 中的DataFrame时间旅行
Iceberg 支持在 DataFrame API 中使用四个 Spark 读取选项来选择特定的表快照或特定时间的快照:
- snapshot-id: 选择特定的表快照。
- as-of-timestamp: 选择某个时间点的当前快照,单位为毫秒。
- branch: 选择指定分支的最新快照。注意,目前不能将分支与时间戳结合使用。
- tag: 选择与指定标签关联的快照。标签也不能与时间戳结合使用。
示例:
- 时间旅行到1986年10月26日01:21:00
spark.read .option("as-of-timestamp","499162860000").format("iceberg").load("path/to/table")
- 时间旅行到快照 ID 为 10963874102873 的快照
spark.read .option("snapshot-id",10963874102873L).format("iceberg").load("path/to/table")
- 时间旅行到标签
historical-snapshot
spark.read .option(SparkReadOptions.TAG,"historical-snapshot").format("iceberg").load("path/to/table")
- 时间旅行到
audit-branch
的最新快照spark.read .option(SparkReadOptions.BRANCH,"audit-branch").format("iceberg").load("path/to/table")
Iceberg增量读取
- start-snapshot-id: 用于增量扫描的起始快照 ID(不包括该快照)。
- end-snapshot-id: 用于增量扫描的结束快照 ID(包括该快照)。这是可选的。如果省略,将默认为当前快照。
示例代码
// 获取在 start-snapshot-id (10963874102873L) 之后追加的数据,直到 end-snapshot-id (63874143573109L)
spark.read
.format("iceberg").option("start-snapshot-id","10963874102873").option("end-snapshot-id","63874143573109").load("path/to/table")
- 当前仅支持从追加操作中获取数据,无法支持替换、覆盖或删除操作。
- 增量读取适用于 V1 和 V2 格式版本。
- Spark 的 SQL 语法不支持增量读取。
Iceberg 支持使用元数据表来检查表的历史和快照。元数据表通过在原始表名称后添加元数据表名来识别。例如,查看
db.table
的历史可以使用
db.table.history
。
表历史查询
要显示表的历史,可以执行以下查询:
SELECT*FROM prod.db.table.history;
made_current_atsnapshot_idparent_idis_current_ancestor2019-02-08 03:29:51.2155781947118336215154NULLtrue2019-02-08 03:47:55.94851792995261850568305781947118336215154true2019-02-09 16:24:30.132964100402475335445179299526185056830false2019-02-09 16:32:47.33629998756080624373305179299526185056830true2019-02-09 19:42:03.91989245587860605834792999875608062437330true2019-02-09 19:49:16.34365367338231819750458924558786060583479true
元数据日志条目查询
要查看表的元数据日志条目,可以执行以下查询:
SELECT*FROM prod.db.table.metadata_log_entries;
timestampfilelatest_snapshot_idlatest_schema_idlatest_sequence_number2022-07-28 10:43:52.93s3://…/table/metadata/00000-9441e604-b3c2-498a-a45a-6320e8ab9006.metadata.jsonnullnullnull2022-07-28 10:43:57.487s3://…/table/metadata/00001-f30823df-b745-4a0a-b293-7532e0c99986.metadata.json170260833677645300012022-07-28 10:43:58.25s3://…/table/metadata/00002-2cc2837a-02dc-4687-acc1-b4d86ea486f4.metadata.json95890649397670977402
快照查询
SELECT*FROM prod.db.table.snapshots;
示例结果:
committed_atsnapshot_idparent_idoperationmanifest_listsummary2019-02-08 03:29:51.21557897183625154nullappends3://…/table/metadata/snap-57897183625154-1.avro{ added-records -> 2478404, total-records -> 2478404, added-data-files -> 438, total-data-files -> 438, spark.app.id -> application_1520379288616_155055 }
Apache Iceberg 是一个高性能的表格式,用于大数据处理。下面我将根据你提到的各个方面,概述 Iceberg 的使用。
Entries 快照的详细信息
Entries 表提供了有关表中每个快照的详细信息。查询示例如下:
SELECT*FROM my_catalog.db.my_table.entries;
4.Spark Write
1. 特性支持
Iceberg 利用 Apache Spark 的 DataSourceV2 API,支持多种写入方式。不同版本的 Spark 对某些功能的支持程度不同:
功能Spark 3备注SQL 插入✔️⚠ 需要
spark.sql.storeAssignmentPolicy=ANSI
(自 Spark 3.0 默认)SQL 合并✔️⚠ 需要 Iceberg Spark 扩展SQL 覆盖插入✔️⚠ 需要
spark.sql.storeAssignmentPolicy=ANSI
SQL 删除✔️⚠ 行级删除需要 Iceberg Spark 扩展SQL 更新✔️⚠ 需要 Iceberg Spark 扩展DataFrame 附加写入✔️DataFrame 覆盖写入✔️DataFrame CTAS 和 RTAS✔️⚠ 需要 DSv2 API
2. 使用 SQL 写入
Spark 3 支持 SQL 的
INSERT INTO
、
MERGE INTO
和
INSERT OVERWRITE
操作。
INSERT INTO
用于向表中追加新数据:
INSERTINTO prod.db.tableVALUES(1,'a'),(2,'b');INSERTINTO prod.db.tableSELECT...;
MERGE INTO
支持对目标表进行行级更新。Iceberg 通过重写包含需要更新行的数据文件来实现
MERGE INTO
。
**推荐使用
MERGE INTO
而不是
INSERT OVERWRITE
,因为它只替换受影响的数据文件,避免了因分区变化导致的数据覆盖不一致问题。**
MERGEINTO prod.db.target t -- 目标表USING(SELECT...) s -- 源更新ON t.id = s.id -- 用于找到更新的条件WHENMATCHEDAND s.op ='delete'THENDELETEWHENMATCHEDAND t.count ISNULLAND s.op ='increment'THENUPDATESET t.count =0WHENMATCHEDAND s.op ='increment'THENUPDATESET t.count = t.count +1WHENNOTMATCHEDTHENINSERT*;
可以根据条件添加多个
WHEN MATCHED
子句。如果源数据中的多条记录匹配同一目标表的行,将会抛出错误。
INSERT OVERWRITE
在 Iceberg 中,
INSERT OVERWRITE
允许用查询结果替换表中的数据。此操作是原子的,确保数据的一致性。
1. 覆盖行为
Iceberg 支持两种覆盖模式:静态模式和动态模式。
- 静态覆盖模式(默认):- 通过将
PARTITION
子句转换为过滤器来确定要覆盖的分区。- 如果省略了PARTITION
子句,将替换表中的所有分区。 - 动态覆盖模式(推荐):- 仅替换由
SELECT
查询产生的行所在的分区。- 通过设置spark.sql.sources.partitionOverwriteMode=dynamic
来启用。
2. 示例表结构
以下是一个示例日志表的 DDL 定义:
CREATETABLE prod.my_app.logs (
uuid string NOTNULL,level string NOTNULL,
ts timestampNOTNULL,
message string)USING iceberg
PARTITIONED BY(level, hours(ts))
3. 动态覆盖示例
当 Spark 的覆盖模式为动态时,以下查询会替换所有包含查询结果的分区:
INSERT OVERWRITE prod.my_app.logs
SELECT uuid,first(level),first(ts),first(message)FROM prod.my_app.logs
WHERE cast(ts asdate)='2020-07-01'GROUPBY uuid
在动态模式下,仅会替换 2020 年 7 月 1 日的小时分区。
4. 静态覆盖示例
在静态模式下,如果没有
PARTITION
子句,将替换所有现有行:
INSERT OVERWRITE prod.my_app.logs
SELECT uuid,first(level),first(ts),first(message)FROM prod.my_app.logs
WHERE cast(ts asdate)='2020-07-01'GROUPBY uuid
这将删除表中所有行,只写入 7 月 1 日的日志。
要仅覆盖特定分区,可以添加
PARTITION
子句:
INSERT OVERWRITE prod.my_app.logs
PARTITION(level='INFO')SELECT uuid,first(level),first(ts),first(message)FROM prod.my_app.logs
WHERElevel='INFO'GROUPBY uuid
注意:静态模式无法像动态模式那样替换小时分区,因为
PARTITION
子句只能引用表列,而不能引用隐藏分区。
DELETE
DELETE FROM
查询允许根据条件过滤来删除表中的行。
- 删除指定时间范围内的记录:
DELETEFROM prod.db.tableWHERE ts >='2020-05-01 00:00:00'AND ts <'2020-06-01 00:00:00'
- 删除
all_events
表中 session_time 小于good_events
表中的最小 session_time 的记录:DELETEFROM prod.db.all_eventsWHERE session_time <(SELECTMIN(session_time)FROM prod.db.good_events)
- 删除
orders
表中存在于returned_orders
表的订单:DELETEFROM prod.db.orders AS t1WHEREEXISTS(SELECT oid FROM prod.db.returned_orders WHERE t1.oid = oid)
注意:
- 如果删除条件匹配整个分区,Iceberg 将执行元数据仅删除(metadata-only delete)。
- 如果删除条件匹配单个行,Iceberg 将仅重写受影响的数据文件。
UPDATE
- 更新指定时间范围内的记录的字段值:
UPDATE prod.db.tableSET c1 ='update_c1', c2 ='update_c2'WHERE ts >='2020-05-01 00:00:00'AND ts <'2020-06-01 00:00:00'
- 更新
all_events
表中 session_time 小于good_events
表中的最小 session_time 的记录:UPDATE prod.db.all_eventsSET session_time =0, ignored =trueWHERE session_time <(SELECTMIN(session_time)FROM prod.db.good_events)
- 更新
orders
表中存在于returned_orders
表的订单状态:UPDATE prod.db.orders AS t1SET order_status ='returned'WHEREEXISTS(SELECT oid FROM prod.db.returned_orders WHERE t1.oid = oid)
Iceberg 分支写入指南
- 分支存在性:在执行任何写入操作之前,分支必须已经存在。可以使用 Spark DDL 命令创建分支。
- 模式验证:在向分支写入数据时,将验证表的当前模式。
通过 SQL 写入
- 插入到审计分支:
INSERTINTO prod.db.table.branch_audit VALUES(1,'a'),(2,'b');
- 合并到审计分支:
MERGEINTO prod.db.table.branch_audit t USING(SELECT...) s ON t.id = s.id WHEN...
- 更新审计分支:
UPDATE prod.db.table.branch_audit AS t1SET val ='c';
- 从审计分支删除:
DELETEFROM prod.db.table.branch_audit WHERE id =2;
- WAP 分支写入:
SET spark.wap.branch = audit-branch;INSERTINTO prod.db.tableVALUES(3,'c');
通过 DataFrame 写入
- 插入到审计分支:
val data: DataFrame =...data.writeTo("prod.db.table.branch_audit").append()
- 覆盖审计分支:
val data: DataFrame =...data.writeTo("prod.db.table.branch_audit").overwritePartitions()
后续继续更新~
列位可以移步博主Apache Iceberg专栏,或许对您理解Iceberg有所帮助😊
版权归原作者 喻师傅 所有, 如有侵权,请联系我们删除。