0


Spark之RDD,常用的分析算子大全 分组聚合,排序,重分区,连接合并等算子

Spark之RDD,常用的分析算子大全

一、分组聚合算子

**

groupBy

**

  • 功能:根据指定的函数将 RDD 中的元素进行分组,返回一个新的 RDD,其中每个元素是一个键值对,键是分组的依据,值是一个可迭代的对象,包含该组的所有元素。
  • 示例
  • 要求:只有KV类型的RDD才能调用
  • 分类:转换算子
  • 场景需要对数据进行分组的场景,或者说分组以后的聚合逻辑比较复杂,不适合用reduce
  • 特点必须经过Shuffle,可以指定新的RDD分区个数,可以指定分区规则
     rdd = sc.parallelize([1, 2, 3, 4, 5])
     def f(x):
         return x % 2
     grouped_rdd = rdd.groupBy(f)
     result = grouped_rdd.collect()
     for key, value in result:
         print(f"Key: {key}, Values: {list(value)}")

单词计数

    # step1: 读取数据
    input_rdd = sc.textFile("../datas/wordcount/word.txt")

    # step2: 处理数据
    tuple_rdd = ( input_rdd
                  .filter(lambda line: len(line.strip()) > 0)
                  .flatMap(lambda line: re.split("\\s+", line))
                  .map(lambda word: (word, 1))
    )
    tuple_rdd.foreach(lambda x: print(x))
    print("=============================")

    # 方案一:groupByKey先分组,再聚合
    group_rdd = tuple_rdd.groupByKey()
    group_rdd.foreach(lambda x: print(x[0], "------>", *x[1]))
    print("=============================")

    rs_rdd1 = group_rdd.map(lambda x: (x[0], sum(x[1])))
    rs_rdd1.foreach(lambda rs: print(rs))

**

reduceByKey

**

  • 功能:在一个由键值对组成的 RDD 上,使用指定的二元函数对具有相同键的值进行聚合操作。这个操作在每个键的分区内先进行局部聚合,然后再在所有分区之间进行全局聚合,相比先groupBy再在每个组内聚合更高效。
  • 要求:只有KV类型的RDD才能调用
  • 分类:转换算子
  • 场景需要对数据进行分组并且聚合的场景【reduce能够实现的聚合】
  • 特点必须经过shuffle,可以执行新的RDD分区个数,可以指定分区规则
  • 示例
     rdd = sc.parallelize([(1, 2), (1, 3), (2, 4), (2, 5)])
     reduced_rdd = rdd.reduceByKey(lambda x, y: x + y)
     result = reduced_rdd.collect()
     for key, value in result:
         print(f"Key: {key}, Value: {value}")

**

aggregateByKey

**

  • 功能:与reduceByKey类似,但提供了更多的灵活性。它需要提供一个初始值,并且可以分别指定分区内和分区间的聚合函数。
  • 示例
     rdd = sc.parallelize([(1, 2), (1, 3), (2, 4), (2, 5)])
     initial_value = 0
     agg_rdd = rdd.aggregateByKey(initial_value, lambda x, y: x + y, lambda x, y: x + y)
     result = agg_rdd.collect()
     for key, value in result:
         print(f"Key: {key}, Value: {value}")
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pyspark import SparkContext, SparkConf
import os
import re

"""
-------------------------------------------------
   Description :    TODO:groupByKey 和 reduceByKey算子的使用
   SourceFile  :    01.pyspark_core_fun_group_agg
   Author      :    Frank
   Date        :    2022/7/19
-------------------------------------------------
"""

