定时器
让 Flink 流处理程序对处理时间和事件时间的变化作出反应。通常在KeyProcessFunction中使用。
基于处理时间或者事件时间处理过一个元素之后,注册一个定时器,然后在指定的时间运行。
最常见的显式利用Timer的地方就是KeyedProcessFunction。我们在其processElement()方法中注册Timer,然后覆写其onTimer()方法作为Timer触发时的回调逻辑。根据时间特征的不同:
- 处理时间——调用Context.timerService().registerProcessingTimeTimer()注册;onTimer()在系统时间戳达到Timer设定的时间戳时触发。
- 事件时间——调用Context.timerService().registerEventTimeTimer()注册;onTimer()在Flink内部水印达到或超过Timer设定的时间戳时触发。
Context和OnTimerContext所持有的TimerService对象拥有以下方法:
currentProcessingTime()
: Long 返回当前处理时间- currentWatermark(): Long 返回当前watermark的时间戳
- registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time的定时器。当processing time到达指定时间时,触发timer。
- registerEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
- deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
- deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。
1.基于处理时间的定时器
SingleOutputStreamOperator<WaterSensor> stream = env
.socketTextStream("hadoop102", 9999) // 在socket终端只输入毫秒级别的时间戳
.map(value -> {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
});
stream
.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
// 处理时间过后5s后触发定时器
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5000);
out.collect(value.toString());
}
// 定时器被触发之后, 回调这个方法
// 参数1: 触发器被触发的时间
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
System.out.println(timestamp);
out.collect("我被触发了....");
}
})
.print();
2.基于事件时间的定时器
事件进展依据的是watermark
SingleOutputStreamOperator<WaterSensor> stream = env
.socketTextStream("hadoop102", 9999) // 在socket终端只输入毫秒级别的时间戳
.map(value -> {
String[] datas = value.split(",");
return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
});
WatermarkStrategy<WaterSensor> wms = WatermarkStrategy
.<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
.withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000);
stream
.assignTimestampsAndWatermarks(wms)
.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
@Override
public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
System.out.println(ctx.timestamp());
ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 5000);
out.collect(value.toString());
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
System.out.println("定时器被触发.....");
}
})
.print();
3.定时器练习
监控水位传感器的水位值,如果水位值在(处理时间)5秒内连续上上,则报警。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
env.setParallelism(1);
env.socketTextStream("hadoop162",9999)
.map(value->{
String[] split = value.split(",");
return new WaterSensor(split[0],Long.valueOf(split[1])*1000,Integer.valueOf(split[2]));
})
.assignTimestampsAndWatermarks(WatermarkStrategy.
<WaterSensor>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>(){
@Override
public long extractTimestamp(WaterSensor element, long recordTimestamp) {
return element.getTs();
}
})
)
.keyBy(WaterSensor::getId)
.process(new KeyedProcessFunction<String, WaterSensor, String>() {
private long time;
private int lastVc = 0;
private boolean isFirst = true;
@Override
public void processElement(WaterSensor value,
Context ctx,
Collector<String> out) throws Exception {
if (value.getVc() > lastVc){
if (isFirst){
//水位上升
time =ctx.timestamp() +5000L;
ctx.timerService().registerEventTimeTimer(time);
isFirst=false;
}
}else {
//水位下降或者不变
ctx.timerService().deleteEventTimeTimer(time);
time = ctx.timestamp() + 5000L;
ctx.timerService().registerEventTimeTimer(time);
}
lastVc = value.getVc();
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
out.collect(ctx.getCurrentKey()+"水位已经连续5上升。。。。");
isFirst=true;
}
})
.print();
try {
env.execute();
} catch (Exception e) {
e.printStackTrace();
}
}
版权归原作者 阿萨红参 所有, 如有侵权,请联系我们删除。