0


简单使用Spark、Scala完成对天气数据的指标统计

& 什么是Spark?

Spark最初由美国加州伯克利大学(UCBerkeley)的AMP(Algorithms, Machines and People)实验室于2009年开发,是基于内存计算的大数据并行计算框架,可用于构建大型的、低延迟的数据分析应用程序。Spark在诞生之初属于研究性项目,其诸多核心理念均源自学术研究论文。2013年,Spark加入Apache孵化器项目后,开始获得迅猛的发展,如今已成为Apache软件基金会最重要的三大分布式计算系统开源项目之一(即Hadoop、Spark、Storm)

& 什么是Scala

Scala是一门多范式的编程语言,一种类似java的编程语言,设计初衷是实现可伸缩的语言 、并集成面向对象编程和函数式编程的各种特性。


二、数据准备(数据类型的转换)

将天气数据进行转换,csv转json文件,相关代码如下:

(使用的python,相对简单)

代码如下:

  1. import csv
  2. import json
  3. import chardet
  4. csvFilePath = 'weather.csv'
  5. jsonFilePath = 'weather.json'
  6. # 检测文件编码
  7. with open(csvFilePath, 'rb') as file:
  8. result = chardet.detect(file.read())
  9. encoding = result['encoding']
  10. # 读取 CSV 文件并处理非 UTF-8 字符
  11. with open(csvFilePath, 'r', encoding=encoding, errors='replace') as csvFile:
  12. csvDict = csv.DictReader(csvFile)
  13. jsonData = json.dumps([row for row in csvDict], ensure_ascii=False)
  14. # 将 JSON 数据写入文件
  15. with open(jsonFilePath, 'w', encoding='utf-8') as jsonFile:
  16. jsonFile.write(jsonData)

转后的数据如下(仅展示部分数据):


建类:

  1. import java.io.Serializable;
  2. public class Weather implements Serializable {
  3. private String date;//日期
  4. private String week;//星期
  5. private String weather;//天气情况
  6. private String min_temperature;//最低温度
  7. private String max_temperature;//最高温度
  8. private String wind_direction;//风向
  9. private String wind_scale;//风力等级
  10. public String getDate() {
  11. return date;
  12. }
  13. public void setDate(String date) {
  14. this.date = date;
  15. }
  16. public String getWeek() {
  17. return week;
  18. }
  19. public void setWeek(String week) {
  20. this.week = week;
  21. }
  22. public String getWeather() {
  23. return weather;
  24. }
  25. public void setWeather(String weather) {
  26. this.weather = weather;
  27. }
  28. public String getMin_temperature() {
  29. return min_temperature;
  30. }
  31. public void setMin_temperature(String min_temperature) {
  32. this.min_temperature = min_temperature;
  33. }
  34. public String getMax_temperature() {
  35. return max_temperature;
  36. }
  37. public void setMax_temperature(String max_temperature) {
  38. this.max_temperature = max_temperature;
  39. }
  40. public String getWind_direction() {
  41. return wind_direction;
  42. }
  43. public void setWind_direction(String wind_direction) {
  44. this.wind_direction = wind_direction;
  45. }
  46. public String getWind_scale() {
  47. return wind_scale;
  48. }
  49. public void setWind_scale(String wind_scale) {
  50. this.wind_scale = wind_scale;
  51. }
  52. }

三、Spark部分

1、使用Spark完成数据中的“风级”,“风向”、“天气情况”相关指标统计及筛选

指标:风级、风向

代码如下:

  1. import org.apache.spark.SparkConf;
  2. import org.apache.spark.sql.Dataset;
  3. import org.apache.spark.sql.Row;
  4. import org.apache.spark.sql.SparkSession;
  5. import static org.apache.spark.sql.functions.*;
  6. public class WeatherAnalysis {
  7. public static void main(String[] args) {
  8. SparkSession spark = SparkSession.builder()
  9. .appName("WeatherAnalysis")
  10. .master("local").getOrCreate();
  11. SparkConf conf = new SparkConf().setAppName("Weather2").setMaster("local");
  12. // 读取json数据
  13. Dataset<Row> weatherData = spark.read().json("D:\\weather.json");
  14. // 1. 统计出现次数最多的“风级“数量,降序,并输出控制台
  15. Dataset<Row> windScaleCount = weatherData.groupBy("wind_scale")
  16. .count().sort(desc("count"));
  17. windScaleCount.show();
  18. // 2. 统计出现次数最多的“风向“数量,降序,并输出控制台
  19. Dataset<Row> windDirectionCount = weatherData.groupBy("wind_direction")
  20. .count().sort(desc("count"));
  21. windDirectionCount.show();
  22. // 3. 筛选出风级等于2级且风向为“西北风” 的天气数据,并输出控制台
  23. weatherData.where(" wind_scale = '2级' and wind_direction = '西北风'").show();
  24. }
  25. }

运行结果如下:

指标:天气情况

代码如下:

  1. import org.apache.spark.SparkConf;
  2. import org.apache.spark.sql.Dataset;
  3. import org.apache.spark.sql.Row;
  4. import org.apache.spark.sql.SparkSession;
  5. import org.apache.spark.api.java.JavaSparkContext;
  6. import static org.apache.spark.sql.functions.*;
  7. public class Weather2 {
  8. public static void main(String[] args) {
  9. SparkConf conf = new SparkConf().setAppName("Weather2").setMaster("local");
  10. JavaSparkContext sc = new JavaSparkContext(conf);
  11. SparkSession spark = SparkSession.builder().appName("Weather2").master("local").getOrCreate();
  12. // 读取json数据
  13. Dataset<Row> weatherDF = spark.read().json("D:\\weather.json");
  14. //4、统计一年的各种“天气“情况出现频数,并输出控制台
  15. Dataset<Row> windDirectionCount = weatherDF.groupBy("weather")
  16. .count().sort(desc("count"));
  17. windDirectionCount.show();
  18. // 5. 统计一年的“天气“情况为”晴“的出现天数,并输出控制台。
  19. long sunnyDays = weatherDF.filter(col("weather").equalTo("晴")).count();
  20. System.out.println("一年的“天气“情况为”晴“的天数有: \n" + sunnyDays + "天");
  21. }
  22. }

