diff --git a/src/main/java/com/google/devtools/build/lib/actions/ExecutionRequirements.java b/src/main/java/com/google/devtools/build/lib/actions/ExecutionRequirements.java index e63283913b8cbd..447939b33504cd 100644 --- a/src/main/java/com/google/devtools/build/lib/actions/ExecutionRequirements.java +++ b/src/main/java/com/google/devtools/build/lib/actions/ExecutionRequirements.java @@ -144,8 +144,10 @@ public String parseIfMatches(String tag) throws ValidationException { /** If an action supports running in persistent worker mode. */ public static final String SUPPORTS_WORKERS = "supports-workers"; + public static final String SUPPORTS_MULTIPLEX_WORKERS = "supports-multiplex-workers"; + public static final ImmutableMap WORKER_MODE_ENABLED = - ImmutableMap.of(SUPPORTS_WORKERS, "1"); + ImmutableMap.of(SUPPORTS_WORKERS, "1", SUPPORTS_MULTIPLEX_WORKERS, "1"); /** * Requires local execution without sandboxing for a spawn. diff --git a/src/main/java/com/google/devtools/build/lib/actions/Spawns.java b/src/main/java/com/google/devtools/build/lib/actions/Spawns.java index 3b86bd7b4dca25..7e2bf44a24fb8e 100644 --- a/src/main/java/com/google/devtools/build/lib/actions/Spawns.java +++ b/src/main/java/com/google/devtools/build/lib/actions/Spawns.java @@ -69,6 +69,14 @@ public static boolean supportsWorkers(Spawn spawn) { return "1".equals(spawn.getExecutionInfo().get(ExecutionRequirements.SUPPORTS_WORKERS)); } + /** + * Returns whether a Spawn claims to support being executed with the persistent multiplex worker strategy + * according to its execution info tags. + */ + public static boolean supportsMultiplexWorkers(Spawn spawn) { + return "1".equals(spawn.getExecutionInfo().get(ExecutionRequirements.SUPPORTS_MULTIPLEX_WORKERS)); + } + /** * Parse the timeout key in the spawn execution info, if it exists. Otherwise, return -1. */ diff --git a/src/main/java/com/google/devtools/build/lib/worker/Worker.java b/src/main/java/com/google/devtools/build/lib/worker/Worker.java index 1db19f5076129e..e14b5c7f60e942 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/Worker.java +++ b/src/main/java/com/google/devtools/build/lib/worker/Worker.java @@ -41,13 +41,13 @@ * class. */ class Worker { - private final WorkerKey workerKey; - private final int workerId; - private final Path workDir; - private final Path logFile; + protected final WorkerKey workerKey; + protected final int workerId; + protected final Path workDir; + protected final Path logFile; private Subprocess process; - private Thread shutdownHook; + protected Thread shutdownHook; Worker(WorkerKey workerKey, int workerId, final Path workDir, Path logFile) { this.workerKey = workerKey; diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java index 871a3a0c64d055..146ae22bcc572b 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java @@ -59,9 +59,12 @@ public Worker create(WorkerKey key) throws Exception { Worker worker; boolean sandboxed = workerOptions.workerSandboxing || key.mustBeSandboxed(); + boolean proxyed = key.proxyed(); if (sandboxed) { Path workDir = getSandboxedWorkerPath(key, workerId); worker = new SandboxedWorker(key, workerId, workDir, logFile); + } else if (proxyed) { + worker = new WorkerProxy(key, workerId, key.getExecRoot(), logFile); } else { worker = new Worker(key, workerId, key.getExecRoot(), logFile); } diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerKey.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerKey.java index 55b0910239b5d8..8fbdc23d4fa555 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerKey.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerKey.java @@ -42,6 +42,7 @@ final class WorkerKey { private final HashCode workerFilesCombinedHash; private final SortedMap workerFilesWithHashes; private final boolean mustBeSandboxed; + private final boolean proxyed; WorkerKey( List args, @@ -50,7 +51,8 @@ final class WorkerKey { String mnemonic, HashCode workerFilesCombinedHash, SortedMap workerFilesWithHashes, - boolean mustBeSandboxed) { + boolean mustBeSandboxed, + boolean proxyed) { this.args = ImmutableList.copyOf(Preconditions.checkNotNull(args)); this.env = ImmutableMap.copyOf(Preconditions.checkNotNull(env)); this.execRoot = Preconditions.checkNotNull(execRoot); @@ -58,6 +60,7 @@ final class WorkerKey { this.workerFilesCombinedHash = Preconditions.checkNotNull(workerFilesCombinedHash); this.workerFilesWithHashes = Preconditions.checkNotNull(workerFilesWithHashes); this.mustBeSandboxed = mustBeSandboxed; + this.proxyed = proxyed; } public ImmutableList getArgs() { @@ -88,6 +91,10 @@ public boolean mustBeSandboxed() { return mustBeSandboxed; } + public boolean proxyed() { + return proxyed; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java new file mode 100644 index 00000000000000..c57352e62858dd --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java @@ -0,0 +1,283 @@ +// Copyright 2018 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.worker; + +import com.google.devtools.build.lib.shell.Subprocess; +import com.google.devtools.build.lib.shell.SubprocessBuilder; +import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse; +import com.google.devtools.build.lib.vfs.Path; +import java.io.File; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Semaphore; + +/** + * An intermediate worker that receives response from the worker processes + */ +public class WorkerMultiplexer extends Thread { + private static Semaphore semInstanceMap = new Semaphore(1); + private static Map instanceMap = new HashMap<>(); + private Map responseMap; + private Map responseChecker; + private Semaphore semResponseMap; + private Semaphore semResponseChecker; + private Semaphore semAccessProcess; + + private Subprocess process; + private Integer workerHash; + + private Thread shutdownHook; + + WorkerMultiplexer(Integer workerHash) { + semAccessProcess = new Semaphore(1); + semResponseMap = new Semaphore(1); + semResponseChecker = new Semaphore(1); + responseChecker = new HashMap<>(); + responseMap = new HashMap<>(); + this.workerHash = workerHash; + + final WorkerMultiplexer self = this; + this.shutdownHook = + new Thread( + () -> { + try { + self.shutdownHook = null; + self.destroyMultiplexer(); + } finally { + // We can't do anything here. + } + }); + Runtime.getRuntime().addShutdownHook(shutdownHook); + } + + /** + * Returns a WorkerMultiplexer instance to WorkerProxy. WorkerProxys with the + * same workerHash talk to the same WorkerMultiplexer. + */ + public synchronized static WorkerMultiplexer getInstance(Integer workerHash) { + try { + semInstanceMap.acquire(); + if (!instanceMap.containsKey(workerHash)) { + instanceMap.put(workerHash, new WorkerMultiplexer(workerHash)); + } + WorkerMultiplexer receiver = instanceMap.get(workerHash); + return receiver; + } catch (InterruptedException e) { + e.printStackTrace(); + return null; + } finally { + semInstanceMap.release(); + } + } + + /** + * Only start one worker process for each WorkerMultiplexer, if it hasn't. + */ + public synchronized void createProcess(WorkerKey workerKey, Path workDir, Path logFile) throws IOException { + try { + semAccessProcess.acquire(); + if (this.process == null || this.process.finished()) { + List args = workerKey.getArgs(); + File executable = new File(args.get(0)); + if (!executable.isAbsolute() && executable.getParent() != null) { + args = new ArrayList<>(args); + args.set(0, new File(workDir.getPathFile(), args.get(0)).getAbsolutePath()); + } + SubprocessBuilder processBuilder = new SubprocessBuilder(); + processBuilder.setArgv(args); + processBuilder.setWorkingDirectory(workDir.getPathFile()); + processBuilder.setStderr(logFile.getPathFile()); + processBuilder.setEnv(workerKey.getEnv()); + this.process = processBuilder.start(); + } + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + semAccessProcess.release(); + } + if (!this.isAlive()) { + this.start(); + } + } + + synchronized void destroyMultiplexer() { + if (shutdownHook != null) { + Runtime.getRuntime().removeShutdownHook(shutdownHook); + } + try { + semInstanceMap.acquire(); + instanceMap.remove(workerHash); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + semInstanceMap.release(); + } + try { + semAccessProcess.acquire(); + if (this.process != null) { + destroyProcess(this.process); + } + semAccessProcess.release(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + private static void destroyProcess(Subprocess process) { + boolean wasInterrupted = false; + try { + process.destroy(); + while (true) { + try { + process.waitFor(); + return; + } catch (InterruptedException ie) { + wasInterrupted = true; + } + } + } finally { + // Read this for detailed explanation: http://www.ibm.com/developerworks/library/j-jtp05236/ + if (wasInterrupted) { + Thread.currentThread().interrupt(); // preserve interrupted status + } + } + } + + public boolean isProcessAlive() { + try { + semAccessProcess.acquire(); + return !this.process.finished(); + } catch (InterruptedException e) { + e.printStackTrace(); + return false; + } finally { + semAccessProcess.release(); + } + } + + /** + * Pass the WorkRequest to worker process. + */ + public synchronized void putRequest(byte[] request) throws IOException { + OutputStream stdin = process.getOutputStream(); + stdin.write(request); + stdin.flush(); + } + + /** + * A WorkerProxy waits on a semaphore for the WorkResponse returned from worker process. + */ + public InputStream getResponse(int workerId) throws InterruptedException { + Semaphore waitForResponse; + try { + semResponseChecker.acquire(); + waitForResponse = responseChecker.get(workerId); + } catch (InterruptedException e) { + throw e; + } finally { + semResponseChecker.release(); + } + + try { + waitForResponse.acquire(); + } catch (InterruptedException e) { + // Return empty InputStream if there is a compilation error. + return new ByteArrayInputStream(new byte[0]); + } + + try { + semResponseMap.acquire(); + InputStream response = responseMap.get(workerId); + return response; + } catch (InterruptedException e) { + throw e; + } finally { + semResponseMap.release(); + } + } + + /** + * Reset the map that indicates if the WorkResponses have been returned. + */ + public void setResponseMap(int workerId) throws InterruptedException { + try { + semResponseChecker.acquire(); + responseChecker.put(workerId, new Semaphore(0)); + } catch (InterruptedException e) { + throw e; + } finally { + semResponseChecker.release(); + } + } + + /** + * When it gets a WorkResponse from worker process, put that WorkResponse to + * responseMap and signal responseChecker. + */ + public void waitRequest() throws InterruptedException, IOException { + InputStream stdout = process.getInputStream(); + + WorkResponse parsedResponse; + try { + parsedResponse = WorkResponse.parseDelimitedFrom(stdout); + } catch (IOException e) { + throw e; + } + + if (parsedResponse == null) return; + + int workerId = parsedResponse.getRequestId(); + ByteArrayOutputStream tempOs = new ByteArrayOutputStream(); + parsedResponse.writeDelimitedTo(tempOs); + + try { + semResponseMap.acquire(); + responseMap.put(workerId, new ByteArrayInputStream(tempOs.toByteArray())); + } catch (InterruptedException e) { + throw e; + } finally { + semResponseMap.release(); + } + + try { + semResponseChecker.acquire(); + responseChecker.get(workerId).release(); + } catch (InterruptedException e) { + throw e; + } finally { + semResponseChecker.release(); + } + } + + /** + * A multiplexer thread that listens to the WorkResponses from worker process. + */ + public void run() { + while (!this.interrupted()) { + try { + waitRequest(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } +} diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java new file mode 100644 index 00000000000000..6532a29a1901b3 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java @@ -0,0 +1,106 @@ +// Copyright 2018 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.worker; + +import com.google.devtools.build.lib.sandbox.SandboxHelpers; +import com.google.devtools.build.lib.vfs.Path; +import com.google.devtools.build.lib.vfs.PathFragment; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.IOException; +import java.util.Map; +import java.util.Set; + +/** + * A proxy that talks to the multiplexers + */ +final class WorkerProxy extends Worker { + private WorkerMultiplexer workerMultiplexer; + private ByteArrayOutputStream request; + private Thread shutdownHook; + + WorkerProxy(WorkerKey workerKey, int workerId, Path workDir, Path logFile) { + super(workerKey, workerId, workDir, logFile); + this.workerMultiplexer = WorkerMultiplexer.getInstance(workerKey.hashCode()); + this.request = new ByteArrayOutputStream(); + + final Worker self = this; + this.shutdownHook = + new Thread( + () -> { + try { + self.shutdownHook = null; + self.destroy(); + } catch (IOException e) { + // We can't do anything here. + } + }); + Runtime.getRuntime().addShutdownHook(shutdownHook); + } + + @Override + void createProcess() throws IOException { + workerMultiplexer.createProcess(workerKey, workDir, logFile); + } + + @Override + boolean isAlive() { + // This is horrible, but Process.isAlive() is only available from Java 8 on and this is the + // best we can do prior to that. + return workerMultiplexer.isProcessAlive(); + } + + @Override + public void prepareExecution( + Map inputFiles, SandboxHelpers.SandboxOutputs outputs, Set workerFiles) + throws IOException { + createProcess(); + } + + @Override + synchronized void destroy() { + if (shutdownHook != null) { + Runtime.getRuntime().removeShutdownHook(shutdownHook); + } + if (workerMultiplexer.isAlive()) { + workerMultiplexer.interrupt(); + } + workerMultiplexer.destroyMultiplexer(); + } + + /** + * Reset the responseMap, pass the WorkRequest to worker process, and wait for WorkResponse. + */ + @Override + InputStream getInputStream() { + byte[] requestBytes = request.toByteArray(); + request.reset(); + try { + workerMultiplexer.setResponseMap(workerId); + workerMultiplexer.putRequest(requestBytes); + return workerMultiplexer.getResponse(workerId); + } catch (Exception e) { + e.printStackTrace(); + return new ByteArrayInputStream(new byte[0]); + } + } + + @Override + OutputStream getOutputStream() { + return request; + } +} diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java index c3ed4260465901..da52e821ccd0d4 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java @@ -108,7 +108,7 @@ public String getName() { @Override public SpawnResult exec(Spawn spawn, SpawnExecutionContext context) throws ExecException, IOException, InterruptedException { - if (!Spawns.supportsWorkers(spawn)) { + if (!Spawns.supportsWorkers(spawn) && !Spawns.supportsMultiplexWorkers(spawn)) { // TODO(ulfjack): Don't circumvent SpawnExecutionPolicy. Either drop the warning here, or // provide a mechanism in SpawnExecutionPolicy to report warnings. reporter.handle( @@ -156,6 +156,8 @@ private SpawnResult actuallyExec(Spawn spawn, SpawnExecutionContext context) spawn, context, execRoot, sandboxUsesExpandedTreeArtifactsInRunfiles); SandboxOutputs outputs = SandboxHelpers.getOutputs(spawn); + boolean proxyed = Spawns.supportsMultiplexWorkers(spawn); + WorkerKey key = new WorkerKey( workerArgs, @@ -164,7 +166,8 @@ private SpawnResult actuallyExec(Spawn spawn, SpawnExecutionContext context) spawn.getMnemonic(), workerFilesCombinedHash, workerFiles, - context.speculating()); + context.speculating(), + proxyed); WorkRequest workRequest = createWorkRequest(spawn, context, flagFiles, inputFileCache); @@ -291,6 +294,7 @@ private WorkResponse execInWorker( try { try { worker = workers.borrowObject(key); + request = request.toBuilder().setRequestId(worker.getWorkerId()).build(); } catch (IOException e) { throw new UserExecException( ErrorMessage.builder() diff --git a/src/main/protobuf/worker_protocol.proto b/src/main/protobuf/worker_protocol.proto index 4706792f4e78f2..a55ae8587db1e4 100644 --- a/src/main/protobuf/worker_protocol.proto +++ b/src/main/protobuf/worker_protocol.proto @@ -38,6 +38,8 @@ message WorkRequest { // The inputs that the worker is allowed to read during execution of this // request. repeated Input inputs = 2; + + int32 request_id = 3; } // The worker sends this message to Blaze when it finished its work on the WorkRequest message. @@ -48,4 +50,6 @@ message WorkResponse { // compiler warnings / errors etc. - thus we'll use a string type here, which gives us UTF-8 // encoding. string output = 2; + + int32 request_id = 3; } diff --git a/src/test/java/com/google/devtools/build/lib/worker/WorkerFactoryTest.java b/src/test/java/com/google/devtools/build/lib/worker/WorkerFactoryTest.java index 0552cb8c5ac2d6..6b2f7905bfc80e 100644 --- a/src/test/java/com/google/devtools/build/lib/worker/WorkerFactoryTest.java +++ b/src/test/java/com/google/devtools/build/lib/worker/WorkerFactoryTest.java @@ -48,7 +48,8 @@ public void sandboxedWorkerPathEndsWithWorkspaceName() throws Exception { "dummy", HashCode.fromInt(0), ImmutableSortedMap.of(), - true); + true, + false); Path sandboxedWorkerPath = workerFactory.getSandboxedWorkerPath(workerKey, 1); assertThat(sandboxedWorkerPath.getBaseName()).isEqualTo("workspace");