0


Flink CDC-Oracle CDC配置及DataStream API实现代码...可实现监控采集一个数据库的多个表

文章目录

Oracle CDC配置(Non-CDB database)

第一步: 开启归档日志

  1. 使用sysdba角色登录到Oracle数据库
  2. 确保Oracle归档日志(Archive Log)已启用select log_mode from v$database;-- 查询结果应为ARCHIVELOG。
  3. 若未启用归档日志, 需运行以下命令启用归档日志1. 设置归档日志存储大小及位置- 设置数据库恢复文件存储区域的大小(如归档重做日志文件、控制文件备份等)alter system set db_recovery_file_dest_size =10G;- 设置恢复文件的实际物理存储路径;scope=spfile参数设置讲persist到spfile参数文件中,即实例重启后也仍然生效alter system set db_recovery_file_dest ='/opt/oracle/oradata/recovery_area' scope=spfile;2. 立即关闭数据库。这会回滚所有未提交的事务,并断开所有连接的会话,然后关闭数据库实例shutdown immediate;3. 启动数据库,但只到‘挂载’阶段,此时数据库文件对用户还不可用。在这个阶段,DBA可以进行一些特殊的管理任务,比如数据库的恢复或者切换日志模式startup mount;4. 将数据库的日志模式切换为归档日志模式。在归档日志模式下,数据库会保存所有的重做日志文件,这对于数据库恢复和数据库备份非常重要。alterdatabase archivelog;5. 将数据库从‘挂载’状态切换到‘开放’状态,此时数据库对用户可用,可进行正常的数据库操作。alterdatabaseopen;6. 再次确认归档日志是否已启用!select log_mode from v$database;-- 查询结果应为ARCHIVELOG。注意:1. 启用归档日志需要数据库重启,请谨慎操作!2. 归档日志会占用大量的磁盘空间,需定期清理过期的归档日志!
  4. 启用Supplemental logging 为捕获数据库中数据变更前的状态,必须在捕获的表或数据库上启用补充日志(Supplemental logging)- 为数据库启用supplemental loggingALTERDATABASEADD SUPPLEMENTAL LOG DATA;- 为指定表启用supplemental loggingALTERTABLE db.tableADD SUPPLEMENTAL LOG DATA(ALL)COLUMNS;

第二步: 创建Tablespace表空间

表空间是用来存储数据库对象(如表、索引等)的逻辑结构。

在Oracle中创建一个名为"logminer_tbs"的表空间,数据文件的路径为"/opt/oracle/oradata/SID/logminer_tbs.dbf",大小为25M,并且允许自动扩展,最大大小为无限。

执行以下SQL需要使用sysdba角色登录到Oracle数据库

CREATETABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;

第三步: 创建用户并授予相应权限

执行以下SQL需要使用sysdba角色登录到Oracle数据库

GRANTCREATESESSIONTO cdc_user;-- 授予用户"cdc_user"创建会话的权限,允许用户连接到数据库。GRANTSET CONTAINER TO cdc_user;-- 授予用户"cdc_user"切换到不同的容器(Container)的权限。容器是Oracle 12c中的概念,用于隔离和管理数据库资源。GRANTSELECTON V_$DATABASEto cdc_user;-- 授予用户"cdc_user"对系统视图"V_$DATABASE"的SELECT权限。该视图提供了关于数据库实例的信息。GRANT FLASHBACK ANYTABLETO cdc_user;-- 授予用户"cdc_user"对任意表进行闪回(Flashback)操作的权限。闪回是一种用于还原或查询数据库历史数据的功能。GRANTSELECTANYTABLETO cdc_user;-- 授予用户"cdc_user"对任意表进行SELECT操作的权限。GRANT SELECT_CATALOG_ROLE TO cdc_user;-- 授予用户"cdc_user"执行SELECT_CATALOG_ROLE角色的权限。SELECT_CATALOG_ROLE角色允许用户查询数据库的元数据信息。GRANT EXECUTE_CATALOG_ROLE TO cdc_user;-- 授予用户"cdc_user"执行EXECUTE_CATALOG_ROLE角色的权限。EXECUTE_CATALOG_ROLE角色允许用户执行数据库的元数据操作。GRANTSELECTANYTRANSACTIONTO cdc_user;-- 授予用户"cdc_user"对任意事务进行SELECT操作的权限。GRANT LOGMINING TO cdc_user;-- 授予用户"cdc_user"进行日志挖掘(Log Mining)的权限。日志挖掘是一种用于分析和提取数据库变更信息的技术。GRANTLOCKANYTABLETO cdc_user;-- 授予用户"flinkuser"锁定任意表的权限。(需开启。不开启的话,无法采集数据)GRANTEXECUTEON DBMS_LOGMNR TO cdc_user;-- 授予用户"cdc_user"执行DBMS_LOGMNR包中的过程和函数的权限。DBMS_LOGMNR包提供了用于日志挖掘的功能。GRANTEXECUTEON DBMS_LOGMNR_D TO cdc_user;-- 授予用户"cdc_user"执行DBMS_LOGMNR_D包中的过程和函数的权限。DBMS_LOGMNR_D包扩展了DBMS_LOGMNR包的功能。GRANTSELECTON V_$LOG TO cdc_user;-- 授予用户"cdc_user"对系统视图"V_$LOG"的SELECT权限。该视图提供了关于日志文件的信息。GRANTSELECTON V_$LOG_HISTORY TO cdc_user;-- 授予用户"cdc_user"对系统视图"V_$LOG_HISTORY"的SELECT权限。该视图提供了关于历史日志文件的信息。GRANTSELECTON V_$LOGMNR_LOGS TO cdc_user;-- 授予用户"cdc_user"对系统视图"V_$LOGMNR_LOGS"的SELECT权限。该视图提供了关于日志挖掘所使用的日志文件的信息。GRANTSELECTON V_$LOGMNR_CONTENTS TO cdc_user;-- 授予用户"cdc_user"对系统视图"V_$LOGMNR_CONTENTS"的SELECT权限。该视图提供了关于日志挖掘的内容信息。GRANTSELECTON V_$LOGMNR_PARAMETERS TO cdc_user;-- 授予用户"cdc_user"对系统视图"V_$LOGMNR_PARAMETERS"的SELECT权限。该视图提供了关于日志挖掘参数的信息。GRANTSELECTON V_$LOGFILE TO cdc_user;-- 授予用户"cdc_user"对系统视图"V_$LOGFILE"的SELECT权限。该视图提供了关于日志文件的信息。GRANTSELECTON V_$ARCHIVED_LOG TO cdc_user;-- 授予用户"cdc_user"对系统视图"V_$ARCHIVED_LOG"的SELECT权限。该视图提供了关于已归档日志文件的信息。GRANTSELECTON V_$ARCHIVE_DEST_STATUS TO cdc_user;-- 授予用户"cdc_user"对系统视图"V_$ARCHIVE_DEST_STATUS"的SELECT权限。该视图提供了关于归档目标状态的信息。

