From 50607eca59568db47ff3a1c8a19ec96b919131d7 Mon Sep 17 00:00:00 2001 From: Neelesh Srinivas Salian Date: Mon, 13 Jul 2015 15:46:51 -0700 Subject: [PATCH 1/5] [SPARK-8743] [STREAMING] Deregister Codahale metrics for streaming when StreamingContext is closed The issue link: https://issues.apache.org/jira/browse/SPARK-8743 Deregister Codahale metrics for streaming when StreamingContext is closed Design: Adding the method calls in the appropriate start() and stop () methods for the StreamingContext Actions in the PullRequest: 1) Added the registerSource method call to the start method for the Streaming Context. 2) Added the removeSource method to the stop method. 3) Added comments for both 1 and 2 and comment to show initialization of the StreamingSource 4) Added a test case to check for both registration and de-registration of metrics Previous closed PR for reference: https://github.com/apache/spark/pull/7250 Author: Neelesh Srinivas Salian Closes #7362 from nssalian/branch-SPARK-8743 and squashes the following commits: 7d998a3 [Neelesh Srinivas Salian] Removed the Thread.sleep() call 8b26397 [Neelesh Srinivas Salian] Moved the scalatest.{} import 0e8007a [Neelesh Srinivas Salian] moved import org.apache.spark{} to correct place daedaa5 [Neelesh Srinivas Salian] Corrected Ordering of imports 8873180 [Neelesh Srinivas Salian] Removed redundancy in imports 59227a4 [Neelesh Srinivas Salian] Changed the ordering of the imports to classify scala and spark imports d8cb577 [Neelesh Srinivas Salian] Added registerSource to start() and removeSource to stop(). Wrote a test to check the registration and de-registration (cherry picked from commit b7bcbe25f90ba4e78b548465bc80d4de1d2c4a4a) Signed-off-by: Tathagata Das --- .../spark/streaming/StreamingContext.scala | 10 +++-- .../streaming/StreamingContextSuite.scala | 41 ++++++++++++++++++- 2 files changed, 45 insertions(+), 6 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index ec49d0f42d122..6b78a82e68c24 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -192,11 +192,8 @@ class StreamingContext private[streaming] ( None } - /** Register streaming source to metrics system */ + /* Initializing a streamingSource to register metrics */ private val streamingSource = new StreamingSource(this) - assert(env != null) - assert(env.metricsSystem != null) - env.metricsSystem.registerSource(streamingSource) private var state: StreamingContextState = INITIALIZED @@ -606,6 +603,9 @@ class StreamingContext private[streaming] ( } shutdownHookRef = Utils.addShutdownHook( StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown) + // Registering Streaming Metrics at the start of the StreamingContext + assert(env.metricsSystem != null) + env.metricsSystem.registerSource(streamingSource) uiTab.foreach(_.attach()) logInfo("StreamingContext started") case ACTIVE => @@ -682,6 +682,8 @@ class StreamingContext private[streaming] ( logWarning("StreamingContext has already been stopped") case ACTIVE => scheduler.stop(stopGracefully) + // Removing the streamingSource to de-register the metrics on stop() + env.metricsSystem.removeSource(streamingSource) uiTab.foreach(_.detach()) StreamingContext.setActiveContext(null) waiter.notifyStop() diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala index 56b4ce5638a51..289a159d8990a 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala @@ -20,20 +20,23 @@ package org.apache.spark.streaming import java.io.{File, NotSerializableException} import java.util.concurrent.atomic.AtomicInteger +import scala.collection.mutable.ArrayBuffer import scala.collection.mutable.Queue import org.apache.commons.io.FileUtils +import org.scalatest.{Assertions, BeforeAndAfter, PrivateMethodTester} import org.scalatest.concurrent.Eventually._ import org.scalatest.concurrent.Timeouts import org.scalatest.exceptions.TestFailedDueToTimeoutException import org.scalatest.time.SpanSugar._ -import org.scalatest.{Assertions, BeforeAndAfter} +import org.apache.spark.{Logging, SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.metrics.MetricsSystem +import org.apache.spark.metrics.source.Source import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.util.Utils -import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, SparkFunSuite} class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeouts with Logging { @@ -299,6 +302,25 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo Thread.sleep(100) } + test ("registering and de-registering of streamingSource") { + val conf = new SparkConf().setMaster(master).setAppName(appName) + ssc = new StreamingContext(conf, batchDuration) + assert(ssc.getState() === StreamingContextState.INITIALIZED) + addInputStream(ssc).register() + ssc.start() + + val sources = StreamingContextSuite.getSources(ssc.env.metricsSystem) + val streamingSource = StreamingContextSuite.getStreamingSource(ssc) + assert(sources.contains(streamingSource)) + assert(ssc.getState() === StreamingContextState.ACTIVE) + + ssc.stop() + val sourcesAfterStop = StreamingContextSuite.getSources(ssc.env.metricsSystem) + val streamingSourceAfterStop = StreamingContextSuite.getStreamingSource(ssc) + assert(ssc.getState() === StreamingContextState.STOPPED) + assert(!sourcesAfterStop.contains(streamingSourceAfterStop)) + } + test("awaitTermination") { ssc = new StreamingContext(master, appName, batchDuration) val inputStream = addInputStream(ssc) @@ -811,3 +833,18 @@ package object testPackage extends Assertions { } } } + +/** + * Helper methods for testing StreamingContextSuite + * This includes methods to access private methods and fields in StreamingContext and MetricsSystem + */ +private object StreamingContextSuite extends PrivateMethodTester { + private val _sources = PrivateMethod[ArrayBuffer[Source]]('sources) + private def getSources(metricsSystem: MetricsSystem): ArrayBuffer[Source] = { + metricsSystem.invokePrivate(_sources()) + } + private val _streamingSource = PrivateMethod[StreamingSource]('streamingSource) + private def getStreamingSource(streamingContext: StreamingContext): StreamingSource = { + streamingContext.invokePrivate(_streamingSource()) + } +} From dce68ad1a0da5580179d1300d4262b9648babcda Mon Sep 17 00:00:00 2001 From: zhaishidan Date: Tue, 14 Jul 2015 08:54:30 +0100 Subject: [PATCH 2/5] [SPARK-9010] [DOCUMENTATION] Improve the Spark Configuration document about `spark.kryoserializer.buffer` The meaning of spark.kryoserializer.buffer should be "Initial size of Kryo's serialization buffer. Note that there will be one buffer per core on each worker. This buffer will grow up to spark.kryoserializer.buffer.max if needed.". The spark.kryoserializer.buffer.max.mb is out-of-date in spark 1.4. Author: zhaishidan Closes #7393 from stanzhai/master and squashes the following commits: 69729ef [zhaishidan] fix document error about spark.kryoserializer.buffer.max.mb (cherry picked from commit c1feebd8fcba985667db8ccdafd2b5ec76dcfae7) Signed-off-by: Sean Owen --- docs/configuration.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/configuration.md b/docs/configuration.md index 19f3b7eaf7748..e60b0f550eefc 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -665,7 +665,7 @@ Apart from these, the following properties are also available, and may be useful Initial size of Kryo's serialization buffer. Note that there will be one buffer per core on each worker. This buffer will grow up to - spark.kryoserializer.buffer.max.mb if needed. + spark.kryoserializer.buffer.max if needed. From 109399223128b3f8918c524f611521cf6437291c Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 15 Jul 2015 17:30:57 +0900 Subject: [PATCH 3/5] [SPARK-9012] [WEBUI] Escape Accumulators in the task table If running the following codes, the task table will be broken because accumulators aren't escaped. ``` val a = sc.accumulator(1, "") sc.parallelize(1 to 10).foreach(i => a += i) ``` Before this fix, screen shot 2015-07-13 at 8 02 44 pm After this fix, screen shot 2015-07-13 at 8 14 32 pm Author: zsxwing Closes #7369 from zsxwing/SPARK-9012 and squashes the following commits: a83c9b6 [zsxwing] Escape Accumulators in the task table (cherry picked from commit adb33d3665770daf2ccb8915d19e198be9dc3b47) Signed-off-by: Kousuke Saruta --- core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index b83a49f79c8a8..7b94eacdee36c 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -684,7 +684,9 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { val gettingResultTime = getGettingResultTime(info) val maybeAccumulators = info.accumulables - val accumulatorsReadable = maybeAccumulators.map{acc => s"${acc.name}: ${acc.update.get}"} + val accumulatorsReadable = maybeAccumulators.map { acc => + StringEscapeUtils.escapeHtml4(s"${acc.name}: ${acc.update.get}") + } val maybeInput = metrics.flatMap(_.inputMetrics) val inputSortable = maybeInput.map(_.bytesRead.toString).getOrElse("") From 5b5693d2feb95cc2478581b148f3dcaa88d9af4d Mon Sep 17 00:00:00 2001 From: Shuo Xiang Date: Wed, 15 Jul 2015 12:10:53 -0700 Subject: [PATCH 4/5] [SPARK-7555] [DOCS] Add doc for elastic net in ml-guide and mllib-guide jkbradley I put the elastic net under the **Algorithm guide** section. Also add the formula of elastic net in mllib-linear `mllib-linear-methods#regularizers`. dbtsai I left the code tab for you to add example code. Do you think it is the right place? Author: Shuo Xiang Closes #6504 from coderxiang/elasticnet and squashes the following commits: f6061ee [Shuo Xiang] typo 90a7c88 [Shuo Xiang] Merge remote-tracking branch 'upstream/master' into elasticnet 0610a36 [Shuo Xiang] move out the elastic net to ml-linear-methods 8747190 [Shuo Xiang] merge master 706d3f7 [Shuo Xiang] add python code 9bc2b4c [Shuo Xiang] typo db32a60 [Shuo Xiang] java code sample aab3b3a [Shuo Xiang] Merge remote-tracking branch 'upstream/master' into elasticnet a0dae07 [Shuo Xiang] simplify code d8616fd [Shuo Xiang] Update the definition of elastic net. Add scala code; Mention Lasso and Ridge df5bd14 [Shuo Xiang] use wikipeida page in ml-linear-methods.md 78d9366 [Shuo Xiang] address comments 8ce37c2 [Shuo Xiang] Merge branch 'elasticnet' of github.com:coderxiang/spark into elasticnet 8f24848 [Shuo Xiang] Merge branch 'elastic-net-doc' of github.com:coderxiang/spark into elastic-net-doc 998d766 [Shuo Xiang] Merge branch 'elastic-net-doc' of github.com:coderxiang/spark into elastic-net-doc 89f10e4 [Shuo Xiang] Merge remote-tracking branch 'upstream/master' into elastic-net-doc 9262a72 [Shuo Xiang] update 7e07d12 [Shuo Xiang] update b32f21a [Shuo Xiang] add doc for elastic net in sparkml 937eef1 [Shuo Xiang] Merge remote-tracking branch 'upstream/master' into elastic-net-doc 180b496 [Shuo Xiang] Merge remote-tracking branch 'upstream/master' aa0717d [Shuo Xiang] Merge remote-tracking branch 'upstream/master' 5f109b4 [Shuo Xiang] Merge remote-tracking branch 'upstream/master' c5c5bfe [Shuo Xiang] Merge remote-tracking branch 'upstream/master' 98804c9 [Shuo Xiang] fix bug in topBykey and update test (cherry picked from commit 303c1201c468d360a5f600ce37b8bee75a77a0e6) Signed-off-by: Joseph K. Bradley --- docs/ml-guide.md | 31 +++++++++ docs/ml-linear-methods.md | 129 +++++++++++++++++++++++++++++++++++ docs/mllib-linear-methods.md | 53 +++++++------- 3 files changed, 188 insertions(+), 25 deletions(-) create mode 100644 docs/ml-linear-methods.md diff --git a/docs/ml-guide.md b/docs/ml-guide.md index c74cb1f1ef8ea..8c46adf256a9a 100644 --- a/docs/ml-guide.md +++ b/docs/ml-guide.md @@ -3,6 +3,24 @@ layout: global title: Spark ML Programming Guide --- +`\[ +\newcommand{\R}{\mathbb{R}} +\newcommand{\E}{\mathbb{E}} +\newcommand{\x}{\mathbf{x}} +\newcommand{\y}{\mathbf{y}} +\newcommand{\wv}{\mathbf{w}} +\newcommand{\av}{\mathbf{\alpha}} +\newcommand{\bv}{\mathbf{b}} +\newcommand{\N}{\mathbb{N}} +\newcommand{\id}{\mathbf{I}} +\newcommand{\ind}{\mathbf{1}} +\newcommand{\0}{\mathbf{0}} +\newcommand{\unit}{\mathbf{e}} +\newcommand{\one}{\mathbf{1}} +\newcommand{\zero}{\mathbf{0}} +\]` + + Spark 1.2 introduced a new package called `spark.ml`, which aims to provide a uniform set of high-level APIs that help users create and tune practical machine learning pipelines. @@ -154,6 +172,19 @@ Parameters belong to specific instances of `Estimator`s and `Transformer`s. For example, if we have two `LogisticRegression` instances `lr1` and `lr2`, then we can build a `ParamMap` with both `maxIter` parameters specified: `ParamMap(lr1.maxIter -> 10, lr2.maxIter -> 20)`. This is useful if there are two algorithms with the `maxIter` parameter in a `Pipeline`. +# Algorithm Guides + +There are now several algorithms in the Pipelines API which are not in the lower-level MLlib API, so we link to documentation for them here. These algorithms are mostly feature transformers, which fit naturally into the `Transformer` abstraction in Pipelines, and ensembles, which fit naturally into the `Estimator` abstraction in the Pipelines. + +**Pipelines API Algorithm Guides** + +* [Feature Extraction, Transformation, and Selection](ml-features.html) +* [Ensembles](ml-ensembles.html) + +**Algorithms in `spark.ml`** + +* [Linear methods with elastic net regularization](ml-linear-methods.html) + # Code Examples This section gives code examples illustrating the functionality discussed above. diff --git a/docs/ml-linear-methods.md b/docs/ml-linear-methods.md new file mode 100644 index 0000000000000..1ac83d94c9e81 --- /dev/null +++ b/docs/ml-linear-methods.md @@ -0,0 +1,129 @@ +--- +layout: global +title: Linear Methods - ML +displayTitle: ML - Linear Methods +--- + + +`\[ +\newcommand{\R}{\mathbb{R}} +\newcommand{\E}{\mathbb{E}} +\newcommand{\x}{\mathbf{x}} +\newcommand{\y}{\mathbf{y}} +\newcommand{\wv}{\mathbf{w}} +\newcommand{\av}{\mathbf{\alpha}} +\newcommand{\bv}{\mathbf{b}} +\newcommand{\N}{\mathbb{N}} +\newcommand{\id}{\mathbf{I}} +\newcommand{\ind}{\mathbf{1}} +\newcommand{\0}{\mathbf{0}} +\newcommand{\unit}{\mathbf{e}} +\newcommand{\one}{\mathbf{1}} +\newcommand{\zero}{\mathbf{0}} +\]` + + +In MLlib, we implement popular linear methods such as logistic regression and linear least squares with L1 or L2 regularization. Refer to [the linear methods in mllib](mllib-linear-methods.html) for details. In `spark.ml`, we also include Pipelines API for [Elastic net](http://en.wikipedia.org/wiki/Elastic_net_regularization), a hybrid of L1 and L2 regularization proposed in [this paper](http://users.stat.umn.edu/~zouxx019/Papers/elasticnet.pdf). Mathematically it is defined as a linear combination of the L1-norm and the L2-norm: +`\[ +\alpha \|\wv\|_1 + (1-\alpha) \frac{1}{2}\|\wv\|_2^2, \alpha \in [0, 1]. +\]` +By setting $\alpha$ properly, it contains both L1 and L2 regularization as special cases. For example, if a [linear regression](https://en.wikipedia.org/wiki/Linear_regression) model is trained with the elastic net parameter $\alpha$ set to $1$, it is equivalent to a [Lasso](http://en.wikipedia.org/wiki/Least_squares#Lasso_method) model. On the other hand, if $\alpha$ is set to $0$, the trained model reduces to a [ridge regression](http://en.wikipedia.org/wiki/Tikhonov_regularization) model. We implement Pipelines API for both linear regression and logistic regression with elastic net regularization. + +**Examples** + +
+ +
+ +{% highlight scala %} + +import org.apache.spark.ml.classification.LogisticRegression +import org.apache.spark.mllib.util.MLUtils + +// Load training data +val training = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF() + +val lr = new LogisticRegression() + .setMaxIter(10) + .setRegParam(0.3) + .setElasticNetParam(0.8) + +// Fit the model +val lrModel = lr.fit(training) + +// Print the weights and intercept for logistic regression +println(s"Weights: ${lrModel.weights} Intercept: ${lrModel.intercept}") + +{% endhighlight %} + +
+ +
+ +{% highlight java %} + +import org.apache.spark.ml.classification.LogisticRegression; +import org.apache.spark.ml.classification.LogisticRegressionModel; +import org.apache.spark.mllib.regression.LabeledPoint; +import org.apache.spark.mllib.util.MLUtils; +import org.apache.spark.SparkConf; +import org.apache.spark.SparkContext; +import org.apache.spark.sql.DataFrame; +import org.apache.spark.sql.SQLContext; + +public class LogisticRegressionWithElasticNetExample { + public static void main(String[] args) { + SparkConf conf = new SparkConf() + .setAppName("Logistic Regression with Elastic Net Example"); + + SparkContext sc = new SparkContext(conf); + SQLContext sql = new SQLContext(sc); + String path = "sample_libsvm_data.txt"; + + // Load training data + DataFrame training = sql.createDataFrame(MLUtils.loadLibSVMFile(sc, path).toJavaRDD(), LabeledPoint.class); + + LogisticRegression lr = new LogisticRegression() + .setMaxIter(10) + .setRegParam(0.3) + .setElasticNetParam(0.8) + + // Fit the model + LogisticRegressionModel lrModel = lr.fit(training); + + // Print the weights and intercept for logistic regression + System.out.println("Weights: " + lrModel.weights() + " Intercept: " + lrModel.intercept()); + } +} +{% endhighlight %} +
+ +
+ +{% highlight python %} + +from pyspark.ml.classification import LogisticRegression +from pyspark.mllib.regression import LabeledPoint +from pyspark.mllib.util import MLUtils + +# Load training data +training = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt").toDF() + +lr = LogisticRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8) + +# Fit the model +lrModel = lr.fit(training) + +# Print the weights and intercept for logistic regression +print("Weights: " + str(lrModel.weights)) +print("Intercept: " + str(lrModel.intercept)) +{% endhighlight %} + +
+ +
+ +### Optimization + +The optimization algorithm underlies the implementation is called [Orthant-Wise Limited-memory QuasiNewton](http://research-srv.microsoft.com/en-us/um/people/jfgao/paper/icml07scalable.pdf) +(OWL-QN). It is an extension of L-BFGS that can effectively handle L1 regularization and elastic net. diff --git a/docs/mllib-linear-methods.md b/docs/mllib-linear-methods.md index 3dc8cc902fa72..c73e12a574f7a 100644 --- a/docs/mllib-linear-methods.md +++ b/docs/mllib-linear-methods.md @@ -10,7 +10,7 @@ displayTitle: MLlib - Linear Methods `\[ \newcommand{\R}{\mathbb{R}} -\newcommand{\E}{\mathbb{E}} +\newcommand{\E}{\mathbb{E}} \newcommand{\x}{\mathbf{x}} \newcommand{\y}{\mathbf{y}} \newcommand{\wv}{\mathbf{w}} @@ -18,10 +18,10 @@ displayTitle: MLlib - Linear Methods \newcommand{\bv}{\mathbf{b}} \newcommand{\N}{\mathbb{N}} \newcommand{\id}{\mathbf{I}} -\newcommand{\ind}{\mathbf{1}} -\newcommand{\0}{\mathbf{0}} -\newcommand{\unit}{\mathbf{e}} -\newcommand{\one}{\mathbf{1}} +\newcommand{\ind}{\mathbf{1}} +\newcommand{\0}{\mathbf{0}} +\newcommand{\unit}{\mathbf{e}} +\newcommand{\one}{\mathbf{1}} \newcommand{\zero}{\mathbf{0}} \]` @@ -29,7 +29,7 @@ displayTitle: MLlib - Linear Methods Many standard *machine learning* methods can be formulated as a convex optimization problem, i.e. the task of finding a minimizer of a convex function `$f$` that depends on a variable vector -`$\wv$` (called `weights` in the code), which has `$d$` entries. +`$\wv$` (called `weights` in the code), which has `$d$` entries. Formally, we can write this as the optimization problem `$\min_{\wv \in\R^d} \; f(\wv)$`, where the objective function is of the form `\begin{equation} @@ -39,7 +39,7 @@ the objective function is of the form \ . \end{equation}` Here the vectors `$\x_i\in\R^d$` are the training data examples, for `$1\le i\le n$`, and -`$y_i\in\R$` are their corresponding labels, which we want to predict. +`$y_i\in\R$` are their corresponding labels, which we want to predict. We call the method *linear* if $L(\wv; \x, y)$ can be expressed as a function of $\wv^T x$ and $y$. Several of MLlib's classification and regression algorithms fall into this category, and are discussed here. @@ -99,6 +99,9 @@ regularizers in MLlib:
+ + +
L1$\|\wv\|_1$$\mathrm{sign}(\wv)$
elastic net$\alpha \|\wv\|_1 + (1-\alpha)\frac{1}{2}\|\wv\|_2^2$$\alpha \mathrm{sign}(\wv) + (1-\alpha) \wv$
@@ -107,7 +110,7 @@ of `$\wv$`. L2-regularized problems are generally easier to solve than L1-regularized due to smoothness. However, L1 regularization can help promote sparsity in weights leading to smaller and more interpretable models, the latter of which can be useful for feature selection. -It is not recommended to train models without any regularization, +[Elastic net](http://en.wikipedia.org/wiki/Elastic_net_regularization) is a combination of L1 and L2 regularization. It is not recommended to train models without any regularization, especially when the number of training examples is small. ### Optimization @@ -527,7 +530,7 @@ print("Training Error = " + str(trainErr)) ### Linear least squares, Lasso, and ridge regression -Linear least squares is the most common formulation for regression problems. +Linear least squares is the most common formulation for regression problems. It is a linear method as described above in equation `$\eqref{eq:regPrimal}$`, with the loss function in the formulation given by the squared loss: `\[ @@ -535,8 +538,8 @@ L(\wv;\x,y) := \frac{1}{2} (\wv^T \x - y)^2. \]` Various related regression methods are derived by using different types of regularization: -[*ordinary least squares*](http://en.wikipedia.org/wiki/Ordinary_least_squares) or -[*linear least squares*](http://en.wikipedia.org/wiki/Linear_least_squares_(mathematics)) uses +[*ordinary least squares*](http://en.wikipedia.org/wiki/Ordinary_least_squares) or +[*linear least squares*](http://en.wikipedia.org/wiki/Linear_least_squares_(mathematics)) uses no regularization; [*ridge regression*](http://en.wikipedia.org/wiki/Ridge_regression) uses L2 regularization; and [*Lasso*](http://en.wikipedia.org/wiki/Lasso_(statistics)) uses L1 regularization. For all of these models, the average loss or training error, $\frac{1}{n} \sum_{i=1}^n (\wv^T x_i - y_i)^2$, is @@ -548,7 +551,7 @@ known as the [mean squared error](http://en.wikipedia.org/wiki/Mean_squared_erro
The following example demonstrate how to load training data, parse it as an RDD of LabeledPoint. -The example then uses LinearRegressionWithSGD to build a simple linear model to predict label +The example then uses LinearRegressionWithSGD to build a simple linear model to predict label values. We compute the mean squared error at the end to evaluate [goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit). @@ -610,7 +613,7 @@ public class LinearRegression { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("Linear Regression Example"); JavaSparkContext sc = new JavaSparkContext(conf); - + // Load and parse the data String path = "data/mllib/ridge-data/lpsa.data"; JavaRDD data = sc.textFile(path); @@ -630,7 +633,7 @@ public class LinearRegression { // Building the model int numIterations = 100; - final LinearRegressionModel model = + final LinearRegressionModel model = LinearRegressionWithSGD.train(JavaRDD.toRDD(parsedData), numIterations); // Evaluate model on training examples and compute training error @@ -661,7 +664,7 @@ public class LinearRegression {
The following example demonstrate how to load training data, parse it as an RDD of LabeledPoint. -The example then uses LinearRegressionWithSGD to build a simple linear model to predict label +The example then uses LinearRegressionWithSGD to build a simple linear model to predict label values. We compute the mean squared error at the end to evaluate [goodness of fit](http://en.wikipedia.org/wiki/Goodness_of_fit). @@ -698,8 +701,8 @@ a dependency. ###Streaming linear regression -When data arrive in a streaming fashion, it is useful to fit regression models online, -updating the parameters of the model as new data arrives. MLlib currently supports +When data arrive in a streaming fashion, it is useful to fit regression models online, +updating the parameters of the model as new data arrives. MLlib currently supports streaming linear regression using ordinary least squares. The fitting is similar to that performed offline, except fitting occurs on each batch of data, so that the model continually updates to reflect the data from the stream. @@ -714,7 +717,7 @@ online to the first stream, and make predictions on the second stream.
-First, we import the necessary classes for parsing our input data and creating the model. +First, we import the necessary classes for parsing our input data and creating the model. {% highlight scala %} @@ -726,7 +729,7 @@ import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD Then we make input streams for training and testing data. We assume a StreamingContext `ssc` has already been created, see [Spark Streaming Programming Guide](streaming-programming-guide.html#initializing) -for more info. For this example, we use labeled points in training and testing streams, +for more info. For this example, we use labeled points in training and testing streams, but in practice you will likely want to use unlabeled vectors for test data. {% highlight scala %} @@ -746,7 +749,7 @@ val model = new StreamingLinearRegressionWithSGD() {% endhighlight %} -Now we register the streams for training and testing and start the job. +Now we register the streams for training and testing and start the job. Printing predictions alongside true labels lets us easily see the result. {% highlight scala %} @@ -756,14 +759,14 @@ model.predictOnValues(testData.map(lp => (lp.label, lp.features))).print() ssc.start() ssc.awaitTermination() - + {% endhighlight %} We can now save text files with data to the training or testing folders. -Each line should be a data point formatted as `(y,[x1,x2,x3])` where `y` is the label -and `x1,x2,x3` are the features. Anytime a text file is placed in `/training/data/dir` -the model will update. Anytime a text file is placed in `/testing/data/dir` you will see predictions. -As you feed more data to the training directory, the predictions +Each line should be a data point formatted as `(y,[x1,x2,x3])` where `y` is the label +and `x1,x2,x3` are the features. Anytime a text file is placed in `/training/data/dir` +the model will update. Anytime a text file is placed in `/testing/data/dir` you will see predictions. +As you feed more data to the training directory, the predictions will get better!
From bb14015079903cff69f05b414efaef08e568feec Mon Sep 17 00:00:00 2001 From: KaiXinXiaoLei Date: Wed, 15 Jul 2015 22:31:10 +0100 Subject: [PATCH 5/5] [SPARK-8974] Catch exceptions in allocation schedule task. I meet a problem. When I submit some tasks, the thread spark-dynamic-executor-allocation should seed the message about "requestTotalExecutors", and the new executor should start. But I meet a problem about this thread, like: 2015-07-14 19:02:17,461 | WARN | [spark-dynamic-executor-allocation] | Error sending message [message = RequestExecutors(1)] in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [120 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107) at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102) at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78) at org.apache.spark.scheduler.cluster.YarnSchedulerBackend.doRequestTotalExecutors(YarnSchedulerBackend.scala:57) at org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.requestTotalExecutors(CoarseGrainedSchedulerBackend.scala:351) at org.apache.spark.SparkContext.requestTotalExecutors(SparkContext.scala:1382) at org.apache.spark.ExecutorAllocationManager.addExecutors(ExecutorAllocationManager.scala:343) at org.apache.spark.ExecutorAllocationManager.updateAndSyncNumExecutorsTarget(ExecutorAllocationManager.scala:295) at org.apache.spark.ExecutorAllocationManager.org$apache$spark$ExecutorAllocationManager$$schedule(ExecutorAllocationManager.scala:248) when after some minutes, I find a new ApplicationMaster start, and tasks submitted start to run. The tasks Completed. And after long time (eg, ten minutes), the number of executor does not reduce to zero. I use the default value of "spark.dynamicAllocation.minExecutors". Author: KaiXinXiaoLei Closes #7352 from KaiXinXiaoLei/dym and squashes the following commits: 3603631 [KaiXinXiaoLei] change logError to logWarning efc4f24 [KaiXinXiaoLei] change file (cherry picked from commit 674eb2a4c3ff595760f990daf369ba75d2547593) Signed-off-by: Sean Owen --- .../org/apache/spark/ExecutorAllocationManager.scala | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index b3a1bb7e0f9ad..80c94b800c140 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -20,6 +20,7 @@ package org.apache.spark import java.util.concurrent.TimeUnit import scala.collection.mutable +import scala.util.control.ControlThrowable import com.codahale.metrics.{Gauge, MetricRegistry} @@ -204,7 +205,16 @@ private[spark] class ExecutorAllocationManager( listenerBus.addListener(listener) val scheduleTask = new Runnable() { - override def run(): Unit = Utils.logUncaughtExceptions(schedule()) + override def run(): Unit = { + try { + schedule() + } catch { + case ct: ControlThrowable => + throw ct + case t: Throwable => + logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t) + } + } } executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS) }