0


python——spark使用

一、spark简介

Spark使用Scala语言进行实现,能操作分布式数据集。

Spark是在借鉴了MapReduce之上发展而来的,继承了其分布式并行计算的优点并改进了MapReduce明显的缺陷。

Spark的适用场景:

  1. 复杂的批量处理(Batch Data Processing),偏重点在于处理海量数据的能力,至于处理速度可忍受,通常的时间可能是在数十分钟到数小时;

  2. 基于历史数据的交互式查询(Interactive Query),通常的时间在数十秒到数十分钟之间

  3. 基于实时数据流的数据处理(Streaming Data Processing),通常在数百毫秒到数秒之间

二、spark使用思路

spark是一个分布式计算框架,主要流程:数据输入——数据计算——数据输出

三、准备工作

1.检查python版本,python3.12有部分功能不能使用,需要降低python版本。作者使用的是3.10

2.导入pyspark包。

3.链接python和spark。代码如下

from pyspark import SparkConf,SparkContext
import sys
import os

#查看python解释器路径
#print(sys.executable)

#spark连接python,这里的路径可以根据print(sys.executable)查看
os.environ['PYSPARK_PYTHON']="D:\\pythonProject\\.venv\\Scripts\\python.exe"

#python连接spark,local[*]是指在本机上进行操作,也可以运行在大规模集群上。test_app是名称。
conf=SparkConf().setMaster("local[*]").setAppName("test_app")
sc=SparkContext(conf=conf)

四、数据输入和读取

4.1代码

from pyspark import SparkConf,SparkContext
import sys
import os

#查看python解释器路径
#print(sys.executable)

#spark连接python,这里的路径可以根据print(sys.executable)查看
os.environ['PYSPARK_PYTHON']="D:\\pythonProject\\.venv\\Scripts\\python.exe"

#python连接spark,local[*]是指在本机上进行操作,也可以运行在大规模集群上。test_app是名称。
conf=SparkConf().setMaster("local[*]").setAppName("test_app")
sc=SparkContext(conf=conf)

#数据输入——输入列表,元组,字符串,集合,字典
rdd1=sc.parallelize([1,2,3,4,5,6])
rdd2=sc.parallelize((1,2,3,4,5,6))
rdd3=sc.parallelize("123456")
rdd4=sc.parallelize({1, 2, 3, 4, 5, 6})
rdd5=sc.parallelize({'key1':'v1','key2':'v2'})

#数据输入——读取文件,根据文件内容读取
#rdd=sc.textFile('路径')

#输出结果,带上collect()
print(rdd1.collect())
print(rdd2.collect())
print(rdd3.collect())
print(rdd4.collect())
print(rdd5.collect())

#。。。。。第五部分——数据计算放置位置。。。。。

#关闭spark
sc.stop()

4.2结果

五、数据计算

#以下代码 放在第四部分指定位置

5.1 数据计算——map

#数据计算——map计算(参数是一个函数,而且函数必须有参数(把列表传进去)和返回值
rdd1=sc.parallelize([1,2,3,4,5])

#把列表所有数*5-3
def fun(data):
    return data*5-3

#执行map计算
rdd2=rdd1.map(fun)

#对于简单函数,可以使用匿名函数表示
rdd2=rdd1.map(lambda x:x*5-3)

#读取结果
print(rdd2.collect())

5.2 数据计算——flatmap

#数据计算——flatmap,在map基础上解除一次嵌套
rdd1=sc.parallelize(["hello world","hello kunkun","ikun"])

#要把每个单词提取出来
def split(data):
    return data.split(" ")

#用map提取有两层,列表中嵌套列表
rdd2=rdd1.map(split)
print(rdd2.collect())

#用flatmap提取,只有一个列表
rdd3=rdd1.flatMap(split)
print(rdd3.collect())

5.3 数据计算——reduceByKey

#数据计算——reduceByKey,分组求和。要求:输入的数据是二元元组,元组中'hello'称为key,把key相同的值进行运算,与MapReduce中reduce操作差不多。
rdd1=sc.parallelize([('hello',1),('world',1),('bye',1),('world',1)])
#这里函数需要接受两个值,两个值类型和返回值类型必须一样。接收到1
def sum(data1,data2):
    return data1+data2
rdd2=rdd1.reduceByKey(sum)
#分组求和后的结果,按顺序排出,bye,hello,world
print(rdd2.collect())

5.4 数据计算——filter

#数据计算——filter,用于判断,判断结果如果为真返回如果为假过滤
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=rdd1.filter(lambda x:x%2 != 0)
print(rdd2.collect())

5.5 数据计算——distinct

#数据计算——distinct,去重。无参数
rdd1=sc.parallelize([1,2,3,4,5,1,2,3,6])
rdd2=rdd1.distinct()
print(rdd2.collect())

5.6 数据计算——sortBy

#数据计算——sortBy,排序。三个参数,第一个参数是函数,指定哪个元素进行排序,这里按照元组的第二个数据进行排序。第二个参数控制升序1降序0,第三个参数控制分区排序一般为1
rdd1=sc.parallelize([('hello',3),('world',2),('bye',2),('spark',1)])
rdd2=rdd1.sortBy(lambda d:d[1],ascending=1,numPartitions=1)
print(rdd2.collect())

六、案例

要求:计算sparkfile.txt文件中每个单词出现的次数,文件内容如下

hello world bye world
hello spark bye spark
bye hadoop hello hadoop

解答:

from pyspark import SparkConf,SparkContext
import sys
import os

#spark连接python
os.environ['PYSPARK_PYTHON']="D:\\pythonProject\\.venv\\Scripts\\python.exe"
#python连接spark
conf=SparkConf().setMaster("local[*]").setAppName("test_app")
sc=SparkContext(conf=conf)

#获取文件内容
rdd1=sc.textFile('sparkfile.txt')
#print(rdd1.collect())

#获取内容分开
rdd2=rdd1.flatMap(lambda s: s.split(" "))
#print(rdd2.collect())

#把列表转换成二元元组
rdd3=rdd2.map(lambda x:(x,1))
#print(rdd3.collect())

#计算单词数量
rdd4=rdd3.reduceByKey(lambda d1,d2:d1+d2)
print(rdd4.collect())


本文转载自: https://blog.csdn.net/2301_80339607/article/details/136028054
版权归原作者 py.鸽鸽 所有, 如有侵权,请联系我们删除。

“python——spark使用”的评论:

还没有评论