0


Spark-RDD-常用算子(方法)详解

一、Spark-概述-链接

在这里插入图片描述

二、 Spark-RDD概述-链接

在这里插入图片描述

三、RDD-算子(operator)

Spark RDD 提供了丰富的方法来对数据进行转换和操作。对 RDD(

Resilient Distributed Dataset

)的操作可以分为两大类:**转换算子(

Transformations

)和行动算子(

Actions

)。**

在这里插入图片描述

大家可以看专栏对RDD的介绍,RDD的惰性计算就体现在转换算子与行动算子。

  • 转换算子不会立即触发操作,只是记录计算逻辑。
  • 行动算子会触发操作,并回返回结果。

1.Transformation-转换算子

  • 转换算子是指对现有的RDD进行某种操作,生成一个或多个新的RDD,但原有的RDD保持不变。
  • 这些操作是惰性的,只有在遇到行动算子时才会真正被执行。
  • 转换算子分为两种:在这里插入图片描述

**

Narrow

** Transformation:

  • 窄转换是指一个父RDD的每个分区都可以被一个子RDD的一个分区所使用。
  • 换句话说,窄转换在数据处理过程中,数据分区之间没有“shuffle”操作,即数据不需要在不同分区之间进行重新分配。
  • 这种转换相对高效,因为它不涉及数据的移动和重分布,Spark可以对其并行处理。在这里插入图片描述

**

Wide

** transformation:

  • 宽转换是指一个父RDD的某个分区的数据需要被多个子RDD的不同分区所使用。
  • 通常这意味着会涉及到数据的“shuffle”过程,即数据的重新分布。
  • 由于需要在不同的节点之间移动数据,宽转换的执行效率相对低于窄转换。在这里插入图片描述

**

Narrow

** Transformation

(1)

map

对RDD每个元素应用一个函数,并返回一个新的RDD,其中包含应用函数后的结果.

在这里插入图片描述

JavaRDD<T>map(Function<T,R> f)
  • Function<T, R>一个函数接口,定义了一个输入类型为T,输出类型为R的函数。
packagecom.yushifu.spark.rdd.func;importorg.apache.spark.api.java.JavaSparkContext;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.SparkConf;importjava.util.Arrays;importjava.util.List;importorg.apache.spark.api.java.function.Function;publicclassMapExample{publicstaticvoidmain(String[] args){// 创建SparkConf对象SparkConf conf =newSparkConf().setAppName("MapExample").setMaster("local");// 创建JavaSparkContext对象JavaSparkContext sc =newJavaSparkContext(conf);// 创建一个包含整数的RDDJavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5));// 使用map方法对RDD中的每个元素进行平方操作JavaRDD<Integer> squaredRDD = rdd.map(newFunction<Integer,Integer>(){@OverridepublicIntegercall(Integer x)throwsException{return x * x;}});// 收集并打印转换后的RDDList<Integer> squaredList = squaredRDD.collect();for(Integer num : squaredList){System.out.println(num);}// 关闭SparkContext
        sc.close();}}

在这里插入图片描述

(2)

filter
  • 对 RDD 中的每个元素应用给定的函数 func。

在这里插入图片描述

  • 如果函数 func 返回 true,则将该元素保留在结果 RDD 中;否则,将其过滤掉。在这里插入图片描述
JavaRDD<T>filter(Function<T,Boolean> f)
  • JavaRDD:返回一个新的 JavaRDD,其中包含满足条件的元素。
  • Function<T, Boolean>:用于对 RDD 中的每个元素进行评估的函数接口。这里的 T 表示 RDD 中元素的类型。该函数接受一个参数,代表 RDD 中的一个元素,返回一个布尔值,表示是否保留该元素。
