0


【大数据】Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints

Flink SQL 语法篇》系列,共包含以下 10 篇文章:

  • Flink SQL 语法篇(一):CREATE
  • Flink SQL 语法篇(二):WITH、SELECT & WHERE、SELECT DISTINCT
  • Flink SQL 语法篇(三):窗口聚合(TUMBLE、HOP、SESSION、CUMULATE)
  • Flink SQL 语法篇(四):Group 聚合、Over 聚合
  • Flink SQL 语法篇(五):Regular Join、Interval Join
  • Flink SQL 语法篇(六):Temporal Join
  • Flink SQL 语法篇(七):Lookup Join、Array Expansion、Table Function
  • Flink SQL 语法篇(八):集合、Order By、Limit、TopN
  • Flink SQL 语法篇(九):Window TopN、Deduplication
  • Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints

😊 如果您觉得这篇文章有用 ✔️ 的话,请给博主一个一键三连 🚀🚀🚀 吧 (点赞 🧡、关注 💛、收藏 💚)!!!您的支持 💖💖💖 将激励 🔥 博主输出更多优质内容!!!

Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints 

1.EXPLAIN 子句

EXPLAIN

子句其实就是用于查看当前这个 SQL 查询的逻辑计划以及优化的执行计划。

SQL 语法标准:

EXPLAINPLANFOR<query_statement_or_insert_statement>

实际案例:

public class Explain_Test {

    public static void main(String[] args) throws Exception {

        FlinkEnv flinkEnv = FlinkEnvUtils.getStreamTableEnv(args);

        flinkEnv.env().setParallelism(1);

        String sql="CREATE TABLE source_table (\n"+"    user_id BIGINT COMMENT '用户 id',\n"+"    name STRING COMMENT '用户姓名',\n"+"    server_timestamp BIGINT COMMENT '用户访问时间戳',\n"+"    proctime AS PROCTIME()\n"+") WITH (\n"+"  'connector' = 'datagen',\n"+"  'rows-per-second' = '1',\n"+"  'fields.name.length' = '1',\n"+"  'fields.user_id.min' = '1',\n"+"  'fields.user_id.max' = '10',\n"+"  'fields.server_timestamp.min' = '1',\n"+"  'fields.server_timestamp.max' = '100000'\n"+");\n"+"\n"+"CREATE TABLE sink_table (\n"+"    user_id BIGINT,\n"+"    name STRING,\n"+"    server_timestamp BIGINT\n"+") WITH (\n"+"  'connector' = 'print'\n"+");\n"+"\n"+"EXPLAIN PLAN FOR\n"+"INSERT INTO sink_table\n"+"select user_id,\n"+"       name,\n"+"       server_timestamp\n"+"from (\n"+"      SELECT\n"+"          user_id,\n"+"          name,\n"+"          server_timestamp,\n"+"          row_number() over(partition by user_id order by proctime) as rn\n"+"      FROM source_table\n"+")\n"+"where rn = 1";/**
         * 算子 {@link org.apache.flink.streaming.api.operators.KeyedProcessOperator}
         *      -- {@link org.apache.flink.table.runtime.operators.deduplicate.ProcTimeDeduplicateKeepFirstRowFunction}
         */for(String innerSql : sql.split(";")) {
            TableResult tableResult = flinkEnv.streamTEnv().executeSql(innerSql);

            tableResult.print();
        }
    }
}

上述代码执行结果如下:

1. 抽象语法树
== Abstract Syntax Tree ==
LogicalSink(table=[default_catalog.default_database.sink_table],fields=[user_id, name, server_timestamp])+- LogicalProject(user_id=[$0], name=[$1], server_timestamp=[$2])+- LogicalFilter(condition=[=($3,1)])+- LogicalProject(user_id=[$0], name=[$1], server_timestamp=[$2], rn=[ROW_NUMBER()OVER(PARTITIONBY $0ORDERBY PROCTIME() NULLS FIRST)])+- LogicalTableScan(table=[[default_catalog, default_database, source_table]])2. 优化后的物理计划
== Optimized Physical Plan==
Sink(table=[default_catalog.default_database.sink_table],fields=[user_id, name, server_timestamp])+- Calc(select=[user_id, name, server_timestamp])+- Deduplicate(keep=[FirstRow],key=[user_id],order=[PROCTIME])+- Exchange(distribution=[hash[user_id]])+- Calc(select=[user_id, name, server_timestamp, PROCTIME()AS $3])+- TableSourceScan(table=[[default_catalog, default_database, source_table]],fields=[user_id, name, server_timestamp])3. 优化后的执行计划
== Optimized Execution Plan==
Sink(table=[default_catalog.default_database.sink_table],fields=[user_id, name, server_timestamp])+- Calc(select=[user_id, name, server_timestamp])+- Deduplicate(keep=[FirstRow],key=[user_id],order=[PROCTIME])+- Exchange(distribution=[hash[user_id]])+- Calc(select=[user_id, name, server_timestamp, PROCTIME()AS $3])+- TableSourceScan(table=[[default_catalog, default_database, source_table]],fields=[user_id, name, server_timestamp])

