以下是一篇关于上述代码的博客文章:
基于PySpark的电影推荐系统实现与分析
在当今数字化时代,个性化推荐系统在各个领域中都发挥着至关重要的作用,尤其是在娱乐行业,如电影推荐。本文将详细介绍如何使用PySpark构建一个简单的电影推荐系统,并对代码进行深入分析。
一、环境准备
在开始我们的电影推荐之旅前,需要正确配置运行环境。这涉及到设置一系列的环境变量,确保PySpark能够顺利运行。
os.environ['JAVA_HOME']='C:/Program Files/Java/jdk1.8.0_241'
os.environ['HADOOP_HOME']='D:/hadoop-3.3.1'
os.environ['PYSPARK_PYTHON']='C:/ProgramData/Miniconda3/python.exe'
os.environ['PYSPARK_DRIVER_PYTHON']='C:/ProgramData/Miniconda3/python.exe'
os.environ['HADOOP_USER_NAME']='root'
os.environ['file.encoding']='UTF-8'
这里,我们指定了Java的安装路径、Hadoop的路径、Python解析器路径以及一些其他相关的环境变量。这些设置是为了让PySpark能够找到所需的依赖和正确处理数据编码等问题。
二、创建SparkSession
SparkSession
是使用PySpark的入口点,它允许我们与Spark集群进行交互并执行各种数据处理操作。
spark = SparkSession.builder.appName("电影推荐案例")\
.master("local[*]").config("spark.sql.shuffle.partitions","4").getOrCreate()
通过
builder
模式,我们为这个Spark应用程序命名为“电影推荐案例”,并设置运行模式为本地(
local[*]
,表示使用本地所有可用核心),同时配置了
spark.sql.shuffle.partitions
的值为
4
。这个参数控制了在数据混洗(shuffle)操作时的分区数量,会影响数据处理的性能和资源分配。
三、数据读取与处理
我们从一个文本文件中读取用户对电影的评分数据。
df1 = spark.read.text("../../../data/input/u.data")
df1.printSchema()print(df1.take(10))
这里读取的数据格式是简单的文本文件,通过
printSchema
可以查看数据的结构(此时数据只有一个名为
value
的字符串列),
take(10)
则可以查看前10行数据内容。
接下来,我们需要对读取的数据进行处理,将每行数据按照制表符(
\t
)进行切割,并转换为包含
userId
、
movieId
和
score
三列的DataFrame。
defsplit_data(line):
arr = line.split("\t")return(int(arr[0]),int(arr[1]),int(arr[2]))
df2 = df1.rdd.map(lambda row: split_data(row.value)).toDF(["userId","movieId","score"])print(df2.take(1))
在这个过程中,我们首先定义了
split_data
函数,用于解析每行数据。然后通过
rdd.map
操作将
df1
中的每行数据应用
split_data
函数进行转换,最后将结果转换为指定列名的DataFrame。
四、数据划分
为了训练和评估我们的推荐模型,需要将数据划分为训练集和测试集。
train_data, test_data = df2.randomSplit([0.8,0.2])
这里使用
randomSplit
方法将
df2
中的数据按照80%和20%的比例随机划分为训练集和测试集。这种随机划分可以保证数据的随机性,使得训练出的模型更具泛化能力。
五、ALS模型训练
交替最小二乘法(ALS)是一种常用的推荐算法,在我们的电影推荐系统中,我们使用PySpark的
ALS
实现来训练模型。
als = ALS(userCol="userId",
itemCol="movieId",
ratingCol="score",
rank=10,
maxIter=10,
alpha=1.0)
model = als.fit(train_data)
我们指定了
userCol
(用户ID列)、
itemCol
(项目,即电影ID列)、
ratingCol
(评分列)等参数,同时设置了
rank
(可以理解为模型的潜在因子数量)、
maxIter
(最大迭代次数)和
alpha
(步长相关参数)。然后使用训练集
train_data
对模型进行训练。
六、模型推荐与结果展示
我们可以使用训练好的模型进行多种类型的推荐。
6.1 给所有用户或所有电影推荐
# df3 = model.recommendForAllUsers(5) # 给所有用户推荐5部电影# df4 = model.recommendForAllItems(5) # 给所有电影推荐5个用户# df3.show(truncate=False)# df4.show(truncate=False)
这里注释掉的代码展示了可以使用训练好的模型给所有用户推荐指定数量的电影,或者给所有电影推荐指定数量的用户的功能。
6.2 给特定用户或特定电影推荐
df5 = model.recommendForUserSubset(spark.createDataFrame([(653,)],["userId"]),5)
df6 = model.recommendForItemSubset(spark.createDataFrame([(411,)],["movieId"]),5)
df5.show(truncate=False)
df6.show(truncate=False)
在这部分,我们为特定用户(用户ID为653)推荐5部电影,以及为特定电影(电影ID为411)推荐5个用户,并展示推荐结果。通过
show(truncate=False)
可以完整地查看推荐数据的内容。
七、提取推荐结果中的电影ID
最后,我们来看如何从推荐结果中提取电影ID信息。
df5.printSchema()defgetMovieIds(row):
tuijianFilms =[]
arr = row.recommendations
for ele in arr:print(ele.movieId)
tuijianFilms.append(ele.movieId)print("推荐的电影有:", tuijianFilms)
df5.foreach(getMovieIds)
首先,
printSchema
显示了
df5
的结构,其中包含了推荐信息。然后我们定义了
getMovieIds
函数,通过遍历每行数据中的推荐列表,提取出电影ID并打印出来。通过
df5.foreach(getMovieIds)
将这个函数应用到
df5
的每一行数据上,从而实现对推荐电影ID的提取和打印。
通过以上步骤,我们完成了一个简单的基于PySpark的电影推荐系统的构建和分析。这个系统可以根据用户的历史评分数据,利用ALS算法为用户推荐可能感兴趣的电影,同时也展示了PySpark在数据处理和机器学习模型训练方面的强大功能。在实际应用中,可以进一步优化模型参数、改进数据处理流程以及提升用户体验等,以构建更高效、准确的推荐系统。
希望这篇博客能够帮助读者理解如何使用PySpark实现电影推荐系统,并且对其中的代码逻辑有更深入的了解。如果有任何问题,欢迎在评论区留言。
代码总结:
import os
from os import truncate
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
if __name__ == '__main__':
os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
# 配置Hadoop的路径,就是前面解压的那个路径
os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
# 配置base环境Python解析器的路径
os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe' # 配置base环境Python解析器的路径
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
os.environ['HADOOP_USER_NAME'] = 'root'
os.environ['file.encoding'] = 'UTF-8'
# 准备环境
spark = SparkSession.builder.appName("电影推荐案例")\
.master("local[*]").config("spark.sql.shuffle.partitions","4").getOrCreate()
#读取数据,然后切割数据
df1 = spark.read.text("../../../data/input/u.data")
df1.printSchema()
print(df1.take(10))
# 根据\t 切割文件中的数据
def split_data(line):
arr =line.split("\t")
return (int(arr[0]),int(arr[1]),int(arr[2]))
df2 = df1.rdd.map(lambda row:split_data(row.value)).toDF(["userId", "movieId", "score"])
print(df2.take(1))
# 将我们的数据分为两部分,80% 用于训练,20% 用于测试
train_data, test_data = df2.randomSplit([0.8, 0.2])
"""
rank 可以理解为:可以理解为Cm*n = Am*k X Bk*n 里面的k的值
maxIter:最大迭代次数
alpha : 步长
"""
als = ALS(userCol="userId",
itemCol="movieId",
ratingCol="score",
rank=10,
maxIter=10,
alpha=1.0)
# 使用训练集训练模型
model = als.fit(train_data)
# 将训练好的模型进行数据推荐
# df3 = model.recommendForAllUsers(5) # 给所有用户推荐5部电影
# df4 = model.recommendForAllItems(5) # 给所有电影推荐5个用户
# df3.show(truncate=False)
# df4.show(truncate=False)
# 给某个用户推荐电影
df5 = model.recommendForUserSubset(spark.createDataFrame([(653,)],["userId"]),5)
df6 = model.recommendForItemSubset(spark.createDataFrame([(411,)],["movieId"]),5)
# 给某个电影推荐用户
df5.show(truncate=False)
df6.show(truncate=False)
# 如何把df5中的数据提取为 字符串
df5.printSchema()
def getMovieIds(row):
tuijianFilms = []
arr = row.recommendations
for ele in arr:
print(ele.movieId)
tuijianFilms.append(ele.movieId)
print("推荐的电影有:",tuijianFilms)
df5.foreach(getMovieIds)
执行代码后,如果报如下错误:
ModuleNotFoundError: No module named 'numpy'
说明python中缺少numpy模块,需要下载:
pip install numpy
版权归原作者 jlting195 所有, 如有侵权,请联系我们删除。