0


hudi-flink核心参数设置

Hudi参数设置

去重:

  1. 设置主键

-- 设置单个主键

create table hoodie_table (

f0 int primary key not enforced,

f1 varchar(20),

...

) with (

'connector' = 'hudi',

...

)

-- 设置联合主键

create table hoodie_table (

f0 int,

f1 varchar(20),

...

primary key(f0, f1) not enforced

) with (

'connector' = 'hudi',

...

)

名称

说明

默认值

备注

hoodie.datasource.write.recordkey.field

主键字段

--

支持主键语法 PRIMARY KEY 设置,支持逗号分隔的多个字段

precombine.field

(0.13.0 之前版本为

write.precombine.field)

去重时间字段

--

record 合并的时候会按照该字段排序,选值较大的 record 为合并结果;不指定则为处理序:选择后到的 record

并发参数

名称

说明

默认值

备注

write.tasks

writer 的并发,每个 writer 顺序写 1~N 个 buckets

4

增加并发对小文件个数没影响

write.bucket_assign.tasks

bucket assigner 的并发

Flink的并行度

增加并发同时增加了并发写的 bucekt 数,也就变相增加了小文件(小 bucket) 数

write.index_bootstrap.tasks

Index bootstrap 算子的并发,增加并发可以加快 bootstrap 阶段的效率,bootstrap 阶段会阻塞 checkpoint,因此需要设置多一些的 checkpoint 失败容忍次数

Flink的并行度

只在 index.bootstrap.enabled 为 true 时生效

read.tasks

读算子的并发(batch 和 stream)

4

compaction.tasks

online compaction 算子的并发

writer 的并发

online compaction 比较耗费资源,建议走 offline compaction

注:可以flink建表时在with中指定,或Hints临时指定参数的方式:在需要调整的表名后面加上 /*+ OPTIONS() */

开启set table.dynamic-table-options.enabled=true;

insert into t2 /*+ OPTIONS('write.tasks'='2','write.bucket_assign.tasks'='3','compaction.tasks'='4') */

select * from sourceT;

压缩参数

在线压缩的参数,通过设置 compaction.async.enabled =false关闭在线压缩执行,但是调度compaction.schedule.enabled 仍然建议开启,之后通过离线压缩直接执行 在线压缩任务 阶段性调度的压缩 plan。

名称

说明

默认值

备注

compaction.schedule.enabled

是否阶段性生成压缩 plan

true

建议开启,即使compaction.async.enabled 关闭的情况下

compaction.async.enabled

是否开启异步压缩

true

通过关闭此参数关闭在线压缩

compaction.tasks

压缩 task 并发

4

compaction.trigger.strategy

压缩策略

num_commits

支持四种策略:num_commits、time_elapsed、num_and_time、

num_or_time

compaction.delta_commits

默认策略,5 个 commits 压缩一次

5

compaction.delta_seconds

3600

compaction.max_memory

压缩去重的 hash map 可用内存

100(MB)

资源够用的话建议调整到 1GB

compaction.target_io

每个压缩 plan 的 IO 上限,默认 5GB

500(GB)

小文件管理参数

Hudi会自管理文件大小,避免向查询引擎暴露小文件,其中自动处理文件大小起很大作用。在进行insert/upsert操作时,Hudi可以将文件大小维护在一个指定文件大小。

目前只有 log 文件的写入大小可以做到精确控制,parquet 文件大小按照估算值。

名称

说明

默认值

备注

hoodie.parquet.max.file.size

最大可写入的 parquet 文件大小

120 * 1024 * 1024

默认 120MB

(单位 byte)

超过该大小切新的 file group

hoodie.logfile.to.parquet.compression.ratio

log文件大小转 parquet 的比率

0.35

hoodie 统一依据 parquet 大小来评估小文件策略

hoodie.parquet.small.file.limit

在写入时,hudi 会尝试先追加写已存小文件,该参数设置了小文件的大小阈值,小于该参数的文件被认为是小文件

104857600

默认 100MB

(单位 byte)

大于 100MB,小于 120MB 的文件会被忽略,避免写过度放大

hoodie.copyonwrite.record.size.estimate

预估的 record 大小,hoodie 会依据历史的 commits 动态估算 record 的大小,但是前提是之前有单次写入超过

hoodie.parquet.small.file.limit 大小,在未达到这个大小时会使用这个参数

1024

默认 1KB

(单位 byte)

如果作业流量比较小,可以设置下这个参数

hoodie.logfile.max.size

LogFile最大大小。这是在将Log滚转到下一个版本之前允许的最大大小。

1073741824

默认1GB

(单位 byte)

限流

限流

如果将全量数据(百亿数量级) 和增量先同步到 kafka,再通过 flink 流式消费的方式将库表数据直接导成 hoodie 表,因为直接消费全量部分数据:量大(吞吐高)、乱序严重(写入的 partition 随机),会导致写入性能退化,出现吞吐毛刺,这时候可以开启限速参数,保证流量平稳写入。

WITH 参数

名称

Required

默认值

说明

write.rate.limit

false

0

默认关闭限速


本文转载自: https://blog.csdn.net/m0_71551473/article/details/127636450
版权归原作者 资源哥nni 所有, 如有侵权,请联系我们删除。

“hudi-flink核心参数设置”的评论:

还没有评论