Doris in Practice: Ingesting MySQL Data via Flink CDC

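1. Create the MySQL database and table, and write demo data

  1. Log in to the test MySQL instance
 mysql -u root -pnew_password
  2. Create the database and table, then insert the demo rows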
CREATE DATABASE emp_1;
 USE emp_1;
CREATE TABLE employees_1 (
    emp_no      INT             NOT NULL,
    birth_date  DATE            NOT NULL,
    first_name  VARCHAR(14)     NOT NULL,
    last_name   VARCHAR(16)     NOT NULL,
    gender      ENUM ('M','F')  NOT NULL,    
    hire_date   DATE            NOT NULL,
    PRIMARY KEY (emp_no));

INSERT INTO `employees_1` VALUES (10001,'1953-09-02','Georgi','Facello','M','1986-06-26'),
(10002,'1964-06-02','Bezalel','Simmel','F','1985-11-21'),
(10036,'1959-08-10','Adamantios','Portugali','M','1992-01-03');

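Note: the MySQL binary log (binlog) must be enabled. Set the following options in the MySQL configuration file (for example under the [mysqld] section of my.cnf) and restart MySQL: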

  • log_bin=mysql_bin
  • binlog-format=Row
  • server-id=1
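
The three options above can be checked before starting the job. The following is a minimal sketch, not part of the original article: `check_binlog_conf` is a hypothetical helper name, and it only inspects an option file for `key=value` lines. It does not confirm what the running server actually uses; for that, `SHOW VARIABLES LIKE 'log_bin';` in the MySQL client is the more reliable check.

```shell
#!/bin/sh
# Hypothetical helper (not from the original article): report whether a
# MySQL option file sets the three binlog-related options listed above.
# MySQL option files accept both '-' and '_' in option names, so both
# spellings are matched. Only "key = value" lines are recognised.
check_binlog_conf() {
    conf_file="$1"
    missing=0
    for key in log_bin binlog_format server_id; do
        # Turn log_bin into log[-_]bin so that log-bin also matches.
        pattern=$(printf '%s' "$key" | sed 's/_/[-_]/g')
        if grep -Eiq "^[[:space:]]*${pattern}[[:space:]]*=" "$conf_file"; then
            echo "ok: $key"
        else
            echo "missing: $key"
            missing=1
        fi
    done
    # Exit status 0 when all three options are present, 1 otherwise.
    return "$missing"
}
```

A typical call would be `check_binlog_conf /etc/my.cnf`, where the path is an assumption and depends on how MySQL was installed.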

2. Create the Doris database and table

  1. Create the Doris table
mysql -uroot -P9030 -h127.0.0.1
create database demo;
use demo;
CREATE TABLE all_employees_info (
    emp_no       int NOT NULL,
    birth_date   date,
    first_name   varchar(20),
    last_name    varchar(20),
    gender       char(2),
    hire_date    date)
UNIQUE KEY(`emp_no`, `birth_date`)
DISTRIBUTED BY HASH(`birth_date`) BUCKETS 1
PROPERTIES ("replication_allocation"="tag.location.default: 1");

3. Start Flink

  1. Start Flink
cd /mnt/apps/flink-1.15.3/
# Start Flink (in this environment the service is already running)
bin/start-cluster.sh
# Open the SQL client console
bin/sql-client.sh embedded
  2. Create the Flink job:
SET 'execution.checkpointing.interval'='10s';

CREATE TABLE employees_source (
    database_name STRING METADATA VIRTUAL,
    table_name STRING METADATA VIRTUAL,
    emp_no int NOT NULL,
    birth_date date,
    first_name STRING,
    last_name STRING,
    gender STRING,
    hire_date date,
    PRIMARY KEY (`emp_no`) NOT ENFORCED
  ) WITH ('connector'='mysql-cdc',
    'hostname'='localhost',
    'port'='3306',
    'username'='root',
    'password'='new_password',
    'database-name'='emp_1',
    'table-name'='employees_1');

CREATE TABLE cdc_doris_sink (
    emp_no       int ,
    birth_date   STRING,
    first_name   STRING,
    last_name    STRING,
    gender       STRING,
    hire_date    STRING
) 
WITH ('connector'='doris',
  'fenodes'='172.16.64.9:8030',
  'table.identifier'='demo.all_employees_info',
  'username'='root',
  'password'='',
  'sink.properties.two_phase_commit'='true',
  'sink.label-prefix'='doris_demo_emp_002');

INSERT INTO cdc_doris_sink (emp_no, birth_date, first_name, last_name, gender, hire_date)
SELECT
    emp_no,
    CAST(birth_date AS STRING) AS birth_date,
    first_name,
    last_name,
    gender,
    CAST(hire_date AS STRING) AS hire_date
FROM employees_source;
  3. Open the following address to check the Flink job: http://localhost:8081/#/job/running
  4. Verify the data: once the job is running, rows arrive in Doris in real time
mysql -uroot -P9030 -h127.0.0.1
mysql> select * from demo.all_employees_info;
+--------+------------+------------+-----------+--------+------------+
| emp_no | birth_date | first_name | last_name | gender | hire_date  |
+--------+------------+------------+-----------+--------+------------+
|  10001 | 1953-09-02 | Georgi     | Facello   | M      | 1986-06-26 |
|  10002 | 1964-06-02 | Bezalel    | Simmel    | F      | 1985-11-21 |
|  10036 | 1959-08-10 | Adamantios | Portugali | M      | 1992-01-03 |
|  20001 | 1953-09-02 | Georgi     | Facello   | M      | 1986-06-26 |
+--------+------------+------------+-----------+--------+------------+
4 rows in set (0.02 sec)
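
The query output above includes a row with emp_no 20001 that is not in the initial demo data, which suggests it was inserted into MySQL after the job started. The sketch below shows one way such an incremental check might look. It reuses the hosts, ports, and credentials from the commands in this article. The `run` and `sync_check` helpers, the `DRY_RUN` switch, and the 15-second wait (one 10s checkpoint interval plus a margin) are assumptions made for illustration.

```shell
#!/bin/sh
# Sketch: insert one extra row into MySQL, then read it back from Doris.
# DRY_RUN=1 (the default) only prints the commands; set DRY_RUN=0 to run them.
DRY_RUN="${DRY_RUN:-1}"

# Print the command in dry-run mode, execute it otherwise.
# The printed form is for display only (arguments are not re-quoted).
run() {
    if [ "$DRY_RUN" = "1" ]; then
        printf '+'
        printf ' %s' "$@"
        printf '\n'
    else
        "$@"
    fi
}

# Row 20001 matches the extra row visible in the query output above.
INSERT_SQL="INSERT INTO emp_1.employees_1 VALUES (20001,'1953-09-02','Georgi','Facello','M','1986-06-26');"
SELECT_SQL="SELECT * FROM demo.all_employees_info WHERE emp_no = 20001;"

sync_check() {
    # Write to the MySQL source table.
    run mysql -u root -pnew_password -e "$INSERT_SQL"
    # Assumption: waiting slightly longer than the 10s checkpoint interval
    # is enough for the new row to become visible in Doris.
    run sleep 15
    # Read from the Doris FE over the MySQL protocol (port 9030).
    run mysql -uroot -P9030 -h127.0.0.1 -e "$SELECT_SQL"
}

sync_check
```

With the default `DRY_RUN=1` the script only lists the three commands, which makes it safe to review before pointing it at a real instance.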

JAR package location:

Flink environment: 1.15.3

This article is reposted from: https://blog.csdn.net/wangleigiser/article/details/129041505
Copyright belongs to the original author, wangleigiser. In case of infringement, please contact us for removal.
