一、源码下载
下面是hadoop官方源码下载地址,我下载的是hadoop-3.2.4,那就一起来看下吧
Index of /dist/hadoop/core
二、Reducer类
我们先看下我们写的reduce所继承的Reducer类
public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
/**
* 传递给Reducer实现的上下文
*/
public abstract class Context
implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
}
/**
* 在任务开始时调用一次
*/
protected void setup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* 每个键调用一次此方法。大多数应用程序将通过重写此方法来定义其reduce类。默认实现是标识函数
*
*/
@SuppressWarnings("unchecked")
protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
) throws IOException, InterruptedException {
for(VALUEIN value: values) {
context.write((KEYOUT) key, (VALUEOUT) value);
}
}
/**
* 在任务结束时调用一次
*/
protected void cleanup(Context context
) throws IOException, InterruptedException {
// NOTHING
}
/**
* 高级应用程序编写者可以使用 run(org.apache.hadop.mapreduce.Reporter.Context) 方法
* 来控制reduce任务的工作方式
*/
public void run(Context context) throws IOException, InterruptedException {
setup(context);
try {
while (context.nextKey()) {
reduce(context.getCurrentKey(), context.getValues(), context);
//如果使用了备份存储,请将其重置
Iterator<VALUEIN> iter = context.getValues().iterator();
if(iter instanceof ReduceContext.ValueIterator) {
((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();
}
}
} finally {
cleanup(context);
}
}
}
经过该类的注释我们可以得到以下信息:
1、将map阶段输出的key相同的一组中间值缩减为一组较小的值
2、Reducer实现可以通过JobContext.getConfiguration()访问作业的配置
3、Reducer有三个主要阶段
3.1、Shuffle:Reducer使用HTTP在网络上复制每个Mapper的排序输出
3.2、Sort:框架合并多个Mapper的输出并按key进行排序(因为不同的Mapper可能输出相同的key)
Shuffle 和 Sort 阶段同时发生,即当提取输出时,它们被合并
3.3、SecondarySort:为了对迭代器返回的value进行二次排序,应用程序应该用二次关键字扩展关键字,并定义一个分组比较器。键将使用整个键进行排序,但将使用分组比较器进行分组,以决定在同一调用中发送哪些key和value到reduce。分组比较器是通过Job.setGroupingComparatorClass(Class)指定的。排序顺序由Job.setSortCompratorclass(Class)控制
4、Reduce阶段:为排序输入中的每<key,value集合>调用reduce()
5、ReduceTask的输出通常通过Context.write()写入RecordWriter
6、Reducer的输出未重新排序
三、ReduceTask是如何调起的
ReduceTask和MapTask一样都是由YarnChild启动的,详细可以看下上一篇博客<Hadoop-MapReduce-源码跟读-MapTask阶段篇>中的MapTask调起过程
四、ReduceTask运行细节(源码跟读)
这里我们就不从YarnChild开始捋了,而是从ReduceTask的run方法开始跟读
请注意:以下分析的是一个Job中一个ReduceTask的源码,一个Job可以有多个ReduceTask
1、ReduceTask
public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
throws IOException, InterruptedException, ClassNotFoundException {
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
if (isMapOrReduce()) {
//Progress帮助生成进度报告的实用程序。是层次结构。子阶段的节点通过调用addPhase()创建
//添加copy、sort、reduce到流程中
copyPhase = getProgress().addPhase("copy");
sortPhase = getProgress().addPhase("sort");
reducePhase = getProgress().addPhase("reduce");
}
//启动将处理与父级通信的线程(创建TaskReporter并启动通信线程)
TaskReporter reporter = startReporter(umbilical);
//是否是哟个新API,默认false,可以通过mapred.reducer.new-api设置
boolean useNewApi = job.getUseNewReducer();
//初始化:
// 1、构建job的上下文
// 2、构建尝试任务的上下文
// 3、更改任务的状态UNASSIGNED到RUNNING
// 4、获取输出格式化类,默认是TextOutputFormat.class 可以通过mapreduce.job.outputformat.class设置
// 5、获取输出提交器,框架依赖输出提交器做一下操作:
// 5.1、在初始化期间设置作业。例如,在作业初始化期间为作业创建临时输出目录
// 5.2、作业完成后清理作业。例如,在作业完成后删除临时输出目录
// 5.3、设置任务临时输出
// 5.4、检查任务是否需要提交。这是为了在任务不需要提交的情况下避免提交过程
// 5.5、任务输出的提交
// 5.6、放弃任务提交
// 6、获取输出目录,并将其设置为工作目录
// 7、设置任务的输出(这是从将输出到HDFS的每个单独任务的进程中调用的,并且它只是为该任务调用的。对于同一任务,但对于不同的任务尝试,可以多次调用此函数)
// 8、从Job配置中的类名创建根到指定进程的ResourceCalculatorProcessTree并对其进行配置。如果类名为null,此方法将尝试返回可用于此系统的进程树插件。
// 9、更新进程树
// 10、获取自创建进程树以来进程树中所有进程使用的CPU时间(以毫秒为单位)
initialize(job, getJobID(), reporter, useNewApi);
//检查任务类型:cleanupJobTask、jobSetupTask、taskCleanupTask
if (jobCleanup) {
runJobCleanupTask(umbilical, reporter);
return;
}
if (jobSetup) {
runJobSetupTask(umbilical, reporter);
return;
}
if (taskCleanup) {
runTaskCleanupTask(umbilical, reporter);
return;
}
//初始化编解码器
//检查是否要压缩map输出数据
codec = initCodec();
//RawKeyValueIterator是一个迭代器,用于在对中间数据进行排序/合并期间对原始键和值进行迭代。
RawKeyValueIterator rIter = null;
ShuffleConsumerPlugin shuffleConsumerPlugin = null;
//获取用户定义的组合器类,该类用于在将map输出发送到reduce之前组合映射输出。通常,组合器与作业的 Reducer 相同,即 getReducerClass(),可以通过mapred.combiner.class设置
Class combinerClass = conf.getCombinerClass();
//构建组合器的输出收集者
//默认收集10000个<key,value>向框架汇报一次,可以通过mapreduce.task.combine.progress.records设置
CombineOutputCollector combineCollector =
(null != combinerClass) ?
new CombineOutputCollector(reduceCombineOutputCounter, reporter, conf) : null;
//获取Shuffle类
//默认是Shuffle.class,可以通过mapreduce.job.reduce.shuffle.consumer.plugin.class设置
Class<? extends ShuffleConsumerPlugin> clazz =
job.getClass(MRConfig.SHUFFLE_CONSUMER_PLUGIN, Shuffle.class, ShuffleConsumerPlugin.class);
//利用反射获取ShuffleConsumerPlugin
//ShuffleConsumerPugin用于服务Reducers。它可以从内置的ShuffleHandler或第三方辅助服务中打乱MOF文件。
shuffleConsumerPlugin = ReflectionUtils.newInstance(clazz, job);
LOG.info("Using ShuffleConsumerPlugin: " + shuffleConsumerPlugin);
//构建Shuffle上下文(如果是本地模式,会设置localMapFiles,即:任务map输出都是在本地)
ShuffleConsumerPlugin.Context shuffleContext =
new ShuffleConsumerPlugin.Context(getTaskID(), job, FileSystem.getLocal(job), umbilical,
super.lDirAlloc, reporter, codec,
combinerClass, combineCollector,
spilledRecordsCounter, reduceCombineInputCounter,
shuffledMapsCounter,
reduceShuffleBytes, failedShuffleCounter,
mergedMapOutputsCounter,
taskStatus, copyPhase, sortPhase, this,
mapOutputFile, localMapFiles);
//根据Shuffle上下文初始化Shuffle消费者插件,默认会调用Shuffle.init()
// 1、获取MapTask的个数并设置剩余MapTask和完成MapTask数量
// 2、设置失败限制:Math.max(30, totalMaps / 10) 既:最小值为30
// 3、设置重新拉取时间、拉取失败个数(默认5)等
// 4、创建合并管理器
shuffleConsumerPlugin.init(shuffleContext);
//Shuffle开始,我们看看具体实现(第2步)
rIter = shuffleConsumerPlugin.run();
//释放数据结构
mapOutputFilesOnDisk.clear();
//排序阶段完成
sortPhase.complete();
//开始REDUCE阶段
setPhase(TaskStatus.Phase.REDUCE);
//发生状态变更到task tracker
statusUpdate(umbilical);
Class keyClass = job.getMapOutputKeyClass();
Class valueClass = job.getMapOutputValueClass();
//获取用户定义的WritableComparable比较器,用于对reduce的输入所有key进行分组。
RawComparator comparator = job.getOutputValueGroupingComparator();
if (useNewApi) {
//我们看新API
runNewReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
} else {
runOldReducer(job, umbilical, reporter, rIter, comparator,
keyClass, valueClass);
}
shuffleConsumerPlugin.close();
done(umbilical, reporter);
}
private <INKEY,INVALUE,OUTKEY,OUTVALUE>
void runNewReducer(JobConf job,
final TaskUmbilicalProtocol umbilical,
final TaskReporter reporter,
RawKeyValueIterator rIter,
RawComparator<INKEY> comparator,
Class<INKEY> keyClass,
Class<INVALUE> valueClass
) throws IOException,InterruptedException,
ClassNotFoundException {
final RawKeyValueIterator rawIter = rIter;
rIter = new RawKeyValueIterator() {
public void close() throws IOException {
rawIter.close();
}
public DataInputBuffer getKey() throws IOException {
return rawIter.getKey();
}
public Progress getProgress() {
return rawIter.getProgress();
}
public DataInputBuffer getValue() throws IOException {
return rawIter.getValue();
}
public boolean next() throws IOException {
boolean ret = rawIter.next();
reporter.setProgress(rawIter.getProgress().getProgress());
return ret;
}
};
//制作一个任务上下文,以便我们可以获得类
org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
new org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl(job,
getTaskID(), reporter);
//制作用户自定义reducer
org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
(org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
//制作RecordWriter
org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> trackedRW =
new NewTrackingRecordWriter<OUTKEY, OUTVALUE>(this, taskContext);
job.setBoolean("mapred.skip.on", isSkipping());
job.setBoolean(JobContext.SKIP_RECORDS, isSkipping());
//创建reduce的上下文,comparator是vlaue的比较器
org.apache.hadoop.mapreduce.Reducer.Context
reducerContext = createReduceContext(reducer, job, getTaskID(),
rIter, reduceInputKeyCounter,
reduceInputValueCounter,
trackedRW,
committer,
reporter, comparator, keyClass,
valueClass);
try {
//运行自定义的reduce(我们还是看WordCount的reduce)
reducer.run(reducerContext);
} finally {
trackedRW.close(reducerContext);
}
}
2、Shuffle
Shuffe阶段会返回一个迭代器,用于在对中间数据进行排序/合并期间对原始键和值进行迭代
public RawKeyValueIterator run() throws IOException, InterruptedException {
//缩放我们每次RPC调用获取的最大事件数,以缓解ApplicationMaster上的OOM问题
// TODO: 在 HADOOP-8942 之后应该没有必要这样做
//MIN_EVENTS_TO_FETCH = 100
//MAX_RPC_OUTSTANDING_EVENTS = 3000000
//我们为此作业配置的reduce任务数。默认为1
//因此eventsPerReducer的取值范围是 [100,3000000]
int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
MAX_RPC_OUTSTANDING_EVENTS / jobConf.getNumReduceTasks());
//MAX_EVENTS_TO_FETCH = 10000
//eventsPerReducer = [100,3000000]
//因此 maxEventsToFetch 取值范围是 [100,10000]
int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
//启动MapTask完成事件获取线程EventFetcher
//EventFetcher 会在TaskTracker中查询给定事件ID中的一组MapTask完成事件
(TaskCompletionEvent.Status.SUCCEEDED)
//处理任务完成事件:
// 1.将SUCCEEDED映射保存在knownOutput中以获取输出。
// 2.将OBSOLETE/FAILED/KILLED状态的map保存在obsoleteOutput中,以停止从这些map中获取。
// 3.从neededOutput中删除TIPFAILED状态的map,因为我们根本不需要它们的输出。
final EventFetcher<K, V> eventFetcher =
new EventFetcher<K, V>(reduceId, umbilical, scheduler, this,
maxEventsToFetch);
eventFetcher.start();
//启动map输出提取器线程Fetcher
//localMapFiles:
//为map和reduce的瞬态存储操作工作区域。
//map和reduce任务使用此类来标识中间文件需要写入/读取的目录。
//如果job为本地作业那么localMapFiles是不为空的,既isLocal=true 如果是分布式isLocal=false
boolean isLocal = localMapFiles != null;
//在Shuffle阶段,reduce运行的默认并行传输数。默认5个,可以通过mapreduce.reduce.shuffle.parallelcopies设置
final int numFetchers = isLocal ? 1 :
jobConf.getInt(MRJobConfig.SHUFFLE_PARALLEL_COPIES, 5);
//生成Fetcher数组
Fetcher<K, V>[] fetchers = new Fetcher[numFetchers];
if (isLocal) {
//如果是本地模式,就创建一个LocalFetcher线程即可
//LocalJobRunner使用LocalFetcher执行本地文件系统获取
fetchers[0] = new LocalFetcher<K, V>(jobConf, reduceId, scheduler,
merger, reporter, metrics, this, reduceTask.getShuffleSecret(),
localMapFiles);
fetchers[0].start();
} else {
//分布式环境下需要创建多个Fetcher从不同节点拉取数据
for (int i=0; i < numFetchers; ++i) {
fetchers[i] = new Fetcher<K, V>(jobConf, reduceId, scheduler, merger,
reporter, metrics, this,
reduceTask.getShuffleSecret());
fetchers[i].start();
}
}
//等待Shuffle成功完成
while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
reporter.progress();
synchronized (this) {
if (throwable != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
}
}
//停止map完成事件获取线程,因为Shuffle阶段已经完了
eventFetcher.shutDown();
//停止map输出获取线程
for (Fetcher<K, V> fetcher : fetchers) {
fetcher.shutDown();
}
//停止调度
scheduler.close();
copyPhase.complete(); //复制已完成
//设置此任务的当前阶段为SORT
taskStatus.setPhase(TaskStatus.Phase.SORT);
//向 task tracker 发送状态更新
reduceTask.statusUpdate(umbilical);
//完成正在进行的合并
RawKeyValueIterator kvIter = null;
try {
//kvIter 就是<key,value>的迭代器了,也就是传给reduce方法的值,那么SORT阶段就是在这里做了,下面我们详细看下merger.close()中的实现(第3步)
kvIter = merger.close();
} catch (Throwable e) {
throw new ShuffleError("Error while doing final merge ", e);
}
// Sanity check
synchronized (this) {
if (throwable != null) {
throw new ShuffleError("error in shuffle in " + throwingThreadName,
throwable);
}
}
return kvIter;
}
3、MergeManagerImpl
在这里会对从各个map拉取的数据做排序、合并处理
public RawKeyValueIterator close() throws Throwable {
//等待正在进行的合并完成
if (memToMemMerger != null) {
memToMemMerger.close();
}
inMemoryMerger.close();
onDiskMerger.close();
//内存中的map输出数据
List<InMemoryMapOutput<K, V>> memory =
new ArrayList<InMemoryMapOutput<K, V>>(inMemoryMergedMapOutputs);
inMemoryMergedMapOutputs.clear();
memory.addAll(inMemoryMapOutputs);
inMemoryMapOutputs.clear();
//磁盘中的map输出数据
List<CompressAwarePath> disk = new ArrayList<CompressAwarePath>(onDiskMapOutputs);
onDiskMapOutputs.clear();
return finalMerge(jobConf, rfs, memory, disk);
}
private RawKeyValueIterator finalMerge(JobConf job, FileSystem fs,
List<InMemoryMapOutput<K,V>> inMemoryMapOutputs,
List<CompressAwarePath> onDiskMapOutputs
) throws IOException {
LOG.info("finalMerge called with " +
inMemoryMapOutputs.size() + " in-memory map-outputs and " +
onDiskMapOutputs.size() + " on-disk map-outputs");
//获取在ReduceTask中可用的最大内存限制
final long maxInMemReduce = getMaxInMemReduceLimit();
// merge config params
Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
//是否应保留失败任务的临时文件?默认false,可以通过mapreduce.task.files.preserve.failedtasks设置
boolean keepInputs = job.getKeepFailedTaskFiles();
final Path tmpDir = new Path(reduceId.toString());
//获取用于比较key的比较器,必须是RawComparator 的子类,默认为null,可以通过mapreduce.job.output.key.comparator.class设置
//如果没有指定比较器,那么会获取WritableComparable(可序列化可比较)实现的比较器
//RawComparator接口允许其实现直接比较数据流中的记录,无需先把数据流饭序列化为对象,这样便避免了新建对象的额外开销。
final RawComparator<K> comparator =
(RawComparator<K>)job.getOutputKeyComparator();
//腾出内存所需的段
List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>();
long inMemToDiskBytes = 0;
boolean mergePhaseFinished = false;
if (inMemoryMapOutputs.size() > 0) {
TaskID mapId = inMemoryMapOutputs.get(0).getMapId().getTaskID();
//这里会和ReduceTask中可用的最大内存限制做比较,把大于内存限制的部分数据放到磁盘上,内存中保存尽可能多的数据
inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs,
memDiskSegments,
maxInMemReduce);
final int numMemDiskSegments = memDiskSegments.size();
//ioSortFactor 是对文件进行排序时要同时合并的流的数量。这决定了打开的文件句柄的数量。
//默认是10 可以通过mapreduce.task.io.sort.factor设置
if (numMemDiskSegments > 0 &&
ioSortFactor > onDiskMapOutputs.size()) {
//如果我们到达这里,这意味着我们有少于io.sort.factor磁盘段,
//并且这将增加1(内存段合并的结果)。由于这个总数仍然是<=io.sort.factor,
//我们将不再进行任何中间合并,所有这些磁盘段的合并将直接提供给reduce方法
mergePhaseFinished = true;
//必须溢写到磁盘,但不能保留在mem中进行中间合并
//创建一个本地reduce输入文件名,以.merged为后缀
final Path outputPath =
mapOutputFile.getInputFileForWrite(mapId,
inMemToDiskBytes).suffix(
Task.MERGED_OUTPUT_PREFIX);
//开始合并,下面我们详细看下合并细节。(看第3.1步)
final RawKeyValueIterator rIter = Merger.merge(job, fs,
keyClass, valueClass, memDiskSegments, numMemDiskSegments,
tmpDir, comparator, reporter, spilledRecordsCounter, null,
mergePhase);
//用CryptoOutputStream封装给定的FSDataOutputStream。
//流所需的数据缓冲区大小由“mapreduce.job.encrypted-intermediate-data.buffer.kb”作业配置变量指定
FSDataOutputStream out =
IntermediateEncryptedStream.wrapIfNecessary(job,
fs.create(outputPath), outputPath);
Writer<K, V> writer = new Writer<K, V>(job, out, keyClass, valueClass,
codec, null, true);
try {
//合并输出到文件,默认合并10000条记录就向MR ApplicationMaster发送进度通知
Merger.writeFile(rIter, writer, reporter, job);
writer.close();
onDiskMapOutputs.add(new CompressAwarePath(outputPath,
writer.getRawLength(), writer.getCompressedLength()));
writer = null;
//添加到最终磁盘输出的列表中
} catch (IOException e) {
if (null != outputPath) {
try {
fs.delete(outputPath, true);
} catch (IOException ie) {
// NOTHING
}
}
throw e;
} finally {
if (null != writer) {
writer.close();
}
}
LOG.info("Merged " + numMemDiskSegments + " segments, " +
inMemToDiskBytes + " bytes to disk to satisfy " +
"reduce memory limit");
inMemToDiskBytes = 0;
memDiskSegments.clear();
} else if (inMemToDiskBytes != 0) {
LOG.info("Keeping " + numMemDiskSegments + " segments, " +
inMemToDiskBytes + " bytes in memory for " +
"intermediate, on-disk merge");
}
}
//磁盘上的段处理
List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
long onDiskBytes = inMemToDiskBytes;
long rawBytes = inMemToDiskBytes;
CompressAwarePath[] onDisk = onDiskMapOutputs.toArray(
new CompressAwarePath[onDiskMapOutputs.size()]);
for (CompressAwarePath file : onDisk) {
long fileLength = fs.getFileStatus(file).getLen();
onDiskBytes += fileLength;
rawBytes += (file.getRawDataLength() > 0) ? file.getRawDataLength() : fileLength;
LOG.debug("Disk file: " + file + " Length is " + fileLength);
diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs,
(file.toString().endsWith(
Task.MERGED_OUTPUT_PREFIX) ?
null : mergedMapOutputsCounter), file.getRawDataLength()
));
}
LOG.info("Merging " + onDisk.length + " files, " +
onDiskBytes + " bytes from disk");
Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {
public int compare(Segment<K, V> o1, Segment<K, V> o2) {
if (o1.getLength() == o2.getLength()) {
return 0;
}
return o1.getLength() < o2.getLength() ? -1 : 1;
}
});
// build final list of segments from merged backed by disk + in-mem
//从备份磁盘和内存构建最终的段数据列表
List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>();
long inMemBytes = createInMemorySegments(inMemoryMapOutputs,
finalSegments, 0);
LOG.info("Merging " + finalSegments.size() + " segments, " +
inMemBytes + " bytes from memory into reduce");
if (0 != onDiskBytes) {
final int numInMemSegments = memDiskSegments.size();
diskSegments.addAll(0, memDiskSegments);
memDiskSegments.clear();
//只有当将要进行中间合并时,才通过mergePhase
Progress thisPhase = (mergePhaseFinished) ? null : mergePhase;
RawKeyValueIterator diskMerge = Merger.merge(
job, fs, keyClass, valueClass, codec, diskSegments,
ioSortFactor, numInMemSegments, tmpDir, comparator,
reporter, false, spilledRecordsCounter, null, thisPhase);
diskSegments.clear();
if (0 == finalSegments.size()) {
return diskMerge;
}
finalSegments.add(new Segment<K,V>(
new RawKVIteratorReader(diskMerge, onDiskBytes), true, rawBytes));
}
//开始合并(合并细节看第3.1步)
return Merger.merge(job, fs, keyClass, valueClass,
finalSegments, finalSegments.size(), tmpDir,
comparator, reporter, spilledRecordsCounter, null,
null);
}
3.1、Merger
Merger是Map和Reduce任务用于合并其内存和磁盘段的实用程序类
最终会调用Merger内部类MergeQueue中的merge()
RawKeyValueIterator merge(Class<K> keyClass, Class<V> valueClass,
int factor, int inMem, Path tmpDir,
Counters.Counter readsCounter,
Counters.Counter writesCounter,
Progress mergePhase)
throws IOException {
LOG.info("Merging " + segments.size() + " sorted segments");
/*
* 如果内存中有段,则它们首先出现在段列表中,然后是已排序的磁盘段。
* 否则(如果只有磁盘段),则如果段列表中有多个因子段,则它们是已排序的段。
*/
int numSegments = segments.size();
int origFactor = factor;
int passNo = 1;
if (mergePhase != null) {
mergeProgress = mergePhase;
}
//计算要合并的输入字节的预期大小,将用于计算合并进度。
//这模拟了上面的merge(),并试图获得所有合并中要合并的字节数(假设合并时没有调用合并器)
long totalBytes = computeBytesInMerges(factor, inMem);
if (totalBytes != 0) {
progPerByte = 1.0f / (float)totalBytes;
}
//从构造函数中创建的排序map中创建MergeStreams,并将最终输出转储到文件中
do {
//获取此合并过程的因子。我们假设内存中的段是段列表中的第一个条目,并且传递因子不适用于它们
factor = getPassFactor(factor, passNo, numSegments - inMem);
if (1 == passNo) {
factor += inMem;
}
List<Segment<K, V>> segmentsToMerge =
new ArrayList<Segment<K, V>>();
int segmentsConsidered = 0;
int numSegmentsToConsider = factor;
long startBytes = 0; //此合并的段的起始字节
while (true) {
//提取最小的“factor”段数对空段调用清理(无 key/value 数据)
List<Segment<K, V>> mStream =
getSegmentDescriptors(numSegmentsToConsider);
for (Segment<K, V> segment : mStream) {
//在最后可能的时刻初始化段;这有助于确保我们在需要缓冲区之前不会使用它们
segment.init(readsCounter);
long startPos = segment.getReader().bytesRead;
boolean hasNext = segment.nextRawKey();
long endPos = segment.getReader().bytesRead;
if (hasNext) {
startBytes += endPos - startPos;
segmentsToMerge.add(segment);
segmentsConsidered++;
}
else {
segment.close();
numSegments--; //我们忽略该段进行合并
}
}
//如果我们有所需数量的分段,或者查看所有可用的分段,我们就会中断
if (segmentsConsidered == factor ||
segments.size() == 0) {
break;
}
numSegmentsToConsider = factor - segmentsConsidered;
}
//将流馈送到优先级队列
initialize(segmentsToMerge.size());
clear();
for (Segment<K, V> segment : segmentsToMerge) {
put(segment);
}
//如果剩余的段数较少,则只返回迭代器,否则执行另一个单级合并
if (numSegments <= factor) {
if (!includeFinalMerge) { //用于reduce任务
//重置totalBytesProcessed并重新计算剩余段的totalBytes,
//以跟踪最终合并的进度。最终合并被视为reduce阶段的进展,即reduce任务的第三阶段。
totalBytesProcessed = 0;
totalBytes = 0;
for (int i = 0; i < segmentsToMerge.size(); i++) {
totalBytes += segmentsToMerge.get(i).getRawDataLength();
}
}
if (totalBytes != 0) //偏执
progPerByte = 1.0f / (float)totalBytes;
totalBytesProcessed += startBytes;
if (totalBytes != 0)
mergeProgress.set(Math.min(1.0f, totalBytesProcessed * progPerByte));
else
mergeProgress.set(1.0f); //最后一次传输,没有剩余的分段-我们就完成了
LOG.info("Down to the last merge-pass, with " + numSegments +
" segments left of total size: " +
(totalBytes - totalBytesProcessed) + " bytes");
return this;
} else {
LOG.info("Merging " + segmentsToMerge.size() +
" intermediate segments out of a total of " +
(segments.size()+segmentsToMerge.size()));
long bytesProcessedInPrevMerges = totalBytesProcessed;
totalBytesProcessed += startBytes;
//如果在空间限制下可用,我们希望在多个磁盘上分散创建临时文件
long approxOutputSize = 0;
for (Segment<K, V> s : segmentsToMerge) {
approxOutputSize += s.getLength() +
ChecksumFileSystem.getApproxChkSumLength(
s.getLength());
}
Path tmpFilename =
new Path(tmpDir, "intermediate").suffix("." + passNo);
Path outputFile = lDirAlloc.getLocalPathForWrite(
tmpFilename.toString(),
approxOutputSize, conf);
FSDataOutputStream out = fs.create(outputFile);
out = IntermediateEncryptedStream.wrapIfNecessary(conf, out,
outputFile);
Writer<K, V> writer = new Writer<K, V>(conf, out, keyClass, valueClass,
codec, writesCounter, true);
writeFile(this, writer, reporter, conf);
writer.close();
//我们完成了一次单级合并;现在清理优先级队列
this.close();
//将新创建的分段添加到要合并的分段列表中
Segment<K, V> tempSegment =
new Segment<K, V>(conf, fs, outputFile, codec, false);
//在排序列表中插入新的合并段
int pos = Collections.binarySearch(segments, tempSegment,
segmentComparator);
if (pos < 0) {
//二进制搜索失败。所以要插入的位置是pos-1
pos = -pos-1;
}
segments.add(pos, tempSegment);
numSegments = segments.size();
//从totalBytes中减去新段的预期大小和实际大小之间的差值(新段的预计大小为inputBytesOfThisMerge)。
//若合并中未调用合并器,则预期大小和实际大小将匹配(几乎)
long inputBytesOfThisMerge = totalBytesProcessed -
bytesProcessedInPrevMerges;
totalBytes -= inputBytesOfThisMerge - tempSegment.getRawDataLength();
if (totalBytes != 0) {
progPerByte = 1.0f / (float)totalBytes;
}
passNo++;
}
//我们只担心第一次通过合并因子。因此,将系数重置为原来的值
factor = origFactor;
} while(true);
}
4、WordCount中的reduce
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
//key为从map输出收集的去重后的key,value为按照key进行分组合并排序后的value迭代器
//当处理当下key时,会对下一个key的value按照reduce上下文传入的比较器进行排序
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
//默认调用TextOutputFormat.write()进行输出
context.write(key, result);
}
}
5、TextOutputFormat
public synchronized void write(K key, V value)
throws IOException {
boolean nullKey = key == null || key instanceof NullWritable;
boolean nullValue = value == null || value instanceof NullWritable;
if (nullKey && nullValue) {
return;
}
if (!nullKey) {
writeObject(key);
}
if (!(nullKey || nullValue)) {
out.write(keyValueSeparator);
}
if (!nullValue) {
writeObject(value);
}
//换行符“\n”
out.write(NEWLINE);
}
五、总结
1、初始化:比如构建作业和尝试任务的上下文、更新任务状态,构建输出提交器等
2、Shuffle:根据本地模式和集群模式生成不同的线程(Fetcher)组来收集map端的输出
3、Sort:对Shuffle的结果进行排序合并
4、SecondarySort:对相同key的value进行二次排序
5、构建自定义reducer、RecordWriter、reduce的上下文
6、运行用户自定义的Reduce
7、无序输出结果,一个reduce输出一份结果
版权归原作者 隔着天花板看星星 所有, 如有侵权,请联系我们删除。