0


数据湖架构Hudi(五)Hudi集成Flink案例详解

五、Hudi集成Flink案例详解

5.1 hudi集成flink

flink的下载地址:

https://archive.apache.org/dist/flink/

HudiSupported Flink version0.12.x****1.15.x1.14.x1.13.x0.11.x1.14.x1.13.x0.10.x1.13.x0.9.01.12.2

  • 将上述编译好的安装包拷贝到flink下的jars目录中:
cp /opt/apps/hudi-0.12.0/packaging/hudi-flink-bundle/target/hudi-flink1.13-bundle-0.12.0.jar /opt/apps/flink-1.13.6/lib/
  • 拷贝guava包,解决依赖冲突
cp /opt/apps/hadoop-3.1.3/share/hadoop/common/lib/guava-27.0-jre.jar /opt/apps/flink-1.13.6/lib/
  • 配置Hadoop环境变量
vim /etc/profile.d/my_env.sh

exportHADOOP_CLASSPATH=`hadoop classpath`exportHADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

source /etc/profile.d/my_env.sh

5.2 sql-client之yarn-session模式

配置hadoop调度器yarn

mapred-site.xml

<configuration><!-- 指定MapReduce作业执⾏时,使⽤YARN进⾏资源调度 --><property><name>mapreduce.framework.name</name><value>yarn</value></property><property><name>yarn.app.mapreduce.am.env</name><value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.3</value></property><property><name>mapreduce.map.env</name><value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.3</value></property><property><name>mapreduce.reduce.env</name><value>HADOOP_MAPRED_HOME=/opt/apps/hadoop-3.1.3</value></property></configuration>

yarn-site.xml

<configuration><!-- 设置ResourceManager --><property><name>yarn.resourcemanager.hostname</name><value>centos04</value></property><!--配置yarn的shuffle服务--><property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property></configuration>

hadoop-env.sh
# 在最后面添加如下:
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root

# 记得配置sql-client-defaults.yaml

5.2.1 启动

# 1、修改配置文件vim /opt/apps/flink-1.13.6/conf/flink-conf.yaml

classloader.check-leaked-classloader: false
taskmanager.numberOfTaskSlots: 4

 
state.backend: rocksdb
execution.checkpointing.interval: 30000# 开启ck,才能快速从内存中flush出去
state.checkpoints.dir: hdfs://centos04:9000/ckps
state.backend.incremental: true# 2、yarn-session模式启动# 解决依赖问题cp /opt/apps/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.3.jar /opt/apps/flink-1.13.6/lib/

# 启动yarn-session
/opt/apps/flink-1.13.6/bin/yarn-session.sh -d# 启动sql-client
/opt/apps/flink-1.13.6/bin/sql-client.sh embedded -s yarn-session

5.2.2 插入数据

setsql-client.execution.result-mode=tableau;-- 创建hudi表CREATETABLE t1(
  uuid VARCHAR(20)PRIMARYKEYNOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),`partition`VARCHAR(20))
PARTITIONED BY(`partition`)WITH('connector'='hudi','path'='hdfs://centos04:9000/tmp/hudi_flink/t1','table.type'='MERGE_ON_READ'-- 默认是COW);

或如下写法
CREATETABLE t1(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),`partition`VARCHAR(20),PRIMARYKEY(uuid)NOT ENFORCED
)
PARTITIONED BY(`partition`)WITH('connector'='hudi','path'='hdfs://centos04:9000/tmp/hudi_flink/t1','table.type'='MERGE_ON_READ');-- 插入数据INSERTINTO t1 VALUES('id1','Danny',23,TIMESTAMP'1970-01-01 00:00:01','par1'),('id2','Stephen',33,TIMESTAMP'1970-01-01 00:00:02','par1'),('id3','Julian',53,TIMESTAMP'1970-01-01 00:00:03','par2'),('id4','Fabian',31,TIMESTAMP'1970-01-01 00:00:04','par2'),('id5','Sophia',18,TIMESTAMP'1970-01-01 00:00:05','par3'),('id6','Emma',20,TIMESTAMP'1970-01-01 00:00:06','par3'),('id7','Bob',44,TIMESTAMP'1970-01-01 00:00:07','par4'),('id8','Han',56,TIMESTAMP'1970-01-01 00:00:08','par4');-- 查询数据select*from t1;

5.2.3 流式插入

-- 1、创建测试表CREATETABLE sourceT (
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),`partition`varchar(20))WITH('connector'='datagen','rows-per-second'='1');createtable t2(
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),`partition`varchar(20))with('connector'='hudi','path'='/tmp/hudi_flink/t2','table.type'='MERGE_ON_READ');-- 2、执行插入insertinto t2 select*from sourceT;

查询结果
setsql-client.execution.result-mode=tableau;
Flink SQL>select*from t2 limit10;-- 会产生一个collect的flink任务,拉取10条数据,注意:不是流读取2023-03-0622:45:10,403 INFO  org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient []- SASL encryption trust check: localHostTrusted =false, remoteHostTrusted =false2023-03-0622:45:12,897 INFO  org.apache.hadoop.yarn.client.RMProxy                        []- Connecting to ResourceManager at centos04/192.168.42.104:80322023-03-0622:45:12,899 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  []-No path for the flink jar passed.Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-03-0622:45:12,918 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  []- Found Web Interface centos04:45452of application 'application_1678113536312_0001'.+----+--------------------------------+--------------------------------+-------------+-------------------------+--------------------------------+| op |                           uuid |                           name |         age |                      ts |partition|+----+--------------------------------+--------------------------------+-------------+-------------------------+--------------------------------+|+I | d0523c31d3da5b8e2a8ff676dcf...|327db70824413c5dcde0a7ac10c...|1971040768|2023-03-0614:40:58.780|42b45346672bf719b5393232763...||+I | cfc07cbebf6890a04942ec88947...|36fc7a58aab88835f11b3b51a40...|-12199364|2023-03-0614:41:05.781| e33c02173f4c744fb9c1c68e774...||+I |668b204a933494a89b829c76bc6...| aa9ff2109457fdcd5f099b8ce98...|2061449955|2023-03-0614:41:14.780|680514e53b196324423cd12cda5...||+I |95fe7878909a801c2726f1d05f5...|1c86b29fe313e557688df0ba950...|519997290|2023-03-0614:41:11.781| b9817c52301ab4614c3053c9ccc...||+I |8661c25c8c930f4660fbefa867e...|01a2bee6b99064c7bca9513ca37...|-682830738|2023-03-0614:41:32.781|16ab837502a31e208b06bb74efd...||+I |55ce03895e229b29546dbdd2ff3...|77f2552de13337e8092c1445654...|2011273584|2023-03-0614:41:09.780|3fd688cfa17b2a3a6fd3ffac6bd...||+I |50c23f315d736c313b652b34fc5...|4f9c84ff75466fba8e800daabd0...|-190184764|2023-03-0614:42:26.780|7f2a07a1007b2fbfea8cbb2062e...||+I |8073e8c70a9bc0e79c2e69aa885...|30bf89c80d9ab0f0a8f5f883ee6...|-1639873427|2023-03-0614:41:24.781|15df7d527d6d7edae496e76d02f...||+I |29a61b7cd348d08498d2b089a5d...|77a63ca7a2e77e6d167de20c673...|71527378|2023-03-0614:42:14.781|2842db44a691f4f1d597ac79086...||+I | e5defc24191f60557644b7d14e2...|56bdd04424b8f422d4075ade510...|1054223989|2023-03-0614:40:42.781| e8d2d3c6fed90d37b15647d1ecd...|+----+--------------------------------+--------------------------------+-------------+-------------------------+--------------------------------+

