Skip to content

Commit

Permalink
Rework ByteStreamUploader early return logic.
Browse files Browse the repository at this point in the history
There are several points where ByteStreamUploader may discover that the server already has the blob fully uploaded. These points tried to effect an early return from the upload code, generally by "lying" to higher layers that the upload fully finished. That could lead to bugs. For example, consider the added test case: A compressed upload completes its writing but receives an error instead of a server ACK. On retry, QueryWriteStatus reveals the blob exists and returns its uncompressed size. This confused the checkCommittedSize logic, which expected the final committed size of a compressed upload to be the total compressed data size or -1. The code added by bazelbuild@daa3dbe also looks broken in the case of compressed uploads.

Rework the uploader code, so that early returns throw a AlreadyExists exception. The exception control flow naturally reflects the desire to escape quickly to the top level.

Closes bazelbuild#17791.

PiperOrigin-RevId: 517389227
Change-Id: I23a2ae92fd4ad27dad750418c128c0d0b245e573
  • Loading branch information
benjaminp authored and fweikert committed May 25, 2023
1 parent bbe980e commit b548ca7
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@
import com.google.bytestream.ByteStreamGrpc.ByteStreamFutureStub;
import com.google.bytestream.ByteStreamGrpc.ByteStreamStub;
import com.google.bytestream.ByteStreamProto.QueryWriteStatusRequest;
import com.google.bytestream.ByteStreamProto.QueryWriteStatusResponse;
import com.google.bytestream.ByteStreamProto.WriteRequest;
import com.google.bytestream.ByteStreamProto.WriteResponse;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Ascii;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.util.concurrent.AsyncCallable;
import com.google.common.util.concurrent.Futures;
Expand Down Expand Up @@ -238,6 +236,16 @@ private ListenableFuture<Void> startAsyncUpload(
return currUpload;
}

/**
* Signal that the blob already exists on the server, so upload should complete early but
* successfully.
*/
private static final class AlreadyExists extends Exception {
private AlreadyExists() {
super();
}
}

