

Hudi Data Lake: A Step-by-Step Guide to Lakehouse Architecture and Real-Time Ingestion with Flink and Spark

Table of Contents

Compiling Hudi from Source

Step 1: Download and install Maven, and configure a Maven mirror.

Step 2: Download the Hudi source package (it must match your Hadoop, Spark, Flink, and Hive versions).

Step 3: Run the build command. When it finishes, run the hudi-cli script; if it starts successfully, the build worked.

Check https://github.com/apache/hudi for the build commands of each release; the versions must match exactly.

(Figure: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112136722.png)

Preparing the Big Data Environment for Hudi

Install HDFS

Install Spark

Install Flink

Hudi Basics

Table Data Layout

A Hudi table contains two kinds of files: the .hoodie metadata files and the actual data files.


.hoodie files: because CRUD operations arrive piecemeal, every operation produces a new file. As these small files accumulate they severely degrade HDFS performance, so Hudi provides a file-merging mechanism; the .hoodie folder stores the log files recording these merge operations.

Hudi calls the series of CRUD operations applied to a table over time the Timeline; a single operation on the Timeline is called an Instant.

Instant Action: what the operation was, e.g. a data commit, a file compaction, or a file cleaning (cleans).

Instant Time: when the operation happened.

Instant State: the state of the operation: requested, inflight, or completed (commit).
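Once a table has been written (for example in the spark-shell walkthrough below), the Timeline is easy to inspect by listing the .hoodie folder. The following is a minimal sketch run from spark-shell using the Hadoop FileSystem API; the table path is the one used later in this post, so adjust it to your own.

// Minimal sketch: list the Hudi Timeline files under .hoodie
import org.apache.hadoop.fs.Path

val tablePath = "hdfs://bigdata:8020/datas/hudi-warehouse/hudi_trips_cow"
val fs = new Path(tablePath).getFileSystem(spark.sparkContext.hadoopConfiguration)

// Each instant shows up as <instant-time>.<action>[.requested|.inflight],
// e.g. 20220611104825549.commit
fs.listStatus(new Path(tablePath + "/.hoodie"))
  .map(_.getPath.getName)
  .sorted
  .foreach(println)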


Data files: Hudi stores the actual table data in Parquet format; alongside the Parquet (columnar) data files there is a metadata file.

To support CRUD, Hudi must be able to uniquely identify each record: it combines a unique field in the dataset (the record key) with the record's partition (partitionPath) to form the primary key of every row.
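This composite key is visible in the data itself: every row Hudi writes carries _hoodie_record_key and _hoodie_partition_path metadata columns. A minimal sketch from spark-shell, assuming the hudi_trips_cow table written in the section below:

// Minimal sketch: show the record-key / partition-path columns Hudi adds
val tripsDF = spark.read.format("hudi").
  load("hdfs://bigdata:8020/datas/hudi-warehouse/hudi_trips_cow")
tripsDF.select("_hoodie_record_key", "_hoodie_partition_path", "uuid", "partitionpath").
  show(5, false)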


Integrating Hudi with spark-shell

Goal: run spark-shell in local mode, generate simulated trip ride data, save it into a Hudi table, then load the data back from the Hudi table for queries and analysis. The Hudi table data ultimately lives on the HDFS distributed file system.

Start spark-shell

The following command needs network access: --packages downloads the required JARs locally via Ivy and adds them to the CLASSPATH.

spark-shell \
  --packages org.apache.hudi:hudi-spark3.2-bundle_2.12:0.11.0 \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

Alternatively, use --jars to specify a local dependency (either put the JAR into Spark's jars directory or pass the full path to the JAR):

bin/spark-shell \
  --jars hudi-spark3.2-bundle_2.12-0.11.0.jar \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

(Figure: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112138684.png)

Import the required dependencies and build a DataGenerator to simulate trip data

// Import the Spark and Hudi dependencies
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._

// Define the table name and the storage path
val tableName = "hudi_trips_cow"
val basePath = "hdfs://bigdata:8020/datas/hudi-warehouse/hudi_trips_cow"
val dataGen = new DataGenerator

// Generate 10 sample records
val inserts = convertToStringList(dataGen.generateInserts(10))

// Convert the sample data into a DataFrame
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))

(Figure: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112137284.png)

(Figure: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112137842.png)

Insert data

Save the generated data into the Hudi table: simply specify "hudi" as the data source format and set the relevant write options.

df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)
getQuickstartWriteConfigs: sets the number of shuffle partitions used when writing/updating data into Hudi; you can see this in the source code.

(Figure: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112138547.png)

PRECOMBINE_FIELD_OPT_KEY: the field used to decide which record is kept when records with the same key are merged.

RECORDKEY_FIELD_OPT_KEY: the unique id of each record; multiple fields are supported.

PARTITIONPATH_FIELD_OPT_KEY: the field used to partition the data.

(Figure: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112138172.png)

After the write, the data is visible on HDFS under the configured save path, and you can see that it is stored as Parquet (columnar) files.
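To see the record key and precombine field in action, you can write a batch of updates on top of the same table. The following sketch mirrors the Hudi quickstart and assumes the same spark-shell session (dataGen, tableName and basePath still defined); note that the save mode switches from Overwrite to Append so the existing table is upserted rather than replaced.

// Generate updates for some of the records inserted above
val updates = convertToStringList(dataGen.generateUpdates(10))
val dfUpdates = spark.read.json(spark.sparkContext.parallelize(updates, 2))

// Upsert: rows with an existing uuid + partitionpath are updated,
// and within the batch the row with the larger "ts" wins during precombine
dfUpdates.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)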

Query data

To read data back from the Hudi table, just specify "hudi" as the format plus the relevant options:

val tripsSnapshotDF = spark.
  read.
  format("hudi").
  load(basePath)

// Print the schema of the data read from the Hudi table
tripsSnapshotDF.printSchema()

The result has five more fields than the data originally written; these are the metadata fields Hudi uses to manage the table.

(Figure: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112138001.png)

Register the Hudi table data as a temporary view and analyze it with SQL:

tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

// Query trips with a fare greater than 20
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()

// Select specific columns
spark.sql("select _hoodie_commit_time, _hoodie_record_key, _hoodie_partition_path, rider, driver, fare from hudi_trips_snapshot").show()

(Figure: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112138553.png)
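Besides the snapshot read above, Hudi also supports incremental queries that only return records committed after a given instant. A minimal sketch, assuming the table from this section and using the plain string config keys hoodie.datasource.query.type and hoodie.datasource.read.begin.instanttime:

// Collect the commit times present in the table
val commits = spark.sql(
  "select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime"
).collect().map(_.getString(0))

// Read everything committed after the chosen begin instant
val beginTime = commits(0)
val tripsIncrementalDF = spark.read.format("hudi").
  option("hoodie.datasource.query.type", "incremental").
  option("hoodie.datasource.read.begin.instanttime", beginTime).
  load(basePath)
tripsIncrementalDF.show()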

Integrating Hudi with Spark and Hive: manually creating the Hive table

Environment setup

To create a Hive table that maps onto the Hudi table, put the integration JAR hudi-hadoop-mr-bundle-0.11.0.jar into Hive's lib directory.

(Figure: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112139732.png)

Create the database and table

Connect to Hive

# Start the metastore
hive --service metastore
# Start hiveserver2
hiveserver2
# Start beeline
bin/beeline -u jdbc:hive2://bigdata:10000 -n root -p root

After Spark has written data into the Hudi table, Hive needs an external table in order to access that data. Because the Hudi table is partitioned, the external table must also be partitioned to read all of the data; the partition column names can be chosen freely.

-- Create the database
CREATE DATABASE db_hudi;
-- Use the database
USE db_hudi;
-- Create the Hive external table
CREATE EXTERNAL TABLE `tbl_hudi_trips`(
  `begin_lat` double,
  `begin_lon` double,
  `driver` string,
  `end_lat` double,
  `end_lon` double,
  `fare` double,
  `partitionpath` string,
  `rider` string,
  `uuid` string,
  `ts` bigint)
PARTITIONED BY (country string, province string, city string)
ROW FORMAT SERDE
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT
  'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  '/datas/hudi-warehouse/hudi_trips_cow';

Because the Hudi table is partitioned, the Hive table is partitioned too, so the partitions have to be added manually:

-- Add the partitions
ALTER TABLE tbl_hudi_trips ADD IF NOT EXISTS PARTITION (`country`='americas', `province`='brazil', `city`='sao_paulo') LOCATION '/datas/hudi-warehouse/hudi_trips_cow/americas/brazil/sao_paulo';
ALTER TABLE tbl_hudi_trips ADD IF NOT EXISTS PARTITION (`country`='americas', `province`='united_states', `city`='san_francisco') LOCATION '/datas/hudi-warehouse/hudi_trips_cow/americas/united_states/san_francisco';
ALTER TABLE tbl_hudi_trips ADD IF NOT EXISTS PARTITION (`country`='asia', `province`='india', `city`='chennai') LOCATION '/datas/hudi-warehouse/hudi_trips_cow/asia/india/chennai';
-- Show the partitions
SHOW PARTITIONS tbl_hudi_trips;
-- Query the table
SELECT * FROM tbl_hudi_trips;

(Figure: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112139386.png)

Integrating Hudi with Spark SQL: automatically creating the Hive table

Start spark-sql; the startup options for each version follow the official documentation:

spark-sql \
--master local[2] \
--jars /opt/module/spark-3.2.1-bin-hadoop3.2/jars/hudi-spark3.2-bundle_2.12-0.11.0.jar \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'

Set some parameters; Hudi's default shuffle parallelism is 1500:

set hoodie.upsert.shuffle.parallelism=1;
set hoodie.insert.shuffle.parallelism=1;
set hoodie.delete.shuffle.parallelism=1;

Disable Hudi table metadata sync (the default is true, which creates partitions automatically based on the Spark SQL settings; when set to false, partitions have to be created manually):

set hoodie.datasource.meta.sync.enable=false;

Create a table of type COW with uuid as the primary key, partitionpath as the partition field, and ts as the default precombine field:

-- When a location is specified at creation time, the table is an external table
CREATE TABLE test_hudi_hive (
  `begin_lat` double,
  `begin_lon` double,
  `driver` string,
  `end_lat` double,
  `end_lon` double,
  `fare` double,
  `partitionpath` string,
  `rider` string,
  `uuid` string,
  `ts` bigint)
USING hudi
PARTITIONED BY (partitionpath)
OPTIONS (
  primaryKey = 'uuid',
  type = 'cow')
LOCATION 'hdfs://bigdata:8020/datas/hudi-warehouse/hudi_spark_sql/';

-- Inspect the created table
SHOW CREATE TABLE test_hudi_hive;

-- Insert a row; this creates a "china" partition
INSERT INTO test_hudi_hive SELECT 0.1 AS begin_lat, 0.2 AS begin_lon, 'driver-1' AS driver, 0.3 AS end_lat, 0.4 AS end_lon, 35 AS fare, 'china' AS partitionpath, 'rider-1' AS rider, 'uuid-1' AS uuid, 1654447758530 AS ts;

-- Insert a row; this creates an "america" partition
INSERT INTO test_hudi_hive SELECT 0.1 AS begin_lat, 0.2 AS begin_lon, 'driver-2' AS driver, 0.3 AS end_lat, 0.4 AS end_lon, 35 AS fare, 'america' AS partitionpath, 'rider-2' AS rider, 'uuid-2' AS uuid, 1654447758531 AS ts;
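Because the table created here is just a Hudi dataset on HDFS, it can also be read back from the spark-shell session used earlier; a minimal sketch, assuming the same location and the Hudi Spark bundle on the classpath:

// Minimal sketch: read the Spark SQL-created Hudi table from spark-shell
val sparkSqlTableDF = spark.read.format("hudi").
  load("hdfs://bigdata:8020/datas/hudi-warehouse/hudi_spark_sql")
sparkSqlTableDF.select("uuid", "driver", "fare", "partitionpath", "ts").show(false)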


(Figure: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112139952.png)

Integrating Hudi with Flink SQL

To integrate Flink with Hudi, put hudi-flink1.13-bundle_2.11-0.11.0.jar into the lib directory under FLINK_HOME.


Following the official documentation, edit the flink-conf.yaml configuration file under conf and give the TaskManager at least 4 slots.

(Figure: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112139076.png)

After the configuration, start the Flink cluster with bin/start-cluster.sh. If a later step fails with errors about missing Hadoop classes, expose the Hadoop classpath first:

export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

Start the Flink SQL CLI:

bin/sql-client.sh --embedded

Create a table

Store the data in a Hudi table backed by HDFS; the table type is MOR (MERGE_ON_READ).

-- Display results in a friendlier (tableau) format
set execution.result-mode=tableau;

CREATE TABLE t1(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://bigdata:8020/hudi-warehouse/hudi-t1',
  'write.tasks' = '1',
  'compaction.tasks' = '1',
  'table.type' = 'MERGE_ON_READ'
);

Insert data

INSERT INTO t1 VALUES
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

On the Flink cluster, check the job's execution status and the data landed on Hadoop, and verify the result with a SQL query.
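As a cross-check, the Flink-written table can also be read from the spark-shell session set up earlier, since it is a regular Hudi dataset on HDFS. A minimal sketch, assuming the Hudi Spark bundle is on the classpath; a snapshot read of a MOR table merges base and log files, so the inserted rows should appear even before compaction:

// Minimal sketch: read the Flink-written Hudi table from spark-shell
val flinkT1DF = spark.read.format("hudi").
  load("hdfs://bigdata:8020/hudi-warehouse/hudi-t1")
flinkT1DF.select("uuid", "name", "age", "ts", "partition").show(false)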

(Figure: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112139213.png)

(Figure: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112139257.png)

Integrating Hudi with Flink SQL and Hive: manually creating the Hive table

-- Be sure to set a checkpointing interval, either on the command line or in the config file
set execution.checkpointing.interval=3sec;

Create the sink table. A MERGE_ON_READ table produces log files but no Parquet files (at least until compaction produces base files), while a COPY_ON_WRITE table produces Parquet files and no log files.

CREATE TABLE flink_hudi_sink (
  uuid STRING PRIMARY KEY NOT ENFORCED,
  name STRING,
  age INT,
  ts STRING,
  `partition` STRING
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://bigdata:8020/hudi-warehouse/flink_hudi_sink',
  'table.type' = 'COPY_ON_WRITE',
  'write.operation' = 'upsert',
  'hoodie.datasource.write.recordkey.field' = 'uuid',
  'write.precombine.field' = 'ts',
  'write.tasks' = '1',
  'compaction.tasks' = '1',
  'compaction.async.enabled' = 'true',
  'compaction.trigger.strategy' = 'num_commits',
  'compaction.delta_commits' = '1'
);

INSERT INTO flink_hudi_sink VALUES
  ('id2','Stephen',33,'1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,'1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,'1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,'1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,'1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,'1970-01-01 00:00:07','par4'),
  ('id8','Han',56,'1970-01-01 00:00:08','par4');

Because Hive reads require Parquet files to exist, 'table.type' = 'COPY_ON_WRITE' is mandatory here. Once Parquet files appear in Hadoop, the write succeeded; all that remains is to manually create the Hive external table and add the partitions that map onto the Hudi table.

-- Create the Hive external table
CREATE EXTERNAL TABLE `flink_hudi_sink`(
   `uuid` string,
   `name` string,
   `age` int,
   `ts` string,
   `partition` string
)
PARTITIONED BY (part string)
ROW FORMAT SERDE
   'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
 STORED AS INPUTFORMAT
   'org.apache.hudi.hadoop.HoodieParquetInputFormat'
 OUTPUTFORMAT
   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
 LOCATION
   '/hudi-warehouse/flink_hudi_sink';

-- Add the partition
ALTER TABLE flink_hudi_sink ADD IF NOT EXISTS PARTITION (`part`='par1') LOCATION '/hudi-warehouse/flink_hudi_sink/par1';

(Figure: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112139206.png)

Integrating Hudi with Flink SQL: automatically creating the Hive table

The Hudi/Flink/Hive version configuration for this build lives in the pom.xml under hudi-0.11.0/packaging/hudi-flink-bundle in the source package. Packaging for Hive 2 differs from Hive 3, so pick the profile matching your Hive. The build command is shown below; adjust the exact versions following the official site.

mvn clean install -DskipTests -Drat.skip=true -Dflink1.13 -Dscala-2.11 -Pflink-bundle-shade-hive3

(Figure: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112139046.png)

After the build, copy hudi-flink1.13-bundle_2.11-0.11.0.jar into Flink's lib directory and hudi-hadoop-mr-bundle-0.11.0.jar into Hive's lib directory (exact file names depend on your versions).

Create a COPY_ON_WRITE table that automatically syncs a partitioned Hive table

set execution.checkpointing.interval=3sec;
set sql-client.execution.result-mode=tableau;

CREATE TABLE czs_hudi_hive (
  uuid STRING PRIMARY KEY NOT ENFORCED,
  name STRING,
  age INT,
  ts STRING,
  `partition` STRING
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://bigdata:8020/czs_hudi_hive',
  'table.type' = 'COPY_ON_WRITE',
  'hoodie.datasource.write.recordkey.field' = 'uuid',
  'write.precombine.field' = 'ts',
  'write.tasks' = '1',
  'write.rate.limit' = '2000',
  'compaction.tasks' = '1',
  'compaction.async.enabled' = 'true',
  'compaction.trigger.strategy' = 'num_commits',
  'compaction.delta_commits' = '1',
  'changelog.enabled' = 'true',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '3',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://bigdata:9083',
  'hive_sync.jdbc_url' = 'jdbc:hive2://bigdata:10000',
  'hive_sync.table' = 'czs_hive',
  'hive_sync.db' = 'default',
  'hive_sync.username' = 'root',
  'hive_sync.password' = 'root',
  'hive_sync.support_timestamp' = 'true'
);

INSERT INTO czs_hudi_hive VALUES
  ('id2','Stephen',33,'1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,'1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,'1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,'1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,'1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,'1970-01-01 00:00:07','par4'),
  ('id8','Han',56,'1970-01-01 00:00:08','par4');

(Figure: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112140568.png)

(Figure: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112140315.png)

Ingesting MySQL into Hudi with Flink CDC

Add the Flink CDC dependency

(Figure: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112140492.png)

Add the Hadoop dependency

export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

Create a MERGE_ON_READ Hudi table

set execution.checkpointing.interval=3sec;
set sql-client.execution.result-mode=tableau;

CREATE TABLE users_source_mysql (
  id BIGINT PRIMARY KEY NOT ENFORCED,
  name STRING,
  birthday TIMESTAMP(3),
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'bigdata',
  'port' = '3306',
  'username' = 'root',
  'password' = 'root',
  'server-time-zone' = 'Asia/Shanghai',
  'debezium.snapshot.mode' = 'initial',
  'database-name' = 'hudi',
  'table-name' = 'tbl_users'
);

CREATE VIEW view_users_cdc
AS SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') AS part FROM users_source_mysql;

CREATE TABLE users_sink_hudi_hive(
  id bigint,
  name string,
  birthday TIMESTAMP(3),
  ts TIMESTAMP(3),
  part VARCHAR(20),
  primary key(id) not enforced
)
PARTITIONED BY (part)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://bigdata:8020/czs_hudi_hive_test',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'write.precombine.field' = 'ts',
  'write.tasks' = '1',
  'write.rate.limit' = '2000',
  'compaction.tasks' = '1',
  'compaction.async.enabled' = 'true',
  'compaction.trigger.strategy' = 'num_commits',
  'compaction.delta_commits' = '1',
  'changelog.enabled' = 'true',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '3',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://bigdata:9083',
  'hive_sync.jdbc_url' = 'jdbc:hive2://bigdata:10000',
  'hive_sync.table' = 'czs_hive',
  'hive_sync.db' = 'default',
  'hive_sync.username' = 'root',
  'hive_sync.password' = 'root',
  'hive_sync.support_timestamp' = 'true'
);

INSERT INTO users_sink_hudi_hive SELECT id, name, birthday, ts, part FROM view_users_cdc;

In MOR mode, Hudi automatically generates two Hive tables:

xxx_ro: the read-optimized table; for a synced MOR table, the xxx_ro table only exposes the compacted Parquet data.

xxx_rt: the real-time (incremental) view, aimed mainly at incremental queries.

In short, the ro table can only query the Parquet file data, while the rt table can query both the Parquet files and the log files.
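The same distinction can be observed from Spark by switching the query type on the underlying path. A minimal sketch from spark-shell, assuming the Hudi path used above and the Hudi Spark bundle on the classpath; hoodie.datasource.query.type accepts read_optimized, snapshot, and incremental:

// Read-optimized query: only compacted Parquet base files (like the _ro table)
val roDF = spark.read.format("hudi").
  option("hoodie.datasource.query.type", "read_optimized").
  load("hdfs://bigdata:8020/czs_hudi_hive_test")

// Snapshot query: merges base files with log files (like the _rt table)
val rtDF = spark.read.format("hudi").
  option("hoodie.datasource.query.type", "snapshot").
  load("hdfs://bigdata:8020/czs_hudi_hive_test")

roDF.count()
rtDF.count()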

(Figure: https://raw.githubusercontent.com/czshh0628/blogs/master/202206112140612.png)

Ingesting PostgreSQL into Hudi with Flink CDC

Set the PostgreSQL WAL level

Set wal_level = logical and restart the database for the change to take effect:

vim /var/lib/pgsql/14/data/postgresql.conf


Add the Flink CDC dependency


Add the Hadoop dependency

export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

Create a MERGE_ON_READ Hudi table

set execution.checkpointing.interval=3sec;
set sql-client.execution.result-mode=tableau;

CREATE TABLE pg_source(
  id INT,
  username STRING,
  PRIMARY KEY(id) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'bigdata',
  'port' = '5432',
  'username' = 'postgres',
  'password' = 'postgres',
  'database-name' = 'mydb',
  'schema-name' = 'public',
  'decoding.plugin.name' = 'pgoutput',
  'table-name' = 'test'
);

CREATE TABLE pg_hudi_hive(
  id bigint,
  name string,
  primary key(id) not enforced
)
PARTITIONED BY (name)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://bigdata:8020/pg_hive_test',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'write.precombine.field' = 'name',
  'write.tasks' = '1',
  'write.rate.limit' = '2000',
  'compaction.tasks' = '1',
  'compaction.async.enabled' = 'true',
  'compaction.trigger.strategy' = 'num_commits',
  'compaction.delta_commits' = '1',
  'changelog.enabled' = 'true',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '3',
  'hive_sync.enable' = 'true',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'thrift://bigdata:9083',
  'hive_sync.jdbc_url' = 'jdbc:hive2://bigdata:10000',
  'hive_sync.table' = 'hudi_hive',
  'hive_sync.db' = 'default',
  'hive_sync.username' = 'root',
  'hive_sync.password' = 'root',
  'hive_sync.support_timestamp' = 'true'
);

INSERT INTO pg_hudi_hive SELECT id, username FROM pg_source;


Tags: spark, flink, big data

This post is reposted from: https://blog.csdn.net/xiaoyixiao_/article/details/125239236
Copyright belongs to the original author 心沉不是心沉. In case of infringement, please contact us for removal.
