0


MapReduce on Yarn(包含MapReduce执行详细流程)

1. MapReduce

1.1 MapReduce任务在Yarn中执行流程

MapReduce作为一种分布式计算框架,它在Yarn中执行的流程为:

  • (1)客户端提交job; > 细节:> ① org.apache.hadoop.mapreduce.Job类配置job;> ② mapred-site.xml中mapreduce.framework.name配置为yarn时,yarn协议会激活;> ③ job.waitForCompletion(true):将job提交到Yarn集群,并等待执行完成;> ④ 客户端代码在客户端JVM中运行;
  • (2)job跟ResourceManager交互获取任务元数据:如application id; > 细节:> ①委托 OutputFormat检查输出目录是否已存在;> ②计算InputSplits;
  • (3)job将相关资源拷贝至HDFS(jar包、配置文件、input splits),并且允许其他job获取;
  • (4)job将application提交到ResourceManager; > job初始化细节:> ①ResourceManager收到新application的请求;> ②ResourceManager委派内部的Scheduler为ApplicationMaster提供container;(MR的ApplicationMaster为MRAppMaster)> ③MRAppMaster跟Resource Manager协商后初始化object,并且运行job;
  • (5)ResourceManager选择一个NodeManager,并分配资源让其新建一个container运行MRAppMaster; > MRAppMaster初始化细节:> ①创建内部bookkeeping对象,监控进程;> ②恢复Input Splits,客户端创建splits,并拷贝到HDFS;> ③创建tasks:每个split创建一个Map tasks,Reduce tasks根据mapreduce.job.reduces配置决定;> ④决定怎么运行task:特别小的任务会直接在MRAppMaster的JVM中运行,这种任务称为uberized、uber;在NodeManager运行tasks;> ⑤执行tasks;
  • (6)NodeManager分配给MRAppMaster container,MRAppMaster 执行并协调MapReduce job;
  • (7)MRAppMaster从HDFS获取运行MapReduce作业的资源(步骤③的资源);
  • (8)MRAppMaster跟ResourceManager协商,获取资源(ResourceManager会分配一个资源较为充足的NodeManager);
  • (9)MRAppMaster要求分配到的NodeManager上执行Map和Reduce tasks;
  • (10)NodeManager创建YarnChild容器,运行tasks;
  • (11)YarnChild从HDFS获取需要运行Map或Reduce任务的job资源;
  • (12)YarnChild运行Map或Reduce任务;

ps:YarnChild会向MRAppMaster实时汇报任务进展,MRAppMaster会向客户端和ResourceManager汇报任务运行进展;

整体流程如下图所示:
在这里插入图片描述

1.2 客户端提交Job流程

……  
# MR代码中配置好,Job后会通过以下代码进行提交  
/**  
 * TODO 提交 job.waitForCompletion  
*/System.exit(job.waitForCompletion(true)?0:1);
  • job.waitForCompletion的执行流程如图所示:在这里插入图片描述

详细的每一步的流程已在图中体现,最后到了ApplicationClientProtocolPBClientImpl后会根据ResourceManager的逻辑选择合适的nodemanager进行任务的分配;

1.3 MapReduce任务流程

  • MapReduce任务: 这里以最简单的MR WordCount举例

输入:一个文本文件;
分片(split)
把输入的文件切分成split,具体逻辑如下:
((double) bytesRemaining)/splitSize > SPLIT_SLOP

其中bytesRemaining是文件的字节长度,bytesRemaining要满足以上条件,每次增加splitSize个字节长度;
splitSize是以下三个参数的中间值;
mapreduce.input.fileinputformat.split.maxsize
mapreduce.input.fileinputformat.split.minsize
blockSize
SPLIT_SLOP一般是1.1;
切分的split存储在集合中
List splits = new ArrayList();
主要是以FileSplit的形式存储在集合中:其中有文件路径及文件名,开始到结束的字节数,hostname等多种信息;
splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
①、②步在MapReduce任务开始之前会完成;

③每个split对应一个MapTask;
④Map做好相应逻辑后,spill到环形缓冲区(环形缓冲区可以同时读和写,读:Map的结果,写:往磁盘写排序后的内容);
⑤环形缓冲区的内容,经过快速排序后溢出到本地磁盘中;
⑥磁盘中的文件经过几轮归并排序,Merge成一个大的文件;
⑦步骤⑥的结果会向MRAppMaster进行汇报,然后MRAppMaster会通知ReduceTask启动;
⑧Reduce阶段拉取几个MapTask⑥步中的partition值一样的大文件,然后又进行几轮归并排序,最后合成一个或多个大文件写出到output文件夹中;
详细过程已在图中描述
在这里插入图片描述

核心难点在于环形缓冲区的设计,后续有时间单独写一篇文章进行讲解;

原创作品,禁止转载!!!感谢支持~


本文转载自: https://blog.csdn.net/CLAY157/article/details/123968628
版权归原作者 CLAY157 所有, 如有侵权,请联系我们删除。

“MapReduce on Yarn(包含MapReduce执行详细流程)”的评论:

还没有评论