[SPARK-22850][core] Ensure queued events are delivered to all event queues. #20039

Closed · wants to merge 2 commits
LiveListenerBus.scala

@@ -62,6 +62,9 @@ private[spark] class LiveListenerBus(conf: SparkConf) {

private val queues = new CopyOnWriteArrayList[AsyncEventQueue]()

// Visible for testing.
private[scheduler] var queuedEvents = new mutable.ListBuffer[SparkListenerEvent]()

/** Add a listener to queue shared by all non-internal listeners. */
def addToSharedQueue(listener: SparkListenerInterface): Unit = {
addToQueue(listener, SHARED_QUEUE)
@@ -124,13 +127,19 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
}

/** Post an event to all queues. */
def post(event: SparkListenerEvent): Unit = {
if (!stopped.get()) {
metrics.numEventsPosted.inc()
def post(event: SparkListenerEvent): Unit = synchronized {
if (stopped.get()) {
return
}
Reviewer: What's the difference between this and the original way

    if (!stopped.get()) {
      ...
    }

?

Contributor Author: It avoids having to indent the rest of the code.

Reviewer: Is synchronized needed only due to the modification of queuedEvents?

Contributor Author: No. It's needed because you have to make sure that while the buffered events are being posted to the queues in start(), this method cannot be called. Otherwise you need a contract that this class is not thread-safe until after start() is called.

Contributor Author: Let me see if I can rearrange things to avoid the extra synchronization...
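
To make the hazard concrete, here is a minimal, self-contained sketch of the interleaving the shared lock rules out. It is illustrative only, not the actual Spark source: MiniBus, the String events, and the delivered buffer are invented for the example.

    import java.util.concurrent.atomic.AtomicBoolean
    import scala.collection.mutable

    class MiniBus {
      private val started = new AtomicBoolean(false)
      // Mirrors queuedEvents above: holds events posted before start().
      private var queuedEvents = new mutable.ListBuffer[String]()
      private val delivered = new mutable.ListBuffer[String]()

      def post(event: String): Unit = synchronized {
        if (started.get()) {
          delivered += event     // queues are live: deliver directly
        } else {
          queuedEvents += event  // buffer until start() flushes
        }
      }

      def start(): Unit = synchronized {
        started.set(true)
        queuedEvents.foreach(delivered += _)  // flush under the same lock as post()
        queuedEvents = null                   // nothing may buffer after start()
      }
    }

Without the shared lock, post() could observe started == false, lose the CPU, have start() flush and null out queuedEvents, and then resume: the event would land in a dead buffer (or throw on the nulled field) and never reach a listener.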


metrics.numEventsPosted.inc()
if (started.get()) {
val it = queues.iterator()
while (it.hasNext()) {
it.next().post(event)
}
} else {
queuedEvents += event
}
}

@@ -149,7 +158,11 @@ private[spark] class LiveListenerBus(conf: SparkConf) {
}

this.sparkContext = sc
queues.asScala.foreach(_.start(sc))
queues.asScala.foreach { q =>
q.start(sc)
queuedEvents.foreach(q.post)
Member:

    q.start(sc)
    queuedEvents.foreach(q.post)

Hmm... in my opinion, swapping the order of these two lines would better follow the original logic, where events are buffered before a queue's start() is called. Posting queuedEvents to the queues before starting them would be logically consistent with that.

Contributor Author: That really does not make any difference in behavior.

Member: What if stop() is called before all of queuedEvents have been posted to the AsyncEventQueues?

    /**
     * Stop the listener bus. It will wait until the queued events have been processed, but new
     * events will be dropped.
     */

(The "queued events" mentioned in that description are not the same as the queuedEvents here.) Since queuedEvents are "posted" before the listeners are installed, can they be treated as new events?

Contributor: Both start() and stop() are synchronized, so they can't be interleaved.

Member: LiveListenerBus's stop() is not completely synchronized: the stopped variable can be set to true in stop() (before the synchronized body) while start() is running. Am I missing something?

Contributor: Sure, but the specific interleaving you were worried about isn't possible, because of the synchronized blocks.

Yes, you could get one line into start(), switch to stop() and get a few lines in... but then stop() gets blocked on start() anyway. Sure, that means that while start() is running we also have stopped == true... but so what? Further post() calls will be no-ops. start() will start the queues and post all buffered events, then release the lock, and stop() will stop all the queues.

Member: Okay, perfect explanation. Thanks.
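
That serialization argument is easy to model. The following hedged sketch (MiniLifecycle and the println calls are invented; this is not the Spark implementation) shows stop() flipping the stopped flag outside the lock, exactly as discussed, while the synchronized bodies still serialize the actual work:

    import java.util.concurrent.atomic.AtomicBoolean

    class MiniLifecycle {
      private val started = new AtomicBoolean(false)
      private val stopped = new AtomicBoolean(false)
      private var buffered = List("event-1", "event-2")

      def post(e: String): Unit = synchronized {
        if (!stopped.get()) {                        // no-op once stopped, as noted above
          if (started.get()) println(s"deliver $e")
          else buffered = buffered :+ e
        }
      }

      def start(): Unit = synchronized {
        started.set(true)
        buffered.foreach(e => println(s"deliver $e"))  // flush before releasing the lock
        buffered = Nil
      }

      def stop(): Unit = {
        stopped.set(true)   // may flip to true while start() still holds the lock...
        synchronized {      // ...but the shutdown itself waits for start() to finish
          println("queues stopped")
        }
      }
    }

If start() acquires the lock first, stop() blocks until both buffered events have been delivered, even though stopped is already true while start() runs.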

}
queuedEvents = null
metricsSystem.registerSource(metrics)
}
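
The net effect of the hunks above: events posted before start() are buffered in the bus and replayed into every queue at start(), so even queues created after the events were posted receive them. A hedged usage sketch, where counter, jobCompletionTime, mockSparkContext, and mockMetricsSystem stand in for the objects constructed in the test below:

    val bus = new LiveListenerBus(new SparkConf())

    // Posted before start(): recorded in queuedEvents instead of being lost.
    bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))

    // A listener added after the event was posted still receives it, because
    // start() replays queuedEvents into every registered queue.
    bus.addToSharedQueue(counter)
    bus.start(mockSparkContext, mockMetricsSystem)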

SparkListenerSuite.scala

@@ -48,7 +48,7 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
bus.metrics.metricRegistry.counter(s"queue.$SHARED_QUEUE.numDroppedEvents").getCount
}

private def queueSize(bus: LiveListenerBus): Int = {
private def sharedQueueSize(bus: LiveListenerBus): Int = {
bus.metrics.metricRegistry.getGauges().get(s"queue.$SHARED_QUEUE.size").getValue()
.asInstanceOf[Int]
}
@@ -73,12 +73,11 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
val conf = new SparkConf()
val counter = new BasicJobCounter
val bus = new LiveListenerBus(conf)
bus.addToSharedQueue(counter)

// Metrics are initially empty.
assert(bus.metrics.numEventsPosted.getCount === 0)
assert(numDroppedEvents(bus) === 0)
assert(queueSize(bus) === 0)
assert(bus.queuedEvents.size === 0)
assert(eventProcessingTimeCount(bus) === 0)

// Post five events:
@@ -87,17 +86,23 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
// Five messages should be marked as received and queued, but no messages should be posted to
// listeners yet because the listener bus hasn't been started.
assert(bus.metrics.numEventsPosted.getCount === 5)
assert(queueSize(bus) === 5)
assert(bus.queuedEvents.size === 5)

// Add the counter to the bus after messages have been queued for later delivery.
bus.addToSharedQueue(counter)
assert(counter.count === 0)

// Starting listener bus should flush all buffered events
bus.start(mockSparkContext, mockMetricsSystem)
Mockito.verify(mockMetricsSystem).registerSource(bus.metrics)
bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(counter.count === 5)
assert(queueSize(bus) === 0)
assert(sharedQueueSize(bus) === 0)
assert(eventProcessingTimeCount(bus) === 5)

// After the bus is started, there should be no more queued events.
assert(bus.queuedEvents === null)

// After listener bus has stopped, posting events should not increment counter
bus.stop()
(1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
@@ -188,18 +193,18 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match
// Post a message to the listener bus and wait for processing to begin:
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
listenerStarted.acquire()
assert(queueSize(bus) === 0)
assert(sharedQueueSize(bus) === 0)
assert(numDroppedEvents(bus) === 0)

// If we post an additional message then it should remain in the queue because the listener is
// busy processing the first event:
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
assert(queueSize(bus) === 1)
assert(sharedQueueSize(bus) === 1)
assert(numDroppedEvents(bus) === 0)

// The queue is now full, so any additional events posted to the listener will be dropped:
bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
assert(queueSize(bus) === 1)
assert(sharedQueueSize(bus) === 1)
assert(numDroppedEvents(bus) === 1)

// Allow the remaining events to be processed so we can stop the listener bus: