0


大数据Flink(一百一十五):Flink SQL的基本概念

Flink SQL的基本概念

一、​​​​​​​​​​​​​​SQL 中表的概念

一个表的全名(标识)会由三个部分组成:Catalog 名称.数据库名称.表名称

如果 Catalog 名称或者数据库名称没有指明,就会使用当前默认值 default

举个例子,下面这个 SQL 创建的 Table 的全名为 default.default.table1

CREATE TABLE table1 ... WITH ( 'connector' = ... );

下面这个 SQL 创建的 Table 的全名为 default.mydatabase.table1

CREATE TABLE mydatabase.table1 ... WITH ( 'connector' = ... );

Flink中表指的是外部表 TABLE:描述的是外部数据,例如文件(HDFS)、消息队列(Kafka)等。依然拿离线 Hive SQL 举个例子,离线中一个表指的是 Hive 表,也就是所说的外部数据。

注意:这里有不同的地方就是,离线 Hive MetaStore 中不会有 Catalog 这个概念,其标识是数据库.数据表。

另外,在阿里云Flink全托管中,由于内置的vvp catalog是默认的catalog,所以创建表时,如果没有指定catalog,默认使用vvp catalog,可实现元数据持久化。

进入阿里云Flink开发平台,点击SQL开发,在快速入门文件夹下新建文件夹:SQL基础。

在此文件夹下,新建空白的流作业草稿test,引擎版本选择vvr-6.0.7-flink-1.15。

输入下面SQL代码,选中代码后点击运行。

CREATE TABLE table1(
    id     INT
    ,score INT
)WITH (
    'connector' = 'datagen'
    ,'fields.id.kind' = 'sequence'
    ,'fields.id.start' = '1'
    ,'fields.id.end' = '5'
    ,'fields.score.kind' = 'random'
    ,'fields.score.min' = '70'
    ,'fields.score.max' = '100'
);

创建后,点击元数据,可以在vvp下看到table1表,全称即vvp.default.table1。

二、​​​​​​​​​​​​​​SQL 临时表、永久表

表可以是临时的,并与单个 Flink session(可以理解为 Flink 任务运行一次就是一个 session)的生命周期绑定。

表也可以是永久的,并且对多个 Flink session 都生效。

  • 临时表:通常保存于内存中并且仅在创建它们的 Flink session(可以理解为一次 Flink 任务的运行)持续期间存在。这些表对于其它 session(即其他 Flink 任务或非此次运行的 Flink 任务)是不可见的。因为这个表的元数据没有被持久化。如下案例:

在test作业中运行以下代码进行建表

CREATE TEMPORARY TABLE table2(
    id     INT
    ,score INT
)WITH (
    'connector' = 'datagen'
    ,'fields.id.kind' = 'sequence'
    ,'fields.id.start' = '1'
    ,'fields.id.end' = '5'
    ,'fields.score.kind' = 'random'
    ,'fields.score.min' = '70'
    ,'fields.score.max' = '100'
);

建表后,vvp中并没有看到它的表信息。

查询表数据,选中下面代码,点击调试,选择之前创建的session

select * from table2;

出现报错,原因是表不存在。(验证了临时表对于其它任务是不可见的)

同时选中建表语句和查询语句,点击调试。

结果如下

可以查到表的数据(验证了临时表对于本次任务是可见的)。

  • 永久表:需要外部 Catalog(阿里云Flink默认内置了vvp catalog)来持久化表的元数据。一旦永久表被创建,它将对任何连接到这个 Catalog 的 Flink session 可见且持续存在,直至从 Catalog 中被明确删除。如下案例:

在test作业中运行以下代码进行建表

CREATE TABLE table2(
    id     INT
    ,score INT
)WITH (
    'connector' = 'datagen'
    ,'fields.id.kind' = 'sequence'
    ,'fields.id.start' = '6'
    ,'fields.id.end' = '10'
    ,'fields.score.kind' = 'random'
    ,'fields.score.min' = '70'
    ,'fields.score.max' = '100'
);

