0


Flink window 源码分析4:WindowState

本文源码为flink 1.18.0版本。
其他相关文章:
Flink window 源码分析1:窗口整体执行流程
Flink window 源码分析2:Window 的主要组件
Flink window 源码分析3:WindowOperator
Flink window 源码分析4:WindowState

reduce、aggregate 等函数中怎么使用 WindowState ?

主要考虑 reduce、aggregate 函数中的托管状态是在什么时候触发和使用的?使用时与WindowState有什么联系?

  1. 状态描述器 若用户定义了 Evictor,则窗口中创建 ListState 描述器:
ListStateDescriptor<StreamRecord<T>> stateDesc =newListStateDescriptor<>(WINDOW_STATE_NAME, streamRecordSerializer);

在 process、apply 中创建 ListState 描述器:

ListStateDescriptor<T> stateDesc =newListStateDescriptor<>(WINDOW_STATE_NAME, inputType.createSerializer(config));

在 reduce 中创建 ReducingState 描述器:

ReducingStateDescriptor<T> stateDesc =newReducingStateDescriptor<>(WINDOW_STATE_NAME, reduceFunction, inputType.createSerializer(config));

在 aggregate 中创建 AggregatingState 描述器:

AggregatingStateDescriptor<T, ACC,V> stateDesc =newAggregatingStateDescriptor<>(WINDOW_STATE_NAME,  
                aggregateFunction,  
                accumulatorType.createSerializer(config));

在创建ReducingState、AggregatingState时,直接将用户定义的函数添加到状态中。
2. 创建状态
上述描述器会在 WindowOperator.open() 方法中使用。getOrCreateKeyedState() 会从状态后端中创建或检索相应的状态。最终会得到的状态类型是 InternalAppendingState 。
如果是会话窗口,会进一步将 windowState 转换为 windowMergingState, 其类型为 InternalMergingState。

// create (or restore) the state that hold the actual window contents  // NOTE - the state may be null in the case of the overriding evicting window operator  if(windowStateDescriptor !=null){  
    windowState =(InternalAppendingState<K,W, IN, ACC, ACC>)getOrCreateKeyedState(windowSerializer, windowStateDescriptor);}
  1. 使用状态 在 WindowOperator 类中的 processElement()、
windowState.setCurrentNamespace(stateWindow);  
windowState.add(element.getValue());... # 判断窗口是否触发
if(triggerResult.isFire()){ACC contents = windowState.get();if(contents ==null){continue;}emitWindowContents(actualWindow, contents);}if(triggerResult.isPurge()){  
    windowState.clear();}

会调用 windowState 的 add() 和 get() 来添加元素或获取元素。这两个方法在接口 AppendingState 中做了定义,前面的那些类都继承该接口。不同类型的 State 对这两个函数的具体实现是不同的。

@PublicEvolvingpublicinterfaceAppendingState<IN, OUT>extendsState{OUTget()throwsException;voidadd(IN var1)throwsException;}

为了可以在emitWindowContents()函数中统一调用用户自定义的代码,会将用户自定义的代码转换为 InternalIterableWindowFunction 类型,在该类型的 process() 方法中会执行用户定义的逻辑。若 windowState 是 ReducingState 或 AggregatingState 类型,则会提供“空”的 InternalIterableWindowFunction,因为逻辑已经绑定到 windowState 上了。

  • 若使用的 process()、apply() 方法,在调用add()方法时,则向ListState中添加数据。等待 emitWindowContents() 函数执行时,使用用户定义的Function处理数据。
  • 若使用的 reduce()、aggragate() 函数,在add()时进行聚合。emitWindowContents()函数执行时,直接将状态中聚合的数据进行提交。
  • 若用户定义了 Evictor,在调用add()方法时,则向ListState中添加数据。等待 emitWindowContents() 函数执行时,使用用户定义的Function处理数据。如果是使用的reduce()、aggragate()函数,那么会在这里遍历窗口的所有数据,反复执行用户自定义的函数。在执行用户窗口处理函数前后会执行用户定义的 Evictor 中的方法 evictBefore() 和 evictAfter() ,这两个函数中可能对窗口的历史数据做处理。
