Skip to content

Commit

Permalink
[SPARK-23020][CORE][BRANCH-2.3] Fix another race in the in-process la…
Browse files Browse the repository at this point in the history
…uncher test.

First the bad news: there's an unfixable race in the launcher code.
(By unfixable I mean it would take a lot more effort than this change
to fix it.) The good news is that it should only affect super short
lived applications, such as the one run by the flaky test, so it's
possible to work around it in our test.

The fix also uncovered an issue with the recently added "closeAndWait()"
method; closing the connection would still possibly cause data loss,
so this change waits a while for the connection to finish itself, and
closes the socket if that times out. The existing connection timeout
is reused so that if desired it's possible to control how long to wait.

As part of that I also restored the old behavior that disconnect() would
force a disconnection from the child app; the "wait for data to arrive"
approach is only taken when disposing of the handle.

I tested this by inserting a bunch of sleeps in the test and the socket
handling code in the launcher library; with those I was able to reproduce
the error from the jenkins jobs. With the changes, even with all the
sleeps still in place, all tests pass.

Author: Marcelo Vanzin <[email protected]>

Closes apache#20743 from vanzin/SPARK-23020.
  • Loading branch information
Marcelo Vanzin authored and dongjoon-hyun committed Mar 24, 2018
1 parent eafe2f5 commit 598e64b
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.Map;
import java.util.Properties;

import org.junit.Ignore;
import org.junit.Test;
import static org.junit.Assert.*;
import static org.junit.Assume.*;
Expand Down Expand Up @@ -122,8 +121,7 @@ public void testChildProcLauncher() throws Exception {
assertEquals(0, app.waitFor());
}

