今天虽然有一些感冒,但是现在感觉实力爆棚,一股想要学习的冲动
小谈:
昨天晚上睡觉的时候,当时的阅读量有19300多吧,然后写了一篇比较摆烂的博客,当时说如果阅读量破两万,就一定好好写博客,谁知道半夜醒来的时候就已经19900,早上醒来就已经突破两万了,这不明显系统在安排我,让我好好写博客,今天就努一努,不写好一篇博客不睡觉。
稍微有一些感冒,早上力不从心,头晕然后就没有学习钻被窝里面睡了会觉。和我的那个她聊了聊天,玩了一会跳棋。
TopN案例
**时间戳 省份 城市 用户 广告**
1516609143867 6 7 64 16
1516609143869 9 4 75 18
1516609143869 1 7 87 12
1516609143869 2 8 92 9
1516609143869 6 7 84 24
1516609143869 1 8 95 5
1516609143869 8 1 90 29
求每个省份,排名前三的广告以及点击量。
先来看一下图解
总之先取出相应的字段,对这些字段进行格式转换。之后使用各个算子进行操作
下面来看代码吧,根据代码一一讲解
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount")
val sparkContext = new SparkContext(wordCount)
//时间戳 省份 城市 用户 广告
val value = sparkContext.textFile("date/agent.log")
//首先取出来省份和广告 //map转换 (省份-广告,1)
val value1 = value.map( line => {
val strings = line.split(" ")
(strings(1) + " - " +strings(4),1 ) } )
//根据key进行累加,最后结果就是(省份-广告,总数)
val value2 = value1.reduceByKey(_ + _)
//(省份-广告 ,总数) =>(省份,(广告,总数))
val value3 = value2.map {
case (province, clickCount) => {
val strings = province.split("-")
(strings(0), (strings(1), clickCount)) } }
//对省份进行分组,分组后的数据格式
//(省份,((广告,总数),(广告,总数),(广告,总数)))
val value4 = value3.groupByKey()
//对后面的广告根据总数进行排序,取降序
//mapValues 对Value进行操作 将Value根据SortWith自定义排序
value4.mapValues( itr => {
itr.toList.sortWith(
(left,right)=>{ left._2 > right._2 } ).take(3) } )
.collect().foreach(println(_))
求TopN,关键就是在于要将格式进行转化,其实算子的操作都很简单,重要的点是能否转换成自己想要的格式,如果没有思路,可以先在纸上写一些思路,有了思路就可以很好的进行编码了。Reduce
聚合RDD中的所有元素,先聚合分区内的元素,再聚合分区间的元素
** val value = sparkContext.makeRDD(List(1, 2, 3, 4),2)**
两个分区,分区0: 1 2 分区1:3 4
**聚合的时候,先将分区内的元素聚合**
分区0: 1 +2 =3 分区1: 3 + 4 = 7
来看一下代码
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount")
val sparkContext = new SparkContext(wordCount)
val value = sparkContext.makeRDD(List(1, 2, 3, 4))
val i = value.reduce(_ + _) println(i)
可以看到的是,在转换算子的时候,都会有RDD显示出来,但是动作算子并没有显示RDD,
可以看到,**转换算子并不存储数据,在Spark初了解博客里面就说过,转换算子并不存储数据,只是存储操作,当数据到节点之后,数据就会根据记录的操作对数据进行转换。**
行动算子之后,就可以从节点将数据返回到Driver端。
Collect
在驱动程序中,以数组Array形式返回数据集的所有元素。
来看一下Collect动作算子
def collect(): Array[T] = withScope {
val results = sc.runJob(this, (iter: Iterator[T]) =>
iter.toArray) Array.concat(results: _*) }
可以看到 iter.toArray。将元素转换成数组的形式
foreach
遍历RDD中的每一个元素,并依次调用里面的函数
举一个例子
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount")
val sparkContext = new SparkContext(wordCount)
val value = sparkContext.makeRDD(List(1, 2, 3, 4))
val i = value.foreach(println(_)) println(i)
上面这个例子中,就会遍历RDD中的每一个元素,因为设置的f函数就是打印操作
Count
**返回RDD中元素个数**
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount")
val sparkContext = new SparkContext(wordCount)
val value = sparkContext.makeRDD(List(1, 2, 3, 4))
val i = value.count() println(i)
看一下图解
first
** 返回RDD中的第一个元素**
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount")
val sparkContext = new SparkContext(wordCount)
val value = sparkContext.makeRDD(List(1, 2, 3, 4))
val i = value.first() println(i)
返回第一个元素,就是1
take
**返回一个由RDD的前n个元素组成的数组**
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount")
val sparkContext = new SparkContext(wordCount)
val value = sparkContext.makeRDD(List(1, 2, 3, 4))
val i = value.take(2) println(i)
返回的是数组类型,单独打印的话,打印的就是数组的地址。
takeOrdered
**返回该RDD排序后的前n个元素组成的数组**
val wordCount = new SparkConf().setMaster("local").setAppName("WordCount")
val sparkContext = new SparkContext(wordCount)
val value = sparkContext.makeRDD(List(1, 2, 3, 4))
val i = value.takeOrdered(2) println(i)
怎么知道返回的是排序后的数组呢,看一下源码
.toArray.sorted(ord),像转换成数组,然后排序。排完序后取前n个值
aggregate
** 将每个分区内的元素通过分区内逻辑和初始值进行聚合,然后用分区间逻辑和初始值进行操作。**
先看图解,根据图解来讲
4个数据分了8个区
val value = sparkContext.makeRDD(List(1, 2, 3, 4),8)
val i1 = value.aggregate(10)(_+ _, _ + _)
**在分区内:对每一个值都和设定的初始值10,进行相加。**
** 分区间:在分区间计算的时候,将每个分区间的值进行相加,之后再与初始值进行相加。**
countByKey
统计每种key的个数
看一下图解
返回的是Map[K,Long]类型
key key的总数
总结:
更**新到这里就不写了,要去歇息一会。**
** 如果我是一个打工仔,那么明天就要上班拉,虽然现在还不是打工仔,已经快是了。**
** 这两天更新的内容,干货不是特别多,到明天会更新分区器的相关内容。**
** 明天就要开始卷啦,颤抖吧,学生仔!!!**
版权归原作者 数仓白菜白 所有, 如有侵权,请联系我们删除。