From c1d1f4a5c88d27296f69df0a832659e2b1eb9ca0 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Tue, 7 May 2024 17:28:57 -0400 Subject: [PATCH] fix: add strict client side response validation for gRPC chunked resumable uploads (#2527) * Rename JsonResumableSessionFailureScenario to ResumableSessionFailureScenario. The failure scenarios themselves are not json specific, and the methods which are json specific can have grpc overloads * Add more tests to validate GapicUnbufferedChunkedResumableWritableByteChannel is able to properly detect and handle various success responses from GCS which are not success for the client. --- ...edChunkedResumableWritableByteChannel.java | 158 ++-- ...apicWritableByteChannelSessionBuilder.java | 8 +- .../storage/JsonResumableSessionPutTask.java | 32 +- .../JsonResumableSessionQueryTask.java | 19 +- ...a => ResumableSessionFailureScenario.java} | 10 +- .../google/cloud/storage/ResumableWrite.java | 8 - .../cloud/storage/RewindableContent.java | 13 + ...unkedResumableWritableByteChannelTest.java | 817 ++++++++++++++++++ ...apicUnbufferedWritableByteChannelTest.java | 6 +- ... ResumableSessionFailureScenarioTest.java} | 14 +- .../com/google/cloud/storage/TestUtils.java | 6 +- 11 files changed, 969 insertions(+), 122 deletions(-) rename google-cloud-storage/src/main/java/com/google/cloud/storage/{JsonResumableSessionFailureScenario.java => ResumableSessionFailureScenario.java} (96%) create mode 100644 google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedChunkedResumableWritableByteChannelTest.java rename google-cloud-storage/src/test/java/com/google/cloud/storage/{JsonResumableSessionFailureScenarioTest.java => ResumableSessionFailureScenarioTest.java} (91%) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedChunkedResumableWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedChunkedResumableWritableByteChannel.java index aa48810d49..bbcd986aaa 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedChunkedResumableWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedChunkedResumableWritableByteChannel.java @@ -23,6 +23,7 @@ import com.google.api.gax.retrying.ResultRetryAlgorithm; import com.google.api.gax.rpc.ApiStreamObserver; import com.google.api.gax.rpc.ClientStreamingCallable; +import com.google.api.gax.rpc.OutOfRangeException; import com.google.cloud.storage.ChunkSegmenter.ChunkSegment; import com.google.cloud.storage.Conversions.Decoder; import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown; @@ -41,10 +42,9 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; -import java.util.function.Consumer; -import java.util.function.LongConsumer; import java.util.function.Supplier; import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; final class GapicUnbufferedChunkedResumableWritableByteChannel implements UnbufferedWritableByteChannel { @@ -58,30 +58,26 @@ final class GapicUnbufferedChunkedResumableWritableByteChannel private final RetryingDependencies deps; private final ResultRetryAlgorithm alg; private final Supplier baseContextSupplier; - private final LongConsumer sizeCallback; - private final Consumer completeCallback; - private boolean open = true; + private volatile boolean open = true; private boolean finished = false; GapicUnbufferedChunkedResumableWritableByteChannel( SettableApiFuture resultFuture, @NonNull ChunkSegmenter chunkSegmenter, ClientStreamingCallable write, - ResumableWrite requestFactory, + WriteCtx writeCtx, RetryingDependencies deps, ResultRetryAlgorithm alg, Supplier baseContextSupplier) { this.resultFuture = resultFuture; this.chunkSegmenter = chunkSegmenter; this.write = write; - this.bucketName = requestFactory.bucketName(); - this.writeCtx = new WriteCtx<>(requestFactory); + this.bucketName = writeCtx.getRequestFactory().bucketName(); + this.writeCtx = writeCtx; this.deps = deps; this.alg = alg; this.baseContextSupplier = baseContextSupplier; - this.sizeCallback = writeCtx.getConfirmedBytes()::set; - this.completeCallback = resultFuture::set; } @Override @@ -106,7 +102,7 @@ public void close() throws IOException { if (open && !finished) { WriteObjectRequest message = finishMessage(true); try { - flush(ImmutableList.of(message)); + flush(ImmutableList.of(message), null, true); finished = true; } catch (RuntimeException e) { resultFuture.setException(e); @@ -122,12 +118,13 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo throw new ClosedChannelException(); } + long begin = writeCtx.getConfirmedBytes().get(); + RewindableContent content = RewindableContent.of(srcs, srcsOffset, srcsLength); ChunkSegment[] data = chunkSegmenter.segmentBuffers(srcs, srcsOffset, srcsLength); List messages = new ArrayList<>(); boolean first = true; - int bytesConsumed = 0; for (ChunkSegment datum : data) { Crc32cLengthKnown crc32c = datum.getCrc32c(); ByteString b = datum.getB(); @@ -144,8 +141,13 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo WriteObjectRequest.Builder builder = writeCtx .newRequestBuilder() + .clearWriteObjectSpec() + .clearObjectChecksums() .setWriteOffset(offset) .setChecksummedData(checksummedData.build()); + if (!first) { + builder.clearUploadId(); + } if (!datum.isOnlyFullBlocks()) { builder.setFinishWrite(true); if (cumulative != null) { @@ -155,10 +157,9 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo finished = true; } - WriteObjectRequest build = possiblyPairDownRequest(builder, first).build(); + WriteObjectRequest build = builder.build(); first = false; messages.add(build); - bytesConsumed += contentSize; } if (finalize && !finished) { messages.add(finishMessage(first)); @@ -166,12 +167,15 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo } try { - flush(messages); + flush(messages, content, finalize); } catch (RuntimeException e) { resultFuture.setException(e); throw e; } + long end = writeCtx.getConfirmedBytes().get(); + + long bytesConsumed = end - begin; return bytesConsumed; } @@ -182,14 +186,20 @@ private WriteObjectRequest finishMessage(boolean first) { WriteObjectRequest.Builder b = writeCtx.newRequestBuilder().setFinishWrite(true).setWriteOffset(offset); + if (!first) { + b.clearUploadId(); + } if (crc32cValue != null) { b.setObjectChecksums(ObjectChecksums.newBuilder().setCrc32C(crc32cValue.getValue()).build()); } - WriteObjectRequest message = possiblyPairDownRequest(b, first).build(); + WriteObjectRequest message = b.build(); return message; } - private void flush(@NonNull List segments) { + private void flush( + @NonNull List segments, + @Nullable RewindableContent content, + boolean finalizing) { GrpcCallContext internalContext = contextWithBucketName(bucketName, baseContextSupplier.get()); ClientStreamingCallable callable = write.withDefaultCallContext(internalContext); @@ -198,7 +208,7 @@ private void flush(@NonNull List segments) { deps, alg, () -> { - Observer observer = new Observer(sizeCallback, completeCallback); + Observer observer = new Observer(content, finalizing); ApiStreamObserver write = callable.clientStreamingCall(observer); for (WriteObjectRequest message : segments) { @@ -211,81 +221,93 @@ private void flush(@NonNull List segments) { Decoder.identity()); } - /** - * Several fields of a WriteObjectRequest are only allowed on the "first" message sent to gcs, - * this utility method centralizes the logic necessary to clear those fields for use by subsequent - * messages. - */ - private static WriteObjectRequest.Builder possiblyPairDownRequest( - WriteObjectRequest.Builder b, boolean firstMessageOfStream) { - if (firstMessageOfStream && b.getWriteOffset() == 0) { - return b; - } - - if (!firstMessageOfStream) { - b.clearUploadId(); - } - - if (b.getWriteOffset() > 0) { - b.clearWriteObjectSpec(); - } - - if (b.getWriteOffset() > 0 && !b.getFinishWrite()) { - b.clearObjectChecksums(); - } - return b; - } - @VisibleForTesting WriteCtx getWriteCtx() { return writeCtx; } - static class Observer implements ApiStreamObserver { + class Observer implements ApiStreamObserver { - private final LongConsumer sizeCallback; - private final Consumer completeCallback; + private final RewindableContent content; + private final boolean finalizing; private final SettableApiFuture invocationHandle; private volatile WriteObjectResponse last; - Observer(LongConsumer sizeCallback, Consumer completeCallback) { - this.sizeCallback = sizeCallback; - this.completeCallback = completeCallback; + Observer(@Nullable RewindableContent content, boolean finalizing) { + this.content = content; + this.finalizing = finalizing; this.invocationHandle = SettableApiFuture.create(); } @Override public void onNext(WriteObjectResponse value) { - // incremental update - if (value.hasPersistedSize()) { - sizeCallback.accept(value.getPersistedSize()); - } else if (value.hasResource()) { - sizeCallback.accept(value.getResource().getSize()); - } last = value; } - /** - * observed exceptions so far - * - *
    - *
  1. {@link com.google.api.gax.rpc.OutOfRangeException} - *
  2. {@link com.google.api.gax.rpc.AlreadyExistsException} - *
  3. {@link io.grpc.StatusRuntimeException} - *
- */ @Override public void onError(Throwable t) { - invocationHandle.setException(t); + if (t instanceof OutOfRangeException) { + OutOfRangeException oore = (OutOfRangeException) t; + open = false; + invocationHandle.setException( + ResumableSessionFailureScenario.SCENARIO_5.toStorageException()); + } else { + invocationHandle.setException(t); + } } @Override public void onCompleted() { - if (last != null && last.hasResource()) { - completeCallback.accept(last); + try { + if (last == null) { + throw new StorageException( + 0, "onComplete without preceding onNext, unable to determine success."); + } else if (!finalizing && last.hasPersistedSize()) { // incremental + long totalSentBytes = writeCtx.getTotalSentBytes().get(); + long persistedSize = last.getPersistedSize(); + + if (totalSentBytes == persistedSize) { + writeCtx.getConfirmedBytes().set(persistedSize); + } else if (persistedSize < totalSentBytes) { + long delta = totalSentBytes - persistedSize; + // rewind our content and any state that my have run ahead of the actual ack'd bytes + content.rewindTo(delta); + writeCtx.getTotalSentBytes().set(persistedSize); + writeCtx.getConfirmedBytes().set(persistedSize); + } else { + throw ResumableSessionFailureScenario.SCENARIO_7.toStorageException(); + } + } else if (finalizing && last.hasResource()) { + long totalSentBytes = writeCtx.getTotalSentBytes().get(); + long finalSize = last.getResource().getSize(); + if (totalSentBytes == finalSize) { + writeCtx.getConfirmedBytes().set(finalSize); + resultFuture.set(last); + } else if (finalSize < totalSentBytes) { + throw ResumableSessionFailureScenario.SCENARIO_4_1.toStorageException(); + } else { + throw ResumableSessionFailureScenario.SCENARIO_4_2.toStorageException(); + } + } else if (!finalizing && last.hasResource()) { + throw ResumableSessionFailureScenario.SCENARIO_1.toStorageException(); + } else if (finalizing && last.hasPersistedSize()) { + long totalSentBytes = writeCtx.getTotalSentBytes().get(); + long persistedSize = last.getPersistedSize(); + if (persistedSize < totalSentBytes) { + throw ResumableSessionFailureScenario.SCENARIO_3.toStorageException(); + } else { + throw ResumableSessionFailureScenario.SCENARIO_2.toStorageException(); + } + } else { + throw ResumableSessionFailureScenario.SCENARIO_0.toStorageException(); + } + } catch (Throwable se) { + open = false; + invocationHandle.setException(se); + } finally { + invocationHandle.set(null); } - invocationHandle.set(null); } void await() { diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java index 1c6ad18c04..8854053322 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java @@ -299,13 +299,13 @@ UnbufferedWritableByteChannelSession build() { result, getChunkSegmenter(), write, - ResumableWrite.identity(start), + new WriteCtx<>(start), deps, alg, Retrying::newCallContext); } else { return new GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel( - result, getChunkSegmenter(), write, ResumableWrite.identity(start)); + result, getChunkSegmenter(), write, start); } }) .andThen(StorageByteChannels.writable()::createSynchronized)); @@ -340,13 +340,13 @@ BufferedWritableByteChannelSession build() { result, getChunkSegmenter(), write, - ResumableWrite.identity(start), + new WriteCtx<>(start), deps, alg, Retrying::newCallContext); } else { return new GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel( - result, getChunkSegmenter(), write, ResumableWrite.identity(start)); + result, getChunkSegmenter(), write, start); } }) .andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c)) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java index 622988d4cb..9d3afa5a83 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java @@ -101,7 +101,7 @@ public void rewindTo(long offset) { int code = response.getStatusCode(); - if (!finalizing && JsonResumableSessionFailureScenario.isContinue(code)) { + if (!finalizing && ResumableSessionFailureScenario.isContinue(code)) { long effectiveEnd = ((HttpContentRange.HasRange) contentRange).range().endOffset(); @Nullable String range = response.getHeaders().getRange(); ByteRangeSpec ackRange = ByteRangeSpec.parse(range); @@ -114,11 +114,11 @@ public void rewindTo(long offset) { return ResumableOperationResult.incremental(ackRange.endOffset()); } else { StorageException se = - JsonResumableSessionFailureScenario.SCENARIO_7.toStorageException(uploadId, response); + ResumableSessionFailureScenario.SCENARIO_7.toStorageException(uploadId, response); span.setStatus(Status.UNKNOWN.withDescription(se.getMessage())); throw se; } - } else if (finalizing && JsonResumableSessionFailureScenario.isOk(code)) { + } else if (finalizing && ResumableSessionFailureScenario.isOk(code)) { @Nullable StorageObject storageObject; BigInteger actualSize = BigInteger.ZERO; @@ -145,7 +145,7 @@ public void rewindTo(long offset) { } else { response.ignore(); StorageException se = - JsonResumableSessionFailureScenario.SCENARIO_0_1.toStorageException( + ResumableSessionFailureScenario.SCENARIO_0_1.toStorageException( uploadId, response, null, () -> null); span.setStatus(Status.UNKNOWN.withDescription(se.getMessage())); throw se; @@ -158,35 +158,35 @@ public void rewindTo(long offset) { return ResumableOperationResult.complete(storageObject, actualSize.longValue()); } else if (compare > 0) { StorageException se = - JsonResumableSessionFailureScenario.SCENARIO_4_1.toStorageException( + ResumableSessionFailureScenario.SCENARIO_4_1.toStorageException( uploadId, response, null, toString(storageObject)); span.setStatus(Status.UNKNOWN.withDescription(se.getMessage())); throw se; } else { StorageException se = - JsonResumableSessionFailureScenario.SCENARIO_4_2.toStorageException( + ResumableSessionFailureScenario.SCENARIO_4_2.toStorageException( uploadId, response, null, toString(storageObject)); span.setStatus(Status.UNKNOWN.withDescription(se.getMessage())); throw se; } - } else if (!finalizing && JsonResumableSessionFailureScenario.isOk(code)) { + } else if (!finalizing && ResumableSessionFailureScenario.isOk(code)) { StorageException se = - JsonResumableSessionFailureScenario.SCENARIO_1.toStorageException(uploadId, response); + ResumableSessionFailureScenario.SCENARIO_1.toStorageException(uploadId, response); span.setStatus(Status.UNKNOWN.withDescription(se.getMessage())); throw se; - } else if (finalizing && JsonResumableSessionFailureScenario.isContinue(code)) { + } else if (finalizing && ResumableSessionFailureScenario.isContinue(code)) { // in order to finalize the content range must have a size, cast down to read it HttpContentRange.HasSize size = (HttpContentRange.HasSize) contentRange; ByteRangeSpec range = ByteRangeSpec.parse(response.getHeaders().getRange()); if (range.endOffsetInclusive() < size.getSize()) { StorageException se = - JsonResumableSessionFailureScenario.SCENARIO_3.toStorageException(uploadId, response); + ResumableSessionFailureScenario.SCENARIO_3.toStorageException(uploadId, response); span.setStatus(Status.UNKNOWN.withDescription(se.getMessage())); throw se; } else { StorageException se = - JsonResumableSessionFailureScenario.SCENARIO_2.toStorageException(uploadId, response); + ResumableSessionFailureScenario.SCENARIO_2.toStorageException(uploadId, response); span.setStatus(Status.UNKNOWN.withDescription(se.getMessage())); throw se; } @@ -198,8 +198,8 @@ public void rewindTo(long offset) { // a 503 with plain text content // Attempt to detect this very loosely as to minimize impact of modified error message // This is accurate circa 2023-06 - if ((!JsonResumableSessionFailureScenario.isOk(code) - && !JsonResumableSessionFailureScenario.isContinue(code)) + if ((!ResumableSessionFailureScenario.isOk(code) + && !ResumableSessionFailureScenario.isContinue(code)) && contentType != null && contentType.startsWith("text/plain") && contentLength != null @@ -207,14 +207,14 @@ public void rewindTo(long offset) { String errorMessage = cause.getContent().toLowerCase(Locale.US); if (errorMessage.contains("content-range")) { StorageException se = - JsonResumableSessionFailureScenario.SCENARIO_5.toStorageException( + ResumableSessionFailureScenario.SCENARIO_5.toStorageException( uploadId, response, cause, cause::getContent); span.setStatus(Status.UNKNOWN.withDescription(se.getMessage())); throw se; } } StorageException se = - JsonResumableSessionFailureScenario.toStorageException(response, cause, uploadId); + ResumableSessionFailureScenario.toStorageException(response, cause, uploadId); span.setStatus(Status.UNKNOWN.withDescription(se.getMessage())); throw se; } @@ -227,7 +227,7 @@ public void rewindTo(long offset) { throw e; } catch (Exception e) { StorageException se = - JsonResumableSessionFailureScenario.SCENARIO_0.toStorageException(uploadId, response, e); + ResumableSessionFailureScenario.SCENARIO_0.toStorageException(uploadId, response, e); span.setStatus(Status.UNKNOWN.withDescription(se.getMessage())); throw se; } finally { diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionQueryTask.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionQueryTask.java index 5ce0de6fe3..57c5868c8e 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionQueryTask.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionQueryTask.java @@ -55,7 +55,7 @@ final class JsonResumableSessionQueryTask response = req.execute(); int code = response.getStatusCode(); - if (JsonResumableSessionFailureScenario.isOk(code)) { + if (ResumableSessionFailureScenario.isOk(code)) { @Nullable StorageObject storageObject; @Nullable BigInteger actualSize; @@ -74,7 +74,7 @@ final class JsonResumableSessionQueryTask storageObject = null; } else { response.ignore(); - throw JsonResumableSessionFailureScenario.SCENARIO_0_1.toStorageException( + throw ResumableSessionFailureScenario.SCENARIO_0_1.toStorageException( uploadId, response, null, () -> null); } if (actualSize != null) { @@ -84,13 +84,13 @@ final class JsonResumableSessionQueryTask return ResumableOperationResult.incremental(actualSize.longValue()); } } else { - throw JsonResumableSessionFailureScenario.SCENARIO_0.toStorageException( + throw ResumableSessionFailureScenario.SCENARIO_0.toStorageException( uploadId, response, null, () -> storageObject != null ? storageObject.toString() : null); } - } else if (JsonResumableSessionFailureScenario.isContinue(code)) { + } else if (ResumableSessionFailureScenario.isContinue(code)) { String range1 = response.getHeaders().getRange(); if (range1 != null) { ByteRangeSpec range = ByteRangeSpec.parse(range1); @@ -110,23 +110,22 @@ final class JsonResumableSessionQueryTask // a 503 with plain text content // Attempt to detect this very loosely as to minimize impact of modified error message // This is accurate circa 2023-06 - if ((!JsonResumableSessionFailureScenario.isOk(code) - && !JsonResumableSessionFailureScenario.isContinue(code)) + if ((!ResumableSessionFailureScenario.isOk(code) + && !ResumableSessionFailureScenario.isContinue(code)) && contentType != null && contentType.startsWith("text/plain")) { String errorMessage = cause.getContent().toLowerCase(Locale.US); if (errorMessage.contains("content-range")) { - throw JsonResumableSessionFailureScenario.SCENARIO_5.toStorageException( + throw ResumableSessionFailureScenario.SCENARIO_5.toStorageException( uploadId, response, cause, cause::getContent); } } - throw JsonResumableSessionFailureScenario.toStorageException(response, cause, uploadId); + throw ResumableSessionFailureScenario.toStorageException(response, cause, uploadId); } } catch (StorageException se) { throw se; } catch (Exception e) { - throw JsonResumableSessionFailureScenario.SCENARIO_0.toStorageException( - uploadId, response, e); + throw ResumableSessionFailureScenario.SCENARIO_0.toStorageException(uploadId, response, e); } finally { if (response != null) { try { diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionFailureScenario.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableSessionFailureScenario.java similarity index 96% rename from google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionFailureScenario.java rename to google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableSessionFailureScenario.java index 2e87117800..0bccb2ff78 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionFailureScenario.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableSessionFailureScenario.java @@ -32,7 +32,7 @@ import org.checkerframework.checker.nullness.qual.Nullable; @ParametersAreNonnullByDefault -enum JsonResumableSessionFailureScenario { +enum ResumableSessionFailureScenario { // TODO: send more bytes than are in the Content-Range header SCENARIO_0(BaseServiceException.UNKNOWN_CODE, null, "Unknown Error"), SCENARIO_0_1(BaseServiceException.UNKNOWN_CODE, null, "Response not application/json."), @@ -87,7 +87,7 @@ enum JsonResumableSessionFailureScenario { @Nullable private final String reason; private final String message; - JsonResumableSessionFailureScenario(int code, @Nullable String reason, String message) { + ResumableSessionFailureScenario(int code, @Nullable String reason, String message) { this.code = code; this.reason = reason; this.message = message; @@ -116,11 +116,15 @@ StorageException toStorageException( return toStorageException(code, message, reason, uploadId, resp, cause, contentCallable); } + StorageException toStorageException() { + return new StorageException(code, message, reason, null); + } + static StorageException toStorageException( HttpResponse response, HttpResponseException cause, String uploadId) { String statusMessage = cause.getStatusMessage(); StorageException se = - JsonResumableSessionFailureScenario.toStorageException( + ResumableSessionFailureScenario.toStorageException( cause.getStatusCode(), String.format( "%d %s", cause.getStatusCode(), statusMessage == null ? "" : statusMessage), diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableWrite.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableWrite.java index 75921032de..b7bee9854b 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableWrite.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableWrite.java @@ -84,12 +84,4 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(req, res); } - - /** - * Helper function which is more specific than {@link Function#identity()}. Constraining the input - * and output to be exactly {@link ResumableWrite}. - */ - static ResumableWrite identity(ResumableWrite w) { - return w; - } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/RewindableContent.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/RewindableContent.java index 437ef49c6e..03d9cc4627 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/RewindableContent.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/RewindableContent.java @@ -61,6 +61,19 @@ static RewindableContent of(ByteBuffer... buffers) { return new ByteBufferContent(buffers); } + public static RewindableContent of(ByteBuffer[] srcs, int srcsOffset, int srcsLength) { + Preconditions.checkNotNull(srcs, "srcs must be non null"); + if (!(0 <= srcsOffset && srcsOffset <= srcs.length)) { + throw new ArrayIndexOutOfBoundsException( + String.format( + "srcsOffset out of bounds (0 <= %d && %d <= %d)", + srcsOffset, srcsOffset, srcs.length)); + } + Preconditions.checkArgument(srcsLength >= 0, "srcsLength >= 0 (%d >= 0)", srcsLength); + int end = srcsOffset + srcsLength; + return new ByteBufferContent(Arrays.copyOfRange(srcs, srcsOffset, end)); + } + static RewindableContent of(Path path) throws IOException { return new PathRewindableContent(path); } diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedChunkedResumableWritableByteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedChunkedResumableWritableByteChannelTest.java new file mode 100644 index 0000000000..3170afb750 --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedChunkedResumableWritableByteChannelTest.java @@ -0,0 +1,817 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import static com.google.cloud.storage.ByteSizeConstants._256KiB; +import static com.google.cloud.storage.ByteSizeConstants._512KiB; +import static com.google.cloud.storage.ByteSizeConstants._768KiB; +import static com.google.cloud.storage.TestUtils.assertAll; +import static com.google.common.truth.Truth.assertThat; +import static com.google.common.truth.Truth.assertWithMessage; +import static org.junit.Assert.assertThrows; + +import com.google.api.core.SettableApiFuture; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.cloud.storage.ITGapicUnbufferedWritableByteChannelTest.DirectWriteService; +import com.google.cloud.storage.Retrying.RetryingDependencies; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.protobuf.ByteString; +import com.google.storage.v2.ChecksummedData; +import com.google.storage.v2.Object; +import com.google.storage.v2.StartResumableWriteRequest; +import com.google.storage.v2.StartResumableWriteResponse; +import com.google.storage.v2.StorageClient; +import com.google.storage.v2.StorageGrpc.StorageImplBase; +import com.google.storage.v2.WriteObjectRequest; +import com.google.storage.v2.WriteObjectResponse; +import io.grpc.Status.Code; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.TimeUnit; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.junit.Test; + +public final class ITGapicUnbufferedChunkedResumableWritableByteChannelTest { + + private static final ChunkSegmenter CHUNK_SEGMENTER = + new ChunkSegmenter(Hasher.noop(), ByteStringStrategy.copy(), _256KiB, _256KiB); + + /** + * + * + *

S.1

+ * + * Attempting to append to a session which has already been finalized should raise an error + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
server state
+   * resource = { name = obj, size = 524288 }
+   *     
client state
+   * write_offset = 0, data = [0:262144]
+   *     
request
+   * WriteObjectRequest{ upload_id = $UPLOAD_ID, write_offset= 0, checksummed_data.content.length = 262144 }
+   *     
response
+   * onNext(WriteObjectResponse{ resources = {name = obj, size = 525288 } })
+   *     
+ */ + @Test + public void scenario1() throws Exception { + + String uploadId = "uploadId"; + WriteObjectRequest req1 = + WriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setChecksummedData( + ChecksummedData.newBuilder() + .setContent( + ByteString.copyFrom(DataGenerator.base64Characters().genBytes(_256KiB))) + .build()) + .build(); + WriteObjectResponse resp1 = + WriteObjectResponse.newBuilder() + .setResource(Object.newBuilder().setName("obj").setSize(_512KiB).build()) + .build(); + + ImmutableMap, WriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + DirectWriteService service1 = new DirectWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + ResumableWrite resumableWrite = getResumableWrite(uploadId); + + WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + SettableApiFuture done = SettableApiFuture.create(); + //noinspection resource + GapicUnbufferedChunkedResumableWritableByteChannel channel = + new GapicUnbufferedChunkedResumableWritableByteChannel( + done, + CHUNK_SEGMENTER, + storageClient.writeObjectCallable(), + writeCtx, + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + GrpcCallContext::createDefault); + + ByteBuffer bb = DataGenerator.base64Characters().genByteBuffer(_256KiB); + StorageException se = assertThrows(StorageException.class, () -> channel.write(bb)); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("invalid"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(0), + () -> assertThat(channel.isOpen()).isFalse()); + } + } + + /** + * + * + *

S.2

+ * + * Attempting to finalize a session with fewer bytes than GCS acknowledges. + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
server state
+   * persisted_size = 524288
+   *     
client state
+   * write_offset = 262144, finish = true
+   *     
request
+   * WriteObjectRequest{ upload_id = $UPLOAD_ID, write_offset = 262144, finish_write = true}
+   *     
response
+   * onNext(WriteObjectResponse{ persisted_size = 525288 })
+   *     
+ */ + @Test + public void scenario2() throws Exception { + String uploadId = "uploadId"; + WriteObjectRequest req1 = + WriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setWriteOffset(_256KiB) + .setFinishWrite(true) + .build(); + WriteObjectResponse resp1 = WriteObjectResponse.newBuilder().setPersistedSize(_512KiB).build(); + + ImmutableMap, WriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + DirectWriteService service1 = new DirectWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + ResumableWrite resumableWrite = getResumableWrite(uploadId); + WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + writeCtx.getTotalSentBytes().set(_256KiB); + writeCtx.getConfirmedBytes().set(_256KiB); + + //noinspection resource + GapicUnbufferedChunkedResumableWritableByteChannel channel = + new GapicUnbufferedChunkedResumableWritableByteChannel( + done, + CHUNK_SEGMENTER, + storageClient.writeObjectCallable(), + writeCtx, + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + GrpcCallContext::createDefault); + + StorageException se = assertThrows(StorageException.class, channel::close); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("invalid"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_256KiB), + () -> assertThat(channel.isOpen()).isFalse()); + } + } + + /** + * + * + *

S.3

+ * + * Attempting to finalize a session with more bytes than GCS acknowledges. + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
server state
+   * persisted_size = 262144
+   *     
client state
+   * write_offset = 524288, finish = true
+   *     
request
+   * WriteObjectRequest{ upload_id = $UPLOAD_ID, write_offset = 524288, finish_write = true}
+   *     
response
+   * onNext(WriteObjectResponse{ persisted_size = 262144 })
+   *     
+ */ + @Test + public void scenario3() throws Exception { + String uploadId = "uploadId"; + WriteObjectRequest req1 = + WriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setWriteOffset(_512KiB) + .setFinishWrite(true) + .build(); + WriteObjectResponse resp1 = WriteObjectResponse.newBuilder().setPersistedSize(_256KiB).build(); + + ImmutableMap, WriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + DirectWriteService service1 = new DirectWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + ResumableWrite resumableWrite = getResumableWrite(uploadId); + WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + writeCtx.getTotalSentBytes().set(_512KiB); + writeCtx.getConfirmedBytes().set(_512KiB); + + //noinspection resource + GapicUnbufferedChunkedResumableWritableByteChannel channel = + new GapicUnbufferedChunkedResumableWritableByteChannel( + done, + CHUNK_SEGMENTER, + storageClient.writeObjectCallable(), + writeCtx, + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + GrpcCallContext::createDefault); + + StorageException se = assertThrows(StorageException.class, channel::close); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_512KiB), + () -> assertThat(channel.isOpen()).isFalse()); + } + } + + /** + * + * + *

S.4

+ * + * Attempting to finalize an already finalized session + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
server state
+   * resource = {name = obj1, size = 262144}
+   *     
client state
+   * write_offset = 262144, finish = true
+   *     
request
+   * WriteObjectRequest{ upload_id = $UPLOAD_ID, write_offset = 262144, finish_write = true}
+   *     
response
+   * onNext(WriteObjectResponse{ resources = {name = obj, size = 262144 } })
+   *     
+ */ + @Test + public void scenario4() throws Exception { + String uploadId = "uploadId"; + WriteObjectRequest req1 = + WriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setWriteOffset(_256KiB) + .setFinishWrite(true) + .build(); + WriteObjectResponse resp1 = + WriteObjectResponse.newBuilder() + .setResource(Object.newBuilder().setName("name").setSize(_256KiB).build()) + .build(); + + ImmutableMap, WriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + DirectWriteService service1 = new DirectWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + ResumableWrite resumableWrite = getResumableWrite(uploadId); + WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + writeCtx.getTotalSentBytes().set(_256KiB); + writeCtx.getConfirmedBytes().set(_256KiB); + + GapicUnbufferedChunkedResumableWritableByteChannel channel = + new GapicUnbufferedChunkedResumableWritableByteChannel( + done, + CHUNK_SEGMENTER, + storageClient.writeObjectCallable(), + writeCtx, + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + GrpcCallContext::createDefault); + + channel.close(); + + WriteObjectResponse writeObjectResponse = done.get(2, TimeUnit.SECONDS); + assertThat(writeObjectResponse).isEqualTo(resp1); + } + } + + /** + * + * + *

S.4.1

+ * + * Attempting to finalize an already finalized session (ack < expected) + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
server state
+   * resource = {name = obj1, size = 262144}
+   *     
client state
+   * write_offset = 524288, finish = true
+   *     
request
+   * WriteObjectRequest{ upload_id = $UPLOAD_ID, write_offset = 524288, finish_write = true}
+   *     
response
+   * onNext(WriteObjectResponse{ resources = {name = obj, size = 262144 } })
+   *     
+ */ + @Test + public void scenario4_1() throws Exception { + String uploadId = "uploadId"; + WriteObjectRequest req1 = + WriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setWriteOffset(_512KiB) + .setFinishWrite(true) + .build(); + WriteObjectResponse resp1 = + WriteObjectResponse.newBuilder() + .setResource(Object.newBuilder().setName("name").setSize(_256KiB).build()) + .build(); + + ImmutableMap, WriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + DirectWriteService service1 = new DirectWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + ResumableWrite resumableWrite = getResumableWrite(uploadId); + WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + writeCtx.getTotalSentBytes().set(_512KiB); + writeCtx.getConfirmedBytes().set(_512KiB); + + //noinspection resource + GapicUnbufferedChunkedResumableWritableByteChannel channel = + new GapicUnbufferedChunkedResumableWritableByteChannel( + done, + CHUNK_SEGMENTER, + storageClient.writeObjectCallable(), + writeCtx, + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + GrpcCallContext::createDefault); + + StorageException se = assertThrows(StorageException.class, channel::close); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_512KiB), + () -> assertThat(channel.isOpen()).isFalse()); + } + } + + /** + * + * + *

S.4.2

+ * + * Attempting to finalize an already finalized session (ack > expected) + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
server state
+   * resource = {name = obj1, size = 786432}
+   *     
client state
+   * write_offset = 524288, finish = true
+   *     
request
+   * WriteObjectRequest{ upload_id = $UPLOAD_ID, write_offset = 524288, finish_write = true}
+   *     
response
+   * onNext(WriteObjectResponse{ resources = {name = obj, size = 786432 } })
+   *     
+ */ + @Test + public void scenario4_2() throws Exception { + String uploadId = "uploadId"; + WriteObjectRequest req1 = + WriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setWriteOffset(_512KiB) + .setFinishWrite(true) + .build(); + WriteObjectResponse resp1 = + WriteObjectResponse.newBuilder() + .setResource(Object.newBuilder().setName("name").setSize(_768KiB).build()) + .build(); + + ImmutableMap, WriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + DirectWriteService service1 = new DirectWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + ResumableWrite resumableWrite = getResumableWrite(uploadId); + WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + writeCtx.getTotalSentBytes().set(_512KiB); + writeCtx.getConfirmedBytes().set(_512KiB); + + //noinspection resource + GapicUnbufferedChunkedResumableWritableByteChannel channel = + new GapicUnbufferedChunkedResumableWritableByteChannel( + done, + CHUNK_SEGMENTER, + storageClient.writeObjectCallable(), + writeCtx, + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + GrpcCallContext::createDefault); + + StorageException se = assertThrows(StorageException.class, channel::close); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_512KiB), + () -> assertThat(channel.isOpen()).isFalse()); + } + } + + /** + * + * + *

S.5

+ * + * Attempt to append to a resumable session with an offset higher than GCS expects + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + * + *
server state
+   * persisted_size = 0
+   *     
client state
+   * write_offset = 262144, data = [262144:524288]
+   *     
request
+   * WriteObjectRequest{ upload_id = $UPLOAD_ID, write_offset = 262144, checksummed_data.content.length = 262144}
+   *     
response
+   * onError(Status{code=OUT_OF_RANGE, description="Upload request started at offset '262144', which is past expected offset '0'."})
+   *     
+ */ + @Test + public void scenario5() throws Exception { + String uploadId = "uploadId"; + WriteObjectRequest req1 = + WriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setWriteOffset(_256KiB) + .setChecksummedData( + ChecksummedData.newBuilder() + .setContent( + ByteString.copyFrom(DataGenerator.base64Characters().genBytes(_256KiB)))) + .build(); + StorageImplBase service1 = + new DirectWriteService( + (obs, requests) -> { + if (requests.equals(ImmutableList.of(req1))) { + obs.onError( + TestUtils.apiException( + Code.OUT_OF_RANGE, + "Upload request started at offset '262144', which is past expected offset '0'.")); + } else { + obs.onError( + TestUtils.apiException(Code.PERMISSION_DENIED, "Unexpected request chain.")); + } + }); + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + ResumableWrite resumableWrite = getResumableWrite(uploadId); + WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + writeCtx.getTotalSentBytes().set(_256KiB); + writeCtx.getConfirmedBytes().set(_256KiB); + + //noinspection resource + GapicUnbufferedChunkedResumableWritableByteChannel channel = + new GapicUnbufferedChunkedResumableWritableByteChannel( + done, + CHUNK_SEGMENTER, + storageClient.writeObjectCallable(), + writeCtx, + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + GrpcCallContext::createDefault); + + ByteBuffer bb = DataGenerator.base64Characters().genByteBuffer(_256KiB); + StorageException se = assertThrows(StorageException.class, () -> channel.write(bb)); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_256KiB), + () -> assertThat(channel.isOpen()).isFalse()); + } + } + + /** + * + * + *

S.7

+ * + * GCS Acknowledges more bytes than were sent in the PUT + * + *

The client believes the server offset is N, it sends K bytes and the server responds that N + * + 2K bytes are now committed. + * + *

The client has detected data loss and should raise an error and prevent sending of more + * bytes. + */ + @Test + public void scenario7() throws Exception { + + String uploadId = "uploadId"; + WriteObjectRequest req1 = + WriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setChecksummedData( + ChecksummedData.newBuilder() + .setContent( + ByteString.copyFrom(DataGenerator.base64Characters().genBytes(_256KiB))) + .build()) + .build(); + WriteObjectResponse resp1 = WriteObjectResponse.newBuilder().setPersistedSize(_512KiB).build(); + + ImmutableMap, WriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + DirectWriteService service1 = new DirectWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + ResumableWrite resumableWrite = getResumableWrite(uploadId); + WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + + //noinspection resource + GapicUnbufferedChunkedResumableWritableByteChannel channel = + new GapicUnbufferedChunkedResumableWritableByteChannel( + done, + CHUNK_SEGMENTER, + storageClient.writeObjectCallable(), + writeCtx, + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + GrpcCallContext::createDefault); + + ByteBuffer buf = DataGenerator.base64Characters().genByteBuffer(_256KiB); + StorageException se = assertThrows(StorageException.class, () -> channel.write(buf)); + assertAll( + () -> assertThat(se.getCode()).isEqualTo(0), + () -> assertThat(se.getReason()).isEqualTo("dataLoss"), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(0), + () -> assertThat(channel.isOpen()).isFalse()); + } + } + + @Test + public void incremental_success() throws Exception { + String uploadId = "uploadId"; + WriteObjectRequest req1 = + WriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setChecksummedData( + ChecksummedData.newBuilder() + .setContent( + ByteString.copyFrom(DataGenerator.base64Characters().genBytes(_256KiB))) + .build()) + .build(); + WriteObjectResponse resp1 = WriteObjectResponse.newBuilder().setPersistedSize(_256KiB).build(); + + ImmutableMap, WriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + DirectWriteService service1 = new DirectWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + ResumableWrite resumableWrite = getResumableWrite(uploadId); + WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + + //noinspection resource + GapicUnbufferedChunkedResumableWritableByteChannel channel = + new GapicUnbufferedChunkedResumableWritableByteChannel( + done, + CHUNK_SEGMENTER, + storageClient.writeObjectCallable(), + writeCtx, + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + GrpcCallContext::createDefault); + + ByteBuffer buf = DataGenerator.base64Characters().genByteBuffer(_256KiB); + int written = channel.write(buf); + assertAll( + () -> assertThat(buf.remaining()).isEqualTo(0), + () -> assertThat(written).isEqualTo(_256KiB), + () -> assertThat(writeCtx.getTotalSentBytes().get()).isEqualTo(_256KiB), + () -> assertThat(writeCtx.getConfirmedBytes().get()).isEqualTo(_256KiB)); + } + } + + @Test + public void incremental_partialSuccess() throws Exception { + String uploadId = "uploadId"; + WriteObjectRequest req1 = + WriteObjectRequest.newBuilder() + .setUploadId(uploadId) + .setChecksummedData( + ChecksummedData.newBuilder() + .setContent( + ByteString.copyFrom(DataGenerator.base64Characters().genBytes(_512KiB))) + .build()) + .build(); + WriteObjectResponse resp1 = WriteObjectResponse.newBuilder().setPersistedSize(_256KiB).build(); + + ImmutableMap, WriteObjectResponse> map = + ImmutableMap.of(ImmutableList.of(req1), resp1); + DirectWriteService service1 = new DirectWriteService(map); + + try (FakeServer fakeServer = FakeServer.of(service1); + GrpcStorageImpl storage = + (GrpcStorageImpl) fakeServer.getGrpcStorageOptions().getService()) { + StorageClient storageClient = storage.storageClient; + + SettableApiFuture done = SettableApiFuture.create(); + ResumableWrite resumableWrite = getResumableWrite(uploadId); + WriteCtx writeCtx = new WriteCtx<>(resumableWrite); + + ChunkSegmenter chunkSegmenter = + new ChunkSegmenter(Hasher.noop(), ByteStringStrategy.copy(), _512KiB, _256KiB); + //noinspection resource + GapicUnbufferedChunkedResumableWritableByteChannel channel = + new GapicUnbufferedChunkedResumableWritableByteChannel( + done, + chunkSegmenter, + storageClient.writeObjectCallable(), + writeCtx, + RetryingDependencies.attemptOnce(), + Retrying.neverRetry(), + GrpcCallContext::createDefault); + + ByteBuffer buf = DataGenerator.base64Characters().genByteBuffer(_512KiB); + int written = channel.write(buf); + assertAll( + () -> assertThat(buf.remaining()).isEqualTo(_256KiB), + () -> assertThat(written).isEqualTo(_256KiB), + () -> + assertWithMessage("totalSentBytes") + .that(writeCtx.getTotalSentBytes().get()) + .isEqualTo(_256KiB), + () -> + assertWithMessage("confirmedBytes") + .that(writeCtx.getConfirmedBytes().get()) + .isEqualTo(_256KiB)); + } + } + + private static @NonNull ResumableWrite getResumableWrite(String uploadId) { + StartResumableWriteRequest req = StartResumableWriteRequest.getDefaultInstance(); + StartResumableWriteResponse resp = + StartResumableWriteResponse.newBuilder().setUploadId(uploadId).build(); + return new ResumableWrite( + req, resp, id -> WriteObjectRequest.newBuilder().setUploadId(id).build()); + } +} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedWritableByteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedWritableByteChannelTest.java index edbc8006de..239b9bf3e6 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedWritableByteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedWritableByteChannelTest.java @@ -183,7 +183,7 @@ public void resumableUpload() throws IOException, InterruptedException, Executio result, segmenter, sc.writeObjectCallable(), - reqFactory, + new WriteCtx<>(reqFactory), RetryingDependencies.attemptOnce(), Retrying.neverRetry(), Retrying::newCallContext); @@ -266,7 +266,7 @@ public void resumableUpload_chunkAutomaticRetry() result, segmenter, sc.writeObjectCallable(), - reqFactory, + new WriteCtx<>(reqFactory), TestUtils.defaultRetryingDeps(), new BasicResultRetryAlgorithm() { @Override @@ -323,7 +323,7 @@ public void resumableUpload_finalizeWhenWriteAndCloseCalledEvenWhenQuantumAligne result, segmenter, sc.writeObjectCallable(), - reqFactory, + new WriteCtx<>(reqFactory), RetryingDependencies.attemptOnce(), Retrying.neverRetry(), Retrying::newCallContext); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/JsonResumableSessionFailureScenarioTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ResumableSessionFailureScenarioTest.java similarity index 91% rename from google-cloud-storage/src/test/java/com/google/cloud/storage/JsonResumableSessionFailureScenarioTest.java rename to google-cloud-storage/src/test/java/com/google/cloud/storage/ResumableSessionFailureScenarioTest.java index f40cf27f47..0a53a4bbc6 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/JsonResumableSessionFailureScenarioTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ResumableSessionFailureScenarioTest.java @@ -16,8 +16,8 @@ package com.google.cloud.storage; -import static com.google.cloud.storage.JsonResumableSessionFailureScenario.isContinue; -import static com.google.cloud.storage.JsonResumableSessionFailureScenario.isOk; +import static com.google.cloud.storage.ResumableSessionFailureScenario.isContinue; +import static com.google.cloud.storage.ResumableSessionFailureScenario.isOk; import static com.google.common.truth.Truth.assertThat; import com.google.api.client.http.EmptyContent; @@ -33,7 +33,7 @@ import java.nio.charset.StandardCharsets; import org.junit.Test; -public final class JsonResumableSessionFailureScenarioTest { +public final class ResumableSessionFailureScenarioTest { private static final GsonFactory gson = GsonFactory.getDefaultInstance(); @Test @@ -64,7 +64,7 @@ public void toStorageException_ioExceptionDuringContentResolutionAddedAsSuppress resp.getHeaders().setContentType("text/plain; charset=utf-8").setContentLength(5L); StorageException storageException = - JsonResumableSessionFailureScenario.SCENARIO_1.toStorageException( + ResumableSessionFailureScenario.SCENARIO_1.toStorageException( "uploadId", resp, new Cause(), @@ -105,7 +105,7 @@ public void multilineResponseBodyIsProperlyPrefixed() throws Exception { .setContentLength((long) bytes.length); StorageException storageException = - JsonResumableSessionFailureScenario.SCENARIO_0.toStorageException( + ResumableSessionFailureScenario.SCENARIO_0.toStorageException( "uploadId", resp, null, () -> json); assertThat(storageException.getCode()).isEqualTo(0); @@ -128,7 +128,7 @@ public void xGoogStoredHeadersIncludedIfPresent() throws IOException { .setContentLength(0L); StorageException storageException = - JsonResumableSessionFailureScenario.SCENARIO_0.toStorageException( + ResumableSessionFailureScenario.SCENARIO_0.toStorageException( "uploadId", resp, null, () -> null); assertThat(storageException.getCode()).isEqualTo(0); @@ -151,7 +151,7 @@ public void xGoogGcsIdempotencyTokenHeadersIncludedIfPresent() throws IOExceptio resp.getHeaders().set("X-Goog-Gcs-Idempotency-Token", "5").setContentLength(0L); StorageException storageException = - JsonResumableSessionFailureScenario.SCENARIO_0.toStorageException( + ResumableSessionFailureScenario.SCENARIO_0.toStorageException( "uploadId", resp, null, () -> null); assertThat(storageException.getCode()).isEqualTo(0); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/TestUtils.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/TestUtils.java index 27e925f8d9..e1eb11c981 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/TestUtils.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/TestUtils.java @@ -103,9 +103,9 @@ public static ApiException apiException(Code code) { } public static ApiException apiException(Code code, String message) { - StatusRuntimeException statusRuntimeException = code.toStatus().asRuntimeException(); - DebugInfo debugInfo = - DebugInfo.newBuilder().setDetail("forced failure |~| " + code.name() + message).build(); + StatusRuntimeException statusRuntimeException = + code.toStatus().withDescription(message).asRuntimeException(); + DebugInfo debugInfo = DebugInfo.newBuilder().setDetail(message).build(); ErrorDetails errorDetails = ErrorDetails.builder().setRawErrorMessages(ImmutableList.of(Any.pack(debugInfo))).build(); return ApiExceptionFactory.createException(