From 35c188463798729b65ca74549984cb765ac1e9c9 Mon Sep 17 00:00:00 2001 From: "nate.crosswhite" Date: Thu, 4 Dec 2014 14:12:29 -0500 Subject: [PATCH 1/4] Add kmeans initial seed to pyspark API --- .../spark/mllib/api/python/PythonMLLibAPI.scala | 6 +++++- .../apache/spark/mllib/clustering/KMeans.scala | 12 +++++++++--- python/pyspark/mllib/clustering.py | 4 ++-- python/pyspark/mllib/tests.py | 17 +++++++++++++++++ 4 files changed, 33 insertions(+), 6 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 9f20cd5d00dcd..481084f413da6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -266,12 +266,16 @@ class PythonMLLibAPI extends Serializable { k: Int, maxIterations: Int, runs: Int, - initializationMode: String): KMeansModel = { + initializationMode: String, + seed: java.lang.Long): KMeansModel = { val kMeansAlg = new KMeans() .setK(k) .setMaxIterations(maxIterations) .setRuns(runs) .setInitializationMode(initializationMode) + + if (seed != null) kMeansAlg.setSeed(seed) + try { kMeansAlg.run(data.rdd.persist(StorageLevel.MEMORY_AND_DISK)) } finally { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 54c301d3e9e14..10192cf1d7362 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -43,7 +43,8 @@ class KMeans private ( private var runs: Int, private var initializationMode: String, private var initializationSteps: Int, - private var epsilon: Double) extends Serializable with Logging { + private var epsilon: Double, + private var seed: Long = System.nanoTime()) extends Serializable with Logging { /** * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, runs: 1, @@ -51,6 +52,11 @@ class KMeans private ( */ def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4) + def setSeed(seed: Long): this.type = { + this.seed = seed + this + } + /** Set the number of clusters to create (k). Default: 2. */ def setK(k: Int): this.type = { this.k = k @@ -255,7 +261,7 @@ class KMeans private ( private def initRandom(data: RDD[VectorWithNorm]) : Array[Array[VectorWithNorm]] = { // Sample all the cluster centers in one pass to avoid repeated scans - val sample = data.takeSample(true, runs * k, new XORShiftRandom().nextInt()).toSeq + val sample = data.takeSample(true, runs * k, new XORShiftRandom(this.seed).nextInt()).toSeq Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k).map { v => new VectorWithNorm(Vectors.dense(v.vector.toArray), v.norm) }.toArray) @@ -273,7 +279,7 @@ class KMeans private ( private def initKMeansParallel(data: RDD[VectorWithNorm]) : Array[Array[VectorWithNorm]] = { // Initialize each run's center to a random point - val seed = new XORShiftRandom().nextInt() + val seed = new XORShiftRandom(this.seed).nextInt() val sample = data.takeSample(true, runs, seed).toSeq val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r).toDense)) diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index e2492eef5bd6a..6b713aa39374e 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -78,10 +78,10 @@ def predict(self, x): class KMeans(object): @classmethod - def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||"): + def train(cls, rdd, k, maxIterations=100, runs=1, initializationMode="k-means||", seed=None): """Train a k-means clustering model.""" model = callMLlibFunc("trainKMeansModel", rdd.map(_convert_to_vector), k, maxIterations, - runs, initializationMode) + runs, initializationMode, seed) centers = callJavaFunc(rdd.context, model.clusterCenters) return KMeansModel([c.toArray() for c in centers]) diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 8332f8e061f48..20e451a7f93f8 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -129,6 +129,23 @@ def test_clustering(self): self.assertEquals(clusters.predict(data[0]), clusters.predict(data[1])) self.assertEquals(clusters.predict(data[2]), clusters.predict(data[3])) + def test_clustering_deterministic(self): + from pyspark.mllib.clustering import KMeans + X = range(0, 100, 10) + Y = range(0, 100, 10) + data = [[x, y] for x, y in zip(X, Y)] + clusters1 = KMeans.train(self.sc.parallelize(data), + 3, initializationMode="k-means||", seed=42) + clusters2 = KMeans.train(self.sc.parallelize(data), + 3, initializationMode="k-means||", seed=42) + clusters3 = KMeans.train(self.sc.parallelize(data), + 3, initializationMode="k-means||", seed=42) + centers1 = array(clusters1.centers).flatten().tolist() + centers2 = array(clusters2.centers).flatten().tolist() + centers3 = array(clusters3.centers).flatten().tolist() + self.assertListEqual(centers1, centers2) + self.assertListEqual(centers1, centers3) + def test_classification(self): from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes from pyspark.mllib.tree import DecisionTree From 5d087b40e14db51b1eeb44e462e04d5e718338be Mon Sep 17 00:00:00 2001 From: "nate.crosswhite" Date: Thu, 4 Dec 2014 16:25:49 -0500 Subject: [PATCH 2/4] Adding KMeans train with seed and Scala unit test --- .../spark/mllib/clustering/KMeans.scala | 25 +++++++++++++++++++ .../spark/mllib/clustering/KMeansSuite.scala | 21 ++++++++++++++++ 2 files changed, 46 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 10192cf1d7362..9a5f3583506aa 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -358,6 +358,31 @@ object KMeans { .run(data) } + /** + * Trains a k-means model using the given set of parameters. + * + * @param data training points stored as `RDD[Array[Double]]` + * @param k number of clusters + * @param maxIterations max number of iterations + * @param runs number of parallel runs, defaults to 1. The best model is returned. + * @param initializationMode initialization model, either "random" or "k-means||" (default). + * @param seed seed value for cluster initialization + */ + def train( + data: RDD[Vector], + k: Int, + maxIterations: Int, + runs: Int, + initializationMode: String, + seed: Long): KMeansModel = { + new KMeans().setK(k) + .setMaxIterations(maxIterations) + .setRuns(runs) + .setInitializationMode(initializationMode) + .setSeed(seed) + .run(data) + } + /** * Trains a k-means model using specified parameters and the default values for unspecified. */ diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala index 9ebef8466c831..2ada0d9505fa9 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala @@ -90,6 +90,27 @@ class KMeansSuite extends FunSuite with MLlibTestSparkContext { assert(model.clusterCenters.size === 3) } + test("deterministic initilization") { + // Create a large-ish set of point to cluster + val points = List.tabulate(1000)(n => Vectors.dense(n,n)) + val rdd = sc.parallelize(points, 3) + + for (initMode <- Seq(RANDOM, K_MEANS_PARALLEL)) { + // Create three deterministic models and compare cluster means + val model1 = KMeans.train(rdd, k = 10, maxIterations = 2, runs = 1, initializationMode = initMode, seed = 42) + val centers1 = model1.clusterCenters + + val model2 = KMeans.train(rdd, k = 10, maxIterations = 2, runs = 1, initializationMode = initMode, seed = 42) + val centers2 = model2.clusterCenters + + val model3 = KMeans.train(rdd, k = 10, maxIterations = 2, runs = 1, initializationMode = initMode, seed = 42) + val centers3 = model3.clusterCenters + + assert(centers1.deep == centers2.deep) + assert(centers1.deep == centers3.deep) + } + } + test("single cluster with big dataset") { val smallData = Array( Vectors.dense(1.0, 2.0, 6.0), From f8d5928ab22b874dc46a074b7b53b104d756a541 Mon Sep 17 00:00:00 2001 From: "nate.crosswhite" Date: Wed, 10 Dec 2014 16:41:32 -0500 Subject: [PATCH 3/4] Addressing PR issues --- .../spark/mllib/clustering/KMeans.scala | 23 ++++++++++--------- .../spark/mllib/clustering/KMeansSuite.scala | 6 ++--- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 9a5f3583506aa..a167c26598bfb 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -44,18 +44,13 @@ class KMeans private ( private var initializationMode: String, private var initializationSteps: Int, private var epsilon: Double, - private var seed: Long = System.nanoTime()) extends Serializable with Logging { + private var seed: Long) extends Serializable with Logging { /** * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, runs: 1, - * initializationMode: "k-means||", initializationSteps: 5, epsilon: 1e-4}. + * initializationMode: "k-means||", initializationSteps: 5, epsilon: 1e-4, System.nanoTime()}. */ - def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4) - - def setSeed(seed: Long): this.type = { - this.seed = seed - this - } + def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4, System.nanoTime()) /** Set the number of clusters to create (k). Default: 2. */ def setK(k: Int): this.type = { @@ -118,6 +113,12 @@ class KMeans private ( this } + /** Set the random seed for cluster initialization. */ + def setSeed(seed: Long): this.type = { + this.seed = seed + this + } + /** * Train a K-means model on the given set of points; `data` should be cached for high * performance, because this is an iterative algorithm. @@ -339,7 +340,7 @@ object KMeans { /** * Trains a k-means model using the given set of parameters. * - * @param data training points stored as `RDD[Array[Double]]` + * @param data training points stored as `RDD[Vector]` * @param k number of clusters * @param maxIterations max number of iterations * @param runs number of parallel runs, defaults to 1. The best model is returned. @@ -361,12 +362,12 @@ object KMeans { /** * Trains a k-means model using the given set of parameters. * - * @param data training points stored as `RDD[Array[Double]]` + * @param data training points stored as `RDD[Vector]` * @param k number of clusters * @param maxIterations max number of iterations * @param runs number of parallel runs, defaults to 1. The best model is returned. * @param initializationMode initialization model, either "random" or "k-means||" (default). - * @param seed seed value for cluster initialization + * @param seed random seed value for cluster initialization */ def train( data: RDD[Vector], diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala index 2ada0d9505fa9..5948aa1085ebc 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala @@ -90,9 +90,9 @@ class KMeansSuite extends FunSuite with MLlibTestSparkContext { assert(model.clusterCenters.size === 3) } - test("deterministic initilization") { - // Create a large-ish set of point to cluster - val points = List.tabulate(1000)(n => Vectors.dense(n,n)) + test("deterministic initialization") { + // Create a large-ish set of points for clustering + val points = List.tabulate(1000)(n => Vectors.dense(n, n)) val rdd = sc.parallelize(points, 3) for (initMode <- Seq(RANDOM, K_MEANS_PARALLEL)) { From 7668124d7d774b04ac99d683781acb2f4d9ea2c6 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Sun, 18 Jan 2015 17:38:24 -0800 Subject: [PATCH 4/4] minor updates --- .../apache/spark/mllib/clustering/KMeans.scala | 18 +++++++++--------- .../spark/mllib/clustering/KMeansSuite.scala | 14 +++++++------- python/pyspark/mllib/tests.py | 16 +++++++--------- 3 files changed, 23 insertions(+), 25 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index a167c26598bfb..6b5c934f015ba 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -19,14 +19,14 @@ package org.apache.spark.mllib.clustering import scala.collection.mutable.ArrayBuffer -import org.apache.spark.annotation.Experimental import org.apache.spark.Logging -import org.apache.spark.SparkContext._ +import org.apache.spark.annotation.Experimental import org.apache.spark.mllib.linalg.{Vector, Vectors} import org.apache.spark.mllib.linalg.BLAS.{axpy, scal} import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.Utils import org.apache.spark.util.random.XORShiftRandom /** @@ -48,9 +48,9 @@ class KMeans private ( /** * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, runs: 1, - * initializationMode: "k-means||", initializationSteps: 5, epsilon: 1e-4, System.nanoTime()}. + * initializationMode: "k-means||", initializationSteps: 5, epsilon: 1e-4, seed: random}. */ - def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4, System.nanoTime()) + def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4, Utils.random.nextLong()) /** Set the number of clusters to create (k). Default: 2. */ def setK(k: Int): this.type = { @@ -345,17 +345,20 @@ object KMeans { * @param maxIterations max number of iterations * @param runs number of parallel runs, defaults to 1. The best model is returned. * @param initializationMode initialization model, either "random" or "k-means||" (default). + * @param seed random seed value for cluster initialization */ def train( data: RDD[Vector], k: Int, maxIterations: Int, runs: Int, - initializationMode: String): KMeansModel = { + initializationMode: String, + seed: Long): KMeansModel = { new KMeans().setK(k) .setMaxIterations(maxIterations) .setRuns(runs) .setInitializationMode(initializationMode) + .setSeed(seed) .run(data) } @@ -367,20 +370,17 @@ object KMeans { * @param maxIterations max number of iterations * @param runs number of parallel runs, defaults to 1. The best model is returned. * @param initializationMode initialization model, either "random" or "k-means||" (default). - * @param seed random seed value for cluster initialization */ def train( data: RDD[Vector], k: Int, maxIterations: Int, runs: Int, - initializationMode: String, - seed: Long): KMeansModel = { + initializationMode: String): KMeansModel = { new KMeans().setK(k) .setMaxIterations(maxIterations) .setRuns(runs) .setInitializationMode(initializationMode) - .setSeed(seed) .run(data) } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala index 5948aa1085ebc..caee5917000aa 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala @@ -97,17 +97,17 @@ class KMeansSuite extends FunSuite with MLlibTestSparkContext { for (initMode <- Seq(RANDOM, K_MEANS_PARALLEL)) { // Create three deterministic models and compare cluster means - val model1 = KMeans.train(rdd, k = 10, maxIterations = 2, runs = 1, initializationMode = initMode, seed = 42) + val model1 = KMeans.train(rdd, k = 10, maxIterations = 2, runs = 1, + initializationMode = initMode, seed = 42) val centers1 = model1.clusterCenters - val model2 = KMeans.train(rdd, k = 10, maxIterations = 2, runs = 1, initializationMode = initMode, seed = 42) + val model2 = KMeans.train(rdd, k = 10, maxIterations = 2, runs = 1, + initializationMode = initMode, seed = 42) val centers2 = model2.clusterCenters - val model3 = KMeans.train(rdd, k = 10, maxIterations = 2, runs = 1, initializationMode = initMode, seed = 42) - val centers3 = model3.clusterCenters - - assert(centers1.deep == centers2.deep) - assert(centers1.deep == centers3.deep) + centers1.zip(centers2).foreach { case (c1, c2) => + assert(c1 ~== c2 absTol 1E-14) + } } } diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py index 20e451a7f93f8..fc575671139e2 100644 --- a/python/pyspark/mllib/tests.py +++ b/python/pyspark/mllib/tests.py @@ -117,7 +117,7 @@ class ListTests(PySparkTestCase): as NumPy arrays. """ - def test_clustering(self): + def test_kmeans(self): from pyspark.mllib.clustering import KMeans data = [ [0, 1.1], @@ -129,7 +129,7 @@ def test_clustering(self): self.assertEquals(clusters.predict(data[0]), clusters.predict(data[1])) self.assertEquals(clusters.predict(data[2]), clusters.predict(data[3])) - def test_clustering_deterministic(self): + def test_kmeans_deterministic(self): from pyspark.mllib.clustering import KMeans X = range(0, 100, 10) Y = range(0, 100, 10) @@ -138,13 +138,11 @@ def test_clustering_deterministic(self): 3, initializationMode="k-means||", seed=42) clusters2 = KMeans.train(self.sc.parallelize(data), 3, initializationMode="k-means||", seed=42) - clusters3 = KMeans.train(self.sc.parallelize(data), - 3, initializationMode="k-means||", seed=42) - centers1 = array(clusters1.centers).flatten().tolist() - centers2 = array(clusters2.centers).flatten().tolist() - centers3 = array(clusters3.centers).flatten().tolist() - self.assertListEqual(centers1, centers2) - self.assertListEqual(centers1, centers3) + centers1 = clusters1.centers + centers2 = clusters2.centers + for c1, c2 in zip(centers1, centers2): + # TODO: Allow small numeric difference. + self.assertTrue(array_equal(c1, c2)) def test_classification(self): from pyspark.mllib.classification import LogisticRegressionWithSGD, SVMWithSGD, NaiveBayes