0


Spark常见算子详解

文章目录

二、Spark算子分类

1、 Transform(转换)算子
2、 Action(行动) 算子

Transform算子

定义:RDD经过Transform算子之后,还是一个RDD
特征:懒加载,如果没有行动算子,那么不会生效

Action算子

定义:

注:以下算子代码均为Scala

三、常用的Transform算子,使用方法及经验

1.map算子

功能:将RDD中的数据一条条按照匿名函数map()中的逻辑进行处理,返回一个新的RDD
例子:

//需求:1234=>2468val rdd: RDD[Int]= sc.makeRDD(List(1,2,3,4))//转换函数 ,把旧的rdd转换成新的mapRDD,即为转换函数val mapRDD = rdd.map(_*2)

      mapRDD.collect().foreach(println)//collect算子触发
      sc.stop()

2.flatmap算子

定义:就是scala中的扁平化处理,将每一行的数据切割成一个个元素
例子:

val rdd: RDD[String]= sc.makeRDD(List("hello spark","hello scala"))val flatRDD: RDD[String]= rdd.flatMap(s => s.split(" "))

    flatRDD.collect().foreach(println)
    sc.stop()

通常搭配split及分隔符使用

3.mappartitions算子

定义:以分区为一个迭代器,对每个迭代器处理数据
例子:

val rdd: RDD[Int]= sc.makeRDD(List(1,2,3,4),2)//mapPartitions :可以以分区为单位进行数据转换操作,存在缓冲区一个分区数据再处理//               但是会将整个分区的数据加载到内存中引用//               如果处理完数据不会被释放掉,分区内数据量大,内存小的时候,会存在内存溢出val mpRDD: RDD[Int]= rdd.mapPartitions(
      iter =>{
        println(">>>>>>>>>>>")
        iter.map(_ *2)})
    mpRDD.collect().foreach(println)

需要以分区为单位处理数据的时候,效率比map算子高

例子:

val rdd: RDD[Int]= sc.makeRDD(List(1,2,3,4),2)//需求:计算分区内的最大值val mpRDD: RDD[Int]= rdd.mapPartitions(
      iter =>{
        List(iter.max).iterator   //单值转换成迭代器类型})
    mpRDD.collect().foreach(println)
    sc.stop()//结果:2//     4

4.mapPartitionsWithIndex算子

定义:对RDD某个分区进行操作,匿名函数内是(index, iter) 索引加上每个分区对应的迭代器数据进行操作

例子1:

val rdd: RDD[Int]= sc.makeRDD(List(1,2,3,4),2)//需求,只保留 需要1分区的数据val mipRDD: RDD[Int]= rdd.mapPartitionsWithIndex((index, iter)=>{if(index ==1){
          iter
        }else{
          Nil.iterator          //Nil 代表空的集合}})
    mipRDD.collect().foreach(println)

例子2:

val rdd: RDD[Int]= sc.makeRDD(List(1,2,3,4),2)//需求,获取集合数据的所在分区val mpiRDD: RDD[(Int,Int)]= rdd.mapPartitionsWithIndex((index, iter)=>{
        iter.map(
          num =>{(index, num)})})//结果://(0,1)//(0,2)//(1,3)//(1,4)

5.mapValues算子

6.glom 算子

定义:返回的是分区数对应的数组RDD
例子:

val rdd: RDD[Int]= sc.makeRDD(List(1,2,3,4),2)val glomRDD: RDD[Array[Int]]= rdd.glom()

    glomRDD.collect().foreach(data => println(data.mkString(",")))//glom 返回的是分区数对应数量的数组,两个数组类型的RDD,所以要用匿名函数遍历,两个数组

    sc.stop()//结果://1,2//3,4

常用于分区间的计算

例子:

val rdd: RDD[Int]= sc.makeRDD(List(1,2,3,4),2)//需求:求分区内最大值,分区间的和val glomRDD: RDD[Array[Int]]= rdd.glom()val maxRDD: RDD[Int]= glomRDD.map(
      array => array.max
    )

   println(maxRDD.collect().sum)// 结果 6

7.groupBy 算子

定义:groupBy算子,rdd.groupBy(匿名函数内定义分区的Key)
例子:

val rdd: RDD[String]= sc.makeRDD(List("Hello","Hadoop","Scala","Spark"),2)//按照字符串首字母进行分组,分组的传递的是函数val firstRDD: RDD[(Char, Iterable[String])]= rdd.groupBy((s:String)=>{
      s.charAt(0)})

    firstRDD.collect().foreach(println)

    sc.stop()//结果//(H,CompactBuffer(Hello, Hadoop))//(S,CompactBuffer(Scala, Spark))

