From 3f6abffcc1780f8271a8729fb99e4cc870c95751 Mon Sep 17 00:00:00 2001
From: "wm624@hotmail.com" <wm624@hotmail.com>
Date: Wed, 15 Feb 2017 01:15:50 -0800
Subject: [PATCH] [SPARK-19456][SPARKR] Add LinearSVC R API

## What changes were proposed in this pull request?

A linear SVM classifier was recently added to ML, and a Python API has been added for it. This JIRA adds the corresponding R API.

## How was this patch tested?

New unit tests for spark.svmLinear are added in R/pkg/inst/tests/testthat/test_mllib_classification.R.

Author: wm624@hotmail.com <wm624@hotmail.com>

Closes #16800 from wangmiao1981/svc.
---
 R/pkg/NAMESPACE                               |   3 +-
 R/pkg/R/generics.R                            |   4 +
 R/pkg/R/mllib_classification.R                | 132 +++++++++++++++
 R/pkg/R/mllib_utils.R                         |   9 +-
 .../testthat/test_mllib_classification.R      |  44 +++++
 .../apache/spark/ml/r/LinearSVCWrapper.scala  | 151 ++++++++++++++++++
 .../org/apache/spark/ml/r/RWrappers.scala     |   2 +
 7 files changed, 341 insertions(+), 4 deletions(-)
 create mode 100644 mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 625c797f8a688..8b265006cbc71 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -65,7 +65,8 @@ exportMethods("glm",
               "spark.logit",
               "spark.randomForest",
               "spark.gbt",
-              "spark.bisectingKmeans")
+              "spark.bisectingKmeans",
+              "spark.svmLinear")

 # Job group lifecycle management methods
 export("setJobGroup",
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index d78b1a10d6b42..0d9a9968e2855 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1401,6 +1401,10 @@ setGeneric("spark.randomForest",
 #' @export
 setGeneric("spark.survreg", function(data, formula) { standardGeneric("spark.survreg") })

+#' @rdname spark.svmLinear
+#' @export
+setGeneric("spark.svmLinear", function(data, formula, ...) { standardGeneric("spark.svmLinear") })
+
 #' @rdname spark.lda
 #' @export
 setGeneric("spark.posterior", function(object, newData) { standardGeneric("spark.posterior") })
diff --git a/R/pkg/R/mllib_classification.R b/R/pkg/R/mllib_classification.R
index 552cbe40dad49..fa0d795faa10f 100644
--- a/R/pkg/R/mllib_classification.R
+++ b/R/pkg/R/mllib_classification.R
@@ -18,6 +18,13 @@
 # mllib_regression.R: Provides methods for MLlib classification algorithms
 #                     (except for tree-based algorithms) integration

+#' S4 class that represents a LinearSVCModel
+#'
+#' @param jobj a Java object reference to the backing Scala LinearSVCModel
+#' @export
+#' @note LinearSVCModel since 2.2.0
+setClass("LinearSVCModel", representation(jobj = "jobj"))
+
 #' S4 class that represents an LogisticRegressionModel
 #'
 #' @param jobj a Java object reference to the backing Scala LogisticRegressionModel
@@ -39,6 +46,131 @@ setClass("MultilayerPerceptronClassificationModel", representation(jobj = "jobj"))
 #' @note NaiveBayesModel since 2.0.0
 setClass("NaiveBayesModel", representation(jobj = "jobj"))

+#' Linear SVM Model
+#'
+#' Fits a linear SVM model against a SparkDataFrame. It is a binary classifier, similar to svm in the glmnet package.
+#' Users can print the model, make predictions with the produced model, and save the model to the input path.
+#'
+#' @param data SparkDataFrame for training.
+#' @param formula A symbolic description of the model to be fitted. Currently only a few formula
+#'                operators are supported, including '~', '.', ':', '+', and '-'.
+#' @param regParam The regularization parameter.
+#' @param maxIter Maximum iteration number.
+#' @param tol Convergence tolerance of iterations.
+#' @param standardization Whether to standardize the training features before fitting the model. The coefficients
+#'                        of models will always be returned on the original scale, so it will be transparent for
+#'                        users. Note that with/without standardization, the models should always converge
+#'                        to the same solution when no regularization is applied.
+#' @param threshold The threshold in binary classification, in range [0, 1].
+#' @param weightCol The weight column name.
+#' @param aggregationDepth The depth for treeAggregate (greater than or equal to 2). If the dimensions of features
+#'                         or the number of partitions are large, this param could be adjusted to a larger size.
+#'                         This is an expert parameter. Default value should be good for most cases.
+#' @param ... additional arguments passed to the method.
+#' @return \code{spark.svmLinear} returns a fitted linear SVM model.
+#' @rdname spark.svmLinear
+#' @aliases spark.svmLinear,SparkDataFrame,formula-method
+#' @name spark.svmLinear
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' df <- createDataFrame(iris)
+#' training <- df[df$Species %in% c("versicolor", "virginica"), ]
+#' model <- spark.svmLinear(training, Species ~ ., regParam = 0.5)
+#' summary <- summary(model)
+#'
+#' # fitted values on training data
+#' fitted <- predict(model, training)
+#'
+#' # save fitted model to input path
+#' path <- "path/to/model"
+#' write.ml(model, path)
+#'
+#' # can also read back the saved model and predict
+#' # Note that summary does not work on a loaded model
+#' savedModel <- read.ml(path)
+#' summary(savedModel)
+#' }
+#' @note spark.svmLinear since 2.2.0
+setMethod("spark.svmLinear", signature(data = "SparkDataFrame", formula = "formula"),
+          function(data, formula, regParam = 0.0, maxIter = 100, tol = 1E-6, standardization = TRUE,
+                   threshold = 0.0, weightCol = NULL, aggregationDepth = 2) {
+            formula <- paste(deparse(formula), collapse = "")
+
+            if (!is.null(weightCol) && weightCol == "") {
+              weightCol <- NULL
+            } else if (!is.null(weightCol)) {
+              weightCol <- as.character(weightCol)
+            }
+
+            jobj <- callJStatic("org.apache.spark.ml.r.LinearSVCWrapper", "fit",
+                                data@sdf, formula, as.numeric(regParam), as.integer(maxIter),
+                                as.numeric(tol), as.logical(standardization), as.numeric(threshold),
+                                weightCol, as.integer(aggregationDepth))
+            new("LinearSVCModel", jobj = jobj)
+          })
+
+# Predicted values based on a LinearSVCModel
+
+#' @param newData a SparkDataFrame for testing.
+#' @return \code{predict} returns the predicted values based on a LinearSVCModel.
+#' @rdname spark.svmLinear
+#' @aliases predict,LinearSVCModel,SparkDataFrame-method
+#' @export
+#' @note predict(LinearSVCModel) since 2.2.0
+setMethod("predict", signature(object = "LinearSVCModel"),
+          function(object, newData) {
+            predict_internal(object, newData)
+          })
+
+# Get the summary of a LinearSVCModel
+
+#' @param object a LinearSVCModel fitted by \code{spark.svmLinear}.
+#' @return \code{summary} returns summary information of the fitted model, which is a list.
+#'         The list includes \code{coefficients} (coefficients of the fitted model),
+#'         \code{intercept} (intercept of the fitted model), \code{numClasses} (number of classes),
+#'         \code{numFeatures} (number of features).
+#' @rdname spark.svmLinear
+#' @aliases summary,LinearSVCModel-method
+#' @export
+#' @note summary(LinearSVCModel) since 2.2.0
+setMethod("summary", signature(object = "LinearSVCModel"),
+          function(object) {
+            jobj <- object@jobj
+            features <- callJMethod(jobj, "features")
+            labels <- callJMethod(jobj, "labels")
+            coefficients <- callJMethod(jobj, "coefficients")
+            nCol <- length(coefficients) / length(features)
+            coefficients <- matrix(unlist(coefficients), ncol = nCol)
+            intercept <- callJMethod(jobj, "intercept")
+            numClasses <- callJMethod(jobj, "numClasses")
+            numFeatures <- callJMethod(jobj, "numFeatures")
+            if (nCol == 1) {
+              colnames(coefficients) <- c("Estimate")
+            } else {
+              colnames(coefficients) <- unlist(labels)
+            }
+            rownames(coefficients) <- unlist(features)
+            list(coefficients = coefficients, intercept = intercept,
+                 numClasses = numClasses, numFeatures = numFeatures)
+          })
+
+# Save fitted LinearSVCModel to the input path
+
+#' @param path The directory where the model is saved.
+#' @param overwrite Whether to overwrite the output path if it already exists. Default is FALSE,
+#'                  which means an exception is thrown if the output path already exists.
+#'
+#' @rdname spark.svmLinear
+#' @aliases write.ml,LinearSVCModel,character-method
+#' @export
+#' @note write.ml(LinearSVCModel, character) since 2.2.0
+setMethod("write.ml", signature(object = "LinearSVCModel", path = "character"),
+          function(object, path, overwrite = FALSE) {
+            write_internal(object, path, overwrite)
+          })
+
 #' Logistic Regression Model
 #'
 #' Fits an logistic regression model against a SparkDataFrame. It supports "binomial": Binary logistic regression
diff --git a/R/pkg/R/mllib_utils.R b/R/pkg/R/mllib_utils.R
index 29c44739233d7..04a0a6f944412 100644
--- a/R/pkg/R/mllib_utils.R
+++ b/R/pkg/R/mllib_utils.R
@@ -35,8 +35,9 @@
 #' @seealso \link{spark.als}, \link{spark.bisectingKmeans}, \link{spark.gaussianMixture},
 #' @seealso \link{spark.gbt}, \link{spark.glm}, \link{glm}, \link{spark.isoreg},
 #' @seealso \link{spark.kmeans},
-#' @seealso \link{spark.lda}, \link{spark.logit}, \link{spark.mlp}, \link{spark.naiveBayes},
-#' @seealso \link{spark.randomForest}, \link{spark.survreg},
+#' @seealso \link{spark.lda}, \link{spark.logit},
+#' @seealso \link{spark.mlp}, \link{spark.naiveBayes},
+#' @seealso \link{spark.randomForest}, \link{spark.survreg}, \link{spark.svmLinear},
 #' @seealso \link{read.ml}
 NULL

@@ -51,7 +52,7 @@ NULL
 #' @seealso \link{spark.gbt}, \link{spark.glm}, \link{glm}, \link{spark.isoreg},
 #' @seealso \link{spark.kmeans},
 #' @seealso \link{spark.logit}, \link{spark.mlp}, \link{spark.naiveBayes},
-#' @seealso \link{spark.randomForest}, \link{spark.survreg}
+#' @seealso \link{spark.randomForest}, \link{spark.survreg}, \link{spark.svmLinear}
 NULL

 write_internal <- function(object, path, overwrite = FALSE) {
@@ -115,6 +116,8 @@ read.ml <- function(path) {
     new("GBTClassificationModel", jobj = jobj)
   } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.BisectingKMeansWrapper")) {
     new("BisectingKMeansModel", jobj = jobj)
+  } else if (isInstanceOf(jobj, "org.apache.spark.ml.r.LinearSVCWrapper")) {
+    new("LinearSVCModel", jobj = jobj)
   } else {
     stop("Unsupported model: ", jobj)
   }
diff --git a/R/pkg/inst/tests/testthat/test_mllib_classification.R b/R/pkg/inst/tests/testthat/test_mllib_classification.R
index 5f84a620c1089..620f528f2e6c8 100644
--- a/R/pkg/inst/tests/testthat/test_mllib_classification.R
+++ b/R/pkg/inst/tests/testthat/test_mllib_classification.R
@@ -27,6 +27,50 @@
file.path(sparkHome, x) } +test_that("spark.svmLinear", { + df <- suppressWarnings(createDataFrame(iris)) + training <- df[df$Species %in% c("versicolor", "virginica"), ] + model <- spark.svmLinear(training, Species ~ ., regParam = 0.01, maxIter = 10) + summary <- summary(model) + + # test summary coefficients return matrix type + expect_true(class(summary$coefficients) == "matrix") + expect_true(class(summary$coefficients[, 1]) == "numeric") + + coefs <- summary$coefficients[, "Estimate"] + expected_coefs <- c(-0.1563083, -0.460648, 0.2276626, 1.055085) + expect_true(all(abs(coefs - expected_coefs) < 0.1)) + expect_equal(summary$intercept, -0.06004978, tolerance = 1e-2) + + # Test prediction with string label + prediction <- predict(model, training) + expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "character") + expected <- c("versicolor", "versicolor", "versicolor", "virginica", "virginica", + "virginica", "virginica", "virginica", "virginica", "virginica") + expect_equal(sort(as.list(take(select(prediction, "prediction"), 10))[[1]]), expected) + + # Test model save and load + modelPath <- tempfile(pattern = "spark-svm-linear", fileext = ".tmp") + write.ml(model, modelPath) + expect_error(write.ml(model, modelPath)) + write.ml(model, modelPath, overwrite = TRUE) + model2 <- read.ml(modelPath) + coefs <- summary(model)$coefficients + coefs2 <- summary(model2)$coefficients + expect_equal(coefs, coefs2) + unlink(modelPath) + + # Test prediction with numeric label + label <- c(0.0, 0.0, 0.0, 1.0, 1.0) + feature <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776) + data <- as.data.frame(cbind(label, feature)) + df <- createDataFrame(data) + model <- spark.svmLinear(df, label ~ feature, regParam = 0.1) + prediction <- collect(select(predict(model, df), "prediction")) + expect_equal(sort(prediction$prediction), c("0.0", "0.0", "0.0", "1.0", "1.0")) + +}) + test_that("spark.logit", { # R code to reproduce the result. # nolint start diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala new file mode 100644 index 0000000000000..2454258cf4cfd --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.ml.r + +import org.apache.hadoop.fs.Path +import org.json4s._ +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.ml.{Pipeline, PipelineModel} +import org.apache.spark.ml.classification.{LinearSVC, LinearSVCModel} +import org.apache.spark.ml.feature.{IndexToString, RFormula} +import org.apache.spark.ml.r.RWrapperUtils._ +import org.apache.spark.ml.util._ +import org.apache.spark.sql.{DataFrame, Dataset} + +private[r] class LinearSVCWrapper private ( + val pipeline: PipelineModel, + val features: Array[String], + val labels: Array[String]) extends MLWritable { + import LinearSVCWrapper._ + + private val svcModel: LinearSVCModel = + pipeline.stages(1).asInstanceOf[LinearSVCModel] + + lazy val coefficients: Array[Double] = svcModel.coefficients.toArray + + lazy val intercept: Double = svcModel.intercept + + lazy val numClasses: Int = svcModel.numClasses + + lazy val numFeatures: Int = svcModel.numFeatures + + def transform(dataset: Dataset[_]): DataFrame = { + pipeline.transform(dataset) + .drop(PREDICTED_LABEL_INDEX_COL) + .drop(svcModel.getFeaturesCol) + .drop(svcModel.getLabelCol) + } + + override def write: MLWriter = new LinearSVCWrapper.LinearSVCWrapperWriter(this) +} + +private[r] object LinearSVCWrapper + extends MLReadable[LinearSVCWrapper] { + + val PREDICTED_LABEL_INDEX_COL = "pred_label_idx" + val PREDICTED_LABEL_COL = "prediction" + + def fit( + data: DataFrame, + formula: String, + regParam: Double, + maxIter: Int, + tol: Double, + standardization: Boolean, + threshold: Double, + weightCol: String, + aggregationDepth: Int + ): LinearSVCWrapper = { + + val rFormula = new RFormula() + .setFormula(formula) + .setForceIndexLabel(true) + checkDataColumns(rFormula, data) + val rFormulaModel = rFormula.fit(data) + + val fitIntercept = rFormula.hasIntercept + + // get labels and feature names from output schema + val (features, labels) = getFeaturesAndLabels(rFormulaModel, data) + + // assemble and fit the pipeline + val svc = new LinearSVC() + .setRegParam(regParam) + .setMaxIter(maxIter) + .setTol(tol) + .setFitIntercept(fitIntercept) + .setStandardization(standardization) + .setFeaturesCol(rFormula.getFeaturesCol) + .setLabelCol(rFormula.getLabelCol) + .setPredictionCol(PREDICTED_LABEL_INDEX_COL) + .setThreshold(threshold) + .setAggregationDepth(aggregationDepth) + + if (weightCol != null) svc.setWeightCol(weightCol) + + val idxToStr = new IndexToString() + .setInputCol(PREDICTED_LABEL_INDEX_COL) + .setOutputCol(PREDICTED_LABEL_COL) + .setLabels(labels) + + val pipeline = new Pipeline() + .setStages(Array(rFormulaModel, svc, idxToStr)) + .fit(data) + + new LinearSVCWrapper(pipeline, features, labels) + } + + override def read: MLReader[LinearSVCWrapper] = new LinearSVCWrapperReader + + override def load(path: String): LinearSVCWrapper = super.load(path) + + class LinearSVCWrapperWriter(instance: LinearSVCWrapper) extends MLWriter { + + override protected def saveImpl(path: String): Unit = { + val rMetadataPath = new Path(path, "rMetadata").toString + val pipelinePath = new Path(path, "pipeline").toString + + val rMetadata = ("class" -> instance.getClass.getName) ~ + ("features" -> instance.features.toSeq) ~ + ("labels" -> instance.labels.toSeq) + val rMetadataJson: String = compact(render(rMetadata)) + sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath) + + instance.pipeline.save(pipelinePath) + } + } + + class LinearSVCWrapperReader extends MLReader[LinearSVCWrapper] { + + override def 
load(path: String): LinearSVCWrapper = { + implicit val format = DefaultFormats + val rMetadataPath = new Path(path, "rMetadata").toString + val pipelinePath = new Path(path, "pipeline").toString + + val rMetadataStr = sc.textFile(rMetadataPath, 1).first() + val rMetadata = parse(rMetadataStr) + val features = (rMetadata \ "features").extract[Array[String]] + val labels = (rMetadata \ "labels").extract[Array[String]] + + val pipeline = PipelineModel.load(pipelinePath) + new LinearSVCWrapper(pipeline, features, labels) + } + } +} diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala index c44179281ba1c..358e522dfe1c8 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/RWrappers.scala @@ -66,6 +66,8 @@ private[r] object RWrappers extends MLReader[Object] { GBTClassifierWrapper.load(path) case "org.apache.spark.ml.r.BisectingKMeansWrapper" => BisectingKMeansWrapper.load(path) + case "org.apache.spark.ml.r.LinearSVCWrapper" => + LinearSVCWrapper.load(path) case _ => throw new SparkException(s"SparkR read.ml does not support load $className") }
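
A minimal end-to-end usage sketch of the new API in R, assembled from the roxygen example and the unit test above rather than from any additional functionality in this patch; it assumes a Spark build that includes this change, and the model path is just a temporary placeholder.

library(SparkR)
sparkR.session()

# createDataFrame(iris) warns while renaming '.' in column names, hence suppressWarnings
df <- suppressWarnings(createDataFrame(iris))
training <- df[df$Species %in% c("versicolor", "virginica"), ]

# fit a binary linear SVM on the two-class subset
model <- spark.svmLinear(training, Species ~ ., regParam = 0.01, maxIter = 10)
summary(model)    # list with coefficients, intercept, numClasses, numFeatures

# predictions carry the original string labels (IndexToString stage in the wrapper pipeline)
head(select(predict(model, training), "prediction"))

# round-trip the model through write.ml / read.ml and predict with the loaded model
modelPath <- tempfile(pattern = "spark-svm-linear", fileext = ".tmp")
write.ml(model, modelPath, overwrite = TRUE)
loaded <- read.ml(modelPath)
head(select(predict(loaded, training), "prediction"))
unlink(modelPath)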