0


Hadoop之mapreduce -- WrodCount案例以及各种概念

文章目录


一、MapReduce的优缺点

优点

– 1、易于编程
– 2、良好的扩展性
– 3、高容错性
– 4、非常适合大数据集的计算

缺点

– 1、不适合做实时计算
– 2、不适合做流式计算
– 3、不适合做有向图(DAG)计算

多个应用程序之间有依赖关系,后一个程序需要依赖前面的程序的结果。这种场景就称之为有向图

二、MapReduce案例–WordCount

1、导包

importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Partitioner;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importjava.io.IOException;

2、Mapper方法

classWordCountMapperextendsMapper<LongWritable,Text,Text,IntWritable>{@Override// key 值 指的是行偏移量// value 指的是 这一行数据protectedvoidmap(LongWritable key,Text value,Mapper<LongWritable,Text,Text,IntWritable>.Context context)throwsIOException,InterruptedException{String line = value.toString();String[] arr = line.split("\\s+");for(String word : arr){
            context.write(newText(word),newIntWritable(1));}}}

3、Partitioner方法(自定义分区器)

//Map任务 --> Partitioner  --> ReducerclassWordCountPartitionerextendsPartitioner<Text,IntWritable>{publicintgetPartition(Text text,IntWritable intWritable,int i){String word = text.toString();char c = word.charAt(0);if(c >='a'&& c <='p'){return0;}elseif(c >='q'&& c <='z'){return1;}else{return2;}}}

4、reducer方法

classWordCountReducerextendsReducer<Text,IntWritable,Text,IntWritable>{@Overrideprotectedvoidreduce(Text key,Iterable<IntWritable> values,Reducer<Text,IntWritable,Text,IntWritable>.Context context)throwsIOException,InterruptedException{int sum =0;for(IntWritable value : values){
            sum += value.get();}
        context.write(key,newIntWritable(sum));}}

5、driver(main方法)

publicclassWordCountDriver{publicstaticvoidmain(String[] args)throwsIOException,InterruptedException,ClassNotFoundException{Configuration conf =newConfiguration();// 使用本地的文件系统,而不是hdfs
        conf.set("fs.defaultFS","file:///");// 使用本地的资源(CPU,内存等), 也可以使用yarn平台跑任务
        conf.set("mapreduce.framework.name","local");Job job =Job.getInstance(conf,"单词统计");// 指定 map
        job.setMapperClass(WordCountMapper.class);//指定分区器
        job.setPartitionerClass(WordCountPartitioner.class);// 设置reduceTask的数量// reduce的数量决定了reduceTask的任务数量,每一个任务,结束后都会产生一个文件 part-r-xxxxx
        job.setNumReduceTasks(3);//指定reducer
        job.setReducerClass(WordCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);FileInputFormat.setInputPaths(job,newPath("./mr01/input"));FileOutputFormat.setOutputPath(job,newPath("./mr01/output"));boolean result = job.waitForCompletion(true);System.exit(result?0:-1);}}

6、Writable(手机流量统计案例的实体类)

publicclassPhoneFlowWritableimplementsWritable{privateString phone;privateint upFlow;privateint downFlow;// 此处需要指定一个空参数的构造方法,否则报错:// java.lang.NoSuchMethodException: com.bigdata.phoneflow.PhoneFlowWritable.<init>()publicPhoneFlowWritable(){}publicPhoneFlowWritable(String phone,int upFlow,int downFlow){this.phone = phone;this.upFlow = upFlow;this.downFlow = downFlow;}publicStringgetPhone(){return phone;}publicvoidsetPhone(String phone){this.phone = phone;}publicintgetUpFlow(){return upFlow;}publicvoidsetUpFlow(int upFlow){this.upFlow = upFlow;}publicintgetDownFlow(){return downFlow;}publicvoidsetDownFlow(int downFlow){this.downFlow = downFlow;}@Overridepublicvoidwrite(DataOutput out)throwsIOException{
        out.writeUTF(phone);
        out.writeInt(upFlow);
        out.writeInt(downFlow);}@OverridepublicvoidreadFields(DataInput in)throwsIOException{
        phone =  in.readUTF();
        upFlow = in.readInt();
        downFlow = in.readInt();}}

三、关于片和块

1、什么是片,什么是块?

块是物理概念,片是逻辑概念。一般片 = 块,但是到最后一次的时候,有可能片> 块,但是绝对不能超过块的1.1倍。

2、mapreduce 启动多少个MapTask任务?

跟片有关系,有多少个片,就启动多少个map任务。跟块儿无关。

四、MapReduce的原理

AppMaster: 整个Job任务的核心协调工具
MapTask: 主要用于Map任务的执行
ReduceTask: 主要用于Reduce任务的执行

一个任务提交 --> AppMaster–> 根据切片的数量统计出需要多少个MapTask任务 --> 向ResourceManager(Yarn平台的老大)索要资源 --> 执行Map任务,先读取一个分片的数据,传递给map方法。–> map 方法不断的溢写 --> reduce 方法 --> 将统计的结果存放在磁盘上。

五、Shuffle 过程

MapReduce的Shuffle过程指的是MapTask的后半程,以及ReduceTask的前半程,共同组成的。 从MapTask中的map方法结束,到ReduceTask中的reduce方法开始,这个中间的部分就是Shuffle。是MapReduce的核心,心脏。

六、环形缓冲区

1、环形缓冲区,其实是一个数组,将数组分为两部分,分割的这个点就称之为轴心。
2、存储KV真实数据,是顺时针存储
3、每一个KV真实数据都有对应的元数据,元数据是逆时针存储。
4、当两者数据占用空间达到80%的时候,需要清理数组,清理完之后,轴心发生了变化

七、Combiner

Combiner其实就是运行在mapTask中的reducer。 Reducer其实就是合并代码的。Combiner是作用在Map端的。
Combiner 只能用于对统计结果没有影响的场景下。

一般只用于统计之和,统计最大值最小值的场景下。统计平均值等情况是不能用的。

本文转载自: https://blog.csdn.net/lzhlizihang/article/details/141967423
版权归原作者 lzhlizihang 所有, 如有侵权,请联系我们删除。

“Hadoop之mapreduce -- WrodCount案例以及各种概念”的评论:

还没有评论