一、系统内置函数
1. 比较函数
API函数表达式示例Table API===,>,<,!=,>=,<=id===1001,age>18SQL=,>,<,!=,>=,<=id=‘1001’,age>18
2. 逻辑函数
API函数表达式示例Table API&&,||,!,.isFalse1>1 && 2>1,true.isFalseSQLand,or,is false,not1>1 and 2>1,true is false
3. 算术函数
API函数表达式示例Table API+,-,,/,n1.power(n2)1 + 1,2.power(2)SQL+,-,,/,power(n1,n2)1 + 1,power(2,2)
4. 字符串函数
API函数表达式示例Table APIstring1 + string2,.upperCase(),.lowerCase(),.charLength()‘a’ + ‘b’,
‘hello’.upperCase()SQLstring1 || string2,upper(),lower(),char_length()‘a’ || ‘b’,upper(‘hello’)
5. 时间函数
API函数表达式示例Table API.toDate,.toTimestamp,currentTime(),n.days,n.minutes‘20230107’.toDate,
2.days,10.minutesSQLDate string,Timestamp string,current_time,interval string rangeDate ‘20230107’,interval ‘2’ hour/second/minute/day
6. 聚合函数
API函数表达式示例Table API.count,.sum,.sum0id.count,age.sum,sum0 表示求和的所有值都为 null 则返回 0SQLcount(),sum(),rank(),row_number()count(*),sum(age)
二、用户自定义函数(UDF)
UDF 显著地扩展了查询的表达能力,可以解决一些系统内置函数无法解决的需求。使用步骤为:自定义 UDF 函数类继承 UserDefinedFunction 抽象类;创建 UDF 实例并在环境中调用 registerFunction() 方法注册;在 Table API 或 SQL 中使用
1. 标量函数
Scalar Function,可以将 0、1 或多个标量值,映射到一个新的标量值,一进一出
/**
用户自定义标量函数步骤:
1.自定义函数类继承 ScalarFunction 抽象类,并在类中定义一个 public 的名为 eval 的方法
2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数
3.在 Table API 和 SQL 中使用
*/publicclassTestScalarFunction{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);StreamTableEnvironment tableEnv =StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream =InputStream.map(line ->{String[] fields = line.split(",");returnnewSensorReading(fields[0],newLong(fields[1]),newDouble(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream,"id, timestamp as ts, temperature as temp");
tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的标量函数 hashCode,查询每条数据 id 的hashCode 值//2.创建自定义标量函数实例HashCode hashCode =newHashCode(0.8);//3.在环境中注册函数
tableEnv.registerFunction("hashCode", hashCode);//4.使用//4.1 Table APITable resultTable = sensorTable.select("id, ts, hashCode(id)");//4.2 SQLTable resultSqlTable = tableEnv.sqlQuery("select id, ts, hashCode(id) from sensor");
tableEnv.toAppendStream(resultTable,Row.class).print("result");
tableEnv.toAppendStream(resultSqlTable,Row.class).print("sql");
env.execute();}//1.自定义 HashCode 函数,继承 ScalarFunction 类,并定义 eval 方法publicstaticclassHashCodeextendsScalarFunction{privateint factor;publicHashCode(int factor){this.factor = factor;}//该方法必须为 public 且名称必须为 eval,返回值和参数可以自定义publicinteval(String str){return str.hashCode()* factor;}}}
2. 表函数
Table Function,可以将0、1或多个标量值作为输入参数,可以返回任意数量的行作为输出。一进多出
/**
用户自定义表函数步骤:
1.自定义函数类继承 TableFunction 抽象类,定义输出泛型,并在类中定义一个 public 的名为 eval,返回值为 void 的方法
2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数
3.在 Table API 和 SQL 中使用
*/publicclassTestTableFunction{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);StreamTableEnvironment tableEnv =StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream =InputStream.map(line ->{String[] fields = line.split(",");returnnewSensorReading(fields[0],newLong(fields[1]),newDouble(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream,"id, timestamp as ts, temperature as temp");
tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的表函数 Split,将每条数据的 id 分割并按 (word, length) 输出//2.创建自定义表函数实例Split split =newSplit("_");//3.在环境中注册函数
tableEnv.registerFunction("split", split);//4.使用//4.1 Table API//一进多出函数要配合 joinLateral 使用,侧写表Table resultTable = sensorTable.joinLateral("split(id) as (word, length)").select("id, ts, word, length");//4.2 SQL//一进多出函数要进行 lateral table 的关联,类似于 lateral viewTable resultSqlTable = tableEnv.sqlQuery("select id, ts, word, length from sensor, lateral table(split(id)) as lt(word, length)");
tableEnv.toAppendStream(resultTable,Row.class).print("result");
tableEnv.toAppendStream(resultSqlTable,Row.class).print("sql");
env.execute();}//1.自定义表函数 Split 继承 TableFunction,定义输出类型为 Tuple2<String, Integer>publicstaticclassSplitextendsTableFunction<Tuple2<String,Integer>>{privateString separator =",";publicSplit(String separator){this.separator = separator;}//必须定义一个 public 的返回值为 void 的 eval 方法publicvoideval(String str){for(String s : str.split(separator)){//使用 collect() 方法输出结果collect(newTuple2<>(s, s.length()));}}}}
3. 聚合函数
Aggregate Function,可以把一个表中的数据,聚合成一个标量值,多进一出
/**
用户自定义聚合函数步骤:
1.自定义函数类继承 AggregateFunction 抽象类,定义输出和累加器的泛型,实现 createAccumulator()、accumulate()和getValue()方法
2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数
3.在 Table API 和 SQL 中使用
*/publicclassTestAggregateFunction{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);StreamTableEnvironment tableEnv =StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream =InputStream.map(line ->{String[] fields = line.split(",");returnnewSensorReading(fields[0],newLong(fields[1]),newDouble(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream,"id, timestamp as ts, temperature as temp");
tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的聚合函数 AvgTemp,按 id 分组后求最新的平均温度值//2.创建自定义聚合函数实例AvgTemp avgTemp =newAvgTemp();//3.在环境中注册函数
tableEnv.registerFunction("avgTemp", avgTemp);//4.使用//4.1 Table API//聚合函数必须在 groupBy 后的 aggregate 方法使用Table resultTable = sensorTable.groupBy("id").aggregate("avgTemp(temp) as avgtemp").select("id, avgtemp");//4.2 SQLTable resultSqlTable = tableEnv.sqlQuery("select id, avgTemp(temp) from sensor group by id");
tableEnv.toRetractStream(resultTable,Row.class).print("result");
tableEnv.toRetractStream(resultSqlTable,Row.class).print("sql");
env.execute();}//1.自定义聚合函数 AvgTemp 继承 AggregateFunction 类,定义输出类型为 Double,中间累加器类型为 Tuple2<Double, Integer> 并实现方法publicstaticclassAvgTempextendsAggregateFunction<Double,Tuple2<Double,Integer>>{@OverridepublicDoublegetValue(Tuple2<Double,Integer> accumulator){return accumulator.f0 / accumulator.f1;}@OverridepublicTuple2<Double,Integer>createAccumulator(){returnnewTuple2<>(0.0,0);}//必须定义一个 accumulate 方法,参数列表必须为 (accumulator, data),返回值为 voidpublicvoidaccumulate(Tuple2<Double,Integer> accumulator,Double temp){
accumulator.f0 += temp;
accumulator.f1 +=1;}}}
4. 表聚合函数
Table Aggregate Function,可以把一个表中数据,聚合为具有多行和多列的结果表,多进多出
/**
用户自定义表聚合函数步骤:
1.自定义函数类继承 TableAggregateFunction 抽象类,定义输出和累加器的泛型,实现 createAccumulator()、accumulate()和 emitValue()方法
2.创建自定义函数实例,使用 table 环境对象调用 registerFunction() 方法注册函数
3.在 Table API 和 SQL 中使用
*/publicclassTestAggregateFunction{publicstaticvoidmain(String[] args)throwsException{StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);StreamTableEnvironment tableEnv =StreamTableEnvironment.create(env);DataStream<String> inputStream = env.readTextFile("./sensor.txt");DataStream<SensorReading> dataStream =InputStream.map(line ->{String[] fields = line.split(",");returnnewSensorReading(fields[0],newLong(fields[1]),newDouble(fields[2]));});Table sensorTable = tableEnv.fromDataStream(dataStream,"id, timestamp as ts, temperature as temp");
tableEnv.createTemporaryView("sensor", dataTable);//使用自定义的表聚合函数 Top2Temp,按 id 分组后求最新的 top2 温度值//2.创建自定义表聚合函数实例Top2Temp top2Temp =newTop2Temp();//3.在环境中注册函数
tableEnv.registerFunction("top2Temp", top2Temp);//4.使用//Table API//表聚合函数必须在 groupBy 后的 flatAggregate 方法使用Table resultTable = sensorTable.groupBy("id").flatAggregate("top2Temp(temp) as (temp, irank)").select("id, temp, irank");//表聚合函数目前不能在 SQL 中使用
tableEnv.toRetractStream(resultTable,Row.class).print("result");
env.execute();}//定义一个 AccumulatorpublicstaticclassTop2TempAcc{privateDouble highestTemp =Double.MIN_VALUE;privateDouble secondHighestTemp =Double.MIN_VALUE;publicDoublegetHighestTemp(){return highestTemp;}publicvoidsetHighestTemp(Double highestTemp){this.highestTemp=highestTemp;}publicDoublegetSecondHighestTemp(){return secondHighestTemp;}publicvoidsetSecondHighestTemp(Double secondHighestTemp){this.secondHighestTemp=secondHighestTemp;}}//1.自定义表聚合函数 Top2Temp 继承 TableAggregateFunction 类,定义输出类型为 Tuple2<Double, Integer>,中间累加器类型为自定义的 Top2TempAcc 类并实现方法publicstaticclassTop2TempextendsTableAggregateFunction<Tuple2<Double,Integer>,Top2TempAcc>{@OverridepublicvoidemitValue(Top2TempAcc acc,Collector<Tuple2<Double,Integer>> out){
out.collect(newTuple2<>(acc.getHighestTemp(),1));
out.collect(newTuple2<>(acc.getSecondHighestTemp(),2));}@OverridepublicTop2TempAcccreateAccumulator(){returnnewTop2TempAcc();}//必须定义一个 accumulate 方法,参数列表必须为 (accumulator, data),返回值为 voidpublicvoidaccumulate(Top2TempAcc acc,Double temp){if(acc.getHighestTemp()< temp){
acc.setSecondHighestTemp(acc.getHighestTemp());
acc.setHighestTemp(temp);}elseif(acc.getSecondHighestTemp()< temp){
acc.setSecondHighestTemp(temp);}}}}
版权归原作者 文刀小桂 所有, 如有侵权,请联系我们删除。