FlinkCDC 同步Mysql到Doris
1.安装Flink
下载 Flink 1.18.0,下载后把压缩包上传到服务器,使用
tar -zxvf flink-xxx-bin-scala_2.12.tgz
解压后得到
flink-1.18.0
目录
cd flink-1.18.1
然后需要配置FLINK_HOME ,执行
vi /etc/profile
,增加如下内容
exportFLINK_HOME=/root/flink/flink-1.18.1 #你的安装目录exportPATH=$PATH:$FLINK_HOME/bin
执行:source /etc/profile 让其生效,然后通过在 conf/flink-conf.yaml 配置文件追加下列参数开启 checkpoint,每隔 3 秒做一次 checkpoint。
execution.checkpointing.interval: 3000
使用下面的命令启动 Flink 集群,
./bin/start-cluster.sh
启动成功的话,可以在 http://localhost:8081/访问到 Flink Web UI,多次执行 start-cluster.sh 可以拉起多个 TaskManager。如下所示:
访问之前记得开放防火墙端口
firewall-cmd --zone=public --add-port=8081/tcp --permanent;
firewall-cmd --zone=public --add-port=8030/tcp --permanent;
firewall-cmd --zone=public --add-port=8040/tcp --permanent;
firewall-cmd --zone=public --add-port=9030/tcp --permanent;
firewall-cmd --reload ;
2.准备同步的数据库
准备好Mysql数据库,创建数据库 app_db 和表 orders,products,shipments,并插入数据
-- 创建数据库CREATEDATABASE app_db;USE app_db;-- 创建 orders 表CREATETABLE`orders`(`id`INTNOTNULL,`price`DECIMAL(10,2)NOTNULL,PRIMARYKEY(`id`));-- 插入数据INSERTINTO`orders`(`id`,`price`)VALUES(1,4.00);INSERTINTO`orders`(`id`,`price`)VALUES(2,100.00);-- 创建 shipments 表CREATETABLE`shipments`(`id`INTNOTNULL,`city`VARCHAR(255)NOTNULL,PRIMARYKEY(`id`));-- 插入数据INSERTINTO`shipments`(`id`,`city`)VALUES(1,'beijing');INSERTINTO`shipments`(`id`,`city`)VALUES(2,'xian');-- 创建 products 表CREATETABLE`products`(`id`INTNOTNULL,`product`VARCHAR(255)NOTNULL,PRIMARYKEY(`id`));-- 插入数据INSERTINTO`products`(`id`,`product`)VALUES(1,'Beer');INSERTINTO`products`(`id`,`product`)VALUES(2,'Cap');INSERTINTO`products`(`id`,`product`)VALUES(3,'Peanut');
给doris创建数据库,通过 Web UI 创建 app_db 数据库 :
create database app_db;
3.安装FlinkCDC
下载 flink cdc-3.0.0 的二进制压缩包 flink-cdc-3.0.0-bin.tar.gz,并解压得到目录 flink cdc-3.0.0 ':. flink-cdc-3.0.0 下会包含 bin、lib、log、conf 四个目录
然后把下面2个 connector 包,并且移动到 lib 目录下
- MySQL pipeline connector 3.0.0 : mysql的驱动
- Apache Doris pipeline connector 3.0.0 : doris的驱动
编写任务配置 yaml 文件 文件可以放到config目录下。 下面给出了一个整库同步的示例文件 mysql-to-doris.yaml,
################################################################################# Description: Sync MySQL all tables to Doris################################################################################
source:
type: mysql
hostname: 192.168.220.253
port: 3307
username: root
password: 123456
tables: app_db.\.*
server-id: 5400-5404
server-time-zone: UTC
sink:
type: doris
fenodes: 127.0.0.1:8030
username: root
password: 123456
table.create.properties.light_schema_change: true
table.create.properties.replication_num: 1
pipeline:
name: Sync MySQL Database to Doris
parallelism: 1
其中: source 中的 tables: app_db..* 通过正则匹配同步 app_db 下的所有表。 sink 添加table.create.properties.replication_num :1 参数是由于 只有一个 Doris BE 节点。
最后,进入到 flink-cdc-3.0.0 目录,通过命令行提交任务到 Flink Standalone cluster :
bash bin/flink-cdc.sh mysql-to-doris.yaml
[root@localhost flink-cdc-3.0.0]# bash bin/flink-cdc.sh conf/mysql-to-doris.yaml
Pipeline has been submitted to cluster.
Job ID: 13e2925fd46e5840243c9523cd093e11
Job Description: Sync MySQL Database to Doris
执行之后查看flink的控制台界面 : 访问 8081端口
点击 Job Name 进入任务,可以看到同步的情况,还可以查看任务日志如下
登录doris的控制台,查看数据是否同步进去,访问:8030端口
当我们修改了Mysql中的数据后就会自动同步到Doris
4.表结构同步
Flink CDC 提供了将源表的表结构/数据路由到其他表名的配置,借助这种能力,我们能够实现表名库名替换,整库同步等功能。 下面提供一个配置文件说明:
################################################################################# Description: Sync MySQL all tables to Doris################################################################################
source:
type: mysql
hostname: localhost
port: 3306
username: root
password: 123456
tables: app_db.\.*
server-id: 5400-5404
server-time-zone: UTC
sink:
type: doris
fenodes: 127.0.0.1:8030
benodes: 127.0.0.1:8040
username: root
password: ""
table.create.properties.light_schema_change: true
table.create.properties.replication_num: 1
route:
- source-table: app_db.orders
sink-table: ods_db.ods_orders
- source-table: app_db.shipments
sink-table: ods_db.ods_shipments
- source-table: app_db.products
sink-table: ods_db.ods_products
pipeline:
name: Sync MySQL Database to Doris
parallelism: 1
通过上面的 route 配置,会将 app_db.orders 表的结构和数据同步到 ods_db.ods_orders 中。从而实现数据库迁移的功能。 特别地,source-table 支持正则表达式匹配多表,从而实现分库分表同步的功能,例如下面的配置:
route:
- source-table: app_db.order\.*
sink-table: ods_db.ods_orders
这样,就可以将诸如 app_db.order01、app_db.order02、app_db.order03 的表汇总到 ods_db.ods_orders 中。注意,目前还不支持多表中存在相同主键数据的场景,将在后续版本支持。
文章到这就结束了 ,如果对你有帮助请给个好评
版权归原作者 墨家巨子@俏如来 所有, 如有侵权,请联系我们删除。