运行结果如下:


四、Scala部分

1、使用Scala统计某月、全年的温差、平均气温以及最值等相关的指标

指标:温差、平均温、最值

代码如下:

  1. kage scala_weather
  2. import org.apache.spark.sql.SparkSession
  3. import org.apache.spark.sql.functions._
  4. import org.apache.spark.sql.types._
  5. object Weather1 {
  6. def main(args: Array[String]): Unit = {
  7. val spark = SparkSession.builder()
  8. .appName("WeatherAnalysis")
  9. .master("local")
  10. .getOrCreate()
  11. // 读取json数据
  12. val weatherDF = spark.read.json("D:\\weather.json")
  13. weatherDF.show()
  14. // 6. 统计3月份每天的温差,以及平均温度
  15. val marchWeatherDF = weatherDF.filter(month(to_date(col("date"), "yyyy/MM/dd")) === 3)
  16. val marchWeatherDiffDF = marchWeatherDF
  17. .withColumn("min_temp", regexp_replace(col("min_temperature"), "℃", "").cast(DoubleType))
  18. .withColumn("max_temp", regexp_replace(col("max_temperature"), "℃", "").cast(DoubleType))
  19. .withColumn("temp_diff", col("max_temp") - col("min_temp"))
  20. .groupBy("date")
  21. .agg(round(avg("min_temp"), 2).alias("avg_min_temp"), round(avg("max_temp"), 2).alias("avg_max_temp"), round(avg("temp_diff"), 2).alias("avg_temp_diff"))
  22. .orderBy("date")
  23. println("3月份每天的温差以及平均温度:")
  24. marchWeatherDiffDF.show()
  25. // 7. 统计全年的温差
  26. val yearWeatherDiffDF = weatherDF
  27. .withColumn("min_temp", regexp_replace(col("min_temperature"), "℃", "").cast(DoubleType))
  28. .withColumn("max_temp", regexp_replace(col("max_temperature"), "℃", "").cast(DoubleType))
  29. .withColumn("temp_diff", col("max_temp") - col("min_temp"))
  30. .agg(round(avg("temp_diff"), 2).alias("avg_temp_diff"))
  31. println("全年的温差平均值:")
  32. yearWeatherDiffDF.show()
  33. //8. 统计1月份每天的最高气温
  34. val januaryWeatherDF = weatherDF.filter(month(to_date(col("date"), "yyyy/MM/dd")) === 1)
  35. val januaryMaxTempDF = januaryWeatherDF
  36. .withColumn("max_temp", regexp_replace(col("max_temperature"), "℃", "").cast(DoubleType))
  37. .select("date","max_temp")
  38. .orderBy("date","max_temp")
  39. .orderBy("date")
  40. println("1月份每天的最高气温:")
  41. januaryMaxTempDF.show()
  42. spark.stop()
  43. }
  44. }

运行结果如下:


五、遇到的问题:

1、json文件转换成功,但是使用ider进行数据的读取时,返回显示无法解析的错误

(使用了3种方法进行文件的转换。。。以下展示2种文件格式对比)

解决

  1. // 1 ider可以解析的json文件的样式:
  2. [{"date": "2019/1/1", "week": " 星期二 ", "weather": "晴", "min_temperature": "1℃", "max_temperature": "6℃", "wind_direction": "西北风", "wind_scale": "2级"}
  3. // 2 无法解析的json文件样式:(看着样式和上面的区别不大,但是我运行时提示无法解析。。。不太懂。。。可能空格问题?)
  4. [
  5. {
  6. "weather": "晴",
  7. "date": "2019/1/1",
  8. "week": " 星期二 ",
  9. "min_temperature": "1℃",
  10. "max_temperature": "6℃",
  11. "wind_scale": "2级",
  12. "wind_direction": "西北风"
  13. },

2、尝试读取json文件时,返回文件路径不存在的问题(使用相对路径,绝对路径均无用)

解决:我把json文件移动到根目录下,就成功读取到了。。。

3、当创建Scala类时,找不到创建Scala类或者Scala项目的选项(Scala已经安装并已完全部署好、插件、包均已导入)

解决:打开“文件”--->“项目结构”--->“平台设置”---->“全局库”,把Scala包再重新导入就可以啦

4、 编写Scala代码来统计气温的时候,使用 "$" 符号,提示 $不是StringContext的成员(插件已经安装,Scala包也已经导入)

解决:将 "$" 换成col,再将字段()起来就可以啦


六、总结

  1. 学习SparkScala编程可以帮助我们处理大规模数据,进行数据分析。使用SparkScala编写程序可以提高数据处理的效率和灵活性,同时还能够充分发挥分布式计算的优势。通过学习这两门技术,我们可以更好地理解数据处理的流程和原理,并且可以应用到实际的数据分析和统计工作中。总而言之,学习SparkScala编程是提升数据处理能力和数据分析技能的重要途径。
标签: spark scala 大数据

本文转载自: https://blog.csdn.net/m0_65046090/article/details/134517739
版权归原作者 db_lcz_2014 所有, 如有侵权,请联系我们删除。

“简单使用Spark、Scala完成对天气数据的指标统计”的评论:

还没有评论