Skip to content

Commit

Permalink
Automated rollback of commit 915fb3e.
Browse files Browse the repository at this point in the history
*** Reason for rollback ***

Might cause build to hang forever.

b/320630578

*** Original change description ***

Optimize prefetchInputs.

Use a pre-allocated array to hold the intermediate transfers to avoid allocations. Replace some of RxJava code with Futures to avoid RxJava overheads.

This improves the perfromance of prefetchInputs on a large set of inputs from ~400ms to ~16ms.

Fixes #20555.

Closes #20557.

PiperOrigin-RevId: 599135847
Change-Id: Idae6a1c57e634d16091e31e097b16ca97a67e62d
  • Loading branch information
coeuvre authored and copybara-github committed Jan 17, 2024
1 parent ee7762e commit 162cacd
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,17 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable;
import static com.google.devtools.build.lib.remote.util.RxFutures.toListenableFuture;
import static com.google.devtools.build.lib.remote.util.RxUtils.mergeBulkTransfer;
import static com.google.devtools.build.lib.remote.util.Utils.getFromFuture;
import static com.google.devtools.build.lib.remote.util.Utils.mergeBulkTransfer;

import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.actions.Action;
import com.google.devtools.build.lib.actions.ActionExecutionMetadata;
Expand All @@ -47,13 +44,17 @@
import com.google.devtools.build.lib.profiler.ProfilerTask;
import com.google.devtools.build.lib.remote.common.CacheNotFoundException;
import com.google.devtools.build.lib.remote.util.AsyncTaskCache;
import com.google.devtools.build.lib.remote.util.RxUtils;
import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult;
import com.google.devtools.build.lib.remote.util.TempPathGenerator;
import com.google.devtools.build.lib.vfs.FileSymlinkLoopException;
import com.google.devtools.build.lib.vfs.FileSystemUtils;
import com.google.devtools.build.lib.vfs.OutputPermissions;
import com.google.devtools.build.lib.vfs.Path;
import com.google.devtools.build.lib.vfs.PathFragment;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Single;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -282,10 +283,6 @@ public ListenableFuture<Void> prefetchFiles(
files.add(input);
}

if (files.isEmpty()) {
return immediateVoidFuture();
}

// Collect the set of directories whose output permissions must be set at the end of this call.
// This responsibility cannot lie with the downloading of an individual file, because multiple
// files may be concurrently downloaded into the same directory within a single call to
Expand All @@ -294,38 +291,30 @@ public ListenableFuture<Void> prefetchFiles(
// it must still synchronize on the output permissions having been set.
Set<Path> dirsWithOutputPermissions = Sets.newConcurrentHashSet();

// Using plain futures to avoid RxJava overheads.
List<ListenableFuture<Void>> transfers = new ArrayList<>(files.size());
try (var s = Profiler.instance().profile("compose prefetches")) {
for (var file : files) {
transfers.add(
prefetchFile(action, dirsWithOutputPermissions, metadataSupplier, file, priority));
}
}

ListenableFuture<Void> mergedTransfer;
try (var s = Profiler.instance().profile("mergeBulkTransfer")) {
mergedTransfer = mergeBulkTransfer(transfers);
}

return Futures.transformAsync(
mergedTransfer,
unused -> {
try {
// Set output permissions on tree artifact subdirectories, matching the behavior of
// SkyframeActionExecutor#checkOutputs for artifacts produced by local actions.
for (Path dir : dirsWithOutputPermissions) {
directoryTracker.setOutputPermissions(dir);
}
} catch (IOException e) {
return immediateFailedFuture(e);
}
return immediateVoidFuture();
},
directExecutor());
Completable prefetch =
mergeBulkTransfer(
Flowable.fromIterable(files)
.flatMapSingle(
input ->
prefetchFile(
action,
dirsWithOutputPermissions,
metadataSupplier,
input,
priority)))
.doOnComplete(
// Set output permissions on tree artifact subdirectories, matching the behavior of
// SkyframeActionExecutor#checkOutputs for artifacts produced by local actions.
() -> {
for (Path dir : dirsWithOutputPermissions) {
directoryTracker.setOutputPermissions(dir);
}
});

return toListenableFuture(prefetch);
}

