Preface
Flink CDC is a stream-based data integration tool designed to offer users a more comprehensive set of programming interfaces (APIs). It lets users describe database synchronization jobs as YAML configuration files, and it also provides the Flink CDC Source Connector API. Flink CDC optimizes the job submission process and adds advanced features such as automatic schema change synchronization (schema evolution), data transformation, full-database synchronization, and exactly-once semantics.
This article uses flink-connector-oracle-cdc to synchronize data from an Oracle database.
1. Enable Archive Logging
1) On the database server, connect to the database as SYSDBA:
sqlplus / as sysdba
or
sqlplus /nolog
CONNECT sys/password AS SYSDBA;
2) Check whether archive logging is enabled:
archive log list;
("Database log mode: No Archive Mode" means archive logging is disabled.)
("Database log mode: Archive Mode" means archive logging is enabled.)
3) Enable archive logging:
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
Note:
Enabling archive logging requires a database restart.
Archive logs consume a large amount of disk space, so expired log files should be purged regularly.
4) After the database is back up, run archive log list; again to confirm that archiving is enabled.
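If archiving is on, the output typically looks like the following (the sequence numbers are illustrative and will differ on your system):

Database log mode              Archive Mode
Automatic archival             Enabled
Archive destination            USE_DB_RECOVERY_FILE_DEST
Oldest online log sequence     1
Next log sequence to archive   3
Current log sequence           3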
2. Create a Dedicated Flink CDC User
2.1 For a non-CDB Oracle database, execute the following SQL.
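The CREATE USER statement below assumes a tablespace named LOGMINER_TBS already exists. If it does not, a minimal sketch to create it first (the datafile path is a hypothetical example; adjust it to your environment):

-- assumed setup step: create the tablespace used by the grant script below
CREATE TABLESPACE LOGMINER_TBS DATAFILE '/opt/oracle/oradata/ORCL/logminer_tbs.dbf'
    SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;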
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;

GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE TO flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
GRANT ANALYZE ANY TO flinkuser;
GRANT CREATE TABLE TO flinkuser;
-- need not be executed if scan.incremental.snapshot.enabled=true (the default)
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
2.2 For an Oracle CDB database, execute the following SQL. Note that common users in a CDB normally require the C## name prefix (the job in section 4 connects as c##flink_user), so adjust the user name to match your environment.
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;

GRANT CREATE SESSION TO flinkuser CONTAINER=ALL;
GRANT SET CONTAINER TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE TO flinkuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO flinkuser CONTAINER=ALL;
GRANT SELECT ANY TABLE TO flinkuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
GRANT SELECT ANY TRANSACTION TO flinkuser CONTAINER=ALL;
GRANT LOGMINING TO flinkuser CONTAINER=ALL;
GRANT CREATE TABLE TO flinkuser CONTAINER=ALL;
-- need not be executed if scan.incremental.snapshot.enabled=true (the default)
GRANT LOCK ANY TABLE TO flinkuser CONTAINER=ALL;
GRANT CREATE SEQUENCE TO flinkuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGFILE TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser CONTAINER=ALL;
3. Enable Supplemental Logging at the Table or Database Level
-- Enable supplemental logging for a specific table:
ALTER TABLE databasename.tablename ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Enable it for all tables in the database:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Enable minimal supplemental logging at the database level:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
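To verify the result, the supplemental-logging flags can be read from the standard v$database view:

-- SUPPLEMENTAL_LOG_DATA_MIN / _ALL should report YES for the enabled levels
SELECT supplemental_log_data_min, supplemental_log_data_all FROM v$database;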
4. Implementing Database Synchronization with flink-connector-oracle-cdc
4.1 Add the Maven dependency
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-oracle-cdc</artifactId>
    <version>2.4.0</version>
</dependency>
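The connector does not bundle the Flink runtime itself. The example below additionally assumes the standard Flink streaming dependency is on the classpath (the version property is a placeholder; use a Flink release compatible with your CDC connector version):

<!-- assumed companion dependency for the DataStream example -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
</dependency>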
4.2 Main Java code
package test.datastream.cdc.oracle;

import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.types.Row;
import test.datastream.cdc.oracle.function.CacheDataAllWindowFunction;
import test.datastream.cdc.oracle.function.CdcString2RowMap;
import test.datastream.cdc.oracle.function.DbCdcSinkFunction;

import java.util.Properties;

public class OracleCdcExample {

    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        // emit numeric columns as strings to avoid precision loss
        properties.setProperty("decimal.handling.mode", "string");

        SourceFunction<String> sourceFunction = OracleSource.<String>builder()
                // .startupOptions(StartupOptions.latest()) // start from the latest offset
                .url("jdbc:oracle:thin:@localhost:1521:orcl")
                .port(1521)
                .database("ORCL")                  // monitor the ORCL database
                .schemaList("c##flink_user")       // monitor the c##flink_user schema
                .tableList("c##flink_user.TEST2")  // monitor the TEST2 table
                .username("c##flink_user")
                .password("flinkpw")
                .debeziumProperties(properties)
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // use parallelism 1 to keep message ordering
        DataStreamSource<String> source = env.addSource(sourceFunction).setParallelism(1);

        // parse each JSON change event into a Flink Row
        SingleOutputStreamOperator<Row> mapStream = source.flatMap(new CdcString2RowMap());

        // buffer rows in 5-second tumbling windows so the sink can write in batches
        SingleOutputStreamOperator<Row[]> winStream = mapStream
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new CacheDataAllWindowFunction());

        winStream.addSink(new DbCdcSinkFunction(null));

        env.execute();
    }
}
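The window and sink helpers (CacheDataAllWindowFunction, DbCdcSinkFunction) are not shown in the original post. A minimal sketch of what the window function might look like, assuming it merely buffers each window's rows into an array for the batch-writing sink:

package test.datastream.cdc.oracle.function;

import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

// Hypothetical reconstruction: collects all rows of one window into a single
// array so the downstream sink can write them in one batch.
public class CacheDataAllWindowFunction extends ProcessAllWindowFunction<Row, Row[], TimeWindow> {

    @Override
    public void process(Context context, Iterable<Row> elements, Collector<Row[]> out) {
        List<Row> buffer = new ArrayList<>();
        for (Row row : elements) {
            buffer.add(row);
        }
        if (!buffer.isEmpty()) {
            out.collect(buffer.toArray(new Row[0]));
        }
    }
}

Batching a whole window into one array keeps the sink's write round trips down, at the cost of up to five seconds of extra latency per record.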
4.3 Converting the JSON change event to a Row
package test.datastream.cdc.oracle.function;

import cn.com.victorysoft.common.configuration.VsConfiguration;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import test.datastream.cdc.CdcConstants;

import java.sql.Timestamp;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * @desc Parses the CDC JSON string and converts it to a Row.
 */
public class CdcString2RowMap extends RichFlatMapFunction<String, Row> {

    private Map<String, Integer> columnMap = new HashMap<>();

    @Override
    public void open(Configuration parameters) throws Exception {
        columnMap.put("ID", 0);
        columnMap.put("NAME", 1);
        columnMap.put("DESCRIPTION", 2);
        columnMap.put("AGE", 3);
        columnMap.put("CREATE_TIME", 4);
        columnMap.put("SCORE", 5);
        columnMap.put("C_1", 6);
        columnMap.put("B_1", 7);
    }

    @Override
    public void flatMap(String s, Collector<Row> collector) throws Exception {
        System.out.println("receive: " + s);
        VsConfiguration conf = VsConfiguration.from(s);
        String op = conf.getString(CdcConstants.K_OP);
        VsConfiguration before = conf.getConfiguration(CdcConstants.K_BEFORE);
        VsConfiguration after = conf.getConfiguration(CdcConstants.K_AFTER);
        Row row = null;
        if (CdcConstants.OP_C.equals(op)) {
            // insert: use the "after" image
            row = convertToRow(after);
            row.setKind(RowKind.INSERT);
        } else if (CdcConstants.OP_U.equals(op)) {
            // update: use the "after" image
            row = convertToRow(after);
            row.setKind(RowKind.UPDATE_AFTER);
        } else if (CdcConstants.OP_D.equals(op)) {
            // delete: use the "before" image
            row = convertToRow(before);
            row.setKind(RowKind.DELETE);
        } else {
            // "r" (snapshot read): use the "after" image
            row = convertToRow(after);
            row.setKind(RowKind.INSERT);
        }
        collector.collect(row);
    }

    private Row convertToRow(VsConfiguration data) {
        Set<String> keys = data.getKeys();
        Row row = new Row(8);
        for (String key : keys) {
            Integer index = this.columnMap.get(key);
            Object value = data.get(key);
            if (key.equals("CREATE_TIME")) {
                // the long epoch value is converted to java.sql.Timestamp
                value = long2Timestamp((Long) value);
            }
            row.setField(index, value);
        }
        return row;
    }

    private static Timestamp long2Timestamp(Long time) {
        // the incoming value is treated as epoch microseconds; convert to milliseconds
        return new Timestamp(time / 1000);
    }
}
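The CdcConstants class referenced above is not included in the original post either. Its keys mirror the field names and op codes of Debezium's standard JSON change-event envelope ({"before": ..., "after": ..., "op": "c|u|d|r", ...}); a minimal hypothetical sketch:

package test.datastream.cdc;

// Hypothetical reconstruction of the constants used by CdcString2RowMap,
// matching Debezium's JSON change-event field names and operation codes.
public final class CdcConstants {
    public static final String K_OP = "op";         // operation type field
    public static final String K_BEFORE = "before"; // row image before the change
    public static final String K_AFTER = "after";   // row image after the change

    public static final String OP_C = "c"; // create (insert)
    public static final String OP_U = "u"; // update
    public static final String OP_D = "d"; // delete ("r" marks snapshot reads)

    private CdcConstants() {
    }
}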