diff --git a/NOTICE b/NOTICE
index 7cbb114b2ae2d..33e19418f4e56 100644
--- a/NOTICE
+++ b/NOTICE
@@ -3,3 +3,18 @@ Copyright 2013 The Apache Software Foundation.
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
+
+Numerical linear algebra support in MLlib is provided by the breeze package,
+which depends on the following packages that are not distributed under
+Apache authorized licenses:
+
+- netlib-core, which is open source software written by Samuel Halliday,
+ and copyright by the University of Tennessee, the University of Tennessee
+ Research Foundation, the University of California at Berkeley, and the
+ University of Colorado at Denver. The original software is available from
+ https://github.com/fommil/netlib-java
+
+- JTransforms, which is open source software written by Piotr Wendykier,
+ and distributed under the terms of the MPL/LGPL/GPL tri-license.
+ The original software is available from
+ https://sites.google.com/site/piotrwendykier/software/jtransforms
diff --git a/mllib/pom.xml b/mllib/pom.xml
index 760a2a85d5ffa..44dfb7dfb11c1 100644
--- a/mllib/pom.xml
+++ b/mllib/pom.xml
@@ -60,6 +60,11 @@
       <artifactId>jblas</artifactId>
       <version>1.2.3</version>
     </dependency>
+    <dependency>
+      <groupId>org.scalanlp</groupId>
+      <artifactId>breeze_${scala.binary.version}</artifactId>
+      <version>0.7-SNAPSHOT</version>
+    </dependency>
     <dependency>
       <groupId>org.scalatest</groupId>
       <artifactId>scalatest_${scala.binary.version}</artifactId>
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index e508b76c3f8c5..a6ecf64922713 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -19,16 +19,15 @@ package org.apache.spark.mllib.clustering
import scala.collection.mutable.ArrayBuffer
-import org.jblas.DoubleMatrix
+import breeze.linalg.{DenseVector => BDV, Vector => BV, squaredDistance => breezeSquaredDistance}
-import org.apache.spark.SparkContext
+import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.SparkContext._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.Logging
+import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.util.MLUtils
+import org.apache.spark.rdd.RDD
import org.apache.spark.util.random.XORShiftRandom
-
/**
* K-means clustering with support for multiple parallel runs and a k-means++ like initialization
* mode (the k-means|| algorithm by Bahmani et al). When multiple concurrent runs are requested,
@@ -46,8 +45,6 @@ class KMeans private (
var epsilon: Double)
extends Serializable with Logging
{
- private type ClusterCenters = Array[Array[Double]]
-
def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4)
/** Set the number of clusters to create (k). Default: 2. */
@@ -114,6 +111,23 @@ class KMeans private (
* performance, because this is an iterative algorithm.
*/
def run(data: RDD[Array[Double]]): KMeansModel = {
+ val breezeData = data.map(v => new BDV[Double](v).asInstanceOf[BV[Double]])
+ runBreeze(breezeData)
+ }
+
+ /**
+ * Train a K-means model on the given set of points; `data` should be cached for high
+ * performance, because this is an iterative algorithm.
+ */
+ def run(data: RDD[Vector])(implicit d: DummyImplicit): KMeansModel = {
+ val breezeData = data.map(v => v.toBreeze)
+ runBreeze(breezeData)
+ }
+
+ /**
+ * Implementation using Breeze.
+ */
+ private def runBreeze(data: RDD[BV[Double]]): KMeansModel = {
// TODO: check whether data is persistent; this needs RDD.storageLevel to be publicly readable
val sc = data.sparkContext
@@ -132,9 +146,9 @@ class KMeans private (
// Execute iterations of Lloyd's algorithm until all runs have converged
while (iteration < maxIterations && !activeRuns.isEmpty) {
- type WeightedPoint = (DoubleMatrix, Long)
+ type WeightedPoint = (BDV[Double], Long)
def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = {
- (p1._1.addi(p2._1), p1._2 + p2._2)
+ (p1._1 += p2._1, p1._2 + p2._2)
}
val activeCenters = activeRuns.map(r => centers(r)).toArray
@@ -146,13 +160,13 @@ class KMeans private (
val k = activeCenters(0).length
val dims = activeCenters(0)(0).length
- val sums = Array.fill(runs, k)(new DoubleMatrix(dims))
+ val sums = Array.fill(runs, k)(BDV.zeros[Double](dims))
val counts = Array.fill(runs, k)(0L)
for (point <- points; (centers, runIndex) <- activeCenters.zipWithIndex) {
val (bestCenter, cost) = KMeans.findClosest(centers, point)
costAccums(runIndex) += cost
- sums(runIndex)(bestCenter).addi(new DoubleMatrix(point))
+ sums(runIndex)(bestCenter) += point
counts(runIndex)(bestCenter) += 1
}
@@ -168,8 +182,9 @@ class KMeans private (
for (j <- 0 until k) {
val (sum, count) = totalContribs((i, j))
if (count != 0) {
- val newCenter = sum.divi(count).data
- if (MLUtils.squaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
+ sum /= count.toDouble
+ val newCenter = sum
+ if (breezeSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
changed = true
}
centers(run)(j) = newCenter
@@ -187,16 +202,20 @@ class KMeans private (
}
val bestRun = costs.zipWithIndex.min._2
- new KMeansModel(centers(bestRun))
+ new KMeansModel(centers(bestRun).map { v =>
+ v.toArray
+ })
}
/**
* Initialize `runs` sets of cluster centers at random.
*/
- private def initRandom(data: RDD[Array[Double]]): Array[ClusterCenters] = {
+ private def initRandom(data: RDD[BV[Double]]): Array[Array[BV[Double]]] = {
// Sample all the cluster centers in one pass to avoid repeated scans
val sample = data.takeSample(true, runs * k, new XORShiftRandom().nextInt()).toSeq
- Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k).toArray)
+ Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k).map { v =>
+ v.toDenseVector
+ }.toArray)
}
/**
@@ -208,41 +227,39 @@ class KMeans private (
*
* The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf.
*/
- private def initKMeansParallel(data: RDD[Array[Double]]): Array[ClusterCenters] = {
+ private def initKMeansParallel(data: RDD[BV[Double]]): Array[Array[BV[Double]]] = {
// Initialize each run's center to a random point
val seed = new XORShiftRandom().nextInt()
val sample = data.takeSample(true, runs, seed).toSeq
- val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))
+ val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r).toDenseVector))
// On each step, sample 2 * k points on average for each run with probability proportional
// to their squared distance from that run's current centers
for (step <- 0 until initializationSteps) {
- val centerArrays = centers.map(_.toArray)
val sumCosts = data.flatMap { point =>
- for (r <- 0 until runs) yield (r, KMeans.pointCost(centerArrays(r), point))
+ for (r <- 0 until runs) yield (r, KMeans.pointCost(centers(r), point))
}.reduceByKey(_ + _).collectAsMap()
val chosen = data.mapPartitionsWithIndex { (index, points) =>
val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
for {
p <- points
r <- 0 until runs
- if rand.nextDouble() < KMeans.pointCost(centerArrays(r), p) * 2 * k / sumCosts(r)
+ if rand.nextDouble() < KMeans.pointCost(centers(r), p) * 2 * k / sumCosts(r)
} yield (r, p)
}.collect()
for ((r, p) <- chosen) {
- centers(r) += p
+ centers(r) += p.toDenseVector
}
}
// Finally, we might have a set of more than k candidate centers for each run; weigh each
// candidate by the number of points in the dataset mapping to it and run a local k-means++
// on the weighted centers to pick just k of them
- val centerArrays = centers.map(_.toArray)
val weightMap = data.flatMap { p =>
- for (r <- 0 until runs) yield ((r, KMeans.findClosest(centerArrays(r), p)._1), 1.0)
+ for (r <- 0 until runs) yield ((r, KMeans.findClosest(centers(r), p)._1), 1.0)
}.reduceByKey(_ + _).collectAsMap()
val finalCenters = (0 until runs).map { r =>
- val myCenters = centers(r).toArray
+ val myCenters = centers(r).toArray.asInstanceOf[Array[BV[Double]]]
val myWeights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), 0.0)).toArray
LocalKMeans.kMeansPlusPlus(r, myCenters, myWeights, k, 30)
}
@@ -256,6 +273,7 @@ class KMeans private (
* Top-level methods for calling K-means clustering.
*/
object KMeans {
+
// Initialization mode names
val RANDOM = "random"
val K_MEANS_PARALLEL = "k-means||"
@@ -268,6 +286,28 @@ object KMeans {
initializationMode: String)
: KMeansModel =
{
+ new KMeans().setK(k)
+ .setMaxIterations(maxIterations)
+ .setRuns(runs)
+ .setInitializationMode(initializationMode)
+ .run(data)
+ }
+
+ def train(data: RDD[Array[Double]], k: Int, maxIterations: Int, runs: Int): KMeansModel = {
+ train(data, k, maxIterations, runs, K_MEANS_PARALLEL)
+ }
+
+ def train(data: RDD[Array[Double]], k: Int, maxIterations: Int): KMeansModel = {
+ train(data, k, maxIterations, 1, K_MEANS_PARALLEL)
+ }
+
+ def train(
+ data: RDD[Vector],
+ k: Int,
+ maxIterations: Int,
+ runs: Int,
+ initializationMode: String
+ )(implicit d: DummyImplicit): KMeansModel = {
new KMeans().setK(k)
.setMaxIterations(maxIterations)
.setRuns(runs)
@@ -275,11 +315,13 @@ object KMeans {
.run(data)
}
- def train(data: RDD[Array[Double]], k: Int, maxIterations: Int, runs: Int): KMeansModel = {
+ def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int)
+ (implicit d: DummyImplicit): KMeansModel = {
train(data, k, maxIterations, runs, K_MEANS_PARALLEL)
}
- def train(data: RDD[Array[Double]], k: Int, maxIterations: Int): KMeansModel = {
+ def train(data: RDD[Vector], k: Int, maxIterations: Int)
+ (implicit d: DummyImplicit): KMeansModel = {
train(data, k, maxIterations, 1, K_MEANS_PARALLEL)
}
@@ -301,6 +343,25 @@ object KMeans {
(bestIndex, bestDistance)
}
+ /**
+ * Returns the index of the closest center to the given point, as well as the squared distance.
+ */
+ private[mllib] def findClosest(centers: TraversableOnce[BV[Double]], point: BV[Double])
+ : (Int, Double) = {
+ var bestDistance = Double.PositiveInfinity
+ var bestIndex = 0
+ var i = 0
+ centers.foreach { v =>
+ val distance: Double = breezeSquaredDistance(v, point)
+ if (distance < bestDistance) {
+ bestDistance = distance
+ bestIndex = i
+ }
+ i += 1
+ }
+ (bestIndex, bestDistance)
+ }
+
/**
* Return the K-means cost of a given point against the given cluster centers.
*/
@@ -315,6 +376,12 @@ object KMeans {
bestDistance
}
+ /**
+ * Returns the K-means cost of a given point against the given cluster centers.
+ */
+ private[mllib] def pointCost(centers: TraversableOnce[BV[Double]], point: BV[Double]): Double =
+ findClosest(centers, point)._2
+
def main(args: Array[String]) {
if (args.length < 4) {
println("Usage: KMeans []")
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
index 980be931576dc..06cf2e3ceb36a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansModel.scala
@@ -17,13 +17,21 @@
package org.apache.spark.mllib.clustering
+import breeze.linalg.{DenseVector => BreezeDenseVector}
+
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.Vector
/**
* A clustering model for K-means. Each point belongs to the cluster with the closest center.
*/
class KMeansModel(val clusterCenters: Array[Array[Double]]) extends Serializable {
+
+ private val breezeClusterCenters = clusterCenters.map { v =>
+ new BreezeDenseVector[Double](v)
+ }
+
/** Total number of clusters. */
def k: Int = clusterCenters.length
@@ -32,6 +40,10 @@ class KMeansModel(val clusterCenters: Array[Array[Double]]) extends Serializable
KMeans.findClosest(clusterCenters, point)._1
}
+ def predict(point: Vector): Int = {
+ KMeans.findClosest(breezeClusterCenters, point.toBreeze)._1
+ }
+
/**
* Return the K-means cost (sum of squared distances of points to their nearest center) for this
* model on the given data.
@@ -39,4 +51,12 @@ class KMeansModel(val clusterCenters: Array[Array[Double]]) extends Serializable
def computeCost(data: RDD[Array[Double]]): Double = {
data.map(p => KMeans.pointCost(clusterCenters, p)).sum()
}
+
+ /**
+ * Return the K-means cost (sum of squared distances of points to their nearest center) for this
+ * model on the given data.
+ */
+ def computeCost(data: RDD[Vector])(implicit d: DummyImplicit): Double = {
+ data.map(p => KMeans.pointCost(breezeClusterCenters, p.toBreeze)).sum()
+ }
}
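
The Vector-based model methods added above can be tried without running training; a
minimal sketch, with made-up centers and query point:

    import org.apache.spark.mllib.clustering.KMeansModel
    import org.apache.spark.mllib.linalg.Vectors

    // Two 2-dimensional centers; normally KMeans.train produces the model.
    val model = new KMeansModel(Array(Array(0.0, 0.0), Array(5.0, 5.0)))

    // The Vector overload converts the point to a Breeze vector and reuses
    // KMeans.findClosest against the precomputed breezeClusterCenters.
    val cluster = model.predict(Vectors.dense(Array(4.9, 5.2)))  // returns 1
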
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala
index baf8251d8fc53..9226538fac3ad 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala
@@ -19,31 +19,43 @@ package org.apache.spark.mllib.clustering
import scala.util.Random
-import org.jblas.{DoubleMatrix, SimpleBlas}
+import breeze.linalg.{Vector => BV, DenseVector => BDV}
/**
 * A utility object to run K-means locally. This is private to the ML package because it's used
* in the initialization of KMeans but not meant to be publicly exposed.
*/
private[mllib] object LocalKMeans {
+
+ def kMeansPlusPlus(
+ seed: Int,
+ points: Array[Array[Double]],
+ weights: Array[Double],
+ k: Int,
+ maxIterations: Int
+ ): Array[Array[Double]] = {
+ val breezePoints = points.map(v => new BDV[Double](v).asInstanceOf[BV[Double]])
+ val breezeCenters = kMeansPlusPlus(seed, breezePoints, weights, k, maxIterations)
+ breezeCenters.map(_.toArray)
+ }
+
/**
* Run K-means++ on the weighted point set `points`. This first does the K-means++
- * initialization procedure and then roudns of Lloyd's algorithm.
+ * initialization procedure and then rounds of Lloyd's algorithm.
*/
def kMeansPlusPlus(
seed: Int,
- points: Array[Array[Double]],
+ points: Array[BV[Double]],
weights: Array[Double],
k: Int,
- maxIterations: Int)
- : Array[Array[Double]] =
- {
+ maxIterations: Int
+ )(implicit d: DummyImplicit): Array[BV[Double]] = {
val rand = new Random(seed)
val dimensions = points(0).length
- val centers = new Array[Array[Double]](k)
+ val centers = new Array[BV[Double]](k)
- // Initialize centers by sampling using the k-means++ procedure
- centers(0) = pickWeighted(rand, points, weights)
+ // Initialize centers by sampling using the k-means++ procedure.
+ centers(0) = (pickWeighted(rand, points, weights)).toDenseVector
for (i <- 1 until k) {
// Pick the next center with a probability proportional to cost under current centers
val curCenters = centers.slice(0, i)
@@ -57,7 +69,7 @@ private[mllib] object LocalKMeans {
cumulativeScore += weights(j) * KMeans.pointCost(curCenters, points(j))
j += 1
}
- centers(i) = points(j-1)
+ centers(i) = points(j-1).toDenseVector
}
// Run up to maxIterations iterations of Lloyd's algorithm
@@ -66,11 +78,13 @@ private[mllib] object LocalKMeans {
var moved = true
while (moved && iteration < maxIterations) {
moved = false
- val sums = Array.fill(k)(new DoubleMatrix(dimensions))
+ val sums = Array.fill(k)(
+ new BDV[Double](new Array[Double](dimensions)).asInstanceOf[BV[Double]]
+ )
val counts = Array.fill(k)(0.0)
for ((p, i) <- points.zipWithIndex) {
val index = KMeans.findClosest(centers, p)._1
- SimpleBlas.axpy(weights(i), new DoubleMatrix(p), sums(index))
+ breeze.linalg.axpy(weights(i), p, sums(index))
counts(index) += weights(i)
if (index != oldClosest(i)) {
moved = true
@@ -81,9 +95,10 @@ private[mllib] object LocalKMeans {
for (i <- 0 until k) {
if (counts(i) == 0.0) {
// Assign center to a random point
- centers(i) = points(rand.nextInt(points.length))
+ centers(i) = points(rand.nextInt(points.length)).toDenseVector
} else {
- centers(i) = sums(i).divi(counts(i)).data
+ sums(i) /= counts(i)
+ centers(i) = sums(i)
}
}
iteration += 1
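
The jblas SimpleBlas.axpy call is replaced by breeze.linalg.axpy, which performs the same
in-place update y += a * x. A tiny standalone sketch of that semantics, with arbitrary values:

    import breeze.linalg.{axpy, DenseVector => BDV}

    val sum = BDV.zeros[Double](3)
    axpy(2.0, BDV(1.0, 0.0, 3.0), sum)  // sum += 2.0 * x, in place
    // sum is now DenseVector(2.0, 0.0, 6.0)
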
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
new file mode 100644
index 0000000000000..b95889f9a44ff
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import breeze.linalg.{Vector => BreezeVector, DenseVector => BreezeDenseVector,
+ SparseVector => BreezeSparseVector}
+
+/**
+ * Represents a numeric vector, whose index type is Int and value type is Double.
+ */
+trait Vector extends Serializable {
+
+ /**
+ * Size of the vector.
+ */
+ def size: Int
+
+ /**
+   * Converts the instance to a Breeze vector.
+ */
+ private[mllib] def toBreeze: BreezeVector[Double]
+}
+
+/**
+ * Represents a vector with random access to its elements.
+ *
+ */
+trait RandomAccessVector extends Vector {
+ // empty
+}
+
+/**
+ * Factory methods for [[org.apache.spark.mllib.linalg.Vector]].
+ */
+object Vectors {
+
+ /** Creates a dense vector. */
+ def dense(values: Array[Double]): Vector = new DenseVector(values)
+
+ /**
+ * Creates a sparse vector providing its index array and value array.
+ *
+ * @param size vector size.
+ * @param indices index array, must be strictly increasing.
+ * @param values value array, must have the same length as indices.
+ */
+ def sparse(size: Int, indices: Array[Int], values: Array[Double]): Vector =
+ new SparseVector(size, indices, values)
+
+ /**
+ * Creates a sparse vector using unordered (index, value) pairs.
+ *
+ * @param size vector size.
+ * @param elements vector elements in (index, value) pairs.
+ */
+ def sparse(size: Int, elements: Iterable[(Int, Double)]): Vector = {
+
+ require(size > 0)
+
+ val (indices, values) = elements.toArray.sortBy(_._1).unzip
+ var prev = -1
+ indices.foreach { i =>
+ require(prev < i, "Found duplicate indices: " + i)
+ prev = i
+ }
+ require(prev < size)
+
+ new SparseVector(size, indices.toArray, values.toArray)
+ }
+
+ /**
+ * Creates a vector instance from a breeze vector.
+ */
+ private[mllib] def fromBreeze(breezeVector: BreezeVector[Double]): Vector = {
+ breezeVector match {
+ case v: BreezeDenseVector[Double] => {
+ require(v.offset == 0)
+ require(v.stride == 1)
+ new DenseVector(v.data)
+ }
+ case v: BreezeSparseVector[Double] => {
+ new SparseVector(v.length, v.index, v.data)
+ }
+ case v: BreezeVector[_] => {
+ sys.error("Unsupported Breeze vector type: " + v.getClass.getName)
+ }
+ }
+ }
+}
+
+/**
+ * A dense vector represented by a value array.
+ *
+ * @param values value array.
+ */
+class DenseVector(var values: Array[Double]) extends RandomAccessVector {
+
+ override def size: Int = values.length
+
+ override def toString = values.mkString("[", ",", "]")
+
+ private[mllib] override def toBreeze = new BreezeDenseVector[Double](values)
+}
+
+/**
+ * A sparse vector represented by an index array and a value array.
+ *
+ * @param n size of the vector.
+ * @param indices index array, assumed to be strictly increasing.
+ * @param values value array, must have the same length as the index array.
+ */
+class SparseVector(var n: Int, var indices: Array[Int], var values: Array[Double]) extends Vector {
+
+ override def size: Int = n
+
+ override def toString = {
+ "(" + n + "," + indices.zip(values).mkString("[", "," ,"]") + ")"
+ }
+
+ private[mllib] override def toBreeze = new BreezeSparseVector[Double](indices, values, n)
+}
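
A short sketch of the factory methods introduced in this file, with arbitrary values. Note
that toBreeze and fromBreeze are private[mllib], so the Breeze round trip is only available
inside the package:

    import org.apache.spark.mllib.linalg.Vectors

    val dv = Vectors.dense(Array(1.0, 0.0, 3.0))

    // Unordered (index, value) pairs are sorted by index and checked for
    // duplicate indices before the SparseVector is built.
    val sv = Vectors.sparse(4, Seq((3, 4.0), (1, 2.0)))

    // Inside org.apache.spark.mllib, sv.toBreeze wraps the same index and
    // value arrays without copying, and Vectors.fromBreeze undoes it.
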
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
index 4ef1d1f64ff06..cabadd15731cb 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/KMeansSuite.scala
@@ -17,11 +17,10 @@
package org.apache.spark.mllib.clustering
-
-import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSuite
import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.mllib.linalg.{Vectors, Vector}
class KMeansSuite extends FunSuite with LocalSparkContext {
@@ -131,6 +130,45 @@ class KMeansSuite extends FunSuite with LocalSparkContext {
assertSetsEqual(model.clusterCenters, Array(Array(1.0, 3.0, 4.0)))
}
+ test("single cluster with sparse data") {
+ val n = 1000
+ val smallData = Array(
+ Vectors.sparse(n, Seq((0, 1.0), (1, 2.0), (2, 6.0))),
+ Vectors.sparse(n, Seq((0, 1.0), (1, 3.0))),
+ Vectors.sparse(n, Seq((0, 1.0), (1, 4.0), (2, 6.0)))
+ )
+ val data = sc.parallelize((1 to 100).flatMap(_ => smallData), 4)
+
+ // No matter how many runs or iterations we use, we should get one cluster,
+ // centered at the mean of the points
+
+ val center = new Array[Double](n)
+ center(0) = 1.0
+ center(1) = 3.0
+ center(2) = 4.0
+
+ var model = KMeans.train(data, k=1, maxIterations=1)
+ assertSetsEqual(model.clusterCenters, Array(center))
+
+ model = KMeans.train(data, k=1, maxIterations=2)
+ assertSetsEqual(model.clusterCenters, Array(center))
+
+ model = KMeans.train(data, k=1, maxIterations=5)
+ assertSetsEqual(model.clusterCenters, Array(center))
+
+ model = KMeans.train(data, k=1, maxIterations=1, runs=5)
+ assertSetsEqual(model.clusterCenters, Array(center))
+
+ model = KMeans.train(data, k=1, maxIterations=1, runs=5)
+ assertSetsEqual(model.clusterCenters, Array(center))
+
+ model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=RANDOM)
+ assertSetsEqual(model.clusterCenters, Array(center))
+
+ model = KMeans.train(data, k=1, maxIterations=1, runs=1, initializationMode=K_MEANS_PARALLEL)
+ assertSetsEqual(model.clusterCenters, Array(center))
+ }
+
test("k-means|| initialization") {
val points = Array(
Array(1.0, 2.0, 6.0),
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeVectorConversionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeVectorConversionSuite.scala
new file mode 100644
index 0000000000000..aacaa300849aa
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/BreezeVectorConversionSuite.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import org.scalatest.FunSuite
+
+import breeze.linalg.{DenseVector => BDV, SparseVector => BSV}
+
+/**
+ * Test Breeze vector conversions.
+ */
+class BreezeVectorConversionSuite extends FunSuite {
+
+ val arr = Array(0.1, 0.2, 0.3, 0.4)
+ val n = 20
+ val indices = Array(0, 3, 5, 10, 13)
+ val values = Array(0.1, 0.5, 0.3, -0.8, -1.0)
+
+ test("dense to breeze") {
+ val vec = Vectors.dense(arr)
+ assert(vec.toBreeze === new BDV[Double](arr))
+ }
+
+ test("sparse to breeze") {
+ val vec = Vectors.sparse(n, indices, values)
+ assert(vec.toBreeze === new BSV[Double](indices, values, n))
+ }
+
+ test("dense breeze to vector") {
+ val breeze = new BDV[Double](arr)
+ val vec = Vectors.fromBreeze(breeze).asInstanceOf[DenseVector]
+ assert(vec.size === arr.length)
+ assert(vec.values.eq(arr), "should not copy data")
+ }
+
+ test("sparse breeze to vector") {
+ val breeze = new BSV[Double](indices, values, n)
+ val vec = Vectors.fromBreeze(breeze).asInstanceOf[SparseVector]
+ assert(vec.size === n)
+ assert(vec.indices.eq(indices), "should not copy data")
+ assert(vec.values.eq(values), "should not copy data")
+ }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorSuite.scala
new file mode 100644
index 0000000000000..e3ee97121f822
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorSuite.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import org.scalatest.FunSuite
+
+class VectorSuite extends FunSuite {
+
+ val arr = Array(0.1, 0.2, 0.3, 0.4)
+ val n = 20
+ val indices = Array(0, 3, 5, 10, 13)
+ val values = Array(0.1, 0.5, 0.3, -0.8, -1.0)
+
+ test("dense vector construction") {
+ val vec = Vectors.dense(arr).asInstanceOf[DenseVector]
+ assert(vec.size === arr.length)
+ assert(vec.values.eq(arr))
+ }
+
+ test("sparse vector construction") {
+ val vec = Vectors.sparse(n, indices, values).asInstanceOf[SparseVector]
+ assert(vec.size === n)
+ assert(vec.indices.eq(indices))
+ assert(vec.values.eq(values))
+ }
+
+ test("sparse vector construction with unordered elements") {
+ val vec = Vectors.sparse(n, indices.zip(values).reverse).asInstanceOf[SparseVector]
+ assert(vec.size === n)
+ assert(vec.indices === indices)
+ assert(vec.values === values)
+ }
+}
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 138aad7561043..83dd439720172 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -350,7 +350,8 @@ object SparkBuild extends Build {
def mllibSettings = sharedSettings ++ Seq(
name := "spark-mllib",
libraryDependencies ++= Seq(
- "org.jblas" % "jblas" % "1.2.3"
+ "org.jblas" % "jblas" % "1.2.3",
+ "org.scalanlp" %% "breeze" % "0.7-SNAPSHOT"
)
)