Spark之RDD,常用的分析算子大全
一、分组聚合算子
**
groupBy
**
- 功能:根据指定的函数将 RDD 中的元素进行分组,返回一个新的 RDD,其中每个元素是一个键值对,键是分组的依据,值是一个可迭代的对象,包含该组的所有元素。
- 示例:
- 要求:只有KV类型的RDD才能调用
- 分类:转换算子
- 场景:需要对数据进行分组的场景,或者说分组以后的聚合逻辑比较复杂,不适合用reduce
- 特点:必须经过Shuffle,可以指定新的RDD分区个数,可以指定分区规则
rdd = sc.parallelize([1, 2, 3, 4, 5])
def f(x):
return x % 2
grouped_rdd = rdd.groupBy(f)
result = grouped_rdd.collect()
for key, value in result:
print(f"Key: {key}, Values: {list(value)}")
单词计数
# step1: 读取数据
input_rdd = sc.textFile("../datas/wordcount/word.txt")
# step2: 处理数据
tuple_rdd = ( input_rdd
.filter(lambda line: len(line.strip()) > 0)
.flatMap(lambda line: re.split("\\s+", line))
.map(lambda word: (word, 1))
)
tuple_rdd.foreach(lambda x: print(x))
print("=============================")
# 方案一:groupByKey先分组,再聚合
group_rdd = tuple_rdd.groupByKey()
group_rdd.foreach(lambda x: print(x[0], "------>", *x[1]))
print("=============================")
rs_rdd1 = group_rdd.map(lambda x: (x[0], sum(x[1])))
rs_rdd1.foreach(lambda rs: print(rs))
**
reduceByKey
**
- 功能:在一个由键值对组成的 RDD 上,使用指定的二元函数对具有相同键的值进行聚合操作。这个操作在每个键的分区内先进行局部聚合,然后再在所有分区之间进行全局聚合,相比先
groupBy
再在每个组内聚合更高效。 - 要求:只有KV类型的RDD才能调用
- 分类:转换算子
- 场景:需要对数据进行分组并且聚合的场景【reduce能够实现的聚合】
- 特点:必须经过shuffle,可以执行新的RDD分区个数,可以指定分区规则
- 示例:
rdd = sc.parallelize([(1, 2), (1, 3), (2, 4), (2, 5)])
reduced_rdd = rdd.reduceByKey(lambda x, y: x + y)
result = reduced_rdd.collect()
for key, value in result:
print(f"Key: {key}, Value: {value}")
**
aggregateByKey
**
- 功能:与
reduceByKey
类似,但提供了更多的灵活性。它需要提供一个初始值,并且可以分别指定分区内和分区间的聚合函数。 - 示例:
rdd = sc.parallelize([(1, 2), (1, 3), (2, 4), (2, 5)])
initial_value = 0
agg_rdd = rdd.aggregateByKey(initial_value, lambda x, y: x + y, lambda x, y: x + y)
result = agg_rdd.collect()
for key, value in result:
print(f"Key: {key}, Value: {value}")
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pyspark import SparkContext, SparkConf
import os
import re
"""
-------------------------------------------------
Description : TODO:groupByKey 和 reduceByKey算子的使用
SourceFile : 01.pyspark_core_fun_group_agg
Author : Frank
Date : 2022/7/19
-------------------------------------------------
"""
if __name__ == '__main__':
# todo:0-设置系统环境变量:全部换成Linux地址
os.environ['JAVA_HOME'] = '/export/server/jdk'
os.environ['HADOOP_HOME'] = '/export/server/hadoop'
os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'
# todo:1-构建SparkContext
conf = SparkConf().setMaster("local[2]").setAppName("Remote Test APP")
sc = SparkContext(conf=conf)
# todo:2-数据处理:读取、转换、保存
# step1: 读取数据
# 读取文件
input_rdd = sc.textFile("../datas/wordcount/word.txt")
# step2: 处理数据
# ETL:先转换为二元组
tuple_rdd = ( input_rdd
# 过滤空行
.filter(lambda line: len(line.strip()) > 0)
# 一行多个单词,变成一行一个单词
.flatMap(lambda line: re.split("\\s+", line))
# 将每个单词变成二元组KV结构
.map(lambda word: (word, 1))
)
tuple_rdd.foreach(lambda x: print(x))
print("========================================")
# 方式一:groupByKey + map:先分组再聚合
group_rdd = tuple_rdd.groupByKey()
# 也可以用groupBy:可以指定按照谁分组
# groupby_rdd = tuple_rdd.groupBy(lambda x: x[0])
# RDD[Tuple[K, V]]:V是一个list
# group_rdd.foreach(lambda x: print(x[0], "--------------->", *x[1]))
print("========================================")
# 聚合:hadoop 1 1 1 1 1 1 1 = hadoop 7
rs_rdd1 = group_rdd.map(lambda x: (x[0], sum(x[1])))
# rs_rdd1.foreach(lambda x: print(x))
# 方式二:reduceBykey:分组 + reduce聚合
rs_rdd2 = tuple_rdd.reduceByKey(lambda tmp, item: tmp + item)
rs_rdd2.foreach(lambda x: print(x))
# step3: 保存结果
# todo:3-关闭SparkContext
sc.stop()
- 注意:能用reduceByKey就不要用groupByKey+map- 工作中遇到了分组聚合- 先考虑reduceByKey能不能做,能用reduceByKey就不用groupByKey- 如果聚合逻辑用reduce函数实现不了,再考虑用groupByKey- 如果不是二元组类型,只能用groupBy
- 小结- groupByKey的功能场景是什么?- 功能:对KV类型的RDD按照Key进行分组,将相同的Key的所有Value放入列表中- 分类:转换算子- 场景:需要先分组再聚合,聚合逻辑比较复杂,不能用reduce实现- 特点:经过Shuffle- reduceByKey的功能场景是什么?- 功能:对KV类型的RDD按照Key进行分组,将相同Key的所有Value使用reduce进行聚合- 分类:转换算子- 场景:需要分组并聚合,而且能使用reduce实现聚合- 特点:经过Shuffle
二、排序算子
**
sortBy
**
- 功能:根据指定的函数对 RDD 中的元素进行排序,可以指定升序或降序。
- 分类:转换算子
- 场景:适用于所有对数据排序的场景,一般用于对大数据量非KV类型的RDD的数据排序
- 特点:经过Shuffle,可以指定排序后新RDD的分区个数
- 示例:
rdd = sc.parallelize([3, 1, 4, 1, 5, 9, 2, 6])
sorted_rdd = rdd.sortBy(lambda x: x, ascending = True)
result = sorted_rdd.collect()
for num in result:
print(num)
# step1: 读取数据
input_rdd = sc.textFile("../datas/function_data/sort.txt")
# step2: 处理数据
# sortBy算子:数据量大,RDD非KV类型RDD
def split_line(line):
arr = line.strip().split(",")
return (arr[0], arr[1], arr[2])
sort_by_rdd = input_rdd.sortBy(keyfunc=lambda line: split_line(line)[1], ascending=False)
# step3: 保存结果
sort_by_rdd.foreach(lambda x: print(x))
sort_by_rdd.saveAsTextFile(path="../datas/output/sort/sort-1")
**
sortByKey
**
- 功能:对RDD中的所有元素按照Key进行整体排序,可以指定排序规则【升序还是降序】
- sortByKey既能实现全局排序,也能实现局部有序
- sortByKey会触发job构建Task
- sortByKey在经过shuffle时,只调用RangePartition,不允许修改分区器
- 语法:
def sortBy(self, keyFunc:(T) -> 0, asc: bool, numPartitions) -> RDD
keyFunc:(T) -> 0:用于指定按照数据中的哪个值进行排序
asc: bool:用于指定升序还是降序,默认是升序
# step1: 读取数据
input_rdd = sc.textFile("../datas/function_data/sort.txt")
# step2: 处理数据
def split_line(line):
arr = line.strip().split(",")
return (arr[0], arr[1], arr[2])
# sortByKey算子:数据量,RDD就是KV类型RDD
# 手动将非KV类型转换成KV类型(年龄,名字+性别)
kv_rdd = input_rdd.map(lambda line: split_line(line)).map(lambda tuple: (tuple[1], (tuple[0], tuple[2])))
# 实现排序
sort_by_key_rdd = kv_rdd.sortByKey(ascending=False)
# step3: 保存结果
sort_by_key_rdd.foreach(lambda x: print(x))
sort_by_key_rdd.saveAsTextFile(path="../datas/output/sort/sort-2")
- 两个算子区别- sortBy:可以指定按照谁排序,不限制RDD的结构- sortBykey:只能按照Key排序,只有KV类型的RDD才能用- 性能有没有区别:不是按照Key排序,性能没有区别,哪个方便哪个来,sortBy底层调用的就是sortByKey,自动根据指定参数,将参数作为Key
- 小结:sortBy和sortByKey的功能场景是什么?- 功能:实现对RDD所有元素的排序- 场景:大数据量全局或者局部排序- 特点:经过shuffle- 关联:本质上sortBy调用的还是sortByKey- 区别:sortBy可以指定按照谁排序,sortByKey只能按照Key排序
三、TopN 算子
**
top
**
- 功能:返回 RDD 中的前 N 个最大元素。默认是按照自然顺序(对于数字是从大到小),可以通过自定义比较函数来改变顺序。
- 分类:触发算子
- 场景:取RDD数据中的最大的TopN个元素
- 特点:不经过Shuffle,将所有元素放入Driver内存中排序,性能更好,只能适合处理小数据量
- 语法
- def top(self, num) -> List[0]
- 需求:取RDD中最大的前5个元素
- 示例:
rdd = sc.parallelize([3, 1, 4, 1, 5, 9, 2, 6])
top_elements = rdd.top(3)
for num in top_elements:
print(num)
numRdd = sc.parallelize([1,5,2,6,9,10,4,3,8,7])
top_list = numRdd.top(5)
print(*top_list)
takeOrdered算子
- 功能:对RDD中的所有元素升序排序,并返回前N个元素,即返回RDD中最小的前N个元数据
- 分类:触发算子
- 场景:取RDD数据中的最小的TopN个元素
- 特点:不经过Shuffle,将所有元素放入Driver内存中排序,只能适合处理小数据量
- 语法
- def takeOrdered(self,num) -> List[0]
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from pyspark import SparkContext, SparkConf
import os
"""
-------------------------------------------------
Description : TODO:top 和 takeOrdered
SourceFile : 03.pyspark_core_fun_top_takeOrdered
Author : Frank
Date : 2022/7/19
-------------------------------------------------
"""
if __name__ == '__main__':
# todo:0-设置系统环境变量:全部换成Linux地址
os.environ['JAVA_HOME'] = '/export/server/jdk'
os.environ['HADOOP_HOME'] = '/export/server/hadoop'
os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'
os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'
# todo:1-构建SparkContext
conf = SparkConf().setMaster("local[2]").setAppName("Remote Test APP")
sc = SparkContext(conf=conf)
# todo:2-数据处理:读取、转换、保存
# step1: 读取数据
# 构建数据
numRdd = sc.parallelize([1, 5, 2, 6, 9, 10, 4, 3, 8, 7])
numRdd.foreach(lambda x: print(x))
print("=======================================")
# step2: 处理数据
# top:将RDD中所有元素放入Driver内存中,对元素降序排序,取前N个
top_list = numRdd.top(5)
print(*top_list)
print("=============")
# takeOrdered:将RDD所有元素升序排序,取前N个值
take_ordered_list = numRdd.takeOrdered(5)
print(*take_ordered_list)
# step3: 保存结果
# todo:3-关闭SparkContext
sc.stop()
四、重分区算子
- 问题1:RDD1有10个分区,每个分区100万条数据,想加快处理速度,构建100个分区,每个分区处理10万条数据?- 需求:提高RDD的分区数
- 问题2:RDD1有10个分区,每个分区100万条数据,经过聚合以后,只有10条数据了,希望都放入1个分区怎么做?- 需求:降低RDD的分区数
**
repartition
**
- 功能:增加或减少 RDD 的分区数。它会通过网络混洗数据来重新分布分区。
- 功能:调整RDD的分区个数
- 分类:转换算子
- 场景:一般用于调大分区个数【由小变大】,必须经过shuffle才能实现
- 特点:必须经过Shuffle过程,repartition底层就是coalesce(shuffle=True)
- 语法
- def repartition(self,numPartitions) -> RDD[T]
- 示例:
rdd = sc.parallelize([1, 2, 3, 4, 5], 2)
repartitioned_rdd = rdd.repartition(3)
print(repartitioned_rdd.getNumPartitions())
**
coalesce
**
- 功能:用于减少 RDD 的分区数。如果要增加分区数并且希望进行数据混洗,可以设置
shuffle = True
。与repartition
不同,coalesce
默认不会进行数据混洗,效率更高,适用于减少分区的情况。 - 示例:
rdd = sc.parallelize([1, 2, 3, 4, 5], 3)
coalesced_rdd = rdd.coalesce(2)
print(coalesced_rdd.getNumPartitions())
五、连接算子
**
join
**
- 功能:对两个键值对形式的 RDD 进行连接操作,返回一个新的 RDD,其中包含具有相同键的键值对元组。
- 示例:
rdd1 = sc.parallelize([(1, 'a'), (2, 'b')])
rdd2 = sc.parallelize([(1, 'x'), (2, 'y')])
joined_rdd = rdd1.join(rdd2)
result = joined_rdd.collect()
for key, (value1, value2) in result:
print(f"Key: {key}, Value1: {value1}, Value2: {value2}")
**
leftOuterJoin
**
- 功能:类似于
join
操作,但对于左 RDD 中的每个键,即使右 RDD 中没有对应的键,也会包含在结果中,右值用None
填充。 - 示例:
rdd1 = sc.parallelize([(1, 'a'), (2, 'b')])
rdd2 = sc.parallelize([(1, 'x')])
left_joined_rdd = rdd1.leftOuterJoin(rdd2)
result = left_joined_rdd.collect()
for key, (value1, value2) in result:
print(f"Key: {key}, Value1: {value1}, Value2: {value2 if value2 else None}")
**
rightOuterJoin
**
- 功能:与
leftOuterJoin
相反,对于右 RDD 中的每个键,即使左 RDD 中没有对应的键,也会包含在结果中,左值用None
填充。 - 示例:
rdd1 = sc.parallelize([(1, 'a')])
rdd2 = sc.parallelize([(1, 'x'), (2, 'y')])
right_joined_rdd = rdd2.rightOuterJoin(rdd1)
result = right_joined_rdd.collect()
for key, (value2, value1) in result:
print(f"Key: {key}, Value1: {value1 if value1 else None}, Value2: {value2}")
**
fullOuterJoin
**
- 功能:对两个 RDD 进行全外连接,结果包含两个 RDD 中所有的键,对于没有匹配的键,对应的的值用
None
填充。 - 示例:
rdd1 = sc.parallelize([(1, 'a')])
rdd2 = sc.parallelize([(2, 'b')])
full_joined_rdd = rdd1.fullOuterJoin(rdd2)
result = full_joined_rdd.collect()
for key, (value1, value2) in result:
print(f"Key: {key}, Value1: {value1 if value1 else None}, Value2: {value2 if value2 else None}")
综合案例:
# encoding=utf-8
# 连接与合并
from pyspark import SparkContext
sc = SparkContext()
# 导入数据
rdd_data = sc.textFile('hdfs://node1:8020/test/test.txt')
rdd_data1 = sc.textFile('hdfs://node1:8020/test/test100.txt')
'''
连接是以key值取当做的连接条件
'''
# 切割
rdd_data_split =rdd_data.map(lambda x:x.split(','))
rdd_data_split1 =rdd_data1.map(lambda x:x.split(','))
#分组
rdd_data_split_tup =rdd_data_split.map(lambda x:(x[0],x))
rdd_data_split_tup1 =rdd_data_split1.map(lambda x:(x[0],x))
#内连接
rdd_join = rdd_data_split_tup.join(rdd_data_split_tup1)
#左连接
rdd_leftjoin = rdd_data_split_tup.leftOuterJoin(rdd_data_split_tup1)
#右连接
rdd_rightjoin = rdd_data_split_tup.rightOuterJoin(rdd_data_split_tup1)
#全连接
rdd_fulljoin = rdd_data_split_tup.fullOuterJoin(rdd_data_split_tup1)
#合并
rdd_unionall = rdd_data1.union(rdd_data1)
#合并并去重
rdd_union = rdd_data.union(rdd_data1).distinct()
print(rdd_data.collect())
print('-'*50)
print(rdd_data1.collect())
print('-'*50,'内连接')
print(rdd_join.collect())
print('-'*50,'左连接')
print(rdd_leftjoin.collect())
print('-'*50,'右连接')
print(rdd_rightjoin.collect())
print('-'*50,'全连接')
print(rdd_fulljoin.collect())
print('-'*50,'纵向合并')
print(rdd_unionall.collect())
print('-'*50,'纵向合并并去重')
print(rdd_union.collect())
- 小结- repartition的功能场景是什么?- 功能:调整分区个数- 场景:一般用于增大分区个数- 特点:一定会经过Shuffle- coalesce的功能场景是什么?- 功能:调整分区个数- 场景:一般用于降低分区个数- 特点:可以自由选择是否经过shuffle过程
六、合并算子
**
union
**
- 功能:将两个 RDD 合并成一个新的 RDD,不去重。
- 示例:
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([3, 4, 5])
union_rdd = rdd1.union(rdd2)
result = union_rdd.collect()
for num in result:
print(num)
如果去重则用 distinct
**
intersection
**
- 功能:返回两个 RDD 的交集,即同时存在于两个 RDD 中的元素组成的 RDD。
- 示例:
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([3, 4, 5])
intersection_rdd = rdd1.intersection(rdd2)
result = intersection_rdd.collect()
for num in result:
print(num)
**
subtract
**
- 功能:返回在第一个 RDD 中存在,但在第二个 RDD 中不存在的元素组成的 RDD。
- 示例:
rdd1 = sc.parallelize([1, 2, 3])
rdd2 = sc.parallelize([3, 4, 5])
subtract_rdd = rdd1.subtract(rdd2)
result = subtract_rdd.collect()
for num in result:
print(num)
总结
- 分组聚合算子可以使用 groupby +mapvalue算子
- 排序算子可以使用sortBY 或者sortByKey
- topN算子可以用top
- 重新分区算子可以用repartition或者coalesce算子增加或者降低分区
- 可以使用不同的join实现sql中的 内连接,左连接,右连接,全外连接
- 可以使用union纵向合并
- 可以使用intersection求交集
- 可以使用subtract求第一个元素中存在,第二个元素中不存在的数据
本文转载自: https://blog.csdn.net/qq_55006020/article/details/142769852
版权归原作者 凡梦_leo 所有, 如有侵权,请联系我们删除。
版权归原作者 凡梦_leo 所有, 如有侵权,请联系我们删除。