[SPARK-11035][core] Add in-process Spark app launcher. #19591
Conversation
This change adds a new launcher that allows applications to be run in a separate thread in the same process as the calling code. To achieve that, some code from the child process implementation was moved to abstract classes that implement the common functionality, and the new launcher inherits from those.

The new launcher was added as a new class, instead of being implemented as a new option on the existing SparkLauncher, to avoid ambiguous APIs. For example, SparkLauncher has ways to set the child app's environment, modify SPARK_HOME, or control the logging of the child process, none of which apply to in-process apps.

The in-process launcher has limitations: it needs Spark in the context class loader of the calling thread, and it's bound by Spark's current limitation of a single client-mode application per JVM. It also relies on the recently added SparkApplication trait to make sure different apps don't mess up each other's configuration, but currently no cluster manager client implements that.

I also chose to keep the same socket-based communication for in-process apps, even though it might be possible to avoid it for in-process mode. That helps both implementations share more code.

Tested with new and existing unit tests, and with a simple app that uses the launcher; also made sure the app ran fine with an older launcher jar to check binary compatibility.
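As context, a hypothetical usage sketch of the new class (the resource path and class name are placeholders; the builder methods mirror the existing SparkLauncher API, and Spark must be on the context class loader of the calling thread):

```java
import org.apache.spark.launcher.InProcessLauncher;
import org.apache.spark.launcher.SparkAppHandle;

// Runs the app in a thread inside the current JVM rather than a child process.
SparkAppHandle handle = new InProcessLauncher()
  .setAppResource("/path/to/app.jar")   // placeholder
  .setMainClass("com.example.MyApp")    // placeholder
  .setMaster("yarn")
  .setDeployMode("cluster")             // client mode is discouraged in-process
  .startApplication();
```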
Test build #83137 has finished for PR 19591 at commit
Really looking forward to this PR! For our use case, it will reduce our Spark launch times by ~4 seconds.

A note about the implementation: since this is executing SparkSubmit under the covers, it's possible to call the new InProcessLauncher and cause it to exit the current JVM because there's an error with the user-provided args. That's not optimal, but to avoid growing the current PR too much I'll leave the proper fix for that to a separate change.
Test build #83655 has finished for PR 19591 at commit
I'll leave this up a little longer to see if anyone volunteers to review, otherwise I'll ping some random people.
@tgravescs (who reviewed the original change for this bug), @srowen @jerryshao

ack will try to get to this tomorrow

Ping.

retest this please

Test build #84442 has finished for PR 19591 at commit

retest this please
Test build #84498 has finished for PR 19591 at commit
Test build #84629 has finished for PR 19591 at commit

Test build #84657 has finished for PR 19591 at commit

retest this please

Test build #84666 has finished for PR 19591 at commit

Looks like a legitimate flaky test. Will take a look.

Test build #84677 has finished for PR 19591 at commit

retest this please

Test build #84721 has finished for PR 19591 at commit

retest this please

Test build #84783 has finished for PR 19591 at commit

Failure looks unrelated... retest this please

Test build #84792 has finished for PR 19591 at commit
This PR is generally well-written and easy to understand. Just a single comment:

```java
if (builder.isClientMode(builder.getEffectiveConfig())) {
  LOG.warning("It's not recommended to run client-mode applications using InProcessLauncher.");
}
```
Maybe just a LOG.debug showing that an in-process app is being started would be useful.
You'll already get a ton of logs from SparkSubmit (or an exception if it doesn't run).
retest this please

@vanzin, sorry I've been swamped and haven't had a chance to get to this. Still on my list but can't guarantee a time frame.

Test build #84877 has finished for PR 19591 at commit
overall looks good, mostly I just have small questions to make sure I understand what is going on. Will do another pass tomorrow, need to look more closely at tests.
I assume by this:

> since this is executing SparkSubmit under the covers, it's possible to call the new InProcessLauncher and cause it to exit the current JVM because there's an error with the user-provided args. That's not optimal, but to avoid growing the current PR too much I'll leave the proper fix for that to a separate change.

you're talking about removing the System.exit() in SparkSubmit?
```java
class InProcessAppHandle extends AbstractAppHandle {

  private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());
  private static final ThreadFactory THREAD_FACTORY = new NamedThreadFactory("spark-app-%d");
```
How about the thread name including builder.appName? Might be useful if you are trying to monitor a bunch of threads? (Though you'd also be launching in cluster mode in that case, so the thread wouldn't be doing much ...)
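To illustrate the suggestion, a minimal self-contained sketch of a thread factory that folds an app name into the thread name (AppNamedThreadFactory and Demo are made-up names; the launcher's actual NamedThreadFactory takes only a fixed "spark-app-%d" format):

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical variant of NamedThreadFactory that includes the app name,
// so threads for different apps can be told apart in a thread dump.
class AppNamedThreadFactory implements ThreadFactory {
  private final String nameFormat;
  private final AtomicLong count = new AtomicLong();

  AppNamedThreadFactory(String appName) {
    this.nameFormat = "spark-app-" + appName + "-%d";
  }

  @Override
  public Thread newThread(Runnable r) {
    Thread t = new Thread(r, String.format(nameFormat, count.incrementAndGet()));
    t.setDaemon(true);
    return t;
  }
}

public class Demo {
  public static void main(String[] args) {
    Thread t = new AppNamedThreadFactory("MyApp").newThread(() -> {});
    System.out.println(t.getName());
  }
}
```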
```java
    return;
  }

  State currState = getState();
```
This was added just because currState is no longer accessible, right? You're not particularly trying to grab the state before the call to disconnect()? Might be clearer to move it after; otherwise, on first read it looks like it is trying to grab the state before it's modified by disconnect().
```java
LOG.warning("kill() may leave the underlying app running in in-process mode.");
disconnect();

// Interrupt the thread. This is not guaranteed to kill the app, though.
```
in cluster mode, shouldn't this be pretty safe? If not, wouldn't it be a bug in spark-submit?
It depends on the implementation of the client that does the submission, not spark-submit, but it should be safe and you could consider it a bug if it doesn't work.
sorry I don't understand. I don't see why the client that does the submission would matter. I thought you'd have problems if the interrupt was caught and swallowed by spark-submit.
SparkSubmit just runs some other class: in this case, the class that submits the app in cluster mode (or the user class in client mode). And that class could swallow these interrupts, just as SparkSubmit also could (but doesn't?).
ok, just a misunderstanding then. that is what I thought I was saying in the first place.
anyway, I guess we can leave the warning here for now in all cases and see if we get any reports from users in cluster mode ...
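The concern can be shown with a small self-contained example (class name is illustrative): if the code running in the app thread catches and swallows InterruptedException, interrupting the thread, which is what kill() amounts to for in-process apps, won't actually stop it.

```java
// Minimal illustration: an interrupt that is caught and swallowed does not
// terminate the thread's work; the thread simply carries on.
public class SwallowDemo {
  public static void main(String[] args) throws Exception {
    Thread app = new Thread(() -> {
      try {
        Thread.sleep(10_000);
      } catch (InterruptedException e) {
        // Swallowed: the thread keeps going instead of propagating the interrupt.
        System.out.println("interrupt swallowed, still running");
      }
    });
    app.start();
    app.interrupt();   // analogous to calling kill() on an in-process handle
    app.join();
    System.out.println("app thread exited on its own, not killed");
  }
}
```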
```java
synchronized (InProcessAppHandle.this) {
  if (!isDisposed()) {
    State currState = getState();
    disconnect();
```
same comment here on ordering of getState() & disconnect()
```java
int ival = b >= 0 ? b : Byte.MAX_VALUE - b;
if (ival < 0x10) {
  sb.append("0");

while (true) {
```
Checking my understanding: extra bug fix here? Even in the old code, if by chance two apps had the same secret, you'd end up losing one handle?
Not really, it's mostly moving the logic that existed before (look for while (server.pending.containsKey(secret))).
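For reference, a simplified, self-contained sketch of that pre-existing logic (SecretDemo and createSecret are illustrative names; the real code lives in the launcher server): the loop keeps generating secrets until one is not already a key in the pending map, so two handles can never collide on the same secret.

```java
import java.security.SecureRandom;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class SecretDemo {
  private static final SecureRandom RND = new SecureRandom();

  // Retry until the generated secret is not already registered.
  static String createSecret(ConcurrentMap<String, Object> pending) {
    while (true) {
      byte[] secret = new byte[16];
      RND.nextBytes(secret);
      StringBuilder sb = new StringBuilder();
      for (byte b : secret) {
        int ival = b >= 0 ? b : Byte.MAX_VALUE - b;
        if (ival < 0x10) {
          sb.append("0");
        }
        sb.append(Integer.toHexString(ival));
      }
      String s = sb.toString();
      if (!pending.containsKey(s)) {
        return s;
      }
    }
  }

  public static void main(String[] args) {
    ConcurrentMap<String, Object> pending = new ConcurrentHashMap<>();
    String s1 = createSecret(pending);
    pending.put(s1, new Object());
    String s2 = createSecret(pending);
    // s2 is only returned once it differs from every registered secret.
    System.out.println(!s1.equals(s2));
  }
}
```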
```diff
-void unregister(ChildProcAppHandle handle) {
-  pending.remove(handle.getSecret());
+void unregister(AbstractAppHandle handle) {
+  for (Map.Entry<String, AbstractAppHandle> e : pending.entrySet()) {
```
You could add a System.identityHashCode "secret" to the InProcessAppHandle to keep the old version, though this is fine too.
nevermind, stupid idea
Ok, one more try: you are generating a secret whether it's in-process or a child process, so why not store that secret in AbstractAppHandle? This isn't really a big deal, the efficiency difference doesn't matter, but the one line pending.remove(handle.getSecret()) is also easier to follow.

Also, could you rename pending to secretToPendingApps so it's more clear what the key is?
I think I had something like that at some point, but it was more code than the current version... there's a little bit of a chicken & egg problem between handles and secrets, and keeping them separate simplified things a bit at least for me.
I'll do the rename.
```diff
@@ -49,6 +49,15 @@
  * </pre>
  *
  * <p>
+ * Applications can also be launched in-process by using
+ * {@link org.apache.spark.launcher.InProcessLauncher} instead. Launching applications in-process
```
comment above about "there is only one entry point" is a bit incorrect now
ok after a fresh look I think this is pretty much fine. couple of questions and suggestions for changes for code clarity
Yes, and also potentially changing error messages currently printed to the terminal into exceptions when running through the launcher.

lgtm

Test build #85279 has finished for PR 19591 at commit

merged to master
This change adds a new launcher that allows applications to be run
in a separate thread in the same process as the calling code. To
achieve that, some code from the child process implementation was
moved to abstract classes that implement the common functionality,
and the new launcher inherits from those.
The new launcher was added as a new class, instead of implemented
as a new option to the existing SparkLauncher, to avoid ambiguous
APIs. For example, SparkLauncher has ways to set the child app's
environment, modify SPARK_HOME, or control the logging of the
child process, none of which apply to in-process apps.
The in-process launcher has limitations: it needs Spark in the
context class loader of the calling thread, and it's bound by
Spark's current limitation of a single client-mode application
per JVM. It also relies on the recently added SparkApplication
trait to make sure different apps don't mess up each other's
configuration, so config isolation is currently limited to cluster mode.
I also chose to keep the same socket-based communication for in-process
apps, even though it might be possible to avoid it for in-process
mode. That helps both implementations share more code.
Tested with new and existing unit tests, and with a simple app that
uses the launcher; also made sure the app ran fine with older launcher
jar to check binary compatibility.