Flink CDC: capturing Oracle archive logs in real time and tuning Oracle data latency
1. Oracle configuration
1.1 Enabling Oracle archive logging
1. In a terminal on the database server, connect as SYSDBA:
sqlplus / as sysdba
or
sqlplus /nolog
CONNECT sys/password@host:port AS SYSDBA;
2. Check whether archive logging is enabled:
archive log list;
("Database log mode: No Archive Mode" means archive logging is disabled.)
("Database log mode: Archive Mode" means archive logging is enabled.)
3. Configure the archive log parameters (size and location of the recovery area):
alter system set db_recovery_file_dest_size = 100G;
alter system set db_recovery_file_dest ='/opt/oracle/oradata/recovery_area' scope=spfile;
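4. Create the tablespace used by the CDC user:
CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
The SID directory must be created in advance as root and given read/write permissions (chmod 777).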
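5. Enable archive logging:
-- stop the database
shutdown immediate;
-- start the instance and mount the database without opening it
startup mount;
-- switch the database to ARCHIVELOG mode
alter database archivelog;
-- open the database
alter database open;
6. Once the database is open, run archive log list; again to confirm that archiving is now enabled.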
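The same check can also be run from application code, for example as a pre-flight check before starting the Flink job. The sketch below is a minimal, hypothetical helper (class and method names are made up for illustration) that reads the LOG_MODE column of V$DATABASE over plain JDBC. It assumes an Oracle JDBC driver on the classpath and a connection opened elsewhere by a user with SELECT on V_$DATABASE, such as the flinkuser created in 1.2.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Locale;

/** Hypothetical pre-flight check: is the Oracle database in ARCHIVELOG mode? */
final class LogModeCheck {

    private LogModeCheck() {}

    /** Interprets a V$DATABASE.LOG_MODE value; only "ARCHIVELOG" counts as enabled. */
    static boolean isArchiveLogMode(String logMode) {
        if (logMode == null) {
            throw new IllegalArgumentException("LOG_MODE must not be null");
        }
        return logMode.trim().toUpperCase(Locale.ROOT).equals("ARCHIVELOG");
    }

    /** Queries LOG_MODE over an existing connection (requires SELECT on V_$DATABASE). */
    static boolean queryArchiveLogMode(Connection connection) throws SQLException {
        try (Statement stmt = connection.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT LOG_MODE FROM V$DATABASE")) {
            if (!rs.next()) {
                throw new SQLException("V$DATABASE returned no rows");
            }
            return isArchiveLogMode(rs.getString(1));
        }
    }
}
```

Until step 5 has been completed, queryArchiveLogMode(conn) returns false, since LOG_MODE still reads NOARCHIVELOG.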
1.2 Create a dedicated user for CDC
The account is flinkuser with password flinkpw.
(Replace the user, password, host and port in the commands with your own values.)
sqlplus sys/password@host:port/SID AS SYSDBA
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE TO flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
GRANT CREATE TABLE TO flinkuser;
-- not needed if scan.incremental.snapshot.enabled=true (the default)
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
exit;
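Because the user name appears in every statement, it can be convenient to generate the script instead of editing it by hand. The following is a hypothetical generator (not part of Flink CDC) that emits the same GRANT statements as above for a given user and leaves out LOCK ANY TABLE when incremental snapshot reading is enabled. It deliberately skips CREATE USER so that no password ends up in code, and its identifier check is only a basic sanity check, not Oracle's full naming rules.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.regex.Pattern;

/** Hypothetical generator for the section 1.2 GRANT statements. */
final class CdcGrantScript {

    // Basic sanity check for an unquoted identifier (not the complete Oracle rules)
    private static final Pattern IDENTIFIER = Pattern.compile("[A-Za-z][A-Za-z0-9_$#]{0,127}");

    private static final String[] SYSTEM_PRIVILEGES = {
        "CREATE SESSION", "SET CONTAINER", "FLASHBACK ANY TABLE", "SELECT ANY TABLE",
        "SELECT_CATALOG_ROLE", "EXECUTE_CATALOG_ROLE", "SELECT ANY TRANSACTION",
        "LOGMINING", "CREATE TABLE", "ALTER ANY TABLE", "CREATE SEQUENCE"
    };

    private static final String[] OBJECT_GRANTS = {
        "SELECT ON V_$DATABASE", "EXECUTE ON DBMS_LOGMNR", "EXECUTE ON DBMS_LOGMNR_D",
        "SELECT ON V_$LOG", "SELECT ON V_$LOG_HISTORY", "SELECT ON V_$LOGMNR_LOGS",
        "SELECT ON V_$LOGMNR_CONTENTS", "SELECT ON V_$LOGMNR_PARAMETERS",
        "SELECT ON V_$LOGFILE", "SELECT ON V_$ARCHIVED_LOG", "SELECT ON V_$ARCHIVE_DEST_STATUS"
    };

    private CdcGrantScript() {}

    /** incrementalSnapshot mirrors scan.incremental.snapshot.enabled. */
    static List<String> build(String user, boolean incrementalSnapshot) {
        if (user == null || !IDENTIFIER.matcher(user).matches()) {
            throw new IllegalArgumentException("Unsupported user name: " + user);
        }
        String u = user.toUpperCase(Locale.ROOT);
        List<String> sql = new ArrayList<>();
        for (String privilege : SYSTEM_PRIVILEGES) {
            sql.add("GRANT " + privilege + " TO " + u + ";");
        }
        // LOCK ANY TABLE is only needed when incremental snapshot reading is disabled
        if (!incrementalSnapshot) {
            sql.add("GRANT LOCK ANY TABLE TO " + u + ";");
        }
        for (String grant : OBJECT_GRANTS) {
            sql.add("GRANT " + grant + " TO " + u + ";");
        }
        return sql;
    }
}
```

Each returned line can be pasted into the sqlplus session after the CREATE USER statement.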
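1.3 Enabling supplemental logging for specific tables or the whole database
-- Enable supplemental logging for a specific table:
ALTER TABLE schema_name.table_name ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Enable supplemental logging of all columns for every table in the database:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Enable (minimal) supplemental logging at the database level:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
-- Force a log switch so the new settings apply from the next redo log:
ALTER SYSTEM SWITCH LOGFILE;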
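When only a few tables are captured, the per-table statements can be generated from the same SCHEMA.TABLE names that are passed to the connector. A minimal sketch with a hypothetical class name, assuming unquoted Oracle identifiers: it emits the database-level minimal supplemental logging statement once, one ALTER TABLE per table, and a single log switch at the end.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical builder for the supplemental logging DDL of section 1.3. */
final class SupplementalLogDdl {

    private SupplementalLogDdl() {}

    /** qualifiedTables: entries in SCHEMA.TABLE form, e.g. "DBTEST.TBTEST1". */
    static List<String> forTables(List<String> qualifiedTables) {
        List<String> ddl = new ArrayList<>();
        // Minimal supplemental logging at database level, issued once
        ddl.add("ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;");
        for (String qualified : qualifiedTables) {
            String[] parts = qualified.trim().split("\\.");
            if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
                throw new IllegalArgumentException("Expected SCHEMA.TABLE but got: " + qualified);
            }
            ddl.add("ALTER TABLE " + parts[0] + "." + parts[1]
                    + " ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;");
        }
        // One log switch after all tables have been altered
        ddl.add("ALTER SYSTEM SWITCH LOGFILE;");
        return ddl;
    }
}
```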
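2. Oracle CDC connector
2.1 Flink core (DataStream API)
With default settings, the Oracle CDC connector showed a large data delay in Flink. Adding a few Debezium parameters reduces it; the code below already includes this configuration.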
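import java.util.Properties;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.connectors.oracle.OracleSource;
import com.ververica.cdc.connectors.oracle.table.StartupOptions;

Properties properties = new Properties();
// Connection parameters: treat table names as case-sensitive
properties.setProperty("database.tablename.case.insensitive", "false");
properties.setProperty("database.connection.adapter", "logminer");
// Reduce Oracle CDC latency: read the data dictionary from the online catalog
properties.setProperty("log.mining.strategy", "online_catalog");
// Continuous mining; Oracle desupported LogMiner CONTINUOUS_MINE in 19c, so use this on older releases only
properties.setProperty("log.mining.continuous.mine", "true");

// Create the stream execution environment
Configuration configuration = new Configuration();
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(configuration);

// Oracle connector configuration
SourceFunction<String> build = OracleSource.<String>builder()
        .hostname("localhost")
        .port(1521)
        .database("XE")                                 // monitor the XE database
        .schemaList("DBTEST")                           // schema must match the prefix used in tableList
        .tableList("DBTEST.tbtest1", "DBTEST.TBTEST2")  // monitored tables
        .username("flinkuser")
        .password("flinkpw")
        // read from the latest offset; use StartupOptions.initial() to take a snapshot first
        .startupOptions(StartupOptions.latest())
        // user-defined deserializer that converts SourceRecord to a JSON string
        .deserializer(new FlinkCdcDataDeserializationSchema())
        .debeziumProperties(properties)
        .build();

env.setParallelism(2);
DataStreamSource<String> stringDataStreamSource = env.addSource(build);
stringDataStreamSource.addSink(new CustomSink());  // user-defined sink
env.execute();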
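To check whether the Debezium settings actually reduce the delay, one option is to measure the lag between the time a change was made in Oracle and the time Flink processes it, for example inside CustomSink. The sketch below is a hypothetical, framework-free tracker. It assumes the deserializer exposes the change timestamp (Debezium change events carry it as source.ts_ms), which depends on how FlinkCdcDataDeserializationSchema is written. Negative values caused by clock skew between the database and Flink hosts are clamped to zero.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical lag tracker: use one instance per sink subtask (not thread-safe). */
final class ChangeLagTracker {
    private long count;
    private long totalMillis;
    private long maxMillis;

    /** eventTs: when the change happened in Oracle; processedAt: when Flink handled it. */
    void record(Instant eventTs, Instant processedAt) {
        // Clamp negative lag (clock skew between hosts) to zero
        long lag = Math.max(0L, Duration.between(eventTs, processedAt).toMillis());
        count++;
        totalMillis += lag;
        maxMillis = Math.max(maxMillis, lag);
    }

    long count() {
        return count;
    }

    long maxMillis() {
        return maxMillis;
    }

    double averageMillis() {
        return count == 0 ? 0.0 : (double) totalMillis / count;
    }
}
```

Comparing averageMillis() and maxMillis() before and after adding the Debezium properties gives a rough measure of whether the tuning helped.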
2.2 Flink SQL
CREATE TABLE products (
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
    table_name STRING METADATA FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    ID INT NOT NULL,
    NAME STRING,
    DESCRIPTION STRING,
    WEIGHT DECIMAL(10, 3),
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = 'localhost',
    'port' = '1521',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'XE',
    'schema-name' = 'test',
    'table-name' = 'tbtest',
    'debezium.log.mining.strategy' = 'online_catalog',
    -- LogMiner CONTINUOUS_MINE is desupported from Oracle 19c
    'debezium.log.mining.continuous.mine' = 'true'
);
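The two tuning options above are the same Debezium keys that 2.1 sets through Properties, only with a debezium. prefix. If the tuning properties are kept in one Properties object, a small hypothetical helper like the following can render them as WITH-clause entries so the DataStream and SQL jobs stay in sync. It sorts keys for stable output and escapes single quotes in values by doubling them, as in SQL string literals.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Hypothetical helper: Debezium Properties to Flink SQL WITH-clause entries. */
final class DebeziumSqlOptions {

    private DebeziumSqlOptions() {}

    /** Prefixes every key with "debezium." (TreeMap keeps the output order stable). */
    static Map<String, String> toSqlOptions(Properties debeziumProps) {
        Map<String, String> options = new TreeMap<>();
        for (String key : debeziumProps.stringPropertyNames()) {
            options.put("debezium." + key, debeziumProps.getProperty(key));
        }
        return options;
    }

    /** Renders entries as 'key' = 'value' lines separated by commas. */
    static String render(Map<String, String> options) {
        return options.entrySet().stream()
                .map(e -> "'" + e.getKey() + "' = '" + e.getValue().replace("'", "''") + "'")
                .collect(Collectors.joining(",\n"));
    }
}
```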
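3. Additional notes
3.1 Oracle notes
Check the instance status:
SQL> select status from v$instance;
Open the database:
SQL> alter database open;
## If startup fails with ORA-01261 or ORA-01263 (typically an invalid or missing destination directory, such as the db_recovery_file_dest path from step 3), start with an explicit pfile (adjust the path to your installation):
startup pfile='/oracle/app/oracle/admin/win01/pfile/init.ora' mount;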
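The status returned by v$instance indicates which step is still missing before the database is usable: STARTED after startup nomount, MOUNTED after startup mount, and OPEN once it is open (OPEN MIGRATE after an upgrade open). As an illustration only, a hypothetical helper that maps the status to the next statement:

```java
import java.util.Locale;
import java.util.Optional;

/** Hypothetical helper: next statement needed to bring the database to OPEN. */
final class InstanceStatusAdvisor {

    private InstanceStatusAdvisor() {}

    static Optional<String> nextStep(String status) {
        if (status == null) {
            throw new IllegalArgumentException("status must not be null");
        }
        switch (status.trim().toUpperCase(Locale.ROOT)) {
            case "STARTED":  // after startup nomount: control file not mounted yet
                return Optional.of("alter database mount;");
            case "MOUNTED":  // e.g. after startup mount in step 5
                return Optional.of("alter database open;");
            case "OPEN":
            case "OPEN MIGRATE":
                return Optional.empty();  // already open, nothing to do
            default:
                throw new IllegalArgumentException("Unexpected instance status: " + status);
        }
    }
}
```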
3.2 Official documentation
https://ververica.github.io/flink-cdc-connectors/master/content/connectors/oracle-cdc.html
Copyright belongs to the original author yuhh_. In case of infringement, please contact us for removal.