0


【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准

关联文章:
各种时间类型和timezone关系浅析

一、测试目的和值

1. 测试一般的数据库不含time zone的类型的时区。

  • mysql timestamp(3) 类型
  • postgres timestamp(3) 类型
  • sqlserver datetime2(3) 类型
  • oracle类型 TIMESTAMP(3) 类型 在以下测试之中均为ts字段

2.测试CDC中元数据

op_ts

时区

op_tsTIMESTAMP_LTZ(3) NOT NULL当前记录表在数据库中更新的时间。如果从表的快照而不是 binlog 读取记录,该值将始终为0。|

在以下测试中cdc表建表均使用

ts_ms TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL

表示。
cdc在读取表时候分两个阶段:

  1. 全量读取阶段,特点是jdbc读取,读取数据中op=r
  2. 增量读取阶段,特点是log读取,读取数据中op=c或u或d op在截图中看到如3="r" 或者 3="r",3是op字段的索引值。ts_ms在全量阶段读取数据以下成为READ数据``````ts_ms在增量阶段读取数据以下成为CREATE数据

3. flink 数据时间表示和时区

flink Table中时间必须使用

org.apache.flink.table.data.TimestampData

对象表示。

@PublicEvolvingpublicfinalclassTimestampDataimplementsComparable<TimestampData>{privatefinallong millisecond;privatefinalint nanoOfMillisecond;}

此类型使用如下两个值联合表示记录时间。并不记录时区数据。

实战测试:

@TestpublicvoidtestTimeZone(){// 常识:Epoch就是值utc的0时间点,是全局绝对时间点,本质是`ZoneOffset.of("+0")`下的0时间。与`January 1, 1970, 00:00:00 GMT`视为等同。  // GMT是前世界标准时,UTC是现世界标准时。UTC 比 GMT更精准,以原子时计时,适应现代社会的精确计时。  // 28800000=8*3600*1000。8小时毫秒值。  // 如下时间是+8时区的数据库存储的不带时区的时间:2023-09-28T09:43:20.320  long ts=1695894200320L;// 如果将ts当做utc时间0时刻转为字符串则会导致时间+8 hour。2023-09-28 17:43:20。这是一般常用的在线转换时间的结果。因其默认是是epoch时间,所以转换后会+8h。  // 可见数据库读取的不带timezone时间的毫秒值,并不是以utc0时间(epoch)为基准的,而是以当前时区0为基准的。  // LocalDateTime对象本质支持LocalDate和LocalTime两个对象,LocalDate持有Integer的`年`,`月`,`日`。LocalTime则持有Integer的`时`,`分`,`秒`等和java.util.Date类型并不一样。  // LocalDateTime 的带有ZoneOffset方法比较难理解,此处:  // epochSecond 当然值的是epoch的秒数,是绝对时间概念和`java.util.Date.getTime()/1000`对应的,而offset是指此epoch秒数需要偏移的时间量。  // 内部代码是`long localSecond = epochSecond + offset.getTotalSeconds();`。  // 如下代码是正确的,因为java中的`java.util.Date`类和`java.sql.Timestamp`类型都是持有绝对时间的类,`Date.getTime`获得也是相对于Epoch的毫秒值(Returns the number of milliseconds since January 1, 1970, 00:00:00 GMT)。  LocalDateTime ldtFromDate =LocalDateTime.ofEpochSecond(newDate().getTime()/1000,0,ZoneOffset.of("+8"));System.out.println(ldtFromDate);// 2023-09-28T16:16:45。此时时钟也是16:17:44。  Date date0 =newDate(0);// number of milliseconds since the standard base time known as "the epoch"  System.out.println(date0.getTime());// 0, date0.getTime()方法返回绝对时间Returns the number of milliseconds since January 1, 1970, 00:00:00 GMT  // 如下的提供`ZoneOffset.UTC`可以理解是告诉LocalDateTime我提供的epochSecond已是`localSecond=当地时间-当地时间的0点`不需要再做转换了。  LocalDateTime ldt0 =LocalDateTime.ofEpochSecond(0L,0,ZoneOffset.UTC);System.out.println(ldt0);// 1970-01-01T00:00  LocalDateTime ldt8 =LocalDateTime.ofEpochSecond(0L,0,ZoneOffset.of("+8"));System.out.println(ldt8);// 1970-01-01T08:00  // TimestampData 默认不会进行任何时区转换。也不存储任何时区信息。内部仅靠`long millisecond`和`int nanoOfMillisecond`存储信息,以便于序列化。  // millisecond 一般可以认为是本地时间。因其在toString方法中会不会进行时区转换,toString方法仅是调用了`toLocalDateTime()`,中进行简单运算,并最终调用`LocalDateTime.toString`方法。  TimestampData td0 =TimestampData.fromEpochMillis(0);// 相当于LocalDateTime.ofEpochSecond(0, 0, ZoneOffset.UTC)。  System.out.println(td0);// 1970-01-01T00:00。可见TimestampData输出转字符串的时间就是以utc时间为基准的这和java.util.Date类型是一致的。  LocalDateTime ldt =LocalDateTime.ofEpochSecond(  
            ts /1000,(int)(ts %1000*1_000_000),ZoneOffset.UTC);System.out.println(ldt);// 2023-09-28T09:43:20.320  TimestampData td =TimestampData.fromEpochMillis(ts);System.out.println(td);// 2023-09-28T09:43:20.320  Date date =newDate(ts);// 注意:参数date(the specified number of milliseconds since the standard base time known as "the epoch")应该是epoch但此时ts并不是epoch基准的而是本地local基准的。  System.out.println(date);// Thu Sep 28 17:43:20 CST 2023,CST就是北京时间了,其在toString方法中`BaseCalendar.Date date = normalize();`进行了时区转换即+8了。  }

