0


使用 Hadoop MapReduce 实现历年最高温度统计

需求概述

我们需要从给定的数据中,统计每一年出现的最高温度。该需求可以通过 MapReduce 实现,处理格式化的气象数据,提取年份和温度,计算出每一年的最高温度。

业务分析

我们有大量气象记录数据,数据格式如下:

0151234567890123456789012345678901234567890123456789012345678901234567890123456789012345
YYYYMMdd[TIME] [TEMPERATURE] ...

测试数据:

0029029070999991901010106004+64333+023450FM12+000599999V0202701N015919999999N0000001N9-00781+99999102001ADDGF108991999999999999999999999
0029029070999991901010106004+64333+023450FM12+000599999V0202701N015919999999N0000001N9+01231+99999102001ADDGF108991999999999999999999
0029029070999991910010106004+64333+023450FM12+000599999V0202701N015919999999N0000001N9+01501+99999102001ADDGF108991999999999999999999
0029029070999991910010106004+64333+023450FM12+000599999V0202701N015919999999N0000001N9-00231+99999102001ADDGF108991999999999999999999
0029029070999991920010106004+64333+023450FM12+000599999V0202701N015919999999N0000001N9+02451+99999102001ADDGF108991999999999999999999
  • 年份信息在第 15 到 18 位。[15,18]
  • 温度数据在第 88 到 91 位 [88,91],如果温度为9999为无效温度。
  • 第87位为温度的符号(正负)
  • 第92位用作数据合法性检查,标记为 0 1 4 5 9 时才是有效的温度记录。

通过编写 MapReduce 作业,

Mapper

阶段负责提取年份和温度,

Reducer

阶段对同一年的温度进行汇总,最终输出每一年的最高温度。

代码附注释

1.
TempMap.java

—— Mapper 类

package com.dxd.dxd;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class TempMap extends Mapper<LongWritable, Text, Text, IntWritable>{

    // 定义输出的键值对类型
    private Text k2;
    private IntWritable v2;

    // 初始化方法,自动在执行map方法时调用,非必须, 延迟内存分配的时间, 提高程序的启动速度
    // 该方法可省略,直接定义+创建对象即可,如下

    //private Text k2 = new Text();
    //private IntWritable v2 = new IntWritable();
    
    @Override
    protected void setup(Context context) throws IOException, InterruptedException{
        k2 = new Text();
        v2 = new IntWritable();
    }

    // map 方法:提取数据中的年份和温度信息,并将其写入 context
    @Override
    protected void map(LongWritable key,Text value, Context context) throws IOException, InterruptedException{
        // 为方便按index处理该行数据,转化为String类
        String line = value.toString();
        
        // 提取年份,数据从 15 到 19 位,不包含第19位,字符串索引第一位为0
        String year = line.substring(15, 19);
        //double temperature = Double.parseDouble(line.substring(87,92));
        //↑ 若为浮点型
        
        // 提取温度,数据从 87 到 92 位,转化为整数,不包含第92位
        int temperature = Integer.parseInt(line.substring(87, 92));

        // 提取质量标记,数据从 92 到 93 位,不包含第93位
        String check = line.substring(92, 93);

        //check.matches("[]")
        //[01459] ---- check="0hello87hhh" ---- true
        //只要有0 1 4 5 9 即可返回true
        //[^01459] ---- ^ == 非 ---- check="0hello87hhh" ---- false

        // 过滤无效数据,温度为 9999 或质量标记不符合要求时跳过
        if (Math.abs(temperature) == 9999 || check.matches("[^01459]")) {
            return;
        }

        // 设置输出的键和值
        k2.set(year);              // 键为年份
        v2.set(temperature);       // 值为温度

        // 将年份和温度作为键值对输出
        //需处理异常:Unhandled exceptions: java.io.IOException, java.lang.InterruptedException
        context.write(k2, v2);

        //context..write(new Text(year),new IntWritable(temperature));可替换上面三行
    }
}
代码思路
  • Mapper 类接收每一行数据,提取其中的年份和温度,并进行简单的过滤(剔除无效数据)。
  • 我们在 setup 中初始化键值对对象,以提高效率(避免每次调用 map 都创建新对象)。
  • 如果温度无效(如标记为 9999 或质量标记不符合要求),则跳过该数据。否则,提取有效的年份和温度输出给 Reducer
2.
TempReduce.java

—— Reducer 类

package com.dxd.dxd;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class TempReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    // reduce 方法:接收每个年份对应的多个温度值,求出最高温度
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int max = Integer.MIN_VALUE;

        // 遍历该年份的所有温度,找到最大值
        for (IntWritable value : values) {
            max = Math.max(max, value.get());
        }

        // 将年份和最高温度输出
        //需处理异常:Unhandled exceptions: java.io.IOException, java.lang.InterruptedException
        context.write(key, new IntWritable(max));
    }
}
代码思路
  • Reducer 负责接收每个年份的温度列表,使用 Math.max 函数逐一比较,找出该年份的最高温度。
  • 最终将年份和最高温度写入输出。
