1. 创建MySQL库表,写入demo数据
- 登录测试MySQL
mysql -u root -pnew_password
- 创建MySQL库表,写入demo数据
CREATE DATABASE emp_1;
USE emp_1;
CREATE TABLE employees_1 (
emp_no INT NOT NULL,
birth_date DATE NOT NULL,
first_name VARCHAR(14) NOT NULL,
last_name VARCHAR(16) NOT NULL,
gender ENUM ('M','F') NOT NULL,
hire_date DATE NOT NULL,
PRIMARY KEY (emp_no));
INSERT INTO `employees_1` VALUES (10001,'1953-09-02','Georgi','Facello','M','1986-06-26'),
(10002,'1964-06-02','Bezalel','Simmel','F','1985-11-21'),
(10036,'1959-08-10','Adamantios','Portugali','M','1992-01-03');
注意:MySQL需要开通bin-log
- log_bin=mysql_bin
- binlog-format=Row
- server-id=1
2. 创建Doris库表
- 创建Doris表
mysql -uroot-P9030-h127.0.0.1
create database demo;
use demo;
CREATE TABLE all_employees_info (
emp_no int NOT NULL,
birth_date date,
first_name varchar(20),
last_name varchar(20),
gender char(2),
hire_date date)
UNIQUE KEY(`emp_no`, `birth_date`)
DISTRIBUTED BY HASH(`birth_date`) BUCKETS 1
PROPERTIES ("replication_allocation"="tag.location.default: 1");
3. 启动Flink
- 启动flink
cd /mnt/apps/flink-1.15.3/
#启动flink,这里服务已经启动
bin/start-cluster.sh
#进入SQL控制台
bin/sql-client.sh embedded
- 创建Flink 任务:
SET 'execution.checkpointing.interval'='10s';
CREATE TABLE employees_source (
database_name STRING METADATA VIRTUAL,
table_name STRING METADATA VIRTUAL,
emp_no int NOT NULL,
birth_date date,
first_name STRING,
last_name STRING,
gender STRING,
hire_date date,
PRIMARY KEY (`emp_no`) NOT ENFORCED
) WITH ('connector'='mysql-cdc',
'hostname'='localhost',
'port'='3306',
'username'='root',
'password'='new_password',
'database-name'='emp_1',
'table-name'='employees_1');
CREATE TABLE cdc_doris_sink (
emp_no int ,
birth_date STRING,
first_name STRING,
last_name STRING,
gender STRING,
hire_date STRING
)
WITH ('connector'='doris',
'fenodes'='172.16.64.9:8030',
'table.identifier'='demo.all_employees_info',
'username'='root',
'password'='',
'sink.properties.two_phase_commit'='true',
'sink.label-prefix'='doris_demo_emp_002');
insert into cdc_doris_sink (emp_no,birth_date,first_name,last_name,gender,hire_date)select emp_no,cast(birth_date as string) as birth_date ,first_name,last_name,gender,cast(hire_date as string) as hire_date from employees_source;
- 输入如下地址,查看flink任务 http://localhost:8081/#/job/running
- 数据验证:启动后可以看到有数据实时进入Doris了
mysql -uroot-P9030-h127.0.0.1
mysql>select * from all_employees_info;
+--------+------------+------------+-----------+--------+------------+
| emp_no | birth_date | first_name | last_name | gender | hire_date |
+--------+------------+------------+-----------+--------+------------+
|10001|1953-09-02 | Georgi | Facello | M |1986-06-26 ||10002|1964-06-02 | Bezalel | Simmel | F |1985-11-21 ||10036|1959-08-10 | Adamantios | Portugali | M |1992-01-03 ||20001|1953-09-02 | Georgi | Facello | M |1986-06-26 |
+--------+------------+------------+-----------+--------+------------+
4 rows inset(0.02 sec)
Link
- https://zhuanlan.zhihu.com/p/532913664
- https://www.runoob.com/mysql/mysql-install.html
- https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.15/1.2.1/
- https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/
Jar包地址:
flink 环境:1.15.3
- https://dlcdn.apache.org/flink/flink-1.15.3/flink-1.15.3-bin-scala_2.12.tgz 解压并将jar包防止在Flink 的lib下 flink-doris-connector:1.15
- https://repo.maven.apache.org/maven2/org/apache/doris/flink-doris-connector-1.15/1.2.1/flink-doris-connector-1.15-1.2.1.jar cdc mysql:flink-sql-connector-mysql-cdc-2.2.1.jar
- https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar
本文转载自: https://blog.csdn.net/wangleigiser/article/details/129041505
版权归原作者 wangleigiser 所有, 如有侵权,请联系我们删除。
版权归原作者 wangleigiser 所有, 如有侵权,请联系我们删除。