在这里插入图片描述

在这里插入图片描述

5.3 使用IDEA开发

除了用sql-client,还可以自己编写FlinkSQL程序,打包提交Flink作业。

1、首先,需要将hudi集成flink的jar包,装载到本地的仓库,命令如下:

D:\bigdata\hudi从入门到精通\apps>mvn install:install-file -DgroupId=org.apache.hudi -DartifactId=hudi-flink_2.12 -Dversion=0.12.0 -Dpackaging=jar -Dfile=./hudi-flink1.13-bundle-0.12.0.jar

[INFO] Scanning for projects...
[INFO][INFO] ------------------< org.apache.maven:standalone-pom >-------------------
[INFO] Building Maven Stub Project (No POM)1[INFO] --------------------------------[ pom ]---------------------------------
[INFO][INFO] --- maven-install-plugin:2.4:install-file (default-cli) @ standalone-pom ---
[INFO] Installing D:\bigdata\hudi从入门到精通\apps\hudi-flink1.13-bundle-0.12.0.jar to D:\doit\apps\repository\org\apache\hudi\hudi-flink_2.12\0.12.0\hudi-flink_2.12-0.12.0.jar
[INFO] Installing C:\Users\Undo\AppData\Local\Temp\mvninstall50353756903805721.pom to D:\doit\apps\repository\org\apache\hudi\hudi-flink_2.12\0.12.0\hudi-flink_2.12-0.12.0.pom
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  1.111 s
[INFO] Finished at: 2023-03-02T10:08:15+08:00
[INFO] ------------------------------------------------------------------------

2、导入pom文件

<?xml version="1.0" encoding="UTF-8"?><projectxmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><parent><artifactId>hudi-start</artifactId><groupId>com.yyds</groupId><version>1.0-SNAPSHOT</version></parent><modelVersion>4.0.0</modelVersion><artifactId>hudi-flink</artifactId><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><flink.version>1.13.6</flink.version><hudi.version>0.12.0</hudi.version><java.version>1.8</java.version><scala.binary.version>2.12</scala.binary.version><slf4j.version>1.7.30</slf4j.version></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version><scope>provided</scope><!--不会打包到依赖中,只参与编译,不参与运行 --></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><!--idea运行时也有webui--><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-api</artifactId><version>${slf4j.version}</version><scope>provided</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>${slf4j.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-to-slf4j</artifactId><version>2.14.0</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version><scope>provided</scope></dependency><!--手动install到本地maven仓库,要不然会报错--><dependency><groupId>org.apache.hudi</groupId><artifactId>hudi-flink_2.12</artifactId><version>${hudi.version}</version><scope>provided</scope></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.2.4</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>com.google.code.findbugs:jsr305</exclude><exclude>org.slf4j:*</exclude><exclude>log4j:*</exclude><exclude>org.apache.hadoop:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformerscombine.children="append"><transformerimplementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"></transformer></transformers></configuration></execution></executions></plugin></plugins></build></project>
packagecom.yyds.hudi.flink;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.configuration.RestOptions;importorg.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;importorg.apache.flink.contrib.streaming.state.PredefinedOptions;importorg.apache.flink.streaming.api.CheckpointingMode;importorg.apache.flink.streaming.api.environment.CheckpointConfig;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;importjava.util.concurrent.TimeUnit;publicclassHudiTest{publicstaticvoidmain(String[] args){System.setProperty("HADOOP_USER_NAME","root");// 1、创建flinksql的执行环境Configuration conf =newConfiguration();
        conf.setString(RestOptions.BIND_PORT,"8081-8089");StreamExecutionEnvironment env =StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);StreamTableEnvironment tabEnv =StreamTableEnvironment.create(env);// 注意:需要设置check-point// 设置状态后端RocksDBEmbeddedRocksDBStateBackend embeddedRocksDBStateBackend =newEmbeddedRocksDBStateBackend(true);
        embeddedRocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
        env.setStateBackend(embeddedRocksDBStateBackend);// checkpoint配置
        env.enableCheckpointing(TimeUnit.SECONDS.toMillis(30),CheckpointingMode.EXACTLY_ONCE);CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointStorage("hdfs://centos04:9000/ckps");
        checkpointConfig.setMinPauseBetweenCheckpoints(TimeUnit.SECONDS.toMillis(20));
        checkpointConfig.setTolerableCheckpointFailureNumber(5);
        checkpointConfig.setCheckpointTimeout(TimeUnit.MINUTES.toMillis(1));
        checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);// 2、使用flink自带connector模拟数据
        tabEnv.executeSql("CREATE TABLE sourceT (\n"+"  uuid varchar(20),\n"+"  name varchar(10),\n"+"  age int,\n"+"  ts timestamp(3),\n"+"  `partition` varchar(20)\n"+") WITH (\n"+"  'connector' = 'datagen',\n"+"  'rows-per-second' = '1'\n"+")");// 3、创建hudi表
        tabEnv.executeSql("create table t2(\n"+"  uuid varchar(20),\n"+"  name varchar(10),\n"+"  age int,\n"+"  ts timestamp(3),\n"+"  `partition` varchar(20)\n"+")\n"+"with (\n"+"  'connector' = 'hudi',\n"+// 指定connector为hudi"  'path' = 'hdfs://192.168.42.104:9000/datas/hudi_warehouse/hudi_flink/t2',\n"+"  'table.type' = 'MERGE_ON_READ'\n"+// MOR类型的表")");// 4、将模拟产生的数据,写入到Hudi表中
        tabEnv.executeSql("insert into t2 select * from sourceT");}}

jar包运行

bin/flink run -t yarn-per-job \-c com.yyds.hudi.flink.HudiTest \
./myjars/hudi-flink-1.0-SNAPSHOT.jar

类型映射
Flink SQL TypeHudi TypeAvro logical typeCHAR / VARCHAR / STRINGstringBOOLEANbooleanBINARY / VARBINARYbytesDECIMALfixeddecimalTINYINTintSMALLINTintINTintBIGINTlongFLOATfloatDOUBLEdoubleDATEintdateTIMEinttime-millisTIMESTAMPlongtimestamp-millisARRAYarrayMAP(key must be string/char/varchar type)mapMULTISET(element must be string/char/varchar type)mapROWrecord

