0


Flink中Table API和SQL(四)

11.6 联结(Join)查询

按照数据库理论,关系型表的设计往往至少需要满足第三范式(3NF),表中的列都直接依赖于主键,这样就可以避免数据冗余和更新异常。例如商品的订单信息,我们会保存在一个 “订单表”中,而这个表中只有商品 ID,详情则需要到“商品表”按照 ID 去查询;这样的好 处是当商品信息发生变化时,只要更新商品表即可,而不需要在订单表中对所有这个商品的所 有订单进行修改。不过这样一来,我们就无法从一个单独的表中提取所有想要的数据了。

在标准 SQL 中,可以将多个表连接合并起来,从中查询出想要的信息;这种操作就是表 的联结(Join)。在 Flink SQL 中,同样支持各种灵活的联结(Join)查询,操作的对象是动态表

在流处理中,动态表的 Join 对应着两条数据流的 Join 操作。与上一节的聚合查询类似,Flink SQL 中的联结查询大体上也可以分为两类:SQL 原生的联结查询方式,和流处理中特有的联结查询

11.6.1 常规联结查询

常规联结(Regular Join)是 SQL 中原生定义的 Join 方式,是最通用的一类联结操作。它 的具体语法与标准 SQL 的联结完全相同,通过关键字 JOIN 来联结两个表,后面用关键字** ON来指明联结条件**。按照习惯,我们一般以“左侧”和“右侧”来区分联结操作的两个表。

在两个动态表的联结中,任何一侧表的插入(INSERT)或更改(UPDATE)操作都会让 联结的结果表发生改变。例如,如果左侧有新数据到来,那么它会与右侧表中所有之前的数据 进行联结合并,右侧表之后到来的新数据也会与这条数据连接合并。所以,常规联结查询一般 是更新(Update)查询。

与标准 SQL 一致,Flink SQL 的常规联结也可以分为内联结(INNER JOI N)外联结 (OUTER JOIN),区别在于结果中是否包含不符合联结条件的行。目前仅支持“等值条件” 作为联结条件,也就是关键字 ON 后面必须是判断两表中字段相等的逻辑表达式。

1. 等值内联结(INNER Equi-JOIN)

内联结用 INNER JOIN 来定义,会返回两表中符合联接条件的所有行的组合,也就是所谓的笛卡尔积(Cartesian product)。目前仅支持等值联结条件。

例如之前提到的“订单表”(定义为 Order)和“商品表”(定义为 Product)的联结查询, 就可以用以下 SQL 实现:

SELECT * 
FROM Order 
INNER JOIN Product 
ON Order.product_id = Product.id 

这里是一个内联结,联结条件是订单数据的 product_id 和商品数据的 id 相等。由于订单 表中出现的商品id一定会在商品表中出现,因此这样得到的联结结果表,就包含了订单表Order中所有订单数据对应的详细信息。

2. 等值外联结(OUTER Equi-JOIN)

与内联结类似,外联结也会返回符合联结条件的所有行的笛卡尔积;另外,还可以将某一侧表中找不到任何匹配的行也单独返回。Flink SQL 支持左外(LEFT JOIN)、右外(RIGHT JOIN) 和全外(FULL OUTER JOIN),分别表示会将左侧表、右侧表以及双侧表中没有任何匹配的行返回。例如,订单表中未必包含了商品表中的所有 ID,为了将哪些没有任何订单的商品信息也查询出来,我们就可以使用右外联结(RIGHT JOIN)。当然,外联结查询目前也仅支持等值联结条件。具体用法如下:

SELECT * 
FROM Order 
LEFT JOIN Product 
ON Order.product_id = Product.id 
 
SELECT * 
FROM Order 
RIGHT JOIN Product 
ON Order.product_id = Product.id 
 
SELECT * 
FROM Order 
FULL OUTER JOIN Product 
ON Order.product_id = Product.id 

11.6.2 间隔联结查询

在 8.3 节中,我们曾经学习过 DataStream API 中的双流 Join,包括窗口联结(window join) 和间隔联结(interval join)。两条流的 Join 就对应着 SQL 中两个表的 Join,这是流处理中特有的联结方式。目前 Flink SQL 还不支持窗口联结,而间隔联结则已经实现。

