0


Hadoop: Mapreduce了解

1.MapReduce概述

    Hadoop MapReduce是一个软件框架,用于轻松编写应用程序,这些应用程序以可靠、容错的方式在大型集群(数千个节点)的商用硬件上并行大量数据(数TB数据集)。

    MapReduce作业通常将输入数据集分割成独立的块,这些块由映射任务以完全并行的方式进行处理。该框架对映射的输出进行排序,然后将其输入到Reduce任务中。通常,作业的输入和输出都存储在文件系统中。该框架负责安排任务,监控它们并重新执行失败的任务。
     通常,计算节点和存储节点是相同的,即MapReduce框架和Hadoop分布式文件系统(参见HDFS架构指南)在同一组节点上运行。此配允许框架在已存在数据的节点上有效地调度任务,从而导致整个集群的聚合带宽非常高。

2.MapReduce的基本工作原理

2.1.Map阶段

2.1.1.Map源码解析

public class WordMapper extends  
            Mapper<Object, Text, Text, IntWritable> {  
  
        private final static IntWritable one = new IntWritable(1);  
        private Text word = new Text();  
  
        public void map(Object key, Text value, Context context)  
                throws IOException, InterruptedException {  
            String line = value.toString();  
            StringTokenizer itr = new StringTokenizer(line);  
            while (itr.hasMoreTokens()) {  
                word.set(itr.nextToken().toLowerCase());  
                context.write(word, one);  
            }  
        }  
    }

** 当我输入“hello world hello”并调用这段代码时,以下时代码的执行过程:**

public class WordMapper extends  
            Mapper<Object, Text, Text, IntWritable> {  

初始化Mapper类

    private final static IntWritable one = new IntWritable(1);  
    private Text word = new Text();

这里将输入的数据“hello world hello”,这个字符以Value参数传递给map方法。也就是说此时的map是这样的(偏移量,“hello world hello”)

public void map(Object key, Text value, Context context)  
        throws IOException, InterruptedException {  
    String line = value.toString();  
    StringTokenizer itr = new StringTokenizer(line);  
    while (itr.hasMoreTokens()) {  
        word.set(itr.nextToken().toLowerCase());  
        context.write(word, one);  
    }  
}

执行map方法

String line = value.toString();

输入Text对象value被转换为Java的String类型,并存储在变量line中。此时line的值就是“hello world hello”

StringTokenizer itr = new StringTokenizer(line);

StringTokenizer类用于将字符串line分解成单词。默认情况下是将根据空格来拆分成单词的。

while (itr.hasMoreTokens()) {  
    word.set(itr.nextToken().toLowerCase());  
    context.write(word, one);  
}

循环遍历每个单词

word.set(itr.nextToken().toLowerCase()); 将当前单词转换为小写,并存储在word中

**context.write(word, one);**输出键值对,也就是一个单词对应一个one,one表示单词的计数固定为1。所以得出来就是(hello,1),(world,1),(hello,1)。它不仅用于输出键值对,还在整个MapReduce框架中充当与框架交互的接口,允许Mapper将中间数据传递给Reduce阶段。

2.1.2.map端代码总结

Map阶段是MapReduce作业的第一个步骤,主要负责将原始输入数据分解为一系列键值对(key-value pairs)。这个过程通常涉及到如下步骤:
输入数据的分片:原始数据通常存储在HDFS中,并被分割成若干个分片(splits),每个 分片通常对应一个HDFS块。MapReduce框架会为每个分片启动一个Map任务。

    **数据解析与映射:**每个Map任务读取输入分片中的数据,并将其解析为键值对。例如,如果处理的是一个文本文件,Map任务可以将每一行文本作为一个记录,然后将该记录拆分成键值对,通常是单词作为键,出现次数作为值。

    **中间键值对的生成:**通过自定义的Map函数,输入记录被转换为中间的键值对。这些中间的键值对。这些中间结果在整个集群存储,并为后续的shuffle阶段做准备。

示例:在一个单词计数应用中,map函数会将每个单词映射为一个键,值为1( 即出现次数)。比如,“hello world hello” 会被映射为(hello,1),(world,1),(hello,1)。

2.2.Shuffle and Sort阶段

    该阶段是mapreduce作业的中间步骤,连接map阶段和reduce阶段。其中主要目的是将所有相同的键值对聚合到一起,并对他们进行排序。这个阶段可以分为以下几个步骤:

    **shuffle(数据洗牌):**在map阶段生成的中间建制对被分发到相应的Reduce任务。具体来说,相同的建会被发送到同一个Reduce任务,以确保这些建可以一起被处理。

    **sort(排序):**在Reduce任务接收到所有属于自己的键值对后,它们会根据键进行排序。排序的目的是为了保证Reduce阶段处理的数据是按键顺序排列的,这对某些需要顺序处理的数据非常重要。

   ** 数据合并与压缩(可选):**为了跳效率,Mapreduce可以在shuffle过程合并与压缩中间数据,从而减少网络传输的负担。

示例:在前速的单词计数应用中,shuffle and sort阶段会将相同的单词汇聚在一起,如(hello,1),(hello,1) 会汇总成(hello,[1,1])。

2.3.Reduce阶段

2.3.1.Reduce源码解析

public class WordReducer extends  
            Reducer<Text, IntWritable, Text, IntWritable> {  
        private IntWritable result = new IntWritable();  
  
        public void reduce(Text key, Iterable<IntWritable> values,  
                Context context) throws IOException, InterruptedException {  
            int sum = 0;  
            for (IntWritable val : values) {  
                sum += val.get();  
            }  
            result.set(sum);  
            context.write(key, new IntWritable(sum));  
        }  
    }

经过上面2步的操作之后,此时我们得到的数据是(hello,[1,1]),(world,1)。

public class WordReducer extends  
            Reducer<Text, IntWritable, Text, IntWritable> {  
        private IntWritable result = new IntWritable();

首先初始化Reducer类,result 用于存储最终计算结果的IntWritable 对象。在Reduce函数中,我们将把相同键(单词)对应的所有值相加,最终将和存储在result中。

public void reduce(Text key, Iterable<IntWritable> values,  
                Context context) throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable val : values) {
        sum += val.get();
    }
    result.set(sum);
    context.write(key, new IntWritable(sum));
}

