0


flinkcdc数据采集代码FlinkAPI

1. flinkcdc数据采集代码:

背景

使用 Flink CDC 采集 MySQL 数据到 Kafka。经过长达两个月的各种调试,终于把调试后的版本整理出来了:程序进行全量加增量的数据采集,并写了一个窗口,每隔 10 分钟输出一次每张表同步到的数据量,全部使用 Flink API 代码实现。

组件版本:
flink :flink-1.13.6-bin-scala_2.12
flinkcdc 2.2.1
mysql:5.7
kafka:kafka_2.12-3.0.0

依赖

<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Flink CDC (MySQL -> Kafka) job; produces a shaded fat jar on `package`. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>FlinkcdcAPI</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <flink.version>1.13.6</flink.version>
        <!-- Scala binary version used as the artifact suffix of the Flink modules. -->
        <scala.version>2.12</scala.version>
        <hadoop.version>3.1.3</hadoop.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!--
          flink-shaded-guava was previously declared twice (30.1.1-jre-15.0 and 18.0-13.0).
          Duplicate declarations trigger a Maven model warning and only one of them takes effect,
          so a single version is kept: 18.0-13.0, the shaded Guava line used by Flink 1.13.x.
        -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-guava</artifactId>
            <version>18.0-13.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- NOTE(review): fastjson 1.2.68 is an old release; consider upgrading to the latest 1.2.x patch. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>
        <!-- Required when checkpoints are stored on HDFS. -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.20</version>
            <scope>provided</scope>
            <!-- Annotation processor; only needed at javac compile time. -->
        </dependency>
        <!-- Flink logs through the slf4j API; log4j is used here as the concrete logging backend. -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <!-- The dependency is available only for stable releases, SNAPSHOT dependency need build by yourself. -->
            <version>2.2.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.1.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                    <exclude>org.apache.hadoop:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                         Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers combine.children="append">
                                <!-- The service transformer is needed to merge META-INF/services files:
                                     connector and format factories would otherwise overwrite each other
                                     when packaged into one jar. -->
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

自定义序列化器

package flink;import com.alibaba.fastjson.JSONObject;import com.ververica.cdc.debezium.DebeziumDeserializationSchema;import io.debezium.data.Envelope;import org.apache.flink.api.common.typeinfo.BasicTypeInfo;import org.apache.flink.api.common.typeinfo.TypeInformation;import org.apache.flink.util.Collector;import org.apache.kafka.connect.data.Field;import org.apache.kafka.connect.data.Schema;import org.apache.kafka.connect.data.Struct;import org.apache.kafka.connect.source.SourceRecord;import java.util.*;

public class CustomerDeserialization implements DebeziumDeserializationSchema<String>{

