0


【Spark实训】--竞赛网站访问日志分析

目录


实训题目:竞赛网站访问日志分析

一. 训练要点

(1)搭建Spurk工程环境。

(2) Spark编程。

(3)通过spark-submit提交应用。

二.需求说明

 某竞赛网站每年都会开展数据挖据的竞赛,在竞赛期间网站会有大量人群访问,生成了大量的用户访向记录。现在提供2016年10月到2017年6月的部分脱敏访问日志数据。日志数据的基本内容如图所示,仅提供以下6个字段。

属性名称

属性解析

Id

序号

Content_id

网页ID

Page_path

网址

Userid

用户ID

Sessionid

缓存生成ID

Date_time

访问时间

 要求根据提供的用户访问日志数据,利用Spark技术统计访向的用户数、被访问的不同网页个数以及每月的访问量,并将结果保存到HDFS上。

文章所用文档以及目录等等说明:

(点击可免费下载)访问日志数据: jc_content_viewlog.txt

IDEA内实现代码存储路径与名字:LogCount.scala

jc_content_viewlog.txt 内部分数据如下图:

三.关键实现思路及步骤

(1)配置好Spark的IntelliJ IDEA开发环境。

(2)启动IntelliJ IDEA,并进行Spark编程。

(3)对访向记录中的网页去重,统计本周期内被访问网页的个数。

val logs_all: RDD[Array[String]]  = sc.textFile(args(0)).map{_.split(",")}
val wy_log: RDD[String] = logs_all.map(x=>(x(1).toString)).distinct()
val wy_count:RDD[(String, Int)]= wy_log.map(("wy_zs",_)).groupByKey().map(x => (x._1,x._2.size))

(4) userid为用户注册登录的标识,对userid去重,统计登录用户的数量。

val user_log: RDD[String] = logs_all.map(x=>(x(3).toString)).distinct()
val user_count:RDD[(String, Int)]= user_log.map(("user_zs",_)).groupByKey().map(x => (x._1,x._2.size))

(5)按月统计访问记录数。

val logs_all_new = logs_all.map{x=>(x(0),x(1),x(2),x(3),x(4),x(5),date_time(x(5)))}
val ny_count: RDD[(String, Int)] = logs_all_new.map(x=>(x._7,1)).reduceByKey((a, b)=>a+b)

(6)将结果保存到不同文件中。

wy_count.repartition(1).saveAsTextFile(args(1))
user_count.repartition(1).saveAsTextFile(args(2))
ny_count.repartition(1).saveAsTextFile(args(3))

(7)打包Spark工程,在集群提交应用程序。

[root@node1 ~]# /myserver/spark301/bin/spark-submit --master yarn --deploy-mode client   --class   net.LogCount /root/jc.jar hdfs://node1:8020/user/root/jc_content_viewlog.txt hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3

[root@node1 ~]# /myserver/spark301/bin/spark-submit --master yarn --deploy-mode client --class net.LogCount /root/jc.jar hdfs://node1:8020/user/root/jc_content_viewlog.txt hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3

注:jc.jar是上面文件生成的jar包改名并上传而来;

hdfs://node1:8020/user/root/jc_content_viewlog.txt 是hdfs里面jc_content_viewlog.txt存储路径,也需要自己上传,目录自己决定;

hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3 是设置它的输出存储路径,因为会输出三个不同数据,需要三个目录,不然会报错。

四、LogCount.scala文件完整代码实现:

package net

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object LogCount {
  def main(args: Array[String]): Unit = {

    if(args.length < 2){
      println("请指定input和output")
      System.exit(1)//非0表示非正常退出程序
    }

    //TODO 1.env/准备sc/SparkContext/Spark上下文执行环境
    val conf: SparkConf = new SparkConf().setAppName("wc")
    val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("WARN")

    //TODO 2.source/读取数据
    //RDD:A Resilient Distributed Dataset (RDD):弹性分布式数据集,简单理解为分布式集合!使用起来和普通集合一样简单!
    //RDD[就是一行行的数据]
       val logs_all: RDD[Array[String]]  = sc.textFile(args(0)).map{_.split(",")}
    //TODO 3.transformation/数据操作/转换
    //对访问记录中的网页去重,统计本周期内被访问网页的个数
    val wy_log: RDD[String] = logs_all.map(x=>(x(1).toString)).distinct()
    val wy_count:RDD[(String, Int)]= wy_log.map(("wy_zs",_)).groupByKey().map(x => (x._1,x._2.size))
    //userid为用户注册登录的标识,对userid去重,统计登录用户的数量
    val user_log: RDD[String] = logs_all.map(x=>(x(3).toString)).distinct()
    val user_count:RDD[(String, Int)]= user_log.map(("user_zs",_)).groupByKey().map(x => (x._1,x._2.size))
    //按月统计访问记录数
    val logs_all_new = logs_all.map{x=>(x(0),x(1),x(2),x(3),x(4),x(5),date_time(x(5)))}
    val ny_count: RDD[(String, Int)] = logs_all_new.map(x=>(x._7,1)).reduceByKey((a, b)=>a+b)

    //TODO 4.sink/输出
    //输出到指定path(可以是文件/夹)
    wy_count.repartition(1).saveAsTextFile(args(1))
    user_count.repartition(1).saveAsTextFile(args(2))
    ny_count.repartition(1).saveAsTextFile(args(3))
    //为了便于查看Web-UI可以让程序睡一会
    Thread.sleep(1000 * 60)

    //TODO 5.关闭资源
    sc.stop()
  }
  //获取年月,时间段作为输入参数
  def date_time(date:String):String={
    val nianye =date.trim.substring(0,7)
    nianye
  }

}

