Hadoop MapReduce Source Code Walkthrough: The Client Side

I. Downloading the Source Code

Below is the official Hadoop source download page. I downloaded hadoop-3.2.4, so let's walk through it together.

Index of /dist/hadoop/core

II. Entering the Source Code from WordCount

After loading the source into IDEA, find the org.apache.hadoop.examples.WordCount class (shortcut: double-press Shift and type WordCount).

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.examples;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper 
       extends Mapper<Object, Text, Text, IntWritable>{
    
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
      
    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }
  
  public static class IntSumReducer 
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    //Build a new Configuration; its static block loads the core-default.xml and core-site.xml configuration
    //If core-site.xml marks a property as final, users cannot override it
    Configuration conf = new Configuration();
    //Parse the options the user specified on the command line and apply them to the configuration
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length < 2) {
      System.err.println("Usage: wordcount <in> [<in>...] <out>");
      System.exit(2);
    }
    //Create a new Job from the configuration and a job name; the Job is the view the submitter works with
    //The Cluster is null at this point; it is only created from conf when it is actually needed
    Job job = Job.getInstance(conf, "word count");
    //Set the job's jar by finding where the given example class came from; the job state is DEFINE at this point
    job.setJarByClass(WordCount.class);
    //Set the Mapper for the job; the class must be a subclass of Mapper, and mapreduce.job.map.class is set to it
    job.setMapperClass(TokenizerMapper.class);
    //Set the combiner for the job; the class must be a subclass of Reducer, and mapreduce.job.combine.class is set to it
    job.setCombinerClass(IntSumReducer.class);
    //Set the Reducer for the job; the class must be a subclass of Reducer, and mapreduce.job.reduce.class is set to it
    job.setReducerClass(IntSumReducer.class);
    //Set the job's output key class, i.e. mapreduce.job.output.key.class
    job.setOutputKeyClass(Text.class);
    //Set the job's output value class, i.e. mapreduce.job.output.value.class
    job.setOutputValueClass(IntWritable.class);
    //Set the input paths; mapreduce.input.fileinputformat.inputdir becomes a comma-separated list of the input paths
    for (int i = 0; i < otherArgs.length - 1; ++i) {
      FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
    }
    //Set the output path, i.e. mapreduce.output.fileoutputformat.outputdir
    FileOutputFormat.setOutputPath(job,
      new Path(otherArgs[otherArgs.length - 1]));
    //Submit the job to the cluster and wait for it to finish; passing true means monitor the job and print its status in real time
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
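
The two lines at the top of main() are what make command-line overrides work: new Configuration() loads core-default.xml and core-site.xml, and GenericOptionsParser consumes the generic options (-D, -files, -libjars, ...) before the job-specific arguments are used. Below is a minimal, hedged sketch of that behaviour; the class name, the property and the /in and /out paths are just illustrative values, not anything the job requires.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.GenericOptionsParser;

public class GenericOptionsDemo {
  public static void main(String[] args) throws Exception {
    // new Configuration() loads core-default.xml and core-site.xml via its static block
    Configuration conf = new Configuration();
    // Simulate a command line such as: wordcount -D mapreduce.job.queuename=default /in /out
    String[] cliArgs = {"-D", "mapreduce.job.queuename=default", "/in", "/out"};
    // GenericOptionsParser applies the -D override to conf and returns the rest untouched
    String[] remaining = new GenericOptionsParser(conf, cliArgs).getRemainingArgs();
    System.out.println(conf.get("mapreduce.job.queuename")); // default
    System.out.println(String.join(" ", remaining));         // /in /out
  }
}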

With the comments in place, let's start the source walk from job.waitForCompletion(true).

Many methods are involved and we cannot look at every one of them, so we will only follow the main call path and the important methods.

1. Job.waitForCompletion

/**
   * Submit the job to the cluster and wait for it to finish.
   * @param verbose print the progress to the user
   * @return true if the job succeeded
   * @throws IOException thrown if the communication with the 
   *         <code>JobTracker</code> is lost
   */
  public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    //If the job state is DEFINE at this point, submit it
    if (state == JobState.DEFINE) {
      //Submit the job to the cluster and return immediately
      submit();
    }
    //If the verbose argument is true, print the job status in real time
    if (verbose) {
      //Monitor the job and print status in real time as progress is made and tasks run
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      //Get the completion poll interval from the client, configurable via mapreduce.client.completion.pollinterval,
      //default 5000 ms: the interval, in milliseconds, at which the JobClient polls the MapReduce ApplicationMaster for updates about job status.
      //A shorter interval helps when testing with small data; a longer interval in production reduces client-server traffic
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      //Check whether the job has completed. This is a non-blocking call.
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }
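
As the comments note, the non-verbose branch simply polls isComplete() at the interval configured by mapreduce.client.completion.pollinterval. If you want to watch that branch react faster in a small local test, the interval can be lowered before the Job is created; a minimal sketch, where the 1000 ms value is an arbitrary example (the default is 5000 ms):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class PollIntervalDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Poll the ApplicationMaster for completion every second instead of every 5 seconds
    conf.setInt("mapreduce.client.completion.pollinterval", 1000);
    Job job = Job.getInstance(conf, "word count");
    // waitForCompletion(false) would now sleep 1000 ms between isComplete() checks
    System.out.println(job.getConfiguration()
        .getInt("mapreduce.client.completion.pollinterval", 5000));
  }
}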

2. Job.submit

/**
   * Submit the job to the cluster and return immediately.
   * @throws IOException
   */
  public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    //Default to the new API unless the classes were set explicitly or the old mapper/reducer properties are in use
    setUseNewAPI();
    //Use the impersonation (doAs) mechanism to build a Cluster for the authenticated, authorized user
    //A Cluster provides a way to access information about the map/reduce cluster
    connect();
    //Get a JobSubmitter, literally the thing that submits the job (constructed from the file system and the client)
    //The job client can use it to submit jobs for execution and to learn about the current system state
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
        //Internal method used to submit the job to the system
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    //Change the job state to RUNNING
    state = JobState.RUNNING;
    //Get the URL where the job's progress information can be viewed
    LOG.info("The url to track the job: " + getTrackingURL());
   }
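
The connect() call and the ugi.doAs(...) wrapper above are the standard UserGroupInformation impersonation pattern. The sketch below shows the same pattern outside of Job; the proxy user "alice" and the path are made-up examples, and real impersonation additionally requires the hadoop.proxyuser.* settings to allow it on the cluster side.

import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

public class DoAsDemo {
  public static void main(String[] args) throws Exception {
    // Run the enclosed action as "alice" instead of the login user
    UserGroupInformation ugi = UserGroupInformation.createProxyUser(
        "alice", UserGroupInformation.getCurrentUser());
    FileStatus[] listing = ugi.doAs(
        (PrivilegedExceptionAction<FileStatus[]>) () -> {
          FileSystem fs = FileSystem.get(new Configuration());
          return fs.listStatus(new Path("/user/alice"));
        });
    System.out.println(listing.length + " entries");
  }
}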

3. JobSubmitter.submitJobInternal

/**
* Internal method for submitting a job to the system.
* The job submission process involves:
*    1. Checking the input and output specifications of the job
*    2. Computing the InputSplits for the job
*    3. Setting up the requisite accounting information for the job's DistributedCache, if necessary
*    4. Copying the job's jar and configuration to the map-reduce system directory on the distributed file system
*    5. Submitting the job to the ResourceManager and optionally monitoring its status
* @param job the configuration to submit
* @param cluster the handle to the Cluster
* @throws ClassNotFoundException
* @throws InterruptedException
* @throws IOException
*/
JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {

    //Validate the job's output specification; if the output directory already exists, throw to avoid overwriting it
    checkSpecs(job);

    //Get the Configuration from the Job; we just created the Job from a configuration, so each can be obtained from the other
    Configuration conf = job.getConfiguration();
    //Add the MapReduce framework archive path to the distributed cache in conf
    //If a MapReduce framework archive path is configured (usually a public location on HDFS), the archive is automatically shipped with the job
    addMRFrameworkToDistributedCache(conf);
    
    //Initialize the staging directory and return its path
    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    //configure the command line options correctly on the submitting dfs

    //Return the address of the local host: the host name is retrieved from the system and then resolved to an InetAddress
    //Note: the resolved address may be cached for a short period of time
    //If there is a security manager and it blocks the operation, an InetAddress representing the loopback address is returned
    //All of the system's network interfaces are looked up, but only the first one is returned
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
      //Record the submitting host's IP address
      submitHostAddress = ip.getHostAddress();
      //Record the submitting host's hostname
      submitHostName = ip.getHostName();
      //Set the job property mapreduce.job.submithostname
      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
      //Set the job property mapreduce.job.submithostaddress
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }
    //Have the submission client allocate a unique job ID for the job
    JobID jobId = submitClient.getNewJobID();
    //Set the job ID on the job
    job.setJobID(jobId);
    //Build the job's submit directory under the staging area
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    try {
      //Set mapreduce.job.user.name
      conf.set(MRJobConfig.USER_NAME,
          UserGroupInformation.getCurrentUser().getShortUserName());
      //Set hadoop.http.filter.initializers; the default filter class is org.apache.hadoop.http.lib.StaticUserWebFilter
      //Here it is set to AmFilterInitializer
      //The property is a comma-separated list of class names that must be subclasses of FilterInitializer
      //These filters are applied to all user-facing JSP and servlet pages
      conf.set("hadoop.http.filter.initializers", 
          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
      //Set mapreduce.job.dir
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
          + " as the submit dir");
      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { submitJobDir }, conf);
      
      //Get secrets and tokens and store them in the TokenCache
      populateTokenCache(conf, job.getCredentials());

      // generate a secret to authenticate shuffle transfers
      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
        KeyGenerator keyGen;
        try {
          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
          keyGen.init(SHUFFLE_KEY_LENGTH);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating shuffle secret key", e);
        }
        //Set the shuffle secret key; shuffle transfers are authenticated, so their data integrity is protected
        SecretKey shuffleKey = keyGen.generateKey();
        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
            job.getCredentials());
      }
      //Check whether intermediate MapReduce spill files should be encrypted; default false (mapreduce.job.encrypted-intermediate-data)
      if (CryptoUtils.isEncryptedSpillEnabled(conf)) {
        //If encryption is enabled, set the maximum number of job attempts to 1 (the default is 2)
        //This property is the maximum number of application attempts; on failure the ApplicationMaster is retried
        conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 1);
        LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
                "data spill is enabled");
      }
      //Upload and configure the files, libjars, jobjar and archives associated with the job
      //If the shared cache is enabled, this client uses the shared cache for libjars, files, archives and the jobjar
      //1. Resources that have already been successfully shared keep being used in shared mode
      //2. Resources that are not in the cache and would need to be uploaded by the NM are not requested for upload
      copyAndConfigureFiles(job, submitJobDir);
      
      //Get the path of the job conf file
      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
      
      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      //Focus on this method: it computes the input splits for the job
      int maps = writeSplits(job, submitJobDir);
      //Set the number of maps (mapreduce.job.maps); note that the number of splits = the number of map tasks
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);

      //Get the maximum number of maps (mapreduce.job.max.map), default -1, i.e. no limit
      int maxMaps = conf.getInt(MRJobConfig.JOB_MAX_MAP,
          MRJobConfig.DEFAULT_JOB_MAX_MAP);
      if (maxMaps >= 0 && maxMaps < maps) {
        throw new IllegalArgumentException("The number of map tasks " + maps +
            " exceeded limit " + maxMaps);
      }

      // write "queue admins of the queue to which job is being submitted"
      // to job file.

      //Get the queue name (mapreduce.job.queuename), default "default"
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
          JobConf.DEFAULT_QUEUE_NAME);
      //Get the administrators of the given job queue; this method is for internal Hadoop use only
      AccessControlList acl = submitClient.getQueueAdmins(queue);
      //Set mapred.queue.<queue>.acl-administer-jobs (mapred.queue.default.acl-administer-jobs for the default queue)
      conf.set(toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      TokenCache.cleanUpTokenReferral(conf);

      //Check mapreduce.job.token.tracking.ids.enabled (whether to track the IDs of the tokens used by the job, default false)
      if (conf.getBoolean(
          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
        // Add HDFS tracking ids
        ArrayList<String> trackingIds = new ArrayList<String>();
        for (Token<? extends TokenIdentifier> t :
            job.getCredentials().getAllTokens()) {
          trackingIds.add(t.decodeIdentifier().getTrackingId());
        }
        conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
            trackingIds.toArray(new String[trackingIds.size()]));
      }

      // Set reservation info if it exists
      //The property is mapreduce.job.reservation.id
      ReservationId reservationId = job.getReservationId();
      if (reservationId != null) {
        conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
      }

      // Write job file to submit dir
      //The submit directory lives on the distributed file system (HDFS)
      writeConf(conf, submitJobFile);
      
      //
      // Now, actually submit the job (using the submit name)
      //This calls YARNRunner.submitJob(); we will look at that method next
      printTokens(jobId, job.getCredentials());
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());
      if (status != null) {
        return status;
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null)
          jtFs.delete(submitJobDir, true);

      }
    }
  }
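
checkSpecs(job) at the top of this method is what raises the familiar "output directory already exists" error. A common client-side pattern is to remove a scratch output path before submitting; a minimal sketch (deleting is destructive, so this is only sensible for throw-away output directories):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CleanOutputDirDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path out = new Path(args[0]);
    FileSystem fs = out.getFileSystem(conf);
    // checkSpecs() would throw if this directory exists, so delete it first (recursively)
    if (fs.exists(out)) {
      fs.delete(out, true);
    }
  }
}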

3.1 JobSubmitter.writeSplits

private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
      Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    JobConf jConf = (JobConf)job.getConfiguration();
    int maps;
    //Defaults to false, but Job.submit() set it to true via setUseNewAPI()
    if (jConf.getUseNewMapper()) {
      //Focus on this method
      maps = writeNewSplits(job, jobSubmitDir);
    } else {
      maps = writeOldSplits(jConf, jobSubmitDir);
    }
    return maps;
  }

3.2 JobSubmitter.writeNewSplits

private <T extends InputSplit>
  int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
      InterruptedException, ClassNotFoundException {
    Configuration conf = job.getConfiguration();
    //Get the InputFormat class; configurable via mapreduce.job.inputformat.class,
    //default TextInputFormat.class
    InputFormat<?, ?> input =
      ReflectionUtils.newInstance(job.getInputFormatClass(), conf);

    //Focus on this method
    //Logically split the set of input files for the job
    //Each InputSplit is assigned to an individual Mapper for processing (number of splits = number of map tasks)
    //Note: an InputSplit is a logical split (e.g. <input file path, start offset, length>) and does not change the file's underlying blocks
    //The InputFormat also creates the RecordReader used to read each InputSplit
    List<InputSplit> splits = input.getSplits(job);
    T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);

    // sort the splits into order based on size, so that the biggest
    // go first
    Arrays.sort(array, new SplitComparator());
    JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
        jobSubmitDir.getFileSystem(conf), array);
    return array.length;
  }
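
As the comment above notes, the InputFormat is taken from mapreduce.job.inputformat.class and defaults to TextInputFormat. Overriding it is a one-line change in the driver; a minimal sketch using another InputFormat that ships with Hadoop (whether it suits a particular job's input is a separate question):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

public class InputFormatDemo {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration(), "word count");
    // Sets mapreduce.job.inputformat.class, so writeNewSplits() will instantiate
    // CombineTextInputFormat and call its getSplits() instead of TextInputFormat's
    job.setInputFormatClass(CombineTextInputFormat.class);
  }
}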

3.3 FileInputFormat.getSplits

/** 
   * Generate the list of files and make them into FileSplits.
   * @param job the job context
   * @throws IOException
   */
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    StopWatch sw = new StopWatch().start();
    //getFormatMinSplitSize() returns 1
    //getMinSplitSize(job) reads mapreduce.input.fileinputformat.split.minsize, default 1
    //Take the max of the two; since both default to 1, minSize = 1
    long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    //Read mapreduce.input.fileinputformat.split.maxsize, default Long.MAX_VALUE (2^63 - 1, i.e. 0x7fffffffffffffffL)
    long maxSize = getMaxSplitSize(job);

    // generate splits
    // Declare the list of splits
    List<InputSplit> splits = new ArrayList<InputSplit>();
    //List the input directories, selecting only files that match the configured path filter
    List<FileStatus> files = listStatus(job);

    //Read mapreduce.input.fileinputformat.input.dir.recursive, default false
    //Read mapreduce.input.fileinputformat.input.dir.nonrecursive.ignore.subdirs, default false
    //ignoreDirs is true only when recursive listing is disabled and ignore.subdirs is true
    boolean ignoreDirs = !getInputDirRecursive(job)
      && job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
    //Loop over each input file and compute all of the InputSplits
    for (FileStatus file: files) {
      //Skip directories
      if (ignoreDirs && file.isDirectory()) {
        continue;
      }
      //FileStatus represents the client-side information about a file
      //A Path is the name of a file or directory in a FileSystem
      //Get the Path from the FileStatus
      Path path = file.getPath();
      //Get the length of this file, in bytes
      long length = file.getLen();
      //If the file length is not 0
      if (length != 0) {
        //A BlockLocation describes a block's network location, the hosts holding replicas of the block,
        //and other block metadata (e.g. the block's offset within the file, its length, whether it is corrupt, etc.)
        //If the file has 3 replicas, the BlockLocation's offset and length are absolute values within the file, and the hosts are the 3 datanodes holding the replicas. Example:
        //BlockLocation(offset: 0, length: BLOCK_SIZE, hosts: {"host1:9866", "host2:9866", "host3:9866"})
        //If the file is erasure coded, each BlockLocation represents a logical block group: the offset is the block group's offset within the file and the length is the total length of the block group. Its hosts are all the datanodes holding the data blocks and parity blocks of the group.
        //Suppose we have an RS_3_2 coded file (3 data units and 2 parity units); an example BlockLocation looks like:
        //BlockLocation(offset: 0, length: 3 * BLOCK_SIZE, hosts: {"host1:9866","host2:9866","host3:9866","host4:9866","host5:9866"})
        BlockLocation[] blkLocations;
        //Check whether the file is an instance of LocatedFileStatus
        //and get the file's list of block locations
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          FileSystem fs = path.getFileSystem(job.getConfiguration());
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        //Is the file splittable? Usually yes, but not if it is stream-compressed
        if (isSplitable(job, path)) {
          //Get this file's block size; HDFS lets each file specify its own block size and replication factor
          long blockSize = file.getBlockSize();
          //Compute the split size for this file:
          //Math.max(minSize, Math.min(maxSize, blockSize));
          //minSize defaults to 1
          //maxSize defaults to Long.MAX_VALUE
          //so by default this file's split size = blockSize (the file's block size)
          long splitSize = computeSplitSize(blockSize, minSize, maxSize);

          //Initially the remaining byte count equals the file's total length
          long bytesRemaining = length;
          //remaining bytes / split size (by default this file's block size) > 1.1
          //Meaning: keep looping while the remaining bytes are more than 1.1 times the split size
          //    If a file has only one block, this loop is never entered
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            //length - bytesRemaining is the current offset within the file
            //Find the index of the block that contains this offset
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            //Add a split
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                        blkLocations[blkIndex].getHosts(),
                        blkLocations[blkIndex].getCachedHosts()));
            bytesRemaining -= splitSize;
          }

          //Normally the last split is added here, or the whole file if it is small enough to have only one block
          if (bytesRemaining != 0) {
            int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
            splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                       blkLocations[blkIndex].getHosts(),
                       blkLocations[blkIndex].getCachedHosts()));
          }
        } else { // not splitable
          if (LOG.isDebugEnabled()) {
            // Log only if the file is big enough to be splitted
            if (length > Math.min(file.getBlockSize(), minSize)) {
              LOG.debug("File is not splittable so no parallelization "
                  + "is possible: " + file.getPath());
            }
          }
          //Make one split for the whole file (number of splits = number of such files), placed on the hosts of the file's first block (its cached hosts are recorded as well)
          splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                      blkLocations[0].getCachedHosts()));
        }
      } else { 
        //Create empty hosts array for zero length files
        //If the input file is zero-length, add an empty split
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    // Save the number of input files for metrics/loadgen
    // Record the number of input files on the job (mapreduce.input.fileinputformat.numinputfiles)
    job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
    sw.stop();
    if (LOG.isDebugEnabled()) {
      LOG.debug("Total # of splits generated by getSplits: " + splits.size()
          + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    }
    return splits;
  }
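
To make computeSplitSize and the 1.1 SPLIT_SLOP check concrete, here is a small worked example with made-up numbers: a single 300 MB file, a 128 MB block size, and the default minSize/maxSize. It is plain Java that mirrors the loop above rather than calling Hadoop itself.

public class SplitSizeExample {
  public static void main(String[] args) {
    long blockSize = 128L * 1024 * 1024;   // this file's block size: 128 MB
    long length    = 300L * 1024 * 1024;   // file length: 300 MB
    long minSize   = 1L;                   // default minSize
    long maxSize   = Long.MAX_VALUE;       // default maxSize
    long splitSize = Math.max(minSize, Math.min(maxSize, blockSize)); // 128 MB

    long bytesRemaining = length;
    int splitCount = 0;
    // Same condition as getSplits(): keep splitting while more than 1.1 split sizes remain
    while ((double) bytesRemaining / splitSize > 1.1) {
      splitCount++;                        // splits at offsets 0 and 128 MB
      bytesRemaining -= splitSize;
    }
    if (bytesRemaining != 0) {
      splitCount++;                        // final 44 MB split
    }
    System.out.println(splitCount);        // 3 splits -> 3 map tasks
  }
}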

4. YARNRunner.submitJob

public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
  throws IOException, InterruptedException {
    //Add tokens
    addHistoryToken(ts);

    //Build all the information needed to launch the MapReduce ApplicationMaster:
    //    1. Set up the LocalResources (the local resources a container needs to run; the NodeManager localizes them before launching the container)
    //    2. Set up the security tokens
    //    3. Set up the ContainerLaunchContext for the ApplicationMaster container (everything the NodeManager needs to launch the container: the ContainerId, resources, who it is allocated to, security tokens, environment variables, the command to launch the container, the retry policy on container failure, and everything required to run the container such as binaries, jars, shared objects, auxiliary files, etc.)
    //    4. Set up the ApplicationSubmissionContext (everything the ResourceManager needs to launch the application's ApplicationMaster: the ApplicationId, user, name, priority, the ContainerLaunchContext of the container that runs the ApplicationMaster, the maximum number of attempts, the attempt interval, and everything the NodeManager needs to handle the application's logs)
    //    5. Set up the ApplicationMaster resource request
    //    6. Set the label for the AM container request, if present
    //    7. Set the labels for the job containers
    ApplicationSubmissionContext appContext =
      createApplicationSubmissionContext(conf, jobSubmitDir, ts);

    // Submit to ResourceManager
    try {
      //Ultimately a YarnClient is used to submit to YARN
      ApplicationId applicationId =
          resMgrDelegate.submitApplication(appContext);

      ApplicationReport appMaster = resMgrDelegate
          .getApplicationReport(applicationId);
      String diagnostics =
          (appMaster == null ?
              "application report is null" : appMaster.getDiagnostics());
      if (appMaster == null
          || appMaster.getYarnApplicationState() == YarnApplicationState.FAILED
          || appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
        throw new IOException("Failed to run job : " +
            diagnostics);
      }
      return clientCache.getClient(jobId).getJobStatus(jobId);
    } catch (YarnException e) {
      throw new IOException(e);
    }
  }
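
createApplicationSubmissionContext() is long, but the objects it fills in are the ordinary YARN client records described in the comment above. A stripped-down sketch of the same shape is shown below; the launch command, memory, vcores and attempt count are placeholders, nothing like the real AM launch command that YARNRunner assembles.

import java.util.Collections;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records;

public class AppContextSketch {
  public static ApplicationSubmissionContext sketch() {
    // Everything the NodeManager needs to launch the AM container
    ContainerLaunchContext amContainer =
        Records.newRecord(ContainerLaunchContext.class);
    amContainer.setCommands(Collections.singletonList(
        "placeholder-am-launch-command 1>stdout 2>stderr"));

    // Everything the ResourceManager needs to launch the ApplicationMaster
    ApplicationSubmissionContext appContext =
        Records.newRecord(ApplicationSubmissionContext.class);
    appContext.setApplicationName("word count");
    appContext.setQueue("default");
    appContext.setAMContainerSpec(amContainer);
    appContext.setResource(Resource.newInstance(1536, 1)); // AM memory (MB) and vcores
    appContext.setMaxAppAttempts(2);
    return appContext;
  }
}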

5. YarnClientImpl.submitApplication

/**
 * Submit a new application to YARN. This is a blocking call: it will not return the
 * ApplicationId until the submitted application has been successfully submitted to and accepted by the ResourceManager.
 * Users should provide the ApplicationId as part of the ApplicationSubmissionContext parameter when submitting a new application.
 * Internally this calls ApplicationClientProtocol.submitApplication() and then ApplicationClientProtocol.getApplicationReport().
 * 
 */
public ApplicationId
      submitApplication(ApplicationSubmissionContext appContext)
          throws YarnException, IOException {
    //Get the applicationId
    ApplicationId applicationId = appContext.getApplicationId();
    if (applicationId == null) {
      throw new ApplicationIdNotProvidedException(
          "ApplicationId is not provided in ApplicationSubmissionContext");
    }
    //Build a SubmitApplicationRequest (the request information for submitting an application to the ResourceManager)
    SubmitApplicationRequest request =
        Records.newRecord(SubmitApplicationRequest.class);
    request.setApplicationSubmissionContext(appContext);

    // Automatically add the timeline DT into the CLC
    // Only when the security and the timeline service are both enabled
    if (isSecurityEnabled() && timelineV1ServiceEnabled) {
      addTimelineDelegationToken(appContext.getAMContainerSpec());
    }
    
    //TODO: YARN-1763:Handle RM failovers during the submitApplication call.
    //Submit the application
    //This is the interface used by clients to submit a new application to the ResourceManager
    //The client needs to provide details via the SubmitApplicationRequest, such as the queue, the resources required to run the ApplicationMaster, and the equivalent of a ContainerLaunchContext for launching the ApplicationMaster
    rmClient.submitApplication(request);

    int pollCount = 0;
    long startTime = System.currentTimeMillis();
    //States in which the submission is still pending
    EnumSet<YarnApplicationState> waitingStates = 
                                 EnumSet.of(YarnApplicationState.NEW,
                                 YarnApplicationState.NEW_SAVING,
                                 YarnApplicationState.SUBMITTED);
    //States that mean the submission failed
    EnumSet<YarnApplicationState> failToSubmitStates = 
                                  EnumSet.of(YarnApplicationState.FAILED,
                                  YarnApplicationState.KILLED);        
    while (true) {
      try {
        //Get a report of the application, including:
        //    ApplicationId
        //    Application user
        //    Application queue
        //    Application name
        //    Host on which the ApplicationMaster is running
        //    RPC port of the ApplicationMaster
        //    Tracking URL
        //    Various states of the ApplicationMaster
        //    Diagnostic information in case of errors
        //    Start time of the application
        //    Client token of the application, if security is enabled
        ApplicationReport appReport = getApplicationReport(applicationId);
        YarnApplicationState state = appReport.getYarnApplicationState();
        if (!waitingStates.contains(state)) {
          if(failToSubmitStates.contains(state)) {
            throw new YarnException("Failed to submit " + applicationId + 
                " to YARN : " + appReport.getDiagnostics());
          }
          LOG.info("Submitted application " + applicationId);
          break;
        }

        long elapsedMillis = System.currentTimeMillis() - startTime;
        if (enforceAsyncAPITimeout() &&
            elapsedMillis >= asyncApiPollTimeoutMillis) {
          throw new YarnException("Timed out while waiting for application " +
              applicationId + " to be submitted successfully");
        }

        // Notify the client through the log every 10 poll, in case the client
        // is blocked here too long.
        if (++pollCount % 10 == 0) {
          LOG.info("Application submission is not finished, " +
              "submitted application " + applicationId +
              " is still in " + state);
        }
        try {
          //Configurable via yarn.client.app-submission.poll-interval, default 200 ms
          Thread.sleep(submitPollIntervalMillis);
        } catch (InterruptedException ie) {
          String msg = "Interrupted while waiting for application "
              + applicationId + " to be successfully submitted.";
          LOG.error(msg);
          throw new YarnException(msg, ie);
        }
      } catch (ApplicationNotFoundException ex) {
        // FailOver or RM restart happens before RMStateStore saves
        // ApplicationState
        LOG.info("Re-submit application " + applicationId + "with the " +
            "same ApplicationSubmissionContext");
        rmClient.submitApplication(request);
      }
    }

    return applicationId;
  }
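
resMgrDelegate wraps exactly this kind of YarnClient. The same public client API can also be used on its own to inspect what is running on the cluster, which is handy for checking what state a submitted MapReduce application ended up in; a minimal sketch:

import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class ListAppsDemo {
  public static void main(String[] args) throws Exception {
    YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(new YarnConfiguration());
    yarnClient.start();
    try {
      // The same ApplicationReport that submitApplication() polls on above
      for (ApplicationReport report : yarnClient.getApplications()) {
        System.out.println(report.getApplicationId() + "\t"
            + report.getName() + "\t"
            + report.getYarnApplicationState());
      }
    } finally {
      yarnClient.stop();
    }
  }
}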

III. Summary

1. Build a Configuration, which loads Hadoop's default configuration files core-default.xml and core-site.xml
2. Parse the command line arguments and apply the options the user specified
3. Set the job information, such as the main class, Mapper class, Reducer class, Combiner class, output formats, and input/output paths
4. Submit the job asynchronously, then monitor it and print its status in real time
5. Build a Cluster according to the user's identity and permissions, and submit the job to the cluster
6. Check the job's input and output specifications
7. Compute the job's InputSplits (of the form <input file path, start offset, length>; by default the number of splits is roughly the number of blocks across all input files, and each split is processed by one Mapper)
8. If necessary, set up the requisite accounting information for the job's DistributedCache
9. Copy the job's jar and configuration to the map-reduce system directory on the distributed file system
10. Submit the job to the ResourceManager and optionally monitor its status


This article is reposted from: https://blog.csdn.net/lu070828/article/details/135747480
Copyright belongs to the original author, 隔着天花板看星星. In case of infringement, please contact us for removal.
