0


【数据湖Hudi-10-Hudi集成Flink-读取方式&限流&写入方式&写入模式&Bucket索引】

数据湖Hudi-10-Hudi集成Flink-读取方式&限流&写入方式

一、读取方式

1 流读(Streaming Query)

当前表默认是快照读取,即读取最新的全量快照数据并一次性返回。通过参数 read.streaming.enabled 参数开启流读模式,通过 read.start-commit 参数指定起始消费位置,支持指定 earliest 从最早消费。

  • 1.with参数
    名称Required默认值说明read.streaming.enabledfalsefalse设置 true 开启流读模式read.start-commitfalse最新 commit指定 ‘yyyyMMddHHmmss’ 格式的起始 commit(闭区间)read.streaming.skip_compactionfalsefalse流读时是否跳过 compaction 的 commits,跳过 compaction 有两个用途:1)避免 upsert 语义下重复消费 (compaction 的 instant 为重复数据,如果不跳过,有小概率会重复消费)2) changelog 模式下保证语义正确性 0.11 开始,以上两个问题已经通过保留 compaction 的 instant time 修复clean.retain_commitsfalse10cleaner 最多保留的历史 commits 数,大于此数量的历史 commits 会被清理掉,changelog 模式下,这个参数可以控制 changelog 的保留时间,例如 checkpoint 周期为 5 分钟一次,默认最少保留 50 分钟的时间。
    注意:当参数 read.streaming.skip_compaction 打开并且 streaming reader 消费落后于clean.retain_commits 数时,流读可能会丢失数据。从 0.11 开始,compaction 不会再变更 record 的 instant time,因此理论上数据不会再重复消费,但是还是会重复读取并丢弃,因此额外的开销还是无法避免,对性能有要求的话还是可以开启此参数。
案例展示:
CREATETABLEt5(
  uuid VARCHAR(20)PRIMARYKEYNOTENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20))WITH(
  'connector' = 'hudi','path'= 'hdfs://hadoop102:8020/tmp/hudi_flink/t5',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '4'   -- 默认60s
);

insert into t5 select * from sourceT;

select * from t5;

二、限流

限流的逻辑是,源头数据量级很大,百亿级别。
下面是数据流向图:
全量&增量数据 --> kafka --> flink --> hudi

  • 限流,是限制的flink写出到hudi的速度。这样就减少了flink的背压,消费按照给定速率消费。 这样就可以提高作业的稳定性。

如果将全量数据(百亿数量级) 和增量先同步到 kafka,再通过 flink 流式消费的方式将库表数据直接导成 hoodie 表,因为直接消费全量部分数据:量大(吞吐高)、乱序严重(写入的 partition 随机),会导致写入性能退化,出现吞吐毛刺,这时候可以开启限速参数,保证流量平稳写入。
WITH 参数
名称Required默认值说明write.rate.limitfalse0默认关闭限速

三、写入方式

1.CDC 数据同步

CDC 数据保存了完整的数据库变更,当前可通过两种途径将数据导入 hudi:
在这里插入图片描述
第一种:通过 cdc-connector 直接对接 DB 的 binlog 将数据导入 hudi,优点是不依赖消息队列,缺点是对 db server 造成压力。
第二种:对接 cdc format 消费 kafka 数据导入 hudi,优点是可扩展性强,缺点是依赖 kafka。

使用mysql进行案例分析:

1.使用第二种方式 cdc+kafka进行mysql数据同步到hudi

  • 1)准备MySQL表 (1)MySQL开启binlog (2)建表
create database test;
use test;
create table stu3 (id int unsigned auto_increment primary key COMMENT '自增id',
  name varchar(20) not null comment '学生名字',
  school varchar(20) not null comment '学校名字',
  nickname varchar(20) not null comment '学生小名',
  age int not null comment '学生年龄',
  class_num int not null comment '班级人数',
  phone bigint not null comment '电话号码',
  email varchar(64) comment '家庭网络邮箱',
  ip varchar(32) comment 'IP地址')engine=InnoDB default charset=utf8;
  • 2)flink读取mysql binlog并写入kafka (1)创建MySQL表
create table stu3_binlog(id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
) with ('connector'='mysql-cdc',
  'hostname'='hadoop1',
  'port'='3306',
  'username'='root',
  'password'='aaaaaa',
  'database-name'='test',
  'table-name'='stu3');

