大致流程如下:
第一步:开发Map阶段代码
第二步:开发Reduce阶段代码
第三步:组装Job
在idea中创建WordCountJob类
添加注释,梳理一下需求:
需求:读取hdfs上的hello.txt文件,计算文件中每个单词出现的总次数
hello.txt文件内容如下:
hello you
hello me
最终需要的结果形式如下:
hello 2
me 1
you 1
先创建map阶段的代码,在这里需要自定义一个mapper类,继承框架中的Mapper类
public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
Logger logger = LoggerFactory.getLogger(MyMapper.class);
/**
* 需要实现map函数
* 这个map函数就是可以接<k1,v1> 产生k2 v2
* @param k1
* @param v1
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException {
//输出k1,v1的值
//System.out.println("<k1,v1>=<"+k1.get()+","+v1.toString());
//logger.info("<k1,v1>=<"+k1.get()+","+v1.toString()+">");
//k1 代表的是每一行数据的行首偏移量 v1代表的是每一行内容
//对获取到的每一行数据进行切割,把单词切割出来
String[] words = v1.toString().split(" ");
//迭代切割出来的单词数据
for (String word : words){
//把迭代出来的单词封装成<k2,v2>的形式
Text k2 = new Text(word);
LongWritable v2 = new LongWritable(1L);
//把<k2,v2>写出去
context.write(k2,v2);
}
}
然后是reduce阶段的代码,需要自定义一个reducer类,继承框架中的reducer。
public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
Logger logger = LoggerFactory.getLogger(MyReducer.class);
/**
* 针对<>k2,{v2...}> 的数据进行累加求和,并且最终把数据转化为k3,v3写出去
* @param k2
* @param v2s
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context) throws IOException, InterruptedException {
//创建一个sum变量, 保存v2s的值
long sum = 0L;
//对v2s中的数据累加求和
for(LongWritable v2:v2s){
//输出k2,v2的值
//System.out.println("<k2,v2>=<"+k2.toString()+","+v2.get()+"");
logger.info("<k2,v2>=<"+k2.toString()+","+v2.get()+">");
sum += v2.get();
}
//组装k3, v3
Text k3 = k2;
LongWritable v3 = new LongWritable(sum);
//输出k3,v3的值
//System.out.println("<k3,v3>=<"+k3.toString()+","+v3.get()+"");
logger.info("<k3,v3>=<"+k3.toString()+","+v3.get()+">");
//把结果写出去
context.write(k3,v3);
}
}
最终组装job,job等于map+reduce
public static void main(String[] args){
try{
if(args.length!=2){
System.exit(100);
}
//指定Job需要的配置参数
Configuration conf = new Configuration();
//创建一个Job
Job job = Job.getInstance(conf);
//注意了:这一行必须设置,否则在集群中执行的时候是找不到WordConutJob这个类的
job.setJarByClass(WordCountJob.class);
//指定输入路径(可以是文件,也可以是目录)
FileInputFormat.setInputPaths(job,new Path(args[0]));
//指定输出路径(只能指定一个不存在的目录)
FileOutputFormat.setOutputPath(job,new Path(args[1]));
//指定map相关的代码
job.setMapperClass(MyMapper.class);
//指定k2的类型
job.setMapOutputKeyClass(Text.class);
//指定v2的类型
job.setMapOutputValueClass(LongWritable.class);
//指定reduce相关的代码
job.setReducerClass(MyReducer.class);
//指定k3类型
job.setOutputKeyClass(Text.class);
//指定v3类型
job.setOutputValueClass(LongWritable.class);
//提交job
job.waitForCompletion(true);
}catch (Exception e){
e.printStackTrace();
}
}
现在代码开发完毕了,现在我们是把自定义的mapper类和reducer类都放到了这个WordCountJob类中,主要是为了在学习阶段看起来清晰一些,所有的代码都在一个类中,好找,其实我们完全可以把自定义的mapper类和reducer类单独提出去,定义为单独的类,是没有什么区别的。
ok,那代码开发好了以后想要执行,我们需要打jar包上传到集群上去执行,这个时候需要在pom文件中添加maven的编译打包插件。
<build>
<plugins>
<!-- compiler插件, 设定JDK版本 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<encoding>UTF-8</encoding>
<source>1.8</source>
<target>1.8</target>
<showWarnings>true</showWarnings>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
注意了,这些添加完以后还有一个地方需要修改,需要在pom中的hadoop-client和log4j依赖中增加scope属性,值为provided,表示只在编译的时候使用这个依赖,在执行以及打包的时候都不使用,因为hadoop-client和log4j依赖在集群中都是有的,所以在打jar包的时候就不需要打进去了,如果我们使用到了集群中没有的第三方依赖包就不需要增加这个provided属性了,不增加provided就可以把对应的第三方依赖打进jar包里面了。
<!-- hadoop 的依赖-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.3.2</version>
<!-- provided 表示这个依赖只在编译的时候,执行或者打jar包的时候都不使用-->
<scope>provided</scope>
</dependency>
<!-- log4j 的依赖-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.10</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.10</version>
<scope>provided</scope>
</dependency>
添加好了以后就可以打包了,建议在windows的cmd命令行下cd到项目根目录,然后执行mvn编译打包命令,看到最后输出的BUILD SUCCESS就说明执行成功了
命令执行成功之后,就可以到target目录下获取对应的jar包了,需要使用jar-with-dependencies结尾的那个jar包。
D:\IdeaProjects\db_hadoop\target\db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar
把这个jar包上传到集群的任意一台机器上面或者是hadoop客户端机器上都可以,只要这台机器可以和集群进行交互即可。
注意,这个jar包不能使用java -jar的方式执行,需要使用集群特有的执行方式
我把这个jar包上传到了bigdata01机器的/data/soft/hadoop-3.2.0目录下了,
确认一下
在向集群中正式提交任务jar包之前需要先把测试数据准备好
在本地创建一个hello.txt文件,内容是
[root@bigdata01 hadoop-3.3.2]# vi hello.txt
hello you
hello me
单词中间用空格隔开,因为我们在MapReduce代码中是使用空格进行切割单词的。
然后把hello.txt上传到HDFS的test目录下
[root@bigdata01 hadoop-3.3.2]# hdfs dfs -mkdir /test
[root@bigdata01 hadoop-3.3.2]# hdfs dfs -put hello.txt /test
[root@bigdata01 hadoop-3.3.2]# hdfs dfs -ls /test
Found 1 items
--re-r--r-- 2 root supergroup 19 2020-04-22 11:16 /test/hello.txt
接下来就可以向集群提交MapReduce任务了
具体的命令是这样的
hadoop jar db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar com.imooc.mr.WordCountJob /test /out
hadoop:表示使用hadoop脚本提交任务,其实在这里使用yarn脚本也是可以的,从hadoop2开始支持使用yarn,不过也兼容hadoop1,也继续支持使用hadoop脚本,所以在这里使用哪个都可以,具体就看你个人的喜好了,我是习惯于使用hadoop脚本
jar:表示执行jar包
db_hadoop-1.0-SNAPSHOT-jar-with-dependencies.jar:指定具体的jar包路径信息
com.imooc.mr.WordCountJob:指定要执行的mapreduce代码的全路径
/test/hello.txt:指定mapreduce接收到的第一个参数,代表的是输入路径,这里的输入路径可以直接指定hello.txt的路径,也可以直接指定它的父目录,因为它的父目录里面也没有其它无关的文件,如果指定目录的话就意味着hdfs会读取这个目录下所有的文件,所以后期如果我们需要处理一批文件,那就可以把他们放到同一个目录里面,直接指定目录即可。
/out:指定mapreduce接收到的第二个参数,代表的是输出目录,这里的输出目录必须是不存在的,MapReduce程序在执行之前会检测这个输出目录,如果存在会报错,因为它每次执行任务都需要一个新的输出目录来存储结果数据
版权归原作者 hhhecker 所有, 如有侵权,请联系我们删除。