table.local-time-zone
table.local-time-zone
table.local-time-zone可用于设置flinksql的时区。
flink的内置数据类型TIMESTAMP(n)或者是TIMESTAMP_LTZ(n), 我们设置水位线都是基于这两种类型,不同的是前者本质是字符串形势,后者本质是long,也因此前者不受时区影响,后者受时区影响类型。(n指的毫秒级的精度取值范围是 0~9)
原始数据库如果不是时间类型,可能要用TO_TIMESTAMP(字符串格式的时间)或者TO_TIMESTAMP_LTZ(long数字,n)
如果原始数据库是string则需要用TO_TIMESTAMP(字符串格式的时间字段)转成TIMESTAMP(n)
如果原始数据库中是long则需要用TO_TIMESTAMP_LTZ(long数字,n) 转成TIMESTAMP_LTZ(n)
DataStream-to-Table Conversion(拓展知识)
datastream API到Table Api转换的时候,是以后string的形式传递event_time, 并且这个string在DataStream Api是以UTC时区转换的,如果你的原始数据中是long, 如果不做处理展示出来的string就是UTC字符串,为了在东八区展示,则需要将long再加上8小时
// 水位线 允许乱序WatermarkStrategy<String> waterStrategy =WatermarkStrategy.<String>forMonotonousTimestamps()//ofSeconds(20).withTimestampAssigner(newSerializableTimestampAssigner<String>(){@OverridepubliclongextractTimestamp(String element,long recordTimestamp){try{Mybook book= JSON.parseObject(element,Mybook.class);return boo.time+8*60*60*1000//转成东八区}catch(Exception e){return recordTimestamp;}}}).withIdleness(Duration.ofSeconds(timeWindowIdleness));SingleOutputStreamOperator<UserSlotGame> processStream = env
.fromSource(source, waterStrategy,"readKafka").process(newProcessFunction<String,UserSlotGame>(){@OverridepublicvoidprocessElement(String value,Context ctx,Collector<UserSlotGame> out)throwsException{// 省略}});
代码测试
mysql时区是Asia/Shanghai
CREATETABLE`versioned_rates`(`operation_code`intDEFAULTNULL,`update_time`varchar(255)DEFAULTNULL,-- 注意这是字符串`product_id`varchar(255)DEFAULTNULL,`product_name`varchar(255)DEFAULTNULL,`price`floatDEFAULTNULL,`time_long`bigintNOTNULLDEFAULT'0'-- 注意这是long)ENGINE=InnoDBDEFAULTCHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
INSERTINTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)VALUES(1,'2024-01-01 00:01:00','p_001','scooter',11.11,1730346179000);INSERTINTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)VALUES(1,'2024-01-01 00:02:00','p_002','basketball',23.11,1730346179000);INSERTINTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)VALUES(2,'2024-01-01 12:00:00','p_001','scooter',11.11,1730346179000);INSERTINTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)VALUES(3,'2024-01-01 12:00:00','p_001','scooter',12.99,1730346179000);INSERTINTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)VALUES(2,'2024-01-01 12:00:00','p_002','basketball',23.11,1730346179000);INSERTINTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)VALUES(3,'2024-01-01 12:00:00','p_002','basketball',19.99,1730346179000);INSERTINTO flink.versioned_rates
(operation_code, update_time, product_id, product_name, price, time_long)VALUES(4,'2024-01-01 18:00:00','p_001','scooter',12.99,1730346179000);
flinksql代码
packagecom.pg.TableAndDataStreamApi;importorg.apache.flink.configuration.Configuration;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.Table;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;importorg.apache.flink.table.api.config.TableConfigOptions;/*
*
* */publicclass version_table {privatestaticfinalString SOURCE="CREATE TABLE source_table(\n"+"\toperation_code int,\n"+"\tupdate_time string,\n"+"\tup_t AS TO_TIMESTAMP(update_time),\n"+"\ttime_long bigint,\n"+"\tbbb AS TO_TIMESTAMP_LTZ(time_long,3) \n"+" ) WITH (\n"+" 'connector' = 'jdbc',\n"+" 'url' = 'jdbc:mysql://ip:3306/flink',\n"+" 'driver'='com.mysql.cj.jdbc.Driver',\n "+" 'username'='用户名',\n"+" 'password'='密码',\n"+" 'table-name' = 'versioned_rates'\n"+")";publicstaticvoidmain(String[] args)throwsException{// 创建执行环境StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tableEnv =StreamTableEnvironment.create(env);
tableEnv.executeSql(SOURCE);Configuration configuration =newConfiguration();// configuration.set(TableConfigOptions.LOCAL_TIME_ZONE, "UTC");
configuration.set(TableConfigOptions.LOCAL_TIME_ZONE,"Asia/Shanghai");
tableEnv.getConfig().addConfiguration(configuration);// 从 MySQL 表中选择所有行Table t = tableEnv.sqlQuery("select * from source_table");
t.execute().print();}}
执行结果截图
TO_TIMESTAMP_LTZ 受时区影响
而TO_TIMESTAMP()意味着原始数据中本就是string, 是不会受到时区影响的
- 下方第一个红色列不管是UTC还是 Asia/Shanghai 我们看大的string都是一样的
- 下方第一个红色列UTC比 Asia/Shanghai 少了8个小时
1. Asia/Shanghai 结果如下
2. UTC结果如下
版权归原作者 我先森 所有, 如有侵权,请联系我们删除。