The three kinds of state in Flink's ProcessWindowFunction

Background

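When processing a window, a ProcessWindowFunction can define three kinds of state: the rich-function state obtained through getRuntimeContext().getState,
the per-key, per-window state obtained through context.windowState(), and the per-key state obtained through context.globalState(). How do these three kinds of state relate to each other?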

Relationship between the three kinds of state in ProcessWindowFunction:

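1. State defined through getRuntimeContext().getState is scoped to each key, so it persists across time windows.
2. State defined through context.windowState() is scoped to each key and each window, so for the same key the value differs from one time window to the next.
3. State defined through context.globalState() is scoped to each key, the same scope as getRuntimeContext().getState, so it also persists across windows.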
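The scoping rules above can be sketched with a small plain-Java model. This is only an illustration and not Flink API: the class StateScopeModel, its maps, and the fire method are hypothetical names, and each map stands in for one kind of state. Keyed and global state have one slot per key, while window state has one slot per (key, window start) pair.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the three state scopes. NOT Flink API; all names are hypothetical.
final class StateScopeModel {
    // Stands in for getRuntimeContext().getState: one slot per key.
    private final Map<String, Integer> keyedState = new HashMap<>();
    // Stands in for context.globalState(): also one slot per key.
    private final Map<String, Integer> globalState = new HashMap<>();
    // Stands in for context.windowState(): one slot per (key, window start) pair.
    private final Map<String, Integer> windowState = new HashMap<>();

    // Simulates one window firing for `key` with `count` elements in it.
    void fire(String key, long windowStart, int count) {
        keyedState.merge(key, count, Integer::sum);
        globalState.merge(key, count, Integer::sum);
        windowState.merge(key + "@" + windowStart, count, Integer::sum);
    }

    int keyed(String key) {
        return keyedState.getOrDefault(key, 0);
    }

    int global(String key) {
        return globalState.getOrDefault(key, 0);
    }

    int window(String key, long windowStart) {
        return windowState.getOrDefault(key + "@" + windowStart, 0);
    }

    public static void main(String[] args) {
        StateScopeModel model = new StateScopeModel();
        model.fire("XXX", 0L, 3);     // first 5-second window
        model.fire("XXX", 5000L, 2);  // second 5-second window
        System.out.println(model.keyed("XXX"));         // 5: accumulated across windows
        System.out.println(model.global("XXX"));        // 5: accumulated across windows
        System.out.println(model.window("XXX", 0L));    // 3: first window only
        System.out.println(model.window("XXX", 5000L)); // 2: second window only
    }
}
```

In this model the keyed and global totals keep growing as windows fire, while each window slot only ever sees the elements of its own window.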
The following code verifies this:

package wikiedits.func;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import wikiedits.func.model.KeyCount;

import java.text.SimpleDateFormat;
import java.util.Date;

public class ProcessWindowFunctionDemo {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use processing time
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

        // Parallelism of 1
        env.setParallelism(1);

        // Set up the data source; it emits one element per second, alternating between two names
        DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                int xxxNum = 0;
                int yyyNum = 0;
                for (int i = 1; i < Integer.MAX_VALUE; i++) {
                    // There are only two names: XXX and YYY
                    String name = (0 == i % 2) ? "XXX" : "YYY";
                    // Update the running totals of XXX and YYY elements
                    if (0 == i % 2) {
                        xxxNum++;
                    } else {
                        yyyNum++;
                    }
                    // Use the current time as the timestamp
                    long timeStamp = System.currentTimeMillis();
                    // Print the data and the timestamp so the results can be verified
                    System.out.println(String.format("source,%s, %s,    XXX total : %d,    YYY total : %d\n",
                            name,
                            time(timeStamp),
                            xxxNum,
                            yyyNum));
                    // Emit one element with its timestamp attached
                    ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, 1), timeStamp);
                    // Wait 1 second after each emission
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
            }
        });

        // Split the data into 5-second tumbling windows, then apply a ProcessWindowFunction
        SingleOutputStreamOperator<String> mainDataStream = dataStream
                // Key by field f0 of the Tuple2; in this example the key is either XXX or YYY
                .keyBy(value -> value.f0)
                // 5-second tumbling window
                .timeWindow(Time.seconds(5))
                // Count the elements of each key in the current window, then format the key,
                // the count, and the window start and end times as a string for the downstream operator
                .process(new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
                    // Custom keyed state
                    private ValueState<KeyCount> state;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // Initialize the state; its name is myState
                        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
                    }

                    @Override
                    public void clear(Context context) {
                        // Per-window state must be cleared here, when the window is purged
                        ValueState<KeyCount> contextWindowValueState = context.windowState().getState(new ValueStateDescriptor<>("myWindowState", KeyCount.class));
                        contextWindowValueState.clear();
                    }

                    @Override
                    public void process(String s, Context context, Iterable<Tuple2<String, Integer>> iterable, Collector<String> collector) throws Exception {
                        // Read the myState state of the current key from the backend
                        KeyCount current = state.value();
                        // If myState has never been assigned, initialize it here
                        if (current == null) {
                            current = new KeyCount();
                            current.key = s;
                            current.count = 0;
                        }
                        int count = 0;
                        // iterable gives access to all data of this key in the current window;
                        // to keep things simple, only the number of elements is counted here
                        for (Tuple2<String, Integer> tuple2 : iterable) {
                            count++;
                        }
                        // Update the total number of elements for the current key
                        current.count += count;
                        // Write the updated state to the backend
                        state.update(current);
                        System.out.println("getRuntimeContext() == context :" + (getRuntimeContext() == context));

                        ValueState<KeyCount> contextWindowValueState = context.windowState().getState(new ValueStateDescriptor<>("myWindowState", KeyCount.class));
                        ValueState<KeyCount> contextGlobalValueState = context.globalState().getState(new ValueStateDescriptor<>("myGlobalState", KeyCount.class));

                        KeyCount windowValue = contextWindowValueState.value();
                        if (windowValue == null) {
                            windowValue = new KeyCount();
                            windowValue.key = s;
                            windowValue.count = 0;
                        }
                        windowValue.count += count;
                        contextWindowValueState.update(windowValue);

                        KeyCount globalValue = contextGlobalValueState.value();
                        if (globalValue == null) {
                            globalValue = new KeyCount();
                            globalValue.key = s;
                            globalValue.count = 0;
                        }
                        globalValue.count += count;
                        contextGlobalValueState.update(globalValue);

                        // Request state named myState through both the window and the global store
                        ValueState<KeyCount> contextWindowSameNameState =
                                context.windowState().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
                        ValueState<KeyCount> contextGlobalSameNameState =
                                context.globalState().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
                        System.out.println("contextWindowSameNameState == contextGlobalSameNameState :" + (
                                contextWindowSameNameState == contextGlobalSameNameState));
                        System.out.println("state == contextGlobalSameNameState :" + (state == contextGlobalSameNameState));

                        // Format the current key, its element count in this window,
                        // and the window start and end times as a string
                        String value = String.format("window, %s, %s - %s, %d,    total : %d, windowStateCount :%s, globalStateCount :%s\n",
                                // Current key
                                s,
                                // Start time of the current window
                                time(context.window().getStart()),
                                // End time of the current window
                                time(context.window().getEnd()),
                                // Number of elements of the current key in the current window
                                count,
                                // Total number of occurrences of the current key
                                current.count,
                                contextWindowValueState.value(),
                                contextGlobalValueState.value());

                        // Emit to the downstream operator
                        collector.collect(value);
                    }
                });

        // Print the results; analyzing the output verifies that ProcessWindowFunction
        // can process all the data of a key for the entire window
        mainDataStream.print();

        env.execute("processfunction demo : processwindowfunction");
    }

    public static String time(long timeStamp) {
        return new SimpleDateFormat("hh:mm:ss").format(new Date(timeStamp));
    }
}

Output:

window,XXX,08:34:45-08:34:50,3,    total :22, windowStateCount :KeyCount{key='XXX', count=3}, globalStateCount :KeyCount{key='XXX', count=22}
window,YYY,08:34:45-08:34:50,2,    total :22, windowStateCount :KeyCount{key='YYY', count=2}, globalStateCount :KeyCount{key='YYY', count=22}

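The output confirms the conclusions above. One point needs special attention: state obtained through context.windowState() must be cleared in the clear method, because once the time window has ended there is no other opportunity to clean it up.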
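The need for cleanup can be illustrated with a minimal plain-Java sketch. It is not Flink API: WindowStateCleanupModel and its methods are hypothetical names, and a map keyed by (key, window start) stands in for per-window state. Every window creates a new slot, so without a clear call the number of live slots grows with the number of windows that have fired.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of per-window state cleanup. NOT Flink API; all names are hypothetical.
final class WindowStateCleanupModel {
    // One slot per (key, window start) pair, standing in for context.windowState().
    private final Map<String, Integer> windowState = new HashMap<>();

    // Stands in for process(): writes the per-window slot.
    void process(String key, long windowStart, int count) {
        windowState.merge(key + "@" + windowStart, count, Integer::sum);
    }

    // Stands in for clear(): removes the slot of a window that has ended.
    void clear(String key, long windowStart) {
        windowState.remove(key + "@" + windowStart);
    }

    int liveEntries() {
        return windowState.size();
    }

    // Fires `windows` consecutive 5-second windows for one key and
    // returns how many per-window slots are still alive afterwards.
    static int simulate(int windows, boolean callClear) {
        WindowStateCleanupModel model = new WindowStateCleanupModel();
        for (int w = 0; w < windows; w++) {
            long windowStart = w * 5000L;
            model.process("XXX", windowStart, 1);
            if (callClear) {
                model.clear("XXX", windowStart);
            }
        }
        return model.liveEntries();
    }

    public static void main(String[] args) {
        System.out.println(simulate(100, false)); // 100: one leftover slot per window
        System.out.println(simulate(100, true));  // 0: each slot removed when its window ends
    }
}
```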
This example also reveals a rather interesting phenomenon:

ValueState<KeyCount> state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
ValueState<KeyCount> contextWindowSameNameState =
        context.windowState().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
ValueState<KeyCount> contextGlobalSameNameState =
        context.globalState().getState(new ValueStateDescriptor<>("myState", KeyCount.class));

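The state defined in open through getRuntimeContext().getState can, surprisingly, also be obtained through context.windowState() and context.globalState() by using the same name, and all three handles are the same object reference, as this part of the code shows: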

System.out.println("contextWindowSameNameState == contextGlobalSameNameState :" + (
        contextWindowSameNameState == contextGlobalSameNameState));
System.out.println("state == contextGlobalSameNameState :" + (state == contextGlobalSameNameState));

The result is:

contextWindowSameNameState == contextGlobalSameNameState :true
state == contextGlobalSameNameState :true
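One plausible way to read this result is that the state backend hands out one handle object per state name and only switches the handle's current namespace on each lookup. This mechanism is an assumption, not something the output above proves. The sketch below models it in plain Java; NameCachedBackendModel, Handle, and the namespace strings are hypothetical names and not Flink API. Under that assumption, reference equality of the handles does not mean the scopes share one value, and reusing a name across scopes is best avoided, which is why the demo uses myState, myWindowState, and myGlobalState for its real counters.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of a backend that caches handles by state name.
// NOT Flink API; the caching behavior modeled here is an assumption.
final class NameCachedBackendModel {

    // A state handle whose current namespace can be switched.
    static final class Handle {
        private final Map<String, Integer> valuesByNamespace = new HashMap<>();
        private String currentNamespace;

        // Reads the value stored under the currently selected namespace.
        Integer value() {
            return valuesByNamespace.get(currentNamespace);
        }

        // Writes the value under the currently selected namespace.
        void update(int newValue) {
            valuesByNamespace.put(currentNamespace, newValue);
        }
    }

    // One handle per state name, regardless of the namespace requested.
    private final Map<String, Handle> handlesByName = new HashMap<>();

    // Returns the cached handle for `name` and points it at `namespace`.
    Handle getState(String name, String namespace) {
        Handle handle = handlesByName.computeIfAbsent(name, n -> new Handle());
        handle.currentNamespace = namespace;
        return handle;
    }

    public static void main(String[] args) {
        NameCachedBackendModel backend = new NameCachedBackendModel();
        Handle global = backend.getState("myState", "global");
        global.update(22);
        Handle window = backend.getState("myState", "window@0");
        System.out.println(window == global); // true: same handle object
        System.out.println(window.value());   // null: nothing stored under the window namespace
        System.out.println(backend.getState("myState", "global").value()); // 22
    }
}
```

In this model a handle kept from an earlier lookup silently follows whichever namespace was requested last, which is the kind of surprise that distinct state names avoid.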

References:
https://cloud.tencent.com/developer/article/1815079


Reposted from: https://blog.csdn.net/lixia0417mul2/article/details/132155845
Copyright belongs to the original author, lixia0417mul2.
