
[SPARK-8514] LU factorization on BlockMatrix #8563

Closed
wants to merge 22 commits into from
Changes from all commits
Commits
22 commits
9214a1c
SPARK-8514 First Commit
nilmeier Aug 27, 2015
ed3bcd4
Fixed Permutation Matrix issue.
nilmeier Aug 31, 2015
6ba0614
Added documentation. Also made some methods private and removed unus…
nilmeier Sep 1, 2015
2953f19
removed extraneous doc
nilmeier Sep 1, 2015
49a3cdb
updating from apache
nilmeier Sep 2, 2015
2f62cca
Ran ./dev/run-tests and fixed up style. Also modified docs to have …
nilmeier Sep 2, 2015
27c8dcb
updating version
nilmeier Sep 24, 2015
2651b83
added solve method. Refactored blockLU to return inverses to new sol…
nilmeier Sep 24, 2015
625ae03
merging to current version
nilmeier Oct 2, 2015
0be0897
Updated after Fred's Review. More comments in pull request.
nilmeier Oct 5, 2015
de594b1
Updating with Fred's review recommendations. Comments and replies ar…
nilmeier Oct 5, 2015
e6e5c86
updating version
nilmeier Oct 5, 2015
28b61b8
updating versions
nilmeier Oct 11, 2015
032805d
addressing @dbtsai review comments
nilmeier Oct 11, 2015
2984c64
fixing comments for scaladoc (per @dbtsai review)
nilmeier Oct 11, 2015
9c94e43
correcting comment syntax
nilmeier Oct 11, 2015
cb52562
correcting comment syntax (2nd update)
nilmeier Oct 11, 2015
be23343
correcting comment syntax (3rd update)
nilmeier Oct 11, 2015
3038bfd
adding base and extended class for BlockMatrix objects to improve rea…
nilmeier Oct 11, 2015
0909197
made new classes private [mllib]
nilmeier Oct 11, 2015
66cc11d
updating to current branch and addressing @yu-iskw comments
nilmeier Nov 20, 2015
ac0f3cd
applying stashed changes
nilmeier Nov 20, 2015
Binary file added docs/img/lu-factorization.png
16 changes: 13 additions & 3 deletions docs/mllib-data-types.md
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ import org.apache.spark.mllib.linalg.{Matrix, Matrices}
dm2 = Matrices.dense(3, 2, [1, 2, 3, 4, 5, 6])

// Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])
sm = Matrices.sparse(3, 2, [0, 1, 3], [0, 2, 1], [9, 6, 8])
{% endhighlight %}
</div>

Expand Down Expand Up @@ -660,7 +660,7 @@ blockMat = mat.toBlockMatrix()
A `BlockMatrix` is a distributed matrix backed by an RDD of `MatrixBlock`s, where a `MatrixBlock` is
a tuple of `((Int, Int), Matrix)`, where the `(Int, Int)` is the index of the block, and `Matrix` is
the sub-matrix at the given index with size `rowsPerBlock` x `colsPerBlock`.
`BlockMatrix` supports methods such as `add` and `multiply` with another `BlockMatrix`.
`BlockMatrix` supports methods such as `add`, `subtract`, and `multiply` with another `BlockMatrix`.
`BlockMatrix` also has a helper function `validate` which can be used to check whether the
`BlockMatrix` is set up properly.

Expand All @@ -669,8 +669,10 @@ the sub-matrix at the given index with size `rowsPerBlock` x `colsPerBlock`.

A [`BlockMatrix`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) can be
most easily created from an `IndexedRowMatrix` or `CoordinateMatrix` by calling `toBlockMatrix`.
`toBlockMatrix` creates blocks of size 1024 x 1024 by default.
The `toBlockMatrix` method creates blocks of size 1024 x 1024 by default.
Users may change the block size by supplying the values through `toBlockMatrix(rowsPerBlock, colsPerBlock)`.
Basic operations include `add`, `subtract`, and `multiply`. See [Matrix Factorization](mllib-matrix-factorization.html) for
additional methods.

Refer to the [`BlockMatrix` Scala docs](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) for details on the API.

Expand All @@ -689,7 +691,15 @@ matA.validate()

// Calculate A^T A.
val ata = matA.transpose.multiply(matA)

// Calculate A + A
val aPlusA = matA.add(matA)

// Calculate (A + A) - A (should return A)
val sameA = aPlusA.subtract(matA)

{% endhighlight %}

</div>

<div data-lang="java" markdown="1">
Expand Down
9 changes: 9 additions & 0 deletions docs/mllib-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ We list major functionality from both below, with links to detailed guides.
* [Optimization (developer)](mllib-optimization.html)
* [stochastic gradient descent](mllib-optimization.html#stochastic-gradient-descent-sgd)
* [limited-memory BFGS (L-BFGS)](mllib-optimization.html#limited-memory-bfgs-l-bfgs)
* [PMML model export](mllib-pmml-model-export.html)
* [Matrix Factorization](mllib-matrix-factorization.html)
* [LU Factorization](mllib-matrix-factorization.html#lu-factorization)
* [Solving a Linear System of Equations](mllib-matrix-factorization.html#solving-a-linear-system-of-equations)

MLlib is under active development.
The APIs marked `Experimental`/`DeveloperApi` may change in future releases,
and the migration guide below will explain all changes between releases.

# spark.ml: high-level APIs for ML pipelines

Expand Down
215 changes: 215 additions & 0 deletions docs/mllib-matrix-factorization.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
---
layout: global
title: Matrix Factorization - MLlib
displayTitle: <a href="mllib-guide.html">MLlib</a> - Matrix Factorization
---

* Table of contents
{:toc}

# LU Factorization


The LU decomposition is a well-studied algorithm for decomposing a matrix into a lower triangular matrix
$L$ and an upper triangular matrix $U$, given as

`\begin{equation}
PA = LU\\
P \begin{pmatrix}
a_{11}&\cdots &a_{1n} \\
\vdots& \ddots &\vdots \\
a_{m1}&\cdots & a_{mn}
\end{pmatrix} =
\begin{pmatrix}
\ell_{11}&\ 0 &0 \\
\vdots& \ddots &0 \\
\ell_{m1}&\cdots & \ell_{mn}
\end{pmatrix}
\begin{pmatrix}
u_{11}&\cdots &u_{1n} \\
0& \ddots &\vdots\\
0&\cdots & u_{mn}
\end{pmatrix},
\label{eq:generalLUFactorization}
\end{equation}`

where $P$ is a row permutation matrix. [`blockLU()`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix)
returns a tuple containing the $P$, $L$, and $U$ [`BlockMatrix`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix)
objects.

This algorithm is a highly stable method for inverting a matrix and solving linear systems of equations that appear in
machine learning applications. Larger linear equations of the type $AX=B$ are usually solved with SGD, L-BFGS, or other
gradient-based methods. Solving these equations at scale to numerical precision should open up possibilities for new
algorithms within MLlib, as well as other applications.

Once the decomposition is computed, many other quantities can be easily computed, including the solution to a
linear system of equations via [`BlockMatrix.solve()`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix), which is
described in the next section.

**Example**

{% highlight scala %}
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.BlockMatrix
import org.apache.spark.rdd.RDD

val blocks: RDD[((Int, Int), Matrix)] = ... // an RDD of ((i, j), m) matrix blocks

val rowsPerBlock = 2
val colsPerBlock = 2
val A = new BlockMatrix(blocks, rowsPerBlock, colsPerBlock)

// Compute the factorization PA = LU.
val soln = A.blockLU
val P = soln.P
val L = soln.L
val U = soln.U

// Compute a residual: LU - PA should be (numerically) zero.
val residual = L.multiply(U).subtract(P.multiply(A))
val error = residual.toLocalMatrix.toArray.map(math.abs).sum

println("error: " + error)
{% endhighlight %}

**How it Works**

The LU decomposition of $A$ can be written as four block matrices, given as
`\begin{align}
PA & = LU\\
\begin{pmatrix}
P_1 A_{11}&P_1 A_{12} \\
P_2 A_{21}&P_2 A_{22}
\end{pmatrix}
& = \begin{pmatrix}
L_{11}&0 \\
L_{21}&L_{22}
\end{pmatrix}
\begin{pmatrix}
U_{11}&U_{12} \\
0&U_{22}
\end{pmatrix} \\
& = \begin{pmatrix}
L_{11}U_{11}&L_{11}U_{12} \\
L_{21}U_{11}&L_{21}U_{12}+L_{22}U_{22}
\end{pmatrix}.
\label{eq:basicLUBlockDecomposition}
\end{align}`

Once the blocks are defined, we can then solve each matrix quadrant individually.

`\begin{align}
P_1 A_{11} & = L_{11}U_{11} & \Rightarrow & (P_{1},L_{11},U_{11}) & = & \text{LU}_{LOCAL}(A_{11}) \label{eq:A11Solve} \\
P_1 A_{12} & = L_{11}U_{12} & \Rightarrow & U_{12} & = & L_{11}^{-1}P_1 A_{12} \label{eq:U12Solve} \\
P_2 A_{21} & = L_{21}U_{11} & \Rightarrow & L_{21} & = &P_2 A_{21}U_{11}^{-1} \label{eq:L21Solve}\\
P_2 A_{22} & = L_{21}U_{12}+L_{22}U_{22} & \Rightarrow & (P_{2},L_{22},U_{22}) & = & \text{LU}_{RECURSIVE}(S) \label{eq:A22Solve}\\
\end{align}`

where $A_{11}$ is chosen to be a single block, so that the Breeze library can be called to compute
$\eqref{eq:A11Solve}$. Breeze returns the $P_{1}$, $L_{11}$, and $U_{11}$ matrices.
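As a concrete illustration of equations $\eqref{eq:A11Solve}$ through $\eqref{eq:A22Solve}$, the recursion can be sketched outside of Spark in plain Python, treating each block as a 1 x 1 matrix (so the local factorization of $A_{11}$ is trivial, and no pivoting occurs, i.e. $P=I$). The function name `block_lu` is illustrative only and is not part of the MLlib API:

```python
def block_lu(a):
    """LU-factor a square list-of-lists matrix (no pivoting, so P = I):
    take A11 as a single 1x1 block, form U12 = L11^-1 A12 and
    L21 = A21 U11^-1, then recurse on the Schur complement
    S = A22 - A21 A11^-1 A12."""
    n = len(a)
    L = [[0.0] * n for _ in range(n)]
    U = [[0.0] * n for _ in range(n)]
    L[0][0], U[0][0] = 1.0, a[0][0]        # local LU of the 1x1 A11: L11 = 1, U11 = a11
    if n == 1:
        return L, U
    a11 = a[0][0]
    for j in range(1, n):                  # U12 = L11^-1 A12 (L11 = 1 here)
        U[0][j] = a[0][j]
    for i in range(1, n):                  # L21 = A21 U11^-1
        L[i][0] = a[i][0] / a11
    # Schur complement S = A22 - A21 A11^-1 A12 (the form with no
    # dependency on the local factorization of A11)
    s = [[a[i][j] - a[i][0] * a[0][j] / a11
          for j in range(1, n)] for i in range(1, n)]
    Ls, Us = block_lu(s)                   # recursive call on S
    for i in range(1, n):
        for j in range(1, n):
            L[i][j], U[i][j] = Ls[i - 1][j - 1], Us[i - 1][j - 1]
    return L, U
```

Running `block_lu` on a small matrix and multiplying the factors back together reproduces the input, confirming the recursion; the Spark implementation does the same thing block-wise, with Breeze handling the local factorization.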


Equation $\eqref{eq:A22Solve}$ is a recursive call that generates successive calculations of the Schur Complement,
given as

`\begin{equation}
S = A_{22} - L_{21} U_{12}.
\label{eq:SchurComplementInitialForm}
\end{equation}`

In this form, there is a dependency on the calculation of $\eqref{eq:A11Solve}$. An equivalent form of the Schur
complement can be used by substituting equations $\eqref{eq:U12Solve}$ and $\eqref{eq:L21Solve}$, giving

`\begin{equation}
S = A_{22} - A_{21} A_{11}^{-1} A_{12},
\label{eq:SchurComplementFinalForm}
\end{equation}`

which allows for the calculation of equation $\eqref{eq:A22Solve}$ with no dependency on equation
$\eqref{eq:A11Solve}$, resulting in a slight increase in parallelism at the expense of recomputing the inverse of
$A_{11}$ on a separate process.
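The equivalence of the two Schur-complement forms can be checked numerically with a small stand-alone sketch (plain Python with 2 x 2 blocks standing in for the `BlockMatrix` operations; all helper names here are hypothetical, not MLlib APIs):

```python
# Check that S = A22 - L21 U12 and S = A22 - A21 A11^-1 A12 agree,
# using 2x2 blocks and an unpivoted local LU of A11.

def mul(x, y):   # 2x2 matrix product
    return [[sum(x[i][k] * y[k][j] for k in range(2)) for j in range(2)]
            for i in range(2)]

def sub(x, y):   # 2x2 matrix difference
    return [[x[i][j] - y[i][j] for j in range(2)] for i in range(2)]

def inv(x):      # 2x2 matrix inverse
    d = x[0][0] * x[1][1] - x[0][1] * x[1][0]
    return [[x[1][1] / d, -x[0][1] / d], [-x[1][0] / d, x[0][0] / d]]

def lu2(x):      # unpivoted Doolittle LU of a 2x2 block
    l21 = x[1][0] / x[0][0]
    L = [[1.0, 0.0], [l21, 1.0]]
    U = [[x[0][0], x[0][1]], [0.0, x[1][1] - l21 * x[0][1]]]
    return L, U

A11 = [[4.0, 2.0], [2.0, 3.0]]; A12 = [[1.0, 0.0], [2.0, 1.0]]
A21 = [[3.0, 1.0], [0.0, 2.0]]; A22 = [[5.0, 2.0], [1.0, 4.0]]

L11, U11 = lu2(A11)
U12 = mul(inv(L11), A12)                        # U12 = L11^-1 A12
L21 = mul(A21, inv(U11))                        # L21 = A21 U11^-1

S1 = sub(A22, mul(L21, U12))                    # S = A22 - L21 U12
S2 = sub(A22, mul(A21, mul(inv(A11), A12)))     # S = A22 - A21 A11^-1 A12
assert all(abs(S1[i][j] - S2[i][j]) < 1e-9 for i in range(2) for j in range(2))
```

Both forms give the same Schur complement, which is why the second form can be used to decouple equation $\eqref{eq:A22Solve}$ from $\eqref{eq:A11Solve}$.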

Equations $\eqref{eq:U12Solve}$ and $\eqref{eq:L21Solve}$ are computed with
[`BlockMatrix.multiply()`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) operations.
The Schur Complement in $\eqref{eq:A22Solve}$ is computed using $\eqref{eq:SchurComplementFinalForm}$ with a
[`BlockMatrix.multiply()`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix)
operation and passed recursively to the next iteration. In this way, $P_2$ is never explicitly calculated,
but is built up as a set of $P_1$ matrices. The computation completes when the Schur Complement is a single block.

Instead of building the solution incrementally using more frequent but smaller block matrix operations, we construct large [`BlockMatrix`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) structures and carry out the multiplication at the end of the calculation. This should leverage the optimizations in the [`BlockMatrix.multiply()`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) routine more effectively. The matrices formed to carry out these operations are described in the figure below.

<p style="text-align: center;">
<img src="img/lu-factorization.png"
title="LU Algorithm Description"
alt="LU"
width="100%" />
<!-- Images are downsized intentionally to improve quality on retina displays -->
</p>


The matrix multiply operations shown in the figure are generalizations of equations $\eqref{eq:A11Solve}$, $\eqref{eq:U12Solve}$, and $\eqref{eq:L21Solve}$.

# Solving a Linear System of Equations
[`BlockMatrix.solve()`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) calls the LU Factorization
method to solve the linear system of equations $AX=B$ for $X$.
The method of forward and back substitution is used. This method is described
[here](https://en.wikipedia.org/wiki/Triangular_matrix#Forward_and_back_substitution) for elements of a matrix,
and the generalization to blocks used by [`BlockMatrix.solve()`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix)
is described briefly here.

For a linear system of equations $AX=B$, $A$ is an $N \times N$ matrix, and $X$ and $B$ are
[`BlockMatrix`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) objects with $N$
row blocks and $W$ column blocks. $B$ can have any number of columns, including 1. The number of rows and columns
per block (`rowsPerBlock` and `colsPerBlock`) must be the same, however. [`BlockMatrix.solve(B)`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix)
will return $X$ as a [`BlockMatrix`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) object
with the same dimensions as $B$.

The forward substitution equations are straightforward to obtain in two steps. The equation $AX=B$
can be expressed as $LUX=\widehat{B}$, where $\widehat{B}=PB$. First, the solution to $LY=\widehat{B}$ for $Y$ is obtained, followed by
solving $UX=Y$ for $X$.

Expanding $LY=\widehat{B}$ as [`BlockMatrix.multiply()`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix)
operations gives

`\begin{align}
\widehat{B}_m &= \sum_{i=1}^{m}L_{m,i}Y_i \\
&= L_{m,m}Y_m + \sum_{i=1}^{m-1}L_{m,i}Y_i. \\
\end{align}`

The solution for the $m$th row of $Y$ (which contains $W$ column elements) is obtained in terms of the previous rows
with the following recurrence relation:

`\begin{equation}
Y_m = L_{m,m}^{-1}\left [ \widehat{B}_m-\sum_{i=1}^{m-1}L_{m,i}Y_i\right ].\\
\end{equation}`
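At the scalar level (each block a 1 x 1 matrix, so $L_{m,m}^{-1}$ is ordinary division), this recurrence can be sketched in a few lines of Python; `forward_substitute` is an illustrative name, not part of the MLlib API:

```python
def forward_substitute(L, b):
    """Solve L y = b for a lower-triangular L, row by row from the top:
    y_m = L[m][m]^-1 * (b_m - sum_{i<m} L[m][i] * y_i)."""
    y = []
    for m in range(len(b)):
        s = sum(L[m][i] * y[i] for i in range(m))   # sum over previous rows
        y.append((b[m] - s) / L[m][m])
    return y
```

The block version replaces each division by a multiplication with the inverse of the diagonal block $L_{m,m}$, and each $Y_m$ is a row block with $W$ columns rather than a scalar.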

Once $Y$ is obtained, a similar method is applied to solve $UX=Y$. Here we build the solution from the last row block,
so the indexing is reversed. Letting $\widetilde{m}=N-m+1$ and expanding the multiplication terms gives


`\begin{align}
Y_{\widetilde{m}} &= \sum_{i=\widetilde{m}}^{N}U_{\widetilde{m},i}X_i \\
&= U_{\widetilde{m},\widetilde{m}}X_{\widetilde{m}} + \sum_{i=\widetilde{m}+1}^{N}U_{\widetilde{m},i}X_i. \\
\end{align}`

$X$ is obtained starting with the bottom row block, $\widetilde{m}=N$, and stepping down to $\widetilde{m}=1$, using

`\begin{equation}
X_{\widetilde{m}} = U_{\widetilde{m},\widetilde{m}}^{-1}\left[ Y_{\widetilde{m}}-\sum_{i=\widetilde{m}+1}^{N}U_{\widetilde{m},i}X_i\right ].\\
\end{equation}`
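The same scalar-level sketch works for back substitution, processing rows from the bottom up (mirroring the reversed index $\widetilde{m}$); `back_substitute` is again an illustrative name, not an MLlib API:

```python
def back_substitute(U, y):
    """Solve U x = y for an upper-triangular U, from the last row up:
    x_m = U[m][m]^-1 * (y_m - sum_{i>m} U[m][i] * x_i)."""
    n = len(y)
    x = [0.0] * n
    for m in range(n - 1, -1, -1):
        s = sum(U[m][i] * x[i] for i in range(m + 1, n))  # sum over later rows
        x[m] = (y[m] - s) / U[m][m]
    return x
```

Chaining the two substitutions after the factorization solves $AX=B$: forward-substitute $LY=\widehat{B}$, then back-substitute $UX=Y$.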

**Example**
{% highlight scala %}
import org.apache.spark.mllib.linalg.Matrix
import org.apache.spark.mllib.linalg.distributed.BlockMatrix
import org.apache.spark.rdd.RDD

val blocksForLU: RDD[((Int, Int), Matrix)] = ... // an RDD of ((i, j), m) matrix blocks

val A = new BlockMatrix(blocksForLU, 2, 2)

val B: BlockMatrix = ... // a BlockMatrix with the same number of rows as A

// Solve AX = B for X; the residual AX - B should be (numerically) zero.
val X = A.solve(B)
val residual = A.multiply(X).subtract(B)
val error = residual.toLocalMatrix.toArray.map(math.abs).sum
println("error: " + error)
{% endhighlight %}