diff --git a/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java b/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java index dbe0c494db3f11..3edce2c3227b6f 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java +++ b/src/main/java/com/google/devtools/build/lib/worker/SingleplexWorker.java @@ -120,24 +120,12 @@ void putRequest(WorkRequest request) throws IOException { @Override WorkResponse getResponse(int requestId) throws IOException, InterruptedException { recordingInputStream.startRecording(4096); - // Ironically, we don't allow interrupts during dynamic execution, since we can't cancel - // the worker short of destroying it. - if (!workerKey.isSpeculative()) { - while (recordingInputStream.available() == 0) { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - // This should only happen when not in dynamic execution, so we can safely kill the - // worker. - destroy(); - throw e; - } - if (!process.isAlive()) { - throw new IOException( - String.format( - "Worker process for %s died while waiting for response", - workerKey.getMnemonic())); - } + while (recordingInputStream.available() == 0) { + Thread.sleep(10); + if (!process.isAlive()) { + throw new IOException( + String.format( + "Worker process for %s died while waiting for response", workerKey.getMnemonic())); } } return workerProtocol.getResponse(); diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java index 437a59b64f88b7..a810be5a2928f3 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java @@ -454,6 +454,10 @@ WorkResponse execInWorker( try { response = worker.getResponse(request.getRequestId()); + } catch (InterruptedException e) { + finishWorkAsync(key, worker, request); + worker = null; + throw e; 
} catch (IOException e) { restoreInterrupt(e); // If protobuf or json reader couldn't parse the response, try to print whatever the @@ -511,6 +515,41 @@ WorkResponse execInWorker( return response; } + /** + * Starts a thread to collect the response from a worker when it's no longer of interest. + * + *
This can happen either when we lose the race in dynamic execution or when the build is
+ * interrupted. This method takes ownership of the worker for the purpose of returning it to
+ * the worker pool.
+ */
+  private void finishWorkAsync(WorkerKey key, Worker worker, WorkRequest request) {
+    Thread reaper =
+        new Thread(
+            () -> {
+              // Non-null only while this thread may still hand the worker back to the pool.
+              Worker w = worker;
+              try {
+                // Wait for the response nobody is interested in anymore and discard it.
+                w.getResponse(request.getRequestId());
+              } catch (IOException | InterruptedException e1) {
+                // If this happens, we either can't trust the output of the worker, or we got
+                // interrupted while handling being interrupted. In the latter case, let's stop
+                // trying and just destroy the worker. If it's a singleplex worker, there will
+                // be a dangling response that we don't want to keep trying to read, so we destroy
+                // the worker.
+                Worker broken = w;
+                // Give up ownership *before* invalidating: even if invalidation itself fails, the
+                // finally block below must never return this worker to the pool.
+                w = null;
+                try {
+                  workers.invalidateObject(key, broken);
+                } catch (IOException | InterruptedException e2) {
+                  // The reaper thread can't do anything useful about this.
+                }
+              } finally {
+                // Still owned here (no I/O failure or interrupt above), so return it to the pool.
+                if (w != null) {
+                  workers.returnObject(key, w);
+                }
+              }
+            },
+            // A fixed name makes these short-lived threads recognizable in thread dumps.
+            "worker-response-reaper");
+    reaper.start();
+  }
+
private static void restoreInterrupt(IOException e) {
if (e instanceof InterruptedIOException) {
Thread.currentThread().interrupt();
diff --git a/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java b/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java
index 471218c060dbda..353efa617138cb 100644
--- a/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java
+++ b/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java
@@ -35,8 +35,11 @@
import java.util.Map;
import java.util.Random;
import java.util.UUID;
+import java.util.concurrent.Semaphore;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import sun.misc.Signal;
+import sun.misc.SignalHandler;
/** An example implementation of a worker process that is used for integration tests. */
public class ExampleWorker {
@@ -79,6 +82,23 @@ private static void runPersistentWorker(ExampleWorkerOptions workerOptions) thro
PrintStream originalStdOut = System.out;
PrintStream originalStdErr = System.err;
+    // When requested, block here until the process receives a SIGHUP.
+    if (workerOptions.waitForSignal) {
+      Semaphore signalSem = new Semaphore(0);
+      // Every SIGHUP releases one permit; the acquire() below consumes the first one.
+      SignalHandler releaseOnSignal = sig -> signalSem.release();
+      Signal.handle(new Signal("HUP"), releaseOnSignal);
+      try {
+        signalSem.acquire();
+      } catch (InterruptedException e) {
+        // NOTE(review): stdout presumably carries the worker protocol, so the diagnostic is kept
+        // on stderr next to the stack trace -- confirm against the protocol setup below.
+        System.err.println("Interrupted while waiting for signal");
+        e.printStackTrace();
+        // Keep the interrupt visible to the code that runs after this block.
+        Thread.currentThread().interrupt();
+      }
+    }
ExampleWorkerProtocol workerProtocol = null;
switch (workerOptions.workerProtocol) {
case JSON:
diff --git a/src/test/java/com/google/devtools/build/lib/worker/ExampleWorkerOptions.java b/src/test/java/com/google/devtools/build/lib/worker/ExampleWorkerOptions.java
index 40d6faa5811b4a..440717916a3fd4 100644
--- a/src/test/java/com/google/devtools/build/lib/worker/ExampleWorkerOptions.java
+++ b/src/test/java/com/google/devtools/build/lib/worker/ExampleWorkerOptions.java
@@ -135,6 +135,14 @@ public static class ExampleWorkOptions extends OptionsBase {
)
public boolean hardPoison;
+  /** When set, the example worker waits for a SIGHUP before it goes on to serve requests. */
+  @Option(
+      name = "wait_for_signal",
+      documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
+      effectTags = {OptionEffectTag.NO_OP},
+      defaultValue = "false",
+      help = "Don't send a response until receiving a SIGHUP.")
+  public boolean waitForSignal;
+
/** Enum converter for --worker_protocol. */
public static class WorkerProtocolEnumConverter
extends EnumConverter