0


Hudi(23):Hudi集成Hive之同步

0. 相关文章链接

** Hudi文章汇总 **

1. Flink同步Hive

1.1. 使用方式

    Flink hive sync 现在支持两种 hive sync mode, 分别是 hms 和 jdbc 模式。 其中 hms 只需要配置 metastore uris;而 jdbc 模式需要同时配置 jdbc 属性 和 metastore uris,具体配置模版如下:
-- hms mode 配置

CREATE TABLE t1(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
with(
  'connector'='hudi',
  'path' = 'hdfs://xxx.xxx.xxx.xxx:9000/t1',
  'table.type'='COPY_ON_WRITE',        -- MERGE_ON_READ方式在没生成 parquet 文件前,hive不会有输出
  'hive_sync.enable'='true',           -- required,开启hive同步功能
  'hive_sync.table'='${hive_table}',              -- required, hive 新建的表名
  'hive_sync.db'='${hive_db}',             -- required, hive 新建的数据库名
  'hive_sync.mode' = 'hms',            -- required, 将hive sync mode设置为hms, 默认jdbc
  'hive_sync.metastore.uris' = 'thrift://ip:9083' -- required, metastore的端口
);

注意:核心点为上述hive_sync系列的配置。

1.2. 案例实操

CREATE TABLE t10(
  id int,
  num int,
  ts int,
  primary key (id) not enforced
)
PARTITIONED BY (num)
with(
  'connector'='hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t10',
  'table.type'='COPY_ON_WRITE', 
  'hive_sync.enable'='true', 
  'hive_sync.table'='h10', 
  'hive_sync.db'='default', 
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://hadoop1:9083'
);

insert into t10 values(1,1,1); 

2. Spark同步Hive

官网参数地址:Basic Configurations | Apache Hudi

2.1. 使用方式

//设置数据集注册并同步到hive
option("hoodie.datasource.hive_sync.enable","true").                         

//使用hms
option("hoodie.datasource.hive_sync.mode","hms").                         

//hivemetastore地址
option("hoodie.datasource.hive_sync.metastore.uris", "thrift://ip:9083"). 

//登入hiveserver2的用户
option("hoodie.datasource.hive_sync.username","").                          

//登入hiveserver2的密码
option("hoodie.datasource.hive_sync.password","").                    

//设置hudi与hive同步的数据库  
option("hoodie.datasource.hive_sync.database", "").                   

//设置hudi与hive同步的表名
option("hoodie.datasource.hive_sync.table", "").                        

//hive表同步的分区列
option("hoodie.datasource.hive_sync.partition_fields", "").               

// 分区提取器 按/ 提取分区
option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor"). 

2.2. 案例实操

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

val tableName = "hudi_trips_cow"
val basePath = "file:///tmp/hudi_trips_cow"
val dataGen = new DataGenerator

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
        .withColumn("a",split(col("partitionpath"),"\\/")(0))
        .withColumn("b",split(col("partitionpath"),"\\/")(1))
        .withColumn("c",split(col("partitionpath"),"\\/")(2))
        
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option("hoodie.table.name", tableName). 
  option("hoodie.datasource.hive_sync.enable","true").
  option("hoodie.datasource.hive_sync.mode","hms").
  option("hoodie.datasource.hive_sync.metastore.uris", "thrift://hadoop1:9083").
  option("hoodie.datasource.hive_sync.database", "default").
  option("hoodie.datasource.hive_sync.table", "spark_hudi").
  option("hoodie.datasource.hive_sync.partition_fields", "a,b,c").
  option("hoodie.datasource.hive_sync.partition_extractor_class", "org.apache.hudi.hive.MultiPartKeysValueExtractor").
  mode(Overwrite).
  save(basePath)

注:**其他Hudi相关文章链接由此进 -> Hudi文章汇总 **



本文转载自: https://blog.csdn.net/yang_shibiao/article/details/128750247
版权归原作者 电光闪烁 所有, 如有侵权,请联系我们删除。

“Hudi(23):Hudi集成Hive之同步”的评论:

还没有评论