0


Hudi(17):Hudi集成Flink之写入方式

0. 相关文章链接

** Hudi文章汇总 **

1. CDC 数据同步

CDC 数据保存了完整的数据库变更,当前可通过两种途径将数据导入 hudi:

  • 第一种:通过 cdc-connector 直接对接 DB 的 binlog 将数据导入 hudi,优点是不依赖消息队列,缺点是对 db server 造成压力。
  • 第二种:对接 cdc format 消费 kafka 数据导入 hudi,优点是可扩展性强,缺点是依赖 kafka。

注意:如果上游数据无法保证顺序,需要指定 write.precombine.field 字段。

1.1. 准备MySQL表

注意:需要提前开启MySQL的binlog

测试表建表语句如下所示:

create database test;
use test;
create table stu3 (
  id int unsigned auto_increment primary key COMMENT '自增id',
  name varchar(20) not null comment '学生名字',
  school varchar(20) not null comment '学校名字',
  nickname varchar(20) not null comment '学生小名',
  age int not null comment '学生年龄',
  class_num int not null comment '班级人数',
  phone bigint not null comment '电话号码',
  email varchar(64) comment '家庭网络邮箱',
  ip varchar(32) comment 'IP地址'
) engine=InnoDB default charset=utf8;

1.2. flink读取mysql binlog并写入kafka

步骤一:创建MySQL表(使用flink-sql创建MySQL源的sink表)

create table stu3_binlog(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
) with (
  'connector' = 'mysql-cdc',
  'hostname' = 'hadoop1',
  'port' = '3306',
  'username' = 'root',
  'password' = 'aaaaaa',
  'database-name' = 'test',
  'table-name' = 'stu3'
);

步骤二:创建Kafka表(使用flink-sql创建MySQL源的sink表)

create table stu3_binlog_sink_kafka(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
) with (
  'connector' = 'upsert-kafka'
  ,'topic' = 'cdc_mysql_stu3_sink'
  ,'properties.zookeeper.connect' = 'hadoop1:2181'
  ,'properties.bootstrap.servers' = 'hadoop1:9092'
  ,'key.format' = 'json'
  ,'value.format' = 'json'
);

步骤三:将mysql binlog日志写入kafka(flink-sql内部操作)

insert into stu3_binlog_sink_kafka
select * from stu3_binlog;

1.3. flink读取kafka数据并写入hudi数据湖

步骤一:创建kafka源表(使用flink-sql创建以kafka为源端的表)

create table stu3_binlog_source_kafka(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string
) with (
  'connector' = 'kafka',
  'topic' = 'cdc_mysql_stu3_sink',
  'properties.bootstrap.servers' = 'hadoop1:9092',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset',
  'properties.group.id' = 'testGroup'
);

步骤二:创建hudi目标表(使用flink-sql创建以hudi为目标端的表)

create table stu3_binlog_sink_hudi(
  id bigint not null,
  name string,
  `school` string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
partitioned by (`school`)
with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/stu3_binlog_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.option' = 'insert',
  'write.precombine.field' = 'school'
);

步骤三:将kafka数据写入到hudi中(flink-sql内部操作)

insert into stu3_binlog_sink_hudi
select * from  stu3_binlog_source_kafka;

1.4. 使用datafaker插入数据

datafaker安装及说明:datafaker --- 测试数据生成工具-阿里云开发者社区

步骤一:新建meta.txt文件

id||int||自增id[:inc(id,1)]
name||varchar(20)||学生名字
school||varchar(20)||学校名字[:enum(qinghua,beida,shanghaijiaoda,fudan,xidian,zhongda)]
nickname||varchar(20)||学生小名[:enum(tom,tony,mick,rich,jasper)]
age||int||学生年龄[:age]
class_num||int||班级人数[:int(10, 100)]
phone||bigint||电话号码[:phone_number]
email||varchar(64)||家庭网络邮箱[:email]
ip||varchar(32)||IP地址[:ipv4]

步骤二:生成10000条数据并写入到mysql中的test.stu3表

datafaker rdb mysql+mysqldb://root:aaaaaa@hadoop1:3306/test?charset=utf8 stu3 10000 --meta meta.txt

注意:如果要再次生成测试数据,则需要修改meta.txt将自增id中的1改为比10000大的数,不然会出现主键冲突情况。

1.5. 统计数据入Hudi情况

create table stu3_binlog_hudi_view(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/stu3_binlog_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.precombine.field' = 'school'
  );

select count(*) from stu3_binlog_hudi_view;  

1.6. 实时查看数据入湖情况

create table stu3_binlog_hudi_streaming_view(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/stu3_binlog_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.precombine.field' = 'school',
  'read.streaming.enabled' = 'true'
  );

 
select * from  stu3_binlog_hudi_streaming_view;

2. 离线批量导入

如果存量数据来源于其他数据源,可以使用批量导入功能,快速将存量数据导成 Hoodie 表格式。

2.1. 原理

  • 批量导入省去了 avro 的序列化以及数据的 merge 过程,后续不会再有去重操作,数据的唯一性需要自己来保证。
  • bulk_insert 需要在 Batch Execuiton Mode 下执行更高效,Batch 模式默认会按照 partition path 排序输入消息再写入 Hoodie,避免 file handle 频繁切换导致性能下降。
SET execution.runtime-mode = batch; 
SET execution.checkpointing.interval = 0;
  • bulk_insert write task 的并发通过参数 write.tasks 指定,并发的数量会影响到小文件的数量,理论上,bulk_insert write task 的并发数就是划分的 bucket 数,当然每个 bucket 在写到文件大小上限(parquet 120 MB)的时候会 roll over 到新的文件句柄,所以最后:写文件数量 >= bulk_insert write task 数。

