[SPARK-2933] [yarn] Refactor and cleanup Yarn AM code. #2020
Conversation
Man, I just merged with master and there's already a conflict... will update shortly.
This change modifies the Yarn module so that all the logic related to running the ApplicationMaster is localized. Instead of the previous 4 different classes with mostly identical code, we now have:
- A single, shared ApplicationMaster class, which can operate both in client and cluster mode, and substitutes the old ApplicationMaster (for cluster mode) and ExecutorLauncher (for client mode). The benefit here is that all different execution modes for all supported yarn versions use the same shared code for monitoring executor allocation, setting up configuration, and monitoring the process's lifecycle.
- A new YarnRMClient interface, which defines basic RM functionality needed by the ApplicationMaster. This interface has concrete implementations for each supported Yarn version.
- A new YarnAllocator interface, which just abstracts the existing interface of the YarnAllocationHandler class. This is to avoid having to touch the allocator code too much in this change, although it might benefit from a similar effort in the future.

The end result is code that is much easier to understand, with much less duplication, making it much easier to fix bugs, add features, and test everything knowing that all supported versions will behave the same.
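A rough sketch of the two new seams described above (the trait names come from the description; the method signatures are illustrative assumptions, not the exact ones in the patch):

```scala
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus

// One concrete YarnRMClient per supported Yarn version (alpha and stable).
trait YarnRMClient {
  // Register this attempt's AM with the RM and obtain an allocator for it.
  def register(uiAddress: String): YarnAllocator
  // Unregister the AM, reporting the final status back to the RM.
  def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit
}

// Thin abstraction over the existing YarnAllocationHandler.
trait YarnAllocator {
  def allocateResources(): Unit
  def getNumExecutorsRunning: Int
  def getNumExecutorsFailed: Int
}
```

With this split, the shared ApplicationMaster only talks to these two traits, and the version-specific details stay inside the alpha and stable implementations.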
Made some tweaks to the YarnAllocator interface to cover both APIs more easily. There's still a lot of cleanup possible on that front, but I'll leave that as a separate task.
This avoids the NPEs that would happen if code just kept going.
Makes code a little cleaner and easier to follow.
QA tests have started for PR 2020 at commit
I tried to keep the allocator changes minimal; a lot of it is just removing dead code and moving shared code to a shared place, but I did make some changes to make the alpha and stable interfaces work similarly. @tgravescs, it would be great if you guys could test the alpha changes; they compile, but I don't have a 0.23.x cluster to test them on. That would be greatly appreciated! I tested both client and cluster mode in stable yarn (CDH 5.1 = 2.3.0), including killing the AM and killing executors while jobs were running.
I'll take a look and hopefully try it out tomorrow. You didn't happen to change the client mode to be an unmanaged AM, did you?
QA tests have finished for PR 2020 at commit
No, I don't believe I changed anything in that regard. Goal was to keep all the current functionality intact.
Jenkins, test this please.
QA tests have started for PR 2020 at commit
QA tests have finished for PR 2020 at commit
resourceManager: AMRMProtocol,
appAttemptId: ApplicationAttemptId,
args: ApplicationMasterArguments,
map: collection.Map[String, collection.Set[SplitInfo]])
It might be nice to have a more descriptive name than map.
I took one pass through this and made some minor comments. I also ran a few small tests on 0.23 and they worked ok. I want to take another more detailed pass over this though.
For some reason github isn't letting me comment on the files anymore, so here are a couple more. allocateExecutors() isn't being called.
Conflicts:
  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster
 import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
 import org.apache.spark.{SparkException, Logging, SparkContext}
-import org.apache.spark.deploy.yarn.{Client, ClientArguments, ExecutorLauncher, YarnSparkHadoopUtil}
+import org.apache.spark.deploy.yarn.{ApplicationMaster, Client, ClientArguments, YarnSparkHadoopUtil}
I don't see that ApplicationMaster is used here.
QA tests have started for PR 2020 at commit
QA tests have finished for PR 2020 at commit
}
}

// Note: this needs to happen before allocateExecutors.
nit: this comment is not valid anymore.
Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse(""))
finished = true
finalStatus = status
reporterThread.interrupt()
This is being called from the reporterThread in the checkNumExecutorsFailed case, so interrupting that thread prevents it from finishing its cleanup (e.g. setting the diagnostics message and calling sc.stop()).
On hadoop 0.23 this results in the application hanging.
Hmmm. Works fine in stable, and interrupting "self" should be ok in general. But I guess it's safer to not do it (and avoid the sleep in the reporter thread when the AM is finished).
I was testing on stable too, and it didn't report the diagnostics properly or call sc.stop().
FYI - the way I tested this was to use a 32-bit JDK but try to allocate executors with >4G memory.
Fixed and re-tested (and re-merged). I generally run this in a loop to try this out:
jps | grep CoarseGrained | awk '{print $1}' | xargs kill
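For context, the squashed commit list further below includes "Do not interrupt reporter thread from within itself". A minimal, self-contained sketch of that idea, with field and thread names assumed from the snippet above rather than taken from the actual patch:

```scala
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus

class ReporterSketch {
  @volatile private var finished = false
  @volatile private var finalStatus = FinalApplicationStatus.UNDEFINED

  private val reporterThread: Thread = new Thread("Reporter") {
    override def run(): Unit = {
      while (!finished) {
        // ... poll the allocator, check the failed-executor count, etc. ...
        try Thread.sleep(1000) catch { case _: InterruptedException => () }
      }
      // Cleanup (reporting diagnostics, sc.stop(), ...) happens after the loop,
      // so it must not be skipped when finish() is called from this thread.
    }
  }

  def finish(status: FinalApplicationStatus): Unit = {
    finished = true
    finalStatus = status
    // Do not interrupt the reporter thread from within itself; it will notice
    // `finished` on its own and fall through to its cleanup code.
    if (Thread.currentThread() ne reporterThread) {
      reporterThread.interrupt()
    }
  }
}
```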
Other than the one comment, I think this looks good.
Conflicts:
  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
QA tests have started for PR 2020 at commit
QA tests have finished for PR 2020 at commit
Jenkins, test this please.
QA tests have started for PR 2020 at commit
QA tests have finished for PR 2020 at commit
Ok, this looks good. +1. Thanks for taking this on, @vanzin!
I committed this just to master.
This change modifies the Yarn module so that all the logic related to running the ApplicationMaster is localized. Instead of, previously, 4 different classes with mostly identical code, now we have:
- A single, shared ApplicationMaster class, which can operate both in client and cluster mode, and substitutes the old ApplicationMaster (for cluster mode) and ExecutorLauncher (for client mode). The benefit here is that all different execution modes for all supported yarn versions use the same shared code for monitoring executor allocation, setting up configuration, and monitoring the process's lifecycle.
- A new YarnRMClient interface, which defines basic RM functionality needed by the ApplicationMaster. This interface has concrete implementations for each supported Yarn version.
- A new YarnAllocator interface, which just abstracts the existing interface of the YarnAllocationHandler class. This is to avoid having to touch the allocator code too much in this change, although it might benefit from a similar effort in the future.

The end result is much easier to understand code, with much less duplication, making it much easier to fix bugs, add features, and test everything knowing that all supported versions will behave the same.

Author: Marcelo Vanzin <[email protected]>

Closes apache#2020 from vanzin/SPARK-2933 and squashes the following commits:

3bbf3e7 [Marcelo Vanzin] Merge branch 'master' into SPARK-2933
ff389ed [Marcelo Vanzin] Do not interrupt reporter thread from within itself.
3a8ed37 [Marcelo Vanzin] Remote stale comment.
0f5142c [Marcelo Vanzin] Review feedback.
41f8c8a [Marcelo Vanzin] Fix app status reporting.
c0794be [Marcelo Vanzin] Correctly clean up staging directory.
92770cc [Marcelo Vanzin] Merge branch 'master' into SPARK-2933
ecaf332 [Marcelo Vanzin] Small fix to shutdown code.
f02d3f8 [Marcelo Vanzin] Merge branch 'master' into SPARK-2933
f581122 [Marcelo Vanzin] Review feedback.
557fdeb [Marcelo Vanzin] Cleanup a couple more constants.
be6068d [Marcelo Vanzin] Restore shutdown hook to clean up staging dir.
5150993 [Marcelo Vanzin] Some more cleanup.
b6289ab [Marcelo Vanzin] Move cluster/client code to separate methods.
ecb23cd [Marcelo Vanzin] More trivial cleanup.
34f1e63 [Marcelo Vanzin] Fix some questionable error handling.
5657c7d [Marcelo Vanzin] Finish app if SparkContext initialization times out.
0e4be3d [Marcelo Vanzin] Keep "ExecutorLauncher" as the main class for client-mode AM.
91beabb [Marcelo Vanzin] Fix UI filter registration.
8c72239 [Marcelo Vanzin] Trivial cleanups.
99a52d5 [Marcelo Vanzin] Changes to the yarn-alpha project to use common AM code.
848ca6d [Marcelo Vanzin] [SPARK-2933] [yarn] Refactor and cleanup Yarn AM code.
This is part of a broader effort to clean up the Yarn integration code after #2020. The high-level changes in this PR include:
- Removing duplicate code, especially across the alpha and stable APIs
- Simplifying unnecessarily complex method signatures and hierarchies
- Renaming unclear variable and method names
- Organizing the logging output produced when the user runs Spark on Yarn
- Extensively adding documentation
- Privatizing classes where possible

I have tested the stable API on a Hadoop 2.4 cluster. I tested submitting a jar that references classes in other jars in both client and cluster mode. I also made changes in the alpha API, though I do not have access to an alpha cluster. I have verified that it compiles, but it would be ideal if others can help test it. For those interested in some examples in detail, please read on.

***Appendix***
- The loop to `getApplicationReport` from the RM is duplicated in 4 places: in the stable `Client`, the alpha `Client`, and twice in `YarnClientSchedulerBackend`. We should not have different loops for client and cluster deploy modes (a sketch of what a shared loop might look like follows the commit list below).
- There are many fragmented small helper methods that are only used once and should just be inlined. For instance, `ClientBase#getLocalPath` returns `null` on certain conditions, and its only caller, `ClientBase#addFileToClasspath`, checks whether the value returned is `null`. We could just have the caller check that same condition to avoid passing `null`s around.
- In `YarnSparkHadoopUtil#addToEnvironment`, we take in an argument `classpathSeparator` that always has the same value upstream (i.e. `File.pathSeparator`). This argument is now removed from the signature and all callers of this method upstream.
- `ClientBase#copyRemoteFile` is now renamed to `copyFileToRemote`. It was unclear whether we were copying a remote file to our local file system, or copying a locally visible file to a remote file system. Also, the content of the method had inaccurately named variables: we used `val remoteFs` to signify the file system of the locally visible file and `val fs` to signify the remote, destination file system. These are now renamed `srcFs` and `destFs` respectively.
- We currently log the AM container's environment and resource mappings directly as Scala collections. This is incredibly hard to read and probably too verbose for the average Spark user. In other modes (e.g. standalone), we also don't log the launch commands by default, so the logging level of this information is now set to `DEBUG`.
- None of these classes (`Client`, `ClientBase`, `YarnSparkHadoopUtil` etc.) is intended to be used by a Spark application (the user should go through Spark submit instead). At the very least they should be `private[spark]`.
Author: Andrew Or <[email protected]>

Closes #2350 from andrewor14/yarn-cleanup and squashes the following commits:

39e8c7b [Andrew Or] Address review comments
6619f9b [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-cleanup
2ca6d64 [Andrew Or] Improve logging in application monitor
a3b9693 [Andrew Or] Minor changes
7dd6298 [Andrew Or] Simplify ClientBase#monitorApplication
547487c [Andrew Or] Provide default values for null application report entries
a0ad1e9 [Andrew Or] Fix class not found error
1590141 [Andrew Or] Address review comments
45ccdea [Andrew Or] Remove usages of getAMMemory
d8e33b6 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-cleanup
ed0b42d [Andrew Or] Fix alpha compilation error
c0587b4 [Andrew Or] Merge branch 'master' of github.com:apache/spark into yarn-cleanup
6d74888 [Andrew Or] Minor comment changes
6573c1d [Andrew Or] Clean up, simplify and document code for setting classpaths
e4779b6 [Andrew Or] Clean up log messages + variable naming in ClientBase
8766d37 [Andrew Or] Heavily add documentation to Client* classes + various clean-ups
6c94d79 [Andrew Or] Various cleanups in ClientBase and ClientArguments
ef7069a [Andrew Or] Clean up YarnClientSchedulerBackend more
6de9072 [Andrew Or] Guard against potential NPE in debug logging mode
fabe4c4 [Andrew Or] Reuse more code in YarnClientSchedulerBackend
3f941dc [Andrew Or] First cut at simplifying the Client (stable and alpha)
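As mentioned in the appendix above, the duplicated `getApplicationReport` loops can be consolidated into one shared polling loop. A simplified sketch against the stable `YarnClient` API (an illustration only; the real `monitorApplication` also handles logging, polling intervals, and more report fields):

```scala
import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient

object MonitorSketch {
  // Poll the RM until the application reaches a terminal state, then return it.
  def monitorApplication(yarnClient: YarnClient, appId: ApplicationId): YarnApplicationState = {
    var state = yarnClient.getApplicationReport(appId).getYarnApplicationState
    while (state != YarnApplicationState.FINISHED &&
           state != YarnApplicationState.FAILED &&
           state != YarnApplicationState.KILLED) {
      Thread.sleep(1000)
      state = yarnClient.getApplicationReport(appId).getYarnApplicationState
    }
    state
  }
}
```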
### What changes were proposed in this pull request?
Currently, if any "Xmx" string is present in the driver java options, even if it is not related to the max heap setting, the job submission fails. For example, this command failed:
`./bin/spark-submit --class org.apache.spark.examples.SparkPi --conf "spark.driver.extraJavaOptions=-Dtest=Xmx" examples/jars/spark-examples_2.12-3.4.1.jar 100`

### Why are the changes needed?
The command should only fail if the -Xmx argument is actually set, not if "Xmx" is part of another parameter or string.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added a unit test

Closes apache#41806 from ashangit/nfraison/SPARK-44242.

Authored-by: Nicolas Fraison <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
Co-authored-by: Nicolas Fraison <[email protected]>
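The idea of the check is to reject the options only when `-Xmx` appears as an actual JVM flag, not merely as a substring of some other value. A hedged sketch of that idea (not necessarily how the actual patch implements it; the object name, pattern, and sample strings are illustrative):

```scala
// Only flag the options string as setting max heap when a token actually
// starts with "-Xmx" (e.g. "-Xmx4g"), not when "Xmx" merely appears inside
// another value such as "-Dtest=Xmx".
object XmxCheckSketch {
  private val XmxFlag = """(^|\s)-Xmx\S*""".r

  def setsMaxHeap(javaOptions: String): Boolean =
    XmxFlag.findFirstIn(javaOptions).isDefined

  def main(args: Array[String]): Unit = {
    assert(!setsMaxHeap("-Dtest=Xmx"))        // merely contains "Xmx": allowed
    assert(setsMaxHeap("-Xmx4g -Dtest=foo"))  // really sets max heap: rejected
  }
}
```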