packagecom.yushifu.spark.rdd.func;importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importjava.util.Arrays;importjava.util.List;publicclassRDDFilterExample{publicstaticvoidmain(String[] args){// 创建 SparkConf 并设置应用名称SparkConf conf =newSparkConf().setAppName("RDD Filter Example").setMaster("local[*]");// 创建 JavaSparkContext,它是 Spark 功能的入口点JavaSparkContext sc =newJavaSparkContext(conf);// 创建一个包含整数的 RDDList<Integer> data =Arrays.asList(1,2,3,4,5,6,7,8,9,10);JavaRDD<Integer> numbersRDD = sc.parallelize(data);// 使用 filter 方法过滤出所有的偶数JavaRDD<Integer> evenNumbersRDD = numbersRDD.filter(newEvenFilter());// 打印过滤后的结果List<Integer> evenNumbers = evenNumbersRDD.collect();for(Integer num : evenNumbers){System.out.println(num);}// 停止 JavaSparkContext
        sc.stop();}// 自定义函数类,用于判断整数是否为偶数staticclassEvenFilterimplementsorg.apache.spark.api.java.function.Function<Integer,Boolean>{publicBooleancall(Integer num){return num %2==0;}}}

在这里插入图片描述

  • filter() 方法只能应用于包含元素的 RDD(JavaRDD),如果RDD 是键值对形式的,应该使用 filterByKey 或 filterValues 方法。
  • 函数 f 的评估是延迟的(lazy evaluation),只有在执行操作动作时才会真正进行计算。
  • filter() 不会改变原始 RDD,而是返回一个新的 RDD,其中包含满足过滤条件的。

在这里插入图片描述


(3)

flatMap

对 RDD 中的每个元素应用函数 func,并将结果扁平化为一个新的 RDD

在这里插入图片描述

**

flatMap

方法和

map

方法类似,不同之处在于

flatMap

会将每个输入元素映射为一个或多个输出元素。**

**通过

flatMap

方法,可以方便地处理需要将一个元素映射为多个元素的情况,比如将一行文本拆分为单词、将一组数据展开为多个数据等。**
在这里插入图片描述

**具体来说,

flatMap

会对输入RDD中的每个元素应用一个函数,然后将所有函数返回的元素合并成一个新的RDD。**

在这里插入图片描述

JavaRDD<T>flatMap(Function<T,Iterator<R>> f)
  • Function<T, Iterator<R>> f: 表示接受类型为T的输入元素,返回一个Iterator<R>(迭代器)的函数。在这里插入图片描述在这里插入图片描述

将字符串RDD每个字符串按空格分割,并返回分割后的单词:

// 创建JavaSparkContextJavaSparkContext sc =newJavaSparkContext(newSparkConf().setAppName("FlatMapExample").setMaster("local"));// 创建一个包含字符串的RDDJavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World","Spark FlatMap Example","Java Programming"));// 使用flatMap对每个字符串按空格分割JavaRDD<String> wordsRDD = rdd.flatMap(newFlatMapFunction<String,String>(){@OverridepublicIterator<String>call(String s)throwsException{returnArrays.asList(s.split(" ")).iterator();}});// 输出结果System.out.println(wordsRDD.collect());// 关闭JavaSparkContext
sc.close();

最终输出的结果是所有单词的集合

["Hello", "World", "Spark", "FlatMap", "Example", "Java", "Programming"]


(4)

mapPartition
map

mapPartitions

主要区别:

处理方式不同:

  • map: 对每个元素单独处理,传入一个元素并返回一个结果。
  • mapPartitions: 对每个分区内的所有元素批量处理,传入一个分区的迭代器并返回一个新的迭代器。

输出结果不同:

  • map: 输出的元素数量与输入相同,逐个映射。
  • mapPartitions: 输出的元素数量可以不同,可以返回多个元素或零个元素。

性能不同:

  • map: 对每个元素调用处理函数,可能会增加开销。
  • mapPartitions: 减少函数调用的开销,适合需要批量处理的场景。

在这里插入图片描述

输入参数:

mapPartitions

接收一个

Iterator<T>

,表示当前分区中的元素。
返回值: 返回一个新的

Iterator<U>

,其中

U

是转换后的元素类型。

JavaSparkContext sc =newJavaSparkContext(newSparkConf().setAppName("MapPartitionsExample").setMaster("local"));// 创建一个包含整数的 RDDJavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5));// 使用 mapPartitions 处理每个分区JavaRDD<Integer> resultRDD = rdd.mapPartitions(iterator ->{List<Integer> results =newArrayList<>();// 执行一些操作,例如平方while(iterator.hasNext()){Integer value = iterator.next();
        results.add(value * value);}return results.iterator();});// 输出结果System.out.println(resultRDD.collect());// 关闭 JavaSparkContext
sc.close();
  • 批处理: 在 mapPartitions 中,可以对每个分区中的数据进行批量处理,减少了重复的初始化开销。例如,可以在处理每个分区时打开和关闭数据库连接。
  • 性能优化: 使用 mapPartitions 可以减少函数调用的频率,因此在某些场景下,比 map 更高效。
  • 适用场景: 适合需要对分区内数据进行复杂计算或需要维护状态的场景,比如读取外部数据源、复杂的数据转换等。

(5)

sample

在这里插入图片描述

JavaRDD<T>sample(boolean withReplacement,double fraction,long seed);
  • withReplacement:- true: 允许重复抽样。- false: 不允许重复抽样。
  • fraction:- 表示要抽取的比例,例如 0.1 表示从每个分区中抽取 10% 的数据。
  • seed:- 随机数生成的种子,可以用于控制随机性,确保可重复性。
importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importorg.apache.spark.SparkConf;importjava.util.List;publicclassSampleExample{publicstaticvoidmain(String[] args){// 创建 Spark 配置和上下文SparkConf conf =newSparkConf().setAppName("Sample Example").setMaster("local");JavaSparkContext sc =newJavaSparkContext(conf);// 创建一个包含数字 1 到 100 的 RDDJavaRDD<Integer> rdd = sc.parallelize(java.util.Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100));// 抽取 20% 的样本,不允许重复JavaRDD<Integer> sampleRDD = rdd.sample(false,0.2);// 收集并打印结果List<Integer> sampleResult = sampleRDD.collect();System.out.println("Sampled Elements: "+ sampleResult);// 关闭上下文
        sc.close();}}
  • 非确定性: 每次执行 sample 可能会得到不同的结果,因为抽样过程是随机的。
  • 分区级别: 抽样是在每个分区内独立进行的,因此不同分区的抽样结果可能会不同。
  • 数据预处理: 在大数据集上进行分析或建模之前,可以使用 sample 来获取一个可管理的小数据集。
  • 测试和验证: 在开发阶段,可以对小样本进行测试,以减少计算资源的消耗。

(6)

Union
  • union 算子用于将两个或多个 RDD 合并成一个新的 RDD
  • 合并时,所有输入 RDD 的元素都会被包含在结果中。
  • union 不会去除重复元素,如果两个 RDD 中有相同的元素,结果 RDD 中也会包含这些重复的元素。
  • 有多个数据集(例如来自不同来源的数据),并希望将它们组合在一起进行分析时。
  • 需要将同一类型的不同 RDD 合并,方便后续的数据处理和分析。

