0


flink-cdc之读取mysql变化数据

pom

<properties>
    <flink-version>1.13.0</flink-version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink-version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>${flink-version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>${flink-version}</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.16</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.12</artifactId>
        <version>${flink-version}</version>
    </dependency>

    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>2.2.1</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>

代码

注意开启checkpoint 和不开启是有区别的(savepoint也可以 启动的flink指定时候 -s savepath)

不开启,如果项目重启了,会重新读取所有的数据

开启了,项目重启了额,会根据保留的信息去读取变化的数据


import com.ververica.cdc.connectors.mysql.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkCDCTest {

    public static void main(String[] args) throws Exception {

        //1.获取Flink 执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //1.1 开启CK
//        env.enableCheckpointing(5000);
//        env.getCheckpointConfig().setCheckpointTimeout(10000);
//        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
//        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
//        
//        env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/cdc-test/ck"));

        //2.通过FlinkCDC构建SourceFunction
        DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder()
                .hostname("9.134.70.1")
                .port(3306)
                .username("root")
                .password("xxxxxxx")
                .databaseList("cc")
                .tableList("cc.student")
//                .deserializer(new StringDebeziumDeserializationSchema())
                .deserializer(new JsonDebeziumDeserializationSchema())
                .startupOptions(StartupOptions.initial())
                .build();
        DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction);

        //3.数据打印
        dataStreamSource.print("==FlinkCDC==");

        //4.启动任务
        env.execute("FlinkCDC");

    }

}

mysql

数据库表

增加一条数据

打印日志 op:c 是create

==FlinkCDC==> {"before":null,"after":{"id":1,"name":"name1","age":"1"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1689245998000,"snapshot":"false","db":"cc","sequence":null,"table":"student","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":2781,"row":0,"thread":null,"query":null},"op":"c","ts_ms":1689246005928,"transaction":null}

修改一条数据 age=1 ->age=2 op=u 是update

flink打印日志

==FlinkCDC==> {"before":{"id":1,"name":"name1","age":"1"},"after":{"id":1,"name":"name1","age":"2"},"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1689246092000,"snapshot":"false","db":"cc","sequence":null,"table":"student","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":3049,"row":0,"thread":null,"query":null},"op":"u","ts_ms":1689246099578,"transaction":null}

删除这条数据 op=d 是delete

==FlinkCDC==> {"before":{"id":1,"name":"name1","age":"2"},"after":null,"source":{"version":"1.5.4.Final","connector":"mysql","name":"mysql_binlog_source","ts_ms":1689246163000,"snapshot":"false","db":"cc","sequence":null,"table":"student","server_id":1,"gtid":null,"file":"mysql-bin.000001","pos":3333,"row":0,"thread":null,"query":null},"op":"d","ts_ms":1689246170857,"transaction":null}

由于打印的日志太多 我们可以用fastjson稍微封装下 然后传给sink去处理,根据update delete insert实时更新下游数据,还有一个op=r 是读取的数据

还可以使用flinksql


import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class FlinkSqlCdc {
    public static void main(String[] args) throws Exception {

        //1.获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //2.使用FLINKSQL DDL模式构建CDC 表
        tableEnv.executeSql("CREATE TABLE user_info ( " +
                " id STRING primary key, " +
                " name int, " +
                " age STRING " +
                ") WITH ( " +
                " 'connector' = 'mysql-cdc', " +
                " 'scan.startup.mode' = 'latest-offset', " +
                " 'hostname' = '9.134.70.1', " +
                " 'port' = '3306', " +
                " 'username' = 'root', " +
                " 'password' = 'xxxxx', " +
                " 'database-name' = 'cc', " +
                " 'table-name' = 'student' " +
                ")");

        //3.查询数据并转换为流输出
        Table table = tableEnv.sqlQuery("select * from user_info");
        DataStream<Tuple2<Boolean, Row>> retractStream = tableEnv.toRetractStream(table, Row.class);
        retractStream.print();

        //4.启动
        env.execute("FlinkSQLCDC");

    }

}

同样是增删改查,flink打印日志如下

标签: flink mysql android

本文转载自: https://blog.csdn.net/cclovezbf/article/details/131709474
版权归原作者 cclovezbf 所有, 如有侵权,请联系我们删除。

“flink-cdc之读取mysql变化数据”的评论:

还没有评论