diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedWritableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedWritableByteChannel.java index 5fce18ae52..35ad97ffea 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedWritableByteChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedWritableByteChannel.java @@ -31,7 +31,7 @@ @ParametersAreNonnullByDefault final class ApiaryUnbufferedWritableByteChannel implements UnbufferedWritableByteChannel { - private final ResumableSession session; + private final JsonResumableSession session; private final SettableApiFuture result; private final LongConsumer committedBytesCallback; @@ -57,7 +57,7 @@ public long write(ByteBuffer[] srcs, int offset, int length) throws IOException if (!open) { throw new ClosedChannelException(); } - RewindableHttpContent content = RewindableHttpContent.of(Utils.subArray(srcs, offset, length)); + RewindableContent content = RewindableContent.of(Utils.subArray(srcs, offset, length)); long available = content.getLength(); long newFinalByteOffset = cumulativeByteCount + available; final HttpContentRange header; @@ -96,7 +96,7 @@ public void close() throws IOException { if (!finished) { try { ResumableOperationResult<@Nullable StorageObject> operationResult = - session.put(RewindableHttpContent.empty(), HttpContentRange.of(cumulativeByteCount)); + session.put(RewindableContent.empty(), HttpContentRange.of(cumulativeByteCount)); long persistedSize = operationResult.getPersistedSize(); committedBytesCallback.accept(persistedSize); result.set(operationResult.getObject()); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ByteSizeConstants.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ByteSizeConstants.java index 98b31a3ded..cbdbd94d67 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ByteSizeConstants.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ByteSizeConstants.java @@ -28,6 +28,9 @@ final class ByteSizeConstants { static final int _2MiB = 2 * _1MiB; static final int _16MiB = 16 * _1MiB; static final int _32MiB = 32 * _1MiB; + static final long _1GiB = 1024 * _1MiB; + static final long _1TiB = 1024 * _1GiB; + static final long _5TiB = 5 * _1TiB; static final long _128KiBL = 131072L; static final long _256KiBL = 262144L; diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java index e39bf85d33..80bca1c572 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicWritableByteChannelSessionBuilder.java @@ -20,6 +20,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; +import com.google.api.core.InternalApi; import com.google.api.core.SettableApiFuture; import com.google.api.gax.retrying.ResultRetryAlgorithm; import com.google.api.gax.rpc.ClientStreamingCallable; @@ -219,10 +220,12 @@ final class ResumableUploadBuilder { private RetryingDependencies deps; private ResultRetryAlgorithm alg; + private boolean fsyncEvery; ResumableUploadBuilder() { this.deps = RetryingDependencies.attemptOnce(); this.alg = Retrying.neverRetry(); + this.fsyncEvery = true; } ResumableUploadBuilder withRetryConfig(RetryingDependencies deps, ResultRetryAlgorithm alg) { @@ -231,6 +234,12 @@ ResumableUploadBuilder withRetryConfig(RetryingDependencies deps, ResultRetryAlg return this; } + @InternalApi + ResumableUploadBuilder setFsyncEvery(boolean fsyncEvery) { + this.fsyncEvery = fsyncEvery; + return this; + } + /** * Do not apply any intermediate buffering. Any call to {@link * java.nio.channels.WritableByteChannel#write(ByteBuffer)} will be segmented as is and sent to @@ -281,7 +290,10 @@ UnbufferedWritableByteChannelSession build() { return new UnbufferedWriteSession<>( requireNonNull(start, "start must be non null"), bindFunction( - WriteFlushStrategy.fsyncEveryFlush(write, deps, alg, Retrying::newCallContext), + fsyncEvery + ? WriteFlushStrategy.fsyncEveryFlush( + write, deps, alg, Retrying::newCallContext) + : WriteFlushStrategy.fsyncOnClose(write), ResumableWrite::identity) .andThen(StorageByteChannels.writable()::createSynchronized)); } @@ -310,7 +322,10 @@ BufferedWritableByteChannelSession build() { return new BufferedWriteSession<>( requireNonNull(start, "start must be non null"), bindFunction( - WriteFlushStrategy.fsyncEveryFlush(write, deps, alg, Retrying::newCallContext), + fsyncEvery + ? WriteFlushStrategy.fsyncEveryFlush( + write, deps, alg, Retrying::newCallContext) + : WriteFlushStrategy.fsyncOnClose(write), ResumableWrite::identity) .andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c)) .andThen(StorageByteChannels.writable()::createSynchronized)); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcResumableSession.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcResumableSession.java new file mode 100644 index 0000000000..8de44fb654 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcResumableSession.java @@ -0,0 +1,123 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.storage; + +import com.google.api.core.ApiFutures; +import com.google.api.gax.grpc.GrpcCallContext; +import com.google.api.gax.retrying.ResultRetryAlgorithm; +import com.google.api.gax.rpc.ClientStreamingCallable; +import com.google.api.gax.rpc.UnaryCallable; +import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel; +import com.google.cloud.storage.Conversions.Decoder; +import com.google.cloud.storage.Retrying.RetryingDependencies; +import com.google.storage.v2.Object; +import com.google.storage.v2.QueryWriteStatusRequest; +import com.google.storage.v2.QueryWriteStatusResponse; +import com.google.storage.v2.WriteObjectRequest; +import com.google.storage.v2.WriteObjectResponse; +import java.util.concurrent.atomic.AtomicBoolean; +import org.checkerframework.checker.nullness.qual.Nullable; + +final class GrpcResumableSession { + + private final RetryingDependencies deps; + private final ResultRetryAlgorithm alg; + private final ClientStreamingCallable writeCallable; + private final UnaryCallable + queryWriteStatusCallable; + private final ResumableWrite resumableWrite; + private final Hasher hasher; + + GrpcResumableSession( + RetryingDependencies deps, + ResultRetryAlgorithm alg, + ClientStreamingCallable writeCallable, + UnaryCallable queryWriteStatusCallable, + ResumableWrite resumableWrite, + Hasher hasher) { + this.deps = deps; + this.alg = alg; + this.writeCallable = writeCallable; + this.queryWriteStatusCallable = queryWriteStatusCallable; + this.resumableWrite = resumableWrite; + this.hasher = hasher; + } + + ResumableOperationResult<@Nullable Object> query() { + QueryWriteStatusRequest.Builder b = + QueryWriteStatusRequest.newBuilder().setUploadId(resumableWrite.getRes().getUploadId()); + if (resumableWrite.getReq().hasCommonObjectRequestParams()) { + b.setCommonObjectRequestParams(resumableWrite.getReq().getCommonObjectRequestParams()); + } + QueryWriteStatusRequest req = b.build(); + try { + QueryWriteStatusResponse response = queryWriteStatusCallable.call(req); + if (response.hasResource()) { + return ResumableOperationResult.complete( + response.getResource(), response.getResource().getSize()); + } else { + return ResumableOperationResult.incremental(response.getPersistedSize()); + } + } catch (Exception e) { + throw StorageException.coalesce(e); + } + } + + ResumableOperationResult<@Nullable Object> put(RewindableContent content) { + AtomicBoolean dirty = new AtomicBoolean(false); + GrpcCallContext retryingCallContext = Retrying.newCallContext(); + BufferHandle handle = BufferHandle.allocate(ByteSizeConstants._2MiB); + + return Retrying.run( + deps, + alg, + () -> { + if (dirty.getAndSet(true)) { + ResumableOperationResult<@Nullable Object> query = query(); + if (query.getObject() != null) { + return query; + } else { + content.rewindTo(query.getPersistedSize()); + } + } + WritableByteChannelSession session = + ResumableMedia.gapic() + .write() + .byteChannel(writeCallable.withDefaultCallContext(retryingCallContext)) + .setByteStringStrategy(ByteStringStrategy.copy()) + .setHasher(hasher) + .resumable() + .setFsyncEvery(false) + .buffered(handle) + .setStartAsync(ApiFutures.immediateFuture(resumableWrite)) + .build(); + + try (BufferedWritableByteChannel channel = session.open()) { + content.writeTo(channel); + } + + WriteObjectResponse response = session.getResult().get(); + if (response.hasResource()) { + return ResumableOperationResult.complete( + response.getResource(), response.getResource().getSize()); + } else { + return ResumableOperationResult.incremental(response.getPersistedSize()); + } + }, + Decoder.identity()); + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java index 14b4789901..d7d4059196 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java @@ -28,6 +28,7 @@ import static java.util.Objects.requireNonNull; import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutures; import com.google.api.core.BetaApi; import com.google.api.gax.grpc.GrpcCallContext; import com.google.api.gax.paging.AbstractPage; @@ -35,6 +36,7 @@ import com.google.api.gax.retrying.ResultRetryAlgorithm; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.ApiExceptions; +import com.google.api.gax.rpc.ClientStreamingCallable; import com.google.api.gax.rpc.NotFoundException; import com.google.api.gax.rpc.StatusCode; import com.google.api.gax.rpc.UnaryCallable; @@ -72,6 +74,7 @@ import com.google.common.collect.Streams; import com.google.common.io.BaseEncoding; import com.google.common.io.ByteStreams; +import com.google.common.util.concurrent.MoreExecutors; import com.google.iam.v1.GetIamPolicyRequest; import com.google.iam.v1.SetIamPolicyRequest; import com.google.iam.v1.TestIamPermissionsRequest; @@ -123,7 +126,6 @@ import java.nio.ByteBuffer; import java.nio.channels.Channels; import java.nio.channels.ReadableByteChannel; -import java.nio.channels.SeekableByteChannel; import java.nio.channels.WritableByteChannel; import java.nio.file.Files; import java.nio.file.OpenOption; @@ -138,6 +140,7 @@ import java.util.Spliterator; import java.util.Spliterators.AbstractSpliterator; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Predicate; @@ -285,55 +288,34 @@ public Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOp opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault()); WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts); - long size = Files.size(path); - if (size < bufferSize) { - // ignore the bufferSize argument if the file is smaller than it - GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext()); - return Retrying.run( - getOptions(), - retryAlgorithmManager.getFor(req), - () -> { - BufferedWritableByteChannelSession session = - ResumableMedia.gapic() - .write() - .byteChannel(storageClient.writeObjectCallable().withDefaultCallContext(merge)) - .setHasher(Hasher.enabled()) - .setByteStringStrategy(ByteStringStrategy.noCopy()) - .direct() - .buffered(Buffers.allocate(size)) - .setRequest(req) - .build(); - - try (SeekableByteChannel src = Files.newByteChannel(path, READ_OPS); - BufferedWritableByteChannel dst = session.open()) { - ByteStreams.copy(src, dst); - } catch (Exception e) { - throw StorageException.coalesce(e); - } - return session.getResult(); - }, - this::getBlob); - } else { - ApiFuture start = startResumableWrite(grpcCallContext, req); - BufferedWritableByteChannelSession session = - ResumableMedia.gapic() - .write() - .byteChannel( - storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext)) - .setHasher(Hasher.noop()) - .setByteStringStrategy(ByteStringStrategy.noCopy()) - .resumable() - .withRetryConfig(getOptions(), retryAlgorithmManager.idempotent()) - .buffered(Buffers.allocateAligned(bufferSize, _256KiB)) - .setStartAsync(start) - .build(); - try (SeekableByteChannel src = Files.newByteChannel(path, READ_OPS); - BufferedWritableByteChannel dst = session.open()) { - ByteStreams.copy(src, dst); - } catch (Exception e) { - throw StorageException.coalesce(e); + ClientStreamingCallable write = + storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext); + + ApiFuture start = startResumableWrite(grpcCallContext, req); + ApiFuture session2 = + ApiFutures.transform( + start, + rw -> + ResumableSession.grpc( + getOptions(), + retryAlgorithmManager.idempotent(), + write, + storageClient.queryWriteStatusCallable(), + rw, + Hasher.noop()), + MoreExecutors.directExecutor()); + try { + GrpcResumableSession got = session2.get(); + ResumableOperationResult<@Nullable Object> put = got.put(RewindableContent.of(path)); + Object object = put.getObject(); + if (object == null) { + // if by some odd chance the put didn't get the Object, query for it + ResumableOperationResult<@Nullable Object> query = got.query(); + object = query.getObject(); } - return getBlob(session.getResult()); + return codecs.blobInfo().decode(object).asBlob(this); + } catch (InterruptedException | ExecutionException e) { + throw StorageException.coalesce(e); } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSession.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSession.java index 72c1f2a156..68ce04320a 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSession.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSession.java @@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.checkerframework.checker.nullness.qual.Nullable; -final class JsonResumableSession extends ResumableSession { +final class JsonResumableSession { static final String SPAN_NAME_WRITE = String.format("Sent.%s.write", HttpStorageRpc.class.getName()); @@ -53,14 +53,12 @@ final class JsonResumableSession extends ResumableSession { * Not automatically retried. Usually called from within another retrying context. We don't yet * have the concept of nested retry handling. */ - @Override ResumableOperationResult<@Nullable StorageObject> query() { return new JsonResumableSessionQueryTask(context, resumableWrite.getUploadId()).call(); } - @Override ResumableOperationResult<@Nullable StorageObject> put( - RewindableHttpContent content, HttpContentRange contentRange) { + RewindableContent content, HttpContentRange contentRange) { JsonResumableSessionPutTask task = new JsonResumableSessionPutTask( context, resumableWrite.getUploadId(), content, contentRange); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java index d877bb90fa..6b55b0d173 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java @@ -37,7 +37,7 @@ final class JsonResumableSessionPutTask private final HttpClientContext context; private final String uploadId; - private final RewindableHttpContent content; + private final RewindableContent content; private final HttpContentRange originalContentRange; private HttpContentRange contentRange; @@ -46,7 +46,7 @@ final class JsonResumableSessionPutTask JsonResumableSessionPutTask( HttpClientContext httpClientContext, String uploadId, - RewindableHttpContent content, + RewindableContent content, HttpContentRange originalContentRange) { this.context = httpClientContext; this.uploadId = uploadId; diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableSession.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableSession.java index 7b608c68a4..5c308f7fb9 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableSession.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableSession.java @@ -17,17 +17,17 @@ package com.google.cloud.storage; import com.google.api.gax.retrying.ResultRetryAlgorithm; +import com.google.api.gax.rpc.ClientStreamingCallable; +import com.google.api.gax.rpc.UnaryCallable; import com.google.cloud.storage.Retrying.RetryingDependencies; -import org.checkerframework.checker.nullness.qual.Nullable; +import com.google.storage.v2.QueryWriteStatusRequest; +import com.google.storage.v2.QueryWriteStatusResponse; +import com.google.storage.v2.WriteObjectRequest; +import com.google.storage.v2.WriteObjectResponse; -abstract class ResumableSession { +final class ResumableSession { - ResumableSession() {} - - abstract ResumableOperationResult<@Nullable T> put( - RewindableHttpContent content, HttpContentRange contentRange); - - abstract ResumableOperationResult<@Nullable T> query(); + private ResumableSession() {} static JsonResumableSession json( HttpClientContext context, @@ -36,4 +36,15 @@ static JsonResumableSession json( JsonResumableWrite resumableWrite) { return new JsonResumableSession(context, deps, alg, resumableWrite); } + + static GrpcResumableSession grpc( + RetryingDependencies deps, + ResultRetryAlgorithm alg, + ClientStreamingCallable writeCallable, + UnaryCallable queryWriteStatusCallable, + ResumableWrite resumableWrite, + Hasher hasher) { + return new GrpcResumableSession( + deps, alg, writeCallable, queryWriteStatusCallable, resumableWrite, hasher); + } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/RewindableHttpContent.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/RewindableContent.java similarity index 72% rename from google-cloud-storage/src/main/java/com/google/cloud/storage/RewindableHttpContent.java rename to google-cloud-storage/src/main/java/com/google/cloud/storage/RewindableContent.java index 594c0a21c7..e231b50171 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/RewindableHttpContent.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/RewindableContent.java @@ -25,6 +25,7 @@ import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.channels.Channels; +import java.nio.channels.GatheringByteChannel; import java.nio.channels.SeekableByteChannel; import java.nio.channels.WritableByteChannel; import java.nio.file.Files; @@ -32,9 +33,9 @@ import java.nio.file.StandardOpenOption; import java.util.Arrays; -abstract class RewindableHttpContent extends AbstractHttpContent { +abstract class RewindableContent extends AbstractHttpContent { - private RewindableHttpContent() { + private RewindableContent() { super((HttpMediaType) null); } @@ -43,24 +44,28 @@ private RewindableHttpContent() { abstract void rewindTo(long offset); + abstract long writeTo(WritableByteChannel gbc) throws IOException; + + abstract long writeTo(GatheringByteChannel gbc) throws IOException; + @Override public final boolean retrySupported() { return false; } - static RewindableHttpContent empty() { + static RewindableContent empty() { return EmptyRewindableContent.INSTANCE; } - static RewindableHttpContent of(ByteBuffer... buffers) { - return new ByteBufferHttpContent(buffers); + static RewindableContent of(ByteBuffer... buffers) { + return new ByteBufferContent(buffers); } - static RewindableHttpContent of(Path path) throws IOException { - return new PathRewindableHttpContent(path); + static RewindableContent of(Path path) throws IOException { + return new PathRewindableContent(path); } - private static final class EmptyRewindableContent extends RewindableHttpContent { + private static final class EmptyRewindableContent extends RewindableContent { private static final EmptyRewindableContent INSTANCE = new EmptyRewindableContent(); @Override @@ -73,18 +78,28 @@ public void writeTo(OutputStream out) throws IOException { out.flush(); } + @Override + long writeTo(WritableByteChannel gbc) { + return 0; + } + + @Override + long writeTo(GatheringByteChannel gbc) { + return 0; + } + @Override protected void rewindTo(long offset) {} } - private static final class PathRewindableHttpContent extends RewindableHttpContent { + private static final class PathRewindableContent extends RewindableContent { private final Path path; private final long size; private long readOffset; - private PathRewindableHttpContent(Path path) throws IOException { + private PathRewindableContent(Path path) throws IOException { this.path = path; this.size = Files.size(path); this.readOffset = 0; @@ -110,9 +125,25 @@ public void writeTo(OutputStream out) throws IOException { out.flush(); } } + + @Override + long writeTo(WritableByteChannel gbc) throws IOException { + try (SeekableByteChannel in = Files.newByteChannel(path, StandardOpenOption.READ)) { + in.position(readOffset); + return ByteStreams.copy(in, gbc); + } + } + + @Override + long writeTo(GatheringByteChannel gbc) throws IOException { + try (SeekableByteChannel in = Files.newByteChannel(path, StandardOpenOption.READ)) { + in.position(readOffset); + return ByteStreams.copy(in, gbc); + } + } } - private static final class ByteBufferHttpContent extends RewindableHttpContent { + private static final class ByteBufferContent extends RewindableContent { private final ByteBuffer[] buffers; // keep an array of the positions in case we need to rewind them for retries @@ -126,7 +157,7 @@ private static final class ByteBufferHttpContent extends RewindableHttpContent { private long offset; - private ByteBufferHttpContent(ByteBuffer[] buffers) { + private ByteBufferContent(ByteBuffer[] buffers) { this.buffers = buffers; this.positions = Arrays.stream(buffers).mapToInt(Buffers::position).toArray(); this.totalLength = Arrays.stream(buffers).mapToLong(Buffer::remaining).sum(); @@ -148,6 +179,22 @@ public void writeTo(OutputStream out) throws IOException { out.flush(); } + @Override + long writeTo(WritableByteChannel gbc) throws IOException { + dirty = true; + int retVal = 0; + for (ByteBuffer buffer : buffers) { + retVal += gbc.write(buffer); + } + return retVal; + } + + @Override + long writeTo(GatheringByteChannel gbc) throws IOException { + dirty = true; + return gbc.write(buffers); + } + @Override void rewindTo(long offset) { Preconditions.checkArgument( diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java index fc8601cb6c..fd387717b1 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java @@ -255,7 +255,7 @@ public Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOp HttpContentRange contentRange = HttpContentRange.of(ByteRangeSpec.relativeLength(0L, size), size); ResumableOperationResult put = - session.put(RewindableHttpContent.of(path), contentRange); + session.put(RewindableContent.of(path), contentRange); // all exception translation is taken care of down in the JsonResumableSession StorageObject object = put.getObject(); if (object == null) { diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcStorageImplUploadRetryTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcStorageImplUploadRetryTest.java index ac192a1ff5..bcfb8b6088 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcStorageImplUploadRetryTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGrpcStorageImplUploadRetryTest.java @@ -29,6 +29,8 @@ import com.google.storage.v2.ChecksummedData; import com.google.storage.v2.Object; import com.google.storage.v2.ObjectChecksums; +import com.google.storage.v2.QueryWriteStatusRequest; +import com.google.storage.v2.QueryWriteStatusResponse; import com.google.storage.v2.StartResumableWriteRequest; import com.google.storage.v2.StartResumableWriteResponse; import com.google.storage.v2.StorageGrpc.StorageImplBase; @@ -287,6 +289,14 @@ public void startResumableWrite( } } + @Override + public void queryWriteStatus( + QueryWriteStatusRequest request, + StreamObserver responseObserver) { + responseObserver.onNext(QueryWriteStatusResponse.newBuilder().setPersistedSize(0).build()); + responseObserver.onCompleted(); + } + // a bit of constructor lifecycle hackery to appease the compiler // Even though the thing past to super() is a lazy function, the closing over of the outer // fields happens earlier than they are available. To side step this fact, we provide the diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITJsonResumableSessionPutTaskTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITJsonResumableSessionPutTaskTest.java index 18704dc288..b8719ee1b1 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITJsonResumableSessionPutTaskTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITJsonResumableSessionPutTaskTest.java @@ -98,7 +98,7 @@ public void emptyObjectHappyPath() throws Exception { new JsonResumableSessionPutTask( httpClientContext, uploadUrl, - RewindableHttpContent.empty(), + RewindableContent.empty(), HttpContentRange.of(ByteRangeSpec.explicitClosed(0L, 0L), 0)); ResumableOperationResult<@Nullable StorageObject> operationResult = task.call(); @@ -144,7 +144,7 @@ public void scenario9() throws Exception { new JsonResumableSessionPutTask( httpClientContext, uploadUrl, - RewindableHttpContent.empty(), + RewindableContent.empty(), HttpContentRange.of(ByteRangeSpec.explicitClosed(0L, 10L))); StorageException se = assertThrows(StorageException.class, task::call); @@ -191,7 +191,7 @@ public void scenario7() throws Exception { new JsonResumableSessionPutTask( httpClientContext, uploadUrl, - RewindableHttpContent.empty(), + RewindableContent.empty(), HttpContentRange.of(ByteRangeSpec.explicitClosed(0L, 10L))); StorageException se = assertThrows(StorageException.class, task::call); @@ -272,7 +272,7 @@ public void scenario1() throws Exception { new JsonResumableSessionPutTask( httpClientContext, uploadUrl, - RewindableHttpContent.of(tmpFile.getPath()), + RewindableContent.of(tmpFile.getPath()), HttpContentRange.of(ByteRangeSpec.explicit(0L, _256KiBL))); StorageException se = assertThrows(StorageException.class, task::call); @@ -341,7 +341,7 @@ public void scenario2() throws Exception { new JsonResumableSessionPutTask( httpClientContext, uploadUrl, - RewindableHttpContent.empty(), + RewindableContent.empty(), HttpContentRange.of(_256KiBL)); StorageException se = assertThrows(StorageException.class, task::call); @@ -410,7 +410,7 @@ public void scenario3() throws Exception { new JsonResumableSessionPutTask( httpClientContext, uploadUrl, - RewindableHttpContent.empty(), + RewindableContent.empty(), HttpContentRange.of(_512KiBL)); StorageException se = assertThrows(StorageException.class, task::call); @@ -488,7 +488,7 @@ public void scenario4() throws Exception { new JsonResumableSessionPutTask( httpClientContext, uploadUrl, - RewindableHttpContent.empty(), + RewindableContent.empty(), HttpContentRange.of(_256KiBL)); ResumableOperationResult<@Nullable StorageObject> operationResult = task.call(); @@ -570,7 +570,7 @@ public void scenario4_1() throws Exception { new JsonResumableSessionPutTask( httpClientContext, uploadUrl, - RewindableHttpContent.empty(), + RewindableContent.empty(), HttpContentRange.of(_512KiBL)); StorageException se = assertThrows(StorageException.class, task::call); @@ -650,7 +650,7 @@ public void scenario4_2() throws Exception { new JsonResumableSessionPutTask( httpClientContext, uploadUrl, - RewindableHttpContent.empty(), + RewindableContent.empty(), HttpContentRange.of(_128KiBL)); StorageException se = assertThrows(StorageException.class, task::call); @@ -728,7 +728,7 @@ public void scenario5() throws Exception { new JsonResumableSessionPutTask( httpClientContext, uploadUrl, - RewindableHttpContent.of(tmpFile.getPath()), + RewindableContent.of(tmpFile.getPath()), HttpContentRange.of(ByteRangeSpec.explicit(_512KiBL, _768KiBL))); StorageException se = assertThrows(StorageException.class, task::call); @@ -768,7 +768,7 @@ public void jsonParseFailure() throws Exception { JsonResumableSessionPutTask task = new JsonResumableSessionPutTask( - httpClientContext, uploadUrl, RewindableHttpContent.empty(), HttpContentRange.of(0)); + httpClientContext, uploadUrl, RewindableContent.empty(), HttpContentRange.of(0)); StorageException se = assertThrows(StorageException.class, task::call); // the parse error happens while trying to read the success object, make sure we raise it as @@ -803,7 +803,7 @@ public void jsonDeserializationOnlyAttemptedWhenContentPresent() throws Exceptio JsonResumableSessionPutTask task = new JsonResumableSessionPutTask( - httpClientContext, uploadUrl, RewindableHttpContent.empty(), HttpContentRange.of(0)); + httpClientContext, uploadUrl, RewindableContent.empty(), HttpContentRange.of(0)); ResumableOperationResult<@Nullable StorageObject> operationResult = task.call(); StorageObject call = operationResult.getObject(); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITJsonResumableSessionTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITJsonResumableSessionTest.java index 4b2dc04880..1dd4025335 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITJsonResumableSessionTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITJsonResumableSessionTest.java @@ -115,7 +115,7 @@ public void rewindWillQueryStatusOnlyWhenDirty() throws Exception { httpClientContext, RETRYING_DEPENDENCIES, RETRY_ALGORITHM, resumableWrite); ResumableOperationResult<@Nullable StorageObject> operationResult = - session.put(RewindableHttpContent.of(tmpFile.getPath()), range1); + session.put(RewindableContent.of(tmpFile.getPath()), range1); StorageObject call = operationResult.getObject(); assertThat(call).isNull(); assertThat(operationResult.getPersistedSize()).isEqualTo(_512KiBL); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableByteBufferContentTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableByteBufferContentTest.java index 9d65a00dcb..fe867e6f2f 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableByteBufferContentTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableByteBufferContentTest.java @@ -21,7 +21,7 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertThrows; -import com.google.cloud.storage.RewindableHttpContentPropertyTest.ErroringOutputStream; +import com.google.cloud.storage.RewindableContentPropertyTest.ErroringOutputStream; import com.google.protobuf.ByteString; import java.io.ByteArrayOutputStream; import java.io.IOException; @@ -61,7 +61,7 @@ public void setUp() throws Exception { @Test public void getLength() { - RewindableHttpContent content = RewindableHttpContent.of(buffers); + RewindableContent content = RewindableContent.of(buffers); assertThat(content.getLength()).isEqualTo(total); } @@ -69,7 +69,7 @@ public void getLength() { @Test public void writeTo() throws IOException { - RewindableHttpContent content = RewindableHttpContent.of(buffers); + RewindableContent content = RewindableContent.of(buffers); ByteArrayOutputStream baos = new ByteArrayOutputStream(); content.writeTo(baos); @@ -81,7 +81,7 @@ public void writeTo() throws IOException { @Test public void rewind() throws IOException { - RewindableHttpContent content = RewindableHttpContent.of(buffers); + RewindableContent content = RewindableContent.of(buffers); assertThrows( IOException.class, @@ -100,7 +100,7 @@ public void rewind() throws IOException { @Test public void rewindTo() throws Exception { - RewindableHttpContent content = RewindableHttpContent.of(buffers); + RewindableContent content = RewindableContent.of(buffers); ByteString reduce = Arrays.stream(buffers) @@ -135,7 +135,7 @@ public void rewind_dirtyAware() throws IOException { int position = buf.position(); int limit = buf.limit(); - RewindableHttpContent content = RewindableHttpContent.of(buf); + RewindableContent content = RewindableContent.of(buf); int hackPosition = 2; // after content has initialized, mutate the position underneath it. We're doing this to detect // if rewind is actually modifying things. It shouldn't until the content is dirtied by calling diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableHttpContentPropertyTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableContentPropertyTest.java similarity index 98% rename from google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableHttpContentPropertyTest.java rename to google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableContentPropertyTest.java index f8e18f2866..48d29bc8c8 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableHttpContentPropertyTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/RewindableContentPropertyTest.java @@ -41,12 +41,12 @@ import net.jqwik.api.RandomDistribution; import org.checkerframework.checker.nullness.qual.NonNull; -final class RewindableHttpContentPropertyTest { +final class RewindableContentPropertyTest { @Property void path(@ForAll("PathScenario") PathScenario pathScenario) throws Exception { try (PathScenario s = pathScenario) { - RewindableHttpContent content = RewindableHttpContent.of(s.getPath()); + RewindableContent content = RewindableContent.of(s.getPath()); assertThrows( IOException.class, () -> { @@ -68,7 +68,7 @@ void path(@ForAll("PathScenario") PathScenario pathScenario) throws Exception { @Property void byteBuffers(@ForAll("ByteBuffersScenario") ByteBuffersScenario s) throws IOException { - RewindableHttpContent content = RewindableHttpContent.of(s.getBuffers()); + RewindableContent content = RewindableContent.of(s.getBuffers()); assertThat(content.getLength()).isEqualTo(s.getFullLength()); assertThrows( IOException.class, diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITObjectChecksumSupportTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITObjectChecksumSupportTest.java index d8e026cbc0..12bbf3df5d 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITObjectChecksumSupportTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITObjectChecksumSupportTest.java @@ -54,7 +54,7 @@ @RunWith(StorageITRunner.class) @CrossRun( - transports = {Transport.HTTP /*, Transport.GRPC*/}, + transports = {Transport.HTTP, Transport.GRPC}, backends = Backend.PROD) @Parameterized(ChecksummedTestContentProvider.class) public final class ITObjectChecksumSupportTest {