0


Spark Streaming 计算窗口的理解

Spark Streaming Window

1.创建一个流

// 创建一个 SparkConf 对象,并设置应用的名字SparkConf conf =newSparkConf().setAppName("JavaSparkStreamingExample").setMaster("local[*]");// 创建一个 JavaStreamingContext 对象,它是所有 Spark Streaming 功能的入口点。JavaStreamingContext jssc =newJavaStreamingContext(conf,newDuration(1000));// 创建一个 DStream,它会连接到 hostname:port 的服务器JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost",9999);

2.窗口操作的工作原理

在Apache Spark Streaming中,窗口操作不是在第一次触发时仅创建一次,而是根据定义的滑动间隔定期创建和触发。每个窗口操作都会根据其配置的窗口长度和滑动间隔周期性地处理数据。这个过程是持续的,直到StreamingContext停止。

  • 窗口长度(Window Duration):这是窗口覆盖的时间范围,即在每次窗口操作中会处理多长时间内的数据。
  • 滑动间隔(Sliding Interval):这是窗口操作触发的频率。每隔一定的滑动间隔时间,窗口会向前“滑动”,并且处理在新窗口长度内的数据。

窗口的周期性创建和触发

每次窗口操作触发时,都会基于当前的时间点向前回溯窗口长度的时间范围,收集那段时间内的数据进行处理。这意味着:

  • 窗口是周期性创建的,每个滑动间隔结束时,都会基于那个时间点创建一个新的窗口。
  • 窗口操作是周期性触发的,每次窗口滑动时,都会对最近的窗口长度时间内的数据进行处理。

示例

假设有一个窗口长度为10分钟,滑动间隔为5分钟的窗口操作。这意味着:

  • 每5分钟,窗口操作会触发一次。
  • 每次触发时,它会处理过去10分钟内的数据。

这个过程从StreamingContext启动开始,每隔5分钟重复一次,直到StreamingContext停止。

结论

当定义一个窗口操作时,Spark会根据指定的窗口长度和滑动间隔来安排这个操作。从流处理作业开始的那一刻起,Spark就会开始跟踪时间,以决定何时触发窗口操作。这个过程可以分为以下几个步骤:

  1. 定义窗口操作:当您定义一个窗口操作时,您需要指定窗口长度和滑动间隔。窗口长度决定了要处理的数据的时间范围,而滑动间隔决定了窗口操作触发的频率。
  2. 启动流处理作业:一旦流处理作业(StreamingContext)启动,Spark Streaming就会开始监控时间,并根据定义的窗口操作来安排数据处理。
  3. 触发窗口操作:在第一次达到滑动间隔时间后,Spark会触发窗口操作,处理过去窗口长度时间内的数据。例如,如果窗口长度为10分钟,滑动间隔为5分钟,则每隔5分钟,Spark会处理最近10分钟内的数据。
  4. 周期性触发:在第一次触发之后,Spark会继续每隔定义的滑动间隔时间触发窗口操作。每次触发时,它都会处理当前窗口长度内的数据。
  5. 计时和状态维护:Spark内部会维护一个计时器和状态信息,以确保每个窗口操作都能在正确的时间触发。这意味着Spark确实会在第一次触发窗口操作后“记录下有这么一个窗口操作”,并根据滑动间隔来“计时等待”直到下一次触发。

这种机制确保了窗口操作可以按照预定的频率周期性地处理数据,无论数据是连续到达还是有间断。这对于处理实时数据流非常重要,因为它允许开发者基于时间窗口来分析数据,无论数据的到达模式如何。

3.流式窗口计算示例

cacheRdd.window(Durations.seconds(10),Durations.seconds(5))

在 Spark Streaming 中,窗口操作允许你对最近的数据进行操作。窗口有两个参数:窗口长度和滑动间隔。

  • 窗口长度:定义了一个窗口覆盖的数据的时间范围,例如,窗口长度为 10 秒,意味着窗口将包含最近 10 秒的数据。
  • 滑动间隔:定义了窗口操作执行的频率,例如,滑动间隔为 5 秒,意味着每 5 秒执行一次窗口操作。
  • 在 Spark Streaming 中,滑动间隔需要是批处理间隔的整数倍,但窗口长度并不需要是滑动间隔的整数倍。为了避免可能的问题,通常建议将窗口长度设为滑动间隔的整数倍,这样可以确保每个窗口都有相同的数据点数。

