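Background
Operator union list state is one of the less commonly used kinds of state. This article walks through the Kafka consumer implementation to show how operator union list state is used.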
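Operator union list state
First, let us look at how operator union list state is restored during failure recovery, or when the application is started from a savepoint with a changed parallelism (scaling out or in). On restore, the list entries snapshotted by all parallel subtasks are merged, and every subtask receives the complete merged list.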
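The restore behavior described above can be illustrated without Flink. The following is a minimal sketch in plain Java, not Flink code: the class `UnionListStateSketch`, the methods `restoreUnion` and `keepOwnPartitions`, and the modulo assignment rule are all hypothetical names and assumptions chosen for illustration. It shows a snapshot taken at parallelism 2 being restored at parallelism 3, where every new subtask first sees all entries and then keeps only the partitions it is responsible for.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java simulation of union redistribution; no Flink classes are used.
final class UnionListStateSketch {

    // Union redistribution: merge the lists of all old subtasks and hand the
    // complete merged list to every new subtask.
    static List<List<Map.Entry<Integer, Long>>> restoreUnion(
            List<List<Map.Entry<Integer, Long>>> snapshots, int newParallelism) {
        List<Map.Entry<Integer, Long>> merged = new ArrayList<>();
        for (List<Map.Entry<Integer, Long>> perSubtask : snapshots) {
            merged.addAll(perSubtask);
        }
        List<List<Map.Entry<Integer, Long>>> restored = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            restored.add(new ArrayList<>(merged));
        }
        return restored;
    }

    // Assumption for this sketch only: partition p belongs to subtask p % parallelism.
    static Map<Integer, Long> keepOwnPartitions(
            List<Map.Entry<Integer, Long>> restored, int subtaskIndex, int parallelism) {
        Map<Integer, Long> own = new TreeMap<>();
        for (Map.Entry<Integer, Long> e : restored) {
            if (e.getKey() % parallelism == subtaskIndex) {
                own.put(e.getKey(), e.getValue());
            }
        }
        return own;
    }

    public static void main(String[] args) {
        // Snapshot at parallelism 2: each entry is (partition id, offset).
        List<List<Map.Entry<Integer, Long>>> snapshots = List.of(
                List.of(Map.entry(0, 100L), Map.entry(2, 300L)),
                List.of(Map.entry(1, 200L), Map.entry(3, 400L)));

        // Restore at parallelism 3: every subtask sees all four entries.
        List<List<Map.Entry<Integer, Long>>> restored = restoreUnion(snapshots, 3);
        for (int i = 0; i < 3; i++) {
            System.out.println("subtask " + i + " sees " + restored.get(i).size()
                    + " entries, keeps " + keepOwnPartitions(restored.get(i), i, 3));
        }
    }
}
```

Because every subtask receives the full list, the operator itself has to decide which entries to keep after a restore. That is also why union list state is best reserved for small state such as partition offsets.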
Operator union list state is handled mainly by the following two methods:
1. The initialization method:
    public final void initializeState(FunctionInitializationContext context) throws Exception {
        OperatorStateStore stateStore = context.getOperatorStateStore();

        // Obtain the union list state in the initialization method
        this.unionOffsetStates =
                stateStore.getUnionListState(
                        new ListStateDescriptor<>(
                                OFFSETS_STATE_NAME,
                                createStateSerializer(getRuntimeContext().getExecutionConfig())));

        if (context.isRestored()) {
            restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());

            // Copy every entry of the union list state into a field of this class
            // populate actual holder for restored state
            for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
                restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
            }

            LOG.info(
                    "Consumer subtask {} restored state: {}.",
                    getRuntimeContext().getIndexOfThisSubtask(),
                    restoredState);
        } else {
            LOG.info(
                    "Consumer subtask {} has no restore state.",
                    getRuntimeContext().getIndexOfThisSubtask());
        }
    }
2. The method that is called when a checkpoint starts:
    public final void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (!running) {
            LOG.debug("snapshotState() called on closed source");
        } else {
            unionOffsetStates.clear();

            final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
            if (fetcher == null) {
                // the fetcher has not yet been initialized, which means we need to return the
                // originally restored offsets or the assigned partitions
                for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition :
                        subscribedPartitionsToStartOffsets.entrySet()) {
                    // During a checkpoint, add the data to the union list state so that it is persisted
                    unionOffsetStates.add(
                            Tuple2.of(
                                    subscribedPartition.getKey(), subscribedPartition.getValue()));
                }

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // the map cannot be asynchronously updated, because only one checkpoint call
                    // can happen on this function at a time: either snapshotState() or
                    // notifyCheckpointComplete()
                    pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
                }
            } else {
                HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();

                if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                    // the map cannot be asynchronously updated, because only one checkpoint call
                    // can happen on this function at a time: either snapshotState() or
                    // notifyCheckpointComplete()
                    pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
                }

                for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry :
                        currentOffsets.entrySet()) {
                    unionOffsetStates.add(
                            Tuple2.of(
                                    kafkaTopicPartitionLongEntry.getKey(),
                                    kafkaTopicPartitionLongEntry.getValue()));
                }
            }

            if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
                // truncate the map of pending offsets to commit, to prevent infinite growth
                while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
                    pendingOffsetsToCommit.remove(0);
                }
            }
        }
    }
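The bookkeeping of pending offsets in snapshotState() can also be sketched in isolation. The example below is a simplified stand-in written against the JDK only, and it is an assumption-laden illustration rather than the connector's implementation: the class `PendingOffsetsSketch`, its methods, the `LinkedHashMap` container and the small limit `MAX_PENDING = 3` are all hypothetical. It records the offsets for each checkpoint id, drops the oldest entries once the limit is exceeded, and, when a checkpoint is reported complete, returns that checkpoint's offsets while discarding it and all older entries.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, JDK-only model of a bounded "pending offsets per checkpoint" map.
final class PendingOffsetsSketch {

    // Hypothetical small limit so the truncation is easy to observe.
    static final int MAX_PENDING = 3;

    // Insertion order equals checkpoint order, so the first key is the oldest.
    private final LinkedHashMap<Long, Map<Integer, Long>> pending = new LinkedHashMap<>();

    // Called once per checkpoint: remember the offsets, then truncate the map.
    void onSnapshot(long checkpointId, Map<Integer, Long> offsets) {
        pending.put(checkpointId, offsets);
        Iterator<Long> it = pending.keySet().iterator();
        while (pending.size() > MAX_PENDING) {
            it.next();
            it.remove(); // drop the oldest pending checkpoint
        }
    }

    // Returns the offsets to commit for a completed checkpoint, or null if unknown.
    // The completed checkpoint and every older pending one are removed.
    Map<Integer, Long> onCheckpointComplete(long checkpointId) {
        Map<Integer, Long> offsets = pending.get(checkpointId);
        if (offsets == null) {
            return null;
        }
        Iterator<Long> it = pending.keySet().iterator();
        while (it.hasNext()) {
            long id = it.next();
            it.remove();
            if (id == checkpointId) {
                break;
            }
        }
        return offsets;
    }

    List<Long> pendingIds() {
        return new ArrayList<>(pending.keySet());
    }

    public static void main(String[] args) {
        PendingOffsetsSketch sketch = new PendingOffsetsSketch();
        for (long id = 1; id <= 5; id++) {
            sketch.onSnapshot(id, Map.of(0, id * 10));
        }
        System.out.println("pending after 5 snapshots: " + sketch.pendingIds());
        System.out.println("commit for checkpoint 4: " + sketch.onCheckpointComplete(4));
        System.out.println("pending afterwards: " + sketch.pendingIds());
    }
}
```

Bounding the map matters because checkpoints that never complete would otherwise leave their entries behind forever.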
Copyright belongs to the original author, lixia0417mul2. In case of infringement, please contact us for removal.