2.2. WITH 参数

名称

Required

默认值

说明

write.operation

true

upsert

配置 bulk_insert 开启该功能

write.tasks

false

4

bulk_insert 写 task 的并发,最后的文件数 >=write.tasks

write.bulk_insert.shuffle_by_partition

write.bulk_insert.shuffle_input

(从 0.11 开始)

false

true

是否将数据按照 partition 字段 shuffle 再通过 write task 写入,开启该参数将减少小文件的数量但是可能有数据倾斜风险

write.bulk_insert.sort_by_partition

write.bulk_insert.sort_input

(从 0.11 开始)

false

true

是否将数据线按照 partition 字段排序再写入,当一个 write task 写多个 partition,开启可以减少小文件数量

write.sort.memory

128

sort 算子的可用 managed memory(单位 MB)

2.3. 案例

步骤一:MySQL建表

create database test;
use test;
create table stu4 (
  id int unsigned auto_increment primary key COMMENT '自增id',
  name varchar(20) not null comment '学生名字',
  school varchar(20) not null comment '学校名字',
  nickname varchar(20) not null comment '学生小名',
  age int not null comment '学生年龄',
  score decimal(4,2) not null comment '成绩',
  class_num int not null comment '班级人数',
  phone bigint not null comment '电话号码',
  email varchar(64) comment '家庭网络邮箱',
  ip varchar(32) comment 'IP地址'
) engine=InnoDB default charset=utf8;

步骤二:新建meta.txt文件

id||int||自增id[:inc(id,1)]
name||varchar(20)||学生名字
school||varchar(20)||学校名字[:enum(qinghua,beida,shanghaijiaoda,fudan,xidian,zhongda)]
nickname||varchar(20)||学生小名[:enum(tom,tony,mick,rich,jasper)]
age||int||学生年龄[:age]
score||decimal(4,2)||成绩[:decimal(4,2,1)]
class_num||int||班级人数[:int(10, 100)]
phone||bigint||电话号码[:phone_number]
email||varchar(64)||家庭网络邮箱[:email]
ip||varchar(32)||IP地址[:ipv4]

步骤三:使用datafaker生成10万条数据并写入到mysql中的test.stu4表

datafaker rdb mysql+mysqldb://root:aaaaaa@hadoop1:3306/test?charset=utf8 stu4 100000 --meta meta.txt

注意:
    如果要再次生成测试数据,
    则需要将meta.txt中的自增id改为比100000大的数,
    不然会出现主键冲突情况。

步骤四:Flink SQL client 创建myql数据源

create table stu4(
  id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  PRIMARY KEY (id) NOT ENFORCED
) with (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://hadoop1:3306/test?serverTimezone=GMT%2B8',
  'username' = 'root',
  'password' = 'aaaaaa',
  'table-name' = 'stu4'
);

步骤五:Flink SQL client创建hudi表

 create table stu4_sink_hudi(
  id bigint not null,
  name string,
  `school` string,
  nickname string,
  age int not null,
 score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/stu4_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.option' = 'bulk_insert',
  'write.precombine.field' = 'school'
);

-- 注意:核心点是其中的 write.option 配置为 bulk_insert

步骤六:Flink SQL client执行mysql数据插入到hudi中

insert into stu4_sink_hudi select * from stu4;

3. 全量接增量

  • 如果已经有全量的离线 Hoodie 表,需要接上实时写入,并且保证数据不重复,可以开启 index bootstrap 功能。
  • 如果觉得流程冗长,可以在写入全量数据的时候资源调大直接走流模式写,全量走完接新数据再将资源调小(或者开启限流功能)。

3.1. WITH 参数

名称

Required

默认值

说明

index.bootstrap.enabled

true

false

开启索引加载,会将已存表的最新数据一次性加载到 state 中

index.partition.regex

false

设置正则表达式进行分区筛选,默认为加载全部分区

3.2. 使用流程

  1. CREATE TABLE 创建和 Hoodie 表对应的语句,注意 table type 要正确
  2. 设置 index.bootstrap.enabled = true开启索引加载功能
  3. flink conf 中设置 checkpoint 失败容忍 execution.checkpointing.tolerable-failed-checkpoints = n(取决于checkpoint 调度次数)
  4. 等待第一次 checkpoint 成功,表示索引加载完成
  5. 索引加载完成后可以退出并保存 savepoint (也可以直接用 externalized checkpoint)
  6. 重启任务将 index.bootstrap.enabled 关闭,参数配置到合适的大小,如果RowDataToHoodieFunction 和 BootstrapFunction 并发不同,可以重启避免 shuffle

3.3. 说明

  1. 索引加载是阻塞式,所以在索引加载过程中 checkpoint 无法完成
  2. 索引加载由数据流触发,需要确保每个 partition 都至少有1条数据,即上游 source 有数据进来
  3. 索引加载为并发加载,根据数据量大小加载时间不同,可以在log中搜索 finish loading the index under partition 和 Load records from file 日志来观察索引加载的进度
  4. 第一次checkpoint成功就表示索引已经加载完成,后续从 checkpoint 恢复时无需再次加载索引

注意:在当前的0.12版本,以上划横线的部分已经不再需要了。(0.9 cherry pick 分支之后)


注:**其他Hudi相关文章链接由此进 -> Hudi文章汇总 **



本文转载自: https://blog.csdn.net/yang_shibiao/article/details/128728829
版权归原作者 电光闪烁 所有, 如有侵权,请联系我们删除。

“Hudi(17):Hudi集成Flink之写入方式”的评论:

还没有评论