namenode的主要责任是文件元信息与数据块映射的管理。相应的,namenode的启动流程需要关注与客户端、datanode通信的工作线程,文件元信息的管理机制,数据块的管理机制等。其中,RpcServer主要负责与客户端、datanode通信,FSDirectory主要负责管理文件元信息。流程如下:
1.执行命令启动hdfs集群
start-dfs.sh
该命令会启动Hdfs的NameNode以及DataNode,启动NameNode主要是通过org.apache.hadoop.hdfs.server.namenode.NameNode类。
2.NameNode启动主流程
publicclassNameNodeextendsReconfigurableBaseimplementsNameNodeStatusMXBean{//主要初始化一些HDFS的配置信息static{//点进去发现init方法为空,但并不是什么也没做,//可以观察HdfsConfiguration的静态代码块,源码跳转到3查看HdfsConfiguration.init();}//代码省略 。。。。。。。。。。//main方法publicstaticvoidmain(String argv[])throwsException{//解析传入的参数是否为usage参数,如果是的话打印usage帮助信息并退出if(DFSUtil.parseHelpArgument(argv,NameNode.USAGE,System.out,true)){System.exit(0);}try{//格式化输出启动信息,并创建hook,打印节点关闭信息StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);//创建NameNode,源码跳转到4查看NameNode namenode =createNameNode(argv,null);if(namenode !=null){//namenode加入集群
namenode.join();}}catch(Throwable e){//启动异常
LOG.error("Failed to start namenode.", e);terminate(1, e);}}
3.HdfsConfiguration类,HDFS配置参数
publicclassHdfsConfigurationextendsConfiguration{//静态代码块,初始化hdfs一些配置信息static{addDeprecatedKeys();// adds the default resources//添加配置文件Configuration.addDefaultResource("hdfs-default.xml");Configuration.addDefaultResource("hdfs-site.xml");}publicHdfsConfiguration(){super();}publicHdfsConfiguration(boolean loadDefaults){super(loadDefaults);}publicHdfsConfiguration(Configuration conf){super(conf);}/**
* This method is here so that when invoked, HdfsConfiguration is class-loaded
* if it hasn't already been previously loaded. Upon loading the class, the
* static initializer block above will be executed to add the deprecated keys
* and to add the default resources. It is safe for this method to be called
* multiple times as the static initializer block will only get invoked once.
*
* This replaces the previously, dangerous practice of other classes calling
* Configuration.addDefaultResource("hdfs-default.xml") directly without
* loading this class first, thereby skipping the key deprecation.
*/publicstaticvoidinit(){}privatestaticvoidaddDeprecatedKeys(){Configuration.addDeprecations(newDeprecationDelta[]{newDeprecationDelta("dfs.backup.address",DeprecatedKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY),//。。。。。省略多个配置参数newDeprecationDelta("dfs.encryption.key.provider.uri",CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH),});}publicstaticvoidmain(String[] args){init();Configuration.dumpDeprecatedKeys();}}
4.createNameNode()创建NameNode
publicstaticNameNodecreateNameNode(String argv[],Configuration conf)throwsIOException{
LOG.info("createNameNode "+Arrays.asList(argv));if(conf ==null)
conf =newHdfsConfiguration();// Parse out some generic args into Configuration.//解析一些通用参数到配置项GenericOptionsParser hParser =newGenericOptionsParser(conf, argv);
argv = hParser.getRemainingArgs();// Parse the rest, NN specific args.//解析NameNode启动参数StartupOption startOpt =parseArguments(argv);if(startOpt ==null){printUsage(System.err);returnnull;}setStartupOption(conf, startOpt);switch(startOpt){case FORMAT:{boolean aborted =format(conf, startOpt.getForceFormat(),
startOpt.getInteractiveFormat());terminate(aborted ?1:0);returnnull;// avoid javac warning}case GENCLUSTERID:{System.err.println("Generating new cluster id:");System.out.println(NNStorage.newClusterID());terminate(0);returnnull;}//省略多个case分支...case UPGRADEONLY:{DefaultMetricsSystem.initialize("NameNode");newNameNode(conf);terminate(0);returnnull;}//正常启动进入该分支default:{//初始化metric系统DefaultMetricsSystem.initialize("NameNode");//返回新的NameNode,源码跳转到5查看returnnewNameNode(conf);}}}
5.NameNode构造函数
publicNameNode(Configuration conf)throwsIOException{this(conf,NamenodeRole.NAMENODE);}protectedNameNode(Configuration conf,NamenodeRole role)throwsIOException{super(conf);this.tracer =newTracer.Builder("NameNode").conf(TraceUtils.wrapHadoopConf(NAMENODE_HTRACE_PREFIX, conf)).build();this.tracerConfigurationManager =newTracerConfigurationManager(NAMENODE_HTRACE_PREFIX, conf);this.role = role;//设置NameNode#clientNameNodeAddress为'hdfs://localhost:9000'setClientNamenodeAddress(conf);String nsId =getNameServiceId(conf);String namenodeId =HAUtil.getNameNodeId(conf, nsId);//HA相关this.haEnabled =HAUtil.isHAEnabled(conf, nsId);
state =createHAState(getStartupOption(conf));this.allowStaleStandbyReads =HAUtil.shouldAllowStandbyReads(conf);this.haContext =createHAContext();try{initializeGenericKeys(conf, nsId, namenodeId);//完成实际的初始化工作,源码跳转到6查看initialize(getConf());//HA相关//尽管本地没有开启HA(haEnabled=false**),namenode依然拥有一个HAState//namenode的HAState状态为active.try{
haContext.writeLock();
state.prepareToEnterState(haContext);
state.enterState(haContext);}finally{
haContext.writeUnlock();}}catch(IOException e){this.stopAtException(e);throw e;}catch(HadoopIllegalArgumentException e){this.stopAtException(e);throw e;}this.started.set(true);}
6.initialize(Configuration conf) 完成实际的初始化工作
protectedvoidinitialize(Configuration conf)throwsIOException{if(conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS)==null){String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);if(intervals !=null){
conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,
intervals);}}UserGroupInformation.setConfiguration(conf);loginAsNameNodeUser(conf);//初始化metricNameNode.initMetrics(conf,this.getRole());StartupProgressMetrics.register(startupProgress);//启动JvmPauseMonitor等,反向监控JVM
pauseMonitor =newJvmPauseMonitor();
pauseMonitor.init(conf);
pauseMonitor.start();
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);//启动httpserverif(NamenodeRole.NAMENODE == role){startHttpServer(conf);}//从NameNode目录加载editlog和fsimage,初始化FsNameSystem,FsDirectory,LeaseManager等loadNamesystem(conf);//创建rpcserver,封装了NameNodeRpcServer、ClientRPCServer//支持ClientNameNodeProtocol、DataNodeProtocolPB等协议
rpcServer =createRpcServer(conf);initReconfigurableBackoffKey();if(clientNamenodeAddress ==null){// This is expected for MiniDFSCluster. Set it now using // the RPC server's bind address.
clientNamenodeAddress =NetUtils.getHostPortString(getNameNodeAddress());
LOG.info("Clients are to use "+ clientNamenodeAddress +" to access"+" this namenode/service.");}if(NamenodeRole.NAMENODE == role){
httpServer.setNameNodeAddress(getNameNodeAddress());
httpServer.setFSImage(getFSImage());}//启动执行多个重要的工作线程,源码跳转到7startCommonServices(conf);startMetricsLogger(conf);}
7.startCommonServices(conf) 启动执行多个重要工作的线程
privatevoidstartCommonServices(Configuration conf)throwsIOException{//创建NameNodeResourceChecker、激活BlockManager等//源码跳转到8
namesystem.startCommonServices(conf, haContext);registerNNSMXBean();//如果role不为NameNode.NAMENODE的在此处启动Httpserverif(NamenodeRole.NAMENODE != role){startHttpServer(conf);
httpServer.setNameNodeAddress(getNameNodeAddress());
httpServer.setFSImage(getFSImage());}//启动RpcServer
rpcServer.start();//启动各种插件try{
plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,ServicePlugin.class);}catch(RuntimeException e){String pluginsValue = conf.get(DFS_NAMENODE_PLUGINS_KEY);
LOG.error("Unable to load NameNode plugins. Specified list of plugins: "+
pluginsValue, e);throw e;}for(ServicePlugin p: plugins){try{
p.start(this);}catch(Throwable t){
LOG.warn("ServicePlugin "+ p +" could not be started", t);}}
LOG.info(getRole()+" RPC up at: "+getNameNodeAddress());if(rpcServer.getServiceRpcAddress()!=null){
LOG.info(getRole()+" service RPC up at: "+ rpcServer.getServiceRpcAddress());}}
8.startCommonServices(conf, haContext)
voidstartCommonServices(Configuration conf,HAContext haContext)throwsIOException{this.registerMBean();// register the MBean for the FSNamesystemStatewriteLock();this.haContext = haContext;try{//创建NameNodeResourceChecker,并立即检查一次
nnResourceChecker =newNameNodeResourceChecker(conf);checkAvailableResources();assert!blockManager.isPopulatingReplQueues();//设置一些启动过程中的信息StartupProgress prog =NameNode.getStartupProgress();
prog.beginPhase(Phase.SAFEMODE);//获取已完成的数据块总量long completeBlocksTotal =getCompleteBlocksTotal();
prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
completeBlocksTotal);//激活BlockManager,源码跳转到9
blockManager.activate(conf, completeBlocksTotal);}finally{writeUnlock("startCommonServices");}registerMXBean();DefaultMetricsSystem.instance().register(this);if(inodeAttributeProvider !=null){
inodeAttributeProvider.start();
dir.setINodeAttributeProvider(inodeAttributeProvider);}
snapshotManager.registerMXBean();InetSocketAddress serviceAddress =NameNode.getServiceAddress(conf,true);this.nameNodeHostName =(serviceAddress !=null)?
serviceAddress.getHostName():"";}
9.activate(conf, completeBlocksTotal)激活BlockManager
//blockManager.activate(conf)激活BlockManager主要完成PendingReplicationMonitor、// DecommissionManager#Monitor、HeartbeatManager#Monitor、ReplicationMonitorpublicvoidactivate(Configuration conf,long blockTotal){// 启动PendingReplicationMonitor
pendingReplications.start();// 激活DatanodeManager:启动DecommissionManager--Monitor、HeartbeatManager--Monitor
datanodeManager.activate(conf);this.replicationThread.setName("ReplicationMonitor");// 启动BlockManager--ReplicationMonitorthis.replicationThread.start();this.blockReportThread.start();
mxBeanName =MBeans.register("NameNode","BlockStats",this);
bmSafeMode.activate(blockTotal);}
版权归原作者 小光芒 所有, 如有侵权,请联系我们删除。