Skip to content

Commit

Permalink
[SPARK-22850][core] Ensure queued events are delivered to all event queues.
Browse files Browse the repository at this point in the history

Before the bus was started, the code in LiveListenerBus queued posted events
in the individual queues themselves, so in situations like the following:

   bus.post(someEvent)
   bus.addToEventLogQueue(listener)
   bus.start()

"someEvent" would not be delivered to "listener" if that was the first
listener in the queue.

This change buffers events posted before the bus is started in the bus itself,
so that they can be delivered to all registered queues when the bus is
started.

Also tweaked the unit tests to cover the behavior above.
  • Loading branch information
Marcelo Vanzin committed Dec 20, 2017
1 parent 7570eab commit 80b900a
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ private[spark] class LiveListenerBus(conf: SparkConf) {

private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()

// Visible for testing.
private[scheduler] var queuedEvents = new mutable.ListBuffer[SparkListenerEvent]()

/** Add a listener to queue shared by all non-internal listeners. */
def addToSharedQueue(listener: SparkListenerInterface): Unit = {
addToQueue(listener, SHARED_QUEUE)
Expand Down Expand Up @@ -124,13 +127,19 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
}

/** Post an event to all queues. */
def post(event: SparkListenerEvent): Unit = {
if (!stopped.get()) {
metrics.numEventsPosted.inc()
def post(event: SparkListenerEvent): Unit = synchronized {
if (stopped.get()) {
return
}

metrics.numEventsPosted.inc()
if (started.get()) {
val it = queues.iterator()
while (it.hasNext()) {
it.next().post(event)
}
} else {
queuedEvents += event
}
}

Expand All @@ -149,7 +158,11 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
}

this.sparkContext = sc
queues.asScala.foreach(_.start(sc))
queues.asScala.foreach { q =>
q.start(sc)
queuedEvents.foreach(q.post)
}
queuedEvents = null
metricsSystem.registerSource(metrics)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
bus.metrics.metricRegistry.counter(s"queue.$SHARED_QUEUE.numDroppedEvents").getCount
}

private def queueSize(bus: LiveListenerBus): Int = {
private def sharedQueueSize(bus: LiveListenerBus): Int = {
bus.metrics.metricRegistry.getGauges().get(s"queue.$SHARED_QUEUE.size").getValue()
.asInstanceOf[Int]
}
Expand All @@ -73,12 +73,11 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val conf = new SparkConf()
val counter = new BasicJobCounter
val bus = new LiveListenerBus(conf)
bus.addToSharedQueue(counter)

// Metrics are initially empty.
assert(bus.metrics.numEventsPosted.getCount === 0)
assert(numDroppedEvents(bus) === 0)
assert(queueSize(bus) === 0)
assert(bus.queuedEvents.size === 0)
assert(eventProcessingTimeCount(bus) === 0)

// Post five events:
Expand All @@ -87,17 +86,23 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
// Five messages should be marked as received and queued, but no messages should be posted to
// listeners yet because the listener bus hasn't been started.
assert(bus.metrics.numEventsPosted.getCount === 5)
assert(queueSize(bus) === 5)
assert(bus.queuedEvents.size === 5)

// Add the counter to the bus after messages have been queued for later delivery.
bus.addToSharedQueue(counter)
assert(counter.count === 0)

// Starting listener bus should flush all buffered events
bus.start(mockSparkContext, mockMetricsSystem)
Mockito.verify(mockMetricsSystem).registerSource(bus.metrics)
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(counter.count === 5)
assert(queueSize(bus) === 0)
assert(sharedQueueSize(bus) === 0)
assert(eventProcessingTimeCount(bus) === 5)

// After the bus is started, there should be no more queued events.
assert(bus.queuedEvents === null)

// After listener bus has stopped, posting events should not increment counter
bus.stop()
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
Expand Down Expand Up @@ -188,18 +193,18 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
// Post a message to the listener bus and wait for processing to begin:
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
listenerStarted.acquire()
assert(queueSize(bus) === 0)
assert(sharedQueueSize(bus) === 0)
assert(numDroppedEvents(bus) === 0)

// If we post an additional message then it should remain in the queue because the listener is
// busy processing the first event:
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
assert(queueSize(bus) === 1)
assert(sharedQueueSize(bus) === 1)
assert(numDroppedEvents(bus) === 0)

// The queue is now full, so any additional events posted to the listener will be dropped:
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
assert(queueSize(bus) === 1)
assert(sharedQueueSize(bus) === 1)
assert(numDroppedEvents(bus) === 1)

// Allow the remaining events to be processed so we can stop the listener bus:
Expand Down

0 comments on commit 80b900a

Please sign in to comment.