diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala index 49419f7654e67..3515461b52493 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala @@ -47,7 +47,7 @@ private[mllib] object EigenValueDecomposition { * function. */ private[mllib] def symmetricEigs( - mul: DenseVector => DenseVector, + mul: BDV[Double] => BDV[Double], n: Int, k: Int, tol: Double, @@ -102,9 +102,9 @@ private[mllib] object EigenValueDecomposition { // multiply working vector with the matrix val inputOffset = ipntr(0) - 1 val outputOffset = ipntr(1) - 1 - val x = w(inputOffset until inputOffset + n) - val y = w(outputOffset until outputOffset + n) - y := BDV(mul(Vectors.fromBreeze(x).asInstanceOf[DenseVector]).toArray) + val x = w.slice(inputOffset, inputOffset + n) + val y = w.slice(outputOffset, outputOffset + n) + y := mul(x) // call ARPACK's reverse communication arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr, workd, workl, workl.length, info) @@ -143,13 +143,12 @@ private[mllib] object EigenValueDecomposition { // copy eigenvectors in descending order of eigenvalues val sortedU = BDM.zeros[Double](n, computed) - sortedEigenPairs.zipWithIndex.foreach { r => { - val b = r._2 * n - var i = 0 - while (i < n) { - sortedU.data(b + i) = r._1._2(i) - i += 1 - } + sortedEigenPairs.zipWithIndex.foreach { r => + val b = r._2 * n + var i = 0 + while (i < n) { + sortedU.data(b + i) = r._1._2(i) + i += 1 } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index cd9d28a4f976c..e003bf17178e5 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -17,7 +17,7 @@ package org.apache.spark.mllib.linalg.distributed -import java.util +import java.util.Arrays import breeze.linalg.{Vector => BV, DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV} import breeze.linalg.{svd => brzSvd, axpy => brzAxpy} @@ -202,32 +202,23 @@ class RowMatrix( } /** - * Multiply the Gramian matrix `A^T A` by a DenseVector on the right. + * Multiplies the Gramian matrix `A^T A` by a dense vector on the right without computing `A^T A`. * - * @param v a local DenseVector whose length must match the number of columns of this matrix. - * @return a local DenseVector representing the product. + @param v a dense vector whose length must match the number of columns of this matrix + * @return a dense vector representing the product */ - private[mllib] def multiplyGramianMatrixBy(v: DenseVector): DenseVector = { + private[mllib] def multiplyGramianMatrixBy(v: BDV[Double]): BDV[Double] = { val n = numCols().toInt - val vbr = rows.context.broadcast(v.toBreeze) - - val bv = rows.aggregate(BDV.zeros[Double](n))( + val vbr = rows.context.broadcast(v) + rows.aggregate(BDV.zeros[Double](n))( seqOp = (U, r) => { val rBrz = r.toBreeze val a = rBrz.dot(vbr.value) - rBrz match { - case _: BDV[_] => brzAxpy(a, rBrz.asInstanceOf[BDV[Double]], U) - case _: BSV[_] => brzAxpy(a, rBrz.asInstanceOf[BSV[Double]], U) - case _ => - throw new UnsupportedOperationException( - s"Do not support vector operation from type ${rBrz.getClass.getName}.") - } + brzAxpy(a, rBrz, U.asInstanceOf[BV[Double]]) U }, combOp = (U1, U2) => U1 += U2 ) - - Vectors.fromBreeze(bv).asInstanceOf[DenseVector] } /** @@ -250,49 +241,51 @@ class RowMatrix( } /** - * Computes singular value decomposition of this matrix. Denote this matrix by A (m x n), this + * Computes singular value decomposition of this matrix. Denote this matrix by A (m x n). This * will compute matrices U, S, V such that A ~= U * S * V', where S contains the leading k * singular values, U and V contain the corresponding singular vectors. * - * This approach assumes n is smaller than m, and invokes a dense matrix implementation when n is - * small (n < 100) or the number of requested singular values is the same as n (k == n). For - * problems with large n (n >= 100) and k < n, this approach invokes a sparse matrix - * implementation that provides a function to ARPACK to multiply a vector with A'A. It iteratively - * calls ARPACK-dsaupd on the master node, from which we recover S and V. Then we compute U via - * easy matrix multiplication as U = A * (V * S^{-1}). - * - * The dense implementation requires `n^2` doubles to fit in memory and `O(n^3)` time on the - * master node. + * At most k largest non-zero singular values and associated vectors are returned. If there are k + * such values, then the dimensions of the return will be: + * - U is a RowMatrix of size m x k that satisfies U' * U = eye(k), + * - s is a Vector of size k, holding the singular values in descending order, + * - V is a Matrix of size n x k that satisfies V' * V = eye(k). * - * The sparse implementation requires `n * (6 * k + 4)` doubles to fit in memory on the master - * node and approximately `O(k * nnz(A))` time distributed on all worker nodes. There is no - * restriction on m (number of rows). + * We assume n is smaller than m. The singular values and the right singular vectors are derived + * from the eigenvalues and the eigenvectors of the Gramian matrix A' * A. U, the matrix + * storing the right singular vectors, is computed via matrix multiplication as + * U = A * (V * S^-1^), if requested by user. The actual method to use is determined + * automatically based on the cost: + * - If n is small (n < 100) or k is large compared with n (k > n / 2), we compute the Gramian + * matrix first and then compute its top eigenvalues and eigenvectors locally on the driver. + * This requires a single pass with O(n^2^) storage on each executor and on the driver, and + * O(n^2^ k) time on the driver. + * - Otherwise, we compute (A' * A) * v in a distributive way and send it to ARPACK's DSAUPD to + * compute (A' * A)'s top eigenvalues and eigenvectors on the driver node. This requires O(k) + * passes, O(n) storage on each executor, and O(n k) storage on the driver. * * Several internal parameters are set to default values. The reciprocal condition number rCond * is set to 1e-9. All singular values smaller than rCond * sigma(0) are treated as zeros, where * sigma(0) is the largest singular value. The maximum number of Arnoldi update iterations for - * ARPACK is set to 300 or k * 3, whichever is larger. The numerical tolerance for ARPACK + * ARPACK is set to 300 or k * 3, whichever is larger. The numerical tolerance for ARPACK's * eigen-decomposition is set to 1e-10. * - * At most k largest non-zero singular values and associated vectors are returned. - * If there are k such values, then the dimensions of the return will be: + * @note The conditions that decide which method to use internally and the default parameters are + * subject to change. * - * U is a RowMatrix of size m x k that satisfies U'U = eye(k), - * s is a Vector of size k, holding the singular values in descending order, - * and V is a Matrix of size n x k that satisfies V'V = eye(k). - * - * @param k number of leading singular values to keep (0 < k <= n). It might return less than - * k if there are numerically zero singular values or there are not enough Ritz values + * @param k number of leading singular values to keep (0 < k <= n). It might return less than k if + * there are numerically zero singular values or there are not enough Ritz values * converged before the maximum number of Arnoldi update iterations is reached (in case * that matrix A is ill-conditioned). - * @param computeU whether to compute U. + * @param computeU whether to compute U * @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0) * are treated as zero, where sigma(0) is the largest singular value. - * @return SingularValueDecomposition(U, s, V), U = null if computeU = false + * @return SingularValueDecomposition(U, s, V). U = null if computeU = false. */ - def computeSVD(k: Int, - computeU: Boolean = false, - rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = { + def computeSVD( + k: Int, + computeU: Boolean = false, + rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = { // maximum number of Arnoldi update iterations for invoking ARPACK val maxIter = math.max(300, k * 3) // numerical tolerance for invoking ARPACK @@ -301,87 +294,78 @@ class RowMatrix( } /** - * Actual SVD computation, visible for testing. + * The actual SVD implementation, visible for testing. + * + * @param k number of leading singular values to keep (0 < k <= n) + * @param computeU whether to compute U + * @param rCond the reciprocal condition number + * @param maxIter max number of iterations (if ARPACK is used) + * @param tol termination tolerance (if ARPACK is used) + * @param mode computation mode (auto: determine automatically which mode to use, + * local-svd: compute gram matrix and computes its full SVD locally, + * local-eigs: compute gram matrix and computes its top eigenvalues locally, + * dist-eigs: compute the top eigenvalues of the gram matrix distributively) + * @return SingularValueDecomposition(U, s, V). U = null if computeU = false. */ - private[mllib] def computeSVD(k: Int, - computeU: Boolean, - rCond: Double, - maxIter: Int, - tol: Double, - mode: String): SingularValueDecomposition[RowMatrix, Matrix] = { + private[mllib] def computeSVD( + k: Int, + computeU: Boolean, + rCond: Double, + maxIter: Int, + tol: Double, + mode: String): SingularValueDecomposition[RowMatrix, Matrix] = { val n = numCols().toInt + require(k > 0 && k <= n, s"Request up to n singular values but got k=$k and n=$n.") object SVDMode extends Enumeration { - val DenseARPACK, DenseLAPACK, SparseARPACK = Value + val LocalARPACK, LocalLAPACK, DistARPACK = Value } - val derivedMode = mode match { - case "auto" => if (n < 100 || k == n) { - // invoke dense implementation when n is small or k == n (since ARPACK requires k < n) - require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.") - "dense" - } else { - // invoke sparse implementation with ARPACK when n is large - require(k > 0 && k < n, s"Request up to n - 1 singular values for ARPACK k=$k n=$n.") - "sparse" - } - case "dense" => "dense" - case "sparse" => "sparse" + val computeMode = mode match { + case "auto" => + // TODO: The conditions below are not fully tested. + if (n < 100 || k > n / 2) { + // If n is small or k is large compared with n, we better compute the Gramian matrix first + // and then compute its eigenvalues locally, instead of making multiple passes. + if (k < n / 3) { + SVDMode.LocalARPACK + } else { + SVDMode.LocalLAPACK + } + } else { + // If k is small compared with n, we use ARPACK with distributed multiplication. + SVDMode.DistARPACK + } + case "local-svd" => SVDMode.LocalLAPACK + case "local-eigs" => SVDMode.LocalARPACK + case "dist-eigs" => SVDMode.DistARPACK case _ => throw new IllegalArgumentException(s"Do not support mode $mode.") } - val computeMode = derivedMode match { - case "dense" => if (k < n / 2) { - // when k is small, call ARPACK - require(k > 0 && k < n, s"Request up to n - 1 singular values for ARPACK k=$k n=$n.") - SVDMode.DenseARPACK - } else { - // when k is large, call LAPACK - SVDMode.DenseLAPACK - } - case "sparse" => SVDMode.SparseARPACK - } - + // Compute the eigen-decomposition of A' * A. val (sigmaSquares: BDV[Double], u: BDM[Double]) = computeMode match { - case SVDMode.DenseARPACK => { + case SVDMode.LocalARPACK => + require(k < n, s"k must be smaller than n in local-eigs mode but got k=$k and n=$n.") val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]] - def multiplyDenseGramianMatrixBy(v: DenseVector): DenseVector = { - Vectors.fromBreeze(G * v.toBreeze).asInstanceOf[DenseVector] - } - EigenValueDecomposition.symmetricEigs(multiplyDenseGramianMatrixBy, n, k, tol, maxIter) - } - case SVDMode.DenseLAPACK => { + EigenValueDecomposition.symmetricEigs(v => G * v, n, k, tol, maxIter) + case SVDMode.LocalLAPACK => val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]] - val (uFull: BDM[Double], sigmaSquaresFull: BDV[Double], vFull: BDM[Double]) = brzSvd(G) + val (uFull: BDM[Double], sigmaSquaresFull: BDV[Double], _) = brzSvd(G) (sigmaSquaresFull, uFull) - } - case SVDMode.SparseARPACK => { + case SVDMode.DistARPACK => + require(k < n, s"k must be smaller than n in dist-eigs mode but got k=$k and n=$n.") EigenValueDecomposition.symmetricEigs(multiplyGramianMatrixBy, n, k, tol, maxIter) - } } - computeSVDEffectiveRank(k, n, computeU, rCond, sigmaSquares, u) - } - - /** - * Determine effective rank of SVD result and compute left singular vectors if required. - */ - private def computeSVDEffectiveRank( - k: Int, - n: Int, - computeU: Boolean, - rCond: Double, - sigmaSquares: BDV[Double], - u: BDM[Double]): SingularValueDecomposition[RowMatrix, Matrix] = { val sigmas: BDV[Double] = brzSqrt(sigmaSquares) - // Determine effective rank. + // Determine the effective rank. val sigma0 = sigmas(0) val threshold = rCond * sigma0 var i = 0 - // sigmas might have a length smaller than k, if some Ritz values do not satisfy the - // convergence criterion specified by tol after maxIterations. - // Thus use i < min(k, sigmas.length) instead of i < k + // sigmas might have a length smaller than k, if some Ritz values do not satisfy the convergence + // criterion specified by tol after max number of iterations. + // Thus use i < min(k, sigmas.length) instead of i < k. if (sigmas.length < k) { logWarning(s"Requested $k singular values but only found ${sigmas.length} converged.") } @@ -394,12 +378,12 @@ class RowMatrix( logWarning(s"Requested $k singular values but only found $sk nonzeros.") } - val s = Vectors.dense(util.Arrays.copyOfRange(sigmas.data, 0, sk)) - val V = Matrices.dense(n, sk, util.Arrays.copyOfRange(u.data, 0, n * sk)) + val s = Vectors.dense(Arrays.copyOfRange(sigmas.data, 0, sk)) + val V = Matrices.dense(n, sk, Arrays.copyOfRange(u.data, 0, n * sk)) if (computeU) { // N = Vk * Sk^{-1} - val N = new BDM[Double](n, sk, util.Arrays.copyOfRange(u.data, 0, n * sk)) + val N = new BDM[Double](n, sk, Arrays.copyOfRange(u.data, 0, n * sk)) var i = 0 var j = 0 while (j < sk) { @@ -484,7 +468,7 @@ class RowMatrix( if (k == n) { Matrices.dense(n, k, u.data) } else { - Matrices.dense(n, k, util.Arrays.copyOfRange(u.data, 0, n * k)) + Matrices.dense(n, k, Arrays.copyOfRange(u.data, 0, n * k)) } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index 3091d1f967cf5..a961f89456a18 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -95,51 +95,40 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { } test("svd of a full-rank matrix") { - for (denseSVD <- Seq(true, false)) { - for (mat <- Seq(denseMat, sparseMat)) { + for (mat <- Seq(denseMat, sparseMat)) { + for (mode <- Seq("auto", "local-svd", "local-eigs", "dist-eigs")) { val localMat = mat.toBreeze() val (localU, localSigma, localVt) = brzSvd(localMat) val localV: BDM[Double] = localVt.t.toDenseMatrix for (k <- 1 to n) { - val svd = if (k < n) { - if (denseSVD) { - mat.computeSVD(k, computeU = true, 1e-9, 300, 1e-10, mode = "dense") - } else { - mat.computeSVD(k, computeU = true, 1e-9, 300, 1e-10, mode = "sparse") - } - } else { - // when k = n, always use dense SVD - mat.computeSVD(k, computeU = true, 1e-9, 300, 1e-10, mode = "dense") + val skip = (mode == "local-eigs" || mode == "dist-eigs") && k == n + if (!skip) { + val svd = mat.computeSVD(k, computeU = true, 1e-9, 300, 1e-10, mode) + val U = svd.U + val s = svd.s + val V = svd.V + assert(U.numRows() === m) + assert(U.numCols() === k) + assert(s.size === k) + assert(V.numRows === n) + assert(V.numCols === k) + assertColumnEqualUpToSign(U.toBreeze(), localU, k) + assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k) + assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k))) } - val U = svd.U - val s = svd.s - val V = svd.V - assert(U.numRows() === m) - assert(U.numCols() === k) - assert(s.size === k) - assert(V.numRows === n) - assert(V.numCols === k) - assertColumnEqualUpToSign(U.toBreeze(), localU, k) - assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k) - assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k))) - } - val svdWithoutU = if (denseSVD) { - mat.computeSVD(n - 1, computeU = false, 1e-9, 300, 1e-10, mode = "dense") - } else { - mat.computeSVD(n - 1, computeU = false, 1e-9, 300, 1e-10, mode = "sparse") } + val svdWithoutU = mat.computeSVD(1, computeU = false, 1e-9, 300, 1e-10, mode) assert(svdWithoutU.U === null) } } } test("svd of a low-rank matrix") { - for (denseSVD <- Seq(true, false)) { - val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0, 1.0)), 2) - val mat = new RowMatrix(rows, 4, 3) - val svd = if (denseSVD) mat.computeSVD(2, computeU = true, 1e-9, 300, 1e-10, mode = "dense") - else mat.computeSVD(2, computeU = true, 1e-9, 300, 1e-10, mode = "sparse") - assert(svd.s.size === 1, "should not return zero singular values") + val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0, 1.0)), 2) + val mat = new RowMatrix(rows, 4, 3) + for (mode <- Seq("auto", "local-svd", "local-eigs", "dist-eigs")) { + val svd = mat.computeSVD(2, computeU = true, 1e-6, 300, 1e-10, mode) + assert(svd.s.size === 1, s"should not return zero singular values but got ${svd.s}") assert(svd.U.numRows() === 4) assert(svd.U.numCols() === 1) assert(svd.V.numRows === 3)