[SPARK-29149][YARN] Update YARN cluster manager For Stage Level Scheduling #27583

Closed
wants to merge 42 commits

Conversation

@tgravescs (Contributor) commented Feb 14, 2020

What changes were proposed in this pull request?

YARN-side changes for stage-level scheduling. The previous PR, for the dynamic allocation changes, was #27313

Modified the data structures to store things on a per-ResourceProfile basis.
I tried to keep the code changes to a minimum; the main request loop just iterates over each ResourceProfile, and the logic inside for each one stays very close to what it was.
On submission we now have to give each ResourceProfile a separate YARN Priority, because YARN doesn't support asking for containers with different resources at the same Priority. We just use the profile id as the priority level.
Using a different Priority per profile actually makes it easier, when the containers come back, to match them to the ResourceProfile they were requested for.
The expectation is that YARN will only give you a container with the resource amounts you requested, or more. It should never give you a container that doesn't satisfy your resource requests.

If you want to see the full feature changes you can look at https://github.com/apache/spark/pull/27053/files for reference
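For readers unfamiliar with the mapping described above, here is a rough sketch (not code from this PR; the variable and method names are made up, and only the Hadoop AMRMClient, Priority, Resource, and Container APIs are assumed) of requesting containers per ResourceProfile and matching them back by priority:

import org.apache.hadoop.yarn.api.records.{Container, Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

// Sketch: reuse the ResourceProfile id as the YARN priority level, since YARN
// rejects requests for different container sizes at the same Priority.
def requestFor(rpId: Int, memoryMb: Int, cores: Int): ContainerRequest = {
  val capability = Resource.newInstance(memoryMb, cores)
  // nodes/racks are left unconstrained in this sketch
  new ContainerRequest(capability, null, null, Priority.newInstance(rpId))
}

// When containers come back, the priority tells us which profile they were requested for.
def resourceProfileIdOf(container: Container): Int =
  container.getPriority.getPriority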

Why are the changes needed?

To add YARN support for stage-level scheduling.

Does this PR introduce any user-facing change?

no

How was this patch tested?

Tested manually on a YARN cluster, plus unit tests.

@tgravescs

General question about priority; I did not find much at [1].
How is the value of priority interpreted?
Is it simply used to "tag" requests?
Or are higher priority requests prioritized over lower priority requests from an application (to a queue)?

How does it compare with [2]? Will that be cleaner (using tags)?

[1] https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-api/apidocs/org/apache/hadoop/yarn/api/records/Priority.html

[2] https://hadoop.apache.org/docs/current/hadoop-yarn/hadoop-yarn-api/apidocs/org/apache/hadoop/yarn/api/records/SchedulingRequest.html

I don't think the Priority is documented very well at all. We ran into this issue with Tez, where you can't have different container sizes within the same Priority. A priority is as it sounds: higher priorities get allocated first. For Spark I don't think this matters, since we finish a stage before proceeding to the next. If we had a slow-start feature like MapReduce, then it would matter. It does mean that if you have two stages with different ResourceProfiles running at the same time, one stage's containers would be prioritized over the other's, but again I don't think that is an issue. If you can think of a case where it would be, let me know. There is actually a way to get around using different priorities, but you have to turn on a feature in YARN, like tags. Since that is an optional feature I didn't want to rely on it, and I didn't see any issues with using the Priority.

I haven't looked at the SchedulingRequest in detail, but it's more about placement and gang scheduling - https://issues.apache.org/jira/browse/YARN-6592. That is definitely something interesting, but I would prefer to do it separately from this, unless you see an issue with the Priority? I can look at it more to see if it would get around having to use Priority, but the SchedulingRequest itself also has a priority, though with a separate resource sizing. I would almost bet it has the same restriction, but maybe it's using the tags to get around this.

@tgravescs

I guess the one case I can think of is if you are running Spark in a job-server scenario: the priorities could favor certain jobs more if they used ResourceProfiles vs. the default profile. I think we could document this for now.

@tgravescs

Note that YARN-6592 only went into Hadoop 3.1.0, so it wouldn't work for older versions, which might go back to your version question.

@mridulm commented Feb 25, 2020

I was not advocating for SchedulingRequest; I just wanted to understand whether the requirement matched what is supported by SchedulingRequest (though it was probably designed for something else, conceptually it seemed to apply based on my cursory read).

Given the lack of availability in earlier Hadoop versions, we can punt on using SchedulingRequest - something we can look at in the future when the minimum Hadoop version changes.

About priority - given it has scheduling semantics associated with it, I was not sure if overloading it would be a problem. I had not thought about the job-server use case - that is an excellent point!
Given this, do we want to change the priority of the default profile to a very high value? Otherwise all resource profiles will have a higher priority than the default?

@tgravescs

@dongjoon-hyun Sorry to bug you again, but a similar question here: how do I rerun the checks? I clicked on Details but I don't have any "rerun" button. I'm logged in with my GitHub Apache account. Do I need permissions, or am I logged in wrong?

@tgravescs

Sure, we can make the default profile the highest priority. I put a note in the documentation JIRA as well to make sure to document the behavior.

@tgravescs commented Feb 25, 2020

Sorry, I forgot: the default profile is actually already the highest priority. In YARN lower numbers are higher priority, and the default profile has id 0. So my example above is wrong; a job server would favor the default profile over the custom ones, but that seems fine as default behavior and we can document it for now.
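As a tiny sketch of that ordering (not code from the PR; only the Hadoop Priority API is assumed):

import org.apache.hadoop.yarn.api.records.Priority

// In YARN a numerically lower priority value is scheduled first, so reusing the
// ResourceProfile id as the priority level means the default profile (id 0)
// outranks any custom profile (id >= 1).
val defaultProfilePriority = Priority.newInstance(0) // default profile, highest priority
val customProfilePriority  = Priority.newInstance(3) // a custom profile, lower priority
assert(defaultProfilePriority.getPriority < customProfilePriority.getPriority)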

@tgravescs

Updated the locking to use synchronized everywhere and removed the concurrent structures, since most of them were only being used by the metrics system and things have changed since they were originally added. I also moved some things around, trying to put them in sections that are easier to read; if that is too confusing I can move things back.

@tgravescs

Note that most accesses are synchronized in allocateResources; the other places are separately synchronized and called either from ApplicationMasterSource or AMEndpoint.

@SparkQA commented Feb 25, 2020

Test build #118928 has finished for PR 27583 at commit f9c1a05.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@mridulm (Contributor) left a comment

Looks much cleaner. Can you please make sure that all variables are appropriately locked before access/update? It looks like build-time validation is not enabled/catching those. Please let me know if I am missing something!


def getNumReleasedContainers: Int = releasedContainers.size()
def getNumExecutorsStarting: Int = {

Synchronized on this? I was expecting static analysis via @GuardedBy to catch this in the build; apparently we don't have that validation.
Can you also check the use of some of the other variables as well? targetNumExecutorsPerResourceProfileId, etc. also seems to have similar issues.
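For context, a minimal sketch of the @GuardedBy pattern being referred to (hypothetical class and field names; javax.annotation.concurrent.GuardedBy is assumed to be the annotation in question):

import javax.annotation.concurrent.GuardedBy

// Sketch: @GuardedBy("this") documents that the field must only be touched while
// holding the object's monitor; nothing enforces it at compile time unless a
// static-analysis checker is wired into the build.
class AllocatorStateSketch {
  @GuardedBy("this")
  private var numExecutorsStarting: Int = 0

  def getNumExecutorsStarting: Int = synchronized { numExecutorsStarting }

  def incrementStarting(): Unit = synchronized { numExecutorsStarting += 1 }
}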

@tgravescs commented Feb 27, 2020

I went through all the variables; they are all protected via a higher-up call. We can add more synchronized blocks if we want to nest them (re-entrant), just to make it more readable?
For instance, this one is only called from allocateResources, which is synchronized, and that is the case with most of these.

@tgravescs commented Feb 27, 2020

I went ahead and added more synchronized calls in each function where those variables are touched. I believe re-entrant synchronized is cheap, so it shouldn't add much overhead and it helps with readability and future breakages. If this is not what you intended, let me know.
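A minimal sketch of why the nested calls are cheap (hypothetical class and method names, not code from the PR):

// JVM intrinsic locks are re-entrant, so a synchronized method calling another
// synchronized method on the same object re-acquires a lock it already holds
// instead of blocking, which is inexpensive when uncontended.
class ReentrantLockingSketch {
  private var targetExecutors = 0 // guarded by this

  def allocateResources(): Unit = synchronized {
    // Re-enters the same monitor; no deadlock, little extra cost.
    updateTarget(5)
  }

  def updateTarget(n: Int): Unit = synchronized {
    targetExecutors = n
  }
}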

make things more readable. This doesn't change the actual locking, because all of these places were already synchronized by the calling functions.
@SparkQA commented Feb 27, 2020

Test build #119042 has finished for PR 27583 at commit 9e79f1a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs

I introduced a bug; fixing it.

@tgravescs

@dongjoon-hyun can you kick off the checks again? And how do I get permissions - I don't see any rerun buttons?

@SparkQA commented Feb 28, 2020

Test build #119056 has finished for PR 27583 at commit 14b6251.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Feb 28, 2020

Test build #119065 has finished for PR 27583 at commit bd3509c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -336,7 +338,7 @@ private[yarn] class YarnAllocator(
     val resource = Resource.newInstance(totalMem, cores)
     ResourceRequestHelper.setResourceRequests(customResources.toMap, resource)
     logDebug(s"Created resource capability: $resource")
-    rpIdToYarnResource(rp.id) = resource
+    rpIdToYarnResource.putIfAbsent(rp.id, resource)

Can there be a race such that rp.id is already present in the map?
And if there is, should we be overwriting it here?


No, not at the moment anyway; this function is synchronized and nowhere else adds to it, so only one can run at a time. I put in putIfAbsent but it doesn't really matter: ResourceProfile ids are unique and ResourceProfiles are immutable. Even if this code ran in multiple threads at the same time the result would be exactly the same, so we would put the same thing in twice and it wouldn't matter which one got inserted first.
Strictly speaking it doesn't need to be a concurrent hash map, due to the locking of the calling functions, but to be more strict about it and to help with future changes I made it one.
If you think it's clearer one way or the other, let me know and I can modify it.
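A standalone sketch of the idempotent-insert point made above (example container sizing, not taken from the PR):

import java.util.concurrent.ConcurrentHashMap
import org.apache.hadoop.yarn.api.records.Resource

// putIfAbsent only writes when the key is missing, so even if two threads raced,
// the first insert wins and later ones are no-ops. Because the Resource built for
// a given ResourceProfile id is always the same, either outcome is equivalent.
val rpIdToYarnResource = new ConcurrentHashMap[Int, Resource]()
val resource = Resource.newInstance(4096, 2)
rpIdToYarnResource.putIfAbsent(0, resource) // inserts, returns null
rpIdToYarnResource.putIfAbsent(0, resource) // no-op, key already present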


We changed rpIdToYarnResource to ConcurrentHashMap in commit e89a8b5 above, from mutable.HashMap ... I wanted to make sure this was only for concurrent reads, and not for writes which might insert keys here in parallel.

@mridulm commented Feb 28, 2020

Looks good to me, just had a minor query.

@tgravescs

test this please

@SparkQA commented Feb 28, 2020

Test build #119093 has finished for PR 27583 at commit bd3509c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@asfgit closed this in 0e2ca11 on Feb 28, 2020
@tgravescs

Thanks @mridulm, I appreciate the reviews. Merged this to master.

sjincho pushed a commit to sjincho/spark that referenced this pull request Apr 15, 2020