1. How Flink CDC, Flink, and CDC relate to each other
Flink: a stream-processing framework. It does not contain Flink CDC and is not the same thing as Flink CDC.
CDC: an idea, not one specific technology. CDC stands for Change Data Capture: it captures the incremental changes of a source database (Source) and syncs them to one or more destinations (Sink). Along the way the data can also be processed: filtered, joined, grouped, aggregated, and so on. Debezium is the main middleware specializing in receiving and parsing database change events; for capturing MySQL there is also Canal.
Flink CDC: just one implementation of CDC, not a sub-module of Flink. It was developed by Alibaba to enrich the Flink ecosystem.
1.1 Overview
Flink CDC builds on database-log-based Change Data Capture technology to provide unified full and incremental reading. Leveraging Flink's excellent pipeline capabilities and rich up- and downstream ecosystem, it can capture changes from many databases and sync those changes to downstream storage in real time.
1.2 Comparison with JDBC connectors
JDBC connectors can indeed read external databases such as MySQL, Oracle, and SQL Server, but a JDBC query is a one-shot operation: it has no way to continuously listen for changes in the database.
Flink CDC connectors capture database changes and keep pushing the changed data to downstream systems continuously.
Official docs: https://ververica.github.io/flink-cdc-connectors/
GitHub: https://github.com/ververica/flink-cdc-connectors
2. Usage
Flink CDC can sync data in two ways: through FlinkSQL, or through the Flink DataStream and Table API. Here I test the DataStream way directly in IDEA.
Code from: https://github.com/yclxiao/flink-cdc-demo/tree/main/src/main/java/com/yclxiao/flinkcdcdemo
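For contrast with the DataStream code below, the FlinkSQL route declares the CDC source as a table. This is only a sketch: the column list is a small subset of oc_settle_profit, and the host is a placeholder.

```sql
CREATE TABLE oc_settle_profit_src (
    id        STRING,
    user_type INT,
    amount    INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector'     = 'mysql-cdc',
    'hostname'      = 'x.x.x.x',
    'port'          = '3306',
    'username'      = 'root',
    'password'      = 'xiaoqiang',
    'database-name' = 'league_test',
    'table-name'    = 'oc_settle_profit'
);
-- then e.g.: INSERT INTO some_sink SELECT id, user_type, amount FROM oc_settle_profit_src;
```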
CloudAcctProfit2DwsHdjProfitRecordAPI.java
```java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.xiaoqiang.utils.JdbcUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.*;

public class CloudAcctProfit2DwsHdjProfitRecordAPI {

    private static final Logger LOG = LoggerFactory.getLogger(CloudAcctProfit2DwsHdjProfitRecordAPI.class);
    private static String MYSQL_HOST = "x.x.x.x";
    private static int MYSQL_PORT = 3306;
    private static String MYSQL_USER = "root";
    private static String MYSQL_PASSWD = "xiaoqiang";
    private static String SYNC_DB = "league_test";
    private static List<String> SYNC_TABLES = Arrays.asList("league_test.oc_settle_profit");

    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(MYSQL_HOST)
                .port(MYSQL_PORT)
                .databaseList(SYNC_DB)                      // set captured database
                .tableList(String.join(",", SYNC_TABLES))   // set captured table
                .username(MYSQL_USER)
                .password(MYSQL_PASSWD)
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(5000);

        DataStreamSource<String> cdcSource =
                env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "CDC Source" + "xiaoqiang-flink");

        List<String> tableList = getTableList();
        System.out.println("tableList--->" + tableList);
        for (String tbl : tableList) {
            SingleOutputStreamOperator<String> filterStream = filterTableData(cdcSource, "oc_settle_profit");
            // SingleOutputStreamOperator<String> cleanStream = clean(filterStream);
            // sink the stream data
            filterStream.addSink(new CustomDealDataSink()).name("sink " + tbl);
        }
        env.execute("xiaoqiang-flink");
    }

    /**
     * Custom sink
     */
    private static class CustomDealDataSink extends RichSinkFunction<String> {
        private transient Connection coalitiondbConnection;
        private transient Statement coalitiondbStatement;
        private transient Connection cloudConnection;
        private transient Statement cloudStatement;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // initialize the JDBC connections here
            coalitiondbConnection = DriverManager.getConnection("jdbc:mysql://x.x.x.x:3306/league_test", "root", "");
            coalitiondbStatement = coalitiondbConnection.createStatement();
            cloudConnection = DriverManager.getConnection("jdbc:mysql://x.x.x.x:3306/cloud_test", "root", "");
            cloudStatement = cloudConnection.createStatement();
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            // parse the incoming CDC JSON record
            JSONObject rowJson = JSON.parseObject(value);
            String outNo = rowJson.getString("out_no");
            Integer userType = rowJson.getInteger("user_type");
            String id = rowJson.getString("id");
            String payOrderNo = rowJson.getString("pay_order_no");
            String title = rowJson.getString("title");
            String fromUserId = rowJson.getString("from_user_id");
            String fromAccountId = rowJson.getString("from_account_id");
            String userId = rowJson.getString("user_id");
            String accountId = rowJson.getString("account_id");
            Integer amount = rowJson.getInteger("amount");
            Integer profitState = rowJson.getInteger("profit_state");
            Date profitTime = rowJson.getTimestamp("profit_time");
            Integer refundState = rowJson.getInteger("refund_state");
            Date refundTime = rowJson.getTimestamp("refund_time");
            Date addTime = rowJson.getTimestamp("add_time");
            String remark = rowJson.getString("remark");
            String acctCircle = rowJson.getString("acct_circle");
            Integer fromUserType = rowJson.getInteger("from_user_type");
            String companyId = rowJson.getString("company_id");
            String bizCompanyId = rowJson.getString("biz_company_id");

            // if (1 != profitState || !"PG11111".equals(acctCircle)) {
            //     return;
            // }
            //
            // // read related tables (join against other tables)
            // Integer bizType = null;
            // String contributeUserId = null;
            // String relationBrandOwnerId = null;
            // ResultSet virtualOrderResultSet = coalitiondbStatement.executeQuery(
            //         "select * from tc_virtual_order where order_type != 2 and id = '" + outNo + "'");
            // // if it is a tc_virtual_order order (onboarding card, peace-of-mind card, course)
            // if (virtualOrderResultSet.next()) {
            //     Integer virtualOrder4OrderType = virtualOrderResultSet.getInt("order_type");
            //     String virtualOrder4CompanyId = virtualOrderResultSet.getString("company_id");
            //     String virtualOrder4BrandId = virtualOrderResultSet.getString("brand_id");
            //     // skip onboarding-card orders; another job already handles them
            //     if (virtualOrder4OrderType == 2) {
            //         return;
            //     }
            //     // orderType mapping
            //     if (virtualOrder4OrderType == 6) {
            //         bizType = 10;
            //     } else if (virtualOrder4OrderType == 1) {
            //         bizType = 11;
            //     } else if (virtualOrder4OrderType == 5) {
            //         bizType = 12;
            //     }
            //     // userType mapping
            //     if (virtualOrder4OrderType == 6 && userType == 92) {
            //         contributeUserId = virtualOrder4CompanyId;
            //     } else if (virtualOrder4OrderType == 1 && userType == 92) {
            //         contributeUserId = virtualOrder4CompanyId;
            //     } else if (virtualOrder4OrderType == 5 && userType == 92) {
            //         contributeUserId = virtualOrder4CompanyId;
            //     }
            //     // relationBrandOwnerId mapping
            //     if (virtualOrder4OrderType == 6 && userType == 90) {
            //         relationBrandOwnerId = virtualOrder4BrandId;
            //     } else if (virtualOrder4OrderType == 1 && userType == 90) {
            //         relationBrandOwnerId = virtualOrder4BrandId;
            //     } else if (virtualOrder4OrderType == 5 && userType == 90) {
            //         relationBrandOwnerId = virtualOrder4BrandId;
            //     }
            //     // remark mapping
            //     if (virtualOrder4OrderType == 1 || virtualOrder4OrderType == 5) {
            //         remark = title;
            //     }
            // } else {
            //     // not tc_virtual_order data; keep only Haodaojia physical-goods rows here
            //     if (StringUtils.isBlank(payOrderNo)) {
            //         return;
            //     }
            //     ResultSet acctPayOrderResultSet = cloudStatement.executeQuery(
            //             "select * from acct_pay_order t where t.id = '" + payOrderNo + "'");
            //     if (!acctPayOrderResultSet.next()) {
            //         return;
            //     }
            //     Integer payCate = acctPayOrderResultSet.getInt("pay_cate");
            //     if (200100 != payCate) { // Haodaojia physical-goods type
            //         return;
            //     }
            //
            //     bizType = 20;
            //     if (userType == 92 && StringUtils.isNotBlank(bizCompanyId)) {
            //         contributeUserId = bizCompanyId;
            //     } else if (userType == 90 && StringUtils.isNotBlank(bizCompanyId)) {
            //         ResultSet brandOwnerIdResultSet = cloudStatement.executeQuery(
            //                 "select * from uc_brand_partner t where t.company_id = '" + bizCompanyId + "'");
            //         if (brandOwnerIdResultSet.next()) {
            //             relationBrandOwnerId = brandOwnerIdResultSet.getString("brand_owner_id");
            //         }
            //     }
            // }
            // if (StringUtils.isBlank(remark)) {
            //     remark = title;
            // }

            // write the row to MySQL
            String insertSql = "INSERT INTO dws_profit_record_hdj_flink_api (id, show_profit_id, order_no, from_user_id, from_user_type, user_id,\n"
                    + " user_type, amount, profit_time, state, acct_circle, biz_type,\n"
                    + " contribute_user_id, relation_brand_owner_id, remark, add_time)\n"
                    + "VALUES ('" + id + "', '" + "JSD" + id + "', '" + outNo + "', '" + fromUserId + "', "
                    + fromUserType + ", '" + userId + "', " + userType + ",\n"
                    + " " + amount + ", '" + DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss", TimeZone.getTimeZone("GMT"))
                    + "', " + profitState + ", '" + acctCircle + "', " + 1 + ", "
                    + (StringUtils.isBlank("123") ? null : "'" + "contributeUserId" + "'") + ", "
                    + (StringUtils.isBlank("relationBrandOwnerId") ? null : "'" + "relationBrandOwnerId" + "'")
                    + ", '" + remark + "',\n"
                    + " '" + DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss", TimeZone.getTimeZone("GMT")) + "');";
            cloudStatement.execute("delete from dws_profit_record_hdj_flink_api where id = '" + id + "'");
            System.out.println("insertSql--->" + insertSql);
            cloudStatement.execute(insertSql);
        }

        @Override
        public void close() throws Exception {
            super.close();
            // close the JDBC connections here
            coalitiondbStatement.close();
            coalitiondbConnection.close();
            cloudStatement.close();
            cloudConnection.close();
        }
    }

    /**
     * Clean the data
     */
    private static SingleOutputStreamOperator<String> clean(SingleOutputStreamOperator<String> source) {
        return source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String row, Collector<String> out) throws Exception {
                try {
                    LOG.info("============================row:{}", row);
                    JSONObject rowJson = JSON.parseObject(row);
                    String op = rowJson.getString("op");
                    // r = snapshot read, c = insert, u = update
                    if (Arrays.asList("r", "c", "u").contains(op)) {
                        out.collect(rowJson.getJSONObject("after").toJSONString());
                    } else {
                        LOG.info("filter other op:{}", op);
                    }
                } catch (Exception ex) {
                    LOG.warn("filter other format binlog:{}", row);
                }
            }
        });
    }

    /**
     * Filter rows by table name
     */
    private static SingleOutputStreamOperator<String> filterTableData(DataStreamSource<String> source, String table) {
        return source.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String row) throws Exception {
                try {
                    JSONObject rowJson = JSON.parseObject(row);
                    JSONObject sourceJson = rowJson.getJSONObject("source");
                    String tbl = sourceJson.getString("table");
                    return table.equals(tbl);
                } catch (Exception ex) {
                    ex.printStackTrace();
                    return false;
                }
            }
        });
    }

    private static List<String> getTableList() {
        List<String> tables = new ArrayList<>();
        String sql = "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE TABLE_SCHEMA = '" + SYNC_DB + "'";
        List<JSONObject> tableList = JdbcUtil.executeQuery(MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWD, sql);
        for (JSONObject jsob : tableList) {
            String schemaName = jsob.getString("TABLE_SCHEMA");
            String tblName = jsob.getString("TABLE_NAME");
            String schemaTbl = schemaName + "." + tblName;
            if (SYNC_TABLES.contains(schemaTbl)) {
                tables.add(tblName);
            }
        }
        return tables;
    }
}
```
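For orientation, the string handed to filterTableData() and invoke() above is a Debezium change-event envelope. Here is a hand-written illustration (field values invented, many envelope fields omitted) of an insert into oc_settle_profit:

```json
{
  "op": "c",
  "ts_ms": 1700000000000,
  "source": { "db": "league_test", "table": "oc_settle_profit" },
  "before": null,
  "after": {
    "id": "1001",
    "out_no": "VO-0001",
    "user_type": 92,
    "amount": 1000,
    "profit_state": 1
  }
}
```

The op field is what clean() switches on: r (snapshot read), c (insert), u (update), d (delete).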
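One caveat about the sink above: the DELETE and INSERT statements are assembled by string concatenation, so any value containing a single quote produces a malformed statement, and the pattern is open to SQL injection. A minimal sketch of the failure mode (the table and column names here are hypothetical); java.sql.PreparedStatement placeholders avoid both problems:

```java
public class UnsafeSqlDemo {
    // Naive concatenation, mirroring the sink above (hypothetical table/column)
    static String naiveInsert(String remark) {
        return "INSERT INTO t (remark) VALUES ('" + remark + "')";
    }

    public static void main(String[] args) {
        System.out.println(naiveInsert("ok"));
        // A single quote inside the value unbalances the statement:
        System.out.println(naiveInsert("O'Brien"));
        // Safer alternative (sketch, needs a live Connection conn):
        // PreparedStatement ps = conn.prepareStatement("INSERT INTO t (remark) VALUES (?)");
        // ps.setString(1, remark);
        // ps.executeUpdate();
    }
}
```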
JdbcUtil.java
```java
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.util.ArrayList;
import java.util.List;

public class JdbcUtil {

    private static final Logger LOG = LoggerFactory.getLogger(JdbcUtil.class);

    static {
        try {
            // Class.forName("com.mysql.cj.jdbc.Driver");
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws SQLException {
    }

    public static List<JSONObject> executeQuery(String hostUrl, int port, String user, String password, String sql) {
        List<JSONObject> beJson = new ArrayList<>();
        String connectionUrl = String.format(
                "jdbc:mysql://%s:%s/league_test?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai",
                hostUrl, port);
        Connection con = null;
        try {
            con = DriverManager.getConnection(connectionUrl, user, password);
            PreparedStatement ps = con.prepareStatement(sql);
            ResultSet rs = ps.executeQuery();
            beJson = resultSetToJson(rs);
        } catch (SQLException e) {
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                con.close();
            } catch (Exception e) {
            }
        }
        return beJson;
    }

    private static List<JSONObject> resultSetToJson(ResultSet rs) throws SQLException {
        List<JSONObject> list = new ArrayList<>();
        ResultSetMetaData metaData = rs.getMetaData();
        int columnCount = metaData.getColumnCount();
        while (rs.next()) {
            JSONObject jsonObj = new JSONObject();
            for (int i = 1; i <= columnCount; i++) {
                String columnName = metaData.getColumnLabel(i);
                String value = rs.getString(columnName);
                jsonObj.put(columnName, value);
            }
            list.add(jsonObj);
        }
        return list;
    }
}
```
pom.xml:
```xml
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.4.0</version>
</dependency>
```
2.1 Enabling MySQL binlog
If the Value of log_bin is ON, binlog is enabled; OFF means it is disabled. MySQL 8.0 enables it by default:

```sql
-- check whether binlog is enabled
SHOW VARIABLES LIKE '%log_bin%';
```
In the output:
- log_bin = ON means MySQL has binlog logging enabled
- log_bin_basename is the binlog file path and file-name prefix
- log_bin_index is the path of the binlog index file

To enable it when it is off:
```bash
# On CentOS the MySQL config file is usually under /etc/mysql;
# otherwise locate it with: find / -name "my.cnf"
vi /etc/mysql/my.cnf
```

```ini
# server ID
server-id = 1
# binlog configuration: setting a log_bin path is enough to enable it
log_bin = /var/lib/mysql/mysql_bin
# retention in days; the default 0 keeps logs forever. If the database is
# archived regularly, set a retention period -- in theory only the binlog
# written after the last archive is needed
expire_logs_days = 30
# maximum binlog file size
max_binlog_size = 1024M
# binlog format: statement, row, or mixed; the default is statement,
# but row is recommended
binlog_format = ROW
# flush binlog to disk every n committed transactions; 0 leaves flushing to the
# filesystem. For online transactions and money-related data set 1; for other
# data 0 is fine
sync_binlog = 1
```

```bash
# restart MySQL so the config takes effect
systemctl restart mysqld   # or: service mysql restart
```

```sql
-- list the binlog files
SHOW MASTER LOGS;
```
See also: enabling and configuring MySQL binlog, and restoring data from binlog.
2.2 Creating the databases and tables in MySQL
```sql
CREATE DATABASE IF NOT EXISTS cloud_test;
CREATE DATABASE IF NOT EXISTS league_test;

CREATE TABLE league_test.oc_settle_profit (
    id varchar(32),
    show_profit_id varchar(32),
    order_no varchar(32),
    from_user_id varchar(32),
    from_user_type int(11),
    user_id varchar(32),
    user_type int(11),
    rate int(11),
    amount int(11),
    type int(11),
    add_time datetime,
    state int(11),
    expect_profit_time datetime,
    profit_time datetime,
    profit_mode int(11),
    opt_code varchar(32),
    opt_name varchar(32),
    acct_circle varchar(32),
    process_state int(11),
    parent_id varchar(32),
    keep_account_from_user_id varchar(32),
    keep_account_from_bm_user_id varchar(32),
    keep_account_user_id varchar(32),
    keep_account_bm_user_id varchar(32),
    biz_type int(11),
    remark varchar(32),
    contribute_user_id varchar(32),
    relation_brand_owner_id varchar(32),
    PRIMARY KEY (id) USING BTREE
);

CREATE TABLE cloud_test.dws_profit_record_hdj_flink_api (
    id varchar(32),
    show_profit_id varchar(32),
    order_no varchar(32),
    from_user_id varchar(32),
    from_user_type int(11),
    user_id varchar(32),
    user_type int(11),
    amount int(11),
    profit_time datetime,
    state int(11),
    acct_circle varchar(32),
    biz_type int(11),
    contribute_user_id varchar(32),
    relation_brand_owner_id varchar(32),
    remark varchar(32),
    add_time datetime,
    PRIMARY KEY (id) USING BTREE
);
```
2.3 Pitfalls encountered
2.3.1 The MySQL server has a timezone offset (0 seconds ahead of UTC)
Connecting to MySQL over JDBC raised:

The MySQL server has a timezone offset (0 seconds ahead of UTC)

Cause: as the message says, a timezone mismatch. Check the current setting:

```sql
show variables like '%time_zone%';
-- Variable_name | Value
-- time_zone     | SYSTEM

-- or:
SELECT @@global.time_zone;
```

Fix: log in to MySQL as root and run:

```sql
set global time_zone = '+8:00';
```

Note: SET GLOBAL time_zone = 'Asia/Shanghai' was tried first, but it did not work (named zones require MySQL's time zone tables to be loaded; the numeric offset avoids that).
2.3.2 datetime values auto-converted to timestamps
Flink, as a stream-processing framework, can read the MySQL binlog and turn it into a data stream. However, the temporal values arrive internally in epoch-based formats that differ from MySQL's datetime format, so the captured binlog values need converting before they parse correctly.
Fix: a custom DateTimeConverter class implements the Debezium CustomConverter interface and overrides its methods to normalize MySQL's temporal types:
```java
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

/**
 * Implements the CustomConverter interface, overriding its methods to
 * normalize MySQL temporal types.
 */
public class DateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
    private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
    private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
    private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
    private ZoneId timestampZoneId = ZoneId.systemDefault();

    @Override
    public void configure(Properties props) {
    }

    @Override
    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
        String sqlType = column.typeName().toUpperCase();
        SchemaBuilder schemaBuilder = null;
        Converter converter = null;
        if ("DATE".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string");
            converter = this::convertDate;
        }
        if ("TIME".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string");
            converter = this::convertTime;
        }
        if ("DATETIME".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string");
            converter = this::convertDateTime;
        }
        if ("TIMESTAMP".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string");
            converter = this::convertTimestamp;
        }
        if (schemaBuilder != null) {
            registration.register(schemaBuilder, converter);
        }
    }

    private String convertDate(Object input) {
        if (input == null) return null;
        if (input instanceof LocalDate) {
            return dateFormatter.format((LocalDate) input);
        }
        if (input instanceof Integer) {
            // MySQL DATE arrives as days since the epoch
            LocalDate date = LocalDate.ofEpochDay((Integer) input);
            return dateFormatter.format(date);
        }
        return String.valueOf(input);
    }

    private String convertTime(Object input) {
        if (input == null) return null;
        if (input instanceof Duration) {
            Duration duration = (Duration) input;
            long seconds = duration.getSeconds();
            int nano = duration.getNano();
            LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);
            return timeFormatter.format(time);
        }
        return String.valueOf(input);
    }

    private String convertDateTime(Object input) {
        if (input == null) return null;
        if (input instanceof LocalDateTime) {
            return datetimeFormatter.format((LocalDateTime) input).replaceAll("T", " ");
        }
        return String.valueOf(input);
    }

    private String convertTimestamp(Object input) {
        if (input == null) return null;
        if (input instanceof ZonedDateTime) {
            // MySQL stores TIMESTAMP in UTC; the ZonedDateTime here is UTC time
            ZonedDateTime zonedDateTime = (ZonedDateTime) input;
            LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
            return timestampFormatter.format(localDateTime).replaceAll("T", " ");
        }
        return String.valueOf(input);
    }
}
```
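The epoch-based representations this converter handles can be reproduced with plain java.time. For example, a MySQL DATE arrives as an epoch-day integer, which convertDate turns back into a date string, and an epoch-millisecond value can be shifted to UTC+8 (matching the format.timestamp.zone setting below). The specific values here are made up for illustration:

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class EpochReprDemo {
    public static void main(String[] args) {
        // MySQL DATE arrives as days since 1970-01-01
        LocalDate date = LocalDate.ofEpochDay(19723);
        System.out.println(date); // 2024-01-01

        // an epoch-millisecond instant rendered at UTC+8
        LocalDateTime dt = Instant.ofEpochMilli(1704067200000L)
                .atOffset(ZoneOffset.ofHours(8)).toLocalDateTime();
        System.out.println(dt); // 2024-01-01T08:00
    }
}
```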
Wiring it in:
```java
public class FlinkSourceUtil {

    public static MySqlSource<JSONObject> getMySqlSource(ParameterTool parameterTool,
                                                         List<String> databases, List<String> tables) {
        Properties props = new Properties();
        props.setProperty("useSSL", "false");
        props.setProperty("allowPublicKeyRetrieval", "true");

        Properties debeziumProperties = new Properties();
        debeziumProperties.setProperty("converters", "dateConverters");
        debeziumProperties.setProperty("dateConverters.type", "com.xxx.util.DateTimeConverter");
        debeziumProperties.setProperty("dateConverters.format.date", "yyyy-MM-dd");
        debeziumProperties.setProperty("dateConverters.format.time", "HH:mm:ss");
        debeziumProperties.setProperty("dateConverters.format.datetime", "yyyy-MM-dd HH:mm:ss");
        debeziumProperties.setProperty("dateConverters.format.timestamp", "yyyy-MM-dd HH:mm:ss");
        debeziumProperties.setProperty("dateConverters.format.timestamp.zone", "UTC+8");

        String[] databaseArray = databases.toArray(new String[0]);
        String[] tableArray = tables.toArray(new String[0]);
        MySqlSource<JSONObject> mySqlSource = MySqlSource.<JSONObject>builder()
                .hostname(parameterTool.get(Constant.MYSQl_SOURCE_HOST_NAME))
                .port(Integer.parseInt(parameterTool.get(Constant.MYSQl_SOURCE_PORT)))
                .databaseList(databaseArray)
                .tableList(tableArray)
                .username(parameterTool.get(Constant.MYSQL_SOURCE_USER_NAME))
                .password(parameterTool.get(Constant.MYSQL_SOURCE_PASSWORD))
                .deserializer(new MyDebeziumDeserializationSchema())
                .debeziumProperties(debeziumProperties)
                .startupOptions(StartupOptions.initial())
                .serverId(parameterTool.get(Constant.SERVER_ID))
                .jdbcProperties(props)
                .build();
        return mySqlSource;
    }
}
```
See also:
- On Flink CDC auto-converting datetime to timestamps when listening to the MySQL binlog
- Flink CDC timestamp issues
- flink-core reading mysql-binlog: datetime fields turn into timestamps, how to fix it?
2.3.3 Restarts re-read the full binlog; I only want increments
MySqlSourceBuilder actually has a dedicated method for specifying the startup mode. Adjusted code:

```java
MySqlSourceBuilder<String> mySqlSource = new MySqlSourceBuilder<>();
mySqlSource.startupOptions(StartupOptions.latest());
mySqlSource
        .hostname(MYSQL_HOST)
        .port(MYSQL_PORT)
        .databaseList(SYNC_DB)                      // set captured database
        .tableList(String.join(",", SYNC_TABLES))   // set captured table
        .username(MYSQL_USER)
        .password(MYSQL_PASSWD)
        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
        .debeziumProperties(debeziumProperties)
        .build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(5000);
DataStreamSource<String> cdcSource = env.fromSource(mySqlSource.build(),
        WatermarkStrategy.noWatermarks(), "CDC Source" + "flinkcdc-kafka");
```
Reference: how to make Flink CDC sync incrementally only, skipping historical data (reading the binlog only).
2.4 Testing
After starting the program from IDEA, inserting a row into oc_settle_profit causes the corresponding row to be inserted into dws_profit_record_hdj_flink_api as well.
References:
- A thorough Flink CDC summary (Boxuegu study notes on big data)
- Syncing MySQL data end to end with Flink CDC
3. Extra notes
Flink CDC can monitor MySQL, but it cannot monitor StarRocks. I asked the vendor: StarRocks currently has nothing like MySQL's binlog that lets external systems observe DDL and data changes, so for now Flink CDC cannot monitor StarRocks.
Copyright belongs to the original author 小强签名设计. If there is any infringement, please contact us for removal.