(2)创建Kafka表

create table stu3_binlog_sink_kafka(id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
) with ('connector'='upsert-kafka'
  ,'topic'='cdc_mysql_stu3_sink'
  ,'properties.zookeeper.connect'='hadoop1:2181'
  ,'properties.bootstrap.servers'='hadoop1:9092'
  ,'key.format'='json'
  ,'value.format'='json');

(3)将mysql binlog日志写入kafka

insert into stu3_binlog_sink_kafka
select * from stu3_binlog;
  • 3)flink读取kafka数据并写入hudi数据湖 (1)创建kafka源表
create table stu3_binlog_source_kafka(id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string
 ) with ('connector'='kafka',
  'topic'='cdc_mysql_stu3_sink',
  'properties.bootstrap.servers'='hadoop1:9092',
  'format'='json',
  'scan.startup.mode'='earliest-offset',
  'properties.group.id'='testGroup');

(2)创建hudi目标表

create table stu3_binlog_sink_hudi(id bigint not null,
  name string,
  `school` string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with ('connector'='hudi',
  'path'='hdfs://hadoop1:8020/tmp/hudi_flink/stu3_binlog_sink_hudi',
  'table.type'='MERGE_ON_READ',
  'write.option'='insert',
  'write.precombine.field'='school');

(3)将kafka数据写入到hudi中

insert into stu3_binlog_sink_hudi
select * from  stu3_binlog_source_kafka;
  • 5)统计数据入Hudi情况
create table stu3_binlog_hudi_view(id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with ('connector'='hudi',
  'path'='hdfs://hadoop1:8020/tmp/stu3_binlog_sink_hudi',
  'table.type'='MERGE_ON_READ',
  'write.precombine.field'='school');select count(*) from stu3_binlog_hudi_view;
  • 6)实时查看数据入湖情况
create table stu3_binlog_hudi_streaming_view(id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with ('connector'='hudi',
  'path'='hdfs://hadoop1:8020/tmp/stu3_binlog_sink_hudi',
  'table.type'='MERGE_ON_READ',
  'write.precombine.field'='school',
  'read.streaming.enabled'='true');select * from  stu3_binlog_hudi_streaming_view;

2.离线批量导入

如果存量数据来源于其他数据源,可以使用批量导入功能,快速将存量数据导成 Hoodie 表格式。

  • 1)原理 (1)批量导入省去了 avro 的序列化以及数据的 merge 过程,后续不会再有去重操作,数据的唯一性需要自己来保证。 (2)bulk_insert 需要在 Batch Execuiton Mode 下执行更高效,Batch 模式默认会按照 partition path 排序输入消息再写入 Hoodie,避免 file handle 频繁切换导致性能下降。
SET execution.runtime-mode = batch; 
SET execution.checkpointing.interval =0;

(3)bulk_insert write task 的并发通过参数 write.tasks 指定,并发的数量会影响到小文件的数量,理论上,bulk_insert write task 的并发数就是划分的 bucket 数,当然每个 bucket 在写到文件大小上限(parquet 120 MB)的时候会 roll over 到新的文件句柄,所以最后:写文件数量 >= bulk_insert write task 数。

  • 2)WITH 参数
    名称Required默认值说明write.operationtrueupsert配置 bulk_insert 开启该功能write.tasksfalse4bulk_insert 写 task 的并发,最后的文件数 >=write.taskswrite.bulk_insert.shuffle_by_partition write.bulk_insert.shuffle_input(从 0.11 开始)falsetrue是否将数据按照 partition 字段 shuffle 再通过 write task 写入,开启该参数将减少小文件的数量 但是可能有数据倾斜风险write.bulk_insert.sort_by_partition write.bulk_insert.sort_input(从 0.11 开始)falsetrue是否将数据线按照 partition 字段排序再写入,当一个 write task 写多个 partition,开启可以减少小文件数量write.sort.memory128sort 算子的可用 managed memory(单位 MB)

  • 3)案例 (1)MySQL建表