5.4 hudi核心参数

5.4.1 去重参数

-- 通过如下语法设置主键:-- 设置单个主键createtable hoodie_table (
  f0 intprimarykeynot enforced,
  f1 varchar(20),...)with('connector'='hudi',...)-- 设置联合主键createtable hoodie_table (
  f0 int,
  f1 varchar(20),...primarykey(f0, f1)not enforced
)with('connector'='hudi',...)

名称说明默认值备注hoodie.datasource.write.recordkey.field主键字段–支持主键语法 PRIMARY KEY 设置,支持逗号分隔的多个字段precombine.field(0.13.0 之前版本为 write.precombine.field)去重时间字段–record 合并的时候会按照该字段排序,选值较大的 record 为合并结果;不指定则为处理序:选择后到的 record

5.4.2 并发参数

名称说明默认值备注write.taskswriter 的并发,每个 writer 顺序写 1~N 个 buckets4增加并发对小文件个数没影响write.bucket_assign.tasksbucket assigner 的并发Flink的并行度增加并发同时增加了并发写的 bucekt 数,也就变相增加了小文件(小 bucket) 数write.index_bootstrap.tasksIndex bootstrap 算子的并发,增加并发可以加快 bootstrap 阶段的效率,bootstrap 阶段会阻塞 checkpoint,因此需要设置多一些的 checkpoint 失败容忍次数Flink的并行度只在 index.bootstrap.enabled 为 true 时生效read.tasks读算子的并发(batch 和 stream)4compaction.tasksonline compaction 算子的并发writer 的并发online compaction 比较耗费资源,建议走 offline compaction
可以flink建表时在with中指定,或Hints临时指定参数的方式:在需要调整的表名后面加上

 /*+ OPTIONS() */
案例如下:
insert into t2 /*+ OPTIONS('write.tasks'='2','write.bucket_assign.tasks'='3','compaction.tasks'='4') */
select * from sourceT;# 从下图可以看出,writer 的并发变成了2,bucket assigner 的并发变成了3,compaction_task 变成了4

在这里插入图片描述

在这里插入图片描述

可以参考下面Hudi表读取原理,看上图。

5.4.3 压缩参数

​ 在线压缩的参数,通过设置

compaction.async.enabled =false

关闭在线压缩执行,但是调度

compaction.schedule.enabled

仍然建议开启(即上图的compact_plan_generate步骤),之后通过离线压缩直接执行

在线压缩任务 阶段性调度的压缩 plan


名称说明默认值备注compaction.schedule.enabled是否阶段性生成压缩 plantrue建议开启,即使compaction.async.enabled 关闭的情况下compaction.async.enabled是否开启异步压缩true通过关闭此参数关闭在线压缩compaction.tasks压缩 task 并发4compaction.trigger.strategy压缩策略num_commits支持四种策略:num_commits、time_elapsed、num_and_time、num_or_timecompaction.delta_commits默认策略,5 个 commits 压缩一次5compaction.delta_seconds3600compaction.max_memory压缩去重的 hash map 可用内存100(MB)资源够用的话建议调整到 1GBcompaction.target_io每个压缩 plan 的 IO 上限,默认 5GB500(GB)

案例如下:
CREATETABLE t3(
  uuid VARCHAR(20)PRIMARYKEYNOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),`partition`VARCHAR(20))WITH('connector'='hudi','path'='hdfs://centos04:9000/tmp/hudi_flink/t3','compaction.async.enabled'='true',-- 异步在线压缩'compaction.tasks'='1','compaction.schedule.enabled'='true',-- 生成压缩 plan'compaction.trigger.strategy'='num_commits',-- 压缩策略,安装commit次数进行压缩'compaction.delta_commits'='2',-- 2次进行压缩'table.type'='MERGE_ON_READ');settable.dynamic-table-options.enabled=true;insertinto t3
select*from sourceT/*+ OPTIONS('rows-per-second' = '5') */;-- 从hdfs上可以看到,flink发生两次ck,delta_commit提交两次后,将log文件进行压缩,然后生成了parquet文件。

在这里插入图片描述

5.4.4 文件大小

​ Hudi会自管理文件大小,避免向查询引擎暴露小文件,其中自动处理文件大小起很大作用。在进行insert/upsert操作时,Hudi可以将文件大小维护在一个指定文件大小。

​ 目前只有 log 文件的写入大小可以做到精确控制,parquet 文件大小按照估算值。
名称说明默认值备注hoodie.parquet.max.file.size最大可写入的 parquet 文件大小120 * 1024 * 1024默认 120MB(单位 byte)超过该大小切新的 file grouphoodie.logfile.to.parquet.compression.ratiolog文件大小转 parquet 的比率0.35hoodie 统一依据 parquet 大小来评估小文件策略hoodie.parquet.small.file.limit在写入时,hudi 会尝试先追加写已存小文件,该参数设置了小文件的大小阈值,小于该参数的文件被认为是小文件104857600默认 100MB(单位 byte)大于 100MB,小于 120MB 的文件会被忽略,避免写过度放大hoodie.copyonwrite.record.size.estimate预估的 record 大小,hoodie 会依据历史的 commits 动态估算 record 的大小,但是前提是之前有单次写入超过 hoodie.parquet.small.file.limit 大小,在未达到这个大小时会使用这个参数1024默认 1KB(单位 byte)如果作业流量比较小,可以设置下这个参数hoodie.logfile.max.sizeLogFile最大大小。这是在将Log滚转到下一个版本之前允许的最大大小。1073741824默认1GB(单位 byte)
案例如下:

CREATETABLE t4(
  uuid VARCHAR(20)PRIMARYKEYNOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),`partition`VARCHAR(20))WITH('connector'='hudi','path'='hdfs://centos04:9000/tmp/hudi_flink/t4','compaction.tasks'='1','hoodie.parquet.max.file.size'='10000',-- 最大可写入的 parquet 文件大小,设置为10 KB'hoodie.parquet.small.file.limit'='5000',-- 小文件的大小阈值,小于该参数的文件被认为是小文件 设置为5KB'table.type'='MERGE_ON_READ');settable.dynamic-table-options.enabled=true;insertinto t4
select*from sourceT /*+ OPTIONS('rows-per-second' = '5')*/;

5.4.5 hadoop参数

从 0.12.0 开始支持,如果有跨集群提交执行的需求,可以通过 sql 的 ddl 指定 per-job 级别的 hadoop 配置
名称说明默认值备注hadoop.${you option key}通过 hadoop.前缀指定 hadoop 配置项–支持同时指定多个 hadoop 配置项

5.5 内存优化

