Skip to content

Commit

Permalink
Add an async interface to SpawnAction
Browse files Browse the repository at this point in the history
Unfortunately, we can't use plain ListenableFuture until all the
implementations have to be updated. Therefore, introduce an intermediate
FutureSpawn abstraction.

Also update the strategy classes to call the new interface, as well as a
bunch of tests.

Only a single Google-internal implementation of SpawnRunner implements
the new interface as of now. Subsequent changes will change the local
and sandboxed spawn runners, as well as the remote spawn runner.

This is part of a larger effort to make action execution asynchronous.
We believe that this will have significant benefits especially for
remote execution: we currently block Skyframe threads waiting for remote
execution to complete, which means we need to increase --jobs to a large
value, which is a significant source of local thread contention.
Additionally, this is at least contributing to the problem of slow
action discovery - the Bazel CLI increases the final action count during
execution as its discovering additional actions. When this effort
completes, actions no longer block Skyframe threads, and those are
therefore free to continue action discovery.

PiperOrigin-RevId: 222606352
  • Loading branch information
ulfjack authored and Copybara-Service committed Nov 23, 2018
1 parent 21827e2 commit 57f2f58
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 31 deletions.
128 changes: 128 additions & 0 deletions src/main/java/com/google/devtools/build/lib/actions/FutureSpawn.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright 2018 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.actions;

import com.google.common.base.Throwables;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;

