-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-29148][CORE] Add stage level scheduling dynamic allocation and scheduler backend changes #27313
Conversation
…ler backend changes
Test build #117199 has finished for PR 27313 at commit
|
Test build #117203 has finished for PR 27313 at commit
|
test doesn't fail for me locally but its using a directory that could overlap with other tests so I'll fix that test to have unique directory for resources. |
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
Outdated
Show resolved
Hide resolved
Test build #117239 has finished for PR 27313 at commit
|
test this please |
core/src/main/scala/org/apache/spark/resource/ResourceUtils.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi Tom, I gave a pass over the changes - nice surgical changes !
Yet to go over the test suites though.
Had a few queries to better understand and mostly nits, ptal.
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
Outdated
Show resolved
Hide resolved
// Boost our target with the number to add for this round: | ||
numExecutorsTarget += numExecutorsToAdd | ||
numExecutorsTarget += numExecutorsToAddPerResourceProfileId.getOrElseUpdate(rpId, 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wondering why the update would be required - it should have been set to some value already right ?
(Trying to reconcile my mental model)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes it should be because we initialize the default profile to 1 at the beginning. I think I was just being cautious and just in case. The addExecutors can be called before any stage runs since you can have an initial number so if we hadn't initialized default profile it would be required.
I can remove if you would like?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was trying to see if I was missing something - or whether it is required for the entry to exist.
If we are sure, let us remove it
val rpId = getResourceProfileIdOfExecutor(executorIdToBeRemoved) | ||
if (rpId == UNKNOWN_RESOURCE_PROFILE_ID) { | ||
logWarning(s"Not removing executor $executorIdsToBeRemoved because couldn't find " + | ||
"ResourceProfile for it!") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: When testing, would be nice to assert this; we should not have this situation right ? Or it is possible ?
Do we support cleaning up of resource profiles ?
This actually brings me to a general question - if all rdd's which are referencing a resource profile have been gc'ed, do we also cleanup the cluster resources allocated through that resource profile ? (idle timeout should do this eventually).
What about references within our data structures for the profile to prevent leaks ? (We can do this in a future work ofcourse if the intention is to clean it - I want to understand if it is a possibility or whether the issue is not expected to happen).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so if events come out of order in the ExecutorMonitor we aren't sure what resource profile they are for, or if the executor monitor just doesn't know about the executor, in both of those cases its returns the UNKNOWN. When I wrote this code I was not expecting it to normally happen but actually now that I look more I think there is a possible race where the remove event could come at the same time as we calculate the timed out executors. The remove event would cause executor monitor to remove it from the list and then you would get UNKNOWN back. The calculation of timed out executors which calls into this removeExecutors takes a snapshot of the executors so it could return one that has later been removed. I think I'll modify this to pass the ResourceProfile in with the list of executors to remove, then we don't have to worry about that race. thanks for bringing this up
So currently we are not cleaning up if all resource profiles have been gc'ed. The executors will idle timeout - down to the minimum set. note I think you saw that discussion but we also want to make the initial, minimum, max configs settable per resource profile - but was going to do that in followon as well.
And many of the datastructures do not remove resource profiles because its not completely obvious when they are done being used. My thought was the number of ResourceProfiles is expected to be small, I tried to keep the datastructures memory usage small, and was thinking we can make this better in follow on jiras.
I was thinking the main initial use case was the ETL -> ML type case so was thinking you would only have a couple ResourceProfiles.
If you think its needed now, I can definitely start looking more into this. Just let me know your thoughts.
I filed https://issues.apache.org/jira/browse/SPARK-30749 and https://issues.apache.org/jira/browse/SPARK-30750 to specifically track doing that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note that on the datastructures we don't clean up, the idea was there would be pretty small, so just simple Int to Int or Int to empty set. If you see somewhere that we leak more then that please point it out.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Essentially where ever we are introducing resource profile, a Set[E] becomes a Map[Int, Set[E]] and an Int becomes a Map[Int, Int]. Under normal circumstances, I do not expect these to be bad.
But given that we dont have named resource profiles, I am not sure how it interacts with a loop. For example, in an ML loop :
val inputRdd = prepare()
while (condition) {
val computeRdd = needGpuResources(inputRdd.map().foo.bar)
...
}
Here, if I understood, for each iteration we will create a new resource profile. Depending on number of iterations, we could end up with an increasing memory usage (degenerate case - if this is used in streaming).
Note: this is not a regression - for default profile, there is no usage increase (other than a negligible increase).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Btw, agree about out of order - thx for the details.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Essentially where ever we are introducing resource profile, a Set[E] becomes a Map[Int, Set[E]] and an Int becomes a Map[Int, Int]. Under normal circumstances, I do not expect these to be bad.
But given that we dont have named resource profiles, I am not sure how it interacts with a loop. For example, in an ML loop :val inputRdd = prepare() while (condition) { val computeRdd = needGpuResources(inputRdd.map().foo.bar) ... }
Here, if I understood, for each iteration we will create a new resource profile. Depending on number of iterations, we could end up with an increasing memory usage (degenerate case - if this is used in streaming).
Note: this is not a regression - for default profile, there is no usage increase (other than a negligible increase).
So you would not create a new resource profile on just looping unless you are actually building it because you need different requirements on each iteration. You build it once and just re-use that profile.
val resourceProfileBuilder = new ResourceProfileBuilder()
val ereq = new ExecutorResourceRequests()
val treq = new TaskResourceRequests()
ereq.cores(2).memory("6g").memoryOverhead("2g")..resource("gpu", 1, "/home/tgraves/getGpus")
treq.cpus(2).resource("gpu", 1)
resourceProfileBuilder.require(ereq)
resourceProfileBuilder.require(treq)
val resourceProfile = resourceProfileBuilder.build()
val inputRdd = prepare()
while (condition) {
val computeRDD = inputRdd.map().withResources(resourceProfile).foo.bar)
....
}
In the example above, it creates a single ResourceProfile - it only has 1 id with it. The id is numeric at this point but its really no different then a name from a uniqueness point of view.
Or maybe your intention was that needGpuResources() function would add it and if that function was creating a profile each time then yes it would but I don't see how that is different whether you have a name or not. If the user does something inefficient all we can do is try to educate them. I guess the only difference is if they try to create the same profile with the same name - then you probably error but either way you went through all the work of constructing it.
The user should just do it outside the loop like my example.
but does come back to the point about memory, where it could start to add up if the user did something like that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the saner way to define it - definitely.
I am only worried about folks doing this within needGpuResources (in example above) - it could be coming from different libraries/teams.
With named profiles, this could either be reference to externally defined profile (by admin for example) or gets deduped into same if equality match (which should be the case here ...).
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
Outdated
Show resolved
Hide resolved
@@ -726,14 +894,23 @@ private[spark] class ExecutorAllocationManager( | |||
}) | |||
} | |||
|
|||
registerGauge("numberExecutorsToAdd", numExecutorsToAdd, 0) | |||
// The metrics are going to return the numbers for the default ResourceProfile. | |||
// It would be nice to do include each profile somehow in the future. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make these as aggregate across all resource profiles instead of only default ? (We can also add a gauge just for default if required).
For existing usecases, this will be exactly same behavior as 2.4 without resource profile - but for resource aware jobs, it will indicate actual pending counts and not just default (if no other rp, it is same as default - so no functional change in that case).
I agree, exposing for all profiles will be very useful when we do it in future - we will need to namespace it properly for that : that is why I really want to be able to 'name' resource profiles !
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes we can definitely aggregate, I went back and forth on whether to do all or just default. I'll update
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh wait so the reason I didn't was aggregates don't make sense in some of these cases. Number of executors to add for instance. really I think that is the only one. The rest we can aggregate.
I could rename that one to be numberExecutorsToAddForDefaultProfile. thoughts?
note that the number executors to add is simply the number its adding in that round (that is the exponential ramp up algo for asking for more), its not the total it needs to add.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Did we ever explicitly document what numberExecutorsToAdd means ? I did not see it in our doc, but I could have missed it - if we did document it, adhering to what that means would be ideal.
Assuming not, intutively I would expect it to be total outstanding requests. Without resource profiles, default == total, so no behavior change; but with resource profile, I would have expected numberExecutorsToAdd to mean total with a resourceProfile..numberExecutorsToAdd (or some such naming) to give individual values per profile. Thoughts ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No the definition is not documented. Not sure why it never was.
I personally think this metric is odd because you have to understand how the algorithm works for this number to make sense.
Its not total outstanding requests either, its the number to add on the next iteration (not the one you are asking for now) if you still need executors. And just because its the number to add the next iteration doesn't mean you are going to really ask for that many.
The numberTargetExecutors is what you are currently asking for.
I agree with you that doing totals here make sense and then in the future having per profile ones that add up to the total. In this case I think that number would be useless though.
Really I'm almost more in favor of removing it as I don't really see it being very useful as a metric. thoughts on that?
I'll update the PR to do a sum for all of them and then can change/remove based on your thoughts
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I saw references to it online, which is why i was curious about existing semantics.
If it is tied to impl detail, then I agree that removing it might be a good idea.
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
Outdated
Show resolved
Hide resolved
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
Show resolved
Hide resolved
thanks for the review @mridulm I think I responded to your questions, let me know what you think. I'll post updated patch after doing some testing for other things |
with the set of executors to avoid race. Fix up minor nits from reviews.
Test build #118004 has finished for PR 27313 at commit
|
Thanks for the changes @tgravescs ! Other than my general query about memory cleanup (which can be decoupled from this PR), this looks good to me. |
Test build #118182 has finished for PR 27313 at commit
|
test this please |
Test build #118247 has finished for PR 27313 at commit
|
tests run fine locally again looks like something intermittent or on jenkins side |
test this please |
Test build #118261 has finished for PR 27313 at commit
|
@mridulm thanks for the reviews. I'm going to go ahead and commit this based on your last comment. if a change is needed based on the open discussion we can do in the next pr or I'll just open a followup pr specifically for it. |
Thx for working on this Tom ! |
} | ||
if(!shouldCheckExecCores) { | ||
// if we can't rely on the executor cores config throw a warning for user | ||
logWarning("Please ensure that the number of slots available on your " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @tgravescs, I was investigating sudden warning message popped up in my local suddenly, presumably, after this fix. My Spark shell shows this warnings consistently in my local:
$ ./bin/spark-shell
...
20/02/18 11:04:56 WARN ResourceProfile: Please ensure that the number of slots available on your executors is
limited by the number of cores to task cpus and not another custom resource. If cores is not the limiting resource
then dynamic allocation will not work properly!
Do you have any idea?
@tgravescs, I wonder if we can split a PR into multiple small ones next time. It's a huge PR with 2k addition 1k deletion and seems very difficult to review. |
@@ -31,7 +31,7 @@ import org.apache.spark.annotation.Evolving | |||
* requirements between stages. | |||
*/ | |||
@Evolving |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: this mixes two concepts. Evolving
says it's an API but it's actually private.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is only private until the feature is completely in. I made this such that everything is ready except remove the private once the other prs that make it functional are in.
I do not want to remove Evolving now because it could be forgotten later.
…it's disabled ### What changes were proposed in this pull request? Currently, after #27313, it shows the warning about dynamic allocation which is disabled by default. ```bash $ ./bin/spark-shell ``` ``` ... 20/02/18 11:04:56 WARN ResourceProfile: Please ensure that the number of slots available on your executors is limited by the number of cores to task cpus and not another custom resource. If cores is not the limiting resource then dynamic allocation will not work properly! ``` This PR brings back the configuration checking for this warning. Seems mistakenly removed at https://github.com/apache/spark/pull/27313/files#diff-364713d7776956cb8b0a771e9b62f82dL2841 ### Why are the changes needed? To remove false warning. ### Does this PR introduce any user-facing change? Yes, it will don't show the warning. It's master only change so no user-facing to end users. ### How was this patch tested? Manually tested. Closes #27615 from HyukjinKwon/SPARK-29148. Authored-by: HyukjinKwon <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
@HyukjinKwon I already split this feature into multiple smaller jira/prs and have asked multiple times for feedback on both splitting it up and reviews. At some point it harder to split up as you don't get the context and its hard to find places to split it that make sense. Other ideas are welcome but its much more useful before hand. |
…uling ### What changes were proposed in this pull request? Yarn side changes for Stage level scheduling. The previous PR for dynamic allocation changes was #27313 Modified the data structures to store things on a per ResourceProfile basis. I tried to keep the code changes to a minimum, the main loop that requests just goes through each Resourceprofile and the logic inside for each one stayed very close to the same. On submission we now have to give each ResourceProfile a separate yarn Priority because yarn doesn't support asking for containers with different resources at the same Priority. We just use the profile id as the priority level. Using a different Priority actually makes things easier when the containers come back to match them again which ResourceProfile they were requested for. The expectation is that yarn will only give you a container with resource amounts you requested or more. It should never give you a container if it doesn't satisfy your resource requests. If you want to see the full feature changes you can look at https://github.com/apache/spark/pull/27053/files for reference ### Why are the changes needed? For stage level scheduling YARN support. ### Does this PR introduce any user-facing change? no ### How was this patch tested? Tested manually on YARN cluster and then unit tests. Closes #27583 from tgravescs/SPARK-29149. Authored-by: Thomas Graves <[email protected]> Signed-off-by: Thomas Graves <[email protected]>
… scheduler backend changes ### What changes were proposed in this pull request? This is another PR for stage level scheduling. In particular this adds changes to the dynamic allocation manager and the scheduler backend to be able to track what executors are needed per ResourceProfile. Note the api is still private to Spark until the entire feature gets in, so this functionality will be there but only usable by tests for profiles other then the DefaultProfile. The main changes here are simply tracking things on a ResourceProfile basis as well as sending the executor requests to the scheduler backend for all ResourceProfiles. I introduce a ResourceProfileManager in this PR that will track all the actual ResourceProfile objects so that we can keep them all in a single place and just pass around and use in datastructures the resource profile id. The resource profile id can be used with the ResourceProfileManager to get the actual ResourceProfile contents. There are various places in the code that use executor "slots" for things. The ResourceProfile adds functionality to keep that calculation in it. This logic is more complex then it should due to standalone mode and mesos coarse grained not setting the executor cores config. They default to all cores on the worker, so calculating slots is harder there. This PR keeps the functionality to make the cores the limiting resource because the scheduler still uses that for "slots" for a few things. This PR does also add the resource profile id to the Stage and stage info classes to be able to test things easier. That full set of changes will come with the scheduler PR that will be after this one. The PR stops at the scheduler backend pieces for the cluster manager and the real YARN support hasn't been added in this PR, that again will be in a separate PR, so this has a few of the API changes up to the cluster manager and then just uses the default profile requests to continue. The code for the entire feature is here for reference: https://github.com/apache/spark/pull/27053/files although it needs to be upmerged again as well. ### Why are the changes needed? Needed for stage level scheduling feature. ### Does this PR introduce any user-facing change? No user facing api changes added here. ### How was this patch tested? Lots of unit tests and manually testing. I tested on yarn, k8s, standalone, local modes. Ran both failure and success cases. Closes apache#27313 from tgravescs/SPARK-29148. Authored-by: Thomas Graves <[email protected]> Signed-off-by: Thomas Graves <[email protected]>
…it's disabled ### What changes were proposed in this pull request? Currently, after apache#27313, it shows the warning about dynamic allocation which is disabled by default. ```bash $ ./bin/spark-shell ``` ``` ... 20/02/18 11:04:56 WARN ResourceProfile: Please ensure that the number of slots available on your executors is limited by the number of cores to task cpus and not another custom resource. If cores is not the limiting resource then dynamic allocation will not work properly! ``` This PR brings back the configuration checking for this warning. Seems mistakenly removed at https://github.com/apache/spark/pull/27313/files#diff-364713d7776956cb8b0a771e9b62f82dL2841 ### Why are the changes needed? To remove false warning. ### Does this PR introduce any user-facing change? Yes, it will don't show the warning. It's master only change so no user-facing to end users. ### How was this patch tested? Manually tested. Closes apache#27615 from HyukjinKwon/SPARK-29148. Authored-by: HyukjinKwon <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
…uling ### What changes were proposed in this pull request? Yarn side changes for Stage level scheduling. The previous PR for dynamic allocation changes was apache#27313 Modified the data structures to store things on a per ResourceProfile basis. I tried to keep the code changes to a minimum, the main loop that requests just goes through each Resourceprofile and the logic inside for each one stayed very close to the same. On submission we now have to give each ResourceProfile a separate yarn Priority because yarn doesn't support asking for containers with different resources at the same Priority. We just use the profile id as the priority level. Using a different Priority actually makes things easier when the containers come back to match them again which ResourceProfile they were requested for. The expectation is that yarn will only give you a container with resource amounts you requested or more. It should never give you a container if it doesn't satisfy your resource requests. If you want to see the full feature changes you can look at https://github.com/apache/spark/pull/27053/files for reference ### Why are the changes needed? For stage level scheduling YARN support. ### Does this PR introduce any user-facing change? no ### How was this patch tested? Tested manually on YARN cluster and then unit tests. Closes apache#27583 from tgravescs/SPARK-29149. Authored-by: Thomas Graves <[email protected]> Signed-off-by: Thomas Graves <[email protected]>
* Note we never remove a resource profile at this point. Its expected this number if small | ||
* so this shouldn't be much overhead. | ||
*/ | ||
@Evolving |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Is @Evolving
needed since this is a private class?
What changes were proposed in this pull request?
This is another PR for stage level scheduling. In particular this adds changes to the dynamic allocation manager and the scheduler backend to be able to track what executors are needed per ResourceProfile. Note the api is still private to Spark until the entire feature gets in, so this functionality will be there but only usable by tests for profiles other then the DefaultProfile.
The main changes here are simply tracking things on a ResourceProfile basis as well as sending the executor requests to the scheduler backend for all ResourceProfiles.
I introduce a ResourceProfileManager in this PR that will track all the actual ResourceProfile objects so that we can keep them all in a single place and just pass around and use in datastructures the resource profile id. The resource profile id can be used with the ResourceProfileManager to get the actual ResourceProfile contents.
There are various places in the code that use executor "slots" for things. The ResourceProfile adds functionality to keep that calculation in it. This logic is more complex then it should due to standalone mode and mesos coarse grained not setting the executor cores config. They default to all cores on the worker, so calculating slots is harder there.
This PR keeps the functionality to make the cores the limiting resource because the scheduler still uses that for "slots" for a few things.
This PR does also add the resource profile id to the Stage and stage info classes to be able to test things easier. That full set of changes will come with the scheduler PR that will be after this one.
The PR stops at the scheduler backend pieces for the cluster manager and the real YARN support hasn't been added in this PR, that again will be in a separate PR, so this has a few of the API changes up to the cluster manager and then just uses the default profile requests to continue.
The code for the entire feature is here for reference: https://github.com/apache/spark/pull/27053/files although it needs to be upmerged again as well.
Why are the changes needed?
Needed for stage level scheduling feature.
Does this PR introduce any user-facing change?
No user facing api changes added here.
How was this patch tested?
Lots of unit tests and manually testing. I tested on yarn, k8s, standalone, local modes. Ran both failure and success cases.