1. MapReduce
1.1 MapReduce任务在Yarn中执行流程
MapReduce作为一种分布式计算框架,它在Yarn中执行的流程为:
- (1)客户端提交job; > 细节:> ① org.apache.hadoop.mapreduce.Job类配置job;> ② mapred-site.xml中mapreduce.framework.name配置为yarn时,yarn协议会激活;> ③ job.waitForCompletion(true):将job提交到Yarn集群,并等待执行完成;> ④ 客户端代码在客户端JVM中运行;
- (2)job跟ResourceManager交互获取任务元数据:如application id; > 细节:> ①委托 OutputFormat检查输出目录是否已存在;> ②计算InputSplits;
- (3)job将相关资源拷贝至HDFS(jar包、配置文件、input splits),并且允许其他job获取;
- (4)job将application提交到ResourceManager; > job初始化细节:> ①ResourceManager收到新application的请求;> ②ResourceManager委派内部的Scheduler为ApplicationMaster提供container;(MR的ApplicationMaster为MRAppMaster)> ③MRAppMaster跟Resource Manager协商后初始化object,并且运行job;
- (5)ResourceManager选择一个NodeManager,并分配资源让其新建一个container运行MRAppMaster; > MRAppMaster初始化细节:> ①创建内部bookkeeping对象,监控进程;> ②恢复Input Splits,客户端创建splits,并拷贝到HDFS;> ③创建tasks:每个split创建一个Map tasks,Reduce tasks根据mapreduce.job.reduces配置决定;> ④决定怎么运行task:特别小的任务会直接在MRAppMaster的JVM中运行,这种任务称为uberized、uber;在NodeManager运行tasks;> ⑤执行tasks;
- (6)NodeManager分配给MRAppMaster container,MRAppMaster 执行并协调MapReduce job;
- (7)MRAppMaster从HDFS获取运行MapReduce作业的资源(步骤③的资源);
- (8)MRAppMaster跟ResourceManager协商,获取资源(ResourceManager会分配一个资源较为充足的NodeManager);
- (9)MRAppMaster要求分配到的NodeManager上执行Map和Reduce tasks;
- (10)NodeManager创建YarnChild容器,运行tasks;
- (11)YarnChild从HDFS获取需要运行Map或Reduce任务的job资源;
- (12)YarnChild运行Map或Reduce任务;
ps:YarnChild会向MRAppMaster实时汇报任务进展,MRAppMaster会向客户端和ResourceManager汇报任务运行进展;
整体流程如下图所示:
1.2 客户端提交Job流程
……
# MR代码中配置好,Job后会通过以下代码进行提交
/**
* TODO 提交 job.waitForCompletion
*/System.exit(job.waitForCompletion(true)?0:1);
- job.waitForCompletion的执行流程如图所示:
详细的每一步的流程已在图中体现,最后到了ApplicationClientProtocolPBClientImpl后会根据ResourceManager的逻辑选择合适的nodemanager进行任务的分配;
1.3 MapReduce任务流程
- MapReduce任务: 这里以最简单的MR WordCount举例:
①输入:一个文本文件;
②分片(split):
把输入的文件切分成split,具体逻辑如下:
((double) bytesRemaining)/splitSize > SPLIT_SLOP
其中bytesRemaining是文件的字节长度,bytesRemaining要满足以上条件,每次增加splitSize个字节长度;
splitSize是以下三个参数的中间值;
mapreduce.input.fileinputformat.split.maxsize
mapreduce.input.fileinputformat.split.minsize
blockSize
SPLIT_SLOP一般是1.1;
切分的split存储在集合中:
List splits = new ArrayList();
主要是以FileSplit的形式存储在集合中:其中有文件路径及文件名,开始到结束的字节数,hostname等多种信息;
splits.add(makeSplit(path, length-bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
①、②步在MapReduce任务开始之前会完成;
③每个split对应一个MapTask;
④Map做好相应逻辑后,spill到环形缓冲区(环形缓冲区可以同时读和写,读:Map的结果,写:往磁盘写排序后的内容);
⑤环形缓冲区的内容,经过快速排序后溢出到本地磁盘中;
⑥磁盘中的文件经过几轮归并排序,Merge成一个大的文件;
⑦步骤⑥的结果会向MRAppMaster进行汇报,然后MRAppMaster会通知ReduceTask启动;
⑧Reduce阶段拉取几个MapTask⑥步中的partition值一样的大文件,然后又进行几轮归并排序,最后合成一个或多个大文件写出到output文件夹中;
详细过程已在图中描述:
核心难点在于环形缓冲区的设计,后续有时间单独写一篇文章进行讲解;
原创作品,禁止转载!!!感谢支持~
版权归原作者 CLAY157 所有, 如有侵权,请联系我们删除。