在这里插入图片描述

JavaRDD<T>union(JavaRDD<T> other);
  • other 是要与当前 RDD 合并的另一个 RDD。
importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importorg.apache.spark.SparkConf;importjava.util.Arrays;importjava.util.List;publicclassUnionExample{publicstaticvoidmain(String[] args){// 创建 Spark 配置和上下文SparkConf conf =newSparkConf().setAppName("Union Example").setMaster("local");JavaSparkContext sc =newJavaSparkContext(conf);// 创建两个 RDDJavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4,5));JavaRDD<Integer> rdd2 = sc.parallelize(Arrays.asList(4,5,6,7,8));// 使用 union 算子合并两个 RDDJavaRDD<Integer> unionRDD = rdd1.union(rdd2);// 收集并打印结果List<Integer> unionResult = unionRDD.collect();System.out.println("Union of RDDs: "+ unionResult);// 关闭上下文
        sc.close();}}
  • 重复元素: 结果 RDD 会包含所有输入 RDD 的元素,包括重复元素。如果需要去重,可以在后面调用 distinct() 方法:JavaRDD<Integer> distinctRDD = unionRDD.distinct();
  • 性能考虑: 在处理大数据集时,union 可能会影响性能,因为需要将所有数据重新组合。

(7)

reduceByKey

将相同键的值合并,最终生成一个新的 RDD
在这里插入图片描述
工作原理:

  1. 局部聚合:首先在每个分区内对相同键的值进行局部聚合。只在本地数据上进行合并,减少了需要传输到其他节点的数据量。

在这里插入图片描述

  1. 全局聚合:完成局部聚合后,Spark 会对每个分区的结果进行全局合并,生成最终的结果。
JavaPairRDD<K,V>reduceByKey(Function<V,V> reduceFunc)
  • K:键的类型
  • V:值的类型
  • reduceFunc:一个函数,定义了如何合并两个值(相同键的值)。
importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaPairRDD;importorg.apache.spark.api.java.JavaSparkContext;importscala.Tuple2;importjava.util.Arrays;importjava.util.List;publicclassReduceByKeyExample{publicstaticvoidmain(String[] args){SparkConf conf =newSparkConf().setAppName("ReduceByKey Example").setMaster("local");JavaSparkContext sc =newJavaSparkContext(conf);List<Tuple2<String,Integer>> data =Arrays.asList(newTuple2<>("A",1),newTuple2<>("B",2),newTuple2<>("A",3),newTuple2<>("B",1));JavaPairRDD<String,Integer> rdd = sc.parallelizePairs(data);// 使用 reduceByKey 进行聚合JavaPairRDD<String,Integer> reducedRDD = rdd.reduceByKey((x, y)-> x + y);// 收集并打印结果
        reducedRDD.collect().forEach(tuple ->System.out.println("Key: "+ tuple._1()+", Value: "+ tuple._2()));

        sc.close();}}
Key: A, Value: 4
Key: B, Value: 3
  • 由于局部聚合减少了网络传输,reduceByKey 通常比 groupByKey 更高效。

(8)

mapValues

在不改变键的情况下,仅对每个键对应的值应用一个函数,从而生成一个新的键值对 RDD。
在这里插入图片描述

生成一个新的键值对 RDD,键不变,只有值发生了变化。
在这里插入图片描述

  • 由于只对值进行转换,mapValues() 在某些情况下比 map() 更高效,避免了重新创建键。
JavaPairRDD<K, V2>mapValues(Function<V, V2> f)

f:一个函数,接受键值对 RDD 中的值作为输入,并返回一个新的值
其中 V 是原始键值对 RDD 中的值的类型,而 V2 是新值的类型。

eg:对 RDD 包含了学生姓名和对应的成绩,将成绩加上 10 分:

JavaPairRDD<String,Integer> studentScores = sc.parallelizePairs(Arrays.asList(newTuple2<>("zs",80),newTuple2<>("ls",75),newTuple2<>("yushifu",90)));JavaPairRDD<String,Integer> adjustedScores = studentScores.mapValues(score -> score +10);

每个成绩都增加了 10 分,但是学生姓名保持不变。


**

Wide

** Transformation

(1)

distinct

从一个数据集中去除重复的元素,生成一个只包含唯一元素的新数据集

在这里插入图片描述

  • distinct() 方法在确定 RDD 中的唯一元素依赖于对象的 hashCode()equals() 方法。
  • Spark 会使用这些方法来判断两个对象是否相等,从而识别出唯一的元素。
  • 确保自定义对象正确实现这两个方法是很重要的。

tips

  • 在执行 distinct 时,Spark 首先会在每个分区内部执行去重操作,以减少数据量。

在这里插入图片描述

  • 接着,将每个分区的结果进行 shuffle,将相同的元素聚集到一起

资源消耗

  • 由于 distinct 需要全局比较,处理大规模数据集时会消耗大量的内存和计算资源。
  • Shuffle 是性能的主要瓶颈之一。为减少 shuffle 过程,可以考虑预先过滤数据或使用其他操作,如 reduceByKeyaggregateByKey,在减少数据量的同时实现类似的去重效果。
  • 可以通过合理设置分区数来提高性能。增加分区数可以并行处理更多数据,但也可能导致过多的 shuffle,反之则可能导致单个节点负担过重。
