Skip to content

Commit

Permalink
housekeeping
Browse files Browse the repository at this point in the history
  • Loading branch information
rezazadeh committed Mar 20, 2014
1 parent 156ff78 commit 120f796
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 164 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ object SparkSVD {
val n = args(3).toInt

// recover largest singular vector
val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), 1)
val decomposed = new SVD().setK(1).compute(SparseMatrix(data, m, n))
val u = decomposed.U.data
val s = decomposed.S.data
val v = decomposed.V.data
Expand Down
308 changes: 148 additions & 160 deletions mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import org.apache.spark.rdd.RDD

import org.jblas.{DoubleMatrix, Singular, MatrixFunctions}


/**
* Class used to obtain singular value decompositions
*/
Expand Down Expand Up @@ -56,18 +55,11 @@ class SVD {
this
}

/**
* Compute SVD using the current set parameters
*/
def compute(matrix: SparseMatrix) : MatrixSVD = {
SVD.sparseSVD(matrix, k)
}

/**
* Compute SVD using the current set parameters
*/
def compute(matrix: TallSkinnyDenseMatrix) : TallSkinnyMatrixSVD = {
SVD.denseSVD(matrix, k, computeU, smallestSigma)
denseSVD(matrix)
}

/**
Expand All @@ -80,117 +72,18 @@ class SVD {
*/
def compute(matrix: RDD[Array[Double]]) :
(RDD[Array[Double]], Array[Double], Array[Array[Double]]) = {
SVD.denseSVD(matrix, k, computeU, smallestSigma)
denseSVD(matrix)
}
}


/**
* Top-level methods for calling Singular Value Decomposition
* NOTE: All matrices are 0-indexed
*/
object SVD {
/**
* Singular Value Decomposition for Tall and Skinny matrices.
* Given an m x n matrix A, this will compute matrices U, S, V such that
* A = U * S * V'
*
* There is no restriction on m, but we require n^2 doubles to fit in memory.
* Further, n should be less than m.
*
* The decomposition is computed by first computing A'A = V S^2 V',
* computing svd locally on that (since n x n is small),
* from which we recover S and V.
* Then we compute U via easy matrix multiplication
* as U = A * V * S^-1
*
* Only the k largest singular values and associated vectors are found.
* If there are k such values, then the dimensions of the return will be:
*
* S is k x k and diagonal, holding the singular values on diagonal
* U is m x k and satisfies U'U = eye(k)
* V is n x k and satisfies V'V = eye(k)
*
* All input and output is expected in sparse matrix format, 0-indexed
* as tuples of the form ((i,j),value) all in RDDs using the
* SparseMatrix class
*
* @param matrix sparse matrix to factorize
* @param k Recover k singular values and vectors
* @param computeU gives the option of skipping the U computation
* @return Three sparse matrices: U, S, V such that A = USV^T
*/
def sparseSVD(matrix: SparseMatrix, k: Int, computeU: Boolean): MatrixSVD = {
val data = matrix.data
val m = matrix.m
val n = matrix.n

if (m < n || m <= 0 || n <= 0) {
throw new IllegalArgumentException("Expecting a tall and skinny matrix")
}

if (k < 1 || k > n) {
throw new IllegalArgumentException("Must request up to n singular values")
}

// Compute A^T A, assuming rows are sparse enough to fit in memory
val rows = data.map(entry =>
(entry.i, (entry.j, entry.mval))).groupByKey()
val emits = rows.flatMap{ case (rowind, cols) =>
cols.flatMap{ case (colind1, mval1) =>
cols.map { case (colind2, mval2) =>
((colind1, colind2), mval1*mval2) } }
}.reduceByKey(_ + _)

// Construct jblas A^T A locally
val ata = DoubleMatrix.zeros(n, n)
for (entry <- emits.collect()) {
ata.put(entry._1._1, entry._1._2, entry._2)
}

// Since A^T A is small, we can compute its SVD directly
val svd = Singular.sparseSVD(ata)
val V = svd(0)
val sigmas = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x > 1e-9)

if (sigmas.size < k) {
throw new Exception("Not enough singular values to return k=" + k + " s=" + sigmas.size)
}

val sigma = sigmas.take(k)

val sc = data.sparkContext

// prepare V for returning
val retVdata = sc.makeRDD(
Array.tabulate(V.rows, sigma.length){ (i,j) =>
MatrixEntry(i, j, V.get(i,j)) }.flatten)
val retV = SparseMatrix(retVdata, V.rows, sigma.length)

val retSdata = sc.makeRDD(Array.tabulate(sigma.length){
x => MatrixEntry(x, x, sigma(x))})

val retS = SparseMatrix(retSdata, sigma.length, sigma.length)

// Compute U as U = A V S^-1
// turn V S^-1 into an RDD as a sparse matrix
val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length)
{ (i,j) => ((i, j), V.get(i,j) / sigma(j)) }.flatten)

