Skip to content

Commit

Permalink
provide RDD[Array[Double]] support
Browse files Browse the repository at this point in the history
  • Loading branch information
rezazadeh committed Mar 20, 2014
1 parent 398d123 commit cd290fa
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 23 deletions.
80 changes: 58 additions & 22 deletions mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,6 @@ object SVD {
}
}


/**
* Singular Value Decomposition for Tall and Skinny matrices.
* Given an m x n matrix A, this will compute matrices U, S, V such that
Expand Down Expand Up @@ -213,8 +212,52 @@ object SVD {
throw new IllegalArgumentException("Must request up to n singular values")
}

val (u, s, v) = denseSVD(matrix.rows.map(_.data), k)
val retU = DenseMatrix(u.zipWithIndex.map{ case (row, i) => MatrixRow(i.toInt, row) }, m, k)
val retS = DenseMatrix(s.zipWithIndex.map{ case (row, i) => MatrixRow(i.toInt, row) }, k, k)
val retV = DenseMatrix(v.zipWithIndex.map{ case (row, i) => MatrixRow(i.toInt, row) }, n, k)
if(computeU) {
DenseMatrixSVD(retU, retS, retV)
} else {
DenseMatrixSVD(null, retS, retV)
}
}

/**
* Singular Value Decomposition for Tall and Skinny matrices given as an RDD of dense rows.
* Given an m x n matrix A, this will compute matrices U, S, V such that
* A = U * S * V'
*
* There is no restriction on m, but we require n^2 doubles to fit in memory.
* Further, n should be less than m.
*
* The decomposition is computed by first computing A'A = V S^2 V',
* computing svd locally on that (since n x n is small),
* from which we recover S and V.
* Then we compute U via easy matrix multiplication
* as U = A * V * S^-1
*
* Only the k largest singular values and associated vectors are found.
* If there are k such values, then the dimensions of the return will be:
*
* S is k x k and diagonal, holding the singular values on diagonal
* U is m x k and satisfies U'U = eye(k)
* V is n x k and satisfies V'V = eye(k)
*
* @param matrix dense matrix to factorize, one Array[Double] of length n per row
* @param k Recover k singular values and vectors
* @return Three dense matrices, each as an RDD of rows: U, S, V such that A = USV^T
*/
def denseSVD(matrix: RDD[Array[Double]], k: Int) :
(RDD[Array[Double]], RDD[Array[Double]], RDD[Array[Double]]) = {
val n = matrix.first.size

if (k < 1 || k > n) {
throw new IllegalArgumentException("Must request up to n singular values")
}

// Compute A^T A
val fullata = matrix.rows.map(x => x.data).map{
val fullata = matrix.map{
row =>
val miniata = Array.ofDim[Double](n, n)
for(i <- 0 until n) for(j <- 0 until n) {
Expand Down Expand Up @@ -243,37 +286,30 @@ object SVD {

val sigma = sigmas.take(k)

val sc = matrix.rows.sparkContext
val sc = matrix.sparkContext

// prepare V for returning
val retVrows = sc.makeRDD(Array.tabulate(n)(i => MatrixRow(i, V.getRow(i).toArray.take(k))))
val retV = DenseMatrix(retVrows, n, k)
val retV = sc.makeRDD(Array.tabulate(n)(i => V.getRow(i).toArray.take(k)))

// prepare S for returning
val sparseS = DoubleMatrix.diag(new DoubleMatrix(sigmas))
val retSrows = sc.makeRDD(Array.tabulate(k)(i => MatrixRow(i, sparseS.getRow(i).toArray)))
val retS = DenseMatrix(retSrows, k, k)
val retS = sc.makeRDD(Array.tabulate(k)(i => sparseS.getRow(i).toArray))

// Compute U as U = A V S^-1
if (computeU) {
// Compute VS^-1
val vsinv = sc.broadcast(Array.tabulate(n, k)((i, j) => V.get(i, j) / sigma(j))).value
// Compute VS^-1
val vsinv = sc.broadcast(Array.tabulate(n, k)((i, j) => V.get(i, j) / sigma(j))).value

val uRows = matrix.rows.map{x =>
val row = Array.ofDim[Double](k)
for(j <- 0 until k) {
for(i <- 0 until n) {
row(j) += vsinv(i)(j) * x.data(i)
}
val retU = matrix.map{x =>
val row = Array.ofDim[Double](k)
for(j <- 0 until k) {
for(i <- 0 until n) {
row(j) += vsinv(i)(j) * x(i)
}
MatrixRow(x.i, row)
}

val retU = DenseMatrix(uRows, m, k)
DenseMatrixSVD(retU, retS, retV)
} else {
DenseMatrixSVD(null, retS, retV)
row
}

(retU, retS, retV)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
val retu = getDenseMatrix(u)
val rets = getDenseMatrix(s)
val retv = getDenseMatrix(v)



// check individual decomposition
assertMatrixEquals(retu, svd(0))
assertMatrixEquals(rets, DoubleMatrix.diag(svd(1)))
Expand Down Expand Up @@ -115,6 +116,7 @@ class SVDSuite extends FunSuite with BeforeAndAfterAll {
val rets = getDenseMatrix(s)
val retv = getDenseMatrix(v)


// check individual decomposition
assertMatrixEquals(retu, svd(0))
assertMatrixEquals(rets, DoubleMatrix.diag(svd(1)))
Expand Down

0 comments on commit cd290fa

Please sign in to comment.