   @Override
   public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {

       //1.创建 JSON 对象用于存储最终数据
       JSONObject result = new JSONObject();

       //2.获取库名&表名放入 source
       String topic = sourceRecord.topic();
       String[] fields = topic.split("\\.");

       String database = fields[1];
       String tableName = fields[2];

       Struct value =(Struct) sourceRecord.value();
       //3.获取"before"数据
       Struct before = value.getStruct("before");

       HashMap<String, Object> sourceOffset =(HashMap<String, Object>) sourceRecord.sourceOffset();
       Long ts_sec =((Number) sourceOffset.get("ts_sec")).longValue();

       JSONObject beforeJson = new JSONObject();if(before != null){
           Schema beforeSchema = before.schema();
           List<Field> beforeFields = beforeSchema.fields();for(Field field : beforeFields){
               Object beforeValue = before.get(field);
               beforeJson.put(field.name(), beforeValue);}}

       //4.获取"after"数据
       Struct after = value.getStruct("after");
       JSONObject afterJson = new JSONObject();if(after != null){
           Schema afterSchema = after.schema();
           List<Field> afterFields = afterSchema.fields();for(Field field : afterFields){
               Object afterValue = after.get(field);
               afterJson.put(field.name(), afterValue);
//               System.out.println(field.name()+":"+afterValue.getClass());}}

       //5.获取操作类型 CREATE UPDATE DELETE 进行符合 Debezium-op 的字母
       Envelope.Operation operation = Envelope.operationFor(sourceRecord);
       String type= operation.toString().toLowerCase();if("insert".equals(type)){type="c";}if("update".equals(type)){type="u";}if("delete".equals(type)){type="d";}if("create".equals(type)){type="c";}
       //6.将字段写入 JSON 对象
//       result.put("source", source);

       result.put("database",database);
       result.put("table",tableName);
       result.put("before", beforeJson);
       result.put("after", afterJson);
       result.put("op", type);
       result.put("ts", ts_sec);
       //7.输出数据
       collector.collect(result.toJSONString());}

   @Override
   public TypeInformation<String>getProducedType(){return BasicTypeInfo.STRING_TYPE_INFO;}}

时间转化器(时差和时间类型的转化)

package flink;import io.debezium.spi.converter.CustomConverter;import io.debezium.spi.converter.RelationalColumn;import org.apache.kafka.connect.data.SchemaBuilder;import java.time.*;import java.time.format.DateTimeFormatter;import java.util.Properties;

/**
 * Debezium {@link CustomConverter} that renders MySQL DATE, TIME, DATETIME and TIMESTAMP columns
 * as strings instead of Debezium's default numeric encodings.
 *
 * <p>DATETIME and TIMESTAMP are formatted as ISO date-time with the {@code T} separator replaced
 * by a space. TIMESTAMP values are first shifted into the JVM default time zone.
 *
 * @author WuBo
 */
public class MySqlDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private static final long SECONDS_PER_DAY = 24L * 60 * 60;

    private final DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;

    private final DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;

    private final DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;

    private final DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;

    private final ZoneId timestampZoneId = ZoneId.systemDefault();

    /** No configurable options; the formats and the time zone above are fixed. */
    @Override
    public void configure(Properties props) {
    }

    /**
     * Registers a string converter for the column if its SQL type is one of
     * DATE, TIME, DATETIME or TIMESTAMP; other column types are left untouched.
     */
    @Override
    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
        // Locale.ROOT: the default-locale upper-casing of 'i' (e.g. Turkish) would break the match.
        String sqlType = column.typeName().toUpperCase(java.util.Locale.ROOT);

        SchemaBuilder schemaBuilder = null;
        Converter converter = null;

        switch (sqlType) {
            case "DATE":
                schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string");
                converter = this::convertDate;
                break;
            case "TIME":
                schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string");
                converter = this::convertTime;
                break;
            case "DATETIME":
                schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string");
                converter = this::convertDateTime;
                break;
            case "TIMESTAMP":
                schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string");
                converter = this::convertTimestamp;
                break;
            default:
                break;
        }

        if (schemaBuilder != null) {
            registration.register(schemaBuilder, converter);
        }
    }

    /** Formats a {@link LocalDate} or an epoch-day {@link Integer}; other inputs fall back to {@code String.valueOf}. */
    private String convertDate(Object input) {
        if (input == null) {
            return null;
        }
        if (input instanceof LocalDate) {
            return dateFormatter.format((LocalDate) input);
        }
        if (input instanceof Integer) {
            LocalDate date = LocalDate.ofEpochDay((Integer) input);
            return dateFormatter.format(date);
        }
        return String.valueOf(input);
    }

    /**
     * Formats a {@link Duration} as a time of day. MySQL TIME can be negative or exceed 24 hours,
     * which {@link LocalTime} cannot represent; such values are rendered as {@code [-]HH:mm:ss[.fraction]}
     * with an unbounded hour field instead of throwing.
     */
    private String convertTime(Object input) {
        if (input == null) {
            return null;
        }
        if (input instanceof Duration) {
            Duration duration = (Duration) input;
            long seconds = duration.getSeconds();
            if (seconds >= 0 && seconds < SECONDS_PER_DAY) {
                LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(duration.getNano());
                return timeFormatter.format(time);
            }
            return formatElapsedTime(duration);
        }
        return String.valueOf(input);
    }