data =[1,2,2,3,4,4,4,5]
rdd = sc.parallelize(data)
distinct_rdd = rdd.distinct()print(distinct_rdd.collect())# 输出: [1, 2, 3, 4, 5]

(2)

groupBy

**

groupBy

算子用于根据指定的键对数据进行分组**。在这里插入图片描述

  • 功能: groupBy 会将数据集中的元素按指定的键进行分组,并将具有相同键的元素聚集在一起。
  • 返回值: 返回一个新的 RDD,其中每个元素是一个键值对,键是分组的依据,值是一个包含所有具有相同键的元素的集合(通常是一个可迭代的集合)。

在这里插入图片描述

  • 数据聚合,比如计算每个组的总和、平均值等。
  • 在进行数据分析时,将数据根据某些属性分类处理。
  • 语法
JavaPairRDD<K,Iterable<V>>groupBy(Function<T,K> f)
  • Function<T, K> f: 表示接受类型为T的输入元素,返回类型为K的键值的函数。
// 创建JavaSparkContextJavaSparkContext sc =newJavaSparkContext(newSparkConf().setAppName("GroupByExample").setMaster("local"));// 创建一个包含整数的RDDJavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5,6));// 根据元素的奇偶性进行分组JavaPairRDD<String,Iterable<Integer>> groupedRDD = rdd.groupBy(newFunction<Integer,String>(){@OverridepublicStringcall(Integer v1)throwsException{return(v1 %2==0)?"even":"odd";}});// 输出分组结果
groupedRDD.collectAsMap().forEach((k, v)->System.out.println(k +": "+ v));// 关闭JavaSparkContext
sc.close();

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述


(3)

join

将两个 RDD(或 DataFrame)根据某个共同的键进行连接。

join操作非常强大,可以用于一对一的连接(如

map

reduce

)或一对多的连接。

importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaPairRDD;importorg.apache.spark.api.java.JavaSparkContext;importscala.Tuple2;importjava.util.Arrays;importjava.util.List;publicclassJoinExample{publicstaticvoidmain(String[] args){// 创建 Spark 配置和上下文SparkConf conf =newSparkConf().setAppName("Join Example").setMaster("local");JavaSparkContext sc =newJavaSparkContext(conf);// 创建两个 RDDList<Tuple2<Integer,String>> data1 =Arrays.asList(newTuple2<>(1,"Alice"),newTuple2<>(2,"Bob"),newTuple2<>(3,"Cathy"));List<Tuple2<Integer,String>> data2 =Arrays.asList(newTuple2<>(1,"Math"),newTuple2<>(2,"Science"),newTuple2<>(4,"History"));JavaPairRDD<Integer,String> rdd1 = sc.parallelizePairs(data1);JavaPairRDD<Integer,String> rdd2 = sc.parallelizePairs(data2);// 使用 join 操作JavaPairRDD<Integer,Tuple2<String,String>> joinedRDD = rdd1.join(rdd2);// 收集并打印结果
        joinedRDD.collect().forEach(tuple ->System.out.println("Key: "+ tuple._1()+", Value: "+ tuple._2()));// 关闭上下文
        sc.close();}}
  • 全局洗牌: join 会导致全局数据洗牌,因此在处理大数据集时要注意性能。
  • 数据倾斜: 如果某些键的值远远超过其他键,可能会导致性能问题。

除了

join

之外,Spark 还支持其他类型的连接:

在这里插入图片描述

  • leftOuterJoin: 返回左侧 RDD 的所有记录,以及右侧 RDD 匹配的记录(如果没有匹配,则返回 null)。
  • rightOuterJoin: 返回右侧 RDD 的所有记录,以及左侧 RDD 匹配的记录。
  • fullOuterJoin: 返回两个 RDD 的所有记录,未匹配的记录将返回 null。

(4)

repartition

coalesce

Apache Spark — Repartition 与 Coalesce(调整数据集分区)

(5)

groupByKey

将键值对 RDD 中相同键的值聚集到一起

在这里插入图片描述

类似于数据库中的

GROUP BY

操作,主要用于需要对相同键的值进行分组的场景。

  1. 数据结构groupByKey 操作的输入是一个键值对 RDD,输出是一个新的 RDD,其中每个键对应一个可迭代的值集合(如 Iterable)。
  2. 全局洗牌groupByKey 会在计算过程中导致全局洗牌,相同键的值可能会被分配到不同的分区,从而可能增加数据传输的开销
JavaPairRDD<K,Iterable<V>>groupByKey()
  • K:键的类型
  • V:值的类型
importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaPairRDD;importorg.apache.spark.api.java.JavaSparkContext;importscala.Tuple2;importjava.util.Arrays;importjava.util.List;publicclassGroupByKeyExample{publicstaticvoidmain(String[] args){SparkConf conf =newSparkConf().setAppName("GroupByKey Example").setMaster("local");JavaSparkContext sc =newJavaSparkContext(conf);List<Tuple2<String,Integer>> data =Arrays.asList(newTuple2<>("A",1),newTuple2<>("B",2),newTuple2<>("A",3),newTuple2<>("B",1));JavaPairRDD<String,Integer> rdd = sc.parallelizePairs(data);// 使用 groupByKey 进行分组JavaPairRDD<String,Iterable<Integer>> groupedRDD = rdd.groupByKey();// 收集并打印结果
        groupedRDD.collect().forEach(tuple ->System.out.println("Key: "+ tuple._1()+", Values: "+ tuple._2()));

        sc.close();}}