间隔联结(Interval Join)返回的,同样是符合约束条件的两条中数据的笛卡尔积。只不过这里的“约束条件”除了常规的联结条件外还多了一个时间间隔的限制。具体语法有以下要点:

⚫ 两表的联结

间隔联结不需要用 JOIN 关键字,直接在 FROM 后将要联结的两表列出来就可以,用逗号分隔。这与标准 SQL 中的语法一致,表示一个“交叉联结”(Cross Join),会返回两表中所有行的笛卡尔积。

⚫ 联结条件

联结条件用 WHERE 子句来定义,用一个等值表达式描述。交叉联结之后再用 WHERE进行条件筛选,效果跟内联结 INNER JOIN ... ON ...非常类似。

⚫ 时间间隔限制

我们可以在 WHERE 子句中,联结条件后用 AND 追加一个时间间隔的限制条件;做法是 提取左右两侧表中的时间字段,然后用一个表达式来指明两者需要满足的间隔限制。

具体定义 方式有下面三种,这里分别用 ltime 和 rtime 表示左右表中的时间字段:

(1)ltime = rtime

(2)ltime >= rtime AND ltime < rtime + INTERVAL '10' MINUTE

(3)ltime BETWEEN rtime - INTERVAL '10' SECOND AND rtime + INTERVAL '5' SECOND

判断两者相等,这是最强的时间约束,要求两表中数据的时间必须完全一致才能匹配;一 般情况下,我们还是会放宽一些,给出一个间隔。间隔的定义可以用<,<=,>=,>这一类的关系不等式,也可以用 BETWEEN ... AND ...这样的表达式。

例如,我们现在除了订单表 Order 外,还有一个“发货表”Shipment,要求在收到订单后四个小时内发货。那么我们就可以用一个间隔联结查询,把所有订单与它对应的发货信息连接合并在一起返回。

SELECT * 
FROM Order o, Shipment s 
WHERE o.id = s.order_id 
AND o.order_time BETWEEN s.ship_time - INTERVAL '4' HOUR AND s.ship_time

在流处理中,间隔联结查询只支持具有时间属性的“仅追加”(Append-only)表。

那对于有更新操作的表,又怎么办呢?除了间隔联结之外,Flink SQL 还支持时间联结 (Temporal Join),这主要是针对“版本表”(versioned table)而言的。所谓版本表,就是记录 了数据随着时间推移版本变化的表,可以理解成一个“更新日志”(change log),它就是具有 时间属性、还会进行更新操作的表。当我们联结某个版本表时,并不是把当前的数据连接合并 起来就行了,而是希望能够根据数据发生的时间,找到当时的“版本”;这种根据更新时间提 取当时的值进行联结的操作,就叫作“时间联结”(Temporal Join)。

11.7 函数

在 SQL 中,我们可以把一些数据的转换操作包装起来,嵌入到 SQL 查询中统一调用,这 就是“函数”(functions)。 Flink 的 Table API 和 SQL 同样提供了函数的功能。两者在调用时略有不同:Table API 中 的函数是通过数据对象的方法调用来实现的;而 SQL 则是直接引用函数名称,传入数据作为参数。例如,要把一个字符串 str 转换成全大写的形式,Table API 的写法是调用 str 这个 String对象的 upperCase()方法:

str.upperCase(); 

而 SQL 中的写法就是直接引用 UPPER()函数,将 str 作为参数传入:

UPPER(str) 

由于 Table API 是内嵌在 Java 语言中的,很多方法需要在类中额外添加,因此扩展功能比 较麻烦,目前支持的函数比较少;而且 Table API 也不如 SQL 的通用性强,所以一般情况下较 少使用。下面我们主要介绍 Flink SQL 中函数的使用。 Flink SQL 中的函数可以分为两类:一类是 SQL 中内置的系统函数,直接通过函数名调用 就可以,能够实现一些常用的转换操作,比如之前我们用到的 COUNT()、CHAR_LENGTH()、UPPER()等等;而另一类函数则是用户自定义的函数(UDF),需要在表环境中注册才能使用。

11.7.1 系统函数

