

Flink CDC DataStream API timezone issues

When PostgreSQL is used as the source, reading DATE and TIMESTAMP columns through CDC exposes two problems:

  1. Date and time values come out as Int/Long instead of readable temporal values.
  2. After the sync, timestamps are 8 hours off from the source table, because of the time-zone difference — both effects are illustrated in the snippet below.
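
To make this concrete: Debezium ships a PostgreSQL DATE as an int32 counting days since 1970-01-01 (io.debezium.time.Date) and a microsecond-precision timestamp as an int64 of microseconds since the epoch (io.debezium.time.MicroTimestamp), with the wall-clock value interpreted as UTC. Below is a minimal, standalone sketch of that encoding; the sample values are taken from the Debezium mapping table quoted later in this post, and the class name is only for illustration.

import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;

public class DebeziumTemporalSample {
    public static void main(String[] args) {
        // io.debezium.time.Date: INT32 days since the epoch
        int epochDays = 18655;
        System.out.println(LocalDate.ofEpochDay(epochDays));   // 2021-01-28

        // io.debezium.time.MicroTimestamp: INT64 microseconds since the epoch, wall clock treated as UTC
        long micros = 1611854944000000L;                        // the value "2021-01-28 17:29:04" in the database
        LocalDateTime asUtc = LocalDateTime.ofInstant(Instant.ofEpochMilli(micros / 1000), ZoneOffset.UTC);
        System.out.println(asUtc);                              // 2021-01-28T17:29:04 -> the original wall-clock value
        LocalDateTime asUtc8 = LocalDateTime.ofInstant(Instant.ofEpochMilli(micros / 1000), ZoneOffset.ofHours(8));
        System.out.println(asUtc8);                             // 2021-01-29T01:29:04 -> 8 hours ahead when rendered in UTC+8
    }
}

This is why a sink that simply formats the long value in a UTC+8 JVM drifts by exactly eight hours.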

Source table: (screenshot in the original post)
Sink table: (screenshot in the original post)
Solution: handle the conversion inside a custom deserialization schema.
Java code:

package pg.cdc.ds;

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.text.SimpleDateFormat;
import java.time.ZoneId;
import java.util.Date;
import java.util.List;

public class CustomerDeserialization implements DebeziumDeserializationSchema<String> {

    ZoneId serverTimeZone;

    @Override
    public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
        // 1. JSON object that will hold the final output
        JSONObject result = new JSONObject();
        Struct value = (Struct) sourceRecord.value();

        // 2. Database, schema and table name
        Struct sourceStruct = value.getStruct("source");
        String database = sourceStruct.getString("db");
        String schema = sourceStruct.getString("schema");
        String tableName = sourceStruct.getString("table");

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        SimpleDateFormat sdf1 = new SimpleDateFormat("yyyy-MM-dd");

        // 3. "before" image
        Struct before = value.getStruct("before");
        JSONObject beforeJson = new JSONObject();
        if (before != null) {
            Schema beforeSchema = before.schema();
            List<Field> beforeFields = beforeSchema.fields();
            for (Field field : beforeFields) {
                beforeJson.put(field.name(), convertField(field, before.get(field), sdf, sdf1));
            }
        }

        // 4. "after" image
        Struct after = value.getStruct("after");
        JSONObject afterJson = new JSONObject();
        if (after != null) {
            Schema afterSchema = after.schema();
            List<Field> afterFields = afterSchema.fields();
            for (Field field : afterFields) {
                afterJson.put(field.name(), convertField(field, after.get(field), sdf, sdf1));
            }
        }

        // 5. Operation type: CREATE / UPDATE / DELETE (READ during the snapshot phase)
        Envelope.Operation operation = Envelope.operationFor(sourceRecord);
        String type = operation.toString().toLowerCase();
        if ("create".equals(type) || "read".equals(type)) {
            type = "insert";
        }

        // 6. Assemble the output JSON
        result.put("database", database);
        result.put("schema", schema);
        result.put("tableName", tableName);
        result.put("before", beforeJson);
        result.put("after", afterJson);
        result.put("type", type);

        // 7. Emit the record
        collector.collect(result.toJSONString());
    }

    /**
     * Converts Debezium's temporal encodings (epoch micros/nanos/millis, epoch days) into readable strings.
     * The hard-coded 8-hour shift assumes the job's JVM runs in UTC+8.
     */
    private Object convertField(Field field, Object value, SimpleDateFormat sdf, SimpleDateFormat sdf1) {
        if (value == null) {
            return null;
        }
        String typeName = field.schema().type().getName();
        String schemaName = field.schema().name();
        if ("int64".equals(typeName) && "io.debezium.time.MicroTimestamp".equals(schemaName)) {
            long millis = (long) value / 1000;                    // microseconds -> milliseconds
            return sdf.format(new Date(millis - 8 * 60 * 60 * 1000));
        } else if ("int64".equals(typeName) && "io.debezium.time.NanoTimestamp".equals(schemaName)) {
            long millis = (long) value / 1000 / 1000;             // nanoseconds -> milliseconds
            return sdf.format(new Date(millis - 8 * 60 * 60 * 1000));
        } else if ("int64".equals(typeName) && "io.debezium.time.Timestamp".equals(schemaName)) {
            long millis = (long) value;                           // already milliseconds
            return sdf.format(new Date(millis - 8 * 60 * 60 * 1000));
        } else if ("int32".equals(typeName) && "io.debezium.time.Date".equals(schemaName)) {
            int days = (int) value;                               // days since the epoch
            return sdf1.format(new Date(days * 24 * 60 * 60L * 1000));
        }
        return value;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }
}
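
For completeness, here is a hedged sketch of how this deserializer could be wired into a job, assuming the SourceFunction-based PostgreSQLSource builder from flink-connector-postgres-cdc; host, credentials, slot and table names are placeholders, so adjust them to your environment and connector version.

import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import pg.cdc.ds.CustomerDeserialization;

public class PgCdcJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Host, credentials, slot and table names below are placeholders.
        DebeziumSourceFunction<String> pgSource = PostgreSQLSource.<String>builder()
                .hostname("192.168.10.101")
                .port(5432)
                .database("postgres")
                .schemaList("public")
                .tableList("public.orders")
                .username("postgres")
                .password("postgres")
                .decodingPluginName("pgoutput")              // logical decoding plugin configured on the PG server
                .slotName("flink_cdc_slot")                  // replication slot name, placeholder
                .deserializer(new CustomerDeserialization()) // the deserializer defined above
                .build();

        env.addSource(pgSource).print();
        env.execute("pg-cdc-timezone-demo");
    }
}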

Scala code:

import com.ververica.cdc.debezium.DebeziumDeserializationSchema
import com.ververica.cdc.debezium.utils.TemporalConversions
import io.debezium.time._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.types.Row
import org.apache.flink.util.Collector
import org.apache.kafka.connect.data.{SchemaBuilder, Struct}
import org.apache.kafka.connect.source.SourceRecord

import java.sql
import java.time.{Instant, LocalDateTime, ZoneId}
import scala.collection.JavaConverters._
import scala.util.parsing.json.JSONObject

class StructDebeziumDeserializationSchema(serverTimeZone: String) extends DebeziumDeserializationSchema[Row] {

  override def deserialize(sourceRecord: SourceRecord, collector: Collector[Row]): Unit = {
    // Parse the primary key
    val key = sourceRecord.key().asInstanceOf[Struct]
    val keyJs = parseStruct(key)
    // Parse the record value
    val value = sourceRecord.value().asInstanceOf[Struct]
    val source = value.getStruct("source")
    val before = parseStruct(value.getStruct("before"))
    val after = parseStruct(value.getStruct("after"))

    val row = Row.withNames()
    row.setField("table", s"${source.get("db")}.${source.get("table")}")
    row.setField("key", keyJs)
    row.setField("op", value.get("op"))
    row.setField("op_ts", LocalDateTime.ofInstant(Instant.ofEpochMilli(source.getInt64("ts_ms")), ZoneId.of(serverTimeZone)))
    row.setField("current_ts", LocalDateTime.ofInstant(Instant.ofEpochMilli(value.getInt64("ts_ms")), ZoneId.of(serverTimeZone)))
    row.setField("before", before)
    row.setField("after", after)
    collector.collect(row)
  }

  /** Render a [[Struct]] as a JSON string. */
  private def parseStruct(struct: Struct): String = {
    if (struct == null) return null
    val map = struct.schema().fields().asScala.map { field =>
      val v = struct.get(field)
      val typ = field.schema().name()
      println(s"$v, $typ, ${field.name()}") // debug output
      val value = v match {
        case long if long.isInstanceOf[Long] => convertLongToTime(long.asInstanceOf[Long], typ)
        case iv if iv.isInstanceOf[Int] => convertIntToDate(iv.asInstanceOf[Int], typ)
        case iv if iv == null => null
        case _ => convertObjToTime(v, typ)
      }
      (field.name(), value)
    }.filter(_._2 != null).toMap
    JSONObject.apply(map).toString()
  }

  /** Convert Debezium temporal objects to strings. */
  private def convertObjToTime(obj: Any, typ: String): Any = {
    typ match {
      case Time.SCHEMA_NAME | MicroTime.SCHEMA_NAME | NanoTime.SCHEMA_NAME =>
        sql.Time.valueOf(TemporalConversions.toLocalTime(obj)).toString
      case Timestamp.SCHEMA_NAME | MicroTimestamp.SCHEMA_NAME | NanoTimestamp.SCHEMA_NAME | ZonedTimestamp.SCHEMA_NAME =>
        sql.Timestamp.valueOf(TemporalConversions.toLocalDateTime(obj, ZoneId.of(serverTimeZone))).toString
      case _ => obj
    }
  }

  /** Convert Long-encoded temporal values to time strings. */
  private def convertLongToTime(obj: Long, typ: String): Any = {
    val time_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Time")
    val date_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Date")
    val timestamp_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Timestamp")
    typ match {
      case Time.SCHEMA_NAME =>
        org.apache.kafka.connect.data.Time.toLogical(time_schema, obj.asInstanceOf[Int]).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalTime.toString
      case MicroTime.SCHEMA_NAME =>
        org.apache.kafka.connect.data.Time.toLogical(time_schema, (obj / 1000).asInstanceOf[Int]).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalTime.toString
      case NanoTime.SCHEMA_NAME =>
        org.apache.kafka.connect.data.Time.toLogical(time_schema, (obj / 1000 / 1000).asInstanceOf[Int]).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalTime.toString
      case Timestamp.SCHEMA_NAME =>
        val t = org.apache.kafka.connect.data.Timestamp.toLogical(timestamp_schema, obj).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDateTime
        java.sql.Timestamp.valueOf(t).toString
      case MicroTimestamp.SCHEMA_NAME =>
        val t = org.apache.kafka.connect.data.Timestamp.toLogical(timestamp_schema, obj / 1000).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDateTime
        java.sql.Timestamp.valueOf(t).toString
      case NanoTimestamp.SCHEMA_NAME =>
        val t = org.apache.kafka.connect.data.Timestamp.toLogical(timestamp_schema, obj / 1000 / 1000).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDateTime
        java.sql.Timestamp.valueOf(t).toString
      case Date.SCHEMA_NAME =>
        org.apache.kafka.connect.data.Date.toLogical(date_schema, obj.asInstanceOf[Int]).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDate.toString
      case _ => obj
    }
  }

  /** Convert Int-encoded epoch days to a date string. */
  private def convertIntToDate(obj: Int, typ: String): Any = {
    val date_schema = SchemaBuilder.int64().name("org.apache.kafka.connect.data.Date")
    typ match {
      case Date.SCHEMA_NAME =>
        org.apache.kafka.connect.data.Date.toLogical(date_schema, obj).toInstant.atZone(ZoneId.of(serverTimeZone)).toLocalDate.toString
      case _ => obj
    }
  }

  override def getProducedType: TypeInformation[Row] = TypeInformation.of(classOf[Row])
}

MySQL CDC timezone issues

MySQL CDC suffers from the same problem. Debezium converts MySQL datetime values into UTC epoch timestamps (io.debezium.time.Timestamp) using a hard-coded time zone that cannot be changed, so a database configured for UTC+8 ends up in Kafka as long timestamps that are eight hours ahead. MySQL timestamp values are converted to UTC strings.

There are two ways to solve this:

1: Do the time-zone conversion inside a custom deserialization schema (as shown above for PostgreSQL).
2: Write a custom temporal converter class and point Debezium at it through its connector properties.

The second approach is used here.

package com.zmn.schema;

import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.*;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.function.Consumer;

/**
 * Fixes Debezium's temporal conversions for MySQL.
 * By default Debezium converts MySQL datetime columns to UTC epoch timestamps ({@link io.debezium.time.Timestamp})
 * with a hard-coded time zone, so values stored under UTC+8 arrive in Kafka as long timestamps eight hours ahead;
 * MySQL timestamp columns are converted to UTC strings.
 *
 * | mysql                            | mysql-binlog-connector                | debezium                       |
 * | date (2021-01-28)                | LocalDate (2021-01-28)                | Integer (18655)                |
 * | time (17:29:04)                  | Duration (PT17H29M4S)                 | Long (62944000000)             |
 * | timestamp (2021-01-28 17:29:04)  | ZonedDateTime (2021-01-28T09:29:04Z)  | String (2021-01-28T09:29:04Z)  |
 * | datetime (2021-01-28 17:29:04)   | LocalDateTime (2021-01-28T17:29:04)   | Long (1611854944000)           |
 *
 * @see io.debezium.connector.mysql.converters.TinyIntOneToBooleanConverter
 */
public class MySqlDateTimeConverter implements CustomConverter<SchemaBuilder, RelationalColumn> {

    private final static Logger logger = LoggerFactory.getLogger(MySqlDateTimeConverter.class);

    private DateTimeFormatter dateFormatter = DateTimeFormatter.ISO_DATE;
    private DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;
    private DateTimeFormatter datetimeFormatter = DateTimeFormatter.ISO_DATE_TIME;
    private DateTimeFormatter timestampFormatter = DateTimeFormatter.ISO_DATE_TIME;
    private ZoneId timestampZoneId = ZoneId.systemDefault();

    @Override
    public void configure(Properties props) {
        readProps(props, "format.date", p -> dateFormatter = DateTimeFormatter.ofPattern(p));
        readProps(props, "format.time", p -> timeFormatter = DateTimeFormatter.ofPattern(p));
        readProps(props, "format.datetime", p -> datetimeFormatter = DateTimeFormatter.ofPattern(p));
        readProps(props, "format.timestamp", p -> timestampFormatter = DateTimeFormatter.ofPattern(p));
        readProps(props, "format.timestamp.zone", z -> timestampZoneId = ZoneId.of(z));
    }

    private void readProps(Properties properties, String settingKey, Consumer<String> callback) {
        String settingValue = (String) properties.get(settingKey);
        if (settingValue == null || settingValue.length() == 0) { return; }
        try {
            callback.accept(settingValue.trim());
        } catch (IllegalArgumentException | DateTimeException e) {
            logger.error("The {} setting is illegal: {}", settingKey, settingValue);
            throw e;
        }
    }

    @Override
    public void converterFor(RelationalColumn column, ConverterRegistration<SchemaBuilder> registration) {
        String sqlType = column.typeName().toUpperCase();
        SchemaBuilder schemaBuilder = null;
        Converter converter = null;
        if ("DATE".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.date.string");
            converter = this::convertDate;
        }
        if ("TIME".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.time.string");
            converter = this::convertTime;
        }
        if ("DATETIME".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.datetime.string");
            converter = this::convertDateTime;
        }
        if ("TIMESTAMP".equals(sqlType)) {
            schemaBuilder = SchemaBuilder.string().optional().name("com.darcytech.debezium.timestamp.string");
            converter = this::convertTimestamp;
        }
        if (schemaBuilder != null) {
            registration.register(schemaBuilder, converter);
        }
    }

    private String convertDate(Object input) {
        if (input instanceof LocalDate) { return dateFormatter.format((LocalDate) input); }
        if (input instanceof Integer) { return dateFormatter.format(LocalDate.ofEpochDay((Integer) input)); }
        return null;
    }

    private String convertTime(Object input) {
        if (input instanceof Duration) {
            Duration duration = (Duration) input;
            LocalTime time = LocalTime.ofSecondOfDay(duration.getSeconds()).withNano(duration.getNano());
            return timeFormatter.format(time);
        }
        return null;
    }

    private String convertDateTime(Object input) {
        if (input instanceof LocalDateTime) { return datetimeFormatter.format((LocalDateTime) input); }
        return null;
    }

    private String convertTimestamp(Object input) {
        if (input instanceof ZonedDateTime) {
            // MySQL stores timestamp in UTC, so the ZonedDateTime handed in here is in UTC
            LocalDateTime localDateTime = ((ZonedDateTime) input).withZoneSameInstant(timestampZoneId).toLocalDateTime();
            return timestampFormatter.format(localDateTime);
        }
        return null;
    }
}
Usage:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
properties.setProperty("snapshot.mode", "schema_only"); // incremental reading only, skip the initial snapshot
// register the custom temporal converter
properties.setProperty("converters", "dateConverters");
properties.setProperty("dateConverters.type", "com.zmn.schema.MySqlDateTimeConverter");
properties.setProperty("dateConverters.format.date", "yyyy-MM-dd");
properties.setProperty("dateConverters.format.time", "HH:mm:ss");
properties.setProperty("dateConverters.format.datetime", "yyyy-MM-dd HH:mm:ss");
properties.setProperty("dateConverters.format.timestamp", "yyyy-MM-dd HH:mm:ss");
properties.setProperty("dateConverters.format.timestamp.zone", "UTC+8");
properties.setProperty("snapshot.locking.mode", "none"); // the global read lock can affect online traffic, so skip it
properties.setProperty("include.schema.changes", "true");
// With flink mysql cdc and JsonDebeziumDeserializationSchema, bigint unsigned columns are captured
// as strings by default; the two options below keep them as plain numbers.
properties.setProperty("bigint.unsigned.handling.mode", "long");
properties.setProperty("decimal.handling.mode", "double");

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("192.168.10.102")
        .port(3306)
        .username("yusys")
        .password("yusys")
        .databaseList("gmall")
        .tableList("gmall.faker_user1")
        .deserializer(new JsonDebeziumDeserializationSchema())
        .debeziumProperties(properties)
        .serverId("5409")
        .build();

DataStreamSource<String> dataSource = env
        .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "binlog-source")
        .setParallelism(10);
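
Separately from the converter, newer versions of flink-connector-mysql-cdc also let you declare the database server's time zone directly on the source builder; this does not reformat datetime values, but it keeps TIMESTAMP conversion consistent with the server's time_zone setting. A hedged sketch reusing the connection parameters above (check that your connector version exposes this option):

MySqlSource<String> sourceWithZone = MySqlSource.<String>builder()
        .hostname("192.168.10.102")
        .port(3306)
        .username("yusys")
        .password("yusys")
        .databaseList("gmall")
        .tableList("gmall.faker_user1")
        .serverTimeZone("Asia/Shanghai")   // should match the MySQL server's time_zone
        .deserializer(new JsonDebeziumDeserializationSchema())
        .build();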

Reposted from: https://blog.csdn.net/hellowangxiansheng/article/details/130124580 (original author: 活在风浪里~).
