0


(五)Spark大数据开发实战:灵活运用PySpark常用DataFrame API

一、PySpark

Apache Spark是一个用于大数据处理的开源分布式计算框架,而PySpark则是Spark的Python 实现。PySpark允许使用Python编程语言来利用Spark的强大功能,使得开发人员能够利用Python的易用性和灵活性进行大规模数据处理和分析。

PySpark与Spark-Scala的对比:

1、语言选择:
PySpark: 使用简洁而易学的Python作为编程语言,这使得PySpark学习难度大大降低。
Spark-Scala: 使用Scala作为主要编程语言。Scala是一门运行在Java虚拟机上的多范式编程语言,更接近Java,并具有强大的面向对象和函数式编程特性,但是其学习曲线较陡。

2、性能:
PySpark:由于Python是解释型语言,相比Scala的原生Spark可能会有性能上的一些损失。但通过PySpark的DataFrame和优化技术也可以显著提高性能。
Spark-Scala:使用Scala编写的Spark应用程序可能在性能上略优于PySpark,因为Scala是一门静态类型语言,而且运行在Java虚拟机上。

4、生态系统支持:
PySpark:可与Python的生态系统(如NumPy、Pandas)以及其他大数据工具和库进行集成。
Spark-Scala:由于运行在JVM上,可以利用Java生态系统,但Scala本身的生态系统相对较小。

5、机器学习支持:
PySpark: 提供了MLlib库,支持在分布式环境中进行大规模的机器学习任务。
Spark-Scala: 同样支持MLlib,但在API的使用上可能相对繁琐一些。

总体而言,PySpark强于数据分析,Spark-Scala强于性能。如果应用场景有非常多的可视化和机器学习算法需求,推荐使用pyspark,可以更好地和python中的相关库配合使用。

本文软件环境如下:

操作系统:CentOS Linux 7

Hadoop版本:3.1.3,安装教程可见我另一篇博客内容:Linux CentOS安装Hadoop3.1.3(单机版)详细教程

Spark版本:3.5.2,安装教程可见我另一篇博客内容:Linux CentOS安装PySpark3.5(单机版)详细教程及机器学习实战

Python版本:Python(Anaconda)3.11.4

PySpark基础学习可看 PySpark系列文章:

(一)PySpark3:安装教程及RDD编程

(二)PySpark3:SparkSQL编程

(三)PySpark3:SparkSQL40题

(四)PySpark3:Mlib机器学习实战-信用卡交易数据异常检测

二、数据介绍

