0


大数据实验 实验五:MapReduce 初级编程实践

大数据实验 实验五:MapReduce 初级编程实践

实验环境

  • 操作系统 centos7
  • Hadoop版本:3,3,0

实验内容与完成情况

(一)编程实现文件合并和去重操作

对于两个输入文件,即文件 A 和文件 B,请编写 MapReduce 程序,对两个文件进行合并,
并剔除其中重复的内容,得到一个新的输出文件 C。下面是输入文件和输出文件的一个样例 供参考。

输入文件 A 的样例如下:
20170101 x
20170102 y
20170103 x
20170104 y
20170105 z
20170106 x

输入文件 B 的样例如下:
20170101 y
20170102 y
20170103 x
20170104 z
20170105 y

根据输入文件 A 和 B 合并得到的输出文件 C 的样例如下:
20170101 x
20170101 y
20170102 y
20170103 x
20170104 y
20170104 z
20170105 y
20170105 z
20170106 x


创建文件A.txt和B.txt
在这里插入图片描述
将两个文件上传到HDFS中

在这里插入图片描述
Java程序

packageMain;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importjava.io.IOException;publicclass main {//重载map函数,直接将输入中的value复制到输出数据的key上publicstaticclassMapextendsMapper<Object,Text,Text,Text>{privatestaticText text =newText();publicvoidmap(Object key,Text value,Context context)throwsIOException,InterruptedException{
            text = value;
            context.write(text,newText(""));}}//重载reduce函数,直接将输入中的key复制到输出数据的key上publicstaticclassReduceextendsReducer<Text,Text,Text,Text>{publicvoidreduce(Text key,Iterable<Text> values,Context context)throwsIOException,InterruptedException{
            context.write(key,newText(""));}}publicstaticvoidmain(String[] args)throwsException{Configuration conf =newConfiguration();
        conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");
        conf.set("fs.defaultFS","hdfs://localhost:8020");String[] otherArgs =newString[]{"/input/test","/output/test"};if(otherArgs.length !=2){System.err.println("Usage: wordcount <in><out>");System.exit(2);}Job job =Job.getInstance(conf,"Merge and duplicate removal");
        job.setJarByClass(main.class);
        job.setMapperClass(Map.class);
        job.setCombinerClass(Reduce.class);
        job.setReducerClass(Reduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);FileInputFormat.addInputPath(job,newPath(otherArgs[0]));FileOutputFormat.setOutputPath(job,newPath(otherArgs[1]));System.exit(job.waitForCompletion(true)?0:1);}}

运行指令
在这里插入图片描述
查看运行结果

在这里插入图片描述

(二)编程实现对输入文件的排序

现在有多个输入文件,每个文件中的每行内容均为一个整数。要求读取所有文件中的整 数,进行升序排序后,输出到一个新的文件中,输出的数据格式为每行两个整数,第一个数 字为第二个整数的排序位次,第二个整数为原待排列的整数。下面是输入文件和输出文件的
一个样例供参考。

输入文件 1 的样例如下:
33
37
12
40

输入文件 2 的样例如下:
4
16
39
5

输入文件 3 的样例如下:
1
45
25

根据输入文件 1、2 和 3 得到的输出文件如下:
1 1
2 4
3 5
4 12
5 16
6 25
7 33
54
8 37
9 39
10 40
11 45


新建三个文件

在这里插入图片描述
将文件上传到hdfs

在这里插入图片描述
编写java代码

