Flink DataStream API: CDC Sync from MySQL to StarRocks

1. Version Information

  • Flink: 1.16.1
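  • Flink CDC (flink-sql-connector-mysql-cdc): 2.3.0
  • flink-connector-starrocks: 1.2.9_flink-1.16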

2. Code Implementation

  • The pom file is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.wys</groupId>
    <artifactId>flink</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.16.1</flink.version>
        <flink-cdc.version>2.3.0</flink-cdc.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- mysql-cdc fat jar -->
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-sql-connector-mysql-cdc</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>
        <!-- flink web UI -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- logging dependencies -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
            <scope>provided</scope>
        </dependency>
        <!-- flink-connector-starrocks -->
        <dependency>
            <groupId>com.starrocks</groupId>
            <artifactId>flink-connector-starrocks</artifactId>
            <version>1.2.9_flink-1.16</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.60</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
        </dependency>
    </dependencies>
</project>
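Note that flink-connector-starrocks (and the logging jars) are declared with provided scope, so a plain mvn package will not bundle them. To upload a self-contained jar to the Flink Dashboard (section 3), you would either drop the provided scope or bundle dependencies with a shade step. A minimal maven-shade-plugin sketch (the plugin version and the exclude list are assumptions, not part of the original pom):

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.4.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <artifactSet>
                            <excludes>
                                <!-- already provided by the Flink cluster classpath -->
                                <exclude>org.apache.flink:flink-clients</exclude>
                            </excludes>
                        </artifactSet>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>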
  • Java code (the job entry point):
package com.wys.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.wys.flink.bean.DataCenterShine;
import com.wys.flink.util.DataStreamUtil;
import com.wys.flink.util.SourceAndSinkInfo;

public class DataStreamMySQLToStarRocks {

    public static void main(String[] args) throws Exception {
        // stream execution environment
        Configuration conf = new Configuration();
        // local port the web UI binds to
        conf.setString(RestOptions.BIND_PORT, "8081");
        // apply the configuration
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.enableCheckpointing(180000L, CheckpointingMode.EXACTLY_ONCE);
        // host, port and credentials for source and sink
        SourceAndSinkInfo info = SourceAndSinkInfo.builder()
                .sourceIp("ip").sourcePort(3306).sourceUserName("root").sourcePassword("****")
                .sinkIp("ip").sinkPort(9030).sinkUserName("root").sinkPassword("")
                .build();
        // register source and sink for the table mapped by the DataCenterShine entity
        DataStreamUtil.setStarRocksSourceAndSink(env, info, DataCenterShine.class);
        // multiple tables can be synced in one job:
        // DataStreamUtil.setStarRocksSourceAndSink(env, info, Organization.class);
        // job name
        env.execute("data_center_shine_job");
    }
}
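Checkpointing above runs every 3 minutes (180000 ms) with exactly-once semantics. For anything beyond a local test, you would normally also point the job at durable checkpoint storage so it can recover after a restart. A minimal sketch, to be added right after enableCheckpointing in the main method above (the path is an assumption):

        // durable checkpoint storage so the job can restore after a restart
        // (any file:// or hdfs:// URI; this path is only an example)
        env.getCheckpointConfig().setCheckpointStorage("file:///data/flink/checkpoints");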
  • SourceAndSinkInfo class, which holds the host, port, username and password for the source and the sink
package com.wys.flink.util;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class SourceAndSinkInfo {

    /** source host */
    private String sourceIp;

    /** source port */
    private int sourcePort;

    /** source username */
    private String sourceUserName;

    /** source password */
    private String sourcePassword;

    /** sink host */
    private String sinkIp;

    /** sink port */
    private int sinkPort;

    /** sink username */
    private String sinkUserName;

    /** sink password */
    private String sinkPassword;
}
  • DataCenterShine entity class; its fields map one-to-one to the database columns.
package com.wys.flink.bean;

import com.wys.flink.annotation.FieldInfo;
import com.wys.flink.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;

import java.io.Serializable;

/**
 * <p>
 * Business type mapping table
 * </p>
 *
 * @author wys
 * @since 2023-05-23 11:16:24
 */
@Data
@TableName("wsmg.data_center_shine")
@EqualsAndHashCode(callSuper = false)
public class DataCenterShine extends StarRocksPrimary implements Serializable {

    private static final long serialVersionUID = 1L;

    /** primary key */
    @FieldInfo(order = 1, isPrimaryKey = true, notNull = true)
    private Integer id;

    /** mapper name */
    @FieldInfo(order = 2)
    private String busName;

    /** mapper class name */
    @FieldInfo(order = 3)
    private String mapperClassName;

    /** entity class name */
    @FieldInfo(order = 4)
    private String entityClassName;
}
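The job classes reference a second entity, Organization, in a comment and in the --entity example further down, but the original post does not show it. A minimal sketch of what such an entity would look like (the table and column names here are assumptions for illustration; StarRocksPrimary is shown next):

package com.wys.flink.bean;

import com.wys.flink.annotation.FieldInfo;
import com.wys.flink.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;

import java.io.Serializable;

@Data
@TableName("wsmg.organization") // hypothetical table name
@EqualsAndHashCode(callSuper = false)
public class Organization extends StarRocksPrimary implements Serializable {

    private static final long serialVersionUID = 1L;

    /** primary key */
    @FieldInfo(order = 1, isPrimaryKey = true, notNull = true)
    private Integer id;

    /** organization name (hypothetical column) */
    @FieldInfo(order = 2)
    private String orgName;
}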
  • StarRocksPrimary base class
package com.wys.flink.bean;

import org.apache.flink.types.RowKind;

import lombok.Data;

@Data
public class StarRocksPrimary {

    /** change type of the row for StarRocks: insert, update or delete */
    private RowKind rowKind;
}
  • FieldInfo annotation, which records a field's order, whether it is the primary key, and whether it is NOT NULL; it is used later when generating the TableSchema.
package com.wys.flink.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD})
public @interface FieldInfo {

    /**
     * Field order: the position of the field in the insert.
     * @return
     */
    int order();

    /**
     * Whether the field is the primary key: needed for the StarRocks primary key model
     * @methodName isPrimaryKey
     * @return boolean
     * @author wys
     * @date 2023-12-12
     */
    boolean isPrimaryKey() default false;

    /**
     * NOT NULL: whether the field may be null
     * @methodName notNull
     * @return boolean
     * @author wys
     * @date 2023-12-12
     */
    boolean notNull() default false;
}
  • TableName annotation, which records the database and table an entity class maps to
package com.wys.flink.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface TableName {

    /**
     * Table name in the form database.table, e.g. sys.user
     * @return
     */
    String value();
}
  • DataStreamUtil utility class for wiring up sources and sinks. It currently covers MySQL-to-MySQL and MySQL-to-StarRocks sync.
package com.wys.flink.util;

import java.util.function.Supplier;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.wys.flink.annotation.TableName;
import com.wys.flink.sink.MysqlAndStarRocksSink;

public class DataStreamUtil {

    /**
     * Source and sink setup for MySQL-to-MySQL sync
     * @methodName setMySQLSourceAndSink
     * @param env
     * @param info
     * @param cls void
     * @author wys
     * @date 2023-12-12
     */
    /*
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public static <T> void setMySQLSourceAndSink(StreamExecutionEnvironment env, SourceAndSinkInfo info, Class<T> cls) {
        setSourceAndSink(env, info, cls, () -> new MysqlAndStarRocksSink(cls, info.getSinkIp(), info.getSinkPort()));
    }
    */

    /**
     * Source and sink setup for MySQL-to-StarRocks sync
     * @methodName setStarRocksSourceAndSink
     * @param env
     * @param info
     * @param cls void
     * @author wys
     * @date 2023-12-12
     */
    public static <T> void setStarRocksSourceAndSink(StreamExecutionEnvironment env, SourceAndSinkInfo info, Class<T> cls) {
        setSourceAndSink(env, info, cls, () -> StarRocksSinkUtil.getStarRocksSink(cls, info));
    }

    /**
     * Shared source and sink setup
     * @methodName setSourceAndSink
     * @param env
     * @param info
     * @param cls
     * @param sink void
     * @author wys
     * @date 2023-12-12
     */
    private static <T> void setSourceAndSink(StreamExecutionEnvironment env, SourceAndSinkInfo info, Class<T> cls, Supplier<SinkFunction<T>> sink) {
        if (cls.isAnnotationPresent(TableName.class)) {
            String table = cls.getAnnotation(TableName.class).value();
            String[] tableArr = table.split("\\.");
            // source
            MySqlSource<T> mySQLSource = MySqlSource.<T>builder()
                    .hostname(info.getSourceIp())
                    .port(info.getSourcePort())
                    .databaseList(tableArr[0]) // database to capture
                    .tableList(table)          // table to capture; to sync the entire database, set tableList to ".*"
                    .username(info.getSourceUserName())
                    .password(info.getSourcePassword())
                    .deserializer(new CustomDebeziumDeserializationSchema<>(cls))
                    .build();
            // add the source to the execution environment
            DataStreamSource<T> source = env.fromSource(mySQLSource, WatermarkStrategy.noWatermarks(), tableArr[1] + "_source");
            // sink
            source.addSink(sink.get()).name(tableArr[1] + "_sink");
        }
    }
}
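By default, MySqlSource takes an initial snapshot of each captured table and then tails the binlog. If only incremental changes are wanted, the flink-cdc 2.3 builder also accepts startup options. A sketch, separate from the utility above (it uses the string-typed JsonDebeziumDeserializationSchema that ships with flink-cdc, and the connection values are placeholders):

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

// start from the latest binlog offset, skipping the initial snapshot
MySqlSource<String> incrementalSource = MySqlSource.<String>builder()
        .hostname("ip")
        .port(3306)
        .databaseList("wsmg")
        .tableList("wsmg.data_center_shine")
        .username("root")
        .password("****")
        .startupOptions(StartupOptions.latest())
        .deserializer(new JsonDebeziumDeserializationSchema())
        .build();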
  • StarRocksSinkUtil helper class, which builds the StarRocksSink
package com.wys.flink.util;

import java.lang.reflect.Field;
import java.math.BigDecimal;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.TableSchema.Builder;
import org.apache.flink.table.types.DataType;

import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.row.sink.StarRocksSinkRowBuilder;
import com.starrocks.connector.flink.table.sink.StarRocksSinkOptions;
import com.wys.flink.annotation.FieldInfo;
import com.wys.flink.annotation.TableName;
import com.wys.flink.bean.StarRocksPrimary;

/**
 * StarRocksSink helper
 * @className StarRocksSinkUtil
 * @author wys
 * @date 2023-12-12
 */
public class StarRocksSinkUtil {

    private static final Pattern TPATTERN = Pattern.compile("[A-Z0-9]");

    /**
     * Build a StarRocksSink for the given entity class
     * @methodName getStarRocksSink
     * @param cls
     * @param info
     * @return SinkFunction<T>
     * @author wys
     * @date 2023-12-12
     */
    @SuppressWarnings("serial")
    public static <T> SinkFunction<T> getStarRocksSink(Class<T> cls, SourceAndSinkInfo info) {
        Map<Integer, String> fieldMap = getFieldMap(cls);
        return StarRocksSink.sink(
                getTableSchema(cls),
                getStarRocksSinkOptions(info, cls),
                new StarRocksSinkRowBuilder<T>() {
                    @Override
                    public void accept(Object[] objects, T beanDataJava) {
                        try {
                            // fill the row array via reflection, using the @FieldInfo order
                            for (Entry<Integer, String> entry : fieldMap.entrySet()) {
                                Field field = cls.getDeclaredField(entry.getValue());
                                field.setAccessible(true);
                                Object obj = field.get(beanDataJava);
                                objects[entry.getKey() - 1] = obj;
                            }
                            // the last slot carries the change type of the row
                            if (beanDataJava instanceof StarRocksPrimary) {
                                objects[objects.length - 1] = ((StarRocksPrimary) beanDataJava).getRowKind().ordinal();
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
    }

    /**
     * Map each @FieldInfo order to its field name
     * @methodName initFieldMap void
     * @author wys
     * @date 2023-12-11
     */
    private static <T> Map<Integer, String> getFieldMap(Class<T> cls) {
        Map<Integer, String> fieldMap = new HashMap<>();
        Field[] fields = cls.getDeclaredFields();
        for (Field field : fields) {
            if (field.isAnnotationPresent(FieldInfo.class)) {
                fieldMap.put(field.getAnnotation(FieldInfo.class).order(), field.getName());
            }
        }
        return fieldMap;
    }

    /**
     * Build the TableSchema from the entity class
     * @methodName getTableSchema
     * @param cls
     * @return TableSchema
     * @author wys
     * @date 2023-12-12
     */
    @SuppressWarnings("deprecation")
    private static <T> TableSchema getTableSchema(Class<T> cls) {
        Builder builder = TableSchema.builder();
        Field[] fields = cls.getDeclaredFields();
        // build the TableSchema via reflection
        for (Field field : fields) {
            if (!field.isAnnotationPresent(FieldInfo.class)) {
                continue;
            }
            FieldInfo fi = field.getAnnotation(FieldInfo.class);
            if (fi.isPrimaryKey()) {
                builder.primaryKey(field.getName());
            }
            DataType dataType = getDataType(field.getType());
            if (fi.notNull()) {
                dataType = dataType.notNull();
            }
            builder.field(humpToUnderlined(field.getName()), dataType);
        }
        return builder.build();
    }

    /**
     * Build the StarRocksSinkOptions
     * @methodName getStarRocksSinkOptions
     * @param info
     * @param cls
     * @return StarRocksSinkOptions
     * @author wys
     * @date 2023-12-12
     */
    private static <T> StarRocksSinkOptions getStarRocksSinkOptions(SourceAndSinkInfo info, Class<T> cls) {
        String table = cls.getAnnotation(TableName.class).value();
        String[] tableArr = table.split("\\.");
        return StarRocksSinkOptions.builder()
                .withProperty("jdbc-url", String.format("jdbc:mysql://%s:%s/%s", info.getSinkIp(), info.getSinkPort(), tableArr[0]))
                .withProperty("load-url", info.getSinkIp() + ":8030")
                .withProperty("username", info.getSinkUserName())
                .withProperty("password", info.getSinkPassword())
                .withProperty("table-name", tableArr[1])
                .withProperty("database-name", tableArr[0])
                .withProperty("sink.properties.row_delimiter", "\\x02")
                .withProperty("sink.properties.column_separator", "\\x01")
                .withProperty("sink.buffer-flush.interval-ms", "5000")
                .build();
    }

    /**
     * camelCase to snake_case
     * @methodName humpToUnderlined
     * @param str
     * @return String
     * @author wys
     * @date 2023-12-12
     */
    private static String humpToUnderlined(String str) {
        Matcher matcher = TPATTERN.matcher(str);
        StringBuffer sb = new StringBuffer();
        while (matcher.find()) {
            matcher.appendReplacement(sb, "_" + matcher.group(0).toLowerCase());
        }
        matcher.appendTail(sb);
        return sb.toString();
    }

    /**
     * Map a Java field type to a Flink DataType
     * @methodName getDataType
     * @param cls
     * @return DataType
     * @author wys
     * @date 2023-12-12
     */
    private static DataType getDataType(Class<?> cls) {
        if (cls.equals(Integer.class)) {
            return DataTypes.INT();
        } else if (cls.equals(String.class)) {
            return DataTypes.STRING();
        } else if (cls.equals(Date.class)) {
            return DataTypes.TIMESTAMP();
        } else if (cls.equals(BigDecimal.class)) {
            return DataTypes.DECIMAL(8, 2);
        }
        throw new RuntimeException("No matching data type found for the field type");
    }
}
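A quick sanity check of the camelCase-to-snake_case conversion used for column names (the regex logic is inlined here because humpToUnderlined is private):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HumpToUnderlinedDemo {

    private static final Pattern TPATTERN = Pattern.compile("[A-Z0-9]");

    static String humpToUnderlined(String str) {
        Matcher matcher = TPATTERN.matcher(str);
        StringBuffer sb = new StringBuffer();
        while (matcher.find()) {
            matcher.appendReplacement(sb, "_" + matcher.group(0).toLowerCase());
        }
        matcher.appendTail(sb);
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(humpToUnderlined("busName"));         // bus_name
        System.out.println(humpToUnderlined("mapperClassName")); // mapper_class_name
        System.out.println(humpToUnderlined("field1"));          // field_1 (digits match the pattern too)
    }
}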
  • CustomDebeziumDeserializationSchema, a custom deserialization schema
package com.wys.flink.util;

import java.util.List;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;

/**
 * Custom deserialization schema
 * @className CustomDebeziumDeserializationSchema
 * @author wys
 * @date 2023-12-12
 */
public class CustomDebeziumDeserializationSchema<T> implements DebeziumDeserializationSchema<T> {

    private static final long serialVersionUID = 1L;

    private Class<T> cls;

    public CustomDebeziumDeserializationSchema(Class<T> cls) {
        this.cls = cls;
    }

    /**
     * Only "after" present: insert. Only "before" present: delete. Both present: update.
     * @methodName deserialize
     * @param sourceRecord
     * @param collector void
     * @author wys
     * @date 2023-12-12
     */
    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<T> collector) {
        JSONObject resJson = new JSONObject();
        try {
            Struct valueStruct = (Struct) sourceRecord.value();
            Struct afterStruct = valueStruct.getStruct("after");
            Struct beforeStruct = valueStruct.getStruct("before");
            // update
            if (null != beforeStruct && null != afterStruct) {
                setDataContent(afterStruct, resJson);
                resJson.put("rowKind", RowKind.UPDATE_AFTER);
            }
            // insert
            else if (null != afterStruct) {
                setDataContent(afterStruct, resJson);
                resJson.put("rowKind", RowKind.INSERT);
            }
            // delete
            else if (null != beforeStruct) {
                setDataContent(beforeStruct, resJson);
                resJson.put("rowKind", RowKind.UPDATE_BEFORE);
            }
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("Deserialization failed");
        }
        T t = resJson.toJavaObject(cls);
        collector.collect(t);
    }

    /**
     * Copy every field of the struct into the JSON object
     * @methodName setDataContent
     * @param struct
     * @param resJson void
     * @author wys
     * @date 2023-12-12
     */
    private void setDataContent(Struct struct, JSONObject resJson) {
        List<Field> fields = struct.schema().fields();
        for (Field field : fields) {
            String name = field.name();
            Object value = struct.get(name);
            resJson.put(name, value);
        }
    }

    @Override
    public TypeInformation<T> getProducedType() {
        // TypeInformation.of also covers POJO entity types
        // (BasicTypeInfo only handles boxed primitives and String)
        return TypeInformation.of(cls);
    }
}
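One detail worth calling out: delete events are tagged RowKind.UPDATE_BEFORE rather than RowKind.DELETE. This looks deliberate: the sink above writes rowKind.ordinal() into the row's last slot, in StarRocks' __op convention 0 means upsert and 1 means delete, and UPDATE_BEFORE is exactly the RowKind whose ordinal is 1. A small check of the ordinals:

import org.apache.flink.types.RowKind;

public class RowKindOrdinalDemo {
    public static void main(String[] args) {
        for (RowKind kind : RowKind.values()) {
            System.out.println(kind + " -> " + kind.ordinal());
        }
        // INSERT -> 0, UPDATE_BEFORE -> 1, UPDATE_AFTER -> 2, DELETE -> 3
    }
}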

3. Custom MySQL-to-StarRocks Sync Job

3.1 Feature Description

  • Upload the jar through the Apache Flink Dashboard and pass in the tables to sync; the job is then generated automatically.

3.2 Code Implementation

package com.wys.flink;

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.wys.flink.annotation.TableName;
import com.wys.flink.util.DataStreamUtil;
import com.wys.flink.util.SourceAndSinkInfo;

/**
 * Custom job: --entity DataCenterShine,Organization
 * @className CustomStreamCDC
 * @author wys
 * @date 2023-12-11
 */
public class StarRocksCustomStreamCDC {

    public static void main(String[] args) throws Exception {
        List<Class<?>> clsList = new ArrayList<>();
        StringBuilder jobName = new StringBuilder();
        ParameterTool parameters = ParameterTool.fromArgs(args);
        String entitys = parameters.get("entity", null);
        if (null == entitys) {
            throw new RuntimeException("Pass the entity class names of the tables to sync in Program Arguments, e.g.: --entity User,Role...");
        }
        // the argument is a comma-separated list of entity class names
        String[] entityArr = entitys.split(",");
        for (String className : entityArr) {
            Class<?> cls = getBeanClass(String.format("com.wys.flink.bean.%s", className));
            clsList.add(cls);
            jobName.append(cls.getSimpleName()).append("_");
        }
        jobName.append("job");
        // stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(180000L, CheckpointingMode.EXACTLY_ONCE);
        SourceAndSinkInfo ssi = SourceAndSinkInfo.builder()
                .sourceIp("ip").sourcePort(3306).sourceUserName("root").sourcePassword("****")
                .sinkIp("ip").sinkPort(9030).sinkUserName("root").sinkPassword("****")
                .build();
        // register source and sink for each entity
        clsList.forEach(item -> DataStreamUtil.setStarRocksSourceAndSink(env, ssi, item));
        env.execute(jobName.toString().toLowerCase());
    }

    /**
     * Load the Class for a name
     * @methodName getBeanClass
     * @param className fully qualified class name
     * @return Class<?>
     * @author wys
     * @date 2023-05-18
     */
    private static Class<?> getBeanClass(String className) {
        try {
            Class<?> cls = Class.forName(className);
            if (!cls.isAnnotationPresent(TableName.class)) {
                throw new RuntimeException("The entity class to sync is missing @TableName");
            }
            return cls;
        } catch (ClassNotFoundException e) {
            // loading the class failed
            throw new RuntimeException(String.format("Entity class [%s] not found", className));
        }
    }
}
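A quick check of how ParameterTool parses the Program Arguments (sample values only):

import org.apache.flink.api.java.utils.ParameterTool;

public class ArgsDemo {
    public static void main(String[] args) {
        ParameterTool p = ParameterTool.fromArgs(
                new String[]{"--entity", "DataCenterShine,Organization"});
        System.out.println(p.get("entity")); // DataCenterShine,Organization
    }
}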

3.3 Running the Job from the Apache Flink Dashboard

  • In the Submit New Job menu of the Apache Flink Dashboard, upload the packaged jar, enter the main class to run, and pass the entity classes of the tables to sync (comma-separated for multiple tables).
  • Click Submit to create the job.
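  • The job can also be submitted from the command line instead of the UI, e.g.: flink run -c com.wys.flink.StarRocksCustomStreamCDC flink-1.0.0.jar --entity DataCenterShine,Organization (the jar name here assumes the artifact built from the pom above).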

Reposted from: https://blog.csdn.net/qq_34052481/article/details/134924228
Copyright belongs to the original author, 舒适边缘. If there is any infringement, please contact us and it will be removed.
