Merge pull request apache#65 from markhamstra/csd-1.4
SKIPME merging Apache branch-1.4 bug fixes
markhamstra committed Jul 17, 2015
2 parents 9e54b8a + 0715408 commit 98e8af7
Showing 8 changed files with 248 additions and 34 deletions.
@@ -20,6 +20,7 @@ package org.apache.spark
import java.util.concurrent.TimeUnit

import scala.collection.mutable
import scala.util.control.ControlThrowable

import com.codahale.metrics.{Gauge, MetricRegistry}

@@ -204,7 +205,16 @@ private[spark] class ExecutorAllocationManager(
listenerBus.addListener(listener)

    val scheduleTask = new Runnable() {
      override def run(): Unit = {
        try {
          schedule()
        } catch {
          case ct: ControlThrowable =>
            throw ct
          case t: Throwable =>
            logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
        }
      }
    }
executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
}
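The re-throw of `ControlThrowable` above is deliberate: Scala uses such throwables for non-local control flow (e.g. `scala.util.control.Breaks`), so swallowing one would silently change program behavior, while every other exception is only logged so that the fixed-rate scheduler thread stays alive. A minimal standalone sketch of the same pattern (the helper name is illustrative, not Spark API):

{% highlight scala %}
import scala.util.control.ControlThrowable

// Illustrative wrapper: propagate control-flow throwables, log the rest.
def runLoggingUncaught(task: () => Unit): Unit = {
  try {
    task()
  } catch {
    case ct: ControlThrowable => throw ct // never swallow control flow
    case t: Throwable =>
      // Logging instead of re-throwing keeps a periodic task's thread alive.
      Console.err.println(s"Uncaught exception in ${Thread.currentThread().getName}: $t")
  }
}
{% endhighlight %}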
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -684,7 +684,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val gettingResultTime = getGettingResultTime(info)

val maybeAccumulators = info.accumulables
val accumulatorsReadable = maybeAccumulators.map { acc =>
  StringEscapeUtils.escapeHtml4(s"${acc.name}: ${acc.update.get}")
}
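`escapeHtml4` comes from Apache Commons Lang 3 (`org.apache.commons.lang3.StringEscapeUtils`, presumably imported elsewhere in this file); it converts markup-significant characters into HTML entities so that user-supplied accumulator names render as literal text in the web UI instead of being interpreted as markup. A quick illustration of the behavior relied on here:

{% highlight scala %}
import org.apache.commons.lang3.StringEscapeUtils

// Prints: &lt;b&gt;records&lt;/b&gt;: 5
println(StringEscapeUtils.escapeHtml4("<b>records</b>: 5"))
{% endhighlight %}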

val maybeInput = metrics.flatMap(_.inputMetrics)
val inputSortable = maybeInput.map(_.bytesRead.toString).getOrElse("")
2 changes: 1 addition & 1 deletion docs/configuration.md
@@ -665,7 +665,7 @@ Apart from these, the following properties are also available, and may be useful
<td>
Initial size of Kryo's serialization buffer. Note that there will be one buffer
<i>per core</i> on each worker. This buffer will grow up to
<code>spark.kryoserializer.buffer.max</code> if needed.
</td>
</tr>
<tr>
31 changes: 31 additions & 0 deletions docs/ml-guide.md
@@ -3,6 +3,24 @@ layout: global
title: Spark ML Programming Guide
---

`\[
\newcommand{\R}{\mathbb{R}}
\newcommand{\E}{\mathbb{E}}
\newcommand{\x}{\mathbf{x}}
\newcommand{\y}{\mathbf{y}}
\newcommand{\wv}{\mathbf{w}}
\newcommand{\av}{\mathbf{\alpha}}
\newcommand{\bv}{\mathbf{b}}
\newcommand{\N}{\mathbb{N}}
\newcommand{\id}{\mathbf{I}}
\newcommand{\ind}{\mathbf{1}}
\newcommand{\0}{\mathbf{0}}
\newcommand{\unit}{\mathbf{e}}
\newcommand{\one}{\mathbf{1}}
\newcommand{\zero}{\mathbf{0}}
\]`


Spark 1.2 introduced a new package called `spark.ml`, which aims to provide a uniform set of
high-level APIs that help users create and tune practical machine learning pipelines.

@@ -154,6 +172,19 @@ Parameters belong to specific instances of `Estimator`s and `Transformer`s.
For example, if we have two `LogisticRegression` instances `lr1` and `lr2`, then we can build a `ParamMap` with both `maxIter` parameters specified: `ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20)`.
This is useful if there are two algorithms with the `maxIter` parameter in a `Pipeline`.
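A minimal sketch of that instance-keyed behavior, using the same `ParamMap` call quoted above:

{% highlight scala %}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap

val lr1 = new LogisticRegression()
val lr2 = new LogisticRegression()

// Params are keyed by instance, not by name alone, so one map can
// carry different maxIter values for two otherwise identical estimators.
val paramMap = ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20)
{% endhighlight %}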

# Algorithm Guides

There are now several algorithms in the Pipelines API that are not in the lower-level MLlib API, so we link to their documentation here. These algorithms are mostly feature transformers, which fit naturally into the `Transformer` abstraction, and ensembles, which fit naturally into the `Estimator` abstraction.

**Pipelines API Algorithm Guides**

* [Feature Extraction, Transformation, and Selection](ml-features.html)
* [Ensembles](ml-ensembles.html)

**Algorithms in `spark.ml`**

* [Linear methods with elastic net regularization](ml-linear-methods.html)

# Code Examples

This section gives code examples illustrating the functionality discussed above.
129 changes: 129 additions & 0 deletions docs/ml-linear-methods.md
@@ -0,0 +1,129 @@
---
layout: global
title: Linear Methods - ML
displayTitle: <a href="ml-guide.html">ML</a> - Linear Methods
---


`\[
\newcommand{\R}{\mathbb{R}}
\newcommand{\E}{\mathbb{E}}
\newcommand{\x}{\mathbf{x}}
\newcommand{\y}{\mathbf{y}}
\newcommand{\wv}{\mathbf{w}}
\newcommand{\av}{\mathbf{\alpha}}
\newcommand{\bv}{\mathbf{b}}
\newcommand{\N}{\mathbb{N}}
\newcommand{\id}{\mathbf{I}}
\newcommand{\ind}{\mathbf{1}}
\newcommand{\0}{\mathbf{0}}
\newcommand{\unit}{\mathbf{e}}
\newcommand{\one}{\mathbf{1}}
\newcommand{\zero}{\mathbf{0}}
\]`


In MLlib, we implement popular linear methods such as logistic regression and linear least squares with L1 or L2 regularization. Refer to [the linear methods in MLlib](mllib-linear-methods.html) for details. In `spark.ml`, we also include the Pipelines API for [elastic net](http://en.wikipedia.org/wiki/Elastic_net_regularization), a hybrid of L1 and L2 regularization proposed in [this paper](http://users.stat.umn.edu/~zouxx019/Papers/elasticnet.pdf). Mathematically, elastic net is defined as a convex combination of the L1 norm and the squared L2 norm:
`\[
\alpha \|\wv\|_1 + (1-\alpha) \frac{1}{2}\|\wv\|_2^2, \quad \alpha \in [0, 1].
\]`
By setting $\alpha$ properly, elastic net contains both L1 and L2 regularization as special cases. For example, if a [linear regression](https://en.wikipedia.org/wiki/Linear_regression) model is trained with the elastic net parameter $\alpha$ set to $1$, it is equivalent to a [Lasso](http://en.wikipedia.org/wiki/Least_squares#Lasso_method) model. On the other hand, if $\alpha$ is set to $0$, the trained model reduces to a [ridge regression](http://en.wikipedia.org/wiki/Tikhonov_regularization) model. We implement the Pipelines API for both linear regression and logistic regression with elastic net regularization.
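In the Pipelines API this trade-off is exposed through the `elasticNetParam` parameter. A minimal sketch of the two extremes just described (parameter values are illustrative):

{% highlight scala %}
import org.apache.spark.ml.regression.LinearRegression

// elasticNetParam is the $\alpha$ above: 1.0 gives Lasso (pure L1),
// 0.0 gives ridge regression (pure L2); values in between mix the two.
val lasso = new LinearRegression().setElasticNetParam(1.0).setRegParam(0.3)
val ridge = new LinearRegression().setElasticNetParam(0.0).setRegParam(0.3)
{% endhighlight %}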

**Examples**

<div class="codetabs">

<div data-lang="scala" markdown="1">

{% highlight scala %}

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.mllib.util.MLUtils

// Load training data; toDF() requires the SQLContext implicits to be
// in scope (spark-shell imports sqlContext.implicits._ automatically).
val training = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF()

val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.3)
.setElasticNetParam(0.8)

// Fit the model
val lrModel = lr.fit(training)

// Print the weights and intercept for logistic regression
println(s"Weights: ${lrModel.weights} Intercept: ${lrModel.intercept}")

{% endhighlight %}

</div>

<div data-lang="java" markdown="1">

{% highlight java %}

import org.apache.spark.ml.classification.LogisticRegression;
import org.apache.spark.ml.classification.LogisticRegressionModel;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class LogisticRegressionWithElasticNetExample {
public static void main(String[] args) {
SparkConf conf = new SparkConf()
.setAppName("Logistic Regression with Elastic Net Example");

SparkContext sc = new SparkContext(conf);
SQLContext sql = new SQLContext(sc);
String path = "data/mllib/sample_libsvm_data.txt";

// Load training data
DataFrame training = sql.createDataFrame(
  MLUtils.loadLibSVMFile(sc, path).toJavaRDD(), LabeledPoint.class);

LogisticRegression lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8);

// Fit the model
LogisticRegressionModel lrModel = lr.fit(training);

// Print the weights and intercept for logistic regression
System.out.println("Weights: " + lrModel.weights() + " Intercept: " + lrModel.intercept());
}
}
{% endhighlight %}
</div>

<div data-lang="python" markdown="1">

{% highlight python %}

from pyspark.ml.classification import LogisticRegression
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.util import MLUtils

# Load training data
training = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF()

lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)

# Fit the model
lrModel = lr.fit(training)

# Print the weights and intercept for logistic regression
print("Weights: " + str(lrModel.weights))
print("Intercept: " + str(lrModel.intercept))
{% endhighlight %}

</div>

</div>

### Optimization

The optimization algorithm underlying the implementation is called [Orthant-Wise Limited-memory Quasi-Newton](http://research-srv.microsoft.com/en-us/um/people/jfgao/paper/icml07scalable.pdf)
(OWL-QN). It is an extension of L-BFGS that can effectively handle L1 and elastic net regularization.
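Concretely, for logistic regression with labels $y_i \in \{-1, +1\}$, the quantity being minimized is the average log-loss plus the elastic net penalty above, with $\lambda$ the regularization parameter set by `regParam` (a sketch in this page's notation, not a quote of the implementation):

`\[
\min_{\wv \in \R^d} \; \frac{1}{n} \sum_{i=1}^n \log\left(1 + \exp(-y_i \, \wv^T \x_i)\right)
+ \lambda \left( \alpha \|\wv\|_1 + (1 - \alpha) \frac{1}{2} \|\wv\|_2^2 \right).
\]`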
53 changes: 28 additions & 25 deletions docs/mllib-linear-methods.md
@@ -10,26 +10,26 @@ displayTitle: <a href="mllib-guide.html">MLlib</a> - Linear Methods

`\[
\newcommand{\R}{\mathbb{R}}
\newcommand{\E}{\mathbb{E}}
\newcommand{\x}{\mathbf{x}}
\newcommand{\y}{\mathbf{y}}
\newcommand{\wv}{\mathbf{w}}
\newcommand{\av}{\mathbf{\alpha}}
\newcommand{\bv}{\mathbf{b}}
\newcommand{\N}{\mathbb{N}}
\newcommand{\id}{\mathbf{I}}
\newcommand{\ind}{\mathbf{1}}
\newcommand{\0}{\mathbf{0}}
\newcommand{\unit}{\mathbf{e}}
\newcommand{\one}{\mathbf{1}}
\newcommand{\zero}{\mathbf{0}}
\]`

## Mathematical formulation

Many standard *machine learning* methods can be formulated as a convex optimization problem, i.e.
the task of finding a minimizer of a convex function `$f$` that depends on a variable vector
`$\wv$` (called `weights` in the code), which has `$d$` entries.
Formally, we can write this as the optimization problem `$\min_{\wv \in\R^d} \; f(\wv)$`, where
the objective function is of the form
`\begin{equation}
    f(\wv) := \lambda\, R(\wv) +
    \frac1n \sum_{i=1}^n L(\wv;\x_i,y_i)
    \label{eq:regPrimal}
    \tag{$\mathrm{A}$}
\ .
\end{equation}`
Here the vectors `$\x_i\in\R^d$` are the training data examples, for `$1\le i\le n$`, and
`$y_i\in\R$` are their corresponding labels, which we want to predict.
We call the method *linear* if $L(\wv; \x, y)$ can be expressed as a function of $\wv^T \x$ and $y$.
Several of MLlib's classification and regression algorithms fall into this category,
and are discussed here.
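For example, the logistic loss used by logistic regression is one such function of `$\wv^T \x$` and `$y$` (with labels `$y \in \{-1, +1\}$`):
`\[
L(\wv; \x, y) := \log(1 + \exp(-y \, \wv^T \x)).
\]`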
@@ -99,6 +99,9 @@ regularizers in MLlib:
<tr>
<td>L1</td><td>$\|\wv\|_1$</td><td>$\mathrm{sign}(\wv)$</td>
</tr>
<tr>
<td>elastic net</td><td>$\alpha \|\wv\|_1 + (1-\alpha)\frac{1}{2}\|\wv\|_2^2$</td><td>$\alpha \mathrm{sign}(\wv) + (1-\alpha) \wv$</td>
</tr>
</tbody>
</table>

@@ -107,7 +110,7 @@ of `$\wv$`.

L2-regularized problems are generally easier to solve than L1-regularized due to smoothness.
However, L1 regularization can help promote sparsity in weights, leading to smaller and more interpretable models; the latter can be useful for feature selection.
[Elastic net](http://en.wikipedia.org/wiki/Elastic_net_regularization) is a combination of L1 and L2 regularization. It is not recommended to train models without any regularization,
especially when the number of training examples is small.

### Optimization
@@ -527,16 +530,16 @@ print("Training Error = " + str(trainErr))
### Linear least squares, Lasso, and ridge regression


Linear least squares is the most common formulation for regression problems.
It is a linear method as described above in equation `$\eqref{eq:regPrimal}$`, with the loss
function in the formulation given by the squared loss:
`\[
L(\wv;\x,y) := \frac{1}{2} (\wv^T \x - y)^2.
\]`

Various related regression methods are derived by using different types of regularization:
[*ordinary least squares*](http://en.wikipedia.org/wiki/Ordinary_least_squares) or
[*linear least squares*](http://en.wikipedia.org/wiki/Linear_least_squares_(mathematics)) uses
[*linear least squares*](http://en.wikipedia.org/wiki/Linear_least_squares_(mathematics)) uses
no regularization; [*ridge regression*](http://en.wikipedia.org/wiki/Ridge_regression) uses L2
regularization; and [*Lasso*](http://en.wikipedia.org/wiki/Lasso_(statistics)) uses L1
regularization. For all of these models, the average loss or training error, $\frac{1}{n} \sum_{i=1}^n (\wv^T x_i - y_i)^2$, is
@@ -548,7 +551,7 @@ known as the [mean squared error](http://en.wikipedia.org/wiki/Mean_squared_erro

<div data-lang="scala" markdown="1">
The following example demonstrates how to load training data and parse it as an RDD of LabeledPoint.
The example then uses LinearRegressionWithSGD to build a simple linear model to predict label
values. We compute the mean squared error at the end to evaluate
[goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit).

@@ -610,7 +613,7 @@ public class LinearRegression {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("Linear Regression Example");
JavaSparkContext sc = new JavaSparkContext(conf);

// Load and parse the data
String path = "data/mllib/ridge-data/lpsa.data";
JavaRDD<String> data = sc.textFile(path);
@@ -630,7 +633,7 @@ public class LinearRegression {

// Building the model
int numIterations = 100;
final LinearRegressionModel model =
LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), numIterations);

// Evaluate model on training examples and compute training error
@@ -661,7 +664,7 @@ public class LinearRegression {

<div data-lang="python" markdown="1">
The following example demonstrates how to load training data and parse it as an RDD of LabeledPoint.
The example then uses LinearRegressionWithSGD to build a simple linear model to predict label
values. We compute the mean squared error at the end to evaluate
[goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit).

@@ -698,8 +701,8 @@ a dependency.

### Streaming linear regression

When data arrive in a streaming fashion, it is useful to fit regression models online,
updating the parameters of the model as new data arrives. MLlib currently supports
streaming linear regression using ordinary least squares. The fitting is similar
to that performed offline, except fitting occurs on each batch of data, so that
the model continually updates to reflect the data from the stream.
@@ -714,7 +717,7 @@ online to the first stream, and make predictions on the second stream.

<div data-lang="scala" markdown="1">

First, we import the necessary classes for parsing our input data and creating the model.

{% highlight scala %}

@@ -726,7 +729,7 @@ import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD

Then we make input streams for training and testing data. We assume a StreamingContext `ssc`
has already been created, see [Spark Streaming Programming Guide](streaming-programming-guide.html#initializing)
for more info. For this example, we use labeled points in training and testing streams,
but in practice you will likely want to use unlabeled vectors for test data.

{% highlight scala %}
@@ -746,7 +749,7 @@ val model = new StreamingLinearRegressionWithSGD()

{% endhighlight %}

Now we register the streams for training and testing and start the job.
Printing predictions alongside true labels lets us easily see the result.

{% highlight scala %}
Expand All @@ -756,14 +759,14 @@ model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print()

ssc.start()
ssc.awaitTermination()

{% endhighlight %}

We can now save text files with data to the training or testing folders.
Each line should be a data point formatted as `(y,[x1,x2,x3])` where `y` is the label
and `x1,x2,x3` are the features. Anytime a text file is placed in `/training/data/dir`
the model will update. Anytime a text file is placed in `/testing/data/dir` you will see predictions.
As you feed more data to the training directory, the predictions
will get better!
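For example, a file dropped into `/training/data/dir` might contain lines like the following (values are illustrative):

{% highlight text %}
(1.0,[2.0,1.3,0.5])
(0.0,[1.1,3.2,2.4])
{% endhighlight %}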

</div>