2.USE 子句

如果熟悉 MySQL 的同学会非常熟悉这个子句,在 MySQL 中,

USE

子句通常被用于切换库,那么在 Flink SQL 体系中,它的作用也是和 MySQL 中

USE

子句的功能基本一致,用于切换 Catalog,DataBase,使用 Module。

  • 切换 Catalog
USE CATALOG catalog_name
  • 使用 Module
USE MODULES module_name1[, module_name2,...]
  • 切换 Database
USE db名称

实际案例:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);// create a catalog
tEnv.executeSql("CREATE CATALOG cat1 WITH (...)");
tEnv.executeSql("SHOW CATALOGS").print();// +-----------------+// |    catalog name |// +-----------------+// | default_catalog |// | cat1            |// +-----------------+// change default catalog
tEnv.executeSql("USE CATALOG cat1");

tEnv.executeSql("SHOW DATABASES").print();// databases are empty// +---------------+// | database name |// +---------------+// +---------------+// create a database
tEnv.executeSql("CREATE DATABASE db1 WITH (...)");
tEnv.executeSql("SHOW DATABASES").print();// +---------------+// | database name |// +---------------+// |        db1    |// +---------------+// change default database
tEnv.executeSql("USE db1");// change module resolution order and enabled status
tEnv.executeSql("USE MODULES hive");
tEnv.executeSql("SHOW FULL MODULES").print();// +-------------+-------+// | module name |  used |// +-------------+-------+// |        hive |  true |// |        core | false |// +-------------+-------+

3.SHOW 子句

如果熟悉 MySQL 的同学会非常熟悉这个子句,在 MySQL 中,

SHOW

子句常常用于查询库、表、函数等,在 Flink SQL 体系中也类似。Flink SQL 支持

SHOW

以下内容。

SQL 语法标准:

  • SHOW CATALOGS:展示所有 Catalog
  • SHOW CURRENT CATALOG:展示当前的 Catalog
  • SHOW DATABASES:展示当前 Catalog 下所有 Database
  • SHOW CURRENT DATABASE:展示当前的 Database
  • SHOW TABLES:展示当前 Database 下所有表
  • SHOW VIEWS:展示所有视图
  • SHOW FUNCTIONS:展示所有的函数
  • SHOW MODULES:展示所有的 Module(Module 是用于 UDF 扩展)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);// show catalogs
tEnv.executeSql("SHOW CATALOGS").print();// +-----------------+// |    catalog name |// +-----------------+// | default_catalog |// +-----------------+// show current catalog
tEnv.executeSql("SHOW CURRENT CATALOG").print();// +----------------------+// | current catalog name |// +----------------------+// |      default_catalog |// +----------------------+// show databases
tEnv.executeSql("SHOW DATABASES").print();// +------------------+// |    database name |// +------------------+// | default_database |// +------------------+// show current database
tEnv.executeSql("SHOW CURRENT DATABASE").print();// +-----------------------+// | current database name |// +-----------------------+// |      default_database |// +-----------------------+// create a table
tEnv.executeSql("CREATE TABLE my_table (...) WITH (...)");// show tables
tEnv.executeSql("SHOW TABLES").print();// +------------+// | table name |// +------------+// |   my_table |// +------------+// create a view
tEnv.executeSql("CREATE VIEW my_view AS ...");// show views
tEnv.executeSql("SHOW VIEWS").print();// +-----------+// | view name |// +-----------+// |   my_view |// +-----------+// show functions
tEnv.executeSql("SHOW FUNCTIONS").print();// +---------------+// | function name |// +---------------+// |           mod |// |        sha256 |// |           ... |// +---------------+// create a user defined function
tEnv.executeSql("CREATE FUNCTION f1 AS ...");// show user defined functions
tEnv.executeSql("SHOW USER FUNCTIONS").print();// +---------------+// | function name |// +---------------+// |            f1 |// |           ... |// +---------------+// show modules
tEnv.executeSql("SHOW MODULES").print();// +-------------+// | module name |// +-------------+// |        core |// +-------------+// show full modules
tEnv.executeSql("SHOW FULL MODULES").print();// +-------------+-------+// | module name |  used |// +-------------+-------+// |        core |  true |// |        hive | false |// +-------------+-------+

