0


一文弄懂FLink状态及checkpoint源码

一文弄懂Flink重要源码

1. Flink 状态源码

1.1 valueState源码

1.1.1 Update方法

classHeapValueState<K,N,V>extendsAbstractHeapState<K,N,V>implementsInternalValueState<K,N,V>{/** The current namespace, which the access methods will refer to. */protectedN currentNamespace;/** Map containing the actual key/value pairs. */protectedfinalStateTable<K,N, SV> stateTable;@Overridepublicvoidupdate(V value){if(value ==null){clear();return;}// 向state中传值,namespace从上下文获取
        stateTable.put(currentNamespace, value);}}
publicabstractclassStateTable<K,N,S>implementsStateSnapshotRestore,Iterable<StateEntry<K,N,S>>{// Maps the composite of active key and given namespace to the specified state.// 根据key+namespace找到对应的state// 其中,key从上下文中获取,入参中的state是用户需要保存的值publicvoidput(N namespace,S state){put(keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace, state);}publicvoidput(K key,int keyGroup,N namespace,S state){checkKeyNamespacePreconditions(key, namespace);StateMap<K,N,S> stateMap =getMapForKeyGroup(keyGroup);// 将 state 根据 key 的值放入到 stateMap 中
            stateMap.put(key, namespace, state);}}
publicclassCopyOnWriteStateMap<K,N,S>extendsStateMap<K,N,S>{@Overridepublicvoidput(K key,N namespace,S value){// 获取当前key对应的StateMapEntry结构finalStateMapEntry<K,N,S> e =putEntry(key, namespace);// 获取到key+namespace对应的StateMapEntry对象之后,将value赋值进去
        e.state = value;
        e.stateVersion = stateMapVersion;}privateStateMapEntry<K,N,S>putEntry(K key,N namespace){// 根据key+namespace求hash值,然后再通过hash值得到位于数组tab中的StateMapEntry结构的准确下标finalint hash =computeHashForOperationAndDoIncrementalRehash(key, namespace);finalStateMapEntry<K,N,S>[] tab =selectActiveTable(hash);// 这里的位与运算与hashmap中的寻址方法一模一样,位与运算可以保证得到的index值小于数组长度int index = hash &(tab.length -1);for(StateMapEntry<K,N,S> e = tab[index]; e !=null; e = e.next){// StateMapEntry结构以链表的形式串联,因此找到index之后,还需要遍历链表,通过key+namespace找到目标StateMapEntry,是不是跟hashmap很像?if(e.hash == hash && key.equals(e.key)&& namespace.equals(e.namespace)){// copy-on-write check for entryif(e.entryVersion < highestRequiredSnapshotVersion){
                                e =handleChainedEntryCopyOnWrite(tab, index, e);}return e;}}++modCount;if(size()> threshold){doubleCapacity();}returnaddNewStateMapEntry(tab, key, namespace, hash);}}

保存key+namespace+value的最终结构是StateMapEntry,定义如下:

protectedstaticclassStateMapEntry<K,N,S>implementsStateEntry<K,N,S>{/**
     * The key. Assumed to be immumap and not null.
     */@NonnullfinalK key;/**
     * The namespace. Assumed to be immumap and not null.
     */@NonnullfinalN namespace;/**
     * The state. This is not final to allow exchanging the object for copy-on-write. Can be null.
     */@Nullable// 用户自定义类型S state;/**
     * Link to another {@link StateMapEntry}. This is used to resolve collisions in the
     * {@link CopyOnWriteStateMap} through chaining.
     */@Nullable// 链表StateMapEntry<K,N,S> next;/**
     * The version of this {@link StateMapEntry}. This is meta data for copy-on-write of the map structure.
     */int entryVersion;/**
     * The version of the state object in this entry. This is meta data for copy-on-write of the state object itself.
     */int stateVersion;/**
     * The computed secondary hash for the composite of key and namespace.
     */finalint hash;}

类层级结构图

1.1.2 Value 方法

classHeapValueState<K,N,V>extendsAbstractHeapState<K,N,V>implementsInternalValueState<K,N,V>{/** The current namespace, which the access methods will refer to. */protectedN currentNamespace;@OverridepublicVvalue(){// 上下文中获取namespacefinalV result = stateTable.get(currentNamespace);if(result ==null){returngetDefaultValue();}return result;}}
publicabstractclassStateTable<K,N,S>implementsStateSnapshotRestore,Iterable<StateEntry<K,N,S>>{publicSget(N namespace){returnget(keyContext.getCurrentKey(), keyContext.getCurrentKeyGroupIndex(), namespace);}privateSget(K key,int keyGroupIndex,N namespace){checkKeyNamespacePreconditions(key, namespace);StateMap<K,N,S> stateMap =getMapForKeyGroup(keyGroupIndex);if(stateMap ==null){returnnull;}return stateMap.get(key, namespace);}}
publicclassCopyOnWriteStateMap<K,N,S>extendsStateMap<K,N,S>{@OverridepublicSget(K key,N namespace){// 根据key+namespace计算hashfinalint hash =computeHashForOperationAndDoIncrementalRehash(key, namespace);finalint requiredVersion = highestRequiredSnapshotVersion;// 最终存储StateMapEntry对象的数组finalStateMapEntry<K,N,S>[] tab =selectActiveTable(hash);int index = hash &(tab.length -1);// 遍历链表,然后根据key+namespace+hash寻找StateMapEntry对象,可以理解为hashmap中的重写eqauls方法for(StateMapEntry<K,N,S> e = tab[index]; e !=null; e = e.next){finalK eKey = e.key;finalN eNamespace = e.namespace;if((e.hash == hash && key.equals(eKey)&& namespace.equals(eNamespace))){// copy-on-write check for stateif(e.stateVersion < requiredVersion){// copy-on-write check for entryif(e.entryVersion < requiredVersion){
                                        e =handleChainedEntryCopyOnWrite(tab, hash &(tab.length -1), e);}
                                e.stateVersion = stateMapVersion;
                                e.state =getStateSerializer().copy(e.state);}return e.state;}}returnnull;}}

2. checkPoint 源码分析

2.1 SourceStreamTask的checkpoint实现

2.1.1 JobManager端checkpoint调度

在JobManager端构建ExecutionGraph过程中(ExecutionGraphBuilder.buildGraph()方法),会调用ExecutionGraph.enableCheckpointing()方法,这个方法不管任务里有没有设置checkpoint都会调用的。在enableCheckpointing()方法里会创建CheckpointCoordinator,这是负责checkpoint的核心实现类,同时会给job添加一个监听器CheckpointCoordinatorDeActivator(只有设置了checkpoint才会注册这个监听器),CheckpointCoordinatorDeActivator负责checkpoint的启动和停止。源码如下:

//ExecutionGraphBuilder类publicstaticExecutionGraphbuildGraph(@NullableExecutionGraph prior,JobGraph jobGraph,Configuration jobManagerConfig,ScheduledExecutorService futureExecutor,Executor ioExecutor,SlotProvider slotProvider,ClassLoader classLoader,CheckpointRecoveryFactory recoveryFactory,Time rpcTimeout,RestartStrategy restartStrategy,MetricGroup metrics,int parallelismForAutoMax,BlobWriter blobWriter,Time allocationTimeout,Logger log)throwsJobExecutionException,JobException{...// configure the state checkpointing// 配置 checkpointing stateJobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();if(snapshotSettings !=null){...finalCheckpointCoordinatorConfiguration chkConfig = snapshotSettings.getCheckpointCoordinatorConfiguration();// 启动 checkpoint
      executionGraph.enableCheckpointing(
         chkConfig.getCheckpointInterval(),//checkpoint间隔
         chkConfig.getCheckpointTimeout(),//checkpoint超时时间
         chkConfig.getMinPauseBetweenCheckpoints(),//两次checkpoint间隔最短时间
         chkConfig.getMaxConcurrentCheckpoints(),//同时可并发执行的checkpoint数量
         chkConfig.getCheckpointRetentionPolicy(),//checkpoint的保留策略
         triggerVertices,//需要触发checkpoint的任务,即所有的sourceTask
         ackVertices,//需要确认checkpoint的任务,这里是所有的任务
         confirmVertices,//checkpoint执行完需要进行commit的任务,也是所有的任务
         hooks,
         checkpointIdCounter,//checkpoint ID计数器,每执行一次就+1
         completedCheckpoints,//CompletedCheckpointStore, 存放已完成的checkpoint
         rootBackend,
         checkpointStatsTracker);}...return executionGraph;}
//ExecutionGraph类publicvoidenableCheckpointing(long interval,long checkpointTimeout,long minPauseBetweenCheckpoints,int maxConcurrentCheckpoints,CheckpointRetentionPolicy retentionPolicy,List<ExecutionJobVertex> verticesToTrigger,List<ExecutionJobVertex> verticesToWaitFor,List<ExecutionJobVertex> verticesToCommitTo,List<MasterTriggerRestoreHook<?>> masterHooks,CheckpointIDCounter checkpointIDCounter,CompletedCheckpointStore checkpointStore,StateBackend checkpointStateBackend,CheckpointStatsTracker statsTracker){// simple sanity checkscheckArgument(interval >=10,"checkpoint interval must not be below 10ms");checkArgument(checkpointTimeout >=10,"checkpoint timeout must not be below 10ms");checkState(state ==JobStatus.CREATED,"Job must be in CREATED state");checkState(checkpointCoordinator ==null,"checkpointing already enabled");ExecutionVertex[] tasksToTrigger =collectExecutionVertices(verticesToTrigger);ExecutionVertex[] tasksToWaitFor =collectExecutionVertices(verticesToWaitFor);ExecutionVertex[] tasksToCommitTo =collectExecutionVertices(verticesToCommitTo);

   checkpointStatsTracker =checkNotNull(statsTracker,"CheckpointStatsTracker");//创建checkpointCoordinator,这里的checkpointCoordinator不管有没有设置checkpoint都会创建// create the coordinator that triggers and commits checkpoints and holds the state
   checkpointCoordinator =newCheckpointCoordinator(
      jobInformation.getJobId(),
      interval,
      checkpointTimeout,
      minPauseBetweenCheckpoints,
      maxConcurrentCheckpoints,
      retentionPolicy,
      tasksToTrigger,
      tasksToWaitFor,
      tasksToCommitTo,
      checkpointIDCounter,
      checkpointStore,
      checkpointStateBackend,
      ioExecutor,SharedStateRegistry.DEFAULT_FACTORY);// register the master hooks on the checkpoint coordinatorfor(MasterTriggerRestoreHook<?> hook : masterHooks){if(!checkpointCoordinator.addMasterHook(hook)){LOG.warn("Trying to register multiple checkpoint hooks with the name: {}", hook.getIdentifier());}}

   checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);//注册一个job状态监听器,在job任务出现改变时会进行一些相应的操作//注意如果没有设置checkpoint的话,则不会注册这个checkpoint监听器// interval of max long value indicates disable periodic checkpoint,// the CheckpointActivatorDeactivator should be created only if the interval is not max valueif(interval !=Long.MAX_VALUE){// the periodic checkpoint scheduler is activated and deactivated as a result of// job status changes (running -> on, all other states -> off)registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());}}//CheckpointCoordinator类publicJobStatusListenercreateActivatorDeactivator(){synchronized(lock){if(shutdown){thrownewIllegalArgumentException("Checkpoint coordinator is shut down");}if(jobStatusListener ==null){
         jobStatusListener =newCheckpointCoordinatorDeActivator(this);}return jobStatusListener;}}

在JobManager端开始进行任务调度的时候,会对job的状态进行转换,由CREATED转成RUNNING,实现在transitionState()方法中,在这个过程中刚才设置的job监听器CheckpointCoordinatorDeActivator就开始启动checkpoint的定时任务了,调用链为ExecutionGraph.scheduleForExecution() -> transitionState() -> notifyJobStatusChange() -> CheckpointCoordinatorDeActivator.jobStatusChanges() -> CheckpointCoordinator.startCheckpointScheduler()源码如下

//ExecutionGraph类publicvoidscheduleForExecution()throwsJobException{assertRunningInJobMasterMainThread();finallong currentGlobalModVersion = globalModVersion;if(transitionState(JobStatus.CREATED,JobStatus.RUNNING)){...}else{thrownewIllegalStateException("Job may only be scheduled from state "+JobStatus.CREATED);}}privatebooleantransitionState(JobStatus current,JobStatus newState,Throwable error){assertRunningInJobMasterMainThread();...// now do the actual state transitionif(STATE_UPDATER.compareAndSet(this, current, newState)){LOG.info("Job {} ({}) switched from state {} to {}.",getJobName(),getJobID(), current, newState, error);

      stateTimestamps[newState.ordinal()]=System.currentTimeMillis();notifyJobStatusChange(newState, error);returntrue;}else{returnfalse;}}privatevoidnotifyJobStatusChange(JobStatus newState,Throwable error){if(jobStatusListeners.size()>0){finallong timestamp =System.currentTimeMillis();finalThrowable serializedError = error ==null?null:newSerializedThrowable(error);for(JobStatusListener listener : jobStatusListeners){try{
            listener.jobStatusChanges(getJobID(), newState, timestamp, serializedError);}catch(Throwable t){LOG.warn("Error while notifying JobStatusListener", t);}}}}//CheckpointCoordinatorDeActivator类publicvoidjobStatusChanges(JobID jobId,JobStatus newJobStatus,long timestamp,Throwable error){if(newJobStatus ==JobStatus.RUNNING){// start the checkpoint scheduler// 在这里启动 checkPoint 调度
      coordinator.startCheckpointScheduler();}else{// anything else should stop the trigger for now
      coordinator.stopCheckpointScheduler();}}

