作者:禅与计算机程序设计艺术
1.简介
Hadoop MapReduce(以下简称MR)是一个分布式计算框架,基于Google开发,用于并行处理海量数据集。其提供简单、高效的数据处理能力,并可运行于多种平台上,广泛应用于数据分析领域。因此,掌握MR的原理及其工作方式对于利用它进行海量数据的分布式计算,以及优化数据处理性能都非常重要。本文从MapReduce框架的基本原理出发,深入剖析了其工作原理、流程、工作机制,并结合具体案例,详尽阐述了MR的各项特性及优缺点,同时也提供了一些扩展阅读的参考资源。
2.主要特点
- 分布式计算框架
- 可运行在多种平台上
- 支持海量数据处理
- 提供高效的数据分析能力
- 有优秀的编程模型和生态系统
- 通过Hadoop项目发展而成熟,被业界广泛采用
- 拥有丰富的工具及插件支持
3.适用场景
- 数据分析、机器学习、网络爬虫、推荐系统、日志分析、数据挖掘等需要大量计算处理的应用场景;
- 实时或离线批处理、实时计算和流处理等数据处理需求;
- 在线业务交易、广告投放、搜索排序等实时处理需求;
- 大规模集群计算资源的管理和任务调度。
4.总体结构
Hadoop MapReduce由一个M(master)和R(slave/worker)组成,如下图所示。Master负责分配任务,记录执行进度;Slave负责处理任务,读取数据、执行计算、生成结果。Master和Slave之间通过网络通信,实现任务的分发、协作和监控。
其中,Map是将数据映射到一系列键值对上的过程,Reduce是根据一组键相同的值对结果进行汇总。MapReduce分为两个阶段,第一个阶段为Map任务,将输入数据切分为较小的独立任务,并将每个任务作为输入传给Map函数,获取中间结果。第二个阶段为Reduce任务,将Map输出的结果进行汇总,最终得到想要的结果。
2. 基本概念术语说明
2.1 任务类别及其分类
Hadoop MapReduce有三种类型的任务,包括MapTask、ShuffleTask和ReduceTask。它们的区别如下表:
类别描述MapTask对于每块数据,MapTask都会对其中的元素执行用户自定义的map函数,并且会产生N个键值对(其中N可以随意指定),将这些键值对作为中间结果输出给reduce任务。ShuffleTask当MapTask产生的键太多时,就会发生Shuffle,也就是将这些键值对重新组合成更少的键值对,这样就可以减少下一步reduce任务的输入量,提升整体运行效率。ReduceTask对MapTask的输出结果进行汇总处理,返回最终结果。
2.2 分布式文件系统HDFS
Hadoop Distributed File System (HDFS),是一个分布式文件系统,由Apache基金会维护,提供高容错性、高吞吐量的数据访问接口。它允许多个节点存储文件,且具有高容错性。它能够存储超大文件,在内存中处理,并通过网络自动复制文件以保证高可用性。HDFS以客户机-服务器模式运行,客户端向NameNode请求文件信息,然后直接与DataNode通信交互,实现数据读写。NameNode记录了文件的元数据信息,比如文件路径、大小、副本数量等。HDFS具有高容错性和高吞吐量,并能够在不停机状态下对其进行动态调整,以应对突发的瞬间流量高峰。
2.3 YARN
Yet Another Resource Negotiator(YARN,另一种资源协商者)是Hadoop项目下的一个子项目,其目的就是管理Hadoop集群资源。它提供了四个功能模块:资源管理、应用提交、集群资源调度和容错。
2.4 JobConf
JobConf(作业配置)是一个配置文件,它存储了启动MapReduce作业所需的参数信息。JobConf被用来传递参数、设置作业的属性、确定作业输入/输出路径,以及确定作业要使用的Mapper和Reducer函数等信息。JobConf可以通过编程的方式或者命令行的方式设置参数,并且支持动态修改。
3. 核心算法原理和具体操作步骤以及数学公式讲解
3.1 Map阶段
Map是MapReduce框架的一个核心组件,负责对输入的数据集进行划分、处理、转换,以产生中间结果。它是一次性的、本地化的计算,所有计算都是在一台机器上完成。Map阶段通过把数据集切分成独立的键值对,并调用用户定义的Map函数处理每个键值对,然后将键值对的输出结果存储到磁盘或内存中。
3.1.1 Map输入数据集
输入数据集可能是文本文件、压缩文件、数据库中的表格数据、日志文件,甚至可能是实时的流数据。
3.1.2 Map函数
Map函数接受一个输入键值对,并返回一个或多个键值对。一般来说,Map函数的目的是将输入数据集划分为一系列的键值对,即一组键和一组值。
3.1.2.1 Map输出结果
Map输出结果可能保存到内存或磁盘,取决于计算环境。但一般建议使用磁盘存储,因为内存很有限,而且如果在内存中存储太多数据,可能会导致内存溢出,影响整体性能。
3.1.3 Map任务过程
Map任务过程可以概括为以下几个步骤:
- 从输入数据集中读取一块数据,作为map任务的输入。
- 对该块数据进行预处理,去除无关数据,清理杂质,如果需要的话还可以在此过程中对数据进行格式转换。
- 将预处理后的数据分割成为多行,每行成为一个输入键值对。
- 执行用户自定义的map函数,对于每个输入键值对,执行map逻辑,产生零个或多个输出键值对。
- 将输出键值对写入磁盘或内存中。
- 检查是否还有更多的输入数据,如果有则回到第1步继续执行。
- 等待所有map任务完成,然后继续执行reduce任务。
3.1.4 Map函数设计指导方针
- 只在map函数中对输入数据进行操作,不要对数据进行复杂的过滤、排序等操作,否则会降低整个MapReduce计算的速度;
- 使用外部变量(Counters)来进行计数统计;
- 对Mapper中的key和value的长度保持在可接受的范围内(比如1MB或10MB);
- 不要在mapper里面做过多的网络IO操作;
- 不要在mapper里面做过多的内存运算;
- 不要在mapper里面做太多的复杂的算法运算;
- 避免在Mapper中进行大的磁盘I/O操作;
- Mapper的输出结果尽量不要超过2GB,否则可能导致JVM内存溢出;
- 在Reducer中对数据进行合并操作,而不是在Mapper中;
- 注意避免Mapper之间的依赖关系过多,减少数据shuffle,可以考虑增加partition数量来减少数据shuffle的影响。
3.2 Shuffle阶段
当MapTask产生的键太多时,就会发生Shuffle,也就是将这些键值对重新组合成更少的键值对,这样就可以减少下一步reduce任务的输入量,提升整体运行效率。
3.2.1 Shuffle过程
Shuffle过程可以概括为以下几个步骤:
- MapTask将输出结果写入磁盘或内存中。
- Shuffle过程将Map输出结果按照Key进行分组,将属于同一个Group的记录放在一起。
- 如果属于不同Group的记录太多,将会发生Spill Over。此时,Spill Over即溢写,数据溢出到磁盘上,以解决内存空间不足的问题。
- Shuffle过程将各个MapTask的输出结果分别缓存在不同的主机上,以减少网络带宽占用。
- Shuffle过程合并分组后的结果,形成一个大的中间结果。
- 此时,MapTask结束。
3.2.2 实现机制
- Shuffle过程采用全排序算法,每次从磁盘加载数据时,都会对数据进行排序,以便于后续按key分组;
- MapTask的输出结果首先被分区(Partition),然后根据key进行排序,如果key相同的记录存放在一起,最后才合并成中间结果;
- shuffle过程可以细粒度(Coarse Grained)的shuffle,也可以粗粒度的shuffle;
- reduce端将所需的键值对拉取到内存,并对数据进行排序,在磁盘上持久化缓存,以便于下次数据重复使用。
3.2.3 Spill Over机制
当某个MapTask输出的结果过多,无法全部放在内存中,就需要溢写到磁盘。在一般情况下,Spill Over主要发生在Shuffle阶段。在HDFS中,属于不同MapTask的输出结果保存在不同的块中,不会影响其他MapTask的运行,因此Spill Over只会在某些特定情况下发生。一般情况下,Spill Over不会影响MapReduce计算的正确性,但是会导致数据落盘,影响性能。
3.2.4 性能优化建议
- 减少磁盘I/O操作:在Hadoop中,当MapTask输出结果时,默认情况下,每次只写一个block,这样可以减少磁盘I/O操作;
- 避免分片过多:当一个reducer处理的数据量比较少时,可以选择将其分片过多,即将一个reducer处理的分片个数设置为和其数据量相同;
- 设置合理的mapreduce.task.io.sort.mb:默认情况下,mapreduce.task.io.sort.mb设置为100M,可以适当增大这个参数;
- 减少临时内存消耗:当多个MapTask并行运行时,如果临时内存不能完全放入内存,就会出现OOM,所以可以适当减少临时内存的占用。
3.3 Reduce阶段
Reduce阶段也是MapReduce框架的一个核心组件,负责对MapTask的输出结果进行汇总,以得出最终结果。它也是一次性的、本地化的计算,所有计算都是在一台机器上完成。Reduce阶段是由用户自定义的reduce函数处理的,它接收来自MapTask的一组键值对,汇总它们,生成一个新的键值对作为输出结果。
3.3.1 Reduce输入数据集
Reduce输入数据集由MapTask的输出结果构成。
3.3.2 Reduce函数
Reduce函数接收来自MapTask的一组键值对,并返回一个或多个键值对。
3.3.3 Reduce任务过程
Reduce任务过程可以概括为以下几个步骤:
- 从磁盘或内存中读取输入数据,作为reduce任务的输入。
- 执行用户自定义的reduce函数,对输入键值对进行处理,生成零个或多个输出键值对。
- 将输出键值对写入磁盘或内存中。
- 检查是否还有更多的输入数据,如果有则回到第1步继续执行。
- 返回最终结果。
3.3.4 Reduce函数设计指导方针
- 在Reduce函数中只对输入数据进行操作,不要对数据进行复杂的过滤、排序等操作,否则会降低整个MapReduce计算的速度;
- 使用外部变量(Counters)来进行计数统计;
- 对Reducer中的key和value的长度保持在可接受的范围内(比如1MB或10MB);
- 不要在Reducer里面做过多的网络IO操作;
- 不要在Reducer里面做过多的内存运算;
- 不要在Reducer里面做太多的复杂的算法运算;
- Reducer的输出结果尽量不要超过2GB,否则可能导致JVM内存溢出;
- 在Reducer中对数据进行合并操作,而不是在MapTask中;
- 注意避免Reducer之间的依赖关系过多,减少数据shuffle,可以考虑增加partition数量来减少数据shuffle的影响。
4. 具体代码实例和解释说明
4.1 WordCount示例
WordCount的目的是统计一段文本文件中出现的单词的频率。假设我们有一个文件test.txt,它的内容如下:
hello world hello hadoop spark mapreduce spark hadoop hello mapreduce spark
WordCount的Map阶段的过程如下:
- 每个输入记录作为一条字符串,中间结果保存在内存或磁盘。
- 源字符串会被拆分为单词,并标记出每个单词的起始位置和结束位置。
- 对于每个单词,输出一条键值对,其中键为单词,值为1。
WordCount的Reduce阶段的过程如下:
- 每个输入键值对表示一个单词和它的频率,中间结果保存在内存或磁盘。
- 对同一个单词的多个频率进行累加。
- 输出最终的单词频率。
4.2 TopK问题示例
TopK问题是在排序的基础上,找到前k个最大或最小的元素,这是一个经典的问题。举个例子,假如有一个包含了一千万条订单数据的日志文件,每条日志记录包含了一个商品ID,我们希望找出最热门的10个商品。这个问题的Map阶段的过程如下:
- 每个输入记录代表一个订单,中间结果保存在内存或磁盘。
- 输入记录中包含了一个商品ID字段,将其作为键输出。
- 值输出为空。
TopK问题的Reduce阶段的过程如下:
- 每个输入键值对代表一个商品ID和空值,中间结果保存在内存或磁盘。
- 将相同商品的键值对输出到一个中间结果集合。
- 将中间结果集合排序。
- 取出排名前k的商品。
- 返回最终结果。
4.3 衍生问题
4.3.1 自定义键排序
假设我们想按照自定义规则对MapTask的输出结果进行排序,比如按照单词出现次数的倒序排序。这时候我们可以自定义Map输出结果的Value类型,将单词出现的次数作为Value,并实现自定义的Partitioner,将同样的单词划分到同一个分区中,即可实现自定义排序。
4.3.2 中间值聚合
假设MapTask的输出结果不是直接送入Reduce阶段,而是先经过一系列的中间操作,最终聚合到一起。这种情况可以用自定义的Combiner来完成,它可以帮助Reduce阶段对中间结果进行聚合操作,进一步提升性能。
4.3.3 基于Map输出结果进行分组
有的时候,MapTask的输出结果并不是一组键值对,而是要先进行分组再进行后续处理。这时候我们可以自定义Reduce的输入类型,传入分组后的对象列表作为输入,然后自定义Reduce的函数,对每个分组进行处理。
版权归原作者 禅与计算机程序设计艺术 所有, 如有侵权,请联系我们删除。