前言
昨天实验课试着做了一个 Spark SQL 小案例,发现好多内容还是没有掌握,以及好多书上没有的内容需要学习。
一、数据准备
csv 文件内容部分数据展示:
PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked
1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S
2,1,1,"Cumings, Mrs. John Bradley (Florence Briggs Thayer)",female,38,1,0,PC 17599,71.2833,C85,C
3,1,3,"Heikkinen, Miss. Laina",female,26,0,0,STON/O2. 3101282,7.925,,S
4,1,1,"Futrelle, Mrs. Jacques Heath (Lily May Peel)",female,35,1,0,113803,53.1,C123,S
5,0,3,"Allen, Mr. William Henry",male,35,0,0,373450,8.05,,S
6,0,3,"Moran, Mr. James",male,,0,0,330877,8.4583,,Q
字段说明
• PassengerId : 乘客编号。
• Survived : 是否存活,0表示未能存活,1表示存活。
• Pclass : 描述乘客所属的等级,总共分为三等,用1、2、3来描述:1表示高等;2表示中等;3表示低等。
• Name : 乘客姓名。
• Sex : 乘客性别。
• Age : 乘客年龄。
• SibSp : 与乘客同行的兄弟姐妹(Siblings)和配偶(Spouse)数目。
• Parch : 与乘客同行的家长(Parents)和孩子(Children)数目。
• Ticket : 乘客登船所使用的船票编号。
• Fare : 乘客上船的花费。
• Cabin : 乘客所住的船舱。
• Embarked : 乘客上船时的港口,C表示Cherbourg;Q表示Queenstown;S表示Southampton。
二、Spark数据预处理
1、通过读取本地文件生成 DataFrame 对象。
// 创建 SparkSession 对象
val conf = new SparkConf().setMaster("local[*]").setAppName("practice1")
val spark = SparkSession.builder()
.config(conf)
.getOrCreate()
// 导入隐式转换相关依赖
import spark.implicits._
// 读取csv文件生成 DataFrame 对象
val df = spark.read.format("csv")
.option("header","true")
.option("mode","DROPMALFORMED")
.load("data/practice1/titanic.csv")
2、修改字段类型
DataFrame 读取进来的都是 StringType 类型,我们需要对部分字段进行修改。
'withColumn'是一个DataFrame转换函数,用于在现有的DataFrame上添加或替换列。这个函数接收两个参数,第一个是新列的名称,第二个是新列的值。对于新列的值,我们使用 cast 方法将它强制转为一个新的类型。
cast方法用于将一个数据类型的值转换为另一个数据类型。它可以用于将一种数据类型转换为另一种数据类型,例如将字符串转换为整数或将整数转换为浮点数等。
withColumn 作为一个转换函数会返回一个新的 DataFrame 对象,记得通过变量或常量存储起来。
// 修改字段数据类型
val md_df = df.withColumn("Pclass", df("Pclass").cast(IntegerType)) // 乘客登记 包括1-2-3三个等级
.withColumn("Survived", df("Survived").cast(IntegerType)) //是否存活-1存活 0-未能存活
.withColumn("Age", df("Age").cast(DoubleType)) // 年龄
.withColumn("SibSp", df("SibSp").cast(IntegerType)) // 乘客的兄弟姐妹和配偶的数量
.withColumn("Parch", df("Parch").cast(IntegerType)) //乘客的家长和孩子数目
.withColumn("Fare", df("Fare").cast(DoubleType)) // 上传的花费
3、删除不必要的字段
// 删除不必要的字段
val df1 = md_df.drop("PassengerId").drop("Name").drop("Ticket").drop("Cabin")
4、缺失值处理
用到的函数:
DSL 语句中的 select、where函数,以及 count 、zip 函数。
涉及到的操作:
RDD 对象转为 DataFrame 对象,这里因为RDD对象的内容是元组,所以可以直接调用 toDF 方法。
统计缺失值
// 缺失值处理
val columns: Array[String] = df1.columns //返回df1的字段组成的数组 Array("字段1","字段2","字段3"...)
// 通过select方法对字段数组中的每一个字段进行搜索,并通过where方法找出满足列col(字段).isNUll的值的count(个数)
val missing_cnt: Array[Long] = columns.map(field => df1.select(col(field)).where(col(field).isNull).count())
// 通过zip方法将两个集合数组合并成一个元组
val tuples: Array[(Long, String)] = missing_cnt.zip(columns)
// 把生成的元组读取为RDD对象再转为DataFrame对象
val result_df: DataFrame = spark.sparkContext.parallelize(tuples).toDF("missing_cnt", "column_name")
result_df.show() // 统计缺失值
统计结果展示:
+-----------+-----------+
|missing_cnt|column_name|
+-----------+-----------+
| 0| Survived|
| 0| Pclass|
| 0| Sex|
| 177| Age|
| 0| SibSp|
| 0| Parch|
| 0| Fare|
| 2| Embarked|
+-----------+-----------+
缺失值处理
// 处理缺失值函数
def meanAge(dataFrame: DataFrame): Double = {
dataFrame.select("Age")
.na.drop() //删除 Age 为空的行
//'round' 函数用于将数字四舍五入到指定的小数位数。'mean' 函数则用于计算一组数值的平均值。
.agg(round(mean("Age"), 0)) //对'Age'列计算平均值,并保留0位小数,也就是取整
.first() //由于agg操作返回的是一个DataFrame,而这个DataFrame只有一行,所以使用first()方法获取这一行。
.getDouble(0) //从结果行中获取第一个字段(索引为0)的值,并将其转换为Double类型。
}
处理:
val df2 = df1.na.fill(Map("Age" -> meanAge(df1), "Embarked" -> "S"))
df2.show()
处理结果展示:
+--------+------+------+----+-----+-----+-------+--------+
|Survived|Pclass| Sex| Age|SibSp|Parch| Fare|Embarked|
+--------+------+------+----+-----+-----+-------+--------+
| 0| 3| male|22.0| 1| 0| 7.25| S|
| 1| 1|female|38.0| 1| 0|71.2833| C|
| 1| 3|female|26.0| 0| 0| 7.925| S|
| 1| 1|female|35.0| 1| 0| 53.1| S|
| 0| 3| male|35.0| 0| 0| 8.05| S|
| 0| 3| male|30.0| 0| 0| 8.4583| Q|
| 0| 1| male|54.0| 0| 0|51.8625| S|
| 0| 3| male| 2.0| 3| 1| 21.075| S|
| 1| 3|female|27.0| 0| 2|11.1333| S|
| 1| 2|female|14.0| 1| 0|30.0708| C|
| 1| 3|female| 4.0| 1| 1| 16.7| S|
| 1| 1|female|58.0| 0| 0| 26.55| S|
| 0| 3| male|20.0| 0| 0| 8.05| S|
| 0| 3| male|39.0| 1| 5| 31.275| S|
| 0| 3|female|14.0| 0| 0| 7.8542| S|
| 1| 2|female|55.0| 0| 0| 16.0| S|
| 0| 3| male| 2.0| 4| 1| 29.125| Q|
| 1| 2| male|30.0| 0| 0| 13.0| S|
| 0| 3|female|31.0| 1| 0| 18.0| S|
| 1| 3|female|30.0| 0| 0| 7.225| C|
+--------+------+------+----+-----+-----+-------+--------+
only showing top 20 rows
三、Spark 数据分析
1、891人当中,共多少人生还?
// 1.891人当中,共多少人生还?
val survived_count: DataFrame = df2.groupBy("Survived").count()
survived_count.show()
//保存结果到本地
survived_count.coalesce(1).write.option("header","true").csv("output/practice1/survived_count.csv")
运行结果:
+--------+-----+
|Survived|count|
+--------+-----+
| 1| 342|
| 0| 549|
+--------+-----+
2.不同上船港口生还情况
// 2.不同上船港口生还情况
val survived_embark = df2.groupBy("Embarked", "Survived").count()
survived_embark.show()
survived_embark.coalesce(1).write.option("header","true").csv("data/practice1survived_embark.csv")
运行结果:
+--------+--------+-----+
|Embarked|Survived|count|
+--------+--------+-----+
| Q| 1| 30|
| S| 0| 427|
| S| 1| 219|
| C| 1| 93|
| Q| 0| 47|
| C| 0| 75|
+--------+--------+-----+
3.存活/未存活的男女数量及比例
// 3.存活/未存活的男女数量及比例
val survived_sex_count=df2.groupBy("Sex","Survived").count()
val survived_sex_percent=survived_sex_count.withColumn("percent",format_number(col("count").divide(functions.sum("count").over()).multiply(100),5));
survived_sex_percent.show()
survived_sex_percent.coalesce(1).write.option("header", "true").csv("data/practice1/survived_sex_percent.csv")
运行结果:
+------+--------+-----+--------+
| Sex|Survived|count| percent|
+------+--------+-----+--------+
| male| 0| 468|52.52525|
|female| 1| 233|26.15039|
|female| 0| 81| 9.09091|
| male| 1| 109|12.23345|
+------+--------+-----+--------+
4. 不同级别乘客生还人数和占总生还人数的比例
// 4. 不同级别乘客生还人数和占总生还人数的比例
val survived_df = df2.filter(col("Survived")===1)
val pclass_survived_count=survived_df.groupBy("Pclass").count()
val pclass_survived_percent=pclass_survived_count.withColumn("percent",format_number(col("count").divide(functions.sum("count").over()).multiply(100),5));
pclass_survived_percent.show()
pclass_survived_percent.coalesce(1).write.option("header", "true").csv("data/practice1/pclass_survived_percent.csv")
运行结果:
+------+-----+--------+
|Pclass|count| percent|
+------+-----+--------+
| 1| 136|39.76608|
| 3| 119|34.79532|
| 2| 87|25.43860|
+------+-----+--------+
5. 有无同行父母/孩子的生还情况
// 5.有无同行父母/孩子的生还情况
val df4=df2.withColumn("Parch_label",when(df2("Parch")>0,1).otherwise(0))
val parch_survived_count=df4.groupBy("Parch_label","Survived").count()
parch_survived_count.show()
parch_survived_count.coalesce(1).write.option("header", "true").csv("data/practice1/parch_survived_count.csv")
运行结果:
+-----------+--------+-----+
|Parch_label|Survived|count|
+-----------+--------+-----+
| 1| 0| 104|
| 1| 1| 109|
| 0| 0| 445|
| 0| 1| 233|
+-----------+--------+-----+
6.按照年龄,将乘客划分为未成年人、青年人、中年人和老年人,分析四个群体生还情况
// 6.按照年龄,将乘客划分为未成年人、青年人、中年人和老年人,分析四个群体生还情况
val df3=survived_df.withColumn("Age_label",when(df2("Age")<=18,"minor").when(df2("Age")>18 && df2("Age")<=35,"young").when(df2("Age")>35 && df2("Age")<=55,"middle").otherwise("older"))
val age_survived=df3.groupBy("Age_label","Survived").count()
age_survived.show()
age_survived.coalesce(1).write.option("header", "true").csv("data/practice1/age_survived.csv")
运行结果:
+---------+--------+-----+
|Age_label|Survived|count|
+---------+--------+-----+
| young| 1| 189|
| older| 1| 12|
| minor| 1| 70|
| middle| 1| 71|
+---------+--------+-----+
7. 提取乘客等级和上船费用信息
// 7.提取乘客等级和上船费用信息
val sef = Seq("Pclass", "Fare")
val df5 = df2.select(sef.head, sef.tail: _*)
df5.show(5)
df5.coalesce(1).write.option("header", "true").csv("data/practice1/pclass_fare.csv")
运行结果:
+------+-------+
|Pclass| Fare|
+------+-------+
| 3| 7.25|
| 1|71.2833|
| 3| 7.925|
| 1| 53.1|
| 3| 8.05|
+------+-------+
only showing top 5 rows
四、数据可视化
数据可视化部分打算在学完 R 语言再完成,Python 实现后续更新。
版权归原作者 让线程再跑一会 所有, 如有侵权,请联系我们删除。