

Flink CDC Series: Real-Time Data Capture from Oracle

Installing and Deploying Oracle

Note: this deployment runs Oracle 11g in Docker.

1. Pull the Oracle Docker image

docker pull registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g

2. Run the image and start the container

docker run -d -p 1521:1521 --name oracle11g registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g

3. Check that the container is running
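For example, filter the container list by the name chosen in step 2:

docker ps --filter name=oracle11g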

4. Enter the container to adjust the Oracle configuration

docker exec -it oracle11g /bin/bash

(you can also use the container ID shown by docker ps in place of the name)

5. Configure the Oracle home and instance (SID) environment variables

su root

The root password is: helowin

vi /etc/profile

Add the following configuration:

export ORACLE_HOME=/home/oracle/app/oracle/product/11.2.0/dbhome_2
export ORACLE_SID=helowin
export PATH=$ORACLE_HOME/bin:$PATH

Note: run source /etc/profile afterwards so the variables take effect.
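A quick sanity check that the variables set above are visible in the current shell:

source /etc/profile
echo $ORACLE_HOME
echo $ORACLE_SID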

6. Log in, create a user, and grant privileges

su oracle

sqlplus /nolog (followed by conn / as sysdba), or log in directly with sqlplus / as sysdba

SQL>alter user system identified by system;

SQL>alter user sys identified by sys;

SQL>ALTER PROFILE DEFAULT LIMIT PASSWORD_LIFE_TIME UNLIMITED;

SQL>create user test identified by test;

SQL>grant connect,resource,dba to test;
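A quick way to verify the new account from inside the container, connecting to the local instance configured above:

sqlplus test/test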

7. Connect with a Navicat client (host: the Docker host, port: 1521, SID: helowin)

Preparing Oracle for Real-Time Data Capture

Enable archive logging

First, log in to the Oracle client with the sysdba role: sqlplus / as sysdba

1. Check whether archive logging is enabled

SELECT dbid, name, log_mode FROM v$database;

Explanation:

NAME: the database name

LOG_MODE: the logging mode; possible values:

  1. NOARCHIVELOG
  2. ARCHIVELOG
  3. MANUAL

Note: NOARCHIVELOG means archiving is disabled, ARCHIVELOG means automatic archiving is enabled, and MANUAL means manual archiving is enabled.

ARCHIVE LOG LIST;

Explanation:

Automatic archival: whether automatic archiving is enabled

Next log sequence to archive: the sequence number of the next log to be archived

It is worth clarifying the difference between archive logs and redo logs here:

Redo log:

  1. Records changes to the database (DML, DDL)
  2. Used for recovery
  3. Redo files are managed in groups, with a minimum of two groups used in rotation
  4. Should be stored on a different disk from the data files, and that disk needs fast read/write speed (otherwise redo becomes a bottleneck)

To find the redo log locations: select member from V$LOGFILE;

Archive log:

As the redo logs cycle, their contents are continuously copied into the archive, forming individual archive files. Redo logs have a fixed size, while the space available for archive files is configurable: alter system set db_recovery_file_dest_size = 10G;

To view the archive log location: show parameter recover;

To view archive log space usage: select * from V$FLASH_RECOVERY_AREA_USAGE;

2. If archive logging is not enabled, enable it as follows

SQL>alter system set db_recovery_file_dest_size = 10G;
SQL>alter system set db_recovery_file_dest = '/home/oracle/oracle-data-test' scope=spfile;
SQL>shutdown immediate;
SQL>startup mount;
SQL>alter database archivelog;
SQL>alter database open;

3. Check again that it is enabled

SQL>archive log list;
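If archiving is now on, the first lines of the output should look something like this (the destination reflects the db_recovery_file_dest set above):

Database log mode              Archive Mode
Automatic archival             Enabled
Archive destination            USE_DB_RECOVERY_FILE_DEST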

Creating the User Tablespace

1. Create the tablespace

CREATE TABLESPACE logminer_tbs DATAFILE '/home/oracle/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
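To confirm the tablespace was created, a quick check against the standard data dictionary view:

SELECT tablespace_name FROM dba_tablespaces WHERE tablespace_name = 'LOGMINER_TBS';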

2. Create the user and grant privileges

CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;

grant connect,resource,dba to flinkuser;

GRANT CREATE SESSION TO flinkuser;
GRANT SELECT ON V_$DATABASE TO flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT EXECUTE ON SYS.DBMS_LOGMNR TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT CREATE TABLE TO flinkuser;
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;

GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;

GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
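To sanity-check that the privileges landed on the right account, you can query the data dictionary (grantee names are stored in uppercase):

SELECT privilege FROM dba_sys_privs WHERE grantee = 'FLINKUSER';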

3. Load sample data

DROP TABLE "FLINKUSER"."ORDERS";
CREATE TABLE "FLINKUSER"."ORDERS" (
"ORDER_ID" NUMBER(9) NOT NULL ,
"ORDER_DATE" TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL ,
"CUSTOMER_NAME" VARCHAR2(255 BYTE) NOT NULL ,
"PRICE" NUMBER(10,5) NOT NULL ,
"PRODUCT_ID" NUMBER(9) NOT NULL ,
"ORDER_STATUS" NUMBER(1) NOT NULL
)
TABLESPACE "LOGMINER_TBS"
LOGGING
NOCOMPRESS
PCTFREE 10
INITRANS 1
STORAGE (
INITIAL 65536
NEXT 1048576
MINEXTENTS 1
MAXEXTENTS 2147483645
BUFFER_POOL DEFAULT
)
PARALLEL 1
NOCACHE
DISABLE ROW MOVEMENT
;


-- Records of "ORDERS"


INSERT INTO "FLINKUSER"."ORDERS" VALUES ('10001', TO_TIMESTAMP('2020-07-30 18:08:22.000', 'SYYYY-MM-DD HH24:MI:SS:FF3'), 'Jark', '50.5', '102', '0');
INSERT INTO "FLINKUSER"."ORDERS" VALUES ('10002', TO_TIMESTAMP('2020-07-30 18:11:09.000', 'SYYYY-MM-DD HH24:MI:SS:FF3'), 'Sally', '15', '105', '0');
INSERT INTO "FLINKUSER"."ORDERS" VALUES ('10003', TO_TIMESTAMP('2020-07-30 20:00:30.000', 'SYYYY-MM-DD HH24:MI:SS:FF3'), 'Edward', '25.25', '106', '0');


-- Primary Key structure for table ORDERS


ALTER TABLE "FLINKUSER"."ORDERS" ADD CONSTRAINT "SYS_C006996" PRIMARY KEY ("ORDER_ID");


-- Checks structure for table ORDERS


ALTER TABLE "FLINKUSER"."ORDERS" ADD CONSTRAINT "SYS_C006990" CHECK ("ORDER_ID" IS NOT NULL) NOT DEFERRABLE INITIALLY IMMEDIATE NORELY VALIDATE;
ALTER TABLE "FLINKUSER"."ORDERS" ADD CONSTRAINT "SYS_C006991" CHECK ("ORDER_DATE" IS NOT NULL) NOT DEFERRABLE INITIALLY IMMEDIATE NORELY VALIDATE;
ALTER TABLE "FLINKUSER"."ORDERS" ADD CONSTRAINT "SYS_C006992" CHECK ("CUSTOMER_NAME" IS NOT NULL) NOT DEFERRABLE INITIALLY IMMEDIATE NORELY VALIDATE;
ALTER TABLE "FLINKUSER"."ORDERS" ADD CONSTRAINT "SYS_C006993" CHECK ("PRICE" IS NOT NULL) NOT DEFERRABLE INITIALLY IMMEDIATE NORELY VALIDATE;
ALTER TABLE "FLINKUSER"."ORDERS" ADD CONSTRAINT "SYS_C006994" CHECK ("PRODUCT_ID" IS NOT NULL) NOT DEFERRABLE INITIALLY IMMEDIATE NORELY VALIDATE;
ALTER TABLE "FLINKUSER"."ORDERS" ADD CONSTRAINT "SYS_C006995" CHECK ("ORDER_STATUS" IS NOT NULL) NOT DEFERRABLE INITIALLY IMMEDIATE NORELY VALIDATE;

4. Enable supplemental logging

Database level: ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;

Table level, applied to the table being captured: ALTER TABLE FLINKUSER.ORDERS ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
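To confirm that database-level supplemental logging is active (the column returns YES once enabled):

SELECT supplemental_log_data_min FROM v$database;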

Flink SQL Real-Time Capture Demo

Versions used:

Flink: 1.16.1
Flink CDC: 2.3.0

Flink Table/SQL program

1. pom.xml

<properties>
    <flink.version>1.16.1</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
     <!-- Flink dependencies -->
     <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-core</artifactId>
         <version>${flink.version}</version>
     </dependency>

    <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-streaming-java</artifactId>
         <version>${flink.version}</version>
     </dependency>

    <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-table-common</artifactId>
         <version>${flink.version}</version>
     </dependency>

    <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-table-api-java-bridge</artifactId>
         <version>${flink.version}</version>
     </dependency>

    <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-connector-base</artifactId>
         <version>${flink.version}</version>
     </dependency>

    <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
         <version>${flink.version}</version>
     </dependency>
     <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-clients</artifactId>
         <version>${flink.version}</version>
     </dependency>

    <dependency>
         <groupId>com.ververica</groupId>
         <artifactId>flink-sql-connector-oracle-cdc</artifactId>
         <version>2.3.0</version>
     </dependency>

    <!-- web UI -->
     <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-runtime-web</artifactId>
         <version>${flink.version}</version>
     </dependency>

    <!-- Logging -->
     <!-- Required logback dependencies -->
     <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
         <version>1.7.30</version>
         <scope>compile</scope>
     </dependency>
     <dependency>
         <groupId>ch.qos.logback</groupId>
         <artifactId>logback-core</artifactId>
         <version>1.2.3</version>
     </dependency>
     <dependency>
         <groupId>ch.qos.logback</groupId>
         <artifactId>logback-classic</artifactId>
         <version>1.2.3</version>
     </dependency>
 </dependencies>

2. Table API

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSqlCDCOracleSourceExample {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Set the local port that the Web UI binds to
        conf.setString(RestOptions.BIND_PORT, "8081");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql(" CREATE TABLE orders (\n" +
                "   ORDER_ID INT,\n" +
                "   ORDER_DATE TIMESTAMP_LTZ(3),\n" +
                "   CUSTOMER_NAME STRING,\n" +
                "   PRICE DECIMAL(10, 5),\n" +
                "   PRODUCT_ID INT,\n" +
                "   ORDER_STATUS BOOLEAN\n" +
                " ) WITH (\n" +
                "   'connector' = 'oracle-cdc',\n" +
                "   'hostname' = 's171',\n" +
                "   'port' = '1521',\n" +
                "   'username' = 'flinkuser',\n" +
                "   'password' = 'flinkpw',\n" +
                "   'database-name' = 'HELOWIN',\n" +
                "   'schema-name' = 'flinkuser',\n" +
                "   'table-name' = 'orders',\n" +
                "   'scan.startup.mode' = 'latest-offset',\n" +
                "   'debezium.poll.interval.ms' = '1000',\n" +
                "   'debezium.log.mining.strategy' = 'online_catalog',\n" +
                "   'debezium.log.mining.continuous.mine' = 'true'\n" +
                " )");

        Table table = tableEnv.sqlQuery("select * from orders");

        // Legacy alternative (requires an Orders POJO):
        // tableEnv.toRetractStream(table, Orders.class).print("Oracle cdc: ").setParallelism(1);
        tableEnv.toChangelogStream(table).print("Oracle cdc: ").setParallelism(1);

        env.execute("Flink CDC: Oracle -> Print");
    }
}

3. Run the program

Insert a new row: INSERT INTO "FLINKUSER"."ORDERS"("ORDER_ID", "ORDER_DATE", "CUSTOMER_NAME", "PRICE", "PRODUCT_ID", "ORDER_STATUS") VALUES ('10006', TO_TIMESTAMP('2020-07-30 20:00:30.000', 'SYYYY-MM-DD HH24:MI:SS:FF3'), 'Edward', '25.25', '106', '0');
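Since 'scan.startup.mode' = 'latest-offset', the job only captures changes made after startup, so this insert should appear on stdout as an insert-kind changelog record, roughly like the following (exact field formatting may vary):

Oracle cdc: > +I[10006, 2020-07-30T20:00:30, Edward, 25.25000, 106, false]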

Flink Web UI

http://localhost:8081


Reposted from: https://blog.csdn.net/Tanger_Xiaolin/article/details/134178077
Copyright belongs to the original author 纤竹有泪. In case of infringement, please contact us for removal.
