0


[PySpark学习]RDD的转换(Transformation)与动作算子(Action)

一、RDD概念

RDD(英文全称Resilient Distributed Dataset),即弹性分布式数据集是spark中引入的一个数据结构,是Spark中最基本的数据抽象,代表一个不可变、可分区、里面的元素可并行计算的集合。

Resilient弹性:RDD的数据可以存储在内存或者磁盘当中,RDD的数据可以分区。

Distributed分布式:RDD的数据可以分布式存储,可以进行并行计算。

Dataset数据集:一个用于存放数据的集合。

二、RDD算子

    **指的是RDD对象中提供了非常多的具有特殊功能的函数, 我们将这些函数称为算子(函数/方法/API)。**

RDD算子分为两类:

   ** Transformation(转换算子): **
         返回值: 是一个新的RDD
         特点: 转换算子只是定义数据的处理规则,并不会立即执行,是lazy(惰性)的。需要由Action算子触发
 
    ** Action(动作算子):**
         返回值: 要么没有返回值None,或者返回非RDD类型的数据
         特点: 动作算子都是立即执行。执行的时候,会将它上游的其他算子一同触发执行

    以下演示Spark学习用到的相关RDD算子,通过SecureCRTPortable客户端远程连接Linux服务器操作pyspark。

    连接界面如下:

三、RDD的转换算子

(一)(单)值类型算子

1、map算子

    格式:rdd.map(fn)
     作用:  主要根据传入的函数,对数据进行一对一的转换操作,传入一行,返回一行。

    需求: 数字加一后返回

    代码:
init_rdd = sc.parallelize([0,1,2,3,4,5,6,7,8,9])
init_rdd.map(lambda num:num+1).collect()
#运行结果:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]           

2、groupBy 算子

    格式: groupBy(fn)
     作用: 根据用户传入的自定义函数,对数据进行分组操作

    需求: 将数据分成奇数和偶数

    代码:
>>> init_rdd = sc.parallelize([0,1,2,3,4,5,6,7,8,9])
>>> init_rdd.groupBy(lambda num:"偶数"  if num % 2 == 0 else "奇数").mapValues(list).collect()
# 运行结果:[('偶数', [0, 2, 4, 6, 8]), ('奇数', [1, 3, 5, 7, 9])] 
    说明:mapValues(list)将数据类型转成List列表

3、filter算子

    格式:filter(fn)
     作用:根据用户传入的自定义函数对数据进行过滤操作。自定义函数的返回值类型是bool类型。True表示满足过滤条件,会将数据保留下来;False会将数据丢弃掉。

    需求:过滤掉数值<=3的数据

    代码:
>>> init_rdd = sc.parallelize([0,1,2,3,4,5,6,7,8,9])
>>> init_rdd.filter(lambda num:num > 3).collect()    
# 运行结果 [4, 5, 6, 7, 8, 9]

4、flatMap算子

    格式:rdd.flatMap(fn)

    作用:在map算子的基础上,加入一个压扁的操作, 主要适用于一行中包含多个内容的操作,实现一转多的操作

    需求:将姓名一个一个的输出

    代码:
>>> init_rdd = sc.parallelize(['张三 李四 王五','赵六 周日'])
>>> init_rdd.flatMap(lambda line:line.split()).collect()
#  运行结果  ['张三', '李四', '王五', '赵六', '周日']
    说明: split()默认会按照空白字符对内容进行切分处理。例如:空格、制表符、回车。还是推荐大家明确指定所需要分割的符号。

(二)双值类型算子

    双值类型算子主要有union(并集) 和intersection(交集)

    格式:rdd1.union(rdd2)、 rdd1.intersection(rdd2)

    代码:
>>> rdd1 = sc.parallelize([3,3,2,6,8,0])
>>> rdd2 = sc.parallelize([3,2,1,5,7])
>>> rdd1.union(rdd2).collect()
#  并集运行结果  [3, 3, 2, 6, 8, 0, 3, 2, 1, 5, 7]
>>> 
>>> rdd1.union(rdd2).distinct().collect()
#  并集去重运行结果 [8, 0, 1, 5, 2, 6, 3, 7]
>>> 
>>> rdd1.intersection(rdd2).collect()
#  交集运行结果 [2, 3]
    说明:union取并集不会对重复出现的数据去,distinct()是转换算子,用来对RDD中的元素进行去重处理。交集会对结果数据进行去重处理。

(三)key-value数据类型算子

1、groupByKey()

    格式: rdd.groupByKey()

    作用: 对键值对类型的RDD中的元素按照键key进行分组操作。只会进行分组。

    需求:对学生按照班级分组统计

    代码:
>>> rdd = sc.parallelize([('c01','张三'),('c02','李四'),('c02','王五'),('c01','赵六'),('c03','田七'),('c03','周八'),('c02','李九')])
>>> rdd.groupByKey().mapValues(list).collect()
#  运行结果 [('c01', ['张三', '赵六']), ('c02', ['李四', '王五', '李九']), ('c03', ['田七', '周八'])]

2、reduceByKey()

    格式: rdd.reduceByKey(fn)
     作用: 根据key进行分组,将一个组内的value数据放置到一个列表中,对这个列表基于fn进行聚合计算操作

    需求:统计每个班级学生人数

    代码:
>>> rdd = sc.parallelize([('c01','张三'),('c02','李四'),('c02','王五'),('c01','赵六'),('c03','田七'),('c03','周八'),('c02','李九')])
>>> rdd.map(lambda tup:(tup[0],1)).reduceByKey(lambda agg,curr:agg+curr).collect()                                                    
#  运行结果[('c01', 2), ('c02', 3), ('c03', 2)]