    /** Renders a duration outside the 00:00:00-23:59:59 range as {@code [-]HH:mm:ss[.fraction]}. */
    private static String formatElapsedTime(Duration duration) {
        boolean negative = duration.isNegative();
        Duration magnitude = negative ? duration.negated() : duration;
        long total = magnitude.getSeconds();

        StringBuilder text = new StringBuilder();
        if (negative) {
            text.append('-');
        }
        text.append(String.format(java.util.Locale.ROOT, "%02d:%02d:%02d",
                total / 3600, (total % 3600) / 60, total % 60));

        int nano = magnitude.getNano();
        if (nano != 0) {
            // Same convention as ISO_TIME: fractional digits without trailing zeros.
            String fraction = String.format(java.util.Locale.ROOT, "%09d", nano).replaceAll("0+$", "");
            text.append('.').append(fraction);
        }
        return text.toString();
    }

    /** Formats a {@link LocalDateTime} as {@code yyyy-MM-dd HH:mm:ss[.fraction]}; other inputs fall back to {@code String.valueOf}. */
    private String convertDateTime(Object input) {
        if (input == null) {
            return null;
        }
        if (input instanceof LocalDateTime) {
            return datetimeFormatter.format((LocalDateTime) input).replace('T', ' ');
        }
        return String.valueOf(input);
    }

    /** Shifts a {@link ZonedDateTime} into the JVM default zone and formats it like a DATETIME. */
    private String convertTimestamp(Object input) {
        if (input == null) {
            return null;
        }
        if (input instanceof ZonedDateTime) {
            // MySQL stores TIMESTAMP in UTC, so the incoming ZonedDateTime is expected to be UTC.
            ZonedDateTime zonedDateTime = (ZonedDateTime) input;
            LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
            return timestampFormatter.format(localDateTime).replace('T', ' ');
        }
        return String.valueOf(input);
    }
}

主程序

package flink;import com.ververica.cdc.connectors.mysql.source.MySqlSource;import com.ververica.cdc.connectors.mysql.table.StartupOptions;import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;import org.apache.flink.api.common.eventtime.WatermarkStrategy;import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.common.functions.MapFunction;import org.apache.flink.api.common.restartstrategy.RestartStrategies;import org.apache.flink.api.common.serialization.SerializationSchema;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.common.state.ValueState;import org.apache.flink.api.common.state.ValueStateDescriptor;import org.apache.flink.api.java.functions.KeySelector;import org.apache.flink.api.java.tuple.Tuple;import org.apache.flink.api.java.tuple.Tuple2;import org.apache.flink.api.java.tuple.Tuple3;import org.apache.flink.configuration.Configuration;import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;import org.apache.flink.streaming.api.CheckpointingMode;import org.apache.flink.streaming.api.datastream.DataStreamSource;import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;import org.apache.flink.streaming.api.environment.CheckpointConfig;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.ProcessFunction;import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;import org.apache.flink.streaming.api.functions.windowing.WindowFunction;import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;import 
org.apache.flink.streaming.api.windowing.time.Time;import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;import org.apache.flink.streaming.api.windowing.windows.TimeWindow;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;import org.apache.flink.util.Collector;import java.text.SimpleDateFormat;import java.util.ArrayList;import java.util.Arrays;import java.util.Date;import java.util.Properties;

public class FlinkCDC {

    public static void main(String[] args) throws Exception {

        String topic ="sapgateway";
        String brokers ="hadoop102:9092,hadoop103:9092,hadoop104:9092";
        //1.创建执行环境
        StreamExecutionEnvironment env=      StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);

        //2.Flink-CDC将读取binlog的位置信息以状态的方式保存在CK,如果想要做到断点续传,需要从Checkpoint或者Savepoint启动程序
        //2.1 开启Checkpoint,每隔10分鐘做一次CK
        env.enableCheckpointing(100*6000L, CheckpointingMode.EXACTLY_ONCE);//头和头的之间
//        env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE);//头和头的之间
        env.getCheckpointConfig().setCheckpointTimeout(10 * 60000L);
        //2.3 设置任务关闭的时候保留最后一次CK数据
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100*6000L);//头和尾
//        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000L);//头和尾

        //2.4 指定从CK自动重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(20, 5000));

        //2.5 设置状态后端
//        env.setStateBackend(new HashMapStateBackend());
        //开启增量检查点
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        env.getCheckpointConfig().enableUnalignedCheckpoints(false);

        // 3. 设置 checkpoint 的存储路径
