0


hadoop生态圈面试精华之MapReduce(二)

hadoop生态圈面试精华之MapReduce(二)

shuGle为什么要排序?
问过的一些公司:携程(2021.09),网易有道(2021.09) 参考答案:
shuffle排序,按字典顺序排序的,目的是把相同的的key可以提前一步放到一起。
sort是用来shuffle的,shuffle就是把key相同的东西弄一起去,其实不一定要sort也能shuffle,那为什么要sort排序呢?
sort是为了通过外排(外部排序)降低内存的使用量:因为reduce阶段需要分组,将key相同的放在一起进 行规约,使用了两种算法:hashmap和sort,如果在reduce阶段sort排序(内部排序),太消耗内存,而 map阶段的输出是要溢写到磁盘的,在磁盘中外排可以对任意数据量分组(只要磁盘够大),所以,map 端排序(shuffle阶段),是为了减轻reduce端排序的压力。

说一下map是怎么到reduce的?
问过的一些公司:蔚来(2021.09) 参考答案:
Reduce的数据来源于Map,Map的输出即是Reduce的输入,Reduce需要通过Shuffle来获取数据。从Map输出到Reduce输入的整个过程可以广义地称为Shuffle。Shuffle横跨Map端和Reduce端,在Map端 包括Spill过程,在Reduce端包括copy和sort过程,如图所示:
在这里插入图片描述
说一下你了解的用哪几种shuGle机制?
问过的一些公司:蔚来(2021.09) 参考答案:
Map端:Spill机制,Reduce端:copy和sort机制

MapReduce的数据处理过程
问过的一些公司:阿里,网易
回答技巧:这里也可以参考MapReduce工作原理的版本二:Map、Reduce任务中ShuGle和排序的过程的 图
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
1、任务切分:对文件进行逻辑切片,切片按照范围划分,默认128M一片。
一个文件至少有一个切片,每个切片运行一个maptask,如果文件超过128M,同一个输入文件会有多个
maptask运行;为减少资源浪费,如果最后一个切片大小小于1.1*128M,将不会被切分处理。
2、输入对象:FileInputFormat.setInputPaths()方法,指定数据输入路径;输入目录中可以有单个或多个 文件。
读取数据、生成K-V对:由继承RecordReader的LineRecordReader类中的readLine()方法从输入的切片中读 取数据;每读取一行执行一次,生成一组K-V。
3、map()方法:以单词统计为例,自定义的WordCountMapper类继承父类Mapper,接收K-V对,重写
map()方法的业务逻辑。
map()的业务逻辑中,对数据进行切分,遍历数组,生成新的K-V对;由context.write(nk,nv)方法输出新的 K-V。
map()方法执行时机:一对K-V执行一次。
4、map()的输出:context.write()被调用时,OutputController组件会将新的K-V输出到数组缓存区,写入 数组缓存区中的还有新K-V的元数据;
5、KV分区:
MapOutputBuffer类调用collect(nk,nv,partition)方法接收新K-V;partition调用HashPartitioner组件; HashPartitioner.getPartition(nk,nv){
nk.hashcode%numberReduceTasks;

}
获得分区,得到区号,返回给partition。
6、区内排序:①按照分区排序;②区内数据再按照K进行排序。
7、溢出:spiller,当缓冲区中的数据到达80%时,进行分区、排序,将数据溢出,当前处于阻塞状态, 防止写入数据。(根据数据量大小溢出,至少一次)
8、归并、区内排序:将数组缓冲区中分区排序完的数据,用Merger组件进行归并,写入磁盘;同时进 行区内排序。
9、局部聚合:调用Combiner组件,根据相同K进行数据聚合。

10、写入本地磁盘:将归并排序完成的数据写入本地磁盘,此阶段提供http下载,便于数据传输。
11、拉取数据:reducetask分别拉取属于自己的数据(本地–>分区经由网络传输)
12、归并排序:调用Merger组件,按照K进行排序。
13、分组:调用GroupingComparator组件中的CompareTo(preK,postK)方法,将相同K的分到一组,放入 同一迭代器中。
14、聚合:reduce(K,iterator<>,context)方法中将相同K的数据进行聚合操作,聚合一次得到一组K-V。
15、输出:由TextOutputFormat的write方法,写出到HDFS(或本地磁盘)。

map join的原理(实现)?应用场景?
问过的一些公司:美团,阿里参考答案:
map join流程
Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多 份,让每个map task内存中存在一份
(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。
MapJoin简单说就是在Map阶段将小表读入内存,顺序扫描大表完成Join。减少昂贵的shuGle操作及reduce操作
MapJoin分为两个阶段:
通过MapReduce Local Task,将小表读入内存,生成HashTableFiles上传至Distributed Cache中,这里会HashTableFiles进行压缩。
MapReduce Job在Map阶段,每个Mapper从Distributed Cache读取HashTableFiles到内存中,顺序扫描大表,在Map阶段直接进行Join,将数据传递给下一个MapReduce任务。
使用场景
MapJoin通常用于一个很小的表和一个大表进行join的场景,具体小表有多小,由参数
hive.mapjoin.smalltable.filesize来决定,该参数表示小表的总大小,默认值为25000000字节,即25M。
Hive0.7之前,需要使用hint提示 /*+ mapjoin(table) */才会执行MapJoin,否则执行Common Join,但在0.7
版本之后,默认自动会转换Map Join,由参数hive.auto.convert.join来控制,默认为true.
假设a表为一张大表,b为小表,并且hive.auto.convert.join=true,那么Hive在执行时候会自动转化为MapJoin。
应用场景
Map Join 实现方式一:分布式缓存
使用场景:一张表十分小、一张表很大。用法:
在提交作业的时候先将小表文件放到该作业的DistributedCache中,然后从DistributeCache中取出该小表 进行join (比如放到Hash Map等等容器中)。然后扫描大表,看大表中的每条记录的join key /value值是否能够在内存中找到相同join key的记录,如果有则直接输出结果。
DistributedCache是分布式缓存的一种实现,它在整个MapReduce框架中起着相当重要的作用,他可以支 撑我们写一些相当复杂高效的分布式程序。说回到这里,JobTracker在作业启动之前会获取到DistributedCache的资源uri列表,并将对应的文件分发到各个涉及到该作业的任务的TaskTracker上。另外,关于DistributedCache和作业的关系,比如权限、存储路径区分、public和private等属性。

reduce join如何执行(原理)
问过的一些公司:美团参考答案:
reduce side join是一种最简单的join方式,其主要思想如下:
在map阶段,map函数同时读取两个文件File1和File2,为了区分两种来源的key/value数据对,对每条数 据打一个标签> (tag),比如:tag=0表示来自文件File1,tag=2表示来自文件File2。即:map阶段的主要任务是对不同文件中的数据打标签。> 在reduce阶段,reduce函数获取key相同的来自File1和File2文件的value list,
然后对于同一个key,对File1和File2中的数据进行join(笛卡尔乘积)。即:reduce阶段进行实际的连接 操作。

MapReduce为什么不能产生过多小文件
问过的一些公司:好未来x2 参考答案:
默认情况下,TextInputFormat 切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会单独交给一个 MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。
MapReduce大量小文件的优化策略:
最优方案:在数据处理的最前端(预处理、采集),就将小文件合并成大文件,在上传到HDFS做后续 的分析
补救措施:如果HDFS中已经存在大量的小文件了,可以使用另一种Inputformat来做切片
(CombineFileInputformat),它的切片逻辑跟FileInputformat不同,它可以将多个小文件从逻辑上规划 到一个切片中,这样,多个小文件就可以交给一个 MapTask 处理。

MapReduce分区及作用
可回答:Map默认是HashPartitioner如何自定义分区问过的一些公司:百度,头条,字节(2021.08)
参考答案:
1、默认分区
系统自动调用HashPartitioner类进行分区,默认分区是根据key的hashCode对ReduceTasks个数取模得到 的。用户没法控制哪个key存储到哪个分区。

  1. public class HashPartitioner<K, V> extends Partitioner<K, V> {
  2. public int getPartition(K key, V value, int numReduceTasks) {
  3. return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; 4 } 5 }

2、自定义分区

  1. 自定义类继承Partitioner,重写getPartition()方法
  2. public class CustomPartitioner extends Partitioner<Text, FlowBean> {
  3. @Override
  4. public int getPartition(Text key, FlowBean value, int numPartitions) {
  5. // 控制分区代码逻辑 5 … … 6 return partition; 7 } 8 }
  6. 在Job驱动中,设置自定义Partitioner

1 job.setPartitionerClass(CustomPartitioner.class); 2

  1. 自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask

1 job.setNumReduceTasks(num); 2
3、全局排序
全局排序是通过将进入map端之前的数据进行随机采样,在采取的样本中设置分割点,通过分割点将数 据进行分区,将设置的分割点保存在二叉树中,Map Task每输出一个数据就会去查找其对应的区间,以此来达到分区效果。
作用
根据业务实际需求将统计结果按照条件产生多个输出文件(分区) 多个reduce任务运行,提高整体job的运行效率

ReduceTask数量和分区数量关系
问过的一些公司:携程(2021.09) 参考答案:

  1. 如果ReduceTask的数量 > getPartition的结果数(ReduceTask > 分区数量),则会多产生几个空的输出文件part-r-000xx;
  2. 如果1<ReduceTask的数量 < getPartition的结果数(1 < ReduceTask < 分区数量),则有一部分分区数据无处安放,会Exception;
  3. 如果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件part-r-00000。 案例分析 例如:假设自定义分区数为5,则 1)job.setNumReduceTasks(1); => 会正常运行,只不过会产生一个输出文件 2)job.setNumReduceTasks(2); => 会报错 3)job.setNumReduceTasks(6); => 大于5,程序会正常运行,会产生空文件

