Skip to content

Commit

Permalink
Send and receive work requests via proxy and multiplexer
Browse files Browse the repository at this point in the history
For each unique WorkerKey, Bazel can launch a multiplexer to talk to one multi-threaded worker process optionally. We use less JVM processes but maintain the approximately same performance, hence, save more memory. The worker process should be able to handle multiple requests to fully utilize this feature.

Fix: bazelbuild#2832
  • Loading branch information
borkaehw committed Sep 12, 2019
1 parent a235269 commit a6673ec
Show file tree
Hide file tree
Showing 19 changed files with 1,774 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,8 @@ public String parseIfMatches(String tag) throws ValidationException {
/** If an action supports running in persistent worker mode. */
public static final String SUPPORTS_WORKERS = "supports-workers";

public static final String SUPPORTS_MULTIPLEX_WORKERS = "supports-multiplex-workers";

public static final ImmutableMap<String, String> WORKER_MODE_ENABLED =
ImmutableMap.of(SUPPORTS_WORKERS, "1");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ public static boolean supportsWorkers(Spawn spawn) {
return "1".equals(spawn.getExecutionInfo().get(ExecutionRequirements.SUPPORTS_WORKERS));
}

/**
* Returns whether a Spawn claims to support being executed with the persistent multiplex worker strategy
* according to its execution info tags.
*/
public static boolean supportsMultiplexWorkers(Spawn spawn) {
return "1".equals(spawn.getExecutionInfo().get(ExecutionRequirements.SUPPORTS_MULTIPLEX_WORKERS));
}

/**
* Parse the timeout key in the spawn execution info, if it exists. Otherwise, return -1.
*/
Expand Down
44 changes: 36 additions & 8 deletions src/main/java/com/google/devtools/build/lib/worker/Worker.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.google.devtools.build.lib.shell.SubprocessBuilder;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest;
import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -41,10 +43,26 @@
* class.
*/
class Worker {
private final WorkerKey workerKey;
private final int workerId;
private final Path workDir;
private final Path logFile;
/**
* An unique identifier of the work process.
*/
protected final WorkerKey workerKey;
/**
* An unique ID of the worker. It will be used in WorkRequest and WorkResponse as well.
*/
protected final int workerId;
/**
* The execution root of the worker.
*/
protected final Path workDir;
/**
* The path of the log file.
*/
protected final Path logFile;
/**
* Stream for reading the WorkResponse.
*/
protected RecordingInputStream recordingStream;

private Subprocess process;
private Thread shutdownHook;
Expand Down Expand Up @@ -141,12 +159,22 @@ boolean isAlive() {
return !process.finished();
}

InputStream getInputStream() {
return process.getInputStream();
void putRequest(WorkRequest request) throws IOException {
request.writeDelimitedTo(process.getOutputStream());
process.getOutputStream().flush();
}

WorkResponse getResponse() throws IOException {
recordingStream = new RecordingInputStream(process.getInputStream());
recordingStream.startRecording(4096);
// response can be null when the worker has already closed stdout at this point and thus
// the InputStream is at EOF.
return WorkResponse.parseDelimitedFrom(recordingStream);
}

OutputStream getOutputStream() {
return process.getOutputStream();
String getRecordingStreamMessage() {
recordingStream.readRemaining();
return recordingStream.getRecordedDataAsString();
}

public void prepareExecution(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public Worker create(WorkerKey key) throws Exception {
if (sandboxed) {
Path workDir = getSandboxedWorkerPath(key, workerId);
worker = new SandboxedWorker(key, workerId, workDir, logFile);
} else if (key.getProxied()) {
worker = new WorkerProxy(key, workerId, key.getExecRoot(), logFile, WorkerMultiplexerManager.getInstance(key.hashCode()));
} else {
worker = new Worker(key, workerId, key.getExecRoot(), logFile);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ final class WorkerKey {
private final HashCode workerFilesCombinedHash;
private final SortedMap<PathFragment, HashCode> workerFilesWithHashes;
private final boolean mustBeSandboxed;
/** A WorkerProxy will be instantiated if true, instantiate a regular Worker if false. */
private final boolean proxied;

WorkerKey(
List<String> args,
Expand All @@ -50,44 +52,66 @@ final class WorkerKey {
String mnemonic,
HashCode workerFilesCombinedHash,
SortedMap<PathFragment, HashCode> workerFilesWithHashes,
boolean mustBeSandboxed) {
boolean mustBeSandboxed,
boolean proxied) {
/** Build options. */
this.args = ImmutableList.copyOf(Preconditions.checkNotNull(args));
/** Environment variables. */
this.env = ImmutableMap.copyOf(Preconditions.checkNotNull(env));
/** Execution root of Bazel process. */
this.execRoot = Preconditions.checkNotNull(execRoot);
/** Mnemonic of the worker. */
this.mnemonic = Preconditions.checkNotNull(mnemonic);
/** One combined hash code for all files. */
this.workerFilesCombinedHash = Preconditions.checkNotNull(workerFilesCombinedHash);
/** Worker files with the corresponding hash code. */
this.workerFilesWithHashes = Preconditions.checkNotNull(workerFilesWithHashes);
/** Set it to true if this job should be run in sandbox. */
this.mustBeSandboxed = mustBeSandboxed;
/** Set it to true if this job should be run with WorkerProxy. */
this.proxied = proxied;
}

/** Getter function for variable args. */
public ImmutableList<String> getArgs() {
return args;
}

/** Getter function for variable env. */
public ImmutableMap<String, String> getEnv() {
return env;
}

/** Getter function for variable execRoot. */
public Path getExecRoot() {
return execRoot;
}

/** Getter function for variable mnemonic. */
public String getMnemonic() {
return mnemonic;
}

/** Getter function for variable workerFilesCombinedHash. */
public HashCode getWorkerFilesCombinedHash() {
return workerFilesCombinedHash;
}

/** Getter function for variable workerFilesWithHashes. */
public SortedMap<PathFragment, HashCode> getWorkerFilesWithHashes() {
return workerFilesWithHashes;
}

/** Getter function for variable mustBeSandboxed. */
public boolean mustBeSandboxed() {
return mustBeSandboxed;
}

/** Getter function for variable proxied. */
public boolean getProxied() {
return proxied;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Loading

0 comments on commit a6673ec

Please sign in to comment.