0


大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出

3.6.1OutputFormat接口实现类

OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。下面我们介绍几种常见的OutputFormat实现类。

1、文本输出TextOutputFormat

默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,疑问TextOutputFormat调用toString()方法把他们转换为字符串。

2、SequenceFileOutputFormat

将SequenceFileOutputFormat输出作为后续MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩

3、自定义OutputFormat

根据用户需求,自定义实现输出。

3.6.2自定义OutputFormat

1、使用场景

为了实现控制最终文件的输出路径和输出格式,可以自定义OutputFormat。

例如:要在一个MapReduce程序中根据数据的不同输出两类结果到不同的目录,这类灵活的输出需求可以通过自定义OutputFormat来实现。

2、自定义OUtputFormat步骤

(1)自定义一个类继承FileOutputFormat。

(2)改写RecordWriter,具体改写输出数据的方法write()。

3.6.3自定义OutputFormat案例实操

1、需求

过滤输入的log日志,包含atguigu的网站输出到e:/atguigu.log,不包含atguigu的网站输出到e:/other.log。

(1)输入数据

http://www.baidu.com
http://www.google.com
http://cn.bing.com
http://www.atguigu.com
http://www.sohu.com
http://www.sina.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sindsafa.com

(2)期望输出数据

http://www.atguigu.com
http://cn.bing.com
http://www.baidu.com
http://www.google.com
http://www.sin2a.com
http://www.sin2desa.com
http://www.sina.com
http://www.sindsafa.com
http://www.sohu.com

2、需求分析

Untitled

3、案例实操

(1)编写FilterMapper类

packagecom.cuiyf41.output;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.NullWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Mapper;importjava.io.IOException;publicclassFilterMapperextendsMapper<LongWritable,Text,Text,NullWritable>{@Overrideprotectedvoidmap(LongWritable key,Text value,Mapper<LongWritable,Text,Text,NullWritable>.Context context)throwsIOException,InterruptedException{// 写出
        context.write(value,NullWritable.get());}}

(2)编写FilterReducer类

packagecom.cuiyf41.output;importorg.apache.hadoop.io.NullWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Reducer;importjava.io.IOException;publicclassFilterReducerextendsReducer<Text,NullWritable,Text,NullWritable>{Text k =newText();@Overrideprotectedvoidreduce(Text key,Iterable<NullWritable> values,Reducer<Text,NullWritable,Text,NullWritable>.Context context)throwsIOException,InterruptedException{// 1 获取一行String line = key.toString();// 2 拼接
        line = line +"\r\n";// 3 设置key
        k.set(line);// 4 输出
        context.write(k,NullWritable.get());}}

(3)自定义一个OutputFormat类

packagecom.atguigu.mapreduce.outputformat;importjava.io.IOException;importorg.apache.hadoop.io.NullWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.RecordWriter;importorg.apache.hadoop.mapreduce.TaskAttemptContext;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;publicclassFilterOutputFormatextendsFileOutputFormat<Text,NullWritable>{@OverridepublicRecordWriter<Text,NullWritable>getRecordWriter(TaskAttemptContext job)throwsIOException,InterruptedException{// 创建一个RecordWriterreturnnewFilterRecordWriter(job);}}

(4)编写RecordWriter类

packagecom.cuiyf41.output;importorg.apache.hadoop.fs.FSDataOutputStream;importorg.apache.hadoop.fs.FileSystem;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IOUtils;importorg.apache.hadoop.io.NullWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.RecordWriter;importorg.apache.hadoop.mapreduce.TaskAttemptContext;importjava.io.IOException;publicclassFilterRecordWriterextendsRecordWriter<Text,NullWritable>{FSDataOutputStream atguiguOut =null;FSDataOutputStream otherOut =null;publicFilterRecordWriter(TaskAttemptContext job){// 1 获取文件系统FileSystem fs;try{
            fs =FileSystem.get(job.getConfiguration());// 2 创建输出文件路径Path atguiguPath =newPath("e:/atguigu.log");Path otherPath =newPath("e:/other.log");// 3 创建输出流
            atguiguOut = fs.create(atguiguPath);
            otherOut = fs.create(otherPath);}catch(IOException e){
            e.printStackTrace();}}@Overridepublicvoidwrite(Text key,NullWritable value)throwsIOException,InterruptedException{// 判断是否包含“atguigu”输出到不同文件if(key.toString().contains("atguigu")){
            atguiguOut.write(key.toString().getBytes());}else{
            otherOut.write(key.toString().getBytes());}}@Overridepublicvoidclose(TaskAttemptContext context)throwsIOException,InterruptedException{// 关闭资源IOUtils.closeStream(atguiguOut);IOUtils.closeStream(otherOut);}}

(5)编写FilterDriver类

packagecom.cuiyf41.output;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.FileSystem;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.NullWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importjava.io.IOException;publicclassFilterDriver{publicstaticvoidmain(String[] args)throwsIOException,InterruptedException,ClassNotFoundException{// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
        args =newString[]{"e:/input/log.txt","e:/output2"};Configuration conf =newConfiguration();Job job =Job.getInstance(conf);

        job.setJarByClass(FilterDriver.class);
        job.setMapperClass(FilterMapper.class);
        job.setReducerClass(FilterReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);// 要将自定义的输出格式组件设置到job中
        job.setOutputFormatClass(FilterOutputFormat.class);Path input =newPath(args[0]);Path output =newPath(args[1]);// 如果输出路径存在,则进行删除FileSystem fs =FileSystem.get(conf);if(fs.exists(output)){
            fs.delete(output,true);}FileInputFormat.setInputPaths(job, input);// 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat// 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录FileOutputFormat.setOutputPath(job, output);boolean result = job.waitForCompletion(true);System.exit(result ?0:1);}}

本文转载自: https://blog.csdn.net/yiluohan0307/article/details/129232066
版权归原作者 yiluohan0307 所有, 如有侵权,请联系我们删除。

“大数据框架之Hadoop:MapReduce(三)MapReduce框架原理——OutputFormat数据输出”的评论:

还没有评论