Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/apache/spark into SPARK-1413
Browse files Browse the repository at this point in the history
  • Loading branch information
witgo committed Apr 10, 2014
2 parents fa69dcf + 0adc932 commit 45e5b70
Show file tree
Hide file tree
Showing 37 changed files with 88 additions and 87 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -931,7 +931,6 @@ class SparkContext(config: SparkConf) extends Logging {
/** Shut down the SparkContext. */
def stop() {
ui.stop()
eventLogger.foreach(_.stop())
// Do this only if not stopped already - best case effort.
// prevent NPE if stopped more than once.
val dagSchedulerCopy = dagScheduler
Expand All @@ -940,13 +939,14 @@ class SparkContext(config: SparkConf) extends Logging {
metadataCleaner.cancel()
cleaner.foreach(_.stop())
dagSchedulerCopy.stop()
listenerBus.stop()
taskScheduler = null
// TODO: Cache.stop()?
env.stop()
SparkEnv.set(null)
ShuffleMapTask.clearCache()
ResultTask.clearCache()
listenerBus.stop()
eventLogger.foreach(_.stop())
logInfo("Successfully stopped SparkContext")
} else {
logInfo("SparkContext already stopped")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,22 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY)
private var queueFullErrorMessageLogged = false
private var started = false
private val listenerThread = new Thread("SparkListenerBus") {
setDaemon(true)
override def run() {
while (true) {
val event = eventQueue.take
if (event == SparkListenerShutdown) {
// Get out of the while loop and shutdown the daemon thread
return
}
postToAll(event)
}
}
}

// Exposed for testing
@volatile private[spark] var stopCalled = false

/**
* Start sending events to attached listeners.
Expand All @@ -48,20 +64,8 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
if (started) {
throw new IllegalStateException("Listener bus already started!")
}
listenerThread.start()
started = true
new Thread("SparkListenerBus") {
setDaemon(true)
override def run() {
while (true) {
val event = eventQueue.take
if (event == SparkListenerShutdown) {
// Get out of the while loop and shutdown the daemon thread
return
}
postToAll(event)
}
}
}.start()
}

def post(event: SparkListenerEvent) {
Expand Down Expand Up @@ -93,9 +97,11 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging {
}

def stop() {
stopCalled = true
if (!started) {
throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!")
}
post(SparkListenerShutdown)
listenerThread.join()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.scheduler

import java.util.concurrent.Semaphore

import scala.collection.mutable

import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
Expand Down Expand Up @@ -72,6 +74,49 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc
}
}

test("bus.stop() waits for the event queue to completely drain") {
@volatile var drained = false

// Tells the listener to stop blocking
val listenerWait = new Semaphore(1)

// When stop has returned
val stopReturned = new Semaphore(1)

class BlockingListener extends SparkListener {
override def onJobEnd(jobEnd: SparkListenerJobEnd) = {
listenerWait.acquire()
drained = true
}
}

val bus = new LiveListenerBus
val blockingListener = new BlockingListener

bus.addListener(blockingListener)
bus.start()
bus.post(SparkListenerJobEnd(0, JobSucceeded))

// the queue should not drain immediately
assert(!drained)

new Thread("ListenerBusStopper") {
override def run() {
// stop() will block until notify() is called below
bus.stop()
stopReturned.release(1)
}
}.start()

while (!bus.stopCalled) {
Thread.sleep(10)
}

listenerWait.release()
stopReturned.acquire()
assert(drained)
}

test("basic creation of StageInfo") {
val listener = new SaveStageAndTaskInfo
sc.addSparkListener(listener)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,6 @@ object SparkHdfsLR {
}

println("Final w: " + w)
System.exit(0)
sc.stop()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import org.apache.spark.rdd.RDD

/**
* :: DeveloperApi ::
*
* The Java stubs necessary for the Python mllib bindings.
*/
@DeveloperApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import org.apache.spark.rdd.RDD

/**
* :: Experimental ::
*
* Model for Naive Bayes Classifiers.
*
* @param labels list of labels
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ class KMeans private (

/**
* :: Experimental ::
*
* Set the number of runs of the algorithm to execute in parallel. We initialize the algorithm
* this many times with random starting conditions (configured by the initialization mode), then
* return the best clustering found over any run. Default: 1.
Expand Down Expand Up @@ -398,9 +397,6 @@ object KMeans {
MLUtils.fastSquaredDistance(v1.vector, v1.norm, v2.vector, v2.norm)
}

/**
* :: Experimental ::
*/
@Experimental
def main(args: Array[String]) {
if (args.length < 4) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ case class MatrixEntry(i: Long, j: Long, value: Double)

/**
* :: Experimental ::
*
* Represents a matrix in coordinate format.
*
* @param entries matrix entries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,13 @@ import org.apache.spark.mllib.linalg.SingularValueDecomposition

/**
* :: Experimental ::
*
* Represents a row of [[org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix]].
*/
@Experimental
case class IndexedRow(index: Long, vector: Vector)

/**
* :: Experimental ::
*
* Represents a row-oriented [[org.apache.spark.mllib.linalg.distributed.DistributedMatrix]] with
* indexed rows.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import org.apache.spark.Logging

/**
* :: Experimental ::
*
* Represents a row-oriented distributed Matrix with no meaningful row indices.
*
* @param rows rows stored as an RDD[Vector]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import org.apache.spark.mllib.linalg.{Vectors, Vector}

/**
* :: DeveloperApi ::
*
* Class used to compute the gradient for a loss function, given a single data point.
*/
@DeveloperApi
Expand Down Expand Up @@ -56,7 +55,6 @@ abstract class Gradient extends Serializable {

/**
* :: DeveloperApi ::
*
* Compute gradient and loss for a logistic loss function, as used in binary classification.
* See also the documentation for the precise formulation.
*/
Expand Down Expand Up @@ -100,7 +98,6 @@ class LogisticGradient extends Gradient {

/**
* :: DeveloperApi ::
*
* Compute gradient and loss for a Least-squared loss function, as used in linear regression.
* This is correct for the averaged least squares loss function (mean squared error)
* L = 1/n ||A weights-y||^2
Expand Down Expand Up @@ -135,7 +132,6 @@ class LeastSquaresGradient extends Gradient {

/**
* :: DeveloperApi ::
*
* Compute gradient and loss for a Hinge loss function, as used in SVM binary classification.
* See also the documentation for the precise formulation.
* NOTE: This assumes that the labels are {0,1}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import org.apache.spark.mllib.linalg.{Vectors, Vector}

/**
* :: DeveloperApi ::
*
* Class used to solve an optimization problem using Gradient Descent.
* @param gradient Gradient function to be used.
* @param updater Updater to be used to update weights after every iteration.
Expand Down Expand Up @@ -113,7 +112,6 @@ class GradientDescent(private var gradient: Gradient, private var updater: Updat

/**
* :: DeveloperApi ::
*
* Top-level method to run gradient descent.
*/
@DeveloperApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import org.apache.spark.mllib.linalg.Vector

/**
* :: DeveloperApi ::
*
* Trait for optimization problem solvers.
*/
@DeveloperApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import org.apache.spark.mllib.linalg.{Vectors, Vector}

/**
* :: DeveloperApi ::
*
* Class used to perform steps (weight update) using Gradient Descent methods.
*
* For general minimization problems, or for regularized problems of the form
Expand Down Expand Up @@ -64,7 +63,6 @@ abstract class Updater extends Serializable {

/**
* :: DeveloperApi ::
*
* A simple updater for gradient descent *without* any regularization.
* Uses a step-size decreasing with the square root of the number of iterations.
*/
Expand All @@ -86,7 +84,6 @@ class SimpleUpdater extends Updater {

/**
* :: DeveloperApi ::
*
* Updater for L1 regularized problems.
* R(w) = ||w||_1
* Uses a step-size decreasing with the square root of the number of iterations.
Expand Down Expand Up @@ -131,7 +128,6 @@ class L1Updater extends Updater {

/**
* :: DeveloperApi ::
*
* Updater for L2 regularized problems.
* R(w) = 1/2 ||w||^2
* Uses a step-size decreasing with the square root of the number of iterations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@ class ALS private (

/**
* :: Experimental ::
*
* Sets the constant used in computing confidence in implicit ALS. Default: 1.0.
*/
@Experimental
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ class MatrixFactorizationModel(

/**
* :: DeveloperApi ::
*
* Predict the rating of many users for many products.
* This is a Java stub for python predictAll()
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,6 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]

/**
* :: Experimental ::
*
* Set if the algorithm should validate data before training. Default true.
*/
@Experimental
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import org.apache.spark.mllib.linalg.{Vector, Vectors}

/**
* :: Experimental ::
*
* A class that implements a decision tree algorithm for classification and regression. It
* supports both continuous and categorical features.
* @param strategy The configuration parameters for the tree algorithm which specify the type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import org.apache.spark.annotation.Experimental

/**
* :: Experimental ::
*
* Enum to select the algorithm for the decision tree
*/
@Experimental
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import org.apache.spark.annotation.Experimental

/**
* :: Experimental ::
*
* Enum to describe whether a feature is "continuous" or "categorical"
*/
@Experimental
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import org.apache.spark.annotation.Experimental

/**
* :: Experimental ::
*
* Enum for selecting the quantile calculation strategy
*/
@Experimental
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import org.apache.spark.mllib.tree.configuration.QuantileStrategy._

/**
* :: Experimental ::
*
* Stores all the configuration options for tree construction
* @param algo classification or regression
* @param impurity criterion used for information gain calculation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental}

/**
* :: Experimental ::
*
* Class for calculating [[http://en.wikipedia.org/wiki/Binary_entropy_function entropy]] during
* binary classification.
*/
Expand All @@ -32,7 +31,6 @@ object Entropy extends Impurity {

/**
* :: DeveloperApi ::
*
* entropy calculation
* @param c0 count of instances with label 0
* @param c1 count of instances with label 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import org.apache.spark.annotation.{DeveloperApi, Experimental}

/**
* :: Experimental ::
*
* Class for calculating the
* [[http://en.wikipedia.org/wiki/Decision_tree_learning#Gini_impurity Gini impurity]]
* during binary classification.
Expand All @@ -31,7 +30,6 @@ object Gini extends Impurity {

/**
* :: DeveloperApi ::
*
* Gini coefficient calculation
* @param c0 count of instances with label 0
* @param c1 count of instances with label 1
Expand Down
Loading

0 comments on commit 45e5b70

Please sign in to comment.