CheckpointCoordinator会部署一个定时任务,用于周期性的触发checkpoint,这个定时任务就是ScheduledTrigger类

publicvoidstartCheckpointScheduler(){synchronized(lock){if(shutdown){thrownewIllegalArgumentException("Checkpoint coordinator is shut down");}// make sure all prior timers are cancelledstopCheckpointScheduler();

      periodicScheduling =true;long initialDelay =ThreadLocalRandom.current().nextLong(
         minPauseBetweenCheckpointsNanos /1_000_000L, baseInterval +1L);// 启动调度
      currentPeriodicTrigger = timer.scheduleAtFixedRate(newScheduledTrigger(), initialDelay, baseInterval,TimeUnit.MILLISECONDS);}}

2.1.2 ScheduledTrigger定时触发checkpoint

下面我们来看ScheduledTrigger的实现,主要就在CheckpointCoordinator.triggerCheckpoint()中:

  1. 在触发checkpoint之前先做一遍检查,检查当前正在处理的checkpoint是否超过设置的最大并发checkpoint数量,检查checkpoint的间隔是否达到设置的两次checkpoint的时间间隔,在都没有问题的情况下才可以触发checkpoint
  2. 检查需要触发的task是否都正常运行,即所有的source task
  3. 检查JobManager端需要确认checkpoint信息的task时候正常运行,这里就是所有运行task,即所有的task都需要向JobManager发送确认自己checkpoint的消息
  4. 正式开始触发checkpoint,创建一个PendingCheckpoint,包含了checkpointID和timestamp,向所有的source task去触发checkpoint。
//CheckpointCoordinator类privatefinalclassScheduledTriggerimplementsRunnable{@Overridepublicvoidrun(){try{// 触犯 checkPointtriggerCheckpoint(System.currentTimeMillis(),true);}catch(Exception e){LOG.error("Exception while triggering checkpoint for job {}.", job, e);}}}publicbooleantriggerCheckpoint(long timestamp,boolean isPeriodic){returntriggerCheckpoint(timestamp, checkpointProperties,null, isPeriodic).isSuccess();}publicCheckpointTriggerResulttriggerCheckpoint(long timestamp,CheckpointProperties props,@NullableString externalSavepointLocation,boolean isPeriodic){// 首先做一些检查,看能不能触发checkpoint,//1. 主要就是检查最大并发checkpoint数,checkpoint间隔时间// make some eager pre-checkssynchronized(lock){// abort if the coordinator has been shutdown in the meantimeif(shutdown){returnnewCheckpointTriggerResult(CheckpointDeclineReason.COORDINATOR_SHUTDOWN);}// Don't allow periodic checkpoint if scheduling has been disabledif(isPeriodic &&!periodicScheduling){returnnewCheckpointTriggerResult(CheckpointDeclineReason.PERIODIC_SCHEDULER_SHUTDOWN);}// validate whether the checkpoint can be triggered, with respect to the limit of// concurrent checkpoints, and the minimum time between checkpoints.// these checks are not relevant for savepointsif(!props.forceCheckpoint()){// sanity check: there should never be more than one trigger request queuedif(triggerRequestQueued){LOG.warn("Trying to trigger another checkpoint for job {} while one was queued already.", job);returnnewCheckpointTriggerResult(CheckpointDeclineReason.ALREADY_QUEUED);}// if too many checkpoints are currently in progress, we need to mark that a request is queuedif(pendingCheckpoints.size()>= maxConcurrentCheckpointAttempts){
            triggerRequestQueued =true;if(currentPeriodicTrigger !=null){
               currentPeriodicTrigger.cancel(false);
               currentPeriodicTrigger =null;}returnnewCheckpointTriggerResult(CheckpointDeclineReason.TOO_MANY_CONCURRENT_CHECKPOINTS);}// make sure the minimum interval between checkpoints has passedfinallong earliestNext = lastCheckpointCompletionNanos + minPauseBetweenCheckpointsNanos;finallong durationTillNextMillis =(earliestNext -System.nanoTime())/1_000_000;if(durationTillNextMillis >0){if(currentPeriodicTrigger !=null){
               currentPeriodicTrigger.cancel(false);
               currentPeriodicTrigger =null;}// Reassign the new trigger to the currentPeriodicTrigger
            currentPeriodicTrigger = timer.scheduleAtFixedRate(newScheduledTrigger(),
                  durationTillNextMillis, baseInterval,TimeUnit.MILLISECONDS);returnnewCheckpointTriggerResult(CheckpointDeclineReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS);}}}//检查需要触发的task是否都正常运行,即所有的source task// check if all tasks that we need to trigger are running.// if not, abort the checkpointExecution[] executions =newExecution[tasksToTrigger.length];for(int i =0; i < tasksToTrigger.length; i++){Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt();if(ee ==null){LOG.info("Checkpoint triggering task {} of job {} is not being executed at the moment. Aborting checkpoint.",
               tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
               job);returnnewCheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);}elseif(ee.getState()==ExecutionState.RUNNING){
         executions[i]= ee;}else{LOG.info("Checkpoint triggering task {} of job {} is not in state {} but {} instead. Aborting checkpoint.",
               tasksToTrigger[i].getTaskNameWithSubtaskIndex(),
               job,ExecutionState.RUNNING,
               ee.getState());returnnewCheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);}}// 检查JobManager端需要确认checkpoint信息的task时候正常运行// next, check if all tasks that need to acknowledge the checkpoint are running.// if not, abort the checkpointMap<ExecutionAttemptID,ExecutionVertex> ackTasks =newHashMap<>(tasksToWaitFor.length);for(ExecutionVertex ev : tasksToWaitFor){Execution ee = ev.getCurrentExecutionAttempt();if(ee !=null){
         ackTasks.put(ee.getAttemptId(), ev);}else{LOG.info("Checkpoint acknowledging task {} of job {} is not being executed at the moment. Aborting checkpoint.",
               ev.getTaskNameWithSubtaskIndex(),
               job);returnnewCheckpointTriggerResult(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING);}}// we will actually trigger this checkpoint!// we lock with a special lock to make sure that trigger requests do not overtake each other.// this is not done with the coordinator-wide lock, because the 'checkpointIdCounter'// may issue blocking operations. Using a different lock than the coordinator-wide lock,// we avoid blocking the processing of 'acknowledge/decline' messages during that time.synchronized(triggerLock){finalCheckpointStorageLocation checkpointStorageLocation;finallong checkpointID;try{// this must happen outside the coordinator-wide lock, because it communicates// with external services (in HA mode) and may block for a while.
         checkpointID = checkpointIdCounter.getAndIncrement();

         checkpointStorageLocation = props.isSavepoint()?
               checkpointStorage.initializeLocationForSavepoint(checkpointID, externalSavepointLocation):
               checkpointStorage.initializeLocationForCheckpoint(checkpointID);}catch(Throwable t){...}finalPendingCheckpoint checkpoint =newPendingCheckpoint(
         job,
         checkpointID,
         timestamp,
         ackTasks,
         props,
         checkpointStorageLocation,
         executor);...try{// re-acquire the coordinator-wide locksynchronized(lock){// since we released the lock in the meantime, we need to re-check// that the conditions still hold.//这里又重新做了一遍checkpoint检查...LOG.info("Triggering checkpoint {} @ {} for job {}.", checkpointID, timestamp, job);

            pendingCheckpoints.put(checkpointID, checkpoint);...}// end of lock scopefinalCheckpointOptions checkpointOptions =newCheckpointOptions(
               props.getCheckpointType(),
               checkpointStorageLocation.getLocationReference());// 给source task发消息触发checkpoint      // send the messages to the tasks that trigger their checkpointfor(Execution execution: executions){
            execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);}

         numUnsuccessfulCheckpointsTriggers.set(0);returnnewCheckpointTriggerResult(checkpoint);}catch(Throwable t){...}}// end trigger lock}

Execution.triggerCheckpoint()就是远程调用TaskManager的triggerCheckpoint()方法

//Execution publicvoidtriggerCheckpoint(long checkpointId,long timestamp,CheckpointOptions checkpointOptions){finalLogicalSlot slot = assignedResource;if(slot !=null){finalTaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();// 通过 RPC接口触发 TaskManger.triggerCheckpoint
      taskManagerGateway.triggerCheckpoint(attemptId,getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);}else{LOG.debug("The execution has no slot assigned. This indicates that the execution is "+"no longer running.");}}//RpcTaskManagerGateway类publicvoidtriggerCheckpoint(ExecutionAttemptID executionAttemptID,JobID jobId,long checkpointId,long timestamp,CheckpointOptions checkpointOptions){
   taskExecutorGateway.triggerCheckpoint(
      executionAttemptID,
      checkpointId,
      timestamp,
      checkpointOptions);}

2.1.3 SourceStreamTask的Checkpoint执行

TaskManager的triggerCheckpoint()方法首先获取到source task(即SourceStreamTask),调用Task.triggerCheckpointBarrier(),triggerCheckpointBarrier()会异步的去执行一个独立线程,这个线程来负责source task的checkpoint执行。

//TaskExecutor类publicCompletableFuture<Acknowledge>triggerCheckpoint(ExecutionAttemptID executionAttemptID,long checkpointId,long checkpointTimestamp,CheckpointOptions checkpointOptions){
   log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, checkpointTimestamp, executionAttemptID);// 获取 taskfinalTask task = taskSlotTable.getTask(executionAttemptID);if(task !=null){// 对 task 执行 triggerCheckpointBarrier
      task.triggerCheckpointBarrier(checkpointId, checkpointTimestamp, checkpointOptions);// 异步获取 ack 信息returnCompletableFuture.completedFuture(Acknowledge.get());}else{finalString message ="TaskManager received a checkpoint request for unknown task "+ executionAttemptID +'.';

      log.debug(message);returnFutureUtils.completedExceptionally(newCheckpointException(message));}}//Task类publicvoidtriggerCheckpointBarrier(finallong checkpointID,long checkpointTimestamp,finalCheckpointOptions checkpointOptions){finalAbstractInvokable invokable =this.invokable;finalCheckpointMetaData checkpointMetaData =newCheckpointMetaData(checkpointID, checkpointTimestamp);if(executionState ==ExecutionState.RUNNING&& invokable !=null){// build a local closurefinalString taskName = taskNameWithSubtask;finalSafetyNetCloseableRegistry safetyNetCloseableRegistry =FileSystemSafetyNet.getSafetyNetCloseableRegistryForThread();Runnable runnable =newRunnable(){@Overridepublicvoidrun(){// set safety net from the task's context for checkpointing threadLOG.debug("Creating FileSystem stream leak safety net for {}",Thread.currentThread().getName());FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);try{//由一个单独的线程来调用执行,这个invokable在这里就是SourceStreamTaskboolean success = invokable.triggerCheckpoint(checkpointMetaData, checkpointOptions);if(!success){
                  checkpointResponder.declineCheckpoint(getJobID(),getExecutionId(), checkpointID,newCheckpointDeclineTaskNotReadyException(taskName));}}catch(Throwable t){if(getExecutionState()==ExecutionState.RUNNING){failExternally(newException("Error while triggering checkpoint "+ checkpointID +" for "+
                        taskNameWithSubtask, t));}else{LOG.debug("Encountered error while triggering checkpoint {} for "+"{} ({}) while being not in state running.", checkpointID,
                     taskNameWithSubtask, executionId, t);}}finally{FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);}}};//异步的执行这个线程executeAsyncCallRunnable(runnable,String.format("Checkpoint Trigger for %s (%s).", taskNameWithSubtask, executionId));}else{...}}

checkpoint的核心实现在StreamTask.performCheckpoint()方法中,该方法主要有三个步骤
1、在checkpoint之前做一些准备工作,通常情况下operator在这个阶段是不做什么操作的
2、立即向下游广播CheckpointBarrier,以便使下游的task能够及时的接收到CheckpointBarrier也开始进行checkpoint的操作
3、开始进行状态的快照,即checkpoint操作。
注意以上操作都是在同步代码块里进行的,获取到的这个lock锁就是用于checkpoint的锁,checkpoint线程和task任务线程用的是同一把锁,在进行performCheckpoint()时,task任务线程是不能够进行数据处理的

/SourceStreamTask类
publicbooleantriggerCheckpoint(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions)throwsException{if(!externallyInducedCheckpoints){returnsuper.triggerCheckpoint(checkpointMetaData, checkpointOptions);}else{// we do not trigger checkpoints here, we simply state whether we can trigger themsynchronized(getCheckpointLock()){returnisRunning();}}}//StreamTask类publicbooleantriggerCheckpoint(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions)throwsException{try{// No alignment if we inject a checkpointCheckpointMetrics checkpointMetrics =newCheckpointMetrics().setBytesBufferedInAlignment(0L).setAlignmentDurationNanos(0L);returnperformCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);}catch(Exception e){...}}privatebooleanperformCheckpoint(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions,CheckpointMetrics checkpointMetrics)throwsException{LOG.debug("Starting checkpoint ({}) {} on task {}",
      checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType(),getName());synchronized(lock){if(isRunning){// we can do a checkpoint// All of the following steps happen as an atomic step from the perspective of barriers and// records/watermarks/timers/callbacks.// We generally try to emit the checkpoint barrier as soon as possible to not affect downstream// checkpoint alignments// Step (1): Prepare the checkpoint, allow operators to do some pre-barrier work.//           The pre-barrier work should be nothing or minimal in the common case.// 准备 checkPoint 
         operatorChain.prepareSnapshotPreBarrier(checkpointMetaData.getCheckpointId());// Step (2): Send the checkpoint barrier downstream// 发送 checkpoint barrier 到下游
         operatorChain.broadcastCheckpointBarrier(
               checkpointMetaData.getCheckpointId(),
               checkpointMetaData.getTimestamp(),
               checkpointOptions);// Step (3): Take the state snapshot. This should be largely asynchronous, to not//           impact progress of the streaming topologycheckpointState(checkpointMetaData, checkpointOptions, checkpointMetrics);returntrue;}else{...returnfalse;}}}

首先来看第一步operatorChain.prepareSnapshotPreBarrier(),默认的实现其实什么也没做

//OperatorChain类publicvoidprepareSnapshotPreBarrier(long checkpointId)throwsException{// go forward through the operator chain and tell each operator// to prepare the checkpointfinalStreamOperator<?>[] operators =this.allOperators;for(int i = operators.length -1; i >=0;--i){finalStreamOperator<?> op = operators[i];if(op !=null){
         op.prepareSnapshotPreBarrier(checkpointId);}}}//AbstractStreamOperator类publicvoidprepareSnapshotPreBarrier(long checkpointId)throwsException{// the default implementation does nothing and accepts the checkpoint// this is purely for subclasses to override}