Map的分片有多大
问过的一些公司:360 参考答案:
Hadoop中在 在进行map计算之前,mapreduce会根据输入文件计算输入分片(input split),每个输入分片(input split)针对一个map任务,输入分片(input split)存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组。
源码中切片大小:
Math.max(minSize, Math.min(maxSize, blockSize));
mapreduce.input.fileinputformat.split.minsize=1
默认值为1
mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue 默认值Long.MAXValue
因此,默认情况下,切片大小=blocksize。
Hadoop 2.7.2版本及之前默认64MB,Hadoop 2.7.3版本及之后默认128M,可以在hdfs-site.xml中设置dfs.block.size,注意单位是byte。
分片大小范围可以在mapred-site.xml中设置,mapred.min.split.size和mapred.max.split.size,
minSplitSize大小默认为1B,maxSplitSize大小默认为Long.MAX_VALUE = 9223372036854775807
切片大小设置:
maxsize(切片最大值):参数如果调得比blockSize小,则会让切片变小,而且就等于配置的这个参数 的值。
minsize(切片最小值):参数调的比blockSize大,则可以让切片变得比blockSize还大。

MapReduce join两个表的流程?
可回答:MapReduce里join怎么做
问过的一些公司:字节,阿里(2021.09) 参考答案:
假设要进行join的数据分别来自File1和File2
1、map side join
之所以存在reduce side join,是因为在map阶段不能获取所有需要的join字段,即:同一个key对应的字段可能位于不同map中。Reduce side join是非常低效的,因为shuffle阶段要进行大量的数据传输。

Map side join是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中存在一份
(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。
为了支持文件的复制,Hadoop提供了一个类DistributedCache,使用该类的方法如下:

  1. 用户使用静态方法DistributedCache.addCacheFile()指定要复制的文件,它的参数是文件的URI(如 果是HDFS上的文件,可以这样:hdfs://namenode:9000/home/XXX/file,其中9000是自己配置的NameNode端口号)。JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。
  2. 用户使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取 相应的文件。 2、SemiJoin SemiJoin,也叫半连接,是从分布式数据库中借鉴过来的方法。它的产生动机是:对于reduce side join,跨机器的数据传输量非常大,这成了join操作的一个瓶颈,如果能够在map端过滤掉不会参加join 操作的数据,则可以大大节省网络IO。 实现方法很简单:选取一个小表,假设是File1,将其参与join的key抽取出来,保存到文件File3中,File3 文件一般很小,可以放到内存中。在map阶段,使用DistributedCache将File3复制到各个TaskTracker上, 然后将File2中不在File3中的key对应的记录过滤掉,剩下的reduce阶段的工作与reduce side join相同。 3、reduce side join + BloomFilter 在某些情况下,SemiJoin抽取出来的小表的key集合在内存中仍然存放不下,这时候可以使用BloomFiler 以节省空间。 BloomFilter最常见的作用是:判断某个元素是否在一个集合里面。它最重要的两个方法是:add() 和contains()。最大的特点是不会存在false negative,即:如果contains()返回false,则该元素一定不在集合中,但会存在一定的true negative,即:如果contains()返回true,则该元素可能在集合中。 因而可将小表中的key保存到BloomFilter中,在map阶段过滤大表,可能有一些不在小表中的记录没有过 滤掉(但是在小表中的记录一定不会过滤掉),这没关系,只不过增加了少量的网络IO而已。

手撕一段简单的MapReduce程序
问过的一些公司:美团,一点资讯参考答案:
按照 MapReduce 编程规范,分别编写 Mapper,Reducer,Driver
输入数据

  1. ss ss
  2. cls cls
  3. jiao
  4. banzhang
  5. xue
  6. hadoop 7 输出结果数据
  7. banzhang 1
  8. cls 2
  9. hadoop 1
  10. jiao 1
  11. ss 2
  12. xue 1 7 1、编写 Mapper 类 import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** • KEYIN:map阶段输入的key的类型,LongWritable • VALUEIN:map阶段输入的value的类型,Text • KEYOUT:map阶段输出的key的类型,Text • VALUEOUT:map阶段输出的value的类型,IntWritable */ public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ 14
  13. private Text outK = new Text();
  14. private IntWritable outV = new IntWritable(1); 17 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 1 获取一行 // ss ss String line = value.toString();

// 2 切割
// ss
// ss
String[] words = line.split(" ");

// 3 循环输出
for (String word : words) {
// 封 装 outK outK.set(word);

// 写出
context.write(outK, outV);
}
}
}
2、编写Reducer 类

  1. import java.io.IOException;
  2. import org.apache.hadoop.io.IntWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Reducer; /** • KEYIN:reducer阶段输入的key的类型,LongWritable • VALUEIN:reducer阶段输入的value的类型,Text • KEYOUT:reducer阶段输出的key的类型,Text • VALUEOUT:reducer阶段输出的value的类型,IntWritable */ public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ private IntWritable outV = new IntWritable(); @Override protected void reduce(Text key, Iterable values,Context context) throws IOException, InterruptedException { // 1 累加求和 int sum = 0; // ss,(1,1) for (IntWritable count : values) { sum += count.get(); } // 2 输出 outV.set(sum); context.write(key,outV); } } 3、编写Driver 驱动类

a. import java.io.IOException;
b. import org.apache.hadoop.conf.Configuration;
c. import org.apache.hadoop.fs.Path;
d. import org.apache.hadoop.io.IntWritable;
e. import org.apache.hadoop.io.Text;
f. import org.apache.hadoop.mapreduce.Job;
g. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
h. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 9
10 public class WordCountDriver { 11
12 public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
13

  1. // 1 获取配置信息以及封装任务(获取job)
  2. Configuration conf = new Configuration();
  3. Job job = Job.getInstance(conf); 17
  4. // 2 设置jar加载路径
  5. job.setJarByClass(WordCountDriver.class); 20
  6. // 3 关联map和reduce类
  7. job.setMapperClass(WordCountMapper.class);
  8. job.setReducerClass(WordCountReducer.class); 24
  9. // 4 设置map输出的kv类型
  10. job.setMapOujob.setMapOutputValueClass(IntWritable.class);

// 5 设置最终输出的kv类型job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class);

// 6 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(“D:\BigDataTest\06_input\inputword”));
FileOutputFormat.setOutputPath(job, new Path(“D:\BigDataTest\06_output\output1”));
// FileInputFormat.setInputPaths(job, new Path(args[0]));
// FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 7 提交job
boolean result = job.waitForCompletion(true);

System.exit(result ? 0 : 1);
}
}
4、本地测试即可
5、也可以打包在集群上测试

reduce任务什么时候开始?
问过的一些公司:作业帮参考答案:
只要有map任务完成,就可以开始reduce任务

