MapReduce 处理阶段
MapReduce 框架通常由三个阶段组成:
- Map:读取文件数据,按照规则对文本进行拆分,生成 KV 形式的数据。
- Shuffle:工作节点根据输出键(由 map 函数生成)重新分配数据,对数据排序、分组、拷贝,目的是属于一个键的所有数据都位于同一个工作节点上。
- Reduce:工作节点并行处理每个键的一组数据,对结果进行汇总。
下图把 MapReduce 的过程分为两个部分,而实际上从两边的 Map 和 Reduce 到中间的那一大块都属于 Shuffle 过程,也就是说,Shuffle 过程有一部分是在 Map 端,有一部分是在 Reduce 端。
案例统计单词出现的次数
数据
hello hbase
hello hadoop
hello hive
hello kubernetes
hello java
原理图
wordcount 代码实现
用户编写的 MapReduce 程序分成三个部分:Mapper,Reducer,Driver:
- 用户自定义 Mapper 类继承 Mapper 类,实现 map() 方法,输出和输出的数据都是
<K,V>
对形式,<K,V>
类型可以根据实际情况自定义。MapTask 进程对每一个<K,V>
调用一次。 - 用户自定义 Reduce 类继承 Reduce 类,实现 reduce() 方法,输出和输出的数据都是
<K,V>
对形式,<K,V>
类型可以根据实际情况自定义。Reducetask 进程对每一组相同 K 的<K,V>
组调用一次 reduce() 方法。 - 整个 MapReduce 程序需要一个 Drvier 类来进行提交,提交的是一个描述了各种必要信息的 Job 对象。
在 HDFS 中创建目录并上传文件
在 HDFS 中创建一个目录
hadoop fs -mkdir /wcinput
#将本机 words.txt 文件上传到 HDFS 的 /wcinput 目录中
hadoop fs -put /words.txt /wcinput
引入pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>hadoopmapreduce</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>2.12.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.3</version>
</dependency>
</dependencies>
</project>
编写 Mapper 类
编写一个 WordMapper 类继承 Mapper 类,并重写 map() 方法。Mapper 类是一个泛型类,4 个泛型类型分别代表(KeyIn,ValueIn,KeyOut,ValueOut )。泛型的类型可以根据自己实际的场景来指定。在 wordcount 这个例子中指定的类型如下:
- KeyIn(输入的键):LongWritable 类型,表示每行文字的起始位置(偏移量)
- ValueIn(输入的值):Text 类型,表示每行的文本。
- KeyOut(输出的键):Text 类型,表示每个单词。
- ValueOut(输出的值为):LongWritable 类型,表示单词出现的次数(1次)
Mapper 阶段依次读取每一行的数据,每行按照空格拆分出单词,得到 <单词,1> 的键值对,键是单词,值是 1,之后 Reduce 阶段累计单词出现的次数就累加 1 即可。
Mapper 阶段代码如下:
package words;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @description Map 阶段,分别计算每行每个单词出现的次数,key 是单词,value 为 1(表示 1 个单词)。
*/
public class WordMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1、切分单词
String[] words = value.toString().split(" ");
//2、单词转换 单词 -> <单词,1>
for (String word : words) {
//3、写入到上下文
context.write(new Text(word),new LongWritable(1));
}
}
}
编写 Reduce 类
编写一个类 WordReducer 继承 Reducer 类,并重写 reduce() 方法。Reducer 类是也是一个泛型类,4 个泛型类型分别代表(KeyIn,ValueIn,KeyOut,ValueOut )泛型的类型可以根据自己实际的场景来指定。在 这个例子中我们指定的类型如下:
- KeyIn(输入的键):Text 类型,表示每个单词
- ValueIn(输入的值):LongWritable 类型,表示单词出现的次数(1次)
- KeyOut(输出的键):Text 类型,表示每个单词
- ValueOut(输出的值为):LongWritable 类型,表示单词出现的总数
Reduce 阶段接收到数据键是单词,值是一个可迭代的对象,是相同单词对应的次数(每个都是 1),只需要把这些 1 累加起来,就可以得到单词出现的总数了。
执行完后达到的效果
<hello,[1,1,1,1,1]>
Reduce 阶段代码如下:
package words;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @description Reduce 阶段,把 key 相同的数据进行累计,得到每个单词出现的次数
*/
public class WordReducer extends Reducer<Text, LongWritable,Text,LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
//1、定义一个变量
long count = 0;
//2、迭代
for (LongWritable value : values) {
count += value.get();
}
//3、写入上下文
context.write(key,new LongWritable(count));
}
}
编写 Driver 类
创建提交给 YARN 集群运行的 Job 对象,封装了 MapReduce 程序运行所需要的相关参数,例如输入数据路径,输出数据路径,Mapper 参数,Reduce 参数
package words;
import count.CharCountDriver;
import count.CharCountMapper;
import count.CharCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
public class JobMain {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
//一、初始化Job
Configuration configuration = new Configuration();
//获取运行命令的参数,参数一:输入文件路径,参数二:输出文件路径
//如果输入路径是一个文件,那么只处理这个文件,如果指定的路径是目录,则处理这个目录下的所有文件
//输出路径只能是不存在的目录名
String [] otherArgs = new GenericOptionsParser(configuration,args).getRemainingArgs();
if(otherArgs.length < 2){
System.err.println("必须提供输入文件路径和输出文件路径");
System.exit(2);
}
Job job = Job.getInstance(configuration, "mr");
job.setJarByClass(JobMain.class);
//二、设置Job的相关信息 8个小步骤
//1、设置输入路径
job.setInputFormatClass(TextInputFormat.class);
//本地运行
//TextInputFormat.addInputPath(job,new Path("/tmp/input/mr1.txt"));
TextInputFormat.addInputPath(job,new Path(args[0]));
//2、设置Mapper类型,并设置输出键和输出值
job.setMapperClass(WordMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//shuffle阶段,使用默认的
//3、设置Reducer类型,并设置输出键和输出值
job.setReducerClass(WordReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//4、设置输出路径
job.setOutputFormatClass(TextOutputFormat.class);
//本地运行
//TextOutputFormat.setOutputPath(job,new Path("/tmp/output/mr"));
TextOutputFormat.setOutputPath(job,new Path(args[1]));
//三、等待完成
boolean isfinish = job.waitForCompletion(true);
System.out.println(isfinish ==true?"MapReduce 任务执行成功!":"MapReduce 任务执行失败!");
System.exit(isfinish ? 0 : 1);
}
}
idea中打包成jar包
上传JAR包到服务器上
注意:如果输入路径是一个文件,那么只处理这个文件,如果指定的路径是目录,则处理这个目录下的所有文件
执行 hadoop jar
hadoop jar /hadoopmapreduce-1.0-SNAPSHOT.jar danci.JobMain /wcinput /my_wcoutput
查看结果
版权归原作者 薛定谔的猫1981 所有, 如有侵权,请联系我们删除。