5.5.1 内存参数

名称说明默认值备注write.task.max.size一个 write task 的最大可用内存1024当前预留给 write buffer 的内存为write.task.max.size -compaction.max_memory当 write task 的内存 buffer达到阈值后会将内存里最大的 buffer flush 出去write.batch.sizeFlink 的写 task 为了提高写数据效率,会按照写 bucket 提前 buffer 数据,每个 bucket 的数据在内存达到阈值之前会一直 cache 在内存中,当阈值达到会把数据 buffer 传递给 hoodie 的 writer 执行写操作256一般不用设置,保持默认值就好write.log_block.sizehoodie 的 log writer 在收到 write task 的数据后不会马上 flush 数据,writer 是以 LogBlock 为单位往磁盘刷数据的,在 LogBlock 攒够之前 records 会以序列化字节的形式 buffer 在 writer 内部128一般不用设置,保持默认值就好write.merge.max_memoryhoodie 在 COW 写操作的时候,会有增量数据和 base file 数据 merge 的过程,增量的数据会缓存在内存的 map 结构里,这个 map 是可 spill 的,这个参数控制了 map 可以使用的堆内存大小100一般不用设置,保持默认值就好compaction.max_memory同 write.merge.max_memory: 100MB 类似,只是发生在压缩时。100如果是 online compaction,资源充足时可以开大些,比如 1GB

5.5.2 MOR

(1)state backend 换成 rocksdb (默认的 in-memory state-backend 非常吃内存)

(2)内存够的话,compaction.max_memory 调大些 (默认是 100MB 可以调到 1GB)

(3)关注 TM 分配给每个 write task 的内存,保证每个 write task 能够分配到 write.task.max.size 所配置的大小,比如 TM 的内存是 4GB 跑了 2 个 StreamWriteFunction 那每个 writefunction 能分到 2GB,尽量预留一些 buffer,因为网络 buffer,TM 上其他类型 task (比如 BucketAssignFunction 也会吃些内存)

(4)需要关注 compaction 的内存变化,compaction.max_memory 控制了每个 compaction task 读 log 时可以利用的内存大小,compaction.tasks 控制了 compaction task 的并发

注意: write.task.max.size - compaction.max_memory 是预留给每个 write task 的内存 buffer

5.5.3 COW

(1)state backend 换成 rocksdb(默认的 in-memory state-backend 非常吃内存)。

(2)write.task.max.size 和 write.merge.max_memory 同时调大(默认是 1GB 和 100MB 可以调到 2GB 和 1GB)。

(3)关注 TM 分配给每个 write task 的内存,保证每个 write task 能够分配到 write.task.max.size 所配置的大小,比如 TM 的内存是 4GB 跑了 2 个 StreamWriteFunction 那每个 writefunction 能分到 2GB,尽量预留一些 buffer,因为网络 buffer,TM 上其他类型 task(比如 BucketAssignFunction 也会吃些内存)。

注意:write.task.max.size - write.merge.max_memory 是预留给每个 write task 的内存 buffer。

5.6 读取方式

5.6.1 流读

​ 当前表默认是快照读取,即读取最新的全量快照数据并一次性返回。通过参数

read.streaming.enabled

参数开启流读模式,通过

read.start-commit

参数指定起始消费位置,支持指定 earliest 从最早消费。
名称Required默认值说明read.streaming.enabledfalsefalse设置 true 开启流读模式read.start-commitfalse最新 commit指定 ‘yyyyMMddHHmmss’ 格式的起始 commit(闭区间)read.streaming.skip_compactionfalsefalse流读时是否跳过 compaction 的 commits,跳过 compaction 有两个用途:1)避免 upsert 语义下重复消费 (compaction 的 instant 为重复数据,如果不跳过,有小概率会重复消费) 2) changelog 模式下保证语义正确性 0.11 开始,以上两个问题已经通过保留 compaction 的 instant time 修复clean.retain_commitsfalse10cleaner 最多保留的历史 commits 数,大于此数量的历史 commits 会被清理掉,changelog 模式下,这个参数可以控制 changelog 的保留时间,例如 checkpoint 周期为 5 分钟一次,默认最少保留 50 分钟的时间。

setsql-client.execution.result-mode=tableau;CREATETABLE t5(
  uuid VARCHAR(20)PRIMARYKEYNOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),`partition`VARCHAR(20))WITH('connector'='hudi','path'='hdfs://centos04:9000/tmp/hudi_flink/t5','table.type'='MERGE_ON_READ','read.streaming.enabled'='true','read.streaming.check-interval'='4'-- 默认60s);insertinto t5 select*from sourceT;select*from t5;-- 如下图,能够不断的获取数据

在这里插入图片描述

5.6.2 增量读取

从 0.10.0 开始支持。

如果有增量读取 batch 数据的需求,增量读取包含三种场景。
(1)Stream 增量消费,通过参数 read.start-commit 指定起始消费位置;
(2)Batch 增量消费,通过参数 read.start-commit 指定起始消费位置,通过参数 read.end-commit 指定结束消费位置,区间为闭区间,即包含起始、结束的 commit
(3)TimeTravel:Batch 消费某个时间点的数据:通过参数 read.end-commit 指定结束消费位置即可(由于起始位置默认从最新,所以无需重复声明)

名称Required默认值说明read.start-commitfalse默认从最新 commit支持 earliest 从最早消费read.end-commitfalse默认到最新 commit

5.6.3 限流

​ 如果将全量数据(百亿数量级) 和增量先同步到 kafka,再通过 flink 流式消费的方式将库表数据直接导成 hoodie 表,因为直接消费全量部分数据:量大(吞吐高)、乱序严重(写入的 partition 随机),会导致写入性能退化,出现吞吐毛刺,这时候可以开启限速参数,保证流量平稳写入。
名称Required默认值说明write.rate.limitfalse0默认关闭限速

5.7 写入方式

5.7.1 通过flink-cdc进行写入

CDC 数据保存了完整的数据库变更,当前可通过两种途径将数据导入 hudi

在这里插入图片描述

第一种:通过 cdc-connector 直接对接 DB 的 binlog 将数据导入 hudi,优点是不依赖消息队列,缺点是对 db server 造成压力。

第二种:对接 cdc format 消费 kafka 数据导入 hudi,优点是可扩展性强,缺点是依赖 kafka。

注意:如果上游数据无法保证顺序,需要指定 write.precombine.field 字段。

1)准备MySQL表

(1)MySQL开启binlog

(2)建表

createdatabase test;use test;createtable stu3 (
 id intunsignedauto_incrementprimarykeyCOMMENT'自增id',
 name varchar(20)notnullcomment'学生名字',
 school varchar(20)notnullcomment'学校名字',
 nickname varchar(20)notnullcomment'学生小名',
 age intnotnullcomment'学生年龄',
 class_num intnotnullcomment'班级人数',
 phone bigintnotnullcomment'电话号码',
 email varchar(64)comment'家庭网络邮箱',
 ip varchar(32)comment'IP地址')engine=InnoDBdefaultcharset=utf8;