MapReduce的reduce使用的是什么排序?
问过的一些公司:美团参考答案:
这里把map和reduce的都说一下
对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对 缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁 盘上所有文件进行归并排序。
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写 磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大 文件;如果内存中文件大小或者 数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

MapReduce怎么确定MapTask的数量?
可回答:MapReduce中的task数量确定中的MapTask数量确定可参考该题 问过的一些公司:携程,好未来,蘑菇街
参考答案:
MapTask数量影响因素
影响map个数(split个数)的主要因素有:
文件的大小。当块(dfs.block.size)为128m时,如果输入文件为128m,会被划分为1个split;当块为
256m,会被划分为2个split。
文件的个数。FileInputFormat按照文件分割split,并且只会分割大文件,即那些大小超过HDFS块的大小 的文件。如果HDFS中dfs.block.size设置为128m,而输入的目录中文件有100个,则划分后的split个数至 少为100个。
splitSize的大小。分片是按照splitszie的大小进行分割的,一个split的大小在没有设置的情况下,默认等 于hdfs block的大小。已知以下参数

  1. input_file_num : 输入文件的个数
  2. block_size : hdfs的文件块大小,2.7.3默认为128M,可以通过hdfs-site.xml中的dfs.block.size 参数进行设置
  3. total_size : 输入文件整体的大小,由框架循环叠加计算 4MapTask的数量计算原则为:
  4. 默认map个数 如果不进行任何设置,默认的map个数是和blcok_size相关的。

1 default_num = total_size / block_size; 2

  1. 自定义设置分片的minSize、maxSize 如果在MapReduce的驱动程序(main方法)中通过以下方法设置了分片的最小或最大的大小
  2. //设置最小分片大小,单位byte
  3. FileInputFormat.setMinInputSplitSize(job,1024102410L); //10MB
  4. //设置最大分片大小,单位byte
  5. FileInputFormat.setMaxInputSplitSize(job,1024L); //1KB 5 应用程序可以通过数据分片的最大和最小大小两个参数来对splitsize进行调节,则计算方式就变成了

1 splitSize=Math.max(minSize, Math.min(maxSize, blockSize) 2
其中maxSize即方法 setMaxInputSplitSize 设置的值,minSize即方法·setMinInputSplitSize·设置的值。
其设置原则就是

  1. 要增加map的个数,调整maxSize<blockSize;
  2. 要减小map的个数,调整minSize>blockSize。 3 总结:默认block为128M,当输入文件大于128M时,则会进行分块,分块后剩余文件大小大于128*1.1 时,则会继续分块,否则不再分块,有多少块,就有多少MapTask。

Map数量由什么决定
问过的一些公司:远景智能(2021.08) 参考答案:
影响map个数(split个数)的主要因素有:
文件的大小。当块(dfs.block.size)为128m时,如果输入文件为128m,会被划分为1个split;当块为
256m,会被划分为2个split。
文件的个数。FileInputFormat按照文件分割split,并且只会分割大文件,即那些大小超过HDFS块的大小 的文件。如果HDFS中dfs.block.size设置为128m,而输入的目录中文件有100个,则划分后的split个数至 少为100个。
splitSize的大小。分片是按照splitszie的大小进行分割的,一个split的大小在没有设置的情况下,默认等 于hdfs block的大小。已知以下参数

  1. input_file_num : 输入文件的个数
  2. block_size : hdfs的文件块大小,2.7.3默认为128M,可以通过hdfs-site.xml中的dfs.block.size 参数进行设置
  3. total_size : 输入文件整体的大小,由框架循环叠加计算

MapReduce的map进程和reducer进程的jvm垃圾回收器怎么选择可以提高吞吐量?
问过的一些公司:网易云音乐参考答案:
开启JVM重用
属性:mapred.job.reuse.jvm.num.tasks,默认值是1,在一个taskTracker上对于给定的作业的每个jvm上 可以运行任务最大数。-1表示无限制,即同一个jvm可以被该作业的所有任务使用。

  1. conf.set(“mapreduce.job.jvm.numtasks”, “-1”); //开启jvm重用
  2. job.getConfiguration().setInt(job.JVM_NUMTASKS_TORUN, -1); //开启jvm重用3 MapReduce的task数目划分 可回答:Hadoop reduce数量怎么确定 问过的一些公司:字节,美团(2021.09),携程(2021.09) 参考答案: 在MapReduce当中,每个mapTask处理一个切片split的数据量,要注意切片与block块的概念很像,但是 block块是HDFS当中存储数据的单位,切片split是MapReduce当中每个MapTask处理数据量的单位。

在介绍map task的数量及切片机制之前先了解这两个概念:
block块(数据块,物理划分)
block是HDFS中的基本存储单位,hadoop1.x默认大小为64M,而hadoop2.x默认块大小为128M。文件上 传到HDFS,就要划分数据成块,这里的划分属于物理的划分(实现机制也就是设置一个read方法,每次 限制最多读128M的数据后调用write进行写入到hdfs),块的大小可通过 dfs.block.size配置。block采用冗余机制保证数据的安全:默认为3份,可通过dfs.replication配置。
注意:当更改块大小的配置后,新上传的文件的块大小为新配置的值,以前上传的文件的块大小为以前 的配置值。
split分片(数据分片,逻辑划分)
Hadoop中split划分属于逻辑上的划分,目的只是为了让map task更好地获取数据。split是通过hadoop中的 InputFormat接口中的getSplits()方法得到的。数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。
1、MapTask个数
在这里插入图片描述

综上所述
Split切片数就是我们的MapTask数量,切片的大小是可以自行设置的 切
大小的计算公式
Math.max(minSize, Math.min(maxSize, blockSize));
mapreduce.input.fileinputformat.split.minsize=1
// 默认值为1
mapreduce.input.fileinputformat.split.maxsize= Long.MAXValue // 默认值
Long.MAXValue
// blockSize为128M

2、如何控制mapTask的个数
MapTask数量设置不当带来的问题:
Map Task数量过多的话,会产生大量的小文件, 过多的Mapper创建和初始化都会消耗大量的硬件资源。
Map Task数量过少,就会导致并发度过小,Job执行时间过长,无法充分利用分布式硬件资源。
那么,如果需要控制maptask的个数,我们只需要调整maxSize和minsize这两个值,那么切片的大小就会 改变,切片大小改变之后,mapTask的个数就会改变,我们可以在MapReduce的驱动程序(main方法)中 通过以下方法设置了分片的最小或最大的大小

  1. //设置最小分片大小,单位byte
  2. FileInputFormat.setMinInputSplitSize(job,1024102410L); //10MB
  3. //设置最大分片大小,单位byte
  4. FileInputFormat.setMaxInputSplitSize(job,1024L); //1KB 5 设置原则如下
  5. 要增加map的个数,调整maxSize<blockSize;
  6. 要减小map的个数,调整minSize>blockSize。 3三、ReduceTask数量的确定 Reduce任务是一个数据聚合的步骤,数量默认为1。使用过多的Reduce任务则意味着复杂的shuffle,并 使输出文件数量激增。而reduce的个数设置相比map的个数设置就要简单的多,只需要设置在驱动程序 中通过 即可。 job.setNumReduceTasks(int n) MapReduce作业执行的过程中,中间的数据会存在什么地方?不会存在内存中么? 问过的一些公司:小米参考答案: 不会存在内存,存在本地磁盘 Mapper端进行combiner之后,除了速度会提升,那从Mapper端到Reduece 端的数据量会怎么变? 问过的一些公司:祖龙娱乐参考答案: 数据量会减少,因为combiner之后,会将相同的key进行一次聚合,数据量会在这时候减少一部分 map输出的数据如何超出它的小文件内存之后,是落地到磁盘还是落地到HDFS中? 问过的一些公司:祖龙娱乐参考答案: 数据会落地到磁盘中,因为map和reduce操作,就是一次次的I/O请求

Map到Reduce默认的分区机制是什么?
问过的一些公司:祖龙娱乐参考答案:
默认分区是根据key的hashCode对ReduceTask个数取模得到的。用户没法控制哪个key存储到哪个分区。
结合wordcount述说MapReduce,具体各个流程,map怎么做,reduce怎么做
可回答:1)给一个场景,mapreduce统计词频原理;2)WordCount在MapReduce中键值对的变化;3) 讲下MapReduce的具体任务过程;4)一次MapReduce过程
回答技巧:比如问结合实际场景说下MapReduce,可以说根据wordcount来讲一下MapReduce过程 问过的一些公司:字节,大华,趋势科技,一点资讯,奇安信,58同城(2021.08),阿里(2021.09) 参考答案:
先来看一张图
片

