Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/master' into console-sink
Browse files Browse the repository at this point in the history
  • Loading branch information
jose-torres committed Jan 17, 2018
2 parents a9d6b82 + 8598a98 commit 2916010
Show file tree
Hide file tree
Showing 22 changed files with 423 additions and 296 deletions.
2 changes: 1 addition & 1 deletion R/pkg/R/DataFrame.R
Original file line number Diff line number Diff line change
Expand Up @@ -2853,7 +2853,7 @@ setMethod("intersect",
#' except
#'
#' Return a new SparkDataFrame containing rows in this SparkDataFrame
#' but not in another SparkDataFrame. This is equivalent to \code{EXCEPT} in SQL.
#' but not in another SparkDataFrame. This is equivalent to \code{EXCEPT DISTINCT} in SQL.
#'
#' @param x a SparkDataFrame.
#' @param y a SparkDataFrame.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.launcher;

import java.time.Duration;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.HashMap;
Expand All @@ -32,7 +31,6 @@
import static org.mockito.Mockito.*;

import org.apache.spark.SparkContext;
import org.apache.spark.SparkContext$;
import org.apache.spark.internal.config.package$;
import org.apache.spark.util.Utils;

Expand Down Expand Up @@ -139,9 +137,7 @@ public void testInProcessLauncher() throws Exception {
// Here DAGScheduler is stopped, while SparkContext.clearActiveContext may not be called yet.
// Wait for a reasonable amount of time to avoid creating two active SparkContext in JVM.
// See SPARK-23019 and SparkContext.stop() for details.
eventually(Duration.ofSeconds(5), Duration.ofMillis(10), () -> {
assertTrue("SparkContext is still alive.", SparkContext$.MODULE$.getActive().isEmpty());
});
TimeUnit.MILLISECONDS.sleep(500);
}
}

Expand All @@ -150,35 +146,26 @@ private void inProcessLauncherTestImpl() throws Exception {
SparkAppHandle.Listener listener = mock(SparkAppHandle.Listener.class);
doAnswer(invocation -> {
SparkAppHandle h = (SparkAppHandle) invocation.getArguments()[0];
synchronized (transitions) {
transitions.add(h.getState());
}
transitions.add(h.getState());
return null;
}).when(listener).stateChanged(any(SparkAppHandle.class));

SparkAppHandle handle = null;
try {
handle = new InProcessLauncher()
.setMaster("local")
.setAppResource(SparkLauncher.NO_RESOURCE)
.setMainClass(InProcessTestApp.class.getName())
.addAppArgs("hello")
.startApplication(listener);

waitFor(handle);
assertEquals(SparkAppHandle.State.FINISHED, handle.getState());

// Matches the behavior of LocalSchedulerBackend.
List<SparkAppHandle.State> expected = Arrays.asList(
SparkAppHandle.State.CONNECTED,
SparkAppHandle.State.RUNNING,
SparkAppHandle.State.FINISHED);
assertEquals(expected, transitions);
} finally {
if (handle != null) {
handle.kill();
}
}
SparkAppHandle handle = new InProcessLauncher()
.setMaster("local")
.setAppResource(SparkLauncher.NO_RESOURCE)
.setMainClass(InProcessTestApp.class.getName())
.addAppArgs("hello")
.startApplication(listener);

waitFor(handle);
assertEquals(SparkAppHandle.State.FINISHED, handle.getState());

// Matches the behavior of LocalSchedulerBackend.
List<SparkAppHandle.State> expected = Arrays.asList(
SparkAppHandle.State.CONNECTED,
SparkAppHandle.State.RUNNING,
SparkAppHandle.State.FINISHED);
assertEquals(expected, transitions);
}

public static class SparkLauncherTestApp {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -479,11 +479,6 @@ class KafkaMicroBatchSourceSuite extends KafkaSourceSuiteBase {
// `failOnDataLoss` is `false`, we should not fail the query
assert(query.exception.isEmpty)
}
}

