[SPARK-19456][SPARKR] Add LinearSVC R API
## What changes were proposed in this pull request?

The linear SVM classifier was newly added to ML, and the Python API has been added. This JIRA adds the R API.

Marked as WIP, as I am designing unit tests.
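In brief, the new API follows the same fit / summary / predict / write.ml pattern as the other SparkR MLlib wrappers. A minimal sketch, mirroring the roxygen example added below:

```r
# Minimal usage sketch (mirrors the roxygen example in mllib_classification.R).
# LinearSVC is a binary classifier, so only two iris classes are kept.
library(SparkR)
sparkR.session()

df <- createDataFrame(iris)
training <- df[df$Species %in% c("versicolor", "virginica"), ]

model <- spark.svmLinear(training, Species ~ ., regParam = 0.5)
summary(model)      # coefficients, intercept, numClasses, numFeatures
predictions <- predict(model, training)
head(select(predictions, "prediction"))
```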

## How was this patch tested?

Unit tests are added in R/pkg/inst/tests/testthat/test_mllib_classification.R (see the test changes below).

Author: [email protected] <[email protected]>

Closes apache#16800 from wangmiao1981/svc.
wangmiao1981 authored and cmonkey committed Feb 15, 2017
1 parent 7c9cec4 commit 3f6abff
Showing 7 changed files with 341 additions and 4 deletions.
3 changes: 2 additions & 1 deletion R/pkg/NAMESPACE
@@ -65,7 +65,8 @@ exportMethods("glm",
              "spark.logit",
              "spark.randomForest",
              "spark.gbt",
-             "spark.bisectingKmeans")
+             "spark.bisectingKmeans",
+             "spark.svmLinear")

# Job group lifecycle management methods
export("setJobGroup",
4 changes: 4 additions & 0 deletions R/pkg/R/generics.R
@@ -1401,6 +1401,10 @@ setGeneric("spark.randomForest",
#' @export
setGeneric("spark.survreg", function(data, formula) { standardGeneric("spark.survreg") })

#' @rdname spark.svmLinear
#' @export
setGeneric("spark.svmLinear", function(data, formula, ...) { standardGeneric("spark.svmLinear") })

#' @rdname spark.lda
#' @export
setGeneric("spark.posterior", function(object, newData) { standardGeneric("spark.posterior") })
132 changes: 132 additions & 0 deletions R/pkg/R/mllib_classification.R
@@ -18,6 +18,13 @@
# mllib_classification.R: Provides methods for MLlib classification algorithm
# integration (except for tree-based algorithms)

#' S4 class that represents a LinearSVCModel
#'
#' @param jobj a Java object reference to the backing Scala LinearSVCModel
#' @export
#' @note LinearSVCModel since 2.2.0
setClass("LinearSVCModel", representation(jobj = "jobj"))

#' S4 class that represents a LogisticRegressionModel
#'
#' @param jobj a Java object reference to the backing Scala LogisticRegressionModel
@@ -39,6 +46,131 @@ setClass("MultilayerPerceptronClassificationModel", representation(jobj = "jobj"))
#' @note NaiveBayesModel since 2.0.0
setClass("NaiveBayesModel", representation(jobj = "jobj"))

#' Linear SVM Model
#'
#' Fits a linear SVM model against a SparkDataFrame. It is a binary classifier, similar to \code{svm} in the e1071 package.
#' Users can print the model, make predictions with it, and save the model to a given path.
#'
#' @param data SparkDataFrame for training.
#' @param formula A symbolic description of the model to be fitted. Currently only a few formula
#' operators are supported, including '~', '.', ':', '+', and '-'.
#' @param regParam The regularization parameter.
#' @param maxIter Maximum iteration number.
#' @param tol Convergence tolerance of iterations.
#' @param standardization Whether to standardize the training features before fitting the model. The coefficients
#'                        of the model are always returned on the original scale, so standardization is transparent
#'                        to users. Note that with or without standardization, the models should always converge
#'                        to the same solution when no regularization is applied.
#' @param threshold The threshold in binary classification applied to the linear model prediction; it can be
#'                  any real number, and the default 0.0 thresholds the raw prediction at zero.
#' @param weightCol The weight column name.
#' @param aggregationDepth The depth for treeAggregate (greater than or equal to 2). If the dimensions of features
#' or the number of partitions are large, this param could be adjusted to a larger size.
#' This is an expert parameter. Default value should be good for most cases.
#' @param ... additional arguments passed to the method.
#' @return \code{spark.svmLinear} returns a fitted linear SVM model.
#' @rdname spark.svmLinear
#' @aliases spark.svmLinear,SparkDataFrame,formula-method
#' @name spark.svmLinear
#' @export
#' @examples
#' \dontrun{
#' sparkR.session()
#' df <- createDataFrame(iris)
#' training <- df[df$Species %in% c("versicolor", "virginica"), ]
#' model <- spark.svmLinear(training, Species ~ ., regParam = 0.5)
#' summary <- summary(model)
#'
#' # fitted values on training data
#' fitted <- predict(model, training)
#'
#' # save fitted model to input path
#' path <- "path/to/model"
#' write.ml(model, path)
#'
#' # can also read back the saved model and use it for prediction or summary
#' savedModel <- read.ml(path)
#' summary(savedModel)
#' }
#' @note spark.svmLinear since 2.2.0
setMethod("spark.svmLinear", signature(data = "SparkDataFrame", formula = "formula"),
function(data, formula, regParam = 0.0, maxIter = 100, tol = 1E-6, standardization = TRUE,
threshold = 0.0, weightCol = NULL, aggregationDepth = 2) {
formula <- paste(deparse(formula), collapse = "")

if (!is.null(weightCol) && weightCol == "") {
weightCol <- NULL
} else if (!is.null(weightCol)) {
weightCol <- as.character(weightCol)
}

jobj <- callJStatic("org.apache.spark.ml.r.LinearSVCWrapper", "fit",
data@sdf, formula, as.numeric(regParam), as.integer(maxIter),
as.numeric(tol), as.logical(standardization), as.numeric(threshold),
weightCol, as.integer(aggregationDepth))
new("LinearSVCModel", jobj = jobj)
})
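
For reference, a hypothetical call that exercises the non-default arguments; note that the normalization above treats an empty weightCol the same as NULL:

```r
# Hypothetical call exercising the non-default parameters; the data and
# formula are illustrative only.
model <- spark.svmLinear(training, Species ~ .,
                         regParam = 0.1,           # regularization strength
                         maxIter = 50,             # iteration cap
                         tol = 1e-4,               # convergence tolerance
                         standardization = FALSE,  # fit on the raw feature scale
                         threshold = 0.5,          # threshold on the raw prediction
                         weightCol = "",           # "" is normalized to NULL above
                         aggregationDepth = 2)     # treeAggregate depth
```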

# Predicted values based on a LinearSVCModel

#' @param newData a SparkDataFrame for testing.
#' @return \code{predict} returns the predicted values based on a LinearSVCModel.
#' @rdname spark.svmLinear
#' @aliases predict,LinearSVCModel,SparkDataFrame-method
#' @export
#' @note predict(LinearSVCModel) since 2.2.0
setMethod("predict", signature(object = "LinearSVCModel"),
function(object, newData) {
predict_internal(object, newData)
})
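
A short usage sketch: the returned SparkDataFrame carries a "prediction" column holding the original string labels (the IndexToString stage in the Scala wrapper below maps label indices back):

```r
# Sketch: predict returns a SparkDataFrame with a "prediction" column.
predictions <- predict(model, training)
showDF(select(predictions, "prediction"), numRows = 5)
local <- collect(select(predictions, "prediction"))  # pull into a local data.frame
table(local$prediction)                              # label counts
```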

# Get the summary of a LinearSVCModel

#' @param object a LinearSVCModel fitted by \code{spark.svmLinear}.
#' @return \code{summary} returns summary information of the fitted model, which is a list.
#' The list includes \code{coefficients} (coefficients of the fitted model),
#' \code{intercept} (intercept of the fitted model), \code{numClasses} (number of classes),
#' \code{numFeatures} (number of features).
#' @rdname spark.svmLinear
#' @aliases summary,LinearSVCModel-method
#' @export
#' @note summary(LinearSVCModel) since 2.2.0
setMethod("summary", signature(object = "LinearSVCModel"),
function(object) {
jobj <- object@jobj
features <- callJMethod(jobj, "features")
labels <- callJMethod(jobj, "labels")
coefficients <- callJMethod(jobj, "coefficients")
nCol <- length(coefficients) / length(features)
coefficients <- matrix(unlist(coefficients), ncol = nCol)
intercept <- callJMethod(jobj, "intercept")
numClasses <- callJMethod(jobj, "numClasses")
numFeatures <- callJMethod(jobj, "numFeatures")
if (nCol == 1) {
colnames(coefficients) <- c("Estimate")
} else {
colnames(coefficients) <- unlist(labels)
}
rownames(coefficients) <- unlist(features)
list(coefficients = coefficients, intercept = intercept,
numClasses = numClasses, numFeatures = numFeatures)
})
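
The summary is a plain R list, so its fields can be inspected directly; a sketch:

```r
# Sketch: fields of the list returned by summary() above.
s <- summary(model)
s$coefficients   # matrix; rownames are feature names, and in the binary
                 # (nCol == 1) case the single column is named "Estimate"
s$intercept      # intercept of the fitted model
s$numClasses     # number of classes (2 for this binary classifier)
s$numFeatures    # number of features after RFormula encoding
```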

# Save fitted LinearSVCModel to the input path

#' @param path The directory where the model is saved.
#' @param overwrite Whether to overwrite if the output path already exists. Default is FALSE,
#'                  which means an exception is thrown if the output path exists.
#'
#' @rdname spark.svmLinear
#' @aliases write.ml,LinearSVCModel,character-method
#' @export
#' @note write.ml(LinearSVCModel, character) since 2.2.0
setMethod("write.ml", signature(object = "LinearSVCModel", path = "character"),
function(object, path, overwrite = FALSE) {
write_internal(object, path, overwrite)
})
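
The overwrite semantics match the other SparkR model writers; a small sketch of the round trip (the tests below exercise exactly this):

```r
# Sketch: persistence round trip. A second write to the same path throws
# unless overwrite = TRUE.
path <- tempfile(pattern = "spark-svm-linear")
write.ml(model, path)
# write.ml(model, path)                  # error: path already exists
write.ml(model, path, overwrite = TRUE)
restored <- read.ml(path)
```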

#' Logistic Regression Model
#'
#' Fits a logistic regression model against a SparkDataFrame. It supports "binomial": Binary logistic regression
9 changes: 6 additions & 3 deletions R/pkg/R/mllib_utils.R
@@ -35,8 +35,9 @@
#' @seealso \link{spark.als}, \link{spark.bisectingKmeans}, \link{spark.gaussianMixture},
#' @seealso \link{spark.gbt}, \link{spark.glm}, \link{glm}, \link{spark.isoreg},
#' @seealso \link{spark.kmeans},
-#' @seealso \link{spark.lda}, \link{spark.logit}, \link{spark.mlp}, \link{spark.naiveBayes},
-#' @seealso \link{spark.randomForest}, \link{spark.survreg},
+#' @seealso \link{spark.lda}, \link{spark.logit},
+#' @seealso \link{spark.mlp}, \link{spark.naiveBayes},
+#' @seealso \link{spark.randomForest}, \link{spark.survreg}, \link{spark.svmLinear},
#' @seealso \link{read.ml}
NULL

@@ -51,7 +52,7 @@ NULL
#' @seealso \link{spark.gbt}, \link{spark.glm}, \link{glm}, \link{spark.isoreg},
#' @seealso \link{spark.kmeans},
#' @seealso \link{spark.logit}, \link{spark.mlp}, \link{spark.naiveBayes},
-#' @seealso \link{spark.randomForest}, \link{spark.survreg}
+#' @seealso \link{spark.randomForest}, \link{spark.survreg}, \link{spark.svmLinear}
NULL

write_internal <- function(object, path, overwrite = FALSE) {
@@ -115,6 +116,8 @@ read.ml <- function(path) {
new("GBTClassificationModel", jobj = jobj)
} else if (isInstanceOf(jobj, "org.apache.spark.ml.r.BisectingKMeansWrapper")) {
new("BisectingKMeansModel", jobj = jobj)
} else if (isInstanceOf(jobj, "org.apache.spark.ml.r.LinearSVCWrapper")) {
new("LinearSVCModel", jobj = jobj)
} else {
stop("Unsupported model: ", jobj)
}
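
With this branch in place, a saved LinearSVC model round-trips through read.ml as the right S4 class; a sketch (isInstanceOf is the unexported SparkR helper used above):

```r
# Sketch: read.ml dispatches on the backing JVM wrapper class.
m <- read.ml(path)   # path previously written by write.ml
class(m)             # "LinearSVCModel"
SparkR:::isInstanceOf(m@jobj, "org.apache.spark.ml.r.LinearSVCWrapper")  # TRUE
```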
44 changes: 44 additions & 0 deletions R/pkg/inst/tests/testthat/test_mllib_classification.R
@@ -27,6 +27,50 @@ absoluteSparkPath <- function(x) {
file.path(sparkHome, x)
}

test_that("spark.svmLinear", {
df <- suppressWarnings(createDataFrame(iris))
training <- df[df$Species %in% c("versicolor", "virginica"), ]
model <- spark.svmLinear(training, Species ~ ., regParam = 0.01, maxIter = 10)
summary <- summary(model)

# test summary coefficients return matrix type
expect_true(class(summary$coefficients) == "matrix")
expect_true(class(summary$coefficients[, 1]) == "numeric")

coefs <- summary$coefficients[, "Estimate"]
expected_coefs <- c(-0.1563083, -0.460648, 0.2276626, 1.055085)
expect_true(all(abs(coefs - expected_coefs) < 0.1))
expect_equal(summary$intercept, -0.06004978, tolerance = 1e-2)

# Test prediction with string label
prediction <- predict(model, training)
expect_equal(typeof(take(select(prediction, "prediction"), 1)$prediction), "character")
expected <- c("versicolor", "versicolor", "versicolor", "virginica", "virginica",
"virginica", "virginica", "virginica", "virginica", "virginica")
expect_equal(sort(as.list(take(select(prediction, "prediction"), 10))[[1]]), expected)

# Test model save and load
modelPath <- tempfile(pattern = "spark-svm-linear", fileext = ".tmp")
write.ml(model, modelPath)
expect_error(write.ml(model, modelPath))
write.ml(model, modelPath, overwrite = TRUE)
model2 <- read.ml(modelPath)
coefs <- summary(model)$coefficients
coefs2 <- summary(model2)$coefficients
expect_equal(coefs, coefs2)
unlink(modelPath)

# Test prediction with numeric label
label <- c(0.0, 0.0, 0.0, 1.0, 1.0)
feature <- c(1.1419053, 0.9194079, -0.9498666, -1.1069903, 0.2809776)
data <- as.data.frame(cbind(label, feature))
df <- createDataFrame(data)
model <- spark.svmLinear(df, label ~ feature, regParam = 0.1)
prediction <- collect(select(predict(model, df), "prediction"))
expect_equal(sort(prediction$prediction), c("0.0", "0.0", "0.0", "1.0", "1.0"))

})

test_that("spark.logit", {
# R code to reproduce the result.
# nolint start
151 changes: 151 additions & 0 deletions mllib/src/main/scala/org/apache/spark/ml/r/LinearSVCWrapper.scala
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.ml.r

import org.apache.hadoop.fs.Path
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.{LinearSVC, LinearSVCModel}
import org.apache.spark.ml.feature.{IndexToString, RFormula}
import org.apache.spark.ml.r.RWrapperUtils._
import org.apache.spark.ml.util._
import org.apache.spark.sql.{DataFrame, Dataset}

private[r] class LinearSVCWrapper private (
val pipeline: PipelineModel,
val features: Array[String],
val labels: Array[String]) extends MLWritable {
import LinearSVCWrapper._

private val svcModel: LinearSVCModel =
pipeline.stages(1).asInstanceOf[LinearSVCModel]

lazy val coefficients: Array[Double] = svcModel.coefficients.toArray

lazy val intercept: Double = svcModel.intercept

lazy val numClasses: Int = svcModel.numClasses

lazy val numFeatures: Int = svcModel.numFeatures

def transform(dataset: Dataset[_]): DataFrame = {
pipeline.transform(dataset)
.drop(PREDICTED_LABEL_INDEX_COL)
.drop(svcModel.getFeaturesCol)
.drop(svcModel.getLabelCol)
}

override def write: MLWriter = new LinearSVCWrapper.LinearSVCWrapperWriter(this)
}

private[r] object LinearSVCWrapper
extends MLReadable[LinearSVCWrapper] {

val PREDICTED_LABEL_INDEX_COL = "pred_label_idx"
val PREDICTED_LABEL_COL = "prediction"

def fit(
data: DataFrame,
formula: String,
regParam: Double,
maxIter: Int,
tol: Double,
standardization: Boolean,
threshold: Double,
weightCol: String,
aggregationDepth: Int
): LinearSVCWrapper = {

val rFormula = new RFormula()
.setFormula(formula)
.setForceIndexLabel(true)
checkDataColumns(rFormula, data)
val rFormulaModel = rFormula.fit(data)

val fitIntercept = rFormula.hasIntercept

// get labels and feature names from output schema
val (features, labels) = getFeaturesAndLabels(rFormulaModel, data)

// assemble and fit the pipeline
val svc = new LinearSVC()
.setRegParam(regParam)
.setMaxIter(maxIter)
.setTol(tol)
.setFitIntercept(fitIntercept)
.setStandardization(standardization)
.setFeaturesCol(rFormula.getFeaturesCol)
.setLabelCol(rFormula.getLabelCol)
.setPredictionCol(PREDICTED_LABEL_INDEX_COL)
.setThreshold(threshold)
.setAggregationDepth(aggregationDepth)

if (weightCol != null) svc.setWeightCol(weightCol)

val idxToStr = new IndexToString()
.setInputCol(PREDICTED_LABEL_INDEX_COL)
.setOutputCol(PREDICTED_LABEL_COL)
.setLabels(labels)

val pipeline = new Pipeline()
.setStages(Array(rFormulaModel, svc, idxToStr))
.fit(data)

new LinearSVCWrapper(pipeline, features, labels)
}

override def read: MLReader[LinearSVCWrapper] = new LinearSVCWrapperReader

override def load(path: String): LinearSVCWrapper = super.load(path)

class LinearSVCWrapperWriter(instance: LinearSVCWrapper) extends MLWriter {

override protected def saveImpl(path: String): Unit = {
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

val rMetadata = ("class" -> instance.getClass.getName) ~
("features" -> instance.features.toSeq) ~
("labels" -> instance.labels.toSeq)
val rMetadataJson: String = compact(render(rMetadata))
sc.parallelize(Seq(rMetadataJson), 1).saveAsTextFile(rMetadataPath)

instance.pipeline.save(pipelinePath)
}
}

class LinearSVCWrapperReader extends MLReader[LinearSVCWrapper] {

override def load(path: String): LinearSVCWrapper = {
implicit val format = DefaultFormats
val rMetadataPath = new Path(path, "rMetadata").toString
val pipelinePath = new Path(path, "pipeline").toString

val rMetadataStr = sc.textFile(rMetadataPath, 1).first()
val rMetadata = parse(rMetadataStr)
val features = (rMetadata \ "features").extract[Array[String]]
val labels = (rMetadata \ "labels").extract[Array[String]]

val pipeline = PipelineModel.load(pipelinePath)
new LinearSVCWrapper(pipeline, features, labels)
}
}
}