0


PySpark 本地开发环境搭建与实践

Spark 的Standalone集群环境安装与测试-CSDN博客


  1. 在大数据处理领域,PySpark 作为一个强大的工具,为数据科学家和开发人员提供了便捷的方式来处理大规模数据。本文将详细介绍如何在 Windows 环境下搭建 PySpark 本地开发环境,并深入探讨在这个环境下的代码编写、案例实践、程序监控以及一些常见问题的处理,帮助读者快速上手 PySpark 本地开发。

一、PySpark 本地开发环境搭建

(一)Windows 本地 JDK 和 Hadoop 的安装

  1. JDKJava Development Kit)是 Java 开发的基础,而 Hadoop 是处理大数据的重要框架。在 Windows 上安装它们是后续搭建 PySpark 环境的第一步。安装过程需要注意选择合适的版本,并按照安装向导进行操作,确保安装路径等设置正确。

JDK安装配置教程_jdk64位安装-CSDN博客

Windows 系统安装 Hadoop 详细教程-CSDN博客

(二)Windows 安装 Anaconda

  1. Anaconda 是一个开源的 Python 发行版本,它包含了 condaPython 180 多个科学包及其依赖项。右键以管理员身份运行安装程序,默认安装到了 ProgramData 文件夹(这是一个非空文件夹)。在安装过程中,要注意各种安装选项,确保安装顺利进行。Anaconda 的安装为后续在其环境中安装 PySpark 等相关包提供了基础。

通过网盘分享Miniconda3的:Miniconda3-py38_4.11.0-Windows-x86_64.exe

(三)Anaconda 中安装 PySpark

  1. 在命令提示符(cmd)中进行操作。在安装过程中,如果遇到需要输入 y 或者 n 的情况,输入 y。安装完成后,可以通过
  1. conda list

或者

  1. pip list

检查是否包含

  1. py4j

  1. pyspark

两个包。PySpark 的安装路径在 $ANACONDA_HOME/Lib/site - packages。这里需要强调的是,这实际上是在本地安装一个 Spark 软件,如果没有 Spark 环境,仅仅安装了 PySpark 是无法运行 Spark 代码的。

(四)Pycharm 中创建工程

  1. 选择 Conda:在 Pycharm 中创建工程时,选择 Conda。直接点确定即可。因为 Anaconda 包含了 Python 并且可以安装各种环境,比如 pyspark,通过这种关联,Pycharm 可以使用 Anaconda 中的工具。
  2. 解决识别问题:如果 Anaconda 没有安装在 C 盘,可能会出现识别不了的情况。此时需要手动选择。
  3. 检查安装包中是否有相关软件,并验证选择的解释器是否正确。
  4. 创建文件夹,为后续代码编写做好准备。

main :用于存放每天开发的一些代码文件
resources :用于存放程序中需要用到的配置文件
datas :用于存放每天用到的一些数据文件
test :用于存放测试时的一些代码文件

二、编写代码

(一)编写环境变量的代码

  1. 环境变量的设置对于 PySpark 程序的运行至关重要。它确保程序能够找到所需的资源和配置。在代码中,要正确地设置与 Spark 相关的环境变量,包括 Spark 的安装路径、配置文件路径等。
  1. import os
  2. if __name__ == '__main__':
  3. # 你自己的JDK路径
  4. os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'
  5. # 配置Hadoop的路径,就是前面解压的那个路径
  6. os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
  7. # 配置base环境Python解析器的路径
  8. os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
  9. # 配置base环境Python解析器的路径
  10. os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

(二)获取 SparkContext 对象

  1. SparkContext Spark 中的核心类,任何一个 Spark 的程序都必须包含一个 SparkContext 类的对象。通过获取这个对象,我们可以进一步构建和执行 Spark 任务。例如:
  1. import os
  2. # 导入pyspark模块
  3. from pyspark import SparkContext,SparkConf
  4. if __name__ == '__main__':
  5. # 配置环境
  6. os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'
  7. # 配置Hadoop的路径,就是前面解压的那个路径
  8. os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
  9. # 配置base环境Python解析器的路径
  10. os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径
  11. os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
  12. # 获取 conf 对象
  13. conf = SparkConf().setMaster("local[*]").setAppName("第一个Spark程序")
  14. # 假如我想设置压缩
  15. # conf.set("spark.eventLog.compression.codec","snappy")
  16. # 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
  17. sc = SparkContext(conf=conf)
  18. print(sc)
  19. # 使用完后,记得关闭
  20. sc.stop()

