0


构建基于Spark-ML和Scala的协同过滤推荐系统

好久都没更新了,所以今天翻译一篇文章凑个数吧:

https://medium.com/rahasak/collaborative-filtering-based-book-recommendation-system-with-spark-ml-and-scala-1e5980ceba5e

本文代码 https://gitlab.com/rahasak-labs/dot

作者:λ.erang

在这篇文章中,我将讨论使用协同过滤机器学习方法构建图书推荐系统。Spark-ML中的协同过滤模型采用交替最小二乘(ALS)算法。这篇文章的源代码和数据集可以在gitlab上找到。

推荐系统

推荐系统使用机器学习算法向特定用户或客户推荐最相关的产品。它的工作原理是在消费者行为数据中寻找显示和隐式的模式。推荐系统最常见的示例是亚马逊上的产品推荐、Netflix 中电影和电视节目的推荐、YouTube 上的推荐视频、Spotify 上的音乐、Facebook 新闻和 Google Ads。在机器学习中,主要有两种用于提供推荐的主要技术,1)基于内容的过滤,2)协同过滤。

基于内容的过滤

基于内容的过滤会分析每个产品的属性,旨在找到数据的来识别用户偏好。基于内容推荐依靠产品自身的特征来进行推荐。比如用户生成的电影标签、产品颜色、文本描述、用户的评论、电影流派、导演、剧情介绍、演员等。这些推荐系统背后的一般思想是,如果一个人喜欢特定产品 ,他或她也会喜欢与之相似的产品。换句话说,这些算法尝试推荐与用户过去喜欢的项目相似的项目。这可以在 Netflix、Facebook Watch 等应用程序中看到,它们根据导演、演员等推荐下一部电影或视频。

协同过滤

协同过滤根据用户过去的经验和行为推荐项目。与基于内容的过滤不同,它不需要有关产品或用户本身的任何信息。协同过滤背后的关键思想是相似的用户有相似的兴趣,具有相似兴趣的人倾向于喜欢相似的产品。系统会匹配具有相似兴趣的人并基于该匹配进行推荐。这方面的主要示例是 Google Ads。协同过滤有两类,1)基于用户 2)基于产品。基于用户的协同过滤衡量目标用户与其他用户之间的相似度(计算用户之间的相似度)。基于产品的协同过滤衡量目标用户的产品与其他产品之间的相似度(计算产品之间的相似度)。协同过滤的一个问题是,它无法对没有评分的新产品(例如产品、电影、歌曲等)进行推荐(冷启动问题)。

考虑以下书籍推荐场景。用户 A 和 B 对 Book1 和 Book2 给予了很高的评价。然后它可以假设它们必须具有相似的品味。因此,用户 B 喜欢一本他/她没有遇到但被用户 A 评价很高的书的可能性更高。在这种情况下,用户 A 对 Book3 进行了评价。但用户 B 没有遇到它,因此推荐系统向用户 B 推荐 Book 3。

数据集

本篇文章的书籍推荐系统中使用了两个CSV数据集。一个数据集包含书籍的信息,另一个数据集包含书籍的用户评分。 完整的数据集可以在 Kaggle 中找到。

https://www.kaggle.com/somnambwl/bookcrossing-dataset

准备数据

