0


实测解决 flink cdc mysql 时间字段差8小时/差13小时问题

//自定义时间转换配置
        properties.setProperty("converters","dateConverters");
        properties.setProperty("dateConverters.type","com.ysservice.utils.MySqlDateTimeConverter");//构建mysqlSourceMySqlSource mysqlCdcSource =MySqlSource.<String>builder().hostname(FlinkConfig.source_hostname).port(3306).databaseList(databaseNameArray)// set captured database.tableList(inputTableArray)// set captured table.username(FlinkConfig.source_username).password(FlinkConfig.source_password).serverId(FlinkConfig.source_serverId).serverTimeZone("Asia/Shanghai").debeziumProperties(properties).deserializer(newDorisJsonDebeziumDeserializationSchema(inputOutputTableName,tableConfigBean))// 传入gp的表名映射.build();
package com.ysservice.utils;

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;

import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Locale;
import java.util.Properties;

/**
 * Implements {@link CustomConverter} so that MySQL DATE, TIME, DATETIME and TIMESTAMP
 * columns are emitted as optional string fields in a uniform textual format.
 *
 * <p>Columns of any other type are left untouched (no converter is registered for them).
 *
 * <p>Optional configuration: {@code format.timestamp.zone} — the zone id used when rendering
 * TIMESTAMP values. When absent, the JVM default zone is used. With the converter registered
 * under the name {@code dateConverters}, this is expected to be supplied as
 * {@code dateConverters.format.timestamp.zone} (per Debezium's custom-converter property
 * convention — confirm against the Debezium version in use).
 *
 * @author WuBo
 * @date 2022/10/11 11:50
 */
