0


统计全球每年的最高气温和最低气温

文章目录


数据准备

先启动hadoop,然后打开eclipse

1.下载数据

打开虚拟机从:ftp://ftp.ncdc.noaa.gov/pub/data/gsod 上下载2014~2016年的数据
在这里插入图片描述

2.处理数据

1.在linux本地目录中创建一个文件夹ncdc
cd /usr/loacl
madir /data
cd /usr/loacl/data
mkdir /ncdc
2.去下载的目录,然后把数据复制到ncdc中
在这里插入图片描述
3.解压文件gzip -d *.op.gz
在这里插入图片描述
4.输出数据到data.txt
在/usr/local/data目录下创建一个文件data.txt
cd /usr/local/data
vim data.txt
在这里插入图片描述
cd /usr/local/data/ncdc
回到ncdc目录执行命令 来删除所有文件的首行字段sed -i ‘1d’ *
在这里插入图片描述
cat *>>…/data.txt
在这里插入图片描述
查看了一下数据
在这里插入图片描述
5.上传到hdfs中
hdfs dfs -put data.txt /user/temperature
在这里插入图片描述

一、统计全球每年的最高气温和最低气温

实现思路
(1)自定义一个数据类型YearMaxTAndMinT来继承Writable接口,在这个类里定义字符串类型的变量year,定义double类型的变量maxTemp和minTemp,获取get()和set()方法。
(2)创建一个Mapper,命名为MaxTAndMinTMapper,获取年份和气温,年份为key,气温为value输出
(3)创建一个Combiner,命名为MaxTAndMinTCombiner,获取年份最高气温和最低气温,年份为key,气温为value输出
(4)创建一个Reducer,命名为MaxTAndMinTReducer,也是获取年份最高气温和最低气温,并创建一个YearMaxTAndMinT对象,按最高气温和最低气温分别设置maxTemp和minTemp的值。将YearMaxTAndMinT最为value,NullWritable.get()作为key输出
(5)创建运行代码,创建一个驱动类MaxTAndMinT.java
代码如下:

1.YearMaxTAndMinT

packagetemperature;importjava.io.DataInput;importjava.io.DataOutput;importjava.io.IOException;importorg.apache.hadoop.io.Writable;publicclassYearMaxTAndMinTimplementsWritable{privateString year;privatedouble maxTemp;privatedouble minTemp;publicYearMaxTAndMinT(){}publicStringgetYear(){return year;}publicvoidsetYear(String year){this.year = year;}publicdoublegetMaxTemp(){return maxTemp;}publicvoidsetMaxTemp(double maxTemp){this.maxTemp = maxTemp;}publicdoublegetMinTemp(){return minTemp;}publicvoidsetMinTemp(double minTemp){this.minTemp = minTemp;}@OverridepublicvoidreadFields(DataInput in)throwsIOException{this.year=in.readUTF();this.maxTemp=in.readDouble();this.minTemp=in.readDouble();}@Overridepublicvoidwrite(DataOutput out)throwsIOException{
                   out.writeUTF(year);
                   out.writeDouble(maxTemp);
                   out.writeDouble(minTemp);}@OverridepublicStringtoString(){returnthis.year+"\t"+this.maxTemp+"\t"+this.minTemp;}}

2.Mapper:MaxTAndMinTMapper

packagetemperature;importjava.io.IOException;importorg.apache.hadoop.io.DoubleWritable;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Mapper;publicclassMaxTAndMinTMapperextendsMapper<LongWritable,Text,Text,DoubleWritable>{@Overrideprotectedvoidmap(LongWritable key,Text value,Mapper<LongWritable,Text,Text,DoubleWritable>.Context context)throwsIOException,InterruptedException{String line = value.toString().trim();int[] indexs =getIndexs(line);// 获取年份和气温数据的索引范围String year =getYear(line, indexs);// 获取年份double airTemperature;String temperature =getTemperature(line, indexs);if(temperature.charAt(0)=='-'){// 每行数据中带 - 号的气温数据做负数处理
            airTemperature =0-Double.parseDouble(temperature.substring(1));// 获取气温数值}else{
            airTemperature =Double.parseDouble(temperature);// 获取气温数值}
        context.write(newText(year),newDoubleWritable(airTemperature));}//获取年份publicStringgetYear(String line,int[] indexs){return line.substring(indexs[1], indexs[2]).replace(" ","").substring(0,4);}//获取气温publicStringgetTemperature(String line,int[] indexs){return line.substring(indexs[2],indexs[3]).replace(" ","");}//获取年份和气温的索引范围publicint[]getIndexs(String line){int[] indexs =newint[4];int n=0;for(int i=0;i < line.length();i++){if(line.charAt(i)==' '){if(line.charAt(i+1)!=' '){

                                 indexs[n++]=i+1;}if(n ==4){break;}}}return indexs;}}