系统函数(System Functions)也叫内置函数(Built-in Functions),是在系统中预先实现好 的功能模块。我们可以通过固定的函数名直接调用,实现想要的转换操作。Flink SQL 提供了 大量的系统函数,几乎支持所有的标准 SQL 中的操作,这为我们使用 SQL 编写流处理程序提 供了极大的方便。 Flink SQL 中的系统函数又主要可以分为两大类:标量函数(Scalar Functions)和聚合函数(Aggregate Functions)。

1. 标量函数(Scalar Functions)

所谓的“标量”,是指只有数值大小、没有方向的量;所以标量函数指的就是只对输入数据做转换操作、返回一个值的函数。这里的输入数据对应在表中,一般就是一行数据中 1 个或 多个字段,因此这种操作有点像流处理转换算子中的 map。另外,对于一些没有输入参数、直 接可以得到唯一结果的函数,也属于标量函数。

标量函数是最常见、也最简单的一类系统函数,数量非常庞大,很多在标准 SQL 中也有 定义。所以我们这里只对一些常见类型列举部分函数,做一个简单概述,具体应用可以查看官 网的完整函数列表。

⚫ 比较函数(Comparison Functions)

比较函数其实就是一个比较表达式,用来判断两个值之间的关系,返回一个布尔类型的值。 这个比较表达式可以是用 、>、= 等符号连接两个值,也可以是用关键字定义的某种判断。 例如:

(1)value1 = value2 判断两个值相等;

(2)value1 <> value2 判断两个值不相等

(3)value IS NOT NULL 判断 value 不为空

⚫ 逻辑函数(Logical Functions)

逻辑函数就是一个逻辑表达式,也就是用与(AND)、或(OR)、非(NOT)将布尔类型 的值连接起来,也可以用判断语句(IS、IS NOT)进行真值判断;返回的还是一个布尔类型 的值。例如:

(1)boolean1 OR boolean2 布尔值 boolean1 与布尔值 boolean2 取逻辑或

(2)boolean IS FALSE 判断布尔值 boolean 是否为 false

(3)NOT boolean 布尔值 boolean 取逻辑非

⚫ 算术函数(Arithmetic Functions)

进行算术计算的函数,包括用算术符号连接的运算,和复杂的数学运算。例如:

(1)numeric1 + numeric2 两数相加

(2)POWER(numeric1, numeric2) 幂运算,取数 numeric1 的 numeric2 次方

(3)RAND() 返回(0.0, 1.0)区间内的一个 double 类型的伪随机数

⚫ 字符串函数(String Functions)

进行字符串处理的函数。例如:

(1)string1 || string2 两个字符串的连接

(2)UPPER(string) 将字符串 string 转为全部大写

(3)CHAR_LENGTH(string) 计算字符串 string 的长度

⚫ 时间函数(Temporal Functions)

进行与时间相关操作的函数。例如:

(1)DATE string 按格式"yyyy-MM-dd"解析字符串 string,返回类型为 SQL Date

(2)TIMESTAMP string 按格式"yyyy-MM-dd HH:mm:ss[.SSS]"解析,返回类型为 SQL timestamp

(3)CURRENT_TIME 返回本地时区的当前时间,类型为 SQL time(与 LOCALTIME等价)

(4)INTERVAL string range 返回一个时间间隔。string 表示数值;range 可以是 DAY,MINUTE,DAT TO HOUR 等单位,也可以是 YEAR TO MONTH 这样的复合单位。如“2 年10 个月”可以写成:INTERVAL '2-10' YEAR TO MONTH

**2. 聚合函数(Aggregate Functions) **

聚合函数是以表中多个行作为输入,提取字段进行聚合操作的函数,会将唯一的聚合值作为结果返回。聚合函数应用非常广泛,不论分组聚合、窗口聚合还是开窗(Over)聚合,对数据的聚合操作都可以用相同的函数来定义。

标准 SQL 中常见的聚合函数 Flink SQL 都是支持的,目前也在不断扩展,为流处理应用 提供更强大的功能。例如:

⚫ COUNT(*) 返回所有行的数量,统计个数

