Skip to content

Commit

Permalink
Remote: Fix performance regression in "upload missing inputs".
Browse files Browse the repository at this point in the history
Also add more tests.

Fixes #15872.
  • Loading branch information
coeuvre committed Jul 22, 2022
1 parent 8e03d82 commit a32df05
Show file tree
Hide file tree
Showing 4 changed files with 363 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
// limitations under the License.
package com.google.devtools.build.lib.remote;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Futures.immediateFuture;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.devtools.build.lib.remote.util.RxFutures.toCompletable;
import static com.google.devtools.build.lib.remote.util.RxFutures.toSingle;
Expand All @@ -25,9 +24,12 @@
import build.bazel.remote.execution.v2.Digest;
import build.bazel.remote.execution.v2.Directory;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.profiler.Profiler;
import com.google.devtools.build.lib.profiler.SilentCloseable;
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.merkletree.MerkleTree;
Expand All @@ -36,16 +38,21 @@
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.RxUtils.TransferResult;
import com.google.protobuf.Message;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.CompletableObserver;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Maybe;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.core.SingleEmitter;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.subjects.AsyncSubject;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.GuardedBy;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

/** A {@link RemoteCache} with additional functionality needed for remote execution. */
public class RemoteExecutionCache extends RemoteCache {
Expand Down Expand Up @@ -85,13 +92,10 @@ public void ensureInputsPresent(
return;
}

MissingDigestFinder missingDigestFinder = new MissingDigestFinder(context, allDigests.size());
Flowable<TransferResult> uploads =
Flowable.fromIterable(allDigests)
.flatMapSingle(
digest ->
uploadBlobIfMissing(
context, merkleTree, additionalInputs, force, missingDigestFinder, digest));
createUploadTasks(context, merkleTree, additionalInputs, allDigests, force)
.flatMap(uploadTasks -> findMissingBlobs(context, uploadTasks))
.flatMapPublisher(this::waitForUploadTasks);

try {
mergeBulkTransfer(uploads).blockingAwait();
Expand All @@ -105,36 +109,6 @@ public void ensureInputsPresent(
}
}

/**
 * Uploads the blob identified by {@code digest} if the remote cache reports it as missing.
 *
 * <p>The upload is routed through {@code casUploadCache.execute} keyed by {@code digest}. The
 * deferred task registers the digest with {@code missingDigestFinder} and uploads the blob only
 * when the digest is contained in the set of missing digests the finder emits; otherwise it
 * completes without uploading.
 *
 * @param context passed through to {@code uploadBlob}.
 * @param merkleTree passed through to {@code uploadBlob}.
 * @param additionalInputs passed through to {@code uploadBlob}.
 * @param force forwarded to {@code casUploadCache.execute}.
 * @param missingDigestFinder collects digests and provides the set of missing digests.
 * @param digest digest of the blob to upload.
 * @return the upload wrapped by {@code toTransferResult}.
 */
private Single<TransferResult> uploadBlobIfMissing(
RemoteActionExecutionContext context,
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs,
boolean force,
MissingDigestFinder missingDigestFinder,
Digest digest) {
Completable upload =
casUploadCache.execute(
digest,
Completable.defer(
() ->
// Only reach here if the digest is missing and is not being uploaded.
missingDigestFinder
.registerAndCount(digest)
.flatMapCompletable(
missingDigests -> {
if (missingDigests.contains(digest)) {
return toCompletable(
() -> uploadBlob(context, digest, merkleTree, additionalInputs),
directExecutor());
} else {
return Completable.complete();
}
})),
// When the cache ignores the task, the digest is still counted (without being registered)
// so that the finder's counter can reach its expected count.
/* onIgnored= */ missingDigestFinder::count,
force);
return toTransferResult(upload);
}

private ListenableFuture<Void> uploadBlob(
RemoteActionExecutionContext context,
Digest digest,
Expand Down Expand Up @@ -165,92 +139,136 @@ private ListenableFuture<Void> uploadBlob(
digest)));
}

