Spark Transformation Operators

Value Type

map()

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}
  • Purpose: traverses the RDD, applies the function f to each element, and produces a new RDD
  • Example
object SP_map {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val value: RDD[String] = sc.textFile("data/1.txt").map(line => line + "map")
    value.foreach(println)
    sc.stop()
  }
}

mapPartitions

def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
}
  • Purpose: this operator processes data one partition at a time, whereas map processes one element at a time
  • preservesPartitioning: whether to keep the original partitioning information; it is not kept by default
  • Example
object SP_mapPartitions {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val value: RDD[String] = sc.textFile("data/1.txt").mapPartitions(iter => {
      iter.map(x => {
        x + "900"
      })
    })
    value.foreach(println)
    sc.stop()
  }
}

mapPartitionsWithIndex

def mapPartitionsWithIndex[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (_: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
    preservesPartitioning)
}
  • Purpose: like mapPartitions, it processes data per partition, but it also exposes the partition index
  • Example
object SP_mapPartitionsWithIndex {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val value: RDD[String] = sc.textFile("data/1.txt").mapPartitionsWithIndex((index, iter) => {
      iter.map(x => {
        index + "===" + x
      })
    })
    value.foreach(println)
    sc.stop()
  }
}

Differences among map, mapPartitions, and mapPartitionsWithIndex

  • map processes one element at a time, which uses less memory and is less likely to cause an out-of-memory error, but is less efficient;
  • mapPartitions and mapPartitionsWithIndex process data per partition, in batches, which is more efficient but uses more memory and may cause an out-of-memory error (a sketch follows this list);
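For intuition, here is a minimal sketch (not from the original examples) of why per-partition batching can pay off; the expensiveSetup() helper is hypothetical and stands in for any costly per-use resource such as a database connection or a parser instance:

import org.apache.spark.{SparkConf, SparkContext}

object MapVsMapPartitions {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("MapVsMapPartitions")
    val sc: SparkContext = new SparkContext(conf)
    val rdd = sc.parallelize(1 to 10, 2)

    // Hypothetical stand-in for expensive initialization; it returns the function to apply.
    def expensiveSetup(): Int => Int = {
      x => x * 2
    }

    // map: the setup cost would be paid once per element (10 times here).
    val perElement = rdd.map(x => expensiveSetup()(x))

    // mapPartitions: the setup cost is paid once per partition (2 times here).
    val perPartition = rdd.mapPartitions { iter =>
      val f = expensiveSetup()
      iter.map(f)
    }

    perElement.foreach(println)
    perPartition.foreach(println)
    sc.stop()
  }
}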

flatMap

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.flatMap(cleanF))
}
  • Purpose: splits each input record into multiple output records
  • Example
object SP_flatMap {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val value: RDD[String] = sc.textFile("data/1.txt").flatMap(x => x.split(" "))
    value.foreach(println)
    sc.stop()
  }
}

glom

def glom(): RDD[Array[T]] = withScope {
  new MapPartitionsRDD[Array[T], T](this, (_, _, iter) => Iterator(iter.toArray))
}
  • Purpose: turns each partition into a single array
  • Example
object SP_glom {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    conf.set("spark.default.parallelism", "4")
    val sc: SparkContext = new SparkContext(conf)
    val value: RDD[Array[Int]] = sc.textFile("data/1.txt").flatMap(_.split(" ")).map(line => line.toInt).glom()
    val value1: RDD[Int] = value.map(_.max)
    value1.foreach(println)
    sc.stop()
  }
}

groupBy

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
  groupBy[K](f, defaultPartitioner(this))
}
  • Purpose: groups all elements, placing data with the same key into one group; this involves a shuffle, and the amount of data before and after the shuffle is the same
  • Example
object SP_groupBy {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    conf.set("spark.default.parallelism", "4")
    val sc: SparkContext = new SparkContext(conf)
    val value: RDD[(String, Iterable[(String, Int)])] = sc.textFile("data/1.txt").flatMap(_.split(" ")).map(word => (word, 1)).groupBy(_._1)
    value.foreach(println)
    sc.stop()
  }
}

Note:
This operation may be very expensive. If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using PairRDDFunctions.aggregateByKey or PairRDDFunctions.reduceByKey will provide much better performance.
In other words, this operation is expensive and is best replaced by aggregateByKey or reduceByKey, which aggregate before the shuffle and therefore reduce the amount of shuffled data, as the sketch below illustrates.
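As a concrete illustration of this note, a minimal sketch (reusing the data/1.txt path from the examples above) of the same word count written both ways:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object GroupByVsReduceByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("GroupByVsReduceByKey")
    val sc: SparkContext = new SparkContext(conf)
    val pairs: RDD[(String, Int)] = sc.textFile("data/1.txt").flatMap(_.split(" ")).map((_, 1))

    // Expensive: every (word, 1) pair travels through the shuffle, and the sum happens afterwards.
    val viaGroupBy: RDD[(String, Int)] =
      pairs.groupBy(_._1).map { case (word, group) => (word, group.map(_._2).sum) }

    // Preferred: partial sums are computed inside each partition before the shuffle.
    val viaReduceByKey: RDD[(String, Int)] = pairs.reduceByKey(_ + _)

    viaGroupBy.foreach(println)
    viaReduceByKey.foreach(println)
    sc.stop()
  }
}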

filter

def filter(f: T => Boolean): RDD[T] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (_, _, iter) => iter.filter(cleanF),
    preservesPartitioning = true)
}
  • Purpose: keeps only the elements that satisfy a predicate. As data is filtered out, the remaining data may become unevenly distributed across partitions, causing data skew; it is best to repartition afterwards so the data is spread evenly.
  • Example
object SP_filter {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    conf.set("spark.default.parallelism", "4")
    val sc: SparkContext = new SparkContext(conf)
    val value: RDD[Int] = sc.textFile("data/1.txt").flatMap(_.split(" ")).map(line => line.toInt).filter(_ > 5)
    value.foreach(println)
    sc.stop()
  }
}

sample

def sample(
    withReplacement: Boolean,
    fraction: Double,
    seed: Long = Utils.random.nextLong): RDD[T] = {
  require(fraction >= 0, s"Fraction must be nonnegative, but got ${fraction}")

  withScope {
    if (withReplacement) {
      new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
    } else {
      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
    }
  }
}
  • Purpose: draws a sample from the dataset;
  • Parameters:
    - withReplacement: whether sampled elements are put back; true means sampling with replacement, using Poisson sampling; false means sampling without replacement, using Bernoulli sampling
    - fraction: when withReplacement = false, the probability of selecting each element, in [0, 1]; when withReplacement = true, the expected number of times each element is selected, which must be >= 0
    - seed: the seed for the random number generator
  • Example:
object SP_sample {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    conf.set("spark.default.parallelism", "4")
    val sc: SparkContext = new SparkContext(conf)
    val value: RDD[Int] = sc.textFile("data/1.txt").flatMap(_.split(" ")).map(line => line.toInt).sample(true, 0.4, 2)
    value.foreach(println)
    sc.stop()
  }
}

distinct

def distinct(): RDD[T] = withScope {
  distinct(partitions.length)
}
  • Purpose: removes duplicate elements from the RDD
  • Example
object SP_distinct {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val value: RDD[String] = sc.textFile("data/1.txt").flatMap(line => line.split(" ")).distinct()
    value.foreach(println)
    sc.stop()
  }
}
  • The source code that implements the deduplication
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = {
    // Create an instance of external append only map which ignores values.
    val map = new ExternalAppendOnlyMap[T, Null, Null](
      createCombiner = _ => null,
      mergeValue = (a, b) => a,
      mergeCombiners = (a, b) => a)
    map.insertAll(partition.map(_ -> null))
    map.iterator.map(_._1)
  }
  partitioner match {
    case Some(_) if numPartitions == partitions.length =>
      mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true)
    case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
  }
}
  • When the new RDD has the same number of partitions as the original (and a partitioner is present), it deduplicates within each partition using an external append-only map that ignores values
  • When the number of partitions differs, it uses reduceByKey, so deduplication already starts before the shuffle and the amount of shuffled data is reduced (spelled out in the sketch below)
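To make the second branch concrete, the same dedup logic can be written out by hand; this is just a sketch, and the target partition count of 4 is arbitrary:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object DistinctByHand {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("DistinctByHand")
    val sc: SparkContext = new SparkContext(conf)
    val words: RDD[String] = sc.textFile("data/1.txt").flatMap(_.split(" "))

    // Each element becomes a (value, null) pair, duplicate keys are collapsed by
    // reduceByKey (partly before the shuffle), and the keys are extracted again.
    val deduped: RDD[String] = words
      .map(x => (x, null))
      .reduceByKey((x, _) => x, 4) // 4 is an arbitrary target partition count
      .map(_._1)

    deduped.foreach(println)
    sc.stop()
  }
}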

coalesce

def coalesce(numPartitions: Int, shuffle: Boolean = false,
    partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]
  • Purpose: reduces the number of partitions; mainly used after filtering a large dataset, when the data has become unevenly distributed, to improve execution efficiency on the smaller dataset
  • Parameters: numPartitions is the target number of partitions; shuffle: whether to shuffle, false by default;
  • Example
object SP_coalesce {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    conf.set("spark.default.parallelism", "4")
    val sc: SparkContext = new SparkContext(conf)
    val value: RDD[Int] = sc.textFile("data/1.txt").flatMap(_.split(" ")).map(line => line.toInt).filter(_ > 5).coalesce(2)
    value.foreach(println)
    sc.stop()
  }
}

repartition

def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}
  • Purpose: also repartitions the data, but it always performs a shuffle; it is simply implemented with coalesce, with shuffle set to true
  • Example:
object SP_repartition {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    conf.set("spark.default.parallelism", "4")
    val sc: SparkContext = new SparkContext(conf)
    val value: RDD[Int] = sc.textFile("data/1.txt").flatMap(_.split(" ")).map(line => line.toInt).filter(_ > 5).repartition(2)
    value.foreach(println)
    sc.stop()
  }
}

sortBy

def sortBy[K](
    f: (T) => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
  this.keyBy[K](f).sortByKey(ascending, numPartitions).values
}
  • Purpose: sorts the data. Before sorting, each element can be transformed by the function f, and the data is then sorted by the result of f. The default order is ascending, and the new RDD has the same number of partitions as the original.
  • Example
object SP_sortby {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    // conf.set("spark.default.parallelism", "4")
    val sc: SparkContext = new SparkContext(conf)
    val value: RDD[(Int, Int)] = sc.textFile("data/1.txt").flatMap(_.split(" ")).mapPartitionsWithIndex((index, iter) => {
      iter.map(x => {
        (index, x.toInt)
      })
    }).sortBy(x => x, false)
    value.foreach(x => { println(x._1 + "==" + x._2) })
    sc.stop()
  }
}

Two-Value Type

union

def union(other: RDD[T]): RDD[T] = withScope {
  sc.union(this, other)
}
  • Purpose: returns a new RDD that is the union of two RDDs; the two RDDs must have the same element type; no shuffle occurs, and the dependency used is RangeDependency.
  • Example
object SP_union {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val value1: RDD[String] = sc.textFile("data/u1.txt").map(line => line)
    val value2: RDD[String] = sc.textFile("data/u2.txt").map(line => line)
    val value: RDD[String] = value1.union(value2)
    value.foreach(println)
    sc.stop()
  }
}

subtract

def subtract(other: RDD[T]): RDD[T] = withScope {
  subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))
}
  • Purpose: computes the difference of two RDDs; if the two RDDs are partitioned differently, a shuffle occurs; if they are partitioned the same way, no shuffle occurs
  • Example
object SP_subtract {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val value1: RDD[String] = sc.textFile("data/u1.txt").map(line => line)
    val value2: RDD[String] = sc.textFile("data/u2.txt").map(line => line)
    val value: RDD[String] = value1.subtract(value2)
    value.foreach(println)
    sc.stop()
  }
}

intersection

def intersection(other: RDD[T]): RDD[T] = withScope {
  this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
    .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
    .keys
}
  • Purpose: computes the intersection of two RDDs. If the two RDDs are partitioned differently, a shuffle occurs; if they are partitioned the same way, no shuffle occurs
  • Example
object SP_intersection {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val value1: RDD[String] = sc.textFile("data/u1.txt").map(line => line)
    val value2: RDD[String] = sc.textFile("data/u2.txt").map(line => line)
    val value: RDD[String] = value1.intersection(value2)
    value.foreach(println)
    sc.stop()
  }
}

zip

def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
  zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) =>
    new Iterator[(T, U)] {
      def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
        case (true, true) => true
        case (false, false) => false
        case _ => throw SparkCoreErrors.canOnlyZipRDDsWithSamePartitionSizeError()
      }
      def next(): (T, U) = (thisIter.next(), otherIter.next())
    }
  }
}
  • Purpose: pairs the elements of two RDDs into key-value tuples, with elements of the first RDD as keys and elements of the second as values; the two RDDs must have the same number of partitions and the same number of elements, otherwise an exception is thrown
  • Example
object SP_zip {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val value1: RDD[String] = sc.textFile("data/u1.txt").map(line => line).repartition(2)
    val value2: RDD[String] = sc.textFile("data/u2.txt").map(line => line).repartition(2)
    val value: RDD[(String, String)] = value1.zip(value2)
    value.foreach(println)
    sc.stop()
  }
}

Key-Value Type

partitionBy

def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
  if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
    throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError()
  }
  if (self.partitioner == Some(partitioner)) {
    self
  } else {
    new ShuffledRDD[K, V, V](self, partitioner)
  }
}
  • Purpose: repartitions the data by key according to the given partitioner, which can be a custom one (see the sketch after the example)
  • Example
object SP_partitionBy {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val value1: RDD[(String, String)] = sc.textFile("data/u1.txt").map(line => (line, line))
    val value: RDD[(String, String)] = value1.partitionBy(new HashPartitioner(2))
    println(value.partitions.size)
    println(value1.partitions.size)
    sc.stop()
  }
}
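The example above uses the built-in HashPartitioner. Below is a minimal sketch of the custom case; the partitioner class and the sample data are invented for illustration:

import org.apache.spark.rdd.RDD
import org.apache.spark.{Partitioner, SparkConf, SparkContext}

// A hand-written Partitioner: keys starting with "a" go to partition 0,
// everything else goes to partition 1.
class FirstLetterPartitioner extends Partitioner {
  override def numPartitions: Int = 2
  override def getPartition(key: Any): Int =
    if (key.toString.startsWith("a")) 0 else 1
}

object SP_customPartitionBy {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SP_customPartitionBy")
    val sc: SparkContext = new SparkContext(conf)
    val pairs: RDD[(String, Int)] = sc.makeRDD(List(("apple", 1), ("banana", 2), ("avocado", 3), ("cherry", 4)))
    val partitioned: RDD[(String, Int)] = pairs.partitionBy(new FirstLetterPartitioner)

    // Print each element together with the partition index it ended up in.
    partitioned.mapPartitionsWithIndex((index, iter) => iter.map(kv => (index, kv))).foreach(println)
    sc.stop()
  }
}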

reduceByKey

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
  • Purpose: aggregates values by key; this operator is efficient because it pre-aggregates before the shuffle, reducing the amount of shuffled data
  • Example
object SP_reduceByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val value1: RDD[(String, Int)] = sc.textFile("data/u1.txt").map(line => (line, line.toInt))
    val value: RDD[(String, Int)] = value1.reduceByKey(_ + _)
    value.foreach(f => { println(f._1 + "=" + f._2) })
    sc.stop()
  }
}

groupByKey

def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
  groupByKey(defaultPartitioner(self))
}
  • Purpose: groups the values of each key into a sequence; no pre-aggregation happens before the shuffle, so it is less efficient than reduceByKey;
  • Example
object SP_groupByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val value1: RDD[(String, Int)] = sc.textFile("data/u1.txt").map(line => (line, line.toInt))
    val key: RDD[(String, Iterable[Int])] = value1.groupByKey
    val value: RDD[(String, Int)] = key.map(f => (f._1, f._2.sum))
    value.foreach(f => { println(f._1 + "=" + f._2) })
    sc.stop()
  }
}

aggregateByKey

def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(
    seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)]
  • Purpose: zeroValue is the initial value; seqOp is the computation function applied within a single partition; combOp is the computation function applied across partitions
  • Example:
object SP_aggregateByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)

    def seqOp(a: Int, b: Int): Int = {
      math.max(a, b)
    }

    def combOp(a: Int, b: Int): Int = {
      a + b
    }

    // Give each key an initial value of 0, take the max per key within each partition,
    // then sum the values of the same key across partitions.
    // val value: RDD[(String, Int)] = rdd.aggregateByKey(0)(math.max(_, _), _ + _)
    val value: RDD[(String, Int)] = rdd.aggregateByKey(0)(seqOp, combOp)
    value.foreach(f => { println(f._1 + "=" + f._2) })
    sc.stop()
  }
}

combineByKey

def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)]
  • Purpose: transforms the value structure, then combines within and across partitions.
    - createCombiner: V => C: initializes the first value seen for a key, converting it to the combined type C
    - mergeValue: (C, V) => C: the intra-partition merge function; C is the format produced by the previous function, and V is the original value format, i.e. the format before createCombiner was applied
    - mergeCombiners: (C, C) => C: the inter-partition merge function
  • Example
object SP_combineByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)

    def seqOp(a: (Int, Int), b: Int): (Int, Int) = {
      (a._1 + b, a._2 + 1)
    }

    def combOp(a: (Int, Int), b: (Int, Int)): (Int, Int) = {
      (a._1 + b._1, a._2 + b._2)
    }

    // Compute the average value for each key.
    val value: RDD[(String, (Int, Int))] = rdd.combineByKey(
      (_, 1),
      seqOp,
      combOp
    )
    value.map { case (key, value) => (key, value._1 / value._2.toDouble) }.collect().foreach(println)
    sc.stop()
  }
}

foldByKey

def foldByKey(
    zeroValue: V,
    partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  // Serialize the zero value to a byte array so that we can get a new clone of it on each key
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroArray = new Array[Byte](zeroBuffer.limit)
  zeroBuffer.get(zeroArray)

  // When deserializing, use a lazy val to create just one instance of the serializer per task
  lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
  val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))

  val cleanedFunc = self.context.clean(func)
  combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
    cleanedFunc, cleanedFunc, partitioner)
}
  • Purpose: takes an initial value and applies a function within and across partitions; it is much like aggregateByKey, except that foldByKey uses the same function within and across partitions while aggregateByKey can use different ones; the underlying implementation is the same.
  • Example
object SP_foldByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)

    def combOp(a: Int, b: Int): Int = {
      a + b
    }

    val value: RDD[(String, Int)] = rdd.foldByKey(0)(combOp)
    value.foreach(f => { println(f._1 + "=" + f._2) })
    sc.stop()
  }
}

sortByKey

def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)] = self.withScope {
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
  • Purpose: sorts by key, ascending by default; pass false to sort in descending order;
  • Example:
object SP_sortByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
    val value: RDD[(String, Int)] = rdd.sortByKey()
    value.foreach(f => { println(f._1 + "=" + f._2) })
    sc.stop()
  }
}

mapValues

def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
  val cleanF = self.context.clean(f)
  new MapPartitionsRDD[(K, U), (K, V)](
    self,
    (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
    preservesPartitioning = true)
}
  • Purpose: applies a function to the values only, leaving the keys unchanged
  • Example:
object SP_mapValues {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
    val value: RDD[(String, Int)] = rdd.mapValues(_ * 2)
    value.foreach(f => { println(f._1 + "=" + f._2) })
    sc.stop()
  }
}

join, leftOuterJoin, rightOuterJoin, fullOuterJoin

  • Purpose: like SQL joins, these operators join two RDDs by key: join is an inner join (keeps only keys present in both); leftOuterJoin is a left join; rightOuterJoin is a right join; fullOuterJoin is a full join. They are all implemented on top of cogroup. The example shows join; a sketch of the outer variants follows it.
  • Example
object SP_join {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("f", 8)), 2)
    val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("d", 2), ("c", 4), ("b", 3), ("e", 6), ("c", 8)), 2)
    val value: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
    value.foreach(f => { println(f._1 + "=" + f._2) })
    sc.stop()
  }
}
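The example covers join only. The sketch below (with small invented datasets) shows the outer variants and how the side that may be missing is wrapped in Option:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SP_outerJoins {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SP_outerJoins")
    val sc: SparkContext = new SparkContext(conf)
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("b", 3), ("f", 8)), 2)
    val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("e", 6)), 2)

    // The missing side is wrapped in Option (None when the key has no match).
    val left: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd2)
    val right: RDD[(String, (Option[Int], Int))] = rdd1.rightOuterJoin(rdd2)
    val full: RDD[(String, (Option[Int], Option[Int]))] = rdd1.fullOuterJoin(rdd2)

    left.foreach(println)
    right.foreach(println)
    full.foreach(println)
    sc.stop()
  }
}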

cogroup

def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
    other2: RDD[(K, W2)],
    other3: RDD[(K, W3)],
    partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
  • Purpose: behaves like a full join, producing, for each key, the sequence of values from each RDD; it can join up to three other RDDs;
  • Example
object SP_cogroup {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("f", 8)), 2)
    val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("d", 2), ("c", 4), ("b", 3), ("e", 6), ("c", 8)), 2)
    val value: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2, rdd2, rdd2)
    value.foreach(f => { println(f._1 + "=" + f._2) })
    sc.stop()
  }
}

This article is reposted from: https://blog.csdn.net/u013297137/article/details/139773345
Copyright belongs to the original author, joseph645494423.
