目录
一、PySpark实战
1、前言介绍
Spark:Spark是用于大规模数据处理的统一分析引擎,是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB、EB等海量数据
pySpark:Spark对python的支持,就体现在python的第三方库pySpark上
2、基础准备
a、pySpark库的安装
命令:pip install pyspark
不知道操作步骤的可以看此文章 第六节 安装第三方python包
b、构建pySpark执行环境入口对象
from pyspark importSparkConf,SparkContext
#创建SparkConf类对象 setMaster:设置运行模式 setAppName:当前spark类的名称
conf =SparkConf().setMaster("local[*]").setAppName("test_spark_app")
#基于SparkConf类对象创建SparkContext类对象
sc =SparkContext(conf=conf)
#打印pySpark的运行版本
print(sc.version)
#停止Sparkcontext对象的运行
sc.stop()
c、pySpark编程模型
- 数据输入:通过SparkContext类对象的成员方法完成数据的读取操作,读取后得到RDD类对象
- 数据处理:通过RDD类对象的成员方法完成各种数据计算的需求
- 数据输出:将处理完成后的RDD对象调用各种成员方法完成写出文件等操作
3、数据输入
a、python数据容器转RDD对象
- 支持的数据容器有:list,tuple,set,dict,str
- str容器会输出单个字符,字典容器会输出所有key,其他容器会输出原本内容
from pyspark importSparkConf,SparkContext
#定义数据容器
list =['1','2','3']
tuple =('1','2','3')
set ={'1','2','3'}
dict ={'1':'abc','2':'def','3':'ghi'}
conf =SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc =SparkContext(conf=conf)
#将数据容器转换为RDD
rdd = sc.parallelize(dict)print(rdd.collect())
sc.stop()
b、读取文件内容转RDD对象
- 文件的每一行会变为一个元素
如创建一个文件,内容如图。
用下面代码取文件内容转换为RDD对象并输出
from pyspark importSparkConf,SparkContext
conf =SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc =SparkContext(conf=conf)
rdd = sc.textFile(文件地址)print(rdd.collect())
sc.stop()
输出结果为:
['这是一个文件内容。', '但是', '这才是第三行内容', '你猜这是第几行', '对了,这是第五行']
4、数据计算
RDD中内置了丰富的成员方法,也叫“算子”
a、map算子
- 功能:将RDD的数据一条一条处理(处理的逻辑是基于map算子中接收的处理函数),返回新的RDD
- 多个map方法之间可以链式调用
案例1:将list中的每个元素都乘以10
from pyspark importSparkConf,SparkContext
#如果报错Python worker failed toconnect back,需要引入os设置python安装位置
importos
os.environ["PYSPARK_PYTHON"]="F:/Python/Python311/python.exe"
list =['1','2','3']
conf =SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc =SparkContext(conf=conf)
rdd = sc.parallelize(list)
#写法一
# rdd2 = rdd.map(lambda x:int(x)*10)
#写法二
def func(x):returnint(x)*10
rdd2 = rdd.map(func)
rdd2 = rdd.map(func)print(rdd2.collect())
sc.stop()
结果为:
[10,20,30]
案例2:将list中的每个元素都先乘以10,再加上5,分为两个map写
from pyspark importSparkConf,SparkContextimportos
os.environ["PYSPARK_PYTHON"]="F:/Python/Python311/python.exe"
list =['1','2','3']
conf =SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc =SparkContext(conf=conf)
rdd = sc.parallelize(list)
rdd2 = rdd.map(lambda x:int(x)*10).map(lambda x:int(x)+5) #这里支持链式调用
print(rdd2.collect())
sc.stop()
b、flatMap算子
- 功能:对RDD执行map操作,然后解除嵌套操作
- 解除嵌套:假如输入的list的多层嵌套的,那么最后的结果全部元素都为list的一层
案例:将多层嵌套的 list 取出所有元素放到一层中
from pyspark importSparkConf,SparkContextimportos
os.environ["PYSPARK_PYTHON"]="F:/Python/Python311/python.exe"
list =[[1,2,3],[4,5,6],[7,8]]
conf =SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc =SparkContext(conf=conf)
rdd = sc.parallelize(list)
#写法一
rdd2 = rdd.flatMap(lambda x: x)print(rdd2.collect())
sc.stop()
结果为:
[1,2,3,4,5,6,7,8]
c、reduceByKey算子
- 功能:针对KV型的RDD,自动按照key分组,然后根据提供的聚合逻辑,完成组内数据(value)的聚合操作
- KV型的RDD其实就是二元元组,比如:[(‘a’,1) , (‘b’,1) , (‘c’,1)],每个元组中第一个值为key,第二个值为value
案例:将男女分组,并且计算两组的分数总和
from pyspark importSparkConf,SparkContextimportos
os.environ["PYSPARK_PYTHON"]="F:/Python/Python311/python.exe"
list =[('男',99),('男',88),('女',77),('女',66),('男',55)]
conf =SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc =SparkContext(conf=conf)
rdd = sc.parallelize(list)
# 写法一
rdd2 = rdd.reduceByKey(lambda a, b: a + b)print(rdd2.collect())
sc.stop()
结果为:
[('女',143),('男',242)]
d、综合案例
读取文件内容,统计各个元素出现次数,文件内容如下:
from pyspark importSparkConf,SparkContextimportos
os.environ["PYSPARK_PYTHON"]="F:/Python/Python311/python.exe"
list1 =[]
conf =SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc =SparkContext(conf=conf)
#读取文件内容
rdd = sc.textFile("C://Users//HLY//Desktop//test.txt")
#先将内容切割成单个元素并一层展示,再将元素设置成二元元组,最后将元素分组统计
rdd2 = rdd.flatMap(lambda x : x.strip().split(" ")).map(lambda x :(x,1)).reduceByKey(lambda a, b: a + b)print(rdd2.collect())
sc.stop()
结果为:
[('test2',3),('test3',4),('test',3),('test1',3),('test4',4),('test5',4)]
e、filter算子
- 功能:过滤想要的数据进行保留
案例:过滤出所有偶数
from pyspark importSparkConf,SparkContextimportos
os.environ["PYSPARK_PYTHON"]="F:/Python/Python311/python.exe"
list1 =[1,2,3,4,5,6,7,8,9,10]
conf =SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc =SparkContext(conf=conf)
#读取文件内容
rdd = sc.parallelize(list1)
rdd2 = rdd.filter(lambda x : x %2==0)print(rdd2.collect())
sc.stop()
结果为:
[2,4,6,8,10]
f、distinct算子
- 功能:对RDD中的数据进行去重,返回新的RDD
案例:对已有的列表进行去重
from pyspark importSparkConf,SparkContextimportos
os.environ["PYSPARK_PYTHON"]="F:/Python/Python311/python.exe"
list1 =[1,1,2,2,2,2,3,3,3,4,4,4]
conf =SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc =SparkContext(conf=conf)
#读取文件内容
rdd = sc.parallelize(list1)
rdd2 = rdd.distinct()print(rdd2.collect())
sc.stop()
结果为:
[1,2,3,4]
g、sortBy算子
- 功能:对RDD数据进行排序,基于指定的排序依据
- 参数: - func:告知RDD是对那个数据进行排序,比如lambda x:x[1] 表示对rdd中第二列元素进行排序- ascending:True升序,False降序- numPartitions:用多少分区排序,单个分区时传1
案例:对给出的二元集合根据第二个元素进行降序排列
from pyspark importSparkConf,SparkContextimportos
os.environ["PYSPARK_PYTHON"]="F:/Python/Python311/python.exe"
list1 =[('test',1),('test1',5),('test3',2),('test4',4),('test5',8),('test6',7),('test7',6)]
conf =SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc =SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)
rdd2 = rdd.sortBy(lambda x: x[1], ascending=False)print(rdd2.collect())
sc.stop()
结果为:
[('test5',8),('test6',7),('test7',6),('test1',5),('test4',4),('test3',2),('test',1)]
5、数据输出
a、collect算子
- 功能:将RDD各个分区内的数据统一收集到Driver当中,形成一个List对象
案例:输出RDD的内容
from pyspark importSparkConf,SparkContextimportos
os.environ["PYSPARK_PYTHON"]="F:/Python/Python311/python.exe"
list1 =[1,2,3,4,5]
conf =SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc =SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)print(rdd.collect())
sc.stop()
结果为:
[1,2,3,4,5]
b、reduce算子
- 对RDD的全部数据按照传入的逻辑进行聚合,返回一个数字
案例:计算列表中的所有元素和
from pyspark importSparkConf,SparkContextimportos
os.environ["PYSPARK_PYTHON"]="F:/Python/Python311/python.exe"
list1 =[1,2,3,4,5]
conf =SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc =SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)
num = rdd.reduce(lambda a, b: a + b)print(num)
sc.stop()
结果为:
15
c、take算子
- 功能:取RDD的前N个元素,组成 List 返回
案例:取出列表前3个元素
from pyspark importSparkConf,SparkContextimportos
os.environ["PYSPARK_PYTHON"]="F:/Python/Python311/python.exe"
list1 =[1,2,3,4,5]
conf =SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc =SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)
list = rdd.take(3)print(list)
sc.stop()
结果为:
[1,2,3]
e、count算子
- 功能:计算RDD有多少条数据,返回一个数字
案例:获取列表中的元素个数
from pyspark importSparkConf,SparkContextimportos
os.environ["PYSPARK_PYTHON"]="F:/Python/Python311/python.exe"
list1 =[1,2,3,4,5]
conf =SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc =SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)
num = rdd.count()print(num)
sc.stop()
结果为:
5
f、saveAsTextFile算子
- 功能:将RDD的数据写入文本文件中
- 执行此方法需要安装hadoop环境,具体配置过程可以看 这篇文章
- 其输出内容是根据区分决定的,有多少分区就会输出多少个文件。内容会均匀分摊到各个文件中。分区数默认与电脑的CPU内核一致
案例1:输出列表内容到各个文件中
from pyspark importSparkConf,SparkContextimportos
os.environ["PYSPARK_PYTHON"]="F:/Python/Python311/python.exe"
os.environ["HADOOP_HOME"]="D:/hadoop/hadoop-3.0.0"
list1 =[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16]
conf =SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc =SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)
rdd.saveAsTextFile("C:/Users/HLY/Desktop/test")
sc.stop()
结果会生成16个内容文件和2个状态文件,并且16个内容文件中每个文件中都有一个数字
案例2:将列表内容输出到一个文件中
#方法一:配置全局并行度为1
from pyspark importSparkConf,SparkContextimportos
os.environ["PYSPARK_PYTHON"]="F:/Python/Python311/python.exe"
os.environ["HADOOP_HOME"]="D:/hadoop/hadoop-3.0.0"
list1 =[1,2,3,4,5]
#设置全局并行度为1
conf =SparkConf().setMaster("local[*]").setAppName("test_spark_app").set("spark.default.parallelism","1")
sc =SparkContext(conf=conf)
# 读取文件内容
rdd = sc.parallelize(list1)
rdd.saveAsTextFile("C:/Users/HLY/Desktop/test")
sc.stop()
#方法二:设置分区数为1
from pyspark importSparkConf,SparkContextimportos
os.environ["PYSPARK_PYTHON"]="F:/Python/Python311/python.exe"
os.environ["HADOOP_HOME"]="D:/hadoop/hadoop-3.0.0"
list1 =[1,2,3,4,5]
conf =SparkConf().setMaster("local[*]").setAppName("test_spark_app")
sc =SparkContext(conf=conf)
#设置分区数为1
rdd = sc.parallelize(list1,numSlices=1)
rdd.saveAsTextFile("C:/Users/HLY/Desktop/test")
sc.stop()
最后会生成1个结果文件,3个其他文件,并且内容都会在 part-00000 文件中显示
版权归原作者 骨力 所有, 如有侵权,请联系我们删除。