Skip to content

Commit

Permalink
private, accumulator
Browse files Browse the repository at this point in the history
  • Loading branch information
rezazadeh committed Mar 20, 2014
1 parent 17002be commit 4195e69
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 18 deletions.
31 changes: 17 additions & 14 deletions mllib/src/main/scala/org/apache/spark/mllib/linalg/PCA.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,16 @@ class PCA {
}

/**
* Compute PCA using the current set parameters
*/
* Compute PCA using the current set parameters
*/
def compute(matrix: TallSkinnyDenseMatrix): Array[Array[Double]] = {
computePCA(matrix, k)
}

/**
* Compute PCA using the current set parameters
*/
* Compute PCA using the parameters currently set
* See computePCA() for more details
*/
def compute(matrix: RDD[Array[Double]]): Array[Array[Double]] = {
computePCA(matrix, k)
}
Expand All @@ -68,19 +69,19 @@ class PCA {
* @param k Recover k principal components
* @return An nxk matrix with principal components in columns
*/
def computePCA(matrix: TallSkinnyDenseMatrix, k: Int): Array[Array[Double]] = {
private def computePCA(matrix: TallSkinnyDenseMatrix, k: Int): Array[Array[Double]] = {
val m = matrix.m
val n = matrix.n
val sc = matrix.rows.sparkContext

if (m <= 0 || n <= 0) {
throw new IllegalArgumentException("Expecting a well-formed matrix")
throw new IllegalArgumentException(
"Expecting a well-formed matrix: m=" + m + " n=" + n)
}

computePCA(matrix.rows.map(_.data), k)
}


/**
* Principal Component Analysis.
* Computes the top k principal component coefficients for the m-by-n data matrix X.
Expand All @@ -95,20 +96,24 @@ class PCA {
* @param k Recover k principal components
* @return An nxk matrix of principal components
*/
def computePCA(matrix: RDD[Array[Double]], k: Int): Array[Array[Double]] = {
private def computePCA(matrix: RDD[Array[Double]], k: Int): Array[Array[Double]] = {
val n = matrix.first.size
val sc = matrix.sparkContext
val m = matrix.count
val m = sc.accumulator(-1)

// compute column sums and normalize matrix
val colSums = sc.broadcast(matrix.fold(Array.ofDim[Double](n)){
val colSumsTemp = matrix.fold(Array.ofDim[Double](n)){
(a, b) =>
val am = new DoubleMatrix(a)
val bm = new DoubleMatrix(b)
am.addi(bm)
m += 1
a
}.map(x => x / m)).value

}

val normalizedColSums = colSumsTemp.map(x => x / m.value)
val colSums = sc.broadcast(normalizedColSums).value

val data = matrix.map{
x =>
val row = Array.ofDim[Double](n)
Expand All @@ -123,7 +128,6 @@ class PCA {
}
}


/**
* Top-level methods for calling Principal Component Analysis
* NOTE: All matrices are TallSkinnyDenseMatrix format
Expand Down Expand Up @@ -155,4 +159,3 @@ object PCA {
}
}


Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class PCASuite extends FunSuite with BeforeAndAfterAll {
val m = matrix.m
val n = matrix.n
val ret = DoubleMatrix.zeros(m, n)
matrix.data.toArray.map(x => ret.put(x.i, x.j, x.mval))
matrix.data.collect.map(x => ret.put(x.i, x.j, x.mval))
ret
}

Expand All @@ -78,7 +78,7 @@ class PCASuite extends FunSuite with BeforeAndAfterAll {
(2,0,0.9553), (2,1,-0.0649), (2,2,0.2886))
val realPCA = sc.makeRDD(realPCAArray.map(x => MatrixEntry(x._1, x._2, x._3)))

val coeffs = new DoubleMatrix(new PCA().computePCA(a, n))
val coeffs = new DoubleMatrix(new PCA().setK(n).compute(a))

assertMatrixEquals(getDenseMatrix(SparseMatrix(realPCA,n,n)), coeffs)
}
Expand All @@ -95,7 +95,7 @@ class PCASuite extends FunSuite with BeforeAndAfterAll {
(2,0,0.9553), (2,1,-0.0649), (2,2,0.2886))
val realPCA = sc.makeRDD(realPCAArray.map(x => MatrixEntry(x._1, x._2, x._3)))

val coeffs = new DoubleMatrix(new PCA().computePCA(a, n))
val coeffs = new DoubleMatrix(new PCA().setK(n).compute(a))

assertMatrixEquals(getDenseMatrix(SparseMatrix(realPCA,n,n)), coeffs)
}
Expand All @@ -113,7 +113,7 @@ class PCASuite extends FunSuite with BeforeAndAfterAll {
val realPCA = sc.makeRDD(realPCAArray.map(x => MatrixEntry(x._1, x._2, x._3)))

val k = 2
val coeffs = new DoubleMatrix(new PCA().computePCA(a, k))
val coeffs = new DoubleMatrix(new PCA().setK(k).compute(a))

assertMatrixEquals(getDenseMatrix(SparseMatrix(realPCA,n,k)), coeffs)
}
Expand Down

0 comments on commit 4195e69

Please sign in to comment.