Oracle CDC Connector — CDC Connectors for Apache Flink® documentation
Flink CDC can be used in two ways:
1. FlinkDataStream_CDC (DataStream API):
Use the source connectors shipped with Flink CDC (e.g. MySqlSource) and configure hostname, port, username, password, database, table, deserializer, startupOptions, etc.
to obtain the CRUD change log of the monitored tables.
2. FlinkSQL_CDC (Table/SQL API):
Create a virtual table with Flink SQL, configure hostname, port, username, password, database, table, etc., and follow how the data of the physical table changes through the declared columns.
Note: FlinkSQL_CDC 2.0 only supports Flink 1.13 and later.
The Flink Table/SQL module integrates database tables and changelog streams (such as CDC streams) as two sides of the same thing. The result is an upsert message structure: +I marks an insert, -U the value of a row before an update, +U the value after an update, and -D a delete.
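These row kinds can be observed directly from a CDC source table. Below is a minimal sketch, assuming Flink 1.13+ and a CDC table named products that has already been registered (for example with the Oracle CDC DDL shown later in this article); it converts the table into a changelog stream and prints the RowKind of every message.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class ChangelogKindDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table products = tableEnv.from("products"); // assumes a CDC table 'products' was registered beforehand
// every Row of a changelog stream carries a RowKind: +I insert, -U update-before, +U update-after, -D delete
tableEnv.toChangelogStream(products)
.map(row -> row.getKind().shortString() + " " + row)
.print();
env.execute("changelog-kind-demo");
}
}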
Comparison of the two approaches:
1. FlinkDataStream_CDC can monitor multiple databases and tables in a single job (advantage).
2. FlinkDataStream_CDC requires a custom deserializer (disadvantage).
3. FlinkSQL_CDC can only operate on a single table per source table (disadvantage).
4. FlinkSQL_CDC serializes automatically (advantage).
Oracle CDC Connector
Maven dependency
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-oracle-cdc</artifactId>
<!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. -->
<version>2.2.1</version>
</dependency>
Download flink-sql-connector-oracle-cdc-2.2.1.jar and put it under <FLINK_HOME>/lib/.
Setting up Oracle
You must enable log archiving for the Oracle database and define an Oracle user with appropriate permissions on all databases that the Debezium Oracle connector monitors.
For non-CDB databases
- Enable log archiving
(1.1). Connect to the database as DBA
linux>ORACLE_SID=SID
linux>export ORACLE_SID
linux>sqlplus /nolog
linux>CONNECT sys/password AS SYSDBA
(1.2). Enable log archiving
sql>alter system set db_recovery_file_dest_size = 10G;
sql>alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
sql>shutdown immediate;
sql>startup mount;
sql>alter database archivelog;
sql>alter database open;
Note:
- Enabling log archiving requires a database restart, so be careful when trying it.
- Archived logs take up a large amount of disk space; it is recommended to clean up expired logs regularly.
(1.3). Check whether log archiving is enabled
-- Should now show "Database log mode: Archive Mode"
sql>archive log list;
Note:
Supplemental logging must be enabled for the captured tables or for the database, so that data changes capture the before-state of changed database rows. The following shows how to configure it at the table level and at the database level.
-- Enable supplemental logging for a specific table:
sql>ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
-- Enable supplemental logging for database
sql>ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
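Before starting a Flink job, it can be handy to verify both settings from a client. The following is a minimal sketch and not part of the official setup steps; it assumes the Oracle JDBC driver (ojdbc) is on the classpath, the JDBC URL and credentials are placeholders, and it relies on the SELECT ON V_$DATABASE grant given to flinkuser in the next step. LOG_MODE should read ARCHIVELOG, and SUPPLEMENTAL_LOG_DATA_MIN should read YES after database-level supplemental logging is enabled.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
public class OraclePreflightCheck {
public static void main(String[] args) throws Exception {
// placeholder JDBC URL and credentials -- replace with your own
try (Connection conn = DriverManager.getConnection("jdbc:oracle:thin:@//localhost:1521/SID", "flinkuser", "flinkpw");
Statement stmt = conn.createStatement();
// LOG_MODE should be ARCHIVELOG and SUPPLEMENTAL_LOG_DATA_MIN should be YES
ResultSet rs = stmt.executeQuery("SELECT log_mode, supplemental_log_data_min FROM v$database")) {
if (rs.next()) {
System.out.println("log_mode = " + rs.getString(1));
System.out.println("supplemental_log_data_min = " + rs.getString(2));
}
}
}
}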
- Create an Oracle user with the required permissions
(2.1). Create a tablespace
sql>sqlplus sys/password@host:port/SID AS SYSDBA;
sql>CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
sql>exit;
(2.2). Create the user and grant permissions
sqlplus sys/password@host:port/SID AS SYSDBA;
CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
GRANT CREATE SESSION TO flinkuser;
GRANT SET CONTAINER TO flinkuser;
GRANT SELECT ON V_$DATABASE to flinkuser;
GRANT FLASHBACK ANY TABLE TO flinkuser;
GRANT SELECT ANY TABLE TO flinkuser;
GRANT SELECT_CATALOG_ROLE TO flinkuser;
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
GRANT CREATE TABLE TO flinkuser;
GRANT LOCK ANY TABLE TO flinkuser;
GRANT ALTER ANY TABLE TO flinkuser;
GRANT CREATE SEQUENCE TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;
GRANT SELECT ON V_$LOG TO flinkuser;
GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
GRANT SELECT ON V_$LOGFILE TO flinkuser;
GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
exit;
Connector options
Option | Required | Default | Type | Description
------ | -------- | ------- | ---- | -----------
connector | required | (none) | String | Specify what connector to use; here it should be 'oracle-cdc'.
hostname | required | (none) | String | IP address or hostname of the Oracle database server.
username | required | (none) | String | Username to use when connecting to the Oracle database server.
password | required | (none) | String | Password to use when connecting to the Oracle database server.
database-name | required | (none) | String | Database name of the Oracle server to monitor.
schema-name | required | (none) | String | Schema name of the Oracle database to monitor.
table-name | required | (none) | String | Table name of the Oracle database to monitor.
port | optional | 1521 | Integer | Integer port number of the Oracle database server.
scan.startup.mode | optional | initial | String | Startup mode for the Oracle CDC consumer; valid values are 'initial' and 'latest-offset'. See the Startup Reading Position section for details.
debezium.* | optional | (none) | String | Pass-through Debezium properties for the Debezium Embedded Engine that captures data changes from the Oracle server, e.g. 'debezium.snapshot.mode' = 'never'.
Startup Reading Position
The config option scan.startup.mode specifies the startup mode of the Oracle CDC consumer. The valid enumerations are:
- initial (default): performs an initial snapshot of the monitored database tables on first startup, then continues reading the latest redo/archive logs.
- latest-offset: never performs a snapshot of the monitored database tables on first startup; it only reads changes that arrive after the connector starts.
Note: the mechanism behind scan.startup.mode relies on Debezium's snapshot.mode configuration, so do not set debezium.snapshot.mode together with scan.startup.mode in the table DDL; specifying both may cause scan.startup.mode to have no effect.
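As a concrete illustration, the minimal sketch below registers a table that starts from latest-offset and therefore skips the initial snapshot. Hostname, credentials, database, schema, and table names are placeholders; the options otherwise follow the connector options table above.
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class StartupModeExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql(
"CREATE TABLE products_latest (" +
" ID INT NOT NULL," +
" NAME STRING," +
" PRIMARY KEY (ID) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'oracle-cdc'," +
" 'hostname' = 'Oracle_IP'," +
" 'port' = '1521'," +
" 'username' = 'flinkuser'," +
" 'password' = 'flinkpw'," +
" 'database-name' = 'XE'," +
" 'schema-name' = 'inventory'," +
" 'table-name' = 'products'," +
" 'scan.startup.mode' = 'latest-offset'" + // read only changes that happen after startup
")");
tableEnv.executeSql("SELECT * FROM products_latest").print();
}
}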
Oracle CDC SQL connector template:
CREATE TABLE products (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
ID INT NOT NULL,
NAME STRING,
DESCRIPTION STRING,
WEIGHT DECIMAL(10, 3),
PRIMARY KEY(ID) NOT ENFORCED
) WITH (
'connector' = 'oracle-cdc',
'hostname' = 'Oracle_IP地址',
'port' = '1521',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'XE',
'schema-name' = 'inventory',
'table-name' = 'products'
);
Note: the Oracle dialect is case-sensitive. If a field name is not quoted, Oracle converts it to uppercase, whereas Flink SQL does not convert field names. So for physical columns of the Oracle database, the table defined in Flink SQL should use the field name as Oracle stores it after conversion (usually the uppercase form).
Oracle CDC DataStream connector template
The Oracle CDC connector can also be used as a DataStream source. You can create a SourceFunction as follows:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.oracle.OracleSource;
public class OracleSourceExample {
public static void main(String[] args) throws Exception {
SourceFunction<String> sourceFunction = OracleSource.<String>builder()
.hostname("Oracle_IP地址")
.port(1521)
.database("XE") // monitor XE database
.schemaList("inventory") // monitor inventory schema
.tableList("inventory.products") // monitor products table
.username("flinkuser")
.password("flinkpw")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.addSource(sourceFunction)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
env.execute();
}
}
Data type mapping
Oracle type | Flink SQL type
----------- | --------------
NUMBER(p, s <= 0), p - s < 3 | TINYINT
NUMBER(p, s <= 0), p - s < 5 | SMALLINT
NUMBER(p, s <= 0), p - s < 10 | INT
NUMBER(p, s <= 0), p - s < 19 | BIGINT
NUMBER(p, s <= 0), 19 <= p - s <= 38 | DECIMAL(p - s, 0)
NUMBER(p, s > 0) | DECIMAL(p, s)
NUMBER(p, s <= 0), p - s > 38 | STRING
FLOAT, BINARY_FLOAT | FLOAT
DOUBLE PRECISION, BINARY_DOUBLE | DOUBLE
NUMBER(1) | BOOLEAN
DATE, TIMESTAMP [(p)] | TIMESTAMP [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)] WITH TIME ZONE | TIMESTAMP [(p)] WITH TIME ZONE
TIMESTAMP [(p)] WITH LOCAL TIME ZONE | TIMESTAMP_LTZ [(p)]
CHAR(n), NCHAR(n), NVARCHAR2(n), VARCHAR(n), VARCHAR2(n), CLOB, NCLOB, XMLType | STRING
BLOB, ROWID | BYTES
INTERVAL DAY TO SECOND, INTERVAL YEAR TO MONTH | BIGINT
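As an example of applying this mapping, assume a hypothetical Oracle table inventory.ORDERS(ORDER_ID NUMBER(9), AMOUNT NUMBER(10, 2), CREATED DATE, MEMO CLOB); the table and its columns are made up for illustration, and the connection options are placeholders. Per the table above, the Flink DDL would declare:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class TypeMappingExample {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql(
"CREATE TABLE ORDERS (" +
" ORDER_ID INT NOT NULL," + // Oracle NUMBER(9): p - s < 10, so INT
" AMOUNT DECIMAL(10, 2)," + // Oracle NUMBER(10, 2): s > 0, so DECIMAL(p, s)
" CREATED TIMESTAMP(0)," + // Oracle DATE maps to TIMESTAMP
" MEMO STRING," + // Oracle CLOB maps to STRING
" PRIMARY KEY (ORDER_ID) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'oracle-cdc'," +
" 'hostname' = 'Oracle_IP', 'port' = '1521'," +
" 'username' = 'flinkuser', 'password' = 'flinkpw'," +
" 'database-name' = 'XE', 'schema-name' = 'inventory', 'table-name' = 'ORDERS'" +
")");
}
}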
Oracle CDC SQL: Oracle to ES single-table example
public class CP_LOGIN_INFO {
private static final Logger log = LoggerFactory.getLogger(CP_LOGIN_INFO.class); // logger
public static void main(String[] args) throws Exception {
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance() // build the environment settings
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1); // set the parallelism of the stream
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, fsSettings); // create the stream table environment
tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
log.info("This message contains {} placeholders. {}", 2, "Yippie"); // log a message
// configure checkpointing
env.enableCheckpointing(180000); // enable checkpointing, one checkpoint every 180000 ms
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(50000); // leave at least 50000 ms between checkpoints
env.getCheckpointConfig().setCheckpointTimeout(600000); // checkpoint timeout: a checkpoint must finish within this time or it is discarded
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // exactly-once mode; EXACTLY_ONCE/AT_LEAST_ONCE are currently supported
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // number of concurrent checkpoints
env.getCheckpointConfig().setCheckpointStorage("hdfs:///flink-checkpoints/oracle/CP_LOGIN_INFO"); // store checkpoints in this HDFS directory
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // keep externalized checkpoints after the job is cancelled
env.getCheckpointConfig().enableUnalignedCheckpoints(); // enable experimental unaligned checkpoints
String sourceDDL ="CREATE TABLE Oracle_Source (\n" +
" ID DECIMAL(12,0), \n" +
" USER_CODE STRING, \n" +
" LOGIN_TIME STRING, \n" + //需要设置成STRING类型,Data或者TIMESTAMP 类型无法CAST转换
" OVER_TIME STRING, \n" + //需要设置成STRING类型,Data或者TIMESTAMP 类型无法CAST转换
" TOKEN STRING, \n" +
" INSERT_TIME_HIS STRING, \n" + //需要设置成STRING类型,Data或者TIMESTAMP 类型无法CAST转换
" UPDATE_TIME_HIS STRING, \n" + //需要设置成STRING类型,Data或者TIMESTAMP 类型无法CAST转换
" VERSION STRING, \n" +
" PRIMARY KEY (ID) NOT ENFORCED \n" +
" ) WITH (\n" +
" 'connector' = 'oracle-cdc',\n" +
" 'hostname' = 'Oracle_IP地址',\n" +
" 'port' = '1521',\n" +
" 'username' = 'xxx',\n" +
" 'password' = 'xxx',\n" +
" 'database-name' = 'ORCL',\n" +
" 'schema-name' = 'XIEPEITEST',\n" + // 注意这里要大写
" 'table-name' = 'CP_LOGIN_INFO',\n" +
" 'debezium.log.mining.continuous.mine'='true',\n" + //oracle11G可以设置此参数,19c会报错
" 'debezium.log.mining.strategy'='online_catalog',\n" + //只读oracle日志不会参数新的归档日志文件
" 'debezium.log.mining.sleep.time.increment.ms'='5000',\n" + //设置睡眠时间可以降低Oracle连接进行内存上涨速度
" 'debezium.log.mining.batch.size.max'='50000000000000',\n" + //如果此值太小会造成SCN追不上,而导致任务失败
" 'debezium.log.mining.batch.size.min'='10000',\n" +
" 'debezium.log.mining.session.max.ms'='1200000',\n" + //设置会话连接时长,如果您的重做日志不经常切换,您可以通过指定 Oracle 切换日志的频率来避免 ORA-04036 错误
" 'debezium.database.tablename.case.insensitive'='false',\n" + //关闭大小写
" 'scan.startup.mode' = 'initial' \n" + //全量模式,先全量后自动记录增量
" )";
// create the output table; the time fields are converted to BIGINT with CAST
String sinkDDL = "CREATE TABLE SinkTable (\n" +
" ID DECIMAL(12,0), \n" +
" USER_CODE STRING, \n" +
" LOGIN_TIME BIGINT, \n" + // BIGINT per business requirement; ES creates the field with the matching type when the data lands
" OVER_TIME BIGINT, \n" + // BIGINT per business requirement; ES creates the field with the matching type when the data lands
" TOKEN STRING, \n" +
" INSERT_TIME_HIS BIGINT, \n" + // BIGINT per business requirement; ES creates the field with the matching type when the data lands
" UPDATE_TIME_HIS BIGINT, \n" + // BIGINT per business requirement; ES creates the field with the matching type when the data lands
" VERSION STRING, \n" +
" PRIMARY KEY (ID) NOT ENFORCED \n" +
") WITH (\n" +
" 'connector' = 'elasticsearch-7',\n" +
" 'hosts' = 'http://ES_IP地址:9200',\n" +
" 'format' = 'json',\n" + //一定要加
" 'index' = 'cp_login_info_test',\n" +
" 'username' = 'elastic',\n" +
" 'password' = 'xxx',\n" +
" 'failure-handler' = 'ignore',\n" +
" 'sink.flush-on-checkpoint' = 'true' ,\n"+
" 'sink.bulk-flush.max-actions' = '20000' ,\n"+
" 'sink.bulk-flush.max-size' = '2mb' ,\n"+
" 'sink.bulk-flush.interval' = '1000ms' ,\n"+
" 'sink.bulk-flush.backoff.strategy' = 'CONSTANT',\n"+
" 'sink.bulk-flush.backoff.max-retries' = '3',\n"+
" 'connection.max-retry-timeout' = '3153600000000',\n"+ //设置es连接时间,太短的话会自动断连
" 'sink.bulk-flush.backoff.delay' = '100ms'\n"+
")";
String transformSQL =
" INSERT INTO SinkTable SELECT ID,\n" +
"USER_CODE,\n" +
"(CAST(LOGIN_TIME AS BIGINT) - 8 * 60 * 60 * 1000 ) as LOGIN_TIME,\n" + //类型转换 - 8小时 (CAST(字段 AS 类型 )-时区差) as 新字段名或落盘字段名
"(CAST(OVER_TIME AS BIGINT) - 8 * 60 * 60 * 1000 ) as OVER_TIME,\n" + //类型转换 - 8小时 (CAST(字段 AS 类型 )-时区差) as 新字段名或落盘字段名
"TOKEN,\n" +
"(CAST(INSERT_TIME_HIS AS BIGINT) - 8 * 60 * 60 * 1000 ) as INSERT_TIME_HIS,\n" + //类型转换 - 8小时 (CAST(字段 AS 类型 )-时区差) as 新字段名或落盘字段名
"(CAST(UPDATE_TIME_HIS AS BIGINT) - 8 * 60 * 60 * 1000 ) as UPDATE_TIME_HIS,\n" + //类型转换 - 8小时 (CAST(字段 AS 类型 )-时区差) as 新字段名或落盘字段名
"VERSION FROM Oracle_Source " ;
// execute the source DDL
tableEnv.executeSql(sourceDDL);
// execute the sink DDL
tableEnv.executeSql(sinkDDL);
// execute the transformation SQL; executeSql already submits the job, so a separate env.execute() is not needed here
TableResult tableResult = tableEnv.executeSql(transformSQL);
tableResult.print();
}
}
Oracle CDC DataStream application example
- 1. Write the runner
public class OracleSourceExample2 {
//custom Oracle source
public static void main(String[] args) throws Exception {
//build the source
SourceFunction<String> sourceFunction = OracleSource.<String>builder()
.hostname("Oracle_IP地址")
.port(1521)
.database("ORA19C") // monitor XE database
.schemaList("FLINKUSER") // monitor inventory schema
.tableList("FLINKUSER.T_Oracle_Score") // monitor products table 如果这个注销掉的话,就会监控所有表
.username("flinkuser")
.password("flinkpw")
.deserializer(new CustomerDeserialization())
//.deserializer(new JsonDebeziumDeserializationSchema())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // set up the Flink execution environment
env.setParallelism(1); // use parallelism 1 for sink to keep message ordering
DataStreamSource<String> stream = env.addSource(sourceFunction); // add the source stream
stream.print(); // print the stream
// define the list of ES server addresses
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("ES_IP地址", 9200, "http"));
// configure the ES sink
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
new ElasticsearchSinkFunction<String>() {
public IndexRequest createIndexRequest(String element) { // insert data, creating the index if it does not exist yet
Map<String, String> json = new HashMap<>();
// parse the data with the deserialization class
CustomerDeserialization.AfterBean afterBean = JSONObject.parseObject(element,CustomerDeserialization.AfterBean.class);
CustomerDeserialization Data = JSON.parseObject(element,CustomerDeserialization.class);
System.out.println("===============");
System.out.println(String.valueOf(Data.getAfter().getID()));
json.put("ID", String.valueOf(Data.getAfter().getID()));
json.put("NAME",Data.getAfter().getNAME());
return Requests.indexRequest()
.index("testES")
.id(String.valueOf(Data.getAfter().getID())) // an id must be set so that later updates and deletes can find the document
.source(json);
}
public UpdateRequest updateIndexRequest(String element)throws IOException { // update data
// set the index of the target table; an id must be set for update to work
CustomerDeserialization.AfterBean afterBean = JSONObject.parseObject(element,CustomerDeserialization.AfterBean.class);
CustomerDeserialization Data = JSON.parseObject(element,CustomerDeserialization.class);
UpdateRequest updateRequest=new UpdateRequest();
System.out.println("*******:"+Data.getAfter().getNAME());
System.out.println(element);
String id = String.valueOf(Data.getAfter().getID());
String data = Data.getAfter().getNAME();
System.out.println(id);
updateRequest
.index("testES").upsert()
.id(id) //指定索引id
.doc(XContentFactory.jsonBuilder().startObject().field("NAME",data).endObject()); //指定数据
return updateRequest;
}
private DeleteRequest deleteDocumentById(String element) { // delete data
CustomerDeserialization.AfterBean afterBean = JSONObject.parseObject(element,CustomerDeserialization.AfterBean.class);
CustomerDeserialization Data = JSON.parseObject(element,CustomerDeserialization.class);
DeleteRequest deleteRequest = new DeleteRequest();
return deleteRequest
.index("testES")
.id(String.valueOf(Data.getBefore().getID())); // delete the document by id
}
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
//parse with the deserialization class
CustomerDeserialization.AfterBean afterBean = JSONObject.parseObject(element,CustomerDeserialization.AfterBean.class);
CustomerDeserialization Data = JSON.parseObject(element,CustomerDeserialization.class);
//decide insert/update/delete based on the received record
if (Data.getAfter().getID() !=null && Data.getBefore().getID() == null) {
System.out.println("***** insert ******");
indexer.add(createIndexRequest(element));
} else if (Data.getAfter().getID() !=null && Data.getBefore().getID() !=null){
try {
System.out.println("***** update ******");
indexer.add(updateIndexRequest(element));
} catch (IOException e) {
e.printStackTrace();
}
} else if (Data.getAfter().getID() ==null && Data.getBefore().getID() !=null) {
System.out.println("***** delete ******");
indexer.add(deleteDocumentById(element));
}
}
}
);
// bulk request configuration: this makes the sink emit after every element; otherwise elements would be buffered
// maximum number of actions to buffer before flushing
esSinkBuilder.setBulkFlushMaxActions(1);
// maximum size of the buffered data before flushing, in MB
esSinkBuilder.setBulkFlushMaxSizeMb(500);
// interval at which to flush regardless of the number or size of buffered actions
esSinkBuilder.setBulkFlushInterval(501);
stream.addSink(esSinkBuilder.build());
env.execute();
}
}
- 2. Write the deserializer
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {
/**
* Output data format:
* {
* "database":"",
* "tableName":"",
* "before":{"id":"","tm_name":""....},
* "after":{"id":"","tm_name":""....},
* "type":"c u d",
* //"ts":xxxxxxxxxxxxx
* }
*/
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
//1. create a JSON object to hold the final result
JSONObject result = new JSONObject();
//2. get the database name & table name
String topic = sourceRecord.topic();
String[] fields = topic.split("\\.");
String database = fields[1];
String tableName = fields[2];
Struct value = (Struct) sourceRecord.value();
//3.获取"before"数据
Struct before = value.getStruct("before");
JSONObject beforeJson = new JSONObject();
if (before != null) {
Schema beforeSchema = before.schema();
List<Field> beforeFields = beforeSchema.fields();
for (Field field : beforeFields) {
Object beforeValue = before.get(field);
beforeJson.put(field.name(), beforeValue);
}
}
//4.获取"after"数据
Struct after = value.getStruct("after");
JSONObject afterJson = new JSONObject();
if (after != null) {
Schema afterSchema = after.schema();
List<Field> afterFields = afterSchema.fields();
for (Field field : afterFields) {
Object afterValue = after.get(field);
afterJson.put(field.name(), afterValue);
}
}
//5. get the operation type: CREATE, UPDATE, or DELETE
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String type = operation.toString().toLowerCase();
if ("create".equals(type)) {
type = "insert";
}
//6. put the fields into the JSON object
result.put("database", database);
result.put("tableName", tableName);
result.put("before", beforeJson);
result.put("after", afterJson);
result.put("type", type);
//7. emit the data
collector.collect(result.toJSONString());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
private AfterBean after;
private AfterBean before;
public AfterBean getAfter() {
return after;
}
public void setAfter(AfterBean after) {
this.after = after;
}
public AfterBean getBefore() {
return before;
}
public void setBefore(AfterBean before) {
this.before = before;
}
private String type;
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public static class AfterBean {
private Long ID;
private String NAME;
public Long getID() {
return ID;
}
public void setID(Long ID) {
this.ID = ID;
}
public void setNAME(String NAME) {
this.NAME = NAME;
}
public String getNAME() {
return NAME;
}
}
}
Oracle CDC DataStream: Oracle to ES multi-table example
1. Write the runner
public class OracleSourceExample2 {
public static void main(String[] args) throws Exception {
//构建source
SourceFunction<String> sourceFunction = OracleSource.<String>builder()
.hostname("Oracle_IP地址")
.port(1521)
.database("ORA19C") // monitor XE database
.schemaList("FLINKUSER") // monitor inventory schema
.tableList("FLINKUSER.T_Oracle_Score,FLINKUSER.T_Oracle_Score2") // monitor products table 如果这个注销掉的话,就会监控所有表
.username("flinkuser")
.password("flinkpw")
.deserializer(new DataStreamMoreOracleToES.CustomerDeserialization()) //自定义反序列化器
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置flink 执行环境
env.setParallelism(1); // use parallelism 1 for sink to keep message ordering
DataStreamSource<String> stream = env.addSource(sourceFunction);
// define the side-output tags; the side outputs are bound to these tags, whose names are declared here
final OutputTag<String> sideOutputTag1 = new OutputTag<String>("sideOutput1") {};
final OutputTag<String> sideOutputTag2 = new OutputTag<String>("sideOutput2") {};
// while processing, route each record to one or more side outputs (stream splitting)
SingleOutputStreamOperator<String> mainDataStream = stream
.process(new ProcessFunction<String,String>() {
@Override
public void processElement(String element, Context context, Collector<String> collector) throws Exception {
DataStreamMoreOracleToES.CustomerDeserialization.AfterBean afterBean = JSON.parseObject(element, DataStreamMoreOracleToES.CustomerDeserialization.AfterBean.class);
DataStreamMoreOracleToES.CustomerDeserialization Data = JSON.parseObject(element, DataStreamMoreOracleToES.CustomerDeserialization.class);
String seasonType = Data.getTableName();
switch (seasonType){
case "T_ORACLE_SCORE":
context.output(sideOutputTag1,element);
break;
case "T_ORACLE_SCORE2":
context.output(sideOutputTag2,element);
break;
}
}
});
DataStream<String> T_ORACLE_SCORE = mainDataStream.getSideOutput(sideOutputTag1); // get the side output for table 1 by its tag
SingleOutputStreamOperator<JSONObject> table1 = T_ORACLE_SCORE.map(JSON::parseObject)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<JSONObject>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
@Override
public long extractTimestamp(JSONObject o, long l) {
return o.getLong("ts_ms");
}
}));
DataStream<String> T_ORACLE_SCORE2 = mainDataStream.getSideOutput(sideOutputTag2); // get the side output for table 2 by its tag
SingleOutputStreamOperator<JSONObject> table2 = T_ORACLE_SCORE2.map(JSON::parseObject)
.assignTimestampsAndWatermarks(
WatermarkStrategy
.<JSONObject>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<JSONObject>() {
@Override
public long extractTimestamp(JSONObject o, long l) {
return o.getLong("ts_ms");
}
}));
table1.print("表1:");
table2.print("表2:");
//设置intervalJoin 指定时间内进行join,数据缺失没有的会丢弃 不join上
SingleOutputStreamOperator <DataStreamMoreOracleToES.CustomerDeserialization.WideOrder.AfterBean > wideOrderDS = table1
.keyBy(jsonObject -> jsonObject.getJSONObject("after").getLong("ID"))
.intervalJoin(table2.keyBy(jsonObject -> jsonObject.getJSONObject("after").getLong("ID")))
.between(Time.milliseconds(-100000), Time.milliseconds(100000))
.process(new ProcessJoinFunction<JSONObject, JSONObject, CustomerDeserialization.WideOrder.AfterBean >() {
@Override
public void processElement(JSONObject ord, JSONObject cust, Context context, Collector<DataStreamMoreOracleToES.CustomerDeserialization.WideOrder.AfterBean > collector) throws Exception {
DataStreamMoreOracleToES.CustomerDeserialization.WideOrder.AfterBean wideOrder = new DataStreamMoreOracleToES.CustomerDeserialization.WideOrder.AfterBean();
JSONObject ordAfter = ord.getJSONObject("after");
JSONObject custAfter = cust.getJSONObject("after");
wideOrder.setID(ordAfter.getLong("ID"));
wideOrder.setNAME(ordAfter.getString("NAME"));
wideOrder.setAGE(custAfter.getLong("AGE"));
JSONObject ordBefore = ord.getJSONObject("before");
JSONObject custBefore = cust.getJSONObject("before");
wideOrder.setBrforeID(ordBefore.getLong("ID"));
collector.collect(wideOrder);
}
});
SingleOutputStreamOperator<String> stringWideOrderDS = wideOrderDS.map(wideOrder -> wideOrder.toString());
stringWideOrderDS.print();
//定义ES服务器地址列表
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("ES_IP地址", 9200, "http"));
// 设置ES sink方法
ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
httpHosts,
new ElasticsearchSinkFunction<String>() {
public IndexRequest createIndexRequest(String element) {
Map<String, String> json = new HashMap<>();
DataStreamMoreOracleToES.CustomerDeserialization.AfterBean afterBean = JSONObject.parseObject(element, DataStreamMoreOracleToES.CustomerDeserialization.AfterBean.class);
DataStreamMoreOracleToES.CustomerDeserialization Data = JSON.parseObject(element, DataStreamMoreOracleToES.CustomerDeserialization.class);
System.out.println("===============");
System.out.println(String.valueOf(Data.getAfter().getID()));
json.put("ID", String.valueOf(Data.getAfter().getID()));
json.put("NAME",Data.getAfter().getNAME());
json.put("AGE", String.valueOf(Data.getAfter().getAGE()));
return Requests.indexRequest()
.index("testES")
.id(String.valueOf(Data.getAfter().getID()))
.source(json);
}
public UpdateRequest updateIndexRequest(String element)throws IOException {
//设置表的index和type,必须设置id才能update
DataStreamMoreOracleToES.CustomerDeserialization.AfterBean afterBean = JSONObject.parseObject(element, DataStreamMoreOracleToES.CustomerDeserialization.AfterBean.class);
DataStreamMoreOracleToES.CustomerDeserialization Data = JSON.parseObject(element, DataStreamMoreOracleToES.CustomerDeserialization.class);
UpdateRequest updateRequest=new UpdateRequest();
//设置表的index和type,必须设置id才能update
System.out.println("*******:"+Data.getAfter().getNAME());
System.out.println(element);
String id = String.valueOf(Data.getAfter().getID());
String data = Data.getAfter().getNAME();
System.out.println(id);
updateRequest
.index("testES").upsert()
.id(id)
.doc(XContentFactory.jsonBuilder().startObject().field("NAME",data).endObject());
return updateRequest;
}
private DeleteRequest deleteDocumentById(String element) {
DataStreamMoreOracleToES.CustomerDeserialization.AfterBean afterBean = JSONObject.parseObject(element, DataStreamMoreOracleToES.CustomerDeserialization.AfterBean.class);
DataStreamMoreOracleToES.CustomerDeserialization Data = JSON.parseObject(element, DataStreamMoreOracleToES.CustomerDeserialization.class);
DeleteRequest deleteRequest = new DeleteRequest();
return deleteRequest
.index("testES")
.id(String.valueOf(Data.getBefore().getID()));
}
@Override
public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
DataStreamMoreOracleToES.CustomerDeserialization.AfterBean afterBean = JSONObject.parseObject(element, DataStreamMoreOracleToES.CustomerDeserialization.AfterBean.class);
DataStreamMoreOracleToES.CustomerDeserialization Data = JSON.parseObject(element, DataStreamMoreOracleToES.CustomerDeserialization.class);
if (Data.getAfter().getID() !=null && Data.getBefore().getID() == null) {
System.out.println("***** insert ******");
System.out.println("***:"+element);
indexer.add(createIndexRequest(element));
} else if (Data.getAfter().getID() !=null && Data.getBefore().getID() !=null){
try {
System.out.println("***** update ******");
indexer.add(updateIndexRequest(element));
} catch (IOException e) {
e.printStackTrace();
}
} else if (Data.getAfter().getID() ==null && Data.getBefore().getID() !=null) {
System.out.println("***** delete ******");
indexer.add(deleteDocumentById(element));
}
}
}
);
// bulk request configuration: this makes the sink emit after every element; otherwise elements would be buffered
// maximum number of actions to buffer before flushing
esSinkBuilder.setBulkFlushMaxActions(1);
// maximum size of the buffered data before flushing, in MB
esSinkBuilder.setBulkFlushMaxSizeMb(500);
// interval at which to flush regardless of the number or size of buffered actions
esSinkBuilder.setBulkFlushInterval(501);
env.execute();
}
}
2. Write the deserializer
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {
/**
* 封装的数据格式
* {
* "database":"",
* "tableName":"",
* "before":{"id":"","tm_name":""....},
* "after":{"id":"","tm_name":""....},
* "type":"c u d",
* //"ts":156456135615
* }
*/
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
//1.创建JSON对象用于存储最终数据
JSONObject result = new JSONObject();
//2.获取库名&表名
String topic = sourceRecord.topic();
String[] fields = topic.split("\\.");
String database = fields[1];
String tableName = fields[2];
//String ts_ms = fields[2];
Struct value = (Struct) sourceRecord.value();
//3.获取"before"数据
Struct before = value.getStruct("before");
JSONObject beforeJson = new JSONObject();
if (before != null) {
Schema beforeSchema = before.schema();
List<Field> beforeFields = beforeSchema.fields();
for (Field field : beforeFields) {
Object beforeValue = before.get(field);
beforeJson.put(field.name(), beforeValue);
}
}
//4.获取"after"数据
Struct after = value.getStruct("after");
JSONObject afterJson = new JSONObject();
if (after != null) {
Schema afterSchema = after.schema();
List<Field> afterFields = afterSchema.fields();
for (Field field : afterFields) {
Object afterValue = after.get(field);
afterJson.put(field.name(), afterValue);
}
}
//4. get the timestamp
long ts = (long) value.get("ts_ms");
//5.获取操作类型 CREATE UPDATE DELETE
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String type = operation.toString().toLowerCase();
if ("create".equals(type)) {
type = "insert";
}
//6.将字段写入JSON对象
result.put("database", database);
result.put("tableName", tableName);
result.put("before", beforeJson);
result.put("after", afterJson);
result.put("type", type);
result.put("ts_ms",ts);
//7.输出数据
collector.collect(result.toJSONString());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
private AfterBean after;
private AfterBean before;
public AfterBean getAfter() {
return after;
}
public void setAfter(AfterBean after) {
this.after = after;
}
public AfterBean getBefore() {
return before;
}
public void setBefore(AfterBean before) {
this.before = before;
}
private String type;
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
private String tableName;
public String getTableName() {
return tableName;
}
public void setTableName(String tableName) {
this.tableName = tableName;
}
public static class AfterBean {
/**
* EMPNO : zhangsan
*/
private Long ID;
private String NAME;
private Long AGE;
public Long getID() {
return ID;
}
public void setID(Long ID) {
this.ID = ID;
}
public void setNAME(String NAME) {
this.NAME = NAME;
}
public String getNAME() {
return NAME;
}
public Long getAGE() {
return AGE;
}
public void setAGE(Long AGE) {
this.AGE = AGE;
}
}
// multi-table (joined wide record)
public static class WideOrder {
private AfterBean after;
private AfterBean before;
public AfterBean getAfter() {
return after;
}
public void setAfter(AfterBean after) {
this.after = after;
}
public AfterBean getBefore() {
return before;
}
public void setBefore(AfterBean before) {
this.before = before;
}
public static class AfterBean {
private Long ID;
private Long BrforeID;
private String NAME;
private Long AGE;
private Timestamp ts; // come from orders-stream
public Long getID() {
return ID;
}
public String getNAME() {
return NAME;
}
public Long getAGE() {
return AGE;
}
public Timestamp getTs() {
return ts;
}
public void setID(Long ID) {
this.ID = ID;
}
public void setNAME(String NAME) {
this.NAME = NAME;
}
public void setAGE(Long AGE) {
this.AGE = AGE;
}
public void setTs(Timestamp ts) {
this.ts = ts;
}
public Long getBrforeID() {
return BrforeID;
}
public void setBrforeID(Long brforeID) {
BrforeID = brforeID;
}
@Override
public String toString() {
JSONObject afterjsonObject = new JSONObject();
afterjsonObject.put("ID",ID);
afterjsonObject.put("NAME",NAME);
afterjsonObject.put("AGE",AGE);
String after = afterjsonObject.toJSONString();
JSONObject beforejsonObject = new JSONObject();
beforejsonObject.put("ID",BrforeID);
String before = beforejsonObject.toJSONString();
return "{\"before\":" +before+","+"\"after\":"+after+ '}';
}
}
public WideOrder() {}
}
}
Oracle CDC DataStream: Oracle to PG single-table example
1. Write the runner
public class OracleSourceExample {
//自定义source 源 oracle
public static void main(String[] args) throws Exception {
//构建source
SourceFunction<String> sourceFunction = OracleSource.<String>builder()
.hostname("Oracle_IP地址")
.port(1521)
.database("ORA19C") // monitor XE database
.schemaList("FLINKUSER") // monitor inventory schema
.tableList("FLINKUSER.T_Oracle_Score") // monitor products table 如果这个注销掉的话,就会监控所有表
.username("flinkuser")
.password("flinkpw")
//.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.deserializer(new CustomerDeserialization())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //设置flink 执行环境
env.setParallelism(1); // use parallelism 1 for sink to keep message ordering
DataStreamSource<String> stream = env.addSource(sourceFunction);
stream.print(); // print the stream
stream.addSink(new MyJdbcSink()); // add the JDBC sink that writes the data out
env.execute();
}
// custom SinkFunction implementation
public static class MyJdbcSink extends RichSinkFunction<String> {
// declare the connection and the prepared statements
Connection connection = null;
PreparedStatement insertStmt = null;
PreparedStatement updateStmt = null;
PreparedStatement deleteStmt = null;
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("org.postgresql.Driver");//加载数据库驱动
connection = DriverManager.getConnection("jdbc:postgresql://PG_IP地址/postgres", "postgres", "postgres");
insertStmt = connection.prepareStatement("insert into t_pg_sink values (?,?)");
updateStmt = connection.prepareStatement("update t_pg_sink set name = ? where id = ?");
deleteStmt = connection.prepareStatement("delete from t_pg_sink where id = ?");
}
// for every incoming record, use the connection and execute the SQL
@Override
public void invoke(String value, Context context) throws Exception {
// run the update first; if nothing was updated, do an insert instead
CustomerDeserialization.AfterBean afterBean = com.alibaba.fastjson.JSON.parseObject(value, CustomerDeserialization.AfterBean.class);
CustomerDeserialization Type = JSON.parseObject(value, CustomerDeserialization.class);
System.out.println("***");
if (Type.getAfter().getID() == null && Type.getBefore().getID() !=null ) {
System.out.println("删除");
deleteStmt.setInt(1,Integer.parseInt(Type.getBefore().getID()));
deleteStmt.execute(); // execute the delete
} else {
updateStmt.setString(1, Type.getAfter().getNAME());
updateStmt.setInt(2, Integer.parseInt(Type.getAfter().getID()));
updateStmt.execute(); // execute the update
if (updateStmt.getUpdateCount() == 0) {
insertStmt.setInt(1, Integer.parseInt(Type.getAfter().getID()));
insertStmt.setString(2, Type.getAfter().getNAME());
insertStmt.execute(); // execute the insert
}
}
}
@Override
public void close() throws Exception {
deleteStmt.close();
insertStmt.close();
updateStmt.close();
connection.close();
}
}
}
2. Write the deserializer
public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {
/**
* 封装的数据格式
* {
* "database":"",
* "tableName":"",
* "before":{"id":"","tm_name":""....},
* "after":{"id":"","tm_name":""....},
* "type":"c u d",
* //"ts":156456135615
* }
*/
@Override
public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
//1.创建JSON对象用于存储最终数据
JSONObject result = new JSONObject();
//2.获取库名&表名
String topic = sourceRecord.topic();
String[] fields = topic.split("\\.");
String database = fields[1];
String tableName = fields[2];
Struct value = (Struct) sourceRecord.value();
//3.获取"before"数据
Struct before = value.getStruct("before");
JSONObject beforeJson = new JSONObject();
if (before != null) {
Schema beforeSchema = before.schema();
List<Field> beforeFields = beforeSchema.fields();
for (Field field : beforeFields) {
Object beforeValue = before.get(field);
beforeJson.put(field.name(), beforeValue);
}
}
//4.获取"after"数据
Struct after = value.getStruct("after");
JSONObject afterJson = new JSONObject();
if (after != null) {
Schema afterSchema = after.schema();
List<Field> afterFields = afterSchema.fields();
for (Field field : afterFields) {
Object afterValue = after.get(field);
afterJson.put(field.name(), afterValue);
}
}
//5.获取操作类型 CREATE UPDATE DELETE
Envelope.Operation operation = Envelope.operationFor(sourceRecord);
String type = operation.toString().toLowerCase();
if ("create".equals(type)) {
type = "insert";
}
//6.将字段写入JSON对象
result.put("database", database);
result.put("tableName", tableName);
result.put("before", beforeJson);
result.put("after", afterJson);
result.put("type", type);
//7.输出数据
collector.collect(result.toJSONString());
}
@Override
public TypeInformation<String> getProducedType() {
return BasicTypeInfo.STRING_TYPE_INFO;
}
private AfterBean after;
private AfterBean before;
public AfterBean getAfter() {
return after;
}
public void setAfter(AfterBean after) {
this.after = after;
}
public AfterBean getBefore() {
return before;
}
public void setBefore(AfterBean before) {
this.before = before;
}
private String type;
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public static class AfterBean {
private String ID;
private String NAME;
public String getID() {
return ID;
}
public void setID(String ID) {
this.ID = ID;
}
public void setNAME(String NAME) {
this.NAME = NAME;
}
public String getNAME() {
return NAME;
}
}
}