diff --git a/src/main/java/com/google/devtools/build/lib/actions/ExecutionRequirements.java b/src/main/java/com/google/devtools/build/lib/actions/ExecutionRequirements.java index d7bb88ae856ebc..0314e6d506ebd4 100644 --- a/src/main/java/com/google/devtools/build/lib/actions/ExecutionRequirements.java +++ b/src/main/java/com/google/devtools/build/lib/actions/ExecutionRequirements.java @@ -157,6 +157,8 @@ public String parseIfMatches(String tag) throws ValidationException { /** If an action supports running in persistent worker mode. */ public static final String SUPPORTS_WORKERS = "supports-workers"; + public static final String SUPPORTS_MULTIPLEX_WORKERS = "supports-multiplex-workers"; + public static final ImmutableMap WORKER_MODE_ENABLED = ImmutableMap.of(SUPPORTS_WORKERS, "1"); diff --git a/src/main/java/com/google/devtools/build/lib/actions/Spawns.java b/src/main/java/com/google/devtools/build/lib/actions/Spawns.java index d90cd1d5b5a956..6f5cf1f1929933 100644 --- a/src/main/java/com/google/devtools/build/lib/actions/Spawns.java +++ b/src/main/java/com/google/devtools/build/lib/actions/Spawns.java @@ -72,6 +72,14 @@ public static boolean supportsWorkers(Spawn spawn) { return "1".equals(spawn.getExecutionInfo().get(ExecutionRequirements.SUPPORTS_WORKERS)); } + /** + * Returns whether a Spawn claims to support being executed with the persistent multiplex worker strategy + * according to its execution info tags. + */ + public static boolean supportsMultiplexWorkers(Spawn spawn) { + return "1".equals(spawn.getExecutionInfo().get(ExecutionRequirements.SUPPORTS_MULTIPLEX_WORKERS)); + } + /** * Parse the timeout key in the spawn execution info, if it exists. Otherwise, return -1. */ diff --git a/src/main/java/com/google/devtools/build/lib/worker/Worker.java b/src/main/java/com/google/devtools/build/lib/worker/Worker.java index 1db19f5076129e..fe15210fb7abce 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/Worker.java +++ b/src/main/java/com/google/devtools/build/lib/worker/Worker.java @@ -41,10 +41,22 @@ * class. */ class Worker { - private final WorkerKey workerKey; - private final int workerId; - private final Path workDir; - private final Path logFile; + /** + * An unique identifier of the work process. + */ + protected final WorkerKey workerKey; + /** + * An unique ID of the worker. It will be used in WorkRequest and WorkResponse as well. + */ + protected final int workerId; + /** + * The execution root of the worker. + */ + protected final Path workDir; + /** + * The path of the log file. + */ + protected final Path logFile; private Subprocess process; private Thread shutdownHook; diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java index 665060473f1591..166b86b3a6ae3d 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java @@ -62,6 +62,8 @@ public Worker create(WorkerKey key) throws Exception { if (sandboxed) { Path workDir = getSandboxedWorkerPath(key, workerId); worker = new SandboxedWorker(key, workerId, workDir, logFile); + } else if (key.proxied()) { + worker = new WorkerProxy(key, workerId, key.getExecRoot(), logFile); } else { worker = new Worker(key, workerId, key.getExecRoot(), logFile); } diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerKey.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerKey.java index 55b0910239b5d8..3afaf680924d8d 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerKey.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerKey.java @@ -42,6 +42,10 @@ final class WorkerKey { private final HashCode workerFilesCombinedHash; private final SortedMap workerFilesWithHashes; private final boolean mustBeSandboxed; + /** + * A WorkerProxy will be instantiated if true, instantiate a regular Worker if false. + */ + private final boolean proxied; WorkerKey( List args, @@ -50,14 +54,24 @@ final class WorkerKey { String mnemonic, HashCode workerFilesCombinedHash, SortedMap workerFilesWithHashes, - boolean mustBeSandboxed) { + boolean mustBeSandboxed, + boolean proxied) { + /** Build options. */ this.args = ImmutableList.copyOf(Preconditions.checkNotNull(args)); + /** Environment variables. */ this.env = ImmutableMap.copyOf(Preconditions.checkNotNull(env)); + /** Execution root of Bazel process. */ this.execRoot = Preconditions.checkNotNull(execRoot); + /** Mnemonic of the worker. */ this.mnemonic = Preconditions.checkNotNull(mnemonic); + /** One combined hash code for all files. */ this.workerFilesCombinedHash = Preconditions.checkNotNull(workerFilesCombinedHash); + /** Worker files with the corresponding hash code. */ this.workerFilesWithHashes = Preconditions.checkNotNull(workerFilesWithHashes); + /** Set it to true if this job should be run in sandbox. */ this.mustBeSandboxed = mustBeSandboxed; + /** Set it to true if this job should be run with WorkerProxy. */ + this.proxied = proxied; } public ImmutableList getArgs() { @@ -88,6 +102,10 @@ public boolean mustBeSandboxed() { return mustBeSandboxed; } + public boolean proxied() { + return proxied; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java new file mode 100644 index 00000000000000..a21c13a9d82b6d --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java @@ -0,0 +1,211 @@ +// Copyright 2018 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.worker; + +import com.google.devtools.build.lib.shell.Subprocess; +import com.google.devtools.build.lib.shell.SubprocessBuilder; +import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse; +import com.google.devtools.build.lib.vfs.Path; +import java.io.File; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Semaphore; + +/** An intermediate worker that sends request and receives response from the worker processes. */ +public class WorkerMultiplexer extends Thread { + /** + * WorkerMultiplexer is running as a thread on its own. When worker process + * returns the WorkResponse, it is stored in this map and wait for + * WorkerProxy to retrieve the response. + */ + private Map workerProcessResponse; + /** + * A semaphore to protect workerProcessResponse object. + */ + private Semaphore semWorkerProcessResponse; + /** + * After sending the WorkRequest, WorkerProxy will wait on a semaphore to be + * released. WorkerMultiplexer is responsible to release the corresponding + * semaphore in order to signal WorkerProxy that the WorkerResponse has been + * received. + */ + private Map responseChecker; + /** + * A semaphore to protect responseChecker object. + */ + private Semaphore semResponseChecker; + /** + * The worker process that this WorkerMultiplexer should be talking to. + */ + private Subprocess process; + + private Thread shutdownHook; + private Integer workerHash; + + WorkerMultiplexer(Integer workerHash) { + semWorkerProcessResponse = new Semaphore(1); + semResponseChecker = new Semaphore(1); + responseChecker = new HashMap<>(); + workerProcessResponse = new HashMap<>(); + this.workerHash = workerHash; + + final WorkerMultiplexer self = this; + this.shutdownHook = + new Thread( + () -> { + try { + self.shutdownHook = null; + self.destroyMultiplexer(); + } finally { + // We can't do anything here. + } + }); + Runtime.getRuntime().addShutdownHook(shutdownHook); + } + + /** + * Only start one worker process for each WorkerMultiplexer, if it hasn't. + */ + public synchronized void createProcess(WorkerKey workerKey, Path workDir, Path logFile) throws IOException { + if (this.process == null) { + List args = workerKey.getArgs(); + File executable = new File(args.get(0)); + if (!executable.isAbsolute() && executable.getParent() != null) { + args = new ArrayList<>(args); + args.set(0, new File(workDir.getPathFile(), args.get(0)).getAbsolutePath()); + } + SubprocessBuilder processBuilder = new SubprocessBuilder(); + processBuilder.setArgv(args); + processBuilder.setWorkingDirectory(workDir.getPathFile()); + processBuilder.setStderr(logFile.getPathFile()); + processBuilder.setEnv(workerKey.getEnv()); + this.process = processBuilder.start(); + } + if (!this.isAlive()) { + this.start(); + } + } + + public synchronized void destroyMultiplexer() { + if (shutdownHook != null) { + Runtime.getRuntime().removeShutdownHook(shutdownHook); + } + if (this.process != null) { + destroyProcess(this.process); + } + } + + private void destroyProcess(Subprocess process) { + boolean wasInterrupted = false; + try { + process.destroy(); + while (true) { + try { + process.waitFor(); + return; + } catch (InterruptedException ie) { + wasInterrupted = true; + } + } + } finally { + // Read this for detailed explanation: http://www.ibm.com/developerworks/library/j-jtp05236/ + if (wasInterrupted) { + Thread.currentThread().interrupt(); // preserve interrupted status + } + } + } + + public boolean isProcessAlive() { + return !this.process.finished(); + } + + /** + * Pass the WorkRequest to worker process. + */ + public synchronized void putRequest(byte[] request) throws IOException { + OutputStream stdin = process.getOutputStream(); + stdin.write(request); + stdin.flush(); + } + + /** + * A WorkerProxy waits on a semaphore for the WorkResponse returned from worker process. + */ + public InputStream getResponse(Integer workerId) throws InterruptedException { + semResponseChecker.acquire(); + Semaphore waitForResponse = responseChecker.get(workerId); + semResponseChecker.release(); + + // If there is a compilation error, the semaphore will throw InterruptedException. + waitForResponse.acquire(); + + semWorkerProcessResponse.acquire(); + InputStream response = workerProcessResponse.get(workerId); + semWorkerProcessResponse.release(); + return response; + } + + /** + * Reset the map that indicates if the WorkResponses have been returned. + */ + public void resetResponseChecker(Integer workerId) throws InterruptedException { + semResponseChecker.acquire(); + responseChecker.put(workerId, new Semaphore(0)); + semResponseChecker.release(); + } + + /** + * When it gets a WorkResponse from worker process, put that WorkResponse in + * workerProcessResponse and signal responseChecker. + */ + private void waitResponse() throws InterruptedException, IOException { + InputStream stdout = process.getInputStream(); + WorkResponse parsedResponse = WorkResponse.parseDelimitedFrom(stdout); + + if (parsedResponse == null) return; + + Integer workerId = parsedResponse.getRequestId(); + ByteArrayOutputStream tempOs = new ByteArrayOutputStream(); + parsedResponse.writeDelimitedTo(tempOs); + + semWorkerProcessResponse.acquire(); + workerProcessResponse.put(workerId, new ByteArrayInputStream(tempOs.toByteArray())); + semWorkerProcessResponse.release(); + + semResponseChecker.acquire(); + responseChecker.get(workerId).release(); + semResponseChecker.release(); + } + + /** + * A multiplexer thread that listens to the WorkResponses from worker process. + */ + public void run() { + while (!this.interrupted()) { + try { + waitResponse(); + } catch (Exception e) { + // We can't do anything here. + } + } + } +} diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java new file mode 100644 index 00000000000000..bb3dc175e86c75 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java @@ -0,0 +1,75 @@ +// Copyright 2018 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.worker; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Semaphore; + +/** + * An intermediate worker that sends request and receives response from the + * worker processes. + */ +public class WorkerMultiplexerManager { + /** + * There should only be one WorkerMultiplexer corresponding to workers with + * the same mnemonic. If the WorkerMultiplexer has been constructed, other + * workers should point to the same one. The hash of WorkerKey is used as + * key. + */ + private static Map multiplexerInstance = new HashMap<>(); + /** + * An accumulator of how many WorkerProxies are referencing a particular + * WorkerMultiplexer. + */ + private static Map multiplexerRefCount = new HashMap<>(); + /** + * A semaphore to protect multiplexerInstance and multiplexerRefCount objects. + */ + private static Semaphore semMultiplexer = new Semaphore(1); + + /** + * Returns a WorkerMultiplexer instance to WorkerProxy. WorkerProxies with the + * same workerHash talk to the same WorkerMultiplexer. Also, record how many + * WorkerProxies are talking to this WorkerMultiplexer. + */ + public static WorkerMultiplexer getInstance(Integer workerHash) throws InterruptedException { + semMultiplexer.acquire(); + if (!multiplexerInstance.containsKey(workerHash)) { + multiplexerInstance.put(workerHash, new WorkerMultiplexer(workerHash)); + multiplexerRefCount.put(workerHash, 0); + } + multiplexerRefCount.put(workerHash, multiplexerRefCount.get(workerHash) + 1); + WorkerMultiplexer workerMultiplexer = multiplexerInstance.get(workerHash); + semMultiplexer.release(); + return workerMultiplexer; + } + + /** + * Remove the WorkerMultiplexer instance and reference count since it is no + * longer in use. + */ + public static void removeInstance(Integer workerHash) throws InterruptedException { + semMultiplexer.acquire(); + multiplexerRefCount.put(workerHash, multiplexerRefCount.get(workerHash) - 1); + if (multiplexerRefCount.get(workerHash) == 0) { + multiplexerInstance.get(workerHash).interrupt(); + multiplexerInstance.get(workerHash).destroyMultiplexer(); + multiplexerInstance.remove(workerHash); + multiplexerRefCount.remove(workerHash); + } + semMultiplexer.release(); + } +} diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java new file mode 100644 index 00000000000000..f5bc4ee6c16a30 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java @@ -0,0 +1,112 @@ +// Copyright 2018 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.worker; + +import com.google.devtools.build.lib.sandbox.SandboxHelpers; +import com.google.devtools.build.lib.vfs.Path; +import com.google.devtools.build.lib.vfs.PathFragment; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +/** + * A proxy that talks to the multiplexers + */ +final class WorkerProxy extends Worker { + private Integer workerHash; + private ByteArrayOutputStream request; + private WorkerMultiplexer workerMultiplexer; + private Thread shutdownHook; + + WorkerProxy(WorkerKey workerKey, int workerId, Path workDir, Path logFile) { + super(workerKey, workerId, workDir, logFile); + this.workerHash = workerKey.hashCode(); + this.request = new ByteArrayOutputStream(); + try { + this.workerMultiplexer = WorkerMultiplexerManager.getInstance(workerHash); + } catch (InterruptedException e) { + // We can't do anything here. + } + + final WorkerProxy self = this; + this.shutdownHook = + new Thread( + () -> { + try { + self.shutdownHook = null; + self.destroy(); + } catch (IOException e) { + // We can't do anything here. + } + }); + Runtime.getRuntime().addShutdownHook(shutdownHook); + } + + @Override + void createProcess() throws IOException { + workerMultiplexer.createProcess(workerKey, workDir, logFile); + } + + @Override + boolean isAlive() { + return workerMultiplexer.isProcessAlive(); + } + + @Override + public void prepareExecution( + Map inputFiles, SandboxHelpers.SandboxOutputs outputs, Set workerFiles) + throws IOException { + createProcess(); + } + + @Override + synchronized void destroy() throws IOException { + if (shutdownHook != null) { + Runtime.getRuntime().removeShutdownHook(shutdownHook); + } + try { + WorkerMultiplexerManager.removeInstance(workerHash); + } catch (InterruptedException e) { + // We can't do anything here. + } + } + + /** + * Send the WorkRequest to worker process, and wait for WorkResponse. We have + * to set the semaphore to 0 in order to pause the WorkerProxy thread. + */ + @Override + InputStream getInputStream() { + byte[] requestBytes = request.toByteArray(); + request.reset(); + try { + workerMultiplexer.resetResponseChecker(workerId); + workerMultiplexer.putRequest(requestBytes); + return workerMultiplexer.getResponse(workerId); + } catch (Exception e) { + // Return empty InputStream if exception occurs. + return new ByteArrayInputStream(new byte[0]); + } + } + + @Override + OutputStream getOutputStream() { + return request; + } +} diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java index c3ed4260465901..dfa3b4e1fea963 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java @@ -108,7 +108,7 @@ public String getName() { @Override public SpawnResult exec(Spawn spawn, SpawnExecutionContext context) throws ExecException, IOException, InterruptedException { - if (!Spawns.supportsWorkers(spawn)) { + if (!Spawns.supportsWorkers(spawn) && !Spawns.supportsMultiplexWorkers(spawn)) { // TODO(ulfjack): Don't circumvent SpawnExecutionPolicy. Either drop the warning here, or // provide a mechanism in SpawnExecutionPolicy to report warnings. reporter.handle( @@ -164,12 +164,11 @@ private SpawnResult actuallyExec(Spawn spawn, SpawnExecutionContext context) spawn.getMnemonic(), workerFilesCombinedHash, workerFiles, - context.speculating()); - - WorkRequest workRequest = createWorkRequest(spawn, context, flagFiles, inputFileCache); + context.speculating(), + Spawns.supportsMultiplexWorkers(spawn)); long startTime = System.currentTimeMillis(); - WorkResponse response = execInWorker(spawn, key, workRequest, context, inputFiles, outputs); + WorkResponse response = execInWorker(spawn, key, context, inputFiles, outputs, flagFiles, inputFileCache); Duration wallTime = Duration.ofMillis(System.currentTimeMillis() - startTime); FileOutErr outErr = context.getFileOutErr(); @@ -217,7 +216,8 @@ private WorkRequest createWorkRequest( Spawn spawn, SpawnExecutionContext context, List flagfiles, - MetadataProvider inputFileCache) + MetadataProvider inputFileCache, + int workerId) throws IOException { WorkRequest.Builder requestBuilder = WorkRequest.newBuilder(); for (String flagfile : flagfiles) { @@ -242,7 +242,7 @@ private WorkRequest createWorkRequest( .setDigest(digest) .build(); } - return requestBuilder.build(); + return requestBuilder.setRequestId(workerId).build(); } /** @@ -279,18 +279,21 @@ private static boolean isExternalRepositoryLabel(String arg) { private WorkResponse execInWorker( Spawn spawn, WorkerKey key, - WorkRequest request, SpawnExecutionContext context, Map inputFiles, - SandboxOutputs outputs) + SandboxOutputs outputs, + List flagFiles, + MetadataProvider inputFileCache) throws InterruptedException, ExecException { Worker worker = null; WorkResponse response; + WorkRequest request; ActionExecutionMetadata owner = spawn.getResourceOwner(); try { try { worker = workers.borrowObject(key); + request = createWorkRequest(spawn, context, flagFiles, inputFileCache, worker.getWorkerId()); } catch (IOException e) { throw new UserExecException( ErrorMessage.builder() diff --git a/src/main/protobuf/worker_protocol.proto b/src/main/protobuf/worker_protocol.proto index 4706792f4e78f2..ebe1fe19386cb7 100644 --- a/src/main/protobuf/worker_protocol.proto +++ b/src/main/protobuf/worker_protocol.proto @@ -38,6 +38,10 @@ message WorkRequest { // The inputs that the worker is allowed to read during execution of this // request. repeated Input inputs = 2; + + // To support multiplex worker, each WorkRequest must have an unique ID. This ID should + // be attached unchanged to the WorkResponse. + int32 request_id = 3; } // The worker sends this message to Blaze when it finished its work on the WorkRequest message. @@ -48,4 +52,9 @@ message WorkResponse { // compiler warnings / errors etc. - thus we'll use a string type here, which gives us UTF-8 // encoding. string output = 2; + + // To support multiplex worker, each WorkResponse must have an unique ID. Since worker processes + // which support multiplex worker will handle multiple WorkRequests in parallel, this ID will + // be used to determined which WorkerProxy does this WorkResponse belong to. + int32 request_id = 3; } diff --git a/src/test/java/com/google/devtools/build/lib/worker/WorkerFactoryTest.java b/src/test/java/com/google/devtools/build/lib/worker/WorkerFactoryTest.java index 0552cb8c5ac2d6..c2f877d350babe 100644 --- a/src/test/java/com/google/devtools/build/lib/worker/WorkerFactoryTest.java +++ b/src/test/java/com/google/devtools/build/lib/worker/WorkerFactoryTest.java @@ -42,13 +42,14 @@ public void sandboxedWorkerPathEndsWithWorkspaceName() throws Exception { WorkerFactory workerFactory = new WorkerFactory(new WorkerOptions(), workerBaseDir); WorkerKey workerKey = new WorkerKey( - ImmutableList.of(), - ImmutableMap.of(), - fs.getPath("/outputbase/execroot/workspace"), - "dummy", - HashCode.fromInt(0), - ImmutableSortedMap.of(), - true); + /* args= */ ImmutableList.of(), + /* env= */ ImmutableMap.of(), + /* execRoot= */ fs.getPath("/outputbase/execroot/workspace"), + /* mnemonic= */ "dummy", + /* workerFilesCombinedHash= */ HashCode.fromInt(0), + /* workerFilesWithHashes= */ ImmutableSortedMap.of(), + /* mustBeSandboxed= */ true, + /* proxied= */ false); Path sandboxedWorkerPath = workerFactory.getSandboxedWorkerPath(workerKey, 1); assertThat(sandboxedWorkerPath.getBaseName()).isEqualTo("workspace");