create database test;
use test;
create table stu4 (id int unsigned auto_increment primary key COMMENT '自增id',
  name varchar(20) not null comment '学生名字',
  school varchar(20) not null comment '学校名字',
  nickname varchar(20) not null comment '学生小名',
  age int not null comment '学生年龄',
  score decimal(4,2) not null comment '成绩',
  class_num int not null comment '班级人数',
  phone bigint not null comment '电话号码',
  email varchar(64) comment '家庭网络邮箱',
  ip varchar(32) comment 'IP地址')engine=InnoDB default charset=utf8;

(4)Flink SQL client 创建myql数据源

create table stu4(id bigint not null,
  name string,
  school string,
  nickname string,
  age int not null,
  score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  PRIMARY KEY (id) NOT ENFORCED
) with ('connector'='jdbc',
  'url'='jdbc:mysql://hadoop1:3306/test?serverTimezone=GMT%2B8',
  'username'='root',
  'password'='aaaaaa',
  'table-name'='stu4');

(5)Flink SQL client创建hudi表

create table stu4_sink_hudi(id bigint not null,
  name string,
  `school` string,
  nickname string,
  age int not null,
 score decimal(4,2) not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with ('connector'='hudi',
  'path'='hdfs://hadoop1:8020/tmp/hudi_flink/stu4_sink_hudi',
  'table.type'='MERGE_ON_READ',
  'write.option'='bulk_insert',
  'write.precombine.field'='school');

(3)Flink SQL client执行mysql数据插入到hudi中

insert into stu4_sink_hudi select * from stu4;

3.全量接增量

如果已经有全量的离线 Hoodie 表,需要接上实时写入,并且保证数据不重复,可以开启 index bootstrap 功能。
如果觉得流程冗长,可以在写入全量数据的时候资源调大直接走流模式写,全量走完接新数据再将资源调小(或者开启限流功能)。
名称Required默认值说明index.bootstrap.enabledtruefalse开启索引加载,会将已存表的最新数据一次性加载到 state 中index.partition.regexfalse*设置正则表达式进行分区筛选,默认为加载全部分区
使用流程
(1) CREATE TABLE 创建和 Hoodie 表对应的语句,注意 table type 要正确
(2)设置 index.bootstrap.enabled = true开启索引加载功能
(3)重启任务将 index.bootstrap.enabled 关闭,参数配置到合适的大小,如果RowDataToHoodieFunction 和 BootstrapFunction 并发不同,可以重启避免 shuffle

说明:
(1)索引加载为并发加载,根据数据量大小加载时间不同,可以在log中搜索
finish loading the index under partition 和 Load records from file 日志来观察索引加载的进度

四、写入模式

1、Changelog模式

如果希望 Hoodie 保留消息的所有变更(I/-U/U/D),之后接上 Flink 引擎的有状态计算实现全链路近实时数仓生产(增量计算),Hoodie 的 MOR 表通过行存原生支持保留消息的所有变更(format 层面的集成),通过流读 MOR 表可以消费到所有的变更记录。

  • 1)WITH 参数
    名称Required默认值说明changelog.enabledfalsefalse默认是关闭状态,即 UPSERT 语义,所有的消息仅保证最后一条合并消息,中间的变更可能会被 merge 掉;改成 true 支持消费所有变更。
    批(快照)读仍然会合并所有的中间结果,不管 format 是否已存储中间状态。
    开启 changelog.enabled 参数后,中间的变更也只是 Best Effort: 异步的压缩任务会将中间变更合并成 1 条,所以如果流读消费不够及时,被压缩后只能读到最后一条记录。当然,通过调整压缩的 buffer 时间可以预留一定的时间 buffer 给 reader,比如调整压缩的两个参数:
compaction.delta_commits:5 
compaction.delta_seconds: 3600。

说明:
Changelog 模式开启流读的话,要在 sql-client 里面设置参数:

set sql-client.execution.result-mode=tableau; 
或者
set sql-client.execution.result-mode=changelog;

2)流读 changelog
仅在 0.10.0 支持,本 feature 为实验性。
开启 changelog 模式后,hudi 会保留一段时间的 changelog 供下游 consumer 消费,我们可以通过流读 ODS 层 changelog 接上 ETL 逻辑写入到 DWD 层,如下图的 pipeline:

流读的时候我们要注意 changelog 有可能会被 compaction 合并掉,中间记录会消除,可能会影响计算结果,需要关注sql-client的属性(result-mode)同上。
3)案例演示
(1)使用changelog