Key: A, Values: [1, 3]
Key: B, Values: [2, 1]

在这里插入图片描述

  • reduceByKey和groupByKey区别

(1)reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[K,V]。

(2)groupByKey:按照key进行分组,直接进行shuffle。

在不影响业务逻辑的前提下,优先选用reduceByKey。求和操作不影响业务逻辑,求平均值影响业务逻辑。影响业务逻辑时建议先对数据类型进行转换再合并。


(6)

sortBy

对 RDD 中的数据进行排序,可以按照指定的键或值进行升序或降序排序。
在这里插入图片描述
sortBy() 操作是一个转换操作,它不会立即执行排序,而是在遇到动作操作时才会执行排序。

  1. 输入和输出sortBy 可以应用于任意类型的 RDD,输出是一个新的 RDD,按照指定的排序条件排列。
  2. 全局洗牌sortBy 通常会导致全局洗牌,因为需要将所有数据分配到合适的分区,以便按照顺序排列。
JavaRDD<T>sortBy(Function<T,K> keyfunc,boolean ascending,int numPartitions)
  • keyfunc:一个函数,用于提取排序的关键字。
  • ascending:布尔值,指示是否升序排列(true)或降序排列(false)。
  • numPartitions:分区数,决定结果 RDD 的分区数量。
importorg.apache.spark.SparkConf;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importjava.util.Arrays;importjava.util.List;publicclassSortByExample{publicstaticvoidmain(String[] args){SparkConf conf =newSparkConf().setAppName("SortBy Example").setMaster("local");JavaSparkContext sc =newJavaSparkContext(conf);List<Integer> data =Arrays.asList(5,3,8,1,2);JavaRDD<Integer> rdd = sc.parallelize(data);// 使用 sortBy 进行排序JavaRDD<Integer> sortedRDD = rdd.sortBy(num -> num,true,1);// 收集并打印结果
        sortedRDD.collect().forEach(System.out::println);

        sc.close();}}
1
2
3
5
8

性能问题:排序可能会消耗大量内存和计算资源,尤其是在处理大数据集时:排序会导致全局洗牌,可能会影响性能,尤其在集群环境中。



按键分组: groupByKey() 方法根据键对 RDD 进行分组,将具有相同键的所有值聚合到一个 Iterable 中。

键的保持: 生成的新 RDD 中,键保持不变,而与之对应的值是一个 Iterable,其中包含具有相同键的所有原始值。

惰性计算: 与大多数 Spark 转换操作一样,groupByKey() 是惰性的,只有在遇到动作操作时才会执行。

  • 示例

假设有一个键值对 RDD 包含了学生姓名和对应的科目成绩,我们想要按学生姓名将成绩进行分组:

JavaPairRDD<String,Integer> studentScores = sc.parallelizePairs(Arrays.asList(newTuple2<>("Alice",80),newTuple2<>("Bob",75),newTuple2<>("Alice",90),newTuple2<>("Charlie",85)));JavaPairRDD<String,Iterable<Integer>> groupedScores = studentScores.groupByKey();

在这个示例中,groupByKey() 方法被用于按学生姓名将成绩进行分组,生成了一个新的键值对 RDD groupedScores,其中每个键是学生姓名,对应的值是一个 Iterable,包含该学生所有的成绩。

  • 注意事项

(1)groupByKey() 会将具有相同键的所有值都聚合到一个 Iterable 中,这可能导致内存使用问题,特别是当某些键对应的值很多时。

(2)在使用 groupByKey() 时,应该考虑数据分布是否均匀,以避免某些键对应的值过多而导致性能问题。

(3)在大多数情况下,应该优先使用 reduceByKey()、aggregateByKey() 或 combineByKey() 等更高效的聚合操作来替代 groupByKey()。


(7)

sortByKey

根据键(Key)的自然顺序或者自定义的排序规则对RDD中的键值对进行排序,并返回一个新的排序后的RDD。
在这里插入图片描述

JavaPairRDD<K,V>sortByKey(boolean ascending,int numPartitions)
ascending

:一个布尔值,指示排序顺序,true表示升序,false表示降序。

numPartitions

:(可选)指定结果RDD的分区数。

(1)分区数据:如果指定了numPartitions,Spark将根据该值对数据进行分区,以便并行处理。

(2)按键排序:对RDD中的键值对按照键进行排序。如果ascending为true,则按照升序排列,如果为false,则按照降序排列。

(3)合并排序:在排序完成后,Spark将各个分区的排序结果进行合并排序,从而得到全局排序的结果。

importorg.apache.spark.api.java.JavaPairRDD;importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importscala.Tuple2;importjava.util.Arrays;importjava.util.List;publicclassSortByKeyExample{publicstaticvoidmain(String[] args){// 创建 SparkContextJavaSparkContext sc =newJavaSparkContext("local","sortByKeyExample");// 创建一个包含键值对的 JavaPairRDDList<Tuple2<Integer,String>> data =Arrays.asList(newTuple2<>(5,"E"),newTuple2<>(3,"C"),newTuple2<>(1,"A"),newTuple2<>(4,"D"),newTuple2<>(2,"B"));JavaPairRDD<Integer,String> pairRDD = sc.parallelizePairs(data);// 使用 sortByKey() 方法对 Pair RDD 中的键进行升序排序JavaPairRDD<Integer,String> sortedPairRDD = pairRDD.sortByKey(true,1);// 输出排序后的结果List<Tuple2<Integer,String>> sortedData = sortedPairRDD.collect();System.out.println("Sorted data: "+ sortedData);// 关闭 SparkContext
        sc.stop();}}