具体各个阶段做了什么
spliting :Documents会根据切割规则被切成若干块,
map阶段:然后进行Map过程,Map会并行读取文本,对读取的单词进行单词分割,并且每个词以键值 对<key,value>形式生成。

  1. 例如:读取到”Hello World Hello Java“,分割单词形成Map
  2. <Hello,1> <World,1><Hello,1> <Java,1> 3 combine阶段:接下来Combine(该阶段是可以选择的,Combine其实也是一种reduce)会对每个片相同 的词进行统计。 shuffle阶段:将Map输出作为reduce的输入的过程就是shuffle,次阶段是最耗时间,也是重点需要优化的阶 段。shuffle阶段会对数据进行拉取,对最后得到单词进行统计,每个单词的位置会根据Hash来确定所在 的位置, reduce阶段:对数据做最后的汇总,最后结果是存储在hdfs上。

MapReduce数据倾斜产生的原因及其解决方案

可回答:Hadoop数据倾斜问题怎么解决
问过的一些公司:腾讯,大华,多益,冠群驰骋,网易,端点数据(2021.07),阿里蚂蚁(2021.08),字节
(2021.08),茄子科技(2021.09),快手(2021.09)
参考答案:
1、数据倾斜现象
数据倾斜就是数据的key的分化严重不均,造成一部分数据很多,一部分数据很少的局面。 数据频率倾斜——某一个区域的数据量要远远大于其他区域。
数据大小倾斜——部分记录的大小远远大于平均值。
2、数据倾斜产生的原因

  1. Hadoop框架的特性 Job数多的作业运行效率会相对比较低; countdistinct、group by、join等操作,触发了Shuffle动作,导致全部相同key的值聚集在一个或几个节点上,很容易发生单点问题。
  2. 具体原因 key 分布不均匀,某一个key的条数比其他key多太多; 业务数据自带的特性; 建表时考虑不全面; 可能某些HQL 语句自身就存在数据倾斜问题。 3、数据倾斜解决方案 从业务和数据方面解决数据倾斜有损的方法:找到异常数据。无损的方法: 对分布不均匀的数据,进行单独计算,首先对key做一层hash,把数据打散,让它的并行度变 大,之后进行汇集 数据预处理 Hadoop平台的解决方法 1)针对join产生的数据倾斜 场景一:大表和小表join产生的数据倾斜 ① 在多表关联情况下,将小表(关联键记录少的表)依次放到前面,这样能够触发reduce端减少操作次数,从而减少运行时间。 ② 同时使用Map Join让小表缓存到内存。在map端完成join过程,这样就能省掉redcue端的工作。需要注意:这一功能使用时,需要开启map-side join的设置属性:set hive.auto.convert.join=true(默认是false) ③ 还可以对使用这个优化的小表的大小进行设置:

1 set hive.mapjoin.smalltable.filesize=25000000(默认值25M) 2
场景二:大表和大表的join产生的数据倾斜
① 将异常值赋一个随机值,以此来分散key,均匀分配给多个reduce去执行
② 如果key值都是有效值的情况下,需要设置以下几个参数来解决
1 set hive.exec.reducers.bytes.per.reducer = 1000000000 2
也就是每个节点的reduce,其 默认是处理数据地大小为1G,如果join 操作也产生了数据倾斜,那么就在
hive 中设定

  1. set hive.optimize.skewjoin = true;
  2. set hive.skewjoin.key = skew_key_threshold (default = 100000) 3 46 group by 造成的数据倾斜 解决方式相对简单:
  3. hive.map.aggr=true (默认true) 这个配置项代表是否在map端进行聚合,相当于Combiner
  4. hive.groupby.skewindata 3 47 count(distinct)或者其他参数不当造成的数据倾斜 ① reduce个数太少 1 set mapred.reduce.tasks=800 2 ② HiveQL中包含count(distinct)时 使用sum…group byl来替代。例如select a,sum(1) from (select a, b from t group by a,b) group by a;

Map Join为什么能解决数据倾斜
问过的一些公司:阿里蚂蚁(2021.08) 参考答案:
Map Join概念:将其中做连接的小表(全量数据)分发到所有 MapTask 端进行 Join,从而避免了reduceTask,前提要求是内存足以装下该全量数据。
Map Join通常用于一个很小的表和一个大表进行join的场景,具体小表有多小,由参数hive.mapjoin.smalltable.filesize来决定,该参数表示小表的总大小,默认值为25000000字节,即25M。 一般默认就够了,无须修改。

  1. 在多表关联情况下,将小表(关联键记录少的表)依次放到前面,这样能够触发reduce端减少操作次 数,从而减少运行时间。
  2. 同时使用Map Join让小表缓存到内存。在map端完成join过程,这样就能省掉redcue端的工作。需要注意:这一功能使用时,需要开启map-side join的设置属性:set hive.auto.convert.join=true(默认是false) 注意: 大表放硬盘,小表放内存( 前提要求是内存足以装下该全量数据 )。

MapReduce运行过程中会发生OOM,OOM发生的位置?
问过的一些公司:作业帮参考答案:
背景:Hive中跑MapReduce Job出现OOM问题

该异常发生既不是map阶段,也不是reduce阶段,发现不是执行过程,而是driver提交job阶段就OOM 了。

  1. Hive中XMLEncoder序列化MapredWork引发OutOfMemoryError
  2. XMLEncoder导致java.lang.OutOfMemoryError: GC overhead limit exceeded 3 先来看下,Hive中出现OOM的异常原因大致分为以下几种: Map 阶 段 OOM Reduce阶段OOM Driver提交Job阶段OOM 1、Map阶段OOM 发生OOM的几率很小,除非你程序的逻辑不正常,亦或是程序写的不高效,产生垃圾太多。 2、Reduce阶段OOM 1)data skew 数据倾斜 data skew是引发这个的一个原因。 key分布不均匀,导致某一个reduce所处理的数据超过预期,导致jvm频繁GC。2)value对象过多或者过大 某个reduce中的value堆积的对象过多,导致jvm频繁GC。 解决方案: ① 增加reduce个数,set mapred.reduce.tasks=300,。 ② 在hive-site.xml中设置,或者在hive shell里设置 set mapred.child.java.opts = -Xmx512m 或者只设置reduce的最大heap为2G,并设置垃圾回收器的类型为并行标记回收器,这样可以显著减少GC 停顿,但是稍微耗费CPU。 set mapred.reduce.child.java.opts=-Xmx2g -XX:+UseConcMarkSweepGC; ③ 使用map join 代替 common join. 可以set hive.auto.convert.join = true ④ 设置 hive.optimize.skewjoin = true 来解决数据倾斜问题 3、Driver提交Job阶段OOM job产生的执行计划的条目太多,比如扫描的分区过多,上到4k-6k个分区的时候,并且是好几张表的分 区都很多时,这时做join。 究其原因,是因为序列化时,会将这些分区,即hdfs文件路径,封装为Path对象,这样,如果对象太多 了,而且Driver启动的时候设置的heap size太小,则会导致在Driver内序列化这些MapRedWork时,生成的对象太多,导致频繁GC,则会引发如下异常:
  3. java.lang.OutOfMemoryError: GC overhead limit exceeded
  4. at sun.nio.cs.UTF_8.newEncoder(UTF_8.java:53)
  5. at java.beans.XMLEncoder.createString(XMLEncoder.java:572) 4 解决方案: 经过查询, 是因为扫描的表的分区太多,上到3千到6千个分区,这样在对计划进行序列化时,仅仅是路径对象Path就会耗去大半Driver,如果Driver设置的heap太小,甚至都会OOM。 解决思路