⚫ SUM([ ALL | DISTINCT ] expression) 对某个字段进行求和操作。默认情况 下省略了关键字 ALL,表示对所有行求和;如果指定 DISTINCT,则会对数据进行去重,每个值只叠加一次。

⚫ RANK()** 返回当前值在一组值中的排名**

⚫ ROW_NUMBER() 对一组值排序后,返回当前值的行号。与 RANK()的 功能相似

其中,RANK()和 ROW_NUMBER()一般用在 OVER 窗口中,在之前 11.5.4 小节实现 Top N 的过程中起到了非常重要的作用。

11.7.2 自定义函数(UDF)

系统函数尽管庞大,也不可能涵盖所有的功能;如果有系统函数不支持的需求,我们就需要用自定义函数(User Defined Functions,UDF)来实现了。事实上,系统内置函数仍然在不断扩充,如果我们认为自己实现的自定义函数足够通用、应用非常广泛,也可以在项目跟踪工 具 JIRA 上向 Flink 开发团队提出“议题”(issue),请求将新的函数添加到系统函数中。 Flink 的 Table API 和 SQL 提供了多种自定义函数的接口,以抽象类的形式定义。

当前 UDF主要有以下几类:

⚫ 标量函数(Scalar Functions):将输入的标量值转换成一个新的标量值;

⚫ 表函数(Table Functions):将标量值转换成一个或多个新的行数据,也就是扩展成一个表;

⚫ 聚合函数(Aggregate Functions):将多行数据里的标量值转换成一个新的标量值;

⚫ 表聚合函数(Table Aggregate Functions):将多行数据里的标量值转换成一 个或多个新的行数据。

1. 整体调用流程

要想在代码中使用自定义的函数,我们需要首先自定义对应 UDF 抽象类的实现,并在表 环境中注册这个函数,然后就可以在 Table API 和 SQL 中调用了。

(1)注册函数

注册函数时需要调用表环境的 createTemporarySystemFunction()方法,传入注册的函数名 以及 UDF 类的 Class 对象:

// 注册函数 
tableEnv.createTemporarySystemFunction("MyFunction", MyFunction.class); 

我们自定义的 UDF 类叫作 MyFunction,它应该是上面四种 UDF 抽象类中某一个的具体 实现;在环境中将它注册为名叫 MyFunction 的函数。

这里 createTemporarySystemFunction()---全局方法的意思是创建了一个“临时系统函数”,所以MyFunction 函数名是全局的 , 可以当作系统函数来使用; 我们也可以用createTemporaryFunction()---局部方法,注册的函数就依赖于当前的数据库(database)和目录(catalog) 了,所以这就不是系统函数,而是“目录函数”(catalog function),它的完整名称应该包括所属的 database 和 catalog。

一般情况下,我们直接用 createTemporarySystemFunction()方法将 UDF 注册为系统函数就可以了。

(2)使用 Table API 调用函数

在 Table API 中,需要使用 call()方法来调用自定义函数:

tableEnv.from("MyTable").select(call("MyFunction", $("myField"))); 

这里 call()方法有两个参数,一个是注册好的函数名 MyFunction,另一个则是函数调用时本身的参数。这里我们定义 MyFunction 在调用时,需要传入的参数是 myField 字段。

此外,在 Table API 中也可以不注册函数,直接用“内联”(inline)的方式调用 UDF:

tableEnv.from("MyTable").select(call(SubstringFunction.class, $("myField"))); 

区别只是在于 call()方法第一个参数不再是注册好的函数名,而直接就是函数类的 Class对象了。

(3)在 SQL 中调用函数

当我们将函数注册为系统函数之后,在 SQL 中的调用就与内置系统函数完全一样了:

tableEnv.sqlQuery("SELECT MyFunction(myField) FROM MyTable"); 

2. 标量函数(Scalar Functions)

自定义标量函数可以把 0 个、 1 个或多个标量值转换成一个标量值,它对应的输入是一行数据中的字段,输出则是唯一的值。所以从输入和输出表中行数据的对应关系看,标量函数 是“一对一”的转换。

想要实现自定义的标量函数,我们需要自定义一个类来继承抽象类 ScalarFunction,并实现叫作 eval() 的求值方法。标量函数的行为就取决于求值方法的定义,它必须是公有的(public), 而且名字必须是 eval。求值方法 eval 可以重载多次,任何数据类型都可作为求值方法的参数 和返回值类型。

