0


spark-shell(pyspark)单机模式使用和编写独立应用程序

spark有四种部署方式:Local,Standalone,Spark on Mesos,Spark on yarn。第一个为单机模式,后三个为集群模式。

spark-shell支持python和scala,这里使用python。

1.启动pyspark环境

在spark安装目录下

./bin/pyspark

进入之后,如下图:

2.编写程序

新建代码文件WordCount.py,并编写程序

touch WordCount.py
vim WordCount.py
from pyspark import SparkConf, SparkContext
# 使用本地模式启动
conf = SparkConf().setMaster("local").setAppName("My App")
# 生成一个SparkContext对象
sc = SparkContext(conf=conf)
# 设置文件路径
logFile = "file:///opt/servers/spark/README.md"
# 读取README.md文件生成的RDD
logData = sc.textFile(logFile, 2).cache()
# 分别统计RDD元素中包含字母a和b的行数
numAS = logData.filter(lambda line: 'a' in line).count()
numBs = logData.filter(lambda line: 'b' in line).count()
# 打印输出结果
print('Lines with a: %s, Lines with b: %s' % (numAS, numBs))

运行代码:python3 WordCount.py

如果报如下错误:

python3 WordCount.py
Traceback (most recent call last):
File "WordCount.py", line 1, in <module>
from pyspark import SparkConf, SparkContext
ModuleNotFoundError: No module named 'pyspark'
说明没有pyspark模块。

进入python安装目录下的lib/site-packages目录下,使用pip下载安装pyspark,这里使用国内清华大学镜像网站。

pip install pyspark -i http://pypi.tuna.tsinghua.edu.cn/simple/ --trusted-host pypi.tuna.tsinghua.edu.cn

下载一个镜像,中间出了好几个问题,被我记录在

pip安装更新第三方库报错解决-CSDN博客

安装pyspark库成功后,重新运行代码,然后还是报错

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
/usr/local/python3/lib/python3.7/site-packages/pyspark/context.py:317: FutureWarning: Python 3.7 support is deprecated in Spark 3.4.
warnings.warn("Python 3.7 support is deprecated in Spark 3.4.", FutureWarning)
Traceback (most recent call last):
File "WordCount.py", line 11, in <module>
numAS = logData.filter(lambda line: 'a' in line).count()
File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 2297, in count
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 2273, in sum
0, operator.add
File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 2025, in fold
vals = self.mapPartitions(func).collect()
File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 1814, in collect
sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 5442, in _jrdd
self.ctx, self.func, self._prev_jrdd_deserializer, self._jrdd_deserializer, profiler
File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 5250, in _wrap_function
sc._javaAccumulator,
TypeError: 'JavaPackage' object is not callable

是因为pyspark版本过高, 改成3.2.0版本的就可以了

pip3 install pyspark==3.2.0 -i http://pypi.tuna.tsinghua.edu.cn/simple/ --trusted-host pypi.tuna.tsinghua.edu.cn
Looking in indexes: http://pypi.tuna.tsinghua.edu.cn/simple/

再次运行代码,运行结果如下:

3.通过spark-submit运行程序

进入spark安装目录下的bin中

./spark-submit WordCound.py的绝对路径
省略了<master-url>参数,默认本地模式

运行结果如下 (部分截图):

在这个过程中产生了许多其他信息干扰,可以通过修改log4j的日志信息显示级别,来消除干扰信息。

进入spark安装目录下的配置文件夹conf

cp log4j2.properties.template log4j2.properties

我的是spark-3.3.3版本,日志文件名字为log4j2.properties.template,每个版本的日志文件名字不太一样,具体的,要按照自己安装的版本的日志文件来,复制日志文件。

编辑日志文件

vim log4j2.properties

把显示控制台的信息改为error,保存并退出。

再次使用spark-submit运行python文件,结果如下

已经没有其他信息干扰了。

标签: spark 大数据

本文转载自: https://blog.csdn.net/m0_68131322/article/details/136895813
版权归原作者 r囧r小猫♚ 所有, 如有侵权,请联系我们删除。

“spark-shell(pyspark)单机模式使用和编写独立应用程序”的评论:

还没有评论