0


Flink自定义触发器

Flink自定义触发器

Apache Flink是一个流处理框架,它提供了许多内置的触发器来控制流处理作业的执行。但是,有时候内置的触发器不能满足我们的需求,这时候我们就需要自定义触发器,在编写自定义触发器之前,我们先来了解一下触发器的基本知识:

一、触发器概述

  • 触发器是什么?​ Trigger(触发器)决定了什么时候窗口准备就绪了,一旦窗口准备就绪就可以使用WindowFunction(窗口计算操作)进行计算。每一个 WindowAssigner(窗口分配器) 都会有一个默认的Trigger。如果默认的Trigger不满足用户的需求,用户可以自定义Trigger。触发器接口有5个方法,允许触发器对不同的事件做出反应:1. onElement():每当向窗口添加一个元素时,都会调用该方法。2. onEventTime():当注册的事件时间定时器触发时,该方法被调用。3. onProcessingTime():当注册的处理时间计时器触发时,该方法被调用。4. onMerge():该方法适用于有状态触发器,并在它们对应的窗口合并时合并两个触发器的状态。5. clear():执行窗口的清除操作。需要注意的是:1. 前3个方法决定如何响应它们的调用事件,返回一个 TriggerResult:CONTINUE:什么也不做FIRE:触发计算PURGE:清除窗口中的元素FIRE_AND_PURGE:触发计算并清空窗口中的元素2. 以上这些方法都可以用来为之后的操作注册处理时间定时器或事件时间定时器
  • 内置触发器1. EventTimeTrigger:基于事件时间和watermark机制来对窗口进行触发计算2. ProcessingTimeTrigger: 基于处理时间触发3. CountTrigger:窗口元素数超过预先给定的限制值的话会触发计算4. PurgingTrigger:作为其它trigger的参数,将其转化为一个purging触发器

二、需求

​ 实际工作中,可能会遇到想控制Flink数据流速度的情况,比如每5秒最多输出3条数据,这时候如果使用默认的TimeWindow或者CountWindow都不好达到要求,这时候就可以进行自定义窗口的触发器Trigger,修改触发窗口执行计算的条件。

三、自定义触发器

为了实现以上需求,我编写了如下代码:

importcom.alibaba.fastjson.JSON;importcom.alibaba.fastjson.JSONArray;importlombok.extern.slf4j.Slf4j;importorg.apache.commons.lang3.StringUtils;importorg.apache.flink.api.common.functions.ReduceFunction;importorg.apache.flink.api.common.state.ReducingState;importorg.apache.flink.api.common.state.ReducingStateDescriptor;importorg.apache.flink.api.common.typeutils.base.IntSerializer;importorg.apache.flink.api.common.typeutils.base.LongSerializer;importorg.apache.flink.streaming.api.TimeCharacteristic;importorg.apache.flink.streaming.api.windowing.triggers.Trigger;importorg.apache.flink.streaming.api.windowing.triggers.TriggerResult;importorg.apache.flink.streaming.api.windowing.windows.TimeWindow;importjava.util.HashMap;importjava.util.Map;@Slf4jpublicclassCountAndSizeTrigger<T>extendsTrigger<T,TimeWindow>{privatestaticfinalString DATA_COUNT_STATE_NAME ="dataCountState";privatestaticfinalString DATA_SIZE_STATE_NAME ="dataSizeState";// 窗口最大数据条数privateint maxCount;// 窗口最大数据字节数privateint maxSize;// 时间语义:event time、process timeprivateTimeCharacteristic timeType;// 用于储存窗口当前数据条数的状态对象privateReducingStateDescriptor<Long> countStateDescriptor =newReducingStateDescriptor(DATA_COUNT_STATE_NAME,newReduceFunction<Long>(){@OverridepublicLongreduce(Long value1,Long value2)throwsException{return value1 + value2;}},LongSerializer.INSTANCE);//用于储存窗口当前数据字节数的状态对象privateReducingStateDescriptor<Integer> sizeStateDescriptor =newReducingStateDescriptor(DATA_SIZE_STATE_NAME,newReduceFunction<Long>(){@OverridepublicLongreduce(Long value1,Long value2)throwsException{return value1 + value2;}},IntSerializer.INSTANCE);privateCountAndSizeTrigger(int maxCount,int maxSize ,TimeCharacteristic timeType){this.maxCount = maxCount;this.maxSize = maxSize;this.timeType = timeType;}/**
     * 触发计算,计算结束后清空窗口内的元素
     * @param window 窗口
     * @param ctx 上下文
     */privateTriggerResultfireAndPurge(TimeWindow window,TriggerContext ctx)throwsException{clear(window, ctx);returnTriggerResult.FIRE_AND_PURGE;}/**
     * 进入窗口的每个元素都会调用该方法
     * @param element 元素
     * @param timestamp 时间戳
     * @param window 窗口
     * @param ctx 上下文
     */@OverridepublicTriggerResultonElement(T element,long timestamp,TimeWindow window,TriggerContext ctx)throwsException{ReducingState<Long> countState = ctx.getPartitionedState(countStateDescriptor);ReducingState<Integer> sizeState = ctx.getPartitionedState(sizeStateDescriptor);Map<String,JSONArray> map =stringToMap(element.toString());if(map !=null){for(Map.Entry<String,JSONArray> entry : map.entrySet()){JSONArray value = entry.getValue();
                countState.add(Long.valueOf(value.size()));}}else{
            countState.add(0L);}int length =String.valueOf(element).getBytes("utf-8").length;
        sizeState.add(length);// 注册定时器
        ctx.registerProcessingTimeTimer(window.maxTimestamp());if(countState.get()>= maxCount){
            log.info("fire count {} ",countState.get());returnfireAndPurge(window, ctx);}if(sizeState.get()>= maxSize){
            log.info("fire size {} ",sizeState.get());returnfireAndPurge(window, ctx);}else{returnTriggerResult.CONTINUE;}}// 数据处理,可根据需要修改privateMap<String,JSONArray>stringToMap(String str){if(StringUtils.isBlank(str)){returnnull;}String string = str.substring(1, str.length()-1).replaceAll(" ","");Map<String,JSONArray> map =newHashMap<>();String[] split = string.split("=");if(split.length <2){returnnull;}else{String key = split[0];String value = string.substring(string.indexOf("=")+1);
            map.put(key, JSON.parseArray(value));}return map;}/**
     * 处理时间窗口触发的时候会被调用
     * @param time 时间
     * @param window 窗口
     * @param ctx 上下文
     */@OverridepublicTriggerResultonProcessingTime(long time,TimeWindow window,TriggerContext ctx)throwsException{if(timeType !=TimeCharacteristic.ProcessingTime){returnTriggerResult.CONTINUE;}
        log.info("fire time {} ",time);returnfireAndPurge(window, ctx);}/**
     * 事件时间窗口触发的时候被调用
     * @param time 时间
     * @param window 窗口
     * @param ctx 上下文
     */@OverridepublicTriggerResultonEventTime(long time,TimeWindow window,TriggerContext ctx)throwsException{if(timeType !=TimeCharacteristic.EventTime){returnTriggerResult.CONTINUE;}if(time >= window.getEnd()){returnTriggerResult.CONTINUE;}else{
            log.info("fire with event tiem: "+ time);returnfireAndPurge(window, ctx);}}/**
     * 执行窗口的清除操作
     * @param window 窗口
     * @param ctx 上下文
     */@Overridepublicvoidclear(TimeWindow window,TriggerContext ctx)throwsException{
        ctx.getPartitionedState(countStateDescriptor).clear();
        ctx.getPartitionedState(sizeStateDescriptor).clear();}/**
     * 初始化触发器,默认使用processTime
     * @param maxCount 最大数据条数
     * @param maxSize 最大数据字节数
     * @param timeType 事件类型
     */publicstaticCountAndSizeTriggercreat(int maxCount,int maxSize){returnnewCountAndSizeTrigger(maxCount,maxSize,TimeCharacteristic.ProcessingTime);}/**
     * 初始化触发器
     * @param maxCount 最大数据条数
     * @param maxSize 最大数据字节数
     * @param timeType 事件类型
     */publicstaticCountAndSizeTriggercreat(int maxCount,int maxSize,TimeCharacteristic timeType){returnnewCountAndSizeTrigger(maxCount,maxSize,timeType);}}

四、使用示例

stream
        .timeWindowAll(Time.seconds(10)).trigger(newCountAndSizeTrigger(1000,1024)).process(newDemoWindowProcessFunction()).addSink(newDemoSinkFunction()).name("demo");

以上代码通过调用CountAndSizeTrigger,传入最大数据条数和最大数据字节数,来对数据流进行流速控制。

标签: flink java apache

本文转载自: https://blog.csdn.net/weixin_43914798/article/details/130954737
版权归原作者 码猿小站 所有, 如有侵权,请联系我们删除。

“Flink自定义触发器”的评论:

还没有评论