0


Spark之RDD,常用的分析算子大全 分组聚合,排序,重分区,连接合并等算子

Spark之RDD,常用的分析算子大全

一、分组聚合算子

**

  1. groupBy

**

  • 功能:根据指定的函数将 RDD 中的元素进行分组,返回一个新的 RDD,其中每个元素是一个键值对,键是分组的依据,值是一个可迭代的对象,包含该组的所有元素。
  • 示例
  • 要求:只有KV类型的RDD才能调用
  • 分类:转换算子
  • 场景需要对数据进行分组的场景,或者说分组以后的聚合逻辑比较复杂,不适合用reduce
  • 特点必须经过Shuffle,可以指定新的RDD分区个数,可以指定分区规则
  1. rdd = sc.parallelize([1, 2, 3, 4, 5])
  2. def f(x):
  3. return x % 2
  4. grouped_rdd = rdd.groupBy(f)
  5. result = grouped_rdd.collect()
  6. for key, value in result:
  7. print(f"Key: {key}, Values: {list(value)}")

单词计数

  1. # step1: 读取数据
  2. input_rdd = sc.textFile("../datas/wordcount/word.txt")
  3. # step2: 处理数据
  4. tuple_rdd = ( input_rdd
  5. .filter(lambda line: len(line.strip()) > 0)
  6. .flatMap(lambda line: re.split("\\s+", line))
  7. .map(lambda word: (word, 1))
  8. )
  9. tuple_rdd.foreach(lambda x: print(x))
  10. print("=============================")
  11. # 方案一:groupByKey先分组,再聚合
  12. group_rdd = tuple_rdd.groupByKey()
  13. group_rdd.foreach(lambda x: print(x[0], "------>", *x[1]))
  14. print("=============================")
  15. rs_rdd1 = group_rdd.map(lambda x: (x[0], sum(x[1])))
  16. rs_rdd1.foreach(lambda rs: print(rs))

**

  1. reduceByKey

**

  • 功能:在一个由键值对组成的 RDD 上,使用指定的二元函数对具有相同键的值进行聚合操作。这个操作在每个键的分区内先进行局部聚合,然后再在所有分区之间进行全局聚合,相比先groupBy再在每个组内聚合更高效。
  • 要求:只有KV类型的RDD才能调用
  • 分类:转换算子
  • 场景需要对数据进行分组并且聚合的场景【reduce能够实现的聚合】
  • 特点必须经过shuffle,可以执行新的RDD分区个数,可以指定分区规则
  • 示例
  1. rdd = sc.parallelize([(1, 2), (1, 3), (2, 4), (2, 5)])
  2. reduced_rdd = rdd.reduceByKey(lambda x, y: x + y)
  3. result = reduced_rdd.collect()
  4. for key, value in result:
  5. print(f"Key: {key}, Value: {value}")

**

  1. aggregateByKey