if __name__ == '__main__':
    # todo:0-设置系统环境变量:全部换成Linux地址
    os.environ['JAVA_HOME'] = '/export/server/jdk'
    os.environ['HADOOP_HOME'] = '/export/server/hadoop'
    os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'
    os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'

    # todo:1-构建SparkContext
    conf = SparkConf().setMaster("local[2]").setAppName("Remote Test APP")
    sc = SparkContext(conf=conf)

    # todo:2-数据处理:读取、转换、保存
    # step1: 读取数据
    # 读取文件
    input_rdd = sc.textFile("../datas/wordcount/word.txt")

    # step2: 处理数据
    # ETL:先转换为二元组
    tuple_rdd = ( input_rdd
                  # 过滤空行
                  .filter(lambda line: len(line.strip()) > 0)
                  # 一行多个单词,变成一行一个单词
                  .flatMap(lambda line: re.split("\\s+", line))
                  # 将每个单词变成二元组KV结构
                  .map(lambda word: (word, 1))
    )
    tuple_rdd.foreach(lambda x: print(x))
    print("========================================")

    # 方式一:groupByKey + map:先分组再聚合
    group_rdd = tuple_rdd.groupByKey()
    # 也可以用groupBy:可以指定按照谁分组
    # groupby_rdd = tuple_rdd.groupBy(lambda x: x[0])
    # RDD[Tuple[K, V]]:V是一个list
    # group_rdd.foreach(lambda x: print(x[0], "--------------->", *x[1]))
    print("========================================")
    # 聚合:hadoop  1 1 1 1 1 1 1 = hadoop  7
    rs_rdd1 = group_rdd.map(lambda x: (x[0], sum(x[1])))
    # rs_rdd1.foreach(lambda x: print(x))

    # 方式二:reduceBykey:分组 + reduce聚合
    rs_rdd2 = tuple_rdd.reduceByKey(lambda tmp, item: tmp + item)
    rs_rdd2.foreach(lambda x: print(x))

    # step3: 保存结果

    # todo:3-关闭SparkContext
    sc.stop()
  • 注意:能用reduceByKey就不要用groupByKey+map- 工作中遇到了分组聚合- 先考虑reduceByKey能不能做,能用reduceByKey就不用groupByKey- 如果聚合逻辑用reduce函数实现不了,再考虑用groupByKey- 如果不是二元组类型,只能用groupBy
  • 小结- groupByKey的功能场景是什么?- 功能:对KV类型的RDD按照Key进行分组,将相同的Key的所有Value放入列表中- 分类:转换算子- 场景:需要先分组再聚合,聚合逻辑比较复杂,不能用reduce实现- 特点:经过Shuffle- reduceByKey的功能场景是什么?- 功能:对KV类型的RDD按照Key进行分组,将相同Key的所有Value使用reduce进行聚合- 分类:转换算子- 场景:需要分组并聚合,而且能使用reduce实现聚合- 特点:经过Shuffle

二、排序算子

**

sortBy

**

  • 功能:根据指定的函数对 RDD 中的元素进行排序,可以指定升序或降序。
  • 分类:转换算子
  • 场景适用于所有对数据排序的场景,一般用于对大数据量非KV类型的RDD的数据排序
  • 特点:经过Shuffle,可以指定排序后新RDD的分区个数
  • 示例
     rdd = sc.parallelize([3, 1, 4, 1, 5, 9, 2, 6])
     sorted_rdd = rdd.sortBy(lambda x: x, ascending = True)
     result = sorted_rdd.collect()
     for num in result:
         print(num)
    # step1: 读取数据
    input_rdd = sc.textFile("../datas/function_data/sort.txt")
    # step2: 处理数据
    # sortBy算子:数据量大,RDD非KV类型RDD
    def split_line(line):
        arr = line.strip().split(",")
        return (arr[0], arr[1], arr[2])

    sort_by_rdd = input_rdd.sortBy(keyfunc=lambda line: split_line(line)[1], ascending=False)

    # step3: 保存结果
    sort_by_rdd.foreach(lambda x: print(x))
    sort_by_rdd.saveAsTextFile(path="../datas/output/sort/sort-1")

**

sortByKey

**

  • 功能:对RDD中的所有元素按照Key进行整体排序,可以指定排序规则【升序还是降序】
  • sortByKey既能实现全局排序,也能实现局部有序
  • sortByKey会触发job构建Task
  • sortByKey在经过shuffle时,只调用RangePartition,不允许修改分区器
  • 语法:
def sortBy(self, keyFunc:(T) -> 0, asc: bool, numPartitions) -> RDD
    keyFunc:(T) -> 0:用于指定按照数据中的哪个值进行排序
    asc: bool:用于指定升序还是降序,默认是升序
    # step1: 读取数据
    input_rdd = sc.textFile("../datas/function_data/sort.txt")
    # step2: 处理数据
    def split_line(line):
        arr = line.strip().split(",")
        return (arr[0], arr[1], arr[2])

    # sortByKey算子:数据量,RDD就是KV类型RDD
    # 手动将非KV类型转换成KV类型(年龄,名字+性别)
    kv_rdd = input_rdd.map(lambda line: split_line(line)).map(lambda tuple: (tuple[1], (tuple[0], tuple[2])))
    # 实现排序
    sort_by_key_rdd = kv_rdd.sortByKey(ascending=False)

    # step3: 保存结果
    sort_by_key_rdd.foreach(lambda x: print(x))
    sort_by_key_rdd.saveAsTextFile(path="../datas/output/sort/sort-2")
  • 两个算子区别- sortBy:可以指定按照谁排序,不限制RDD的结构- sortBykey:只能按照Key排序,只有KV类型的RDD才能用- 性能有没有区别:不是按照Key排序,性能没有区别,哪个方便哪个来,sortBy底层调用的就是sortByKey,自动根据指定参数,将参数作为Key
  • 小结:sortBy和sortByKey的功能场景是什么?- 功能:实现对RDD所有元素的排序- 场景:大数据量全局或者局部排序- 特点:经过shuffle- 关联:本质上sortBy调用的还是sortByKey- 区别:sortBy可以指定按照谁排序,sortByKey只能按照Key排序

三、TopN 算子

**

top

**

  • 功能:返回 RDD 中的前 N 个最大元素。默认是按照自然顺序(对于数字是从大到小),可以通过自定义比较函数来改变顺序。
  • 分类:触发算子
  • 场景:取RDD数据中的最大的TopN个元素
  • 特点:不经过Shuffle,将所有元素放入Driver内存中排序,性能更好,只能适合处理小数据量
  • 语法
  • - def top(self, num) -> List[0]
  • 需求:取RDD中最大的前5个元素
  • 示例
     rdd = sc.parallelize([3, 1, 4, 1, 5, 9, 2, 6])
     top_elements = rdd.top(3)
     for num in top_elements:
         print(num)

numRdd = sc.parallelize([1,5,2,6,9,10,4,3,8,7])
top_list = numRdd.top(5)
print(*top_list)

takeOrdered算子

  • 功能:对RDD中的所有元素升序排序,并返回前N个元素,即返回RDD中最小的前N个元数据
  • 分类:触发算子
  • 场景:取RDD数据中的最小的TopN个元素
  • 特点:不经过Shuffle,将所有元素放入Driver内存中排序,只能适合处理小数据量
  • 语法
  • def takeOrdered(self,num) -> List[0]
#!/usr/bin/env python
# -*- coding: utf-8 -*-

from pyspark import SparkContext, SparkConf
import os

"""
-------------------------------------------------
   Description :    TODO:top 和 takeOrdered
   SourceFile  :    03.pyspark_core_fun_top_takeOrdered
   Author      :    Frank
   Date        :    2022/7/19
-------------------------------------------------
"""

if __name__ == '__main__':
    # todo:0-设置系统环境变量:全部换成Linux地址
    os.environ['JAVA_HOME'] = '/export/server/jdk'
    os.environ['HADOOP_HOME'] = '/export/server/hadoop'
    os.environ['PYSPARK_PYTHON'] = '/export/server/anaconda3/bin/python3'
    os.environ['PYSPARK_DRIVER_PYTHON'] = '/export/server/anaconda3/bin/python3'

    # todo:1-构建SparkContext
    conf = SparkConf().setMaster("local[2]").setAppName("Remote Test APP")
    sc = SparkContext(conf=conf)

    # todo:2-数据处理:读取、转换、保存
    # step1: 读取数据
    # 构建数据
    numRdd = sc.parallelize([1, 5, 2, 6, 9, 10, 4, 3, 8, 7])
    numRdd.foreach(lambda x: print(x))
    print("=======================================")

    # step2: 处理数据
    # top:将RDD中所有元素放入Driver内存中,对元素降序排序,取前N个
    top_list = numRdd.top(5)
    print(*top_list)

    print("=============")
    # takeOrdered:将RDD所有元素升序排序,取前N个值
    take_ordered_list = numRdd.takeOrdered(5)
    print(*take_ordered_list)

    # step3: 保存结果

    # todo:3-关闭SparkContext
    sc.stop()