这里需要特别说明的是,ScalarFunction 抽象类中并没有定义 eval()方法,所以我们不能直 接在代码中重写(override);但 Table API 的框架底层又要求了求值方法必须名字为 eval()。这 是 Table API 和 SQL 目前还显得不够完善的地方,未来的版本应该会有所改进。

ScalarFunction 以及其它所有的 UDF 接口,都在 org.apache.flink.table.functions 中。

实例:简单的标量函数的代码实现(求HashCode):

package com.atguigu.chapter11;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.UserDefinedFunction;

public class UdfTestScalarFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv=StreamTableEnvironment.create(env);

        //1.在创建表的DDL中直接定义时间属性
        String createDDL="CREATE TABLE clickTable (" +
                " `user` STRING, " +
                " url STRING, " +
                " ts BIGINT, " +
                " et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000) ), " +
                "WATERMARK FOR et AS et - INTERVAL '1' SECOND " +
                ") WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'input/clicks.txt'," +
                " 'format' = 'csv'" +
                ")";
        tableEnv.executeSql(createDDL);

        //2.注册自定义标量函数
        tableEnv.createTemporarySystemFunction("MyHash",MyHashFunction.class);

        //3.调用UDF进行查询转换
        Table resultTAble = tableEnv.sqlQuery("SELECT user, MyHash(user) from clickTable");

        //4.转换成流打印输出
        tableEnv.toDataStream(resultTAble).print();

        env.execute();
    }

    //自定义实现ScalarFunction
    public static class MyHashFunction extends ScalarFunction {
        public int eval(String str){
            return str.hashCode();
        }
    }
}

3. 表函数(Table Functions)

跟标量函数一样,表函数的输入参数也可以是 0 个、1 个或多个标量值;不同的是,它可 以返回任意多行数据。“多行数据”事实上就构成了一个表,所以“表函数”可以认为就是返回一个表的函数,这是一个“一对多”的转换关系。之前我们介绍过的窗口 TVF,本质上就是表函数。

类似地,要实现自定义的表函数,需要自定义类来继承抽象类** TableFunction**,内部必须要实现的也是一个名为 eval 的求值方法。与标量函数不同的是,TableFunction 类本身是有一 个泛型参数T 的,这就是表函数返回数据的类型;而 eval()方法没有返回类型,内部也没有 return语句,是通过调用 collect()方法来发送想要输出的行数据的。

我们使用表函数,可以对一行数据得到一个表,这和 Hive 中的 UDTF 非常相似。那对于 原先输入的整张表来说,又该得到什么呢?一个简单的想法是,就让输入表中的每一行,与它 转换得到的表进行联结(join),然后再拼成一个完整的大表,这就相当于对原来的表进行了 扩展。在 Hive 的 SQL 语法中,提供了“侧向视图”(lateral view,也叫横向视图)的功能,可 以将表中的一行数据拆分成多行;Flink SQL 也有类似的功能,是用 LATERAL TABLE 语法来 实现的。

在 SQL 中调用表函数,需要使用 LATERAL TABLE()来生成扩展的“侧向表”,然后与原始表进行联结(Join)。这里的 Join 操作可以是直接做交叉联结(cross join), 在 FROM 后用逗号分隔两个表就可以;也可以是以 ON TRUE 为条件的左联结(LEFT JOIN)。

下面是表函数的一个具体示例。我们实现了一个分隔字符串的函数 SplitFunction,可以将 一个字符串转换成(字符串,长度)的二元组。

// 注意这里的类型标注,输出是 Row 类型,Row 中包含两个字段:word 和 length。 
@FunctionHint(output = @DataTypeHint("ROW<word STRING, length INT>")) 
public static class SplitFunction extends TableFunction<Row> { 
 
 public void eval(String str) { 
 for (String s : str.split(" ")) { 
 // 使用 collect()方法发送一行数据 
 collect(Row.of(s, s.length())); 
 } 
 } 
} 
 
// 注册函数 
tableEnv.createTemporarySystemFunction("SplitFunction", SplitFunction.class);

