0


Spark 常用行动算子使用总结

前言

Spark RDD 中提供了丰富的行动算子可以帮助我们完成对RDD数据的一些常用统计、聚合等业务的操作,下面将常用的行动算子进行使用总结;

**reduce **

函数签名

def reduce(f: (T, T) => T): T

函数说明

聚集 RDD 中的所有元素,先聚合分区内数据,再聚合分区间数据

案例操作

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ReduceTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    // 聚合数据
    val reduceResult: Int = rdd.reduce(_+_)
    println(reduceResult)
    sc.stop()
  }

}

代码中将列表中的数据进行两两聚合,运行代码,观察控制台输出效果

**count **

函数签名

def count(): Long

函数说明

返回 RDD 中元素的个数

案例操作

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ReduceTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5))

    // 聚合数据
    //val reduceResult: Int = rdd.reduce(_+_)
    //println(reduceResult)

    val counts = rdd.count()
    println(counts)
    
    sc.stop()
  }

}

统计列表中元素个数,运行上面的代码,观察输出效果

first

函数签名

def first(): T

函数说明

返回 RDD 中的第一个元素

案例操作

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ReduceTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5))

    // 聚合数据
    //val reduceResult: Int = rdd.reduce(_+_)
    //println(reduceResult)

    //val counts = rdd.count()
    //println(counts)

    val first = rdd.first()
    println(first)

    sc.stop()
  }

}

运行程序,观察控制台输出结果

take

函数签名

def take(num: Int): Array[T]

函数说明

返回一个由 RDD 的前 n 个元素组成的数组

案例操作

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ReduceTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)
    val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4,5))

    val takeResult = rdd.take(2)
    println(takeResult.mkString(","))

    sc.stop()
  }

}

运行上面的程序,观察控制台输出效果

**takeOrdered **

函数签名

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]


函数说明

返回该 RDD 排序后的前 n 个元素组成的数组

案例操作

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ReduceTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd: RDD[Int] = sc.makeRDD(List(1,3,2,4))
    // 返回 RDD 中元素的个数
    val result: Array[Int] = rdd.takeOrdered(2)
    println(result.mkString(","))
    sc.stop()
  }

}

运行上面的程序,观察控制台输出

**aggregate **

函数签名

def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

函数说明

分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合

案例展示

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ReduceTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 8)
    // 将该 RDD 所有元素相加得到结果
    val result: Int = rdd.aggregate(0)(_ + _, _ + _)
    //val result: Int = rdd.aggregate(10)(_ + _, _ + _)
    println(result)
    sc.stop()
  }

}

运行上面的程序,观察控制台输出结果

fold

函数签名

def fold(zeroValue: T)(op: (T, T) => T): T

函数说明

折叠操作,aggregate 的简化版操作

案例操作

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ReduceTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
    val foldResult: Int = rdd.fold(0)(_+_)
    println(foldResult)
    sc.stop()
  }

}

运行上面的程序,观察控制台输出效果

countByKey

函数签名

def countByKey(): Map[K, Long]

函数说明

统计每种 key 的个数

案例操作

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ReduceTest {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
    val sc = new SparkContext(sparkConf)

    val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2,
      "b"), (3, "c"), (3, "c")))
    // 统计每种 key 的个数
    val result: collection.Map[Int, Long] = rdd.countByKey()
    println(result)

    sc.stop()
  }

}

运行上面的程序,观察控制台输出


本文转载自: https://blog.csdn.net/congge_study/article/details/124461494
版权归原作者 逆风飞翔的小叔 所有, 如有侵权,请联系我们删除。

“Spark 常用行动算子使用总结”的评论:

还没有评论