一、WordCount代码
(一)WordCount简介
WordCount是大数据经典案例,其逻辑就是有一个文本文件,通过编写java代码与Hadoop核心组件的操作,查询每个单词出现的频率。
1.wordcount.txt
hello java hello hadoop hello java hadoop java hadoop java hadoop hadoop java hello java
(二)WordCount的java代码
1.WordCountMapper
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
// Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
// <0,"hello world","hello",1>
public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
Text text = new Text();
IntWritable intWritable = new IntWritable();
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
System.out.println("WordCount stage Key:"+key+" Value:"+value);
String[] words = value.toString().split(" ");// "hello world" -->[hello,world]
for (String word :
words) {
text.set(word);
intWritable.set(1);
context.write(text,intWritable);// 输出键值对 <hello,1><world,1>
}
}
}
2.WordCountReduce
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
// <KEYIN, VALUEIN, KEYOUT, VALUEOUT>
public class WordCountReduce extends Reducer<Text, IntWritable,Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
System.out.println("Reduce stage Key:"+key+" Values:"+values.toString());
int count = 0;
for (IntWritable intWritable :
values) {
count += intWritable.get();
}
// LongWritable longWritable = new LongWritable();
// longWritable.set(count);
LongWritable longWritable = new LongWritable(count);
System.out.println("Key:"+key+" ResultValue:"+longWritable.get());
context.write(key,longWritable);
}
}
3.WordCountDriver
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class WordCountDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(WordCountDriver.class);
// 设置mapper类
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 设置reduce类
job.setReducerClass(WordCountReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
// 指定map输入的文件路径
FileInputFormat
.setInputPaths(job,new Path("D:\\javaseprojects\\hadoopstu\\input\\demo1\\wordcount.txt"));
// 指定reduce结果输出的文件路径
Path path = new Path("D:\\javaseprojects\\hadoopstu\\output");
FileSystem fileSystem = FileSystem.get(path.toUri(),configuration);
if(fileSystem.exists(path)){
fileSystem.delete(path,true);
}
FileOutputFormat.setOutputPath(job,path);
job.waitForCompletion(true);
// job.setJobName("");
}
}
(三)IDEA运行结果
(四)Hadoop运行wordcount
1.在HDFS上新建一个文件目录
[root@lxm147 ~]# hdfs dfs -mkdir /inputpath
2023-02-10 23:05:40,098 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[root@lxm147 ~]# hdfs dfs -ls /
2023-02-10 23:05:52,217 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 3 items
drwxr-xr-x - root supergroup 0 2023-02-08 08:06 /aa
drwxr-xr-x - root supergroup 0 2023-02-10 10:52 /bigdata
drwxr-xr-x - root supergroup 0 2023-02-10 23:05 /inputpath
2.新建一个文件,并上传至该目录下
[root@lxm147 mapreduce]# vim ./test.csv
[root@lxm147 mapreduce]# hdfs dfs -put ./test.csv /inputpath
3.执行wordcount命令
[root@lxm147 mapreduce]# hadoop jar ./hadoop-mapreduce-examples-3.1.3.jar wordcount /inputpath /outputpath
4.查看运行结果
(1)web端
(2)命令行
[root@lxm147 mapreduce]# hdfs dfs -cat /outputpath/part-r-00000
2023-02-10 23:26:06,276 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2023-02-10 23:26:07,793 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
hadoop 1
hello 2
java 2
javaweb 1
mybatis 2
spring 1
5.第二次提交报错原因
执行wordcount命令前删除/outpath目录下的文件再执行即可
6.进入NodeManager查看
7.启动历史服务器(如果已经启动可以忽略此步骤)
[root@lxm148 ~]# mr-jobhistory-daemon.sh start historyserver
WARNING: Use of this script to start the MR JobHistory daemon is deprecated.
WARNING: Attempting to execute replacement "mapred --daemon start" instead.
[root@lxm148 ~]# jps
4546 SecondaryNameNode
6370 JobHistoryServer
4164 NameNode
4804 ResourceManager
4937 NodeManager
6393 Jps
4302 DataNode
8.查看历史服务信息
三、执行本地代码
(一)项目代码
1.stuscore.csv
1,zs,10,语文 2,ls,98,语文 3,ww,80,语文 1,zs,20,数学 2,ls,87,数学 3,ww,58,数学 1,zs,44,英语 2,ls,66,英语 3,ww,40,英语 1,zs,55,政治 2,ls,60,政治 3,ww,80,政治 1,zs,10,化学 2,ls,28,化学 3,ww,78,化学 1,zs,87,生物 2,ls,9,生物 3,ww,10,生物
2.Student类
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class Student implements WritableComparable<Student> {
private long stuid;
private String stuname;
private int score;
private String lession;
@Override
public int compareTo(Student o) {
return this.score > o.score ? 1 : 0;
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong(stuid);
dataOutput.writeUTF(stuname);
dataOutput.writeUTF(lession);
dataOutput.writeInt(score);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.stuid = dataInput.readLong();
this.stuname = dataInput.readUTF();
this.lession = dataInput.readUTF();
this.score = dataInput.readInt();
}
@Override
public String toString() {
return "Student{" +
"stuid=" + stuid +
", stuname='" + stuname + '\'' +
", score=" + score +
", lession='" + lession + '\'' +
'}';
}
public long getStuid() {
return stuid;
}
public void setStuid(long stuid) {
this.stuid = stuid;
}
public String getStuname() {
return stuname;
}
public void setStuname(String stuname) {
this.stuname = stuname;
}
public int getScore() {
return score;
}
public void setScore(int score) {
this.score = score;
}
public String getLession() {
return lession;
}
public void setLession(String lession) {
this.lession = lession;
}
public Student(long stuid, String stuname, int score, String lession) {
this.stuid = stuid;
this.stuname = stuname;
this.score = score;
this.lession = lession;
}
public Student() {
}
}
2.StudentMapper类
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
// K=id,V=student
// Mapper<进来的K,进来的V,出去的K,出去的V>
public class StudentMapper extends Mapper<LongWritable, Text, LongWritable, Student> {
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, LongWritable, Student>.Context context) throws IOException, InterruptedException {
System.out.println(key+" "+value.toString());
String[] split = value.toString().split(",");
LongWritable stuidKey = new LongWritable(Long.parseLong(split[2]));
Student studentValue = new Student(Long.parseLong(split[0]), split[1], Integer.parseInt(split[2]),split[3]);
context.write(stuidKey,studentValue);
}
}
4.StudentReduce类
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class StudentReduce extends Reducer<LongWritable, Student, Student, NullWritable> {
@Override
protected void reduce(LongWritable key, Iterable<Student> values, Reducer<LongWritable, Student, Student, NullWritable>.Context context) throws IOException,
InterruptedException {
Student stu = new Student();
// 相同key相加
// int sum = 0;
int max = 0;
String name ="";
String lession = "";
// for (Student student:
// values) {
// sum += student.getScore();
// name = student.getStuname();
// }
// 求每门科目的最高分
for (Student student :
values) {
if(max<=student.getScore()){
max = student.getScore();
name = student.getStuname();
lession = student.getLession();
}
}
stu.setStuid(key.get());
stu.setScore(max);
stu.setStuname(name);
stu.setLession(lession);
System.out.println(stu.toString());
context.write(stu,NullWritable.get());
}
}
5.StudentDriver类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class StudentDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(StudentDriver.class);
job.setMapperClass(StudentMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Student.class);
job.setReducerClass(StudentReduce.class);
job.setOutputKeyClass(Student.class);
job.setOutputValueClass(NullWritable.class);
// 指定路径
FileInputFormat.setInputPaths(job,
new Path("hdfs://lxm147:9000/bigdata/in/demo2/stuscore.csv"));
Path path = new Path("hdfs://lxm147:9000/bigdata/out2");
// 不指定路径
/* Path inpath = new Path(args[0]);
FileInputFormat.setInputPaths(job, inpath);
Path path = new Path(args[1]);*/
FileSystem fs = FileSystem.get(path.toUri(), configuration);
if (fs.exists(path)) {
fs.delete(path, true);
}
FileOutputFormat.setOutputPath(job, path);
job.waitForCompletion(true);
}
}
(二)java代码中指定路径
1.maven项目编译并打包
分别双击compile和package
2.上传stuscore.csv到hdfs指定目录下
hdfs dfs -put /opt/stuscore.csv /bigdata/in/demo2
3.xftp上传target目录下的打包好的jar包上传到虚拟机
4.Hadoop运行hadoopstu-1.0-SNAPSHOT.jar
[root@lxm147 opt]# hadoop jar ./hadoopstu-1.1.0-SNAPSHOT.jar nj.zb.kb21.demo2.StudentDriver /bigdata/in/demo2/stuscore.csv /bigdata/out2
5.Hadoop运行结果
(三)java代码中不指定路径
1.StuudentDriver类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class StudentDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(StudentDriver.class);
job.setMapperClass(StudentMapper.class);
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(Student.class);
job.setReducerClass(StudentReduce.class);
job.setOutputKeyClass(Student.class);
job.setOutputValueClass(NullWritable.class);
// 指定路径
/*FileInputFormat.setInputPaths(job,
new Path("hdfs://lxm147:9000/bigdata/in/demo2/stuscore.csv"));
Path path = new Path("hdfs://lxm147:9000/bigdata/out2");*/
// 不指定路径
Path inpath = new Path(args[0]);
FileInputFormat.setInputPaths(job, inpath);
Path path = new Path(args[1]);
FileSystem fs = FileSystem.get(path.toUri(), configuration);
if (fs.exists(path)) {
fs.delete(path, true);
}
FileOutputFormat.setOutputPath(job, path);
job.waitForCompletion(true);
}
}
2.重新编译打包上传
为了方便区分,这里修改版本号再重新编译打包
3.HDFS命令执行该jar包
[root@lxm147 opt]# hadoop jar ./hadoopstu-1.1.0-SNAPSHOT.jar nj.zb.kb21.demo2.StudentDriver /bigdata/in/demo2/stuscore.csv /bigdata/out
4.查看运行结果
[root@lxm147 opt]# hdfs dfs -cat /bigdata/out/part-r-00000
Student{stuid=1, stuname='zs', score=226}
Student{stuid=2, stuname='ls', score=348}
Student{stuid=3, stuname='ww', score=346}
版权归原作者 雷神乐乐 所有, 如有侵权,请联系我们删除。