
feat: add @BetaApi BlobWriteSession#parallelCompositeUpload (#2239)
BenWhitehead authored Oct 3, 2023
1 parent c9b82f6 commit f8f4e22
Showing 4 changed files with 657 additions and 8 deletions.
@@ -18,6 +18,8 @@

import com.google.api.core.BetaApi;
import com.google.cloud.storage.GrpcStorageOptions.GrpcStorageDefaults;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartCleanupStrategy;
import com.google.cloud.storage.Storage.BlobTargetOption;
import com.google.cloud.storage.Storage.BlobWriteOption;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
@@ -108,6 +110,10 @@
* retryable error query the offset of the Resumable Upload Session, then open the recovery
* file from the offset and transmit the bytes to Cloud Storage.
* </td>
* <td>
 * Opening the stream for upload will be retried up to the limitations specified in {@link StorageOptions#getRetrySettings()}.
 * All bytes are buffered to disk, allowing recovery from any arbitrary offset.
* </td>
* <td>gRPC</td>
* <td><a href="https://cloud.google.com/storage/docs/resumable-uploads">Resumable Upload</a></td>
* <td>
@@ -128,6 +134,90 @@
* </ol>
* </td>
* </tr>
* <tr>
* <td>Parallel Composite Upload</td>
* <td>{@link #parallelCompositeUpload()}</td>
* <td>
 * Break the stream of bytes into smaller part objects, uploading each part in parallel. Then
 * compose the parts together to make the ultimate object.
* </td>
* <td>
 * Automatic retries will be applied for the following:
* <ol>
* <li>Creation of each individual part</li>
* <li>Performing an intermediary compose</li>
* <li>Performing a delete to cleanup each part and intermediary compose object</li>
* </ol>
*
 * Retrying the creation of the final object is contingent upon whether an appropriate precondition
 * is supplied when calling {@link Storage#blobWriteSession(BlobInfo, BlobWriteOption...)}.
 * Either {@link BlobTargetOption#doesNotExist()} or {@link BlobTargetOption#generationMatch(long)}
 * should be specified in order to make the final request idempotent (see the example following
 * this table).
 * <p>Each operation will be retried up to the limitations specified in {@link StorageOptions#getRetrySettings()}.
* </td>
* <td>gRPC</td>
* <td>
* <ul>
* <li><a href="https://cloud.google.com/storage/docs/parallel-composite-uploads">Parallel composite uploads</a></li>
* <li><a href="https://cloud.google.com/storage/docs/uploading-objects-from-memory">Direct uploads</a></li>
* <li><a href="https://cloud.google.com/storage/docs/composite-objects">Compose</a></li>
* <li><a href="https://cloud.google.com/storage/docs/deleting-objects">Object delete</a></li>
* </ul>
* </td>
* <td>
* <ol>
* <li>
* Performing parallel composite uploads costs more money.
* <a href="https://cloud.google.com/storage/pricing#operations-by-class">Class A</a>
* operations are performed to create each part and to perform each compose. If a storage
* tier other than
* <a href="https://cloud.google.com/storage/docs/storage-classes"><code>STANDARD</code></a>
* is used, early deletion fees apply to deletion of the parts.
 * <p>An illustrative example: uploading a 5GiB object using 64MiB as the max size per part.
* <ol>
* <li>80 Parts will be created (Class A)</li>
* <li>3 compose calls will be performed (Class A)</li>
* <li>Delete 80 Parts along with 2 intermediary Compose objects (Free tier as long as {@code STANDARD} class)</li>
* </ol>
*
* Once the parts and intermediary compose objects are deleted, there will be no storage charges related to those temporary objects.
* </li>
* <li>
* The service account/credentials used to perform the parallel composite upload require
* <a href="https://cloud.google.com/storage/docs/access-control/iam-permissions#object_permissions">{@code storage.objects.delete}</a>
* in order to cleanup the temporary part and intermediary compose objects.
 * <p><i>To handle part and intermediary compose object deletion out of band</i>, pass
 * {@link PartCleanupStrategy#never()} to {@link ParallelCompositeUploadBlobWriteSessionConfig#withPartCleanupStrategy(PartCleanupStrategy)}
 * to prevent automatic cleanup.
* </li>
* <li>
* Please see the <a href="https://cloud.google.com/storage/docs/parallel-composite-uploads">
* Parallel composite uploads</a> documentation for a more in depth explanation of the
* limitations of Parallel composite uploads.
* </li>
* <li>
 * A failed upload can leave part and intermediary compose objects behind, which will count
 * as storage usage for which you will be billed.
 * <p>By default, if an upload fails an attempt will be made to clean up the part and
 * intermediary compose objects. However, if the program crashes there is no means for the
 * client to perform the cleanup.
 * <p>Every part and intermediary compose object will be created with a name which ends in
 * {@code .part}. An Object Lifecycle Management rule can be set up on your bucket to automatically
 * clean up objects with that suffix after some period of time. See
 * <a href="https://cloud.google.com/storage/docs/lifecycle">Object Lifecycle Management</a>
 * for full details and a guide on how to set up a <a href="https://cloud.google.com/storage/docs/lifecycle#delete">Delete</a>
 * rule with a <a href="https://cloud.google.com/storage/docs/lifecycle#matchesprefix-suffix">suffix match</a> condition.
* </li>
* <li>
 * Parallel composite uploads are not a one-size-fits-all solution. They carry very real
 * overhead until the object being uploaded is large enough. The inflection point is dependent
 * upon many factors, and there is no one-size-fits-all value. You will need to experiment
 * with your deployment and workload to determine if parallel composite uploads are useful
 * to you.
* </li>
* </ol>
* </td>
* </tr>
* </table>
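 *
 * <p>For illustration, a minimal sketch of opening a session with parallel composite upload
 * enabled; the bucket name, object name, and bytes written here are placeholder assumptions,
 * not part of the library:
 * <pre>{@code
 * StorageOptions options = StorageOptions.grpc()
 *     .setBlobWriteSessionConfig(BlobWriteSessionConfigs.parallelCompositeUpload())
 *     .build();
 * try (Storage storage = options.getService()) {
 *   BlobInfo info = BlobInfo.newBuilder("my-bucket", "my-object").build();
 *   // doesNotExist() supplies the precondition that makes the final compose idempotent,
 *   // and therefore safely retryable
 *   BlobWriteSession session = storage.blobWriteSession(info, BlobWriteOption.doesNotExist());
 *   try (WritableByteChannel channel = session.open()) {
 *     channel.write(ByteBuffer.wrap("hello world".getBytes(StandardCharsets.UTF_8)));
 *   }
 *   BlobInfo written = session.getResult().get();
 * }
 * }</pre>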
*
* @see BlobWriteSessionConfig
@@ -219,4 +309,19 @@ public static BufferToDiskThenUpload bufferToDiskThenUpload(Collection<Path> paths) {
public static JournalingBlobWriteSessionConfig journaling(Collection<Path> paths) {
return new JournalingBlobWriteSessionConfig(ImmutableList.copyOf(paths), false);
}

/**
* Create a new {@link BlobWriteSessionConfig} which will perform a <a
* href="https://cloud.google.com/storage/docs/parallel-composite-uploads">Parallel Composite
* Upload</a> by breaking the stream into parts and composing the parts together to make the
* ultimate object.
*
* @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...)
* @see GrpcStorageOptions.Builder#setBlobWriteSessionConfig(BlobWriteSessionConfig)
 * @since 2.28.0 This new API is in preview and is subject to breaking changes.
*/
@BetaApi
public static ParallelCompositeUploadBlobWriteSessionConfig parallelCompositeUpload() {
return ParallelCompositeUploadBlobWriteSessionConfig.withDefaults();
}
}
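
As a companion to the cleanup discussion above, here is a hedged sketch of disabling automatic part cleanup via PartCleanupStrategy.never() and delegating cleanup to an Object Lifecycle Management Delete rule matching the ".part" suffix. The bucket name, class name, and the 1-day age threshold are illustrative assumptions, and the matchesSuffix lifecycle condition is assumed to be available in recent client versions:

import com.google.cloud.storage.BlobWriteSessionConfigs;
import com.google.cloud.storage.BucketInfo;
import com.google.cloud.storage.BucketInfo.LifecycleRule;
import com.google.cloud.storage.BucketInfo.LifecycleRule.LifecycleAction;
import com.google.cloud.storage.BucketInfo.LifecycleRule.LifecycleCondition;
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartCleanupStrategy;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import com.google.common.collect.ImmutableList;

public final class PartCleanupSketch {
  public static void main(String[] args) throws Exception {
    // Never delete parts or intermediary compose objects from within the session;
    // cleanup is handled out of band by the lifecycle rule below.
    StorageOptions options =
        StorageOptions.grpc()
            .setBlobWriteSessionConfig(
                BlobWriteSessionConfigs.parallelCompositeUpload()
                    .withPartCleanupStrategy(PartCleanupStrategy.never()))
            .build();

    try (Storage storage = options.getService()) {
      // Delete rule for objects whose names end in ".part"; the 1-day age is an
      // illustrative choice, not a recommendation.
      LifecycleRule deleteParts =
          new LifecycleRule(
              LifecycleAction.newDeleteAction(),
              LifecycleCondition.newBuilder()
                  .setAge(1)
                  .setMatchesSuffix(ImmutableList.of(".part"))
                  .build());
      BucketInfo bucket =
          BucketInfo.newBuilder("my-bucket") // placeholder bucket name
              .setLifecycleRules(ImmutableList.of(deleteParts))
              .build();
      storage.update(bucket);
    }
  }
}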
@@ -1815,7 +1815,8 @@ private UnbufferedReadableByteChannelSession<Object> unbufferedReadSession(
ReadObjectRequest readObjectRequest = getReadObjectRequest(blob, opts);
Set<StatusCode.Code> codes =
resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(readObjectRequest));
GrpcCallContext grpcCallContext = Retrying.newCallContext().withRetryableCodes(codes);
GrpcCallContext grpcCallContext =
opts.grpcMetadataMapper().apply(Retrying.newCallContext().withRetryableCodes(codes));
return ResumableMedia.gapic()
.read()
.byteChannel(storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext))