首先,我已将 CSV 数据集加载到 Spark DataFrames 中并对其进行了一些数据处理。基本上我已经加入了书籍数据集和评级数据集。

  1. // context for spark
  2. valspark=SparkSession.builder
  3. .master("local[*]")
  4. .appName("lambda")
  5. .getOrCreate()
  6. // SparkSession has implicits
  7. importspark.implicits._
  8. // book schema
  9. valbookSchema=StructType(
  10. StructField("ISBN", StringType, nullable=true) ::
  11. StructField("Title", StringType, nullable=true) ::
  12. StructField("Author", StringType, nullable=true) ::
  13. StructField("Year", IntegerType, nullable=true) ::
  14. StructField("Publisher", StringType, nullable=true) ::
  15. Nil
  16. )
  17. // rating schema
  18. valratingSchema=StructType(
  19. StructField("USER-ID", IntegerType, nullable=true) ::
  20. StructField("ISBN", IntegerType, nullable=true) ::
  21. StructField("Rating", IntegerType, nullable=true) ::
  22. Nil
  23. )
  24. // read books
  25. valbookDf=spark.read.format("csv")
  26. .option("header", value=true)
  27. .option("delimiter", ";")
  28. .option("mode", "DROPMALFORMED")
  29. .schema(bookSchema)
  30. .load(getClass.getResource("/rec_books.csv").getPath)
  31. .cache()
  32. .as("books")
  33. bookDf.printSchema()
  34. bookDf.show(10)
  35. /*
  36. output
  37. root
  38. |-- ISBN: string (nullable = true)
  39. |-- Title: string (nullable = true)
  40. |-- Author: string (nullable = true)
  41. |-- Year: integer (nullable = true)
  42. |-- Publisher: string (nullable = true)
  43. +----------+--------------------+--------------------+----+--------------------+
  44. | ISBN| Title| Author|Year| Publisher|
  45. +----------+--------------------+--------------------+----+--------------------+
  46. |0195153448| Classical Mythology| Mark P. O. Morford|2002|Oxford University...|
  47. |0002005018| Clara Callan|Richard Bruce Wright|2001|HarperFlamingo Ca...|
  48. |0060973129|Decision in Normandy| Carlo D'Este|1991| HarperPerennial|
  49. |0374157065|Flu: The Story of...| Gina Bari Kolata|1999|Farrar Straus Giroux|
  50. |0393045218|The Mummies of Ur...| E. J. W. Barber|1999|W. W. Norton & Co...|
  51. |0399135782|The Kitchen God's...| Amy Tan|1991| Putnam Pub Group|
  52. |0425176428|What If?: The Wor...| Robert Cowley|2000|Berkley Publishin...|
  53. |0671870432| PLEADING GUILTY| Scott Turow|1993| Audioworks|
  54. |0679425608|Under the Black F...| David Cordingly|1996| Random House|
  55. |074322678X|Where You'll Find...| Ann Beattie|2002| Scribner|
  56. +----------+--------------------+--------------------+----+--------------------+
  57. */
  58. // read ratings
  59. valratingDf=spark.read.format("csv")
  60. .option("header", value=true)
  61. .option("delimiter", ";")
  62. .option("mode", "DROPMALFORMED")
  63. .schema(ratingSchema)
  64. .load(getClass.getResource("/rec_ratings.csv").getPath)
  65. .cache()
  66. .as("ratings")
  67. ratingDf.printSchema()
  68. ratingDf.show(10)
  69. /*
  70. output
  71. root
  72. |-- USER-ID: integer (nullable = true)
  73. |-- ISBN: integer (nullable = true)
  74. |-- Rating: integer (nullable = true)
  75. +-------+----------+------+
  76. |USER-ID| ISBN|Rating|
  77. +-------+----------+------+
  78. | 276726| 155061224| 5|
  79. | 276727| 446520802| 0|
  80. | 276729| 521795028| 6|
  81. | 276733|2080674722| 0|
  82. | 276737| 600570967| 6|
  83. | 276745| 342310538| 10|
  84. | 276746| 425115801| 0|
  85. | 276746| 449006522| 0|
  86. | 276746| 553561618| 0|
  87. | 276746| 786013990| 0|
  88. +-------+----------+------+
  89. */
  90. // join dfs
  91. valjdf=ratingDf.join(bookDf, $"ratings.ISBN"===$"books.ISBN")
  92. .select(
  93. $"ratings.USER-ID".as("userId"),
  94. $"ratings.Rating".as("rating"),
  95. $"ratings.ISBN".as("isbn"),
  96. $"books.Title".as("title"),
  97. $"books.Author".as("author"),
  98. $"books.Year".as("year"),
  99. $"books.Publisher".as("publisher")
  100. )
  101. jdf.printSchema()
  102. jdf.show(10)
  103. /*
  104. output
  105. root
  106. |-- userId: integer (nullable = true)
  107. |-- rating: integer (nullable = true)
  108. |-- isbn: integer (nullable = true)
  109. |-- title: string (nullable = true)
  110. |-- author: string (nullable = true)
  111. |-- year: integer (nullable = true)
  112. |-- publisher: string (nullable = true)
  113. +------+------+---------+--------------------+------------+----+--------------------+
  114. |userId|rating| isbn| title| author|year| publisher|
  115. +------+------+---------+--------------------+------------+----+--------------------+
  116. |277378| 0|971880107| Wild Animus|Rich Shapero|2004| Too Far|
  117. |277157| 0|971880107| Wild Animus|Rich Shapero|2004| Too Far|
  118. |277042| 2|971880107| Wild Animus|Rich Shapero|2004| Too Far|
  119. |276954| 0|971880107| Wild Animus|Rich Shapero|2004| Too Far|
  120. |276939| 0|971880107| Wild Animus|Rich Shapero|2004| Too Far|
  121. |276925| 0|971880107| Wild Animus|Rich Shapero|2004| Too Far|
  122. |277195| 0|375406328| Lying Awake|Mark Salzman|2000| Alfred A. Knopf|
  123. |276953| 10|446310786|To Kill a Mocking...| Harper Lee|1988|Little Brown & Co...|
  124. |277168| 0|440225701| The Street Lawyer|JOHN GRISHAM|1999| Dell|
  125. |276925| 0|804106304| The Joy Luck Club| Amy Tan|1994|Prentice Hall (K-12)|
  126. +------+------+---------+--------------------+------------+----+--------------------+
  127. */
  128. // do some filtering for test
  129. jdf.filter(col("userId") ==="277378")
  130. .limit(5)
  131. .show()
  132. /*
  133. output
  134. +------+------+---------+--------------------+--------------+----+------------+
  135. |userId|rating| isbn| title| author|year| publisher|
  136. +------+------+---------+--------------------+--------------+----+------------+
  137. |277378| 0|971880107| Wild Animus| Rich Shapero|2004| Too Far|
  138. |277378| 7|312195516|The Red Tent (Bes...| Anita Diamant|1998| Picador USA|
  139. |277378| 7|670892963|Bridget Jones : T...|Helen Fielding|2000|Viking Books|
  140. |277378| 0|671028375| Fatal Voyage| Kathy Reichs|2002| Pocket|
  141. |277378| 0|670894494|The Passion of Ar...|Susan Vreeland|2002|Viking Books|
  142. +------+------+---------+--------------------+--------------+----+------------+
  143. */

