Hadoop中的MapReduce
在大数据处理领域,Hadoop的MapReduce框架扮演着至关重要的角色。它提供了一种有效的方式来处理大规模数据集,实现并行计算和分布式处理。本文将介绍MapReduce的基本概念、工作原理以及如何在Hadoop环境中编写和运行MapReduce作业。
一、 MapReduce概述
1.MapReduce是一种编程模型,用于处理大规模数据集的并行计算。它将任务分解成两个关键阶段:Map阶段和Reduce阶段。在Map阶段,数据被切分成独立的数据块,并由多个Map任务并行处理;在Reduce阶段,Map任务的输出被汇总和整合,最终生成最终结果。
定义:MapReduce是一个分布式运算程序的编程框架,其核心功能是将用户编写的业务逻辑代码和自带的默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。
一个基本完整的MapReduce程序流程,包括:数据分片-数据映射-数据混洗-数据归约-数据输出
二、MapReduce工作流程
- 1、MapReduce的工作流程主要包括以下步骤:
- 2、Input(输入):读取文本文件。
- 3、Splitting(拆分):将文本按行拆分,得到K1表示行数,V1表示对应行的文本内容。
- 4、Mapping(映射):将每一行按空格并行拆分处理,排序得到List(K2,V2),其中K2表示每一个单词,V2的值为1,代表出现1次。
- 5、Shuffling(混洗):由于Mapping操作可能是在不同的机器上并行处理,所以需要通过Shuffling将相同key值的数据分发到同一个节点上去合并,然后排序。
- 6、Reducing(归约):对List(V2)进行归约求和操作,统计每个单词出现的总次数。
- 7、Final Result(最终结果):最终输出文件保存在文件系统(如HDFS)上。
- 三、MapReduce技术特点易于编程:Hadoop的一个重要设计目标是简化分布式程序设计,用户只需专注于自己的应用程序逻辑实现。高容错性:Hadoop通过计算迁移或数据迁移等策略提高集群的可用性与容错性。适合非交互式计算:MapReduce适用于一次性大规模数据处理,不适合需要即时交互的应用场景。
四、MapReduce适用场景
- 数据挖掘和分析:MapReduce技术适用于大规模的数据挖掘和分析任务,可以方便地处理海量的结构化和非结构化数据。
- 分布式搜索引擎:MapReduce可以用于构建分布式的搜索引擎,通过并行计算来提高搜索效率。
- 日志处理和分析:许多互联网公司使用MapReduce来处理大规模的日志数据,以便进行性能监控、用户行为分析等工作。
五、MapReduce的工作原理
- Map阶段:- 数据输入:输入数据通常存储在HDFS上,被切分成多个数据块(splits)。- Map任务:每个Map任务读取一个数据块,并根据用户定义的map()函数处理这些数据,通常包括解析输入数据,提取键值对,并应用转换逻辑。- 中间输出:Map任务生成的中间键值对被写入本地磁盘,等待进一步处理。- 分区和排序:中间键值对根据它们的键进行分区和排序,以确保具有相同键的键值对被发送到同一个Reduce任务进行处理。
- Reduce阶段:- Reduce任务:接收来自Map任务的排序后的键值对,根据它们的键被分发到不同的Reduce任务。- 归约函数:对于每个唯一的键,Reduce任务接收一个与该键相关的值列表,并通过用户定义的reduce()函数将这些值合并成一个单一的输出值。- 最终输出:Reduce任务的输出被写入HDFS或其他支持的文件系统,作为MapReduce作业的结果。
- Driver阶段:- 作业配置:客户端代码设置MapReduce作业的配置参数,包括输入和输出路径、Mapper和Reducer类、序列化器和反序列化器、分区器和排序器等。- 作业提交:将作业提交到Hadoop集群,其中包括与ResourceManager通信以请求资源,并将作业配置和作业资源打包发送到集群中的NodeManager。- 作业监控:监控作业的执行情况,例如查询ResourceManager以获取作业状态信息,并在需要时采取适当的操作(如取消作业或重新提交作业)。- 结果处理:处理作业的输出结果,例如将结果数据加载到数据库或进行进一步的分析。- 需要注意的是,Driver阶段并不直接参与数据的Map和Reduce处理,但它对于启动、配置、提交和监控MapReduce作业至关重要。
总的来说,MapReduce采用了分布式计算的思想,通过将作业分解成Map和Reduce阶段进行并行处理,从而有效地处理大规模数据。同时,Driver阶段负责作业的配置、提交和监控,确保整个作业能够顺利执行。
- 示例:WordCount示例:
- Map阶段:
package com.hadoop.mapreduce.wordcountlinux;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> { private Text outKey = new Text(); private IntWritable outValue = new IntWritable(1); @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException { String line = value.toString(); String[] words = line.split(" "); // 遍历单词数组,为每个单词设置outKey,并输出<outKey, outValue>键值对 for (String word : words) { outKey.set(word); context.write(outKey, outValue); } }}
Reduce阶段:package com.hadoop.mapreduce.wordcountlinux;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { private IntWritable outValue = new IntWritable(); @Override protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException { int sum = 0; // 遍历values,将每个value相加得到sum for (IntWritable value : values) { sum += value.get(); } // 将sum设置为outValue,并输出结果 outValue.set(sum); context.write(key, outValue); }}
Driver阶段:package com.hadoop.mapreduce.wordcountlinux;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class WordCountDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException{ //1.获取jod Configuration conf =new Configuration(); //2.设置路径 Job job = Job.getInstance(conf); job.setJarByClass(WordCountDriver.class); //3.关联mapper和reducer job.setMapperClass(WordCountMap.class); job.setReducerClass(WordCountReducer.class); //4.设置map输出的key,value类型 job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); //5.设置最终的key,value类型 job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); //6.设置输入路径和输出路径 //这里设置为自己的文件路径 FileInputFormat.setInputPaths(job,new Path("D:\\zHADOOP\\input\\inputword\\CSWJ.txt")); //在D:\zHADOOP\output\outputword12 的outputword12会自己生成的,如果存在则会报错 FileOutputFormat.setOutputPath(job,new Path("D:\\zHADOOP\\output\\outputword12")); //7.提交job boolean result = job.waitForCompletion(true); System.exit(result ? 0 : 1); }}
示例结果:
六、总结:
Hadoop中的MapReduce是一个功能强大的分布式计算框架,它通过简化的编程接口和高效的容错机制,为用户提供了处理大规模数据集的能力。通过理解MapReduce的工作流程、技术特点和适用场景,可以更好地利用Hadoop进行数据处理和分析。
版权归原作者 鱼晏LH 所有, 如有侵权,请联系我们删除。