flink sql创建表语法
CREATETABLE[IFNOTEXISTS][catalog_name.][db_name.]table_name
(
{ <physical_column_definition>|<metadata_column_definition>|<computed_column_definition> }[,...n][<watermark_definition>][<table_constraint>][,...n])[COMMENT table_comment][PARTITIONED BY(partition_column_name1, partition_column_name2,...)]WITH(key1=val1, key2=val2,...)[LIKE source_table [(<like_options>)]]<physical_column_definition>:
column_name column_type [<column_constraint>][COMMENT column_comment]<column_constraint>:
[CONSTRAINT constraint_name]PRIMARYKEYNOT ENFORCED
<table_constraint>:
[CONSTRAINT constraint_name]PRIMARYKEY(column_name,...)NOT ENFORCED
<metadata_column_definition>:
column_name column_type METADATA [FROM metadata_key ][ VIRTUAL ]<computed_column_definition>:
column_name AS computed_column_expression [COMMENT column_comment]<watermark_definition>:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
<source_table>:
[catalog_name.][db_name.]table_name
<like_options>:
{
{ INCLUDING | EXCLUDING } { ALL| CONSTRAINTS | PARTITIONS }
| { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
}[,...]
示例:
createtable kafka_t_trade_order_area_service (
orderId string,
orderStatus string,
orderDate string,
totalCashAmt decimal(16,2),
totalTicketAmt decimal(16,2),
orderamt as totalCashAmt + totalTicketAmt,-- 计算列
tmCreate string,
tmSmp timestamp(3),`offset`bigint metadata from'offset',-- 元数据列`timestamp` TIMESTAMP_LTZ(3) METADATA -- 元数据列,如果列名和元数据名一致,可以省略from-- `timestamp` bigint METADATA -- 元数据列,可以设置其他数据类型,会强制数据类型转换
proc_time as PROCTIME(),-- 计算列
watermark for tmSmp as tmSmp -interval'20'minute-- watermark列)with('connector'='kafka','topic'='test_topic','properties.bootstrap.servers'='master.fuyun:9092','properties.group.id'='test_fuyun','format'='json','scan.startup.mode'='timestamp','scan.startup.timestamp-millis'='1679328000000'-- 'scan.startup.mode' = 'latest-offset');
physical_column_definition:
物理列是数据库中已知的常规列。它们定义物理数据中字段的名称、类型和顺序。因此,物理列表示从外部系统读取和写入的有效负载。
可以在物理列之间声明其他类型的列,但不会影响最终的物理架构。
metadata_column_definition:
元数据列是SQL标准的扩展,允许访问连接器或格式化表的每一行的特定字段。元数据列由元数据关键字指示。
例如,元数据列可用于从 Kafka 记录读取和写入时间戳,以便进行基于时间的操作。
连接器和格式文档列出了每个组件的可用元数据字段。元数据列是可选的。
computed_column_definition:
计算列计算可引用同一表中声明的其他列的表达式。可以访问物理列和元数据列。列本身不以物理方式存储在表中。列的数据类型是从给定表达式自动派生的,不必手动声明。
计划器将在源之后将计算列转换为常规投影。对于优化或水印策略下推,评估可能会分布在运算符之间、多次执行或在给定查询不需要时跳过。
表达式可以包含列、常量或函数的任意组合。表达式不能包含子查询。
watermark_definition:
WATERMARK 定义了表的事件时间属性,其形式为
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
。
rowtime_column_name
把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列,它也可以是一个计算列。
watermark_strategy_expression
定义了 watermark 的生成策略。它允许使用包括计算列在内的任意非查询表达式来计算 watermark ;表达式的返回类型必须是 TIMESTAMP(3),表示了从 Epoch 以来的经过的时间。 返回watermark 只有当其不为空且其值大于之前发出的本地 watermark 时才会被发出(以保证 watermark 递增)。每条记录的 watermark 生成表达式计算都会由框架完成。 框架会定期发出所生成的最大的 watermark ,如果当前 watermark 仍然与前一个 watermark 相同、为空、或返回的 watermark 的值小于最后一个发出的 watermark ,则新的 watermark 不会被发出。 Watermark 根据
pipeline.auto-watermark-interval
中所配置的间隔发出。 若 watermark 的间隔是 0ms ,那么每条记录都会产生一个 watermark,且 watermark 会在不为空并大于上一个发出的 watermark 时发出。
使用事件时间语义时,表必须包含事件时间属性和 watermark 策略。
Flink 提供了几种常用的 watermark 策略。
- 严格递增时间戳:
WATERMARK FOR rowtime_column AS rowtime_column
。- 发出到目前为止已观察到的最大时间戳的 watermark ,时间戳大于最大时间戳的行被认为没有迟到。 - 递增时间戳:
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND
。- 发出到目前为止已观察到的最大时间戳减 1 的 watermark ,时间戳大于或等于最大时间戳的行被认为没有迟到。 - 有界乱序时间戳:
WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit
。- 发出到目前为止已观察到的最大时间戳减去指定延迟的 watermark ,例如,WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '5' SECOND
是一个 5 秒延迟的 watermark 策略。
PRIMARY KEY:
主键用作 Flink 优化的一种提示信息。主键限制表明一张表或视图的某个(些)列是唯一的并且不包含 Null 值。 主键声明的列都是非 nullable 的。因此主键可以被用作表行级别的唯一标识。
主键可以和列的定义一起声明,也可以独立声明为表的限制属性,不管是哪种方式,主键都不可以重复定义,否则 Flink 会报错。\
- 有效性检查- SQL 标准主键限制可以有两种模式:
ENFORCED
或者NOT ENFORCED
。 它申明了是否输入/出数据会做合法性检查(是否唯一)。Flink 不存储数据因此只支持NOT ENFORCED
模式,即不做检查,用户需要自己保证唯一性。- Flink 假设声明了主键的列都是不包含 Null 值的,Connector 在处理数据时需要自己保证语义正确。
注意: 在
CREATE TABLE
语句中,创建主键会修改列的 nullable 属性,主键声明的列默认都是非 Nullable 的。
PARTITIONED BY
根据指定的列对已经创建的表进行分区。若表使用
filesystem sink
,则将会为每个分区创建一个目录。
WITH Options
表属性用于创建 table source/sink ,一般用于寻找和创建底层的连接器。
表达式
key1=val1
的键和值必须为字符串文本常量。请参考 连接外部系统 了解不同连接器所支持的属性。
注意: 表名可以为以下三种格式 1.
catalog_name.db_name.table_name
2.
db_name.table_name
3.
table_name
。
使用
catalog_name.db_name.table_name
的表将会与名为 “catalog_name” 的 catalog 和名为 “db_name” 的数据库一起注册到 metastore 中。使用
db_name.table_name
的表将会被注册到当前执行的
table environment
中的 catalog 且数据库会被命名为 “db_name”;对于 table_name, 数据表将会被注册到当前正在运行的catalog和数据库中。
注意: 使用 CREATE TABLE 语句注册的表均可用作 table source 和 table sink。 在被 DML 语句引用前,我们无法决定其实际用于 source 抑或是 sink。
LIKE
LIKE 子句来源于两种 SQL 特性的变体/组合(Feature T171,“表定义中的 LIKE 语法” 和 Feature T173,“表定义中的 LIKE 语法扩展”)。LIKE 子句可以基于现有表的定义去创建新表,并且可以扩展或排除原始表中的某些部分。与 SQL 标准相反,LIKE 子句必须在 CREATE 语句中定义,并且是基于 CREATE 语句的更上层定义,这是因为 LIKE 子句可以用于定义表的多个部分,而不仅仅是 schema 部分。
你可以使用该子句,重用(或改写)指定的连接器配置属性或者可以向外部表添加 watermark 定义,例如可以向 Apache Hive 中定义的表添加 watermark 定义。
示例如下:
CREATETABLE Orders (`user`BIGINT,
product STRING,
order_time TIMESTAMP(3))WITH('connector'='kafka','scan.startup.mode'='earliest-offset');CREATETABLE Orders_with_watermark (-- 添加 watermark 定义
WATERMARK FOR order_time AS order_time -INTERVAL'5'SECOND)WITH(-- 改写 startup-mode 属性'scan.startup.mode'='latest-offset')LIKE Orders;
结果表 Orders_with_watermark 等效于使用以下语句创建的表:
CREATETABLE Orders_with_watermark (`user`BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time -INTERVAL'5'SECOND)WITH('connector'='kafka','scan.startup.mode'='latest-offset');
表属性的合并逻辑可以用 like options 来控制。
可以控制合并的表属性如下:
- CONSTRAINTS - 主键和唯一键约束
- GENERATED - 计算列
- OPTIONS - 连接器信息、格式化方式等配置项
- PARTITIONS - 表分区信息
- WATERMARKS - watermark 定义
并且有三种不同的表属性合并策略:
- INCLUDING - 新表包含源表(source table)所有的表属性,如果和源表的表属性重复则会直接失败,例如新表和源表存在相同 key 的属性。
- EXCLUDING - 新表不包含源表指定的任何表属性。
- OVERWRITING - 新表包含源表的表属性,但如果出现重复项,则会用新表的表属性覆盖源表中的重复表属性,例如,两个表中都存在相同 key 的属性,则会使用当前语句中定义的 key 的属性值。
并且你可以使用
INCLUDING/EXCLUDING ALL
这种声明方式来指定使用怎样的合并策略,例如使用
EXCLUDING ALL INCLUDING WATERMARKS
,那么代表只有源表的 WATERMARKS 属性才会被包含进新表。
示例如下:
-- 存储在文件系统的源表CREATETABLE Orders_in_file (`user`BIGINT,
product STRING,
order_time_string STRING,
order_time AS to_timestamp(order_time))
PARTITIONED BY(`user`)WITH('connector'='filesystem','path'='...');-- 对应存储在 kafka 的源表CREATETABLE Orders_in_kafka (-- 添加 watermark 定义
WATERMARK FOR order_time AS order_time -INTERVAL'5'SECOND)WITH('connector'='kafka',...)LIKE Orders_in_file (-- 排除需要生成 watermark 的计算列之外的所有内容。-- 去除不适用于 kafka 的所有分区和文件系统的相关属性。
EXCLUDING ALL
INCLUDING GENERATED
);
如果未提供 like 配置项(
like options
),默认将使用
INCLUDING ALL OVERWRITING
OPTIONS 的合并策略。
注意: 您无法选择物理列的合并策略,当物理列进行合并时就如使用了 INCLUDING 策略。
注意: 源表 source_table 可以是一个组合 ID。您可以指定不同 catalog 或者 DB 的表作为源表: 例如,
my_catalog.my_db.MyTable
指定了源表 MyTable 来源于名为 MyCatalog 的 catalog 和名为 my_db 的 DB ,
my_db.MyTable
指定了源表 MyTable 来源于当前 catalog 和名为 my_db 的 DB。
版权归原作者 浮云6363 所有, 如有侵权,请联系我们删除。