

Flink Oracle CDC Connector Source Code Walkthrough

Introduction to Flink Oracle CDC

Flink CDC captures Oracle data in real time on top of Flink. Under the hood it is implemented with the Debezium framework, and Debezium in turn relies on Oracle's built-in LogMiner. LogMiner capture requires supplemental logging to be added to both the database and the captured tables; because Oracle 18c does not support adding supplemental logging for the data, only Oracle 11, 12, and 19 are currently supported.

Using Flink Oracle CDC

Flink Oracle CDC can be used either through SQL or through the API. Oracle must have archive logging and supplemental logging enabled for capture to work, and an account with sufficient privileges is needed to connect to the Oracle database for real-time capture.

Enabling archive logging

# Connect to Oracle
ORACLE_SID=SID
export ORACLE_SID
sqlplus /nolog
  CONNECT sys/password AS SYSDBA

# Enable archive logging
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;

Enabling supplemental logging on the database and tables

-- Enable supplemental logging for all columns of a given table (ALL and PRIMARY KEY modes are supported)
ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Enable supplemental logging at the database level
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

Creating a user with the required privileges

sqlplus sys/password@host:port/SID AS SYSDBA;
  CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
  CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
  GRANT CREATE SESSION TO flinkuser;
  GRANT SET CONTAINER TO flinkuser;
  GRANT SELECT ON V_$DATABASE to flinkuser;
  GRANT FLASHBACK ANY TABLE TO flinkuser;
  GRANT SELECT ANY TABLE TO flinkuser;
  GRANT SELECT_CATALOG_ROLE TO flinkuser;
  GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
  GRANT SELECT ANY TRANSACTION TO flinkuser;
  GRANT LOGMINING TO flinkuser;
  GRANT CREATE TABLE TO flinkuser;
  -- need not to execute if set scan.incremental.snapshot.enabled=true(default)
  GRANT LOCK ANY TABLE TO flinkuser;
  GRANT ALTER ANY TABLE TO flinkuser;
  GRANT CREATE SEQUENCE TO flinkuser;
  GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
  GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
  GRANT SELECT ON V_$LOG TO flinkuser;
  GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
  GRANT SELECT ON V_$LOGFILE TO flinkuser;
  GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
  GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
  exit;
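
Before starting a job it is worth verifying the two prerequisites above from the capture account. The following small JDBC check is my own helper (class name, URL and credentials are placeholders, not part of the connector); it reads V$DATABASE, where LOG_MODE should be ARCHIVELOG and SUPPLEMENTAL_LOG_DATA_MIN should be YES:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

// Sanity check: verifies archive logging and database-level supplemental logging before starting a CDC job.
public class OraclePrerequisiteCheck {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:oracle:thin:@localhost:1521:SID"; // adjust to your instance
        try (Connection conn = DriverManager.getConnection(url, "flinkuser", "flinkpw");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "SELECT LOG_MODE, SUPPLEMENTAL_LOG_DATA_MIN, SUPPLEMENTAL_LOG_DATA_ALL FROM V$DATABASE")) {
            if (rs.next()) {
                System.out.println("LOG_MODE                  = " + rs.getString(1)); // expect ARCHIVELOG
                System.out.println("SUPPLEMENTAL_LOG_DATA_MIN = " + rs.getString(2)); // expect YES
                System.out.println("SUPPLEMENTAL_LOG_DATA_ALL = " + rs.getString(3)); // YES only with (ALL) COLUMNS at DB level
            }
        }
    }
}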

For detailed Flink usage, please refer to the official documentation.

Source Code Walkthrough

Since this walkthrough is based on the API usage, first prepare the source environment: add the jar via Maven and download the corresponding sources, and then you can comfortably read and debug the source code in IDEA.

Adding the Maven dependency

The latest version documented on the official site is 2.4, the latest released version is 2.3.0, and my debugging environment uses 2.2.0. All of the code below is based on 2.2.0.

<dependency>
  <groupId>com.ververica</groupId>
  <artifactId>flink-connector-oracle-cdc</artifactId>
  <!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. -->
  <version>2.2.0</version>
</dependency>

Using Oracle CDC via the API

Properties properties = new Properties();
properties.put("decimal.handling.mode", "double");
properties.put("database.url", "jdbc:oracle:thin:@127.0.0.1:1521:orcl");

SourceFunction<String> sourceFunction = OracleSource.<String>builder()
        .hostname("localhost")
        .port(1521)
        .database("orcl")             // monitor XE database
        .schemaList("flinkuser")      // monitor inventory schema
        .tableList("flinkuser.test")  // monitor products table
        .username("flinkuser")
        .password("flinkpw")
        .startupOptions(StartupOptions.latest())
        .debeziumProperties(properties)
        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
        .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.addSource(sourceFunction).print();
env.execute();
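
The example above uses the built-in JsonDebeziumDeserializationSchema. If a different output format is needed, a custom DebeziumDeserializationSchema can be plugged into .deserializer(...). Below is a minimal sketch under that assumption (the class name and output format are mine, not from the connector); it only reads the op field of the Debezium envelope and emits a one-line summary:

import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

// Minimal custom deserializer: emits "<topic> -> op=<c|u|d|r>" for every change record.
public class OpSummaryDeserializationSchema implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord record, Collector<String> out) {
        Struct value = (Struct) record.value();
        if (value == null) {
            return; // tombstone records carry no payload
        }
        // "op" is the operation flag of the Debezium envelope: c=create, u=update, d=delete, r=read (snapshot)
        out.collect(record.topic() + " -> op=" + value.getString("op"));
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}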

OracleSource

OracleSource is a builder utility: calling build() returns a DebeziumSourceFunction object, and before that object is returned, the parameters configured before build() are applied to it.

public DebeziumSourceFunction<T> build() {
    Properties props = new Properties();
    props.setProperty("connector.class", OracleConnector.class.getCanonicalName());
    // Logical name that identifies and provides a namespace for the particular Oracle database server being
    // monitored. The logical name should be unique across all other connectors, since it is used as a prefix
    // for all Kafka topic names emanating from this connector. Only alphanumeric characters and
    // underscores should be used.
    props.setProperty("database.server.name", DATABASE_SERVER_NAME);
    props.setProperty("database.hostname", checkNotNull(hostname));
    props.setProperty("database.user", checkNotNull(username));
    props.setProperty("database.password", checkNotNull(password));
    props.setProperty("database.port", String.valueOf(port));
    props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true));
    props.setProperty("database.dbname", checkNotNull(database));
    if (schemaList != null) {
        props.setProperty("schema.whitelist", String.join(",", schemaList));
    }
    if (tableList != null) {
        props.setProperty("table.include.list", String.join(",", tableList));
    }

    DebeziumOffset specificOffset = null;
    switch (startupOptions.startupMode) {
        case INITIAL:
            props.setProperty("snapshot.mode", "initial");
            break;
        case LATEST_OFFSET:
            props.setProperty("snapshot.mode", "schema_only");
            break;
        default:
            throw new UnsupportedOperationException();
    }

    if (dbzProperties != null) {
        props.putAll(dbzProperties);
    }

    return new DebeziumSourceFunction<>(
            deserializer, props, specificOffset, new OracleValidator(props));
}
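
One detail worth noting in build(): props.putAll(dbzProperties) runs after all of the builder's own settings, so anything passed through debeziumProperties() overrides the defaults above. A small sketch (host, credentials and the log.mining.strategy value are example assumptions, double-check the property names against the Debezium version bundled with 2.2.0):

Properties dbz = new Properties();
// Overrides the URL the connector would otherwise assemble from hostname/port/database
dbz.put("database.url", "jdbc:oracle:thin:@127.0.0.1:1521:orcl");
// Debezium LogMiner option; "online_catalog" avoids writing the dictionary to the redo logs
dbz.put("log.mining.strategy", "online_catalog");

SourceFunction<String> source = OracleSource.<String>builder()
        .hostname("localhost")
        .port(1521)
        .database("orcl")
        .schemaList("flinkuser")
        .tableList("flinkuser.test")
        .username("flinkuser")
        .password("flinkpw")
        .debeziumProperties(dbz)   // applied last, so these entries win over the builder defaults
        .deserializer(new JsonDebeziumDeserializationSchema())
        .build();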

Stepping into the DebeziumSourceFunction source code, its run() method submits the request that starts parsing Oracle's real-time logs:

// create the engine with this configuration ...
this.engine =
        DebeziumEngine.create(Connect.class)
                .using(properties)
                .notifying(changeConsumer)
                .using(OffsetCommitPolicy.always())
                .using(
                        (success, message, error) -> {
                            if (success) {
                                // Close the handover and prepare to exit.
                                handover.close();
                            } else {
                                handover.reportError(error);
                            }
                        })
                .build();

// run the engine asynchronously
executor.execute(engine);
debeziumStarted = true;

The implementation behind DebeziumEngine.build() is io.debezium.embedded.EmbeddedEngine.BuilderImpl#build, which returns an EmbeddedEngine object, a Runnable that is executed on its own thread. The whole data-capture chain is carried out inside its run() method (the original article includes a screenshot of the method call stack at this point):

@Override
public void run() {
    if (runningThread.compareAndSet(null, Thread.currentThread())) {
        // ...
        // Instantiate the connector ...
        SourceConnector connector = null;
        try {
            @SuppressWarnings("unchecked")
            Class<? extends SourceConnector> connectorClass =
                    (Class<SourceConnector>) classLoader.loadClass(connectorClassName);
            connector = connectorClass.getDeclaredConstructor().newInstance();
        }
        // Instantiate the offset store ...
        final String offsetStoreClassName = config.getString(OFFSET_STORAGE);
        OffsetBackingStore offsetStore = null;
        try {
            @SuppressWarnings("unchecked")
            Class<? extends OffsetBackingStore> offsetStoreClass =
                    (Class<OffsetBackingStore>) classLoader.loadClass(offsetStoreClassName);
            offsetStore = offsetStoreClass.getDeclaredConstructor().newInstance();
        }
        // ...
        // Initialize the offset store ...
        try {
            offsetStore.configure(workerConfig);
            offsetStore.start();
        }
        // ...
        // Set up the offset commit policy ...
        if (offsetCommitPolicy == null) {
            offsetCommitPolicy = Instantiator.getInstanceWithProperties(
                    config.getString(EmbeddedEngine.OFFSET_COMMIT_POLICY),
                    () -> getClass().getClassLoader(), config.asProperties());
        }

        // Initialize the connector using a context that does NOT respond to requests to reconfigure tasks ...
        ConnectorContext context = new ConnectorContext();
        // ...
        connector.initialize(context);

        OffsetStorageWriter offsetWriter =
                new OffsetStorageWriter(offsetStore, engineName, keyConverter, valueConverter);
        OffsetStorageReader offsetReader =
                new OffsetStorageReaderImpl(offsetStore, engineName, keyConverter, valueConverter);
        Duration commitTimeout = Duration.ofMillis(config.getLong(OFFSET_COMMIT_TIMEOUT_MS));

        try {
            // Start the connector with the given properties and get the task configurations ...
            connector.start(config.asMap());
            connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::connectorStarted);
            List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
            Class<? extends Task> taskClass = connector.taskClass();
            if (taskConfigs.isEmpty()) {
                String msg = "Unable to start connector's task class '"
                        + taskClass.getName() + "' with no task configuration";
                fail(msg);
                return;
            }
            task = null;
            try {
                task = (SourceTask) taskClass.getDeclaredConstructor().newInstance();
            }
            catch (IllegalAccessException | InstantiationException t) {
                fail("Unable to instantiate connector's task class '" + taskClass.getName() + "'", t);
                return;
            }
            try {
                SourceTaskContext taskContext = new SourceTaskContext();
                // ...
                task.initialize(taskContext);
                task.start(taskConfigs.get(0));
                connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::taskStarted);
            }
            // ...

            recordsSinceLastCommit = 0;
            Throwable handlerError = null;
            try {
                timeOfLastCommitMillis = clock.currentTimeInMillis();
                RecordCommitter committer = buildRecordCommitter(offsetWriter, task, commitTimeout);
                while (runningThread.get() != null) {
                    List<SourceRecord> changeRecords = null;
                    try {
                        LOGGER.debug("Embedded engine is polling task for records on thread {}", runningThread.get());
                        changeRecords = task.poll(); // blocks until there are values ...
                        LOGGER.debug("Embedded engine returned from polling task for records");
                    }
                    catch (InterruptedException e) {
                        // Interrupted while polling ...
                        LOGGER.debug("Embedded engine interrupted on thread {} while polling the task for records", runningThread.get());
                        if (this.runningThread.get() == Thread.currentThread()) {
                            // this thread is still set as the running thread -> we were not interrupted
                            // due the stop() call -> probably someone else called the interrupt on us ->
                            // -> we should raise the interrupt flag
                            Thread.currentThread().interrupt();
                        }
                        break;
                    }
                    catch (RetriableException e) {
                        LOGGER.info("Retrieable exception thrown, connector will be restarted", e);
                        // Retriable exception should be ignored by the engine
                        // and no change records delivered.
                        // The retry is handled in io.debezium.connector.common.BaseSourceTask.poll()
                    }
                    try {
                        if (changeRecords != null && !changeRecords.isEmpty()) {
                            LOGGER.debug("Received {} records from the task", changeRecords.size());
                            changeRecords = changeRecords.stream()
                                    .map(transformations::transform)
                                    .filter(x -> x != null)
                                    .collect(Collectors.toList());
                        }
                        if (changeRecords != null && !changeRecords.isEmpty()) {
                            LOGGER.debug("Received {} transformed records from the task", changeRecords.size());
                            try {
                                handler.handleBatch(changeRecords, committer);
                            }
                            catch (StopConnectorException e) {
                                break;
                            }
                        }
                        else {
                            LOGGER.debug("Received no records from the task");
                        }
                    }
                    catch (Throwable t) {
                        // There was some sort of unexpected exception, so we should stop work
                        handlerError = t;
                        break;
                    }
                }
            }
            // ...
        }
    }
}
  • The connector is instantiated via reflection, yielding the OracleConnector object
  • The offset store object is initialized
  • The offset commit policy is set
  • connector.start(config.asMap()) hands the configuration properties to the connector object
  • The SourceTask is obtained from the connector via reflection; here it is an OracleConnectorTask
  • task.start(taskConfigs.get(0)) starts the task that captures Oracle's change data; the concrete method is io.debezium.connector.oracle.OracleConnectorTask#start, implemented as follows:
@Override
public ChangeEventSourceCoordinator start(Configuration config) {
    OracleConnectorConfig connectorConfig = new OracleConnectorConfig(config);
    TopicSelector<TableId> topicSelector = OracleTopicSelector.defaultSelector(connectorConfig);
    SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();

    Configuration jdbcConfig = connectorConfig.jdbcConfig();
    jdbcConnection = new OracleConnection(jdbcConfig, () -> getClass().getClassLoader());
    this.schema = new OracleDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector, jdbcConnection);
    this.schema.initializeStorage();

    String adapterString = config.getString(OracleConnectorConfig.CONNECTOR_ADAPTER);
    OracleConnectorConfig.ConnectorAdapter adapter = OracleConnectorConfig.ConnectorAdapter.parse(adapterString);
    OffsetContext previousOffset = getPreviousOffset(new OracleOffsetContext.Loader(connectorConfig, adapter));
    if (previousOffset != null) {
        schema.recover(previousOffset);
    }

    taskContext = new OracleTaskContext(connectorConfig, schema);

    Clock clock = Clock.system();

    // Set up the task record queue ...
    this.queue = new ChangeEventQueue.Builder<DataChangeEvent>()
            .pollInterval(connectorConfig.getPollInterval())
            .maxBatchSize(connectorConfig.getMaxBatchSize())
            .maxQueueSize(connectorConfig.getMaxQueueSize())
            .loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME))
            .build();

    errorHandler = new OracleErrorHandler(connectorConfig.getLogicalName(), queue);

    final OracleEventMetadataProvider metadataProvider = new OracleEventMetadataProvider();

    EventDispatcher<TableId> dispatcher = new EventDispatcher<>(
            connectorConfig,
            topicSelector,
            schema,
            queue,
            connectorConfig.getTableFilters().dataCollectionFilter(),
            DataChangeEvent::new,
            metadataProvider,
            schemaNameAdjuster);

    final OracleStreamingChangeEventSourceMetrics streamingMetrics =
            new OracleStreamingChangeEventSourceMetrics(taskContext, queue, metadataProvider, connectorConfig);

    ChangeEventSourceCoordinator coordinator = new ChangeEventSourceCoordinator(
            previousOffset,
            errorHandler,
            OracleConnector.class,
            connectorConfig,
            new OracleChangeEventSourceFactory(connectorConfig, jdbcConnection, errorHandler, dispatcher, clock,
                    schema, jdbcConfig, taskContext, streamingMetrics),
            new OracleChangeEventSourceMetricsFactory(streamingMetrics),
            dispatcher,
            schema);

    coordinator.start(taskContext, this.queue, metadataProvider);

    return coordinator;
}
  • Create a task context object, taskContext, which holds the task's parameters and the schema
  • Set up a message queue (queue) that buffers the parsed messages
  • Create the event dispatcher (dispatcher), which delivers the parsed data into the queue
  • Create an io.debezium.pipeline.ChangeEventSourceCoordinator and call io.debezium.pipeline.ChangeEventSourceCoordinator#start; that method calls streamEvents, which in the end calls io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource#execute, the method that actually parses the Oracle logs:
public void execute(ChangeEventSourceContext context) {
    try (TransactionalBuffer transactionalBuffer =
            new TransactionalBuffer(schema, clock, errorHandler, streamingMetrics)) {
        try {
            startScn = offsetContext.getScn();
            createFlushTable(jdbcConnection);

            if (!isContinuousMining && startScn.compareTo(getFirstOnlineLogScn(jdbcConnection, archiveLogRetention)) < 0) {
                throw new DebeziumException(
                        "Online REDO LOG files or archive log files do not contain the offset scn " + startScn
                                + ".  Please perform a new snapshot.");
            }

            setNlsSessionParameters(jdbcConnection);
            checkSupplementalLogging(jdbcConnection, connectorConfig.getPdbName(), schema);

            initializeRedoLogsForMining(jdbcConnection, false, archiveLogRetention);

            HistoryRecorder historyRecorder = connectorConfig.getLogMiningHistoryRecorder();
            try {
                // todo: why can't OracleConnection be used rather than a Factory+JdbcConfiguration?
                historyRecorder.prepare(streamingMetrics, jdbcConfiguration, connectorConfig.getLogMinerHistoryRetentionHours());

                final LogMinerQueryResultProcessor processor = new LogMinerQueryResultProcessor(context, jdbcConnection,
                        connectorConfig, streamingMetrics, transactionalBuffer, offsetContext, schema, dispatcher,
                        clock, historyRecorder);

                final String query = SqlUtils.logMinerContentsQuery(connectorConfig, jdbcConnection.username());
                try (PreparedStatement miningView = jdbcConnection.connection().prepareStatement(query,
                        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT)) {

                    currentRedoLogSequences = getCurrentRedoLogSequences();
                    Stopwatch stopwatch = Stopwatch.reusable();
                    while (context.isRunning()) {
                        // Calculate time difference before each mining session to detect time zone offset changes (e.g. DST) on database server
                        streamingMetrics.calculateTimeDifference(getSystime(jdbcConnection));

                        Instant start = Instant.now();
                        endScn = getEndScn(jdbcConnection, startScn, streamingMetrics, connectorConfig.getLogMiningBatchSizeDefault());
                        flushLogWriter(jdbcConnection, jdbcConfiguration, isRac, racHosts);

                        if (hasLogSwitchOccurred()) {
                            // This is the way to mitigate PGA leaks.
                            // With one mining session, it grows and maybe there is another way to flush PGA.
                            // At this point we use a new mining session
                            LOGGER.trace("Ending log mining startScn={}, endScn={}, offsetContext.getScn={}, strategy={}, continuous={}",
                                    startScn, endScn, offsetContext.getScn(), strategy, isContinuousMining);
                            endMining(jdbcConnection);

                            initializeRedoLogsForMining(jdbcConnection, true, archiveLogRetention);

                            abandonOldTransactionsIfExist(jdbcConnection, transactionalBuffer);

                            // This needs to be re-calculated because building the data dictionary will force the
                            // current redo log sequence to be advanced due to a complete log switch of all logs.
                            currentRedoLogSequences = getCurrentRedoLogSequences();
                        }

                        startLogMining(jdbcConnection, startScn, endScn, strategy, isContinuousMining, streamingMetrics);

                        stopwatch.start();
                        miningView.setFetchSize(connectorConfig.getMaxQueueSize());
                        miningView.setFetchDirection(ResultSet.FETCH_FORWARD);
                        miningView.setString(1, startScn.toString());
                        miningView.setString(2, endScn.toString());
                        try (ResultSet rs = miningView.executeQuery()) {
                            Duration lastDurationOfBatchCapturing = stopwatch.stop().durations().statistics().getTotal();
                            streamingMetrics.setLastDurationOfBatchCapturing(lastDurationOfBatchCapturing);
                            processor.processResult(rs);

                            startScn = endScn;

                            if (transactionalBuffer.isEmpty()) {
                                LOGGER.debug("Transactional buffer empty, updating offset's SCN {}", startScn);
                                offsetContext.setScn(startScn);
                            }
                        }

                        streamingMetrics.setCurrentBatchProcessingTime(Duration.between(start, Instant.now()));
                        pauseBetweenMiningSessions();
                    }
                }
            }
            finally {
                historyRecorder.close();
            }
        }
        catch (Throwable t) {
            logError(streamingMetrics, "Mining session stopped due to the {}", t);
            errorHandler.setProducerThrowable(t);
        }
        finally {
            LOGGER.info("startScn={}, endScn={}, offsetContext.getScn()={}", startScn, endScn, offsetContext.getScn());
            LOGGER.info("Transactional buffer dump: {}", transactionalBuffer.toString());
            LOGGER.info("Streaming metrics dump: {}", streamingMetrics.toString());
        }
    }
}
  • Create a flush table that stores the SCN of the last mining pass; it marks the position where the next pass starts:
CREATE TABLE LOGMNR_FLUSH_TABLE (LAST_SCN NUMBER(19,0));
  • Check whether supplemental logging is enabled on the database and the captured tables:
checkSupplementalLogging(jdbcConnection, connectorConfig.getPdbName(), schema);
  • Call the stored procedure that builds the data dictionary, and add the database's archived logs and online redo logs to LogMiner:
private void initializeRedoLogsForMining(OracleConnection connection, boolean postEndMiningSession, Duration archiveLogRetention) throws SQLException {
    if (!postEndMiningSession) {
        if (OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO.equals(strategy)) {
            buildDataDictionary(connection);
        }
        if (!isContinuousMining) {
            setRedoLogFilesForMining(connection, startScn, archiveLogRetention);
        }
    }
    else {
        if (!isContinuousMining) {
            if (OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO.equals(strategy)) {
                buildDataDictionary(connection);
            }
            setRedoLogFilesForMining(connection, startScn, archiveLogRetention);
        }
    }
}

The code above causes the following statements to be executed in the database:

BEGIN DBMS_LOGMNR_D.BUILD (options => DBMS_LOGMNR_D.STORE_IN_REDO_LOGS); END;

-- List the online redo logs
SELECT MIN(F.MEMBER) AS FILE_NAME, L.NEXT_CHANGE# AS NEXT_CHANGE, F.GROUP#, L.FIRST_CHANGE# AS FIRST_CHANGE, L.STATUS
  FROM V$LOG L, V$LOGFILE F
 WHERE F.GROUP# = L.GROUP# AND L.NEXT_CHANGE# > 0
 GROUP BY F.GROUP#, L.NEXT_CHANGE#, L.FIRST_CHANGE#, L.STATUS
 ORDER BY 3;

-- List the archived logs
SELECT NAME AS FILE_NAME, NEXT_CHANGE# AS NEXT_CHANGE, FIRST_CHANGE# AS FIRST_CHANGE
  FROM V$ARCHIVED_LOG
 WHERE NAME IS NOT NULL AND ARCHIVED = 'YES' AND STATUS = 'A'
   AND NEXT_CHANGE# > ? -- the SCN of the previous mining pass
   AND DEST_ID IN (SELECT DEST_ID FROM V$ARCHIVE_DEST_STATUS WHERE STATUS = 'VALID' AND TYPE = 'LOCAL' AND ROWNUM = 1)
 ORDER BY 2;

The archived logs are merged with the online logs, and the merged list is registered with LogMiner for mining:

BEGIN sys.dbms_logmnr.add_logfile(LOGFILENAME => '<fileName>', OPTIONS => DBMS_LOGMNR.ADDFILE); END;
  • Call sys.dbms_logmnr.start_logmnr to start mining the logs; the parsed results are written to V$LOGMNR_CONTENTS:
static String startLogMinerStatement(Scn startScn, Scn endScn, OracleConnectorConfig.LogMiningStrategy strategy, boolean isContinuousMining) {
    String miningStrategy;
    if (strategy.equals(OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO)) {
        miningStrategy = "DBMS_LOGMNR.DICT_FROM_REDO_LOGS + DBMS_LOGMNR.DDL_DICT_TRACKING ";
    }
    else {
        miningStrategy = "DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG ";
    }
    if (isContinuousMining) {
        miningStrategy += " + DBMS_LOGMNR.CONTINUOUS_MINE ";
    }
    return "BEGIN sys.dbms_logmnr.start_logmnr("
            + "startScn => '" + startScn + "', "
            + "endScn => '" + endScn + "', "
            + "OPTIONS => " + miningStrategy
            + " + DBMS_LOGMNR.NO_ROWID_IN_STMT);"
            + "END;";
}
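
Tracing through that method, with the online-catalog strategy, continuous mining disabled, and the SCN range used in the query further below, the returned statement would look roughly like this (illustration only, the SCN values are just examples):

// Illustrative only: what startLogMinerStatement(...) returns for the ONLINE_CATALOG strategy
// with continuous mining disabled (SCN values taken from the query example below).
String startStatement = "BEGIN sys.dbms_logmnr.start_logmnr("
        + "startScn => '2468014', "
        + "endScn => '2468297', "
        + "OPTIONS => DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG "
        + " + DBMS_LOGMNR.NO_ROWID_IN_STMT);"
        + "END;";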

The statement that finally queries the mining results:

SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME, ROW_ID, ROLLBACK
  FROM V$LOGMNR_CONTENTS
 WHERE SCN > '2468014' AND SCN <= '2468297'
   AND ((OPERATION_CODE IN (5, 34) AND USERNAME NOT IN ('SYS', 'SYSTEM', 'FLINKUSER'))
        OR (OPERATION_CODE IN (7, 36))
        OR (OPERATION_CODE IN (1, 2, 3)
            AND TABLE_NAME != 'LOG_MINING_FLUSH'
            AND SEG_OWNER NOT IN ('APPQOSSYS', 'AUDSYS', 'CTXSYS', 'DVSYS', 'DBSFWUSER', 'DBSNMP', 'GSMADMIN_INTERNAL',
                                  'LBACSYS', 'MDSYS', 'OJVMSYS', 'OLAPSYS', 'ORDDATA', 'ORDSYS', 'OUTLN', 'SYS', 'SYSTEM', 'WMSYS', 'XDB')
            AND (REGEXP_LIKE(SEG_OWNER, '^flinkuser$', 'i'))
            AND (REGEXP_LIKE(SEG_OWNER || '.' || TABLE_NAME, '^flinkuser.test$', 'i'))))

Processing the query results

miningView.setFetchSize(connectorConfig.getMaxQueueSize());
miningView.setFetchDirection(ResultSet.FETCH_FORWARD);
miningView.setString(1, startScn.toString());
miningView.setString(2, endScn.toString());
try (ResultSet rs = miningView.executeQuery()) {
    Duration lastDurationOfBatchCapturing = stopwatch.stop().durations().statistics().getTotal();
    streamingMetrics.setLastDurationOfBatchCapturing(lastDurationOfBatchCapturing);
    processor.processResult(rs);

    startScn = endScn;

    if (transactionalBuffer.isEmpty()) {
        LOGGER.debug("Transactional buffer empty, updating offset's SCN {}", startScn);
        offsetContext.setScn(startScn);
    }
}

The class and method that do the actual parsing are io.debezium.connector.oracle.logminer.LogMinerQueryResultProcessor#processResult. This class parses each redo SQL statement, extracting the column names and their corresponding values into two arrays, wrapping them into an Entry object, and handing that object to the converter for further processing. The details of that conversion are left for a follow-up.
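
To give a feel for that step, here is a deliberately simplified sketch of turning a redo INSERT statement into parallel column/value arrays. It is my own illustration, not Debezium's parser, which also handles quoting, NULLs, multi-line values and LOB fragments:

import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Simplified illustration of splitting a redo INSERT statement into column and value arrays.
public class RedoSqlSketch {
    private static final Pattern INSERT = Pattern.compile(
            "insert into \"(\\w+)\"\\.\"(\\w+)\"\\((.*)\\) values \\((.*)\\)", Pattern.CASE_INSENSITIVE);

    public static void main(String[] args) {
        String redo = "insert into \"FLINKUSER\".\"TEST\"(\"ID\",\"NAME\") values ('1','bob')";
        Matcher m = INSERT.matcher(redo);
        if (m.matches()) {
            List<String> columns = new ArrayList<>();
            for (String c : m.group(3).split(",")) {
                columns.add(c.trim().replace("\"", ""));       // strip the surrounding double quotes
            }
            List<String> values = new ArrayList<>();
            for (String v : m.group(4).split(",")) {
                values.add(v.trim().replaceAll("^'|'$", ""));  // strip the surrounding single quotes
            }
            System.out.println(columns + " -> " + values);     // prints: [ID, NAME] -> [1, bob]
        }
    }
}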

Problems encountered in practice

Connection failures when dbName is configured with the SID

If the Oracle dbName is configured with the SID instead of the service_name, the connection fails. The reason is that by default the connector assembles the JDBC URL in the jdbc:oracle:thin:@localhost:1521/service_name form, whereas connecting by SID requires the jdbc:oracle:thin:@localhost:1521:sid form. In Oracle CDC you can specify the connection string manually, and the connector will then connect using exactly the URL you provide.

Setting it via the API

properties.put("database.url","jdbc:oracle:thin:@localhost:1521:sid");

Setting it via Flink SQL

When the property is set through SQL, it has to carry the debezium. prefix:

CREATE TABLE GSP_PURCHASE_ORDER (
    ORDER_ID STRING NOT NULL,
    EBELN STRING,
    BSTYP STRING,
    PRIMARY KEY (ORDER_ID) NOT ENFORCED
) WITH (
    'connector' = 'oracle-cdc',
    'debezium.database.url' = 'jdbc:oracle:thin:@localhost:1521:RACTEST1',
    'debezium.database.tablename.case.insensitive' = 'false',
    'hostname' = 'localhost',
    'port' = '1521',
    'username' = '****',
    'password' = '****',
    'database-name' = 'RACTEST1',
    'schema-name' = 'GSP_MODULE_UAT',
    'table-name' = 'GSP_PURCHASE_ORDER'
);

Oracle 11g case-sensitivity issue

When the connector queries a table's status, the table name is converted to lowercase by default, so the supplemental-logging check on that table finds nothing. The fix is to set the database.tablename.case.insensitive property to false: the table name is then no longer lowercased during validation, and the check can correctly detect whether supplemental logging has been added. For Oracle 11g the table name and schema must be written in uppercase.

Changing it via the API

properties.put("database.tablename.case.insensitive","false");

Changing it via Flink SQL

Use the same approach as for the first problem, i.e. add 'debezium.database.tablename.case.insensitive' = 'false' to the WITH clause.


Reposted from: https://blog.csdn.net/IT_xhf/article/details/130364090
Copyright belongs to the original author, IT_xhf. In case of infringement, please contact us for removal.