if (computeU) {
// Multiply A by VS^-1
val aCols = data.map(entry => (entry.j, (entry.i, entry.mval)))
val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
val retUdata = aCols.join(bRows).map {
case (key, ( (rowInd, rowVal), (colInd, colVal)) ) =>
((rowInd, colInd), rowVal * colVal)
}.reduceByKey(_ + _).map{ case ((row, col), mval) => MatrixEntry(row, col, mval)}

val retU = SparseMatrix(retUdata, m, sigma.length)
MatrixSVD(retU, retS, retV)
} else {
MatrixSVD(null, retS, retV)
}
/**
* Compute SVD with default parameter for computeU = true.
* See full paramter definition of sparseSVD for more description.
*
* @param matrix sparse matrix to factorize
* @return Three sparse matrices: U, S, V such that A = USV^T
*/
def compute(matrix: SparseMatrix): MatrixSVD = {
sparseSVD(matrix)
}

/**
Expand All @@ -217,40 +110,39 @@ object SVD {
* @param matrix dense matrix to factorize
* @param k Recover k singular values and vectors
* @param computeU gives the option of skipping the U computation
* @param rcond smallest singular value considered nonzero
* @param smallestSigma smallest singular value considered nonzero
* @return Three dense matrices: U, S, V such that A = USV^T
*/
private def denseSVD(matrix: TallSkinnyDenseMatrix, k: Int,
computeU: Boolean, rcond: Double): TallSkinnyMatrixSVD = {
val rows = matrix.rows
val m = matrix.m
val n = matrix.n
val sc = matrix.rows.sparkContext

if (m < n || m <= 0 || n <= 0) {
throw new IllegalArgumentException("Expecting a tall and skinny matrix m=$m n=$n")
}
private def denseSVD(matrix: TallSkinnyDenseMatrix): TallSkinnyMatrixSVD = {
val rows = matrix.rows
val m = matrix.m
val n = matrix.n
val sc = matrix.rows.sparkContext

if (m < n || m <= 0 || n <= 0) {
throw new IllegalArgumentException("Expecting a tall and skinny matrix m=$m n=$n")
}

if (k < 1 || k > n) {
throw new IllegalArgumentException("Request up to n singular values n=$n k=$k")
}
if (k < 1 || k > n) {
throw new IllegalArgumentException("Request up to n singular values n=$n k=$k")
}

val rowIndices = matrix.rows.map(_.i)
val rowIndices = matrix.rows.map(_.i)

// compute SVD
val (u, sigma, v) = denseSVD(matrix.rows.map(_.data), k, computeU, rcond)
// compute SVD
val (u, sigma, v) = denseSVD(matrix.rows.map(_.data))

if(computeU) {
// prep u for returning
val retU = TallSkinnyDenseMatrix(
u.zip(rowIndices).map {case (row, i) => MatrixRow(i, row) },
m,
k)
if(computeU) {
// prep u for returning
val retU = TallSkinnyDenseMatrix(
u.zip(rowIndices).map {case (row, i) => MatrixRow(i, row) },
m,
k)

TallSkinnyMatrixSVD(retU, sigma, v)
} else {
TallSkinnyMatrixSVD(null, sigma, v)
}
TallSkinnyMatrixSVD(retU, sigma, v)
} else {
TallSkinnyMatrixSVD(null, sigma, v)
}
}

/**
Expand Down Expand Up @@ -281,8 +173,7 @@ object SVD {
* @param k Recover k singular values and vectors
* @return Three matrices: U, S, V such that A = USV^T
*/
private def denseSVD(matrix: RDD[Array[Double]], k: Int,
computeU: Boolean, rcond: Double) :
private def denseSVD(matrix: RDD[Array[Double]]) :
(RDD[Array[Double]], Array[Double], Array[Array[Double]]) = {
val n = matrix.first.size

Expand Down Expand Up @@ -326,7 +217,7 @@ object SVD {
// Since A^T A is small, we can compute its SVD directly
val svd = Singular.sparseSVD(ata)
val V = svd(0)
val sigmas = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x > rcond)
val sigmas = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x > smallestSigma)

val sk = Math.min(k, sigmas.size)
val sigma = sigmas.take(sk)
Expand All @@ -348,18 +239,115 @@ object SVD {
}
}