在元数据中可以看到表信息。

查询表table2,点击调试,如下:

可以查到结果(验证了它对任何连接到这个 Catalog 的 任务可见且持续存在)

  • 如果临时表和永久表使用了相同的名称(Catalog名.数据库名.表名)。那么在这个 Flink session 中,你的任务访问到这个表时,访问到的永远是临时表(即相同名称的表,临时表会屏蔽永久表)。可以自行验证。

三、​​​​​​​​​​​​​​SQL表类型的定义

阿里云实时计算Flink版通过Flink SQL定义表对上下游存储进行映射或者使用Datastream API进行连接来实现读写。支持以下几种Flink SQL表类型的定义:

  • 源表(Source Table)- Flink作业的数据输入表,是计算的驱动来源。- 不能作为维表,必须作为驱动表来推进后续计算。产生的记录决定了计算链的触发。- 通常是需要进行转换计算的大规模业务数据,量级可以达到千万级甚至亿级别。- 以流式数据的形式输入,表示连续不断的新数据,可以来自消息队列、数据库变更日志等。- 包含需要Join和关联的关键字段,如用户ID、订单ID等业务主键。
  • 维表(Dimension Table)- 辅助表,用于丰富和扩展源表的数据。- 不能作为主驱动表,只能辅助补充源表。维表本身不驱动计算。- 数据规模通常较小,可以是静态表也可以是低吞吐的流表,数据量级可能在GB到TB级别。- 提供对业务数据的额外补充信息,如用户姓名、产品详情、区域信息等。- 通过与源表进行Join连接,可以丰富源表的信息,形成更加详细的宽表。
  • 结果表(Result Table)- Flink作业输出的结果数据表。- 存储着经过计算转换后的最终结果数据,如聚合结果、过滤后的数据等。- 可以输出到数据库、消息队列、文件等外部系统,用于后续的分析。- 是整个作业处理链的最终产出和输出,存储了计算的输出。
  • 例如,有如下源表和维表:- 源表:订单数据表,包含用户ID、订单ID、订单金额等信息。- 维表:用户信息表,包含用户ID、用户名、地址等信息。

作业首先从订单数据源表读取实时订单数据,将订单数据流与用户信息静态维表进行Join,然后按地区聚合统计订单总额,最后将统计结果写入结果表。

在这个作业中,订单表作为驱动源表输入,用户信息表作为静态维表,统计结果表作为作业最终输出。订单表不能作为维表,必须作为驱动表输入数据;而用户信息表不能作为驱动表,只能作为辅助维表补充订单数据。

四、​​​​​​​​​​​​​​常见的连接器

  • 我们在生产和测试中经常用到的连接器有:kafka、mysql、hologres、upsert-kafka、print、blackhole、jdbc、starrocks、paimon等。详细介绍可以点击下方表格中连接器的超链接进入阿里云官网查看。
  • 阿里云Flink全托管支持的连接器以及特性如下

连接器

支持类型

运行模式

API类型

是否支持更新或删除结果表数据

源表

维表

结果表

消息队列Kafka

×

流模式

SQL和DataStream

不支持更新和删除结果表数据,只支持插入数据。

实时数仓Hologres

流模式和批模式

SQL和DataStream

日志服务SLS

×

流模式

SQL

不支持更新和删除结果表数据,只支持插入数据。

MySQL

流模式

SQL和DataStream

云数据库RDS MySQL版

×

流模式和批模式

SQL

大数据计算服务MaxCompute

流模式和批模式

SQL和DataStream

不支持更新和删除结果表数据,只支持插入数据。

数据总线DataHub

×

流模式和批模式

SQL和DataStream

不支持更新和删除结果表数据,只支持插入数据。

云数据库Redis

×

流模式

SQL

Upsert Kafka

×

流模式和批模式

SQL

Elasticsearch

流模式和批模式

SQL和DataStream

