0


自用 云计算 | pyspark | 常见RDD算子及例子(云计算期末)

文章目录


前言

  1. 期末复习
  2. 主要包括云计算基本的概念
  3. 常见的pyspark算子于对应的例子

一、基本概念(云计算、RDD等)

1. 云计算的定义和优点

定义:云计算是一种通过互联网提供计算服务的技术。相比于传统计算,它的资源获取方式,从“买”变为“租”

优点:

  1. 资源池化
  2. 弹性伸缩
  3. 安全可靠

2.RDD定义和五大特性

定义 :RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。

五大特性:

  1. RDD由一系列的分区组成
  2. RDD的方法会作用在其所有的分区上
  3. RDD之间是有依赖关系(RDD有血缘关系)
  4. Key-Value型的RDD可以有分区器
  5. RDD的分区规划,会尽量靠近数据所在的服务器

3. 云计算部署类型(私有云/公有云/社区云/混合云之间的区别)

部署类型私有云公有云社区云混合云定义由单一组织独享的云计算资源。由第三方服务提供商提供的云计算资源,面向公众或多个租户。由多个组织共同拥有和使用的云计算资源,通常有相似的需求和目标。结合了私有云和公有云的优点,允许数据和应用在私有云和公有云之间进行迁移和交互。优点1. 安全;
2.自主可控。1.成本低;
2.高扩展性;
3.便捷性。1.成本共享;
2.安全性和隐私性;
3.协作。1.灵活性;
2.成本效益;
3.高可用性。缺点1.成本高;
2.维护复杂;
3.扩展性有限。1.安全性和隐私性;
2.控制权有限;
3.依赖性。1. 复杂性;
2.扩展性受限;
3.合规性问题。1. 管理复杂;
2.安全挑战;
3.集成难度。

4. 云计算服务类型

Infrastructure as a Service(IaaS):基础设备即服务
Platform as a Service(PaaS):平台即服务
Software as a Service(SaaS):软件即服务

二、常见RDD算子及例子

分布式集合对象上的API称之为算子
Python中使用pyspark初始化

from pyspark import SparkConf,SparkContext
if __name__ =='__main__'://构建SparkConf对象  
    conf =SparkConf().setAppName('helloworld').setMaster('local[*]')//构建SparkContext执行环境入口对象
    sc =SparkContext(conf = conf)

1. Transformation算子:行动算子

1)map算子和flatMap算子的区别和联系

map:是将RDD的数据一条条处理,返回新的RDD

  1. 一对一映射。
  2. 返回一个新的RDD,其中的每个元素是通过函数转换得到的单个元素。
  3. 适用于输入和输出元素之间存在一对一关系的情况。
data =[1,2,3,4,5]
rdd = sc.parallelize(data)

#使用map 算子将每个元素乘以 2
mapped_rdd = rdd.map(lambda x: x *2)print(mapped_rdd.collect())  

#输出:[2,4,6,8,10]

flatMap: 对rdd执行map操作,然后进行解除嵌套操作

  1. 一对多映射。
  2. 返回一个新的RDD,其中每个输入元素可以被映射为0个或多个输出元素。最终结果是扁平化的。
  3. 适用于输入元素需要映射为多个输出元素的情况,例如将句子拆分为单词。
data =["hello world","apache spark"]
rdd = sc.parallelize(data)// 使用 flatMap 算子将每个字符串拆分为单词
flat_mapped_rdd = rdd.flatMap(lambda x: x.split(" "))print(flat_mapped_rdd.collect())// 输出: ['hello', 'world', 'apache', 'spark']

2)sortBy算子和sortByKey算子的联系区别

sortBy和sortByKey算子都是用于对RDD进行排序的
sortBy

  1. 可以对任意类型的RDD进行排序
  2. 需要指定一个函数,该函数从RDD的元素中提取排序键
data =[("Alice",23),("Bob",20),("Charlie",25)]
rdd = sc.parallelize(data)//按年龄排序
sorted_rdd = rdd.sortBy(lambda x: x[1])print(sorted_rdd.collect())//输出: [('Bob', 20), ('Alice', 23), ('Charlie', 25)]

sortByKey

  1. 仅适用于键值对形式的RDD
  2. 根据键值对进行排序
data =[("Alice",23),("Bob",20),("Charlie",25)]
rdd = sc.parallelize(data)//按名字排序
sorted_rdd = rdd.sortByKey()print(sorted_rdd.collect())//输出: [('Alice', 23), ('Bob', 20), ('Charlie', 25)]
rdd = sc.parallelize([('a',1),('E',1),('C',1),('D',1),('b',1),('g',1),('f',1)],3)// 默认按照key进行升序排序print("默认: ",rdd.sortByKey().collect())
#如果要确保全局有序,排序分区数要给1,不是1的话,只能确保各个分区内排好序整体上不保证
print("多分区: ",rdd.sortByKey(ascending=False,numPartitions=5).collect())
#对排序的key进行处理,拍排序前处理一下key ,让key以你处理的样子进行排序(不影响数据本身)print("单分区: ",rdd.sortByKey(ascending=True, numPartitions=1, keyfunc=lambda key:str(key).lower()).collect())// 默认:  [('C', 1), ('D', 1), ('E', 1), ('a', 1), ('b', 1), ('f', 1), ('g', 1)]// 多分区:  [('g', 1), ('f', 1), ('b', 1), ('a', 1), ('E', 1), ('D', 1), ('C', 1)]// 单分区:  [('a', 1), ('b', 1), ('C', 1), ('D', 1), ('E', 1), ('f', 1), ('g', 1)]

