Array interface for dense svd and pca
rezazadeh committed Mar 20, 2014
1 parent cd290fa commit 2d7ccde
Showing 2 changed files with 59 additions and 26 deletions.
48 changes: 36 additions & 12 deletions mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala
@@ -45,7 +45,7 @@ class PCA {
     computePCA(matrix, k)
   }

   /**
    * Principal Component Analysis.
    * Computes the top k principal component coefficients for the m-by-n data matrix X.
    * Rows of X correspond to observations and columns correspond to variables.
@@ -70,26 +70,50 @@ class PCA {
       throw new IllegalArgumentException("Expecting a well-formed matrix")
     }

+    val v = computePCA(matrix.rows.map(_.data), k)
+    val retV = DenseMatrix(sc.makeRDD(Array.tabulate(n)(i => MatrixRow(i, v(i)))), n, k)
+    retV
+  }
+
+  /**
+   * Principal Component Analysis.
+   * Computes the top k principal component coefficients for the m-by-n data matrix X.
+   * Rows of X correspond to observations and columns correspond to variables.
+   * The coefficient matrix is n-by-k. Each column of coeff contains coefficients
+   * for one principal component, and the columns are in descending
+   * order of component variance.
+   * This function centers the data and uses the
+   * singular value decomposition (SVD) algorithm.
+   *
+   * @param matrix dense matrix to perform PCA on
+   * @param k Recover k principal components
+   * @return An n-by-k matrix of principal components
+   */
+  def computePCA(matrix: RDD[Array[Double]], k: Int): Array[Array[Double]] = {
+    val n = matrix.first.size
+    val sc = matrix.sparkContext
+    val m = matrix.count

     // compute column sums and normalize matrix
-    val colSums = sc.broadcast(matrix.rows.map(x => x.data).fold(new Array[Double](n)){
+    val colSums = sc.broadcast(matrix.fold(new Array[Double](n)){
       (a, b) => for(i <- 0 until n) {
         a(i) += b(i)
       }
       a
     }).value

-    val data = matrix.rows.map{
-      x => for(i <- 0 until n) {
-        x.data(i) = (x.data(i) - colSums(i) / m) / Math.sqrt(n - 1)
-      }
-      x
+    val data = matrix.map{
+      x =>
+        val row = Array.ofDim[Double](n)
+        for(i <- 0 until n) {
+          row(i) = (x(i) - colSums(i) / m) / Math.sqrt(n - 1)
+        }
+        row
     }

-    val normalizedMatrix = DenseMatrix(data, m, n)
-
-    val retV = new SVD().setK(k).setComputeU(false)
-      .compute(normalizedMatrix).V
-    retV
+    val (u, s, v) = SVD.denseSVD(data, k)
+    v
   }
 }
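A quick sketch of how the new array-based entry point is meant to be called. This is hypothetical usage, not part of the commit; it assumes an existing SparkContext named sc, and the sample values are made up:

    import org.apache.spark.mllib.linalg.PCA

    // Hypothetical caller of the new RDD[Array[Double]] interface; assumes an
    // existing SparkContext `sc`. Each array is one observation (a matrix row).
    val observations = sc.parallelize(Seq(
      Array(2.5, 2.4, 0.5),
      Array(0.5, 0.7, 1.9),
      Array(2.2, 2.9, 0.4),
      Array(1.9, 2.2, 0.8)
    ))

    // Top 2 principal components, returned as a plain n-by-k array of arrays.
    val coeff: Array[Array[Double]] = new PCA().computePCA(observations, 2)

Callers can now stay in plain Scala arrays; the DenseMatrix overload above simply delegates to this method and re-wraps the resulting rows as MatrixRows.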

37 changes: 23 additions & 14 deletions mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
@@ -197,12 +197,13 @@ object SVD {
    * @param matrix dense matrix to factorize
    * @param k Recover k singular values and vectors
    * @param computeU gives the option of skipping the U computation
-   * @return Three sparse matrices: U, S, V such that A = USV^T
+   * @return Three dense matrices: U, S, V such that A = USV^T
    */
   def denseSVD(matrix: DenseMatrix, k: Int, computeU: Boolean): DenseMatrixSVD = {
     val rows = matrix.rows
     val m = matrix.m
     val n = matrix.n
+    val sc = matrix.rows.sparkContext

     if (m < n || m <= 0 || n <= 0) {
       throw new IllegalArgumentException("Expecting a tall and skinny matrix")
@@ -212,10 +213,22 @@
       throw new IllegalArgumentException("Must request up to n singular values")
     }

-    val (u, s, v) = denseSVD(matrix.rows.map(_.data), k)
-    val retU = DenseMatrix(u.zipWithIndex.map{ case (row, i) => MatrixRow(i.toInt, row) }, m, k)
-    val retS = DenseMatrix(s.zipWithIndex.map{ case (row, i) => MatrixRow(i.toInt, row) }, k, k)
-    val retV = DenseMatrix(v.zipWithIndex.map{ case (row, i) => MatrixRow(i.toInt, row) }, n, k)
+    val rowIndices = matrix.rows.map(_.i)
+
+    // compute SVD
+    val (u, sigma, v) = denseSVD(matrix.rows.map(_.data), k)
+
+    // prepare U for returning
+    val retU = DenseMatrix(u.zip(rowIndices).map{ case (row, i) => MatrixRow(i, row) }, m, k)
+
+    // prepare S for returning
+    val sparseS = DoubleMatrix.diag(new DoubleMatrix(sigma))
+    val retS = DenseMatrix(sc.makeRDD(Array.tabulate(k)(
+      i => MatrixRow(i, sparseS.getRow(i).toArray))), k, k)
+
+    // prepare V for returning
+    val retV = DenseMatrix(sc.makeRDD(Array.tabulate(n)(i => MatrixRow(i, v(i)))), n, k)

     if(computeU) {
       DenseMatrixSVD(retU, retS, retV)
     } else {
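One detail of this hunk worth noting: U's rows are now paired with the original MatrixRow indices via u.zip(rowIndices) rather than u.zipWithIndex, so the returned U keeps whatever row numbering the input carried. A minimal, hypothetical sketch of calling the wrapper, assuming an existing SparkContext sc and the MatrixRow/DenseMatrix types defined alongside SVD in this package:

    import org.apache.spark.mllib.linalg.{DenseMatrix, MatrixRow, SVD}

    // A 4-by-2 tall-and-skinny matrix with explicit row indices; the data is
    // made up and `sc` is an assumed, pre-existing SparkContext.
    val rows = sc.parallelize(Seq(
      MatrixRow(0, Array(1.0, 2.0)),
      MatrixRow(1, Array(3.0, 4.0)),
      MatrixRow(2, Array(5.0, 6.0)),
      MatrixRow(3, Array(7.0, 8.0))
    ))
    val a = DenseMatrix(rows, 4, 2)

    // Full factorization A = U S V^T; pass computeU = false to skip U.
    val decomposition = SVD.denseSVD(a, 2, computeU = true)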
@@ -246,10 +259,10 @@ object SVD {
    *
    * @param matrix dense matrix to factorize
    * @param k Recover k singular values and vectors
-   * @return Three sparse matrices: U, S, V such that A = USV^T
+   * @return Three matrices: U, S, V such that A = USV^T
    */
   def denseSVD(matrix: RDD[Array[Double]], k: Int) :
-      (RDD[Array[Double]], RDD[Array[Double]], RDD[Array[Double]]) = {
+      (RDD[Array[Double]], Array[Double], Array[Array[Double]]) = {
     val n = matrix.first.size

     if (k < 1 || k > n) {
@@ -289,11 +302,7 @@ object SVD {
     val sc = matrix.sparkContext

     // prepare V for returning
-    val retV = sc.makeRDD(Array.tabulate(n)(i => V.getRow(i).toArray.take(k)))
-
-    // prepare S for returning
-    val sparseS = DoubleMatrix.diag(new DoubleMatrix(sigmas))
-    val retS = sc.makeRDD(Array.tabulate(k)(i => sparseS.getRow(i).toArray))
+    val retV = Array.tabulate(n, k)((i, j) => V.get(i, j))

     // Compute U as U = A V S^-1
     // Compute VS^-1
@@ -308,8 +317,8 @@
       }
       row
     }
-    (retU, retS, retV)
+    (retU, sigma, retV)

   }

   /**
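For completeness, a minimal, hypothetical sketch of the lower-level array interface that both changed call sites now go through. Again, sc is an assumed SparkContext and the data is made up:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.mllib.linalg.SVD

    // Hypothetical usage of the array-based denseSVD; assumes SparkContext `sc`.
    val a: RDD[Array[Double]] = sc.parallelize(Seq(
      Array(1.0, 0.0),
      Array(0.0, 2.0),
      Array(1.0, 1.0)
    ))

    // After this commit, only U stays distributed; the k singular values and
    // the n-by-k matrix V come back as local arrays rather than RDDs.
    val (u, sigma, v) = SVD.denseSVD(a, 2)

Since sigma holds only k values and V only n-by-k entries, keeping them local spares the driver a sc.makeRDD round-trip for data that easily fits in memory; the DenseMatrix wrapper re-wraps them as RDD-backed matrices only when a caller asks for that representation.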
