0


【大数据篇】Spark转换算子(Transformations)和行动算子(Actions)详解

Apache Spark 提供了大量的算子(操作),这些算子大致可以分为两类:转换算子(Transformations)和行动算子(Actions)。转换算子用于创建一个新的RDD,而行动算子则对RDD进行操作并产生结果。下面是一些常用的Spark算子和相应的代码示例:

转换算子(Transformations)

  1. **map(func)**:- 对RDD的每个元素应用函数func,返回一个新的RDD。- 示例:将每个元素乘以2。scalaCopy codeval rdd = sc.parallelize(List(1, 2, 3, 4))val mappedRDD = rdd.map(x => x * 2)
  2. **filter(func)**:- 返回一个新的RDD,包含应用函数func后返回值为true的原始元素。- 示例:过滤出大于2的元素。scalaCopy codeval filteredRDD = rdd.filter(x => x > 2)
  3. **flatMap(func)**:- 与map类似,但每个输入元素可以映射到0或多个输出元素(因此,func应返回一个序列,而不是单一元素)。- 示例:将每个数字映射到它的值和它的平方。scalaCopy codeval flatMappedRDD = rdd.flatMap(x => List(x, x*x))
  4. **reduceByKey(func)**:- 适用于键值对(Pair RDDs)的RDD,返回一个新的RDD,其中每个键的值是应用函数func聚合的结果。- 示例:对每个键进行值的累加。scalaCopy codeval pairRDD = sc.parallelize(List(("a", 1), ("b", 1), ("a", 1)))val reducedRDD = pairRDD.reduceByKey((x, y) => x + y)
  5. **join(otherDataset)**:- 对于两个键值对RDD,返回一个新的RDD,包含两个RDD中键相同的元素的组合。- 示例:连接两个RDD。scalaCopy codeval rdd1 = sc.parallelize(List(("a", 1), ("b", 2)))val rdd2 = sc.parallelize(List(("a", 3), ("a", 4), ("b", 5)))val joinedRDD = rdd1.join(rdd2)

行动算子(Actions)

  1. **collect()**:- 在Driver程序中以数组的形式返回RDD的所有元素。- 示例:收集RDD中的所有元素。scalaCopy codeval result = mappedRDD.collect()
  2. **count()**:- 返回RDD中元素的数量。- 示例:计算RDD中的元素个数。scalaCopy codeval count = rdd.count()
  3. **reduce(func)**:- 通过函数func(接受两个参数并返回一个)来聚合RDD中的元素。- 示例:求和。scalaCopy codeval sum = rdd.reduce((x, y) => x + y)
  4. **take(n)**:- 返回一个数组,包含RDD中的前n个元素。- 示例:取RDD的前3个元素。scalaCopy codeval first3 = rdd.take(3)
  5. **saveAsTextFile(path)**:- 将RDD中的元素写入到一个文本文件中,或者文本文件的集合中(取决于RDD的分区数)。- 示例:将RDD保存到文件系统。scalaCopy codemappedRDD.saveAsTextFile("path/to/output")

更多转换算子(Transformations)

  1. **distinct()**:- 返回一个新的RDD,其中包含源RDD的所有不同元素。- 示例:去除重复元素。scalaCopy codeval rdd = sc.parallelize(List(1, 1, 2, 3, 3, 4))val distinctRDD = rdd.distinct()
  2. **union(otherDataset)**:- 返回一个新的RDD,包含源RDD和另一个RDD的所有元素。- 示例:合并两个RDD。scalaCopy codeval rdd1 = sc.parallelize(List(1, 2, 3))val rdd2 = sc.parallelize(List(4, 5, 6))val unionRDD = rdd1.union(rdd2)
  3. **intersection(otherDataset)**:- 返回一个新的RDD,包含两个RDD的共同元素。- 示例:找出两个RDD的交集。scalaCopy codeval rdd1 = sc.parallelize(List(1, 2, 3, 4))val rdd2 = sc.parallelize(List(3, 4, 5, 6))val intersectionRDD = rdd1.intersection(rdd2)
  4. **subtract(otherDataset)**:- 返回一个新的RDD,包含源RDD中有而另一个RDD中没有的元素。- 示例:从一个RDD中减去另一个RDD的元素。scalaCopy codeval rdd1 = sc.parallelize(List(1, 2, 3, 4))val rdd2 = sc.parallelize(List(3, 4, 5, 6))val subtractedRDD = rdd1.subtract(rdd2)
  5. **cartesian(otherDataset)**:- 对两个RDD中的所有元素进行笛卡尔积操作,返回所有可能的元素对。- 示例:计算两个RDD的笛卡尔积。scalaCopy codeval rdd1 = sc.parallelize(List(1, 2))val rdd2 = sc.parallelize(List("a", "b"))val cartesianRDD = rdd1.cartesian(rdd2)

更多行动算子(Actions)

  1. **foreach(func)**:- 对RDD中的每个元素应用函数func。- 示例:对每个元素执行打印操作。scalaCopy coderdd.foreach(x => println(x))
  2. **aggregate(zeroValue)(seqOp, combOp)**:- 聚合RDD中的元素,首先使用seqOp操作聚合每个分区的数据,然后使用combOp操作聚合所有分区的结果。- 示例:计算RDD中所有元素的总和和计数。scalaCopy codeval result = rdd.aggregate((0, 0))( (acc, value) => (acc._1 + value, acc._2 + 1), (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))
  3. **fold(zeroValue)(func)**:- 与aggregate类似,但是seqOpcombOp使用相同的函数。- 示例:计算RDD中所有元素的总和。scalaCopy codeval sum = rdd.fold(0)((acc, value) => acc + value)
  4. **countByKey()**:- 仅适用于键值对RDD,对每个键计数。- 示例:计算每个键的出现次数。scalaCopy codeval pairRDD = sc.parallelize(List(("a", 1), ("b", 1), ("a", 1)))val counts = pairRDD.countByKey()
  5. **saveAsNewAPIHadoopFile(path)**:- 将RDD保存到Hadoop支持的文件系统中。- 示例:将RDD保存为Hadoop文件。scalaCopy coderdd.saveAsNewAPIHadoopFile("path/to/output", classOf[Text], classOf[IntWritable], classOf[TextOutputFormat[Text, IntWritable]])

本文转载自: https://blog.csdn.net/lin819747263/article/details/137416995
版权归原作者 林木森^~^ 所有, 如有侵权,请联系我们删除。

“【大数据篇】Spark转换算子(Transformations)和行动算子(Actions)详解”的评论:

还没有评论