0


Spark 程序开发与提交:本地与集群模式全解析

Spark 的介绍与搭建:从理论到实践-CSDN博客

Spark 的Standalone集群环境安装与测试-CSDN博客

PySpark 本地开发环境搭建与实践-CSDN博客

    在大数据处理领域,Apache Spark 是一款强大的分布式计算框架。它提供了高效、灵活的数据处理能力,广泛应用于各种数据密集型应用场景。无论是在本地开发环境还是在集群环境中,正确地开发和提交 Spark 程序都是至关重要的。本文将深入探讨 Spark 程序在本地开发并远程提交到集群测试的过程,以及使用 spark - submit 脚本在集群模式下提交程序的相关知识,包括参数配置、运行模式等内容。

一、本地开发与远程提交测试

(一)问题背景

    在本地开发 Spark 程序时,常常会面临一些挑战。比如,当使用 HDFS 上的数据进行开发时,数据量可能过大,导致本地运行消耗大量本地资源且运行时间很长。因此,需要一种便捷的方法,能够在本地编写代码后,通过简单操作将代码自动提交到集群上运行。

(二)解决方案

集群环境准备

首先要启动集群,在第一台机器上执行以下命令:

start - dfs.sh
cd /opt/installs/spark
sbin/start - master.sh
sbin/start - workers.sh
sbin/start - history - server.sh
Windows 指定远程解析器
    创建一个用于同步本地代码到服务上的文件夹。在 Linux 上创建同步目录,将 Windows 上的代码和数据同步到
/root/pyspark_code

目录下:

mkdir -p /root/pyspark_code

这样,后续右键运行代码时,就可以将代码自动提交给集群运行。如果需要换回本地运行,可以相应地切换环境。

问题

解决方案:在bigdata01中,安装pyspark。

我们在linux上配置了阿里云的源,不代表在base中也可以使用,运行时需要带上源地址
pip install pyspark==3.1.2 -i https://pypi.tuna.tsinghua.edu.cn/simple/
验证是否已经安装
pip list 或者  conda list
也可以这么干:
pip list | grep pyspark

下载完成之后,重启一下 pycharm.

(三)代码提交到集群

自动提交

手动提交

同步后,成功会显示绿色,在linux上查看,有代码:

(四)代码实现与参数传递

单词统计案例
import os
import time

from pyspark import SparkContext, SparkConf
import sys

if __name__ == '__main__':
    # 配置环境
    os.environ['JAVA_HOME'] = '/opt/installs/jdk'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = '/opt/installs/hadoop'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = '/opt/installs/anaconda3/bin/python3'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = '/opt/installs/anaconda3/bin/python3'

    # 获取sc 对象
    conf = SparkConf().setMaster("spark://bigdata01:7077").setAppName("wordcount单词统计")
    sc = SparkContext(conf=conf)
    print(sc)
    # 编写各种需求代码
    print(sys.argv[0])
    # 读取hdfs上的数据
    fileRdd = sc.textFile(sys.argv[1])
    filterRdd = fileRdd.filter(lambda line:len(line.strip()) > 0)
    flatMapRdd = filterRdd.flatMap(lambda line: line.split())
    mapRdd = flatMapRdd.map(lambda word: (word,1))
    reduceBykeyRdd = mapRdd.reduceByKey(lambda total,num : total + num)
    reduceBykeyRdd.saveAsTextFile(sys.argv[2])

    #time.sleep(10000)
    # 关闭sc
    sc.stop()

运行的话,直接在本地右键运行即可,以上代码需要传递参数,所以在ide工具中写。

运行过程中出现错误
Initial job has not accepted any resources; check your cluster UI to ensure that workers are registe

有可能是以下几个问题:

1、集群资源不足,关闭掉没用的资源

2、目前集群中有其他任务,占用了资源,直接关闭掉

修改配置文件

修改 worker 的内存大小:

如何关闭呢,http://bigdata01:8080

选择任务后面的 【kill】 按钮

参考

spark报错:WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI..._initial job has not accepted any resources; check -CSDN博客

二、集群提交:spark - submit 脚本

(一)复习

在 Pycharm 中开发好的程序要在 Linux 中执行,可以使用

spark - submit

脚本。例如:

/opt/installs/spark/bin/spark - submit --master spark://bigdata01:7077 /opt/installs/spark/examples/src/main/python/pi.py 200
spark - submit