// TODO: [SPARK-23020] Re-enable this
@Ignore
@Test
public void testInProcessLauncher() throws Exception {
// Because this test runs SparkLauncher in process and in client mode, it pollutes the system
// properties, and that can cause test failures down the test pipeline. So restore the original
Expand Down Expand Up @@ -159,12 +157,24 @@ private void inProcessLauncherTestImpl() throws Exception {

SparkAppHandle handle = null;
try {
handle = new InProcessLauncher()
.setMaster("local")
.setAppResource(SparkLauncher.NO_RESOURCE)
.setMainClass(InProcessTestApp.class.getName())
.addAppArgs("hello")
.startApplication(listener);
synchronized (InProcessTestApp.LOCK) {
handle = new InProcessLauncher()
.setMaster("local")
.setAppResource(SparkLauncher.NO_RESOURCE)
.setMainClass(InProcessTestApp.class.getName())
.addAppArgs("hello")
.startApplication(listener);

// SPARK-23020: see doc for InProcessTestApp.LOCK for a description of the race. Here
// we wait until we know that the connection between the app and the launcher has been
// established before allowing the app to finish.
final SparkAppHandle _handle = handle;
eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> {
assertNotEquals(SparkAppHandle.State.UNKNOWN, _handle.getState());
});

InProcessTestApp.LOCK.wait(5000);
}

waitFor(handle);
assertEquals(SparkAppHandle.State.FINISHED, handle.getState());
Expand Down Expand Up @@ -195,10 +205,26 @@ public static void main(String[] args) throws Exception {

public static class InProcessTestApp {

/**
* SPARK-23020: there's a race caused by a child app finishing too quickly. This would cause
* the InProcessAppHandle to dispose of itself even before the child connection was properly
* established, so no state changes would be detected for the application and its final
* state would be LOST.
*
* It's not really possible to fix that race safely in the handle code itself without changing
* the way in-process apps talk to the launcher library, so we work around that in the test by
* synchronizing on this object.
*/
public static final Object LOCK = new Object();

public static void main(String[] args) throws Exception {
assertNotEquals(0, args.length);
assertEquals(args[0], "hello");
new SparkContext().stop();

synchronized (LOCK) {
LOCK.notifyAll();
}
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@
package org.apache.spark.launcher;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.logging.Level;
import java.util.logging.Logger;

abstract class AbstractAppHandle implements SparkAppHandle {

private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());
private static final Logger LOG = Logger.getLogger(AbstractAppHandle.class.getName());

private final LauncherServer server;

private LauncherServer.ServerConnection connection;
private List<Listener> listeners;
private AtomicReference<State> state;
private String appId;
private volatile String appId;
private volatile boolean disposed;

protected AbstractAppHandle(LauncherServer server) {
Expand All @@ -44,7 +44,7 @@ protected AbstractAppHandle(LauncherServer server) {
@Override
public synchronized void addListener(Listener l) {
if (listeners == null) {
listeners = new ArrayList<>();
listeners = new CopyOnWriteArrayList<>();
}
listeners.add(l);
}
Expand All @@ -71,16 +71,14 @@ public void stop() {

@Override
public synchronized void disconnect() {
if (!isDisposed()) {
if (connection != null) {
try {
connection.closeAndWait();
} catch (IOException ioe) {
// no-op.
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (IOException ioe) {
// no-op.
}
dispose();
}
dispose();
}

void setConnection(LauncherServer.ServerConnection connection) {
Expand All @@ -97,10 +95,25 @@ boolean isDisposed() {

/**
* Mark the handle as disposed, and set it as LOST in case the current state is not final.
*
* This method should be called only when there's a reasonable expectation that the communication
* with the child application is not needed anymore, either because the code managing the handle
* has said so, or because the child application is finished.
*/
synchronized void dispose() {
if (!isDisposed()) {
// First wait for all data from the connection to be read. Then unregister the handle.
// Otherwise, unregistering might cause the server to be stopped and all child connections
// to be closed.
if (connection != null) {
try {
connection.waitForClose();
} catch (IOException ioe) {
// no-op.
}
}
server.unregister(this);

// Set state to LOST if not yet final.
setState(State.LOST, false);
this.disposed = true;
Expand All @@ -127,11 +140,13 @@ void setState(State s, boolean force) {
current = state.get();
}

LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
new Object[] { current, s });
if (s != State.LOST) {
LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
new Object[] { current, s });
}
}

synchronized void setAppId(String appId) {
void setAppId(String appId) {
this.appId = appId;
fireEvent(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ void monitorChild() {
}
}

disconnect();
dispose();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ synchronized void start(String appName, Method main, String[] args) {
setState(State.FAILED);
}

disconnect();
dispose();
});

app.setName(String.format(THREAD_NAME_FMT, THREAD_IDS.incrementAndGet(), appName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ public void run() {
};
ServerConnection clientConnection = new ServerConnection(client, timeout);
Thread clientThread = factory.newThread(clientConnection);
clientConnection.setConnectionThread(clientThread);
synchronized (clients) {
clients.add(clientConnection);
}
Expand Down Expand Up @@ -290,17 +291,15 @@ class ServerConnection extends LauncherConnection {

private TimerTask timeout;
private volatile Thread connectionThread;
volatile AbstractAppHandle handle;
private volatile AbstractAppHandle handle;

ServerConnection(Socket socket, TimerTask timeout) throws IOException {
super(socket);
this.timeout = timeout;
}

@Override
public void run() {
this.connectionThread = Thread.currentThread();
super.run();
void setConnectionThread(Thread t) {
this.connectionThread = t;
}

@Override
Expand Down Expand Up @@ -361,19 +360,30 @@ public void close() throws IOException {
}

/**
* Close the connection and wait for any buffered data to be processed before returning.
* Wait for the remote side to close the connection so that any pending data is processed.
* This ensures any changes reported by the child application take effect.
*
* This method allows a short period for the above to happen (same amount of time as the
* connection timeout, which is configurable). This should be fine for well-behaved
* applications, where they close the connection arond the same time the app handle detects the
* app has finished.
*
* In case the connection is not closed within the grace period, this method forcefully closes
* it and any subsequent data that may arrive will be ignored.
*/
public void closeAndWait() throws IOException {
close();

public void waitForClose() throws IOException {
Thread connThread = this.connectionThread;
if (Thread.currentThread() != connThread) {
try {
connThread.join();
connThread.join(getConnectionTimeout());
} catch (InterruptedException ie) {
// Ignore.
}

if (connThread.isAlive()) {
LOG.log(Level.WARNING, "Timed out waiting for child connection to close.");
close();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ public void infoChanged(SparkAppHandle handle) {
Message stopMsg = client.inbound.poll(30, TimeUnit.SECONDS);
assertTrue(stopMsg instanceof Stop);
} finally {
handle.kill();
close(client);
handle.kill();
client.clientThread.join();
}
}
Expand Down

0 comments on commit 598e64b

Please sign in to comment.