packageMain;importjava.io.IOException;importjava.util.Objects;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Partitioner;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importorg.apache.hadoop.util.GenericOptionsParser;publicclass main {publicstaticclassMapextendsMapper<Object,Text,IntWritable,IntWritable>{privatestaticIntWritable data =newIntWritable();publicvoidmap(Object key,Text value,Context context)throwsIOException,InterruptedException{String text = value.toString();if(!Objects.equals(text,"")){
                data.set(Integer.parseInt(text));
                context.write(data,newIntWritable(1));}}}publicstaticclassReduceextendsReducer<IntWritable,IntWritable,IntWritable,IntWritable>{privatestaticIntWritable line_num =newIntWritable(1);publicvoidreduce(IntWritable key,Iterable<IntWritable> values,Context context)throwsIOException,InterruptedException{for(IntWritable val : values){
                context.write(line_num, key);
                line_num =newIntWritable(line_num.get()+1);}}}publicstaticclassPartitionextendsPartitioner<IntWritable,IntWritable>{publicintgetPartition(IntWritable key,IntWritable value,int num_Partition){intMaxnumber=65223;int bound =Maxnumber/ num_Partition +1;int keynumber = key.get();for(int i =0; i < num_Partition; i++){if(keynumber < bound *(i +1)&& keynumber >= bound * i){return i;}}return-1;}}publicstaticvoidmain(String[] args)throwsException{Configuration conf =newConfiguration();//conf.set("fs.default.name","hdfs://localhost:9000");
        conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");
        conf.set("fs.defaultFS","hdfs://localhost:8020");String[] otherArgs =newString[]{"/input/test","/output/test"};if(otherArgs.length !=2){System.err.println("Usage: wordcount <in><out>");System.exit(2);}Job job =Job.getInstance(conf,"Merge and sort");
        job.setJarByClass(main.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
        job.setPartitionerClass(Partition.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(IntWritable.class);FileInputFormat.addInputPath(job,newPath(otherArgs[0]));FileOutputFormat.setOutputPath(job,newPath(otherArgs[1]));System.exit(job.waitForCompletion(true)?0:1);}}

运行程序

在这里插入图片描述
运行结果

在这里插入图片描述

(三)对给定的表格进行信息挖掘

下面给出一个 child-parent 的表格,要求挖掘其中的父子辈关系,给出祖孙辈关系的
表格。
输入文件内容如下:
child parent
Steven Lucy
Steven Jack
Jone Lucy
Jone Jack
Lucy Mary
Lucy Frank
Jack Alice
Jack Jesse
David Alice
David Jesse
Philip David
Philip Alma
Mark David
Mark Alma

输出文件内容如下:
grandchild grandparent
Steven Alice
Steven Jesse
Jone Alice
Jone Jesse
Steven Mary
Steven Frank
Jone Mary
Jone Frank
Philip Alice
Philip Jesse
Mark Alice
Mark Jesse


新建数据
在这里插入图片描述

编写程序

packageMain;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.FileSystem;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.input.TextInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importjava.io.BufferedReader;importjava.io.IOException;importjava.io.InputStreamReader;importjava.util.ArrayList;publicclass main {publicstaticclassTokenizerMapperextendsMapper<Object,Text,Text,Text>{publicstaticIntWritable data =newIntWritable();publicvoidmap(Object key,Text value,Context context)throwsIOException,InterruptedException{String str = value.toString();if(str !=null&&!str.equals("")&&!str.equals("child parent")){String[] fa = str.split("\t");String son = fa[0], parent = fa[1];
                context.write(newText(parent),newText("son"+ son));
                context.write(newText(son),newText("fa"+ parent));}}}publicstaticclassReduceextendsReducer<Text,Text,Text,Text>{publicstaticboolean flag =false;publicvoidreduce(Text key,Iterable<Text> values,Context context)throwsIOException,InterruptedException{if(!flag){
                context.write(newText("grandson"),newText("grandparent"));
                flag =true;}ArrayList<Text> grandChild =newArrayList<Text>();ArrayList<Text> grandParent =newArrayList<Text>();for(Text val : values){String s = val.toString();if(s.startsWith("son")){
                    grandChild.add(newText(s.substring(3)));}else{
                    grandParent.add(newText(s.substring(2)));}}for(Text gc : grandChild)for(Text gp : grandParent)
                    context.write(gc, gp);}}publicstaticvoidmain(String[] args)throwsException{String[] a ={"a","b"};Configuration conf =newConfiguration();
        conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");//conf.set("fs.defaultFS", "hdfs://localhost:8020");FileSystem fs =FileSystem.get(conf);Job job =Job.getInstance(conf,"merge and duplicate removal");
        job.setJarByClass(main.class);
        job.setMapperClass(TokenizerMapper.class);

        job.setReducerClass(Reduce.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);FileInputFormat.addInputPath(job,newPath("hdfs://localhost:8020/input/test"));FileOutputFormat.setOutputPath(job,newPath("hdfs://localhost:8020/output/test"));System.exit(job.waitForCompletion(true)?0:1);}}

运行程序

在这里插入图片描述

运行结果

在这里插入图片描述

出现的问题

问题一

在将运行的文件达成jar包在centos上运行时

在这里插入图片描述
出现无法运行缺少运行配置

在这里插入图片描述
出现传入参数为空的报错

解决方案

在程序中添加配置
conf.set(“fs.hdfs.impl”,“org.apache.hadoop.hdfs.DistributedFileSystem”);
将 java -jar XXX.jar 改为 hadoop jar xxx.jar 命令执行。
因为我们知道执行Hadoop命令时是会自动加载Hadoop相关jar包及配置的,但确保环境变量已配置生效

这样,通过Hadoop命令去执行,即使不设置fs.hdfs.impl参数也不会报No FileSystem for scheme异常
问题解决

在这里插入图片描述
此处的代码改为

在这里插入图片描述

问题解决

标签: 大数据 mapreduce

本文转载自: https://blog.csdn.net/ADEXOM/article/details/134625254
版权归原作者 ADBOEX 所有, 如有侵权,请联系我们删除。

“大数据实验 实验五:MapReduce 初级编程实践”的评论:

还没有评论