From 8c7404dea8ef16a7b61accad4fe614e033a5ad2c Mon Sep 17 00:00:00 2001 From: JesseLovelace <43148100+JesseLovelace@users.noreply.github.com> Date: Sun, 14 Apr 2024 16:42:35 -0700 Subject: [PATCH] feat: Adds a ZeroCopy response marshaller for grpc ReadObject handling (#2489) * feat: Adds a ZeroCopy response marshaller for grpc ReadObject handling * Update google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java Co-authored-by: BenWhitehead * add copyright header * Apply suggestions from code review Co-authored-by: BenWhitehead * Extract classes to own files, more StorageClient initialization * copyright headers on new files * formatter * one more lint issue * fix: improve GrpcStorageOptions.ReadObjectResponseZeroCopyMessageMarshaller#close() handling of multiple IOExceptions * clean up ZeroCopyMarshallerTest * add grpc core to pom --------- Co-authored-by: BenWhitehead --- google-cloud-storage/pom.xml | 8 + .../storage/GapicDownloadSessionBuilder.java | 16 +- .../GapicUnbufferedReadableByteChannel.java | 79 ++--- .../cloud/storage/GrpcBlobReadChannel.java | 5 +- .../google/cloud/storage/GrpcStorageImpl.java | 9 +- .../cloud/storage/GrpcStorageOptions.java | 299 +++++++++++++++++- .../java/com/google/cloud/storage/Hasher.java | 25 +- .../com/google/cloud/storage/ReadCursor.java | 52 +++ .../ResponseContentLifecycleHandle.java | 57 ++++ .../ResponseContentLifecycleManager.java | 31 ++ ...apicUnbufferedReadableByteChannelTest.java | 15 +- .../ITGzipReadableByteChannelTest.java | 22 +- .../storage/TransportCompatibilityTest.java | 4 +- .../cloud/storage/ZeroCopyMarshallerTest.java | 223 +++++++++++++ 14 files changed, 767 insertions(+), 78 deletions(-) create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/ReadCursor.java create mode 100644 google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleHandle.java create mode 100644 
google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleManager.java create mode 100644 google-cloud-storage/src/test/java/com/google/cloud/storage/ZeroCopyMarshallerTest.java diff --git a/google-cloud-storage/pom.xml b/google-cloud-storage/pom.xml index 7fbba0aeeb..c6219b776d 100644 --- a/google-cloud-storage/pom.xml +++ b/google-cloud-storage/pom.xml @@ -96,6 +96,14 @@ com.google.protobuf protobuf-java-util + + io.grpc + grpc-core + + + io.grpc + grpc-protobuf + com.google.api.grpc proto-google-common-protos diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicDownloadSessionBuilder.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicDownloadSessionBuilder.java index 704794a389..82789bcd2f 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicDownloadSessionBuilder.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicDownloadSessionBuilder.java @@ -48,19 +48,23 @@ public static GapicDownloadSessionBuilder create() { * ultimately produced channel will not do any retries of its own. 
*/ public ReadableByteChannelSessionBuilder byteChannel( - ServerStreamingCallable read) { - return new ReadableByteChannelSessionBuilder(read); + ServerStreamingCallable read, + ResponseContentLifecycleManager responseContentLifecycleManager) { + return new ReadableByteChannelSessionBuilder(read, responseContentLifecycleManager); } public static final class ReadableByteChannelSessionBuilder { private final ServerStreamingCallable read; + private final ResponseContentLifecycleManager responseContentLifecycleManager; private boolean autoGzipDecompression; private Hasher hasher; private ReadableByteChannelSessionBuilder( - ServerStreamingCallable read) { + ServerStreamingCallable read, + ResponseContentLifecycleManager responseContentLifecycleManager) { this.read = read; + this.responseContentLifecycleManager = responseContentLifecycleManager; this.hasher = Hasher.noop(); this.autoGzipDecompression = false; } @@ -100,11 +104,13 @@ public UnbufferedReadableByteChannelSessionBuilder unbuffered() { return (object, resultFuture) -> { if (autoGzipDecompression) { return new GzipReadableByteChannel( - new GapicUnbufferedReadableByteChannel(resultFuture, read, object, hasher), + new GapicUnbufferedReadableByteChannel( + resultFuture, read, object, hasher, responseContentLifecycleManager), ApiFutures.transform( resultFuture, Object::getContentEncoding, MoreExecutors.directExecutor())); } else { - return new GapicUnbufferedReadableByteChannel(resultFuture, read, object, hasher); + return new GapicUnbufferedReadableByteChannel( + resultFuture, read, object, hasher, responseContentLifecycleManager); } }; } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java index 4b19c3f998..6cecc8dded 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java +++ 
b/google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java @@ -16,8 +16,6 @@ package com.google.cloud.storage; -import static com.google.common.base.Preconditions.checkArgument; - import com.google.api.client.http.HttpStatusCodes; import com.google.api.core.ApiFuture; import com.google.api.core.SettableApiFuture; @@ -25,6 +23,7 @@ import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown; import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel; +import com.google.protobuf.ByteString; import com.google.storage.v2.ChecksummedData; import com.google.storage.v2.Object; import com.google.storage.v2.ReadObjectRequest; @@ -46,6 +45,7 @@ final class GapicUnbufferedReadableByteChannel private final ReadObjectRequest req; private final Hasher hasher; private final LazyServerStreamIterator iter; + private final ResponseContentLifecycleManager rclm; private boolean open = true; private boolean complete = false; @@ -53,18 +53,20 @@ final class GapicUnbufferedReadableByteChannel private Object metadata; - private ByteBuffer leftovers; + private ResponseContentLifecycleHandle leftovers; GapicUnbufferedReadableByteChannel( SettableApiFuture result, ServerStreamingCallable read, ReadObjectRequest req, - Hasher hasher) { + Hasher hasher, + ResponseContentLifecycleManager rclm) { this.result = result; this.read = read; this.req = req; this.hasher = hasher; this.blobOffset = req.getReadOffset(); + this.rclm = rclm; this.iter = new LazyServerStreamIterator(); } @@ -82,8 +84,9 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { ReadCursor c = new ReadCursor(blobOffset, blobOffset + totalBufferCapacity); while (c.hasRemaining()) { if (leftovers != null) { - copy(c, leftovers, dsts, offset, length); + leftovers.copy(c, dsts, offset, length); if (!leftovers.hasRemaining()) { + leftovers.close(); leftovers = null; } 
continue; @@ -91,6 +94,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { if (iter.hasNext()) { ReadObjectResponse resp = iter.next(); + ResponseContentLifecycleHandle handle = rclm.get(resp); if (resp.hasMetadata()) { Object respMetadata = resp.getMetadata(); if (metadata == null) { @@ -107,22 +111,24 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException { } } ChecksummedData checksummedData = resp.getChecksummedData(); - ByteBuffer content = checksummedData.getContent().asReadOnlyByteBuffer(); - // very important to know whether a crc32c value is set. Without checking, protobuf will + ByteString content = checksummedData.getContent(); + int contentSize = content.size(); + // Very important to know whether a crc32c value is set. Without checking, protobuf will // happily return 0, which is a valid crc32c value. if (checksummedData.hasCrc32C()) { - Crc32cLengthKnown expected = - Crc32cValue.of(checksummedData.getCrc32C(), checksummedData.getContent().size()); + Crc32cLengthKnown expected = Crc32cValue.of(checksummedData.getCrc32C(), contentSize); try { - hasher.validate(expected, content::duplicate); + hasher.validate(expected, content.asReadOnlyByteBufferList()); } catch (IOException e) { close(); throw e; } } - copy(c, content, dsts, offset, length); - if (content.hasRemaining()) { - leftovers = content; + handle.copy(c, dsts, offset, length); + if (handle.hasRemaining()) { + leftovers = handle; + } else { + handle.close(); } } else { complete = true; @@ -144,18 +150,19 @@ public boolean isOpen() { @Override public void close() throws IOException { open = false; - iter.close(); + try { + if (leftovers != null) { + leftovers.close(); + } + } finally { + iter.close(); + } } ApiFuture getResult() { return result; } - private void copy(ReadCursor c, ByteBuffer content, ByteBuffer[] dsts, int offset, int length) { - long copiedBytes = Buffers.copy(content, dsts, offset, length); - c.advance(copiedBytes); - 
} - private IOException closeWithError(String message) throws IOException { close(); StorageException cause = @@ -163,40 +170,6 @@ private IOException closeWithError(String message) throws IOException { throw new IOException(message, cause); } - /** - * Shrink wraps a beginning, offset and limit for tracking state of an individual invocation of - * {@link #read} - */ - private static final class ReadCursor { - private final long beginning; - private long offset; - private final long limit; - - private ReadCursor(long beginning, long limit) { - this.limit = limit; - this.beginning = beginning; - this.offset = beginning; - } - - public boolean hasRemaining() { - return limit - offset > 0; - } - - public void advance(long incr) { - checkArgument(incr >= 0); - offset += incr; - } - - public long read() { - return offset - beginning; - } - - @Override - public String toString() { - return String.format("ReadCursor{begin=%d, offset=%d, limit=%d}", beginning, offset, limit); - } - } - private final class LazyServerStreamIterator implements Iterator, Closeable { private ServerStream serverStream; private Iterator responseIterator; diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobReadChannel.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobReadChannel.java index 4ae3f24466..03e3cb517a 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobReadChannel.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobReadChannel.java @@ -28,15 +28,18 @@ final class GrpcBlobReadChannel extends BaseStorageReadChannel { private final ServerStreamingCallable read; + private final ResponseContentLifecycleManager responseContentLifecycleManager; private final ReadObjectRequest request; private final boolean autoGzipDecompression; GrpcBlobReadChannel( ServerStreamingCallable read, + ResponseContentLifecycleManager responseContentLifecycleManager, ReadObjectRequest request, boolean 
autoGzipDecompression) { super(Conversions.grpc().blobInfo()); this.read = read; + this.responseContentLifecycleManager = responseContentLifecycleManager; this.request = request; this.autoGzipDecompression = autoGzipDecompression; } @@ -53,7 +56,7 @@ protected LazyReadChannel newLazyReadChannel() { ReadableByteChannelSessionBuilder b = ResumableMedia.gapic() .read() - .byteChannel(read) + .byteChannel(read, responseContentLifecycleManager) .setHasher(Hasher.noop()) .setAutoGzipDecompression(autoGzipDecompression); BufferHandle bufferHandle = getBufferHandle(); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java index e9857e93d5..e5e81c060e 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java @@ -180,6 +180,7 @@ final class GrpcStorageImpl extends BaseService .collect(ImmutableSet.toImmutableSet()))); final StorageClient storageClient; + final ResponseContentLifecycleManager responseContentLifecycleManager; final WriterFactory writerFactory; final GrpcConversions codecs; final GrpcRetryAlgorithmManager retryAlgorithmManager; @@ -192,10 +193,12 @@ final class GrpcStorageImpl extends BaseService GrpcStorageImpl( GrpcStorageOptions options, StorageClient storageClient, + ResponseContentLifecycleManager responseContentLifecycleManager, WriterFactory writerFactory, Opts defaultOpts) { super(options); this.storageClient = storageClient; + this.responseContentLifecycleManager = responseContentLifecycleManager; this.writerFactory = writerFactory; this.defaultOpts = defaultOpts; this.codecs = Conversions.grpc(); @@ -716,8 +719,10 @@ public GrpcBlobReadChannel reader(BlobId blob, BlobSourceOption... 
options) { ReadObjectRequest request = getReadObjectRequest(blob, opts); Set codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(request)); GrpcCallContext grpcCallContext = Retrying.newCallContext().withRetryableCodes(codes); + return new GrpcBlobReadChannel( storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext), + responseContentLifecycleManager, request, !opts.autoGzipDecompression()); } @@ -1868,7 +1873,9 @@ private UnbufferedReadableByteChannelSession unbufferedReadSession( opts.grpcMetadataMapper().apply(Retrying.newCallContext().withRetryableCodes(codes)); return ResumableMedia.gapic() .read() - .byteChannel(storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext)) + .byteChannel( + storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext), + responseContentLifecycleManager) .setAutoGzipDecompression(!opts.autoGzipDecompression()) .unbuffered() .setReadObjectRequest(readObjectRequest) diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java index 841343d311..438b5fd595 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java @@ -25,14 +25,20 @@ import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.core.FixedCredentialsProvider; import com.google.api.gax.core.NoCredentialsProvider; +import com.google.api.gax.grpc.GrpcCallSettings; import com.google.api.gax.grpc.GrpcInterceptorProvider; +import com.google.api.gax.grpc.GrpcStubCallableFactory; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.retrying.StreamResumptionStrategy; +import com.google.api.gax.rpc.ClientContext; import com.google.api.gax.rpc.HeaderProvider; import 
com.google.api.gax.rpc.NoHeaderProvider; +import com.google.api.gax.rpc.RequestParamsBuilder; +import com.google.api.gax.rpc.ServerStreamingCallable; import com.google.api.gax.rpc.StatusCode.Code; import com.google.api.gax.rpc.internal.QuotaProjectIdHidingCredentials; +import com.google.api.pathtemplate.PathTemplate; import com.google.auth.Credentials; import com.google.cloud.NoCredentials; import com.google.cloud.ServiceFactory; @@ -47,25 +53,50 @@ import com.google.cloud.storage.UnifiedOpts.Opts; import com.google.cloud.storage.UnifiedOpts.UserProject; import com.google.cloud.storage.spi.StorageRpcFactory; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.protobuf.ByteString; +import com.google.protobuf.CodedInputStream; +import com.google.protobuf.InvalidProtocolBufferException; +import com.google.protobuf.MessageLite; +import com.google.protobuf.Parser; +import com.google.protobuf.UnsafeByteOperations; import com.google.storage.v2.ReadObjectRequest; import com.google.storage.v2.ReadObjectResponse; import com.google.storage.v2.StorageClient; import com.google.storage.v2.StorageSettings; +import com.google.storage.v2.stub.GrpcStorageCallableFactory; +import com.google.storage.v2.stub.GrpcStorageStub; +import com.google.storage.v2.stub.StorageStub; +import com.google.storage.v2.stub.StorageStubSettings; import io.grpc.ClientInterceptor; +import io.grpc.Detachable; +import io.grpc.HasByteBuffer; +import io.grpc.KnownLength; import io.grpc.ManagedChannelBuilder; +import io.grpc.MethodDescriptor; +import io.grpc.Status; +import io.grpc.protobuf.ProtoUtils; +import java.io.Closeable; import java.io.IOException; +import java.io.InputStream; import java.io.Serializable; import java.net.URI; +import java.nio.ByteBuffer; import java.time.Clock; +import 
java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.IdentityHashMap; import java.util.List; import java.util.Locale; import java.util.Map; import java.util.Map.Entry; +import java.util.Objects; import java.util.Set; import org.checkerframework.checker.nullness.qual.NonNull; import org.checkerframework.checker.nullness.qual.Nullable; @@ -658,11 +689,32 @@ public Storage create(StorageOptions options) { Tuple> t = grpcStorageOptions.resolveSettingsAndOpts(); StorageSettings storageSettings = t.x(); Opts defaultOpts = t.y(); - return new GrpcStorageImpl( - grpcStorageOptions, - StorageClient.create(storageSettings), - grpcStorageOptions.blobWriteSessionConfig.createFactory(Clock.systemUTC()), - defaultOpts); + if (ZeroCopyReadinessChecker.isReady()) { + StorageStubSettings stubSettings = + (StorageStubSettings) storageSettings.getStubSettings(); + ClientContext clientContext = ClientContext.create(stubSettings); + GrpcStorageCallableFactory grpcStorageCallableFactory = + new GrpcStorageCallableFactory(); + InternalZeroCopyGrpcStorageStub stub = + new InternalZeroCopyGrpcStorageStub( + stubSettings, clientContext, grpcStorageCallableFactory); + StorageClient client = new InternalStorageClient(stub); + return new GrpcStorageImpl( + grpcStorageOptions, + client, + stub.getObjectMediaResponseMarshaller, + grpcStorageOptions.blobWriteSessionConfig.createFactory(Clock.systemUTC()), + defaultOpts); + } else { + StorageClient client = StorageClient.create(storageSettings); + return new GrpcStorageImpl( + grpcStorageOptions, + client, + ResponseContentLifecycleManager.noop(), + grpcStorageOptions.blobWriteSessionConfig.createFactory(Clock.systemUTC()), + defaultOpts); + } + } catch (IOException e) { throw new IllegalStateException( "Unable to instantiate gRPC com.google.cloud.storage.Storage client.", e); @@ -780,4 +832,241 @@ private Object readResolve() { return INSTANCE; } } + + private static final 
class InternalStorageClient extends StorageClient { + + private InternalStorageClient(StorageStub stub) { + super(stub); + } + } + + private static final class InternalZeroCopyGrpcStorageStub extends GrpcStorageStub + implements AutoCloseable { + private final ReadObjectResponseZeroCopyMessageMarshaller getObjectMediaResponseMarshaller; + + private final ServerStreamingCallable + serverStreamingCallable; + + private InternalZeroCopyGrpcStorageStub( + StorageStubSettings settings, + ClientContext clientContext, + GrpcStubCallableFactory callableFactory) + throws IOException { + super(settings, clientContext, callableFactory); + + this.getObjectMediaResponseMarshaller = + new ReadObjectResponseZeroCopyMessageMarshaller(ReadObjectResponse.getDefaultInstance()); + + MethodDescriptor readObjectMethodDescriptor = + MethodDescriptor.newBuilder() + .setType(MethodDescriptor.MethodType.SERVER_STREAMING) + .setFullMethodName("google.storage.v2.Storage/ReadObject") + .setRequestMarshaller(ProtoUtils.marshaller(ReadObjectRequest.getDefaultInstance())) + .setResponseMarshaller(getObjectMediaResponseMarshaller) + .build(); + + GrpcCallSettings readObjectTransportSettings = + GrpcCallSettings.newBuilder() + .setMethodDescriptor(readObjectMethodDescriptor) + .setParamsExtractor( + request -> { + RequestParamsBuilder builder = RequestParamsBuilder.create(); + // todo: this is fragile to proto annotation changes, and would require manual + // maintenance + builder.add(request.getBucket(), "bucket", PathTemplate.create("{bucket=**}")); + return builder.build(); + }) + .build(); + + this.serverStreamingCallable = + callableFactory.createServerStreamingCallable( + readObjectTransportSettings, settings.readObjectSettings(), clientContext); + } + + @Override + public ServerStreamingCallable readObjectCallable() { + return serverStreamingCallable; + } + } + + @VisibleForTesting + static class ReadObjectResponseZeroCopyMessageMarshaller + implements MethodDescriptor.PrototypeMarshaller, + 
ResponseContentLifecycleManager, + Closeable { + private final Map unclosedStreams; + private final Parser parser; + private final MethodDescriptor.PrototypeMarshaller baseMarshaller; + + ReadObjectResponseZeroCopyMessageMarshaller(ReadObjectResponse defaultInstance) { + parser = defaultInstance.getParserForType(); + baseMarshaller = + (MethodDescriptor.PrototypeMarshaller) + ProtoUtils.marshaller(defaultInstance); + unclosedStreams = Collections.synchronizedMap(new IdentityHashMap<>()); + } + + @Override + public Class getMessageClass() { + return baseMarshaller.getMessageClass(); + } + + @Override + public ReadObjectResponse getMessagePrototype() { + return baseMarshaller.getMessagePrototype(); + } + + @Override + public InputStream stream(ReadObjectResponse value) { + return baseMarshaller.stream(value); + } + + @Override + public ReadObjectResponse parse(InputStream stream) { + CodedInputStream cis = null; + try { + if (stream instanceof KnownLength + && stream instanceof Detachable + && stream instanceof HasByteBuffer + && ((HasByteBuffer) stream).byteBufferSupported()) { + int size = stream.available(); + // Stream is now detached here and should be closed later. + stream = ((Detachable) stream).detach(); + // This mark call is to keep buffer while traversing buffers using skip. 
+ stream.mark(size); + List byteStrings = new ArrayList<>(); + while (stream.available() != 0) { + ByteBuffer buffer = ((HasByteBuffer) stream).getByteBuffer(); + byteStrings.add(UnsafeByteOperations.unsafeWrap(buffer)); + stream.skip(buffer.remaining()); + } + stream.reset(); + cis = ByteString.copyFrom(byteStrings).newCodedInput(); + cis.enableAliasing(true); + cis.setSizeLimit(Integer.MAX_VALUE); + } + } catch (IOException e) { + throw Status.INTERNAL + .withDescription("Error parsing input stream for ReadObject") + .withCause(e) + .asRuntimeException(); + } + if (cis != null) { + // fast path (no memory copy) + ReadObjectResponse message; + try { + message = parseFrom(cis); + } catch (InvalidProtocolBufferException ipbe) { + throw Status.INTERNAL + .withDescription("Invalid protobuf byte sequence for ReadObject") + .withCause(ipbe) + .asRuntimeException(); + } + unclosedStreams.put(message, stream); + return message; + } else { + // slow path + return baseMarshaller.parse(stream); + } + } + + private ReadObjectResponse parseFrom(CodedInputStream stream) + throws InvalidProtocolBufferException { + ReadObjectResponse message = parser.parseFrom(stream); + try { + stream.checkLastTagWas(0); + return message; + } catch (InvalidProtocolBufferException e) { + e.setUnfinishedMessage(message); + throw e; + } + } + + @Override + public ResponseContentLifecycleHandle get(ReadObjectResponse response) { + InputStream stream = unclosedStreams.remove(response); + return new ResponseContentLifecycleHandle(response, stream); + } + + @Override + public void close() throws IOException { + closeAllStreams(unclosedStreams.values()); + } + + /** + * In the event closing the streams results in multiple streams throwing IOExceptions, collect + * them all as suppressed exceptions on the first occurrence. 
+ */ + @VisibleForTesting + static void closeAllStreams(Collection inputStreams) throws IOException { + IOException ioException = + inputStreams.stream() + .map( + stream -> { + try { + stream.close(); + return null; + } catch (IOException e) { + return e; + } + }) + .filter(Objects::nonNull) + .reduce( + null, + (l, r) -> { + if (l != null) { + l.addSuppressed(r); + return l; + } else { + return r; + } + }, + (l, r) -> l); + + if (ioException != null) { + throw ioException; + } + } + } + + static final class ZeroCopyReadinessChecker { + private static final boolean isZeroCopyReady; + + static { + // Check whether io.grpc.Detachable exists? + boolean detachableClassExists = false; + try { + // Try to load Detachable interface in the package where KnownLength is in. + // This can be done directly by looking up io.grpc.Detachable but rather + // done indirectly to handle the case where gRPC is being shaded in a + // different package. + String knownLengthClassName = KnownLength.class.getName(); + String detachableClassName = + knownLengthClassName.substring(0, knownLengthClassName.lastIndexOf('.') + 1) + + "Detachable"; + Class detachableClass = Class.forName(detachableClassName); + detachableClassExists = (detachableClass != null); + } catch (ClassNotFoundException ex) { + // leaves detachableClassExists false + } + // Check whether com.google.protobuf.UnsafeByteOperations exists? 
+ boolean unsafeByteOperationsClassExists = false; + try { + // Same approach as above + String messageLiteClassName = MessageLite.class.getName(); + String unsafeByteOperationsClassName = + messageLiteClassName.substring(0, messageLiteClassName.lastIndexOf('.') + 1) + + "UnsafeByteOperations"; + Class unsafeByteOperationsClass = Class.forName(unsafeByteOperationsClassName); + unsafeByteOperationsClassExists = (unsafeByteOperationsClass != null); + } catch (ClassNotFoundException ex) { + // leaves unsafeByteOperationsClassExists false + } + isZeroCopyReady = detachableClassExists && unsafeByteOperationsClassExists; + } + + public static boolean isReady() { + return isZeroCopyReady; + } + } } diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java index 06fca0413f..d7741b7d91 100644 --- a/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/Hasher.java @@ -20,6 +20,7 @@ import com.google.common.hash.Hashing; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.List; import java.util.function.Supplier; import javax.annotation.concurrent.Immutable; import org.checkerframework.checker.nullness.qual.Nullable; @@ -36,6 +37,8 @@ default Crc32cLengthKnown hash(Supplier b) { void validate(Crc32cValue expected, Supplier b) throws IOException; + void validate(Crc32cValue expected, List buffers) throws IOException; + @Nullable Crc32cLengthKnown nullSafeConcat(Crc32cLengthKnown r1, Crc32cLengthKnown r2); @@ -61,6 +64,9 @@ public Crc32cLengthKnown hash(ByteBuffer b) { @Override public void validate(Crc32cValue expected, Supplier b) {} + @Override + public void validate(Crc32cValue expected, List b) {} + @Override public @Nullable Crc32cLengthKnown nullSafeConcat(Crc32cLengthKnown r1, Crc32cLengthKnown r2) { return null; @@ -79,7 +85,24 @@ public Crc32cLengthKnown hash(ByteBuffer b) { 
return Crc32cValue.of(Hashing.crc32c().hashBytes(b).asInt(), remaining); } - @SuppressWarnings("ConstantConditions") + @SuppressWarnings({"ConstantConditions", "UnstableApiUsage"}) + @Override + public void validate(Crc32cValue expected, List b) throws IOException { + long remaining = 0; + com.google.common.hash.Hasher crc32c = Hashing.crc32c().newHasher(); + for (ByteBuffer tmp : b) { + remaining += tmp.remaining(); + crc32c.putBytes(tmp); + } + Crc32cLengthKnown actual = Crc32cValue.of(crc32c.hash().asInt(), remaining); + if (!actual.eqValue(expected)) { + throw new IOException( + String.format( + "Mismatch checksum value. Expected %s actual %s", + expected.debugString(), actual.debugString())); + } + } + @Override public void validate(Crc32cValue expected, Supplier b) throws IOException { Crc32cLengthKnown actual = hash(b); diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ReadCursor.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ReadCursor.java new file mode 100644 index 0000000000..b65a6e257b --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ReadCursor.java @@ -0,0 +1,52 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.storage; + +import static com.google.common.base.Preconditions.checkArgument; + +/** + * Shrink wraps a beginning, offset and limit for tracking state of an individual invocation of + * {@link GapicUnbufferedReadableByteChannel#read} + */ +final class ReadCursor { + private final long beginning; + private long offset; + private final long limit; + + ReadCursor(long beginning, long limit) { + this.limit = limit; + this.beginning = beginning; + this.offset = beginning; + } + + public boolean hasRemaining() { + return limit - offset > 0; + } + + public void advance(long incr) { + checkArgument(incr >= 0); + offset += incr; + } + + public long read() { + return offset - beginning; + } + + @Override + public String toString() { + return String.format("ReadCursor{begin=%d, offset=%d, limit=%d}", beginning, offset, limit); + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleHandle.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleHandle.java new file mode 100644 index 0000000000..20fc365832 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleHandle.java @@ -0,0 +1,57 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.storage; + +import com.google.storage.v2.ReadObjectResponse; +import java.io.Closeable; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import org.checkerframework.checker.nullness.qual.Nullable; + +final class ResponseContentLifecycleHandle implements Closeable { + @Nullable private final Closeable dispose; + + private final List buffers; + + ResponseContentLifecycleHandle(ReadObjectResponse resp, @Nullable Closeable dispose) { + this.dispose = dispose; + + this.buffers = resp.getChecksummedData().getContent().asReadOnlyByteBufferList(); + } + + void copy(ReadCursor c, ByteBuffer[] dsts, int offset, int length) { + for (ByteBuffer b : buffers) { + long copiedBytes = Buffers.copy(b, dsts, offset, length); + c.advance(copiedBytes); + if (b.hasRemaining()) break; + } + } + + boolean hasRemaining() { + for (ByteBuffer b : buffers) { + if (b.hasRemaining()) return true; + } + return false; + } + + @Override + public void close() throws IOException { + if (dispose != null) { + dispose.close(); + } + } +} diff --git a/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleManager.java b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleManager.java new file mode 100644 index 0000000000..3236513398 --- /dev/null +++ b/google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleManager.java @@ -0,0 +1,31 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.storage; + +import com.google.storage.v2.ReadObjectResponse; + +interface ResponseContentLifecycleManager { + ResponseContentLifecycleHandle get(ReadObjectResponse response); + + static ResponseContentLifecycleManager noop() { + return response -> + new ResponseContentLifecycleHandle( + response, + () -> { + // no-op + }); + } +} diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java index ba9a972cc2..3a4fd7e924 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGapicUnbufferedReadableByteChannelTest.java @@ -127,7 +127,8 @@ public void readRetriesAreProperlyOrdered_readLargerThanMessageSize() .withDefaultCallContext( contextWithRetryForCodes(StatusCode.Code.DATA_LOSS)), start, - Hasher.noop())); + Hasher.noop(), + ResponseContentLifecycleManager.noop())); byte[] actualBytes = new byte[40]; try (UnbufferedReadableByteChannel c = session.open()) { c.read(ByteBuffer.wrap(actualBytes)); @@ -155,7 +156,8 @@ public void readRetriesAreProperlyOrdered_readSmallerThanMessageSize() .withDefaultCallContext( contextWithRetryForCodes(StatusCode.Code.DATA_LOSS)), start, - Hasher.noop())); + Hasher.noop(), + ResponseContentLifecycleManager.noop())); byte[] actualBytes = new byte[40]; ImmutableList buffers = TestUtils.subDivide(actualBytes, 2); try (UnbufferedReadableByteChannel c = session.open()) { @@ -213,7 +215,8 @@ public void readObject( .withDefaultCallContext( contextWithRetryForCodes(StatusCode.Code.DATA_LOSS)), start, - Hasher.noop())); + Hasher.noop(), + ResponseContentLifecycleManager.noop())); byte[] actualBytes = new byte[40]; 
try (UnbufferedReadableByteChannel c = session.open()) { IOException ioException = @@ -260,7 +263,8 @@ public void readObject( .withDefaultCallContext( contextWithRetryForCodes(StatusCode.Code.DATA_LOSS)), start, - Hasher.enabled())); + Hasher.enabled(), + ResponseContentLifecycleManager.noop())); byte[] actualBytes = new byte[40]; try (UnbufferedReadableByteChannel c = session.open()) { IOException ioException = @@ -299,7 +303,8 @@ public void readObject( .withDefaultCallContext( contextWithRetryForCodes(StatusCode.Code.DATA_LOSS)), start, - Hasher.enabled())); + Hasher.enabled(), + ResponseContentLifecycleManager.noop())); byte[] actualBytes = new byte[41]; //noinspection resource UnbufferedReadableByteChannel c = session.open(); diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGzipReadableByteChannelTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGzipReadableByteChannelTest.java index 2e8efa0589..41f6df1de4 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGzipReadableByteChannelTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ITGzipReadableByteChannelTest.java @@ -124,7 +124,9 @@ public void autoGzipDecompress_true() throws IOException { UnbufferedReadableByteChannelSession session = ResumableMedia.gapic() .read() - .byteChannel(storageClient.getInstance().readObjectCallable()) + .byteChannel( + storageClient.getInstance().readObjectCallable(), + ResponseContentLifecycleManager.noop()) .setHasher(Hasher.noop()) .setAutoGzipDecompression(true) .unbuffered() @@ -143,7 +145,9 @@ public void autoGzipDecompress_false() throws IOException { UnbufferedReadableByteChannelSession session = ResumableMedia.gapic() .read() - .byteChannel(storageClient.getInstance().readObjectCallable()) + .byteChannel( + storageClient.getInstance().readObjectCallable(), + ResponseContentLifecycleManager.noop()) .setHasher(Hasher.noop()) .setAutoGzipDecompression(false) .unbuffered() @@ 
-193,7 +197,9 @@ public void autoGzipDecompress_true() throws IOException { UnbufferedReadableByteChannelSession session = ResumableMedia.gapic() .read() - .byteChannel(storageClient.getInstance().readObjectCallable()) + .byteChannel( + storageClient.getInstance().readObjectCallable(), + ResponseContentLifecycleManager.noop()) .setHasher(Hasher.noop()) .setAutoGzipDecompression(true) .unbuffered() @@ -212,7 +218,9 @@ public void autoGzipDecompress_false() throws IOException { UnbufferedReadableByteChannelSession session = ResumableMedia.gapic() .read() - .byteChannel(storageClient.getInstance().readObjectCallable()) + .byteChannel( + storageClient.getInstance().readObjectCallable(), + ResponseContentLifecycleManager.noop()) .setHasher(Hasher.noop()) .setAutoGzipDecompression(false) .unbuffered() @@ -231,7 +239,9 @@ public void autoGzipDecompress_default_disabled() throws IOException { UnbufferedReadableByteChannelSession session = ResumableMedia.gapic() .read() - .byteChannel(storageClient.getInstance().readObjectCallable()) + .byteChannel( + storageClient.getInstance().readObjectCallable(), + ResponseContentLifecycleManager.noop()) .setHasher(Hasher.noop()) .unbuffered() .setReadObjectRequest(reqCompressed) @@ -314,7 +324,7 @@ public void readObject( ReadableByteChannelSession session = ResumableMedia.gapic() .read() - .byteChannel(sc.readObjectCallable()) + .byteChannel(sc.readObjectCallable(), ResponseContentLifecycleManager.noop()) .setHasher(Hasher.noop()) .setAutoGzipDecompression(true) .unbuffered() diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/TransportCompatibilityTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/TransportCompatibilityTest.java index 76219beccd..32d308a311 100644 --- a/google-cloud-storage/src/test/java/com/google/cloud/storage/TransportCompatibilityTest.java +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/TransportCompatibilityTest.java @@ -38,7 +38,9 @@ public void 
verifyUnsupportedMethodsGenerateMeaningfulException() { .setCredentials(NoCredentials.getInstance()) .build(); @SuppressWarnings("resource") - Storage s = new GrpcStorageImpl(options, null, null, Opts.empty()); + Storage s = + new GrpcStorageImpl( + options, null, ResponseContentLifecycleManager.noop(), null, Opts.empty()); ImmutableList messages = Stream.>of( s::batch, diff --git a/google-cloud-storage/src/test/java/com/google/cloud/storage/ZeroCopyMarshallerTest.java b/google-cloud-storage/src/test/java/com/google/cloud/storage/ZeroCopyMarshallerTest.java new file mode 100644 index 0000000000..57d96dfbca --- /dev/null +++ b/google-cloud-storage/src/test/java/com/google/cloud/storage/ZeroCopyMarshallerTest.java @@ -0,0 +1,223 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.cloud.storage; + +import static com.google.cloud.storage.TestUtils.getChecksummedData; +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import com.google.cloud.storage.GrpcStorageOptions.ReadObjectResponseZeroCopyMessageMarshaller; +import com.google.common.collect.ImmutableList; +import com.google.common.hash.Hashing; +import com.google.protobuf.ByteString; +import com.google.storage.v2.ContentRange; +import com.google.storage.v2.Object; +import com.google.storage.v2.ObjectChecksums; +import com.google.storage.v2.ReadObjectResponse; +import io.grpc.StatusRuntimeException; +import io.grpc.internal.ReadableBuffer; +import io.grpc.internal.ReadableBuffers; +import java.io.Closeable; +import java.io.FilterInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import org.junit.Test; + +public class ZeroCopyMarshallerTest { + private final byte[] bytes = DataGenerator.base64Characters().genBytes(40); + private final ByteString data = ByteString.copyFrom(bytes, 0, 10); + private final ReadObjectResponse response = + ReadObjectResponse.newBuilder() + .setMetadata( + Object.newBuilder() + .setName("name") + .setGeneration(3L) + .setContentType("application/octet-stream") + .build()) + .setContentRange(ContentRange.newBuilder().setStart(0).build()) + .setObjectChecksums( + ObjectChecksums.newBuilder().setCrc32C(Hashing.crc32c().hashBytes(bytes).asInt())) + .setChecksummedData(getChecksummedData(data, Hasher.enabled())) + .build(); + + private ReadObjectResponseZeroCopyMessageMarshaller createMarshaller() { + return new 
ReadObjectResponseZeroCopyMessageMarshaller(ReadObjectResponse.getDefaultInstance()); + } + + private byte[] dropLastOneByte(byte[] bytes) { + return Arrays.copyOfRange(bytes, 0, bytes.length - 1); + } + + private InputStream createInputStream(byte[] bytes, boolean isZeroCopyable) { + ReadableBuffer buffer = + isZeroCopyable ? ReadableBuffers.wrap(ByteBuffer.wrap(bytes)) : ReadableBuffers.wrap(bytes); + return ReadableBuffers.openStream(buffer, true); + } + + @Test + public void testParseOnFastPath() throws IOException { + InputStream stream = createInputStream(response.toByteArray(), true); + ReadObjectResponseZeroCopyMessageMarshaller marshaller = createMarshaller(); + ReadObjectResponse response = marshaller.parse(stream); + assertEquals(response, this.response); + ResponseContentLifecycleHandle stream2 = marshaller.get(response); + assertNotNull(stream2); + stream2.close(); + ResponseContentLifecycleHandle stream3 = marshaller.get(response); + assertNotNull(stream3); + stream3.close(); + } + + @Test + public void testParseOnSlowPath() throws IOException { + InputStream stream = createInputStream(response.toByteArray(), false); + ReadObjectResponseZeroCopyMessageMarshaller marshaller = createMarshaller(); + ReadObjectResponse response = marshaller.parse(stream); + assertEquals(response, this.response); + ResponseContentLifecycleHandle stream2 = marshaller.get(response); + assertNotNull(stream2); + stream2.close(); + } + + @Test + public void testParseBrokenMessageOnFastPath() { + InputStream stream = createInputStream(dropLastOneByte(response.toByteArray()), true); + ReadObjectResponseZeroCopyMessageMarshaller marshaller = createMarshaller(); + assertThrows( + StatusRuntimeException.class, + () -> { + marshaller.parse(stream); + }); + } + + @Test + public void testParseBrokenMessageOnSlowPath() { + InputStream stream = createInputStream(dropLastOneByte(response.toByteArray()), false); + ReadObjectResponseZeroCopyMessageMarshaller marshaller = createMarshaller(); 
+ assertThrows( + StatusRuntimeException.class, + () -> { + marshaller.parse(stream); + }); + } + + @Test + public void testResponseContentLifecycleHandle() throws IOException { + AtomicBoolean wasClosedCalled = new AtomicBoolean(false); + Closeable verifyClosed = () -> wasClosedCalled.set(true); + + ResponseContentLifecycleHandle handle = + new ResponseContentLifecycleHandle(response, verifyClosed); + handle.close(); + + assertTrue(wasClosedCalled.get()); + + ResponseContentLifecycleHandle nullHandle = new ResponseContentLifecycleHandle(response, null); + nullHandle.close(); + // No NullPointerException means test passes + } + + @Test + public void testMarshallerClose_clean() throws IOException { + CloseAuditingInputStream stream1 = + CloseAuditingInputStream.of(createInputStream(response.toByteArray(), true)); + CloseAuditingInputStream stream2 = + CloseAuditingInputStream.of(createInputStream(response.toByteArray(), true)); + CloseAuditingInputStream stream3 = + CloseAuditingInputStream.of(createInputStream(response.toByteArray(), true)); + + ReadObjectResponseZeroCopyMessageMarshaller.closeAllStreams( + ImmutableList.of(stream1, stream2, stream3)); + + assertThat(stream1.closed).isTrue(); + assertThat(stream2.closed).isTrue(); + assertThat(stream3.closed).isTrue(); + } + + @SuppressWarnings("resource") + @Test + public void testMarshallerClose_multipleIoExceptions() { + CloseAuditingInputStream stream1 = + new CloseAuditingInputStream(null) { + @Override + void onClose() throws IOException { + throw new IOException("Kaboom stream1"); + } + }; + CloseAuditingInputStream stream2 = + new CloseAuditingInputStream(null) { + @Override + void onClose() throws IOException { + throw new IOException("Kaboom stream2"); + } + }; + CloseAuditingInputStream stream3 = + new CloseAuditingInputStream(null) { + @Override + void onClose() throws IOException { + throw new IOException("Kaboom stream3"); + } + }; + + IOException ioException = + assertThrows( + IOException.class, + 
() -> + ReadObjectResponseZeroCopyMessageMarshaller.closeAllStreams( + ImmutableList.of(stream1, stream2, stream3))); + + assertThat(stream1.closed).isTrue(); + assertThat(stream2.closed).isTrue(); + assertThat(stream3.closed).isTrue(); + + assertThat(ioException).hasMessageThat().isEqualTo("Kaboom stream1"); + List messages = + Arrays.stream(ioException.getSuppressed()) + .map(Throwable::getMessage) + .collect(Collectors.toList()); + assertThat(messages).isEqualTo(ImmutableList.of("Kaboom stream2", "Kaboom stream3")); + } + + private static class CloseAuditingInputStream extends FilterInputStream { + + private boolean closed = false; + + private CloseAuditingInputStream(InputStream in) { + super(in); + } + + public static CloseAuditingInputStream of(InputStream in) { + return new CloseAuditingInputStream(in); + } + + @Override + public void close() throws IOException { + closed = true; + onClose(); + super.close(); + } + + void onClose() throws IOException {} + } +}