一文弄懂Flink重要源码
1. Flink 状态源码
1.1 valueState源码
1.1.1 Update方法
classHeapValueState<K,N,V>extendsAbstractHeapState<K,N,V>implementsInternalValueState<K,N,V>{/** The current namespace, which the access methods will refer to. */protectedN currentNamespace;/** Map containing the actual key/value pairs. */protectedfinalStateTable<K,N, SV> stateTable;@Overridepublicvoidupdate(V value){if(value ==null){clear();return;}// 向state中传值,namespace从上下文获取
stateTable.put(currentNamespace, value);}}
publicabstractclassStateTable<K,N,S>implementsStateSnapshotRestore,Iterable<StateEntry<K,N,S>>{// Maps the composite of active key and given namespace to the specified state.// 根据key+namespace找到对应的state// 其中,key从上下文中获取,入参中的state是用户需要保存的值publicvoidput(N namespace,S state){put(keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace, state);}publicvoidput(K key,int keyGroup,N namespace,S state){checkKeyNamespacePreconditions(key, namespace);StateMap<K,N,S> stateMap =getMapForKeyGroup(keyGroup);// 将 state 根据 key 的值放入到 stateMap 中
stateMap.put(key, namespace, state);}}
publicclassCopyOnWriteStateMap<K,N,S>extendsStateMap<K,N,S>{@Overridepublicvoidput(K key,N namespace,S value){// 获取当前key对应的StateMapEntry结构finalStateMapEntry<K,N,S> e =putEntry(key, namespace);// 获取到key+namespace对应的StateMapEntry对象之后,将value赋值进去
e.state = value;
e.stateVersion = stateMapVersion;}privateStateMapEntry<K,N,S>putEntry(K key,N namespace){// 根据key+namespace求hash值,然后再通过hash值得到位于数组tab中的StateMapEntry结构的准确下标finalint hash =computeHashForOperationAndDoIncrementalRehash(key, namespace);finalStateMapEntry<K,N,S>[] tab =selectActiveTable(hash);// 这里的位与运算与hashmap中的寻址方法一模一样,位与运算可以保证得到的index值小于数组长度int index = hash &(tab.length -1);for(StateMapEntry<K,N,S> e = tab[index]; e !=null; e = e.next){// StateMapEntry结构以链表的形式串联,因此找到index之后,还需要遍历链表,通过key+namespace找到目标StateMapEntry,是不是跟hashmap很像?if(e.hash == hash && key.equals(e.key)&& namespace.equals(e.namespace)){// copy-on-write check for entryif(e.entryVersion < highestRequiredSnapshotVersion){
e =handleChainedEntryCopyOnWrite(tab, index, e);}return e;}}++modCount;if(size()> threshold){doubleCapacity();}returnaddNewStateMapEntry(tab, key, namespace, hash);}}
保存key+namespace+value的最终结构是StateMapEntry,定义如下:
protectedstaticclassStateMapEntry<K,N,S>implementsStateEntry<K,N,S>{/**
* The key. Assumed to be immumap and not null.
*/@NonnullfinalK key;/**
* The namespace. Assumed to be immumap and not null.
*/@NonnullfinalN namespace;/**
* The state. This is not final to allow exchanging the object for copy-on-write. Can be null.
*/@Nullable// 用户自定义类型S state;/**
* Link to another {@link StateMapEntry}. This is used to resolve collisions in the
* {@link CopyOnWriteStateMap} through chaining.
*/@Nullable// 链表StateMapEntry<K,N,S> next;/**
* The version of this {@link StateMapEntry}. This is meta data for copy-on-write of the map structure.
*/int entryVersion;/**
* The version of the state object in this entry. This is meta data for copy-on-write of the state object itself.
*/int stateVersion;/**
* The computed secondary hash for the composite of key and namespace.
*/finalint hash;}
1.1.2 Value 方法
classHeapValueState<K,N,V>extendsAbstractHeapState<K,N,V>implementsInternalValueState<K,N,V>{/** The current namespace, which the access methods will refer to. */protectedN currentNamespace;@OverridepublicVvalue(){// 上下文中获取namespacefinalV result = stateTable.get(currentNamespace);if(result ==null){returngetDefaultValue();}return result;}}
publicabstractclassStateTable<K,N,S>implementsStateSnapshotRestore,Iterable<StateEntry<K,N,S>>{publicSget(N namespace){returnget(keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace);}privateSget(K key,int keyGroupIndex,N namespace){checkKeyNamespacePreconditions(key, namespace);StateMap<K,N,S> stateMap =getMapForKeyGroup(keyGroupIndex);if(stateMap ==null){returnnull;}return stateMap.get(key, namespace);}}
publicclassCopyOnWriteStateMap<K,N,S>extendsStateMap<K,N,S>{@OverridepublicSget(K key,N namespace){// 根据key+namespace计算hashfinalint hash =computeHashForOperationAndDoIncrementalRehash(key, namespace);finalint requiredVersion = highestRequiredSnapshotVersion;// 最终存储StateMapEntry对象的数组finalStateMapEntry<K,N,S>[] tab =selectActiveTable(hash);int index = hash &(tab.length -1);// 遍历链表,然后根据key+namespace+hash寻找StateMapEntry对象,可以理解为hashmap中的重写eqauls方法for(StateMapEntry<K,N,S> e = tab[index]; e !=null; e = e.next){finalK eKey = e.key;finalN eNamespace = e.namespace;if((e.hash == hash && key.equals(eKey)&& namespace.equals(eNamespace))){// copy-on-write check for stateif(e.stateVersion < requiredVersion){// copy-on-write check for entryif(e.entryVersion < requiredVersion){
e =handleChainedEntryCopyOnWrite(tab, hash &(tab.length -1), e);}
e.stateVersion = stateMapVersion;
e.state =getStateSerializer().copy(e.state);}return e.state;}}returnnull;}}
2. checkPoint 源码分析
2.1 SourceStreamTask的checkpoint实现
2.1.1 JobManager端checkpoint调度
在JobManager端构建ExecutionGraph过程中(ExecutionGraphBuilder.buildGraph()方法),会调用ExecutionGraph.enableCheckpointing()方法,这个方法不管任务里有没有设置checkpoint都会调用的。在enableCheckpointing()方法里会创建CheckpointCoordinator,这是负责checkpoint的核心实现类,同时会给job添加一个监听器CheckpointCoordinatorDeActivator(只有设置了checkpoint才会注册这个监听器),CheckpointCoordinatorDeActivator负责checkpoint的启动和停止。源码如下:
//ExecutionGraphBuilder类publicstaticExecutionGraphbuildGraph(@NullableExecutionGraph prior,JobGraph jobGraph,Configuration jobManagerConfig,ScheduledExecutorService futureExecutor,Executor ioExecutor,SlotProvider slotProvider,ClassLoader classLoader,CheckpointRecoveryFactory recoveryFactory,Time rpcTimeout,RestartStrategy restartStrategy,MetricGroup metrics,int parallelismForAutoMax,BlobWriter blobWriter,Time allocationTimeout,Logger log)throwsJobExecutionException,JobException{...// configure the state checkpointing// 配置 checkpointing stateJobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();if(snapshotSettings !=null){...finalCheckpointCoordinatorConfiguration chkConfig = snapshotSettings.getCheckpointCoordinatorConfiguration();// 启动 checkpoint
executionGraph.enableCheckpointing(
chkConfig.getCheckpointInterval(),//checkpoint间隔
chkConfig.getCheckpointTimeout(),//checkpoint超时时间
chkConfig.getMinPauseBetweenCheckpoints(),//两次checkpoint间隔最短时间
chkConfig.getMaxConcurrentCheckpoints(),//同时可并发执行的checkpoint数量
chkConfig.getCheckpointRetentionPolicy(),//checkpoint的保留策略
triggerVertices,//需要触发checkpoint的任务,即所有的sourceTask
ackVertices,//需要确认checkpoint的任务,这里是所有的任务
confirmVertices,//checkpoint执行完需要进行commit的任务,也是所有的任务
hooks,
checkpointIdCounter,//checkpoint ID计数器,每执行一次就+1
completedCheckpoints,//CompletedCheckpointStore, 存放已完成的checkpoint
rootBackend,
checkpointStatsTracker);}...return executionGraph;}
//ExecutionGraph类publicvoidenableCheckpointing(long interval,long checkpointTimeout,long minPauseBetweenCheckpoints,int maxConcurrentCheckpoints,CheckpointRetentionPolicy retentionPolicy,List<ExecutionJobVertex> verticesToTrigger,List<ExecutionJobVertex> verticesToWaitFor,List<ExecutionJobVertex> verticesToCommitTo,List<MasterTriggerRestoreHook<?>> masterHooks,CheckpointIDCounter checkpointIDCounter,CompletedCheckpointStore checkpointStore,StateBackend checkpointStateBackend,CheckpointStatsTracker statsTracker){// simple sanity checkscheckArgument(interval >=10,"checkpoint interval must not be below 10ms");checkArgument(checkpointTimeout >=10,"checkpoint timeout must not be below 10ms");checkState(state ==JobStatus.CREATED,"Job must be in CREATED state");checkState(checkpointCoordinator ==null,"checkpointing already enabled");ExecutionVertex[] tasksToTrigger =collectExecutionVertices(verticesToTrigger);ExecutionVertex[] tasksToWaitFor =collectExecutionVertices(verticesToWaitFor);ExecutionVertex[] tasksToCommitTo =collectExecutionVertices(verticesToCommitTo);
checkpointStatsTracker =checkNotNull(statsTracker,"CheckpointStatsTracker");//创建checkpointCoordinator,这里的checkpointCoordinator不管有没有设置checkpoint都会创建// create the coordinator that triggers and commits checkpoints and holds the state
checkpointCoordinator =newCheckpointCoordinator(
jobInformation.getJobId(),
interval,
checkpointTimeout,
minPauseBetweenCheckpoints,
maxConcurrentCheckpoints,
retentionPolicy,
tasksToTrigger,
tasksToWaitFor,
tasksToCommitTo,
checkpointIDCounter,
checkpointStore,
checkpointStateBackend,
ioExecutor,SharedStateRegistry.DEFAULT_FACTORY);// register the master hooks on the checkpoint coordinatorfor(MasterTriggerRestoreHook<?> hook : masterHooks){if(!checkpointCoordinator.addMasterHook(hook)){LOG.warn("Trying to register multiple checkpoint hooks with the name: {}", hook.getIdentifier());}}
checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);//注册一个job状态监听器,在job任务出现改变时会进行一些相应的操作//注意如果没有设置checkpoint的话,则不会注册这个checkpoint监听器// interval of max long value indicates disable periodic checkpoint,// the CheckpointActivatorDeactivator should be created only if the interval is not max valueif(interval !=Long.MAX_VALUE){// the periodic checkpoint scheduler is activated and deactivated as a result of// job status changes (running -> on, all other states -> off)registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());}}//CheckpointCoordinator类publicJobStatusListenercreateActivatorDeactivator(){synchronized(lock){if(shutdown){thrownewIllegalArgumentException("Checkpoint coordinator is shut down");}if(jobStatusListener ==null){
jobStatusListener =newCheckpointCoordinatorDeActivator(this);}return jobStatusListener;}}
在JobManager端开始进行任务调度的时候,会对job的状态进行转换,由CREATED转成RUNNING,实现在transitionState()方法中,在这个过程中刚才设置的job监听器CheckpointCoordinatorDeActivator就开始启动checkpoint的定时任务了,调用链为ExecutionGraph.scheduleForExecution() -> transitionState() -> notifyJobStatusChange() -> CheckpointCoordinatorDeActivator.jobStatusChanges() -> CheckpointCoordinator.startCheckpointScheduler()源码如下
//ExecutionGraph类publicvoidscheduleForExecution()throwsJobException{assertRunningInJobMasterMainThread();finallong currentGlobalModVersion = globalModVersion;if(transitionState(JobStatus.CREATED,JobStatus.RUNNING)){...}else{thrownewIllegalStateException("Job may only be scheduled from state "+JobStatus.CREATED);}}privatebooleantransitionState(JobStatus current,JobStatus newState,Throwable error){assertRunningInJobMasterMainThread();...// now do the actual state transitionif(STATE_UPDATER.compareAndSet(this, current, newState)){LOG.info("Job {} ({}) switched from state {} to {}.",getJobName(),getJobID(), current, newState, error);
stateTimestamps[newState.ordinal()]=System.currentTimeMillis();notifyJobStatusChange(newState, error);returntrue;}else{returnfalse;}}privatevoidnotifyJobStatusChange(JobStatus newState,Throwable error){if(jobStatusListeners.size()>0){finallong timestamp =System.currentTimeMillis();finalThrowable serializedError = error ==null?null:newSerializedThrowable(error);for(JobStatusListener listener : jobStatusListeners){try{
listener.jobStatusChanges(getJobID(), newState, timestamp, serializedError);}catch(Throwable t){LOG.warn("Error while notifying JobStatusListener", t);}}}}//CheckpointCoordinatorDeActivator类publicvoidjobStatusChanges(JobID jobId,JobStatus newJobStatus,long timestamp,Throwable error){if(newJobStatus ==JobStatus.RUNNING){// start the checkpoint scheduler// 在这里启动 checkPoint 调度
coordinator.startCheckpointScheduler();}else{// anything else should stop the trigger for now
coordinator.stopCheckpointScheduler();}}
CheckpointCoordinator会部署一个定时任务,用于周期性的触发checkpoint,这个定时任务就是ScheduledTrigger类
publicvoidstartCheckpointScheduler(){synchronized(lock){if(shutdown){thrownewIllegalArgumentException("Checkpoint coordinator is shut down");}// make sure all prior timers are cancelledstopCheckpointScheduler();
periodicScheduling =true;long initialDelay =ThreadLocalRandom.current().nextLong(
minPauseBetweenCheckpointsNanos /1_000_000L, baseInterval +1L);// 启动调度
currentPeriodicTrigger = timer.scheduleAtFixedRate(newScheduledTrigger(), initialDelay, baseInterval,TimeUnit.MILLISECONDS);}}
2.1.2 ScheduledTrigger定时触发checkpoint
下面我们来看ScheduledTrigger的实现,主要就在CheckpointCoordinator.triggerCheckpoint()中:
- 在触发checkpoint之前先做一遍检查:检查当前正在进行的checkpoint数量是否已达到设置的最大并发checkpoint数量,检查距离上一次checkpoint完成是否已经过了设置的两次checkpoint之间的最小间隔(minPauseBetweenCheckpoints),在都没有问题的情况下才可以触发checkpoint
- 检查需要触发的task是否都正常运行,即所有的source task
- 检查需要向JobManager确认checkpoint的task是否都已在执行(即都有当前的执行实例),这里就是所有的task,即所有的task都需要向JobManager发送确认自己checkpoint的消息
- 正式开始触发checkpoint,创建一个PendingCheckpoint,包含了checkpointID和timestamp,向所有的source task去触发checkpoint。
//CheckpointCoordinator类privatefinalclassScheduledTriggerimplementsRunnable{@Overridepublicvoidrun(){try{// 触犯 checkPointtriggerCheckpoint(System.currentTimeMillis(),true);}catch(Exception e){LOG.error("Exception while triggering checkpoint for job {}.", job, e);}}}publicbooleantriggerCheckpoint(long timestamp,boolean isPeriodic){returntriggerCheckpoint(timestamp, checkpointProperties,null, isPeriodic).isSuccess();}publicCheckpointTriggerResulttriggerCheckpoint(long timestamp,CheckpointProperties props,@NullableString externalSavepointLocation,boolean isPeriodic){// 首先做一些检查,看能不能触发checkpoint,//1. 主要就是检查最大并发checkpoint数,checkpoint间隔时间// make some eager pre-checkssynchronized(lock){// abort if the coordinator has been shutdown in the meantimeif(shutdown){returnnewCheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);}// Don't allow periodic checkpoint if scheduling has been disabledif(isPeriodic &&!periodicScheduling){returnnewCheckpointTriggerResult(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN);}// validate whether the checkpoint can be triggered, with respect to the limit of// concurrent checkpoints, and the minimum time between checkpoints.// these checks are not relevant for savepointsif(!props.forceCheckpoint()){// sanity check: there should never be more than one trigger request queuedif(triggerRequestQueued){LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);returnnewCheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);}// if too many checkpoints are currently in progress, we need to mark that a request is queuedif(pendingCheckpoints.size()>= maxConcurrentCheckpointAttempts){
triggerRequestQueued =true;if(currentPeriodicTrigger !=null){
currentPeriodicTrigger.cancel(false);
currentPeriodicTrigger =null;}returnnewCheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);}// make sure the minimum interval between checkpoints has passedfinallong earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;finallong durationTillNextMillis =(earliestNext -System.nanoTime())/1_000_000;if(durationTillNextMillis >0){if(currentPeriodicTrigger !=null){
currentPeriodicTrigger.cancel(false);
currentPeriodicTrigger =null;}// Reassign the new trigger to the currentPeriodicTrigger
currentPeriodicTrigger = timer.scheduleAtFixedRate(newScheduledTrigger(),
durationTillNextMillis, baseInterval,TimeUnit.MILLISECONDS);returnnewCheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);}}}//检查需要触发的task是否都正常运行,即所有的source task// check if all tasks that we need to trigger are running.// if not, abort the checkpointExecution[] executions =newExecution[tasksToTrigger.length];for(int i =0; i < tasksToTrigger.length; i++){Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();if(ee ==null){LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
job);returnnewCheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);}elseif(ee.getState()==ExecutionState.RUNNING){
executions[i]= ee;}else{LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
job,ExecutionState.RUNNING,
ee.getState());returnnewCheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);}}// 检查JobManager端需要确认checkpoint信息的task时候正常运行// next, check if all tasks that need to acknowledge the checkpoint are running.// if not, abort the checkpointMap<ExecutionAttemptID,ExecutionVertex> ackTasks =newHashMap<>(tasksToWaitFor.length);for(ExecutionVertex ev : tasksToWaitFor){Execution ee = ev.getCurrentExecutionAttempt();if(ee !=null){
ackTasks.put(ee.getAttemptId(), ev);}else{LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
ev.getTaskNameWithSubtaskIndex(),
job);returnnewCheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);}}// we will actually trigger this checkpoint!// we lock with a special lock to make sure that trigger requests do not overtake each other.// this is not done with the coordinator-wide lock, because the 'checkpointIdCounter'// may issue blocking operations. Using a different lock than the coordinator-wide lock,// we avoid blocking the processing of 'acknowledge/decline' messages during that time.synchronized(triggerLock){finalCheckpointStorageLocation checkpointStorageLocation;finallong checkpointID;try{// this must happen outside the coordinator-wide lock, because it communicates// with external services (in HA mode) and may block for a while.
checkpointID = checkpointIdCounter.getAndIncrement();
checkpointStorageLocation = props.isSavepoint()?
checkpointStorage.initializeLocationForSavepoint(checkpointID, externalSavepointLocation):
checkpointStorage.initializeLocationForCheckpoint(checkpointID);}catch(Throwable t){...}finalPendingCheckpoint checkpoint =newPendingCheckpoint(
job,
checkpointID,
timestamp,
ackTasks,
props,
checkpointStorageLocation,
executor);...try{// re-acquire the coordinator-wide locksynchronized(lock){// since we released the lock in the meantime, we need to re-check// that the conditions still hold.//这里又重新做了一遍checkpoint检查...LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);
pendingCheckpoints.put(checkpointID, checkpoint);...}// end of lock scopefinalCheckpointOptions checkpointOptions =newCheckpointOptions(
props.getCheckpointType(),
checkpointStorageLocation.getLocationReference());// 给source task发消息触发checkpoint // send the messages to the tasks that trigger their checkpointfor(Execution execution: executions){
execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);}
numUnsuccessfulCheckpointsTriggers.set(0);returnnewCheckpointTriggerResult(checkpoint);}catch(Throwable t){...}}// end trigger lock}
Execution.triggerCheckpoint()就是远程调用TaskManager的triggerCheckpoint()方法
//Execution publicvoidtriggerCheckpoint(long checkpointId,long timestamp,CheckpointOptions checkpointOptions){finalLogicalSlot slot = assignedResource;if(slot !=null){finalTaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();// 通过 RPC接口触发 TaskManger.triggerCheckpoint
taskManagerGateway.triggerCheckpoint(attemptId,getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);}else{LOG.debug("The execution has no slot assigned. This indicates that the execution is "+"no longer running.");}}//RpcTaskManagerGateway类publicvoidtriggerCheckpoint(ExecutionAttemptID executionAttemptID,JobID jobId,long checkpointId,long timestamp,CheckpointOptions checkpointOptions){
taskExecutorGateway.triggerCheckpoint(
executionAttemptID,
checkpointId,
timestamp,
checkpointOptions);}
2.1.3 SourceStreamTask的Checkpoint执行
TaskManager的triggerCheckpoint()方法首先获取到source task(即SourceStreamTask),调用Task.triggerCheckpointBarrier(),triggerCheckpointBarrier()会异步的去执行一个独立线程,这个线程来负责source task的checkpoint执行。
//TaskExecutor类publicCompletableFuture<Acknowledge>triggerCheckpoint(ExecutionAttemptID executionAttemptID,long checkpointId,long checkpointTimestamp,CheckpointOptions checkpointOptions){
log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);// 获取 taskfinalTask task = taskSlotTable.getTask(executionAttemptID);if(task !=null){// 对 task 执行 triggerCheckpointBarrier
task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);// 异步获取 ack 信息returnCompletableFuture.completedFuture(Acknowledge.get());}else{finalString message ="TaskManager received a checkpoint request for unknown task "+ executionAttemptID +'.';
log.debug(message);returnFutureUtils.completedExceptionally(newCheckpointException(message));}}//Task类publicvoidtriggerCheckpointBarrier(finallong checkpointID,long checkpointTimestamp,finalCheckpointOptions checkpointOptions){finalAbstractInvokable invokable =this.invokable;finalCheckpointMetaData checkpointMetaData =newCheckpointMetaData(checkpointID, checkpointTimestamp);if(executionState ==ExecutionState.RUNNING&& invokable !=null){// build a local closurefinalString taskName = taskNameWithSubtask;finalSafetyNetCloseableRegistry safetyNetCloseableRegistry =FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();Runnable runnable =newRunnable(){@Overridepublicvoidrun(){// set safety net from the task's context for checkpointing threadLOG.debug("Creating FileSystem stream leak safety net for {}",Thread.currentThread().getName());FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);try{//由一个单独的线程来调用执行,这个invokable在这里就是SourceStreamTaskboolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);if(!success){
checkpointResponder.declineCheckpoint(getJobID(),getExecutionId(), checkpointID,newCheckpointDeclineTaskNotReadyException(taskName));}}catch(Throwable t){if(getExecutionState()==ExecutionState.RUNNING){failExternally(newException("Error while triggering checkpoint "+ checkpointID +" for "+
taskNameWithSubtask, t));}else{LOG.debug("Encountered error while triggering checkpoint {} for "+"{} ({}) while being not in state running.", checkpointID,
taskNameWithSubtask, executionId, t);}}finally{FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);}}};//异步的执行这个线程executeAsyncCallRunnable(runnable,String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));}else{...}}
checkpoint的核心实现在StreamTask.performCheckpoint()方法中,该方法主要有三个步骤
1、在checkpoint之前做一些准备工作,通常情况下operator在这个阶段是不做什么操作的
2、立即向下游广播CheckpointBarrier,以便使下游的task能够及时的接收到CheckpointBarrier也开始进行checkpoint的操作
3、开始进行状态的快照,即checkpoint操作。
注意以上操作都是在同步代码块里进行的,获取到的这个lock锁就是用于checkpoint的锁,checkpoint线程和task任务线程用的是同一把锁,在进行performCheckpoint()时,task任务线程是不能够进行数据处理的
/SourceStreamTask类
publicbooleantriggerCheckpoint(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions)throwsException{if(!externallyInducedCheckpoints){returnsuper.triggerCheckpoint(checkpointMetaData, checkpointOptions);}else{// we do not trigger checkpoints here, we simply state whether we can trigger themsynchronized(getCheckpointLock()){returnisRunning();}}}//StreamTask类publicbooleantriggerCheckpoint(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions)throwsException{try{// No alignment if we inject a checkpointCheckpointMetrics checkpointMetrics =newCheckpointMetrics().setBytesBufferedInAlignment(0L).setAlignmentDurationNanos(0L);returnperformCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);}catch(Exception e){...}}privatebooleanperformCheckpoint(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions,CheckpointMetrics checkpointMetrics)throwsException{LOG.debug("Starting checkpoint ({}) {} on task {}",
checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(),getName());synchronized(lock){if(isRunning){// we can do a checkpoint// All of the following steps happen as an atomic step from the perspective of barriers and// records/watermarks/timers/callbacks.// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream// checkpoint alignments// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.// The pre-barrier work should be nothing or minimal in the common case.// 准备 checkPoint
operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());// Step (2): Send the checkpoint barrier downstream// 发送 checkpoint barrier 到下游
operatorChain.broadcastCheckpointBarrier(
checkpointMetaData.getCheckpointId(),
checkpointMetaData.getTimestamp(),
checkpointOptions);// Step (3): Take the state snapshot. This should be largely asynchronous, to not// impact progress of the streaming topologycheckpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);returntrue;}else{...returnfalse;}}}
首先来看第一步operatorChain.prepareSnapshotPreBarrier(),默认的实现其实什么也没做
//OperatorChain类publicvoidprepareSnapshotPreBarrier(long checkpointId)throwsException{// go forward through the operator chain and tell each operator// to prepare the checkpointfinalStreamOperator<?>[] operators =this.allOperators;for(int i = operators.length -1; i >=0;--i){finalStreamOperator<?> op = operators[i];if(op !=null){
op.prepareSnapshotPreBarrier(checkpointId);}}}//AbstractStreamOperator类publicvoidprepareSnapshotPreBarrier(long checkpointId)throwsException{// the default implementation does nothing and accepts the checkpoint// this is purely for subclasses to override}
广播CheckpointBarrier
接下来我们看看operatorChain.broadcastCheckpointBarrier(),即CheckpointBarrier的广播过程。
其核心实现就是在task的输出ResultPartition里,给下游的每个通道channel都发送一个CheckpointBarrier,下游有多少个任务,那就会有多少个通道,给每个任务都会发送一个CheckpointBarrier,这个过程叫做广播。
这个CheckpointBarrier广播的过程也叫CheckpointBarrier注入,因为CheckpointBarrier并不是由执行source task的线程来写入的,而是由checkpoint线程来写入的,并且做了同步,在写入CheckpointBarrier时source task线程是被阻塞的。
这个源码如下:
//OperatorChain类publicvoidbroadcastCheckpointBarrier(long id,long timestamp,CheckpointOptions checkpointOptions)throwsIOException{CheckpointBarrier barrier =newCheckpointBarrier(id, timestamp, checkpointOptions);for(RecordWriterOutput<?> streamOutput : streamOutputs){
streamOutput.broadcastEvent(barrier);}}//RecordWriterOutput类publicvoidbroadcastEvent(AbstractEvent event)throwsIOException{
recordWriter.broadcastEvent(event);}//RecordWriter类publicvoidbroadcastEvent(AbstractEvent event)throwsIOException{try(BufferConsumer eventBufferConsumer =EventSerializer.toBufferConsumer(event)){for(int targetChannel =0; targetChannel < numberOfChannels; targetChannel++){tryFinishCurrentBufferBuilder(targetChannel);//这里的targetPartition就是ResultPartition,向下游每一个通道都发送一个CheckpointBarrier // Retain the buffer so that it can be recycled by each channel of targetPartition
targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel);}if(flushAlways){flushAll();}}}
异步执行checkpoint
接下来就是关键的部分checkpointState()方法,也就是checkpoint的执行了,这个过程也是一个异步的过程,不能因为checkpoint而影响了正常数据流的处理。
StreamTask里的每个operator都会创建一个OperatorSnapshotFutures,OperatorSnapshotFutures 里包含了执行operator状态checkpoint的FutureTask,然后由另一个单独的线程异步的来执行这些operator的实际checkpoint操作
//StreamTask类privatevoidcheckpointState(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions,CheckpointMetrics checkpointMetrics)throwsException{CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
checkpointMetaData.getCheckpointId(),
checkpointOptions.getTargetLocation());CheckpointingOperation checkpointingOperation =newCheckpointingOperation(this,
checkpointMetaData,
checkpointOptions,
storage,
checkpointMetrics);
checkpointingOperation.executeCheckpointing();}
//StreamTask#CheckpointingOperation类publicvoidexecuteCheckpointing()throwsException{
startSyncPartNano =System.nanoTime();try{//对每个operator创建一个OperatorSnapshotFutures添加到operatorSnapshotsInProgressfor(StreamOperator<?> op : allOperators){checkpointStreamOperator(op);}
startAsyncPartNano =System.nanoTime();
checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano)/1_000_000);//由一个异步线程来执行实际的checkpoint操作,旨在不影响数据流的处理// we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submitAsyncCheckpointRunnable asyncCheckpointRunnable =newAsyncCheckpointRunnable(
owner,
operatorSnapshotsInProgress,
checkpointMetaData,
checkpointMetrics,
startAsyncPartNano);
owner.cancelables.registerCloseable(asyncCheckpointRunnable);
owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);...}catch(Exception ex){...
owner.synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData, ex);}}privatevoidcheckpointStreamOperator(StreamOperator<?> op)throwsException{if(null!= op){//这个snapshotInProgress包含了operator checkpoint的FutureTaskOperatorSnapshotFutures snapshotInProgress = op.snapshotState(
checkpointMetaData.getCheckpointId(),
checkpointMetaData.getTimestamp(),
checkpointOptions,
storageLocation);
operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);}}
OperatorSnapshotFutures
我们首先来看看针对每个operator创建的OperatorSnapshotFutures是什么,OperatorSnapshotFutures持有了一个operator的状态数据快照过程的RunnableFuture,什么意思呢,RunnableFuture首先是一个Runnable,也就是把operator快照的操作写到run()方法里,但并不去执行,等到需要的时候再去执行run()方法,也就是去执行真正的快照操作。其次RunnableFuture是一个Future,在执行run()后可以调用get()来获取future的结果,这里的结果就是SnapshotResult。
publicclassOperatorSnapshotFutures{@NonnullprivateRunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateManagedFuture;@NonnullprivateRunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateRawFuture;@NonnullprivateRunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateManagedFuture;@NonnullprivateRunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateRawFuture;
从snapshotState()方法中可以看到OperatorSnapshotFutures中的keyedStateRawFuture和operatorStateRawFuture都是DoneFuture,也就是一个已经完成的RunnableFuture,那么这个过程就是一个同步的。
operatorState和keyedState的snapshot过程被封装到一个RunnableFuture(这个过程比较复杂,这里先不介绍),并不会立即执行,之后调用RunnableFuture.run()才会真正的执行snapshot,在这里RunnableFuture就是一个FutureTask,这个过程是一个异步的,会被一个异步快照线程执行
//AbstractStreamOperator类publicfinalOperatorSnapshotFuturessnapshotState(long checkpointId,long timestamp,CheckpointOptions checkpointOptions,CheckpointStreamFactory factory)throwsException{KeyGroupRange keyGroupRange =null!= keyedStateBackend ?
keyedStateBackend.getKeyGroupRange():KeyGroupRange.EMPTY_KEY_GROUP_RANGE;OperatorSnapshotFutures snapshotInProgress =newOperatorSnapshotFutures();try(StateSnapshotContextSynchronousImpl snapshotContext =newStateSnapshotContextSynchronousImpl(
checkpointId,
timestamp,
factory,
keyGroupRange,getContainingTask().getCancelables())){//将snapshotContext进行快照,这个过程也会调用CheckpointedFunction.snapshotState()方法snapshotState(snapshotContext);//keyedStateRawFuture和operatorStateRawFuture都是DoneFuture,也就是一个已经完成的RunnableFuture
snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());//operatorState和keyedState的snapshot过程被封装到一个RunnableFuture,并不会立即执行,//之后调用RunnableFuture.run()才会真正的执行snapshotif(null!= operatorStateBackend){
snapshotInProgress.setOperatorStateManagedFuture(
operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));}if(null!= keyedStateBackend){
snapshotInProgress.setKeyedStateManagedFuture(
keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));}}catch(Exception snapshotException){...}return snapshotInProgress;}
AsyncCheckpointRunnable
接下来我们看异步快照线程的实现,jobManagerTaskOperatorSubtaskStates是需要向JobManager ack的operator状态快照元数据信息,localTaskOperatorSubtaskStates是需要向TaskManager上报的operator状态快照元数据信息,localTaskOperatorSubtaskStates的作用是为状态数据保存一个备份,用于TaskManager快速地进行本地数据恢复,我们主要关心的还是向JobManager去Ack的TaskStateSnapshot。
1、针对每个operator创建一个OperatorSnapshotFinalizer,OperatorSnapshotFinalizer是状态数据快照的真正执行者,它真正的执行的operator的快照过程,也就是去执行那些FutureTask
2、状态快照执行完毕之后向JobManager上报checkpoint的信息
//AsyncCheckpointRunnable类publicvoidrun(){FileSystemSafetyNet.initializeSafetyNetForThread();try{TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =newTaskStateSnapshot(operatorSnapshotsInProgress.size());TaskStateSnapshot localTaskOperatorSubtaskStates =newTaskStateSnapshot(operatorSnapshotsInProgress.size());for(Map.Entry<OperatorID,OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()){OperatorID operatorID = entry.getKey();OperatorSnapshotFutures snapshotInProgress = entry.getValue();//这里真正的执行的operator的快照过程,会执行那些FutureTask// finalize the async part of all by executing all snapshot runnablesOperatorSnapshotFinalizer finalizedSnapshots =newOperatorSnapshotFinalizer(snapshotInProgress);
jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
operatorID,
finalizedSnapshots.getJobManagerOwnedState());
localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
operatorID,
finalizedSnapshots.getTaskLocalState());}finallong asyncEndNanos =System.nanoTime();finallong asyncDurationMillis =(asyncEndNanos - asyncStartNanos)/1_000_000L;
checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);if(asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING,CheckpointingOperation.AsyncCheckpointState.COMPLETED)){//上JobManager上报checkpoint的信息reportCompletedSnapshotStates(
jobManagerTaskOperatorSubtaskStates,
localTaskOperatorSubtaskStates,
asyncDurationMillis);}else{LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
owner.getName(),
checkpointMetaData.getCheckpointId());}}catch(Exception e){handleExecutionException(e);}finally{
owner.cancelables.unregisterCloseable(this);FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();}}
可以看到在创建OperatorSnapshotFinalizer实例的时候就会去运行那些FutureTask,执行快照过程,比如将状态数据写到文件系统如HDFS等。
//OperatorSnapshotFinalizer类publicOperatorSnapshotFinalizer(@NonnullOperatorSnapshotFutures snapshotFutures)throwsExecutionException,InterruptedException{SnapshotResult<KeyedStateHandle> keyedManaged =FutureUtils.runIfNotDoneAndGet(snapshotFutures.getKeyedStateManagedFuture());SnapshotResult<KeyedStateHandle> keyedRaw =FutureUtils.runIfNotDoneAndGet(snapshotFutures.getKeyedStateRawFuture());SnapshotResult<OperatorStateHandle> operatorManaged =FutureUtils.runIfNotDoneAndGet(snapshotFutures.getOperatorStateManagedFuture());SnapshotResult<OperatorStateHandle> operatorRaw =FutureUtils.runIfNotDoneAndGet(snapshotFutures.getOperatorStateRawFuture());
jobManagerOwnedState =newOperatorSubtaskState(
operatorManaged.getJobManagerOwnedSnapshot(),
operatorRaw.getJobManagerOwnedSnapshot(),
keyedManaged.getJobManagerOwnedSnapshot(),
keyedRaw.getJobManagerOwnedSnapshot());
taskLocalState =newOperatorSubtaskState(
operatorManaged.getTaskLocalSnapshot(),
operatorRaw.getTaskLocalSnapshot(),
keyedManaged.getTaskLocalSnapshot(),
keyedRaw.getTaskLocalSnapshot());}
2.1.4 Task上报checkpoint信息
Task在执行完checkpoint后会向JobManager上报checkpoint的元数据信息,最终会调用到CheckpointCoordinator.receiveAcknowledgeMessage()方法
//TaskStateManagerImpl类publicvoidreportTaskStateSnapshots(@NonnullCheckpointMetaData checkpointMetaData,@NonnullCheckpointMetrics checkpointMetrics,@NullableTaskStateSnapshot acknowledgedState,@NullableTaskStateSnapshot localState){long checkpointId = checkpointMetaData.getCheckpointId();
localStateStore.storeLocalState(checkpointId, localState);
checkpointResponder.acknowledgeCheckpoint(
jobId,
executionAttemptID,
checkpointId,
checkpointMetrics,
acknowledgedState);}//JobMaster类publicvoidacknowledgeCheckpoint(finalJobID jobID,finalExecutionAttemptID executionAttemptID,finallong checkpointId,finalCheckpointMetrics checkpointMetrics,finalTaskStateSnapshot checkpointState){finalCheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();finalAcknowledgeCheckpoint ackMessage =newAcknowledgeCheckpoint(
jobID,
executionAttemptID,
checkpointId,
checkpointMetrics,
checkpointState);if(checkpointCoordinator !=null){getRpcService().execute(()->{try{// 通过 RPC 接口调用 receiveAcknowledgeMessage
checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);}catch(Throwable t){
log.warn("Error while processing checkpoint acknowledgement message", t);}});}else{...}}
CheckpointCoordinator最后会调用PendingCheckpoint.acknowledgeTask()方法,该方法就是将task上报的元数据信息(checkpoint的路径地址、状态数据大小等等)添加到PendingCheckpoint里。如果接收到了全部task上报的Ack消息,就执行completePendingCheckpoint()
//CheckpointCoordinatorpublicbooleanreceiveAcknowledgeMessage(AcknowledgeCheckpoint message)throwsCheckpointException{...finallong checkpointId = message.getCheckpointId();synchronized(lock){// we need to check inside the lock for being shutdown as well, otherwise we// get races and invalid error log messagesif(shutdown){returnfalse;}finalPendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);if(checkpoint !=null&&!checkpoint.isDiscarded()){switch(checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())){caseSUCCESS:if(checkpoint.isFullyAcknowledged()){// 如果所有的 subTask 都返回 ack,此时就可以完成本次 checkPointcompletePendingCheckpoint(checkpoint);}break;...}returntrue;}...}}//PendingCheckpoint类publicTaskAcknowledgeResultacknowledgeTask(ExecutionAttemptID executionAttemptId,TaskStateSnapshot operatorSubtaskStates,CheckpointMetrics metrics){synchronized(lock){if(discarded){returnTaskAcknowledgeResult.DISCARDED;}finalExecutionVertex vertex = notYetAcknowledgedTasks.remove(executionAttemptId);if(vertex ==null){if(acknowledgedTasks.contains(executionAttemptId)){returnTaskAcknowledgeResult.DUPLICATE;}else{returnTaskAcknowledgeResult.UNKNOWN;}}else{
acknowledgedTasks.add(executionAttemptId);}List<OperatorID> operatorIDs = vertex.getJobVertex().getOperatorIDs();int subtaskIndex = vertex.getParallelSubtaskIndex();long ackTimestamp =System.currentTimeMillis();long stateSize =0L;//将上报的operatorSubtaskStates也就是每个Task的状态快照元数据添加到PendingCheckpoint的operatorStates里面if(operatorSubtaskStates !=null){for(OperatorID operatorID : operatorIDs){OperatorSubtaskState operatorSubtaskState =
operatorSubtaskStates.getSubtaskStateByOperatorID(operatorID);// if no real operatorSubtaskState was reported, we insert an empty stateif(operatorSubtaskState ==null){
operatorSubtaskState =newOperatorSubtaskState();}OperatorState operatorState = operatorStates.get(operatorID);if(operatorState ==null){
operatorState =newOperatorState(
operatorID,
vertex.getTotalNumberOfParallelSubtasks(),
vertex.getMaxParallelism());
operatorStates.put(operatorID, operatorState);}
operatorState.putState(subtaskIndex, operatorSubtaskState);
stateSize += operatorSubtaskState.getStateSize();}}++numAcknowledgedTasks;...returnTaskAcknowledgeResult.SUCCESS;}}
JobManager完成所有task的ack之后,会做以下操作:
- 将PendingCheckpoint转成CompletedCheckpoint,标志着checkpoint过程完成。CompletedCheckpoint里包含了checkpoint的元数据信息,包括checkpoint的路径地址、状态数据大小等等。同时也会将元数据信息进行持久化,目录为 `$checkpointDir/$uuid/chk-***/_metadata`,也会把过期的checkpoint数据删除
- 通知所有的task进行commit操作。一般来说,task的commit操作其实不需要做什么,但是像TwoPhaseCommitSinkFunction(比如FlinkKafkaProducer)这类函数,就会进行一些事务的提交操作等。
privatevoidcompletePendingCheckpoint(PendingCheckpoint pendingCheckpoint)throwsCheckpointException{finallong checkpointId = pendingCheckpoint.getCheckpointId();finalCompletedCheckpoint completedCheckpoint;// As a first step to complete the checkpoint, we register its state with the registryMap<OperatorID,OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
sharedStateRegistry.registerAll(operatorStates.values());try{try{// 完成 checkpoint
completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();}catch(Exception e1){...}// the pending checkpoint must be discarded after the finalizationPreconditions.checkState(pendingCheckpoint.isDiscarded()&& completedCheckpoint !=null);try{
completedCheckpointStore.addCheckpoint(completedCheckpoint);}catch(Exception exception){...}}finally{
pendingCheckpoints.remove(checkpointId);triggerQueuedRequests();}rememberRecentCheckpointId(checkpointId);// drop those pending checkpoints that are at prior to the completed onedropSubsumedCheckpoints(checkpointId);// record the time when this was completed, to calculate// the 'min delay between checkpoints'
lastCheckpointCompletionNanos =System.nanoTime();...// send the "notify complete" call to all verticesfinallong timestamp = completedCheckpoint.getTimestamp();for(ExecutionVertex ev : tasksToCommitTo){Execution ee = ev.getCurrentExecutionAttempt();if(ee !=null){// task在接收到消息之后会调用Task.notifyCheckpointComplete()方法
ee.notifyCheckpointComplete(checkpointId, timestamp);}}}
2.1.5 JobManager通知Task进行commit
Task在接收到消息之后会调用Task.notifyCheckpointComplete()方法,最后会调用StreamOperator.notifyCheckpointComplete(),一般来说不做什么操作。但是像AbstractUdfStreamOperator这类operator可能还会有一些其他操作
publicvoidnotifyCheckpointComplete(finallong checkpointID){finalAbstractInvokable invokable =this.invokable;if(executionState ==ExecutionState.RUNNING&& invokable !=null){Runnable runnable =newRunnable(){@Overridepublicvoidrun(){try{
invokable.notifyCheckpointComplete(checkpointID);
taskStateManager.notifyCheckpointComplete(checkpointID);}catch(Throwable t){...}}};executeAsyncCallRunnable(runnable,"Checkpoint Confirmation for "+
taskNameWithSubtask);}else{LOG.debug("Ignoring checkpoint commit notification for non-running task {}.", taskNameWithSubtask);}}
//StreamTask类publicvoidnotifyCheckpointComplete(long checkpointId)throwsException{synchronized(lock){if(isRunning){LOG.debug("Notification of complete checkpoint for task {}",getName());for(StreamOperator<?> operator : operatorChain.getAllOperators()){if(operator !=null){
operator.notifyCheckpointComplete(checkpointId);}}}else{LOG.debug("Ignoring notification of complete checkpoint for not-running task {}",getName());}}}//AbstractStreamOperator类publicvoidnotifyCheckpointComplete(long checkpointId)throwsException{if(keyedStateBackend !=null){
keyedStateBackend.notifyCheckpointComplete(checkpointId);}}//HeapKeyedStateBackend类publicvoidnotifyCheckpointComplete(long checkpointId){//Nothing to do}
AbstractUdfStreamOperator主要是针对用户自定义函数的operator,像StreamMap、StreamSource等等。如果用户定义的Function实现了CheckpointListener接口,则会进行额外的一些处理,例如FlinkKafkaConsumerBase会向kafka提交消费的offset,TwoPhaseCommitSinkFunction类会进行事务的提交(例如FlinkKafkaProducer)。值得一提的是,TwoPhaseCommitSinkFunction是Flink实现端到端exactly-once的关键
//AbstractUdfStreamOperatorpublicvoidnotifyCheckpointComplete(long checkpointId)throwsException{super.notifyCheckpointComplete(checkpointId);if(userFunction instanceofCheckpointListener){((CheckpointListener) userFunction).notifyCheckpointComplete(checkpointId);}}
//FlinkKafkaConsumerBase类publicfinalvoidnotifyCheckpointComplete(long checkpointId)throwsException{if(!running){LOG.debug("notifyCheckpointComplete() called on closed source");return;}finalAbstractFetcher<?,?> fetcher =this.kafkaFetcher;if(fetcher ==null){LOG.debug("notifyCheckpointComplete() called on uninitialized source");return;}if(offsetCommitMode ==OffsetCommitMode.ON_CHECKPOINTS){// only one commit operation must be in progressif(LOG.isDebugEnabled()){LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint "+ checkpointId);}try{...@SuppressWarnings("unchecked")Map<KafkaTopicPartition,Long> offsets =(Map<KafkaTopicPartition,Long>) pendingOffsetsToCommit.remove(posInMap);...//向kafka提交offset
fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);}catch(Exception e){...}}}
//TwoPhaseCommitSinkFunction类publicfinalvoidnotifyCheckpointComplete(long checkpointId)throwsException{Iterator<Map.Entry<Long,TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();checkState(pendingTransactionIterator.hasNext(),"checkpoint completed, but no transaction pending");Throwable firstError =null;while(pendingTransactionIterator.hasNext()){...try{//提交事务commit(pendingTransaction.handle);}catch(Throwable t){...}...}...}//FlinkKafkaProducer类protectedvoidcommit(FlinkKafkaProducer.KafkaTransactionState transaction){if(transaction.isTransactional()){try{
transaction.producer.commitTransaction();}finally{recycleTransactionalProducer(transaction.producer);}}}
所有的task完成了notifyCheckpointComplete()方法后,一个完整的checkpoint流程就完成了
2.2 非SourceStreamTask的checkpoint实现
上述只说了source task的checkpoint实现,source task的checkpoint是由JobManager来触发的,那么非source task的checkpoint流程又是如何的呢?
上面说了source task会向下游广播发送CheckpointBarrier,那么下游的task就会接收到source task发送的CheckpointBarrier,下游task的checkpoint也正是从接收到CheckpointBarrier开始的。起点就在StreamTask中的CheckpointBarrierHandler.getNextNonBlocked()中
//StreamInputProcessor类publicbooleanprocessInput()throwsException{...while(true){...//finalBufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();...}}
CheckpointBarrierHandler会根据CheckpointingMode模式不同生成不同的Handler,如果是EXACTLY_ONCE,就会生成BarrierBuffer,会进行barrier对齐,BarrierBuffer中的CachedBufferBlocker是用来缓存barrier对齐时从被阻塞channel接收到的数据。如果CheckpointingMode是AT_LEAST_ONCE,那就会生成BarrierTracker,不会进行barrier对齐。
publicstaticCheckpointBarrierHandlercreateCheckpointBarrierHandler(StreamTask<?,?> checkpointedTask,CheckpointingMode checkpointMode,IOManager ioManager,InputGate inputGate,Configuration taskManagerConfig)throwsIOException{CheckpointBarrierHandler barrierHandler;if(checkpointMode ==CheckpointingMode.EXACTLY_ONCE){long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);if(!(maxAlign ==-1|| maxAlign >0)){thrownewIllegalConfigurationException(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()+" must be positive or -1 (infinite)");}if(taskManagerConfig.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL)){
barrierHandler =newBarrierBuffer(inputGate,newCachedBufferBlocker(inputGate.getPageSize()), maxAlign);}else{
barrierHandler =newBarrierBuffer(inputGate,newBufferSpiller(ioManager, inputGate.getPageSize()), maxAlign);}}elseif(checkpointMode ==CheckpointingMode.AT_LEAST_ONCE){
barrierHandler =newBarrierTracker(inputGate);}else{thrownewIllegalArgumentException("Unrecognized Checkpointing Mode: "+ checkpointMode);}if(checkpointedTask !=null){
barrierHandler.registerCheckpointEventHandler(checkpointedTask);}return barrierHandler;}
2.2.1 EXACTLY_ONCE下checkpoint的实现
下面看看CheckpointBarrierHandler的处理逻辑,首先看BarrierBuffer
1、首先获取到下一个BufferOrEvent,可能是从当前缓存中获取,如果缓存没了,就从inputGate获取
2、如果获取的事件是已经被设置为阻塞的channel的,就将其放到缓存中,如果channel已经接收到CheckpointBarrier,就会将其设置为block状态
3、如果接收的事件是CheckpointBarrier,处理接收到的CheckpointBarrier,如果接收到的是正常的数据就返回
//BarrierBuffer类publicBufferOrEventgetNextNonBlocked()throwsException{while(true){// process buffered BufferOrEvents before grabbing new onesOptional<BufferOrEvent> next;if(currentBuffered ==null){
next = inputGate.getNextBufferOrEvent();}else{
next =Optional.ofNullable(currentBuffered.getNext());if(!next.isPresent()){completeBufferedSequence();returngetNextNonBlocked();}}...BufferOrEvent bufferOrEvent = next.get();if(isBlocked(bufferOrEvent.getChannelIndex())){//如果数据是被阻塞的channel,就将其添加到缓存,如果channel已经接收到CheckpointBarrier,就会将其设置为block状态// if the channel is blocked, we just store the BufferOrEvent
bufferBlocker.add(bufferOrEvent);checkSizeLimit();}elseif(bufferOrEvent.isBuffer()){return bufferOrEvent;}elseif(bufferOrEvent.getEvent().getClass()==CheckpointBarrier.class){if(!endOfStream){//处理接收到的CheckpointBarrier// process barriers only if there is a chance of the checkpoint completingprocessBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());}}elseif(bufferOrEvent.getEvent().getClass()==CancelCheckpointMarker.class){processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());}else{if(bufferOrEvent.getEvent().getClass()==EndOfPartitionEvent.class){processEndOfPartition();}return bufferOrEvent;}}}//CachedBufferBlocker类publicvoidadd(BufferOrEvent boe){
bytesBlocked += pageSize;//currentBuffers是一个ArrayDeque<BufferOrEvent>
currentBuffers.add(boe);}
核心处理逻辑在processBarrier()方法中,实现如下:
1、如果只有一个channel,也就是说只有一个上游任务,就立马触发checkpoint
2、如果有多个上游任务,也就是有多个channel,那么会有以下几种情况的判断
1)、如果是首次接收到barrier,就开始进行barrier对齐,并将该channel设置为阻塞状态
2)、如果不是首次接收到barrier,但也不是最后一个barrier,就只给channel设置为阻塞状态
3)、如果在没完成当前checkpoint的时候又接收到了下一次checkpoint的barrier,就终止当前的checkpoint,重新开始新的一次checkpoint,这种情况并不常见
4)、如果接收到全部channel,即上游所有任务的barrier,就开始触发checkpoint,并取消所有channel的阻塞状态,开始处理那些被添加到缓存的事件
privatevoidprocessBarrier(CheckpointBarrier receivedBarrier,int channelIndex)throwsException{finallong barrierId = receivedBarrier.getId();// fast path for single channel casesif(totalNumberOfInputChannels ==1){if(barrierId > currentCheckpointId){// new checkpoint
currentCheckpointId = barrierId;notifyCheckpoint(receivedBarrier);}return;}// -- general code path for multiple input channels --if(numBarriersReceived >0){// this is only true if some alignment is already progress and was not canceledif(barrierId == currentCheckpointId){// regular caseonBarrier(channelIndex);}elseif(barrierId > currentCheckpointId){//这种情况就是上一个checkpoint还没完呢,就接收到下一个checkpoint的barrier,这种情况,就终止当前的checkpoint// we did not complete the current checkpoint, another started before...// let the task know we are not completing thisnotifyAbort(currentCheckpointId,newCheckpointDeclineSubsumedException(barrierId));// abort the current checkpointreleaseBlocksAndResetBarriers();// begin a the new checkpointbeginNewAlignment(barrierId, channelIndex);}else{// ignore trailing barrier from an earlier checkpoint (obsolete now)return;}}elseif(barrierId > currentCheckpointId){// first barrier of a new checkpointbeginNewAlignment(barrierId, channelIndex);}else{// either the current checkpoint was canceled (numBarriers == 0) or// this barrier is from an old subsumed checkpointreturn;}//接收到所有channel的barrier,开始触发checkpoint// check if we have all barriers - since canceled checkpoints always have zero barriers// this can only happen on a non canceled checkpointif(numBarriersReceived + numClosedChannels == totalNumberOfInputChannels){// actually trigger checkpoint...//释放所有channel的阻塞状态releaseBlocksAndResetBarriers();//开始触发checkpointnotifyCheckpoint(receivedBarrier);}}//开始barrier对齐privatevoidbeginNewAlignment(long checkpointId,int channelIndex)throwsIOException{
currentCheckpointId = checkpointId;onBarrier(channelIndex);
startOfAlignmentTimestamp =System.nanoTime();...}//onBarrier()就是将其channel设置为阻塞状态privatevoidonBarrier(int channelIndex)throwsIOException{if(!blockedChannels[channelIndex]){
blockedChannels[channelIndex]=true;
numBarriersReceived++;...}else{thrownewIOException("Stream corrupt: Repeated barrier for same checkpoint on input "+ channelIndex);}}
触发checkpoint的逻辑在notifyCheckpoint()方法中,toNotifyOnCheckpoint就是StreamTask,最后还是调用了StreamTask.triggerCheckpointOnBarrier(),就跟上述描述的source task的checkpoint逻辑是一样的了。
privatevoidnotifyCheckpoint(CheckpointBarrier checkpointBarrier)throwsException{if(toNotifyOnCheckpoint !=null){CheckpointMetaData checkpointMetaData =newCheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());long bytesBuffered = currentBuffered !=null? currentBuffered.size():0L;CheckpointMetrics checkpointMetrics =newCheckpointMetrics().setBytesBufferedInAlignment(bytesBuffered).setAlignmentDurationNanos(latestAlignmentDurationNanos);
toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
checkpointMetaData,
checkpointBarrier.getCheckpointOptions(),
checkpointMetrics);}}//StreamTaskpublicvoidtriggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions,CheckpointMetrics checkpointMetrics)throwsException{try{performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);}catch(CancelTaskException e){...}}
2.2.2 AT_LEAST_ONCE下的checkpoint实现
//BarrierTracker类publicBufferOrEventgetNextNonBlocked()throwsException{while(true){Optional<BufferOrEvent> next = inputGate.getNextBufferOrEvent();if(!next.isPresent()){// buffer or input exhaustedreturnnull;}BufferOrEvent bufferOrEvent = next.get();if(bufferOrEvent.isBuffer()){return bufferOrEvent;}elseif(bufferOrEvent.getEvent().getClass()==CheckpointBarrier.class){processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());}elseif(bufferOrEvent.getEvent().getClass()==CancelCheckpointMarker.class){processCheckpointAbortBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());}else{// some other eventreturn bufferOrEvent;}}}
核心实现还是在processBarrier()中,实现如下:
1、如果一个channel,也即上游只有一个channel,就立即触发checkpoint
2、在多个channel的情况下,也即多个上游任务,有如下几种情况
1)、如果是首次接收到barrier,只更新一下当前的checkpointID
2)、不是首次,但也不是最后一次接收到barrier,基本什么也不做,就累加一个计数器
3)、如果接收到所有channel的barrier,就开始触发checkpoint
因为BarrierTracker在等待所有channel的barrier到来的时候还会处理其他已经到来barrier的channel的数据,所以在进行状态数据快照的时候就会造成多了一部分数据的状态,快照并不具备一致性。在程序从checkpoint进行恢复的时候会重复处理这部分数据,所以才会出现AT_LEAST_ONCE
//BarrierTracker类privatevoidprocessBarrier(CheckpointBarrier receivedBarrier,int channelIndex)throwsException{finallong barrierId = receivedBarrier.getId();// fast path for single channel trackersif(totalNumberOfInputChannels ==1){notifyCheckpoint(barrierId, receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());return;}// general path for multiple input channelsif(LOG.isDebugEnabled()){LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, channelIndex);}// find the checkpoint barrier in the queue of pending barriersCheckpointBarrierCount cbc =null;int pos =0;for(CheckpointBarrierCount next : pendingCheckpoints){if(next.checkpointId == barrierId){
cbc = next;break;}
pos++;}if(cbc !=null){// add one to the count to that barrier and check for completionint numBarriersNew = cbc.incrementBarrierCount();//当接收到所有channel的barrier,开始触发checkpointif(numBarriersNew == totalNumberOfInputChannels){// checkpoint can be triggered (or is aborted and all barriers have been seen)// first, remove this checkpoint and all all prior pending// checkpoints (which are now subsumed)for(int i =0; i <= pos; i++){
pendingCheckpoints.pollFirst();}// notify the listenerif(!cbc.isAborted()){if(LOG.isDebugEnabled()){LOG.debug("Received all barriers for checkpoint {}", barrierId);}notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());}}}else{// first barrier for that checkpoint ID// add it only if it is newer than the latest checkpoint.// if it is not newer than the latest checkpoint ID, then there cannot be a// successful checkpoint for that ID anywaysif(barrierId > latestPendingCheckpointID){
latestPendingCheckpointID = barrierId;
pendingCheckpoints.addLast(newCheckpointBarrierCount(barrierId));// make sure we do not track too many checkpointsif(pendingCheckpoints.size()>MAX_CHECKPOINTS_TO_TRACK){
pendingCheckpoints.pollFirst();}}}}
notifyCheckpoint()方法也是调用了StreamTask.triggerCheckpointOnBarrier(),跟BarrierBuffer是一样的。
//BarrierTracker类privatevoidnotifyCheckpoint(long checkpointId,long timestamp,CheckpointOptions checkpointOptions)throwsException{if(toNotifyOnCheckpoint !=null){CheckpointMetaData checkpointMetaData =newCheckpointMetaData(checkpointId, timestamp);CheckpointMetrics checkpointMetrics =newCheckpointMetrics().setBytesBufferedInAlignment(0L).setAlignmentDurationNanos(0L);
toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointOptions, checkpointMetrics);}}
上述的流程会一直执行到sink task,sink task执行完checkpoint,整个checkpoint就完成了。
2.4 checkpoint 状态数据存储
operatorState和keyedState的snapshot过程被封装到一个RunnableFuture,它并不会立即执行,之后会有一个异步的线程调用RunnableFuture.run()才会真正的执行snapshot。
//AbstractStreamOperator类publicfinalOperatorSnapshotFuturessnapshotState(long checkpointId,long timestamp,CheckpointOptions checkpointOptions,CheckpointStreamFactory factory)throwsException{KeyGroupRange keyGroupRange =null!= keyedStateBackend ?
keyedStateBackend.getKeyGroupRange():KeyGroupRange.EMPTY_KEY_GROUP_RANGE;OperatorSnapshotFutures snapshotInProgress =newOperatorSnapshotFutures();try(StateSnapshotContextSynchronousImpl snapshotContext =newStateSnapshotContextSynchronousImpl(
checkpointId,
timestamp,
factory,
keyGroupRange,getContainingTask().getCancelables())){snapshotState(snapshotContext);//keyedStateRawFuture和operatorStateRawFuture都是DoneFuture,也就是一个已经完成的RunnableFuture
snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());//operatorState和keyedState的snapshot过程被封装到一个RunnableFuture,并不会立即执行,//之后调用RunnableFuture.run()才会真正的执行snapshotif(null!= operatorStateBackend){
snapshotInProgress.setOperatorStateManagedFuture(
operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));}if(null!= keyedStateBackend){
snapshotInProgress.setKeyedStateManagedFuture(
keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));}}catch(Exception snapshotException){...}return snapshotInProgress;}
//HeapKeyedStateBackend类publicRunnableFuture<SnapshotResult<KeyedStateHandle>>snapshot(finallong checkpointId,finallong timestamp,@NonnullfinalCheckpointStreamFactory streamFactory,@NonnullCheckpointOptions checkpointOptions)throwsIOException{long startTime =System.currentTimeMillis();finalRunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunner =
snapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);
snapshotStrategy.logSyncCompleted(streamFactory, startTime);return snapshotRunner;}
2.4.1 KeyedState快照过程
首先我们来看KeyedState的快照,调用的是snapshotStrategy.snapshot(),checkpoint的核心源码逻辑都在里面,这个snapshotStrategy就是HeapSnapshotStrategy,源码如下,主要实现的逻辑大致如下:
1、首先对状态数据进行快照拷贝,这里拷贝的仅仅是状态数据的引用,而不是对实例对象的拷贝
2、创建checkpoint的输出流CheckpointStateOutputStream,这个就是checkpoint的数据目录,但并非最终的数据目录,而是像`$CHECKPOINT_DIR/$UID/chk-n`这样的目录
3、上述2个步骤都是同步进行的,下面就是生成一个AsyncSnapshotCallable,callInternal()方法就是对checkpoint的持久化操作,这是可以异步进行的。大致步骤又分为:
1)、获取CheckpointStateOutputStream,写入状态数据的元数据信息,包括状态的名字、序列化类型等
2)、对状态数据按照keyGroupId进行分组,依次对每个keyGroupId的状态数据进行写入
3)、封装状态数据checkpoint的元数据信息,StreamStateHandle里包含状态数据存储的路径和大小,KeyGroupRangeOffsets里包含的是每个keyGroupId在文件中存储的offset位置
/HeapSnapshotStrategy类
publicRunnableFuture<SnapshotResult<KeyedStateHandle>>snapshot(long checkpointId,long timestamp,@NonnullCheckpointStreamFactory primaryStreamFactory,@NonnullCheckpointOptions checkpointOptions)throwsIOException{if(!hasRegisteredState()){returnDoneFuture.of(SnapshotResult.empty());}int numStates = registeredKVStates.size()+ registeredPQStates.size();Preconditions.checkState(numStates <=Short.MAX_VALUE,"Too many states: "+ numStates +". Currently at most "+Short.MAX_VALUE+" states are supported");finalList<StateMetaInfoSnapshot> metaInfoSnapshots =newArrayList<>(numStates);finalMap<StateUID,Integer> stateNamesToId =newHashMap<>(numStates);finalMap<StateUID,StateSnapshot> cowStateStableSnapshots =newHashMap<>(numStates);//对状态数据进行快照拷贝到cowStateStableSnapshots processSnapshotMetaInfoForAllStates(
metaInfoSnapshots,
cowStateStableSnapshots,
stateNamesToId,
registeredKVStates,StateMetaInfoSnapshot.BackendStateType.KEY_VALUE);processSnapshotMetaInfoForAllStates(
metaInfoSnapshots,
cowStateStableSnapshots,
stateNamesToId,
registeredPQStates,//registeredPQStates对我们keyed状态数据没有影响StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);finalKeyedBackendSerializationProxy<K> serializationProxy =newKeyedBackendSerializationProxy<>(// TODO: this code assumes that writing a serializer is threadsafe, we should support to// get a serialized form already at state registration time in the futuregetKeySerializer(),
metaInfoSnapshots,!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, keyGroupCompressionDecorator));//创建checkpoint的输出流CheckpointStateOutputStream finalSupplierWithException<CheckpointStreamWithResultProvider,Exception> checkpointStreamSupplier =
localRecoveryConfig.isLocalRecoveryEnabled()?()->CheckpointStreamWithResultProvider.createDuplicatingStream(
checkpointId,CheckpointedStateScope.EXCLUSIVE,
primaryStreamFactory,
localRecoveryConfig.getLocalStateDirectoryProvider()):()->CheckpointStreamWithResultProvider.createSimpleStream(CheckpointedStateScope.EXCLUSIVE,
primaryStreamFactory);//--------------------------------------------------- this becomes the end of sync partfinalAsyncSnapshotCallable<SnapshotResult<KeyedStateHandle>> asyncSnapshotCallable =newAsyncSnapshotCallable<SnapshotResult<KeyedStateHandle>>(){@OverrideprotectedSnapshotResult<KeyedStateHandle>callInternal()throwsException{finalCheckpointStreamWithResultProvider streamWithResultProvider =
checkpointStreamSupplier.get();
snapshotCloseableRegistry.registerCloseable(streamWithResultProvider);finalCheckpointStreamFactory.CheckpointStateOutputStream localStream =
streamWithResultProvider.getCheckpointOutputStream();finalDataOutputViewStreamWrapper outView =newDataOutputViewStreamWrapper(localStream);//写入状态数据的元数据信息,包括状态的名字、序列化类型等
serializationProxy.write(outView);finallong[] keyGroupRangeOffsets =newlong[keyGroupRange.getNumberOfKeyGroups()];//对状态数据按照keyGroupId进行分组,依次对每个keyGoupId的状态数据进行写入for(int keyGroupPos =0; keyGroupPos < keyGroupRange.getNumberOfKeyGroups();++keyGroupPos){int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos);//记录每个keyGoupId的状态数据的存储位置
keyGroupRangeOffsets[keyGroupPos]= localStream.getPos();
outView.writeInt(keyGroupId);for(Map.Entry<StateUID,StateSnapshot> stateSnapshot :
cowStateStableSnapshots.entrySet()){StateSnapshot.StateKeyGroupWriter partitionedSnapshot =//分组的实现
stateSnapshot.getValue().getKeyGroupWriter();try(OutputStream kgCompressionOut =
keyGroupCompressionDecorator.decorateWithCompression(localStream)){DataOutputViewStreamWrapper kgCompressionView =newDataOutputViewStreamWrapper(kgCompressionOut);
kgCompressionView.writeShort(stateNamesToId.get(stateSnapshot.getKey()));//写入每个keyGoupId的状态数据
partitionedSnapshot.writeStateInKeyGroup(kgCompressionView, keyGroupId);}// this will just close the outer compression stream}}//封装状态数据的checkpoint元数据信息if(snapshotCloseableRegistry.unregisterCloseable(streamWithResultProvider)){KeyGroupRangeOffsets kgOffs =newKeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);SnapshotResult<StreamStateHandle> result =
streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();returnCheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(result, kgOffs);}else{thrownewIOException("Stream already unregistered.");}}@OverrideprotectedvoidcleanupProvidedResources(){for(StateSnapshot tableSnapshot : cowStateStableSnapshots.values()){
tableSnapshot.release();}}@OverrideprotectedvoidlogAsyncSnapshotComplete(long startTime){if(snapshotStrategySynchronicityTrait.isAsynchronous()){logAsyncCompleted(primaryStreamFactory, startTime);}}};finalFutureTask<SnapshotResult<KeyedStateHandle>> task =
asyncSnapshotCallable.toAsyncSnapshotFutureTask(cancelStreamRegistry);finalizeSnapshotBeforeReturnHook(task);return task;}
状态数据的快照拷贝
拷贝的方法调用起始在processSnapshotMetaInfoForAllStates()方法,registeredStates就是我们注册的一些状态数据表,是一个Map,key就是状态数据的名字,比如下面的代码就注册了一个名字为sumValueState的状态表,value就是状态表,默认是一个CopyOnWriteStateTable
对每个状态表都会创建一个快照放到cowStateStableSnapshots中,快照的核心就是将CopyOnWriteStateTable中的状态数据拷贝到CopyOnWriteStateTableSnapshot中的一个数组snapshotData中,注意这里拷贝的仅仅是数据的引用而已,并不是对象实例,这个过程很快,也不会占用过多的内存。CopyOnWriteStateTable使用了写入时复制技术,状态数据对象本身的值并不会发生改变,而是用一个新对象来替换原来的老对象,这样即使快照和数据处理同时进行,快照还是指向的老对象,引用的对象本身也并不会发生变化。
privatevoidprocessSnapshotMetaInfoForAllStates(List<StateMetaInfoSnapshot> metaInfoSnapshots,Map<StateUID,StateSnapshot> cowStateStableSnapshots,Map<StateUID,Integer> stateNamesToId,Map<String,?extendsStateSnapshotRestore> registeredStates,StateMetaInfoSnapshot.BackendStateType stateType){for(Map.Entry<String,?extendsStateSnapshotRestore> kvState : registeredStates.entrySet()){finalStateUID stateUid =StateUID.of(kvState.getKey(), stateType);
stateNamesToId.put(stateUid, stateNamesToId.size());StateSnapshotRestore state = kvState.getValue();if(null!= state){finalStateSnapshot stateSnapshot = state.stateSnapshot();
metaInfoSnapshots.add(stateSnapshot.getMetaInfoSnapshot());
cowStateStableSnapshots.put(stateUid, stateSnapshot);}}}
// CopyOnWriteStateTable
public CopyOnWriteStateTableSnapshot<K, N, S> stateSnapshot() {
    return new CopyOnWriteStateTableSnapshot<>(this);
}