// 在 SQL 里调用注册好的函数 
// 1. 交叉联结 
tableEnv.sqlQuery( 
 "SELECT myField, word, length " + 
 "FROM MyTable, LATERAL TABLE(SplitFunction(myField))"); 
// 2. 带 ON TRUE 条件的左联结 
tableEnv.sqlQuery( 
 "SELECT myField, word, length " + 
 "FROM MyTable " + 
 "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) ON TRUE"); 
 
// 重命名侧向表中的字段 
tableEnv.sqlQuery( 
 "SELECT myField, newWord, newLength " + 
 "FROM MyTable " + 
 "LEFT JOIN LATERAL TABLE(SplitFunction(myField)) AS T(newWord, newLength) ON 
TRUE"); 

4. 聚合函数(Aggregate Functions)

用户自定义聚合函数(User Defined AGGregate function,UDAGG)会把一行或多行数据 (也就是一个表)聚合成一个标量值。这是一个标准的“多对一”的转换。

聚合函数的概念我们之前已经接触过多次,如 SUM()、MAX()、MIN()、AVG()、COUNT()都是常见的系统内置聚合函数。而如果有些需求无法直接调用系统函数解决,我们就必须自定 义聚合函数来实现功能了。

自定义聚合函数需要继承抽象类 AggregateFunction。AggregateFunction 有两个泛型参数

T 表示聚合输出的结果类型,ACC 则表示聚合的中间状态类型

Flink SQL 中的聚合函数的工作原理如下:

(1)首先,它需要创建一个累加器(accumulator),用来存储聚合的中间结果。这与DataStream API 中的 AggregateFunction 非常类似,累加器就可以看作是一个聚合状态。调用createAccumulator()方法可以创建一个空的累加器。

(2)对于输入的每一行数据,都会调用 accumulate()方法来更新累加器,这是聚合的核心 过程。

(3)当所有的数据都处理完之后,通过调用 getValue()方法来计算并返回最终的结果。

所以,每个 AggregateFunction 都必须实现以下几个方法:

⚫ createAccumulator()

这是创建累加器的方法。没有输入参数,返回类型为累加器类型 ACC。

⚫ accumulate()

这是进行聚合计算的核心方法,每来一行数据都会调用。它的第一个参数是确定的,就是 当前的累加器,类型为 ACC,表示当前聚合的中间状态;后面的参数则是聚合函数调用时传 入的参数,可以有多个,类型也可以不同。这个方法主要是更新聚合状态,所以没有返回类型。 需要注意的是,accumulate()与之前的求值方法 eval()类似,也是底层架构要求的,必须为 public, 方法名必须为 accumulate,且无法直接 override、只能手动实现。

⚫ getValue()

这是得到最终返回结果的方法。输入参数是 ACC 类型的累加器,输出类型为 T。

在遇到复杂类型时,Flink 的类型推导可能会无法得到正确的结果。所以AggregateFunction也可以专门对累加器和返回结果的类型进行声明,这是通过 getAccumulatorType()和getResultType()两个方法来指定的。

除了上面的方法,还有几个方法是可选的。这些方法有些可以让查询更加高效,有些是在 某些特定场景下必须要实现的。比如,如果是对会话窗口进行聚合,merge()方法就是必须要实现的,它会定义累加器的合并操作,而且这个方法对一些场景的优化也很有用;而如果聚合 函数用在 OVER 窗口聚合中,就必须实现** retract()方法,保证数据可以进行撤回操作resetAccumulator()方法则是重置累加器**,这在一些批处理场景中会比较有用。 AggregateFunction 的所有方法都必须是 公有的(public),不能是静态的(static),而且 名字必须跟上面写的完全一样。 createAccumulator 、 getValue 、 getResultType 以 及 getAccumulatorType 这几个方法是在抽象类 AggregateFunction 中定义的,可以 override;而 其他则都是底层架构约定的方法。

下面举一个具体的示例。在常用的系统内置聚合函数里,可以用 AVG()来计算平均值;如 果我们现在希望计算的是某个字段的“加权平均值”,又该怎么做呢?系统函数里没有现成的 实现,所以只能自定义一个聚合函数 WeightedAvg 来计算了。

比如我们要从学生的分数表 ScoreTable 中计算每个学生的加权平均分。为了计算加权平均值,应该从输入的每行数据中提取两个值作为参数:要计算的分数值 score,以及它的权重weight。而在聚合过程中,累加器(accumulator)需要存储当前的加权总和 sum,以及目前数据的个数 count。这可以用一个二元组来表示,也可以单独定义一个类 WeightedAvgAccum, 里面包含 sum 和 count 两个属性,用它的对象实例来作为聚合的累加器。

具体代码如下:

package com.atguigu.chapter11;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;

public class UdfTest_AggregateFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv=StreamTableEnvironment.create(env);

        //1.在创建表的DDL中直接定义时间属性
        String createDDL="CREATE TABLE clickTable (" +
                " `user` STRING, " +
                " url STRING, " +
                " ts BIGINT, " +
                " et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000) ), " +
                "WATERMARK FOR et AS et - INTERVAL '1' SECOND " +
                ") WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'input/clicks.txt'," +
                " 'format' = 'csv'" +
                ")";
        tableEnv.executeSql(createDDL);

        //2.注册自定义标量函数
        tableEnv.createTemporarySystemFunction("WeightedAverage",WeightedAverage.class);

        //3.调用UDF进行查询转换
        Table resultTable = tableEnv.sqlQuery("SELECT user, WeightedAverage(ts ,1) as w_avg " +
                "from clickTable group by user ");

        //4.转换成流打印输出
        tableEnv.toChangelogStream(resultTable).print();

        env.execute();
    }

    //单独定义一个累加器类型
    public static class WeightedAvgAccumulator{
        public long sum=0;
        public int count=0;
    }

    //自定义实现的聚合函数,计算加权平均值

    /**
     * 参数解析:
     * Long:输出类型
     * WeightedAvgAccumulator:ACC类型,也就是累加器类型
     */
    public static class WeightedAverage extends AggregateFunction<Long, WeightedAvgAccumulator>{

        @Override
        public Long getValue(WeightedAvgAccumulator accumulator) {
           if(accumulator.count==0){
               return null;
           }else {
               return accumulator.sum / accumulator.count;
           }
        }

        @Override
        public WeightedAvgAccumulator createAccumulator() {
            return new WeightedAvgAccumulator();
        }

        //实现一个累加计算的方法
        public void accumulate(WeightedAvgAccumulator accumulator, Long iValue, Integer iWeight){
            accumulator.sum+=iValue * iWeight;
            accumulator.count+=iWeight;
        }
    }
}

