0


MapReduce词频统计(一)

1.词频统计任务要求

首先,在Linux系统本地创建两个文件,即文件wordfile1.txt和wordfile2.txt。在实际应用中,这两个文件可能会非常大,会被分布存储到多个节点上。但是,为了简化任务,这里的两个文件只包含几行简单的内容。需要说明的是,针对这两个小数据集样本编写的MapReduce词频统计程序,不作任何修改,就可以用来处理大规模数据集的词频统计。

创建wordfile1.txt文件。

  1. cd ~
  2. vim wordfile1.txt

文件wordfile1.txt的内容如下:

  1. I love Spark
  2. I love Hadoop

创建wordfile2.txt文件。

  1. vim wordfile2.txt

文件wordfile2.txt的内容如下:

  1. Hadoop is good
  2. Spark is fast

假设HDFS中有一个input文件夹,并且文件夹为空,请把文件wordfile1.txt和wordfile2.txt上传到HDFS中的input文件夹下。

首先启动Hadoop,命令如下:

  1. cd /usr/local/hadoop
  2. ./sbin/start-dfs.sh

其次,上传上面创建的两个文件夹到HDFS文件系统上。

  1. ./bin/hdfs dfs -mkdir -p /user/hadoop/input
  2. ./bin/hdfs dfs -put ~/wordfile1.txt /user/hadoop/input/
  3. ./bin/hdfs dfs -put ~/wordfile2.txt /user/hadoop/input/

查看文件是否已经上传至HDFS文件系统。

  1. ./bin/hdfs dfs -ls /user/hadoop/input/

2.MapReduce程序编写方法

编写MapReduce程序来实现词频统计功能,主要包括以下3个步骤:

●编写Map处理逻辑;

●编写Reduce处理逻辑;

●编写main方法。

(1)编写Map处理逻辑

MapReduce程序包括Map阶段和Reduce阶段。在Map阶段,文件wordfile1.txt和文件wordfile2.txt中的文本数据被读入,以<key,value>的形式提交给Map函数进行处理,其中,key是当前读取到的行的地址偏移量,value是当前读取到的行的内容。<key,value>提交给Map函数以后,就可以运行我们自定义的Map处理逻辑,对value进行处理,然后以特定的键值对的形式进行输出,这个输出将作为中间结果,继续提供给Reduce阶段作为输入数据。以下是Map处理逻辑的具体代码:

  1. public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
  2. private static final IntWritable one = new IntWritable(1);
  3. private Text word = new Text();
  4. public TokenizerMapper() {
  5. }
  6. public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
  7. StringTokenizer itr = new StringTokenizer(value.toString());
  8. while(itr.hasMoreTokens()) {
  9. this.word.set(itr.nextToken());
  10. context.write(this.word, one);
  11. }
  12. }
  13. }

(2)编写Reduce处理逻辑

Map阶段得到的中间结果,经过Shuffle阶段(分区、排序、合并)以后,分发给对应的Reduce任务去处理。对于Reduce阶段而言,输入是<key,value-list>形式,比如,<’Hadoop’,<1,1>>。Reduce函数就是对输入中的value-list进行求和,得到词频统计结果。下面给出Reduce处理逻辑的具体代码:

  1. public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  2. private IntWritable result = new IntWritable();
  3. public IntSumReducer() {
  4. }
  5. public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
  6. int sum = 0;
  7. IntWritable val;
  8. for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
  9. val = (IntWritable)i$.next();
  10. }
  11. this.result.set(sum);
  12. context.write(key, this.result);
  13. }
  14. }

(3)编写main方法

为了让TokenizerMapper类和IntSumReducer类能够协同工作,完成最终的词频统计任务,需要在主函数中通过Job类设置Hadoop程序运行时的环境变量,具体代码如下:

  1. public static void main(String[] args) throws Exception {
  2. Configuration conf = new Configuration();
  3. String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
  4. if(otherArgs.length < 2) {
  5. System.err.println("Usage: wordcount <in> [<in>...] <out>");
  6. System.exit(2);
  7. }
  8. Job job = Job.getInstance(conf, "word count"); //设置环境参数
  9. job.setJarByClass(WordCount.class); //设置整个程序的类名
  10. job.setMapperClass(WordCount.TokenizerMapper.class); //添加Mapper类
  11. job.setReducerClass(WordCount.IntSumReducer.class); //添加Reducer类
  12. job.setOutputKeyClass(Text.class); //设置输出类型
  13. job.setOutputValueClass(IntWritable.class); //设置输出类型
  14. for(int i = 0; i < otherArgs.length - 1; ++i) {
  15. FileInputFormat.addInputPath(job, new Path(otherArgs[i])); //设置输入文件
  16. }
  17. FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));//设置输出文件
  18. System.exit(job.waitForCompletion(true)?0:1);
  19. }