private static final class AsyncUpload implements AsyncCallable<Long> {
private final RemoteActionExecutionContext context;
private final ReferenceCountedChannel channel;
Expand Down Expand Up @@ -269,28 +277,26 @@ private static final class AsyncUpload implements AsyncCallable<Long> {
}

ListenableFuture<Void> start() {
return Futures.transformAsync(
Utils.refreshIfUnauthenticatedAsync(
() -> retrier.executeAsync(this, progressiveBackoff), callCredentialsProvider),
committedSize -> {
try {
checkCommittedSize(committedSize);
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
return immediateVoidFuture();
},
return Futures.catching(
Futures.transformAsync(
Utils.refreshIfUnauthenticatedAsync(
() -> retrier.executeAsync(this, progressiveBackoff), callCredentialsProvider),
committedSize -> {
try {
checkCommittedSize(committedSize);
} catch (IOException e) {
return Futures.immediateFailedFuture(e);
}
return immediateVoidFuture();
},
MoreExecutors.directExecutor()),
AlreadyExists.class,
ae -> null,
MoreExecutors.directExecutor());
}

/** Check the committed_size the server returned makes sense after a successful full upload. */
private void checkCommittedSize(long committedSize) throws IOException {
// Only check for matching committed size if we have completed the upload. If another client
// did, they might have used a different compression level/algorithm, so we cannot know the
// expected committed offset
if (chunker.hasNext()) {
return;
}

long expected = chunker.getOffset();

if (committedSize == expected) {
Expand Down Expand Up @@ -329,9 +335,6 @@ public ListenableFuture<Long> call() {
firstAttempt ? Futures.immediateFuture(0L) : query(),
committedSize -> {
if (!firstAttempt) {
if (chunker.getSize() == committedSize) {
return Futures.immediateFuture(committedSize);
}
if (committedSize > lastCommittedOffset) {
// We have made progress on this upload in the last request. Reset the backoff so
// that this request has a full deck of retries
Expand Down Expand Up @@ -362,15 +365,18 @@ private ByteStreamStub bsAsyncStub(Channel channel) {

private ListenableFuture<Long> query() {
ListenableFuture<Long> committedSizeFuture =
Futures.transform(
Futures.transformAsync(
channel.withChannelFuture(
channel ->
bsFutureStub(channel)
.queryWriteStatus(
QueryWriteStatusRequest.newBuilder()
.setResourceName(resourceName)
.build())),
QueryWriteStatusResponse::getCommittedSize,
r ->
r.getComplete()
? Futures.immediateFailedFuture(new AlreadyExists())
: Futures.immediateFuture(r.getCommittedSize()),
MoreExecutors.directExecutor());
return Futures.catchingAsync(
committedSizeFuture,
Expand All @@ -392,24 +398,7 @@ private ListenableFuture<Long> upload(long pos) {
channel -> {
SettableFuture<Long> uploadResult = SettableFuture.create();
bsAsyncStub(channel).write(new Writer(resourceName, chunker, pos, uploadResult));
return Futures.catchingAsync(
uploadResult,
Throwable.class,
throwable -> {
Preconditions.checkNotNull(throwable);

Status status = Status.fromThrowable(throwable);
switch (status.getCode()) {
case ALREADY_EXISTS:
// Server indicated the blob already exists, so we translate the error to a
// successful upload.
return Futures.immediateFuture(chunker.getSize());

default:
return Futures.immediateFailedFuture(throwable);
}
},
MoreExecutors.directExecutor());
return uploadResult;
});
}
}
Expand All @@ -423,6 +412,7 @@ private static final class Writer
private long committedSize = -1;
private ClientCallStreamObserver<WriteRequest> requestObserver;
private boolean first = true;
private boolean finishedWriting;

private Writer(
String resourceName, Chunker chunker, long pos, SettableFuture<Long> uploadResult) {
Expand All @@ -447,10 +437,6 @@ public void beforeStart(ClientCallStreamObserver<WriteRequest> requestObserver)

@Override
public void run() {
if (committedSize != -1) {
requestObserver.cancel("server has returned early", null);
return;
}
while (requestObserver.isReady()) {
WriteRequest.Builder request = WriteRequest.newBuilder();
if (first) {
Expand All @@ -477,6 +463,7 @@ public void run() {
.build());
if (isLastChunk) {
requestObserver.onCompleted();
finishedWriting = true;
return;
}
}
Expand Down Expand Up @@ -515,12 +502,22 @@ public void onNext(WriteResponse response) {

@Override
public void onCompleted() {
uploadResult.set(committedSize);
if (finishedWriting) {
uploadResult.set(committedSize);
} else {
// Server completed succesfully before we finished writing all the data, meaning the blob
// already exists. The server is supposed to set committed_size to the size of the blob (for
// uncompressed uploads) or -1 (for compressed uploads), but we do not verify this.
requestObserver.cancel("server has returned early", null);
uploadResult.setException(new AlreadyExists());
}
}

@Override
public void onError(Throwable t) {
uploadResult.setException(t);
requestObserver.cancel("failed", t);
uploadResult.setException(
(Status.fromThrowable(t).getCode() == Code.ALREADY_EXISTS) ? new AlreadyExists() : t);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,65 @@ public void queryWriteStatus(
Mockito.verify(mockBackoff, Mockito.times(1)).getRetryAttempts();
}

@Test
public void progressiveCompressedUploadSeesAlreadyExistsAtTheEnd() throws Exception {
RemoteRetrier retrier =
TestUtils.newRemoteRetrier(
() -> new FixedBackoff(1, 0),
e -> Status.fromThrowable(e).getCode() == Code.INTERNAL,
retryService);
ByteStreamUploader uploader =
new ByteStreamUploader(
INSTANCE_NAME,
referenceCountedChannel,
CallCredentialsProvider.NO_CREDENTIALS,
300,
retrier,
/* maximumOpenFiles= */ -1);

int chunkSize = 1024;
byte[] blob = new byte[chunkSize * 2 + 1];
new Random().nextBytes(blob);

Chunker chunker =
Chunker.builder().setInput(blob).setCompressed(true).setChunkSize(chunkSize).build();
Digest digest = DIGEST_UTIL.compute(blob);

serviceRegistry.addService(
new ByteStreamImplBase() {
@Override
public StreamObserver<WriteRequest> write(StreamObserver<WriteResponse> streamObserver) {
return new StreamObserver<WriteRequest>() {
@Override
public void onNext(WriteRequest writeRequest) {}

@Override
public void onError(Throwable throwable) {
fail("onError should never be called.");
}

@Override
public void onCompleted() {
streamObserver.onError(Status.INTERNAL.asException());
}
};
}

@Override
public void queryWriteStatus(
QueryWriteStatusRequest request, StreamObserver<QueryWriteStatusResponse> response) {
response.onNext(
QueryWriteStatusResponse.newBuilder()
.setCommittedSize(blob.length)
.setComplete(true)
.build());
response.onCompleted();
}
});

uploader.uploadBlob(context, digest, chunker);
}

@Test
public void concurrentlyCompletedUploadIsNotRetried() throws Exception {
// Test that after an upload has failed and the QueryWriteStatus call returns
Expand Down Expand Up @@ -609,8 +668,7 @@ public void queryWriteStatus(

@Test
public void earlyWriteResponseShouldCompleteUpload() throws Exception {
RemoteRetrier retrier =
TestUtils.newRemoteRetrier(() -> mockBackoff, (e) -> true, retryService);
RemoteRetrier retrier = TestUtils.newRemoteRetrier(() -> mockBackoff, e -> false, retryService);
ByteStreamUploader uploader =
new ByteStreamUploader(
INSTANCE_NAME,
Expand Down Expand Up @@ -700,8 +758,7 @@ public void onCompleted() {

@Test
public void incorrectCommittedSizeDoesNotFailIncompleteUpload() throws Exception {
RemoteRetrier retrier =
TestUtils.newRemoteRetrier(() -> mockBackoff, (e) -> true, retryService);
RemoteRetrier retrier = TestUtils.newRemoteRetrier(() -> mockBackoff, e -> false, retryService);
ByteStreamUploader uploader =
new ByteStreamUploader(
INSTANCE_NAME,
Expand Down

0 comments on commit b548ca7

Please sign in to comment.