广播CheckpointBarrier
接下来我们看看operatorChain.broadcastCheckpointBarrier(),即CheckpointBarrier的广播过程。
其核心实现就是在task的输出ResultPartition里,给下游的每个通道channel都发送一个CheckpointBarrier,下游有多少个任务,那就会有多少个通道,给每个任务都会发送一个CheckpointBarrier,这个过程叫做广播。
这个CheckpointBarrier广播的过程也叫CheckpointBarrier注入,因为CheckpointBarrier并不是由执行source task的线程来写入的,而是由checkpoint线程来写入的,并且做了同步,在写入CheckpointBarrier时source task线程是被阻塞的。
这个源码如下:

//OperatorChain类publicvoidbroadcastCheckpointBarrier(long id,long timestamp,CheckpointOptions checkpointOptions)throwsIOException{CheckpointBarrier barrier =newCheckpointBarrier(id, timestamp, checkpointOptions);for(RecordWriterOutput<?> streamOutput : streamOutputs){
      streamOutput.broadcastEvent(barrier);}}//RecordWriterOutput类publicvoidbroadcastEvent(AbstractEvent event)throwsIOException{
   recordWriter.broadcastEvent(event);}//RecordWriter类publicvoidbroadcastEvent(AbstractEvent event)throwsIOException{try(BufferConsumer eventBufferConsumer =EventSerializer.toBufferConsumer(event)){for(int targetChannel =0; targetChannel < numberOfChannels; targetChannel++){tryFinishCurrentBufferBuilder(targetChannel);//这里的targetPartition就是ResultPartition,向下游每一个通道都发送一个CheckpointBarrier // Retain the buffer so that it can be recycled by each channel of targetPartition
         targetPartition.addBufferConsumer(eventBufferConsumer.copy(), targetChannel);}if(flushAlways){flushAll();}}}

异步执行checkpoint
接下来就是关键的部分checkpointState()方法,也就是checkpoint的执行了,这个过程也是一个异步的过程,不能因为checkpoint而影响了正常数据流的处理。
StreamTask里的每个operator都会创建一个OperatorSnapshotFutures,OperatorSnapshotFutures 里包含了执行operator状态checkpoint的FutureTask,然后由另一个单独的线程异步的来执行这些operator的实际checkpoint操作

//StreamTask类privatevoidcheckpointState(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions,CheckpointMetrics checkpointMetrics)throwsException{CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
         checkpointMetaData.getCheckpointId(),
         checkpointOptions.getTargetLocation());CheckpointingOperation checkpointingOperation =newCheckpointingOperation(this,
      checkpointMetaData,
      checkpointOptions,
      storage,
      checkpointMetrics);

   checkpointingOperation.executeCheckpointing();}
//StreamTask#CheckpointingOperation类publicvoidexecuteCheckpointing()throwsException{
   startSyncPartNano =System.nanoTime();try{//对每个operator创建一个OperatorSnapshotFutures添加到operatorSnapshotsInProgressfor(StreamOperator<?> op : allOperators){checkpointStreamOperator(op);}

      startAsyncPartNano =System.nanoTime();

      checkpointMetrics.setSyncDurationMillis((startAsyncPartNano - startSyncPartNano)/1_000_000);//由一个异步线程来执行实际的checkpoint操作,旨在不影响数据流的处理// we are transferring ownership over snapshotInProgressList for cleanup to the thread, active on submitAsyncCheckpointRunnable asyncCheckpointRunnable =newAsyncCheckpointRunnable(
         owner,
         operatorSnapshotsInProgress,
         checkpointMetaData,
         checkpointMetrics,
         startAsyncPartNano);

      owner.cancelables.registerCloseable(asyncCheckpointRunnable);
      owner.asyncOperationsThreadPool.submit(asyncCheckpointRunnable);...}catch(Exception ex){...
      owner.synchronousCheckpointExceptionHandler.tryHandleCheckpointException(checkpointMetaData, ex);}}privatevoidcheckpointStreamOperator(StreamOperator<?> op)throwsException{if(null!= op){//这个snapshotInProgress包含了operator checkpoint的FutureTaskOperatorSnapshotFutures snapshotInProgress = op.snapshotState(
            checkpointMetaData.getCheckpointId(),
            checkpointMetaData.getTimestamp(),
            checkpointOptions,
            storageLocation);
      operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);}}

OperatorSnapshotFutures
我们首先来看看针对每个operator创建的OperatorSnapshotFutures是什么,OperatorSnapshotFutures持有了一个operator的状态数据快照过程的RunnableFuture,什么意思呢,RunnableFuture首先是一个Runnable,也就是把operator快照的操作写到run()方法里,但并不去执行,等到需要的时候再去执行run()方法,也就是去执行真正的快照操作。其次RunnableFuture是一个Future,在执行run()后可以调用get()来获取future的结果,这里的结果就是SnapshotResult。

publicclassOperatorSnapshotFutures{@NonnullprivateRunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateManagedFuture;@NonnullprivateRunnableFuture<SnapshotResult<KeyedStateHandle>> keyedStateRawFuture;@NonnullprivateRunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateManagedFuture;@NonnullprivateRunnableFuture<SnapshotResult<OperatorStateHandle>> operatorStateRawFuture;

从snapshotState()方法中可以看到OperatorSnapshotFutures中的keyedStateRawFuture和operatorStateRawFuture都是DoneFuture,也就是一个已经完成的RunnableFuture,那么这个过程就是一个同步的。
operatorState和keyedState的snapshot过程被封装到一个RunnableFuture(这个过程比较复杂,这里先不介绍),并不会立即执行,之后调用RunnableFuture.run()才会真正的执行snapshot,在这里RunnableFuture就是一个FutureTask,这个过程是一个异步的,会被一个异步快照线程执行

//AbstractStreamOperator类publicfinalOperatorSnapshotFuturessnapshotState(long checkpointId,long timestamp,CheckpointOptions checkpointOptions,CheckpointStreamFactory factory)throwsException{KeyGroupRange keyGroupRange =null!= keyedStateBackend ?
         keyedStateBackend.getKeyGroupRange():KeyGroupRange.EMPTY_KEY_GROUP_RANGE;OperatorSnapshotFutures snapshotInProgress =newOperatorSnapshotFutures();try(StateSnapshotContextSynchronousImpl snapshotContext =newStateSnapshotContextSynchronousImpl(
         checkpointId,
         timestamp,
         factory,
         keyGroupRange,getContainingTask().getCancelables())){//将snapshotContext进行快照,这个过程也会调用CheckpointedFunction.snapshotState()方法snapshotState(snapshotContext);//keyedStateRawFuture和operatorStateRawFuture都是DoneFuture,也就是一个已经完成的RunnableFuture
      snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
      snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());//operatorState和keyedState的snapshot过程被封装到一个RunnableFuture,并不会立即执行,//之后调用RunnableFuture.run()才会真正的执行snapshotif(null!= operatorStateBackend){
         snapshotInProgress.setOperatorStateManagedFuture(
            operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));}if(null!= keyedStateBackend){
         snapshotInProgress.setKeyedStateManagedFuture(
            keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));}}catch(Exception snapshotException){...}return snapshotInProgress;}

AsyncCheckpointRunnable
接下来我们看异步快照线程的实现,jobManagerTaskOperatorSubtaskStates是需要向JobManager ack的operator状态快照元数据信息,localTaskOperatorSubtaskStates是需要向TaskManager上报的operator状态快照元数据信息,localTaskOperatorSubtaskStates的作用是为状态数据保存一个备份,用户TaskManager快速的进行本地数据恢复,我们主要关心的还是向JobManager去Ack的TaskStateSnapshot。
1、针对每个operator创建一个OperatorSnapshotFinalizer,OperatorSnapshotFinalizer是状态数据快照的真正执行者,它真正的执行的operator的快照过程,也就是去执行那些FutureTask
2、状态快照执行完毕之后上JobManager上报checkpoint的信息

//AsyncCheckpointRunnable类publicvoidrun(){FileSystemSafetyNet.initializeSafetyNetForThread();try{TaskStateSnapshot jobManagerTaskOperatorSubtaskStates =newTaskStateSnapshot(operatorSnapshotsInProgress.size());TaskStateSnapshot localTaskOperatorSubtaskStates =newTaskStateSnapshot(operatorSnapshotsInProgress.size());for(Map.Entry<OperatorID,OperatorSnapshotFutures> entry : operatorSnapshotsInProgress.entrySet()){OperatorID operatorID = entry.getKey();OperatorSnapshotFutures snapshotInProgress = entry.getValue();//这里真正的执行的operator的快照过程,会执行那些FutureTask// finalize the async part of all by executing all snapshot runnablesOperatorSnapshotFinalizer finalizedSnapshots =newOperatorSnapshotFinalizer(snapshotInProgress);

         jobManagerTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
            operatorID,
            finalizedSnapshots.getJobManagerOwnedState());

         localTaskOperatorSubtaskStates.putSubtaskStateByOperatorID(
            operatorID,
            finalizedSnapshots.getTaskLocalState());}finallong asyncEndNanos =System.nanoTime();finallong asyncDurationMillis =(asyncEndNanos - asyncStartNanos)/1_000_000L;

      checkpointMetrics.setAsyncDurationMillis(asyncDurationMillis);if(asyncCheckpointState.compareAndSet(CheckpointingOperation.AsyncCheckpointState.RUNNING,CheckpointingOperation.AsyncCheckpointState.COMPLETED)){//上JobManager上报checkpoint的信息reportCompletedSnapshotStates(
            jobManagerTaskOperatorSubtaskStates,
            localTaskOperatorSubtaskStates,
            asyncDurationMillis);}else{LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
            owner.getName(),
            checkpointMetaData.getCheckpointId());}}catch(Exception e){handleExecutionException(e);}finally{
      owner.cancelables.unregisterCloseable(this);FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();}}

可以看到在创建OperatorSnapshotFinalizer实例的时候就会去运行那些FutureTask,执行快照过程,比如将状态数据写到文件系统如HDFS等。

//OperatorSnapshotFinalizer类publicOperatorSnapshotFinalizer(@NonnullOperatorSnapshotFutures snapshotFutures)throwsExecutionException,InterruptedException{SnapshotResult<KeyedStateHandle> keyedManaged =FutureUtils.runIfNotDoneAndGet(snapshotFutures.getKeyedStateManagedFuture());SnapshotResult<KeyedStateHandle> keyedRaw =FutureUtils.runIfNotDoneAndGet(snapshotFutures.getKeyedStateRawFuture());SnapshotResult<OperatorStateHandle> operatorManaged =FutureUtils.runIfNotDoneAndGet(snapshotFutures.getOperatorStateManagedFuture());SnapshotResult<OperatorStateHandle> operatorRaw =FutureUtils.runIfNotDoneAndGet(snapshotFutures.getOperatorStateRawFuture());

   jobManagerOwnedState =newOperatorSubtaskState(
      operatorManaged.getJobManagerOwnedSnapshot(),
      operatorRaw.getJobManagerOwnedSnapshot(),
      keyedManaged.getJobManagerOwnedSnapshot(),
      keyedRaw.getJobManagerOwnedSnapshot());

   taskLocalState =newOperatorSubtaskState(
      operatorManaged.getTaskLocalSnapshot(),
      operatorRaw.getTaskLocalSnapshot(),
      keyedManaged.getTaskLocalSnapshot(),
      keyedRaw.getTaskLocalSnapshot());}

2.1.4 Task上报checkpoint信息

ask在执行完checkpoint后会向JobManager上报checkpoint的元数据信息,最终会调用到CheckpointCoordinator.receiveAcknowledgeMessage()方法

//TaskStateManagerImpl类publicvoidreportTaskStateSnapshots(@NonnullCheckpointMetaData checkpointMetaData,@NonnullCheckpointMetrics checkpointMetrics,@NullableTaskStateSnapshot acknowledgedState,@NullableTaskStateSnapshot localState){long checkpointId = checkpointMetaData.getCheckpointId();

   localStateStore.storeLocalState(checkpointId, localState);

   checkpointResponder.acknowledgeCheckpoint(
      jobId,
      executionAttemptID,
      checkpointId,
      checkpointMetrics,
      acknowledgedState);}//JobMaster类publicvoidacknowledgeCheckpoint(finalJobID jobID,finalExecutionAttemptID executionAttemptID,finallong checkpointId,finalCheckpointMetrics checkpointMetrics,finalTaskStateSnapshot checkpointState){finalCheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();finalAcknowledgeCheckpoint ackMessage =newAcknowledgeCheckpoint(
      jobID,
      executionAttemptID,
      checkpointId,
      checkpointMetrics,
      checkpointState);if(checkpointCoordinator !=null){getRpcService().execute(()->{try{// 通过 RPC 接口调用 receiveAcknowledgeMessage
            checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);}catch(Throwable t){
            log.warn("Error while processing checkpoint acknowledgement message", t);}});}else{...}}