当如果窗口大小不是滑动间隔的整数倍时:

cacheRdd.window(Durations.seconds(300+10),Durations.seconds(300))

窗口大小(Window Length):Durations.seconds(300 + 10)。窗口大小就是300 + 10 = 310秒。这意味着每个窗口将覆盖310秒(即5分钟零10秒)的数据。

滑动间隔(Slide Interval):Durations.seconds(300)。这里的滑动间隔是300秒,即5分钟。滑动间隔定义了窗口操作执行的频率。在这个例子中,每5分钟,窗口操作会被执行一次。

窗口大小为310秒,滑动间隔为300秒。这意味着每5分钟执行一次窗口操作,但每次处理的数据覆盖的时间范围是5分钟零10秒。这里给出一个具体的时间节点示例来解释这个过程。

示例

假设数据流开始时间是00:00(午夜)
理论情况如下

  1. 第一个窗口:00:00 - 00:05:10- 窗口开始时间:00:00- 窗口结束时间:00:05:10- 在00:05时刻,窗口操作执行,处理的是从00:00到00:05:10的数据。
  2. 第二个窗口:00:05 - 00:10:10- 窗口开始时间:00:05- 窗口结束时间:00:10:10- 在00:10时刻,窗口操作执行,处理的是从00:05到00:10:10的数据。
  3. 第三个窗口:00:10 - 00:15:10- 窗口开始时间:00:10- 窗口结束时间:00:15:10- 在00:15时刻,窗口操作执行,处理的是从00:10到00:15:10的数据。

以此类推,每个后续窗口都会向前滑动300秒(5分钟),但每次都处理310秒(5分钟零10秒)的数据。

关键点

  • 数据重叠:由于窗口大小(310秒)比滑动间隔(300秒)大,所以每个窗口与前一个窗口有10秒的数据重叠。这意味着某些数据会被重复处理,这在某些情况下是有意义的,比如确保数据处理的完整性和连续性。
  • 实时处理与延迟:虽然处理是实时进行的,但由于窗口的大小,每次处理都会稍微延后10秒开始。这种延迟对于确保数据完整性可能是可接受的,特别是在需要覆盖每一条数据的场景中。

实际窗口操作的触发时刻

在Spark Streaming中,窗口操作的触发时刻是基于滑动间隔的。这意味着,尽管窗口结束时间是00:05:10,实际上窗口操作(即数据的处理计算)是在窗口的结束时间之后立即触发的。因此,对于您的示例:

  • 窗口开始时间:00:00
  • 窗口结束时间:00:05:10

窗口操作(处理00:00到00:05:10之间的数据)实际上是在00:05:10这一时刻之后立即开始的。这是因为系统需要等到窗口内所有数据都到达并且窗口关闭之后,才能开始处理这段时间内的数据。

  • 下一个窗口
  • 窗口开始时间:00:05
  • 窗口结束时间:00:10:10

执行时间:00:10:10

理解执行时间

  • 理论上的执行时间:从理论上讲,可以认为窗口操作的执行是在窗口结束的那一刻(00:05:10)触发的。但实际上,处理的开始可能会因为系统调度、资源可用性等因素有微小的延迟。
  • 实际执行时间:实际上,处理开始的确切时间可能会稍稍晚于00:05:10,这取决于系统的当前负载、资源调度等因素。但从概念上讲,我们认为处理是在窗口结束时触发的。

4.Spark Streaming window的缺陷

时间概念

  • 1)、事件时间【EventTime】,表示数据本身产生的时间,该字段在【数据本身】中;
  • 2)、注入时间【IngestionTime】,表示数据到达流式系统时间,简而言之就是流式处理系统接收到数据的时间;
  • 3)、处理时间【ProcessingTime】,表示数据被流式系统真正开始计算操作的时间。

Spark Streaming只能基于注入时间, 当数据延迟到达时计算结果可能存在偏差


本文转载自: https://blog.csdn.net/qq_38174815/article/details/136919404
版权归原作者 橙子不吃葱 所有, 如有侵权,请联系我们删除。

“Spark Streaming 计算窗口的理解”的评论:

还没有评论