一、引言
在当今大数据时代,数据量呈爆炸式增长,传统的单机处理方式已经难以满足对海量数据进行高效处理的需求。而 Hadoop - MapReduce 作为一种强大的分布式计算框架,应运而生,为大数据处理提供了可靠且高效的解决方案。本文将带大家深入了解 Hadoop - MapReduce 的相关概念、工作原理以及其在实际应用中的重要意义。
二、Hadoop - MapReduce 概述
Hadoop 是一个开源的分布式计算平台,由 Apache 软件基金会开发和维护。MapReduce 是 Hadoop 的核心组件之一,它提供了一种简单而强大的编程模型,用于在大规模集群上并行处理海量数据。
其主要特点包括:
- 分布式处理:能够将大规模的数据分割成多个小块,分布在集群中的不同节点上进行并行处理,充分利用集群的计算资源,大大提高了处理速度。
- 容错性强:通过数据冗余和自动恢复机制,即使在部分节点出现故障的情况下,也能保证整个计算任务的顺利完成,确保了系统的可靠性。
- 可扩展性:可以方便地通过增加节点来扩展集群的计算能力,以适应不断增长的数据量和处理需求。
三、MapReduce 工作原理
Map 阶段- 数据输入:MapReduce 的输入数据通常存储在 Hadoop 分布式文件系统(HDFS)中。在 Map 阶段,框架会将输入数据按照一定的规则(如文件块大小等)划分成多个输入分片(Input Split),每个分片会被分配到集群中的一个节点上进行处理。- 映射操作:在每个节点上,Mapper 函数会对分配到的输入分片进行处理。Mapper 函数会按照用户定义的逻辑,将输入数据中的每条记录转换为一系列的键值对(Key-Value Pairs)。例如,对于一个文本文件处理任务,可能会将每行文本作为一个记录,通过 Mapper 函数提取出其中的关键词作为键,该行文本作为值。
Shuffle 阶段- 数据分区:在 Map 阶段结束后,各个节点上产生的键值对会根据键的值进行分区(Partition)操作。分区的目的是为了将具有相同键或者相近键的键值对发送到同一个 Reduce 节点上进行后续处理,以便于进行聚合等操作。- 数据排序:分区后的键值对还会在每个分区内进行排序操作,使得相同键的键值对能够相邻排列,方便 Reduce 阶段的处理。
Reduce 阶段- 数据输入:经过 Shuffle 阶段的处理,具有相同键的键值对会被发送到同一个 Reduce 节点上。Reduce 节点会接收到来自多个 Map 节点的属于自己负责处理的键值对集合。- 归约操作:在 Reduce 节点上,Reduce 函数会按照用户定义的逻辑,对收到的具有相同键的键值对进行聚合、汇总等归约操作。例如,对于前面提到的文本文件处理任务,Reduce 函数可能会将具有相同关键词的行文本进行合并或者统计出现的次数等操作。- 数据输出:Reduce 阶段处理完成后,会将最终的结果输出到指定的位置,通常也是存储在 HDFS 中。
import java.io.IOException;import java.util.StringTokenizer;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); context.write(word, one); } }}
四、MapReduce 编程示例
**** 1.Mapper 函数实现****
下面以一个简单的单词计数(Word Count)任务为例,展示如何使用 MapReduce 进行编程。
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
while (tokenizer.hasMoreTokens()) {
word.set(tokenizer.nextToken());
context.write(word, one);
}
}
}
在上述代码中,Mapper 函数接收输入的键(偏移量,这里用 LongWritable 表示)和值(一行文本,用 Text 表示),通过 StringTokenizer 将每行文本分割成单词,然后将每个单词作为键,值为 1 的 IntWritable 发送出去。
2.Reducer 函数实现
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
Iterator<IntWritable> iterator = values.iterator();
while (iterator.hasNext()) {
sum += iterator.next().get();
}
context.write(key, new IntWritable(sum));
}
}
Reducer 函数接收具有相同键的键值对集合,将这些值进行累加,得到每个单词出现的次数,然后将单词和其出现次数作为最终结果输出。
3.主程序配置
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputPath;
import org.apache.hadoop.mapreduce.lib.output.FileOutputPath;
public class WordCountMain {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "WordCount");
job.setJarByClass(WordCountMain.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyType(Text.class);
job.setOutputValueType(IntWritable.class);
FileInputPath.setInputPaths(job, new Path(args[0]));
FileOutputPath.setOutputPaths(job, new Path(args[1]));
System.exit(job.waitForCompletion(true)? 0 : 1);
}
}
在主程序中,我们配置了 MapReduce 任务的相关参数,如设置 Mapper 和 Reducer 类,指定输入输出的键值类型,以及输入输出文件的路径等。
五、MapReduces实操案例
1.下载安装
2.配置环境
3.列出目录
4.再次配置环境
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>mapreduce</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.30</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
5.创建一个输入输出结果的文件夹路径
6.代码
map代码
package mappreduce.subject_score;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class subject_scoreMap extends Mapper<Object, Text,Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
@Override
protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
String[] tokens = value.toString().split(",");
word.set(tokens[1]);
context.write(word,new IntWritable(Integer.parseInt(tokens[2])));
}
}
reduce代码
package mappreduce.subject_score;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class subject_scoreReduce extends Reducer<Text,IntWritable,Text,IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum=0;
int count=0;
for (IntWritable score : values){
sum += score.get();
count++;
}
int average = (int) ((double) sum/count);
context.write(key,new IntWritable(average));
}
}
driver代码
package mappreduce.subject_score;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class subject_scoreDriver {
public static void main(String[] args) throws IOException,InterruptedException,ClassNotFoundException{
//1.获取job
Configuration conf=new Configuration();
Job job= Job.getInstance(conf,"average score by subject");
//2.设置jar路径
job.setJarByClass(subject_scoreDriver.class);
//3.关联mapper和reduce
job.setMapperClass(subject_scoreMap.class);
job.setReducerClass(subject_scoreReduce.class);
//4.
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//5.
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//6.
FileInputFormat.setInputPaths(job,new Path("E:\\rainbow\\Java\\Hadoop\\mapreduce\\subject_score\\input"));
FileOutputFormat.setOutputPath(job,new Path("E:\\rainbow\\Java\\Hadoop\\mapreduce\\subject_score\\ouput\\output1"));
//7.
boolean result=job.waitForCompletion(true);
System.exit(result ? 0:1);
}
}
7.输出结果
六、Hadoop - MapReduce 的应用场景
- 日志分析:在互联网公司中,每天会产生大量的服务器日志数据,通过 MapReduce 可以高效地对这些日志进行分析,如统计访问次数、用户行为分析等。
- 数据挖掘:用于大规模数据集的挖掘任务,如关联规则挖掘、分类算法应用等。通过 MapReduce 可以在分布式环境下对海量数据进行挖掘操作,提高挖掘效率。
- 搜索引擎:在搜索引擎的索引构建和网页排名等过程中,MapReduce 发挥着重要作用。它可以快速处理海量的网页内容,提取关键信息并进行相关计算。
七、总结
Hadoop - MapReduce 分布式计算框架为大数据处理提供了一种高效、可靠且可扩展的解决方案。通过其独特的 Map、Shuffle 和 Reduce 阶段的工作原理,能够轻松应对海量数据的并行处理任务。并且,其简单的编程模型使得开发人员能够相对容易地编写分布式应用程序来处理各种大数据相关的问题。随着大数据时代的不断发展,Hadoop - MapReduce 将继续在众多领域发挥重要作用,为数据处理和分析提供强有力的支持。
希望通过本文的介绍,大家对 Hadoop - MapReduce 有了更深入的理解,能够在实际的大数据处理项目中更好地运用它。
版权归原作者 周彩虹20230322035 所有, 如有侵权,请联系我们删除。