3)reduceByKey算子和groupByKey算子的联系区别

reduceByKey和groupByKey算子都是用于对键值对(Pair RDD)进行操作的算子
reduceByKey

  1. 对相同键的值进行合并,返回每个键一个单一的值。
  2. 适用于需要对键对应的值进行聚合操作的情况,如求和、求平均等。
  3. 在本地进行预聚合,减少网络传输,性能较高。
data =[("a",1),("b",1),("a",2)]
rdd = sc.parallelize(data)// 对相同键的值进行求和
reduced_rdd = rdd.reduceByKey(lambda x,y: x + y)print(reduced_rdd.collect())// 输出: [('a', 3), ('b', 1)]

groupeByKey

  1. 对相同键的值进行分组,返回每个键一个可迭代对象。
  2. 适用于需要对键对应的值进行分组操作的情况,如需要对分组后的数据进行进一步处理。
  3. 可能会导致数据倾斜和网络传输增多,性能较低
rdd = sc.parallelize([1,2,3,4,5])// 分组,将数字分层偶数和奇数2个组
rdd2 = rdd.groupBy(lambda num:'even'if(num %2==0)else'odd')// 将rdd2的元素的value转换成list,这样print可以输出内容.print(rdd2.map(lambda x:(x[0],list(x[1]))).collect())// [('odd', [1, 3, 5]), ('even', [2, 4])]
rdd = sc.parallelize([('a',1),('a',1),('b',2),('b',3)])
result = rdd.groupBy(lambda t: t[0])print(result.map(lambda t:(t[0],list(t[1]))).collect())// [('b', [('b', 2), ('b', 3)]), ('a', [('a', 1), ('a', 1)])]
data =[("a",1),("b",1),("a",2)]
rdd = sc.parallelize(data)// 对相同键的值进行分组
grouped_rdd = rdd.groupByKey()// 将结果转换为列表以便查看
result =[(k,list(v))for k, v in grouped_rdd.collect()]print(result)// 输出: [('a', [1, 2]), ('b', [1])]

4)filter、distinct、union算子

filter算子:返回是True的数据被保留,False的数据被丢弃

rdd = sc.parallelize([1,2,3,4,5,6])// 通过Filter算子, 保留奇数
result = rdd.filter(lambda x: x %2==1)print(result.collect())// 结果:[1,3,5]  

rdd = sc.parallelize([1,2,3,4,5,6])// 保留奇数
rdd.filter(lambda x: True if(x %2==1)else False)print(rdd.filter(lambda x: x %2==1).collect())// 结果:[1,3,5]

distinct算子:去重操作

rdd1 = sc.parallelize([('a',1),('a',1),('a',3)])print(rdd1.distinct().collect())// 结果: [('a', 3), ('a', 1)]

union算子:2个rdd合并成1个rdd返回

  1. union算子是不会去重。
  2. RDD的类型不同也是可以合并的
rdd1 = sc.parallelize([1,1,3,3])
rdd2 = sc.parallelize(["a","b","a"])
rdd3 = rdd1.union(rdd2)print(rdd3.collect())// 结果: [1, 1, 3, 3, 'a', 'b', 'a']

5)join、leftOuterJoin和rightOuterJoin算子

join: join算子只能用于二元元组

  1. 结果RDD包含两个RDD中键相同的元素对
  2. 结果RDD中的元素格式为 (key, (value1, value2))。
data1 =[("a",1),("b",2)]
data2 =[("a",3),("a",4),("b",5)]
rdd1 = sc.parallelize(data1)
rdd2 = sc.parallelize(data2)// 内连接
joined_rdd = rdd1.join(rdd2)print(joined_rdd.collect())//  输出: [('a', (1, 3)), ('a', (1, 4)), ('b', (2, 5))]

leftOuterJoin 算子

  1. 返回左RDD中的所有元素,对于右RDD中没有匹配的键,用None填充
  2. 结果RDD中的元素格式为 (key, (value1, value2_opt)),value2_opt为None或value2。
data1 =[("a",1),("b",2),("c",3)]
data2 =[("a",3),("a",4),("b",5)]
rdd1 = sc.parallelize(data1)
rdd2 = sc.parallelize(data2)// 左外连接
left_joined_rdd = rdd1.leftOuterJoin(rdd2)print(left_joined_rdd.collect())// 输出: [('a', (1, 3)), ('a', (1, 4)), ('b', (2, 5)), ('c', (3, None))]