对一个包含整数作为键的Pair RDD进行升序排序。

(1)与sortBy()方法不同,sortByKey()方法只能用于键值对RDD(Pair RDD)。

(2)默认情况下,sortByKey()方法使用键的自然顺序进行排序。如果需要自定义排序规则,可以使用sortByKey(Comparator comp, boolean ascending)方法。

(3)如果不需要指定结果RDD的分区数,可以使用sortByKey(boolean ascending)的重载方法。


2.Action-行动算子

行动算子是指对RDD执行计算并返回结果到驱动程序(Driver Program)中,触发实际的计算过程。

在这里插入图片描述

行动算子会触发Spark作业的执行。

(1)

collect

将RDD中的数据收集到驱动节点(Driver Node)上,并以数组的形式返回给驱动程序(Driver Program)。

在这里插入图片描述

List<T>collect()

(1)驱动程序请求:当调用collect()方法时,驱动程序会发送请求到集群上的各个执行节点(Executor Nodes)。

(2)数据收集:各个执行节点上的数据被收集到驱动程序所在的节点上。

(3)数据组装:收集到的数据被组装成一个数组,并返回给调用collect()方法的程序。

importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importjava.util.List;publicclassCollectExample{publicstaticvoidmain(String[] args){// 创建 SparkContextJavaSparkContext sc =newJavaSparkContext("local","collectExample");// 创建一个包含整数的RDDJavaRDD<Integer> rdd = sc.parallelize(List.of(1,2,3,4,5));// 使用 collect() 方法将数据收集到驱动节点上List<Integer> collectedData = rdd.collect();// 输出收集到的数据System.out.println("Collected data: "+ collectedData);// 关闭 SparkContext
        sc.stop();}}
  • collect操作会触发Spark的计算,在大规模数据集上使用时需要谨慎,因为它会将所有数据都传输到驱动节点,可能导致内存不足或性能问题。

在调试和小规模数据集上进行试验时,collect()是一个有用的工具,但在生产环境中,应该避免在大规模数据集上使用它。
在这里插入图片描述


(2)

count

计算RDD中元素的数量,触发Spark的执行,并返回RDD中元素的总数。

longcount()

(1)分布式计算:调用count()方法时,Spark会在集群的各个执行节点上并行地计算RDD中元素的数量。

(2)局部计数:每个执行节点上的局部计数结果会被发送到驱动节点。

(3)总计数:驱动节点将收到的局部计数结果相加,得到RDD中元素的总数,并返回给调用count()方法的程序。

importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;publicclassCountExample{publicstaticvoidmain(String[] args){// 创建 SparkContextJavaSparkContext sc =newJavaSparkContext("local","countExample");// 创建一个包含整数的RDDJavaRDD<Integer> rdd = sc.parallelize(List.of(1,2,3,4,5));// 使用 count() 方法计算RDD中元素的数量long count = rdd.count();// 输出计数结果System.out.println("Count: "+ count);// 关闭 SparkContext
        sc.stop();}}

tips:

(1)count()操作会触发Spark的执行,并且需要遍历整个RDD来计算元素的数量。因此,在大规模数据集上使用时需要注意性能。

(2)如果RDD中的数据量非常大,count()操作可能会耗费较长的时间。

(3)在分布式计算环境下,count()操作是一个开销较大的动作,因为它需要协调各个执行节点上的计数结果。


(3)

first

在这里插入图片描述
获取RDD中的第一个元素,并返回RDD中的第一个元素。

Tfirst()

T表示RDD中元素的类型。

(1)分布式计算:当调用first()方法时,Spark会在集群的各个执行节点上并行地获取RDD中的数据。

(2)获取第一个元素:Spark会从RDD的分区中获取第一个元素,并返回给调用first()方法的程序。通常情况下,它会选择第一个分区中的第一个元素作为RDD的第一个元素。

(3)返回结果:获取到第一个元素后,Spark会将其返回给调用first()方法的程序。

importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;publicclassFirstExample{publicstaticvoidmain(String[] args){// 创建 SparkContextJavaSparkContext sc =newJavaSparkContext("local","firstExample");// 创建一个包含字符串的RDDJavaRDD<String> rdd = sc.parallelize(List.of("apple","banana","orange","grape"));// 使用 first() 方法获取RDD中的第一个元素String firstElement = rdd.first();// 输出第一个元素System.out.println("First element: "+ firstElement);// 关闭 SparkContext
        sc.stop();}}

(1)first()操作会触发Spark的执行,并且需要获取RDD的第一个元素。因此,在大规模数据集上使用时需要注意性能。

(2)如果RDD中没有元素,调用first()方法会抛出NoSuchElementException异常。因此,在调用first()方法之前,最好使用isEmpty()方法检查RDD是否为空。

(3)在分布式计算环境下,first()操作会选择第一个分区中的第一个元素作为RDD的第一个元素。因此,如果RDD的分区顺序发生变化,可能会导致不同的元素被选为第一个元素。


(4)

take

(n)

