0


【大数据】带你理解并使用flink中的Time、Window(窗口)、Windows Function(窗口函数)

提醒:本文的示例代码基于flink1.13,在讲window的使用时也会说明flink版本一些api的弃用情况。

文章目录

一、Time的简介

flink 中的streaming定义了多种流式处理的时间,Event Time(事件时间)、Ingestion Time(接收时间)、Processing Time(处理时间)。

Event Time:指事件产生的时间,比如业务数据库一条数据产生的时间、一条日志数据产生的时间等。

Ingestion Time:指flink接收数据的时间。

Processing Time:指数据被flink算子处理的时间。

在真实的业务代码开发中,我们常使用Event TIme、Processing Time。

二、Window的概念

flink中,streaming流式计算被设计为用于处理无限数据集的数据处理引擎,其中无限数据集是指一种源源不断有数据过来的数据集,window (窗口)将无界数据流切割成为有界数据流进行处理的方式。实现方式是将流分发到有限大小的桶(bucket)中进行分析。

在这里插入图片描述

三、Window的类型

1、分类关系
  • TimeWindow(计时窗口),按照一定时间生成 Window(比如:每5秒
  • CountWindow(计数窗口),按照指定的数据量生成一个 Window,与时间无关(比如:每20个元素)。
  • TumblingWindow(滚动窗口)
  • Sliding Window(滑动窗口)
  • Session Window(会话窗口)

它们之间的关系图

在这里插入图片描述

2、滚动窗口(Tumbling Windows)

将数据依据固定的窗口长度对数据进行切片,它的特点是时间对齐,窗口长度固定,没有重叠。例如:如果你指定了一个 5分钟大小的滚动窗口,窗口的创建如下图所示:

在这里插入图片描述
适合做每个时间段的聚合计算,例如:统计每5分钟内用户的热搜词

3、滑动窗口(Sliding Windows)

滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度和滑动间隔组成,特点为时间对齐,窗口长度固定,可以有重叠。例如:你有 10 分钟的窗口和 5 分钟的滑动,那么每个窗口中 5 分钟的窗口里包含着上个 10 分钟产生的数据,如下图所示:

在这里插入图片描述
适用于对最近一个时间段内的统计,例如:求某接口最近 5min 的失败率来决定是否要报警这种场景。

4、会话窗口(Session Windows)

由一系列事件组合一个指定时间长度的 timeout 间隙组成,类似于 web 应用的session,也就是一段时间没有接收到新数据就会生成新的窗口。他的特点为:窗口大小是由数据本身决定,它没有固定的开始和结束时间。会话窗口根据Session gap间隙切分不同的窗口,当一个窗口在大于Session gap间隙的时间内没有接收到新数据时,窗口将关闭。例如:设置的时间gap是5秒,那么,当相邻的记录相差>=5秒时,则触发窗口。

在这里插入图片描述
适用于每个用户在一个独立的session中平均页面访问时长,前后两个session的间隔时间为15分钟这种场景。

四、windows 的使用

window算子api的使用分有key的、无key的,它们api分别的写法如下:

Keyed Windows
在这里插入图片描述
Non-Keyed Windows
在这里插入图片描述
我们下面按照上面的window分类关系去讲解api的使用,且举得例子都是Keyed Windows的,可以类比使用对应api理解Non-Keyed Windows。

1、Time Window

在time Window中,我们经常会在flink的老版本中使用timeWindow,如下图:
在这里插入图片描述
在这里插入图片描述
输入一个Time.seconds(n)是滚动窗口,输入两个是滑动窗口。特别注意,这里默认使用的Time是Processing Time。

在flink1.13中方法已经过时,能用但不建议使用。请使用原生window进行使用,如下图:
在这里插入图片描述
在window的参数中指定使用的窗口类型、时间类型,这个示例可以看出使用的是滚动窗口,时间类型为Processing Time

2、Count Window

Count Window中我们常在flink中使用
countWindow(高版本中没有过时),传入一个参数就是滚动窗口,如果传入两个参数就是滑动窗口,如下图:
在这里插入图片描述
在这里插入图片描述

3、自定义Window

如下图Keyed Windows、Non-Keyed Windows两种使用自定义的接口, 可以自己定义trigger(触发器)、evictor(移除器)、allowedLateness(允许窗口延时)、sideOutputLateData(侧输出流)等,自定义window。
在这里插入图片描述

4、示例
4.1 滚动窗口示例

需求:每隔5s时间,统计最近5s出现的单词

代码:

  1. import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
  2. import org.apache.flink.streaming.api.windowing.time.Time
  3. import org.apache.flink.api.scala._
  4. import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
  5. /**
  6. * todo: 基于计时的滚动窗口应用--Tumbling Windows
  7. *
  8. * 滚动窗口,每隔5s时间,统计最近5s出现的单词
  9. */
  10. object TestTimeWindowByTumbling {
  11. def main(args: Array[String]): Unit = {
  12. //todo:1、获取流式处理的环境
  13. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  14. //todo:2、获取数据源
  15. val socketTextStream: DataStream[String] = env.socketTextStream("companynode01",19999)
  16. //todo: 3、对数据进行操作处理
  17. socketTextStream.flatMap(x=>x.split(" "))
  18. .map(x=>(x, 1))
  19. .keyBy(_._1)
  20. // .timeWindow(Time.seconds(5)) //过时
  21. // 滚动窗口
  22. .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  23. .sum(1)
  24. .print()
  25. //todo: 4、启动
  26. env.execute("TestTimeWindowByTumbling")
  27. }
  28. }
4.2 滑动窗口示例

需求:每隔5s时间,统计最近10s出现的单词

代码:

  1. import org.apache.flink.api.scala._
  2. import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
  3. import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows
  4. import org.apache.flink.streaming.api.windowing.time.Time
  5. /**
  6. * todo: 基于计时的滚动窗口应用--Sliding Windows
  7. *
  8. * 滑动窗口,每隔5s时间,统计最近10s出现的单词
  9. */
  10. object TestTimeWindowBySliding {
  11. def main(args: Array[String]): Unit = {
  12. //todo:1、获取流式处理的环境
  13. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  14. //todo:2、获取数据源
  15. val socketTextStream: DataStream[String] = env.socketTextStream("companynode01",19999)
  16. //todo: 3、对数据进行操作处理
  17. socketTextStream.flatMap(x=>x.split(" "))
  18. .map(x=>(x, 1))
  19. .keyBy(_._1)
  20. // .timeWindow(Time.seconds(15),Time.seconds(5)) //过时
  21. //滑动窗口
  22. .window(SlidingProcessingTimeWindows.of(Time.seconds(10),Time.seconds(5)))
  23. .sum(1)
  24. .print()
  25. //todo: 4、启动
  26. env.execute("TestTimeWindowBySliding")
  27. }
  28. }

五、window Function(窗口函数)

1、分类

窗口函数定义了要对窗口中收集的数据做的计算操作。主要可以分为两类:

  • 增量聚合函数(incremental aggregation functions),它在每条数据到来就进行计算,保持一个简单的状态。典型的增量聚合函数有ReduceFunction、AggregateFunction等。
  • 全量窗口函数(full window functions),它先把窗口所有数据收集起来,等到计算的时候会遍历所有数据。典型的全量窗口聚合函数有apply、process。
2、增量聚合统计

窗口当中每加入一条数据,就进行一次统计。常用的增量聚合算子有reduce(reduceFunction)、aggregate(aggregateFunction)sum()、min()、max()等。

在这里插入图片描述

示例:

需求:通过接收socket当中输入的单词,统计每5秒钟单词的累计数量

使用reduce的代码

  1. import org.apache.flink.streaming.api.windowing.time.Time
  2. import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows, TumblingProcessingTimeWindows}
  3. //import org.apache.flink.api.java.functions.KeySelector
  4. import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
  5. import org.apache.flink.api.scala._
  6. /**
  7. * 增量聚合函数
  8. * 通过接收socket当中输入的数据,统计每5秒钟数据的累计的值
  9. * 基于 `reduce` 函数的计时窗口数据增量聚合
  10. */
  11. object TestReduceOfTimeWindow {
  12. def main(args: Array[String]): Unit = {
  13. val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  14. val socketTextStream: DataStream[String] = env.socketTextStream("companynode01", 19999)
  15. socketTextStream.flatMap(x => x.split(" "))
  16. .map(x=>(x, 1))
  17. // .keyBy(0)
  18. .keyBy(_._1)
  19. .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  20. .reduce((c1, c2) => (c1._1, c1._2+c2._2))
  21. .print()
  22. env.execute()
  23. }
  24. }

