Syncing MySQL Data with Flink CDC

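1. How Flink, CDC, and Flink CDC relate

  Flink: a stream-processing framework. The Flink core does not include Flink CDC.

  CDC: a concept rather than one specific technology. CDC stands for Change Data Capture: it captures the incremental changes of a source database (Source) and synchronizes them to one or more destinations (Sink). The data can also be processed along the way, for example filtered, joined, grouped, or aggregated. The established middleware for receiving and parsing database change events is Debezium; for MySQL specifically there is also Canal.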
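
To make the Source-to-Sink idea concrete, here is a minimal sketch in plain Java with no Flink involved. It replays an ordered list of change events onto an in-memory map that stands in for the sink. The class and field names are hypothetical; the op codes (r, c, u, d) are modeled on the ones that appear in Debezium's change events and that the clean() method later in this post checks.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of CDC: replay ordered change events from a source onto a sink. */
class CdcReplaySketch {

    /** One change event. op: r = snapshot read, c = create, u = update, d = delete. */
    static final class ChangeEvent {
        final String op;
        final String id;
        final String row;

        ChangeEvent(String op, String id, String row) {
            this.op = op;
            this.id = id;
            this.row = row;
        }
    }

    /** Applies the events in order and returns the resulting sink contents. */
    static Map<String, String> apply(Map<String, String> sink, List<ChangeEvent> events) {
        for (ChangeEvent e : events) {
            if ("d".equals(e.op)) {
                sink.remove(e.id);      // delete: drop the row
            } else if (Arrays.asList("r", "c", "u").contains(e.op)) {
                sink.put(e.id, e.row);  // read/create/update: keep the latest row image
            } else {
                throw new IllegalArgumentException("unknown op: " + e.op);
            }
        }
        return sink;
    }

    public static void main(String[] args) {
        List<ChangeEvent> events = Arrays.asList(
                new ChangeEvent("c", "1", "amount=100"),
                new ChangeEvent("c", "2", "amount=200"),
                new ChangeEvent("u", "1", "amount=150"),
                new ChangeEvent("d", "2", null));
        System.out.println(apply(new LinkedHashMap<>(), events)); // prints {1=amount=150}
    }
}
```

The sink ends up identical to the source without ever querying the source tables again, which is the point of CDC.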

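  Flink CDC: one implementation of CDC. It was developed at Alibaba to enrich the Flink ecosystem. In the version used in this post (2.4.0, published under the com.ververica group ID) it is a separate project, not a module of Flink.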

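1.1 Overview

  Flink CDC applies Change Data Capture based on database logs to read full and incremental data in one unified pipeline. Building on Flink's pipeline capabilities and its rich upstream and downstream ecosystem, it captures changes from many kinds of databases and synchronizes them to downstream storage in real time.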

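1.2 Comparison with the JDBC Connectors

  The JDBC Connectors can read from external databases such as MySQL, Oracle, and SQL Server. A JDBC read, however, is a one-off query: it cannot keep listening for changes in the database.

  The Flink CDC Connectors capture database changes and continuously synchronize them to downstream systems.

Official overview: https://ververica.github.io/flink-cdc-connectors/
GitHub: https://github.com/ververica/flink-cdc-connectors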

2. Usage

  Flink CDC can synchronize data in two ways: with Flink SQL, or with the Flink DataStream and Table API.

  Here I tested the DataStream approach directly in IDEA.

  The code comes from: https://github.com/yclxiao/flink-cdc-demo/tree/main/src/main/java/com/yclxiao/flinkcdcdemo

  CloudAcctProfit2DwsHdjProfitRecordAPI.java

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.xiaoqiang.utils.JdbcUtil;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.*;

public class CloudAcctProfit2DwsHdjProfitRecordAPI {

    private static final Logger LOG = LoggerFactory.getLogger(CloudAcctProfit2DwsHdjProfitRecordAPI.class);

    private static String MYSQL_HOST = "x.x.x.x";
    private static int MYSQL_PORT = 3306;
    private static String MYSQL_USER = "root";
    private static String MYSQL_PASSWD = "xiaoqiang";
    private static String SYNC_DB = "league_test";
    private static List<String> SYNC_TABLES = Arrays.asList("league_test.oc_settle_profit");

    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname(MYSQL_HOST)
                .port(MYSQL_PORT)
                .databaseList(SYNC_DB) // set captured database
                .tableList(String.join(",", SYNC_TABLES)) // set captured table
                .username(MYSQL_USER)
                .password(MYSQL_PASSWD)
                .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.enableCheckpointing(5000);

        DataStreamSource<String> cdcSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "CDC Source" + "xiaoqiang-flink");

        List<String> tableList = getTableList();
        System.out.println("tableList--->" + tableList);
        for (String tbl : tableList) {
            SingleOutputStreamOperator<String> filterStream = filterTableData(cdcSource, tbl);
            // Unwrap the Debezium envelope so the sink receives the flat "after" row it expects
            SingleOutputStreamOperator<String> cleanStream = clean(filterStream);
            // Send the stream to the sink
            cleanStream.addSink(new CustomDealDataSink()).name("sink " + tbl);
        }
        env.execute("xiaoqiang-flink");
    }

    /**
     * Custom sink
     */
    private static class CustomDealDataSink extends RichSinkFunction<String> {
        private transient Connection coalitiondbConnection;
        private transient Statement coalitiondbStatement;
        private transient Connection cloudConnection;
        private transient Statement cloudStatement;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            // Initialize the JDBC connections here
            coalitiondbConnection = DriverManager.getConnection("jdbc:mysql://x.x.x.x:3306/league_test", "root", "");
            coalitiondbStatement = coalitiondbConnection.createStatement();
            cloudConnection = DriverManager.getConnection("jdbc:mysql://x.x.x.x:3306/cloud_test", "root", "");
            cloudStatement = cloudConnection.createStatement();
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            // Parse the CDC JSON row
            JSONObject rowJson = JSON.parseObject(value);
            String outNo = rowJson.getString("out_no");
            Integer userType = rowJson.getInteger("user_type");
            String id = rowJson.getString("id");
            String payOrderNo = rowJson.getString("pay_order_no");
            String title = rowJson.getString("title");
            String fromUserId = rowJson.getString("from_user_id");
            String fromAccountId = rowJson.getString("from_account_id");
            String userId = rowJson.getString("user_id");
            String accountId = rowJson.getString("account_id");
            Integer amount = rowJson.getInteger("amount");
            Integer profitState = rowJson.getInteger("profit_state");
            Date profitTime = rowJson.getTimestamp("profit_time");
            Integer refundState = rowJson.getInteger("refund_state");
            Date refundTime = rowJson.getTimestamp("refund_time");
            Date addTime = rowJson.getTimestamp("add_time");
            String remark = rowJson.getString("remark");
            String acctCircle = rowJson.getString("acct_circle");
            Integer fromUserType = rowJson.getInteger("from_user_type");
            String companyId = rowJson.getString("company_id");
            String bizCompanyId = rowJson.getString("biz_company_id");

            // if (1 != profitState || !"PG11111".equals(acctCircle)) {
            //     return;
            // }
            //
            // // Read data from related tables (join with other tables)
            // Integer bizType = null;
            // String contributeUserId = null;
            // String relationBrandOwnerId = null;
            // ResultSet virtualOrderResultSet = coalitiondbStatement.executeQuery("select * from tc_virtual_order where order_type != 2 and id = '" + outNo + "'");
            // // A tc_virtual_order order (on-duty card, peace-of-mind card, course)
            // if (virtualOrderResultSet.next()) {
            //     // Processing logic
            //     Integer virtualOrder4OrderType = virtualOrderResultSet.getInt("order_type");
            //     String virtualOrder4CompanyId = virtualOrderResultSet.getString("company_id");
            //     String virtualOrder4BrandId = virtualOrderResultSet.getString("brand_id");
            //     // Skip on-duty card orders; another job already handles them
            //     if (virtualOrder4OrderType == 2) {
            //         return;
            //     }
            //     // Map orderType
            //     if (virtualOrder4OrderType == 6) {
            //         bizType = 10;
            //     } else if (virtualOrder4OrderType == 1) {
            //         bizType = 11;
            //     } else if (virtualOrder4OrderType == 5) {
            //         bizType = 12;
            //     }
            //     // Map userType
            //     if (virtualOrder4OrderType == 6 && userType == 92) {
            //         contributeUserId = virtualOrder4CompanyId;
            //     } else if (virtualOrder4OrderType == 1 && userType == 92) {
            //         contributeUserId = virtualOrder4CompanyId;
            //     } else if (virtualOrder4OrderType == 5 && userType == 92) {
            //         contributeUserId = virtualOrder4CompanyId;
            //     }
            //     // Map relationBrandOwnerId
            //     if (virtualOrder4OrderType == 6 && userType == 90) {
            //         relationBrandOwnerId = virtualOrder4BrandId;
            //     } else if (virtualOrder4OrderType == 1 && userType == 90) {
            //         relationBrandOwnerId = virtualOrder4BrandId;
            //     } else if (virtualOrder4OrderType == 5 && userType == 90) {
            //         relationBrandOwnerId = virtualOrder4BrandId;
            //     }
            //     // Map remark
            //     if (virtualOrder4OrderType == 1 || virtualOrder4OrderType == 5) {
            //         remark = title;
            //     }
            // } else {
            //     // Not tc_virtual_order data, so possibly something else; keep only Haodaojia (hdj) physical-goods data
            //     if (StringUtils.isBlank(payOrderNo)) {
            //         return;
            //     }
            //     ResultSet acctPayOrderResultSet = cloudStatement.executeQuery("select * from acct_pay_order t where t.id = '" + payOrderNo + "'");
            //     if (!acctPayOrderResultSet.next()) {
            //         return;
            //     }
            //     Integer payCate = acctPayOrderResultSet.getInt("pay_cate");
            //     if (200100 != payCate) { // Haodaojia physical-goods type
            //         return;
            //     }
            //
            //     bizType = 20;
            //     if (userType == 92 && StringUtils.isNotBlank(bizCompanyId)) {
            //         contributeUserId = bizCompanyId;
            //     } else if (userType == 90 && StringUtils.isNotBlank(bizCompanyId)) {
            //         ResultSet brandOwnerIdResultSet = cloudStatement.executeQuery("select * from uc_brand_partner t where t.company_id = '" + bizCompanyId + "'");
            //         if (brandOwnerIdResultSet.next()) {
            //             relationBrandOwnerId = brandOwnerIdResultSet.getString("brand_owner_id");
            //         }
            //     }
            // }
            // if (StringUtils.isBlank(remark)) {
            //     remark = title;
            // }

            // Write the row to MySQL.
            // contribute_user_id and relation_brand_owner_id are placeholder literals while the lookup logic above is commented out.
            String insertSql = "INSERT INTO dws_profit_record_hdj_flink_api (id, show_profit_id, order_no, from_user_id, from_user_type, user_id,\n"
                    + "                                                    user_type, amount, profit_time, state, acct_circle, biz_type,\n"
                    + "                                                    contribute_user_id, relation_brand_owner_id, remark, add_time)\n"
                    + "VALUES ('" + id + "', '" + "JSD" + id + "', '" + outNo + "', '" + fromUserId + "', " + fromUserType + ", '" + userId + "', " + userType + ",\n"
                    + "        " + amount + ", '" + DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss", TimeZone.getTimeZone("GMT")) + "', " + profitState + ", '" + acctCircle + "', " + 1 + ", "
                    + (StringUtils.isBlank("123") ? null : "'" + "contributeUserId" + "'") + ", "
                    + (StringUtils.isBlank("relationBrandOwnerId") ? null : "'" + "relationBrandOwnerId" + "'") + ", '" + remark + "',\n"
                    + "        '" + DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss", TimeZone.getTimeZone("GMT")) + "');";
            // Delete any existing row with the same id first, so a repeated event does not violate the primary key
            cloudStatement.execute("delete from dws_profit_record_hdj_flink_api where id = '" + id + "'");
            System.out.println("insertSql--->" + insertSql);
            cloudStatement.execute(insertSql);
        }

        @Override
        public void close() throws Exception {
            super.close();
            // Close the JDBC connections here
            coalitiondbStatement.close();
            coalitiondbConnection.close();
            cloudStatement.close();
            cloudConnection.close();
        }
    }

    /**
     * Cleans the data: keeps snapshot reads, inserts, and updates, and emits only the "after" row image.
     *
     * @param source stream of Debezium JSON change events
     * @return stream of flat row JSON strings
     */
    private static SingleOutputStreamOperator<String> clean(SingleOutputStreamOperator<String> source) {
        return source.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String row, Collector<String> out) throws Exception {
                try {
                    LOG.info("============================row:{}", row);
                    JSONObject rowJson = JSON.parseObject(row);
                    String op = rowJson.getString("op");
                    // history (snapshot read), insert, update
                    if (Arrays.asList("r", "c", "u").contains(op)) {
                        out.collect(rowJson.getJSONObject("after").toJSONString());
                    } else {
                        LOG.info("filter other op:{}", op);
                    }
                } catch (Exception ex) {
                    LOG.warn("filter other format binlog:{}", row);
                }
            }
        });
    }

    /**
     * Filters the data: keeps only the events that belong to the given table.
     *
     * @param source stream of Debezium JSON change events
     * @param table  table name to keep
     * @return the filtered stream
     */
    private static SingleOutputStreamOperator<String> filterTableData(DataStreamSource<String> source, String table) {
        return source.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String row) throws Exception {
                try {
                    JSONObject rowJson = JSON.parseObject(row);
                    JSONObject sourceJson = rowJson.getJSONObject("source");
                    String tbl = sourceJson.getString("table");
                    return table.equals(tbl);
                } catch (Exception ex) {
                    ex.printStackTrace();
                    return false;
                }
            }
        });
    }

    /** Returns the names of the tables in SYNC_DB that are listed in SYNC_TABLES. */
    private static List<String> getTableList() {
        List<String> tables = new ArrayList<>();
        String sql = "SELECT TABLE_SCHEMA,TABLE_NAME FROM information_schema.tables WHERE TABLE_SCHEMA = '" + SYNC_DB + "'";
        List<JSONObject> tableList = JdbcUtil.executeQuery(MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_PASSWD, sql);
        for (JSONObject jsob : tableList) {
            String schemaName = jsob.getString("TABLE_SCHEMA");
            String tblName = jsob.getString("TABLE_NAME");
            String schemaTbl = schemaName + "." + tblName;
            if (SYNC_TABLES.contains(schemaTbl)) {
                tables.add(tblName);
            }
        }
        return tables;
    }
}
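
The sink above builds its INSERT by concatenating values into the SQL string and deletes the old row first. One possible alternative, sketched here under assumptions, is a single parameterized upsert. The helper below only generates the statement text, so it runs without a database; the class and method names are hypothetical, and the VALUES() form used in the UPDATE clause still works but is deprecated in recent MySQL 8.0 releases.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/** Builds a parameterized MySQL upsert statement (sketch; names are hypothetical). */
class UpsertSqlSketch {

    /** Returns INSERT ... ON DUPLICATE KEY UPDATE with one ? placeholder per column. */
    static String buildUpsert(String table, List<String> columns, String keyColumn) {
        String cols = String.join(", ", columns);
        String marks = String.join(", ", Collections.nCopies(columns.size(), "?"));
        // On a key conflict, overwrite every non-key column with the incoming value
        String updates = columns.stream()
                .filter(c -> !c.equals(keyColumn))
                .map(c -> c + " = VALUES(" + c + ")")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + table + " (" + cols + ") VALUES (" + marks + ")"
                + " ON DUPLICATE KEY UPDATE " + updates;
    }

    public static void main(String[] args) {
        System.out.println(buildUpsert(
                "dws_profit_record_hdj_flink_api",
                Arrays.asList("id", "user_id", "amount"),
                "id"));
    }
}
```

The generated text would be passed to Connection.prepareStatement in open(), and each value bound with setString or setInt in invoke(), which avoids quoting problems and SQL injection through row values.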

  JdbcUtil.java

import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.*;
import java.util.ArrayList;
import java.util.List;

public class JdbcUtil {

    private static final Logger LOG = LoggerFactory.getLogger(JdbcUtil.class);

    static {
        try {
            // Legacy driver class name; with Connector/J 8.x use com.mysql.cj.jdbc.Driver instead
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            LOG.error("MySQL JDBC driver not found", e);
        }
    }

    /** Runs a query and returns each row as a JSONObject; returns an empty list on failure. */
    public static List<JSONObject> executeQuery(String hostUrl, int port, String user, String password, String sql) {
        List<JSONObject> beJson = new ArrayList<>();
        // Note: the database name (league_test) is hard-coded in the URL
        String connectionUrl = String.format("jdbc:mysql://%s:%s/league_test?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghai", hostUrl, port);
        // try-with-resources closes the result set, statement, and connection in every case
        try (Connection con = DriverManager.getConnection(connectionUrl, user, password);
             PreparedStatement ps = con.prepareStatement(sql);
             ResultSet rs = ps.executeQuery()) {
            beJson = resultSetToJson(rs);
        } catch (SQLException e) {
            LOG.error("Query failed: {}", sql, e);
        }
        return beJson;
    }

    /** Converts every row of the result set to a JSONObject keyed by column label. */
    private static List<JSONObject> resultSetToJson(ResultSet rs) throws SQLException {
        List<JSONObject> list = new ArrayList<>();
        ResultSetMetaData metaData = rs.getMetaData();
        int columnCount = metaData.getColumnCount();
        while (rs.next()) {
            JSONObject jsonObj = new JSONObject();
            for (int i = 1; i <= columnCount; i++) {
                String columnName = metaData.getColumnLabel(i);
                String value = rs.getString(columnName);
                jsonObj.put(columnName, value);
            }
            list.add(jsonObj);
        }
        return list;
    }
}

  pom.xml:

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>2.4.0</version>
</dependency>

2.1 Enabling the MySQL binlog

  If the value of log_bin is ON, the binlog is enabled; OFF means it is disabled. MySQL 8.0 enables it by default:

# Check whether the binlog is enabled
mysql> SHOW VARIABLES LIKE '%log_bin%';

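  Disabled state:

[Screenshot: binlog disabled]

  • log_bin = ON means MySQL has binlog logging enabled
  • log_bin_basename is the binlog file path and file-name prefix
  • log_bin_index is the path of the binlog index file

  Enabled state:

[Screenshot: binlog enabled]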

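# On CentOS the MySQL config file is usually under /etc/mysql; if it is not there, locate it with: find / -name "my.cnf"
vi /etc/mysql/my.cnf

# The settings below belong in the [mysqld] section
# Server ID
server-id=1
# binlog: setting a log_bin path enables it
log_bin = /var/lib/mysql/mysql_bin
# Days to keep binlog files. The default 0 keeps them forever.
# If the database is archived regularly, set a retention period; in principle only the logs written since the last archive are needed
expire_logs_days = 30
# Maximum size of a single binlog file
max_binlog_size = 1024M
# binlog format: STATEMENT, ROW, or MIXED. ROW is recommended (older MySQL versions default to STATEMENT; 5.7.7 and later default to ROW)
binlog_format = ROW
# Flush the binlog to disk after every n transaction commits. 0 means no forced flush; the file system decides when to flush.
# Use 1 for online-transaction or accounting data; 0 is fine for other data
sync_binlog = 1

# Restart MySQL for the configuration to take effect
systemctl restart mysqld    # or: service mysql restart

# List the binlog files
SHOW MASTER LOGS;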

See also: Enabling and configuring the MySQL binlog, and restoring data from the binlog
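
Before starting the job it can help to verify these settings from code. The sketch below is a hypothetical pre-flight check: it takes variable names and values (as you would read them with SHOW VARIABLES over JDBC) in a map so that it runs without a database. The three expected values are the ones Debezium's MySQL connector documents as required; the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Checks MySQL variables against what log-based CDC needs (sketch; names are hypothetical). */
class BinlogPreflightSketch {

    /** Returns one message per setting that does not have the expected value. */
    static List<String> problems(Map<String, String> vars) {
        List<String> problems = new ArrayList<>();
        require(vars, "log_bin", "ON", problems);
        require(vars, "binlog_format", "ROW", problems);
        require(vars, "binlog_row_image", "FULL", problems);
        return problems;
    }

    private static void require(Map<String, String> vars, String name, String expected, List<String> problems) {
        String actual = vars.get(name); // null when the variable was not reported
        if (!expected.equalsIgnoreCase(actual)) {
            problems.add(name + " is " + actual + ", expected " + expected);
        }
    }

    public static void main(String[] args) {
        Map<String, String> vars = new HashMap<>();
        vars.put("log_bin", "ON");
        vars.put("binlog_format", "STATEMENT");
        vars.put("binlog_row_image", "FULL");
        System.out.println(problems(vars)); // prints [binlog_format is STATEMENT, expected ROW]
    }
}
```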

2.2 Preparing the databases and tables in MySQL
CREATE DATABASE IF NOT EXISTS cloud_test;
CREATE DATABASE IF NOT EXISTS league_test;

CREATE TABLE league_test.oc_settle_profit (
        id                           varchar(32),
        show_profit_id               varchar(32),
        order_no                     varchar(32),
        from_user_id                 varchar(32),
        from_user_type               int(11),
        user_id                      varchar(32),
        user_type                    int(11),
        rate                         int(11),
        amount                       int(11),
        type                         int(11),
        add_time                     datetime,
        state                        int(11),
        expect_profit_time           datetime,
        profit_time                  datetime,
        profit_mode                  int(11),
        opt_code                     varchar(32),
        opt_name                     varchar(32),
        acct_circle                  varchar(32),
        process_state                int(11),
        parent_id                    varchar(32),
        keep_account_from_user_id    varchar(32),
        keep_account_from_bm_user_id varchar(32),
        keep_account_user_id         varchar(32),
        keep_account_bm_user_id      varchar(32),
        biz_type                     int(11),
        remark                       varchar(32),
        contribute_user_id           varchar(32),
        relation_brand_owner_id      varchar(32),
        PRIMARY KEY (id) USING BTREE
);

CREATE TABLE cloud_test.dws_profit_record_hdj_flink_api (
        id                      varchar(32),
        show_profit_id          varchar(32),
        order_no                varchar(32),
        from_user_id            varchar(32),
        from_user_type          int(11),
        user_id                 varchar(32),
        user_type               int(11),
        amount                  int(11),
        profit_time             datetime,
        state                   int(11),
        acct_circle             varchar(32),
        biz_type                int(11),
        contribute_user_id      varchar(32),
        relation_brand_owner_id varchar(32),
        remark                  varchar(32),
        add_time                datetime,
        PRIMARY KEY (id) USING BTREE
);

2.3 Pitfalls

2.3.1 The MySQL server has a timezone offset (0 seconds ahead of UTC)

  Connecting to MySQL over JDBC failed with:

The MySQL server has a timezone offset (0 seconds ahead of UTC)

  Cause: as the message says, this is a time zone problem.

show variables like '%time_zone%';
Variable_name   |Value |
----------------+------+
time_zone       |SYSTEM|

-- or use the following statement
SELECT @@global.time_zone;

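  Fix: log in to MySQL as root and run

set global time_zone='+8:00';

  Note: I first tried

SET GLOBAL time_zone = 'Asia/Shanghai';

but that did not work. Named time zones are only accepted once MySQL's time zone tables have been loaded, which may be the reason.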
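
The message reports the server's offset from UTC in seconds. As an illustration of the kind of comparison that leads to such an error, here is a minimal java.time sketch that checks a server offset against a configured zone. It is an assumption-laden toy with hypothetical names, not the connector's actual code.

```java
import java.time.Instant;
import java.time.ZoneId;

/** Compares a server's UTC offset with a configured time zone (sketch; names are hypothetical). */
class TimeZoneCheckSketch {

    /** True when the configured zone has exactly the given offset from UTC at the given instant. */
    static boolean matches(int serverOffsetSeconds, String configuredZone, Instant at) {
        int configuredOffset = ZoneId.of(configuredZone).getRules().getOffset(at).getTotalSeconds();
        return configuredOffset == serverOffsetSeconds;
    }

    public static void main(String[] args) {
        Instant at = Instant.parse("2024-01-01T00:00:00Z");
        // Server on UTC, client configured for Asia/Shanghai: mismatch
        System.out.println(matches(0, "Asia/Shanghai", at));        // prints false
        // Server set to +8:00 (28800 seconds): match
        System.out.println(matches(8 * 3600, "Asia/Shanghai", at)); // prints true
    }
}
```

This is why setting the server to '+8:00' resolves the error when the client side assumes Asia/Shanghai: both sides then agree on an offset of 28800 seconds.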

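2.3.2 datetime values are automatically converted to timestamps

  Flink CDC reads the MySQL binlog through Debezium and turns it into a stream. Debezium does not emit a MySQL datetime column as a formatted string; the value arrives as a numeric epoch timestamp, so it has to be converted while capturing the binlog before it can be parsed as a date.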
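
If only a few fields are affected, the number can also be converted downstream. A minimal sketch, assuming the field holds milliseconds since the epoch and represents the original wall-clock value when read as UTC (which is how Debezium documents DATETIME columns with up to millisecond precision); the class and method names are hypothetical.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

/** Converts an epoch-millisecond datetime value back to a readable string (sketch). */
class EpochMillisSketch {

    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Interprets the value as UTC, so the original wall-clock datetime is recovered unchanged. */
    static String toDateTimeString(long epochMillis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(epochMillis), ZoneOffset.UTC).format(FORMAT);
    }

    public static void main(String[] args) {
        System.out.println(toDateTimeString(1700000000000L)); // prints 2023-11-14 22:13:20
    }
}
```

The custom converter that follows solves the same problem at the source instead, so every consumer already receives strings.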

  Custom class: DateTimeConverter implements the CustomConverter interface and overrides its methods to convert MySQL time types to standard strings

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Properties;

/**
 * Implements CustomConverter and overrides its methods to convert MySQL time types to standard strings.
 */
public class DateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
    private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
    private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
    private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
    private ZoneId timestampZoneId = ZoneId.systemDefault();

    @Override
    public void configure(Properties props) {
        // Left empty: the converter properties are not read here, so the ISO formatters
        // and the system default zone above are always used
    }

    @Override
    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
        String sqlType = column.typeName().toUpperCase();
        SchemaBuilder schemaBuilder = null;
        Converter converter = null;
        if ("DATE".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string");
            converter = this::convertDate;
        }
        if ("TIME".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string");
            converter = this::convertTime;
        }
        if ("DATETIME".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string");
            converter = this::convertDateTime;
        }
        if ("TIMESTAMP".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string");
            converter = this::convertTimestamp;
        }
        // Register a converter only for the four time types handled above
        if (schemaBuilder != null) {
            registration.register(schemaBuilder, converter);
        }
    }

    private String convertDate(Object input) {
        if (input == null) return null;
        if (input instanceof LocalDate) {
            return dateFormatter.format((LocalDate) input);
        }
        if (input instanceof Integer) {
            // An Integer is treated as days since the epoch
            LocalDate date = LocalDate.ofEpochDay((Integer) input);
            return dateFormatter.format(date);
        }
        return String.valueOf(input);
    }

    private String convertTime(Object input) {
        if (input == null) return null;
        if (input instanceof Duration) {
            Duration duration = (Duration) input;
            long seconds = duration.getSeconds();
            int nano = duration.getNano();
            LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(nano);
            return timeFormatter.format(time);
        }
        return String.valueOf(input);
    }

    private String convertDateTime(Object input) {
        if (input == null) return null;
        if (input instanceof LocalDateTime) {
            return datetimeFormatter.format((LocalDateTime) input).replaceAll("T", " ");
        }
        return String.valueOf(input);
    }

    private String convertTimestamp(Object input) {
        if (input == null) return null;
        if (input instanceof ZonedDateTime) {
            // MySQL stores TIMESTAMP in UTC, so the ZonedDateTime received here is in UTC
            ZonedDateTime zonedDateTime = (ZonedDateTime) input;
            LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
            return timestampFormatter.format(localDateTime).replaceAll("T", " ");
        }
        return String.valueOf(input);
    }
}

  Register it on the source:

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.api.java.utils.ParameterTool;

import java.util.List;
import java.util.Properties;

// Constant and MyDebeziumDeserializationSchema are project-specific classes that are not shown here
public class FlinkSourceUtil {

    public static MySqlSource<JSONObject> getMySqlSource(ParameterTool parameterTool, List<String> databases, List<String> tables) {
        // JDBC connection properties
        Properties props = new Properties();
        props.setProperty("useSSL", "false");
        props.setProperty("allowPublicKeyRetrieval", "true");

        // Debezium properties: register the custom converter under the name "dateConverters"
        Properties debeziumProperties = new Properties();
        debeziumProperties.setProperty("converters", "dateConverters");
        debeziumProperties.setProperty("dateConverters.type", "com.xxx.util.DateTimeConverter");
        debeziumProperties.setProperty("dateConverters.format.date", "yyyy-MM-dd");
        debeziumProperties.setProperty("dateConverters.format.time", "HH:mm:ss");
        debeziumProperties.setProperty("dateConverters.format.datetime", "yyyy-MM-dd HH:mm:ss");
        debeziumProperties.setProperty("dateConverters.format.timestamp", "yyyy-MM-dd HH:mm:ss");
        debeziumProperties.setProperty("dateConverters.format.timestamp.zone", "UTC+8");

        String[] databaseArray = databases.toArray(new String[0]);
        String[] tableArray = tables.toArray(new String[0]);

        MySqlSource<JSONObject> mySqlSource = MySqlSource.<JSONObject>builder()
                .hostname(parameterTool.get(Constant.MYSQl_SOURCE_HOST_NAME))
                .port(Integer.parseInt(parameterTool.get(Constant.MYSQl_SOURCE_PORT)))
                .databaseList(databaseArray)
                .tableList(tableArray)
                .username(parameterTool.get(Constant.MYSQL_SOURCE_USER_NAME))
                .password(parameterTool.get(Constant.MYSQL_SOURCE_PASSWORD))
                .deserializer(new MyDebeziumDeserializationSchema())
                .debeziumProperties(debeziumProperties)
                .startupOptions(StartupOptions.initial())
                .serverId(parameterTool.get(Constant.SERVER_ID))
                .jdbcProperties(props)
                .build();
        return mySqlSource;
    }
}

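References:
Flink CDC automatically converting datetime to a timestamp when listening to the MySQL binlog
Flink CDC time issues: timestamp and related types
flink-core reading the MySQL binlog converts datetime fields to timestamps; how can this be solved?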

2.3.3 Restarting the program reads everything again, but I only want incremental changes

  MySqlSourceBuilder has a method dedicated to choosing the startup mode. Adjust the code:

MySqlSourceBuilder<String> mySqlSource = new MySqlSourceBuilder<>();
// Start from the latest binlog position instead of re-reading existing data
mySqlSource.startupOptions(StartupOptions.latest());

mySqlSource
        .hostname(MYSQL_HOST)
        .port(MYSQL_PORT)
        .databaseList(SYNC_DB) // set captured database
        .tableList(String.join(",", SYNC_TABLES)) // set captured table
        .username(MYSQL_USER)
        .password(MYSQL_PASSWD)
        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
        .debeziumProperties(debeziumProperties); // the Debezium Properties built as in 2.3.2

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env.enableCheckpointing(5000);
// build() is called once, here, after all options have been set
DataStreamSource<String> cdcSource = env.fromSource(mySqlSource.build(), WatermarkStrategy.noWatermarks(), "CDC Source" + "flinkcdc-kafka");
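
The difference between the two startup modes can be pictured with a toy model in plain Java. It is only an illustration of the semantics described above (initial reads the existing rows first, latest emits only changes made after the job starts); the enum and method names are hypothetical and the real connector is not involved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model of what a CDC source emits under two startup modes (names are hypothetical). */
class StartupModeSketch {

    enum Mode { INITIAL, LATEST }

    /** Returns the emitted events, tagged r for snapshot reads and c for new changes. */
    static List<String> emitted(Mode mode, List<String> existingRows, List<String> changesAfterStart) {
        List<String> out = new ArrayList<>();
        if (mode == Mode.INITIAL) {
            // initial: first emit every row that already exists in the table
            for (String row : existingRows) {
                out.add("r:" + row);
            }
        }
        // both modes: emit the changes that happen after the job has started
        for (String change : changesAfterStart) {
            out.add("c:" + change);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> existing = Arrays.asList("row1", "row2");
        List<String> changes = Arrays.asList("row3");
        System.out.println(emitted(Mode.INITIAL, existing, changes)); // prints [r:row1, r:row2, c:row3]
        System.out.println(emitted(Mode.LATEST, existing, changes));  // prints [c:row3]
    }
}
```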

Reference: How to make Flink CDC do incremental sync only, without historical data (read only the binlog)

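2.4 Testing

  After starting the program in IDEA, inserting a row into the oc_settle_profit table causes the corresponding row to be inserted into dws_profit_record_hdj_flink_api as well.

References:
[Boxuegu study notes] A thorough summary, shared with care | Big data: flinkCDC
Getting Flink CDC to sync MySQL data working in one go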

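3. Side note

  Flink CDC can monitor MySQL but not StarRocks. I asked the official team: StarRocks currently has no binlog-like feature that lets external systems observe its DDL operations the way MySQL does, so for now Flink CDC cannot monitor StarRocks.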


Reposted from: https://blog.csdn.net/m0_37739193/article/details/143374062
Copyright belongs to the original author, 小强签名设计. If this infringes your rights, please contact us for removal.
