目录
实训题目:竞赛网站访问日志分析
一. 训练要点
(1)搭建Spurk工程环境。
(2) Spark编程。
(3)通过spark-submit提交应用。
二.需求说明
某竞赛网站每年都会开展数据挖据的竞赛,在竞赛期间网站会有大量人群访问,生成了大量的用户访向记录。现在提供2016年10月到2017年6月的部分脱敏访问日志数据。日志数据的基本内容如图所示,仅提供以下6个字段。
属性名称
属性解析
Id
序号
Content_id
网页ID
Page_path
网址
Userid
用户ID
Sessionid
缓存生成ID
Date_time
访问时间
要求根据提供的用户访问日志数据,利用Spark技术统计访向的用户数、被访问的不同网页个数以及每月的访问量,并将结果保存到HDFS上。
文章所用文档以及目录等等说明:
(点击可免费下载)访问日志数据: jc_content_viewlog.txt
IDEA内实现代码存储路径与名字:LogCount.scala
jc_content_viewlog.txt 内部分数据如下图:
三.关键实现思路及步骤
(1)配置好Spark的IntelliJ IDEA开发环境。
(2)启动IntelliJ IDEA,并进行Spark编程。
(3)对访向记录中的网页去重,统计本周期内被访问网页的个数。
val logs_all: RDD[Array[String]] = sc.textFile(args(0)).map{_.split(",")}
val wy_log: RDD[String] = logs_all.map(x=>(x(1).toString)).distinct()
val wy_count:RDD[(String, Int)]= wy_log.map(("wy_zs",_)).groupByKey().map(x => (x._1,x._2.size))
(4) userid为用户注册登录的标识,对userid去重,统计登录用户的数量。
val user_log: RDD[String] = logs_all.map(x=>(x(3).toString)).distinct()
val user_count:RDD[(String, Int)]= user_log.map(("user_zs",_)).groupByKey().map(x => (x._1,x._2.size))
(5)按月统计访问记录数。
val logs_all_new = logs_all.map{x=>(x(0),x(1),x(2),x(3),x(4),x(5),date_time(x(5)))}
val ny_count: RDD[(String, Int)] = logs_all_new.map(x=>(x._7,1)).reduceByKey((a, b)=>a+b)
(6)将结果保存到不同文件中。
wy_count.repartition(1).saveAsTextFile(args(1))
user_count.repartition(1).saveAsTextFile(args(2))
ny_count.repartition(1).saveAsTextFile(args(3))
(7)打包Spark工程,在集群提交应用程序。
[root@node1 ~]# /myserver/spark301/bin/spark-submit --master yarn --deploy-mode client --class net.LogCount /root/jc.jar hdfs://node1:8020/user/root/jc_content_viewlog.txt hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3
[root@node1 ~]# /myserver/spark301/bin/spark-submit --master yarn --deploy-mode client --class net.LogCount /root/jc.jar hdfs://node1:8020/user/root/jc_content_viewlog.txt hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3
注:jc.jar是上面文件生成的jar包改名并上传而来;
hdfs://node1:8020/user/root/jc_content_viewlog.txt 是hdfs里面jc_content_viewlog.txt存储路径,也需要自己上传,目录自己决定;
hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3 是设置它的输出存储路径,因为会输出三个不同数据,需要三个目录,不然会报错。
四、LogCount.scala文件完整代码实现:
package net
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
object LogCount {
def main(args: Array[String]): Unit = {
if(args.length < 2){
println("请指定input和output")
System.exit(1)//非0表示非正常退出程序
}
//TODO 1.env/准备sc/SparkContext/Spark上下文执行环境
val conf: SparkConf = new SparkConf().setAppName("wc")
val sc: SparkContext = new SparkContext(conf)
sc.setLogLevel("WARN")
//TODO 2.source/读取数据
//RDD:A Resilient Distributed Dataset (RDD):弹性分布式数据集,简单理解为分布式集合!使用起来和普通集合一样简单!
//RDD[就是一行行的数据]
val logs_all: RDD[Array[String]] = sc.textFile(args(0)).map{_.split(",")}
//TODO 3.transformation/数据操作/转换
//对访问记录中的网页去重,统计本周期内被访问网页的个数
val wy_log: RDD[String] = logs_all.map(x=>(x(1).toString)).distinct()
val wy_count:RDD[(String, Int)]= wy_log.map(("wy_zs",_)).groupByKey().map(x => (x._1,x._2.size))
//userid为用户注册登录的标识,对userid去重,统计登录用户的数量
val user_log: RDD[String] = logs_all.map(x=>(x(3).toString)).distinct()
val user_count:RDD[(String, Int)]= user_log.map(("user_zs",_)).groupByKey().map(x => (x._1,x._2.size))
//按月统计访问记录数
val logs_all_new = logs_all.map{x=>(x(0),x(1),x(2),x(3),x(4),x(5),date_time(x(5)))}
val ny_count: RDD[(String, Int)] = logs_all_new.map(x=>(x._7,1)).reduceByKey((a, b)=>a+b)
//TODO 4.sink/输出
//输出到指定path(可以是文件/夹)
wy_count.repartition(1).saveAsTextFile(args(1))
user_count.repartition(1).saveAsTextFile(args(2))
ny_count.repartition(1).saveAsTextFile(args(3))
//为了便于查看Web-UI可以让程序睡一会
Thread.sleep(1000 * 60)
//TODO 5.关闭资源
sc.stop()
}
//获取年月,时间段作为输入参数
def date_time(date:String):String={
val nianye =date.trim.substring(0,7)
nianye
}
}
五、运行过程与结果截图:
六、具体实现步骤
1、修改打包好的jar名字,并把jar上传到node1结点
2、开启一系列集群:
start-dfs.sh //一键开启
start-yarn.sh //开启
cd /myserver/
mr-jobhistory-daemon.sh start historyserver
/myserver/spark301/sbin/start-history-server.sh
jps //查看
这里不再具体说明如何开启。
3、上传jc_content_viewlog.txt到node1节点,并上传到hdfs
[root@node1 ~]# hdfs dfs -put jc_content_viewlog.txt /user/root/
4、在集群提交应用程序
[root@node1 ~]# /myserver/spark301/bin/spark-submit --master yarn --deploy-mode client --class net.LogCount /root/jc.jar hdfs://node1:8020/user/root/jc_content_viewlog.txt hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3
七、相关知识点
进入spark-shell
[root@node1 bin]# /myserver/spark301/bin/spark-shell
1、过滤出访问次数在 50 次以上的用户记录
(1)统计用户访问次数并筛选出访问次数在50次以上的用户ID
scala> val data = sc.textFile("hdfs://node1:8020/user/root/jc_content_viewlog.txt").map{x=> x.split(",")}
data: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:24
scala> val userid=data.map(line=>(line(3),1)).reduceByKey((a,b)=>a+b).filter(x=>x._2>50).keys .collect
(2)根据过滤后的用户ID,在原数据中筛选出这一部分用户的访问记录
scala> val valib_data=data.filter(x=>userid.contains(x(3)))
valib_data: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[7] at filter at <console>:27
scala> valib_data.take(2) //查看
res1: Array[Array[String]] = Array(Array(480343, 611, /jingsa/611.jhtml, 1, 69463B3F2728DBEB045A5C31CA9C2E3A, 2017-03-01 09:56:49), Array(480358, 611, /jingsa/611.jhtml, 1, 69463B3F2728DBEB045A5C31CA9C2E3A, 2017-03-01 09:58:50))
2、统计访问 50 次以上的用户主要访问的前 5 类网页
scala> val web = valib_data.map(x=>(x(2),1)).reduceByKey((a,b)=>a+b).sortBy(x=>x._2,false)
web: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[12] at sortBy at <console>:25
scala> web.take(5)
res2: Array[(String, Int)] = Array((/jingsa/1030.jhtml,67899), (/view/contentViewLog.jspx,5008), (/jingsa/712.jhtml,2551), (/youxiuzuopin/823.jhtml,1212), (/jingsa/613.jhtml,968))
3. 合并部分网页
(URL 后面带有_1、_2 字样的翻页网址,统一为一个网址)通过字符串截取的方法,对网页网址字符串进行截取,只截取“_”前面的字符串
scala> val data2=data.filter(_.length>=6).map{
x=> var page=""; if(x(2).contains("_")) { page=x(2).substring(0,x(2).lastIndexOf("_")) } else { page=x(2) }; (x(0),x(1),page,x(3),x(4),x(5)) }
data2: org.apache.spark.rdd.RDD[(String, String, String, String, String, String)] = MapPartitionsRDD[14] at map at <console>:25
4.根据访问时间加入对应时段:
6:3011:30 为上午,11:3014:00 为中午,14:0017:30为下午,17:3019:00 为傍晚,19:0023:00 为晚上,23:006:30 为深夜,统计所有用户各时段访问情况
(1)首先定义一个函数,用于匹配时间段并返回相应的字段值
scala> def date_time(date:String):String={
val hour=date.substring(date.indexOf(" ")+1,date.indexOf(":")).toInt
val min=date.substring(date.indexOf(":")+1,date.lastIndexOf(":")).toInt
if(hour<6 && hour>=23) "深夜"
else if(hour==6 && min<=30) "深夜"
else if(hour<11 && hour>=6) "上午"
else if(hour==11 && min<=30) "上午"
else if(hour<14 && hour>=11) "中午"
else if(hour>=14 && hour<17) "下午"
else if(hour==17 && hour<=30) "下午"
else if(hour>=17 && hour<19) "傍晚"
else if(hour==19 && min<=30) "傍晚"
else "晚上"
}
date_time: (date: String)String
(2)通过map方法对每一条记录的时间进行匹配,增加一个时间段的值到记录中
scala> val data_new = data2.map{x=>(x._1,x._2,x._3,x._4,x._5,x._6,date_time(x._6))}
data_new: org.apache.spark.rdd.RDD[(String, String, String, String, String, String, String)] = MapPartitionsRDD[17] at map at <console>:27
(3)将时段值作为键,值为1,利用reduceByKey的方法统计各时段访问情况
scala> val date_count = data_new.map(x=>(x._7,1)).reduceByKey((a,b)=>a+b)
date_count: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[19] at reduceByKey at <console>:25
scala> date_count.take(10)
res3: Array[(String, Int)] = Array((上午,31675), (傍晚,14511), (中午,18799), (下午,39720), (深夜,81), (晚上,67073))
版权归原作者 ⚆Pearl 所有, 如有侵权,请联系我们删除。