From 2ef9d19428b9778c74f19946db45a8b018b959c2 Mon Sep 17 00:00:00 2001 From: Nicholas Blair Date: Sun, 28 Oct 2018 17:40:53 -0500 Subject: [PATCH 01/21] feat: parallel multipart upload for blob content --- .../gcloud/internal/GoogleCloudBlobStore.java | 22 +- .../gcloud/internal/MultipartUploader.java | 205 ++++++++++++++++++ .../internal/GoogleCloudBlobStoreIT.groovy | 18 +- .../internal/GoogleCloudBlobStoreTest.groovy | 5 +- .../internal/MultipartUploaderIT.groovy | 135 ++++++++++++ 5 files changed, 373 insertions(+), 12 deletions(-) create mode 100644 src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java create mode 100644 src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy diff --git a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStore.java b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStore.java index 07dece1..bcb4c68 100644 --- a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStore.java +++ b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStore.java @@ -126,17 +126,21 @@ public class GoogleCloudBlobStore private final DryRunPrefix dryRunPrefix; + private final MultipartUploader multipartUploader; + @Inject public GoogleCloudBlobStore(final GoogleCloudStorageFactory storageFactory, final BlobIdLocationResolver blobIdLocationResolver, final GoogleCloudBlobStoreMetricsStore metricsStore, final GoogleCloudDatastoreFactory datastoreFactory, - final DryRunPrefix dryRunPrefix) { + final DryRunPrefix dryRunPrefix, + final MultipartUploader multipartUploader) { this.storageFactory = checkNotNull(storageFactory); this.blobIdLocationResolver = checkNotNull(blobIdLocationResolver); this.metricsStore = metricsStore; this.datastoreFactory = datastoreFactory; this.dryRunPrefix = dryRunPrefix; + this.multipartUploader = multipartUploader; } @Override @@ -173,7 +177,19 @@ public Blob create(final InputStream inputStream, final Map head return createInternal(headers, destination -> { try (InputStream data = inputStream) { MetricsInputStream input = new MetricsInputStream(data); - bucket.create(destination, input); + + multipartUploader.upload(storage, getConfiguredBucketName(), destination, input); + /* BlobInfo blobInfo = BlobInfo.newBuilder(getConfiguredBucketName(), destination).build(); + try (WriteChannel writeChannel = storage.writer(blobInfo)) { + int limit; + // 1024 vs 1048576 + byte[] buffer = new byte[1048576]; + while ((limit = input.read(buffer)) >= 0) { + writeChannel.write(ByteBuffer.wrap(buffer, 0, limit)); + } + }*/ + + //bucket.create(destination, input); return input.getMetrics(); } }); @@ -243,8 +259,6 @@ public Blob get(final BlobId blobId, final boolean includeDeleted) { return blob; } - - @Override @Guarded(by = STARTED) public boolean delete(final BlobId blobId, final String reason) { diff --git a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java new file mode 100644 index 0000000..149b3db --- /dev/null +++ b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java @@ -0,0 +1,205 @@ +package org.sonatype.nexus.blobstore.gcloud.internal; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import 
java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import javax.inject.Inject; +import javax.inject.Named; + +import org.sonatype.nexus.blobstore.api.BlobStoreException; +import org.sonatype.nexus.common.stateguard.StateGuardLifecycleSupport; + +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.Storage.ComposeRequest; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.commons.io.IOUtils; + +/** + * Component that provides parallel multipart upload support for blob binary data (.bytes files). + */ +@Named +public class MultipartUploader + extends StateGuardLifecycleSupport +{ + + /** + * Use this property in 'nexus.properties' to control how large each multipart part is. Default is 5 MB. + * Smaller numbers increase the number of parallel workers used to upload a file. Match to your workload: + * if you are heavy in docker with large images, increase; if you are heavy in smaller components, decrease. + */ + public static final String CHUNK_SIZE_PROPERTY = "nexus.gcs.multipartupload.chunksize"; + + /** + * This is a hard limit on the number of components to a compose request enforced by Google Cloud Storage API. + */ + static final int COMPOSE_REQUEST_LIMIT = 32; + + /** + * While an invocation of {@link #upload(Storage, String, String, InputStream)} is in-flight, the individual + * chunks of the file will have names like 'destination.chunkPartNumber", like + * 'content/vol-01/chap-01/UUID.bytes.chunk1', 'content/vol-01/chap-01/UUID.bytes.chunk2', etc. + */ + private final String CHUNK_NAME_PART = ".chunk"; + + /** + * Used internally to count how many times we've hit the compose limit. + * Consider exposing this as a bean that can provide tuning feedback to deployers. 
+ */ + private final AtomicLong composeLimitHit = new AtomicLong(0); + + private static final byte[] EMPTY = new byte[0]; + + private final ListeningExecutorService executorService = MoreExecutors.listeningDecorator( + Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat("nexus-google-cloud-storage-multipart-upload-%d") + .build())); + + private final int chunkSize; + + @Inject + public MultipartUploader(@Named("${"+CHUNK_SIZE_PROPERTY +":-5242880}") final int chunkSize) { + this.chunkSize = chunkSize; + } + + @Override + protected void doStop() throws Exception { + executorService.shutdown(); + log.info("sent signal to shutdown multipart upload queue, waiting up to 3 minutes for termination..."); + executorService.awaitTermination(3L, TimeUnit.MINUTES); + } + + /** + * @return the value for the {@link #CHUNK_SIZE_PROPERTY} + */ + public int getChunkSize() { + return chunkSize; + } + + /** + * @return the number of times {@link #upload(Storage, String, String, InputStream)} hit the multipart-compose limit + */ + public long getNumberOfTimesComposeLimitHit() { + return composeLimitHit.get(); + } + + /** + * @param storage an initialized {@link Storage} instance + * @param bucket the name of the bucket + * @param destination the the destination (relative to the bucket) + * @param contents the stream of data to store + * @return the successfully stored {@link Blob} + * @throws BlobStoreException if any part of the upload failed + */ + public Blob upload(final Storage storage, final String bucket, final String destination, final InputStream contents) { + log.debug("Starting multipart upload for destination {} in bucket {}", destination, bucket); + // collect parts as blobids in a list? + List parts = new ArrayList<>(); + + try (InputStream current = contents) { + List> chunkFutures = new ArrayList<>(); + // MUST respect hard limit of 32 chunks per compose request + for (int partNumber = 1; partNumber <= COMPOSE_REQUEST_LIMIT; partNumber++) { + final byte[] chunk; + if (partNumber < COMPOSE_REQUEST_LIMIT) { + chunk = readChunk(current); + } + else { + log.debug("Upload for {} has hit Google Cloud Storage multipart-compose limits; " + + "consider increasing '{}' beyond current value of {}", destination, CHUNK_SIZE_PROPERTY, getChunkSize()); + // we've hit compose request limit read the rest of the stream + composeLimitHit.incrementAndGet(); + chunk = IOUtils.toByteArray(current); + } + if (chunk == EMPTY && partNumber > 1) { + break; + } + else { + final int chunkIndex = partNumber; + chunkFutures.add(executorService.submit(() -> { + log.debug("Uploading chunk {} for {} of {} bytes", chunkIndex, destination, chunk.length); + BlobInfo blobInfo = BlobInfo.newBuilder( + bucket, toChunkName(destination, chunkIndex)).build(); + return storage.create(blobInfo, chunk); + })); + } + } + + final int numberOfChunks = chunkFutures.size(); + CountDownLatch block = new CountDownLatch(1); + Futures.whenAllComplete(chunkFutures).run(() -> block.countDown() , MoreExecutors.directExecutor()); + // create list of all the chunks + List chunkBlobs = IntStream.rangeClosed(1, numberOfChunks) + .mapToObj(i -> toChunkName(destination, i)) + .collect(Collectors.toList()); + + // wait for all the futures to complete + log.debug("waiting for {} chunks to complete", chunkFutures.size()); + block.await(); + log.debug("chunk uploads completed, sending compose request"); + // finalize with compose request to coalesce the chunks + Blob finalBlob = storage.compose(ComposeRequest.of(bucket, chunkBlobs, 
destination)); + log.debug("Multipart upload of {} complete", destination); + + deferredCleanup(storage, bucket, chunkBlobs); + + return finalBlob; + } + catch(Exception e) { + deferredCleanup(storage, bucket, parts); + throw new BlobStoreException("Error uploading blob", e, null); + } + } + + private void deferredCleanup(final Storage storage, final String bucket, final List parts) { + executorService.submit(() -> { + parts.stream().forEach(part -> storage.delete(bucket, part)); + }); + } + + private String toChunkName(String destination, int chunkNumber) { + return destination + CHUNK_NAME_PART + chunkNumber; + } + + /** + * Read a chunk of the stream up to {@link #getChunkSize()} in length. + * + * @param input + * @return the read data as a byte array + * @throws IOException + */ + private byte[] readChunk(final InputStream input) throws IOException { + byte[] buffer = new byte[chunkSize]; + int offset = 0; + int remain = chunkSize; + int bytesRead = 0; + + while (remain > 0 && bytesRead >= 0) { + bytesRead = input.read(buffer, offset, remain); + if (bytesRead > 0) { + offset += bytesRead; + remain -= bytesRead; + } + } + if (offset > 0) { + return Arrays.copyOfRange(buffer, 0, offset); + } + else { + return EMPTY; + } + } +} diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy index bfb4fbb..a5df118 100644 --- a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy @@ -73,6 +73,8 @@ class GoogleCloudBlobStoreIT BlobStoreUsageChecker usageChecker = Mock() + MultipartUploader uploader = new MultipartUploader(1024) + def setup() { config.attributes = [ 'google cloud storage': [ @@ -86,7 +88,7 @@ class GoogleCloudBlobStoreIT metricsStore = new GoogleCloudBlobStoreMetricsStore(periodicJobService, nodeAccess) // can't start metrics store until blobstore init is done (which creates the bucket) blobStore = new GoogleCloudBlobStore(storageFactory, blobIdLocationResolver, metricsStore, datastoreFactory, - new DryRunPrefix("TEST ")) + new DryRunPrefix("TEST "), uploader) blobStore.init(config) blobStore.start() @@ -139,10 +141,10 @@ class GoogleCloudBlobStoreIT Storage storage = storageFactory.create(config) // mimic some RHC content, which is stored as directpath blobs // 4 files, but only 2 blobIds (a .bytes and a .properties blob for each blobId) - createFile(storage, "content/directpath/health-check/repo1/report.properties.bytes") - createFile(storage, "content/directpath/health-check/repo1/report.properties.properties") - createFile(storage, "content/directpath/health-check/repo1/details/bootstrap.min.css.properties") - createFile(storage, "content/directpath/health-check/repo1/details/bootstrap.min.css.bytes") + createFile(storage, "content/directpath/health-check/repo1/report.properties.bytes", 1024 * 2) + createFile(storage, "content/directpath/health-check/repo1/report.properties.properties", 100) + createFile(storage, "content/directpath/health-check/repo1/details/bootstrap.min.css.properties", 1024* 3) + createFile(storage, "content/directpath/health-check/repo1/details/bootstrap.min.css.bytes", 100) when: Stream stream = blobStore.getDirectPathBlobIdStream('health-check/repo1') @@ -220,8 +222,10 @@ class GoogleCloudBlobStoreIT assert blob2 != null } - def createFile(Storage storage, String path) { + def 
createFile(Storage storage, String path, long size) { + byte [] content = new byte[size] + new Random().nextBytes(content) storage.create(BlobInfo.newBuilder(bucketName, path).build(), - "content".bytes) + content) } } diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy index 84e2e0e..f400fda 100644 --- a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy @@ -51,6 +51,8 @@ class GoogleCloudBlobStoreTest Datastore datastore = Mock() + MultipartUploader uploader = new MultipartUploader(1024) + KeyFactory keyFactory = new KeyFactory("testing") def blobHeaders = [ @@ -58,7 +60,8 @@ class GoogleCloudBlobStoreTest (BlobStore.CREATED_BY_HEADER): 'admin' ] GoogleCloudBlobStore blobStore = new GoogleCloudBlobStore( - storageFactory, blobIdLocationResolver, metricsStore, datastoreFactory, new DryRunPrefix("TEST ")) + storageFactory, blobIdLocationResolver, metricsStore, datastoreFactory, new DryRunPrefix("TEST "), + uploader) def config = new BlobStoreConfiguration() diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy new file mode 100644 index 0000000..8e46725 --- /dev/null +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy @@ -0,0 +1,135 @@ +package org.sonatype.nexus.blobstore.gcloud.internal + +import java.util.stream.StreamSupport + +import org.sonatype.nexus.blobstore.api.BlobStoreConfiguration + +import com.google.cloud.storage.Blob +import com.google.cloud.storage.Blob.BlobSourceOption +import com.google.cloud.storage.BucketInfo +import com.google.cloud.storage.Storage +import groovy.util.logging.Slf4j +import org.apache.commons.io.input.BoundedInputStream +import spock.lang.Specification + +@Slf4j +class MultipartUploaderIT + extends Specification +{ + + static final BlobStoreConfiguration config = new BlobStoreConfiguration() + + static final GoogleCloudStorageFactory storageFactory = new GoogleCloudStorageFactory() + + static String bucketName = "integration-test-${UUID.randomUUID().toString()}" + + static Storage storage + + def setupSpec() { + config.attributes = [ + 'google cloud storage': [ + bucket: bucketName, + credential_file: this.getClass().getResource('/gce-credentials.json').getFile() + ] + ] + + log.info("Integration test using bucket ${bucketName}") + storage = storageFactory.create(config) + storage.create(BucketInfo.of(bucketName)) + } + + def cleanupSpec() { + //Storage storage = new GoogleCloudStorageFactory().create(config) + log.debug("Tests complete, deleting files from ${bucketName}") + // must delete all the files within the bucket before we can delete the bucket + Iterator list = storage.list(bucketName, + Storage.BlobListOption.prefix("")).iterateAll() + .iterator() + + Iterable iterable = { _ -> list } + StreamSupport.stream(iterable.spliterator(), true) + .forEach({ b -> b.delete(BlobSourceOption.generationMatch()) }) + storage.delete(bucketName) + log.info("Integration test complete, bucket ${bucketName} deleted") + } + + def "control experiment"() { + given: + long expectedSize = (1048576 * 3) + 2 + MultipartUploader uploader = new MultipartUploader(1048576) + byte[] data = new byte[expectedSize] + new 
Random().nextBytes(data) + + when: + Blob blob = uploader.upload(storage, bucketName, 'vol-01/chap-01/control/test', new ByteArrayInputStream(data)) + + then: + blob.size == expectedSize + } + + def "zero byte file"() { + given: + long expectedSize = 0 + MultipartUploader uploader = new MultipartUploader(1024) + byte[] data = new byte[expectedSize] + new Random().nextBytes(data) + + when: + Blob blob = uploader.upload(storage, bucketName, 'vol-01/chap-01/control/zero_byte', new ByteArrayInputStream(data)) + + then: + blob.size == expectedSize + } + + def "hit compose limit slightly and still successful"() { + given: + long expectedSize = (1024 * MultipartUploader.COMPOSE_REQUEST_LIMIT) + 10 + MultipartUploader uploader = new MultipartUploader(1024) + byte[] data = new byte[expectedSize] + new Random().nextBytes(data) + + when: + Blob blob = uploader.upload(storage, bucketName, + 'vol-01/chap-01/composeLimitTest/small_miss', new ByteArrayInputStream(data)) + + then: + blob.size == expectedSize + uploader.numberOfTimesComposeLimitHit == 1L + } + + def "hit compose limit poorly tuned, still successful" () { + given: + long expectedSize = 1048576 + MultipartUploader uploader = new MultipartUploader(1024) + byte[] data = new byte[expectedSize] + new Random().nextBytes(data) + + when: + Blob blob = uploader.upload(storage, bucketName, + 'vol-01/chap-01/composeLimitTest/poor_tuning', new ByteArrayInputStream(data)) + + then: + blob.size == expectedSize + uploader.numberOfTimesComposeLimitHit == 1L + } + + def "upload 100 MB"() { + given: + long expectedSize = 1024 * 1024 * 100 + BoundedInputStream inputStream = new BoundedInputStream(new InputStream() { + private Random random = new Random() + @Override + int read() throws IOException { + return random.nextInt() + } + }, expectedSize) + // default value of 5 MB per chunk + MultipartUploader uploader = new MultipartUploader(1024 * 1024 * 5) + + when: + Blob blob = uploader.upload(storage, bucketName, + 'vol-01/chap-02/large/one_hundred_MB', inputStream) + then: + blob.size == expectedSize + } +} From b8355bef8852b6e681161a4674bfc397466cc2ab Mon Sep 17 00:00:00 2001 From: Nicholas Blair Date: Tue, 30 Oct 2018 17:01:00 -0500 Subject: [PATCH 02/21] first chunk happens on the calling thread, 2nd+ in the executor --- .../gcloud/internal/MultipartUploader.java | 90 +++++++++++++------ .../internal/MultipartUploaderIT.groovy | 46 +++++++++- 2 files changed, 107 insertions(+), 29 deletions(-) diff --git a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java index 149b3db..7c7b364 100644 --- a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java +++ b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java @@ -1,3 +1,15 @@ +/* + * Sonatype Nexus (TM) Open Source Version + * Copyright (c) 2017-present Sonatype, Inc. + * All rights reserved. Includes the third-party code listed at http://links.sonatype.com/products/nexus/oss/attributions. + * + * This program and the accompanying materials are made available under the terms of the Eclipse Public License Version 1.0, + * which accompanies this distribution and is available at http://www.eclipse.org/legal/epl-v10.html. + * + * Sonatype Nexus (TM) Professional Version is available from Sonatype, Inc. "Sonatype" and "Sonatype Nexus" are trademarks + * of Sonatype, Inc. Apache Maven is a trademark of the Apache Software Foundation. 
M2eclipse is a trademark of the + * Eclipse Foundation. All other trademarks are the property of their respective owners. + */ package org.sonatype.nexus.blobstore.gcloud.internal; import java.io.IOException; @@ -5,12 +17,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import javax.inject.Inject; import javax.inject.Named; @@ -107,9 +118,10 @@ public long getNumberOfTimesComposeLimitHit() { */ public Blob upload(final Storage storage, final String bucket, final String destination, final InputStream contents) { log.debug("Starting multipart upload for destination {} in bucket {}", destination, bucket); - // collect parts as blobids in a list? - List parts = new ArrayList<>(); + // this must represent the bucket-relative paths to the chunks, in order of composition + List chunkNames = new ArrayList<>(); + Optional singleChunk = Optional.empty(); try (InputStream current = contents) { List> chunkFutures = new ArrayList<>(); // MUST respect hard limit of 32 chunks per compose request @@ -125,43 +137,56 @@ public Blob upload(final Storage storage, final String bucket, final String dest composeLimitHit.incrementAndGet(); chunk = IOUtils.toByteArray(current); } + if (chunk == EMPTY && partNumber > 1) { break; } - else { + + final String chunkName = toChunkName(destination, partNumber); + chunkNames.add(chunkName); + if (partNumber == 1) { + // upload the first part on the current thread + BlobInfo blobInfo = BlobInfo.newBuilder(bucket, chunkName).build(); + Blob blob = storage.create(blobInfo, chunk); + singleChunk = Optional.of(blob); + } else { + singleChunk = Optional.empty(); + // 2nd through N chunks will happen off current thread in parallel final int chunkIndex = partNumber; chunkFutures.add(executorService.submit(() -> { log.debug("Uploading chunk {} for {} of {} bytes", chunkIndex, destination, chunk.length); BlobInfo blobInfo = BlobInfo.newBuilder( - bucket, toChunkName(destination, chunkIndex)).build(); + bucket, chunkName).build(); return storage.create(blobInfo, chunk); })); } } - final int numberOfChunks = chunkFutures.size(); - CountDownLatch block = new CountDownLatch(1); - Futures.whenAllComplete(chunkFutures).run(() -> block.countDown() , MoreExecutors.directExecutor()); - // create list of all the chunks - List chunkBlobs = IntStream.rangeClosed(1, numberOfChunks) - .mapToObj(i -> toChunkName(destination, i)) - .collect(Collectors.toList()); - - // wait for all the futures to complete - log.debug("waiting for {} chunks to complete", chunkFutures.size()); - block.await(); - log.debug("chunk uploads completed, sending compose request"); - // finalize with compose request to coalesce the chunks - Blob finalBlob = storage.compose(ComposeRequest.of(bucket, chunkBlobs, destination)); - log.debug("Multipart upload of {} complete", destination); - - deferredCleanup(storage, bucket, chunkBlobs); - - return finalBlob; + // return the single result if it exists; otherwise finalize the parallel multipart workers + return singleChunk.orElseGet(() -> { + CountDownLatch block = new CountDownLatch(1); + Futures.whenAllComplete(chunkFutures).run(() -> block.countDown() , MoreExecutors.directExecutor()); + // wait for all the futures to complete + log.debug("waiting for {} remaining chunks to 
complete", chunkFutures.size()); + try { + block.await(); + } + catch (InterruptedException e) { + log.error("caught InterruptedException waiting for multipart upload to complete on {}", destination); + throw new RuntimeException(e); + } + log.debug("chunk uploads completed, sending compose request"); + + // finalize with compose request to coalesce the chunks + Blob finalBlob = storage.compose(ComposeRequest.of(bucket, chunkNames, destination)); + log.debug("Multipart upload of {} complete", destination); + return finalBlob; + }); } catch(Exception e) { - deferredCleanup(storage, bucket, parts); throw new BlobStoreException("Error uploading blob", e, null); + } finally { + deferredCleanup(storage, bucket, chunkNames); } } @@ -171,14 +196,25 @@ private void deferredCleanup(final Storage storage, final String bucket, final L }); } + /** + * The name of the first chunk should match the desired end destination. + * For any chunk index 2 or greater, this method will return the destination + the chunk name suffix. + * + * @param destination + * @param chunkNumber + * @return the name to store this chunk + */ private String toChunkName(String destination, int chunkNumber) { + if (chunkNumber == 1) { + return destination; + } return destination + CHUNK_NAME_PART + chunkNumber; } /** * Read a chunk of the stream up to {@link #getChunkSize()} in length. * - * @param input + * @param input the stream to read * @return the read data as a byte array * @throws IOException */ diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy index 8e46725..e12f532 100644 --- a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy @@ -1,3 +1,15 @@ +/* + * Sonatype Nexus (TM) Open Source Version + * Copyright (c) 2017-present Sonatype, Inc. + * All rights reserved. Includes the third-party code listed at http://links.sonatype.com/products/nexus/oss/attributions. + * + * This program and the accompanying materials are made available under the terms of the Eclipse Public License Version 1.0, + * which accompanies this distribution and is available at http://www.eclipse.org/legal/epl-v10.html. + * + * Sonatype Nexus (TM) Professional Version is available from Sonatype, Inc. "Sonatype" and "Sonatype Nexus" are trademarks + * of Sonatype, Inc. Apache Maven is a trademark of the Apache Software Foundation. M2eclipse is a trademark of the + * Eclipse Foundation. All other trademarks are the property of their respective owners. 
+ */ package org.sonatype.nexus.blobstore.gcloud.internal import java.util.stream.StreamSupport @@ -53,7 +65,7 @@ class MultipartUploaderIT log.info("Integration test complete, bucket ${bucketName} deleted") } - def "control experiment"() { + def "simple multipart"() { given: long expectedSize = (1048576 * 3) + 2 MultipartUploader uploader = new MultipartUploader(1048576) @@ -61,7 +73,37 @@ class MultipartUploaderIT new Random().nextBytes(data) when: - Blob blob = uploader.upload(storage, bucketName, 'vol-01/chap-01/control/test', new ByteArrayInputStream(data)) + Blob blob = uploader.upload(storage, bucketName, 'vol-01/chap-01/control/multi_part', new ByteArrayInputStream(data)) + + then: + blob.size == expectedSize + } + + def "confirm parts composed in order"() { + given: + // 5 each of abcdefg + final String content = "aaaaabbbbbcccccdddddeeeeefffffggggg" + byte[] data = content.bytes + MultipartUploader uploader = new MultipartUploader(5) + + when: + Blob blob = uploader.upload(storage, bucketName, 'vol-01/chap-01/control/in_order', new ByteArrayInputStream(data)) + + then: + blob.size == data.length + Blob readback = storage.get(blob.blobId) + readback.getContent() == content.bytes + } + + def "single part"() { + given: + long expectedSize = 1048575 + MultipartUploader uploader = new MultipartUploader(1048576) + byte[] data = new byte[expectedSize] + new Random().nextBytes(data) + + when: + Blob blob = uploader.upload(storage, bucketName, 'vol-01/chap-01/control/single_part', new ByteArrayInputStream(data)) then: blob.size == expectedSize From a32441c467deb05ee1220a9c8b3e5b0281ba34fc Mon Sep 17 00:00:00 2001 From: Nicholas Blair Date: Tue, 30 Oct 2018 17:21:18 -0500 Subject: [PATCH 03/21] add missing mock behavior for 'store a blob' test --- .../blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy | 1 + 1 file changed, 1 insertion(+) diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy index f400fda..fe7ec0c 100644 --- a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy @@ -134,6 +134,7 @@ class GoogleCloudBlobStoreTest storage.get('mybucket') >> bucket blobStore.init(config) blobStore.doStart() + storage.create(_, _) >> mockGoogleObject(tempFileBytes) when: 'call create' Blob blob = blobStore.create(new ByteArrayInputStream('hello world'.bytes), blobHeaders) From 53e777a10ce59e9f28440413e83de2c92437f521 Mon Sep 17 00:00:00 2001 From: Nicholas Blair Date: Thu, 15 Nov 2018 13:00:22 -0600 Subject: [PATCH 04/21] wip ADR document --- design/multipart_parallel_upload.md | 36 +++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 design/multipart_parallel_upload.md diff --git a/design/multipart_parallel_upload.md b/design/multipart_parallel_upload.md new file mode 100644 index 0000000..bec7273 --- /dev/null +++ b/design/multipart_parallel_upload.md @@ -0,0 +1,36 @@ +# Multipart Parallel Upload Design + +This document is intended to capture the context around the component within this blobstore that provides +Parallel Multipart Upload: + +[MultipartUploader.java](../src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java) + +## Context and Idea + +gsutil concept: [Parallel Composite 
Uploads](https://cloud.google.com/storage/docs/gsutil/commands/cp#parallel-composite-uploads)
+
+The google-cloud-storage library for Java that this blobstore is built with has no built-in mechanism for parallel composite uploads.
+Its storage#create function is synchronous, which is fine for some workloads but not all.
+The API does, however, support the Compose request.
+This module therefore implements parallel composite uploads on the client side, using the Storage create and compose functions.
+
+## Implementation
+
+* We don't have the content length up front; the API only gives us a stream.
+* The GCS compose method has a hard limit of 32 components. Since we don't know the length, we can't
+split the stream into 32 equal parts. Instead we upload 31 chunks of chunkSize bytes, then 1 chunk containing the rest of the stream.
+* Exposes the "composeLimitHit" field, which counts how often that final catch-all chunk was needed.
+* For files smaller than the chunk size, we don't pay the cost of shipping the upload request to another thread.
+  * The first chunk is written at the expected destination path on the calling thread.
+  * If we've read chunkSize bytes and there is still more data on the stream, the 2nd through final chunks are scheduled off-thread.
+* A blob write still waits until all parallel uploads complete. This is an important characteristic of all BlobStores;
+Nexus Repository Manager expects consistency, not deferred, eventual consistency.
+
+## Tuning
+
+* Enable debug logging for `org.sonatype.nexus.blobstore.gcloud.internal.MultipartUploader` to observe upload behavior.
+* The 5 MB default chunk size and the 32-chunk compose limit mean optimal thread utilization for files between 10 MB
+and 160 MB in size.
+* When should you increase the chunk size? When you have fewer CPUs and larger files.
+* To effectively disable parallel uploads, set the chunk size to Integer.MAX_VALUE; the whole blob will then be uploaded on the request thread.

From f98f5a5e36ca87e81fd8fd4cd1ac8f93242f14fe Mon Sep 17 00:00:00 2001
From: Nicholas Blair
Date: Fri, 4 Jan 2019 15:26:32 -0600
Subject: [PATCH 05/21] fix: no longer including first chunk in names passed to cleanup

Previously, a multipart upload would succeed in writing all the chunks and composing them into the desired
destination. The chunkNames argument, however, included the final destination name (as it is used for the
"first" chunk); when deferredCleanup eventually completed, the destination file would also be deleted. This
corrects the behavior by excluding the first chunk name in deferredCleanup via a filter on the argument.

This also includes a fix for a possible OutOfMemoryError condition raised by blobs that have a very large
final chunk. We avoid the issue by skipping the call to readChunk (and the Arrays.copyOfRange on the chunk)
for the final chunk. Instead, we pass the remaining InputStream to the storage client, with the unfortunate
side effect of calling a deprecated method. Per the docs, the storage#create method is deprecated because it
is not retryable. That means we may have failed uploads where the first 31 chunks succeed and the last fails
without retry; that highlights the importance of the tuning option and of the log message calling attention
to the issue.
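In code terms, the final-chunk handling now looks roughly like the following sketch (simplified from the
MultipartUploader diff below; 'current' is the InputStream positioned after the first 31 chunks have been read):

    final String finalChunkName = toChunkName(destination, COMPOSE_REQUEST_LIMIT);
    chunkNames.add(finalChunkName);
    chunkFutures.add(executorService.submit(() -> {
      // stream the remainder directly rather than buffering it into a byte[] first;
      // this overload of storage.create is the one that cannot be retried on failure
      BlobInfo blobInfo = BlobInfo.newBuilder(bucket, finalChunkName).build();
      return storage.create(blobInfo, current);
    }));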
Also includes: * an integration test to mimic storage facet's write temp and move behavior * new tests on the uploader with larger blobs * increases the maxConnectionsPerRoute to equal maxTotalConnections --- pom.xml | 6 +++ .../internal/AbstractGoogleClientFactory.java | 5 +- .../gcloud/internal/MultipartUploader.java | 48 ++++++++++++++++--- .../internal/GoogleCloudBlobStoreIT.groovy | 43 ++++++++++++++++- .../internal/MultipartUploaderIT.groovy | 40 ++++++++++++++++ 5 files changed, 131 insertions(+), 11 deletions(-) diff --git a/pom.xml b/pom.xml index fa57458..bf685ca 100644 --- a/pom.xml +++ b/pom.xml @@ -76,6 +76,12 @@ test + + org.sonatype.nexus + nexus-repository + test + + cglib cglib-nodep diff --git a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/AbstractGoogleClientFactory.java b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/AbstractGoogleClientFactory.java index c588573..d62aca1 100644 --- a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/AbstractGoogleClientFactory.java +++ b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/AbstractGoogleClientFactory.java @@ -16,9 +16,6 @@ import java.io.IOException; import java.io.Reader; import java.net.ProxySelector; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Paths; import com.google.api.client.http.apache.ApacheHttpTransport; import com.google.cloud.TransportOptions; @@ -79,7 +76,7 @@ HttpParams newDefaultHttpParams() { HttpConnectionParams.setStaleCheckingEnabled(params, true); HttpConnectionParams.setSocketBufferSize(params, 8192); ConnManagerParams.setMaxTotalConnections(params, 200); - ConnManagerParams.setMaxConnectionsPerRoute(params, new ConnPerRouteBean(20)); + ConnManagerParams.setMaxConnectionsPerRoute(params, new ConnPerRouteBean(200)); return params; } diff --git a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java index 7c7b364..153379a 100644 --- a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java +++ b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java @@ -12,6 +12,7 @@ */ package org.sonatype.nexus.blobstore.gcloud.internal; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; @@ -38,7 +39,6 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.commons.io.IOUtils; /** * Component that provides parallel multipart upload support for blob binary data (.bytes files). 
@@ -75,6 +75,8 @@ public class MultipartUploader private static final byte[] EMPTY = new byte[0]; + private static final InputStream EMPTY_STREAM = new ByteArrayInputStream(EMPTY); + private final ListeningExecutorService executorService = MoreExecutors.listeningDecorator( Executors.newCachedThreadPool( new ThreadFactoryBuilder().setNameFormat("nexus-google-cloud-storage-multipart-upload-%d") @@ -135,7 +137,17 @@ public Blob upload(final Storage storage, final String bucket, final String dest "consider increasing '{}' beyond current value of {}", destination, CHUNK_SIZE_PROPERTY, getChunkSize()); // we've hit compose request limit read the rest of the stream composeLimitHit.incrementAndGet(); - chunk = IOUtils.toByteArray(current); + chunk = EMPTY; + + final String finalChunkName = toChunkName(destination, COMPOSE_REQUEST_LIMIT); + chunkNames.add(finalChunkName); + chunkFutures.add(executorService.submit(() -> { + log.debug("Uploading final chunk {} for {} of unknown remaining bytes", COMPOSE_REQUEST_LIMIT, destination); + BlobInfo blobInfo = BlobInfo.newBuilder( + bucket, finalChunkName).build(); + // read the rest of the current stream + return storage.create(blobInfo, current); + })); } if (chunk == EMPTY && partNumber > 1) { @@ -144,6 +156,7 @@ public Blob upload(final Storage storage, final String bucket, final String dest final String chunkName = toChunkName(destination, partNumber); chunkNames.add(chunkName); + if (partNumber == 1) { // upload the first part on the current thread BlobInfo blobInfo = BlobInfo.newBuilder(bucket, chunkName).build(); @@ -186,14 +199,16 @@ public Blob upload(final Storage storage, final String bucket, final String dest catch(Exception e) { throw new BlobStoreException("Error uploading blob", e, null); } finally { + // remove any .chunkN files off-thread + // make sure not to delete the first chunk (which has the desired destination name with no suffix) deferredCleanup(storage, bucket, chunkNames); } } - private void deferredCleanup(final Storage storage, final String bucket, final List parts) { - executorService.submit(() -> { - parts.stream().forEach(part -> storage.delete(bucket, part)); - }); + private void deferredCleanup(final Storage storage, final String bucket, final List chunkNames) { + executorService.submit(() -> chunkNames.stream() + .filter(part -> part.contains(CHUNK_NAME_PART)) + .forEach(chunk -> storage.delete(bucket, chunk))); } /** @@ -238,4 +253,25 @@ private byte[] readChunk(final InputStream input) throws IOException { return EMPTY; } } + + private InputStream streamChunk(final InputStream input) throws IOException { + byte[] buffer = new byte[chunkSize]; + int offset = 0; + int remain = chunkSize; + int bytesRead = 0; + + while (remain > 0 && bytesRead >= 0) { + bytesRead = input.read(buffer, offset, remain); + if (bytesRead > 0) { + offset += bytesRead; + remain -= bytesRead; + } + } + if (offset > 0) { + return new ByteArrayInputStream(buffer, 0, offset); + } + else { + return EMPTY_STREAM; + } + } } diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy index a5df118..0a9c968 100644 --- a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy @@ -26,12 +26,19 @@ import org.sonatype.nexus.blobstore.api.BlobId import org.sonatype.nexus.blobstore.api.BlobStore import 
org.sonatype.nexus.blobstore.api.BlobStoreConfiguration import org.sonatype.nexus.blobstore.api.BlobStoreUsageChecker +import org.sonatype.nexus.common.hash.HashAlgorithm +import org.sonatype.nexus.common.hash.MultiHashingInputStream import org.sonatype.nexus.common.log.DryRunPrefix import org.sonatype.nexus.common.node.NodeAccess +import org.sonatype.nexus.repository.storage.TempBlob import com.google.cloud.storage.Blob.BlobSourceOption import com.google.cloud.storage.BlobInfo import com.google.cloud.storage.Storage +import com.google.common.collect.ImmutableList +import com.google.common.collect.ImmutableMap +import com.google.common.collect.Maps +import com.google.common.hash.Hashing import org.slf4j.Logger import org.slf4j.LoggerFactory import spock.lang.Specification @@ -75,6 +82,10 @@ class GoogleCloudBlobStoreIT MultipartUploader uploader = new MultipartUploader(1024) + def hashAlgorithms = ImmutableList.of( + new HashAlgorithm("sha1", Hashing.sha1()), + new HashAlgorithm("md5", Hashing.md5())) + def setup() { config.attributes = [ 'google cloud storage': [ @@ -213,7 +224,7 @@ class GoogleCloudBlobStoreIT (BlobStore.CREATED_BY_HEADER): 'someuser' ] ) assert blob != null // sit for at least the time on our keep alives, so that any held connections close - log.info("waiting for ${(KEEP_ALIVE_DURATION + 1000L) / 1000L} seconds any stale connections to close") + log.info("waiting for ${(KEEP_ALIVE_DURATION + 1000L) / 1000L} seconds so any stale connections close") sleep(KEEP_ALIVE_DURATION + 1000L) Blob blob2 = blobStore.create(new ByteArrayInputStream('hello'.getBytes()), @@ -222,6 +233,36 @@ class GoogleCloudBlobStoreIT assert blob2 != null } + def "mimic storage facet write-temp-and-move"() { + given: + def expectedSize = 2048 + byte[] data = new byte[expectedSize] + new Random().nextBytes(data) + def headers = ImmutableMap.of( + BlobStore.BLOB_NAME_HEADER, "temp", + BlobStore.CREATED_BY_HEADER, "system", + BlobStore.CREATED_BY_IP_HEADER, "system", + BlobStore.TEMPORARY_BLOB_HEADER, "") + + expect: + // write tempBlob + MultiHashingInputStream hashingStream = new MultiHashingInputStream(hashAlgorithms, + new ByteArrayInputStream(data)) + Blob blob = blobStore.create(hashingStream, headers) + TempBlob tempBlob = new TempBlob(blob, hashingStream.hashes(), true, blobStore) + + assert tempBlob != null + assert tempBlob.blob.id.toString().startsWith('tmp$') + // put the tempBlob into the final location + Map filtered = Maps.filterKeys(headers, { k -> !k.equals(BlobStore.TEMPORARY_BLOB_HEADER) }) + Blob result = blobStore.copy(tempBlob.blob.id, filtered) + // close the tempBlob (results in deleteHard on the tempBlob) + tempBlob.close() + + Blob retrieve = blobStore.get(result.getId()) + assert retrieve != null + } + def createFile(Storage storage, String path, long size) { byte [] content = new byte[size] new Random().nextBytes(content) diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy index e12f532..37ce8f6 100644 --- a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy @@ -77,6 +77,7 @@ class MultipartUploaderIT then: blob.size == expectedSize + storage.get(bucketName, 'vol-01/chap-01/control/multi_part').getContent() == data } def "confirm parts composed in order"() { @@ -107,6 +108,7 @@ class MultipartUploaderIT then: 
blob.size == expectedSize + storage.get(bucketName, 'vol-01/chap-01/control/single_part').getContent() == data } def "zero byte file"() { @@ -121,6 +123,7 @@ class MultipartUploaderIT then: blob.size == expectedSize + storage.get(bucketName, 'vol-01/chap-01/control/zero_byte').getContent() == data } def "hit compose limit slightly and still successful"() { @@ -137,6 +140,7 @@ class MultipartUploaderIT then: blob.size == expectedSize uploader.numberOfTimesComposeLimitHit == 1L + storage.get(bucketName, 'vol-01/chap-01/composeLimitTest/small_miss').getContent() == data } def "hit compose limit poorly tuned, still successful" () { @@ -153,8 +157,12 @@ class MultipartUploaderIT then: blob.size == expectedSize uploader.numberOfTimesComposeLimitHit == 1L + storage.get(bucketName, 'vol-01/chap-01/composeLimitTest/poor_tuning').getContent() == data } + /** + * Larger upload that still fits ideally within our default tuning. 100 MB will result in 20 5 MB chunks. + */ def "upload 100 MB"() { given: long expectedSize = 1024 * 1024 * 100 @@ -173,5 +181,37 @@ class MultipartUploaderIT 'vol-01/chap-02/large/one_hundred_MB', inputStream) then: blob.size == expectedSize + storage.get(bucketName, 'vol-01/chap-02/large/one_hundred_MB').size == expectedSize + } + + /** + * The difference in this test beyond the 'upload 100 MB' test is that the upload will: + * + * a) result in incrementing {@link MultipartUploader#getNumberOfTimesComposeLimitHit()} and + * b) the last chunk will be significantly larger than the preceding 31. + * + * This represents a situation where the customer may need to adjust their chunk size upward. The larger final chunk + * may also elicit runtime pressure on memory. + */ + def "upload 300 MB"() { + given: + long expectedSize = 1024 * 1024 * 300 + BoundedInputStream inputStream = new BoundedInputStream(new InputStream() { + private Random random = new Random() + @Override + int read() throws IOException { + return random.nextInt() + } + }, expectedSize) + // with value of 5 MB per chunk, we'll upload 31 5 MB chunks and 1 145 MB chunk + MultipartUploader uploader = new MultipartUploader(1024 * 1024 * 5) + + when: + Blob blob = uploader.upload(storage, bucketName, + 'vol-01/chap-02/large/three_hundred_MB', inputStream) + then: + blob.size == expectedSize + uploader.getNumberOfTimesComposeLimitHit() == 1L + storage.get(bucketName, 'vol-01/chap-02/large/three_hundred_MB').size == expectedSize } } From 8734395bf24ed035195a704f358e29fc85fc47ff Mon Sep 17 00:00:00 2001 From: Nicholas Blair Date: Thu, 10 Jan 2019 09:48:44 -0600 Subject: [PATCH 06/21] remove unused method --- .../gcloud/internal/MultipartUploader.java | 21 ------------------- 1 file changed, 21 deletions(-) diff --git a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java index 153379a..d40f10d 100644 --- a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java +++ b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java @@ -253,25 +253,4 @@ private byte[] readChunk(final InputStream input) throws IOException { return EMPTY; } } - - private InputStream streamChunk(final InputStream input) throws IOException { - byte[] buffer = new byte[chunkSize]; - int offset = 0; - int remain = chunkSize; - int bytesRead = 0; - - while (remain > 0 && bytesRead >= 0) { - bytesRead = input.read(buffer, offset, remain); - if (bytesRead > 0) { - offset += bytesRead; - remain -= 
bytesRead; - } - } - if (offset > 0) { - return new ByteArrayInputStream(buffer, 0, offset); - } - else { - return EMPTY_STREAM; - } - } } From f2803ab864db57b3ee8d1604860e564ba1fd50c7 Mon Sep 17 00:00:00 2001 From: Nicholas Blair Date: Thu, 10 Jan 2019 16:12:03 -0600 Subject: [PATCH 07/21] remove unused code --- .../gcloud/internal/GoogleCloudBlobStore.java | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStore.java b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStore.java index bcb4c68..4a406c5 100644 --- a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStore.java +++ b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStore.java @@ -179,17 +179,6 @@ public Blob create(final InputStream inputStream, final Map head MetricsInputStream input = new MetricsInputStream(data); multipartUploader.upload(storage, getConfiguredBucketName(), destination, input); - /* BlobInfo blobInfo = BlobInfo.newBuilder(getConfiguredBucketName(), destination).build(); - try (WriteChannel writeChannel = storage.writer(blobInfo)) { - int limit; - // 1024 vs 1048576 - byte[] buffer = new byte[1048576]; - while ((limit = input.read(buffer)) >= 0) { - writeChannel.write(ByteBuffer.wrap(buffer, 0, limit)); - } - }*/ - - //bucket.create(destination, input); return input.getMetrics(); } }); From 77a323258ba864b4517ee6ce80e252ab144719e7 Mon Sep 17 00:00:00 2001 From: Nicholas Blair Date: Sun, 28 Oct 2018 17:40:53 -0500 Subject: [PATCH 08/21] feat: parallel multipart upload for blob content --- .../gcloud/internal/GoogleCloudBlobStore.java | 25 ++- .../gcloud/internal/MultipartUploader.java | 205 ++++++++++++++++++ .../internal/GoogleCloudBlobStoreIT.groovy | 15 +- .../internal/GoogleCloudBlobStoreTest.groovy | 6 +- .../internal/MultipartUploaderIT.groovy | 135 ++++++++++++ 5 files changed, 377 insertions(+), 9 deletions(-) create mode 100644 src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java create mode 100644 src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy diff --git a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStore.java b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStore.java index 01258ab..3a8b14b 100644 --- a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStore.java +++ b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStore.java @@ -126,23 +126,26 @@ public class GoogleCloudBlobStore private MetricRegistry metricRegistry; + private final MultipartUploader multipartUploader; + private PeriodicJob quotaCheckingJob; - + @Inject public GoogleCloudBlobStore(final GoogleCloudStorageFactory storageFactory, final BlobIdLocationResolver blobIdLocationResolver, final GoogleCloudDatastoreFactory datastoreFactory, final PeriodicJobService periodicJobService, final DryRunPrefix dryRunPrefix, + final MultipartUploader multipartUploader, final MetricRegistry metricRegistry, final BlobStoreQuotaService quotaService, @Named("${nexus.blobstore.quota.warnIntervalSeconds:-60}") - final int quotaCheckInterval) - { + final int quotaCheckInterval) { super(blobIdLocationResolver, dryRunPrefix); this.storageFactory = checkNotNull(storageFactory); this.metricsStore = new ShardedCounterMetricsStore(blobIdLocationResolver, datastoreFactory, periodicJobService); this.datastoreFactory = 
datastoreFactory; + this.multipartUploader = multipartUploader; this.metricRegistry = metricRegistry; this.quotaCheckingJob = periodicJobService.schedule(createQuotaCheckJob(this, quotaService, log), quotaCheckInterval); @@ -163,7 +166,7 @@ protected void doStart() throws Exception { metadata.store(); } liveBlobs = CacheBuilder.newBuilder().weakValues().recordStats().build(from(GoogleCloudStorageBlob::new)); - + wrapWithGauge("liveBlobsCache.size", () -> liveBlobs.size()); wrapWithGauge("liveBlobsCache.hitCount", () -> liveBlobs.stats().hitCount()); wrapWithGauge("liveBlobsCache.missCount", () -> liveBlobs.stats().missCount()); @@ -195,7 +198,19 @@ protected Blob doCreate(final InputStream blobData, return createInternal(headers, destination -> { try (InputStream data = blobData) { MetricsInputStream input = new MetricsInputStream(data); - bucket.create(destination, input); + + multipartUploader.upload(storage, getConfiguredBucketName(), destination, input); + /* BlobInfo blobInfo = BlobInfo.newBuilder(getConfiguredBucketName(), destination).build(); + try (WriteChannel writeChannel = storage.writer(blobInfo)) { + int limit; + // 1024 vs 1048576 + byte[] buffer = new byte[1048576]; + while ((limit = input.read(buffer)) >= 0) { + writeChannel.write(ByteBuffer.wrap(buffer, 0, limit)); + } + }*/ + + //bucket.create(destination, input); return input.getMetrics(); } }, blobId); diff --git a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java new file mode 100644 index 0000000..149b3db --- /dev/null +++ b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java @@ -0,0 +1,205 @@ +package org.sonatype.nexus.blobstore.gcloud.internal; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import javax.inject.Inject; +import javax.inject.Named; + +import org.sonatype.nexus.blobstore.api.BlobStoreException; +import org.sonatype.nexus.common.stateguard.StateGuardLifecycleSupport; + +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.Storage.ComposeRequest; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.commons.io.IOUtils; + +/** + * Component that provides parallel multipart upload support for blob binary data (.bytes files). + */ +@Named +public class MultipartUploader + extends StateGuardLifecycleSupport +{ + + /** + * Use this property in 'nexus.properties' to control how large each multipart part is. Default is 5 MB. + * Smaller numbers increase the number of parallel workers used to upload a file. Match to your workload: + * if you are heavy in docker with large images, increase; if you are heavy in smaller components, decrease. 
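+   * For example (an illustrative value, not a recommendation): setting 'nexus.gcs.multipartupload.chunksize=10485760'
+   * in nexus.properties uses 10 MB chunks, keeping uploads below the compose limit for blobs up to roughly
+   * 320 MB (32 chunks x 10 MB).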
+ */ + public static final String CHUNK_SIZE_PROPERTY = "nexus.gcs.multipartupload.chunksize"; + + /** + * This is a hard limit on the number of components to a compose request enforced by Google Cloud Storage API. + */ + static final int COMPOSE_REQUEST_LIMIT = 32; + + /** + * While an invocation of {@link #upload(Storage, String, String, InputStream)} is in-flight, the individual + * chunks of the file will have names like 'destination.chunkPartNumber", like + * 'content/vol-01/chap-01/UUID.bytes.chunk1', 'content/vol-01/chap-01/UUID.bytes.chunk2', etc. + */ + private final String CHUNK_NAME_PART = ".chunk"; + + /** + * Used internally to count how many times we've hit the compose limit. + * Consider exposing this as a bean that can provide tuning feedback to deployers. + */ + private final AtomicLong composeLimitHit = new AtomicLong(0); + + private static final byte[] EMPTY = new byte[0]; + + private final ListeningExecutorService executorService = MoreExecutors.listeningDecorator( + Executors.newCachedThreadPool( + new ThreadFactoryBuilder().setNameFormat("nexus-google-cloud-storage-multipart-upload-%d") + .build())); + + private final int chunkSize; + + @Inject + public MultipartUploader(@Named("${"+CHUNK_SIZE_PROPERTY +":-5242880}") final int chunkSize) { + this.chunkSize = chunkSize; + } + + @Override + protected void doStop() throws Exception { + executorService.shutdown(); + log.info("sent signal to shutdown multipart upload queue, waiting up to 3 minutes for termination..."); + executorService.awaitTermination(3L, TimeUnit.MINUTES); + } + + /** + * @return the value for the {@link #CHUNK_SIZE_PROPERTY} + */ + public int getChunkSize() { + return chunkSize; + } + + /** + * @return the number of times {@link #upload(Storage, String, String, InputStream)} hit the multipart-compose limit + */ + public long getNumberOfTimesComposeLimitHit() { + return composeLimitHit.get(); + } + + /** + * @param storage an initialized {@link Storage} instance + * @param bucket the name of the bucket + * @param destination the the destination (relative to the bucket) + * @param contents the stream of data to store + * @return the successfully stored {@link Blob} + * @throws BlobStoreException if any part of the upload failed + */ + public Blob upload(final Storage storage, final String bucket, final String destination, final InputStream contents) { + log.debug("Starting multipart upload for destination {} in bucket {}", destination, bucket); + // collect parts as blobids in a list? 
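+    // note: nothing is added to 'parts' before it is handed to deferredCleanup in the catch block below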
+ List parts = new ArrayList<>(); + + try (InputStream current = contents) { + List> chunkFutures = new ArrayList<>(); + // MUST respect hard limit of 32 chunks per compose request + for (int partNumber = 1; partNumber <= COMPOSE_REQUEST_LIMIT; partNumber++) { + final byte[] chunk; + if (partNumber < COMPOSE_REQUEST_LIMIT) { + chunk = readChunk(current); + } + else { + log.debug("Upload for {} has hit Google Cloud Storage multipart-compose limits; " + + "consider increasing '{}' beyond current value of {}", destination, CHUNK_SIZE_PROPERTY, getChunkSize()); + // we've hit compose request limit read the rest of the stream + composeLimitHit.incrementAndGet(); + chunk = IOUtils.toByteArray(current); + } + if (chunk == EMPTY && partNumber > 1) { + break; + } + else { + final int chunkIndex = partNumber; + chunkFutures.add(executorService.submit(() -> { + log.debug("Uploading chunk {} for {} of {} bytes", chunkIndex, destination, chunk.length); + BlobInfo blobInfo = BlobInfo.newBuilder( + bucket, toChunkName(destination, chunkIndex)).build(); + return storage.create(blobInfo, chunk); + })); + } + } + + final int numberOfChunks = chunkFutures.size(); + CountDownLatch block = new CountDownLatch(1); + Futures.whenAllComplete(chunkFutures).run(() -> block.countDown() , MoreExecutors.directExecutor()); + // create list of all the chunks + List chunkBlobs = IntStream.rangeClosed(1, numberOfChunks) + .mapToObj(i -> toChunkName(destination, i)) + .collect(Collectors.toList()); + + // wait for all the futures to complete + log.debug("waiting for {} chunks to complete", chunkFutures.size()); + block.await(); + log.debug("chunk uploads completed, sending compose request"); + // finalize with compose request to coalesce the chunks + Blob finalBlob = storage.compose(ComposeRequest.of(bucket, chunkBlobs, destination)); + log.debug("Multipart upload of {} complete", destination); + + deferredCleanup(storage, bucket, chunkBlobs); + + return finalBlob; + } + catch(Exception e) { + deferredCleanup(storage, bucket, parts); + throw new BlobStoreException("Error uploading blob", e, null); + } + } + + private void deferredCleanup(final Storage storage, final String bucket, final List parts) { + executorService.submit(() -> { + parts.stream().forEach(part -> storage.delete(bucket, part)); + }); + } + + private String toChunkName(String destination, int chunkNumber) { + return destination + CHUNK_NAME_PART + chunkNumber; + } + + /** + * Read a chunk of the stream up to {@link #getChunkSize()} in length. 
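+   * The read loop continues until the buffer is full or end-of-stream is reached, so a short read from the
+   * underlying stream does not end the chunk early.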
+ * + * @param input + * @return the read data as a byte array + * @throws IOException + */ + private byte[] readChunk(final InputStream input) throws IOException { + byte[] buffer = new byte[chunkSize]; + int offset = 0; + int remain = chunkSize; + int bytesRead = 0; + + while (remain > 0 && bytesRead >= 0) { + bytesRead = input.read(buffer, offset, remain); + if (bytesRead > 0) { + offset += bytesRead; + remain -= bytesRead; + } + } + if (offset > 0) { + return Arrays.copyOfRange(buffer, 0, offset); + } + else { + return EMPTY; + } + } +} diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy index f729f32..3f68c72 100644 --- a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy @@ -90,6 +90,8 @@ class GoogleCloudBlobStoreIT BlobStoreUsageChecker usageChecker = Mock() + MultipartUploader uploader = new MultipartUploader(1024) + def setup() { quotaService = new BlobStoreQuotaServiceImpl([ (SpaceUsedQuota.ID): new SpaceUsedQuota() @@ -100,7 +102,7 @@ class GoogleCloudBlobStoreIT log.info("Integration test using bucket ${bucketName}") blobStore = new GoogleCloudBlobStore(storageFactory, blobIdLocationResolver, datastoreFactory, - periodicJobService, new DryRunPrefix("TEST "), metricRegistry, quotaService, 60) + periodicJobService, new DryRunPrefix("TEST "), uploader, metricRegistry, quotaService, 60) blobStore.init(config) blobStore.start() @@ -335,7 +337,7 @@ class GoogleCloudBlobStoreIT def bucket2 = "multi-tenancy-test-${uid}" def config2 = makeConfig("multi-tenant-test-${uid}", bucket2) def blobStore2 = new GoogleCloudBlobStore(storageFactory, blobIdLocationResolver, datastoreFactory, - periodicJobService, new DryRunPrefix("TEST "), metricRegistry, quotaService, 60) + periodicJobService, new DryRunPrefix("TEST "), uploader, metricRegistry, quotaService, 60) blobStore2.init(config2) blobStore2.start() @@ -416,4 +418,13 @@ class GoogleCloudBlobStoreIT storage.delete(bucket) log.info("bucket ${bucket} deleted") } + + def createFile(Storage storage, String path, long size) { + byte [] content = new byte[size] + new Random().nextBytes(content) + storage.create(BlobInfo.newBuilder(bucketName, path).build(), + content) + } + + } diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy index 0b2016e..340f4e2 100644 --- a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy @@ -60,6 +60,8 @@ class GoogleCloudBlobStoreTest Datastore datastore = Mock() + MultipartUploader uploader = new MultipartUploader(1024) + BlobStoreQuotaService quotaService = Mock() KeyFactory keyFactory = new KeyFactory("testing") @@ -69,8 +71,8 @@ class GoogleCloudBlobStoreTest (BlobStore.CREATED_BY_HEADER): 'admin' ] GoogleCloudBlobStore blobStore = new GoogleCloudBlobStore( - storageFactory, blobIdLocationResolver, datastoreFactory, periodicJobService, new DryRunPrefix("TEST "), - metricRegistry, quotaService, 60) + storageFactory, blobIdLocationResolver, metricsStore, datastoreFactory, new DryRunPrefix("TEST "), + uploader, metricRegistry, quotaService, 60) def config = new 
BlobStoreConfiguration() diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy new file mode 100644 index 0000000..8e46725 --- /dev/null +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy @@ -0,0 +1,135 @@ +package org.sonatype.nexus.blobstore.gcloud.internal + +import java.util.stream.StreamSupport + +import org.sonatype.nexus.blobstore.api.BlobStoreConfiguration + +import com.google.cloud.storage.Blob +import com.google.cloud.storage.Blob.BlobSourceOption +import com.google.cloud.storage.BucketInfo +import com.google.cloud.storage.Storage +import groovy.util.logging.Slf4j +import org.apache.commons.io.input.BoundedInputStream +import spock.lang.Specification + +@Slf4j +class MultipartUploaderIT + extends Specification +{ + + static final BlobStoreConfiguration config = new BlobStoreConfiguration() + + static final GoogleCloudStorageFactory storageFactory = new GoogleCloudStorageFactory() + + static String bucketName = "integration-test-${UUID.randomUUID().toString()}" + + static Storage storage + + def setupSpec() { + config.attributes = [ + 'google cloud storage': [ + bucket: bucketName, + credential_file: this.getClass().getResource('/gce-credentials.json').getFile() + ] + ] + + log.info("Integration test using bucket ${bucketName}") + storage = storageFactory.create(config) + storage.create(BucketInfo.of(bucketName)) + } + + def cleanupSpec() { + //Storage storage = new GoogleCloudStorageFactory().create(config) + log.debug("Tests complete, deleting files from ${bucketName}") + // must delete all the files within the bucket before we can delete the bucket + Iterator list = storage.list(bucketName, + Storage.BlobListOption.prefix("")).iterateAll() + .iterator() + + Iterable iterable = { _ -> list } + StreamSupport.stream(iterable.spliterator(), true) + .forEach({ b -> b.delete(BlobSourceOption.generationMatch()) }) + storage.delete(bucketName) + log.info("Integration test complete, bucket ${bucketName} deleted") + } + + def "control experiment"() { + given: + long expectedSize = (1048576 * 3) + 2 + MultipartUploader uploader = new MultipartUploader(1048576) + byte[] data = new byte[expectedSize] + new Random().nextBytes(data) + + when: + Blob blob = uploader.upload(storage, bucketName, 'vol-01/chap-01/control/test', new ByteArrayInputStream(data)) + + then: + blob.size == expectedSize + } + + def "zero byte file"() { + given: + long expectedSize = 0 + MultipartUploader uploader = new MultipartUploader(1024) + byte[] data = new byte[expectedSize] + new Random().nextBytes(data) + + when: + Blob blob = uploader.upload(storage, bucketName, 'vol-01/chap-01/control/zero_byte', new ByteArrayInputStream(data)) + + then: + blob.size == expectedSize + } + + def "hit compose limit slightly and still successful"() { + given: + long expectedSize = (1024 * MultipartUploader.COMPOSE_REQUEST_LIMIT) + 10 + MultipartUploader uploader = new MultipartUploader(1024) + byte[] data = new byte[expectedSize] + new Random().nextBytes(data) + + when: + Blob blob = uploader.upload(storage, bucketName, + 'vol-01/chap-01/composeLimitTest/small_miss', new ByteArrayInputStream(data)) + + then: + blob.size == expectedSize + uploader.numberOfTimesComposeLimitHit == 1L + } + + def "hit compose limit poorly tuned, still successful" () { + given: + long expectedSize = 1048576 + MultipartUploader uploader = new MultipartUploader(1024) + byte[] data = 
new byte[expectedSize] + new Random().nextBytes(data) + + when: + Blob blob = uploader.upload(storage, bucketName, + 'vol-01/chap-01/composeLimitTest/poor_tuning', new ByteArrayInputStream(data)) + + then: + blob.size == expectedSize + uploader.numberOfTimesComposeLimitHit == 1L + } + + def "upload 100 MB"() { + given: + long expectedSize = 1024 * 1024 * 100 + BoundedInputStream inputStream = new BoundedInputStream(new InputStream() { + private Random random = new Random() + @Override + int read() throws IOException { + return random.nextInt() + } + }, expectedSize) + // default value of 5 MB per chunk + MultipartUploader uploader = new MultipartUploader(1024 * 1024 * 5) + + when: + Blob blob = uploader.upload(storage, bucketName, + 'vol-01/chap-02/large/one_hundred_MB', inputStream) + then: + blob.size == expectedSize + } +} From f8fe68bf3c08826616132d0ecdc062e3a2efd51b Mon Sep 17 00:00:00 2001 From: Nicholas Blair Date: Tue, 30 Oct 2018 17:01:00 -0500 Subject: [PATCH 09/21] first chunk happens on the calling thread, 2nd+ in the executor --- .../gcloud/internal/MultipartUploader.java | 90 +++++++++++++------ .../internal/MultipartUploaderIT.groovy | 46 +++++++++- 2 files changed, 107 insertions(+), 29 deletions(-) diff --git a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java index 149b3db..7c7b364 100644 --- a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java +++ b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java @@ -1,3 +1,15 @@ +/* + * Sonatype Nexus (TM) Open Source Version + * Copyright (c) 2017-present Sonatype, Inc. + * All rights reserved. Includes the third-party code listed at http://links.sonatype.com/products/nexus/oss/attributions. + * + * This program and the accompanying materials are made available under the terms of the Eclipse Public License Version 1.0, + * which accompanies this distribution and is available at http://www.eclipse.org/legal/epl-v10.html. + * + * Sonatype Nexus (TM) Professional Version is available from Sonatype, Inc. "Sonatype" and "Sonatype Nexus" are trademarks + * of Sonatype, Inc. Apache Maven is a trademark of the Apache Software Foundation. M2eclipse is a trademark of the + * Eclipse Foundation. All other trademarks are the property of their respective owners. + */ package org.sonatype.nexus.blobstore.gcloud.internal; import java.io.IOException; @@ -5,12 +17,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import javax.inject.Inject; import javax.inject.Named; @@ -107,9 +118,10 @@ public long getNumberOfTimesComposeLimitHit() { */ public Blob upload(final Storage storage, final String bucket, final String destination, final InputStream contents) { log.debug("Starting multipart upload for destination {} in bucket {}", destination, bucket); - // collect parts as blobids in a list? 
- List parts = new ArrayList<>(); + // this must represent the bucket-relative paths to the chunks, in order of composition + List chunkNames = new ArrayList<>(); + Optional singleChunk = Optional.empty(); try (InputStream current = contents) { List> chunkFutures = new ArrayList<>(); // MUST respect hard limit of 32 chunks per compose request @@ -125,43 +137,56 @@ public Blob upload(final Storage storage, final String bucket, final String dest composeLimitHit.incrementAndGet(); chunk = IOUtils.toByteArray(current); } + if (chunk == EMPTY && partNumber > 1) { break; } - else { + + final String chunkName = toChunkName(destination, partNumber); + chunkNames.add(chunkName); + if (partNumber == 1) { + // upload the first part on the current thread + BlobInfo blobInfo = BlobInfo.newBuilder(bucket, chunkName).build(); + Blob blob = storage.create(blobInfo, chunk); + singleChunk = Optional.of(blob); + } else { + singleChunk = Optional.empty(); + // 2nd through N chunks will happen off current thread in parallel final int chunkIndex = partNumber; chunkFutures.add(executorService.submit(() -> { log.debug("Uploading chunk {} for {} of {} bytes", chunkIndex, destination, chunk.length); BlobInfo blobInfo = BlobInfo.newBuilder( - bucket, toChunkName(destination, chunkIndex)).build(); + bucket, chunkName).build(); return storage.create(blobInfo, chunk); })); } } - final int numberOfChunks = chunkFutures.size(); - CountDownLatch block = new CountDownLatch(1); - Futures.whenAllComplete(chunkFutures).run(() -> block.countDown() , MoreExecutors.directExecutor()); - // create list of all the chunks - List chunkBlobs = IntStream.rangeClosed(1, numberOfChunks) - .mapToObj(i -> toChunkName(destination, i)) - .collect(Collectors.toList()); - - // wait for all the futures to complete - log.debug("waiting for {} chunks to complete", chunkFutures.size()); - block.await(); - log.debug("chunk uploads completed, sending compose request"); - // finalize with compose request to coalesce the chunks - Blob finalBlob = storage.compose(ComposeRequest.of(bucket, chunkBlobs, destination)); - log.debug("Multipart upload of {} complete", destination); - - deferredCleanup(storage, bucket, chunkBlobs); - - return finalBlob; + // return the single result if it exists; otherwise finalize the parallel multipart workers + return singleChunk.orElseGet(() -> { + CountDownLatch block = new CountDownLatch(1); + Futures.whenAllComplete(chunkFutures).run(() -> block.countDown() , MoreExecutors.directExecutor()); + // wait for all the futures to complete + log.debug("waiting for {} remaining chunks to complete", chunkFutures.size()); + try { + block.await(); + } + catch (InterruptedException e) { + log.error("caught InterruptedException waiting for multipart upload to complete on {}", destination); + throw new RuntimeException(e); + } + log.debug("chunk uploads completed, sending compose request"); + + // finalize with compose request to coalesce the chunks + Blob finalBlob = storage.compose(ComposeRequest.of(bucket, chunkNames, destination)); + log.debug("Multipart upload of {} complete", destination); + return finalBlob; + }); } catch(Exception e) { - deferredCleanup(storage, bucket, parts); throw new BlobStoreException("Error uploading blob", e, null); + } finally { + deferredCleanup(storage, bucket, chunkNames); } } @@ -171,14 +196,25 @@ private void deferredCleanup(final Storage storage, final String bucket, final L }); } + /** + * The name of the first chunk should match the desired end destination. 
+ * For any chunk index 2 or greater, this method will return the destination + the chunk name suffix. + * + * @param destination + * @param chunkNumber + * @return the name to store this chunk + */ private String toChunkName(String destination, int chunkNumber) { + if (chunkNumber == 1) { + return destination; + } return destination + CHUNK_NAME_PART + chunkNumber; } /** * Read a chunk of the stream up to {@link #getChunkSize()} in length. * - * @param input + * @param input the stream to read * @return the read data as a byte array * @throws IOException */ diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy index 8e46725..e12f532 100644 --- a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy @@ -1,3 +1,15 @@ +/* + * Sonatype Nexus (TM) Open Source Version + * Copyright (c) 2017-present Sonatype, Inc. + * All rights reserved. Includes the third-party code listed at http://links.sonatype.com/products/nexus/oss/attributions. + * + * This program and the accompanying materials are made available under the terms of the Eclipse Public License Version 1.0, + * which accompanies this distribution and is available at http://www.eclipse.org/legal/epl-v10.html. + * + * Sonatype Nexus (TM) Professional Version is available from Sonatype, Inc. "Sonatype" and "Sonatype Nexus" are trademarks + * of Sonatype, Inc. Apache Maven is a trademark of the Apache Software Foundation. M2eclipse is a trademark of the + * Eclipse Foundation. All other trademarks are the property of their respective owners. + */ package org.sonatype.nexus.blobstore.gcloud.internal import java.util.stream.StreamSupport @@ -53,7 +65,7 @@ class MultipartUploaderIT log.info("Integration test complete, bucket ${bucketName} deleted") } - def "control experiment"() { + def "simple multipart"() { given: long expectedSize = (1048576 * 3) + 2 MultipartUploader uploader = new MultipartUploader(1048576) @@ -61,7 +73,37 @@ class MultipartUploaderIT new Random().nextBytes(data) when: - Blob blob = uploader.upload(storage, bucketName, 'vol-01/chap-01/control/test', new ByteArrayInputStream(data)) + Blob blob = uploader.upload(storage, bucketName, 'vol-01/chap-01/control/multi_part', new ByteArrayInputStream(data)) + + then: + blob.size == expectedSize + } + + def "confirm parts composed in order"() { + given: + // 5 each of abcdefg + final String content = "aaaaabbbbbcccccdddddeeeeefffffggggg" + byte[] data = content.bytes + MultipartUploader uploader = new MultipartUploader(5) + + when: + Blob blob = uploader.upload(storage, bucketName, 'vol-01/chap-01/control/in_order', new ByteArrayInputStream(data)) + + then: + blob.size == data.length + Blob readback = storage.get(blob.blobId) + readback.getContent() == content.bytes + } + + def "single part"() { + given: + long expectedSize = 1048575 + MultipartUploader uploader = new MultipartUploader(1048576) + byte[] data = new byte[expectedSize] + new Random().nextBytes(data) + + when: + Blob blob = uploader.upload(storage, bucketName, 'vol-01/chap-01/control/single_part', new ByteArrayInputStream(data)) then: blob.size == expectedSize From 492fb3648bf46a1406664fd806ada69cf93e645b Mon Sep 17 00:00:00 2001 From: Nicholas Blair Date: Tue, 30 Oct 2018 17:21:18 -0500 Subject: [PATCH 10/21] add missing mock behavior for 'store a blob' test --- 
.../blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy | 1 + 1 file changed, 1 insertion(+) diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy index 340f4e2..224f283 100644 --- a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy @@ -146,6 +146,7 @@ class GoogleCloudBlobStoreTest blobStore.init(config) blobStore.doStart() + storage.create(_, _) >> mockGoogleObject(tempFileBytes) BlobId id = new BlobId(UUID.randomUUID().toString()) String resolved = blobIdLocationResolver.getLocation(id) From 886c09352267f5a1ebe759619325236e7b8a942f Mon Sep 17 00:00:00 2001 From: Nicholas Blair Date: Thu, 15 Nov 2018 13:00:22 -0600 Subject: [PATCH 11/21] wip ADR document --- design/multipart_parallel_upload.md | 36 +++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) create mode 100644 design/multipart_parallel_upload.md diff --git a/design/multipart_parallel_upload.md b/design/multipart_parallel_upload.md new file mode 100644 index 0000000..bec7273 --- /dev/null +++ b/design/multipart_parallel_upload.md @@ -0,0 +1,36 @@ +# Multipart Parallel Upload Design + +This document is intended to capture the context around the component within this blobstore that provides +Parallel Multipart Upload: + +[MultipartUploader.java](../src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java) + +## Context and Idea + +gsutil concept: [Parallel Composite Uploads](https://cloud.google.com/storage/docs/gsutil/commands/cp#parallel-composite-uploads) + +The google-cloud-storage library for Java this blobstore is built with does not have baked in mechanism for parallel composite uploads. +The storage#create function is synchronous. Fine for some workloads + +It does support however, the Compose request in the API. +This module then implements this client using the Storage request and Compose request functions. + +## Implementation + +* Don't have content length in the API, have a stream. +* GCS compose method has a hard limit of 32. Since we don't have the length, we can't +split into 32 equal parts. We can do 31 chunks of a chunkSize parameter, then 1 chunk of the rest. +* Exposes the "composeLimitHit" field +* For files smaller than the chunk size, we don't pay the cost of shipping the upload request to a thread. + * The first chunk is written at the expected destination path. + * If we've read chunkSize, and there is still more data on the stream, schedule the 2nd through final chunks off thread +* A blob write still waits until completion of parallel uploads. This is an important characteristic of all BlobStores; +Nexus Repository Manager expects consistency and not deferred, eventual consistency. + +## Tuning + +* debug logging for `org.sonatype.nexus.blobstore.gcloud.internal.MultipartUploader` +* 5 MB default and 32 chunk compose limit means optimal threads utilization will be in place for files between 10 MB +and 160 MB in size. +* When to increase? When you have fewer CPUs, and larger files. +* To effectively disable parallel uploads, you could put max int in. 
Will upload on the request thread From cc4b7092ba3e713e3939f4c995b2d30d0d2f1143 Mon Sep 17 00:00:00 2001 From: Nicholas Blair Date: Fri, 4 Jan 2019 15:26:32 -0600 Subject: [PATCH 12/21] fix: no longer including first chunk in names passed to cleanup Previously, an MPU would succeed in writing all the chunks and composing them into the desired destination. The chunkNames argument, however, included the final destination name (as it is used for the "first" chunk); when deferredCleanup finally completed, the destination file would also be deleted. This corrects the behavior by excluding the first chunk name in deferredCleanup via a filter on the argument. This also includes a fix for a possible OutOfMemoryError condition being raised with blobs that have a really large final chunk. We avoid the issue by avoiding a call to readChunk (and an Arrays.copyOfRange on the chunk) for the final chunk. Instead, we pass the remaining InputStream to the storage client, with the unfortunate side effect of calling a deprecated method. Per the docs, the storage#create method is deprecated because it is not retryable. That means we may have failed uploads where the first 31 chunks succeed, and the last fails without retry; that highlights the importance of the tuning option and the log messaging calling out the issue. Also includes: * an integration test to mimic the storage facet's write-temp-and-move behavior * new tests on the uploader with larger blobs * increases the maxConnectionsPerRoute to equal maxTotalConnections --- .../internal/AbstractGoogleClientFactory.java | 2 +- .../gcloud/internal/MultipartUploader.java | 48 ++++++++++++++++--- .../internal/GoogleCloudBlobStoreIT.groovy | 4 +- .../internal/MultipartUploaderIT.groovy | 40 ++++++++++++++++ 4 files changed, 84 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/AbstractGoogleClientFactory.java b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/AbstractGoogleClientFactory.java index 8844af0..d62aca1 100644 --- a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/AbstractGoogleClientFactory.java +++ b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/AbstractGoogleClientFactory.java @@ -76,7 +76,7 @@ HttpParams newDefaultHttpParams() { HttpConnectionParams.setStaleCheckingEnabled(params, true); HttpConnectionParams.setSocketBufferSize(params, 8192); ConnManagerParams.setMaxTotalConnections(params, 200); - ConnManagerParams.setMaxConnectionsPerRoute(params, new ConnPerRouteBean(20)); + ConnManagerParams.setMaxConnectionsPerRoute(params, new ConnPerRouteBean(200)); return params; } diff --git a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java index 7c7b364..153379a 100644 --- a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java +++ b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java @@ -12,6 +12,7 @@ */ package org.sonatype.nexus.blobstore.gcloud.internal; +import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; @@ -38,7 +39,6 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import org.apache.commons.io.IOUtils; /** * Component that provides parallel multipart upload support for blob binary data
(.bytes files). @@ -75,6 +75,8 @@ public class MultipartUploader private static final byte[] EMPTY = new byte[0]; + private static final InputStream EMPTY_STREAM = new ByteArrayInputStream(EMPTY); + private final ListeningExecutorService executorService = MoreExecutors.listeningDecorator( Executors.newCachedThreadPool( new ThreadFactoryBuilder().setNameFormat("nexus-google-cloud-storage-multipart-upload-%d") @@ -135,7 +137,17 @@ public Blob upload(final Storage storage, final String bucket, final String dest "consider increasing '{}' beyond current value of {}", destination, CHUNK_SIZE_PROPERTY, getChunkSize()); // we've hit compose request limit read the rest of the stream composeLimitHit.incrementAndGet(); - chunk = IOUtils.toByteArray(current); + chunk = EMPTY; + + final String finalChunkName = toChunkName(destination, COMPOSE_REQUEST_LIMIT); + chunkNames.add(finalChunkName); + chunkFutures.add(executorService.submit(() -> { + log.debug("Uploading final chunk {} for {} of unknown remaining bytes", COMPOSE_REQUEST_LIMIT, destination); + BlobInfo blobInfo = BlobInfo.newBuilder( + bucket, finalChunkName).build(); + // read the rest of the current stream + return storage.create(blobInfo, current); + })); } if (chunk == EMPTY && partNumber > 1) { @@ -144,6 +156,7 @@ public Blob upload(final Storage storage, final String bucket, final String dest final String chunkName = toChunkName(destination, partNumber); chunkNames.add(chunkName); + if (partNumber == 1) { // upload the first part on the current thread BlobInfo blobInfo = BlobInfo.newBuilder(bucket, chunkName).build(); @@ -186,14 +199,16 @@ public Blob upload(final Storage storage, final String bucket, final String dest catch(Exception e) { throw new BlobStoreException("Error uploading blob", e, null); } finally { + // remove any .chunkN files off-thread + // make sure not to delete the first chunk (which has the desired destination name with no suffix) deferredCleanup(storage, bucket, chunkNames); } } - private void deferredCleanup(final Storage storage, final String bucket, final List parts) { - executorService.submit(() -> { - parts.stream().forEach(part -> storage.delete(bucket, part)); - }); + private void deferredCleanup(final Storage storage, final String bucket, final List chunkNames) { + executorService.submit(() -> chunkNames.stream() + .filter(part -> part.contains(CHUNK_NAME_PART)) + .forEach(chunk -> storage.delete(bucket, chunk))); } /** @@ -238,4 +253,25 @@ private byte[] readChunk(final InputStream input) throws IOException { return EMPTY; } } + + private InputStream streamChunk(final InputStream input) throws IOException { + byte[] buffer = new byte[chunkSize]; + int offset = 0; + int remain = chunkSize; + int bytesRead = 0; + + while (remain > 0 && bytesRead >= 0) { + bytesRead = input.read(buffer, offset, remain); + if (bytesRead > 0) { + offset += bytesRead; + remain -= bytesRead; + } + } + if (offset > 0) { + return new ByteArrayInputStream(buffer, 0, offset); + } + else { + return EMPTY_STREAM; + } + } } diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy index 3f68c72..8d42224 100644 --- a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy @@ -218,7 +218,7 @@ class GoogleCloudBlobStoreIT (BlobStore.CREATED_BY_HEADER): 'someuser' ] ) assert blob != 
null // sit for at least the time on our keep alives, so that any held connections close - log.info("waiting for ${(KEEP_ALIVE_DURATION + 1000L) / 1000L} seconds any stale connections to close") + log.info("waiting for ${(KEEP_ALIVE_DURATION + 1000L) / 1000L} seconds so any stale connections close") sleep(KEEP_ALIVE_DURATION + 1000L) Blob blob2 = blobStore.create(new ByteArrayInputStream('hello'.getBytes()), @@ -425,6 +425,4 @@ class GoogleCloudBlobStoreIT storage.create(BlobInfo.newBuilder(bucketName, path).build(), content) } - - } diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy index e12f532..37ce8f6 100644 --- a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy @@ -77,6 +77,7 @@ class MultipartUploaderIT then: blob.size == expectedSize + storage.get(bucketName, 'vol-01/chap-01/control/multi_part').getContent() == data } def "confirm parts composed in order"() { @@ -107,6 +108,7 @@ class MultipartUploaderIT then: blob.size == expectedSize + storage.get(bucketName, 'vol-01/chap-01/control/single_part').getContent() == data } def "zero byte file"() { @@ -121,6 +123,7 @@ class MultipartUploaderIT then: blob.size == expectedSize + storage.get(bucketName, 'vol-01/chap-01/control/zero_byte').getContent() == data } def "hit compose limit slightly and still successful"() { @@ -137,6 +140,7 @@ class MultipartUploaderIT then: blob.size == expectedSize uploader.numberOfTimesComposeLimitHit == 1L + storage.get(bucketName, 'vol-01/chap-01/composeLimitTest/small_miss').getContent() == data } def "hit compose limit poorly tuned, still successful" () { @@ -153,8 +157,12 @@ class MultipartUploaderIT then: blob.size == expectedSize uploader.numberOfTimesComposeLimitHit == 1L + storage.get(bucketName, 'vol-01/chap-01/composeLimitTest/poor_tuning').getContent() == data } + /** + * Larger upload that still fits ideally within our default tuning. 100 MB will result in 20 5 MB chunks. + */ def "upload 100 MB"() { given: long expectedSize = 1024 * 1024 * 100 @@ -173,5 +181,37 @@ class MultipartUploaderIT 'vol-01/chap-02/large/one_hundred_MB', inputStream) then: blob.size == expectedSize + storage.get(bucketName, 'vol-01/chap-02/large/one_hundred_MB').size == expectedSize + } + + /** + * The difference in this test beyond the 'upload 100 MB' test is that the upload will: + * + * a) result in incrementing {@link MultipartUploader#getNumberOfTimesComposeLimitHit()} and + * b) the last chunk will be significantly larger than the preceding 31. + * + * This represents a situation where the customer may need to adjust their chunk size upward. The larger final chunk + * may also elicit runtime pressure on memory. 
+ */ + def "upload 300 MB"() { + given: + long expectedSize = 1024 * 1024 * 300 + BoundedInputStream inputStream = new BoundedInputStream(new InputStream() { + private Random random = new Random() + @Override + int read() throws IOException { + return random.nextInt() + } + }, expectedSize) + // with value of 5 MB per chunk, we'll upload 31 5 MB chunks and 1 145 MB chunk + MultipartUploader uploader = new MultipartUploader(1024 * 1024 * 5) + + when: + Blob blob = uploader.upload(storage, bucketName, + 'vol-01/chap-02/large/three_hundred_MB', inputStream) + then: + blob.size == expectedSize + uploader.getNumberOfTimesComposeLimitHit() == 1L + storage.get(bucketName, 'vol-01/chap-02/large/three_hundred_MB').size == expectedSize } } From f897b70dff6b68c23cb2377c5cdf3a8532d478f5 Mon Sep 17 00:00:00 2001 From: Nicholas Blair Date: Thu, 10 Jan 2019 09:48:44 -0600 Subject: [PATCH 13/21] remove unused method --- .../gcloud/internal/MultipartUploader.java | 21 ------------------- 1 file changed, 21 deletions(-) diff --git a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java index 153379a..d40f10d 100644 --- a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java +++ b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java @@ -253,25 +253,4 @@ private byte[] readChunk(final InputStream input) throws IOException { return EMPTY; } } - - private InputStream streamChunk(final InputStream input) throws IOException { - byte[] buffer = new byte[chunkSize]; - int offset = 0; - int remain = chunkSize; - int bytesRead = 0; - - while (remain > 0 && bytesRead >= 0) { - bytesRead = input.read(buffer, offset, remain); - if (bytesRead > 0) { - offset += bytesRead; - remain -= bytesRead; - } - } - if (offset > 0) { - return new ByteArrayInputStream(buffer, 0, offset); - } - else { - return EMPTY_STREAM; - } - } } From ba48231d5e5c1485cde3fd33791620a100ee0586 Mon Sep 17 00:00:00 2001 From: Nicholas Blair Date: Thu, 10 Jan 2019 16:12:03 -0600 Subject: [PATCH 14/21] remove unused code --- .../gcloud/internal/GoogleCloudBlobStore.java | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStore.java b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStore.java index 3a8b14b..19bec8f 100644 --- a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStore.java +++ b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStore.java @@ -200,17 +200,6 @@ protected Blob doCreate(final InputStream blobData, MetricsInputStream input = new MetricsInputStream(data); multipartUploader.upload(storage, getConfiguredBucketName(), destination, input); - /* BlobInfo blobInfo = BlobInfo.newBuilder(getConfiguredBucketName(), destination).build(); - try (WriteChannel writeChannel = storage.writer(blobInfo)) { - int limit; - // 1024 vs 1048576 - byte[] buffer = new byte[1048576]; - while ((limit = input.read(buffer)) >= 0) { - writeChannel.write(ByteBuffer.wrap(buffer, 0, limit)); - } - }*/ - - //bucket.create(destination, input); return input.getMetrics(); } }, blobId); From 4869f24793d096a1d65a17e280837334820fc45a Mon Sep 17 00:00:00 2001 From: Nicholas Blair Date: Wed, 9 Oct 2019 08:56:49 -0500 Subject: [PATCH 15/21] wip --- .../nexus/blobstore/gcloud/internal/MultipartUploader.java | 4 ++++ 1 file changed, 4 insertions(+) 
diff --git a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java index d40f10d..def4259 100644 --- a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java +++ b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java @@ -15,6 +15,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.Writer; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -30,6 +31,7 @@ import org.sonatype.nexus.blobstore.api.BlobStoreException; import org.sonatype.nexus.common.stateguard.StateGuardLifecycleSupport; +import com.google.cloud.WriteChannel; import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Storage; @@ -133,6 +135,8 @@ public Blob upload(final Storage storage, final String bucket, final String dest chunk = readChunk(current); } else { + // TODO trigger compose for preceding 32 chunks instead (off-thread) + // TODO start writing as next chunk of chunk size log.debug("Upload for {} has hit Google Cloud Storage multipart-compose limits; " + "consider increasing '{}' beyond current value of {}", destination, CHUNK_SIZE_PROPERTY, getChunkSize()); // we've hit compose request limit read the rest of the stream From a1e270b48c5a53300716fdcfe044c7c5c56a0c19 Mon Sep 17 00:00:00 2001 From: Nicholas Blair Date: Thu, 12 Dec 2019 20:51:47 -0600 Subject: [PATCH 16/21] disable GZIP compression on storage writes Recent profiling sessions discovered that the google-cloud-storage java library gzip compresses streams on write to the bucket. Since the overwhelming majority of content we write through this plugin is already compressed, this has no benefit and results in high CPU utilization and low network throughput. By disabling gzip compression on the fixed-size chunk writes, we observe a 30-50% increase in network throughput. The edge case that this commit does not address is the final chunk; since we don't have the size and we can't chunk it, we have to call the deprecated storage write method that takes an InputStream. This variant has no option available to disable gzip compression. 
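For illustration only (not part of the patch): a minimal sketch of the two write paths described above, using the google-cloud-storage Storage API already present in this plugin. The class and method names below are hypothetical; only the Storage#create overloads and BlobTargetOption.disableGzipContent() come from the patch itself.

import java.io.InputStream;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobTargetOption;

class GzipOptionSketch {
  // Fixed-size chunks are held as byte[]; this create overload accepts
  // BlobTargetOption.disableGzipContent(), skipping recompression of already-compressed content.
  static Blob uploadChunk(Storage storage, String bucket, String name, byte[] chunk) {
    BlobInfo info = BlobInfo.newBuilder(bucket, name).build();
    return storage.create(info, chunk, BlobTargetOption.disableGzipContent());
  }

  // The final chunk has unknown length, so the remaining stream is handed to the deprecated
  // InputStream overload, which (as noted above) offers no way to disable gzip compression.
  @SuppressWarnings("deprecation")
  static Blob uploadFinalChunk(Storage storage, String bucket, String name, InputStream rest) {
    BlobInfo info = BlobInfo.newBuilder(bucket, name).build();
    return storage.create(info, rest);
  }
}

This is why, in the diff that follows, the fixed-size chunk writes gain a third argument while the final-chunk path keeps the two-argument call.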
--- .../blobstore/gcloud/internal/MultipartUploader.java | 10 +++++----- .../gcloud/internal/GoogleCloudBlobStoreIT.groovy | 11 +++++++---- .../gcloud/internal/GoogleCloudBlobStoreTest.groovy | 2 +- .../gcloud/internal/MultipartUploaderIT.groovy | 10 +++++----- 4 files changed, 18 insertions(+), 15 deletions(-) diff --git a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java index d40f10d..36b5f7c 100644 --- a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java +++ b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java @@ -12,7 +12,6 @@ */ package org.sonatype.nexus.blobstore.gcloud.internal; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; @@ -33,6 +32,7 @@ import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Storage; +import com.google.cloud.storage.Storage.BlobTargetOption; import com.google.cloud.storage.Storage.ComposeRequest; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -75,8 +75,6 @@ public class MultipartUploader private static final byte[] EMPTY = new byte[0]; - private static final InputStream EMPTY_STREAM = new ByteArrayInputStream(EMPTY); - private final ListeningExecutorService executorService = MoreExecutors.listeningDecorator( Executors.newCachedThreadPool( new ThreadFactoryBuilder().setNameFormat("nexus-google-cloud-storage-multipart-upload-%d") @@ -146,6 +144,8 @@ public Blob upload(final Storage storage, final String bucket, final String dest BlobInfo blobInfo = BlobInfo.newBuilder( bucket, finalChunkName).build(); // read the rest of the current stream + // downside here is that since we don't know the stream size, we can't chunk it. 
+ // the deprecated create method here does not allow us to disable GZIP compression on these PUTs return storage.create(blobInfo, current); })); } @@ -160,7 +160,7 @@ public Blob upload(final Storage storage, final String bucket, final String dest if (partNumber == 1) { // upload the first part on the current thread BlobInfo blobInfo = BlobInfo.newBuilder(bucket, chunkName).build(); - Blob blob = storage.create(blobInfo, chunk); + Blob blob = storage.create(blobInfo, chunk, BlobTargetOption.disableGzipContent()); singleChunk = Optional.of(blob); } else { singleChunk = Optional.empty(); @@ -170,7 +170,7 @@ public Blob upload(final Storage storage, final String bucket, final String dest log.debug("Uploading chunk {} for {} of {} bytes", chunkIndex, destination, chunk.length); BlobInfo blobInfo = BlobInfo.newBuilder( bucket, chunkName).build(); - return storage.create(blobInfo, chunk); + return storage.create(blobInfo, chunk, BlobTargetOption.disableGzipContent()); })); } } diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy index daa0b38..4e2a152 100644 --- a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy @@ -93,7 +93,10 @@ class GoogleCloudBlobStoreIT ShardedCounterMetricsStore metricsStore - MultipartUploader uploader = new MultipartUploader(1024) + // chunkSize = 5 MB + // the net effect of this is all tests except for "create large file" will be single thread + // create large file will end up using max chunks + MultipartUploader uploader = new MultipartUploader(5242880) def setup() { quotaService = new BlobStoreQuotaServiceImpl([ @@ -274,7 +277,7 @@ class GoogleCloudBlobStoreIT * On my workstation a few hundred miles from the GCP region with a typical consumer grade ISP (limited to 6-7 Mbps * upload), this test completes successfully in around 10 minutes (with the 500 MB file generated by the dd command). 
*/ - @Ignore + //@Ignore def "create large file" () { given: def url = getClass().getResource('/large_file') @@ -444,8 +447,8 @@ class GoogleCloudBlobStoreIT def config2 = makeConfig("multi-tenant-test-${uid}", bucket2) ShardedCounterMetricsStore metricsStore2 = new ShardedCounterMetricsStore(blobIdLocationResolver, datastoreFactory, periodicJobService) - def blobStore2 = new GoogleCloudBlobStore(storageFactory, blobIdLocationResolver, datastoreFactory, - periodicJobService, new DryRunPrefix("TEST "), uploader, metricRegistry, quotaService, 60) + def blobStore2 = new GoogleCloudBlobStore(storageFactory, blobIdLocationResolver, periodicJobService, metricsStore2, + datastoreFactory, new DryRunPrefix("TEST "), uploader, metricRegistry, quotaService, 60) blobStore2.init(config2) blobStore2.start() metricsStore2.start() diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy index 8d3e78e..9d8a94a 100644 --- a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy @@ -152,7 +152,7 @@ class GoogleCloudBlobStoreTest blobStore.init(config) blobStore.doStart() - storage.create(_, _) >> mockGoogleObject(tempFileBytes) + storage.create(_, _, _) >> mockGoogleObject(tempFileBytes) BlobId id = new BlobId(UUID.randomUUID().toString()) String resolved = blobIdLocationResolver.getLocation(id) diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy index 37ce8f6..320c617 100644 --- a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy @@ -193,9 +193,9 @@ class MultipartUploaderIT * This represents a situation where the customer may need to adjust their chunk size upward. The larger final chunk * may also elicit runtime pressure on memory. 
*/ - def "upload 300 MB"() { + def "upload 200 MB"() { given: - long expectedSize = 1024 * 1024 * 300 + long expectedSize = 1024 * 1024 * 200 BoundedInputStream inputStream = new BoundedInputStream(new InputStream() { private Random random = new Random() @Override @@ -203,15 +203,15 @@ class MultipartUploaderIT return random.nextInt() } }, expectedSize) - // with value of 5 MB per chunk, we'll upload 31 5 MB chunks and 1 145 MB chunk + // with value of 5 MB per chunk, we'll upload 31 5 MB chunks and 1 45 MB chunk MultipartUploader uploader = new MultipartUploader(1024 * 1024 * 5) when: Blob blob = uploader.upload(storage, bucketName, - 'vol-01/chap-02/large/three_hundred_MB', inputStream) + 'vol-01/chap-02/large/two_hundred_MB', inputStream) then: blob.size == expectedSize uploader.getNumberOfTimesComposeLimitHit() == 1L - storage.get(bucketName, 'vol-01/chap-02/large/three_hundred_MB').size == expectedSize + storage.get(bucketName, 'vol-01/chap-02/large/two_hundred_MB').size == expectedSize } } From e3c046fd6535a901b127d4d17364930d99335a68 Mon Sep 17 00:00:00 2001 From: Nicholas Blair Date: Fri, 13 Dec 2019 15:02:04 -0600 Subject: [PATCH 17/21] cleanup formatting --- .../blobstore/gcloud/internal/GoogleCloudBlobStore.java | 9 +++++---- .../blobstore/gcloud/internal/MultipartUploader.java | 2 +- .../blobstore/gcloud/internal/MultipartUploaderIT.groovy | 4 +--- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStore.java b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStore.java index 4d84418..c36769a 100644 --- a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStore.java +++ b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStore.java @@ -234,7 +234,7 @@ public Blob copy(final BlobId blobId, final Map headers) { return createInternal(headers, destination -> { sourceBlob.getBlob().copyTo(getConfiguredBucketName(), destination); - BlobMetrics metrics = get(blobId).getMetrics(); + BlobMetrics metrics = sourceBlob.getMetrics(); return new StreamMetrics(metrics.getContentSize(), metrics.getSha1Hash()); }, null); } @@ -283,7 +283,6 @@ public Blob get(final BlobId blobId, final boolean includeDeleted) { } log.debug("Accessing blob {}", blobId); - return blob; } @@ -527,7 +526,8 @@ public boolean isWritable() { List results = storage.testIamPermissions(getConfiguredBucketName(), Arrays.asList("storage.objects.create", "storage.objects.delete")); return !results.contains(false); - } catch (StorageException e) { + } + catch (StorageException e) { throw new BlobStoreException("failed to retrive User ACL for " + getConfiguredBucketName(), e, null); } } @@ -594,7 +594,8 @@ void flushMetricsStore() { private void deleteNonExplosively(final String contentPath) { try { storage.delete(getConfiguredBucketName(), contentPath); - } catch (Exception e) { + } + catch (Exception e) { log.warn("caught exception attempting to delete during cleanup", e); } } diff --git a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java index 36b5f7c..4325c6a 100644 --- a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java +++ b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java @@ -131,7 +131,7 @@ public Blob upload(final Storage storage, final String bucket, final String dest chunk = 
readChunk(current); } else { - log.debug("Upload for {} has hit Google Cloud Storage multipart-compose limits; " + + log.info("Upload for {} has hit Google Cloud Storage multipart-compose limits; " + "consider increasing '{}' beyond current value of {}", destination, CHUNK_SIZE_PROPERTY, getChunkSize()); // we've hit compose request limit read the rest of the stream composeLimitHit.incrementAndGet(); diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy index 320c617..cc3bc5b 100644 --- a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy @@ -51,7 +51,6 @@ class MultipartUploaderIT } def cleanupSpec() { - //Storage storage = new GoogleCloudStorageFactory().create(config) log.debug("Tests complete, deleting files from ${bucketName}") // must delete all the files within the bucket before we can delete the bucket Iterator list = storage.list(bucketName, @@ -190,8 +189,7 @@ class MultipartUploaderIT * a) result in incrementing {@link MultipartUploader#getNumberOfTimesComposeLimitHit()} and * b) the last chunk will be significantly larger than the preceding 31. * - * This represents a situation where the customer may need to adjust their chunk size upward. The larger final chunk - * may also elicit runtime pressure on memory. + * This represents a situation where the customer may need to adjust their chunk size upward. */ def "upload 200 MB"() { given: From f17a17b718d04d31e77b1edfcd8999a7da5857e3 Mon Sep 17 00:00:00 2001 From: Nicholas Blair Date: Fri, 13 Dec 2019 15:17:11 -0600 Subject: [PATCH 18/21] re-ignore large file test --- .../blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy index 4e2a152..297d870 100644 --- a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy @@ -277,7 +277,7 @@ class GoogleCloudBlobStoreIT * On my workstation a few hundred miles from the GCP region with a typical consumer grade ISP (limited to 6-7 Mbps * upload), this test completes successfully in around 10 minutes (with the 500 MB file generated by the dd command). */ - //@Ignore + @Ignore def "create large file" () { given: def url = getClass().getResource('/large_file') From a41e736c0079aa291b16941d7e21b9522453a896 Mon Sep 17 00:00:00 2001 From: Nicholas Blair Date: Fri, 13 Dec 2019 15:28:05 -0600 Subject: [PATCH 19/21] update format and language in ADR --- design/multipart_parallel_upload.md | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/design/multipart_parallel_upload.md b/design/multipart_parallel_upload.md index bec7273..75c0dd9 100644 --- a/design/multipart_parallel_upload.md +++ b/design/multipart_parallel_upload.md @@ -9,19 +9,23 @@ Parallel Multipart Upload: gsutil concept: [Parallel Composite Uploads](https://cloud.google.com/storage/docs/gsutil/commands/cp#parallel-composite-uploads) -The google-cloud-storage library for Java this blobstore is built with does not have baked in mechanism for parallel composite uploads. 
-The storage#create function is synchronous. Fine for some workloads +The google-cloud-storage library for Java this plugin is built with does not have a provided mechanism for parallel +composite uploads. -It does support however, the Compose request in the API. -This module then implements this client using the Storage request and Compose request functions. +In NXRM 3.19, the Amazon S3 blobstore switched to using parallel uploads, see + +https://github.com/sonatype/nexus-public/blob/master/plugins/nexus-blobstore-s3/src/main/java/org/sonatype/nexus/blobstore/s3/internal/ParallelUploader.java + +This switch has resulted in higher overall throughput for the S3 Blobstores (see https://issues.sonatype.org/browse/NEXUS-19566). +The goal for this feature would be to replicate that parallel upload. ## Implementation -* Don't have content length in the API, have a stream. -* GCS compose method has a hard limit of 32. Since we don't have the length, we can't +* We don't have a content length in the Blobstore API; we have an `InputStream` that can be quite large. +* GCS compose method has a hard limit of 32 chunks. Since we don't have the length, we can't split into 32 equal parts. We can do 31 chunks of a chunkSize parameter, then 1 chunk of the rest. -* Exposes the "composeLimitHit" field -* For files smaller than the chunk size, we don't pay the cost of shipping the upload request to a thread. +* We should expose how many times the compose limit was hit, with tips on how to re-configure. +* For files smaller than 1 chunk size, we don't pay the cost of shipping the upload request to a thread. * The first chunk is written at the expected destination path. * If we've read chunkSize, and there is still more data on the stream, schedule the 2nd through final chunks off thread * A blob write still waits until completion of parallel uploads. This is an important characteristic of all BlobStores; Nexus Repository Manager expects consistency and not deferred, eventual consistency. ## Tuning -* debug logging for `org.sonatype.nexus.blobstore.gcloud.internal.MultipartUploader` +* Observability via debug logging on `org.sonatype.nexus.blobstore.gcloud.internal.MultipartUploader` * 5 MB default and 32 chunk compose limit means optimal threads utilization will be in place for files between 10 MB and 160 MB in size. -* When to increase? When you have fewer CPUs, and larger files. -* To effectively disable parallel uploads, you could put max int in. Will upload on the request thread From e394e05118f0236e95db14c0f0e9c15564d2a125 Mon Sep 17 00:00:00 2001 From: Nicholas Blair Date: Wed, 18 Dec 2019 16:13:20 -0600 Subject: [PATCH 20/21] instrument the uploader ExecutorService, review feedback This change: * Changes MultipartUploader#doStop to use shutdownNow rather than waiting for parts to finish uploading; there is no guarantee the other services we would need to complete the blob upload are still available, so we might as well stop quickly. * Uses Dropwizard metrics' InstrumentedExecutorService to allow us to peek at the thread pool utilization in the MultipartUploader.
* Corrects formatting --- .../gcloud/internal/MultipartUploader.java | 38 +++++++++++-------- .../internal/GoogleCloudBlobStoreIT.groovy | 2 +- .../internal/GoogleCloudBlobStoreTest.groovy | 4 +- .../internal/MultipartUploaderIT.groovy | 20 ++++++---- 4 files changed, 37 insertions(+), 27 deletions(-) diff --git a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java index 4325c6a..f735e93 100644 --- a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java +++ b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java @@ -20,7 +20,6 @@ import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import javax.inject.Inject; @@ -28,7 +27,10 @@ import org.sonatype.nexus.blobstore.api.BlobStoreException; import org.sonatype.nexus.common.stateguard.StateGuardLifecycleSupport; +import org.sonatype.nexus.thread.NexusThreadFactory; +import com.codahale.metrics.InstrumentedExecutorService; +import com.codahale.metrics.MetricRegistry; import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Storage; @@ -38,7 +40,8 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import static java.lang.String.format; /** * Component that provides parallel multipart upload support for blob binary data (.bytes files). @@ -75,23 +78,24 @@ public class MultipartUploader private static final byte[] EMPTY = new byte[0]; - private final ListeningExecutorService executorService = MoreExecutors.listeningDecorator( - Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setNameFormat("nexus-google-cloud-storage-multipart-upload-%d") - .build())); + private final ListeningExecutorService executorService; private final int chunkSize; @Inject - public MultipartUploader(@Named("${"+CHUNK_SIZE_PROPERTY +":-5242880}") final int chunkSize) { + public MultipartUploader(final MetricRegistry metricRegistry, + @Named("${"+CHUNK_SIZE_PROPERTY +":-5242880}") final int chunkSize) { this.chunkSize = chunkSize; + this.executorService = MoreExecutors.listeningDecorator( + new InstrumentedExecutorService( + Executors.newCachedThreadPool( + new NexusThreadFactory("multipart-upload", "nexus-blobstore-google-cloud")), + metricRegistry, format("%s.%s", MultipartUploader.class.getName(), "executor-service"))); } @Override protected void doStop() throws Exception { - executorService.shutdown(); - log.info("sent signal to shutdown multipart upload queue, waiting up to 3 minutes for termination..."); - executorService.awaitTermination(3L, TimeUnit.MINUTES); + executorService.shutdownNow(); } /** @@ -131,18 +135,18 @@ public Blob upload(final Storage storage, final String bucket, final String dest chunk = readChunk(current); } else { - log.info("Upload for {} has hit Google Cloud Storage multipart-compose limits; " + - "consider increasing '{}' beyond current value of {}", destination, CHUNK_SIZE_PROPERTY, getChunkSize()); // we've hit compose request limit read the rest of the stream composeLimitHit.incrementAndGet(); chunk = EMPTY; + log.info("Upload for {} has hit Google Cloud Storage 
multipart-compose limit ({} total times limit hit); " + + "consider increasing '{}' beyond current value of {}", destination, composeLimitHit.get(), + CHUNK_SIZE_PROPERTY, getChunkSize()); final String finalChunkName = toChunkName(destination, COMPOSE_REQUEST_LIMIT); chunkNames.add(finalChunkName); chunkFutures.add(executorService.submit(() -> { log.debug("Uploading final chunk {} for {} of unknown remaining bytes", COMPOSE_REQUEST_LIMIT, destination); - BlobInfo blobInfo = BlobInfo.newBuilder( - bucket, finalChunkName).build(); + BlobInfo blobInfo = BlobInfo.newBuilder(bucket, finalChunkName).build(); // read the rest of the current stream // downside here is that since we don't know the stream size, we can't chunk it. // the deprecated create method here does not allow us to disable GZIP compression on these PUTs @@ -162,7 +166,8 @@ public Blob upload(final Storage storage, final String bucket, final String dest BlobInfo blobInfo = BlobInfo.newBuilder(bucket, chunkName).build(); Blob blob = storage.create(blobInfo, chunk, BlobTargetOption.disableGzipContent()); singleChunk = Optional.of(blob); - } else { + } + else { singleChunk = Optional.empty(); // 2nd through N chunks will happen off current thread in parallel final int chunkIndex = partNumber; @@ -198,7 +203,8 @@ public Blob upload(final Storage storage, final String bucket, final String dest } catch(Exception e) { throw new BlobStoreException("Error uploading blob", e, null); - } finally { + } + finally { // remove any .chunkN files off-thread // make sure not to delete the first chunk (which has the desired destination name with no suffix) deferredCleanup(storage, bucket, chunkNames); diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy index 297d870..b5fb176 100644 --- a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy @@ -96,7 +96,7 @@ class GoogleCloudBlobStoreIT // chunkSize = 5 MB // the net effect of this is all tests except for "create large file" will be single thread // create large file will end up using max chunks - MultipartUploader uploader = new MultipartUploader(5242880) + MultipartUploader uploader = new MultipartUploader(metricRegistry, 5242880) def setup() { quotaService = new BlobStoreQuotaServiceImpl([ diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy index 9d8a94a..544018f 100644 --- a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy @@ -55,13 +55,13 @@ class GoogleCloudBlobStoreTest Bucket bucket = Mock() - MetricRegistry metricRegistry = Mock() + MetricRegistry metricRegistry = new MetricRegistry() GoogleCloudDatastoreFactory datastoreFactory = Mock() Datastore datastore = Mock() - MultipartUploader uploader = new MultipartUploader(1024) + MultipartUploader uploader = new MultipartUploader(metricRegistry, 1024) BlobStoreQuotaService quotaService = Mock() diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy index 
cc3bc5b..a03f8c7 100644 --- a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy @@ -16,12 +16,14 @@ import java.util.stream.StreamSupport import org.sonatype.nexus.blobstore.api.BlobStoreConfiguration +import com.codahale.metrics.MetricRegistry import com.google.cloud.storage.Blob import com.google.cloud.storage.Blob.BlobSourceOption import com.google.cloud.storage.BucketInfo import com.google.cloud.storage.Storage import groovy.util.logging.Slf4j import org.apache.commons.io.input.BoundedInputStream +import spock.lang.Ignore import spock.lang.Specification @Slf4j @@ -37,6 +39,8 @@ class MultipartUploaderIT static Storage storage + MetricRegistry metricRegistry = new MetricRegistry() + def setupSpec() { config.attributes = [ 'google cloud storage': [ @@ -67,7 +71,7 @@ class MultipartUploaderIT def "simple multipart"() { given: long expectedSize = (1048576 * 3) + 2 - MultipartUploader uploader = new MultipartUploader(1048576) + MultipartUploader uploader = new MultipartUploader(metricRegistry, 1048576) byte[] data = new byte[expectedSize] new Random().nextBytes(data) @@ -84,7 +88,7 @@ class MultipartUploaderIT // 5 each of abcdefg final String content = "aaaaabbbbbcccccdddddeeeeefffffggggg" byte[] data = content.bytes - MultipartUploader uploader = new MultipartUploader(5) + MultipartUploader uploader = new MultipartUploader(metricRegistry, 5) when: Blob blob = uploader.upload(storage, bucketName, 'vol-01/chap-01/control/in_order', new ByteArrayInputStream(data)) @@ -98,7 +102,7 @@ class MultipartUploaderIT def "single part"() { given: long expectedSize = 1048575 - MultipartUploader uploader = new MultipartUploader(1048576) + MultipartUploader uploader = new MultipartUploader(metricRegistry, 1048576) byte[] data = new byte[expectedSize] new Random().nextBytes(data) @@ -113,7 +117,7 @@ class MultipartUploaderIT def "zero byte file"() { given: long expectedSize = 0 - MultipartUploader uploader = new MultipartUploader(1024) + MultipartUploader uploader = new MultipartUploader(metricRegistry, 1024) byte[] data = new byte[expectedSize] new Random().nextBytes(data) @@ -128,7 +132,7 @@ class MultipartUploaderIT def "hit compose limit slightly and still successful"() { given: long expectedSize = (1024 * MultipartUploader.COMPOSE_REQUEST_LIMIT) + 10 - MultipartUploader uploader = new MultipartUploader(1024) + MultipartUploader uploader = new MultipartUploader(metricRegistry, 1024) byte[] data = new byte[expectedSize] new Random().nextBytes(data) @@ -145,7 +149,7 @@ class MultipartUploaderIT def "hit compose limit poorly tuned, still successful" () { given: long expectedSize = 1048576 - MultipartUploader uploader = new MultipartUploader(1024) + MultipartUploader uploader = new MultipartUploader(metricRegistry, 1024) byte[] data = new byte[expectedSize] new Random().nextBytes(data) @@ -173,7 +177,7 @@ class MultipartUploaderIT } }, expectedSize) // default value of 5 MB per chunk - MultipartUploader uploader = new MultipartUploader(1024 * 1024 * 5) + MultipartUploader uploader = new MultipartUploader(metricRegistry, 1024 * 1024 * 5) when: Blob blob = uploader.upload(storage, bucketName, @@ -202,7 +206,7 @@ class MultipartUploaderIT } }, expectedSize) // with value of 5 MB per chunk, we'll upload 31 5 MB chunks and 1 45 MB chunk - MultipartUploader uploader = new MultipartUploader(1024 * 1024 * 5) + MultipartUploader uploader = new MultipartUploader(metricRegistry, 1024 * 1024 
* 5) when: Blob blob = uploader.upload(storage, bucketName, From 8b082b2aab479205f9a5e90b4cc1275c4386936f Mon Sep 17 00:00:00 2001 From: Nicholas Blair Date: Wed, 18 Dec 2019 16:16:30 -0600 Subject: [PATCH 21/21] remove unused import --- .../nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy | 1 - 1 file changed, 1 deletion(-) diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy index a03f8c7..5df48bc 100644 --- a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy @@ -23,7 +23,6 @@ import com.google.cloud.storage.BucketInfo import com.google.cloud.storage.Storage import groovy.util.logging.Slf4j import org.apache.commons.io.input.BoundedInputStream -import spock.lang.Ignore import spock.lang.Specification @Slf4j
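
For orientation, the hunks above all sit inside a chunk-then-compose uploader: the stream is cut into fixed-size chunks, the chunks are uploaded in parallel on the listening executor, the parts are stitched together with a compose request, and the intermediate '.chunkN' objects are cleaned up afterwards. The following is a minimal sketch of that pattern, not the plugin's MultipartUploader: naming, error handling, cleanup and the 32-source compose cap are simplified, and only the Storage, BlobInfo and ComposeRequest calls are the real google-cloud-storage client API.

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobTargetOption;
import com.google.cloud.storage.Storage.ComposeRequest;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

/**
 * Simplified sketch of the chunk-then-compose upload pattern; illustrative only,
 * not the plugin's MultipartUploader.
 */
public class ComposeUploadSketch
{
  private final ListeningExecutorService pool =
      MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

  public Blob upload(final Storage storage, final String bucket, final String destination,
                     final InputStream in, final int chunkSize)
      throws IOException, ExecutionException, InterruptedException
  {
    List<String> chunkNames = new ArrayList<>();
    List<ListenableFuture<Blob>> uploads = new ArrayList<>();

    int part = 0;
    byte[] chunk;
    while ((chunk = readChunk(in, chunkSize)).length > 0) {
      part++;
      // the first chunk keeps the destination name; later chunks get a '.chunkN' suffix
      String chunkName = part == 1 ? destination : destination + ".chunk" + part;
      chunkNames.add(chunkName);
      byte[] payload = chunk;
      uploads.add(pool.submit(() ->
          storage.create(BlobInfo.newBuilder(bucket, chunkName).build(), payload,
              BlobTargetOption.disableGzipContent())));
    }

    if (chunkNames.isEmpty()) {
      // zero-byte stream: just create an empty object at the destination
      return storage.create(BlobInfo.newBuilder(bucket, destination).build(), new byte[0],
          BlobTargetOption.disableGzipContent());
    }

    // wait for all chunk uploads to finish before composing
    Futures.allAsList(uploads).get();
    if (chunkNames.size() == 1) {
      return storage.get(bucket, destination);
    }

    // note: a real implementation must cap sources at 32 per compose request;
    // the plugin streams any remainder into one final oversized chunk instead
    Blob composed = storage.compose(ComposeRequest.newBuilder()
        .addSource(chunkNames)
        .setTarget(BlobInfo.newBuilder(bucket, destination).build())
        .build());

    // delete the intermediate '.chunkN' objects, keeping the composed destination
    chunkNames.stream().skip(1).forEach(name -> storage.delete(bucket, name));
    return composed;
  }

  private static byte[] readChunk(final InputStream in, final int chunkSize) throws IOException {
    byte[] buffer = new byte[chunkSize];
    int offset = 0;
    int read;
    while (offset < chunkSize && (read = in.read(buffer, offset, chunkSize - offset)) != -1) {
      offset += read;
    }
    return offset == chunkSize ? buffer : Arrays.copyOf(buffer, offset);
  }
}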
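
The executor change earlier in the patch is primarily about observability: the cached thread pool is now named by NexusThreadFactory and wrapped in Dropwizard's InstrumentedExecutorService, registered under org.sonatype.nexus.blobstore.gcloud.internal.MultipartUploader.executor-service. Below is a small standalone demo of what that wrapper provides; the plain thread factory and console reporter are stand-ins for illustration only, since inside Nexus the injected MetricRegistry is surfaced through the server's own metrics support.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.MetricRegistry;

/**
 * Minimal demo of wrapping an executor with Dropwizard metrics, mirroring the patch's wiring.
 */
public class InstrumentedPoolDemo
{
  public static void main(String[] args) throws InterruptedException {
    MetricRegistry registry = new MetricRegistry();
    // same base name the patch uses: <fully qualified class name>.executor-service
    String baseName = "org.sonatype.nexus.blobstore.gcloud.internal.MultipartUploader.executor-service";

    ExecutorService pool = new InstrumentedExecutorService(
        Executors.newCachedThreadPool(), registry, baseName);

    pool.submit(() -> { /* stand-in for a chunk upload task */ });
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);

    // the wrapper registers meters/timers such as <baseName>.submitted,
    // <baseName>.completed and <baseName>.duration in the registry
    ConsoleReporter.forRegistry(registry).build().report();
  }
}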
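
Lastly, the relocated log message is the operator-facing tuning signal: once an upload needs more than 32 parts (roughly 160 MB at the default chunk size of 5242880 bytes), the remainder is written as a single final chunk that cannot be chunked or parallelized further, and the message reports how often that has happened. Raising nexus.gcs.multipartupload.chunksize in nexus.properties (conventionally under the Nexus data directory's etc/ folder) pushes that threshold up, for example:

# 10 MB chunks keep uploads up to roughly 320 MB (32 x 10 MB) fully parallel
nexus.gcs.multipartupload.chunksize=10485760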