/**
* Compute SVD with default parameter for computeU = true.
* See full paramter definition of sparseSVD for more description.
*
* @param matrix sparse matrix to factorize
* @param k Recover k singular values and vectors
* @return Three sparse matrices: U, S, V such that A = USV^T
*/
def sparseSVD(matrix: SparseMatrix, k: Int): MatrixSVD = {
sparseSVD(matrix, k, true)
}
/**
* Singular Value Decomposition for Tall and Skinny sparse matrices.
* Given an m x n matrix A, this will compute matrices U, S, V such that
* A = U * S * V'
*
* There is no restriction on m, but we require n^2 doubles to fit in memory.
* Further, n should be less than m.
*
* The decomposition is computed by first computing A'A = V S^2 V',
* computing svd locally on that (since n x n is small),
* from which we recover S and V.
* Then we compute U via easy matrix multiplication
* as U = A * V * S^-1
*
* Only the k largest singular values and associated vectors are found.
* If there are k such values, then the dimensions of the return will be:
*
* S is k x k and diagonal, holding the singular values on diagonal
* U is m x k and satisfies U'U = eye(k)
* V is n x k and satisfies V'V = eye(k)
*
* All input and output is expected in sparse matrix format, 0-indexed
* as tuples of the form ((i,j),value) all in RDDs using the
* SparseMatrix class
*
* @param matrix sparse matrix to factorize
* @param k Recover k singular values and vectors
* @param computeU gives the option of skipping the U computation
* @return Three sparse matrices: U, S, V such that A = USV^T
*/
private def sparseSVD(matrix: SparseMatrix): MatrixSVD = {
val data = matrix.data
val m = matrix.m
val n = matrix.n

if (m < n || m <= 0 || n <= 0) {
throw new IllegalArgumentException("Expecting a tall and skinny matrix")
}

if (k < 1 || k > n) {
throw new IllegalArgumentException("Must request up to n singular values")
}

// Compute A^T A, assuming rows are sparse enough to fit in memory
val rows = data.map(entry =>
(entry.i, (entry.j, entry.mval))).groupByKey()
val emits = rows.flatMap{ case (rowind, cols) =>
cols.flatMap{ case (colind1, mval1) =>
cols.map { case (colind2, mval2) =>
((colind1, colind2), mval1*mval2) } }
}.reduceByKey(_ + _)

// Construct jblas A^T A locally
val ata = DoubleMatrix.zeros(n, n)
for (entry <- emits.collect()) {
ata.put(entry._1._1, entry._1._2, entry._2)
}

// Since A^T A is small, we can compute its SVD directly
val svd = Singular.sparseSVD(ata)
val V = svd(0)
val sigmas = MatrixFunctions.sqrt(svd(1)).toArray.filter(x => x > 1e-9)

if (sigmas.size < k) {
throw new Exception("Not enough singular values to return k=" + k + " s=" + sigmas.size)
}

val sigma = sigmas.take(k)

val sc = data.sparkContext

// prepare V for returning
val retVdata = sc.makeRDD(
Array.tabulate(V.rows, sigma.length){ (i,j) =>
MatrixEntry(i, j, V.get(i,j)) }.flatten)
val retV = SparseMatrix(retVdata, V.rows, sigma.length)

val retSdata = sc.makeRDD(Array.tabulate(sigma.length){
x => MatrixEntry(x, x, sigma(x))})

val retS = SparseMatrix(retSdata, sigma.length, sigma.length)

// Compute U as U = A V S^-1
// turn V S^-1 into an RDD as a sparse matrix
val vsirdd = sc.makeRDD(Array.tabulate(V.rows, sigma.length)
{ (i,j) => ((i, j), V.get(i,j) / sigma(j)) }.flatten)

if (computeU) {
// Multiply A by VS^-1
val aCols = data.map(entry => (entry.j, (entry.i, entry.mval)))
val bRows = vsirdd.map(entry => (entry._1._1, (entry._1._2, entry._2)))
val retUdata = aCols.join(bRows).map {
case (key, ( (rowInd, rowVal), (colInd, colVal)) ) =>
((rowInd, colInd), rowVal * colVal)
}.reduceByKey(_ + _).map{ case ((row, col), mval) => MatrixEntry(row, col, mval)}

val retU = SparseMatrix(retUdata, m, sigma.length)
MatrixSVD(retU, retS, retV)
} else {
MatrixSVD(null, retS, retV)
}
}
}

/**
* Top-level methods for calling sparse Singular Value Decomposition
* NOTE: All matrices are 0-indexed
*/
object SVD {
def main(args: Array[String]) {
if (args.length < 8) {
println("Usage: SVD <master> <matrix_file> <m> <n> " +
Expand All @@ -379,7 +367,7 @@ object SVD {
MatrixEntry(parts(0).toInt, parts(1).toInt, parts(2).toDouble)
}

val decomposed = SVD.sparseSVD(SparseMatrix(data, m, n), k)
val decomposed = new SVD().setK(k).compute(SparseMatrix(data, m, n))
val u = decomposed.U.data
val s = decomposed.S.data
val v = decomposed.V.data
Expand Down
Loading

0 comments on commit 120f796

Please sign in to comment.