diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala new file mode 100644 index 0000000000000..3515461b52493 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.linalg + +import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV} +import com.github.fommil.netlib.ARPACK +import org.netlib.util.{intW, doubleW} + +import org.apache.spark.annotation.Experimental + +/** + * :: Experimental :: + * Compute eigen-decomposition. + */ +@Experimental +private[mllib] object EigenValueDecomposition { + /** + * Compute the leading k eigenvalues and eigenvectors on a symmetric square matrix using ARPACK. + * The caller needs to ensure that the input matrix is real symmetric. This function requires + * memory for `n*(4*k+4)` doubles. + * + * @param mul a function that multiplies the symmetric matrix with a DenseVector. + * @param n dimension of the square matrix (maximum Int.MaxValue). + * @param k number of leading eigenvalues required, 0 < k < n. + * @param tol tolerance of the eigs computation. + * @param maxIterations the maximum number of Arnoldi update iterations. + * @return a dense vector of eigenvalues in descending order and a dense matrix of eigenvectors + * (columns of the matrix). + * @note The number of computed eigenvalues might be smaller than k when some Ritz values do not + * satisfy the convergence criterion specified by tol (see ARPACK Users Guide, Chapter 4.6 + * for more details). The maximum number of Arnoldi update iterations is set to 300 in this + * function. + */ + private[mllib] def symmetricEigs( + mul: BDV[Double] => BDV[Double], + n: Int, + k: Int, + tol: Double, + maxIterations: Int): (BDV[Double], BDM[Double]) = { + // TODO: remove this function and use eigs in breeze when switching breeze version + require(n > k, s"Number of required eigenvalues $k must be smaller than matrix dimension $n") + + val arpack = ARPACK.getInstance() + + // tolerance used in stopping criterion + val tolW = new doubleW(tol) + // number of desired eigenvalues, 0 < nev < n + val nev = new intW(k) + // nev Lanczos vectors are generated in the first iteration + // ncv-nev Lanczos vectors are generated in each subsequent iteration + // ncv must be smaller than n + val ncv = math.min(2 * k, n) + + // "I" for standard eigenvalue problem, "G" for generalized eigenvalue problem + val bmat = "I" + // "LM" : compute the NEV largest (in magnitude) eigenvalues + val which = "LM" + + var iparam = new Array[Int](11) + // use exact shift in each iteration + iparam(0) = 1 + // maximum number of Arnoldi update iterations, or the actual number of iterations on output + iparam(2) = maxIterations + // Mode 1: A*x = lambda*x, A symmetric + iparam(6) = 1 + + var ido = new intW(0) + var info = new intW(0) + var resid = new Array[Double](n) + var v = new Array[Double](n * ncv) + var workd = new Array[Double](n * 3) + var workl = new Array[Double](ncv * (ncv + 8)) + var ipntr = new Array[Int](11) + + // call ARPACK's reverse communication, first iteration with ido = 0 + arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr, workd, + workl, workl.length, info) + + val w = BDV(workd) + + // ido = 99 : done flag in reverse communication + while (ido.`val` != 99) { + if (ido.`val` != -1 && ido.`val` != 1) { + throw new IllegalStateException("ARPACK returns ido = " + ido.`val` + + " This flag is not compatible with Mode 1: A*x = lambda*x, A symmetric.") + } + // multiply working vector with the matrix + val inputOffset = ipntr(0) - 1 + val outputOffset = ipntr(1) - 1 + val x = w.slice(inputOffset, inputOffset + n) + val y = w.slice(outputOffset, outputOffset + n) + y := mul(x) + // call ARPACK's reverse communication + arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr, + workd, workl, workl.length, info) + } + + if (info.`val` != 0) { + info.`val` match { + case 1 => throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val` + + " Maximum number of iterations taken. (Refer ARPACK user guide for details)") + case 2 => throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val` + + " No shifts could be applied. Try to increase NCV. " + + "(Refer ARPACK user guide for details)") + case _ => throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val` + + " Please refer ARPACK user guide for error message.") + } + } + + val d = new Array[Double](nev.`val`) + val select = new Array[Boolean](ncv) + // copy the Ritz vectors + val z = java.util.Arrays.copyOfRange(v, 0, nev.`val` * n) + + // call ARPACK's post-processing for eigenvectors + arpack.dseupd(true, "A", select, d, z, n, 0.0, bmat, n, which, nev, tol, resid, ncv, v, n, + iparam, ipntr, workd, workl, workl.length, info) + + // number of computed eigenvalues, might be smaller than k + val computed = iparam(4) + + val eigenPairs = java.util.Arrays.copyOfRange(d, 0, computed).zipWithIndex.map { r => + (r._1, java.util.Arrays.copyOfRange(z, r._2 * n, r._2 * n + n)) + } + + // sort the eigen-pairs in descending order + val sortedEigenPairs = eigenPairs.sortBy(- _._1) + + // copy eigenvectors in descending order of eigenvalues + val sortedU = BDM.zeros[Double](n, computed) + sortedEigenPairs.zipWithIndex.foreach { r => + val b = r._2 * n + var i = 0 + while (i < n) { + sortedU.data(b + i) = r._1._2(i) + i += 1 + } + } + + (BDV[Double](sortedEigenPairs.map(_._1)), sortedU) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 695e03b736baf..99cb6516e065c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -17,9 +17,10 @@ package org.apache.spark.mllib.linalg.distributed -import java.util +import java.util.Arrays -import breeze.linalg.{Vector => BV, DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd} +import breeze.linalg.{Vector => BV, DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV} +import breeze.linalg.{svd => brzSvd, axpy => brzAxpy} import breeze.numerics.{sqrt => brzSqrt} import com.github.fommil.netlib.BLAS.{getInstance => blas} @@ -34,7 +35,7 @@ import org.apache.spark.mllib.stat.MultivariateStatisticalSummary * [[org.apache.spark.mllib.stat.MultivariateStatisticalSummary]] * together with add() and merge() function. * A numerically stable algorithm is implemented to compute sample mean and variance: - *[[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance variance-wiki]]. + * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance variance-wiki]]. * Zero elements (including explicit zero values) are skipped when calling add() and merge(), * to have time complexity O(nnz) instead of O(n) for each column. */ @@ -200,6 +201,26 @@ class RowMatrix( nRows } + /** + * Multiplies the Gramian matrix `A^T A` by a dense vector on the right without computing `A^T A`. + * + * @param v a dense vector whose length must match the number of columns of this matrix + * @return a dense vector representing the product + */ + private[mllib] def multiplyGramianMatrixBy(v: BDV[Double]): BDV[Double] = { + val n = numCols().toInt + val vbr = rows.context.broadcast(v) + rows.aggregate(BDV.zeros[Double](n))( + seqOp = (U, r) => { + val rBrz = r.toBreeze + val a = rBrz.dot(vbr.value) + brzAxpy(a, rBrz, U.asInstanceOf[BV[Double]]) + U + }, + combOp = (U1, U2) => U1 += U2 + ) + } + /** * Computes the Gramian matrix `A^T A`. */ @@ -220,50 +241,135 @@ class RowMatrix( } /** - * Computes the singular value decomposition of this matrix. - * Denote this matrix by A (m x n), this will compute matrices U, S, V such that A = U * S * V'. + * Computes singular value decomposition of this matrix. Denote this matrix by A (m x n). This + * will compute matrices U, S, V such that A ~= U * S * V', where S contains the leading k + * singular values, U and V contain the corresponding singular vectors. * - * There is no restriction on m, but we require `n^2` doubles to fit in memory. - * Further, n should be less than m. - - * The decomposition is computed by first computing A'A = V S^2 V', - * computing svd locally on that (since n x n is small), from which we recover S and V. - * Then we compute U via easy matrix multiplication as U = A * (V * S^-1). - * Note that this approach requires `O(n^3)` time on the master node. + * At most k largest non-zero singular values and associated vectors are returned. If there are k + * such values, then the dimensions of the return will be: + * - U is a RowMatrix of size m x k that satisfies U' * U = eye(k), + * - s is a Vector of size k, holding the singular values in descending order, + * - V is a Matrix of size n x k that satisfies V' * V = eye(k). + * + * We assume n is smaller than m. The singular values and the right singular vectors are derived + * from the eigenvalues and the eigenvectors of the Gramian matrix A' * A. U, the matrix + * storing the right singular vectors, is computed via matrix multiplication as + * U = A * (V * S^-1^), if requested by user. The actual method to use is determined + * automatically based on the cost: + * - If n is small (n < 100) or k is large compared with n (k > n / 2), we compute the Gramian + * matrix first and then compute its top eigenvalues and eigenvectors locally on the driver. + * This requires a single pass with O(n^2^) storage on each executor and on the driver, and + * O(n^2^ k) time on the driver. + * - Otherwise, we compute (A' * A) * v in a distributive way and send it to ARPACK's DSAUPD to + * compute (A' * A)'s top eigenvalues and eigenvectors on the driver node. This requires O(k) + * passes, O(n) storage on each executor, and O(n k) storage on the driver. * - * At most k largest non-zero singular values and associated vectors are returned. - * If there are k such values, then the dimensions of the return will be: + * Several internal parameters are set to default values. The reciprocal condition number rCond + * is set to 1e-9. All singular values smaller than rCond * sigma(0) are treated as zeros, where + * sigma(0) is the largest singular value. The maximum number of Arnoldi update iterations for + * ARPACK is set to 300 or k * 3, whichever is larger. The numerical tolerance for ARPACK's + * eigen-decomposition is set to 1e-10. * - * U is a RowMatrix of size m x k that satisfies U'U = eye(k), - * s is a Vector of size k, holding the singular values in descending order, - * and V is a Matrix of size n x k that satisfies V'V = eye(k). + * @note The conditions that decide which method to use internally and the default parameters are + * subject to change. * - * @param k number of singular values to keep. We might return less than k if there are - * numerically zero singular values. See rCond. + * @param k number of leading singular values to keep (0 < k <= n). It might return less than k if + * there are numerically zero singular values or there are not enough Ritz values + * converged before the maximum number of Arnoldi update iterations is reached (in case + * that matrix A is ill-conditioned). * @param computeU whether to compute U * @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0) * are treated as zero, where sigma(0) is the largest singular value. - * @return SingularValueDecomposition(U, s, V) + * @return SingularValueDecomposition(U, s, V). U = null if computeU = false. */ def computeSVD( k: Int, computeU: Boolean = false, rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = { + // maximum number of Arnoldi update iterations for invoking ARPACK + val maxIter = math.max(300, k * 3) + // numerical tolerance for invoking ARPACK + val tol = 1e-10 + computeSVD(k, computeU, rCond, maxIter, tol, "auto") + } + + /** + * The actual SVD implementation, visible for testing. + * + * @param k number of leading singular values to keep (0 < k <= n) + * @param computeU whether to compute U + * @param rCond the reciprocal condition number + * @param maxIter max number of iterations (if ARPACK is used) + * @param tol termination tolerance (if ARPACK is used) + * @param mode computation mode (auto: determine automatically which mode to use, + * local-svd: compute gram matrix and computes its full SVD locally, + * local-eigs: compute gram matrix and computes its top eigenvalues locally, + * dist-eigs: compute the top eigenvalues of the gram matrix distributively) + * @return SingularValueDecomposition(U, s, V). U = null if computeU = false. + */ + private[mllib] def computeSVD( + k: Int, + computeU: Boolean, + rCond: Double, + maxIter: Int, + tol: Double, + mode: String): SingularValueDecomposition[RowMatrix, Matrix] = { val n = numCols().toInt - require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.") + require(k > 0 && k <= n, s"Request up to n singular values but got k=$k and n=$n.") - val G = computeGramianMatrix() + object SVDMode extends Enumeration { + val LocalARPACK, LocalLAPACK, DistARPACK = Value + } + + val computeMode = mode match { + case "auto" => + // TODO: The conditions below are not fully tested. + if (n < 100 || k > n / 2) { + // If n is small or k is large compared with n, we better compute the Gramian matrix first + // and then compute its eigenvalues locally, instead of making multiple passes. + if (k < n / 3) { + SVDMode.LocalARPACK + } else { + SVDMode.LocalLAPACK + } + } else { + // If k is small compared with n, we use ARPACK with distributed multiplication. + SVDMode.DistARPACK + } + case "local-svd" => SVDMode.LocalLAPACK + case "local-eigs" => SVDMode.LocalARPACK + case "dist-eigs" => SVDMode.DistARPACK + case _ => throw new IllegalArgumentException(s"Do not support mode $mode.") + } + + // Compute the eigen-decomposition of A' * A. + val (sigmaSquares: BDV[Double], u: BDM[Double]) = computeMode match { + case SVDMode.LocalARPACK => + require(k < n, s"k must be smaller than n in local-eigs mode but got k=$k and n=$n.") + val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]] + EigenValueDecomposition.symmetricEigs(v => G * v, n, k, tol, maxIter) + case SVDMode.LocalLAPACK => + val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]] + val (uFull: BDM[Double], sigmaSquaresFull: BDV[Double], _) = brzSvd(G) + (sigmaSquaresFull, uFull) + case SVDMode.DistARPACK => + require(k < n, s"k must be smaller than n in dist-eigs mode but got k=$k and n=$n.") + EigenValueDecomposition.symmetricEigs(multiplyGramianMatrixBy, n, k, tol, maxIter) + } - // TODO: Use sparse SVD instead. - val (u: BDM[Double], sigmaSquares: BDV[Double], v: BDM[Double]) = - brzSvd(G.toBreeze.asInstanceOf[BDM[Double]]) val sigmas: BDV[Double] = brzSqrt(sigmaSquares) - // Determine effective rank. + // Determine the effective rank. val sigma0 = sigmas(0) val threshold = rCond * sigma0 var i = 0 - while (i < k && sigmas(i) >= threshold) { + // sigmas might have a length smaller than k, if some Ritz values do not satisfy the convergence + // criterion specified by tol after max number of iterations. + // Thus use i < min(k, sigmas.length) instead of i < k. + if (sigmas.length < k) { + logWarning(s"Requested $k singular values but only found ${sigmas.length} converged.") + } + while (i < math.min(k, sigmas.length) && sigmas(i) >= threshold) { i += 1 } val sk = i @@ -272,12 +378,12 @@ class RowMatrix( logWarning(s"Requested $k singular values but only found $sk nonzeros.") } - val s = Vectors.dense(util.Arrays.copyOfRange(sigmas.data, 0, sk)) - val V = Matrices.dense(n, sk, util.Arrays.copyOfRange(u.data, 0, n * sk)) + val s = Vectors.dense(Arrays.copyOfRange(sigmas.data, 0, sk)) + val V = Matrices.dense(n, sk, Arrays.copyOfRange(u.data, 0, n * sk)) if (computeU) { // N = Vk * Sk^{-1} - val N = new BDM[Double](n, sk, util.Arrays.copyOfRange(u.data, 0, n * sk)) + val N = new BDM[Double](n, sk, Arrays.copyOfRange(u.data, 0, n * sk)) var i = 0 var j = 0 while (j < sk) { @@ -364,7 +470,7 @@ class RowMatrix( if (k == n) { Matrices.dense(n, k, u.data) } else { - Matrices.dense(n, k, util.Arrays.copyOfRange(u.data, 0, n * k)) + Matrices.dense(n, k, Arrays.copyOfRange(u.data, 0, n * k)) } } @@ -390,15 +496,24 @@ class RowMatrix( */ def multiply(B: Matrix): RowMatrix = { val n = numCols().toInt + val k = B.numCols require(n == B.numRows, s"Dimension mismatch: $n vs ${B.numRows}") require(B.isInstanceOf[DenseMatrix], s"Only support dense matrix at this time but found ${B.getClass.getName}.") - val Bb = rows.context.broadcast(B) + val Bb = rows.context.broadcast(B.toBreeze.asInstanceOf[BDM[Double]].toDenseVector.toArray) val AB = rows.mapPartitions({ iter => - val Bi = Bb.value.toBreeze.asInstanceOf[BDM[Double]] - iter.map(v => Vectors.fromBreeze(Bi.t * v.toBreeze)) + val Bi = Bb.value + iter.map(row => { + val v = BDV.zeros[Double](k) + var i = 0 + while (i < k) { + v(i) = row.toBreeze.dot(new BDV(Bi, i * n, 1, n)) + i += 1 + } + Vectors.fromBreeze(v) + }) }, preservesPartitioning = true) new RowMatrix(AB, nRows, B.numCols) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index c9f9acf4c1335..a961f89456a18 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -96,37 +96,44 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { test("svd of a full-rank matrix") { for (mat <- Seq(denseMat, sparseMat)) { - val localMat = mat.toBreeze() - val (localU, localSigma, localVt) = brzSvd(localMat) - val localV: BDM[Double] = localVt.t.toDenseMatrix - for (k <- 1 to n) { - val svd = mat.computeSVD(k, computeU = true) - val U = svd.U - val s = svd.s - val V = svd.V - assert(U.numRows() === m) - assert(U.numCols() === k) - assert(s.size === k) - assert(V.numRows === n) - assert(V.numCols === k) - assertColumnEqualUpToSign(U.toBreeze(), localU, k) - assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k) - assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k))) + for (mode <- Seq("auto", "local-svd", "local-eigs", "dist-eigs")) { + val localMat = mat.toBreeze() + val (localU, localSigma, localVt) = brzSvd(localMat) + val localV: BDM[Double] = localVt.t.toDenseMatrix + for (k <- 1 to n) { + val skip = (mode == "local-eigs" || mode == "dist-eigs") && k == n + if (!skip) { + val svd = mat.computeSVD(k, computeU = true, 1e-9, 300, 1e-10, mode) + val U = svd.U + val s = svd.s + val V = svd.V + assert(U.numRows() === m) + assert(U.numCols() === k) + assert(s.size === k) + assert(V.numRows === n) + assert(V.numCols === k) + assertColumnEqualUpToSign(U.toBreeze(), localU, k) + assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k) + assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k))) + } + } + val svdWithoutU = mat.computeSVD(1, computeU = false, 1e-9, 300, 1e-10, mode) + assert(svdWithoutU.U === null) } - val svdWithoutU = mat.computeSVD(n) - assert(svdWithoutU.U === null) } } test("svd of a low-rank matrix") { - val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0)), 2) - val mat = new RowMatrix(rows, 4, 2) - val svd = mat.computeSVD(2, computeU = true) - assert(svd.s.size === 1, "should not return zero singular values") - assert(svd.U.numRows() === 4) - assert(svd.U.numCols() === 1) - assert(svd.V.numRows === 2) - assert(svd.V.numCols === 1) + val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0, 1.0)), 2) + val mat = new RowMatrix(rows, 4, 3) + for (mode <- Seq("auto", "local-svd", "local-eigs", "dist-eigs")) { + val svd = mat.computeSVD(2, computeU = true, 1e-6, 300, 1e-10, mode) + assert(svd.s.size === 1, s"should not return zero singular values but got ${svd.s}") + assert(svd.U.numRows() === 4) + assert(svd.U.numCols() === 1) + assert(svd.V.numRows === 3) + assert(svd.V.numCols === 1) + } } def closeToZero(G: BDM[Double]): Boolean = {