Skip to content

Commit

Permalink
add dense/sparse vector data models and conversions to/from breeze ve…
Browse files Browse the repository at this point in the history
…ctors

use breeze to implement KMeans in order to support both dense and sparse data
  • Loading branch information
mengxr committed Mar 10, 2014
1 parent b7cd9e9 commit 07ffaf2
Show file tree
Hide file tree
Showing 10 changed files with 447 additions and 44 deletions.
15 changes: 15 additions & 0 deletions NOTICE
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,18 @@ Copyright 2013 The Apache Software Foundation.

This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).

Numerical linear algebra support in MLlib is provided by the breeze package,
which depends on the following packages that are not distributed under
Apache authorized licenses:

- netlib-core, which is open source software written by Samuel Halliday,
and copyright by the University of Tennessee, the University of Tennessee
Research Foundation, the University of California at Berkeley, and the
University of Colorado at Denver. The original software is available from
https://github.com/fommil/netlib-java

- JTransforms, which is open source software written by Piotr Wendykier,
and distributed under the the terms of the MPL/LGPL/GPL tri-license.
The original software is available from
https://sites.google.com/site/piotrwendykier/software/jtransforms
5 changes: 5 additions & 0 deletions mllib/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@
<artifactId>jblas</artifactId>
<version>1.2.3</version>
</dependency>
<dependency>
<groupId>org.scalanlp</groupId>
<artifactId>breeze_${scala.binary.version}</artifactId>
<version>0.7-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
Expand Down
121 changes: 94 additions & 27 deletions mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@ package org.apache.spark.mllib.clustering

import scala.collection.mutable.ArrayBuffer

import org.jblas.DoubleMatrix
import breeze.linalg.{DenseVector => BDV, Vector => BV, squaredDistance => breezeSquaredDistance}

import org.apache.spark.SparkContext
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.Logging
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.util.random.XORShiftRandom


/**
* K-means clustering with support for multiple parallel runs and a k-means++ like initialization
* mode (the k-means|| algorithm by Bahmani et al). When multiple concurrent runs are requested,
Expand All @@ -46,8 +45,6 @@ class KMeans private (
var epsilon: Double)
extends Serializable with Logging
{
private type ClusterCenters = Array[Array[Double]]

def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4)

/** Set the number of clusters to create (k). Default: 2. */
Expand Down Expand Up @@ -114,6 +111,23 @@ class KMeans private (
* performance, because this is an iterative algorithm.
*/
def run(data: RDD[Array[Double]]): KMeansModel = {
val breezeData = data.map(v => new BDV[Double](v).asInstanceOf[BV[Double]])
runBreeze(breezeData)
}

/**
* Train a K-means model on the given set of points; `data` should be cached for high
* performance, because this is an iterative algorithm.
*/
def run(data: RDD[Vector])(implicit d: DummyImplicit): KMeansModel = {
val breezeData = data.map(v => v.toBreeze)
runBreeze(breezeData)
}

/**
* Implementation using Breeze.
*/
private def runBreeze(data: RDD[BV[Double]]): KMeansModel = {
// TODO: check whether data is persistent; this needs RDD.storageLevel to be publicly readable

val sc = data.sparkContext
Expand All @@ -132,9 +146,9 @@ class KMeans private (

// Execute iterations of Lloyd's algorithm until all runs have converged
while (iteration < maxIterations && !activeRuns.isEmpty) {
type WeightedPoint = (DoubleMatrix, Long)
type WeightedPoint = (BDV[Double], Long)
def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): WeightedPoint = {
(p1._1.addi(p2._1), p1._2 + p2._2)
(p1._1 += p2._1, p1._2 + p2._2)
}

val activeCenters = activeRuns.map(r => centers(r)).toArray
Expand All @@ -146,13 +160,13 @@ class KMeans private (
val k = activeCenters(0).length
val dims = activeCenters(0)(0).length

val sums = Array.fill(runs, k)(new DoubleMatrix(dims))
val sums = Array.fill(runs, k)(BDV.zeros[Double](dims))
val counts = Array.fill(runs, k)(0L)

for (point <- points; (centers, runIndex) <- activeCenters.zipWithIndex) {
val (bestCenter, cost) = KMeans.findClosest(centers, point)
costAccums(runIndex) += cost
sums(runIndex)(bestCenter).addi(new DoubleMatrix(point))
sums(runIndex)(bestCenter) += point
counts(runIndex)(bestCenter) += 1
}

Expand All @@ -168,8 +182,9 @@ class KMeans private (
for (j <- 0 until k) {
val (sum, count) = totalContribs((i, j))
if (count != 0) {
val newCenter = sum.divi(count).data
if (MLUtils.squaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
sum /= count.toDouble
val newCenter = sum
if (breezeSquaredDistance(newCenter, centers(run)(j)) > epsilon * epsilon) {
changed = true
}
centers(run)(j) = newCenter
Expand All @@ -187,16 +202,20 @@ class KMeans private (
}

val bestRun = costs.zipWithIndex.min._2
new KMeansModel(centers(bestRun))
new KMeansModel(centers(bestRun).map { v =>
v.toArray
})
}

/**
* Initialize `runs` sets of cluster centers at random.
*/
private def initRandom(data: RDD[Array[Double]]): Array[ClusterCenters] = {
private def initRandom(data: RDD[BV[Double]]): Array[Array[BV[Double]]] = {
// Sample all the cluster centers in one pass to avoid repeated scans
val sample = data.takeSample(true, runs * k, new XORShiftRandom().nextInt()).toSeq
Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k).toArray)
Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k).map { v =>
v.toDenseVector
}.toArray)
}

/**
Expand All @@ -208,41 +227,39 @@ class KMeans private (
*
* The original paper can be found at http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf.
*/
private def initKMeansParallel(data: RDD[Array[Double]]): Array[ClusterCenters] = {
private def initKMeansParallel(data: RDD[BV[Double]]): Array[Array[BV[Double]]] = {
// Initialize each run's center to a random point
val seed = new XORShiftRandom().nextInt()
val sample = data.takeSample(true, runs, seed).toSeq
val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r)))
val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r).toDenseVector))