take(n)获取RDD中的前n个元素,将它们返回为一个数组
在这里插入图片描述

List<T>take(int n)

T表示RDD中元素的类型,n表示要获取的元素数量。

(1)分布式计算:当调用take(n)方法时,Spark会并行地在集群的各个执行节点上获取RDD中的元素。

(2)获取前n个元素:Spark会从RDD的分区中获取前n个元素,并将它们组合成一个数组。

(3)返回结果:获取到前n个元素后,Spark会将这些元素组成的数组返回给调用take(n)方法的程序。

importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importjava.util.List;publicclassTakeExample{publicstaticvoidmain(String[] args){// 创建 SparkContextJavaSparkContext sc =newJavaSparkContext("local","takeExample");// 创建一个包含整数的RDDJavaRDD<Integer> rdd = sc.parallelize(List.of(1,2,3,4,5,6));// 使用 take(n) 方法获取RDD中的前三个元素List<Integer> elements = rdd.take(3);// 输出前三个元素System.out.println("First 3 elements: "+ elements);// 关闭 SparkContext
        sc.stop();}}

(1)take(n)操作会返回一个包含RDD中前n个元素的数组,这意味着它会拉取部分数据但不会触发完整的计算

(2)如果RDD中的元素不足n个,take(n)方法会返回RDD中所有的元素


(5)

countByKey
countByKey

统计每种key的个数,仅适用于RDD中的元素是键值对(key-value pairs)的情况,返回一个由键和对应计数值组成的Map。

在这里插入图片描述

Map<K,Long>countByKey()

K表示键的类型,Long表示计数的类型。该方法没有参数。

(1)分布式计算:当调用countByKey()方法时,Spark会并行地在集群的各个执行节点上对RDD中的键进行计数。

(2)统计键的出现次数:Spark会对RDD中的每个键进行计数,并将结果保存在一个Map中,其中键是RDD中的唯一键,值是该键在RDD中出现的次数。

(3)返回结果:计数完成后,Spark会将包含键和对应计数值的Map返回给调用countByKey()方法的程序。

importorg.apache.spark.api.java.JavaPairRDD;importorg.apache.spark.api.java.JavaSparkContext;importscala.Tuple2;importjava.util.Map;publicclassCountByKeyExample{publicstaticvoidmain(String[] args){// 创建 SparkContextJavaSparkContext sc =newJavaSparkContext("local","countByKeyExample");// 创建一个包含键值对的RDDJavaPairRDD<String,Integer> pairRDD = sc.parallelizePairs(List.of(newTuple2<>("a",1),newTuple2<>("b",2),newTuple2<>("a",3),newTuple2<>("c",1),newTuple2<>("b",2)));// 使用 countByKey() 方法对RDD中的键进行计数Map<String,Long> counts = pairRDD.countByKey();// 输出键的计数结果for(Map.Entry<String,Long> entry : counts.entrySet()){System.out.println(entry.getKey()+": "+ entry.getValue());}// 关闭 SparkContext
        sc.stop();}}

(1)countByKey()操作只能应用于RDD中的键值对元素。

(2)返回的结果是一个Map,其中包含了RDD中每个键以及对应的出现次数。

(3)由于countByKey()是一个动作操作,会触发完整的RDD计算,因此在处理大型数据集时需要谨慎使用。


(6)

takeSample

takeSample从RDD中随机抽取指定大小的样本。

可用于从大型数据集中提取一个较小的样本,以便进行测试、调试或快速预览数据。

List<T>takeSample(boolean withReplacement,int num,long seed)
withReplacement

:一个布尔值,指示是否允许有放回地抽样。如果设置为true,则在抽样时允许某个元素被抽取多次;如果设置为false,则每个元素只能被抽取一次。

num

:要抽取的样本大小。

seed

:可选的随机种子,用于确定抽样结果的随机性。如果不指定,则使用系统时间作为默认种子。

importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importjava.util.List;publicclassTakeSampleExample{publicstaticvoidmain(String[] args){// 创建 SparkContextJavaSparkContext sc =newJavaSparkContext("local","takeSampleExample");// 创建一个包含整数的RDDJavaRDD<Integer> rdd = sc.parallelize(List.of(1,2,3,4,5,6,7,8,9,10));// 从RDD中随机抽取3个样本,不允许重复抽样List<Integer> sample = rdd.takeSample(false,3);// 输出抽样结果System.out.println("Sampled elements: "+ sample);// 关闭 SparkContext
        sc.stop();}}

(1)takeSample操作可以用于对大型数据集进行快速抽样,但需要注意样本大小与数据集大小之间的关系,以避免内存溢出或性能问题。

(2)如果数据集较大,建议在抽样前先进行一定的数据预处理或筛选,以确保抽样得到的样本具有代表性。

(3)可以通过指定不同的随机种子来获得不同的抽样结果,这对于调试和验证抽样的结果的随机性很有帮助。


(8)

foreach

()

同 下。
在这里插入图片描述

(9)

foreach

(func)

voidforeach(VoidFunction<T> f)

f:一个接受RDD中元素类型的函数接口,通常是一个匿名函数或lambda表达式,用于指定要在每个元素上执行的操作。

(1)并行迭代:Spark会将RDD中的元素分配到集群中的不同节点上进行并行处理。

(2)应用操作:对于每个分区中的元素,Spark会调用指定的函数来执行所需的操作。