/**
* A missing digest finder that initiates the request when the internal counter reaches an
* expected count.
*/
class MissingDigestFinder {
private final int expectedCount;

private final AsyncSubject<ImmutableSet<Digest>> digestsSubject;
private final Single<ImmutableSet<Digest>> resultSingle;

@GuardedBy("this")
private final Set<Digest> digests;
/**
 * Mutable holder describing the upload state of a single digest. Instances are populated by
 * {@code maybeCreateUploadTask} and consumed by {@code findMissingBlobs} and {@code
 * waitForUploadTasks}.
 */
static class UploadTask {
// Digest of the blob this task refers to.
Digest digest;
// Emitter used to resume the pending upload: findMissingBlobs signals it with true when the
// digest is missing (the blob is then uploaded) and false otherwise. It is only assigned when
// the task passed to casUploadCache is subscribed; tasks emitted through the onAlreadyRunning
// callback leave it null, and findMissingBlobs skips tasks whose continuation is null.
SingleEmitter<Boolean> continuation;
// Completes or fails together with the upload obtained from casUploadCache; disposing it
// disposes that upload's subscription.
Completable completion;
}

@GuardedBy("this")
private int currentCount = 0;
/**
 * Builds the list of upload tasks for {@code allDigests}.
 *
 * <p>Each digest is handed to {@code maybeCreateUploadTask}; digests for which that call emits
 * no task are left out of the resulting list. The work is wrapped in a "collect digests"
 * profiler span.
 *
 * @param context passed through to {@code maybeCreateUploadTask}.
 * @param merkleTree passed through to {@code maybeCreateUploadTask}.
 * @param additionalInputs passed through to {@code maybeCreateUploadTask}.
 * @param allDigests digests to create upload tasks for.
 * @param force passed through to {@code maybeCreateUploadTask}.
 * @return a Single emitting the collected upload tasks.
 */
private Single<List<UploadTask>> createUploadTasks(
    RemoteActionExecutionContext context,
    MerkleTree merkleTree,
    Map<Digest, Message> additionalInputs,
    Iterable<Digest> allDigests,
    boolean force) {
  return Single.using(
      () -> Profiler.instance().profile("collect digests"),
      profileSpan -> {
        Flowable<UploadTask> createdTasks =
            Flowable.fromIterable(allDigests)
                .flatMapMaybe(
                    current ->
                        maybeCreateUploadTask(
                            context, merkleTree, additionalInputs, current, force));
        return createdTasks.collect(Collectors.toList());
      },
      SilentCloseable::close);
}

MissingDigestFinder(RemoteActionExecutionContext context, int expectedCount) {
checkArgument(expectedCount > 0, "expectedCount should be greater than 0");
this.expectedCount = expectedCount;
this.digestsSubject = AsyncSubject.create();
this.digests = new HashSet<>();
private Maybe<UploadTask> maybeCreateUploadTask(
RemoteActionExecutionContext context,
MerkleTree merkleTree,
Map<Digest, Message> additionalInputs,
Digest digest,
boolean force) {
return Maybe.create(
emitter -> {
AtomicReference<Disposable> disposable = new AtomicReference<>();
AsyncSubject<Void> completion = AsyncSubject.create();
UploadTask uploadTask = new UploadTask();
uploadTask.digest = digest;
uploadTask.completion =
Completable.fromObservable(
completion.doOnDispose(
() -> {
Disposable d = disposable.getAndSet(null);
if (d != null) {
d.dispose();
}
}));
Completable upload =
casUploadCache.execute(
digest,
Single.<Boolean>create(
continuation -> {
uploadTask.continuation = continuation;
emitter.onSuccess(uploadTask);
})
.flatMapCompletable(
shouldUpload -> {
if (!shouldUpload) {
return Completable.complete();
}

AtomicBoolean findMissingDigestsCalled = new AtomicBoolean(false);
this.resultSingle =
Single.fromObservable(
digestsSubject
.flatMapSingle(
digests -> {
boolean wasCalled = findMissingDigestsCalled.getAndSet(true);
// Make sure we don't have re-subscription caused by refCount() below.
checkState(!wasCalled, "FindMissingDigests is called more than once");
return toSingle(
() -> findMissingDigests(context, digests), directExecutor());
})
// Use replay here because we could have a race condition that downstream hasn't
// been added to the subscription list (to receive the upstream result) while
// upstream is completed.
.replay(1)
.refCount());
}
return toCompletable(
() ->
uploadBlob(
context, uploadTask.digest, merkleTree, additionalInputs),
directExecutor());
}),
/* onAlreadyRunning= */ () -> emitter.onSuccess(uploadTask),
/* onAlreadyFinished= */ emitter::onComplete,
force);
upload.subscribe(
new CompletableObserver() {
@Override
public void onSubscribe(@NonNull Disposable d) {
disposable.set(d);
}

/**
* Register the {@code digest} and increase the counter.
*
* <p>Returned Single cannot be subscribed more than once.
*
* @return Single that emits the result of the {@code FindMissingDigest} request.
*/
Single<ImmutableSet<Digest>> registerAndCount(Digest digest) {
AtomicBoolean subscribed = new AtomicBoolean(false);
// count() will potentially trigger the findMissingDigests call. Adding and counting before
// returning the Single could introduce a race that the result of findMissingDigests is
// available but the consumer doesn't get it because it hasn't subscribed the returned
// Single. In this case, it subscribes after upstream is completed resulting a re-run of
// findMissingDigests (due to refCount()).
//
// Calling count() inside doOnSubscribe to ensure the consumer already subscribed to the
// returned Single to avoid a re-execution of findMissingDigests.
return resultSingle.doOnSubscribe(
d -> {
boolean wasSubscribed = subscribed.getAndSet(true);
checkState(!wasSubscribed, "Single is subscribed more than once");
synchronized (this) {
digests.add(digest);
}
count();
});
}
@Override
public void onComplete() {
completion.onComplete();
}

/** Increase the counter. */
void count() {
ImmutableSet<Digest> digestsResult = null;
@Override
public void onError(@NonNull Throwable e) {
completion.onError(e);
}
});
});
}

synchronized (this) {
if (currentCount < expectedCount) {
currentCount++;
if (currentCount == expectedCount) {
digestsResult = ImmutableSet.copyOf(digests);
}
}
}
/**
 * Queries for the missing blobs among {@code uploadTasks} and resumes the pending tasks.
 *
 * <p>Only tasks with a non-null {@code continuation} take part in the query. Once the set of
 * missing digests is available, each of those continuations is signalled with whether its digest
 * is contained in that set.
 *
 * @param context passed through to {@code findMissingDigests}.
 * @param uploadTasks the tasks to resolve.
 * @return a Single emitting the same {@code uploadTasks} list after the continuations have been
 *     signalled.
 */
private Single<List<UploadTask>> findMissingBlobs(
RemoteActionExecutionContext context, List<UploadTask> uploadTasks) {
// The work is wrapped in a "findMissingDigests" profiler span released via
// SilentCloseable::close.
return Single.using(
() -> Profiler.instance().profile("findMissingDigests"),
ignored ->
Single.fromObservable(
Observable.fromSingle(
toSingle(
() -> {
// Query only the digests whose task is waiting on a continuation.
ImmutableList<Digest> digestsToQuery =
uploadTasks.stream()
.filter(uploadTask -> uploadTask.continuation != null)
.map(uploadTask -> uploadTask.digest)
.collect(ImmutableList.toImmutableList());
// Nothing to ask for: skip the findMissingDigests call entirely.
if (digestsToQuery.isEmpty()) {
return immediateFuture(ImmutableSet.of());
}
return findMissingDigests(context, digestsToQuery);
},
directExecutor())
.map(
missingDigests -> {
// Tell every waiting task whether its blob has to be uploaded.
// NOTE(review): continuations are only signalled on this success path; if the
// query fails, nothing here calls onError on them. Confirm the pending uploads
// are released elsewhere.
for (UploadTask uploadTask : uploadTasks) {
if (uploadTask.continuation != null) {
uploadTask.continuation.onSuccess(
missingDigests.contains(uploadTask.digest));
}
}
return uploadTasks;
}))
// Use AsyncSubject so that if downstream is disposed, the
// findMissingDigests call is not cancelled (because it may be needed by other
// threads).
.subscribeWith(AsyncSubject.create())),
SilentCloseable::close);
}

if (digestsResult != null) {
digestsSubject.onNext(digestsResult);
digestsSubject.onComplete();
}
}
/**
 * Waits for every task in {@code uploadTasks} to finish.
 *
 * <p>Each task's {@code completion} is converted with {@code toTransferResult}; the work is
 * wrapped in an "upload" profiler span.
 *
 * @param uploadTasks the tasks whose completion should be awaited.
 * @return a Flowable emitting one {@link TransferResult} per upload task.
 */
private Flowable<TransferResult> waitForUploadTasks(List<UploadTask> uploadTasks) {
  return Flowable.using(
      () -> Profiler.instance().profile("upload"),
      profileSpan -> {
        Flowable<UploadTask> pendingTasks = Flowable.fromIterable(uploadTasks);
        return pendingTasks.flatMapSingle(task -> toTransferResult(task.completion));
      },
      SilentCloseable::close);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,10 @@ public boolean isDisposed() {
/**
* Executes a task.
*
* @see #execute(Object, Single, Action, boolean).
* @see #execute(Object, Single, Action, Action, boolean).
*/
public Single<ValueT> execute(KeyT key, Single<ValueT> task, boolean force) {
return execute(key, task, () -> {}, force);
return execute(key, task, () -> {}, () -> {}, force);
}

/**
Expand All @@ -270,12 +270,17 @@ public Single<ValueT> execute(KeyT key, Single<ValueT> task, boolean force) {
* <p>If the cache is already shutdown, a {@link CancellationException} will be emitted.
*
* @param key identifies the task.
* @param onIgnored callback called when provided task is ignored.
* @param onAlreadyRunning callback called when provided task is already running.
* @param onAlreadyFinished callback called when provided task is already finished.
* @param force re-execute a finished task if set to {@code true}.
* @return a {@link Single} which turns to completed once the task is finished or propagates the
* error if any.
*/
public Single<ValueT> execute(KeyT key, Single<ValueT> task, Action onIgnored, boolean force) {
public Single<ValueT> execute(
KeyT key,
Single<ValueT> task,
Action onAlreadyRunning,
Action onAlreadyFinished,
boolean force) {
return Single.create(
emitter -> {
synchronized (lock) {
Expand All @@ -285,7 +290,7 @@ public Single<ValueT> execute(KeyT key, Single<ValueT> task, Action onIgnored, b
}

if (!force && finished.containsKey(key)) {
onIgnored.run();
onAlreadyFinished.run();
emitter.onSuccess(finished.get(key));
return;
}
Expand All @@ -294,7 +299,7 @@ public Single<ValueT> execute(KeyT key, Single<ValueT> task, Action onIgnored, b

Execution execution = inProgress.get(key);
if (execution != null) {
onIgnored.run();
onAlreadyRunning.run();
} else {
execution = new Execution(key, task);
inProgress.put(key, execution);
Expand Down Expand Up @@ -445,13 +450,23 @@ public Completable executeIfNot(KeyT key, Completable task) {

/** Same as {@link AsyncTaskCache#execute} but operates on {@link Completable}. */
public Completable execute(KeyT key, Completable task, boolean force) {
return execute(key, task, () -> {}, force);
return execute(key, task, () -> {}, () -> {}, force);
}

/** Same as {@link AsyncTaskCache#execute} but operates on {@link Completable}. */
public Completable execute(KeyT key, Completable task, Action onIgnored, boolean force) {
public Completable execute(
KeyT key,
Completable task,
Action onAlreadyRunning,
Action onAlreadyFinished,
boolean force) {
return Completable.fromSingle(
cache.execute(key, task.toSingleDefault(Optional.empty()), onIgnored, force));
cache.execute(
key,
task.toSingleDefault(Optional.empty()),
onAlreadyRunning,
onAlreadyFinished,
force));
}

/** Returns a set of keys for tasks which is finished. */
Expand Down
Loading

0 comments on commit a32df05

Please sign in to comment.