Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-11035][core] Add in-process Spark app launcher. #19591

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,14 @@
<artifactId>spark-tags_${scala.binary.version}</artifactId>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-launcher_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>

<!--
This spark-tags test-dep is needed even though it isn't used in this module, otherwise testing-cmds that exclude
them will yield errors.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package org.apache.spark.launcher

import java.net.{InetAddress, Socket}

import org.apache.spark.SPARK_VERSION
import org.apache.spark.{SPARK_VERSION, SparkConf}
import org.apache.spark.launcher.LauncherProtocol._
import org.apache.spark.util.{ThreadUtils, Utils}

Expand All @@ -36,9 +36,14 @@ private[spark] abstract class LauncherBackend {
private var lastState: SparkAppHandle.State = _
@volatile private var _isConnected = false

protected def conf: SparkConf

def connect(): Unit = {
val port = sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT).map(_.toInt)
val secret = sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET)
val port = conf.getOption(LauncherProtocol.CONF_LAUNCHER_PORT)
.orElse(sys.env.get(LauncherProtocol.ENV_LAUNCHER_PORT))
.map(_.toInt)
val secret = conf.getOption(LauncherProtocol.CONF_LAUNCHER_SECRET)
.orElse(sys.env.get(LauncherProtocol.ENV_LAUNCHER_SECRET))
if (port != None && secret != None) {
val s = new Socket(InetAddress.getLoopbackAddress(), port.get)
connection = new BackendConnection(s)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ private[spark] class StandaloneSchedulerBackend(
private var client: StandaloneAppClient = null
private val stopping = new AtomicBoolean(false)
private val launcherBackend = new LauncherBackend() {
override protected def conf: SparkConf = sc.conf
override protected def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ private[spark] class LocalSchedulerBackend(
private val userClassPath = getUserClasspath(conf)
private val listenerBus = scheduler.sc.listenerBus
private val launcherBackend = new LauncherBackend() {
override def conf: SparkConf = LocalSchedulerBackend.this.conf
override def onStopRequest(): Unit = stop(SparkAppHandle.State.KILLED)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,30 +18,26 @@
package org.apache.spark.launcher;

import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.bridge.SLF4JBridgeHandler;
import static org.junit.Assert.*;
import static org.junit.Assume.*;
import static org.mockito.Mockito.*;

import org.apache.spark.SparkContext;
import org.apache.spark.internal.config.package$;
import org.apache.spark.util.Utils;

/**
* These tests require the Spark assembly to be built before they can be run.
*/
public class SparkLauncherSuite {
public class SparkLauncherSuite extends BaseSuite {

static {
SLF4JBridgeHandler.removeHandlersForRootLogger();
SLF4JBridgeHandler.install();
}

private static final Logger LOG = LoggerFactory.getLogger(SparkLauncherSuite.class);
private static final NamedThreadFactory TF = new NamedThreadFactory("SparkLauncherSuite-%d");

private final SparkLauncher launcher = new SparkLauncher();
Expand Down Expand Up @@ -123,6 +119,50 @@ public void testChildProcLauncher() throws Exception {
assertEquals(0, app.waitFor());
}

@Test
public void testInProcessLauncher() throws Exception {
// Because this test runs SparkLauncher in process and in client mode, it pollutes the system
// properties, and that can cause test failures down the test pipeline. So restore the original
// system properties after this test runs.
Map<Object, Object> properties = new HashMap<>(System.getProperties());
try {
inProcessLauncherTestImpl();
} finally {
Properties p = new Properties();
for (Map.Entry<Object, Object> e : properties.entrySet()) {
p.put(e.getKey(), e.getValue());
}
System.setProperties(p);
}
}

private void inProcessLauncherTestImpl() throws Exception {
final List<SparkAppHandle.State> transitions = new ArrayList<>();
SparkAppHandle.Listener listener = mock(SparkAppHandle.Listener.class);
doAnswer(invocation -> {
SparkAppHandle h = (SparkAppHandle) invocation.getArguments()[0];
transitions.add(h.getState());
return null;
}).when(listener).stateChanged(any(SparkAppHandle.class));

SparkAppHandle handle = new InProcessLauncher()
.setMaster("local")
.setAppResource(SparkLauncher.NO_RESOURCE)
.setMainClass(InProcessTestApp.class.getName())
.addAppArgs("hello")
.startApplication(listener);

waitFor(handle);
assertEquals(SparkAppHandle.State.FINISHED, handle.getState());

// Matches the behavior of LocalSchedulerBackend.
List<SparkAppHandle.State> expected = Arrays.asList(
SparkAppHandle.State.CONNECTED,
SparkAppHandle.State.RUNNING,
SparkAppHandle.State.FINISHED);
assertEquals(expected, transitions);
}

public static class SparkLauncherTestApp {

public static void main(String[] args) throws Exception {
Expand All @@ -134,4 +174,14 @@ public static void main(String[] args) throws Exception {

}

public static class InProcessTestApp {

public static void main(String[] args) throws Exception {
assertNotEquals(0, args.length);
assertEquals(args[0], "hello");
new SparkContext().stop();
}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.launcher;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

abstract class AbstractAppHandle implements SparkAppHandle {

private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());

private final LauncherServer server;

private LauncherConnection connection;
private List<Listener> listeners;
private State state;
private String appId;
private boolean disposed;

protected AbstractAppHandle(LauncherServer server) {
this.server = server;
this.state = State.UNKNOWN;
}

@Override
public synchronized void addListener(Listener l) {
if (listeners == null) {
listeners = new ArrayList<>();
}
listeners.add(l);
}

@Override
public State getState() {
return state;
}

@Override
public String getAppId() {
return appId;
}

@Override
public void stop() {
CommandBuilderUtils.checkState(connection != null, "Application is still not connected.");
try {
connection.send(new LauncherProtocol.Stop());
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
}

@Override
public synchronized void disconnect() {
if (!disposed) {
disposed = true;
if (connection != null) {
try {
connection.close();
} catch (IOException ioe) {
// no-op.
}
}
server.unregister(this);
}
}

void setConnection(LauncherConnection connection) {
this.connection = connection;
}

LauncherConnection getConnection() {
return connection;
}

boolean isDisposed() {
return disposed;
}

void setState(State s) {
setState(s, false);
}

synchronized void setState(State s, boolean force) {
if (force || !state.isFinal()) {
state = s;
fireEvent(false);
} else {
LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
new Object[] { state, s });
}
}

synchronized void setAppId(String appId) {
this.appId = appId;
fireEvent(true);
}

private void fireEvent(boolean isInfoChanged) {
if (listeners != null) {
for (Listener l : listeners) {
if (isInfoChanged) {
l.infoChanged(this);
} else {
l.stateChanged(this);
}
}
}
}

}
Loading