(三)将代码模板化

  1. 创建一个名为
  1. pyspark_local_script

的模板,并在其中添加必要的内容。模板化代码有助于提高代码的复用性和规范性。在模板中,可以将一些常用的代码结构和函数定义好,方便在不同的项目中使用。

完整的模板:记得给模板起个名字pyspark_local_script
  1. import os
  2. # 导入pyspark模块
  3. from pyspark import SparkContext,SparkConf
  4. """
  5. ------------------------------------------
  6. Description : TODO:
  7. SourceFile : ${NAME}
  8. Author : ${USER}
  9. Date : ${DATE}
  10. -------------------------------------------
  11. """
  12. if __name__ == '__main__':
  13. # 配置环境
  14. os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'
  15. # 配置Hadoop的路径,就是前面解压的那个路径
  16. os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
  17. # 配置base环境Python解析器的路径
  18. os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径
  19. os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
  20. # 获取 conf 对象
  21. conf = SparkConf().setMaster("local[*]").setAppName("第一个Spark程序")
  22. # 假如我想设置压缩
  23. # conf.set("spark.eventLog.compression.codec","snappy")
  24. # 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
  25. sc = SparkContext(conf=conf)
  26. print(sc)
  27. # 使用完后,记得关闭
  28. sc.stop()

模板的使用:

三、本地开发案例

(一)WordCount 案例

代码编写:这是一个经典的大数据处理案例。通过读取文本文件,将其中的单词进行拆分、计数。代码实现如下:

  1. import os
  2. # 导入pyspark模块
  3. from pyspark import SparkContext, SparkConf
  4. if __name__ == '__main__':
  5. # 配置环境
  6. os.environ['JAVA_HOME'] = 'C:/Program Files/java/jdk1.8.0_181'
  7. # 配置Hadoop的路径,就是前面解压的那个路径
  8. os.environ['HADOOP_HOME'] = 'D:/Linux/hadoop/hadoop-3.3.1'
  9. # 配置base环境Python解析器的路径
  10. os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径
  11. os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
  12. # 获取 conf 对象
  13. # setMaster 按照什么模式运行,local bigdata01:7077 yarn
  14. # local[2] 使用2核CPU * 你本地资源有多少核就用多少核
  15. # appName 任务的名字
  16. conf = SparkConf().setMaster("local[*]").setAppName("第一个Spark程序")
  17. # 假如我想设置压缩
  18. # conf.set("spark.eventLog.compression.codec","snappy")
  19. # 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
  20. sc = SparkContext(conf=conf)
  21. print(sc)
  22. fileRdd = sc.textFile("../../datas/WordCount/data.txt")
  23. rsRdd = fileRdd.filter(lambda x: len(x) > 0) \
  24. .flatMap(lambda line: line.strip().split()) \
  25. .map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
  26. rsRdd.saveAsTextFile("../datas/WordCount/result2")
  27. # 使用完后,记得关闭
  28. sc.stop()
  29. # 注意,在cmd窗口中执行spark-submit命令时,需要将以下路径添加到环境变量中

建议安装一个工具psutil,如果不想看到就在 Python解释器中安装:pip install psutil

查看运行结果:运行代码后,可以在指定的输出路径中查看结果文件。结果文件中包含了每个单词及其出现的次数。

常见的其他错误:

(二)使用正则解决特殊分隔符问题

  1. 在实际数据处理中,可能会遇到特殊的分隔符。这时可以使用正则表达式来改造 WordCount 代码。例如,如果数据是用特定的非空格字符分隔的,可以通过修改
  1. flatMap

