From 5dd5b5db112ac99f3e70ce50db5e5196379c3610 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 21 Dec 2017 11:17:44 -0800 Subject: [PATCH] Feedback. --- .../spark/launcher/ChildProcAppHandle.java | 2 +- .../spark/launcher/InProcessAppHandle.java | 23 +++++++++----- .../spark/launcher/InProcessLauncher.java | 4 ++- .../apache/spark/launcher/LauncherServer.java | 15 ++++----- .../apache/spark/launcher/package-info.java | 31 +++++++++++-------- 5 files changed, 46 insertions(+), 29 deletions(-) diff --git a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java index 389907842bc76..3bb7e12385fd8 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/ChildProcAppHandle.java @@ -94,7 +94,6 @@ void monitorChild() { return; } - State currState = getState(); disconnect(); int ec; @@ -105,6 +104,7 @@ void monitorChild() { ec = 1; } + State currState = getState(); State newState = null; if (ec != 0) { // Override state with failure if the current state is not final, or is success. diff --git a/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java b/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java index 2f2fb2c8e9c48..0d6a73a3da3ed 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java +++ b/launcher/src/main/java/org/apache/spark/launcher/InProcessAppHandle.java @@ -19,14 +19,18 @@ import java.io.IOException; import java.lang.reflect.Method; -import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; class InProcessAppHandle extends AbstractAppHandle { + private static final String THREAD_NAME_FMT = "spark-app-%d: '%s'"; private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName()); - private static final ThreadFactory THREAD_FACTORY = new NamedThreadFactory("spark-app-%d"); + private static final AtomicLong THREAD_IDS = new AtomicLong(); + + // Avoid really long thread names. + private static final int MAX_APP_NAME_LEN = 16; private Thread app; @@ -47,9 +51,14 @@ public synchronized void kill() { setState(State.KILLED); } - synchronized void start(Method main, String[] args) { + synchronized void start(String appName, Method main, String[] args) { CommandBuilderUtils.checkState(app == null, "Handle already started."); - app = THREAD_FACTORY.newThread(() -> { + + if (appName.length() > MAX_APP_NAME_LEN) { + appName = "..." + appName.substring(appName.length() - MAX_APP_NAME_LEN); + } + + app = new Thread(() -> { try { main.invoke(null, (Object) args); } catch (Throwable t) { @@ -59,15 +68,15 @@ synchronized void start(Method main, String[] args) { synchronized (InProcessAppHandle.this) { if (!isDisposed()) { - State currState = getState(); disconnect(); - - if (!currState.isFinal()) { + if (!getState().isFinal()) { setState(State.LOST, true); } } } }); + + app.setName(String.format(THREAD_NAME_FMT, THREAD_IDS.incrementAndGet(), appName)); app.start(); } diff --git a/launcher/src/main/java/org/apache/spark/launcher/InProcessLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/InProcessLauncher.java index aa8f61a7e088e..6d726b4a69a86 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/InProcessLauncher.java +++ b/launcher/src/main/java/org/apache/spark/launcher/InProcessLauncher.java @@ -70,7 +70,9 @@ public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) thr List sparkArgs = builder.buildSparkSubmitArgs(); String[] argv = sparkArgs.toArray(new String[sparkArgs.size()]); - handle.start(main, argv); + String appName = CommandBuilderUtils.firstNonEmpty(builder.appName, builder.mainClass, + ""); + handle.start(appName, main, argv); return handle; } diff --git a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java index 55530cd249814..b8999a1d7a4f4 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java +++ b/launcher/src/main/java/org/apache/spark/launcher/LauncherServer.java @@ -107,7 +107,7 @@ static synchronized LauncherServer getServer() { private final AtomicLong refCount; private final AtomicLong threadIds; - private final ConcurrentMap pending; + private final ConcurrentMap secretToPendingApps; private final List clients; private final ServerSocket server; private final Thread serverThread; @@ -127,7 +127,7 @@ private LauncherServer() throws IOException { this.clients = new ArrayList<>(); this.threadIds = new AtomicLong(); this.factory = new NamedThreadFactory(THREAD_NAME_FMT); - this.pending = new ConcurrentHashMap<>(); + this.secretToPendingApps = new ConcurrentHashMap<>(); this.timeoutTimer = new Timer("LauncherServer-TimeoutTimer", true); this.server = server; this.running = true; @@ -149,7 +149,7 @@ private LauncherServer() throws IOException { */ synchronized String registerHandle(AbstractAppHandle handle) { String secret = createSecret(); - pending.put(secret, handle); + secretToPendingApps.put(secret, handle); return secret; } @@ -210,9 +210,10 @@ int getPort() { * the server. */ void unregister(AbstractAppHandle handle) { - for (Map.Entry e : pending.entrySet()) { + for (Map.Entry e : secretToPendingApps.entrySet()) { if (e.getValue().equals(handle)) { - pending.remove(e.getKey()); + String secret = e.getKey(); + secretToPendingApps.remove(secret); break; } } @@ -278,7 +279,7 @@ private String createSecret() { } String secretStr = sb.toString(); - if (!pending.containsKey(secretStr)) { + if (!secretToPendingApps.containsKey(secretStr)) { return secretStr; } } @@ -301,7 +302,7 @@ protected void handle(Message msg) throws IOException { timeout.cancel(); timeout = null; Hello hello = (Hello) msg; - AbstractAppHandle handle = pending.remove(hello.secret); + AbstractAppHandle handle = secretToPendingApps.remove(hello.secret); if (handle != null) { handle.setConnection(this); handle.setState(SparkAppHandle.State.CONNECTED); diff --git a/launcher/src/main/java/org/apache/spark/launcher/package-info.java b/launcher/src/main/java/org/apache/spark/launcher/package-info.java index f3489f46fa277..248b6d978733a 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/package-info.java +++ b/launcher/src/main/java/org/apache/spark/launcher/package-info.java @@ -16,17 +16,18 @@ */ /** - * Library for launching Spark applications. + * Library for launching Spark applications programmatically. * *

- * This library allows applications to launch Spark programmatically. There's only one entry - * point to the library - the {@link org.apache.spark.launcher.SparkLauncher} class. + * There are two ways to start applications with this library: as a child process, using + * {@link org.apache.spark.launcher.SparkLauncher}, or in-process, using + * {@link org.apache.spark.launcher.InProcessLauncher}. *

* *

- * The {@link org.apache.spark.launcher.SparkLauncher#startApplication( - * org.apache.spark.launcher.SparkAppHandle.Listener...)} can be used to start Spark and provide - * a handle to monitor and control the running application: + * The {@link org.apache.spark.launcher.AbstractLauncher#startApplication( + * org.apache.spark.launcher.SparkAppHandle.Listener...)} method can be used to start Spark and + * provide a handle to monitor and control the running application: *

* *
@@ -49,16 +50,20 @@
  * 
* *

- * Applications can also be launched in-process by using - * {@link org.apache.spark.launcher.InProcessLauncher} instead. Launching applications in-process - * is only recommended in cluster mode, since Spark cannot run multiple client-mode applications - * concurrently in the same process. The in-process launcher requires the necessary Spark - * dependencies (such as spark-core and cluster manager-specific modules) to be present in the - * caller thread's class loader. + * Launching applications as a child process requires a full Spark installation. The installation + * directory can be provided to the launcher explicitly in the launcher's configuration, or by + * setting the SPARK_HOME environment variable. *

* *

- * It's also possible to launch a raw child process, using the + * Launching applications in-process is only recommended in cluster mode, since Spark cannot run + * multiple client-mode applications concurrently in the same process. The in-process launcher + * requires the necessary Spark dependencies (such as spark-core and cluster manager-specific + * modules) to be present in the caller thread's class loader. + *

+ * + *

+ * It's also possible to launch a raw child process, without the extra monitoring, using the * {@link org.apache.spark.launcher.SparkLauncher#launch()} method: *

*