**

  • 功能:与reduceByKey类似,但提供了更多的灵活性。它需要提供一个初始值,并且可以分别指定分区内和分区间的聚合函数。
  • 示例
  1. rdd = sc.parallelize([(1, 2), (1, 3), (2, 4), (2, 5)])
  2. initial_value = 0
  3. agg_rdd = rdd.aggregateByKey(initial_value, lambda x, y: x + y, lambda x, y: x + y)
  4. result = agg_rdd.collect()
  5. for key, value in result:
  6. print(f"Key: {key}, Value: {value}")
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. from pyspark import SparkContext, SparkConf
  4. import os
  5. import re
  6. """
  7. -------------------------------------------------
  8. Description : TODO:groupByKey 和 reduceByKey算子的使用
  9. SourceFile : 01.pyspark_core_fun_group_agg
  10. Author : Frank
  11. Date : 2022/7/19
  12. -------------------------------------------------
  13. """
  14. if __name__ == '__main__':
  15. # todo:0-设置系统环境变量:全部换成Linux地址
  16. os.environ['JAVA_HOME'] = '/export/server/jdk'
  17. os.environ['HADOOP_HOME'] = '/export/server/hadoop'
  18. os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'
  19. os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'
  20. # todo:1-构建SparkContext
  21. conf = SparkConf().setMaster("local[2]").setAppName("Remote Test APP")
  22. sc = SparkContext(conf=conf)
  23. # todo:2-数据处理:读取、转换、保存
  24. # step1: 读取数据
  25. # 读取文件
  26. input_rdd = sc.textFile("../datas/wordcount/word.txt")
  27. # step2: 处理数据
  28. # ETL:先转换为二元组
  29. tuple_rdd = ( input_rdd
  30. # 过滤空行
  31. .filter(lambda line: len(line.strip()) > 0)
  32. # 一行多个单词,变成一行一个单词
  33. .flatMap(lambda line: re.split("\\s+", line))
  34. # 将每个单词变成二元组KV结构
  35. .map(lambda word: (word, 1))
  36. )
  37. tuple_rdd.foreach(lambda x: print(x))
  38. print("========================================")
  39. # 方式一:groupByKey + map:先分组再聚合
  40. group_rdd = tuple_rdd.groupByKey()
  41. # 也可以用groupBy:可以指定按照谁分组
  42. # groupby_rdd = tuple_rdd.groupBy(lambda x: x[0])
  43. # RDD[Tuple[K, V]]:V是一个list
  44. # group_rdd.foreach(lambda x: print(x[0], "--------------->", *x[1]))
  45. print("========================================")
  46. # 聚合:hadoop 1 1 1 1 1 1 1 = hadoop 7
  47. rs_rdd1 = group_rdd.map(lambda x: (x[0], sum(x[1])))
  48. # rs_rdd1.foreach(lambda x: print(x))
  49. # 方式二:reduceBykey:分组 + reduce聚合
  50. rs_rdd2 = tuple_rdd.reduceByKey(lambda tmp, item: tmp + item)
  51. rs_rdd2.foreach(lambda x: print(x))
  52. # step3: 保存结果
  53. # todo:3-关闭SparkContext
  54. sc.stop()
  • 注意:能用reduceByKey就不要用groupByKey+map- 工作中遇到了分组聚合- 先考虑reduceByKey能不能做,能用reduceByKey就不用groupByKey- 如果聚合逻辑用reduce函数实现不了,再考虑用groupByKey- 如果不是二元组类型,只能用groupBy
  • 小结- groupByKey的功能场景是什么?- 功能:对KV类型的RDD按照Key进行分组,将相同的Key的所有Value放入列表中- 分类:转换算子- 场景:需要先分组再聚合,聚合逻辑比较复杂,不能用reduce实现- 特点:经过Shuffle- reduceByKey的功能场景是什么?- 功能:对KV类型的RDD按照Key进行分组,将相同Key的所有Value使用reduce进行聚合- 分类:转换算子- 场景:需要分组并聚合,而且能使用reduce实现聚合- 特点:经过Shuffle

二、排序算子

**

  1. sortBy

**

  • 功能:根据指定的函数对 RDD 中的元素进行排序,可以指定升序或降序。
  • 分类:转换算子
  • 场景适用于所有对数据排序的场景,一般用于对大数据量非KV类型的RDD的数据排序
  • 特点:经过Shuffle,可以指定排序后新RDD的分区个数
  • 示例
  1. rdd = sc.parallelize([3, 1, 4, 1, 5, 9, 2, 6])
  2. sorted_rdd = rdd.sortBy(lambda x: x, ascending = True)
  3. result = sorted_rdd.collect()
  4. for num in result:
  5. print(num)
  1. # step1: 读取数据
  2. input_rdd = sc.textFile("../datas/function_data/sort.txt")
  3. # step2: 处理数据
  4. # sortBy算子:数据量大,RDD非KV类型RDD
  5. def split_line(line):
  6. arr = line.strip().split(",")
  7. return (arr[0], arr[1], arr[2])
  8. sort_by_rdd = input_rdd.sortBy(keyfunc=lambda line: split_line(line)[1], ascending=False)
  9. # step3: 保存结果
  10. sort_by_rdd.foreach(lambda x: print(x))
  11. sort_by_rdd.saveAsTextFile(path="../datas/output/sort/sort-1")

**

  1. sortByKey