四、重分区算子

  • 问题1:RDD1有10个分区,每个分区100万条数据,想加快处理速度,构建100个分区,每个分区处理10万条数据?- 需求:提高RDD的分区数
  • 问题2:RDD1有10个分区,每个分区100万条数据,经过聚合以后,只有10条数据了,希望都放入1个分区怎么做?- 需求:降低RDD的分区数

**

repartition

**

  • 功能:增加或减少 RDD 的分区数。它会通过网络混洗数据来重新分布分区。
  • 功能:调整RDD的分区个数
  • 分类:转换算子
  • 场景:一般用于调大分区个数【由小变大】,必须经过shuffle才能实现
  • 特点:必须经过Shuffle过程,repartition底层就是coalesce(shuffle=True)
  • 语法
  • def repartition(self,numPartitions) -> RDD[T]
  • 示例
     rdd = sc.parallelize([1, 2, 3, 4, 5], 2)
     repartitioned_rdd = rdd.repartition(3)
     print(repartitioned_rdd.getNumPartitions())

**

coalesce

**

  • 功能:用于减少 RDD 的分区数。如果要增加分区数并且希望进行数据混洗,可以设置shuffle = True。与repartition不同,coalesce默认不会进行数据混洗,效率更高,适用于减少分区的情况。
  • 示例
     rdd = sc.parallelize([1, 2, 3, 4, 5], 3)
     coalesced_rdd = rdd.coalesce(2)
     print(coalesced_rdd.getNumPartitions())

五、连接算子

**

join

**

  • 功能:对两个键值对形式的 RDD 进行连接操作,返回一个新的 RDD,其中包含具有相同键的键值对元组。
  • 示例
     rdd1 = sc.parallelize([(1, 'a'), (2, 'b')])
     rdd2 = sc.parallelize([(1, 'x'), (2, 'y')])
     joined_rdd = rdd1.join(rdd2)
     result = joined_rdd.collect()
     for key, (value1, value2) in result:
         print(f"Key: {key}, Value1: {value1}, Value2: {value2}")

**

leftOuterJoin

**

  • 功能:类似于join操作,但对于左 RDD 中的每个键,即使右 RDD 中没有对应的键,也会包含在结果中,右值用None填充。
  • 示例
     rdd1 = sc.parallelize([(1, 'a'), (2, 'b')])
     rdd2 = sc.parallelize([(1, 'x')])
     left_joined_rdd = rdd1.leftOuterJoin(rdd2)
     result = left_joined_rdd.collect()
     for key, (value1, value2) in result:
         print(f"Key: {key}, Value1: {value1}, Value2: {value2 if value2 else None}")

**

rightOuterJoin

**

  • 功能:与leftOuterJoin相反,对于右 RDD 中的每个键,即使左 RDD 中没有对应的键,也会包含在结果中,左值用None填充。
  • 示例
     rdd1 = sc.parallelize([(1, 'a')])
     rdd2 = sc.parallelize([(1, 'x'), (2, 'y')])
     right_joined_rdd = rdd2.rightOuterJoin(rdd1)
     result = right_joined_rdd.collect()
     for key, (value2, value1) in result:
         print(f"Key: {key}, Value1: {value1 if value1 else None}, Value2: {value2}")

**

fullOuterJoin

**

  • 功能:对两个 RDD 进行全外连接,结果包含两个 RDD 中所有的键,对于没有匹配的键,对应的的值用None填充。
  • 示例
     rdd1 = sc.parallelize([(1, 'a')])
     rdd2 = sc.parallelize([(2, 'b')])
     full_joined_rdd = rdd1.fullOuterJoin(rdd2)
     result = full_joined_rdd.collect()
     for key, (value1, value2) in result:
         print(f"Key: {key}, Value1: {value1 if value1 else None}, Value2: {value2 if value2 else None}")

