Skip to content

Commit

Permalink
[SPARK-18821][SPARKR] Bisecting k-means wrapper in SparkR
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

Add R wrapper for bisecting Kmeans.

As JIRA is down, I will update title to link with corresponding JIRA later.

## How was this patch tested?

Add new unit tests.

Author: [email protected] <[email protected]>

Closes #16566 from wangmiao1981/bk.
  • Loading branch information
wangmiao1981 authored and Felix Cheung committed Jan 27, 2017
1 parent 1191fe2 commit c0ba284
Show file tree
Hide file tree
Showing 7 changed files with 347 additions and 5 deletions.
3 changes: 2 additions & 1 deletion R/pkg/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ exportMethods("glm",
"spark.kstest",
"spark.logit",
"spark.randomForest",
"spark.gbt")
"spark.gbt",
"spark.bisectingKmeans")

# Job group lifecycle management methods
export("setJobGroup",
Expand Down
5 changes: 5 additions & 0 deletions R/pkg/R/generics.R
Original file line number Diff line number Diff line change
Expand Up @@ -1338,6 +1338,11 @@ setGeneric("rbind", signature = "...")
#' @export
setGeneric("spark.als", function(data, ...) { standardGeneric("spark.als") })

#' @rdname spark.bisectingKmeans
#' @export
setGeneric("spark.bisectingKmeans",
function(data, formula, ...) { standardGeneric("spark.bisectingKmeans") })

#' @rdname spark.gaussianMixture
#' @export
setGeneric("spark.gaussianMixture",
Expand Down
149 changes: 149 additions & 0 deletions R/pkg/R/mllib_clustering.R
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@

# mllib_clustering.R: Provides methods for MLlib clustering algorithms integration

#' S4 class that represents a BisectingKMeansModel
#'
#' @param jobj a Java object reference to the backing Scala BisectingKMeansModel
#' @export
#' @note BisectingKMeansModel since 2.2.0
setClass("BisectingKMeansModel", representation(jobj = "jobj"))

#' S4 class that represents a GaussianMixtureModel
#'
#' @param jobj a Java object reference to the backing Scala GaussianMixtureModel
Expand All @@ -38,6 +45,148 @@ setClass("KMeansModel", representation(jobj = "jobj"))
#' @note LDAModel since 2.1.0
setClass("LDAModel", representation(jobj = "jobj"))

#' Bisecting K-Means Clustering Model
#'
#' Fits a bisecting k-means clustering model against a Spark DataFrame.
#' Users can call \code{summary} to print a summary of the fitted model, \code{predict} to make
#' predictions on new data, and \code{write.ml}/\code{read.ml} to save/load fitted models.
#'
#' @param data a SparkDataFrame for training.
#' @param formula a symbolic description of the model to be fitted. Currently only a few formula
#' operators are supported, including '~', '.', ':', '+', and '-'.
#' Note that the response variable of formula is empty in spark.bisectingKmeans.
#' @param k the desired number of leaf clusters. Must be > 1.
#' The actual number could be smaller if there are no divisible leaf clusters.
#' @param maxIter maximum iteration number.
#' @param seed the random seed.
#' @param minDivisibleClusterSize The minimum number of points (if greater than or equal to 1.0)
#' or the minimum proportion of points (if less than 1.0) of a divisible cluster.
#' Note that it is an expert parameter. The default value should be good enough
#' for most cases.
#' @param ... additional argument(s) passed to the method.
#' @return \code{spark.bisectingKmeans} returns a fitted bisecting k-means model.
#' @rdname spark.bisectingKmeans
#' @aliases spark.bisectingKmeans,SparkDataFrame,formula-method
#' @name spark.bisectingKmeans
#' @export
#' @examples
#' \dontrun{
#' sparkR.session()
#' df <- createDataFrame(iris)
#' model <- spark.bisectingKmeans(df, Sepal_Length ~ Sepal_Width, k = 4)
#' summary(model)
#'
#' # get fitted result from a bisecting k-means model
#' fitted.model <- fitted(model, "centers")
#' showDF(fitted.model)
#'
#' # fitted values on training data
#' fitted <- predict(model, df)
#' head(select(fitted, "Sepal_Length", "prediction"))
#'
#' # save fitted model to input path
#' path <- "path/to/model"
#' write.ml(model, path)
#'
#' # can also read back the saved model and print
#' savedModel <- read.ml(path)
#' summary(savedModel)
#' }
#' @note spark.bisectingKmeans since 2.2.0
#' @seealso \link{predict}, \link{read.ml}, \link{write.ml}
setMethod("spark.bisectingKmeans", signature(data = "SparkDataFrame", formula = "formula"),
function(data, formula, k = 4, maxIter = 20, seed = NULL, minDivisibleClusterSize = 1.0) {
formula <- paste0(deparse(formula), collapse = "")
if (!is.null(seed)) {
seed <- as.character(as.integer(seed))
}
jobj <- callJStatic("org.apache.spark.ml.r.BisectingKMeansWrapper", "fit",
data@sdf, formula, as.integer(k), as.integer(maxIter),
seed, as.numeric(minDivisibleClusterSize))
new("BisectingKMeansModel", jobj = jobj)
})

# Get the summary of a bisecting k-means model

#' @param object a fitted bisecting k-means model.
#' @return \code{summary} returns summary information of the fitted model, which is a list.
#' The list includes the model's \code{k} (number of cluster centers),
#' \code{coefficients} (model cluster centers),
#' \code{size} (number of data points in each cluster), \code{cluster}
#' (cluster centers of the transformed data; cluster is NULL if is.loaded is TRUE),
#' and \code{is.loaded} (whether the model is loaded from a saved file).
#' @rdname spark.bisectingKmeans
#' @export
#' @note summary(BisectingKMeansModel) since 2.2.0
setMethod("summary", signature(object = "BisectingKMeansModel"),
function(object) {
jobj <- object@jobj
is.loaded <- callJMethod(jobj, "isLoaded")
features <- callJMethod(jobj, "features")
coefficients <- callJMethod(jobj, "coefficients")
k <- callJMethod(jobj, "k")
size <- callJMethod(jobj, "size")
coefficients <- t(matrix(coefficients, ncol = k))
colnames(coefficients) <- unlist(features)
rownames(coefficients) <- 1:k
cluster <- if (is.loaded) {
NULL
} else {
dataFrame(callJMethod(jobj, "cluster"))
}
list(k = k, coefficients = coefficients, size = size,
cluster = cluster, is.loaded = is.loaded)
})

# Predicted values based on a bisecting k-means model

#' @param newData a SparkDataFrame for testing.
#' @return \code{predict} returns the predicted values based on a bisecting k-means model.
#' @rdname spark.bisectingKmeans
#' @export
#' @note predict(BisectingKMeansModel) since 2.2.0
setMethod("predict", signature(object = "BisectingKMeansModel"),
function(object, newData) {
predict_internal(object, newData)
})

#' Get fitted result from a bisecting k-means model
#'
#' Get fitted result from a bisecting k-means model.
#' Note: A saved-loaded model does not support this method.
#'
#' @param method type of fitted results, \code{"centers"} for cluster centers
#' or \code{"classes"} for assigned classes.
#' @return \code{fitted} returns a SparkDataFrame containing fitted values.
#' @rdname spark.bisectingKmeans
#' @export
#' @note fitted since 2.2.0
setMethod("fitted", signature(object = "BisectingKMeansModel"),
function(object, method = c("centers", "classes")) {
method <- match.arg(method)
jobj <- object@jobj
is.loaded <- callJMethod(jobj, "isLoaded")
if (is.loaded) {
stop("Saved-loaded bisecting k-means model does not support 'fitted' method")
} else {
dataFrame(callJMethod(jobj, "fitted", method))
}
})

# Save fitted MLlib model to the input path

#' @param path the directory where the model is saved.
#' @param overwrite overwrites or not if the output path already exists. Default is FALSE
#' which means throw exception if the output path exists.
#'
#' @rdname spark.bisectingKmeans
#' @export
#' @note write.ml(BisectingKMeansModel, character) since 2.2.0
setMethod("write.ml", signature(object = "BisectingKMeansModel", path = "character"),
function(object, path, overwrite = FALSE) {
write_internal(object, path, overwrite)
})

#' Multivariate Gaussian Mixture Model (GMM)
#'
#' Fits multivariate gaussian mixture model against a Spark DataFrame, similarly to R's
Expand Down
10 changes: 6 additions & 4 deletions R/pkg/R/mllib_utils.R
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
#' @rdname write.ml
#' @name write.ml
#' @export
#' @seealso \link{spark.glm}, \link{glm},
#' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.gbt}, \link{spark.isoreg},
#' @seealso \link{spark.als}, \link{spark.bisectingKmeans}, \link{spark.gaussianMixture},
#' @seealso \link{spark.gbt}, \link{spark.glm}, \link{glm}, \link{spark.isoreg},
#' @seealso \link{spark.kmeans},
#' @seealso \link{spark.lda}, \link{spark.logit}, \link{spark.mlp}, \link{spark.naiveBayes},
#' @seealso \link{spark.randomForest}, \link{spark.survreg},
Expand All @@ -47,8 +47,8 @@ NULL
#' @rdname predict
#' @name predict
#' @export
#' @seealso \link{spark.glm}, \link{glm},
#' @seealso \link{spark.als}, \link{spark.gaussianMixture}, \link{spark.gbt}, \link{spark.isoreg},
#' @seealso \link{spark.als}, \link{spark.bisectingKmeans}, \link{spark.gaussianMixture},
#' @seealso \link{spark.gbt}, \link{spark.glm}, \link{glm}, \link{spark.isoreg},
#' @seealso \link{spark.kmeans},
#' @seealso \link{spark.logit}, \link{spark.mlp}, \link{spark.naiveBayes},
#' @seealso \link{spark.randomForest}, \link{spark.survreg}
Expand Down Expand Up @@ -113,6 +113,8 @@ read.ml <- function(path) {
new("GBTRegressionModel", jobj = jobj)
} else if (isInstanceOf(jobj, "org.apache.spark.ml.r.GBTClassifierWrapper")) {
new("GBTClassificationModel", jobj = jobj)
} else if (isInstanceOf(jobj, "org.apache.spark.ml.r.BisectingKMeansWrapper")) {
new("BisectingKMeansModel", jobj = jobj)
} else {
stop("Unsupported model: ", jobj)
}
Expand Down
40 changes: 40 additions & 0 deletions R/pkg/inst/tests/testthat/test_mllib_clustering.R
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,46 @@ absoluteSparkPath <- function(x) {
file.path(sparkHome, x)
}

test_that("spark.bisectingKmeans", {
newIris <- iris
newIris$Species <- NULL
training <- suppressWarnings(createDataFrame(newIris))

take(training, 1)

model <- spark.bisectingKmeans(data = training, ~ .)
sample <- take(select(predict(model, training), "prediction"), 1)
expect_equal(typeof(sample$prediction), "integer")
expect_equal(sample$prediction, 1)

# Test fitted works on Bisecting KMeans
fitted.model <- fitted(model)
expect_equal(sort(collect(distinct(select(fitted.model, "prediction")))$prediction),
c(0, 1, 2, 3))

# Test summary works on KMeans
summary.model <- summary(model)
cluster <- summary.model$cluster
k <- summary.model$k
expect_equal(k, 4)
expect_equal(sort(collect(distinct(select(cluster, "prediction")))$prediction),
c(0, 1, 2, 3))

# Test model save/load
modelPath <- tempfile(pattern = "spark-bisectingkmeans", fileext = ".tmp")
write.ml(model, modelPath)
expect_error(write.ml(model, modelPath))
write.ml(model, modelPath, overwrite = TRUE)
model2 <- read.ml(modelPath)
summary2 <- summary(model2)
expect_equal(sort(unlist(summary.model$size)), sort(unlist(summary2$size)))
expect_equal(summary.model$coefficients, summary2$coefficients)
expect_true(!summary.model$is.loaded)
expect_true(summary2$is.loaded)

unlink(modelPath)
})

test_that("spark.gaussianMixture", {
# R code to reproduce the result.
# nolint start
Expand Down
Loading

0 comments on commit c0ba284

Please sign in to comment.