diff --git a/src/main/java/com/google/devtools/build/lib/actions/ExecutionRequirements.java b/src/main/java/com/google/devtools/build/lib/actions/ExecutionRequirements.java index d7bb88ae856ebc..0314e6d506ebd4 100644 --- a/src/main/java/com/google/devtools/build/lib/actions/ExecutionRequirements.java +++ b/src/main/java/com/google/devtools/build/lib/actions/ExecutionRequirements.java @@ -157,6 +157,8 @@ public String parseIfMatches(String tag) throws ValidationException { /** If an action supports running in persistent worker mode. */ public static final String SUPPORTS_WORKERS = "supports-workers"; + public static final String SUPPORTS_MULTIPLEX_WORKERS = "supports-multiplex-workers"; + public static final ImmutableMap WORKER_MODE_ENABLED = ImmutableMap.of(SUPPORTS_WORKERS, "1"); diff --git a/src/main/java/com/google/devtools/build/lib/actions/Spawns.java b/src/main/java/com/google/devtools/build/lib/actions/Spawns.java index d90cd1d5b5a956..6f5cf1f1929933 100644 --- a/src/main/java/com/google/devtools/build/lib/actions/Spawns.java +++ b/src/main/java/com/google/devtools/build/lib/actions/Spawns.java @@ -72,6 +72,14 @@ public static boolean supportsWorkers(Spawn spawn) { return "1".equals(spawn.getExecutionInfo().get(ExecutionRequirements.SUPPORTS_WORKERS)); } + /** + * Returns whether a Spawn claims to support being executed with the persistent multiplex worker strategy + * according to its execution info tags. + */ + public static boolean supportsMultiplexWorkers(Spawn spawn) { + return "1".equals(spawn.getExecutionInfo().get(ExecutionRequirements.SUPPORTS_MULTIPLEX_WORKERS)); + } + /** * Parse the timeout key in the spawn execution info, if it exists. Otherwise, return -1. 
*/ diff --git a/src/main/java/com/google/devtools/build/lib/worker/Worker.java b/src/main/java/com/google/devtools/build/lib/worker/Worker.java index e77bb35f3ed2ea..c0d50d42b6b382 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/Worker.java +++ b/src/main/java/com/google/devtools/build/lib/worker/Worker.java @@ -20,6 +20,8 @@ import com.google.devtools.build.lib.shell.SubprocessBuilder; import com.google.devtools.build.lib.vfs.Path; import com.google.devtools.build.lib.vfs.PathFragment; +import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest; +import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse; import java.io.File; import java.io.IOException; import java.io.InputStream; @@ -41,10 +43,26 @@ * class. */ class Worker { - private final WorkerKey workerKey; - private final int workerId; - private final Path workDir; - private final Path logFile; + /** + * A unique identifier of the work process. + */ + protected final WorkerKey workerKey; + /** + * A unique ID of the worker. It will be used in WorkRequest and WorkResponse as well. + */ + protected final int workerId; + /** + * The execution root of the worker. + */ + protected final Path workDir; + /** + * The path of the log file. + */ + protected final Path logFile; + /** + * Stream for reading the WorkResponse. 
+ */ + protected RecordingInputStream recordingStream; private Subprocess process; private Thread shutdownHook; @@ -141,12 +159,22 @@ boolean isAlive() { return !process.finished(); } - InputStream getInputStream() { - return process.getInputStream(); + void putRequest(WorkRequest request) throws IOException { + request.writeDelimitedTo(process.getOutputStream()); + process.getOutputStream().flush(); + } + + WorkResponse getResponse() throws IOException { + recordingStream = new RecordingInputStream(process.getInputStream()); + recordingStream.startRecording(4096); + // response can be null when the worker has already closed stdout at this point and thus + // the InputStream is at EOF. + return WorkResponse.parseDelimitedFrom(recordingStream); } - OutputStream getOutputStream() { - return process.getOutputStream(); + String getRecordingStreamMessage() { + recordingStream.readRemaining(); + return recordingStream.getRecordedDataAsString(); } public void prepareExecution( diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java index 91f52e89b7d4d0..68656558a2a98e 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerFactory.java @@ -62,6 +62,8 @@ public Worker create(WorkerKey key) throws Exception { if (sandboxed) { Path workDir = getSandboxedWorkerPath(key, workerId); worker = new SandboxedWorker(key, workerId, workDir, logFile); + } else if (key.getProxied()) { + worker = new WorkerProxy(key, workerId, key.getExecRoot(), logFile, WorkerMultiplexerManager.getInstance(key.hashCode())); } else { worker = new Worker(key, workerId, key.getExecRoot(), logFile); } diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerKey.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerKey.java index 55b0910239b5d8..5de1ce7dfa56ad 100644 --- 
a/src/main/java/com/google/devtools/build/lib/worker/WorkerKey.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerKey.java @@ -42,6 +42,8 @@ final class WorkerKey { private final HashCode workerFilesCombinedHash; private final SortedMap workerFilesWithHashes; private final boolean mustBeSandboxed; + /** A WorkerProxy will be instantiated if true, instantiate a regular Worker if false. */ + private final boolean proxied; WorkerKey( List args, @@ -50,44 +52,66 @@ final class WorkerKey { String mnemonic, HashCode workerFilesCombinedHash, SortedMap workerFilesWithHashes, - boolean mustBeSandboxed) { + boolean mustBeSandboxed, + boolean proxied) { + /** Build options. */ this.args = ImmutableList.copyOf(Preconditions.checkNotNull(args)); + /** Environment variables. */ this.env = ImmutableMap.copyOf(Preconditions.checkNotNull(env)); + /** Execution root of Bazel process. */ this.execRoot = Preconditions.checkNotNull(execRoot); + /** Mnemonic of the worker. */ this.mnemonic = Preconditions.checkNotNull(mnemonic); + /** One combined hash code for all files. */ this.workerFilesCombinedHash = Preconditions.checkNotNull(workerFilesCombinedHash); + /** Worker files with the corresponding hash code. */ this.workerFilesWithHashes = Preconditions.checkNotNull(workerFilesWithHashes); + /** Set it to true if this job should be run in sandbox. */ this.mustBeSandboxed = mustBeSandboxed; + /** Set it to true if this job should be run with WorkerProxy. */ + this.proxied = proxied; } + /** Getter function for variable args. */ public ImmutableList getArgs() { return args; } + /** Getter function for variable env. */ public ImmutableMap getEnv() { return env; } + /** Getter function for variable execRoot. */ public Path getExecRoot() { return execRoot; } + /** Getter function for variable mnemonic. */ public String getMnemonic() { return mnemonic; } + /** Getter function for variable workerFilesCombinedHash. 
*/ public HashCode getWorkerFilesCombinedHash() { return workerFilesCombinedHash; } + /** Getter function for variable workerFilesWithHashes. */ public SortedMap getWorkerFilesWithHashes() { return workerFilesWithHashes; } + /** Getter function for variable mustBeSandboxed. */ public boolean mustBeSandboxed() { return mustBeSandboxed; } + /** Getter function for variable proxied. */ + public boolean getProxied() { + return proxied; + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java new file mode 100644 index 00000000000000..7cf25dcdbb0559 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexer.java @@ -0,0 +1,226 @@ +// Copyright 2018 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package com.google.devtools.build.lib.worker; + +import com.google.devtools.build.lib.shell.Subprocess; +import com.google.devtools.build.lib.shell.SubprocessBuilder; +import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest; +import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse; +import com.google.devtools.build.lib.vfs.Path; +import java.io.File; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Semaphore; +import java.util.logging.Logger; + +/** An intermediate worker that sends requests to and receives responses from the worker process. */ +public class WorkerMultiplexer extends Thread { + private static final Logger logger = Logger.getLogger(WorkerMultiplexer.class.getName()); + /** + * WorkerMultiplexer is running as a thread on its own. When worker process + * returns the WorkResponse, it is stored in this map and waits for + * WorkerProxy to retrieve the response. + */ + private Map workerProcessResponse; + /** A semaphore to protect workerProcessResponse object. */ + private Semaphore semWorkerProcessResponse; + /** + * After sending the WorkRequest, WorkerProxy will wait on a semaphore to be + * released. WorkerMultiplexer is responsible for releasing the corresponding + * semaphore in order to signal WorkerProxy that the WorkResponse has been + * received. + */ + private Map responseChecker; + /** A semaphore to protect responseChecker object. */ + private Semaphore semResponseChecker; + /** The worker process that this WorkerMultiplexer should be talking to. */ + private Subprocess process; + /** + * If one of the worker processes returns an unparseable response, + * discard all the responses from other worker processes. 
+ */ + private boolean isUnparseable; + /** InputStream from worker process. */ + private RecordingInputStream recordingStream; + /** If worker process returns null, return null to WorkerProxy and discard all the responses. */ + private boolean isNull; + + WorkerMultiplexer() { + semWorkerProcessResponse = new Semaphore(1); + semResponseChecker = new Semaphore(1); + responseChecker = new HashMap<>(); + workerProcessResponse = new HashMap<>(); + isUnparseable = false; + isNull = false; + + final WorkerMultiplexer self = this; + } + + /** Only start one worker process for each WorkerMultiplexer, if it hasn't. */ + public synchronized void createProcess(WorkerKey workerKey, Path workDir, Path logFile) throws IOException { + if (this.process == null) { + List args = workerKey.getArgs(); + File executable = new File(args.get(0)); + if (!executable.isAbsolute() && executable.getParent() != null) { + args = new ArrayList<>(args); + args.set(0, new File(workDir.getPathFile(), args.get(0)).getAbsolutePath()); + } + SubprocessBuilder processBuilder = new SubprocessBuilder(); + processBuilder.setArgv(args); + processBuilder.setWorkingDirectory(workDir.getPathFile()); + processBuilder.setStderr(logFile.getPathFile()); + processBuilder.setEnv(workerKey.getEnv()); + this.process = processBuilder.start(); + } + if (!this.isAlive()) { + this.start(); + } + } + + public synchronized void destroyMultiplexer() { + if (this.process != null) { + destroyProcess(this.process); + } + } + + private void destroyProcess(Subprocess process) { + boolean wasInterrupted = false; + try { + process.destroy(); + while (true) { + try { + process.waitFor(); + return; + } catch (InterruptedException ie) { + wasInterrupted = true; + } + } + } finally { + // Read this for detailed explanation: http://www.ibm.com/developerworks/library/j-jtp05236/ + if (wasInterrupted) { + Thread.currentThread().interrupt(); // preserve interrupted status + } + } + } + + public boolean isProcessAlive() { + return 
!this.process.finished(); + } + + /** Send the WorkRequest to worker process. */ + public synchronized void putRequest(WorkRequest request) throws IOException { + request.writeDelimitedTo(process.getOutputStream()); + process.getOutputStream().flush(); + } + + /** Wait on a semaphore for the WorkResponse returned from worker process. */ + public InputStream getResponse(Integer workerId) throws IOException, InterruptedException { + semResponseChecker.acquire(); + Semaphore waitForResponse = responseChecker.get(workerId); + semResponseChecker.release(); + + // The semaphore will throw InterruptedException when the multiplexer is terminated. + waitForResponse.acquire(); + + if (isNull) { + return null; + } + + if (isUnparseable) { + recordingStream.readRemaining(); + throw new IOException(recordingStream.getRecordedDataAsString()); + } + + semWorkerProcessResponse.acquire(); + InputStream response = workerProcessResponse.get(workerId); + semWorkerProcessResponse.release(); + return response; + } + + /** Reset the semaphore map before sending request to worker process. */ + public void resetResponseChecker(Integer workerId) throws InterruptedException { + semResponseChecker.acquire(); + responseChecker.put(workerId, new Semaphore(0)); + semResponseChecker.release(); + } + + /** + * When it gets a WorkResponse from worker process, put that WorkResponse in + * workerProcessResponse and signal responseChecker. 
+ */ + private void waitResponse() throws InterruptedException, IOException { + recordingStream = new RecordingInputStream(process.getInputStream()); + recordingStream.startRecording(4096); + WorkResponse parsedResponse = WorkResponse.parseDelimitedFrom(recordingStream); + + if (parsedResponse == null) { + isNull = true; + releaseAllSemaphores(); + return; + } + + Integer workerId = parsedResponse.getRequestId(); + ByteArrayOutputStream tempOs = new ByteArrayOutputStream(); + parsedResponse.writeDelimitedTo(tempOs); + + semWorkerProcessResponse.acquire(); + workerProcessResponse.put(workerId, new ByteArrayInputStream(tempOs.toByteArray())); + semWorkerProcessResponse.release(); + + semResponseChecker.acquire(); + responseChecker.get(workerId).release(); + semResponseChecker.release(); + } + + /** A multiplexer thread that listens to the WorkResponse from worker process. */ + public void run() { + while (!this.interrupted()) { + try { + waitResponse(); + } catch (IOException e) { + isUnparseable = true; + releaseAllSemaphores(); + logger.warning("IOException was caught while waiting for worker response. " + + "It could because the worker returned unparseable response."); + } catch (InterruptedException e) { + logger.warning("InterruptedException was caught while waiting for worker response. " + + "It could because the multiplexer was interrupted."); + } + } + logger.warning("Multiplexer thread has been terminated. It could because the memory is running low on your machine. 
" + + "There may be other reasons."); + } + + /** Release all the semaphores */ + private void releaseAllSemaphores() { + try { + semResponseChecker.acquire(); + for (Integer workerId: responseChecker.keySet()) { + responseChecker.get(workerId).release(); + } + } catch (InterruptedException e) { + // Do nothing + } finally { + semResponseChecker.release(); + } + } +} diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java new file mode 100644 index 00000000000000..7091bc88e3a50f --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManager.java @@ -0,0 +1,135 @@ +// Copyright 2018 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.worker; + +import com.google.devtools.build.lib.actions.UserExecException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Semaphore; + +/** A manager to instantiate and distroy multiplexers. */ +public class WorkerMultiplexerManager { + /** + * There should only be one WorkerMultiplexer corresponding to workers with + * the same mnemonic. If the WorkerMultiplexer has been constructed, other + * workers should point to the same one. The hash of WorkerKey is used as + * key. 
+ */ + private static Map multiplexerInstance; + /** A semaphore to protect multiplexerInstance and the reference counts stored in it. */ + private static Semaphore semMultiplexer; + + static { + multiplexerInstance = new HashMap<>(); + semMultiplexer = new Semaphore(1); + } + /** + * Returns a WorkerMultiplexer instance to WorkerProxy. WorkerProxies with the + * same workerHash talk to the same WorkerMultiplexer. Also, record how many + * WorkerProxies are talking to this WorkerMultiplexer. + */ + public static WorkerMultiplexer getInstance(Integer workerHash) throws InterruptedException { + semMultiplexer.acquire(); + if (!multiplexerInstance.containsKey(workerHash)) { + multiplexerInstance.put(workerHash, new InstanceInfo()); + } + multiplexerInstance.get(workerHash).increaseRefCount(); + WorkerMultiplexer workerMultiplexer = multiplexerInstance.get(workerHash).getWorkerMultiplexer(); + semMultiplexer.release(); + return workerMultiplexer; + } + + /** + * Remove the WorkerMultiplexer instance and reference count since it is no + * longer in use. 
+ */ + public static void removeInstance(Integer workerHash) throws InterruptedException, UserExecException { + semMultiplexer.acquire(); + try { + multiplexerInstance.get(workerHash).decreaseRefCount(); + if (multiplexerInstance.get(workerHash).getRefCount() == 0) { + multiplexerInstance.get(workerHash).getWorkerMultiplexer().interrupt(); + multiplexerInstance.get(workerHash).getWorkerMultiplexer().destroyMultiplexer(); + multiplexerInstance.remove(workerHash); + } + } catch (Exception e) { + throw new UserExecException( + ErrorMessage.builder() + .message("NullPointerException while accessing non-existent multiplexer instance.") + .exception(e) + .build() + .toString()); + } finally { + semMultiplexer.release(); + } + } + + public static WorkerMultiplexer getMultiplexer(Integer workerHash) throws UserExecException { + try { + return multiplexerInstance.get(workerHash).getWorkerMultiplexer(); + } catch (NullPointerException e) { + throw new UserExecException( + ErrorMessage.builder() + .message("NullPointerException while accessing non-existent multiplexer instance.") + .exception(e) + .build() + .toString()); + } + } + + public static Integer getRefCount(Integer workerHash) throws UserExecException { + try { + return multiplexerInstance.get(workerHash).getRefCount(); + } catch (NullPointerException e) { + throw new UserExecException( + ErrorMessage.builder() + .message("NullPointerException while accessing non-existent multiplexer instance.") + .exception(e) + .build() + .toString()); + } + } + + public static Integer getInstanceCount() { + return multiplexerInstance.keySet().size(); + } +} + +/** Contains the WorkerMultiplexer instance and reference count */ +class InstanceInfo { + private WorkerMultiplexer workerMultiplexer; + private Integer refCount; + + public InstanceInfo() { + this.workerMultiplexer = new WorkerMultiplexer(); + this.refCount = 0; + } + + public void increaseRefCount() { + refCount = refCount + 1; + } + + public void decreaseRefCount() { + 
refCount = refCount - 1; + } + + public WorkerMultiplexer getWorkerMultiplexer() { + return workerMultiplexer; + } + + public Integer getRefCount() { + return refCount; + } +} diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java new file mode 100644 index 00000000000000..25926bb52cfcc8 --- /dev/null +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerProxy.java @@ -0,0 +1,129 @@ +// Copyright 2018 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package com.google.devtools.build.lib.worker; + +import com.google.devtools.build.lib.actions.UserExecException; +import com.google.devtools.build.lib.sandbox.SandboxHelpers.SandboxInputs; +import com.google.devtools.build.lib.sandbox.SandboxHelpers.SandboxOutputs; +import com.google.devtools.build.lib.vfs.Path; +import com.google.devtools.build.lib.vfs.PathFragment; +import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest; +import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.IOException; +import java.util.Map; +import java.util.Set; +import java.util.logging.Logger; + +/** + * A proxy that talks to the multiplexer + */ +final class WorkerProxy extends Worker { + private static final Logger logger = Logger.getLogger(WorkerProxy.class.getName()); + private ByteArrayOutputStream request; + private WorkerMultiplexer workerMultiplexer; + private Thread shutdownHook; + private String recordingStreamMessage; + + WorkerProxy(WorkerKey workerKey, int workerId, Path workDir, Path logFile, WorkerMultiplexer workerMultiplexer) { + super(workerKey, workerId, workDir, logFile); + request = new ByteArrayOutputStream(); + this.workerMultiplexer = workerMultiplexer; + + final WorkerProxy self = this; + } + + @Override + void createProcess() throws IOException { + workerMultiplexer.createProcess(workerKey, workDir, logFile); + } + + @Override + boolean isAlive() { + return workerMultiplexer.isProcessAlive(); + } + + @Override + public void prepareExecution( + SandboxInputs inputFiles, SandboxOutputs outputs, Set workerFiles) + throws IOException { + createProcess(); + } + + @Override + synchronized void destroy() throws IOException { + super.destroy(); + try { + WorkerMultiplexerManager.removeInstance(workerKey.hashCode()); + } catch (InterruptedException e) { + 
logger.warning("InterruptedException was caught while destroying multiplexer. " + + "It could because the multiplexer was interrupted."); + } catch (UserExecException e) { + logger.warning(e.toString()); + } + } + + /** Send the WorkRequest to multiplexer. */ + @Override + void putRequest(WorkRequest request) throws IOException { + try { + workerMultiplexer.resetResponseChecker(workerId); + workerMultiplexer.putRequest(request); + } catch (InterruptedException e) { + /** + * We can't throw InterruptedException to WorkerSpawnRunner because of the principle of override. + * InterruptedException will happen when Bazel is waiting for semaphore but user terminates the + * process, so we do nothing here. + */ + logger.warning("InterruptedException was caught while sending worker request. " + + "It could because the multiplexer was interrupted."); + } + } + + /** Wait for WorkResponse from multiplexer. */ + @Override + WorkResponse getResponse() throws IOException { + try { + InputStream inputStream = workerMultiplexer.getResponse(workerId); + if (inputStream == null) { + return null; + } + return WorkResponse.parseDelimitedFrom(inputStream); + } catch (IOException e) { + recordingStreamMessage = e.toString(); + throw new IOException("IOException was caught while waiting for worker response. " + + "It could because the worker returned unparseable response."); + } catch (InterruptedException e) { + /** + * We can't throw InterruptedException to WorkerSpawnRunner because of the principle of override. + * InterruptedException will happen when Bazel is waiting for semaphore but user terminates the + * process, so we do nothing here. + */ + logger.warning("InterruptedException was caught while waiting for work response. " + + "It could because the multiplexer was interrupted."); + } + // response can be null when the worker has already closed stdout at this point and thus + // the InputStream is at EOF. 
+ return null; + } + + @Override + String getRecordingStreamMessage() { + return recordingStreamMessage; + } +} diff --git a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java index 3c32bb9048148c..174f2538525a9e 100644 --- a/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java +++ b/src/main/java/com/google/devtools/build/lib/worker/WorkerSpawnRunner.java @@ -112,7 +112,7 @@ public String getName() { @Override public SpawnResult exec(Spawn spawn, SpawnExecutionContext context) throws ExecException, IOException, InterruptedException { - if (!Spawns.supportsWorkers(spawn)) { + if (!Spawns.supportsWorkers(spawn) && !Spawns.supportsMultiplexWorkers(spawn)) { // TODO(ulfjack): Don't circumvent SpawnExecutionPolicy. Either drop the warning here, or // provide a mechanism in SpawnExecutionPolicy to report warnings. reporter.handle( @@ -127,7 +127,7 @@ public SpawnResult exec(Spawn spawn, SpawnExecutionContext context) @Override public boolean canExec(Spawn spawn) { - return Spawns.supportsWorkers(spawn); + return Spawns.supportsWorkers(spawn) || Spawns.supportsMultiplexWorkers(spawn); } private SpawnResult actuallyExec(Spawn spawn, SpawnExecutionContext context) @@ -168,12 +168,11 @@ private SpawnResult actuallyExec(Spawn spawn, SpawnExecutionContext context) spawn.getMnemonic(), workerFilesCombinedHash, workerFiles, - context.speculating()); - - WorkRequest workRequest = createWorkRequest(spawn, context, flagFiles, inputFileCache); + context.speculating(), + Spawns.supportsMultiplexWorkers(spawn)); long startTime = System.currentTimeMillis(); - WorkResponse response = execInWorker(spawn, key, workRequest, context, inputFiles, outputs); + WorkResponse response = execInWorker(spawn, key, context, inputFiles, outputs, flagFiles, inputFileCache); Duration wallTime = Duration.ofMillis(System.currentTimeMillis() - startTime); FileOutErr outErr = 
context.getFileOutErr(); @@ -221,7 +220,8 @@ private WorkRequest createWorkRequest( Spawn spawn, SpawnExecutionContext context, List flagfiles, - MetadataProvider inputFileCache) + MetadataProvider inputFileCache, + int workerId) throws IOException { WorkRequest.Builder requestBuilder = WorkRequest.newBuilder(); for (String flagfile : flagfiles) { @@ -246,7 +246,7 @@ private WorkRequest createWorkRequest( .setDigest(digest) .build(); } - return requestBuilder.build(); + return requestBuilder.setRequestId(workerId).build(); } /** @@ -283,18 +283,21 @@ private static boolean isExternalRepositoryLabel(String arg) { private WorkResponse execInWorker( Spawn spawn, WorkerKey key, - WorkRequest request, SpawnExecutionContext context, SandboxInputs inputFiles, - SandboxOutputs outputs) + SandboxOutputs outputs, + List flagFiles, + MetadataProvider inputFileCache) throws InterruptedException, ExecException { Worker worker = null; WorkResponse response; + WorkRequest request; ActionExecutionMetadata owner = spawn.getResourceOwner(); try { try { worker = workers.borrowObject(key); + request = createWorkRequest(spawn, context, flagFiles, inputFileCache, worker.getWorkerId()); } catch (IOException e) { throw new UserExecException( ErrorMessage.builder() @@ -331,8 +334,7 @@ private WorkResponse execInWorker( } try { - request.writeDelimitedTo(worker.getOutputStream()); - worker.getOutputStream().flush(); + worker.putRequest(request); } catch (IOException e) { throw new UserExecException( ErrorMessage.builder() @@ -345,17 +347,13 @@ private WorkResponse execInWorker( .toString()); } - RecordingInputStream recordingStream = new RecordingInputStream(worker.getInputStream()); - recordingStream.startRecording(4096); try { - // response can be null when the worker has already closed stdout at this point and thus - // the InputStream is at EOF. 
- response = WorkResponse.parseDelimitedFrom(recordingStream); + response = worker.getResponse(); } catch (IOException e) { // If protobuf couldn't parse the response, try to print whatever the failing worker wrote // to stdout - it's probably a stack trace or some kind of error message that will help // the user figure out why the compiler is failing. - recordingStream.readRemaining(); + String recordingStreamMessage = worker.getRecordingStreamMessage(); throw new UserExecException( ErrorMessage.builder() .message( @@ -363,7 +361,7 @@ private WorkResponse execInWorker( + "Did you try to print something to stdout? Workers aren't allowed to " + "do this, as it breaks the protocol between Bazel and the worker " + "process.") - .logText(recordingStream.getRecordedDataAsString()) + .logText(recordingStreamMessage) .exception(e) .build() .toString()); diff --git a/src/main/protobuf/worker_protocol.proto b/src/main/protobuf/worker_protocol.proto index 4706792f4e78f2..ebe1fe19386cb7 100644 --- a/src/main/protobuf/worker_protocol.proto +++ b/src/main/protobuf/worker_protocol.proto @@ -38,6 +38,10 @@ message WorkRequest { // The inputs that the worker is allowed to read during execution of this // request. repeated Input inputs = 2; + + // To support multiplex worker, each WorkRequest must have a unique ID. This ID should + // be attached unchanged to the WorkResponse. + int32 request_id = 3; } // The worker sends this message to Blaze when it finished its work on the WorkRequest message. @@ -48,4 +52,9 @@ message WorkResponse { // compiler warnings / errors etc. - thus we'll use a string type here, which gives us UTF-8 // encoding. string output = 2; + + // To support multiplex worker, each WorkResponse must have a unique ID. Since worker processes + // which support multiplex worker will handle multiple WorkRequests in parallel, this ID will + // be used to determine which WorkerProxy this WorkResponse belongs to. 
+ int32 request_id = 3; } diff --git a/src/test/java/com/google/devtools/build/lib/BUILD b/src/test/java/com/google/devtools/build/lib/BUILD index afac6bd9fa0d93..ea60010c0c23aa 100644 --- a/src/test/java/com/google/devtools/build/lib/BUILD +++ b/src/test/java/com/google/devtools/build/lib/BUILD @@ -1594,6 +1594,7 @@ java_test( "//src/main/java/com/google/devtools/build/lib:os_util", "//src/main/java/com/google/devtools/build/lib:resource-converter", "//src/main/java/com/google/devtools/build/lib:util", + "//src/main/java/com/google/devtools/build/lib/actions", "//src/main/java/com/google/devtools/build/lib/sandbox", "//src/main/java/com/google/devtools/build/lib/vfs", "//src/main/java/com/google/devtools/build/lib/vfs/inmemoryfs", @@ -1644,6 +1645,17 @@ java_binary( ], ) +java_binary( + name = "ExampleWorkerMultiplexer", + main_class = "com.google.devtools.build.lib.worker.ExampleWorkerMultiplexer", + visibility = [ + "//src/test/shell/integration:__pkg__", + ], + runtime_deps = [ + ":ExampleWorker-lib", + ], +) + TEST_SUITES = [ "ziputils", "rules", diff --git a/src/test/java/com/google/devtools/build/lib/analysis/actions/SpawnActionTest.java b/src/test/java/com/google/devtools/build/lib/analysis/actions/SpawnActionTest.java index 0788a8970a213a..b51f7db5c75800 100644 --- a/src/test/java/com/google/devtools/build/lib/analysis/actions/SpawnActionTest.java +++ b/src/test/java/com/google/devtools/build/lib/analysis/actions/SpawnActionTest.java @@ -32,6 +32,7 @@ import com.google.devtools.build.lib.actions.ParameterFile.ParameterFileType; import com.google.devtools.build.lib.actions.RunfilesSupplier; import com.google.devtools.build.lib.actions.Spawn; +import com.google.devtools.build.lib.actions.Spawns; import com.google.devtools.build.lib.actions.cache.VirtualActionInput; import com.google.devtools.build.lib.actions.extra.EnvironmentVariable; import com.google.devtools.build.lib.actions.extra.ExtraActionInfo; @@ -471,6 +472,33 @@ public void 
testGetExtraActionInfoOnAspects() throws Exception { "parameter", ExtraActionInfo.StringList.newBuilder().addValue("param_value").build()); } + private SpawnAction createWorkerSupportSpawn(Map executionInfoVariables) throws Exception { + Artifact input = getSourceArtifact("input"); + Artifact output = getBinArtifactWithNoOwner("output"); + Action[] actions = + builder() + .addInput(input) + .addOutput(output) + .setExecutionInfo(executionInfoVariables) + .setExecutable(scratch.file("/bin/xxx").asFragment()) + .build(ActionsTestUtil.NULL_ACTION_OWNER, targetConfig); + return (SpawnAction) actions[0]; + } + + @Test + public void testWorkerSupport() throws Exception { + SpawnAction workerSupportSpawn = + createWorkerSupportSpawn(ImmutableMap.of("supports-workers", "1")); + assertThat(Spawns.supportsWorkers(workerSupportSpawn.getSpawn())).isEqualTo(true); + } + + @Test + public void testMultiplexWorkerSupport() throws Exception { + SpawnAction multiplexWorkerSupportSpawn = + createWorkerSupportSpawn(ImmutableMap.of("supports-multiplex-workers", "1")); + assertThat(Spawns.supportsMultiplexWorkers(multiplexWorkerSupportSpawn.getSpawn())).isEqualTo(true); + } + private static RunfilesSupplier runfilesSupplier(Artifact manifest, PathFragment dir) { return new RunfilesSupplierImpl(dir, Runfiles.EMPTY, manifest); } diff --git a/src/test/java/com/google/devtools/build/lib/worker/ExampleWorkerMultiplexer.java b/src/test/java/com/google/devtools/build/lib/worker/ExampleWorkerMultiplexer.java new file mode 100644 index 00000000000000..716b9082237744 --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/worker/ExampleWorkerMultiplexer.java @@ -0,0 +1,263 @@ +// Copyright 2015 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.worker; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.devtools.build.lib.worker.ExampleWorkerMultiplexerOptions.ExampleWorkMultiplexerOptions; +import com.google.devtools.build.lib.worker.WorkerProtocol.Input; +import com.google.devtools.build.lib.worker.WorkerProtocol.WorkRequest; +import com.google.devtools.build.lib.worker.WorkerProtocol.WorkResponse; +import com.google.devtools.common.options.OptionsParser; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.PrintStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * An example implementation of a worker process that is used for integration tests. + */ +public class ExampleWorkerMultiplexer { + + static final Pattern FLAG_FILE_PATTERN = Pattern.compile("(?:@|--?flagfile=)(.+)"); + + // A UUID that uniquely identifies this running worker process. + static final UUID workerUuid = UUID.randomUUID(); + + // Creating Executor Service with a thread pool of Size 3. 
+  static final int concurrentThreadNumber = 3;
+
+  // A counter that increases with each work unit processed.
+  static int workUnitCounter = 1;
+
+  // Counter value captured for the request most recently accepted with --write_counter.
+  // NOTE(review): this, workUnitCounter and inputs are shared with the pool threads without
+  // synchronization; a task may observe the values of a later request. Confirm that is acceptable
+  // for the integration tests that rely on them.
+  static int counterOutput = workUnitCounter;
+
+  // Serializes WorkResponse writes so that responses from concurrent tasks cannot interleave.
+  static Semaphore protectResponse = new Semaphore(1);
+
+  // Keep state across multiple builds.
+  static final LinkedHashMap<String, String> inputs = new LinkedHashMap<>();
+
+  /**
+   * Entry point. With {@code --persistent_worker} the process serves WorkRequests read from stdin;
+   * otherwise the arguments are treated as a single request that is processed once.
+   */
+  public static void main(String[] args) throws Exception {
+    if (ImmutableSet.copyOf(args).contains("--persistent_worker")) {
+      OptionsParser parser =
+          OptionsParser.builder()
+              .optionsClasses(ExampleWorkerMultiplexerOptions.class)
+              .allowResidue(false)
+              .build();
+      parser.parse(args);
+      ExampleWorkerMultiplexerOptions workerOptions =
+          parser.getOptions(ExampleWorkerMultiplexerOptions.class);
+      Preconditions.checkState(workerOptions.persistentWorker);
+
+      runPersistentWorker(workerOptions);
+    } else {
+      // This is a single invocation of the example that exits after it processed the request.
+      processRequest(parserHelper(ImmutableList.copyOf(args)));
+    }
+  }
+
+  /**
+   * Reads delimited WorkRequests from stdin until the stream ends and submits each one to a
+   * fixed-size thread pool, so that several requests can be in flight at the same time.
+   *
+   * @param workerOptions worker-level options (poisoning and exit-after behavior)
+   * @throws IOException if reading a request or writing an error response fails
+   */
+  private static void runPersistentWorker(ExampleWorkerMultiplexerOptions workerOptions)
+      throws IOException {
+    PrintStream originalStdOut = System.out;
+    PrintStream originalStdErr = System.err;
+
+    ExecutorService executorService = Executors.newFixedThreadPool(concurrentThreadNumber);
+
+    try {
+      while (true) {
+        try {
+          WorkRequest request = WorkRequest.parseDelimitedFrom(System.in);
+          // End of stream is signalled by a null request, so check before dereferencing it.
+          if (request == null) {
+            break;
+          }
+          int requestId = request.getRequestId();
+
+          inputs.clear();
+          for (Input input : request.getInputsList()) {
+            inputs.put(input.getPath(), input.getDigest().toStringUtf8());
+          }
+
+          // If true, returns corrupt responses instead of correct protobufs.
+          boolean poisoned = false;
+          if (workerOptions.poisonAfter > 0 && workUnitCounter > workerOptions.poisonAfter) {
+            poisoned = true;
+          }
+
+          if (poisoned && workerOptions.hardPoison) {
+            System.err.println("I'm a very poisoned worker and will just crash.");
+            System.exit(1);
+          } else {
+            try {
+              OptionsParser parser = parserHelper(request.getArgumentsList());
+              ExampleWorkMultiplexerOptions options =
+                  parser.getOptions(ExampleWorkMultiplexerOptions.class);
+              if (options.writeCounter) {
+                counterOutput = workUnitCounter++;
+              }
+              executorService.submit(
+                  createTask(
+                      originalStdOut, originalStdErr, workerOptions, requestId, parser, poisoned));
+            } catch (Exception e) {
+              e.printStackTrace();
+              // Report the failure for this request id. The write goes to the real stdout and is
+              // guarded by the same semaphore the tasks use, so it cannot interleave with them.
+              protectResponse.acquireUninterruptibly();
+              try {
+                WorkResponse.newBuilder()
+                    .setRequestId(requestId)
+                    .setOutput("")
+                    .setExitCode(1)
+                    .build()
+                    .writeDelimitedTo(originalStdOut);
+                originalStdOut.flush();
+              } finally {
+                protectResponse.release();
+              }
+            }
+          }
+
+          if (workerOptions.exitAfter > 0 && workUnitCounter > workerOptions.exitAfter) {
+            System.in.close();
+          }
+        } finally {
+          // Be a good worker process and consume less memory when idle.
+          System.gc();
+        }
+      }
+    } finally {
+      // The pool's threads are non-daemon; without a shutdown they would keep the JVM alive after
+      // the request loop has ended. Already submitted tasks are still allowed to finish.
+      executorService.shutdown();
+    }
+  }
+
+  /**
+   * Expands flag files ({@code @file} or {@code --flagfile=file}) in {@code args} and parses the
+   * result as per-request options; unknown arguments are kept as residue.
+   */
+  private static OptionsParser parserHelper(List<String> args) throws Exception {
+    ImmutableList.Builder<String> expandedArgs = ImmutableList.builder();
+    for (String arg : args) {
+      Matcher flagFileMatcher = FLAG_FILE_PATTERN.matcher(arg);
+      if (flagFileMatcher.matches()) {
+        expandedArgs.addAll(Files.readAllLines(Paths.get(flagFileMatcher.group(1)), UTF_8));
+      } else {
+        expandedArgs.add(arg);
+      }
+    }
+
+    OptionsParser parser =
+        OptionsParser.builder()
+            .optionsClasses(ExampleWorkMultiplexerOptions.class)
+            .allowResidue(true)
+            .build();
+    parser.parse(expandedArgs.build());
+
+    return parser;
+  }
+
+  /**
+   * Builds the task that processes one request and writes its WorkResponse (or, when poisoned,
+   * deliberately corrupt bytes) to the worker's real stdout.
+   *
+   * @param originalStdOut the process's real stdout, captured before any redirection
+   * @param originalStdErr the process's real stderr, captured before any redirection
+   * @param workerOptions worker-level options (not read by the task itself)
+   * @param requestId id copied unchanged into the WorkResponse
+   * @param parser parsed per-request options
+   * @param poisoned whether to emit junk instead of a protobuf response
+   */
+  private static Runnable createTask(
+      PrintStream originalStdOut,
+      PrintStream originalStdErr,
+      ExampleWorkerMultiplexerOptions workerOptions,
+      Integer requestId,
+      OptionsParser parser,
+      boolean poisoned) {
+    return () -> {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream();
+      int exitCode = 0;
+
+      try {
+        // NOTE(review): System.setOut/setErr are process-wide, so concurrently running tasks can
+        // capture each other's output here.
+        try (PrintStream ps = new PrintStream(baos)) {
+          System.setOut(ps);
+          System.setErr(ps);
+
+          if (poisoned) {
+            System.out.println("I'm a poisoned worker and this is not a protobuf.");
+            System.out.println("Here's a fake stack trace for you:");
+            System.out.println("    at com.example.Something(Something.java:83)");
+            System.out.println("    at java.lang.Thread.run(Thread.java:745)");
+            System.out.print("And now, 8k of random bytes: ");
+            byte[] b = new byte[8192];
+            new Random().nextBytes(b);
+            System.out.write(b);
+          } else {
+            try {
+              processRequest(parser);
+            } catch (Exception e) {
+              e.printStackTrace();
+              exitCode = 1;
+            }
+          }
+        } finally {
+          System.setOut(originalStdOut);
+          System.setErr(originalStdErr);
+        }
+
+        // Write to the captured stream rather than System.out: another task may have redirected
+        // System.out again in the meantime.
+        if (poisoned) {
+          baos.writeTo(originalStdOut);
+        } else {
+          protectResponse.acquire();
+          try {
+            WorkResponse.newBuilder()
+                .setRequestId(requestId)
+                .setOutput(baos.toString())
+                .setExitCode(exitCode)
+                .build()
+                .writeDelimitedTo(originalStdOut);
+          } finally {
+            // Always hand the permit back, otherwise one failed write blocks every later response.
+            protectResponse.release();
+          }
+        }
+        originalStdOut.flush();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      } catch (InterruptedException e) {
+        // Preserve the interrupt for the executor before propagating.
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    };
+  }
+
+  /**
+   * Produces the output for one unit of work according to the per-request options and prints it to
+   * stdout, or to {@code --output_file} when that option is set.
+   */
+  private static void processRequest(OptionsParser parser) throws Exception {
+    ExampleWorkMultiplexerOptions options = parser.getOptions(ExampleWorkMultiplexerOptions.class);
+
+    List<String> outputs = new ArrayList<>();
+
+    if (options.delay) {
+      int randomDelay = new Random().nextInt(200) + 100;
+      TimeUnit.MILLISECONDS.sleep(randomDelay);
+      outputs.add("DELAY " + randomDelay + " MILLISECONDS");
+    }
+
+    if (options.writeUUID) {
+      outputs.add("UUID " + workerUuid.toString());
+    }
+
+    if (options.writeCounter) {
+      outputs.add("COUNTER " + counterOutput);
+    }
+
+    String residueStr = Joiner.on(' ').join(parser.getResidue());
+    if (options.uppercase) {
+      residueStr = residueStr.toUpperCase();
+    }
+    outputs.add(residueStr);
+
+    if (options.printInputs) {
+      for (Map.Entry<String, String> input : inputs.entrySet()) {
+        outputs.add("INPUT " + input.getKey() + " " + input.getValue());
+      }
+    }
+
+    if (options.printEnv) {
+      for (Map.Entry<String, String> entry : System.getenv().entrySet()) {
+        outputs.add(entry.getKey() + "=" + entry.getValue());
+      }
+    }
+
+    String outputStr = Joiner.on('\n').join(outputs);
+    if (options.outputFile.isEmpty()) {
+      System.out.println(outputStr);
+    } else {
+      try (PrintStream outputFile = new PrintStream(options.outputFile)) {
+        outputFile.println(outputStr);
+      }
+    }
+  }
+}
diff --git a/src/test/java/com/google/devtools/build/lib/worker/ExampleWorkerMultiplexerOptions.java b/src/test/java/com/google/devtools/build/lib/worker/ExampleWorkerMultiplexerOptions.java
new file mode 100644
index 00000000000000..261d1142e2fc22
--- /dev/null
+++ b/src/test/java/com/google/devtools/build/lib/worker/ExampleWorkerMultiplexerOptions.java
@@ -0,0 +1,130 @@
+// Copyright 2015 The Bazel Authors. All rights reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.google.devtools.build.lib.worker; + +import com.google.devtools.common.options.Option; +import com.google.devtools.common.options.OptionDocumentationCategory; +import com.google.devtools.common.options.OptionEffectTag; +import com.google.devtools.common.options.OptionsBase; + +/** + * Options for the example worker itself. + */ +public class ExampleWorkerMultiplexerOptions extends OptionsBase { + + /** + * Options for the example worker concerning single units of work. + */ + public static class ExampleWorkMultiplexerOptions extends OptionsBase { + @Option( + name = "output_file", + documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, + effectTags = {OptionEffectTag.NO_OP}, + defaultValue = "", + help = "Write the output to a file instead of stdout." + ) + public String outputFile; + + @Option( + name = "uppercase", + documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, + effectTags = {OptionEffectTag.NO_OP}, + defaultValue = "false", + help = "Uppercase the input." + ) + public boolean uppercase; + + @Option( + name = "write_uuid", + documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, + effectTags = {OptionEffectTag.NO_OP}, + defaultValue = "false", + help = "Writes a UUID into the output." + ) + public boolean writeUUID; + + @Option( + name = "write_counter", + documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, + effectTags = {OptionEffectTag.NO_OP}, + defaultValue = "false", + help = "Writes a counter that increases with each work unit processed into the output." 
+ ) + public boolean writeCounter; + + @Option( + name = "print_inputs", + documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, + effectTags = {OptionEffectTag.NO_OP}, + defaultValue = "false", + help = "Writes a list of input files and their digests." + ) + public boolean printInputs; + + @Option( + name = "print_env", + documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, + effectTags = {OptionEffectTag.NO_OP}, + defaultValue = "false", + help = "Prints a list of all environment variables." + ) + public boolean printEnv; + + @Option( + name = "delay", + documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, + effectTags = {OptionEffectTag.NO_OP}, + defaultValue = "false", + help = "Randomly delay the worker response (between 100 to 300 ms)." + ) + public boolean delay; + } + + @Option( + name = "persistent_worker", + documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, + effectTags = {OptionEffectTag.NO_OP}, + defaultValue = "false" + ) + public boolean persistentWorker; + + @Option( + name = "exit_after", + documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, + effectTags = {OptionEffectTag.NO_OP}, + defaultValue = "0", + help = "The worker exits after processing this many work units (default: disabled)." + ) + public int exitAfter; + + @Option( + name = "poison_after", + documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, + effectTags = {OptionEffectTag.NO_OP}, + defaultValue = "0", + help = + "Poisons the worker after processing this many work units, so that it returns a " + + "corrupt response instead of a response protobuf from then on (default: disabled)." + ) + public int poisonAfter; + + @Option( + name = "hard_poison", + documentationCategory = OptionDocumentationCategory.UNCATEGORIZED, + effectTags = {OptionEffectTag.NO_OP}, + defaultValue = "false", + help = "Instead of writing an error message to stdout, write it to stderr and terminate." 
+ ) + public boolean hardPoison; +} diff --git a/src/test/java/com/google/devtools/build/lib/worker/WorkerFactoryTest.java b/src/test/java/com/google/devtools/build/lib/worker/WorkerFactoryTest.java index 0552cb8c5ac2d6..9f45d2cd6e73f4 100644 --- a/src/test/java/com/google/devtools/build/lib/worker/WorkerFactoryTest.java +++ b/src/test/java/com/google/devtools/build/lib/worker/WorkerFactoryTest.java @@ -42,15 +42,66 @@ public void sandboxedWorkerPathEndsWithWorkspaceName() throws Exception { WorkerFactory workerFactory = new WorkerFactory(new WorkerOptions(), workerBaseDir); WorkerKey workerKey = new WorkerKey( - ImmutableList.of(), - ImmutableMap.of(), - fs.getPath("/outputbase/execroot/workspace"), - "dummy", - HashCode.fromInt(0), - ImmutableSortedMap.of(), - true); + /* args= */ ImmutableList.of(), + /* env= */ ImmutableMap.of(), + /* execRoot= */ fs.getPath("/outputbase/execroot/workspace"), + /* mnemonic= */ "dummy", + /* workerFilesCombinedHash= */ HashCode.fromInt(0), + /* workerFilesWithHashes= */ ImmutableSortedMap.of(), + /* mustBeSandboxed= */ true, + /* proxied= */ false); Path sandboxedWorkerPath = workerFactory.getSandboxedWorkerPath(workerKey, 1); assertThat(sandboxedWorkerPath.getBaseName()).isEqualTo("workspace"); } + + /** + * WorkerFactory should create correct worker type based on WorkerKey. 
+ */ + @Test + public void workerCreationTypeCheck() throws Exception { + Path workerBaseDir = fs.getPath("/outputbase/bazel-workers"); + WorkerFactory workerFactory = new WorkerFactory(new WorkerOptions(), workerBaseDir); + WorkerKey sandboxedWorkerKey = + new WorkerKey( + /* args= */ ImmutableList.of(), + /* env= */ ImmutableMap.of(), + /* execRoot= */ fs.getPath("/outputbase/execroot/workspace"), + /* mnemonic= */ "dummy", + /* workerFilesCombinedHash= */ HashCode.fromInt(0), + /* workerFilesWithHashes= */ ImmutableSortedMap.of(), + /* mustBeSandboxed= */ true, + /* proxied= */ false); + Worker sandboxedWorker = workerFactory.create(sandboxedWorkerKey); + assertThat(sandboxedWorker.getClass()).isEqualTo(SandboxedWorker.class); + + WorkerKey nonProxiedWorkerKey = + new WorkerKey( + /* args= */ ImmutableList.of(), + /* env= */ ImmutableMap.of(), + /* execRoot= */ fs.getPath("/outputbase/execroot/workspace"), + /* mnemonic= */ "dummy", + /* workerFilesCombinedHash= */ HashCode.fromInt(0), + /* workerFilesWithHashes= */ ImmutableSortedMap.of(), + /* mustBeSandboxed= */ false, + /* proxied= */ false); + Worker nonProxiedWorker = workerFactory.create(nonProxiedWorkerKey); + assertThat(nonProxiedWorker.getClass()).isEqualTo(Worker.class); + + WorkerKey proxiedWorkerKey = + new WorkerKey( + /* args= */ ImmutableList.of(), + /* env= */ ImmutableMap.of(), + /* execRoot= */ fs.getPath("/outputbase/execroot/workspace"), + /* mnemonic= */ "dummy", + /* workerFilesCombinedHash= */ HashCode.fromInt(0), + /* workerFilesWithHashes= */ ImmutableSortedMap.of(), + /* mustBeSandboxed= */ false, + /* proxied= */ true); + Worker proxiedWorker = workerFactory.create(proxiedWorkerKey); + // If proxied = true, WorkerProxy is created along with a WorkerMultiplexer. + // Destroy WorkerMultiplexer to avoid unexpected behavior in WorkerMultiplexerManagerTest. 
+ WorkerMultiplexerManager.removeInstance(proxiedWorkerKey.hashCode()); + assertThat(proxiedWorker.getClass()).isEqualTo(WorkerProxy.class); + } } diff --git a/src/test/java/com/google/devtools/build/lib/worker/WorkerKeyTest.java b/src/test/java/com/google/devtools/build/lib/worker/WorkerKeyTest.java new file mode 100644 index 00000000000000..5cd240ec5b3e41 --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/worker/WorkerKeyTest.java @@ -0,0 +1,54 @@ +// Copyright 2019 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.google.devtools.build.lib.worker; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSortedMap; +import com.google.common.hash.HashCode; +import com.google.devtools.build.lib.vfs.FileSystem; +import com.google.devtools.build.lib.vfs.Path; +import com.google.devtools.build.lib.vfs.inmemoryfs.InMemoryFileSystem; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link WorkerKey}. 
*/ +@RunWith(JUnit4.class) +public class WorkerKeyTest { + final FileSystem fs = new InMemoryFileSystem(); + + Path workerBaseDir = fs.getPath("/outputbase/bazel-workers"); + WorkerKey workerKey = + new WorkerKey( + /* args= */ ImmutableList.of("arg1", "arg2", "arg3"), + /* env= */ ImmutableMap.of("env1", "foo", "env2", "bar"), + /* execRoot= */ fs.getPath("/outputbase/execroot/workspace"), + /* mnemonic= */ "dummy", + /* workerFilesCombinedHash= */ HashCode.fromInt(0), + /* workerFilesWithHashes= */ ImmutableSortedMap.of(), + /* mustBeSandboxed= */ true, + /* proxied= */ true); + + @Test + public void testWorkerKeyGetter() { + assertThat(workerKey.mustBeSandboxed()).isEqualTo(true); + assertThat(workerKey.getProxied()).isEqualTo(true); + // Hash code contains args, env, execRoot, and mnemonic. + assertThat(workerKey.hashCode()).isEqualTo(322455166); + } +} diff --git a/src/test/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManagerTest.java b/src/test/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManagerTest.java new file mode 100644 index 00000000000000..06967311128b07 --- /dev/null +++ b/src/test/java/com/google/devtools/build/lib/worker/WorkerMultiplexerManagerTest.java @@ -0,0 +1,74 @@ +// Copyright 2019 The Bazel Authors. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package com.google.devtools.build.lib.worker; + +import static com.google.common.truth.Truth.assertThat; +import static com.google.devtools.build.lib.testutil.MoreAsserts.assertThrows; + +import com.google.devtools.build.lib.actions.UserExecException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link WorkerMultiplexerManager}. */ +@RunWith(JUnit4.class) +public class WorkerMultiplexerManagerTest { + + @Test + public void instanceCreationRemovalTest() throws Exception { + // Create a WorkerProxy hash and request for a WorkerMultiplexer. + Integer worker1Hash = "worker1".hashCode(); + WorkerMultiplexer wm1 = WorkerMultiplexerManager.getInstance(worker1Hash); + + assertThat(WorkerMultiplexerManager.getMultiplexer(worker1Hash)).isEqualTo(wm1); + assertThat(WorkerMultiplexerManager.getRefCount(worker1Hash)).isEqualTo(1); + assertThat(WorkerMultiplexerManager.getInstanceCount()).isEqualTo(1); + + // Create another WorkerProxy hash and request for a WorkerMultiplexer. + Integer worker2Hash = "worker2".hashCode(); + WorkerMultiplexer wm2 = WorkerMultiplexerManager.getInstance(worker2Hash); + + assertThat(WorkerMultiplexerManager.getMultiplexer(worker2Hash)).isEqualTo(wm2); + assertThat(WorkerMultiplexerManager.getRefCount(worker2Hash)).isEqualTo(1); + assertThat(WorkerMultiplexerManager.getInstanceCount()).isEqualTo(2); + + // Use the same WorkerProxy hash, it shouldn't instantiate a new WorkerMultiplexer. + WorkerMultiplexer wm2Annex = WorkerMultiplexerManager.getInstance(worker2Hash); + + assertThat(wm2).isEqualTo(wm2Annex); + assertThat(WorkerMultiplexerManager.getRefCount(worker2Hash)).isEqualTo(2); + assertThat(WorkerMultiplexerManager.getInstanceCount()).isEqualTo(2); + + // Remove an instance. If reference count is larger than 0, instance shouldn't be destroyed. 
+ WorkerMultiplexerManager.removeInstance(worker2Hash); + + assertThat(WorkerMultiplexerManager.getRefCount(worker2Hash)).isEqualTo(1); + assertThat(WorkerMultiplexerManager.getInstanceCount()).isEqualTo(2); + + // Remove an instance. Reference count is down to 0, instance should be destroyed. + WorkerMultiplexerManager.removeInstance(worker2Hash); + + assertThrows(UserExecException.class, () -> WorkerMultiplexerManager.getMultiplexer(worker2Hash)); + assertThat(WorkerMultiplexerManager.getInstanceCount()).isEqualTo(1); + + // WorkerProxy hash not found. + assertThrows(UserExecException.class, () -> WorkerMultiplexerManager.removeInstance(worker2Hash)); + + // Remove all the instances. + WorkerMultiplexerManager.removeInstance(worker1Hash); + + assertThat(WorkerMultiplexerManager.getInstanceCount()).isEqualTo(0); + } +} diff --git a/src/test/shell/integration/BUILD b/src/test/shell/integration/BUILD index d6d640fb858430..4d668e9af0d352 100644 --- a/src/test/shell/integration/BUILD +++ b/src/test/shell/integration/BUILD @@ -446,6 +446,24 @@ sh_test( ], ) +sh_test( + name = "bazel_worker_multiplexer_test", + size = "large", + srcs = ["bazel_worker_multiplexer_test.sh"], + args = [ + "--worker_sandboxing=no", + "non-sandboxed", + ], + data = [ + ":test-deps", + "//src/test/java/com/google/devtools/build/lib:ExampleWorkerMultiplexer_deploy.jar", + ], + shard_count = 3, + tags = [ + "no_windows", + ], +) + sh_test( name = "bazel_sandboxed_worker_test", size = "large", diff --git a/src/test/shell/integration/bazel_worker_multiplexer_test.sh b/src/test/shell/integration/bazel_worker_multiplexer_test.sh new file mode 100755 index 00000000000000..e6cb1b8ee1ee80 --- /dev/null +++ b/src/test/shell/integration/bazel_worker_multiplexer_test.sh @@ -0,0 +1,548 @@ +#!/bin/bash +# +# Copyright 2015 The Bazel Authors. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Test rules provided in Bazel not tested by examples +# + +set -u +ADDITIONAL_BUILD_FLAGS=$1 +WORKER_TYPE_LOG_STRING=$2 +shift 2 + +# Load the test setup defined in the parent directory +CURRENT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +source "${CURRENT_DIR}/../integration_test_setup.sh" \ + || { echo "integration_test_setup.sh not found!" >&2; exit 1; } + +# TODO(philwo): Change this so the path to the custom worker gets passed in as an argument to the +# test, once the bug that makes using the "args" attribute with sh_tests in Bazel impossible is +# fixed. +example_worker=$(find $BAZEL_RUNFILES -name ExampleWorkerMultiplexer_deploy.jar) + +add_to_bazelrc "build -s" +add_to_bazelrc "build --spawn_strategy=worker,standalone" +add_to_bazelrc "build --worker_verbose --worker_max_instances=3" +add_to_bazelrc "build --debug_print_action_contexts" +add_to_bazelrc "build ${ADDITIONAL_BUILD_FLAGS}" + +function set_up() { + # Run each test in a separate folder so that their output files don't get cached. + WORKSPACE_SUBDIR=$(basename $(mktemp -d ${WORKSPACE_DIR}/testXXXXXX)) + cd ${WORKSPACE_SUBDIR} + BINS=$(bazel info $PRODUCT_NAME-bin)/${WORKSPACE_SUBDIR} + + # This causes Bazel to shut down all running workers. 
+ bazel build --worker_quit_after_build &> $TEST_log \ + || fail "'bazel build --worker_quit_after_build' during test set_up failed" +} + +function prepare_example_worker() { + cp ${example_worker} worker_lib.jar + chmod +w worker_lib.jar + echo "exampledata" > worker_data.txt + + mkdir worker_data_dir + echo "veryexample" > worker_data_dir/more_data.txt + + cat >work.bzl <<'EOF' +def _impl(ctx): + worker = ctx.executable.worker + output = ctx.outputs.out + + argfile_inputs = [] + argfile_arguments = [] + if ctx.attr.multiflagfiles: + # Generate one flagfile per command-line arg, alternate between @ and --flagfile= style. + # This is used to test the code that handles multiple flagfiles and the --flagfile= style. + idx = 1 + for arg in ["--output_file=" + output.path] + ctx.attr.args: + argfile = ctx.actions.declare_file("%s_worker_input_%s" % (ctx.label.name, idx)) + ctx.actions.write(output=argfile, content=arg) + argfile_inputs.append(argfile) + flagfile_prefix = "@" if (idx % 2 == 0) else "--flagfile=" + argfile_arguments.append(flagfile_prefix + argfile.path) + idx += 1 + else: + # Generate the "@"-file containing the command-line args for the unit of work. 
+ argfile = ctx.actions.declare_file("%s_worker_input" % ctx.label.name) + argfile_contents = "\n".join(["--output_file=" + output.path] + ctx.attr.args) + ctx.actions.write(output=argfile, content=argfile_contents) + argfile_inputs.append(argfile) + argfile_arguments.append("@" + argfile.path) + + ctx.actions.run( + inputs=argfile_inputs + ctx.files.srcs, + outputs=[output], + executable=worker, + progress_message="Working on %s" % ctx.label.name, + mnemonic="Work", + execution_requirements={"supports-multiplex-workers": "1"}, + arguments=ctx.attr.worker_args + argfile_arguments, + ) + +work = rule( + implementation=_impl, + attrs={ + "worker": attr.label(cfg="host", mandatory=True, allow_files=True, executable=True), + "worker_args": attr.string_list(), + "args": attr.string_list(), + "srcs": attr.label_list(allow_files=True), + "multiflagfiles": attr.bool(default=False), + }, + outputs = {"out": "%{name}.out"}, +) +EOF + cat >BUILD <>BUILD < $TEST_log \ + || fail "build failed" + assert_equals "hello world" "$(cat $BINS/hello_world.out)" + + bazel build :hello_world_uppercase &> $TEST_log \ + || fail "build failed" + assert_equals "HELLO WORLD" "$(cat $BINS/hello_world_uppercase.out)" +} + +function test_multiple_flagfiles() { + prepare_example_worker + cat >>BUILD < $TEST_log \ + || fail "build failed" + assert_equals "hello world nice to meet you" "$(cat $BINS/multi_hello_world.out)" +} + +function test_workers_quit_after_build() { + prepare_example_worker + cat >>BUILD <<'EOF' +[work( + name = "hello_world_%s" % idx, + worker = ":worker", + args = ["--write_counter"], +) for idx in range(10)] +EOF + + bazel build --worker_quit_after_build :hello_world_1 &> $TEST_log \ + || fail "build failed" + work_count=$(cat $BINS/hello_world_1.out | grep COUNTER | cut -d' ' -f2) + assert_equals "1" $work_count + + bazel build --worker_quit_after_build :hello_world_2 &> $TEST_log \ + || fail "build failed" + work_count=$(cat $BINS/hello_world_2.out | grep COUNTER | cut -d' 
' -f2) + # If the worker hadn't quit as we told it, it would have been reused, causing this to be a "2". + assert_equals "1" $work_count +} + +function test_build_fails_when_worker_exits() { + prepare_example_worker + cat >>BUILD <<'EOF' +[work( + name = "hello_world_%s" % idx, + worker = ":worker", + worker_args = ["--exit_after=1"], + args = ["--write_uuid", "--write_counter"], +) for idx in range(10)] +EOF + + bazel build :hello_world_1 &> $TEST_log \ + || fail "build failed" + + bazel build :hello_world_2 &> $TEST_log \ + && fail "expected build to failed" || true + + expect_log "Worker process quit or closed its stdin stream when we tried to send a WorkRequest" +} + +function test_worker_restarts_when_worker_binary_changes() { + prepare_example_worker + cat >>BUILD <<'EOF' +[work( + name = "hello_world_%s" % idx, + worker = ":worker", + args = ["--write_uuid", "--write_counter"], +) for idx in range(10)] +EOF + + echo "First run" >> $TEST_log + bazel build :hello_world_1 &> $TEST_log \ + || fail "build failed" + worker_uuid_1=$(cat $BINS/hello_world_1.out | grep UUID | cut -d' ' -f2) + work_count=$(cat $BINS/hello_world_1.out | grep COUNTER | cut -d' ' -f2) + assert_equals "1" $work_count + + echo "Second run" >> $TEST_log + bazel build :hello_world_2 &> $TEST_log \ + || fail "build failed" + worker_uuid_2=$(cat $BINS/hello_world_2.out | grep UUID | cut -d' ' -f2) + work_count=$(cat $BINS/hello_world_2.out | grep COUNTER | cut -d' ' -f2) + assert_equals "2" $work_count + + # Check that the same worker was used twice. + assert_equals "$worker_uuid_1" "$worker_uuid_2" + + # Modify the example worker jar to trigger a rebuild of the worker. 
+ tr -cd '[:alnum:]' < /dev/urandom | head -c32 > dummy_file + zip worker_lib.jar dummy_file + rm dummy_file + + bazel build :hello_world_3 &> $TEST_log \ + || fail "build failed" + worker_uuid_3=$(cat $BINS/hello_world_3.out | grep UUID | cut -d' ' -f2) + work_count=$(cat $BINS/hello_world_3.out | grep COUNTER | cut -d' ' -f2) + assert_equals "1" $work_count + + expect_log "worker .* can no longer be used, because its files have changed on disk" + expect_log "worker_lib.jar: .* -> .*" + + # Check that we used a new worker. + assert_not_equals "$worker_uuid_2" "$worker_uuid_3" +} + +function test_worker_restarts_when_worker_runfiles_change() { + prepare_example_worker + cat >>BUILD <<'EOF' +[work( + name = "hello_world_%s" % idx, + worker = ":worker", + args = ["--write_uuid", "--write_counter"], +) for idx in range(10)] +EOF + + bazel build :hello_world_1 &> $TEST_log \ + || fail "build failed" + worker_uuid_1=$(cat $BINS/hello_world_1.out | grep UUID | cut -d' ' -f2) + work_count=$(cat $BINS/hello_world_1.out | grep COUNTER | cut -d' ' -f2) + assert_equals "1" $work_count + + bazel build :hello_world_2 &> $TEST_log \ + || fail "build failed" + worker_uuid_2=$(cat $BINS/hello_world_2.out | grep UUID | cut -d' ' -f2) + work_count=$(cat $BINS/hello_world_2.out | grep COUNTER | cut -d' ' -f2) + assert_equals "2" $work_count + + # Check that the same worker was used twice. + assert_equals "$worker_uuid_1" "$worker_uuid_2" + + # "worker_data.txt" is included in the "data" attribute of the example worker. + echo "changeddata" > worker_data.txt + + bazel build :hello_world_3 &> $TEST_log \ + || fail "build failed" + worker_uuid_3=$(cat $BINS/hello_world_3.out | grep UUID | cut -d' ' -f2) + work_count=$(cat $BINS/hello_world_3.out | grep COUNTER | cut -d' ' -f2) + assert_equals "1" $work_count + + expect_log "worker .* can no longer be used, because its files have changed on disk" + expect_log "worker_data.txt: .* -> .*" + + # Check that we used a new worker. 
+ assert_not_equals "$worker_uuid_2" "$worker_uuid_3" +} + +# When a worker does not conform to the protocol and returns a response that is not a parseable +# protobuf, it must be killed and a helpful error message should be printed. +function test_build_fails_when_worker_returns_junk() { + prepare_example_worker + cat >>BUILD <<'EOF' +[work( + name = "hello_world_%s" % idx, + worker = ":worker", + worker_args = ["--poison_after=1"], + args = ["--write_uuid", "--write_counter"], +) for idx in range(10)] +EOF + + bazel build :hello_world_1 &> $TEST_log \ + || fail "build failed" + + # A failing worker should cause the build to fail. + bazel build :hello_world_2 &> $TEST_log \ + && fail "expected build to fail" || true + + # Check that a helpful error message was printed. + expect_log "Worker process returned an unparseable WorkResponse!" + expect_log "Did you try to print something to stdout" + expect_log "I'm a poisoned worker and this is not a protobuf." +} + +function test_input_digests() { + prepare_example_worker + cat >>BUILD <<'EOF' +[work( + name = "hello_world_%s" % idx, + worker = ":worker", + args = ["--write_uuid", "--print_inputs"], + srcs = [":input.txt"], +) for idx in range(10)] +EOF + + echo "hello world" > input.txt + bazel build :hello_world_1 &> $TEST_log \ + || fail "build failed" + worker_uuid_1=$(cat $BINS/hello_world_1.out | grep UUID | cut -d' ' -f2) + hash1=$(egrep "INPUT .*/input.txt " $BINS/hello_world_1.out | cut -d' ' -f3) + + bazel build :hello_world_2 >> $TEST_log 2>&1 \ + || fail "build failed" + worker_uuid_2=$(cat $BINS/hello_world_2.out | grep UUID | cut -d' ' -f2) + hash2=$(egrep "INPUT .*/input.txt " $BINS/hello_world_2.out | cut -d' ' -f3) + + assert_equals "$worker_uuid_1" "$worker_uuid_2" + assert_equals "$hash1" "$hash2" + + echo "changeddata" > input.txt + + bazel build :hello_world_3 >> $TEST_log 2>&1 \ + || fail "build failed" + worker_uuid_3=$(cat $BINS/hello_world_3.out | grep UUID | cut -d' ' -f2) + hash3=$(egrep 
"INPUT .*/input.txt " $BINS/hello_world_3.out | cut -d' ' -f3) + + assert_equals "$worker_uuid_2" "$worker_uuid_3" + assert_not_equals "$hash2" "$hash3" +} + +function test_worker_verbose() { + prepare_example_worker + cat >>BUILD <<'EOF' +[work( + name = "hello_world_%s" % idx, + worker = ":worker", + args = ["--write_uuid", "--write_counter"], +) for idx in range(10)] +EOF + + bazel build --worker_quit_after_build :hello_world_1 &> $TEST_log \ + || fail "build failed" + expect_log "Created new ${WORKER_TYPE_LOG_STRING} Work worker (id [0-9]\+)" + expect_log "Destroying Work worker (id [0-9]\+)" + expect_log "Build completed, shutting down worker pool..." +} + +function test_logs_are_deleted_on_server_restart() { + prepare_example_worker + cat >>BUILD <<'EOF' +[work( + name = "hello_world_%s" % idx, + worker = ":worker", + args = ["--write_uuid", "--write_counter"], +) for idx in range(10)] +EOF + + bazel build --worker_quit_after_build :hello_world_1 &> $TEST_log \ + || fail "build failed" + + expect_log "Created new ${WORKER_TYPE_LOG_STRING} Work worker (id [0-9]\+)" + + worker_log=$(egrep -o -- 'logging to .*/b(azel|laze)-workers/worker-[0-9]-Work.log' "$TEST_log" | sed 's/^logging to //') + + [ -e "$worker_log" ] \ + || fail "Worker log was not found" + + # Running a build after a server shutdown should trigger the removal of old worker log files. + bazel shutdown &> $TEST_log + bazel build &> $TEST_log + + [ ! 
-e "$worker_log" ] \ + || fail "Worker log was not deleted" +} + +function test_missing_execution_requirements_fallback_to_standalone() { + prepare_example_worker + cat >>BUILD <<'EOF' +work( + name = "hello_world", + worker = ":worker", + args = ["--write_uuid", "--write_counter"], +) +EOF + + sed -i.bak '/execution_requirements/d' work.bzl + rm -f work.bzl.bak + + bazel build --worker_quit_after_build :hello_world &> $TEST_log \ + || fail "build failed" + + expect_not_log "Created new ${WORKER_TYPE_LOG_STRING} Work worker (id [0-9]\+)" + expect_not_log "Destroying Work worker (id [0-9]\+)" + + # WorkerSpawnStrategy falls back to standalone strategy, so we still expect the output to be generated. + [ -e "$BINS/hello_world.out" ] \ + || fail "Worker did not produce output" +} + +function test_environment_is_clean() { + prepare_example_worker + cat >>BUILD <<'EOF' +work( + name = "hello_world", + worker = ":worker", + args = ["--print_env"], +) +EOF + + bazel shutdown &> $TEST_log \ + || fail "shutdown failed" + CAKE=LIE bazel build --worker_quit_after_build :hello_world &> $TEST_log \ + || fail "build failed" + + fgrep CAKE=LIE $BINS/hello_world.out \ + && fail "environment variable leaked into worker env" || true +} + +function test_workers_quit_on_clean() { + prepare_example_worker + cat >>BUILD < $TEST_log \ + || fail "build failed" + assert_equals "hello clean" "$(cat $BINS/hello_clean.out)" + expect_log "Created new ${WORKER_TYPE_LOG_STRING} Work worker (id [0-9]\+)" + + bazel clean &> $TEST_log \ + || fail "clean failed" + expect_log "Clean command is running, shutting down worker pool..." 
+ expect_log "Destroying Work worker (id [0-9]\+)" +} + +function test_crashed_worker_causes_log_dump() { + prepare_example_worker + cat >>BUILD <<'EOF' +[work( + name = "hello_world_%s" % idx, + worker = ":worker", + worker_args = ["--poison_after=1", "--hard_poison"], + args = ["--write_uuid", "--write_counter"], +) for idx in range(10)] +EOF + + bazel build :hello_world_1 &> $TEST_log \ + || fail "build failed" + + bazel build :hello_world_2 &> $TEST_log \ + && fail "expected build to fail" || true + + expect_log "^---8<---8<--- Start of log, file at /" + expect_log "Worker process did not return a WorkResponse:" + expect_log "I'm a very poisoned worker and will just crash." + expect_log "^---8<---8<--- End of log ---8<---8<---" +} + +function test_multiple_target_without_delay() { + prepare_example_worker + cat >>BUILD < $TEST_log \ + || fail "build failed" + assert_equals "hello world 1" "$(cat $BINS/hello_world_1.out)" + assert_equals "hello world 2" "$(cat $BINS/hello_world_2.out)" + assert_equals "hello world 3" "$(cat $BINS/hello_world_3.out)" +} + +# We just need to test the build completion, no assertion is needed. +function test_multiple_target_with_delay() { + prepare_example_worker + cat >>BUILD < $TEST_log \ + || fail "build failed" +} +run_suite "Worker multiplexer integration tests"