[SPARK-17759] [CORE] Avoid adding duplicate schedulables #15326
I've got some issues with this PR.
override def addSchedulable(schedulable: Schedulable) {
  require(schedulable != null)
  val name = schedulable.name
  if (null == schedulableNameToSchedulable.put(name, schedulable)) {
    schedulableQueue.add(schedulable)
  } else {
    logWarning(s"Duplicate Schedulable added: $name")
    // remove previously enqueued schedulable with same name
    schedulableQueue.remove(getSchedulableByName(name))
    if (!schedulableQueue.contains(schedulable)) {
      schedulableQueue.add(schedulable)
    }
  }
  schedulable.parent = this
}

The only routes to this code are from the one-time initialization of the backend on the creation of a SparkContext and from This is also the only route by which a This should pass all the existing tests; but if you do something more severe than just logging the warning (such as adding a
+1 to Mark's suggestion about putting the behavior in Pool::addSchedulable, so that it works across all Schedulables. I'm less convinced about maintaining the existing behavior where the last added pool wins. I think people usually expect first-wins behavior for these things (e.g., that's the case for Java classpath issues), and the code is also simpler for that approach. Given that this should be affecting pretty few people (and there's now a warning logged), I think this would be fine to change.
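To make the first-wins idea concrete, here is a minimal sketch. It is not Spark's `Pool`: `Item` and `SimplePool` are hypothetical stand-ins, and only the two collections mentioned in this thread (a name-keyed `ConcurrentHashMap` and a `ConcurrentLinkedQueue`) are modeled. Under those assumptions, `putIfAbsent` is enough to keep the first registration and skip later ones with the same name.

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}

object FirstWinsSketch {
  // Hypothetical stand-in for a Schedulable; only the name and a weight are modeled.
  final case class Item(name: String, weight: Int)

  // Hypothetical, simplified pool holding the same two collections discussed above.
  final class SimplePool {
    val queue = new ConcurrentLinkedQueue[Item]()
    val byName = new ConcurrentHashMap[String, Item]()

    // First-wins: putIfAbsent returns null only if the name was not registered yet.
    // Returns true when the item was added, false when it was a duplicate.
    def add(item: Item): Boolean = {
      require(item != null)
      if (byName.putIfAbsent(item.name, item) == null) {
        queue.add(item)
        true
      } else {
        // A real implementation would log a warning about the duplicate here.
        false
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val pool = new SimplePool
    pool.add(Item("default", 1))
    pool.add(Item("default", 5)) // skipped: "default" is already registered
    println(s"queue=${pool.queue.size}, map=${pool.byName.size}")
  }
}
```

With this shape the queue and the map cannot drift apart, because an item is enqueued only when its name was newly registered.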
@kayousterhout @rxin Ok, but if we're going to change the behavior, then we need to be sure that change at least makes it into the release notes.
@markhamstra if we do change it, we should prob merge only into master and not 2.0.1.
Yes, that is what I was thinking regardless.
...and it would be 2.0.2 at this point. :)
Firstly, thanks @markhamstra and @kayousterhout for the quick feedback. I was aware of putting the check in
and if there is a case that a
@erenavsarogullari Your concern was entirely legitimate, and is also why I called in @kayousterhout to double-check my claim that other duplicate Schedulables would also be a problem.
@erenavsarogullari @markhamstra I just looked at this further and I actually think this could be an issue: (1) The first attempt for a stage fails with a fetch failure. The associated TaskSetManager is marked as a zombie but some tasks are still running, so removeSchedulable isn't called yet (it gets called in TaskSchedulerImpl only after all running tasks in the stage have finished). (2) The map stage re-runs and a new attempt for the stage begins. This attempt has the same stage ID, so will have the same schedulable name. I remember having a long discussion about (1) a while ago (and when we should call removeSchedulable) and decided it was most "fair" to call it only after all running tasks complete, because running tasks should be counted towards the pool's share even when they're for zombie task sets. I think in this case, the last-schedulable-wins policy that currently exists seems better, although still wrong. I'd argue we should first fix this bug (by giving each TaskSetManager a unique name), in a separate PR, and then do the fix to this PR that you suggested, Mark. The other fix seems like it should be relatively simple. @markhamstra thoughts? Does that seem reasonable and do you buy the bug description above?
Thanks @kayousterhout and @markhamstra.
Hi @kayousterhout and @markhamstra, In light of our previous discussion, I committed a second patch as e126cd8. This PR's changeset looks OK, but it still needs unique
`TaskSetManager` should have a unique name to avoid adding duplicate ones to the parent `Pool` via `SchedulableBuilder`. This problem surfaced in the following discussion: [[PR: Avoid adding duplicate schedulables]](apache#15326)

**Proposal**: There is a 1:1 relationship between `stageAttemptId` and `TaskSetManager`, so `taskSet.id`, which covers both `stageId` and `stageAttemptId`, can be used to make the `TaskSetManager` name unique instead of just `stageId`.

**Current TaskSetManager Name**: `var name = "TaskSet_" + taskSet.stageId.toString`
**Sample**: TaskSet_0

**Proposed TaskSetManager Name**: `val name = "TaskSet_" + taskSet.id` `// taskSet.id = (stageId + "." + stageAttemptId)`
**Sample**: TaskSet_0.0

Added new Unit Test.

Author: erenavsarogullari <[email protected]>

Closes apache#15463 from erenavsarogullari/SPARK-17894.
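The naming change can be illustrated with a small sketch. `TaskSetInfo`, `oldName` and `newName` are hypothetical names introduced only for this example; the real `TaskSet` and `TaskSetManager` classes carry much more state. The sketch assumes only what the commit message states: the old name is built from the stage ID alone, and the proposed name is built from stage ID plus stage attempt ID.

```scala
object TaskSetNameSketch {
  // Hypothetical, simplified stand-in for a task set: just the two IDs.
  final case class TaskSetInfo(stageId: Int, stageAttemptId: Int) {
    // Mirrors the id format quoted in the commit message: stageId + "." + stageAttemptId.
    val id: String = s"$stageId.$stageAttemptId"
  }

  // Old scheme: the name depends on the stage ID only.
  def oldName(ts: TaskSetInfo): String = "TaskSet_" + ts.stageId.toString

  // Proposed scheme: the name depends on both stage ID and attempt ID.
  def newName(ts: TaskSetInfo): String = "TaskSet_" + ts.id

  def main(args: Array[String]): Unit = {
    val firstAttempt = TaskSetInfo(stageId = 0, stageAttemptId = 0)
    val retry = TaskSetInfo(stageId = 0, stageAttemptId = 1)
    // Under the old scheme a zombie first attempt and its retry share a name.
    println(oldName(firstAttempt) == oldName(retry))
    // Under the proposed scheme the two attempts get distinct names.
    println(newName(firstAttempt) == newName(retry))
  }
}
```

This is the collision from the zombie task set scenario earlier in the thread: two attempts of the same stage are alive at once and, under the old scheme, map to the same schedulable name.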
Hi @kayousterhout and @markhamstra, Related PR #15463 is merged and this PR is ready for review. Also, previous comments have been addressed with e126cd8ec51b11fed5ffaab376c6d4a451086cac. Thanks in advance.
Kind reminder :)
Hi @kayousterhout and @markhamstra,
@erenavsarogullari I looked at this again (sorry for the long delay), and it looks like you maintained the old behavior of last-added-wins. I thought from the discussion above, @markhamstra was ok with first-added-wins, which I think is more intuitive and consistent with other locations in the codebase. Did I miss something from above?
@erenavsarogullari what's the status of this PR?
@erenavsarogullari is this ready to be updated now that #16813 has been merged?
@kayousterhout, thanks for querying this PR.
Jenkins test this please
Hi @kayousterhout and @markhamstra,
Thanks.
Jenkins this is OK to test
val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler)
val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler)
schedulableBuilder.addTaskSetManager(taskSetManager0, null)
schedulableBuilder.addTaskSetManager(taskSetManager0, null)
When would this happen? (the same TSM getting added twice)
The related fix aims to avoid adding duplicate schedulables (Pool and TSM). However, a new TSM is created for each submitted TaskSet, and submitting the same TSM twice does not look like expected behaviour (although the logic is robust enough for this case), so these test cases can be removed for clarity.
val taskSetManager0 = createTaskSetManager(0, 2, taskScheduler)
val taskSetManager1 = createTaskSetManager(1, 2, taskScheduler)
schedulableBuilder.addTaskSetManager(taskSetManager0, null)
schedulableBuilder.addTaskSetManager(taskSetManager0, null)
similar to the above -- when could this happen?
Hi @erenavsarogullari, is it still active? If so, could you address the comments above? Otherwise, I would like to propose to close this.
Hi @HyukjinKwon, thanks for following this PR. I am busy for a while. The implementation is done and just the last comments need to be addressed. I will address them as well asap. Thanks ;)
Hi @HyukjinKwon, thanks for following this PR again. This looks required, but I am too busy for a while. The fix is already ready and I will address the last comments asap. Sorry for the delay again ;)
I will take this off the list. Thanks for your input.
Gentle ping @kayousterhout.
@HyukjinKwon what's the ping here for? It looks like I left some comments that @erenavsarogullari will address when he has time.
I am sorry, I misunderstood and thought it was almost (or already) ready. Will read the comments carefully next time.
Hi @kayousterhout,
Hi @kayousterhout,
Jenkins test this please
Hi All,
retest this please
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
## What changes were proposed in this pull request?
If `spark.scheduler.allocation.file` has duplicate pools, all of them are created when `SparkContext` is initialized but just one of them is used and the other ones look redundant. This causes redundant pool creation and needs to be fixed.

Code to Reproduce :

fairscheduler-duplicate-pools.xml :
The following sample just shows two default and duplicate_pool1 but this also needs to be thought for N default and/or other duplicate pools.

Debug Screenshot :
The following screenshots show `Pool.schedulableQueue(ConcurrentLinkedQueue[Schedulable])` has 4 pools as but `Pool.schedulableNameToSchedulable(ConcurrentHashMap[String, Schedulable])` has due to pool name as key so one of default and duplicate_pool1 look redundant and live in `Pool.schedulableQueue`.

## How was this patch tested?
Added new Unit Test case.
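
The redundancy described in the proposal can be reproduced in isolation with a short sketch. It assumes pools are registered with no duplicate check at all (every pool is enqueued and the map entry is overwritten); `NamedPool` and `register` are hypothetical names used only here, and the pool names are the two mentioned in the description.

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}

object DuplicatePoolsSketch {
  // Hypothetical stand-in for a scheduler pool; only the name is modeled.
  final class NamedPool(val name: String)

  // Unguarded registration (an assumption for illustration): always enqueue,
  // always overwrite the name-keyed map entry.
  // Returns (queue size, map size) after registering all names in order.
  def register(names: Seq[String]): (Int, Int) = {
    val queue = new ConcurrentLinkedQueue[NamedPool]()
    val byName = new ConcurrentHashMap[String, NamedPool]()
    names.foreach { n =>
      val pool = new NamedPool(n)
      queue.add(pool)
      byName.put(n, pool)
    }
    (queue.size, byName.size)
  }

  def main(args: Array[String]): Unit = {
    val sizes = register(Seq("default", "default", "duplicate_pool1", "duplicate_pool1"))
    println(s"queue size = ${sizes._1}, map size = ${sizes._2}")
  }
}
```

In this sketch the queue keeps every pool object while the map keeps one entry per name, so the extra queue entries are never reachable by name.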