Oracle CDC DataStream API实现

所使用软件的版本

  • java 1.8
  • Scala 2.11
  • Flink 1.14.2
  • Flink CDC 2.3.0
  • Source Oracle 19c
  • Sink MySQL 5.7
  • jackson 2.10.2

Oracle CDC DataStream API可实现一个job监控采集一个数据库的多个表.

1. 定义OracleSource

//源数据库连接配置文件Properties sourceDbProps =DbConfigUtil.loadConfig("oracle.properties");//Debezium配置Properties debeziumProps =newProperties();//参考 https://debezium.io/documentation/reference/1.6/connectors/oracle.html#oracle-property-log-mining-strategy
debeziumProps.setProperty("log.mining.strategy","online_catalog");
debeziumProps.setProperty("log.mining.continuous.mine","true");//decimal.handling.mode指定connector如何处理DECIMAL和NUMERIC列的值,有3种模式:precise、double和string//precise(默认值):以二进制形式在变更事件中精确表示它们,使用java.math.BigDecimal值来表示(此种模式采集会将DECIMAL和NUMERIC列转成二进制格式,不易读,不便于数据处理)//以double值来表示它们,这可能会到值精度丢失//string:将值编码为格式化的字符串,易于下游消费,但会丢失有关实际类型的语义信息。(建议使用此种模式,便于下游进行数据处理)
debeziumProps.setProperty("decimal.handling.mode","string");//Oracle CDC数据源SourceFunction<String> sourceFunction =OracleSource.<String>builder().hostname(sourceDbProps.getProperty("host")).port(Integer.parseInt(sourceDbProps.getProperty("port"))).database(sourceDbProps.getProperty("database"))// monitor database.schemaList(sourceDbProps.getProperty("schema_list").split(","))// monitor schema.tableList(sourceDbProps.getProperty("table_list").split(","))// monitor table.username(sourceDbProps.getProperty("username")).password(sourceDbProps.getProperty("password")).deserializer(newJsonDebeziumDeserializationSchema()).debeziumProperties(debeziumProps).startupOptions(StartupOptions.initial()).build();

2. 数据处理

参考: MySQL CDC配置及DataStream API实现代码

3. Sink到MySQL

参考: MySQL CDC配置及DataStream API实现代码

参考

  1. https://debezium.io/documentation/reference/1.6/connectors/oracle.html
  2. https://ververica.github.io/flink-cdc-connectors/release-2.3/content/connectors/oracle-cdc.html
  3. https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/jdbc/
标签: flink oracle

本文转载自: https://blog.csdn.net/lovetechlovelife/article/details/132841801
版权归原作者 Southwest- 所有, 如有侵权,请联系我们删除。

“Flink CDC-Oracle CDC配置及DataStream API实现代码...可实现监控采集一个数据库的多个表”的评论:

还没有评论