3.Combiner:MaxTAndMinTCombiner

packagetemperature;importjava.io.IOException;importorg.apache.hadoop.io.DoubleWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Reducer;publicclassMaxTAndMinTCombinerextendsReducer<Text,DoubleWritable,Text,DoubleWritable>{@Overrideprotectedvoidreduce(Text key,Iterable<DoubleWritable> values,Reducer<Text,DoubleWritable,Text,DoubleWritable>.Context context)throwsIOException,InterruptedException{double maxValue =Double.MIN_VALUE;// 获取整形最大值double minValue=Double.MAX_VALUE;// 获取最小值for(DoubleWritable value : values){
            maxValue =Math.max(maxValue, value.get());// 获取最高温度
            minValue=Math.min(minValue, value.get());// 获取最低温度}
        context.write(key,newDoubleWritable(maxValue));
        context.write(key,newDoubleWritable(minValue));}}

4.Reducer:MaxTAndMinTReducer

packagetemperature;importjava.io.IOException;importorg.apache.hadoop.io.DoubleWritable;importorg.apache.hadoop.io.NullWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Reducer;publicclassMaxTAndMinTReducerextendsReducer<Text,DoubleWritable,NullWritable,YearMaxTAndMinT>{privateYearMaxTAndMinT year_max_min=newYearMaxTAndMinT();@Overrideprotectedvoidreduce(Text key,Iterable<DoubleWritable> values,Reducer<Text,DoubleWritable,NullWritable,YearMaxTAndMinT>.Context context)throwsIOException,InterruptedException{double maxValue =Double.MIN_VALUE;// 获取整形最大值double minValue=Double.MAX_VALUE;// 获取最小值for(DoubleWritable value : values){
            maxValue =Math.max(maxValue, value.get());// 获取最高温度
            minValue=Math.min(minValue, value.get());}
       year_max_min.setYear(key.toString());
       year_max_min.setMaxTemp(maxValue);
       year_max_min.setMinTemp(minValue);    
        context.write(NullWritable.get(),year_max_min);}}

5.运行代码:MaxTAndMinT.java

packagetemperature;importjava.io.IOException;importjava.nio.file.FileSystem;importjava.util.StringTokenizer;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.conf.Configured;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.DoubleWritable;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.NullWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importorg.apache.hadoop.util.GenericOptionsParser;importorg.apache.hadoop.util.Tool;importorg.apache.hadoop.util.ToolRunner;importtest.JarUtil;publicclassMaxTAndMinTextendsConfiguredimplementsTool{publicintrun(String[] args)throwsException{Configuration conf =getConf();JobMaxTAndMinTJob=Job.getInstance(conf,"max min");//重要:指定本job所在的jar包MaxTAndMinTJob.setJarByClass(MaxTAndMinT.class);//设置wordCountJob所用的mapper逻辑类为哪个类MaxTAndMinTJob.setMapperClass(MaxTAndMinTMapper.class);//设置wordCountJob所用的reducer逻辑类为哪个类MaxTAndMinTJob.setReducerClass(MaxTAndMinTReducer.class);//设置map阶段输出的kv数据类型MaxTAndMinTJob.setMapOutputKeyClass(Text.class);MaxTAndMinTJob.setMapOutputValueClass(DoubleWritable.class);//设置最终输出的kv数据类型MaxTAndMinTJob.setOutputKeyClass(NullWritable.class);MaxTAndMinTJob.setOutputValueClass(YearMaxTAndMinT.class);MaxTAndMinTJob.setCombinerClass(MaxTAndMinTCombiner.class);//设置要处理的文本数据所存放的路径FileInputFormat.setInputPaths(MaxTAndMinTJob,newPath("hdfs://master:8020/user/temperature/data.txt"));FileOutputFormat.setOutputPath(MaxTAndMinTJob,newPath("hdfs://master:8020/user/output/temperature"));//提交job给hadoop集群MaxTAndMinTJob.waitForCompletion(true);return0;}publicstaticvoidmain(String[] args)throwsException{//获取当前环境变量Configuration conf =newConfiguration();
        conf.set("fs.defaultFS","hdfs://master:8020");// 指定namenode//使用ToolRunner的run方法对自定义的类型进行处理
        conf.set("mapreduce.job.jar",JarUtil.jar(MaxTAndMinT.class));try{ToolRunner.run(conf,newMaxTAndMinT(), args);}catch(Exception e){
            e.printStackTrace();}}}

6.输出结果

在这里插入图片描述

二、筛选气温在15~25°C之间的数据

一般来说,最适合人类生存的温度是15~25°C,所以我们要筛选出这些数据
代码如下:

1.MTAMTMapper类