4.LOAD、UNLOAD 子句

我们可以使用

LOAD

子句去加载 Flink SQL 体系内置的或者用户自定义的 Module,

UNLOAD

子句去卸载 Flink SQL 体系内置的或者用户自定义的 Module。

SQL 语法标准:

-- 加载LOAD MODULE module_name [WITH('key1'='val1','key2'='val2',...)]-- 卸载
UNLOAD MODULE module_name
  • LOAD 案例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);// 加载 Flink SQL 体系内置的 Hive module
tEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '3.1.2')");
tEnv.executeSql("SHOW MODULES").print();// +-------------+// | module name |// +-------------+// |        core |// |        hive |// +-------------+
  • UNLOAD 案例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);// 卸载唯一的一个 CoreModule
tEnv.executeSql("UNLOAD MODULE core");
tEnv.executeSql("SHOW MODULES").print();// 结果啥 Moudle 都没有了

5.SET、RESET 子句

SET

子句可以用于修改一些 Flink SQL 的环境配置,

RESET

子句是可以将所有的环境配置恢复成默认配置,但只能在 SQL CLI 中进行使用,主要是为了让用户更纯粹的使用 SQL 而不必使用其他方式或者切换系统环境。

SET(key=value)?

RESET (key)?

启动一个 SQL CLI 之后,在 SQL CLI 中可以进行以下 SET 设置:

Flink SQL>SETtable.planner = blink;[INFO]Session property has been set.

Flink SQL>SET;table.planner=blink;

Flink SQL> RESET table.planner;[INFO]Session property has been reset.

Flink SQL> RESET;[INFO]Allsession properties have been setto their defaultvalues.

6.SQL Hints

Hints(提示)是一种机制,用来告诉优化器按照我们的告诉它的方式生成执行计划。

比如有一个 Kafka 数据源表

kafka_table1

,用户想直接从

latest-offset

Select 一些数据出来预览,其元数据已经存储在 Hive MetaStore 中,但是 Hive MetaStore 中存储的配置中的

scan.startup.mode

earliest-offset

,通过 SQL Hints,用户可以在 DML 语句中将

scan.startup.mode

改为

latest-offset

查询,因此可以看出 SQL Hints 常用语这种比较临时的参数修改,比如 Ad-hoc 这种临时查询中,方便用户使用自定义的新的表参数而不是 Catalog 中已有的表参数。

以下 DML SQL 中的

/*+ OPTIONS(key=val [, key=val]*) */

就是 SQL Hints。

SELECT*FROM table_path /*+ OPTIONS(key=val [, key=val]*) */

启动一个 SQL CLI 之后,在 SQL CLI 中可以进行以下 SET 设置:

CREATETABLE kafka_table1 (id BIGINT, name STRING, age INT)WITH(...);CREATETABLE kafka_table2 (id BIGINT, name STRING, age INT)WITH(...);-- 1. 使用 'scan.startup.mode'='earliest-offset' 覆盖原来的 scan.startup.modeselect id, name from kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */;-- 2. 使用 'scan.startup.mode'='earliest-offset' 覆盖原来的 scan.startup.modeselect*from
    kafka_table1 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t1
    join
    kafka_table2 /*+ OPTIONS('scan.startup.mode'='earliest-offset') */ t2
    on t1.id = t2.id;-- 3. 使用 'sink.partitioner'='round-robin' 覆盖原来的 Sink 表的 sink.partitionerinsertinto kafka_table1 /*+ OPTIONS('sink.partitioner'='round-robin') */select*from kafka_table2;
标签: 大数据 flink sql

本文转载自: https://blog.csdn.net/be_racle/article/details/136379997
版权归原作者 G皮T 所有, 如有侵权,请联系我们删除。

“【大数据】Flink SQL 语法篇(十):EXPLAIN、USE、LOAD、SET、SQL Hints”的评论:

还没有评论