4. 测试组件版本

  • flink 1.13
  • flink-cdc 2.2.1
  • flink-connector-jdbc 自己定制的,根据3.1.1-1.17版本修改而来。

二、本测试共测试四大数据库:

  • mysql
  • postgres
  • sqlserver
  • oracle

二、每种数据库测试8项:

  • database-SQL 直接从数据中读取数据,是测试的基准值
  • cdc-RowData 使用cdc的SQL API从数据库中读取值并在 com.ververica.cdc.debezium.table.AppendMetadataCollector#collect 方法中debug得到数据
  • cdc-SQL(测试除ts_ms的字段) 使用cdc的SQL API读取值使用flink sql-client查询,用于测试除ts_ms的字段。因ts_ms准确性需分两种情况讨论。
  • cdc-SQL-RealTime(测试ts_ms) 使用cdc的SQL API从读取值,左上角是系统时间,下侧是实时读取的数据。
  • cdc-Read数据(测试snapshot读取ts_ms字段) 测试snapshot读取ts_ms字段,即全量读取阶段的ts_ms值,按照flink-cdc官方解释此四个数据的全量阶段值均为0(1970-01-01 00:00:00)。非0即为不正确。
  • cdc-Create数据(测试incremental读取ts_ms字段) 测试incremental读取ts_ms字段,即增量读取阶段的ts_ms值。按照flink-cdc官方解释此四个数据的增量阶段值为数据日志记录时间。
  • jdbc-RowData 使用flink SQL API 读取connector是jdbc的表数据org.apache.flink.connector.jdbc.table.JdbcRowDataInputFormat#nextRecord的方法中debug得到数据。。不含tm_ms数据。
  • jdbc-SQL 使用flink SQL API 读取connector是jdbc的表数据。使用flink sql-client查询。。不含tm_ms数据。

三、测试过程数据

3.1 mysql

3.1.1 database-SQL

在这里插入图片描述

3.1.2 cdc-RowData

在这里插入图片描述

3.1.3 cdc-SQL(测试除ts_ms的字段)

