Skip to content

Commit

Permalink
Let workers finish lost races without delaying dynamic execution.
Browse files Browse the repository at this point in the history
If a worker is blocked on reading a response, it doesn't listen for interrupts. This changes blocking on reading to blocking on a sleep-loop, and if interrupted, the worker gets to finish in a separate thread before returning to the pool. This lets the action finish immediately.

RELNOTES: None.
PiperOrigin-RevId: 368207735
  • Loading branch information
larsrc-google authored and katre committed Jul 13, 2021
1 parent 058c0c9 commit eb8fb6f
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,24 +120,12 @@ void putRequest(WorkRequest request) throws IOException {
@Override
WorkResponse getResponse(int requestId) throws IOException, InterruptedException {
recordingInputStream.startRecording(4096);
// Ironically, we don't allow interrupts during dynamic execution, since we can't cancel
// the worker short of destroying it.
if (!workerKey.isSpeculative()) {
while (recordingInputStream.available() == 0) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// This should only happen when not in dynamic execution, so we can safely kill the
// worker.
destroy();
throw e;
}
if (!process.isAlive()) {
throw new IOException(
String.format(
"Worker process for %s died while waiting for response",
workerKey.getMnemonic()));
}
while (recordingInputStream.available() == 0) {
Thread.sleep(10);
if (!process.isAlive()) {
throw new IOException(
String.format(
"Worker process for %s died while waiting for response", workerKey.getMnemonic()));
}
}
return workerProtocol.getResponse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,10 @@ WorkResponse execInWorker(

try {
response = worker.getResponse(request.getRequestId());
} catch (InterruptedException e) {
finishWorkAsync(key, worker, request);
worker = null;
throw e;
} catch (IOException e) {
restoreInterrupt(e);
// If protobuf or json reader couldn't parse the response, try to print whatever the
Expand Down Expand Up @@ -511,6 +515,41 @@ WorkResponse execInWorker(
return response;
}

/**
* Starts a thread to collect the response from a worker when it's no longer of interest.
*
* <p>This can happen either when we lost the race in dynamic execution or the build got
* interrupted. This takes ownership of the worker for purposes of returning it to the worker
* pool.
*/
private void finishWorkAsync(WorkerKey key, Worker worker, WorkRequest request) {
Thread reaper =
new Thread(
() -> {
Worker w = worker;
try {
w.getResponse(request.getRequestId());
} catch (IOException | InterruptedException e1) {
// If this happens, we either can't trust the output of the worker, or we got
// interrupted while handling being interrupted. In the latter case, let's stop
// trying and just destroy the worker. If it's a singleplex worker, there will
// be a dangling response that we don't want to keep trying to read, so we destroy
// the worker.
try {
workers.invalidateObject(key, w);
w = null;
} catch (IOException | InterruptedException e2) {
// The reaper thread can't do anything useful about this.
}
} finally {
if (w != null) {
workers.returnObject(key, w);
}
}
});
reaper.start();
}

private static void restoreInterrupt(IOException e) {
if (e instanceof InterruptedIOException) {
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,11 @@
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Semaphore;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import sun.misc.Signal;
import sun.misc.SignalHandler;

/** An example implementation of a worker process that is used for integration tests. */
public class ExampleWorker {
Expand Down Expand Up @@ -79,6 +82,23 @@ private static void runPersistentWorker(ExampleWorkerOptions workerOptions) thro
PrintStream originalStdOut = System.out;
PrintStream originalStdErr = System.err;

if (workerOptions.waitForSignal) {
Semaphore signalSem = new Semaphore(0);
Signal.handle(
new Signal("HUP"),
new SignalHandler() {
@Override
public void handle(Signal sig) {
signalSem.release();
}
});
try {
signalSem.acquire();
} catch (InterruptedException e) {
System.out.println("Interrupted while waiting for signal");
e.printStackTrace();
}
}
ExampleWorkerProtocol workerProtocol = null;
switch (workerOptions.workerProtocol) {
case JSON:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,14 @@ public static class ExampleWorkOptions extends OptionsBase {
)
public boolean hardPoison;

@Option(
name = "wait_for_signal",
documentationCategory = OptionDocumentationCategory.UNCATEGORIZED,
effectTags = {OptionEffectTag.NO_OP},
defaultValue = "false",
help = "Don't send a response until receiving a SIGXXXX.")
public boolean waitForSignal;

/** Enum converter for --worker_protocol. */
public static class WorkerProtocolEnumConverter
extends EnumConverter<ExecutionRequirements.WorkerProtocolFormat> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,11 +174,6 @@ private void verifyGetResponseFailure(String responseString, String expectedErro
assertThat(ex).hasMessageThat().contains(expectedError);
}

@Test
public void testGetResponse_json_emptyString_throws() throws IOException {
verifyGetResponseFailure("", "Could not parse json work request correctly");
}

@Test
public void testGetResponse_badJson_throws() throws IOException {
verifyGetResponseFailure(
Expand Down

0 comments on commit eb8fb6f

Please sign in to comment.