

Flink CDC integrated with the Hudi data lake and synced to Hive

1. Versions

Component   Version
hudi        0.10.0
flink       1.13.5
hive        3.1.0

2. Goal: use Flink CDC to ingest into Hudi and sync to Hive

Flink CDC overview

Flink CDC 1.2 example
Flink CDC 2.0 example

3. Required Flink jars

Key package: flink-connector-mysql-cdc-2.0.2.jar
-rw-r--r-- 1 root root   7802399 Feb 16 00:36 doris-flink-1.0-SNAPSHOT.jar
-rw-r--r-- 1 root root    249571 Feb 16 00:36 flink-connector-jdbc_2.12-1.13.5.jar
-rw-r--r-- 1 root root    359138 Feb 16 00:36 flink-connector-kafka_2.12-1.13.5.jar
-rw-r--r-- 1 root root  30087268 Feb 17 22:12 flink-connector-mysql-cdc-2.0.2.jar
-rw-r--r-- 1 root root     92315 Feb 16 00:36 flink-csv-1.13.5.jar
-rw-r--r-- 1 root root 106535830 Feb 16 00:36 flink-dist_2.12-1.13.5.jar
-rw-r--r-- 1 root root    148127 Feb 16 00:36 flink-json-1.13.5.jar
-rw-r--r-- 1 root root  43317025 Feb 16 00:36 flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
-rw-r--r-- 1 root root   7709740 Feb 16 00:36 flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 root root   3674116 Feb 16 00:36 flink-sql-connector-kafka_2.12-1.13.5.jar
-rw-r--r-- 1 root root  35051557 Feb 16 00:35 flink-table_2.12-1.13.5.jar
-rw-r--r-- 1 root root  38613344 Feb 16 00:36 flink-table-blink_2.12-1.13.5.jar
-rw-r--r-- 1 root root  62447468 Feb 16 00:36 hudi-flink-bundle_2.12-0.10.0.jar
-rw-r--r-- 1 root root  17276348 Feb 16 00:36 hudi-hadoop-mr-bundle-0.10.0.jar
-rw-r--r-- 1 root root    207909 Feb 16 00:36 log4j-1.2-api-2.16.0.jar
-rw-r--r-- 1 root root    301892 Feb 16 00:36 log4j-api-2.16.0.jar
-rw-r--r-- 1 root root   1789565 Feb 16 00:36 log4j-core-2.16.0.jar
-rw-r--r-- 1 root root     24258 Feb 16 00:36 log4j-slf4j-impl-2.16.0.jar
-rw-r--r-- 1 root root    724213 Feb 16 00:36 mysql-connector-java-5.1.9.jar
[root@node01 lib]# pwd
/opt/module/flink/flink-1.13.5/lib
[root@node01 lib]# 

4. Functional scenario

(diagram omitted)

5. Implementation steps

1. Create the source database table and enable the binlog
2. Create the Flink CDC table in Flink SQL
3. Create a view
4. Create the sink table, backed by a Hudi table and automatically synced to a Hive table
5. Query the view and insert into the sink table -- runs continuously as a background Flink job

5.1 Enable the MySQL binlog

server-id=162
log-bin=mysql-bin
#sync-binlog=1
# databases excluded from the binlog
binlog-ignore-db=information_schema
binlog-ignore-db=performance_schema
binlog-ignore-db=sys
binlog-ignore-db=mysql
binlog_format=ROW
expire_logs_days=30
binlog_row_image=full
# databases to capture
#binlog-do-db=test
Restart MySQL: service mysqld restart
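
After the restart, it is worth confirming from a MySQL session that the binlog is on and in ROW format; a minimal check using standard MySQL variables (not part of the original post):

-- Expected: log_bin = ON, binlog_format = ROW, binlog_row_image = FULL
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';
SHOW VARIABLES LIKE 'binlog_row_image';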

5.2 Create the MySQL table

CREATE TABLE `Flink_cdc` (
  `id` BIGINT(64) AUTO_INCREMENT PRIMARY KEY,
  `name` VARCHAR(64) NULL,
  `age` INT(20) NULL,
  `birthday` TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
  `ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
);
INSERT INTO `wudldb`.`Flink_cdc` (NAME, age) VALUES ('flink', 18);
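
To see updates and deletes flow through the pipeline later on, you can run a bit more DML against the source table; the rows below are made-up sample data, not from the original post:

-- Hypothetical extra changes to exercise the CDC stream
INSERT INTO `wudldb`.`Flink_cdc` (NAME, age) VALUES ('hudi', 20);
UPDATE `wudldb`.`Flink_cdc` SET age = 19 WHERE NAME = 'flink';
DELETE FROM `wudldb`.`Flink_cdc` WHERE NAME = 'hudi';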

5.3 Create the Flink CDC table in Flink SQL

Flink SQL> CREATE TABLE source_mysql (
>   id BIGINT PRIMARY KEY NOT ENFORCED,
>   name STRING,
>   age INT,
>   birthday TIMESTAMP(3),
>   ts TIMESTAMP(3)
> ) WITH (
>   'connector'='mysql-cdc',
>   'hostname'='192.168.1.162',
>   'port'='3306',
>   'username'='root',
>   'password'='123456',
>   'server-time-zone'='Asia/Shanghai',
>   'debezium.snapshot.mode'='initial',
>   'database-name'='wudldb',
>   'table-name'='Flink_cdc'
> );
[INFO] Execute statement succeed.
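
Before wiring up the sink, a quick sanity query against the CDC table from the Flink SQL client confirms that the connector can read the snapshot and then tail the binlog (an optional check, not part of the original steps; it submits a streaming job):

Flink SQL> SELECT * FROM source_mysql;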

5.4 Create the Flink CDC view in Flink SQL

Flink SQL> create view view_source_flinkcdc_mysql 
> AS 
> SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') as part FROM source_mysql;
[INFO] Execute statement succeed.
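
The view only adds a derived part column, the birthday formatted as yyyyMMdd, which becomes the Hudi partition value. A quick way to eyeball it (assumed check; for example a birthday on 2022-02-18 would yield part = '20220218'):

Flink SQL> SELECT id, name, part FROM view_source_flinkcdc_mysql;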

5.5 Create the sink table, backed by a Hudi table and automatically synced to Hive

Flink SQL> CREATE TABLE flink_cdc_sink_hudi_hive(
>   id bigint,
>   name string,
>   age int,
>   birthday TIMESTAMP(3),
>   ts TIMESTAMP(3),
>   part VARCHAR(20),
>   primary key(id) not enforced
> )
> PARTITIONED BY (part)
> WITH (
>   'connector'='hudi',
>   'path'='hdfs://192.168.1.161:8020/flink_cdc_sink_hudi_hive',
>   'table.type'='MERGE_ON_READ',
>   'hoodie.datasource.write.recordkey.field'='id',
>   'write.precombine.field'='ts',
>   'write.tasks'='1',
>   'write.rate.limit'='2000',
>   'compaction.tasks'='1',
>   'compaction.async.enabled'='true',
>   'compaction.trigger.strategy'='num_commits',
>   'compaction.delta_commits'='1',
>   'changelog.enabled'='true',
>   'read.streaming.enabled'='true',
>   'read.streaming.check-interval'='3',
>   'hive_sync.enable'='true',
>   'hive_sync.mode'='hms',
>   'hive_sync.metastore.uris'='thrift://node02.com:9083',
>   'hive_sync.jdbc_url'='jdbc:hive2://node02.com:10000',
>   'hive_sync.table'='flink_cdc_sink_hudi_hive',
>   'hive_sync.db'='db_hive',
>   'hive_sync.username'='root',
>   'hive_sync.password'='123456',
>   'hive_sync.support_timestamp'='true'
> );
[INFO] Execute statement succeed.
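
Since 'read.streaming.enabled' is 'true', the same table can also be queried from the Flink SQL client to watch new commits arrive in Hudi (an optional check not shown in the original post, assuming the HDFS path above is reachable from the client):

Flink SQL> SELECT * FROM flink_cdc_sink_hudi_hive;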

5.6 Query the view and insert into the sink table

Flink SQL> INSERT INTO flink_cdc_sink_hudi_hive SELECT id, name, age, birthday, ts, part FROM view_source_flinkcdc_mysql;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c618c9f528b9793adf4418640bb2a0fc

5.7 Check the running Flink job

(screenshot of the running job omitted)

6. Integrating Hudi with Hive

Copy hudi-hadoop-mr-bundle-0.10.0.jar into Hive's lib directory, then restart the Hive services.

6.1 Connect to Hive and check the tables synced from Hudi

0: jdbc:hive2://node01.com:2181,node02.com:21> show tables;
INFO  : Compiling command(queryId=hive_20220218000941_016798b7-3ecd-4c41-ae54-65e6a034968f): show tables
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:tab_name, type:string, comment:from deserializer)], properties:null)
INFO  : Completed compiling command(queryId=hive_20220218000941_016798b7-3ecd-4c41-ae54-65e6a034968f); Time taken: 0.016 seconds
INFO  : Executing command(queryId=hive_20220218000941_016798b7-3ecd-4c41-ae54-65e6a034968f): show tables
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Completed executing command(queryId=hive_20220218000941_016798b7-3ecd-4c41-ae54-65e6a034968f); Time taken: 0.012 seconds
INFO  : OK
+------------------------------+
|           tab_name           |
+------------------------------+
| flink_cdc_sink_hudi_hive_ro  |
| flink_cdc_sink_hudi_hive_rt  |
+------------------------------+

Hudi syncs two tables into Hive:

The _ro table serves read-optimized queries, while the _rt table serves snapshot (real-time) queries.

6.2 Query

0: jdbc:hive2://node01.com:2181,node02.com:21> select id, name, age, birthday from flink_cdc_sink_hudi_hive_ro;
INFO  : Compiling command(queryId=hive_20220218003353_57a46dca-3cd2-4da1-b455-bbd63da16413): select id, name, age, birthday from flink_cdc_sink_hudi_hive_ro
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:id, type:bigint, comment:null), FieldSchema(name:name, type:string, comment:null), FieldSchema(name:age, type:int, comment:null), FieldSchema(name:birthday, type:bigint, comment:null)], properties:null)
INFO  : Completed compiling command(queryId=hive_20220218003353_57a46dca-3cd2-4da1-b455-bbd63da16413); Time taken: 0.124 seconds
INFO  : Executing command(queryId=hive_20220218003353_57a46dca-3cd2-4da1-b455-bbd63da16413): select id, name, age, birthday from flink_cdc_sink_hudi_hive_ro
INFO  : Completed executing command(queryId=hive_20220218003353_57a46dca-3cd2-4da1-b455-bbd63da16413); Time taken: 0.029 seconds
INFO  : OK
+-----+--------+------+----------------+
| id  |  name  | age  |    birthday    |
+-----+--------+------+----------------+
| 1   | flink  | 18   | 1645142397000  |
+-----+--------+------+----------------+
1 row selected (0.278 seconds)
0: jdbc:hive2://node01.com:2181,node02.com:21>
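
The _rt table can be queried the same way; it also merges the not-yet-compacted log files, so it reflects the latest changes. A sketch of the query only (output omitted; depending on the Hudi/Hive setup, Hudi's combine input format may need to be set first):

-- Snapshot (real-time) query against the _rt table
set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
select id, name, age, birthday from flink_cdc_sink_hudi_hive_rt;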

Overall result (screenshot omitted).

Error: one issue encountered along the way

Flink CDC needs the flink-connector-mysql-cdc-2.0.2.jar package, not flink-sql-connector-mysql-cdc-2.0.2.jar;
otherwise you will hit the following error:

Flink SQL> select * from users_source_mysql;

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:201)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161)
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/connect/data/Schema
    at java.lang.Class.getDeclaredMethods0(Native Method)
    at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
    at java.lang.Class.getDeclaredMethod(Class.java:2128)
    at java.io.ObjectStreamClass.getPrivateMethod(ObjectStreamClass.java:1629)
    at java.io.ObjectStreamClass.access$1700(ObjectStreamClass.java:79)
    at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:520)
    at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
    at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1134)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
    at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobVertex(StreamingJobGraphGenerator.java:597)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:457)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:378)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:179)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:117)
    at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:934)
    at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:50)
    at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:39)
    at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:56)
    at org.apache.flink.client.deployment.executors.AbstractSessionClusterExecutor.execute(AbstractSessionClusterExecutor.java:67)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1957)
    at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:55)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeQueryOperation(TableEnvironmentImpl.java:795)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:1225)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$executeOperation$3(LocalExecutor.java:213)
    at org.apache.flink.table.client.gateway.context.ExecutionContext.wrapClassLoader(ExecutionContext.java:90)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.executeOperation(LocalExecutor.java:213)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.executeQuery(LocalExecutor.java:235)
    at org.apache.flink.table.client.cli.CliClient.callSelect(CliClient.java:479)
    at org.apache.flink.table.client.cli.CliClient.callOperation(CliClient.java:412)
    at org.apache.flink.table.client.cli.CliClient.lambda$executeStatement$0(CliClient.java:327)
    at java.util.Optional.ifPresent(Optional.java:159)
    at org.apache.flink.table.client.cli.CliClient.executeStatement(CliClient.java:327)
    at org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:297)
    at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:221)
    at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95)
    at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187)
    ... 1 more
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.connect.data.Schema
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    ... 69 more

Shutting down the session...
done.
[root@node01 bin]# 

Reprinted from: https://blog.csdn.net/wudonglianga/article/details/122954751
Copyright belongs to the original author wudl5566; in case of infringement, please contact us for removal.
