Vectors
Creating
ml
import org.apache.spark.ml.linalg.Vectors
// Dense vectors
Vectors.dense(Array(0.0, 1.0, 2.0, 3.0))
Vectors.dense(1.0, 2.0, 3.0)
// Sparse vectors
Vectors.sparse(3, Array(0, 1), Array(1.0, 5.0))
Vectors.sparse(4, Seq((0, 1.0), (2, 9.0)))
mllib
import org.apache.spark.mllib.linalg.Vectors
// Dense vectors
Vectors.dense(Array(0.0, 1.0, 2.0, 3.0))
Vectors.dense(1.0, 2.0, 3.0)
// Sparse vectors
Vectors.sparse(3, Array(0, 1), Array(1.0, 5.0))
Vectors.sparse(4, Seq((0, 1.0), (2, 9.0)))
breeze
import breeze.linalg.Vector
Vector(Array(1,2,3,4))
Vector(1.0,2.0,1.0)
Dot product
// Dot product (the two vectors must have the same length)
val d: Double = v1.dot(sv)
println(d)
Iterating
v1.foreachActive((index, value) => {
  println((index, value))
})
// Output: (0,0.0) (1,1.0) (2,2.0) (3,3.0)
Element-wise operations on two vectors
// zip: pairs the elements of two Arrays into 2-tuples; if the arrays differ in length, the shorter one determines the result
v1.toArray.zip(v2.toArray) // Array((0.0,1.0), (1.0,2.0), (2.0,3.0))
// Adding two vectors
// Pattern matching is used to unpack each 2-tuple
Vectors.dense(v1.toArray.zip(sv.toArray).map { case (a, b) => a + b })
Vectors.dense(v1.toArray.zip(sv.toArray).map { case (a, b) => a - b })
Vectors.dense(v1.toArray.zip(sv.toArray).map { case (a, b) => a * b })
Vectors.dense(v1.toArray.zip(sv.toArray).map { case (a, b) => a / b })
Squared distance between two vectors
Vectors.sqdist(v1, v2) // squared distance between the two vectors (the square of the Euclidean distance)
Labeled vectors
import org.apache.spark.mllib.linalg.{Vectors, Vector}
import org.apache.spark.mllib.regression.LabeledPoint
val vr = Vectors.dense(Array(1.0, 2.0, 3.0)) // create a vector
val pos = LabeledPoint(1, vr)                // label the vector with class 1: (1.0,[1.0,2.0,3.0])
println(pos.features) // prints the feature vector: [1.0,2.0,3.0]
println(pos.label)    // prints the label: 1.0
Reading LIBSVM-format data
Data
1 1:2 2:3 3:4
2 1:1 2:9 3:10
1 1:3 2:3 3:7
1 1:2 2:2 3:6
import org.apache.spark.mllib.util.MLUtils
val mu = MLUtils.loadLibSVMFile(spark.sparkContext, "data/a")
mu.foreach(data => {
  println(data)
})
// Output:
// (1.0,(3,[0,1,2],[2.0,3.0,4.0]))
// (1.0,(3,[0,1,2],[3.0,3.0,7.0]))
// (1.0,(3,[0,1,2],[2.0,2.0,6.0]))
// (2.0,(3,[0,1,2],[1.0,9.0,10.0]))
Statistics
Method        Description
count         number of rows
max           column-wise maximum (returns a vector)
min           column-wise minimum (returns a vector)
mean          column-wise mean
normL1        L1 norm (Manhattan norm) of each column
normL2        L2 norm (Euclidean norm) of each column
numNonzeros   number of non-zero values in each column
variance      column-wise variance (returns a vector)
import org.apache.spark.mllib.linalg.Vectors
val seq = Seq(1.0, 2.0, 3.0, 4.0, 5.0, 0.0, 0.0)
val rdd = spark.sparkContext.makeRDD(seq).map(d => Vectors.dense(d))

import org.apache.spark.mllib.stat.Statistics
val sum = Statistics.colStats(rdd)
println("count:" + sum.count)             // number of rows
println("max:" + sum.max)                 // column-wise maximum (returns a vector)
println("min:" + sum.min)                 // column-wise minimum
println("mean:" + sum.mean)               // column-wise mean
println("normL1:" + sum.normL1)           // L1 norm (Manhattan norm)
println("normL2:" + sum.normL2)           // L2 norm (Euclidean norm)
println("numNonzeros:" + sum.numNonzeros) // number of non-zero values
println("variance:" + sum.variance)       // column-wise variance
Random numbers
import org.apache.spark.mllib.random.RandomRDDs._
// Create an RDD of 100 random numbers drawn from a standard normal distribution
val nnn = normalRDD(spark.sparkContext, 100)
nnn.collect().foreach(data => println(data))
Matrices
Creating
import org.apache.spark.ml.linalg.{Matrices, DenseMatrix, Vectors}
// Create a dense matrix (3 rows, 3 columns, values given in column-major order)
val m1 = Matrices.dense(3, 3, Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0))
// Create a diagonal matrix
val m2 = Matrices.diag(Vectors.dense(1.0, 2.0, 3.0))
// Create a sparse matrix: 9.0 at (0,0), 199.0 at (1,1), 1991.0 at (0,2), 0.0 everywhere else
val m3 = Matrices.sparse(3, 3, Array(0, 1, 2, 3), Array(0, 1, 0), Array(9.0, 199.0, 1991.0)).toDense
// Create an all-ones matrix
val m4 = Matrices.ones(3, 3)
// Create an all-zeros matrix
val m5 = Matrices.zeros(3, 3)
Common operations
m1.numRows              // number of rows
m1.numCols              // number of columns
m1.colIter              // Iterator[Vector] over the columns
m1.apply(1, 2)          // value at row 1, column 2
m1.copy                 // copy the matrix
m1.transpose            // transpose
m1.multiply(m2.toDense) // matrix multiplication
m1.multiply(Vectors.dense(Array(1.0, 2.0, 3.0))) // matrix-vector multiplication
m1.numNonzeros          // number of non-zero entries
m3.numActives           // number of explicitly stored (active) entries
m1.toSparseColMajor     // convert to a sparse matrix in column-major order
m1.toSparse             // convert to a sparse matrix
m3.toDense              // convert to a dense matrix
m3.compressedColMajor   // column-major representation compressed to whichever of dense/sparse is smaller
Iterating
// Iterate over (row, column, value)
m1.foreachActive((row, col, value) => {
  println(row, col, value)
})
Element-wise computation
import scala.collection.mutable.ArrayBuffer

// Add 10 to every element, column by column, and rebuild the matrix
val iter = m1.colIter
val buffer: ArrayBuffer[Double] = ArrayBuffer()
while (iter.hasNext) {
  val vector = iter.next()
  buffer.append(vector.toArray.map(_ + 10): _*)
}
Matrices.dense(m1.numRows, m1.numCols, buffer.toArray)
Breeze
Breeze is a library for machine learning and numerical computing.
Creating
import breeze.linalg._
import breeze.numerics._
// Create an all-zeros matrix (3 rows, 3 columns)
DenseMatrix.zeros[Double](3, 3)
// Create an all-ones matrix
DenseMatrix.ones[Int](3, 3)
// Create a vector filled with a given value (the first argument is the length)
DenseVector.fill(3)(2)
// Create a vector from 1 to 10 with step 2
DenseVector.range(1, 10, 2)
// Create a 3x3 identity matrix
DenseMatrix.eye[Double](3)
// Create a diagonal matrix with the given values
diag(DenseVector(1.0, 2.0, 3.0))
// Create a 2-dimensional matrix from rows
DenseMatrix((1.0, 2.0), (3.0, 4.0))
// Transpose a vector
DenseVector(1, 2, 3, 4).t
// Create a vector from a function
DenseVector.tabulate(4)(i => 2 * i) // DenseVector(0, 2, 4, 6)
// Create a matrix from a function; (i, j) are the row and column indices
DenseMatrix.tabulate(3, 2) { case (i, j) => i + j }
// Create a vector of random values between 0 and 1
DenseVector.rand(4)
// Create a matrix of random values between 0 and 1
DenseMatrix.rand[Double](2, 3)
Accessing elements
import breeze.linalg._
import breeze.numerics._
val a = DenseVector(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
val m = DenseMatrix((1.0, 2.0, 3.0), (4.0, 5.0, 6.0))
// Accessing vector elements
a(1 to 4)
a(5 to 0 by -1)
a(1 to -1)
// Accessing matrix elements
m(1, 1)
m(::, 1)
Collaborative filtering
ALS
val frame4 = data1.withColumn("rating", lit(1)).cache()

import org.apache.spark.ml.recommendation.ALS
val als = new ALS()
  .setMaxIter(15)
  .setRank(19)
  .setRegParam(0.01)
  .setImplicitPrefs(true)
  .setUserCol("user_id")
  .setItemCol("sku_id")
  .setRatingCol("rating")
  .setPredictionCol("prediction")
val model = als.fit(frame4)
val frame3 = model.transform(frame4)

// Evaluate the model
import org.apache.spark.ml.evaluation.RegressionEvaluator
val d = new RegressionEvaluator()
  .setPredictionCol("prediction")
  .setLabelCol("rating")
  .setMetricName("rmse")
  .evaluate(frame3)

// Top-5 product recommendations for each user (filtered to one user)
model.recommendForAllUsers(5).where("user_id = 8698").show
// Top-5 users each product is recommended to
model.recommendForAllItems(5).show

println("Model RMSE: " + d)

// Simple parameter sweep over rank and maxIter
for (i <- 5 until 20 by 2; j <- 5 until 21 by 5) {
  val als1 = new ALS()
    .setMaxIter(j)
    .setRank(i)
    .setRegParam(0.01)
    .setImplicitPrefs(true)
    .setUserCol("user_id")
    .setItemCol("sku_id")
    .setRatingCol("rating")
    .setPredictionCol("prediction" + i + j)
    .setSeed(100)
  val model1 = als1.fit(frame3)
  val frame5 = model1.transform(frame3)
  val d1 = new RegressionEvaluator()
    .setPredictionCol("prediction" + i + j)
    .setLabelCol("rating")
    .setMetricName("rmse")
    .evaluate(frame5)
  println("rank= " + i + " maxIter= " + j + " RMSE= " + d1)
}
Least squares
Objective function:
\text{objective} = \sum (\text{observed} - \text{predicted})^2
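As a minimal illustration of this objective, the sum of squared differences can be computed directly in plain Scala; the rating values below are made up for the example:

// Hypothetical observed ratings and the model's predicted ratings
val observed  = Seq(4.0, 3.0, 5.0, 1.0)
val predicted = Seq(3.8, 3.2, 4.5, 1.4)

// Least-squares objective: sum of squared differences
val objective = observed.zip(predicted).map { case (o, p) => math.pow(o - p, 2) }.sum
println(objective) // ≈ 0.49 (0.04 + 0.04 + 0.25 + 0.16)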
Collaborative filtering makes recommendations based on the similarity between one item (or user) and the others.
Computing similarity
package org.test.spark

import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable
import scala.util.Random

object t4 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("a")
    val sc = new SparkContext(conf)
    val users = sc.makeRDD(Array("aaa", "bbb", "ccc", "ddd", "eee"))
    val films = sc.makeRDD(Array("smzdm", "ylxb", "znh", "nhsc", "fcwr"))
    // Nested map: user -> (film -> rating)
    val source: mutable.Map[String, mutable.Map[String, Int]] = mutable.Map()
    // Give every user a random rating for every film
    val r = new Random()
    users.collect().foreach(user => {
      val map = mutable.Map[String, Int]()
      films.collect().foreach(film => {
        val i = r.nextInt(5)
        map.put(film, i)
      })
      source.put(user, map)
    })
    val user = "aaa"
    for (i <- users) {
      println(s"Similarity between $user and $i: ${getCollaborateSource(user, i, source)}")
    }
  }

  def getCollaborateSource(user1: String, user2: String,
                           source: mutable.Map[String, mutable.Map[String, Int]]): Double = {
    // Each user's ratings for every film
    val user1FilmSource = source(user1).values.toVector
    val user2FilmSource = source(user2).values.toVector
    // Numerator: dot product of the two rating vectors
    val member = user1FilmSource.zip(user2FilmSource).map(d => d._1 * d._2).sum
    // sqrt: square root, pow: power
    val temp1 = math.sqrt(user1FilmSource.map(num => math.pow(num, 2)).sum)
    val temp2 = math.sqrt(user2FilmSource.map(num => math.pow(num, 2)).sum)
    // Denominator: product of the two vector norms
    val denominator = temp1 * temp2
    // Result: cosine similarity
    member / denominator
  }
}
Jaccard similarity coefficient
u1: the set of items purchased by user 1
u2: the set of items purchased by user 2
sim_{u1u2} = \frac{|u1 \cap u2|}{|u1 \cup u2|}
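A minimal sketch of the formula in plain Scala, using two hypothetical sets of purchased item ids:

// Hypothetical sets of item ids purchased by two users
val u1 = Set("sku1", "sku2", "sku3", "sku4")
val u2 = Set("sku2", "sku4", "sku5")

// Jaccard similarity = |intersection| / |union|
val jaccard = u1.intersect(u2).size.toDouble / u1.union(u2).size
println(jaccard) // 2 / 5 = 0.4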
Cosine similarity
Formula, with \vec{a} = (X_1, Y_1) and \vec{b} = (X_2, Y_2):
\cos\theta = \frac{\vec{a} \cdot \vec{b}}{|\vec{a}|\,|\vec{b}|} = \frac{X_1X_2 + Y_1Y_2}{\sqrt{X_1^2 + Y_1^2} \cdot \sqrt{X_2^2 + Y_2^2}}
package org.spark.test

object t2 {
  def main(args: Array[String]): Unit = {
    // Create vectors
    import org.apache.spark.mllib.linalg.Vectors
    val v1 = Vectors.dense(1.0, 2.0)
    val v2 = Vectors.dense(3.0, 2.0)
    // Compute cosine similarity by hand
    val a = (v1(0) * v2(0)) + (v1(1) * v2(1))
    val b = math.sqrt(math.pow(v1(0), 2) + math.pow(v1(1), 2)) * math.sqrt(math.pow(v2(0), 2) + math.pow(v2(1), 2))
    println("Numerator: " + a)
    println("Denominator: " + b)
    println("Result: " + a / b)
    // Using library functions
    println(v1.dot(v2) / (Vectors.norm(v1, 2) * Vectors.norm(v2, 2)))
  }
}
Feature preprocessing package
import org.apache.spark.ml.feature
Tokenizer
This class splits text into an array of word tokens.
Note: it only splits on whitespace.
package org.test

import org.apache.spark.sql.SparkSession

object tokenizer {
  def main(args: Array[String]): Unit = {
    val spark = new SparkSession.Builder()
      .master("local[*]")
      .config("hive.metastore.uris", "thrift://bigdata1:9083")
      .config("spark.sql.warehouse", "hdfs://bigdata1:9000/user/hive.warehouse")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    val data = Seq("aasd b s dfd f", "ads s fd f g", "af s dfd")
    val frame = spark.sparkContext.makeRDD(data).toDF("text")

    import org.apache.spark.ml.feature.Tokenizer
    val tokenizer1 = new Tokenizer().setInputCol("text").setOutputCol("word").transform(frame)
    tokenizer1.show()
  }
}

+--------------+--------------------+
|          text|                word|
+--------------+--------------------+
|aasd b s dfd f|[aasd, b, s, dfd, f]|
|  ads s fd f g|  [ads, s, fd, f, g]|
|      af s dfd|        [af, s, dfd]|
+--------------+--------------------+
RegexTokenizer
Splits text into tokens; the delimiter can be specified as a regular expression.
Parameters
Parameter         Description
setPattern        regular expression used as the delimiter pattern
setToLowercase    whether to convert the tokens to lowercase (default true)
package org.test

import org.apache.spark.sql.SparkSession

object tokenizer {
  def main(args: Array[String]): Unit = {
    val spark = new SparkSession.Builder()
      .master("local[*]")
      .config("hive.metastore.uris", "thrift://bigdata1:9083")
      .config("spark.sql.warehouse", "hdfs://bigdata1:9000/user/hive.warehouse")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    val data = Seq("aAsd|b|s|dfd|f", "aDs|s|fd|f|g", "aF|s|Dfd")
    val frame = spark.sparkContext.makeRDD(data).toDF("text")

    import org.apache.spark.ml.feature.RegexTokenizer
    val frame1 = new RegexTokenizer()
      .setInputCol("text")
      .setOutputCol("word")
      .setPattern("\\|")
      .setToLowercase(false)
      .transform(frame)
    frame1.show()
  }
}

+--------------+--------------------+
|          text|                word|
+--------------+--------------------+
|aAsd|b|s|dfd|f|[aAsd, b, s, dfd, f]|
|  aDs|s|fd|f|g|  [aDs, s, fd, f, g]|
|      aF|s|Dfd|        [aF, s, Dfd]|
+--------------+--------------------+
StringIndexer
Fits string values to numeric indices ordered by frequency: the more often a string appears, the smaller its index, starting from 0.0.
package org.test

import org.apache.spark.sql.SparkSession

object stringIndex {
  def main(args: Array[String]): Unit = {
    val spark = new SparkSession.Builder()
      .master("local[*]")
      .config("hive.metastore.uris", "thrift://bigdata1:9083")
      .config("spark.sql.warehouse", "hdfs://bigdata1:9000/user/hive.warehouse")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    val frame = spark.sparkContext.makeRDD(Seq(("a", 1.0), ("c", 8.0), ("v", 7.0), ("b", 6.0), ("a", 5.0), ("c", 4.0), ("a", 3.0))).toDF("name", "f1")

    import org.apache.spark.ml.feature.StringIndexer
    val frame1 = new StringIndexer()
      .setInputCol("name")
      .setOutputCol("nameIndex")
      .setHandleInvalid("skip") // skip unknown values
      .fit(frame)
      .transform(frame)
    frame1.show()
  }
}
IndexToString
Converts indices back to the original string values.
package org.test

import org.apache.spark.sql.SparkSession

object stringIndex {
  def main(args: Array[String]): Unit = {
    val spark = new SparkSession.Builder()
      .master("local[*]")
      .config("hive.metastore.uris", "thrift://bigdata1:9083")
      .config("spark.sql.warehouse", "hdfs://bigdata1:9000/user/hive.warehouse")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    val frame = spark.sparkContext.makeRDD(Seq(("a", 1.0), ("c", 8.0), ("v", 7.0), ("b", 6.0), ("a", 5.0), ("c", 4.0), ("a", 3.0))).toDF("name", "f1")

    import org.apache.spark.ml.feature.StringIndexer
    val frame1 = new StringIndexer()
      .setInputCol("name")
      .setOutputCol("nameIndex")
      .fit(frame)
      .transform(frame)

    import org.apache.spark.ml.feature.IndexToString
    val frame2 = new IndexToString()
      .setInputCol("nameIndex")
      .setOutputCol("name")
      .transform(frame1.drop("name"))
    frame2.show()
  }
}
StandardScaler
Variance-based standardization
Mean: M = \frac{\sum_{i=1}^{n} x_i}{n}
Variance: S = \frac{\sum_{i=1}^{n} (x_i - M)^2}{n}
Mean-variance standardization: X_{scale} = \frac{x - M}{\sqrt{S}}
Note: by default Spark's StandardScaler only divides by the sample standard deviation and does not subtract the mean, which is why the output below is simply each value divided by the standard deviation.
package org.test

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

object standardScaler {
  def main(args: Array[String]): Unit = {
    val spark = new SparkSession.Builder()
      .master("local[*]")
      .config("hive.metastore.uris", "thrift://bigdata1:9083")
      .config("spark.sql.warehouse", "hdfs://bigdata1:9000/user/hive.warehouse")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    val frame = spark.sparkContext.makeRDD(Seq(
      ("a", Vectors.dense(50.0), 1.0),
      ("c", Vectors.dense(83.0), 2.0),
      ("v", Vectors.dense(75.0), 3.0),
      ("b", Vectors.dense(63.0), 4.0),
      ("a", Vectors.dense(54.0), 100.0),
      ("c", Vectors.dense(41.0), 20.0),
      ("a", Vectors.dense(3.0), 5.0)
    )).toDF("name", "f1", "f2")

    import org.apache.spark.ml.feature.StandardScaler
    val frame1 = new StandardScaler()
      .setInputCol("f1")
      .setOutputCol("f1Scaler")
      .fit(frame)
      .transform(frame)
    frame1.show()
  }
}

+----+------+-----+--------------------+
|name|    f1|   f2|            f1Scaler|
+----+------+-----+--------------------+
|   a|[50.0]|  1.0| [1.904058883550657]|
|   c|[83.0]|  2.0|[3.1607377466940907]|
|   v|[75.0]|  3.0|[2.8560883253259854]|
|   b|[63.0]|  4.0| [2.399114193273828]|
|   a|[54.0]|100.0|[2.0563835942347097]|
|   c|[41.0]| 20.0|[1.5613282845115388]|
|   a| [3.0]|  5.0|[0.11424353301303...|
+----+------+-----+--------------------+
MinMaxScaler
Min-max normalization
X = \frac{x - \min(x)}{\max(x) - \min(x)}
package org.test

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

object minMaxScaler {
  def main(args: Array[String]): Unit = {
    val spark = new SparkSession.Builder()
      .master("local[*]")
      .config("hive.metastore.uris", "thrift://bigdata1:9083")
      .config("spark.sql.warehouse", "hdfs://bigdata1:9000/user/hive.warehouse")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    val frame = spark.sparkContext.makeRDD(Seq(
      ("a", Vectors.dense(50.0), 1.0),
      ("c", Vectors.dense(83.0), 2.0),
      ("v", Vectors.dense(75.0), 3.0),
      ("b", Vectors.dense(63.0), 4.0),
      ("a", Vectors.dense(54.0), 100.0),
      ("c", Vectors.dense(41.0), 20.0),
      ("a", Vectors.dense(3.0), 5.0)
    )).toDF("name", "f1", "f2")

    import org.apache.spark.ml.feature.MinMaxScaler
    val frame1 = new MinMaxScaler()
      .setInputCol("f1")
      .setOutputCol("f1Scaler")
      .fit(frame)
      .transform(frame)
    frame1.show()
  }
}

+----+------+-----+--------------------+
|name|    f1|   f2|            f1Scaler|
+----+------+-----+--------------------+
|   a|[50.0]|  1.0|            [0.5875]|
|   c|[83.0]|  2.0|               [1.0]|
|   v|[75.0]|  3.0|               [0.9]|
|   b|[63.0]|  4.0|              [0.75]|
|   a|[54.0]|100.0|[0.6375000000000001]|
|   c|[41.0]| 20.0|[0.47500000000000...|
|   a| [3.0]|  5.0|               [0.0]|
+----+------+-----+--------------------+
Binarizer
Binarizes a column against a threshold: values below the threshold become 0.0 and values above it become 1.0.
package org.test

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

object binarizer {
  def main(args: Array[String]): Unit = {
    val spark = new SparkSession.Builder()
      .master("local[*]")
      .config("hive.metastore.uris", "thrift://bigdata1:9083")
      .config("spark.sql.warehouse", "hdfs://bigdata1:9000/user/hive.warehouse")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    val frame = spark.sparkContext.makeRDD(Seq(
      ("a", Vectors.dense(50.0), 1.0),
      ("c", Vectors.dense(83.0), 2.0),
      ("v", Vectors.dense(75.0), 3.0),
      ("b", Vectors.dense(63.0), 4.0),
      ("a", Vectors.dense(54.0), 100.0),
      ("c", Vectors.dense(41.0), 20.0),
      ("a", Vectors.dense(3.0), 5.0)
    )).toDF("name", "f1", "f2")

    import org.apache.spark.ml.feature.Binarizer
    val frame1 = new Binarizer()
      .setInputCol("f1")
      .setOutputCol("binarizer")
      .setThreshold(60.0)
      .transform(frame)
    frame1.show()
  }
}

+----+------+-----+---------+
|name|    f1|   f2|binarizer|
+----+------+-----+---------+
|   a|[50.0]|  1.0|    [0.0]|
|   c|[83.0]|  2.0|    [1.0]|
|   v|[75.0]|  3.0|    [1.0]|
|   b|[63.0]|  4.0|    [1.0]|
|   a|[54.0]|100.0|    [0.0]|
|   c|[41.0]| 20.0|    [0.0]|
|   a| [3.0]|  5.0|    [0.0]|
+----+------+-----+---------+
Bucketizer
Multi-bucket discretizer: assigns each value to a bucket defined by the given split points.
package org.test

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

object bucketizer {
  def main(args: Array[String]): Unit = {
    val spark = new SparkSession.Builder()
      .master("local[*]")
      .config("hive.metastore.uris", "thrift://bigdata1:9083")
      .config("spark.sql.warehouse", "hdfs://bigdata1:9000/user/hive.warehouse")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    val frame = spark.sparkContext.makeRDD(Seq(
      ("a", Vectors.dense(50.0), 1.0),
      ("c", Vectors.dense(83.0), 2.0),
      ("v", Vectors.dense(75.0), 3.0),
      ("b", Vectors.dense(63.0), 4.0),
      ("a", Vectors.dense(54.0), 100.0),
      ("c", Vectors.dense(41.0), 20.0),
      ("a", Vectors.dense(3.0), 5.0)
    )).toDF("name", "f1", "f2")

    import org.apache.spark.ml.feature.Bucketizer
    val frame1 = new Bucketizer()
      .setInputCol("f2")
      .setOutputCol("bucketizer")
      .setSplits(Array(Double.NegativeInfinity, 5.0, 10.0, 15.0, 50.0, Double.PositiveInfinity))
      .transform(frame)
    frame1.show()
  }
}

+----+------+-----+----------+
|name|    f1|   f2|bucketizer|
+----+------+-----+----------+
|   a|[50.0]|  1.0|       0.0|
|   c|[83.0]|  2.0|       0.0|
|   v|[75.0]|  3.0|       0.0|
|   b|[63.0]|  4.0|       0.0|
|   a|[54.0]|100.0|       4.0|
|   c|[41.0]| 20.0|       3.0|
|   a| [3.0]|  5.0|       1.0|
+----+------+-----+----------+
VectorAssembler
Combines multiple feature columns into a single feature vector, as shown in the sketch below.
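A minimal sketch of typical VectorAssembler usage; the DataFrame and the column names f1/f2 are hypothetical, and a SparkSession with spark.implicits._ imported is assumed:

import org.apache.spark.ml.feature.VectorAssembler

// Hypothetical DataFrame with two numeric feature columns (requires spark.implicits._)
val df = Seq((1.0, 2.0), (3.0, 4.0)).toDF("f1", "f2")

val assembled = new VectorAssembler()
  .setInputCols(Array("f1", "f2")) // columns to merge
  .setOutputCol("features")        // resulting vector column
  .transform(df)

assembled.show() // the features column contains [1.0,2.0] and [3.0,4.0]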
Classification package
org.apache.spark.ml.classification
Logistic regression
Parameters
Parameter             Description
setMaxIter            maximum number of iterations (default 100)
setRegParam           regularization parameter (default 0.0); helps prevent overfitting
setTol                convergence tolerance; training stops once the change falls below this value (default 1e-6)
setElasticNetParam    mixing ratio between L1 and L2 regularization (0.0 means pure L2)
setStandardization    whether to standardize the features before fitting (default true)
Sigmoid function
Linear combination: X = \sum_{i=1}^{n} x_i w_i + d
Sigmoid: g(X) = \frac{1}{1 + e^{-X}}
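A minimal sketch of the sigmoid applied to a linear combination, in plain Scala; the weights, bias, and input values are made up for the example:

// Sigmoid: maps any real number into (0, 1)
def sigmoid(x: Double): Double = 1.0 / (1.0 + math.exp(-x))

// Linear combination X = x1*w1 + x2*w2 + d with hypothetical weights
val x = Seq(2.0, 3.0)
val w = Seq(0.4, -0.2)
val d = 0.1
val z = x.zip(w).map { case (xi, wi) => xi * wi }.sum + d // 0.8 - 0.6 + 0.1 = 0.3

println(sigmoid(z)) // ≈ 0.574, interpreted as the probability of the positive class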
Loss function
A metric for judging how good the parameters are; it is the tool used to solve for the optimal parameters.
A small loss means the model fits the training set well and the parameters are good.
A large loss means the model fits the training set poorly (underfitting) and the parameters are bad.
y_i: the true label
y_w(x_i): the predicted value
Loss: J(w) = -\sum_{i=1}^{n} \left( y_i \log(y_w(x_i)) + (1 - y_i) \log(1 - y_w(x_i)) \right)
Regularization
A term used to prevent the model from overfitting; even though linear models tend toward underfitting, the regularization coefficient still helps us tune the model.
w denotes the coefficients (slopes and intercept), and w_0 denotes the intercept.
L1 regularization: J(w)_{reg} = J(w) + \sum_{i=1}^{n} |w_i|
L2 regularization: J(w)_{reg} = J(w) + \sqrt{\sum_{i=1}^{n} w_i^2}
案例
packageorg.spark.tttimportorg.apache.spark.sql.SparkSession
importorg.apache.log4j.{Level, Logger}importorg.apache.spark.sql.functions._
object t1 {def main(args: Array[String]):Unit={
Logger.getRootLogger.setLevel(Level.ERROR)val spark = SparkSession.builder().master("local[*]").getOrCreate()importspark.implicits._
// 获取数据var data = spark.read.option("header","true").csv("data/Customer_support_data.csv")// 删除不需要的列和空值过多的列
data = data.drop("Customer Remarks","order_date_time","Customer_City","Product_category","Item_price","connected_handling_time","Order_id")// 将String转换为indeximportorg.apache.spark.ml.feature.StringIndexer
val string_index =new StringIndexer().setInputCols(
data.drop("Unique id","CSAT Score").columns).setOutputCols(
data.drop("Unique id","CSAT Score").columns.map(r => r +"_index")).setHandleInvalid("skip")// 跳过未知值.fit(data)val data_index = string_index.transform(data)// 将特征向量化importorg.apache.spark.ml.feature.VectorAssembler
val assembler =new VectorAssembler().setInputCols(
data.drop("Unique id","CSAT Score").columns.map(r => r +"_index")).setOutputCol("features")val frame = assembler.transform(data_index).withColumn("CSAT Score", col("CSAT Score").cast("Double"))// 划分数据集val Array(train, test)= frame.randomSplit(Array(0.7,0.3),100)// 使用逻辑回归训练模型importorg.apache.spark.ml.classification.LogisticRegression
val model =new LogisticRegression().setMaxIter(100).setRegParam(0.1).setTol(0.00000001).setLabelCol("CSAT Score").setFeaturesCol("features").setPredictionCol("prediction").fit(train)val frame1 = model.transform(test)// 检测精确度importorg.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val d1 =new MulticlassClassificationEvaluator().setLabelCol("CSAT Score").setPredictionCol("prediction").setMetricName("accuracy").evaluate(frame1)
println("精确度:"+ d1)}}
Decision tree
Parameters
Parameter                 Description
setMaxDepth               maximum depth of the tree (at most 30)
setMinInstancesPerNode    minimum number of instances a child node must have; splits producing smaller branches are pruned
setMinInfoGain            minimum information gain (default 0.0); splits with a smaller gain are pruned
Impurity
Gini impurity
P(X=0) is the probability that the label equals 0. For example:
P(X=0) = 0.5, P(X=1) = 0.5
Impurity = P(X=0)(1 - P(X=0)) + P(X=1)(1 - P(X=1)) = 0.25 + 0.25 = 0.5
\text{Gini} = \sum_{k=1}^{K} P(X=k)\,(1 - P(X=k))
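A minimal sketch that computes Gini impurity from class counts in plain Scala (the counts are made up), reproducing the 0.5 example above:

// Gini impurity from class counts
def gini(counts: Seq[Int]): Double = {
  val total = counts.sum.toDouble
  counts.map(c => (c / total) * (1 - c / total)).sum
}

println(gini(Seq(5, 5)))  // 0.5 -> maximally impure for two equally likely classes
println(gini(Seq(10, 0))) // 0.0 -> pure node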
package org.spark.ttt

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.log4j.{Logger, Level}

object t3 {
  def main(args: Array[String]): Unit = {
    Logger.getRootLogger.setLevel(Level.ERROR)
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // Load the data
    val data = spark.read.option("header", "true").csv("data/Customer_support_data.csv")
    // Drop unneeded columns and columns with too many missing values
    val frame = data.drop("Customer Remarks", "Order_id", "order_date_time", "Customer_City", "Product_category", "Item_price", "connected_handling_time", "Issue_reported at", "issue_responded")

    // Convert String columns to indices
    import org.apache.spark.ml.feature.StringIndexer
    val index_model = new StringIndexer()
      .setInputCols(frame.drop("Unique id", "CSAT Score").columns)
      .setOutputCols(frame.drop("Unique id", "CSAT Score").columns.map(r => r + "_index"))
      .setHandleInvalid("skip")
      .fit(frame)
    val frame_index = index_model.transform(frame)

    // Assemble the features into a vector
    import org.apache.spark.ml.feature.VectorAssembler
    val assembler = new VectorAssembler()
      .setInputCols(frame.drop("Unique id", "CSAT Score").columns.map(r => r + "_index"))
      .setOutputCol("features")
    val frame_assembler = assembler.transform(frame_index).withColumn("CSAT Score", col("CSAT Score").cast("Double"))

    // Split into train and test sets
    val Array(train, test) = frame_assembler.randomSplit(Array(0.8, 0.2), 100)

    // Train the model
    import org.apache.spark.ml.classification.DecisionTreeClassifier
    val classifier = new DecisionTreeClassifier()
      .setMaxDepth(5)
      .setMaxBins(1373)
      .setLabelCol("CSAT Score")
      .setFeaturesCol("features")
      .setPredictionCol("prediction")
    val model = classifier.fit(train)
    val frame1 = model.transform(test)

    // Evaluate accuracy
    import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
    val d = new MulticlassClassificationEvaluator()
      .setLabelCol("CSAT Score")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
      .evaluate(frame1)

    // // Check the percentage of missing values per column
    // val count = data.count()
    // data.select(data.columns.map(r => round(sum(when(col(r).isNull || col(r).isNaN, 1).otherwise(0)) / count * 100, 1).as(r)): _*).show

    // Print the tree structure
    println(model.toDebugString)
    println("Accuracy: " + d)

    // for (i <- 1 until 20 by 2) {
    //   val classifier = new DecisionTreeClassifier().setMaxDepth(i).setMaxBins(1373).setLabelCol("CSAT Score").setFeaturesCol("features").setPredictionCol("prediction")
    //   val model = classifier.fit(train)
    //   val frame1 = model.transform(test)
    //   val d = new MulticlassClassificationEvaluator().setLabelCol("CSAT Score").setPredictionCol("prediction").setMetricName("accuracy").evaluate(frame1)
    //   println("Accuracy: " + d)
    // }
  }
}
Random forest
Parameters
Parameter                   Description
setNumTrees                 number of trees; more trees generally means higher accuracy (beyond a certain point it just fluctuates) and longer training time (default 20)
setMaxDepth                 maximum depth of each tree (default 5)
setFeatureSubsetStrategy    feature subset selection strategy (default "auto")
setImpurity                 impurity measure ("gini" or "entropy", default "gini")
package org.spark.ttt

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import scala.collection.mutable.ArrayBuffer

object t4 {
  def main(args: Array[String]): Unit = {
    Logger.getRootLogger.setLevel(Level.ERROR)
    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // Load the data
    val data = spark.read.option("header", "true").csv("data/Customer_support_data.csv")
    // Drop unneeded columns and columns with too many missing values
    val frame = data.drop("Customer Remarks", "Order_id", "order_date_time", "Customer_City", "Product_category", "Item_price", "connected_handling_time", "Issue_reported at", "issue_responded")

    // Convert String columns to indices
    import org.apache.spark.ml.feature.StringIndexer
    val index_model = new StringIndexer()
      .setInputCols(frame.drop("Unique id", "CSAT Score").columns)
      .setOutputCols(frame.drop("Unique id", "CSAT Score").columns.map(r => r + "_index"))
      .setHandleInvalid("skip")
      .fit(frame)
    val frame_index = index_model.transform(frame)

    // Assemble the features into a vector
    import org.apache.spark.ml.feature.VectorAssembler
    val assembler = new VectorAssembler()
      .setInputCols(frame.drop("Unique id", "CSAT Score").columns.map(r => r + "_index"))
      .setOutputCol("features")
    val frame_assembler = assembler.transform(frame_index).withColumn("CSAT Score", col("CSAT Score").cast("Double"))

    // Split into train and test sets
    val Array(train, test) = frame_assembler.randomSplit(Array(0.8, 0.2), 24)

    // Train the model
    import org.apache.spark.ml.classification.RandomForestClassifier
    val classifier = new RandomForestClassifier()
      .setMaxDepth(10)
      .setNumTrees(50)
      .setFeatureSubsetStrategy("all")
      .setMaxBins(1373)
      .setSeed(100)
      .setFeaturesCol("features")
      .setPredictionCol("prediction")
      .setLabelCol("CSAT Score")
    val model = classifier.fit(train)
    val frame1 = model.transform(test)

    // Evaluate the model
    import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
    val d = new MulticlassClassificationEvaluator()
      .setLabelCol("CSAT Score")
      .setPredictionCol("prediction")
      .setMetricName("accuracy")
      .evaluate(frame1)
    println("Accuracy: " + d)

    // Grid search over numTrees and maxDepth
    import scala.collection.mutable
    val buffer: mutable.ArrayBuffer[(Int, Int, Double)] = mutable.ArrayBuffer()
    for (numTree <- 10 until 101 by 10; maxDepth <- 2 until 23 by 4) {
      val classifier = new RandomForestClassifier()
        .setMaxDepth(maxDepth)
        .setNumTrees(numTree)
        .setFeatureSubsetStrategy("all")
        .setMaxBins(1373)
        .setSeed(100)
        .setFeaturesCol("features")
        .setPredictionCol("prediction")
        .setLabelCol("CSAT Score")
      val frame2 = classifier.fit(train).transform(test)
      val d1 = new MulticlassClassificationEvaluator()
        .setLabelCol("CSAT Score")
        .setPredictionCol("prediction")
        .setMetricName("accuracy")
        .evaluate(frame2)
      println("maxDepth=" + maxDepth + " numTree=" + numTree + " accuracy: " + d1)
      buffer.append((maxDepth, numTree, d1))
    }
    val head = buffer.sortBy(-_._3).head
    println("maxDepth=" + head._1 + " numTree=" + head._2 + " best accuracy: " + head._3)
  }
}
Naive Bayes
Naive Bayes does not need an iterative model-fitting step in the usual sense; it computes probabilities directly from the given data.
Parameters
Parameter        Description
setModelType     the type of Naive Bayes model to use (e.g. "multinomial", "bernoulli", "complement")
setSmoothing     smoothing value (prevents zero probabilities)
Probability
P(X=x, Y=y): the probability that X takes the value x and Y takes the value y at the same time (joint probability)
P(Y=y \mid X=x): the probability that Y takes the value y given that X takes the value x (conditional probability)
Bayes' theorem
P(X,Y) = P(Y \mid X)\,P(X) = P(X \mid Y)\,P(Y)
P(Y \mid X) = \frac{P(X \mid Y)\,P(Y)}{P(X)}
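A minimal numeric illustration of the equation in plain Scala; all of the counts below are made up:

// Hypothetical counts: out of 100 samples, 30 have Y=1; of those, 12 also have X=1;
// X=1 occurs in 40 samples overall.
val pY       = 30.0 / 100 // P(Y=1)
val pXgivenY = 12.0 / 30  // P(X=1 | Y=1)
val pX       = 40.0 / 100 // P(X=1)

// Bayes' theorem: P(Y=1 | X=1) = P(X=1 | Y=1) * P(Y=1) / P(X=1)
val pYgivenX = pXgivenY * pY / pX
println(pYgivenX) // ≈ 0.3 (indeed, 12 of the 40 samples with X=1 have Y=1)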
Formula
\prod_{i=1}^{n} P(X_i \mid Y=1): the probability of each feature X_i given Y=1, all multiplied together
P(Y=1 \mid \hat{X}) = \frac{P(Y=1)\,\prod_{i=1}^{n} P(X_i \mid Y=1)}{P(\hat{X})}
package org.test

import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.functions._

object naiveBayes {
  def main(args: Array[String]): Unit = {
    Logger.getRootLogger.setLevel(Level.ERROR)
    val spark = new SparkSession.Builder()
      .master("local[*]")
      .config("hive.metastore.uris", "thrift://bigdata1:9083")
      .config("spark.sql.warehouse", "hdfs://bigdata1:9000/user/hive.warehouse")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    val frame = spark.read.option("header", "true")
      .csv("/opt/module/IdeaProjects/flink7/data/dating.txt")
      .select(col("milage").cast("Double"), col("Liters").cast("Double"), col("Consumtime").cast("Double"), col("target").cast("Double"))
    val Array(train, test) = frame.randomSplit(Array(0.8, 0.2))

    import org.apache.spark.ml.feature.VectorAssembler
    val assembler = new VectorAssembler()
      .setInputCols(train.drop("target").columns)
      .setOutputCol("f")

    import org.apache.spark.ml.classification.NaiveBayes
    val bayes = new NaiveBayes()
      .setLabelCol("target")
      .setFeaturesCol("f")
      .setPredictionCol("p")
      .setModelType("complement")
      .setSmoothing(1.0)
    val model = bayes.fit(assembler.transform(train))
    val frame1 = model.transform(assembler.transform(test))

    import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
    val d = new MulticlassClassificationEvaluator()
      .setLabelCol("target")
      .setPredictionCol("p")
      .setMetricName("accuracy")
      .evaluate(frame1)
    println(d)
    frame1.select(col("target"), col("p")).show(false)
  }
}
Clustering package
org.apache.spark.ml.clustering
KMeans
Parameters
Parameter     Description
setK          number of clusters to predict
setMaxIter    maximum number of iterations
package org.ml

import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.functions._

object kMeans {
  def main(args: Array[String]): Unit = {
    Logger.getRootLogger.setLevel(Level.ERROR)
    val spark = new SparkSession.Builder()
      .master("local[*]")
      .config("hive.metastore.uris", "thrift://bigdata1:9083")
      .config("spark.sql.warehouse", "hdfs://bigdata1:9000/user/hive.warehouse")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    val frame = spark.read.option("header", "true")
      .csv("/opt/module/IdeaProjects/flink7/data/dating.txt")
      .select(col("milage").cast("Double"), col("Liters").cast("Double"), col("Consumtime").cast("Double"), col("target").cast("Double"))
    val Array(train, test) = frame.randomSplit(Array(0.8, 0.2))

    import org.apache.spark.ml.feature.VectorAssembler
    val assembler = new VectorAssembler()
      .setInputCols(train.drop("target").columns)
      .setOutputCol("f")

    import org.apache.spark.ml.clustering.KMeans
    val means = new KMeans()
      .setFeaturesCol("f")
      .setPredictionCol("p")
      .setK(3)
      .setMaxIter(50)
    val model = means.fit(assembler.transform(train))
    val frame1 = model.transform(assembler.transform(test))

    import org.apache.spark.ml.evaluation.ClusteringEvaluator
    val d = new ClusteringEvaluator()
      .setFeaturesCol("f")
      .setPredictionCol("p")
      .evaluate(frame1)
    println(d)
    frame1.select("target", "p").show()
  }
}
Regression package
org.apache.spark.ml.regression
Linear regression
Formula
w_0: the intercept
w_i: the regression coefficient of each feature
\hat{y} = w_0 + \sum_{i=1}^{n} w_i x_i
In matrix form: \hat{y} = Xw
Loss function
Used to judge how good the model is: the smaller the loss, the better the model.
y_i: the true value of each sample
\hat{y_i}: the predicted value
Loss = \sum_{i=1}^{n} (y_i - \hat{y_i})^2
Least squares
A method for finding the parameters that minimize the loss function.
Design matrix (one row per sample, with a trailing 1 for the intercept):
\hat{X} = \begin{bmatrix} x_{11} & \dots & x_{1n} & 1 \\ x_{21} & \dots & x_{2n} & 1 \\ x_{31} & \dots & x_{3n} & 1 \end{bmatrix}
Parameters to solve for (d is the intercept):
\hat{W} = \begin{bmatrix} w_1 \\ \vdots \\ w_n \\ d \end{bmatrix}
True values:
\hat{y} = \begin{bmatrix} y_1 \\ y_2 \\ y_3 \end{bmatrix}
Least-squares solution: \hat{W} = (\hat{X}^T\hat{X})^{-1}\hat{X}^T\hat{y}
package org.ml

import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.functions._

object linearRegression {
  def main(args: Array[String]): Unit = {
    Logger.getRootLogger.setLevel(Level.ERROR)
    val spark = new SparkSession.Builder()
      .master("local[*]")
      .config("hive.metastore.uris", "thrift://bigdata1:9083")
      .config("spark.sql.warehouse", "hdfs://bigdata1:9000/user/hive.warehouse")
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    val frame = spark.read.option("header", "true")
      .csv("/opt/module/IdeaProjects/flink7/data/dating.txt")
      .select(col("milage").cast("Double"), col("Liters").cast("Double"), col("Consumtime").cast("Double"), col("target").cast("Double"))
    val Array(train, test) = frame.randomSplit(Array(0.8, 0.2))

    import org.apache.spark.ml.feature.VectorAssembler
    val assembler = new VectorAssembler()
      .setInputCols(train.drop("target").columns)
      .setOutputCol("f")

    import org.apache.spark.ml.regression.LinearRegression
    val lr = new LinearRegression()
      .setLabelCol("target")
      .setFeaturesCol("f")
      .setPredictionCol("p")
      .setMaxIter(100)
    val model = lr.fit(assembler.transform(train))
    val frame1 = model.transform(assembler.transform(test))
    frame1.select("target", "p").show(false)
  }
}
Dimensionality reduction
Keeps only the most important features.
PCA
Sample variance
The measure of information PCA uses is the sample variance, also called explained variance: the larger the variance, the more information the feature carries.
It is the metric used to measure how important a feature is.
Mean: M = \frac{\sum_{i=1}^{n} x_i}{n}
Sample variance: S = \frac{\sum_{i=1}^{n} (x_i - M)^2}{n - 1}
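The notes stop at the variance formulas; as a hedged sketch of how PCA is typically applied in Spark ML (the input vectors and the value of k are made up, and a SparkSession with spark.implicits._ imported is assumed):

import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors

// Hypothetical 3-dimensional feature vectors (requires spark.implicits._)
val df = Seq(
  Vectors.dense(1.0, 2.0, 3.0),
  Vectors.dense(2.0, 4.0, 6.5),
  Vectors.dense(3.0, 6.1, 9.0)
).map(Tuple1.apply).toDF("features")

val pcaModel = new PCA()
  .setInputCol("features")
  .setOutputCol("pcaFeatures")
  .setK(2) // keep the 2 components with the largest variance
  .fit(df)

println(pcaModel.explainedVariance) // proportion of variance carried by each kept component
pcaModel.transform(df).show(false)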
SVD dimensionality reduction
Note: after computing the eigenvectors, Gram-Schmidt orthogonalization still needs to be applied.
Reference: 奇异值分解——SVD_svd求正交 (CSDN blog)
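In Spark, SVD is exposed on the RDD-based API through RowMatrix.computeSVD; a minimal sketch with made-up data (a SparkSession named spark is assumed to be in scope):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Hypothetical data: each row is one sample
val rows = spark.sparkContext.parallelize(Seq(
  Vectors.dense(1.0, 2.0, 3.0),
  Vectors.dense(4.0, 5.0, 6.0),
  Vectors.dense(7.0, 8.0, 10.0)
))

val mat = new RowMatrix(rows)
// Keep the top 2 singular values; also compute U
val svd = mat.computeSVD(2, computeU = true)

println(svd.s)              // singular values
println(svd.V)              // right singular vectors (a local matrix)
svd.U.rows.foreach(println) // left singular vectors (a distributed RowMatrix)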
Model evaluation package
org.apache.spark.ml.evaluation
BinaryClassificationEvaluator
Computes metrics for binary classification; the default metric is the area under the ROC curve.
Setter                 Description
setLabelCol            label column
setRawPredictionCol    raw prediction column
setMetricName          evaluation metric
setWeightCol           weight column
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
val d = new BinaryClassificationEvaluator()
  .setLabelCol("CSAT Score")
  .setRawPredictionCol("prediction")
  .evaluate(frame1)
MulticlassClassificationEvaluator
Evaluation metrics for multiclass classification.
Setter              Description
setLabelCol         label column
setPredictionCol    prediction column
setMetricName       evaluation metric (default "f1")
setWeightCol        weight column
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val d1 = new MulticlassClassificationEvaluator()
  .setLabelCol("CSAT Score")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
  .evaluate(frame1)
Model tuning package
import org.apache.spark.ml.tuning
Cross-validation
// Configure the evaluation metric
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("machine_record_state")
  .setPredictionCol("p")
  .setMetricName("accuracy")

// Configure the parameter grid
import org.apache.spark.ml.tuning.ParamGridBuilder
val builder = new ParamGridBuilder()
  .addGrid(rfc.numTrees, 10 to 100 by 10)
  .addGrid(rfc.maxDepth, 5 to 30 by 5)
  .addGrid(rfc.featureSubsetStrategy, Array("all"))
  .build()

import org.apache.spark.ml.tuning.CrossValidator
val crossValidator = new CrossValidator()
  .setSeed(100)
  .setEstimator(rfc)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(builder)
  .setNumFolds(3)
val model = crossValidator.fit(frame1)
Grid search (train-validation split)
// Configure the evaluation metric
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("machine_record_state")
  .setPredictionCol("p")
  .setMetricName("accuracy")

// Configure the parameter grid
import org.apache.spark.ml.tuning.ParamGridBuilder
val builder = new ParamGridBuilder()
  .addGrid(rfc.numTrees, 10 to 100 by 10)
  .addGrid(rfc.maxDepth, 5 to 30 by 5)
  .addGrid(rfc.featureSubsetStrategy, Array("all"))
  .build()

import org.apache.spark.ml.tuning.TrainValidationSplit
val split = new TrainValidationSplit()
  .setSeed(100)
  .setEstimator(rfc)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(builder)
  .setTrainRatio(0.8)
val model = split.fit(frame1)