0


大学期末大作业-基于spark探索b站每周必看视频热门的因素

前言无用废话(可以不别看):

这里是一个菜菜大学生记录自己的期末大作业的文章,结合了本人的作业和自己好多小(废话),这篇文章格式上为了美观与讲述,并非最终文档格式

spark技术这门课程的期末将考试改成大作业+答辩,总体要求按我的理解是:数据采集与预处理+利用spark数据分析+数据可视化+(创新:课上没讲过的知识)+有点像毕设要求的文档+答辩用的ppt。数据源是爬虫,爬数据加分嘿嘿,参考网上的方法与代码爬的哔哩哔哩。在数据预处理阶段三个要点,包括数据清洗,数据转换,数据整合,但其实对于期末作业的小剂量数据,爬虫再进行一些简单的处理就可以直接获取到合适的数据,但是为了完成要求作业的有些部分是有点点牵强。数据分析使用的spark core spark sql spark streamming三个技术各分析三个方面。数据可视化是使用的pyecharts。代码注释是喂了ai,老师在各方面给了很细的要求,尤其文档虽然老师给了简易参考,但是根据自己内容改还要按要求调格式真的弄了好久( ཀ͝ ∧ ཀ͝ ),最后是我的ppt用了好好看的模板,喜欢嘿嘿。


第一章 引言(略)

1.1 选题目的和意义

随着()。因此,探索B站视频热门的因素,对于视频创作者、平台运营者等都具有重要的实际意义。首先,()。

本次研究旨在深入剖析Bilibili每周必看视频的特征和趋势,通过对数据的综合分析,探索背后隐藏的规律和用户行为,为未来的新人up主提供方向。我将深入挖掘最受欢迎的视频类型、探索合作视频与播放量之间的关系、视频的时长带来不同影响,以期揭示Bilibili社区的内容风向,预测某一视频的未来可能热度,为平台运营和内容创作者供有力的数据支持。

1.2 选题背景

随着互联网技术的迅猛发展,视频分享平台如哔哩哔哩(B站)已成为广大用户获取信息和娱乐的重要途径。这些平台提供了海量的视频内容,满足了用户多样化的需求。了解每周热门视频的趋势和特征、通过分析每周的热门视频数据,可以洞察用户的喜好、关注的领域、内容创作者的表现等,从而为有计划成为up主的人提供有价值的数据支持与选题的帮助。而Spark作为一个快速、通用的大数据处理框架,在数据分析领域具有广泛的应用。利用Spark对B站视频数据进行分析和挖掘,能够高效地处理大规模数据集,提取有价值的信息和规律,为探索视频热门的因素提供有力的技术支持。

第二章 相关理论知识技术(略)

(因为是用python爬虫爬取数据,hadoop集群存储,使用spark分析,数据可视化是用的pyechars,加上文档老师设置了页数限制,所以就简写了这四个,这里就略过不写啦)

第三章 系统总体设计

3.1 总体框架图

(其实有的表述不是很准确,主要是太长放不下,为了美观~)

3.2 系统模块分析

数据采集模块:首先使用python爬虫获取bilibili每周必看数据。

数据处理模块:分别对数据进行数据清洗、创新新特征值,数据整合。

数据分析模块:分别使用spark core(RDD)、spark SQL(DF)和spark streaming对数据进行词频统计、排序、平均值计算、分类统计等等处理,并根据结果进行分析数据的意义。

数据可视化模块:使用PyRcharts分别使用柱状图,饼图,折线图综合图进行展示。

机器学习:使用spark MLlib的NaiveBayes进行的简易朴素贝叶斯分类器分别训练模型、预测数据

3.3 系统开发(运行)坏境

Hadoop-2.7.4、Spark-2.4.0、Jdk-1.8.0_201、python(3.6和3.11)Pycharm-2024.1、Mysql80、Pyechars-2.0.6

第4章 数据采集与预处理

4.1 数据爬取

实现的主要功能是从Bilibili的API接口获取“每周必看”系列视频list列表中的数据,并将这些数据整合到一个Pandas DataFrame中,最后将这些数据保存到一个文件中。

核心代码:

