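Background
When working with window functions, a ProcessWindowFunction can define three kinds of state: state obtained through the rich-function call getRuntimeContext().getState, per-key-and-per-window state obtained through context.windowState(), and per-key state obtained through context.globalState(). How do these three relate to one another?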
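The relationship among the three kinds of state in ProcessWindowFunction:
1. State defined through getRuntimeContext().getState is scoped per key, so it persists across time windows.
2. State defined through context.windowState() is scoped to both the key and the window: even for the same key, different time windows hold different values.
3. State defined through context.globalState() is scoped per key, the same as state from getRuntimeContext().getState, so it is also maintained across windows.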
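The scoping rules above can be pictured with a small plain-Java model. This is only a sketch and uses no Flink API: the class StateScopeModel and its methods are hypothetical names, one map stands in for per-key state, and a second map keyed by key plus window start stands in for per-key-and-window state.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of per-key versus per-key-and-window state; not Flink code. */
public class StateScopeModel {
    // Per-key scope: one counter per key, shared by every window of that key.
    private final Map<String, Integer> perKey = new HashMap<>();
    // Per-key-and-window scope: one counter per (key, window start) pair.
    private final Map<String, Integer> perKeyAndWindow = new HashMap<>();

    /** Simulates one window firing for a key with the given number of elements. */
    public void fire(String key, long windowStart, int elements) {
        perKey.merge(key, elements, Integer::sum);
        perKeyAndWindow.merge(key + "@" + windowStart, elements, Integer::sum);
    }

    public int keyCount(String key) {
        return perKey.getOrDefault(key, 0);
    }

    public int windowCount(String key, long windowStart) {
        return perKeyAndWindow.getOrDefault(key + "@" + windowStart, 0);
    }

    public static void main(String[] args) {
        StateScopeModel model = new StateScopeModel();
        model.fire("XXX", 0L, 3);     // first 5-second window
        model.fire("XXX", 5000L, 2);  // next window, same key
        model.fire("YYY", 0L, 2);     // other key, first window
        System.out.println(model.keyCount("XXX"));           // prints 5
        System.out.println(model.windowCount("XXX", 5000L)); // prints 2
        System.out.println(model.keyCount("YYY"));           // prints 2
    }
}
```

The per-key counter for XXX keeps growing across windows, while each per-window counter only reflects its own window, which is the same distinction the list draws.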
The following code verifies this:
package wikiedits.func;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import wikiedits.func.model.KeyCount;

import java.text.SimpleDateFormat;
import java.util.Date;

public class ProcessWindowFunctionDemo {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Use processing time
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        // Parallelism of 1
        env.setParallelism(1);

        // Data source: emits one element per second, alternating between two keys
        DataStream<Tuple2<String, Integer>> dataStream = env.addSource(new SourceFunction<Tuple2<String, Integer>>() {
            @Override
            public void run(SourceContext<Tuple2<String, Integer>> ctx) throws Exception {
                int xxxNum = 0;
                int yyyNum = 0;
                for (int i = 1; i < Integer.MAX_VALUE; i++) {
                    // Only two names exist: XXX and YYY
                    String name = (0 == i % 2) ? "XXX" : "YYY";
                    // Update the running totals of XXX and YYY elements
                    if (0 == i % 2) {
                        xxxNum++;
                    } else {
                        yyyNum++;
                    }
                    // Use the current time as the timestamp
                    long timeStamp = System.currentTimeMillis();
                    // Print the element and its timestamp so the results can be checked
                    System.out.println(String.format("source,%s, %s, XXX total : %d, YYY total : %d\n",
                            name,
                            time(timeStamp),
                            xxxNum,
                            yyyNum));
                    // Emit one element with its timestamp attached
                    ctx.collectWithTimestamp(new Tuple2<String, Integer>(name, 1), timeStamp);
                    // Sleep one second after each emission
                    Thread.sleep(1000);
                }
            }

            @Override
            public void cancel() {
            }
        });

        // Split the data into 5-second tumbling windows, then apply a ProcessWindowFunction
        SingleOutputStreamOperator<String> mainDataStream = dataStream
                // Key by field f0 of the Tuple2; in this example the only keys are XXX and YYY
                .keyBy(value -> value.f0)
                // Tumbling window that fires every 5 seconds
                .timeWindow(Time.seconds(5))
                // Count the elements of each key in the current window, then send the key,
                // the count, and the window start and end times downstream as a string
                .process(new ProcessWindowFunction<Tuple2<String, Integer>, String, String, TimeWindow>() {
                    // Custom state
                    private ValueState<KeyCount> state;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        // Initialize the state; its name is myState
                        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
                    }

                    @Override
                    public void clear(Context context) {
                        // Per-window state has to be released here, when the window is purged
                        ValueState<KeyCount> contextWindowValueState = context.windowState().getState(
                                new ValueStateDescriptor<>("myWindowState", KeyCount.class));
                        contextWindowValueState.clear();
                    }

                    @Override
                    public void process(String s, Context context, Iterable<Tuple2<String, Integer>> iterable,
                                        Collector<String> collector) throws Exception {
                        // Read the myState value of the current key from the backend
                        KeyCount current = state.value();
                        // If myState has never been assigned, initialize it here
                        if (current == null) {
                            current = new KeyCount();
                            current.key = s;
                            current.count = 0;
                        }
                        int count = 0;
                        // iterable gives access to all elements of this key in the current window;
                        // to keep things simple, only the number of elements is counted
                        for (Tuple2<String, Integer> tuple2 : iterable) {
                            count++;
                        }
                        // Update the total number of elements for the current key
                        current.count += count;
                        // Write the state back to the backend
                        state.update(current);

                        System.out.println("getRuntimeContext() == context :" + (getRuntimeContext() == context));

                        ValueState<KeyCount> contextWindowValueState = context.windowState().getState(
                                new ValueStateDescriptor<>("myWindowState", KeyCount.class));
                        ValueState<KeyCount> contextGlobalValueState = context.globalState().getState(
                                new ValueStateDescriptor<>("myGlobalState", KeyCount.class));

                        // Per-key-and-window counter
                        KeyCount windowValue = contextWindowValueState.value();
                        if (windowValue == null) {
                            windowValue = new KeyCount();
                            windowValue.key = s;
                            windowValue.count = 0;
                        }
                        windowValue.count += count;
                        contextWindowValueState.update(windowValue);

                        // Per-key counter kept in the global state
                        KeyCount globalValue = contextGlobalValueState.value();
                        if (globalValue == null) {
                            globalValue = new KeyCount();
                            globalValue.key = s;
                            globalValue.count = 0;
                        }
                        globalValue.count += count;
                        contextGlobalValueState.update(globalValue);

                        // Look up the name myState again through both context stores
                        ValueState<KeyCount> contextWindowSameNameState =
                                context.windowState().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
                        ValueState<KeyCount> contextGlobalSameNameState =
                                context.globalState().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
                        System.out.println("contextWindowSameNameState == contextGlobalSameNameState :" + (
                                contextWindowSameNameState == contextGlobalSameNameState));
                        System.out.println("state == contextGlobalSameNameState :" + (state == contextGlobalSameNameState));

                        // Format the key, its element count in this window, and the window bounds as a string
                        String value = String.format("window, %s, %s - %s, %d, total : %d, windowStateCount :%s, globalStateCount :%s\n",
                                // Current key
                                s,
                                // Start time of the current window
                                time(context.window().getStart()),
                                // End time of the current window
                                time(context.window().getEnd()),
                                // Number of elements of the current key in the current window
                                count,
                                // Total number of occurrences of the current key
                                current.count,
                                contextWindowValueState.value(),
                                contextGlobalValueState.value());
                        // Emit to the downstream operator
                        collector.collect(value);
                    }
                });

        // Print the results; the printed lines show that ProcessWindowFunction
        // sees the whole window's data for every key
        mainDataStream.print();
        env.execute("processfunction demo : processwindowfunction");
    }

    public static String time(long timeStamp) {
        return new SimpleDateFormat("hh:mm:ss").format(new Date(timeStamp));
    }
}
Output:
window,XXX,08:34:45-08:34:50,3, total :22, windowStateCount :KeyCount{key='XXX', count=3}, globalStateCount :KeyCount{key='XXX', count=22}
window,YYY,08:34:45-08:34:50,2, total :22, windowStateCount :KeyCount{key='YYY', count=2}, globalStateCount :KeyCount{key='YYY', count=22}
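The output confirms the conclusions above. One point deserves special attention: state held in context.windowState() must be cleaned up in the clear method, because once the time window has ended there is no further opportunity to clean it up.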
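To see why the cleanup matters, here is a minimal plain-Java sketch of what happens to per-window entries when they are never removed. It is a model only, not Flink code: WindowStateCleanupModel, process, clear, and run are hypothetical names, and a map entry per (key, window start) stands in for one piece of window state.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of per-window state with and without cleanup; not Flink code. */
public class WindowStateCleanupModel {
    // One entry per (key, window start) pair, standing in for per-window state.
    private final Map<String, Integer> windowState = new HashMap<>();

    /** Models the window firing: creates or updates the entry for this window. */
    public void process(String key, long windowStart, int elements) {
        windowState.merge(key + "@" + windowStart, elements, Integer::sum);
    }

    /** Models the cleanup hook: drops the entry that belongs to the expired window. */
    public void clear(String key, long windowStart) {
        windowState.remove(key + "@" + windowStart);
    }

    public int liveEntries() {
        return windowState.size();
    }

    /** Runs the given number of consecutive 5-second windows for one key. */
    public static int run(int windows, boolean clearOnExpiry) {
        WindowStateCleanupModel model = new WindowStateCleanupModel();
        for (int i = 0; i < windows; i++) {
            long start = i * 5000L;
            model.process("XXX", start, 3);
            if (clearOnExpiry) {
                model.clear("XXX", start);
            }
        }
        return model.liveEntries();
    }

    public static void main(String[] args) {
        System.out.println(run(100, false)); // prints 100: one stale entry per past window
        System.out.println(run(100, true));  // prints 0: each window's entry was released
    }
}
```

In the model, skipping the cleanup leaves one entry behind for every window that has ever fired, so the number of entries grows with the running time of the job.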
This example also reveals a rather interesting phenomenon:
ValueState<KeyCount> state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
ValueState<KeyCount> contextWindowSameNameState =
        context.windowState().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
ValueState<KeyCount> contextGlobalSameNameState =
        context.globalState().getState(new ValueStateDescriptor<>("myState", KeyCount.class));
State defined in open through getRuntimeContext().getState can, surprisingly, also be reached through context.windowState() / context.globalState() under the same name, and all three references point to the same object, as these lines from the code show:
System.out.println("contextWindowSameNameState == contextGlobalSameNameState :" + (
        contextWindowSameNameState == contextGlobalSameNameState));
System.out.println("state == contextGlobalSameNameState :" + (state == contextGlobalSameNameState));
The result is:
contextWindowSameNameState == contextGlobalSameNameState :true
state == contextGlobalSameNameState :true
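One possible explanation for these `true` results is that the state backend hands out a single handle object per state name and switches that handle to the requested scope on each lookup. This is an assumption about the mechanism, not something the output above proves. The sketch below is a plain-Java toy, not Flink code, and NamedHandleModel, Handle, and the namespace strings are hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a backend that caches one handle per state name; not Flink code. */
public class NamedHandleModel {

    /** A handle whose reads and writes go to whichever namespace is current. */
    public static final class Handle {
        private final Map<String, Integer> valuesByNamespace = new HashMap<>();
        private String currentNamespace;

        public void setCurrentNamespace(String namespace) {
            currentNamespace = namespace;
        }

        public Integer value() {
            return valuesByNamespace.get(currentNamespace);
        }

        public void update(int newValue) {
            valuesByNamespace.put(currentNamespace, newValue);
        }
    }

    // Assumption: handles are cached by name only, regardless of scope.
    private final Map<String, Handle> handlesByName = new HashMap<>();

    /** Returns the cached handle for the name, switched to the requested namespace. */
    public Handle getState(String name, String namespace) {
        Handle handle = handlesByName.computeIfAbsent(name, n -> new Handle());
        handle.setCurrentNamespace(namespace);
        return handle;
    }

    public static void main(String[] args) {
        NamedHandleModel backend = new NamedHandleModel();
        Handle global = backend.getState("myState", "global");
        global.update(22);
        Handle window = backend.getState("myState", "window-0");
        System.out.println(global == window);  // prints true: same cached object
        System.out.println(window.value());    // prints null: nothing stored under window-0
        System.out.println(backend.getState("myState", "global").value()); // prints 22
    }
}
```

Under this assumption, reference equality of the handles would not mean the stored values are shared across scopes. It would also mean that a handle saved in a field reads from whichever scope was looked up most recently, which is a reason to give states in different scopes distinct names, as the demo does with myWindowState and myGlobalState.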
Copyright belongs to the original author, lixia0417mul2. In case of infringement, please contact us for removal.