① 减少分区数量,将历史数据做成一张整合表,做成增量数据表,这样分区就很少了。
② 调大Hive CLI Driver的heap size, 默认是256MB,调节成512MB或者更大。具体做法是在bin/hive bin/hive-config里可以找到启动CLI的JVM OPTIONS。这里我们设置
1 export HADOOP_HEAPSIZE=512 2
双管齐下, 即做成了整合,方便使用,又调节了Hive CLI Driver的heap size,保证线上的运行稳定。
遇到这种问题:
一是HiveQL的写法上,尽量少的扫描同一张表,并且尽量少的扫描分区。扫太多,一是job数多, 慢,二是耗费网络资源,慢。
二是Hive的参数调优和JVM的参数调优,尽量在每个阶段,选择合适的jvm max heap size来应对OOM的问题。

MapReduce用了几次排序,分别是什么?
可回答:MapReduce过程用到了哪些排序?
问过的一些公司:美团,米哈游,大华(2021.07),字节(2021.08) 参考答案:
在Map任务和Reduce任务的过程中,一共发生了3次排序

  1. 当map函数产生输出时,会首先写入内存的环形缓冲区,当达到设定的阀值,在刷写磁盘之前,后 台线程会将缓冲区的数据划分成相应的分区。在每个分区中,后台线程按键进行内排序
  2. 在Map任务完成之前,磁盘上存在多个已经分好区,并排好序的,大小和缓冲区一样的溢写文件,这 时溢写文件将被合并成一个已分区且已排序的输出文件。由于溢写文件已经经过第一次排序,所有合并 文件只需要再做一次排序即可使输出文件整体有序。
  3. 在reduce阶段,需要将多个Map任务的输出文件copy到ReduceTask中后合并,由于经过第二次排序, 所以合并文件时只需再做一次排序即可使输出文件整体有序 在这3次排序中第一次是内存缓冲区做的内排序,使用的算法使快速排序,第二次排序和第三次排序都 是在文件合并阶段发生的,使用的是归并排序。

MapReduce压缩方式
问过的一些公司:转转参考答案:
1、MapReduce支持的压缩方式
压缩格式 hadoop自带? 算法 文件扩展名 是否可切分 换成压缩格式后,原来的程序是否需要修改
DEFLATE 是,直接使用
DEFLATE
.deflate 否 和文本处理一样,不需要修改
Gzip 是,直接使用
DEFLATE
.gz 否 和文本处理一样,不需要修改

bzip2 是,直接使用
bzip2
.bz2 是 和文本处理一样,不需要修改

LZO 否,需要安装
LZO
.lzo 是 需要建索引,还需要指定输入格式
Snappy 否,需要安装
Snappy
.snappy 否 和文本处理一样,不需要修改
2、压缩性能比较
压缩算法 原始文件大小 压缩文件大小 压缩速度 解压速度
gzip 8.3GB 1.8GB 17.5MB/s 58MB/s
bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s
LZO 8.3GB 2.9GB 49.3MB/s 74.6MB/s
3、压缩方式选择
压缩方式选择时重点考虑:压缩/解压缩速度、压缩率(压缩后存储大小)、压缩后是否可以支持切 片。

Gzip压缩
优点:压缩率比较高,而且压缩/解压速度也比较快;Hadoop本身支持,在应用中处理Gzip格式的文件 就和直接处理文本一样;大部分Linux系统都自带Gzip命令,使用方便。
缺点:不支持Split。
应用场景:当每个文件压缩之后在130M以内的(1个块大小),都可以考虑使用Gzip压缩格式。例如一 天或者一个小时的日志压缩成一个Gzip文件。

Bzip2压缩
优点:支持split;具有很高的压缩率,比gzip压缩率都高;hadoop本身支持,但不支持native;在linux 系统下自带bzip2命令,使用方便。
缺点:压缩/解压速度慢;不支持native。

应用场景:适合对速度要求不高,但需要较高的压缩率的时候,可以作为mapreduce作业的输出格式; 或者输出之后的数据比较大,处理之后的数据需要压缩存档减少磁盘空间并且以后数据用得比较少的情 况;或者对单个很大的文本文件想压缩减少存储空间,同时又需要支持split,而且兼容之前的应用程序
(即应用程序不需要修改)的情况。

Lzo压缩
优点:压缩/解压速度也比较快,合理的压缩率;支持split,是hadoop中最流行的压缩格式;可以在
linux系统下安装lzop命令,使用方便。
缺点:压缩率比gzip要低一些;hadoop本身不支持,需要安装;在应用中对lzo格式的文件需要做一些 特殊处理(为了支持split需要建索引,还需要指定inputformat为lzo格式)。
应用场景:一个很大的文本文件,压缩之后还大于200M以上的可以考虑,而且单个文件越大,lzo优点 越越明显。

Snappy压缩
优点:高速压缩速度和合理的压缩率。
缺点:不支持split;压缩率比gzip要低;hadoop本身不支持,需要安装;
应用场景:当Mapreduce作业的Map输出的数据比较大的时候,作为Map到Reduce的中间数据的压缩格 式;或者作为一个Mapreduce作业的输出和另外一个Mapreduce作业的输入。
4、压缩位置选择
压缩可以在MapReduce作用的任意阶段启用
![输入端采用压缩
在有大量数据并计划重复处理的情况下,应该考虑对输入进行压缩。然而,你无须显示指定使用的编解 码方式。Hadoop自动检查文件扩展名,如果扩展名能够匹配,就会用恰当的编解码方式对文件进行压 缩和解压。否则,Hadoop就不会使用任何编解码器。
mapper输出端采用压缩
当map任务输出的中间数据量很大时,应考虑在此阶段采用压缩技术。这能显著改善内部数据Shuffle过 程,而Shuffle过程在Hadoop处理过程中是资源消耗最多的环节。如果发现数据量大造成网络传输缓慢, 应该考虑使用压缩技术。可用于压缩mapper输出的快速编解码器包括LZO或者Snappy。
reducer输出采用压缩
在此阶段启用压缩技术能够减少要存储的数据量,因此降低所需的磁盘空间。当mapreduce作业形成作 业链条时,因为第二个作业的输入也已压缩,所以启用压缩同样有效。

MapReduce中怎么处理一个大文件
问过的一些公司:京东参考答案:
1、输入大文件时

  1. // 小于这个数据时进行合并
  2. conf.setLong(FileInputFormat.SPLIT_MINSIZE,10241024256L);
  3. // 大于这个数据时进行切分
  4. conf.setLong(FileInputFormat.SPLIT_MAXSIZE,102410241024); 5 2、输入大量小文件时 方式一:小文件先进行Merge操作再使用MapReduce 方式二:使用FileInputFormat子类CombineFileInputFormat重写RecordReader()将多个input path合并成一个InputSplit

Hadoop中用到了那些缓存机制? 问过的一些公司:大华(2021.07) 参考答案:
分布式缓存
就是在job任务执行前,将需要的文件拷贝到Task机器上进行缓存,提高mapreduce的执行效率。

YARN部分
介绍下YARN
可回答:1)YARN的RM和NM;2)YARN有哪些组件,如何分配资源;3)YARN的AM和RM的作用; 回答技巧:YARN的架构,执行流程等
问过的一些公司:字节,字节(2021.08),网易云音乐×2,蘑菇街x2,美团,小鹏汽车,一点资讯,头 条,央视网,海康x2,海康(2021.09),恒生(2021.09)
参考答案:
介绍YARN,可以先考虑下面两个问题

  1. 如何管理集群资源?
  2. 如何给任务合理分配资源? YARN是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平 台,而MapReduce等运算程序则相当于运行于操作系统之上的应用程序。 YARN 作为一个资源管理、任务调度的框架,主要包含ResourceManager、NodeManager、 ApplicationMaster和Container模块。

YARN基础架构
在这里插入图片描述

  1. ResourceManager(RM)主要作用如下: 处理客户端请求监控NodeManager 启动或监控ApplicationMaster 资源的分配与调度
  2. NodeManager(NM)主要作用如下: 管理单个节点上的资源 处理来自ResourceManager的命令 处理来自ApplicationMaster的命令
  3. ApplicationMaster(AM)作用如下: 为应用程序申请资源并分配给内部的任务任务的监督与容错
  4. Container Container是YARN中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络 等。

