Skip to content

Commit

Permalink
Remote: Fix the issue that partial downloaded inputs are not deleted …
Browse files Browse the repository at this point in the history
…if the request is cancelled.

Following 280ef69, this change fixes the issue that partial downloaded files are not deleted. The root cause is, even we cancel the download futures inside AbstractActionInputPrefetcher and then delete the files, the actual downloads inside GrpcCacheClient are not cancelled so it is still writing to the path.

PiperOrigin-RevId: 447970542
  • Loading branch information
coeuvre authored and copybara-github committed May 11, 2022
1 parent a0cb25b commit a226eed
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

package com.google.devtools.build.lib.remote;

import static com.google.bytestream.ByteStreamGrpc.getReadMethod;
import static com.google.common.base.Strings.isNullOrEmpty;
import static io.grpc.stub.ClientCalls.asyncServerStreamingCall;

import build.bazel.remote.execution.v2.ActionCacheGrpc;
import build.bazel.remote.execution.v2.ActionCacheGrpc.ActionCacheFutureStub;
Expand All @@ -41,7 +43,6 @@
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.devtools.build.lib.authandtls.CallCredentialsProvider;
import com.google.devtools.build.lib.concurrent.ThreadSafety.ThreadSafe;
import com.google.devtools.build.lib.remote.RemoteRetrier.ProgressiveBackoff;
Expand All @@ -50,6 +51,7 @@
import com.google.devtools.build.lib.remote.common.RemoteActionExecutionContext;
import com.google.devtools.build.lib.remote.common.RemoteCacheClient;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.CompletableFuture;
import com.google.devtools.build.lib.remote.util.DigestOutputStream;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.TracingMetadataUtils;
Expand All @@ -58,6 +60,7 @@
import com.google.devtools.build.lib.vfs.Path;
import com.google.protobuf.ByteString;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
Expand Down Expand Up @@ -363,81 +366,84 @@ private ListenableFuture<Long> requestRead(
Channel channel) {
String resourceName =
getResourceName(options.remoteInstanceName, digest, options.cacheCompression);
SettableFuture<Long> future = SettableFuture.create();
CompletableFuture<Long> future = CompletableFuture.create();
OutputStream out;
try {
out = options.cacheCompression ? new ZstdDecompressingOutputStream(rawOut) : rawOut;
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
bsAsyncStub(context, channel)
.read(
ReadRequest.newBuilder()
.setResourceName(resourceName)
.setReadOffset(rawOut.getCount())
.build(),
new StreamObserver<ReadResponse>() {

@Override
public void onNext(ReadResponse readResponse) {
ByteString data = readResponse.getData();
try {
data.writeTo(out);
} catch (IOException e) {
// Cancel the call.
throw new RuntimeException(e);
}
// reset the stall backoff because we've made progress or been kept alive
progressiveBackoff.reset();
ByteStreamStub stub = bsAsyncStub(context, channel);
ClientCall<ReadRequest, ReadResponse> clientCall =
stub.getChannel().newCall(getReadMethod(), stub.getCallOptions());
future.setCancelCallback(() -> clientCall.cancel("Cancelled", /* cause= */ null));
asyncServerStreamingCall(
clientCall,
ReadRequest.newBuilder()
.setResourceName(resourceName)
.setReadOffset(rawOut.getCount())
.build(),
new StreamObserver<ReadResponse>() {

@Override
public void onNext(ReadResponse readResponse) {
ByteString data = readResponse.getData();
try {
data.writeTo(out);
} catch (IOException e) {
// Cancel the call.
throw new RuntimeException(e);
}
// reset the stall backoff because we've made progress or been kept alive
progressiveBackoff.reset();
}

@Override
public void onError(Throwable t) {
if (rawOut.getCount() == digest.getSizeBytes()) {
// If the file was fully downloaded, it doesn't matter if there was an error at
// the end of the stream.
logger.atInfo().withCause(t).log("ignoring error because file was fully received");
onCompleted();
return;
}
releaseOut();
Status status = Status.fromThrowable(t);
if (status.getCode() == Status.Code.NOT_FOUND) {
future.setException(new CacheNotFoundException(digest));
} else {
future.setException(t);
}
}

@Override
public void onCompleted() {
try {
if (digestSupplier != null) {
Utils.verifyBlobContents(digest, digestSupplier.get());
}

@Override
public void onError(Throwable t) {
if (rawOut.getCount() == digest.getSizeBytes()) {
// If the file was fully downloaded, it doesn't matter if there was an error at
// the end of the stream.
logger.atInfo().withCause(t).log(
"ignoring error because file was fully received");
onCompleted();
return;
}
releaseOut();
Status status = Status.fromThrowable(t);
if (status.getCode() == Status.Code.NOT_FOUND) {
future.setException(new CacheNotFoundException(digest));
} else {
future.setException(t);
}
}

@Override
public void onCompleted() {
try {
if (digestSupplier != null) {
Utils.verifyBlobContents(digest, digestSupplier.get());
}
out.flush();
future.set(rawOut.getCount());
} catch (IOException e) {
future.setException(e);
} catch (RuntimeException e) {
logger.atWarning().withCause(e).log("Unexpected exception");
future.setException(e);
} finally {
releaseOut();
}
}

private void releaseOut() {
if (out instanceof ZstdDecompressingOutputStream) {
try {
((ZstdDecompressingOutputStream) out).closeShallow();
} catch (IOException e) {
logger.atWarning().withCause(e).log("failed to cleanly close output stream");
}
}
out.flush();
future.set(rawOut.getCount());
} catch (IOException e) {
future.setException(e);
} catch (RuntimeException e) {
logger.atWarning().withCause(e).log("Unexpected exception");
future.setException(e);
} finally {
releaseOut();
}
}

private void releaseOut() {
if (out instanceof ZstdDecompressingOutputStream) {
try {
((ZstdDecompressingOutputStream) out).closeShallow();
} catch (IOException e) {
logger.atWarning().withCause(e).log("failed to cleanly close output stream");
}
});
}
}
});
return future;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import com.google.devtools.build.lib.remote.common.RemoteCacheClient.CachedActionResult;
import com.google.devtools.build.lib.remote.options.RemoteOptions;
import com.google.devtools.build.lib.remote.util.AsyncTaskCache;
import com.google.devtools.build.lib.remote.util.CompletableFuture;
import com.google.devtools.build.lib.remote.util.DigestUtil;
import com.google.devtools.build.lib.remote.util.RxFutures;
import com.google.devtools.build.lib.server.FailureDetails.FailureDetail;
Expand Down Expand Up @@ -87,9 +88,7 @@ public class RemoteCache extends AbstractReferenceCounted {
protected final DigestUtil digestUtil;

public RemoteCache(
RemoteCacheClient cacheProtocol,
RemoteOptions options,
DigestUtil digestUtil) {
RemoteCacheClient cacheProtocol, RemoteOptions options, DigestUtil digestUtil) {
this.cacheProtocol = cacheProtocol;
this.options = options;
this.digestUtil = digestUtil;
Expand Down Expand Up @@ -332,8 +331,9 @@ public ListenableFuture<Void> downloadFile(
reporter.started();
OutputStream out = new ReportingOutputStream(new LazyFileOutputStream(path), reporter);

SettableFuture<Void> outerF = SettableFuture.create();
CompletableFuture<Void> outerF = CompletableFuture.create();
ListenableFuture<Void> f = cacheProtocol.downloadBlob(context, digest, out);
outerF.setCancelCallback(() -> f.cancel(/* mayInterruptIfRunning= */ true));
Futures.addCallback(
f,
new FutureCallback<Void>() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2022 The Bazel Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.google.devtools.build.lib.remote.util;

import com.google.common.util.concurrent.AbstractFuture;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Action;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;

/**
* A {@link com.google.common.util.concurrent.ListenableFuture} whose result can be set by a {@link
* #set(Object)} or {@link #setException(Throwable)}.
*
* <p>It differs from {@link com.google.common.util.concurrent.SettableFuture} that it provides
* {@link #setCancelCallback(Disposable)} for callers to register a callback which is called when
* the future is cancelled.
*/
public final class CompletableFuture<T> extends AbstractFuture<T> {

public static <T> CompletableFuture<T> create() {
return new CompletableFuture<>();
}

private final AtomicReference<Disposable> cancelCallback = new AtomicReference<>();

public void setCancelCallback(Action action) {
setCancelCallback(Disposable.fromAction(action));
}

public void setCancelCallback(Disposable cancelCallback) {
this.cancelCallback.set(cancelCallback);
// Just in case it was already canceled before we set the callback.
doCancelIfCancelled();
}

private void doCancelIfCancelled() {
if (isCancelled()) {
Disposable callback = cancelCallback.getAndSet(null);
if (callback != null) {
callback.dispose();
}
}
}

@Override
protected void afterDone() {
doCancelIfCancelled();
}

// Allow set to be called by other members.
@Override
public boolean set(@Nullable T t) {
return super.set(t);
}

// Allow setException to be called by other members.
@Override
public boolean setException(Throwable throwable) {
return super.setException(throwable);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import static com.google.common.base.Preconditions.checkState;

import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
Expand All @@ -34,7 +33,6 @@
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;

/** Methods for interoperating between Rx and ListenableFuture. */
Expand Down Expand Up @@ -249,39 +247,4 @@ public void onError(Throwable e) {
return future;
}

private static final class CompletableFuture<T> extends AbstractFuture<T> {
private final AtomicReference<Disposable> cancelCallback = new AtomicReference<>();

private void setCancelCallback(Disposable cancelCallback) {
this.cancelCallback.set(cancelCallback);
// Just in case it was already canceled before we set the callback.
doCancelIfCancelled();
}

private void doCancelIfCancelled() {
if (isCancelled()) {
Disposable callback = cancelCallback.getAndSet(null);
if (callback != null) {
callback.dispose();
}
}
}

@Override
protected void afterDone() {
doCancelIfCancelled();
}

// Allow set to be called by other members.
@Override
protected boolean set(@Nullable T t) {
return super.set(t);
}

// Allow setException to be called by other members.
@Override
protected boolean setException(Throwable throwable) {
return super.setException(throwable);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.devtools.build.lib.actions.ActionInputHelper;
import com.google.devtools.build.lib.actions.cache.VirtualActionInput;
import com.google.devtools.build.lib.actions.util.ActionsTestUtil;
Expand All @@ -67,7 +68,9 @@
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.Status;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -151,6 +154,34 @@ public void onError(Throwable t) {
client.ensureInputsPresent(context, merkleTree, ImmutableMap.of(), /*force=*/ true);
}

@Test
public void downloadBlob_cancelled_cancelRequest() throws IOException {
// Test that if the download future is cancelled, the download itself is also cancelled.

// arrange
Digest digest = DIGEST_UTIL.computeAsUtf8("abcdefg");
AtomicBoolean cancelled = new AtomicBoolean();
// Mock a byte stream whose read method never finish.
serviceRegistry.addService(
new ByteStreamImplBase() {
@Override
public void read(ReadRequest request, StreamObserver<ReadResponse> responseObserver) {
((ServerCallStreamObserver<ReadResponse>) responseObserver)
.setOnCancelHandler(() -> cancelled.set(true));
}
});
GrpcCacheClient cacheClient = newClient();

// act
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
ListenableFuture<Void> download = cacheClient.downloadBlob(context, digest, out);
download.cancel(/* mayInterruptIfRunning= */ true);
}

// assert
assertThat(cancelled.get()).isTrue();
}

@Test
public void testDownloadEmptyBlob() throws Exception {
GrpcCacheClient client = newClient();
Expand Down
Loading

0 comments on commit a226eed

Please sign in to comment.