Installing and deploying Oracle
Note: this deployment uses Docker and Oracle 11g.
1. Pull the Oracle Docker image
docker pull registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g
2. Run the Oracle image to start a container
docker run -d -p 1521:1521 --name oracle11g registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g
3. Check that the container has started (for example with docker ps)
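One rough way to confirm from the host that the container is reachable is to attempt a TCP connection to the published port. The sketch below is a minimal example, assuming the container runs on the local machine with port 1521 published as in step 2. The class name PortCheck and the 2-second timeout are illustrative choices, not part of the original setup. An open port only shows that something is listening; it does not prove that the database has finished starting.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the container runs on this machine and publishes port 1521.
        boolean open = isOpen("localhost", 1521, 2000);
        System.out.println(open
                ? "Port 1521 is accepting connections"
                : "Port 1521 is not reachable yet");
    }
}
```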
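4. Enter the container to modify the Oracle settings (replace the container ID below with your own, or use the container name oracle11g)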
docker exec -it b8fc967bec2d /bin/bash
5. Configure the Oracle home and instance environment variables
su root
Password for the root user: helowin
vi /etc/profile
Add the following settings:
export ORACLE_HOME=/home/oracle/app/oracle/product/11.2.0/dbhome_2
export ORACLE_SID=helowin
export PATH=$ORACLE_HOME/bin:$PATH
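Note: run source /etc/profile afterwards so that the settings take effect.
6. Log in, create a user, and grant privileges
su oracle
sqlplus / as sysdba (or sqlplus /nolog, followed by conn / as sysdba at the SQL> prompt)
SQL>alter user system identified by system;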
SQL>alter user sys identified by sys;
SQL>ALTER PROFILE DEFAULT LIMIT PASSWORD_LIFE_TIME UNLIMITED;
SQL>create user test identified by test;
SQL>grant connect,resource,dba to test;
7. Connect with the Navicat client
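A client needs the host, the published port 1521, the SID helowin (the ORACLE_SID set in step 5), and an account such as the test user created in step 6. JDBC-based tools usually combine the first three into a thin-driver URL in SID form. The sketch below is a minimal illustration; the host localhost and the class name OracleJdbcUrl are assumptions, not taken from the original setup.

```java
public class OracleJdbcUrl {

    /** Builds a thin-driver URL in SID form: jdbc:oracle:thin:@host:port:SID */
    public static String forSid(String host, int port, String sid) {
        return "jdbc:oracle:thin:@" + host + ":" + port + ":" + sid;
    }

    public static void main(String[] args) {
        // Assumption: the container is reachable on localhost.
        System.out.println(forSid("localhost", 1521, "helowin"));
    }
}
```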
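Preparing Oracle for real-time data capture
Enabling archive logging
First log in to the Oracle client with the sysdba role: sqlplus / as sysdba
1. Check whether archive logging is enabled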
SELECT dbid, name, log_mode FROM v$database;
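Explanation:
NAME: the database name
LOG_MODE: the log mode
- NOARCHIVELOG
- ARCHIVELOG
- MANUAL
Note: NOARCHIVELOG means archiving is disabled, ARCHIVELOG means automatic archiving is enabled, and MANUAL means manual archiving is enabled.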
ARCHIVE LOG LIST;
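Explanation:
Automatic archival: whether automatic archiving is enabled
Next log sequence to archive: the sequence number of the next log to be archived
It helps to be clear about the difference between redo logs and archive logs.
Redo log:
- Records changes to the database (DML, DDL)
- Used for recovery of data blocks
- Redo files are managed in groups; there are at least two groups, and they are used in rotation
- Should be stored on a different disk from the data files, and on a disk with fast reads and writes (otherwise it becomes a bottleneck)
To find the redo log files: select member from V$LOGFILE;
Archive log:
Because redo log groups have a fixed size and are reused in rotation, Oracle copies each filled group to the archive area before it is overwritten, producing a series of archived log files. The space available to the archive area (the recovery file destination) can be set with: alter system set db_recovery_file_dest_size = 10G;
To see where archived logs are stored: show parameter recover;
To see how much of the recovery area is in use: select * from V$FLASH_RECOVERY_AREA_USAGE;
2. If archive logging is not enabled, enable it as follows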
SQL>alter system set db_recovery_file_dest_size = 10G;
SQL>alter system set db_recovery_file_dest = '/home/oracle/oracle-data-test' scope=spfile;
SQL>shutdown immediate;
SQL>startup mount;
SQL>alter database archivelog;
SQL>alter database open;
3. Check again that archive logging is now enabled
SQL>archive log list;
Creating the tablespace and user
1. Create the tablespace
CREATE TABLESPACE logminer_tbs DATAFILE '/home/oracle/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
2. Create the user and grant privileges
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
grant connect,resource,dba to flinkuser;
GRANT CREATE SESSION TO flinkuser;
GRANT SELECT ON V_$DATABASE to flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT EXECUTE ON SYS.DBMS_LOGMNR TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT CREATE TABLE TO flinkuser;
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
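Every grant has to name the same account that the connector later logs in with, so it can help to generate the statements from a single user name instead of typing it on each line. The sketch below is a minimal illustration under that assumption. The class name GrantScript is hypothetical, and the privilege lists are only a subset of the grants above, chosen to keep the example short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class GrantScript {

    // Illustrative subset of the system privileges granted above.
    private static final List<String> SYSTEM_PRIVILEGES = Arrays.asList(
            "CREATE SESSION",
            "FLASHBACK ANY TABLE",
            "SELECT ANY TABLE",
            "SELECT ANY TRANSACTION");

    // Illustrative subset of the dictionary views granted above.
    private static final List<String> SELECT_VIEWS = Arrays.asList(
            "V_$DATABASE",
            "V_$LOG",
            "V_$LOGFILE",
            "V_$ARCHIVED_LOG",
            "V_$LOGMNR_CONTENTS");

    /** Returns one GRANT statement per privilege, all addressed to the same user. */
    public static List<String> grantsFor(String user) {
        List<String> statements = new ArrayList<>();
        for (String privilege : SYSTEM_PRIVILEGES) {
            statements.add("GRANT " + privilege + " TO " + user + ";");
        }
        for (String view : SELECT_VIEWS) {
            statements.add("GRANT SELECT ON " + view + " TO " + user + ";");
        }
        return statements;
    }

    public static void main(String[] args) {
        // Print the statements so they can be pasted into sqlplus.
        for (String statement : grantsFor("flinkuser")) {
            System.out.println(statement);
        }
    }
}
```

Generating the list this way keeps the user name in one place, which makes a mismatch between the created user and the granted user harder to introduce.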
3. Create the table and load sample data
DROP TABLE "FLINKUSER"."ORDERS";
CREATE TABLE "FLINKUSER"."ORDERS" (
"ORDER_ID" NUMBER(9) NOT NULL ,
"ORDER_DATE" TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL ,
"CUSTOMER_NAME" VARCHAR2(255 BYTE) NOT NULL ,
"PRICE" NUMBER(10,5) NOT NULL ,
"PRODUCT_ID" NUMBER(9) NOT NULL ,
"ORDER_STATUS" NUMBER(1) NOT NULL
)
TABLESPACE "LOGMINER_TBS"
LOGGING
NOCOMPRESS
PCTFREE 10
INITRANS 1
STORAGE (
INITIAL 65536
NEXT 1048576
MINEXTENTS 1
MAXEXTENTS 2147483645
BUFFER_POOL DEFAULT
)
PARALLEL 1
NOCACHE
DISABLE ROW MOVEMENT
;
-- Records of "ORDERS"
INSERT INTO "FLINKUSER"."ORDERS" VALUES ('10001', TO_TIMESTAMP('2020-07-30 18:08:22.000', 'SYYYY-MM-DD HH24:MI:SS:FF3'), 'Jark', '50.5', '102', '0');
INSERT INTO "FLINKUSER"."ORDERS" VALUES ('10002', TO_TIMESTAMP('2020-07-30 18:11:09.000', 'SYYYY-MM-DD HH24:MI:SS:FF3'), 'Sally', '15', '105', '0');
INSERT INTO "FLINKUSER"."ORDERS" VALUES ('10003', TO_TIMESTAMP('2020-07-30 20:00:30.000', 'SYYYY-MM-DD HH24:MI:SS:FF3'), 'Edward', '25.25', '106', '0');
-- Primary Key structure for table ORDERS
ALTER TABLE "FLINKUSER"."ORDERS" ADD CONSTRAINT "SYS_C006996" PRIMARY KEY ("ORDER_ID");
-- Checks structure for table ORDERS
ALTER TABLE "FLINKUSER"."ORDERS" ADD CONSTRAINT "SYS_C006990" CHECK ("ORDER_ID" IS NOT NULL) NOT DEFERRABLE INITIALLY IMMEDIATE NORELY VALIDATE;
ALTER TABLE "FLINKUSER"."ORDERS" ADD CONSTRAINT "SYS_C006991" CHECK ("ORDER_DATE" IS NOT NULL) NOT DEFERRABLE INITIALLY IMMEDIATE NORELY VALIDATE;
ALTER TABLE "FLINKUSER"."ORDERS" ADD CONSTRAINT "SYS_C006992" CHECK ("CUSTOMER_NAME" IS NOT NULL) NOT DEFERRABLE INITIALLY IMMEDIATE NORELY VALIDATE;
ALTER TABLE "FLINKUSER"."ORDERS" ADD CONSTRAINT "SYS_C006993" CHECK ("PRICE" IS NOT NULL) NOT DEFERRABLE INITIALLY IMMEDIATE NORELY VALIDATE;
ALTER TABLE "FLINKUSER"."ORDERS" ADD CONSTRAINT "SYS_C006994" CHECK ("PRODUCT_ID" IS NOT NULL) NOT DEFERRABLE INITIALLY IMMEDIATE NORELY VALIDATE;
ALTER TABLE "FLINKUSER"."ORDERS" ADD CONSTRAINT "SYS_C006995" CHECK ("ORDER_STATUS" IS NOT NULL) NOT DEFERRABLE INITIALLY IMMEDIATE NORELY VALIDATE;
4. Enable supplemental logging
Database level: ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
Table level: ALTER TABLE test.table2 ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; (replace test.table2 with the table to be captured, which in this walkthrough is FLINKUSER.ORDERS)
Real-time data capture with Flink SQL: a demonstration
Component    Version
Flink        1.16.1
Flink CDC    2.3.0
Flink Table/SQL program
1. pom.xml
<properties>
<flink.version>1.16.1</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-oracle-cdc</artifactId>
<version>2.3.0</version>
</dependency>
<!-- web UI -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Logging -->
<!-- Dependencies required by logback -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>
2. Table API
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class FlinkSqlCDCOracleSourceExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Local port that the web UI binds to
        conf.setString(RestOptions.BIND_PORT, "8081");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Register the Oracle table as a CDC source
        tableEnv.executeSql(" CREATE TABLE orders (\n" +
                " ORDER_ID INT,\n" +
                " ORDER_DATE TIMESTAMP_LTZ(3),\n" +
                " CUSTOMER_NAME STRING,\n" +
                " PRICE DECIMAL(10, 5),\n" +
                " PRODUCT_ID INT,\n" +
                " ORDER_STATUS BOOLEAN\n" +
                " ) WITH (\n" +
                " 'connector' = 'oracle-cdc',\n" +
                " 'hostname' = 's171',\n" +
                " 'port' = '1521',\n" +
                " 'username' = 'flinkuser',\n" +
                " 'password' = 'flinkpw',\n" +
                " 'database-name' = 'HELOWIN',\n" +
                " 'schema-name' = 'flinkuser', \n" +
                " 'table-name' = 'orders',\n" +
                " 'scan.startup.mode' = 'latest-offset',\n" +
                " 'debezium.poll.interval.ms' = '1000',\n" +
                " 'debezium.log.mining.strategy' = 'online_catalog',\n" +
                " 'debezium.log.mining.continuous.mine' = 'true'\n" +
                " )");
        Table table = tableEnv.sqlQuery("select * from orders");
        // tableEnv.toRetractStream(table, Orders.class).print("Oracle cdc: ").setParallelism(1);
        // Print every change record to standard output
        tableEnv.toChangelogStream(table).print("Oracle cdc: ").setParallelism(1);
        env.execute("Flink CDC: Oracle -> Print");
    }
}
3. Start the program
Then insert a new row in Oracle: INSERT INTO "FLINKUSER"."ORDERS"("ORDER_ID", "ORDER_DATE", "CUSTOMER_NAME", "PRICE", "PRODUCT_ID", "ORDER_STATUS") VALUES ('10006', TO_TIMESTAMP('2020-07-30 20:00:30.000', 'SYYYY-MM-DD HH24:MI:SS:FF3'), 'Edward', '25.25', '106', '0');
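Rows printed from a changelog stream carry a short row-kind code in front of the bracketed values: +I for an insert, -U and +U for the old and new images of an update, and -D for a delete. The sketch below is a small, dependency-free helper for reading such lines. The class name RowKindLegend is hypothetical, and the sample line in main is an assumed shape of the printed output, not a captured log.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RowKindLegend {

    // Short codes used by Flink's RowKind, mapped to their names.
    private static final Map<String, String> LEGEND = new LinkedHashMap<>();
    static {
        LEGEND.put("+I", "INSERT");
        LEGEND.put("-U", "UPDATE_BEFORE");
        LEGEND.put("+U", "UPDATE_AFTER");
        LEGEND.put("-D", "DELETE");
    }

    /**
     * Reads the two characters before the first '[' in a printed line
     * and returns the matching row-kind name, or "UNKNOWN" if none matches.
     */
    public static String describe(String printedLine) {
        int bracket = printedLine.indexOf('[');
        if (bracket < 2) {
            return "UNKNOWN";
        }
        String code = printedLine.substring(bracket - 2, bracket);
        return LEGEND.getOrDefault(code, "UNKNOWN");
    }

    public static void main(String[] args) {
        // Assumed shape of a printed record; the values are placeholders.
        String sample = "Oracle cdc: > +I[10006, Edward]";
        System.out.println(describe(sample));
    }
}
```

With 'scan.startup.mode' = 'latest-offset', only changes made after the job starts are emitted, so the insert above should be the first record to appear.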
Flink WEB UI
Copyright belongs to the original author, 纤竹有泪. In case of infringement, please contact us for removal.