port pca to RowRDDMatrix, and add multiply and covariance
mengxr committed Apr 2, 2014
1 parent 7836e2f commit 4cf679c
Showing 5 changed files with 201 additions and 38 deletions.
Matrices.scala
@@ -59,7 +59,14 @@ object Matrices {
new DenseMatrix(m, n, values)
}

private[mllib] def fromBreeze(breeze: BDM[Double]): Matrix = {
null
private[mllib] def fromBreeze(breeze: BM[Double]): Matrix = {
breeze match {
case dm: BDM[Double] =>
require(dm.majorStride == dm.rows)
new DenseMatrix(dm.rows, dm.cols, dm.data)
case _ =>
throw new UnsupportedOperationException(
s"Do not support conversion from type ${breeze.getClass.getName}.")
}
}
}
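The require on majorStride rejects Breeze matrices that are strided or offset views, since DenseMatrix here assumes one packed column-major array. A minimal standalone sketch of the same conversion, written against Breeze only (the method name is illustrative, not part of MLlib):

    import breeze.linalg.{DenseMatrix => BDM}

    // Sketch: a packed column-major Breeze matrix maps directly onto
    // (numRows, numCols, data); strided views are rejected.
    def toPackedColumnMajor(dm: BDM[Double]): (Int, Int, Array[Double]) = {
      require(dm.majorStride == dm.rows, "expected a packed column-major matrix, not a view")
      (dm.rows, dm.cols, dm.data)
    }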
SingularValueDecomposition.scala
@@ -1,4 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.linalg

/** Represents SVD factors */
/** Represents singular value decomposition (SVD) factors. */
case class SingularValueDecomposition[UType, VType](U: UType, s: Vector, V: VType)
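A hypothetical usage sketch of the factors returned by the computeSVD method added to RowRDDMatrix below (mat and the rCond value are illustrative, not from this commit):

    val svd: SingularValueDecomposition[RowRDDMatrix, Matrix] =
      mat.computeSVD(k = 2, computeU = true, rCond = 1e-9)
    val u: RowRDDMatrix = svd.U // m x k, U'U = eye(k)
    val s: Vector = svd.s       // k singular values in descending order
    val v: Matrix = svd.V       // n x k, V'V = eye(k)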
IndexedRowRDDMatrix.scala
@@ -22,9 +22,9 @@ import org.apache.spark.rdd.RDD
/**
* Represents a row-oriented RDDMatrix with indexed rows.
*
* @param rows
* @param m
* @param n
* @param rows indexed rows of this matrix
* @param m number of rows, where a negative number means unknown
* @param n number of cols, where a negative number means unknown
*/
class IndexedRowRDDMatrix(
val rows: RDD[RDDMatrixRow],
RowRDDMatrix.scala
@@ -27,6 +27,7 @@ import com.github.fommil.netlib.BLAS.{getInstance => blas}

import org.apache.spark.mllib.linalg._
import org.apache.spark.rdd.RDD
import org.apache.spark.Logging

/**
* Represents a row-oriented RDDMatrix with no meaningful row indices.
@@ -38,7 +39,7 @@ import org.apache.spark.rdd.RDD
class RowRDDMatrix(
val rows: RDD[Vector],
m: Long = -1L,
n: Long = -1) extends RDDMatrix {
n: Long = -1) extends RDDMatrix with Logging {

private var _m = m
private var _n = n
@@ -60,9 +61,9 @@ class RowRDDMatrix(
}

/**
* Computes the gram matrix `A^T A`.
* Computes the Gramian matrix `A^T A`.
*/
def gram(): Matrix = {
def computeGramianMatrix(): Matrix = {
val n = numCols().toInt
val nt: Int = n * (n + 1) / 2

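The rest of the method body is collapsed in this view; conceptually, the Gramian is the sum of the outer product of every row with itself. A local Breeze sketch of the same quantity (not the distributed, packed-triangular implementation):

    import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}

    // Local sketch of A^T A: accumulate the rank-1 update v * v^T for each row v.
    def localGramian(rows: Seq[BDV[Double]], n: Int): BDM[Double] = {
      val g = BDM.zeros[Double](n, n)
      rows.foreach(v => g += v * v.t)
      g
    }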
@@ -80,31 +81,30 @@


/**
* Singular Value Decomposition for Tall and Skinny matrices.
* Given an m x n matrix A, this will compute matrices U, S, V such that
* A = U * S * V'
* Computes the singular value decomposition of this matrix.
* Denoting this matrix by A (m x n), this computes matrices U, S, and V such that A = U * S * V'.
*
* There is no restriction on m, but we require n^2 doubles to fit in memory.
* There is no restriction on m, but we require `n^2` doubles to fit in memory.
* Further, n should be less than m.
*
* The decomposition is computed by first computing A'A = V S^2 V',
* computing svd locally on that (since n x n is small),
* from which we recover S and V.
* Then we compute U via easy matrix multiplication
* as U = A * V * S^-1
* computing svd locally on that (since n x n is small), from which we recover S and V.
* Then we compute U via easy matrix multiplication as U = A * (V * S^-1).
* Note that this approach requires `O(n^3)` time on the master node.
*
* Only the k largest singular values and associated vectors are found.
* At most the k largest non-zero singular values and the associated vectors are returned.
* If there are k such values, the dimensions of the returned factors are:
*
* S is k x k and diagonal, holding the singular values on diagonal
* U is m x k and satisfies U'U = eye(k)
* V is n x k and satisfies V'V = eye(k)
*
* The return values are as lean as possible: an RDD of rows for U,
* a simple array for sigma, and a dense 2d matrix array for V
* U is a RowRDDMatrix of size m x k that satisfies U'U = eye(k),
* s is a Vector of size k, holding the singular values in descending order,
* and V is a Matrix of size n x k that satisfies V'V = eye(k).
*
* @param matrix dense matrix to factorize
* @return Three matrices: U, S, V such that A = USV^T
* @param k number of singular values to keep. We might return fewer than k if there are
*          numerically zero singular values. See rCond.
* @param computeU whether to compute U
* @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0)
*              are treated as zero, where sigma(0) is the largest singular value.
* @return SingularValueDecomposition(U, s, V)
*/
def computeSVD(
k: Int,
@@ -113,9 +113,9 @@

val n = numCols().toInt

require(k >= 0 && k <= n, s"Request up to n singular values k=$k n=$n.")
require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.")

val G = gram()
val G = computeGramianMatrix()

// TODO: Use sparse SVD instead.
val (u: BDM[Double], sigmaSquares: BDV[Double], v: BDM[Double]) =
@@ -152,15 +152,110 @@ class RowRDDMatrix(
}
i += 1
}
val Nb = rows.context.broadcast(N)
val rowsU = rows.map { row =>
Vectors.fromBreeze(Nb.value * row.toBreeze)
}
SingularValueDecomposition(new RowRDDMatrix(rowsU, _m, n), s, V)
val U = this.multiply(Matrices.fromBreeze(N))
SingularValueDecomposition(U, s, V)
} else {
SingularValueDecomposition(null, s, V)
}
}
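A local Breeze sketch of the tall-skinny strategy the doc comment describes: SVD of the small n x n Gramian, then U recovered by one multiplication. It mirrors the tuple-style brzSvd destructuring used in this commit (Breeze 0.7-era API) and is illustrative only; the real method distributes the Gramian and the final multiply:

    import breeze.linalg.{svd => brzSvd, diag, DenseMatrix => BDM, DenseVector => BDV}

    // A = U * diag(s) * V^T, computed with only an n x n local SVD.
    // Assumes A has full column rank (no zero singular values).
    def tallSkinnySvd(a: BDM[Double]): (BDM[Double], BDV[Double], BDM[Double]) = {
      val g = a.t * a // Gramian: A^T A = V * S^2 * V^T
      val (v: BDM[Double], sigmaSquares: BDV[Double], _) = brzSvd(g)
      val sigma = sigmaSquares.map(math.sqrt)
      val u = a * v * diag(sigma.map(x => 1.0 / x)) // U = A * (V * S^-1)
      (u, sigma, v)
    }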

/**
* Computes the covariance matrix, treating each row as an observation.
* @return a local dense matrix of size n x n
*/
def computeCovariance(): Matrix = {
val n = numCols().toInt

if (n > 10000) {
val mem = n.toLong * n * java.lang.Double.SIZE / java.lang.Byte.SIZE // Long arithmetic avoids Int overflow when n > 10000
logWarning(s"The number of columns $n is greater than 10000! " +
s"We need at least $mem bytes of memory.")
}

val (m, mean) = rows.aggregate[(Long, BDV[Double])]((0L, BDV.zeros[Double](n)))(
seqOp = (s: (Long, BDV[Double]), v: Vector) => (s._1 + 1L, s._2 += v.toBreeze),
combOp = (s1: (Long, BDV[Double]), s2: (Long, BDV[Double])) => (s1._1 + s2._1, s1._2 += s2._2)
)

// Update _m if it is not set, or verify its value.
if (_m < 0L) {
_m = m
} else {
require(_m == m,
s"The number of rows $m is different from what was specified or previously computed: ${_m}.")
}

mean :/= m.toDouble

// We use the formula Cov(X, Y) = E[X * Y] - E[X] E[Y], which is not accurate if E[X * Y] is
// large but Cov(X, Y) is small, but it is good for sparse computation.
// TODO: find a fast and stable way for sparse data.

val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]]

var i = 0
var j = 0
val m1 = m - 1.0
var alpha = 0.0
while (i < n) {
alpha = m / m1 * mean(i)
j = 0
while (j < n) {
G(i, j) = G(i, j) / m1 - alpha * mean(j)
j += 1
}
i += 1
}

Matrices.fromBreeze(G)
}
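The loop above is the elementwise form of the identity cited in the comment: with G = A^T A, m rows, and column-mean vector mu, Cov = G / (m - 1) - (m / (m - 1)) * mu * mu^T. A local Breeze sketch of that identity (dense and illustrative only):

    import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}

    // Shifted-moment covariance: Cov = A^T A / (m-1) - m/(m-1) * mean * mean^T.
    def localCovariance(a: BDM[Double]): BDM[Double] = {
      val m = a.rows.toDouble
      val mean = (a.t * BDV.ones[Double](a.rows)) / m // column means, length n
      val g = a.t * a                                 // Gramian
      g / (m - 1.0) - (mean * mean.t) * (m / (m - 1.0))
    }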

/**
* Computes the top k principal components.
* Rows correspond to observations and columns correspond to variables.
* The principal components are stored as a local matrix of size n-by-k.
* Each column corresponds to one principal component,
* and the columns are in descending order of component variance.
*
* @param k number of top principal components.
* @return a matrix of size n-by-k, whose columns are principal components
*/
def computePrincipalComponents(k: Int): Matrix = {
val n = numCols().toInt
require(k > 0 && k <= n, s"k = $k out of range (0, n = $n]")

val Cov = computeCovariance().toBreeze.asInstanceOf[BDM[Double]]

val (u: BDM[Double], _, _) = brzSvd(Cov)

if (k == n) {
Matrices.dense(n, k, u.data)
} else {
Matrices.dense(n, k, util.Arrays.copyOfRange(u.data, 0, n * k))
}
}
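Since the components come back as a local Matrix, projecting the data onto them is exactly the multiply method added below; a hypothetical usage sketch (mat is illustrative):

    // mat is a hypothetical m x n RowRDDMatrix of observations.
    val pc: Matrix = mat.computePrincipalComponents(k = 2) // n x 2, local
    val projected: RowRDDMatrix = mat.multiply(pc)         // m x 2, distributed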

/**
* Multiply this matrix by a local matrix on the right.
*
* @param B a local matrix whose number of rows must match the number of columns of this matrix
* @return a RowRDDMatrix representing the product, which preserves partitioning
*/
def multiply(B: Matrix): RowRDDMatrix = {
val n = numCols().toInt
require(n == B.m, s"Dimension mismatch: $n vs ${B.m}")

require(B.isInstanceOf[DenseMatrix],
s"Only dense matrices are supported at this time, but found ${B.getClass.getName}.")

val Bb = rows.context.broadcast(B)
val AB = rows.mapPartitions({ iter =>
val Bi = Bb.value.toBreeze.asInstanceOf[BDM[Double]]
iter.map(v => Vectors.fromBreeze(Bi.t * v.toBreeze))
}, preservesPartitioning = true)

new RowRDDMatrix(AB, _m, B.n)
}
}
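The per-row mapping in multiply works because row i of A * B equals B^T times row i of A, that is, (AB)_i = (B^T a_i)^T; broadcasting B once and applying Bi.t to each row therefore needs no shuffle and preserves partitioning. A quick local Breeze check of that identity (values are illustrative):

    import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}

    val a = new BDM(2, 3, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0)) // 2 x 3, column-major data
    val b = new BDM(3, 2, Array(1.0, 0.0, 0.0, 0.0, 1.0, 0.0)) // 3 x 2
    val ab = a * b
    val row0: BDV[Double] = b.t * a(0, ::).t // B^T times the first row of A
    assert(ab(0, ::).t == row0)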

object RowRDDMatrix {
RowRDDMatrixSuite.scala
@@ -19,10 +19,10 @@ package org.apache.spark.mllib.linalg.rdd

import org.scalatest.FunSuite

import breeze.linalg.{DenseVector => BDV, DenseMatrix => BDM, diag => brzDiag}
import breeze.linalg.{DenseVector => BDV, DenseMatrix => BDM, diag => brzDiag, norm => brzNorm}

import org.apache.spark.mllib.util.LocalSparkContext
import org.apache.spark.mllib.linalg.{Matrices, Vectors}
import org.apache.spark.mllib.linalg.{Matrices, Vectors, Matrix}

class RowRDDMatrixSuite extends FunSuite with LocalSparkContext {

@@ -42,6 +42,10 @@ class RowRDDMatrixSuite extends FunSuite with LocalSparkContext {
Vectors.sparse(3, Seq((0, 9.0), (2, 1.0)))
)

val principalComponents = Matrices.dense(n, n,
Array(0.0, math.sqrt(2.0) / 2.0, math.sqrt(2.0) / 2.0, 1.0, 0.0, 0.0,
0.0, math.sqrt(2.0) / 2.0, - math.sqrt(2.0) / 2.0))

var denseMat: RowRDDMatrix = _
var sparseMat: RowRDDMatrix = _

@@ -62,7 +66,7 @@ class RowRDDMatrixSuite extends FunSuite with LocalSparkContext {
val expected =
Matrices.dense(n, n, Array(126.0, 54.0, 72.0, 54.0, 66.0, 78.0, 72.0, 78.0, 94.0))
for (mat <- Seq(denseMat, sparseMat)) {
val G = mat.gram()
val G = mat.computeGramianMatrix()
assert(G.toBreeze === expected.toBreeze)
}
}
@@ -79,12 +83,52 @@
assert(closeToZero(brzUt.t * brzDiag(brzSigma) * brzV.t - A))
val VtV: BDM[Double] = brzV.t * brzV
assert(closeToZero(VtV - BDM.eye[Double](n)))
val UtU = U.gram().toBreeze.asInstanceOf[BDM[Double]]
val UtU = U.computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]]
assert(closeToZero(UtU - BDM.eye[Double](n)))
}
}

def closeToZero(G: BDM[Double]): Boolean = {
G.valuesIterator.map(math.abs).sum < 1e-6
}

def closeToZero(v: BDV[Double]): Boolean = {
brzNorm(v, 1.0) < 1e-6
}

def assertPrincipalComponentsEqual(a: Matrix, b: Matrix, k: Int) {
val brzA = a.toBreeze.asInstanceOf[BDM[Double]]
val brzB = b.toBreeze.asInstanceOf[BDM[Double]]
assert(brzA.rows === brzB.rows)
for (j <- 0 until k) {
val aj = brzA(::, j)
val bj = brzB(::, j)
assert(closeToZero(aj - bj) || closeToZero(aj + bj),
s"The $j-th components do not match: $aj and $bj")
}
}

test("pca") {
for (mat <- Seq(denseMat, sparseMat); k <- 1 to n) {
val pc = mat.computePrincipalComponents(k)
assert(pc.m === n)
assert(pc.n === k)
assertPrincipalComponentsEqual(pc, principalComponents, k)
}
}

test("multiply a local matrix") {
val B = Matrices.dense(n, 2, Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0))
for (mat <- Seq(denseMat, sparseMat)) {
val AB = mat.multiply(B)
assert(AB.numRows() === m)
assert(AB.numCols() === 2)
assert(AB.rows.collect().toSeq === Seq(
Vectors.dense(5.0, 14.0),
Vectors.dense(14.0, 50.0),
Vectors.dense(23.0, 86.0),
Vectors.dense(2.0, 32.0)
))
}
}
}