2)flink读取mysql binlog并写入kafka

(1)创建MySQL表

createtable stu3_binlog(
 id bigintnotnull,
 name string,
 school string,
 nickname string,
 age intnotnull,
 class_num intnotnull,
 phone bigintnotnull,
 email string,
 ip string,primarykey(id)not enforced
)with('connector'='mysql-cdc','hostname'='centos01','port'='3306','username'='root','password'='root','database-name'='test','table-name'='stu3');

(2)创建Kafka表

createtable stu3_binlog_sink_kafka(
 id bigintnotnull,
 name string,
 school string,
 nickname string,
 age intnotnull,
 class_num intnotnull,
 phone bigintnotnull,
 email string,
 ip string,primarykey(id)not enforced
)with('connector'='upsert-kafka','topic'='cdc_mysql_stu3_sink','properties.zookeeper.connect'='centos01:2181','properties.bootstrap.servers'='centos01:9092','key.format'='json','value.format'='json');

(3)将mysql binlog日志写入kafka

insertinto stu3_binlog_sink_kafka
select*from stu3_binlog;

3)flink读取kafka数据并写入hudi数据湖

(1)创建kafka源表

createtable stu3_binlog_source_kafka(
 id bigintnotnull,
 name string,
 school string,
 nickname string,
 age intnotnull,
 class_num intnotnull,
 phone bigintnotnull,
 email string,
 ip string
 )with('connector'='kafka','topic'='cdc_mysql_stu3_sink','properties.bootstrap.servers'='hadoop1:9092','format'='json','scan.startup.mode'='earliest-offset','properties.group.id'='testGroup');

(2)创建hudi目标表

createtable stu3_binlog_sink_hudi(
 id bigintnotnull,
 name string,`school` string,
 nickname string,
 age intnotnull,
 class_num intnotnull,
 phone bigintnotnull,
 email string,
 ip string,primarykey(id)not enforced
)
partitioned by(`school`)with('connector'='hudi','path'='hdfs://centos04:9000/tmp/hudi_flink/stu3_binlog_sink_hudi','table.type'='MERGE_ON_READ','write.option'='insert','write.precombine.field'='school');

(3)将kafka数据写入到hudi中

insertinto stu3_binlog_sink_hudi
select*from  stu3_binlog_source_kafka;

5.7.2 离线批量导入

如果存量数据来源于其他数据源,可以使用批量导入功能,快速将存量数据导成 Hoodie 表格式。

(1)批量导入省去了 avro 的序列化以及数据的 merge 过程,后续不会再有去重操作,数据的唯一性需要自己来保证。
(2)bulk_insert 需要在 Batch Execuiton Mode 下执行更高效,Batch 模式默认会按照 partition path 排序输入消息再写入 Hoodie,避免 file handle 频繁切换导致性能下降。

SET execution.runtime-mode = batch; 
SET execution.checkpointing.interval =0;

(3)bulk_insert write task 的并发通过参数 write.tasks 指定,并发的数量会影响到小文件的数量,理论上,bulk_insert write task 的并发数就是划分的 bucket 数,当然每个 bucket 在写到文件大小上限(parquet 120 MB)的时候会 roll over 到新的文件句柄,所以最后:写文件数量 >= bulk_insert write task 数。

名称Required默认值说明write.operationTRUEupsert配置 bulk_insert 开启该功能write.tasksFALSE4bulk_insert 写 task 的并发,最后的文件数 >=write.taskswrite.bulk_insert.shuffle_by_partitionwrite.bulk_insert.shuffle_input(从 0.11 开始)FALSETRUE是否将数据按照 partition 字段 shuffle 再通过 write task 写入,开启该参数将减少小文件的数量 但是可能有数据倾斜风险write.bulk_insert.sort_by_partitionwrite.bulk_insert.sort_input(从 0.11 开始)FALSETRUE是否将数据线按照 partition 字段排序再写入,当一个 write task 写多个 partition,开启可以减少小文件数量write.sort.memory128sort 算子的可用 managed memory(单位 MB)

5.7.3 全量接增量

如果已经有全量的离线 Hoodie 表,需要接上实时写入,并且保证数据不重复,可以开启 index bootstrap 功能。

如果觉得流程冗长,

可以在写入全量数据的时候资源调大直接走流模式写,全量走完接新数据再将资源调小(或者开启限流功能)


名称Required默认值说明index.bootstrap.enabledtruefalse开启索引加载,会将已存表的最新数据一次性加载到 state 中index.partition.regexfalse*设置正则表达式进行分区筛选,默认为加载全部分区

使用流程
(1) CREATE TABLE 创建和 Hoodie 表对应的语句,注意 table type 要正确
(2)设置 index.bootstrap.enabled = true开启索引加载功能
(3)重启任务将 index.bootstrap.enabled 关闭,参数配置到合适的大小,如果RowDataToHoodieFunction 和 BootstrapFunction 并发不同,可以重启避免 shuffle

说明:
索引加载为并发加载,根据数据量大小加载时间不同,可以在log中搜索
finish loading the index under partition 和 Load records from file 日志来观察索引加载的进度

5.8 写入模式

5.8.1 Changelog 模式

​ 如果希望

Hoodie 保留消息的所有变更(I/-U/U/D)

,之后接上 Flink 引擎的有状态计算实现全链路近实时数仓生产(增量计算),Hoodie 的 MOR 表通过行存原生支持保留消息的所有变更(format 层面的集成),通过

流读 MOR 表可以消费到所有的变更记录

1)WITH 参数
名称Required默认值说明changelog.enabledfalsefalse默认是关闭状态,即 UPSERT 语义,所有的消息仅保证最后一条合并消息,中间的变更可能会被 merge 掉;改成 true 支持消费所有变更。
​ 批(快照)读仍然会合并所有的中间结果,不管 format 是否已存储中间状态。

​ 开启 changelog.enabled 参数后,中间的变更也只是 Best Effort: 异步的压缩任务会将中间变更合并成 1 条,所以如果流读消费不够及时,被压缩后只能读到最后一条记录。当然,通过调整压缩的 buffer 时间可以预留一定的时间 buffer 给 reader,比如调整压缩的两个参数:

​ Ø

compaction.delta_commits:5 

​ Ø

compaction.delta_seconds: 3600

说明:

Changelog 模式开启流读的话,要在 sql-client 里面设置参数:

set sql-client.execution.result-mode=tableau; 

或者

set sql-client.execution.result-mode=changelog;

否则中间结果在读的时候会被直接合并。

2)流读 changelog

仅在 0.10.0 支持,本 feature 为实验性。

