Skip to content

Commit

Permalink
chore: remove WriteFlusher instead having dedicated WritableByteChann…
Browse files Browse the repository at this point in the history
…els (#2514)

* chore: start teasing apart grpc write path to allow removal of WriteFlusher abstraction

WriteFlusher has grown unwieldy to be able to properly handle the kinds of uploads that are performed while being able to perform appropriate finegrained response validation.

This is the first change in a series to create an UnbufferedWritableByteChannel for each kind of upload where all logic for that upload can be encapsulated.

* chore: pt.2 chunked resumable upload

Make dedicated WritableByteChannel to handle chunked resumable uploads.

* chore: pt.3 streamed resumable upload

Make dedicated WritableByteChannel to handle streamed resumable uploads.

* chore: delete GapicUnbufferedWritableByteChannel

* chore: cleanup WriteFlushStrategy

* chore: pt.4 BidiWrite chunked resumable upload
  • Loading branch information
BenWhitehead authored May 3, 2024
1 parent d1f6bcc commit cebf059
Show file tree
Hide file tree
Showing 14 changed files with 1,127 additions and 843 deletions.
6 changes: 6 additions & 0 deletions google-cloud-storage/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,10 @@
<method>com.google.cloud.storage.StorageOptions$Builder setBlobWriteSessionConfig(com.google.cloud.storage.BlobWriteSessionConfig)</method>
</difference>

<!-- somehow clirr things a public class in a package-private class is part of the public api -->
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/storage/WriteFlushStrategy$DefaultBidiFlusher</className>
</difference>

</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,17 @@

package com.google.cloud.storage;

import static com.google.cloud.storage.GrpcUtils.contextWithBucketName;

import com.google.api.core.SettableApiFuture;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
import com.google.api.gax.rpc.ApiStreamObserver;
import com.google.api.gax.rpc.BidiStreamingCallable;
import com.google.cloud.storage.ChunkSegmenter.ChunkSegment;
import com.google.cloud.storage.Conversions.Decoder;
import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
import com.google.cloud.storage.Retrying.RetryingDependencies;
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
Expand All @@ -31,64 +39,89 @@
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;
import org.checkerframework.checker.nullness.qual.NonNull;

final class GapicBidiUnbufferedWritableByteChannel<
RequestFactoryT extends BidiWriteCtx.BidiWriteObjectRequestBuilderFactory>
implements UnbufferedWritableByteChannel {

final class GapicBidiUnbufferedWritableByteChannel implements UnbufferedWritableByteChannel {
private final BidiStreamingCallable<BidiWriteObjectRequest, BidiWriteObjectResponse> write;
private final RetryingDependencies deps;
private final ResultRetryAlgorithm<?> alg;
private final String bucketName;
private final Supplier<GrpcCallContext> baseContextSupplier;
private final SettableApiFuture<BidiWriteObjectResponse> resultFuture;
private final ChunkSegmenter chunkSegmenter;

private final BidiWriteCtx<RequestFactoryT> writeCtx;
private final WriteFlushStrategy.BidiFlusher flusher;
private final BidiWriteCtx<BidiResumableWrite> writeCtx;
private final BidiObserver responseObserver;

private volatile ApiStreamObserver<BidiWriteObjectRequest> stream;
private boolean open = true;
private boolean first = true;
private boolean finished = false;

GapicBidiUnbufferedWritableByteChannel(
BidiStreamingCallable<BidiWriteObjectRequest, BidiWriteObjectResponse> write,
RetryingDependencies deps,
ResultRetryAlgorithm<?> alg,
SettableApiFuture<BidiWriteObjectResponse> resultFuture,
ChunkSegmenter chunkSegmenter,
RequestFactoryT requestFactory,
WriteFlushStrategy.BidiFlusherFactory flusherFactory) {
BidiResumableWrite requestFactory,
Supplier<GrpcCallContext> baseContextSupplier) {
this.write = write;
this.deps = deps;
this.alg = alg;
this.baseContextSupplier = baseContextSupplier;
this.bucketName = requestFactory.bucketName();
this.resultFuture = resultFuture;
this.chunkSegmenter = chunkSegmenter;

this.writeCtx = new BidiWriteCtx<>(requestFactory);
this.flusher =
flusherFactory.newFlusher(
requestFactory.bucketName(), writeCtx.getConfirmedBytes()::set, resultFuture::set);
this.responseObserver = new BidiObserver();
}

@Override
public long write(ByteBuffer[] srcs, int srcsOffset, int srcsLength) throws IOException {
return internalWrite(srcs, srcsOffset, srcsLength, false);
}

@Override
public long writeAndClose(ByteBuffer[] srcs, int offset, int length) throws IOException {
long written = internalWrite(srcs, offset, length, true);
close();
return written;
}

@Override
public boolean isOpen() {
return open;
}

@Override
public void close() throws IOException {
if (!open) {
return;
}
ApiStreamObserver<BidiWriteObjectRequest> openedStream = openedStream();
if (!finished) {
BidiWriteObjectRequest message = finishMessage();
try {
flusher.close(message);
openedStream.onNext(message);
finished = true;
openedStream.onCompleted();
} catch (RuntimeException e) {
resultFuture.setException(e);
throw e;
}
} else {
flusher.close(null);
openedStream.onCompleted();
}
responseObserver.await();
open = false;
}

@VisibleForTesting
BidiWriteCtx<RequestFactoryT> getWriteCtx() {
BidiWriteCtx<BidiResumableWrite> getWriteCtx() {
return writeCtx;
}

Expand Down Expand Up @@ -130,7 +163,8 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo
finished = true;
}

BidiWriteObjectRequest build = builder.build();
BidiWriteObjectRequest build = possiblyPairDownBidiRequest(builder, first).build();
first = false;
messages.add(build);
bytesConsumed += contentSize;
}
Expand All @@ -140,7 +174,7 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo
}

try {
flusher.flush(messages);
flush(messages);
} catch (RuntimeException e) {
resultFuture.setException(e);
throw e;
Expand All @@ -162,4 +196,123 @@ private BidiWriteObjectRequest finishMessage() {
BidiWriteObjectRequest message = b.build();
return message;
}

private ApiStreamObserver<BidiWriteObjectRequest> openedStream() {
if (stream == null) {
synchronized (this) {
if (stream == null) {
GrpcCallContext internalContext =
contextWithBucketName(bucketName, baseContextSupplier.get());
stream =
this.write
.withDefaultCallContext(internalContext)
.bidiStreamingCall(responseObserver);
responseObserver.sem.drainPermits();
}
}
}
return stream;
}

private void flush(@NonNull List<BidiWriteObjectRequest> segments) {
Retrying.run(
deps,
alg,
() -> {
try {
ApiStreamObserver<BidiWriteObjectRequest> opened = openedStream();
for (BidiWriteObjectRequest message : segments) {
opened.onNext(message);
}
if (!finished) {
BidiWriteObjectRequest message =
BidiWriteObjectRequest.newBuilder().setFlush(true).setStateLookup(true).build();
opened.onNext(message);
}
responseObserver.await();
return null;
} catch (Exception e) {
stream = null;
first = true;
throw e;
}
},
Decoder.identity());
}

private static BidiWriteObjectRequest.Builder possiblyPairDownBidiRequest(
BidiWriteObjectRequest.Builder b, boolean firstMessageOfStream) {
if (firstMessageOfStream && b.getWriteOffset() == 0) {
return b;
}

if (!firstMessageOfStream) {
b.clearUploadId();
}

if (b.getWriteOffset() > 0) {
b.clearWriteObjectSpec();
}

if (b.getWriteOffset() > 0 && !b.getFinishWrite()) {
b.clearObjectChecksums();
}
return b;
}

private class BidiObserver implements ApiStreamObserver<BidiWriteObjectResponse> {

private final Semaphore sem;
private volatile BidiWriteObjectResponse last;
private volatile RuntimeException previousError;

private BidiObserver() {
this.sem = new Semaphore(0);
}

@Override
public void onNext(BidiWriteObjectResponse value) {
// incremental update
if (value.hasPersistedSize()) {
writeCtx.getConfirmedBytes().set((value.getPersistedSize()));
} else if (value.hasResource()) {
writeCtx.getConfirmedBytes().set(value.getResource().getSize());
}
sem.release();
last = value;
}

@Override
public void onError(Throwable t) {
if (t instanceof RuntimeException) {
previousError = (RuntimeException) t;
}
sem.release();
}

@Override
public void onCompleted() {
if (last != null && last.hasResource()) {
resultFuture.set(last);
}
sem.release();
}

void await() {
try {
sem.acquire();
} catch (InterruptedException e) {
if (e.getCause() instanceof RuntimeException) {
throw (RuntimeException) e.getCause();
} else {
throw new RuntimeException(e);
}
}
RuntimeException err = previousError;
if (err != null) {
previousError = null;
throw err;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.google.storage.v2.ServiceConstants.Values;
import java.nio.ByteBuffer;
import java.util.function.BiFunction;
import java.util.function.Function;

final class GapicBidiWritableByteChannelSessionBuilder {

Expand Down Expand Up @@ -78,36 +77,6 @@ GapicBidiWritableByteChannelSessionBuilder setByteStringStrategy(
return this;
}

/**
* When constructing a bidi channel session, there is always a {@link
* GapicBidiUnbufferedWritableByteChannel} at the bottom of it. This method creates a BiFunction
* which will instantiate the {@link GapicBidiUnbufferedWritableByteChannel} when provided with a
* {@code StartT} value and a {@code SettableApiFuture<BidiWriteObjectResponse>}.
*
* <p>As part of providing the function, the provided parameters {@code BidiFlusherFactory} and
* {@code f} are "bound" into the returned function. In conjunction with the configured fields of
* this class a new instance of {@link GapicBidiUnbufferedWritableByteChannel} can be constructed.
*/
private <StartT, RequestFactoryT extends BidiWriteCtx.BidiWriteObjectRequestBuilderFactory>
BiFunction<StartT, SettableApiFuture<BidiWriteObjectResponse>, UnbufferedWritableByteChannel>
bindFunction(
WriteFlushStrategy.BidiFlusherFactory flusherFactory,
Function<StartT, RequestFactoryT> f) {
// it is theoretically possible that the setter methods for the following variables could
// be called again between when this method is invoked and the resulting function is invoked.
// To ensure we are using the specified values at the point in time they are bound to the
// function read them into local variables which will be closed over rather than the class
// fields.
ByteStringStrategy boundStrategy = byteStringStrategy;
Hasher boundHasher = hasher;
return (start, resultFuture) ->
new GapicBidiUnbufferedWritableByteChannel<>(
resultFuture,
new ChunkSegmenter(boundHasher, boundStrategy, Values.MAX_WRITE_CHUNK_BYTES_VALUE),
f.apply(start),
flusherFactory);
}

GapicBidiWritableByteChannelSessionBuilder.ResumableUploadBuilder resumable() {
return new GapicBidiWritableByteChannelSessionBuilder.ResumableUploadBuilder();
}
Expand Down Expand Up @@ -164,12 +133,30 @@ BufferedResumableUploadBuilder setStartAsync(ApiFuture<BidiResumableWrite> start
}

BufferedWritableByteChannelSession<BidiWriteObjectResponse> build() {
// it is theoretically possible that the setter methods for the following variables could
// be called again between when this method is invoked and the resulting function is
// invoked.
// To ensure we are using the specified values at the point in time they are bound to the
// function read them into local variables which will be closed over rather than the class
// fields.
ByteStringStrategy boundStrategy = byteStringStrategy;
Hasher boundHasher = hasher;
return new BufferedWriteSession<>(
requireNonNull(start, "start must be non null"),
bindFunction(
WriteFlushStrategy.defaultBidiFlusher(
write, deps, alg, Retrying::newCallContext),
BidiResumableWrite::identity)
((BiFunction<
BidiResumableWrite,
SettableApiFuture<BidiWriteObjectResponse>,
UnbufferedWritableByteChannel>)
(start, resultFuture) ->
new GapicBidiUnbufferedWritableByteChannel(
write,
deps,
alg,
resultFuture,
new ChunkSegmenter(
boundHasher, boundStrategy, Values.MAX_WRITE_CHUNK_BYTES_VALUE),
start,
Retrying::newCallContext))
.andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c))
.andThen(StorageByteChannels.writable()::createSynchronized));
}
Expand Down
Loading

0 comments on commit cebf059

Please sign in to comment.