0


实验5 MapReduce初级编程实践(2)——编写程序实现对输入文件的排序

一、实验目的

  1. 通过实验掌握基本的MapReduce编程方法;
  2. 掌握用MapReduce解决一些常见的数据处理问题,包括数据去重、数据排序和数据挖掘等。

二、实验平台

  1. 操作系统:Linux(建议Ubuntu16.04或Ubuntu18.04)
  2. Hadoop版本:3.1.3

三、实验内容

编写程序实现对输入文件的排序

现在有多个输入文件,每个文件中的每行内容均为一个整数。要求读取所有文件中的整数,进行升序排序后,输出到一个新的文件中,输出的数据格式为每行两个整数,第一个数字为第二个整数的排序位次,第二个整数为原待排列的整数。下面是输入文件和输出文件的一个样例供参考。

输入文件 1 的样例如下:

33371240

输入文件 2 的样例如下:

416395

输入文件 3 的样例如下:

14525

根据输入文件 1、2 和 3 得到的输出文件C的样例如下:

11243541251662573383793910401145

四、实验步骤

进入 Hadoop 安装目录,启动 hadoop:

cd /usr/local/hadoop
sbin/start-dfs.sh

新建文件夹,创建文件 1、2 和 3:

sudomkdir Pritice2 &&cd Pritice2
sudovim1sudovim2sudovim3

编写 Java 文件实现 MapReduce:

sudovim MergeSort.java

实现的 Java 代码如下:

importjava.io.IOException;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Partitioner;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importorg.apache.hadoop.util.GenericOptionsParser;publicclassMergeSort{/**
     * @param args
     * 输入多个文件,每个文件中的每行内容均为一个整数
     * 输出到一个新的文件中,输出的数据格式为每行两个整数,第一个数字为第二个整数的排序位次,第二个整数为原待排列的整数
     *///map函数读取输入中的value,将其转化成IntWritable类型,最后作为输出keypublicstaticclassMapextendsMapper<Object,Text,IntWritable,IntWritable>{privatestaticIntWritable data =newIntWritable();publicvoidmap(Object key,Text value,Context context)throwsIOException,InterruptedException{String text = value.toString();
            data.set(Integer.parseInt(text));
            context.write(data,newIntWritable(1));}}//reduce函数将map输入的key复制到输出的value上,然后根据输入的value-list中元素的个数决定key的输出次数,定义一个全局变量line_num来代表key的位次publicstaticclassReduceextendsReducer<IntWritable,IntWritable,IntWritable,IntWritable>{privatestaticIntWritable line_num =newIntWritable(1);publicvoidreduce(IntWritable key,Iterable<IntWritable> values,Context context)throwsIOException,InterruptedException{for(IntWritable val : values){
                context.write(line_num, key);
                line_num =newIntWritable(line_num.get()+1);}}}//自定义Partition函数,此函数根据输入数据的最大值和MapReduce框架中Partition的数量获取将输入数据按照大小分块的边界,然后根据输入数值和边界的关系返回对应的Partiton IDpublicstaticclassPartitionextendsPartitioner<IntWritable,IntWritable>{publicintgetPartition(IntWritable key,IntWritable value,int num_Partition){intMaxnumber=65223;//int型的最大数值int bound =Maxnumber/num_Partition+1;int keynumber = key.get();for(int i =0; i<num_Partition; i++){if(keynumber<bound *(i+1)&& keynumber>=bound * i){return i;}}return-1;}}publicstaticvoidmain(String[] args)throwsException{// TODO Auto-generated method stubConfiguration conf =newConfiguration();
        conf.set("fs.default.name","hdfs://localhost:9000");String[] otherArgs =newString[]{"input","output"};/* 直接设置输入参数 */if(otherArgs.length !=2){System.err.println("Usage: wordcount <in><out>");System.exit(2);}Job job =Job.getInstance(conf,"Merge and sort");
        job.setJarByClass(MergeSort.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setPartitionerClass(Partition.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job,newPath(otherArgs[0]));FileOutputFormat.setOutputPath(job,newPath(otherArgs[1]));System.exit(job.waitForCompletion(true)?0:1);}}

赋予用户相关权限:

sudochown -R hadoop /usr/local/hadoop

添加编译所需要使用的 jar 包:

vim ~/.bashrc

添加下面一行到文件的最后:

exportHADOOP_HOME=/usr/local/hadoop
exportCLASSPATH=$($HADOOP_HOME/bin/hadoop classpath):$CLASSPATH

使更改立即生效:

source ~/.bashrc

编译 MergeSort.java:

javac MergeSort.java

打包生成的 class 文件为 jar 包:

jar -cvf MergeSort.jar *.class

创建 Hadoop 主目录为 /user/hadoop 并创建 input 文件夹:

/usr/local/hadoop/bin/hdfs dfs -mkdir -p /user/hadoop
/usr/local/hadoop/bin/hdfs dfs -mkdir input

若 intput 已存在则删除原有文件:

/usr/local/hadoop/bin/hdfs dfs -rm input/*

上传 1、2 和 3 文件到 input 文件夹中:

/usr/local/hadoop/bin/hdfs dfs -put ./1 input
/usr/local/hadoop/bin/hdfs dfs -put ./2 input
/usr/local/hadoop/bin/hdfs dfs -put ./3 input

使用之前确保 output 文件夹不存在:

/usr/local/hadoop/bin/hdfs dfs -rm -r output

使用我们刚生成的 Merge.jar 包:

/usr/local/hadoop/bin/hadoop jar MergeSort.jar MergeSort

查看输出结果:

/usr/local/hadoop/bin/hdfs dfs -cat output/*

输出如下:

hadoop@fzqs-Laptop:/usr/local/hadoop$ hdfs dfs -cat output/*
11243541251662573383793910401145
hadoop@fzqs-Laptop:/usr/local/hadoop$ 

此外,有想用 Python 写的可以参考我这篇博客:实验5 MapReduce初级编程实践(Python实现)


本文转载自: https://blog.csdn.net/weixin_46584887/article/details/121517400
版权归原作者 Z.Q.Feng 所有, 如有侵权,请联系我们删除。

“实验5 MapReduce初级编程实践(2)——编写程序实现对输入文件的排序”的评论:

还没有评论