Commit 5dd5b5d

Feedback.

Marcelo Vanzin committed Dec 21, 2017
1 parent ee4098b commit 5dd5b5d
Showing 5 changed files with 46 additions and 29 deletions.

@@ -94,7 +94,6 @@ void monitorChild() {
         return;
       }

-      State currState = getState();
       disconnect();

       int ec;
@@ -105,6 +104,7 @@ void monitorChild() {
         ec = 1;
       }

+      State currState = getState();
       State newState = null;
       if (ec != 0) {
         // Override state with failure if the current state is not final, or is success.

@@ -19,14 +19,18 @@

 import java.io.IOException;
 import java.lang.reflect.Method;
-import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.logging.Level;
 import java.util.logging.Logger;

 class InProcessAppHandle extends AbstractAppHandle {

+  private static final String THREAD_NAME_FMT = "spark-app-%d: '%s'";
   private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());
-  private static final ThreadFactory THREAD_FACTORY = new NamedThreadFactory("spark-app-%d");
+  private static final AtomicLong THREAD_IDS = new AtomicLong();
+
+  // Avoid really long thread names.
+  private static final int MAX_APP_NAME_LEN = 16;

   private Thread app;

@@ -47,9 +51,14 @@ public synchronized void kill() {
     setState(State.KILLED);
   }

-  synchronized void start(Method main, String[] args) {
+  synchronized void start(String appName, Method main, String[] args) {
     CommandBuilderUtils.checkState(app == null, "Handle already started.");
-    app = THREAD_FACTORY.newThread(() -> {
+
+    if (appName.length() > MAX_APP_NAME_LEN) {
+      appName = "..." + appName.substring(appName.length() - MAX_APP_NAME_LEN);
+    }
+
+    app = new Thread(() -> {
       try {
         main.invoke(null, (Object) args);
       } catch (Throwable t) {
@@ -59,15 +68,15 @@ synchronized void start(Method main, String[] args) {

       synchronized (InProcessAppHandle.this) {
         if (!isDisposed()) {
-          State currState = getState();
           disconnect();
-
-          if (!currState.isFinal()) {
+          if (!getState().isFinal()) {
             setState(State.LOST, true);
           }
         }
       }
     });
+
+    app.setName(String.format(THREAD_NAME_FMT, THREAD_IDS.incrementAndGet(), appName));
     app.start();
   }

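
Taken together, the hunks above have the in-process handle name its application thread itself: a static counter plus the (possibly truncated) app name, applied via app.setName(...) just before app.start(). A standalone sketch of that naming scheme follows, reusing the format string and length cap visible in the diff; the class name, helper method, and example app names are illustrative, not part of the launcher.

import java.util.concurrent.atomic.AtomicLong;

// Standalone illustration of the thread-naming scheme shown in the hunks above.
public class ThreadNameDemo {

  private static final String THREAD_NAME_FMT = "spark-app-%d: '%s'";
  private static final AtomicLong THREAD_IDS = new AtomicLong();

  // Avoid really long thread names.
  private static final int MAX_APP_NAME_LEN = 16;

  static String threadName(String appName) {
    if (appName.length() > MAX_APP_NAME_LEN) {
      // Keep only the trailing characters, prefixed with "..." to flag the truncation.
      appName = "..." + appName.substring(appName.length() - MAX_APP_NAME_LEN);
    }
    return String.format(THREAD_NAME_FMT, THREAD_IDS.incrementAndGet(), appName);
  }

  public static void main(String[] args) {
    System.out.println(threadName("PiExample"));
    // prints: spark-app-1: 'PiExample'
    System.out.println(threadName("SomeVeryLongStructuredStreamingJobName"));
    // prints: spark-app-2: '...StreamingJobName' (only the last 16 characters are kept)
  }
}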

@@ -70,7 +70,9 @@ public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) thr
     List<String> sparkArgs = builder.buildSparkSubmitArgs();
     String[] argv = sparkArgs.toArray(new String[sparkArgs.size()]);

-    handle.start(main, argv);
+    String appName = CommandBuilderUtils.firstNonEmpty(builder.appName, builder.mainClass,
+      "<unknown>");
+    handle.start(appName, main, argv);
     return handle;
   }

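
The launcher now resolves a display name for the handle before starting it: the explicitly configured app name if present, otherwise the main class, otherwise a fixed "<unknown>" marker. A small standalone sketch of that fallback order follows; the local firstNonEmpty helper stands in for the launcher's package-private CommandBuilderUtils.firstNonEmpty, and the class and method names here are illustrative only.

// Standalone sketch of the app-name fallback used above.
public class AppNameFallbackDemo {

  // Returns the first candidate that is neither null nor empty, or null if there is none.
  static String firstNonEmpty(String... candidates) {
    for (String s : candidates) {
      if (s != null && !s.isEmpty()) {
        return s;
      }
    }
    return null;
  }

  static String resolveAppName(String configuredAppName, String mainClass) {
    return firstNonEmpty(configuredAppName, mainClass, "<unknown>");
  }

  public static void main(String[] args) {
    System.out.println(resolveAppName("MyApp", "com.example.Main")); // MyApp
    System.out.println(resolveAppName(null, "com.example.Main"));    // com.example.Main
    System.out.println(resolveAppName(null, null));                  // <unknown>
  }
}

The resolved name is what then gets truncated into the thread name shown in the previous sketch.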

@@ -107,7 +107,7 @@ static synchronized LauncherServer getServer() {

   private final AtomicLong refCount;
   private final AtomicLong threadIds;
-  private final ConcurrentMap<String, AbstractAppHandle> pending;
+  private final ConcurrentMap<String, AbstractAppHandle> secretToPendingApps;
   private final List<ServerConnection> clients;
   private final ServerSocket server;
   private final Thread serverThread;
@@ -127,7 +127,7 @@ private LauncherServer() throws IOException {
     this.clients = new ArrayList<>();
     this.threadIds = new AtomicLong();
     this.factory = new NamedThreadFactory(THREAD_NAME_FMT);
-    this.pending = new ConcurrentHashMap<>();
+    this.secretToPendingApps = new ConcurrentHashMap<>();
     this.timeoutTimer = new Timer("LauncherServer-TimeoutTimer", true);
     this.server = server;
     this.running = true;
@@ -149,7 +149,7 @@ private LauncherServer() throws IOException {
    */
   synchronized String registerHandle(AbstractAppHandle handle) {
     String secret = createSecret();
-    pending.put(secret, handle);
+    secretToPendingApps.put(secret, handle);
     return secret;
   }

@@ -210,9 +210,10 @@ int getPort() {
    * the server.
    */
   void unregister(AbstractAppHandle handle) {
-    for (Map.Entry<String, AbstractAppHandle> e : pending.entrySet()) {
+    for (Map.Entry<String, AbstractAppHandle> e : secretToPendingApps.entrySet()) {
       if (e.getValue().equals(handle)) {
-        pending.remove(e.getKey());
+        String secret = e.getKey();
+        secretToPendingApps.remove(secret);
         break;
       }
     }
@@ -278,7 +279,7 @@ private String createSecret() {
       }

       String secretStr = sb.toString();
-      if (!pending.containsKey(secretStr)) {
+      if (!secretToPendingApps.containsKey(secretStr)) {
         return secretStr;
       }
     }
@@ -301,7 +302,7 @@ protected void handle(Message msg) throws IOException {
           timeout.cancel();
           timeout = null;
           Hello hello = (Hello) msg;
-          AbstractAppHandle handle = pending.remove(hello.secret);
+          AbstractAppHandle handle = secretToPendingApps.remove(hello.secret);
           if (handle != null) {
             handle.setConnection(this);
             handle.setState(SparkAppHandle.State.CONNECTED);
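
The renamed map makes the server's bookkeeping explicit: each registered handle is stored under a freshly generated secret, the client's Hello message redeems that secret to claim the handle, and a handle that never connects is removed by scanning for its entry. A compact standalone sketch of that lifecycle follows; the String "handle" values, the method names, and the simplified random-hex secret generation are stand-ins, not the server's actual implementation.

import java.security.SecureRandom;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Standalone sketch of the secret -> pending-handle bookkeeping touched by the hunks above.
public class SecretRegistryDemo {

  private final ConcurrentMap<String, String> secretToPendingApps = new ConcurrentHashMap<>();
  private final SecureRandom rnd = new SecureRandom();

  // Register a handle under a fresh secret and hand the secret back to the caller.
  synchronized String registerHandle(String handle) {
    String secret = createSecret();
    secretToPendingApps.put(secret, handle);
    return secret;
  }

  // A connecting client presents its secret, which resolves (and removes) the pending handle.
  String onHello(String secret) {
    return secretToPendingApps.remove(secret);
  }

  // A handle that never connected is dropped by scanning the map for its entry.
  void unregister(String handle) {
    for (Map.Entry<String, String> e : secretToPendingApps.entrySet()) {
      if (e.getValue().equals(handle)) {
        secretToPendingApps.remove(e.getKey());
        break;
      }
    }
  }

  private String createSecret() {
    String secret;
    do {
      secret = Long.toHexString(rnd.nextLong());
    } while (secretToPendingApps.containsKey(secret)); // retry on the (unlikely) collision
    return secret;
  }

  public static void main(String[] args) {
    SecretRegistryDemo demo = new SecretRegistryDemo();
    String secret = demo.registerHandle("app-handle-1");
    System.out.println("claimed: " + demo.onHello(secret));         // app-handle-1
    System.out.println("already claimed: " + demo.onHello(secret)); // null
  }
}
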
launcher/src/main/java/org/apache/spark/launcher/package-info.java (31 changes: 18 additions & 13 deletions)

@@ -16,17 +16,18 @@
  */

 /**
- * Library for launching Spark applications.
+ * Library for launching Spark applications programmatically.
  *
  * <p>
- * This library allows applications to launch Spark programmatically. There's only one entry
- * point to the library - the {@link org.apache.spark.launcher.SparkLauncher} class.
+ * There are two ways to start applications with this library: as a child process, using
+ * {@link org.apache.spark.launcher.SparkLauncher}, or in-process, using
+ * {@link org.apache.spark.launcher.InProcessLauncher}.
  * </p>
  *
  * <p>
- * The {@link org.apache.spark.launcher.SparkLauncher#startApplication(
- * org.apache.spark.launcher.SparkAppHandle.Listener...)} can be used to start Spark and provide
- * a handle to monitor and control the running application:
+ * The {@link org.apache.spark.launcher.AbstractLauncher#startApplication(
+ * org.apache.spark.launcher.SparkAppHandle.Listener...)} method can be used to start Spark and
+ * provide a handle to monitor and control the running application:
  * </p>
  *
  * <pre>
@@ -49,16 +50,20 @@
  * </pre>
  *
  * <p>
- * Applications can also be launched in-process by using
- * {@link org.apache.spark.launcher.InProcessLauncher} instead. Launching applications in-process
- * is only recommended in cluster mode, since Spark cannot run multiple client-mode applications
- * concurrently in the same process. The in-process launcher requires the necessary Spark
- * dependencies (such as spark-core and cluster manager-specific modules) to be present in the
- * caller thread's class loader.
+ * Launching applications as a child process requires a full Spark installation. The installation
+ * directory can be provided to the launcher explicitly in the launcher's configuration, or by
+ * setting the SPARK_HOME environment variable.
  * </p>
  *
  * <p>
- * It's also possible to launch a raw child process, using the
+ * Launching applications in-process is only recommended in cluster mode, since Spark cannot run
+ * multiple client-mode applications concurrently in the same process. The in-process launcher
+ * requires the necessary Spark dependencies (such as spark-core and cluster manager-specific
+ * modules) to be present in the caller thread's class loader.
+ * </p>
+ *
+ * <p>
+ * It's also possible to launch a raw child process, without the extra monitoring, using the
  * {@link org.apache.spark.launcher.SparkLauncher#launch()} method:
  * </p>
  *
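
The Javadoc's own <pre> example is collapsed in the hunk above; the following is a rough, unofficial sketch of the child-process path it describes, with placeholder Spark home, jar path, main class, and master URL:

import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;

public class LauncherExample {
  public static void main(String[] args) throws Exception {
    // Launch as a child process; this needs a Spark installation (SPARK_HOME or setSparkHome).
    SparkAppHandle handle = new SparkLauncher()
        .setSparkHome("/path/to/spark")           // placeholder installation directory
        .setAppResource("/path/to/my-app.jar")    // placeholder application jar
        .setMainClass("com.example.MyApp")        // placeholder main class
        .setMaster("local[2]")
        .startApplication(new SparkAppHandle.Listener() {
          @Override
          public void stateChanged(SparkAppHandle h) {
            System.out.println("state changed: " + h.getState());
          }

          @Override
          public void infoChanged(SparkAppHandle h) {
            System.out.println("app id: " + h.getAppId());
          }
        });

    // Block until the application reaches a final state.
    while (!handle.getState().isFinal()) {
      Thread.sleep(1000L);
    }
  }
}

For the in-process mode described above, org.apache.spark.launcher.InProcessLauncher exposes the same startApplication entry point; child-process-only settings such as the Spark home do not apply there, and, per the Javadoc, it is only recommended in cluster mode.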