YARN工作机制
可回答:1)YARN的任务提交流程;2)YARN的通信流程或工作流程;3)YARN执行一个application的流 程;4)Hadoop任务的YARN调度过程;5)YARN上提交资源(程序)的流程;6)YARN的资源调度流
程;7)YARN的任务提交流程
问过的一些公司:字节x2,字节(2021.08)-(2022.03),网易云音乐,快手,海康x2,转转,滴滴,作业 帮,虎牙(2021.09),四方伟业(2021.08),蔚来(2021.09)
参考答案:

在这里插入图片描述
1)MapReduce程序提交到客户端所在的节点。2)YarnRunner向ResourceManager申请一个Application。

  1. RM 将该应用程序的资源路径以及application_id返回给YarnRunner。
  2. 该程序将运行所需资源提交到 HDFS 上。5)程序资源提交完毕后,申请运行 mrAppMaster。6)RM 将用户的请求初始化成一个 Task。 7)其中一个NodeManager 领取到 Task 任务。 8)该 NodeManager 创建容器 Container,并产生 MRAppmaster。9)Container 从HDFS 上拷贝资源到本地。
  3. MRAppmaster 向RM 申请运行 MapTask 资源。
  4. RM 将运行 MapTask 任务分配给另外两个 NodeManager,另两个 NodeManager 分别领取任务并创建容器。
  5. MR 向两个接收到任务的NodeManager 发送程序启动脚本,这两个 NodeManager分别启动MapTask,MapTask 对数据分区排序。
  6. MrAppMaster 等待所有 MapTask 运行完毕后,向 RM 申请容器,运行 ReduceTask。14)ReduceTask 向 MapTask 获取相应分区的数据。 15)程序运行完毕后,MR 会向 RM 申请注销自己。

YARN有什么优势,能解决什么问题?
问过的一些公司:祖龙娱乐参考答案:
YARN的优点
解决了单点故障问题,由于每一个任务由一个AppMaster进行调度,且可进行AppMaster出错重试, 从而使单点故障影响到多个任务进行问题不存在。

解决了单点压力过大问题,每一个任务由一个AppMaster进行调度,而每一个AppMaster都是由集群 中资源较为充足的结点进行启动,调度任务,起到一个负载均衡的作用。
完成了资源管理和任务调度的解耦,Yarn只负责对集群资源的管理,各个计算框架只要继承了
AppMaster,就可以共同使用Yarn资源管理,更加充分地利用集群资源。
解决的问题
在Hadoop 1.x版本时,JobTracker和TaskTracker是常服务,资源管理和任务调度的耦合,而在Hadoop 2.x 版本之后,Yarn将二者分离,只有资源管理成为了常服务,而任务调度则变成只有任务在调度时,才启 用的临时服务。

YARN容错机制
问过的一些公司:美团参考答案:
在现实情况中,用户代码错误不断,进程奔溃,机器故障等等。使用hadoop的好处之一就是可以它能处 理这类故障并成功完成任务。需要考虑的实体失败任务为:任务(job),Application Master, NodeManager和ResourceManager。

任务失败
任务失败可能存在以下几种情况:
MapTask或者ReduceTask中由于代码原因抛出异常,jvm在关闭之前,会通知mrAppMaster这个task任务 失败,在mrAppMaster中,错误报告被写入到用户日志并且任务标记为失败,并释放jvm资源,供其他任 务使用。对于streaming任务,如果streaming进程以非0退出代码退出,则被标记为失败。这种行为由stream.non.zero.is.failure属性(默认值为true)控制。
jvm突然退出,可能是由于jvm缺陷而导致mr用户代码由于某种特殊原因造成jvm退出。nodeManage会将 这消息通知到mrAppMaster,标记此次任务失败。
任务挂起(可能是由于资源不足造成):一旦mrAppMaster一段时间没有接收到进度的更新,则将任务 标记为失败,nodeManager会将该jvm进程杀死。任务失败时长可以由mapreduce.task.timeout来设置。 如果为0 ,则表示关闭。如果关闭这个属性,那么可能会造成长时间运行的任务不会被标记为失败,被挂起的任务就会一直不被释放资源,长时间会造成集群效率降低,因此尽量避免这个设置。同时充分保 证每个任务定期更新进度。
处理阶段:
当mrAppMaster被告知,一个任务失败的时候,会重新调度该任务。mrAppMaster会尝试避免在以前失败 过的nodeManager重新调度该任务。此外,一个任务失败的次数超过4次,将不会再重新调度。这个数值 由mapreduce.map.maxattempts控制。如果一个任务失败次数大于该属性设置的,则整个作业都会失
败。对于一些应用程序中,不希望少部分任务失败,而导致整个作业失败,因为即使一些任务失败,作 业的输出结果也是可用的,我们可用通过运行任务失败的最大比例:maptask由mapreduce.map.failures.maxpercent,reducetask由mapreduce.reduce.failures.maxpercent来设置。任务尝试也是可以用来中止(killed),因为它是一个推测副本(如果一个任务执行时间比预期的慢的时候, 会启动另外一个相同的任务作为备份,这个任务为推测执行)或者它所在的nodeManager失败,导致该nodeManager所执行的任务被标记为killed,被中止的任务是不会被记录到任务运行尝试次数。

ApplicationMaster运行失败
在YARN中,ApplicationMaster有几次尝试次数,最多尝试次数由:mapreduce.am.max-attempts和
yarn.resourcemanager.am.max-attempts确定,默认为2。
mapreduce.am.max-attempts:表示mrAppMaster失败最大次数

yarn.resourcemanager.am.max-attempts:表示在YARN中运行的应用程序失败最大次数。 所以如果要设置mrAppMaster最大失败次数,这两个都需要设置。
在ApplicationMaster向resourceManager定期发送心跳,当ResourceManager检查到ApplicationMaster失败 的时候,ResourceManager会在新的NodeManager开启新的ApplicationMaster实例。如果是mrAppMaster,则会使用作业历史来恢复作业的运行状态,不必重新运行,由yarn.app.mapreduce.am.job.recovery.enable来控制该功能。
MapReduce客户端向mrAppMaster来轮询进度报告,如果mrAppMaster失败了,则客户端通过询问ResourceManager会定位新的mrAppMaster实例。在整个MapReduce任务作业初始化的时候,客户端会向ResourceManager询问并缓存mrAppMaster地址。

NodeManager运行失败
当NodeManager由于奔溃或者非常缓慢运行而失败,会停止向ResourceManager发送心跳信息。则如果10 分钟内(由yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms来设置,以ms为单位), ResourceManager会停止通知发送的NodeManager,并将起从自己的节点池中移除。
在失败的NodeManager上的任务或者ApplicationMaster将由上面的机制恢复。对于曾经在失败的NodeManager运行并且成功的Map Task,如果属于为完成的作业,则ApplicationMaster则会重新分配资源重新运行,因为输出结果在失败的NodeManager的本地文件系统中,Reduce任务可能无法访问到。
如果在一个NodeManager中,任务失败次数过多,即使自己并没有失败过,则ApplicationMaster则会尽 量将任务调度到其他的NodeManager上。失败次数由mapreduce.job.maxtaskfailures.per.tracker设置。

ResourceManager运行失败
在YARN中,ResourceManager失败是个致命的问题,如果失败,任何任务和作业都无法启动。在默认配 置中,ResourceManager是单点故障。为了获得高可用(HA),我们需要配置一对ResourceManager,在 主ResourceManager失败后,备份ResourceManager可以继续运行。
将所有的ApplicationMaster的运行信息保存到一个高可用的状态存储中(由ZooKeeper或者HDFS备
份),这样备份ResourceManager就可以恢复出失败的ResourceManager状态。当新的ResourceManager 从存储区读取ApplicationMaster,然后在集群中重启所有ApplicationMaster。这个行为不会计入到ApplicationMaster尝试。
ResourceManager在主备切换由故障转移器(failover controller)处理。默认情况,failover controller自动工作,由ZooKeeper的Leader选举,保证同一时刻只有一个主ResourceManager。不同于HDFS的HA, 该failover controller不必是单独的进程,而是嵌入ResourceManager中。