CheckpointCoordinator最后会调用PendingCheckpoint.acknowledgeTask()方法,该方法就是将task上报的元数据信息(checkpoint的路径地址,状态数据大小等等)添加到PendingCheckpoint里,如果接收到了全部task上报的的Ack消息,就执行completePendingCheckpoint()

//CheckpointCoordinatorpublicbooleanreceiveAcknowledgeMessage(AcknowledgeCheckpoint message)throwsCheckpointException{...finallong checkpointId = message.getCheckpointId();synchronized(lock){// we need to check inside the lock for being shutdown as well, otherwise we// get races and invalid error log messagesif(shutdown){returnfalse;}finalPendingCheckpoint checkpoint = pendingCheckpoints.get(checkpointId);if(checkpoint !=null&&!checkpoint.isDiscarded()){switch(checkpoint.acknowledgeTask(message.getTaskExecutionId(), message.getSubtaskState(), message.getCheckpointMetrics())){caseSUCCESS:if(checkpoint.isFullyAcknowledged()){// 如果所有的 subTask 都返回 ack,此时就可以完成本次 checkPointcompletePendingCheckpoint(checkpoint);}break;...}returntrue;}...}}//PendingCheckpoint类publicTaskAcknowledgeResultacknowledgeTask(ExecutionAttemptID executionAttemptId,TaskStateSnapshot operatorSubtaskStates,CheckpointMetrics metrics){synchronized(lock){if(discarded){returnTaskAcknowledgeResult.DISCARDED;}finalExecutionVertex vertex = notYetAcknowledgedTasks.remove(executionAttemptId);if(vertex ==null){if(acknowledgedTasks.contains(executionAttemptId)){returnTaskAcknowledgeResult.DUPLICATE;}else{returnTaskAcknowledgeResult.UNKNOWN;}}else{
         acknowledgedTasks.add(executionAttemptId);}List<OperatorID> operatorIDs = vertex.getJobVertex().getOperatorIDs();int subtaskIndex = vertex.getParallelSubtaskIndex();long ackTimestamp =System.currentTimeMillis();long stateSize =0L;//将上报的operatorSubtaskStates也就是每个Task的状态快照元数据添加到PendingCheckpoint的operatorStates里面if(operatorSubtaskStates !=null){for(OperatorID operatorID : operatorIDs){OperatorSubtaskState operatorSubtaskState =
               operatorSubtaskStates.getSubtaskStateByOperatorID(operatorID);// if no real operatorSubtaskState was reported, we insert an empty stateif(operatorSubtaskState ==null){
               operatorSubtaskState =newOperatorSubtaskState();}OperatorState operatorState = operatorStates.get(operatorID);if(operatorState ==null){
               operatorState =newOperatorState(
                  operatorID,
                  vertex.getTotalNumberOfParallelSubtasks(),
                  vertex.getMaxParallelism());
               operatorStates.put(operatorID, operatorState);}

            operatorState.putState(subtaskIndex, operatorSubtaskState);
            stateSize += operatorSubtaskState.getStateSize();}}++numAcknowledgedTasks;...returnTaskAcknowledgeResult.SUCCESS;}}

JobManager完成所有task的ack之后,会做以下操作:

  1. 将PendingCheckpoint 转成CompletedCheckpoint,标志着checkpoint过程完成,CompletedCheckpoint里包含了checkpoint的元数据信息,包括checkpoint的路径地址,状态数据大小等等,同时也会将元数据信息进行持久化,目录为 c h e c k p o i n t D i r / checkpointDir/ checkpointDir/uuid/chk-***/_metadata,也会把过期的checkpoint数据给删除
  2. 通知所有的task进行commit操作,一般来说,task的commit操作其实不需要做什么,但是像那种TwoPhaseCommitSinkFunction,比如FlinkKafkaProducer,就会进行一些事物的提交操作等。
privatevoidcompletePendingCheckpoint(PendingCheckpoint pendingCheckpoint)throwsCheckpointException{finallong checkpointId = pendingCheckpoint.getCheckpointId();finalCompletedCheckpoint completedCheckpoint;// As a first step to complete the checkpoint, we register its state with the registryMap<OperatorID,OperatorState> operatorStates = pendingCheckpoint.getOperatorStates();
   sharedStateRegistry.registerAll(operatorStates.values());try{try{// 完成 checkpoint
         completedCheckpoint = pendingCheckpoint.finalizeCheckpoint();}catch(Exception e1){...}// the pending checkpoint must be discarded after the finalizationPreconditions.checkState(pendingCheckpoint.isDiscarded()&& completedCheckpoint !=null);try{
         completedCheckpointStore.addCheckpoint(completedCheckpoint);}catch(Exception exception){...}}finally{
      pendingCheckpoints.remove(checkpointId);triggerQueuedRequests();}rememberRecentCheckpointId(checkpointId);// drop those pending checkpoints that are at prior to the completed onedropSubsumedCheckpoints(checkpointId);// record the time when this was completed, to calculate// the 'min delay between checkpoints'
   lastCheckpointCompletionNanos =System.nanoTime();...// send the "notify complete" call to all verticesfinallong timestamp = completedCheckpoint.getTimestamp();for(ExecutionVertex ev : tasksToCommitTo){Execution ee = ev.getCurrentExecutionAttempt();if(ee !=null){// task在接收到消息之后会调用Task.notifyCheckpointComplete()方法
         ee.notifyCheckpointComplete(checkpointId, timestamp);}}}

2.1.5 JobManager通知Task进行commit

ask在接收到消息之后会调用Task.notifyCheckpointComplete()方法,最后会调用StreamOperator.notifyCheckpointComplete(),一般来说不做什么操作。但是像AbstractUdfStreamOperator这种的可能还会由一些其他操作

publicvoidnotifyCheckpointComplete(finallong checkpointID){finalAbstractInvokable invokable =this.invokable;if(executionState ==ExecutionState.RUNNING&& invokable !=null){Runnable runnable =newRunnable(){@Overridepublicvoidrun(){try{
               invokable.notifyCheckpointComplete(checkpointID);
               taskStateManager.notifyCheckpointComplete(checkpointID);}catch(Throwable t){...}}};executeAsyncCallRunnable(runnable,"Checkpoint Confirmation for "+
         taskNameWithSubtask);}else{LOG.debug("Ignoring checkpoint commit notification for non-running task {}.", taskNameWithSubtask);}}
//StreamTask类publicvoidnotifyCheckpointComplete(long checkpointId)throwsException{synchronized(lock){if(isRunning){LOG.debug("Notification of complete checkpoint for task {}",getName());for(StreamOperator<?> operator : operatorChain.getAllOperators()){if(operator !=null){
               operator.notifyCheckpointComplete(checkpointId);}}}else{LOG.debug("Ignoring notification of complete checkpoint for not-running task {}",getName());}}}//AbstractStreamOperator类publicvoidnotifyCheckpointComplete(long checkpointId)throwsException{if(keyedStateBackend !=null){
      keyedStateBackend.notifyCheckpointComplete(checkpointId);}}//HeapKeyedStateBackend类publicvoidnotifyCheckpointComplete(long checkpointId){//Nothing to do}

AbstractUdfStreamOperator主要是针对用户自定义函数的operator,像StreamMap,StreamSource等等,如果用户定义的Function实现了CheckpointListener接口,则会进行额外的一些处理,例如FlinkKafkaConsumerBase会向kafka提交消费的offset,TwoPhaseCommitSinkFunction类会进行事务的提交,例如FlinkKafkaProducer。值得一提的是,TwoPhaseCommitSinkFunction是保证flink端到端exactly-once的保证

//AbstractUdfStreamOperatorpublicvoidnotifyCheckpointComplete(long checkpointId)throwsException{super.notifyCheckpointComplete(checkpointId);if(userFunction instanceofCheckpointListener){((CheckpointListener) userFunction).notifyCheckpointComplete(checkpointId);}}
//FlinkKafkaConsumerBase类publicfinalvoidnotifyCheckpointComplete(long checkpointId)throwsException{if(!running){LOG.debug("notifyCheckpointComplete() called on closed source");return;}finalAbstractFetcher<?,?> fetcher =this.kafkaFetcher;if(fetcher ==null){LOG.debug("notifyCheckpointComplete() called on uninitialized source");return;}if(offsetCommitMode ==OffsetCommitMode.ON_CHECKPOINTS){// only one commit operation must be in progressif(LOG.isDebugEnabled()){LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint "+ checkpointId);}try{...@SuppressWarnings("unchecked")Map<KafkaTopicPartition,Long> offsets =(Map<KafkaTopicPartition,Long>) pendingOffsetsToCommit.remove(posInMap);...//向kafka提交offset
         fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);}catch(Exception e){...}}}
//TwoPhaseCommitSinkFunction类publicfinalvoidnotifyCheckpointComplete(long checkpointId)throwsException{Iterator<Map.Entry<Long,TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();checkState(pendingTransactionIterator.hasNext(),"checkpoint completed, but no transaction pending");Throwable firstError =null;while(pendingTransactionIterator.hasNext()){...try{//提交事务commit(pendingTransaction.handle);}catch(Throwable t){...}...}...}//FlinkKafkaProducer类protectedvoidcommit(FlinkKafkaProducer.KafkaTransactionState transaction){if(transaction.isTransactional()){try{
         transaction.producer.commitTransaction();}finally{recycleTransactionalProducer(transaction.producer);}}}

所有的task完成了notifyCheckpointComplete()方法后,一个完整的checkpoint流程就完成了

2.2 非SourceStreamTask的checkpoint实现

上述只说了source task的checkpoint实现,source task的checkpoint是由JobManager来触发的,那么非source task的checkpoint流程又是如何的呢?
上面说了source task会向下游广播发送CheckpointBarrier,那么下游的task就会接收到source task发送的CheckpointBarrier,checkpoint的起始位置也在接收到CheckpointBarrier。起点就在StreamTask中的CheckpointBarrierHandler.getNextNonBlocked()中

//StreamInputProcessor类publicbooleanprocessInput()throwsException{...while(true){...//finalBufferOrEvent bufferOrEvent = barrierHandler.getNextNonBlocked();...}}

CheckpointBarrierHandler会根据CheckpointingMode模式不同生成不同的Handler,如果是EXACTLY_ONCE,就会生成BarrierBuffer,会进行barrier对齐,BarrierBuffer中的CachedBufferBlocker是用来缓存barrier对齐时从被阻塞channel接收到的数据。如果CheckpointingMode是AT_LEAST_ONCE,那就会生成BarrierTracker,不会进行barrier对齐。

publicstaticCheckpointBarrierHandlercreateCheckpointBarrierHandler(StreamTask<?,?> checkpointedTask,CheckpointingMode checkpointMode,IOManager ioManager,InputGate inputGate,Configuration taskManagerConfig)throwsIOException{CheckpointBarrierHandler barrierHandler;if(checkpointMode ==CheckpointingMode.EXACTLY_ONCE){long maxAlign = taskManagerConfig.getLong(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT);if(!(maxAlign ==-1|| maxAlign >0)){thrownewIllegalConfigurationException(TaskManagerOptions.TASK_CHECKPOINT_ALIGNMENT_BYTES_LIMIT.key()+" must be positive or -1 (infinite)");}if(taskManagerConfig.getBoolean(TaskManagerOptions.NETWORK_CREDIT_MODEL)){
         barrierHandler =newBarrierBuffer(inputGate,newCachedBufferBlocker(inputGate.getPageSize()), maxAlign);}else{
         barrierHandler =newBarrierBuffer(inputGate,newBufferSpiller(ioManager, inputGate.getPageSize()), maxAlign);}}elseif(checkpointMode ==CheckpointingMode.AT_LEAST_ONCE){
      barrierHandler =newBarrierTracker(inputGate);}else{thrownewIllegalArgumentException("Unrecognized Checkpointing Mode: "+ checkpointMode);}if(checkpointedTask !=null){
      barrierHandler.registerCheckpointEventHandler(checkpointedTask);}return barrierHandler;}

2.2.1 EXACTLY_ONCE下checkpoint的实现

下面看看CheckpointBarrierHandler的处理逻辑,首先看BarrierBuffer
1、首先获取到下一个BufferOrEvent,可能是从当前缓存中获取获取,如果缓存没了,就从inputGate获取
2、如果获取的事件是已经被设置为阻塞的channel的,就将其放到缓存中,如果channel已经接收到CheckpointBarrier,就会将其设置为block状态
3、如果接收的事件是CheckpointBarrier,处理接收到的CheckpointBarrier,如果接收到的是正常的数据就返回