evictorContext.evictBefore(recordsWithTimestamp,Iterables.size(recordsWithTimestamp));... # 执行用户窗口处理函数
evictorContext.evictAfter(recordsWithTimestamp,Iterables.size(recordsWithTimestamp));

不同滑动窗口的 state 是否会有重叠?

这节的代码在WindowOperator.processElement()中。
滑动窗口与滚动窗口的底层代码相同,区别只是 WindowAssigner 不同。
滑动窗口到达新数据时,该数据可能属于多个窗口,那么 WindowAssigner 会返回窗口集合:

finalCollection<W> elementWindows =  
        windowAssigner.assignWindows(  
                element.getValue(), element.getTimestamp(), windowAssignerContext);

进而会访问该集合的每个元素(窗口),在每个窗口中都处理一次该元素:

for(W window : elementWindows){
    # 判断数据是否迟到
    # 数据添加到 windowState中
    # 判断该数据是否触发窗口操作
    # 如果触发窗口操作,则进一步处理
}

在该循环中,每个窗口访问其状态时,为了区别,需要设置 namespace:

windowState.setCurrentNamespace(window);  
windowState.add(element.getValue());

每个窗口就是一个 namespace。namespace 不同,访问到的状态会有联系吗?状态存储和检索时通过 StateTable 管理,StateTable 具体是使用 StateMap 管理每个 state 。StateMap 的 value 就是 state 。key 和 namespace 会分别序列化成 byte,两个 byte 数组拼接起来作为 StateMap 的 key。所以,namespace 不同,状态也不同。所以每个 window 单独存储其 state。如果一个数据属于多个窗口,那么它会被复制多份存储。
下面是通过key和namespace获得数据存储位置的函数(不重要):

MemorySegmentserializeToSegment(K key,N namespace){  
    outputStream.reset();try{// serialize namespace  
        outputStream.setPosition(Integer.BYTES);  
        namespaceSerializer.serialize(namespace, outputView);}catch(IOException e){thrownewRuntimeException("Failed to serialize namespace", e);}int keyStartPos = outputStream.getPosition();try{// serialize key  
        outputStream.setPosition(keyStartPos +Integer.BYTES);  
        keySerializer.serialize(key, outputView);}catch(IOException e){thrownewRuntimeException("Failed to serialize key", e);}finalbyte[] result = outputStream.toByteArray();finalMemorySegment segment =MemorySegmentFactory.wrap(result);// set length of namespace and key  
    segment.putInt(0, keyStartPos -Integer.BYTES);  
    segment.putInt(keyStartPos, result.length - keyStartPos -Integer.BYTES);return segment;}

会话窗口比较特殊,会涉及到 windowState 的合并:

// merge the merged state windows into the newly resulting  // state window  
windowMergingState.mergeNamespaces(  
        stateWindowResult, mergedStateWindows);

WindowState 与 用户自定义 KeyedState

两者创建过程不同(下面有两者创建状态的源码),但创建出的同一类型状态是一样的。因为在 getPartitionedState() 方法中,如果状态在以前没有创建过,则使用的就是 getOrCreateKeyedState() 方法进行创建,因此两方法所得到的状态是一样的。那么状态描述符一样的话,状态使用起来也就没差别

  • WindowState 创建使用 AbstractKeyedStateBackend.getOrCreateKeyedState() 方法 :