![![[image-20230927163847043.png|201]]](https://img-blog.csdnimg.cn/5dc1e844eca540e6926facd3209de6a2.png)

3.1.4 cdc-SQL-RealTime(测试ts_ms)

如下:上侧(win系统显示时间截图),下侧(cdc-query的ts_ms)
如果基本一致(不是差值8h),说明cdc-query的ts_ms是正确的的。
![![[image-20230928132434484.png|325]]](https://img-blog.csdnimg.cn/54bebac09bb142358481f76a4765f1d7.png)

3.1.5 cdc-Read数据(测试snapshot读取ts_ms字段)

![![[image-20230928100333641.png]]](https://img-blog.csdnimg.cn/28ed59b1533c4a83a293388ac4a9141c.png)

3.1.6 cdc-Create数据(测试incremental读取ts_ms字段)

![![[image-20230928101529479.png]]](https://img-blog.csdnimg.cn/1459d1a99ba049a4885643fe5e12fbe1.png)

3.1.7 jdbc-RowData

![![[image-20230927172538194.png]]](https://img-blog.csdnimg.cn/3c9f9929ae3741348e5d166ccc3877f3.png)

3.1.8 jdbc-SQL

![![[image-20230927171613530.png|206]]](https://img-blog.csdnimg.cn/c3dbe1b468f34e96b9a62a14f04b5afd.png)

3.2 postgres

3.2.1 database-SQL

![![[image-20230927145744323.png]]](https://img-blog.csdnimg.cn/873dec1457494561a1316ac96c66ec84.png)

3.2.2 cdc

cdc-RowData
![![[image-20230927145825569.png]]](https://img-blog.csdnimg.cn/4a6a0f529e834255b1352b3faff7b043.png)

3.2.3 cdc-SQL(测试除ts_ms的字段)

![![[image-20230927151801248.png|200]]](https://img-blog.csdnimg.cn/8723056622d94037ae316ad8f93e17c9.png)

3.2.4 cdc-SQL-RealTime(测试ts_ms)

![![[image-20230928132850256.png|325]]](https://img-blog.csdnimg.cn/6bb31356054e434d9bf898fe03ccb8f8.png)

3.2.5 cdc-Read数据(测试snapshot读取ts_ms字段)

![![[image-20230928095911025.png]]](https://img-blog.csdnimg.cn/b1eecb29feae44438e8248e9825cd816.png)

3.2.6 cdc-Create数据(测试incremental读取ts_ms字段)

![![[image-20230928101453266.png]]](https://img-blog.csdnimg.cn/35fb9eda1450498e8c0c3d42b55d44e1.png)

3.2.7 jdbc

jdbc-RowData
![![[image-20230927173637049.png]]](https://img-blog.csdnimg.cn/6dfd4c6ce62543a2ae003ca41d9bb2a0.png)

3.2.8 jdbc-SQL

![![[image-20230927173456643.png|212]]](https://img-blog.csdnimg.cn/85e25320e64a42a0b37fe7e0b44b1e78.png)

3.3 sqlserver

3.3.1 database-SQL

![![[image-20230927163637993.png]]](https://img-blog.csdnimg.cn/6fd846687b8b45b49563f0ae5a5ecaef.png)

3.3.2 cdc-RowData

![![[image-20230927163611807.png]]](https://img-blog.csdnimg.cn/6195b9e69d834df0a9a8f4f0e66d6512.png)

3.3.3 cdc-SQL(测试除ts_ms的字段)

![![[image-20230927163808365.png|192]]](https://img-blog.csdnimg.cn/a687aacb21ad49429e8399fe212eb305.png)

3.3.4 cdc-SQL-RealTime(测试ts_ms)

![![[image-20230928133349412.png|350]]](https://img-blog.csdnimg.cn/67433a3da0164965bda4e402804d9f65.png)

3.3.5 cdc-Read数据(测试snapshot读取ts_ms字段)

![![[image-20230928094006306.png]]](https://img-blog.csdnimg.cn/dd454f19c0964c4fa192f2b95923de50.png)

3.3.6 cdc-Create数据(测试incremental读取ts_ms字段)

![![[image-20230928101415704.png]]](https://img-blog.csdnimg.cn/c61cf74393b749c0a41dbe0c8b22ea19.png)

3.3.7 jdbc-RowData

![![[image-20230927174904854.png]]](https://img-blog.csdnimg.cn/c54c477e82af450c9d1025de0203bac6.png)

3.3.8 jdbc-SQL

![![[image-20230927182456589.png|194]]](https://img-blog.csdnimg.cn/0fbdf30efa8d493b90bb09b048f2c1b1.png)

3.4 oracle

3.4.1 database-SQL

![![[image-20230927160526864.png]]](https://img-blog.csdnimg.cn/d4437dea8a974810b1c3e8618cc19fb2.png)

3.4.2 cdc-RowData

![![[image-20230927160425443.png]]](https://img-blog.csdnimg.cn/feb7073c5dbb4541afea6d29a678eed1.png)

3.4.3 cdc-SQL(测试除ts_ms的字段)

![![[image-20230927160753056.png|191]]](https://img-blog.csdnimg.cn/878d4f1b78024de9a0fdf3627b58b507.png)

3.4.3 cdc-SQL-RealTime(测试ts_ms)

![![[image-20230928133736851.png|400]]](https://img-blog.csdnimg.cn/103cab6bf1384886ada08e6ef8b09de5.png)

3.4.4 cdc-Read数据(测试snapshot读取ts_ms字段)

![![[image-20230928101223538.png]]](https://img-blog.csdnimg.cn/e25acb0bd5074b7ab61bc4849a4f73fe.png)

3.4.5 cdc-Create数据(测试incremental读取ts_ms字段)

![![[image-20230928101030948.png]]](https://img-blog.csdnimg.cn/c908efb2f41e4111957d65e4adce0f21.png)

3.4.7 jdbc-RowData

![![[image-20230927183056565.png]]](https://img-blog.csdnimg.cn/c800b6c5934f4c2ea949e910e5c17e89.png)

3.4.8 jdbc-SQL

![![[image-20230927182935788.png|203]]](https://img-blog.csdnimg.cn/cbb5a435bc994775af31c8a3d371df2d.png)

四、结论

(1)数据库获取的

without time zone

在flink中都是以本地时间的存储的。可以使用

LocalDateTime.ofEpochSecond(long epochSecond, int nanoOfSecond, ZoneOffset.UTC)

直接获取。
(2)Flink中的TimestampData中存储的一般可以认为是本地时间。但需要注意:TimestampData 不可将

instant 相关方法

localDateTime 、Timestamp 相关方法

混用。因为instant代表与epoch时间差。而后两者代表与local是时间差。
(3)Flink程序中时间的标准值都是local本地的。因其在Sql API(sql-client)中打印出的结果会与原始数据库中打印的一致。

如下图中红色字体的是错误的数据,使用CDC需要额外注意并进行转换。
![![[image-20230928164847790.png]]](https://img-blog.csdnimg.cn/c2bb1dc870384bd681d0af8b07756372.png)

五、附录

5.1 查询数据库时区SQL

-- mysql 以:time_zone 为准,system_time_zone至服务器时区show variables like'%time_zone%';-- postgresshowtime zone;-- sqlserverDECLARE@TimeZone NVARCHAR(255)EXEC
master.dbo.xp_instance_regread
N'HKEY_LOCAL_MACHINE',
N'SYSTEM\CurrentControlSet\Control\TimeZoneInformation',
N'TimeZoneKeyName',@TimeZone
OUTPUT
SELECT@TimeZone-- oracleselect dbtimezone from dual;
标签: flink 大数据

本文转载自: https://blog.csdn.net/lisacumt/article/details/133387085
版权归原作者 lisacumt 所有, 如有侵权,请联系我们删除。

“【时区】Flink JDBC 和CDC时间字段时区 测试及时间基准”的评论:

还没有评论