Value Types
map()
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF))
}
- Purpose: applies the function f to every element of the RDD and returns a new RDD with the results.
- Example
object SP_map {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val value: RDD[String] = sc.textFile("data/1.txt").map(line => line + "map")
    value.foreach(println)
    sc.stop()
  }
}
mapPartitions
def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
}
- Purpose: processes data one partition at a time, whereas map processes one element at a time.
- preservesPartitioning: whether to keep the parent RDD's partitioner; the default is false (do not keep it).
- Example
object SP_mapPartitions {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val value: RDD[String] = sc.textFile("data/1.txt").mapPartitions(iter => {
      iter.map(x => {
        x + "900"
      })
    })
    value.foreach(println)
    sc.stop()
  }
}
mapPartitionsWithIndex
def mapPartitionsWithIndex[U: ClassTag](
    f: (Int, Iterator[T]) => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (_: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
    preservesPartitioning)
}
- Purpose: like mapPartitions, it processes data partition by partition, but it also exposes the partition index.
- Example
object SP_mapPartitionsWithIndex {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val value: RDD[String] = sc.textFile("data/1.txt").mapPartitionsWithIndex((index, iter) => {
      iter.map(x => {
        index + "===" + x
      })
    })
    value.foreach(println)
    sc.stop()
  }
}
Differences between map, mapPartitions, and mapPartitionsWithIndex
- map processes elements one at a time, which saves memory and is unlikely to cause an out-of-memory error, but it is less efficient.
- mapPartitions and mapPartitionsWithIndex process data partition by partition, like batch processing. This is more efficient, but uses more memory and may cause an out-of-memory error. A sketch of this trade-off follows.
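A minimal sketch of where the partition-level batching pays off. A reusable resource (here a DecimalFormat, standing in for something heavier such as a database connection) is built once per partition inside mapPartitions instead of once per element inside map. The object name and data are hypothetical, not from the original examples.
import org.apache.spark.{SparkConf, SparkContext}

object SP_mapVsMapPartitions {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc = new SparkContext(conf)
    val rdd = sc.makeRDD(1 to 10, 2)
    // With map, the per-element lambda rebuilds the formatter for every element.
    val perElement = rdd.map(x => {
      val fmt = new java.text.DecimalFormat("#,###") // created 10 times
      fmt.format(x.toLong)
    })
    // With mapPartitions, the formatter is created once per partition and reused.
    val perPartition = rdd.mapPartitions(iter => {
      val fmt = new java.text.DecimalFormat("#,###") // created 2 times (once per partition)
      iter.map(x => fmt.format(x.toLong))
    })
    perElement.collect().foreach(println)
    perPartition.collect().foreach(println)
    sc.stop()
  }
}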
flatMap
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.flatMap(cleanF))
}
- Purpose: splits one input record into multiple output records (each input element maps to zero or more output elements).
- Example
object SP_flatMap {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val value: RDD[String] = sc.textFile("data/1.txt").flatMap(x => x.split(" "))
    value.foreach(println)
    sc.stop()
  }
}
glom
def glom(): RDD[Array[T]] = withScope {
  new MapPartitionsRDD[Array[T], T](this, (_, _, iter) => Iterator(iter.toArray))
}
- Purpose: turns each partition into a single array.
- Example
object SP_glom {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    conf.set("spark.default.parallelism", "4")
    val sc: SparkContext = new SparkContext(conf)
    val value: RDD[Array[Int]] = sc.textFile("data/1.txt").flatMap(_.split(" ")).map(line => line.toInt).glom()
    val value1: RDD[Int] = value.map(_.max)
    value1.foreach(println)
    sc.stop()
  }
}
groupBy
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
  groupBy[K](f, defaultPartitioner(this))
}
- Purpose: groups all elements, placing elements with the same key into one group. This involves a shuffle, and the amount of data before and after the shuffle is the same.
- Example
object SP_groupBy {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    conf.set("spark.default.parallelism", "4")
    val sc: SparkContext = new SparkContext(conf)
    val value: RDD[(String, Iterable[(String, Int)])] = sc.textFile("data/1.txt").flatMap(_.split(" ")).map(word => (word, 1)).groupBy(_._1)
    value.foreach(println)
    sc.stop()
  }
}
Note:
This operation may be very expensive. If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using PairRDDFunctions.aggregateByKey or PairRDDFunctions.reduceByKey will provide much better performance.
In other words, this operation is expensive and is best replaced with aggregateByKey or reduceByKey, which aggregate before the shuffle and therefore reduce the amount of shuffled data. A sketch of the reduceByKey rewrite follows.
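A minimal sketch of the suggested rewrite, using the same word-count shape as the example above: the groupBy plus per-group sum is replaced by reduceByKey, so values are combined within each partition before the shuffle. The object name is hypothetical.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SP_groupByVsReduceByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val words: RDD[(String, Int)] = sc.textFile("data/1.txt").flatMap(_.split(" ")).map(word => (word, 1))
    // groupBy shuffles every (word, 1) pair, then sums inside each group.
    val viaGroupBy: RDD[(String, Int)] = words.groupBy(_._1).map { case (w, it) => (w, it.map(_._2).sum) }
    // reduceByKey pre-aggregates within each partition, shuffling only partial sums.
    val viaReduceByKey: RDD[(String, Int)] = words.reduceByKey(_ + _)
    viaGroupBy.collect().foreach(println)
    viaReduceByKey.collect().foreach(println)
    sc.stop()
  }
}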
filter
def filter(f: T => Boolean): RDD[T] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (_, _, iter) => iter.filter(cleanF),
    preservesPartitioning = true)
}
- Purpose: filters elements by a predicate. As data is filtered out, the remaining data may become unevenly distributed across partitions and cause data skew, so it is often best to repartition afterwards to even out the distribution.
- Example
object SP_filter {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    conf.set("spark.default.parallelism", "4")
    val sc: SparkContext = new SparkContext(conf)
    val value: RDD[Int] = sc.textFile("data/1.txt").flatMap(_.split(" ")).map(line => line.toInt).filter(_ > 5)
    value.foreach(println)
    sc.stop()
  }
}
sample
def sample(
    withReplacement: Boolean,
    fraction: Double,
    seed: Long = Utils.random.nextLong): RDD[T] = {
  require(fraction >= 0, s"Fraction must be nonnegative, but got ${fraction}")
  withScope {
    if (withReplacement) {
      new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
    } else {
      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
    }
  }
}
- Purpose: draws a sample from the dataset.
- Parameters:
  - withReplacement: whether sampled elements are put back; true means sampling with replacement (Poisson sampling), false means sampling without replacement (Bernoulli sampling).
  - fraction: when withReplacement = false, the probability of selecting each element, in the range [0, 1]; when withReplacement = true, the expected number of times each element is selected, which must be greater than or equal to 0.
  - seed: the seed for the random number generator.
- Example:
object SP_sample {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    conf.set("spark.default.parallelism", "4")
    val sc: SparkContext = new SparkContext(conf)
    val value: RDD[Int] = sc.textFile("data/1.txt").flatMap(_.split(" ")).map(line => line.toInt).sample(true, 0.4, 2)
    value.foreach(println)
    sc.stop()
  }
}
distinct
def distinct(): RDD[T] = withScope {
  distinct(partitions.length)
}
- Purpose: removes duplicate elements from the RDD.
- Example
object SP_distinct {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val value: RDD[String] = sc.textFile("data/1.txt").flatMap(line => line.split(" ")).distinct()
    value.foreach(println)
    sc.stop()
  }
}
- Source code of the deduplication implementation
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = {
    // Create an instance of external append only map which ignores values.
    val map = new ExternalAppendOnlyMap[T, Null, Null](
      createCombiner = _ => null,
      mergeValue = (a, b) => a,
      mergeCombiners = (a, b) => a)
    map.insertAll(partition.map(_ -> null))
    map.iterator.map(_._1)
  }
  partitioner match {
    case Some(_) if numPartitions == partitions.length =>
      mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true)
    case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
  }
}
- When the RDD already has a partitioner and the new partition count equals the current one, each partition is deduplicated locally with an ExternalAppendOnlyMap that ignores the values, so no shuffle is needed.
- Otherwise, distinct deduplicates via reduceByKey, which combines duplicates within each partition before the shuffle and therefore reduces the amount of shuffled data; see the sketch below.
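A minimal sketch of what the second branch amounts to, written against an ordinary RDD: map each element to an (element, null) pair, collapse duplicates with reduceByKey, and keep only the keys. The object name and input data are hypothetical.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SP_distinctViaReduceByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 2, 3, 3, 3), 2)
    // Same shape as the second branch of distinct(numPartitions):
    // duplicates are collapsed within each partition before the shuffle.
    val deduped: RDD[Int] = rdd.map(x => (x, null)).reduceByKey((x, _) => x).map(_._1)
    deduped.collect().foreach(println)
    sc.stop()
  }
}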
coalesce
def coalesce(numPartitions: Int, shuffle: Boolean = false,
    partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
    (implicit ord: Ordering[T] = null): RDD[T]
- Purpose: reduces the number of partitions. It is mainly used after filtering a large dataset, when the remaining data is unevenly distributed, to improve execution efficiency on the smaller dataset.
- Parameters: numPartitions is the target number of partitions; shuffle controls whether a shuffle is performed and defaults to false.
- Example
object SP_coalesce {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    conf.set("spark.default.parallelism", "4")
    val sc: SparkContext = new SparkContext(conf)
    val value: RDD[Int] = sc.textFile("data/1.txt").flatMap(_.split(" ")).map(line => line.toInt).filter(_ > 5).coalesce(2)
    value.foreach(println)
    sc.stop()
  }
}
repartition
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}
- Purpose: also repartitions the RDD, but it always performs a shuffle; it is implemented as coalesce with shuffle = true.
- Example:
object SP_repartition {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    conf.set("spark.default.parallelism", "4")
    val sc: SparkContext = new SparkContext(conf)
    val value: RDD[Int] = sc.textFile("data/1.txt").flatMap(_.split(" ")).map(line => line.toInt).filter(_ > 5).repartition(2)
    value.foreach(println)
    sc.stop()
  }
}
sortBy
def sortBy[K](
    f: (T) => K,
    ascending: Boolean = true,
    numPartitions: Int = this.partitions.length)
    (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
  this.keyBy[K](f)
    .sortByKey(ascending, numPartitions)
    .values
}
- Purpose: sorts the data. Before sorting, each element is passed through the function f, and the elements are ordered by the result of f. The default order is ascending, and the new RDD has the same number of partitions as the original RDD.
- Example
object SP_sortby {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    // conf.set("spark.default.parallelism", "4")
    val sc: SparkContext = new SparkContext(conf)
    val value: RDD[(Int, Int)] = sc.textFile("data/1.txt").flatMap(_.split(" ")).mapPartitionsWithIndex((index, iter) => {
      iter.map(x => {
        (index, x.toInt)
      })
    }).sortBy(x => x, false)
    value.foreach(x => {
      println(x._1 + "==" + x._2)
    })
    sc.stop()
  }
}
Two-Value Types (operations between two RDDs)
union
def union(other: RDD[T]): RDD[T] = withScope {
  sc.union(this, other)
}
- Purpose: returns a new RDD that is the union of the two RDDs; the two RDDs must have the same element type. No shuffle occurs; the dependency is a RangeDependency.
- Example
object SP_union {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val value1: RDD[String] = sc.textFile("data/u1.txt").map(line => line)
    val value2: RDD[String] = sc.textFile("data/u2.txt").map(line => line)
    val value: RDD[String] = value1.union(value2)
    value.foreach(println)
    sc.stop()
  }
}
subtract
def subtract(other: RDD[T]): RDD[T] = withScope {
  subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))
}
- Purpose: returns the difference of the two RDDs (elements of this RDD that are not in the other). If the two RDDs are partitioned differently, a shuffle occurs; if they are partitioned the same way, it does not.
- Example
object SP_subtract {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val value1: RDD[String] = sc.textFile("data/u1.txt").map(line => line)
    val value2: RDD[String] = sc.textFile("data/u2.txt").map(line => line)
    val value: RDD[String] = value1.subtract(value2)
    value.foreach(println)
    sc.stop()
  }
}
intersection
def intersection(other: RDD[T]): RDD[T] = withScope {
  this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
    .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
    .keys
}
- Purpose: returns the intersection of the two RDDs. If the two RDDs are partitioned differently, a shuffle occurs; if they are partitioned the same way, it does not.
- Example
object SP_intersection {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val value1: RDD[String] = sc.textFile("data/u1.txt").map(line => line)
    val value2: RDD[String] = sc.textFile("data/u2.txt").map(line => line)
    val value: RDD[String] = value1.intersection(value2)
    value.foreach(println)
    sc.stop()
  }
}
zip
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
  zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) =>
    new Iterator[(T, U)] {
      def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match {
        case (true, true) => true
        case (false, false) => false
        case _ => throw SparkCoreErrors.canOnlyZipRDDsWithSamePartitionSizeError()
      }
      def next(): (T, U) = (thisIter.next(), otherIter.next())
    }
  }
}
- Purpose: pairs the elements of the two RDDs into key-value tuples, where elements of the first RDD become the keys and elements of the second RDD become the values. The two RDDs must have the same number of partitions and the same number of elements per partition, otherwise an exception is thrown.
- Example
object SP_zip {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val value1: RDD[String] = sc.textFile("data/u1.txt").map(line => line).repartition(2)
    val value2: RDD[String] = sc.textFile("data/u2.txt").map(line => line).repartition(2)
    val value: RDD[(String, String)] = value1.zip(value2)
    value.foreach(println)
    sc.stop()
  }
}
Key-Value Types
partitionBy
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
  if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
    throw SparkCoreErrors.hashPartitionerCannotPartitionArrayKeyError()
  }
  if (self.partitioner == Some(partitioner)) {
    self
  } else {
    new ShuffledRDD[K, V, V](self, partitioner)
  }
}
- Purpose: repartitions the RDD by key according to the given Partitioner, which can be a custom partitioner.
- Example
object SP_partitionBy {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val value1: RDD[(String, String)] = sc.textFile("data/u1.txt").map(line => (line, line))
    val value: RDD[(String, String)] = value1.partitionBy(new HashPartitioner(2))
    println(value.partitions.size)
    println(value1.partitions.size)
    sc.stop()
  }
}
reduceByKey
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
- Purpose: aggregates values by key. This operator is quite efficient because it pre-aggregates within each partition before the shuffle, which reduces the amount of shuffled data.
- Example
object SP_reduceByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val value1: RDD[(String, Int)] = sc.textFile("data/u1.txt").map(line => (line, line.toInt))
    val value: RDD[(String, Int)] = value1.reduceByKey(_ + _)
    value.foreach(f => {
      println(f._1 + "=" + f._2)
    })
    sc.stop()
  }
}
groupByKey
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
  groupByKey(defaultPartitioner(self))
}
- Purpose: groups the values of each key into a sequence. There is no pre-aggregation before the shuffle, so it is less efficient than reduceByKey; see the sketch after the example below.
- Example
object SP_groupByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val value1: RDD[(String, Int)] = sc.textFile("data/u1.txt").map(line => (line, line.toInt))
    val key: RDD[(String, Iterable[Int])] = value1.groupByKey()
    val value: RDD[(String, Int)] = key.map(f => (f._1, f._2.sum))
    value.foreach(f => {
      println(f._1 + "=" + f._2)
    })
    sc.stop()
  }
}
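As noted above, the same per-key sum can be written with reduceByKey, which combines values map-side before the shuffle instead of shipping every (key, value) pair. A minimal sketch under the same setup as the example above; the object name is hypothetical.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SP_groupByKeyVsReduceByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val value1: RDD[(String, Int)] = sc.textFile("data/u1.txt").map(line => (line, line.toInt))
    // groupByKey ships all values for a key across the network, then sums them.
    val viaGroupByKey: RDD[(String, Int)] = value1.groupByKey().mapValues(_.sum)
    // reduceByKey sums within each partition first, shuffling only partial sums.
    val viaReduceByKey: RDD[(String, Int)] = value1.reduceByKey(_ + _)
    viaGroupByKey.collect().foreach(println)
    viaReduceByKey.collect().foreach(println)
    sc.stop()
  }
}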
aggregateByKey
def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(
    seqOp: (U, V) => U,
    combOp: (U, U) => U): RDD[(K, U)]
- Parameters: zeroValue is the initial value; seqOp is the function applied within a single partition; combOp is the function applied across partitions.
- Example:
object SP_aggregateByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
    def seqOp(a: Int, b: Int): Int = {
      math.max(a, b)
    }
    def combOp(a: Int, b: Int): Int = {
      a + b
    }
    // Give each key an initial value of 0, take the max per key within each partition,
    // then add up the per-partition maxima for the same key across partitions.
    // val value: RDD[(String, Int)] = rdd.aggregateByKey(0)(math.max(_, _), _ + _)
    val value: RDD[(String, Int)] = rdd.aggregateByKey(0)(seqOp, combOp)
    value.foreach(f => {
      println(f._1 + "=" + f._2)
    })
    sc.stop()
  }
}
combineByKey
def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)]
- Purpose: transforms the value type, then combines within and across partitions.
  - createCombiner: V => C: initializes the first value seen for a key, converting it into the combined type C.
  - mergeValue: (C, V) => C: the within-partition merge function; C is the type produced by createCombiner, V is the original value type (the format before createCombiner converts it).
  - mergeCombiners: (C, C) => C: the across-partition merge function.
- Example
object SP_combineByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
    def seqOp(a: (Int, Int), b: Int): (Int, Int) = {
      (a._1 + b, a._2 + 1)
    }
    def combOp(a: (Int, Int), b: (Int, Int)): (Int, Int) = {
      (a._1 + b._1, a._2 + b._2)
    }
    // Compute the average value for each key by accumulating (sum, count) per key.
    val value: RDD[(String, (Int, Int))] = rdd.combineByKey(
      (_, 1),
      seqOp,
      combOp
    )
    value.map { case (key, value) => (key, value._1 / value._2.toDouble) }.collect().foreach(println)
    sc.stop()
  }
}
foldByKey
def foldByKey(
    zeroValue: V,
    partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  // Serialize the zero value to a byte array so that we can get a new clone of it on each key
  val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue)
  val zeroArray = new Array[Byte](zeroBuffer.limit)
  zeroBuffer.get(zeroArray)
  // When deserializing, use a lazy val to create just one instance of the serializer per task
  lazy val cachedSerializer = SparkEnv.get.serializer.newInstance()
  val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))
  val cleanedFunc = self.context.clean(func)
  combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
    cleanedFunc, cleanedFunc, partitioner)
}
- Purpose: aggregates by key starting from an initial value, combining within and across partitions. It is much like aggregateByKey, except that foldByKey uses the same function within and across partitions, while aggregateByKey allows them to differ; the underlying implementation is the same. See the equivalence sketch after the example below.
- Example
object SP_foldByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
    def combOp(a: Int, b: Int): Int = {
      a + b
    }
    val value: RDD[(String, Int)] = rdd.foldByKey(0)(combOp)
    value.foreach(f => {
      println(f._1 + "=" + f._2)
    })
    sc.stop()
  }
}
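A minimal sketch of the equivalence described above, using the same input data as the example: the foldByKey sum and an aggregateByKey call whose seqOp and combOp are the same function produce the same result. The object name is hypothetical.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SP_foldVsAggregate {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
    // foldByKey: one function used both within and across partitions.
    val viaFold: RDD[(String, Int)] = rdd.foldByKey(0)(_ + _)
    // aggregateByKey: the same computation, spelled with separate seqOp and combOp.
    val viaAggregate: RDD[(String, Int)] = rdd.aggregateByKey(0)(_ + _, _ + _)
    viaFold.collect().foreach(println)
    viaAggregate.collect().foreach(println)
    sc.stop()
  }
}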
sortByKey
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)] = self.withScope {
  val part = new RangePartitioner(numPartitions, self, ascending)
  new ShuffledRDD[K, V, V](self, part)
    .setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
- Purpose: sorts by key. The default is ascending order; pass false for descending order.
- Example:
object SP_sortByKey {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
    val value: RDD[(String, Int)] = rdd.sortByKey()
    value.foreach(f => {
      println(f._1 + "=" + f._2)
    })
    sc.stop()
  }
}
mapValues
def mapValues[U](f: V => U): RDD[(K, U)] = self.withScope {
  val cleanF = self.context.clean(f)
  new MapPartitionsRDD[(K, U), (K, V)](
    self,
    (context, pid, iter) => iter.map { case (k, v) => (k, cleanF(v)) },
    preservesPartitioning = true)
}
- Purpose: applies a function to the values only, leaving the keys unchanged.
- Example:
object SP_mapValues {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
    val value: RDD[(String, Int)] = rdd.mapValues(_ * 2)
    value.foreach(f => {
      println(f._1 + "=" + f._2)
    })
    sc.stop()
  }
}
join, leftOuterJoin, rightOuterJoin, fullOuterJoin
- Purpose: joins two RDDs by key, just like the corresponding SQL joins: join keeps only the keys present in both RDDs (an inner join); leftOuterJoin is a left join; rightOuterJoin is a right join; fullOuterJoin is a full outer join. All of them are implemented on top of cogroup. A sketch of the outer variants follows the join example below.
- Example
object SP_join {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("f", 8)), 2)
    val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("d", 2), ("c", 4), ("b", 3), ("e", 6), ("c", 8)), 2)
    val value: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
    value.foreach(f => {
      println(f._1 + "=" + f._2)
    })
    sc.stop()
  }
}
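A minimal sketch of the outer variants, with smaller hypothetical data so the None cases are easy to see: keys missing from one side show up wrapped in Option. The object name is hypothetical.
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object SP_outerJoins {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("c", 4), ("f", 8)), 2)
    val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("c", 8), ("e", 6)), 2)
    // Keys only in the left RDD get None on the right side.
    val left: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd2)
    // Keys from either RDD are kept; missing sides become None.
    val full: RDD[(String, (Option[Int], Option[Int]))] = rdd1.fullOuterJoin(rdd2)
    left.collect().foreach(println)
    full.collect().foreach(println)
    sc.stop()
  }
}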
cogroup
def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
    other2: RDD[(K, W2)],
    other3: RDD[(K, W3)],
    partitioner: Partitioner): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]
- Purpose: behaves like a full outer join, producing for each key the sequence of values from each RDD; it can group this RDD with up to three other RDDs.
- Example
object SP_cogroup {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WC")
    val sc: SparkContext = new SparkContext(conf)
    val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("f", 8)), 2)
    val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 3), ("d", 2), ("c", 4), ("b", 3), ("e", 6), ("c", 8)), 2)
    val value: RDD[(String, (Iterable[Int], Iterable[Int], Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2, rdd2, rdd2)
    value.foreach(f => {
      println(f._1 + "=" + f._2)
    })
    sc.stop()
  }
}