0


Hadoop之实战WordCount

大致流程如下:
第一步:开发Map阶段代码
第二步:开发Reduce阶段代码
第三步:组装Job

在idea中创建WordCountJob类
添加注释,梳理一下需求:

需求:读取hdfs上的hello.txt文件,计算文件中每个单词出现的总次数
hello.txt文件内容如下:
hello you
hello me
最终需要的结果形式如下:
hello 2
me 1
you 1

先创建map阶段的代码,在这里需要自定义一个mapper类,继承框架中的Mapper类

public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
        Logger logger = LoggerFactory.getLogger(MyMapper.class);
        /**
         * 需要实现map函数
         * 这个map函数就是可以接<k1,v1> 产生k2 v2
         * @param k1
         * @param v1
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
            //输出k1,v1的值
            //System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString());
            //logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
            //k1 代表的是每一行数据的行首偏移量  v1代表的是每一行内容
            //对获取到的每一行数据进行切割,把单词切割出来
            String[] words = v1.toString().split(" ");
            //迭代切割出来的单词数据
            for (String word : words){
                //把迭代出来的单词封装成<k2,v2>的形式
                Text k2 = new Text(word);
                LongWritable v2 = new LongWritable(1L);
                //把<k2,v2>写出去
                context.write(k2,v2);
            }   
     }

然后是reduce阶段的代码,需要自定义一个reducer类,继承框架中的reducer。

public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
        Logger logger = LoggerFactory.getLogger(MyReducer.class);
        /**
         * 针对<>k2,{v2...}> 的数据进行累加求和,并且最终把数据转化为k3,v3写出去
         * @param k2
         * @param v2s
         * @param context
         * @throws IOException
         * @throws InterruptedException
         */
        @Override
        protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
            //创建一个sum变量, 保存v2s的值
            long sum = 0L;
            //对v2s中的数据累加求和
            for(LongWritable v2:v2s){
                //输出k2,v2的值
                //System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+"");
                logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
                sum += v2.get();
            }
            //组装k3, v3
            Text k3 = k2;
            LongWritable v3 = new LongWritable(sum);
            //输出k3,v3的值
            //System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+"");
            logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
            //把结果写出去
            context.write(k3,v3);
        }
    }

最终组装job,job等于map+reduce