执行Reduce函数

public void reduce(Text key, Iterable<IntWritable> values,  
                Context context) throws IOException, InterruptedException {

key:这是来自Map阶段的一个键(在我们的例子中,就是一个单词)

values:这是一个可迭代的的IntWritable对象集合,表示所有与该key相关联的值(计数)。

在我们的例子中:key对应的是hello,values则是[1,1]。

    int sum = 0;
    for (IntWritable val : values) {
        sum += val.get();
    }

遍历求和

    result.set(sum);
    context.write(key, new IntWritable(sum));
}

**result.set(sum);**将计算结果设置为result值。

**context.write(key, new IntWritable(sum));**输出最终的键值对。

我们例子中则是(hello,2),(world,1)。

2.3.2.Reduce端源码总结

    reduce阶段是maprecduce作业最后一步,负责对shuffle and sort阶段输出的中间键值对进行汇总和处理,最终生成作业的输出结果。主要步骤包括:

    **汇总与处理:**Reduce任务接收到某个键的所有值后,会调用用户第一reduce函数,对这些值进行汇总操作。例如,进行求和,求平均值,最大值,最小值等聚合操作。

    **输出结果:**Reduce函数处理完毕后,生成最终的键值对结果,并将其写回hdfs中作为最终结果输出。通常,这些结果会存储微多个文件,以适应hadoop的分布式存储结构。

示例:继续前面的单词计数应用,Reduce函数会对每个单词的出现次数进行求和,如(hello,[1,1])将被转换为(hello,2),并输出到最终的结果文件中。

3.数据流与任务执行

3.1.数据输入与输出格式

