0


MapReduce词频统计(一)

1.词频统计任务要求

首先,在Linux系统本地创建两个文件,即文件wordfile1.txt和wordfile2.txt。在实际应用中,这两个文件可能会非常大,会被分布存储到多个节点上。但是,为了简化任务,这里的两个文件只包含几行简单的内容。需要说明的是,针对这两个小数据集样本编写的MapReduce词频统计程序,不作任何修改,就可以用来处理大规模数据集的词频统计。

创建wordfile1.txt文件。

cd ~
vim wordfile1.txt

文件wordfile1.txt的内容如下:

I love Spark
I love Hadoop

创建wordfile2.txt文件。

vim wordfile2.txt

文件wordfile2.txt的内容如下:

Hadoop is good
Spark is fast

假设HDFS中有一个input文件夹,并且文件夹为空,请把文件wordfile1.txt和wordfile2.txt上传到HDFS中的input文件夹下。

首先启动Hadoop,命令如下:

cd /usr/local/hadoop
./sbin/start-dfs.sh

其次,上传上面创建的两个文件夹到HDFS文件系统上。

./bin/hdfs dfs -mkdir -p /user/hadoop/input
./bin/hdfs dfs -put ~/wordfile1.txt /user/hadoop/input/
./bin/hdfs dfs -put ~/wordfile2.txt /user/hadoop/input/

查看文件是否已经上传至HDFS文件系统。

./bin/hdfs dfs -ls /user/hadoop/input/

2.MapReduce程序编写方法

编写MapReduce程序来实现词频统计功能,主要包括以下3个步骤:

●编写Map处理逻辑;

●编写Reduce处理逻辑;

●编写main方法。

(1)编写Map处理逻辑

MapReduce程序包括Map阶段和Reduce阶段。在Map阶段,文件wordfile1.txt和文件wordfile2.txt中的文本数据被读入,以<key,value>的形式提交给Map函数进行处理,其中,key是当前读取到的行的地址偏移量,value是当前读取到的行的内容。<key,value>提交给Map函数以后,就可以运行我们自定义的Map处理逻辑,对value进行处理,然后以特定的键值对的形式进行输出,这个输出将作为中间结果,继续提供给Reduce阶段作为输入数据。以下是Map处理逻辑的具体代码:

public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text(); 
        public TokenizerMapper() {
        }
        public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString()); 
            while(itr.hasMoreTokens()) {
                this.word.set(itr.nextToken());
                context.write(this.word, one);
            }
        }
    }

(2)编写Reduce处理逻辑

Map阶段得到的中间结果,经过Shuffle阶段(分区、排序、合并)以后,分发给对应的Reduce任务去处理。对于Reduce阶段而言,输入是<key,value-list>形式,比如,<’Hadoop’,<1,1>>。Reduce函数就是对输入中的value-list进行求和,得到词频统计结果。下面给出Reduce处理逻辑的具体代码:

public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable(); 
        public IntSumReducer() {
        }
        public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            int sum = 0; 
            IntWritable val;
            for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
                val = (IntWritable)i$.next();
            }
            this.result.set(sum);
            context.write(key, this.result);
        }
    }

(3)编写main方法

为了让TokenizerMapper类和IntSumReducer类能够协同工作,完成最终的词频统计任务,需要在主函数中通过Job类设置Hadoop程序运行时的环境变量,具体代码如下:

public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
        if(otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");        //设置环境参数
        job.setJarByClass(WordCount.class);                //设置整个程序的类名
        job.setMapperClass(WordCount.TokenizerMapper.class); //添加Mapper类
        job.setReducerClass(WordCount.IntSumReducer.class);  //添加Reducer类
        job.setOutputKeyClass(Text.class);                    //设置输出类型
        job.setOutputValueClass(IntWritable.class);             //设置输出类型 
        for(int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));  //设置输入文件
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));//设置输出文件
        System.exit(job.waitForCompletion(true)?0:1);
    }

(4)完整的词频统计程序

在编写词频统计Java程序时,需要新建一个名称为WordCount.java的文件,该文件包含了完整的词频统计程序代码。

cd /usr/local/hadoop
vim WordCount.java

具体代码如下:

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
    public WordCount() {
    }
     public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
        if(otherArgs.length < 2) {
            System.err.println("Usage: wordcount <in> [<in>...] <out>");
            System.exit(2);
        }
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(WordCount.TokenizerMapper.class);
        job.setCombinerClass(WordCount.IntSumReducer.class);
        job.setReducerClass(WordCount.IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class); 
        for(int i = 0; i < otherArgs.length - 1; ++i) {
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
        System.exit(job.waitForCompletion(true)?0:1);
    }
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text();
        public TokenizerMapper() {
        }
        public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString()); 
            while(itr.hasMoreTokens()) {
                this.word.set(itr.nextToken());
                context.write(this.word, one);
            }
        }
    }
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        public IntSumReducer() {
        }
        public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            int sum = 0;
            IntWritable val;
            for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
                val = (IntWritable)i$.next();
            }
            this.result.set(sum);
            context.write(key, this.result);
        }
    }
}

3.编译打包程序

可以采用两种方式对上面编写的WordCount代码进行编译打包,本次实验使用命令行编译打包词频统计程序。

使用命令行编译打包词频统计程序

当前环境已经安装了Java程序(JDK),因此,这里可以直接用JDK包中的工具对代码进行编译。

首先,请在Linux系统中打开一个终端,把Hadoop的安装目录设置为当前工作目录,命令如下:

cd /usr/local/hadoop

然后,执行如下命令,让java编译程序可以找到Hadoop相关的JAR包:

export CLASSPATH="/usr/local/hadoop/share/hadoop/common/hadoop-common-3.1.3.jar:/usr/local/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.3.jar:/usr/local/hadoop/share/hadoop/common/lib/commons-cli-1.2.jar:$CLASSPATH"

接下来,就可以执行javac命令来编译程序。(创建WordCount.java文件放在“/usr/local/hadoop”目录下)

javac WordCount.java
ls

如果系统环境找不到javac程序的位置,那么请使用JDK中的绝对路径。

编译之后,在文件夹下可以发现有3个“.class”文件,这是Java的可执行文件。此时,我们需要将它们打包并命名为WordCount.jar,命令如下:

jar -cvf WordCount.jar *.class

到这里,我们就得到像Hadoop自带实例一样的jar包了,可以运行得到结果。启动Hadoop之后,我们可以运行程序,命令如下:

./bin/hadoop jar WordCount.jar WordCount input output

最后,可以运行下面命令查看结果:

./bin/hadoop fs -cat output/*
标签: hadoop hdfs mapreduce

本文转载自: https://blog.csdn.net/qq_35193897/article/details/130357790
版权归原作者 qq_35193897 所有, 如有侵权,请联系我们删除。

“MapReduce词频统计(一)”的评论:

还没有评论