文章目录
1. 算子(方法)介绍
rdd中封装了各种算子方便进行计算,主要分为两类:
transformation 转换算子
- 对RDD数据进行转化得到新的RDD
,定义了一个线程任务。- 常见:map、filter、flatMap、reduceByKey、groupByKey、sortByKeyaction 触发算子
-触发计算任务
,让计算任务进行执行,得到结果。- 触发线程执行的。- 常见:foreach、first、count、reduce、saveAsTextFile、collect、take
RDD的转换算子大部分都是从RDD中读取元素数据(RDD中每条数据),具体计算需要开发人员编写函数传递到RDD算子中。
RDD的执行算子则大部分是用来获取数据,collect方法就是触发算子。
注意:
- 转换算子是lazy模式,一般不会触发job和task的运行,返回值一定是RDD。
- 执行算子,会触发job和task的运行,返回值一定不是RDD。
2. 常用transformation算子
2.1 map
- RDD.map(lambda 参数:参数计算)
- 参数接受每个元素数据
# Map算子使用# map算子主要使用长场景,一个转化rdd中每个元素的数据类型,拼接rdd中的元素数据,对rdd中的元素进行需求处理# 需求,处理hdfs中的学生数据,单独获取每个学生的信息from pyspark import SparkContext
sc = SparkContext()# 1- 读取hdfs中的学生数据
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')# 2- 使用转化算子进行数据处理# map中的lambda表达式,必须定义一个参数,用来接收rdd中的元素数据, 注意:x参数如何处理,要看x接收的数据类型
rdd2 = rdd.map(lambda x:x.split(','))# 3-从rdd2中获取姓名数据
rdd3 = rdd2.map(lambda x:x[1])# lambda 函数能进行简单的数据计算,如果遇到复杂数据计算时,就需要使用自定义函数# 获取年龄数据,并且转化年龄数据为int类型,将年龄和性别合并一起保存成元组 (男,20) (女,21)deffunc(x):# 1-先切割数据
data_split = x.split(',')# 2-转化数据类型
age =int(data_split[2])# 3-拼接性别和年龄
data_tuple =(data_split[3],age)return data_tuple
# 将函数的名字传递到map中,不要加括号
rdd4 = rdd.map(func)# 触发执行算子,查看读取的数据
res = rdd.collect()print(res)
res2 = rdd2.collect()print(res2)
res3 = rdd3.collect()print(res3)
res4 = rdd4.collect()print(res4)
运行结果:
2.2 flatMap
- 处理的是二维嵌套列表数据[[1,‘张三’],[2,‘李四’],[3,‘王五’]] --> [1, ‘张三’, 2, ‘李四’, 3, ‘王五’]
- rdd.flatMap(lambda 参数:[参数计算])
#flatmap算子使用# 主要使用场景是对二维嵌套的数据降维操作 [[1,'张三'],[2,'李四'],[3,'王五']] --> [1, '张三', 2, '李四', 3, '王五']from pyspark import SparkContext
sc = SparkContext()#生成rdd
rdd = sc.parallelize([[1,'张三'],[2,'李四'],[3,'王五']])#使用flatmap算子进行转化
rdd2 = rdd.flatMap(lambda x: x)#查看数据
res = rdd2.collect()print(res)
运行结果:
2.3 filter
- rdd.filter(lambda 参数:参数条件过滤)
- 条件过滤的书写和Python中if判断的一样
# RDD数据过滤# 需求:过滤年龄大于20岁的信息from pyspark import SparkContext
sc = SparkContext()# 1- 读取hdfs中的学生数据
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')# 2- 使用转化算子进行数据处理# map中的lambda表达式,必须定义一个参数,用来接收rdd中的元素数据, 注意:x参数如何处理,要看x接收的数据类型
rdd2 = rdd.map(lambda x:x.split(','))#使用fliter方法进行数据过滤
rdd3 = rdd2.filter(lambda x:int(x[2])>20)
rdd4 = rdd2.filter(lambda x:x[3]=='男')# 查看数据
res = rdd2.collect()print(res)
res3 = rdd3.collect()print(res3)
res4 = rdd4.collect()print(res4)
运行结果:
2.4 distinct
- 不需要lambda rdd.distinct
# distinct 去重算子# rdd中有重复数据时,可以进行去重from pyspark import SparkContext
sc = SparkContext()# 1- 读取hdfs中的学生数据
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')# 2- 使用转化算子进行数据处理# map中的lambda表达式,必须定义一个参数,用来接收rdd中的元素数据, 注意:x参数如何处理,要看x接收的数据类型
rdd2 = rdd.map(lambda x: x.split(','))# 3-从rdd2中获取性别数据
rdd3 = rdd2.map(lambda x: x[3])#对rdd3中的数据去重
rdd4 = rdd3.distinct()#查看数据
res = rdd3.collect()print(res)
res1 = rdd4.collect()print(res1)
运行结果:
2.6 groupBy
- rdd.groupBy(lambda 参数:根据参数编写分组条件)
- mapValues(list)
# groupBy分组# 按照不同性别进行分组# 原理: 就是对需要分组的数据进行hash取余数 ,余数相同会放入同一组from pyspark import SparkContext
sc = SparkContext()# 1- 读取hdfs中的学生数据
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')# 2- 使用转化算子进行数据处理# map中的lambda表达式,必须定义一个参数,用来接收rdd中的元素数据, 注意:x参数如何处理,要看x接收的数据类型
rdd2 = rdd.map(lambda x: x.split(','))# 3- 对性别进行分组
rdd3 = rdd2.groupBy(lambda x:hash(x[3])%2)#查看分组的数据内容 mapValues 取出分组后的数据值,对数据值转为列表即可
rdd4 = rdd3.mapValues(lambda x:list(x))# 查看数据内容
res = rdd2.collect()print(res)
res3 = rdd3.collect()print(res3)
res4 = rdd4.collect()print(res4)
运行结果:
2.7 sortBy()
- rdd.sortBy(lambda x:x,ascending=False)
#RDD的数据排序from pyspark import SparkContext
sc = SparkContext()# 生成rdd数据# 非k,v数据
rdd = sc.parallelize([4,7,3,2,8])#在spark中使用元组表示k,v数据
rdd2 = sc.parallelize([('张三',90),('李四',70),('王五',99)])# 数据排序
rdd3 = rdd.sortBy(lambda x: x)
rdd4 = rdd.sortBy(lambda x: x,ascending=False)#k,V数据排序
rdd5 = rdd2.sortBy(lambda x: x[1],ascending=False)
rdd6 = rdd2.sortBy(lambda x: x[1])#查看结果
res = rdd3.collect()print(res)
res2 = rdd4.collect()print(res2)
res3 = rdd5.collect()print(res3)
res4 = rdd6.collect()print(res4)
运行结果:
2.8 k-v数据[(k,v),(k1,v1)]
- groupByKey() - rdd.groupByKey()
- reduceByKey() - rdd.reduceByKey(lambda 参数1,参数2:对两个参数计算)
- sortByKey() - rdd.sortByKey()
#k,v结构数据处理from pyspark import SparkContext
sc = SparkContext()#k,v分组# 1. 读取hdfs中的学生数据
rdd = sc.textFile('hdfs://node1:8020/data/stu.txt')# 2. 使用转化算子进行数据处理
rdd2 = rdd.map(lambda x: x.split(','))#将数据转为k,v结构,然后进行分组,把分组的字段作为key值
rdd3 = rdd2.map(lambda x:(x[3], x))# 使用groupBykey方法,按key进行分组
rdd4 = rdd3.groupByKey().mapValues(lambda x:list(x))#k,v数据计算#统计不同性别的年龄总和 (求和 平均数 最大值 最小值 数量)#将需要计算的数据转为k,v结构 分组的字段是key值 聚合数据是value值
rdd5 = rdd2.map(lambda x:(x[3],int(x[2])))# 使用reduceBykey方法进行聚合计算 会将相同key值的数据先合并,然后在聚合计算# 聚合计算的算子,lambda x,y 需要结合两个参数
rdd6 = rdd5.reduceByKey(lambda x, y: x+y)
rdd7 = rdd5.groupByKey().mapValues(lambda x:sum(list(x))/len(list(x)))
rdd8 = rdd5.groupByKey().mapValues(lambda x:max(list(x)))
res = rdd2.collect()print(res)
res3 = rdd3.collect()print(res3)
res4 = rdd4.collect()print(res4)
res5 = rdd5.collect()print(res5)
res6 = rdd6.collect()print(res6)
res7 = rdd7.collect()print(res7)
res8 = rdd8.collect()print(res8)
运行结果:
3. 常用action算子
- collecct()取出RDD中所有值 - rdd.collect()
- reduce() 非k-v类型数据累加[1,2,3,4,6] - rdd.reduce(lambda 参数1,参数2:两个参数计算)
- count() 统计RDD元素个数 - rdd.count()
- take() 取出指定数量值 - rdd.take(数量)
# action算子使用# 触发转化算子执行from pyspark import SparkContext
sc = SparkContext()# 生成rdd
rdd = sc.parallelize([1,2,3,4])
rdd_kv = sc.parallelize([('a',2),('b',3)])# 进行转化处理# 使用action# 获取rdd中所有元素数据,转为列表展示
res = rdd.collect()print(res)# 指定取出的数据数量
res2 = rdd.take(3)print(res2)# 对非kv数据计算# 求和
res3 = rdd.reduce(lambda x,y:x+y)print(res3)# 求数量
res4 = rdd.count()print(res4)# 求最大值
res5 = rdd.max()print(res5)
res6 = rdd.mean()print(res6)# 将kv数据转为字典输出
res7 = rdd_kv.collectAsMap()print(res7)# 将rdd结果保存到hdfs 指定目录路径,指定的目录不能存在
rdd_kv.saveAsTextFile('hdfs://node1:8020/data/result')
运行结果:
版权归原作者 @听风吟 所有, 如有侵权,请联系我们删除。