3.1.1.TextInputFormat

    TextInputFormat是MapReduce中最常见的输入格式之一。它将输入文件按行划分为记录,每一行会作为一个键值对。键通常是行的字节偏移量,值则是该行的内容。这种格式适用于处理纯文本文件,比如日志文件或CSV文件。

3.1.2.SequenceFileInputFormat

    SequenceFileInputFormat用于处理hadoop的二进制文件格式。SequenceFile是一种用于存储键值对的hadoop特定格式,通常用于中间数据的存储和分布式缓存。这种格式适合存储大量小文件或需要高效读取的大规模数据。

3.1.3.KeyValueTextInputFormat

    KeyValueTextInputFormat将输入文件的每一行作为一个键值对进行处理,行中的第一次单词作为键,其余部分作为值。这种格式在处理结构化文本数据时非常有用。

3.1.4.TextOutputFormat

    TextOutputFormat是最常用的输出格式,默认情况下,输出为文本格式。每一行包含一个键值对,键和值之间由制表符分隔。这适合与需要将处理结果以文本形式存储的场景。

3.1.5.SequenceFileOutputFormat

    SequenceFileOutputFormat将结果存储为hadoop的二进制格式SequenceFile,这样的文本具有较高的读取效率和压缩比,适合用于中间结果存储或者更大规模的数据存储需求。

3.2.任务调度执行流程

3.2.1.MapReduce1.X

1.提交作业:客户端首先向JobTracker提高一个作业。作业包含了需要执行的Map和Reduce任务、输入输出路径以及作业的配置参数等。

2.JobTracker分配任务:JobTracker负责整个集群的资源管理和任务调度。它会将作业分解成多个Map任务和Reduce任务,并将这些任务分配给集群中的各个TaskTracker。

3.TaskTracker执行任务:TaskTracker运行在每个集群节点上,负责接收并执行来自JobTracker的任务。TaskTracker会将任务分配给本地的工作线程,执行Map或Reduce操作。

4.任务执行与心跳机制:TaskTracker在执行任务的同时,会定期向JobTracker发送心跳信息,报告任务的执行状态以及当前的资源使用情况。心跳信息用于通知JobTracker节点的健康状态以及任务的执行进度。

5.失败处理与重试:如果JobTracker没有在规定时间内收到某个TaskTracker的心跳信息,或者TaskTracker报告了任务失败,JobTracker会将该任务重新分配给其他可用的TaskTracker执行。

6.任务完成:当所有的Map和Reduce任务都完成后,JobTracker会通知客户端作业完成,并输出最终的结果。

3.2.2.MapReduce2.x

1.启动作业:客户端的MapReduce程序通过JobClient启动作业,作业由客户端节点发起。

2.申请新的Application:jobClient向ResourceManager(在YARN架构中,它负责资源管理)申请一个新的Application ID。

3.复制作业资源:jobClient将作业所需的资源(例如JAR文件、配置文件等)复制到分布式文件系统(HDFS)中,以便作业在集群中执行时可以访问这些资源。

4.提交申请:jobClient将作业提交给ResourceManager,ResourceManager随后启动一个ApplicationMaster来管理该作业的执行。

5.初始化:jobTracker(在图中应是ResourceManager,负责集群资源的分配和管理)初始化并准备管理作业。

6.获取输入拆分:输入数据被分割为多个逻辑分片(input splits),每个Map任务将处理一个分片。这些分片是由InputFormat类管理的。

7.发送心跳信息:TaskTracker节点(在YARN中被称为NodeManager)定期向ResourceManager(或图中的jobTracker)发送心跳信息,报告其状态并请求新的任务。

8.获取工作资源:TaskTracker节点从HDFS获取作业所需的资源,这些资源包括输入数据和运行任务的必要文件。

9.启动JVM:TaskTracker节点启动一个子JVM(Java虚拟机),这个子JVM用于执行具体的Map或Reduce任务。

10.运行任务:子JVM开始执行分配给它的任务,可能是Map任务或Reduce任务,直至任务完成。

4.性能优化

4.1.常见优化策略