构建模型

我使用交替最小二乘(ALS)协同过滤算法来构建推荐模型。ALS 是一种非常流行的推荐算法。Apache Spark-ML 库提供的 ALS 算法实现。ALS 算法使用以下训练参数。

  • userCol - 用户 ID 的列名。Id 必须是(或可以被强制转换为)整数。
  • itemCol - 产品ID的列名。Id 必须是(或可以被强制转换为)整数
  • ratingCol - 产品评分的列名。。
  • maxIter - 最大迭代次数 (>= 0)。
  • regParam - ALS 中的正则化参数,默认为 1.0 (>= 0)。
  • iterations - 要运行的迭代次数。
  • ColdStartStrategy - 当运行预测并且没有为特定用户/评分训练模型或未为用户找到项目时,如果给定“drop”作为参数值,它将删除这些用户。
  1. // build recommendation model with als algorithm
  2. valals=newALS()
  3. .setMaxIter(5)
  4. .setRegParam(0.01)
  5. .setUserCol("userId")
  6. .setItemCol("isbn")
  7. .setRatingCol("rating")
  8. valalsModel=als.fit(trainingData)
  9. // evaluate the als model
  10. // compute root mean square error(rmse) with test data for evaluation
  11. // set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
  12. alsModel.setColdStartStrategy("drop")
  13. valpredictions=alsModel.transform(testData)
  14. valevaluator=newRegressionEvaluator()
  15. .setMetricName("rmse")
  16. .setLabelCol("rating")
  17. .setPredictionCol("prediction")
  18. valrmse=evaluator.evaluate(predictions)
  19. println(s"root mean square error $rmse")
  20. /*
  21. output
  22. root mean square error 5.44073307705168
  23. */

预测推荐

