1.词频统计任务要求
首先,在Linux系统本地创建两个文件,即文件wordfile1.txt和wordfile2.txt。在实际应用中,这两个文件可能会非常大,会被分布存储到多个节点上。但是,为了简化任务,这里的两个文件只包含几行简单的内容。需要说明的是,针对这两个小数据集样本编写的MapReduce词频统计程序,不作任何修改,就可以用来处理大规模数据集的词频统计。
创建wordfile1.txt文件。
cd ~
vim wordfile1.txt
文件wordfile1.txt的内容如下:
I love Spark
I love Hadoop
创建wordfile2.txt文件。
vim wordfile2.txt
文件wordfile2.txt的内容如下:
Hadoop is good
Spark is fast
假设HDFS中有一个input文件夹,并且文件夹为空,请把文件wordfile1.txt和wordfile2.txt上传到HDFS中的input文件夹下。
首先启动Hadoop,命令如下:
cd /usr/local/hadoop
./sbin/start-dfs.sh
其次,上传上面创建的两个文件夹到HDFS文件系统上。
./bin/hdfs dfs -mkdir -p /user/hadoop/input
./bin/hdfs dfs -put ~/wordfile1.txt /user/hadoop/input/
./bin/hdfs dfs -put ~/wordfile2.txt /user/hadoop/input/
查看文件是否已经上传至HDFS文件系统。
./bin/hdfs dfs -ls /user/hadoop/input/
2.MapReduce程序编写方法
编写MapReduce程序来实现词频统计功能,主要包括以下3个步骤:
●编写Map处理逻辑;
●编写Reduce处理逻辑;
●编写main方法。
(1)编写Map处理逻辑
MapReduce程序包括Map阶段和Reduce阶段。在Map阶段,文件wordfile1.txt和文件wordfile2.txt中的文本数据被读入,以<key,value>的形式提交给Map函数进行处理,其中,key是当前读取到的行的地址偏移量,value是当前读取到的行的内容。<key,value>提交给Map函数以后,就可以运行我们自定义的Map处理逻辑,对value进行处理,然后以特定的键值对的形式进行输出,这个输出将作为中间结果,继续提供给Reduce阶段作为输入数据。以下是Map处理逻辑的具体代码:
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private static final IntWritable one = new IntWritable(1);
private Text word = new Text();
public TokenizerMapper() {
}
public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()) {
this.word.set(itr.nextToken());
context.write(this.word, one);
}
}
}
(2)编写Reduce处理逻辑
Map阶段得到的中间结果,经过Shuffle阶段(分区、排序、合并)以后,分发给对应的Reduce任务去处理。对于Reduce阶段而言,输入是<key,value-list>形式,比如,<’Hadoop’,<1,1>>。Reduce函数就是对输入中的value-list进行求和,得到词频统计结果。下面给出Reduce处理逻辑的具体代码:
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public IntSumReducer() {
}
public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
IntWritable val;
for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
val = (IntWritable)i$.next();
}
this.result.set(sum);
context.write(key, this.result);
}
}
(3)编写main方法
为了让TokenizerMapper类和IntSumReducer类能够协同工作,完成最终的词频统计任务,需要在主函数中通过Job类设置Hadoop程序运行时的环境变量,具体代码如下:
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if(otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count"); //设置环境参数
job.setJarByClass(WordCount.class); //设置整个程序的类名
job.setMapperClass(WordCount.TokenizerMapper.class); //添加Mapper类
job.setReducerClass(WordCount.IntSumReducer.class); //添加Reducer类
job.setOutputKeyClass(Text.class); //设置输出类型
job.setOutputValueClass(IntWritable.class); //设置输出类型
for(int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i])); //设置输入文件
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));//设置输出文件
System.exit(job.waitForCompletion(true)?0:1);
}
(4)完整的词频统计程序
在编写词频统计Java程序时,需要新建一个名称为WordCount.java的文件,该文件包含了完整的词频统计程序代码。
cd /usr/local/hadoop
vim WordCount.java
具体代码如下:
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public WordCount() {
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = (new GenericOptionsParser(conf, args)).getRemainingArgs();
if(otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(WordCount.TokenizerMapper.class);
job.setCombinerClass(WordCount.IntSumReducer.class);
job.setReducerClass(WordCount.IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for(int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job, new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true)?0:1);
}
public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private static final IntWritable one = new IntWritable(1);
private Text word = new Text();
public TokenizerMapper() {
}
public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while(itr.hasMoreTokens()) {
this.word.set(itr.nextToken());
context.write(this.word, one);
}
}
}
public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public IntSumReducer() {
}
public void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
int sum = 0;
IntWritable val;
for(Iterator i$ = values.iterator(); i$.hasNext(); sum += val.get()) {
val = (IntWritable)i$.next();
}
this.result.set(sum);
context.write(key, this.result);
}
}
}
3.编译打包程序
可以采用两种方式对上面编写的WordCount代码进行编译打包,本次实验使用命令行编译打包词频统计程序。
使用命令行编译打包词频统计程序
当前环境已经安装了Java程序(JDK),因此,这里可以直接用JDK包中的工具对代码进行编译。
首先,请在Linux系统中打开一个终端,把Hadoop的安装目录设置为当前工作目录,命令如下:
cd /usr/local/hadoop
然后,执行如下命令,让java编译程序可以找到Hadoop相关的JAR包:
export CLASSPATH="/usr/local/hadoop/share/hadoop/common/hadoop-common-3.1.3.jar:/usr/local/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-client-core-3.1.3.jar:/usr/local/hadoop/share/hadoop/common/lib/commons-cli-1.2.jar:$CLASSPATH"
接下来,就可以执行javac命令来编译程序。(创建WordCount.java文件放在“/usr/local/hadoop”目录下)
javac WordCount.java
ls
如果系统环境找不到javac程序的位置,那么请使用JDK中的绝对路径。
编译之后,在文件夹下可以发现有3个“.class”文件,这是Java的可执行文件。此时,我们需要将它们打包并命名为WordCount.jar,命令如下:
jar -cvf WordCount.jar *.class
到这里,我们就得到像Hadoop自带实例一样的jar包了,可以运行得到结果。启动Hadoop之后,我们可以运行程序,命令如下:
./bin/hadoop jar WordCount.jar WordCount input output
最后,可以运行下面命令查看结果:
./bin/hadoop fs -cat output/*
版权归原作者 qq_35193897 所有, 如有侵权,请联系我们删除。