7.filter 算子

定义:rdd.filter(匿名函数中定义筛选条件,符合条件为true,保留数据)
例子:

//RDD算子 filter(可能造成数据倾斜,RDD算子是针对每个分区进行计算的// 有的分区筛选后,会造成数据不均衡的情况)val sc =new SparkContext(sparkConf)val rdd = sc.makeRDD(List(1,2,3,4))val filterRDD = rdd.filter(num =>{num%2!=0})

    filterRDD.collect().foreach(println)

项目实践:在用户行为项目中,根据入参,筛选出session关联的目标用户信息,某个年龄段,性别,所在城市,关键字等等

8.sample 算子

定义:随机抽取数据的算子,一共有三个参数

val rdd = sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10))//第一个参数:抽取是否放回//第二个参数:数据源中每个数据被抽取的概率,可以理解基准值,60分及格,那每个数据超过0.4那么就被抽取,没有超过0.4则不会被抽取,当种子数为1的时候,每次给某一个数据的随机值就是固定的,是否超0.4(固定)//第三个参数:随机抽取算法的种子//如果概率固定,种子固定,则每次抽取的结果是一样的

    println(rdd.sample(false,0.4,1//默认参数为系统时间,不固定的话,每个数据的随机数都是不同的).collect().mkString(","))//第二种场景,是抽取放回,那么第二个参数表示的时每个数据被抽取可能的次数// 用途:数据倾斜判断哪个枚举值倾斜/*println(rdd.sample(
      true,
      2
    ).collect().mkString(","))*/

9. Coalesce and Repartition 算子

Coalesce:缩减RDD分区数量
用法:rdd.coalesce(2,default=true) 缩减分区默认是不会shuffle(打乱原有数据在分区中的顺序),所以也容易形成数据倾斜

val rdd = sc.makeRDD(List(1,2,3,4,5,6),3)//缩减分区:coalesce默认不会将分区内已排序的数据,[1,2][3,4][5,6]打乱重新排序val newRDD: RDD[Int]= rdd.coalesce(2)//所以在没有shuffle的情况下:[1,2][3,4,5,6] 可能会造成数据倾斜,如果想均匀分,就shuffle

Repartition:增加RDD分区数量,底层就是coalesce(n,true)

val rdd = sc.makeRDD(List(1,2,3,4,5,6),3)val newRDD2: RDD[Int]= rdd.coalesce(2,true)

源码:

def repartition(numPartitions:Int)(implicit ord: Ordering[T]=null): RDD[T]= withScope {
    coalesce(numPartitions, shuffle =true)}

10. Distinct 算子

定义:对RDD里面的数据进行去重,scala是用set集合,底层是hashset去重
例子:

val rdd = sc.makeRDD(List(1,2,3,4,1,2,3,4))//比对scala去重和RDD去重的源码是否一致//scala去重,底层是hashset集合,set本来就是过滤重复的集合//List(1,2,4,5).distinctval disRDD: RDD[Int]= rdd.distinct()

源码:

case _ => map(x =>(x,null)).reduceByKey((x, _)=> x, numPartitions).map(_._1)

源码解析:
第一步:map (x => (x,null))
(1,null)(2,null),(3,null),(4,null),(1,null)
第二步:reduceByKey((x,) => x, numPartitions)
reduceByKey(x,
) => x 的意思是,(null)(null) => null 我只看迭代后第一个值
(1,null)(1,null) ,Key相同,value聚合 (x,) = (null,null) =null 两两聚合只取第一个值
最终结果:(1,null)
第三部
map(
._1)=map(x =>{x._1})每个集合我只取第一个值,就是1

12. sortBy 算子

定义:根据指定的匿名函数规则排序,第二个参数决定是asc还是desc,默认是asc,不会改变分区数量,但是会有shuffle,重新打乱排序
例子:

val rdd = sc.makeRDD(List(("1",1),("11",3),("2",2)),2)//sortBy 方法可以根据指定的规则进行排序,默认是asc,第二个参数决定asc还是desc//sortBy不会修改分区数,但是会有shuffle过程,就是重新打乱再排序val sortRDD: RDD[(String,Int)]= rdd.sortBy(t =>{t._2})

    sortRDD.collect().foreach(println)//结果//(1,1)//(2,2)//(11,3)

13. 双value类型算子

双value类型一般有:交集:rdd1.intersection(rdd2)
并集:rdd1.union(rdd2)
差集:rdd1.subtract(rdd2)
拉链:rdd1.zip(rdd2)
例子:

val rdd1 = sc.makeRDD(List(1,2,3,4))val rdd2 = sc.makeRDD(List(3,4,5,6))//交集:[3,4] 双value类型//交集,并集和差集要求两个RDD类型一致//拉链操作的两个数据源类型可以不一致,数据元素数量必须一致val rdd3 = rdd1.intersection(rdd2)
    println(rdd3.collect().mkString(","))//并集:[3,4]val rdd4 = rdd1.union(rdd2)
    println(rdd4.collect().mkString(","))//差集:[3,4]val rdd5 = rdd1.subtract(rdd2)
    println(rdd5.collect().mkString(","))//拉链:[3,4]val rdd6 = rdd1.zip(rdd2)
    println(rdd6.collect().mkString(","))}

14. <key,value>类型算子partitionBy

定义:按照key来进行重分区
例子:

val rdd = sc.makeRDD(List(1,2,3,4),2)//目前rdd不是k-V类型,所以无法使用paritionByval mapRDD: RDD[(Int,Int)]= rdd.map((_,1))//这个partitionBy,不属于RDD.scala,而属于 PairRDDFunctions.scala//隐式转换(理解为二次编译,把RDD=>PairRDDFunctions)//implicit def rddToPairRDDFunctions//Hash分区逻辑:
    mapRDD.partitionBy(new HashPartitioner(2)).saveAsTextFile("output")

源码:

def numPartitions:Int= partitions

    def getPartition(key:Any):Int= key match{casenull=>0//如果key是Null则放在0号分区case _ => Utils.nonNegativeMod(key.hashCode, numPartitions) 否则用key的hashcode,取模分区数
    }

15. <key,value>类型算子aggregateByKey

定义:rdd.aggregateByKey((初始化的值)(分区内迭代计算的逻辑)(分区间计算的逻辑))
例子:

val rdd = sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),2)//需求:分区内累加value,以及出现的次数(num,cnt)//     分区间两两相加,然后再用num/cnt 相除//加深理解:aggregateByKey最终返回的数据结果应该与初始化的类型保持一致val newRDD: RDD[(String,(Int,Int))]= rdd.aggregateByKey((0,0))((t, v)=>{//(3,2)     (6,1)             分区内计算:(t,v)=( (0,0) ,value),实际就是初始值,与叠加运算的value值(t._1 + v, t._2 +1)},(t1, t2)=>{//分区间计算:(t1,t2)=a (num,cnt)(num1,cnt1) 其实就是不同分区对应的数值和以及出现的次数(t1._1 + t2._1, t1._2 + t2._2)//(3,2)+(6,1)  })val resultRDD: RDD[(String,Int)]= newRDD.mapValues {//只对map中的value值处理,key不动的场景,使用mapValues{} 函数case(num, cnt)=>{
        num / cnt    //  9/3 结果就是{a,3}}}

    resultRDD.collect().foreach(println)

16. <key,value>类型算子groupByKey

定义:确定是以Key为分组条件,返回相同Key的value迭代器 [a,(1,2,3)]
例子:

val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("d",4)),2)val groupbykeyRDD: RDD[(String, Iterable[Int])]= rdd.groupByKey()

    groupbykeyRDD.collect().foreach(println)//(a,1,2,3)(d,4)
 println(">>>>>>>>>>>>>>>>>>>")//和group by 的区别val groupbyRDD: RDD[(String, Iterable[(String,Int)])]= rdd.groupBy(_._1)
    groupbyRDD.collect().foreach(println)//结果//(d,CompactBuffer(4))//(a,CompactBuffer(1, 2, 3))//>>>>>>>>>>>>>>>>>>>//(d,CompactBuffer((d,4)))//(a,CompactBuffer((a,1), (a,2), (a,3)))

注:
1.groupByKey 和groupby 的区别是前者是确定以key为分组,所以只返回value的迭代器,程序上后者不确定以什么来分组,所以返回[(String,Int) (K,V)]
2.goupByKey其实就是把相同Key的value集中在一个Excutor的Task中

17. <key,value>类型算子reduceByKey

定义:对相同Key的value进行聚合操作,匿名函数定义
例子:

val rdd = sc.makeRDD(List(("a",1),("a",2),("a",3),("d",4)),2)val reduceRDD: RDD[(String,Int)]= rdd.reduceByKey((x:Int, y:Int)=>{
      println(s"$x,$y")
      x + y
    })//结果://(d,4)//(a,6)

    reduceRDD.collect().foreach(println)

18. <key,value>类型算子combineByKey

定义:

val rdd = sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),2)//加深理解:通过把第一个值进行转换,再进行后续计算,避免初始值不累计到初始值的做法//CombineByKey//第一个参数表示对第一个数据,结构转换val newRDD: RDD[(String,(Int,Int))]= rdd.combineByKey(
      v =>(v,1),(t:(Int,Int), v)=>{//因为现在没有初始值,所以这里的tuple类型,根据转换函数v =>(v,1) 的结果来定义(t._1 + v, t._2 +1)},(t1:(Int,Int), t2:(Int,Int))=>{//因为现在没有初始值,所以这里的tuple类型,分区内计算的结果来定义(t1._1 + t2._1, t1._2 + t2._2)})val resultRDD: RDD[(String,Int)]= newRDD.mapValues {//只对map中的value值处理,key不动的场景,使用mapValues{} 函数case(num, cnt)=>{
        num / cnt
      }}

    resultRDD.collect().foreach(println)

四、常用的Action算子,行动算子及使用的方法

1.collect算子

定义:将RDD各个分区的数据collect到Driver里面形成List
rdd.collect()

源码:

def collect(): Array[T]= withScope {val results = sc.runJob(this,(iter: Iterator[T])=> iter.toArray)
    Array.concat(results: _*)}

底层有实现RunJob方法,所以是可以触发作业执行的

2.reduce算子

定义:对RDD进行聚合计算,匿名函数实现逻辑
例子:

val rdd: RDD[Int]= sc.makeRDD(List(1,2,3,4),2)//reduce算子val i:Int= rdd.reduce(_+_)
    println(i)

3.first算子

定义:取RDD中第一个元素
例子:

val rdd: RDD[Int]= sc.makeRDD(List(1,2,3,4),2)val first:Int= rdd.first()
    println(first)

4.take算子

定义:RDD中获取N个数据元素
例子:

val rdd: RDD[Int]= sc.makeRDD(List(1,2,3,4),2)//获取N个数据val ints: Array[Int]= rdd.take(3)
    println(ints.mkString(","))

5.takeordered算子

定义:对数据排序然后取出N个数据
例子:

val rdd: RDD[Int]= sc.makeRDD(List(1,2,3,4),2)//对数据排序然后取出N个数据val rdd1: RDD[Int]= sc.makeRDD(List(1,3,2,4),2)val ints2: Array[Int]= rdd1.takeOrdered(3)(Ordering[Int]reverse)//降序的用法
    println(ints2.mkString(","))

6.aggregate算子

定义:有初始值,第一个参数是初始值,第二个参数(分区内计算逻辑,分区间计算逻辑)
例子:

val rdd: RDD[Int]= sc.makeRDD(List(1,2,3,4),2)val result:Int= rdd.aggregate(10)(_+_,_+_)
//val result: Int = rdd.fold(10)(_+_)
与aggregateByKey最大的区别就是,不一定要K-V键值类型才可以使用,初始值,不光会参与分区内计算,分区间计算也会参与
分区内计算:10+1+2  10+3+4
分区间计算:10+ 13+17 =40

7.countByKey算子(K-V) 和countByValue(不一定KV,普通RDD也OK)

定义:countByKey算子统计K-v类型的数据中的Key的次数,countByValue统计所有的value出现的次数,统计对象可以是K-v,也可以是List等
例子:

/* val rdd: RDD[Int] = sc.makeRDD(List(1, 1, 1, 4), 2)

    val intToLong: collection.Map[Int, Long] = rdd.countByValue()
    //Map[Int,Long] Int代表的是value,Long代表出现的次数
    println(intToLong)

    sc.stop()*/val rdd = sc.makeRDD(List(("a",1),("a",1),("a",1)))val stringToLong = rdd.countByKey()

    println(stringToLong)//统计a出现的次数,返回的Map(value->Long) 与value值无关

实践:

Map<String,Object> countMap = time2sessionidRDD.countByKey();//<yyyy-MM-dd_HH,CNT>

为了得到每天每小时的session数量,转换RDD的格式以后,进行countByKey操作

7.save相关的三个算子

定义:saveAsTextFile 算子用得比较多,可以把结果保存到HDFS文件中
例子:

val rdd: RDD[(String,Int)]= sc.makeRDD(List(("Hello",1),("Hello",2)))//save算子
    rdd.saveAsTextFile("output")
    rdd.saveAsObjectFile("output1")//saveAsSequence只能够在map类型使用
    rdd.saveAsSequenceFile("output2")

8.foreach 和foreachPartition 算子

五、reduceByKey和groupByKey的区别

未完待续

标签: spark ajax 大数据

本文转载自: https://blog.csdn.net/weixin_42733117/article/details/139259229
版权归原作者 weixin_42733117 所有, 如有侵权,请联系我们删除。

“Spark常见算子详解”的评论:

还没有评论