// CopyOnWriteStateTableSnapshot
CopyOnWriteStateTableSnapshot(CopyOnWriteStateTable<K, N, S> owningStateTable) {
    super(owningStateTable);
    // Copy the entry references (not the entry objects) out of the owning table.
    this.snapshotData = owningStateTable.snapshotTableArrays();
    this.snapshotVersion = owningStateTable.getStateTableVersion();
    this.numberOfEntriesInSnapshotData = owningStateTable.size();

    // We create duplicates of the serializers for the async snapshot, because TypeSerializer
    // might be stateful and shared with the event processing thread.
    this.localKeySerializer = owningStateTable.keyContext.getKeySerializer().duplicate();
    this.localNamespaceSerializer = owningStateTable.metaInfo.getNamespaceSerializer().duplicate();
    this.localStateSerializer = owningStateTable.metaInfo.getStateSerializer().duplicate();

    // The key-group partitioning is computed lazily in getKeyGroupWriter().
    this.partitionedStateTableSnapshot = null;

    this.stateSnapshotTransformer = owningStateTable.metaInfo
            .getStateSnapshotTransformFactory()
            .createForDeserializedState()
            .orElse(null);
}

// CopyOnWriteStateTable
StateTableEntry<K, N, S>[] snapshotTableArrays() {

    synchronized (snapshotVersions) {
        // increase the table version for copy-on-write and register the snapshot
        if (++stateTableVersion < 0) {
            // this is just a safety net against overflows, but should never happen in practice (i.e., only after 2^31 snapshots)
            throw new IllegalStateException("Version count overflow in CopyOnWriteStateTable. Enforcing restart.");
        }

        highestRequiredSnapshotVersion = stateTableVersion;
        snapshotVersions.add(highestRequiredSnapshotVersion);
    }

    StateTableEntry<K, N, S>[] table = primaryTable;

    // At least size() slots, presumably so that the array can later hold every entry once the
    // bucket chains are flattened — TODO confirm against the partitioner.
    final int totalTableIndexSize = rehashIndex + table.length;
    final int copiedArraySize = Math.max(totalTableIndexSize, size());
    final StateTableEntry<K, N, S>[] copy = new StateTableEntry[copiedArraySize];

    if (isRehashing()) {
        // consider both tables for the snapshot, the rehash index tells us which part of the two tables we need
        final int localRehashIndex = rehashIndex;
        final int localCopyLength = table.length - localRehashIndex;
        // for the primary table, take every index >= rhIdx.
        System.arraycopy(table, localRehashIndex, copy, 0, localCopyLength);

        // for the new table, we are sure that two regions contain all the entries:
        // [0, rhIdx[ AND [table.length / 2, table.length / 2 + rhIdx[
        table = incrementalRehashTable;
        System.arraycopy(table, 0, copy, localCopyLength, localRehashIndex);
        System.arraycopy(table, table.length >>> 1, copy, localCopyLength + localRehashIndex, localRehashIndex);
    } else {
        // we only need to copy the primary table
        System.arraycopy(table, 0, copy, 0, table.length);
    }

    return copy;
}
2.4.2 创建CheckpointStateOutputStream
一般在正式环境我们设置的checkpoint存储都是基于文件系统的,即FsStateBackend, 而不是内存MemoryStateBackend,在FsStateBackend模式下,最终生成的CheckpointStateOutputStream是FsCheckpointStateOutputStream
staticCheckpointStreamWithResultProvidercreateSimpleStream(@NonnullCheckpointedStateScope checkpointedStateScope,@NonnullCheckpointStreamFactory primaryStreamFactory)throwsIOException{CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);returnnewCheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);}publicFsCheckpointStateOutputStreamcreateCheckpointStateOutputStream(CheckpointedStateScope scope)throwsIOException{Path target = scope ==CheckpointedStateScope.EXCLUSIVE? checkpointDirectory : sharedStateDirectory;int bufferSize =Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);// 创建 FsCheckpointStateOutputStreamreturnnewFsCheckpointStateOutputStream(target, filesystem, bufferSize, fileStateThreshold);}
构建FsCheckpointStateOutputStream有几个参数:
basePath:checkpoint基础路径,形式是 `${checkpoint_dir}/${uid}/chk-${checkpoint_id}`
fs:文件系统FileSystem,这里是flink自己封装的,对于hdfs有HadoopFileSystem,本地文件有LocalFileSystem
bufferSize:写OutputStream时会先将数据写入到buffer,bufferSize控制buffer的大小
fileStateThreshold:文件状态的阈值。状态数据超过这个大小才会写入到文件,否则直接以byte[]的形式封装到ByteStreamStateHandle里面,这样做是为了减少小文件。默认值为1024,即1KB
publicstaticfinalclassFsCheckpointStateOutputStreamextendsCheckpointStreamFactory.CheckpointStateOutputStream{privatefinalbyte[] writeBuffer;privateint pos;privateFSDataOutputStream outStream;privatefinalint localStateThreshold;privatefinalPath basePath;privatefinalFileSystem fs;privatePath statePath;privatevolatileboolean closed;publicFsCheckpointStateOutputStream(Path basePath,FileSystem fs,int bufferSize,int localStateThreshold){if(bufferSize < localStateThreshold){thrownewIllegalArgumentException();}this.basePath = basePath;this.fs = fs;this.writeBuffer =newbyte[bufferSize];this.localStateThreshold = localStateThreshold;}@Overridepublicvoidwrite(int b)throwsIOException{if(pos >= writeBuffer.length){flush();}
writeBuffer[pos++]=(byte) b;}...
此外,FsCheckpointStateOutputStream还有一些其他的成员:
writeBuffer:一个缓冲数组byte[],写OutputStream时会先将数据写入到buffer缓冲区,减少磁盘的io交互
pos:writeBuffer中数据写入的位置
outStream:checkpoint最终文件路径封装的FSDataOutputStream,通过这个OutputStream写入状态数据到最终文件里面
statePath:状态数据的文件路径,路径形式为 `${checkpoint_dir}/${uid}/chk-${checkpoint_id}/${uuid}`
2.4.3 写入状态数据元数据信息
调用serializationProxy.write(outView)写入状态数据的元数据信息,包括状态数据的名称,序列化类型等等
//KeyedBackendSerializationProxy类publicvoidwrite(DataOutputView out)throwsIOException{super.write(out);// write the compression format used to write each key-group
out.writeBoolean(usingKeyGroupCompression);TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(out, keySerializerSnapshot, keySerializer);// write individual registered keyed state metainfos
out.writeShort(stateMetaInfoSnapshots.size());for(StateMetaInfoSnapshot metaInfoSnapshot : stateMetaInfoSnapshots){StateMetaInfoSnapshotReadersWriters.getWriter().writeStateMetaInfoSnapshot(metaInfoSnapshot, out);}}
2.4.4 状态数据分组
状态数据分组的调用在CopyOnWriteStateTableSnapshot.getKeyGroupWriter()方法中,它将状态数据按照keyGroupId进行分组,算法稍微复杂一点。
//CopyOnWriteStateTableSnapshot类publicStateKeyGroupWritergetKeyGroupWriter(){if(partitionedStateTableSnapshot ==null){finalInternalKeyContext<K> keyContext = owningStateTable.keyContext;finalint numberOfKeyGroups = keyContext.getNumberOfKeyGroups();finalKeyGroupRange keyGroupRange = keyContext.getKeyGroupRange();//状态数据的持久化函数ElementWriterFunction<CopyOnWriteStateTable.StateTableEntry<K,N,S>> elementWriterFunction =(element, dov)->{
localNamespaceSerializer.serialize(element.namespace, dov);
localKeySerializer.serialize(element.key, dov);
localStateSerializer.serialize(element.state, dov);};//默认是StateTableKeyGroupPartitionerStateTableKeyGroupPartitioner<K,N,S> stateTableKeyGroupPartitioner = stateSnapshotTransformer !=null?newTransformingStateTableKeyGroupPartitioner<>(
snapshotData,
numberOfEntriesInSnapshotData,
keyGroupRange,
numberOfKeyGroups,
elementWriterFunction,
stateSnapshotTransformer):newStateTableKeyGroupPartitioner<>(
snapshotData,
numberOfEntriesInSnapshotData,
keyGroupRange,
numberOfKeyGroups,
elementWriterFunction);
partitionedStateTableSnapshot = stateTableKeyGroupPartitioner.partitionByKeyGroup();}return partitionedStateTableSnapshot;}
首先我们看一下StateTableKeyGroupPartitioner类的结构,StateTableKeyGroupPartitioner继承了KeyGroupPartitioner,没有其他新的成员:
publicclassKeyGroupPartitioner<T>{/**
* The input data for the partitioning. All elements to consider must be densely in the index interval
* [0, {@link #numberOfElements}[, without null values.
*/@Nonnull//原始的快照状态数据protectedfinalT[] partitioningSource;/**
* The output array for the partitioning. The size must be {@link #numberOfElements} (or bigger).
*/@Nonnull//最终经过分组后的状态数据protectedfinalT[] partitioningDestination;/** Total number of input elements. */@Nonnegative//状态数据总量protectedfinalint numberOfElements;/** The total number of key-groups in the job. */@Nonnegative//该operator总共的keyGoupId数量protectedfinalint totalKeyGroups;/** The key-group range for the input data, covered in this partitioning. */@Nonnull//该operator的KeyGroupRange,有起始和结束的keyGoupIdprotectedfinalKeyGroupRange keyGroupRange;/**
* This bookkeeping array is used to count the elements in each key-group. In a second step, it is transformed into
* a histogram by accumulation.
*/@Nonnull//直方图,用于标记每个keyGoupId的数据在最终状态数据数组中的位置protectedfinalint[] counterHistogram;/**
* This is a helper array that caches the key-group for each element, so we do not have to compute them twice.
*/@Nonnull//用来标记partitioningSource中每个元素对应的keyGoupId在counterHistogram中的位置protectedfinalint[] elementKeyGroups;/** Cached value of keyGroupRange#firstKeyGroup. */@Nonnegative//每个operator的起始keyGoupId,即keyGroupRange#firstKeyGroupprotectedfinalint firstKeyGroup;/** Function to extract the key from a given element. */@NonnullprotectedfinalKeyExtractorFunction<T> keyExtractorFunction;/** Function to write an element to a {@link DataOutputView}. */@NonnullprotectedfinalElementWriterFunction<T> elementWriterFunction;/** Cached result. */@NullableprotectedStateSnapshot.StateKeyGroupWriter computedResult;
状态数据的分组算法便是依赖于成员中各个数据结构。
我们再来看KeyGroupPartitioner.partitionByKeyGroup()方法,这个方法分为3步:
1、记录状态数据的keyGroupId
2、基于keyGroupId构建直方图,用于数据的分组
3、执行分组
//KeyGroupPartitioner类publicStateSnapshot.StateKeyGroupWriterpartitionByKeyGroup(){if(computedResult ==null){reportAllElementKeyGroups();int outputNumberOfElements =buildHistogramByAccumulatingCounts();executePartitioning(outputNumberOfElements);}return computedResult;}
记录状态数据的keyGoupId
这个操作的效果就是将原始的快照状态数据(哈希表每个桶中的链表)扁平化,依次添加到partitioningSource[]中。同时,在elementKeyGroups[]中记录partitioningSource[]每个元素的keyGroupId在counterHistogram[]中的位置(即keyGroupId - firstKeyGroup),并在counterHistogram[]中统计出每个keyGroupId有多少个元素。
//StateTableKeyGroupPartitioner类protectedvoidreportAllElementKeyGroups(){// In this step we i) 'flatten' the linked list of entries to a second array and ii) report key-groups.int flattenIndex =0;for(CopyOnWriteStateTable.StateTableEntry<K,N,S> entry : partitioningDestination){while(null!= entry){
flattenIndex =tryAddToSource(flattenIndex, entry);
entry = entry.next;}}}/** Tries to append next entry to {@code partitioningSource} array snapshot and returns next index.*/inttryAddToSource(int currentIndex,CopyOnWriteStateTable.StateTableEntry<K,N,S> entry){finalint keyGroup =KeyGroupRangeAssignment.assignToKeyGroup(entry.key, totalKeyGroups);reportKeyGroupOfElementAtIndex(currentIndex, keyGroup);
partitioningSource[currentIndex]= entry;return currentIndex +1;}//KeyGroupPartitioner类protectedvoidreportKeyGroupOfElementAtIndex(int index,int keyGroup){finalint keyGroupIndex = keyGroup - firstKeyGroup;
elementKeyGroups[index]= keyGroupIndex;++counterHistogram[keyGroupIndex];}
构建直方图
构建counterHistogram[]直方图,构建的结果形如[0,0,1,1,1,3,3,6,8]。counterHistogram[]的下标对应的是keyGroupId - firstKeyGroup,值表示从firstKeyGroup开始、到该keyGroupId之前(不含该keyGroupId)一共有多少个元素,也就是该keyGroupId的数据在分组后数组中的起始位置。
//KeyGroupPartitioner类privateintbuildHistogramByAccumulatingCounts(){int sum =0;for(int i =0; i < counterHistogram.length;++i){int currentSlotValue = counterHistogram[i];
counterHistogram[i]= sum;
sum += currentSlotValue;}return sum;}
进行数据分组
依赖于上述构建的直方图counterHistogram[]和elementKeyGroups[]进行分组计算,将数据添加到partitioningDestination[]中。最终partitioningDestination[]中的数据就是分组好的数据,按照keyGroupId排好序。counterHistogram[]中的值在写入过程中被逐个递增,最终记录的是每个keyGroupId在partitioningDestination[]中的结束位置(不含),可以当做一个索引来使用。
//KeyGroupPartitioner类privatevoidexecutePartitioning(int outputNumberOfElements){// We repartition the entries by their pre-computed key-groups, using the histogram values as write indexesfor(int inIdx =0; inIdx < outputNumberOfElements;++inIdx){int effectiveKgIdx = elementKeyGroups[inIdx];int outIdx = counterHistogram[effectiveKgIdx]++;
partitioningDestination[outIdx]= partitioningSource[inIdx];}this.computedResult =newPartitioningResult<>(
elementWriterFunction,
firstKeyGroup,
counterHistogram,
partitioningDestination);}
我们可以看一下PartitioningResult的结构:
partitionedElements:按照keyGroupId分组好的状态数据,就是上述中的partitioningDestination
keyGroupOffsets:记录所有keyGroupId在partitionedElements[]中的结束位置(不含);某个keyGroupId的起始位置就是前一个keyGroupId的结束位置,第一个keyGroupId从0开始
firstKeyGroup:该operator的第一个keyGroupId。因为每个operator的起始keyGroupId并不都是0,例如有两个operator实例,共128个keyGroupId,那么第一个operator的起始keyGroupId是0,第二个operator的起始keyGroupId就是64,那么keyGroupId 65就对应了keyGroupOffsets[]的下标1
privatestaticclassPartitioningResult<T>implementsStateSnapshot.StateKeyGroupWriter{/**
* Function to write one element to a {@link DataOutputView}.
*/@NonnullprivatefinalElementWriterFunction<T> elementWriterFunction;/**
* The exclusive-end-offsets for all key-groups of the covered range for the partitioning. Exclusive-end-offset
* for key-group n is under keyGroupOffsets[n - firstKeyGroup].
*/@Nonnullprivatefinalint[] keyGroupOffsets;/**
* Array with elements that are partitioned w.r.t. the covered key-group range. The start offset for each
* key-group is in {@link #keyGroupOffsets}.
*/@NonnullprivatefinalT[] partitionedElements;/**
* The first key-group of the range covered in the partitioning.
*/@Nonnegativeprivatefinalint firstKeyGroup;PartitioningResult(@NonnullElementWriterFunction<T> elementWriterFunction,@Nonnegativeint firstKeyGroup,@Nonnullint[] keyGroupEndOffsets,@NonnullT[] partitionedElements){this.elementWriterFunction = elementWriterFunction;this.firstKeyGroup = firstKeyGroup;this.keyGroupOffsets = keyGroupEndOffsets;this.partitionedElements = partitionedElements;}
2.4.5 写入状态数据
状态数据的写入在partitionedSnapshot.writeStateInKeyGroup(kgCompressionView, keyGroupId)中,对每个keyGroupId的状态数据依次进行写操作。
可以看到,就是通过keyGroupOffsets查找传进来的keyGroupId在partitionedElements[]中的元素范围,先写入该keyGroupId的元素个数,再把元素逐个写到OutputStream。
//PartitioningResult类publicvoidwriteStateInKeyGroup(@NonnullDataOutputView dov,int keyGroupId)throwsIOException{int startOffset =getKeyGroupStartOffsetInclusive(keyGroupId);int endOffset =getKeyGroupEndOffsetExclusive(keyGroupId);// write number of mappings in key-group
dov.writeInt(endOffset - startOffset);// write mappingsfor(int i = startOffset; i < endOffset;++i){
elementWriterFunction.writeElement(partitionedElements[i], dov);}}privateintgetKeyGroupStartOffsetInclusive(int keyGroup){int idx = keyGroup - firstKeyGroup -1;return idx <0?0: keyGroupOffsets[idx];}@NonnegativeprivateintgetKeyGroupEndOffsetExclusive(int keyGroup){return keyGroupOffsets[keyGroup - firstKeyGroup];}
这里的elementWriterFunction就是如下,持久化了每个状态数据的namespace、key、state
// Element writer (same lambda as in getKeyGroupWriter()): serializes each entry's namespace, key and state, in that order.
(element, dov)->{
localNamespaceSerializer.serialize(element.namespace, dov);
localKeySerializer.serialize(element.key, dov);
localStateSerializer.serialize(element.state, dov);};
2.4.6 封装StreamStateHandle
将所有keyGroupId的状态数据都写完之后,接下来就是封装checkpoint的元数据信息了。
keyGroupRangeOffsets记录了每个keyGroupId的状态数据在存储介质中的存储位置。
//new AsyncSnapshotCallable#callInternal()if(snapshotCloseableRegistry.unregisterCloseable(streamWithResultProvider)){KeyGroupRangeOffsets kgOffs =newKeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);SnapshotResult<StreamStateHandle> result =
streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();returnCheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(result, kgOffs);}else{thrownewIOException("Stream already unregistered.");}
//PrimaryStreamOnly类 implements CheckpointStreamWithResultProviderpublicSnapshotResult<StreamStateHandle>closeAndFinalizeCheckpointStreamResult()throwsIOException{returnSnapshotResult.of(outputStream.closeAndGetHandle());}
核心实现在FsCheckpointStateOutputStream.closeAndGetHandle()方法
1、如果还没有创建过文件,并且总共写入的状态数据大小不超过阈值(默认1KB),那就直接封装成ByteStreamStateHandle,状态数据也存放在StreamStateHandle中,以byte[]数组形式存在。
2、如果状态数据超过阈值,就写入到文件里面,调用flush(),不过这个flush并不是唯一的一次flush,在之前写状态数据的过程中,当数据超过了buffer的大小,也会调用flush()刷到磁盘
3、首次调用flush()方法时会创建出完整的checkpoint文件路径,形如 `${checkpoint_dir}/${uid}/chk-${checkpoint_id}/${uuid}`,然后将buffer的数据写到文件中。
4、将checkpoint到磁盘的状态数据信息封装成FileStateHandle,包含了状态数据的文件和大小信息
//FsCheckpointStateOutputStream类publicStreamStateHandlecloseAndGetHandle()throwsIOException{// check if there was nothing ever writtenif(outStream ==null&& pos ==0){returnnull;}synchronized(this){if(!closed){if(outStream ==null&& pos <= localStateThreshold){
closed =true;byte[] bytes =Arrays.copyOf(writeBuffer, pos);
pos = writeBuffer.length;returnnewByteStreamStateHandle(createStatePath().toString(), bytes);}else{try{flush();
pos = writeBuffer.length;long size =-1L;// make a best effort attempt to figure out the sizetry{
size = outStream.getPos();}catch(Exception ignored){}
outStream.close();returnnewFileStateHandle(statePath, size);}catch(Exception exception){...}finally{
closed =true;}}}else{thrownewIOException("Stream has already been closed and discarded.");}}}publicvoidflush()throwsIOException{if(!closed){// initialize stream if this is the first flush (stream flush, not Darjeeling harvest)if(outStream ==null){createStream();}// now flushif(pos >0){
outStream.write(writeBuffer,0, pos);
pos =0;}}else{thrownewIOException("closed");}}privatevoidcreateStream()throwsIOException{Exception latestException =null;for(int attempt =0; attempt <10; attempt++){try{OutputStreamAndPath streamAndPath =EntropyInjector.createEntropyAware(
fs,createStatePath(),WriteMode.NO_OVERWRITE);this.outStream = streamAndPath.stream();this.statePath = streamAndPath.path();return;}catch(Exception e){
latestException = e;}}thrownewIOException("Could not open output stream for state backend", latestException);}privatePathcreateStatePath(){returnnewPath(basePath,UUID.randomUUID().toString());}
返回的StreamStateHandle会被进一步封装成KeyedStateHandle(具体为KeyGroupsStateHandle)。因为StreamStateHandle中只记录了状态数据的位置(文件路径,或者内联的byte[]数据),还需要keyGroupRangeOffsets来记录每个keyGroupId的数据在文件中的存储位置。KeyedStateHandle就是既包含了StreamStateHandle,也包含了keyGroupRangeOffsets。
staticSnapshotResult<KeyedStateHandle>toKeyedStateHandleSnapshotResult(@NonnullSnapshotResult<StreamStateHandle> snapshotResult,@NonnullKeyGroupRangeOffsets keyGroupRangeOffsets){StreamStateHandle jobManagerOwnedSnapshot = snapshotResult.getJobManagerOwnedSnapshot();if(jobManagerOwnedSnapshot !=null){KeyedStateHandle jmKeyedState =newKeyGroupsStateHandle(keyGroupRangeOffsets, jobManagerOwnedSnapshot);StreamStateHandle taskLocalSnapshot = snapshotResult.getTaskLocalSnapshot();if(taskLocalSnapshot !=null){KeyedStateHandle localKeyedState =newKeyGroupsStateHandle(keyGroupRangeOffsets, taskLocalSnapshot);returnSnapshotResult.withLocalState(jmKeyedState, localKeyedState);}else{returnSnapshotResult.of(jmKeyedState);}}else{returnSnapshotResult.empty();}}
到此,KeyedState的checkpoint过程就完成了。
2.4.7 OperatorState快照过程
OperatorState的快照过程和KeyedState的快照过程大体类似,但是比KeyedState要简单一些,没有状态数据的分组操作,OperatorState只有一种PartitionableListState类型,大致实现如下:
1、进行OperatorState和BroadcastState的状态数据拷贝,这个过程是同步的。BroadcastState我们这先暂时不进行讨论,而且它的快照过程和OperatorState也基本上是一模一样的,所以我们只看OperatorState就行了
2、创建AsyncSnapshotCallable,checkpoint行为在callInternal()方法中,实现如下:
1)、创建CheckpointStateOutputStream,用于写状态数据
2)、写状态数据的元数据信息,这一步和KeyedState一样,包括状态数据的名称,序列化信息等
3)、将状态数据写入到CheckpointStateOutputStream中
4)、返回写入的元数据信息,封装成OperatorStateHandle
上述2的步骤在默认情况下是异步执行的,也可以通过参数设置为同步执行,但是这会影响到正常数据的处理,KeyedState也可以通过参数设置来进行同步的快照处理
//DefaultOperatorStateBackendSnapshotStrategy类@OverridepublicRunnableFuture<SnapshotResult<OperatorStateHandle>>snapshot(finallong checkpointId,finallong timestamp,@NonnullfinalCheckpointStreamFactory streamFactory,@NonnullfinalCheckpointOptions checkpointOptions)throwsIOException{if(registeredOperatorStates.isEmpty()&& registeredBroadcastStates.isEmpty()){returnDoneFuture.of(SnapshotResult.empty());}finalMap<String,PartitionableListState<?>> registeredOperatorStatesDeepCopies =newHashMap<>(registeredOperatorStates.size());finalMap<String,BackendWritableBroadcastState<?,?>> registeredBroadcastStatesDeepCopies =newHashMap<>(registeredBroadcastStates.size());ClassLoader snapshotClassLoader =Thread.currentThread().getContextClassLoader();Thread.currentThread().setContextClassLoader(userClassLoader);try{// eagerly create deep copies of the list and the broadcast states (if any)// in the synchronous phase, so that we can use them in the async writing.//进行状态数据的拷贝if(!registeredOperatorStates.isEmpty()){for(Map.Entry<String,PartitionableListState<?>> entry : registeredOperatorStates.entrySet()){PartitionableListState<?> listState = entry.getValue();if(null!= listState){
listState = listState.deepCopy();}
registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);}}if(!registeredBroadcastStates.isEmpty()){for(Map.Entry<String,BackendWritableBroadcastState<?,?>> entry : registeredBroadcastStates.entrySet()){BackendWritableBroadcastState<?,?> broadcastState = entry.getValue();if(null!= broadcastState){
broadcastState = broadcastState.deepCopy();}
registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState);}}}finally{Thread.currentThread().setContextClassLoader(snapshotClassLoader);}AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>> snapshotCallable =newAsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>>(){@OverrideprotectedSnapshotResult<OperatorStateHandle>callInternal()throwsException{//创建CheckpointStateOutputStream CheckpointStreamFactory.CheckpointStateOutputStream localOut =
streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
snapshotCloseableRegistry.registerCloseable(localOut);// get the registered operator state infos ...List<StateMetaInfoSnapshot> operatorMetaInfoSnapshots =newArrayList<>(registeredOperatorStatesDeepCopies.size());for(Map.Entry<String,PartitionableListState<?>> entry :
registeredOperatorStatesDeepCopies.entrySet()){
operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());}// ... get the registered broadcast operator state infos ...List<StateMetaInfoSnapshot> broadcastMetaInfoSnapshots =newArrayList<>(registeredBroadcastStatesDeepCopies.size());for(Map.Entry<String,BackendWritableBroadcastState<?,?>> entry :
registeredBroadcastStatesDeepCopies.entrySet()){
broadcastMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());}// ... write them all in the checkpoint stream ...DataOutputView dov =newDataOutputViewStreamWrapper(localOut);OperatorBackendSerializationProxy backendSerializationProxy =newOperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);//状态数据元数据信息写入到checkpoint
backendSerializationProxy.write(dov);// ... and then go for the states ...// we put BOTH normal and broadcast state metadata hereint initialMapCapacity =
registeredOperatorStatesDeepCopies.size()+ registeredBroadcastStatesDeepCopies.size();finalMap<String,OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =newHashMap<>(initialMapCapacity);//状态数据写入 for(Map.Entry<String,PartitionableListState<?>> entry :
registeredOperatorStatesDeepCopies.entrySet()){PartitionableListState<?> value = entry.getValue();long[] partitionOffsets = value.write(localOut);OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
writtenStatesMetaData.put(
entry.getKey(),newOperatorStateHandle.StateMetaInfo(partitionOffsets, mode));}// ... and the broadcast states themselves ...for(Map.Entry<String,BackendWritableBroadcastState<?,?>> entry :
registeredBroadcastStatesDeepCopies.entrySet()){BackendWritableBroadcastState<?,?> value = entry.getValue();long[] partitionOffsets ={value.write(localOut)};OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
writtenStatesMetaData.put(
entry.getKey(),newOperatorStateHandle.StateMetaInfo(partitionOffsets, mode));}// ... and, finally, create the state handle.OperatorStateHandle retValue =null;//封装checkpoint元数据if(snapshotCloseableRegistry.unregisterCloseable(localOut)){StreamStateHandle stateHandle = localOut.closeAndGetHandle();if(stateHandle !=null){
retValue =newOperatorStreamStateHandle(writtenStatesMetaData, stateHandle);}returnSnapshotResult.of(retValue);}else{thrownewIOException("Stream was already unregistered.");}}@OverrideprotectedvoidcleanupProvidedResources(){// nothing to do}@OverrideprotectedvoidlogAsyncSnapshotComplete(long startTime){if(asynchronousSnapshots){logAsyncCompleted(streamFactory, startTime);}}};finalFutureTask<SnapshotResult<OperatorStateHandle>> task =
snapshotCallable.toAsyncSnapshotFutureTask(closeStreamOnCancelRegistry);//如果是同步checkpoint,则立即执行checkpoint过程if(!asynchronousSnapshots){
task.run();}return task;}
状态数据拷贝
状态数据的拷贝调用在PartitionableListState.deepCopy(),这个方法会创建一个新的PartitionableListState,然后将老的PartitionableListState里的List列表里的对象重新拷贝一份,不同于KeyedState的是,这里拷贝的对象并不一定就是引用,要分List里的对象是否是可变对象,如果不可变,那么就直接拷贝其引用;如果是可变对象,那么就要复制一个新对象,这将会消耗更多的内存,执行效率也会更慢。
//PartitionableListState类publicPartitionableListState<S>deepCopy(){returnnewPartitionableListState<>(this);}privatePartitionableListState(PartitionableListState<S> toCopy){this(toCo
py.stateMetaInfo.deepCopy(), toCopy.internalListCopySerializer.copy(toCopy.internalList));}//ArrayListSerializer类publicArrayList<T>copy(ArrayList<T> from){if(elementSerializer.isImmutableType()){// fast track using memcopy for immutable typesreturnnewArrayList<>(from);}else{// element-wise deep copy for mutable typesArrayList<T> newList =newArrayList<>(from.size());for(int i =0; i < from.size(); i++){
newList.add(elementSerializer.copy(from.get(i)));}return newList;}}
创建CheckpointStateOutputStream
这里和KeyedState一样,也是创建的FsCheckpointStateOutputStream
//FsCheckpointStreamFactory类publicFsCheckpointStateOutputStreamcreateCheckpointStateOutputStream(CheckpointedStateScope scope)throwsIOException{Path target = scope ==CheckpointedStateScope.EXCLUSIVE? checkpointDirectory : sharedStateDirectory;int bufferSize =Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);returnnewFsCheckpointStateOutputStream(target, filesystem, bufferSize, fileStateThreshold);}
写入状态数据元数据信息
这个过程和KeyedState类似,包括状态数据的名称,序列化信息等
//OperatorBackendSerializationProxy类publicvoidwrite(DataOutputView out)throwsIOException{super.write(out);writeStateMetaInfoSnapshots(operatorStateMetaInfoSnapshots, out);writeStateMetaInfoSnapshots(broadcastStateMetaInfoSnapshots, out);}privatevoidwriteStateMetaInfoSnapshots(List<StateMetaInfoSnapshot> snapshots,DataOutputView out)throwsIOException{
out.writeShort(snapshots.size());for(StateMetaInfoSnapshot state : snapshots){StateMetaInfoSnapshotReadersWriters.getWriter().writeStateMetaInfoSnapshot(state, out);}}
写入状态数据
状态数据的写入在PartitionableListState.write()方法中,实现就是将PartitionableListState中internalList列表中的元素依次写入到OutputStream中,根据internalList中元素类型不同将采用不同的序列化工具。写入时也是先写到buffer,再进行flush()。partitionOffsets[]记录了每个元素写入到流中的位置,并返回partitionOffsets[]
publiclong[]write(FSDataOutputStream out)throwsIOException{long[] partitionOffsets =newlong[internalList.size()];DataOutputView dov =newDataOutputViewStreamWrapper(out);for(int i =0; i < internalList.size();++i){S element = internalList.get(i);
partitionOffsets[i]= out.getPos();getStateMetaInfo().getPartitionStateSerializer().serialize(element, dov);}return partitionOffsets;}
版权归原作者 mn_kw 所有, 如有侵权,请联系我们删除。