开启 changelog 模式后,hudi 会保留一段时间的 changelog 供下游 consumer 消费,我们可以通过流读 ODS 层 changelog 接上 ETL 逻辑写入到 DWD 层,如下图的 pipeline:
在这里插入图片描述

​ 流读的时候我们要注意 changelog 有可能会被 compaction 合并掉,中间记录会消除,可能会影响计算结果,需要关注sql-client的属性(result-mode)同上。

3)案例演示

(1)使用changelog

setsql-client.execution.result-mode=tableau;CREATETABLE t6(
  id int,
  ts int,primarykey(id)not enforced
)WITH('connector'='hudi','path'='hdfs://centos04:9000/tmp/hudi_flink/t6','table.type'='MERGE_ON_READ','read.streaming.enabled'='true','read.streaming.check-interval'='4','changelog.enabled'='true');insertinto t6 values(1,1);insertinto t6 values(1,2);settable.dynamic-table-options.enabled=true;select*from t6/*+ OPTIONS('read.start-commit'='earliest')*/;selectcount(*)from t6/*+ OPTIONS('read.start-commit'='earliest')*/;

(2)不使用changelog

CREATETABLE t6_v(
  id int,
  ts int,primarykey(id)not enforced
)WITH('connector'='hudi','path'='hdfs://centos04:9000/tmp/hudi_flink/t6','table.type'='MERGE_ON_READ','read.streaming.enabled'='true','read.streaming.check-interval'='4');select*from t6_v/*+ OPTIONS('read.start-commit'='earliest')*/;selectcount(*)from t6_v/*+ OPTIONS('read.start-commit'='earliest')*/;

5.8.2 Append 模式

从 0.10 开始支持

对于 INSERT 模式:

​ Ø MOR 默认会 apply 小文件策略: 会追加写 avro log 文件

​ Ø COW 每次直接写新的 parquet 文件,没有小文件策略

Hudi 支持丰富的 Clustering 策略,优化 INSERT 模式下的小文件问题:

1)Inline Clustering

只有 Copy On Write 表支持该模式
名称Required默认值说明write.insert.clusterfalsefalse是否在写入时合并小文件,COW 表默认 insert 写不合并小文件,开启该参数后,每次写入会优先合并之前的小文件(不会去重),吞吐会受影响
2) Async Clustering

​ 从 0.12 开始支持

(1)WITH参数
名称Required默认值说明clustering.schedule.enabledfalsefalse是否在写入时定时异步调度 clustering plan,默认关闭clustering.delta_commitsfalse4调度 clsutering plan 的间隔 commits,clustering.schedule.enabled 为 true 时生效clustering.async.enabledfalsefalse是否异步执行 clustering plan,默认关闭clustering.tasksfalse4Clustering task 执行并发clustering.plan.strategy.target.file.max.bytesfalse1024 * 1024 * 1024Clustering 单文件目标大小,默认 1GBclustering.plan.strategy.small.file.limitfalse600小于该大小的文件才会参与 clustering,默认600MBclustering.plan.strategy.sort.columnsfalseN/A支持指定特殊的排序字段clustering.plan.partition.filter.modefalseNONE支持NONE:不做限制RECENT_DAYS:按时间(天)回溯SELECTED_PARTITIONS:指定固定的 partitionclustering.plan.strategy.daybased.lookback.partitionsfalse2RECENT_DAYS 生效,默认 2 天
(2)Clustering Plan Strategy

​ 支持定制化的 clustering 策略。
名称Required默认值说明clustering.plan.partition.filter.modefalseNONE支持· NONE:不做限制· RECENT_DAYS:按时间(天)回溯· SELECTED_PARTITIONS:指定固定的 partitionclustering.plan.strategy.daybased.lookback.partitionsfalse2RECENT_DAYS 生效,默认 2 天clustering.plan.strategy.cluster.begin.partitionfalseN/ASELECTED_PARTITIONS 生效,指定开始 partition(inclusive)clustering.plan.strategy.cluster.end.partitionfalseN/ASELECTED_PARTITIONS 生效,指定结束 partition(incluseve)clustering.plan.strategy.partition.regex.patternfalseN/A正则表达式过滤 partitionsclustering.plan.strategy.partition.selectedfalseN/A显示指定目标 partitions,支持逗号 , 分割多个 partition

5.9 Bucket索引

​ 默认的 flink 流式写入使用 state 存储索引信息:primary key 到 fileId 的映射关系。当数据量比较大的时候,state的存储开销可能成为瓶颈,bucket 索引通过固定的 hash 策略,将相同 key 的数据分配到同一个 fileGroup 中,避免了索引的存储和查询开销。
名称Required默认值说明index.typefalseFLINK_STATE设置 BUCKET 开启 Bucket 索引功能hoodie.bucket.index.hash.fieldfalse主键可以设置成主键的子集hoodie.bucket.index.num.bucketsfalse4默认每个 partition 的 bucket 数,当前设置后则不可再变更。

(1)bucket index 没有 state 的存储计算开销,性能较好
(2)bucket index 无法扩 buckets,state index 则可以依据文件的大小动态扩容
(3)bucket index 不支持跨 partition 的变更(如果输入是 cdc 流则没有这个限制),state index 没有限制

5.10 Hudi Catalog

​ 从 0.12.0 开始支持,通过 catalog 可以管理 flink 创建的表,避免重复建表操作,另外 hms 模式的 catalog 支持自动补全 hive 同步参数。

-- DFS 模式 Catalog SQL样例:CREATE CATALOG hoodie_catalog
  WITH('type'='hudi','catalog.path'='${catalog 的默认路径}','mode'='dfs');-- Hms 模式 Catalog SQL 样例:CREATE CATALOG hoodie_catalog
  WITH('type'='hudi','catalog.path'='${catalog 的默认路径}','hive.conf.dir'='${hive-site.xml 所在的目录}','mode'='hms'-- 支持 'dfs' 模式通过文件系统管理表属性);

名称Required默认值说明catalog.pathtrue–默认的 catalog 根路径,用作表路径的自动推导,默认的表路径:

        c
       
       
        a
       
       
        t
       
       
        a
       
       
        l
       
       
        o
       
       
        g
       
       
        .
       
       
        p
       
       
        a
       
       
        t
       
       
        h
       
      
      
       /
      
     
     
      {catalog.path}/
     
    
   catalog.path/{db_name}/${table_name}default-databasefalsedefault默认的 database 名hive.conf.dirfalse–hive-site.xml 所在的目录,只在 hms 模式下生效modefalsedfs支持 hms模式通过 hive 管理元数据table.externalfalsefalse是否创建外部表,只在 hms 模式下生效
案例如下:
--(1)创建sql-client初始化sql文件
vim /opt/apps/flink-1.13.6/conf/sql-client-init.sqlCREATE CATALOG hoodie_catalog
  WITH('type'='hudi','catalog.path'='/tmp/hudi_catalog','mode'='dfs');USE CATALOG hoodie_catalog;--(2)指定sql-client启动时加载sql文件
