一:窗口简述
Flink是一种流式计算引擎,主要是来处理无界数据流的,数据源源不断、无穷无尽。想要更加方便高效地处理无界流,一种方式就是将无限数据切割成有限的“数据块”进行处理,这就是所谓的“窗口”(Window)。 【把窗口理解成一个“桶”,Flink则可以把流切割成大小有限的“储存桶”,把数据分发到不同的桶里,每一个窗口都是一个桶。当窗口结束,就对每一个桶的数据进行收集处理】
二: 窗口的分类
** 1)按照驱动类型分**
** (1) 时间窗口**
原理:建立一个窗口,在固定的额时间段内不断收集数据,到达结束时间的时候窗口结束收集数据,生成结果,窗口销毁。【就像地铁一样,间隔一段时间发车,无论车上有多少乘客,地铁都会往前开】
** (2) 计数窗口**
** ** 原理:计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。每个窗口截取数据的个数,就是窗口的大小。基本思路是“人齐发车”
** 2)按照窗口分配数据的规则分类(以下均为以时间驱动为例)**
主要概念:窗口的大小,窗口的滑动步长【两个窗口重叠的部分】,会话间隔
** (1) 滚动窗口(Tumbling Windows)**
滚动窗口有固定的大小,而且窗口之间不会重叠,每个数据都在一个窗口且只属于这个窗口。在一个固定时间内,接受数据的传入。到了截止时间,收集数据,输出结果。应用类型广泛,可以对每个时间做聚合统计。
** **
** (2) 滑动窗口 (Sliding Windows)**
** ** 当窗口大小大于窗口步长的时候就会出现滑动,滑动窗口会重叠,同时数据也会同时被分到多个窗口,滑动步长就代表了计算频率。适合计算结果更新较快的场景。
** (3) 会话窗口 (Session Windows)**
** ** 原理:基于“会话”来进行数据分组、如果相邻两个数据到来的时间间隔 小于指定大小,那么这两个数据在同一个窗口内。如果 大于则数据到了新的窗口,且前面的窗口关闭。 会话窗口长度,起始结束时间不确定。各个分区之间窗口没有任何关联。在规定的时间内没有数据到来触发一次计算。可以用于保持会话的场景下。
(4) 全局窗口 (Global Windows)
** ** 把相同key的数据全部分配到一个窗口之中,窗口没有结束是不会触发计算的。如果希望对数据处理,需要定义一个触发器。
三:窗口API
** 1)按键分区(Keyed)和非按键分区(Non-Keyed)**
(1)按键分区窗口(Keyed Windows)
经过按键分区keyBy操作后,数据流会按照key被分为多条逻辑流(logical streams),这就是KeyedStream。基于KeyedStream进行窗口操作时,窗口计算会在多个并行子任务上同时执行。相同key的数据会被发送到同一个并行子任务,而窗口操作会基于每个key进行单独的处理。所以可以认为,每个key上都定义了一组窗口,各自独立地进行统计计算。
在代码实现上,我们需要先对DataStream调用.keyBy()进行按键分区,然后再调用.window()定义窗口。
stream.keyBy(...) .window(...)
(2)非按键分区(Non-Keyed Windows)
如果没有进行keyBy,那么原始的DataStream就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了1。setParallelism(1)
在代码中,直接基于DataStream调用.windowAll()定义窗口。
stream.windowAll(...)
注意:对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的
四:窗口分配器
1)Flink中的时间语义
到底是以哪种时间作为衡量的标准,就被成为“时间语义”
** 2)时间窗口分配器**
** (1) 处理时间的时间语义窗口**
特征:
- 基于处理节点的当前系统时间。
- 实现简单,延迟低。
- 适用于数据实时性要求高且乱序不多的场景。
- 在处理乱序数据时,结果可能不准确。
**应用场景: **
- 监控系统指标(如CPU使用率、内存使用情况)。
- 实时数据分析,延迟比准确性更重要的场景。
**使用参数: **
TumblingProcessingTimeWindows.of(Time.) 滚动窗口SlidingProcessingTimeWindows.of(Time.) 滑动窗口ProcessingTimeSessionWindows.withGap() ProcessingTimeSessionWindows.withDynamicGap() 会话窗口
(2) 处理事件的时间语义窗口
特征:
- 基于事件的时间戳。
- 处理乱序数据,通过Watermark机制来处理延迟事件。
- 更加准确,适用于要求严格时间语义的场景。
**实时场景: **
- 实时日志分析(如用户行为分析、点击流分析)。
- 需要严格时间顺序和准确性的场景,如金融交易分析。
使用参数:
TumblingEventTimeWindows.of(Time.) 滚动窗口SlidingEventTimeWindows.of(Time.) 滑动窗口
EventTimeSessionWindows.withGap()EventTimeSessionWindows.withDynamicGap()
会话窗口
** 3)计数窗口分配器**
countWindow(5)计数窗口分配器 【滚动】 满足5条输出一次计算结果countWindow(5,2)计数窗口分配器 【滑动】 满足5条输出一次计算结果 , 每经过一个步长都有一个窗口-触发输出【第一次输出在第二条数据来的时候】
** 4)全局窗口分配器**
GlobalWindows.create() 全局窗口
五:窗口函数
窗口函数定义了要对窗口中收集的数据做的计算操作,根据处理的方式可以分为两类:增量聚合函数和全窗口函数。
1)增量聚合函数(ReduceFunction / AggregateFunction)
** **窗口将数据收集起来,最基本的处理操作当然就是进行聚合。我们可以每来一个数据就在之前结果上聚合一次,这就是“增量聚合”。
** **(1)归约函数(ReduceFunction)
import com.guigu.function.WaterSensorMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
public class WindowReduceDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SingleOutputStreamOperator<WaterSensor> map = env.socketTextStream("192.168.88.161", 9999)
.map(new WaterSensorMapFunction());
KeyedStream<WaterSensor, String> senorks = map.keyBy(sensor -> sensor.getId());
// 窗口分配器
WindowedStream<WaterSensor, String, TimeWindow> senorWS = senorks.window(TumblingProcessingTimeWindows.of(Time.seconds(5)));
/*
1.相同key的第一条数据来的时候,不会调用reduce方法 【来的数据类型必须保持一致】
2.增量聚合:来一条数据,就会计算一次,但是不会输出
3.在窗口触发的时候,才会输出窗口的最终结果
*/
// 窗口函数,增量聚合reduce
// TODO:返回一个DataStream
SingleOutputStreamOperator<WaterSensor> reduce = senorWS.reduce(new ReduceFunction<WaterSensor>() {
// TODO:只有第二条数据进来了才会调用reduce方法
@Override
public WaterSensor reduce(WaterSensor value1, WaterSensor value2) throws Exception {
System.out.println("调用reduce方法: value1" + value1 + ",value2" + value2);
return new WaterSensor(value1.getId(), value2.getTs(), value1.getVc() + value2.getVc());
}
});
reduce.print();
env.execute();
}
}
(2)聚合函数(AggregateFunction)
来一个数据就调用add方法,进行数据聚合。结果保存在状态中。窗口需要输出时调用getresult()方法 得到计算结果。和ReduceFunction作用相同,而由于输入、中间状态、输出的类型可以不同,使得应用更加灵活方便。
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
public class WindowAggregateDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SingleOutputStreamOperator<WaterSensor> map = env.socketTextStream("192.168.88.161", 9999)
.map(new WaterSensorMapFunction());
KeyedStream<WaterSensor, String> senorks = map.keyBy(sensor -> sensor.getId());
// 窗口分配器
WindowedStream<WaterSensor, String, TimeWindow> senorWS = senorks.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
// 窗口函数,增量聚合 Aggregate
/*
第一个类型:输入数据的类型
第二个类型: 累加器的类型,存储的中间结果的类型
第三个类型: 输出的类型
*/
/*
*1、属于本窗口的第一条数据来,创建窗口,创建累加器
*2、增量聚合:来一条计算一条,调用-次add方法
*3、窗口输出时调用一次getresult方法
*4、输入、中间累加器、输出 类型可以不一样,非常灵活
*/
senorWS.aggregate(
new AggregateFunction<WaterSensor, Integer, String>() {
// TODO:创建累加器初始化累加器
@Override
public Integer createAccumulator() {
System.out.println("创建累加器");
return 0;
}
// TODO: 聚合逻辑
@Override
public Integer add(WaterSensor value, Integer accumulator) {
System.out.println("调用add方法,value="+value);
return accumulator+ value.getVc();
}
// TODO:获得最后的结果,窗口触发时输出
@Override
public String getResult(Integer accumulator) {
System.out.println("调用getresult方法");
return accumulator.toString();
}
@Override
public Integer merge(Integer a, Integer b) {
// 只有会话窗口才会用到
System.out.println("调用merge方法");
return null;
}
}
).print();
env.execute();
}
}
另外,Flink也为窗口的聚合提供了一系列预定义的简单聚合方法,可以直接基于WindowedStream调用。主要包括.sum()/max()/maxBy()/min()/minBy(),与KeyedStream的简单聚合非常相似。它们的底层,其实都是通过AggregateFunction来实现的。
** 2)全窗口函数(窗口函数,处理窗口函数)**
** 全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。 **WindowFunction【apply方法】能提供的上下文信息较少,也没有更高级的功能。事实上,它的作用可以被ProcessWindowFunction全覆盖,所以之后可能会逐渐弃用【老方法】。
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class WindowProcessDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SingleOutputStreamOperator<WaterSensor> map = env.socketTextStream("192.168.88.161", 9999)
.map(new WaterSensorMapFunction());
KeyedStream<WaterSensor, String> senorks = map.keyBy(sensor -> sensor.getId());
// 窗口分配器 【滚动】
WindowedStream<WaterSensor, String, TimeWindow> senorWS = senorks.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
// TODO:全窗口 不断存储数据到最后 窗口触发时只会输出一次
// TODO:老写法
// senorWS
// .apply(
// new WindowFunction<WaterSensor, String, String, TimeWindow>() {
// /**
// *
// * @param s 分组的key
// * @param window 窗口对象
// * @param input 存储的数据
// * @param out 采集器
// * @throws Exception
// */
// @Override
// public void apply(String s, TimeWindow window, Iterable<WaterSensor> input, Collector<String> out) throws Exception {
//
// }
// }
// )
// TODO:新写法
senorWS
.process(
new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
/**
* @param s 分组的 key
* @param context 上下文
* @param elements 存的数据
* @param out 采集器
* @throws Exception
*/
@Override
public void process(String s, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
// TODO:毫秒
long start = context.window().getStart();
long end = context.window().getEnd();
String start_format = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
String end_format = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
// TODO:多少条数据
long data = elements.spliterator().estimateSize();
out.collect("key="+s+"的窗口【"+start_format+","+end_format+"]包含"+data+"条数据 ====》"+elements.toString());
}
}
).print();
env.execute();
}
}
3)全窗口函数 和 增量聚合函数 的结合使用
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class WindowAggregateAndProcessDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SingleOutputStreamOperator<WaterSensor> map = env.socketTextStream("192.168.88.161", 9999)
.map(new WaterSensorMapFunction());
KeyedStream<WaterSensor, String> senorks = map.keyBy(sensor -> sensor.getId());
// 窗口分配器
WindowedStream<WaterSensor, String, TimeWindow> senorWS = senorks.window(TumblingProcessingTimeWindows.of(Time.seconds(10)));
// TODO:窗口函数,增量聚合 Aggregate + 全窗口 Process
/*
*增量聚合 Aggregate+全窗日 process
* 1、增量聚合函数处理数据:来一条计算一条
*2、窗口触发时,增量聚合的结果(只有一条)传递给 全窗口函数
*3、经过全窗口函数的处理包装后,输出米指轺调坝党
*结合两者的优点:
*1、增量聚合: 来一条计算一条,存储中间的计算结果,占用的空间少
*2、全窗口函数:可以通过 上下文 实现灵活的功能
* */
senorWS.aggregate(
new MyAgg(),
new MyProcess()
).print();
//
env.execute();
}
public static class MyAgg implements AggregateFunction<WaterSensor, Integer, String>{
// TODO:创建累加器初始化累加器
@Override
public Integer createAccumulator() {
System.out.println("创建累加器");
return 0;
}
// TODO: 聚合逻辑
@Override
public Integer add(WaterSensor value, Integer accumulator) {
System.out.println("调用add方法,value="+value);
return accumulator+ value.getVc();
}
// TODO:获得最后的结果,窗口触发时输出
@Override
public String getResult(Integer accumulator) {
System.out.println("调用getresult方法");
return accumulator.toString();
}
@Override
public Integer merge(Integer a, Integer b) {
// 只有会话窗口才会用到
System.out.println("调用merge方法");
return null;
}
}
public static class MyProcess extends ProcessWindowFunction<String,String,String,TimeWindow>{
/**
* @param s 分组的 key
* @param context 上下文
* @param elements 存的数据
* @param out 采集器
* @throws Exception
*/
@Override
public void process(String s, ProcessWindowFunction<String, String, String, TimeWindow>.Context context, Iterable<String> elements, Collector<String> out) throws Exception {
// TODO:毫秒
long start = context.window().getStart();
long end = context.window().getEnd();
String start_format = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
String end_format = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
// TODO:多少条数据
long data = elements.spliterator().estimateSize();
out.collect("key="+s+"的窗口【"+start_format+","+end_format+"]包含"+data+"条数据 ====》"+elements.toString());
}
}
}
这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了Iterable类型的输入。
六:其他API
1) 触发器(Trigger)
触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程。基于WindowedStream调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)。
stream.keyBy(...) .window(...) .trigger(new MyTrigger())
2)移除器(Evictor)
移除器主要用来定义移除某些数据的逻辑。基于WindowedStream调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor是一个接口,不同的窗口类型都有各自预实现的移除器。
stream.keyBy(...) .window(...)
七:窗口的原理
以 时间类型的 滚动窗口 为例,分析原理:
1、窗口什么时候触发 输出?
时间进展>= 窗口的最大时间戳(end-1ms)
2、窗口是怎么划分的?
start= 向下取整,取窗口长度的整数倍
end =start+窗长度
窗口左闭右开 ==》 属于本窗口的 最大时间戳 =end - 1ms
3、窗口的生命周期?
创建:属于本窗口的第一条数据来的时候,现new的,放入一个singeton单例的集合中。
销毁(关窗): 时间进展>= 窗口的最大时间戳(end - 1ms) +允许迟到的时间【默认为0】
八:水位线 (Watermark)
1)什么是水位线?【下图中均为虚线(w)表示】
在Flink中水位线被用来标记事件的进展时间。是在数据流里面的一个标记点,具体内容是时间戳。用来指示当前事件的处理时间。
2) 有序数据的处理
为了提高速率,一般会每隔一段时间产生一个水位线
3)乱序+数据迟到的水位线处理
通常在流式计算中,数据的传输收到网络IO等等因素的影响,数据无法准时到达计算的窗口。为了让窗口处理数据变得规整且正确。我们通常使用让水位线等待固定秒数【意思就是在时间戳的基础上增加一些延迟,以保证不丢失数据】
注意:乱序 和 迟到的区别:
乱序: 数据的顺序乱了 时间小的比时间大的晚来
迟到: 数据的时间戳 < 当前的 watermark
4) 水位线的特征
- 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
- 水位线主要的内容是一个时间戳,用来表示当前事件时间【数据传入的时间】的进展
- 水位线是基于数据的时间戳生成的
- 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进水位线可以通过设置延迟,来保证正确处理乱序数据
- 一个水位线Watermank(t),表示在当前流中事件时间已经达到了时间戳T,这代表t之前的所有数据都到齐了,之后流中不会出现 时间戳T<t的数据。
- 水位线是Flink流处理中保证结果正确性的核心机制,它往往会跟窗口一起配合,完成对乱序数据的正确处理
5)水位线的使用【需要使用事件时间语义窗口才能触发】
(1)有序流的使用
直接调用WatermarkStrategy.forMonotonousTimestamps()方法,然后使用assignTimestampsAndWatermarks()接受 。就可以实现。
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
public class WaterMarkDemo {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<WaterSensor> map = env.socketTextStream("192.168.88.130", 9999)
.map(new WaterSensorMapFunction());
/*
1.定义watermark策略
2.使用assignTimestampsAndWatermarks 调用
*/
// TODO:指定watermark策略
WatermarkStrategy<WaterSensor> waterSensorWatermarkStrategy = WatermarkStrategy
.<WaterSensor>forMonotonousTimestamps() // 指定watermark的生成 单调递增 没有等待时间
// 指定时间戳分配器,从数据中提取
.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
// 返回的时间戳是毫秒
System.out.println("数据=" + element + ",recordTS = " + recordTimestamp);
return element.getTs() * 1000L;
}
});
map.assignTimestampsAndWatermarks(waterSensorWatermarkStrategy)
.keyBy(sensor -> sensor.getId())
// TODO:只能使用 事件时间语义窗口 才能使用水平线
.window(TumblingEventTimeWindows.of(Time.seconds(2)))
.process(
new ProcessWindowFunction<WaterSensor, String, String, TimeWindow>() {
/**
* @param s 分组的 key
* @param context 上下文
* @param elements 存的数据
* @param out 采集器
* @throws Exception
*/
@Override
public void process(String s, ProcessWindowFunction<WaterSensor, String, String, TimeWindow>.Context context, Iterable<WaterSensor> elements, Collector<String> out) throws Exception {
// TODO:毫秒
long start = context.window().getStart();
long end = context.window().getEnd();
String start_format = DateFormatUtils.format(start, "yyyy-MM-dd HH:mm:ss.SSS");
String end_format = DateFormatUtils.format(end, "yyyy-MM-dd HH:mm:ss.SSS");
// TODO:多少条数据
long data = elements.spliterator().estimateSize();
out.collect("key="+s+"的窗口【"+start_format+","+end_format+"]包含"+data+"条数据 ====》"+elements.toString());
}
}
).print();
env.execute();
}
}
(2) 无序流的使用
将.forMonotonousTimestamps()方法 修改为forBoundedOutOfOrderness() 参数Duration.ofSeconds(等待的时间)
// TODO:指定watermark策略
WatermarkStrategy
// TODO: 指定watermark的生成 乱序 有等待时间 3s
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
// TODO: 指定时间戳分配器,从数据中提取
.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
// 返回的时间戳是毫秒
System.out.println("数据=" + element + ",recordTS = " + recordTimestamp);
return element.getTs() * 1000L;
}
});
(3) 自定义水位线的使用
将.forBoundedOutOfOrderness()方法 修改为 forGenerator(重写类atermarkGeneratorSupplier) 最后 return 自己的方法<>(延迟的时间)
WatermarkStrategy
// TODO: 自定义指定watermark的生成 乱序 有等待时间 3s
.<WaterSensor>forGenerator(new WatermarkGeneratorSupplier<WaterSensor>() {
@Override
public WatermarkGenerator<WaterSensor> createWatermarkGenerator(Context context) {
// TODO:延迟时间3s
return new MyGenerator<>(3000);
}
})
(4) 在数据源发送水位线
我们也可以在自定义的数据源中抽取事件时间,然后发送水位线。这里要注意的是,在自定义数据源中发送了水位线以后,就不能再在程序中使用assignTimestampsAndWatermarks方法来生成水位线了。在自定义数据源中生成水位线和在程序中使用assignTimestampsAndWatermarks方法生成水位线二者只能取其一。
env.fromSource(
kafkaSource, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "kafkasource"
)
6)水位线的传递
一个Task通常会设置多个并行度,而而当一个任务接收到多个上游并行任务传递来的水位线时,应该以最小的那个作为当前任务的事件时钟。每个任务都以“处理完之前所有数据”为标准来确定自己的时钟。
九:处理乱序+迟到数据 常用三部曲
1)水位线空闲等待
在多个上游并行任务中,如果有其中一个没有数据,由于当前Task是以最小的那个作为当前任务的事件时钟,而没有数据的并行任务一直保持最小。就会导致当前Task的水位线无法推进,就可能导致窗口无法触发。这时候可以设置空闲等待(withIdleness)。
env.socketTextStream("192.168.88.130", 9999)
.partitionCustom(new MyPartioner(), r -> r)
.map(r -> Integer.parseInt(r))
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Integer>forMonotonousTimestamps()
.withTimestampAssigner((r, ts) -> r * 1000L)
// TODO: 空闲等待 5s
.withIdleness(Duration.ofSeconds(5))
);
2) 允许推迟时间关窗
Flink的窗口,也允许迟到数据。当触发了窗口计算后,会先计算当前的结果,但是此时并不会关闭窗口。以后每来一条迟到数据,就触发一次这条数据所在窗口计算(增量计算)。直到wartermark 超过了窗口结束时间+推迟时间,此时窗口会真正关闭,迟到来的数据不会被计算。
.keyBy(sensor -> sensor.getId())
// TODO:只能使用 事件时间语义窗口 才能使用水平线
.window(TumblingEventTimeWindows.of(Time.seconds(2)))
// TODO:使用窗口延迟 5s
.allowedLateness(Time.seconds(5))
3) 侧输出流
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.seconds(3))
// TODO: 【关窗后】 迟到的数据放入侧输出流
.sideOutputLateData(
// TODO: 参数1:侧输出流的名字 参数2:侧输出数据的类型
new OutputTag<WaterSensor>("late_data", Types.POJO(WaterSensor.class))
)
4) 设置经验
1.watermark等待时间,设置一个不算特别大的,一般是秒级,在乱序和 延迟 取舍
2.设置一定的窗口允许迟到,只考虑大部分的迟到数据,极端小部分迟到很久的数据,不管
3.极端小部分迟到很久的数据,放到侧输出流。获取到之后可以做各种处理
十:基于时间的合流-----双流联结(Join)
可以发现,根据某个key合并两条流,与关系型数据库中表的join操作非常相近。事实上,Flink中两条流的connect操作,就可以通过keyBy指定键进行分组后合并,实现了类似于SQL中的join操作;另外connect支持处理函数,可以使用自定义实现各种需求,其实已经能够处理双流join的大多数场景。
不过处理函数是底层接口,所以尽管connect能做的事情多,但在一些具体应用场景下还是显得太过抽象了。比如,如果我们希望统计固定时间内两条流数据的匹配情况,那就需要自定义来实现——其实这完全可以用窗口(window)来表示。为了更方便地实现基于时间的合流操作,Flink的DataStrema API提供了内置的join算子。
1)窗口联结 (Window Join)
Flink为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。
// TODO: window join 窗口联结
/**
* stream1.join(stream2)
* .where(<KeySelector>)
* .equalTo(<KeySelector>)
* .window(<WindowAssigner>)
* .apply(<JoinFunction>)
*
* 1. 落在同一个时间窗口范围内才能匹配
* 2. 根据keyby的key,来进行匹配关联
* 3. 只能拿到匹配上的数据,类似有固定时间范围的inner join
*/
data1.join(data2)
// TODO:data1的keyby
.where(new KeySelector<Tuple2<String, Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
})
// TODO:data2的keyby
.equalTo(new KeySelector<Tuple3<String, Integer, Integer>, String>() {
@Override
public String getKey(Tuple3<String, Integer, Integer> value) throws Exception {
return value.f0;
}
})
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
/**
*关联上的数据调用join方法
* @param first The element from first input. [data1]
* @param second The element from second input. [data2]
* @return
* @throws Exception
*/
@Override
public String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {
return first + "<------------>" + second;
}
});
2)间隔联结 (Interval Join)
这里需要注意,做间隔联结的两条流A和B,也必须基于相同的key,所以要先进行KeyBy分组;下界lowerBound应该小于等于上界upperBound,两者都可正可负;间隔联结目前只支持事件时间语义。
(1)正常使用
data_afterKeyBy1.intervalJoin(data_afterKeyBy2)
// TODO:设置下上界的偏移量 [先下后上的设置]
.between(Time.seconds(-3), Time.seconds(3))
.process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
/**
*两条流的数据匹配上才会调用这个方法
* @param left ks1 的数据
* @param right ks2 的数据
* @param ctx 上下文
* @param out 采集器
* @throws Exception
*/
@Override
public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
// 关联上的数据才能进入这个方法是
out.collect(left + "<------->" + right);
}
})
(2) 处理迟到数据
/**
* TODO Interval join 处理迟到数据
* 1、只支持事件时间
* 2、指定上界、下界的偏移,负号代表时间往前,正号代表时间往后
* 3、process中,只能处理 join上的数据
* 4、两条流关联后的watermark,以两条流中最小的为准
* 5、如果 当前数据的事件时间 < 当前的watermark,就是迟到数据, 主流的process不处理
* => between后,可以指定将 左流 或 右流 的迟到数据 放入侧输出流
*/
OutputTag<Tuple2<String, Integer>> ks1Late = new OutputTag<>("ks1_late", Types.TUPLE(Types.STRING, Types.INT));
OutputTag<Tuple3<String, Integer,Integer>> ks2Late = new OutputTag<>("ks1_late", Types.TUPLE(Types.STRING, Types.INT,Types.INT));
SingleOutputStreamOperator<String> process = key1.intervalJoin(key2)
// TODO:设置下上界的偏移量 [先下后上的设置]
.between(Time.seconds(-3), Time.seconds(3))
// TODO:处理左侧迟到数据到侧输出流
.sideOutputLeftLateData(ks1Late)
// TODO:处理右侧迟到数据到侧输出流
.sideOutputRightLateData(ks2Late)
.process(new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
/**
*两条流的数据匹配上才会调用这个方法
* @param left ks1 的数据
* @param right ks2 的数据
* @param ctx 上下文
* @param out 采集器
* @throws Exception
*/
@Override
public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>.Context ctx, Collector<String> out) throws Exception {
// 进入这个方法是关联上的数据
out.collect(left + "<------->" + right);
}
});
process.print("主流打印:");
process.getSideOutput(ks1Late).printToErr("ks1迟到数据以错误日志方式打印:");
process.getSideOutput(ks2Late).printToErr("ks2迟到数据以错误日志方式打印:");
版权归原作者 可乐朱仔 所有, 如有侵权,请联系我们删除。