// On each step, sample 2 * k points on average for each run with probability proportional
// to their squared distance from that run's current centers
for (step <- 0 until initializationSteps) {
val centerArrays = centers.map(_.toArray)
val sumCosts = data.flatMap { point =>
for (r <- 0 until runs) yield (r, KMeans.pointCost(centerArrays(r), point))
for (r <- 0 until runs) yield (r, KMeans.pointCost(centers(r), point))
}.reduceByKey(_ + _).collectAsMap()
val chosen = data.mapPartitionsWithIndex { (index, points) =>
val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
for {
p <- points
r <- 0 until runs
if rand.nextDouble() < KMeans.pointCost(centerArrays(r), p) * 2 * k / sumCosts(r)
if rand.nextDouble() < KMeans.pointCost(centers(r), p) * 2 * k / sumCosts(r)
} yield (r, p)
}.collect()
for ((r, p) <- chosen) {
centers(r) += p
centers(r) += p.toDenseVector
}
}

// Finally, we might have a set of more than k candidate centers for each run; weigh each
// candidate by the number of points in the dataset mapping to it and run a local k-means++
// on the weighted centers to pick just k of them
val centerArrays = centers.map(_.toArray)
val weightMap = data.flatMap { p =>
for (r <- 0 until runs) yield ((r, KMeans.findClosest(centerArrays(r), p)._1), 1.0)
for (r <- 0 until runs) yield ((r, KMeans.findClosest(centers(r), p)._1), 1.0)
}.reduceByKey(_ + _).collectAsMap()
val finalCenters = (0 until runs).map { r =>
val myCenters = centers(r).toArray
val myCenters = centers(r).toArray.asInstanceOf[Array[BV[Double]]]
val myWeights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), 0.0)).toArray
LocalKMeans.kMeansPlusPlus(r, myCenters, myWeights, k, 30)
}
Expand All @@ -256,6 +273,7 @@ class KMeans private (
* Top-level methods for calling K-means clustering.
*/
object KMeans {

// Initialization mode names
val RANDOM = "random"
val K_MEANS_PARALLEL = "k-means||"
Expand All @@ -268,18 +286,42 @@ object KMeans {
initializationMode: String)
: KMeansModel =
{
new KMeans().setK(k)
.setMaxIterations(maxIterations)
.setRuns(runs)
.setInitializationMode(initializationMode)
.run(data)
}

def train(data: RDD[Array[Double]], k: Int, maxIterations: Int, runs: Int): KMeansModel = {
train(data, k, maxIterations, runs, K_MEANS_PARALLEL)
}

def train(data: RDD[Array[Double]], k: Int, maxIterations: Int): KMeansModel = {
train(data, k, maxIterations, 1, K_MEANS_PARALLEL)
}

def train(
data: RDD[Vector],
k: Int,
maxIterations: Int,
runs: Int,
initializationMode: String
)(implicit d: DummyImplicit): KMeansModel = {
new KMeans().setK(k)
.setMaxIterations(maxIterations)
.setRuns(runs)
.setInitializationMode(initializationMode)
.run(data)
}

def train(data: RDD[Array[Double]], k: Int, maxIterations: Int, runs: Int): KMeansModel = {
def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int)
(implicit d: DummyImplicit): KMeansModel = {
train(data, k, maxIterations, runs, K_MEANS_PARALLEL)
}

def train(data: RDD[Array[Double]], k: Int, maxIterations: Int): KMeansModel = {
def train(data: RDD[Vector], k: Int, maxIterations: Int)
(implicit d: DummyImplicit): KMeansModel = {
train(data, k, maxIterations, 1, K_MEANS_PARALLEL)
}

Expand All @@ -301,6 +343,25 @@ object KMeans {
(bestIndex, bestDistance)
}

/**
* Returns the index of the closest center to the given point, as well as the squared distance.
*/
private[mllib] def findClosest(centers: TraversableOnce[BV[Double]], point: BV[Double])
: (Int, Double) = {
var bestDistance = Double.PositiveInfinity
var bestIndex = 0
var i = 0
centers.foreach { v =>
val distance: Double = breezeSquaredDistance(v, point)
if (distance < bestDistance) {
bestDistance = distance
bestIndex = i
}
i += 1
}
(bestIndex, bestDistance)
}

/**
* Return the K-means cost of a given point against the given cluster centers.
*/
Expand All @@ -315,6 +376,12 @@ object KMeans {
bestDistance
}

/**
* Returns the K-means cost of a given point against the given cluster centers.
*/
private[mllib] def pointCost(centers: TraversableOnce[BV[Double]], point: BV[Double]): Double =
findClosest(centers, point)._2

def main(args: Array[String]) {
if (args.length < 4) {
println("Usage: KMeans <master> <input_file> <k> <max_iterations> [<runs>]")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,21 @@

package org.apache.spark.mllib.clustering

import breeze.linalg.{DenseVector => BreezeDenseVector}

import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.mllib.linalg.Vector

/**
* A clustering model for K-means. Each point belongs to the cluster with the closest center.
*/
class KMeansModel(val clusterCenters: Array[Array[Double]]) extends Serializable {

private val breezeClusterCenters = clusterCenters.map { v =>
new BreezeDenseVector[Double](v)
}

/** Total number of clusters. */
def k: Int = clusterCenters.length

Expand All @@ -32,11 +40,23 @@ class KMeansModel(val clusterCenters: Array[Array[Double]]) extends Serializable
KMeans.findClosest(clusterCenters, point)._1
}

def predict(point: Vector): Int = {
KMeans.findClosest(breezeClusterCenters, point.toBreeze)._1
}

/**
* Return the K-means cost (sum of squared distances of points to their nearest center) for this
* model on the given data.
*/
def computeCost(data: RDD[Array[Double]]): Double = {
data.map(p => KMeans.pointCost(clusterCenters, p)).sum()
}

/**
* Return the K-means cost (sum of squared distances of points to their nearest center) for this
* model on the given data.
*/
def computeCost(data: RDD[Vector])(implicit d: DummyImplicit): Double = {
data.map(p => KMeans.pointCost(breezeClusterCenters, p.toBreeze)).sum()
}
}
Loading

0 comments on commit 07ffaf2

Please sign in to comment.