YARN高可用
问过的一些公司:蘑菇街参考答案:
ResourceManager(RM)负责资源管理,并调度应用作业。在Hadop2.4之前RsourceManage是单点的,容易 产生单点故障。HA 提供活动和备用的RM,解决了一个RM单点故障问题。
ResourceManager存在单点故障,基于Zookeeper实现HA,通常任务失败后,RM将失败的任务告诉AM, RM负责任务的重启,AM来决定如何处理失败的任务。RMAppMaster会保存已经运行完成的Task,启后无 需重新运行。
HA架构图
在这里插入图片描述
集群概述
RM:ResourceManage:r一个集群只有一个active状态的,负责整个集群的管理和调度 处理客户端请求
启动监控ApplicationMaster(AM,一个作业对应一个)
监控NM
系统资源的分配和调度
NM:负责单个节点的资源管理和使用以及task运行
定期想RM汇报本节点的资源使用情况和container运行情况接收处理RM对container的启停各种命令
单节点资源管理和任务管理
ZK
在ZooKeeper上会有一个/yarn-leader-election/yarn1的锁节点,所有的ResourceManager在启动的时候, 都会去竞争写一个Lock子节点:/yarn-leader-election/yarn1/ActiveBreadCrumb,该节点是临时节点。ZooKeepr能够为我们保证最终只有一个ResourceManager能够创建成功。创建成功的那个ResourceManager就切换为Active状态,没有成功的那些ResourceManager则切换为Standby状态。
ZKFC
是RM里面的一个线程,在HDFS HA中,zkfc是一个独立的进程。作用 是监控RM的健康状态,并执行选举作用。
RMStateStore
RM会把job的信息存放在zookeeper的/rmstore目录下,active RM会向这个目录写app的信息。当active RM挂掉之后,standby RM会通过zkfc切换为active状态,然后从zookeeper的/rmstore目录下读取相应的作业信息。重新构建作业的内存信息,启动内部服务,开始接受NM的心跳信息,构建集群的资源信 息,并且接受客户端的作业提交请求

YARN调度器
可回答:1)YARN有哪些调度策略?2)Hadoop中有哪些调度器;3)了解YARN哪些调度

问过的一些公司:转转,滴滴,作业帮,大华(2021.07), soul(2021.09),陌陌(2021.10)
参考答案:
目前,Hadoop 作业调度器主要有三种:FIFO、容量(Capacity Scheduler)和公平(Fair Scheduler)。Apache Hadoop3.1.3 默认的资源调度器是 Capacity Scheduler。
CDH 框架默认调度器是 Fair Scheduler。具体设置详见:yarn-default.xml 文件

  1. The class to use as the resource scheduler.
  2. yarn.resourcemanager.scheduler.class
  3. org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity Scheduler
  4. 6 1、先进先出调度器(FIFO) FIFO调度器(First In First Out):单队列,根据提交作业的先后顺序,先来先服务。在这里插入图片描述 优点:简单易懂 缺点:不支持多队列,生成环境很少使用 2、容量调度器(Capacity Scheduler) Capacity Scheduler是Yahoo开发的多用户调度器

多队列:每个队列可配置一定的资源量,每个队列采用FIFO调度策略容量保证:管理员可为每个队列设置资源最低保证和资源使用上限
灵活性:如果一个队列中的资源有剩余,可以暂时共享给那些需要资源的队列,而一旦该队列有新的应 用程序提交,则其它队列借调的资源会归还给该队列

多租户:
支持多用户共享集群和多应用程序同事运行
为了防止同一用户的作业独占队列中的资源,该调度器会对同一用户提交的作业所占资源进行限定
3、公平调度器(Fair Scheduler)
在这里插入图片描述
与容量调度器相同点
多队列:支持多队列多作业
容量保证:管理员可为每个队列设置资源最低保证和资源使用上线
灵活性:如果一个队列中的资源有剩余,可以暂时共享给那些需要资源的队列,而一旦该队列有新 的应用程序中提交,则其它队列借调的资源会归还给该队列
多用户:支持多用户共享集群和多应用程序同事运行;为了防止同一用户的作业独占队列总的资 源,该调度器会对同一用户提交的作业所占资源进行限定
与容量调度器不同点
核心调度策略不同
容量调度器:优先选择资源利用率低的队列公平调度器:有限选择对资源的缺额比例大的
每个队列可以单独设置资源分配方式
容量调度器:FIFO、DRF
公平调度器:FIFO、FAIR、DRF

YARN中Container是如何启动的?
问过的一些公司:海康威视参考答案:
1、ApplicationMaster的主要逻辑
AM与NM通信
AM与NM们通过NMClientAsync通信,后者需要调用方提供一个回调类,NM会在合适的时机调用回调类中 的方法来通知AM。回调类被AM实现为NMCallbackHandler,其中最重要的两个函数是:
onContainerStarted(),当NM新启动了Containers时,会调用改方法,把Container列表传给它。onContainerStopped(),当NM停止了一些Containers时,会调用改方法,把Container 列表传给它。
AM与RM通信
AM与RM通过AMRMClientAsync通信。
首先,通过 AMRMClientAsync.registerApplicationMaster() 向 RM 注册自己。

然后AM开始提交对Container的需求,在申请到需要数量的Container之前,先调用setupContainerAskForRM()设置对Container的具体需求(优先级、资源等),然后调用AMRMClientAsync.addContainerRequest()把需求提交给RM,最终该方法会把需求存到一个集合(AMRMClient.ask)里面。
AMRMClientAsync同样需要调用方提供一个回调类,AM实现为RMCallbackHandler。这个回调类主要实现 了两个方法:
onContainersAllocated(),获得新申请的Container,创建一个新线程,设置ContainerLaunchContext,最终调用NMClientAsync.startContainerAsync() 来启动Container。onContainersCompleted(),检查已完成的Container的数量是否达到了需求,没有的话,继续添加需 求。
AM的三个主流程
总结上面说的,AM有三个主要流程与Container的创建密切相关: 提交需求,通过心跳,把需求发送给RM;
获取Container,通过心跳,拿到申请好的Container;
每申请到一个Container ,与NM 通信,启动这个Container;
分析清楚了这三个主流程,也就清楚了 YARN Container 的启动逻辑。
Application 与 ResourceManager 的心跳
再看RM这边,在AM向RM注册时,RM最终会生成一个代表这个APP的实例,我们先不分析注册的具体过程,只要知道在我们的情景下,最终是生成了一个FicaSchedulerApp。
AM与RM进行心跳,发送的信息中含有:
AM告诉RM两个信息: a) 自己对Container的要求,b) 已经用完的待回收的Container列表。
RM给AM的回应:a) 新申请的Container,b) 已经完成的Container 的状态。
ApplicationMasterService是RM的一个组成部分。RM启动时,会初始化这个服务,并根据配置,把相应的 调度器YarnScheduler传进来。它实现了ApplicationMasterProtocol接口,负责对来自AM的 RPC 请求进行回应。在我们的情景中, ApplicationMasterService.allocate() 方法会被调用,核心逻辑是:
触发RMappAttemptStatusupdateEvent 事件。
调用YarnScheduler.allocate() 方法,把执行的结果封装起来返回。YarnScheduler 是与调度器通信的接口。所以,最后调用的是具体调度器的allocate() 方法。
我们使用的是 FIFO 调度器,FifoScheduler.allocate() 方法的主要做两件事情:
调用FicaSchedulerApp.updateResourceRequests()更新APP(指从调度器角度看的APP) 的资源需求。通过FicaSchedulerApp.pullNewlyAllocatedContainersAndNMTokens()把FicaSchedulerApp.newlyAllocatedContainers这个List 中的Container取出来,封装后返回。
FicaSchedulerApp.newlyAllocatedContainers 这个数据结构中存放的,正是最近申请到的 Container 。那么,这个 List 中的元素是怎么来的呢,这要从 NM 的心跳说起。
NodeManager与ResourceManager的心跳
NM 需要和 RM 进行心跳,让 RM 更新自己的信息。心跳的信息包含:
Request(NM->RM) : NM 上所有Container 的状态;
Response(RM->NM) : 已待删除和待清理的Container 列表
NM 启动时会向RM注册自己,RM生成对应的RMNode结构,代表这个NM ,存放了这个NM的资源信息以及其他一些统计信息。