//        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/ck/" + topic);
        env.getCheckpointConfig().setCheckpointStorage("hdfs://mycluster:8020/ck/" + topic);

        //2.6 设置访问HDFS的用户名
        System.setProperty("HADOOP_USER_NAME", "sarah");

        //3.创建Flink-MySQL-CDC的Source
        //自定义时间转换配置
        Properties properties = new Properties();
        properties.setProperty("converters", "dateConverters");
        properties.setProperty("dateConverters.type", "flink.MySqlDateTimeConverter");

        //4.定义jdbc配置

//        MySqlSource mysqlCdcSource = MySqlSource.<String>builder()
//                .hostname("47.52.185.61")
//                .port(3306)
//                .username("dw_readonly")
//                .password("jjhpM#b#Z0")
//                .serverTimeZone("America/Los_Angeles")
//                .databaseList("db_sjfood")
//                .tableList("db_sjfood.tb_010_sjfood_zsd01")
//                .debeziumProperties(properties)
//                .scanNewlyAddedTableEnabled(true)
//                .deserializer(new CustomerDeserialization()) // converts SourceRecord to JSON String
//                .build();
//

        //构建mysqlSource
        MySqlSource mysqlCdcSource = MySqlSource.<String>builder()
                .hostname("xx.xxx.xxx.xx")
                .port(3306)
                .username("sapgateway")
                .password("xxxxxx")
                .databaseList("sap_gateway")
                .tableList("sap_gateway.VBAK",
                        "sap_gateway.VBAP",
                        "sap_gateway.VBFA",
                        "sap_gateway.LIPS",
                        "sap_gateway.ZTSD039")
                .serverId("5100-6200")
                .fetchSize(8192)
                .splitSize(10240)
                .debeziumProperties(properties)
//                .scanNewlyAddedTableEnabled(true)
                .deserializer(new CustomerDeserialization()) // converts SourceRecord to JSON String
                .build();

        //使用CDC Source从MySQL读取数据
        DataStreamSource<String> mysqlSourceDS = env.fromSource(mysqlCdcSource, WatermarkStrategy.noWatermarks(), "MysqlSource");

        //打印数据并将数据写入 Kafka
        mysqlSourceDS.addSink(getKafkaProducer(brokers, topic)).name(topic).uid(topic + "uid3");

//写一个窗口,记录每张表同步到的数据量
        mysqlSourceDS
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>(){
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        String[] data = value.split("\"table\"");
                        data[1]="table" + data[1];
                        String[] data2 = data[1].split(",");
                        out.collect(Tuple2.of(data2[0], 1));}})
                .keyBy(value -> value.f0) // 使用 f0 作为 key
                .window(SlidingProcessingTimeWindows.of(Time.minutes(10), Time.minutes(10))) // 5分钟的窗口,每5秒滑动一次
                .apply(new CustomWindowFunction())
                .print().setParallelism(1);

        env.execute("FlinkCDC");}

//自定义窗口
        private static class CustomWindowFunction implements WindowFunction<Tuple2<String, Integer>, Tuple3<String, String, Integer>, String, TimeWindow>{
            @Override
            public void apply(String key, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple3<String, String, Integer>> out) throws Exception {
                int count =0;for(Tuple2<String, Integer> element : input){
                    count += element.f1;}
                out.collect(Tuple3.of(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(window.getStart())), key, count));}}

    //kafka 生产者
    public static FlinkKafkaProducer<String> getKafkaProducer(String brokers,String topic){
        Properties props = new Properties();
        props.setProperty("bootstrap.servers",brokers);
        props.put("buffer.memory", 53554432);
//        props.put("batch.size", 131072);
        props.put("linger.ms", 10);
        props.put("max.request.size", 10485760);
        props.put("acks", "1");
        props.put("retries", 10);
        props.put("retry.backoff.ms", 500);

        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer(topic,new SimpleStringSchema(), props);return producer;}}
标签: flink mysql kafka

本文转载自: https://blog.csdn.net/m0_37759590/article/details/132557714
版权归原作者 m0_37759590 所有, 如有侵权,请联系我们删除。

“flinkcdc数据采集代码FlinkAPI”的评论:

还没有评论