

educoder - Spark Machine Learning

Level 1: Item-Based Recommendation Algorithm

  1. Recommend 2 items to user 2. Use the matrix computation library in spark.mllib to build a user-item rating matrix, compute similarity scores between items, and make recommendations based on them. This implements item-based collaborative filtering (Item CF).
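The parsing code below expects each line of ratingdata.txt to be a comma-separated `user,item,rating` triple. The sample values in the following sketch are hypothetical, purely to illustrate the format:

```scala
// Hypothetical illustration of the expected input format for ratingdata.txt:
// one "user,item,rating" triple per line, e.g. "2,3,4.5".
import org.apache.spark.mllib.linalg.distributed.MatrixEntry

object FormatSketch {
  def main(args: Array[String]): Unit = {
    val sampleLine = "2,3,4.5" // hypothetical rating: user 2 gave item 3 a score of 4.5
    val Array(user, item, rate) = sampleLine.split(",")
    val entry = MatrixEntry(user.toLong, item.toLong, rate.toDouble)
    println(entry) // MatrixEntry(2,3,4.5)
  }
}
```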
```scala
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.linalg.SparseVector
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, IndexedRow, MatrixEntry, RowMatrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object ItemBasedCF {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // Read the data
    val conf = new SparkConf().setAppName("ItemBasedCFModel").setMaster("local")
    val sc = new SparkContext(conf)
    val data = sc.textFile("/root/data/als/ratingdata.txt")

    /* MatrixEntry represents one entry of a distributed matrix.
     * Each entry is a tuple (i: Long, j: Long, value: Double),
     * where i is the row index, j is the column index, and value is the rating. */
    val parseData: RDD[MatrixEntry] =
      data.map(_.split(",") match { case Array(user, item, rate) => MatrixEntry(user.toLong, item.toLong, rate.toDouble) })

    // CoordinateMatrix is the Spark MLlib type for holding user_item_rating samples
    val ratings = new CoordinateMatrix(parseData)

    /* CoordinateMatrix has no columnSimilarities method, so convert it to a RowMatrix and call its
     * columnSimilarities, which computes column-to-column similarity. Since the data is user_item_rating
     * (rows are users, columns are items), unlike user-based CF no transpose is needed here:
     * the result is directly the item-item similarity. */
    val matrix: RowMatrix = ratings.toRowMatrix()

    // Goal: recommend items to one user. Basic logic: take the items the user has rated (bought),
    // compute the similarity of other items to them, sort in descending order, and recommend the
    // items that are not already in the user's rated list.
    // Example: recommend items to user 2.
    // Step 1: get the items rated (bought) by user 2. take(5) fetches all 5 user rows; index 2 is user 2.
    // SparseVector: the user's ratings as a sparse vector.
    val user2pred = matrix.rows.take(5)(2)
    val prefs: SparseVector = user2pred.asInstanceOf[SparseVector]
    val uitems = prefs.indices // IDs of the items rated (bought) by user 2
    val ipi = uitems zip prefs.values // (item ID, rating) pairs for the items user 2 has rated

    // Compute the item similarities
    val similarities = matrix.columnSimilarities()
    val indexdsimilar = similarities.toIndexedRowMatrix().rows.map {
      case IndexedRow(idx, vector) => (idx.toInt, vector)
    }

    // ij: similarity scores of other items against the items user 2 has bought
    val ij = sc.parallelize(ipi).join(indexdsimilar).flatMap {
      case (i, (pi, vector: SparseVector)) => vector.indices zip vector.values
    }

    /********** begin **********/
    // ij1: items (with scores) that are not in the list of items user 2 has bought
    val ij1 = ij.filter { case (item, pref) => !uitems.contains(item) }
    // Sum the scores per item, sort in descending order, and recommend the top two items
    val ij2 = ij1.reduceByKey(_ + _).sortBy(_._2, false).take(2)
    /********** end **********/

    // Uncomment the following line
    for (id <- ij2) print(id._1 + " ")
    sc.stop()
  }
}
```
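For reference, `columnSimilarities()` computes the cosine similarity between item (column) rating vectors. A minimal plain-Scala sketch of the same computation on two hypothetical item columns:

```scala
// Minimal sketch (hypothetical ratings) of the cosine similarity that
// RowMatrix.columnSimilarities computes between two item columns.
object CosineSketch {
  def cosine(a: Array[Double], b: Array[Double]): Double = {
    val dot   = a.zip(b).map { case (x, y) => x * y }.sum
    val normA = math.sqrt(a.map(x => x * x).sum)
    val normB = math.sqrt(b.map(x => x * x).sum)
    dot / (normA * normB)
  }

  def main(args: Array[String]): Unit = {
    // Ratings of two items by the same five users (0.0 = not rated)
    val item1 = Array(5.0, 3.0, 0.0, 1.0, 4.0)
    val item2 = Array(4.0, 0.0, 0.0, 1.0, 5.0)
    println(f"cosine similarity = ${cosine(item1, item2)}%.4f")
  }
}
```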

Level 2: User-Based Recommendation Algorithm

  1. Following the hints, complete the code in the editor on the right. Goal: find the 2 users most similar to user 1.
```scala
import org.apache.log4j.{Level, Logger}
import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry, RowMatrix}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object UserBasedCF {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // Create a SparkContext
    val conf = new SparkConf().setAppName("UserBasedCF").setMaster("local")
    val sc = new SparkContext(conf)

    // Read the data
    val data = sc.textFile("/root/data/als/ratingdata.txt")

    // Parse each line into an entry of the rating matrix
    val parseData: RDD[MatrixEntry] = data.map(_.split(",")
      match { case Array(user, item, rate) =>
        MatrixEntry(user.toLong, item.toLong, rate.toDouble)
      })

    // Build the coordinate matrix
    val ratings = new CoordinateMatrix(parseData)

    // Transpose the matrix so that columns are users, in order to compute column (user) similarities
    val matrix: RowMatrix = ratings.transpose().toRowMatrix()

    // Compute the user-user similarity matrix
    val similarities = matrix.columnSimilarities()

    // IDs of all the items rated by user 1
    val ratingOfUser1 = ratings.entries.filter(_.i == 1).
      map(x => (x.j, x.value)).
      sortBy(_._1).
      map(_._1).
      collect().
      toList.
      toArray

    // Similarities between user 1 and the other users
    val similarityOfUser1 = similarities.entries.filter(_.i == 1).
      sortBy(_.value, false).
      map(_.value).
      collect

    // Goal: recommend 2 items to user 1.
    // Idea: find the two users most similar to user 1, then recommend the items
    // those users have rated but user 1 has not.
    /********** begin **********/
    // Find the two users most similar to user 1
    val similarityTopUser = similarities.entries.filter(_.i == 1).
      sortBy(_.value, false).
      map(x => (x.j, x.value)).
      collect.
      take(2)

    // println("The two users most similar to user 1:")
    // Uncomment the following 2 lines
    for (s <- similarityTopUser) print(s._1 + " ")
    for (s <- similarityTopUser) {
      // Find the items this user has rated that user 1 has not
      val userId = s._1
      val ratingOfTemp = ratings.entries.filter(_.i == userId).
        map(x => (x.j, x.value)).
        sortBy(_._1).
        map(_._1).
        collect().
        toList.
        toArray

      // Set difference between the current user's items and user 1's items
      val dis = ratingOfTemp diff ratingOfUser1
      // println("Item IDs that user " + userId + " recommends to user 1:")
      for (id <- dis) print(id + " ")
    }
    /********** end **********/
    sc.stop()
  }
}
```
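The recommendation step above relies on Scala's `diff` to take the set difference of two item-ID arrays: items rated by the similar user but not yet rated by user 1. A minimal sketch with hypothetical item IDs:

```scala
// Minimal sketch (hypothetical item IDs) of the set-difference step used above.
object DiffSketch {
  def main(args: Array[String]): Unit = {
    val ratingOfTemp  = Array(1L, 3L, 5L, 7L) // items rated by the similar user
    val ratingOfUser1 = Array(1L, 2L, 3L)     // items already rated by user 1
    val dis = ratingOfTemp diff ratingOfUser1
    println(dis.mkString(" ")) // 5 7
  }
}
```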

Level 3: ALS-Based Recommendation Algorithm

  1. Following the hints, complete the code in the editor on the right: create an `ALS` model, then call its `fit` method on `training` to produce `model`.

```scala
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.sql.{DataFrame, SparkSession}

object ALS {
  case class Rating(userId: Int, movieId: Int, rating: Float)

  // Parse one "user,movie,rating" line into a Rating
  def parseRating(str: String): Rating = {
    val fields = str.split(",")
    assert(fields.size == 3)
    Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat)
  }

  def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .master("local")
      .appName("ALS")
      .getOrCreate()
    import spark.implicits._

    val ratings = spark.read.textFile("data/als/ratingdata.txt")
      .map(parseRating)
      .toDF()
    val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))

    /********** begin **********/
    // Build the recommendation model using ALS on the training data
    val als = new ALS()
      .setMaxIter(5)
      .setRegParam(0.01)
      .setUserCol("userId")
      .setItemCol("movieId")
      .setRatingCol("rating")
    val model = als.fit(training)
    /********** end **********/

    // Evaluate the model by computing the RMSE on the test data.
    // Set the cold start strategy to "drop" so we don't get NaN evaluation metrics.
    // Uncomment the following 3 lines
    model.setColdStartStrategy("drop")
    val predictions = model.transform(test)
    evaluatingRMSE(predictions)
    spark.stop()
  }

  def evaluatingRMSE(predictions: DataFrame): Unit = {
    val evaluator = new RegressionEvaluator()
      .setMetricName("rmse")
      .setLabelCol("rating")
      .setPredictionCol("prediction")
    val rmse = evaluator.evaluate(predictions)
    if (rmse <= 2) {
      print("\n" + "good")
    } else {
      println()
      predictions.show(false)
    }
  }
}
```
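Beyond the RMSE check, a fitted `ALSModel` can also produce top-N recommendations directly. A minimal sketch (not part of the exercise) that assumes the `model` trained above:

```scala
// Minimal sketch (not part of the exercise): top-N recommendations from the
// fitted ALSModel `model` defined above.
val userRecs  = model.recommendForAllUsers(2)  // top 2 movies for every user
val movieRecs = model.recommendForAllItems(2)  // top 2 users for every movie
userRecs.show(false)
movieRecs.show(false)
```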

Level 4: Predicting Loan Risk with a Random Forest

  1. Write a random forest binary classification model that predicts loan risk.
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.mllib.evaluation.RegressionMetrics

object Credit {
  // One record of the German credit data set
  case class Credit(
    creditability: Double,
    balance: Double, duration: Double, history: Double, purpose: Double, amount: Double,
    savings: Double, employment: Double, instPercent: Double, sexMarried: Double, guarantors: Double,
    residenceDuration: Double, assets: Double, age: Double, concCredit: Double, apartment: Double,
    credits: Double, occupation: Double, dependents: Double, hasPhone: Double, foreign: Double
  )

  def parseCredit(line: Array[Double]): Credit = {
    Credit(
      line(0),
      line(1) - 1, line(2), line(3), line(4), line(5),
      line(6) - 1, line(7) - 1, line(8), line(9) - 1, line(10) - 1,
      line(11) - 1, line(12) - 1, line(13), line(14) - 1, line(15) - 1,
      line(16) - 1, line(17) - 1, line(18) - 1, line(19) - 1, line(20) - 1
    )
  }

  def parseRDD(rdd: RDD[String]): RDD[Array[Double]] = {
    rdd.map(_.split(",")).map(_.map(_.toDouble))
  }

  def evaluatingAUC(predictedResultDF: DataFrame, labelstring: String): Unit = {
    val evaluator = new BinaryClassificationEvaluator().setLabelCol(labelstring).setRawPredictionCol("prediction")
    val predictionAUC = evaluator.setMetricName("areaUnderROC").evaluate(predictedResultDF)
    if (predictionAUC > 0.6) {
      print("\n" + "good")
    } else {
      print(s"areaUnderROC: $predictionAUC")
    }
  }

  def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .appName("Credit")
      .master("local")
      .getOrCreate()
    import spark.implicits._

    val creditDF = parseRDD(spark.sparkContext.textFile("/root/data/germancredit.csv")).map(parseCredit).toDF()
    creditDF.createTempView("credit")

    val featureCols = Array("balance", "duration", "history", "purpose", "amount",
      "savings", "employment", "instPercent", "sexMarried", "guarantors",
      "residenceDuration", "assets", "age", "concCredit", "apartment",
      "credits", "occupation", "dependents", "hasPhone", "foreign")

    /********** begin **********/
    // Assemble the feature columns into a single vector column
    val assembler = new VectorAssembler().setInputCols(featureCols).setOutputCol("features")
    val df2 = assembler.transform(creditDF)
    /********** end **********/

    // Uncomment the following 4 lines
    val labelIndexer = new StringIndexer().setInputCol("creditability").setOutputCol("label")
    val df3 = labelIndexer.fit(df2).transform(df2)
    val splitSeed = 5043
    val Array(trainingData, testData) = df3.randomSplit(Array(0.7, 0.3), splitSeed)

    /********** begin **********/
    // Call the random forest API and train on trainingData to produce the model
    val classifier = new RandomForestClassifier().setImpurity("gini").setMaxDepth(5).setNumTrees(20).setFeatureSubsetStrategy("auto").setSeed(5043)
    val model = classifier.fit(trainingData)
    /********** end **********/

    // Uncomment the following 2 lines
    val predictions = model.transform(testData)
    evaluatingAUC(predictions, "label")
    spark.stop()
  }
}
```
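As a follow-up (not part of the exercise), the fitted `RandomForestClassificationModel` exposes per-feature importances, which can be paired with the `featureCols` array defined above to see which attributes drive the predictions:

```scala
// Minimal sketch (not part of the exercise): rank the five most important features.
// Assumes `model` and `featureCols` from the code above.
featureCols.zip(model.featureImportances.toArray)
  .sortBy(-_._2)
  .take(5)
  .foreach { case (name, importance) => println(f"$name%-20s $importance%.4f") }
```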

Level 5: SMS Classification with a Multilayer Perceptron

  1. Write an SMS text classification program using the multilayer perceptron (MLP, Multi-Layer Perceptron Classifier) API in spark.ml: `MultilayerPerceptronClassifier`.
```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, Word2Vec}
import org.apache.spark.sql.{DataFrame, SparkSession}

object SMSClassifier {
  final val VECTOR_SIZE = 100

  def evaluatingAUC(predictedResultDF: DataFrame, labelcol: String): Unit = {
    val evaluator = new BinaryClassificationEvaluator().setLabelCol(labelcol).setRawPredictionCol("prediction")
    val predictionAUC = evaluator.setMetricName("areaUnderROC").evaluate(predictedResultDF)
    if (predictionAUC > 0.8) {
      print("\n" + "good")
    } else {
      print(s"areaUnderROC: $predictionAUC")
    }
  }

  def main(args: Array[String]) {
    val spark = SparkSession
      .builder
      .master("local")
      .appName("SMS Message Classification (HAM or SPAM)")
      .getOrCreate()

    // Each line of the data file is "label<TAB>message"; split the message into words
    val parsedRDD = spark.sparkContext.textFile("data/smsspamcollection/SMSSpamCollection").map(_.split("\t")).map(eachRow => {
      (eachRow(0), eachRow(1).split(" "))
    })
    val msgDF = spark.createDataFrame(parsedRDD).toDF("label", "message")

    // Index the string labels (ham/spam) as numeric labels
    val labelIndexer = new StringIndexer()
      .setInputCol("label")
      .setOutputCol("indexedLabel")
      .fit(msgDF)

    /********** begin **********/
    // Turn each message (sequence of words) into a fixed-size feature vector with Word2Vec
    val word2Vec = new Word2Vec()
      .setInputCol("message")
      .setOutputCol("features")
      .setVectorSize(VECTOR_SIZE)
      .setMinCount(1)
    /********** end **********/

    // Network shape: input = VECTOR_SIZE, two hidden layers (6 and 5), output = 2 classes
    val layers = Array[Int](VECTOR_SIZE, 6, 5, 2)

    /********** begin **********/
    val mlpc = new MultilayerPerceptronClassifier()
      .setLayers(layers)
      .setBlockSize(512)
      .setSeed(1234L)
      .setMaxIter(128)
      .setFeaturesCol("features")
      .setLabelCol("indexedLabel")
      .setPredictionCol("prediction")
    /********** end **********/

    // Map the numeric predictions back to the original string labels
    val labelConverter = new IndexToString()
      .setInputCol("prediction")
      .setOutputCol("predictedLabel")
      .setLabels(labelIndexer.labels)

    val Array(trainingData, testData) = msgDF.randomSplit(Array(0.8, 0.2))

    /********** begin **********/
    val pipeline = new Pipeline().setStages(Array(labelIndexer, word2Vec, mlpc, labelConverter))
    val model = pipeline.fit(trainingData)
    /********** end **********/

    // Uncomment the following 2 lines
    val predictionResultDF = model.transform(testData)
    evaluatingAUC(predictionResultDF, "indexedLabel")
    spark.stop()
  }
}
```
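Besides the AUC check above, overall accuracy can be computed with a `MulticlassClassificationEvaluator`. A minimal sketch (not part of the exercise), assuming the `predictionResultDF` produced above:

```scala
// Minimal sketch (not part of the exercise): overall accuracy of the MLP classifier.
// Assumes `predictionResultDF` from the code above.
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

val accEvaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
println(s"accuracy = ${accEvaluator.evaluate(predictionResultDF)}")
```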

This article is reprinted from: https://blog.csdn.net/qq_48664727/article/details/125177812
Copyright belongs to the original author 多多读书~. If there is any infringement, please contact us and we will remove it.
