Flink Study Notes
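Foreword: Today is day 19 of learning Flink. I studied how windows are used in FlinkSQL, including tumbling, sliding, session, and cumulative windows, and learned how to compute running totals (similar to the cumulative play-count requirement in the 中视频计划 mid-length video program), along with multidimensional analysis and other common big-data topics. I have summarized my own understanding and ideas here, and I hope they are useful and spark some discussion.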
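VI. FlinkSQL Windows
1. Windowing table-valued functions (TVFs)
Windows cut an unbounded stream into bounded "batches" that can be aggregated. Common windows:
- Sliding (hop) windows
- Tumbling windows
- Session windows (not available as a window TVF in Flink 1.13; support was expected in Flink 1.14)
- Cumulative windows (new in Flink 1.13)
Before Flink 1.13, windows were written with special group window functions (GroupWindowFunction):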
SELECT
TUMBLE_START( bidtime, INTERVAL '10' MINUTE),
TUMBLE_END( bidtime, INTERVAL '10' MINUTE),
TUMBLE_ROWTIME( bidtime, INTERVAL '10' MINUTE),
SUM(price)
FROM MyTable
GROUP BY TUMBLE(bidtime, INTERVAL '10' MINUTE);
Since Flink 1.13, the syntax is standardized on table-valued functions (TVFs):
SELECT window_start, window_end, window_time, SUM(price)
FROM TABLE(
    TUMBLE(TABLE MyTable, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end;
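2. Window functions by type, with aggregation
2.1 Tumbling windows (Tumble Windows)
Syntax:
TUMBLE(TABLE data, DESCRIPTOR(timecol), size)
data: a table name.
timecol: a column descriptor indicating which time attribute column of the data is mapped to tumbling windows.
size: a duration specifying the width of the tumbling window.
Data:
2021-04-15 08:05:00,4.00,C
2021-04-15 08:07:00,2.00,A
2021-04-15 08:09:00,5.00,D
2021-04-15 08:11:00,3.00,B
2021-04-15 08:13:00,1.00,E
2021-04-15 08:17:00,6.00,F
Requirement: a real-time dashboard needs the total GMV for every 10-minute window.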
package cn.itcast.day02.Window;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @author lql
 * @time 2024-03-16 17:33:47
 * @description TODO
 */
public class GroupWindowsSqlTumbleExample {
    public static void main(String[] args) throws Exception {
        // 1) Build the Flink streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2) Set the parallelism
        env.setParallelism(1);
        // 3) Build the Flink table environment (useBlinkPlanner() is the Flink 1.13 API and is deprecated in later versions)
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, settings);
        String filePath = GroupWindowsSqlTumbleExample.class.getClassLoader().getResource("bid.csv").getPath();
        // Register the CSV file as a table with an event-time attribute and a 1-second watermark delay
        tabEnv.executeSql("create table Bid(" +
                "bidtime TIMESTAMP(3)," +
                "price DECIMAL(10, 2), " +
                "item string," +
                "watermark for bidtime as bidtime - interval '1' second) " +
                "with("
                + "'connector' = 'filesystem',"
                + "'path' = 'file:///" + filePath + "',"
                + "'format' = 'csv'"
                + ")");
        // Sum the price per 10-minute tumbling window
        Table table = tabEnv.sqlQuery("" +
                "select window_start,window_end,sum(price) as sum_price " +
                " from table(" +
                " tumble(table Bid, DESCRIPTOR(bidtime), interval '10' MINUTES))" +
                " group by window_start,window_end");
        tabEnv.toAppendStream(table, Row.class).print();
        env.execute();
    }
}
Result:
+I[2021-04-15T08:00,2021-04-15T08:10,11.00]
+I[2021-04-15T08:10,2021-04-15T08:20,10.00]
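To make the window assignment concrete, here is a minimal plain-Java sketch (no Flink dependency) of what the 10-minute tumbling aggregation computes on the sample rows: each timestamp is floored to a multiple of the window size, and prices are summed per window start. The names `TumbleSketch`, `windowStart`, and `sumPerWindow` are illustrative only, and timestamps are treated as UTC for simplicity; this is not how Flink implements it internally.

```java
import java.math.BigDecimal;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Map;
import java.util.TreeMap;

class TumbleSketch {
    // Sample rows from bid.csv: bidtime,price,item
    static final String[] BID_ROWS = {
        "2021-04-15 08:05:00,4.00,C", "2021-04-15 08:07:00,2.00,A",
        "2021-04-15 08:09:00,5.00,D", "2021-04-15 08:11:00,3.00,B",
        "2021-04-15 08:13:00,1.00,E", "2021-04-15 08:17:00,6.00,F"
    };

    /** Floors ts to the start of its tumbling window (windows are aligned to the epoch). */
    static LocalDateTime windowStart(LocalDateTime ts, Duration size) {
        long epoch = ts.toEpochSecond(ZoneOffset.UTC);
        long start = epoch - Math.floorMod(epoch, size.getSeconds());
        return LocalDateTime.ofEpochSecond(start, 0, ZoneOffset.UTC);
    }

    /** Sums the price column per window start, like GROUP BY window_start, window_end. */
    static Map<LocalDateTime, BigDecimal> sumPerWindow(String[] rows, Duration size) {
        Map<LocalDateTime, BigDecimal> sums = new TreeMap<>();
        for (String row : rows) {
            String[] f = row.split(",");
            LocalDateTime ts = LocalDateTime.parse(f[0].replace(' ', 'T'));
            sums.merge(windowStart(ts, size), new BigDecimal(f[1]), BigDecimal::add);
        }
        return sums;
    }

    public static void main(String[] args) {
        // Prints one line per window: window start and the summed price
        sumPerWindow(BID_ROWS, Duration.ofMinutes(10))
            .forEach((start, sum) -> System.out.println(start + " -> " + sum));
    }
}
```

On the sample data this gives 11.00 for the window starting at 08:00 and 10.00 for the window starting at 08:10, the same totals as the Flink result above.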
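2.2 Sliding windows (Hop Windows)
Syntax:
HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])
data: a table name.
timecol: a column descriptor indicating which time attribute column of the data is mapped to hopping windows.
slide: a duration specifying the interval between the starts of consecutive hopping windows.
size: a duration specifying the width of each hopping window.
Requirement: every 5 minutes, aggregate the last 10 minutes of data.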
package cn.itcast.day02.Window;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @author lql
 * @time 2024-03-16 19:28:30
 * @description TODO
 */
public class GroupWindowsSqlHopExample {
    public static void main(String[] args) throws Exception {
        // 1) Build the Flink streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2) Set the parallelism
        env.setParallelism(1);
        // 3) Build the Flink table environment
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env, settings);
        String filePath = GroupWindowsSqlHopExample.class.getClassLoader().getResource("bid.csv").getPath();
        tabEnv.executeSql("create table Bid(" +
                "bidtime TIMESTAMP(3)," +
                "price DECIMAL(10, 2), " +
                "item string," +
                "watermark for bidtime as bidtime - interval '1' second) " +
                "with("
                + "'connector' = 'filesystem',"
                + "'path' = 'file:///" + filePath + "',"
                + "'format' = 'csv'"
                + ")");
        // Slide of 5 minutes, window size of 10 minutes
        Table table = tabEnv.sqlQuery("" +
                "select window_start,window_end,sum(price) as sum_price " +
                " from table(" +
                " hop(table Bid, DESCRIPTOR(bidtime), interval '5' MINUTES, interval '10' MINUTES))" +
                " group by window_start,window_end");
        tabEnv.toAppendStream(table, Row.class).print();
        env.execute();
    }
}
Result:
+I[2021-04-15T08:00,2021-04-15T08:10,11.00]
+I[2021-04-15T08:05,2021-04-15T08:15,15.00]
+I[2021-04-15T08:10,2021-04-15T08:20,10.00]
+I[2021-04-15T08:15,2021-04-15T08:25,6.00]
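Because hop windows overlap, a single row can land in several windows. The following plain-Java sketch (no Flink dependency) lists the window starts a timestamp belongs to. `HopSketch` and `windowStarts` are hypothetical names, and time is simplified to minutes since midnight with windows aligned to 0.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class HopSketch {
    /**
     * Returns the start of every hop window [start, start + size) that contains ts.
     * All arguments share one unit (minutes in this sketch).
     */
    static List<Long> windowStarts(long ts, long slide, long size) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide); // latest window that has already begun
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        Collections.sort(starts);
        return starts;
    }

    public static void main(String[] args) {
        long ts = 8 * 60 + 7; // 08:07 as minutes since midnight
        System.out.println(windowStarts(ts, 5, 10)); // [480, 485], i.e. 08:00 and 08:05
    }
}
```

With a 5-minute slide and a 10-minute size, every row falls into two windows, which is why the bid at 08:17 contributes 6.00 to both the 08:10 and the 08:15 window in the result above.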
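2.3 Session windows (Session Windows, not yet supported as a window TVF)
Flink 1.13 does not support session windows as a window TVF; support was expected in Flink 1.14.
Requirement: using the legacy group window syntax, define a session gap of 3 minutes. If no new record arrives within 3 minutes after the last record of a window, the window closes and later records fall into the next window.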
package cn.itcast.day02.Window;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author lql
 * @time 2024-03-16 19:37:20
 * @description TODO
 */
public class GroupWindowsSqlSessionExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        String filePath = GroupWindowsSqlSessionExample.class.getClassLoader().getResource("bid.csv").getPath();
        // The column used as the event-time attribute must be of a TIMESTAMP type
        tEnv.executeSql("create table Bid(" +
                "bidtime TIMESTAMP(3)," +
                "price DECIMAL(10, 2), " +
                "item string," +
                "watermark for bidtime as bidtime - interval '1' second) " +
                "with("
                + "'connector' = 'filesystem',"
                + "'path' = 'file:///" + filePath + "',"
                + "'format' = 'csv'"
                + ")");
        // Legacy group window syntax with a 3-minute session gap
        tEnv.sqlQuery("SELECT " +
                " SESSION_START(bidtime, INTERVAL '3' minute) as wStart, " +
                " SESSION_END(bidtime, INTERVAL '3' minute) as wEnd, " +
                " SUM(price) sum_price " +
                "FROM Bid " +
                "GROUP BY SESSION(bidtime, INTERVAL '3' minute)")
                .execute()
                .print();
    }
}
Result:
+----+-------------------------+-------------------------+-----------+
| op |                  wStart |                    wEnd | sum_price |
+----+-------------------------+-------------------------+-----------+
| +I | 2021-04-15 08:05:00.000 | 2021-04-15 08:16:00.000 |     15.00 |
| +I | 2021-04-15 08:17:00.000 | 2021-04-15 08:20:00.000 |      6.00 |
+----+-------------------------+-------------------------+-----------+
2 rows in set
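The session result can be reproduced by hand with a small plain-Java sketch (no Flink dependency) that walks the time-ordered bids and starts a new session whenever the gap is exceeded. `SessionSketch` and `sessions` are illustrative names, and times are simplified to minutes after 08:00. How an event arriving exactly `gap` after the previous one is treated is an assumption of this sketch (it stays in the same session).

```java
import java.util.ArrayList;
import java.util.List;

class SessionSketch {
    /**
     * Groups time-ordered events into sessions. Each output row is
     * {sessionStart, sessionEnd, sum}, where sessionEnd = last event time + gap.
     * Assumption: only an event arriving more than `gap` after the previous
     * one opens a new session.
     */
    static List<double[]> sessions(long[] times, double[] prices, long gap) {
        List<double[]> out = new ArrayList<>();
        if (times.length == 0) return out;
        long start = times[0];
        long last = times[0];
        double sum = prices[0];
        for (int i = 1; i < times.length; i++) {
            if (times[i] - last > gap) {            // gap exceeded: close the session
                out.add(new double[] {start, last + gap, sum});
                start = times[i];
                sum = 0;
            }
            last = times[i];
            sum += prices[i];
        }
        out.add(new double[] {start, last + gap, sum}); // close the final session
        return out;
    }

    public static void main(String[] args) {
        long[] minutesAfter8 = {5, 7, 9, 11, 13, 17};  // bidtime as minutes after 08:00
        double[] prices = {4, 2, 5, 3, 1, 6};
        for (double[] s : sessions(minutesAfter8, prices, 3)) {
            System.out.printf("08:%02d - 08:%02d sum=%.2f%n", (int) s[0], (int) s[1], s[2]);
        }
    }
}
```

The bids from 08:05 to 08:13 are each two minutes apart, so they form one session ending at 08:16; the bid at 08:17 comes four minutes after 08:13 and opens a second session.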
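2.4 Cumulative windows (Cumulate Windows, new in Flink 1.13)
Syntax:
CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)
TABLE: the table name.
DESCRIPTOR: the time attribute column used for windowing.
step: the increment by which the window grows inside the maximum window.
size: the width of the maximum window.
Requirement: with a 10-minute window, output the running total every 2 minutes (similar to plotting the cumulative play-count curve in the 中视频计划 program).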
package cn.itcast.day02.Window;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author lql
 * @time 2024-03-16 19:45:02
 * @description TODO
 */
public class GroupWindowsSqlCumulateExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        String filePath = GroupWindowsSqlCumulateExample.class.getClassLoader().getResource("bid.csv").getPath();
        // The column used as the event-time attribute must be of a TIMESTAMP type
        tEnv.executeSql("create table Bid(" +
                "bidtime TIMESTAMP(3)," +
                "price DECIMAL(10, 2), " +
                "item string," +
                "watermark for bidtime as bidtime - interval '1' second) " +
                "with("
                + "'connector' = 'filesystem',"
                + "'path' = 'file:///" + filePath + "',"
                + "'format' = 'csv'"
                + ")");
        // Step of 2 minutes inside a maximum window of 10 minutes
        tEnv.sqlQuery("SELECT window_start, window_end, SUM(price) as sum_price\n" +
                " FROM TABLE(\n" +
                " CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))\n" +
                " GROUP BY window_start, window_end")
                .execute()
                .print();
    }
}
Result:
+----+-------------------------+-------------------------+-----------+
| op |            window_start |              window_end | sum_price |
+----+-------------------------+-------------------------+-----------+
| +I | 2021-04-15 08:00:00.000 | 2021-04-15 08:06:00.000 |      4.00 |
| +I | 2021-04-15 08:00:00.000 | 2021-04-15 08:08:00.000 |      6.00 |
| +I | 2021-04-15 08:00:00.000 | 2021-04-15 08:10:00.000 |     11.00 |
| +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:12:00.000 |      3.00 |
| +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:14:00.000 |      4.00 |
| +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:16:00.000 |      4.00 |
| +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:18:00.000 |     10.00 |
| +I | 2021-04-15 08:10:00.000 | 2021-04-15 08:20:00.000 |     10.00 |
+----+-------------------------+-------------------------+-----------+
8 rows in set
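A cumulative window can be read as a family of windows that share one start and grow by `step` until they reach `size`. The plain-Java sketch below (no Flink dependency) lists the `window_end` values a timestamp contributes to. `CumulateSketch` and `windowEnds` are hypothetical names, and time is simplified to minutes since midnight.

```java
import java.util.ArrayList;
import java.util.List;

class CumulateSketch {
    /**
     * Returns the window_end of every cumulative window that contains ts.
     * All windows share the start of the enclosing `size`-wide window and
     * grow by `step` until they reach `size`. Units are minutes in this sketch.
     */
    static List<Long> windowEnds(long ts, long step, long size) {
        long start = ts - Math.floorMod(ts, size);
        List<Long> ends = new ArrayList<>();
        for (long end = start + step; end <= start + size; end += step) {
            if (end > ts) ends.add(end); // window [start, end) contains ts
        }
        return ends;
    }

    public static void main(String[] args) {
        long ts = 8 * 60 + 5; // 08:05 as minutes since midnight
        System.out.println(windowEnds(ts, 2, 10)); // [486, 488, 490], i.e. 08:06, 08:08, 08:10
    }
}
```

This is why the 4.00 bid at 08:05 shows up in the rows ending at 08:06, 08:08, and 08:10 of the result above, while the windows ending at 08:02 and 08:04 contain no data and produce no output row.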
3. Multidimensional analysis
3.1 GROUPING SETS
With GROUPING SETS:
SELECT window_start,
       window_end,
       userId,
       category,
       sum(price) as sum_price
FROM TABLE(
    TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end, GROUPING SETS((userId, category), (userId), ())
Without GROUPING SETS, the same result needs a UNION ALL of three queries:
-- ()
SELECT window_start, window_end, 'NULL' as userId, 'NULL' as category, sum(price) as sum_price
FROM TABLE(
TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end
UNION ALL
-- (userId)
SELECT window_start, window_end, userId as userId, 'NULL' as category, sum(price) as sum_price
FROM TABLE(
TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end, userId
UNION ALL
-- (userId, category)
SELECT window_start, window_end,userId, category, sum(price) as sum_price
FROM TABLE(
TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end, userId, category
3.2 ROLLUP
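Mnemonic: drop columns from right to left, going from the most detailed grouping down to the grand total.
GROUP BY ROLLUP(a, b, c)
-- is equivalent to:
GROUPING SETS((a,b,c),(a,b),(a), ())
GROUP BY ROLLUP ( a, (b, c), d )
-- is equivalent to: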
GROUPING SETS (
( a, b, c, d ),
( a, b, c ),
( a ),
( )
)
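The right-to-left rule can be checked mechanically. Here is a small plain-Java sketch that expands a ROLLUP argument list into its grouping sets, treating a parenthesized group such as (b, c) as one unit. `RollupSketch` and `rollup` are illustrative names, not part of any SQL engine.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RollupSketch {
    /** Expands ROLLUP(g1, ..., gn) into grouping sets; each gi is a group of columns. */
    static List<List<String>> rollup(List<List<String>> groups) {
        List<List<String>> sets = new ArrayList<>();
        // Keep the first `keep` groups, dropping one more group from the right each round
        for (int keep = groups.size(); keep >= 0; keep--) {
            List<String> set = new ArrayList<>();
            for (List<String> g : groups.subList(0, keep)) set.addAll(g);
            sets.add(set);
        }
        return sets;
    }

    public static void main(String[] args) {
        // ROLLUP(a, (b, c), d)
        System.out.println(rollup(Arrays.asList(
            Arrays.asList("a"), Arrays.asList("b", "c"), Arrays.asList("d"))));
        // prints [[a, b, c, d], [a, b, c], [a], []]
    }
}
```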
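3.3 CUBE
Mnemonic: every combination of the listed columns.
GROUP BY CUBE(a, b, c)
-- is equivalent to:
GROUPING SETS((a,b,c),(a,b),(a,c),(b,c),(a),(b),(c),())
GROUP BY CUBE((a, b), (c, d))
-- is equivalent to:
GROUPING SETS(
    ( a, b, c, d ),
    ( a, b ),
    ( c, d ),
    ( )
)
-- Combining CUBE with GROUPING SETS: take every combination from CUBE, cross it with each grouping set, and add the plain grouping column
GROUP BY a, CUBE(b, c), GROUPING SETS((d), (e))
-- is equivalent to:
GROUP BY GROUPING SETS(
    (a, b, c, d), (a, b, c, e),
    (a, b, d), (a, b, e),
    (a, c, d), (a, c, e),
    (a, d), (a, e)
)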
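As a cross-check of the "every combination" rule, this plain-Java sketch enumerates the grouping sets of a CUBE by iterating over all subsets of its argument groups. `CubeSketch` and `cube` are hypothetical names, and the order in which the sets are produced is an implementation detail of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CubeSketch {
    /** Expands CUBE(g1, ..., gn) into all 2^n grouping sets; each gi is a group of columns. */
    static List<List<String>> cube(List<List<String>> groups) {
        int n = groups.size();
        List<List<String>> sets = new ArrayList<>();
        for (int dropped = 0; dropped < (1 << n); dropped++) {
            List<String> set = new ArrayList<>();
            for (int i = 0; i < n; i++) {
                // Bit (n - 1 - i) set means group i is left out of this grouping set
                if ((dropped & (1 << (n - 1 - i))) == 0) set.addAll(groups.get(i));
            }
            sets.add(set);
        }
        return sets;
    }

    public static void main(String[] args) {
        // CUBE((a, b), (c, d))
        System.out.println(cube(Arrays.asList(
            Arrays.asList("a", "b"), Arrays.asList("c", "d"))));
        // prints [[a, b, c, d], [a, b], [c, d], []]
    }
}
```

For CUBE(a, b, c) the same method returns eight sets, matching the eight-entry GROUPING SETS list above (possibly in a different order).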
3.4 GROUPING and GROUPING_ID
Background: GROUPING SETS results use NULL as a placeholder for columns that are not part of the current grouping set, so a placeholder NULL cannot be told apart from a genuine NULL in the data.
3.4.1 The GROUPING function
- Takes a column name as its argument
- Returns 0 when the column is part of the current grouping set, so any NULL in it comes from the input data
- Returns 1 when the NULL is a placeholder produced by GROUPING SETS
Example:
SELECT window_start, window_end, userId, category,
       GROUPING(category) as categoryFlag,
       sum(price) as sum_price,
       IF(GROUPING(category) = 0, category, 'ALL') as `all`
FROM TABLE(
    TUMBLE(TABLE orders, DESCRIPTOR(t), INTERVAL '5' SECONDS))
GROUP BY window_start, window_end, GROUPING SETS((userId, category), (userId))
Result:
window_start | window_end | userId | category | sum_price | flag | all
2021-05-23 05:16:35.000 | 2021-05-23 05:16:40.000 | NULL | NULL | 10.1 | 1 | ALL
2021-05-23 05:16:40.000 | 2021-05-23 05:16:45.000 | NULL | NULL | 96.6 | 1 | ALL
2021-05-23 05:16:45.000 | 2021-05-23 05:16:50.000 | NULL | NULL | 15.6 | 1 | ALL
2021-05-23 05:16:35.000 | 2021-05-23 05:16:40.000 | user_001 | 电脑 | 10.1 | 0 | 电脑
2021-05-23 05:16:40.000 | 2021-05-23 05:16:45.000 | user_001 | 手机 | 14.1 | 0 | 手机
2021-05-23 05:16:40.000 | 2021-05-23 05:16:45.000 | user_002 | 手机 | 82.5 | 0 | 手机
2021-05-23 05:16:45.000 | 2021-05-23 05:16:50.000 | user_001 | 电脑 | 15.6 | 0 | 电脑
2021-05-23 05:16:35.000 | 2021-05-23 05:16:40.000 | user_001 | NULL | 10.1 | 1 | ALL
2021-05-23 05:16:40.000 | 2021-05-23 05:16:45.000 | user_001 | NULL | 14.1 | 1 | ALL
2021-05-23 05:16:40.000 | 2021-05-23 05:16:45.000 | user_002 | NULL | 82.5 | 1 | ALL
2021-05-23 05:16:45.000 | 2021-05-23 05:16:50.000 | user_001 | NULL | 15.6 | 1 | ALL
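3.4.2 GROUPING_ID (Hive compatibility)
MaxCompute also provides a parameterless GROUPING__ID function for compatibility with Hive queries.
The result packs the GROUPING values of the argument columns into an integer, bitmap style.
MaxCompute is compatible with Hive 2.3.0 and later for this function; in Hive versions below 2.3.0 its output differs, so using it is not recommended.
SELECT a, b, c, COUNT(*), GROUPING__ID
FROM VALUES (1,2,3) as t(a,b,c)
GROUP BY a, b, c GROUPING SETS((a,b,c),(a));
GROUPING__ID takes neither arguments nor parentheses. In MaxCompute this form is equivalent to GROUPING_ID(a,b,c), with the arguments in the same order as in GROUP BY.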
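To illustrate the bitmap idea, here is a minimal plain-Java sketch that packs per-column GROUPING values into one integer. `GroupingIdSketch`, `grouping`, and `groupingId` are illustrative names. The bit convention used here (1 means the column is a placeholder, first argument is the highest bit) is an assumption that follows the GROUPING semantics described above; as noted, Hive versions below 2.3.0 produce different values.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class GroupingIdSketch {
    /** GROUPING(col): 0 if col is part of the current grouping set, 1 if it is a placeholder. */
    static int grouping(String col, Set<String> groupingSet) {
        return groupingSet.contains(col) ? 0 : 1;
    }

    /** Packs GROUPING(col) of each argument into an integer; the first argument is the highest bit. */
    static int groupingId(List<String> args, Set<String> groupingSet) {
        int id = 0;
        for (String col : args) {
            id = (id << 1) | grouping(col, groupingSet);
        }
        return id;
    }

    public static void main(String[] args) {
        List<String> cols = Arrays.asList("a", "b", "c");
        // Grouping set (a, b, c): no placeholders
        System.out.println(groupingId(cols, new HashSet<>(cols)));                 // 0
        // Grouping set (a): b and c are placeholders, binary 011
        System.out.println(groupingId(cols, new HashSet<>(Arrays.asList("a"))));   // 3
    }
}
```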
3.5 Window Top-N
Template: find the top 3 suppliers by sales in each 10-minute tumbling window.
SELECT *
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
FROM (
SELECT window_start, window_end, supplier_id, SUM(price) as price, COUNT(*) as cnt
FROM TABLE(
TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))
GROUP BY window_start, window_end, supplier_id
)
) WHERE rownum <= 3;
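Approach: first aggregate over 10-minute tumbling windows, summing per window and supplier_id; then rank the rows inside each window with ROW_NUMBER() and keep the top three.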
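The two steps of the approach can be sketched in plain Java (no Flink dependency): aggregate per window and supplier, then rank within each window. `WindowTopNSketch` and `topSuppliers` are hypothetical names, the input rows are made-up sample values, and the tie-break by supplier id is an addition of this sketch so that its output is deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class WindowTopNSketch {
    /** rows: {window_start, supplier_id, price}. Returns the top-n supplier ids per window. */
    static Map<String, List<String>> topSuppliers(List<String[]> rows, int n) {
        // Step 1: SUM(price) GROUP BY window_start, supplier_id
        Map<String, Map<String, Double>> totals = new TreeMap<>();
        for (String[] r : rows) {
            totals.computeIfAbsent(r[0], k -> new TreeMap<>())
                  .merge(r[1], Double.parseDouble(r[2]), Double::sum);
        }
        // Step 2: rank suppliers inside each window by total price and keep the first n
        Map<String, List<String>> top = new TreeMap<>();
        totals.forEach((window, bySupplier) -> top.put(window,
            bySupplier.entrySet().stream()
                .sorted((x, y) -> {
                    int c = Double.compare(y.getValue(), x.getValue()); // price DESC
                    return c != 0 ? c : x.getKey().compareTo(y.getKey());
                })
                .limit(n)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList())));
        return top;
    }

    public static void main(String[] args) {
        // Hypothetical sample rows, not taken from the original data set
        List<String[]> rows = new ArrayList<>(Arrays.asList(
            new String[] {"08:00", "s1", "4.00"}, new String[] {"08:00", "s2", "2.00"},
            new String[] {"08:00", "s3", "5.00"}, new String[] {"08:00", "s4", "1.00"},
            new String[] {"08:00", "s1", "3.00"}, new String[] {"08:10", "s2", "6.00"}));
        System.out.println(topSuppliers(rows, 3)); // {08:00=[s1, s3, s2], 08:10=[s2]}
    }
}
```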
4. Over Windows
4.1 ROWS OVER WINDOW
Bounds are counted in rows: ROWS BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW
Note: UNBOUNDED covers every row from the start of the partition up to the current row; rowCount covers the preceding n rows up to the current row.
Data source:
itemID | itemType | onSellTime | price
ITEM001 | Electronic | 2021-05-11 10:01:00.000 | 20
ITEM002 | Electronic | 2021-05-11 10:02:00.000 | 50
ITEM003 | Electronic | 2021-05-11 10:03:00.000 | 30
ITEM004 | Electronic | 2021-05-11 10:03:00.000 | 60
ITEM005 | Electronic | 2021-05-11 10:05:00.000 | 40
ITEM006 | Electronic | 2021-05-11 10:06:00.000 | 20
ITEM007 | Electronic | 2021-05-11 10:07:00.000 | 70
ITEM008 | Clothes | 2021-05-11 10:08:00.000 | 20
ITEM009 | Clothes | 2021-05-11 10:09:00.000 | 40
ITEM010 | Clothes | 2021-05-11 10:11:00.000 | 30
Example: partition by itemType, order by onSellTime ascending, and compute the running total of price up to the current row.
select
itemID,
itemType,
onSellTime,
price,
sum(price) over w as sumPrice
from tmall_item
WINDOW w AS (
PARTITION BY itemType
ORDER BY onSellTime
ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW
)
Result:
itemID | itemType | onSellTime | price | sumPrice
ITEM001 | Electronic | 2021-05-11 10:01:00.000 | 20.0 | 20.0
ITEM002 | Electronic | 2021-05-11 10:02:00.000 | 50.0 | 70.0
ITEM003 | Electronic | 2021-05-11 10:03:00.000 | 30.0 | 100.0
ITEM004 | Electronic | 2021-05-11 10:03:00.000 | 60.0 | 160.0
ITEM005 | Electronic | 2021-05-11 10:05:00.000 | 40.0 | 200.0
ITEM006 | Electronic | 2021-05-11 10:06:00.000 | 20.0 | 220.0
ITEM007 | Electronic | 2021-05-11 10:07:00.000 | 70.0 | 290.0
ITEM008 | Clothes | 2021-05-11 10:08:00.000 | 20.0 | 20.0
ITEM009 | Clothes | 2021-05-11 10:09:00.000 | 40.0 | 60.0
ITEM010 | Clothes | 2021-05-11 10:11:00.000 | 30.0 | 90.0
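The unbounded ROWS window is simply a running total per partition. A minimal plain-Java sketch (no Flink dependency) that reproduces the sumPrice column from the sample data; `RowsOverSketch` and `runningSums` are illustrative names, and the rows are assumed to be already ordered by onSellTime.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class RowsOverSketch {
    /** Running SUM(price) per itemType; rows must already be ordered by onSellTime. */
    static double[] runningSums(String[] itemTypes, double[] prices) {
        Map<String, Double> totalByType = new HashMap<>();
        double[] out = new double[prices.length];
        for (int i = 0; i < prices.length; i++) {
            // merge() adds the price to the partition total and returns the new total
            out[i] = totalByType.merge(itemTypes[i], prices[i], Double::sum);
        }
        return out;
    }

    public static void main(String[] args) {
        String[] types = {"Electronic", "Electronic", "Electronic", "Electronic", "Electronic",
                          "Electronic", "Electronic", "Clothes", "Clothes", "Clothes"};
        double[] prices = {20, 50, 30, 60, 40, 20, 70, 20, 40, 30};
        System.out.println(Arrays.toString(runningSums(types, prices)));
        // prints [20.0, 70.0, 100.0, 160.0, 200.0, 220.0, 290.0, 20.0, 60.0, 90.0]
    }
}
```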
4.2 RANGE OVER WINDOW
Bounds are measured on the time attribute: RANGE BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW
Example: a real-time total of the amounts from the last two minutes
select
    itemID,
    itemType,
    onSellTime,
    price,
    sum(price) over w as sumPrice
from tmall_item
WINDOW w AS (
    PARTITION BY itemType
    ORDER BY onSellTime
    RANGE BETWEEN INTERVAL '2' MINUTE preceding AND CURRENT ROW
)
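The difference from ROWS is easiest to see on rows that share a timestamp. Below is a plain-Java sketch (no Flink dependency) of a two-minute RANGE sum over the tmall_item sample data. `RangeOverSketch` and `rangeSums` are hypothetical names; times are minutes after 10:00; and the sketch assumes standard SQL RANGE semantics, where both bounds are inclusive and rows with the same onSellTime are peers that see each other.

```java
import java.util.Arrays;

class RangeOverSketch {
    /**
     * For each row, sums the prices of all rows in the same partition whose time
     * lies in [time - range, time]. Rows with equal time are included for each other.
     */
    static double[] rangeSums(String[] itemTypes, long[] minutes, double[] prices, long range) {
        double[] out = new double[prices.length];
        for (int i = 0; i < prices.length; i++) {
            for (int j = 0; j < prices.length; j++) {
                boolean samePartition = itemTypes[j].equals(itemTypes[i]);
                boolean inRange = minutes[j] >= minutes[i] - range && minutes[j] <= minutes[i];
                if (samePartition && inRange) out[i] += prices[j];
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String[] types = {"Electronic", "Electronic", "Electronic", "Electronic", "Electronic",
                          "Electronic", "Electronic", "Clothes", "Clothes", "Clothes"};
        long[] minutes = {1, 2, 3, 3, 5, 6, 7, 8, 9, 11}; // onSellTime as minutes after 10:00
        double[] prices = {20, 50, 30, 60, 40, 20, 70, 20, 40, 30};
        System.out.println(Arrays.toString(rangeSums(types, minutes, prices, 2)));
        // prints [20.0, 70.0, 160.0, 160.0, 130.0, 60.0, 130.0, 20.0, 60.0, 70.0]
    }
}
```

Note that ITEM003 and ITEM004, both sold at 10:03, get the same sum of 160.0 under these semantics, whereas the ROWS window gives them 100.0 and 160.0.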
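5. Defining windows in the Table API
5.1.1 Tumbling windows
Methods of the Tumble class:
- over: defines the window length
- on: the time field used for grouping (time-based intervals) or ordering (row-count intervals)
- as: the window alias, which must appear in the subsequent groupBy
Example: every 5 seconds, compute the total sales of each product category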
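import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

public class GroupWindowsTableApiTumbleExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<OrderInfo> dataStream = env
                .fromElements(
                        new OrderInfo("电脑", 1000L, 100D),
                        new OrderInfo("手机", 2000L, 200D),
                        new OrderInfo("电脑", 3000L, 300D),
                        new OrderInfo("手机", 4000L, 400D),
                        new OrderInfo("手机", 5000L, 500D),
                        new OrderInfo("电脑", 6000L, 600D))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
                );
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        // Declare the timestamp field as the event-time (rowtime) attribute
        Table table = tableEnv
                .fromDataStream(dataStream, $("category"), $("timestamp").rowtime(), $("money"));
        table
                .window(Tumble.over(lit(5).second())
                        .on($("timestamp"))
                        .as("w")) // define the tumbling window and give it an alias
                .groupBy($("category"),
                        $("w")) // the window alias must appear among the grouping fields
                .select($("category"),
                        $("w").start().as("window_start"),
                        $("w").end().as("window_end"),
                        $("money").sum().as("total_money"))
                .execute()
                .print();
        // execute() already submits the job, so a separate env.execute() call is not needed
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class OrderInfo {
        private String category;
        private Long timestamp;
        private Double money;
    }
}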
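5.1.2 Sliding windows
Methods of the Slide class:
- over: defines the window length
- every: defines the slide step
- on: the time field used for grouping (time-based intervals) or ordering (row-count intervals)
- as: the window alias, which must appear in the subsequent groupBy
Example: every 5 seconds, compute the total sales of each product category over the past 10 seconds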
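import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Slide;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

public class GroupWindowsTableApiSlideExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<OrderInfo> dataStream = env
                .fromElements(
                        new OrderInfo("电脑", 1000L, 100D),
                        new OrderInfo("手机", 2000L, 200D),
                        new OrderInfo("电脑", 3000L, 300D),
                        new OrderInfo("手机", 4000L, 400D),
                        new OrderInfo("手机", 5000L, 500D),
                        new OrderInfo("电脑", 6000L, 600D))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
                );
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv
                .fromDataStream(dataStream, $("category"), $("timestamp").rowtime(), $("money"));
        table
                .window(Slide.over(lit(10).second())
                        .every(lit(5).second())
                        .on($("timestamp"))
                        .as("w")) // define the sliding window and give it an alias
                .groupBy($("category"),
                        $("w")) // the window alias must appear among the grouping fields
                .select($("category"),
                        $("w").start().as("window_start"),
                        $("w").end().as("window_end"),
                        $("money").sum().as("total_money"))
                .execute()
                .print();
        // execute() already submits the job, so a separate env.execute() call is not needed
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class OrderInfo {
        private String category;
        private Long timestamp;
        private Double money;
    }
}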
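5.1.3 Session windows
Methods of the Session class:
- withGap: the session gap
- on: the time field used for grouping (time-based intervals) or ordering (row-count intervals)
- as: the window alias, which must appear in the subsequent groupBy
Example: when no new order event arrives within 6 seconds of the previous one, the window closes and the orders that fell inside it are aggregated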
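import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Session;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

public class GroupWindowsTableApiSessionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<OrderInfo> dataStream = env
                .fromElements(
                        new OrderInfo("电脑", 1000L, 100D),
                        new OrderInfo("手机", 2000L, 200D),
                        new OrderInfo("电脑", 3000L, 300D),
                        new OrderInfo("手机", 4000L, 400D),
                        new OrderInfo("手机", 5000L, 500D),
                        new OrderInfo("电脑", 6000L, 600D))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<OrderInfo>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
                );
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Table table = tableEnv
                .fromDataStream(dataStream, $("category"), $("timestamp").rowtime(), $("money"));
        table
                .window(Session.withGap(lit(6).second())
                        .on($("timestamp"))
                        .as("w")) // define the session window and give it an alias
                .groupBy($("category"),
                        $("w")) // the window alias must appear among the grouping fields
                .select($("category"),
                        $("w").start().as("window_start"),
                        $("w").end().as("window_end"),
                        $("money").sum().as("total_money"))
                .execute()
                .print();
        // execute() already submits the job, so a separate env.execute() call is not needed
    }

    @Data
    @AllArgsConstructor
    @NoArgsConstructor
    public static class OrderInfo {
        private String category;
        private Long timestamp;
        private Double money;
    }
}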
Copyright belongs to the original author, 卡林神不是猫. If this content infringes your rights, please contact us to have it removed.