[SPARK-11035][CORE] Add in-process Spark app launcher.
This change adds a new launcher that allows applications to be run
in a separate thread in the same process as the calling code. To
achieve that, some code from the child process implementation was
moved to abstract classes that implement the common functionality,
and the new launcher inherits from those.
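
For reference, a minimal usage sketch of the new API (the main class name
below is a placeholder; the chained setters and startApplication() mirror
what the new unit test exercises):

    // Runs the application in a thread inside the current JVM.
    SparkAppHandle handle = new InProcessLauncher()
      .setMaster("local")
      .setAppResource(SparkLauncher.NO_RESOURCE)
      .setMainClass("com.example.MyApp")   // placeholder main class
      .addAppArgs("hello")
      .startApplication();                 // SparkAppHandle.Listeners may be passed here

    // The handle reports state changes (CONNECTED, RUNNING, FINISHED, ...)
    // and can stop(), kill(), or disconnect() from the application.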

The new launcher was added as a new class, instead of being implemented
as a new option to the existing SparkLauncher, to avoid ambiguous
APIs. For example, SparkLauncher has ways to set the child app's
environment, modify SPARK_HOME, or control the logging of the
child process, none of which apply to in-process apps.
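
For contrast, a sketch of the existing child-process launcher using some of
those process-only settings (paths and the main class are placeholders; a
child environment can also be passed to the SparkLauncher constructor):

    // Spawns and monitors a separate JVM; these knobs have no meaning in-process.
    SparkAppHandle proc = new SparkLauncher()
      .setSparkHome("/path/to/spark")       // placeholder SPARK_HOME
      .setAppResource("/path/to/app.jar")   // placeholder application jar
      .setMainClass("com.example.MyApp")    // placeholder main class
      .redirectToLog("MyApp")               // send child output to a logger
      .startApplication();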

The in-process launcher has limitations: it needs Spark in the
context class loader of the calling thread, and it's bound by
Spark's current limitation of a single client-mode application
per JVM. It also relies on the recently added SparkApplication
trait to make sure different apps don't mess up each other's
configuration, so config isolation is currently limited to cluster mode.
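
As an illustration of the class loader requirement, a caller could verify
that Spark is visible before using the in-process launcher (the probed class
is just one example):

    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    try {
      cl.loadClass("org.apache.spark.deploy.SparkSubmit");   // Spark must be loadable here
    } catch (ClassNotFoundException e) {
      throw new IllegalStateException("Spark is not visible to the calling thread", e);
    }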

I also chose to keep the same socket-based communication for in-process
apps, even though it might be possible to avoid it in that mode. That
lets both implementations share more code.

Tested with new and existing unit tests, and with a simple app that
uses the launcher; also made sure the app ran fine with an older launcher
jar to check binary compatibility.

Author: Marcelo Vanzin <[email protected]>

Closes #19591 from vanzin/SPARK-11035.
Marcelo Vanzin authored and squito committed Dec 28, 2017
1 parent 613b71a commit cfcd746
Showing 21 changed files with 1,139 additions and 505 deletions.
8 changes: 8 additions & 0 deletions core/pom.xml
@@ -351,6 +351,14 @@
<artifactId>spark-tags_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-launcher_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>

<!--
This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
them will yield errors.
LauncherBackend.scala
@@ -19,7 +19,7 @@ package org.apache.spark.launcher

import java.net.{InetAddress, Socket}

import org.apache.spark.SPARK_VERSION
import org.apache.spark.{SPARK_VERSION, SparkConf}
import org.apache.spark.launcher.LauncherProtocol._
import org.apache.spark.util.{ThreadUtils, Utils}

@@ -36,9 +36,14 @@ private[spark] abstract class LauncherBackend {
private var lastState: SparkAppHandle.State = _
@volatile private var _isConnected = false

protected def conf: SparkConf

def connect(): Unit = {
val port = sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT).map(_.toInt)
val secret = sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET)
val port = conf.getOption(LauncherProtocol.CONF_LAUNCHER_PORT)
.orElse(sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT))
.map(_.toInt)
val secret = conf.getOption(LauncherProtocol.CONF_LAUNCHER_SECRET)
.orElse(sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET))
if (port != None && secret != None) {
val s = new Socket(InetAddress.getLoopbackAddress(), port.get)
connection = new BackendConnection(s)
StandaloneSchedulerBackend.scala
@@ -45,6 +45,7 @@ private[spark] class StandaloneSchedulerBackend(
private var client: StandaloneAppClient = null
private val stopping = new AtomicBoolean(false)
private val launcherBackend = new LauncherBackend() {
override protected def conf: SparkConf = sc.conf
override protected def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)
}

LocalSchedulerBackend.scala
@@ -105,6 +105,7 @@ private[spark] class LocalSchedulerBackend(
private val userClassPath = getUserClasspath(conf)
private val listenerBus = scheduler.sc.listenerBus
private val launcherBackend = new LauncherBackend() {
override def conf: SparkConf = LocalSchedulerBackend.this.conf
override def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)
}

SparkLauncherSuite.java
@@ -18,30 +18,26 @@
package org.apache.spark.launcher;

import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.bridge.SLF4JBridgeHandler;
import static org.junit.Assert.*;
import static org.junit.Assume.*;
import static org.mockito.Mockito.*;

import org.apache.spark.SparkContext;
import org.apache.spark.internal.config.package$;
import org.apache.spark.util.Utils;

/**
* These tests require the Spark assembly to be built before they can be run.
*/
public class SparkLauncherSuite {
public class SparkLauncherSuite extends BaseSuite {

static {
SLF4JBridgeHandler.removeHandlersForRootLogger();
SLF4JBridgeHandler.install();
}

private static final Logger LOG = LoggerFactory.getLogger(SparkLauncherSuite.class);
private static final NamedThreadFactory TF = new NamedThreadFactory("SparkLauncherSuite-%d");

private final SparkLauncher launcher = new SparkLauncher();
@@ -123,6 +119,50 @@ public void testChildProcLauncher() throws Exception {
assertEquals(0, app.waitFor());
}

@Test
public void testInProcessLauncher() throws Exception {
// Because this test runs SparkLauncher in process and in client mode, it pollutes the system
// properties, and that can cause test failures down the test pipeline. So restore the original
// system properties after this test runs.
Map<Object, Object> properties = new HashMap<>(System.getProperties());
try {
inProcessLauncherTestImpl();
} finally {
Properties p = new Properties();
for (Map.Entry<Object, Object> e : properties.entrySet()) {
p.put(e.getKey(), e.getValue());
}
System.setProperties(p);
}
}

private void inProcessLauncherTestImpl() throws Exception {
final List<SparkAppHandle.State> transitions = new ArrayList<>();
SparkAppHandle.Listener listener = mock(SparkAppHandle.Listener.class);
doAnswer(invocation -> {
SparkAppHandle h = (SparkAppHandle) invocation.getArguments()[0];
transitions.add(h.getState());
return null;
}).when(listener).stateChanged(any(SparkAppHandle.class));

SparkAppHandle handle = new InProcessLauncher()
.setMaster("local")
.setAppResource(SparkLauncher.NO_RESOURCE)
.setMainClass(InProcessTestApp.class.getName())
.addAppArgs("hello")
.startApplication(listener);

waitFor(handle);
assertEquals(SparkAppHandle.State.FINISHED, handle.getState());

// Matches the behavior of LocalSchedulerBackend.
List<SparkAppHandle.State> expected = Arrays.asList(
SparkAppHandle.State.CONNECTED,
SparkAppHandle.State.RUNNING,
SparkAppHandle.State.FINISHED);
assertEquals(expected, transitions);
}

public static class SparkLauncherTestApp {

public static void main(String[] args) throws Exception {
@@ -134,4 +174,14 @@ public static void main(String[] args) throws Exception {

}

public static class InProcessTestApp {

public static void main(String[] args) throws Exception {
assertNotEquals(0, args.length);
assertEquals(args[0], "hello");
new SparkContext().stop();
}

}

}
AbstractAppHandle.java (new file)
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.launcher;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

abstract class AbstractAppHandle implements SparkAppHandle {

private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());

private final LauncherServer server;

private LauncherConnection connection;
private List<Listener> listeners;
private State state;
private String appId;
private boolean disposed;

protected AbstractAppHandle(LauncherServer server) {
this.server = server;
this.state = State.UNKNOWN;
}

@Override
public synchronized void addListener(Listener l) {
if (listeners == null) {
listeners = new ArrayList<>();
}
listeners.add(l);
}

@Override
public State getState() {
return state;
}

@Override
public String getAppId() {
return appId;
}

@Override
public void stop() {
CommandBuilderUtils.checkState(connection != null, "Application is still not connected.");
try {
connection.send(new LauncherProtocol.Stop());
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}

@Override
public synchronized void disconnect() {
if (!disposed) {
disposed = true;
if (connection != null) {
try {
connection.close();
} catch (IOException ioe) {
// no-op.
}
}
server.unregister(this);
}
}

void setConnection(LauncherConnection connection) {
this.connection = connection;
}

LauncherConnection getConnection() {
return connection;
}

boolean isDisposed() {
return disposed;
}

void setState(State s) {
setState(s, false);
}

synchronized void setState(State s, boolean force) {
if (force || !state.isFinal()) {
state = s;
fireEvent(false);
} else {
LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
new Object[] { state, s });
}
}

synchronized void setAppId(String appId) {
this.appId = appId;
fireEvent(true);
}

private void fireEvent(boolean isInfoChanged) {
if (listeners != null) {
for (Listener l : listeners) {
if (isInfoChanged) {
l.infoChanged(this);
} else {
l.stateChanged(this);
}
}
}
}

}