负责具体心跳的,在NM这边是NodeStatusUpdater服务,在RM那边则是ResourceTrackerService服务。心 跳的信息包括这个NM的状态,其中所有Container的状态等。
心跳最终通过RPC调用到了ResourceTrackerService.nodeHeartbeat() 。其核心逻辑就是触发一个RMNodeStatusEvent(RMNodeEventType.STATUS_UPDATE) 事件,这个事件由 NM 注册时生成的RMNode处理。
RMNode接收RMNodeStatusEvent(RMNodeEventType.STATUS_UPDATE) 消息,更新自己的状态机,然后调用 StatusUpdateWhenHealthyTransition.transition ,该方法从参数中获得这个NM所有的Container的信
息,根据其状态分成两组:a) 刚申请到还未使用的,b) 运行完毕需要回收的,这两组 Container 的信息存 放 在 RMNode 的 一 个 队 列 中 。 接 着 , 发 出 一 个 消 息 : NodeUpdateSchedulerEvent(SchedulerEventType.NODE_UPDATE) 。这个消息,由调度器处理。
ResourceManager处理 NODE_UPDATE 消息
RM 接收到 NM 的心跳后,会发出一个 SchedulerEventType.NODE_UPDATE 的消息,改消息由调度器处理。FifoScheduler 接收到这个消息后,调用了 FifoScheduler.nodeUpdate() 方法。与 Container 申请相关的主要逻辑如下:
获取已申请到的
从 RMNode 中获取出那些「刚申请还未使用」的 Container (NM 与 RM 心跳是获得),发出消息:
RMContainerEventType.LAUNCHED,该消息由 RMContainer 处理;
回收已完成的
从 RMNode 中获取出那些「已经使用完待回收」的 Container,进行回收(具体回收过程略);
申请新的
在这个 NM 上申请新的 Container:
通过 FicaSchedulerApp.getResourceRequest() 拿到资源请求(ResourceRequest)
计算可申请的资源,调用 FicaSchedulerApp.allocate(),根据传进来的参数,封装出一个 RMContainer 添加到 newlyAllocatedContainers 中。然后触发事件 RMContainerEventType.START。该事件之后会由RMContainer 处理。
调用 FicaSchedulerNode.allocateContainer()和RMContainer对RMContainerEventType事件进行处理处理:
RMContainerEventType.START : 状态从 NEW 变为 ALLOCATED,最终触发事件RMAppAttemptEvent(type=CONTAINER_ALLOCATED), 改事件由 RMAppAttemptImpl 处理。RMContainerEventType.LAUNCHED : 状态从 ACQUIED 变为 RUNNING 。
RMAppAttemptImpl对RMAppAttemptEvent事件进行处理,该事件告诉就是告诉AppAttempt ,你这个APP 有Container申请好了,AppAttempt 检查自己的状态,如果当前还没有运行AM ,就把这个Container拿来运行AM。
到此,我们已经理清楚了FicaSchedulerApp.newlyAllocatedContainers中元素的来源,也就理清楚了,AM 与 RM 心跳中获得的那些「新申请」的 Container 的来源。
ApplicationMaster 与 NodeManager 通信启动 Container
关于“AM的三个主流程”,上面已经讲过了。
基于上面的分析,第1,2两个流程已经清楚。下面我们来具体看看 NM 具体是怎么启动一个 Container
的。
AM 设置好 ContainerLaunchContext , 调用 NMClientAsync.startContainerAsync() 启动Container。
NMClientAsync 中有一个名叫 events 的事件队列,同时,NMClientAsync 还启动这一个线程,不断地从
events 中取出事件进行处理。

startContainerAsync() 方法被调用时,会生成一个 ContainerEvent(type=START_CONTAINER) 事件放入events 队列。对于这个事件,处理逻辑是调用 NMClient.startContainer() 同步地启动 Container ,然后调用回调类中的 onContainerStarted() 方法。
NMClient 最终会调用 ContainerManagementProtocol.startContainers() ,以 Google Protocol Buffer 格式, 通过 RPC 调用 NM 的对应方法。NM 处理后会返回成功启动的 Container 列表。
NodeManager 中启动 Container ContainerManagerImpl
NM 中负责响应来自 AM 的 RPC 请求的是 ContainerManagerImpl ,它是 NodeManager 的一部分,负责Container 的管理,在 Nodemanager 启动时,该服务被初始化。该类实现了接口ContainerManagementProtocol ,接到 RPC 请求后,会调用 ContainerManagerImpl.startContainers() 。改函数的基本逻辑是:
首先进行APP 的初始化(如果还没有的话),生成一个ApplicationImpl 实例,然后根据请求,生成一堆ContainerImpl 实例
触发一个新事件:ApplicationContainerInitEvent ,之前生成的ApplicationImpl 收到改事件,又出发一个ContainerEvent(type=INIT_CONTAINER) 事件,这个事件由ContainerImpl 处理
ContainerImpl 收到事件, 更新状态机,启动辅助服务,然后触发一个新事件
ContainersLaucherEvent(type=LAUNCH_CONTAINER) ,处理这个事件的是ContainersLauncher 。
ContainerLauncher 是 ContainerManager 的一个子服务,收到ContainersLaucherEvent(type=LAUNCH_CONTAINER) 事件后,组装出一个 ContainerLaunch 类并使用ExecutorService 执行。
ContainerLaunch 类负责一个 Container 具体的 Lanuch 。基本逻辑如下:
设置运行环境,包括生成运行脚本,Local Resource ,环境变量,工作目录,输出目录等触发新事件ContainerEvent(type=CONTAINER_LAUNCHED),该事件由ContainerImpl 处理。调用ContainerExecutor.launchContainer() 执行Container 的工作,这是一个阻塞方法。
执行结束后,根据执行的结果设置Container 的状态。
ContainerExecutor
ContainerExecutor 是 NodeManager 的一部分,负责 Container 中具体工作的执行。该类是抽象类,可以有不同的实现,如 DefaultContainerExecutor ,DockerContainerExecutor ,LinuxContainerExecutor 等。根据 YARN 的配置,NodeManager 启动时,会初始化具体的 ContainerExecutor 。
ContainerExecutor 最主要的方法是 launchContainer() ,该方法阻塞,直到执行的命令结束。
DefaultContainerExecutor 是默认的 ContainerExecutor ,支持 Windows 和 Linux 。它的 launchContainer()
的逻
创建Container 需要的目录
拷贝Token、运行脚本到工作目录
做一些脚本的封装,然后执行脚本,返回状态码
至此,Container 在 NM 中已经启动,AM 中 NMCallback 回调类中的 onContainerStarted() 方法被调用。

YARN的改进之处,Hadoop 3.x相对于Hadoop 2.x?
问过的一些公司:字节参考答案:
YARN Timeline Service版本更新到v.2

本版本引入了Yarn时间抽服务v.2,主要用于解决2大挑战:改善时间轴服务的可伸缩性和可靠性,通过 引入流和聚合增强可用性。
YARN Timeline Service v.2 alpha 1可以让用户和开发者测试以及反馈,以便使得它可以替换现在的Timeline Service v.1.x。

YARN监控
问过的一些公司:美团参考答案:
配置 yarn-site.xml 开启日志聚合
日志聚集是YARN提供的日志中央化管理功能,它能将运行完成的Container/任务日志上传到HDFS上,从 而减轻NodeManager负载,且提供一个中央化存储和分析机制。默认情况下,Container/任务日志存在在 各个NodeManager上

yarn.nodemanager.aux-services mapreduce_shuffle
yarn.log-aggregation-enable true 配置 mapred-site.xml

  1. mapreduce.framework.name
  2. yarn
  3. mapreduce.jobhistroy.address
  4. master:10020
  5. mapreduce.jobhistroy.webapp.address
  6. master:19888
  7. 15

重启YARN
开启日志监控服务进程在nodenode机器上执行
sbin/mr-jobhistory-daemon.sh start historyserver
后使用jps命令查看是否启动成功,若启动成功则会显示出JobHistoryServer服务。

命令,执行完成

最好将 yarn-site.xml 的 yarn.log.server.url 也配置上

  1. yarn.log.server.url
  2. http://localhost:19888/jobhistory/logs
  3. 5 不然的话这个链接跳转不到在这里插入图片描述
标签: 面试 大数据 hadoop

本文转载自: https://blog.csdn.net/m0_46914845/article/details/126591169
版权归原作者 大数据小理 所有, 如有侵权,请联系我们删除。

“hadoop生态圈面试精华之MapReduce(二)”的评论:

还没有评论