Skip to content

Commit

Permalink
[SPARK-19319][BACKPORT-2.1][SPARKR] SparkR Kmeans summary returns err…
Browse files Browse the repository at this point in the history
…or when the cluster size doesn't equal to k

## What changes were proposed in this pull request?

Backport fix of #16666

## How was this patch tested?

Backport unit tests

Author: [email protected] <[email protected]>

Closes #16761 from wangmiao1981/kmeansport.
  • Loading branch information
wangmiao1981 authored and Felix Cheung committed Feb 12, 2017
1 parent 173c238 commit 06e77e0
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 9 deletions.
29 changes: 21 additions & 8 deletions R/pkg/R/mllib.R
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,10 @@ setMethod("summary", signature(object = "IsotonicRegressionModel"),
#' @param k number of centers.
#' @param maxIter maximum iteration number.
#' @param initMode the initialization algorithm choosen to fit the model.
#' @param seed the random seed for cluster initialization
#' @param initSteps the number of steps for the k-means|| initialization mode.
#' This is an advanced setting, the default of 2 is almost always enough. Must be > 0.
#' @param tol convergence tolerance of iterations.
#' @param ... additional argument(s) passed to the method.
#' @return \code{spark.kmeans} returns a fitted k-means model.
#' @rdname spark.kmeans
Expand Down Expand Up @@ -628,11 +632,16 @@ setMethod("summary", signature(object = "IsotonicRegressionModel"),
#' @note spark.kmeans since 2.0.0
#' @seealso \link{predict}, \link{read.ml}, \link{write.ml}
setMethod("spark.kmeans", signature(data = "SparkDataFrame", formula = "formula"),
function(data, formula, k = 2, maxIter = 20, initMode = c("k-means||", "random")) {
function(data, formula, k = 2, maxIter = 20, initMode = c("k-means||", "random"),
seed = NULL, initSteps = 2, tol = 1E-4) {
formula <- paste(deparse(formula), collapse = "")
initMode <- match.arg(initMode)
if (!is.null(seed)) {
seed <- as.character(as.integer(seed))
}
jobj <- callJStatic("org.apache.spark.ml.r.KMeansWrapper", "fit", data@sdf, formula,
as.integer(k), as.integer(maxIter), initMode)
as.integer(k), as.integer(maxIter), initMode, seed,
as.integer(initSteps), as.numeric(tol))
new("KMeansModel", jobj = jobj)
})

Expand Down Expand Up @@ -671,10 +680,13 @@ setMethod("fitted", signature(object = "KMeansModel"),

#' @param object a fitted k-means model.
#' @return \code{summary} returns summary information of the fitted model, which is a list.
#' The list includes the model's \code{k} (number of cluster centers),
#' The list includes the model's \code{k} (the configured number of cluster centers),
#' \code{coefficients} (model cluster centers),
#' \code{size} (number of data points in each cluster), and \code{cluster}
#' (cluster centers of the transformed data).
#' \code{size} (number of data points in each cluster), \code{cluster}
#' (cluster centers of the transformed data), {is.loaded} (whether the model is loaded
#' from a saved file), and \code{clusterSize}
#' (the actual number of cluster centers. When using initMode = "random",
#' \code{clusterSize} may not equal to \code{k}).
#' @rdname spark.kmeans
#' @export
#' @note summary(KMeansModel) since 2.0.0
Expand All @@ -686,16 +698,17 @@ setMethod("summary", signature(object = "KMeansModel"),
coefficients <- callJMethod(jobj, "coefficients")
k <- callJMethod(jobj, "k")
size <- callJMethod(jobj, "size")
coefficients <- t(matrix(coefficients, ncol = k))
clusterSize <- callJMethod(jobj, "clusterSize")
coefficients <- t(matrix(coefficients, ncol = clusterSize))
colnames(coefficients) <- unlist(features)
rownames(coefficients) <- 1:k
rownames(coefficients) <- 1:clusterSize
cluster <- if (is.loaded) {
NULL
} else {
dataFrame(callJMethod(jobj, "cluster"))
}
list(k = k, coefficients = coefficients, size = size,
cluster = cluster, is.loaded = is.loaded)
cluster = cluster, is.loaded = is.loaded, clusterSize = clusterSize)
})

# Predicted values based on a k-means model
Expand Down
27 changes: 27 additions & 0 deletions R/pkg/inst/tests/testthat/test_mllib.R
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,33 @@ test_that("spark.kmeans", {
expect_true(summary2$is.loaded)

unlink(modelPath)

# Test Kmeans on dataset that is sensitive to seed value
col1 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0)
col2 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0)
col3 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0)
cols <- as.data.frame(cbind(col1, col2, col3))
df <- createDataFrame(cols)

model1 <- spark.kmeans(data = df, ~ ., k = 5, maxIter = 10,
initMode = "random", seed = 1, tol = 1E-5)
model2 <- spark.kmeans(data = df, ~ ., k = 5, maxIter = 10,
initMode = "random", seed = 22222, tol = 1E-5)

summary.model1 <- summary(model1)
summary.model2 <- summary(model2)
cluster1 <- summary.model1$cluster
cluster2 <- summary.model2$cluster
clusterSize1 <- summary.model1$clusterSize
clusterSize2 <- summary.model2$clusterSize

# The predicted clusters are different
expect_equal(sort(collect(distinct(select(cluster1, "prediction")))$prediction),
c(0, 1, 2, 3))
expect_equal(sort(collect(distinct(select(cluster2, "prediction")))$prediction),
c(0, 1, 2))
expect_equal(clusterSize1, 4)
expect_equal(clusterSize2, 3)
})

test_that("spark.mlp", {
Expand Down
11 changes: 10 additions & 1 deletion mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ private[r] class KMeansWrapper private (

lazy val cluster: DataFrame = kMeansModel.summary.cluster

lazy val clusterSize: Int = kMeansModel.clusterCenters.size

def fitted(method: String): DataFrame = {
if (method == "centers") {
kMeansModel.summary.predictions.drop(kMeansModel.getFeaturesCol)
Expand All @@ -68,7 +70,10 @@ private[r] object KMeansWrapper extends MLReadable[KMeansWrapper] {
formula: String,
k: Int,
maxIter: Int,
initMode: String): KMeansWrapper = {
initMode: String,
seed: String,
initSteps: Int,
tol: Double): KMeansWrapper = {

val rFormula = new RFormula()
.setFormula(formula)
Expand All @@ -87,6 +92,10 @@ private[r] object KMeansWrapper extends MLReadable[KMeansWrapper] {
.setMaxIter(maxIter)
.setInitMode(initMode)
.setFeaturesCol(rFormula.getFeaturesCol)
.setInitSteps(initSteps)
.setTol(tol)

if (seed != null && seed.length > 0) kMeans.setSeed(seed.toInt)

val pipeline = new Pipeline()
.setStages(Array(rFormulaModel, kMeans))
Expand Down

0 comments on commit 06e77e0

Please sign in to comment.