5. 表聚合函数(Table Aggregate Functions)

用户自定义表聚合函数(UDTAGG)可以把一行或多行数据(也就是一个表)聚合成另一张表,结果表中可以有多行多列。很明显,这就像表函数和聚合函数的结合体,是一个“多对多”的转换。

自定义表聚合函数需要继承抽象类** TableAggregateFunction**。TableAggregateFunction 的结 构和原理与 AggregateFunction 非常类似,同样有两个泛型参数,用一个 ACC 类型的 累加器(accumulator)来存储聚合的中间结果。聚合函数中必须实现的三个方法,在TableAggregateFunction 中也必须对应实现:

⚫ createAccumulator()

创建累加器的方法,与 AggregateFunction 中用法相同。

⚫ accumulate()

聚合计算的核心方法,与 AggregateFunction 中用法相同。

⚫ emitValue()---需要手动实现

所有输入行处理完成后,输出最终计算结果的方法。这个方法对应着 AggregateFunction中的 getValue()方法;区别在于 emitValue 没有输出类型,而输入参数有两个:第一个是** ACC类型的累加器,第二个则是用于输出数据的“收集器”out,它的类型为 Collect**。

所以很明显,表聚合函数输出数据不是直接 return,而是调用 out.collect()方法,调用多次就可以输出多行数据了;这一点与表函数非常相似。另外,emitValue()在抽象类中也没有定义,无法 override, 必须手动实现