函数中的分隔逻辑,使用正则表达式来正确拆分单词。

  1. import os
  2. import re
  3. # 导入pyspark模块
  4. from pyspark import SparkContext, SparkConf
  5. if __name__ == '__main__':
  6. # 配置环境
  7. os.environ['JAVA_HOME'] = 'C:/Program Files/java/jdk1.8.0_181'
  8. # 配置Hadoop的路径,就是前面解压的那个路径
  9. os.environ['HADOOP_HOME'] = 'D:/Linux/hadoop/hadoop-3.3.1'
  10. # 配置base环境Python解析器的路径
  11. os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径
  12. os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
  13. # 获取 conf 对象
  14. # setMaster 按照什么模式运行,local bigdata01:7077 yarn
  15. # local[2] 使用2核CPU * 你本地资源有多少核就用多少核
  16. # appName 任务的名字
  17. conf = SparkConf().setMaster("local[*]").setAppName("")
  18. # 假如我想设置压缩
  19. # conf.set("spark.eventLog.compression.codec","snappy")
  20. # 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
  21. sc = SparkContext(conf=conf)
  22. print(sc)
  23. fileRdd = sc.textFile("../../datas/WordCount/data.txt")
  24. rsRdd = fileRdd.filter(lambda x: len(x) > 0) \
  25. .flatMap(lambda line: re.split("\\s+", line.strip())) \
  26. .map(lambda word: (word, 1)) \
  27. .reduceByKey(lambda sum, num: sum+num)
  28. rsRdd.saveAsTextFile("../datas/WordCount/result3")
  29. # 使用完后,记得关闭
  30. sc.stop()

(三)本地开发 - 读取 hdfs 上的数据

  1. Windows 环境下,用户通常没有权限访问 hdfs 文件系统。这需要进行一些额外的配置,比如配置 Hadoop 的相关权限,或者通过一些代理工具来实现访问。在代码中,要正确设置 Hadoop 的配置参数,以确保能够读取 hdfs 上的数据。
  1. fileRdd = sc.textFile("hdfs://bigdata01:9820/spark/wordcount/input/*")
  2. rsRdd = fileRdd.filter(lambda line: len(line.strip()) > 0 ).flatMap(lambda line: re.split("\\s+",line.strip())).map(lambda word: (word,1)).reduceByKey(lambda sum,num : sum+num)
  3. rsRdd.saveAsTextFile("hdfs://bigdata01:9820/spark/wordcount/output4")

以上这个说明,windows用户没有权限访问hdfs文件系统

申明当前以root用户的身份来执行操作

os.environ['HADOOP_USER_NAME'] = 'root'

完整代码

  1. import os
  2. import re
  3. # 导入pyspark模块
  4. from pyspark import SparkContext, SparkConf
  5. if __name__ == '__main__':
  6. # 配置环境
  7. os.environ['JAVA_HOME'] = 'C:/Program Files/java/jdk1.8.0_181'
  8. # 配置Hadoop的路径,就是前面解压的那个路径
  9. os.environ['HADOOP_HOME'] = 'D:/Linux/hadoop/hadoop-3.3.1'
  10. # 配置base环境Python解析器的路径
  11. os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径
  12. os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
  13. # 申明当前以root用户的身份来执行操作
  14. os.environ['HADOOP_USER_NAME'] = 'root'
  15. conf = SparkConf().setMaster("local[*]").setAppName("")
  16. sc = SparkContext(conf=conf)
  17. fileRdd = sc.textFile("hdfs://bigdata01:9820/spark/wordcount/input/*")
  18. rsRdd = fileRdd.filter(lambda line: len(line.strip()) > 0) \
  19. .flatMap(lambda line: re.split("\\s+", line.strip())) \
  20. .map(lambda word: (word, 1))\
  21. .reduceByKey(lambda sum, num: sum + num)
  22. rsRdd.saveAsTextFile("hdfs://bigdata01:9820/spark/wordcount/output4")
  23. # 使用完后,记得关闭
  24. sc.stop()

运行

(四)本地开发 - 获取外部的变量

  1. 类似于 Java 中的
  1. String[] args

,在 PySpark 中也可以获取外部变量。可以通过命令行参数传递的方式来实现。例如,在运行

  1. pyspark

脚本时,可以使用

  1. spark - sumit xxxxx.py 参数 1, 参数 2

