文章目录
📚实验目的
1)通过实验掌握基本的 MapReduce 编程方法。
2)掌握用 MapReduce 解决一些常见的数据处理问题,包括数据去重、数据排序和数据挖掘等。
📚实验平台
1)操作系统:Linux;
2)Hadoop 版本:3.2.2;
📚实验内容
🐇编程实现文件的合并和去重
packagehdfs;importjava.io.IOException;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importorg.apache.hadoop.util.GenericOptionsParser;publicclassMerge{//这段代码输出的结果是原始输入数据的全部内容作为key,value为空的键值对。publicstaticclassMapextendsMapper<Object,Text,Text,Text>{//在Map类中定义了一个静态变量text,并将其类型设置为Text。privatestaticText text =newText();//map方法中的参数分别表示输入数据的键、值和上下文对象//上下文对象可以用于向输出写入数据。publicvoidmap(Object key,Text value,Context context)throwsIOException,InterruptedException{//将输入的value数据赋值给text变量
text = value;//text作为key,一个空的Text对象作为value输出。
context.write(text,newText(""));}}//简单地将Mapper输出的所有键值对的key提取出来作为Reducer的输出。 publicstaticclassReduceextendsReducer<Text,Text,Text,Text>{// reduce方法中的参数分别表示输入数据的键、值集合和上下文对象//上下文对象可以用于向输出写入数据。publicvoidreduce(Text key,Iterable<Text> values,Context context )throwsIOException,InterruptedException{//将输入的key作为key,一个空的Text对象作为value输出
context.write(key,newText(""));//因为这里没有对values集合进行处理//所以values中的数据会被忽略掉,只有输入的key被输出。}}publicstaticvoidmain(String[] args)throwsException{//首先创建一个Configuration对象,用于存储Hadoop集群中的一些配置信息。Configuration conf =newConfiguration();//设置Hadoop集群的默认文件系统为hdfs://localhost:9000。
conf.set("fs.default.name","hdfs://localhost:9000");//接着检查输入参数是否正确//需要传入两个参数,第一个是输入数据路径,第二个是输出结果路径。String[] otherArgs =newString[]{"input","output"};if(otherArgs.length !=2){//如果参数不满足要求,则输出错误提示并退出程序。System.err.println("Usage: wordcount <in><out>");System.exit(2);}//创建一个Job对象,使用"Merge and duplicate removal"作为任务名称。Job job =Job.getInstance(conf,"Merge and duplicate removal");//使用Merge类的class对象来设置job所在的jar包。
job.setJarByClass(Merge.class);//设置Map类作为Mapper
job.setMapperClass(Map.class);//设置Reduce类为Combiner和Reducer。
job.setCombinerClass(Reduce.class);
job.setReducerClass(Reduce.class);//设置输入数据和输出结果的键值类型。
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);//设置输入数据路径。FileInputFormat.addInputPath(job,newPath(otherArgs[0]));//设置输出结果路径FileOutputFormat.setOutputPath(job,newPath(otherArgs[1]));//提交任务并等待任务执行完成,根据执行状态返回0或1表示任务执行成功或失败。System.exit(job.waitForCompletion(true)?0:1);}}
🐇编程实现对输入文件的排序
packagehdfs;importjava.io.IOException;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Partitioner;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importorg.apache.hadoop.util.GenericOptionsParser;publicclassMergeSort{//这段代码将输入的文本数据中的每一个整数作为键,对应的出现次数设置为1作为值publicstaticclassMapextendsMapper<Object,Text,IntWritable,IntWritable>{privatestaticIntWritable data =newIntWritable();publicvoidmap(Object key,Text value,Context context)throwsIOException,InterruptedException{//输入的key是偏移量,value是那个数值//首先将输入数据从Text类型转换为String类型,并赋值给text变量。String text = value.toString();//然后将text转换为Int类型,并将其封装到IntWritable对象中,赋值给data变量。
data.set(Integer.parseInt(text));//将data作为输出key,new IntWritable(1)作为输出值value写入上下文中
context.write(data,newIntWritable(1));}}//在进入reduce之前会有一个partition的过程,但因为我们现在电脑配置的datenode只有一个,所以最后其实都会就进入那一个dateNode。publicstaticclassPartitionextendsPartitioner<IntWritable,IntWritable>{publicintgetPartition(IntWritable key,IntWritable value,int num_Partition){//getPartition方法的三个参数分别表示输入键、输入值和分区数。intMaxnumber=65223;//首先定义了一个最大数字Maxnumber,并根据分区数计算出每个分区的边界bound//即将Maxnumber均匀地分为num_Partition个部分int bound =Maxnumber/num_Partition+1;//接着获取当前输入键的整数值keynumberint keynumber = key.get();for(int i =0; i<num_Partition; i++){//然后遍历所有分区,通过比较keynumber与边界值的大小关系,找到它应该属于的分区。if(keynumber<bound *(i+1)&& keynumber>=bound * i){//如果找到了对应的分区,则返回该分区的编号i;//否则,如果在所有分区中都没有找到对应的分区,则返回-1,表示出错。return i;}}return-1;}}//对Map类输出的中间结果按键值排序,为每个键值对添加一个唯一的序号,并将排序后的结果作为最终输出结果。//在本例中,输出结果是一个序号与整数对应的列表。publicstaticclassReduceextendsReducer<IntWritable,IntWritable,IntWritable,IntWritable>{//Reduce的泛型参数分别表示输入键、输入值、输出键和输出值的类型。privatestaticIntWritable line_num =newIntWritable(1);publicvoidreduce(IntWritable key,Iterable<IntWritable> values,Context context)throwsIOException,InterruptedException{//遍历Iterable<IntWritable>类型的values参数,将其按照key值排序后输出。for(IntWritable val : values){//这里的排序是自动排序//使用context.write方法将line_num作为输出键,key作为输出值写入上下文中。
context.write(line_num, key);//每输出一个键值对,line_num的值就加1,以保证输出的键值对具有唯一的序号。
line_num =newIntWritable(line_num.get()+1);}//这里for循环的意义就是避免去重,让key相同的都能遍历输出。}}publicstaticvoidmain(String[] args)throwsException{Configuration conf =newConfiguration();
conf.set("fs.default.name","hdfs://localhost:9000");String[] otherArgs =newString[]{"input","output"};if(otherArgs.length !=2){System.err.println("Usage: wordcount <in><out>");System.exit(2);}Job job =Job.getInstance(conf,"Merge and sort");//设置运行的主类为MergeSort。
job.setJarByClass(MergeSort.class);//设置Mapper类为Map。
job.setMapperClass(Map.class);//设置Reducer类为Reduce。
job.setReducerClass(Reduce.class);//设置Partitioner类为Partition。
job.setPartitionerClass(Partition.class);//设置输出键类型为IntWritable。
job.setOutputKeyClass(IntWritable.class);//设置输出值类型为IntWritable。
job.setOutputValueClass(IntWritable.class);//使用addInputPath()方法将输入路径添加到任务中FileInputFormat.addInputPath(job,newPath(otherArgs[0]));//使用setOutputPath()方法将输出路径设置到任务中。FileOutputFormat.setOutputPath(job,newPath(otherArgs[1]));//使用waitForCompletion()方法启动任务,并等待任务完成。//如果任务执行成功,则返回0;否则,返回1。在最后使用System.exit()方法退出程序。System.exit(job.waitForCompletion(true)?0:1);}}
🐇对指定的表格进行信息挖掘
packagehdfs;importjava.io.IOException;importjava.util.*;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importorg.apache.hadoop.util.GenericOptionsParser;publicclass map
{//用于输出标题publicstaticint time =0;//“child_name”、“parent_name”和“relation_type”作为键值对输出到Context对象中。publicstaticclassMapextendsMapper<Object,Text,Text,Text>{publicvoidmap(Object key,Text value,Context context)throwsIOException,InterruptedException{String child_name =newString();String parent_name =newString();String relation_type =newString();String line = value.toString();int i =0;while(line.charAt(i)!=' '){//将输入的每一行文本数据以空格为分隔符
i++;}//将第一个部分作为子节点名称,第二个部分作为父节点名称,以及一个关系类型组成的字符串String[] values ={line.substring(0,i),line.substring(i+1)};if(values[0].compareTo("child")!=0){//如果“child_name”不等于“child”(就不是标题) //key是父亲,1
child_name = values[0];
parent_name = values[1];
relation_type ="1";
context.write(newText(values[1]),newText(relation_type+"+"+child_name+"+"+parent_name));//key是孩子,2
relation_type ="2";
context.write(newText(values[0]),newText(relation_type+"+"+child_name+"+"+parent_name));}}}publicstaticclassReduceextendsReducer<Text,Text,Text,Text>{publicvoidreduce(Text key,Iterable<Text> values,Context context)throwsIOException,InterruptedException{if(time ==0){//用于标题
context.write(newText("grand_child"),newText("grand_parent"));
time++;}int grand_child_num =0;//用于在数组里插入String grand_child[]=newString[10];//孙辈的数组 int grand_parent_num =0;//用于在数组里插入String grand_parent[]=newString[10];//祖辈的数组 Iterator ite = values.iterator();while(ite.hasNext()){Stringrecord= ite.next().toString();int len =record.length();int i =2;//i=0是relation_type,i=1是“+”if(len ==0)continue;char relation_type =record.charAt(0);String child_name =newString();String parent_name =newString();while(record.charAt(i)!='+'){//child,从2开始
child_name = child_name +record.charAt(i);
i++;}
i=i+1;//一个加号while(i<len){//"childname+"后面的内容
parent_name = parent_name+record.charAt(i);
i++;}if(relation_type =='1'){//父亲,就取孩子的名字,就是孙辈的名字
grand_child[grand_child_num]= child_name;
grand_child_num++;}else{//孩子,就取父亲的名字,就是祖辈的名字
grand_parent[grand_parent_num]= parent_name;
grand_parent_num++;}}if(grand_parent_num !=0&& grand_child_num !=0)//全排列{for(int m =0;m<grand_child_num;m++){for(int n=0;n<grand_parent_num;n++){
context.write(newText(grand_child[m]),newText(grand_parent[n]));//对每一行进行reduce}}}}}publicstaticvoidmain(String[] args)throwsException{Configuration conf =newConfiguration();
conf.set("fs.default.name","hdfs://localhost:9000");String[] otherArgs =newString[]{"input","output"};if(otherArgs.length !=2){System.err.println("Usage: wordcount <in><out>");System.exit(2);}Job job =Job.getInstance(conf,"Single table join");//设置运行的主类为map。
job.setJarByClass(map.class);//设置Mapper类为Map。
job.setMapperClass(Map.class);//设置Reducer类为Reduce。
job.setReducerClass(Reduce.class);//设置输入数据和输出结果的键值类型。
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);//设置输入数据路径。FileInputFormat.addInputPath(job,newPath(otherArgs[0]));//设置输出结果路径。FileOutputFormat.setOutputPath(job,newPath(otherArgs[1]));//使用waitForCompletion()方法启动任务,并等待任务完成。//如果任务执行成功,则返回0;否则,返回1。在最后使用System.exit()方法退出程序。System.exit(job.waitForCompletion(true)?0:1);}}
补充学习博客:MapReduce编程规范及示例编写
本文转载自: https://blog.csdn.net/m0_63398413/article/details/130049586
版权归原作者 啦啦右一 所有, 如有侵权,请联系我们删除。
版权归原作者 啦啦右一 所有, 如有侵权,请联系我们删除。