0


Hadoop(九)MapReduce 案例2

MapReduce 处理阶段

MapReduce 框架通常由三个阶段组成:

  • Map:读取文件数据,按照规则对文本进行拆分,生成 KV 形式的数据。
  • Shuffle:工作节点根据输出键(由 map 函数生成)重新分配数据,对数据排序、分组、拷贝,目的是属于一个键的所有数据都位于同一个工作节点上。
  • Reduce:工作节点并行处理每个键的一组数据,对结果进行汇总。

下图把 MapReduce 的过程分为两个部分,而实际上从两边的 Map 和 Reduce 到中间的那一大块都属于 Shuffle 过程,也就是说,Shuffle 过程有一部分是在 Map 端,有一部分是在 Reduce 端。

案例统计单词出现的次数

数据

  1. hello hbase
  2. hello hadoop
  3. hello hive
  4. hello kubernetes
  5. hello java

原理图

wordcount 代码实现

用户编写的 MapReduce 程序分成三个部分:Mapper,Reducer,Driver:

  • 用户自定义 Mapper 类继承 Mapper 类,实现 map() 方法,输出和输出的数据都是 <K,V> 对形式,<K,V> 类型可以根据实际情况自定义。MapTask 进程对每一个 <K,V> 调用一次。
  • 用户自定义 Reduce 类继承 Reduce 类,实现 reduce() 方法,输出和输出的数据都是 <K,V> 对形式,<K,V> 类型可以根据实际情况自定义。Reducetask 进程对每一组相同 K 的 <K,V> 组调用一次 reduce() 方法。
  • 整个 MapReduce 程序需要一个 Drvier 类来进行提交,提交的是一个描述了各种必要信息的 Job 对象。

在 HDFS 中创建目录并上传文件

  1. HDFS 中创建一个目录
  2. hadoop fs -mkdir /wcinput
  3. #将本机 words.txt 文件上传到 HDFS 的 /wcinput 目录中
  4. hadoop fs -put /words.txt /wcinput

引入pom文件

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>org.example</groupId>
  7. <artifactId>hadoopmapreduce</artifactId>
  8. <version>1.0-SNAPSHOT</version>
  9. <dependencies>
  10. <dependency>
  11. <groupId>org.apache.logging.log4j</groupId>
  12. <artifactId>log4j-slf4j-impl</artifactId>
  13. <version>2.12.1</version>
  14. </dependency>
  15. <dependency>
  16. <groupId>org.apache.hadoop</groupId>
  17. <artifactId>hadoop-common</artifactId>
  18. <version>3.1.3</version>
  19. </dependency>
  20. <dependency>
  21. <groupId>org.apache.hadoop</groupId>
  22. <artifactId>hadoop-client</artifactId>
  23. <version>3.1.3</version>
  24. </dependency>
  25. <dependency>
  26. <groupId>org.apache.hadoop</groupId>
  27. <artifactId>hadoop-hdfs</artifactId>
  28. <version>3.1.3</version>
  29. </dependency>
  30. </dependencies>
  31. </project>

编写 Mapper 类

编写一个 WordMapper 类继承 Mapper 类,并重写 map() 方法。Mapper 类是一个泛型类,4 个泛型类型分别代表(KeyIn,ValueIn,KeyOut,ValueOut )。泛型的类型可以根据自己实际的场景来指定。在 wordcount 这个例子中指定的类型如下:

  • KeyIn(输入的键):LongWritable 类型,表示每行文字的起始位置(偏移量)
  • ValueIn(输入的值):Text 类型,表示每行的文本。
  • KeyOut(输出的键):Text 类型,表示每个单词。
  • ValueOut(输出的值为):LongWritable 类型,表示单词出现的次数(1次)

Mapper 阶段依次读取每一行的数据,每行按照空格拆分出单词,得到 <单词,1> 的键值对,键是单词,值是 1,之后 Reduce 阶段累计单词出现的次数就累加 1 即可。

Mapper 阶段代码如下:

  1. package words;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Mapper;
  5. import java.io.IOException;
  6. /**
  7. * @description Map 阶段,分别计算每行每个单词出现的次数,key 是单词,value 为 1(表示 1 个单词)。
  8. */
  9. public class WordMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
  10. @Override
  11. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  12. //1、切分单词
  13. String[] words = value.toString().split(" ");
  14. //2、单词转换 单词 -> <单词,1>
  15. for (String word : words) {
  16. //3、写入到上下文
  17. context.write(new Text(word),new LongWritable(1));
  18. }
  19. }
  20. }

编写 Reduce 类

编写一个类 WordReducer 继承 Reducer 类,并重写 reduce() 方法。Reducer 类是也是一个泛型类,4 个泛型类型分别代表(KeyIn,ValueIn,KeyOut,ValueOut )泛型的类型可以根据自己实际的场景来指定。在 这个例子中我们指定的类型如下:

  • KeyIn(输入的键):Text 类型,表示每个单词
  • ValueIn(输入的值):LongWritable 类型,表示单词出现的次数(1次)
  • KeyOut(输出的键):Text 类型,表示每个单词
  • ValueOut(输出的值为):LongWritable 类型,表示单词出现的总数

