Flink CDC: real-time capture of Oracle archive logs and tuning parameters for Oracle data latency

1. Oracle configuration

1.1 Enabling Oracle archive logging

1. Run the following in a terminal on the database server

sqlplus / as sysdba
or
sqlplus /nolog
CONNECT sys/password@host:port AS SYSDBA;

2. Check whether archive logging is enabled

archive log list;

("Database log mode: No Archive Mode" means archive logging is disabled)
("Database log mode: Archive Mode" means archive logging is enabled)
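If this check is scripted rather than read by eye, the status line can be tested programmatically. The following is a minimal sketch, not part of the original setup: `ArchiveLogCheck` and `isArchiveModeEnabled` are hypothetical names, and the helper assumes the output of `archive log list;` has already been captured as a string.

```java
import java.util.Locale;

public class ArchiveLogCheck {

    /** Returns true when the captured "archive log list" output reports Archive Mode. */
    public static boolean isArchiveModeEnabled(String output) {
        for (String line : output.split("\\R")) {
            // Normalize case and collapse the column padding SQL*Plus inserts.
            String normalized = line.trim().toLowerCase(Locale.ROOT).replaceAll("\\s+", " ");
            if (normalized.startsWith("database log mode")) {
                // "No Archive Mode" also contains "archive mode", so rule it out first.
                return !normalized.contains("no archive mode")
                        && normalized.contains("archive mode");
            }
        }
        return false; // status line not found: treat as not enabled
    }

    public static void main(String[] args) {
        System.out.println(isArchiveModeEnabled("Database log mode              Archive Mode"));
        System.out.println(isArchiveModeEnabled("Database log mode              No Archive Mode"));
    }
}
```

The explicit test for "no archive mode" matters because a plain substring match on "archive mode" would report both states as enabled.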

3. Configure the archive log parameters

alter system set db_recovery_file_dest_size = 100G;
alter system set db_recovery_file_dest ='/opt/oracle/oradata/recovery_area' scope=spfile;

4. Create a tablespace

CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
The SID directory must be created beforehand as the root user and given read/write permission: chmod 777

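5. Enable archive logging

-- stop the Oracle instance
shutdown immediate;
-- start the instance and mount the database
startup mount;
-- enable archive logging for the database
alter database archivelog;
alter database open;

6. Once the database is open, run archive log list; again to confirm that archive logging is enabled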

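1.2 Create a dedicated user for CDC

The account is flinkuser and the password is flinkpw.
(Replace the user, password, host, and port in the commands with your own values.)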

sqlplus sys/password@host:port/SID AS SYSDBA;
  CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
  GRANT CREATE SESSION TO flinkuser;
  GRANT SET CONTAINER TO flinkuser;
  GRANT SELECT ON V_$DATABASE TO flinkuser;
  GRANT FLASHBACK ANY TABLE TO flinkuser;
  GRANT SELECT ANY TABLE TO flinkuser;
  GRANT SELECT_CATALOG_ROLE TO flinkuser;
  GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
  GRANT SELECT ANY TRANSACTION TO flinkuser;
  GRANT LOGMINING TO flinkuser;
  GRANT CREATE TABLE TO flinkuser;
  -- not needed if scan.incremental.snapshot.enabled=true (the default)
  GRANT LOCK ANY TABLE TO flinkuser;
  GRANT ALTER ANY TABLE TO flinkuser;
  GRANT CREATE SEQUENCE TO flinkuser;
  GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
  GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
  GRANT SELECT ON V_$LOG TO flinkuser;
  GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
  GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
  GRANT SELECT ON V_$LOGFILE TO flinkuser;
  GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
  GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
  exit;

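1.3 Enable supplemental logging at the table or database level

-- Enable supplemental logging for a specific table:
ALTER TABLE database.table ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Enable it for all tables in the database:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Enable supplemental logging for the database:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
-- Apply the change by switching the log file:
ALTER SYSTEM SWITCH LOGFILE;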
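
When many tables are monitored, writing one ALTER TABLE statement per table by hand is error-prone. The sketch below generates the per-table statements from a list of names. It is an illustration only: `SupplementalLogScript` and `forTables` are hypothetical names, the table names in `main` are the ones used later in this article, and the identifier check is a deliberately conservative assumption rather than Oracle's full naming rule.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SupplementalLogScript {

    /** Builds one ALTER TABLE ... ADD SUPPLEMENTAL LOG DATA statement per table. */
    public static List<String> forTables(List<String> tables) {
        List<String> statements = new ArrayList<>();
        for (String table : tables) {
            // Accept only plain SCHEMA.TABLE identifiers so no malformed SQL is emitted.
            if (!table.matches("[A-Za-z][A-Za-z0-9_$#]*\\.[A-Za-z][A-Za-z0-9_$#]*")) {
                throw new IllegalArgumentException("Expected SCHEMA.TABLE, got: " + table);
            }
            statements.add("ALTER TABLE " + table + " ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;");
        }
        return statements;
    }

    public static void main(String[] args) {
        // Print the statements so they can be pasted into a SQL*Plus session.
        for (String sql : forTables(Arrays.asList("DBTEST.TBTEST1", "DBTEST.TBTEST2"))) {
            System.out.println(sql);
        }
    }
}
```

The output is plain text to be reviewed and run manually; the helper does not connect to the database.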

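2. Oracle CDC Connector

2.1 Flink core

With the default settings, the Oracle CDC connector in Flink shows noticeable data latency. Adding Debezium properties reduces it; all of the code below already includes this configuration.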

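import java.util.Properties;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.connectors.oracle.OracleSource;

Properties properties = new Properties();
// Database connection parameters (table-name case handling)
properties.setProperty("database.tablename.case.insensitive", "false");
properties.setProperty("database.connection.adapter", "logminer");
// Reduce Oracle CDC latency
properties.setProperty("log.mining.strategy", "online_catalog");
properties.setProperty("log.mining.continuous.mine", "true");

// Create the stream environment
Configuration configuration = new Configuration();
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment(configuration);

// Oracle connector configuration
SourceFunction<String> build = OracleSource.<String>builder()
        .hostname("localhost")
        .port(1521)
        .database("XE") // monitor XE database
        .schemaList("test") // monitor test schema
        .tableList("DBTEST.tbtest1," + "DBTEST.TBTEST2") // monitor tables
        .username("flinkuser")
        .password("flinkpw")
        // Read from the latest position; switch between initial() and latest() as needed
        .startupOptions(com.ververica.cdc.connectors.oracle.table.StartupOptions.latest())
        // Custom class (not shown here): converts SourceRecord to JSON String
        .deserializer(new FlinkCdcDataDeserializationSchema())
        .debeziumProperties(properties)
        .build();

env.setParallelism(2);

DataStreamSource<String> stringDataStreamSource = env.addSource(build);

// CustomSink is a user-defined sink (not shown here)
stringDataStreamSource.addSink(new CustomSink());

env.execute();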

2.2 Flink SQL

CREATE TABLE products (
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
    table_name STRING METADATA FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    ID INT NOT NULL,
    NAME STRING,
    DESCRIPTION STRING,
    WEIGHT DECIMAL(10, 3),
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = 'localhost',
    'port' = '1521',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'XE',
    'schema-name' = 'test',
    'table-name' = 'tbtest',
    'debezium.log.mining.strategy' = 'online_catalog',
    'debezium.log.mining.continuous.mine' = 'true'
);
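
The two latency settings appear in both styles: as plain Debezium keys in the DataStream code, and with a `debezium.` prefix in the SQL WITH clause. The sketch below makes that mapping explicit using only the JDK. `DebeziumOptions`, `lowLatencyProperties`, and `toSqlOptions` are hypothetical names introduced for illustration; the keys and values are the ones used in this article.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class DebeziumOptions {

    /** The Debezium properties this article uses to reduce Oracle CDC latency. */
    public static Properties lowLatencyProperties() {
        Properties props = new Properties();
        props.setProperty("log.mining.strategy", "online_catalog");
        props.setProperty("log.mining.continuous.mine", "true");
        return props;
    }

    /** Prefixes each key with "debezium." to match the option names in the SQL WITH clause. */
    public static Map<String, String> toSqlOptions(Properties props) {
        Map<String, String> options = new TreeMap<>(); // sorted for stable output
        for (String key : props.stringPropertyNames()) {
            options.put("debezium." + key, props.getProperty(key));
        }
        return options;
    }

    public static void main(String[] args) {
        // Print each option in the 'key' = 'value' form used inside WITH (...).
        toSqlOptions(lowLatencyProperties())
                .forEach((k, v) -> System.out.println("'" + k + "' = '" + v + "'"));
    }
}
```

Keeping the properties in one place like this is one way to make sure a DataStream job and a SQL job are tuned identically.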

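3. Additional notes

3.1 Oracle notes

-- check the instance status
SQL> select status from v$instance;
-- open the database
SQL> alter database open;

## If startup fails with error code 1261 or 1263, start with an explicit pfile (adjust the path to your installation)
startup mount pfile='/oracle/app/oracle/admin/win01/pfile/init.ora';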

3.2 Official documentation

https://ververica.github.io/flink-cdc-connectors/master/content/connectors/oracle-cdc.html

Reposted from: https://blog.csdn.net/haoheiao/article/details/128657518
Copyright belongs to the original author, yuhh_. If this infringes your rights, please contact us for removal.