15、Flink SQL
15.1、sql-client准备
- 启用Hadoop集群(在Hadoop100上)
start-all.sh
- 启用yarn-session模式
/export/soft/flink-1.13.0/bin/yarn-session.sh -d
- 启动sql-client
bin/sql-client.sh embedded -s yarn-session
sql文件初始化
可以初始化模式、环境(流/批)、并行度、ttl、数据库
- 创建文件,可在文件中编写sql语句完成建表初始化
vim conf/sql-client-init.sql
- 启动
bin/sql-client.sh embedded -s yarn-session -i conf/sql-client-init.sql
15.2、流处理的表
普通MYSQL流处理SQL处理的数据对象有界集合无线序列查询访问可以查询完整的数据无法访问到所有数据,持续等待输入查询终止条件生成固定大小结果即终止永不停止,不断更新查询结果
15.2.1、动态表和持续查询
- 动态表当流中有数据来,初始的表会插入一行;基于这个表的查询应更新查询结果。这样得到的表会动态变化,即动态表
- 持续查询对动态表的查询永不停止
15.2.2、流转动态表
每来一条数据向表中插入一条数据
15.2.3、SQL持续查询
- 更新查询随着数据不断到来,查询的结果需要不断更新,更新查询得到的结果表如要转成DataStream,必须用toChangelogStream()方法
- 追加查询开窗后的查询结果不会再变,只会随着窗口推移不断追加
15.2.4、动态表转流
动态表转流需要对更改操作编码,tableAPI和SQL支持三种编码方式:
- 仅追加流流中的发出的数据就是动态表新增的每一行,多在开窗条件下
- 撤回流撤回流调用toChangelogStream(),包含添加消息和撤回消息insert为add消息,delete为retract消息,update为被更改行的retract消息和更新后行的add消息,输出结果会膨胀
- 更新插入流包含两种类型消息:更新插入消息和删除消息insert和update统一编码为upsert
动态表转流只支持仅追加流和撤回流,连接外部系统才支持更新插入流
4、查询限制
在实际应用中,有些持续查询会因为计算代价太高而受到限制。所谓的“代价太高”,可能是由于需要维护的状态持续增长,也可能是由于更新数据的计算太复杂。
- 状态大小用持续查询做流处理,往往会运行至少几周到几个月;所以持续查询处理的数据总量可能非常大。例如我们之前举的更新查询的例子,需要记录每个用户访问url 的次数。如果随着时间的推移用户数越来越大,那么要维护的状态也将逐渐增长,最终可能会耗尽存储空间导致查询失败
- 更新计算对于有些查询来说,更新计算的复杂度可能很高。每来一条新的数据,更新结果的时候可能需要全部重新计算,并且对很多已经输出的行进行更新。一个典型的例子就是 RANK()函数, 它会基于一组数据计算当前值的排名。例如下面的 SQL 查询,会根据用户最后一次点击的时间为每个用户计算一个排名。当我们收到一个新的数据,用户的最后一次点击时间(lastAction) 就会更新,进而所有用户必须重新排序计算一个新的排名。当一个用户的排名发生改变时,被他超过的那些用户的排名也会改变;这样的更新操作无疑代价巨大,而且还会随着用户的增多越来越严重
15.3、DDL数据定义
15.3.1、数据库
- 建库
createdatabase db_flink;
- 查询
showdatabases;
- 切换数据库
use mydatabase;
15.3.2、表
- 建表使用kafka的元数据建表
createtable MyTable('user_id' string,'name' string,'record time' timestamp_ltz(3) metadata from'timestamp')with('connector'='kafka');
其他现用现查 - 示例查看数据库
showdatabases;
切换数据库use mydatabase;
建表createtable test(id int,ts bigint,vc int)with('connnector'='print');
查看表showtables;
使用like建表createtable test1(name string)like test;
查看表信息desc test1;
15.4、TableAPI
15.41、简单测试
引入依赖
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency>
创建测试类
packagetable;importjava.sql.Timestamp;/**
* @Title: Event
* @Author lizhe
* @Package table
* @Date 2024/6/17 19:01
* @description:
*/publicclassEvent{publicString user;publicString url;publicLong timestamp;publicEvent(){}publicEvent(String user,String url,Long timestamp){this.user = user;this.url = url;this.timestamp = timestamp;}@OverridepublicStringtoString(){return"Event{"+"user='"+ user +'\''+", url='"+ url +'\''+", timestamp="+newTimestamp(timestamp)+'}';}}
模拟数据生成
packagetable;importorg.apache.flink.streaming.api.functions.source.SourceFunction;importjava.awt.*;importjava.sql.Timestamp;importjava.util.Calendar;importjava.util.Random;/**
* @Title: ClickSource
* @Author lizhe
* @Package table
* @Date 2024/6/17 13:50
* @description:
*/publicclassClickSourceimplementsSourceFunction<Event>{privateBoolean running =true;@Overridepublicvoidrun(SourceContext<Event> ctx)throwsException{Random random =newRandom();// 在指定的数据集中随机选取数据String[] users ={"Mary","Alice","Bob","Cary"};String[] urls ={"./home","./cart","./fav","./prod?id=1","./prod?id=2"};while(running){String user = users[random.nextInt(users.length)];String url = urls[random.nextInt(urls.length)];long timestamp =Calendar.getInstance().getTimeInMillis();
ctx.collect(newEvent(
user,
url,
timestamp
));// 隔 1 秒生成一个点击事件,方便观测Thread.sleep(1000);}}@Overridepublicvoidcancel(){
running =false;}}
测试用例
packagetable;importorg.apache.flink.api.common.eventtime.SerializableTimestampAssigner;importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.table.api.Table;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.EnvironmentSettings;importorg.apache.flink.table.api.TableEnvironment;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;importjava.time.Duration;importstaticorg.apache.flink.table.api.Expressions.$;/**
* @Title: SimpleTableDemo
* @Author lizhe
* @Package table
* @Date 2024/6/17 19:38
* @description:
*/publicclassSimpleTableDemo{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv =StreamTableEnvironment.create(env);SingleOutputStreamOperator<Event> eventDS = env.addSource(newClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(newSerializableTimestampAssigner<Event>(){@OverridepubliclongextractTimestamp(Event element,long recordTimestamp){return element.timestamp;}}));Table eventTable = tEnv.fromDataStream(eventDS);//使用TableAPITable result1 = eventTable.select($("user")).where($("user").isEqual("Alice"));//使用SQLTable result2 = tEnv.sqlQuery("select user,url from "+ eventTable);
tEnv.toDataStream(result1).print("result1");
tEnv.toDataStream(result2).print("result2");
env.execute();}}
15.4.2、创建环境
- 创建表环境
- 创建输入表,连接外部系统读取数据
- 注册一个表,连接到外部系统用于输出
- 执行SQL对表进行查询转换得到新表(或者使用TableAPI对表进行查询转换得到新表)。
- 将结果写入输出表
TableEnvironment
是 Table API 和 SQL 的核心概念。它负责:
- 在内部的 catalog 中注册
Table
- 注册外部的 catalog
- 加载可插拔模块
- 执行 SQL 查询
- 注册自定义函数 (scalar、table 或 aggregation)
DataStream
和Table
之间的转换(面向StreamTableEnvironment
)
Table
总是与特定的
TableEnvironment
绑定。
TableEnvironment
可以通过静态方法
TableEnvironment.create()
创建。
packagetable;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.EnvironmentSettings;importorg.apache.flink.table.api.TableEnvironment;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;/**
* @Title: CommonAPI
* @Author lizhe
* @Package table
* @Date 2024/6/19 21:20
* @description:
*/publicclassCommonAPITest{publicstaticvoidmain(String[] args)throwsException{// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// env.setParallelism(1);// StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//1、定义环境配置来创建表执行环境EnvironmentSettings environmentSettings =EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();TableEnvironment tableEnvironment =TableEnvironment.create(environmentSettings);}}
15.4.3、创建表
创建表方式:通过连接器创建和虚拟表创建
连接器表:通过连接器连接到外部系统,并定义出对应的表结构。
虚拟表:在 SQL 的术语中,Table API 的对象对应于
视图
(虚拟表)。 从传统数据库系统的角度来看,
Table
对象与
VIEW
视图非常像。
如果多个查询都引用了同一个注册了的
Table
,那么它会被内嵌每个查询中并被执行多次, 也就是说注册了的
Table
的结果不会被共享。为了方便地查询表,表环境中会维护一个目录(Catalog)和表的对应关系。所以表都是通过目录(Catalog)来进行注册创建的。表在环境中有一个唯一的ID,由三部分组成:目录(catalog)名.数据库(database)名.表名。
测试数据input/clicks.txt
Mary, ./home,1000
Bob, ./cart,2000
Alice, ./prod?id=100,3000
Bob, ./home,3214
Bob, ./cart,2000
Bob, ./home,321
Bob, ./cart,532
Bob, ./home,2000
Bob, ./cart,43356
Bob, ./home,2000
Bob, ./cart,76533
代码
packagetable;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.EnvironmentSettings;importorg.apache.flink.table.api.Table;importorg.apache.flink.table.api.TableEnvironment;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;importstaticorg.apache.flink.table.api.Expressions.$;/**
* @Title: CommonAPI
* @Author lizhe
* @Package table
* @Date 2024/6/19 21:20
* @description:
*/publicclassCommonAPITest{publicstaticvoidmain(String[] args)throwsException{// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// env.setParallelism(1);// StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//1、定义环境配置来创建表执行环境EnvironmentSettings environmentSettings =EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();TableEnvironment tableEnvironment =TableEnvironment.create(environmentSettings);//创建表String createDDl="create table clickTable("+"user_name STRING,"+"url STRING,"+"ts BIGINT "+")with ("+"'connector'='filesystem',"+"'path'='input/clicks.txt',"+"'format'='csv')";
tableEnvironment.executeSql(createDDl);//调用TableAPI进行表的查询转换Table clickTable = tableEnvironment.from("clickTable");Table resultTable = clickTable.where($("user_name").isEqual("Bob")).select($("user_name"), $("url"));
tableEnvironment.createTemporaryView("result2",resultTable);//执行sql进行表的查询转换Table resultTable2 = tableEnvironment.sqlQuery("select user_name,url from result2");//创建一张用于输出的表String createOutDDl="create table clickOutTable("+"user_name STRING,"+"url STRING"+// "ts BIGINT " +")with ("+"'connector'='filesystem',"+"'path'='output',"+"'format'='csv')";
tableEnvironment.executeSql(createOutDDl);//输出表
resultTable.executeInsert("clickOutTable");}}
输出output
单个文件内容
Bob," ./home"
Bob," ./cart"
使用控制台输出
packagetable;importorg.apache.flink.table.api.EnvironmentSettings;importorg.apache.flink.table.api.Table;importorg.apache.flink.table.api.TableEnvironment;importstaticorg.apache.flink.table.api.Expressions.$;/**
* @Title: CommonAPI
* @Author lizhe
* @Package table
* @Date 2024/6/19 21:20
* @description:
*/publicclassCommonAPITest{publicstaticvoidmain(String[] args)throwsException{// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// env.setParallelism(1);// StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//1、定义环境配置来创建表执行环境EnvironmentSettings environmentSettings =EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();TableEnvironment tableEnvironment =TableEnvironment.create(environmentSettings);//创建表String createDDl="create table clickTable("+"user_name STRING,"+"url STRING,"+"ts BIGINT "+")with ("+"'connector'='filesystem',"+"'path'='input/clicks.txt',"+"'format'='csv')";
tableEnvironment.executeSql(createDDl);//调用TableAPI进行表的查询转换Table clickTable = tableEnvironment.from("clickTable");Table resultTable = clickTable.where($("user_name").isEqual("Bob")).select($("user_name"), $("url"));
tableEnvironment.createTemporaryView("result2",resultTable);//执行sql进行表的查询转换Table resultTable2 = tableEnvironment.sqlQuery("select user_name,url from result2");//创建一张用于输出的表String createOutDDl="create table clickOutTable("+"user_name STRING,"+"url STRING"+// "ts BIGINT " +")with ("+"'connector'='filesystem',"+"'path'='output',"+"'format'='csv')";
tableEnvironment.executeSql(createOutDDl);//创建一张用于控制台打印的输出表String createPrintOutDDl="create table printOutTable("+"user_name STRING,"+"url STRING"+// "ts BIGINT " +")with ("+"'connector'='print')";
tableEnvironment.executeSql(createPrintOutDDl);//输出表// resultTable.executeInsert("clickOutTable");
resultTable2.executeInsert("printOutTable");}}
聚合函数
packagetable;importorg.apache.flink.table.api.EnvironmentSettings;importorg.apache.flink.table.api.Table;importorg.apache.flink.table.api.TableEnvironment;importstaticorg.apache.flink.table.api.Expressions.$;/**
* @Title: CommonAPI
* @Author lizhe
* @Package table
* @Date 2024/6/19 21:20
* @description:
*/publicclassCommonAPITest{publicstaticvoidmain(String[] args)throwsException{// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// env.setParallelism(1);// StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);//1、定义环境配置来创建表执行环境EnvironmentSettings environmentSettings =EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build();TableEnvironment tableEnvironment =TableEnvironment.create(environmentSettings);//创建表String createDDl="create table clickTable("+"user_name STRING,"+"url STRING,"+"ts BIGINT "+")with ("+"'connector'='filesystem',"+"'path'='input/clicks.txt',"+"'format'='csv')";
tableEnvironment.executeSql(createDDl);//调用TableAPI进行表的查询转换Table clickTable = tableEnvironment.from("clickTable");Table resultTable = clickTable.where($("user_name").isEqual("Bob")).select($("user_name"), $("url"));
tableEnvironment.createTemporaryView("result2",resultTable);//执行sql进行表的查询转换Table resultTable2 = tableEnvironment.sqlQuery("select user_name,url from result2");//执行聚合计算的查询转换Table aggRes = tableEnvironment.sqlQuery("select user_name ,count(url) as cnt from clickTable group by user_name");//创建一张用于输出的表String createOutDDl="create table clickOutTable("+"user_name STRING,"+"url STRING"+// "ts BIGINT " +")with ("+"'connector'='filesystem',"+"'path'='output',"+"'format'='csv')";
tableEnvironment.executeSql(createOutDDl);//创建一张用于控制台打印的输出表String createPrintOutDDl="create table printOutTable("+"user_name STRING,"+"cnt BIGINT"+// "ts BIGINT " +")with ("+"'connector'='print')";
tableEnvironment.executeSql(createPrintOutDDl);//输出表// resultTable.executeInsert("clickOutTable");// resultTable2.executeInsert("printOutTable");
aggRes.executeInsert("printOutTable");}}
表和流的转换
1、表转流
toDataStream()针对只插入数据的流
tEnv.toDataStream(result1).print("result1");
toChangelogStream()针对有更新操作的流,可以替代toDataStream()方法
packagetable;importorg.apache.flink.api.common.eventtime.SerializableTimestampAssigner;importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.table.api.Table;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.EnvironmentSettings;importorg.apache.flink.table.api.TableEnvironment;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;importjava.time.Duration;importstaticorg.apache.flink.table.api.Expressions.$;/**
* @Title: SimpleTableDemo
* @Author lizhe
* @Package table
* @Date 2024/6/17 19:38
* @description:
*/publicclassSimpleTableDemo{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();StreamTableEnvironment tEnv =StreamTableEnvironment.create(env);SingleOutputStreamOperator<Event> eventDS = env.addSource(newClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(newSerializableTimestampAssigner<Event>(){@OverridepubliclongextractTimestamp(Event element,long recordTimestamp){return element.timestamp;}}));Table eventTable = tEnv.fromDataStream(eventDS);//使用TableAPITable result1 = eventTable.select($("user")).where($("user").isEqual("Alice"));//使用SQLTable result2 = tEnv.sqlQuery("select user,url from "+ eventTable);
tEnv.toDataStream(result1).print("result1");
tEnv.toDataStream(result2).print("result2");//聚合转换
tEnv.createTemporaryView("clickTable",eventTable);Table aggRes = tEnv.sqlQuery("select user ,count(url) as cnt from clickTable group by user");
tEnv.toChangelogStream(aggRes).print("aggRes");
env.execute();}}
2、流转表
调用 fromDataStream()方法
// 读取数据源SingleOutputStreamOperator<Event> eventStream = env.addSource(...)// 将数据流转换成表,可以提取流中某些字段Table eventTable = tableEnv.fromDataStream(eventStream, $("timestamp").as("ts"),
$("url"));
调用createTemporaryView()方法
调用 fromDataStream()方法简单直观,可以直接实现DataStream 到 Table 的转换;不过如果我们希望直接在 SQL 中引用这张表,就还需要调用表环境的 createTemporaryView()方法来创建虚拟视图了。
tableEnv.createTemporaryView("EventTable", eventStream,$("timestamp").as("ts"),$("url"));
调用 fromChangelogStream ()方法,可以将一个更新日志流转换成表
DataStream<Row> dataStream = env.fromElements(Row.ofKind(RowKind.INSERT,"Alice",12),Row.ofKind(RowKind.INSERT,"Bob",5),Row.ofKind(RowKind.UPDATE_BEFORE,"Alice",12),Row.ofKind(RowKind.UPDATE_AFTER,"Alice",100));// 将更新日志流转换为表Table table = tableEnv.fromChangelogStream(dataStream);
15.5、时间属性
基于时间的操作(如开窗)要定义相关的时间语义和时间数据来源的信息。在tableAPI和SQL中会给表单独提供一个逻辑上的时间字段专用指示时间。
时间属性可以在建表时指定也可流转表时定义,时间属性的数据类型为timestamp
15.5.1、事件时间
通过WaterMark来定义事件时间属性
createtable eventTable(user string,
url string,
ts timestamp(3),
watermark for ts as ts -interval'5'second)with(...);
上面的语句将ts字段定义为事件时间属性,并基于ts设置了5s的水位延迟
packagetable;importorg.apache.flink.api.common.eventtime.SerializableTimestampAssigner;importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.Table;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;importjava.time.Duration;importstaticorg.apache.flink.table.api.Expressions.$;/**
* @Title: TimeAndWindowTest
* @Author lizhe
* @Package table
* @Date 2024/6/20 21:21
* @description:
*/publicclassTimeAndWindowTest{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);StreamTableEnvironment tableEnvironment =StreamTableEnvironment.create(env);//在建表的DDL中直接定义时间属性String createDDl="create table clickTable("+"user_name STRING,"+"url STRING,"+"ts BIGINT ,"+"et AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000)),"+"WATERMARK FOR et AS et - INTERVAL '1' SECOND"+")with ("+"'connector'='filesystem',"+"'path'='input/clicks.txt',"+"'format'='csv')";
tableEnvironment.executeSql(createDDl);//在流转换成table时定义时间属性SingleOutputStreamOperator<Event> clickStream = env.addSource(newClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(newSerializableTimestampAssigner<Event>(){@OverridepubliclongextractTimestamp(Event element,long recordTimestamp){return element.timestamp;}}));Table clickTable = tableEnvironment.fromDataStream(clickStream, $("user"), $("url"), $("timestamp").as("ts"), $("et").rowtime());
clickTable.printSchema();}}
时间戳必须是timestamp类型,类型转换
ts BIGINT,
time_ltz as to_timestamp_ltz(ts,3),
15.5.2、处理时间
处理时间属性的定义也有两种方式:创建表 DDL 中定义,或者在数据流转换成表时定义。
1、在创建表的DDL 中定义
在创建表的 DDL(CREATE TABLE 语句)中,可以增加一个额外的字段,通过调用系统内置的 PROCTIME()函数来指定当前的处理时间属性,返回的类型是TIMESTAMP_LTZ。
createtable eventTable(user string,
url string,
ts as proctime())with(...);
可以用一个 AS 语句来在表中产生数据中不存在的列, 并且可以利用原有的列、各种运算符及内置函数。在前面事件时间属性的定义中,将 ts 字段转换成 TIMESTAMP_LTZ 类型的 ts_ltz,也是计算列的定义方式
2、在数据流转换为表时定义
处理时间属性同样可以在将 DataStream 转换为表的时候来定义。 我们调用fromDataStream()方法创建表时,可以用.proctime()后缀来指定处理时间属性字段。由于处理时间是系统时间,原始数据中并没有这个字段,所以处理时间属性一定不能定义在一个已有字段上,只能定义在表结构所有字段的最后,作为额外的逻辑字段出现。
Table table = tEnv.fromDataStream(stream, $("user"), $("url"),$("ts").proctime());
15.5.3、窗口
以滚动窗口为例:
这里的 ts 是定义好的时间属性字段,窗口大小用“时间间隔”INTERVAL 来定义。在进行窗口计算时,分组窗口是将窗口本身当作一个字段对数据进行分组的,可以对组内的数据进行聚合。基本使用方式如下
Table result = tableEnv.sqlQuery("SELECT "+"user, "+"TUMBLE_END(ts, INTERVAL '1' HOUR) as endT, "+"COUNT(url) AS cnt "+"FROM EventTable "+"GROUP BY "+// 使用窗口和用户名进行分组"user, "+"TUMBLE(ts, INTERVAL '1' HOUR)"// 定义 1 小时滚动窗口);
分组窗口的功能比较有限,只支持窗口聚合,所以目前已经处于弃用(deprecated)的状态。
1.13 版本开始,Flink 开始使用窗口表值函数(Windowing table-valued functions, Windowing TVFs)来定义窗口。直接调用 TUMBLE()、HOP()、CUMULATE()就可以实现滚动、滑动和累积窗口,不过传入的参数会有所不同。
- 滚动窗口:TUMBLE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL ‘1’ HOUR)。这里基于时间字段 ts,对表 EventTable 中的数据开了大小为 1 小时的滚动窗口。窗口会将表中的每一行数据,按照它们 ts 的值分配到一个指定的窗口中。
- 滑动窗口:HOP(TABLE EventTable, DESCRIPTOR(ts), INTERVAL ‘5’ MINUTES, INTERVAL ‘1’ HOURS));这里我们基于时间属性 ts,在表 EventTable 上创建了大小为 1 小时的滑动窗口,每 5 分钟滑动一次。需要注意的是,紧跟在时间属性字段后面的第三个参数是步长(slide),第四个参数才是窗口大小(size)。
- 累积窗口:CUMULATE(TABLE EventTable, DESCRIPTOR(ts), INTERVAL ‘1’ HOURS, INTERVAL ‘1’ DAYS));累积窗口中有两个核心的参数:最大窗口长度(max window size)和累积步长(step)
15.5.4、聚合查询
15.5.4.1、分组聚合
Table aggTable = tableEnvironment.sqlQuery("select user_name ,count(url) from clickTable group by user_name");
15.5.4.2、时间窗口聚合
滚动窗口
Table tumbleWindowResult = tableEnvironment.sqlQuery("SELECT "+"user_name, "+"window_end AS endT, "+"COUNT(url) AS cnt "+"FROM TABLE( "+"TUMBLE( TABLE clickTable, "+"DESCRIPTOR(et), "+"INTERVAL '10' SECOND)) "+"GROUP BY user_name, window_start, window_end ");
滑动窗口
Table hopWindowResult = tableEnvironment.sqlQuery("SELECT "+"user_name, "+"window_end AS endT, "+"COUNT(url) AS cnt "+"FROM TABLE( "+"HOP( TABLE clickTable, "+"DESCRIPTOR(et),INTERVAL '5' SECOND ,"+"INTERVAL '10' SECOND)) "+"GROUP BY user_name, window_start, window_end ");
累积窗口
Table cumulateWindowResult = tableEnvironment.sqlQuery("SELECT "+"user_name, "+"window_end AS endT, "+"COUNT(url) AS cnt "+"FROM TABLE( "+"CUMULATE( TABLE clickTable, "+"DESCRIPTOR(et),INTERVAL '5' SECOND ,"+"INTERVAL '10' SECOND)) "+"GROUP BY user_name, window_start, window_end ");
15.5.4.3、开窗聚合
可根据行数进行开窗,开窗选择的范围可以基于时间,也可以基于数据的数量,以每一行数据为基准,计算它之前 1 小时内所有数据的平均值;也可以计算它之前 10 个数的平均值。开窗函数的聚合与之前两种聚合有本质的不同:分组聚合、窗口 TVF聚合都是“多对一”的关系,将数据分组之后每组只会得到一个聚合结果;而开窗函数是对每行都要做一次开窗聚合,因此聚合之后表中的行数不会有任何减少,是一个“多对多”的关系。
SELECT<聚合函数>OVER([PARTITIONBY<字段 1>[,<字段 2>,...]]ORDERBY<时间属性字段><开窗范围>),...FROM...
PARTITION BY(可选)用来指定分区的键(key),类似于 GROUP BY 的分组
开窗范围:还有一个必须要指定的就是开窗的范围,也就是到底要扩展多少行来做聚合。这个范围是由BETWEEN <下界> AND <上界> 来定义的,也就是“从下界到上界” 的范围。目前支持的上界只能是 CURRENT ROW
BETWEEN...PRECEDINGANDCURRENTROW
- 范围间隔范围间隔以RANGE 为前缀,就是基于ORDER BY 指定的时间字段去选取一个范围,一般就是当前行时间戳之前的一段时间。当前行之前 1 小时的数据:
RANGE BETWEENINTERVAL'1'HOURPRECEDINGANDCURRENTROW
- 行间隔行间隔以 ROWS 为前缀,就是直接确定要选多少行,由当前行出发向前选取开窗范围选择当前行之前的 5 行数据(含当前行):
ROWSBETWEEN5PRECEDINGANDCURRENTROW
Table overWindowResult = tableEnvironment.sqlQuery("SELECT user_name,avg(ts) OVER( partition BY user_name ORDER BY et ROWS BETWEEN 3 PRECEDING AND CURRENT ROW )AS avg_ts FROM clickTable");
整体代码
packagetable;importorg.apache.flink.api.common.eventtime.SerializableTimestampAssigner;importorg.apache.flink.api.common.eventtime.WatermarkStrategy;importorg.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;importorg.apache.flink.streaming.api.environment.StreamExecutionEnvironment;importorg.apache.flink.table.api.Table;importorg.apache.flink.table.api.bridge.java.StreamTableEnvironment;importjava.time.Duration;importstaticorg.apache.flink.table.api.Expressions.$;/**
* @Title: TimeAndWindowTest
* @Author lizhe
* @Package table
* @Date 2024/6/20 21:21
* @description:
*/publicclassTimeAndWindowTest{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);StreamTableEnvironment tableEnvironment =StreamTableEnvironment.create(env);//在建表的DDL中直接定义时间属性String createDDl="create table clickTable ("+"user_name STRING,"+"url STRING,"+"ts BIGINT ,"+"et AS TO_TIMESTAMP(FROM_UNIXTIME(ts/1000)),"+"WATERMARK FOR et AS et - INTERVAL '1' SECOND"+")with ("+"'connector'='filesystem',"+"'path'='input/clicks.txt',"+"'format'='csv')";
tableEnvironment.executeSql(createDDl);//在流转换成table时定义时间属性SingleOutputStreamOperator<Event> clickStream = env.addSource(newClickSource()).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(newSerializableTimestampAssigner<Event>(){@OverridepubliclongextractTimestamp(Event element,long recordTimestamp){return element.timestamp;}}));Table clickTable = tableEnvironment.fromDataStream(clickStream, $("user"), $("url"), $("timestamp").as("ts"), $("et").rowtime());//聚合查询Table aggTable = tableEnvironment.sqlQuery("select user_name ,count(url) from clickTable group by user_name");//窗口聚合Table tumbleWindowResult = tableEnvironment.sqlQuery("SELECT "+"user_name, "+"window_end AS endT, "+"COUNT(url) AS cnt "+"FROM TABLE( "+"TUMBLE( TABLE clickTable, "+"DESCRIPTOR(et), "+"INTERVAL '10' SECOND)) "+"GROUP BY user_name, window_start, window_end ");//滑动窗口Table hopWindowResult = tableEnvironment.sqlQuery("SELECT "+"user_name, "+"window_end AS endT, "+"COUNT(url) AS cnt "+"FROM TABLE( "+"HOP( TABLE clickTable, "+"DESCRIPTOR(et),INTERVAL '5' SECOND ,"+"INTERVAL '10' SECOND)) "+"GROUP BY user_name, window_start, window_end ");//累积窗口Table cumulateWindowResult = tableEnvironment.sqlQuery("SELECT "+"user_name, "+"window_end AS endT, "+"COUNT(url) AS cnt "+"FROM TABLE( "+"CUMULATE( TABLE clickTable, "+"DESCRIPTOR(et),INTERVAL '5' SECOND ,"+"INTERVAL '10' SECOND)) "+"GROUP BY user_name, window_start, window_end ");//开窗聚合Table overWindowResult = tableEnvironment.sqlQuery("SELECT user_name,avg(ts) OVER( partition BY user_name ORDER BY et ROWS BETWEEN 3 PRECEDING AND CURRENT ROW )AS avg_ts FROM clickTable");
clickTable.printSchema();
tableEnvironment.toChangelogStream(aggTable).print("aggTable");
tableEnvironment.toChangelogStream(tumbleWindowResult).print("tumbleWindowResult");
tableEnvironment.toChangelogStream(hopWindowResult).print("hopWindowResult");
tableEnvironment.toChangelogStream(cumulateWindowResult).print("cumulateWindowResult");
tableEnvironment.toChangelogStream(overWindowResult).print("overWindowResult");
env.execute();}}
15.5.5、联结查询
待续
15.5.6、函数
待续
15.6、连接外部系统
官网链接:https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/overview/
16、容错机制
16.1、检查点
数据流
保存点为红色的hello
将之前某个时间点所有的状态保存下来,这个存档就是检查点
遇到故障可从检查点恢复,从而不用再从头开始统计
16.1.1、检查点保存
- 周期性保存
- 保存的时间点:所有任务都恰好处理完一个相同数据之后保存状态,从而实现一个数据被完整处理。
- 保存的流程:关键是要等所有任务将同一个数据处理完毕
16.1.2、从检查点恢复
出现故障
恢复步骤:
- 重启应用:所有任务状态都会清空
- 读取检查点,重置状态:找到检查点,恢复快照并填充到对应状态
- 重置偏移量:从检查点之后开始处理数据,要更改偏移量
- 继续处理数据
16.1.3、检查点算法
- 检查点分界线Barrier
- 分布式快照算法(Barrier精准一次)
- 分布式快照算法(Barrier至少一次)
- 分布式快照算法(非Barrier精准一次)
总结:
- Barrier对齐:一个Task收到所有上游同一个编号的Barrier后,才会对自己的本地状态做备份 - 精准一次:对齐过程中,Barrier后的数据阻塞等待,不会越过Barrier- 至少一次:对齐过程中,先到的Barrier其后的数据不阻塞,接着计算
- 非Barrier对齐:一个Task收到第一个Barrier时开始执行备份,能保证精准一次 - 先到的Barrier将本地状态备份,后面的数据接着计算输出- 未到的Barrier其前面的数据接着计算输出,同时也保存到备份中- 最后一个Barrier到达该Task时,这个Task的备份结束
版权归原作者 空指针异常Null_Point_Ex 所有, 如有侵权,请联系我们删除。