(4)完整的词频统计程序

在编写词频统计Java程序时,需要新建一个名称为WordCount.java的文件,该文件包含了完整的词频统计程序代码。

  1. cd /usr/local/hadoop
  2. vim WordCount.java

具体代码如下:

  1. import java.io.IOException;
  2. import java.util.Iterator;
  3. import java.util.StringTokenizer;
  4. import org.apache.hadoop.conf.Configuration;
  5. import org.apache.hadoop.fs.Path;
  6. import org.apache.hadoop.io.IntWritable;
  7. import org.apache.hadoop.io.Text;
  8. import org.apache.hadoop.mapreduce.Job;
  9. import org.apache.hadoop.mapreduce.Mapper;
  10. import org.apache.hadoop.mapreduce.Reducer;
  11. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  12. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  13. import org.apache.hadoop.util.GenericOptionsParser;
  14. public class WordCount {
  15. public WordCount() {
  16. }
  17. public static void main(String[] args) throws Exception {
  18. Configuration conf = new Configuration();
  19. String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
  20. if(otherArgs.length < 2) {
  21. System.err.println("Usage: wordcount <in> [<in>...] <out>");
  22. System.exit(2);
  23. }
  24. Job job = Job.getInstance(conf, "word count");
  25. job.setJarByClass(WordCount.class);
  26. job.setMapperClass(WordCount.TokenizerMapper.class);
  27. job.setCombinerClass(WordCount.IntSumReducer.class);
  28. job.setReducerClass(WordCount.IntSumReducer.class);
  29. job.setOutputKeyClass(Text.class);
  30. job.setOutputValueClass(IntWritable.class);
  31. for(int i = 0; i < otherArgs.length - 1; ++i) {
  32. FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
  33. }
  34. FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
  35. System.exit(job.waitForCompletion(true)?0:1);
  36. }
  37. public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
  38. private static final IntWritable one = new IntWritable(1);
  39. private Text word = new Text();
  40. public TokenizerMapper() {
  41. }
  42. public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
  43. StringTokenizer itr = new StringTokenizer(value.toString());
  44. while(itr.hasMoreTokens()) {
  45. this.word.set(itr.nextToken());
  46. context.write(this.word, one);
  47. }
  48. }
  49. }
  50. public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
  51. private IntWritable result = new IntWritable();
  52. public IntSumReducer() {
  53. }
  54. public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
  55. int sum = 0;
  56. IntWritable val;
  57. for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
  58. val = (IntWritable)i$.next();
  59. }
  60. this.result.set(sum);
  61. context.write(key, this.result);
  62. }
  63. }
  64. }

3.编译打包程序

可以采用两种方式对上面编写的WordCount代码进行编译打包,本次实验使用命令行编译打包词频统计程序。

使用命令行编译打包词频统计程序

当前环境已经安装了Java程序(JDK),因此,这里可以直接用JDK包中的工具对代码进行编译。

首先,请在Linux系统中打开一个终端,把Hadoop的安装目录设置为当前工作目录,命令如下:

  1. cd /usr/local/hadoop

然后,执行如下命令,让java编译程序可以找到Hadoop相关的JAR包:

  1. export CLASSPATH="/usr/local/hadoop/share/hadoop/common/hadoop-common-3.1.3.jar:/usr/local/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.3.jar:/usr/local/hadoop/share/hadoop/common/lib/commons-cli-1.2.jar:$CLASSPATH"

接下来,就可以执行javac命令来编译程序。(创建WordCount.java文件放在“/usr/local/hadoop”目录下)

  1. javac WordCount.java
  2. ls

如果系统环境找不到javac程序的位置,那么请使用JDK中的绝对路径。

编译之后,在文件夹下可以发现有3个“.class”文件,这是Java的可执行文件。此时,我们需要将它们打包并命名为WordCount.jar,命令如下:

  1. jar -cvf WordCount.jar *.class

到这里,我们就得到像Hadoop自带实例一样的jar包了,可以运行得到结果。启动Hadoop之后,我们可以运行程序,命令如下:

  1. ./bin/hadoop jar WordCount.jar WordCount input output

最后,可以运行下面命令查看结果:

  1. ./bin/hadoop fs -cat output/*
标签: hadoop hdfs mapreduce

本文转载自: https://blog.csdn.net/qq_35193897/article/details/130357790
版权归原作者 qq_35193897 所有, 如有侵权,请联系我们删除。

“MapReduce词频统计(一)”的评论:

还没有评论