Skip to content

Commit

Permalink
Refactor WorkRequestHandler to be an interface, of which Proto is one…
Browse files Browse the repository at this point in the history
… implementation.

This lays the foundation for WorkRequestHandler to be used for JSON workers as well.

RELNOTES: None.
PiperOrigin-RevId: 341078345
  • Loading branch information
susinmotion authored and copybara-github committed Nov 6, 2020
1 parent 50013dd commit a4251ea
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.devtools.build.buildjar.javac.plugins.BlazeJavaCompilerPlugin;
import com.google.devtools.build.buildjar.javac.plugins.dependency.DependencyModule;
import com.google.devtools.build.buildjar.javac.plugins.errorprone.ErrorPronePlugin;
import com.google.devtools.build.lib.worker.ProtoWorkerMessageProcessor;
import com.google.devtools.build.lib.worker.WorkRequestHandler;
import java.io.IOException;
import java.io.OutputStreamWriter;
Expand All @@ -41,8 +42,17 @@ public class BazelJavaBuilder {
public static void main(String[] args) {
BazelJavaBuilder builder = new BazelJavaBuilder();
if (args.length == 1 && args[0].equals("--persistent_worker")) {
WorkRequestHandler workerHandler = new WorkRequestHandler(builder::parseAndBuild);
System.exit(workerHandler.processRequests(System.in, System.out, System.err));
WorkRequestHandler workerHandler =
new WorkRequestHandler(
builder::parseAndBuild,
System.err,
new ProtoWorkerMessageProcessor(System.in, System.out));
try {
workerHandler.processRequests();
} catch (IOException e) {
System.err.println(e.getMessage());
System.exit(1);
}
} else {
PrintWriter pw =
new PrintWriter(new OutputStreamWriter(System.err, Charset.defaultCharset()));
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/google/devtools/build/lib/worker/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ java_library(
java_library(
name = "work_request_handlers",
srcs = [
"ProtoWorkerMessageProcessor.java",
"WorkRequestHandler.java",
],
deps = [
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2020 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.worker;

import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** Implementation of the Worker Protocol using Proto to communicate with Bazel. */
public final class ProtoWorkerMessageProcessor
implements WorkRequestHandler.WorkerMessageProcessor {

/** This worker's stdin. */
private final InputStream stdin;

/** This worker's stdout. Only {@link WorkRequest}s should be written here. */
private final OutputStream stdout;

/** Constructs a {@link WorkRequestHandler} that reads and writes Protocol Buffers. */
public ProtoWorkerMessageProcessor(InputStream stdin, OutputStream stdout) {
this.stdin = stdin;
this.stdout = stdout;
}

@Override
public WorkRequest readWorkRequest() throws IOException {
return WorkRequest.parseDelimitedFrom(stdin);
}

@Override
public void writeWorkResponse(WorkResponse workResponse) throws IOException {
try {
workResponse.writeDelimitedTo(stdout);
} finally {
stdout.flush();
}
}

@Override
public void close() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringWriter;
Expand All @@ -29,69 +28,86 @@
* (https://docs.bazel.build/versions/master/persistent-workers.html), including multiplex workers
* (https://docs.bazel.build/versions/master/multiplex-worker.html).
*/
public class WorkRequestHandler {
public class WorkRequestHandler implements AutoCloseable {

/** Contains the logic for reading {@link WorkRequest}s and writing {@link WorkResponse}s. */
public interface WorkerMessageProcessor {
/** Reads the next incoming request from this worker's stdin. */
public WorkRequest readWorkRequest() throws IOException;

/**
* Writes the provided {@link WorkResponse} to this worker's stdout. This function is also
* responsible for flushing the stdout.
*/
public void writeWorkResponse(WorkResponse workResponse) throws IOException;

/** Clean up. */
public void close() throws IOException;
}

/** The function to be called after each {@link WorkRequest} is read. */
private final BiFunction<List<String>, PrintWriter, Integer> callback;

/** This worker's stderr. */
private final PrintStream stderr;

private final WorkerMessageProcessor messageProcessor;

/**
* Creates a {@code WorkRequestHandler} that will call {@code callback} for each WorkRequest
* received. The first argument to {@code callback} is the set of command-line arguments, the
* second is where all error messages and similar should be written to.
* second is where all error messages and similar should be written to. The callback should return
* an exit code indicating success (0) or failure (nonzero).
*/
public WorkRequestHandler(BiFunction<List<String>, PrintWriter, Integer> callback) {
public WorkRequestHandler(
BiFunction<List<String>, PrintWriter, Integer> callback,
PrintStream stderr,
WorkerMessageProcessor messageProcessor) {
this.callback = callback;
this.stderr = stderr;
this.messageProcessor = messageProcessor;
}

/**
* Runs an infinite loop of reading {@code WorkRequest} from {@code in}, running the callback,
* then writing the corresponding {@code WorkResponse} to {@code out}. If there is an error
* Runs an infinite loop of reading {@link WorkRequest} from {@code in}, running the callback,
* then writing the corresponding {@link WorkResponse} to {@code out}. If there is an error
* reading or writing the requests or responses, it writes an error message on {@code err} and
* returns. If {@code in} reaches EOF, it also returns.
*
* @return 0 if we reached EOF, 1 if there was an error.
*/
public int processRequests(InputStream in, PrintStream out, PrintStream err) {
public void processRequests() throws IOException {
while (true) {
try {
WorkRequest request = WorkRequest.parseDelimitedFrom(in);

WorkRequest request = messageProcessor.readWorkRequest();
if (request == null) {
break;
}

if (request.getRequestId() != 0) {
Thread t = createResponseThread(request, out, err);
Thread t = createResponseThread(request);
t.start();
} else {
respondToRequest(request, out);
}
} catch (IOException e) {
e.printStackTrace(err);
return 1;
respondToRequest(request);
}
}
return 0;
}

/** Creates a new {@code Thread} to process a multiplex request. */
public Thread createResponseThread(WorkRequest request, PrintStream out, PrintStream err) {
/** Creates a new {@link Thread} to process a multiplex request. */
public Thread createResponseThread(WorkRequest request) {
Thread currentThread = Thread.currentThread();
return new Thread(
() -> {
try {
respondToRequest(request, out);
respondToRequest(request);
} catch (IOException e) {
e.printStackTrace(err);
e.printStackTrace(stderr);
// In case of error, shut down the entire worker.
currentThread.interrupt();
}
},
"multiplex-request-" + request.getRequestId());
}

/** Responds to {@code request}, writing the {@code WorkResponse} proto to {@code out}. */
/** Handles and responds to the given {@link WorkRequest}. */
@VisibleForTesting
void respondToRequest(WorkRequest request, PrintStream out) throws IOException {
void respondToRequest(WorkRequest request) throws IOException {
try (StringWriter sw = new StringWriter();
PrintWriter pw = new PrintWriter(sw)) {
int exitCode;
Expand All @@ -109,14 +125,13 @@ void respondToRequest(WorkRequest request, PrintStream out) throws IOException {
.setRequestId(request.getRequestId())
.build();
synchronized (this) {
workResponse.writeDelimitedTo(out);
messageProcessor.writeWorkResponse(workResponse);
}
}
out.flush();
}

// Hint to the system that now would be a good time to run a gc. After a compile
// completes lots of objects should be available for collection and it should be cheap to
// collect them.
System.gc();
@Override
public void close() throws IOException {
messageProcessor.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,16 @@ public void init() {

@Test
public void testNormalWorkRequest() throws IOException {
WorkRequestHandler handler = new WorkRequestHandler((args, err) -> 1);
ByteArrayOutputStream out = new ByteArrayOutputStream();
WorkRequestHandler handler =
new WorkRequestHandler(
(args, err) -> 1,
new PrintStream(new ByteArrayOutputStream()),
new ProtoWorkerMessageProcessor(new ByteArrayInputStream(new byte[0]), out));

List<String> args = Arrays.asList("--sources", "A.java");
WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build();
ByteArrayOutputStream out = new ByteArrayOutputStream();
handler.respondToRequest(request, new PrintStream(out));
handler.respondToRequest(request);

WorkResponse response =
WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));
Expand All @@ -57,12 +61,16 @@ public void testNormalWorkRequest() throws IOException {

@Test
public void testMultiplexWorkRequest() throws IOException {
WorkRequestHandler handler = new WorkRequestHandler((args, err) -> 0);
ByteArrayOutputStream out = new ByteArrayOutputStream();
WorkRequestHandler handler =
new WorkRequestHandler(
(args, err) -> 0,
new PrintStream(new ByteArrayOutputStream()),
new ProtoWorkerMessageProcessor(new ByteArrayInputStream(new byte[0]), out));

List<String> args = Arrays.asList("--sources", "A.java");
WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).setRequestId(42).build();
ByteArrayOutputStream out = new ByteArrayOutputStream();
handler.respondToRequest(request, new PrintStream(out));
handler.respondToRequest(request);

WorkResponse response =
WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));
Expand All @@ -73,17 +81,19 @@ public void testMultiplexWorkRequest() throws IOException {

@Test
public void testOutput() throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
WorkRequestHandler handler =
new WorkRequestHandler(
(args, err) -> {
err.println("Failed!");
return 1;
});
},
new PrintStream(new ByteArrayOutputStream()),
new ProtoWorkerMessageProcessor(new ByteArrayInputStream(new byte[0]), out));

List<String> args = Arrays.asList("--sources", "A.java");
WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build();
ByteArrayOutputStream out = new ByteArrayOutputStream();
handler.respondToRequest(request, new PrintStream(out));
handler.respondToRequest(request);

WorkResponse response =
WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));
Expand All @@ -94,16 +104,18 @@ public void testOutput() throws IOException {

@Test
public void testException() throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
WorkRequestHandler handler =
new WorkRequestHandler(
(args, err) -> {
throw new RuntimeException("Exploded!");
});
},
new PrintStream(new ByteArrayOutputStream()),
new ProtoWorkerMessageProcessor(new ByteArrayInputStream(new byte[0]), out));

List<String> args = Arrays.asList("--sources", "A.java");
WorkRequest request = WorkRequest.newBuilder().addAllArguments(args).build();
ByteArrayOutputStream out = new ByteArrayOutputStream();
handler.respondToRequest(request, new PrintStream(out));
handler.respondToRequest(request);

WorkResponse response =
WorkResponse.parseDelimitedFrom(new ByteArrayInputStream(out.toByteArray()));
Expand Down

0 comments on commit a4251ea

Please sign in to comment.