1.MapReduce概述
Hadoop MapReduce是一个软件框架,用于轻松编写应用程序,这些应用程序以可靠、容错的方式在大型集群(数千个节点)的商用硬件上并行大量数据(数TB数据集)。
MapReduce作业通常将输入数据集分割成独立的块,这些块由映射任务以完全并行的方式进行处理。该框架对映射的输出进行排序,然后将其输入到Reduce任务中。通常,作业的输入和输出都存储在文件系统中。该框架负责安排任务,监控它们并重新执行失败的任务。
通常,计算节点和存储节点是相同的,即MapReduce框架和Hadoop分布式文件系统(参见HDFS架构指南)在同一组节点上运行。此配允许框架在已存在数据的节点上有效地调度任务,从而导致整个集群的聚合带宽非常高。
2.MapReduce的基本工作原理
2.1.Map阶段
2.1.1.Map源码解析
public class WordMapper extends
Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens()) {
word.set(itr.nextToken().toLowerCase());
context.write(word, one);
}
}
}
** 当我输入“hello world hello”并调用这段代码时,以下时代码的执行过程:**
public class WordMapper extends
Mapper<Object, Text, Text, IntWritable> {
初始化Mapper类
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
这里将输入的数据“hello world hello”,这个字符以Value参数传递给map方法。也就是说此时的map是这样的(偏移量,“hello world hello”)
public void map(Object key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer itr = new StringTokenizer(line);
while (itr.hasMoreTokens()) {
word.set(itr.nextToken().toLowerCase());
context.write(word, one);
}
}
执行map方法
String line = value.toString();
输入Text对象value被转换为Java的String类型,并存储在变量line中。此时line的值就是“hello world hello”
StringTokenizer itr = new StringTokenizer(line);
StringTokenizer类用于将字符串line分解成单词。默认情况下是将根据空格来拆分成单词的。
while (itr.hasMoreTokens()) {
word.set(itr.nextToken().toLowerCase());
context.write(word, one);
}
循环遍历每个单词
word.set(itr.nextToken().toLowerCase()); 将当前单词转换为小写,并存储在word中
**context.write(word, one);**输出键值对,也就是一个单词对应一个one,one表示单词的计数固定为1。所以得出来就是(hello,1),(world,1),(hello,1)。它不仅用于输出键值对,还在整个MapReduce框架中充当与框架交互的接口,允许Mapper将中间数据传递给Reduce阶段。
2.1.2.map端代码总结
Map阶段是MapReduce作业的第一个步骤,主要负责将原始输入数据分解为一系列键值对(key-value pairs)。这个过程通常涉及到如下步骤:
输入数据的分片:原始数据通常存储在HDFS中,并被分割成若干个分片(splits),每个 分片通常对应一个HDFS块。MapReduce框架会为每个分片启动一个Map任务。
**数据解析与映射:**每个Map任务读取输入分片中的数据,并将其解析为键值对。例如,如果处理的是一个文本文件,Map任务可以将每一行文本作为一个记录,然后将该记录拆分成键值对,通常是单词作为键,出现次数作为值。
**中间键值对的生成:**通过自定义的Map函数,输入记录被转换为中间的键值对。这些中间的键值对。这些中间结果在整个集群存储,并为后续的shuffle阶段做准备。
示例:在一个单词计数应用中,map函数会将每个单词映射为一个键,值为1( 即出现次数)。比如,“hello world hello” 会被映射为(hello,1),(world,1),(hello,1)。
2.2.Shuffle and Sort阶段
该阶段是mapreduce作业的中间步骤,连接map阶段和reduce阶段。其中主要目的是将所有相同的键值对聚合到一起,并对他们进行排序。这个阶段可以分为以下几个步骤:
**shuffle(数据洗牌):**在map阶段生成的中间建制对被分发到相应的Reduce任务。具体来说,相同的建会被发送到同一个Reduce任务,以确保这些建可以一起被处理。
**sort(排序):**在Reduce任务接收到所有属于自己的键值对后,它们会根据键进行排序。排序的目的是为了保证Reduce阶段处理的数据是按键顺序排列的,这对某些需要顺序处理的数据非常重要。
** 数据合并与压缩(可选):**为了跳效率,Mapreduce可以在shuffle过程合并与压缩中间数据,从而减少网络传输的负担。
示例:在前速的单词计数应用中,shuffle and sort阶段会将相同的单词汇聚在一起,如(hello,1),(hello,1) 会汇总成(hello,[1,1])。
2.3.Reduce阶段
2.3.1.Reduce源码解析
public class WordReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, new IntWritable(sum));
}
}
经过上面2步的操作之后,此时我们得到的数据是(hello,[1,1]),(world,1)。
public class WordReducer extends
Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
首先初始化Reducer类,result 用于存储最终计算结果的IntWritable 对象。在Reduce函数中,我们将把相同键(单词)对应的所有值相加,最终将和存储在result中。
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, new IntWritable(sum));
}
执行Reduce函数
public void reduce(Text key, Iterable<IntWritable> values,
Context context) throws IOException, InterruptedException {
key:这是来自Map阶段的一个键(在我们的例子中,就是一个单词)
values:这是一个可迭代的的IntWritable对象集合,表示所有与该key相关联的值(计数)。
在我们的例子中:key对应的是hello,values则是[1,1]。
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
遍历求和
result.set(sum);
context.write(key, new IntWritable(sum));
}
**result.set(sum);**将计算结果设置为result值。
**context.write(key, new IntWritable(sum));**输出最终的键值对。
我们例子中则是(hello,2),(world,1)。
2.3.2.Reduce端源码总结
reduce阶段是maprecduce作业最后一步,负责对shuffle and sort阶段输出的中间键值对进行汇总和处理,最终生成作业的输出结果。主要步骤包括:
**汇总与处理:**Reduce任务接收到某个键的所有值后,会调用用户第一reduce函数,对这些值进行汇总操作。例如,进行求和,求平均值,最大值,最小值等聚合操作。
**输出结果:**Reduce函数处理完毕后,生成最终的键值对结果,并将其写回hdfs中作为最终结果输出。通常,这些结果会存储微多个文件,以适应hadoop的分布式存储结构。
示例:继续前面的单词计数应用,Reduce函数会对每个单词的出现次数进行求和,如(hello,[1,1])将被转换为(hello,2),并输出到最终的结果文件中。
3.数据流与任务执行
3.1.数据输入与输出格式
3.1.1.TextInputFormat
TextInputFormat是MapReduce中最常见的输入格式之一。它将输入文件按行划分为记录,每一行会作为一个键值对。键通常是行的字节偏移量,值则是该行的内容。这种格式适用于处理纯文本文件,比如日志文件或CSV文件。
3.1.2.SequenceFileInputFormat
SequenceFileInputFormat用于处理hadoop的二进制文件格式。SequenceFile是一种用于存储键值对的hadoop特定格式,通常用于中间数据的存储和分布式缓存。这种格式适合存储大量小文件或需要高效读取的大规模数据。
3.1.3.KeyValueTextInputFormat
KeyValueTextInputFormat将输入文件的每一行作为一个键值对进行处理,行中的第一次单词作为键,其余部分作为值。这种格式在处理结构化文本数据时非常有用。
3.1.4.TextOutputFormat
TextOutputFormat是最常用的输出格式,默认情况下,输出为文本格式。每一行包含一个键值对,键和值之间由制表符分隔。这适合与需要将处理结果以文本形式存储的场景。
3.1.5.SequenceFileOutputFormat
SequenceFileOutputFormat将结果存储为hadoop的二进制格式SequenceFile,这样的文本具有较高的读取效率和压缩比,适合用于中间结果存储或者更大规模的数据存储需求。
3.2.任务调度执行流程
3.2.1.MapReduce1.X
1.提交作业:客户端首先向JobTracker提高一个作业。作业包含了需要执行的Map和Reduce任务、输入输出路径以及作业的配置参数等。
2.JobTracker分配任务:JobTracker负责整个集群的资源管理和任务调度。它会将作业分解成多个Map任务和Reduce任务,并将这些任务分配给集群中的各个TaskTracker。
3.TaskTracker执行任务:TaskTracker运行在每个集群节点上,负责接收并执行来自JobTracker的任务。TaskTracker会将任务分配给本地的工作线程,执行Map或Reduce操作。
4.任务执行与心跳机制:TaskTracker在执行任务的同时,会定期向JobTracker发送心跳信息,报告任务的执行状态以及当前的资源使用情况。心跳信息用于通知JobTracker节点的健康状态以及任务的执行进度。
5.失败处理与重试:如果JobTracker没有在规定时间内收到某个TaskTracker的心跳信息,或者TaskTracker报告了任务失败,JobTracker会将该任务重新分配给其他可用的TaskTracker执行。
6.任务完成:当所有的Map和Reduce任务都完成后,JobTracker会通知客户端作业完成,并输出最终的结果。
3.2.2.MapReduce2.x
1.启动作业:客户端的MapReduce程序通过JobClient启动作业,作业由客户端节点发起。
2.申请新的Application:jobClient向ResourceManager(在YARN架构中,它负责资源管理)申请一个新的Application ID。
3.复制作业资源:jobClient将作业所需的资源(例如JAR文件、配置文件等)复制到分布式文件系统(HDFS)中,以便作业在集群中执行时可以访问这些资源。
4.提交申请:jobClient将作业提交给ResourceManager,ResourceManager随后启动一个ApplicationMaster来管理该作业的执行。
5.初始化:jobTracker(在图中应是ResourceManager,负责集群资源的分配和管理)初始化并准备管理作业。
6.获取输入拆分:输入数据被分割为多个逻辑分片(input splits),每个Map任务将处理一个分片。这些分片是由InputFormat类管理的。
7.发送心跳信息:TaskTracker节点(在YARN中被称为NodeManager)定期向ResourceManager(或图中的jobTracker)发送心跳信息,报告其状态并请求新的任务。
8.获取工作资源:TaskTracker节点从HDFS获取作业所需的资源,这些资源包括输入数据和运行任务的必要文件。
9.启动JVM:TaskTracker节点启动一个子JVM(Java虚拟机),这个子JVM用于执行具体的Map或Reduce任务。
10.运行任务:子JVM开始执行分配给它的任务,可能是Map任务或Reduce任务,直至任务完成。
4.性能优化
4.1.常见优化策略
4.1.1.合并器(Combiner)
Combiner是一种本地Reduce操作,它在Map任务的输出阶段起作用,用来合并相同的键值对,从而减少需要传输到Reduce任务的数据量。
如果Map阶段的输出可以通过局部合并得到较小的中间结果,使用Combiner可以显著减少网络传输的负担。Combiner通常用于诸如计数、求和等操作。
4.1.2.Partitioner(分区器)
Partitioner负责将Map阶段输出的键值对分配到不同的Reduce任务中。默认的分区方式是根据键的哈希值,但在某些情况下,自定义分区器可以帮助数据均衡地分配到各个Reduce任务中,从而避免某些Reduce任务过载。
自定义Partitioner可以确保数据按某种逻辑进行分配,例如基于键的范围进行分区,使每个Reduce任务处理的工作量更加均衡。
4.1.3.调整Map和Reduce任务数
Map任务的数量由输入数据的分片大小决定。可以通过调整 mapreduce.input.fileinputformat.split.minsize 来控制每个Map任务处理的分片大小。适当增加分片大小可以减少Map任务的数量,减少调度开销。
Reduce任务数量的设置应根据中间数据量和集群资源进行调整。过少的Reduce任务会导致负载集中在少数几个节点,过多则会增加调度和管理的开销。一般建议设置Reduce任务数量为集群Reduce slot的1-1.5倍。
4.1.4.压缩中间数据
MapReduce的中间数据传输量可能很大,通过启用压缩可以减少网络传输时间以及磁盘I/O的开销。推荐使用Snappy和LZO是高效的压缩格式,适合MapReduce任务。
4.1.5.数据本地化
MapReduce会优先在数据存储所在的节点上运行Map任务,这被称为数据本地化。为了提高性能,应尽量确保数据分布均匀,并避免热点问题。
在HDFS上,确保数据块被均匀分布在集群的不同节点上,以最大化数据本地化的机会。
4.1.6.增加并行度
MapReduce作业中的每个Map和Reduce任务都可以在各自的JVM中并行运行。增加集群的并行度可以提升处理速度。
使用YARN可以动态调整集群的资源分配,确保MapReduce作业能够使用到更多的资源。
5.MapReduce的局限性与扩展
5.1.局限性
5.1.1.迭代任务效率低
MapReduce模型在处理迭代任务(如机器学习中的多轮训练)时表现不佳。每次迭代都需要从磁盘读取数据,处理完成后再将结果写回磁盘,这导致了大量的I/O操作。
这种I/O密集型的操作导致迭代任务的整体执行效率低下,增加了计算的时间成本。
5.1.2.延迟高
MapReduce模型的工作流程设计使得作业的启动和数据传输具有一定的延迟。任务调度、数据分发以及之间结果的处理都需要花费时间,尤其是在处理实时数据或低延迟需求的应用时、MapReduce表现不够理想。
延迟问题使得MapReduce难以应用于需要实时响应的数据处理场景,如实时数据分析和流式数据处理。
5.1.3.模型单一
MapReduce主要适用于批处理任务,对于诸如图计算、流处理等需要更加复杂数据处理模型的场景,MapReduce显得力不从心。它缺乏对这些模型的原生支持,开发者需要通过复杂的工作流程和代码来弥补。
这限制了MapReduce在多样化数据处理任务中的适用性,迫使开发者寻求更加灵活的计算框架。
5.1.4.资源管理不足
MapReduce 1.x中,JobTracker既负责任务调度,又负责资源管理,这种集中式架构导致了系统的可扩展性问题。当集群规模变大时,JobTracker容易成为性能瓶颈,并且存在单点故障的风险。
这种资源管理方式限制了集群规模的扩展性,影响了大规模集群的稳定性和高效性。
版权归原作者 sheansavage 所有, 如有侵权,请联系我们删除。