//BarrierBuffer类publicBufferOrEventgetNextNonBlocked()throwsException{while(true){// process buffered BufferOrEvents before grabbing new onesOptional<BufferOrEvent> next;if(currentBuffered ==null){
         next = inputGate.getNextBufferOrEvent();}else{
         next =Optional.ofNullable(currentBuffered.getNext());if(!next.isPresent()){completeBufferedSequence();returngetNextNonBlocked();}}...BufferOrEvent bufferOrEvent = next.get();if(isBlocked(bufferOrEvent.getChannelIndex())){//如果数据是被阻塞的channel,就将其添加到缓存,如果channel已经接收到CheckpointBarrier,就会将其设置为block状态// if the channel is blocked, we just store the BufferOrEvent
         bufferBlocker.add(bufferOrEvent);checkSizeLimit();}elseif(bufferOrEvent.isBuffer()){return bufferOrEvent;}elseif(bufferOrEvent.getEvent().getClass()==CheckpointBarrier.class){if(!endOfStream){//处理接收到的CheckpointBarrier// process barriers only if there is a chance of the checkpoint completingprocessBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());}}elseif(bufferOrEvent.getEvent().getClass()==CancelCheckpointMarker.class){processCancellationBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent());}else{if(bufferOrEvent.getEvent().getClass()==EndOfPartitionEvent.class){processEndOfPartition();}return bufferOrEvent;}}}//CachedBufferBlocker类publicvoidadd(BufferOrEvent boe){
   bytesBlocked += pageSize;//currentBuffers是一个ArrayDeque<BufferOrEvent>
   currentBuffers.add(boe);}

核心处理逻辑在processBarrier()方法中,实现如下:
1、如果只有一个channel,也就是说只有一个上游任务,就立马触发checkpoint
2、如果有多个上游任务,也就是有多个channel,那么会有以下几种情况的判断
1)、如果是首次接收到barrier,就开始进行barrier对齐,并将该channel设置为阻塞状态
2)、如果不是首次接收到barrier,但也不是最后一个barrier,就只给channel设置为阻塞状态
3)、如果在没完成当前checkpoint的时候又接收到了下一次checkpoint的barrier,就终止当前的checkpoint,重新开始新的一次checkpoint,这种情况并不常见
4)、如果接收到全部channel,即上游所有任务的barrier,就开始触发checkpoint,并取消所有channel的阻塞状态,开始处理那些被添加到缓存的事件

privatevoidprocessBarrier(CheckpointBarrier receivedBarrier,int channelIndex)throwsException{finallong barrierId = receivedBarrier.getId();// fast path for single channel casesif(totalNumberOfInputChannels ==1){if(barrierId > currentCheckpointId){// new checkpoint
         currentCheckpointId = barrierId;notifyCheckpoint(receivedBarrier);}return;}// -- general code path for multiple input channels --if(numBarriersReceived >0){// this is only true if some alignment is already progress and was not canceledif(barrierId == currentCheckpointId){// regular caseonBarrier(channelIndex);}elseif(barrierId > currentCheckpointId){//这种情况就是上一个checkpoint还没完呢,就接收到下一个checkpoint的barrier,这种情况,就终止当前的checkpoint// we did not complete the current checkpoint, another started before...// let the task know we are not completing thisnotifyAbort(currentCheckpointId,newCheckpointDeclineSubsumedException(barrierId));// abort the current checkpointreleaseBlocksAndResetBarriers();// begin a the new checkpointbeginNewAlignment(barrierId, channelIndex);}else{// ignore trailing barrier from an earlier checkpoint (obsolete now)return;}}elseif(barrierId > currentCheckpointId){// first barrier of a new checkpointbeginNewAlignment(barrierId, channelIndex);}else{// either the current checkpoint was canceled (numBarriers == 0) or// this barrier is from an old subsumed checkpointreturn;}//接收到所有channel的barrier,开始触发checkpoint// check if we have all barriers - since canceled checkpoints always have zero barriers// this can only happen on a non canceled checkpointif(numBarriersReceived + numClosedChannels == totalNumberOfInputChannels){// actually trigger checkpoint...//释放所有channel的阻塞状态releaseBlocksAndResetBarriers();//开始触发checkpointnotifyCheckpoint(receivedBarrier);}}//开始barrier对齐privatevoidbeginNewAlignment(long checkpointId,int channelIndex)throwsIOException{
   currentCheckpointId = checkpointId;onBarrier(channelIndex);

   startOfAlignmentTimestamp =System.nanoTime();...}//onBarrier()就是将其channel设置为阻塞状态privatevoidonBarrier(int channelIndex)throwsIOException{if(!blockedChannels[channelIndex]){
      blockedChannels[channelIndex]=true;

      numBarriersReceived++;...}else{thrownewIOException("Stream corrupt: Repeated barrier for same checkpoint on input "+ channelIndex);}}

触发checkpoint的逻辑在notifyCheckpoint()方法中,toNotifyOnCheckpoint就是StreamTask,最后还是调用了StreamTask.triggerCheckpointOnBarrier(),就跟上述描述的source task的checkpoint逻辑是一样的了。

privatevoidnotifyCheckpoint(CheckpointBarrier checkpointBarrier)throwsException{if(toNotifyOnCheckpoint !=null){CheckpointMetaData checkpointMetaData =newCheckpointMetaData(checkpointBarrier.getId(), checkpointBarrier.getTimestamp());long bytesBuffered = currentBuffered !=null? currentBuffered.size():0L;CheckpointMetrics checkpointMetrics =newCheckpointMetrics().setBytesBufferedInAlignment(bytesBuffered).setAlignmentDurationNanos(latestAlignmentDurationNanos);

      toNotifyOnCheckpoint.triggerCheckpointOnBarrier(
         checkpointMetaData,
         checkpointBarrier.getCheckpointOptions(),
         checkpointMetrics);}}//StreamTaskpublicvoidtriggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData,CheckpointOptions checkpointOptions,CheckpointMetrics checkpointMetrics)throwsException{try{performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);}catch(CancelTaskException e){...}}

2.2.2 AT_LEAST_ONCE下的checkpoint实现

//BarrierTracker类publicBufferOrEventgetNextNonBlocked()throwsException{while(true){Optional<BufferOrEvent> next = inputGate.getNextBufferOrEvent();if(!next.isPresent()){// buffer or input exhaustedreturnnull;}BufferOrEvent bufferOrEvent = next.get();if(bufferOrEvent.isBuffer()){return bufferOrEvent;}elseif(bufferOrEvent.getEvent().getClass()==CheckpointBarrier.class){processBarrier((CheckpointBarrier) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());}elseif(bufferOrEvent.getEvent().getClass()==CancelCheckpointMarker.class){processCheckpointAbortBarrier((CancelCheckpointMarker) bufferOrEvent.getEvent(), bufferOrEvent.getChannelIndex());}else{// some other eventreturn bufferOrEvent;}}}

核心实现还是在processBarrier()中,实现如下:
1、如果一个channel,也即上游只有一个channel,就立即触发checkpoint
2、在多个channel的情况下,也即多个上游任务,有如下几种情况
1)、如果是首次接收到barrier,只更新一下当前的checkpointID
2)、不是首次,但也不是最后一次接收到barrier,基本什么也不做,就累加一个计数器
3)、如果接收到所有channel的barrier,就开始触发checkpoint
因为BarrierTracker在等待所有channel的barrier到来的时候还会处理其他已经到来barrier的channel的数据,所以在进行状态数据快照的时候就会造成多了一部分数据的状态,快照并不具备一致性。在程序从checkpoint进行恢复的时候会重复处理这部分数据,所以才会出现AT_LEAST_ONCE

//BarrierTracker类privatevoidprocessBarrier(CheckpointBarrier receivedBarrier,int channelIndex)throwsException{finallong barrierId = receivedBarrier.getId();// fast path for single channel trackersif(totalNumberOfInputChannels ==1){notifyCheckpoint(barrierId, receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());return;}// general path for multiple input channelsif(LOG.isDebugEnabled()){LOG.debug("Received barrier for checkpoint {} from channel {}", barrierId, channelIndex);}// find the checkpoint barrier in the queue of pending barriersCheckpointBarrierCount cbc =null;int pos =0;for(CheckpointBarrierCount next : pendingCheckpoints){if(next.checkpointId == barrierId){
         cbc = next;break;}
      pos++;}if(cbc !=null){// add one to the count to that barrier and check for completionint numBarriersNew = cbc.incrementBarrierCount();//当接收到所有channel的barrier,开始触发checkpointif(numBarriersNew == totalNumberOfInputChannels){// checkpoint can be triggered (or is aborted and all barriers have been seen)// first, remove this checkpoint and all all prior pending// checkpoints (which are now subsumed)for(int i =0; i <= pos; i++){
            pendingCheckpoints.pollFirst();}// notify the listenerif(!cbc.isAborted()){if(LOG.isDebugEnabled()){LOG.debug("Received all barriers for checkpoint {}", barrierId);}notifyCheckpoint(receivedBarrier.getId(), receivedBarrier.getTimestamp(), receivedBarrier.getCheckpointOptions());}}}else{// first barrier for that checkpoint ID// add it only if it is newer than the latest checkpoint.// if it is not newer than the latest checkpoint ID, then there cannot be a// successful checkpoint for that ID anywaysif(barrierId > latestPendingCheckpointID){
         latestPendingCheckpointID = barrierId;
         pendingCheckpoints.addLast(newCheckpointBarrierCount(barrierId));// make sure we do not track too many checkpointsif(pendingCheckpoints.size()>MAX_CHECKPOINTS_TO_TRACK){
            pendingCheckpoints.pollFirst();}}}}

notifyCheckpoint()方法也是调用了StreamTask.triggerCheckpointOnBarrier(),跟BarrierBuffer是一样的。

//BarrierTracker类privatevoidnotifyCheckpoint(long checkpointId,long timestamp,CheckpointOptions checkpointOptions)throwsException{if(toNotifyOnCheckpoint !=null){CheckpointMetaData checkpointMetaData =newCheckpointMetaData(checkpointId, timestamp);CheckpointMetrics checkpointMetrics =newCheckpointMetrics().setBytesBufferedInAlignment(0L).setAlignmentDurationNanos(0L);

      toNotifyOnCheckpoint.triggerCheckpointOnBarrier(checkpointMetaData, checkpointOptions, checkpointMetrics);}}

上述的流程会一直执行到sink task,sink task执行完checkpoint,整个checkpoint就完成了。

2.3 2.4 checkPoint 状态数据存储

operatorState和keyedState的snapshot过程被封装到一个RunnableFuture,它并不会立即执行,之后会有一个异步的线程调用RunnableFuture.run()才会真正的执行snapshot。

//AbstractStreamOperator类publicfinalOperatorSnapshotFuturessnapshotState(long checkpointId,long timestamp,CheckpointOptions checkpointOptions,CheckpointStreamFactory factory)throwsException{KeyGroupRange keyGroupRange =null!= keyedStateBackend ?
         keyedStateBackend.getKeyGroupRange():KeyGroupRange.EMPTY_KEY_GROUP_RANGE;OperatorSnapshotFutures snapshotInProgress =newOperatorSnapshotFutures();try(StateSnapshotContextSynchronousImpl snapshotContext =newStateSnapshotContextSynchronousImpl(
         checkpointId,
         timestamp,
         factory,
         keyGroupRange,getContainingTask().getCancelables())){snapshotState(snapshotContext);//keyedStateRawFuture和operatorStateRawFuture都是DoneFuture,也就是一个已经完成的RunnableFuture
      snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
      snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());//operatorState和keyedState的snapshot过程被封装到一个RunnableFuture,并不会立即执行,//之后调用RunnableFuture.run()才会真正的执行snapshotif(null!= operatorStateBackend){
         snapshotInProgress.setOperatorStateManagedFuture(
            operatorStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));}if(null!= keyedStateBackend){
         snapshotInProgress.setKeyedStateManagedFuture(
            keyedStateBackend.snapshot(checkpointId, timestamp, factory, checkpointOptions));}}catch(Exception snapshotException){...}return snapshotInProgress;}
//HeapKeyedStateBackend类publicRunnableFuture<SnapshotResult<KeyedStateHandle>>snapshot(finallong checkpointId,finallong timestamp,@NonnullfinalCheckpointStreamFactory streamFactory,@NonnullCheckpointOptions checkpointOptions)throwsIOException{long startTime =System.currentTimeMillis();finalRunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunner =
      snapshotStrategy.snapshot(checkpointId, timestamp, streamFactory, checkpointOptions);

   snapshotStrategy.logSyncCompleted(streamFactory, startTime);return snapshotRunner;}

2.4.1 KeyedState快照过程

首先我们来看KeyedState的快照,调用的是snapshotStrategy.snapshot(),checkpoint的核心源码逻辑都在里面,这个snapshotStrategy就是HeapSnapshotStrategy,源码如下,主要实现的逻辑大致如下:
1、首先对状态数据进行快照拷贝,这里拷贝的仅仅是状态数据的引用,而不是对实例对象的拷贝
2、创建checkpoint的输出流CheckpointStateOutputStream,这个就是checkpoint的数据目录,但是并非是最终的数据目录,而是像

     C 
    
   
     H 
    
   
     E 
    
   
     C 
    
   
     K 
    
   
     P 
    
   
     O 
    
   
     I 
    
   
     N 
    
    
    
      T 
     
    
      D 
     
    
   
     I 
    
   
     R 
    
   
     / 
    
   
  
    CHECKPOINT_DIR/ 
   
  
CHECKPOINTD​IR/UID/chk-n这种的

3、上述2个步骤都是同步进行的,下面就是生成一个AsyncSnapshotCallable,callInternal()方法就是对checkpoint的持久化操作,这是可以异步进行的。大致步骤又分为:
1)、获取CheckpointStateOutputStream,写入状态数据的元数据信息,包括状态的名字、
序列化类型等
2)、对状态数据按照keyGroupId进行分组,依次对每个keyGoupId的状态数据进行写入
3)、封装状态数据checkpoint的元数据信息,StreamStateHandle里包含状态数据存储的路径和
大小,KeyGroupRangeOffsets里包含的是每个keyGoupId在文件中存储的offset位置