hadoop fs -mkdir /tmp/hudi_catalog

bin/sql-client.sh embedded -i conf/sql-client-init.sql-s yarn-session--(3)建库建表插入createdatabase test;use test;createtable t2(
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),`partition`varchar(20),primarykey(uuid)not enforced
)with('connector'='hudi','path'='/tmp/hudi_catalog/default/t2','table.type'='MERGE_ON_READ');insertinto t2 values('1','zs',18,TIMESTAMP'1970-01-01 00:00:01','a');--(4)退出sql-client,重新进入,表信息还在use test;showtables;select*from t2;

5.11 离线压缩

MOR 表的 compaction 默认是自动打开的,策略是 5 个 commits 执行一次压缩。

因为压缩操作比较耗费内存,和写流程放在同一个 pipeline,在数据量比较大的时候(10w+/s qps),容易干扰写流程,

此时采用离线定时任务的方式执行 compaction 任务更稳定

5.11.1 设置参数

Ø

compaction.async.enabled

为 false,

关闭在线 compaction

Ø

compaction.schedule.enabled 仍然保持开启,由写任务阶段性触发压缩 plan

5.11.2 原理

一个 compaction 的任务的执行包括两部分:

Ø schedule 压缩 plan

该过程推荐由写任务定时触发,写参数

compaction.schedule.enabled

默认开启

Ø 执行对应的压缩 plan

5.11.3 使用方式

1)执行命令

离线 compaction 需要手动执行 Java 程序,程序入口:

Ø

hudi-flink1.13-bundle-0.12.0.jar

Ø

org.apache.hudi.sink.compact.HoodieFlinkCompactor
# 命令行的方式
./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://xxx:9000/table

2)参数配置
参数名required默认值备注–pathtrue–目标表的路径–compaction-tasksfalse-1压缩 task 的并发,默认是待压缩 file group 的数量–compaction-max-memoryfalse100 (单位 MB)压缩时 log 数据的索引 map,默认 100MB,内存足够可以开大些–schedulefalsefalse是否要执行 schedule compaction 的操作,当写流程还在持续写入表数据的时候,开启这个参数有丢失查询数据的风险,所以开启该参数一定要保证当前没有任务往表里写数据, 写任务的 compaction plan 默认是一直 schedule 的,除非手动关闭(默认 5 个 commits 一次压缩)–seqfalseLIFO执行压缩任务的顺序,默认是从最新的压缩 plan 开始执行,可选值:LIFO: 从最新的 plan 开始执行;FIFO: 从最老的 plan 开始执行–servicefalsefalse是否开启 service 模式,service 模式会打开常驻进程,一直监听压缩任务并提交到集群执行(从 0.11 开始执行)–min-compaction-interval-secondsfalse600 (单位 秒)service 模式下的执行间隔,默认 10 分钟

案例如下

createtable t7(
  id int,
  ts int,primarykey(id)not enforced
)with('connector'='hudi','path'='/tmp/hudi_catalog/default/t7','compaction.async.enabled'='false',-- 关闭自动压缩'compaction.schedule.enabled'='true',-- 由写任务阶段性触发压缩 plan'table.type'='MERGE_ON_READ');insertinto t7 values(1,1);insertinto t7 values(2,2);insertinto t7 values(3,3);insertinto t7 values(4,4);insertinto t7 values(5,5);// 命令行的方式./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://centos04:9000/tmp/hudi_catalog/default/t7

5.12 离线 Clustering

​ 异步的 clustering 相对于 online 的 async clustering 资源隔离,从而更加稳定。

5.12.1 设置参数

Ø

 clustering.async.enabled

为 false,关闭在线 clustering。

Ø

clustering.schedule.enabled

仍然保持开启,由写任务阶段性触发 clustering plan。

5.12.2 原理

一个 clustering 的任务的执行包括两部分:

Ø schedule plan

推荐由写任务定时触发,写参数

clustering.schedule.enabled

默认开启。

Ø 执行对应的 plan

5.12.3 使用方式

1)执行命令

离线 clustering 需要手动执行 Java 程序,程序入口:

Ø

hudi-flink1.13-bundle-0.12.0.jar

Ø

org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob

注意:必须是分区表,否则报错空指针异常。

# 命令行的方式

./bin/flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://centos04:9000/table

2)参数配置
参数名required默认值备注–pathtrue–目标表的路径。–clustering-tasksfalse-1Clustering task 的并发,默认是待压缩 file group 的数量。–schedulefalsefalse是否要执行 schedule clustering plan 的操作,当写流程还在持续写入表数据的时候,开启这个参数有丢失查询数据的风险,所以开启该参数一定要保证当前没有任务往表里写数据, 写任务的 clustering plan 默认是一直 schedule 的,除非手动关闭(默认 4 个 commits 一次 clustering)。–seqfalseFIFO执行压缩任务的顺序,默认是从最老的 clustering plan 开始执行,可选值:LIFO: 从最新的 plan 开始执行;FIFO: 从最老的 plan 开始执行–target-file-max-bytesfalse1024 * 1024 * 1024最大目标文件,默认 1GB。–small-file-limitfalse600小于该大小的文件会参与 clustering,默认 600MB。–sort-columnsfalseN/AClustering 可选排序列。–servicefalsefalse是否开启 service 模式,service 模式会打开常驻进程,一直监听压缩任务并提交到集群执行(从 0.11 开始执行)。–min-compaction-interval-secondsfalse600 (单位 秒)service 模式下的执行间隔,默认 10 分钟。
3)案例演示

createtable t8(
  id int,
  age int,
  ts int,primarykey(id)not enforced
) partitioned by(age)with('connector'='hudi','path'='/tmp/hudi_catalog/default/t8','clustering.async.enabled'='false','clustering.schedule.enabled'='true','table.type'='COPY_ON_WRITE');insertinto t8 values(1,18,1);insertinto t8 values(2,18,2);insertinto t8 values(3,18,3);insertinto t8 values(4,18,4);insertinto t8 values(5,18,5);-- 命令行的方式./bin/flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://centos04:9000/tmp/hudi_catalog/default/t8

5.12.4 常见问题

# 存储一直看不到数据
    如果是 streaming 写,请确保开启 checkpoint,Flink 的 writer 有 3 种刷数据到磁盘的策略:
当某个 bucket 在内存积攒到一定大小 (可配,默认 64MB)
当总的 buffer 大小积攒到一定大小(可配,默认 1GB)
当 checkpoint 触发,将内存里的数据全部 flush 出去

