0


Flink中定时器的使用

定时器

让 Flink 流处理程序对处理时间和事件时间的变化作出反应。通常在KeyProcessFunction中使用。

基于处理时间或者事件时间处理过一个元素之后,注册一个定时器,然后在指定的时间运行。

最常见的显式利用Timer的地方就是KeyedProcessFunction。我们在其processElement()方法中注册Timer,然后覆写其onTimer()方法作为Timer触发时的回调逻辑。根据时间特征的不同:

  • 处理时间——调用Context.timerService().registerProcessingTimeTimer()注册;onTimer()在系统时间戳达到Timer设定的时间戳时触发。
  • 事件时间——调用Context.timerService().registerEventTimeTimer()注册;onTimer()在Flink内部水印达到或超过Timer设定的时间戳时触发。

Context和OnTimerContext所持有的TimerService对象拥有以下方法:

  • currentProcessingTime(): Long 返回当前处理时间
  • currentWatermark(): Long 返回当前watermark的时间戳
  • registerProcessingTimeTimer(timestamp: Long): Unit 会注册当前key的processing time的定时器。当processing time到达指定时间时,触发timer。
  • registerEventTimeTimer(timestamp: Long): Unit 会注册当前key的event time 定时器。当水位线大于等于定时器注册的时间时,触发定时器执行回调函数。
  • deleteProcessingTimeTimer(timestamp: Long): Unit 删除之前注册处理时间定时器。如果没有这个时间戳的定时器,则不执行。
  • deleteEventTimeTimer(timestamp: Long): Unit 删除之前注册的事件时间定时器,如果没有此时间戳的定时器,则不执行。

1.基于处理时间的定时器

SingleOutputStreamOperator<WaterSensor> stream = env
  .socketTextStream("hadoop102", 9999)  // 在socket终端只输入毫秒级别的时间戳
  .map(value -> {
      String[] datas = value.split(",");
      return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));

  });

stream
  .keyBy(WaterSensor::getId)
  .process(new KeyedProcessFunction<String, WaterSensor, String>() {
      @Override
      public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
          // 处理时间过后5s后触发定时器
          ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5000);
          out.collect(value.toString());
      }

      // 定时器被触发之后, 回调这个方法
      // 参数1: 触发器被触发的时间
      @Override
      public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
          System.out.println(timestamp);
          out.collect("我被触发了....");
      }
  })
  .print();

2.基于事件时间的定时器

事件进展依据的是watermark

SingleOutputStreamOperator<WaterSensor> stream = env
  .socketTextStream("hadoop102", 9999)  // 在socket终端只输入毫秒级别的时间戳
  .map(value -> {
      String[] datas = value.split(",");
      return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));

  });

WatermarkStrategy<WaterSensor> wms = WatermarkStrategy
  .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  .withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000);
stream
  .assignTimestampsAndWatermarks(wms)
  .keyBy(WaterSensor::getId)
  .process(new KeyedProcessFunction<String, WaterSensor, String>() {
      @Override
      public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
          System.out.println(ctx.timestamp());
          ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 5000);
          out.collect(value.toString());
      }

      @Override
      public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
          System.out.println("定时器被触发.....");
      }
  })
  .print();

3.定时器练习

监控水位传感器的水位值,如果水位值在(处理时间)5秒内连续上上,则报警。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(1);

        env.socketTextStream("hadoop162",9999)
                .map(value->{
                    String[] split = value.split(",");
                    return new WaterSensor(split[0],Long.valueOf(split[1])*1000,Integer.valueOf(split[2]));
                })
                .assignTimestampsAndWatermarks(WatermarkStrategy.
                        <WaterSensor>forMonotonousTimestamps()
                    .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>(){
                        @Override
                        public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                            return element.getTs();
                        }
                    })
                )
                .keyBy(WaterSensor::getId)
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {

                    private  long  time;
                    private  int  lastVc = 0;
                    private  boolean  isFirst = true;

                    @Override
                    public void processElement(WaterSensor value,
                                               Context ctx,
                                               Collector<String> out) throws Exception {
                        if (value.getVc() > lastVc){
                            if (isFirst){
                                //水位上升
                                time =ctx.timestamp() +5000L;
                                ctx.timerService().registerEventTimeTimer(time);
                                isFirst=false;
                            }
                        }else {
                            //水位下降或者不变
                            ctx.timerService().deleteEventTimeTimer(time);

                            time = ctx.timestamp() + 5000L;
                            ctx.timerService().registerEventTimeTimer(time);
                        }
                        lastVc = value.getVc();
                    }

                    @Override
                    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
                        out.collect(ctx.getCurrentKey()+"水位已经连续5上升。。。。");
                        isFirst=true;
                    }
                })
                .print();
                
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
标签: flink 大数据

本文转载自: https://blog.csdn.net/roi666/article/details/135849206
版权归原作者 阿萨红参 所有, 如有侵权,请联系我们删除。

“Flink中定时器的使用”的评论:

还没有评论