4.1.1.合并器(Combiner)

    Combiner是一种本地Reduce操作,它在Map任务的输出阶段起作用,用来合并相同的键值对,从而减少需要传输到Reduce任务的数据量。

    如果Map阶段的输出可以通过局部合并得到较小的中间结果,使用Combiner可以显著减少网络传输的负担。Combiner通常用于诸如计数、求和等操作。

4.1.2.Partitioner(分区器)

    Partitioner负责将Map阶段输出的键值对分配到不同的Reduce任务中。默认的分区方式是根据键的哈希值,但在某些情况下,自定义分区器可以帮助数据均衡地分配到各个Reduce任务中,从而避免某些Reduce任务过载。

    自定义Partitioner可以确保数据按某种逻辑进行分配,例如基于键的范围进行分区,使每个Reduce任务处理的工作量更加均衡。

4.1.3.调整Map和Reduce任务数

    Map任务的数量由输入数据的分片大小决定。可以通过调整 mapreduce.input.fileinputformat.split.minsize 来控制每个Map任务处理的分片大小。适当增加分片大小可以减少Map任务的数量,减少调度开销。

    Reduce任务数量的设置应根据中间数据量和集群资源进行调整。过少的Reduce任务会导致负载集中在少数几个节点,过多则会增加调度和管理的开销。一般建议设置Reduce任务数量为集群Reduce slot的1-1.5倍。

4.1.4.压缩中间数据

    MapReduce的中间数据传输量可能很大,通过启用压缩可以减少网络传输时间以及磁盘I/O的开销。推荐使用Snappy和LZO是高效的压缩格式,适合MapReduce任务。

4.1.5.数据本地化

    MapReduce会优先在数据存储所在的节点上运行Map任务,这被称为数据本地化。为了提高性能,应尽量确保数据分布均匀,并避免热点问题。

    在HDFS上,确保数据块被均匀分布在集群的不同节点上,以最大化数据本地化的机会。

4.1.6.增加并行度

    MapReduce作业中的每个Map和Reduce任务都可以在各自的JVM中并行运行。增加集群的并行度可以提升处理速度。

    使用YARN可以动态调整集群的资源分配,确保MapReduce作业能够使用到更多的资源。

5.MapReduce的局限性与扩展

5.1.局限性

5.1.1.迭代任务效率低

    MapReduce模型在处理迭代任务(如机器学习中的多轮训练)时表现不佳。每次迭代都需要从磁盘读取数据,处理完成后再将结果写回磁盘,这导致了大量的I/O操作。

    这种I/O密集型的操作导致迭代任务的整体执行效率低下,增加了计算的时间成本。

5.1.2.延迟高

    MapReduce模型的工作流程设计使得作业的启动和数据传输具有一定的延迟。任务调度、数据分发以及之间结果的处理都需要花费时间,尤其是在处理实时数据或低延迟需求的应用时、MapReduce表现不够理想。

    延迟问题使得MapReduce难以应用于需要实时响应的数据处理场景,如实时数据分析和流式数据处理。

5.1.3.模型单一

    MapReduce主要适用于批处理任务,对于诸如图计算、流处理等需要更加复杂数据处理模型的场景,MapReduce显得力不从心。它缺乏对这些模型的原生支持,开发者需要通过复杂的工作流程和代码来弥补。

    这限制了MapReduce在多样化数据处理任务中的适用性,迫使开发者寻求更加灵活的计算框架。      

5.1.4.资源管理不足

    MapReduce 1.x中,JobTracker既负责任务调度,又负责资源管理,这种集中式架构导致了系统的可扩展性问题。当集群规模变大时,JobTracker容易成为性能瓶颈,并且存在单点故障的风险。

    这种资源管理方式限制了集群规模的扩展性,影响了大规模集群的稳定性和高效性。

本文转载自: https://blog.csdn.net/sheansavage/article/details/138683734
版权归原作者 sheansavage 所有, 如有侵权,请联系我们删除。

“Hadoop: Mapreduce了解”的评论:

还没有评论