0


Flink CDC 基于Oracle log archiving 实时同步Oracle表到Mysql

环境说明:

flink 1.15.2

Oracle 版本:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production

mysql 版本:5.7

windows11 IDEA 本地运行

先上官网使用说明和案例:Oracle CDC Connector — Flink CDC documentation

  1. Oracle 开启 log archiving

(1).启用 log archiving
a:以DBA用户连接数据库
sqlplus / as sysdba
b:启用 log archiving (会重启数据库)
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
c:检查 log archiving 是否开启 -- Should now "Database log mode: Archive Mode"
archive log list;

(2).注:必须为捕获的表或数据库启用补充日志记录,以便数据更改能够捕获已更改数据库行的之前状态。下面演示了如何在表/数据库级别上配置它。
     为一个特定的表启用补充日志记录:修改表目录。客户添加补充日志数据(所有)列;
         ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

    —为数据库启用补充日志修改数据库添加补充日志数据;
         ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

(3).创建具有权限的Oracle用户
     a:创建表空间
         sqlplus / as sysdba
           CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
           exit;
       b:创建用户并赋权  flinkuser  flinkpw 
         sqlplus / as sysdba
           CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
           GRANT CREATE SESSION TO flinkuser;
           GRANT SET CONTAINER TO flinkuser;
           GRANT SELECT ON V_$DATABASE to flinkuser;
           GRANT FLASHBACK ANY TABLE TO flinkuser;
           GRANT SELECT ANY TABLE TO flinkuser;
           GRANT SELECT_CATALOG_ROLE TO flinkuser;
           GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
           GRANT SELECT ANY TRANSACTION TO flinkuser;
           GRANT LOGMINING TO flinkuser;
         
           GRANT CREATE TABLE TO flinkuser;
           GRANT LOCK ANY TABLE TO flinkuser;
           GRANT ALTER ANY TABLE TO flinkuser;
           GRANT CREATE SEQUENCE TO flinkuser;
         
           GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
           GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
         
           GRANT SELECT ON V_$LOG TO flinkuser;
           GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
           GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
           GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
           GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
           GRANT SELECT ON V_$LOGFILE TO flinkuser;
           GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
           GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
           exit;
  1. Oracle 建表,并配置补充日志

CREATE TABLE "USER_INFO" (
ID NUMBER,
USERNAME VARCHAR2(255),
PASSWORD VARCHAR2(255),
PRIMARY KEY (ID));

ALTER TABLE USER_INFO ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;

  1. Mysql 建表

