0


(一)PySpark3:安装教程及RDD编程(非常详细)

一、pyspark介绍

Apache Spark是一个用于大数据处理的开源分布式计算框架,而PySpark则是Spark的Python 实现。PySpark允许使用Python编程语言来利用Spark的强大功能,使得开发人员能够利用Python的易用性和灵活性进行大规模数据处理和分析。

PySpark与Spark-Scala的对比:

1、语言选择:
PySpark: 使用简洁而易学的Python作为编程语言,这使得PySpark学习难度大大降低。
Spark-Scala: 使用Scala作为主要编程语言。Scala是一门运行在Java虚拟机上的多范式编程语言,更接近Java,并具有强大的面向对象和函数式编程特性,但是其学习曲线较陡。

2、性能:
PySpark:由于Python是解释型语言,相比Scala的原生Spark可能会有性能上的一些损失。但通过PySpark的DataFrame和优化技术也可以显著提高性能。
Spark-Scala:使用Scala编写的Spark应用程序可能在性能上略优于PySpark,因为Scala是一门静态类型语言,而且运行在Java虚拟机上。

4、生态系统支持:
PySpark:可与Python的生态系统(如NumPy、Pandas)以及其他大数据工具和库进行集成。
Spark-Scala:由于运行在JVM上,可以利用Java生态系统,但Scala本身的生态系统相对较小。

5、机器学习支持:
PySpark: 提供了MLlib库,支持在分布式环境中进行大规模的机器学习任务。
Spark-Scala: 同样支持MLlib,但在API的使用上可能相对繁琐一些。

总体而言,PySpark强于数据分析,Spark-Scala强于性能。如果应用场景有非常多的可视化和机器学习算法需求,推荐使用pyspark,可以更好地和python中的相关库配合使用。

前置知识:

1、熟悉Spark RDD原理,了解RDD常用算子
2、具有Python编码能力,熟悉Python中numpy, pandas库的基本用法。
3、了解机器学习算法原理,如逻辑回归、决策树等等
4、需要安装:JDK、Anaconda

二、PySpark安装

首先安装spark,本文使用的安装文件为:spark-3.2.1-bin-hadoop3.2。即Spark版本为3.2.1,Hadoop可不安装,对本文后续代码运行无影响。

百度云链接如下:

链接:https://pan.baidu.com/s/1GmPZBoBtSZWJtPHqm-DhwA?pwd=bcm5 
提取码:bcm5

将下载的安装文件解压到指定目录即可,例如我的目录:D:\bigdata\spark-3.2.1-bin-hadoop3.2

配置系统变量:

此电脑-右键点击“属性”-高级系统设置-环境变量-系统变量

#系统变量新建
SPARK_HOME D:\bigdata\spark-3.2.1-bin-hadoop3.2 #换成你的解压目录
PYSPARK_DEIVER_PYTHON_OPTS notebook
PYSPARK_DEIVER_PYTHON ipython
PYTHONPATH %SPARK_HOME%\python\lib\py4j;%SPARK_HOME%\python\lib\pyspark

#path添加
%SPARK_HOME%\bin

修改配置文件:
在解压路径目录conf下,复制文件spark-env.sh.template,修改文件名为spark-env.sh

修改配置文件spark-env.sh,在文件末尾添加以下代码:

#D:\Anaconda3换成你的anaconda安装目录
export PYSPARK_PYTHON=D:\Anaconda3
export PYSPARK_DRIVER_PYTHON=D:\Anaconda3
export PYSPARK_SUBMIT_ARGS='--master local[*]'
#local[*]  是利用所有的资源

以上步骤完成,spark已经安装完成。

接下来在Anaconda创建虚拟环境,安装相关python库。需要注意,Python安装的pyspark版本必须与前面安装的spark版本一致。

#创建虚拟环境
conda create -n spark python=3.8

#进入虚拟环境
conda activate spark

#安装相关包
pip install pyspark==3.2.1 findspark pyhive notebook pandas

三、RDD编程

第一步,初始化Spark环境,创建一个Spark应用程序:

import findspark
import pyspark 
from pyspark import SparkContext, SparkConf
findspark.init()

conf = SparkConf().setAppName("test").setMaster("local[4]")
sc = SparkContext(conf=conf)
#sc.stop()  #关闭spark上下文

以上代码中,首先创建一个SparkConf对象,用于配置Spark应用程序。通过setAppName设置应用程序的名称为"test",通过setMaster设置运行模式为本地模式,使用4个本地线程。