3、sortByKey()算子

    格式:rdd.sortByKey(ascending=True|False)
     作用:  根据key进行排序操作,默认按照key进行升序排序,如果需要降序,设置 ascending 参数的值为False。

    需求1:根据key进行排序操作,演示升序

    代码:
>>> rdd = sc.parallelize([(10,2),(15,3),(8,4),(7,4),(2,4),(12,4)]) 
>>> rdd.sortByKey().collect()                                     
#  运行结果[(2, 4), (7, 4), (8, 4), (10, 2), (12, 4), (15, 3)]
    需求2:根据key进行排序操作,演示降序

    代码:
>>> rdd = sc.parallelize([(10,2),(15,3),(8,4),(7,4),(2,4),(12,4)]) 
>>> rdd.sortByKey(ascending=False).collect()                       
#  运行结果 [(15, 3), (12, 4), (10, 2), (8, 4), (7, 4), (2, 4)]
    需求3:根据key进行排序操作,演示升序

    代码:根据key进行排序操作,演示升序
>>> rdd = sc.parallelize([('a01',2),('A01',3),('a011',2),('a03',2),('a021',2),('a04',2)])>>> rdd.sortByKey().collect()
#  运行结果 [('A01', 3), ('a01', 2), ('a011', 2), ('a021', 2), ('a03', 2), ('a04', 2)]
    说明:对字符串类型的key进行排序的时候,按照ASCII码表进行排序。大写字母排在小写字母的前面;如果前缀一样,短的排在前面,长的排在后面。

四、RDD的动作算子

1、collect() 算子:

    格式: collect()

    作用: 收集各个分区的数据,将数据汇总到一个大的列表返回

    使用:在需要执行的转换算子最后加上collect()

2、reduce() 算子

    格式: reduce(fn)
     作用: 根据用户传入的自定义函数,对数据进行聚合操作。该算子是Action动作算子;而reduceByKey是Transformation转换算子。

    需求:统计所有元素之和是多少
>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])     
>>> def mysum(agg,curr):
...     print(f"中间临时聚合结果{agg},当前遍历到的元素{curr}")
...     return agg+curr
... 
>>> rdd.reduce(mysum)
###
运行结果
中间临时聚合结果6,当前遍历到的元素7
中间临时聚合结果13,当前遍历到的元素8
中间临时聚合结果21,当前遍历到的元素9
中间临时聚合结果30,当前遍历到的元素10
中间临时聚合结果1,当前遍历到的元素2
中间临时聚合结果3,当前遍历到的元素3
中间临时聚合结果6,当前遍历到的元素4
中间临时聚合结果10,当前遍历到的元素5
中间临时聚合结果15,当前遍历到的元素40
55
###
    说明: 初始化的时候,agg,表示中间临时聚合结果,默认取列表中的第一个元素值,curr表示当前遍历到的元素,默认取列表中的第二个元素的值。

3、first() 算子

    格式: rdd.first()
     作用: 取RDD中的第一个元素。(不会对RDD中的数据排序)

    需求:

    代码:
>>> rdd = sc.parallelize([3,1,2,4,5,6,7,8,9,10])
>>> rdd.first()
#  运行结果 3

4、take() 算子

    格式: rdd.take(N)
     说明: 取RDD中的前N元素。(不会对RDD中的数据排序)

    需求:获取前3个元素

    代码:
>>> rdd = sc.parallelize([3,1,2,4,5,6,7,8,9,10])
>>> rdd.take(3)
#  运行结果 [3, 1, 2]

5、top()算子

    格式: top(N,[fn])
     作用: 对数据集进行倒序排序操作,如果kv(键值对)类型,针对key进行排序,获取前N个元素
     fn: 可以自定义排序,按照谁来排序

    需求1:获取前3个元素

    代码:
>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
>>> rdd.top(3)
#  运行结果 [10, 9, 8]
    需求2:按照班级人数降序排序,取前2个

    代码:
>>> rdd = sc.parallelize([('c01',5),('c02',8),('c04',1),('c03',4)])
>>> rdd.top(2,key=lambda tup:tup[1])
#运行结果  [('c02', 8), ('c01', 5)]
    需求3:按照班级人数升序排序,取前2个
>>> rdd = sc.parallelize([('c01',5),('c02',8),('c04',1),('c03',4)])
>>> rdd.top(2,key= lambda tup:-tup[1])
#  运行结果 [('c04', 1), ('c03', 4)]

6、count() 算子

    格式:count()
     作用:统计RDD中一共有多少个元素

    需求:获取一共有多少个元素

    代码:
>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
>>> rdd.count()
#  运行结果 10

7、foreach() 算子

    格式: foreach(fn)
     作用: 遍历RDD中的元素,对元素根据传入的函数进行自定义的处理

    需求:对数据进行遍历打印

    代码:
>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
>>> rdd.foreach(lambda num:print(num))          
###
运行结果
6
7
8
9
10
1
2
3
4
5
###
    说明: 
         1- foreach()算子对自定义函数不要求有返回值,另外该算子也没有返回值
         2- 因为底层是多线程运行的,因此输出结果分区间可能乱序
         3- 该算子一般用来对结果数据保存到数据库或者文件中

本文转载自: https://blog.csdn.net/alie_123/article/details/135394812
版权归原作者 alie_123 所有, 如有侵权,请联系我们删除。

“[PySpark学习]RDD的转换(Transformation)与动作算子(Action)”的评论:

还没有评论