0


flink sql同步mysql数据表到mysql

在这里插入图片描述

1. 关闭防火墙和selinux

systemctl stop firewalld
systemctl disable firewalld
systemctl status firewalld

2.安装java8

yum list java-1.8* 
 
yum install java-1.8.0-openjdk* -y
 
java -version

3.下载和部署mysql

yum -y installwgetwget https://dev.mysql.com/get/Downloads/MySQL-8.0/mysql-8.0.28-linux-glibc2.12-x86_64.tar.xz
mv mysql-8.0.28-linux-glibc2.12-x86_64.tar.xz /usr/local/
cd /usr/local/
tar xvJf mysql-8.0.28-linux-glibc2.12-x86_64.tar.xz
mv mysql-8.0.28-linux-glibc2.12-x86_64 mysql-8.0
mkdir data
groupadd mysql
chown -R mysql.mysql /usr/local/mysql-8.0
cd mysql-8.0/bin/
mkdir data
./mysqld --user=mysql --basedir=/usr/local/mysql-8.0 --datadir=/usr/local/mysql-8.0/data/ --initialize

圈起来的部分为后面数据库登陆的初始密码

vi /etc/my.cnf
basedir=/usr/local/mysql-8.0/
datadir=/usr/local/mysql-8.0/data/
socket=/tmp/mysql.sock
character-set-server=UTF8MB4
 
cp -a .././support-files/mysql.server /etc/init.d/mysql
chmod +x /etc/init.d/mysql
chkconfig --add mysql
service mysql status
service mysql start
ln -s /usr/local/mysql-8.0/bin/mysql /usr/bin

登陆

mysql -u root -p

在这里插入图片描述
输入之前初始化给的初始密码

修改mysql密码
在这里插入图片描述

登陆
在这里插入图片描述

4.下载部署flink

flink下载地址:

https://www.apache.org/dyn/closer.lua/flink/flink-1.18.1/flink-1.18.1-bin-scala_2.12.tgz

下载flink sql所需驱动

1.    flink-connector-jdbc-3.1.1-1.17.jar 下载地址:https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.1-1.17/  
2.    flink-sql-connector-mysql-cdc-3.0.1.jar 下载地址:https://github.com/ververica/flink-cdc-connectors/releases  
3.    mysql-connector-java-8.0.28.jar 下载地址:https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/8.0.28/  

上传flink压缩包并解压

tar -zxvf flink-1.18.1-bin-scala_2.12.tgz

进入flink的lib目录上传三个依赖

cd flink-1.18.1/lib/

需要修改一下flink配置文件
不然会出现以下情况
在这里插入图片描述

vi flink-1.18.1/conf/flink-conf.yaml

原本是localhost修改为ip
在这里插入图片描述

./flink-1.18.1/bin/start-cluster.sh

访问 192.168.207.193:8081 (默认是8081端口 可在配置文件里修改)
在这里插入图片描述

5.实时同步mysql数据表

数据库先创建一个库,在库里创建表再添加数据

create database ljq;

use ljq

CREATE TABLE players (
player_id INT NOT NULL AUTO_INCREMENT,
team_id INT,
player_name VARCHAR(255),
height FLOAT(53),
PRIMARY KEY (player_id));;
insert into players (player_id,team_id,player_name,height) values 
(1001,1001,'韦德','1.93'),
(1002,1002,'雷吉','1.91'),
(1003,1003,'安德烈','2.11'),
(1004,1004,'索恩','2.16'),
(1005,1005,'兰斯顿','1.88'),
(1006,1006,'格伦','1.98'),
(1007,1007,'伊斯梅尔','1.83'),
(1008,1008,'扎扎','2.11'),
(1009,1009,'乔恩','2.08');
select * from players;

在这里插入图片描述

再创一个表不插入数据

CREATE TABLE players2 (
player_id INT NOT NULL AUTO_INCREMENT,
team_id INT,
player_name VARCHAR(255),
height FLOAT(53),
PRIMARY KEY (player_id));

启动flink-sql

./flink-1.18.1/bin/sql-client.sh embedded

根据需要同步的数据创建源表

CREATE TABLE nbaplayers (
player_id INT,
team_id INT,
player_name VARCHAR,
height FLOAT,
PRIMARY KEY (player_id) NOT ENFORCED
) WITH ('connector'='mysql-cdc',
'hostname'='192.168.207.193',
'port'='3306',
'username'='root',
'password'='linux123',
'database-name'='ljq',
'table-name'='players',
'server-time-zone'='Asia/Shanghai');

在这里插入图片描述

查看表时出现报错!

解决方法:修改为允许所有ip访问
在这里插入图片描述

use mysql;select user,host from user;
update user sethost='%' where user ='root';
flush privileges;select user,host from user;

在这里插入图片描述
在这里插入图片描述

重新查看表 select * from nbaplayers;·
在这里插入图片描述

创建结果表

CREATE TABLE nba (
player_id INT,
team_id INT,
player_name VARCHAR,
height FLOAT,
PRIMARY KEY (player_id) NOT ENFORCED
) WITH ('connector'='jdbc',
'url'='jdbc:mysql://192.168.207.193:3306/ljq?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false&serverTimezone=Asia/Shanghai&useSSL=true&dontTrackOpenResources=true&defaultFetchSize=10000&useCursorFetch=true',
'driver'='com.mysql.cj.jdbc.Driver',
'username'='root',
'password'='linux123',
'table-name'='players2');

在这里插入图片描述

执行从源表插入结果表操作,生成同步作业

INSERT INTO nba
SELECT
player_id,
team_id,
player_name,
height
FROM nbaplayers;

在这里插入图片描述

Web端查看
在这里插入图片描述

查看是否同步数据到players2
在这里插入图片描述

如发现height同步数据有很多位小数点
解决方法:

ALTER TABLE players2 MODIFY height DOUBLE(3,2);

在这里插入图片描述
在这里插入图片描述

测试一下增删改是否同步

增 insert into players (player_id,team_id,player_name,height) values (10001,10002,'韦德2','1.95');

在这里插入图片描述
在这里插入图片描述

删delete from players where player_id =10001;
改update players setplayer_name='姚明' where player_id=1001;

在这里插入图片描述

标签: flink sql mysql

本文转载自: https://blog.csdn.net/jiaqijiaqi666/article/details/143587490
版权归原作者 我是琦琦琦琦 所有, 如有侵权,请联系我们删除。

“flink sql同步mysql数据表到mysql”的评论:

还没有评论