一、MR输出时合并小文件
参数设置 含义
set hive.merge.mapfiles=true; 默认值ture,在Map-only的任务结束时合并小文件
set hive.merge.mapredfiles=true; 默认值false,在Map-Reduce的任务结束时合并小文件
set hive.merge.size.per.task=256000000; 默认值256M,
set hive.merge.smallfiles.avgsize=16000000
; 默认值16M,当输出文件的平均大小小于16M时,启动一个独立的map-reduce任务进行文件merge
reduce 计算方式:merge job后每个文件的目标大小(targetSize),用之前job输出文件的total size除以这个值,就可以决定merge job的reduce数目。merge job的map端相当于identity map,然后shuffle到reduce,每个reduce dump一个文件,通过这种方式控制文件的数量和大小
MapredWork work = (MapredWork) mrTask.getWork();
if (work.getNumReduceTasks() > 0) {
int maxReducers = conf.getIntVar(HiveConf.ConfVars.MAXREDUCERS);
int reducers = (int) ((totalSize +targetSize - 1) / targetSize);
reducers = Math.max(1, reducers);
reducers = Math.min(maxReducers, reducers);
work.setNumReduceTasks(reducers);
}
二、输入合并小文件,减小map数
set mapred.max.split.size=256000000; #每个Map最大输入大小
set mapred.min.split.size.per.node=100000000; #一个节点上split的至少的大小
set mapred.min.split.size.per.rack=100000000; #一个交换机下split的至少的大小
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; #执行Map前进行小文件合并
在开启了org.apache.hadoop.hive.ql.io.CombineHiveInputFormat后,一个data node节点上多个小文件会进行合并,合并文件数由mapred.max.split.size限制的大小决定。
mapred.min.split.size.per.node决定了多个data node上的文件是否需要合并~
mapred.min.split.size.per.rack决定了多个交换机上的文件是否需要合并~
解读:CombineFileInputFormat类
MR-Job默认的输入格式FileInputFormat为每一个小文件生成一个切片。CombineFileInputFormat通过将多个“小文件”合并为一个"切片"(在形成切片的过程中也考虑同一节点、同一机架的数据本地性),让每一个Mapper任务可以处理更多的数据,从而提高MR任务的执行速度。详见 MR案例: CombineFileInputFormat类
1).三个重要的属性:
maxSplitSize:切片大小最大值。可通过属性 “mapreduce.input.fileinputformat.split.maxsize” 或 CombineFileInputFormat.setMaxInputSplitSize()方法进行设置【不设置,则所有输入只启动一个map任务】
minSplitSizeNode:同一节点的数据块形成切片时,切片大小的最小值。可通过属性 “mapreduce.input.fileinputformat.split.minsize.per.node” 或 CombineFileInputFormat.setMinSplitSizeNode()方法进行设置
minSplitSizeRack:同一机架的数据块形成切片时,切片大小的最小值。可通过属性 “mapreduce.input.fileinputformat.split.minsize.per.rack” 或 CombineFileInputFormat.setMinSplitSizeRack()方法进行设置
大小关系:maxSplitSize > minSplitSizeNode > minSplitSizeRack
2).切片的形成过程:
2.1. 不断迭代节点列表,逐个节点 (以数据块为单位) 形成切片(Local Split)
a. 如果maxSplitSize == 0,则整个节点上的Block数据形成一个切片
b. 如果maxSplitSize != 0,遍历并累加每个节点上的数据块,如果累加数据块大小 >= maxSplitSize,则将这些数据块形成一个切片。继续该过程,直到剩余数据块累加大小 < maxSplitSize 。则进行下一步
c. 如果剩余数据块累加大小 >= minSplitSizeNode,则将这些剩余数据块形成一个切片。继续该过程,直到剩余数据块累加大小 < minSplitSizeNode。然后进行下一步,并这些数据块留待后续处理
2.2. 不断迭代机架列表,逐个机架 (以数据块为单位) 形成切片(Rack Split)
a. 遍历并累加这个机架上所有节点的数据块 (这些数据块即上一步遗留下来的数据块),如果累加数据块大小 >= maxSplitSize,则将这些数据块形成一个切片。继续该过程,直到剩余数据块累加大小<maxSplitSize。则进行下一步
b. 如果剩余数据块累加大小 >= minSplitSizeRack,则将这些剩余数据块形成一个切片。如果剩余数据块累加大小 < minSplitSizeRack,则这些数据块留待后续处理
2.3. 遍历并累加所有Rack上的剩余数据块,如果累加数据块大小 >= maxSplitSize,则将这些数据块形成一个切片。继续该过程,直到剩余数据块累加大小< maxSplitSize。则进行下一步
2.4. 将最终剩余的数据块形成一个切片。
Demo:
规定:maxSplit=100 > minSizeNode=50 > minSizeRack=30
原有文件:Rack01:{[30,60,70] [80,110]} Rack02:{170}
处理过程:
30+60+70 > 100 ? 100+60 80+110 > 100 ? 100+90 170 > 100 ? 100+70
—> 3个数据切片,以及Rack01:{[60] [90]} Rack02:{70}
—> 60 > 50 ? 50+10 90 > 50 ? 50+40 70 > 50 ? 50+20
—> 3+3个数据切片,以及Rack01:{[10] [40]} Rack02:{20}
—> 10+40 < 100 ?0 20 < 100 ? 0
—> 3+3+0个数据切片,以及Rack01:{50} Rack02:{20}
—> 50+20 > 30 ? 30+30+10
—> 3+3+0+3个数据切片
对hive输入格式设置为CombineHiveInputFormat的进行分析map数是如何计算的
set hive.input.format=org.apache.hadoop.hive.al.io.CombineHiveInputFormat
注:对orcformat、外表和链接文件无法使用,会转到调用父类HiveInputFormat的getsplits()函数
map数与逻辑split数是一致的,决定map的主要因素有:
1、相关表或分区input的文件个数
2、input文件的大小
3、input文件在node和rack的分布
4、set mapred.max.split.size; 最大split大小
5、set mapred.min.split.size.per.node; 一个节点上最小的split大小
6、set mapred.min.split.size.per.rack; 一个机架上最小的split大小
例如:查询相关目录下有12个input file,每个input file的大小都在100M左右,block分布如下图:
情况一:参数设置如下:set mapred.max.split.size=256000000;
set mapred.min.split.size.per.node=64000000;
set mapred.min.split.size.per.rack=64000000;
第一步:遍历node,嵌套遍历block,当block的累加值大于max.split.size时,创建一个split,小于时,但如果大于min.size.per.node,创建一个新的split,小于时暂存block,继续下一个node。这个遍历过程每个node最多生成一个split,为提高并发度,让split尽量分布到不同的node上。
node I 有三个block(A、B、E)累加值300M > 256M, 会新建一个split。
node II 只有一个C block < 256M,会进行暂存
第二步:遍历rack,嵌套遍历block,对暂存的block进行分割,当block的累加值大于max.split.size时,创建一个新的split,小于时,但如果大于min.size.per.rack,创建一个新的split,小于时暂存block,继续下一个rack
rack I 三个block(C、D、G)累加值300M > 256M,会新建一个split,继续到下一个rack
第三步:对垮rack最后溢出的block处理,当block累加值大于max.split.size时创建新的split,循环处理,最后剩的数据创建一个split
参考:
2.https://www.ghcc.net/node/3287051
原文链接:https://blog.csdn.net/javastart/article/details/106686861
版权归原作者 Samooyou 所有, 如有侵权,请联系我们删除。