/HeapSnapshotStrategy类
publicRunnableFuture<SnapshotResult<KeyedStateHandle>>snapshot(long checkpointId,long timestamp,@NonnullCheckpointStreamFactory primaryStreamFactory,@NonnullCheckpointOptions checkpointOptions)throwsIOException{if(!hasRegisteredState()){returnDoneFuture.of(SnapshotResult.empty());}int numStates = registeredKVStates.size()+ registeredPQStates.size();Preconditions.checkState(numStates <=Short.MAX_VALUE,"Too many states: "+ numStates +". Currently at most "+Short.MAX_VALUE+" states are supported");finalList<StateMetaInfoSnapshot> metaInfoSnapshots =newArrayList<>(numStates);finalMap<StateUID,Integer> stateNamesToId =newHashMap<>(numStates);finalMap<StateUID,StateSnapshot> cowStateStableSnapshots =newHashMap<>(numStates);//对状态数据进行快照拷贝到cowStateStableSnapshots processSnapshotMetaInfoForAllStates(
      metaInfoSnapshots,
      cowStateStableSnapshots,
      stateNamesToId,
      registeredKVStates,StateMetaInfoSnapshot.BackendStateType.KEY_VALUE);processSnapshotMetaInfoForAllStates(
      metaInfoSnapshots,
      cowStateStableSnapshots,
      stateNamesToId,
      registeredPQStates,//registeredPQStates对我们keyed状态数据没有影响StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE);finalKeyedBackendSerializationProxy<K> serializationProxy =newKeyedBackendSerializationProxy<>(// TODO: this code assumes that writing a serializer is threadsafe, we should support to// get a serialized form already at state registration time in the futuregetKeySerializer(),
         metaInfoSnapshots,!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, keyGroupCompressionDecorator));//创建checkpoint的输出流CheckpointStateOutputStream      finalSupplierWithException<CheckpointStreamWithResultProvider,Exception> checkpointStreamSupplier =

      localRecoveryConfig.isLocalRecoveryEnabled()?()->CheckpointStreamWithResultProvider.createDuplicatingStream(
            checkpointId,CheckpointedStateScope.EXCLUSIVE,
            primaryStreamFactory,
            localRecoveryConfig.getLocalStateDirectoryProvider()):()->CheckpointStreamWithResultProvider.createSimpleStream(CheckpointedStateScope.EXCLUSIVE,
            primaryStreamFactory);//--------------------------------------------------- this becomes the end of sync partfinalAsyncSnapshotCallable<SnapshotResult<KeyedStateHandle>> asyncSnapshotCallable =newAsyncSnapshotCallable<SnapshotResult<KeyedStateHandle>>(){@OverrideprotectedSnapshotResult<KeyedStateHandle>callInternal()throwsException{finalCheckpointStreamWithResultProvider streamWithResultProvider =
               checkpointStreamSupplier.get();

            snapshotCloseableRegistry.registerCloseable(streamWithResultProvider);finalCheckpointStreamFactory.CheckpointStateOutputStream localStream =
               streamWithResultProvider.getCheckpointOutputStream();finalDataOutputViewStreamWrapper outView =newDataOutputViewStreamWrapper(localStream);//写入状态数据的元数据信息,包括状态的名字、序列化类型等
            serializationProxy.write(outView);finallong[] keyGroupRangeOffsets =newlong[keyGroupRange.getNumberOfKeyGroups()];//对状态数据按照keyGroupId进行分组,依次对每个keyGoupId的状态数据进行写入for(int keyGroupPos =0; keyGroupPos < keyGroupRange.getNumberOfKeyGroups();++keyGroupPos){int keyGroupId = keyGroupRange.getKeyGroupId(keyGroupPos);//记录每个keyGoupId的状态数据的存储位置
               keyGroupRangeOffsets[keyGroupPos]= localStream.getPos();
               outView.writeInt(keyGroupId);for(Map.Entry<StateUID,StateSnapshot> stateSnapshot :
                  cowStateStableSnapshots.entrySet()){StateSnapshot.StateKeyGroupWriter partitionedSnapshot =//分组的实现
                     stateSnapshot.getValue().getKeyGroupWriter();try(OutputStream kgCompressionOut =
                        keyGroupCompressionDecorator.decorateWithCompression(localStream)){DataOutputViewStreamWrapper kgCompressionView =newDataOutputViewStreamWrapper(kgCompressionOut);
                     kgCompressionView.writeShort(stateNamesToId.get(stateSnapshot.getKey()));//写入每个keyGoupId的状态数据
                     partitionedSnapshot.writeStateInKeyGroup(kgCompressionView, keyGroupId);}// this will just close the outer compression stream}}//封装状态数据的checkpoint元数据信息if(snapshotCloseableRegistry.unregisterCloseable(streamWithResultProvider)){KeyGroupRangeOffsets kgOffs =newKeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);SnapshotResult<StreamStateHandle> result =
                  streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();returnCheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(result, kgOffs);}else{thrownewIOException("Stream already unregistered.");}}@OverrideprotectedvoidcleanupProvidedResources(){for(StateSnapshot tableSnapshot : cowStateStableSnapshots.values()){
               tableSnapshot.release();}}@OverrideprotectedvoidlogAsyncSnapshotComplete(long startTime){if(snapshotStrategySynchronicityTrait.isAsynchronous()){logAsyncCompleted(primaryStreamFactory, startTime);}}};finalFutureTask<SnapshotResult<KeyedStateHandle>> task =
      asyncSnapshotCallable.toAsyncSnapshotFutureTask(cancelStreamRegistry);finalizeSnapshotBeforeReturnHook(task);return task;}

状态数据的快照拷贝
拷贝的方法调用起始在processSnapshotMetaInfoForAllStates()方法,registeredStates就是我们注册的一些状态数据表,是一个Map,key就是状态数据的名字,比如下面的代码就注册了一个名字为sumValueState的状态表,value就是状态表,默认是一个CopyOnWriteStateTable
对每个状态表都会创建一个快照放到cowStateStableSnapshots中,快照的核心就是将CopyOnWriteStateTable中的状态数据拷贝到CopyOnWriteStateTableSnapshot中的一个数组snapshotData中,注意这里拷贝的仅仅是数据的引用而已,并不是对象实例,这个过程很快,也不会占用过多的内存。CopyOnWriteStateTable使用了写入时复制技术,状态数据对象本身的值并不会发生改变,而是用一个新对象来替换原来的老对象,这样即使快照和数据处理同时进行,快照还是指向的老对象,引用的对象本身也并不会发生变化。

privatevoidprocessSnapshotMetaInfoForAllStates(List<StateMetaInfoSnapshot> metaInfoSnapshots,Map<StateUID,StateSnapshot> cowStateStableSnapshots,Map<StateUID,Integer> stateNamesToId,Map<String,?extendsStateSnapshotRestore> registeredStates,StateMetaInfoSnapshot.BackendStateType stateType){for(Map.Entry<String,?extendsStateSnapshotRestore> kvState : registeredStates.entrySet()){finalStateUID stateUid =StateUID.of(kvState.getKey(), stateType);
      stateNamesToId.put(stateUid, stateNamesToId.size());StateSnapshotRestore state = kvState.getValue();if(null!= state){finalStateSnapshot stateSnapshot = state.stateSnapshot();
         metaInfoSnapshots.add(stateSnapshot.getMetaInfoSnapshot());
         cowStateStableSnapshots.put(stateUid, stateSnapshot);}}}
//CopyOnWriteStateTable类publicCopyOnWriteStateTableSnapshot<K,N,S>stateSnapshot(){returnnewCopyOnWriteStateTableSnapshot<>(this);}//CopyOnWriteStateTableSnapshot类CopyOnWriteStateTableSnapshot(CopyOnWriteStateTable<K,N,S> owningStateTable){super(owningStateTable);//对CopyOnWriteStateTable表中的状态数据进行一个拷贝this.snapshotData = owningStateTable.snapshotTableArrays();this.snapshotVersion = owningStateTable.getStateTableVersion();this.numberOfEntriesInSnapshotData = owningStateTable.size();// We create duplicates of the serializers for the async snapshot, because TypeSerializer// might be stateful and shared with the event processing thread.this.localKeySerializer = owningStateTable.keyContext.getKeySerializer().duplicate();this.localNamespaceSerializer = owningStateTable.metaInfo.getNamespaceSerializer().duplicate();this.localStateSerializer = owningStateTable.metaInfo.getStateSerializer().duplicate();this.partitionedStateTableSnapshot =null;this.stateSnapshotTransformer = owningStateTable.metaInfo.getStateSnapshotTransformFactory().createForDeserializedState().orElse(null);}//CopyOnWriteStateTable类StateTableEntry<K,N,S>[]snapshotTableArrays(){synchronized(snapshotVersions){// increase the table version for copy-on-write and register the snapshotif(++stateTableVersion <0){// this is just a safety net against overflows, but should never happen in practice (i.e., only after 2^31 snapshots)thrownewIllegalStateException("Version count overflow in CopyOnWriteStateTable. Enforcing restart.");}

      highestRequiredSnapshotVersion = stateTableVersion;
      snapshotVersions.add(highestRequiredSnapshotVersion);}StateTableEntry<K,N,S>[] table = primaryTable;finalint totalTableIndexSize = rehashIndex + table.length;finalint copiedArraySize =Math.max(totalTableIndexSize,size());finalStateTableEntry<K,N,S>[] copy =newStateTableEntry[copiedArraySize];if(isRehashing()){// consider both tables for the snapshot, the rehash index tells us which part of the two tables we needfinalint localRehashIndex = rehashIndex;finalint localCopyLength = table.length - localRehashIndex;// for the primary table, take every index >= rhIdx.System.arraycopy(table, localRehashIndex, copy,0, localCopyLength);// for the new table, we are sure that two regions contain all the entries:// [0, rhIdx[ AND [table.length / 2, table.length / 2 + rhIdx[
      table = incrementalRehashTable;System.arraycopy(table,0, copy, localCopyLength, localRehashIndex);System.arraycopy(table, table.length >>>1, copy, localCopyLength + localRehashIndex, localRehashIndex);}else{// we only need to copy the primary tableSystem.arraycopy(table,0, copy,0, table.length);}return copy;}

2.4.2 创建CheckpointStateOutputStream

一般在正式环境我们设置的checkpoint存储都是基于文件系统的,即FsStateBackend, 而不是内存MemoryStateBackend,在FsStateBackend模式下,最终生成的CheckpointStateOutputStream是FsCheckpointStateOutputStream

staticCheckpointStreamWithResultProvidercreateSimpleStream(@NonnullCheckpointedStateScope checkpointedStateScope,@NonnullCheckpointStreamFactory primaryStreamFactory)throwsIOException{CheckpointStreamFactory.CheckpointStateOutputStream primaryOut =
      primaryStreamFactory.createCheckpointStateOutputStream(checkpointedStateScope);returnnewCheckpointStreamWithResultProvider.PrimaryStreamOnly(primaryOut);}publicFsCheckpointStateOutputStreamcreateCheckpointStateOutputStream(CheckpointedStateScope scope)throwsIOException{Path target = scope ==CheckpointedStateScope.EXCLUSIVE? checkpointDirectory : sharedStateDirectory;int bufferSize =Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);// 创建 FsCheckpointStateOutputStreamreturnnewFsCheckpointStateOutputStream(target, filesystem, bufferSize, fileStateThreshold);}

构建FsCheckpointStateOutputStream有几个参数:
basePath:checkpoint基础路径,形式是

      c 
     
    
      h 
     
    
      e 
     
    
      c 
     
    
      k 
     
    
      p 
     
    
      o 
     
    
      i 
     
    
      n 
     
     
     
       t 
      
     
       d 
      
     
    
      i 
     
    
      r 
     
    
   
     / 
    
   
  
    {checkpoint_dir}/ 
   
  
checkpointd​ir/{uid}/chk-${checkpointid}

fs:文件系统FileSystem,这里是flink自己封装的,对于hdfs有HadoopFileSystem,本地文件有LocalFileSystem
bufferSize:写OutputStream时会先将数据写入到buffer,bufferSize控制buffer的大小
fileStateThreshold:文件状态的阈值,超过这个大小就会将状态数据写入到文件,否则就直接封装到StreamHandle里面,这样做是为了减少小文件,默认值为1024,即1kb

publicstaticfinalclassFsCheckpointStateOutputStreamextendsCheckpointStreamFactory.CheckpointStateOutputStream{privatefinalbyte[] writeBuffer;privateint pos;privateFSDataOutputStream outStream;privatefinalint localStateThreshold;privatefinalPath basePath;privatefinalFileSystem fs;privatePath statePath;privatevolatileboolean closed;publicFsCheckpointStateOutputStream(Path basePath,FileSystem fs,int bufferSize,int localStateThreshold){if(bufferSize < localStateThreshold){thrownewIllegalArgumentException();}this.basePath = basePath;this.fs = fs;this.writeBuffer =newbyte[bufferSize];this.localStateThreshold = localStateThreshold;}@Overridepublicvoidwrite(int b)throwsIOException{if(pos >= writeBuffer.length){flush();}
    writeBuffer[pos++]=(byte) b;}...

此外,FsCheckpointStateOutputStream还有一些其他的成员:
writeBuffer:一个缓冲数组byte[],写OutputStream时会先将数据写入到buffer缓冲区,减少磁盘的io交互
pos:writeBuffer中数据写入的位置
outStream:checkpoint最终文件路径封装的FSDataOutputStream,通过这个OutputStream写入状态数据到最终文件里面
statePath:状态数据的文件路径,路径形式为

      c 
     
    
      h 
     
    
      e 
     
    
      c 
     
    
      k 
     
    
      p 
     
    
      o 
     
    
      i 
     
    
      n 
     
     
     
       t 
      
     
       d 
      
     
    
      i 
     
    
      r 
     
    
   
     / 
    
   
  
    {checkpoint_dir}/ 
   
  
checkpointd​ir/{uid}/chk- 
 
  
   
    
    
      c 
     
    
      h 
     
    
      e 
     
    
      c 
     
    
      k 
     
    
      p 
     
    
      o 
     
    
      i 
     
    
      n 
     
    
      t 
     
    
      i 
     
    
      d 
     
    
   
     / 
    
   
  
    {checkpointid}/ 
   
  
checkpointid/{uuid}

2.4.3 写入状态数据元数据信息

调用serializationProxy.write(outView)写入状态数据的元数据信息,包括状态数据的名称,序列化类型等等

//KeyedBackendSerializationProxy类publicvoidwrite(DataOutputView out)throwsIOException{super.write(out);// write the compression format used to write each key-group
   out.writeBoolean(usingKeyGroupCompression);TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(out, keySerializerSnapshot, keySerializer);// write individual registered keyed state metainfos
   out.writeShort(stateMetaInfoSnapshots.size());for(StateMetaInfoSnapshot metaInfoSnapshot : stateMetaInfoSnapshots){StateMetaInfoSnapshotReadersWriters.getWriter().writeStateMetaInfoSnapshot(metaInfoSnapshot, out);}}

2.4.4 状态数据分组

状态数据分组的调用在CopyOnWriteStateTableSnapshot.getKeyGroupWriter()方法,是将状态数据按照keyGoupId进行分组,算法比较复杂一点。

//CopyOnWriteStateTableSnapshot类publicStateKeyGroupWritergetKeyGroupWriter(){if(partitionedStateTableSnapshot ==null){finalInternalKeyContext<K> keyContext = owningStateTable.keyContext;finalint numberOfKeyGroups = keyContext.getNumberOfKeyGroups();finalKeyGroupRange keyGroupRange = keyContext.getKeyGroupRange();//状态数据的持久化函数ElementWriterFunction<CopyOnWriteStateTable.StateTableEntry<K,N,S>> elementWriterFunction =(element, dov)->{
            localNamespaceSerializer.serialize(element.namespace, dov);
            localKeySerializer.serialize(element.key, dov);
            localStateSerializer.serialize(element.state, dov);};//默认是StateTableKeyGroupPartitionerStateTableKeyGroupPartitioner<K,N,S> stateTableKeyGroupPartitioner = stateSnapshotTransformer !=null?newTransformingStateTableKeyGroupPartitioner<>(
            snapshotData,
            numberOfEntriesInSnapshotData,
            keyGroupRange,
            numberOfKeyGroups,
            elementWriterFunction,
            stateSnapshotTransformer):newStateTableKeyGroupPartitioner<>(
            snapshotData,
            numberOfEntriesInSnapshotData,
            keyGroupRange,
            numberOfKeyGroups,
            elementWriterFunction);
      partitionedStateTableSnapshot = stateTableKeyGroupPartitioner.partitionByKeyGroup();}return partitionedStateTableSnapshot;}

