0


FlinkSQL学习笔记(四)常见表查询详解与用户自定义函数

写在前面

1、本篇只列举一些特殊的查询方式,掌握这些查询语句的基本使用概念即可,实际用到的时候进行查询即可。
参考目录:https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/overview/
2、通过对这些例子的编写,感觉Flink相比hive中常见的查询方式,更多地从时间角度进行了更新迭代,需要注意Lookup Join和Temporal Joins区别
3、自定义函数,大致了解就行,后续用到直接套模板

1、Windowing table-valued functions (Windowing TVFs)

即:窗口表值函数
参考连接:https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/table/sql/queries/window-tvf/

注:代码测试的时候使用的版本为1.14,这里不支持窗口函数的独立使用,具体使用见窗口聚合函数

以滑动窗口为例:

SELECT*FROMTABLE(
   TUMBLE(TABLE Bid, DESCRIPTOR(bidtime_ltz),INTERVAL'10' MINUTES));

error:
在这里插入图片描述

2、Window TVF Aggregation

即:窗口聚合

  1. 数据准备
StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);StreamTableEnvironment tableEnvironment =StreamTableEnvironment.create(env,EnvironmentSettings.inStreamingMode());DataStreamSource<String> s1 = env.socketTextStream("123.56.100.37",9999);SingleOutputStreamOperator<Bid> s2 = s1.map(s ->{String[] split = s.split(",");DateTimeFormatter df =DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");returnnewBid(LocalDateTime.parse(split[0],df),Double.valueOf(split[1]),split[2],split[3]);});//注:这里列声明的顺序和Bid一致,watermark才生效,测试过程中需注意
tableEnvironment.createTemporaryView("Bid",s2,Schema.newBuilder().column("item",DataTypes.STRING()).column("bidtime",DataTypes.TIMESTAMP(3)).column("price",DataTypes.DOUBLE()).column("supplier_id",DataTypes.STRING()).watermark("bidtime","bidtime - interval '1' second").build());
  1. hopping window aggregation 注:每分钟,计算最近5分钟的订单交易总额
SELECT window_start, window_end,SUM(price)AS sum_price
  FROMTABLE(
    HOP(TABLE Bid, DESCRIPTOR(bidtime),INTERVAL'1' MINUTES,INTERVAL'5' MINUTES))GROUPBY window_start, window_end;

在这里插入图片描述
其他:

TUMBLE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '10' MINUTES))

:每隔10分钟计算最近10分钟的price总和

 CUMULATE(TABLE Bid, DESCRIPTOR(bidtime), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))

:每隔10分钟,计算一个窗口里面每2分钟累积的price综合

  1. GROUPING SETS
SELECT window_start, window_end, supplier_id,SUM(price)as price
  FROMTABLE(
    TUMBLE(TABLE Bid, DESCRIPTOR(bidtime),INTERVAL'10' MINUTES))GROUPBY window_start, window_end, GROUPING SETS ((supplier_id),());

在这里插入图片描述

3、Window Top-N

语法要义:在 TVF上使用 row_number() over()

  1. Window Top-N follows after Window Aggregation 注:类似分组TopN,不过这里【window_start、window_end】就充当了组的定义
SELECT*FROM(SELECT window_start, 
                window_end, 
                supplier_id,SUM(price)as price,COUNT(*)as cnt,
        ROW_NUMBER()OVER(PARTITIONBY window_start, window_end ORDERBYSUM(price)DESC)as rownum
        FROMTABLE(
            TUMBLE(TABLE Bid, DESCRIPTOR(bidtime),INTERVAL'10' MINUTES))GROUPBY window_start, window_end, supplier_id
    )WHERE rownum <=3;

在这里插入图片描述

  1. Window Top-N follows after Windowing TVF 注:直接跟在TVF后面,详细查看TVF使用error的说明
SELECT*FROM(SELECT bidtime, price, item, supplier_id, window_start, window_end, ROW_NUMBER()OVER(PARTITIONBY window_start, window_end ORDERBY price DESC)as rownum
    FROMTABLE(
               TUMBLE(TABLE Bid, DESCRIPTOR(bidtime),INTERVAL'10' MINUTES)))WHERE rownum <=3;

在这里插入图片描述

4、Window Join

参考连接:https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sql/queries/window-join/

  1. 测试环境构建核心环境
