Skip to content

Commit

Permalink
Remote: Only waits for background tasks from remote execution. (#14752)
Browse files Browse the repository at this point in the history
We added the block waiting behaviour after each command in remote module to wait for background uploads when introducing async upload. However, not all background uploads should be waited, e.g. uploads from BES module but with flag `--bes_upload_mode=fully_async`.

This PR updates remote module so that only uploads initiated by remote module are waited after the command. This also enable us to implement something like `--remote_upload_mode=fully_async` in the future.

Fixes #14620.

Closes #14634.

PiperOrigin-RevId: 424296966
(cherry picked from commit 3836ad0)

Co-authored-by: Chi Wang <[email protected]>
  • Loading branch information
brentleyjones and coeuvre authored Feb 9, 2022
1 parent 60f757c commit a5f2813
Showing 1 changed file with 10 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@
import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;

Expand All @@ -164,6 +165,7 @@ public class RemoteExecutionService {
@Nullable private final Path captureCorruptedOutputsDir;
private final Cache<Object, MerkleTree> merkleTreeCache;
private final Set<String> reportedErrors = new HashSet<>();
private final Phaser backgroundTaskPhaser = new Phaser(1);

private final Scheduler scheduler;

Expand Down Expand Up @@ -1160,13 +1162,18 @@ public void uploadOutputs(RemoteAction action, SpawnResult spawnResult)
.subscribe(
new SingleObserver<ActionResult>() {
@Override
public void onSubscribe(@NonNull Disposable d) {}
public void onSubscribe(@NonNull Disposable d) {
backgroundTaskPhaser.register();
}

@Override
public void onSuccess(@NonNull ActionResult actionResult) {}
public void onSuccess(@NonNull ActionResult actionResult) {
backgroundTaskPhaser.arriveAndDeregister();
}

@Override
public void onError(@NonNull Throwable e) {
backgroundTaskPhaser.arriveAndDeregister();
reportUploadError(e);
}
});
Expand Down Expand Up @@ -1300,7 +1307,7 @@ public void shutdown() {
remoteCache.release();

try {
remoteCache.awaitTermination();
backgroundTaskPhaser.awaitAdvanceInterruptibly(backgroundTaskPhaser.arrive());
} catch (InterruptedException e) {
buildInterrupted.set(true);
remoteCache.shutdownNow();
Expand Down

0 comments on commit a5f2813

Please sign in to comment.