0


Flink源码之State创建流程

SateManager

StreamOperatorStateHandler

在StreamTask启动初始化时通过StreamTaskStateInitializerImpl::streamOperatorStateContext会为每个StreamOperator 创建keyedStatedBackend和operatorStateBackend,在AbstractStreamOperator中有个StreamOperatorStateHandler成员变量,调用AbstractStreamOperator::initializeState方法中会初始化StreamOperatorStateHandler类型的成员变量, StreamOperatorStateHandler对象变量封装了keyedStatedBackend和operatorStateBackend,用于统一管理SteamOperator的状态。

 OperatorChain::initializeStateAndOpenOperators //调用每个Operator的initializeState和Open方法
        AbstractStreamOperator::initializeState(StreamTaskStateInitializer) 
            StreamTaskStateInitializerImpl::streamOperatorStateContext //此时会创建keyedStatedBackend和operatorStateBackend
            StreamOperatorStateHandler::new //初始化StreamOperator的stateHandler成员变量,用于状态管理
            StreamOperatorStateHandler::initializeOperatorState
            StateInitializationContextImpl::new //封装DefaultKeyedStateStore和OperatorStateStore
            CheckpointedStreamOperator::initializeState(StateInitializationContext)//调用用户定义函数中的initializeState方法,可获取Operator State
        StreamingRuntimeContext::setKeyedStateStore

Flink中主要有两种StateBackend:

  • HashMapStateBackend //内存
  • EmbeddedRocksDBStateBackend //内存+磁盘

每个StreamTask一个StateBackend成员变量,在构造函数中进行初始化,通过用户代码中设置或StateBackendLoader::loadStateBackendFromConfig从配置中加载,默认为HashMapStateBackend。简单起见,以HashMapStateBackend为例剖析创建KeyedStatedBackend和OperatorStateBackend以及处理数据流时是如何使用KeyedState和OperatorState的。

OperatorState

OperatorState创建流程:

OperatorChain::initializeStateAndOpenOperators //调用每个Operator的initializeState和Open方法
    AbstractStreamOperator::initializeState
        StreamTaskStateInitializerImpl::streamOperatorStateContext
            StreamTaskStateInitializerImpl::operatorStateBackend
            HashMapStateBackend::createOperatorStateBackend //创建DefaultOperatorStateBackend
        StreamOperatorStateHandler::new //创建StreamOperatorStateHandler
        StreamOperatorStateHandler::initializeOperatorState //调用CheckpointedFunction::initializeState
            StateInitializationContextImpl::new //该实例可getOperatorStateStore

使用Operator State的用户业务代码需要实现CheckpointedFunction接口,该接口中有以两个下方法:

voidinitializeState(FunctionInitializationContext context)throwsException;voidsnapshotState(FunctionSnapshotContext context)throwsException;

其中initializeState方法则会被StreamOperatorStateHandler.initializeOperatorState 调用,在initializeState方法中可使用

FunctionInitializationContext.getOperatorStateStore().getListState(ListStateDescriptor)DefaultOperatorStateBackend::getListState::newPartitionableListState::new//内部是ArrayList

因此通过OperatorStateStore获取的ListState内部本质上是一个ArrayList, 业务代码中可以调用add方法向这个内部List添加元素,由StateBackend管理每个Operator State,这样就实现了一个分布式状态管理,借助Checkpoint可以实现状态持久化及容灾恢复。

OperatorStateStore有三个获取状态方法:

<S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
<S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
<K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
            throws Exception

KeyedState

KeyedState创建流程如下:

OperatorChain::initializeStateAndOpenOperators //调用每个Operator的initializeState和Open方法
    AbstractStreamOperator::initializeState
        StreamTaskStateInitializerImpl::streamOperatorStateContext
            StreamTaskStateInitializerImpl::keyedStatedBackend
            HashMapStateBackend::createKeyedStateBackend //创建HeapKeyedStateBackend
                HeapKeyedStateBackendBuilder::build
                    InternalKeyContextImpl::new //用于保存当前正在处理的key
                    
        StreamOperatorStateHandler::new //创建StreamOperatorStateHandler
            DefaultKeyedStateStore::new //创建DefaultKeyedStateStore
        StreamingRuntimeContext::setKeyedStateStore //设置keyedStateStore成员变量
    AbstractStreamUdfOperator::open
        FunctionUtils::openFunction
            RichFunction::open

KeyedStateStore保存在StreamingRuntimeContext中,使用KeyedState时,用户自定义函数实现RichFunction接口,在open方法中调用getRuntimeContext().getState方法获取状态:

getRuntimeContext().getState() //获取ValueState
DefaultKeyedStateStore::getState
DefaultKeyedStateStore::getPartitionedState
HeapKeyedStateBackend::getPartitionedState
AbstractKeyedStateBackend::getOrCreateKeyedState
    LatencyTrackingStateFactory::createStateAndWrapWithLatencyTrackingIfEnabled
    TtlStateFactory::createStateAndWrapWithTtlIfEnabled //包装TTL
    HeapKeyedStateBackend::createInternalState
    HeapKeyedStateBackend::tryRegisterStateTable //这里很关键,对每个State创建一个StateTable
        CopyOnWriteStateTable::new//异步快照,这里传递了当前KeyedStateBackend的InternalKeyContext
        StateTable::new //根据当前Task管理的KeyGroups数量创建StateMap数组
        CopyOnWriteStateTable::createStateMap //一个KeyGroup一个StateMap
        CopyOnWriteStateMap::new //存储key及其对应的状态
   HeapValueState::create
           HeapValueState::new //有个成员变量指向存储当前state的CopyOnWriteStateMap
       HeapValueState::setCurrentNamespace  //默认为VoidNamespace

KeyedState有以下几种类型

ValueState<T> getState(ValueStateDescriptor<T> stateProperties) 获取HeapValueState

ListState<T> getListState(ListStateDescriptor<T> stateProperties)获取HeapListState

MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties)获取HeapMapState

getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties)获取HeapAggregatingState

getReducingState(ReducingStateDescriptor<T> stateProperties)获取HeapReducingState

RocksDBStateBackend

EmbeddedRocksDBStateBackend 管理OperatorState与HashMapStateBackend 一样,也是通过DefaultOperatorStateBackend进行管理的。

EmbeddedRocksDBStateBackend 管理KeyedState则是使用RocksDBKeyedStateBackend实现,这样可以借助磁盘加内存进行大状态管理:

RocksDBValueState
RocksDBListState
RocksDBMapState
RocksDBAggregatingState
RocksDBReducingState

总结

Flink内置状态管理是相比其他分布式流式处理系统最大的优势之一,不用借助外部存储组件,就可实现高效可靠的分布式状态管理,极大降低了学习和使用成本。

标签: flink 大数据

本文转载自: https://blog.csdn.net/jinjiating/article/details/132299910
版权归原作者 ImproveJin 所有, 如有侵权,请联系我们删除。

“Flink源码之State创建流程”的评论:

还没有评论