DataStreamSource<String> left1 = env.socketTextStream("123.56.100.37",9999);SingleOutputStreamOperator<DataTest> left2 = left1.map(s ->{String[] split = s.split(",");DateTimeFormatter df =DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");returnnewDataTest(LocalDateTime.parse(split[0],df),Integer.valueOf(split[1]),split[2]);});DataStreamSource<String> right1 = env.socketTextStream("123.56.100.37",9998);SingleOutputStreamOperator<DataTest> right2 = right1.map(s ->{String[] split = s.split(",");DateTimeFormatter df =DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm");returnnewDataTest(LocalDateTime.parse(split[0],df),Integer.valueOf(split[1]),split[2]);});

在这里插入图片描述

  1. INNER/LEFT/RIGHT/FULL OUTER 测试代码:
SELECT L.num as L_Num, 
       L.id as L_Id, 
       R.num as R_Num, 
       R.id as R_Id,COALESCE(L.window_start, R.window_start)as window_start,COALESCE(L.window_end, R.window_end)as window_end
   FROM(SELECT*FROMTABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time),INTERVAL'5' MINUTES))) L
   FULLJOIN(SELECT*FROMTABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time),INTERVAL'5' MINUTES))) R
   ON L.num = R.num AND L.window_start = R.window_start AND L.window_end = R.window_end;

测试结果:
在这里插入图片描述

  1. SEMI JOIN 本质上是通过select ... in (...sql...)实现的 测试代码:
SELECT num,id,row_time,window_start,window_end
  FROM(SELECT*FROMTABLE(TUMBLE(TABLE LeftTable, DESCRIPTOR(row_time),INTERVAL'5' MINUTES))) L 
  WHERE L.num IN(SELECT num FROM(SELECT*FROMTABLE(TUMBLE(TABLE RightTable, DESCRIPTOR(row_time),INTERVAL'5' MINUTES))) R 
 WHERE L.window_start = R.window_start AND L.window_end = R.window_end);

测试结果:
在这里插入图片描述

error:

参考连接:

5、Join

常规 join的实现逻辑所在类:

org.apache.flink.table.runtime.operators.join.stream.StreamingJoinOperator

常规 join,flink底层是会对两个参与 join的输入流中的数据进行状态存储的;所以,随着时间的推进,状态中的数据量会持续膨胀,可能会导致过于庞大,从而降低系统的整体效
率;
可以如何去缓解:自己根据自己业务系统数据特性(估算能产生关联的左表数据和右表数据到达的最
大时间差),根据这个最大时间差,去设置 ttl时长

配置参考链接:https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/config/
在这里插入图片描述

5.1、Regular Joins

执行语句:

SELECT*FROM LeftTable LEFTJOIN RightTable ON LeftTable.id = RightTable.id;

执行结果:
在这里插入图片描述

5.2、Lookup Join

在这里插入图片描述
查找联接通常用于使用从外部系统查询的数据来扩充表。联接要求一个表具有处理时间属性,另一个表由查找源连接器提供支持。查找联接使用上述处理时临时联接语法,以及由查找源连接器支持的正确表。

lookup join

为了提高性能,lookup的连接器会将查询过的维表数据进行缓存(默认未开启此机制),可以通过参数开启,比如 jdbc-connector的 lookup模式下,有如下参数:

  • lookup.cache.max-rows = (none) 未开启
  • lookup.cache.ttl = (none) ttl缓存清除的时长

执行语句:

  1. 创建SQL表,创建 lookup维表(jdbc connector表)
CREATETEMPORARYTABLE Customers (
  id INT,
  name STRING,
  country STRING,
  zip STRING
)WITH('connector'='jdbc','url'='jdbc:mysql://123.56.100.37:3306/flinktest','table-name'='customers','username'='root','password'='123456')
  1. 执行lookup join
SELECT o.order_id, o.total, c.country, c.zip
  FROM Orders AS o
  JOIN Customers FOR SYSTEM_TIME ASOF o.proc_time AS c
    ON o.customer_id = c.id

执行结果:
在这里插入图片描述

5.3、Interval Joins

举例说明:比如广告曝光流广告观看事件流的 join
对于这种join,目前并没有实际遇到过,因此这里只给出相关限制条件,具体用到后可以参考官网。

  • ltime = rtime
  • ltime >= rtime AND ltime < rtime + INTERVAL ‘10’ MINUTE
  • ltime BETWEEN rtime - INTERVAL ‘10’ SECOND AND rtime + INTERVAL ‘5’ SECOND

5.4、Temporal Joins

要义: 左表的数据永远去关联右表数据的对应时间上的版本

注:Flink会根据

版本表

的记录自动推测在对于位置上的时间,可以推测成功即关联上;这里以

Event Time Temporal Join

举例说明

  1. 创建order表
tableEnvironment.createTemporaryView("orders",s2,Schema.newBuilder().column("order_id",DataTypes.STRING()).column("price",DataTypes.DECIMAL(32,2)).column("currency",DataTypes.STRING()).column("order_time",DataTypes.TIMESTAMP(3)).watermark("order_time","order_time - INTERVAL '0' SECOND").build());
  1. 创建汇率表,即版本表
