Flink SQL 语法篇(一)
CREATE 语句用于向当前或指定的 Catalog 中注册库、表、视图或函数。注册后的库、表、视图和函数可以在 SQL 查询中使用。
目前 Flink SQL 支持下列 CREATE 语句:
CREATE TABLE
CREATE DATABASE
CREATE VIEW
CREATE FUNCTION
1.建表语句
下面的 SQL 语句就是建表语句的定义,根据指定的表名创建一个表,如果同名表已经在 Catalog 中存在了,则无法注册。
CREATETABLE[IFNOTEXISTS][catalog_name.][db_name.]table_name
(
{ <physical_column_definition>|<metadata_column_definition>|<computed_column_definition> }[,...n][<watermark_definition>][<table_constraint>][,...n])[COMMENT table_comment][PARTITIONED BY(partition_column_name1, partition_column_name2,...)]WITH(key1=val1, key2=val2,...)[LIKE source_table [(<like_options>)]]<physical_column_definition>:
column_name column_type [<column_constraint>][COMMENT column_comment]<column_constraint>:
[CONSTRAINT constraint_name]PRIMARYKEYNOT ENFORCED
<table_constraint>:
[CONSTRAINT constraint_name]PRIMARYKEY(column_name,...)NOT ENFORCED
<metadata_column_definition>:
column_name column_type METADATA [FROM metadata_key ][ VIRTUAL ]<computed_column_definition>:
column_name AS computed_column_expression [COMMENT column_comment]<watermark_definition>:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
<source_table>:
[catalog_name.][db_name.]table_name
<like_options>:
{
{ INCLUDING | EXCLUDING } { ALL| CONSTRAINTS | PARTITIONS }
| { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
}[,...]
2.表中的列
2.1 常规列(物理列)
物理列 是数据库中所说的 常规列。其定义了物理介质中存储的数据中字段的名称、类型和顺序。其他类型的列可以在物理列之间声明,但不会影响最终的物理列的读取。
CREATETABLE MyTable (`user_id`BIGINT,`name` STRING
)WITH(...);
2.2 元数据列
元数据列 是 SQL 标准的扩展,允许访问数据源本身具有的一些元数据。元数据列由
METADATA
关键字标识。
例如,我们可以使用元数据列从 Kafka 数据中读取 Kafka 数据自带的时间戳(这个时间戳不是数据中的某个时间戳字段,而是数据写入 Kafka 时,Kafka 引擎给这条数据打上的时间戳标记),然后我们可以在 Flink SQL 中使用这个时间戳,比如进行基于时间的窗口操作。
CREATETABLE MyTable (`user_id`BIGINT,`name` STRING,-- 读取 kafka 本身自带的时间戳`record_time` TIMESTAMP_LTZ(3) METADATA FROM'timestamp')WITH('connector'='kafka'...);
元数据列可以用于后续数据的处理,或者写入到目标表中。
INSERTINTO MyTable
SELECT
user_id
, name
, record_time +INTERVAL'1'SECONDFROM MyTable;
如果自定义的列名称和 Connector 中定义
metadata
字段的名称一样的话,
FROM xxx
子句是可以被省略的。
CREATETABLE MyTable (`user_id`BIGINT,`name` STRING,-- 读取 kafka 本身自带的时间戳`timestamp` TIMESTAMP_LTZ(3) METADATA
)WITH('connector'='kafka'...);
关于 Flink SQL 的每种 Connector 都提供了哪些
metadata
字段,详细可见 官网文档。
如果自定义列的数据类型和 Connector 中定义的
metadata
字段的数据类型不一致的话,程序运行时会自动
cast
强转。但是这要求两种数据类型是可以强转的。
CREATETABLE MyTable (`user_id`BIGINT,`name` STRING,-- 将时间戳强转为 BIGINT`timestamp`BIGINT METADATA
)WITH('connector'='kafka'...);
默认情况下,Flink SQL Planner 认为
metadata
列是可以 读取 也可以 写入 的。但是有些外部存储系统的元数据信息是只能用于读取,不能写入的。
那么在往一个表写入的场景下,我们就可以使用
VIRTUAL
关键字来标识某个元数据列不写入到外部存储中(不持久化)。
CREATETABLE MyTable (-- sink 时会写入`timestamp`BIGINT METADATA,-- sink 时不写入`offset`BIGINT METADATA VIRTUAL,`user_id`BIGINT,`name` STRING,)WITH('connector'='kafka'...);
在上面这个案例中,Kafka 引擎的
offset
是只读的。所以我们在把
MyTable
作为数据源(输入)表时,Schema 中是包含
offset
的。在把
MyTable
作为数据汇(输出)表时,Schema 中是不包含
offset
的。如下:
所以这里在写入时需要注意,不要在 SQL 的
INSERT INTO
语句中写入
offset
列,否则 Flink SQL 任务会直接报错。
2.3 计算列
计算列 其实就是在写建表的 DDL 时,可以拿已有的一些列经过一些自定义的运算生成的新列。这些列本身是没有以物理形式存储到数据源中的。
CREATETABLE MyTable (`user_id`BIGINT,`price`DOUBLE,`quantity`DOUBLE,-- cost 就是使用 price 和 quanitity 生成的计算列,计算方式为 price * quanitity`cost`AS price * quanitity,)WITH('connector'='kafka'...);
计算列可以包含其他列、常量或者函数,但是不能写一个子查询进去。
小伙伴萌这时会问到一个问题,既然只能包含列、常量或者函数计算,我就直接在 DML Query 代码中写就完事了呗,为啥还要专门在 DDL 中定义呢?
没错,如果只是简单的四则运算的话直接写在 DML 中就可以,但是 计算列一般是用于定义时间属性的(因为在 SQL 任务中时间属性只能在 DDL 中定义,不能在 DML 语句中定义)。比如要把输入数据的时间格式标准化。处理时间、事件时间分别举例如下:
- 处理时间:使用
PROCTIME()
函数来定义处理时间列。 - 事件时间:事件时间的时间戳可以在声明 Watermark 之前进行预处理。比如,如果字段不是
TIMESTAMP(3)
类型或者时间戳是嵌套在 JSON 字符串中的,则可以使用计算列进行预处理。
❗注意:和虚拟
metadata
列是类似的,计算列也是只能读不能写的。
也就是说,我们在把 MyTable 作为数据源(输入)表时,Schema 中是包含
cost
的。
在把 MyTable 作为数据汇(输出)表时,Schema 中是不包含
cost
的。
-- 当做数据源(输入)的 schema
MyTable(`user_id`BIGINT,`price`DOUBLE,`quantity`DOUBLE,`cost`DOUBLE)-- 当做数据汇(输出)的 schema
MyTable(`user_id`BIGINT,`price`DOUBLE,`quantity`DOUBLE)
3.定义 Watermark
Watermark 是在 Create Table 中进行定义的。具体 SQL 语法标准是:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
rowtime_column_name
:表的事件时间属性字段。该列必须是TIMESTAMP(3)
、TIMESTAMP_LTZ(3)
类,这个时间可以是一个计算列。watermark_strategy_expression
:定义 Watermark 的生成策略。Watermark 的一般都是由rowtime_column_name
列减掉一段固定时间间隔。SQL 中 Watermark 的生产策略是:当前 Watermark 大于上次发出的 Watermark 时发出当前 Watermark。
注意:
- 如果你使用的是事件时间语义,那么必须要设设置事件时间属性和 WATERMARK 生成策略。
- Watermark 的发出频率:Watermark 发出一般是间隔一定时间的,Watermark 的发出间隔时间可以由
pipeline.auto-watermark-interval
进行配置,如果设置为 200ms 则每 200ms 会计算一次 Watermark,如果比之前发出的 Watermark 大,则发出。如果间隔设为 0ms,则 Watermark 只要满足触发条件就会发出,不会受到间隔时间控制。
Flink SQL 提供了几种 WATERMARK 生产策略:
- 有界无序:设置方式为
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit
。此类策略就可以用于设置最大乱序时间,假如设置为WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND
,则生成的是运行 5s 延迟的 Watermark。一般都用这种 Watermark 生成策略,此类 Watermark 生成策略通常用于有数据乱序的场景中,而对应到实际的场景中,数据都是会存在乱序的,所以基本都使用此类策略。 - 严格升序:设置方式为
WATERMARK FOR rowtime_column AS rowtime_column
。一般基本不用这种方式。如果你能保证你的数据源的时间戳是严格升序的,那就可以使用这种方式。严格升序代表 Flink 任务认为时间戳只会越来越大,也不存在相等的情况,只要相等或者小于之前的,就认为是迟到的数据。 - 递增:设置方式为
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND
。一般基本不用这种方式。如果设置此类,则允许有相同的时间戳出现。
4.Create Table With 子句
CREATETABLE KafkaTable (`user_id`BIGINT,`item_id`BIGINT,`behavior` STRING,`ts`TIMESTAMP(3) METADATA FROM'timestamp')WITH('connector'='kafka','topic'='user_behavior','properties.bootstrap.servers'='localhost:9092','properties.group.id'='testGroup','scan.startup.mode'='earliest-offset','format'='csv')
可以看到 DDL 中 With 子句就是在建表时,描述数据源、数据汇的具体外部存储的元数据信息的。
一般 With 中的配置项由 Flink SQL 的 Connector(链接外部存储的连接器) 来定义,每种 Connector 提供的 With 配置项都是不同的。
注意:
- Flink SQL 中 Connector 其实就是 Flink 用于链接外部数据源的接口。举一个类似的例子,在 Java 中想连接到 MySQL,需要使用
mysql-connector-java
包提供的 Java API 去链接。映射到 Flink SQL 中,在 Flink SQL 中要连接到 Kafka,需要使用kafka connector
。 - Flink SQL 已经提供了一系列的内置 Connector,具体可见:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/overview/。
回到上述案例中,With 声明了以下几项信息:
'connector' = 'kafka'
:声明外部存储是 Kafka。'topic' = 'user_behavior'
:声明 Flink SQL 任务要连接的 Kafka 表的topic
是user_behavior
。'properties.bootstrap.servers' = 'localhost:9092'
:声明 Kafka 的server ip
是localhost:9092
。'properties.group.id' = 'testGroup'
:声明 Flink SQL 任务消费这个 Kafka topic,会使用testGroup
的group id
去消费。'scan.startup.mode' = 'earliest-offset'
:声明 Flink SQL 任务消费这个 Kafka topic 会从最早位点开始消费。'format' = 'csv'
:声明 Flink SQL 任务读入或者写出时对于 Kafka 消息的序列化方式是csv
格式。
从这里也可以看出来 With 中具体要配置哪些配置项都是和每种 Connector 决定的。
5.Create Table Like 子句
Like 子句是 Create Table 子句的一个延伸。
下面定义了一张
Orders
表:
CREATETABLE Orders (`user`BIGINT,
product STRING,
order_time TIMESTAMP(3))WITH('connector'='kafka','scan.startup.mode'='earliest-offset');
但是忘记定义 Watermark 了,那如果想加上 Watermark,就可以用 Like 子句定义一张带 Watermark 的新表:
CREATETABLE Orders_with_watermark (-- 1. 添加了 WATERMARK 定义
WATERMARK FOR order_time AS order_time -INTERVAL'5'SECOND)WITH(-- 2. 覆盖了原 Orders 表中 scan.startup.mode 参数'scan.startup.mode'='latest-offset')-- 3. Like 子句声明是在原来的 Orders 表的基础上定义 Orders_with_watermark 表LIKE Orders;
上面这个语句的效果就等同于:
CREATETABLE Orders_with_watermark (`user`BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time -INTERVAL'5'SECOND)WITH('connector'='kafka','scan.startup.mode'='latest-offset');
不过这种不常使用。就不过多介绍了。如果小伙伴萌感兴趣,直接去 官网 参考具体注意事项。
版权归原作者 G皮T 所有, 如有侵权,请联系我们删除。