环境准备
IPHostnameRoleRole210.0.0.1flink-01master,wokerJobManager,TaskManager,Dinky10.0.0.2flink-02wokerTaskManager10.0.0.3flink-03wokerTaskManager
安装JDK
yum install java-11-openjdk.x86_64 java-11-openjdk-devel.x86_64 -y
配置hosts
vim /etc/hosts
10.0.0.1 flink-01
10.0.0.2 flink-02
10.0.0.3 flink-03
配置免密
- 在flink-01上面生成公钥和私钥对,按照提示一直按下回车键,直到生成密钥对为止。
ssh-keygen -t rsa
- 将flink-01上生成的公钥复制到flink-01,flink-02,flink-03
ssh-copy-id root@flink-01
ssh-copy-id root@flink-02
ssh-copy-id root@flink-03
1. Flink集群部署
Master节点操作
- 获取Flink安装包
cd /opt && wget https://archive.apache.org/dist/flink/flink-1.15.4/flink-1.15.4-bin-scala_2.12.tgz
- 解压压缩包并创建软链接
tar xf flink-1.15.4-bin-scala_2.12.tgz
ln -s flink-1.15.4 flink
- 修改配置文件flink-conf.yaml
vim /opt/flink/conf/flink-conf.yaml
jobmanager.rpc.address: 10.0.0.1
jobmanager.bind-host: 0.0.0.0
taskmanager.bind-host: 0.0.0.0
taskmanager.host: flink-01
taskmanager.memory.process.size: 14400m #taskmanager可用内存
taskmanager.numberOfTaskSlots: 3 #taskmanager任务数量 一般跟cpu数量相同
rest.address: flink-01
rest.bind-address: 0.0.0.0
- 修改节点文件
vim /opt/flink/conf/masters
10.0.0.1:8081
vim/opt/flink/conf/workers
10.0.0.1
10.0.0.2
10.0.0.3
- 将flink包发送到flink-02,flink-03
scp -r flink-1.15.4/ root@flink-02:/opt
scp -r flink-1.15.4/ root@flink-03:/opt
worker节点操作(flink-02 flink-03同步操作)
- 创建软链接
cd /opt && ln -s flink-1.15.4 flink
- 修改配置文件
taskmanager.host: 10.0.0.2/10.0.0.3 #根据服务器本身ip进行修改
启动flink集群(Master节点进行启动)
cd /opt/flink/bin && ./start-cluster.sh
2. Dinky部署(flink-01)
- 获取Dinky安装包
cd /opt/ && wget https://github.com/DataLinkDC/dlink/releases/download/v0.7.5/dlink-release-0.7.5.tar.gz
- 解压压缩包并创建软链接
tar xf dlink-release-0.7.5.tar.gz && ln -s dlink-release-0.7.5 dinky
- 创建数据库用户
#登录mysql
mysql -h 10.0.0.1 -uroot -proot@123
#创建数据库并授权
mysql> create database dinky;
mysql> grant all privileges on dinky.* to 'dinky'@'%' identified by 'dinky' with grant option;
mysql> flush privileges;
- 初始化数据库
#此处用 dinky 用户登录
mysql -h 10.0.0.1 -udinky -pdinky
mysql> source /opt/dinky/sql/dinky.sql
- 修改配置文件
vim /opt/dinky/config/application.yml
url: jdbc:mysql://${MYSQL_ADDR:10.0.0.1:3306}/${MYSQL_DATABASE:dinky}?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
username: ${MYSQL_USERNAME:dinky}
password: ${MYSQL_PASSWORD:dinky}
- 启动dinky
cd /opt/dinky &&sh auto.sh start
- 访问Web 默认用户密码admin/admin
10.0.0.1:8888
3. 整库同步
- 将Dinky整库同步依赖包放到flink/lib下
cp /opt/dinky/lib/dlink-client-base-0.7.5.jar /opt/flink/lib/
cp /opt/dinky/lib/dlink-common-0.7.5.jar /opt/flink/lib/
cp /opt/dinky/plugins/flink1.15/dinky/dlink-client-1.15-0.7.5.jar /opt/flink/lib/
- 获取其他依赖包到flink/lib下
wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-connector-jdbc/1.15.4/flink-connector-jdbc-1.15.4.jar
wget https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.15/1.4.0/flink-doris-connector-1.15-1.4.0.jar
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4.2/flink-sql-connector-mysql-cdc-2.4.2.jar
- 将flink/lib下面的所有包复制到dinky plugins目录对应的版本下
cd /opt/flink/lib &&cp * /opt/dinky/plugins/flink1.15/
- 重启flink
cd /opt/flink/bin && ./stop-cluster.sh
./start-cluster.sh
- 重启dinky
cd /opt/dinky && sh auto.sh stop
sh auto.sh start 1.15
操作web界面
访问10.0.0.1:8888 默认密admin/admin
- 添加注册中心
- 创建整库同步作业
- 编辑整库作业 由于bug问题 要同步到Doris库的表 需要先手动在Doris中创建好
EXECUTE CDCSOURCE test WITH (
# mysql 相关配置
'connector' = 'mysql-cdc',
'hostname' = '10.0.0.1',
'port' = '3306',
'username' = 'root',
'password' = 'root@mysql',
'checkpoint' = '10000',
'table.local-time-zone' = 'Asia/Shanghai',
'scan.startup.mode' = 'initial',
'parallelism' = '1',
'table-name' = 'test.\test',
# Doris相关配置
'sink.connector' = 'datastream-doris-schema-evolution',
'sink.fenodes' = '10.0.0.1:8030,10.0.0.2:8030,10.0.0.3:8030',
'sink.username' = 'root',
'sink.password' = 'root@doris',
'sink.doris.batch.size' = '1000',
'sink.sink.max-retries' = '1',
'sink.sink.batch.interval' = '60000',
'sink.sink.db' = 'ods',
'sink.table.prefix' = 'ods_',
'sink.table.identifier' = '${schemaName}.${tableName}'
);
- 选择模式跟集群后开始任务
版权归原作者 Logout: 所有, 如有侵权,请联系我们删除。