MapReduce编程模板
1.自定义 Mapper类继承类并重写map方法:
importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Mapper;importjava.io.IOException;publicclassWordCountMapperextendsMapper<LongWritable,Text,Text,IntWritable>{@Overrideprotectedvoidmap(LongWritable key,Text value,Context context)throwsIOException,InterruptedException{String[] split = value.toString().split(",");//比如每一行aa bb cc,根据逗号, 切分后,把字符串为key,value为1for(String word : split){Text text =newText(word);IntWritable num =newIntWritable(1);
context.write(text, num);}}}//hadoop对应java数据类型
JavaHadoopintIntWritablelongLongWritablestringTextbyteByteWritabledoubleDoubleWritablefloatFloatWritablebooleanBooleanWritablenullNullWritable
自己定义的需要序列化和反序列化可以通过实现 Writable接口来使用。
在重写map方法时,如果中间处理数据时将类型转化为Java的数据类型,将结果写入上下文对象Context,要重新转为Hadoop的类型。
2.自定义Reducer类集成Reducer,并重写Reduce方法
importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Reducer;importjava.io.IOException;publicclassWordCountReducerextendsReducer<Text,IntWritable,Text,IntWritable>{@Overrideprotectedvoidreduce(Text key,Iterable<IntWritable> values,Context context)throwsIOException,InterruptedException{//循环遍历values 求和int sum=0;for(IntWritable v:values){//mapreduce的框架已经帮我们做好了从map出来后已经做好按key分组,// 也就是到这里的,Iterable<IntWritable> values 是同一个单词的数量迭代器,进行相加就可以得到最后的数量//类似于{"aa":[1,1,1,1,1]},所以统计aa单词出现的个数的话,只需要将迭代器中的[1,1,1,1,1]相加就可以得出总数
sum+=v.get();}
context.write(key,newIntWritable(sum));}}
3.Driver 主入口,整合mapper和reducer
(1) 配置conf并开启一个job
(2) 指定mapper类和reducer类
(3) 设置map输出key value的类型和设置reduce输出key value的类型
(4) 创建输入流FileInputFormat设置输入的hdfs的指定位置
(4) 创建输出流FileOutputFormat 将结果输出的hdfs的指定位置
(5) job提交语句:job.waitForCompletion(true) ,true表示需要打印日志
publicstaticvoidmain(String[] args)throwsIOException,InterruptedException,ClassNotFoundException{Configuration conf =newConfiguration();// 设置默认hdfs访问路径
conf.set("fs.defaultFS","hdfs://master:9000");// 设置Windows跨平台提交job的参数// conf.set("mapreduce.app-submission.cross-platform","true");
conf.set("mapred.job.tracker","master:54311");// 配置访问用户System.setProperty("HADOOP_USER_NAME","root");//创建一个jobJob job =Job.getInstance(conf);
job.setJarByClass(test.class);
job.setMapperClass(CountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setReducerClass(CountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);FileInputFormat.addInputPath(job,newPath("hdfs://master:9000/data/stu_score_sub.csv"));Path out =newPath("hdfs://master:9000/output");FileSystem fs =FileSystem.get(conf);if(fs.exists(out)){
fs.delete(out,true);}//配置输入输出的路径FileOutputFormat.setOutputPath(job,out);
job.waitForCompletion(true);}
MapReduce各个情景实战
1.多个输入
求每个同学每科成绩的总分
chinese.txt
english.txt
math.txt
Student.java
注意:序列化/反序列化机制:当自定义了一个类之后,如果想要产生的对象在hadoop中进行传输,那么需要这个类实现Hadoop提供的Writable的接口只需要将按序写出并进行序列化/反序列化
importjava.io.DataInput;importjava.io.DataOutput;importjava.io.IOException;importorg.apache.hadoop.io.Writable;publicclassStudentimplementsWritable{privateString name;privateInteger chinese;privateInteger english;privateInteger math;publicStringgetName(){return name;}publicvoidsetName(String name){this.name = name;}publicIntegergetChinese(){return chinese;}publicvoidsetChinese(Integer chinese){this.chinese = chinese;}publicIntegergetEnglish(){return english;}publicvoidsetEnglish(Integer english){this.english = english;}publicIntegergetMath(){return math;}publicvoidsetMath(Integer math){this.math = math;}@Override// 反序列化 publicvoidreadFields(DataInput input)throwsIOException{this.name = input.readUTF();this.chinese = input.readInt();this.english = input.readInt();this.math = input.readInt();}@Override// 序列化publicvoidwrite(DataOutput output)throwsIOException{
output.writeUTF(name);
output.writeInt(chinese);
output.writeInt(english);
output.writeInt(math);}@OverridepublicStringtoString(){return"Student [name="+ name +", chinese="+ chinese +", english="+ english +", math="+ math +"]";}}
Mapper代码
importjava.io.IOException;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.lib.input.FileSplit;/**统计每个人三科的每科各月的总成绩
* key : 姓名 * value : student
* Map : 映射数据 *
* Mapper 数量 = 切片的数量
*/publicclassScoreMapperextendsMapper<LongWritable,Text,Text,Student>{@Overrideprotectedvoidmap(LongWritable key,Text value,Mapper<LongWritable,Text,Text,Student>.Context context)throwsIOException,InterruptedException{// 文件名称,根据文件名称判断成绩是哪个科目的FileSplit split =(FileSplit) context.getInputSplit();Student student =newStudent();// 每行的内容 // 1 zhang 89 月份 姓名 成绩 if(split.getPath().getName().equals("chinese.txt")){
student.setChinese(Integer.valueOf(score));
student.setEnglish(0);
student.setMath(0);}elseif(split.getPath().getName().equals("math.txt")){
student.setEnglish(Integer.valueOf(score));
student.setMath(0);
student.setChinese(0);}elseif(split.getPath().getName().equals("english.txt")){
student.setMath(Integer.valueOf(score));
student.setChinese(0);
student.setEnglish(0);}String lineContent = value.toString();String[] datas = lineContent.split(" ");String name = datas[1];String score = datas[2];
student.setName(name);
context.write(newText(name), student);}}
上面用到的FileSplit类用法
FileSplit fs = new FileSplit();
String pathname=fs.getPath().getName(); //获取目录名字
int depth = fs.getPath().depth(); //获取目录深度
fs.getClass(); //获取当前类
long length = fs.getLength(); //获取文件长度
SplitLocationInfo[] locationInfo =fs.getLocationInfo(); //获取位置信息
String[] locations = fs.getLocations(); //获取位置
Reducer代码
importjava.io.IOException;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Reducer;publicclassScoreReducerextendsReducer<Text,Student,Text,Student>{@Overrideprotectedvoidreduce(Text key,Iterable<Student> values,Reducer<Text,Student,Text,Student>.Context context)throwsIOException,InterruptedException{Student student =newStudent();
student.setName(key.toString());Integer chinese =0;Integer english =0;Integer math =0;for(Student stu : values){
chinese = chinese + stu.getChinese();
english = english + stu.getEnglish();
math = math + stu.getMath();}
student.setChinese(chinese);
student.setEnglish(english);
student.setMath(math);
context.write(key, student);}}
Driver代码
publicclassScoreDriver{publicstaticvoidmain(String[] args)throwsClassNotFoundException,IOException,InterruptedException{Configuration conf =newConfiguration();Job job =Job.getInstance(conf);
job.setJarByClass(ScoreDriver.class);
job.setMapperClass(ScoreMapper.class);
job.setReducerClass(ScoreReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Student.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Student.class);// 读取路径下的所有文件,此时 result 文件夹不存在 FileInputFormat.setInputPaths(job, new Path("hdfs://192.168.76.131:9000/score")); FileOutputFormat.setOutputPath(job, new Path("hdfs://192.168.76.131:9000/score/result")); job.waitForCompletion(true); }}
2.排序
根据电影热度对电影排序
惊天破 72
机械师2 83
奇异博士 67
但丁密码 79
比利林恩的中场战事 84
侠探杰克:永不回头 68
龙珠Z:复活的弗利萨 79
长城 56
Mapper 排序是根据KEY值进行排序的,所以 PO类作为KEY值
MovieBean.java
importjava.io.DataInput;importjava.io.DataOutput;importjava.io.IOException;importorg.apache.hadoop.io.WritableComparable;publicclassMovieBeanimplementsWritableComparable<MovieBean>{privateString name;privateInteger hotNum;publicStringgetName(){return name;}publicvoidsetName(String name){this.name = name;}publicIntegergetHotNum(){return hotNum;}publicvoidsetHotNum(Integer hotNum){this.hotNum = hotNum;}@OverridepublicvoidreadFields(DataInput input)throwsIOException{this.name = input.readUTF();this.hotNum = input.readInt();}@Overridepublicvoidwrite(DataOutput output)throwsIOException{
output.writeUTF(this.name);
output.writeInt(this.hotNum);}@OverridepublicStringtoString(){return"MovieBean [name="+ name +", hotNum="+ hotNum +"]";}// 降序排序:旧对象 - 当前对象 @OverridepublicintcompareTo(MovieBean o){//return Integer.compare(o.getHotNum(), this.getHotNum())return o.getHotNum()-this.getHotNum();}}
继承WritableComparable接口,重写 compareTo()函数,定义比较结果
Mapper代码
publicclassSortMapperextendsMapper<LongWritable,Text,MovieBean,NullWritable>{@Overrideprotectedvoidmap(LongWritable key,Text value,Mapper<LongWritable,Text,MovieBean,NullWritable>.Context context)throwsIOException,InterruptedException{String line = value.toString();String[] datas = line.split(" ");MovieBean movieBean =newMovieBean();
movieBean.setName(datas[0]);
movieBean.setHotNum(Integer.valueOf(datas[1]));
context.write(movieBean,NullWritable.get());}}
Driver代码
publicclassSortDriver{publicstaticvoidmain(String[] args)throwsIllegalArgumentException,IOException,ClassNotFoundException,InterruptedException{Configuration conf =newConfiguration();Job job =Job.getInstance(conf);
job.setJarByClass(SortDriver.class);
job.setMapperClass(SortMapper.class);
job.setMapOutputKeyClass(MovieBean.class);
job.setMapOutputValueClass(NullWritable.class);FileInputFormat.setInputPaths(job,newPath("hdfs://192.168.76.131:9000/sort"));FileOutputFormat.setOutputPath(job,newPath("hdfs://192.168.76.131:9000/sort/result")); job.waitForCompletion(true);}}
3.多层MR处理 多 Job 串联
一个稍复杂点的处理逻辑往往需要多个 MapReduce 程序串联处理,多 job 的串联可以借助 MapReduce 框架的 JobControl 实现
在第一层MR处理基础上
添加第二个JOB处理第一个JOB的运行结果
例子:
计算每人3个月的总收入并排序
第一个MR:计算每人的总收入
第二个MR:按照收入进行排序Mapper
以下有两个 MapReduce 任务,分别是 Flow的 SumMR 和 SortMR,其中有依赖关系:SumMR 的输出是 SortMR 的输入,所以 SortMR 的启动得在 SumMR 完成之后
publicstaticvoidmain(String[] args)throwsException{Configuration conf =newConfiguration();Job jobsum =Job.getInstance(conf);
jobsum.setJarByClass(RunManyJobMR.class);
jobsum.setMapperClass(FlowSumMapper.class);
jobsum.setReducerClass(FlowSumReducer.class);
jobsum.setMapOutputKeyClass(Text.class);
jobsum.setMapOutputValueClass(Flow.class);
jobsum.setCombinerClass(FlowSumReducer.class);
jobsum.setOutputKeyClass(Text.class);
jobsum.setOutputValueClass(Text.class);FileInputFormat.setInputPaths(jobsum,"d:/flow/input");FileOutputFormat.setOutputPath(jobsum,newPath("d:/flow/output12"));Job jobsort =Job.getInstance(conf);
jobsort.setJarByClass(RunManyJobMR.class);
jobsort.setMapperClass(FlowSortMapper.class);
jobsort.setReducerClass(FlowSortReducer.class);
jobsort.setMapOutputKeyClass(Flow.class);
jobsort.setMapOutputValueClass(Text.class);
jobsort.setOutputKeyClass(NullWritable.class);
jobsort.setOutputValueClass(Flow.class);FileInputFormat.setInputPaths(jobsort,"d:/flow/output12");FileOutputFormat.setOutputPath(jobsort,newPath("d:/flow/sortoutput12"));ControlledJob sumcj =newControlledJob(jobsum.getConfiguration());ControlledJob sortcj =newControlledJob(jobsort.getConfiguration());
sumcj.setJob(jobsum);
sortcj.setJob(jobsort);// 设置作业依赖关系
sortcj.addDependingJob(sumcj);JobControl jc =newJobControl("flow sum and sort");
jc.addJob(sumcj);
jc.addJob(sortcj);Thread jobThread =newThread(jc);
jobThread.start();while(!jc.allFinished()){Thread.sleep(500);}
jc.stop();}
4.TopN算法-自定义 GroupComparator
输入文件格式
algorithm,liuyifei,75,85,62,48,54,96,15
computer,huangjiaju,85,75,86,85,85
english,liuyifei,76,95,86,74,68,74,48
english,huangdatou,48,58,67,86,15,33,85
algorithm,huanglei,76,95,86,74,68,74,48
algorithm,huangjiaju,85,75,86,85,85,74,86
computer,huangdatou,48,58,67,86,15,33,85
输出文件格式
k=3, 按课程分4个文件,每个文件保存平均成绩前3的人名和平均成绩
algorithm huangjiaju的成绩:62.0
algorithm liutao的成绩:56.57
algorithm huanglei的成绩:55.89
实现Comparable接口的比较类MyCom
staticclassMyComimplementsComparable<MyCom>{//首字段为人名,次字段为平均成绩privateString tname;privateDouble tscore;//自动生成getset方法publicStringgetTname(){return tname;}publicvoidsetTname(String tname){this.tname = tname;}publicDoublegetTscore(){return tscore;}publicvoidsetTscore(Double tscore){this.tscore = tscore;}@OverridepublicintcompareTo(MyCom o){//对传入的平均成绩进行比较returnthis.tscore.compareTo(o.getTscore());}}
Map代码
staticclassTopMaperextendsMapper<LongWritable,Text,Text,Text>{//输入类型为<偏移量,一行文本>,输出类型为<Text,Text>privateText mkey=newText();privateText mvalue=newText();@Overrideprotectedvoidmap(LongWritable key,Text value,Context context)throwsIOException,InterruptedException{System.out.println("map");//按,拆分当前行字符串String[] lines=value.toString().split(",");//第一个字符串为课程,写入输出key
mkey.set(lines[0]);//过滤为空的非法数据if(lines==null||lines.length<1){return;}//按下标得到[2]以后所有字符串,转换为double求和double sum=0;for(int i=2;i<lines.length;i++){
sum+=newDouble(lines[i]);}//DecimalFormat规定小数点后保留两位DecimalFormat df=newDecimalFormat("0.00");//输出value为人名,平均成绩
mvalue.set(lines[1]+","+df.format (sum/lines.length-2));
context.write(mkey,mvalue);}}
Reduce代码
staticclassTopReduceerextendsReducer<Text,Text,Text,Text>{privateText rvalue=newText();@Overrideprotectedvoidreduce(Text mkey,Iterable<Text> iter,Context context)throwsIOException,InterruptedException{System.out.println("reduce");//将MyCom类放入List,通过ArrayList实现List<MyCom> slist=newArrayList<>();//遍历传入的人名和成绩for(Text it:iter){//按,拆分String[] lines = it.toString().split(",");MyCom c=newMyCom();
c.setTname(lines[0]);//写入人名
c.setTscore(newDouble(lines[1]));//写入平均成绩//将写好的MyCom放入List
slist.add(c);}//Collections.sort实现对列表的升序排序Collections.sort(slist);//Collections.reverse反转升序后的元素,即降序Collections.reverse(slist);//topk个元素,即输出平均成绩最高的前3条记录for(int k=0;k<3;k++){MyCom s = slist.get(k);
rvalue.set(s.getTname()+"的成绩:"+ s.getTscore());
context.write(mkey, rvalue);}}}
执行主方法
publicstaticvoidmain(String[] args)throwsIOException,ClassNotFoundException,InterruptedException{Configuration cfg=newConfiguration();Job job =Job.getInstance(cfg);//设置主方法所在的类
job.setJarByClass(Topk.class);
job.setMapperClass(TopMaper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
job.setReducerClass(TopReduceer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);//设置reducetask数量为4,默认为1
job.setNumReduceTasks(4);//不重写Partitioner会按map输出的key进行分区,分区数为reducetask数//输入路径和输出路径的设置FileInputFormat.addInputPath(job,newPath("d:\\mr\\input\\grade.txt"));FileOutputFormat.setOutputPath(job,newPath("d:\\mr\\outtopk"));System.exit(job.waitForCompletion(true)?0:1);}
5.全局计数器
以下是一个利用全局计数器来统计一个目录下所有文件出现的单词总数和总行数
packagecom.mr.counter;importjava.io.IOException;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.FileSystem;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;publicclassCounterWordCount{enumCouterWordCountC{COUNT_WORDS,COUNT_LINES}publicstaticvoidmain(String[] args)throwsException{Configuration conf =newConfiguration();Job job =Job.getInstance(conf);
job.setJarByClass(CounterWordCount.class);
job.setMapperClass(WCCounterMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);Path inputPath =newPath("d:/wordcount/input");FileInputFormat.setInputPaths(job, inputPath);
job.setNumReduceTasks(0);Path outputPath =newPath("d:/wordcount/output");FileSystem fs =FileSystem.get(conf);if(fs.exists(outputPath)){
fs.delete(outputPath,true);}FileOutputFormat.setOutputPath(job, outputPath);boolean waitForCompletion = job.waitForCompletion(true);System.exit(waitForCompletion?0:1);}privatestaticclassWCCounterMapperextendsMapper<LongWritable,Text,Text,LongWritable>{@Overrideprotectedvoidmap(LongWritable key,Text value,Context context)throwsIOException,InterruptedException{// 统计行数,因为默认读取文本是逐行读取,所以 map 执行一次,行数+1
context.getCounter(CouterWordCountC.COUNT_LINES).increment(1L);String[] words = value.toString().split(" ");for(String word: words){// 统计单词总数,遇见一个单词就+1
context.getCounter(CouterWordCountC.COUNT_WORDS).increment(1L);}}}}
6.MapJoin
MapJoin 适用于有一份数据较小的连接情况。做法是直接把该小份数据直接全部加载到内存 当中,按链接关键字建立索引。然后大份数据就作为 MapTask 的输入,对 map()方法的每次 输入都去内存当中直接去匹配连接。然后把连接结果按 key 输出,这种方法要使用 hadoop 中的 DistributedCache 把小份数据分布到各个计算节点,每个 maptask 执行任务的节点都需 要加载该数据到内存,并且按连接关键字建立索引
现有两份数据 movies.dat 和 ratings.dat
数据样式分别为:
Movies.dat ---- 字段含义:movieid, moviename, movietype
1::Toy Story (1995)::Animation|Children's|Comedy
2::Jumanji (1995)::Adventure|Children's|Fantasy
3::Grumpier Old Men (1995)::Comedy|Romance
Ratings.dat ---- 字段含义:userid, movieid, rate, timestamp
1::1193::5::978300760
1::661::3::978302109
1::914::3::978301968
Select * from movie a join ratings b on a.movieid = b.movieid
现要求对两表进行连接,要求输出最终的结果有以上六个字段:
movieid, userid, rate, moviename, movietype, timestamp
第一步:封装 MovieRate,方便数据的排序和序列化
packagecom.mr.mymapjoin;importjava.io.DataInput;importjava.io.DataOutput;importjava.io.IOException;importorg.apache.hadoop.io.WritableComparable;publicclassMovieRateimplementsWritableComparable<MovieRate>{privateString movieid;privateString userid;privateint rate;privateString movieName;privateString movieType;privatelong ts;publicStringgetMovieid(){return movieid;}publicvoidsetMovieid(String movieid){this.movieid = movieid;}publicStringgetUserid(){return userid;}publicvoidsetUserid(String userid){this.userid = userid;}publicintgetRate(){return rate;}publicvoidsetRate(int rate){this.rate = rate;}publicStringgetMovieName(){return movieName;}publicvoidsetMovieName(String movieName){this.movieName = movieName;}publicStringgetMovieType(){return movieType;}publicvoidsetMovieType(String movieType){this.movieType = movieType;}publiclonggetTs(){return ts;}publicvoidsetTs(long ts){this.ts = ts;}publicMovieRate(){}publicMovieRate(String movieid,String userid,int rate,String movieName,String movieType,long ts){this.movieid = movieid;this.userid = userid;this.rate = rate;this.movieName = movieName;this.movieType = movieType;this.ts = ts;}@OverridepublicStringtoString(){return movieid +"\t"+ userid +"\t"+ rate +"\t"+ movieName
+"\t"+ movieType +"\t"+ ts;}@Overridepublicvoidwrite(DataOutput out)throwsIOException{
out.writeUTF(movieid);
out.writeUTF(userid);
out.writeInt(rate);
out.writeUTF(movieName);
out.writeUTF(movieType);
out.writeLong(ts);}@OverridepublicvoidreadFields(DataInput in)throwsIOException{this.movieid = in.readUTF();this.userid = in.readUTF();this.rate = in.readInt();this.movieName = in.readUTF();this.movieType = in.readUTF();this.ts = in.readLong();}@OverridepublicintcompareTo(MovieRate mr){int it = mr.getMovieid().compareTo(this.movieid);if(it ==0){return mr.getUserid().compareTo(this.userid);}else{return it;}}}
第二步:编写 MapReduce 程序
packagecom.mr.mymapjoin;importjava.io.BufferedReader;importjava.io.FileReader;importjava.io.IOException;importjava.net.URI;importjava.util.HashMap;importjava.util.Map;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.FileSystem;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IOUtils;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.NullWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.filecache.DistributedCache;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;publicclassMovieRatingMapJoinMR{publicstaticvoidmain(String[] args)throwsException{Configuration conf =newConfiguration();
conf.set("fs.defaultFS","hdfs://hadoop02:9000");System.setProperty("HADOOP_USER_NAME","hadoop");Job job =Job.getInstance(conf);// job.setJarByClass(MovieRatingMapJoinMR.class);
job.setJar("/home/hadoop/mrmr.jar");
job.setMapperClass(MovieRatingMapJoinMRMapper.class);
job.setMapOutputKeyClass(MovieRate.class);
job.setMapOutputValueClass(NullWritable.class);// job.setReducerClass(MovieRatingMapJoinMReducer.class);// job.setOutputKeyClass(MovieRate.class);// job.setOutputValueClass(NullWritable.class);
job.setNumReduceTasks(0);String minInput = args[0];String maxInput = args[1];String output = args[2];FileInputFormat.setInputPaths(job,newPath(maxInput));Path outputPath =newPath(output);FileSystem fs =FileSystem.get(conf);if(fs.exists(outputPath)){
fs.delete(outputPath,true);}FileOutputFormat.setOutputPath(job, outputPath);URI uri =newPath(minInput).toUri();
job.addCacheFile(uri);boolean status = job.waitForCompletion(true);System.exit(status?0:1);}staticclassMovieRatingMapJoinMRMapperextendsMapper<LongWritable,Text,MovieRate,NullWritable>{// 用来存储小份数据的所有解析出来的 key-valueprivatestaticMap<String,Movie> movieMap =newHashMap<String,Movie>();@Overrideprotectedvoidsetup(Context context)throwsIOException,InterruptedException{Path[] localCacheFilePaths =DistributedCache.getLocalCacheFiles(context.getConfiguration());String myfilePath = localCacheFilePaths[0].toString();System.out.println(myfilePath);URI[] cacheFiles = context.getCacheFiles();System.out.println(cacheFiles[0].toString());BufferedReader br =newBufferedReader(newFileReader(myfilePath.toString()));// 此处的 line 就是从文件当中逐行读到的 movieString line ="";while(null!=(line = br.readLine())){String[] splits = line.split("::");
movieMap.put(splits[0],newMovie(splits[0], splits[1], splits[2]));}IOUtils.closeStream(br);}@Overrideprotectedvoidmap(LongWritable key,Text value,Context context)throwsIOException,InterruptedException{String[] splits = value.toString().split("::");String userid = splits[0];String movieid = splits[1];int rate =Integer.parseInt(splits[2]);long ts =Long.parseLong(splits[3]);String movieName = movieMap.get(movieid).getMovieName();String movieType = movieMap.get(movieid).getMovieType();MovieRate mr =newMovieRate(movieid, userid, rate, movieName, movieType,ts);
context.write(mr,NullWritable.get());}}}
7.最简单的wordcount
测试数据:
zhangyong zhangrui zhangqin
zhangyong zhangrui zhangqin
zhangyong zhangrui zhangqin
mapper类
publicclassWordCountMapperextendsMapper<LongWritable,Text,Text,LongWritable>{@Overridepublicvoidmap(LongWritable key,Text value,Context context)throwsIOException,InterruptedException{// 首先获取一行数据String line = value.toString ();// 将行内的单词进行切分,使用一个数组进行保存,切分数据时根据源数据得知可以使用空格的方式切分。String[] arr = line.split (" ");for(String str : arr){
context.write (newText(str),newLongWritable(1));}}}
reducer类
publicclassWordCountReducerextendsReducer<Text,LongWritable,Text,LongWritable>{@Overridepublicvoidreduce(Text key,Iterable<LongWritable> values,Context context)throwsIOException,InterruptedException{// 定义变量记录单词出现的次数long sum =0;for(LongWritable val : values){// 记录总次数
sum += val.get ();}// 输出数据,key就是单词,value就是在map阶段这个单词出现的总次数
context.write (key,newLongWritable(sum));}}
Driver类
publicclassWordCountDriver{publicstaticvoidmain(String[] args)throwsIOException,ClassNotFoundException,InterruptedException{// 获取当前的默认配置Configuration conf =newConfiguration();// 获取代表当前mr作业的job对象Job job =Job.getInstance (conf);// 指定一下当前程序的入口类
job.setJarByClass (WordCountDriver.class);//指定当前Mapper、Reducer任务的类
job.setMapperClass (WordCountMapper.class);
job.setReducerClass (WordCountReducer.class);//设置Mapper的结果类型
job.setMapOutputKeyClass (Text.class);
job.setMapOutputValueClass (LongWritable.class);// 设置Reducer的结果类型
job.setOutputKeyClass (Text.class);
job.setOutputValueClass (LongWritable.class);//设置待分析的文件夹路径(linux的路径地址)FileInputFormat.setInputPaths (job,newPath("hdfs://anshun115:9000/mapreduce"));FileOutputFormat.setOutputPath (job,newPath("hdfs://anshun115:9000/result/mapreduce"));if(!job.waitForCompletion (true)){return;}}}
8.求温度平均值
测试数据:
2329999919500515070000
9909999919500515120022
9909999919500515180011
9509999919490324120111
6509999919490324180078
代码
publicclassHeightMapperextendsMapper<LongWritable,Text,Text,LongWritable>{@Overridepublicvoidmap(LongWritable key,Text value,Context context)throwsIOException,InterruptedException{//获取一段数据String line = value.toString ();//获取年份String year = line.substring (8,12);//获取温度(强制转换一下)int t =Integer.parseInt (line.substring (18,22));
context.write (newText(year),newLongWritable(t));}}publicclassHeightReducerextendsReducer<Text,LongWritable,Text,LongWritable>{@Overridepublicvoidreduce(Text year,Iterable<LongWritable> t,Context context)throwsIOException,InterruptedException{long max =0;for(LongWritable data : t){if(max < data.get ()){
max = data.get ();}}
context.write (year,newLongWritable(max));}}publicclassHeightDriver{publicstaticvoidmain(String[] args)throwsIOException,ClassNotFoundException,InterruptedException{Configuration conf =newConfiguration();Job job =Job.getInstance (conf);
job.setJarByClass (HeightDriver.class);
job.setMapperClass (HeightMapper.class);
job.setReducerClass (HeightReducer.class);
job.setMapOutputKeyClass (Text.class);
job.setMapOutputValueClass (LongWritable.class);
job.setOutputKeyClass (Text.class);
job.setOutputValueClass (LongWritable.class);FileInputFormat.setInputPaths (job,newPath("hdfs://anshun115:9000/wendu/"));FileOutputFormat.setOutputPath (job,newPath("hdfs://anshun115:9000/result/wendu"));
job.waitForCompletion (true);}}
9.分区多路输出
测试数据:
13901000123 zs bj 343
13202111011 ww sh 456
13901000123 zs bj 1024
13207551234 ls sz 758
Partitioner类
publicclassAddPartitionerextendsPartitioner<Text,PartFlowBean>{@OverridepublicintgetPartition(Text text,PartFlowBean flowBean,int
numPartitioner){String addr = flowBean.getAddr();if(addr.equals("bj")){return0;//输出part-r-00000}elseif(addr.equals("sh")){return1;//输出part-r-00001}else{return2;//输出part-r-00002}}}
编写MR
publicclassPartFlowMapperextendsMapper<LongWritable,Text,Text,PartFlowBean>{@Overridepublicvoidmap(LongWritable key,Text value,Context context)throwsIOException,InterruptedException{String line = value.toString ();/**
[13901000123,zk,bj,343]
phone = 13901000123;
name = zk;
addr = bj;
flow = 343;
*/String[] info = line.split (" ");PartFlowBean flowBean =newPartFlowBean();
flowBean.setPhone (info[0]);
flowBean.setName (info[1]);
flowBean.setAddr (info[2]);
flowBean.setFlow (Integer.parseInt (info[3]));
context.write (newText(flowBean.getName ()), flowBean);}}publicclassPartFlowReducerextendsReducer<Text,PartFlowBean,PartFlowBean,NullWritable>{@Overridepublicvoidreduce(Text key,Iterable<PartFlowBean> values,Context
context)throwsIOException,InterruptedException{PartFlowBean result =newPartFlowBean();for(PartFlowBean value : values){
result.setPhone (value.getPhone ());
result.setPhone (value.getPhone ());
result.setName (value.getName ());
result.setAddr (value.getAddr ());
result.setFlow (result.getFlow ()+ value.getFlow ());}
context.write (result,NullWritable.get ());}}publicclassPartFlowDriver{publicstaticvoidmain(String[] args)throwsException{Configuration conf =newConfiguration();Job job =Job.getInstance (conf);
job.setJarByClass (PartFlowDriver.class);
job.setMapperClass (PartFlowMapper.class);
job.setReducerClass (PartFlowReducer.class);/**
* 下面的两个类如果不写的话,那么就不会生效。
*/// 设置分区类
job.setPartitionerClass (AddPartitioner.class);// 设置分区数量
job.setNumReduceTasks (3);
job.setMapOutputKeyClass (Text.class);
job.setMapOutputValueClass (PartFlowBean.class);
job.setOutputKeyClass (PartFlowBean.class);
job.setOutputValueClass (NullWritable.class);FileInputFormat.setInputPaths (job,newPath("hdfs://anshun115:9000/partition"));FileOutputFormat.setOutputPath (job,newPath("hdfs://anshun115:9000/result/partition"));
job.waitForCompletion (true);}}
运行结果:
part-r-00000
FlowBean{phone='13901000123', name='zs', addr='bj', flow=1367}
part-r-00001
FlowBean{phone='13202111011', name='ww', addr='sh', flow=456}
part-r-00002
FlowBean{phone='13207551234', name='ls', addr='sz', flow=758}
10.分区并全排序
82 239 231
23 22 213
123 232 124
213 3434 232
4546 565 123
231 231
Partitioner类
/**
* @Author zhangyong
* @Date 2020/4/14 9:39
* @Version 1.0
* 全排序
* 将上述文件内容按照数字位数分别写入三个文件,如下
* 0-99的写入到文件1
* 100-999写入到文件2
* 1000-其他数据写入到文件3
*/publicclassAutoPartitionerextendsPartitioner<IntWritable,IntWritable>{@OverridepublicintgetPartition(IntWritable key,IntWritable value,int numPartitions){String num =String.valueOf (key.get ());if(num.matches ("[0-9][0-9]")|| num.matches ("[0-9]")){return0;}elseif(num.matches ("[0-9][0-9][0-9]")){return1;}else{return2;}}}
11.推荐认识好友
测试数据:
tom rose
tom jim
tom smith
tom lucy
rose tom
rose lucy
rose smith
jim tom
第一个mapper类
publicclassOneFriendMapperextendsMapper<LongWritable,Text,Text,Text>{/**
* 输入的key和value是根据文件内容来确定。
* 输出的key和value是因为在业务逻辑中设定的输出是name-friend好友关系。
*/@Overrideprotectedvoidmap(LongWritable key,Text value,Context context)throwsIOException,InterruptedException{// 获取每行的数据String line = value.toString();// 获取姓名String name = line.split(" ")[0];// 获取好友String friend = line.split(" ")[1];
context.write(newText(name),newText(friend));}}
第一个reducer类
publicclassOneFriendReducerextendsReducer<Text,Text,Text,IntWritable>{/**
* 输入key和value要和mapper的输出保持一致。
* Text和IntWritable:
* 如果是好友-1,如果不是好友就用-2。
*/@Overrideprotectedvoidreduce(Text key,Iterable<Text> values,Context context)throwsIOException,InterruptedException{ArrayList<String> friendList =newArrayList<>();//处理好友关系for(Text value : values){
friendList.add(value.toString());if(key.toString().compareTo(value.toString())<0){
context.write(newText(key +"-"+ value),newIntWritable(1));}else{
context.write(newText(value +"-"+ key),newIntWritable(1));}}// 处理可能相识的好友。for(int i =0; i < friendList.size(); i++){for(int j =0; j < friendList.size(); j++){String friend1 = friendList.get(i);String friend2 = friendList.get(j);if(friend1.compareTo(friend2)<0){
context.write(newText(friend1 +"-"+ friend2),newIntWritable(2));}}}}}
第二个mapper类
publicclassTwoFriendMapperextendsMapper<LongWritable,Text,Text,IntWritable>{@Overrideprotectedvoidmap(LongWritable key,Text value,Context context)throwsIOException,InterruptedException{// 获取一行数据String line = value.toString();// 获取朋友关系的信息String friendInfo = line.split("\t")[0];// 获取朋友关系的深度int deep =Integer.parseInt(line.split("\t")[1]);
context.write(newText(friendInfo),newIntWritable(deep));}}
第二个reducer类
publicclassTwoFriendReducerextendsReducer<Text,IntWritable,Text,NullWritable>{@Overrideprotectedvoidreduce(Text key,Iterable<IntWritable> values,Context context)throwsIOException,InterruptedException{Boolean flag =true;/**
* 设定好友关系为true的时候进行输出
* 因为题目要求是输出可能相识的好友。所以为true的代码应该是2
* 也就是好友关系为1的时候设置变量为false
*/for(IntWritable value : values){if(value.get()==1){
flag =false;}}if(flag){
context.write(key,NullWritable.get());}}}
Driver类
publicclassFriendDriver{publicstaticvoidmain(String[] args)throwsIOException,ClassNotFoundException,InterruptedException{Configuration conf =newConfiguration();//设置第一轮MapReduce的相应处理类与输入输出Job job1 =Job.getInstance(conf);
job1.setJarByClass(FriendDriver.class);
job1.setMapperClass(OneFriendMapper.class);
job1.setReducerClass(OneFriendReducer.class);
job1.setMapOutputKeyClass(Text.class);
job1.setMapOutputValueClass(Text.class);
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(IntWritable.class);//设置路径(传输、结果)FileInputFormat.setInputPaths(job1,newPath("hdfs://anshun115:9000/friend"));FileOutputFormat.setOutputPath(job1,newPath("hdfs://anshun115:9000/result/friend"));//如果第一轮MapReduce完成再做这里的代码if(job1.waitForCompletion(true)){Job job2 =Job.getInstance(conf);// 设置第二个Job任务的Mapper
job2.setMapperClass(TwoFriendMapper.class);
job2.setMapOutputKeyClass(Text.class);
job2.setMapOutputValueClass(IntWritable.class);// 设置第二个Job任务的Reducer
job2.setReducerClass(TwoFriendReducer.class);
job2.setOutputKeyClass(Text.class);
job2.setOutputValueClass(NullWritable.class);/**
* 设置第二个Job任务是输入输出路径。
* 此处的输入路径是第一个job任务的输出路径
* 注意设置路径时,里面传入的job应该是当前的job任务,如下所示,应该是job2。
* 如果写成前面的job任务名称,在运行时则会爆出错误,提示路径不存在。
*/FileInputFormat.setInputPaths(job2,newPath("hdfs://anshun115:9000/result/friend"));FileOutputFormat.setOutputPath(job2,newPath("hdfs://anshun115:9000/result/friend2"));// 此处提交任务时,注意用的是job2。
job2.waitForCompletion(true);}}
12。自定义文件名并多路输出
publicclass two {publicstaticclassTWOMapperextendsMapper<LongWritable,Text,LongWritable,Text>{@Overrideprotectedvoidmap(LongWritable key,Text value,Mapper<LongWritable,Text,LongWritable,Text>.Context context)throwsIOException,InterruptedException{String[] split = value.toString().split(",");// 课程String clazz = split[0];// 姓名String name = split[1];// 总分double zf =0;for(int i =2; i < split.length; i++){// 分数double fen =Double.parseDouble(split[i]);
zf += fen;}//平均分double v = zf /(split.length -2);BigDecimal bd =newBigDecimal(v);BigDecimal bd1 = bd.setScale(2,BigDecimal.ROUND_HALF_UP);
context.write(newLongWritable(bd1.longValue()),newText(clazz+","+name));}}publicstaticclassTWOReducerextendsReducer<LongWritable,Text,Text,NullWritable>{privateMultipleOutputs<Text,NullWritable> mos;@Overrideprotectedvoidsetup(Context context)throwsIOException,InterruptedException{//在setup运行时,重新初始化这个类
mos =newMultipleOutputs<Text,NullWritable>(context);}@Overrideprotectedvoidcleanup(Context context)throwsIOException,InterruptedException{
mos.close();}@Overrideprotectedvoidreduce(LongWritable key,Iterable<Text> values,Reducer<LongWritable,Text,Text,NullWritable>.Context context)throwsIOException,InterruptedException{for(Text value: values){// 课程String clazz = value.toString().split(",")[0];// 姓名String name = value.toString().split(",")[1];if(clazz.equals("computer"))
mos.write("computer",clazz+","+name+","+key,NullWritable.get());if(clazz.equals("english"))
mos.write("english",clazz+","+name+","+key,NullWritable.get());if(clazz.equals("algorithm"))
mos.write("algorithm",clazz+","+name+","+key,NullWritable.get());if(clazz.equals("math"))
mos.write("math",clazz+","+name+","+key,NullWritable.get());}}}publicstaticvoidmain(String[] args)throwsIOException,InterruptedException,ClassNotFoundException{FileInputFormat.addInputPath(job,newPath("hdfs://master:9000/data/stu_score_sub.csv"));Path out =newPath("hdfs://master:9000/output2");FileSystem fs =FileSystem.get(conf);if(fs.exists(out)){
fs.delete(out,true);}//配置输入输出的路径FileOutputFormat.setOutputPath(job,out);MultipleOutputs.addNamedOutput(job,"computer",TextOutputFormat.class,Text.class,NullWritable.class);MultipleOutputs.addNamedOutput(job,"english",TextOutputFormat.class,Text.class,NullWritable.class);MultipleOutputs.addNamedOutput(job,"algorithm",TextOutputFormat.class,Text.class,NullWritable.class);MultipleOutputs.addNamedOutput(job,"math",TextOutputFormat.class,Text.class,NullWritable.class);
job.waitForCompletion(true);}}
版权归原作者 背帆 所有, 如有侵权,请联系我们删除。