@Override@SuppressWarnings("unchecked")public<N,SextendsState,V>SgetOrCreateKeyedState(finalTypeSerializer<N> namespaceSerializer,StateDescriptor<S,V> stateDescriptor)throwsException{checkNotNull(namespaceSerializer,"Namespace serializer");checkNotNull(  
            keySerializer,"State key serializer has not been configured in the config. "+"This operation cannot use partitioned state.");  

    # 判断之前是否已经创建过该状态
    InternalKvState<K,?,?> kvState = keyValueStatesByName.get(stateDescriptor.getName());if(kvState ==null){if(!stateDescriptor.isSerializerInitialized()){  
            stateDescriptor.initializeSerializerUnlessSet(executionConfig);}  
        kvState =LatencyTrackingStateFactory.createStateAndWrapWithLatencyTrackingIfEnabled(TtlStateFactory.createStateAndWrapWithTtlIfEnabled(  
                                namespaceSerializer, stateDescriptor,this, ttlTimeProvider),  
                        stateDescriptor,  
                        latencyTrackingStateConfig);  
        keyValueStatesByName.put(stateDescriptor.getName(), kvState);publishQueryableStateIfEnabled(stateDescriptor, kvState);}return(S) kvState;}
  • KeyedState 创建使用 AbstractKeyedStateBackend.getPartitionedState() 方法 :
@SuppressWarnings("unchecked")@Overridepublic<N,SextendsState>SgetPartitionedState(finalN namespace,finalTypeSerializer<N> namespaceSerializer,finalStateDescriptor<S,?> stateDescriptor)throwsException{checkNotNull(namespace,"Namespace");  

    # 如果上一次创建的与这一次是同一个name,则返回上次创建的
    if(lastName !=null&& lastName.equals(stateDescriptor.getName())){  
        lastState.setCurrentNamespace(namespace);return(S) lastState;}  

    # 若之前已经创建过该状态,则返回之前创建的
    # 在 getOrCreateKeyedState 函数中也有该判断
    InternalKvState<K,?,?> previous = keyValueStatesByName.get(stateDescriptor.getName());if(previous !=null){  
        lastState = previous;  
        lastState.setCurrentNamespace(namespace);  
        lastName = stateDescriptor.getName();return(S) previous;}  

    # 之前没创建过该状态,则创建个新的
    # 此处 getOrCreateKeyedState() 一定是 Create 行为,因为上面的if判断是假
    finalS state =getOrCreateKeyedState(namespaceSerializer, stateDescriptor);finalInternalKvState<K,N,?> kvState =(InternalKvState<K,N,?>) state;  
  
    lastName = stateDescriptor.getName();  
    lastState = kvState;  
    kvState.setCurrentNamespace(namespace);return state;}

WindowState 的数据存取是怎么实现的?

WindowState 在窗口中主要使用方法是 add()、get()、clear()。
下面主要考虑 Heap-backed。

  1. add()
  • HeapListState.add()
@Overridepublicvoidadd(V value){Preconditions.checkNotNull(value,"You cannot add null to a ListState.");finalN namespace = currentNamespace;finalStateTable<K,N,List<V>> map = stateTable;List<V> list = map.get(namespace);if(list ==null){  
        list =newArrayList<>();  
        map.put(namespace, list);}  
    list.add(value);}

这个比较简单。ListState 存储数据的数据结构为 ArrayList,这里只需要使用 ArrayList.add()方法添加数据即可。

  • HeapReducingState.add()
@Overridepublicvoidadd(V value)throwsIOException{if(value ==null){clear();return;}try{  
        stateTable.transform(currentNamespace, value, reduceTransformation);}catch(Exception e){thrownewIOException("Exception while applying ReduceFunction in reducing state", e);}}

调用了 stateTable.transform() 方法修改状态值,reduceTransformation 是用户定义的聚合逻辑。按照下面访问具体逻辑:

stateTable.transform(currentNamespace, value, reduceTransformation)->getMapForKeyGroup(keyGroup).transform(key, namespace, value, transformation)->CopyOnWriteStateMap.transform(key, namespace, value, transformation)

则看到以下代码:

@Overridepublic<T>voidtransform(K key,N namespace,T value,StateTransformationFunction<S,T> transformation)throwsException{finalStateMapEntry<K,N,S> entry =putEntry(key, namespace);// copy-on-write check for state  
    entry.state =  
            transformation.apply((entry.stateVersion < highestRequiredSnapshotVersion)?getStateSerializer().copy(entry.state): entry.state,  
                    value);  
    entry.stateVersion = stateMapVersion;}

这里 transformation.apply() 将 entry.state 与新到达的数据 value 进行聚合,并写回 entry.state。

  • HeapAggregatingState.add() 与 HeapReducingState.add() 几乎完全一样,不做赘述。
  1. get() 三种状态的 get() 方法是相同的,如下面代码所示,调用 getInternal() 实现。ListState 通过 get() 函数获得一个迭代器,ReducingState 和 AggregatingState 则获得聚合值。 HeapListState、HeapReducingState、HeapAggregatingState的get():
@OverridepublicIterable<V>get(){returngetInternal();}

这里是封装的 getInternal() 方法,按照下面访问具体的逻辑:

getInternal()-> stateTable.get(namespace)->get(key, keyGroupIndex, namespace)->getMapForKeyGroup(keyGroupIndex).get(key, namespace)->CopyOnWriteStateMap.get(key, namespace)

则看到以下代码:

@OverridepublicSget(K key,N namespace){finalint hash =computeHashForOperationAndDoIncrementalRehash(key, namespace);finalint requiredVersion = highestRequiredSnapshotVersion;finalStateMapEntry<K,N,S>[] tab =selectActiveTable(hash);int index = hash &(tab.length -1);for(StateMapEntry<K,N,S> e = tab[index]; e !=null; e = e.next){finalK eKey = e.key;finalN eNamespace = e.namespace;if((e.hash == hash && key.equals(eKey)&& namespace.equals(eNamespace))){// copy-on-write check for state  if(e.stateVersion < requiredVersion){// copy-on-write check for entry  if(e.entryVersion < requiredVersion){  
                    e =handleChainedEntryCopyOnWrite(tab, hash &(tab.length -1), e);}  
                e.stateVersion = stateMapVersion;  
                e.state =getStateSerializer().copy(e.state);}return e.state;}}returnnull;}

这里就是通过 StateMap 直接获取状态的值。
3. clear()
三种状态的 clear() 方法也是一样的。
AbstractHeapState.clear():

@Overridepublicfinalvoidclear(){  
    stateTable.remove(currentNamespace);}

可按照下面访问具体逻辑:

stateTable.remove(namespace)->remove(key,keyGroupIndex(), namespace)->getMapForKeyGroup(keyGroupIndex).remove(key, namespace)->CopyOnWriteStateMap.remove(key, namespace)->removeEntry(key, namespace)

则可看到如下代码:

privateStateMapEntry<K,N,S>removeEntry(K key,N namespace){finalint hash =computeHashForOperationAndDoIncrementalRehash(key, namespace);finalStateMapEntry<K,N,S>[] tab =selectActiveTable(hash);int index = hash &(tab.length -1);for(StateMapEntry<K,N,S> e = tab[index], prev =null; e !=null; prev = e, e = e.next){if(e.hash == hash && key.equals(e.key)&& namespace.equals(e.namespace)){if(prev ==null){  
                tab[index]= e.next;}else{// copy-on-write check for entry  if(prev.entryVersion < highestRequiredSnapshotVersion){  
                    prev =handleChainedEntryCopyOnWrite(tab, index, prev);}  
                prev.next = e.next;}++modCount;if(tab == primaryTable){--primaryTableSize;}else{--incrementalRehashTableSize;}return e;}}returnnull;}

本文转载自: https://blog.csdn.net/White_Ink_/article/details/135339269
版权归原作者 White_Ink_ 所有, 如有侵权,请联系我们删除。

“Flink window 源码分析4:WindowState”的评论:

还没有评论