0


Spark SQL【基于泰坦尼克号生还数据的 Spark 数据分析处理】

前言

  1. 昨天实验课试着做了一个 Spark SQL 小案例,发现好多内容还是没有掌握,以及好多书上没有的内容需要学习。

一、数据准备

csv 文件内容部分数据展示:

  1. PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
  2. 1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S
  3. 2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C
  4. 3,1,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S
  5. 4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35,1,0,113803,53.1,C123,S
  6. 5,0,3,"Allen, Mr. William Henry",male,35,0,0,373450,8.05,,S
  7. 6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q

字段说明

• PassengerId : 乘客编号。
• Survived : 是否存活,0表示未能存活,1表示存活。
• Pclass : 描述乘客所属的等级,总共分为三等,用1、2、3来描述:1表示高等;2表示中等;3表示低等。
• Name : 乘客姓名。
• Sex : 乘客性别。
• Age : 乘客年龄。
• SibSp : 与乘客同行的兄弟姐妹(Siblings)和配偶(Spouse)数目。
• Parch : 与乘客同行的家长(Parents)和孩子(Children)数目。
• Ticket : 乘客登船所使用的船票编号。
• Fare : 乘客上船的花费。
• Cabin : 乘客所住的船舱。
• Embarked : 乘客上船时的港口,C表示Cherbourg;Q表示Queenstown;S表示Southampton。

二、Spark数据预处理

1、通过读取本地文件生成 DataFrame 对象。

  1. // 创建 SparkSession 对象
  2. val conf = new SparkConf().setMaster("local[*]").setAppName("practice1")
  3. val spark = SparkSession.builder()
  4. .config(conf)
  5. .getOrCreate()
  6. // 导入隐式转换相关依赖
  7. import spark.implicits._
  8. // 读取csv文件生成 DataFrame 对象
  9. val df = spark.read.format("csv")
  10. .option("header","true")
  11. .option("mode","DROPMALFORMED")
  12. .load("data/practice1/titanic.csv")

2、修改字段类型

DataFrame 读取进来的都是 StringType 类型,我们需要对部分字段进行修改。

  1. 'withColumn'是一个DataFrame转换函数,用于在现有的DataFrame上添加或替换列。这个函数接收两个参数,第一个是新列的名称,第二个是新列的值。对于新列的值,我们使用 cast 方法将它强制转为一个新的类型。
  2. cast方法用于将一个数据类型的值转换为另一个数据类型。它可以用于将一种数据类型转换为另一种数据类型,例如将字符串转换为整数或将整数转换为浮点数等。

withColumn 作为一个转换函数会返回一个新的 DataFrame 对象,记得通过变量或常量存储起来。

  1. // 修改字段数据类型
  2. val md_df = df.withColumn("Pclass", df("Pclass").cast(IntegerType)) // 乘客登记 包括1-2-3三个等级
  3. .withColumn("Survived", df("Survived").cast(IntegerType)) //是否存活-1存活 0-未能存活
  4. .withColumn("Age", df("Age").cast(DoubleType)) // 年龄
  5. .withColumn("SibSp", df("SibSp").cast(IntegerType)) // 乘客的兄弟姐妹和配偶的数量
  6. .withColumn("Parch", df("Parch").cast(IntegerType)) //乘客的家长和孩子数目
  7. .withColumn("Fare", df("Fare").cast(DoubleType)) // 上传的花费

3、删除不必要的字段

  1. // 删除不必要的字段
  2. val df1 = md_df.drop("PassengerId").drop("Name").drop("Ticket").drop("Cabin")

4、缺失值处理

用到的函数:

DSL 语句中的 select、where函数,以及 count 、zip 函数。

涉及到的操作:

RDD 对象转为 DataFrame 对象,这里因为RDD对象的内容是元组,所以可以直接调用 toDF 方法。

统计缺失值
  1. // 缺失值处理
  2. val columns: Array[String] = df1.columns //返回df1的字段组成的数组 Array("字段1","字段2","字段3"...)
  3. // 通过select方法对字段数组中的每一个字段进行搜索,并通过where方法找出满足列col(字段).isNUll的值的count(个数)
  4. val missing_cnt: Array[Long] = columns.map(field => df1.select(col(field)).where(col(field).isNull).count())
  5. // 通过zip方法将两个集合数组合并成一个元组
  6. val tuples: Array[(Long, String)] = missing_cnt.zip(columns)
  7. // 把生成的元组读取为RDD对象再转为DataFrame对象
  8. val result_df: DataFrame = spark.sparkContext.parallelize(tuples).toDF("missing_cnt", "column_name")
  9. result_df.show() // 统计缺失值

