[SPARK-22850][core] Ensure queued events are delivered to all event queues. #20039

Closed

vanzin
Contributor

@vanzin vanzin commented Dec 20, 2017

The code in LiveListenerBus was queueing events before start in the
queues themselves; so in situations like the following:

bus.post(someEvent)
bus.addToEventLogQueue(listener)
bus.start()

"someEvent" would not be delivered to "listener" if that was the first
listener in the queue, because the queue wouldn't exist when the
event was posted.

This change buffers the events before starting the bus in the bus itself,
so that they can be delivered to all registered queues when the bus is
started.

Also tweaked the unit tests to cover the behavior above.
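
For illustration, here is a minimal single-threaded sketch of the buffering approach described above. It is not the actual LiveListenerBus code: MiniBus, MiniQueue, addToQueue and the use of plain strings as events are hypothetical stand-ins chosen to keep the example short.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an event queue: it just records what it receives.
class MiniQueue(val name: String) {
  val delivered = mutable.ListBuffer.empty[String]
  def post(event: String): Unit = delivered += event
}

// Hypothetical, single-threaded model of a bus that buffers events itself
// until start() is called, instead of buffering them inside the queues.
class MiniBus {
  private val queues = mutable.LinkedHashMap.empty[String, MiniQueue]
  // Non-null only until start() runs.
  private var queuedEvents: mutable.ListBuffer[String] = mutable.ListBuffer.empty[String]

  def addToQueue(name: String): MiniQueue =
    queues.getOrElseUpdate(name, new MiniQueue(name))

  def post(event: String): Unit = {
    if (queuedEvents != null) queuedEvents += event // not started yet: buffer in the bus
    else queues.values.foreach(_.post(event))       // started: deliver directly
  }

  def start(): Unit = {
    if (queuedEvents != null) {
      // Replay the buffer to every queue registered so far, then drop the buffer.
      queues.values.foreach(q => queuedEvents.foreach(q.post))
      queuedEvents = null
    }
  }
}
```

With this shape, posting an event, then registering the first listener queue, then calling start() still delivers the event, because the queue only has to exist by the time start() runs.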

@SparkQA

SparkQA commented Dec 20, 2017

Test build #85206 has finished for PR 20039 at commit 80b900a.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@SparkQA

SparkQA commented Dec 21, 2017

Test build #85258 has finished for PR 20039 at commit 80b900a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

def post(event: SparkListenerEvent): Unit = synchronized {
  if (stopped.get()) {
    return
  }

What's the difference between this and the original way

if (!stopped.get()) {  
...
}

?

Contributor Author

It avoids having to indent the rest of the code.


Is synchronized needed only due to the modification of queuedEvents?

Contributor Author

No. It's needed because you have to make sure that when the buffered events are posted to the queues in start(), this method cannot be called. Otherwise you need to have a contract that this class is not thread-safe until after start() is called.

Contributor Author

Let me see if I can rearrange things to avoid the extra synchronization...

@squito
Contributor

squito commented Dec 21, 2017

this change looks fine -- but I'm having trouble connecting this to the original bug. I guess really I'm confused about how driver logs ever worked ... where is the SparkListenerExecutorAdded event for the driver?

@vanzin
Contributor Author

vanzin commented Dec 21, 2017

where is the SparkListenerExecutorAdded event for the driver?

There isn't one. There's just SparkListenerBlockManagerAdded. But the old executor list was based on block managers, not executor added / removed, so it included the driver.

@vanzin
Contributor Author

vanzin commented Dec 21, 2017

That failing test looks flaky. retest this please

@squito
Contributor

squito commented Dec 21, 2017

that explains how it gets into the list of executors in the UI -- but where does it get the link for the logs?

@vanzin
Contributor Author

vanzin commented Dec 21, 2017

#20038

@squito
Contributor

squito commented Dec 21, 2017

ahhhh, got it. sorry I misunderstood the comments on the jira ... I thought your second comment meant the original description was incorrect.

anyway -- lgtm

@SparkQA

SparkQA commented Dec 22, 2017

Test build #85285 has finished for PR 20039 at commit 80b900a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

queues.asScala.foreach(_.start(sc))
queues.asScala.foreach { q =>
  q.start(sc)
  queuedEvents.foreach(q.post)
Member

q.start(sc)
queuedEvents.foreach(q.post)

Ummm... In my opinion, swapping these two lines would better follow the original logic, in which events were buffered before a queue called start(). Posting queuedEvents to the queues first, before the queues start, would be logically consistent with that.

Contributor Author

That really does not make any difference in behavior.

Member

What if stop() is called before all queuedEvents have been posted to AsyncEventQueue?

/**
   * Stop the listener bus. It will wait until the queued events have been processed, but new
   * events will be dropped.
   */

(The "queued events" mentioned in the description above are not the same as "queuedEvents" here.)

Since queuedEvents are "posted" before the listeners are installed, can they be treated as new events?

Contributor

both start() and stop() are synchronized so they can't be interleaved.

Member

LiveListenerBus's stop() is not completely synchronized. Its stopped variable could be set to "true" in stop() (before the synchronized block) while start() is running. Am I missing something?

Contributor

sure, but the interleaving you were worried about specifically above isn't possible because of the synchronized blocks.

Yes, you could get one line into start(), switch to stop() and get a few lines in ... but then stop() gets blocked on start() anyway. Sure, that means that while start() is running, we also have stopped == true ... but so what? More post() calls will be no-ops. start() will start the queues and post all buffered events, then release the lock and stop() will stop all the queues.

Member

Okay, perfect explanation. Thanks.
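
The interleaving discussed in this thread can be modeled with a small sketch. This is an assumption-laden illustration, not the LiveListenerBus source: StartStopBus and its log field are hypothetical, events are plain strings, and "queues stopped" is only a marker entry standing in for stopping the real queues.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable

// Hypothetical model: start() runs entirely under the lock, while stop()
// flips its flag first and only then waits for the lock.
class StartStopBus {
  private val started = new AtomicBoolean(false)
  private val stopped = new AtomicBoolean(false)
  private val buffered = mutable.ListBuffer.empty[String]
  // Records deliveries and the stop marker, in order.
  val log = mutable.ListBuffer.empty[String]

  def post(event: String): Unit = synchronized {
    if (!stopped.get()) {
      if (started.get()) log += s"deliver:$event" else buffered += event
    }
    // Once stopped is true, post() is a no-op.
  }

  def start(): Unit = synchronized {
    if (started.compareAndSet(false, true)) {
      // Buffered events are flushed even if stopped was set concurrently.
      buffered.foreach(e => log += s"deliver:$e")
      buffered.clear()
    }
  }

  def stop(): Unit = {
    // The flag is flipped outside the lock, so it can become true while start() runs.
    if (stopped.compareAndSet(false, true)) {
      // This block cannot begin until a concurrent start() has released the lock.
      synchronized {
        log += "queues stopped"
      }
    }
  }
}
```

In this model, a stop() that begins while start() is running can only make later post() calls no-ops; the buffered events are still flushed before the stop marker is recorded.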

@SparkQA

SparkQA commented Dec 22, 2017

Test build #85317 has finished for PR 20039 at commit 2602fa6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// synchronization and post events directly to the queues. This should be the most
// common case during the life of the bus.
if (queuedEvents == null) {
  postToQueues(event)
Member

What if stop() is called after the null check and before the postToQueues() call?
Do you think we should check stopped.get() in postToQueues()?
Like:

private def postToQueues(event: SparkListenerEvent): Unit = {
  if (!stopped.get()) {
     val it = queues.iterator()
     while (it.hasNext()) {
        it.next().post(event)
     }
  }
}

Contributor

I don't think this would help at all. IIUC, you're saying that it's possible that a post() and a stop() are racing against each other, and we might post to a queue after it's been stopped. But that's OK -- the queues are prepared to deal with this.

It's also possible that during that race, the event makes it to one queue before that queue is stopped, but to another queue after it's stopped. Which means that final event only makes it to some queues. Again, I think that is fine, and your change wouldn't help anyway; that would still be possible.
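
To make the point above concrete, here is a hedged sketch of an unsynchronized fast path combined with queues that tolerate posts after they have stopped. FastPathBus and TolerantQueue are hypothetical names, events are plain strings, and the structure is an assumption based only on the snippet and comments in this thread, not a copy of the Spark code.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable

// Hypothetical queue that silently drops events once it has been stopped.
class TolerantQueue {
  private val stopped = new AtomicBoolean(false)
  private val received = mutable.ListBuffer.empty[String]
  def post(event: String): Unit = synchronized {
    if (!stopped.get()) received += event
  }
  def stop(): Unit = stopped.set(true)
  def events: List[String] = synchronized { received.toList }
}

class FastPathBus(queues: Seq[TolerantQueue]) {
  private val started = new AtomicBoolean(false)
  private val stopped = new AtomicBoolean(false)
  // Non-null only until start() runs; volatile so the unlocked read in post() sees the update.
  @volatile private var queuedEvents = mutable.ListBuffer.empty[String]

  def post(event: String): Unit = {
    if (!stopped.get()) {
      if (queuedEvents == null) {
        // Fast path: the bus has started, so no lock is taken.
        postToQueues(event)
      } else {
        // Slow path: re-check under the lock so start() cannot miss the event.
        val wasBuffered = synchronized {
          if (!started.get()) { queuedEvents += event; true } else false
        }
        if (!wasBuffered) postToQueues(event)
      }
    }
  }

  // No stopped check here: each queue decides for itself whether to accept the event.
  private def postToQueues(event: String): Unit = queues.foreach(_.post(event))

  def start(): Unit = synchronized {
    if (started.compareAndSet(false, true)) {
      queues.foreach(q => queuedEvents.foreach(q.post))
      queuedEvents = null
    }
  }

  def stop(): Unit = {
    if (stopped.compareAndSet(false, true)) synchronized { queues.foreach(_.stop()) }
  }
}
```

Stopping one queue by hand and then posting imitates the race: the event reaches the queue that is still running and is dropped by the one that has stopped. An extra stopped check inside postToQueues() would not prevent this, since the flag can still change between the check and the individual post() calls.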

asfgit pushed a commit that referenced this pull request Jan 4, 2018
Author: Marcelo Vanzin <[email protected]>

Closes #20039 from vanzin/SPARK-22850.

(cherry picked from commit d2cddc8)
Signed-off-by: Imran Rashid <[email protected]>
@squito
Contributor

squito commented Jan 4, 2018

merged to master / 2.3

@asfgit asfgit closed this in d2cddc8 Jan 4, 2018
@vanzin vanzin deleted the SPARK-22850 branch January 5, 2018 22:34
Willymontaz pushed a commit to criteo-forks/spark that referenced this pull request Sep 5, 2018