3.
TempDriver.java

—— Driver 类

package com.dxd.dxd;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TempDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // 设置用户权限
        System.setProperty("HADOOP_USER_NAME", "root");

        // 配置作业
        Configuration configuration = new Configuration();//需处理异常:Unhandled exception:java.io.IOException
        Job job = Job.getInstance(configuration);
        
        // 指定包含MapReduce作业代码的JAR文件的主类 即包含main()方法的类
        job.setJarByClass(TempDriver.class);

        // 设置 Mapper、Reducer 类
        job.setMapperClass(TempMap.class);
        job.setReducerClass(TempReduce.class);

        // 设置 Combiner 类,减少数据传输量(可选)
        job.setCombinerClass(TempReduce.class);

        // 设置最终输出的键值类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 设置输入输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 提交作业并等待完成,该行有Exception需处理
        System.exit(job.waitForCompletion(true) ? 0 : 1);
        //Unhandled exceptions: java.lang.InterruptedException, java.lang.ClassNotFoundException
        
    }
}
代码思路
  • Driver 类负责配置和运行整个 MapReduce 作业。
  • 通过 job.setMapperClass()job.setReducerClass() 设置 MapperReducer
  • job.setCombinerClass() 选项用于本地聚合,减少数据传输量,提高效率。
  • 最后,通过 FileInputFormatFileOutputFormat 指定输入和输出路径。

补充:

System.exit(job.waitForCompletion(true) ? 0 : 1); 是什么意思?

System.exit(job.waitForCompletion(true) ? 0 : 1); 

是在 Hadoop MapReduce 程序的

Driver

类中用于提交作业并控制作业执行结果的代码。让我们逐步分析这行代码的含义。

1. **

job.waitForCompletion(true)

**

  • 该方法会启动 MapReduce 作业并等待其完成。它返回一个布尔值,表示作业是否成功执行。
  • 参数 true 表示在作业运行期间会在控制台输出详细的进度信息,也就是作业的状态跟踪

具体过程

  • MapReduce 作业开始执行,waitForCompletion(true) 会阻塞程序,直到作业完成。
  • 如果作业执行成功,waitForCompletion(true) 返回 true
  • 如果作业执行失败,则返回 false

2. **

? 0 : 1

(三元运算符)**

  • 这是 Java 中的三元条件运算符。格式如下: condition ? valueIfTrue : valueIfFalse;- condition:布尔表达式,判断结果为 truefalse。- valueIfTrue:如果 conditiontrue,则返回该值。- valueIfFalse:如果 conditionfalse,则返回该值。

在这里:

  • 如果 job.waitForCompletion(true) 返回 true(即作业执行成功),则返回 0
  • 如果 job.waitForCompletion(true) 返回 false(即作业执行失败),则返回 1

3. **

System.exit(status)

**

  • System.exit(int status) 是一个用于终止 Java 应用程序的命令。它会关闭当前 Java 虚拟机 (JVM)。
  • 参数 status 是一个整数,表示程序的退出状态码: - 0:通常表示程序正常退出(无错误)。- 0:表示程序异常退出或有错误。例如,1 通常表示作业失败。

这行代码的完整含义是:

  • 程序调用 job.waitForCompletion(true) 提交并等待 MapReduce 作业的执行结果。
  • 如果作业成功,job.waitForCompletion(true) 返回 true,然后 ? 0 : 1 选择返回 0,表示程序成功退出。
  • 如果作业失败,job.waitForCompletion(true) 返回 false? 0 : 1 返回 1,表示程序异常退出。
  • 最后通过 System.exit(0)System.exit(1) 来终止整个 Java 程序。

为什么使用

System.exit()

在 Hadoop 作业中,通常使用

System.exit()

来确保整个应用程序在作业完成后终止,返回的状态码可以作为操作系统判断程序是否成功执行的依据:

  • 0 表示成功,可以触发后续的处理任务。
  • 1 表示失败,可能会触发错误处理机制或发出警报。

总结:

System.exit(job.waitForCompletion(true) ? 0 : 1); 
  • 作用:等待 MapReduce 作业完成,并根据作业的执行结果退出 Java 程序。
  • 含义: - job.waitForCompletion(true) 提交作业并等待作业完成。- 根据作业是否成功,返回 01。- System.exit() 用于终止程序,并传递作业的成功或失败状态。

最终结果


本文转载自: https://blog.csdn.net/DXD1012/article/details/142962874
版权归原作者 辛小贝达尔比 所有, 如有侵权,请联系我们删除。

“使用 Hadoop MapReduce 实现历年最高温度统计”的评论:

还没有评论