0


Flink源码之JobManager启动流程

从启动命令flink-daemon.sh中可以看出StandaloneSession入口类为org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint, 从该类的main方法会进入ClusterEntrypoint::runCluster中, 该方法中会创建出主要服务和组件。

StandaloneSessionClusterEntrypoint::main
ClusterEntrypoint::runClusterEntrypoint
ClusterEntrypoint::startCluster
ClusterEntrypoint::runCluster

private void runCluster(Configuration configuration, PluginManager pluginManager)
        throws Exception {
    synchronized (lock) {
        initializeServices(configuration, pluginManager); //初始化服务

        // write host information into configuration
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

        final DispatcherResourceManagerComponentFactory
                dispatcherResourceManagerComponentFactory =
                        createDispatcherResourceManagerComponentFactory(configuration);
        //创建核心组件
        clusterComponent =
                dispatcherResourceManagerComponentFactory.create(
                        configuration,
                        ioExecutor,
                        commonRpcService,
                        haServices,
                        blobServer,
                        heartbeatServices,
                        metricRegistry,
                        executionGraphInfoStore,
                        new RpcMetricQueryServiceRetriever(
                                metricRegistry.getMetricQueryServiceRpcService()),
                        this);
    ...ignore code
    }
}

可以看出关键代码是调用initializeServices以及创建Cluster Component。

protected void initializeServices(Configuration configuration, PluginManager pluginManager)
        throws Exception {

    LOG.info("Initializing cluster services.");

    synchronized (lock) {
        rpcSystem = RpcSystem.load(configuration);

        commonRpcService =
                RpcUtils.createRemoteRpcService(
                        rpcSystem,
                        configuration,
                        configuration.getString(JobManagerOptions.ADDRESS),
                        getRPCPortRange(configuration),
                        configuration.getString(JobManagerOptions.BIND_HOST),
                        configuration.getOptional(JobManagerOptions.RPC_BIND_PORT));

        JMXService.startInstance(configuration.getString(JMXServerOptions.JMX_SERVER_PORT));

        // update the configuration used to create the high availability services
        configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
        configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

        ioExecutor =
                Executors.newFixedThreadPool(
                        ClusterEntrypointUtils.getPoolSize(configuration),
                        new ExecutorThreadFactory("cluster-io"));
        haServices = createHaServices(configuration, ioExecutor, rpcSystem);
        blobServer = new BlobServer(configuration, haServices.createBlobStore());
        blobServer.start();
        heartbeatServices = createHeartbeatServices(configuration);
        metricRegistry = createMetricRegistry(configuration, pluginManager, rpcSystem);

        final RpcService metricQueryServiceRpcService =
                MetricUtils.startRemoteMetricsRpcService(
                        configuration, commonRpcService.getAddress(), rpcSystem);
        metricRegistry.startQueryService(metricQueryServiceRpcService, null);

        final String hostname = RpcUtils.getHostname(commonRpcService);

        processMetricGroup =
                MetricUtils.instantiateProcessMetricGroup(
                        metricRegistry,
                        hostname,
                        ConfigurationUtils.getSystemResourceMetricsProbingInterval(
                                configuration));

        executionGraphInfoStore =
                createSerializableExecutionGraphStore(
                        configuration, commonRpcService.getScheduledExecutor());
    }
}

在initializeServices中首先创建commonRpcService,这个RPCService实例是JobManager提供RPC服务的核心,可以看出它会有个地址和监听端口号,commonRpcService可将继承自Gateway的服务实例包装成AkkaActor对外提供RPC服务,比如ResourceManager、Dispatcher。此外还创建了其他服务:

haService: 可通过HAService获取ResourceManager/Dispatcher/RestEndpoint的地址,同时也提供选主服务,组件启动时需向HAService注册,如果被选主成功,则会调用监听器的grandLeadership回调函数
BlobServer: 可用来提供存储大对象存储服务
heartbeatServices:为组件间传递心跳信息
metricRegistry:提供metric上报和查询服务,监听端口不同,新建了一个RpcService专为Metric服务
processMetricGroup:注册系统运行状态信息的Metric,比如GC/Memory/Network运行时状况,添加Metric都是通过一个MetricGroup添加
executionGraphInfoStore:缓存Job执行时信息,比如ExecutionGrap

初始化服务创建完成后,通过DefaultDispatcherResourceManagerComponentFactory:create创建JobManager的三大核心组件:Dispacher/ResourceManager/RestEndpointServer, 都是通过工厂方法创建:

DefaultDispatcherRunnerFactory
StandaloneResourceManagerFactory
SessionRestEndpointFactory

这些组件是JobManager向HAService注册获取leadership后,被ElectionService回调grantLeadership函数中创建出具体组件实例。

RestServer

RestServer并不是一个RPCServer,没有继承RpcGateway,只提供HTTP接口服务,然后将请求转交给Dispatcher处理,它的生成启动流程如下:

SessionRestEndpointFactory::createRestEndpoint
DispatcherRestEndpoint::new
RestServerEndpoint::start //通过Netty启动Rest服务
DispatcherRestEndpoint::initializeHandlers //JobSubmitHeaders、JobSubmitHandler处理客户端提交Job
WebMonitorEndpoint::initializeHandlers //关联Rest请求的Header和Handler
WebMonitorEndpoint::startInternal //竞选leader

ResourceManager

RM生成启动过程是ResourceManagerServiceImpl先竞选leader成功后再创建出具体的ResourceManager

ResourceManagerServiceImpl::start
ResourceManagerServiceImpl::grantLeadership
ResourceManagerServiceImpl::startNewLeaderResourceManager
ResourceManagerServiceImpl::startResourceManagerIfIsLeader//调用start方法
StandaloneResourceManagerFactory::createResourceManager
StandaloneResourceManager::new
StandaloneResourceManager::start

Dispatcher

Dispacher生成启动过程是DefaultDispatcherRunner选主后再创建出具体实例

DefaultDispatcherRunnerFactory::createDispatcherRunner
DefaultDispatcherRunner::create
DispatcherRunnerLeaderElectionLifecycleManager.createFor
DefaultDispatcherRunner::grantLeadership //
DefaultDispatcherRunner::startNewDispatcherLeaderProcess//创建SessionDispatcherLeaderProcess并调用其start方法
DefaultDispatcherRunner::createNewDispatcherLeaderProcess
SessionDispatcherLeaderProcessFactoryFactory::createFactory
SessionDispatcherLeaderProcessFactory::create
SessionDispatcherLeaderProcess::create
SessionDispatcherLeaderProcess::start
AbstractDispatcherLeaderProcess::start
AbstractDispatcherLeaderProcess::startInternal
SessionDispatcherLeaderProcess:onstart
SessionDispatcherLeaderProcess::createDispatcherIfRunning
SessionDispatcherLeaderProcess::createDispatcher
DefaultDispatcherGatewayServiceFactory::create//创建Dispatcher并调用其start方法
SessionDispatcherFactory::createDispatcher
StandaloneDispatcher::new
StandaloneDispatcher::start
Dispatcher::onstart

总结

在这里插入图片描述
JobManager的启动过程就是创建三大组件RestServer/RM/Dispacher实例初始化的过程,RestSever通过Netty启动HTTP服务,RM/Dispacher被AkkaRpcService包装成AkkaActor提供本地或远程RPC服务,RestServer仅仅是接受请求解析消息后由具体Handler处理,JobGrap提交执行会转发给Dispatcher处理。

标签: flink 大数据

本文转载自: https://blog.csdn.net/jinjiating/article/details/132193277
版权归原作者 ImproveJin 所有, 如有侵权,请联系我们删除。

“Flink源码之JobManager启动流程”的评论:

还没有评论