CREATETABLE currency_rates (
    currency STRING,
    conversion_rate DECIMAL(32,2),
    update_time TIMESTAMP(3),
    WATERMARK FOR update_time AS update_time -INTERVAL'0'SECOND,PRIMARYKEY(currency)NOT ENFORCED
)WITH('connector'='mysql-cdc','hostname'='123.56.100.37','port'='3306','username'='root','password'='123456','database-name'='flinktest','table-name'='currency_rates','server-time-zone'='Asia/Shanghai');
  1. join语句
SELECT order_id,    
       price,
       orders.currency,
       conversion_rate,
       order_time,
       update_time
  FROM orders
  LEFTJOIN currency_rates FOR SYSTEM_TIME ASOF orders.order_time
    ON orders.currency = currency_rates.currency
  1. 运行测试 12:10分的order来的时候,currency_rates的汇率为1.20,时间戳位12:12,因此order对于12:10的记录无法找到对应汇率值;12:14分的order来的时候,currency_rates汇率已经改为1.22,时间戳更新为12:16,因此推测,12:14分order的汇率应该还是1.20在这里插入图片描述

5.5、Array Expansion(CROSS JOIN)

为给定数组中的每个元素返回一个新行

(数组的行转列)
  1. 测试数据
Tabletable= tableEnvironment.fromValues(DataTypes.ROW(
        DataTypes.FIELD("id", DataTypes.INT()),
        DataTypes.FIELD("name", DataTypes.STRING()),
        DataTypes.FIELD("phone_numbers", DataTypes.ARRAY(DataTypes.STRING()))),Row.of(1,"zs", Expressions.array("137","138","125")),Row.of(2,"ls", Expressions.array("159","126","199")));
        
tableEnvironment.createTemporaryView("phone_table",table);
  1. SQL语句
SELECT id,name,phone_numbers,phone_number 
  from phone_table 
 CROSSJOIN UNNEST(phone_numbers)AS t(phone_number)

测试结果:
在这里插入图片描述

6、Over Aggregation

row_number( ) over ( )
flinksql中,over聚合时,指定聚合数据区间有两种方式

  • 方式 1,带时间设定区间RANGE BETWEEN INTERVAL '30' MINUTE PRECEDING AND CURRENT ROW
  • 方式 2,按行设定区间ROWS BETWEEN 10 PRECEDING AND CURRENT ROW

over函数类似在hive中的使用,这里不列举了,给出一个简单的SQL语法格式,详细参考官网

SELECT
  agg_func(agg_col)OVER([PARTITIONBY col1[, col2,...]]ORDERBY time_col
    range_definition),...FROM...

7、User-defined Functions

7.1、Scalar Functions

标量函数,特点:每次只接收一行数据,输出结果也是1行1列

  1. 继承ScalarFunction,实现eval方法
publicstaticclassMyLowerextendsScalarFunction{publicStringeval(String input){return  input.toLowerCase();}}
  1. 注册函数
tableEnvironment.createTemporaryFunction("myLower",MyLower.class);
  1. 执行SQL语句
select*,myLower(currency)as myLower  from currency_rates
  1. 执行结果在这里插入图片描述

7.2、Aggregate Functions

聚合函数,特点:对输入的数据行(一组)进行持续的聚合,最终对每组数据输出一行(多列)结果

  1. 自定义累加器
publicstaticclassMyAccumulator{publicint count;publicint  sum;}
  1. 继承AggregateFunction,至少实现下面三个方法
  • createAccumulator()
  • accumulate()
  • getValue()
publicstaticclassMyAvgextendsAggregateFunction<Double,MyAccumulator>{// Creates and init the Accumulator for this (table)aggregate function.@OverridepublicMyAccumulatorcreateAccumulator(){MyAccumulatorMyAccumulator=newMyAccumulator();MyAccumulator.count =0;MyAccumulator.sum =0;returnMyAccumulator;}// rocesses the input values and update the provided accumulator instancepublicvoidaccumulate(MyAccumulator acc,Double rate){
        acc.sum += rate;
        acc.count +=1;}// Retracts the input values from the accumulator instance. publicvoidretract(MyAccumulator acc,Double rate){
        acc.sum -= rate;
        acc.count -=1;}//Called every time when an aggregation result should be materialized.@OverridepublicDoublegetValue(MyAccumulator accumulator){return accumulator.sum/accumulator.count;}}
  1. 执行SQL语句 注:这里用Flink-CDC提示,必须实现retract函数,因此上面加上了;实际证明这个函数可以帮助我们记录group by结果的变化情况