统计结果展示:

  1. +-----------+-----------+
  2. |missing_cnt|column_name|
  3. +-----------+-----------+
  4. | 0| Survived|
  5. | 0| Pclass|
  6. | 0| Sex|
  7. | 177| Age|
  8. | 0| SibSp|
  9. | 0| Parch|
  10. | 0| Fare|
  11. | 2| Embarked|
  12. +-----------+-----------+
缺失值处理
  1. // 处理缺失值函数
  2. def meanAge(dataFrame: DataFrame): Double = {
  3. dataFrame.select("Age")
  4. .na.drop() //删除 Age 为空的行
  5. //'round' 函数用于将数字四舍五入到指定的小数位数。'mean' 函数则用于计算一组数值的平均值。
  6. .agg(round(mean("Age"), 0)) //对'Age'列计算平均值,并保留0位小数,也就是取整
  7. .first() //由于agg操作返回的是一个DataFrame,而这个DataFrame只有一行,所以使用first()方法获取这一行。
  8. .getDouble(0) //从结果行中获取第一个字段(索引为0)的值,并将其转换为Double类型。
  9. }

处理:

  1. val df2 = df1.na.fill(Map("Age" -> meanAge(df1), "Embarked" -> "S"))
  2. df2.show()

处理结果展示:

  1. +--------+------+------+----+-----+-----+-------+--------+
  2. |Survived|Pclass| Sex| Age|SibSp|Parch| Fare|Embarked|
  3. +--------+------+------+----+-----+-----+-------+--------+
  4. | 0| 3| male|22.0| 1| 0| 7.25| S|
  5. | 1| 1|female|38.0| 1| 0|71.2833| C|
  6. | 1| 3|female|26.0| 0| 0| 7.925| S|
  7. | 1| 1|female|35.0| 1| 0| 53.1| S|
  8. | 0| 3| male|35.0| 0| 0| 8.05| S|
  9. | 0| 3| male|30.0| 0| 0| 8.4583| Q|
  10. | 0| 1| male|54.0| 0| 0|51.8625| S|
  11. | 0| 3| male| 2.0| 3| 1| 21.075| S|
  12. | 1| 3|female|27.0| 0| 2|11.1333| S|
  13. | 1| 2|female|14.0| 1| 0|30.0708| C|
  14. | 1| 3|female| 4.0| 1| 1| 16.7| S|
  15. | 1| 1|female|58.0| 0| 0| 26.55| S|
  16. | 0| 3| male|20.0| 0| 0| 8.05| S|
  17. | 0| 3| male|39.0| 1| 5| 31.275| S|
  18. | 0| 3|female|14.0| 0| 0| 7.8542| S|
  19. | 1| 2|female|55.0| 0| 0| 16.0| S|
  20. | 0| 3| male| 2.0| 4| 1| 29.125| Q|
  21. | 1| 2| male|30.0| 0| 0| 13.0| S|
  22. | 0| 3|female|31.0| 1| 0| 18.0| S|
  23. | 1| 3|female|30.0| 0| 0| 7.225| C|
  24. +--------+------+------+----+-----+-----+-------+--------+
  25. only showing top 20 rows

三、Spark 数据分析

1、891人当中,共多少人生还?

  1. // 1.891人当中,共多少人生还?
  2. val survived_count: DataFrame = df2.groupBy("Survived").count()
  3. survived_count.show()
  4. //保存结果到本地
  5. survived_count.coalesce(1).write.option("header","true").csv("output/practice1/survived_count.csv")

运行结果:

  1. +--------+-----+
  2. |Survived|count|
  3. +--------+-----+
  4. | 1| 342|
  5. | 0| 549|
  6. +--------+-----+

2.不同上船港口生还情况

  1. // 2.不同上船港口生还情况
  2. val survived_embark = df2.groupBy("Embarked", "Survived").count()
  3. survived_embark.show()
  4. survived_embark.coalesce(1).write.option("header","true").csv("data/practice1survived_embark.csv")

运行结果:

  1. +--------+--------+-----+
  2. |Embarked|Survived|count|
  3. +--------+--------+-----+
  4. | Q| 1| 30|
  5. | S| 0| 427|
  6. | S| 1| 219|
  7. | C| 1| 93|
  8. | Q| 0| 47|
  9. | C| 0| 75|
  10. +--------+--------+-----+

