环境说明:
flink 1.15.2
Oracle 版本:Oracle Database 11g Enterprise Edition Release 11.2.0.1.0 - 64bit Production
mysql 版本:5.7
windows11 IDEA 本地运行
先上官网使用说明和案例:Oracle CDC Connector — Flink CDC documentation
- Oracle 开启 log archiving
(1).启用 log archiving
a:以DBA用户连接数据库
sqlplus / as sysdba
b:启用 log archiving (会重启数据库)
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
c:检查 log archiving 是否开启 -- Should now "Database log mode: Archive Mode"
archive log list;(2).注:必须为捕获的表或数据库启用补充日志记录,以便数据更改能够捕获已更改数据库行的之前状态。下面演示了如何在表/数据库级别上配置它。 为一个特定的表启用补充日志记录:修改表目录。客户添加补充日志数据(所有)列; ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; —为数据库启用补充日志修改数据库添加补充日志数据; ALTER DATABASE ADD SUPPLEMENTAL LOG DATA; (3).创建具有权限的Oracle用户 a:创建表空间 sqlplus / as sysdba CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED; exit; b:创建用户并赋权 flinkuser flinkpw sqlplus / as sysdba CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS; GRANT CREATE SESSION TO flinkuser; GRANT SET CONTAINER TO flinkuser; GRANT SELECT ON V_$DATABASE to flinkuser; GRANT FLASHBACK ANY TABLE TO flinkuser; GRANT SELECT ANY TABLE TO flinkuser; GRANT SELECT_CATALOG_ROLE TO flinkuser; GRANT EXECUTE_CATALOG_ROLE TO flinkuser; GRANT SELECT ANY TRANSACTION TO flinkuser; GRANT LOGMINING TO flinkuser; GRANT CREATE TABLE TO flinkuser; GRANT LOCK ANY TABLE TO flinkuser; GRANT ALTER ANY TABLE TO flinkuser; GRANT CREATE SEQUENCE TO flinkuser; GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser; GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser; GRANT SELECT ON V_$LOG TO flinkuser; GRANT SELECT ON V_$LOG_HISTORY TO flinkuser; GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser; GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser; GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser; GRANT SELECT ON V_$LOGFILE TO flinkuser; GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser; GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser; exit;
- Oracle 建表,并配置补充日志
CREATE TABLE "USER_INFO" (
ID NUMBER,
USERNAME VARCHAR2(255),
PASSWORD VARCHAR2(255),
PRIMARY KEY (ID));ALTER TABLE USER_INFO ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
- Mysql 建表
CREATE TABLE user_new (
id int(11) NOT NULL,
username varchar(255) DEFAULT NULL,
password varchar(255) DEFAULT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
4.Maven依赖
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.15.2</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
<!-- 此标签会移除jar包,当需要打包到集群运行时加上此标签-->
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.29</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.22</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mysql-cdc</artifactId>
<version>2.3.0</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>1.15.2</version>
<!-- <scope>provided</scope>-->
<!-- 此标签会移除jar包,当需要打包到集群运行时加上此标签-->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<!-- <scope>provided</scope>-->
</dependency>
<!-- https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-oracle-cdc -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-oracle-cdc</artifactId>
<version>2.3.0</version>
<!-- <scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.15</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.17.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-api -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.17.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.9</version>
</dependency>
</dependencies>
5.demo如下:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class OracleCdcToMysql {
public static void main(String[] args) {
//1.获取stream的执行环境
StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
senv.setParallelism(1);
//2.创建表执行环境
StreamTableEnvironment tEnv = StreamTableEnvironment.create(senv);
String sourceTable = "CREATE TABLE oracle_cdc_source " +
"( ID INT, " +
"USERNAME STRING, " +
"PASSWORD STRING, " +
"PRIMARY KEY(ID) NOT ENFORCED) WITH (\n" +
"'connector' = 'oracle-cdc',\n" +
"'hostname' = '1.1.1.1',\n" +
"'port' = '1521',\n" +
"'username' = 'flinkcdcuser',\n" +
"'password' = 'flinkpw',\n" +
"'database-name' = 'LMDB',\n" +//select name from v$database;
"'schema-name' = 'TEST',\n" +//select SYS_CONTEXT('USERENV','CURRENT_SCHEMA') CURRENT_SCHEMA from dual;
"'debezium.snapshot.mode' = 'schema_only',\n" +
//snapshot.mode = initial 快照包括捕获表的结构和数据。指定此值将用捕获表中数据的完整表示填充主题。
//snapshot.mode = schema_only 快照只包含捕获表的结构。如果希望连接器仅捕获快照之后发生的更改的数据,请指定此值。
"'scan.incremental.snapshot.enabled' = 'true',\n" +
//scan.incremental.snapshot.enabled 增量快照是一种读取表快照的新机制。增量快照与旧的快照机制相比有很多优点,包括:
// (1)在快照读取期间源可以并行;(2)在快照读取期间源可以在块粒度上执行检查点;(3)在快照读取之前源不需要获取ROW SHARE MODE锁。
"'scan.incremental.snapshot.chunk.size' = '8096' ,\n" +
//表快照的块大小(行数),当读取表快照时,捕获的表被分割成多个块。
"'scan.snapshot.fetch.size' = '1024',\n" +
//读取表快照时每个轮询的最大读取大小。
"'connect.max-retries' = '3',\n" +
//连接器应该重试构建Oracle数据库服务器连接的最大重试次数。
"'connection.pool.size'= '20',\n" +
//连接池大小
"'debezium.log.mining.strategy' = 'online_catalog',\n" +
//online_catalog -使用数据库的当前数据字典来解析对象id,并且不向在线重做日志中写入任何额外的信息。
// 这使得LogMiner的挖掘速度大大提高,但代价是无法跟踪DDL的变化。如果捕获的表模式很少或从不更改,那么这是理想的选择。
"'debezium.log.mining.archive.destination.name' = 'log_archive_dest_1',\n" +
"'debezium.log.mining.continuous.mine'='true'," +
" 'table-name' = 'USER_INFO'\n" +
")";
tEnv.executeSql(sourceTable);
// tEnv.executeSql("select * from oracle_cdc_source").print(); //加上打印后,虽然可以实时看到增删改查记录,但是这些后续操作并不会插入到目标表。如果不加这句打印,则程序无问题
String sinkTable = "CREATE TABLE mysql_cdc_sink (" +
" ID INT,\n" +
" USERNAME STRING,\n" +
" PASSWORD STRING,\n" +
"PRIMARY KEY(ID) NOT ENFORCED\n" +
") WITH (\n" +
"'connector' = 'jdbc',\n" +
"'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
"'url' = 'jdbc:mysql://localhost:3306/test?rewriteBatchedStatements=true',\n" +
"'username' = 'flink_cdc_user',\n" +
"'password' = 'flink@cdc',\n"+
" 'table-name' = 'user_new',\n" +
" 'connection.max-retry-timeout' = '60s'\n" +
")";
tEnv.executeSql(sinkTable);
tEnv.executeSql("insert into mysql_cdc_sink select ID,USERNAME,PASSWORD from oracle_cdc_source");
}
}
本地运行控制台是不会输出什么提示的,不像mysql cdc 还可以看到一些查看binlog日志信息。你可以知道程序运行成功与否,Oracle的什么都不会输出。
下图是有打印的,但是只能打印,后续插表动作就失效了。如果不打印,那就是什么都没有。
下图是mysqlCDC的,可以看到有连接,有读取binlog日志,并且还可以打印,后续插表也正常。
具体对应数据类型,还需查看官网,最下面有列出所有对应的数据类型。
具体可用参数,可查官网,也可查阿里介绍,毕竟这是阿里大大的。感觉阿里大大的参数类型更全,更多。具体如何使用,还需研究。MySQL_实时计算 Flink版-阿里云帮助中心
6.打包到集群运行--后续再补一篇吧,前面几篇都需要。单独补一篇。
版权归原作者 彩虹豆 所有, 如有侵权,请联系我们删除。