的作用是将写好的代码提交到 Spark 环境(可能是本地或集群环境)运行,它是基于 Linux 平台实现 Spark 程序提交运行的客户端工具。

(二)spark - submit 的用法

任务的命令格式
提交任务的命令:
spark-submit [options] <app jar | pythonfile | R file> [app arguments]

杀死一个正在运行的任务
spark-submit --kill [submission ID] --master [spark://...]

查看某个任务的状态:
spark-submit --status [submission ID] --master [spark://...]

以上这些,其实只需要学习如何提交任务即可,因为我们有图形化界面
http://bigdata01:8080

具体案例讲解
# 提交程序的语法
# spark-submit [可选的选项] Python文件 Python文件中用到的参数
spark-submit --master local[2] / spark://bigdata01:7077 / yarn \
……
hdfs://bigdata01:9820/spark/app/pyspark_core_word_args.py /spark/wordcount/input /spark/wordcount/output
参数详解
  • --master:用于指定程序运行的模式,有本地模式(--master local)、Standalone 模式(--master spark://master:7077)、YARN 模式(--master yarn)等 5 种模式,其作用等同于在代码中设置setMaster
  • --deploy - mode:用于指定 Driver 进程运行位置,重点内容,后面详细讲解。
  • --name:用于指定程序的名称,等同于代码中的setAppName
  • --jars:指定一些额外的 jar 包,如读写 MySQL 时需要的 MySQL 驱动包。
  • --conf:指定当前程序运行的额外配置,等同于代码中的set

(三)Driver 驱动进程和 Executor 计算进程

Spark 程序在集群模式运行时会启动两种进程:Driver 驱动进程和 Executor 计算进程,每种进程运行都需要资源。

  • Driver 进程:每个程序有 1 个,负责申请资源、解析代码、构建 Job/Task、分配调度 Task、监控 Task 运行,构建有向无环图 DAG。
  • Executor 进程:每个程序可以有多个,分布式启动在多台节点上,负责运行 Driver 分配的 Task 任务。可以类比 MR 程序运行在 YARN 上,Driver 类似于 Application Master(项目经理),Executor 类似于 Container(包含 Map Task、ReduceTask 等进程)。

此外,还有一些与资源相关的参数

  • Driver 资源选项: - --driver - memory:指定 Driver 进程能够使用的内存大小,默认是 1G。- --driver - cores:指定 Driver 进程能够使用的 CPU 核数,默认是 1Core。- --supervise:指定如果 Driver 故障,就自动重启。
  • Executor 资源选项: - --executor - memory:指定每个 Executor 能够使用的内存。- --executor - cores:指定每个 Executor 能够使用的 CPU。- --total - executor - cores:Standalone 集群模式下,指定所有 Executor 总共使用的 CPU 核数,用于间接指定 Executor 的个数。- --num - executors:YARN 集群模式下,直接指定 Executor 的个数。- --queue:指定提交程序到哪个队列中运行。

以上这些参数,还可以直接写在代码中,可以配置在conf文件

加载顺序:优先级:代码中配置【set】 > 参数选项【--conf】 > 配置文件【公共配置:spark-defualt.conf】

默认配置 -> 集群管理器配置 -> 环境变量配置 -> 命令行参数 -> 用户提供的SparkConf配置 -> 执行时环境变量配置

优先级最高的:

用户提供的SparkConf配置 > 命令行参数 > 公共的配置(spark-default.conf)> 默认配置

(四)实战案例

WordCount 程序提交示例

上传 Python 代码文件时要注意注释掉代码中的

master

、本地环境变量相关内容,因为代码中的设置优先级较高,去掉这些才能使用

spark - submit

中编写的配置。以下是一个示例代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pyspark import SparkContext, SparkConf
import os
import sys

"""
-------------------------------------------------
   Description :    TODO:用于实现词频统计
   SourceFile  :    04.pyspark_core_wordcount_hdfs_args
-------------------------------------------------
"""

if __name__ == '__main__':
    # todo:0-设置系统环境变量
    # os.environ['JAVA_HOME'] = 'D:/jdk1.8.0_241'
    # os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.0'
    # os.environ['PYSPARK_PYTHON'] = 'D:/Anaconda/python.exe'
    # os.environ['PYSPARK_DRIVER_PYTHON'] = 'D:/Anaconda/python.exe'
    # os.environ['HADOOP_USER_NAME'] = 'root'

    # todo:1-构建SparkContext
    # 甚至 任务的名字都可以不写,让提交任务的时候指定
    conf = SparkConf().setAppName("SparkSubmitApp")
        # .setMaster("local[2]")\

    sc = SparkContext(conf=conf)

    # todo:2-数据处理:读取、转换、保存
    # step1: 读取数据:SparkContext对象负责读取文件,用传递的第二个参数作为程序的输入地址
    input_rdd = sc.textFile(sys.argv[1])
    # 输出第一行
    # print(input_rdd.first())
    # 打印总行数
    # print(input_rdd.count())

    # step2: 处理数据
    rs_rdd = input_rdd\
            .filter(lambda line: len(line.strip()) > 0)\
            .flatMap(lambda line: line.strip().split(" "))\
            .map(lambda word: (word, 1))\
            .reduceByKey(lambda tmp,item: tmp+item)

    # step3: 保存结果
    # 打印结果
    rs_rdd.foreach(lambda x: print(x))
    # 结果保存到文件中:路径不能提前存在,将第二个参数作为输出路径
    rs_rdd.saveAsTextFile(sys.argv[2])

    # todo:3-关闭SparkContext
    sc.stop()
编写提交任务的命令
本地模式测试
/opt/installs/spark/bin/spark-submit \
--master local[2] \
/home/pyspark_core_word_args.py \
/home/data.txt \
/home/output01

如果出现以上这个是因为,本地模式之前配置集群时软连接已经被毁了,读取数据默认是去hdfs上读取的,没结果,假如你想运行创建软连接

rm -rf /opt/installs/spark
ln -s /opt/installs/spark-local  /opt/installs/spark
Standalone 模式

创建软连接

rm -rf /opt/installs/spark
ln -s /opt/installs/spark-standalone  /opt/installs/spark

**没指定资源的写法 **

/opt/installs/spark/bin/spark-submit \
--master spark://bigdata01:7077 \
/home/_06_WordCount_远程.py \
hdfs://bigdata01:9820/spark/wordcount/input \
hdfs://bigdata01:9820/spark/wordcount/jiqun01

指定资源的写法

方法一

/opt/installs/spark/bin/spark-submit \
--master spark://bigdata01:7077 \
--driver-memory 512M \
--driver-cores 1 \
--supervise \
--executor-memory 1G \
--executor-cores 1 \
--total-executor-cores 2 \
/home/_06_WordCount_远程.py \
hdfs://bigdata01:9820/spark/wordcount/input \
hdfs://bigdata01:9820/spark/wordcount/jiqun02

**方法二 **

/opt/installs/spark/bin/spark-submit  \
--master spark://bigdata01:7077 \
--deploy-mode client \
--driver-memory 1G  \
--driver-cores 1  \
--supervise  \
--executor-cores 1   \
--executor-memory 1G   \
--total-executor-cores 2  \
/root/pyspark_code/main/Demo06.py /spark/wordcount/input/data.txt /spark/wordcount/output09 

(五)Driver 启动位置:client 模式与 cluster 模式

基本概念

--deploy-mode client | cluster 只有这两个

当你提交任务时,假如等于 client , 此时Driver 进程在提交任务的那台机器上运行。假如在bigdata01上提交任务 spark-submit ,Driver 就在bigdata01上运行。

假如 等于cluster,此时driver 在某一台服务器上运行。集群上哪一台都可以。

pyspark 不支持 cluster ,支持 client

集群模式下任何一个Spark程序都包含两种进程Driver和Executor
程序提交以后,会先启动Driver进程
Driver进程:驱动进程,每一个Spark程序都有1个向主节点申请资源去启动Executor进程
Driver等待所有Executor都启动完成会解析代码变成Task任务
Driver会将Task任务调度分配给Executor去运行,并且监控所有Task运行

Executor进程:计算进程,每一个Spark程序都至少有1个
Executor进程会利用Worker节点的资源运行
所有Executor一旦启动成功,向Driver反向注册负责运行Driver所分配所有Task任务

工作流程对比

step1:客户端提交程序给主节点
step2:主节点会根据提交参数在对应的位置启动Driver进程
step3:Driver向主节点申请启动Executor计算进程
step4:Master根据配置在Worker节点上启动对应的Executor
step5:所有Executor启动成功以后会向Driver反向注册
step6:Driver解析代码,根据代码构建Task,分配给Executor运行,并监控所有Task

集群提交:deploy mode

程序启动之后,Driver在哪里,取决于提交任务的时候deploy mode 的值是什么

DEPLOY_MODE 这个值只有两个,一个是client ,一个是cluster

client:默认值,表示Driver运行在客户端节点上【在哪台机 器提交代码的哪台机器就是客户端】

cluster:建议值,表示Driver可以随机运行在某台从节点上【工作中一般都用cluster模式】

# deploy-mode ==client
/opt/installs/spark/bin/spark-submit \
--master spark://bigdata01:7077 \
--deploy-mode client \
--driver-memory 512M \
--driver-cores 1 \
--supervise \
--executor-memory 1G \
--executor-cores 1 \
--total-executor-cores 2 \
 /home/pyspark_core_word_args.py \
/spark/wordcount/input \
/spark/wordcount/output03

# deploy-mode ==cluster
/opt/installs/spark/bin/spark-submit \
--master spark://bigdata01:7077 \
--deploy-mode cluster \
--driver-memory 512M \
--driver-cores 1 \
--supervise \
--executor-memory 1G \
--executor-cores 1 \
--total-executor-cores 2 \
 /home/pyspark_core_word_args.py \
/spark/wordcount/input \
/spark/wordcount/output05

client 模式,该模式下,Driver 充当了非常重要的角色,任务在运行的时候,必须保证Driver的服务正常运行。Driver需要做的事情很多,任务在运行过程中,Driver不能走。

cluster 模式:在该模式下,client端,提交完之后,就可以走了,Driver进程放在了AppMaster里面,spark集群将任务执行完即可。

假如deploy-mode==client 可以理解为胖客户端模式,deploy-mode==cluster 可以理解为瘦客户端模式。

Exception in thread "main" org.apache.spark.SparkException: Cluster deploy mode is currently not supported for python applications on standalone clusters.

目前的版本中,pysparks 在在不支持模式为cluster的写法,所以会报以上错误,请知晓,换成scala等是不会有问题的。

以下这两张是Yarn集群的图,帮助理解概念。

client模式的一个示意图:

cluster模式:

区别:yarn 模式下就有 appmaster

client模式: Driver进程和AppMaster是共存关系,各玩各的

cluster模式:Driver进程和AppMaster合二为一,Driver在AppMaster里面。

deploy-mode 有 cluster 和 client 两个可选参数,默认为 client。这里以 Spark On Yarn 模式对两者进行说明 :

  • 在 cluster 模式下,Spark Drvier 在应用程序的 Master 进程内运行,该进程由集群上的 YARN 管理,提交作业的客户端可以在启动应用程序后关闭;
  • 在 client 模式下,Spark Drvier 在提交作业的客户端进程中运行,Master 进程仅用于从 YARN 请求资源。

spark-submit 提交任务时,client模式和cluster模式有何不同。
以Yarn集群为例:
client模式: Driver进程和AppMaster是 不在一起的,各玩各的。Driver进程在提交命令的电脑上运行,运行期间,该服务器不能停止,因为Client 在这个模式下起的作用很大。
cluster模式:Driver进程和AppMaster合二为一,Driver在AppMaster里面。Client端仅仅是提交了代码到集群,提交完就没有什么事情了,可以关闭。

三、总结

    本文详细介绍了 Spark 程序从本地开发远程提交到集群测试的方法,以及使用
spark - submit

脚本在集群模式下提交程序的相关知识。在本地开发时,通过合理配置集群环境和同步代码目录,可以方便地将代码提交到集群运行。而在集群提交方面,

spark - submit

脚本的参数配置至关重要,不同的参数如

--master

--deploy - mode

、各种资源相关参数等决定了程序在集群中的运行模式和资源分配情况。特别是

--deploy - mode

client

cluster

模式,在 Driver 进程的启动位置和整个程序的运行机制上有很大区别。理解这些内容对于正确开发和高效运行 Spark 程序,充分利用集群资源来处理大数据任务具有重要意义。无论是在开发过程中遇到资源问题的排查,还是根据实际场景选择合适的提交模式和参数配置,都需要对这些知识有深入的掌握,以便更好地发挥 Spark 在大数据处理中的优势。


本文转载自: https://blog.csdn.net/weixin_64726356/article/details/143559604
版权归原作者 天冬忘忧 所有, 如有侵权,请联系我们删除。

“Spark 程序开发与提交:本地与集群模式全解析”的评论:

还没有评论