

Flink Checkpoint Source Code Explained

Checkpointing is one of the most important parts of Flink and the key guarantee behind its exactly-once semantics; it is fair to say that Flink's success is inseparable from its checkpoint mechanism.

I had skimmed the checkpoint source code before, but it still left me a bit dizzy; I did not even quite understand how Flink picks up the checkpoint configuration we set in our job, or how that configuration is propagated downstream.

1. Checkpoint configuration in user code

This time I traced it through carefully so we can understand checkpointing better. First, let's look at how we usually configure a Flink checkpoint in code:

 // 1. Prepare the environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // 2. Checkpoint and state backend settings
        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(30 * 1000L);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(3000L);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION
        );
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, Time.days(1), Time.minutes(1)
        ));
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage(
                "hdfs://hadoop102:8020/ck"
        );
       
        // ... business logic omitted ...

        // 3. Launch the job
        env.execute();

Here we can see that the checkpoint-related settings are all configured by calling methods on the StreamExecutionEnvironment, so env must have received the checkpoint configuration from our code.

Stepping into the enableCheckpointing() method, we can see that the checkpoint parameters we configured are captured by checkpointCfg, a member variable of StreamExecutionEnvironment.
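For reference, the body of that method in the Flink sources this walkthrough appears to follow (around 1.12) is essentially just two setter calls; a lightly abridged excerpt:

    public StreamExecutionEnvironment enableCheckpointing(long interval, CheckpointingMode mode) {
        //the parameters we pass in are simply recorded on the member variable checkpointCfg
        checkpointCfg.setCheckpointingMode(mode);
        checkpointCfg.setCheckpointInterval(interval);
        return this;
    }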

So from here on we only need to watch how this checkpointCfg is handed further down. Let's start from the job's entry point, env.execute(); the next step is for Flink to turn our job into a StreamGraph.

This method calls getStreamGraph(), which generates the StreamGraph corresponding to the job.
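For context, execute(jobName) in this era of Flink looks roughly like the following (abridged; newer versions restructure it):

    public JobExecutionResult execute(String jobName) throws Exception {
        Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");

        //build the StreamGraph for the job and hand it to the overload that actually executes it
        return execute(getStreamGraph(jobName));
    }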

2. Checkpoint configuration in StreamGraph

In the previous step, inside env.execute(jobName), we saw the call to getStreamGraph(), the method that generates the StreamGraph. From the code below we can see that it first uses getStreamGraphGenerator() to create a StreamGraphGenerator, and then has that generator produce the StreamGraph:

 private StreamGraphGenerator getStreamGraphGenerator() {
        if (transformations.size() <= 0) {
            throw new IllegalStateException(
                    "No operators defined in streaming topology. Cannot execute.");
        }

        final RuntimeExecutionMode executionMode = configuration.get(ExecutionOptions.RUNTIME_MODE);

        return new StreamGraphGenerator(transformations, config, checkpointCfg, getConfiguration())
                .setRuntimeExecutionMode(executionMode) //execution mode: streaming or batch
                .setStateBackend(defaultStateBackend)  //default state backend
                .setSavepointDir(defaultSavepointDirectory)  //savepoint directory
                .setChaining(isChainingEnabled) //whether operator chaining is enabled
                .setUserArtifacts(cacheFile)  //user artifacts (jars and cached files)
                .setTimeCharacteristic(timeCharacteristic)  //time characteristic
                .setDefaultBufferTimeout(bufferTimeout);  //buffer flush timeout
    }

From this method body we can see that checkpointCfg is passed as a constructor argument to the StreamGraphGenerator, which is also configured with further checkpoint- and state-backend-related parameters.

public StreamGraph generate() {
        //the checkpointConfig and savepoint settings are passed as constructor arguments to the StreamGraph
        streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);
        shouldExecuteInBatchMode = shouldExecuteInBatchMode(runtimeExecutionMode);
        configureStreamGraph(streamGraph);

        alreadyTransformed = new HashMap<>();

        //transform the operators: iterate over the transformations collected by env and add them to the streamGraph
        for (Transformation<?> transformation : transformations) {
            transform(transformation);
        }

        for (StreamNode node : streamGraph.getStreamNodes()) {
            if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) {
                for (StreamEdge edge : node.getInEdges()) {
                    edge.setSupportsUnalignedCheckpoints(false);
                }
            }
        }

        final StreamGraph builtStreamGraph = streamGraph;

        alreadyTransformed.clear();
        alreadyTransformed = null;
        streamGraph = null;

        return builtStreamGraph;
    }

In the StreamGraphGenerator's generate() method we can see the checkpoint config being passed as a constructor argument to the StreamGraph, so the StreamGraph now holds our checkpoint configuration.
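The StreamGraph constructor simply stores these objects in its own fields; roughly (abridged from the same sources):

    public StreamGraph(
            ExecutionConfig executionConfig,
            CheckpointConfig checkpointConfig,
            SavepointRestoreSettings savepointRestoreSettings) {
        this.executionConfig = checkNotNull(executionConfig);
        //from here on, streamGraph.getCheckpointConfig() returns our configuration
        this.checkpointConfig = checkNotNull(checkpointConfig);
        this.savepointRestoreSettings = checkNotNull(savepointRestoreSettings);

        //reset all other fields to their empty defaults
        clear();
    }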

3. Checkpoint configuration in JobGraph

Next let's return to StreamExecutionEnvironment's execute() method and look at how the checkpoint configuration is passed on during JobGraph generation. Following env.execute() through a series of method calls, we eventually find that the JobGraph is produced by streamGraph.getJobGraph().

That method calls StreamingJobGraphGenerator.createJobGraph().

This first constructs a StreamingJobGraphGenerator and then invokes its createJobGraph() method.
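The two hops look roughly like this (abridged); note that the generator is constructed with the StreamGraph, so the checkpoint configuration travels along with it:

    // StreamGraph
    public JobGraph getJobGraph(@Nullable JobID jobID) {
        return StreamingJobGraphGenerator.createJobGraph(this, jobID);
    }

    // StreamingJobGraphGenerator
    public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
        return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
    }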

Now let's look at createJobGraph() in detail:

private JobGraph createJobGraph() {
        preValidate();  //pre-checks, e.g. whether checkpointing is enabled
        jobGraph.setJobType(streamGraph.getJobType());

        jobGraph.enableApproximateLocalRecovery(
                streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());

        // Generate deterministic hashes for the nodes in order to identify them across
        // submission iff they didn't change.
        Map<Integer, byte[]> hashes =
                defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

        // Generate legacy version hashes for backwards compatibility
        List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
        for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
            legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
        }

        setChaining(hashes, legacyHashes);

        setPhysicalEdges();

        setSlotSharingAndCoLocation();

        setManagedMemoryFraction(
                Collections.unmodifiableMap(jobVertices),
                Collections.unmodifiableMap(vertexConfigs),
                Collections.unmodifiableMap(chainedConfigs),
                id -> streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
                id -> streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());
        
        //configure checkpointing
        configureCheckpointing();

        //set the savepoint restore settings
        jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());

        final Map<String, DistributedCache.DistributedCacheEntry> distributedCacheEntries =
                JobGraphUtils.prepareUserArtifactEntries(
                        streamGraph.getUserArtifacts().stream()
                                .collect(Collectors.toMap(e -> e.f0, e -> e.f1)),
                        jobGraph.getJobID());

        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry :
                distributedCacheEntries.entrySet()) {
            jobGraph.addUserArtifact(entry.getKey(), entry.getValue());
        }

        // set the ExecutionConfig last when it has been finalized
        try {
            jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
        } catch (IOException e) {
            throw new IllegalConfigurationException(
                    "Could not serialize the ExecutionConfig."
                            + "This indicates that non-serializable types (like custom serializers) were registered");
        }

        return jobGraph;
    }

The preValidate() method checks properties such as whether checkpointing is enabled and whether unaligned checkpoints can be used; the actual hand-off of the StreamGraph's checkpoint configuration to the JobGraph happens in configureCheckpointing():

private void configureCheckpointing() {
        CheckpointConfig cfg = streamGraph.getCheckpointConfig();

        long interval = cfg.getCheckpointInterval();
        if (interval < MINIMAL_CHECKPOINT_TIME) {
            // interval of max value means disable periodic checkpoint
            interval = Long.MAX_VALUE;
        }

        //  --- configure options ---

        CheckpointRetentionPolicy retentionAfterTermination;
        if (cfg.isExternalizedCheckpointsEnabled()) {
            CheckpointConfig.ExternalizedCheckpointCleanup cleanup =
                    cfg.getExternalizedCheckpointCleanup();
            // Sanity check
            if (cleanup == null) {
                throw new IllegalStateException(
                        "Externalized checkpoints enabled, but no cleanup mode configured.");
            }
            retentionAfterTermination =
                    cleanup.deleteOnCancellation()
                            ? CheckpointRetentionPolicy.RETAIN_ON_FAILURE
                            : CheckpointRetentionPolicy.RETAIN_ON_CANCELLATION;
        } else {
            retentionAfterTermination = CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
        }

        //  --- configure the master-side checkpoint hooks ---

        final ArrayList<MasterTriggerRestoreHook.Factory> hooks = new ArrayList<>();

        for (StreamNode node : streamGraph.getStreamNodes()) {
            if (node.getOperatorFactory() instanceof UdfStreamOperatorFactory) {
                Function f =
                        ((UdfStreamOperatorFactory) node.getOperatorFactory()).getUserFunction();

                if (f instanceof WithMasterCheckpointHook) {
                    hooks.add(
                            new FunctionMasterCheckpointHookFactory(
                                    (WithMasterCheckpointHook<?>) f));
                }
            }
        }

        // because the hooks can have user-defined code, they need to be stored as
        // eagerly serialized values
        final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks;
        if (hooks.isEmpty()) {
            serializedHooks = null;
        } else {
            try {
                MasterTriggerRestoreHook.Factory[] asArray =
                        hooks.toArray(new MasterTriggerRestoreHook.Factory[hooks.size()]);
                serializedHooks = new SerializedValue<>(asArray);
            } catch (IOException e) {
                throw new FlinkRuntimeException("Trigger/restore hook is not serializable", e);
            }
        }

        // because the state backend can have user-defined code, it needs to be stored as
        // eagerly serialized value
        final SerializedValue<StateBackend> serializedStateBackend;
        if (streamGraph.getStateBackend() == null) {
            serializedStateBackend = null;
        } else {
            try {
                serializedStateBackend =
                        new SerializedValue<StateBackend>(streamGraph.getStateBackend());
            } catch (IOException e) {
                throw new FlinkRuntimeException("State backend is not serializable", e);
            }
        }

        // because the checkpoint storage can have user-defined code, it needs to be stored as
        // eagerly serialized value
        final SerializedValue<CheckpointStorage> serializedCheckpointStorage;
        if (streamGraph.getCheckpointStorage() == null) {
            serializedCheckpointStorage = null;
        } else {
            try {
                serializedCheckpointStorage =
                        new SerializedValue<>(streamGraph.getCheckpointStorage());
            } catch (IOException e) {
                throw new FlinkRuntimeException("Checkpoint storage is not serializable", e);
            }
        }

        //  --- done, put it all together ---
        //create a JobCheckpointingSettings to encapsulate the checkpoint-related configuration
        JobCheckpointingSettings settings =
                new JobCheckpointingSettings(
                        CheckpointCoordinatorConfiguration.builder()
                                .setCheckpointInterval(interval)
                                .setCheckpointTimeout(cfg.getCheckpointTimeout())
                                .setMinPauseBetweenCheckpoints(cfg.getMinPauseBetweenCheckpoints())
                                .setMaxConcurrentCheckpoints(cfg.getMaxConcurrentCheckpoints())
                                .setCheckpointRetentionPolicy(retentionAfterTermination)
                                .setExactlyOnce(
                                        getCheckpointingMode(cfg) == CheckpointingMode.EXACTLY_ONCE)
                                .setPreferCheckpointForRecovery(cfg.isPreferCheckpointForRecovery())
                                .setTolerableCheckpointFailureNumber(
                                        cfg.getTolerableCheckpointFailureNumber())
                                .setUnalignedCheckpointsEnabled(cfg.isUnalignedCheckpointsEnabled())
                                .setAlignmentTimeout(cfg.getAlignmentTimeout().toMillis())
                                .build(),
                        serializedStateBackend,
                        serializedCheckpointStorage,
                        serializedHooks);
        //pass the JobCheckpointingSettings to the JobGraph
        jobGraph.setSnapshotSettings(settings);
    }

This method reads the StreamGraph's CheckpointConfig, creates a JobCheckpointingSettings, and copies the checkpoint-related settings from the StreamGraph into it. The member variable of JobGraph that keeps the checkpoint configuration is snapshotSettings.
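On the JobGraph side these are plain accessors around that member (abridged excerpt):

    // JobGraph
    private JobCheckpointingSettings snapshotSettings;

    public void setSnapshotSettings(JobCheckpointingSettings settings) {
        this.snapshotSettings = settings;
    }

    public JobCheckpointingSettings getCheckpointingSettings() {
        return snapshotSettings;
    }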

Both of the processes above happen on the client side. In a YARN cluster environment, once the JobGraph has been generated the client creates a YARN client to submit the job; after submission, the WebMonitorEndpoint's JobSubmitHandler processes the submitted job in its handleRequest method.

4. Checkpoint configuration in ExecutionGraph

The ExecutionGraph is produced when the JobMaster starts, by converting the JobGraph into an ExecutionGraph. Since JobMaster startup is fairly involved, we will analyze it in more detail here, starting from the point where the cluster receives the JobGraph submitted by the client.

1.handleRequest

This method mainly fetches the JobGraph, the jar files, and their dependencies uploaded from the client side, and then calls dispatcherGateway.submitJob() to submit the JobGraph.

@Override
    protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
        final Collection<File> uploadedFiles = request.getUploadedFiles();
        final Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
            File::getName,
            Path::fromLocalFile
        ));

        if (uploadedFiles.size() != nameToFile.size()) {
            throw new RestHandlerException(
                String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
                    uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
                    nameToFile.size(),
                    uploadedFiles.size()),
                HttpResponseStatus.BAD_REQUEST
            );
        }

        final JobSubmitRequestBody requestBody = request.getRequestBody();

        if (requestBody.jobGraphFileName == null) {
            throw new RestHandlerException(
                String.format("The %s field must not be omitted or be null.",
                    JobSubmitRequestBody.FIELD_NAME_JOB_GRAPH),
                HttpResponseStatus.BAD_REQUEST);
        }
        
        //load the JobGraph
        CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);

        //collect the jar files to upload
        Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);
        //collect the artifacts the jars depend on
        Collection<Tuple2<String, Path>> artifacts = getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);

        CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);
        
        //call submitJob on the DispatcherGateway
        CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));

        return jobSubmissionFuture.thenCombine(jobGraphFuture,
            (ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
    }

2.submitJob

dispatcherGateway.submitJob() is an interface method with two implementations, Dispatcher and MiniDispatcher (the latter serves clusters that run a single job, such as per-job mode). Here we follow Dispatcher's submitJob method: it runs a few checks on the JobGraph, such as whether the jobID is a duplicate and whether only part of the vertices have resources configured, and then calls internalSubmitJob().

@Override
    public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
        log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());

        try {
            if (isDuplicateJob(jobGraph.getJobID())) {
                return FutureUtils.completedExceptionally(
                    new DuplicateJobSubmissionException(jobGraph.getJobID()));
            } else if (isPartialResourceConfigured(jobGraph)) {
                return FutureUtils.completedExceptionally(
                    new JobSubmissionException(jobGraph.getJobID(), "Currently jobs is not supported if parts of the vertices have " +
                            "resources configured. The limitation will be removed in future versions."));
            } else {
                return internalSubmitJob(jobGraph);
            }
        } catch (FlinkException e) {
            return FutureUtils.completedExceptionally(e);
        }
    }

3.internalSubmitJob

This method calls persistAndRunJob() to persist and run the job, and then handles the result of the job's execution.

private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
        log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());

        final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJob(jobGraph.getJobID(), jobGraph,
            this::persistAndRunJob)
            .thenApply(ignored -> Acknowledge.get());

        return persistAndRunFuture.handleAsync((acknowledge, throwable) -> {
            if (throwable != null) {
                cleanUpJobData(jobGraph.getJobID(), true);

                ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(throwable);
                final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
                log.error("Failed to submit job {}.", jobGraph.getJobID(), strippedThrowable);
                throw new CompletionException(
                    new JobSubmissionException(jobGraph.getJobID(), "Failed to submit job.", strippedThrowable));
            } else {
                return acknowledge;
            }
        }, ioExecutor);
    }

4.persistAndRunJob

The jobGraphWriter here is responsible for persisting the JobGraph, after which runJob() is called.

private void persistAndRunJob(JobGraph jobGraph) throws Exception {
        jobGraphWriter.putJobGraph(jobGraph);
        runJob(jobGraph, ExecutionType.SUBMISSION);
    }

5.runJob

This method mainly does the following:

    1. Calls createJobManagerRunner() to create and start the JobMaster.

    2. Handles the job's execution result.

    3. After the job fails, cleans up the job's execution-state files and removes the job.

private void runJob(JobGraph jobGraph, ExecutionType executionType) {
        Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
        long initializationTimestamp = System.currentTimeMillis();
        /* create and start the JobMaster */
        CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph, initializationTimestamp);

        DispatcherJob dispatcherJob = DispatcherJob.createFor(
                jobManagerRunnerFuture,
                jobGraph.getJobID(),
                jobGraph.getName(),
                initializationTimestamp);
        runningJobs.put(jobGraph.getJobID(), dispatcherJob);

        final JobID jobId = jobGraph.getJobID();
        //handle the job's execution result
        final CompletableFuture<CleanupJobState> cleanupJobStateFuture = dispatcherJob.getResultFuture().handleAsync(
            (dispatcherJobResult, throwable) -> {
                Preconditions.checkState(runningJobs.get(jobId) == dispatcherJob, "The job entry in runningJobs must be bound to the lifetime of the DispatcherJob.");

                if (dispatcherJobResult != null) {
                    return handleDispatcherJobResult(jobId, dispatcherJobResult, executionType);
                } else {
                    return dispatcherJobFailed(jobId, throwable);
                }
            }, getMainThreadExecutor());
        //after the job fails, clean up its execution-state files, then remove the job
        final CompletableFuture<Void> jobTerminationFuture = cleanupJobStateFuture
            .thenApply(cleanupJobState -> removeJob(jobId, cleanupJobState))
            .thenCompose(Function.identity());

        FutureUtils.assertNoException(jobTerminationFuture);
        registerDispatcherJobTerminationFuture(jobId, jobTerminationFuture);
    }

6.createJobManagerRunner

This method calls jobManagerRunnerFactory.createJobManagerRunner() to create the JobMaster (wrapped in a JobManagerRunner), and then calls start() to launch the JobMaster.

CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph, long initializationTimestamp) {
        final RpcService rpcService = getRpcService();
        return CompletableFuture.supplyAsync(
            () -> {
                try {
                    /* create the JobManagerRunner, which wraps the JobMaster */
                    JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner(
                        jobGraph,
                        configuration,
                        rpcService,
                        highAvailabilityServices,
                        heartbeatServices,
                        jobManagerSharedServices,
                        new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                        fatalErrorHandler,
                        initializationTimestamp);
                    /* start the JobMaster */
                    runner.start();
                    return runner;
                } catch (Exception e) {
                    throw new CompletionException(new JobInitializationException(jobGraph.getJobID(), "Could not instantiate JobManager.", e));
                }
            },
            ioExecutor); // do not use main thread executor. Otherwise, Dispatcher is blocked on JobManager creation
    }

7. Generating the ExecutionGraph

Starting the JobMaster is fairly involved and goes through quite a few steps; here we skip the less important intermediate ones:

1.JobManagerRunnerFactory # createJobManagerRunner

2.DefaultJobManagerRunnerFactory # createJobManagerRunner

3.JobManagerRunnerImpl # JobManagerRunnerImpl

4.DefaultJobMasterServiceFactory # createJobMasterService

5.JobMaster # JobMaster ==> createScheduler

6.DefaultSchedulerFactory # createInstance

7.SchedulerBase # createAndRestoreExecutionGraph

This method calls createExecutionGraph() to create the ExecutionGraph, and it also creates the CheckpointCoordinator, which takes charge of the checkpoint-related work for this job.

private ExecutionGraph createAndRestoreExecutionGraph(
        JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
        ShuffleMaster<?> shuffleMaster,
        JobMasterPartitionTracker partitionTracker,
        ExecutionDeploymentTracker executionDeploymentTracker,
        long initializationTimestamp) throws Exception {

        ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker, executionDeploymentTracker, initializationTimestamp);

        final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator();

        if (checkpointCoordinator != null) {
            // check whether we find a valid checkpoint
            if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
                    new HashSet<>(newExecutionGraph.getAllVertices().values()))) {

                // check whether we can restore from a savepoint
                tryRestoreExecutionGraphFromSavepoint(newExecutionGraph, jobGraph.getSavepointRestoreSettings());
            }
        }

        return newExecutionGraph;
    }

8. Checkpoint configuration inside the ExecutionGraph

SchedulerBase#createExecutionGraph internally calls ExecutionGraphBuilder#buildGraph.

This method does a lot of checkpoint-related work, for example:

    1. Reading the JobGraph's checkpoint configuration.

    2. Creating the CheckpointIDCounter.

    3. Loading the state backend.

    4. Instantiating the user-defined checkpoint hooks.

    5. Passing all the checkpoint parameters into the ExecutionGraph.

public static ExecutionGraph buildGraph(
        @Nullable ExecutionGraph prior,
        JobGraph jobGraph,
        Configuration jobManagerConfig,
        ScheduledExecutorService futureExecutor,
        Executor ioExecutor,
        SlotProvider slotProvider,
        ClassLoader classLoader,
        CheckpointRecoveryFactory recoveryFactory,
        Time rpcTimeout,
        RestartStrategy restartStrategy,
        MetricGroup metrics,
        BlobWriter blobWriter,
        Time allocationTimeout,
        Logger log,
        ShuffleMaster<?> shuffleMaster,
        JobMasterPartitionTracker partitionTracker,
        FailoverStrategy.Factory failoverStrategyFactory,
        ExecutionDeploymentListener executionDeploymentListener,
        ExecutionStateUpdateListener executionStateUpdateListener,
        long initializationTimestamp) throws JobExecutionException, JobException {

        // ...... parts not relevant here omitted ......

        // configure the state checkpointing
        //read the JobGraph's checkpoint settings
        JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
        if (snapshotSettings != null) {
            List<ExecutionJobVertex> triggerVertices =
                    idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);

            List<ExecutionJobVertex> ackVertices =
                    idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);

            List<ExecutionJobVertex> confirmVertices =
                    idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);

            CompletedCheckpointStore completedCheckpoints;
            CheckpointIDCounter checkpointIdCounter;
            try {
                int maxNumberOfCheckpointsToRetain = jobManagerConfig.getInteger(
                        CheckpointingOptions.MAX_RETAINED_CHECKPOINTS);

                if (maxNumberOfCheckpointsToRetain <= 0) {
                    // warning and use 1 as the default value if the setting in
                    // state.checkpoints.max-retained-checkpoints is not greater than 0.
                    log.warn("The setting for '{} : {}' is invalid. Using default value of {}",
                            CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key(),
                            maxNumberOfCheckpointsToRetain,
                            CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue());

                    maxNumberOfCheckpointsToRetain = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue();
                }

                completedCheckpoints = recoveryFactory.createCheckpointStore(jobId, maxNumberOfCheckpointsToRetain, classLoader);
                checkpointIdCounter = recoveryFactory.createCheckpointIDCounter(jobId);
            }
            catch (Exception e) {
                throw new JobExecutionException(jobId, "Failed to initialize high-availability checkpoint handler", e);
            }

            // Maximum number of remembered checkpoints
            int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);

            CheckpointStatsTracker checkpointStatsTracker = new CheckpointStatsTracker(
                    historySize,
                    ackVertices,
                    snapshotSettings.getCheckpointCoordinatorConfiguration(),
                    metrics);

            // load the state backend from the application settings
            final StateBackend applicationConfiguredBackend;
            final SerializedValue<StateBackend> serializedAppConfigured = snapshotSettings.getDefaultStateBackend();

            if (serializedAppConfigured == null) {
                applicationConfiguredBackend = null;
            }
            else {
                try {
                    applicationConfiguredBackend = serializedAppConfigured.deserializeValue(classLoader);
                } catch (IOException | ClassNotFoundException e) {
                    throw new JobExecutionException(jobId,
                            "Could not deserialize application-defined state backend.", e);
                }
            }
            //load the state backend
            final StateBackend rootBackend;
            try {
                rootBackend = StateBackendLoader.fromApplicationOrConfigOrDefault(
                        applicationConfiguredBackend, jobManagerConfig, classLoader, log);
            }
            catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
                throw new JobExecutionException(jobId, "Could not instantiate configured state backend", e);
            }

            // instantiate the user-defined checkpoint hooks
            final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks = snapshotSettings.getMasterHooks();
            final List<MasterTriggerRestoreHook<?>> hooks;

            if (serializedHooks == null) {
                hooks = Collections.emptyList();
            }
            else {
                final MasterTriggerRestoreHook.Factory[] hookFactories;
                try {
                    hookFactories = serializedHooks.deserializeValue(classLoader);
                }
                catch (IOException | ClassNotFoundException e) {
                    throw new JobExecutionException(jobId, "Could not instantiate user-defined checkpoint hooks", e);
                }

                final Thread thread = Thread.currentThread();
                final ClassLoader originalClassLoader = thread.getContextClassLoader();
                thread.setContextClassLoader(classLoader);

                try {
                    hooks = new ArrayList<>(hookFactories.length);
                    for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
                        hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));
                    }
                }
                finally {
                    thread.setContextClassLoader(originalClassLoader);
                }
            }

            final CheckpointCoordinatorConfiguration chkConfig = snapshotSettings.getCheckpointCoordinatorConfiguration();

            //pass the checkpoint configuration into the executionGraph
            executionGraph.enableCheckpointing(
                chkConfig,
                triggerVertices,
                ackVertices,
                confirmVertices,
                hooks,
                checkpointIdCounter,
                completedCheckpoints,
                rootBackend,
                checkpointStatsTracker);
        }

        // create all the metrics for the Execution Graph

        metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(executionGraph));
        metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(executionGraph));
        metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(executionGraph));

        executionGraph.getFailoverStrategy().registerMetrics(metrics);

        return executionGraph;
    }

At this point the checkpoint configuration has been passed into the ExecutionGraph, and the CheckpointCoordinator in charge of checkpoint-related work has been created. Later, once the JobMaster starts, it begins scheduling, allocates resources to the tasks, and kicks off task execution, during which checkpoints are actually taken for the running job.
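To close the loop, here is a rough, heavily abridged sketch of what ExecutionGraph#enableCheckpointing does with those parameters (the exact parameter list varies across Flink versions, so treat this as an outline rather than the literal source):

    public void enableCheckpointing(
            CheckpointCoordinatorConfiguration chkConfig
            /* ... trigger/ack/commit vertices, hooks, ID counter, checkpoint store, state backend, stats tracker ... */) {

        //build the CheckpointCoordinator from the configuration we traced above;
        //it is the component that triggers, collects and completes checkpoints
        checkpointCoordinator = new CheckpointCoordinator(/* jobId, chkConfig, ... */);

        if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
            //periodic triggering is switched on and off by job status changes:
            //RUNNING -> on, every other state -> off
            registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
        }
    }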

Tags: flink, big data

This article is reposted from: https://blog.csdn.net/bigdatakenan/article/details/134783435
Copyright belongs to the original author diu_lei. In case of infringement, please contact us for removal.