public class MySqlDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    /** Optional property naming the zone in which TIMESTAMP values are rendered. */
    private static final String TIMESTAMP_ZONE_PROPERTY = "format.timestamp.zone";

    private static final long SECONDS_PER_DAY = 24L * 60L * 60L;

    // DateTimeFormatter is immutable and thread-safe, so the formatters are shared constants.
    private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ISO_DATE;
    private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ISO_TIME;
    private static final DateTimeFormatter DATETIME_FORMATTER = DateTimeFormatter.ISO_DATE_TIME;
    private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ISO_DATE_TIME;

    /** Zone used to render TIMESTAMP values; defaults to the JVM zone unless configured. */
    private ZoneId timestampZoneId = ZoneId.systemDefault();

    /**
     * Applies the optional converter configuration.
     *
     * @param props converter properties; may be {@code null} or empty, in which case the
     *              defaults are kept
     * @throws DateTimeException if {@code format.timestamp.zone} is set to an invalid zone id
     */
    @Override
    public void configure(Properties props) {
        if (props == null) {
            return;
        }
        String zone = props.getProperty(TIMESTAMP_ZONE_PROPERTY);
        if (zone != null && !zone.trim().isEmpty()) {
            timestampZoneId = ZoneId.of(zone.trim());
        }
    }

    /**
     * Registers a string converter for the column when its type is DATE, TIME, DATETIME or
     * TIMESTAMP; does nothing for every other type.
     *
     * @param column       the column being inspected
     * @param registration callback used to register the schema and converter for the column
     */
    @Override
    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
        String typeName = column.typeName();
        if (typeName == null) {
            return;
        }
        // Locale.ROOT: the default-locale toUpperCase() maps 'i' to a dotted capital under
        // e.g. a Turkish locale, which would make "time"/"timestamp" never match.
        String sqlType = typeName.toUpperCase(Locale.ROOT);

        SchemaBuilder schemaBuilder;
        Converter converter;
        switch (sqlType) {
            case "DATE":
                schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string");
                converter = this::convertDate;
                break;
            case "TIME":
                schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string");
                converter = this::convertTime;
                break;
            case "DATETIME":
                schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string");
                converter = this::convertDateTime;
                break;
            case "TIMESTAMP":
                schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string");
                converter = this::convertTimestamp;
                break;
            default:
                // Not a temporal type handled here: leave the column to the default conversion.
                return;
        }
        registration.register(schemaBuilder, converter);
    }

    /**
     * Formats a DATE value as an ISO date string.
     *
     * @param input a {@link LocalDate}, an {@link Integer} epoch-day count, or any other object
     * @return the formatted date, {@code null} for {@code null} input, or
     *         {@code String.valueOf(input)} for unrecognised types
     */
    private String convertDate(Object input) {
        if (input == null) {
            return null;
        }
        if (input instanceof LocalDate) {
            return DATE_FORMATTER.format((LocalDate) input);
        }
        if (input instanceof Integer) {
            LocalDate date = LocalDate.ofEpochDay((Integer) input);
            return DATE_FORMATTER.format(date);
        }
        return String.valueOf(input);
    }

    /**
     * Formats a TIME value as a string.
     *
     * <p>A {@link Duration} within {@code [00:00:00, 24:00:00)} is formatted as an ISO time of
     * day. A negative duration, or one of 24 hours or more, cannot be represented as a
     * {@link LocalTime} and is rendered as {@code [-]HH:mm:ss[.fraction]} instead of throwing.
     *
     * @param input a {@link Duration} or any other object
     * @return the formatted time, {@code null} for {@code null} input, or
     *         {@code String.valueOf(input)} for unrecognised types
     */
    private String convertTime(Object input) {
        if (input == null) {
            return null;
        }
        if (input instanceof Duration) {
            Duration duration = (Duration) input;
            long seconds = duration.getSeconds();
            if (seconds >= 0 && seconds < SECONDS_PER_DAY) {
                LocalTime time = LocalTime.ofSecondOfDay(seconds).withNano(duration.getNano());
                return TIME_FORMATTER.format(time);
            }
            return formatOutOfDayTime(duration);
        }
        return String.valueOf(input);
    }

    /**
     * Renders a duration outside the time-of-day range as {@code [-]HH:mm:ss[.fraction]},
     * with trailing zeros of the fraction removed.
     */
    private static String formatOutOfDayTime(Duration duration) {
        Duration magnitude = duration.abs();
        long totalSeconds = magnitude.getSeconds();

        StringBuilder text = new StringBuilder();
        if (duration.isNegative()) {
            text.append('-');
        }
        text.append(String.format(Locale.ROOT, "%02d:%02d:%02d",
                totalSeconds / 3600, (totalSeconds % 3600) / 60, totalSeconds % 60));

        int nanos = magnitude.getNano();
        if (nanos != 0) {
            String fraction = String.format(Locale.ROOT, "%09d", nanos);
            int end = fraction.length();
            while (end > 0 && fraction.charAt(end - 1) == '0') {
                end--;
            }
            text.append('.').append(fraction, 0, end);
        }
        return text.toString();
    }

    /**
     * Formats a DATETIME value as an ISO local date-time with a space in place of the 'T'.
     *
     * @param input a {@link LocalDateTime} or any other object
     * @return the formatted date-time, {@code null} for {@code null} input, or
     *         {@code String.valueOf(input)} for unrecognised types
     */
    private String convertDateTime(Object input) {
        if (input == null) {
            return null;
        }
        if (input instanceof LocalDateTime) {
            return DATETIME_FORMATTER.format((LocalDateTime) input).replace('T', ' ');
        }
        return String.valueOf(input);
    }

    /**
     * Formats a TIMESTAMP value as a local date-time string in the configured zone, with a
     * space in place of the 'T'.
     *
     * @param input a {@link ZonedDateTime} or any other object
     * @return the formatted timestamp, {@code null} for {@code null} input, or
     *         {@code String.valueOf(input)} for unrecognised types
     */
    private String convertTimestamp(Object input) {
        if (input == null) {
            return null;
        }
        if (input instanceof ZonedDateTime) {
            // MySQL stores TIMESTAMP values in UTC, so the ZonedDateTime received here is in
            // UTC; shift it to the target zone before dropping the zone information.
            ZonedDateTime zonedDateTime = (ZonedDateTime) input;
            LocalDateTime localDateTime =
                    zonedDateTime.withZoneSameInstant(timestampZoneId).toLocalDateTime();
            return TIMESTAMP_FORMATTER.format(localDateTime).replace('T', ' ');
        }
        return String.valueOf(input);
    }
}

关键代码:

(原文此处为一张截图,转载时图片丢失;关键代码应为上文设置 converters 与 dateConverters.type 的两行配置。)
其中的 com.ysservice.utils.MySqlDateTimeConverter 需要替换为你自己项目中 MySqlDateTimeConverter 类的全限定类名(包名 + 类名)。

全量阶段和增量阶段的时间问题还不一样,实测本方式能全部解决,解决的同学记得回来点个赞!

标签: mysql flink java

本文转载自: https://blog.csdn.net/WuBoooo/article/details/127387144
版权归原作者 普罗米修斯之火 所有, 如有侵权,请联系我们删除。

“实测解决 flink cdc mysql 时间字段差8小时/差13小时问题”的评论:

还没有评论