窗口函数就是对一个窗口内的数据的操作处理。Flink的窗口函数分为两类:
- 窗口聚合函数:ReduceFunction和AggregateFunction,来一条聚合一条,只在窗口关闭时才会输出
- 全窗口处理函数:ProcessWindowFunction,来一条保存一条,只有在窗口关闭的时候才聚合或者处理,输出结果
ReduceFuntion
注意:
- 窗口的reduce() 时WindowedStream的聚合函数,而不是KeyedStream的reduce()聚合函数。
- Flink可以使用ReduceFunction增量地聚合窗口的元素。
package 复习
importorg.apache.flink.api.common.functions.ReduceFunction
importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment
importorg.apache.flink.api.scala._
importorg.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
importorg.apache.flink.streaming.api.windowing.time.Time
/**
* ReduceFunction
* 需求:
* 获取某天某省---每10S新增最大记录值
2022-5-18 beijing 2
2022-5-18 beijing 3
2022-5-18 shanghai 6
*/object Window_ReduceFunction {def main(args: Array[String]):Unit={val env = StreamExecutionEnvironment.getExecutionEnvironment
val ds = env.socketTextStream("localhost",6666).map(x =>{val fields: Array[String]= x.split(" ")val date = fields(0).trim
val province = fields(1)val ts = fields(2).trim.toInt
(date +"_"+ province, ts)}).keyBy(_._1).window(TumblingProcessingTimeWindows.of(Time.seconds(10))).reduce(new ReduceFunction[(String,Int)]{overridedef reduce(t:(String,Int), t1:(String,Int)):(String,Int)={if(t._2>t1._2){
t
}else{
t1
}}}).print()
env.execute("test")}}
AggregateFunction
注意:
- AggregateFunction比ReduceFunction更加灵活。是reduceFunction的一种特殊实现。 - AggregateFunction<泛型1,泛型2,泛型3>,泛型1 是输入类型,泛型2是累加器结果类型,泛型3是输出类型。- 需要重写四个方法:累加器的初始化,聚合逻辑,获取结果,会话窗口的合并窗口
- Flink也可以使用AggregateFunction增量地聚合窗口的元素
package 复习
importorg.apache.flink.api.common.functions.AggregateFunction
importorg.apache.flink.api.scala._
importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment
importorg.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
importorg.apache.flink.streaming.api.windowing.time.Time
/**
* AggregateFunction
*
* 窗口增量聚合函数:
* 需求:
* 获取某天某省---每10S平均新增记录
2022-5-18 beijing 2
2022-5-18 beijing 3
2022-5-18 shanghai 6
*
*/object Window_AggregateFunction {def main(args: Array[String]):Unit={val env = StreamExecutionEnvironment.getExecutionEnvironment
val ds = env.socketTextStream("localhost",6666).map(x =>{val fields: Array[String]= x.split(" ")val date = fields(0).trim
val province = fields(1)val ts = fields(2).trim.toInt
(date +"_"+ province, ts)}).keyBy(_._1).window(TumblingProcessingTimeWindows.of(Time.seconds(10)))// 输入类型,累加器,输出类型.aggregate(new AggregateFunction[(String,Int),(String,Int,Int),(String,Int)]{// 初始化overridedef createAccumulator():(String,Int,Int)=("",0,0)// 单个增加overridedef add(in:(String,Int), acc:(String,Int,Int)):(String,Int,Int)={val cnt = acc._2+1val adds = acc._3+in._2
(in._1,cnt,adds)}// 多个分区(窗口)进行合并overridedef merge(acc:(String,Int,Int), acc1:(String,Int,Int)):(String,Int,Int)={val mergecnt = acc._2+acc1._2
val mergeadd = acc._3+acc1._3
(acc._1,mergecnt,mergeadd)}// 获取结果 这里的string 我将个数和sum全部输出出来方便查看overridedef getResult(acc:(String,Int,Int)):(String,Int)={(acc._1+"_"+acc._2+"_"+acc._3,acc._3 / acc._2)}}).print()
env.execute("test 08")}}
ProcessWIndowFunction
注意:
- 全窗口函数-process(new ProcessWindowFunction)获取一个包含窗口所有元素的iterable,以及一个可以访问时间和状态信息的Context对象,这使得它比其他窗口函数提供了更多的灵活性。
- ProcessWindowFunction以性能和资源消耗为代价的,因为元素不能增量地聚合,而是需要在内部缓冲,直到窗口逻辑可以处理为止。窗口比较大,或者数据量比较大,不建议使用,会占用更多的内存
package 复习
importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment
importorg.apache.flink.api.scala._
importorg.apache.flink.streaming.api.scala.function.ProcessWindowFunction
importorg.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
importorg.apache.flink.streaming.api.windowing.time.Time
importorg.apache.flink.streaming.api.windowing.windows.TimeWindow
importorg.apache.flink.util.Collector
/**
* AggregateFunction
*
* 窗口增量聚合函数:
* 需求:
* 获取某天某省---每10S平均新增记录
2022-5-18 beijing 2
2022-5-18 beijing 3
2022-5-18 shanghai 6
*
*/object Window_ProcessWindowFunction {def main(args: Array[String]):Unit={val env = StreamExecutionEnvironment.getExecutionEnvironment
val ds = env.socketTextStream("localhost",6666).map(x =>{val fields: Array[String]= x.split(" ")val date = fields(0).trim
val province = fields(1)val ts = fields(2).trim.toInt
(date +"_"+ province, ts)}).keyBy(_._1).window(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction[(String,Int),(String,Int),String,TimeWindow]{overridedef process(key:String, context: Context, elements: Iterable[(String,Int)], out: Collector[(String,Int)]):Unit={var cnt =0var totalAdds =0
elements.foreach(x =>{
cnt+=1
totalAdds = x._2+totalAdds
})// 这里也是将计数和sum放在了string里。
out.collect((key+"_"+cnt+"_"+totalAdds),totalAdds/cnt)}}).print()
env.execute("test 09")}}
窗口增量聚合处理函数
ProcessWindowFunction可以与ReduceFunction或AggregateFunction结合在一起,在元素到达窗口时进行增量。当窗口关闭时,ProcessWindowFunction将提供聚合的结果,这允许它在访问ProcessWindowFunction的附加窗口元信息的同时,递增地计算窗口。你也可以使用旧的WindowFunction而不是ProcessWindowFunction来增加窗口聚合。
使用ReduceFunction与WindowFunction组合,返回窗口中最大的事件以及窗口的开始时间
package 复习
importorg.apache.flink.api.java.tuple.Tuple
importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment
importorg.apache.flink.api.scala._
importorg.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
importorg.apache.flink.streaming.api.windowing.time.Time
importorg.apache.flink.streaming.api.windowing.windows.TimeWindow
importorg.apache.flink.util.Collector
object Window_ReduceFunction_And_Process {def main(args: Array[String]):Unit={val env = StreamExecutionEnvironment.getExecutionEnvironment
val ds = env.socketTextStream("localhost",6666).map(x =>{val fields: Array[String]= x.split(" ")val date = fields(0).trim
val province = fields(1)val ts = fields(2).trim.toInt
(date +"_"+ province, ts)}).keyBy(_._1).window(TumblingProcessingTimeWindows.of(Time.seconds(10))).reduce((r1:(String,Int), r2:(String,Int))=>{if(r1._2 > r2._2){
r1
}else{
r2
}},(key:String,
window: TimeWindow,
maxReadings: Iterable[(String,Int)],
out: Collector[String])=>{val max = maxReadings.iterator.next()
out.collect(window.getStart +","+ max._1 +","+ max._2)}).print()
env.execute("test 1")}}
Reduce&Aggregate&Process窗口函数区别
相同
- 针对窗口里面的数据做聚合或者其他处理
不同
- Reduce:聚合,通用性较强,可以使用多个function包括ReduceFunction,AggregateFunction,FoldFunction or ProcessWindowFunction 支持增量计算
- Aggregate:是一种Reduce的通用特例,多用于累加;支持增量计算。
- Process:比Redeuce和Aggregate更灵活,除聚合外可以做更多操作处理;不支持增量(即全量),窗口较大时不建议
ProcessWIndowFunction和WIndowFunction的区别
ProcessWindowFunction
- 可以访问Keyed State(就像任何富函数一样,你们想象富函数是否能访问状态)。
- ProcessWindowFunction还可以将当前窗口外的Keyed State使用到当前正在处理的窗口函数中。换句话就是不同窗口数据调用。
- 该函数的process()调用接收到Context对象上的两个方法,它们允许访问两种类型的状态:- globalState():它允许访问执行窗口之外的Keyed State。- windowState():它允许访问执行窗口域内的Keyed State。
- 当使用窗口状态时,清除窗口状态也很重要。这应该在clear()方法中发生。
WindowFunction
- 使用WindowFunction的地方,你都能使用ProcessWindowFunction。
- WindowFunction是ProcessWindowFunction的一个旧版本,它提供了较少的上下文信息,并且没有一些高级特性,比如每个窗口的Keyed State。
- 这个接口将在某个时候被弃用。
版权归原作者 小小鱼爱疯狂 所有, 如有侵权,请联系我们删除。