StreamOperatorStateHandler
在StreamTask启动初始化时通过StreamTaskStateInitializerImpl::streamOperatorStateContext会为每个StreamOperator 创建keyedStatedBackend和operatorStateBackend,在AbstractStreamOperator中有个StreamOperatorStateHandler成员变量,调用AbstractStreamOperator::initializeState方法中会初始化StreamOperatorStateHandler类型的成员变量, StreamOperatorStateHandler对象变量封装了keyedStatedBackend和operatorStateBackend,用于统一管理SteamOperator的状态。
OperatorChain::initializeStateAndOpenOperators //调用每个Operator的initializeState和Open方法
AbstractStreamOperator::initializeState(StreamTaskStateInitializer)
StreamTaskStateInitializerImpl::streamOperatorStateContext //此时会创建keyedStatedBackend和operatorStateBackend
StreamOperatorStateHandler::new //初始化StreamOperator的stateHandler成员变量,用于状态管理
StreamOperatorStateHandler::initializeOperatorState
StateInitializationContextImpl::new //封装DefaultKeyedStateStore和OperatorStateStore
CheckpointedStreamOperator::initializeState(StateInitializationContext)//调用用户定义函数中的initializeState方法,可获取Operator State
StreamingRuntimeContext::setKeyedStateStore
Flink中主要有两种StateBackend:
- HashMapStateBackend //内存
- EmbeddedRocksDBStateBackend //内存+磁盘
每个StreamTask一个StateBackend成员变量,在构造函数中进行初始化,通过用户代码中设置或StateBackendLoader::loadStateBackendFromConfig从配置中加载,默认为HashMapStateBackend。简单起见,以HashMapStateBackend为例剖析创建KeyedStatedBackend和OperatorStateBackend以及处理数据流时是如何使用KeyedState和OperatorState的。
OperatorState
OperatorState创建流程:
OperatorChain::initializeStateAndOpenOperators //调用每个Operator的initializeState和Open方法
AbstractStreamOperator::initializeState
StreamTaskStateInitializerImpl::streamOperatorStateContext
StreamTaskStateInitializerImpl::operatorStateBackend
HashMapStateBackend::createOperatorStateBackend //创建DefaultOperatorStateBackend
StreamOperatorStateHandler::new //创建StreamOperatorStateHandler
StreamOperatorStateHandler::initializeOperatorState //调用CheckpointedFunction::initializeState
StateInitializationContextImpl::new //该实例可getOperatorStateStore
使用Operator State的用户业务代码需要实现CheckpointedFunction接口,该接口中有以两个下方法:
voidinitializeState(FunctionInitializationContext context)throwsException;voidsnapshotState(FunctionSnapshotContext context)throwsException;
其中initializeState方法则会被StreamOperatorStateHandler.initializeOperatorState 调用,在initializeState方法中可使用
FunctionInitializationContext.getOperatorStateStore().getListState(ListStateDescriptor)DefaultOperatorStateBackend::getListState::newPartitionableListState::new//内部是ArrayList
因此通过OperatorStateStore获取的ListState内部本质上是一个ArrayList, 业务代码中可以调用add方法向这个内部List添加元素,由StateBackend管理每个Operator State,这样就实现了一个分布式状态管理,借助Checkpoint可以实现状态持久化及容灾恢复。
OperatorStateStore有三个获取状态方法:
<S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
<S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception;
<K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
throws Exception
KeyedState
KeyedState创建流程如下:
OperatorChain::initializeStateAndOpenOperators //调用每个Operator的initializeState和Open方法
AbstractStreamOperator::initializeState
StreamTaskStateInitializerImpl::streamOperatorStateContext
StreamTaskStateInitializerImpl::keyedStatedBackend
HashMapStateBackend::createKeyedStateBackend //创建HeapKeyedStateBackend
HeapKeyedStateBackendBuilder::build
InternalKeyContextImpl::new //用于保存当前正在处理的key
StreamOperatorStateHandler::new //创建StreamOperatorStateHandler
DefaultKeyedStateStore::new //创建DefaultKeyedStateStore
StreamingRuntimeContext::setKeyedStateStore //设置keyedStateStore成员变量
AbstractStreamUdfOperator::open
FunctionUtils::openFunction
RichFunction::open
KeyedStateStore保存在StreamingRuntimeContext中,使用KeyedState时,用户自定义函数实现RichFunction接口,在open方法中调用getRuntimeContext().getState方法获取状态:
getRuntimeContext().getState() //获取ValueState
DefaultKeyedStateStore::getState
DefaultKeyedStateStore::getPartitionedState
HeapKeyedStateBackend::getPartitionedState
AbstractKeyedStateBackend::getOrCreateKeyedState
LatencyTrackingStateFactory::createStateAndWrapWithLatencyTrackingIfEnabled
TtlStateFactory::createStateAndWrapWithTtlIfEnabled //包装TTL
HeapKeyedStateBackend::createInternalState
HeapKeyedStateBackend::tryRegisterStateTable //这里很关键,对每个State创建一个StateTable
CopyOnWriteStateTable::new//异步快照,这里传递了当前KeyedStateBackend的InternalKeyContext
StateTable::new //根据当前Task管理的KeyGroups数量创建StateMap数组
CopyOnWriteStateTable::createStateMap //一个KeyGroup一个StateMap
CopyOnWriteStateMap::new //存储key及其对应的状态
HeapValueState::create
HeapValueState::new //有个成员变量指向存储当前state的CopyOnWriteStateMap
HeapValueState::setCurrentNamespace //默认为VoidNamespace
KeyedState有以下几种类型
ValueState<T> getState(ValueStateDescriptor<T> stateProperties) 获取HeapValueState
ListState<T> getListState(ListStateDescriptor<T> stateProperties)获取HeapListState
MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties)获取HeapMapState
getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties)获取HeapAggregatingState
getReducingState(ReducingStateDescriptor<T> stateProperties)获取HeapReducingState
RocksDBStateBackend
EmbeddedRocksDBStateBackend 管理OperatorState与HashMapStateBackend 一样,也是通过DefaultOperatorStateBackend进行管理的。
EmbeddedRocksDBStateBackend 管理KeyedState则是使用RocksDBKeyedStateBackend实现,这样可以借助磁盘加内存进行大状态管理:
RocksDBValueState
RocksDBListState
RocksDBMapState
RocksDBAggregatingState
RocksDBReducingState
总结
Flink内置状态管理是相比其他分布式流式处理系统最大的优势之一,不用借助外部存储组件,就可实现高效可靠的分布式状态管理,极大降低了学习和使用成本。
版权归原作者 ImproveJin 所有, 如有侵权,请联系我们删除。