使用aggregate的代码

  1. import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
  2. import org.apache.flink.api.scala._
  3. import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
  4. import org.apache.flink.streaming.api.windowing.time.Time
  5. import org.apache.flink.api.common.functions.AggregateFunction
  6. /**
  7. * 增量聚合函数
  8. * 基于`aggregate`函数的计时窗口数据增量聚合
  9. */
  10. object TestAggregateOfTimeWindow {
  11. def main(args: Array[String]): Unit = {
  12. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  13. val socketTextStream: DataStream[String] = env.socketTextStream("companynode01", 9999)
  14. socketTextStream.flatMap(x => x.split(" "))
  15. .map(x => (x, 1))
  16. .keyBy(_._1)
  17. .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  18. .aggregate(new MyAggregateFunction)
  19. .print()
  20. env.execute("TestAggregateOfTimeWindow")
  21. }
  22. }
  23. //自定义AggregateFunction函数
  24. class MyAggregateFunction extends AggregateFunction[(String, Int), (String, Int), (String, Int)] {
  25. var initAccumulator: (String, Int) = ("", 0)
  26. //累加值的初始化操作
  27. override def createAccumulator(): (String, Int) = {
  28. initAccumulator
  29. }
  30. //累加元素
  31. override def add(in: (String, Int), acc: (String, Int)): (String, Int) = {
  32. (in._1, acc._2 + in._2)
  33. }
  34. // 聚合的结果
  35. override def getResult(acc: (String, Int)): (String, Int) = {
  36. acc
  37. }
  38. //分布式累加
  39. override def merge(acc: (String, Int), acc1: (String, Int)): (String, Int) = {
  40. (acc._1, acc._2 + acc1._2)
  41. }
  42. }