本文数据来自采集豆瓣网分类排行榜 (“https://movie.douban.com/chart”)中各分类类别所有电影的相关信息并存储为csv文件。

爬虫代码在我另一篇博客:豆瓣电影信息爬取与可视化分析

数据放在了百度云上:https://pan.baidu.com/s/1YWB2iEOsMmXHkEUFpY2_TA?pwd=ej3z

数据如下图所示,包含电影名、上映日期、上映地区、类型、豆瓣链接、参演演员、演员数、评分、打分人数,共有3357部电影:

三、PySpark大数据开发实战

1、数据文件上传HDFS

首先通过xftp上传linux服务器,然后通过以下命令上传至HDFS:

  1. hdfs dfs -mkdir /data
  2. hdfs dfs -mkdir /output
  3. hdfs dfs -put film_info.csv /data

2、导入模块及数据

使用SparkSession.builder.config(conf=SparkConf()).getOrCreate()创建Spark会话。使用spark.read.csv()读取CSV文件,并设置header=True以识别首行为列名,inferSchema=True自动推断数据类型。

  1. from pyspark import SparkConf, SparkContext
  2. from pyspark.sql import Row
  3. from pyspark.sql.types import *
  4. from pyspark.sql import SparkSession
  5. from datetime import datetime
  6. import pyspark.sql.functions as F
  7. from pyspark.sql.window import Window
  8. # 主程序:
  9. spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
  10. df = spark.read.csv("/data/film_info.csv", header=True, inferSchema=True)

3、数据统计与分析

①、计算演员参演电影数

以下代码中使用了spark sql进行统计,也可以通过DataFrame API进行统计:

  1. df.groupBy("actor").agg(count("title").alias("act_film_num"))
  1. # 按分隔符切分列表
  2. df_split = df.withColumn("actors", F.split(df["actors"], "\|")) \
  3. .withColumn("types", F.split(df["types"], "\|")) \
  4. .withColumn("regions", F.split(df["regions"], "\|"))
  5. # 演员:拆分多行
  6. df_exploded = df_split.withColumn("actor", F.explode(F.col("actors")))
  7. df_exploded.drop(*["actors", "regions", "types"]).createOrReplaceTempView("actor_exploded")
  8. df1 = spark.sql('''
  9. select actor,
  10. count(*) as act_film_num
  11. from actor_exploded
  12. group by actor
  13. ''')
  14. df1.sort(df1["act_film_num"].desc()).repartition(1).write.mode("overwrite").option("header", "true").csv(
  15. "/output/result1.csv")

结果如下:

  1. +-------------+------------+
  2. | actor|act_film_num|
  3. +-------------+------------+
  4. | 童自荣| 43|
  5. | 户田惠子| 37|
  6. | 林雪| 33|
  7. | 张国荣| 32|
  8. | 刘德华| 31|
  9. | 周星驰| 31|
  10. | 成龙| 31|
  11. | 任达华| 31|
  12. | 刘洵| 30|
  13. |塞缪尔·杰克逊| 29|
  14. | 汤姆·汉克斯| 29|
  15. | 梁家辉| 28|
  16. | 吴孟达| 28|
  17. | 梁朝伟| 27|
  18. | 斯坦·李| 27|
  19. | 吴君如| 27|
  20. | 威廉·达福| 27|
  21. | 黄秋生| 27|
  22. | 胡立成| 27|
  23. | 布拉德·皮特| 26|
  24. +-------------+------------+
  25. only showing top 20 rows
②、依次罗列电影番位前十的演员

这一题考察了窗口函数、行转列等等。

  1. # 定义窗口函数,按电影标题和演员顺序排序
  2. windowSpec = Window.partitionBy("title").orderBy("actors")
  3. # 添加序号列
  4. df2 = df_exploded.withColumn("rank", F.row_number().over(windowSpec))
  5. # 过滤出前10个演员
  6. rank_num = 10
  7. rank_num_list = [str(i + 1) for i in range(rank_num)]
  8. # 将演员重新组合成单行
  9. df2_tmp1 = df2.groupBy("title").pivot("rank", rank_num_list).agg(F.collect_list("actor"))
  10. df2_tmp2 = df2_tmp1.select("title", *[F.col(f"{i + 1}")[0].alias(f"actor{i + 1}") for i in range(rank_num)])
  11. df2_tmp2.repartition(1).write.mode("overwrite").option("header", "true").csv("/output/result2.csv")

结果如下:

  1. +------------------------+-------------------+---------------------+------------------+---------------+-----------------+----------------------+-------------------+---------------------+-----------------+-----------------+
  2. | title| actor1| actor2| actor3| actor4| actor5| actor6| actor7| actor8| actor9| actor10|
  3. +------------------------+-------------------+---------------------+------------------+---------------+-----------------+----------------------+-------------------+---------------------+-----------------+-----------------+
  4. | 101忠狗| 罗德·泰勒| 凯特·鲍尔| 本·怀特| 丽莎·戴维斯| 贝蒂·洛乌·格尔森| J·帕特·奥马利| 玛莎·温特沃思| 大卫·弗兰克海姆|弗莱德里克·沃洛克| 汤姆·康威|
  5. | 11:14| 亨利·托马斯| 布莱克·赫伦| 芭芭拉·赫希| 克拉克·格雷格| 希拉里·斯万克| 肖恩·海托西| 斯塔克·桑德斯| 科林·汉克斯| 本·福斯特| 帕特里克·斯威兹|
  6. | 2012| 约翰·库萨克| 阿曼达·皮特| 切瓦特·埃加福| 坦迪·牛顿| 奥利弗·普莱特| 汤姆·麦卡锡| 伍迪·哈里森| 丹尼·格洛弗| 连姆·詹姆斯| 摩根·莉莉|
  7. | 2046| 梁朝伟| 章子怡| 王菲| 木村拓哉| 巩俐| 刘嘉玲| 张震| 张曼玉| 董洁| 通猜·麦金泰|
  8. | 21克| 西恩·潘| 娜奥米·沃茨|本尼西奥·德尔·托罗| 夏洛特·甘斯布| 梅丽莎·里奥| 迈克尔·芬内尔|

本文转载自: https://blog.csdn.net/weixin_44458771/article/details/143397463
版权归原作者 唯余木叶下弦声 所有, 如有侵权,请联系我们删除。

“(五)Spark大数据开发实战:灵活运用PySpark常用DataFrame API”的评论:

还没有评论