Skip to content

Commit

Permalink
Delete CompletableFuture.
Browse files Browse the repository at this point in the history
I noticed that CompletableFuture became more prevalant in a226eed. However, I find it's generally simpler to use plain-old ListenableFuture.addListener to propagate cancellations.

Closes #15469.

PiperOrigin-RevId: 449461782
  • Loading branch information
benjaminp authored and copybara-github committed May 18, 2022
1 parent ee3637c commit 44fef49
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 189 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@

package com.google.devtools.build.lib.remote;

import static com.google.bytestream.ByteStreamGrpc.getReadMethod;
import static com.google.common.base.Strings.isNullOrEmpty;
import static io.grpc.stub.ClientCalls.asyncServerStreamingCall;

import build.bazel.remote.execution.v2.ActionCacheGrpc;
import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheFutureStub;
Expand All @@ -43,6 +41,7 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.authandtls.CallCredentialsProvider;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.remote.RemoteRetrier.ProgressiveBackoff;
Expand All @@ -51,7 +50,6 @@
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.CompletableFuture;
import com.google.devtools.build.lib.remote.util.DigestOutputStream;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
Expand All @@ -60,7 +58,7 @@
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.Context;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
Expand Down Expand Up @@ -366,84 +364,86 @@ private ListenableFuture<Long> requestRead(
Channel channel) {
String resourceName =
getResourceName(options.remoteInstanceName, digest, options.cacheCompression);
CompletableFuture<Long> future = CompletableFuture.create();
SettableFuture<Long> future = SettableFuture.create();
OutputStream out;
try {
out = options.cacheCompression ? new ZstdDecompressingOutputStream(rawOut) : rawOut;
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
ByteStreamStub stub = bsAsyncStub(context, channel);
ClientCall<ReadRequest, ReadResponse> clientCall =
stub.getChannel().newCall(getReadMethod(), stub.getCallOptions());
future.setCancelCallback(() -> clientCall.cancel("Cancelled", /* cause= */ null));
asyncServerStreamingCall(
clientCall,
ReadRequest.newBuilder()
.setResourceName(resourceName)
.setReadOffset(rawOut.getCount())
.build(),
new StreamObserver<ReadResponse>() {

@Override
public void onNext(ReadResponse readResponse) {
ByteString data = readResponse.getData();
try {
data.writeTo(out);
} catch (IOException e) {
// Cancel the call.
throw new RuntimeException(e);
}
// reset the stall backoff because we've made progress or been kept alive
progressiveBackoff.reset();
}

@Override
public void onError(Throwable t) {
if (rawOut.getCount() == digest.getSizeBytes()) {
// If the file was fully downloaded, it doesn't matter if there was an error at
// the end of the stream.
logger.atInfo().withCause(t).log("ignoring error because file was fully received");
onCompleted();
return;
}
releaseOut();
Status status = Status.fromThrowable(t);
if (status.getCode() == Status.Code.NOT_FOUND) {
future.setException(new CacheNotFoundException(digest));
} else {
future.setException(t);
}
}

@Override
public void onCompleted() {
try {
if (digestSupplier != null) {
Utils.verifyBlobContents(digest, digestSupplier.get());
}
out.flush();
future.set(rawOut.getCount());
} catch (IOException e) {
future.setException(e);
} catch (RuntimeException e) {
logger.atWarning().withCause(e).log("Unexpected exception");
future.setException(e);
} finally {
releaseOut();
}
}

private void releaseOut() {
if (out instanceof ZstdDecompressingOutputStream) {
try {
((ZstdDecompressingOutputStream) out).closeShallow();
} catch (IOException e) {
logger.atWarning().withCause(e).log("failed to cleanly close output stream");
}
}
}
});
Context.CancellableContext grpcContext = Context.current().withCancellation();
future.addListener(() -> grpcContext.cancel(null), MoreExecutors.directExecutor());
grpcContext.run(
() ->
bsAsyncStub(context, channel)
.read(
ReadRequest.newBuilder()
.setResourceName(resourceName)
.setReadOffset(rawOut.getCount())
.build(),
new StreamObserver<ReadResponse>() {
@Override
public void onNext(ReadResponse readResponse) {
ByteString data = readResponse.getData();
try {
data.writeTo(out);
} catch (IOException e) {
// Cancel the call.
throw new RuntimeException(e);
}
// reset the stall backoff because we've made progress or been kept alive
progressiveBackoff.reset();
}

@Override
public void onError(Throwable t) {
if (rawOut.getCount() == digest.getSizeBytes()) {
// If the file was fully downloaded, it doesn't matter if there was an
// error at
// the end of the stream.
logger.atInfo().withCause(t).log(
"ignoring error because file was fully received");
onCompleted();
return;
}
releaseOut();
Status status = Status.fromThrowable(t);
if (status.getCode() == Status.Code.NOT_FOUND) {
future.setException(new CacheNotFoundException(digest));
} else {
future.setException(t);
}
}

@Override
public void onCompleted() {
try {
if (digestSupplier != null) {
Utils.verifyBlobContents(digest, digestSupplier.get());
}
out.flush();
future.set(rawOut.getCount());
} catch (IOException e) {
future.setException(e);
} catch (RuntimeException e) {
logger.atWarning().withCause(e).log("Unexpected exception");
future.setException(e);
} finally {
releaseOut();
}
}

private void releaseOut() {
if (out instanceof ZstdDecompressingOutputStream) {
try {
((ZstdDecompressingOutputStream) out).closeShallow();
} catch (IOException e) {
logger.atWarning().withCause(e).log(
"failed to cleanly close output stream");
}
}
}
}));
return future;
}

Expand Down
46 changes: 10 additions & 36 deletions src/main/java/com/google/devtools/build/lib/remote/RemoteCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CachedActionResult;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.AsyncTaskCache;
import com.google.devtools.build.lib.remote.util.CompletableFuture;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.RxFutures;
import com.google.devtools.build.lib.server.FailureDetails.FailureDetail;
Expand Down Expand Up @@ -331,45 +330,20 @@ public ListenableFuture<Void> downloadFile(
reporter.started();
OutputStream out = new ReportingOutputStream(new LazyFileOutputStream(path), reporter);

CompletableFuture<Void> outerF = CompletableFuture.create();
ListenableFuture<Void> f = cacheProtocol.downloadBlob(context, digest, out);
outerF.setCancelCallback(() -> f.cancel(/* mayInterruptIfRunning= */ true));
Futures.addCallback(
f,
new FutureCallback<Void>() {
@Override
public void onSuccess(Void result) {
try {
out.close();
outerF.set(null);
reporter.finished();
} catch (IOException e) {
outerF.setException(e);
} catch (RuntimeException e) {
logger.atWarning().withCause(e).log("Unexpected exception");
outerF.setException(e);
}
}

@Override
public void onFailure(Throwable t) {
try {
out.close();
reporter.finished();
} catch (IOException e) {
if (t != e) {
t.addSuppressed(e);
}
} catch (RuntimeException e) {
logger.atWarning().withCause(e).log("Unexpected exception");
t.addSuppressed(e);
} finally {
outerF.setException(t);
}
f.addListener(
() -> {
try {
out.close();
reporter.finished();
} catch (IOException e) {
logger.atWarning().withCause(e).log(
"Unexpected exception closing output stream after downloading %s/%d to %s",
digest.getHash(), digest.getSizeBytes(), path);
}
},
directExecutor());
return outerF;
return f;
}

/**
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
package com.google.devtools.build.lib.remote.util;

import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.reactivex.rxjava3.annotations.NonNull;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.CompletableEmitter;
Expand Down Expand Up @@ -189,12 +191,18 @@ public void onFailure(Throwable throwable) {
* the {@link Completable} will automatically be cancelled.
*/
public static ListenableFuture<Void> toListenableFuture(Completable completable) {
CompletableFuture<Void> future = new CompletableFuture<>();
SettableFuture<Void> future = SettableFuture.create();
completable.subscribe(
new CompletableObserver() {
@Override
public void onSubscribe(Disposable d) {
future.setCancelCallback(d);
future.addListener(
() -> {
if (future.isCancelled()) {
d.dispose();
}
},
directExecutor());
}

@Override
Expand Down Expand Up @@ -222,12 +230,18 @@ public void onError(Throwable e) {
* the {@link Single} will automatically be cancelled.
*/
public static <T> ListenableFuture<T> toListenableFuture(Single<T> single) {
CompletableFuture<T> future = new CompletableFuture<>();
SettableFuture<T> future = SettableFuture.create();
single.subscribe(
new SingleObserver<T>() {
@Override
public void onSubscribe(Disposable d) {
future.setCancelCallback(d);
future.addListener(
() -> {
if (future.isCancelled()) {
d.dispose();
}
},
directExecutor());
}

@Override
Expand Down

0 comments on commit 44fef49

Please sign in to comment.