CREATE TABLE user_new (
id int(11) NOT NULL,
username varchar(255) DEFAULT NULL,
password varchar(255) DEFAULT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

4.Maven依赖

 <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.15.2</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
            <!--            此标签会移除jar包,当需要打包到集群运行时加上此标签-->
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.29</version>
            <!--            <scope>provided</scope>-->
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>2.3.0</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>1.15.2</version>
            <!--            <scope>provided</scope>-->
            <!--            此标签会移除jar包,当需要打包到集群运行时加上此标签-->
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-oracle-cdc -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-oracle-cdc</artifactId>
            <version>2.3.0</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.12.1</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.15</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.17.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-api -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>2.17.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/log4j/log4j -->
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.9</version>
        </dependency>
    </dependencies>

5.demo如下:


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class OracleCdcToMysql {

    public static void main(String[] args) {

        //1.获取stream的执行环境
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        senv.setParallelism(1);
        //2.创建表执行环境
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);

        String sourceTable = "CREATE TABLE oracle_cdc_source " +
                "( ID INT, " +
                "USERNAME STRING, " +
                "PASSWORD STRING, " +
                "PRIMARY KEY(ID) NOT ENFORCED) WITH (\n" +
                "'connector' = 'oracle-cdc',\n" +
                "'hostname' = '1.1.1.1',\n" +
                "'port' = '1521',\n" +
                "'username' = 'flinkcdcuser',\n" +
                "'password' = 'flinkpw',\n" +
                "'database-name' = 'LMDB',\n" +//select name from v$database;
                "'schema-name' = 'TEST',\n" +//select SYS_CONTEXT('USERENV','CURRENT_SCHEMA') CURRENT_SCHEMA from dual;
                "'debezium.snapshot.mode' = 'schema_only',\n" +
                //snapshot.mode = initial 快照包括捕获表的结构和数据。指定此值将用捕获表中数据的完整表示填充主题。
                //snapshot.mode = schema_only 快照只包含捕获表的结构。如果希望连接器仅捕获快照之后发生的更改的数据,请指定此值。
                "'scan.incremental.snapshot.enabled' = 'true',\n" +
                //scan.incremental.snapshot.enabled 增量快照是一种读取表快照的新机制。增量快照与旧的快照机制相比有很多优点,包括:
                // (1)在快照读取期间源可以并行;(2)在快照读取期间源可以在块粒度上执行检查点;(3)在快照读取之前源不需要获取ROW SHARE MODE锁。
                "'scan.incremental.snapshot.chunk.size' = '8096' ,\n" +
                //表快照的块大小(行数),当读取表快照时,捕获的表被分割成多个块。
                "'scan.snapshot.fetch.size' = '1024',\n" +
                //读取表快照时每个轮询的最大读取大小。
                "'connect.max-retries' = '3',\n" +
                //连接器应该重试构建Oracle数据库服务器连接的最大重试次数。
                "'connection.pool.size'= '20',\n" +
                //连接池大小
                "'debezium.log.mining.strategy' = 'online_catalog',\n" +
                //online_catalog -使用数据库的当前数据字典来解析对象id,并且不向在线重做日志中写入任何额外的信息。
                // 这使得LogMiner的挖掘速度大大提高,但代价是无法跟踪DDL的变化。如果捕获的表模式很少或从不更改,那么这是理想的选择。
                "'debezium.log.mining.archive.destination.name' = 'log_archive_dest_1',\n" +
                "'debezium.log.mining.continuous.mine'='true'," +
                "  'table-name' = 'USER_INFO'\n" +
                ")";
        tEnv.executeSql(sourceTable);
//        tEnv.executeSql("select * from oracle_cdc_source").print(); //加上打印后,虽然可以实时看到增删改查记录,但是这些后续操作并不会插入到目标表。如果不加这句打印,则程序无问题
        String sinkTable = "CREATE TABLE mysql_cdc_sink (" +
                "  ID INT,\n" +
                "  USERNAME STRING,\n" +
                "  PASSWORD STRING,\n" +
                "PRIMARY KEY(ID) NOT ENFORCED\n" +
                ") WITH (\n" +
                "'connector' = 'jdbc',\n" +
                "'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                "'url' = 'jdbc:mysql://localhost:3306/test?rewriteBatchedStatements=true',\n" +
                "'username' = 'flink_cdc_user',\n" +
                "'password' = 'flink@cdc',\n"+
                "  'table-name' = 'user_new',\n" +
                "  'connection.max-retry-timeout' = '60s'\n" +
                ")";
        tEnv.executeSql(sinkTable);
        tEnv.executeSql("insert into mysql_cdc_sink select ID,USERNAME,PASSWORD from oracle_cdc_source");
    }
}

本地运行控制台是不会输出什么提示的,不像mysql cdc 还可以看到一些查看binlog日志信息。你可以知道程序运行成功与否,Oracle的什么都不会输出。

下图是有打印的,但是只能打印,后续插表动作就失效了。如果不打印,那就是什么都没有。

下图是mysqlCDC的,可以看到有连接,有读取binlog日志,并且还可以打印,后续插表也正常。

具体对应数据类型,还需查看官网,最下面有列出所有对应的数据类型。

具体可用参数,可查官网,也可查阿里介绍,毕竟这是阿里大大的。感觉阿里大大的参数类型更全,更多。具体如何使用,还需研究。MySQL_实时计算 Flink版-阿里云帮助中心

6.打包到集群运行--后续再补一篇吧,前面几篇都需要。单独补一篇。

标签: flink oracle mysql

本文转载自: https://blog.csdn.net/qq_41875667/article/details/131390725
版权归原作者 彩虹豆 所有, 如有侵权,请联系我们删除。

“Flink CDC 基于Oracle log archiving 实时同步Oracle表到Mysql”的评论:

还没有评论