/**
* An equivalent of <code>ListenableFuture&lt;SpawnResult&gt;</code>.
*
* <p>This is a temporary wrapper for ListenableFuture to be used during the migration of all {@link
* com.google.devtools.build.lib.exec.SpawnRunner} implementations to async execution. The reason
* for moving to async execution is that it avoids blocking Skyframe threads for expensive
* operations (waiting for subprocesses, possibly running remotely). Especially for remote
* execution, this significantly improves scalability while reducing local thread contention due to
* the high number of threads currently used to drive it.
*
* <p>We cannot use ListenableFuture as is as long as not all implementations have been migrated to
* async execution. The reason for this is that we need to control which code runs in which thread
* pool, and Skyframe currently does not expose the underlying thread pool; intentionally so - all
* Skyframe operations have to run with a Skyframe environment so that Skyframe can track and cache
* it.
*
* <p>As long as some implementations still block on evaluating the future, this blocking must not
* happen in the implementations' thread pool. Especially for remote execution, the thread pool is
* also responsible for network operations, and blocking threads would negate the desired benefits.
*
* <p>Once all implementations are async, we can use Futures.transform - stealing a little bit of
* CPU from another thread pool is ok as long as it's non-blocking.
*/
public class FutureSpawn {
public static FutureSpawn immediate(SpawnResult f) {
return new FutureSpawn(Futures.immediateFuture(f));
}

private final ListenableFuture<? extends SpawnResult> future;
private final Wrapper wrapper;

public FutureSpawn(ListenableFuture<? extends SpawnResult> future) {
this(future, (c) -> c.get());
}

private FutureSpawn(ListenableFuture<? extends SpawnResult> future, Wrapper wrapper) {
this.future = future;
this.wrapper = wrapper;
}

/**
* Returns the underlying future. This is only intended to be used for getting notified about
* completion, and should not be used to access the {@link SpawnResult} directly, which should be
* obtained from {@link #get} instead.
*/
public ListenableFuture<? extends SpawnResult> getFuture() {
return future;
}

/**
* Blocks the current thread until completion of the underlying future, and calls the wrappers set
* on this future in the order in which they were set.
*/
public SpawnResult get() throws ExecException, InterruptedException {
return wrapper.apply(
() -> {
try {
return future.get();
} catch (ExecutionException e) {
Throwables.propagateIfPossible(
e.getCause(), ExecException.class, InterruptedException.class);
throw new RuntimeException(e);
} catch (CancellationException e) {
throw new InterruptedException(e.getMessage());
} catch (InterruptedException e) {
future.cancel(/*mayInterruptIfRunning*/ true);
throw e;
}
});
}

/**
* Wraps the evaluation within this future with the given wrapper. This is similar to {@link
* com.google.common.util.concurrent.Futures#lazyTransform} in that the wrapper is executed every
* time get() is called. However, it ensures that the wrapper code isexecuted in the Skyframe
* thread pool.
*/
public FutureSpawn wrap(Wrapper wrapper) {
Wrapper previousWrapper = this.wrapper;
// Closure chaining magic: we create a new FutureSpawn with the same ListenableFuture, but with
// a wrapper that first calls the previous wrapper and then the new wrapper.
return new FutureSpawn(future, (c) -> wrapper.apply(() -> previousWrapper.apply(c)));
}

/**
* A {@link java.util.concurrent.Callable} equivalent that declares certain exceptions we need for
* spawn runners.
*/
@FunctionalInterface
public interface Callable<T> {
T get() throws ExecException, InterruptedException;
}

/**
* A {@link java.util.function.Function} equivalent that declares certain exceptions we need for
* spawn runners.
*/
@FunctionalInterface
public interface Wrapper {
/**
* This is passed the future or a wrapped future. An implementation is expected to do any
* desired preprocessing, then call the future, then perform any desired post-processing. Note
* that this scheme allows catching exceptions from lower layers.
*/
SpawnResult apply(Callable<SpawnResult> future) throws ExecException, InterruptedException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public List<SpawnResult> exec(
spawnResult = Preconditions.checkNotNull(cacheHandle.getResult());
} else {
// Actual execution.
spawnResult = spawnRunner.exec(spawn, context);
spawnResult = spawnRunner.execAsync(spawn, context).get();
if (cacheHandle.willStore()) {
cacheHandle.store(spawnResult);
}
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/com/google/devtools/build/lib/exec/SpawnRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.google.devtools.build.lib.actions.Artifact.ArtifactExpander;
import com.google.devtools.build.lib.actions.ArtifactPathResolver;
import com.google.devtools.build.lib.actions.ExecException;
import com.google.devtools.build.lib.actions.FutureSpawn;
import com.google.devtools.build.lib.actions.MetadataProvider;
import com.google.devtools.build.lib.actions.Spawn;
import com.google.devtools.build.lib.actions.SpawnResult;
Expand Down Expand Up @@ -200,6 +201,23 @@ SortedMap<PathFragment, ActionInput> getInputMapping(boolean expandTreeArtifacts
void report(ProgressStatus state, String name);
}

/**
* Run the given spawn asynchronously. The default implementation is synchronous for migration.
*
* @param spawn the spawn to run
* @param context the spawn execution context
* @return the result from running the spawn
* @throws InterruptedException if the calling thread was interrupted, or if the runner could not
* lock the output files (see {@link SpawnExecutionContext#lockOutputFiles()})
* @throws IOException if something went wrong reading or writing to the local file system
* @throws ExecException if the request is malformed
*/
default FutureSpawn execAsync(Spawn spawn, SpawnExecutionContext context)
throws InterruptedException, IOException, ExecException {
// TODO(ulfjack): Remove this default implementation. [exec-async]
return FutureSpawn.immediate(exec(spawn, context));
}

/**
* Run the given spawn.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.devtools.build.lib.actions.ActionExecutionContext;
import com.google.devtools.build.lib.actions.Artifact;
import com.google.devtools.build.lib.actions.ArtifactRoot;
import com.google.devtools.build.lib.actions.FutureSpawn;
import com.google.devtools.build.lib.actions.MetadataProvider;
import com.google.devtools.build.lib.actions.Spawn;
import com.google.devtools.build.lib.actions.SpawnResult;
Expand Down Expand Up @@ -86,16 +87,16 @@ public void testZeroExit() throws Exception {
when(actionExecutionContext.getExecRoot()).thenReturn(execRoot);
SpawnResult spawnResult =
new SpawnResult.Builder().setStatus(Status.SUCCESS).setRunnerName("test").build();
when(spawnRunner.exec(any(Spawn.class), any(SpawnExecutionContext.class)))
.thenReturn(spawnResult);
when(spawnRunner.execAsync(any(Spawn.class), any(SpawnExecutionContext.class)))
.thenReturn(FutureSpawn.immediate(spawnResult));

List<SpawnResult> spawnResults =
new TestedSpawnStrategy(execRoot, spawnRunner).exec(SIMPLE_SPAWN, actionExecutionContext);

assertThat(spawnResults).containsExactly(spawnResult);

// Must only be called exactly once.
verify(spawnRunner).exec(any(Spawn.class), any(SpawnExecutionContext.class));
verify(spawnRunner).execAsync(any(Spawn.class), any(SpawnExecutionContext.class));
}

@Test
Expand All @@ -108,7 +109,8 @@ public void testNonZeroExit() throws Exception {
.setExitCode(1)
.setRunnerName("test")
.build();
when(spawnRunner.exec(any(Spawn.class), any(SpawnExecutionContext.class))).thenReturn(result);
when(spawnRunner.execAsync(any(Spawn.class), any(SpawnExecutionContext.class)))
.thenReturn(FutureSpawn.immediate(result));

try {
// Ignoring the List<SpawnResult> return value.
Expand All @@ -118,7 +120,7 @@ public void testNonZeroExit() throws Exception {
assertThat(e.getSpawnResult()).isSameAs(result);
}
// Must only be called exactly once.
verify(spawnRunner).exec(any(Spawn.class), any(SpawnExecutionContext.class));
verify(spawnRunner).execAsync(any(Spawn.class), any(SpawnExecutionContext.class));
}

@Test
Expand All @@ -134,7 +136,7 @@ public void testCacheHit() throws Exception {
List<SpawnResult> spawnResults =
new TestedSpawnStrategy(execRoot, spawnRunner).exec(SIMPLE_SPAWN, actionExecutionContext);
assertThat(spawnResults).containsExactly(spawnResult);
verify(spawnRunner, never()).exec(any(Spawn.class), any(SpawnExecutionContext.class));
verify(spawnRunner, never()).execAsync(any(Spawn.class), any(SpawnExecutionContext.class));
}

@SuppressWarnings("unchecked")
Expand All @@ -150,16 +152,16 @@ public void testCacheMiss() throws Exception {
when(actionExecutionContext.getExecRoot()).thenReturn(execRoot);
SpawnResult spawnResult =
new SpawnResult.Builder().setStatus(Status.SUCCESS).setRunnerName("test").build();
when(spawnRunner.exec(any(Spawn.class), any(SpawnExecutionContext.class)))
.thenReturn(spawnResult);
when(spawnRunner.execAsync(any(Spawn.class), any(SpawnExecutionContext.class)))
.thenReturn(FutureSpawn.immediate(spawnResult));

List<SpawnResult> spawnResults =
new TestedSpawnStrategy(execRoot, spawnRunner).exec(SIMPLE_SPAWN, actionExecutionContext);

assertThat(spawnResults).containsExactly(spawnResult);

// Must only be called exactly once.
verify(spawnRunner).exec(any(Spawn.class), any(SpawnExecutionContext.class));
verify(spawnRunner).execAsync(any(Spawn.class), any(SpawnExecutionContext.class));
verify(entry).store(eq(spawnResult));
}

Expand All @@ -180,7 +182,8 @@ public void testCacheMissWithNonZeroExit() throws Exception {
.setExitCode(1)
.setRunnerName("test")
.build();
when(spawnRunner.exec(any(Spawn.class), any(SpawnExecutionContext.class))).thenReturn(result);
when(spawnRunner.execAsync(any(Spawn.class), any(SpawnExecutionContext.class)))
.thenReturn(FutureSpawn.immediate(result));

try {
// Ignoring the List<SpawnResult> return value.
Expand All @@ -190,7 +193,7 @@ public void testCacheMissWithNonZeroExit() throws Exception {
assertThat(e.getSpawnResult()).isSameAs(result);
}
// Must only be called exactly once.
verify(spawnRunner).exec(any(Spawn.class), any(SpawnExecutionContext.class));
verify(spawnRunner).execAsync(any(Spawn.class), any(SpawnExecutionContext.class));
verify(entry).store(eq(result));
}

Expand All @@ -200,13 +203,14 @@ public void testLogSpawn() throws Exception {
when(actionExecutionContext.getExecRoot()).thenReturn(execRoot);
when(actionExecutionContext.getContext(eq(SpawnLogContext.class)))
.thenReturn(new SpawnLogContext(execRoot, messageOutput));
when(spawnRunner.exec(any(Spawn.class), any(SpawnExecutionContext.class)))
when(spawnRunner.execAsync(any(Spawn.class), any(SpawnExecutionContext.class)))
.thenReturn(
new SpawnResult.Builder()
.setStatus(Status.NON_ZERO_EXIT)
.setExitCode(23)
.setRunnerName("runner")
.build());
FutureSpawn.immediate(
new SpawnResult.Builder()
.setStatus(Status.NON_ZERO_EXIT)
.setExitCode(23)
.setRunnerName("runner")
.build()));
when(actionExecutionContext.getMetadataProvider()).thenReturn(mock(MetadataProvider.class));

Artifact input = new Artifact(scratch.file("/execroot/foo", "1"), rootDir);
Expand Down
Loading

0 comments on commit 57f2f58

Please sign in to comment.