set sql-client.execution.result-mode=tableau;
CREATE TABLE t6(id int,
  ts int,
  primary key (id) not enforced
) WITH ('connector'='hudi',
  'path'='hdfs://hadoop1:8020/tmp/hudi_flink/t6',
  'table.type'='MERGE_ON_READ',
  'read.streaming.enabled'='true',
  'read.streaming.check-interval'='4',
  'changelog.enabled'='true');

insert into t6 values (1,1);
insert into t6 values (1,2);set table.dynamic-table-options.enabled=true;select * from t6/*+ OPTIONS('read.start-commit'='earliest')*/;select count(*) from t6/*+ OPTIONS('read.start-commit'='earliest')*/;

(2)不使用changelog

CREATE TABLE t6_v(id int,
  ts int,
  primary key (id) not enforced
) WITH ('connector'='hudi',
  'path'='hdfs://hadoop1:8020/tmp/hudi_flink/t6',
  'table.type'='MERGE_ON_READ',
  'read.streaming.enabled'='true',
  'read.streaming.check-interval'='4');select * from t6_v/*+ OPTIONS('read.start-commit'='earliest')*/;select count(*) from t6_v/*+ OPTIONS('read.start-commit'='earliest')*/;

2 Append 模式

从 0.10 开始支持
对于 INSERT 模式:
MOR 默认会 apply 小文件策略: 会追加写 avro log 文件
COW 每次直接写新的 parquet 文件,没有小文件策略
Hudi 支持丰富的 Clustering 策略,优化 INSERT 模式下的小文件问题:

  • 1)Inline Clustering 只有 Copy On Write 表支持该模式在这里插入图片描述
  • 2) Async Clustering 从 0.12 开始支持

六、Bucket索引

从 0.11 开始支持
默认的 flink 流式写入使用 state 存储索引信息:primary key 到 fileId 的映射关系。当数据量比较大的时候,state的存储开销可能成为瓶颈,bucket 索引通过固定的 hash 策略,将相同 key 的数据分配到同一个 fileGroup 中,避免了索引的存储和查询开销。

1)WITH参数
名称Required默认值说明index.typefalseFLINK_STATE设置 BUCKET 开启 Bucket 索引功能hoodie.bucket.index.hash.fieldfalse主键可以设置成主键的子集hoodie.bucket.index.num.bucketsfalse4默认每个 partition 的 bucket 数,当前设置后则不可再变更。2)和 state 索引的对比:(1)bucket index 没有 state 的存储计算开销,性能较好(2)bucket index 无法扩 buckets,state index 则可以依据文件的大小动态扩容(3)bucket index 不支持跨 partition 的变更(如果输入是 cdc 流则没有这个限制),state index 没有限制

七、Hudi CataLog

从 0.12.0 开始支持,通过 catalog 可以管理 flink 创建的表,避免重复建表操作,另外 hms 模式的 catalog 支持自动补全 hive 同步参数。
DFS 模式 Catalog SQL样例:

CREATE CATALOG hoodie_catalog
  WITH ('type'='hudi',
    'catalog.path'='${catalog 的默认路径}',
    'mode'='dfs');
Hms 模式 Catalog SQL 样例:
CREATE CATALOG hoodie_catalog
  WITH ('type'='hudi',
    'catalog.path'='${catalog 的默认路径}',
    'hive.conf.dir'='${hive-site.xml 所在的目录}',
    'mode'='hms' -- 支持 'dfs' 模式通过文件系统管理表属性
  );
  • 1)WITH 参数
    名称Required默认值说明catalog.pathtrue–默认的 catalog 根路径,用作表路径的自动推导,默认的表路径:

           c 
          
         
           a 
          
         
           t 
          
         
           a 
          
         
           l 
          
         
           o 
          
         
           g 
          
         
           . 
          
         
           p 
          
         
           a 
          
         
           t 
          
         
           h 
          
         
        
          / 
         
        
       
         {catalog.path}/ 
        
       
     catalog.path/{db_name}/${table_name}default-databasefalsedefault默认的 database 名hive.conf.dirfalse–hive-site.xml 所在的目录,只在 hms 模式下生效modefalsedfs支持 hms模式通过 hive 管理元数据table.externalfalsefalse是否创建外部表,只在 hms 模式下生效
    
  • 2)使用dfs方式 (1)创建sql-client初始化sql文件

