背景:
小批量数据可以使用pandas 进行分析,方便灵活。但大批量(千万级别)数据,使用pandas分析处理,速度很慢,且需一次性读取全部数据,内存可能溢出。
此时使用spark分布式分析处理速度很快,且数据分区,再配上jupyter 在线分析工具界面,可以很方便进行交互式大数据集分析。
注意:
python第三方库pyspark,和spark自带组件pyspark,都提供了与Spark 集群交互的 Python 接口,让Python 开发人员能够利用 Apache Spark 的强大功能来处理大规模数据。
区别:spark集群自带组件pyspark,与spark一体,无需进行配置,可直接使用;python pip安装的pyspark模块,需要配置上spark集群相关信息,才能利用spark集群处理数据。
环境:
Jupyter-lab(python3.7) + Spark集群(sparkV2.4.0 - cdh6.3.4)
文章目录
1、Jupyter Pyspark 在线交互式环境配置
1.1 第一种方式
# 安装pyspark类库> pip install pyspark==2.4.0# 与spark集群版本保持一致# 启动jupyter-lab> jupyter-lab
# jupyter环境import os
from pyspark import SparkContext,SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,FloatType,IntegerType
import pyspark.sql.functions as F
from pyspark.ml.feature import QuantileDiscretizer
import pandas as pd
import numpy as np
import logging
import sys
import warnings
# 日志配置
logging.getLogger("py4j").setLevel(logging.WARN)# 屏蔽spark运行debug日志
logging.getLogger("pyspark").setLevel(logging.WARN)# 配置集群spark、hadoop家目录
os.environ['SPARK_HOME']='/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p0.6751098/lib/spark'
os.environ['HADOOP_CONF_DIR']='/opt/cloudera/parcels/CDH-6.3.4-1.cdh6.3.4.p0.6751098/lib/hadoop'# spark on hive
os.environ['PYSPARK_PYTHON']='./py3/python3.7/bin/python'# 使用 spark.yarn.dist.archives 分发到各节点的python环境;也可以是所在节点绝对路径(要求每台机器所安装python环境一致)
os.environ['PYSPARK_DRIVER_PYTHON']='/data/d2/anaconda3/anaconda3/envs/python3.7/bin/python'# spark client 模式运行时,driver 本地运行,无法使用spark.yarn.dist.archives 分发到各节点的python环境,需单独指定# 创建sparksql会话
parameters =[('spark.app.name','测试pyspark连接'),# 设置spark应用名称 # spark.yarn.dist.archives 依赖归档文件(zip/tar.gz等),用于指定哪些文件应该分发到每个执行节点上。# 这些压缩文件将在应用程序执行前在executor节点上工作目录内解压可用。# 这里将整个python3.7解释器,压缩为py3,随spark执行程序一起分发到各个executor工作目录内。('spark.yarn.dist.archives','hdfs://namenode2:8020/python3/python3.7.tar.gz#py3'),('spark.master','yarn'),#集群运行模式,yarn(连接到正在运行的 YARN 集群,Spark应用程序会作为 YARN 上的一个应用来提交和执行) ('spark.submit.deploymode','client'),# driver运行模式 -> 客户端模式运行#('spark.submit.pyFiles','./test.txt') # job开始前上传所依赖py文件,client模式下可能提示找不到(文件会随submit自动上传)('spark.debug.maxToStringFields',100)]
conf = SparkConf.setALL(parameters)# sc = SparkContext.getOrCreate(conf=conf)# sc.addPyFile('./funcs.py') #(local_file_path/hdfs/url) 添加单个python文件到executor上,甚至在开始job后也可以添加py文件 # 创建回话
spark = SparkSession\
.builder\
.config(conf=conf)\
.enableHiveSupport()\ #.enableHiveSupport():这个方法会自动包括一些必要的 Hive 类库和配置,以便与 Hive 服务通信.getOrCreate()
spark
# 关闭回话# spark.stop()
1.2 第二种方式[未验证]
spark官网说明
## 通过配置driver端python环境为jupyter,然后再启动{SPARK_HOME}/bin/pyspark实现>exportPYSPARK_DRIVER_PYTHON=/xx/anaconda3/bin/jupyter #jupyter启动服务命令所在目录>exportPYSPARK_DRIVER_PYTHON_OPTS=notebook # jupyter启动参数,jupyter notebook方式>exportPYSPARK_DRIVER_PYTHON_OPTS='lab --allow-root'# jupyter-lab 方式 二选其一# 上面两个配置可以直接加到{SPARK_HOME}/bin/pyspark 启动文件里# 再次启动pyspark> ./bin/pyspark
2、在线交互式大数据分析测试
#1、读取hive表数据
data = spark.sql('select * from test.id4')
data.show(4)
#2、使用udf进行数据处理
@F.udf(IntegerType())
def add(x):
return x+1
data.withColumn('flag+1',add(data.flag))
data.show(4)
3、spark-submit
spark-submit 是Spark提交各类任务(python、R、Java、Scala)的工具,可以使用shell脚本运行指定的py脚本。
其实,{SPARK_HOME}/bin/pyspark交互式环境,运行时底层也是使用的spark-submit提交资源管理器进行计算。
spark-submit*(V3.1.3)具体参数:
【从spark-submit --help 翻译而来(不同版本间可能有差异)】
spark-submit 提交用法及参数(偏python)提交脚本格式Usage: spark-submit [options] "app jar | python file | R file" [app arguments] Options:注释 --master MASTER_URL spark://host:port, mesos://host:port, yarn,k8s://https://host:port,local (Default: local[*]). --deploy-mode DEPLOY_MODE 运行模式 'client/cluster '(Default: client) --class CLASS_NAME 您的应用程序的主类(用于 Java / Scala 应用程序,python应该不需要)。 --name NAME 应用程序的名称。 --packages 要包含在驱动程序和执行程序类路径中的 jar 的 maven 坐标的逗号分隔列表。 --py-files PY_FILES逗号分隔的 .zip、.egg 或 .py 文件列表,提交的python文件和入口文件在同一目录下,这里面包括Python应用主程序,这些文件将被交付给每一个执行器来使用。 --files FILES逗号分隔的文件列表,放置在每个执行器的工作目录中。这些文件在执行器中的文件路径可以通过 SparkFiles.get(fileName) 访问。 --archives ARCHIVES 要提取到每个执行程序的工作目录中的以逗号分隔的档案列表。--conf, -c PROP=VALUE 任意 Spark 配置属性--properties-file FILE加载额外属性的文件路径。如果未指定,默认查找 conf/spark-defaults.conf --driver-memory MEMdriver程序内存(例如 1000M,2G)(默认值:1024M)。 --executor-memory MEM每个executor的内存(例如 1000M,2G)(默认值:1G)仅集群部署模式: --driver-cores NUMdrive使用的核心数(默认值:1)。--executor-cores每个executor可以使用的cpu核心(Yarn和K8S默认1,standalone默认worker可利用的全部核心)Spark 仅适用于 YARN 和 Kubernetes: --num-executors NUM要启动的执行程序数量(默认值:2)。 如果启用了动态分配,则执行器的初始数量将至少为 NUM。 --principal PRINCIPAL用于登录 KDC 的主体。--keytab KEYTAB包含以上指定主体的密钥表的文件的完整路径。Spark 仅适用于 YARN:--queue QUEUE_NAME 要提交到的 YARN 队列(默认值:“default”)。
4、client 和 cluster 运行模式注意点
起初,在配置spark运行环境时,指定spark python环境配置如下:
vim spark-defaults.conf
spark.yarn.dist.archives=hdfs://***/***/***/env/python_env.zip#python_envspark.pyspark.driver.python=./python_env/bin/python # pyspark程序内部自定义函数或类执行环境spark.pyspark.python=./python_env/bin/python
Spark-submit在进行client模式提交时,提示 "Cannot run program “./python_env/bin/python “: error=2, No such file or dictor "错误,而进行cluster提交时正常运行。
原因:
--archives# code运行依赖文档。
--py-files # code依赖python。
如上面依赖,也可能出现 “file not found” 问题,原理应该一样。
driver服务启动后,会自动上传依赖文件到executor中,解压到当前文件夹下(通过查看yarn运行日志,可以看到上传解压过程)。
当使用client模式时,driver运行在本地spark-submit进程中,未进行archives的上传解压,所以报错找不到python文件。
当使用cluster模式提交时,会优先在yarn的机器中,开启一个特殊的executor运行driver,在开启executor过程中,伴随着进行archives的上传解压。
版权归原作者 yunpeng.zhou 所有, 如有侵权,请联系我们删除。