select place,MyAvg(conversion_rate)as MyAvg  from currency_rates groupby place
  1. 执行结果在这里插入图片描述

7.3、Table Functions

表值函数,特点:运行时每接收一行数据(一个或多个字段),能产出多行、多列的结果;如:explode(), unnest()

注:其实这种方式就相当于产出一张表,Flink中对于数组类型的行转列可以参考

5.5、Array Expansion
  1. 自定义表值函数,分割字符串;并注册函数
@FunctionHint(output =@DataTypeHint("ROW<phoneNumber STRING, phoneLength INT>"))public static class MySplitFunction extends TableFunction<Row> {

    public void eval(String str) {
        for(String phone : str.split(",")) {
            // use collect(...) to emit a row
            collect(Row.of(phone, phone.length()));
        }
    }
}
  1. 编写SQL语句
SELECT id,name,phone_numbers,phone_number,phone_length
  FROM phone_table 
  LEFTJOIN LATERAL TABLE(MySplitFunction(phone_numbers))AS T(phone_number, phone_length)ONTRUE

运行结果:
在这里插入图片描述

7.4、Table Aggregate Functions

表值聚合函数,特点:对输入的数据行(一组)进行持续的聚合,最终对每组数据输出一行或多行(多列)结果,

目前不支持SQL语法,只能通过tableAPI实现

下面以

分组top2

为例说明实现过程,相比传统的分组Top2,这种方式可以带出更多的字段

  1. 创建测试数据
Tabletable= tableEnvironment.fromValues(DataTypes.ROW(
        DataTypes.FIELD("id", DataTypes.INT()),
        DataTypes.FIELD("gender", DataTypes.STRING()),
        DataTypes.FIELD("score", DataTypes.DOUBLE())),Row.of(1,"male","98"),Row.of(2,"male","76"),Row.of(3,"male","99"),Row.of(4,"female","44"),Row.of(5,"female","76"),Row.of(6,"female","86"));
tableEnvironment.createTemporaryView("stu_score",table);
  1. 自定义Top2函数,即表值聚合函数
publicstaticclassTop2Accum{//emitValue()根据Top2Accum数据提交,因此如果要整行数据,Top2Accum需要定义为Row类型public@DataTypeHint("ROW<id INT, gender STRING, score DOUBLE>")Row first;public@DataTypeHint("ROW<id INT, gender STRING, score DOUBLE>")Row second;}//这里groupBy的gender会一直带出来,输出的时候也会检查字段是否重复,需要将和groupBy相关的字段重命名@FunctionHint(input =@DataTypeHint("ROW<id INT, gender STRING, score DOUBLE>"),output =@DataTypeHint("ROW<id INT, rgender STRING, score DOUBLE,rank INT>"))publicstaticclassTop2extendsTableAggregateFunction<Row,Top2Accum>{@OverridepublicTop2AccumcreateAccumulator(){Top2Accum top2Accum =newTop2Accum();
        top2Accum.first  =null;
        top2Accum.second =null;return top2Accum;}//这些需要封装一整个Row给Top2Accum,因此传入Rowpublicvoidaccumulate(Top2Accum acc,Row row){Double score =(Double) row.getField(2);if(acc.first ==null||score >(double)acc.first.getField(2)){
            acc.second = acc.first;
            acc.first = row;}elseif(acc.second ==null|| score >(double)acc.second.getField(2)){
            acc.second = row;}}publicvoidmerge(Top2Accum acc,Iterable<Top2Accum> iterable){for(Top2Accum otherAcc : iterable){accumulate(acc, otherAcc.first);accumulate(acc, otherAcc.second);}}publicvoidemitValue(Top2Accum acc,Collector<Row> out){// emit the value and rankif(acc.first !=null){
            out.collect(Row.of(acc.first.getField(0),acc.first.getField(1),acc.first.getField(2),1));}if(acc.second !=null){
            out.collect(Row.of(acc.second.getField(0),acc.second.getField(1),acc.second.getField(2),2));}}}
  1. 通过TableAPI编程
table.groupBy($("gender")).flatAggregate(call(Top2.class,row($("id"), $("gender"), $("score")))).select($("id"),$("rgender"), $("score"),$("rank")).execute().print();
  1. 测试结果在这里插入图片描述
标签: 学习 笔记 flink

本文转载自: https://blog.csdn.net/weixin_37172178/article/details/136724872
版权归原作者 星星点灯1966 所有, 如有侵权,请联系我们删除。

“FlinkSQL学习笔记(四)常见表查询详解与用户自定义函数”的评论:

还没有评论