3、全量聚合统计

等到窗口截止,或者窗口内的数据全部到齐,然后再进行统计。

常用的增量聚合算子有apply(windowFunction)、process(processWindowFunction)其中processWindowFunction 比 windowFunction 提供了更多的上下文信息。

示例:

需求:通过接收socket当中输入的数值,统计5秒钟输入数值的平均值。

使用apply函数实现的代码:

  1. import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
  2. import org.apache.flink.api.scala._
  3. import org.apache.flink.streaming.api.scala.function.WindowFunction
  4. import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
  5. import org.apache.flink.streaming.api.windowing.time.Time
  6. import org.apache.flink.streaming.api.windowing.windows.TimeWindow
  7. import org.apache.flink.util.Collector
  8. object TestApplyOfTimeWindow {
  9. def main(args: Array[String]): Unit = {
  10. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  11. //2、获取数据源,例如输入如下数据源
  12. /**
  13. * 1
  14. * 2
  15. * 3
  16. * 4
  17. * 5
  18. * 6
  19. */
  20. val socketStream: DataStream[String] = env.socketTextStream("companynode01", 19999)
  21. socketStream.flatMap(x => x.split(" "))
  22. .map(x => ("countAvg", x.toInt))
  23. // .keyBy(0)
  24. //keyBy中的key是个虚拟key,不会输出
  25. .keyBy(_._1)
  26. .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  27. .apply(new MyApplyWindowFunction)
  28. .print()
  29. env.execute("TestApplyOfTimeWindow")
  30. }
  31. }
  32. class MyApplyWindowFunction extends WindowFunction[(String, Int), Double, String, TimeWindow]{
  33. override def apply(key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[Double]): Unit = {
  34. //记次数
  35. var totalNum = 0
  36. //记累加结果
  37. var countNum = 0
  38. for(elem <- input) {
  39. totalNum += 1
  40. countNum += elem._2
  41. }
  42. out.collect(countNum/totalNum)
  43. }
  44. }

使用process函数实现的代码:

  1. import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
  2. import org.apache.flink.api.scala._
  3. import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
  4. import org.apache.flink.streaming.api.windowing.assigners.{TumblingEventTimeWindows, TumblingProcessingTimeWindows}
  5. import org.apache.flink.streaming.api.windowing.time.Time
  6. import org.apache.flink.streaming.api.windowing.windows.TimeWindow
  7. import org.apache.flink.util.Collector
  8. /**
  9. * 基于 `process` 函数的计时窗口数据全量聚合
  10. */
  11. object TestProcessOfTimeWindow {
  12. def main(args: Array[String]): Unit = {
  13. val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  14. val socketStream: DataStream[String] = env.socketTextStream("companynode01", 19999)
  15. socketStream.flatMap(x => x.split(" "))
  16. .map(x => ("countAvg", x.toInt))
  17. .keyBy(x => x._1)
  18. //是使用的processingTime,接口已经过时
  19. // .timeWindow(Time.seconds(5))
  20. .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  21. .process(new MyProcessWindowFunction)
  22. .print()
  23. env.execute("TestProcessOfTimeWindow")
  24. }
  25. }
  26. class MyProcessWindowFunction extends ProcessWindowFunction[(String, Int), Double, String, TimeWindow] {
  27. override def process(key: String, context: Context,
  28. elements: Iterable[(String, Int)],
  29. out: Collector[Double]): Unit = {
  30. //计次数
  31. var totalNum = 0
  32. //计累加次数
  33. var countNum = 0
  34. for (elem <- elements) {
  35. totalNum += 1
  36. countNum += elem._2
  37. }
  38. //计算平均值, totalNum.asInstanceOf[Double]是将totalNum强转成Double类型
  39. out.collect(countNum/totalNum.asInstanceOf[Double])
  40. }
  41. }
标签: flink time windows

本文转载自: https://blog.csdn.net/Chenftli/article/details/124164139
版权归原作者 橙子园 所有, 如有侵权,请联系我们删除。

“【大数据】带你理解并使用flink中的Time、Window(窗口)、Windows Function(窗口函数)”的评论:

还没有评论