packagebetween;importjava.io.IOException;importorg.apache.hadoop.io.DoubleWritable;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.LongWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Mapper.Context;publicclassMTAMTMapperextendsMapper<LongWritable,Text,Text,DoubleWritable>{@Overrideprotectedvoidmap(LongWritable key,Text value,Mapper<LongWritable,Text,Text,DoubleWritable>.Context context)throwsIOException,InterruptedException{String line = value.toString().trim();int[] indexs =getIndexs(line);// 获取年份和气温数据的索引范围String year =getYear(line, indexs);// 获取年份double airTemperature;String temperature =getTemperature(line, indexs);if(temperature.charAt(0)=='-'){// 每行数据中带 - 号的气温数据做负数处理
       airTemperature =0-Double.parseDouble(temperature.substring(1));// 获取气温数值}else{
       airTemperature =Double.parseDouble(temperature);// 获取气温数值}
   context.write(newText(year),newDoubleWritable(airTemperature));}//获取年份publicStringgetYear(String line,int[] indexs){return line.substring(indexs[1], indexs[2]).replace(" ","").substring(0,4);}//获取气温publicStringgetTemperature(String line,int[] indexs){return line.substring(indexs[2],indexs[3]).replace(" ","");}//获取年份和气温的索引范围publicint[]getIndexs(String line){int[] indexs =newint[4];int n=0;for(int i=0;i < line.length();i++){if(line.charAt(i)==' '){if(line.charAt(i+1)!=' '){

                            indexs[n++]=i+1;}if(n ==4){break;}}}return indexs;}}

2.MTAMTReducer类

packagebetween;importjava.io.IOException;importorg.apache.hadoop.io.DoubleWritable;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.NullWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.Reducer.Context;publicclassMTAMTReducerextendsReducer<Text,DoubleWritable,Text,DoubleWritable>{protectedvoidreduce(Text key,Iterable<DoubleWritable> values,Context context)throwsIOException,InterruptedException{String date=key.toString();for(DoubleWritable value:values){if(value.get()>=15&& value.get()<=25){if(date.contains("2014")|| date.contains("2015")|| date.contains("2016")){
                context.write(key,newDoubleWritable(value.get()));}}}}}

3.MTAMT类

packagebetween;importjava.io.IOException;importjava.util.StringTokenizer;importorg.apache.hadoop.conf.Configuration;importorg.apache.hadoop.conf.Configured;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.DoubleWritable;importorg.apache.hadoop.io.IntWritable;importorg.apache.hadoop.io.NullWritable;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.output.FileOutputFormat;importorg.apache.hadoop.util.GenericOptionsParser;importorg.apache.hadoop.util.Tool;importorg.apache.hadoop.util.ToolRunner;importtest.JarUtil;publicclass MTAMT extendsConfiguredimplementsTool{@Overridepublicintrun(String[] args)throwsException{Configuration conf =getConf();JobMTAMTJob=Job.getInstance(conf,"15~25°C");//重要:指定本job所在的jar包MTAMTJob.setJarByClass(MTAMT.class);//设置wordCountJob所用的mapper逻辑类为哪个类MTAMTJob.setMapperClass(MTAMTMapper.class);//设置wordCountJob所用的reducer逻辑类为哪个类MTAMTJob.setReducerClass(MTAMTReducer.class);//设置map阶段输出的kv数据类型MTAMTJob.setMapOutputKeyClass(Text.class);MTAMTJob.setMapOutputValueClass(DoubleWritable.class);//设置最终输出的kv数据类型MTAMTJob.setOutputKeyClass(Text.class);MTAMTJob.setOutputValueClass(DoubleWritable.class);MTAMTJob.setNumReduceTasks(2);//设置要处理的文本数据所存放的路径FileInputFormat.setInputPaths(MTAMTJob,newPath("hdfs://master:8020/user/temperature/data.txt"));FileOutputFormat.setOutputPath(MTAMTJob,newPath("hdfs://master:8020/user/output/temperature2"));//提交job给hadoop集群MTAMTJob.waitForCompletion(true);return0;}publicstaticvoidmain(String[] args)throwsException{//获取当前环境变量Configuration conf =newConfiguration();
        conf.set("fs.defaultFS","hdfs://master:8020");//使用ToolRunner的run方法对自定义的类型进行处理
        conf.set("mapreduce.job.jar",JarUtil.jar(MTAMT.class));try{ToolRunner.run(conf,newMTAMT(), args);}catch(Exception e){
            e.printStackTrace();}}}

4.输出结果

在这里插入图片描述

总结

以上就是全部内容,欢迎大家在评论区讨论

标签: hadoop mapreduce linux

本文转载自: https://blog.csdn.net/BIPT919/article/details/124796189
版权归原作者 不太聪明的学渣 所有, 如有侵权,请联系我们删除。

“统计全球每年的最高气温和最低气温”的评论:

还没有评论