(3)执行副作用:如果指定的函数产生了副作用(如写入外部系统、更新共享状态等),则这些副作用会在执行期间被触发。

importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importorg.apache.spark.api.java.function.VoidFunction;publicclassForeachExample{publicstaticvoidmain(String[] args){// 创建 SparkContextJavaSparkContext sc =newJavaSparkContext("local","foreachExample");// 创建一个包含字符串的RDDJavaRDD<String> rdd = sc.parallelize(List.of("apple","banana","orange","grape"));// 对RDD中的每个元素执行打印操作
        rdd.foreach((VoidFunction<String>) s ->System.out.println(s));// 关闭 SparkContext
        sc.stop();}}

(10)

foreachPartition

()

foreachPartition()方法与foreach()方法类似,但是它是针对RDD中的每个分区而不是每个元素。

对RDD中的每个分区执行一个操作,这在某些情况下可以提高性能,特别是需要在每个分区上执行一些初始化或清理工作时。

voidforeachPartition(VoidFunction<Iterator<T>> f)

f:是一个接受Iterator类型的函数,表示要在RDD的每个分区上执行的操作。

(1)分区迭代:对RDD中的每个分区依次执行指定的操作。在每个分区上,会创建一个Iterator对象,其中包含了该分区的所有元素。

(2)并行执行:在分布式环境下,这些操作会在各个节点上并行执行,以提高整体的执行效率。

(3)初始化和清理:foreachPartition()方法通常用于执行一些需要在每个分区上进行初始化或清理的操作,例如在分区开始时打开数据库连接,在分区结束时关闭连接。

importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importjava.util.Iterator;publicclassForeachPartitionExample{publicstaticvoidmain(String[] args){// 创建 SparkContextJavaSparkContext sc =newJavaSparkContext("local","foreachPartitionExample");// 创建一个包含整数的RDDJavaRDD<Integer> rdd = sc.parallelize(List.of(1,2,3,4,5),2);// 将RDD划分为2个分区// 对RDD中的每个分区执行打印操作
        rdd.foreachPartition(partition ->{while(partition.hasNext()){System.out.println(partition.next());}});// 关闭 SparkContext
        sc.stop();}}

RDD被划分为2个分区,并且对每个分区执行了打印操作。


(11)

saveAsTextFile

(path)

将RDD中的每个元素转换为字符串,并将这些字符串写入到指定路径的文本文件中。

voidsaveAsTextFile(String path)

path:指定要保存数据的目标路径。

(1)将RDD转换为文本行:对于RDD中的每个元素,Spark会调用toString()方法将其转换为一个字符串。

(2)写入文件:将转换后的字符串行写入到指定路径的文本文件中。如果指定的路径已存在,则会覆盖现有文件;如果路径不存在,则会创建新文件。

(2)分区写入:如果RDD是分区的,Spark会将每个分区的数据写入到单独的文件中,并以分区编号作为文件名的一部分。

importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;publicclassSaveAsTextFileExample{publicstaticvoidmain(String[] args){// 创建 SparkContextJavaSparkContext sc =newJavaSparkContext("local","saveAsTextFileExample");// 创建一个包含整数的RDDJavaRDD<Integer> rdd = sc.parallelize(List.of(1,2,3,4,5));// 将RDD中的数据保存到文本文件中
        rdd.saveAsTextFile("output");// 关闭 SparkContext
        sc.stop();}}

RDD中的整数元素被转换为字符串,并保存到名为"output"的目录中。

(1)使用saveAsTextFile方法保存RDD数据时,需要确保RDD中的元素都具有合适的toString()方法,以便正确转换为字符串形式。

(2)如果RDD的数据量很大,保存为文本文件可能会生成大量小文件,这可能会导致文件系统的性能问题。在这种情况下,可以考虑使用更适合大数据量的文件格式,如Parquet或ORC。

(3)在集群环境下使用saveAsTextFile方法时,要确保目标路径是集群中所有节点都能够访问的位置。


(12)

reduce

()

将RDD中的元素通过指定的函数进行聚合。

这个函数必须是可交换和可结合的,以确保在分布式环境中正确执行。

publicTreduce(Function2<T,T,T> func)

func:用于聚合RDD元素的函数。这个函数接受两个类型为T的参数,并返回一个类型为T的结果。

  • 返回值:

T:聚合后的结果。

importorg.apache.spark.api.java.JavaRDD;importorg.apache.spark.api.java.JavaSparkContext;importscala.Function2;importjava.util.Arrays;publicclassMain{publicstaticvoidmain(String[] args){JavaSparkContext sc =newJavaSparkContext("local","ReduceExample");JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1,2,3,4,5));// 使用reduce方法计算RDD中所有元素的总和Integer totalSum = rdd.reduce((Function2<Integer,Integer,Integer>)(x, y)-> x + y);System.out.println("Total sum: "+ totalSum);}}

(1)传递给reduce()方法的函数必须是可交换和可结合的。这是因为在分布式环境中,Spark会将RDD分割成多个分区,每个分区在不同的计算节点上执行。

(2)可交换和可结合的函数可以确保在不同分区上执行聚合操作时,得到的最终结果是确定的和可预期的。


欢迎一键三连呀列位彦祖、亦菲~


本文转载自: https://blog.csdn.net/weixin_48935611/article/details/139079494
版权归原作者 喻师傅 所有, 如有侵权,请联系我们删除。

“Spark-RDD-常用算子(方法)详解”的评论:

还没有评论