**

  • 功能:对RDD中的所有元素按照Key进行整体排序,可以指定排序规则【升序还是降序】
  • sortByKey既能实现全局排序,也能实现局部有序
  • sortByKey会触发job构建Task
  • sortByKey在经过shuffle时,只调用RangePartition,不允许修改分区器
  • 语法:
  1. def sortBy(self, keyFunc:(T) -> 0, asc: bool, numPartitions) -> RDD
  2. keyFunc:(T) -> 0:用于指定按照数据中的哪个值进行排序
  3. asc: bool:用于指定升序还是降序,默认是升序
  1. # step1: 读取数据
  2. input_rdd = sc.textFile("../datas/function_data/sort.txt")
  3. # step2: 处理数据
  4. def split_line(line):
  5. arr = line.strip().split(",")
  6. return (arr[0], arr[1], arr[2])
  7. # sortByKey算子:数据量,RDD就是KV类型RDD
  8. # 手动将非KV类型转换成KV类型(年龄,名字+性别)
  9. kv_rdd = input_rdd.map(lambda line: split_line(line)).map(lambda tuple: (tuple[1], (tuple[0], tuple[2])))
  10. # 实现排序
  11. sort_by_key_rdd = kv_rdd.sortByKey(ascending=False)
  12. # step3: 保存结果
  13. sort_by_key_rdd.foreach(lambda x: print(x))
  14. sort_by_key_rdd.saveAsTextFile(path="../datas/output/sort/sort-2")
  • 两个算子区别- sortBy:可以指定按照谁排序,不限制RDD的结构- sortBykey:只能按照Key排序,只有KV类型的RDD才能用- 性能有没有区别:不是按照Key排序,性能没有区别,哪个方便哪个来,sortBy底层调用的就是sortByKey,自动根据指定参数,将参数作为Key
  • 小结:sortBy和sortByKey的功能场景是什么?- 功能:实现对RDD所有元素的排序- 场景:大数据量全局或者局部排序- 特点:经过shuffle- 关联:本质上sortBy调用的还是sortByKey- 区别:sortBy可以指定按照谁排序,sortByKey只能按照Key排序

三、TopN 算子

**

  1. top

**

  • 功能:返回 RDD 中的前 N 个最大元素。默认是按照自然顺序(对于数字是从大到小),可以通过自定义比较函数来改变顺序。
  • 分类:触发算子
  • 场景:取RDD数据中的最大的TopN个元素
  • 特点:不经过Shuffle,将所有元素放入Driver内存中排序,性能更好,只能适合处理小数据量
  • 语法
  • - def top(self, num) -> List[0]
  • 需求:取RDD中最大的前5个元素
  • 示例
  1. rdd = sc.parallelize([3, 1, 4, 1, 5, 9, 2, 6])
  2. top_elements = rdd.top(3)
  3. for num in top_elements:
  4. print(num)
  5. numRdd = sc.parallelize([1,5,2,6,9,10,4,3,8,7])
  6. top_list = numRdd.top(5)
  7. print(*top_list)

takeOrdered算子

  • 功能:对RDD中的所有元素升序排序,并返回前N个元素,即返回RDD中最小的前N个元数据
  • 分类:触发算子
  • 场景:取RDD数据中的最小的TopN个元素
  • 特点:不经过Shuffle,将所有元素放入Driver内存中排序,只能适合处理小数据量
  • 语法
  • def takeOrdered(self,num) -> List[0]
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. from pyspark import SparkContext, SparkConf
  4. import os
  5. """
  6. -------------------------------------------------
  7. Description : TODO:top 和 takeOrdered
  8. SourceFile : 03.pyspark_core_fun_top_takeOrdered
  9. Author : Frank
  10. Date : 2022/7/19
  11. -------------------------------------------------
  12. """
  13. if __name__ == '__main__':
  14. # todo:0-设置系统环境变量:全部换成Linux地址
  15. os.environ['JAVA_HOME'] = '/export/server/jdk'
  16. os.environ['HADOOP_HOME'] = '/export/server/hadoop'
  17. os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'
  18. os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'
  19. # todo:1-构建SparkContext
  20. conf = SparkConf().setMaster("local[2]").setAppName("Remote Test APP")
  21. sc = SparkContext(conf=conf)
  22. # todo:2-数据处理:读取、转换、保存
  23. # step1: 读取数据
  24. # 构建数据
  25. numRdd = sc.parallelize([1, 5, 2, 6, 9, 10, 4, 3, 8, 7])
  26. numRdd.foreach(lambda x: print(x))
  27. print("=======================================")
  28. # step2: 处理数据
  29. # top:将RDD中所有元素放入Driver内存中,对元素降序排序,取前N个
  30. top_list = numRdd.top(5)
  31. print(*top_list)
  32. print("=============")
  33. # takeOrdered:将RDD所有元素升序排序,取前N个值
  34. take_ordered_list = numRdd.takeOrdered(5)
  35. print(*take_ordered_list)
  36. # step3: 保存结果
  37. # todo:3-关闭SparkContext
  38. sc.stop()

