diff --git a/design/multipart_parallel_upload.md b/design/multipart_parallel_upload.md
new file mode 100644
index 0000000..75c0dd9
--- /dev/null
+++ b/design/multipart_parallel_upload.md
@@ -0,0 +1,38 @@
+# Multipart Parallel Upload Design
+
+This document captures the context around the component within this blobstore that provides
+Parallel Multipart Upload:
+
+[MultipartUploader.java](../src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java)
+
+## Context and Idea
+
+gsutil concept: [Parallel Composite Uploads](https://cloud.google.com/storage/docs/gsutil/commands/cp#parallel-composite-uploads)
+
+The google-cloud-storage library for Java that this plugin is built with does not provide a mechanism for parallel
+composite uploads.
+
+In NXRM 3.19, the Amazon S3 blobstore switched to using parallel uploads; see
+
+https://github.com/sonatype/nexus-public/blob/master/plugins/nexus-blobstore-s3/src/main/java/org/sonatype/nexus/blobstore/s3/internal/ParallelUploader.java
+
+This switch resulted in higher overall throughput for S3 blobstores (see https://issues.sonatype.org/browse/NEXUS-19566).
+The goal of this feature is to replicate that parallel upload approach for Google Cloud Storage.
+
+## Implementation
+
+* The Blobstore API does not provide a content length; we only have an `InputStream` that can be quite large.
+* The GCS compose method has a hard limit of 32 chunks. Since we don't know the length, we can't
+split the stream into 32 equal parts. Instead we write 31 chunks of a configurable chunkSize, then 1 final chunk containing the rest.
+* We should expose how many times that compose limit was hit, along with tips on how to re-configure the chunk size.
+* For files smaller than 1 chunk size, we don't pay the cost of shipping the upload request to a separate thread.
+  * The first chunk is written at the expected destination path.
+  * If we've read chunkSize bytes and there is still more data on the stream, the 2nd through final chunks are scheduled off-thread.
+* A blob write still blocks until all parallel uploads complete. This is an important characteristic of all BlobStores;
+Nexus Repository Manager expects consistency, not deferred, eventual consistency.
+
+## Tuning
+
+* Observability via debug logging on `org.sonatype.nexus.blobstore.gcloud.internal.MultipartUploader`
+* The 5 MB default chunk size and the 32-chunk compose limit mean optimal thread utilization for files between 10 MB
+and 160 MB in size (see the worked example below).
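+
+The relationship between the chunk size and the compose limit can be worked out directly. The sketch below
+is illustrative only (the `ChunkSizeMath` class, the sizing rule, and the 500 MB figure are hypothetical, not
+part of the plugin); it shows the blob-size range the default 5 MB chunk parallelizes well, and a rough way to
+pick a value for `nexus.gcs.multipartupload.chunksize` in `nexus.properties` when larger blobs are common.
+
+```java
+// Illustrative sizing math only; not part of the plugin.
+public class ChunkSizeMath
+{
+  static final long MB = 1024 * 1024;
+  static final int COMPOSE_REQUEST_LIMIT = 32;    // GCS hard limit on compose components
+  static final long DEFAULT_CHUNK_SIZE = 5 * MB;  // matches the plugin's 5 MB default
+
+  public static void main(String[] args) {
+    long chunkSize = DEFAULT_CHUNK_SIZE;
+
+    // Blobs smaller than one chunk are written in-line on the calling thread,
+    // so parallelism only kicks in once a second chunk exists.
+    long parallelFrom = 2 * chunkSize;                   // 10 MB with the default
+
+    // Past 32 chunks the remainder is folded into one oversized final chunk,
+    // which trends back toward a single-threaded upload.
+    long parallelTo = COMPOSE_REQUEST_LIMIT * chunkSize; // 160 MB with the default
+
+    System.out.printf("well-parallelized range: %d MB to %d MB%n", parallelFrom / MB, parallelTo / MB);
+
+    // Hypothetical sizing rule: to keep a known largest blob within the limit,
+    // choose a chunk size of at least size / 32 (here for a 500 MB Docker layer).
+    long largestExpectedBlob = 500 * MB;
+    long suggested = (largestExpectedBlob + COMPOSE_REQUEST_LIMIT - 1) / COMPOSE_REQUEST_LIMIT;
+    System.out.printf("suggested nexus.gcs.multipartupload.chunksize: at least %d bytes (~%d MB)%n",
+        suggested, suggested / MB);
+  }
+}
+```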
diff --git a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStore.java b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStore.java index 54b1fff..c36769a 100644 --- a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStore.java +++ b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStore.java @@ -133,10 +133,12 @@ public class GoogleCloudBlobStore private BlobStoreQuotaService quotaService; + private final MultipartUploader multipartUploader; + private PeriodicJob quotaCheckingJob; private final int quotaCheckInterval; - + @Inject public GoogleCloudBlobStore(final GoogleCloudStorageFactory storageFactory, final BlobIdLocationResolver blobIdLocationResolver, @@ -144,6 +146,7 @@ public GoogleCloudBlobStore(final GoogleCloudStorageFactory storageFactory, final ShardedCounterMetricsStore metricsStore, final GoogleCloudDatastoreFactory datastoreFactory, final DryRunPrefix dryRunPrefix, + final MultipartUploader multipartUploader, final MetricRegistry metricRegistry, final BlobStoreQuotaService quotaService, @Named("${nexus.blobstore.quota.warnIntervalSeconds:-60}") @@ -154,6 +157,7 @@ public GoogleCloudBlobStore(final GoogleCloudStorageFactory storageFactory, this.storageFactory = checkNotNull(storageFactory); this.metricsStore = metricsStore; this.datastoreFactory = datastoreFactory; + this.multipartUploader = multipartUploader; this.metricRegistry = metricRegistry; this.quotaService = quotaService; this.quotaCheckInterval = quotaCheckInterval; @@ -174,7 +178,7 @@ protected void doStart() throws Exception { metadata.store(); } liveBlobs = CacheBuilder.newBuilder().weakValues().recordStats().build(from(GoogleCloudStorageBlob::new)); - + wrapWithGauge("liveBlobsCache.size", () -> liveBlobs.size()); wrapWithGauge("liveBlobsCache.hitCount", () -> liveBlobs.stats().hitCount()); wrapWithGauge("liveBlobsCache.missCount", () -> liveBlobs.stats().missCount()); @@ -210,7 +214,8 @@ protected Blob doCreate(final InputStream blobData, return createInternal(headers, destination -> { try (InputStream data = blobData) { MetricsInputStream input = new MetricsInputStream(data); - bucket.create(destination, input); + + multipartUploader.upload(storage, getConfiguredBucketName(), destination, input); return input.getMetrics(); } }, blobId); @@ -229,7 +234,6 @@ public Blob copy(final BlobId blobId, final Map headers) { return createInternal(headers, destination -> { sourceBlob.getBlob().copyTo(getConfiguredBucketName(), destination); - BlobMetrics metrics = sourceBlob.getMetrics(); return new StreamMetrics(metrics.getContentSize(), metrics.getSha1Hash()); }, null); diff --git a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java new file mode 100644 index 0000000..f735e93 --- /dev/null +++ b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java @@ -0,0 +1,262 @@ +/* + * Sonatype Nexus (TM) Open Source Version + * Copyright (c) 2017-present Sonatype, Inc. + * All rights reserved. Includes the third-party code listed at http://links.sonatype.com/products/nexus/oss/attributions. + * + * This program and the accompanying materials are made available under the terms of the Eclipse Public License Version 1.0, + * which accompanies this distribution and is available at http://www.eclipse.org/legal/epl-v10.html. 
+ * + * Sonatype Nexus (TM) Professional Version is available from Sonatype, Inc. "Sonatype" and "Sonatype Nexus" are trademarks + * of Sonatype, Inc. Apache Maven is a trademark of the Apache Software Foundation. M2eclipse is a trademark of the + * Eclipse Foundation. All other trademarks are the property of their respective owners. + */ +package org.sonatype.nexus.blobstore.gcloud.internal; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; + +import javax.inject.Inject; +import javax.inject.Named; + +import org.sonatype.nexus.blobstore.api.BlobStoreException; +import org.sonatype.nexus.common.stateguard.StateGuardLifecycleSupport; +import org.sonatype.nexus.thread.NexusThreadFactory; + +import com.codahale.metrics.InstrumentedExecutorService; +import com.codahale.metrics.MetricRegistry; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.Storage.BlobTargetOption; +import com.google.cloud.storage.Storage.ComposeRequest; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; + +import static java.lang.String.format; + +/** + * Component that provides parallel multipart upload support for blob binary data (.bytes files). + */ +@Named +public class MultipartUploader + extends StateGuardLifecycleSupport +{ + + /** + * Use this property in 'nexus.properties' to control how large each multipart part is. Default is 5 MB. + * Smaller numbers increase the number of parallel workers used to upload a file. Match to your workload: + * if you are heavy in docker with large images, increase; if you are heavy in smaller components, decrease. + */ + public static final String CHUNK_SIZE_PROPERTY = "nexus.gcs.multipartupload.chunksize"; + + /** + * This is a hard limit on the number of components to a compose request enforced by Google Cloud Storage API. + */ + static final int COMPOSE_REQUEST_LIMIT = 32; + + /** + * While an invocation of {@link #upload(Storage, String, String, InputStream)} is in-flight, the individual + * chunks of the file will have names like 'destination.chunkPartNumber", like + * 'content/vol-01/chap-01/UUID.bytes.chunk1', 'content/vol-01/chap-01/UUID.bytes.chunk2', etc. + */ + private final String CHUNK_NAME_PART = ".chunk"; + + /** + * Used internally to count how many times we've hit the compose limit. + * Consider exposing this as a bean that can provide tuning feedback to deployers. 
+ */ + private final AtomicLong composeLimitHit = new AtomicLong(0); + + private static final byte[] EMPTY = new byte[0]; + + private final ListeningExecutorService executorService; + + private final int chunkSize; + + @Inject + public MultipartUploader(final MetricRegistry metricRegistry, + @Named("${"+CHUNK_SIZE_PROPERTY +":-5242880}") final int chunkSize) { + this.chunkSize = chunkSize; + this.executorService = MoreExecutors.listeningDecorator( + new InstrumentedExecutorService( + Executors.newCachedThreadPool( + new NexusThreadFactory("multipart-upload", "nexus-blobstore-google-cloud")), + metricRegistry, format("%s.%s", MultipartUploader.class.getName(), "executor-service"))); + } + + @Override + protected void doStop() throws Exception { + executorService.shutdownNow(); + } + + /** + * @return the value for the {@link #CHUNK_SIZE_PROPERTY} + */ + public int getChunkSize() { + return chunkSize; + } + + /** + * @return the number of times {@link #upload(Storage, String, String, InputStream)} hit the multipart-compose limit + */ + public long getNumberOfTimesComposeLimitHit() { + return composeLimitHit.get(); + } + + /** + * @param storage an initialized {@link Storage} instance + * @param bucket the name of the bucket + * @param destination the destination (relative to the bucket) + * @param contents the stream of data to store + * @return the successfully stored {@link Blob} + * @throws BlobStoreException if any part of the upload failed + */ + public Blob upload(final Storage storage, final String bucket, final String destination, final InputStream contents) { + log.debug("Starting multipart upload for destination {} in bucket {}", destination, bucket); + // this must represent the bucket-relative paths to the chunks, in order of composition + List<String> chunkNames = new ArrayList<>(); + + Optional<Blob> singleChunk = Optional.empty(); + try (InputStream current = contents) { + List<ListenableFuture<Blob>> chunkFutures = new ArrayList<>(); + // MUST respect hard limit of 32 chunks per compose request + for (int partNumber = 1; partNumber <= COMPOSE_REQUEST_LIMIT; partNumber++) { + final byte[] chunk; + if (partNumber < COMPOSE_REQUEST_LIMIT) { + chunk = readChunk(current); + } + else { + // we've hit the compose request limit; read the rest of the stream + composeLimitHit.incrementAndGet(); + chunk = EMPTY; + log.info("Upload for {} has hit Google Cloud Storage multipart-compose limit ({} total times limit hit); " + + "consider increasing '{}' beyond current value of {}", destination, composeLimitHit.get(), + CHUNK_SIZE_PROPERTY, getChunkSize()); + + final String finalChunkName = toChunkName(destination, COMPOSE_REQUEST_LIMIT); + chunkNames.add(finalChunkName); + chunkFutures.add(executorService.submit(() -> { + log.debug("Uploading final chunk {} for {} of unknown remaining bytes", COMPOSE_REQUEST_LIMIT, destination); + BlobInfo blobInfo = BlobInfo.newBuilder(bucket, finalChunkName).build(); + // read the rest of the current stream + // downside here is that since we don't know the stream size, we can't chunk it.
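+              // note: this remainder is uploaded by a single worker thread, so a chunkSize large enough to keep most blobs within 32 parts avoids an oversized final part (see design/multipart_parallel_upload.md)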
+ // the deprecated create method here does not allow us to disable GZIP compression on these PUTs + return storage.create(blobInfo, current); + })); + } + + if (chunk == EMPTY && partNumber > 1) { + break; + } + + final String chunkName = toChunkName(destination, partNumber); + chunkNames.add(chunkName); + + if (partNumber == 1) { + // upload the first part on the current thread + BlobInfo blobInfo = BlobInfo.newBuilder(bucket, chunkName).build(); + Blob blob = storage.create(blobInfo, chunk, BlobTargetOption.disableGzipContent()); + singleChunk = Optional.of(blob); + } + else { + singleChunk = Optional.empty(); + // 2nd through N chunks will happen off current thread in parallel + final int chunkIndex = partNumber; + chunkFutures.add(executorService.submit(() -> { + log.debug("Uploading chunk {} for {} of {} bytes", chunkIndex, destination, chunk.length); + BlobInfo blobInfo = BlobInfo.newBuilder( + bucket, chunkName).build(); + return storage.create(blobInfo, chunk, BlobTargetOption.disableGzipContent()); + })); + } + } + + // return the single result if it exists; otherwise finalize the parallel multipart workers + return singleChunk.orElseGet(() -> { + CountDownLatch block = new CountDownLatch(1); + Futures.whenAllComplete(chunkFutures).run(() -> block.countDown() , MoreExecutors.directExecutor()); + // wait for all the futures to complete + log.debug("waiting for {} remaining chunks to complete", chunkFutures.size()); + try { + block.await(); + } + catch (InterruptedException e) { + log.error("caught InterruptedException waiting for multipart upload to complete on {}", destination); + throw new RuntimeException(e); + } + log.debug("chunk uploads completed, sending compose request"); + + // finalize with compose request to coalesce the chunks + Blob finalBlob = storage.compose(ComposeRequest.of(bucket, chunkNames, destination)); + log.debug("Multipart upload of {} complete", destination); + return finalBlob; + }); + } + catch(Exception e) { + throw new BlobStoreException("Error uploading blob", e, null); + } + finally { + // remove any .chunkN files off-thread + // make sure not to delete the first chunk (which has the desired destination name with no suffix) + deferredCleanup(storage, bucket, chunkNames); + } + } + + private void deferredCleanup(final Storage storage, final String bucket, final List chunkNames) { + executorService.submit(() -> chunkNames.stream() + .filter(part -> part.contains(CHUNK_NAME_PART)) + .forEach(chunk -> storage.delete(bucket, chunk))); + } + + /** + * The name of the first chunk should match the desired end destination. + * For any chunk index 2 or greater, this method will return the destination + the chunk name suffix. + * + * @param destination + * @param chunkNumber + * @return the name to store this chunk + */ + private String toChunkName(String destination, int chunkNumber) { + if (chunkNumber == 1) { + return destination; + } + return destination + CHUNK_NAME_PART + chunkNumber; + } + + /** + * Read a chunk of the stream up to {@link #getChunkSize()} in length. 
+ * + * @param input the stream to read + * @return the read data as a byte array + * @throws IOException + */ + private byte[] readChunk(final InputStream input) throws IOException { + byte[] buffer = new byte[chunkSize]; + int offset = 0; + int remain = chunkSize; + int bytesRead = 0; + + while (remain > 0 && bytesRead >= 0) { + bytesRead = input.read(buffer, offset, remain); + if (bytesRead > 0) { + offset += bytesRead; + remain -= bytesRead; + } + } + if (offset > 0) { + return Arrays.copyOfRange(buffer, 0, offset); + } + else { + return EMPTY; + } + } +} diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy index e783948..b5fb176 100644 --- a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy @@ -93,6 +93,11 @@ class GoogleCloudBlobStoreIT ShardedCounterMetricsStore metricsStore + // chunkSize = 5 MB + // the net effect of this is all tests except for "create large file" will be single thread + // create large file will end up using max chunks + MultipartUploader uploader = new MultipartUploader(metricRegistry, 5242880) + def setup() { quotaService = new BlobStoreQuotaServiceImpl([ (SpaceUsedQuota.ID): new SpaceUsedQuota() @@ -105,7 +110,7 @@ class GoogleCloudBlobStoreIT metricsStore = new ShardedCounterMetricsStore(blobIdLocationResolver, datastoreFactory, periodicJobService) blobStore = new GoogleCloudBlobStore(storageFactory, blobIdLocationResolver, periodicJobService, metricsStore, - datastoreFactory, new DryRunPrefix("TEST "), metricRegistry, quotaService, 60) + datastoreFactory, new DryRunPrefix("TEST "), uploader, metricRegistry, quotaService, 60) blobStore.init(config) blobStore.start() @@ -224,7 +229,7 @@ class GoogleCloudBlobStoreIT (BlobStore.CREATED_BY_HEADER): 'someuser' ] ) assert blob != null // sit for at least the time on our keep alives, so that any held connections close - log.info("waiting for ${(KEEP_ALIVE_DURATION + 1000L) / 1000L} seconds any stale connections to close") + log.info("waiting for ${(KEEP_ALIVE_DURATION + 1000L) / 1000L} seconds so any stale connections close") sleep(KEEP_ALIVE_DURATION + 1000L) Blob blob2 = blobStore.create(new ByteArrayInputStream('hello'.getBytes()), @@ -443,7 +448,7 @@ class GoogleCloudBlobStoreIT ShardedCounterMetricsStore metricsStore2 = new ShardedCounterMetricsStore(blobIdLocationResolver, datastoreFactory, periodicJobService) def blobStore2 = new GoogleCloudBlobStore(storageFactory, blobIdLocationResolver, periodicJobService, metricsStore2, - datastoreFactory, new DryRunPrefix("TEST "), metricRegistry, quotaService, 60) + datastoreFactory, new DryRunPrefix("TEST "), uploader, metricRegistry, quotaService, 60) blobStore2.init(config2) blobStore2.start() metricsStore2.start() @@ -528,4 +533,11 @@ class GoogleCloudBlobStoreIT storage.delete(bucket) log.info("bucket ${bucket} deleted") } + + def createFile(Storage storage, String path, long size) { + byte [] content = new byte[size] + new Random().nextBytes(content) + storage.create(BlobInfo.newBuilder(bucketName, path).build(), + content) + } } diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy index 648f67b..544018f 100644 --- 
a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy @@ -55,12 +55,14 @@ class GoogleCloudBlobStoreTest Bucket bucket = Mock() - MetricRegistry metricRegistry = Mock() + MetricRegistry metricRegistry = new MetricRegistry() GoogleCloudDatastoreFactory datastoreFactory = Mock() Datastore datastore = Mock() + MultipartUploader uploader = new MultipartUploader(metricRegistry, 1024) + BlobStoreQuotaService quotaService = Mock() ShardedCounterMetricsStore metricsStore = Mock() @@ -72,8 +74,8 @@ class GoogleCloudBlobStoreTest (BlobStore.CREATED_BY_HEADER): 'admin' ] GoogleCloudBlobStore blobStore = new GoogleCloudBlobStore( - storageFactory, blobIdLocationResolver, periodicJobService, metricsStore, datastoreFactory, - new DryRunPrefix("TEST "), metricRegistry, quotaService, 60) + storageFactory, blobIdLocationResolver, periodicJobService, metricsStore, datastoreFactory, new DryRunPrefix("TEST "), + uploader, metricRegistry, quotaService, 60) def config = new BlobStoreConfiguration() @@ -150,6 +152,7 @@ class GoogleCloudBlobStoreTest blobStore.init(config) blobStore.doStart() + storage.create(_, _, _) >> mockGoogleObject(tempFileBytes) BlobId id = new BlobId(UUID.randomUUID().toString()) String resolved = blobIdLocationResolver.getLocation(id) diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy new file mode 100644 index 0000000..5df48bc --- /dev/null +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy @@ -0,0 +1,218 @@ +/* + * Sonatype Nexus (TM) Open Source Version + * Copyright (c) 2017-present Sonatype, Inc. + * All rights reserved. Includes the third-party code listed at http://links.sonatype.com/products/nexus/oss/attributions. + * + * This program and the accompanying materials are made available under the terms of the Eclipse Public License Version 1.0, + * which accompanies this distribution and is available at http://www.eclipse.org/legal/epl-v10.html. + * + * Sonatype Nexus (TM) Professional Version is available from Sonatype, Inc. "Sonatype" and "Sonatype Nexus" are trademarks + * of Sonatype, Inc. Apache Maven is a trademark of the Apache Software Foundation. M2eclipse is a trademark of the + * Eclipse Foundation. All other trademarks are the property of their respective owners. 
+ */ +package org.sonatype.nexus.blobstore.gcloud.internal + +import java.util.stream.StreamSupport + +import org.sonatype.nexus.blobstore.api.BlobStoreConfiguration + +import com.codahale.metrics.MetricRegistry +import com.google.cloud.storage.Blob +import com.google.cloud.storage.Blob.BlobSourceOption +import com.google.cloud.storage.BucketInfo +import com.google.cloud.storage.Storage +import groovy.util.logging.Slf4j +import org.apache.commons.io.input.BoundedInputStream +import spock.lang.Specification + +@Slf4j +class MultipartUploaderIT + extends Specification +{ + + static final BlobStoreConfiguration config = new BlobStoreConfiguration() + + static final GoogleCloudStorageFactory storageFactory = new GoogleCloudStorageFactory() + + static String bucketName = "integration-test-${UUID.randomUUID().toString()}" + + static Storage storage + + MetricRegistry metricRegistry = new MetricRegistry() + + def setupSpec() { + config.attributes = [ + 'google cloud storage': [ + bucket: bucketName, + credential_file: this.getClass().getResource('/gce-credentials.json').getFile() + ] + ] + + log.info("Integration test using bucket ${bucketName}") + storage = storageFactory.create(config) + storage.create(BucketInfo.of(bucketName)) + } + + def cleanupSpec() { + log.debug("Tests complete, deleting files from ${bucketName}") + // must delete all the files within the bucket before we can delete the bucket + Iterator list = storage.list(bucketName, + Storage.BlobListOption.prefix("")).iterateAll() + .iterator() + + Iterable iterable = { _ -> list } + StreamSupport.stream(iterable.spliterator(), true) + .forEach({ b -> b.delete(BlobSourceOption.generationMatch()) }) + storage.delete(bucketName) + log.info("Integration test complete, bucket ${bucketName} deleted") + } + + def "simple multipart"() { + given: + long expectedSize = (1048576 * 3) + 2 + MultipartUploader uploader = new MultipartUploader(metricRegistry, 1048576) + byte[] data = new byte[expectedSize] + new Random().nextBytes(data) + + when: + Blob blob = uploader.upload(storage, bucketName, 'vol-01/chap-01/control/multi_part', new ByteArrayInputStream(data)) + + then: + blob.size == expectedSize + storage.get(bucketName, 'vol-01/chap-01/control/multi_part').getContent() == data + } + + def "confirm parts composed in order"() { + given: + // 5 each of abcdefg + final String content = "aaaaabbbbbcccccdddddeeeeefffffggggg" + byte[] data = content.bytes + MultipartUploader uploader = new MultipartUploader(metricRegistry, 5) + + when: + Blob blob = uploader.upload(storage, bucketName, 'vol-01/chap-01/control/in_order', new ByteArrayInputStream(data)) + + then: + blob.size == data.length + Blob readback = storage.get(blob.blobId) + readback.getContent() == content.bytes + } + + def "single part"() { + given: + long expectedSize = 1048575 + MultipartUploader uploader = new MultipartUploader(metricRegistry, 1048576) + byte[] data = new byte[expectedSize] + new Random().nextBytes(data) + + when: + Blob blob = uploader.upload(storage, bucketName, 'vol-01/chap-01/control/single_part', new ByteArrayInputStream(data)) + + then: + blob.size == expectedSize + storage.get(bucketName, 'vol-01/chap-01/control/single_part').getContent() == data + } + + def "zero byte file"() { + given: + long expectedSize = 0 + MultipartUploader uploader = new MultipartUploader(metricRegistry, 1024) + byte[] data = new byte[expectedSize] + new Random().nextBytes(data) + + when: + Blob blob = uploader.upload(storage, bucketName, 'vol-01/chap-01/control/zero_byte', new 
ByteArrayInputStream(data)) + + then: + blob.size == expectedSize + storage.get(bucketName, 'vol-01/chap-01/control/zero_byte').getContent() == data + } + + def "hit compose limit slightly and still successful"() { + given: + long expectedSize = (1024 * MultipartUploader.COMPOSE_REQUEST_LIMIT) + 10 + MultipartUploader uploader = new MultipartUploader(metricRegistry, 1024) + byte[] data = new byte[expectedSize] + new Random().nextBytes(data) + + when: + Blob blob = uploader.upload(storage, bucketName, + 'vol-01/chap-01/composeLimitTest/small_miss', new ByteArrayInputStream(data)) + + then: + blob.size == expectedSize + uploader.numberOfTimesComposeLimitHit == 1L + storage.get(bucketName, 'vol-01/chap-01/composeLimitTest/small_miss').getContent() == data + } + + def "hit compose limit poorly tuned, still successful" () { + given: + long expectedSize = 1048576 + MultipartUploader uploader = new MultipartUploader(metricRegistry, 1024) + byte[] data = new byte[expectedSize] + new Random().nextBytes(data) + + when: + Blob blob = uploader.upload(storage, bucketName, + 'vol-01/chap-01/composeLimitTest/poor_tuning', new ByteArrayInputStream(data)) + + then: + blob.size == expectedSize + uploader.numberOfTimesComposeLimitHit == 1L + storage.get(bucketName, 'vol-01/chap-01/composeLimitTest/poor_tuning').getContent() == data + } + + /** + * Larger upload that still fits ideally within our default tuning. 100 MB will result in 20 5 MB chunks. + */ + def "upload 100 MB"() { + given: + long expectedSize = 1024 * 1024 * 100 + BoundedInputStream inputStream = new BoundedInputStream(new InputStream() { + private Random random = new Random() + @Override + int read() throws IOException { + return random.nextInt() + } + }, expectedSize) + // default value of 5 MB per chunk + MultipartUploader uploader = new MultipartUploader(metricRegistry, 1024 * 1024 * 5) + + when: + Blob blob = uploader.upload(storage, bucketName, + 'vol-01/chap-02/large/one_hundred_MB', inputStream) + then: + blob.size == expectedSize + storage.get(bucketName, 'vol-01/chap-02/large/one_hundred_MB').size == expectedSize + } + + /** + * The difference in this test beyond the 'upload 100 MB' test is that the upload will: + * + * a) result in incrementing {@link MultipartUploader#getNumberOfTimesComposeLimitHit()} and + * b) the last chunk will be significantly larger than the preceding 31. + * + * This represents a situation where the customer may need to adjust their chunk size upward. + */ + def "upload 200 MB"() { + given: + long expectedSize = 1024 * 1024 * 200 + BoundedInputStream inputStream = new BoundedInputStream(new InputStream() { + private Random random = new Random() + @Override + int read() throws IOException { + return random.nextInt() + } + }, expectedSize) + // with value of 5 MB per chunk, we'll upload 31 5 MB chunks and 1 45 MB chunk + MultipartUploader uploader = new MultipartUploader(metricRegistry, 1024 * 1024 * 5) + + when: + Blob blob = uploader.upload(storage, bucketName, + 'vol-01/chap-02/large/two_hundred_MB', inputStream) + then: + blob.size == expectedSize + uploader.getNumberOfTimesComposeLimitHit() == 1L + storage.get(bucketName, 'vol-01/chap-02/large/two_hundred_MB').size == expectedSize + } +}