//自定义时间转换配置
properties.setProperty("converters","dateConverters");
properties.setProperty("dateConverters.type","com.ysservice.utils.MySqlDateTimeConverter");//构建mysqlSourceMySqlSource mysqlCdcSource =MySqlSource.<String>builder().hostname(FlinkConfig.source_hostname).port(3306).databaseList(databaseNameArray)// set captured database.tableList(inputTableArray)// set captured table.username(FlinkConfig.source_username).password(FlinkConfig.source_password).serverId(FlinkConfig.source_serverId).serverTimeZone("Asia/Shanghai").debeziumProperties(properties).deserializer(newDorisJsonDebeziumDeserializationSchema(inputOutputTableName,tableConfigBean))// 传入gp的表名映射.build();
packagecom.ysservice.utils;importio.debezium.spi.converter.CustomConverter;importio.debezium.spi.converter.RelationalColumn;importorg.apache.kafka.connect.data.SchemaBuilder;importjava.time.*;importjava.time.format.DateTimeFormatter;importjava.util.Properties;/**
* @Description:实现CustomConverter接口,重写对应方法对mysql的时间类型进行标准转换
* @author: WuBo
* @date:2022/10/11 11:50
*/publicclassMySqlDateTimeConverterimplementsCustomConverter<SchemaBuilder,RelationalColumn>{privateDateTimeFormatter dateFormatter =DateTimeFormatter.ISO_DATE;privateDateTimeFormatter timeFormatter =DateTimeFormatter.ISO_TIME;privateDateTimeFormatter datetimeFormatter =DateTimeFormatter.ISO_DATE_TIME;privateDateTimeFormatter timestampFormatter =DateTimeFormatter.ISO_DATE_TIME;privateZoneId timestampZoneId =ZoneId.systemDefault();@Overridepublicvoidconfigure(Properties props){}@OverridepublicvoidconverterFor(RelationalColumn column,ConverterRegistration<SchemaBuilder> registration){String sqlType = column.typeName().toUpperCase();SchemaBuilder schemaBuilder =null;Converter converter =null;if("DATE".equals(sqlType)){
schemaBuilder =SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string");
converter =this::convertDate;}if("TIME".equals(sqlType)){
schemaBuilder =SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string");
converter =this::convertTime;}if("DATETIME".equals(sqlType)){
schemaBuilder =SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string");
converter =this::convertDateTime;}if("TIMESTAMP".equals(sqlType)){
schemaBuilder =SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string");
converter =this::convertTimestamp;}if(schemaBuilder !=null){
registration.register(schemaBuilder, converter);}}privateStringconvertDate(Object input){if(input ==null)returnnull;if(input instanceofLocalDate){return dateFormatter.format((LocalDate) input);}if(input instanceofInteger){LocalDate date =LocalDate.ofEpochDay((Integer) input);return dateFormatter.format(date);}returnString.valueOf(input);}privateStringconvertTime(Object input){if(input ==null)returnnull;if(input instanceofDuration){Duration duration =(Duration) input;long seconds = duration.getSeconds();int nano = duration.getNano();LocalTime time =LocalTime.ofSecondOfDay(seconds).withNano(nano);return timeFormatter.format(time);}returnString.valueOf(input);}privateStringconvertDateTime(Object input){if(input ==null)returnnull;if(input instanceofLocalDateTime){return datetimeFormatter.format((LocalDateTime) input).replaceAll("T"," ");}returnString.valueOf(input);}privateStringconvertTimestamp(Object input){if(input ==null)returnnull;if(input instanceofZonedDateTime){// mysql的timestamp会转成UTC存储,这里的zonedDatetime都是UTC时间ZonedDateTime zonedDateTime =(ZonedDateTime) input;LocalDateTime localDateTime = zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();return timestampFormatter.format(localDateTime).replaceAll("T"," ");}returnString.valueOf(input);}}
关键代码:
其中的:com.ysservice.utils.MySqlDateTimeConverter,根据自己的MySqlDateTimeConverter类路径进行修改
全量阶段和增量阶段的时间问题还不一样,实测本方式能全部解决,解决的同学记得回来点个赞!
本文转载自: https://blog.csdn.net/WuBoooo/article/details/127387144
版权归原作者 普罗米修斯之火 所有, 如有侵权,请联系我们删除。
版权归原作者 普罗米修斯之火 所有, 如有侵权,请联系我们删除。