Skip to content

Commit

Permalink
On add-support-for-wrapping-system-streams-in-workrequesthandler: aut…
Browse files Browse the repository at this point in the history
…ostash
  • Loading branch information
Bencodes committed Jan 10, 2022
2 parents 901c960 + 3253115 commit 008280a
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,16 @@ public WorkRequestHandler build() {
* then writing the corresponding {@link WorkResponse} to {@code out}. If there is an error
* reading or writing the requests or responses, it writes an error message on {@code err} and
* returns. If {@code in} reaches EOF, it also returns.
*
* <p>This function also wraps the system streams in a {@link WorkerIO} instance that prevents
* calls to {@link System#out} and {@link System#err} from corruption the worker streams. When the
* while loop exits, the original system streams will be swapped back into {@link System}.
*/
public void processRequests() throws IOException {
// Wrap the system streams into a WorkerIO instance that can be used
// to capture output and error streams that aren't being written to the worker output directly.
WorkerIO workerIO = WorkerIO.capture();

try {
while (true) {
WorkRequest request = messageProcessor.readWorkRequest();
Expand All @@ -324,17 +332,24 @@ public void processRequests() throws IOException {
if (request.getCancel()) {
respondToCancelRequest(request);
} else {
startResponseThread(request);
startResponseThread(workerIO, request);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
stderr.println("InterruptedException processing requests.");
} finally {
try {
// Unwrap the system streams placing the original streams back
workerIO.close();
} catch (Exception e) {
stderr.println(e.getMessage());
}
}
}

/** Starts a thread for the given request. */
void startResponseThread(WorkRequest request) throws InterruptedException {
void startResponseThread(WorkerIO workerIO, WorkRequest request) throws InterruptedException {
Thread currentThread = Thread.currentThread();
String threadName =
request.getRequestId() > 0
Expand All @@ -358,7 +373,7 @@ void startResponseThread(WorkRequest request) throws InterruptedException {
return;
}
try {
respondToRequest(request, requestInfo);
respondToRequest(workerIO, request, requestInfo);
} catch (IOException e) {
e.printStackTrace(stderr);
// In case of error, shut down the entire worker.
Expand All @@ -378,7 +393,8 @@ void startResponseThread(WorkRequest request) throws InterruptedException {

/** Handles and responds to the given {@link WorkRequest}. */
@VisibleForTesting
void respondToRequest(WorkRequest request, RequestInfo requestInfo) throws IOException {
void respondToRequest(WorkerIO workerIO, WorkRequest request, RequestInfo requestInfo)
throws IOException {
int exitCode;
StringWriter sw = new StringWriter();
try (PrintWriter pw = new PrintWriter(sw)) {
Expand All @@ -390,6 +406,16 @@ void respondToRequest(WorkRequest request, RequestInfo requestInfo) throws IOExc
e.printStackTrace(pw);
exitCode = 1;
}

try {
// Read out the captured string for the final WorkResponse output
String captured = workerIO.readCapturedAsUtf8String().trim();
if (!captured.isEmpty()) {
pw.write(captured);
}
} catch (IOException e) {
stderr.println(e.getMessage());
}
}
Optional<WorkResponse.Builder> optBuilder = requestInfo.takeBuilder();
if (optBuilder.isPresent()) {
Expand Down Expand Up @@ -509,8 +535,8 @@ private void maybePerformGc() {
* <p>This is most useful when integrating JVM tools that write exceptions and logs directly to
* {@link System#out} and {@link System#err} which corrupt the persistent worker.
*
* <p>WorkerIO implements {@link AutoCloseable} and will swap the original streams back into {@link
* System} once close has been called.
* <p>WorkerIO implements {@link AutoCloseable} and will swap the original streams back into
* {@link System} once close has been called.
*/
public static class WorkerIO implements AutoCloseable {

Expand Down Expand Up @@ -569,22 +595,26 @@ public static WorkerIO capture() throws IOException {
}

/** Returns the original input stream most commonly provided by {@link System#in} */
public InputStream getOriginalInputStream() {
@VisibleForTesting
InputStream getOriginalInputStream() {
return originalInputStream;
}

/** Returns the original output stream most commonly provided by {@link System#out} */
public PrintStream getOriginalOutputStream() {
@VisibleForTesting
PrintStream getOriginalOutputStream() {
return originalOutputStream;
}

/** Returns the original error stream most commonly provided by {@link System#err} */
public PrintStream getOriginalErrorStream() {
@VisibleForTesting
PrintStream getOriginalErrorStream() {
return originalErrorStream;
}

/** Returns the captured outputs as a UTF-8 string */
public String readCapturedAsUtf8String() throws IOException {
@VisibleForTesting
String readCapturedAsUtf8String() throws IOException {
capturedStream.flush();
String captureOutput = new String(capturedStream.toByteArray(), StandardCharsets.UTF_8);
capturedStream.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ private static class InterruptableWorkRequestHandler extends WorkRequestHandler

@Override
public void processRequests() throws IOException {
WorkerIO workerIO = WorkerIO.capture();
while (true) {
WorkRequest request = messageProcessor.readWorkRequest();
if (request == null) {
Expand All @@ -99,7 +100,7 @@ public void processRequests() throws IOException {
respondToCancelRequest(request);
} else {
try {
startResponseThread(request);
startResponseThread(workerIO, request);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// We don't expect interrupts at this level, only inside the individual request
Expand All @@ -112,6 +113,13 @@ public void processRequests() throws IOException {
System.exit(0);
}
}

try {
// Unwrap the system streams placing the original streams back
workerIO.close();
} catch (Exception e) {
workerIO.getOriginalErrorStream().println(e.getMessage());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Semaphore;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand All @@ -43,11 +45,18 @@
@RunWith(JUnit4.class)
public class WorkRequestHandlerTest {

private final WorkRequestHandler.WorkerIO testWorkerIO = createTestWorkerIO();

@Before
public void init() {
MockitoAnnotations.initMocks(this);
}

@After
public void after() throws Exception {
testWorkerIO.close();
}

@Test
public void testNormalWorkRequest() throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
Expand All @@ -59,7 +68,7 @@ public void testNormalWorkRequest() throws IOException {

List<String> args = Arrays.asList("--sources", "A.java");
WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build();
handler.respondToRequest(request, new RequestInfo(null));
handler.respondToRequest(testWorkerIO, request, new RequestInfo(null));

WorkResponse response =
WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));
Expand All @@ -79,7 +88,7 @@ public void testMultiplexWorkRequest() throws IOException {

List<String> args = Arrays.asList("--sources", "A.java");
WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).setRequestId(42).build();
handler.respondToRequest(request, new RequestInfo(null));
handler.respondToRequest(testWorkerIO, request, new RequestInfo(null));

WorkResponse response =
WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));
Expand All @@ -102,7 +111,7 @@ public void testOutput() throws IOException {

List<String> args = Arrays.asList("--sources", "A.java");
WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build();
handler.respondToRequest(request, new RequestInfo(null));
handler.respondToRequest(testWorkerIO, request, new RequestInfo(null));

WorkResponse response =
WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));
Expand All @@ -124,7 +133,7 @@ public void testException() throws IOException {

List<String> args = Arrays.asList("--sources", "A.java");
WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build();
handler.respondToRequest(request, new RequestInfo(null));
handler.respondToRequest(testWorkerIO, request, new RequestInfo(null));

WorkResponse response =
WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));
Expand Down Expand Up @@ -345,7 +354,8 @@ public void testCancelRequest_sendsNoResponseWhenAlreadySent()
assertThat(response.getRequestId()).isEqualTo(42);
assertThat(response.getWasCancelled()).isFalse();
assertThat(response.getExitCode()).isEqualTo(2);
assertThat(response.getOutput()).isEqualTo("Such work! Much progress! Wow!\n");
assertThat(response.getOutput())
.isEqualTo("Such work! Much progress! Wow!\nHandling request #0\nHandling request #1");

// Checks that nothing more was sent.
assertThat(dest.available()).isEqualTo(0);
Expand Down Expand Up @@ -425,7 +435,7 @@ public void testWorkRequestHandler_withWorkRequestCallback() throws IOException

List<String> args = Arrays.asList("--sources", "B.java");
WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build();
handler.respondToRequest(request, new RequestInfo(null));
handler.respondToRequest(testWorkerIO, request, new RequestInfo(null));

WorkResponse response =
WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));
Expand Down Expand Up @@ -493,4 +503,9 @@ public void testWorkerIO_doesCaptureStandardOutAndErrorStreams() throws Exceptio
assertThat(io.readCapturedAsUtf8String()).isEmpty();
}
}

private WorkRequestHandler.WorkerIO createTestWorkerIO() {
ByteArrayOutputStream captured = new ByteArrayOutputStream();
return new WorkRequestHandler.WorkerIO(System.in, System.out, System.err, captured, captured);
}
}

0 comments on commit 008280a

Please sign in to comment.