Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: ensure all BlobWriteSession types conform to the semantics specified in BlobWriteSession #2482

Merged
merged 1 commit into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions google-cloud-storage/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,11 @@
<className>com/google/cloud/storage/transfermanager/TransferManagerConfig$Builder</className>
<method>* setAllowDivideAndConquer(boolean)</method>
</difference>

<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/storage/StorageOptions$Builder</className>
<method>com.google.cloud.storage.StorageOptions$Builder setBlobWriteSessionConfig(com.google.cloud.storage.BlobWriteSessionConfig)</method>
</difference>

</differences>
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.api.core.BetaApi;
import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.util.concurrent.TimeUnit;

/**
* A session to write an object to Google Cloud Storage.
Expand Down Expand Up @@ -50,6 +51,10 @@ public interface BlobWriteSession {
* <p>Upon calling {@link WritableByteChannel#close()} the object creation will be finalized, and
* {@link #getResult()}s future should resolve.
*
* <p>The returned {@code WritableByteChannel} can throw IOExceptions from any of its usual
* methods. Any {@link IOException} thrown can have a cause of a {@link StorageException}.
* However, not all {@code IOExceptions} will have {@code StorageException}s.
*
* @throws IOException When creating the {@link WritableByteChannel} if an unrecoverable
* underlying IOException occurs it can be rethrown
* @throws IllegalStateException if open is called more than once
Expand All @@ -66,6 +71,10 @@ public interface BlobWriteSession {
* Google Cloud Storage 2. A terminal failure occurs, the terminal failure will become the
* exception result
*
* <p>If a terminal failure is encountered, calling either {@link ApiFuture#get()} or {@link
* ApiFuture#get(long, TimeUnit)} will result in an {@link
* java.util.concurrent.ExecutionException} with a cause that is the {@link StorageException}.
*
* @since 2.26.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.storage;

import com.google.api.core.ApiFuture;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.nio.channels.WritableByteChannel;

Expand All @@ -30,14 +31,20 @@ static BlobWriteSession of(WritableByteChannelSession<?, BlobInfo> s) {

static final class WritableByteChannelSessionAdapter implements BlobWriteSession {
private final WritableByteChannelSession<?, BlobInfo> delegate;
private boolean open;

private WritableByteChannelSessionAdapter(WritableByteChannelSession<?, BlobInfo> delegate) {
this.delegate = delegate;
open = false;
}

@Override
public WritableByteChannel open() throws IOException {
return delegate.open();
synchronized (this) {
Preconditions.checkState(!open, "already open");
open = true;
return delegate.open();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.google.storage.v2.WriteObjectRequest;
import com.google.storage.v2.WriteObjectResponse;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.time.Clock;
import java.util.Map;
Expand Down Expand Up @@ -202,24 +204,25 @@ static final class DecoratedWritableByteChannelSession<WBC extends WritableByteC
this.decoder = decoder;
}

@Override
public WBC open() {
try {
return WritableByteChannelSession.super.open();
} catch (Exception e) {
throw StorageException.coalesce(e);
}
}

@Override
public ApiFuture<WBC> openAsync() {
return delegate.openAsync();
return ApiFutures.catchingAsync(
delegate.openAsync(),
Throwable.class,
throwable -> ApiFutures.immediateFailedFuture(StorageException.coalesce(throwable)),
MoreExecutors.directExecutor());
}

@Override
public ApiFuture<BlobInfo> getResult() {
return ApiFutures.transform(
delegate.getResult(), decoder::decode, MoreExecutors.directExecutor());
ApiFuture<BlobInfo> decodeResult =
ApiFutures.transform(
delegate.getResult(), decoder::decode, MoreExecutors.directExecutor());
return ApiFutures.catchingAsync(
decodeResult,
Throwable.class,
throwable -> ApiFutures.immediateFailedFuture(StorageException.coalesce(throwable)),
MoreExecutors.directExecutor());
}
}

Expand All @@ -233,7 +236,55 @@ static final class LazySession<R>

@Override
public ApiFuture<BufferedWritableByteChannel> openAsync() {
return lazy.getSession().openAsync();
// make sure the errors coming out of the BufferedWritableByteChannel are either IOException
// or StorageException
return ApiFutures.transform(
lazy.getSession().openAsync(),
delegate ->
new BufferedWritableByteChannel() {
@Override
public int write(ByteBuffer src) throws IOException {
try {
return delegate.write(src);
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw StorageException.coalesce(e);
}
}

@Override
public void flush() throws IOException {
try {
delegate.flush();
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw StorageException.coalesce(e);
}
}

@Override
public boolean isOpen() {
try {
return delegate.isOpen();
} catch (Exception e) {
throw StorageException.coalesce(e);
}
}

@Override
public void close() throws IOException {
try {
delegate.close();
} catch (IOException e) {
throw e;
} catch (Exception e) {
throw StorageException.coalesce(e);
}
}
},
MoreExecutors.directExecutor());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,12 @@ public void close() throws IOException {
throw e;
}
} else {
flusher.close(null);
try {
flusher.close(null);
} catch (RuntimeException e) {
resultFuture.setException(e);
throw e;
}
}
open = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ GapicBidiWritableByteChannelSessionBuilder bidiByteChannel(
}

ApiFuture<ResumableWrite> resumableWrite(
UnaryCallable<StartResumableWriteRequest, StartResumableWriteResponse> x,
UnaryCallable<StartResumableWriteRequest, StartResumableWriteResponse> callable,
WriteObjectRequest writeObjectRequest) {
StartResumableWriteRequest.Builder b = StartResumableWriteRequest.newBuilder();
if (writeObjectRequest.hasWriteObjectSpec()) {
Expand All @@ -65,9 +65,16 @@ ApiFuture<ResumableWrite> resumableWrite(
Function<String, WriteObjectRequest> f =
uploadId ->
writeObjectRequest.toBuilder().clearWriteObjectSpec().setUploadId(uploadId).build();
return ApiFutures.transform(
x.futureCall(req),
(resp) -> new ResumableWrite(req, resp, f),
ApiFuture<ResumableWrite> futureResumableWrite =
ApiFutures.transform(
callable.futureCall(req),
(resp) -> new ResumableWrite(req, resp, f),
MoreExecutors.directExecutor());
// make sure we wrap any failure as a storage exception
return ApiFutures.catchingAsync(
futureResumableWrite,
Throwable.class,
throwable -> ApiFutures.immediateFailedFuture(StorageException.coalesce(throwable)),
MoreExecutors.directExecutor());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public void rewindTo(long offset) {
}
} else if (finalizing && JsonResumableSessionFailureScenario.isOk(code)) {
@Nullable StorageObject storageObject;
@Nullable BigInteger actualSize;
BigInteger actualSize = BigInteger.ZERO;

Long contentLength = response.getHeaders().getContentLength();
String contentType = response.getHeaders().getContentType();
Expand All @@ -130,7 +130,12 @@ public void rewindTo(long offset) {
boolean isJson = contentType != null && contentType.startsWith("application/json");
if (isJson) {
storageObject = response.parseAs(StorageObject.class);
actualSize = storageObject != null ? storageObject.getSize() : null;
if (storageObject != null) {
BigInteger size = storageObject.getSize();
if (size != null) {
actualSize = size;
}
}
} else if ((contentLength == null || contentLength == 0) && storedContentLength != null) {
// when a signed url is used, the finalize response is empty
response.ignore();
Expand All @@ -150,7 +155,6 @@ public void rewindTo(long offset) {
int compare = expectedSize.compareTo(actualSize);
if (compare == 0) {
success = true;
//noinspection DataFlowIssue compareTo result will filter out actualSize == null
return ResumableOperationResult.complete(storageObject, actualSize.longValue());
} else if (compare > 0) {
StorageException se =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ static BaseServiceException coalesce(Throwable t) {
if (t instanceof ApiException) {
return asStorageException((ApiException) t);
}
if (t.getCause() instanceof ApiException) {
return asStorageException((ApiException) t.getCause());
}
return getStorageException(t);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,17 @@
import com.google.cloud.ServiceDefaults;
import com.google.cloud.ServiceOptions;
import com.google.cloud.http.HttpTransportOptions;
import com.google.cloud.storage.GrpcStorageOptions.GrpcStorageDefaults;
import com.google.cloud.storage.HttpStorageOptions.HttpStorageDefaults;
import com.google.cloud.storage.HttpStorageOptions.HttpStorageFactory;
import com.google.cloud.storage.HttpStorageOptions.HttpStorageRpcFactory;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.cloud.storage.TransportCompatibility.Transport;
import com.google.cloud.storage.spi.StorageRpcFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import org.checkerframework.checker.nullness.qual.NonNull;

public abstract class StorageOptions extends ServiceOptions<Storage, StorageOptions> {

Expand Down Expand Up @@ -95,6 +98,18 @@ public abstract static class Builder

public abstract Builder setStorageRetryStrategy(StorageRetryStrategy storageRetryStrategy);

/**
* @see BlobWriteSessionConfig
* @see BlobWriteSessionConfigs
* @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...)
* @see HttpStorageDefaults#getDefaultStorageWriterConfig()
* @see GrpcStorageDefaults#getDefaultStorageWriterConfig()
* @since 2.37.0 This new api is in preview and is subject to breaking changes.
*/
@BetaApi
public abstract StorageOptions.Builder setBlobWriteSessionConfig(
@NonNull BlobWriteSessionConfig blobWriteSessionConfig);

@Override
public abstract StorageOptions build();
}
Expand Down
Loading
Loading