1. Versions
Component versions: Hudi 0.10.0, Flink 1.13.5, Hive 3.1.0
2. Goal
Integrate Hudi into Hive through Flink CDC. Covered:
Flink CDC overview
Flink CDC 1.2 example
Flink CDC 2.0 example
3. JARs required by Flink
Required package: flink-connector-mysql-cdc-2.0.2.jar
-rw-r--r-- 1 root root   7802399 Feb 16 00:36 doris-flink-1.0-SNAPSHOT.jar
-rw-r--r-- 1 root root    249571 Feb 16 00:36 flink-connector-jdbc_2.12-1.13.5.jar
-rw-r--r-- 1 root root    359138 Feb 16 00:36 flink-connector-kafka_2.12-1.13.5.jar
-rw-r--r-- 1 root root  30087268 Feb 17 22:12 flink-connector-mysql-cdc-2.0.2.jar
-rw-r--r-- 1 root root     92315 Feb 16 00:36 flink-csv-1.13.5.jar
-rw-r--r-- 1 root root 106535830 Feb 16 00:36 flink-dist_2.12-1.13.5.jar
-rw-r--r-- 1 root root    148127 Feb 16 00:36 flink-json-1.13.5.jar
-rw-r--r-- 1 root root  43317025 Feb 16 00:36 flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
-rw-r--r-- 1 root root   7709740 Feb 16 00:36 flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 root root   3674116 Feb 16 00:36 flink-sql-connector-kafka_2.12-1.13.5.jar
-rw-r--r-- 1 root root  35051557 Feb 16 00:35 flink-table_2.12-1.13.5.jar
-rw-r--r-- 1 root root  38613344 Feb 16 00:36 flink-table-blink_2.12-1.13.5.jar
-rw-r--r-- 1 root root  62447468 Feb 16 00:36 hudi-flink-bundle_2.12-0.10.0.jar
-rw-r--r-- 1 root root  17276348 Feb 16 00:36 hudi-hadoop-mr-bundle-0.10.0.jar
-rw-r--r-- 1 root root    207909 Feb 16 00:36 log4j-1.2-api-2.16.0.jar
-rw-r--r-- 1 root root    301892 Feb 16 00:36 log4j-api-2.16.0.jar
-rw-r--r-- 1 root root   1789565 Feb 16 00:36 log4j-core-2.16.0.jar
-rw-r--r-- 1 root root     24258 Feb 16 00:36 log4j-slf4j-impl-2.16.0.jar
-rw-r--r-- 1 root root    724213 Feb 16 00:36 mysql-connector-java-5.1.9.jar
[root@node01 lib]# pwd
/opt/module/flink/flink-1.13.5/lib
[root@node01 lib]#
4. Functional scenario
5. Steps
1. Create the MySQL database table and enable the binlog.
2. Create the Flink CDC table in Flink SQL.
3. Create a view.
4. Create the sink table, backed by a Hudi table and automatically synced to a Hive table.
5. Query the view and insert the results into the sink table -- runs continuously as a Flink job.
5.1 Enable the MySQL binlog
server-id=162
log-bin=mysql-bin
#sync-binlog=1
# Databases excluded from replication
binlog-ignore-db=information_schema
binlog-ignore-db=performance_schema
binlog-ignore-db=sys
binlog-ignore-db=mysql
binlog_format=ROW
expire_logs_days=30
binlog_row_image=full
# Databases included in replication
#binlog-do-db=test
Restart MySQL: service mysqld restart
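Before restarting, it can help to sanity-check the my.cnf fragment for the settings CDC capture depends on (binlog enabled, ROW format, full row image). A minimal sketch, not a full my.cnf parser:

```python
# Simplified check of a my.cnf fragment for the binlog settings CDC needs.
# Illustration only -- a real my.cnf has sections and more option forms.
def parse_mycnf(text):
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        # MySQL treats '-' and '_' in option names interchangeably.
        settings[key.strip().replace("-", "_")] = value.strip()
    return settings

def check_binlog(settings):
    problems = []
    if "log_bin" not in settings:
        problems.append("log-bin is not set")
    if settings.get("binlog_format", "").upper() != "ROW":
        problems.append("binlog_format must be ROW")
    if settings.get("binlog_row_image", "full").lower() != "full":
        problems.append("binlog_row_image should be full")
    return problems

conf = """
server-id=162
log-bin=mysql-bin
binlog_format=ROW
expire_logs_days=30
binlog_row_image=full
"""
print(check_binlog(parse_mycnf(conf)))  # prints []
```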
5.2 Create the MySQL table
CREATE TABLE `Flink_cdc`(`id` BIGINT(64) AUTO_INCREMENT PRIMARY KEY,
`name` VARCHAR(64) NULL,
`age` INT(20) NULL,
birthday TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
);
INSERT INTO `wudldb`.`Flink_cdc`(NAME,age) VALUES("flink",18);
5.3 Create the Flink CDC table in Flink SQL
Flink SQL> CREATE TABLE source_mysql (id BIGINT PRIMARY KEY NOT ENFORCED,
name STRING,
age INT,
birthday TIMESTAMP(3),
ts TIMESTAMP(3)) WITH ('connector'='mysql-cdc',
'hostname'='192.168.1.162',
'port'='3306',
'username'='root',
'password'='123456',
'server-time-zone'='Asia/Shanghai',
'debezium.snapshot.mode'='initial',
'database-name'='wudldb',
'table-name'='Flink_cdc');
[INFO] Execute statement succeed.
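With 'debezium.snapshot.mode' = 'initial', the connector first snapshots the existing rows of the table and then tails the binlog for subsequent changes. A simplified model of that behavior (not the real Debezium implementation):

```python
# Model of 'initial' snapshot mode: seed state from a full snapshot,
# then apply binlog change events keyed by primary key. Illustration only.
def replay(snapshot_rows, binlog_events):
    # State keyed by primary key, seeded from the initial snapshot.
    state = {row["id"]: row for row in snapshot_rows}
    for op, row in binlog_events:
        if op in ("insert", "update"):
            state[row["id"]] = row          # upsert by primary key
        elif op == "delete":
            state.pop(row["id"], None)      # remove the row if present
    return state

snapshot = [{"id": 1, "name": "flink", "age": 18}]
events = [
    ("insert", {"id": 2, "name": "hudi", "age": 3}),
    ("update", {"id": 1, "name": "flink", "age": 19}),
]
final = replay(snapshot, events)
```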
5.4 Create the Flink CDC view in Flink SQL
Flink SQL> create view view_source_flinkcdc_mysql
> AS
> SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') as part FROM source_mysql;
[INFO] Execute statement succeed.
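The view derives the partition column with DATE_FORMAT(birthday, 'yyyyMMdd'), a Java SimpleDateFormat-style pattern. A quick sketch of the equivalent value in Python (strftime '%Y%m%d'):

```python
from datetime import datetime

# 'yyyyMMdd' in Flink's DATE_FORMAT corresponds to strftime '%Y%m%d':
# a compact day-level partition value such as '20220218'.
def part_value(ts: datetime) -> str:
    return ts.strftime("%Y%m%d")

print(part_value(datetime(2022, 2, 18, 0, 39, 57)))  # prints 20220218
```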
5.5 Create the sink table, backed by a Hudi table and automatically synced to Hive
Flink SQL> CREATE TABLE flink_cdc_sink_hudi_hive(
> id bigint,
> name string,
> age int,
> birthday TIMESTAMP(3),
> ts TIMESTAMP(3),
> part VARCHAR(20),
> primary key(id) not enforced
> )
> PARTITIONED BY (part)
> with(
>'connector'='hudi',
>'path'='hdfs://192.168.1.161:8020/flink_cdc_sink_hudi_hive',
>'table.type'='MERGE_ON_READ',
>'hoodie.datasource.write.recordkey.field'='id',
>'write.precombine.field'='ts',
>'write.tasks'='1',
>'write.rate.limit'='2000',
>'compaction.tasks'='1',
>'compaction.async.enabled'='true',
>'compaction.trigger.strategy'='num_commits',
>'compaction.delta_commits'='1',
>'changelog.enabled'='true',
>'read.streaming.enabled'='true',
>'read.streaming.check-interval'='3',
>'hive_sync.enable'='true',
>'hive_sync.mode'='hms',
>'hive_sync.metastore.uris'='thrift://node02.com:9083',
>'hive_sync.jdbc_url'='jdbc:hive2://node02.com:10000',
>'hive_sync.table'='flink_cdc_sink_hudi_hive',
>'hive_sync.db'='db_hive',
>'hive_sync.username'='root',
>'hive_sync.password'='123456',
>'hive_sync.support_timestamp'='true'
> );
[INFO] Execute statement succeed.
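Among these options, 'write.precombine.field' = 'ts' controls deduplication: when several incoming records share the same record key ('id'), Hudi keeps the one with the largest precombine value. A minimal sketch of that semantics (not Hudi's actual code):

```python
# Sketch of precombine semantics: within a batch, deduplicate records by
# record key, keeping the record with the largest precombine field value.
def precombine(records, key="id", precombine_field="ts"):
    best = {}
    for rec in records:
        k = rec[key]
        if k not in best or rec[precombine_field] > best[k][precombine_field]:
            best[k] = rec
    return list(best.values())

batch = [
    {"id": 1, "name": "flink", "ts": 100},
    {"id": 1, "name": "flink-updated", "ts": 200},  # newer version of id=1
    {"id": 2, "name": "hudi", "ts": 150},
]
deduped = precombine(batch)
```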
5.6 Query the view and insert the results into the sink table
Flink SQL> INSERT INTO flink_cdc_sink_hudi_hive SELECT id, name, age, birthday, ts, part FROM view_source_flinkcdc_mysql;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c618c9f528b9793adf4418640bb2a0fc
5.7 Check the running Flink job
6. Integrating Hudi with Hive
Copy hudi-hadoop-mr-bundle-0.10.0.jar into Hive's lib directory, then restart the Hive services.
6.1 Connect to Hive and inspect the tables synced from Hudi
0: jdbc:hive2://node01.com:2181,node02.com:21> show tables;
INFO : Compiling command(queryId=hive_20220218000941_016798b7-3ecd-4c41-ae54-65e6a034968f): show tables
INFO : Semantic Analysis Completed (retrial =false)
INFO : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:tab_name, type:string, comment:from deserializer)], properties:null)
INFO : Completed compiling command(queryId=hive_20220218000941_016798b7-3ecd-4c41-ae54-65e6a034968f); Time taken: 0.016 seconds
INFO : Executing command(queryId=hive_20220218000941_016798b7-3ecd-4c41-ae54-65e6a034968f): show tables
INFO : Starting task [Stage-0:DDL] in serial mode
INFO : Completed executing command(queryId=hive_20220218000941_016798b7-3ecd-4c41-ae54-65e6a034968f); Time taken: 0.012 seconds
INFO : OK
+------------------------------+
| tab_name |
+------------------------------+
| flink_cdc_sink_hudi_hive_ro |
| flink_cdc_sink_hudi_hive_rt |
+------------------------------+
Hive now has two tables:
the _ro table serves read-optimized queries, while the _rt table serves snapshot queries.
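The split follows from the MERGE_ON_READ layout: updates land in log files and are only folded into base files at compaction time. A simplified model of the two query types (illustration only, not Hudi's implementation):

```python
# Toy model of a MERGE_ON_READ table: compacted base files plus
# not-yet-compacted log records, both keyed by record key.
base_files = {1: {"id": 1, "name": "flink", "age": 18}}
log_records = {1: {"id": 1, "name": "flink", "age": 19}}  # pending update

def query_ro(base):
    # Read-optimized (_ro): base files only, so results may lag
    # behind the latest writes until compaction runs.
    return dict(base)

def query_rt(base, logs):
    # Snapshot (_rt): merge log records over base files at read time,
    # trading read cost for freshness.
    merged = dict(base)
    merged.update(logs)
    return merged

ro_result = query_ro(base_files)
rt_result = query_rt(base_files, log_records)
```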
6.2 Query
0: jdbc:hive2://node01.com:2181,node02.com:21> select id, name, age, birthday from flink_cdc_sink_hudi_hive_ro;
INFO : Compiling command(queryId=hive_20220218003353_57a46dca-3cd2-4da1-b455-bbd63da16413): select id, name, age, birthday from flink_cdc_sink_hudi_hive_ro
INFO : Semantic Analysis Completed (retrial =false)
INFO : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:id, type:bigint, comment:null), FieldSchema(name:name, type:string, comment:null), FieldSchema(name:age, type:int, comment:null), FieldSchema(name:birthday, type:bigint, comment:null)], properties:null)
INFO : Completed compiling command(queryId=hive_20220218003353_57a46dca-3cd2-4da1-b455-bbd63da16413); Time taken: 0.124 seconds
INFO : Executing command(queryId=hive_20220218003353_57a46dca-3cd2-4da1-b455-bbd63da16413): select id, name, age, birthday from flink_cdc_sink_hudi_hive_ro
INFO : Completed executing command(queryId=hive_20220218003353_57a46dca-3cd2-4da1-b455-bbd63da16413); Time taken: 0.029 seconds
INFO : OK
+-----+--------+------+----------------+
| id  | name   | age  | birthday       |
+-----+--------+------+----------------+
| 1   | flink  | 18   | 1645142397000  |
+-----+--------+------+----------------+
1 row selected (0.278 seconds)
0: jdbc:hive2://node01.com:2181,node02.com:21>
Overall result
Error: one error was encountered along the way
Flink CDC needs the flink-connector-mysql-cdc-2.0.2.jar package, not flink-sql-connector-mysql-cdc-2.0.2.jar.
Otherwise you will hit the following error:
Flink SQL> select * from users_source_mysql;
Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/connect/data/Schema
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.getDeclaredMethod(Class.java:2128)
at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1629)
at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
at java.security.AccessController.doPrivileged(Native Method)
at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobVertex(StreamingJobGraphGenerator.java:597)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:457)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:378)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:179)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:117)
at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:934)
at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:50)
at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:39)
at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:56)
at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:67)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1957)
at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:55)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:795)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1225)
at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeOperation$3(LocalExecutor.java:213)
at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:90)
at org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:213)
at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:235)
at org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:479)
at org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:412)
at org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$0(CliClient.java:327)
at java.util.Optional.ifPresent(Optional.java:159)
at org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:327)
at org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95)
at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
... 1 more
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.connect.data.Schema
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 69 more
Shutting down the session...
done.
[root@node01 bin]#
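The root cause shown by the trace is that no jar on Flink's classpath bundles org.apache.kafka.connect.data.Schema. Since jars are zip files, a quick way to check whether a given jar contains a class is to look for its .class entry. A small sketch (the synthetic in-memory jar below is for demonstration; in practice open the real jar file):

```python
import io
import zipfile

# A class com.example.Foo lives in a jar as the entry com/example/Foo.class;
# checking the zip's name list tells us whether the jar bundles it.
def jar_contains_class(jar_bytes, class_name):
    entry = class_name.replace(".", "/") + ".class"
    with zipfile.ZipFile(io.BytesIO(jar_bytes)) as jar:
        return entry in jar.namelist()

# Build a tiny fake jar in memory just for the demo.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as jar:
    jar.writestr("org/apache/kafka/connect/data/Schema.class", b"\xca\xfe\xba\xbe")

print(jar_contains_class(buf.getvalue(), "org.apache.kafka.connect.data.Schema"))  # prints True
```

For a real check, read the candidate jar's bytes from Flink's lib directory and pass them to the same function.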
Copyright belongs to the original author, wudl5566. If there is any infringement, please contact us for removal.