Level 1: Introduction to MLlib
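This level builds a minimal text-classification pipeline: a Tokenizer splits each document into words, HashingTF hashes the words into 1000-dimensional term-frequency vectors, and a LogisticRegression stage is trained on the four labeled rows. The fitted pipeline is then applied to the unlabeled test documents.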
package com.educoder.bigData.sparksql5;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.feature.HashingTF;
import org.apache.spark.ml.feature.Tokenizer;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class Test1 {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("test1").master("local").getOrCreate();
        // Labeled training documents: (label, text)
        List<Row> trainingList = Arrays.asList(
                RowFactory.create(1.0, "a b c d E spark"),
                RowFactory.create(0.0, "b d"),
                RowFactory.create(1.0, "hadoop Mapreduce"),
                RowFactory.create(0.0, "f g h"));
        // Test documents; the label column is only a placeholder here
        List<Row> testList = Arrays.asList(
                RowFactory.create(0.0, "spark I j k"),
                RowFactory.create(0.0, "l M n"),
                RowFactory.create(0.0, "f g"),
                RowFactory.create(0.0, "apache hadoop"));
        /********* Begin *********/
        StructType schema = new StructType(new StructField[] {
                new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
                new StructField("text", DataTypes.StringType, false, Metadata.empty()) });
        Dataset<Row> training = spark.createDataFrame(trainingList, schema);
        Dataset<Row> test = spark.createDataFrame(testList, schema);
        // Stages: tokenize the text, hash words into term-frequency vectors,
        // then train a logistic regression classifier
        Tokenizer tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words");
        HashingTF hashingTF = new HashingTF().setNumFeatures(1000).setInputCol("words").setOutputCol("features");
        LogisticRegression lr = new LogisticRegression().setMaxIter(10).setRegParam(0.001);
        Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] { tokenizer, hashingTF, lr });
        PipelineModel model = pipeline.fit(training);
        model.transform(test).select("prediction").show();
        /********* End *********/
    }
}
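Beyond printing the predictions, a fitted PipelineModel can be persisted and reloaded for later scoring. A minimal sketch, continuing inside main above; the save path "/tmp/spark-lr-model" is an arbitrary illustrative choice:

        // Persist the fitted pipeline (path is illustrative only)
        model.write().overwrite().save("/tmp/spark-lr-model");
        // Reload it and score the test set, this time keeping the probability column
        PipelineModel reloaded = PipelineModel.load("/tmp/spark-lr-model");
        reloaded.transform(test).select("text", "probability", "prediction").show(false);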
Level 2: MLlib Spam Detection
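This level trains a spam detector on the SMSSpamCollection file. Each line is split on spaces into a string label plus the message words; a StringIndexer maps the labels to numeric indices, Word2Vec embeds each message as a fixed-length vector, and a GBTClassifier (gradient-boosted trees) is fit on those features.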
package com.educoder.bigData.sparksql5;

import java.util.Arrays;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.GBTClassifier;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.ml.feature.Word2Vec;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class Case2 {
    public static PipelineModel training(SparkSession spark) {
        /********* Begin *********/
        // Each line of SMSSpamCollection holds a label followed by the message
        // words; split it and keep (label, words) as a Row
        JavaRDD<Row> map = spark.read().textFile("SMSSpamCollection").toJavaRDD()
                .map(line -> line.split(" "))
                .map(new Function<String[], Row>() {
                    @Override
                    public Row call(String[] v1) throws Exception {
                        String[] message = Arrays.copyOfRange(v1, 1, v1.length);
                        return RowFactory.create(v1[0], message);
                    }
                });
        StructType schema = new StructType(new StructField[] {
                new StructField("label", DataTypes.StringType, false, Metadata.empty()),
                new StructField("message", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty()) });
        Dataset<Row> data = spark.createDataFrame(map, schema);
        // Index the string labels as doubles, embed each message with Word2Vec,
        // then train a gradient-boosted-tree classifier
        StringIndexer labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel");
        Word2Vec word2Vec = new Word2Vec().setInputCol("message").setOutputCol("features");
        GBTClassifier gbt = new GBTClassifier().setLabelCol("indexedLabel").setFeaturesCol("features");
        Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] { labelIndexer, word2Vec, gbt });
        PipelineModel model = pipeline.fit(data);
        /********* End *********/
        return model;
    }
}
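The grader only needs the returned PipelineModel, but the fit can be sanity-checked with MulticlassClassificationEvaluator (from org.apache.spark.ml.evaluation). A minimal sketch, assuming it runs inside training() above where model and data are still in scope; measuring accuracy on the training data itself is an illustrative choice, not part of the exercise:

        // Score the training rows and compute accuracy against the indexed labels
        Dataset<Row> predictions = model.transform(data);
        double accuracy = new MulticlassClassificationEvaluator()
                .setLabelCol("indexedLabel")
                .setPredictionCol("prediction")
                .setMetricName("accuracy")
                .evaluate(predictions);
        System.out.println("training accuracy = " + accuracy);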
Level 3: MLlib Red Wine Classification Prediction
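This level predicts wine classes from dataset.csv. Each CSV row is converted into a double label (first column) and a dense feature vector (the remaining columns), and a single-stage pipeline fits a RandomForestClassifier on the result.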
package com.educoder.bigData.sparksql5;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.ml.Pipeline;
import org.apache.spark.ml.PipelineModel;
import org.apache.spark.ml.PipelineStage;
import org.apache.spark.ml.classification.*;
import org.apache.spark.ml.linalg.VectorUDT;
import org.apache.spark.ml.linalg.Vectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class Case3 {
    public static PipelineModel training(SparkSession spark) {
        /********* Begin *********/
        JavaRDD<Row> javaRDD = spark.read().csv("dataset.csv").toJavaRDD();
        JavaRDD<Row> map = javaRDD.map(new Function<Row, Row>() {
            @Override
            public Row call(Row v1) throws Exception {
                int size = v1.size();
                // The first column is the label
                double labelDouble = Double.parseDouble(v1.get(0).toString());
                // The remaining columns form the feature array
                double[] features = new double[size - 1];
                for (int n = 1; n < size; n++) {
                    features[n - 1] = Double.parseDouble(v1.get(n).toString());
                }
                // Build a (label, dense feature vector) row
                return RowFactory.create(labelDouble, Vectors.dense(features));
            }
        });
        StructType schema = new StructType(new StructField[] {
                new StructField("label", DataTypes.DoubleType, false, Metadata.empty()),
                new StructField("features", new VectorUDT(), false, Metadata.empty()) });
        Dataset<Row> data = spark.createDataFrame(map, schema);
        // Train a random forest classifier on the assembled feature vectors
        RandomForestClassifier rf = new RandomForestClassifier().setLabelCol("label").setFeaturesCol("features");
        Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] { rf });
        PipelineModel model = pipeline.fit(data);
        /********* End *********/
        return model;
    }
}
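Because training() fits on every row of dataset.csv, one way to estimate generalization is to hold part of the data out. A minimal sketch, assuming it runs inside training() where data and pipeline are in scope; the 80/20 split, the seed, and the evaluator (MulticlassClassificationEvaluator, from org.apache.spark.ml.evaluation) are illustrative choices:

        // Split the parsed rows into train and holdout sets (illustrative 80/20)
        Dataset<Row>[] splits = data.randomSplit(new double[] { 0.8, 0.2 }, 42L);
        PipelineModel heldOutModel = pipeline.fit(splits[0]);
        // Accuracy on the 20% the model never saw
        double accuracy = new MulticlassClassificationEvaluator()
                .setLabelCol("label")
                .setPredictionCol("prediction")
                .setMetricName("accuracy")
                .evaluate(heldOutModel.transform(splits[1]));
        System.out.println("holdout accuracy = " + accuracy);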
Reposted from: https://blog.csdn.net/bzzb52/article/details/138454532
Copyright belongs to the original author 自信 喵 QAQ. In case of infringement, please contact us for removal.