Skip to content

Commit

Permalink
fix: update GrpcStorageImpl.createFrom(BlobInfo, Path) to use Rewinda…
Browse files Browse the repository at this point in the history
…bleContent (#2112)

With the introduction of RewindableContent, we can now upload an entire file in a single WriteObjectRequest stream.

* chore: rename RewindableHttpContent -> RewindableContent
  • Loading branch information
BenWhitehead authored Jul 14, 2023
1 parent 889e433 commit c805051
Show file tree
Hide file tree
Showing 17 changed files with 346 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
@ParametersAreNonnullByDefault
final class ApiaryUnbufferedWritableByteChannel implements UnbufferedWritableByteChannel {

private final ResumableSession<StorageObject> session;
private final JsonResumableSession session;

private final SettableApiFuture<StorageObject> result;
private final LongConsumer committedBytesCallback;
Expand All @@ -57,7 +57,7 @@ public long write(ByteBuffer[] srcs, int offset, int length) throws IOException
if (!open) {
throw new ClosedChannelException();
}
RewindableHttpContent content = RewindableHttpContent.of(Utils.subArray(srcs, offset, length));
RewindableContent content = RewindableContent.of(Utils.subArray(srcs, offset, length));
long available = content.getLength();
long newFinalByteOffset = cumulativeByteCount + available;
final HttpContentRange header;
Expand Down Expand Up @@ -96,7 +96,7 @@ public void close() throws IOException {
if (!finished) {
try {
ResumableOperationResult<@Nullable StorageObject> operationResult =
session.put(RewindableHttpContent.empty(), HttpContentRange.of(cumulativeByteCount));
session.put(RewindableContent.empty(), HttpContentRange.of(cumulativeByteCount));
long persistedSize = operationResult.getPersistedSize();
committedBytesCallback.accept(persistedSize);
result.set(operationResult.getObject());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ final class ByteSizeConstants {
static final int _2MiB = 2 * _1MiB;
static final int _16MiB = 16 * _1MiB;
static final int _32MiB = 32 * _1MiB;
static final long _1GiB = 1024 * _1MiB;
static final long _1TiB = 1024 * _1GiB;
static final long _5TiB = 5 * _1TiB;

static final long _128KiBL = 131072L;
static final long _256KiBL = 262144L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.InternalApi;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.rpc.ClientStreamingCallable;
Expand Down Expand Up @@ -219,10 +220,12 @@ final class ResumableUploadBuilder {

private RetryingDependencies deps;
private ResultRetryAlgorithm<?> alg;
private boolean fsyncEvery;

ResumableUploadBuilder() {
this.deps = RetryingDependencies.attemptOnce();
this.alg = Retrying.neverRetry();
this.fsyncEvery = true;
}

ResumableUploadBuilder withRetryConfig(RetryingDependencies deps, ResultRetryAlgorithm<?> alg) {
Expand All @@ -231,6 +234,12 @@ ResumableUploadBuilder withRetryConfig(RetryingDependencies deps, ResultRetryAlg
return this;
}

@InternalApi
ResumableUploadBuilder setFsyncEvery(boolean fsyncEvery) {
this.fsyncEvery = fsyncEvery;
return this;
}

/**
* Do not apply any intermediate buffering. Any call to {@link
* java.nio.channels.WritableByteChannel#write(ByteBuffer)} will be segmented as is and sent to
Expand Down Expand Up @@ -281,7 +290,10 @@ UnbufferedWritableByteChannelSession<WriteObjectResponse> build() {
return new UnbufferedWriteSession<>(
requireNonNull(start, "start must be non null"),
bindFunction(
WriteFlushStrategy.fsyncEveryFlush(write, deps, alg, Retrying::newCallContext),
fsyncEvery
? WriteFlushStrategy.fsyncEveryFlush(
write, deps, alg, Retrying::newCallContext)
: WriteFlushStrategy.fsyncOnClose(write),
ResumableWrite::identity)
.andThen(StorageByteChannels.writable()::createSynchronized));
}
Expand Down Expand Up @@ -310,7 +322,10 @@ BufferedWritableByteChannelSession<WriteObjectResponse> build() {
return new BufferedWriteSession<>(
requireNonNull(start, "start must be non null"),
bindFunction(
WriteFlushStrategy.fsyncEveryFlush(write, deps, alg, Retrying::newCallContext),
fsyncEvery
? WriteFlushStrategy.fsyncEveryFlush(
write, deps, alg, Retrying::newCallContext)
: WriteFlushStrategy.fsyncOnClose(write),
ResumableWrite::identity)
.andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c))
.andThen(StorageByteChannels.writable()::createSynchronized));
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage;

import com.google.api.core.ApiFutures;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.rpc.ClientStreamingCallable;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel;
import com.google.cloud.storage.Conversions.Decoder;
import com.google.cloud.storage.Retrying.RetryingDependencies;
import com.google.storage.v2.Object;
import com.google.storage.v2.QueryWriteStatusRequest;
import com.google.storage.v2.QueryWriteStatusResponse;
import com.google.storage.v2.WriteObjectRequest;
import com.google.storage.v2.WriteObjectResponse;
import java.util.concurrent.atomic.AtomicBoolean;
import org.checkerframework.checker.nullness.qual.Nullable;

final class GrpcResumableSession {

private final RetryingDependencies deps;
private final ResultRetryAlgorithm<?> alg;
private final ClientStreamingCallable<WriteObjectRequest, WriteObjectResponse> writeCallable;
private final UnaryCallable<QueryWriteStatusRequest, QueryWriteStatusResponse>
queryWriteStatusCallable;
private final ResumableWrite resumableWrite;
private final Hasher hasher;

GrpcResumableSession(
RetryingDependencies deps,
ResultRetryAlgorithm<?> alg,
ClientStreamingCallable<WriteObjectRequest, WriteObjectResponse> writeCallable,
UnaryCallable<QueryWriteStatusRequest, QueryWriteStatusResponse> queryWriteStatusCallable,
ResumableWrite resumableWrite,
Hasher hasher) {
this.deps = deps;
this.alg = alg;
this.writeCallable = writeCallable;
this.queryWriteStatusCallable = queryWriteStatusCallable;
this.resumableWrite = resumableWrite;
this.hasher = hasher;
}

ResumableOperationResult<@Nullable Object> query() {
QueryWriteStatusRequest.Builder b =
QueryWriteStatusRequest.newBuilder().setUploadId(resumableWrite.getRes().getUploadId());
if (resumableWrite.getReq().hasCommonObjectRequestParams()) {
b.setCommonObjectRequestParams(resumableWrite.getReq().getCommonObjectRequestParams());
}
QueryWriteStatusRequest req = b.build();
try {
QueryWriteStatusResponse response = queryWriteStatusCallable.call(req);
if (response.hasResource()) {
return ResumableOperationResult.complete(
response.getResource(), response.getResource().getSize());
} else {
return ResumableOperationResult.incremental(response.getPersistedSize());
}
} catch (Exception e) {
throw StorageException.coalesce(e);
}
}

ResumableOperationResult<@Nullable Object> put(RewindableContent content) {
AtomicBoolean dirty = new AtomicBoolean(false);
GrpcCallContext retryingCallContext = Retrying.newCallContext();
BufferHandle handle = BufferHandle.allocate(ByteSizeConstants._2MiB);

return Retrying.run(
deps,
alg,
() -> {
if (dirty.getAndSet(true)) {
ResumableOperationResult<@Nullable Object> query = query();
if (query.getObject() != null) {
return query;
} else {
content.rewindTo(query.getPersistedSize());
}
}
WritableByteChannelSession<BufferedWritableByteChannel, WriteObjectResponse> session =
ResumableMedia.gapic()
.write()
.byteChannel(writeCallable.withDefaultCallContext(retryingCallContext))
.setByteStringStrategy(ByteStringStrategy.copy())
.setHasher(hasher)
.resumable()
.setFsyncEvery(false)
.buffered(handle)
.setStartAsync(ApiFutures.immediateFuture(resumableWrite))
.build();

try (BufferedWritableByteChannel channel = session.open()) {
content.writeTo(channel);
}

WriteObjectResponse response = session.getResult().get();
if (response.hasResource()) {
return ResumableOperationResult.complete(
response.getResource(), response.getResource().getSize());
} else {
return ResumableOperationResult.incremental(response.getPersistedSize());
}
},
Decoder.identity());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@
import static java.util.Objects.requireNonNull;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.BetaApi;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.paging.AbstractPage;
import com.google.api.gax.paging.Page;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.ApiExceptions;
import com.google.api.gax.rpc.ClientStreamingCallable;
import com.google.api.gax.rpc.NotFoundException;
import com.google.api.gax.rpc.StatusCode;
import com.google.api.gax.rpc.UnaryCallable;
Expand Down Expand Up @@ -72,6 +74,7 @@
import com.google.common.collect.Streams;
import com.google.common.io.BaseEncoding;
import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.iam.v1.GetIamPolicyRequest;
import com.google.iam.v1.SetIamPolicyRequest;
import com.google.iam.v1.TestIamPermissionsRequest;
Expand Down Expand Up @@ -123,7 +126,6 @@
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SeekableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
import java.nio.file.OpenOption;
Expand All @@ -138,6 +140,7 @@
import java.util.Spliterator;
import java.util.Spliterators.AbstractSpliterator;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Predicate;
Expand Down Expand Up @@ -285,55 +288,34 @@ public Blob createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOp
opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);

long size = Files.size(path);
if (size < bufferSize) {
// ignore the bufferSize argument if the file is smaller than it
GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext());
return Retrying.run(
getOptions(),
retryAlgorithmManager.getFor(req),
() -> {
BufferedWritableByteChannelSession<WriteObjectResponse> session =
ResumableMedia.gapic()
.write()
.byteChannel(storageClient.writeObjectCallable().withDefaultCallContext(merge))
.setHasher(Hasher.enabled())
.setByteStringStrategy(ByteStringStrategy.noCopy())
.direct()
.buffered(Buffers.allocate(size))
.setRequest(req)
.build();

try (SeekableByteChannel src = Files.newByteChannel(path, READ_OPS);
BufferedWritableByteChannel dst = session.open()) {
ByteStreams.copy(src, dst);
} catch (Exception e) {
throw StorageException.coalesce(e);
}
return session.getResult();
},
this::getBlob);
} else {
ApiFuture<ResumableWrite> start = startResumableWrite(grpcCallContext, req);
BufferedWritableByteChannelSession<WriteObjectResponse> session =
ResumableMedia.gapic()
.write()
.byteChannel(
storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext))
.setHasher(Hasher.noop())
.setByteStringStrategy(ByteStringStrategy.noCopy())
.resumable()
.withRetryConfig(getOptions(), retryAlgorithmManager.idempotent())
.buffered(Buffers.allocateAligned(bufferSize, _256KiB))
.setStartAsync(start)
.build();
try (SeekableByteChannel src = Files.newByteChannel(path, READ_OPS);
BufferedWritableByteChannel dst = session.open()) {
ByteStreams.copy(src, dst);
} catch (Exception e) {
throw StorageException.coalesce(e);
ClientStreamingCallable<WriteObjectRequest, WriteObjectResponse> write =
storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext);

ApiFuture<ResumableWrite> start = startResumableWrite(grpcCallContext, req);
ApiFuture<GrpcResumableSession> session2 =
ApiFutures.transform(
start,
rw ->
ResumableSession.grpc(
getOptions(),
retryAlgorithmManager.idempotent(),
write,
storageClient.queryWriteStatusCallable(),
rw,
Hasher.noop()),
MoreExecutors.directExecutor());
try {
GrpcResumableSession got = session2.get();
ResumableOperationResult<@Nullable Object> put = got.put(RewindableContent.of(path));
Object object = put.getObject();
if (object == null) {
// if by some odd chance the put didn't get the Object, query for it
ResumableOperationResult<@Nullable Object> query = got.query();
object = query.getObject();
}
return getBlob(session.getResult());
return codecs.blobInfo().decode(object).asBlob(this);
} catch (InterruptedException | ExecutionException e) {
throw StorageException.coalesce(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import org.checkerframework.checker.nullness.qual.Nullable;

final class JsonResumableSession extends ResumableSession<StorageObject> {
final class JsonResumableSession {

static final String SPAN_NAME_WRITE =
String.format("Sent.%s.write", HttpStorageRpc.class.getName());
Expand All @@ -53,14 +53,12 @@ final class JsonResumableSession extends ResumableSession<StorageObject> {
* Not automatically retried. Usually called from within another retrying context. We don't yet
* have the concept of nested retry handling.
*/
@Override
ResumableOperationResult<@Nullable StorageObject> query() {
return new JsonResumableSessionQueryTask(context, resumableWrite.getUploadId()).call();
}

@Override
ResumableOperationResult<@Nullable StorageObject> put(
RewindableHttpContent content, HttpContentRange contentRange) {
RewindableContent content, HttpContentRange contentRange) {
JsonResumableSessionPutTask task =
new JsonResumableSessionPutTask(
context, resumableWrite.getUploadId(), content, contentRange);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ final class JsonResumableSessionPutTask

private final HttpClientContext context;
private final String uploadId;
private final RewindableHttpContent content;
private final RewindableContent content;
private final HttpContentRange originalContentRange;

private HttpContentRange contentRange;
Expand All @@ -48,7 +48,7 @@ final class JsonResumableSessionPutTask
JsonResumableSessionPutTask(
HttpClientContext httpClientContext,
String uploadId,
RewindableHttpContent content,
RewindableContent content,
HttpContentRange originalContentRange) {
this.context = httpClientContext;
this.uploadId = uploadId;
Expand All @@ -64,9 +64,9 @@ public void rewindTo(long offset) {
long originalBegin = range.beginOffset();
long contentOffset = offset - originalBegin;
Preconditions.checkArgument(
0 <= contentOffset && contentOffset < content.getLength(),
0 <= contentOffset && contentOffset < range.length(),
"Rewind offset is out of bounds. (%s <= %s < %s)",
range.beginOffset(),
originalBegin,
offset,
range.endOffset());
content.rewindTo(contentOffset);
Expand Down
Loading

0 comments on commit c805051

Please sign in to comment.