
Merge pull request #1 from mengxr/vrilleup-master
Some updates to SVD impl
vrilleup committed Jul 9, 2014
2 parents c273771 + a461082 commit 4c618e9
Showing 3 changed files with 124 additions and 152 deletions.
@@ -47,7 +47,7 @@ private[mllib] object EigenValueDecomposition {
* function.
*/
private[mllib] def symmetricEigs(
mul: DenseVector => DenseVector,
mul: BDV[Double] => BDV[Double],
n: Int,
k: Int,
tol: Double,
@@ -102,9 +102,9 @@ private[mllib] object EigenValueDecomposition {
// multiply working vector with the matrix
val inputOffset = ipntr(0) - 1
val outputOffset = ipntr(1) - 1
val x = w(inputOffset until inputOffset + n)
val y = w(outputOffset until outputOffset + n)
y := BDV(mul(Vectors.fromBreeze(x).asInstanceOf[DenseVector]).toArray)
val x = w.slice(inputOffset, inputOffset + n)
val y = w.slice(outputOffset, outputOffset + n)
y := mul(x)
// call ARPACK's reverse communication
arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr,
workd, workl, workl.length, info)
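
Because w.slice(inputOffset, inputOffset + n) returns a Breeze view into ARPACK's shared work array, y := mul(x) writes the product back in place, and typing mul as BDV[Double] => BDV[Double] drops the round trip through Spark's DenseVector wrapper. A minimal sketch of calling the updated symmetricEigs with a matrix-free multiply follows; G (a local symmetric BDM[Double]), n, k, tol, and maxIter are assumed to be in scope, and the sketch is illustrative rather than part of this commit:

// Any BDV[Double] => BDV[Double] can serve as the multiply callback,
// e.g. multiplication by a local symmetric matrix G.
val (eigenValues, eigenVectors) =
  EigenValueDecomposition.symmetricEigs(v => G * v, n, k, tol, maxIter)
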
@@ -143,13 +143,12 @@ private[mllib] object EigenValueDecomposition {

// copy eigenvectors in descending order of eigenvalues
val sortedU = BDM.zeros[Double](n, computed)
sortedEigenPairs.zipWithIndex.foreach { r => {
val b = r._2 * n
var i = 0
while (i < n) {
sortedU.data(b + i) = r._1._2(i)
i += 1
}
sortedEigenPairs.zipWithIndex.foreach { r =>
val b = r._2 * n
var i = 0
while (i < n) {
sortedU.data(b + i) = r._1._2(i)
i += 1
}
}
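
The rewritten loop fills one column of sortedU per eigenpair: Breeze dense matrices are column-major, so the flat offset b = columnIndex * n addresses the start of that column in the backing array. A tiny standalone illustration of that layout, assuming only Breeze on the classpath (not part of this change):

import breeze.linalg.DenseMatrix

val m = DenseMatrix.zeros[Double](3, 2) // 3 x 2, column-major backing array of length 6
m.data(1 * 3 + 0) = 7.0                 // offset = colIndex * rows + rowIndex
assert(m(0, 1) == 7.0)                  // the write lands in row 0 of column 1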

@@ -17,7 +17,7 @@

package org.apache.spark.mllib.linalg.distributed

import java.util
import java.util.Arrays

import breeze.linalg.{Vector => BV, DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV}
import breeze.linalg.{svd => brzSvd, axpy => brzAxpy}
@@ -202,32 +202,23 @@ class RowMatrix(
}

/**
* Multiply the Gramian matrix `A^T A` by a DenseVector on the right.
* Multiplies the Gramian matrix `A^T A` by a dense vector on the right without computing `A^T A`.
*
* @param v a local DenseVector whose length must match the number of columns of this matrix.
* @return a local DenseVector representing the product.
* @param v a dense vector whose length must match the number of columns of this matrix
* @return a dense vector representing the product
*/
private[mllib] def multiplyGramianMatrixBy(v: DenseVector): DenseVector = {
private[mllib] def multiplyGramianMatrixBy(v: BDV[Double]): BDV[Double] = {
val n = numCols().toInt
val vbr = rows.context.broadcast(v.toBreeze)

val bv = rows.aggregate(BDV.zeros[Double](n))(
val vbr = rows.context.broadcast(v)
rows.aggregate(BDV.zeros[Double](n))(
seqOp = (U, r) => {
val rBrz = r.toBreeze
val a = rBrz.dot(vbr.value)
rBrz match {
case _: BDV[_] => brzAxpy(a, rBrz.asInstanceOf[BDV[Double]], U)
case _: BSV[_] => brzAxpy(a, rBrz.asInstanceOf[BSV[Double]], U)
case _ =>
throw new UnsupportedOperationException(
s"Do not support vector operation from type ${rBrz.getClass.getName}.")
}
brzAxpy(a, rBrz, U.asInstanceOf[BV[Double]])
U
},
combOp = (U1, U2) => U1 += U2
)

Vectors.fromBreeze(bv).asInstanceOf[DenseVector]
}
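
The aggregate above relies on the row decomposition (A' * A) * v = sum over rows a_i of a_i * (a_i' * v), so the n x n Gramian is never materialized and each executor only touches its own rows before the partial sums are combined. A local, single-machine sketch of the same identity, with a hypothetical helper name (not part of this change):

import breeze.linalg.{axpy => brzAxpy, DenseVector => BDV}

// Computes (A' * A) * v from the rows of A without forming A' * A.
def localGramianTimes(rowsOfA: Seq[BDV[Double]], v: BDV[Double]): BDV[Double] = {
  val u = BDV.zeros[Double](v.length)
  rowsOfA.foreach { a => brzAxpy(a dot v, a, u) } // u += (a . v) * a
  u
}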

/**
@@ -250,49 +241,51 @@ class RowMatrix(
}

/**
* Computes singular value decomposition of this matrix. Denote this matrix by A (m x n), this
* Computes singular value decomposition of this matrix. Denote this matrix by A (m x n). This
* will compute matrices U, S, V such that A ~= U * S * V', where S contains the leading k
* singular values, U and V contain the corresponding singular vectors.
*
* This approach assumes n is smaller than m, and invokes a dense matrix implementation when n is
* small (n < 100) or the number of requested singular values is the same as n (k == n). For
* problems with large n (n >= 100) and k < n, this approach invokes a sparse matrix
* implementation that provides a function to ARPACK to multiply a vector with A'A. It iteratively
* calls ARPACK-dsaupd on the master node, from which we recover S and V. Then we compute U via
* easy matrix multiplication as U = A * (V * S^{-1}).
*
* The dense implementation requires `n^2` doubles to fit in memory and `O(n^3)` time on the
* master node.
* At most k largest non-zero singular values and associated vectors are returned. If there are k
* such values, then the dimensions of the return will be:
* - U is a RowMatrix of size m x k that satisfies U' * U = eye(k),
* - s is a Vector of size k, holding the singular values in descending order,
* - V is a Matrix of size n x k that satisfies V' * V = eye(k).
*
* The sparse implementation requires `n * (6 * k + 4)` doubles to fit in memory on the master
* node and approximately `O(k * nnz(A))` time distributed on all worker nodes. There is no
* restriction on m (number of rows).
* We assume n is smaller than m. The singular values and the right singular vectors are derived
* from the eigenvalues and the eigenvectors of the Gramian matrix A' * A. U, the matrix
* storing the right singular vectors, is computed via matrix multiplication as
* U = A * (V * S^-1^), if requested by user. The actual method to use is determined
* automatically based on the cost:
* - If n is small (n < 100) or k is large compared with n (k > n / 2), we compute the Gramian
* matrix first and then compute its top eigenvalues and eigenvectors locally on the driver.
* This requires a single pass with O(n^2^) storage on each executor and on the driver, and
* O(n^2^ k) time on the driver.
* - Otherwise, we compute (A' * A) * v in a distributive way and send it to ARPACK's DSAUPD to
* compute (A' * A)'s top eigenvalues and eigenvectors on the driver node. This requires O(k)
* passes, O(n) storage on each executor, and O(n k) storage on the driver.
*
* Several internal parameters are set to default values. The reciprocal condition number rCond
* is set to 1e-9. All singular values smaller than rCond * sigma(0) are treated as zeros, where
* sigma(0) is the largest singular value. The maximum number of Arnoldi update iterations for
* ARPACK is set to 300 or k * 3, whichever is larger. The numerical tolerance for ARPACK
* ARPACK is set to 300 or k * 3, whichever is larger. The numerical tolerance for ARPACK's
* eigen-decomposition is set to 1e-10.
*
* At most k largest non-zero singular values and associated vectors are returned.
* If there are k such values, then the dimensions of the return will be:
* @note The conditions that decide which method to use internally and the default parameters are
* subject to change.
*
* U is a RowMatrix of size m x k that satisfies U'U = eye(k),
* s is a Vector of size k, holding the singular values in descending order,
* and V is a Matrix of size n x k that satisfies V'V = eye(k).
*
* @param k number of leading singular values to keep (0 < k <= n). It might return less than
* k if there are numerically zero singular values or there are not enough Ritz values
* @param k number of leading singular values to keep (0 < k <= n). It might return less than k if
* there are numerically zero singular values or there are not enough Ritz values
* converged before the maximum number of Arnoldi update iterations is reached (in case
* that matrix A is ill-conditioned).
* @param computeU whether to compute U.
* @param computeU whether to compute U
* @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0)
* are treated as zero, where sigma(0) is the largest singular value.
* @return SingularValueDecomposition(U, s, V), U = null if computeU = false
* @return SingularValueDecomposition(U, s, V). U = null if computeU = false.
*/
def computeSVD(k: Int,
computeU: Boolean = false,
rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = {
def computeSVD(
k: Int,
computeU: Boolean = false,
rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = {
// maximum number of Arnoldi update iterations for invoking ARPACK
val maxIter = math.max(300, k * 3)
// numerical tolerance for invoking ARPACK
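
A hedged usage sketch of the public computeSVD documented above; the RowMatrix instance mat and the choice of k are illustrative and not taken from this commit:

// mat: an existing RowMatrix with m rows and n columns, 0 < k <= n
val svd = mat.computeSVD(k = 20, computeU = true)
val U: RowMatrix = svd.U // m x k with U' * U = eye(k); null when computeU = false
val s: Vector = svd.s    // k (or fewer) singular values in descending order
val V: Matrix = svd.V    // n x k with V' * V = eye(k)
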
@@ -301,87 +294,78 @@ class RowMatrix(
}

/**
* Actual SVD computation, visible for testing.
* The actual SVD implementation, visible for testing.
*
* @param k number of leading singular values to keep (0 < k <= n)
* @param computeU whether to compute U
* @param rCond the reciprocal condition number
* @param maxIter max number of iterations (if ARPACK is used)
* @param tol termination tolerance (if ARPACK is used)
* @param mode computation mode (auto: determine automatically which mode to use,
* local-svd: compute gram matrix and computes its full SVD locally,
* local-eigs: compute gram matrix and computes its top eigenvalues locally,
* dist-eigs: compute the top eigenvalues of the gram matrix distributively)
* @return SingularValueDecomposition(U, s, V). U = null if computeU = false.
*/
private[mllib] def computeSVD(k: Int,
computeU: Boolean,
rCond: Double,
maxIter: Int,
tol: Double,
mode: String): SingularValueDecomposition[RowMatrix, Matrix] = {
private[mllib] def computeSVD(
k: Int,
computeU: Boolean,
rCond: Double,
maxIter: Int,
tol: Double,
mode: String): SingularValueDecomposition[RowMatrix, Matrix] = {
val n = numCols().toInt
require(k > 0 && k <= n, s"Request up to n singular values but got k=$k and n=$n.")

object SVDMode extends Enumeration {
val DenseARPACK, DenseLAPACK, SparseARPACK = Value
val LocalARPACK, LocalLAPACK, DistARPACK = Value
}

val derivedMode = mode match {
case "auto" => if (n < 100 || k == n) {
// invoke dense implementation when n is small or k == n (since ARPACK requires k < n)
require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.")
"dense"
} else {
// invoke sparse implementation with ARPACK when n is large
require(k > 0 && k < n, s"Request up to n - 1 singular values for ARPACK k=$k n=$n.")
"sparse"
}
case "dense" => "dense"
case "sparse" => "sparse"
val computeMode = mode match {
case "auto" =>
// TODO: The conditions below are not fully tested.
if (n < 100 || k > n / 2) {
// If n is small or k is large compared with n, we better compute the Gramian matrix first
// and then compute its eigenvalues locally, instead of making multiple passes.
if (k < n / 3) {
SVDMode.LocalARPACK
} else {
SVDMode.LocalLAPACK
}
} else {
// If k is small compared with n, we use ARPACK with distributed multiplication.
SVDMode.DistARPACK
}
case "local-svd" => SVDMode.LocalLAPACK
case "local-eigs" => SVDMode.LocalARPACK
case "dist-eigs" => SVDMode.DistARPACK
case _ => throw new IllegalArgumentException(s"Do not support mode $mode.")
}
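
For concreteness, the selection above plays out as follows (worked examples, not part of this change):

// n = 50,   k = 10  ->  n < 100 and k < n / 3   ->  SVDMode.LocalARPACK
// n = 50,   k = 30  ->  n < 100 and k >= n / 3  ->  SVDMode.LocalLAPACK
// n = 1000, k = 10  ->  n >= 100 and k <= n / 2 ->  SVDMode.DistARPACK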

val computeMode = derivedMode match {
case "dense" => if (k < n / 2) {
// when k is small, call ARPACK
require(k > 0 && k < n, s"Request up to n - 1 singular values for ARPACK k=$k n=$n.")
SVDMode.DenseARPACK
} else {
// when k is large, call LAPACK
SVDMode.DenseLAPACK
}
case "sparse" => SVDMode.SparseARPACK
}

// Compute the eigen-decomposition of A' * A.
val (sigmaSquares: BDV[Double], u: BDM[Double]) = computeMode match {
case SVDMode.DenseARPACK => {
case SVDMode.LocalARPACK =>
require(k < n, s"k must be smaller than n in local-eigs mode but got k=$k and n=$n.")
val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]]
def multiplyDenseGramianMatrixBy(v: DenseVector): DenseVector = {
Vectors.fromBreeze(G * v.toBreeze).asInstanceOf[DenseVector]
}
EigenValueDecomposition.symmetricEigs(multiplyDenseGramianMatrixBy, n, k, tol, maxIter)
}
case SVDMode.DenseLAPACK => {
EigenValueDecomposition.symmetricEigs(v => G * v, n, k, tol, maxIter)
case SVDMode.LocalLAPACK =>
val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]]
val (uFull: BDM[Double], sigmaSquaresFull: BDV[Double], vFull: BDM[Double]) = brzSvd(G)
val (uFull: BDM[Double], sigmaSquaresFull: BDV[Double], _) = brzSvd(G)
(sigmaSquaresFull, uFull)
}
case SVDMode.SparseARPACK => {
case SVDMode.DistARPACK =>
require(k < n, s"k must be smaller than n in dist-eigs mode but got k=$k and n=$n.")
EigenValueDecomposition.symmetricEigs(multiplyGramianMatrixBy, n, k, tol, maxIter)
}
}
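
Since A = U * S * V' implies A' * A = V * S^2^ * V', the eigenvalues produced by every branch above are the squared singular values and the eigenvectors are the right singular vectors of A; taking brzSqrt below recovers the singular values themselves.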

computeSVDEffectiveRank(k, n, computeU, rCond, sigmaSquares, u)
}

/**
* Determine effective rank of SVD result and compute left singular vectors if required.
*/
private def computeSVDEffectiveRank(
k: Int,
n: Int,
computeU: Boolean,
rCond: Double,
sigmaSquares: BDV[Double],
u: BDM[Double]): SingularValueDecomposition[RowMatrix, Matrix] = {
val sigmas: BDV[Double] = brzSqrt(sigmaSquares)

// Determine effective rank.
// Determine the effective rank.
val sigma0 = sigmas(0)
val threshold = rCond * sigma0
var i = 0
// sigmas might have a length smaller than k, if some Ritz values do not satisfy the
// convergence criterion specified by tol after maxIterations.
// Thus use i < min(k, sigmas.length) instead of i < k
// sigmas might have a length smaller than k, if some Ritz values do not satisfy the convergence
// criterion specified by tol after max number of iterations.
// Thus use i < min(k, sigmas.length) instead of i < k.
if (sigmas.length < k) {
logWarning(s"Requested $k singular values but only found ${sigmas.length} converged.")
}
Expand All @@ -394,12 +378,12 @@ class RowMatrix(
logWarning(s"Requested $k singular values but only found $sk nonzeros.")
}

val s = Vectors.dense(util.Arrays.copyOfRange(sigmas.data, 0, sk))
val V = Matrices.dense(n, sk, util.Arrays.copyOfRange(u.data, 0, n * sk))
val s = Vectors.dense(Arrays.copyOfRange(sigmas.data, 0, sk))
val V = Matrices.dense(n, sk, Arrays.copyOfRange(u.data, 0, n * sk))

if (computeU) {
// N = Vk * Sk^{-1}
val N = new BDM[Double](n, sk, util.Arrays.copyOfRange(u.data, 0, n * sk))
val N = new BDM[Double](n, sk, Arrays.copyOfRange(u.data, 0, n * sk))
var i = 0
var j = 0
while (j < sk) {
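
The loop truncated above divides column j of N by sigmas(j), forming N = Vk * Sk^{-1}; the left singular vectors then come from a single distributed multiply, U = A * N, as stated in the scaladoc. A minimal local sketch of the column scaling, with a hypothetical helper name (illustrative only, not the code in this commit):

import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}

// Scales column j of vk by 1 / sigmas(j), i.e. N = Vk * diag(1 / sigma_1, ..., 1 / sigma_k).
def scaleColumnsByInvSigma(vk: BDM[Double], sigmas: BDV[Double]): BDM[Double] = {
  val scaled = vk.copy
  for (j <- 0 until scaled.cols; i <- 0 until scaled.rows) {
    scaled(i, j) /= sigmas(j)
  }
  scaled
}
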
@@ -484,7 +468,7 @@ class RowMatrix(
if (k == n) {
Matrices.dense(n, k, u.data)
} else {
Matrices.dense(n, k, util.Arrays.copyOfRange(u.data, 0, n * k))
Matrices.dense(n, k, Arrays.copyOfRange(u.data, 0, n * k))
}
}
