diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md index 600f5de154dad..d73731c80307d 100644 --- a/docs/mllib-guide.md +++ b/docs/mllib-guide.md @@ -65,6 +65,7 @@ We list major functionality from both below, with links to detailed guides. * [PMML model export](mllib-pmml-model-export.html) * [Matrix Factorization](mllib-matrix-factorization.html) * [LU Factorization](mllib-matrix-factorization.html#lu-factorization) + * [Solving a Linear System of Equations](mllib-matrix-factorization.html#solving-a-linear-system-of-equations) MLlib is under active development. The APIs marked `Experimental`/`DeveloperApi` may change in future releases, diff --git a/docs/mllib-matrix-factorization.md b/docs/mllib-matrix-factorization.md index de26270d799b8..979ebe280bad2 100644 --- a/docs/mllib-matrix-factorization.md +++ b/docs/mllib-matrix-factorization.md @@ -9,8 +9,9 @@ displayTitle: MLlib - Matrix Factorization #LU Factorization -[`blockLU()`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) returns a [`BlockMatrix`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) -The LU decomposition is a well studied algorithm for decomposing a matrix into a lower diagonal matrix $L$ and an upper diagonal matrix $U$. + +The LU decomposition is a well studied algorithm for decomposing a matrix into a lower diagonal matrix +$L$ and an upper diagonal matrix $U$, given as `\begin{equation} PA = LU\\ @@ -32,10 +33,19 @@ The LU decomposition is a well studied algorithm for decomposing a matrix into a \label{eq:generalLUFactorization} \end{equation}` -and $P$ is a row permutation matrix. This algorithm is a highly stable method for inverting a matrix and solving linear systems of equations that appear in machine learning applications. Larger linear equations of the type $Ax=b$ are usually solved with SGD, BGFS, or other gradient based methods. Being able to solve these equations at scale to numerical precision rather than to convergence should open up possiblities for new algorithms within MLlib as well as other applications. It scales as ~$n^3$, for a [`BlockMatrix`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) with $n \times n$ blocks. +where $P$ is a row permutation matrix. [`blockLU()`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) +returns a Tuple containing the $P,L$ and $U$ [`BlockMatrix`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) +objects. -Once the decomposition is computed, the inverse is straightforward to compute using a forward subsitution method on $L$ and $U$. The inversion method is not yet available in [`BlockMatrix`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix). +This algorithm is a highly stable method for inverting a matrix and solving linear systems of equations that appear in +machine learning applications. Larger linear equations of the type $AX=B$ are usually solved with SGD, BGFS, or other +gradient based methods. Solving these equations at scale to numerical precision should open up possiblities for new algorithms within MLlib, as well as other applications. +It scales as ~$N^3$, for a [`BlockMatrix`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) +with $N \times N$ blocks. +Once the decomposition is computed, many other quantities can be easily derived. The most important one, +[`BlockMatrix.solve()`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix), and is +described in the next section. **Example** @@ -73,7 +83,7 @@ println( "error (sb ~6.7e-16): " + error.toString ) **How it Works** -The LU decomposition of $A$ can be written in 4 block form as: +The LU decomposition of $A$ can be written as four block matrices, given as `\begin{align} PA & = LU\\ \begin{pmatrix} @@ -91,7 +101,7 @@ The LU decomposition of $A$ can be written in 4 block form as: & = \begin{pmatrix} L_{11}U_{11}&L_{11}U_{12} \\ L_{21}U_{11}&L_{21}U_{12}+L_{22}U_{22} - \end{pmatrix} + \end{pmatrix}. \label{eq:basicLUBlockDecomposition} \end{align}` @@ -104,26 +114,36 @@ P_2 A_{21} & = L_{21}U_{11} & \Rightarrow & L_{21} P_2 A_{22} & = L_{21}U_{12}+L_{22}U_{22} & \Rightarrow & (P_{2},L_{22},U_{22}) & = & \text{LU}_{RECURSIVE}(S) \label{eq:A22Solve}\\ \end{align}` -where $A_{11}$ is chosen to be a single block, so that the Breeze library can be called to compute $\eqref{eq:A11Solve}$. The Breeze library will return the $P_{1},L_{11}$ and $U_{11}$ matrices. Equation $\eqref{eq:A22Solve}$ is a recursive call that generates successive calculations of the Schur Complement, given as +where $A_{11}$ is chosen to be a single block, so that the Breeze library can be called to compute +$\eqref{eq:A11Solve}$. The Breeze library will return the $P_{1},L_{11}$ and $U_{11}$ matrices. + + +Equation $\eqref{eq:A22Solve}$ is a recursive call that generates successive calculations of the Schur Complement, +given as `\begin{equation} S = A_{22} - L_{21} U_{12}. \label{eq:SchurComplementInitialForm} \end{equation}` -In this form, there is a dependency on the calculation of $\eqref{eq:A11Solve}$. An equivalent form of the Schur complement can be used by substituting equations $\eqref{eq:U12Solve}$ and $\eqref{eq:L21Solve}$, giving +In this form, there is a dependency on the calculation of $\eqref{eq:A11Solve}$. An equivalent form of the Schur +complement can be used by substituting equations $\eqref{eq:U12Solve}$ and $\eqref{eq:L21Solve}$, giving `\begin{equation} S = A_{22} - A_{21} A_{11}^{-1} A_{12}, \label{eq:SchurComplementFinalForm} \end{equation}` -which allows for the calculation of equation $\eqref{eq:A22Solve}$ with no dependency on equation $\eqref{eq:A11Solve}$, resulting in a slight increase in parallelism at the expense of recomputing the inverse of $A_{11}$ on a separate process. - -The Schur Complement in $\eqref{eq:A22Solve}$ is computed using $\eqref{eq:SchurComplementFinalForm}$ and passed recursively to the next itereration. In this way, $P_2$ is never explicitly calculated, but is built up as a set of $P_1$ matrices. The computation completes when the Schur Complement is a single block. +which allows for the calculation of equation $\eqref{eq:A22Solve}$ with no dependency on equation +$\eqref{eq:A11Solve}$, resulting in a slight increase in parallelism at the expense of recomputing the inverse of +$A_{11}$ on a separate process. - - Equations $\eqref{eq:U12Solve}$ and $\eqref{eq:L21Solve}$ are computed with [`BlockMatrix.multiply()`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) operations. The Schur Complement in $\eqref{eq:A22Solve}$ is computed using $\eqref{eq:SchurComplementFinalForm}$ with a [`BlockMatrix.multiply()`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) opperation and passed recursively to the next itereration. In this way, $P_2$ is never explicitly calculated, but is built up as a set of $P_1$ matrices. The computation completes when the Schur Complement is a single block. +Equations $\eqref{eq:U12Solve}$ and $\eqref{eq:L21Solve}$ are computed with +[`BlockMatrix.multiply()`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) operations. +The Schur Complement in $\eqref{eq:A22Solve}$ is computed using $\eqref{eq:SchurComplementFinalForm}$ with a +[`BlockMatrix.multiply()`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) +operation and passed recursively to the next iteration. In this way, $P_2$ is never explicitly calculated, +but is built up as a set of $P_1$ matrices. The computation completes when the Schur Complement is a single block. Instead of building the solution incrementally, and using more frequent but smaller block matrix operations, we construct large [`BlockMatrix`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) structures, and carry out the multiplication at the end of the calculation. This should leverage the optimizations present in the [`BlockMatrix.multiply()`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) routine more effectively. The matrices formed to carry out the operations are described int the in the figure below. @@ -138,4 +158,91 @@ Instead of building the solution incrementally, and using more frequent but smal The Matrix multiply operations shown in the figure are generalizations of equations $\eqref{eq:A11Solve}$, $\eqref{eq:U12Solve}$, and $\eqref{eq:L21Solve}$. +#Solving a Linear System of Equations +[`BlockMatrix.solve()`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) calls the LU Factorization +method to solve the linear system of equations $AX=B$ for $X$. +The method of forward subsitution solution is used. This method is is described +[`here`](https://en.wikipedia.org/wiki/Triangular_matrix#Forward_and_back_substitution), for elements of a matrix, and +and the generalization to blocks using [`BlockMatrix.solve()`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) +is described briefly here. + +For a Linear system of equations $AX=B$, where $A$ is an $N \times N$ matrix, and $X$ and $B$ are +[`BlockMatrix`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) objects of with $N$ +row blocks and $W$ column blocks. $B$ can have any number of columns, including 1. The number of row and columns +per block must be the same, however. [`BlockMatrix.solve(B)`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) +will return $X$ as a [`BlockMatrix`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) object +with the same dimensions of $B$. + +The forward substitution equations are straightforward do obtain in two steps. The equation $AX=PB$ +can be expressed as $LUX=\widehat{B}$. First, the solution to $LY=\widehat{B}$ for $Y$ is obtained, followed by +solving $UX=Y$ for $X$. + +Expanding $LY=\widehat{B}$ as [`BlockMatrix.multiply()`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) +operations gives + +`\begin{align} +\widehat{B}_m &=& \sum_{i=1}^{m}L_{m,i}Y_m \\ + &=& L_{m,m}Y_m + \sum_{i=1}^{m-1}L_{m,i}Y_m. \\ +\end{align}` + +The solution for the $m$th row of $Y$ (which contains $W$ column elements) in terms of previous rows with the following +recurrence relation + +`\begin{equation} +Y_m = L_{m,m}^{-1}\left [ \widehat{B}_m-\sum_{i=1}^{m}L_{m,i}Y_m\right ].\\ +\end{equation}` + +Once $Y$ is obtained, a similar method is applied to solve $UX=Y$. Here we build the solution from the last row block, +so the indexing is reversed. Letting $\widetilde{m}=N-m+1$ and expanding the multiplication terms gives + + +`\begin{align} +Y_{\widetilde{m}} &=& \sum_{i=\widetilde{m}}^{N}U_{\widetilde{m},i}X_{\widetilde{m}} \\ + &=& U_{\widetilde{m},\widetilde{m}}X_{\widetilde{m}} + \sum_{i=\widetilde{m}+1}^{N}U_{\widetilde{m},i}X_{\widetilde{m}}. \\ +\end{align}` + +$X$ is obtained, starting with the bottom row block as $\widetilde{m}=N$ and incrementing to $\widetilde{m}=1$, using + +`\begin{equation} +X_\widetilde{m} = U_{\widetilde{m},\widetilde{m}}^{-1}\left[ Y_{\widetilde{m}}-\sum_{i=\widetilde{m}+1}^{N}U_{\widetilde{m},i}X_\widetilde{m}\right ].\\ +\end{equation}` +**Example** +{% highlight scala %} +import breeze.linalg.{DenseMatrix => BDM} +import org.apache.spark.mllib.linalg._ +import org.apache.spark.mllib.linalg.distributed.BlockMatrix + + // square matrix, but not fully populated in edge blocks. +val blocksForLU: Seq[((Int, Int), Matrix)] = Seq( + ((0, 0), new DenseMatrix(2, 2, Array(1.0, 2.0, 3.0, 2.0))), + ((0, 1), new DenseMatrix(2, 2, Array(2.0, 1.0, 3.0, 5.0))), + ((1, 0), new DenseMatrix(2, 2, Array(3.0, 2.0, 1.0, 1.0))), + ((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))), + ((0, 2), new DenseMatrix(2, 1, Array(1.0, 1.0))), + ((1, 2), new DenseMatrix(2, 1, Array(1.0, 3.0))), + ((2, 0), new DenseMatrix(1, 2, Array(1.0, 0.0))), + ((2, 1), new DenseMatrix(1, 2, Array(1.0, 2.0))), + ((2, 2), new DenseMatrix(1, 1, Array(4.0)))) + +val A = new BlockMatrix(sc.parallelize(blocksForLU), 2, 2) + +val B = new BlockMatrix(sc.parallelize(Seq( //5x7 B vector (7 columns of 5 row B) + ((0, 0), new DenseMatrix(2, 2, Array( 0, 1, -1, 2))), + ((0, 1), new DenseMatrix(2, 2, Array(-2, -1, -3, -2))), + ((0, 2), new DenseMatrix(2, 2, Array(-4, -3, -5, -4))), + ((0, 3), new DenseMatrix(2, 2, Array(-5, -6, -7, -7))), + ((1, 0), new DenseMatrix(2, 2, Array( 2, 3, 1, 2))), + ((1, 1), new DenseMatrix(2, 2, Array( 3, 1, -1, 4))), + ((1, 2), new DenseMatrix(2, 2, Array(-2, -1, -3, -2))), + ((1, 3), new DenseMatrix(2, 2, Array(2, 1, 3, 2))), + ((2, 0), new DenseMatrix(1, 2, Array( 0, 5))), + ((2, 1), new DenseMatrix(1, 2, Array( 1, 3))), + ((2, 2), new DenseMatrix(1, 2, Array( 0, 0))), + ((2, 3), new DenseMatrix(1, 2, Array( 1, 1))))), 2, 2) + +val X = A.solve(B) +val residual = A.multiply(X).subtract(B) +val error = residual.toLocalMatrix.toArray.reduce(_ + Math.abs(_)) +println("error" + error.toString)//sb 2.25e-14 +{% endhighlight %} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala index 051b40244a4ca..9de2c0bb7453b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala @@ -313,7 +313,7 @@ class BlockMatrix @Since("1.3.0") ( } /** Collects data and assembles a local dense breeze matrix (for test only). */ - private[mllib] def toBreeze(): BDM[Double] = { + private [mllib] def toBreeze(): BDM[Double] = { val localMat = toLocalMatrix() new BDM[Double](localMat.numRows, localMat.numCols, localMat.toArray) } @@ -479,7 +479,8 @@ class BlockMatrix @Since("1.3.0") ( * @since 1.6.0 */ - private[mllib] def subBlock(blockRowRange: (Int, Int), blockColRange: (Int, Int)): BlockMatrix = { + private [mllib] def subBlock(blockRowRange: (Int, Int), blockColRange: (Int, Int)): + BlockMatrix = { // Extracts BlockMatrix elements from a specified range of block indices // Creating a Sub BlockMatrix of rectangular shape. // Also reindexes so that the upper left block is always (0, 0) @@ -502,7 +503,7 @@ class BlockMatrix @Since("1.3.0") ( * @return List[BDM[Double]] of Breeze Matrices (BDM) (P,L,U) for blockLU method. * @since 1.6.0 */ - def singleBlockPLU: List[BDM[Double]] = { + private [mllib] def singleBlockPLU: List[BDM[Double]] = { // returns PA = LU factorization from Breeze val PLU = LU(this.subBlock((0, 0), (0, 0)).toBreeze) val k = PLU._1.cols @@ -521,6 +522,27 @@ class BlockMatrix @Since("1.3.0") ( return List(P, L, U) } + + /** This method reassigns 'absolute' index locations (i,j), to sequences. This is + * designed to reconsitute the orignal block locations that were lost in the + * subBlock method. + * + * @param rowMin The new lowest row value + * @param colMin The new lowest column value + * @return an RDD of Sequences with new block indexing + * @since 1.6.0 + * + */ + private [mllib] def shiftIndices(rowMin: Int, colMin: Int): RDD[((Int, Int), Matrix)] = { + // This routine recovers the absolute indexing of the block matrices for reassembly + val extractedSeq = this.blocks.map{ // shifting indices + case(((x, y), matrix)) => ((x + rowMin, y + colMin), matrix) + } + return extractedSeq + } + + + /** Computes the LU Decomposition of a Square Matrix. For a matrix A of size (n x n) * LU decomposition computes the Lower Triangular Matrix L, the Upper Triangular * Matrix U, along with a Permutation Matrix P, such that PA=LU. The Permutation @@ -528,18 +550,25 @@ class BlockMatrix @Since("1.3.0") ( * solution of L or U. * * The BlockMatrix version takes a BlockMatrix as an input and returns a Tuple - * of 3 BlockMatrix objects: + * of 5 BlockMatrix objects: * P, L, U (in that order), such that P.multiply(A)-L.multiply(U) = 0 + * and Li, Ui, which are the inverse of the block diagonal terms for L and U. + * + * The blockLU method will return only P,L, and U, but blockLUtoSolver will return + * the extra Li and Ui matrices, which will be used by the solve method + * so that it does not need to recompute these values. * * The method follows a procedure similar to the method used in ScaLAPACK, but * places more emphasis on preparing BlockMatrix objects as inputs to large - * BlockMatrix.multiply - * operations. + * BlockMatrix.multiply operations. * - * @return P,L,U as a Tuple of BlockMatrix + * + * @return P,L,U,Li,Ui as a Tuple of BlockMatrix + * @since 1.6.0 */ - def blockLU: (BlockMatrix, BlockMatrix, BlockMatrix) = { + private [mllib] def blockLUtoSolver: + (BlockMatrix, BlockMatrix, BlockMatrix, BlockMatrix, BlockMatrix) = { // builds up the array as a union of RDD sets val nDiagBlocks = this.numColBlocks @@ -549,27 +578,6 @@ class BlockMatrix @Since("1.3.0") ( // accessing the spark context val sc = this.blocks.sparkContext - /** This method reassigns 'absolute' index locations (i,j), to sequences. This is - * designed to reconsitute the orignal block locations that were lost in the - * subBlock method. - * - * @param blockRowRange The lower and upper row ranges, as (Int,Int) - * @param blockColRange The lower and upper col ranges, as (Int, Int) - * @return an RDD of Sequences with new block indexing - * @since 1.6.0 - * - */ - def shiftIndicesOfBlockMatrix(M: BlockMatrix, - blockRowRange: (Int, Int), - blockColRange: (Int, Int)): RDD[((Int, Int), Matrix)] = { - // This routine recovers the absolute indexing of the block matrices for reassembly - val rowMin = blockRowRange._1; val colMin = blockColRange._1; - val extractedSeq = M.blocks.map{ // shifting indices - case(((x, y), matrix)) => ((x + rowMin, y + colMin), matrix) - } - return extractedSeq - } - /** Recursive Sequence Build is a nested recursion method that builds up all of the * sequences that are converted to BlockMatrix classes for large matrix * multiplication operations. The Schur Complement is calculated at each @@ -637,10 +645,11 @@ class BlockMatrix @Since("1.3.0") ( val nextLi = sc.parallelize(Seq(((rowI, rowI), Li))) val nextUi = sc.parallelize(Seq(((rowI, rowI), Ui))) - val nextLD = shiftIndicesOfBlockMatrix(ABlock.subBlock(botRangeRel, topRangeRel), - botRangeAbs, topRangeAbs) - val nextUD = shiftIndicesOfBlockMatrix(ABlock.subBlock(topRangeRel, botRangeRel), - topRangeAbs, botRangeAbs) + val nextLD = ABlock.subBlock(botRangeRel, topRangeRel). + shiftIndices(botRangeAbs._1, topRangeAbs._1) + val nextUD = ABlock.subBlock(topRangeRel, botRangeRel). + shiftIndices(topRangeAbs._1, botRangeAbs._1) + val nextTuple = (nextP ++ prevP, nextL ++ prevL, nextU ++ prevU, nextLi ++ prevLi, nextUi ++ prevUi, prevLD ++ nextLD, prevUD ++ nextUD, SBlock) @@ -669,10 +678,12 @@ class BlockMatrix @Since("1.3.0") ( val Z = Matrices.dense(this.rowsPerBlock, this.colsPerBlock, ZB.toArray) val firstZ = sc.parallelize(Seq(((0, 0), Z))) val topRange = (0, 0); val botRange = (1, this.numRowBlocks-1) - val nextLD = shiftIndicesOfBlockMatrix(this.subBlock(botRange, topRange), - botRange, topRange) - val nextUD = shiftIndicesOfBlockMatrix(this.subBlock(topRange, botRange), - topRange, botRange) + + val nextLD = this.subBlock(botRange, topRange). + shiftIndices(botRange._1, topRange._1) + val nextUD = this.subBlock(topRange, botRange). + shiftIndices(topRange._1, botRange._1) + val nextTuple = (nextP, nextL, nextU, nextLi, nextUi, firstZ ++ nextLD, firstZ ++ nextUD, this.SchurComplement) @@ -704,7 +715,132 @@ class BlockMatrix @Since("1.3.0") ( // U = ( d[Linv] * dP * UD + dU ) val UFin = dLi.multiply(dP.multiply(UD)).add(dU) // val UFin = dLi.multiply(UD).add(dU) - return (PFin, LFin, UFin) + return (PFin, LFin, UFin, dLi, dUi) } -} + +/** Returns the LU Decomposition of a Square Matrix. For a matrix A of size (n x n) + * LU decomposition computes the Lower Triangular Matrix L, the Upper Triangular + * Matrix U, along with a Permutation Matrix P, such that PA=LU. The Permutation + * Matrix addresses cases where zero entries prevent forward substitution + * solution of L or U. The main method, blockLUtoSolver, returns more values that are used + * by the solver, and this method returns only P, L, and U. + * + * @return P,L,U as a Tuple of BlockMatrix + * @since 1.6.0 +*/ + + def blockLU: (BlockMatrix, BlockMatrix, BlockMatrix) = { + val PLU = this.blockLUtoSolver + val P = PLU._1 + val L = PLU._2 + val U = PLU._3 + return (P, L, U) + } + +/** For the matrix Equation AX=B, where A is NxN blocks, and X, B are matrices of + * dimension NxW blocks, A.solve(B) returns X. B can be a single column vector of length N + * (W=1), or a matrix of W column vectors. In all cases, B must be a BlockMatrix with + * the same block size and number of row blocks. The width can vary according to the + * number of columns in B. + * + * @return X as a BlockMatrix + * @since 1.6.0 + */ + def solve(B: BlockMatrix): BlockMatrix = { + val solutionPLU = this.blockLUtoSolver + + val P = solutionPLU._1 + val L = solutionPLU._2 + val U = solutionPLU._3 + val Li = solutionPLU._4 + val Ui = solutionPLU._5 + val pB = P.multiply(B) + val N = this.numRowBlocks + val W = B.numColBlocks + val sc = this.blocks.sparkContext + + // last diagonal block is not always fully populated + val numRowsLast = this.subBlock((N - 1, N - 1), (N - 1, N - 1)).numRows.toInt + val numColsLast = this.subBlock((N - 1, N - 1), (N - 1, N - 1)).numCols.toInt + // solving AX = b; + // 1) solve LY = PB for y + // 2) solve UX = Y for x + + // Solving LY = PB for Y using (see docs): + def recursiveYBuild(m: Int, prevY: BlockMatrix): BlockMatrix = { + // note that only m and prevY are passed, while the remaining + // variables are used from the scope of BlockMatrix.solve() + + // the diagonal block inv(L) is *mostly* computed in the blockLU routine + // but the last diagonal block is not used for constructing the inverse. + // It is computed here. + val invL = { + if (m == N - 1){ + new BlockMatrix( + sc.parallelize(Seq(((0, 0), Matrices.dense(numRowsLast, numColsLast, + inv(L.subBlock((m, m), (m, m)).toBreeze).toArray)))), + this.rowsPerBlock, this.colsPerBlock) + } + else { Li.subBlock((m, m), (m, m)) } + } + + val nextY = new BlockMatrix( + invL.multiply( pB.subBlock((m, m), (0, W - 1)).subtract( + L.subBlock((m, m), (0, m-1)).multiply(prevY))).shiftIndices(m, 0), + this.rowsPerBlock, this.colsPerBlock) + + val currentY = new BlockMatrix(prevY.blocks ++ nextY.blocks, + this.rowsPerBlock, this.colsPerBlock) + + if (m == N - 1){return currentY} // terminal case + else { return recursiveYBuild(m + 1, currentY)} // recursive case + } + + // Solving LY = PB for Y using (see docs): + val firstY = Li.subBlock((0, 0), (0, 0)). + multiply(pB.subBlock((0, 0), (0, W-1))) + + val Y = recursiveYBuild(1, firstY ) + + // defining X recursion build for second solver + def recursiveXBuild(mRev: Int, prevX: BlockMatrix): BlockMatrix = { + // very similar to recursiveYBuild, but builds from the bottom up, + // which requires extra bookkeepping. + + val nextX = new BlockMatrix( + Ui.subBlock((mRev, mRev), (mRev, mRev)).multiply( + Y.subBlock((mRev, mRev), (0, W - 1)).subtract( + U.subBlock((mRev, mRev), (mRev + 1, N - 1 )).multiply( + prevX.subBlock((mRev + 1, N - 1), (0, W - 1)) + ))).shiftIndices(mRev, 0), + this.rowsPerBlock, this.colsPerBlock) + + val currentX = new BlockMatrix(prevX.blocks ++ nextX.blocks, + this.rowsPerBlock, this.colsPerBlock) + + + if (mRev == 0 ){return currentX} // terminal case + else { return recursiveXBuild(mRev - 1, currentX)} // recursive case + } + + // Solving UX = Y for X + val mRev = N - 1 + + // last diagonal block of inv(U) is not stored...and needs to be computed + val invU = new BlockMatrix( sc.parallelize( Seq(((mRev, mRev), + Matrices.dense( numRowsLast, numColsLast, + inv(U.subBlock((mRev, mRev), (mRev, mRev)).toBreeze).toArray)))), + this.rowsPerBlock, this.colsPerBlock) + + val firstX = new BlockMatrix( + invU.subBlock((mRev, mRev), (mRev, mRev)). + multiply(Y.subBlock((mRev, mRev), (0, W - 1))). + shiftIndices(mRev, 0), + this.rowsPerBlock, this.colsPerBlock) + + val X = recursiveXBuild(mRev-1, firstX) + return X + + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala index b8486ca13cc64..f52cbf23366f0 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrixSuite.scala @@ -318,7 +318,7 @@ class BlockMatrixSuite extends SparkFunSuite with MLlibTestSparkContext { val error = residual.toLocalMatrix.toArray.reduce(_ + Math.abs(_)) assert(error < 6.8e-16) // should be ~ 2.22e-16 - // testing matrices that have nontrivial permutation matrices + // testing matrices that have nontrivial permutations in solution val blocksForSecondLU: Seq[((Int, Int), Matrix)] = Seq( ((0, 0), new DenseMatrix(2, 2, Array( 0, 1, -1, 2))), ((0, 1), new DenseMatrix(2, 2, Array(-2, -1, -3, -2))), @@ -339,4 +339,53 @@ class BlockMatrixSuite extends SparkFunSuite with MLlibTestSparkContext { val error2 = residual2.toLocalMatrix.toArray.reduce(_ + Math.abs(_)) assert(error2 < 2.6e-14) // should be ~ 2.49e-14 } + + test("BlockMatrix Solver") { + // square matrix, but not fully populated in edge blocks. + val blocksForLU: Seq[((Int, Int), Matrix)] = Seq( + ((0, 0), new DenseMatrix(2, 2, Array(1.0, 2.0, 3.0, 2.0))), + ((0, 1), new DenseMatrix(2, 2, Array(2.0, 1.0, 3.0, 5.0))), + ((1, 0), new DenseMatrix(2, 2, Array(3.0, 2.0, 1.0, 1.0))), + ((1, 1), new DenseMatrix(2, 2, Array(1.0, 2.0, 0.0, 1.0))), + ((0, 2), new DenseMatrix(2, 1, Array(1.0, 1.0))), + ((1, 2), new DenseMatrix(2, 1, Array(1.0, 3.0))), + ((2, 0), new DenseMatrix(1, 2, Array(1.0, 0.0))), + ((2, 1), new DenseMatrix(1, 2, Array(1.0, 2.0))), + ((2, 2), new DenseMatrix(1, 1, Array(4.0)))) + + val A = new BlockMatrix(sc.parallelize(blocksForLU), 2, 2) + + // B is a single column vector, but still defined as a BlockMatrix + val singleColumnB = new BlockMatrix(sc.parallelize(Seq( + ((0, 0), new DenseMatrix(2, 1, Array(1.0, 2.0))), + ((1, 0), new DenseMatrix(2, 1, Array(3.0, 4.0))), + ((2, 0), new DenseMatrix(1, 1, Array(5.0))))), 2, 2) + + + val X1 = A.solve(singleColumnB) + val residual1 = A.multiply(X1).subtract(singleColumnB) + val error1 = residual1.toLocalMatrix.toArray.reduce(_ + Math.abs(_)) + assert(error1 < 9.0e-16) // should be ~ 8.88e-16 + + // B is a 5x7 matrix, or (7 columns of B vectors of length 5) + // Still defined as a BlockMatrix + val manyColumnB = new BlockMatrix(sc.parallelize(Seq( + ((0, 0), new DenseMatrix(2, 2, Array( 0, 1, -1, 2))), + ((0, 1), new DenseMatrix(2, 2, Array(-2, -1, -3, -2))), + ((0, 2), new DenseMatrix(2, 2, Array(-4, -3, -5, -4))), + ((0, 3), new DenseMatrix(2, 2, Array(-5, -6, -7, -7))), + ((1, 0), new DenseMatrix(2, 2, Array(2, 3, 1, 2))), + ((1, 1), new DenseMatrix(2, 2, Array(3, 1, -1, 4))), + ((1, 2), new DenseMatrix(2, 2, Array(-2, -1, -3, -2))), + ((1, 3), new DenseMatrix(2, 2, Array(2, 1, 3, 2))), + ((2, 0), new DenseMatrix(1, 2, Array(0, 5))), + ((2, 1), new DenseMatrix(1, 2, Array(1, 3))), + ((2, 2), new DenseMatrix(1, 2, Array(0, 0))), + ((2, 3), new DenseMatrix(1, 2, Array(1, 1))))), 2, 2) + + val X2 = A.solve(manyColumnB) + val residual2 = A.multiply(X2).subtract(manyColumnB) + val error2 = residual2.toLocalMatrix.toArray.reduce(_ + Math.abs(_)) + assert(error2<2.5e-14) // sb ~ 2.25e-14 + } }