

Decoding Hadoop Series: NameNode Startup Flow

The NameNode's main responsibility is managing file metadata and the file-to-block mapping. Accordingly, its startup flow centers on the worker threads that communicate with clients and DataNodes, the file-metadata management machinery, and the block management machinery. Of these, the RpcServer handles communication with clients and DataNodes, while FSDirectory manages file metadata. The overall flow is shown below:
(Figure: NameNode startup flow diagram)

1. Start the HDFS cluster

start-dfs.sh

This command starts both the HDFS NameNode and the DataNodes; the NameNode itself is launched through the org.apache.hadoop.hdfs.server.namenode.NameNode class.

2. The NameNode startup entry point

public class NameNode extends ReconfigurableBase
    implements NameNodeStatusMXBean {

  // Mainly initializes HDFS configuration.
  static {
    // init() itself is empty, but loading the class is not a no-op:
    // it triggers HdfsConfiguration's static block; see step 3.
    HdfsConfiguration.init();
  }

  // ... code omitted ...

  public static void main(String argv[]) throws Exception {
    // If the argument is a usage/help flag, print the usage message and exit.
    if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
      System.exit(0);
    }

    try {
      // Print a formatted startup message and register a shutdown hook
      // that logs a message when the node shuts down.
      StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
      // Create the NameNode; see step 4.
      NameNode namenode = createNameNode(argv, null);
      if (namenode != null) {
        // Block until the NameNode stops.
        namenode.join();
      }
    } catch (Throwable e) {
      // Startup failed.
      LOG.error("Failed to start namenode.", e);
      terminate(1, e);
    }
  }
}

3. HdfsConfiguration: HDFS configuration parameters

public class HdfsConfiguration extends Configuration {

  // Static block: registers deprecated keys and the default resources.
  static {
    addDeprecatedKeys();

    // adds the default resources
    Configuration.addDefaultResource("hdfs-default.xml");
    Configuration.addDefaultResource("hdfs-site.xml");
  }

  public HdfsConfiguration() {
    super();
  }

  public HdfsConfiguration(boolean loadDefaults) {
    super(loadDefaults);
  }

  public HdfsConfiguration(Configuration conf) {
    super(conf);
  }

  /**
   * This method is here so that when invoked, HdfsConfiguration is class-loaded
   * if it hasn't already been previously loaded.  Upon loading the class, the
   * static initializer block above will be executed to add the deprecated keys
   * and to add the default resources. It is safe for this method to be called
   * multiple times as the static initializer block will only get invoked once.
   *
   * This replaces the previously, dangerous practice of other classes calling
   * Configuration.addDefaultResource("hdfs-default.xml") directly without
   * loading this class first, thereby skipping the key deprecation.
   */
  public static void init() {
  }

  private static void addDeprecatedKeys() {
    Configuration.addDeprecations(new DeprecationDelta[] {
        new DeprecationDelta("dfs.backup.address",
            DeprecatedKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY),
        // ... many deprecation entries omitted ...
        new DeprecationDelta("dfs.encryption.key.provider.uri",
            CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH),
    });
  }

  public static void main(String[] args) {
    init();
    Configuration.dumpDeprecatedKeys();
  }
}
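The practical effect of addDeprecatedKeys() is that old key names keep working. A minimal sketch (assuming hadoop-common and hadoop-hdfs on the classpath): a value set under the deprecated name "dfs.backup.address" is transparently readable under the current name "dfs.namenode.backup.address":

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class DeprecationDemo {
  public static void main(String[] args) {
    // Instantiating HdfsConfiguration runs the static block above.
    Configuration conf = new HdfsConfiguration();
    // Set a value under the deprecated key ...
    conf.set("dfs.backup.address", "0.0.0.0:50100");
    // ... and read it back under the current key: prints 0.0.0.0:50100.
    System.out.println(conf.get("dfs.namenode.backup.address"));
  }
}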

4. createNameNode(): creating the NameNode

public static NameNode createNameNode(String argv[], Configuration conf)
    throws IOException {
  LOG.info("createNameNode " + Arrays.asList(argv));
  if (conf == null)
    conf = new HdfsConfiguration();

  // Parse out some generic args into Configuration.
  GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
  argv = hParser.getRemainingArgs();

  // Parse the rest, NN specific args.
  StartupOption startOpt = parseArguments(argv);
  if (startOpt == null) {
    printUsage(System.err);
    return null;
  }
  setStartupOption(conf, startOpt);

  switch (startOpt) {
    case FORMAT: {
      boolean aborted = format(conf, startOpt.getForceFormat(),
          startOpt.getInteractiveFormat());
      terminate(aborted ? 1 : 0);
      return null; // avoid javac warning
    }
    case GENCLUSTERID: {
      System.err.println("Generating new cluster id:");
      System.out.println(NNStorage.newClusterID());
      terminate(0);
      return null;
    }
    // ... several case branches omitted ...
    case UPGRADEONLY: {
      DefaultMetricsSystem.initialize("NameNode");
      new NameNode(conf);
      terminate(0);
      return null;
    }
    // A normal startup takes this branch.
    default: {
      // Initialize the metrics system.
      DefaultMetricsSystem.initialize("NameNode");
      // Return a new NameNode; see step 5.
      return new NameNode(conf);
    }
  }
}
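To see what the two parsing stages do, here is a minimal sketch (the argument values are made up): GenericOptionsParser consumes generic options such as -D key=value into the Configuration, and only the leftovers reach the NameNode-specific parseArguments():

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.util.GenericOptionsParser;

public class ParserDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    String[] argv = {"-D", "dfs.replication=2", "-format"};
    GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
    // The -D pair has been absorbed into conf; prints 2.
    System.out.println(conf.get("dfs.replication"));
    // Only "-format" remains for the NameNode-specific stage.
    System.out.println(String.join(" ", hParser.getRemainingArgs()));
  }
}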

5. The NameNode constructor

public NameNode(Configuration conf) throws IOException {
  this(conf, NamenodeRole.NAMENODE);
}

protected NameNode(Configuration conf, NamenodeRole role) throws IOException {
  super(conf);
  this.tracer = new Tracer.Builder("NameNode")
      .conf(TraceUtils.wrapHadoopConf(NAMENODE_HTRACE_PREFIX, conf))
      .build();
  this.tracerConfigurationManager =
      new TracerConfigurationManager(NAMENODE_HTRACE_PREFIX, conf);
  this.role = role;

  // Set NameNode#clientNamenodeAddress, e.g. 'hdfs://localhost:9000'.
  setClientNamenodeAddress(conf);
  String nsId = getNameServiceId(conf);
  String namenodeId = HAUtil.getNameNodeId(conf, nsId);

  // HA-related setup.
  this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
  state = createHAState(getStartupOption(conf));
  this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
  this.haContext = createHAContext();
  try {
    initializeGenericKeys(conf, nsId, namenodeId);
    // Perform the actual initialization work; see step 6.
    initialize(getConf());
    // HA-related: even when HA is disabled (haEnabled == false),
    // the namenode still holds an HAState, which is then ACTIVE.
    try {
      haContext.writeLock();
      state.prepareToEnterState(haContext);
      state.enterState(haContext);
    } finally {
      haContext.writeUnlock();
    }
  } catch (IOException e) {
    this.stopAtException(e);
    throw e;
  } catch (HadoopIllegalArgumentException e) {
    this.stopAtException(e);
    throw e;
  }
  this.started.set(true);
}
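Whether the HA branch matters is decided purely by configuration. A minimal sketch, with a made-up nameservice id "mycluster", of the keys HAUtil.isHAEnabled() consults:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class HaCheckDemo {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    conf.set("dfs.nameservices", "mycluster");
    // Two NameNode ids under one nameservice make it an HA deployment.
    conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2");
    System.out.println(HAUtil.isHAEnabled(conf, "mycluster")); // true
  }
}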

6. initialize(Configuration conf): the actual initialization work

protected void initialize(Configuration conf) throws IOException {
  if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
    String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
    if (intervals != null) {
      conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,
        intervals);
    }
  }

  UserGroupInformation.setConfiguration(conf);
  loginAsNameNodeUser(conf);

  // Initialize metrics.
  NameNode.initMetrics(conf, this.getRole());
  StartupProgressMetrics.register(startupProgress);

  // Start the JvmPauseMonitor, which watches for long JVM pauses (e.g. GC).
  pauseMonitor = new JvmPauseMonitor();
  pauseMonitor.init(conf);
  pauseMonitor.start();
  metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

  // Start the HTTP server.
  if (NamenodeRole.NAMENODE == role) {
    startHttpServer(conf);
  }

  // Load the edit log and fsimage from the NameNode directories and
  // initialize FSNamesystem, FSDirectory, LeaseManager, etc.
  loadNamesystem(conf);

  // Create the RPC server. NameNodeRpcServer wraps the client and service
  // RPC servers and supports ClientNamenodeProtocolPB, DatanodeProtocolPB,
  // and other protocols.
  rpcServer = createRpcServer(conf);

  initReconfigurableBackoffKey();

  if (clientNamenodeAddress == null) {
    // This is expected for MiniDFSCluster. Set it now using
    // the RPC server's bind address.
    clientNamenodeAddress = NetUtils.getHostPortString(getNameNodeAddress());
    LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
        + " this namenode/service.");
  }
  if (NamenodeRole.NAMENODE == role) {
    httpServer.setNameNodeAddress(getNameNodeAddress());
    httpServer.setFSImage(getFSImage());
  }

  // Start several important worker threads; see step 7.
  startCommonServices(conf);
  startMetricsLogger(conf);
}
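The JvmPauseMonitor is a small, self-contained utility and can be used outside the NameNode too. A minimal sketch mirroring the init()/start() calls above; its sentinel thread sleeps for a short interval and logs a warning when the observed gap is much longer than expected, which usually indicates a GC pause:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.JvmPauseMonitor;

public class PauseMonitorDemo {
  public static void main(String[] args) throws InterruptedException {
    JvmPauseMonitor monitor = new JvmPauseMonitor();
    monitor.init(new Configuration());
    monitor.start();
    try {
      // The daemon thread now watches this JVM for long pauses.
      Thread.sleep(10_000L);
    } finally {
      monitor.stop();
    }
  }
}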

7. startCommonServices(conf): starting the key worker threads

private void startCommonServices(Configuration conf) throws IOException {
  // Create the NameNodeResourceChecker, activate the BlockManager, etc.;
  // see step 8.
  namesystem.startCommonServices(conf, haContext);
  registerNNSMXBean();

  // For roles other than NamenodeRole.NAMENODE, the HTTP server starts here.
  if (NamenodeRole.NAMENODE != role) {
    startHttpServer(conf);
    httpServer.setNameNodeAddress(getNameNodeAddress());
    httpServer.setFSImage(getFSImage());
  }

  // Start the RPC server.
  rpcServer.start();

  // Start the configured service plugins.
  try {
    plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY, ServicePlugin.class);
  } catch (RuntimeException e) {
    String pluginsValue = conf.get(DFS_NAMENODE_PLUGINS_KEY);
    LOG.error("Unable to load NameNode plugins. Specified list of plugins: " +
        pluginsValue, e);
    throw e;
  }
  for (ServicePlugin p : plugins) {
    try {
      p.start(this);
    } catch (Throwable t) {
      LOG.warn("ServicePlugin " + p + " could not be started", t);
    }
  }
  LOG.info(getRole() + " RPC up at: " + getNameNodeAddress());
  if (rpcServer.getServiceRpcAddress() != null) {
    LOG.info(getRole() + " service RPC up at: "
        + rpcServer.getServiceRpcAddress());
  }
}
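The plugin loop above is a straightforward extension point. A minimal sketch of a custom plugin (the class name and log lines are made up); deploying it would mean putting the class on the NameNode classpath and listing it under dfs.namenode.plugins:

import java.io.IOException;
import org.apache.hadoop.util.ServicePlugin;

// Hypothetical plugin: start() is invoked by the loop above with the
// NameNode instance as its argument.
public class StartupLoggingPlugin implements ServicePlugin {
  @Override
  public void start(Object service) {
    System.out.println("NameNode is up, plugin attached to: " + service);
  }

  @Override
  public void stop() {
    System.out.println("Plugin stopped.");
  }

  @Override
  public void close() throws IOException {
    // Nothing to release in this sketch.
  }
}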

8. FSNamesystem.startCommonServices(conf, haContext)

void startCommonServices(Configuration conf, HAContext haContext)
    throws IOException {
  this.registerMBean(); // register the MBean for the FSNamesystemState
  writeLock();
  this.haContext = haContext;
  try {
    // Create the NameNodeResourceChecker and check resources immediately.
    nnResourceChecker = new NameNodeResourceChecker(conf);
    checkAvailableResources();
    assert !blockManager.isPopulatingReplQueues();

    // Record startup progress.
    StartupProgress prog = NameNode.getStartupProgress();
    prog.beginPhase(Phase.SAFEMODE);

    // Get the total number of complete blocks.
    long completeBlocksTotal = getCompleteBlocksTotal();
    prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
        completeBlocksTotal);

    // Activate the BlockManager; see step 9.
    blockManager.activate(conf, completeBlocksTotal);
  } finally {
    writeUnlock("startCommonServices");
  }
  registerMXBean();
  DefaultMetricsSystem.instance().register(this);
  if (inodeAttributeProvider != null) {
    inodeAttributeProvider.start();
    dir.setINodeAttributeProvider(inodeAttributeProvider);
  }
  snapshotManager.registerMXBean();
  InetSocketAddress serviceAddress = NameNode.getServiceAddress(conf, true);
  this.nameNodeHostName = (serviceAddress != null) ?
      serviceAddress.getHostName() : "";
}
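The SAFEMODE phase begun here is observable from the client side. A minimal sketch, assuming a running cluster reachable via fs.defaultFS; while the NameNode is still waiting for block reports it prints true:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;

public class SafeModeProbe {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(conf);
    // SAFEMODE_GET only queries the state; it does not change it.
    boolean inSafeMode =
        dfs.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_GET);
    System.out.println("NameNode in safe mode: " + inSafeMode);
  }
}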

9. activate(conf, completeBlocksTotal): activating the BlockManager

// blockManager.activate() mainly starts the PendingReplicationMonitor,
// DecommissionManager#Monitor, HeartbeatManager#Monitor, and the
// ReplicationMonitor.
public void activate(Configuration conf, long blockTotal) {
  // Start the PendingReplicationMonitor.
  pendingReplications.start();

  // Activate the DatanodeManager: starts the DecommissionManager monitor
  // and the HeartbeatManager monitor.
  datanodeManager.activate(conf);

  // Start the BlockManager's ReplicationMonitor.
  this.replicationThread.setName("ReplicationMonitor");
  this.replicationThread.start();
  this.blockReportThread.start();

  mxBeanName = MBeans.register("NameNode", "BlockStats", this);
  bmSafeMode.activate(blockTotal);
}
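All of the monitors started above follow the same daemon-thread pattern. A simplified, hypothetical sketch of that pattern (the real monitors live inside BlockManager, DatanodeManager, and friends, each with its own interval and work queue):

// Hypothetical illustration of the periodic-monitor pattern.
public class PeriodicMonitor implements Runnable {
  private volatile boolean shouldRun = true;

  @Override
  public void run() {
    while (shouldRun) {
      try {
        scanForWork();          // e.g. find timed-out replication requests
        Thread.sleep(3_000L);   // each monitor has its own check interval
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

  private void scanForWork() {
    // The real monitors requeue expired work, mark dead DataNodes, etc.
  }

  public void shutdown() {
    shouldRun = false;
  }
}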
Tags: hdfs, big data, hadoop

Reposted from: https://blog.csdn.net/manshiai/article/details/126287854
Copyright belongs to the original author, 小光芒. If there is any infringement, please contact us for removal.