一旦构建了 ALS 模型并对其进行了训练,我们就可以使用它来进行用户和商品的推荐。以下是我为用户和书籍所做的各种建议。

  1. // top 10 book recommendations for each user
  2. valallUserRec=alsModel.recommendForAllUsers(10)
  3. allUserRec.printSchema()
  4. allUserRec.show(10)
  5. /*
  6. output
  7. root
  8. |-- userId: integer (nullable = false)
  9. |-- recommendations: array (nullable = true)
  10. | |-- element: struct (containsNull = true)
  11. | | |-- isbn: integer (nullable = true)
  12. | | |-- rating: float (nullable = true)
  13. +------+--------------------+
  14. |userId| recommendations|
  15. +------+--------------------+
  16. | 496|[[373244630, 0.0]...|
  17. | 1238|[[373244630, 0.0]...|
  18. | 2366|[[689831390, 20.6...|
  19. | 3918|[[373244630, 0.0]...|
  20. | 4900|[[689841981, 37.4...|
  21. | 6336|[[142003069, 26.6...|
  22. | 6357|[[1714600, 0.0], ...|
  23. | 6397|[[345368924, 17.4...|
  24. | 6466|[[873529758, 19.0...|
  25. | 7253|[[736413057, 12.9...|
  26. +------+--------------------+
  27. */
  28. // top 10 user recommendations for each book
  29. valallBookRec=alsModel.recommendForAllItems(10)
  30. allBookRec.printSchema()
  31. allBookRec.show(10)
  32. /*
  33. output
  34. root
  35. |-- isbn: integer (nullable = false)
  36. |-- recommendations: array (nullable = true)
  37. | |-- element: struct (containsNull = true)
  38. | | |-- userId: integer (nullable = true)
  39. | | |-- rating: float (nullable = true)
  40. +--------+--------------------+
  41. | isbn| recommendations|
  42. +--------+--------------------+
  43. | 2250810|[[263979, 44.0867...|
  44. | 2740958|[[177870, 0.0], [...|
  45. | 3701387|[[202919, 116.593...|
  46. |20427115|[[263979, 135.824...|
  47. |23417706|[[177870, 0.0], [...|
  48. |27780260|[[10, 0.0], [20, ...|
  49. |28604458|[[106208, 75.9024...|
  50. |28616340|[[253797, 41.2638...|
  51. |28639693|[[41111, 60.72071...|
  52. |28644417|[[10, 0.0], [20, ...|
  53. +--------+--------------------+
  54. */
  55. // top 10 book recommendations for specific set of users(3 users)
  56. valuserRec=alsModel.recommendForUserSubset(jdf.select("userId").distinct().limit(3), 10)
  57. userRec.printSchema()
  58. userRec.show(10)
  59. /*
  60. output
  61. root
  62. |-- userId: integer (nullable = false)
  63. |-- recommendations: array (nullable = true)
  64. | |-- element: struct (containsNull = true)
  65. | | |-- isbn: integer (nullable = true)
  66. | | |-- rating: float (nullable = true)
  67. +------+--------------------+
  68. |userId| recommendations|
  69. +------+--------------------+
  70. | 496|[[1714600, 0.0], ...|
  71. +------+--------------------+
  72. */
  73. // top 10 user recommendations for specific set of books(3 books)
  74. valbookRec=alsModel.recommendForItemSubset(jdf.select("isbn").distinct().limit(3), 10)
  75. bookRec.printSchema()
  76. bookRec.show(10)
  77. /*
  78. output
  79. root
  80. |-- isbn: integer (nullable = false)
  81. |-- recommendations: array (nullable = true)
  82. | |-- element: struct (containsNull = true)
  83. | | |-- userId: integer (nullable = true)
  84. | | |-- rating: float (nullable = true)
  85. +---------+--------------------+
  86. | isbn| recommendations|
  87. +---------+--------------------+
  88. |440226430|[[263979, 28.0991...|
  89. |385314744|[[114434, 81.7188...|
  90. |872860175|[[226926, 116.011...|
  91. +---------+--------------------+
  92. */
  93. // top 10 book recommendations for user 277378
  94. valudf=jdf.select("userId").filter(col("userId") ===277378).limit(1)
  95. valuserRec277378=alsModel.recommendForUserSubset(udf, 10)
  96. userRec277378.printSchema()
  97. userRec277378.show(10)
  98. /*
  99. output
  100. root
  101. |-- userId: integer (nullable = false)
  102. |-- recommendations: array (nullable = true)
  103. | |-- element: struct (containsNull = true)
  104. | | |-- isbn: integer (nullable = true)
  105. | | |-- rating: float (nullable = true)
  106. +------+--------------------+
  107. |userId| recommendations|
  108. +------+--------------------+
  109. |277378|[[1563526514, 47....|
  110. +------+--------------------+
  111. */
标签: Spark Scala

“构建基于Spark-ML和Scala的协同过滤推荐系统”的评论:

还没有评论