0


Flink 中Window Functions

窗口函数就是对一个窗口内的数据的操作处理。Flink的窗口函数分为两类:

  • 窗口聚合函数:ReduceFunction和AggregateFunction,来一条聚合一条,只在窗口关闭时才会输出
  • 全窗口处理函数:ProcessWindowFunction,来一条保存一条,只有在窗口关闭的时候才聚合或者处理,输出结果在这里插入图片描述

ReduceFuntion

注意:

  • 窗口的reduce() 时WindowedStream的聚合函数,而不是KeyedStream的reduce()聚合函数。
  • Flink可以使用ReduceFunction增量地聚合窗口的元素。
package 复习
importorg.apache.flink.api.common.functions.ReduceFunction
importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment
importorg.apache.flink.api.scala._
importorg.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
importorg.apache.flink.streaming.api.windowing.time.Time

/**
 * ReduceFunction
 * 需求:
 * 获取某天某省---每10S新增最大记录值
    2022-5-18 beijing 2
    2022-5-18 beijing 3
    2022-5-18 shanghai 6
 */object Window_ReduceFunction {def main(args: Array[String]):Unit={val env = StreamExecutionEnvironment.getExecutionEnvironment
    val ds = env.socketTextStream("localhost",6666).map(x =>{val fields: Array[String]= x.split(" ")val date = fields(0).trim
        val province = fields(1)val ts = fields(2).trim.toInt
        (date +"_"+ province, ts)}).keyBy(_._1).window(TumblingProcessingTimeWindows.of(Time.seconds(10))).reduce(new ReduceFunction[(String,Int)]{overridedef reduce(t:(String,Int), t1:(String,Int)):(String,Int)={if(t._2>t1._2){
            t
          }else{
            t1
          }}}).print()

    env.execute("test")}}

AggregateFunction

注意:

  • AggregateFunction比ReduceFunction更加灵活。是reduceFunction的一种特殊实现。 - AggregateFunction<泛型1,泛型2,泛型3>,泛型1 是输入类型,泛型2是累加器结果类型,泛型3是输出类型。- 需要重写四个方法:累加器的初始化,聚合逻辑,获取结果,会话窗口的合并窗口
  • Flink也可以使用AggregateFunction增量地聚合窗口的元素
package 复习
importorg.apache.flink.api.common.functions.AggregateFunction
importorg.apache.flink.api.scala._
importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment
importorg.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
importorg.apache.flink.streaming.api.windowing.time.Time

/**
 * AggregateFunction
 *
 * 窗口增量聚合函数:
 * 需求:
 * 获取某天某省---每10S平均新增记录
    2022-5-18 beijing 2
    2022-5-18 beijing 3
    2022-5-18 shanghai 6
 *
 */object Window_AggregateFunction {def main(args: Array[String]):Unit={val env = StreamExecutionEnvironment.getExecutionEnvironment
  val ds = env.socketTextStream("localhost",6666).map(x =>{val fields: Array[String]= x.split(" ")val date = fields(0).trim
      val province = fields(1)val ts = fields(2).trim.toInt
      (date +"_"+ province, ts)}).keyBy(_._1).window(TumblingProcessingTimeWindows.of(Time.seconds(10)))//    输入类型,累加器,输出类型.aggregate(new AggregateFunction[(String,Int),(String,Int,Int),(String,Int)]{//      初始化overridedef createAccumulator():(String,Int,Int)=("",0,0)//     单个增加overridedef add(in:(String,Int), acc:(String,Int,Int)):(String,Int,Int)={val cnt = acc._2+1val adds = acc._3+in._2
        (in._1,cnt,adds)}//    多个分区(窗口)进行合并overridedef merge(acc:(String,Int,Int), acc1:(String,Int,Int)):(String,Int,Int)={val mergecnt = acc._2+acc1._2
        val mergeadd = acc._3+acc1._3
        (acc._1,mergecnt,mergeadd)}//      获取结果    这里的string 我将个数和sum全部输出出来方便查看overridedef getResult(acc:(String,Int,Int)):(String,Int)={(acc._1+"_"+acc._2+"_"+acc._3,acc._3 / acc._2)}}).print()

    env.execute("test 08")}}

ProcessWIndowFunction

注意:

  • 全窗口函数-process(new ProcessWindowFunction)获取一个包含窗口所有元素的iterable,以及一个可以访问时间和状态信息的Context对象,这使得它比其他窗口函数提供了更多的灵活性。
  • ProcessWindowFunction以性能和资源消耗为代价的,因为元素不能增量地聚合,而是需要在内部缓冲,直到窗口逻辑可以处理为止。窗口比较大,或者数据量比较大,不建议使用,会占用更多的内存
package 复习
importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment
importorg.apache.flink.api.scala._
importorg.apache.flink.streaming.api.scala.function.ProcessWindowFunction
importorg.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
importorg.apache.flink.streaming.api.windowing.time.Time
importorg.apache.flink.streaming.api.windowing.windows.TimeWindow
importorg.apache.flink.util.Collector

