Introduction to Flink Oracle CDC
Flink CDC captures Oracle data in real time on top of Flink. Under the hood it uses the Debezium framework, and Debezium in turn relies on Oracle's built-in LogMiner. LogMiner-based capture requires supplemental logging to be added to the database and to the captured tables; because Oracle 18c does not support adding the required supplemental logging, the versions currently supported are Oracle 11, 12 and 19.
Using Flink Oracle CDC
Flink Oracle CDC can be used either through Flink SQL or through the DataStream API. Oracle must have archive logging and supplemental logging enabled before capture can work, and you also need an account with sufficient privileges to connect to the Oracle database for real-time capture.
Enabling archive logging
# Connect to Oracle
ORACLE_SID=SID
export ORACLE_SID
sqlplus /nolog
CONNECT sys/password AS SYSDBA

# Enable archive logging
alter system set db_recovery_file_dest_size = 10G;
alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
shutdown immediate;
startup mount;
alter database archivelog;
alter database open;
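If you want to verify from the Java side that the database really is in ARCHIVELOG mode before starting a job, a minimal JDBC check is sketched below. It assumes the ojdbc driver is on the classpath; the host, port, SID and credentials are placeholders, not values from this article.

// Minimal sketch: print the archive mode reported by V$DATABASE (expect ARCHIVELOG after the steps above).
// Assumptions: ojdbc driver on the classpath; connection details below are placeholders.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class CheckArchiveLogMode {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:oracle:thin:@localhost:1521:SID", "sys as sysdba", "password");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT LOG_MODE FROM V$DATABASE")) {
            if (rs.next()) {
                System.out.println("LOG_MODE = " + rs.getString(1));
            }
        }
    }
}

If this prints NOARCHIVELOG, re-run the steps above before attempting any capture.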
Enabling supplemental logging for the database and tables
-- Enable supplemental logging for all columns of a given table (ALL or PRIMARY KEY level is supported)
ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Enable supplemental logging at the database level
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
Creating a user with the required privileges
sqlplus sys/password@host:port/SID AS SYSDBA;
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE TO flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
GRANT CREATE TABLE TO flinkuser;
-- need not to execute if set scan.incremental.snapshot.enabled=true(default)
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
exit;
For detailed Flink usage, please refer to the official documentation.
Source code walkthrough
The walkthrough below is based on the API usage, so first prepare a source environment: pull the jar in through Maven and download the corresponding sources, and you can then read and debug the code comfortably in IDEA.
Adding the Maven dependency
The latest version on the official site is 2.4 and the released version is 2.3.0; my debugging environment is 2.2.0, and all the code below is based on 2.2.0.
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-oracle-cdc</artifactId>
<!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. -->
<version>2.2.0</version>
</dependency>
Using Oracle CDC through the API
Properties properties = new Properties();
properties.put("decimal.handling.mode", "double");
properties.put("database.url", "jdbc:oracle:thin:@127.0.0.1:1521:orcl");

SourceFunction<String> sourceFunction = OracleSource.<String>builder()
        .hostname("localhost")
        .port(1521)
        .database("orcl")                // monitor the orcl database
        .schemaList("flinkuser")         // monitor the flinkuser schema
        .tableList("flinkuser.test")     // monitor the test table
        .username("flinkuser")
        .password("flinkpw")
        .startupOptions(StartupOptions.latest())
        .debeziumProperties(properties)
        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
        .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.addSource(sourceFunction).print();
env.execute();
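Besides the built-in JsonDebeziumDeserializationSchema, you can plug in your own deserializer by implementing DebeziumDeserializationSchema from flink-connector-debezium. The class below is only an illustrative sketch (it just emits the topic of each SourceRecord); the interface methods are the ones used by version 2.2.0, everything else is an assumption.

// Illustrative sketch of a custom deserializer; a real one would inspect record.value()
// (a Struct carrying the before/after images and the op field).
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.source.SourceRecord;

public class TopicOnlyDeserializationSchema implements DebeziumDeserializationSchema<String> {

    @Override
    public void deserialize(SourceRecord record, Collector<String> out) {
        // Emit only the Kafka Connect topic name of the change record.
        out.collect(record.topic());
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}

You would then pass .deserializer(new TopicOnlyDeserializationSchema()) to the builder instead of the JSON schema.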
OracleSource is a builder-style utility class: calling build() returns a DebeziumSourceFunction object, and before returning it the parameters set on the builder are translated into Debezium properties.
public DebeziumSourceFunction<T> build() {
    Properties props = new Properties();
    props.setProperty("connector.class", OracleConnector.class.getCanonicalName());
    // Logical name that identifies and provides a namespace for the particular Oracle
    // database server being monitored. The logical name should be unique across all other
    // connectors, since it is used as a prefix for all Kafka topic names emanating from
    // this connector. Only alphanumeric characters and underscores should be used.
    props.setProperty("database.server.name", DATABASE_SERVER_NAME);
    props.setProperty("database.hostname", checkNotNull(hostname));
    props.setProperty("database.user", checkNotNull(username));
    props.setProperty("database.password", checkNotNull(password));
    props.setProperty("database.port", String.valueOf(port));
    props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true));
    props.setProperty("database.dbname", checkNotNull(database));
    if (schemaList != null) {
        props.setProperty("schema.whitelist", String.join(",", schemaList));
    }
    if (tableList != null) {
        props.setProperty("table.include.list", String.join(",", tableList));
    }

    DebeziumOffset specificOffset = null;
    switch (startupOptions.startupMode) {
        case INITIAL:
            props.setProperty("snapshot.mode", "initial");
            break;
        case LATEST_OFFSET:
            props.setProperty("snapshot.mode", "schema_only");
            break;
        default:
            throw new UnsupportedOperationException();
    }

    if (dbzProperties != null) {
        props.putAll(dbzProperties);
    }

    return new DebeziumSourceFunction<>(
            deserializer, props, specificOffset, new OracleValidator(props));
}
Following the DebeziumSourceFunction source code, its run() method submits the request that parses the Oracle real-time logs:
// create the engine with this configuration ...
this.engine =
        DebeziumEngine.create(Connect.class)
                .using(properties)
                .notifying(changeConsumer)
                .using(OffsetCommitPolicy.always())
                .using(
                        (success, message, error) -> {
                            if (success) {
                                // Close the handover and prepare to exit.
                                handover.close();
                            } else {
                                handover.reportError(error);
                            }
                        })
                .build();

// run the engine asynchronously
executor.execute(engine);
debeziumStarted = true;
DebeziumEngine.build() is implemented by io.debezium.embedded.EmbeddedEngine.BuilderImpl#build, which returns an EmbeddedEngine object — a thread (Runnable) class. Its run() method drives the entire data-capture chain. The call flow looks like this:
@Override
public void run() {
    if (runningThread.compareAndSet(null, Thread.currentThread())) {
        ....
        // Instantiate the connector ...
        SourceConnector connector = null;
        try {
            @SuppressWarnings("unchecked")
            Class<? extends SourceConnector> connectorClass = (Class<SourceConnector>) classLoader.loadClass(connectorClassName);
            connector = connectorClass.getDeclaredConstructor().newInstance();
        }
        // Instantiate the offset store ...
        final String offsetStoreClassName = config.getString(OFFSET_STORAGE);
        OffsetBackingStore offsetStore = null;
        try {
            @SuppressWarnings("unchecked")
            Class<? extends OffsetBackingStore> offsetStoreClass = (Class<OffsetBackingStore>) classLoader.loadClass(offsetStoreClassName);
            offsetStore = offsetStoreClass.getDeclaredConstructor().newInstance();
        }
        ....
        // Initialize the offset store ...
        try {
            offsetStore.configure(workerConfig);
            offsetStore.start();
        }
        ....
        // Set up the offset commit policy ...
        if (offsetCommitPolicy == null) {
            offsetCommitPolicy = Instantiator.getInstanceWithProperties(config.getString(EmbeddedEngine.OFFSET_COMMIT_POLICY),
                    () -> getClass().getClassLoader(), config.asProperties());
        }

        // Initialize the connector using a context that does NOT respond to requests to reconfigure tasks ...
        ConnectorContext context = new ConnectorContext();
        ....
        connector.initialize(context);
        OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetStore, engineName, keyConverter, valueConverter);
        OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetStore, engineName, keyConverter, valueConverter);
        Duration commitTimeout = Duration.ofMillis(config.getLong(OFFSET_COMMIT_TIMEOUT_MS));

        try {
            // Start the connector with the given properties and get the task configurations ...
            connector.start(config.asMap());
            connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::connectorStarted);
            List<Map<String, String>> taskConfigs = connector.taskConfigs(1);
            Class<? extends Task> taskClass = connector.taskClass();
            if (taskConfigs.isEmpty()) {
                String msg = "Unable to start connector's task class '" + taskClass.getName() + "' with no task configuration";
                fail(msg);
                return;
            }
            task = null;
            try {
                task = (SourceTask) taskClass.getDeclaredConstructor().newInstance();
            }
            catch (IllegalAccessException | InstantiationException t) {
                fail("Unable to instantiate connector's task class '" + taskClass.getName() + "'", t);
                return;
            }
            try {
                SourceTaskContext taskContext = new SourceTaskContext();
                ......
                task.initialize(taskContext);
                task.start(taskConfigs.get(0));
                connectorCallback.ifPresent(DebeziumEngine.ConnectorCallback::taskStarted);
            }
            ......
            recordsSinceLastCommit = 0;
            Throwable handlerError = null;
            try {
                timeOfLastCommitMillis = clock.currentTimeInMillis();
                RecordCommitter committer = buildRecordCommitter(offsetWriter, task, commitTimeout);
                while (runningThread.get() != null) {
                    List<SourceRecord> changeRecords = null;
                    try {
                        LOGGER.debug("Embedded engine is polling task for records on thread {}", runningThread.get());
                        changeRecords = task.poll(); // blocks until there are values ...
                        LOGGER.debug("Embedded engine returned from polling task for records");
                    }
                    catch (InterruptedException e) {
                        // Interrupted while polling ...
                        LOGGER.debug("Embedded engine interrupted on thread {} while polling the task for records", runningThread.get());
                        if (this.runningThread.get() == Thread.currentThread()) {
                            // this thread is still set as the running thread -> we were not interrupted
                            // due the stop() call -> probably someone else called the interrupt on us ->
                            // -> we should raise the interrupt flag
                            Thread.currentThread().interrupt();
                        }
                        break;
                    }
                    catch (RetriableException e) {
                        LOGGER.info("Retrieable exception thrown, connector will be restarted", e);
                        // Retriable exception should be ignored by the engine
                        // and no change records delivered.
                        // The retry is handled in io.debezium.connector.common.BaseSourceTask.poll()
                    }
                    try {
                        if (changeRecords != null && !changeRecords.isEmpty()) {
                            LOGGER.debug("Received {} records from the task", changeRecords.size());
                            changeRecords = changeRecords.stream()
                                    .map(transformations::transform)
                                    .filter(x -> x != null)
                                    .collect(Collectors.toList());
                        }
                        if (changeRecords != null && !changeRecords.isEmpty()) {
                            LOGGER.debug("Received {} transformed records from the task", changeRecords.size());
                            try {
                                handler.handleBatch(changeRecords, committer);
                            }
                            catch (StopConnectorException e) {
                                break;
                            }
                        }
                        else {
                            LOGGER.debug("Received no records from the task");
                        }
                    }
                    catch (Throwable t) {
                        // There was some sort of unexpected exception, so we should stop work
                        handlerError = t;
                        break;
                    }
                }
            }
            ...
        }
    }
}
- Instantiate the connector via reflection, which yields an OracleConnector object
- Instantiate the offset store object
- Set the offset commit policy
- Hand the configuration over to the connector via connector.start(config.asMap())
- Obtain the SourceTask from the connector via reflection; here this is an OracleConnectorTask object
- Call task.start(taskConfigs.get(0)) to start the task that captures Oracle change data; the concrete implementation is io.debezium.connector.oracle.OracleConnectorTask#start, shown below:
@Override
public ChangeEventSourceCoordinator start(Configuration config) {
    OracleConnectorConfig connectorConfig = new OracleConnectorConfig(config);
    TopicSelector<TableId> topicSelector = OracleTopicSelector.defaultSelector(connectorConfig);
    SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();

    Configuration jdbcConfig = connectorConfig.jdbcConfig();
    jdbcConnection = new OracleConnection(jdbcConfig, () -> getClass().getClassLoader());

    this.schema = new OracleDatabaseSchema(connectorConfig, schemaNameAdjuster, topicSelector, jdbcConnection);
    this.schema.initializeStorage();

    String adapterString = config.getString(OracleConnectorConfig.CONNECTOR_ADAPTER);
    OracleConnectorConfig.ConnectorAdapter adapter = OracleConnectorConfig.ConnectorAdapter.parse(adapterString);
    OffsetContext previousOffset = getPreviousOffset(new OracleOffsetContext.Loader(connectorConfig, adapter));
    if (previousOffset != null) {
        schema.recover(previousOffset);
    }

    taskContext = new OracleTaskContext(connectorConfig, schema);

    Clock clock = Clock.system();

    // Set up the task record queue ...
    this.queue = new ChangeEventQueue.Builder<DataChangeEvent>()
            .pollInterval(connectorConfig.getPollInterval())
            .maxBatchSize(connectorConfig.getMaxBatchSize())
            .maxQueueSize(connectorConfig.getMaxQueueSize())
            .loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME))
            .build();

    errorHandler = new OracleErrorHandler(connectorConfig.getLogicalName(), queue);

    final OracleEventMetadataProvider metadataProvider = new OracleEventMetadataProvider();

    EventDispatcher<TableId> dispatcher = new EventDispatcher<>(
            connectorConfig,
            topicSelector,
            schema,
            queue,
            connectorConfig.getTableFilters().dataCollectionFilter(),
            DataChangeEvent::new,
            metadataProvider,
            schemaNameAdjuster);

    final OracleStreamingChangeEventSourceMetrics streamingMetrics =
            new OracleStreamingChangeEventSourceMetrics(taskContext, queue, metadataProvider, connectorConfig);

    ChangeEventSourceCoordinator coordinator = new ChangeEventSourceCoordinator(
            previousOffset,
            errorHandler,
            OracleConnector.class,
            connectorConfig,
            new OracleChangeEventSourceFactory(connectorConfig, jdbcConnection, errorHandler, dispatcher, clock, schema, jdbcConfig, taskContext, streamingMetrics),
            new OracleChangeEventSourceMetricsFactory(streamingMetrics),
            dispatcher,
            schema);

    coordinator.start(taskContext, this.queue, metadataProvider);

    return coordinator;
}
- Create a task context object taskContext, which holds the task configuration and the schema
- Set up a message queue queue, which buffers the parsed messages
- Create the event dispatcher dispatcher, which pushes the parsed data into the queue
- Create the io.debezium.pipeline.ChangeEventSourceCoordinator object and call io.debezium.pipeline.ChangeEventSourceCoordinator#start; that method calls streamEvents, which ultimately calls io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource#execute — the method that actually mines the Oracle logs:
public void execute(ChangeEventSourceContext context) {
    try (TransactionalBuffer transactionalBuffer = new TransactionalBuffer(schema, clock, errorHandler, streamingMetrics)) {
        try {
            startScn = offsetContext.getScn();
            createFlushTable(jdbcConnection);

            if (!isContinuousMining && startScn.compareTo(getFirstOnlineLogScn(jdbcConnection, archiveLogRetention)) < 0) {
                throw new DebeziumException(
                        "Online REDO LOG files or archive log files do not contain the offset scn " + startScn + ". Please perform a new snapshot.");
            }

            setNlsSessionParameters(jdbcConnection);
            checkSupplementalLogging(jdbcConnection, connectorConfig.getPdbName(), schema);

            initializeRedoLogsForMining(jdbcConnection, false, archiveLogRetention);

            HistoryRecorder historyRecorder = connectorConfig.getLogMiningHistoryRecorder();

            try {
                // todo: why can't OracleConnection be used rather than a Factory+JdbcConfiguration?
                historyRecorder.prepare(streamingMetrics, jdbcConfiguration, connectorConfig.getLogMinerHistoryRetentionHours());

                final LogMinerQueryResultProcessor processor = new LogMinerQueryResultProcessor(context, jdbcConnection,
                        connectorConfig, streamingMetrics, transactionalBuffer, offsetContext, schema, dispatcher,
                        clock, historyRecorder);

                final String query = SqlUtils.logMinerContentsQuery(connectorConfig, jdbcConnection.username());
                try (PreparedStatement miningView = jdbcConnection.connection().prepareStatement(query,
                        ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT)) {

                    currentRedoLogSequences = getCurrentRedoLogSequences();
                    Stopwatch stopwatch = Stopwatch.reusable();
                    while (context.isRunning()) {
                        // Calculate time difference before each mining session to detect time zone offset changes (e.g. DST) on database server
                        streamingMetrics.calculateTimeDifference(getSystime(jdbcConnection));

                        Instant start = Instant.now();
                        endScn = getEndScn(jdbcConnection, startScn, streamingMetrics, connectorConfig.getLogMiningBatchSizeDefault());
                        flushLogWriter(jdbcConnection, jdbcConfiguration, isRac, racHosts);

                        if (hasLogSwitchOccurred()) {
                            // This is the way to mitigate PGA leaks.
                            // With one mining session, it grows and maybe there is another way to flush PGA.
                            // At this point we use a new mining session
                            LOGGER.trace("Ending log mining startScn={}, endScn={}, offsetContext.getScn={}, strategy={}, continuous={}",
                                    startScn, endScn, offsetContext.getScn(), strategy, isContinuousMining);
                            endMining(jdbcConnection);

                            initializeRedoLogsForMining(jdbcConnection, true, archiveLogRetention);

                            abandonOldTransactionsIfExist(jdbcConnection, transactionalBuffer);

                            // This needs to be re-calculated because building the data dictionary will force the
                            // current redo log sequence to be advanced due to a complete log switch of all logs.
                            currentRedoLogSequences = getCurrentRedoLogSequences();
                        }

                        startLogMining(jdbcConnection, startScn, endScn, strategy, isContinuousMining, streamingMetrics);

                        stopwatch.start();
                        miningView.setFetchSize(connectorConfig.getMaxQueueSize());
                        miningView.setFetchDirection(ResultSet.FETCH_FORWARD);
                        miningView.setString(1, startScn.toString());
                        miningView.setString(2, endScn.toString());
                        try (ResultSet rs = miningView.executeQuery()) {
                            Duration lastDurationOfBatchCapturing = stopwatch.stop().durations().statistics().getTotal();
                            streamingMetrics.setLastDurationOfBatchCapturing(lastDurationOfBatchCapturing);
                            processor.processResult(rs);

                            startScn = endScn;

                            if (transactionalBuffer.isEmpty()) {
                                LOGGER.debug("Transactional buffer empty, updating offset's SCN {}", startScn);
                                offsetContext.setScn(startScn);
                            }
                        }

                        streamingMetrics.setCurrentBatchProcessingTime(Duration.between(start, Instant.now()));
                        pauseBetweenMiningSessions();
                    }
                }
            }
            finally {
                historyRecorder.close();
            }
        }
        catch (Throwable t) {
            logError(streamingMetrics, "Mining session stopped due to the {}", t);
            errorHandler.setProducerThrowable(t);
        }
        finally {
            LOGGER.info("startScn={}, endScn={}, offsetContext.getScn()={}", startScn, endScn, offsetContext.getScn());
            LOGGER.info("Transactional buffer dump: {}", transactionalBuffer.toString());
            LOGGER.info("Streaming metrics dump: {}", streamingMetrics.toString());
        }
    }
}
- Create a flush table that stores the SCN of the last mining pass; it marks the position where the next pass starts:
CREATE TABLE LOGMNR_FLUSH_TABLE (LAST_SCN NUMBER(19,0));
- Check whether the database and the captured tables have supplemental logging enabled:
checkSupplementalLogging(jdbcConnection, connectorConfig.getPdbName(), schema);
- Call the stored procedure that builds the data dictionary, and register the database's archived logs and online redo logs with LogMiner:
private void initializeRedoLogsForMining(OracleConnection connection, boolean postEndMiningSession, Duration archiveLogRetention) throws SQLException {
    if (!postEndMiningSession) {
        if (OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO.equals(strategy)) {
            buildDataDictionary(connection);
        }
        if (!isContinuousMining) {
            setRedoLogFilesForMining(connection, startScn, archiveLogRetention);
        }
    }
    else {
        if (!isContinuousMining) {
            if (OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO.equals(strategy)) {
                buildDataDictionary(connection);
            }
            setRedoLogFilesForMining(connection, startScn, archiveLogRetention);
        }
    }
}
The code above executes the following statements against the database:
BEGIN DBMS_LOGMNR_D.BUILD (options => DBMS_LOGMNR_D.STORE_IN_REDO_LOGS); END;

-- List the online redo logs
SELECT MIN(F.MEMBER) AS FILE_NAME, L.NEXT_CHANGE# AS NEXT_CHANGE, F.GROUP#, L.FIRST_CHANGE# AS FIRST_CHANGE, L.STATUS
FROM V$LOG L, V$LOGFILE F
WHERE F.GROUP# = L.GROUP# AND L.NEXT_CHANGE# > 0
GROUP BY F.GROUP#, L.NEXT_CHANGE#, L.FIRST_CHANGE#, L.STATUS ORDER BY 3;

-- List the archived logs
SELECT NAME AS FILE_NAME, NEXT_CHANGE# AS NEXT_CHANGE, FIRST_CHANGE# AS FIRST_CHANGE
FROM V$ARCHIVED_LOG
WHERE NAME IS NOT NULL AND ARCHIVED = 'YES' AND STATUS = 'A'
  AND NEXT_CHANGE# > ?  -- the SCN captured by the previous pass
  AND DEST_ID IN (SELECT DEST_ID FROM V$ARCHIVE_DEST_STATUS WHERE STATUS = 'VALID' AND TYPE = 'LOCAL' AND ROWNUM = 1)
ORDER BY 2;
The archived logs are merged with the online redo logs, and the merged file list is registered with LogMiner for mining:
"BEGIN sys.dbms_logmnr.add_logfile(LOGFILENAME => '" + fileName + "', OPTIONS => DBMS_LOGMNR.ADDFILE);END;"
- Call sys.dbms_logmnr.start_logmnr to start mining the registered logs; the mined results are written into V$LOGMNR_CONTENTS:
static String startLogMinerStatement(Scn startScn, Scn endScn, OracleConnectorConfig.LogMiningStrategy strategy, boolean isContinuousMining) {
    String miningStrategy;
    if (strategy.equals(OracleConnectorConfig.LogMiningStrategy.CATALOG_IN_REDO)) {
        miningStrategy = "DBMS_LOGMNR.DICT_FROM_REDO_LOGS + DBMS_LOGMNR.DDL_DICT_TRACKING ";
    }
    else {
        miningStrategy = "DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG ";
    }
    if (isContinuousMining) {
        miningStrategy += " + DBMS_LOGMNR.CONTINUOUS_MINE ";
    }
    return "BEGIN sys.dbms_logmnr.start_logmnr(" +
            "startScn => '" + startScn + "', " +
            "endScn => '" + endScn + "', " +
            "OPTIONS => " + miningStrategy +
            " + DBMS_LOGMNR.NO_ROWID_IN_STMT);" +
            "END;";
}
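As a worked example (the SCN values are taken from the mining query shown just below; the ONLINE_CATALOG strategy with continuous mining disabled is assumed here), the statement returned by this method would look roughly like:

BEGIN sys.dbms_logmnr.start_logmnr(startScn => '2468014', endScn => '2468297', OPTIONS => DBMS_LOGMNR.DICT_FROM_ONLINE_CATALOG + DBMS_LOGMNR.NO_ROWID_IN_STMT); END;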
The statement that finally queries the mined results:
SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, USERNAME, ROW_ID, ROLLBACK
FROM V$LOGMNR_CONTENTS
WHERE SCN > '2468014' AND SCN <= '2468297'
  AND ((OPERATION_CODE IN (5, 34) AND USERNAME NOT IN ('SYS', 'SYSTEM', 'FLINKUSER'))
    OR (OPERATION_CODE IN (7, 36))
    OR (OPERATION_CODE IN (1, 2, 3)
        AND TABLE_NAME != 'LOG_MINING_FLUSH'
        AND SEG_OWNER NOT IN ('APPQOSSYS', 'AUDSYS', 'CTXSYS', 'DVSYS', 'DBSFWUSER', 'DBSNMP', 'GSMADMIN_INTERNAL', 'LBACSYS', 'MDSYS', 'OJVMSYS', 'OLAPSYS', 'ORDDATA', 'ORDSYS', 'OUTLN', 'SYS', 'SYSTEM', 'WMSYS', 'XDB')
        AND (REGEXP_LIKE(SEG_OWNER, '^flinkuser$', 'i'))
        AND (REGEXP_LIKE(SEG_OWNER || '.' || TABLE_NAME, '^flinkuser.test$', 'i'))))
Processing the query results
miningView.setFetchSize(connectorConfig.getMaxQueueSize());
miningView.setFetchDirection(ResultSet.FETCH_FORWARD);
miningView.setString(1, startScn.toString());
miningView.setString(2, endScn.toString());
try (ResultSet rs = miningView.executeQuery()) {
    Duration lastDurationOfBatchCapturing = stopwatch.stop().durations().statistics().getTotal();
    streamingMetrics.setLastDurationOfBatchCapturing(lastDurationOfBatchCapturing);
    processor.processResult(rs);

    startScn = endScn;

    if (transactionalBuffer.isEmpty()) {
        LOGGER.debug("Transactional buffer empty, updating offset's SCN {}", startScn);
        offsetContext.setScn(startScn);
    }
}
The class and method that do the actual parsing are io.debezium.connector.oracle.logminer.LogMinerQueryResultProcessor#processResult. This class parses the redo SQL statements, extracts the column names and their corresponding values into two arrays, wraps them into an Entry object and hands it to the converter. The details of that parsing process will be covered in a later post.
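The real implementation is fairly involved; purely as a conceptual sketch (this is not the Debezium code), the processing loop boils down to something like the following, where the column labels match the SELECT list of the mining query above and everything else is simplified:

// Conceptual sketch only -- the actual logic lives in LogMinerQueryResultProcessor#processResult.
import java.sql.ResultSet;
import java.sql.SQLException;

public class LogMinerResultSketch {

    public void process(ResultSet rs) throws SQLException {
        while (rs.next()) {
            String scn = rs.getString("SCN");
            int operationCode = rs.getInt("OPERATION_CODE"); // 1=INSERT, 2=DELETE, 3=UPDATE, 5=DDL, 7=COMMIT, 36=ROLLBACK
            String redoSql = rs.getString("SQL_REDO");
            String tableName = rs.getString("TABLE_NAME");

            switch (operationCode) {
                case 1:
                case 2:
                case 3:
                    // DML: parse the redo SQL into column-name / column-value arrays,
                    // buffer it per transaction (XID) and emit it downstream on COMMIT.
                    System.out.printf("DML on %s at SCN %s: %s%n", tableName, scn, redoSql);
                    break;
                case 7:
                    // COMMIT: flush the buffered events of this transaction downstream.
                    break;
                case 36:
                    // ROLLBACK: discard the buffered events of this transaction.
                    break;
                default:
                    // DDL and other operation codes are handled separately.
            }
        }
    }
}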
Problems encountered in practice
Connection failure when the Oracle dbName is configured with an SID
If the Oracle dbName is configured with the SID rather than the service_name, the connection fails. The reason is that by default the connection string is assembled in the jdbc:oracle:thin:@localhost:1521/service_name form; to connect by SID, it has to use the jdbc:oracle:thin:@localhost:1521:sid form instead. Oracle CDC lets you specify the connection string manually, in which case the connector uses exactly the URL you provide.
Setting it via the API
properties.put("database.url","jdbc:oracle:thin:@localhost:1521:sid");
Setting it via Flink SQL
When setting Debezium properties through SQL, a debezium. prefix has to be added:
CREATE TABLE GSP_PURCHASE_ORDER (
    ORDER_ID STRING NOT NULL,
    EBELN STRING,
    BSTYP STRING,
    PRIMARY KEY (ORDER_ID) NOT ENFORCED
) WITH (
    'connector' = 'oracle-cdc',
    'debezium.database.url' = 'jdbc:oracle:thin:@localhost:1521:RACTEST1',
    'debezium.database.tablename.case.insensitive' = 'false',
    'hostname' = 'localhost',
    'port' = '1521',
    'username' = '****',
    'password' = '****',
    'database-name' = 'RACTEST1',
    'schema-name' = 'GSP_MODULE_UAT',
    'table-name' = 'GSP_PURCHASE_ORDER'
);
Case-sensitivity problem on Oracle 11g
By default the connector converts the table name to lowercase when it queries the table's status, so the supplemental-logging check concludes that the table has no supplemental logging. Set the database.tablename.case.insensitive property to false so that the table name is no longer lowercased during validation; the check can then correctly detect whether supplemental logging has been added. For Oracle 11g, table names and schema names must be written in uppercase.
Fix via the API
properties.put("database.tablename.case.insensitive","false");
Fix via Flink SQL
Set it the same way as in the first problem above, i.e. add the property with the debezium. prefix in the WITH clause.