From cd0629f13014e947f35268c6204d59221e0a7e0b Mon Sep 17 00:00:00 2001 From: Kan Zhang Date: Wed, 9 Apr 2014 14:15:51 -0700 Subject: [PATCH] code refactoring and adding test --- .../scala/org/apache/spark/SparkContext.scala | 4 +- .../spark/scheduler/LiveListenerBus.scala | 31 ++++++------ .../spark/scheduler/SparkListenerSuite.scala | 49 +++++++++++++++++++ .../apache/spark/examples/SparkHdfsLR.scala | 1 - 4 files changed, 66 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 2ad8d4020bf72..af7a1345ca034 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -882,14 +882,14 @@ class SparkContext( metadataCleaner.cancel() cleaner.foreach(_.stop()) dagSchedulerCopy.stop() - listenerBus.stop() - eventLogger.foreach(_.stop()) taskScheduler = null // TODO: Cache.stop()? env.stop() SparkEnv.set(null) ShuffleMapTask.clearCache() ResultTask.clearCache() + listenerBus.stop() + eventLogger.foreach(_.stop()) logInfo("Successfully stopped SparkContext") } else { logInfo("SparkContext already stopped") diff --git a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala index 901ad237886ff..a4901c89624d1 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/LiveListenerBus.scala @@ -36,7 +36,19 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](EVENT_QUEUE_CAPACITY) private var queueFullErrorMessageLogged = false private var started = false - private var sparkListenerBus: Option[Thread] = _ + private val listenerThread = new Thread("SparkListenerBus") { + setDaemon(true) + override def run() { + while (true) { + val event = eventQueue.take + if (event == SparkListenerShutdown) { + // Get out of the while loop and shutdown the daemon thread + return + } + postToAll(event) + } + } + } /** * Start sending events to attached listeners. @@ -49,21 +61,8 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { if (started) { throw new IllegalStateException("Listener bus already started!") } + listenerThread.start() started = true - sparkListenerBus = Some(new Thread("SparkListenerBus") { - setDaemon(true) - override def run() { - while (true) { - val event = eventQueue.take - if (event == SparkListenerShutdown) { - // Get out of the while loop and shutdown the daemon thread - return - } - postToAll(event) - } - } - }) - sparkListenerBus.foreach(_.start()) } def post(event: SparkListenerEvent) { @@ -99,6 +98,6 @@ private[spark] class LiveListenerBus extends SparkListenerBus with Logging { throw new IllegalStateException("Attempted to stop a listener bus that has not yet started!") } post(SparkListenerShutdown) - sparkListenerBus.foreach(_.join()) + listenerThread.join() } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 7c843772bc2e0..acf45d07bdfa9 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.scheduler +import java.util.concurrent.Semaphore + import scala.collection.mutable import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite} @@ -72,6 +74,53 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with ShouldMatc } } + test("bus.stop() waits for the event queue to completely drain") { + @volatile var drained = false + + class BlockingListener(cond: AnyRef) extends SparkListener { + override def onJobEnd(jobEnd: SparkListenerJobEnd) = { + cond.synchronized { cond.wait() } + drained = true + } + } + + val bus = new LiveListenerBus + val blockingListener = new BlockingListener(bus) + val sem = new Semaphore(0) + + bus.addListener(blockingListener) + bus.post(SparkListenerJobEnd(0, JobSucceeded)) + bus.start() + // the queue should not drain immediately + assert(!drained) + + new Thread("ListenerBusStopper") { + override def run() { + // stop() would block until notify() is called below + bus.stop() + sem.release() + } + }.start() + + val startTime = System.currentTimeMillis() + val waitTime = 100 + var done = false + while (!done) { + if (System.currentTimeMillis() > startTime + waitTime) { + bus.synchronized { + bus.notify() + } + done = true + } else { + Thread.sleep(10) + // bus.stop() should wait until the event queue is drained + assert(!drained) + } + } + sem.acquire() + assert(drained) + } + test("basic creation of StageInfo") { val listener = new SaveStageAndTaskInfo sc.addSparkListener(listener) diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala index cdbbbcbec0c66..038afbcba80a3 100644 --- a/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala +++ b/examples/src/main/scala/org/apache/spark/examples/SparkHdfsLR.scala @@ -74,6 +74,5 @@ object SparkHdfsLR { println("Final w: " + w) sc.stop() - System.exit(0) } }