From c8a9040fee8cef815a34f0865683032373e14d7e Mon Sep 17 00:00:00 2001 From: hzjane Date: Tue, 13 Sep 2022 12:04:02 +0800 Subject: [PATCH 1/7] init gbt class --- ...iningExampleOnCriteoClickLogsDataset.scala | 186 ++++++++++++++++++ 1 file changed, 186 insertions(+) create mode 100644 scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/gbtClassifierTrainingExampleOnCriteoClickLogsDataset.scala diff --git a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/gbtClassifierTrainingExampleOnCriteoClickLogsDataset.scala b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/gbtClassifierTrainingExampleOnCriteoClickLogsDataset.scala new file mode 100644 index 00000000000..301a8084b9a --- /dev/null +++ b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/gbtClassifierTrainingExampleOnCriteoClickLogsDataset.scala @@ -0,0 +1,186 @@ + +package com.intel.analytics.bigdl.dllib.example.nnframes.gbt + +import com.intel.analytics.bigdl.dllib.NNContext +import com.intel.analytics.bigdl.dllib.example.nnframes.xgboost.xgbClassifierTrainingExampleOnCriteoClickLogsDataset.feature_nums +import ml.dmlc.xgboost4j.scala.spark.TrackerConf +import org.apache.spark.ml.Pipeline +import org.apache.spark.ml.classification.GBTClassifier +import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, VectorIndexer} +import org.apache.spark.sql.types.{LongType, StructField, StructType} +import org.apache.spark.sql.{Row, SQLContext} +import scopt.OptionParser +import org.slf4j.{Logger, LoggerFactory} + + +class Task extends Serializable { + + val default_missing_value = "-999" + + def rowToLibsvm(row: Row): String = { + 0 until row.length flatMap { + case 0 => Some(row(0).toString) + case i if row(i) == null => Some(default_missing_value) + case i => Some((if (i < 14) row(i) + else java.lang.Long.parseLong(row(i).toString, 16)).toString) + } mkString " " + } +} + +case class Params( 
+ trainingDataPath: String = "/host/data", + modelSavePath: String = "/host/data/model", + numThreads: Int = 2, + maxIter: Int = 100, + maxDepth: Int = 2, + numWorkers: Int = 1 + ) + +object gbtClassifierTrainingExampleOnCriteoClickLogsDataset { + + val feature_nums = 39 + + def main(args: Array[String]): Unit = { + + val log: Logger = LoggerFactory.getLogger(this.getClass) + + + // parse params and set value + + val params = parser.parse(args, new Params).get + val trainingDataPath = params.trainingDataPath // path to data + val modelSavePath = params.modelSavePath // save model to this path + val numThreads = params.numThreads // xgboost threads + val maxIter = params.maxIter // train round + val maxDepth = params.maxDepth // tree max depth + val numWorkers = params.numWorkers // Workers num + + + val sc = NNContext.initNNContext() + // val sc = new SparkContext() + val spark = SQLContext.getOrCreate(sc) + + val task = new Task() + + val tStart = System.nanoTime() + // read csv files to dataframe + var df = spark.read.option("header", "false"). 
+ option("inferSchema", "true").option("delimiter", "\t").csv(trainingDataPath) + + val tBeforePreprocess = System.nanoTime() + var elapsed = (tBeforePreprocess - tStart) / 1000000000.0f // second + log.info("--reading data time is " + elapsed + " s") + // preprocess data + val processedRdd = df.rdd.map(task.rowToLibsvm) + + // declare schema + var structFieldArray = new Array[StructField](feature_nums + 1) + for (i <- 0 to feature_nums) { + structFieldArray(i) = StructField("_c" + i.toString, LongType, true) + } + var schema = new StructType(structFieldArray) + + // convert RDD to RDD[Row] + val rowRDD = processedRdd.map(_.split(" ")).map(row => Row.fromSeq( + for { + i <- 0 to feature_nums + } yield { + row(i).toLong + } + )) + // RDD[Row] to Dataframe + df = spark.createDataFrame(rowRDD, schema) + + + val stringIndexer = new StringIndexer() + .setInputCol("_c0") + .setOutputCol("classIndex") + .fit(df) + val labelTransformed = stringIndexer.transform(df).drop("_c0") + + var inputCols = new Array[String](feature_nums) + for (i <- 0 to feature_nums - 1) { + inputCols(i) = "_c" + (i + 1).toString + } + + val vectorAssembler = new VectorAssembler(). + setInputCols(inputCols). + setOutputCol("features") + + val xgbInput = vectorAssembler.transform(labelTransformed).select("features", "classIndex") + // randomly split dataset to (train, eval1, eval2, test) in proportion 6:2:1:1 + val Array(train, eval1, eval2, test) = xgbInput.randomSplit(Array(0.6, 0.2, 0.1, 0.1)) + + train.cache().count() + eval1.cache().count() + eval2.cache().count() + + val tBeforeTraining = System.nanoTime() + elapsed = (tBeforeTraining - tBeforePreprocess) / 1000000000.0f // second + log.info("--preprocess time is " + elapsed + " s") + // use scala tracker + // val gbtParam = Map("tracker_conf" -> TrackerConf(0L, "scala"), + // "eval_sets" -> Map("eval1" -> eval1, "eval2" -> eval2) + // ) + + // Train a GBT model. 
+ val gbtClassifier = new GBTClassifier() + gbtClassifier.setFeaturesCol("features") + gbtClassifier.setLabelCol("classIndex") + gbtClassifier.setMaxDepth(maxDepth) + gbtClassifier.setMaxIter(maxIter) +// gbtClassifier.setNumClass(2) +// gbtClassifier.setNumWorkers(numWorkers) +// gbtClassifier.setNthread(numThreads) +// gbtClassifier.setNumRound(numRound) + gbtClassifier.setFeatureSubsetStrategy("auto") +// gbtClassifier.setObjective("multi:softprob") +// gbtClassifier.setTimeoutRequestWorkers(180000L) + + + // Train model. This also runs the indexer. + val gbtClassificationModel = gbtClassifier.fit(train) + val tAfterTraining = System.nanoTime() + elapsed = (tAfterTraining - tBeforeTraining) / 1000000000.0f // second + log.info("--training time is " + elapsed + " s") + + gbtClassificationModel.save(modelSavePath) + + val tAfterSave = System.nanoTime() + elapsed = (tAfterSave - tAfterTraining) / 1000000000.0f // second + log.info("--model save time is " + elapsed + " s") + elapsed = (tAfterSave - tStart) / 1000000000.0f // second + log.info("--end-to-end time is " + elapsed + " s") + sc.stop() + } + + val parser: OptionParser[Params] = new OptionParser[Params]("input xgboost config") { + opt[String]('i', "trainingDataPath") + .text("trainingData Path") + .action((v, p) => p.copy(trainingDataPath = v)) + .required() + + opt[String]('s', "modelSavePath") + .text("savePath of model") + .action((v, p) => p.copy(modelSavePath = v)) + .required() + + opt[Int]('t', "numThreads") + .text("threads num") + .action((v, p) => p.copy(numThreads = v)) + + opt[Int]('I', "maxIter") + .text("maxIter") + .action((v, p) => p.copy(maxIter = v)) + + opt[Int]('d', "maxDepth") + .text("maxDepth") + .action((v, p) => p.copy(maxDepth = v)) + + opt[Int]('w', "numWorkers") + .text("Workers num") + .action((v, p) => p.copy(numWorkers = v)) + + } +} + From 8b3552e4a623f7220acc6219fedac87fc148c875 Mon Sep 17 00:00:00 2001 From: hzjane Date: Tue, 13 Sep 2022 14:59:53 +0800 Subject: [PATCH 2/7] 
remove something unimportant --- ...iningExampleOnCriteoClickLogsDataset.scala | 21 +------------------ 1 file changed, 1 insertion(+), 20 deletions(-) diff --git a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/gbtClassifierTrainingExampleOnCriteoClickLogsDataset.scala b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/gbtClassifierTrainingExampleOnCriteoClickLogsDataset.scala index 301a8084b9a..5f0508b4724 100644 --- a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/gbtClassifierTrainingExampleOnCriteoClickLogsDataset.scala +++ b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/gbtClassifierTrainingExampleOnCriteoClickLogsDataset.scala @@ -50,14 +50,10 @@ object gbtClassifierTrainingExampleOnCriteoClickLogsDataset { val params = parser.parse(args, new Params).get val trainingDataPath = params.trainingDataPath // path to data val modelSavePath = params.modelSavePath // save model to this path - val numThreads = params.numThreads // xgboost threads - val maxIter = params.maxIter // train round + val maxIter = params.maxIter // train max Iter val maxDepth = params.maxDepth // tree max depth - val numWorkers = params.numWorkers // Workers num - val sc = NNContext.initNNContext() - // val sc = new SparkContext() val spark = SQLContext.getOrCreate(sc) val task = new Task() @@ -129,14 +125,7 @@ object gbtClassifierTrainingExampleOnCriteoClickLogsDataset { gbtClassifier.setLabelCol("classIndex") gbtClassifier.setMaxDepth(maxDepth) gbtClassifier.setMaxIter(maxIter) -// gbtClassifier.setNumClass(2) -// gbtClassifier.setNumWorkers(numWorkers) -// gbtClassifier.setNthread(numThreads) -// gbtClassifier.setNumRound(numRound) gbtClassifier.setFeatureSubsetStrategy("auto") -// gbtClassifier.setObjective("multi:softprob") -// gbtClassifier.setTimeoutRequestWorkers(180000L) - // Train model. This also runs the indexer. 
val gbtClassificationModel = gbtClassifier.fit(train) @@ -165,10 +154,6 @@ object gbtClassifierTrainingExampleOnCriteoClickLogsDataset { .action((v, p) => p.copy(modelSavePath = v)) .required() - opt[Int]('t', "numThreads") - .text("threads num") - .action((v, p) => p.copy(numThreads = v)) - opt[Int]('I', "maxIter") .text("maxIter") .action((v, p) => p.copy(maxIter = v)) @@ -177,10 +162,6 @@ object gbtClassifierTrainingExampleOnCriteoClickLogsDataset { .text("maxDepth") .action((v, p) => p.copy(maxDepth = v)) - opt[Int]('w', "numWorkers") - .text("Workers num") - .action((v, p) => p.copy(numWorkers = v)) - } } From ab485ed36616e9b963ff35c3859e6d440e38ceaf Mon Sep 17 00:00:00 2001 From: hzjane Date: Tue, 13 Sep 2022 16:36:25 +0800 Subject: [PATCH 3/7] add readme --- .../dllib/example/nnframes/gbt/README.md | 45 +++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/README.md diff --git a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/README.md b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/README.md new file mode 100644 index 00000000000..97f0a2e2b4d --- /dev/null +++ b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/README.md @@ -0,0 +1,45 @@ +# Prepare + +## Environment +- Spark 2.4 or Spark 3.1 +- BigDL 2.0 + +## Data Prepare + +### BigDL nightly build + +You can download it [here](https://bigdl.readthedocs.io/en/latest/doc/release.html). +For Spark 2.4 you need `bigdl-dllib-spark_2.4.6-0.14.0-build_time-jar-with-dependencies.jar`; for Spark 3.1 you need `bigdl-dllib-spark_3.1.2-0.14.0-build_time-jar-with-dependencies.jar`. + +# GBT On Criteo-click-logs-dataset +## Download data +You can download the criteo-1tb-click-logs-dataset from [here](https://ailab.criteo.com/download-criteo-1tb-click-logs-dataset/). Then unzip the downloaded files and split off 1 GB of the data into a folder.
+ +## Train +``` +spark-submit \ + --master local[4] \ + --conf spark.task.cpus=4 \ + --class com.intel.analytics.bigdl.dllib.example.nnframes.gbt.gbtClassifierTrainingExampleOnCriteoClickLogsDataset \ + --num-executors 2 \ + --executor-cores 4 \ + --executor-memory 4G \ + --driver-memory 10G \ + /path/to/bigdl-dllib-spark_3.1.2-0.14.0-SNAPSHOT-jar-with-dependencies.jar \ + -i /path/to/preprocessed-data/saved -s /path/to/model/saved -I max_iter -d max_depth +``` + +Parameters: +- input_path (`-i`, required): String. Path to the criteo-click-logs-dataset. +- modelsave_path (`-s`, required): String. Path where the trained model will be saved. +- max_iter (`-I`): Int. Maximum number of training iterations. Default is 100. +- max_depth (`-d`): Int. Maximum depth of each tree. Default is 2. + +The directory tree of `/path/to/model/saved` is: +``` +/path/to/model/saved +├── data +└── metadata + ├── part-00000 + └── _SUCCESS +``` From 9e188ef3706c8107b057fd8219e1f65ce8b6eba4 Mon Sep 17 00:00:00 2001 From: hzjane Date: Tue, 13 Sep 2022 16:41:05 +0800 Subject: [PATCH 4/7] change xgb to gbt --- ...rainingExampleOnCriteoClickLogsDataset.scala | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/gbtClassifierTrainingExampleOnCriteoClickLogsDataset.scala b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/gbtClassifierTrainingExampleOnCriteoClickLogsDataset.scala index 5f0508b4724..54e4b277daa 100644 --- a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/gbtClassifierTrainingExampleOnCriteoClickLogsDataset.scala +++ b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/gbtClassifierTrainingExampleOnCriteoClickLogsDataset.scala @@ -2,15 +2,12 @@ package com.intel.analytics.bigdl.dllib.example.nnframes.gbt import com.intel.analytics.bigdl.dllib.NNContext -import com.intel.analytics.bigdl.dllib.example.nnframes.xgboost.xgbClassifierTrainingExampleOnCriteoClickLogsDataset.feature_nums -import
ml.dmlc.xgboost4j.scala.spark.TrackerConf -import org.apache.spark.ml.Pipeline import org.apache.spark.ml.classification.GBTClassifier -import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler, VectorIndexer} +import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler} import org.apache.spark.sql.types.{LongType, StructField, StructType} import org.apache.spark.sql.{Row, SQLContext} -import scopt.OptionParser import org.slf4j.{Logger, LoggerFactory} +import scopt.OptionParser class Task extends Serializable { @@ -30,10 +27,8 @@ class Task extends Serializable { case class Params( trainingDataPath: String = "/host/data", modelSavePath: String = "/host/data/model", - numThreads: Int = 2, maxIter: Int = 100, - maxDepth: Int = 2, - numWorkers: Int = 1 + maxDepth: Int = 2 ) object gbtClassifierTrainingExampleOnCriteoClickLogsDataset { @@ -103,9 +98,9 @@ object gbtClassifierTrainingExampleOnCriteoClickLogsDataset { setInputCols(inputCols). setOutputCol("features") - val xgbInput = vectorAssembler.transform(labelTransformed).select("features", "classIndex") + val gbtInput = vectorAssembler.transform(labelTransformed).select("features", "classIndex") // randomly split dataset to (train, eval1, eval2, test) in proportion 6:2:1:1 - val Array(train, eval1, eval2, test) = xgbInput.randomSplit(Array(0.6, 0.2, 0.1, 0.1)) + val Array(train, eval1, eval2, test) = gbtInput.randomSplit(Array(0.6, 0.2, 0.1, 0.1)) train.cache().count() eval1.cache().count() @@ -143,7 +138,7 @@ object gbtClassifierTrainingExampleOnCriteoClickLogsDataset { sc.stop() } - val parser: OptionParser[Params] = new OptionParser[Params]("input xgboost config") { + val parser: OptionParser[Params] = new OptionParser[Params]("input xgb config") { opt[String]('i', "trainingDataPath") .text("trainingData Path") .action((v, p) => p.copy(trainingDataPath = v)) From 64e262c4bb9116bb619cc37d44e3f774ae3a7d07 Mon Sep 17 00:00:00 2001 From: hzjane Date: Tue, 13 Sep 2022 16:42:50 +0800 Subject: [PATCH 
5/7] change xgb to gbt1 --- .../gbtClassifierTrainingExampleOnCriteoClickLogsDataset.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/gbtClassifierTrainingExampleOnCriteoClickLogsDataset.scala b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/gbtClassifierTrainingExampleOnCriteoClickLogsDataset.scala index 54e4b277daa..e6919474680 100644 --- a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/gbtClassifierTrainingExampleOnCriteoClickLogsDataset.scala +++ b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/gbtClassifierTrainingExampleOnCriteoClickLogsDataset.scala @@ -138,7 +138,7 @@ object gbtClassifierTrainingExampleOnCriteoClickLogsDataset { sc.stop() } - val parser: OptionParser[Params] = new OptionParser[Params]("input xgb config") { + val parser: OptionParser[Params] = new OptionParser[Params]("input gbt config") { opt[String]('i', "trainingDataPath") .text("trainingData Path") .action((v, p) => p.copy(trainingDataPath = v)) From 29eb62ec3d769fa1a8732220c1b88adf682a8cfd Mon Sep 17 00:00:00 2001 From: hzjane Date: Tue, 13 Sep 2022 16:46:16 +0800 Subject: [PATCH 6/7] use overwrite to save --- .../gbtClassifierTrainingExampleOnCriteoClickLogsDataset.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/gbtClassifierTrainingExampleOnCriteoClickLogsDataset.scala b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/gbtClassifierTrainingExampleOnCriteoClickLogsDataset.scala index e6919474680..80f34d75a7f 100644 --- a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/gbtClassifierTrainingExampleOnCriteoClickLogsDataset.scala +++ 
b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/gbtClassifierTrainingExampleOnCriteoClickLogsDataset.scala @@ -128,7 +128,7 @@ object gbtClassifierTrainingExampleOnCriteoClickLogsDataset { elapsed = (tAfterTraining - tBeforeTraining) / 1000000000.0f // second log.info("--training time is " + elapsed + " s") - gbtClassificationModel.save(modelSavePath) + gbtClassificationModel.write.overwrite().save(modelSavePath) val tAfterSave = System.nanoTime() elapsed = (tAfterSave - tAfterTraining) / 1000000000.0f // second From 3c863fa015ea8c82c5e3a99919e893369789e920 Mon Sep 17 00:00:00 2001 From: hzjane Date: Tue, 13 Sep 2022 17:02:21 +0800 Subject: [PATCH 7/7] add text to fix message=Header does not match expected text line=1 --- ...rTrainingExampleOnCriteoClickLogsDataset.scala | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/gbtClassifierTrainingExampleOnCriteoClickLogsDataset.scala b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/gbtClassifierTrainingExampleOnCriteoClickLogsDataset.scala index 80f34d75a7f..227bcfdae24 100644 --- a/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/gbtClassifierTrainingExampleOnCriteoClickLogsDataset.scala +++ b/scala/dllib/src/main/scala/com/intel/analytics/bigdl/dllib/example/nnframes/gbt/gbtClassifierTrainingExampleOnCriteoClickLogsDataset.scala @@ -1,3 +1,18 @@ +/* + * Copyright 2016 The BigDL Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.intel.analytics.bigdl.dllib.example.nnframes.gbt