# 数据有重复
    如果是 COW 写,需要开启参数 write.insert.drop.duplicates,COW 写每个 bucket 的第一个文件默认是不去重的,只有增量的数据会去重,全局去重需要开启该参数;MOR 写不需要开启任何参数,定义好 primary key 后默认全局去重。(注意:从 0.10 版本开始,该属性改名 write.precombine 并且默认为 true。)
    
    如果需要多 partition 去重,需要开启参数: index.global.enabled 为 true。(注意:从 0.10 版本开始,该属性默认true。)
    
    索引 index 是判断数据重复的核心数据结构,index.state.ttl 设置了索引保存的时间,默认为 1.5 天,对于长时间周期的更新,比如更新一个月前的数据,需要将 index.state.ttl 调大(单位天),设置小于 0 代表永久保存。(注意:从 0.10 版本开始,该属性默认为 0。)
    
    
# Merge On Read 写只有 log 文件
    Merge On Read 默认开启了异步的 compaction,策略是 5 个 commits 压缩一次,当条件满足参会触发压缩任务,另外,压缩本身因为耗费资源,所以不一定能跟上写入效率,可能会有滞后。

5.13 Hudi核心原理

5.13.1 Hudi数据去重原理

Hoodie 的数据去重分两步:

(1)写入前攒 buffer 阶段去重,核心接口HoodieRecordPayload#preCombine

(2)写入过程中去重,核心接口HoodieRecordPayload#combineAndGetUpdateValue。

1)消息版本新旧

​ 相同 record key (主键)的数据通过

write.precombine.field

指定的字段来判断哪个更新,即 precombine 字段更大的 record 更新,如果是相等的 precombine 字段,则后来的数据更新。

​ 从 0.10 版本开始,write.precombine.field 字段为可选,

如果没有指定,会看 schema 中是否有 ts 字段,如果有,ts 字段被选为 precombine 字段;如果没有指定,schema 中也没有 ts 字段,则为处理顺序:后来的消息默认较新

2)攒消息阶段的去重

​ Hoodie 将 buffer 消息发给 write handle 之前可以执行一次去重操作,通过HoodieRecordPayload#preCombine 接口,保留 precombine 字段较大的消息,此操作为纯内存的计算,在同一个 write task 中为单并发执行。

​ 注意:write.precombine 选项控制了攒消息的去重。

3)写 parquet 增量消息的去重

​ 在Hoodie 写入流程中,Hoodie 每写一个 parquet 都会有 base + 增量 merge 的过程,增量的消息会先放到一个 spillable map 的数据结构构建内存 index,这里的增量数据如果没有提前去重,那么同 key 的后来消息会直接覆盖先来的消息。

​ Writer 接着扫 base 文件,过程中会不断查看内存 index 是否有同 key 的新消息,如果有,会走 HoodieRecordPayload#combineAndGetUpdateValue 接口判断保留哪个消息。

​ 注意: MOR 表的 compaction 阶段和 COW 表的写入流程都会有 parquet 增量消息去重的逻辑。

4)跨 partition 的消息去重

​ 默认情况下,不同的 partition 的消息是不去重的,即相同的 key 消息,如果新消息换了 partition,那么老的 partiiton 消息仍然保留。

​ 开启

index.global.enabled

选项开启跨 partition 去重,原理是先往老的 partiton 发一条删除消息,再写新 partition。

5.13.2 Hudi表写入原理

数据写入、数据压缩与数据清理
1)数据写入分析
(1)基础数据封装:将数据流中flink的RowData封装成Hoodie实体;
(2)BucketAssigner:桶分配器,主要是给数据分配写入的文件地址:若为插入操作,则取大小最小的FileGroup对应的FileId文件内进行插入;在此文件的后续写入中文件 ID 保持不变,并且提交时间会更新以显示最新版本。这也意味着记录的任何特定版本,给定其分区路径,都可以使用文件 ID 和 instantTime进行唯一定位;若为更新操作,则直接在当前location进行数据更新;
(3)Hoodie Stream Writer: 数据写入,将数据缓存起来,在超过设置的最大flushSize或是做checkpoint时进行刷新到文件中;
(4)Oprator Coordinator:主要与Hoodie Stream Writer进行交互,处理checkpoint等事件,在做checkpoint时,提交instant到timeLine上,并生成下一个instant的时间,算法为取当前最新的commi时间,比对当前时间与commit时间,若当前时间大于commit时间,则返回,否则一直循环等待生成。

2)数据压缩
    压缩(compaction)用于在 MergeOnRead存储类型时将基于行的log日志文件转化为parquet列式数据文件,用于加快记录的查找。compaction首先会遍历各分区下最新的parquet数据文件和其对应的log日志文件进行合并,并生成新的FileSlice,在TimeLine 上提交新的Instance:

具体策略分为4种,具体见官网说明:
compaction.trigger.strategy:
Strategy to trigger compaction, options are 
1.'num_commits': trigger compaction when reach N delta commits;2.'time_elapsed': trigger compaction when time elapsed > N seconds since last compaction;3.'num_and_time': trigger compaction when both NUM_COMMITS and TIME_ELAPSED are satisfied;4.'num_or_time': trigger compaction when NUM_COMMITS or TIME_ELAPSED is satisfied. Default is 'num_commits'
Default Value: num_commits (Optional)
    在项目实践中需要注意参数'read.streaming.skip_compaction' 参数的配置,其表示在流式读取该表是否跳过压缩后的数据,若该表用于后续聚合操作表的输入表,则需要配置值为true,表示聚合操作表不再消费读取压缩数据。若不配置或配置为false,则该表中的数据在未被压缩之前被聚合操作表读取了一次,在压缩后数据又被读取一次,会导致聚合表的sum、count等算子结果出现双倍情况。

3)数据清理
    随着用户向表中写入更多数据,对于每次更新,Hudi会生成一个新版本的数据文件用于保存更新后的记录(COPY_ON_WRITE)或将这些增量更新写入日志文件以避免重写更新版本的数据文件(MERGE_ON_READ)。在这种情况下,根据更新频率,文件版本数可能会无限增长,但如果不需要保留无限的历史记录,则必须有一个流程(服务)来回收旧版本的数据,这就是 Hudi 的清理服务。具体清理策略可参考官网,一般使用的清理策略为:KEEP_LATEST_FILE_VERSIONS:此策略具有保持 N 个文件版本而不受时间限制的效果。会删除N之外的FileSlice。

5.13.3 Hudi表读取原理

(1)开启split_monitor算子,每隔N秒(可配置)监听TimeLine上变化,并将变更的Instance封装为FileSlice。

(2)分发log文件时候,按照fileId值进行keyBy,保证同一file group下数据文件都给一个Task进行处理,从而保证数据处理的有序性。

(3)split_reader根据FileSlice信息进行数据读取。

标签: flink hadoop

本文转载自: https://blog.csdn.net/qq_44665283/article/details/129371508
版权归原作者 undo_try 所有, 如有侵权,请联系我们删除。

“数据湖架构Hudi(五)Hudi集成Flink案例详解”的评论:

还没有评论