0


Flink-cdc 同步mysql数据

下载地址:https://github.com/ververica/flink-cdc-connectors/releases

这里下载2.2.0版本:https://github.com/ververica/flink-cdc-connectors/archive/refs/tags/release-2.2.0.zip

下载完成后,在 pom.xml 中找到这一项:flink.version ,修改 flink 版本号为:

<flink.version>1.13.6</flink.version>

自行打包编译

通过flink-cdc 同步mysql数据

1、flink集群准备

wget http://mirrors.cloud.tencent.com/apache/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.11.tgz
tar zxvf flink-1.13.6-bin-scala_2.11.tgz

将打包好的 flink-sql-connector-mysql-cdc-2.2-SNAPSHOT.jar 包放入到flink的lib目录下
启动集群

cd flink-1.13.6
bin/start-cluster.sh

2、mysql环境准备

CREATEDATABASE mydb;USE mydb;CREATETABLE products (
       id INTEGERNOTNULLAUTO_INCREMENTPRIMARYKEY,
       name VARCHAR(255)NOTNULL,
       description VARCHAR(512));ALTERTABLE products AUTO_INCREMENT=101;INSERTINTO products
     VALUES(default,"scooter","Small 2-wheel scooter"),(default,"car battery","12V car battery"),(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),(default,"hammer","12oz carpenter's hammer"),(default,"hammer","14oz carpenter's hammer"),(default,"hammer","16oz carpenter's hammer"),(default,"rocks","box of assorted rocks"),(default,"jacket","water resistent black wind breaker"),(default,"spare tire","24 inch spare tire");

3、启动flinksql client

cd /opt/flink-1.13.6
bin/sql-client.sh

4、在flinksql client中执行命令

FlinkSQL>SET execution.checkpointing.interval =3s
FlinkSQL>CREATETABLE products (
     id INT,
     name STRING,
     description STRING,PRIMARYKEY(id)NOTENFORCED)WITH(
     'connector' = 'mysql-cdc',
     'hostname' = '自己的ip地址','port'='3306',
     'username' = 'root',
     'password' = '密码',
     'database-name' = 'mydb',
     'table-name' = 'products'
   );FlinkSQL> select * from products;

5、在 MySQL 客户端继续插入数据

INSERTINTO products VALUES(default,"scooter1","Small 2-wheel scooter");INSERTINTO products VALUES(default,"scooter2","Small 2-wheel scooter");INSERTINTO products VALUES(default,"scooter3","Small 2-wheel scooter");INSERTINTO products VALUES(default,"scooter4","Small 2-wheel scooter");

4、在flinksql client中查看数据

select * from products;

可以查看到数据变化

标签: mysql flink flink-cdc

本文转载自: https://blog.csdn.net/weixin_47491957/article/details/126103679
版权归原作者 四维大脑 所有, 如有侵权,请联系我们删除。

“Flink-cdc 同步mysql数据”的评论:

还没有评论