0


【flink-sql实战】flink 主键声明与upsert功能实战

文章目录

一. flink 主键声明语法

主键用作 Flink 优化的一种提示信息。主键限制表明一张表或视图的某个(些)列是唯一的并且不包含 Null 值。 主键声明的列都是非 nullable 的。因此主键可以被用作表行级别的唯一标识。

主键可以和列的定义一起声明,也可以独立声明为表的限制属性,不管是哪种方式,主键都不可以重复定义,否则 Flink 会报错。

有效性检查

SQL 标准主键限制可以有两种模式:ENFORCED 或者 NOT ENFORCED。 它申明了是否输入/出数据会做合法性检查(是否唯一)。

Flink 不存储数据因此只支持 NOT ENFORCED 模式,即不做检查,用户需要自己保证唯一性。

注意: 在 CREATE TABLE 语句中,创建主键会修改列的 nullable 属性,主键声明的列默认都是非 Nullable 的。

sql声明语法:

CREATETABLE[IFNOTEXISTS][catalog_name.][db_name.]table_name
  (
    { <physical_column_definition>|<metadata_column_definition>|<computed_column_definition> }[,...n][<watermark_definition>][<table_constraint>][,...n])...<column_constraint>:
  [CONSTRAINT constraint_name]PRIMARYKEYNOT ENFORCED

<table_constraint>:
  [CONSTRAINT constraint_name]PRIMARYKEY(column_name,...)NOT ENFORCED
...

联合主键声明

createtable t_sink_01 ( 
f1 varchar, 
f2 varchar,
f3 int,
f4 timestamp(3),
f5 varchar,primarykey(f1,f2)NOT ENFORCED  -- 主键声明,字段之间逗号分隔)with(..);

二. 物理表创建联合主键表

CREATETABLE test003(
     id INT(10),
     name VARCHAR(25),
     age int(10),PRIMARYKEY(id,name));desc test003

Field|Type|Null|Key|Default|Extra|-----+-----------+----+---+-------+-----+
id   |int|NO|PRI|||
name |varchar(25)|NO|PRI|||
age  |int|YES ||||

三. flink sql使用

CREATETABLE source
(`id`int,`username`varchar,`age`int)WITH('connector'='binlog-x','username'='root','password'='11111111','cat'='insert,delete,update','url'='jdbc:mysql://10.17.31.234:3306/360test','host'='10.17.31.234','port'='3306'-- 什么都不加:最新位置消费-- 加文件名,从此文件开头消费,'journal-name'='binlog.000194'--  ,'timestamp'='169944781200','table'='360test.dimension_table','timestamp-format.standard'='SQL');CREATETABLE sink
(`id`int,`name`varchar,`age`int,PRIMARYKEY(id,name)NOT ENFORCED
)WITH('connector'='mysql-x','url'='jdbc:mysql://localhost:3306/360test','table-name'='test003','username'='root','password'='11111111','sink.buffer-flush.max-rows'='1024',-- 批量写数据条数,默认:1024'sink.buffer-flush.interval'='10000',-- 批量写时间间隔,默认:10000毫秒-- insert时的选项,覆盖或者忽略。-- 声明了主键时,设置all-replace为true,全部更新覆盖,-- 或者是忽略,即来的新数据不插入?'sink.all-replace'='true',-- 解释如下(其他rdb数据库类似):默认:false。定义了PRIMARY KEY才有效,否则是追加语句-- sink.all-replace = 'true' 生成如:INSERT INTO `result3`(`mid`, `mbb`, `sid`, `sbb`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `mid`=VALUES(`mid`), `mbb`=VALUES(`mbb`), `sid`=VALUES(`sid`), `sbb`=VALUES(`sbb`) 。会将所有的数据都替换。-- sink.all-replace = 'false' 生成如:INSERT INTO `result3`(`mid`, `mbb`, `sid`, `sbb`) VALUES (?, ?, ?, ?) ON DUPLICATE KEY UPDATE `mid`=IFNULL(VALUES(`mid`),`mid`), `mbb`=IFNULL(VALUES(`mbb`),`mbb`), `sid`=IFNULL(VALUES(`sid`),`sid`), `sbb`=IFNULL(VALUES(`sbb`),`sbb`) 。如果新值为null,数据库中的旧值不为null,则不会覆盖。-- 新增写入选项:默认会判断,当声明了key则是update'sink.parallelism'='1'-- 写入结果的并行度,默认:null);insertinto sink select id,username as name,age as age  from source;
标签: sql flink 数据库

本文转载自: https://blog.csdn.net/hiliang521/article/details/134327641
版权归原作者 roman_日积跬步-终至千里 所有, 如有侵权,请联系我们删除。

“【flink-sql实战】flink 主键声明与upsert功能实战”的评论:

还没有评论