的形式传递参数。在代码中,需要对这些参数进行解析和使用。

  1. import os
  2. import re
  3. import sys
  4. # 导入pyspark模块
  5. from pyspark import SparkContext, SparkConf
  6. if __name__ == '__main__':
  7. # 配置环境
  8. os.environ['JAVA_HOME'] = 'C:/Program Files/java/jdk1.8.0_181'
  9. # 配置Hadoop的路径,就是前面解压的那个路径
  10. os.environ['HADOOP_HOME'] = 'D:/Linux/hadoop/hadoop-3.3.1'
  11. # 配置base环境Python解析器的路径
  12. os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径
  13. os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
  14. # 申明当前以root用户的身份来执行操作
  15. os.environ['HADOOP_USER_NAME'] = 'root'
  16. # 获取 conf 对象
  17. conf = SparkConf().setMaster("local[*]").setAppName("")
  18. # 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
  19. sc = SparkContext(conf=conf)
  20. print(sc)
  21. # 获取第一个参数
  22. fileRdd = sc.textFile(sys.argv[1])
  23. rsRdd = fileRdd.filter(lambda line: len(line.strip()) > 0) \
  24. .flatMap(lambda line: re.split("\\s+", line.strip())) \
  25. .map(lambda word: (word, 1)) \
  26. .reduceByKey(lambda sum, num: sum + num)
  27. # 获取第二个参数
  28. rsRdd.saveAsTextFile(sys.argv[2])
  29. # 使用完后,记得关闭
  30. sc.stop()
传递数据

参数一: hdfs://bigdata01:9820/spark/wordcount/input/*
参数二: hdfs://bigdata01:9820/spark/wordcount/output4

参数设置界面

四、Spark 程序的监控

4040 界面的使用

  1. 因为是本地的程序,所以可以通过访问地址
  1. http://localhost:4040

来监控程序。每个 Spark 程序都有一个对应的 4040 界面。这个界面提供了丰富的信息:

  1. RDD 相关信息:每个黑点表示一个 RDD,每个矩形框中的 RDD 的转换都是在内存中完成的,曲线代表经过了 Shuffle,灰色代表没有执行(因为之前执行过)。
  2. 进程信息:显示当前这个程序的运行进程的信息。每个 Spark 程序都由两种进程组成:一个 Driver 和多个 Executors。Driver 进程负责解析程序,构建 DAG 图,构建 Stage,构建、调度、监控 Task 任务的运行;Executor 进程负责运行程序中的所有 Task 任务。
  3. 存储信息:Storage 部分显示当前这个程序在内存缓存的数据信息。
  4. 配置信息:Environment 显示当前这个程序所有的配置信息。

五、local 和结果文件的数量问题

(一)local 模式并行度

  1. local:使用本地模式,并行度是 1。
  2. **local[3]**:使用本地模式,并行度是 3,这个并行度最好和 CPU 的核数一致,一般并行度 <= CPU 的核数。
  3. *local[]**:并行度 = CPU 的核数。

(二)结果文件数量与 local 模式的关系

  1. 文件的结果经常是 2 个文件,这跟分区数有关系,跟 local = N 也有一定的关系。其规律是
  1. min(N,2)

,例如如果是 local [1],最后的文件数量就是 1。

  1. 如果在 local 模式下,想要结果文件是 10,可以使用
  1. sc.textFile("../datas/wordcount/data.txt",10)

的方式来设置分区数。

六、总结

  1. 本文详细介绍了 PySpark 本地开发环境的搭建过程,包括 JDKHadoopAnacondaPySpark 的安装以及 Pycharm 工程的创建。同时,深入讲解了代码编写、本地开发案例(如 WordCount、处理特殊分隔符、读取 hdfs 数据、获取外部变量)、Spark 程序的监控和 local 模式下结果文件数量问题等内容。通过掌握这些知识和技能,读者可以在 Windows 本地环境中高效地进行 PySpark 开发,处理大规模数据,解决实际业务中的数据分析和处理问题。希望本文能为读者在 PySpark 学习和实践的道路上提供有力的帮助,让读者能够更好地利用这个强大的工具来挖掘数据的价值。
标签: spark pyspark conda

本文转载自: https://blog.csdn.net/weixin_64726356/article/details/143416680
版权归原作者 天冬忘忧 所有, 如有侵权,请联系我们删除。

“PySpark 本地开发环境搭建与实践”的评论:

还没有评论