Skip to content

Commit

Permalink
[SPARK-4611][MLlib] Implement the efficient vector norm
Browse files Browse the repository at this point in the history
The vector norm in breeze is implemented by `activeIterator` which is known to be very slow.
In this PR, an efficient vector norm is implemented, and with this API, `Normalizer` and
`k-means` get a big performance improvement.

Here is the benchmark against the mnist8m dataset.

a) `Normalizer`
Before
DenseVector: 68.25secs
SparseVector: 17.01secs

With this PR
DenseVector: 12.71secs
SparseVector: 2.73secs

b) `k-means`
Before
DenseVector: 83.46secs
SparseVector: 61.60secs

With this PR
DenseVector: 70.04secs
SparseVector: 59.05secs

Author: DB Tsai <[email protected]>

Closes apache#3462 from dbtsai/norm and squashes the following commits:

63c7165 [DB Tsai] typo
0c3637f [DB Tsai] add import org.apache.spark.SparkContext._ back
6fa616c [DB Tsai] address feedback
9b7cb56 [DB Tsai] move norm to static method
0b632e6 [DB Tsai] kmeans
dbed124 [DB Tsai] style
c1a877c [DB Tsai] first commit
  • Loading branch information
DB Tsai authored and mengxr committed Dec 2, 2014
1 parent b0a46d8 commit 64f3175
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.mllib.clustering

import scala.collection.mutable.ArrayBuffer

import breeze.linalg.{DenseVector => BDV, Vector => BV, norm => breezeNorm}
import breeze.linalg.{DenseVector => BDV, Vector => BV}

import org.apache.spark.annotation.Experimental
import org.apache.spark.Logging
Expand Down Expand Up @@ -125,7 +125,7 @@ class KMeans private (
}

// Compute the L2 norms (not squared) and cache them.
val norms = data.map(v => breezeNorm(v.toBreeze, 2.0))
val norms = data.map(Vectors.norm(_, 2.0))
norms.persist()
val breezeData = data.map(_.toBreeze).zip(norms).map { case (v, norm) =>
new BreezeVectorWithNorm(v, norm)
Expand Down Expand Up @@ -425,7 +425,7 @@ object KMeans {
private[clustering]
class BreezeVectorWithNorm(val vector: BV[Double], val norm: Double) extends Serializable {

def this(vector: BV[Double]) = this(vector, breezeNorm(vector, 2.0))
def this(vector: BV[Double]) = this(vector, Vectors.norm(Vectors.fromBreeze(vector), 2.0))

def this(array: Array[Double]) = this(new BDV[Double](array))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.spark.mllib.feature

import breeze.linalg.{norm => brzNorm}

import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}

Expand Down Expand Up @@ -47,7 +45,7 @@ class Normalizer(p: Double) extends VectorTransformer {
* @return normalized vector. If the norm of the input is zero, it will return the input vector.
*/
override def transform(vector: Vector): Vector = {
val norm = brzNorm(vector.toBreeze, p)
val norm = Vectors.norm(vector, p)

if (norm != 0.0) {
// For a dense vector, we have to allocate new memory for the new output vector.
Expand Down
51 changes: 51 additions & 0 deletions mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,57 @@ object Vectors {
sys.error("Unsupported Breeze vector type: " + v.getClass.getName)
}
}

/**
 * Returns the p-norm of the given vector.
 *
 * Only the entries held in the vector's `values` array are visited. The loops are written
 * as plain `while` loops over that array on purpose: this method exists to avoid the
 * iterator overhead of the Breeze implementation.
 *
 * @param vector input vector; must be a [[DenseVector]] or a [[SparseVector]].
 * @param p order of the norm, must be >= 1.0. Use `Double.PositiveInfinity` for the max norm.
 * @return norm in L^p^ space.
 * @throws IllegalArgumentException if `p` is less than 1.0 (or NaN), or if the vector type
 *                                  is not supported.
 */
private[spark] def norm(vector: Vector, p: Double): Double = {
  // Report the offending value so callers can tell what went wrong.
  require(p >= 1.0, "To compute the p-norm of the vector, we require that you specify a p>=1. " +
    "You specified p=" + p + ".")
  val values = vector match {
    case dv: DenseVector => dv.values
    case sv: SparseVector => sv.values
    case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
  }
  // `length` reads the array field directly; `size` would go through an implicit wrapper.
  val size = values.length

  if (p == 1) {
    // L1 norm: sum of absolute values.
    var sum = 0.0
    var i = 0
    while (i < size) {
      sum += math.abs(values(i))
      i += 1
    }
    sum
  } else if (p == 2) {
    // L2 norm: square root of the sum of squares; avoids the more expensive math.pow.
    var sum = 0.0
    var i = 0
    while (i < size) {
      sum += values(i) * values(i)
      i += 1
    }
    math.sqrt(sum)
  } else if (p == Double.PositiveInfinity) {
    // Infinity norm: largest absolute value (0.0 when there are no values).
    var max = 0.0
    var i = 0
    while (i < size) {
      val value = math.abs(values(i))
      if (value > max) max = value
      i += 1
    }
    max
  } else {
    // General case: (sum of |x_i|^p)^(1/p).
    var sum = 0.0
    var i = 0
    while (i < size) {
      sum += math.pow(math.abs(values(i)), p)
      i += 1
    }
    math.pow(sum, 1.0 / p)
  }
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import breeze.linalg.{DenseMatrix => BDM}
import org.scalatest.FunSuite

import org.apache.spark.SparkException
import org.apache.spark.mllib.util.TestingUtils._

class VectorsSuite extends FunSuite {

Expand Down Expand Up @@ -197,4 +198,27 @@ class VectorsSuite extends FunSuite {
assert(svMap.get(2) === Some(3.1))
assert(svMap.get(3) === Some(0.0))
}

test("vector p-norm") {
  val dv = Vectors.dense(0.0, -1.2, 3.1, 0.0, -4.5, 1.9)
  val sv = Vectors.sparse(6, Seq((1, -1.2), (2, 3.1), (3, 0.0), (4, -4.5), (5, 1.9)))

  // Both storage formats must agree with reference values computed from the expanded array.
  for (vec <- Seq(dv, sv)) {
    val magnitudes = vec.toArray.map(math.abs)

    // p = 1: sum of absolute values.
    val expectedL1 = magnitudes.sum
    assert(Vectors.norm(vec, 1.0) ~== expectedL1 relTol 1E-8)

    // p = 2: square root of the sum of squares.
    val expectedL2 = math.sqrt(vec.toArray.map(x => x * x).sum)
    assert(Vectors.norm(vec, 2.0) ~== expectedL2 relTol 1E-8)

    // p = infinity: largest absolute value.
    val expectedInf = magnitudes.max
    assert(Vectors.norm(vec, Double.PositiveInfinity) ~== expectedInf relTol 1E-8)

    // Arbitrary non-integer p exercises the general branch.
    val expectedGeneral = math.pow(magnitudes.map(m => math.pow(m, 3.7)).sum, 1.0 / 3.7)
    assert(Vectors.norm(vec, 3.7) ~== expectedGeneral relTol 1E-8)
  }
}
}

0 comments on commit 64f3175

Please sign in to comment.