综合案例:

# encoding=utf-8
# 连接与合并

from pyspark import  SparkContext

sc = SparkContext()

# 导入数据
rdd_data = sc.textFile('hdfs://node1:8020/test/test.txt')
rdd_data1 = sc.textFile('hdfs://node1:8020/test/test100.txt')
'''
连接是以key值取当做的连接条件 
'''
# 切割
rdd_data_split =rdd_data.map(lambda x:x.split(','))
rdd_data_split1 =rdd_data1.map(lambda x:x.split(','))
#分组
rdd_data_split_tup =rdd_data_split.map(lambda x:(x[0],x))
rdd_data_split_tup1 =rdd_data_split1.map(lambda x:(x[0],x))

#内连接
rdd_join = rdd_data_split_tup.join(rdd_data_split_tup1)
#左连接
rdd_leftjoin = rdd_data_split_tup.leftOuterJoin(rdd_data_split_tup1)
#右连接
rdd_rightjoin = rdd_data_split_tup.rightOuterJoin(rdd_data_split_tup1)
#全连接
rdd_fulljoin = rdd_data_split_tup.fullOuterJoin(rdd_data_split_tup1)

#合并
rdd_unionall = rdd_data1.union(rdd_data1)
#合并并去重
rdd_union = rdd_data.union(rdd_data1).distinct()

print(rdd_data.collect())
print('-'*50)
print(rdd_data1.collect())
print('-'*50,'内连接')
print(rdd_join.collect())
print('-'*50,'左连接')
print(rdd_leftjoin.collect())
print('-'*50,'右连接')
print(rdd_rightjoin.collect())
print('-'*50,'全连接')
print(rdd_fulljoin.collect())
print('-'*50,'纵向合并')
print(rdd_unionall.collect())
print('-'*50,'纵向合并并去重')
print(rdd_union.collect())
  • 小结- repartition的功能场景是什么?- 功能:调整分区个数- 场景:一般用于增大分区个数- 特点:一定会经过Shuffle- coalesce的功能场景是什么?- 功能:调整分区个数- 场景:一般用于降低分区个数- 特点:可以自由选择是否经过shuffle过程

六、合并算子

**

union

**

  • 功能:将两个 RDD 合并成一个新的 RDD,不去重。
  • 示例
     rdd1 = sc.parallelize([1, 2, 3])
     rdd2 = sc.parallelize([3, 4, 5])
     union_rdd = rdd1.union(rdd2)
     result = union_rdd.collect()
     for num in result:
         print(num)

如果去重则用 distinct

**

intersection

**

  • 功能:返回两个 RDD 的交集,即同时存在于两个 RDD 中的元素组成的 RDD。
  • 示例
     rdd1 = sc.parallelize([1, 2, 3])
     rdd2 = sc.parallelize([3, 4, 5])
     intersection_rdd = rdd1.intersection(rdd2)
     result = intersection_rdd.collect()
     for num in result:
         print(num)

**

subtract

**

  • 功能:返回在第一个 RDD 中存在,但在第二个 RDD 中不存在的元素组成的 RDD。
  • 示例
     rdd1 = sc.parallelize([1, 2, 3])
     rdd2 = sc.parallelize([3, 4, 5])
     subtract_rdd = rdd1.subtract(rdd2)
     result = subtract_rdd.collect()
     for num in result:
         print(num)

总结

  • 分组聚合算子可以使用 groupby +mapvalue算子
  • 排序算子可以使用sortBY 或者sortByKey
  • topN算子可以用top
  • 重新分区算子可以用repartition或者coalesce算子增加或者降低分区
  • 可以使用不同的join实现sql中的 内连接,左连接,右连接,全外连接
  • 可以使用union纵向合并
  • 可以使用intersection求交集
  • 可以使用subtract求第一个元素中存在,第二个元素中不存在的数据

本文转载自: https://blog.csdn.net/qq_55006020/article/details/142769852
版权归原作者 凡梦_leo 所有, 如有侵权,请联系我们删除。

“Spark之RDD,常用的分析算子大全 分组聚合,排序,重分区,连接合并等算子”的评论:

还没有评论