Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for wrapping system streams in WorkRequestHandler #14201

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,15 @@
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import com.sun.management.OperatingSystemMXBean;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.management.ManagementFactory;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -317,8 +321,16 @@ public WorkRequestHandler build() {
* then writing the corresponding {@link WorkResponse} to {@code out}. If there is an error
* reading or writing the requests or responses, it writes an error message on {@code err} and
* returns. If {@code in} reaches EOF, it also returns.
*
* <p>This function also wraps the system streams in a {@link WorkerIO} instance that prevents
* calls to {@link System#out} and {@link System#err} from corrupting the worker streams. When the
* while loop exits, the original system streams will be swapped back into {@link System}.
*/
public void processRequests() throws IOException {
// Wrap the system streams into a WorkerIO instance that can be used
// to capture output and error streams that aren't being written to the worker output directly.
WorkerIO workerIO = WorkerIO.capture();

try {
while (!shutdownWorker.get()) {
WorkRequest request = messageProcessor.readWorkRequest();
Expand All @@ -328,31 +340,39 @@ public void processRequests() throws IOException {
if (request.getCancel()) {
respondToCancelRequest(request);
} else {
startResponseThread(request);
startResponseThread(workerIO, request);
}
}
} catch (IOException e) {
stderr.println("Error reading next WorkRequest: " + e);
e.printStackTrace(stderr);
}
// TODO(b/220878242): Give the outstanding requests a chance to send a "shutdown" response,
// but also try to kill stuck threads. For now, we just interrupt the remaining threads.
// We considered doing System.exit here, but that is hard to test and would deny the callers
// of this method a chance to clean up. Instead, we initiate the cleanup of our resources here
// and the caller can decide whether to wait for an orderly shutdown or not.
for (RequestInfo ri : activeRequests.values()) {
if (ri.thread.isAlive()) {
try {
ri.thread.interrupt();
} catch (RuntimeException e) {
// If we can't interrupt, we can't do much else.
} finally {
// TODO(b/220878242): Give the outstanding requests a chance to send a "shutdown" response,
// but also try to kill stuck threads. For now, we just interrupt the remaining threads.
// We considered doing System.exit here, but that is hard to test and would deny the callers
// of this method a chance to clean up. Instead, we initiate the cleanup of our resources here
// and the caller can decide whether to wait for an orderly shutdown or not.
for (RequestInfo ri : activeRequests.values()) {
if (ri.thread.isAlive()) {
try {
ri.thread.interrupt();
} catch (RuntimeException e) {
// If we can't interrupt, we can't do much else.
}
}
}

try {
// Unwrap the system streams placing the original streams back
workerIO.close();
} catch (Exception e) {
stderr.println(e.getMessage());
}
}
}

/** Starts a thread for the given request. */
void startResponseThread(WorkRequest request) {
void startResponseThread(WorkerIO workerIO, WorkRequest request) {
Thread currentThread = Thread.currentThread();
String threadName =
request.getRequestId() > 0
Expand Down Expand Up @@ -381,7 +401,7 @@ void startResponseThread(WorkRequest request) {
return;
}
try {
respondToRequest(request, requestInfo);
respondToRequest(workerIO, request, requestInfo);
} catch (IOException e) {
// IOExceptions here means a problem talking to the server, so we must shut down.
if (!shutdownWorker.compareAndSet(false, true)) {
Expand Down Expand Up @@ -419,7 +439,8 @@ void startResponseThread(WorkRequest request) {
* #callback} are reported with exit code 1.
*/
@VisibleForTesting
void respondToRequest(WorkRequest request, RequestInfo requestInfo) throws IOException {
void respondToRequest(WorkerIO workerIO, WorkRequest request, RequestInfo requestInfo)
throws IOException {
int exitCode;
StringWriter sw = new StringWriter();
try (PrintWriter pw = new PrintWriter(sw)) {
Expand All @@ -431,6 +452,16 @@ void respondToRequest(WorkRequest request, RequestInfo requestInfo) throws IOExc
e.printStackTrace(pw);
exitCode = 1;
}

try {
// Read out the captured string for the final WorkResponse output
String captured = workerIO.readCapturedAsUtf8String().trim();
if (!captured.isEmpty()) {
pw.write(captured);
}
} catch (IOException e) {
stderr.println(e.getMessage());
}
}
Optional<WorkResponse.Builder> optBuilder = requestInfo.takeBuilder();
if (optBuilder.isPresent()) {
Expand Down Expand Up @@ -541,4 +572,104 @@ private void maybePerformGc() {
}
}
}

/**
* Class that wraps the standard {@link System#in}, {@link System#out}, and {@link System#err}
* with our own ByteArrayOutputStream that allows {@link WorkRequestHandler} to safely capture
* outputs that can't be directly captured by the PrintStream associated with the work request.
*
* <p>This is most useful when integrating JVM tools that write exceptions and logs directly to
* {@link System#out} and {@link System#err} which corrupt the persistent worker.
*
* <p>WorkerIO implements {@link AutoCloseable} and will swap the original streams back into
* {@link System} once close has been called.
*/
public static class WorkerIO implements AutoCloseable {

private final InputStream originalInputStream;
private final PrintStream originalOutputStream;
private final PrintStream originalErrorStream;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrapping System.err isn't actually necessary for a wrapper worker io.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. Also System.in is already handled properly. Only System.out needs to be wrapped.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also System.in is already handled properly. Only System.out needs to be wrapped.

The motivation behind wrapping all of the system streams in a WorkerIO class is that there is a clear and consistent place to access the root system streams while setting up the work request handler. For System.in and System.err we can just return the original streams when calling getOriginalInputStream.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Upon further reflection, what you do makes sense, but not in the way you use it in the example. Instead of wrapping the whole WorkRequestHandler, wrapping the inside of the WorkRequestCallback would be clearer. You wouldn't even need to have the accessors to the original streams. That prevents the tool from attempting to read from stdin and corrupting the communication.

That change would also mean that the WorkerIO doesn't need to reset the buffer, as it'll be thrown away once a request is done. Plus in case processRequest in your example throws an exception the WorkerIO will be closed properly.

A definite benefit of either approach is that stdout and stderr get merged into a single stream, that will be easier to understand.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wrapping the inside of the WorkRequestCallback would be clearer

By this do you mean calling capture from within the implementation of WorkRequestHandler, or in the lambda that gets passed into the WorkRequestHandler? I like the idea of this being hidden inside the WorkRequestCallback because we hide WorkerIO#readCapturedAsUtf8String entirely, which is kind of an awkward API to need to expose.

That change would also mean that the WorkerIO doesn't need to reset the

Would each work request call WorkerIO#capture, swapping in its own buffer?

A definite benefit of either approach is that stdout and stderr get merged into a single stream, that will be easier to understand.

Wrapping the stderr could be a bit error-prone because if there's ever an unrecoverable error that causes the worker to not send a work response, you just won't see an exception come out of the worker.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does your current solution deal with multiplex workers?

@larsrc-google I took another pass at building this into the WorkRequestHandler so that it's a bit more transparent, and with the multiplex worker cases in mind. The new approach uses the existing WorkerIO implementation, but wraps the system streams when you call WorkRequestHandler#processRequests.

The WorkerIO instance gets passed down into respondToRequest so that there is a single source of truth for the wrapped streams, and so that we don't have to deal with multiple WorkerIO instances being created inside of respondToRequest which doesn't really make sense in a multiplex-world where respondToRequest may get called concurrently.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I've been so focused on the implementation details that I forgot the larger problem: Redirecting the stderr messages into the output field of the response changes semantics. The output field is for shorter messages to be displayed to the user, stderr goes into the logs. See https://docs.bazel.build/versions/main/creating-workers.html#work-responses. It's known that multiplex workers only have one log between them, but the messages shown to the user should not interleave.

And of course, since the multiplex threads still share one System.err instance regardless of what we set, we can't possibly disentangle that. But we can still fix the main issue of stdout getting corrupted by setting it to the same stream as stderr.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to make sure I'm understanding next steps here - You'd prefer that we avoid capturing the tool output streams and instead just swap the System.err in for System.out to not pollute the work response output?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Having System.err and System.out go to the same place (the logs) is good. Changing the work response output would cause a lot of noise.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushed the changes for swapping System.err in for System.out and updated the tests.

private final ByteArrayOutputStream capturedStream;
private final AutoCloseable restore;

/**
 * Creates a new {@link WorkerIO} that allows {@link WorkRequestHandler} to capture standard
 * output and error streams that can't be directly captured by the PrintStream associated with
 * the work request.
 *
 * <p>This constructor only stores its arguments; it does not touch {@link System} itself. Use
 * {@link #capture()} to actually swap the system streams.
 *
 * @param originalInputStream the input stream returned by {@link #getOriginalInputStream()}
 * @param originalOutputStream the output stream returned by {@link #getOriginalOutputStream()}
 * @param originalErrorStream the error stream returned by {@link #getOriginalErrorStream()}
 * @param capturedStream the buffer that {@link #readCapturedAsUtf8String()} reads and resets
 * @param restore the action run by {@link #close()}
 */
@VisibleForTesting
WorkerIO(
InputStream originalInputStream,
PrintStream originalOutputStream,
PrintStream originalErrorStream,
ByteArrayOutputStream capturedStream,
AutoCloseable restore) {
this.originalInputStream = originalInputStream;
this.originalOutputStream = originalOutputStream;
this.originalErrorStream = originalErrorStream;
this.capturedStream = capturedStream;
this.restore = restore;
}

/**
 * Swaps the standard {@link System} streams for in-memory replacements and returns a
 * {@link WorkerIO} that remembers the originals.
 *
 * <p>After this call, {@link System#in} is an empty stream, and {@link System#out} and
 * {@link System#err} both write to a single shared in-memory buffer. Closing the returned
 * instance reinstalls the original streams and closes the replacements.
 *
 * @return a {@link WorkerIO} holding the original streams and the capture buffer
 * @throws IOException declared for callers; not thrown by the current implementation
 */
public static WorkerIO capture() throws IOException {
  // Save the original streams so they can be handed back on close.
  InputStream originalInputStream = System.in;
  PrintStream originalOutputStream = System.out;
  PrintStream originalErrorStream = System.err;

  // Replace the original streams with our own instances.
  ByteArrayOutputStream capturedStream = new ByteArrayOutputStream();
  // Encode explicitly as UTF-8. The buffer is decoded as UTF-8 by readCapturedAsUtf8String(),
  // so relying on the platform default charset here would garble non-ASCII output on
  // platforms whose default is not UTF-8.
  PrintStream outputBuffer = new PrintStream(capturedStream, true, StandardCharsets.UTF_8);
  ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(new byte[0]);
  System.setIn(byteArrayInputStream);
  System.setOut(outputBuffer);
  System.setErr(outputBuffer);

  return new WorkerIO(
      originalInputStream,
      originalOutputStream,
      originalErrorStream,
      capturedStream,
      () -> {
        // Reinstall the originals first so nothing writes to the replacements while they
        // are being closed.
        System.setIn(originalInputStream);
        System.setOut(originalOutputStream);
        System.setErr(originalErrorStream);
        outputBuffer.close();
        byteArrayInputStream.close();
      });
}

/**
 * Returns the original input stream most commonly provided by {@link System#in}.
 *
 * <p>For instances created by {@link #capture()}, this is the {@link System#in} that was
 * installed before the system streams were replaced.
 *
 * @return the input stream supplied at construction
 */
@VisibleForTesting
InputStream getOriginalInputStream() {
return originalInputStream;
}

/**
 * Returns the original output stream most commonly provided by {@link System#out}.
 *
 * <p>For instances created by {@link #capture()}, this is the {@link System#out} that was
 * installed before the system streams were replaced.
 *
 * @return the output stream supplied at construction
 */
@VisibleForTesting
PrintStream getOriginalOutputStream() {
return originalOutputStream;
}

/**
 * Returns the original error stream most commonly provided by {@link System#err}.
 *
 * <p>For instances created by {@link #capture()}, this is the {@link System#err} that was
 * installed before the system streams were replaced.
 *
 * @return the error stream supplied at construction
 */
@VisibleForTesting
PrintStream getOriginalErrorStream() {
return originalErrorStream;
}

/**
 * Returns everything captured since the previous call, decoded as UTF-8, and empties the
 * capture buffer.
 *
 * <p>The read and the reset happen as one atomic step with respect to writers, so output
 * written concurrently by another thread is either returned by this call or left in the
 * buffer for the next one; it is never discarded.
 *
 * @return the captured output as a string; empty if nothing was captured
 * @throws IOException if flushing the capture buffer fails
 */
@VisibleForTesting
String readCapturedAsUtf8String() throws IOException {
  // ByteArrayOutputStream guards write/toString/reset with its own monitor. Holding that
  // monitor across both calls stops a writer from slipping bytes in between the snapshot and
  // the reset, which would otherwise silently drop them.
  synchronized (capturedStream) {
    capturedStream.flush();
    String captureOutput = capturedStream.toString(StandardCharsets.UTF_8);
    capturedStream.reset();
    return captureOutput;
  }
}

/**
 * Runs the restore action supplied at construction.
 *
 * <p>For instances created by {@link #capture()}, this reinstalls the original
 * {@link System#in}, {@link System#out} and {@link System#err} and closes the replacement
 * streams.
 *
 * @throws Exception if the restore action fails
 */
@Override
public void close() throws Exception {
restore.close();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ private static class InterruptableWorkRequestHandler extends WorkRequestHandler
@Override
@SuppressWarnings("SystemExitOutsideMain")
public void processRequests() throws IOException {
ByteArrayOutputStream captured = new ByteArrayOutputStream();
WorkerIO workerIO = new WorkerIO(System.in, System.out, System.err, captured, captured);

while (true) {
WorkRequest request = messageProcessor.readWorkRequest();
if (request == null) {
Expand All @@ -100,12 +103,19 @@ public void processRequests() throws IOException {
if (request.getCancel()) {
respondToCancelRequest(request);
} else {
startResponseThread(request);
startResponseThread(workerIO, request);
}
if (workerOptions.exitAfter > 0 && workUnitCounter > workerOptions.exitAfter) {
System.exit(0);
}
}

try {
// Unwrap the system streams placing the original streams back
workerIO.close();
} catch (Exception e) {
workerIO.getOriginalErrorStream().println(e.getMessage());
}
}
}

Expand Down
Loading