Flink SQL的基本概念
一、SQL 中表的概念
一个表的全名(标识)会由三个部分组成:Catalog 名称.数据库名称.表名称。
如果 Catalog 名称或者数据库名称没有指明,就会使用当前默认值 default。
举个例子,下面这个 SQL 创建的 Table 的全名为 default.default.table1。
CREATE TABLE table1 ... WITH ( 'connector' = ... );
下面这个 SQL 创建的 Table 的全名为 default.mydatabase.table1。
CREATE TABLE mydatabase.table1 ... WITH ( 'connector' = ... );
Flink中表指的是外部表 TABLE:描述的是外部数据,例如文件(HDFS)、消息队列(Kafka)等。依然拿离线 Hive SQL 举个例子,离线中一个表指的是 Hive 表,也就是所说的外部数据。
注意:这里有不同的地方就是,离线 Hive MetaStore 中不会有 Catalog 这个概念,其标识是数据库.数据表。
另外,在阿里云Flink全托管中,由于内置的vvp catalog是默认的catalog,所以创建表时,如果没有指定catalog,默认使用vvp catalog,可实现元数据持久化。
进入阿里云Flink开发平台,点击SQL开发,在快速入门文件夹下新建文件夹:SQL基础。
在此文件夹下,新建空白的流作业草稿test,引擎版本选择vvr-6.0.7-flink-1.15。
输入下面SQL代码,选中代码后点击运行。
CREATE TABLE table1(
id INT
,score INT
)WITH (
'connector' = 'datagen'
,'fields.id.kind' = 'sequence'
,'fields.id.start' = '1'
,'fields.id.end' = '5'
,'fields.score.kind' = 'random'
,'fields.score.min' = '70'
,'fields.score.max' = '100'
);
创建后,点击元数据,可以在vvp下看到table1表,全称即vvp.default.table1。
二、SQL 临时表、永久表
表可以是临时的,并与单个 Flink session(可以理解为 Flink 任务运行一次就是一个 session)的生命周期绑定。
表也可以是永久的,并且对多个 Flink session 都生效。
- 临时表:通常保存于内存中并且仅在创建它们的 Flink session(可以理解为一次 Flink 任务的运行)持续期间存在。这些表对于其它 session(即其他 Flink 任务或非此次运行的 Flink 任务)是不可见的。因为这个表的元数据没有被持久化。如下案例:
在test作业中运行以下代码进行建表
CREATE TEMPORARY TABLE table2(
id INT
,score INT
)WITH (
'connector' = 'datagen'
,'fields.id.kind' = 'sequence'
,'fields.id.start' = '1'
,'fields.id.end' = '5'
,'fields.score.kind' = 'random'
,'fields.score.min' = '70'
,'fields.score.max' = '100'
);
建表后,vvp中并没有看到它的表信息。
查询表数据,选中下面代码,点击调试,选择之前创建的session
select * from table2;
出现报错,原因是表不存在。(验证了临时表对于其它任务是不可见的)
同时选中建表语句和查询语句,点击调试。
结果如下
可以查到表的数据(验证了临时表对于本次任务是可见的)。
- 永久表:需要外部 Catalog(阿里云Flink默认内置了vvp catalog)来持久化表的元数据。一旦永久表被创建,它将对任何连接到这个 Catalog 的 Flink session 可见且持续存在,直至从 Catalog 中被明确删除。如下案例:
在test作业中运行以下代码进行建表
CREATE TABLE table2(
id INT
,score INT
)WITH (
'connector' = 'datagen'
,'fields.id.kind' = 'sequence'
,'fields.id.start' = '6'
,'fields.id.end' = '10'
,'fields.score.kind' = 'random'
,'fields.score.min' = '70'
,'fields.score.max' = '100'
);
在元数据中可以看到表信息。
查询表table2,点击调试,如下:
可以查到结果(验证了它对任何连接到这个 Catalog 的 任务可见且持续存在)
- 如果临时表和永久表使用了相同的名称(Catalog名.数据库名.表名)。那么在这个 Flink session 中,你的任务访问到这个表时,访问到的永远是临时表(即相同名称的表,临时表会屏蔽永久表)。可以自行验证。
三、SQL表类型的定义
阿里云实时计算Flink版通过Flink SQL定义表对上下游存储进行映射或者使用Datastream API进行连接来实现读写。支持以下几种Flink SQL表类型的定义:
- 源表(Source Table)- Flink作业的数据输入表,是计算的驱动来源。- 不能作为维表,必须作为驱动表来推进后续计算。产生的记录决定了计算链的触发。- 通常是需要进行转换计算的大规模业务数据,量级可以达到千万级甚至亿级别。- 以流式数据的形式输入,表示连续不断的新数据,可以来自消息队列、数据库变更日志等。- 包含需要Join和关联的关键字段,如用户ID、订单ID等业务主键。
- 维表(Dimension Table)- 辅助表,用于丰富和扩展源表的数据。- 不能作为主驱动表,只能辅助补充源表。维表本身不驱动计算。- 数据规模通常较小,可以是静态表也可以是低吞吐的流表,数据量级可能在GB到TB级别。- 提供对业务数据的额外补充信息,如用户姓名、产品详情、区域信息等。- 通过与源表进行Join连接,可以丰富源表的信息,形成更加详细的宽表。
- 结果表(Result Table)- Flink作业输出的结果数据表。- 存储着经过计算转换后的最终结果数据,如聚合结果、过滤后的数据等。- 可以输出到数据库、消息队列、文件等外部系统,用于后续的分析。- 是整个作业处理链的最终产出和输出,存储了计算的输出。
- 例如,有如下源表和维表:- 源表:订单数据表,包含用户ID、订单ID、订单金额等信息。- 维表:用户信息表,包含用户ID、用户名、地址等信息。
作业首先从订单数据源表读取实时订单数据,将订单数据流与用户信息静态维表进行Join,然后按地区聚合统计订单总额,最后将统计结果写入结果表。
在这个作业中,订单表作为驱动源表输入,用户信息表作为静态维表,统计结果表作为作业最终输出。订单表不能作为维表,必须作为驱动表输入数据;而用户信息表不能作为驱动表,只能作为辅助维表补充订单数据。
四、常见的连接器
- 我们在生产和测试中经常用到的连接器有:kafka、mysql、hologres、upsert-kafka、print、blackhole、jdbc、starrocks、paimon等。详细介绍可以点击下方表格中连接器的超链接进入阿里云官网查看。
- 阿里云Flink全托管支持的连接器以及特性如下
连接器
支持类型
运行模式
API类型
是否支持更新或删除结果表数据
源表
维表
结果表
消息队列Kafka
√
×
√
流模式
SQL和DataStream
不支持更新和删除结果表数据,只支持插入数据。
实时数仓Hologres
√
√
√
流模式和批模式
SQL和DataStream
是
日志服务SLS
√
×
√
流模式
SQL
不支持更新和删除结果表数据,只支持插入数据。
MySQL
√
√
√
流模式
SQL和DataStream
是
云数据库RDS MySQL版
×
√
√
流模式和批模式
SQL
是
大数据计算服务MaxCompute
√
√
√
流模式和批模式
SQL和DataStream
不支持更新和删除结果表数据,只支持插入数据。
数据总线DataHub
√
×
√
流模式和批模式
SQL和DataStream
不支持更新和删除结果表数据,只支持插入数据。
云数据库Redis
×
√
√
流模式
SQL
是
Upsert Kafka
√
×
√
流模式和批模式
SQL
是
Elasticsearch
√
√
√
流模式和批模式
SQL和DataStream
是
云原生数据仓库 AnalyticDB MySQL版 3.0
×
√
√
流模式和批模式
SQL
是
ClickHouse
×
×
√
流模式和批模式
SQL
是
Hudi
√
×
√
流模式和批模式
SQL和DataStream
是
×
×
√
流模式和批模式
SQL
是
Blackhole
×
×
√
流模式和批模式
SQL
是
云数据库HBase
×
√
√
流模式
SQL
是
Datagen
√
×
×
流模式和批模式
SQL
不涉及
消息队列RocketMQ
√
×
√
流模式
SQL和DataStream
不支持更新和删除结果表数据,只支持插入数据。
表格存储Tablestore(OTS)
√
√
√
流模式
SQL
是
JDBC
√
√
√
流模式和批模式
SQL
是
云数据库MongoDB
×
×
√
流模式
SQL
是
StarRocks
√
×
√
流模式和批模式
SQL和DataStream
是
Postgres CDC(公测中)
√
×
×
流模式
SQL
不涉及
云原生数据仓库AnalyticDB PostgreSQL版
×
√
√
流模式和批模式
SQL
是
云原生多模数据库Lindorm
×
√
√
流模式
SQL
是
对象存储OSS
√
×
√
流模式和批模式
SQL和DataStream
不支持更新和删除结果表数据,只支持插入数据。
模拟数据生成Faker
√
√
×
流模式和批模式
SQL
不涉及
Iceberg
√
×
√
流模式和批模式
SQL
是
InfluxDB
×
×
√
流模式
SQL
否
流式数据湖仓Paimon
√
√
√
流模式和批模式
SQL
是
Hudi
√
×
√
流模式和批模式
SQL和DataStream
是
云原生内存数据库Tair
×
×
√
流模式
SQL
是
MongoDB CDC(公测中)
√
×
×
流模式
SQL和DataStream
不涉及
OceanBase(公测中)
×
√
√
流模式和批模式
SQL
是
五、SQL数据视图
当业务逻辑比较复杂时,需要将多层嵌套写在DML语句中,但是这种方式定位问题比较困难。此时,我们可以通过定义数据视图的方式,将多层嵌套写在数据视图中,简化开发过程。
说明:数据视图仅用于辅助计算逻辑的描述,不会产生数据的物理存储。
1、语法
CREATE TEMPORARY VIEW viewName
AS [ (columnName[ , columnName]* ) ]
queryStatement;
- viewName:视图名称。
- columnName:字段名称。
- queryStatement:嵌套语句别名。
2、示例
下面代码是数据视图的一个使用案例。
--源表
CREATE TEMPORARY TABLE datagen_source (
name VARCHAR,
score BIGINT
) WITH (
'connector' = 'datagen',
'rows-per-second'='1'
);
--结果表
CREATE TEMPORARY TABLE rds_output (
name VARCHAR,
score BIGINT
) WITH (
'connector' = 'print'
);
--数据视图
CREATE TEMPORARY VIEW tmp_view AS
SELECT
*
FROM
datagen_source;
--DML
INSERT INTO
rds_output
SELECT
name,
score
FROM
tmp_view;
- 可以部署任务进行体验。
在SQL基础文件夹下新建data_view空白流作业,引擎与之前一致。将代码拷贝进去,点击右上角部署,点击确定。然后在作业运维界面,启动任务。
任务状态变成运行中后,点击任务进入任务详情,点击作业探查,点击运行日志下的Task Managers,点击Path,ID实例。
点击Stout,可以看到打印出的测试数据。
- 📢博客主页:https://lansonli.blog.csdn.net
- 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
- 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
- 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
版权归原作者 Lansonli 所有, 如有侵权,请联系我们删除。