3.存活/未存活的男女数量及比例

  1. // 3.存活/未存活的男女数量及比例
  2. val survived_sex_count=df2.groupBy("Sex","Survived").count()
  3. val survived_sex_percent=survived_sex_count.withColumn("percent",format_number(col("count").divide(functions.sum("count").over()).multiply(100),5));
  4. survived_sex_percent.show()
  5. survived_sex_percent.coalesce(1).write.option("header", "true").csv("data/practice1/survived_sex_percent.csv")

运行结果:

  1. +------+--------+-----+--------+
  2. | Sex|Survived|count| percent|
  3. +------+--------+-----+--------+
  4. | male| 0| 468|52.52525|
  5. |female| 1| 233|26.15039|
  6. |female| 0| 81| 9.09091|
  7. | male| 1| 109|12.23345|
  8. +------+--------+-----+--------+

4. 不同级别乘客生还人数和占总生还人数的比例

  1. // 4. 不同级别乘客生还人数和占总生还人数的比例
  2. val survived_df = df2.filter(col("Survived")===1)
  3. val pclass_survived_count=survived_df.groupBy("Pclass").count()
  4. val pclass_survived_percent=pclass_survived_count.withColumn("percent",format_number(col("count").divide(functions.sum("count").over()).multiply(100),5));
  5. pclass_survived_percent.show()
  6. pclass_survived_percent.coalesce(1).write.option("header", "true").csv("data/practice1/pclass_survived_percent.csv")

运行结果:

  1. +------+-----+--------+
  2. |Pclass|count| percent|
  3. +------+-----+--------+
  4. | 1| 136|39.76608|
  5. | 3| 119|34.79532|
  6. | 2| 87|25.43860|
  7. +------+-----+--------+

5. 有无同行父母/孩子的生还情况

  1. // 5.有无同行父母/孩子的生还情况
  2. val df4=df2.withColumn("Parch_label",when(df2("Parch")>0,1).otherwise(0))
  3. val parch_survived_count=df4.groupBy("Parch_label","Survived").count()
  4. parch_survived_count.show()
  5. parch_survived_count.coalesce(1).write.option("header", "true").csv("data/practice1/parch_survived_count.csv")

运行结果:

  1. +-----------+--------+-----+
  2. |Parch_label|Survived|count|
  3. +-----------+--------+-----+
  4. | 1| 0| 104|
  5. | 1| 1| 109|
  6. | 0| 0| 445|
  7. | 0| 1| 233|
  8. +-----------+--------+-----+

6.按照年龄,将乘客划分为未成年人、青年人、中年人和老年人,分析四个群体生还情况

  1. // 6.按照年龄,将乘客划分为未成年人、青年人、中年人和老年人,分析四个群体生还情况
  2. val df3=survived_df.withColumn("Age_label",when(df2("Age")<=18,"minor").when(df2("Age")>18 && df2("Age")<=35,"young").when(df2("Age")>35 && df2("Age")<=55,"middle").otherwise("older"))
  3. val age_survived=df3.groupBy("Age_label","Survived").count()
  4. age_survived.show()
  5. age_survived.coalesce(1).write.option("header", "true").csv("data/practice1/age_survived.csv")

运行结果:

  1. +---------+--------+-----+
  2. |Age_label|Survived|count|
  3. +---------+--------+-----+
  4. | young| 1| 189|
  5. | older| 1| 12|
  6. | minor| 1| 70|
  7. | middle| 1| 71|
  8. +---------+--------+-----+

7. 提取乘客等级和上船费用信息

  1. // 7.提取乘客等级和上船费用信息
  2. val sef = Seq("Pclass", "Fare")
  3. val df5 = df2.select(sef.head, sef.tail: _*)
  4. df5.show(5)
  5. df5.coalesce(1).write.option("header", "true").csv("data/practice1/pclass_fare.csv")

运行结果:

  1. +------+-------+
  2. |Pclass| Fare|
  3. +------+-------+
  4. | 3| 7.25|
  5. | 1|71.2833|
  6. | 3| 7.925|
  7. | 1| 53.1|
  8. | 3| 8.05|
  9. +------+-------+
  10. only showing top 5 rows

四、数据可视化

数据可视化部分打算在学完 R 语言再完成,Python 实现后续更新。

标签: 数据分析 spark

本文转载自: https://blog.csdn.net/m0_64261982/article/details/133064194
版权归原作者 让线程再跑一会 所有, 如有侵权,请联系我们删除。

“Spark SQL【基于泰坦尼克号生还数据的 Spark 数据分析处理】”的评论:

还没有评论