Reduce 阶段接收到数据键是单词,值是一个可迭代的对象,是相同单词对应的次数(每个都是 1),只需要把这些 1 累加起来,就可以得到单词出现的总数了。

执行完后达到的效果

  1. <hello,[1,1,1,1,1]>

Reduce 阶段代码如下:

  1. package words;
  2. import org.apache.hadoop.io.LongWritable;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Reducer;
  5. import java.io.IOException;
  6. /**
  7. * @description Reduce 阶段,把 key 相同的数据进行累计,得到每个单词出现的次数
  8. */
  9. public class WordReducer extends Reducer<Text, LongWritable,Text,LongWritable> {
  10. @Override
  11. protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
  12. //1、定义一个变量
  13. long count = 0;
  14. //2、迭代
  15. for (LongWritable value : values) {
  16. count += value.get();
  17. }
  18. //3、写入上下文
  19. context.write(key,new LongWritable(count));
  20. }
  21. }

编写 Driver 类

创建提交给 YARN 集群运行的 Job 对象,封装了 MapReduce 程序运行所需要的相关参数,例如输入数据路径,输出数据路径,Mapper 参数,Reduce 参数

  1. package words;
  2. import count.CharCountDriver;
  3. import count.CharCountMapper;
  4. import count.CharCountReducer;
  5. import org.apache.hadoop.conf.Configuration;
  6. import org.apache.hadoop.fs.Path;
  7. import org.apache.hadoop.io.LongWritable;
  8. import org.apache.hadoop.io.Text;
  9. import org.apache.hadoop.mapreduce.Job;
  10. import org.apache.hadoop.mapreduce.Mapper;
  11. import org.apache.hadoop.mapreduce.Reducer;
  12. import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
  13. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
  14. import org.apache.hadoop.util.GenericOptionsParser;
  15. import java.io.IOException;
  16. public class JobMain {
  17. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  18. //一、初始化Job
  19. Configuration configuration = new Configuration();
  20. //获取运行命令的参数,参数一:输入文件路径,参数二:输出文件路径
  21. //如果输入路径是一个文件,那么只处理这个文件,如果指定的路径是目录,则处理这个目录下的所有文件
  22. //输出路径只能是不存在的目录名
  23. String [] otherArgs = new GenericOptionsParser(configuration,args).getRemainingArgs();
  24. if(otherArgs.length < 2){
  25. System.err.println("必须提供输入文件路径和输出文件路径");
  26. System.exit(2);
  27. }
  28. Job job = Job.getInstance(configuration, "mr");
  29. job.setJarByClass(JobMain.class);
  30. //二、设置Job的相关信息 8个小步骤
  31. //1、设置输入路径
  32. job.setInputFormatClass(TextInputFormat.class);
  33. //本地运行
  34. //TextInputFormat.addInputPath(job,new Path("/tmp/input/mr1.txt"));
  35. TextInputFormat.addInputPath(job,new Path(args[0]));
  36. //2、设置Mapper类型,并设置输出键和输出值
  37. job.setMapperClass(WordMapper.class);
  38. job.setMapOutputKeyClass(Text.class);
  39. job.setMapOutputValueClass(LongWritable.class);
  40. //shuffle阶段,使用默认的
  41. //3、设置Reducer类型,并设置输出键和输出值
  42. job.setReducerClass(WordReducer.class);
  43. job.setOutputKeyClass(Text.class);
  44. job.setOutputValueClass(LongWritable.class);
  45. //4、设置输出路径
  46. job.setOutputFormatClass(TextOutputFormat.class);
  47. //本地运行
  48. //TextOutputFormat.setOutputPath(job,new Path("/tmp/output/mr"));
  49. TextOutputFormat.setOutputPath(job,new Path(args[1]));
  50. //三、等待完成
  51. boolean isfinish = job.waitForCompletion(true);
  52. System.out.println(isfinish ==true?"MapReduce 任务执行成功!":"MapReduce 任务执行失败!");
  53. System.exit(isfinish ? 0 : 1);
  54. }
  55. }

idea中打包成jar包

上传JAR包到服务器上

注意:如果输入路径是一个文件,那么只处理这个文件,如果指定的路径是目录,则处理这个目录下的所有文件

执行 hadoop jar

  1. hadoop jar /hadoopmapreduce-1.0-SNAPSHOT.jar danci.JobMain /wcinput /my_wcoutput

查看结果


本文转载自: https://blog.csdn.net/sadfasdfsafadsa/article/details/141276718
版权归原作者 薛定谔的猫1981 所有, 如有侵权,请联系我们删除。

“Hadoop(九)MapReduce 案例2”的评论:

还没有评论