Batch One-Hot Encoding with Feature Column Assembly, and a Model-Training Demo

Original · 2018/09/13 11:33
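This demo loads the Affairs CSV, binarizes the affairs count into a 0/1 label, builds a StringIndexer + OneHotEncoder pair for every string column in one pass, assembles all predictors into a single feature vector, and then tunes an XGBoost classifier inside a Spark ML Pipeline with cross validation. The XGBoostEstimator used below is the legacy XGBoost4J-Spark 0.7.x API.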

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{StringIndexer, OneHotEncoder}
import org.apache.spark.ml.feature.VectorAssembler
import ml.dmlc.xgboost4j.scala.spark.{XGBoostEstimator, XGBoostClassificationModel}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
import org.apache.spark.ml.PipelineModel



// Load the raw Affairs CSV, inferring the schema from the header row
val data = (spark.read.format("csv")
  .option("sep", ",")
  .option("inferSchema", "true")
  .option("header", "true")
  .load("/user/spark/security/Affairs.csv"))

data.createOrReplaceTempView("res1")

// Binarize the affairs count into a 0/1 label; keep the predictor columns as-is
val affairs = "case when affairs>0 then 1 else 0 end as affairs,"
val df = (spark.sql("select " + affairs +
  "gender,age,yearsmarried,children,religiousness,education,occupation,rating" +
  " from res1 "))
  
  
// Collect every string-typed column; each one gets its own indexer/encoder pair below
val categoricals = df.dtypes.filter(_._2 == "StringType").map(_._1)

// One StringIndexer per categorical column: string label -> numeric index
val indexers = categoricals.map(
  c => new StringIndexer().setInputCol(c).setOutputCol(s"${c}_idx")
)

// One OneHotEncoder per indexed column; setDropLast(false) keeps every category level
val encoders = categoricals.map(
  c => new OneHotEncoder().setInputCol(s"${c}_idx").setOutputCol(s"${c}_enc").setDropLast(false)
)
  
  
val colArray_enc = categoricals.map(x => x + "_enc")
val colArray_numeric = df.dtypes.filter(_._2 != "StringType").map(_._1)

// Final feature list: numeric columns plus the encoded categoricals, minus the label
val final_colArray = (colArray_numeric ++ colArray_enc).filter(!_.contains("affairs"))

val vectorAssembler = new VectorAssembler().setInputCols(final_colArray).setOutputCol("features")

/*
// Optional: run only the encoding stages to preview the assembled feature vectors
val pipeline = new Pipeline().setStages(indexers ++ encoders ++ Array(vectorAssembler))
pipeline.fit(df).transform(df)
*/
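If you run the commented preview above, the features column comes out as sparse vectors because of the one-hot columns; something like the following (a sketch, not needed for training) shows the raw form:

new Pipeline().setStages(indexers ++ encoders ++ Array(vectorAssembler))
  .fit(df).transform(df)
  .select("features").show(3, false)   // expect output like (n,[indices],[values])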



// Create an XGBoost classifier.
// Note: "num_class" is only required for multiclass objectives and can be dropped for
// binary:logistic; the grid below overrides the rounds, objective, and booster anyway.
val xgb = new XGBoostEstimator(Map(
    "num_class" -> 2,
    "num_rounds" -> 5,
    "objective" -> "binary:logistic",
    "booster" -> "gbtree"
  )).setLabelCol("affairs").setFeaturesCol("features")

// XGBoost parameter grid; values set here take precedence over the constructor Map
val xgbParamGrid = (new ParamGridBuilder()
   .addGrid(xgb.round, Array(10))
   .addGrid(xgb.maxDepth, Array(10,20))
   .addGrid(xgb.minChildWeight, Array(0.1))
   .addGrid(xgb.gamma, Array(0.1))
   .addGrid(xgb.subSample, Array(0.8))
   .addGrid(xgb.colSampleByTree, Array(0.90))
   .addGrid(xgb.alpha, Array(0.0))
   .addGrid(xgb.lambda, Array(0.6))
   .addGrid(xgb.scalePosWeight, Array(0.1))
   .addGrid(xgb.eta, Array(0.4))
   .addGrid(xgb.boosterType, Array("gbtree"))
   .addGrid(xgb.objective, Array("binary:logistic")) 
   .build())
   
// Create the XGBoost pipeline
val pipeline = new Pipeline().setStages(indexers ++ encoders ++ Array(vectorAssembler, xgb))


// Set up the binary classification evaluator.
// Note: AUC is computed here from the hard 0/1 "prediction" column; pointing
// setRawPredictionCol at the model's probability output would give a truer ROC curve.
val evaluator = (new BinaryClassificationEvaluator()
   .setLabelCol("affairs")
   .setRawPredictionCol("prediction")
   .setMetricName("areaUnderROC"))

   
// Create the Cross Validation pipeline, using XGBoost as the estimator, the
// Binary Classification evaluator, and xgbParamGrid for hyperparameters
val cv = (new CrossValidator()
   .setEstimator(pipeline)
   .setEvaluator(evaluator)
   .setEstimatorParamMaps(xgbParamGrid)
   .setNumFolds(3)
   .setSeed(0))


// Fit the cross-validated pipeline
val xgbModel = cv.fit(df)

// Score the data with the fitted model
val results = xgbModel.transform(df)
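Note that this fits and scores on the same DataFrame, so the AUC reported below is optimistic. A minimal holdout variant (the 80/20 split ratio is an arbitrary choice, not from the original demo):

// Sketch: evaluate on rows the model never saw during tuning
val Array(train, test) = df.randomSplit(Array(0.8, 0.2), seed = 0)
val heldOutModel = cv.fit(train)
println("holdout auc=" + evaluator.evaluate(heldOutModel.transform(test)))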



// Print the parameters the best XGBoost model actually used.
// The XGBoost stage is the last stage of the pipeline (after the indexers,
// encoders, and assembler), so stages.last is safer than a hard-coded index.
(xgbModel.bestModel.asInstanceOf[PipelineModel]
  .stages.last.asInstanceOf[XGBoostClassificationModel]
  .extractParamMap().toSeq.foreach(println))
  
results.select("affairs","prediction").show

println("---Confusion Matrix------")
results.stat.crosstab("affairs","prediction").show()
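If plain accuracy is wanted alongside the crosstab, a MulticlassClassificationEvaluator works on the same columns (a convenience sketch, not part of the original demo):

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
val accuracy = (new MulticlassClassificationEvaluator()
  .setLabelCol("affairs")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
  .evaluate(results))
println("accuracy=" + accuracy)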


// Overall model quality, measured as area under the ROC curve
val auc = evaluator.evaluate(results)
println("----AUC--------")
println("auc=" + auc)

Comments
Hello: could you share a contact method? I ran your code and the output data looks wrong. Why do I get something like [(248,[0,1,2,4,10,11,13,15,16,17,18,23...],[200.0,1.59551075E8,1.0,36.0,1.0,1.0,2.0,1.0,1.0,123.0,0.5348837209302325,...]?
Code:
// Note: productDF (a var) and categoricalColumns come from the commenter's surrounding code
import scala.collection.mutable.ListBuffer
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.feature.{MinMaxScaler, VectorAssembler}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.functions.udf

val stagesArray = new ListBuffer[PipelineStage]()
// Wrap each column value in a length-1 vector so MinMaxScaler can consume it
val vectorizeCol = udf((v: Double) => Vectors.dense(Array(v)))
for (cate <- categoricalColumns) {
  productDF = productDF.withColumn(s"${cate}Vec", vectorizeCol(productDF(cate)))
  val scaler = new MinMaxScaler()
    .setInputCol(s"${cate}Vec")
    .setOutputCol(s"${cate}Vectors")
  stagesArray.append(scaler)
}
val numeric_arr = productDF.dtypes.filter(_._2 != "StringType").map(_._1)
val assemblerInputs = (numeric_arr ++ categoricalColumns.map(_ + "Vectors")).filter(!_.contains("is_click"))
val assembler = new VectorAssembler().setInputCols(assemblerInputs).setOutputCol("features")
stagesArray.append(assembler)
val pipeline = new Pipeline()
pipeline.setStages(stagesArray.toArray)
val pipelineModel = pipeline.fit(productDF)
val dataset = pipelineModel.transform(productDF)
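For what it's worth, output of the form (248,[0,1,2,...],[200.0,...]) is not corrupted data: it is Spark ML's string rendering of a SparseVector, i.e. (size, [indices of the nonzero entries], [their values]). One-hot encoding makes most entries zero, so Spark stores the vector sparsely. A tiny illustration:

import org.apache.spark.ml.linalg.Vectors
val sv = Vectors.sparse(5, Array(0, 3), Array(200.0, 1.0))
println(sv)          // (5,[0,3],[200.0,1.0])
println(sv.toDense)  // [200.0,0.0,0.0,1.0,0.0]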
2018/12/25 19:05
KYO4321 (author):
## Batch data-type conversion in Scala
https://blog.csdn.net/dkl12/article/details/80256585


## Batch feature preprocessing for random forests
https://blog.csdn.net/Nougats/article/details/73929182


## Working with logistic regression
https://www.cnblogs.com/wwxbi/p/6224670.html


## Extending Spark SQL with UDFs
https://www.jianshu.com/p/833b72adb2b6
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-Column.html
https://stackoverflow.com/questions/35227568/applying-function-to-spark-dataframe-column
2018/09/13 11:35
KYO4321 (author):
//// TODO
// Train XGBoost with default parameters --- done
// Score the model on the original data --- done
// Build synthetic data to score the model: with the same feature values as the source data, with fewer, and with more https://github.com/dmlc/xgboost/blob/1c08b3b2eaf356a448c156271c394692dc698065/jvm-packages/xgboost4j/src/main/scala/ml/dmlc/xgboost4j/scala/Booster.scala
// Key point: training via xgb.train preserves the model's feature information https://github.com/dmlc/xgboost/blob/0988fb191f6aacde50020b2d5e94c92df2ac4e58/jvm-packages/xgboost4j-example/src/main/scala/ml/dmlc/xgboost4j/scala/example/BasicWalkThrough.scala


// Train with xgb.train inside Spark: first get the best-model params from the pipeline, then train with xgb.train (see the sketch below)
// https://xgboost.readthedocs.io/en/latest/jvm/xgboost4j_spark_tutorial.html
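A minimal sketch of that low-level xgb.train route, following the linked BasicWalkThrough example (the LibSVM file paths are hypothetical placeholders):

import ml.dmlc.xgboost4j.scala.{DMatrix, XGBoost}
val trainMat = new DMatrix("train.libsvm")   // hypothetical path; substitute your exported data
val params = Map("objective" -> "binary:logistic", "eta" -> 0.4, "max_depth" -> 10)
val booster = XGBoost.train(trainMat, params, 10, Map("train" -> trainMat))
booster.saveModel("xgb.model")               // persists the trained booster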
2018/09/13 11:33