表聚合函数得到的是一张表;在流处理中做持续查询,应该每次都会把这个表重新计算输出。如果输入一条数据后,只是对结果表里一行或几行进行了更新(Update),这时我们重新 计算整个表、全部输出显然就不够高效了。为了提高处理效率,TableAggregateFunction 还提 供了一个 emitUpdateWithRetract()方法,它可以在结果表发生变化时,以“撤回”(retract)老数 据、发送新数据的方式增量地进行更新。如果同时定义了 emitValue()和 emitUpdateWithRetract()两个方法,在进行更新操作时会优先调用 emitUpdateWithRetract()。

表聚合函数相对比较复杂,它的一个典型应用场景就是 Top N 查询。比如我们希望选出 一组数据排序后的前两名,这就是最简单的 TOP-2 查询。没有线程的系统函数,那么我们就 可以自定义一个表聚合函数来实现这个功能。在累加器中应该能够保存当前最大的两个值,每当来一条新数据就在 accumulate()方法中进行比较更新,最终在 emitValue()中调用两次out.collect()将前两名数据输出

package com.atguigu.chapter11;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

public class Udf_TableAggregateFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        StreamTableEnvironment tableEnv=StreamTableEnvironment.create(env);

        //1.在创建表的DDL中直接定义时间属性
        String createDDL="CREATE TABLE clickTable (" +
                " `user` STRING, " +
                " url STRING, " +
                " ts BIGINT, " +
                " et AS TO_TIMESTAMP( FROM_UNIXTIME(ts / 1000) ), " +
                "WATERMARK FOR et AS et - INTERVAL '1' SECOND " +
                ") WITH (" +
                " 'connector' = 'filesystem'," +
                " 'path' = 'input/clicks.txt'," +
                " 'format' = 'csv'" +
                ")";
        tableEnv.executeSql(createDDL);

        //2.注册自定义标量函数
        tableEnv.createTemporarySystemFunction("Top2",Top2.class);

        //3.调用UDF进行查询转换

        //窗口TOP N,统计一段时间内的(前两名)用户
        String windowAggQuery="SELECT user, count(url) AS cnt, window_start, window_end " +
                "FROM TABLE (" +
                " TUMBLE(TABLE clickTable, DESCRIPTOR(et), INTERVAL '10' SECOND)" +
                ")" +
                "GROUP BY user, window_start, window_end";

        Table aggTable = tableEnv.sqlQuery(windowAggQuery);

        Table resultTable = aggTable.groupBy($("window_end"))
                .flatAggregate(call("Top2", $("cnt")).as("value", "rank"))
                .select($("window_end"), $("value"), $("rank"));

        //4.转换成流打印输出
        tableEnv.toChangelogStream(resultTable).print();

        env.execute();
    }

    //单独定义一个累加器类,包含了当前第二大、第一大的数据
    public static class Top2Accumulator{
        public Long max;
        public Long secondMax;
    }

    //实现自定义的表聚合函数
    public static class Top2 extends TableAggregateFunction<Tuple2<Long, Integer>, Top2Accumulator>{

        @Override
        public Top2Accumulator createAccumulator() {
            Top2Accumulator top2Accumulator=new Top2Accumulator();
            top2Accumulator.max=Long.MIN_VALUE;
            top2Accumulator.secondMax=Long.MIN_VALUE;

            return top2Accumulator;
        }

        //定义一个更新累加器的方法
        public void accumulate(Top2Accumulator accumulator,Long value){
            if(value >accumulator.max){
                accumulator.secondMax=accumulator.max;
                accumulator.max=value;
            }else if ( value >accumulator.secondMax){
                accumulator.secondMax=value;
            }
        }

        //输出结果,当前Top2
        public void emitValue(Top2Accumulator accumulator, Collector<Tuple2<Long, Integer>> out){
            if(accumulator.max!=Long.MIN_VALUE){
                out.collect(Tuple2.of(accumulator.max, 1));
            }
            if(accumulator.secondMax!=Long.MIN_VALUE){
                out.collect(Tuple2.of(accumulator.secondMax, 2));
            }
        }
    }
}
标签: sql flink 数据库

本文转载自: https://blog.csdn.net/JiaXingNashishua/article/details/127155068
版权归原作者 大数据阿嘉 所有, 如有侵权,请联系我们删除。

“Flink中Table API和SQL(四)”的评论:

还没有评论