0


Hadoop(九)MapReduce 案例2

MapReduce 处理阶段

MapReduce 框架通常由三个阶段组成:

  • Map:读取文件数据,按照规则对文本进行拆分,生成 KV 形式的数据。
  • Shuffle:工作节点根据输出键(由 map 函数生成)重新分配数据,对数据排序、分组、拷贝,目的是属于一个键的所有数据都位于同一个工作节点上。
  • Reduce:工作节点并行处理每个键的一组数据,对结果进行汇总。

下图把 MapReduce 的过程分为两个部分,而实际上从两边的 Map 和 Reduce 到中间的那一大块都属于 Shuffle 过程,也就是说,Shuffle 过程有一部分是在 Map 端,有一部分是在 Reduce 端。

案例统计单词出现的次数

数据

hello hbase
hello hadoop
hello hive
hello kubernetes
hello java

原理图

wordcount 代码实现

用户编写的 MapReduce 程序分成三个部分:Mapper,Reducer,Driver:

  • 用户自定义 Mapper 类继承 Mapper 类,实现 map() 方法,输出和输出的数据都是 <K,V> 对形式,<K,V> 类型可以根据实际情况自定义。MapTask 进程对每一个 <K,V> 调用一次。
  • 用户自定义 Reduce 类继承 Reduce 类,实现 reduce() 方法,输出和输出的数据都是 <K,V> 对形式,<K,V> 类型可以根据实际情况自定义。Reducetask 进程对每一组相同 K 的 <K,V> 组调用一次 reduce() 方法。
  • 整个 MapReduce 程序需要一个 Drvier 类来进行提交,提交的是一个描述了各种必要信息的 Job 对象。

在 HDFS 中创建目录并上传文件

在 HDFS 中创建一个目录

hadoop fs -mkdir /wcinput

#将本机 words.txt 文件上传到 HDFS 的 /wcinput 目录中

hadoop fs -put /words.txt /wcinput

引入pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>hadoopmapreduce</artifactId>
    <version>1.0-SNAPSHOT</version>
<dependencies>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>2.12.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>3.1.3</version>
    </dependency>

 </dependencies>

</project>

编写 Mapper 类

编写一个 WordMapper 类继承 Mapper 类,并重写 map() 方法。Mapper 类是一个泛型类,4 个泛型类型分别代表(KeyIn,ValueIn,KeyOut,ValueOut )。泛型的类型可以根据自己实际的场景来指定。在 wordcount 这个例子中指定的类型如下:

  • KeyIn(输入的键):LongWritable 类型,表示每行文字的起始位置(偏移量)
  • ValueIn(输入的值):Text 类型,表示每行的文本。
  • KeyOut(输出的键):Text 类型,表示每个单词。
  • ValueOut(输出的值为):LongWritable 类型,表示单词出现的次数(1次)

Mapper 阶段依次读取每一行的数据,每行按照空格拆分出单词,得到 <单词,1> 的键值对,键是单词,值是 1,之后 Reduce 阶段累计单词出现的次数就累加 1 即可。

Mapper 阶段代码如下:

package words;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

/**
 * @description Map 阶段,分别计算每行每个单词出现的次数,key 是单词,value 为 1(表示 1 个单词)。
 */
public class WordMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //1、切分单词
        String[] words = value.toString().split(" ");
        //2、单词转换 单词 -> <单词,1>
        for (String word : words) {
            //3、写入到上下文
            context.write(new Text(word),new LongWritable(1));
        }
    }
}

编写 Reduce 类

编写一个类 WordReducer 继承 Reducer 类,并重写 reduce() 方法。Reducer 类是也是一个泛型类,4 个泛型类型分别代表(KeyIn,ValueIn,KeyOut,ValueOut )泛型的类型可以根据自己实际的场景来指定。在 这个例子中我们指定的类型如下:

  • KeyIn(输入的键):Text 类型,表示每个单词
  • ValueIn(输入的值):LongWritable 类型,表示单词出现的次数(1次)
  • KeyOut(输出的键):Text 类型,表示每个单词
  • ValueOut(输出的值为):LongWritable 类型,表示单词出现的总数

Reduce 阶段接收到数据键是单词,值是一个可迭代的对象,是相同单词对应的次数(每个都是 1),只需要把这些 1 累加起来,就可以得到单词出现的总数了。

执行完后达到的效果

 <hello,[1,1,1,1,1]>

Reduce 阶段代码如下:

package words;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**

 * @description Reduce 阶段,把 key 相同的数据进行累计,得到每个单词出现的次数
 */
public class WordReducer extends Reducer<Text, LongWritable,Text,LongWritable> {
@Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        //1、定义一个变量
        long count = 0;
        //2、迭代
        for (LongWritable value : values) {
            count += value.get();
        }
        //3、写入上下文
        context.write(key,new LongWritable(count));
    }
}

编写 Driver 类

创建提交给 YARN 集群运行的 Job 对象,封装了 MapReduce 程序运行所需要的相关参数,例如输入数据路径,输出数据路径,Mapper 参数,Reduce 参数

package words;

import count.CharCountDriver;
import count.CharCountMapper;
import count.CharCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;

public class JobMain {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        //一、初始化Job
        Configuration configuration = new Configuration();

        //获取运行命令的参数,参数一:输入文件路径,参数二:输出文件路径
        //如果输入路径是一个文件,那么只处理这个文件,如果指定的路径是目录,则处理这个目录下的所有文件
        //输出路径只能是不存在的目录名
        String [] otherArgs = new GenericOptionsParser(configuration,args).getRemainingArgs();
        if(otherArgs.length < 2){
            System.err.println("必须提供输入文件路径和输出文件路径");
            System.exit(2);
        }
        Job job = Job.getInstance(configuration, "mr");
        job.setJarByClass(JobMain.class);

        //二、设置Job的相关信息 8个小步骤
        //1、设置输入路径
        job.setInputFormatClass(TextInputFormat.class);
        //本地运行
        //TextInputFormat.addInputPath(job,new Path("/tmp/input/mr1.txt"));
        TextInputFormat.addInputPath(job,new Path(args[0]));

        //2、设置Mapper类型,并设置输出键和输出值
        job.setMapperClass(WordMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        //shuffle阶段,使用默认的
        //3、设置Reducer类型,并设置输出键和输出值
        job.setReducerClass(WordReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        //4、设置输出路径
        job.setOutputFormatClass(TextOutputFormat.class);
        //本地运行
        //TextOutputFormat.setOutputPath(job,new Path("/tmp/output/mr"));
        TextOutputFormat.setOutputPath(job,new Path(args[1]));
        //三、等待完成
        boolean isfinish = job.waitForCompletion(true);
        System.out.println(isfinish ==true?"MapReduce 任务执行成功!":"MapReduce 任务执行失败!");
        System.exit(isfinish  ? 0 : 1);
    }
}

idea中打包成jar包

上传JAR包到服务器上

注意:如果输入路径是一个文件,那么只处理这个文件,如果指定的路径是目录,则处理这个目录下的所有文件

执行 hadoop jar

hadoop jar /hadoopmapreduce-1.0-SNAPSHOT.jar danci.JobMain /wcinput /my_wcoutput

查看结果


本文转载自: https://blog.csdn.net/sadfasdfsafadsa/article/details/141276718
版权归原作者 薛定谔的猫1981 所有, 如有侵权,请联系我们删除。

“Hadoop(九)MapReduce 案例2”的评论:

还没有评论