vim /opt/module/flink-1.13.6/conf/sql-client-init.sql

CREATE CATALOG hoodie_catalog
  WITH ('type'='hudi',
    'catalog.path'='/tmp/hudi_catalog',
    'mode'='dfs');

USE CATALOG hoodie_catalog;

(2)指定sql-client启动时加载sql文件

hadoop fs -mkdir /tmp/hudi_catalog

bin/sql-client.sh embedded -i conf/sql-client-init.sql -s yarn-session

(3)建库建表插入

create database test;
use test;

create table t2(
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  `partition` varchar(20),
primary key (uuid) not enforced
)
with ('connector'='hudi',
  'path'='/tmp/hudi_catalog/default/t2',
  'table.type'='MERGE_ON_READ');

insert into t2 values('1','zs',18,TIMESTAMP '1970-01-01 00:00:01','a');

(4)退出sql-client,重新进入,表信息还在

use test;
show tables;select * from t2;

七、离线 Compaction

MOR 表的 compaction 默认是自动打开的,策略是 5 个 commits 执行一次压缩。 因为压缩操作比较耗费内存,和写流程放在同一个 pipeline,在数据量比较大的时候(10w+/s qps),容易干扰写流程,此时采用离线定时任务的方式执行 compaction 任务更稳定。

  • 1 设置参数
compaction.async.enabled 为 false,关闭在线 compaction。
compaction.schedule.enabled 仍然保持开启,由写任务阶段性触发压缩 plan。
  • 2 原理 一个 compaction 的任务的执行包括两部分: schedule 压缩 plan 该过程推荐由写任务定时触发,写参数 compaction.schedule.enabled 默认开启 执行对应的压缩 plan
  • 3 使用方式 1)执行命令 离线 compaction 需要手动执行 Java 程序,程序入口:
// 命令行的方式
./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://xxx:8020/table

2)案例演示
(1)创建表,关闭在线压缩

create table t7(id int,
  ts int,
  primary key (id) not enforced
)
with ('connector'='hudi',
  'path'='/tmp/hudi_catalog/default/t7',
  'compaction.async.enabled'='false',
  'compaction.schedule.enabled'='true',
  'table.type'='MERGE_ON_READ');

insert into t7 values(1,1);
insert into t7 values(2,2);
insert into t7 values(3,3);
insert into t7 values(4,4);
insert into t7 values(5,5);

// 命令行的方式

./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://hadoop1:8020/tmp/hudi_catalog/default/t7

八、离线 Clustering

异步的 clustering 相对于 online 的 async clustering 资源隔离,从而更加稳定。

  • 1 设置参数
clustering.async.enabled 为 false,关闭在线 clustering。
clustering.schedule.enabled 仍然保持开启,由写任务阶段性触发 clustering plan。
  • 2 原理 一个 clustering 的任务的执行包括两部分:- schedule plan 推荐由写任务定时触发,写参数 clustering.schedule.enabled 默认开启。- 执行对应的 plan
  • 3 使用方式 1)执行命令 离线 clustering 需要手动执行 Java 程序,程序入口:
// 命令行的方式
./bin/flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://xxx:8020/table

注意:必须是分区表,否则报错空指针异常。

2)案例演示
(1)创建表,关闭在线压缩
create table t8(id int,
  age int,
  ts int,
  primary key (id) not enforced
) partitioned by (age)
with ('connector'='hudi',
  'path'='/tmp/hudi_catalog/default/t8',
  'clustering.async.enabled'='false',
  'clustering.schedule.enabled'='true',
  'table.type'='COPY_ON_WRITE');

insert into t8 values(1,18,1);
insert into t8 values(2,18,2);
insert into t8 values(3,18,3);
insert into t8 values(4,18,4);
insert into t8 values(5,18,5);
// 命令行的方式
./bin/flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://hadoop1:8020/tmp/hudi_catalog/default/t8
标签: 大数据 hadoop

本文转载自: https://blog.csdn.net/weixin_38136584/article/details/128662074
版权归原作者 Apache Minor Trend 所有, 如有侵权,请联系我们删除。

“【数据湖Hudi-10-Hudi集成Flink-读取方式&限流&写入方式&写入模式&Bucket索引】”的评论:

还没有评论