文章目录
一、连接Spark集群
初始化一个Spark应用程序,配置其名称(以便在Spark Web UI中识别)和连接到指定的Spark集群(通过spark://192.168.126.10:7077)。创建SparkSession对象,作为Spark应用程序的单一入口点,提供了对Spark SQL和DataFrame API的访问。
代码如下:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import mean,max, col, length, substring, count, avg
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
conf = SparkConf().setAppName("PySpark的基本操作").setMaster('spark://192.168.126.10:7077')
sc = SparkContext.getOrCreate(conf)
spark = SparkSession(sc)
二、创建数据框DataFrame
创建PySpark DataFrame数据框。创建3张数据框(数据表),分别为:学生表(stu),课程信息表(cou),学生选课表(sc)。
学生表(stu)
课程信息表(cou)
学生选课表(sc)
代码如下:
# *********************创建数据框DataFrame*************************# 1.学生信息表
da =[('202301','张三','男','大一',20,1.72,'广西'),('202302','李四','女','大二',21,1.70,'广西'),('202303','王小五','男','大三',20,1.60,'湖南'),('202304','小马','男','大一',19,1.65,'四川'),('202305','小张','女','大二',None,1.68,'贵州'),('202306','小李','男','大三',22,1.68,'福建'),('202307','明明','女','大三',23,1.62,'广西'),('202308','王霆锋','女','大一',20,None,'四川'),('202303','王小五','男','大三',20,1.60,'湖南'),]
col =['学号','姓名','性别','年级','年龄','身高','籍贯']
stu = spark.createDataFrame(data=da, schema=col)# col = ['sno','name', 'sex', 'grade', 'age', 'height','birthplace']
stu.show()
stu.printSchema()# 查看表结构
stu.describe().show()# stu.summary().show()# 2.课程信息表
da2 =[('s01','大数据开发技术',32,2),('s02','数据库原理',48,3),('s03','机器学习',48,3)]
col2 =['课程号','课程名','学时','学分']
cou = spark.createDataFrame(data=da2, schema=col2)
cou.printSchema()
cou.show()# 3.学生选课表
da3 =[('202301','s01',89),('202301','s02',95),('202302','s02',98),('202303','s03',100),('202304','s01',96),('202305','s03',99),('202306','s03',94),('202307','s01',93),]
col3 =['学号','课程号','成绩']
schema = StructType([
StructField("学号", StringType(),True),# 学号字段是字符串类型,且可以为null
StructField("课程号", StringType(),True),
StructField("成绩", IntegerType(),True)])# 成绩字段是整数类型,且可以为null
sc = spark.createDataFrame(data=da3, schema=schema)
sc.printSchema()# 打印表结构
sc.show()
三、Spark SQL的基本用法
可以通过SQL方式操作Spark中的数据,需要先将DataFrame对象映射出一个表名,然后通过表名进行各类操作。如要对学生信息表(stu)操作,先将stu映射为student表,再操作。
代码如下:
print('*************SQL操作***********')
stu.createOrReplaceTempView("student")# 需要先将DataFrame对象stu映射出一个表名student,然后通过表名进行各类操作
stu2 = spark.sql("SELECT * FROM student")
stu2.show()
stu3 = spark.sql("SELECT `姓名`, `性别`, `年龄` FROM student")# 需要反引号(键盘左上角波浪线那个按键)来包围中文列名,英文列名不需要反引号。
stu3.show()
PySpark SQL可以对中文列名操作,但(键盘左上角波浪线那个按键)来包围中文列名,英文列名不需要反引号。
1. 一般查询及去重 (DISTINCT )
查询的命令和SQL语句是一样的。
学生表中,王小五有两条重复的记录,需要去重,使用DISTINCT 命令。
代码如下:
spark.sql("SELECT DISTINCT * FROM student ").show()# 去除重复值(记录)
spark.sql("SELECT DISTINCT `年级` FROM student").show()# 去除 年级 的重复值,可以看出有几个年级。
2. 条件查询(WHERE )
使用WHERE实现条件查询,结合LIKE实现模糊查询,结合REGEXP实现正则匹配查询。
# 2.条件查询
spark.sql("SELECT `姓名`, `性别`, `年龄` FROM student WHERE `年龄`>21").show()# 查询年龄大于21岁的学生
spark.sql("SELECT `姓名`, `性别`, `年龄` ,`籍贯` FROM student ""WHERE `籍贯`='广西' AND `性别`='女'").show()# 查询籍贯为广西的女生
spark.sql("SELECT `姓名` FROM student WHERE `姓名` LIKE '%小%'").show()# LIKE模糊查询,查询含“小”的姓名
spark.sql("SELECT * FROM student WHERE `姓名` REGEXP '^王.+(五|锋)$'").show()# 正则匹配查询,查询姓名第一个为王,最后一个为五或峰,且中间至少有一个字的学生
spark.sql("SELECT `姓名`, `性别`, `年龄` FROM student WHERE `年龄` IS NULL ").show()# 查询年龄为空值的学生
3. 聚合函数(aggregation function)
使用聚合函数(aggregation function),统计总和(sum)、平均数(mean)、最大值(max)、最小值(min)。聚合函数会对一组值进行统计并返回统计结果
# 3.聚合函数(aggregation function),统计总和、平均数、最大值、最小值。聚合函数会对一组值进行统计并返回统计结果
spark.sql("SELECT COUNT(*) AS `总人数` FROM student").show()# 统计学生总人数
spark.sql("SELECT MAX(`年龄`) AS `最大年龄`,AVG(`身高`) AS `平均身高`,""SUM(`年龄`) AS `总年龄` FROM student").show()# 统计年龄的最大值、总值,身高的平均值。
4. 分组 查询(GROUP BY)
在分组查询(GROUP BY)。除聚合函数外,SELECT语句中的每个列都必须在GROUP BY子句中给出。如:如果需要再查询结果中显示性别和年级,则GROUP BY子句必须给出’性别’和’年级’。如:spark.sql("SELECT
性别
,
年级
, COUNT(*) AS
人数
FROM student GROUP BY
性别
,
年级
").show() ,则查询结果会有性别和年级两列数据。
代码如下:
# 4.分组查询(GROUP BY)。除聚合函数外,SELECT语句中的每个列都必须在GROUP BY子句中给出。
spark.sql("SELECT `性别`, COUNT(*) AS `人数` FROM student GROUP BY `性别` ").show()# 查询不同性别的学生人数
spark.sql("SELECT `性别`, AVG(`年龄`) AS `平均年龄`,MAX(`身高`) AS `最高身高` ""FROM student ""GROUP BY `性别` ").show()# 按性别分组,求每组平均年龄,最高身高。# 对分组查询结果进行过滤。where分组前过滤,having分组后过滤
spark.sql("SELECT `性别`, COUNT(*) AS `人数` FROM student GROUP BY `性别` HAVING COUNT(*)>4").show()# 查询性别人数大于4的
where和having:都是对分组查询结果进行过滤。where分组前过滤,having分组后过滤
5.排序(ORDER BY)
使用排序(ORDER BY)排序,ASC升序,DESC降序。
# 5.排序(ORDER BY)。
spark.sql("SELECT * FROM student ORDER BY `年龄` ASC").show()# 按年龄升序ASC排序,DESC为降序
spark.sql("SELECT * FROM student ORDER BY `年龄` ASC, `身高` DESC").show()# 按年龄升序ASC排序,身高DESC降序
6. 多表联查
先用createOrReplaceTempView命令,把DataFrame对象映射出一个表名。
# 6.多表连接查询
cou.createOrReplaceTempView('course')# 将DataFrame对象映射出一个表名
sc.createOrReplaceTempView('stuCourse')# 将DataFrame对象映射出一个表名
spark.sql("SELECT student.`姓名`,stuCourse.`课程号`,stuCourse.`成绩` ""FROM student, stuCourse ""WHERE student.`学号`=stuCourse.`学号`").show()# 连表查询姓名,课程号,成绩
spark.sql("SELECT S.`姓名`,C.`课程名`,SC.`成绩` ""FROM student AS S,course AS C,stuCourse AS SC ""WHERE S.`学号`=SC.`学号` AND C.`课程号`=SC.`课程号`").show()# 连表查询姓名,课程名,成绩
7. udf函数
在Apache Spark中,UDF(User-Defined Function,用户自定义函数)允许扩展Spark SQL的功能,使其能够执行在标准SQL中不直接支持的复杂操作。UDF可以是用Scala、Java或Python等语言编写的函数,然后可以在Spark SQL查询中像调用内置函数一样调用它们。
先使用spark.udf.register注册UDF,然后再调用。
如下所示,如计算学生表姓名列的长度。先注册UDF函数,注册名为strLen,再使用UDF函数。
代码如下:
# 用udf函数
spark.udf.register("strLen",lambda x:len(x))# 注册udf函数,实现Spark SQL 自定义函数。计算某列元素的长度
spark.sql("SELECT *, strLen(`姓名`) AS len FROM student").show()# 计算姓名的长度,并新增一列len。
查询结果如下图:
四、DataFrame基本操作
除了用Spark SQL对DataFrame对象进行操作外,DataFrame自身也支持各类数据操作。
1.一般查询(select,selectExpr)
select()方法用于选择特定列生成新的DataFrame。如果一个DataFrame列字段太多,只需查看某些列内容,可以使用select()方法。
selectExpr()方法当作SQL查询,既可以用于查看指定列,还可以对选定的列进行特殊处理,如改列名、取绝对值、四舍五入等,最终返回新的DataFrame。
# 1.一般查询(select,selectExpr),其中,selectExpr可对列进行特殊处理,如列相加、计算、绝对值、四舍五入。它被当作SQL查询
stu2 = stu.select('姓名','性别','年龄')# 查询学生的姓名,性别,年龄
stu2.show()
stu.selectExpr("`姓名` ","ROUND( `身高`,1) AS RoundedHeight").show()# selectExpr被当作SQL查询。取身高的四舍五入,保留1位小数。
stu.selectExpr("`姓名`","`年龄`","`年龄`+10 AS AddAge").show()# 年龄增加10岁。
2. 条件查询(where,filter)
根据指定条件筛选数据,where()和filter()方法都可以用于条件查询。与select()方法、contains()方法或正则匹配查询方法rlike()结合,可实现更复杂的查询功能。
# 2. 条件查询。where,filter,效果一样。
stu.select('姓名','性别','年龄').where(stu['年龄']>21).show()# 查询年龄大于21岁的学生
stu.select("*").where((stu['籍贯']=='广西')&(stu.性别 =='女')).show()# 查询籍贯为广西的女生信息
stu.where(stu["姓名"].contains("小")).show()# 查询含“小”的姓名
stu.where(stu["姓名"].rlike(r'^王.+(五|锋)$')).show()# 正则匹配查询,查询姓名第一个为王,最后一个为五或峰,且中间至少有一个字的学生
3. 聚合(agg)
agg()方法用于聚合操作,用于部分列的统计,也可以与groupBy()方法组合使用,从而实现分组统计的功能。
用agg()方法做聚合时,常用的统计方法有:mean()、max()、min()、sum()等,这些方法来自pyspark.sql.functions类,需要先导入。
alias()方法为重命名列方法。
# 3. 聚合(agg)
stu.agg({'姓名':'count'}).show()# 统计学生人数
stu.agg(count('姓名').alias('学生人数')).show()# 统计学生人数,并重命名为:学生人数
stu.agg({'年龄':'max','身高':'avg'}).show()# 统计年龄的最大值,身高的平均值。
stu.agg(max('年龄').alias('最大年龄'), avg('身高').alias('平均身高')).show()# 求最大年龄,平均身高,并重命名列名
4. 分组(groupBy)
groupBy()方法可以根据指定的字段进行分组,在groupBy()方法之后,通常使用统计方法进行计算,如:count()(总和,仅用于数值型字段),mean()、max()、min()、sum()等。
# 4. 分组(groupBy),根据指定的字段分组
stu.groupBy('性别').count().show()# 查询不同性别的学生人数。求每组的总数
stu.groupBy('性别').mean('年龄').show()# 按性别分组,求每组平均年龄# groupBy与agg组合使用,实现分组统计功能。可以重命名统计的列名
stu.groupBy('性别').agg(mean('年龄').alias('平均年龄')).show()# 按性别分组,求每组平均年龄,并重命名列名为平均年龄
stu.groupBy('性别').agg(mean('年龄').alias('平均年龄'),max('身高').alias('最高身高')).show()# 按性别分组,求每组平均年龄,最高身高。
5. 排序(orderBy)
# 5.排序(orderBy)
stu.orderBy(stu.年龄.asc()).show()# 按照年龄升序排序
stu.orderBy(stu.年龄.asc(), stu['身高'].desc()).show()# 按照年龄升序排序,如果年龄相同,再按照身高降序排序
6. 数据去重(distinct)
去重是数据预处理中的重要环节。
**distinct()**方法用于删除重复行,返回不包含重复记录的DataFrame。可以结合count()方法来判断数据是否有重复。
dropDuplicates()方法可以根据指定的字段进行去重操作。
# 6.数据去重print('所有列名:', stu.columns)# 获取所有列名# 去重。distinct,dropDuplicates(可指定字段去重)print("去重前的数据行数:", stu.count())
stu.distinct().show()# 去除重复记录print("去重后的数据行数:", stu.distinct().count())
stu.dropDuplicates(['籍贯']).select('籍贯').show()# 去除 籍贯相同的记录,并选择查看籍贯,可以看出学生来自哪些省份。
7. 缺失值处理(dropna,fillna)
缺失值指的是现有数据集中某个或某些属性的值是不完全的,存在空缺。处理缺失值是数据预处理时必不可少的环节。处理缺失值的方法有:删除、填充等。
**dropna()**方法可以删除含有缺失值的记录。只要某行记录有一个缺失值,则该行记录被删除。
**fillna()**方法可以用其他值填充缺失值。
先用summary()方法查看数据的基本信息,如下图所示。发现年龄和身高两列只有8条记录,比其他列的9条记录少,存在缺失值。
代码如下:
# 方法1:删除缺失值
stu.summary().show()# 查看表数据的基本信息。发现年龄和身高只有8条记录,而其他列为9条。因此年龄和身高各有1个缺失值。print(stu.年龄.isNull())# 判断年龄是否有缺失值。
stu.dropna().show()# 删除缺失值,查看数据。删除含有缺失值的记录# 方法2:填充缺失值
avg_age = stu.agg(mean("年龄")).collect()[0][0]# 先求年龄的平均值
max_height = stu.agg(max("身高")).collect()[0][0]# 先求身高的最大值
stu.fillna({'年龄': avg_age,'身高': max_height}).show()# 用年龄平均值、身高最大值填充对应的列的缺失值。
上面的代码中,collect()方法返回一个列表,用于获取DataFrame的所有记录,并将DataFrame中每行的数据以Row形式完整地展示出来。如:stu.agg(mean(“年龄”)).collect(),返回如下数据:
**collect()**:返回的 DataFrame 只包含一行一列(即“年龄”的平均值),所以 collect() 方法将返回一个包含单个元素的列表,这个元素本身也是一个列表(或 Row 对象),它包含了平均值。
[0][0]: 这是一个索引操作,用于从 collect() 方法返回的列表中提取数据。[0] 提取了列表中的第一个(也是唯一一个)元素(它本身也是一个列表或 Row 对象),然后 [0] 再次被用来从这个内部列表中提取第一个(也是唯一一个)元素,即“年龄”的平均值。
用聚合函数agg()和collect()方法计算出平均年龄和最高身高 后,再用fillna()函数填充相应的缺失值。
8. 新增列(withColumn),重命名列(withColumnRenamed)
# 7. 新增列(withColumn)、重命名列(withColumnRenamed)
stu2 = stu.withColumn('AddAge', stu['年龄']+10)# 新增一列:AddAge,年龄+10
stu2.show()
stu2.withColumnRenamed('AddAge','新增年龄').show()# 将AddAge列名改为:新增年龄
9. udf函数
与Spark SQL注册udf函数有所不同,请注意区分。
from pyspark.sql.functions import udf
# 创建一个 UDF,它接受一个字符串并返回其长度(整数)
mystrlen = udf(lambda x:len(x)if x isnotNoneelse0, IntegerType())# 使用 UDF 在 stu DataFrame 上添加一个新列 'len',该列包含 '姓名' 列中每个字符串的长度
stu.withColumn("len", mystrlen(stu['姓名'])).show()
10. 其他常用方法(describe、printSchema)
describe():统计DataFrame数值型字段的信息,包括记录条数、平均值、样本标准差、最小值、最大值等。
printSchema():以树状格式输出DataFrame的模式信息,输出结果中有DataFrame的列名称、数据类型以及该数据字段的值是否可以为空。
# 统计DataFrame数值型字段的信息,包括记录条数、平均值、样本标准差、最小值、最大值等。
stu.describe().show()# 以树状格式输出DataFrame的模式信息,输出结果中有DataFrame的列名称、数据类型以及该数据字段的值是否可以为空。
stu.printSchema()
输出结果如下图所示:
五、 多表连接查询(join)
1. 内连接:inner
内连接:inner,把两张表中互相匹配的行选择出来
# 学生信息表:stu,课程信息表:cou,学生选课表:sc
stu.join(sc, stu.学号 == sc.学号) \
.select(stu.姓名, sc.课程号, sc['成绩']).show()# 连接学生信息表:stu和学生选课表:sc。 查询姓名,课程号,成绩# 三表连接
stu2 = stu.join(sc, stu.学号 == sc.学号,'inner')# 1.先连接stu表和sc表。默认内连接inner
stu3 = stu2.join(cou, stu2.课程号 == cou.课程号)# 2.再连接cou表
stu3.select('姓名','课程名','成绩').show()# 3.查询学生 姓名,课程名,成绩
三表内连接后的数据如下:
2. 外连接
full/full_outer:这种join就是把两张表的所有记录选择出来,如果一张表里有对应数据,另一张表里没有对应数据,就用NULL代替。
left/left_outer:这种join就是把左边的表的所有行都取出来,如果右边表有匹配的行,就用匹配的行,如果右边表没有匹配的行,就用NULL代替。
right/right_outer:这种join就是把右边的表的所有行都取出来,如果左边表有匹配的行,就用匹配的行,如果左边表没有匹配的行,就用NULL代替。
left_semi:这种join就是把左边表中能和右表中的行匹配的行取出来,只取左边表的记录。
left_anti:这种join是把左边表中不能和右边表中的行匹配的行取出来,也是只取左边表中的行。
cross:这种join就是把左边表中的所有行和右边表的所有行做乘积,相当于左边表中的每一行都和右边表中的所有行组合一次,即左边表×右边表。
六、小案例:分析哪种水果季节性最强
有四张水果销售数据表,分别是春季、夏季、秋季和冬季的水果销售情况。先对每个季度水果的销售量排名,然后取出每个季度销量最高的前3种水果做成一张表,分析这些水果中,哪种水果的季节性最强。所谓季节性,即有的季节销量非常高,而有的季节销量非常低。可以先计算四个季度之间的排名差,排名差最大的水果,就是季节性最强的水果。
1.创建春季、夏季、秋季和冬季的水果销量表
如下图所示:
代码如下:
# -----------------小案例:计算哪种水果是季节性最强的----------------------------------------from pyspark.sql import Window
from pyspark.sql.functions import*# 导入 functions的所有函数,包括:lit,col,min,max等
data =[('香蕉',90,6),('苹果',50,8),('雪梨',80,7),('葡萄',150,4),('龙眼',100,5),('荔枝',200,1),('西瓜',180,2),('榴莲',170,3),('蓝莓',15,9),('草莓',13,10),('橙子',12,11)]# 创建数据
spring = spark.createDataFrame(data,['水果','销量','排名'])# spring,summer,autumn,winter
spring = spring.withColumn("季节", lit("spring"))# 增加一列:季节
spring.show()
data =[('香蕉',70,5),('苹果',40,8),('雪梨',60,6),('葡萄',50,7),('龙眼',150,3),('荔枝',160,2),('西瓜',200,1),('榴莲',100,4),('蓝莓',15,9),('草莓',13,10),('橙子',12,11)]# 创建数据
summer = spark.createDataFrame(data,['水果','销量','排名'])# spring,summer,autumn,winter
summer = summer.withColumn("季节", lit("summer"))
summer.show()
data =[('香蕉',40),('苹果',150),('雪梨',120),('葡萄',60),('龙眼',20),('荔枝',30),('西瓜',50),('榴莲',35),('蓝莓',15),('草莓',13),('橙子',12)]
autumn = spark.createDataFrame(data,['水果','销量'])import pyspark.sql.functions as F
# windowSpec = Window.orderBy(autumn['销量'].desc()) # 创建一个 WindowSpec 对象,按销量降序排序。全局排序,会有警告信息提示没有分区
windowSpec = Window.partitionBy(F.lit(1)).orderBy(autumn['销量'].desc())# 按水果分区,可以均衡集群计算负载,避免警告信息
autumn = autumn.withColumn("排名", row_number().over(windowSpec))# 使用 row_number() 函数在窗口上生成排名,并添加到新的列 '排名'
autumn = autumn.withColumn("季节", lit("autumn"))
autumn.show()
data =[('香蕉',30),('苹果',200),('雪梨',182),('葡萄',50),('龙眼',20),('荔枝',20),('西瓜',30),('榴莲',35),('蓝莓',15),('草莓',13),('橙子',12)]
winter = spark.createDataFrame(data,['水果','销量'])
windowSpec = Window.partitionBy(F.lit(1)).orderBy(winter['销量'].desc())# 使用 F.lit(1) 创建一个常量分区以及WindowSpec 对象。也可以消除警告
winter = winter.withColumn("排名", dense_rank().over(windowSpec))# dense_rank()在遇到相同值时给出相同的排名,
winter = winter.withColumn("季节", lit("winter"))
winter.show()# 合并四个季节的数据,计算全年每类水果的总销量
all_seasons = spring.union(summer).union(autumn).union(winter)# 合并所有季节的DataFrame
all_seasons.show()
full_year = all_seasons.groupBy("水果").agg(F.sum("销量").alias("总销量"))# 按水果分组,计算每类水果的总销量
full_year.show()
windowSpec = Window.partitionBy(F.lit(1)).orderBy(full_year['总销量'].desc())# 创建一个WindowSpec对象,对整个DataFrame进行排序
full_year = full_year.withColumn("排名", dense_rank().over(windowSpec))# 使用dense_rank()进行排名,并添加到新的列'销量排名'
full_year.show()
2.提取每个季节销量排名前3的水果名字
提取每个季节销量排名前3的水果名字,并将它们放在一张表top_fruits 中。结果如下图。
代码如下:
# 提取每个季节销量排名前3的水果名字,并将它们合并放在一起。
top_spring = spring.filter(spring["排名"]<=3).select("水果")
top_summer = summer.filter(summer['排名']<=3).select("水果")
top_autumn = autumn.filter(autumn['排名']<=3).select("水果")
top_winter = winter.filter(winter['排名']<=3).select("水果")
top_winter.show()
top_fruits = top_spring.union(top_summer).union(top_autumn).union(top_winter).distinct()# 使用union合并结果,去除重复项
top_fruits.show()
3. 春夏秋冬销量表左连接表top_fruits
将春季、夏季、秋季和冬季销量表,左连接排名前3的水果表top_fruits ,用于计算四个季度的排名差。需要注意,数据表连接后,有很多列名是重复的,因此连接时需要为每张表指定一个别名,否则连接后无法选择列。
连接表后,重命名列名,并选择相关的列,即选择每个季度的排名列。
代码如下:
# 外连接:左连接4张表,需要为数据表指定一个别名,否则连接后无法选择列
left_fruits = top_fruits.alias("t") \
.join(spring.alias("s"), col('t.水果')== col('s.水果'), how='left_outer') \
.join(summer.alias('su'), col('t.水果')== col('su.水果'), how='left_outer') \
.join(autumn.alias('a'), col('t.水果')== col('a.水果'), how='left_outer') \
.join(winter.alias('w'), col('t.水果')== col('w.水果'), how='left_outer')# 连接后有很多重复的列
left_fruits.show()
left_fruits = left_fruits.select(col('t.水果').alias('水果'),
col('s.排名').alias('spring_排名'), col('s.销量').alias('spring_销量'),
col('su.排名').alias('summer_排名'), col('su.销量').alias('summer_销量'),
col('a.排名').alias('autumn_排名'), col('a.销量').alias('autumn_销量'),
col('w.排名').alias('winter_排名'), col('w.销量').alias('winter_销量'))# 重命名相关列
left_fruits_sel = left_fruits.select('水果','spring_排名','summer_排名','autumn_排名','winter_排名')
left_fruits_sel.show()
4.计算每类水果的季度排名差
先计算每类水果的季度排名差,然后计算排名差的最大值最小值,最后按照排名差最大值降序排序。
从结果可以看出,苹果和荔枝的季度排名差分别为7和6,名列排名差的第一和第二位,它们是季节性最强的水果。实际上,大部分水果都是季节性产品,本案例仅为说明外连接的编程思路,分析结果无实际意义。
代码如下:
# 计算每类水果各个季度排名差的最大值和最小值
left_fruits_diffs = left_fruits_sel.withColumn("min_diff",
F.least(
F.abs(F.col("spring_排名")- F.col("summer_排名")),
F.abs(F.col("spring_排名")- F.col("autumn_排名")),
F.abs(F.col("spring_排名")- F.col("winter_排名")),
F.abs(F.col("summer_排名")- F.col("autumn_排名")),
F.abs(F.col("summer_排名")- F.col("winter_排名")),
F.abs(F.col("autumn_排名")- F.col("winter_排名")))).withColumn("max_diff",
F.greatest(
F.abs(F.col("spring_排名")- F.col("summer_排名")),
F.abs(F.col("spring_排名")- F.col("autumn_排名")),
F.abs(F.col("spring_排名")- F.col("winter_排名")),
F.abs(F.col("summer_排名")- F.col("autumn_排名")),
F.abs(F.col("summer_排名")- F.col("winter_排名")),
F.abs(F.col("autumn_排名")- F.col("winter_排名"))))
left_fruits_diffs.orderBy('max_diff', ascending=False).show()# 排名差降序排序
5. 提取季节性最强的前2类水果
选择季节性最强的前2类水果,提取它们四个季度的销量情况,用于后续的可视化绘图。
代码如下:
# 取季节性最强的前2类水果,提取它们四个季度的销量情况,用于后续的可视化绘图。
top_diff = left_fruits_diffs.orderBy('max_diff', ascending=False).limit(2)# 取前两行数据
top_diff.show()# 可以看出,苹果和荔枝的季节排名差最大,也就是它们是季节性最强的水果。
top_diff = top_diff.select("水果")# collect_data=top_diff.collect()# 收集数据到驱动程序(只调用一次)。或者用topandas将pyspark dataframe转为pandas dataframeprint(top_diff.collect())print(top_diff.collect()[0])# 第1行数据print(top_diff.collect()[0][0])# 第1行数据的值:苹果print(top_diff.collect()[1][0])# 第2行数据的值:荔枝
df5 = left_fruits.select("水果",'spring_销量','summer_销量','autumn_销量','winter_销量') \
.where(col('水果').isin(['苹果', top_diff.collect()[1][0]]))# 提取 苹果、荔枝 四个季度的销量情况
df5.show()
版权归原作者 侧耳倾听童话 所有, 如有侵权,请联系我们删除。