HUDI表相关概念
- 表类型- cow- mor
- 分区表/不分区表用户可以在Spark SQL中创建分区表和非分区表。要创建分区表,需要使用partitioned by语句指定分区列来创建分区表。当没有使用create table命令进行分区的by语句时,table被认为是一个未分区的表。
- 内部表和外部表一般情况下,Spark SQL支持两种表,即内部表和外部表。如果使用location语句指定一个位置,或者使用create external table显式地创建表,那么它就是一个外部表,否则它被认为是一个内部表。
特别注意:
- 从hudi 0.10.0开始,在创建hudi表时必须指定primaryKey用于表示主键字段。假如你没有指定primaryKey字段,则hudi默认会将uuid作为主键字段。我们建议在创建hudi表时,必须指定主键字段。
- 对于mor表,必须指定preCombineField字段用于表示数据先后顺序。
- 如果表是分区表,必须显式指定配置hoodie.datasource.write.hive_style_partitioning为true,而如果表为不分区表,则必须显式指定配置hoodie.datasource.write.hive_style_partitioning为false。
使用hudi catalog
WDP中的spark安装hudi之后,默认使用hudi catalog。在建表时会将元数据保存到hive metastore中。
创建不分区内部表
-- 创建cow不分区表,指定主键为uuid
create table hudi_cow_nonpcf_tbl (
uuid int,
name string,
price double
) using hudi
tblproperties (
type = 'cow',
primaryKey = 'uuid',
hoodie.datasource.write.hive_style_partitioning = 'false'
);
-- 创建mor不分区表,指定主键为id,指定预合并字段为ts
create table hudi_mor_tbl (
id int,
name string,
price double,
ts bigint
) using hudi
tblproperties (
type = 'mor',
primaryKey = 'id',
preCombineField = 'ts',
hoodie.datasource.write.hive_style_partitioning = 'false'
);
创建分区外部表
create table hudi_cow_pt_tbl (
id bigint,
name string,
ts bigint,
dt string,
hh string
) using hudi
tblproperties (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts',
hoodie.datasource.write.hive_style_partitioning = 'true',
hoodie.datasource.hive_sync.mode = 'hms'
)
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl';
create external table hudi_cow_pt_tbl_2 (
id bigint,
name string,
ts bigint,
dt string,
hh string
) using hudi
tblproperties (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts',
hoodie.datasource.write.hive_style_partitioning = 'true'
)
partitioned by (dt, hh);
更多关于表中tblproperties属性的设置可见官网:https://hudi.apache.org/docs/basic_configurations
CTAS
Hudi支持在Spark SQL上使用CTAS (Create Table As Select)创建hudi表。
注意:为了获得更好的性能来加载数据到hudi表,CTAS使用批量插入(bulk insert)作为写操作。
下述为使用实例CTAS命令创建不带preCombineField的非分区COW表。
create table hudi_ctas_cow_nonpcf_tbl
using hudi
tblproperties (primaryKey = 'id')
as
select 1 as id, 'a1' as name, 10 as price;
使用CTAS创建含有主键和分区字段的COW表
create table hudi_ctas_cow_pt_tbl
using hudi
tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts')
partitioned by (dt)
as
select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-12-01' as dt;
使用CTAS从另一个表加载数据。
# create managed parquet table
create table parquet_mngd using parquet location 'file:///tmp/parquet_dataset/*.parquet';
# CTAS by loading data into hudi table
create table hudi_ctas_cow_pt_tbl2 using hudi location 'file:/tmp/hudi/hudi_tbl/' options (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts'
)
partitioned by (datestr) as select * from parquet_mngd;
插入数据
-- insert into non-partitioned table
insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20;
insert into hudi_mor_tbl select 1, 'a1', 20, 1000;
-- insert dynamic partition
insert into hudi_cow_pt_tbl partition (dt, hh)
select 1 as id, 'a1' as name, 1000 as ts, '2021-12-09' as dt, '10' as hh;
-- insert static partition
insert into hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='11') select 2, 'a2', 1000;
注意:
默认情况下,如果提供了preCombineKey,则insert into 使用 upsert作为写操作的类型,否则使用insert。
我们支持使用bulk_insert作为写操作的类型,只需要设置两个配置:hoodie.sql.bulk.insert.enable和hoodie.sql.insert.mode。以下述为例:
-- upsert mode for preCombineField-provided table
insert into hudi_mor_tbl select 1, 'a1_1', 20, 1001;
select id, name, price, ts from hudi_mor_tbl;
1 a1_1 20.0 1001
-- bulk_insert mode for preCombineField-provided table
set hoodie.sql.bulk.insert.enable=true;
set hoodie.sql.insert.mode=non-strict;
insert into hudi_mor_tbl select 1, 'a1_2', 20, 1002;
select id, name, price, ts from hudi_mor_tbl;
1 a1_1 20.0 1001
1 a1_2 20.0 1002
数据查询
select * from hudi_mor_nonpcf_tbl where price > 10;
从 0.9.0 开始 hudi 已经支持 hudi 内置的 FileIndex:HoodieFileIndex 来查询 hudi 表,支持分区剪枝和 metatable 查询。 这将有助于提高查询性能。 它还支持非全局查询路径,这意味着用户可以通过基本路径查询表,而无需在查询路径中指定“*”。 此功能默认为非全局查询路径启用。 对于全局查询路径,hudi 使用旧的查询路径。 有关支持的所有表类型和查询类型的更多信息,请参阅表类型和查询。
时间旅行查询
Hudi 从 0.9.0 开始支持时间旅行查询。 目前支持三种查询时间格式,如下所示。
注意:仅支持Spark 3.2+版本
create table hudi_cow_pt_tbl_3 (
id bigint,
name string,
ts bigint,
dt string,
hh string
) using hudi
tblproperties (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts',
hoodie.datasource.write.hive_style_partitioning = 'true'
)
partitioned by (dt, hh);
insert into hudi_cow_pt_tbl_3 select 1, 'a0', 1000, '2021-12-09', '10';
select * from hudi_cow_pt_tbl_3;
-- record id=1 changes `name`
insert into hudi_cow_pt_tbl_3 select 1, 'a1', 1001, '2021-12-09', '10';
select * from hudi_cow_pt_tbl_3;
-- time travel based on first commit time, assume `20220725112636518`
select * from hudi_cow_pt_tbl_3 timestamp as of '20220725112636518' where id = 1;
-- time travel based on different timestamp formats
select * from hudi_cow_pt_tbl_3 timestamp as of '2022-07-25 11:26:36.100' where id = 1;
select * from hudi_cow_pt_tbl_3 timestamp as of '2022-07-26' where id = 1;
数据更新
这类似于插入新数据。 使用数据生成器生成对现有行程的更新,加载到 DataFrame 并将 DataFrame 写入 hudi 表。
Spark SQL 支持两种 DML 更新 hudi 表:Merge-Into 和 Update。
Update
语法
UPDATE tableIdentifier SET column = EXPRESSION(,column = EXPRESSION) [ WHERE boolExpression]
示例
update hudi_mor_tbl set price = price * 2, ts = 1111 where id = 1;
update hudi_cow_pt_tbl set name = 'a1_1', ts = 1001 where id = 1;
-- update using non-PK field
update hudi_cow_pt_tbl set ts = 1001 where name = 'a1';
注意:update操作需要有
preCombineField
字段
MergeInto
语法
MERGE INTO tableIdentifier AS target_alias
USING (sub_query | tableIdentifier) AS source_alias
ON <merge_condition>
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN NOT MATCHED [ AND <condition> ] THEN <not_matched_action> ]
<merge_condition> =A equal bool condition
<matched_action> =
DELETE |
UPDATE SET * |
UPDATE SET column1 = expression1 [, column2 = expression2 ...]
<not_matched_action> =
INSERT * |
INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])
示例
-- source table using hudi for testing merging into non-partitioned table
create table merge_source (id int, name string, price double, ts bigint) using hudi
tblproperties (
primaryKey = 'id',
preCombineField = 'ts',
hoodie.datasource.write.hive_style_partitioning = 'false'
);
insert into merge_source values (1, "old_a1", 22.22, 900), (2, "new_a2", 33.33, 2000), (3, "new_a3", 44.44, 2000);
-- 该语法会导致使用HIVE JDBC,所以源表需要指定同步方式为hms
merge into hudi_mor_tbl as target
using merge_source as source
on target.id = source.id
when matched then update set *
when not matched then insert *
;
-- source table using parquet for testing merging into partitioned table
create table merge_source2 (id int, name string, flag string, dt string, hh string) using parquet;
insert into merge_source2 values (1, "new_a1", 'update', '2021-12-09', '10'), (2, "new_a2", 'delete', '2021-12-09', '11'), (3, "new_a3", 'insert', '2021-12-09', '12');
merge into hudi_cow_pt_tbl as target
using (
select id, name, '1000' as ts, flag, dt, hh from merge_source2
) source
on target.id = source.id
when matched and flag != 'delete' then
update set id = source.id, name = source.name, ts = source.ts, dt = source.dt, hh = source.hh
when matched and flag = 'delete' then delete
when not matched then
insert (id, name, ts, dt, hh) values(source.id, source.name, source.ts, source.dt, source.hh)
;
数据删除
语法
DELETE FROM tableIdentifier [ WHERE BOOL_EXPRESSION]
示例
delete from hudi_cow_nonpcf_tbl where uuid = 1;
delete from hudi_mor_tbl where id % 2 = 0;
-- delete using non-PK field
delete from hudi_cow_pt_tbl where name = 'a1';
Insert Overwrite
对于批处理 ETL 作业,此操作可能比 upsert 更快,后者一次重新计算整个目标分区(而不是增量更新目标表)。 这是因为,我们能够完全绕过 upsert 写入路径中的索引、预组合和其他重新分区步骤。
插入覆盖一个分区表使用INSERT_OVERWRITE类型的写操作,而一个非分区表使用INSERT_OVERWRITE_TABLE。
-- insert overwrite non-partitioned table
insert overwrite hudi_mor_tbl select 99, 'a99', 20.0, 900;
insert overwrite hudi_cow_nonpcf_tbl select 99, 'a99', 20.0;
-- insert overwrite partitioned table with dynamic partition
insert overwrite table hudi_cow_pt_tbl select 10, 'a10', 1100, '2021-12-09', '10';
-- insert overwrite partitioned table with static partition
insert overwrite hudi_cow_pt_tbl partition(dt = '2021-12-09', hh='12') select 13, 'a13', 1100;
更多Spark SQL命令
Alter Table
模式演变可以通过 ALTER TABLE 命令来实现。 下面显示了一些基本示例。
语法:
-- Alter table name
ALTER TABLE oldTableName RENAME TO newTableName
-- Alter table add columns
ALTER TABLE tableIdentifier ADD COLUMNS(colAndType (,colAndType)*)
-- Alter table column type
ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType
-- Alter table properties
ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value')
示例
--rename to:
ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2;
--add column:
ALTER TABLE hudi_cow_nonpcf_tbl2 add columns(remark string);
--change column:
ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid bigint;
--set properties;
alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');
分区SQL命令
语法:
-- Drop Partition
ALTER TABLE tableIdentifier DROP PARTITION ( partition_col_name = partition_col_val [ , ... ] )
-- Show Partitions
SHOW PARTITIONS tableIdentifier
示例
--show partition:
show partitions hudi_cow_pt_tbl;
--drop partition:
alter table hudi_cow_pt_tbl drop partition (dt='2021-12-09', hh='10');
目前,show partitions 的结果基于文件系统表路径。 删除整个分区数据或直接删除某个分区是不准确的。
Procedures
语法
--Call procedure by positional arguments
CALL system.procedure_name(arg_1, arg_2, ... arg_n)
--Call procedure by named arguments
CALL system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1, ... arg_name_n => arg_n)
示例
--show commit's info
call show_commits(table => 'test_hudi_table', limit => 10);
Call命令已经支持一些提交程序和表优化程序,更多细节请参考Procedures。
版权归原作者 BigDataToAI 所有, 如有侵权,请联系我们删除。