/**
 * AggregateFunction
 *
 * 窗口增量聚合函数:
 * 需求:
 * 获取某天某省---每10S平均新增记录
    2022-5-18 beijing 2
    2022-5-18 beijing 3
    2022-5-18 shanghai 6
 *
 */object Window_ProcessWindowFunction {def main(args: Array[String]):Unit={val env = StreamExecutionEnvironment.getExecutionEnvironment
    val ds = env.socketTextStream("localhost",6666).map(x =>{val fields: Array[String]= x.split(" ")val date = fields(0).trim
        val province = fields(1)val ts = fields(2).trim.toInt
        (date +"_"+ province, ts)}).keyBy(_._1).window(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(new ProcessWindowFunction[(String,Int),(String,Int),String,TimeWindow]{overridedef process(key:String, context: Context, elements: Iterable[(String,Int)], out: Collector[(String,Int)]):Unit={var cnt =0var totalAdds =0
          elements.foreach(x =>{
            cnt+=1
            totalAdds = x._2+totalAdds
          })//          这里也是将计数和sum放在了string里。
          out.collect((key+"_"+cnt+"_"+totalAdds),totalAdds/cnt)}}).print()

    env.execute("test 09")}}

窗口增量聚合处理函数

ProcessWindowFunction可以与ReduceFunction或AggregateFunction结合在一起,在元素到达窗口时进行增量。当窗口关闭时,ProcessWindowFunction将提供聚合的结果,这允许它在访问ProcessWindowFunction的附加窗口元信息的同时,递增地计算窗口。你也可以使用旧的WindowFunction而不是ProcessWindowFunction来增加窗口聚合。

使用ReduceFunction与WindowFunction组合,返回窗口中最大的事件以及窗口的开始时间

package 复习
importorg.apache.flink.api.java.tuple.Tuple
importorg.apache.flink.streaming.api.scala.StreamExecutionEnvironment
importorg.apache.flink.api.scala._
importorg.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
importorg.apache.flink.streaming.api.windowing.time.Time
importorg.apache.flink.streaming.api.windowing.windows.TimeWindow
importorg.apache.flink.util.Collector

object Window_ReduceFunction_And_Process {def main(args: Array[String]):Unit={val env = StreamExecutionEnvironment.getExecutionEnvironment
    val ds = env.socketTextStream("localhost",6666).map(x =>{val fields: Array[String]= x.split(" ")val date = fields(0).trim
        val province = fields(1)val ts = fields(2).trim.toInt
        (date +"_"+ province, ts)}).keyBy(_._1).window(TumblingProcessingTimeWindows.of(Time.seconds(10))).reduce((r1:(String,Int), r2:(String,Int))=>{if(r1._2 > r2._2){
          r1
        }else{
          r2
        }},(key:String,
         window: TimeWindow,
         maxReadings: Iterable[(String,Int)],
         out: Collector[String])=>{val max = maxReadings.iterator.next()
          out.collect(window.getStart +","+ max._1 +","+ max._2)}).print()

    env.execute("test 1")}}

Reduce&Aggregate&Process窗口函数区别

相同

  • 针对窗口里面的数据做聚合或者其他处理

不同

  • Reduce:聚合,通用性较强,可以使用多个function包括ReduceFunction,AggregateFunction,FoldFunction or ProcessWindowFunction 支持增量计算
  • Aggregate:是一种Reduce的通用特例,多用于累加;支持增量计算。
  • Process:比Redeuce和Aggregate更灵活,除聚合外可以做更多操作处理;不支持增量(即全量),窗口较大时不建议

ProcessWIndowFunction和WIndowFunction的区别

ProcessWindowFunction

  • 可以访问Keyed State(就像任何富函数一样,你们想象富函数是否能访问状态)。
  • ProcessWindowFunction还可以将当前窗口外的Keyed State使用到当前正在处理的窗口函数中。换句话就是不同窗口数据调用。
  • 该函数的process()调用接收到Context对象上的两个方法,它们允许访问两种类型的状态:- globalState():它允许访问执行窗口之外的Keyed State。- windowState():它允许访问执行窗口域内的Keyed State。
  • 当使用窗口状态时,清除窗口状态也很重要。这应该在clear()方法中发生。

WindowFunction

  • 使用WindowFunction的地方,你都能使用ProcessWindowFunction。
  • WindowFunction是ProcessWindowFunction的一个旧版本,它提供了较少的上下文信息,并且没有一些高级特性,比如每个窗口的Keyed State。
  • 这个接口将在某个时候被弃用。
标签: flink 大数据

本文转载自: https://blog.csdn.net/TB_CYX/article/details/136066243
版权归原作者 小小鱼爱疯狂 所有, 如有侵权,请联系我们删除。

“Flink 中Window Functions”的评论:

还没有评论