四、重分区算子

  • 问题1:RDD1有10个分区,每个分区100万条数据,想加快处理速度,构建100个分区,每个分区处理10万条数据?- 需求:提高RDD的分区数
  • 问题2:RDD1有10个分区,每个分区100万条数据,经过聚合以后,只有10条数据了,希望都放入1个分区怎么做?- 需求:降低RDD的分区数

**

  1. repartition

**

  • 功能:增加或减少 RDD 的分区数。它会通过网络混洗数据来重新分布分区。
  • 功能:调整RDD的分区个数
  • 分类:转换算子
  • 场景:一般用于调大分区个数【由小变大】,必须经过shuffle才能实现
  • 特点:必须经过Shuffle过程,repartition底层就是coalesce(shuffle=True)
  • 语法
  • def repartition(self,numPartitions) -> RDD[T]
  • 示例
  1. rdd = sc.parallelize([1, 2, 3, 4, 5], 2)
  2. repartitioned_rdd = rdd.repartition(3)
  3. print(repartitioned_rdd.getNumPartitions())

**

  1. coalesce

**

  • 功能:用于减少 RDD 的分区数。如果要增加分区数并且希望进行数据混洗,可以设置shuffle = True。与repartition不同,coalesce默认不会进行数据混洗,效率更高,适用于减少分区的情况。
  • 示例
  1. rdd = sc.parallelize([1, 2, 3, 4, 5], 3)
  2. coalesced_rdd = rdd.coalesce(2)
  3. print(coalesced_rdd.getNumPartitions())

五、连接算子

**

  1. join

**

  • 功能:对两个键值对形式的 RDD 进行连接操作,返回一个新的 RDD,其中包含具有相同键的键值对元组。
  • 示例
  1. rdd1 = sc.parallelize([(1, 'a'), (2, 'b')])
  2. rdd2 = sc.parallelize([(1, 'x'), (2, 'y')])
  3. joined_rdd = rdd1.join(rdd2)
  4. result = joined_rdd.collect()
  5. for key, (value1, value2) in result:
  6. print(f"Key: {key}, Value1: {value1}, Value2: {value2}")

**

  1. leftOuterJoin

**

  • 功能:类似于join操作,但对于左 RDD 中的每个键,即使右 RDD 中没有对应的键,也会包含在结果中,右值用None填充。
  • 示例
  1. rdd1 = sc.parallelize([(1, 'a'), (2, 'b')])
  2. rdd2 = sc.parallelize([(1, 'x')])
  3. left_joined_rdd = rdd1.leftOuterJoin(rdd2)
  4. result = left_joined_rdd.collect()
  5. for key, (value1, value2) in result:
  6. print(f"Key: {key}, Value1: {value1}, Value2: {value2 if value2 else None}")

**

  1. rightOuterJoin

**

  • 功能:与leftOuterJoin相反,对于右 RDD 中的每个键,即使左 RDD 中没有对应的键,也会包含在结果中,左值用None填充。
  • 示例
  1. rdd1 = sc.parallelize([(1, 'a')])
  2. rdd2 = sc.parallelize([(1, 'x'), (2, 'y')])
  3. right_joined_rdd = rdd2.rightOuterJoin(rdd1)
  4. result = right_joined_rdd.collect()
  5. for key, (value2, value1) in result:
  6. print(f"Key: {key}, Value1: {value1 if value1 else None}, Value2: {value2}")

**

  1. fullOuterJoin

**

  • 功能:对两个 RDD 进行全外连接,结果包含两个 RDD 中所有的键,对于没有匹配的键,对应的的值用None填充。
  • 示例
  1. rdd1 = sc.parallelize([(1, 'a')])
  2. rdd2 = sc.parallelize([(2, 'b')])
  3. full_joined_rdd = rdd1.fullOuterJoin(rdd2)
  4. result = full_joined_rdd.collect()
  5. for key, (value1, value2) in result:
  6. print(f"Key: {key}, Value1: {value1 if value1 else None}, Value2: {value2 if value2 else None}")