private ListenableFuture<Void> prefetchFile(
private Single<TransferResult> prefetchFile(
ActionExecutionMetadata action,
Set<Path> dirsWithOutputPermissions,
MetadataSupplier metadataSupplier,
Expand All @@ -334,14 +323,14 @@ private ListenableFuture<Void> prefetchFile(
try {
if (input instanceof VirtualActionInput) {
prefetchVirtualActionInput((VirtualActionInput) input);
return immediateVoidFuture();
return Single.just(TransferResult.ok());
}

PathFragment execPath = input.getExecPath();

FileArtifactValue metadata = metadataSupplier.getMetadata(input);
if (metadata == null || !canDownloadFile(execRoot.getRelative(execPath), metadata)) {
return immediateVoidFuture();
return Single.just(TransferResult.ok());
}

@Nullable Symlink symlink = maybeGetSymlink(action, input, metadata, metadataSupplier);
Expand All @@ -368,9 +357,11 @@ private ListenableFuture<Void> prefetchFile(
result = result.andThen(plantSymlink(symlink));
}

return toListenableFuture(result);
} catch (IOException | InterruptedException e) {
return immediateFailedFuture(e);
return RxUtils.toTransferResult(result);
} catch (IOException e) {
return Single.just(TransferResult.error(e));
} catch (InterruptedException e) {
return Single.just(TransferResult.interrupted());
}
}

Expand Down
68 changes: 10 additions & 58 deletions src/main/java/com/google/devtools/build/lib/remote/util/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,8 @@
// limitations under the License.
package com.google.devtools.build.lib.remote.util;

import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Throwables.getStackTraceAsString;
import static com.google.common.util.concurrent.Futures.immediateFailedFuture;
import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.stream.Collectors.joining;

import build.bazel.remote.execution.v2.Action;
Expand All @@ -33,6 +29,7 @@
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.devtools.build.lib.actions.ActionInput;
import com.google.devtools.build.lib.actions.ExecutionRequirements;
import com.google.devtools.build.lib.actions.Spawn;
Expand Down Expand Up @@ -420,11 +417,11 @@ public static ListenableFuture<ActionResult> downloadAsActionResult(
try {
return Futures.immediateFuture(ActionResult.parseFrom(data.toByteArray()));
} catch (InvalidProtocolBufferException e) {
return immediateFailedFuture(e);
return Futures.immediateFailedFuture(e);
}
},
directExecutor())
.catching(CacheNotFoundException.class, (e) -> null, directExecutor());
MoreExecutors.directExecutor())
.catching(CacheNotFoundException.class, (e) -> null, MoreExecutors.directExecutor());
}

public static void verifyBlobContents(Digest expected, Digest actual) throws IOException {
Expand Down Expand Up @@ -486,15 +483,15 @@ public ByteString getContents() {
*/
public static <V> ListenableFuture<V> refreshIfUnauthenticatedAsync(
AsyncCallable<V> call, CallCredentialsProvider callCredentialsProvider) {
checkNotNull(call);
checkNotNull(callCredentialsProvider);
Preconditions.checkNotNull(call);
Preconditions.checkNotNull(callCredentialsProvider);

try {
return Futures.catchingAsync(
call.call(),
Throwable.class,
(e) -> refreshIfUnauthenticatedAsyncOnException(e, call, callCredentialsProvider),
directExecutor());
MoreExecutors.directExecutor());
} catch (Throwable t) {
return refreshIfUnauthenticatedAsyncOnException(t, call, callCredentialsProvider);
}
Expand All @@ -514,15 +511,15 @@ private static <V> ListenableFuture<V> refreshIfUnauthenticatedAsyncOnException(
}
}

return immediateFailedFuture(t);
return Futures.immediateFailedFuture(t);
}

/** Same as {@link #refreshIfUnauthenticatedAsync} but calling a synchronous code block. */
public static <V> V refreshIfUnauthenticated(
Callable<V> call, CallCredentialsProvider callCredentialsProvider)
throws IOException, InterruptedException {
checkNotNull(call);
checkNotNull(callCredentialsProvider);
Preconditions.checkNotNull(call);
Preconditions.checkNotNull(callCredentialsProvider);

try {
return call.call();
Expand Down Expand Up @@ -621,49 +618,4 @@ public static void waitForBulkTransfer(
throw bulkTransferException;
}
}

public static ListenableFuture<Void> mergeBulkTransfer(
Iterable<ListenableFuture<Void>> transfers) {
return Futures.whenAllComplete(transfers)
.callAsync(
() -> {
BulkTransferException bulkTransferException = null;

for (var transfer : transfers) {
IOException error = null;
try {
transfer.get();
} catch (CancellationException e) {
return immediateFailedFuture(new InterruptedException());
} catch (InterruptedException e) {
return immediateFailedFuture(e);
} catch (ExecutionException e) {
var cause = e.getCause();
if (cause instanceof InterruptedException) {
return immediateFailedFuture(cause);
} else if (cause instanceof IOException) {
error = (IOException) cause;
} else {
error = new IOException(cause);
}
}

if (error == null) {
continue;
}

if (bulkTransferException == null) {
bulkTransferException = new BulkTransferException();
}
bulkTransferException.add(error);
}

if (bulkTransferException != null) {
return immediateFailedFuture(bulkTransferException);
}

return immediateVoidFuture();
},
directExecutor());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -759,7 +760,7 @@ public void prefetchFile_interruptingMetadataSupplier_interruptsDownload() throw
prefetcher.prefetchFiles(
action, ImmutableList.of(a1), interruptedMetadataSupplier, Priority.MEDIUM);

assertThrows(InterruptedException.class, () -> getFromFuture(future));
assertThrows(CancellationException.class, future::get);
}

@Test
Expand Down

0 comments on commit 162cacd

Please sign in to comment.