测试反馈, 配置的flink任务提交上去后, 输入数据源符合条件,到时间窗口的size。最后一个窗口没有闭窗计算,数据并没及时输出告警
经过调试发现,watermark没有向后继续推进,导致无法闭窗, watermark的时间取的是数据中的业务时间,create_time。
因为没有后续数据进来, 所以watermark一直停在收到的最后一条数据的时间,,
按照官网的watermark的实现:
inputStream.assignTimestampsAndWatermarks(WatermarkStrategy.forGenerator(new EventWaterMark(window))
.withTimestampAssigner((SerializableTimestampAssigner<JSONObject>) (jsonObject, l) -> jsonObject.getLong(window.getAttrField()))).name("Watermark|"+rule.getName());
public class EventWaterMark implements WatermarkGeneratorSupplier<JSONObject> {
private static final long serialVersionUID = -2338922000184097299L;
private final String eventTimeFieldNameJSONObject;
private long currentMaxTimestamp;
private String eventTimeFieldName;
private long maxOutOfOrderness;
public EventWaterMark(Window window) {
this.maxOutOfOrderness = window.getMaxOutOfOrderness();
this.eventTimeFieldNameJSONObject = window.getAttrField();
this.eventTimeFieldName = window.getAttrField();
}
@Override
public WatermarkGenerator<JSONObject> createWatermarkGenerator(Context context) {
return new WatermarkGenerator<JSONObject>() {
@Override
public void onEvent(JSONObject jsonObject, long l, WatermarkOutput watermarkOutput) {
currentMaxTimestamp = Math.max(currentMaxTimestamp, jsonObject.getLong(eventTimeFieldName));
}
@Override
public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
// 无后续数据,窗口不会关闭计算?
watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));
# 使用当前时间,无法处理历史的数据
// watermarkOutput.emitWatermark(new Watermark(System.currentTimeMillis() - maxOutOfOrderness));
}
};
}
}
解决思路: 记录最后一条数据进入的时间, 多少秒后无数据进来,就主动向后推进watermark多少秒,保持watermark向后推进
更新后自定义的watermark实现:
public class EventWaterMark implements WatermarkGeneratorSupplier<JSONObject> {
private static final long serialVersionUID = -2338922000184097299L;
private final String eventTimeFieldNameJSONObject;
private long currentMaxTimestamp;
private String eventTimeFieldName;
private long maxOutOfOrderness;
// 当前数据进入的时间
private long currentDateTimeMillis;
public EventWaterMark(Window window) {
this.maxOutOfOrderness = window.getMaxOutOfOrderness();
this.eventTimeFieldNameJSONObject = window.getAttrField();
this.eventTimeFieldName = window.getAttrField();
}
@Override
public WatermarkGenerator<JSONObject> createWatermarkGenerator(Context context) {
return new WatermarkGenerator<JSONObject>() {
@Override
public void onEvent(JSONObject jsonObject, long l, WatermarkOutput watermarkOutput) {
currentMaxTimestamp = Math.max(currentMaxTimestamp, jsonObject.getLong(eventTimeFieldName));
// 记录 当前来的数据来到 时间戳
currentDateTimeMillis = System.currentTimeMillis();
}
@Override
public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
// 无后续数据,最后一个窗口不会关闭计算,
// watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));
// 使用System.currentTimeMillis(),如果第一条数据时间很早, 窗口不会进行关闭计算
// watermarkOutput.emitWatermark(new Watermark(System.currentTimeMillis() - maxOutOfOrderness));
// 无初始数据进入 watermark不用推进, 多并行度时候,会取最小的watermark,通过withIdleness 将分区标记为空闲,这样watermark就可以往下走了。
if (currentMaxTimestamp - maxOutOfOrderness <= 0) {
} else {
// 3秒内无数据进入,watermark保持向后推进, 确保无数据进入,窗口能闭窗计算
if (System.currentTimeMillis() - currentDateTimeMillis >= 5000) {
// 无数据进入,保持watermark递增
currentMaxTimestamp = currentMaxTimestamp + System.currentTimeMillis() - currentDateTimeMillis;
watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));
// 模拟数据进入 设置数据进入时间
currentDateTimeMillis = System.currentTimeMillis();
} else {
watermarkOutput.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness));
}
}
}
};
}
}
上面虽然可以解决最后一个窗口闭合问题,但是还有个问题,当执行繁忙的时候,出现反压,导致source消费为0, 此时持续高反压时间大于代码中配置的5秒后, 此时自定义的watermark由于没收到上游source算子的数据,就会自动的向后推进watermark,这样就会导致,窗口会及时闭合计算,当反压结束后, source再来的数据 已经超过 窗口时间和延迟时间,, 直接作为迟到数据不参与窗口计算了。
此处暂未想到更好的解决方法
版权归原作者 奔跑的窝窝牛 所有, 如有侵权,请联系我们删除。