

Syncing MySQL Data with Flink CDC


1. Flink CDC, Flink, and CDC: how they relate

  Flink: a stream processing framework. It does not ship with Flink CDC and does not depend on it.

  CDC: an approach, not any single technology. CDC stands for Change Data Capture: it captures the incremental changes of a source database and syncs them to one or more sinks, optionally transforming the data along the way (filtering, joining, grouping, aggregating, and so on). The best-known middleware specializing in receiving and parsing database change events is Debezium; for MySQL specifically there is also Canal.

  Flink CDC: simply one implementation of CDC, not a sub-project of Flink itself. It was developed at Alibaba to enrich the Flink ecosystem.

1.1 Overview

  Flink CDC is built on log-based Change Data Capture: it implements unified full-snapshot plus incremental reading, and, leveraging Flink's excellent pipelining and rich upstream/downstream ecosystem, it can capture changes from many databases and sync them to downstream storage in real time.

1.2 Comparison with the JDBC connector

  The JDBC connector can indeed read external databases such as MySQL, Oracle, and SQL Server. But a JDBC query is a one-shot, point-in-time operation; it has no way to keep listening for changes in the database. Flink CDC connectors, on the other hand, capture database changes and continuously push them to downstream systems. The toy example below illustrates the difference.
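  To make that concrete, here is a toy JDBC polling loop (not from the original post; credentials and table mirror the examples further down). Each poll sees only the current snapshot, so deletes, and any update that is overwritten again between two polls, go unnoticed; a CDC source instead receives every insert/update/delete event:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class JdbcPollingDemo {
        public static void main(String[] args) throws Exception {
            // hypothetical connection details, for illustration only
            String url = "jdbc:mysql://x.x.x.x:3306/league_test?useSSL=false";
            try (Connection conn = DriverManager.getConnection(url, "root", "xiaoqiang");
                 Statement st = conn.createStatement()) {
                while (true) {
                    // a point-in-time snapshot, not a change feed
                    ResultSet rs = st.executeQuery("SELECT COUNT(*) FROM oc_settle_profit");
                    rs.next();
                    System.out.println("rows now: " + rs.getLong(1));
                    Thread.sleep(5_000);
                }
            }
        }
    }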

Docs: https://ververica.github.io/flink-cdc-connectors/
GitHub: https://github.com/ververica/flink-cdc-connectors

2. Usage

  Flink CDC can sync data in two ways: via Flink SQL, or via the DataStream and Table APIs. A minimal sketch of the Flink SQL way follows; the rest of this post uses the DataStream way.
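  For reference, a hedged sketch of the Flink SQL way (not from the original post; it additionally needs the flink-table planner/bridge dependencies, and the column list is trimmed to two fields). It registers the source table with the mysql-cdc connector and prints the changelog:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

    public class FlinkSqlCdcSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
            // register league_test.oc_settle_profit as a CDC source table
            tEnv.executeSql(
                    "CREATE TABLE oc_settle_profit (" +
                    "  id STRING," +
                    "  amount INT," +
                    "  PRIMARY KEY (id) NOT ENFORCED" +
                    ") WITH (" +
                    "  'connector' = 'mysql-cdc'," +
                    "  'hostname' = 'x.x.x.x'," +
                    "  'port' = '3306'," +
                    "  'username' = 'root'," +
                    "  'password' = 'xiaoqiang'," +
                    "  'database-name' = 'league_test'," +
                    "  'table-name' = 'oc_settle_profit'" +
                    ")");
            // continuously print snapshot rows followed by live changes
            tEnv.executeSql("SELECT * FROM oc_settle_profit").print();
        }
    }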

  Here I ran the DataStream version directly from IDEA.

  Code adapted from: https://github.com/yclxiao/flink-cdc-demo/tree/main/src/main/java/com/yclxiao/flinkcdcdemo

  CloudAcctProfit2DwsHdjProfitRecordAPI.java

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
    import com.xiaoqiang.utils.JdbcUtil;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.commons.lang3.time.DateFormatUtils;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.flink.util.Collector;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;
    import java.util.*;

    public class CloudAcctProfit2DwsHdjProfitRecordAPI {

        private static final Logger LOG = LoggerFactory.getLogger(CloudAcctProfit2DwsHdjProfitRecordAPI.class);
        private static String MYSQL_HOST = "x.x.x.x";
        private static int MYSQL_PORT = 3306;
        private static String MYSQL_USER = "root";
        private static String MYSQL_PASSWD = "xiaoqiang";
        private static String SYNC_DB = "league_test";
        private static List<String> SYNC_TABLES = Arrays.asList("league_test.oc_settle_profit");

        public static void main(String[] args) throws Exception {
            MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                    .hostname(MYSQL_HOST)
                    .port(MYSQL_PORT)
                    .databaseList(SYNC_DB)                    // set captured database
                    .tableList(String.join(",", SYNC_TABLES)) // set captured table
                    .username(MYSQL_USER)
                    .password(MYSQL_PASSWD)
                    .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                    .build();

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.enableCheckpointing(5000);

            DataStreamSource<String> cdcSource =
                    env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "CDC Source" + "xiaoqiang-flink");

            List<String> tableList = getTableList();
            System.out.println("tableList--->" + tableList);
            for (String tbl : tableList) {
                SingleOutputStreamOperator<String> filterStream = filterTableData(cdcSource, "oc_settle_profit");
                // SingleOutputStreamOperator<String> cleanStream = clean(filterStream);
                // sink the stream
                filterStream.addSink(new CustomDealDataSink()).name("sink " + tbl);
            }
            env.execute("xiaoqiang-flink");
        }

        /**
         * Custom sink.
         */
        private static class CustomDealDataSink extends RichSinkFunction<String> {
            private transient Connection coalitiondbConnection;
            private transient Statement coalitiondbStatement;
            private transient Connection cloudConnection;
            private transient Statement cloudStatement;

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
                // initialize the JDBC connections here
                coalitiondbConnection = DriverManager.getConnection("jdbc:mysql://x.x.x.x:3306/league_test", "root", "");
                coalitiondbStatement = coalitiondbConnection.createStatement();
                cloudConnection = DriverManager.getConnection("jdbc:mysql://x.x.x.x:3306/cloud_test", "root", "");
                cloudStatement = cloudConnection.createStatement();
            }

            @Override
            public void invoke(String value, Context context) throws Exception {
                // parse the CDC JSON record
                JSONObject rowJson = JSON.parseObject(value);
                String outNo = rowJson.getString("out_no");
                Integer userType = rowJson.getInteger("user_type");
                String id = rowJson.getString("id");
                String payOrderNo = rowJson.getString("pay_order_no");
                String title = rowJson.getString("title");
                String fromUserId = rowJson.getString("from_user_id");
                String fromAccountId = rowJson.getString("from_account_id");
                String userId = rowJson.getString("user_id");
                String accountId = rowJson.getString("account_id");
                Integer amount = rowJson.getInteger("amount");
                Integer profitState = rowJson.getInteger("profit_state");
                Date profitTime = rowJson.getTimestamp("profit_time");
                Integer refundState = rowJson.getInteger("refund_state");
                Date refundTime = rowJson.getTimestamp("refund_time");
                Date addTime = rowJson.getTimestamp("add_time");
                String remark = rowJson.getString("remark");
                String acctCircle = rowJson.getString("acct_circle");
                Integer fromUserType = rowJson.getInteger("from_user_type");
                String companyId = rowJson.getString("company_id");
                String bizCompanyId = rowJson.getString("biz_company_id");

                // if (1 != profitState || !"PG11111".equals(acctCircle)) {
                //     return;
                // }
                //
                // // read related tables (joins for enrichment)
                // Integer bizType = null;
                // String contributeUserId = null;
                // String relationBrandOwnerId = null;
                // ResultSet virtualOrderResultSet = coalitiondbStatement.executeQuery("select * from tc_virtual_order where order_type != 2 and id = '" + outNo + "'");
                // // if it is a tc_virtual_order order (onboarding card, reassurance card, course)
                // if (virtualOrderResultSet.next()) {
                //     Integer virtualOrder4OrderType = virtualOrderResultSet.getInt("order_type");
                //     String virtualOrder4CompanyId = virtualOrderResultSet.getString("company_id");
                //     String virtualOrder4BrandId = virtualOrderResultSet.getString("brand_id");
                //     // skip onboarding-card orders, another job already handles them
                //     if (virtualOrder4OrderType == 2) {
                //         return;
                //     }
                //     // map orderType to bizType
                //     if (virtualOrder4OrderType == 6) {
                //         bizType = 10;
                //     } else if (virtualOrder4OrderType == 1) {
                //         bizType = 11;
                //     } else if (virtualOrder4OrderType == 5) {
                //         bizType = 12;
                //     }
                //     // derive contributeUserId from userType
                //     if (virtualOrder4OrderType == 6 && userType == 92) {
                //         contributeUserId = virtualOrder4CompanyId;
                //     } else if (virtualOrder4OrderType == 1 && userType == 92) {
                //         contributeUserId = virtualOrder4CompanyId;
                //     } else if (virtualOrder4OrderType == 5 && userType == 92) {
                //         contributeUserId = virtualOrder4CompanyId;
                //     }
                //     // derive relationBrandOwnerId
                //     if (virtualOrder4OrderType == 6 && userType == 90) {
                //         relationBrandOwnerId = virtualOrder4BrandId;
                //     } else if (virtualOrder4OrderType == 1 && userType == 90) {
                //         relationBrandOwnerId = virtualOrder4BrandId;
                //     } else if (virtualOrder4OrderType == 5 && userType == 90) {
                //         relationBrandOwnerId = virtualOrder4BrandId;
                //     }
                //     // derive remark
                //     if (virtualOrder4OrderType == 1 || virtualOrder4OrderType == 5) {
                //         remark = title;
                //     }
                // } else {
                //     // not tc_virtual_order data; keep only Haodaojia physical-goods rows here
                //     if (StringUtils.isBlank(payOrderNo)) {
                //         return;
                //     }
                //     ResultSet acctPayOrderResultSet = cloudStatement.executeQuery("select * from acct_pay_order t where t.id = '" + payOrderNo + "'");
                //     if (!acctPayOrderResultSet.next()) {
                //         return;
                //     }
                //     Integer payCate = acctPayOrderResultSet.getInt("pay_cate");
                //     if (200100 != payCate) { // Haodaojia physical-goods type
                //         return;
                //     }
                //
                //     bizType = 20;
                //     if (userType == 92 && StringUtils.isNotBlank(bizCompanyId)) {
                //         contributeUserId = bizCompanyId;
                //     } else if (userType == 90 && StringUtils.isNotBlank(bizCompanyId)) {
                //         ResultSet brandOwnerIdResultSet = cloudStatement.executeQuery("select * from uc_brand_partner t where t.company_id = '" + bizCompanyId + "'");
                //         if (brandOwnerIdResultSet.next()) {
                //             relationBrandOwnerId = brandOwnerIdResultSet.getString("brand_owner_id");
                //         }
                //     }
                // }
                // if (StringUtils.isBlank(remark)) {
                //     remark = title;
                // }

                // write the row to MySQL
                // NOTE: with the enrichment above commented out, the contribute/brand-owner
                // columns below receive literal placeholder strings, not real values
                String insertSql = "INSERT INTO dws_profit_record_hdj_flink_api (id, show_profit_id, order_no, from_user_id, from_user_type, user_id,\n" +
                        " user_type, amount, profit_time, state, acct_circle, biz_type,\n" +
                        " contribute_user_id, relation_brand_owner_id, remark, add_time)\n" +
                        "VALUES ('" + id + "', '" + "JSD" + id + "', '" + outNo + "', '" + fromUserId + "', " + fromUserType + ", '" + userId + "', " + userType + ",\n" +
                        " " + amount + ", '" + DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss", TimeZone.getTimeZone("GMT")) + "', " + profitState + ", '" + acctCircle + "', " + 1 + ",\n" +
                        " " + (StringUtils.isBlank("123") ? null : "'" + "contributeUserId" + "'") + ", " + (StringUtils.isBlank("relationBrandOwnerId") ? null : "'" + "relationBrandOwnerId" + "'") + ", '" + remark + "',\n" +
                        " '" + DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss", TimeZone.getTimeZone("GMT")) + "');";
                // idempotent write: delete any previous row with the same id, then insert
                cloudStatement.execute("delete from dws_profit_record_hdj_flink_api where id = '" + id + "'");
                System.out.println("insertSql--->" + insertSql);
                cloudStatement.execute(insertSql);
            }

            @Override
            public void close() throws Exception {
                super.close();
                // close the JDBC connections here
                coalitiondbStatement.close();
                coalitiondbConnection.close();
                cloudStatement.close();
                cloudConnection.close();
            }
        }

        /**
         * Clean the data: keep only the "after" image of r/c/u events.
         */
        private static SingleOutputStreamOperator<String> clean(SingleOutputStreamOperator<String> source) {
            return source.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String row, Collector<String> out) throws Exception {
                    try {
                        LOG.info("============================row:{}", row);
                        JSONObject rowJson = JSON.parseObject(row);
                        String op = rowJson.getString("op"); // r = snapshot read, c = insert, u = update
                        if (Arrays.asList("r", "c", "u").contains(op)) {
                            out.collect(rowJson.getJSONObject("after").toJSONString());
                        } else {
                            LOG.info("filter other op:{}", op);
                        }
                    } catch (Exception ex) {
                        LOG.warn("filter other format binlog:{}", row);
                    }
                }
            });
        }

        /**
         * Keep only the records of the given table.
         */
        private static SingleOutputStreamOperator<String> filterTableData(DataStreamSource<String> source, String table) {
            return source.filter(new FilterFunction<String>() {
                @Override
                public boolean filter(String row) throws Exception {
                    try {
                        JSONObject rowJson = JSON.parseObject(row);
                        JSONObject source = rowJson.getJSONObject("source");
                        String tbl = source.getString("table");
                        return table.equals(tbl);
                    } catch (Exception ex) {
                        ex.printStackTrace();
                        return false;
                    }
                }
            });
        }

        private static List<String> getTableList() {
            List<String> tables = new ArrayList<>();
            String sql = "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE TABLE_SCHEMA = '" + SYNC_DB + "'";
            List<JSONObject> tableList = JdbcUtil.executeQuery(MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWD, sql);
            for (JSONObject jsob : tableList) {
                String schemaName = jsob.getString("TABLE_SCHEMA");
                String tblName = jsob.getString("TABLE_NAME");
                String schemaTbl = schemaName + "." + tblName;
                if (SYNC_TABLES.contains(schemaTbl)) {
                    tables.add(tblName);
                }
            }
            return tables;
        }
    }
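  One note on the sink: string-concatenated SQL breaks as soon as a value contains a quote. A hedged sketch of the same delete-then-insert using PreparedStatement (not the post's code; trimmed to a few columns, and assuming the same cloudConnection field plus an extra import of java.sql.PreparedStatement):

    // parameterized variant of the sink's delete-then-insert
    private void writeRecord(String id, String outNo, Integer amount, String remark) throws Exception {
        try (PreparedStatement del = cloudConnection.prepareStatement(
                 "DELETE FROM dws_profit_record_hdj_flink_api WHERE id = ?");
             PreparedStatement ins = cloudConnection.prepareStatement(
                 "INSERT INTO dws_profit_record_hdj_flink_api (id, show_profit_id, order_no, amount, remark) " +
                 "VALUES (?, ?, ?, ?, ?)")) {
            del.setString(1, id);
            del.executeUpdate();
            ins.setString(1, id);
            ins.setString(2, "JSD" + id);
            ins.setString(3, outNo);
            ins.setObject(4, amount);   // setObject tolerates a null amount
            ins.setString(5, remark);
            ins.executeUpdate();
        }
    }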

  JdbcUtil.java

    import com.alibaba.fastjson.JSONObject;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import java.sql.*;
    import java.util.ArrayList;
    import java.util.List;

    public class JdbcUtil {

        static {
            try {
                // Class.forName("com.mysql.cj.jdbc.Driver");
                Class.forName("com.mysql.jdbc.Driver");
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        }

        private static final Logger LOG = LoggerFactory.getLogger(JdbcUtil.class);

        public static void main(String[] args) throws SQLException {
        }

        public static List<JSONObject> executeQuery(String hostUrl, int port, String user, String password, String sql) {
            List<JSONObject> beJson = new ArrayList<>();
            // note: the database name (league_test) is hardcoded into the URL
            String connectionUrl = String.format(
                    "jdbc:mysql://%s:%s/league_test?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai",
                    hostUrl, port);
            Connection con = null;
            try {
                con = DriverManager.getConnection(connectionUrl, user, password);
                PreparedStatement ps = con.prepareStatement(sql);
                ResultSet rs = ps.executeQuery();
                beJson = resultSetToJson(rs);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    if (con != null) {
                        con.close();
                    }
                } catch (Exception e) {
                }
            }
            return beJson;
        }

        private static List<JSONObject> resultSetToJson(ResultSet rs) throws SQLException {
            List<JSONObject> list = new ArrayList<>();
            ResultSetMetaData metaData = rs.getMetaData();
            int columnCount = metaData.getColumnCount();
            while (rs.next()) {
                JSONObject jsonObj = new JSONObject();
                for (int i = 1; i <= columnCount; i++) {
                    String columnName = metaData.getColumnLabel(i);
                    String value = rs.getString(columnName);
                    jsonObj.put(columnName, value);
                }
                list.add(jsonObj);
            }
            return list;
        }
    }

  pom.xml:

    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.4.0</version>
    </dependency>
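  The post lists only the CDC connector; to actually run the job you also need the regular Flink runtime artifacts. A hedged sketch (the 1.17.0 version is an assumption; pick versions matching your Flink install and the flink-cdc compatibility table):

    <!-- assumed companions, not from the original post -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java</artifactId>
        <version>1.17.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients</artifactId>
        <version>1.17.0</version>
    </dependency>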
2.1 Enabling binlog in MySQL

  If the Value of log_bin is ON, binlog is enabled; OFF means it is disabled. MySQL 8.0 enables it by default:

    # check whether binlog is enabled
    mysql> SHOW VARIABLES LIKE '%log_bin%';

  Disabled:

[Screenshot: SHOW VARIABLES output with log_bin = OFF]

  • log_bin = ON means binlog logging is enabled
  • log_bin_basename is the binlog file path and file-name prefix
  • log_bin_index is the path of the binlog index file

  Enabled:

[Screenshot: SHOW VARIABLES output with log_bin = ON]

    # On CentOS the MySQL config file usually lives under /etc/mysql;
    # if not, locate it with: find / -name "my.cnf"
    vi /etc/mysql/my.cnf

    # server ID
    server-id = 1
    # binlog configuration: setting the log_bin path is enough to enable it
    log_bin = /var/lib/mysql/mysql_bin
    # retention in days; the default 0 keeps binlogs forever.
    # If the database is archived periodically, set a retention window; in theory
    # you only need the binlogs written after the latest archive.
    expire_logs_days = 30
    # maximum size of a single binlog file
    max_binlog_size = 1024M
    # binlog format: statement, row, or mixed; the default is statement, row is recommended
    binlog_format = ROW
    # flush the binlog to disk every n transaction commits; 0 leaves flushing to the
    # filesystem. Use 1 for money-related/transactional data; 0 is fine otherwise.
    sync_binlog = 1

    # restart MySQL so the configuration takes effect
    systemctl restart mysqld    # or: service mysql restart

    # list the binlog files
    SHOW MASTER LOGS;

See also: Enabling and configuring the MySQL binlog, and recovering data from it

2.2 Preparing the databases and tables in MySQL
    CREATE DATABASE IF NOT EXISTS cloud_test;
    CREATE DATABASE IF NOT EXISTS league_test;

    CREATE TABLE league_test.oc_settle_profit (
        id                           varchar(32),
        show_profit_id               varchar(32),
        order_no                     varchar(32),
        from_user_id                 varchar(32),
        from_user_type               int(11),
        user_id                      varchar(32),
        user_type                    int(11),
        rate                         int(11),
        amount                       int(11),
        type                         int(11),
        add_time                     datetime,
        state                        int(11),
        expect_profit_time           datetime,
        profit_time                  datetime,
        profit_mode                  int(11),
        opt_code                     varchar(32),
        opt_name                     varchar(32),
        acct_circle                  varchar(32),
        process_state                int(11),
        parent_id                    varchar(32),
        keep_account_from_user_id    varchar(32),
        keep_account_from_bm_user_id varchar(32),
        keep_account_user_id         varchar(32),
        keep_account_bm_user_id      varchar(32),
        biz_type                     int(11),
        remark                       varchar(32),
        contribute_user_id           varchar(32),
        relation_brand_owner_id      varchar(32),
        PRIMARY KEY (id) USING BTREE
    );

    CREATE TABLE cloud_test.dws_profit_record_hdj_flink_api (
        id                      varchar(32),
        show_profit_id          varchar(32),
        order_no                varchar(32),
        from_user_id            varchar(32),
        from_user_type          int(11),
        user_id                 varchar(32),
        user_type               int(11),
        amount                  int(11),
        profit_time             datetime,
        state                   int(11),
        acct_circle             varchar(32),
        biz_type                int(11),
        contribute_user_id      varchar(32),
        relation_brand_owner_id varchar(32),
        remark                  varchar(32),
        add_time                datetime,
        PRIMARY KEY (id) USING BTREE
    );
2.3 Pitfalls encountered
2.3.1 The MySQL server has a timezone offset (0 seconds ahead of UTC)

  Connecting to MySQL over JDBC failed with:

    The MySQL server has a timezone offset (0 seconds ahead of UTC)

  Cause: as the message itself says, it is a timezone problem.

    show variables like '%time_zone%';

    Variable_name   |Value |
    ----------------+------+
    time_zone       |SYSTEM|

    -- or:
    SELECT @@global.time_zone;

  Fix: log in to MySQL as the root user and run:

    set global time_zone='+8:00';

  Note: I first tried

    SET GLOBAL time_zone = 'Asia/Shanghai'

  but that did not work. (Named zones require MySQL's timezone tables to be loaded, e.g. with mysql_tzinfo_to_sql; a numeric offset avoids that requirement.)
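  Also note that SET GLOBAL does not survive a server restart. To persist the offset, it can be set in my.cnf as well (a sketch, same file as in section 2.1):

    [mysqld]
    # keep the global time zone at UTC+8 across restarts
    default-time-zone = '+8:00'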

2.3.2 datetime values being auto-converted to timestamps

  Apache Flink can read the MySQL binlog and turn it into a data stream. However, the internal timestamp representation used by Flink/Debezium differs from MySQL's datetime format, so values captured from the binlog need an extra conversion before they parse correctly.

  Custom class: DateTimeConverter implements Debezium's CustomConverter interface and overrides its methods to convert MySQL's temporal types into standard strings.

    import io.debezium.spi.converter.CustomConverter;
    import io.debezium.spi.converter.RelationalColumn;
    import org.apache.kafka.connect.data.SchemaBuilder;

    import java.time.*;
    import java.time.format.DateTimeFormatter;
    import java.util.Properties;

    /**
     * Implements Debezium's CustomConverter interface and overrides its methods
     * to convert MySQL temporal types to standard strings.
     */
    public class DateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

        private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
        private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
        private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
        private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
        private ZoneId timestampZoneId = ZoneId.systemDefault();

        @Override
        public void configure(Properties props) {
        }

        @Override
        public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
            String sqlType = column.typeName().toUpperCase();
            SchemaBuilder schemaBuilder = null;
            Converter converter = null;
            if ("DATE".equals(sqlType)) {
                schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string");
                converter = this::convertDate;
            }
            if ("TIME".equals(sqlType)) {
                schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string");
                converter = this::convertTime;
            }
            if ("DATETIME".equals(sqlType)) {
                schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string");
                converter = this::convertDateTime;
            }
            if ("TIMESTAMP".equals(sqlType)) {
                schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string");
                converter = this::convertTimestamp;
            }
            if (schemaBuilder != null) {
                registration.register(schemaBuilder, converter);
            }
        }

        private String convertDate(Object input) {
            if (input == null) return null;
            if (input instanceof LocalDate) {
                return dateFormatter.format((LocalDate) input);
            }
            if (input instanceof Integer) {
                // Debezium encodes DATE as an epoch day number
                LocalDate date = LocalDate.ofEpochDay((Integer) input);
                return dateFormatter.format(date);
            }
            return String.valueOf(input);
        }

        private String convertTime(Object input) {
            if (input == null) return null;
            if (input instanceof Duration) {
                Duration duration = (Duration) input;
                long seconds = duration.getSeconds();
                int nano = duration.getNano();
                LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);
                return timeFormatter.format(time);
            }
            return String.valueOf(input);
        }

        private String convertDateTime(Object input) {
            if (input == null) return null;
            if (input instanceof LocalDateTime) {
                return datetimeFormatter.format((LocalDateTime) input).replaceAll("T", " ");
            }
            return String.valueOf(input);
        }

        private String convertTimestamp(Object input) {
            if (input == null) return null;
            if (input instanceof ZonedDateTime) {
                // MySQL stores TIMESTAMP as UTC; the ZonedDateTime here is in UTC
                ZonedDateTime zonedDateTime = (ZonedDateTime) input;
                LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
                return timestampFormatter.format(localDateTime).replaceAll("T", " ");
            }
            return String.valueOf(input);
        }
    }

  Wiring it in:

    public class FlinkSourceUtil {

        public static MySqlSource<JSONObject> getMySqlSource(ParameterTool parameterTool, List<String> databases, List<String> tables) {
            Properties props = new Properties();
            props.setProperty("useSSL", "false");
            props.setProperty("allowPublicKeyRetrieval", "true");

            Properties debeziumProperties = new Properties();
            debeziumProperties.setProperty("converters", "dateConverters");
            debeziumProperties.setProperty("dateConverters.type", "com.xxx.util.DateTimeConverter");
            debeziumProperties.setProperty("dateConverters.format.date", "yyyy-MM-dd");
            debeziumProperties.setProperty("dateConverters.format.time", "HH:mm:ss");
            debeziumProperties.setProperty("dateConverters.format.datetime", "yyyy-MM-dd HH:mm:ss");
            debeziumProperties.setProperty("dateConverters.format.timestamp", "yyyy-MM-dd HH:mm:ss");
            debeziumProperties.setProperty("dateConverters.format.timestamp.zone", "UTC+8");

            String[] databaseArray = databases.toArray(new String[0]);
            String[] tableArray = tables.toArray(new String[0]);
            MySqlSource<JSONObject> mySqlSource = MySqlSource.<JSONObject>builder()
                    .hostname(parameterTool.get(Constant.MYSQl_SOURCE_HOST_NAME))
                    .port(Integer.parseInt(parameterTool.get(Constant.MYSQl_SOURCE_PORT)))
                    .databaseList(databaseArray)
                    .tableList(tableArray)
                    .username(parameterTool.get(Constant.MYSQL_SOURCE_USER_NAME))
                    .password(parameterTool.get(Constant.MYSQL_SOURCE_PASSWORD))
                    .deserializer(new MyDebeziumDeserializationSchema())
                    .debeziumProperties(debeziumProperties)
                    .startupOptions(StartupOptions.initial())
                    .serverId(parameterTool.get(Constant.SERVER_ID))
                    .jdbcProperties(props)
                    .build();
            return mySqlSource;
        }
    }

References:
On Flink CDC auto-converting datetime to timestamps when listening to the MySQL binlog
Flink CDC time issues (timestamp and friends)
Flink reads the MySQL binlog and datetime fields turn into timestamps; how to solve it?

2.3.3 Restarting the job re-reads everything in full; I only want the increments

  MySqlSourceBuilder actually provides a method specifically for setting the startup mode; adjust the code as follows. (Startup options only take effect on a fresh start: a job restored from a checkpoint or savepoint resumes from the binlog position recorded in its state.)

    MySqlSourceBuilder<String> mySqlSource = new MySqlSourceBuilder<>();
    mySqlSource.startupOptions(StartupOptions.latest()); // start from the latest binlog offset, skip history
    mySqlSource
            .hostname(MYSQL_HOST)
            .port(MYSQL_PORT)
            .databaseList(SYNC_DB)                    // set captured database
            .tableList(String.join(",", SYNC_TABLES)) // set captured table (tableList takes strings)
            .username(MYSQL_USER)
            .password(MYSQL_PASSWD)
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
            .debeziumProperties(debeziumProperties)
            .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(5000);
    DataStreamSource<String> cdcSource =
            env.fromSource(mySqlSource.build(), WatermarkStrategy.noWatermarks(), "CDC Source" + "flinkcdc-kafka");

Reference: How to make Flink CDC sync only increments, without historical data (read just the binlog)

2.4 Testing

  After starting the program from IDEA, inserting rows into oc_settle_profit makes the corresponding rows appear in dws_profit_record_hdj_flink_api; a sample test row is shown below.
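  For a quick check, a minimal test row can look like this (a sketch; the values are made up and only some of the DDL's columns are filled):

    INSERT INTO league_test.oc_settle_profit
        (id, order_no, from_user_id, from_user_type, user_id, user_type,
         amount, state, acct_circle, remark, add_time)
    VALUES
        ('t0001', 'ORD-0001', 'u_from_1', 90, 'u_to_1', 92,
         100, 1, 'PG11111', 'test row', NOW());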

References:
[Boxuegu study notes] A thorough summary of Flink CDC for big data
Getting Flink CDC to sync MySQL data end to end

3. Aside

  Flink CDC can monitor MySQL, but it cannot monitor StarRocks. I asked the StarRocks team about this: StarRocks currently has no binlog-like mechanism that lets external systems observe DDL and data changes the way MySQL does, so for now Flink CDC cannot monitor StarRocks.


Reposted from: https://blog.csdn.net/m0_37739193/article/details/143374062
Copyright belongs to the original author, 小强签名设计; contact us for removal in case of infringement.