rightOuterJoin 算子

  1. 返回右RDD中的所有元素,对于左RDD中没有匹配的键,用None填充。
  2. 结果RDD中的元素格式为 (key, (value1_opt, value2)),value1_opt为None或value1。
data1 =[("a",1),("b",2),("d",4)]
data2 =[("a",3),("a",4),("b",5),("c",6)]
rdd1 = sc.parallelize(data1)
rdd2 = sc.parallelize(data2)// 右外连接
right_joined_rdd = rdd1.rightOuterJoin(rdd2)print(right_joined_rdd.collect())// 输出: [('a', (1, 3)), ('a', (1, 4)), ('b', (2, 5)), ('c', (None, 6))]

6)intersection、glom算子

intersection算子:求2个rdd的交集,返回一个新rdd

rdd1 = sc.parallelize([('a',1),('a',3)])
rdd2 = sc.parallelize([('a',1),('b',3)])// 通过intersection算子求RDD之间的交集, 将交集取出 返回新RDD
rdd3 = rdd1.intersection(rdd2)print(rdd3.collect())// 结果:[('a', 1)]

glom:将RDD的数据,加上嵌套,这个嵌套按照分区 来进行
比如RDD数据[1,2,3,4,5]有2个分区那么,被glom后,数据变成:[[1,2,3],[4,5]]

rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],2)print(rdd.glom().flatMap(lambda x: x).collect())//结果:[[1, 2, 3, 4], [5, 6, 7, 8, 9]]

2. Action算子:行动算子

countByKey: 统计key出现的次数(一般适用KV型RDD)

rdd = sc.textFile("../data/input/words.txt")
rdd2 = rdd.flatMap(lambda x: x.split(" ")).map(lambda x:(x,1))
# 通过countByKey来对key进行计数, 这是一个Action算子
# result 不是rdd而是dict
result = rdd2.countByKey()print(result)print(type(result))
# 结果:defaultdict(<class'int'>,{'hello':3,'xing':2,'qian':4})<class'collections.defaultdict'>

reduce:对RDD数据集按照你传入的逻辑进行聚合

rdd = sc.parallelize([1,2,3,4,5])print(rdd.reduce(lambda a,b: a + b))
# 结果:15

fold:对RDD数据集按照你传入的逻辑进行聚合
这个初始值聚合,会作用在:分区内聚合、分区间聚合

rdd = sc.parallelize([1,2,3,4,5,6,7,8,9],3)print(rdd.fold(10, lambda a,b: a + b))
# 结果:85
#分区1:123聚合的时候带上10作为初始值得到16=1+2+3+10 
#分区2:456聚合的时候带上10作为初始值得到25=4+5+6+10 
#分区3:789聚合的时候带上10作为初始值得到34=7+8+9+10
#3个分区的结果做聚合也带上初始值10,
#所以结果是:16+25+34+10=85

first:取出RDD的第一个元素

result = sc.parallelize([2,4,5,6,7]).first()print(result)
# 结果 2

take:取RDD的前N个元素,组合成list返回给你

result = sc.parallelize([22,4,5,346,7,56,78,55]).take(5)print(result)
# 结果 [22,4,5,346,7]

top:对RDD数据集进行降序排序,取前N个

result = sc.parallelize([22,4,5,346,7,56,78,55]).top(5)print(result)

# 结果:[346,78,56,55,22]

count:计算RDD有多少条数据,返回值是一个数字

result = sc.parallelize([22,4,5,346,7,56,78,55]).count(5)print(result)

# 结果:8

takeSample:随机抽样RDD的数据

rdd = sc.parallelize([1,3,5,3,1,3,2,6,7,8,6],1)print(rdd.takeSample(False,5,1))
# 结果:[2,7,6,6,3]

takeOrdered:对RDD进行排序取前N个

rdd = sc.parallelize([1,3,2,4,7,9,6],1)print(rdd.takeOrdered(3, lambda x:-x))
# 结果: [9,7,6]

foreach:对RDD的每一个元素,执行你提供的逻辑的操作(和map一个意思),但是这个方法没有返回值

rdd = sc.parallelize([1,3,2,4,7,9,6],1)
#注意,forreach函数没有返回值,因此在里面直接打印了
result = rdd.foreach(lambda x:print(x *10, end=" "))
#这个result对象是None
print(result)
# 结果:None

saveAsTextFile:将RDD的数据写入文本文件中支持本地写出, hdfs等文件系统.
P.S. foreach 与 saveAsTestFile这两个算子是分区(Executor) 跳过Driver,由分区所在的Executor直接执行。其余的Action算子都会将结果发送至Driver

编程题

某个类别在哪些城市销售(以电脑为例子)

计算IDF值

总结

自用

标签: 云计算 python spark

本文转载自: https://blog.csdn.net/2401_83977996/article/details/139393101
版权归原作者 fatiao it 所有, 如有侵权,请联系我们删除。

“自用 云计算 | pyspark | 常见RDD算子及例子(云计算期末)”的评论:

还没有评论