前言
Apache Spark是用于大规模数据(large-scala data)处理的统一(unified)分析引擎。
简单来说,Spark是一款分布式的计算框架,用于调度成百上千的服务器集群,计算TB、PB乃至EB级别的海量数据。
Spark对Python语言的支持,重点体现在Python第三方库:PySpark
PySpark是由Spark官方开发的Python语言第三方库。
Python开发者可以使用pip程序快速的安装PySpark并像其它第三方库那样直接使用。
基础准备
安装
同其它的Python第三方库一样,PySpark同样可以使用pip程序进行安装。
pip install pyspark
或使用国内代理镜像网站(清华大学源)
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple pyspark
构建PySpark执行环境入口对象
想要使用PySpark库完成数据处理,首先需要构建一个执行环境入口对象。
PySpark的执行环境入口对象是:类SparkContext的类对象
# 导包from pyspark import SparkConf, SparkContext
# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)# 打印PySpark的运行版本print(sc.version)# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()
运行需要Java环境,推荐jdk8
PySpark的编程模型
SparkContext类对象,是PySpark编程中一切功能的入口。
PySpark的编程,主要分为如下三大步骤:
数据输入
PySpark支持多种数据的输入,在输入完成后,都会得到一个:RDD类的对象
RDD全称为:弹性分布式数据集(Resilient Distributed Datasets)
PySpark针对数据的处理,都是以RDD对象作为载体,即:
- 数据存储在RDD内
- 各类数据的计算方法,也都是RDD的成员方法
- RDD的数据计算方法,返回值依旧是RDD对象
Python数据容器转RDD对象
PySpark支持通过SparkContext对象的parallelize成员方法,将list/tuple/set/dict/str转换为PySpark的RDD对象
# 导包from pyspark import SparkConf, SparkContext
# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)
rdd1 = sc.parallelize([1,2,3])
rdd2 = sc.parallelize((1,2,3))
rdd3 = sc.parallelize({1,2,3})
rdd4 = sc.parallelize({'key1':'value1','key2':'value2'})
rdd5 = sc.parallelize('hello')# 输出RDD的内容,需要使用collect()print(rdd1.collect())# [1, 2, 3]print(rdd2.collect())# [1, 2, 3]print(rdd3.collect())# [1, 2, 3]print(rdd4.collect())# ['key1', 'key2']print(rdd5.collect())# ['h', 'e', 'l', 'l', 'o']# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()
注意:
- 字符串会被拆分出一个个的字符,存入RDD对象
- 字典仅有key会被存入RDD对象
读取文件转RDD对象
PySpark也支持通过SparkContext入口对象来读取文件,构建出RDD对象。
先提前预备一个txt文件
hello
python
day
# 导包from pyspark import SparkConf, SparkContext
# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)
rdd = sc.textFile('E:\\code\\py-space\\8.27\\hello.txt')# 输出RDD的内容,需要使用collect()print(rdd.collect())# ['hello', 'python', 'day']# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()
数据计算
RDD对象内置丰富的:成员方法(算子)
map算子
将RDD的数据一条条处理(处理的逻辑基于map算子中接收的处理函数),返回新的RDD
rdd.map(func)# func: f:(T) -> U# f: 表示这是一个函数# (T) -> U 表示的是方法的定义:()表示无需传入参数,(T)表示传入1个参数# T是泛型的代称,在这里表示 任意类型# U是泛型的代称,在这里表示 任意类型# (T) -> U : 这是一个函数,该函数接收1个参数,传入参数类型不限,返回一个返回值,返回值类型不限# (A) -> A : 这是一个函数,该函数接收1个参数,传入参数类型不限,返回一个返回值,返回值类型和传入参数类型一致
示例:
# 导包from pyspark import SparkConf, SparkContext, sql
import os
# 设置环境变量
os.environ['PYSPARK_PYTHON']='D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,4,5,6])# 通过map方法将全部数据乘以10,传入参数为函数
rdd2 = rdd.map(lambda x: x *10)# 输出RDD的内容,需要使用collect()print(rdd2.collect())# [10, 20, 30, 40, 50, 60]# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()
由于map()的返回值还是RDD对象,可以继续在尾部进行链式调用
rdd3 = rdd.map(lambda x: x *10).map(lambda x: x +9)
flatMap算子
对RDD执行map操作,然后进行
解除嵌套
操作。
# 导包from pyspark import SparkConf, SparkContext, sql
import os
# 设置环境变量
os.environ['PYSPARK_PYTHON']='D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)
rdd = sc.parallelize(['a b c','d e f'])# 输出RDD的内容,需要使用collect()print(rdd.map(lambda x: x.split(' ')).collect())# [['a', 'b', 'c'], ['d', 'e', 'f']]print(rdd.flatMap(lambda x:x.split(' ')).collect())# ['a', 'b', 'c', 'd', 'e', 'f']# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()
reduceByKey算子
针对
KV型(二元元组)
RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成
组内数据(value)
的聚合操作
rdd.reduceByKey(func)# func: (V, V) -> V# 接收2个传入参数(类型要一致),返回一个返回值,返回值类型和传入参数类型要求一致
示例:
# 导包from pyspark import SparkConf, SparkContext, sql
import os
# 设置环境变量
os.environ['PYSPARK_PYTHON']='D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)
rdd = sc.parallelize([('a',1),('a',1),('b',1),('b',1),('b',1)])# 输出RDD的内容,需要使用collect()print(rdd.reduceByKey(lambda a, b: a+b).collect())# [('b', 3), ('a', 2)]# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()
reduceByKey中的聚合逻辑是:比如有[1,2,3,4,5],然后聚合函数是:
lambda a,b: a+b
注意:reduceByKey中接收的函数,只负责聚合,不理会分组;分组是自动
by key
来分组的
filter算子
过滤想要的数据进行保留。
rdd.filter(func)# func: (T) -> bool# 传入一个参数任意类型,返回值必须是True/False,返回是True的数据被保留,False的数据被丢弃
示例:
# 导包from pyspark import SparkConf, SparkContext, sql
import os
# 设置环境变量
os.environ['PYSPARK_PYTHON']='D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,4,5,6])# 输出RDD的内容,需要使用collect()print(rdd.filter(lambda x: x %2==0).collect())# [2, 4, 6]# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()
distinct算子
对RDD数据进行去重,返回新的RDD
rdd.distinct()# 无需传参
示例:
# 导包from pyspark import SparkConf, SparkContext, sql
import os
# 设置环境变量
os.environ['PYSPARK_PYTHON']='D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3,3,2,6])# 输出RDD的内容,需要使用collect()print(rdd.distinct().collect())# [6, 1, 2, 3]# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()
sortBy算子
对RDD数据进行排序,基于你指定的排序依据。
rdd.sortKey(func, ascending=False, numPartitions=1)# func: (T) -> U:告知按照RDD中的哪个数据进行排序,比如lambda x: x[1]表示按照RDD中的第二列元素进行排序# ascending:True升序,False降序# numPartitions:用多少分区排序,全局排序需要设置为1
示例:
# 导包from pyspark import SparkConf, SparkContext, sql
import os
# 设置环境变量
os.environ['PYSPARK_PYTHON']='D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)
rdd = sc.parallelize([('Aiw',9),('Tom',6),('Jack',8),('Bolb',5)])# 输出RDD的内容,需要使用collect()print(rdd.sortBy(lambda x: x[1], ascending=False,
numPartitions=1).collect())# [('Aiw', 9), ('Jack', 8), ('Tom', 6), ('Bolb', 5)]# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()
数据输出
collect算子
将RDD各个分区内的数据,统一收集到Driver中,形成一个List对象。
rdd.collect()# 返回值是一个List
示例:
# 导包from pyspark import SparkConf, SparkContext, sql
import os
# 设置环境变量
os.environ['PYSPARK_PYTHON']='D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1,2,3])
rdd_list:list= rdd.collect()print(rdd_list)# [1, 2, 3]print(type(rdd_list))# <class 'list'># 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()
reduce算子
对RDD数据集按照你传入的逻辑进行聚合
rdd.reduce(func)# func:(T, T) -> T# 传入2个参数,1个返回值,要求返回值和参数类型一致
示例:
# 导包from pyspark import SparkConf, SparkContext, sql
import os
# 设置环境变量
os.environ['PYSPARK_PYTHON']='D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)
rdd = sc.parallelize(range(1,10))print(rdd.reduce(lambda a, b: a+b))# 45# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()
take算子
取RDD的前N个元素,组合成List进行返回。
# 导包from pyspark import SparkConf, SparkContext, sql
import os
# 设置环境变量
os.environ['PYSPARK_PYTHON']='D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)
rdd = sc.parallelize(range(1,10))
rdd_take:list= rdd.take(3)print(rdd_take)# [1, 2, 3]print(type(rdd_take))# <class 'list'># 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()
count算子
计算RDD有多少条数据,返回值是一个数字。
# 导包from pyspark import SparkConf, SparkContext, sql
import os
# 设置环境变量
os.environ['PYSPARK_PYTHON']='D:/Python/python.exe'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)
rdd = sc.parallelize(range(1,10))
rdd_count:int= rdd.count()print(rdd_count)# 9print(type(rdd_count))# <class 'int'># 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()
saveAsTextFile算子
将RDD的数据写入文本文件中。支持本地写出、HDFS等文件系统。
注意事项:
# 导包from pyspark import SparkConf, SparkContext, sql
import os
# 设置环境变量
os.environ['PYSPARK_PYTHON']='D:/Python/python.exe'
os.environ['HADOOP_HOME']='D:/Hadoop-3.0.0'# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)
rdd = sc.parallelize(range(1,10))
rdd.saveAsTextFile('./8.27/output')# 运行之前确保输出文件夹不存在,否则报错# 停止SparkContext对象的运行(停止PySpark程序)
sc.stop()
上述代码输出结果,输出文件夹内有多个分区文件
修改RDD分区为1个
方式一:SparkConf对象设置属性全局并行度为1:
# 创建SparkConf类对象
conf = SparkConf().setMaster('local[*]').setAppName('test_spark_app')# 设置属性全局并行度为1
conf.set('spark.default.parallelism','1')# 基于SparkConf类对象创建SparkContext类对象
sc = SparkContext(conf=conf)
方式二:创建RDD的时候设置(parallelize方法传入numSlices参数为1)
rdd = sc.parallelize(range(1,10), numSlices=1)
rdd = sc.parallelize(range(1,10),1)
版权归原作者 杼蛘 所有, 如有侵权,请联系我们删除。