综合案例:

  1. # encoding=utf-8
  2. # 连接与合并
  3. from pyspark import SparkContext
  4. sc = SparkContext()
  5. # 导入数据
  6. rdd_data = sc.textFile('hdfs://node1:8020/test/test.txt')
  7. rdd_data1 = sc.textFile('hdfs://node1:8020/test/test100.txt')
  8. '''
  9. 连接是以key值取当做的连接条件
  10. '''
  11. # 切割
  12. rdd_data_split =rdd_data.map(lambda x:x.split(','))
  13. rdd_data_split1 =rdd_data1.map(lambda x:x.split(','))
  14. #分组
  15. rdd_data_split_tup =rdd_data_split.map(lambda x:(x[0],x))
  16. rdd_data_split_tup1 =rdd_data_split1.map(lambda x:(x[0],x))
  17. #内连接
  18. rdd_join = rdd_data_split_tup.join(rdd_data_split_tup1)
  19. #左连接
  20. rdd_leftjoin = rdd_data_split_tup.leftOuterJoin(rdd_data_split_tup1)
  21. #右连接
  22. rdd_rightjoin = rdd_data_split_tup.rightOuterJoin(rdd_data_split_tup1)
  23. #全连接
  24. rdd_fulljoin = rdd_data_split_tup.fullOuterJoin(rdd_data_split_tup1)
  25. #合并
  26. rdd_unionall = rdd_data1.union(rdd_data1)
  27. #合并并去重
  28. rdd_union = rdd_data.union(rdd_data1).distinct()
  29. print(rdd_data.collect())
  30. print('-'*50)
  31. print(rdd_data1.collect())
  32. print('-'*50,'内连接')
  33. print(rdd_join.collect())
  34. print('-'*50,'左连接')
  35. print(rdd_leftjoin.collect())
  36. print('-'*50,'右连接')
  37. print(rdd_rightjoin.collect())
  38. print('-'*50,'全连接')
  39. print(rdd_fulljoin.collect())
  40. print('-'*50,'纵向合并')
  41. print(rdd_unionall.collect())
  42. print('-'*50,'纵向合并并去重')
  43. print(rdd_union.collect())
  • 小结- repartition的功能场景是什么?- 功能:调整分区个数- 场景:一般用于增大分区个数- 特点:一定会经过Shuffle- coalesce的功能场景是什么?- 功能:调整分区个数- 场景:一般用于降低分区个数- 特点:可以自由选择是否经过shuffle过程

六、合并算子

**

  1. union

**

  • 功能:将两个 RDD 合并成一个新的 RDD,不去重。
  • 示例
  1. rdd1 = sc.parallelize([1, 2, 3])
  2. rdd2 = sc.parallelize([3, 4, 5])
  3. union_rdd = rdd1.union(rdd2)
  4. result = union_rdd.collect()
  5. for num in result:
  6. print(num)

如果去重则用 distinct

**

  1. intersection

**

  • 功能:返回两个 RDD 的交集,即同时存在于两个 RDD 中的元素组成的 RDD。
  • 示例
  1. rdd1 = sc.parallelize([1, 2, 3])
  2. rdd2 = sc.parallelize([3, 4, 5])
  3. intersection_rdd = rdd1.intersection(rdd2)
  4. result = intersection_rdd.collect()
  5. for num in result:
  6. print(num)

**

  1. subtract

**

  • 功能:返回在第一个 RDD 中存在,但在第二个 RDD 中不存在的元素组成的 RDD。
  • 示例
  1. rdd1 = sc.parallelize([1, 2, 3])
  2. rdd2 = sc.parallelize([3, 4, 5])
  3. subtract_rdd = rdd1.subtract(rdd2)
  4. result = subtract_rdd.collect()
  5. for num in result:
  6. print(num)

总结

  • 分组聚合算子可以使用 groupby +mapvalue算子
  • 排序算子可以使用sortBY 或者sortByKey
  • topN算子可以用top
  • 重新分区算子可以用repartition或者coalesce算子增加或者降低分区
  • 可以使用不同的join实现sql中的 内连接,左连接,右连接,全外连接
  • 可以使用union纵向合并
  • 可以使用intersection求交集
  • 可以使用subtract求第一个元素中存在,第二个元素中不存在的数据

本文转载自: https://blog.csdn.net/qq_55006020/article/details/142769852
版权归原作者 凡梦_leo 所有, 如有侵权,请联系我们删除。

“Spark之RDD,常用的分析算子大全 分组聚合,排序,重分区,连接合并等算子”的评论:

还没有评论