五、运行过程与结果截图:


六、具体实现步骤

1、修改打包好的jar名字,并把jar上传到node1结点

2、开启一系列集群:

start-dfs.sh //一键开启
start-yarn.sh //开启
cd /myserver/
mr-jobhistory-daemon.sh start historyserver
/myserver/spark301/sbin/start-history-server.sh
jps //查看

这里不再具体说明如何开启。

3、上传jc_content_viewlog.txt到node1节点,并上传到hdfs

​
[root@node1 ~]# hdfs dfs -put jc_content_viewlog.txt  /user/root/

4、在集群提交应用程序

[root@node1 ~]# /myserver/spark301/bin/spark-submit --master yarn --deploy-mode client   --class   net.LogCount /root/jc.jar hdfs://node1:8020/user/root/jc_content_viewlog.txt hdfs://node1:8020/user/root/jc1 hdfs://node1:8020/user/root/jc2 hdfs://node1:8020/user/root/jc3

七、相关知识点

进入spark-shell

[root@node1 bin]# /myserver/spark301/bin/spark-shell

1、过滤出访问次数在 50 次以上的用户记录

(1)统计用户访问次数并筛选出访问次数在50次以上的用户ID

scala> val data = sc.textFile("hdfs://node1:8020/user/root/jc_content_viewlog.txt").map{x=> x.split(",")}

data: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[2] at map at <console>:24

scala> val userid=data.map(line=>(line(3),1)).reduceByKey((a,b)=>a+b).filter(x=>x._2>50).keys .collect

(2)根据过滤后的用户ID,在原数据中筛选出这一部分用户的访问记录

scala> val valib_data=data.filter(x=>userid.contains(x(3)))

valib_data: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[7] at filter at <console>:27

scala> valib_data.take(2) //查看

res1: Array[Array[String]] = Array(Array(480343, 611, /jingsa/611.jhtml, 1, 69463B3F2728DBEB045A5C31CA9C2E3A, 2017-03-01 09:56:49), Array(480358, 611, /jingsa/611.jhtml, 1, 69463B3F2728DBEB045A5C31CA9C2E3A, 2017-03-01 09:58:50))

2、统计访问 50 次以上的用户主要访问的前 5 类网页

scala> val web = valib_data.map(x=>(x(2),1)).reduceByKey((a,b)=>a+b).sortBy(x=>x._2,false)

web: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[12] at sortBy at <console>:25

scala> web.take(5)

res2: Array[(String, Int)] = Array((/jingsa/1030.jhtml,67899), (/view/contentViewLog.jspx,5008), (/jingsa/712.jhtml,2551), (/youxiuzuopin/823.jhtml,1212), (/jingsa/613.jhtml,968))

3. 合并部分网页

(URL 后面带有_1、_2 字样的翻页网址,统一为一个网址)通过字符串截取的方法,对网页网址字符串进行截取,只截取“_”前面的字符串

scala> val data2=data.filter(_.length>=6).map{

x=>

  var page="";

  if(x(2).contains("_"))

    { page=x(2).substring(0,x(2).lastIndexOf("_")) }

  else

    { page=x(2) };

  (x(0),x(1),page,x(3),x(4),x(5))

  }

data2: org.apache.spark.rdd.RDD[(String, String, String, String, String, String)] = MapPartitionsRDD[14] at map at <console>:25

4.根据访问时间加入对应时段:

6:3011:30 为上午,11:3014:00 为中午,14:0017:30为下午,17:3019:00 为傍晚,19:0023:00 为晚上,23:006:30 为深夜,统计所有用户各时段访问情况

(1)首先定义一个函数,用于匹配时间段并返回相应的字段值

scala> def date_time(date:String):String={
           val hour=date.substring(date.indexOf(" ")+1,date.indexOf(":")).toInt
           val min=date.substring(date.indexOf(":")+1,date.lastIndexOf(":")).toInt
           if(hour<6 && hour>=23) "深夜"
           else if(hour==6 && min<=30) "深夜"
           else if(hour<11 && hour>=6) "上午"
           else if(hour==11 && min<=30) "上午"
           else if(hour<14 && hour>=11) "中午"
           else if(hour>=14 && hour<17) "下午"
           else if(hour==17 && hour<=30) "下午"
           else if(hour>=17 && hour<19) "傍晚"
           else if(hour==19 && min<=30) "傍晚"
           else "晚上"
          }
date_time: (date: String)String

(2)通过map方法对每一条记录的时间进行匹配,增加一个时间段的值到记录中

scala> val data_new = data2.map{x=>(x._1,x._2,x._3,x._4,x._5,x._6,date_time(x._6))}

data_new: org.apache.spark.rdd.RDD[(String, String, String, String, String, String, String)] = MapPartitionsRDD[17] at map at <console>:27

(3)将时段值作为键,值为1,利用reduceByKey的方法统计各时段访问情况

scala> val date_count = data_new.map(x=>(x._7,1)).reduceByKey((a,b)=>a+b)

date_count: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[19] at reduceByKey at <console>:25

scala> date_count.take(10)

res3: Array[(String, Int)] = Array((上午,31675), (傍晚,14511), (中午,18799), (下午,39720), (深夜,81), (晚上,67073))

标签: spark scala

本文转载自: https://blog.csdn.net/weixin_58330979/article/details/124229174
版权归原作者 ⚆Pearl 所有, 如有侵权,请联系我们删除。

“【Spark实训】--竞赛网站访问日志分析”的评论:

还没有评论