3.6.1 OutputFormat Interface Implementations
OutputFormat is the base class for MapReduce output; every MapReduce output implementation implements the OutputFormat interface. Below we introduce several common OutputFormat implementation classes.
1. Text output: TextOutputFormat
The default output format is TextOutputFormat, which writes each record as a line of text. Its keys and values may be of any type, because TextOutputFormat calls toString() to convert them to strings.
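As a minimal sketch (not from the original text), the default format can also be set explicitly in the driver, and the key/value delimiter can be configured; the property name below is the Hadoop 2.x key and is an assumption to verify against your Hadoop version:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class TextOutputExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Delimiter written between key.toString() and value.toString(); tab by default.
        // Property name as in Hadoop 2.x; assumption, check your version's docs.
        conf.set("mapreduce.output.textoutputformat.separator", "\t");
        Job job = Job.getInstance(conf);
        // TextOutputFormat is already the default; setting it here is just explicit.
        job.setOutputFormatClass(TextOutputFormat.class);
    }
}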
2. SequenceFileOutputFormat
Using SequenceFileOutputFormat output as the input of a subsequent MapReduce job is a good choice of output format, because the format is compact and compresses easily.
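As a hedged sketch of this chaining pattern (class and path names here are illustrative, not from the original), the first job writes its pairs as a SequenceFile and the second job reads them back with the matching input format, so the Writable types survive without any text parsing:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class SequenceFileChainSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Job 1: write (Text, IntWritable) pairs as a compact, compressible SequenceFile.
        Job job1 = Job.getInstance(conf, "writer");
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);
        job1.setOutputFormatClass(SequenceFileOutputFormat.class);
        FileOutputFormat.setOutputPath(job1, new Path("e:/intermediate")); // illustrative path

        // Job 2: read the same pairs back; keys and values keep their Writable types.
        Job job2 = Job.getInstance(conf, "reader");
        job2.setInputFormatClass(SequenceFileInputFormat.class);
        FileInputFormat.addInputPath(job2, new Path("e:/intermediate"));
    }
}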
3. Custom OutputFormat
Implement custom output according to user requirements.
3.6.2 Custom OutputFormat
1. Use cases
To control the output path and output format of the final files, you can implement a custom OutputFormat.
For example, to have a single MapReduce program write two classes of results to different directories depending on the data, this kind of flexible output requirement can be met with a custom OutputFormat.
2. Steps to customize OutputFormat
(1) Define a class that extends FileOutputFormat.
(2) Override RecordWriter, specifically the write() method that writes the output data (see the skeleton after this list).
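A minimal skeleton of these two steps; the class names are placeholders, and the case study in 3.6.3 below fills them in concretely:

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Step (1): extend FileOutputFormat and hand back a custom RecordWriter.
public class MyOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job)
            throws IOException, InterruptedException {
        return new MyRecordWriter();
    }
}

// Step (2): override write() to decide where and how each record lands.
class MyRecordWriter extends RecordWriter<Text, NullWritable> {
    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        // Open streams and route records here.
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        // Close any streams opened above.
    }
}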
3.6.3 Custom OutputFormat Case Study
1. Requirement
Filter the input log: write sites containing atguigu to e:/atguigu.log, and sites not containing atguigu to e:/other.log.
(1) Input data
http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.atguigu.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com
(2) Expected output data
atguigu.log:
http://www.atguigu.com
other.log:
http://cn.bing.com
http://www.baidu.com
http://www.google.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sina.com
http://www.sindsafa.com
http://www.sohu.com
2. Requirement analysis
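The Mapper simply forwards each log line as the output key, and the Reducer appends a line separator. The routing itself happens in a custom FilterOutputFormat: its RecordWriter opens two streams, writes lines containing atguigu to e:/atguigu.log, and writes everything else to e:/other.log. The driver then registers FilterOutputFormat on the job.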
3. Implementation
(1) Write the FilterMapper class
package com.cuiyf41.output;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Write out: the whole log line becomes the output key
        context.write(value, NullWritable.get());
    }
}
(2) Write the FilterReducer class
package com.cuiyf41.output;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FilterReducer extends Reducer<Text, NullWritable, Text, NullWritable> {

    Text k = new Text();

    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // 1 Get the line
        String line = key.toString();

        // 2 Append a line separator
        line = line + "\r\n";

        // 3 Set the output key
        k.set(line);

        // 4 Write out
        context.write(k, NullWritable.get());
    }
}
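Note that because each line travels through the shuffle as a key, duplicate lines are grouped into a single reduce() call and written only once, and the reducer receives keys in sorted order, which is why the expected output above is in lexicographic order.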
(3) Define a custom OutputFormat class
package com.cuiyf41.output;

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FilterOutputFormat extends FileOutputFormat<Text, NullWritable> {

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        // Create and return our custom RecordWriter
        return new FilterRecordWriter(job);
    }
}
(4) Write the RecordWriter class
package com.cuiyf41.output;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class FilterRecordWriter extends RecordWriter<Text, NullWritable> {

    FSDataOutputStream atguiguOut = null;
    FSDataOutputStream otherOut = null;

    public FilterRecordWriter(TaskAttemptContext job) {
        // 1 Get the file system
        FileSystem fs;
        try {
            fs = FileSystem.get(job.getConfiguration());

            // 2 Create the output file paths
            Path atguiguPath = new Path("e:/atguigu.log");
            Path otherPath = new Path("e:/other.log");

            // 3 Create the output streams
            atguiguOut = fs.create(atguiguPath);
            otherOut = fs.create(otherPath);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        // Route each record to a different file depending on whether it contains "atguigu"
        if (key.toString().contains("atguigu")) {
            atguiguOut.write(key.toString().getBytes());
        } else {
            otherOut.write(key.toString().getBytes());
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        // Release the streams
        IOUtils.closeStream(atguiguOut);
        IOUtils.closeStream(otherOut);
    }
}
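The framework creates one RecordWriter per reduce task, so the two streams are opened once in the constructor and released once in close(), rather than per record.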
(5) Write the FilterDriver class
package com.cuiyf41.output;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class FilterDriver {

    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Set the input and output paths according to the actual paths on your machine
        args = new String[] { "e:/input/log.txt", "e:/output2" };

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(FilterDriver.class);
        job.setMapperClass(FilterMapper.class);
        job.setReducerClass(FilterReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Register the custom output format with the job
        job.setOutputFormatClass(FilterOutputFormat.class);

        Path input = new Path(args[0]);
        Path output = new Path(args[1]);

        // Delete the output path if it already exists
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(output)) {
            fs.delete(output, true);
        }

        FileInputFormat.setInputPaths(job, input);

        // Even though we use a custom OutputFormat, it extends FileOutputFormat,
        // and FileOutputFormat writes a _SUCCESS file, so an output directory
        // still has to be specified here
        FileOutputFormat.setOutputPath(job, output);

        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
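After a local run, e:/atguigu.log should hold the single line containing atguigu and e:/other.log the remaining lines, while the e:/output2 directory receives only FileOutputFormat's _SUCCESS marker, since the custom RecordWriter writes its data elsewhere.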