时间语义
如图所示,由事件生成器(Event Producer)生成事件,生成的事件数据被收集起来,首先进入分布式消息队列(Message Queue),然后被 Flink 系统中的 Source 算子(Data Source)读取消费,进而向下游的窗口算子(Window Operator)传递,最终由窗口算子进行计算处理。
有两个非常重要的时间点:
(1)一个是数据产生的时刻,我们把它叫作“事件时间”(Event Time);
(2)另一个是数据真正被Flink处理的时刻,叫作“处理时间”(Processing Time)。
我们所定义的窗口操作,到底是以哪种时间作为衡量标准,就是所谓的“时间语义”(Notions of Time)。由于分布式系统中存在网络传输延迟和时钟漂移,事件的处理时间相对发生时间会有所滞后。
事件时间(Event Time): 指每个事件在其产生设备上发生的时间。
处理时间(Processing Time): 是指对事件执行相应操作的机器的系统时间。
当流程序(a streaming program)基于**处理时间**(Processing Time)运行时,所有基于时间的操作(如时间窗口)将使用运行相应操作的机器的系统时钟。**每小时处理时间窗口**(An hourly processing time window)将包括在系统时钟指示整小时之间到达特定操作的所有记录。例如,如果应用程序在上午9:15开始运行,那么第一个小时处理时间窗口,也就是[9,10),将包括上午9:15到上午10:00之间处理的事件,下一个窗口,也就是[10,11)将包括上午10:00到上午11:00之间处理的事件,依此类推。无论事件是以有序的状态进入Flink事件窗口,还是无序的状态进入Flink事件窗口,窗口都将按照处理时间对已经到达的事件进行计算。
基于处理时间来处理数据的方式是有时间标准的,这个时间标准就是运行相应操作的机器的系统时钟。到什么时间就做什么事。
Processing time 提供了最佳的性能和最低的延迟,但是不能提供确定性,即计算结果是不确定的。例如,时间窗口为5min的求和统计,应用程序在 9:00 开始运行,则第一个时间窗口处理 [9:00, 9:05) 的事件,下一个窗口处理 [9:05, 9:10) 的事件,依此类推。通信延迟、作业故障重启等问题,可能导致窗口的计算结果是不一样的。如下图所示,假设事件(事件时间, 数值) 遇到上述问题,场景一:事件B(9:03,2)有网络延迟落在[9:10, 9:15),场景二:作业故障重启导致事件B(9:03,2)和事件C(9:06,3)落在[9:10, 9:15)。都会导致求和的结果不正确。
Processing Time是指数据被Operator处理时当前所在主机的系统时间。当用户选择使用Processing Time时,在Flink中所有和时间相关的操作都会按照当前系统时间进行处理,例如:Window窗口划分。使用这种语义时Flink中处理数据延迟较低、处理性能高,无论进入到Flink中的源头数据是否有乱序,只要被Flink应用接收的数据都会按照当前数据处理时的系统时间赋值时间语义,可见这种语义虽然处理数据性能高但不能解决数据乱序和延迟问题,从而导致数据统计不精准。Processing Time适合计算精度要求不高的计算场景。
既然使用Processing Time时间语义,计算的结果的正确性无法得到保证,那么使用Event Time语义呢?
Event Time是每个事件在其生成设备上发生的时间,这个时间往往是嵌入在事件记录中,例如一条数据中的时间戳记录了该事件数据的产生时间,该时间与下游Flink处理时系统时间无关。如果每个事件包含事件时间,当事件经过网络传输流转到Flink中处理时,理论上来说,先产生的事件会比后产生的事件先到达Flink系统中被处理,但实际情况往往由于网络传输延迟导致早先产生的事件后到达Flink系统被处理的情况(数据延迟到达),这就出现了数据乱序。但基于Event Time的时间概念,我们可以让Flink进行数据处理时基于事件产生的时间处理,这样就可以还原事件的先后关系,保证数据处理的准确性。Event Time 时间语义在实际生产环境中使用较多,该时间语义能保证乱序数据处理的准确性。
当Flink应用使用Event Time作为时间的衡量标准。窗口计算什么时候触发计算呢?假设事件是有序到达窗口的,窗口每隔1个小时计算一次,当Flink应用程序接收到的第一个事件的事件时间为9:15,程序则认为现在的时间是9:15,可能机器的系统时间已经为10:00,但这并不重要,因为Flink程序已经使用事件时间语义。此时[9,10)窗口不会触发计算,Flink应用继续接收事件,当接收到事件时间为10:00或者之后的事件,则Flink应用程序认为现在时间已经到了10:00或者超过10:00点了,应该触发[9,10)窗口计算。此时就可以使用事件时间作为时间标记来触发窗口计算。
那当事件因为延迟等原因无序到达窗口呢?比如此时10:00的事件已经到达窗口,是否要触发[9,10)窗口计算呢?还不可以,因为此时可能还有10:00之前的事件尚未到达,例如9:15的事件。那么Flink应用就需要“等一等”,但是这里我们又不能无限期的等待下去,所以这里需要有一个时间标记来决定何时触发窗口,就是要告诉Flink程序要等多久,这个时间标记就是Watermark。所以Watermark是基于Event Time语义给出的。
其实除了上面两个时间语义,还有一个时间语义叫Ingestion Time,它指的是事件进入Flink的时间。
水位线(Watermarks)
Flink中测量Event Time进展的机制是水位线(Watermarks)。Watermarks作为数据流的一部分,并携带时间戳t。实际上Watermarks的本质就是一个时间戳t,它度量了Event Time到底进展到什么时候了。Watermarks(t)表明事件时间在该流中已经达到时间t,这意味着后面的流中不应该再有时间戳小于等于t的事件。说白了,水位线就是另一种时钟,看水位线我们就知道此时此刻几点了,只是这个时钟不再是机器的系统时钟,那么水位线这个时钟到底怎么衡量时间,我们继续。
下图显示了有序事件流。图中每个方块表示一个事件,方块中的数字表示事件时间,在本例中,事件是按顺序排列的(相对于它们的时间戳),这意味着Watermark只是流中的周期性标记。所谓周期性标记指的是Flink每隔200ms计算一次Watermark。
图中有两个Watermak,w(11)和w(20),它们分别表示事件时间已经达到11和20。事实上,对于有序流,Watermark可以使用事件的事件时间即可。在流中周期性的标记即可。
Watermark对于乱序流( *out-of-order* streams)是至关重要的,如下图所示,其中事件不是按事件时间戳排序的。一般来说,Watermark(t)是一种声明,声明到流中的那个点,在时间戳t之前的所有事件都应该已经到达。例如w(11)声明到流中的这个位置,时间已经是11了。一旦Watermark到达一个算子,算子可以将其内部事件时间时钟的时间调整到Watermark的值。
Watermark是如何计算的呢?还记得之前说的“等一等”吗?“等一等”其实是事件到达Flink延迟的一个时间。例如上图中,当事件到达后,再等待4s。
w= max(事件时间)-延迟的时间
例如上图中,当事件7到达时,w=max(7)-4=3,
当事件11到达时,w=max(7,11)-4=7,
当事件15到达时,w=max(7,11,15)-4=11,
当事件9到达时,w=max(7,11,15,9)-4=11,
当事件12到达时,w=max(7,11,15,12)-4=11,
此时到了Watermarks每隔200ms计算一次,此时时间已经走到了w(11)。这时Flink认为小于等于11的事件已经全部到达。
如果是基于事件时间语义的有序流,Watermask计算时延迟的时间为0,即w=max(事件时间)。
并行流中的水位线
水位线是在源函数(source functions)处或直接在源函数之后生成的。源函数的每个并行子任务通常独立地生成其水位线。这些水位线定义了特定并行源处的事件时间。
当水位线在流处理项目中流动时,它们会提前到达算子处的事件时间。每当一个算子将其事件时间向前推进时,它就会在其后续算子的下游生成一个新的水位线。
一些算子(Operation)使用多个输入流;例如,联合,或keyBy(…)或partition(…)函数后面的算子。这样,一个算子的当前事件时间是其输入流事件时间的最小值。当它的输入流更新它们的事件时间时,算子也会更新。
下图显示了在并行流中流动的事件和水位线的示例,以及跟踪事件时间的操作。
图中绘制了并行度为2的流处理。流处理有source、map和window操作。操作右上角黄色方块中数字表示该算子的事件时间。
例如图中window算子的事件时间为14,原因是map(1)算子的事件时间为29,同时map(2)算子的事件时间为14,两者同时向下游window算子推进,到当前window算子时,window算子取两者事件时间最小值作为当前算子的事件时间,所以window(1)和window(2)两个并行的算子的事件时间为14。
窗口
聚合事件(例如计数、求和)在流处理中的工作方式与批处理中的不同。例如,计算流中的所有元素是不可能的,因为流通常是无限的。相反,流上的聚合(计数,总和等)是由窗口限定的,例如“过去5分钟的计数”或“最近100个元素的总和”。
Windows可以是时间驱动的(例如:每30秒)或数据驱动的(例如:每100个元素)。通常可以区分不同类型的窗口,例如时间驱动窗口分为**滚动窗口**(没有重叠)、**滑动窗口**(有重叠)、**会话窗口**(被不活动的间隙打断)和**全局窗口**。数据驱动窗口有**计数窗口**。
图中黄色箭头表示事件流方向,每隔灰色方块表示一个事件。Time windows表示时间驱动的窗口,每隔多长时间统计一次,Count(3) windows表示数据驱动的窗口,每3个事件统计一次。
滚动窗口—Tumbling Windows
滚动窗口赋值器将每个元素赋给指定窗口大小的窗口。滚动窗口有固定的大小,不重叠。例如,如果指定大小为5分钟的滚动窗口,则将评估当前窗口,并每五分钟启动一个新窗口,如下图所示。
滑动窗口—Sliding Windows
滑动窗口赋值器将元素赋给固定长度的窗口。与滚动窗口分配器类似,窗口的大小由窗口大小参数配置。另一个窗口滑动参数控制滑动窗口启动的频率。因此,如果滑动块小于窗口大小,则滑动窗口可以重叠。在这种情况下,元素被分配给多个窗口。
例如,您可以设置大小为10分钟的窗口,每隔5分钟滑动一次。这样,每隔5分钟就会出现一个窗口,其中包含最近10分钟内到达的事件,如下图所示。
当window slide<window size时,滑动后的窗口与滑动前的窗口之间会有重叠部分,例如图中window 1和window 2之间有重叠;
当window slide>=window size,滑动后的窗口与滑动前的窗口之间可能还会有间隙部分。
会话窗口—Session Windows
会话窗口分配器按活动的会话对元素进行分组。会话窗口不重叠,也没有固定的开始和结束时间,这与滚动窗口和滑动窗口不同。相反,当会话窗口在一段时间内没有接收到元素时,即当出现不活动间隙时,会话窗口关闭。会话窗口分配器可以配置一个静态会话间隙,也可以配置一个会话间隙提取器函数,该函数定义了不活动的时间长度。当此期限到期时,当前会话关闭,并将后续元素分配给新的会话窗口。
全局窗口—Global Windows
全局窗口赋值器将具有相同键的所有元素赋给同一个全局窗口。此窗口方案仅在指定自定义触发器时才有用。否则,将不执行任何计算,因为全局窗口没有自然的结束。
例子
这里以单词统计为例来说明水位线生成以及窗口使用,本例的需求每隔5s中,每个单词的数量,代码如下:
package com.leboop;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import javax.annotation.Nullable;
import javax.sound.midi.Soundbank;
/**
* Description TODO.
* Date 2024/7/24 11:10
*
* @author leb
* @version 2.0
*/
public class OrderStreamWatermarkDemo {
public static void main(String[] args) throws Exception {
// 1. 获取流执行环境.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
System.out.println("默认并行度parallelism=" + env.getParallelism());
env.setParallelism(1); // 为了方便测试,这里并行度设置为1,默认并行度为8.
// 2. 获取默认的时间语义. 本api对应的flink版本1.9.3,默认时间语义为:ProcessingTime
TimeCharacteristic streamTimeCharacteristic = env.getStreamTimeCharacteristic();
System.out.println("默认时间语义streamTimeCharacteristic=" + streamTimeCharacteristic);
long autoWatermarkInterval = env.getConfig().getAutoWatermarkInterval();
System.out.println("默认水位线生成时间间隔autoWatermarkInterval=" + autoWatermarkInterval);
// 3. 设置时间语义为事件时间.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
autoWatermarkInterval = env.getConfig().getAutoWatermarkInterval();
System.out.println("默认事件时间语义的水位线生成时间间隔autoWatermarkInterval=" + autoWatermarkInterval);
// 设置水位线生成时间间隔.
env.getConfig().setAutoWatermarkInterval(200);
// 4. 监听socket数据源,每行输入格式:单词,事件时间,例如:hello,1000.
final DataStreamSource<String> sourceDS = env.socketTextStream("bigdata111", 9999);
// 5. 将socket数据源流转换成Word对象流
SingleOutputStreamOperator<Word> wordDs = sourceDS.map(new MapFunction<String, Word>() {
public Word map(String s) throws Exception {
String[] wordArr = s.split(",");
return new Word(wordArr[0], Long.valueOf(wordArr[1]), 1);
}
});
// 6. 设置watermark.
SingleOutputStreamOperator<Word> watermarkDS = wordDs.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Word>() {
private Long lateTime = 5 * 1000L;
private Long maxEventTime = 0L;
@Nullable
public Watermark getCurrentWatermark() {
// 生成Watermark.
Watermark watermark = new Watermark(maxEventTime - lateTime);
// System.out.println("当前水位线watermark=" + watermark.getTimestamp());
return watermark;
}
public long extractTimestamp(Word word, long previousElementTimestamp) {
// 抽取事件时间.
long eventTime = word.getTime() * 1000;
// 计算最大事件时间.
maxEventTime = Math.max(maxEventTime, eventTime);
// 此处代码仅仅是为了打印当前word的水位线.
Watermark eventWatermark = new Watermark(maxEventTime - lateTime);
word.setWatermark(eventWatermark); // 当前word的水位线.
System.out.println("抽取" + word);
return eventTime;
}
});
// 7. 将word对象流转换成key流.
KeyedStream<Word, String> wordKS = watermarkDS.keyBy(new KeySelector<Word, String>() {
public String getKey(Word word) throws Exception {
return word.getWord();
}
});
// 设置window计算:滚动窗口每隔5s计算一次.
WindowedStream<Word, String, TimeWindow> wordWS = wordKS.window(TumblingEventTimeWindows.of(Time.seconds(5)));
// 窗口中事件统计.
SingleOutputStreamOperator<Word> wordCountResult = wordWS.sum("count");
// 输出窗口中事件.
wordWS.apply(new WindowFunction<Word, Word, String, TimeWindow>() {
public void apply(String s, TimeWindow window, Iterable<Word> input, Collector<Word> out) throws Exception {
System.out.println("窗口window=" + window + ",窗口中数据input=" + input);
}
});
// 8.打印结果并执行.
wordCountResult.print();
env.execute();
}
}
下面是Word实体类代码:
package com.leboop;
import org.apache.flink.streaming.api.watermark.Watermark;
/**
* Description TODO.
* Date 2024/7/24 13:05
*
* @author leb
* @version 2.0
*/
public class Word {
/**
* 单词.
*/
private String word;
/**
* 单词产生的事件时间.
*/
private Long time;
/**
* 统计单词个数.
*/
private Integer count;
private Watermark watermark;
public Word() {
}
public Word(String word, Long time, Integer count) {
this.word = word;
this.time = time;
this.count = count;
}
public String getWord() {
return word;
}
public void setWord(String word) {
this.word = word;
}
public Long getTime() {
return time;
}
public void setTime(Long time) {
this.time = time;
}
public Integer getCount() {
return count;
}
public void setCount(Integer count) {
this.count = count;
}
public Watermark getWatermark() {
return watermark;
}
public void setWatermark(Watermark watermark) {
this.watermark = watermark;
}
@Override
public String toString() {
return "Word{" +
"word='" + word + '\'' +
", time=" + time +
", count=" + count +
", watermark=" + watermark.getTimestamp() +
'}';
}
}
代码中有几点需要注意的:
(1)代码的并行度最好设置为1,默认并行度为8,这样后面通过socket输入的单词都进去该并行度中进行计算,加快触发,否则并行度太多,需要在socket中输入更多的单词,才能触发。
env.setParallelism(1);
(2)本文使用的是flink 1.9.3的api,该版本默认时间语义是ProcessingTime,后面flink新版本默认的时间语义是EventTime,查看TimeCharacteristic源码时间语义总共有三种:
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.api;
import org.apache.flink.annotation.PublicEvolving;
/**
* The time characteristic defines how the system determines time for time-dependent
* order and operations that depend on time (such as time windows).
*/
@PublicEvolving
public enum TimeCharacteristic {
/**
* Processing time for operators means that the operator uses the system clock of the machine
* to determine the current time of the data stream. Processing-time windows trigger based
* on wall-clock time and include whatever elements happen to have arrived at the operator at
* that point in time.
*
* <p>Using processing time for window operations results in general in quite non-deterministic
* results, because the contents of the windows depends on the speed in which elements arrive.
* It is, however, the cheapest method of forming windows and the method that introduces the
* least latency.
*/
ProcessingTime,
/**
* Ingestion time means that the time of each individual element in the stream is determined
* when the element enters the Flink streaming data flow. Operations like windows group the
* elements based on that time, meaning that processing speed within the streaming dataflow
* does not affect windowing, but only the speed at which sources receive elements.
*
* <p>Ingestion time is often a good compromise between processing time and event time.
* It does not need any special manual form of watermark generation, and events are typically
* not too much out-or-order when they arrive at operators; in fact, out-of-orderness can
* only be introduced by streaming shuffles or split/join/union operations. The fact that
* elements are not very much out-of-order means that the latency increase is moderate,
* compared to event
* time.
*/
IngestionTime,
/**
* Event time means that the time of each individual element in the stream (also called event)
* is determined by the event's individual custom timestamp. These timestamps either exist in
* the elements from before they entered the Flink streaming dataflow, or are user-assigned at
* the sources. The big implication of this is that it allows for elements to arrive in the
* sources and in all operators out of order, meaning that elements with earlier timestamps may
* arrive after elements with later timestamps.
*
* <p>Operators that window or order data with respect to event time must buffer data until they
* can be sure that all timestamps for a certain time interval have been received. This is
* handled by the so called "time watermarks".
*
* <p>Operations based on event time are very predictable - the result of windowing operations
* is typically identical no matter when the window is executed and how fast the streams
* operate. At the same time, the buffering and tracking of event time is also costlier than
* operating with processing time, and typically also introduces more latency. The amount of
* extra cost depends mostly on how much out of order the elements arrive, i.e., how long the
* time span between the arrival of early and late elements is. With respect to the
* "time watermarks", this means that the cost typically depends on how early or late the
* watermarks can be generated for their timestamp.
*
* <p>In relation to {@link #IngestionTime}, the event time is similar, but refers the the
* event's original time, rather than the time assigned at the data source. Practically, that
* means that event time has generally more meaning, but also that it takes longer to determine
* that all elements for a certain time have arrived.
*/
EventTime
}
本案例中使用EventTime语义。
(3) 默认情况下,EventTime时间语义的水位线生成时间间隔为200ms,可查看StreamExecutionEnvironment类中如下方法看到:
@PublicEvolving
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
if (characteristic == TimeCharacteristic.ProcessingTime) {
getConfig().setAutoWatermarkInterval(0);
} else {
getConfig().setAutoWatermarkInterval(200);
}
}
(4)输入的数据格式如下:
hello,1
spring,1
java,1
hello,6
spring,11
hello,7
java,14
java,20
hello,3
hello,5
hello,10
java,21
java,31
格式为单词,事件时间,例如第一行表示hello这个单词在1s时刻产生的。从数据中可以看到单词是无序的,例如hello,3在后面才出现。
(5)将每行输入数据通过英文逗号切分,转换成一个Word对象,代码如下:
SingleOutputStreamOperator<Word> wordDs = sourceDS.map(new MapFunction<String, Word>() {
public Word map(String s) throws Exception {
String[] wordArr = s.split(",");
return new Word(wordArr[0], Long.valueOf(wordArr[1]), 1);
}
});
(6)窗口设置
可通过assignTimestampsAndWatermarks为事件抽出时间戳和生成水位线。代码中设置了延迟时间为5s,代码如下:
private Long lateTime = 5 * 1000L;
(7)Word对象流转换成键流
为了统计每个单词的数量,需要将单词按照单词分流,因此需要一单词为键来统计,代码如下:
KeyedStream<Word, String> wordKS = watermarkDS.keyBy(new KeySelector<Word, String>() {
public String getKey(Word word) throws Exception {
return word.getWord();
}
});
(8)为键流设置窗口大小为5s的滚动窗口
WindowedStream<Word, String, TimeWindow> wordWS = wordKS.window(TumblingEventTimeWindows.of(Time.seconds(5)));
此时键流转换成的窗口流。此时,划分的滚动窗口为[0,5000),[5000,10000),[10000,15000),……。每个窗口含头不含尾,窗口时间单位为ms。
(9)窗口流按照单词对象Word的count字段来统计单词数量,代码如下:
SingleOutputStreamOperator<Word> wordCountResult = wordWS.sum("count");
这里需要Word类必须是POJO,例如该实体类不是public修饰,在单词统计wordWS.sum("count")处会报错如下:
Exception in thread "main" org.apache.flink.api.common.typeutils.CompositeType$InvalidFieldReferenceException: Cannot reference field by field expression on GenericType<com.leboop.Word>Field expressions are only supported on POJO types, tuples, and case classes. (See the Flink documentation on what is considered a POJO.)
at org.apache.flink.streaming.util.typeutils.FieldAccessorFactory.getAccessor(FieldAccessorFactory.java:193)
at org.apache.flink.streaming.api.functions.aggregation.SumAggregator.<init>(SumAggregator.java:55)
at org.apache.flink.streaming.api.datastream.WindowedStream.sum(WindowedStream.java:1367)
at com.leboop.OrderStreamWatermarkDemo.main(OrderStreamWatermarkDemo.java:101)
(10)打印每个窗口中有哪些事件
为了知道统计背后的原理,这里打印出每个窗口中的事件,代码如下:
wordWS.apply(new WindowFunction<Word, Word, String, TimeWindow>() {
public void apply(String s, TimeWindow window, Iterable<Word> input, Collector<Word> out) throws Exception {
System.out.println("窗口window=" + window + ",窗口中数据input=" + input);
}
});
(11)最后打印出统计结果并执行程序
代码如下:
wordCountResult.print();
env.execute();
下面打开bigdata111的socket,并逐行输入数据,如下:
程序输出结果如下:
默认并行度parallelism=8
默认时间语义streamTimeCharacteristic=ProcessingTime
默认水位线生成时间间隔autoWatermarkInterval=0
默认事件时间语义的水位线生成时间间隔autoWatermarkInterval=200
抽取Word{word='hello', time=1, count=1, watermark=-4000}
抽取Word{word='spring', time=1, count=1, watermark=-4000}
抽取Word{word='java', time=1, count=1, watermark=-4000}
抽取Word{word='hello', time=6, count=1, watermark=1000}
抽取Word{word='spring', time=11, count=1, watermark=6000}
窗口window=TimeWindow{start=0, end=5000},窗口中数据input=[Word{word='hello', time=1, count=1, watermark=-4000}]
窗口window=TimeWindow{start=0, end=5000},窗口中数据input=[Word{word='spring', time=1, count=1, watermark=-4000}]
窗口window=TimeWindow{start=0, end=5000},窗口中数据input=[Word{word='java', time=1, count=1, watermark=-4000}]
Word{word='hello', time=1, count=1, watermark=-4000}
Word{word='spring', time=1, count=1, watermark=-4000}
Word{word='java', time=1, count=1, watermark=-4000}
抽取Word{word='hello', time=7, count=1, watermark=6000}
抽取Word{word='java', time=14, count=1, watermark=9000}
抽取Word{word='java', time=20, count=1, watermark=15000}
窗口window=TimeWindow{start=5000, end=10000},窗口中数据input=[Word{word='hello', time=6, count=1, watermark=1000}, Word{word='hello', time=7, count=1, watermark=6000}]
Word{word='hello', time=6, count=2, watermark=1000}
窗口window=TimeWindow{start=10000, end=15000},窗口中数据input=[Word{word='spring', time=11, count=1, watermark=6000}]
窗口window=TimeWindow{start=10000, end=15000},窗口中数据input=[Word{word='java', time=14, count=1, watermark=9000}]
Word{word='spring', time=11, count=1, watermark=6000}
Word{word='java', time=14, count=1, watermark=9000}
抽取Word{word='hello', time=3, count=1, watermark=15000}
抽取Word{word='hello', time=5, count=1, watermark=15000}
抽取Word{word='hello', time=10, count=1, watermark=15000}
抽取Word{word='java', time=21, count=1, watermark=16000}
抽取Word{word='java', time=31, count=1, watermark=26000}
Word{word='java', time=20, count=2, watermark=15000}
窗口window=TimeWindow{start=20000, end=25000},窗口中数据input=[Word{word='java', time=20, count=1, watermark=15000}, Word{word='java', time=21, count=1, watermark=16000}]
结果解释:
(1)程序运行后,即会输出如下结果:
默认并行度parallelism=8
默认时间语义streamTimeCharacteristic=ProcessingTime
默认水位线生成时间间隔autoWatermarkInterval=0
默认事件时间语义的水位线生成时间间隔autoWatermarkInterval=200
(2)当输入前四行数据
hello,1
spring,1
java,1
hello,6
会调用抽取时间戳extractTimestamp方法,然后会输出
抽取Word{word='hello', time=1, count=1, watermark=-4000}
抽取Word{word='spring', time=1, count=1, watermark=-4000}
抽取Word{word='java', time=1, count=1, watermark=-4000}
抽取Word{word='hello', time=6, count=1, watermark=1000}
此时水位线为1000ms,这表示程序认为当前的事件时间才到1000ms,此时窗口[0,5000)还不能触发计算,接着输入下一条数据:
spring,11
此时继续抽取时间戳,输出
抽取Word{word='spring', time=11, count=1, watermark=6000}
注意当前水位线已经到达6000,程序认为事件时间已经到了6000ms,既会触发[0,5000)窗口的计算,此时窗口中的数据有
hello,1
spring,1
java,1
这里注意已经输入的如下两条数据并不在该窗口中:
hello,6
spring,11
数据会按照输入数据的事件时间正确地分配到每个窗口。因此[0,5000)窗口统计结果为:
Word{word='hello', time=1, count=1, watermark=-4000}
Word{word='spring', time=1, count=1, watermark=-4000}
Word{word='java', time=1, count=1, watermark=-4000}
也即hello、spring、java各出现1次。以此类推。
版权归原作者 L(刘二宝) 所有, 如有侵权,请联系我们删除。