class KafkaSourceSuiteBase extends KafkaSourceTest {

import testImplicits._

test("SPARK-22956: currentPartitionOffsets should be set when no new data comes in") {
def getSpecificDF(range: Range.Inclusive): org.apache.spark.sql.Dataset[Int] = {
Expand Down Expand Up @@ -549,6 +544,11 @@ class KafkaSourceSuiteBase extends KafkaSourceTest {
CheckLastBatch(120 to 124: _*)
)
}
}

class KafkaSourceSuiteBase extends KafkaSourceTest {

import testImplicits._

test("cannot stop Kafka stream") {
val topic = newTopic()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ abstract class AbstractAppHandle implements SparkAppHandle {
private List<Listener> listeners;
private State state;
private String appId;
private volatile boolean disposed;
private boolean disposed;

protected AbstractAppHandle(LauncherServer server) {
this.server = server;
Expand Down Expand Up @@ -70,15 +70,16 @@ public void stop() {

@Override
public synchronized void disconnect() {
if (!isDisposed()) {
if (!disposed) {
disposed = true;
if (connection != null) {
try {
connection.close();
} catch (IOException ioe) {
// no-op.
}
}
dispose();
server.unregister(this);
}
}

Expand All @@ -94,21 +95,6 @@ boolean isDisposed() {
return disposed;
}

/**
* Mark the handle as disposed, and set it as LOST in case the current state is not final.
*/
synchronized void dispose() {
if (!isDisposed()) {
// Unregister first to make sure that the connection with the app has been really
// terminated.
server.unregister(this);
if (!getState().isFinal()) {
setState(State.LOST);
}
this.disposed = true;
}
}

void setState(State s) {
setState(s, false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,14 @@ public synchronized void disconnect() {

@Override
public synchronized void kill() {
if (!isDisposed()) {
setState(State.KILLED);
disconnect();
if (childProc != null) {
if (childProc.isAlive()) {
childProc.destroyForcibly();
}
childProc = null;
disconnect();
if (childProc != null) {
if (childProc.isAlive()) {
childProc.destroyForcibly();
}
childProc = null;
}
setState(State.KILLED);
}

void setChildProc(Process childProc, String loggerName, InputStream logStream) {
Expand Down Expand Up @@ -96,6 +94,8 @@ void monitorChild() {
return;
}

disconnect();

int ec;
try {
ec = proc.exitValue();
Expand All @@ -118,8 +118,6 @@ void monitorChild() {
if (newState != null) {
setState(newState, true);
}

disconnect();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,15 @@ class InProcessAppHandle extends AbstractAppHandle {

@Override
public synchronized void kill() {
if (!isDisposed()) {
LOG.warning("kill() may leave the underlying app running in in-process mode.");
setState(State.KILLED);
disconnect();

// Interrupt the thread. This is not guaranteed to kill the app, though.
if (app != null) {
app.interrupt();
}
LOG.warning("kill() may leave the underlying app running in in-process mode.");
disconnect();

// Interrupt the thread. This is not guaranteed to kill the app, though.
if (app != null) {
app.interrupt();
}

setState(State.KILLED);
}

synchronized void start(String appName, Method main, String[] args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,15 +95,15 @@ protected synchronized void send(Message msg) throws IOException {
}

@Override
public synchronized void close() throws IOException {
public void close() throws IOException {
if (!closed) {
closed = true;
socket.close();
synchronized (this) {
if (!closed) {
closed = true;
socket.close();
}
}
}
}

boolean isOpen() {
return !closed;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -217,33 +217,6 @@ void unregister(AbstractAppHandle handle) {
break;
}
}

// If there is a live connection for this handle, we need to wait for it to finish before
// returning, otherwise there might be a race between the connection thread processing
// buffered data and the handle cleaning up after itself, leading to potentially the wrong
// state being reported for the handle.
ServerConnection conn = null;
synchronized (clients) {
for (ServerConnection c : clients) {
if (c.handle == handle) {
conn = c;
break;
}
}
}

if (conn != null) {
synchronized (conn) {
if (conn.isOpen()) {
try {
conn.wait();
} catch (InterruptedException ie) {
// Ignore.
}
}
}
}

unref();
}

Expand Down Expand Up @@ -315,7 +288,7 @@ private String createSecret() {
private class ServerConnection extends LauncherConnection {

private TimerTask timeout;
volatile AbstractAppHandle handle;
private AbstractAppHandle handle;

ServerConnection(Socket socket, TimerTask timeout) throws IOException {
super(socket);
Expand Down Expand Up @@ -365,21 +338,16 @@ protected void handle(Message msg) throws IOException {

@Override
public void close() throws IOException {
if (!isOpen()) {
return;
}

synchronized (clients) {
clients.remove(this);
}

synchronized (this) {
super.close();
notifyAll();
}

super.close();
if (handle != null) {
handle.dispose();
if (!handle.getState().isFinal()) {
LOG.log(Level.WARNING, "Lost connection to spark application.");
handle.setState(SparkAppHandle.State.LOST);
}
handle.disconnect();
}
}

Expand Down
42 changes: 7 additions & 35 deletions launcher/src/test/java/org/apache/spark/launcher/BaseSuite.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.launcher;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

import org.junit.After;
Expand Down Expand Up @@ -48,46 +47,19 @@ public void postChecks() {
assertNull(server);
}

protected void waitFor(final SparkAppHandle handle) throws Exception {
protected void waitFor(SparkAppHandle handle) throws Exception {
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(10);
try {
eventually(Duration.ofSeconds(10), Duration.ofMillis(10), () -> {
assertTrue("Handle is not in final state.", handle.getState().isFinal());
});
while (!handle.getState().isFinal()) {
assertTrue("Timed out waiting for handle to transition to final state.",
System.nanoTime() < deadline);
TimeUnit.MILLISECONDS.sleep(10);
}
} finally {
if (!handle.getState().isFinal()) {
handle.kill();
}
}

// Wait until the handle has been marked as disposed, to make sure all cleanup tasks
// have been performed.
AbstractAppHandle ahandle = (AbstractAppHandle) handle;
eventually(Duration.ofSeconds(10), Duration.ofMillis(10), () -> {
assertTrue("Handle is still not marked as disposed.", ahandle.isDisposed());
});
}

/**
* Call a closure that performs a check every "period" until it succeeds, or the timeout
* elapses.
*/
protected void eventually(Duration timeout, Duration period, Runnable check) throws Exception {
assertTrue("Timeout needs to be larger than period.", timeout.compareTo(period) > 0);
long deadline = System.nanoTime() + timeout.toNanos();
int count = 0;
while (true) {
try {
count++;
check.run();
return;
} catch (Throwable t) {
if (System.nanoTime() >= deadline) {
String msg = String.format("Failed check after %d tries: %s.", count, t.getMessage());
throw new IllegalStateException(msg, t);
}
Thread.sleep(period.toMillis());
}
}
}

}
Loading

0 comments on commit 2916010

Please sign in to comment.