Preface
Single-table sync with Flink CDC is fairly simple; following the official examples mostly just works. Heterogeneous multi-table sync and whole-database sync are something I had wanted to try for a long time. The community says it can be done with the API, but I was never able to scrounge up a working solution (code), so I tried building it myself. My Java is honestly pretty shaky, so it took quite a while and I hit plenty of problems along the way. At some point I stumbled on a sync solution a group member had posted ages ago, which unfortunately was missing the deserialization step. I borrowed it and patched it up (that alone took a few more weeks, I'm really not great at this) until it more or less runs. I'm sharing it here in the hope that it helps someone, even a little.
Approach
Let me lay out the overall idea first (it is the original author's idea; I just borrowed it). First, use MySqlCatalog to read each table's metadata (column names, column types, and so on) and create a matching sink table for each one. Flink CDC's DataStream source can capture an entire database, so we read the data that way and, in a custom deserializer, emit <tableName, Row> pairs, which gives us a DataStream<Tuple2<String, Row>>. We then split (filter) that stream by tableName, so that each table effectively gets its own DataStream, turn each per-table stream into a source table, and run insert into sinktable select * from sourcetable for each one. And then... game over.
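To make the splitting step concrete, here is a minimal sketch that assumes the <tableName, Row> changelog stream, the per-table RowTypeInfo map, and the jdbc_sink_* tables already exist; the names SplitByTableSketch, splitAndInsert, and rowTypeInfoPerTable are made up for illustration and do not appear in the full job below:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import java.util.Map;

public class SplitByTableSketch {

    /** Fan the single <tableName, Row> changelog stream out into one INSERT per table. */
    public static void splitAndInsert(StreamTableEnvironment tEnv,
                                      DataStream<Tuple2<String, Row>> cdcStream,
                                      Map<String, RowTypeInfo> rowTypeInfoPerTable) {
        StatementSet statementSet = tEnv.createStatementSet();
        for (Map.Entry<String, RowTypeInfo> entry : rowTypeInfoPerTable.entrySet()) {
            String tableName = entry.getKey();
            // keep only this table's records, then drop the table-name tag
            DataStream<Row> oneTable = cdcStream
                    .filter(t -> t.f0.equals(tableName))
                    .map(t -> t.f1, entry.getValue());
            // interpret the Row stream (RowKind is set by the custom deserializer) as a changelog
            Table table = tEnv.fromChangelogStream(oneTable);
            tEnv.createTemporaryView("t_" + tableName, table);
            // sink tables named jdbc_sink_<table> are assumed to have been created beforehand
            statementSet.addInsertSql(String.format(
                    "insert into jdbc_sink_%s select * from t_%s", tableName, tableName));
        }
        statementSet.execute();
    }
}

The full job below builds exactly these inputs: the RowTypeInfo map and the sink tables come from the MySqlCatalog metadata, and the RowKind on each Row is set by the custom deserializer.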
Let's go:
Flink version: 1.15.2 (versions before 1.15 apparently don't have MySqlCatalog yet; if you need an older version, the code will have to be adjusted)
Flink CDC version: 2.3.0
Enough rambling, straight to the code. The scenario is MySQL -> MySQL; if the sink is some other database it should in theory work the same way. The source tables must have primary keys, which is something Flink CDC requires under the hood, and it will throw an error if they are missing.
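One note on that primary-key requirement: the job below fetches the key with schema.getPrimaryKey().get(), so a table without a primary key fails with a bare NoSuchElementException. A tiny guard along these lines (purely illustrative, not part of the original code) would make the failure easier to diagnose:

import org.apache.flink.table.api.Schema;

import java.util.List;

public class PrimaryKeyCheck {

    /** Return the primary key columns, or fail fast with a readable message. */
    static List<String> requirePrimaryKey(String tableName, Schema schema) {
        return schema.getPrimaryKey()
                .orElseThrow(() -> new IllegalStateException(
                        "Table " + tableName + " has no primary key; Flink CDC requires one"))
                .getColumnNames();
    }
}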
package com.cityos;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
import org.apache.flink.connector.jdbc.catalog.MySqlCatalog;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.DefaultCatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Flink CDC whole-database synchronization.
 */
public class FlinkCdcMultiSyncJdbc {

    private static final Logger log = LoggerFactory.getLogger(FlinkCdcMultiSyncJdbc.class);

    public static void main(String[] args) throws Exception {

        // source connection info
        String userName = "root";
        String passWord = "18772247265Ldy@";
        String host = "localhost";
        String db = "flinktest1";
        // for whole-database sync, set tableList = ".*"
        String tableList = "lidy.nlp_category,lidy.nlp_classify_man_made3";
        int port = 33306;

        // sink connection info template
        String sink_url = "jdbc:mysql://localhost:33306/flinktest?useUnicode=true&characterEncoding=UTF-8&serverTimezone=Asia/Shanghai";
        String sink_username = "root";
        String sink_password = "18772247265Ldy@";

        String connectorWithBody =
                " with (\n" +
                " 'connector' = 'jdbc',\n" +
                " 'url' = '${sink_url}',\n" +
                " 'username' = '${sink_username}',\n" +
                " 'password' = '${sink_password}',\n" +
                " 'table-name' = '${tableName}'\n" +
                ")";
        connectorWithBody = connectorWithBody.replace("${sink_url}", sink_url)
                .replace("${sink_username}", sink_username)
                .replace("${sink_password}", sink_password);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // register a catalog for the source database
        MySqlCatalog mysqlCatalog = new MySqlCatalog("mysql-catalog", db, userName, passWord,
                String.format("jdbc:mysql://%s:%d", host, port));
        List<String> tables = new ArrayList<>();

        // for whole-database sync take all tables from the catalog, otherwise parse the configured table list
        if (".*".equals(tableList)) {
            tables = mysqlCatalog.listTables(db);
        } else {
            String[] tableArray = tableList.split(",");
            for (String table : tableArray) {
                tables.add(table.split("\\.")[1]);
            }
        }

        // maps from table name to the corresponding RowTypeInfo / DataTypes / RowType
        Map<String, RowTypeInfo> tableTypeInformationMap = Maps.newConcurrentMap();
        Map<String, DataType[]> tableDataTypesMap = Maps.newConcurrentMap();
        Map<String, RowType> tableRowTypeMap = Maps.newConcurrentMap();
        for (String table : tables) {
            // look up the table registered in the mysql catalog
            ObjectPath objectPath = new ObjectPath(db, table);
            DefaultCatalogTable catalogBaseTable = (DefaultCatalogTable) mysqlCatalog.getTable(objectPath);
            // table schema
            Schema schema = catalogBaseTable.getUnresolvedSchema();
            // field names
            String[] fieldNames = new String[schema.getColumns().size()];
            // field DataTypes and LogicalTypes
            DataType[] fieldDataTypes = new DataType[schema.getColumns().size()];
            LogicalType[] logicalTypes = new LogicalType[schema.getColumns().size()];
            // field TypeInformation
            TypeInformation<?>[] fieldTypes = new TypeInformation[schema.getColumns().size()];
            // primary key columns
            List<String> primaryKeys = schema.getPrimaryKey().get().getColumnNames();
            for (int i = 0; i < schema.getColumns().size(); i++) {
                Schema.UnresolvedPhysicalColumn column = (Schema.UnresolvedPhysicalColumn) schema.getColumns().get(i);
                fieldNames[i] = column.getName();
                fieldDataTypes[i] = (DataType) column.getDataType();
                fieldTypes[i] = InternalTypeInfo.of(((DataType) column.getDataType()).getLogicalType());
                logicalTypes[i] = ((DataType) column.getDataType()).getLogicalType();
            }
            RowType rowType = RowType.of(logicalTypes, fieldNames);
            tableRowTypeMap.put(table, rowType);

            // assemble the sink table DDL
            StringBuilder stmt = new StringBuilder();
            String tableName = table;
            String jdbcSinkTableName = String.format("jdbc_sink_%s", tableName);
            stmt.append("create table ").append(jdbcSinkTableName).append("(\n");
            for (int i = 0; i < fieldNames.length; i++) {
                String column = fieldNames[i];
                String fieldDataType = fieldDataTypes[i].toString();
                stmt.append("\t").append(column).append(" ").append(fieldDataType).append(",\n");
            }
            stmt.append(String.format("PRIMARY KEY (%s) NOT ENFORCED\n)", StringUtils.join(primaryKeys, ",")));
            String formatJdbcSinkWithBody = connectorWithBody.replace("${tableName}", jdbcSinkTableName);
            String createSinkTableDdl = stmt.toString() + formatJdbcSinkWithBody;

            // create the sink table
            log.info("createSinkTableDdl: {}", createSinkTableDdl);
            tEnv.executeSql(createSinkTableDdl);
            tableDataTypesMap.put(tableName, fieldDataTypes);
            tableTypeInformationMap.put(tableName, new RowTypeInfo(fieldTypes, fieldNames));
        }

        // read the mysql binlog
        MySqlSource<Tuple2<String, Row>> mySqlSource = MySqlSource.<Tuple2<String, Row>>builder()
                .hostname(host)
                .port(port)
                .databaseList(db)
                .tableList(tableList)
                .username(userName)
                .password(passWord)
                .deserializer(new CustomDebeziumDeserializer(tableRowTypeMap))
                .startupOptions(StartupOptions.initial())
                .build();
        SingleOutputStreamOperator<Tuple2<String, Row>> dataStreamSource =
                env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "mysql cdc").disableChaining();

        StatementSet statementSet = tEnv.createStatementSet();
        // convert each per-table stream to a Table, register a temporary view, and insert into the sink table
        for (Map.Entry<String, RowTypeInfo> entry : tableTypeInformationMap.entrySet()) {
            String tableName = entry.getKey();
            RowTypeInfo rowTypeInfo = entry.getValue();
            SingleOutputStreamOperator<Row> mapStream =
                    dataStreamSource.filter(data -> data.f0.equals(tableName)).map(data -> data.f1, rowTypeInfo);
            Table table = tEnv.fromChangelogStream(mapStream);
            String temporaryViewName = String.format("t_%s", tableName);
            tEnv.createTemporaryView(temporaryViewName, table);
            String sinkTableName = String.format("jdbc_sink_%s", tableName);
            String insertSql = String.format("insert into %s select * from %s", sinkTableName, temporaryViewName);
            log.info("add insertSql for {}, sql: {}", tableName, insertSql);
            statementSet.addInsertSql(insertSql);
        }
        statementSet.execute();
    }
}
The corresponding deserializer code:
package com.cityos;

import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.table.DeserializationRuntimeConverter;
import com.ververica.cdc.debezium.utils.TemporalConversions;
import io.debezium.data.Envelope;
import io.debezium.data.SpecialValueDecimal;
import io.debezium.data.VariableScaleDecimal;
import io.debezium.time.*;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.calcite.shaded.com.google.common.collect.Maps;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Map;

/**
 * Deserializes Debezium {@link SourceRecord}s into {@code Tuple2<tableName, Row>}, using one
 * runtime converter per table (built from that table's {@link RowType}) to convert the physical
 * column values.
 */
public class CustomDebeziumDeserializer implements DebeziumDeserializationSchema<Tuple2<String, Row>> {

    private final Map<String, RowType> tableRowTypeMap;
    private final Map<String, DeserializationRuntimeConverter> physicalConverterMap = Maps.newConcurrentMap();

    CustomDebeziumDeserializer(Map<String, RowType> tableRowTypeMap) {
        this.tableRowTypeMap = tableRowTypeMap;
        for (String tablename : this.tableRowTypeMap.keySet()) {
            RowType rowType = this.tableRowTypeMap.get(tablename);
            DeserializationRuntimeConverter physicalConverter = createNotNullConverter(rowType);
            this.physicalConverterMap.put(tablename, physicalConverter);
        }
    }

    @Override
    public void deserialize(SourceRecord record, Collector<Tuple2<String, Row>> out) throws Exception {
        Envelope.Operation op = Envelope.operationFor(record);
        Struct value = (Struct) record.value();
        Schema valueSchema = record.valueSchema();
        Struct source = value.getStruct("source");
        String tablename = source.get("table").toString();
        DeserializationRuntimeConverter physicalConverter = physicalConverterMap.get(tablename);
        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
            Row insert = extractAfterRow(value, valueSchema, physicalConverter);
            insert.setKind(RowKind.INSERT);
            out.collect(Tuple2.of(tablename, insert));
        } else if (op == Envelope.Operation.DELETE) {
            Row delete = extractBeforeRow(value, valueSchema, physicalConverter);
            delete.setKind(RowKind.DELETE);
            out.collect(Tuple2.of(tablename, delete));
        } else {
            // UPDATE: emit the before image and the after image as a pair
            Row before = extractBeforeRow(value, valueSchema, physicalConverter);
            before.setKind(RowKind.UPDATE_BEFORE);
            out.collect(Tuple2.of(tablename, before));
            Row after = extractAfterRow(value, valueSchema, physicalConverter);
            after.setKind(RowKind.UPDATE_AFTER);
            out.collect(Tuple2.of(tablename, after));
        }
    }

    private Row extractAfterRow(Struct value, Schema valueSchema, DeserializationRuntimeConverter physicalConverter) throws Exception {
        Schema afterSchema = valueSchema.field(Envelope.FieldName.AFTER).schema();
        Struct after = value.getStruct(Envelope.FieldName.AFTER);
        return (Row) physicalConverter.convert(after, afterSchema);
    }

    private Row extractBeforeRow(Struct value, Schema valueSchema, DeserializationRuntimeConverter physicalConverter) throws Exception {
        Schema beforeSchema = valueSchema.field(Envelope.FieldName.BEFORE).schema();
        Struct before = value.getStruct(Envelope.FieldName.BEFORE);
        return (Row) physicalConverter.convert(before, beforeSchema);
    }

    @Override
    public TypeInformation<Tuple2<String, Row>> getProducedType() {
        return TypeInformation.of(new TypeHint<Tuple2<String, Row>>() {});
    }

    public static DeserializationRuntimeConverter createNotNullConverter(LogicalType type) {
        switch (type.getTypeRoot()) {
            case NULL:
                return new DeserializationRuntimeConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object convert(Object dbzObj, Schema schema) {
                        return null;
                    }
                };
            case BOOLEAN:
                return convertToBoolean();
            case TINYINT:
                return new DeserializationRuntimeConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object convert(Object dbzObj, Schema schema) {
                        return Byte.parseByte(dbzObj.toString());
                    }
                };
            case SMALLINT:
                return new DeserializationRuntimeConverter() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Object convert(Object dbzObj, Schema schema) {
                        return Short.parseShort(dbzObj.toString());
                    }
                };
            case INTEGER:
            case INTERVAL_YEAR_MONTH:
                return convertToInt();
            case BIGINT:
            case INTERVAL_DAY_TIME:
                return convertToLong();
            case DATE:
                return convertToDate();
            case TIME_WITHOUT_TIME_ZONE:
                return convertToTime();
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                return convertToTimestamp(ZoneId.of("UTC"));
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                return convertToLocalTimeZoneTimestamp(ZoneId.of("UTC"));
            case FLOAT:
                return convertToFloat();
            case DOUBLE:
                return convertToDouble();
            case CHAR:
            case VARCHAR:
                return convertToString();
            case BINARY:
            case VARBINARY:
                return convertToBinary();
            case DECIMAL:
                return createDecimalConverter((DecimalType) type);
            case ROW:
                return createRowConverter((RowType) type);
            case ARRAY:
            case MAP:
            case MULTISET:
            case RAW:
            default:
                throw new UnsupportedOperationException("Unsupported type: " + type);
        }
    }

    private static DeserializationRuntimeConverter convertToBoolean() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Boolean) {
                    return dbzObj;
                } else if (dbzObj instanceof Byte) {
                    return (byte) dbzObj == 1;
                } else if (dbzObj instanceof Short) {
                    return (short) dbzObj == 1;
                } else {
                    return Boolean.parseBoolean(dbzObj.toString());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter convertToInt() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Integer) {
                    return dbzObj;
                } else if (dbzObj instanceof Long) {
                    return ((Long) dbzObj).intValue();
                } else {
                    return Integer.parseInt(dbzObj.toString());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter convertToLong() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Integer) {
                    return ((Integer) dbzObj).longValue();
                } else if (dbzObj instanceof Long) {
                    return dbzObj;
                } else {
                    return Long.parseLong(dbzObj.toString());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter createDecimalConverter(DecimalType decimalType) {
        final int precision = decimalType.getPrecision();
        final int scale = decimalType.getScale();
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                BigDecimal bigDecimal;
                if (dbzObj instanceof byte[]) {
                    // decimal.handling.mode=precise
                    bigDecimal = Decimal.toLogical(schema, (byte[]) dbzObj);
                } else if (dbzObj instanceof String) {
                    // decimal.handling.mode=string
                    bigDecimal = new BigDecimal((String) dbzObj);
                } else if (dbzObj instanceof Double) {
                    // decimal.handling.mode=double
                    bigDecimal = BigDecimal.valueOf((Double) dbzObj);
                } else {
                    if (VariableScaleDecimal.LOGICAL_NAME.equals(schema.name())) {
                        SpecialValueDecimal decimal = VariableScaleDecimal.toLogical((Struct) dbzObj);
                        bigDecimal = decimal.getDecimalValue().orElse(BigDecimal.ZERO);
                    } else {
                        // fallback to string
                        bigDecimal = new BigDecimal(dbzObj.toString());
                    }
                }
                return DecimalData.fromBigDecimal(bigDecimal, precision, scale);
            }
        };
    }

    private static DeserializationRuntimeConverter convertToDouble() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Float) {
                    return ((Float) dbzObj).doubleValue();
                } else if (dbzObj instanceof Double) {
                    return dbzObj;
                } else {
                    return Double.parseDouble(dbzObj.toString());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter convertToFloat() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Float) {
                    return dbzObj;
                } else if (dbzObj instanceof Double) {
                    return ((Double) dbzObj).floatValue();
                } else {
                    return Float.parseFloat(dbzObj.toString());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter convertToDate() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                return (int) TemporalConversions.toLocalDate(dbzObj).toEpochDay();
            }
        };
    }

    private static DeserializationRuntimeConverter convertToTime() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Long) {
                    switch (schema.name()) {
                        case MicroTime.SCHEMA_NAME:
                            return (int) ((long) dbzObj / 1000);
                        case NanoTime.SCHEMA_NAME:
                            return (int) ((long) dbzObj / 1000_000);
                    }
                } else if (dbzObj instanceof Integer) {
                    return dbzObj;
                }
                // get number of milliseconds of the day
                return TemporalConversions.toLocalTime(dbzObj).toSecondOfDay() * 1000;
            }
        };
    }

    private static DeserializationRuntimeConverter convertToTimestamp(ZoneId serverTimeZone) {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof Long) {
                    switch (schema.name()) {
                        case Timestamp.SCHEMA_NAME:
                            return TimestampData.fromEpochMillis((Long) dbzObj);
                        case MicroTimestamp.SCHEMA_NAME:
                            long micro = (long) dbzObj;
                            return TimestampData.fromEpochMillis(micro / 1000, (int) (micro % 1000 * 1000));
                        case NanoTimestamp.SCHEMA_NAME:
                            long nano = (long) dbzObj;
                            return TimestampData.fromEpochMillis(nano / 1000_000, (int) (nano % 1000_000));
                    }
                }
                LocalDateTime localDateTime = TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
                return TimestampData.fromLocalDateTime(localDateTime);
            }
        };
    }

    private static DeserializationRuntimeConverter convertToLocalTimeZoneTimestamp(ZoneId serverTimeZone) {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof String) {
                    String str = (String) dbzObj;
                    // TIMESTAMP_LTZ type is encoded in string type
                    Instant instant = Instant.parse(str);
                    return TimestampData.fromLocalDateTime(LocalDateTime.ofInstant(instant, serverTimeZone));
                }
                throw new IllegalArgumentException(
                        "Unable to convert to TimestampData from unexpected value '" + dbzObj
                                + "' of type " + dbzObj.getClass().getName());
            }
        };
    }

    private static DeserializationRuntimeConverter convertToString() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                return StringData.fromString(dbzObj.toString());
            }
        };
    }

    private static DeserializationRuntimeConverter convertToBinary() {
        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) {
                if (dbzObj instanceof byte[]) {
                    return dbzObj;
                } else if (dbzObj instanceof ByteBuffer) {
                    ByteBuffer byteBuffer = (ByteBuffer) dbzObj;
                    byte[] bytes = new byte[byteBuffer.remaining()];
                    byteBuffer.get(bytes);
                    return bytes;
                } else {
                    throw new UnsupportedOperationException(
                            "Unsupported BYTES value type: " + dbzObj.getClass().getSimpleName());
                }
            }
        };
    }

    private static DeserializationRuntimeConverter createRowConverter(RowType rowType) {
        final DeserializationRuntimeConverter[] fieldConverters =
                rowType.getFields().stream()
                        .map(RowType.RowField::getType)
                        .map(logicType -> createNotNullConverter(logicType))
                        .toArray(DeserializationRuntimeConverter[]::new);
        final String[] fieldNames = rowType.getFieldNames().toArray(new String[0]);

        return new DeserializationRuntimeConverter() {
            private static final long serialVersionUID = 1L;

            @Override
            public Object convert(Object dbzObj, Schema schema) throws Exception {
                Struct struct = (Struct) dbzObj;
                int arity = fieldNames.length;
                Row row = new Row(arity);
                for (int i = 0; i < arity; i++) {
                    String fieldName = fieldNames[i];
                    Field field = schema.field(fieldName);
                    if (field == null) {
                        row.setField(i, null);
                    } else {
                        Object fieldValue = struct.getWithoutDefault(fieldName);
                        Schema fieldSchema = schema.field(fieldName).schema();
                        Object convertedField = convertField(fieldConverters[i], fieldValue, fieldSchema);
                        row.setField(i, convertedField);
                    }
                }
                return row;
            }
        };
    }

    private static Object convertField(
            DeserializationRuntimeConverter fieldConverter, Object fieldValue, Schema fieldSchema) throws Exception {
        if (fieldValue == null) {
            return null;
        } else {
            return fieldConverter.convert(fieldValue, fieldSchema);
        }
    }
}
And here is my pom.xml:
<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.cityos</groupId><artifactId>flink_1_15</artifactId><version>1.0-SNAPSHOT</version><properties><java.version>1.8</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><spring-boot.version>2.3.7.RELEASE</spring-boot.version><flink.version>1.15.2</flink.version><scala.binary.version>2.12</scala.binary.version><!--<scala.version>2.12.12</scala.version>--></properties><repositories><repository><id>scala-tools.org</id><name>Scala-ToolsMaven2Repository</name><url>http://scala-tools.org/repo-releases</url></repository><repository><id>spring</id><url>https://maven.aliyun.com/repository/spring</url></repository><repository><id>cloudera</id><url>https://repository.cloudera.com/artifactory/cloudera-repos/</url></repository></repositories><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner_${scala.binary.version}</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><!--<scope>provided</scope>--></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients</artifactId><version>${flink.version}</version></dependency><!-- flink-connector-kafka --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka</artifactId><version>${flink.version}</version></dependency><!-- flink-connector-jdbc --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>${flink.version}</version></dependency><!-- mysql-cdc--><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.3.0</version></dependency><!-- mysql--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.29</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 --><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.21</version><scope>compile</scope></dependency><!-- https://mvnrepository.com/artifact/log4j/log4j 
--><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.8.1</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding></configuration></plugin><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>2.3.7.RELEASE</version><configuration><mainClass>com.cityos.Flink1142Application</mainClass></configuration><executions><execution><id>repackage</id><goals><goal>repackage</goal></goals></execution></executions></plugin></plugins></build></project>
Take a look if you're interested; if you're not, or if it's beneath you, just scroll on by. Please don't flame me, the code really is ugly.