From 32531157002ab6ed2091dacce7661c62453218f4 Mon Sep 17 00:00:00 2001 From: Benjamin Lee Date: Sun, 9 Jan 2022 20:32:35 -0800 Subject: [PATCH] index on add-support-for-wrapping-system-streams-in-workrequesthandler: 901c960355 Merge branch 'bazelbuild:master' into add-support-for-wrapping-system-streams-in-workrequesthandler --- .../build/lib/worker/WorkRequestHandler.java | 50 +++++++++++++++---- .../build/lib/worker/ExampleWorker.java | 10 +++- .../lib/worker/WorkRequestHandlerTest.java | 27 +++++++--- 3 files changed, 70 insertions(+), 17 deletions(-) diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkRequestHandler.java b/src/main/java/com/google/devtools/build/lib/worker/WorkRequestHandler.java index 27be6ff756d8c0..9f64c9d09fdad4 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkRequestHandler.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkRequestHandler.java @@ -313,8 +313,16 @@ public WorkRequestHandler build() { * then writing the corresponding {@link WorkResponse} to {@code out}. If there is an error * reading or writing the requests or responses, it writes an error message on {@code err} and * returns. If {@code in} reaches EOF, it also returns. + * + *

This function also wraps the system streams in a {@link WorkerIO} instance that prevents + * calls to {@link System#out} and {@link System#err} from corruption the worker streams. When the + * while loop exits, the original system streams will be swapped back into {@link System}. */ public void processRequests() throws IOException { + // Wrap the system streams into a WorkerIO instance that can be used + // to capture output and error streams that aren't being written to the worker output directly. + WorkerIO workerIO = WorkerIO.capture(); + try { while (true) { WorkRequest request = messageProcessor.readWorkRequest(); @@ -324,17 +332,24 @@ public void processRequests() throws IOException { if (request.getCancel()) { respondToCancelRequest(request); } else { - startResponseThread(request); + startResponseThread(workerIO, request); } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); stderr.println("InterruptedException processing requests."); + } finally { + try { + // Unwrap the system streams placing the original streams back + workerIO.close(); + } catch (Exception e) { + stderr.println(e.getMessage()); + } } } /** Starts a thread for the given request. */ - void startResponseThread(WorkRequest request) throws InterruptedException { + void startResponseThread(WorkerIO workerIO, WorkRequest request) throws InterruptedException { Thread currentThread = Thread.currentThread(); String threadName = request.getRequestId() > 0 @@ -358,7 +373,7 @@ void startResponseThread(WorkRequest request) throws InterruptedException { return; } try { - respondToRequest(request, requestInfo); + respondToRequest(workerIO, request, requestInfo); } catch (IOException e) { e.printStackTrace(stderr); // In case of error, shut down the entire worker. @@ -378,7 +393,8 @@ void startResponseThread(WorkRequest request) throws InterruptedException { /** Handles and responds to the given {@link WorkRequest}. */ @VisibleForTesting - void respondToRequest(WorkRequest request, RequestInfo requestInfo) throws IOException { + void respondToRequest(WorkerIO workerIO, WorkRequest request, RequestInfo requestInfo) + throws IOException { int exitCode; StringWriter sw = new StringWriter(); try (PrintWriter pw = new PrintWriter(sw)) { @@ -390,6 +406,16 @@ void respondToRequest(WorkRequest request, RequestInfo requestInfo) throws IOExc e.printStackTrace(pw); exitCode = 1; } + + try { + // Read out the captured string for the final WorkResponse output + String captured = workerIO.readCapturedAsUtf8String().trim(); + if (!captured.isEmpty()) { + pw.write(captured); + } + } catch (IOException e) { + stderr.println(e.getMessage()); + } } Optional optBuilder = requestInfo.takeBuilder(); if (optBuilder.isPresent()) { @@ -509,8 +535,8 @@ private void maybePerformGc() { *

This is most useful when integrating JVM tools that write exceptions and logs directly to * {@link System#out} and {@link System#err} which corrupt the persistent worker. * - *

WorkerIO implements {@link AutoCloseable} and will swap the original streams back into {@link - * System} once close has been called. + *

WorkerIO implements {@link AutoCloseable} and will swap the original streams back into + * {@link System} once close has been called. */ public static class WorkerIO implements AutoCloseable { @@ -569,22 +595,26 @@ public static WorkerIO capture() throws IOException { } /** Returns the original input stream most commonly provided by {@link System#in} */ - public InputStream getOriginalInputStream() { + @VisibleForTesting + InputStream getOriginalInputStream() { return originalInputStream; } /** Returns the original output stream most commonly provided by {@link System#out} */ - public PrintStream getOriginalOutputStream() { + @VisibleForTesting + PrintStream getOriginalOutputStream() { return originalOutputStream; } /** Returns the original error stream most commonly provided by {@link System#err} */ - public PrintStream getOriginalErrorStream() { + @VisibleForTesting + PrintStream getOriginalErrorStream() { return originalErrorStream; } /** Returns the captured outputs as a UTF-8 string */ - public String readCapturedAsUtf8String() throws IOException { + @VisibleForTesting + String readCapturedAsUtf8String() throws IOException { capturedStream.flush(); String captureOutput = new String(capturedStream.toByteArray(), StandardCharsets.UTF_8); capturedStream.reset(); diff --git a/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java b/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java index b172e167770fc1..aacf9f299ec765 100644 --- a/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java +++ b/src/test/java/com/google/devtools/build/lib/worker/ExampleWorker.java @@ -81,6 +81,7 @@ private static class InterruptableWorkRequestHandler extends WorkRequestHandler @Override public void processRequests() throws IOException { + WorkerIO workerIO = WorkerIO.capture(); while (true) { WorkRequest request = messageProcessor.readWorkRequest(); if (request == null) { @@ -99,7 +100,7 @@ public void processRequests() throws IOException { respondToCancelRequest(request); } else { try { - startResponseThread(request); + startResponseThread(workerIO, request); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // We don't expect interrupts at this level, only inside the individual request @@ -112,6 +113,13 @@ public void processRequests() throws IOException { System.exit(0); } } + + try { + // Unwrap the system streams placing the original streams back + workerIO.close(); + } catch (Exception e) { + workerIO.getOriginalErrorStream().println(e.getMessage()); + } } } diff --git a/src/test/java/com/google/devtools/build/lib/worker/WorkRequestHandlerTest.java b/src/test/java/com/google/devtools/build/lib/worker/WorkRequestHandlerTest.java index 1b2a3f92ce7463..2ee58b7ce50f4e 100644 --- a/src/test/java/com/google/devtools/build/lib/worker/WorkRequestHandlerTest.java +++ b/src/test/java/com/google/devtools/build/lib/worker/WorkRequestHandlerTest.java @@ -33,6 +33,8 @@ import java.util.Arrays; import java.util.List; import java.util.concurrent.Semaphore; + +import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -43,11 +45,18 @@ @RunWith(JUnit4.class) public class WorkRequestHandlerTest { + private final WorkRequestHandler.WorkerIO testWorkerIO = createTestWorkerIO(); + @Before public void init() { MockitoAnnotations.initMocks(this); } + @After + public void after() throws Exception { + testWorkerIO.close(); + } + @Test public void testNormalWorkRequest() throws IOException { ByteArrayOutputStream out = new ByteArrayOutputStream(); @@ -59,7 +68,7 @@ public void testNormalWorkRequest() throws IOException { List args = Arrays.asList("--sources", "A.java"); WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build(); - handler.respondToRequest(request, new RequestInfo(null)); + handler.respondToRequest(testWorkerIO, request, new RequestInfo(null)); WorkResponse response = WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray())); @@ -79,7 +88,7 @@ public void testMultiplexWorkRequest() throws IOException { List args = Arrays.asList("--sources", "A.java"); WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).setRequestId(42).build(); - handler.respondToRequest(request, new RequestInfo(null)); + handler.respondToRequest(testWorkerIO, request, new RequestInfo(null)); WorkResponse response = WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray())); @@ -102,7 +111,7 @@ public void testOutput() throws IOException { List args = Arrays.asList("--sources", "A.java"); WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build(); - handler.respondToRequest(request, new RequestInfo(null)); + handler.respondToRequest(testWorkerIO, request, new RequestInfo(null)); WorkResponse response = WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray())); @@ -124,7 +133,7 @@ public void testException() throws IOException { List args = Arrays.asList("--sources", "A.java"); WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build(); - handler.respondToRequest(request, new RequestInfo(null)); + handler.respondToRequest(testWorkerIO, request, new RequestInfo(null)); WorkResponse response = WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray())); @@ -345,7 +354,8 @@ public void testCancelRequest_sendsNoResponseWhenAlreadySent() assertThat(response.getRequestId()).isEqualTo(42); assertThat(response.getWasCancelled()).isFalse(); assertThat(response.getExitCode()).isEqualTo(2); - assertThat(response.getOutput()).isEqualTo("Such work! Much progress! Wow!\n"); + assertThat(response.getOutput()) + .isEqualTo("Such work! Much progress! Wow!\nHandling request #0\nHandling request #1"); // Checks that nothing more was sent. assertThat(dest.available()).isEqualTo(0); @@ -425,7 +435,7 @@ public void testWorkRequestHandler_withWorkRequestCallback() throws IOException List args = Arrays.asList("--sources", "B.java"); WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build(); - handler.respondToRequest(request, new RequestInfo(null)); + handler.respondToRequest(testWorkerIO, request, new RequestInfo(null)); WorkResponse response = WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray())); @@ -493,4 +503,9 @@ public void testWorkerIO_doesCaptureStandardOutAndErrorStreams() throws Exceptio assertThat(io.readCapturedAsUtf8String()).isEmpty(); } } + + private WorkRequestHandler.WorkerIO createTestWorkerIO() { + ByteArrayOutputStream captured = new ByteArrayOutputStream(); + return new WorkRequestHandler.WorkerIO(System.in, System.out, System.err, captured, captured); + } }