云原生数据仓库 AnalyticDB MySQL版 3.0

×

流模式和批模式

SQL

ClickHouse

×

×

流模式和批模式

SQL

Hudi

×

流模式和批模式

SQL和DataStream

Print

×

×

流模式和批模式

SQL

Blackhole

×

×

流模式和批模式

SQL

云数据库HBase

×

流模式

SQL

Datagen

×

×

流模式和批模式

SQL

不涉及

消息队列RocketMQ

×

流模式

SQL和DataStream

不支持更新和删除结果表数据,只支持插入数据。

表格存储Tablestore(OTS)

流模式

SQL

JDBC

流模式和批模式

SQL

云数据库MongoDB

×

×

流模式

SQL

StarRocks

×

流模式和批模式

SQL和DataStream

Postgres CDC(公测中)

×

×

流模式

SQL

不涉及

云原生数据仓库AnalyticDB PostgreSQL版

×

流模式和批模式

SQL

云原生多模数据库Lindorm

×

流模式

SQL

对象存储OSS

×

流模式和批模式

SQL和DataStream

不支持更新和删除结果表数据,只支持插入数据。

模拟数据生成Faker

×

流模式和批模式

SQL

不涉及

Iceberg

×

流模式和批模式

SQL

InfluxDB

×

×

流模式

SQL

流式数据湖仓Paimon

流模式和批模式

SQL

Hudi

×

流模式和批模式

SQL和DataStream

云原生内存数据库Tair

×

×

流模式

SQL

MongoDB CDC(公测中)

×

×

流模式

SQL和DataStream

不涉及

OceanBase(公测中)

×

流模式和批模式

SQL

五、​​​​​​SQL数据视图

当业务逻辑比较复杂时,需要将多层嵌套写在DML语句中,但是这种方式定位问题比较困难。此时,我们可以通过定义数据视图的方式,将多层嵌套写在数据视图中,简化开发过程。

说明:数据视图仅用于辅助计算逻辑的描述,不会产生数据的物理存储。

1、​​​​​​语法

CREATE TEMPORARY VIEW viewName 
AS  [ (columnName[ , columnName]* ) ] 
queryStatement;
  • viewName:视图名称。
  • columnName:字段名称。
  • queryStatement:嵌套语句别名。

2、​​​​​​​​​​​​​​示例

下面代码是数据视图的一个使用案例。

--源表
CREATE TEMPORARY TABLE datagen_source (
 name VARCHAR,
 score BIGINT
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='1'
);

--结果表
CREATE TEMPORARY TABLE rds_output (
 name VARCHAR,
 score BIGINT
) WITH (
 'connector' = 'print'
);

--数据视图
CREATE TEMPORARY VIEW tmp_view AS 
SELECT 
 * 
FROM 
datagen_source;

--DML
INSERT INTO
rds_output
SELECT
name,
score
FROM
tmp_view;
  • 可以部署任务进行体验。

在SQL基础文件夹下新建data_view空白流作业,引擎与之前一致。将代码拷贝进去,点击右上角部署,点击确定。然后在作业运维界面,启动任务。

任务状态变成运行中后,点击任务进入任务详情,点击作业探查,点击运行日志下的Task Managers,点击Path,ID实例。

点击Stout,可以看到打印出的测试数据。


  • 📢博客主页:https://lansonli.blog.csdn.net
  • 📢欢迎点赞 👍 收藏 ⭐留言 📝 如有错误敬请指正!
  • 📢本文由 Lansonli 原创,首发于 CSDN博客🙉
  • 📢停下休息的时候不要忘了别人还在奔跑,希望大家抓紧时间学习,全力奔赴更美好的生活✨
标签: 大数据 flink

本文转载自: https://blog.csdn.net/xiaoweite1/article/details/142023751
版权归原作者 Lansonli 所有, 如有侵权,请联系我们删除。

“大数据Flink(一百一十五):Flink SQL的基本概念”的评论:

还没有评论