0


Spark追妻系列(TopN案例和动作算子)

今天虽然有一些感冒,但是现在感觉实力爆棚,一股想要学习的冲动

小谈:

    昨天晚上睡觉的时候,当时的阅读量有19300多吧,然后写了一篇比较摆烂的博客,当时说如果阅读量破两万,就一定好好写博客,谁知道半夜醒来的时候就已经19900,早上醒来就已经突破两万了,这不明显系统在安排我,让我好好写博客,今天就努一努,不写好一篇博客不睡觉。

    稍微有一些感冒,早上力不从心,头晕然后就没有学习钻被窝里面睡了会觉。和我的那个她聊了聊天,玩了一会跳棋。

TopN案例

    **时间戳 省份 城市 用户 广告**

    1516609143867 6 7 64 16
     1516609143869 9 4 75 18
     1516609143869 1 7 87 12
     1516609143869 2 8 92 9
     1516609143869 6 7 84 24
     1516609143869 1 8 95 5
     1516609143869 8 1 90 29

    求每个省份,排名前三的广告以及点击量。

    先来看一下图解

    总之先取出相应的字段,对这些字段进行格式转换。之后使用各个算子进行操作

    下面来看代码吧,根据代码一一讲解
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") 
val sparkContext = new SparkContext(wordCount) 
//时间戳 省份 城市 用户 广告 
val value = sparkContext.textFile("date/agent.log") 
//首先取出来省份和广告 //map转换 (省份-广告,1) 
val value1 = value.map( line => {
 val strings = line.split(" ") 
(strings(1) + " - " +strings(4),1 ) } ) 
//根据key进行累加,最后结果就是(省份-广告,总数) 
val value2 = value1.reduceByKey(_ + _) 
//(省份-广告 ,总数) =>(省份,(广告,总数)) 
val value3 = value2.map { 
case (province, clickCount) => {
 val strings = province.split("-")
 (strings(0), (strings(1), clickCount)) } } 
//对省份进行分组,分组后的数据格式 
//(省份,((广告,总数),(广告,总数),(广告,总数))) 
val value4 = value3.groupByKey() 
//对后面的广告根据总数进行排序,取降序 
//mapValues 对Value进行操作 将Value根据SortWith自定义排序 
value4.mapValues( itr => {
 itr.toList.sortWith( 
(left,right)=>{ left._2 > right._2 } ).take(3) } )
.collect().foreach(println(_))
    求TopN,关键就是在于要将格式进行转化,其实算子的操作都很简单,重要的点是能否转换成自己想要的格式,如果没有思路,可以先在纸上写一些思路,有了思路就可以很好的进行编码了。Reduce

    聚合RDD中的所有元素,先聚合分区内的元素,再聚合分区间的元素

 **   val value = sparkContext.makeRDD(List(1, 2, 3, 4),2)**

    两个分区,分区0: 1 2 分区1:3 4

    **聚合的时候,先将分区内的元素聚合**

    分区0: 1 +2 =3 分区1: 3 + 4 = 7

    来看一下代码
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") 
val sparkContext = new SparkContext(wordCount) 
val value = sparkContext.makeRDD(List(1, 2, 3, 4)) 
val i = value.reduce(_ + _) println(i)
    可以看到的是,在转换算子的时候,都会有RDD显示出来,但是动作算子并没有显示RDD,

    可以看到,**转换算子并不存储数据,在Spark初了解博客里面就说过,转换算子并不存储数据,只是存储操作,当数据到节点之后,数据就会根据记录的操作对数据进行转换。**

    行动算子之后,就可以从节点将数据返回到Driver端。

Collect

    在驱动程序中,以数组Array形式返回数据集的所有元素。

    来看一下Collect动作算子

 
   def collect(): Array[T] = withScope { 
val results = sc.runJob(this, (iter: Iterator[T]) => 
iter.toArray) Array.concat(results: _*) }
    可以看到 iter.toArray。将元素转换成数组的形式

foreach

    遍历RDD中的每一个元素,并依次调用里面的函数

    举一个例子
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") 
val sparkContext = new SparkContext(wordCount) 
val value = sparkContext.makeRDD(List(1, 2, 3, 4))
 val i = value.foreach(println(_)) println(i)
    上面这个例子中,就会遍历RDD中的每一个元素,因为设置的f函数就是打印操作

Count

    **返回RDD中元素个数**
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") 
val sparkContext = new SparkContext(wordCount) 
val value = sparkContext.makeRDD(List(1, 2, 3, 4)) 
val i = value.count() println(i)

看一下图解

first

** 返回RDD中的第一个元素**

val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") 
val sparkContext = new SparkContext(wordCount) 
val value = sparkContext.makeRDD(List(1, 2, 3, 4)) 
val i = value.first() println(i)
    返回第一个元素,就是1

take

    **返回一个由RDD的前n个元素组成的数组**
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") 
val sparkContext = new SparkContext(wordCount) 
val value = sparkContext.makeRDD(List(1, 2, 3, 4)) 
val i = value.take(2) println(i)
    返回的是数组类型,单独打印的话,打印的就是数组的地址。

takeOrdered

    **返回该RDD排序后的前n个元素组成的数组**
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount") 
val sparkContext = new SparkContext(wordCount) 
val value = sparkContext.makeRDD(List(1, 2, 3, 4)) 
val i = value.takeOrdered(2) println(i)

怎么知道返回的是排序后的数组呢,看一下源码

    .toArray.sorted(ord),像转换成数组,然后排序。排完序后取前n个值

aggregate

   ** 将每个分区内的元素通过分区内逻辑和初始值进行聚合,然后用分区间逻辑和初始值进行操作。**

先看图解,根据图解来讲

    4个数据分了8个区
val value = sparkContext.makeRDD(List(1, 2, 3, 4),8) 
val i1 = value.aggregate(10)(_+ _, _ + _)
    **在分区内:对每一个值都和设定的初始值10,进行相加。**

** 分区间:在分区间计算的时候,将每个分区间的值进行相加,之后再与初始值进行相加。**

countByKey

    统计每种key的个数

    看一下图解

    返回的是Map[K,Long]类型

    key key的总数

总结:

    更**新到这里就不写了,要去歇息一会。**

** 如果我是一个打工仔,那么明天就要上班拉,虽然现在还不是打工仔,已经快是了。**

** 这两天更新的内容,干货不是特别多,到明天会更新分区器的相关内容。**

** 明天就要开始卷啦,颤抖吧,学生仔!!!**


本文转载自: https://blog.csdn.net/weixin_46300771/article/details/122800970
版权归原作者 数仓白菜白 所有, 如有侵权,请联系我们删除。

“Spark追妻系列(TopN案例和动作算子)”的评论:

还没有评论