软件环境 Flink1.13.3
Scala 2.12
doris 0.14
一、MySQL 开启binlog日志、创建用户
1.开启bin log
MySQL 8.0默认开启了binlog,可以通过代码show variables like "%log_bin%";查询是否开启了,show variables like "%server_id%";查询服务器ID。
上图分别显示了bin long是否开启以及bin log所在的位置。
2.创建用户
CREATE USER 'flinktest' IDENTIFIED BY '123456';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'flinktest';
如果遇到报错:
Your password does not satisfy the current policy requirements
Mysql8版本输入
set global validate_password.policy=0; set global validate_password.length=6;
如果是mysql5.6版本 set global validate_password_policy=LOW;set global validate_password_length=6;
二、添加依赖
到仓库服务或者这里下载 cdc依赖flink-connector-mysql-cdc-2.0.2.jar 添加到$FLINK_HOME/lib下面
这里一定要注意一下cdc和flink版本的匹配关系,否则执行SQL的时候会报错:
[ERROR] Could not execute SQL statement. Reason:
java.lang.NoSuchMethodError: org.apache.flink.table.factories.DynamicTableFactory$Context.getCatalogTable()Lorg/apache/flink/table/catalog/ResolvedCatalogTable;
具体如下表:
Flink CDC Connector Version****Flink Version1.0.01.11.*1.1.01.11.*1.2.01.12.*1.3.01.12.*1.4.01.13.*2.0.1.13.
三、建表
1.MySQL建表:
CREATE TABLE flink_test(id INT ,name VARCHAR(20));
2.Doris建表
2.1启动doris
不懂如何启动可以看这里
2.2Flink连接doris驱动
Flink连接doris需要flink-doris-connector包,如果你懒得编译,可以从这边下载,下面的编译步骤就免了。
驱动编译过程:
首先到Doris官网把整个项目下载下来,然后解压
unzip incubator-doris-master.zip
cd incubator-doris-master/extension/flink-doris-connector
./build.sh
如果遇到报错./build.sh: Permission denied 那就修改权限 chmod 777 build.sh
如果遇到报错./build.sh: line 43: mvn: command not found
Error: mvn is not found 那就安装一下maven可以看到这里
等到N久之后,然后你可能遇到报错,无力吐槽啊:
[ERROR] thrift failed output:
[ERROR] thrift failed error: /bin/sh: /opt/pkg/incubator-doris-master/extension/flink-doris-connector/../../thirdparty/installed/bin/thrift: No such file or directory
[INFO] BUILD FAILURE
[ERROR] Failed to execute goal org.apache.thrift.tools:maven-thrift-plugin:0.1.11:compile (thrift-sources) on project doris-flink: thrift did not exit cleanly. Review output for more information. -> [Help 1]
好吧,那就安装thrift咯。安装过程中可能有报C++错误configure: No compiler with C++11 support was found,那就yum install -y gcc gcc-c++安装一下
#版本别太新哈0.93就行,不然可能报错
1.下载
wget http://mirrors.cnnic.cn/apache/thrift/0.9.3/thrift-0.9.3.tar.gz
或者wget http://archive.apache.org/dist/thrift/0.9.3/
2.解压编译
tar -zxf thrift-0.9.3.tar.gz
cd thrift-0.9.3
./configure --with-lua=no && make && make install
3.验证
thrift -version
4.把thrift复制到thirdparty/installed/bin 目录下,目录如果不存在需要手工创建
cp /usr/local/bin/thrift /opt/pkg/incubator-doris-master/thirdparty/installed/bin
又等待N久,继续执行./build.sh
注意,默认flink版本是1.12版本,如果是1.13版本,需要修改incubator-doris-master/extension/flink-doris-connector下面的pom.xml把property修改一下
虽然短短几行代码,但是踩坑了不少,等待时间又很久,如果有人不想编译,可以到这边下载我编译好的。注意我这个flink版本是1.13.3,scala版本是2.12哈。
2.3 Doris建表
mysql -h 172.16.37.29 -P 9030 -uroot
CREATE TABLE test_cnt
(
id int,
name varchar(50)
)
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES("replication_num" = "1",
"in_memory" = "false",
"storage_format" = "V2");
3.启动flink并建表
3.1启动fink
在$FLINK_HOME/bin目录输入./start-cluster.sh
3.2 flink建表
输入./sql-client.sh embedded启动FLINK SQL客户端,FLINK SQL有表模式,日志变更模式和Tableau模式,本次采用表模式,所以启动之后输入 SET sql-client.execution.result-mode=table;
创建mysql source:
CREATE TABLE flink_mysql_src(
id INT NOT NULL,
name STRING
) WITH (
'connector' = 'mysql-cdc',
'hostname' = '139.xxx.xx.xx',
'port' = '3306',
'username' = 'xxxx',
'password' = 'xxx',
'database-name' = 'xx',
'table-name' = 'flink_test',
'scan.incremental.snapshot.enabled' = 'false'
);
注意,在flink1.13版本支持根据mysql主键多并发读取数据功能,如果mysql没有设置主键,with里面要加'scan.incremental.snapshot.enabled' = 'false'否则会报错:
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true'
看一下是否能监控到MySQL数据:
在mysql中输入
insert into flink_test values(1,'a');
insert into flink_test values(2,'b');
insert into flink_test values(3,'c');
在flink sql输入:
select * from flink_mysql_src;
可以看到结果已经输出到flink控制台了,说明flink到mysql这端数据传输是OK的
如果遇到报错:ClassNotFoundException: com.ververica.cdc.debezium.DebeziumSourceFunction那就把flink-connector-debezium-2.0.2.jar也放到lib目录下面
创建doris sink:
CREATE TABLE flink_doris_sink (
id int,
name string
)
WITH (
'connector' = 'doris',
'fenodes' = 'localhost:8030',
'table.identifier' = 'zh.test_cnt',
'sink.batch.size'='2',
'username' = 'root',
'password'=''
);
select * from flink_doris_sink看看有没有报错。
如果报错[ERROR] Could not execute SQL statement. Reason:
java.lang.RuntimeException: can not fetch partitions 说明数据库不存在或者表不存在,注意看建表语句。
如果报错[ERROR] Could not execute SQL statement. Reason: java.lang.ClassNotFoundException: org.apache.doris.flink.table.DorisRowDataInputForma 说明doris-flink-1.0-SNAPSHOT.jar编译有问题,看看自己版本对不对,不对重新改一下pom重新编译
四、实践
1.flink执行任务
INSERT INTO flink_doris_sink
SELECT id,name
FROM flink_mysql_src;
可以到flink网页端看到任务的情况了
2.往mysql插入数据
insert into flink_test values(1,'a');
在doris 中查询,发现数据已经过来了
3.变更数据
在mysql中执行update flink_test set name='tests' where id=1
在doris中查询发现数据已经变更了,不过变成了两条记录,flink_doris_connector暂时不支持删除,据说后续版本会更新,那就期待一下吧。
注意,在flink1.13版本支持根据mysql主键多并发读取数据功能,如果mysql没有设置主键,with里面要加'scan.incremental.snapshot.enabled' = 'false'否则会报错:
版权归原作者 qq_39177151 所有, 如有侵权,请联系我们删除。