首先我们看一下StateTableKeyGroupPartitioner类的结构,StateTableKeyGroupPartitioner继承了KeyGroupPartitioner,没有其他新的成员:

publicclassKeyGroupPartitioner<T>{/**
    * The input data for the partitioning. All elements to consider must be densely in the index interval
    * [0, {@link #numberOfElements}[, without null values.
    */@Nonnull//原始的快照状态数据protectedfinalT[] partitioningSource;/**
    * The output array for the partitioning. The size must be {@link #numberOfElements} (or bigger).
    */@Nonnull//最终经过分组后的状态数据protectedfinalT[] partitioningDestination;/** Total number of input elements. */@Nonnegative//状态数据总量protectedfinalint numberOfElements;/** The total number of key-groups in the job. */@Nonnegative//该operator总共的keyGoupId数量protectedfinalint totalKeyGroups;/** The key-group range for the input data, covered in this partitioning. */@Nonnull//该operator的KeyGroupRange,有起始和结束的keyGoupIdprotectedfinalKeyGroupRange keyGroupRange;/**
    * This bookkeeping array is used to count the elements in each key-group. In a second step, it is transformed into
    * a histogram by accumulation.
    */@Nonnull//直方图,用于标记每个keyGoupId的数据在最终状态数据数组中的位置protectedfinalint[] counterHistogram;/**
    * This is a helper array that caches the key-group for each element, so we do not have to compute them twice.
    */@Nonnull//用来标记partitioningSource中每个元素对应的keyGoupId在counterHistogram中的位置protectedfinalint[] elementKeyGroups;/** Cached value of keyGroupRange#firstKeyGroup. */@Nonnegative//每个operator的起始keyGoupId,即keyGroupRange#firstKeyGroupprotectedfinalint firstKeyGroup;/** Function to extract the key from a given element. */@NonnullprotectedfinalKeyExtractorFunction<T> keyExtractorFunction;/** Function to write an element to a {@link DataOutputView}. */@NonnullprotectedfinalElementWriterFunction<T> elementWriterFunction;/** Cached result. */@NullableprotectedStateSnapshot.StateKeyGroupWriter computedResult;

状态数据的分组算法便是依赖于成员中各个数据结构。

我们再来看KeyGroupPartitioner.partitionByKeyGroup()方法,这个方法分为3步:
1、记录状态数据的keyGoupId
2、基于keyGoupId构建直方图,用于数据的分组
3、执行分组

//KeyGroupPartitioner类publicStateSnapshot.StateKeyGroupWriterpartitionByKeyGroup(){if(computedResult ==null){reportAllElementKeyGroups();int outputNumberOfElements =buildHistogramByAccumulatingCounts();executePartitioning(outputNumberOfElements);}return computedResult;}

记录状态数据的keyGoupId
这个操作的效果就是将原始的快照状态数据扁平化,添加到partitioningSource[]中,在elementKeyGroups[]中记录partitioningSource[]每个元素的keyGoupId在counterHistogram[]中的位置。counterHistogram[]中统计出每个keyGoupId有多少个元素

//StateTableKeyGroupPartitioner类protectedvoidreportAllElementKeyGroups(){// In this step we i) 'flatten' the linked list of entries to a second array and ii) report key-groups.int flattenIndex =0;for(CopyOnWriteStateTable.StateTableEntry<K,N,S> entry : partitioningDestination){while(null!= entry){
         flattenIndex =tryAddToSource(flattenIndex, entry);
         entry = entry.next;}}}/** Tries to append next entry to {@code partitioningSource} array snapshot and returns next index.*/inttryAddToSource(int currentIndex,CopyOnWriteStateTable.StateTableEntry<K,N,S> entry){finalint keyGroup =KeyGroupRangeAssignment.assignToKeyGroup(entry.key, totalKeyGroups);reportKeyGroupOfElementAtIndex(currentIndex, keyGroup);
   partitioningSource[currentIndex]= entry;return currentIndex +1;}//KeyGroupPartitioner类protectedvoidreportKeyGroupOfElementAtIndex(int index,int keyGroup){finalint keyGroupIndex = keyGroup - firstKeyGroup;
   elementKeyGroups[index]= keyGroupIndex;++counterHistogram[keyGroupIndex];}

构建直方图
构建counterHistogram[]直方图,构建的结果形如[0,0,1,1,1,3,3,6,8],counterHistogram[]的下标对应的是keyGoupId,数据对应的从firstGroupId开始到该keyGoupId一共有多个元素。

//KeyGroupPartitioner类privateintbuildHistogramByAccumulatingCounts(){int sum =0;for(int i =0; i < counterHistogram.length;++i){int currentSlotValue = counterHistogram[i];
      counterHistogram[i]= sum;
      sum += currentSlotValue;}return sum;}

进行数据分组
依赖于上述构建的直方图counterHistogram[]和elementKeyGroups[]进行分组计算,将数据添加到partitioningDestination[]中,最终partitioningDestination[]的数据就是分组好的数据,数据按照keyGoupId排序好,counterHistogram[]就记录着每个keyGoupId在partitioningDestination[]中存放的offset位置,当做一个索引。

//KeyGroupPartitioner类privatevoidexecutePartitioning(int outputNumberOfElements){// We repartition the entries by their pre-computed key-groups, using the histogram values as write indexesfor(int inIdx =0; inIdx < outputNumberOfElements;++inIdx){int effectiveKgIdx = elementKeyGroups[inIdx];int outIdx = counterHistogram[effectiveKgIdx]++;
      partitioningDestination[outIdx]= partitioningSource[inIdx];}this.computedResult =newPartitioningResult<>(
      elementWriterFunction,
      firstKeyGroup,
      counterHistogram,
      partitioningDestination);}

我们可以看一下PartitioningResult的结构:
partitionedElements:按照keyGoupId分组好的状态数据,就是上述中的partitioningDestination
keyGroupOffsets:记录所有keyGoupId在partitionedElements[]中的的索引位置
firstKeyGroup:该operator的第一个keyGroupId,因为每个operator的起始keyGoupId并不都是0,例如有两个operator实例,共128个keyGoupId,那么第一个operator的起始keyGoupId是0,第二个operator的起始keyGoupId就是64,那么keyGoupId 65就对应了keyGroupOffsets[]的下标1

privatestaticclassPartitioningResult<T>implementsStateSnapshot.StateKeyGroupWriter{/**
    * Function to write one element to a {@link DataOutputView}.
    */@NonnullprivatefinalElementWriterFunction<T> elementWriterFunction;/**
    * The exclusive-end-offsets for all key-groups of the covered range for the partitioning. Exclusive-end-offset
    * for key-group n is under keyGroupOffsets[n - firstKeyGroup].
    */@Nonnullprivatefinalint[] keyGroupOffsets;/**
    * Array with elements that are partitioned w.r.t. the covered key-group range. The start offset for each
    * key-group is in {@link #keyGroupOffsets}.
    */@NonnullprivatefinalT[] partitionedElements;/**
    * The first key-group of the range covered in the partitioning.
    */@Nonnegativeprivatefinalint firstKeyGroup;PartitioningResult(@NonnullElementWriterFunction<T> elementWriterFunction,@Nonnegativeint firstKeyGroup,@Nonnullint[] keyGroupEndOffsets,@NonnullT[] partitionedElements){this.elementWriterFunction = elementWriterFunction;this.firstKeyGroup = firstKeyGroup;this.keyGroupOffsets = keyGroupEndOffsets;this.partitionedElements = partitionedElements;}

2.4.5 写入状态数据

状态数据的写入在partitionedSnapshot.writeStateInKeyGroup(kgCompressionView, keyGroupId),对每个keyGoupId的状态数据依次进行写操作。
可以看到就是通过keyGroupOffsets查找传进来的keyGoupId在partitionedElements[]中的元素位置,再把元素写到OutputStream。

//PartitioningResult类publicvoidwriteStateInKeyGroup(@NonnullDataOutputView dov,int keyGroupId)throwsIOException{int startOffset =getKeyGroupStartOffsetInclusive(keyGroupId);int endOffset =getKeyGroupEndOffsetExclusive(keyGroupId);// write number of mappings in key-group
   dov.writeInt(endOffset - startOffset);// write mappingsfor(int i = startOffset; i < endOffset;++i){
      elementWriterFunction.writeElement(partitionedElements[i], dov);}}privateintgetKeyGroupStartOffsetInclusive(int keyGroup){int idx = keyGroup - firstKeyGroup -1;return idx <0?0: keyGroupOffsets[idx];}@NonnegativeprivateintgetKeyGroupEndOffsetExclusive(int keyGroup){return keyGroupOffsets[keyGroup - firstKeyGroup];}

这里的elementWriterFunction就是如下,持久化了每个状态数据的namespace、key、state

(element, dov)->{
            localNamespaceSerializer.serialize(element.namespace, dov);
            localKeySerializer.serialize(element.key, dov);
            localStateSerializer.serialize(element.state, dov);};

2.4.6 封装StreamStateHandle

将所有的keyGoupId状态数据都写完了,接下来就是封装checkpoint的元数据信息了。
keyGroupRangeOffsets记录了每个keyGoupId的状态数据在存储介质中的存储位置。

//new AsyncSnapshotCallable#callInternal()if(snapshotCloseableRegistry.unregisterCloseable(streamWithResultProvider)){KeyGroupRangeOffsets kgOffs =newKeyGroupRangeOffsets(keyGroupRange, keyGroupRangeOffsets);SnapshotResult<StreamStateHandle> result =
      streamWithResultProvider.closeAndFinalizeCheckpointStreamResult();returnCheckpointStreamWithResultProvider.toKeyedStateHandleSnapshotResult(result, kgOffs);}else{thrownewIOException("Stream already unregistered.");}
//PrimaryStreamOnly类 implements CheckpointStreamWithResultProviderpublicSnapshotResult<StreamStateHandle>closeAndFinalizeCheckpointStreamResult()throwsIOException{returnSnapshotResult.of(outputStream.closeAndGetHandle());}

核心实现在FsCheckpointStateOutputStream.closeAndGetHandle()方法
1、如果总共写入的状态数据大小<阈值(默认1k),那就直接直接封装成ByteStreamStateHandle,状态数据也存放在StreamStateHandle中,以byte[]数组形式存在。
2、如果状态数据超过阈值,就写入到文件里面,调用flush(),不过这个flush并不是唯一的一次flush,在之前写状态数据的过程中,当数据超过了buffer的大小,也会调用flush()刷到磁盘
3、首次调用flush()方法会将完整的checkpoint文件路径创建出来,形如

      c 
     
    
      h 
     
    
      e 
     
    
      c 
     
    
      k 
     
    
      p 
     
    
      o 
     
    
      i 
     
    
      n 
     
     
     
       t 
      
     
       d 
      
     
    
      i 
     
    
      r 
     
    
   
     / 
    
   
  
    {checkpoint_dir}/ 
   
  
checkpointd​ir/{uid}/chk- 
 
  
   
    
    
      c 
     
    
      h 
     
    
      e 
     
    
      c 
     
    
      k 
     
    
      p 
     
    
      o 
     
    
      i 
     
    
      n 
     
    
      t 
     
    
      i 
     
    
      d 
     
    
   
     / 
    
   
  
    {checkpointid}/ 
   
  
checkpointid/{uuid},然后将buffer的数据写到文件中。

4、将checkpoint到磁盘的状态数据信息封装成FileStateHandle,包含了状态数据的文件和大小信息

//FsCheckpointStateOutputStream类publicStreamStateHandlecloseAndGetHandle()throwsIOException{// check if there was nothing ever writtenif(outStream ==null&& pos ==0){returnnull;}synchronized(this){if(!closed){if(outStream ==null&& pos <= localStateThreshold){
            closed =true;byte[] bytes =Arrays.copyOf(writeBuffer, pos);
            pos = writeBuffer.length;returnnewByteStreamStateHandle(createStatePath().toString(), bytes);}else{try{flush();

               pos = writeBuffer.length;long size =-1L;// make a best effort attempt to figure out the sizetry{
                  size = outStream.getPos();}catch(Exception ignored){}

               outStream.close();returnnewFileStateHandle(statePath, size);}catch(Exception exception){...}finally{
               closed =true;}}}else{thrownewIOException("Stream has already been closed and discarded.");}}}publicvoidflush()throwsIOException{if(!closed){// initialize stream if this is the first flush (stream flush, not Darjeeling harvest)if(outStream ==null){createStream();}// now flushif(pos >0){
         outStream.write(writeBuffer,0, pos);
         pos =0;}}else{thrownewIOException("closed");}}privatevoidcreateStream()throwsIOException{Exception latestException =null;for(int attempt =0; attempt <10; attempt++){try{OutputStreamAndPath streamAndPath =EntropyInjector.createEntropyAware(
               fs,createStatePath(),WriteMode.NO_OVERWRITE);this.outStream = streamAndPath.stream();this.statePath = streamAndPath.path();return;}catch(Exception e){
         latestException = e;}}thrownewIOException("Could not open output stream for state backend", latestException);}privatePathcreateStatePath(){returnnewPath(basePath,UUID.randomUUID().toString());}

返回的StreamStateHandle会被进一步封装成KeyedStateHandle,因为KeyedStateHandle中只记录了状态数据的文件路径,还需要keyGroupRangeOffsets记录每个keyGoupId的数据在文件中的存储位置。KeyedStateHandle就是既包含了StreamStateHandle,也包含了keyGroupRangeOffsets

staticSnapshotResult<KeyedStateHandle>toKeyedStateHandleSnapshotResult(@NonnullSnapshotResult<StreamStateHandle> snapshotResult,@NonnullKeyGroupRangeOffsets keyGroupRangeOffsets){StreamStateHandle jobManagerOwnedSnapshot = snapshotResult.getJobManagerOwnedSnapshot();if(jobManagerOwnedSnapshot !=null){KeyedStateHandle jmKeyedState =newKeyGroupsStateHandle(keyGroupRangeOffsets, jobManagerOwnedSnapshot);StreamStateHandle taskLocalSnapshot = snapshotResult.getTaskLocalSnapshot();if(taskLocalSnapshot !=null){KeyedStateHandle localKeyedState =newKeyGroupsStateHandle(keyGroupRangeOffsets, taskLocalSnapshot);returnSnapshotResult.withLocalState(jmKeyedState, localKeyedState);}else{returnSnapshotResult.of(jmKeyedState);}}else{returnSnapshotResult.empty();}}

到此,KeyedState的checkpoint过程就完成了。

2.4.7 OperatorState快照过程

OperatorState的快照过程和KeyedState的快照过程大体类似,但是比KeyedState要简单一些,没有状态数据的分组操作,OperatorState只有一种PartitionableListState类型,大致实现如下:
1、进行OperatorState和BroadcastState的状态数据拷贝,这个过程是同步的。BroadcastState我们这先暂时不进行讨论,而且它的快照过程和OperatorState也基本上是一模一样的,所以我们只看OperatorState就行了
2、创建AsyncSnapshotCallable,checkpoint行为在callInternal()方法中,实现如下:
1)、创建CheckpointStateOutputStream,用于写状态数据
2)、写状态数据的元数据信息,这一步和KeyedState一样,包括状态数据的名称,序列化信息等
3)、将状态数据写入到CheckpointStateOutputStream中
4)、返回写入的元数据信息,封装成OperatorStateHandle
上述2的步骤在默认情况下是异步执行的,也可以通过参数设置为同步执行,但是这会影响到正常数据的处理,KeyedState也可以通过参数设置来进行同步的快照处理

//DefaultOperatorStateBackendSnapshotStrategy类@OverridepublicRunnableFuture<SnapshotResult<OperatorStateHandle>>snapshot(finallong checkpointId,finallong timestamp,@NonnullfinalCheckpointStreamFactory streamFactory,@NonnullfinalCheckpointOptions checkpointOptions)throwsIOException{if(registeredOperatorStates.isEmpty()&& registeredBroadcastStates.isEmpty()){returnDoneFuture.of(SnapshotResult.empty());}finalMap<String,PartitionableListState<?>> registeredOperatorStatesDeepCopies =newHashMap<>(registeredOperatorStates.size());finalMap<String,BackendWritableBroadcastState<?,?>> registeredBroadcastStatesDeepCopies =newHashMap<>(registeredBroadcastStates.size());ClassLoader snapshotClassLoader =Thread.currentThread().getContextClassLoader();Thread.currentThread().setContextClassLoader(userClassLoader);try{// eagerly create deep copies of the list and the broadcast states (if any)// in the synchronous phase, so that we can use them in the async writing.//进行状态数据的拷贝if(!registeredOperatorStates.isEmpty()){for(Map.Entry<String,PartitionableListState<?>> entry : registeredOperatorStates.entrySet()){PartitionableListState<?> listState = entry.getValue();if(null!= listState){
               listState = listState.deepCopy();}
            registeredOperatorStatesDeepCopies.put(entry.getKey(), listState);}}if(!registeredBroadcastStates.isEmpty()){for(Map.Entry<String,BackendWritableBroadcastState<?,?>> entry : registeredBroadcastStates.entrySet()){BackendWritableBroadcastState<?,?> broadcastState = entry.getValue();if(null!= broadcastState){
               broadcastState = broadcastState.deepCopy();}
            registeredBroadcastStatesDeepCopies.put(entry.getKey(), broadcastState);}}}finally{Thread.currentThread().setContextClassLoader(snapshotClassLoader);}AsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>> snapshotCallable =newAsyncSnapshotCallable<SnapshotResult<OperatorStateHandle>>(){@OverrideprotectedSnapshotResult<OperatorStateHandle>callInternal()throwsException{//创建CheckpointStateOutputStream CheckpointStreamFactory.CheckpointStateOutputStream localOut =
               streamFactory.createCheckpointStateOutputStream(CheckpointedStateScope.EXCLUSIVE);
            snapshotCloseableRegistry.registerCloseable(localOut);// get the registered operator state infos ...List<StateMetaInfoSnapshot> operatorMetaInfoSnapshots =newArrayList<>(registeredOperatorStatesDeepCopies.size());for(Map.Entry<String,PartitionableListState<?>> entry :
               registeredOperatorStatesDeepCopies.entrySet()){
               operatorMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());}// ... get the registered broadcast operator state infos ...List<StateMetaInfoSnapshot> broadcastMetaInfoSnapshots =newArrayList<>(registeredBroadcastStatesDeepCopies.size());for(Map.Entry<String,BackendWritableBroadcastState<?,?>> entry :
               registeredBroadcastStatesDeepCopies.entrySet()){
               broadcastMetaInfoSnapshots.add(entry.getValue().getStateMetaInfo().snapshot());}// ... write them all in the checkpoint stream ...DataOutputView dov =newDataOutputViewStreamWrapper(localOut);OperatorBackendSerializationProxy backendSerializationProxy =newOperatorBackendSerializationProxy(operatorMetaInfoSnapshots, broadcastMetaInfoSnapshots);//状态数据元数据信息写入到checkpoint
            backendSerializationProxy.write(dov);// ... and then go for the states ...// we put BOTH normal and broadcast state metadata hereint initialMapCapacity =
               registeredOperatorStatesDeepCopies.size()+ registeredBroadcastStatesDeepCopies.size();finalMap<String,OperatorStateHandle.StateMetaInfo> writtenStatesMetaData =newHashMap<>(initialMapCapacity);//状态数据写入   for(Map.Entry<String,PartitionableListState<?>> entry :
               registeredOperatorStatesDeepCopies.entrySet()){PartitionableListState<?> value = entry.getValue();long[] partitionOffsets = value.write(localOut);OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
               writtenStatesMetaData.put(
                  entry.getKey(),newOperatorStateHandle.StateMetaInfo(partitionOffsets, mode));}// ... and the broadcast states themselves ...for(Map.Entry<String,BackendWritableBroadcastState<?,?>> entry :
               registeredBroadcastStatesDeepCopies.entrySet()){BackendWritableBroadcastState<?,?> value = entry.getValue();long[] partitionOffsets ={value.write(localOut)};OperatorStateHandle.Mode mode = value.getStateMetaInfo().getAssignmentMode();
               writtenStatesMetaData.put(
                  entry.getKey(),newOperatorStateHandle.StateMetaInfo(partitionOffsets, mode));}// ... and, finally, create the state handle.OperatorStateHandle retValue =null;//封装checkpoint元数据if(snapshotCloseableRegistry.unregisterCloseable(localOut)){StreamStateHandle stateHandle = localOut.closeAndGetHandle();if(stateHandle !=null){
                  retValue =newOperatorStreamStateHandle(writtenStatesMetaData, stateHandle);}returnSnapshotResult.of(retValue);}else{thrownewIOException("Stream was already unregistered.");}}@OverrideprotectedvoidcleanupProvidedResources(){// nothing to do}@OverrideprotectedvoidlogAsyncSnapshotComplete(long startTime){if(asynchronousSnapshots){logAsyncCompleted(streamFactory, startTime);}}};finalFutureTask<SnapshotResult<OperatorStateHandle>> task =
      snapshotCallable.toAsyncSnapshotFutureTask(closeStreamOnCancelRegistry);//如果是同步checkpoint,则立即执行checkpoint过程if(!asynchronousSnapshots){
      task.run();}return task;}

状态数据拷贝
状态数据的拷贝调用在PartitionableListState.deepCopy(),这个方法会创建一个新的PartitionableListState,然后将老的PartitionableListState里的List列表里的对象重新拷贝一份,不同于KeyedState的是,这里拷贝的对象并不一定就是引用,要分List里的对象是否是可变对象,如果不可变,那么就直接拷贝其引用;如果是可变对象,那么就要复制一个新对象,这将会消耗更多的内存,执行效率也会更慢。

//PartitionableListState类publicPartitionableListState<S>deepCopy(){returnnewPartitionableListState<>(this);}privatePartitionableListState(PartitionableListState<S> toCopy){this(toCo
   py.stateMetaInfo.deepCopy(), toCopy.internalListCopySerializer.copy(toCopy.internalList));}//ArrayListSerializer类publicArrayList<T>copy(ArrayList<T> from){if(elementSerializer.isImmutableType()){// fast track using memcopy for immutable typesreturnnewArrayList<>(from);}else{// element-wise deep copy for mutable typesArrayList<T> newList =newArrayList<>(from.size());for(int i =0; i < from.size(); i++){
         newList.add(elementSerializer.copy(from.get(i)));}return newList;}}

创建CheckpointStateOutputStream
这里和KeyedState一样,也是创建的FsCheckpointStateOutputStream

//FsCheckpointStreamFactory类publicFsCheckpointStateOutputStreamcreateCheckpointStateOutputStream(CheckpointedStateScope scope)throwsIOException{Path target = scope ==CheckpointedStateScope.EXCLUSIVE? checkpointDirectory : sharedStateDirectory;int bufferSize =Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);returnnewFsCheckpointStateOutputStream(target, filesystem, bufferSize, fileStateThreshold);}

写入状态数据元数据信息
这个过程和KeyedState类似,包括状态数据的名称,序列化信息等

//OperatorBackendSerializationProxy类publicvoidwrite(DataOutputView out)throwsIOException{super.write(out);writeStateMetaInfoSnapshots(operatorStateMetaInfoSnapshots, out);writeStateMetaInfoSnapshots(broadcastStateMetaInfoSnapshots, out);}privatevoidwriteStateMetaInfoSnapshots(List<StateMetaInfoSnapshot> snapshots,DataOutputView out)throwsIOException{
   out.writeShort(snapshots.size());for(StateMetaInfoSnapshot state : snapshots){StateMetaInfoSnapshotReadersWriters.getWriter().writeStateMetaInfoSnapshot(state, out);}}

写入状态数据
状态数据的写入在PartitionableListState.write()方法中,实现就是将PartitionableListState中internalList列表中的元素依次写入到OutputStream中,根据internalList中元素类型不同将采用不同的序列化工具。写入时也是先写到buffer,再进行flush()。partitionOffsets[]记录了每个元素写入到流中的位置,并返回partitionOffsets[]

publiclong[]write(FSDataOutputStream out)throwsIOException{long[] partitionOffsets =newlong[internalList.size()];DataOutputView dov =newDataOutputViewStreamWrapper(out);for(int i =0; i < internalList.size();++i){S element = internalList.get(i);
      partitionOffsets[i]= out.getPos();getStateMetaInfo().getPartitionStateSerializer().serialize(element, dov);}return partitionOffsets;}
标签: flink 分布式

本文转载自: https://blog.csdn.net/mn_kw/article/details/142169025
版权归原作者 mn_kw 所有, 如有侵权,请联系我们删除。

“一文弄懂FLink状态及checkpoint源码”的评论:

还没有评论