一、源数据
本章所分析的数据来自于Simon Fraser大学公开的YouTube视频网站的视频数据(https://netsg.cs.sfu.ca/youtubedata/)。数据包含两张表。第一张为视频表,记录了研究人员爬取的视频的元数据信息,具体包括以下字段:
字段备注详细描述video id视频唯一id11位字符串uploader视频上传者上传视频的用户名Stringage视频年龄视频在平台上的整数天category视频类别上传视频指定的视频分类length视频长度整形数字标识的视频长度views观看次数视频被浏览的次数rate视频评分满分5分ratings流量视频的流量,整型数字conments评论数一个视频的整数评论数related ids相关视频id相关视频的id,最多20个
第二张表为用户表,记录了爬取的YouTube用户的相关信息,具体包括:
字段备注字段类型uploader上传者用户名stringvideos上传视频数intfriends朋友数量int
二、练习题
0. 数据预处理
本章所分析的视频信息下载自http://netsg.cs.sfu.ca/youtubedata/080327.zip,我们将该压缩包中的所有文件进行了归并,并过滤掉那些字段数不足10个的记录。此外,我们将category字段的数据进行了预处理,将所有的类别用&分割,同时去掉两边空格。并且,多个相关视频id也使用&进行分割。用户信息则下载自https://netsg.cs.sfu.ca/youtubedata/080903user.zip。然后,我们将这些数据读取为Spark DataFrame形式,以供后续分析。
val spark = SparkSession
.builder()
.appName("Youtube")
.getOrCreate()
import spark.implicits._
/* 加载源数据 */
// 源数据下载自 https://netsg.cs.sfu.ca/youtubedata/
// 加载视频数据
val videoRDD =
spark.sparkContext.textFile("hdfs:///SparkLearning/youtube_video.txt")
val videoSchema = StructType(
Array[StructField](
StructField("video_id", StringType, nullable = true),
StructField("uploader", StringType, nullable = true),
StructField("age", IntegerType, nullable = true),
StructField("category", ArrayType(StringType), nullable = true),
StructField("length", IntegerType, nullable = true),
StructField("views", IntegerType, nullable = true),
StructField("rate", DoubleType, nullable = true),
StructField("ratings", IntegerType, nullable = true),
StructField("comments", IntegerType, nullable = true),
StructField("related_ids", ArrayType(StringType), nullable = true)
)
)
val rowVideoRDD = videoRDD
.map(_.split("\t"))
.map(attributes =>
Row(
attributes(0),
attributes(1),
attributes(2).toInt,
attributes(3).split("&"),
attributes(4).toInt,
attributes(5).toInt,
attributes(6).toDouble,
attributes(7).toInt,
attributes(8).toInt,
attributes(9).split("&")
)
)
val videoDF = spark.createDataFrame(rowVideoRDD, videoSchema)
// 加载用户数据
val userRDD =
spark.sparkContext.textFile("hdfs:///SparkLearning/youtube_user.txt")
val userSchema = StructType(
Array[StructField](
StructField("uploader", StringType, nullable = true),
StructField("videos", IntegerType, nullable = true),
StructField("friends", IntegerType, nullable = true)
)
)
val rowUserRDD = userRDD
.map(_.split("\t"))
.map(attributes =>
Row(attributes(0), attributes(1).toInt, attributes(2).toInt)
)
val userDF = spark.createDataFrame(rowUserRDD, userSchema)
1. 统计视频观看数Top10
val res = videoDF
.select($"video_id", $"views")
.orderBy($"views".desc)
.limit(10)
2. 统计视频类别热度Top10
val res = videoDF
.select(explode($"category").as("category"))
.groupBy($"category")
.count()
.orderBy($"count".desc)
.limit(10)
3. 统计出视频观看数最高的20个视频的所属类别以及类别包含Top20视频的个数
val res = videoDF
.orderBy($"views".desc)
.limit(20)
.select(explode($"category").as("category"))
.groupBy($"category")
.count()
4. 统计视频观看数Top50所关联视频的所属类别Rank
val res = videoDF
.orderBy($"views".desc)
.limit(50)
.select(explode($"related_ids").as("related_id"))
.alias("t1")
.join(videoDF.as("t2"), $"t1.related_id" === $"t2.video_id")
.select(explode($"t2.category").as("category"))
.groupBy($"category")
.count()
.orderBy($"count".desc)
5. 统计每个类别中的视频观看数Top10
val res = videoDF
.select($"video_id", explode($"category").as("category"), $"views")
.select(
$"category",
$"video_id",
$"views",
row_number()
.over(Window.partitionBy($"category").orderBy($"views".desc))
.alias("rank")
)
.filter($"rank" <= 10)
.orderBy($"category", $"rank")
6. 统计上传视频最多的用户Top10以及他们上传的视频
val res = userDF
.orderBy($"videos".desc)
.limit(10)
.alias("t1")
.join(videoDF.alias("t2"), $"t1.uploader" === $"t2.uploader")
.select($"t2.uploader", $"t2.video_id")
.orderBy($"uploader")
版权归原作者 liulizhi1996 所有, 如有侵权,请联系我们删除。