随后,创建一个SparkContext对象,它是与Spark集群通信的主要入口点。一旦SparkContext被创建,就可以使用它来执行各种分布式计算任务。

1、创建RDD

RDD(Resilient Distributed Dataset):是 Spark 中的核心数据结构,代表分布在集群节点上的不可变、弹性(可容错)、可并行计算的数据集。RDD 可以分为多个分区,每个分区可以在集群中的不同节点上进行并行处理。

①用textFile方法加载本地或者集群文件系统中的数据

#从本地文件系统中加载数据
file = "./data/test.txt"
rdd = sc.textFile(file,3)

#从集群文件系统中加载数据
file = "hdfs://localhost:9000/user/data/test.txt"
#也可以省去hdfs://localhost:9000
rdd = sc.textFile(file,3)

②用parallelize方法将Driver中的数据结构并行化成RDD。

rdd = sc.parallelize(range(1,11),2)

2、常用Action操作

其主要特点如下:

触发计算:Action操作是Spark计算的触发点,当调用 Action操作时,Spark将执行整个RDD的计算流程,并生成最终结果。这与Transformation操作的惰性计算形成对比。

输出结果:Action操作生成非惰性结果,即它们会立即执行计算并返回实际的结果。可以将计算结果返回到本地驱动程序,也可以将结果写入外部存储系统(如 HDFS、数据库等),另外还可以将结果缓存到驱动程序或本地内存中(但对于大型数据集来说可能会导致内存问题)。

①collect

**collect()**是一个action操作,用于从RDD中收集所有元素到Driver节点,形成一个本地的数据集(数组)。

rdd = sc.parallelize(range(1,11),2)
rdd.collect()

运行结果:

[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

注意:collect()操作将整个RDD中的数据收集到Driver节点的内存中,在大规模数据集上执行该操作可能导致内存不足或性能问题。
因此,collect()操作适用于小规模的结果集,用于调试和查看数据。对于大规模数据集,更常见的做法是使用转换操作和行动操作组合来实现分布式计算,最后将结果写到外部存储或以其他方式处理。

②take

**rdd.take(n)**用于获取RDD中的前n个元素。不同于collect(),take(n)只取前n个元素,因此在处理大规模数据时更为高效。

rdd = sc.parallelize(range(1,11),2)
rdd.take(4)

输出结果:

[1, 2, 3, 4]
takeSample

**rdd.takeSample(withReplacement, num, seed=None)**用于从RDD中获取指定数量的随机样本。

参数说明:
withReplacement:布尔值,表示是否允许采样时元素的重复抽取。如果为 True,则允许重复抽取;如果为 False,则不允许。
num:要获取的样本数量。
seed:可选的种子值,用于控制随机数生成。如果提供了相同的种子值,多次调用takeSample将产生相同的样本。

rdd = sc.parallelize(range(1,11),2)
rdd.takeSample(False,5,0)

输出结果:

[8, 9, 2, 6, 4]
first

**first()**用于获取RDD中的第一个元素,对于大型数据集的性能较好。

rdd = sc.parallelize(range(1,11),2)
rdd.first()

输出结果:

1
⑤count

**count()**用于获取RDD中元素的总数量,以了解数据规模。

rdd = sc.parallelize(range(1,11),2)
rdd.count()

输出结果:

10
⑥reduce

**reduce()**用于对RDD中的元素进行规约操作,两两结合进行某种操作后继续与下一个元素结合,直到规约成一个最终的结果。reduce()通常用于执行可以并行化的可交换和可结合的操作,例如对数字进行加法或求和。这样的操作可以在每个分区上并行执行,然后合并结果。

#计算0+1+2+3+4+5+6+7+8+9
rdd = sc.parallelize(range(10),5) 
rdd.reduce(lambda x,y:x+y)

输出结果:

45
⑦foreach

**rdd.foreach()**用于对RDD中的每个元素应用指定的函数,与map不同,foreach 是一个行动操作,它会在每个分区上并行地对每个元素执行给定的函数。

rdd = sc.parallelize(range(10),5) 
accum = sc.accumulator(0)
rdd.foreach(lambda x:accum.add(x))
print(accum.value)

输出结果:

45

在以上代码中,通过sc.accumulator()创建了一个累加器,并初始化其值为0。然后使用rdd.foreach()对RDD中的每个元素执行匿名函数,该函数将元素的值累加到累加器中。由于累加器是在分布式环境中共享的,因此每个节点上的累加器都能够更新。

countByKey

**rdd.countByKey()**用于统计 (key, value) 对的RDD中每个key的出现次数。

pairRdd = sc.parallelize([("hello",1),("world",4),("hello",9),("something",16)]) 
pairRdd.countByKey()

输出结果:

defaultdict(int, {'hello': 2, 'world': 1, 'something': 1})
saveAsTextFile

**saveAsTextFile()**用于将RDD的内容保存为文本文件,即将分布式数据集的结果写入到本地文件系统或分布式文件系统(如HDFS)中。

#saveAsTextFile保存rdd成text文件到本地
text_file = "./test/rdd.txt"
rdd = sc.parallelize(range(5))
rdd.saveAsTextFile(text_file)

#重新读入会被解析文本
rdd_loaded = sc.textFile(text_file)
rdd_loaded.collect()

3、常用Transformation操作

Transformation操作是对RDD进行变换的操作,它们不会立即执行,而是构建了一个表示要在RDD上执行的操作的执行计划。Transformation操作是为了支持分布式计算而设计的。它们在整个集群上并行运行,并利用RDD的不可变性和分区的概念来实现高效的分布式处理。通过构建逻辑执行计划,Spark可以优化计算并在整个集群上分布计算任务,以提高性能。

其主要特点如下:

惰性计算:当应用一个Transformation操作时,Spark只是记录了该操作的存在,并没有实际执行计算。实际的计算将会在Action操作触发时进行。

生成新的RDD:由于RDD一旦创建就不能被修改,所以Transformation操作通常生成一个新的RDD,而不是修改原始的RDD。

窄/宽依赖:Transformation操作可以分为窄依赖和宽依赖。
窄依赖指每个父分区中的数据仅依赖于该父分区的数据,例如map操作。
宽依赖指某个父分区的数据可能依赖于多个父分区的数据,例如groupByKey和reduceByKey操作,这会导致数据的重新分区,因此可能引起数据的Shuffle。

①map

**rdd.map()**用于对RDD中的每个元素应用一个指定的函数,并返回一个包含应用函数后结果的RDD。rdd.map()接受一个函数作为参数(可以是lambda匿名函数),该函数将被应用到RDD中的每个元素。

rdd = sc.parallelize(range(10),3)
rdd.map(lambda x:x**2).collect()

输出结果:

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
②filter

**rdd.filter()**用于对RDD中的元素进行过滤,并返回新RDD,其中包含满足指定条件的原始RDD中的元素。

rdd = sc.parallelize(range(10),3)
rdd.filter(lambda x:x>5).collect()

输出结果:

[6, 7, 8, 9]
flatMap

**rdd.flatMap()**即在rdd.map()的基础上将所有的结果扁平化为一个新的RDD。

rdd = sc.parallelize(["hello world","hello China"])
print(rdd.map(lambda x:x.split(" ")).collect())
print(rdd.flatMap(lambda x:x.split(" ")).collect())

输出结果:

[['hello', 'world'], ['hello', 'China']]
['hello', 'world', 'hello', 'China']
sample

**rdd.sample()**用于从RDD每个分区按照比例随机抽样一部分元素,生成新的RDD。

参数如下:

withReplacement:是否放回抽样。true-有放回,false-无放回
fraction:期望样本的大小作为RDD大小的一部分。fraction范围在[0,1],即表示选择每个元素的概率。fraction大于1时,即表示选择每个元素的期望次数。
seed:随机数生成器的种子。

rdd = sc.parallelize(range(10),2)

#每个元素被抽到的概率为0.5,但输出的元素不一定是5个
print(rdd.sample(withReplacement=False, fraction=0.5,seed=0).collect())

#每个元素被抽到的期望次数是2,但输出的元素不一定是20个
print(rdd.sample(withReplacement=True, fraction=2,seed=0).collect())

输出结果:

[1, 4, 5, 7]
[0, 0, 1, 1, 1, 1, 2, 2, 2, 2, 3, 4, 4, 4, 4, 5, 5, 5, 6, 6, 6, 7, 8, 9]
⑤distinct

**rdd.distinct()**对原始RDD进行去重。

rdd = sc.parallelize([1,1,2,2,3,3,4,5])
rdd.distinct().collect()

输出结果:

[4, 1, 5, 2, 3]
⑥subtract

**rdd1.subtract(rdd2)**用于计算两个RDD的差集,返回在rdd1中但不在rdd2中的元素。

rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize([4, 5, 6, 7, 8])
rdd1.subtract(rdd2).collect()

输出结果:

[1, 2, 3]
union

**rdd1.union(rdd2)**用于计算两个RDD的并集,需要注意返回结果中可能带有重复元素

rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize([4, 5, 6, 7, 8])
rdd1.union(rdd2).collect()

输出结果:

[1, 2, 3, 4, 5, 4, 5, 6, 7, 8]
intersection

**rdd1.intersection(rdd2)**用于计算两个RDD的交集。

rdd1 = sc.parallelize([1, 2, 3, 4, 5])
rdd2 = sc.parallelize([4, 5, 6, 7, 8])
rdd1.intersection(rdd2).collect()

输出结果:

[4, 5]
cartesian

**rdd1.cartesian(rdd2)**用于计算两个RDD的笛卡尔积。

rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize(['a', 'b', 'c'])
rdd1.cartesian(rdd2).collect()

输出结果:

[(1, 'a'),
 (1, 'b'),
 (1, 'c'),
 (2, 'a'),
 (2, 'b'),
 (2, 'c'),
 (3, 'a'),
 (3, 'b'),
 (3, 'c')]
sortBy

**rdd.sortBy()**用于对RDD中的元素按照指定的排序键进行排序。

参数如下:

rdd.sortBy(keyfunc, ascending=True, numPartitions=None)
keyfunc用于从 RDD 的每个元素中提取用于排序的键,可以是lambda匿名函数。
ascending表示排序的顺序。 True为升序,False为降序。
numPartitions表示返回结果RDD的分区数。

data = [(1, 'apple'), (3, 'orange'), (2, 'banana'), (4, 'grape')]
rdd = sc.parallelize(data)
rdd.sortBy(lambda x: x[0], ascending=True,numPartitions=2).collect()

输出结果:

[(1, 'apple'), (2, 'banana'), (3, 'orange'), (4, 'grape')]
⑪zip

**rdd1.zip(rdd2)**按照拉链方式将两个RDD中的元素一对一地合并成元组,效果类似python的zip函数,需要两个RDD具有相同的分区,每个分区元素数量相同。

rdd1 = sc.parallelize([1, 2, 3, 4, 5],2)
rdd2 = sc.parallelize(['a', 'b', 'c', 'd', 'e'],2)
rdd1.zip(rdd2).collect()

输出结果:

[(1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')]
zipWithIndex

**rdd.zipWithIndex()**用于将RDD中的每个元素与其在 RDD 中的索引位置一对一地合并成元组。

rdd = sc.parallelize([10, 20, 30, 40, 50])
rdd.zipWithIndex().collect()

输出结果:

[(10, 0), (20, 1), (30, 2), (40, 3), (50, 4)]

4、常用Transformation操作(键值对)

PairRDD中的元素是键值对,Spark提供了针对键值对的一系列转换和操作,使得对数据进行分组、聚合和排序等操作更加方便。

reduceByKey

**rdd.reduceByKey()**用于对RDD进行聚合的Transformation操作,将具有相同键的所有值根据提供的聚合函数进行合并。

参数如下:
reduceByKey(func, numPartitions=None, partitionFunc=<function portable_hash>)
func: 聚合函数,接受两个参数,用于将相同键的值进行合并。
numPartitions: 可选参数,用于指定返回结果RDD的分区数。
partitionFunc: 可选参数,用于指定分区函数,默认为哈希分区函数。

data = [('x', 3), ('y', 5), ('x', 2), ('y', 1), ('x', 1), ('z', 4)]
rdd = sc.parallelize(data)
rdd.reduceByKey(lambda x, y: x + y).collect()

输出结果:

[('y', 6), ('x', 6), ('z', 4)]
groupByKey

**rdd.groupByKey()**用于对PairRDD进行分组的Transformation操作,将相同键的所有值放在一个迭代器中并返回新RDD,适用于不涉及值的聚合操作,只需按键进行分组的情况。不过groupByKey()可能导致数据倾斜,特别是当某个键对应的值非常多时。另外,生成的结果是以键值对形式的迭代器,存在大量数据时可能导致内存溢出。

data = [('x', 3), ('y', 5), ('x', 2), ('y', 1), ('x', 1), ('z', 4)]
rdd = sc.parallelize(data)
print(rdd.groupByKey().collect())

for key, values in rdd.groupByKey().collect():
    print(f"{key}: {list(values)}")

输出结果:

[('y', <pyspark.resultiterable.ResultIterable object at 0x00000259F5816C10>), ('x', <pyspark.resultiterable.ResultIterable object at 0x00000259F574C790>), ('z', <pyspark.resultiterable.ResultIterable object at 0x00000259F574C490>)]
y: [5, 1]
x: [3, 2, 1]
z: [4]
sortByKey

**rdd.sortByKey()**用于对PairRDD进行按键排序的Transformation操作,按照键进行排序,并返回一个新RDD。sortByKey()可能导致数据倾斜,特别是当某个键对应的值非常多时。在数据量庞大时,可能会影响性能,因为需要将数据在不同分区间移动以进行排序。

参数如下:
ascending:表示排序的顺序。True为升序(默认),False为降序。

data = [(3, 'x'), (5, 'y'), (2, 'z'), (4, 's')]
rdd = sc.parallelize(data)
rdd.sortByKey().collect()

输出结果:

[(2, 'z'), (3, 'x'), (4, 's'), (5, 'y')]
④**join / **leftOuterJoin / rightOuterJoin

**rdd1.join(rdd2)**用于对两个PairRDD进行连接的Transformation操作,根据键将两个 PairRDD 中的元素进行连接。类似SQL中的inner join。

**rdd1.leftOuterJoin(rdd2)、rdd1.rightOuterJoin(rdd2)**分别是左关联、右关联。如果另一侧PairRDD 中没有匹配的键,则对应位置的值为None。

data1 = [('Tom', 18), ('Jerry', 19), ('Alice', 17)]
data2 = [('Tom', 'male'), ('Bob', 'male'), ('Alice', 'female')]
rdd1 = sc.parallelize(data1)
rdd2 = sc.parallelize(data2)
print(rdd1.join(rdd2).collect())
print(rdd1.leftOuterJoin(rdd2).collect())
print(rdd1.rightOuterJoin(rdd2).collect())

输出结果:

[('Tom', (18, 'male')), ('Alice', (17, 'female'))]
[('Jerry', (19, None)), ('Tom', (18, 'male')), ('Alice', (17, 'female'))]
[('Tom', (18, 'male')), ('Bob', (None, 'male')), ('Alice', (17, 'female'))]
cogroup

**rdd1.cogroup(rdd2)**用于对两个PairRDD进行先后两次分组连接的Transformation操作,相当于对rdd1、rdd2分别进行goupByKey,再对两个结果进行groupByKey。

rdd1 = sc.parallelize([("a", 1), ("a", 2), ("b", 3)])
rdd2 = sc.parallelize([("a", 3), ("b", 4)])
[(x, tuple(map(list, y))) for x, y in sorted(list(rdd1.cogroup(rdd2).collect()))]

输出结果:

[('a', ([1, 2], [3])), ('b', ([3], [4]))]
subtractByKey

**rdd1.subtractByKey(rdd2)**用于对两个PairRDD求差集的Transformation操作,即返回在rdd1中而不在rdd2中的元素。

rdd1 = sc.parallelize([("x", 1), ("y", 2), ("z", 3)])
rdd2 = sc.parallelize([("x", 3), ("y", 4)])
rdd1.subtractByKey(rdd2).collect()

输出结果:

[('z', 3)]
foldByKey

foldByKey的操作和reduceByKey类似,但是foldByKey可以提供一个初始值

data = [('x', 1), ('x', 2), ('x', 3),  ('y', 1), ('y', 2), ('z', 1)]
rdd = sc.parallelize(data)
print(rdd.foldByKey(0, lambda x, y: x + y).collect())
print(rdd.foldByKey(1, lambda x, y: x + y).collect())

输出结果:

[('y', 3), ('x', 6), ('z', 1)]
[('y', 5), ('x', 8), ('z', 2)]

5、分区操作

RDD分区操作主要分为调整分区与转换分区操作。

调整分区操作用于调整已有RDD的分区结构,不改变数据的物理位置,仅影响分区元数据,通常性能开销较小。例如:分区数调整,即调整现有RDD的分区数,但不移动数据。

转换分区操作改变了RDD的分区结构,通常是在数据上执行Transformation操作,产生一个新的RDD,其分区数可能发生变化。例如通过指定的分区数或者使用一些具体的分区算法,重新组织数据分区。在数据的重新组织过程中可能涉及跨分区的数据移动,通常伴随着性能开销。

①glom

**rdd.glom()**用于将每个分区的数据转换为一个数组,是Transformation操作。适用于需要对每个分区进行整体操作的场景。

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data, 3)
rdd.glom().collect()

输出结果:

[[1, 2, 3], [4, 5, 6], [7, 8, 9, 10]]
②HashPartitioner

**rdd.HashPartitioner()**用于对PairRDD进行哈希分区,即根据键的哈希值将数据划分到不同的分区中。

data = [('a', 1), ('b', 2), ('c', 3), ('d', 4), ('e', 5)]
rdd1 = sc.parallelize(data)
rdd2 = rdd1.partitionBy(2)

for partition, data in enumerate(rdd2.glom().collect()):
    print(f"Partition {partition}: {data}")

输出结果:

Partition 0: [('b', 2), ('c', 3), ('d', 4)]
Partition 1: [('a', 1), ('e', 5)]
③**mapPartitions / **mapPartitionsWithIndex

**rdd.mapPartitions()**是Transformation操作,用于对RDD的每个分区执行一个自定义映射函数,该函数可以处理分区内的所有元素,而不是一次仅处理一个元素。mapPartitions能够减少通信开销,因为映射操作是在每个分区内进行的,适用于需要对整个分区进行批量操作的场景,而不适用于需要考虑跨分区元素之间关系的场景。

**rdd.mapPartitionsWithIndex()**类似于mapPartitions,但提供了分区索引信息,允许更细粒度的控制。

data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
rdd = sc.parallelize(data, 3)
print(rdd.mapPartitions(lambda x:(i * 2 for i in x)).collect())
print(rdd.mapPartitionsWithIndex(lambda index,x:((index, i*2) for i in x)).collect())

输出结果:

[2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
[(0, 2), (0, 4), (0, 6), (1, 8), (1, 10), (1, 12), (2, 14), (2, 16), (2, 18), (2, 20)]
④coalesce

coalesce()用于减少分区数的Transformation操作,可以尽量避免数据迁移,提升效率。

参数如下:

coalesce(numPartitions, shuffle=False)
numPartitions:新的分区数。
shuffle:是否进行数据洗牌,默认为False。当设置为 True 时,将触发数据洗牌操作,否则只是简单地减小分区数。

rdd = sc.parallelize(range(20),10)
print(rdd.glom().collect())
print(rdd.coalesce(2,shuffle=False).glom().collect())
print(rdd.coalesce(2,shuffle=True).glom().collect())

输出结果:

[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10, 11], [12, 13], [14, 15], [16, 17], [18, 19]]
[[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]]
[[0, 1, 4, 5, 6, 7, 12, 13, 16, 17], [2, 3, 8, 9, 10, 11, 14, 15, 18, 19]]
⑤repartition

rdd.repartition()用于重新分区的Transformation操作,可以增加或减少分区数,通过shuffle来重新组织数据。允许动态调整RDD的分区数,可在数据分布不均匀时提高计算性能。

对比:coalesce在已有分区基础上尽量减少数据shuffle,而repartition会创建新分区并且使用full shuffle。

rdd = sc.parallelize(range(25),25)
print(rdd.glom().collect())
print(rdd.repartition(5).glom().collect())
print(rdd.coalesce(5).glom().collect())

输出结果:

[[0], [1], [2], [3], [4], [5], [6], [7], [8], [9], [10], [11], [12], [13], [14], [15], [16], [17], [18], [19], [20], [21], [22], [23], [24]]
[[6, 8, 11, 15, 20, 21], [0, 9, 16, 18, 24], [2, 3, 7, 14, 19, 22], [1], [4, 5, 10, 12, 13, 17, 23]]
[[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10, 11, 12, 13, 14], [15, 16, 17, 18, 19], [20, 21, 22, 23, 24]]
⑥partitionBy

rdd.partitionBy()用于对PairRDD重新分区,是Transformation操作,可以根据指定的分区器对键值对数据进行重新分区,以更好地控制数据的分布。

data = [('a', 1), ('b', 2), ('c', 3), ('d', 4), ('e', 5)]
pair_rdd = sc.parallelize(data)
hash_partitioned_rdd = pair_rdd.partitionBy(2)
for partition, data in enumerate(hash_partitioned_rdd.glom().collect()):
    print(f"Partition {partition}: {data}")

输出结果:

Partition 0: [('b', 2), ('c', 3), ('d', 4)]
Partition 1: [('a', 1), ('e', 5)]

6、缓存操作

如果多个任务在计算过程中共享同一个RDD作为中间数据,通过对其进行缓存,将其存储在内存中,可以显著加快计算速度。但是对RDD的缓存并不会立即生效,而是在该RDD第一次被计算出来时才会进行缓存。在不再需要某个RDD时,可以使用unpersist来释放缓存,而这个操作是立即执行的。这样可以有效地管理内存资源,避免不必要的缓存。
另一方面,缓存在提高计算速度的同时,并不会切断RDD的血缘依赖关系。因为缓存的数据可能存在某些分区的节点发生故障的情况,例如内存溢出或者节点损坏。在这种情况下,可以根据血缘关系重新计算受影响分区的数据,确保计算的正确性。
如果需要切断血缘关系,可以使用checkpoint来设置检查点,将RDD保存到磁盘中。与缓存类似,对RDD进行checkpoint同样不会立即生效,而是在该RDD第一次被计算出来时才会保存成检查点。通常,checkpoint适用于一些计算代价非常高昂的中间结果,或者在重复计算结果不可保证完全一致的情况下(例如使用zipWithIndex算子)。
对RDD进行缓存是优化Spark计算性能的有效手段,但需要根据具体情况灵活运用,以确保计算的准确性和效率。

①cache

**rdd.cache()**用于将RDD的计算结果缓存到内存中,以便在后续操作中重用,可以显著提高迭代算法等需要多次使用同一数据集的性能。rdd.cache使用存储级别MEMORY_ONLY,意味着如果内存不足,Spark可能会根据缓存数据的大小和可用内存的情况进行动态调整,例如将一部分或全部缓存的数据移除,以腾出内存供其他操作使用。

a = sc.parallelize(range(10000),5)
a.cache()
sum_a = a.reduce(lambda x,y:x+y)
cnt_a = a.count()
mean_a = sum_a/cnt_a

print(mean_a)

输出结果:

4999.5
②persist

**rdd.persist()**用于将RDD中间结果缓存到内存或磁盘中,以便在后续操作中重用。与cache不同,persist允许用户指定不同的存储级别,以更灵活地管理缓存。存储级别即不同的数据缓存的位置和策略,可以是MEMORY_ONLY、DISK_ONLY、MEMORY_AND_DISK(默认)等。

**rdd.persist()**写入磁盘的文件是临时文件,应用执行完成后就会被删除,可以使用rdd.unpersist()立即释放缓存。

from  pyspark.storagelevel import StorageLevel
a = sc.parallelize(range(10000),5)
a.persist(StorageLevel.MEMORY_AND_DISK)
sum_a = a.reduce(lambda x,y:x+y)
cnt_a = a.count()
mean_a = sum_a/cnt_a

a.unpersist() 
print(mean_a)

输出结果:

4999.5
③checkpoint

rdd.checkpoint()用于将RDD中间结果写入磁盘。由于血缘依赖过长会造成容错成本过高,可以中间阶段做检查点容错,如果检查点之后有节点出现问题,可以从检查点开始重新执行程序,减少开销。需要注意的是,checkpoint操作并不会马上被执行,而是在执行Action操作时才被触发。另外,checkpoint路径保存的文件是永久存在的,不会随着应用的结束而被删除。

sc.setCheckpointDir("./data/checkpoint/")
rdd = sc.parallelize(["a","b","c","d"],2)
rdd_idx = rdd.zipWithIndex() 
rdd_idx.checkpoint() 
rdd_idx.take(3)

输出结果:

[('a', 0), ('b', 1), ('c', 2)]

7、共享变量

共享变量主要用于在分布式计算中实现在任务之间共享数据,以提高性能和降低网络开销。广播变量(broadcast variables)和累加器(accumulators)是两个重要的分布式计算工具,用于在集群上共享数据和累积结果。

①broadcast

当需要在所有工作节点之间共享较小的只读数据集时,使用广播变量可以避免将该数据集多次传输到各个节点。这可以有效减少网络开销,提高性能。典型的应用场景包括在所有节点上使用相同的配置参数、字典或者映射表等。并且可以避免任务间重复传输,如果一个RDD需要在多个任务中使用,而且这个RDD的数据较小,使用广播变量可以避免在不同任务之间多次传输相同的数据。

#广播变量 broadcast 不可变,在所有节点可读
broads = sc.broadcast(100)
rdd = sc.parallelize(range(10))
print(rdd.map(lambda x:x+broads.value).collect())
print(broads.value)

输出结果:

[100, 101, 102, 103, 104, 105, 106, 107, 108, 109]
100
②accumulator

累加器主要用于执行在分布式任务中的“添加”和“合并”操作,通常用于聚合和计数等操作。
例如,可以用累加器来计算在整个集群上发生的某个特定事件的总次数;计算所有节点上某个变量的总和或平均值。

#累加器 只能在Driver上可读,在其它节点只能进行累加
total = sc.accumulator(0)
rdd = sc.parallelize(range(10),3)
rdd.foreach(lambda x:total.add(x))
total.value

输出结果:

45

四、综合应用

1、求平均数

题目:计算1000个随机数的平均数。

import numpy as np
import findspark
import pyspark 
from pyspark import SparkContext, SparkConf
findspark.init()

conf = SparkConf().setAppName("test").setMaster("local[4]")
sc = SparkContext(conf=conf)

np.random.seed(6)
data = np.random.randint(0, 100, size=1000)
rdd = sc.parallelize(data,2)
print("前10个数:",rdd.take(10))
data_sum = rdd.reduce(lambda x,y:x+y)
data_count = rdd.count()
print("平均数:",data_sum/data_count)

sc.stop()

运行结果:

前10个数: [10, 73, 99, 84, 79, 80, 62, 25, 1, 75]
平均数: 50.227

2、求众数

题目:计算1000个随机数中出现最多的数。

思路:

①通过map算子和lambda函数构造键值对RDD,并赋初值为1,以便对每个数的出现次数进行统计,通过reduceByKey算子进行累加。

②通过map和reduce算子键值对中的value,并依次比大小,得出最大出现次数。

③通过filter算子根据最大出现次数筛选出对应的数。

import numpy as np
import findspark
import pyspark 
from pyspark import SparkContext, SparkConf
findspark.init()

conf = SparkConf().setAppName("test").setMaster("local[4]")
sc = SparkContext(conf=conf)
np.random.seed(6)
data = np.random.randint(0, 100, size=1000)
rdd1 = sc.parallelize(data,2)
rdd2 = rdd1.map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)
max_count = rdd2.map(lambda x:x[1]).reduce(lambda x,y: x if x>=y else y)
max_value = rdd2.filter(lambda x:x[1]==max_count).map(lambda x:x[0]).collect()
print("众数:",max_value," 出现次数:",max_count)

sc.stop()

运行结果:

出现最多的数: [63]  出现次数: 18

3、求总分前3的学生及各科成绩

题目:有一批学生信息,包括语文、数学、英语三门课的成绩,找出总分排名前三的学生,并按次序打印出其各科成绩和总分。

思路:

①以学生名字为key,通过reduceByKey算子统计学生总分。

②通过sortBy算子,按照总分从大到小排序,取出前三名。

③根据前三名的名字通过filter算子从所有成绩信息中筛选出前三名的各科成绩。

④通过groupByKey、join、sortBy算子将学生名字、各科成绩、总分关联在一起,然后打印信息。

import random
import findspark
import pyspark 
from pyspark import SparkContext, SparkConf
findspark.init()

conf = SparkConf().setAppName("test").setMaster("local[4]")
sc = SparkContext(conf=conf)
random.seed(6)
names = ["Alice","Andy", "Bob", "Charlie", "David", "Emily","Edward", "Frank", "Grace", "Henry", "Ivy", "Jack",
         "James","Kate", "Liam", "Mia", "Noah", "Olivia", "Penny", "Quinn", "Ryan", "Sam", "Sophia", "Tom",
         "Ursula", "Victor", "Wendy", "Xander", "Yara", "Zane"] * 3 
scores = [(name, random.randint(0, 100)) for name in names]

rdd1 = sc.parallelize(scores,2)
rdd2 = rdd1.reduceByKey(lambda x,y:x+y)
top3_list = rdd2.sortBy(lambda x:x[1],ascending=False).take(3)
rdd3 = rdd1.filter(lambda x:x[0] in [i[0] for i in top3_list])
rdd4 = rdd3.groupByKey().join(sc.parallelize(top3_list)).sortBy(lambda x:x[1][1],ascending=False)

i = 0
for key, values in rdd4.collect():
    print(f"第{i+1}名{key}的各科成绩为: {list(values[0])},总分为:{values[1]}")
    i+=1

sc.stop()

运行结果:

第1名Mia的各科成绩为: [98, 96, 86],总分为:280
第2名Alice的各科成绩为: [73, 93, 82],总分为:248
第3名Zane的各科成绩为: [89, 73, 76],总分为:238

五、总结

总的来说,PySpark适合初学者入门学习,由于python门槛不高,易于掌握,可以通过PySpark了解Spark的运行机制以及RDD算子的使用。但如果是需要几百台服务器才能运行的任务场景, PySpark的UDF(User Defined Functions)的性能差距肯定比不过Spark-Scala。

至于选什么语言,取决于业务需求。如果是处理简单的数据清洗聚合,且数据量非常大,用Scala会有性能优势,可以节约计算资源。如果需要处理较为复杂的算法模型,依赖于各种第三方包,那么使用Python会更好。

标签: python spark spark-ml

本文转载自: https://blog.csdn.net/weixin_44458771/article/details/135667367
版权归原作者 唯余木叶下弦声 所有, 如有侵权,请联系我们删除。

“(一)PySpark3:安装教程及RDD编程(非常详细)”的评论:

还没有评论