需求概述
我们需要从给定的数据中,统计每一年出现的最高温度。该需求可以通过 MapReduce 实现,处理格式化的气象数据,提取年份和温度,计算出每一年的最高温度。
业务分析
我们有大量气象记录数据,数据格式如下:
0151234567890123456789012345678901234567890123456789012345678901234567890123456789012345
YYYYMMdd[TIME] [TEMPERATURE] ...
测试数据:
0029029070999991901010106004+64333+023450FM12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999999
0029029070999991901010106004+64333+023450FM12+000599999V0202701N015919999999N0000001N9+01231+99999102001ADDGF108991999999999999999999
0029029070999991910010106004+64333+023450FM12+000599999V0202701N015919999999N0000001N9+01501+99999102001ADDGF108991999999999999999999
0029029070999991910010106004+64333+023450FM12+000599999V0202701N015919999999N0000001N9-00231+99999102001ADDGF108991999999999999999999
0029029070999991920010106004+64333+023450FM12+000599999V0202701N015919999999N0000001N9+02451+99999102001ADDGF108991999999999999999999
- 年份信息在第 15 到 18 位。[15,18]
- 温度数据在第 88 到 91 位 [88,91],如果温度为9999为无效温度。
- 第87位为温度的符号(正负)
- 第92位用作数据合法性检查,标记为
0 1 4 5 9
时才是有效的温度记录。
通过编写 MapReduce 作业,
Mapper
阶段负责提取年份和温度,
Reducer
阶段对同一年的温度进行汇总,最终输出每一年的最高温度。
代码附注释
1.
TempMap.java
—— Mapper 类
package com.dxd.dxd;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class TempMap extends Mapper<LongWritable, Text, Text, IntWritable>{
// 定义输出的键值对类型
private Text k2;
private IntWritable v2;
// 初始化方法,自动在执行map方法时调用,非必须, 延迟内存分配的时间, 提高程序的启动速度
// 该方法可省略,直接定义+创建对象即可,如下
//private Text k2 = new Text();
//private IntWritable v2 = new IntWritable();
@Override
protected void setup(Context context) throws IOException, InterruptedException{
k2 = new Text();
v2 = new IntWritable();
}
// map 方法:提取数据中的年份和温度信息,并将其写入 context
@Override
protected void map(LongWritable key,Text value, Context context) throws IOException, InterruptedException{
// 为方便按index处理该行数据,转化为String类
String line = value.toString();
// 提取年份,数据从 15 到 19 位,不包含第19位,字符串索引第一位为0
String year = line.substring(15, 19);
//double temperature = Double.parseDouble(line.substring(87,92));
//↑ 若为浮点型
// 提取温度,数据从 87 到 92 位,转化为整数,不包含第92位
int temperature = Integer.parseInt(line.substring(87, 92));
// 提取质量标记,数据从 92 到 93 位,不包含第93位
String check = line.substring(92, 93);
//check.matches("[]")
//[01459] ---- check="0hello87hhh" ---- true
//只要有0 1 4 5 9 即可返回true
//[^01459] ---- ^ == 非 ---- check="0hello87hhh" ---- false
// 过滤无效数据,温度为 9999 或质量标记不符合要求时跳过
if (Math.abs(temperature) == 9999 || check.matches("[^01459]")) {
return;
}
// 设置输出的键和值
k2.set(year); // 键为年份
v2.set(temperature); // 值为温度
// 将年份和温度作为键值对输出
//需处理异常:Unhandled exceptions: java.io.IOException, java.lang.InterruptedException
context.write(k2, v2);
//context..write(new Text(year),new IntWritable(temperature));可替换上面三行
}
}
代码思路:
- 该
Mapper
类接收每一行数据,提取其中的年份和温度,并进行简单的过滤(剔除无效数据)。 - 我们在
setup
中初始化键值对对象,以提高效率(避免每次调用map
都创建新对象)。 - 如果温度无效(如标记为
9999
或质量标记不符合要求),则跳过该数据。否则,提取有效的年份和温度输出给Reducer
。
2.
TempReduce.java
—— Reducer 类
package com.dxd.dxd;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class TempReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
// reduce 方法:接收每个年份对应的多个温度值,求出最高温度
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int max = Integer.MIN_VALUE;
// 遍历该年份的所有温度,找到最大值
for (IntWritable value : values) {
max = Math.max(max, value.get());
}
// 将年份和最高温度输出
//需处理异常:Unhandled exceptions: java.io.IOException, java.lang.InterruptedException
context.write(key, new IntWritable(max));
}
}
代码思路:
Reducer
负责接收每个年份的温度列表,使用Math.max
函数逐一比较,找出该年份的最高温度。- 最终将年份和最高温度写入输出。
3.
TempDriver.java
—— Driver 类
package com.dxd.dxd;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class TempDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
// 设置用户权限
System.setProperty("HADOOP_USER_NAME", "root");
// 配置作业
Configuration configuration = new Configuration();//需处理异常:Unhandled exception:java.io.IOException
Job job = Job.getInstance(configuration);
// 指定包含MapReduce作业代码的JAR文件的主类 即包含main()方法的类
job.setJarByClass(TempDriver.class);
// 设置 Mapper、Reducer 类
job.setMapperClass(TempMap.class);
job.setReducerClass(TempReduce.class);
// 设置 Combiner 类,减少数据传输量(可选)
job.setCombinerClass(TempReduce.class);
// 设置最终输出的键值类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 设置输入输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 提交作业并等待完成,该行有Exception需处理
System.exit(job.waitForCompletion(true) ? 0 : 1);
//Unhandled exceptions: java.lang.InterruptedException, java.lang.ClassNotFoundException
}
}
代码思路:
Driver
类负责配置和运行整个 MapReduce 作业。- 通过
job.setMapperClass()
和job.setReducerClass()
设置Mapper
和Reducer
。 job.setCombinerClass()
选项用于本地聚合,减少数据传输量,提高效率。- 最后,通过
FileInputFormat
和FileOutputFormat
指定输入和输出路径。
补充:
System.exit(job.waitForCompletion(true) ? 0 : 1); 是什么意思?
System.exit(job.waitForCompletion(true) ? 0 : 1);
是在 Hadoop MapReduce 程序的
Driver
类中用于提交作业并控制作业执行结果的代码。让我们逐步分析这行代码的含义。
1. **
job.waitForCompletion(true)
**
- 该方法会启动 MapReduce 作业并等待其完成。它返回一个布尔值,表示作业是否成功执行。
- 参数
true
表示在作业运行期间会在控制台输出详细的进度信息,也就是作业的状态跟踪。
具体过程:
- MapReduce 作业开始执行,
waitForCompletion(true)
会阻塞程序,直到作业完成。 - 如果作业执行成功,
waitForCompletion(true)
返回true
。 - 如果作业执行失败,则返回
false
。
2. **
? 0 : 1
(三元运算符)**
- 这是 Java 中的三元条件运算符。格式如下:
condition ? valueIfTrue : valueIfFalse;
-condition
:布尔表达式,判断结果为true
或false
。-valueIfTrue
:如果condition
为true
,则返回该值。-valueIfFalse
:如果condition
为false
,则返回该值。
在这里:
- 如果
job.waitForCompletion(true)
返回true
(即作业执行成功),则返回0
。 - 如果
job.waitForCompletion(true)
返回false
(即作业执行失败),则返回1
。
3. **
System.exit(status)
**
System.exit(int status)
是一个用于终止 Java 应用程序的命令。它会关闭当前 Java 虚拟机 (JVM)。- 参数
status
是一个整数,表示程序的退出状态码: -0
:通常表示程序正常退出(无错误)。- 非0
值:表示程序异常退出或有错误。例如,1
通常表示作业失败。
这行代码的完整含义是:
- 程序调用
job.waitForCompletion(true)
提交并等待 MapReduce 作业的执行结果。 - 如果作业成功,
job.waitForCompletion(true)
返回true
,然后? 0 : 1
选择返回0
,表示程序成功退出。 - 如果作业失败,
job.waitForCompletion(true)
返回false
,? 0 : 1
返回1
,表示程序异常退出。 - 最后通过
System.exit(0)
或System.exit(1)
来终止整个 Java 程序。
为什么使用
System.exit()
?
在 Hadoop 作业中,通常使用
System.exit()
来确保整个应用程序在作业完成后终止,返回的状态码可以作为操作系统判断程序是否成功执行的依据:
- 0 表示成功,可以触发后续的处理任务。
- 1 表示失败,可能会触发错误处理机制或发出警报。
总结:
System.exit(job.waitForCompletion(true) ? 0 : 1);
- 作用:等待 MapReduce 作业完成,并根据作业的执行结果退出 Java 程序。
- 含义: -
job.waitForCompletion(true)
提交作业并等待作业完成。- 根据作业是否成功,返回0
或1
。-System.exit()
用于终止程序,并传递作业的成功或失败状态。
最终结果
版权归原作者 辛小贝达尔比 所有, 如有侵权,请联系我们删除。