public static void main(String[] args){
        try{
            if(args.length!=2){
                System.exit(100);
            }
            //指定Job需要的配置参数
            Configuration conf = new Configuration();
            //创建一个Job
            Job job = Job.getInstance(conf);
            //注意了:这一行必须设置,否则在集群中执行的时候是找不到WordConutJob这个类的
            job.setJarByClass(WordCountJob.class);
            //指定输入路径(可以是文件,也可以是目录)
            FileInputFormat.setInputPaths(job,new Path(args[0]));
            //指定输出路径(只能指定一个不存在的目录)
            FileOutputFormat.setOutputPath(job,new Path(args[1]));
            //指定map相关的代码
            job.setMapperClass(MyMapper.class);
            //指定k2的类型
            job.setMapOutputKeyClass(Text.class);
            //指定v2的类型
            job.setMapOutputValueClass(LongWritable.class);
            //指定reduce相关的代码
            job.setReducerClass(MyReducer.class);
            //指定k3类型
            job.setOutputKeyClass(Text.class);
            //指定v3类型
            job.setOutputValueClass(LongWritable.class);
            //提交job
            job.waitForCompletion(true);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

现在代码开发完毕了,现在我们是把自定义的mapper类和reducer类都放到了这个WordCountJob类中,主要是为了在学习阶段看起来清晰一些,所有的代码都在一个类中,好找,其实我们完全可以把自定义的mapper类和reducer类单独提出去,定义为单独的类,是没有什么区别的。

ok,那代码开发好了以后想要执行,我们需要打jar包上传到集群上去执行,这个时候需要在pom文件中添加maven的编译打包插件。

<build>
        <plugins>
            <!-- compiler插件, 设定JDK版本 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <encoding>UTF-8</encoding>
                    <source>1.8</source>
                    <target>1.8</target>
                    <showWarnings>true</showWarnings>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

注意了,这些添加完以后还有一个地方需要修改,需要在pom中的hadoop-client和log4j依赖中增加scope属性,值为provided,表示只在编译的时候使用这个依赖,在执行以及打包的时候都不使用,因为hadoop-client和log4j依赖在集群中都是有的,所以在打jar包的时候就不需要打进去了,如果我们使用到了集群中没有的第三方依赖包就不需要增加这个provided属性了,不增加provided就可以把对应的第三方依赖打进jar包里面了。

<!-- hadoop 的依赖-->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.3.2</version>
            <!-- provided 表示这个依赖只在编译的时候,执行或者打jar包的时候都不使用-->
            <scope>provided</scope>
        </dependency>
        <!-- log4j 的依赖-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.10</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.10</version>
            <scope>provided</scope>
        </dependency>

添加好了以后就可以打包了,建议在windows的cmd命令行下cd到项目根目录,然后执行mvn编译打包命令,看到最后输出的BUILD SUCCESS就说明执行成功了
命令执行成功之后,就可以到target目录下获取对应的jar包了,需要使用jar-with-dependencies结尾的那个jar包。

D:\IdeaProjects\db_hadoop\target\db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar

把这个jar包上传到集群的任意一台机器上面或者是hadoop客户端机器上都可以,只要这台机器可以和集群进行交互即可。

注意,这个jar包不能使用java -jar的方式执行,需要使用集群特有的执行方式

我把这个jar包上传到了bigdata01机器的/data/soft/hadoop-3.2.0目录下了,
确认一下
在向集群中正式提交任务jar包之前需要先把测试数据准备好
在本地创建一个hello.txt文件,内容是

[root@bigdata01 hadoop-3.3.2]# vi hello.txt
hello you
hello me

单词中间用空格隔开,因为我们在MapReduce代码中是使用空格进行切割单词的。
然后把hello.txt上传到HDFS的test目录下

[root@bigdata01 hadoop-3.3.2]# hdfs dfs -mkdir /test
[root@bigdata01 hadoop-3.3.2]# hdfs dfs -put hello.txt /test
[root@bigdata01 hadoop-3.3.2]# hdfs dfs -ls /test
Found 1 items
--re-r--r--        2 root supergroup    19 2020-04-22 11:16 /test/hello.txt

接下来就可以向集群提交MapReduce任务了
具体的命令是这样的

hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJob /test /out

hadoop:表示使用hadoop脚本提交任务,其实在这里使用yarn脚本也是可以的,从hadoop2开始支持使用yarn,不过也兼容hadoop1,也继续支持使用hadoop脚本,所以在这里使用哪个都可以,具体就看你个人的喜好了,我是习惯于使用hadoop脚本

jar:表示执行jar包

db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar:指定具体的jar包路径信息

com.imooc.mr.WordCountJob:指定要执行的mapreduce代码的全路径

/test/hello.txt:指定mapreduce接收到的第一个参数,代表的是输入路径,这里的输入路径可以直接指定hello.txt的路径,也可以直接指定它的父目录,因为它的父目录里面也没有其它无关的文件,如果指定目录的话就意味着hdfs会读取这个目录下所有的文件,所以后期如果我们需要处理一批文件,那就可以把他们放到同一个目录里面,直接指定目录即可。

/out:指定mapreduce接收到的第二个参数,代表的是输出目录,这里的输出目录必须是不存在的,MapReduce程序在执行之前会检测这个输出目录,如果存在会报错,因为它每次执行任务都需要一个新的输出目录来存储结果数据


本文转载自: https://blog.csdn.net/qq_52150032/article/details/124869485
版权归原作者 hhhecker 所有, 如有侵权,请联系我们删除。

“Hadoop之实战WordCount”的评论:

还没有评论