for num in range(244, 274):#for循环遍历要爬的最新30期的API接口
    url=f"https://api.bilibili.com/x/web-interface/popular/series/one?number={num}"
    headers = {"User-Agent": "(略,填入自己的)"}#请求头,通过设置请求头实现安全访问
    response_json = requests.get(url=url, headers=headers).json()
    #使用requests.get()方法发送GET请求,获取JSON格式的响应数据。
    df = pd.json_normalize(response_json['data']['list'], errors='ignore')
    df['num'] = num#将响应数据中的data['list']部分、解析为DataFrame df。    content = pd.concat([content, df])
    print("第", num, "期,本期", df.shape[0], "条,总计", content.shape[0], "条")
    time.sleep(random.randint(2, 8))#模拟人类点击行为,避免过快发送请求。
content = content.loc[:,[爬取数据列名,此处省略]]
# 写入原文件前清除openpyxl不支持的字符
for col in content.columns:#遍历content DataFrame的每一列ILLEGAL_CHARAC
#TERS_RE正则表达式移除Excel不支持的字符。
    content[col]=content[col].apply(lambda x:ILLEGAL_CHARACTERS_RE.sub(r'', str(x) if not pd.isna(x) else ''))

(这里仅展示了部分核心代码,主要参考了这位大大的代码http://t.csdnimg.cn/8IrzD)

结果展示:

展示了20行代码的结果,经过观察,数据集包含1291行60列,其中存在大量无意义或无价值的数据。初步获取的数据表现出了相当高的复杂性和冗余性。为了使这些数据更易于后续使用,我需要进行一系列的数据预处理工作,以去除冗余信息并进行必要的处理。(好叭,我承认是为了让数据清洗变得有意义,所以把api中列表的所有值都尽力爬取下来了)

4.2 数据预处理-数据清洗

通过观察,有价值的数据列包括["num", "owner.name", "owner.mid", "title", "desc", "rcmd_reason", "tname", "stat.view", "stat.like", "stat.shar", "duration", "pub_location"]。使用DataFrame这一高效处理数据结构数据的组件,根据表名提取有价值的数据列,将处理后的数据存储于HDFS中。

核心代码:

因为这块我是通过ssh连接到了pycharm上运行的代码,直接在虚拟机上pyspark运行的话是不用加1、2、5这三行的后面也是这样。

import os
os.environ['JAVA_HOME'] = "/home/servers/jdk"
from pyspark.sql import SparkSession
# 创建一个SparkSession对象
spark = SparkSession.builder.appName("RenameColumnsExample").getOrCreate()
# 创建DataFrame df,它包含60列(这里实我设置的ip和地址)
df = spark.read.csv("hdfs://192.168.30.133:8020/homework/Odata/input/bilibili_1291.csv", header=True, inferSchema=True)
# 需要的12列的列名
column_names = ["num", "owner_name", "owner_mid", "title",
                "rcmd_reason", "tname","stat_danmaku","stat_view", "stat_like", "stat_share",
                "duration","rights_is_cooperation","pub_location"]
# 使用select方法提取这些列
df_selected = df.select(column_names)
# 显示提取后的DataFrame
df_selected.show()
# 将结果保存为CSV文件并上传到hdfs
df_selected.write.csv("hdfs://192.168.30.133:8020/homework/Odata/cleanData", header=True)
# 停止SparkSession
spark.stop()

结果展示:

4.3 数据预处理-创新特征值

图中duration是以秒为单位的视频时长,但视频时长的具体值不是一个很合适的研究对象,因为事实上并不需要精确到每一秒来评判是否会带来更多的播放量,所以就以300秒(五分钟)为界将数据转换为0和1,1代表短视频,0代表长视频。

代码展示:

import os
os.environ['JAVA_HOME'] = "/home/servers/jdk"
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col
# 初始化SparkSession
spark = SparkSession.builder.appName("AddShortVideoFeature").getOrCreate()
# 创建dataframe,并去除第一行
df = spark.read.csv("hdfs://192.168.30.133:8020/homework/Odata/cleanData/bilibili-clean.csv", header=True, inferSchema=True)
# 添加一个名为is_short_video的新列,如果duration <= 300则为1,否则为0
df_with_feature = df.withColumn("is_short_video", when(col("duration") <= 300, 1).otherwise(0))
# 选择需要的列(title和新的is_short_video),并将结果保存为CSV文件
(df_with_feature.select("title", "is_short_video")
 .write.csv("hdfs://192.168.30.133:8020/homework/Odata/cleanData/newData", header=True))

spark.stop()

结果展示:

4.4 数据预处理-数据整合

在数据清洗与数据转换换特征值的过程中产生了两个csv文件,为了便于后续分析需要将连个数据连接到一个文件中。因此采用spark的内连接方式,更具文件的title作为键值,将内容整合为一个文件。

代码展示:

from pyspark.sql import SparkSession
# 初始化SparkSession
spark = SparkSession.builder.appName("MergeData").getOrCreate()
# 读取第两个CSV文件
df1 = spark.read.csv("hdfs://192.168.30.133:8020/homework/Odata/cleanData/bilibili-clean.csv", header=True, inferSchema=True)
df2 = spark.read.csv("hdfs://192.168.30.133:8020/homework/Odata/cleanData/newData", header=True, inferSchema=True)
# 合并两个DataFrame,使用title作为键
merged_df = df1.join(df2, df1[3] == df2[0], "inner")
# 因为两个DataFrame都有title字段,合并后会有冲突,需要选择需要的字段
# 可以使用select方法显式地选择想要的列
selected_columns = [
    "num","owner_name","owner_mid","title","rcmd_reason",
    "tname","stat_danmaku","stat_view","stat_like","stat_share",
    "duration","rights_is_cooperation","pub_location","is_short_video"
]
# 选择这些列,并创建一个新的DataFrame
final_df = merged_df.select(selected_columns)
# 写入到新的CSV文件
final_df.write.csv("hdfs://192.168.30.133:8020/homework/Odata/bilibili_last.csv", header=True)
spark.stop()

结果展示:

第5章 数据分析与可视化

(数据分析这里还做了一个使用套接字流方式的sparkstream使用和一个top播放量排序,但是对于分析意义不是很好没有采用)

5.1 探索更受欢迎的视频类型(spark streaming)

使用spark streaming对数据进行词频统计和spark sql对数据排序。首先使用SparkSession读取CSV文件,将数据转换为RDD提取特定列的数据,RDD队列流传递给StreamingContext进行词频统计,统计结果使用rdd以降序排序并转换为DataFrame,然后写入CSV文件。

(spark stream一般处理的是流式数据,一般是已经爬好的文件数据源,是没有“窗口输入”这个动作的,实在是为用到这个技术而用。在使用rdd队列流以每2秒向窗口传递数据的方式模拟streaming,并实现词频统计计算。后续词频统计的结果不能直接用于分析,又添加了降序排序功能)

代码展示:

首先要先在终端中写入文件:

cd /home/spark/pythoncode/ #一个存放代码的地址
vi RDDstreamming.py

然后写入如下:

from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("RDDStreaming").getOrCreate()
sc = spark.sparkContext
def getcsv(spark, csv_path):
    df = spark.read.csv(csv_path, header=True, inferSchema=True)  # 假设CSV有表头,并且你想使用Spark的schema推断
    rdd = df.rdd.map(lambda row: row[5])  # 假设DataFrame的索引从0开始,且你想获取第6列
    return rdd.collect()  # 将RDD的数据收集到驱动程序中,用于模拟流
csv_path ="file:///home/data/bilibili/bilibili_last.csv"
def update_func(time, rdd):
    if not rdd.isEmpty():
        # 使用flatMap分词,map将每个词映射为(word, 1)的元组,reduceByKey进行词频统计
        word_counts = rdd.flatMap(lambda line: line.split(" ")) \
                          .map(lambda word: (word, 1)) \
                          .reduceByKey(lambda a, b: a + b)
        sorted_rdd = word_counts.sortBy(lambda x: x[1], ascending=False)
        word_counts_df = spark.createDataFrame(sorted_rdd, ["word", "count"])

        # 可以指定一个基于时间的文件名或目录来避免文件覆盖
        output_path = "file:///home/data/bilibili/result/20240626" + ".csv"
        word_counts_df.write.csv(output_path, header=True)
        # 输出或保存结果(这里只是打印)
        word_counts.foreach(print)
if __name__ == "__main__":
    ssc = StreamingContext(spark.sparkContext, 2)
    rddQueue = [getcsv(spark, csv_path)]
    inputStream = ssc.queueStream(rddQueue)
    dStream = inputStream.foreachRDD(update_func)
    ssc.start()
    ssc.stop(stopSparkContext=True,stopGraceFully=True)

在终端运行输入:

/home/spark-local/bin/spark-submit RDDstream.py

结果展示:

** 数据可视化:**

import pandas as pd  # 导入pandas库,用于数据处理
from pyecharts.charts import Bar  # 导入pyecharts库中的Bar类,用于绘制柱状图
import pyecharts.options as opts  # 导入pyecharts库中的options模块,用于设置图表的各种选项
from pyecharts.commons.utils import JsCode  # 导入pyecharts库中的JsCode类,虽然在这段代码中并未直接使用

# 读取CSV文件
df = pd.read_csv('file:///home/spark025/data/bilibili/result/word_counts_20240626.csv/type.csv', encoding='gbk')
# # 从DataFrame中提取'word'列'count'的数据到列表attr中,表示柱状图的x轴标签
attr = df['word'].tolist()
v1 = df['count'].tolist()
# 创建柱状图
bar = Bar()# 创建一个Bar对象,用于绘制柱状图
bar.add_xaxis(attr)# 添加x轴数据
bar.add_yaxis("", v1)# 添加y轴数据,第一个参数为系列名称(这里为空字符串)
# 设置全局配置项
bar.set_global_opts(
    title_opts=opts.TitleOpts(title="视频类型统计"),# 设置标题
    xaxis_opts=opts.AxisOpts(axislabel_opts=opts.LabelOpts(rotate=45, font_size=12)),# 设置x轴选项,包括标签旋转角度和字体大小
    yaxis_opts=opts.AxisOpts(name="数量"),  # 设置y轴选项,包括y轴名称
    datazoom_opts=[opts.DataZoomOpts(type_="slider", range_start=0, range_end=100)],  # 添加数据缩放组件,类型为滑块,并设置初始范围
)
bar.render_notebook()
bar.render('type.html')

(用pyEcharts的bar()函数做的柱状图,但是直接可视化效果如上图中左侧图,展示效果不是很好。因此为其添加了dataZoom 效果,这样能够滑动选择显示范围效果如上图右侧)

结果分析

根据结果内容分析,B站用户偏爱的前五大视频类型依次为:搞笑、游戏、日常、手书和历史。 反映出用户对于轻松日常和游戏娱乐内容的浓厚兴趣。同时,人文历史类视频位列第四,表明除了娱乐需求外,用户也对历史人文知识有一定的关注和需求,他们期望在观看视频的过程中能够增长知识。 一些相对小众的视频如DIY手工、剧场、音乐甚至学习与财金内容在bilibili也有广泛的接受度

5.2 对比合作与非合作视频的平均播放量(spark core)

使用spark RDD分别计算合作视频与非合作视频的总播放量,然后使用avg分别计算平均值,最后输出平均值与比较结果。

(就是RDD嘛~很简单,但是能感觉出来用RDD代码比用DF长好多)

**代码展示 **

from pyspark.sql import SparkSession
# 初始化Spark
sc = SparkContext("local", "VideoAnalysis")
spark = SparkSession.builder.appName("VideoAnalysis").getOrCreate()
# 读取CSV文件并创建RDD
line = sc.textFile("file:///home/data/bilibili/bilibili_last.csv")
header025=line.first()
rdd= line.filter(lambda line: line != header025)
# 解析每行数据
def parse_line(line):
    fields = line.split(',')
    stat_view = int(fields[7])  # 假设stat_view是第8列(索引从0开始)
    rights_is_cooperation = int(fields[11])  # 假设rights_is_cooperation是第12列
    return (rights_is_cooperation, stat_view)  # 返回(合作/非合作标识, 播放量)
# 过滤并转换RDD,移除可能的空行或格式不正确的行
filtered_rdd = rdd.map(parse_line).filter(lambda x: isinstance(x[0], int) and isinstance(x[1], int))
# 分割为合作视频RDD和非合作视频RDD
cooperation_rdd = filtered_rdd.filter(lambda x: x[0] == 0)
non_cooperation_rdd = filtered_rdd.filter(lambda x: x[0] == 1)
# 分别计算合作视频和非合作视频的平均播放量
avg_cooperation_views = round(cooperation_rdd.map(lambda x: x[1]).mean(), 2)
avg_non_cooperation_views = round(non_cooperation_rdd.map(lambda x: x[1]).mean(), 2)
print(f"合作视频的平均播放量是: {avg_cooperation_views}")
print(f"非合作视频的平均播放量是: {avg_non_cooperation_views}")
# 比较播放量
if avg_cooperation_views > avg_non_cooperation_views:
    print("合作视频的平均播放量更高")
elif avg_cooperation_views < avg_non_cooperation_views:
    print("非合作视频的平均播放量更高")
else:
    print("合作视频和非合作视频的平均播放量相同")
# 创建一个包含平均播放量的DataFrame
result_df = spark.createDataFrame([
    ("合作视频", avg_cooperation_views),
    ("非合作视频", avg_non_cooperation_views)
], ["video_type", "average_views"])
# 将结果写入CSV文件
result_df.write.csv("hdfs://192.168.30.133:8020/homework/Odata/fenxi/heRDD.csv", header=True, mode="overwrite")
print("文件保存完毕")
# 停止Spark会话
spark.stop()
sc.stop()

数据可视化:

(pyecharts的pie扇形图)

import os
os.environ['JAVA_HOME'] = "/home/servers/jdk"
from pyecharts.charts import Pie
from pyecharts import options as opts
from pyspark.sql import SparkSession
# 使用SparkSession读取CSV文件,并启用header和inferSchema选项
spark = SparkSession.builder.appName("ReadCSV").getOrCreate()
df = spark.read.csv("file:///home/data/bilibili/result/cooperation/cooperation.csv", header=True,
                    inferSchema=True)
type_data = df.rdd.map(lambda row: row[0]).collect()
values_data = df.rdd.map(lambda row: row[1]).collect()
# 创建一个饼图对象
pie = Pie()
# 添加数据和配置项,注意在新版pyecharts中,add方法通常接收一个字典作为参数
pie.add("", [(category, value) for category, value in zip(type_data, values_data)])
# 设置全局配置项
pie.set_global_opts(title_opts=opts.TitleOpts(title="合作投稿对比图"))
pie.render_notebook()
# 如果您在Python脚本中运行,则使用以下代码生成HTML文件
pie.render("pie_operation.html")

结果展示

** 结果分析**

据结果可以看到,合作视频的播放量会稍比非合作视频播放量高出一些,但是相差没有特别的大,因此可以看出,同过合作视频既联合投稿的方式可以增加的一定的播放量,但是影响相对没有那么的大,没有联合投稿的视频也可以获得一定的播放量。

5.3 分析长短视频类型的受欢迎度和发展(spark SQL)

使用spark SQL分别查询每期视频的数量和每个视频的播放量,计算每期的短视频和长视频的数量和播放量,最后计算每期每个短视频和长视频的平局播放量。通过短视频与长视频的比较,与日期增长的探究长视频与短视频的对播放量的影响及发展状况。

代码展示:

import os
os.environ['JAVA_HOME'] = "/home/servers/jdk"
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum,when,round,count

# 初始化 SparkSession
spark = SparkSession.builder.appName("VideoAnalysis").getOrCreate()
# 读取 CSV 文件
df = spark.read.csv("file:///home/data/bilibili/bilibili_last.csv", header=True, inferSchema=True)
df = df.na.drop(subset=["num", "stat_view", "stat_danmaku", "is_short_video"])
grouped_df = df.groupBy("num").agg( ## 分组计算,计算每周的总视频数、短视频数、总播放量和总弹幕量
    count("*").alias("total_videos"),
    sum(when(col("is_short_video") == 1, 1).otherwise(0)).alias("short_videos"),
    sum(when(col("is_short_video") == 0, 1)).alias("long_videos"),
    sum(when(col("is_short_video") == 1, col("stat_view"))).alias("totalShort_views"),
    sum(when(col("is_short_video") == 0, col("stat_view"))).alias("totalLong_views")
)
# 计算短视频数量占比、平均播放量和平均弹幕量,使用 Window 函数或直接在 DataFrame 上进行操作也是可行的,但这里为了简单起见直接计算
final_df = grouped_df.withColumn("avgShort_views", round(col("totalShort_views") / col("short_videos"), 2)) \
    .withColumn("avgLong_views", round(col("totalLong_views") / col("long_videos"), 2))
# 显示结果
final_dfs = final_df.orderBy('num', ascending=True)
final_dfs.show()
# 保存结果到 CSV 文件
final_dfs.write.csv("file:///home/data/bilibili/result/LoS", header=True)
# 停止 SparkSession
spark.stop()

结果展示:

(show()方法结果显示前二十行)

可视化代码:

import pandas as pd
from pyecharts.charts import Bar, Line
from pyecharts import options as opts
# 读取CSV文件
df = pd.read_csv('file:///home/spark025/data/bilibili/result/LoS/LoS.csv')
# 提取数据
x = df['num'].astype(str).tolist()
short_videos = df['short_videos'].tolist()
long_videos = df['long_videos'].tolist()
avgShort_views = df['avgShort_views'].tolist()
avgLong_views = df['avgLong_views'].tolist()
# 创建柱状图
bar = Bar()
bar.add_xaxis(x)
bar.add_yaxis('Short Videos', short_videos, label_opts=opts.LabelOpts(is_show=False), itemstyle_opts=opts.ItemStyleOpts(opacity=0.7))
bar.add_yaxis('Long Videos', long_videos, label_opts=opts.LabelOpts(is_show=False), itemstyle_opts=opts.ItemStyleOpts(opacity=0.7))
bar.set_global_opts(
    title_opts=opts.TitleOpts(title='视频统计'),
    tooltip_opts=opts.TooltipOpts(is_show=True, trigger='axis', axis_pointer_type='cross'),
    xaxis_opts=opts.AxisOpts(type_='category', axispointer_opts=opts.AxisPointerOpts(is_show=True, type_='shadow')),
    yaxis_opts=opts.AxisOpts(name='Video Count')
)
# 扩展第二个Y轴用于 avgShort_views
bar.extend_axis(
    yaxis=opts.AxisOpts(
        name='Average Short Video Views',
        min_=0,
        max_=max(avgShort_views) + 1000000,  # 设置最大值,确保所有数据点都能显示
        interval=1000000,
        axislabel_opts=opts.LabelOpts(formatter='{value}')
    )
)
# 创建折线图
line = Line()
line.add_xaxis(x)
line.add_yaxis('Avg Short Video Views', avgShort_views, yaxis_index=1, label_opts=opts.LabelOpts(is_show=False), linestyle_opts=opts.LineStyleOpts(width=2))
line.add_yaxis('Avg Short avgLong_views', avgLong_views, yaxis_index=2, label_opts=opts.LabelOpts(is_show=False), linestyle_opts=opts.LineStyleOpts(width=2, type_='dashed', color='blue'))
# 合并图表
bar.overlap(line)
# 渲染图表
bar.render_notebook()

bar.render('output.html')

可视化展示:

分析结果:

更直观的看到随着时间的推移,特别是近半年内,短视频的上榜数量呈现出显著的上涨趋势,并且这些短视频的总体播放量也超越了长视频,成为了不可忽视的一部分。

这一现象表明,尽管b站不是一个以短视频为主要内容的平台,但b站也在逐渐适应并迎合当今时代人们对于视频消费的需求和喜好。短视频以其简洁、直观、易于传播的特点,吸引了大量用户的关注和喜爱,而b站作为一个内容多元化的平台,也在积极拥抱这一趋势,为用户提供更加丰富多样的视频内容。

第6章 基于spark ML的简易朴素贝叶斯分类器

使用spark ML根据训练集的视频类型,是否为合作视频和是否为短视频作为特征值,并创建了一个新特征值hot_video (1代表播放量超过300万,0代表没超过),使用saprk MLib的Naive Bayes训练一个简易的朴素贝叶斯模型,评估模型的进准度。根据上述的分析结果创建一个简易的测试数据,进行预测。下图是对源文件进一步处理后的数据。

观察我的数据,经过我的处理只有一列是字符型,其他都是数字(int)型,因此实际上只有字符型需要进行转化处理。所以先通过使用df读取csv文件。将字符串转化为索引值,使用sparkML的特征对的形式,将转化好的索引形式特征与两数字形式的特征付给特征featureIndexer。最后转换数据输入模型。使用70训练模型与30%用于预测模型。并使用evaluator模型对模型的准确度进行评估。

import os
os.environ['JAVA_HOME'] = 
from pyspark.sql import SparkSession
from pyspark.ml.classification import NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler, VectorIndexerModel
 # 初始化SparkSession
spark = SparkSession.builder.appName("NaiveBayesClassifierExample").getOrCreate()
# 读取CSV文件(文件包含'id', 'tname', 'rights_is_cooperation', 'is_short_video', 'hot_video'列)
df = spark.read.option("header", "true") \
    .option("inferSchema", "true") \
    .csv("file:///home/data/bilibili/bilibili_ml.csv")

#将字符串列(通常是分类标签)转换为索引列
tnameIndexer = StringIndexer(inputCol="tname", outputCol="tname_index").fit(df)
# 使用VectorAssembler将特征列转换为Vector类型,这里只包含tname的索引列,因为rights_is_cooperation和is_short_video已经是整数类型
featureIndexer = VectorAssembler(
    inputCols=["tname_index", "rights_is_cooperation", "is_short_video"],
    outputCol="features" #输出特征的名字为features
)
# 转换数据
indexedDF = featureIndexer.transform(tnameIndexer.transform(df))
# 拆分数据集为训练集和测试集
(trainingData, testData) = indexedDF.randomSplit([0.7, 0.3])
#训练朴素贝叶斯模型
nb = NaiveBayes(featuresCol="features", labelCol="hot_video")  # DataFrame中包含了特征(features)
model = nb.fit(trainingData)
# 预测
predictions = model.transform(testData)
# 评估模型(使用原始的标签列)
evaluator = MulticlassClassificationEvaluator(
    labelCol="hot_video", predictionCol="prediction", metricName="accuracy"
)
# 使用'prediction'列(它也是整数型的)与'hot_video'列(原始的标签)
accuracy = evaluator.evaluate(predictions)
print(f"Test Accuracy = {accuracy}")
testData.show(5)

模型应用

test_data = [("搞笑", 0, 1, 1)]
# 将测试数据转换为DataFrame
test_df = spark.createDataFrame(test_data, ["tname", "rights_is_cooperation", "is_short_video", "hot_video"])
# 索引和转换测试数据(与训练数据相同的方式)
test_df = tnameIndexer.transform(test_df)
test_df = featureIndexer.transform(test_df)
# 选择features和label列,因为模型需要这些列进行预测
test_df_selected = test_df.select("features", "hot_video")
# 现在,你可以使用模型对测试数据进行预测
predictions = model.transform(test_df_selected)
# 显示预测结果
predictions.show()
spark.stop()

结果展示

结果分析

模型的准确度只有大约53%。可以肯定一个视频的播放量肯定不仅仅只与这是三个因素有关,甚至还会与更多因素有关,但是(希望)通过这个简易的模型能够展示自学能力(机器学习和spark mlib还没有学过)与对于技术的掌握,完成作业要求中“创新性”的评分(虽然最后实际得分一般般)。

总结

一些可能用到的命令

hdfs上传文件
hdfs dfs -put /  /

hdfs上的文件重命名
hadoop fs -mv
标签: spark 大数据

本文转载自: https://blog.csdn.net/anxian24/article/details/140388182
版权归原作者 弦弦爱吃年糕QAQ 所有, 如有侵权,请联系我们删除。

“大学期末大作业-基于spark探索b站每周必看视频热门的因素”的评论:

还没有评论