Hudi Integration with Spark: the spark-sql Approach

Starting spark-sql

# Start Hive's Metastore before launching spark-sql
nohup hive --service metastore &

# For Spark 3.2
spark-sql \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

# If the Hive environment variables are not configured, manually copy hive-site.xml into Spark's conf directory
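If that copy is needed, a minimal sketch, assuming HIVE_HOME and SPARK_HOME point at the respective installation directories:

# make Spark pick up the Hive Metastore configuration (assumes HIVE_HOME/SPARK_HOME are set)
cp $HIVE_HOME/conf/hive-site.xml $SPARK_HOME/conf/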

Creating Tables

Table creation parameters:

  • primaryKey (default: uuid) — the table's primary key; separate multiple fields with commas. Equivalent to hoodie.datasource.write.recordkey.field.
  • preCombineField (no default) — the table's pre-combine field. Equivalent to hoodie.datasource.write.precombine.field.
  • type (default: cow) — the table type to create: type = 'cow' or type = 'mor'. Equivalent to hoodie.datasource.write.table.type.
(1) Create a non-partitioned table

Create a cow table with the default primaryKey 'uuid' and no preCombineField:

create database spark_hudi;
use spark_hudi;

create table hudi_cow_nonpcf_tbl (
    uuid int,
    name string,
    price double
) using hudi;

(2) Create a mor non-partitioned table

create table hudi_mor_tbl (
    id int,
    name string,
    price double,
    ts bigint
) using hudi
tblproperties (
    type = 'mor',
    primaryKey = 'id',
    preCombineField = 'ts'
);

(3) Create a partitioned table

Create a cow partitioned external table, specifying primaryKey and preCombineField. The data now lives on HDFS:

create table hudi_cow_pt_tbl (
    id bigint,
    name string,
    ts bigint,
    dt string,
    hh string
) using hudi
tblproperties (
    type = 'cow',
    primaryKey = 'id',
    preCombineField = 'ts'
)
partitioned by (dt, hh)
location '/opt/hudi/hudi_cow_pt_tbl';

(4) Create a new table on top of an existing Hudi table. There is no need to specify the schema or any properties other than the partition columns (if any); Hudi recognizes the schema and configuration automatically.

  • Non-partitioned table:

    create table hudi_existing_tbl0 using hudi
    location 'file:///opt/datas/hudi/dataframe_hudi_nonpt_table';

  • Partitioned table:

    create table hudi_existing_tbl1 using hudi
    partitioned by (dt, hh)
    location 'file:///opt/datas/dataframe_hudi_pt_table';

(5) Create a table via CTAS (Create Table As Select). To improve the performance of loading data into a Hudi table, CTAS uses bulk insert as the write operation.

  • Create a cow non-partitioned table via CTAS, without specifying preCombineField:

    create table hudi_ctas_cow_nonpcf_tbl
    using hudi
    tblproperties (primaryKey = 'id')
    as select 1 as id, 'a1' as name, 10 as price;

  • Create a cow partitioned table via CTAS, specifying preCombineField:

    create table hudi_ctas_cow_pt_tbl
    using hudi
    tblproperties (type = 'cow', primaryKey = 'id', preCombineField = 'ts')
    partitioned by (dt)
    as select 1 as id, 'a1' as name, 10 as price, 1000 as ts, '2021-12-01' as dt;

  • Load data from another table via CTAS:

    -- create a managed table
    create table parquet_mngd using parquet
    location 'file:///opt/datas/parquet_dataset/*.parquet';

    -- load its data via CTAS
    create table hudi_ctas_cow_pt_tbl2 using hudi
    location 'file:///opt/datas/hudi/hudi_tbl/'
    options (type = 'cow', primaryKey = 'id', preCombineField = 'ts')
    partitioned by (datestr)
    as select * from parquet_mngd;

Inserting Data

By default, if preCombineKey is provided, insert into uses upsert as the write operation; otherwise it uses insert.

(1) Insert into a non-partitioned table

insert into hudi_cow_nonpcf_tbl select 1, 'a1', 20;
insert into hudi_mor_tbl select 1, 'a1', 20, 1000;

(2) Insert into a partitioned table with dynamic partitioning

insert into hudi_cow_pt_tbl partition (dt, hh)
select 1 as id, 'a1' as name, 1000 as ts, '2021-12-09' as dt, '10' as hh;

(3) Insert into a partitioned table with static partitioning

insert into hudi_cow_pt_tbl partition (dt = '2021-12-09', hh = '11')
select 2, 'a2', 1000;

(4) Insert data with bulk_insert

Hudi supports bulk_insert as the write operation type; it only requires setting two configurations: hoodie.sql.bulk.insert.enable and hoodie.sql.insert.mode.

-- Inserting into a table that specifies preCombineKey: the write operation is upsert
insert into hudi_mor_tbl select 1, 'a1_1', 20, 1001;
select id, name, price, ts from hudi_mor_tbl;
1    a1_1    20.0    1001

-- Insert into the same table, but set the write operation to bulk_insert
set hoodie.sql.bulk.insert.enable=true;
set hoodie.sql.insert.mode=non-strict;

insert into hudi_mor_tbl select 1, 'a1_2', 20, 1002;
select id, name, price, ts from hudi_mor_tbl;
1    a1_1    20.0    1001
1    a1_2    20.0    1002

Querying Data

(1) Query

select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0;

(2) Time travel query

Hudi has supported time travel queries since 0.9.0. Using them through Spark SQL requires Spark 3.2 or above.

-- Disable the bulk_insert enabled earlier
set hoodie.sql.bulk.insert.enable=false;

-- Write the data to HDFS
create table hudi_cow_pt_tbl1 (
    id bigint,
    name string,
    ts bigint,
    dt string,
    hh string
) using hudi
tblproperties (
    type = 'cow',
    primaryKey = 'id',
    preCombineField = 'ts'
)
partitioned by (dt, hh)
location '/opt/datas/hudi/hudi_cow_pt_tbl1';

-- Insert a row with id 1
insert into hudi_cow_pt_tbl1 select 1, 'a0', 1000, '2021-12-09', '10';
select * from hudi_cow_pt_tbl1;

-- Modify the row with id 1
insert into hudi_cow_pt_tbl1 select 1, 'a1', 1001, '2021-12-09', '10';
select * from hudi_cow_pt_tbl1;

-- Time travel based on the first commit time
select * from hudi_cow_pt_tbl1 timestamp as of '20220307091628793' where id = 1;

-- Other accepted timestamp formats
select * from hudi_cow_pt_tbl1 timestamp as of '2022-03-07 09:16:28.100' where id = 1;
select * from hudi_cow_pt_tbl1 timestamp as of '2022-03-08' where id = 1;
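To find a commit instant to pass to timestamp as of, one option is the show_commits procedure covered at the end of this article; a minimal sketch:

-- list recent commits (and their instant times) for the table
call show_commits(table => 'hudi_cow_pt_tbl1', limit => 10);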

Updating Data

(1) update

The update operation requires preCombineField to be specified.

  • Syntax:

    UPDATE tableIdentifier SET column = EXPRESSION (, column = EXPRESSION) [WHERE boolExpression]

  • Execute updates:

    update hudi_mor_tbl set price = price * 2, ts = 1111 where id = 1;

    update hudi_cow_pt_tbl1 set name = 'a1_1', ts = 1001 where id = 1;

    -- update using non-PK field
    update hudi_cow_pt_tbl1 set ts = 1111 where name = 'a1_1';

(2) MergeInto

  • Syntax:

    MERGE INTO tableIdentifier AS target_alias
    USING (sub_query | tableIdentifier) AS source_alias
    ON <merge_condition>
    [ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
    [ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
    [ WHEN NOT MATCHED [ AND <condition> ] THEN <not_matched_action> ]

    <merge_condition> = A equal bool condition
    <matched_action> =
        DELETE |
        UPDATE SET * |
        UPDATE SET column1 = expression1 [, column2 = expression2 ...]
    <not_matched_action> =
        INSERT * |
        INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])

    Merge into can be thought of as a join operation.
  • Example. Start Hive's hiveserver2 before executing:

    [root@hadoop102 bin]# ./hiveserver2 start

    -- 1. Prepare a source table: a non-partitioned hudi table, and insert data
    create table merge_source (id int, name string, price double, ts bigint)
    using hudi
    tblproperties (primaryKey = 'id', preCombineField = 'ts');

    insert into merge_source values
    (1, "old_a1", 22.22, 2900),
    (2, "new_a2", 33.33, 2000),
    (3, "new_a3", 44.44, 2000);

    merge into hudi_mor_tbl as target
    using merge_source as source
    on target.id = source.id
    when matched then update set *
    when not matched then insert *;

    -- 2. Prepare a source table: a partitioned parquet table, and insert data
    create table merge_source2 (id int, name string, flag string, dt string, hh string)
    using parquet;

    insert into merge_source2 values
    (1, "new_a1", 'update', '2021-12-09', '10'),
    (2, "new_a2", 'delete', '2021-12-09', '11'),
    (3, "new_a3", 'insert', '2021-12-09', '12');

    merge into hudi_cow_pt_tbl1 as target
    using (select id, name, '2000' as ts, flag, dt, hh from merge_source2) source
    on target.id = source.id
    when matched and flag != 'delete' then update set
        id = source.id, name = source.name, ts = source.ts, dt = source.dt, hh = source.hh
    when matched and flag = 'delete' then delete
    when not matched then insert (id, name, ts, dt, hh)
        values (source.id, source.name, source.ts, source.dt, source.hh);

An error that may occur when running merge into:

Could not sync using the meta sync class org.apache.hudi.hive.HiveSyncTool

java.sql.SQLException: Could not open client transport with JDBC Uri: jdbc:hive2://localhost:10000: Failed to open new session: java.lang.RuntimeException: org.apache.hadoop.security.AccessControlException: Permission denied: user=hive, access=EXECUTE, inode="/tmp":root:supergroup:drwxrwx---

Solution: https://blog.csdn.net/weixin_45417821/article/details/128651942
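The exception itself points at HDFS permissions: the hive user cannot traverse /tmp (owned by root, mode drwxrwx---). A hedged sketch of one possible fix, which simply opens up the directory (the linked post may recommend a different or stricter change):

# open up /tmp in HDFS so the hive user can traverse it (one possible fix)
hdfs dfs -chmod -R 777 /tmp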

Deleting Data

Syntax:

DELETE FROM tableIdentifier [WHERE BOOL_EXPRESSION]

Examples:

delete from hudi_cow_nonpcf_tbl where uuid = 1;

delete from hudi_mor_tbl where id % 2 = 0;

-- delete using a non-primary-key field
delete from hudi_cow_pt_tbl1 where name = 'a1_1';

Overwriting Data

  • Use the INSERT_OVERWRITE write operation to overwrite a partitioned table
  • Use the INSERT_OVERWRITE_TABLE write operation to overwrite a non-partitioned table, or a partitioned table with dynamic partitioning

(1) insert overwrite into a non-partitioned table

insert overwrite hudi_mor_tbl select 99, 'a99', 20.0, 900;
insert overwrite hudi_cow_nonpcf_tbl select 99, 'a99', 20.0;

(2) insert overwrite table into a partitioned table via dynamic partitioning

insert overwrite table hudi_cow_pt_tbl1 select 10, 'a10', 1100, '2021-12-09', '11';

(3) insert overwrite into a partitioned table via static partitioning

insert overwrite hudi_cow_pt_tbl1 partition (dt = '2021-12-09', hh = '12')
select 13, 'a13', 1100;

Altering Table Structure (Alter Table)

Syntax:

-- Alter table name
ALTER TABLE oldTableName RENAME TO newTableName

-- Alter table add columns
ALTER TABLE tableIdentifier ADD COLUMNS (colAndType (, colAndType)*)

-- Alter table column type
ALTER TABLE tableIdentifier CHANGE COLUMN colName colName colType

-- Alter table properties
ALTER TABLE tableIdentifier SET TBLPROPERTIES (key = 'value')

Examples:

-- rename to:
ALTER TABLE hudi_cow_nonpcf_tbl RENAME TO hudi_cow_nonpcf_tbl2;

-- add column:
ALTER TABLE hudi_cow_nonpcf_tbl2 add columns (remark string);

-- change column:
ALTER TABLE hudi_cow_nonpcf_tbl2 change column uuid uuid int;

-- set properties:
alter table hudi_cow_nonpcf_tbl2 set tblproperties (hoodie.keep.max.commits = '10');

Modifying Partitions

Syntax:

-- Drop Partition
ALTER TABLE tableIdentifier DROP PARTITION (partition_col_name = partition_col_val [, ...])

-- Show Partitions
SHOW PARTITIONS tableIdentifier

Examples:

-- show partitions:
show partitions hudi_cow_pt_tbl1;

-- drop partition:
alter table hudi_cow_pt_tbl1 drop partition (dt = '2021-12-09', hh = '10');

Note: the output of show partitions is based on the table path in the file system, so it is not precise after deleting an entire partition's data or dropping a partition directory directly.
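As a hedged illustration of that caveat, reusing the statements already shown above: removing the records that live in a partition (via delete from) and dropping the partition entry itself are two separate operations, which is why the listing can drift out of sync with the actual data:

-- remove the records stored in the partition
delete from hudi_cow_pt_tbl1 where dt = '2021-12-09' and hh = '10';

-- drop the partition itself
alter table hudi_cow_pt_tbl1 drop partition (dt = '2021-12-09', hh = '10');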

Stored Procedures (Procedures)

Syntax:

-- Call procedure by positional arguments
CALL system.procedure_name(arg_1, arg_2, ... arg_n)

-- Call procedure by named arguments
CALL system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1, ... arg_name_n => arg_n)

Example:

Available procedures: https://hudi.apache.org/docs/procedures/

-- show commit info
call show_commits(table => 'hudi_cow_pt_tbl1', limit => 10);
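Other maintenance operations follow the same call pattern. A hedged example, assuming the rollback_to_instant procedure documented on the page linked above; the instant time here reuses the commit from the time travel section purely for illustration:

-- roll the table back to a given commit instant (illustrative instant time)
call rollback_to_instant(table => 'hudi_cow_pt_tbl1', instant_time => '20220307091628793');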

Reposted from: https://blog.csdn.net/qq_44766883/article/details/129016533
Copyright belongs to the original author 迷雾总会解. If there is any infringement, please contact us for removal.
