0


Flink CDC和Flink SQL构建实时数仓Flink写入Doris

软件环境 Flink1.13.3

Scala 2.12

doris 0.14

一、MySQL 开启binlog日志、创建用户

1.开启bin log

MySQL 8.0默认开启了binlog,可以通过代码show variables like "%log_bin%";查询是否开启了,show variables like "%server_id%";查询服务器ID。

上图分别显示了bin long是否开启以及bin log所在的位置。

2.创建用户

CREATE USER 'flinktest' IDENTIFIED BY '123456';

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'flinktest';

如果遇到报错:

Your password does not satisfy the current policy requirements

Mysql8版本输入

set global validate_password.policy=0; set global validate_password.length=6;

如果是mysql5.6版本 set global validate_password_policy=LOW;set global validate_password_length=6;

二、添加依赖

到仓库服务或者这里下载 cdc依赖flink-connector-mysql-cdc-2.0.2.jar 添加到$FLINK_HOME/lib下面

这里一定要注意一下cdc和flink版本的匹配关系,否则执行SQL的时候会报错:

[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodError: org.apache.flink.table.factories.DynamicTableFactory$Context.getCatalogTable()Lorg/apache/flink/table/catalog/ResolvedCatalogTable;

具体如下表:
Flink CDC Connector Version****Flink Version1.0.01.11.*1.1.01.11.*1.2.01.12.*1.3.01.12.*1.4.01.13.*2.0.1.13.

三、建表

1.MySQL建表:

CREATE TABLE flink_test(id INT ,name VARCHAR(20));

2.Doris建表

2.1启动doris

不懂如何启动可以看这里

2.2Flink连接doris驱动

Flink连接doris需要flink-doris-connector包,如果你懒得编译,可以从这边下载,下面的编译步骤就免了。

驱动编译过程:

首先到Doris官网把整个项目下载下来,然后解压
unzip incubator-doris-master.zip
cd incubator-doris-master/extension/flink-doris-connector
./build.sh

如果遇到报错./build.sh: Permission denied 那就修改权限 chmod 777 build.sh

如果遇到报错./build.sh: line 43: mvn: command not found
Error: mvn is not found 那就安装一下maven可以看到这里

等到N久之后,然后你可能遇到报错,无力吐槽啊:

[ERROR] thrift failed output:
[ERROR] thrift failed error: /bin/sh: /opt/pkg/incubator-doris-master/extension/flink-doris-connector/../../thirdparty/installed/bin/thrift: No such file or directory
[INFO] BUILD FAILURE
[ERROR] Failed to execute goal org.apache.thrift.tools:maven-thrift-plugin:0.1.11:compile (thrift-sources) on project doris-flink: thrift did not exit cleanly. Review output for more information. -> [Help 1]

好吧,那就安装thrift咯。安装过程中可能有报C++错误configure: No compiler with C++11 support was found,那就yum install -y gcc gcc-c++安装一下
#版本别太新哈0.93就行,不然可能报错
1.下载
wget http://mirrors.cnnic.cn/apache/thrift/0.9.3/thrift-0.9.3.tar.gz
或者wget http://archive.apache.org/dist/thrift/0.9.3/
2.解压编译
tar -zxf thrift-0.9.3.tar.gz
cd thrift-0.9.3
./configure --with-lua=no && make && make install
3.验证
thrift -version
4.把thrift复制到thirdparty/installed/bin 目录下,目录如果不存在需要手工创建
cp /usr/local/bin/thrift /opt/pkg/incubator-doris-master/thirdparty/installed/bin

又等待N久,继续执行./build.sh

注意,默认flink版本是1.12版本,如果是1.13版本,需要修改incubator-doris-master/extension/flink-doris-connector下面的pom.xml把property修改一下

虽然短短几行代码,但是踩坑了不少,等待时间又很久,如果有人不想编译,可以到这边下载我编译好的。注意我这个flink版本是1.13.3,scala版本是2.12哈。

2.3 Doris建表

mysql -h 172.16.37.29 -P 9030 -uroot

CREATE TABLE test_cnt
(
    id int,
    name varchar(50)
)
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES("replication_num" = "1",
"in_memory" = "false",
"storage_format" = "V2");

3.启动flink并建表

3.1启动fink

在$FLINK_HOME/bin目录输入./start-cluster.sh

3.2 flink建表

输入./sql-client.sh embedded启动FLINK SQL客户端,FLINK SQL有表模式,日志变更模式和Tableau模式,本次采用表模式,所以启动之后输入 SET sql-client.execution.result-mode=table;
创建mysql source:

CREATE TABLE flink_mysql_src(
id INT NOT NULL,
name STRING
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '139.xxx.xx.xx',
'port' = '3306',
'username' = 'xxxx',
'password' = 'xxx',
'database-name' = 'xx',
'table-name' = 'flink_test',
'scan.incremental.snapshot.enabled' = 'false'
);

注意,在flink1.13版本支持根据mysql主键多并发读取数据功能,如果mysql没有设置主键,with里面要加'scan.incremental.snapshot.enabled' = 'false'否则会报错:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'

看一下是否能监控到MySQL数据:

在mysql中输入
insert into flink_test values(1,'a');
insert into flink_test values(2,'b');
insert into flink_test values(3,'c');
在flink sql输入:
select * from flink_mysql_src;

可以看到结果已经输出到flink控制台了,说明flink到mysql这端数据传输是OK的

如果遇到报错:ClassNotFoundException: com.ververica.cdc.debezium.DebeziumSourceFunction那就把flink-connector-debezium-2.0.2.jar也放到lib目录下面

创建doris sink:
CREATE TABLE flink_doris_sink (
id int,
name string
)
WITH (
'connector' = 'doris',
'fenodes' = 'localhost:8030',
'table.identifier' = 'zh.test_cnt',
'sink.batch.size'='2',
'username' = 'root',
'password'=''
);

select * from flink_doris_sink看看有没有报错。

如果报错[ERROR] Could not execute SQL statement. Reason:
java.lang.RuntimeException: can not fetch partitions 说明数据库不存在或者表不存在,注意看建表语句。

如果报错[ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.doris.flink.table.DorisRowDataInputForma 说明doris-flink-1.0-SNAPSHOT.jar编译有问题,看看自己版本对不对,不对重新改一下pom重新编译

四、实践

1.flink执行任务

INSERT INTO flink_doris_sink
SELECT id,name
FROM flink_mysql_src;
可以到flink网页端看到任务的情况了

2.往mysql插入数据

insert into flink_test values(1,'a');
在doris 中查询,发现数据已经过来了

3.变更数据

在mysql中执行update flink_test set name='tests' where id=1

在doris中查询发现数据已经变更了,不过变成了两条记录,flink_doris_connector暂时不支持删除,据说后续版本会更新,那就期待一下吧。

注意,在flink1.13版本支持根据mysql主键多并发读取数据功能,如果mysql没有设置主键,with里面要加'scan.incremental.snapshot.enabled' = 'false'否则会报错:

标签: flink sql java

本文转载自: https://blog.csdn.net/qq_39177151/article/details/131081541
版权归原作者 qq_39177151 所有, 如有侵权,请联系我们删除。

“Flink CDC和Flink SQL构建实时数仓Flink写入Doris”的评论:

还没有评论