Flink 系列文章
1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接
13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及FileSystem示例(1)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Elasticsearch示例(2)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Kafka示例(3)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及JDBC示例(4)
16、Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式以及Apache Hive示例(6)
17、Flink 之Table API: Table API 支持的操作(1)
17、Flink 之Table API: Table API 支持的操作(2)
18、Flink的SQL 支持的操作和语法
19、Flink 的Table API 和 SQL 中的内置函数及示例(1)
20、Flink SQL之SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上
22、Flink 的table api与sql之创建表的DDL
24、Flink 的table api与sql之Catalogs(介绍、类型、java api和sql实现ddl、java api和sql操作catalog)-1
24、Flink 的table api与sql之Catalogs(java api操作数据库、表)-2
24、Flink 的table api与sql之Catalogs(java api操作视图)-3
24、Flink 的table api与sql之Catalogs(java api操作分区与函数)-4
26、Flink 的SQL之概览与入门示例
27、Flink 的SQL之SELECT (select、where、distinct、order by、limit、集合操作和去重)介绍及详细示例(1)
27、Flink 的SQL之SELECT (SQL Hints 和 Joins)介绍及详细示例(2)
27、Flink 的SQL之SELECT (窗口函数)介绍及详细示例(3)
27、Flink 的SQL之SELECT (窗口聚合)介绍及详细示例(4)
27、Flink 的SQL之SELECT (Group Aggregation分组聚合、Over Aggregation Over聚合 和 Window Join 窗口关联)介绍及详细示例(5)
27、Flink 的SQL之SELECT (Top-N、Window Top-N 窗口 Top-N 和 Window Deduplication 窗口去重)介绍及详细示例(6)
27、Flink 的SQL之SELECT (Pattern Recognition 模式检测)介绍及详细示例(7)
28、Flink 的SQL之DROP 、ALTER 、INSERT 、ANALYZE 语句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(1)
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE(2)
30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)
32、Flink table api和SQL 之用户自定义 Sources & Sinks实现及详细示例
41、Flink之Hive 方言介绍及详细示例
42、Flink 的table api与sql之Hive Catalog
43、Flink之Hive 读写及详细验证示例
44、Flink之module模块介绍及使用示例和Flink SQL使用hive内置函数及自定义函数详细示例–网上有些说法好像是错误的
文章目录
本文介绍了flink的函数分类、内置函数的说明及示例,特别是针对json function函数每个均以可运行的示例进行说明。
本文依赖flink集群能正常使用。
本文分为2个部分,即函数分类以及内置函数。
本文的示例均在Flink 1.17版本中运行。
一、函数分类
Flink 允许用户在 Table API 和 SQL 中使用函数进行数据的转换。
Flink 中的函数有两个划分标准。
1、分类标准及类别
一个划分标准是:系统(内置)函数和 Catalog 函数。系统函数没有名称空间,只能通过其名称来进行引用。 Catalog 函数属于 Catalog 和数据库,因此它们拥有 Catalog 和数据库命名空间。 用户可以通过全/部分限定名(catalog.db.func 或 db.func)或者函数名 来对 Catalog 函数进行引用。
另一个划分标准是:临时函数和持久化函数。 临时函数始终由用户创建,它容易改变并且仅在会话的生命周期内有效。 持久化函数不是由系统提供,就是存储在 Catalog 中,它在会话的整个生命周期内都有效。
这两个划分标准给 Flink 用户提供了 4 种函数:
- 临时性系统函数
- 系统函数
- 临时性 Catalog 函数
- Catalog 函数
系统函数始终优先于 Catalog 函数解析,临时函数始终优先于持久化函数解析, 函数解析优先级如下所述。
2、函数引用
用户在 Flink 中可以通过精确、模糊两种引用方式引用函数。
1)、精确函数引用
精确函数引用允许用户跨 Catalog,跨数据库调用 Catalog 函数。
例如:select mycatalog.mydb.myfunc(x) from mytable 和 select mydb.myfunc(x) from mytable。
仅 Flink 1.10 以上版本支持。
2)、模糊函数引用
在模糊函数引用中,用户只需在 SQL 查询中指定函数名,例如: select myfunc(x) from mytable。
3、函数解析顺序
当函数名相同,函数类型不同时,函数解析顺序才有意义。
例如:当有三个都名为 “myfunc” 的临时性 Catalog 函数,Catalog 函数,和系统函数时, 如果没有命名冲突,三个函数将会被解析为一个函数。
1)、精确函数引用
由于系统函数没有命名空间,Flink 中的精确函数引用必须 指向临时性 Catalog 函数或 Catalog 函数。
解析顺序如下:
- 临时性 catalog 函数
- Catalog 函数
2)、模糊函数引用
解析顺序如下:
- 临时性系统函数
- 系统函数
- 临时性 Catalog 函数, 在会话的当前 Catalog 和当前数据库中
- Catalog 函数, 在会话的当前 Catalog 和当前数据库中
二、系统(内置)函数
Flink Table API & SQL 为用户提供了一组内置的数据转换函数。
1、标量函数
标量函数将零、一个或多个值作为输入并返回单个值作为结果。
1)、比较函数
2)、逻辑函数
3)、算术函数
4)、字符串函数
5)、时间函数
6)、条件函数
7)、类型转换函数
8)、集合函数
9)、JSON Functions
JSON 函数使用 SQL 标准的 ISO/IEC TR 19075-6 中所述的 JSON 路径表达式(JSON path expressions )。它们的语法受到 ECMAScript 的启发并采用了 ECMAScript 的许多功能,但既不是它的子集也不是它的超集。
路径表达式有两种风格,宽松和严格( lax and strict.)。省略时,它默认为严格模式。
严格模式旨在从架构角度检查数据,每当数据不符合路径表达式时,就会引发错误。但是,像 JSON_VALUE 这样的函数允许在遇到错误时定义回退行为。
宽松模式更宽容,并将错误转换为空序列。
特殊字符 $ 表示 JSON 路径中的根节点。路径可以访问属性 (
.
a
)、数组元素(
.a)、数组元素 (
.a)、数组元素(.a[0].b) 或分支数组中的所有元素 ($.a[*].b)。
已知限制:
截至Flink 1.17版本并非正确支持宽松模式的所有功能。这是一个上游错误 (CALCITE-4717)。不保证非标准行为。
1、IS JSON
确定给定字符串是否为有效的 JSON。
指定可选的类型参数会限制允许哪种类型的 JSON 对象。如果字符串是有效的 JSON,但不是该类型,则返回 false。默认值为 VALUE。
- SQL语法
IS JSON [ { VALUE| SCALAR | ARRAY | OBJECT } ]
- table api语法
STRING.isJson([JsonType type])
- 示例
-- TRUE
Flink SQL>select'1'IS JSON;+----+--------+| op | EXPR$0|+----+--------+|+I |TRUE|+----+--------+
Flink SQL>select'[]'IS JSON;+----+--------+| op | EXPR$0|+----+--------+|+I |TRUE|+----+--------+-- The following statements return TRUE.SELECT'1'IS JSON;SELECT'[]'IS JSON;SELECT'{}'IS JSON;SELECT'"abc"'IS JSON;SELECT'1'IS JSON SCALAR;SELECT'{}'IS JSON OBJECT;-- The following statements return FALSE.SELECT'abc'IS JSON;SELECT'1'IS JSON ARRAY;SELECT'1'IS JSON OBJECT;SELECT'{}'IS JSON SCALAR;SELECT'{}'IS JSON ARRAY;# 以下示例一样,不再赘述'1'IS JSON
'[]'IS JSON
'{}'IS JSON
-- TRUE'"abc"'IS JSON
-- FALSE'abc'IS JSON
NULLIS JSON
-- TRUE'1'IS JSON SCALAR
-- FALSE'1'IS JSON ARRAY
-- FALSE'1'IS JSON OBJECT
-- FALSE'{}'IS JSON SCALAR
-- FALSE'{}'IS JSON ARRAY
-- TRUE'{}'IS JSON OBJECT
2、JSON_EXISTS
确定 JSON 字符串是否满足给定的路径搜索条件。
如果省略错误行为,则假定 FALSE ON ERROR 为默认值。
- SQL语法
JSON_EXISTS(jsonValue, path [ { TRUE|FALSE| UNKNOWN | ERROR } ON ERROR ])
- table api语法
STRING.jsonExists(STRING path [,JsonExistsOnError onError])
- 示例
Flink SQL>SELECT JSON_EXISTS('{"a": true}','strict $.b'FALSEON ERROR);+----+--------+| op | EXPR$0|+----+--------+|+I |FALSE|+----+--------+-- The following statements return TRUE.SELECT JSON_EXISTS('{"a": true}','$.a');SELECT JSON_EXISTS('{"a": [{ "b": 1 }]}','$.a[0].b');SELECT JSON_EXISTS('{"a": true}','strict $.b'TRUEON ERROR);-- The following statements return FALSE.SELECT JSON_EXISTS('{"a": true}','$.b');SELECT JSON_EXISTS('{"a": true}','strict $.b'FALSEON ERROR);-- TRUESELECT JSON_EXISTS('{"a": true}','$.a');-- FALSESELECT JSON_EXISTS('{"a": true}','$.b');-- TRUESELECT JSON_EXISTS('{"a": [{ "b": 1 }]}','$.a[0].b');-- TRUESELECT JSON_EXISTS('{"a": true}','strict $.b'TRUEON ERROR);-- FALSESELECT JSON_EXISTS('{"a": true}','strict $.b'FALSEON ERROR);
3、JSON_STRING
将值序列化为 JSON。
此函数返回包含序列化值的 JSON 字符串。如果值为 NULL,则该函数返回 NULL。
- SQL语法
JSON_STRING(value)
- table api语法
jsonString(value)
- 示例
Flink SQL>SELECT JSON_STRING(1);+----+--------------------------------+| op | EXPR$0|+----+--------------------------------+|+I |1|+----+--------------------------------+-- returns NULLSELECT JSON_STRING(CAST(NULLASINT));-- returns '1'SELECT JSON_STRING(1);-- returns 'true'SELECT JSON_STRING(TRUE);-- returns '"Hello, World!"'
JSON_STRING('Hello, World!');-- returns '[1,2]'
JSON_STRING(ARRAY[1,2])-- NULL
JSON_STRING(CAST(NULLASINT))-- '1'
JSON_STRING(1)-- 'true'
JSON_STRING(TRUE)-- '"Hello, World!"'
JSON_STRING('Hello, World!')-- '[1,2]'
JSON_STRING(ARRAY[1,2])
4、JSON_VALUE
从 JSON 字符串中提取标量。
此方法在 JSON 字符串中搜索给定的路径表达式,如果该路径的值为标量,则返回该值。不能返回非标量值。
默认情况下,该值以 STRING 形式返回。使用 returningType 可以选择不同的类型,并支持以下类型:
- VARCHAR / STRING
- BOOLEAN
- INTEGER
- DOUBLE
对于空路径表达式或错误,可以将行为定义为返回 null、引发错误或返回定义的默认值。
省略时,默认值分别为 NULL ON EMPTY 或 NULL ON ERROR。
默认值可以是文本或表达式。如果默认值本身引发错误,则它将下降到 ON EMPTY 的错误行为,并引发 ON ERROR 的错误。
对于包含空格等特殊字符的路径,可以使用 [‘property’] 或 [“property”] 选择父对象中的指定属性。
请务必在属性名称两边加上单引号或双引号。
在 SQL 中使用 JSON_VALUE 时,路径是一个字符参数,该参数已经是单引号,因此您必须对属性名称周围的单引号进行转义,
例如 JSON_VALUE(‘{“a b”: “true”}’, ‘$.[’‘a b’‘]’)。
- SQL语法
JSON_VALUE(jsonValue, path [RETURNING<dataType>][ { NULL| ERROR |DEFAULT<defaultExpr> } ON EMPTY ][ { NULL| ERROR |DEFAULT<defaultExpr> } ON ERROR ])
- table api语法
STRING.jsonValue(STRING path [, returnType, onEmpty, defaultOnEmpty, onError, defaultOnError])
- 示例
Flink SQL>SELECT JSON_VALUE('{"a": true}','$.a');+----+--------------------------------+| op | EXPR$0|+----+--------------------------------+|+I |true|+----+--------------------------------+
Flink SQL>SELECT JSON_VALUE('{"contains blank": "right"}','strict $.[''contains blank'']'NULLON EMPTY DEFAULT'wrong'ON ERROR);+----+--------------------------------+| op | EXPR$0|+----+--------------------------------+|+I |right|+----+--------------------------------+-- returns "true"SELECT JSON_VALUE('{"a": true}','$.a');-- returns TRUESELECT JSON_VALUE('{"a": true}','$.a'RETURNINGBOOLEAN);-- returns "false"SELECT JSON_VALUE('{"a": true}','lax $.b'DEFAULTFALSEON EMPTY);-- returns "false"SELECT JSON_VALUE('{"a": true}','strict $.b'DEFAULTFALSEON ERROR);-- returns 0.998DSELECT JSON_VALUE('{"a.b": [0.998,0.996]}','$.["a.b"][0]'RETURNINGDOUBLE);-- returns "right"SELECT JSON_VALUE('{"contains blank": "right"}','strict $.[''contains blank'']'NULLON EMPTY DEFAULT'wrong'ON ERROR);
5、JSON_QUERY
目前不支持 RETURNING 子句。
wrappingBehavior 确定是否应将提取的值包装到数组中,以及是无条件地包装,还是仅在值本身还不是数组时才这样做。
onEmpty 和 onError 分别确定路径表达式为空或引发错误时的行为。
默认情况下,在这两种情况下都返回 null。其他选择是使用空数组、空对象或引发错误。
- SQL语法
JSON_QUERY(jsonValue, path [ { WITHOUT |WITH CONDITIONAL |WITH UNCONDITIONAL } [ ARRAY ] WRAPPER ][ { NULL| EMPTY ARRAY | EMPTY OBJECT | ERROR } ON EMPTY ][ { NULL| EMPTY ARRAY | EMPTY OBJECT | ERROR } ON ERROR ])
- table api语法
STRING.jsonQuery(path [,JsonQueryWrapper[,JsonQueryOnEmptyOrError,JsonQueryOnEmptyOrError]])
- 示例
Flink SQL>SELECT JSON_QUERY('{ "a": { "b": 1 } }','$.a');+----+--------------------------------+| op | EXPR$0|+----+--------------------------------+|+I | {"b":1} |+----+--------------------------------+
Flink SQL>SELECT JSON_QUERY('{}','lax $.invalid' EMPTY OBJECT ON EMPTY);+----+--------------------------------+| op | EXPR$0|+----+--------------------------------+|+I | {} |+----+--------------------------------+-- returns '{ "b": 1 }'SELECT JSON_QUERY('{ "a": { "b": 1 } }','$.a');-- returns '[1, 2]'SELECT JSON_QUERY('[1, 2]','$');-- returns NULLSELECT JSON_QUERY(CAST(NULLAS STRING),'$');-- returns '["c1","c2"]'SELECT JSON_QUERY('{"a":[{"c":"c1"},{"c":"c2"}]}','lax $.a[*].c');-- Wrap the result into an array.-- returns '[{}]'SELECT JSON_QUERY('{}','$'WITH CONDITIONAL ARRAY WRAPPER);-- returns '[1, 2]'SELECT JSON_QUERY('[1, 2]','$'WITH CONDITIONAL ARRAY WRAPPER);-- returns '[[1, 2]]'SELECT JSON_QUERY('[1, 2]','$'WITH UNCONDITIONAL ARRAY WRAPPER);-- Scalars must be wrapped to be returned.-- returns NULLSELECT JSON_QUERY(1,'$');-- returns '[1]'SELECT JSON_QUERY(1,'$'WITH CONDITIONAL ARRAY WRAPPER);-- Behavior if the path expression is empty.-- returns '{}'SELECT JSON_QUERY('{}','lax $.invalid' EMPTY OBJECT ON EMPTY);-- Behavior if the path expression has an error.-- returns '[]'SELECT JSON_QUERY('{}','strict $.invalid' EMPTY ARRAY ON ERROR);-- '{ "b": 1 }'
JSON_QUERY('{ "a": { "b": 1 } }','$.a')-- '[1, 2]'
JSON_QUERY('[1, 2]','$')-- NULL
JSON_QUERY(CAST(NULLAS STRING),'$')-- '["c1","c2"]'
JSON_QUERY('{"a":[{"c":"c1"},{"c":"c2"}]}','lax $.a[*].c')-- Wrap result into an array-- '[{}]'
JSON_QUERY('{}','$'WITH CONDITIONAL ARRAY WRAPPER)-- '[1, 2]'
JSON_QUERY('[1, 2]','$'WITH CONDITIONAL ARRAY WRAPPER)-- '[[1, 2]]'
JSON_QUERY('[1, 2]','$'WITH UNCONDITIONAL ARRAY WRAPPER)-- Scalars must be wrapped to be returned-- NULL
JSON_QUERY(1,'$')-- '[1]'
JSON_QUERY(1,'$'WITH CONDITIONAL ARRAY WRAPPER)-- Behavior if path expression is empty / there is an error-- '{}'
JSON_QUERY('{}','lax $.invalid' EMPTY OBJECT ON EMPTY)-- '[]'
JSON_QUERY('{}','strict $.invalid' EMPTY ARRAY ON ERROR)
6、JSON_OBJECT
从键值对列表生成 JSON 对象字符串。
请注意,键必须是非 NULL 字符串文本,而值可以是任意表达式。
此函数返回一个 JSON 字符串。ON NULL 行为定义如何处理 NULL 值。如果省略,则默认假定 NULL ON NULL。
从另一个 JSON 构造函数调用(JSON_OBJECT、JSON_ARRAY)创建的值是直接插入的,而不是作为字符串插入的。这允许构建嵌套的 JSON 结构。
- SQL语法
JSON_OBJECT([[KEY]keyVALUEvalue]*[ { NULL| ABSENT } ONNULL])
- table api语法
jsonObject(JsonOnNull, keyValues...)
- 示例
Flink SQL>SELECT JSON_OBJECT(>KEY'K1'>VALUE JSON_OBJECT(>KEY'K2'>VALUE'V'>)>);+----+--------------------------------+| op | EXPR$0|+----+--------------------------------+|+I | {"K1":{"K2":"V"}} |+----+--------------------------------+
Flink SQL>SELECT JSON_OBJECT(KEY'K1'VALUE CAST(NULLAS STRING) ABSENT ONNULL);+----+--------------------------------+| op | EXPR$0|+----+--------------------------------+|+I | {} |+----+--------------------------------+-- returns '{}'SELECT JSON_OBJECT();-- returns '{"K1":"V1","K2":"V2"}'SELECT JSON_OBJECT('K1'VALUE'V1','K2'VALUE'V2');-- Use an expression as a value.SELECT JSON_OBJECT('orderNo'VALUE orders.orderId);-- ON NULL-- '{"K1":null}'SELECT JSON_OBJECT(KEY'K1'VALUE CAST(NULLAS STRING)NULLONNULL);-- ON NULL-- '{}'SELECT JSON_OBJECT(KEY'K1'VALUE CAST(NULLAS STRING) ABSENT ONNULL);-- returns '{"K1":{"K2":"V"}}'SELECT JSON_OBJECT(KEY'K1'VALUE JSON_OBJECT(KEY'K2'VALUE'V'));-- '{}'
JSON_OBJECT()-- '{"K1":"V1","K2":"V2"}'
JSON_OBJECT('K1'VALUE'V1','K2'VALUE'V2')-- Expressions as values
JSON_OBJECT('orderNo'VALUE orders.orderId)-- ON NULL
JSON_OBJECT(KEY'K1'VALUE CAST(NULLAS STRING)NULLONNULL)-- '{"K1":null}'
JSON_OBJECT(KEY'K1'VALUE CAST(NULLAS STRING) ABSENT ONNULL)-- '{}'-- '{"K1":{"K2":"V"}}'
JSON_OBJECT(KEY'K1'VALUE JSON_OBJECT(KEY'K2'VALUE'V'))
7、JSON_ARRAY
从值列表生成 JSON 数组字符串。
此函数返回一个 JSON 字符串。这些值可以是任意表达式。ON NULL 行为定义如何处理 NULL 值。如果省略,则默认假定 ABSENT ON NULL。
从另一个 JSON 构造函数调用(JSON_OBJECT、JSON_ARRAY)创建的元素是直接插入的,而不是作为字符串插入的。这允许构建嵌套的 JSON 结构。
- SQL语法
JSON_ARRAY([value]*[ { NULL| ABSENT } ONNULL])
- table api语法
jsonArray(JsonOnNull, values...)
- 示例
Flink SQL>>SELECT JSON_ARRAY(1,'2');+----+--------------------------------+| op | EXPR$0|+----+--------------------------------+|+I |[1,"2"]|+----+--------------------------------+
Received a total of1row
Flink SQL>SELECT JSON_ARRAY(CAST(NULLAS STRING) ABSENT ONNULL);+----+--------------------------------+| op | EXPR$0|+----+--------------------------------+|+I |[]|+----+--------------------------------+-- returns '[]'SELECT JSON_ARRAY();-- returns '[1,"2"]'SELECT JSON_ARRAY(1,'2');-- Use an expression as a value.SELECT JSON_ARRAY(orders.orderId);-- ON NULL-- returns '[null]'SELECT JSON_ARRAY(CAST(NULLAS STRING)NULLONNULL);-- ON NULL-- returns '[]'SELECT JSON_ARRAY(CAST(NULLAS STRING) ABSENT ONNULL);-- returns '[[1]]'SELECT JSON_ARRAY(JSON_ARRAY(1));-- '[]'
JSON_ARRAY()-- '[1,"2"]'
JSON_ARRAY(1,'2')-- Expressions as values
JSON_ARRAY(orders.orderId)-- ON NULL
JSON_ARRAY(CAST(NULLAS STRING)NULLONNULL)-- '[null]'
JSON_ARRAY(CAST(NULLAS STRING) ABSENT ONNULL)-- '[]'-- '[[1]]'
JSON_ARRAY(JSON_ARRAY(1))
8、JSON_ARRAYAGG
将明细聚合到 JSON 数组字符串中。
JSON_ARRAYAGG 函数通过将指定的项聚合到数组中来创建 JSON 对象字符串。
item 表达式可以是任意的,包括其他 JSON 函数。
如果值为 NULL,则 ON NULL 行为定义要执行的操作。如果省略,则 ABSENT ON NULL 为默认值。
OVER 窗口、无限会话窗口或 HOP 窗口不支持JSON_ARRAYAGG函数。
- SQL语法
JSON_ARRAYAGG(items [ { NULL| ABSENT } ONNULL])
- table api语法
在这里插入代码片
- 示例
Flink SQL>CREATETABLE source_table (> userId INT,> age INT,> balance DOUBLE,> userName STRING,> t_insert_time AS localtimestamp,> WATERMARK FOR t_insert_time AS t_insert_time
>)WITH(>'connector'='datagen',>'rows-per-second'='5',>'fields.userId.kind'='sequence',>'fields.userId.start'='1',>'fields.userId.end'='10',>>'fields.balance.kind'='random',>'fields.balance.min'='1',>'fields.balance.max'='100',>>'fields.age.min'='1',>'fields.age.max'='1000',>>'fields.userName.length'='10'>);[INFO]Execute statement succeed.
Flink SQL>select*from source_table;+----+-------------+-------------+--------------------------------+--------------------------------+-------------------------+| op | userId | age | balance | userName | t_insert_time |+----+-------------+-------------+--------------------------------+--------------------------------+-------------------------+|+I |1|555|90.45012880441223|7e2b6c7beb |2023-11-0617:29:05.273||+I |2|209|32.07201650494765| f652baac94 |2023-11-0617:29:05.274||+I |3|278|24.299962537076734|11b4353416 |2023-11-0617:29:05.274||+I |4|433|58.634356546049574|21d5d09603 |2023-11-0617:29:05.274||+I |5|55|16.20617629075601| d626f31213 |2023-11-0617:29:05.274||+I |6|442|98.87803427244727|0305c21dc5 |2023-11-0617:29:06.267||+I |7|19|96.11095443982174| ea873b2df2 |2023-11-0617:29:06.268||+I |8|806|36.5775262369553| f8df556b22 |2023-11-0617:29:06.268||+I |9|919|69.47517602162831|85074390f3 |2023-11-0617:29:06.268||+I |10|46|47.519467818569815|662990446f |2023-11-0617:29:06.268|+----+-------------+-------------+--------------------------------+--------------------------------+-------------------------+
Received a total of10rows
Flink SQL>SELECT> JSON_ARRAYAGG(userName)>FROM source_table;+----+--------------------------------+| op | EXPR$0|+----+--------------------------------+|+I |["ee2e4edb32"]||-U |["ee2e4edb32"]||+U |["ee2e4edb32","66e13f3f77"]||-U |["ee2e4edb32","66e13f3f77"]||+U |["ee2e4edb32","66e13f3f77",...||-U |["ee2e4edb32","66e13f3f77",...||+U |["ee2e4edb32","66e13f3f77",...||-U |["ee2e4edb32","66e13f3f77",...||+U |["ee2e4edb32","66e13f3f77",...||-U |["ee2e4edb32","66e13f3f77",...||+U |["ee2e4edb32","66e13f3f77",...||-U |["ee2e4edb32","66e13f3f77",...||+U |["ee2e4edb32","66e13f3f77",...||-U |["ee2e4edb32","66e13f3f77",...||+U |["ee2e4edb32","66e13f3f77",...||-U |["ee2e4edb32","66e13f3f77",...||+U |["ee2e4edb32","66e13f3f77",...||-U |["ee2e4edb32","66e13f3f77",...||+U |["ee2e4edb32","66e13f3f77",...|+----+--------------------------------+
Received a total of19rows
Flink SQL>SELECT> JSON_ARRAYAGG(userId)>FROM source_table;+----+--------------------------------+| op | EXPR$0|+----+--------------------------------+|+I |[1]||-U |[1]||+U |[1,2]||-U |[1,2]||+U |[1,2,3]||-U |[1,2,3]||+U |[1,2,3,4]||-U |[1,2,3,4]||+U |[1,2,3,4,5]||-U |[1,2,3,4,5]||+U |[1,2,3,4,5,6]||-U |[1,2,3,4,5,6]||+U |[1,2,3,4,5,6,7]||-U |[1,2,3,4,5,6,7]||+U |[1,2,3,4,5,6,7,8]||-U |[1,2,3,4,5,6,7,8]||+U |[1,2,3,4,5,6,7,8,9]||-U |[1,2,3,4,5,6,7,8,9]||+U |[1,2,3,4,5,6,7,8,9,10]|+----+--------------------------------+
Received a total of19rows
10、JSON_OBJECTAGG
将key-value表达式聚合到 JSON 字符串中。
JSON_OBJECTAGG 函数通过将key-value表达式聚合到单个 JSON 对象中来创建 JSON 对象字符串。
key表达式必须返回不可为 null 的字符串。value表达式可以是任意的,包括其他 JSON 函数。
密钥必须是唯一的。如果一个key多次出现,则会引发错误。
如果value为 NULL,则 ON NULL 行为定义要执行的操作。如果省略,则 NULL ON NULL 为默认值。
OVER 窗口中不支持 JSON_OBJECTAGG 函数。
- SQL语法
JSON_OBJECTAGG([KEY]keyVALUEvalue[ { NULL| ABSENT } ONNULL])
- table api语法
在这里插入代码片
- 示例
Flink SQL>select> JSON_OBJECTAGG(userName VALUE'f652baac94')>FROM source_table;+----+--------------------------------+| op | EXPR$0|+----+--------------------------------+|+I | {"0c3ceeca6f":"f652baac94"} ||-U | {"0c3ceeca6f":"f652baac94"} ||+U | {"0c3ceeca6f":"f652baac94",...||-U | {"0c3ceeca6f":"f652baac94",...||+U | {"0c3ceeca6f":"f652baac94",...||-U | {"0c3ceeca6f":"f652baac94",...||+U | {"0c3ceeca6f":"f652baac94",...||-U | {"0c3ceeca6f":"f652baac94",...||+U | {"0c3ceeca6f":"f652baac94",...||-U | {"0c3ceeca6f":"f652baac94",...||+U | {"0c3ceeca6f":"f652baac94",...||-U | {"0c3ceeca6f":"f652baac94",...||+U | {"0c3ceeca6f":"f652baac94",...||-U | {"0c3ceeca6f":"f652baac94",...||+U | {"0c3ceeca6f":"f652baac94",...||-U | {"0c3ceeca6f":"f652baac94",...||+U | {"0c3ceeca6f":"f652baac94",...||-U | {"0c3ceeca6f":"f652baac94",...||+U | {"0c3ceeca6f":"f652baac94",...|+----+--------------------------------+
10)、值构建函数
11)、值获取函数
12)、分组函数
13)、哈希函数
2、聚合函数
聚合函数将所有的行作为输入,并返回单个聚合值作为结果。
3、时间间隔单位和时间点单位标识符
下表列出了时间间隔单位和时间点单位标识符。
对于 Table API,请使用 _ 代替空格(例如 DAY_TO_HOUR)
4、列函数
列函数用于选择或丢弃表的列。
列函数仅在 Table API 中使用。
详细语法如下:
//列函数:withColumns(columnExprs)withoutColumns(columnExprs)//多列表达式:
columnExpr [, columnExpr]*//单列表达式:
columnRef | columnIndex tocolumnIndex| columnName tocolumnName//列引用:columnName(The field name that exists in the table)|columnIndex(a positive integer starting from 1)
列函数的用法如下表所示(假设我们有一个包含 5 列的表:(a: Int, b: Long, c: String, d:String, e: String)):
列函数可用于所有需要列字段的地方,例如 select、groupBy、orderBy、UDFs 等函数,例如:
table
.groupBy(withColumns(range(1,3))).select(withColumns(range("a","b")),myUDAgg(myUDF(withColumns(range(5,20)))));
以上,介绍了flink的函数分类、内置函数的说明及示例,特别是针对json function函数每个均以可运行的示例进行说明。
版权归原作者 一瓢一瓢的饮 alanchan 所有, 如有侵权,请联系我们删除。