From e394e05118f0236e95db14c0f0e9c15564d2a125 Mon Sep 17 00:00:00 2001 From: Nicholas Blair Date: Wed, 18 Dec 2019 16:13:20 -0600 Subject: [PATCH] instrument the uploader ExecutorService, review feedback This change: * Changes MultipartUploader#doStop to use shutdownNow rather than waiting for parts to finish uploading; there is no guarantee that the other services needed to complete the blob upload are still available, so we might as well stop quickly. * Uses Dropwizard Metrics' InstrumentedExecutorService to allow us to peek at the thread pool utilization in the MultipartUploader. * Corrects formatting. --- .../gcloud/internal/MultipartUploader.java | 38 +++++++++++-------- .../internal/GoogleCloudBlobStoreIT.groovy | 2 +- .../internal/GoogleCloudBlobStoreTest.groovy | 4 +- .../internal/MultipartUploaderIT.groovy | 20 ++++++---- 4 files changed, 37 insertions(+), 27 deletions(-) diff --git a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java index 4325c6a..f735e93 100644 --- a/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java +++ b/src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java @@ -20,7 +20,6 @@ import java.util.Optional; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import javax.inject.Inject; @@ -28,7 +27,10 @@ import org.sonatype.nexus.blobstore.api.BlobStoreException; import org.sonatype.nexus.common.stateguard.StateGuardLifecycleSupport; +import org.sonatype.nexus.thread.NexusThreadFactory; +import com.codahale.metrics.InstrumentedExecutorService; +import com.codahale.metrics.MetricRegistry; import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobInfo; import com.google.cloud.storage.Storage; @@ -38,7 +40,8 @@ import
com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import static java.lang.String.format; /** * Component that provides parallel multipart upload support for blob binary data (.bytes files). @@ -75,23 +78,24 @@ public class MultipartUploader private static final byte[] EMPTY = new byte[0]; - private final ListeningExecutorService executorService = MoreExecutors.listeningDecorator( - Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setNameFormat("nexus-google-cloud-storage-multipart-upload-%d") - .build())); + private final ListeningExecutorService executorService; private final int chunkSize; @Inject - public MultipartUploader(@Named("${"+CHUNK_SIZE_PROPERTY +":-5242880}") final int chunkSize) { + public MultipartUploader(final MetricRegistry metricRegistry, + @Named("${"+CHUNK_SIZE_PROPERTY +":-5242880}") final int chunkSize) { this.chunkSize = chunkSize; + this.executorService = MoreExecutors.listeningDecorator( + new InstrumentedExecutorService( + Executors.newCachedThreadPool( + new NexusThreadFactory("multipart-upload", "nexus-blobstore-google-cloud")), + metricRegistry, format("%s.%s", MultipartUploader.class.getName(), "executor-service"))); } @Override protected void doStop() throws Exception { - executorService.shutdown(); - log.info("sent signal to shutdown multipart upload queue, waiting up to 3 minutes for termination..."); - executorService.awaitTermination(3L, TimeUnit.MINUTES); + executorService.shutdownNow(); } /** @@ -131,18 +135,18 @@ public Blob upload(final Storage storage, final String bucket, final String dest chunk = readChunk(current); } else { - log.info("Upload for {} has hit Google Cloud Storage multipart-compose limits; " + - "consider increasing '{}' beyond current value of {}", destination, CHUNK_SIZE_PROPERTY, getChunkSize()); // we've 
hit compose request limit read the rest of the stream composeLimitHit.incrementAndGet(); chunk = EMPTY; + log.info("Upload for {} has hit Google Cloud Storage multipart-compose limit ({} total times limit hit); " + + "consider increasing '{}' beyond current value of {}", destination, composeLimitHit.get(), + CHUNK_SIZE_PROPERTY, getChunkSize()); final String finalChunkName = toChunkName(destination, COMPOSE_REQUEST_LIMIT); chunkNames.add(finalChunkName); chunkFutures.add(executorService.submit(() -> { log.debug("Uploading final chunk {} for {} of unknown remaining bytes", COMPOSE_REQUEST_LIMIT, destination); - BlobInfo blobInfo = BlobInfo.newBuilder( - bucket, finalChunkName).build(); + BlobInfo blobInfo = BlobInfo.newBuilder(bucket, finalChunkName).build(); // read the rest of the current stream // downside here is that since we don't know the stream size, we can't chunk it. // the deprecated create method here does not allow us to disable GZIP compression on these PUTs @@ -162,7 +166,8 @@ public Blob upload(final Storage storage, final String bucket, final String dest BlobInfo blobInfo = BlobInfo.newBuilder(bucket, chunkName).build(); Blob blob = storage.create(blobInfo, chunk, BlobTargetOption.disableGzipContent()); singleChunk = Optional.of(blob); - } else { + } + else { singleChunk = Optional.empty(); // 2nd through N chunks will happen off current thread in parallel final int chunkIndex = partNumber; @@ -198,7 +203,8 @@ public Blob upload(final Storage storage, final String bucket, final String dest } catch(Exception e) { throw new BlobStoreException("Error uploading blob", e, null); - } finally { + } + finally { // remove any .chunkN files off-thread // make sure not to delete the first chunk (which has the desired destination name with no suffix) deferredCleanup(storage, bucket, chunkNames); diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy 
b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy index 297d870..b5fb176 100644 --- a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreIT.groovy @@ -96,7 +96,7 @@ class GoogleCloudBlobStoreIT // chunkSize = 5 MB // the net effect of this is all tests except for "create large file" will be single thread // create large file will end up using max chunks - MultipartUploader uploader = new MultipartUploader(5242880) + MultipartUploader uploader = new MultipartUploader(metricRegistry, 5242880) def setup() { quotaService = new BlobStoreQuotaServiceImpl([ diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy index 9d8a94a..544018f 100644 --- a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/GoogleCloudBlobStoreTest.groovy @@ -55,13 +55,13 @@ class GoogleCloudBlobStoreTest Bucket bucket = Mock() - MetricRegistry metricRegistry = Mock() + MetricRegistry metricRegistry = new MetricRegistry() GoogleCloudDatastoreFactory datastoreFactory = Mock() Datastore datastore = Mock() - MultipartUploader uploader = new MultipartUploader(1024) + MultipartUploader uploader = new MultipartUploader(metricRegistry, 1024) BlobStoreQuotaService quotaService = Mock() diff --git a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy index cc3bc5b..a03f8c7 100644 --- a/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy +++ b/src/test/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploaderIT.groovy @@ -16,12 +16,14 @@ 
import java.util.stream.StreamSupport import org.sonatype.nexus.blobstore.api.BlobStoreConfiguration +import com.codahale.metrics.MetricRegistry import com.google.cloud.storage.Blob import com.google.cloud.storage.Blob.BlobSourceOption import com.google.cloud.storage.BucketInfo import com.google.cloud.storage.Storage import groovy.util.logging.Slf4j import org.apache.commons.io.input.BoundedInputStream +import spock.lang.Ignore import spock.lang.Specification @Slf4j @@ -37,6 +39,8 @@ class MultipartUploaderIT static Storage storage + MetricRegistry metricRegistry = new MetricRegistry() + def setupSpec() { config.attributes = [ 'google cloud storage': [ @@ -67,7 +71,7 @@ class MultipartUploaderIT def "simple multipart"() { given: long expectedSize = (1048576 * 3) + 2 - MultipartUploader uploader = new MultipartUploader(1048576) + MultipartUploader uploader = new MultipartUploader(metricRegistry, 1048576) byte[] data = new byte[expectedSize] new Random().nextBytes(data) @@ -84,7 +88,7 @@ class MultipartUploaderIT // 5 each of abcdefg final String content = "aaaaabbbbbcccccdddddeeeeefffffggggg" byte[] data = content.bytes - MultipartUploader uploader = new MultipartUploader(5) + MultipartUploader uploader = new MultipartUploader(metricRegistry, 5) when: Blob blob = uploader.upload(storage, bucketName, 'vol-01/chap-01/control/in_order', new ByteArrayInputStream(data)) @@ -98,7 +102,7 @@ class MultipartUploaderIT def "single part"() { given: long expectedSize = 1048575 - MultipartUploader uploader = new MultipartUploader(1048576) + MultipartUploader uploader = new MultipartUploader(metricRegistry, 1048576) byte[] data = new byte[expectedSize] new Random().nextBytes(data) @@ -113,7 +117,7 @@ class MultipartUploaderIT def "zero byte file"() { given: long expectedSize = 0 - MultipartUploader uploader = new MultipartUploader(1024) + MultipartUploader uploader = new MultipartUploader(metricRegistry, 1024) byte[] data = new byte[expectedSize] new Random().nextBytes(data) @@ 
-128,7 +132,7 @@ class MultipartUploaderIT def "hit compose limit slightly and still successful"() { given: long expectedSize = (1024 * MultipartUploader.COMPOSE_REQUEST_LIMIT) + 10 - MultipartUploader uploader = new MultipartUploader(1024) + MultipartUploader uploader = new MultipartUploader(metricRegistry, 1024) byte[] data = new byte[expectedSize] new Random().nextBytes(data) @@ -145,7 +149,7 @@ class MultipartUploaderIT def "hit compose limit poorly tuned, still successful" () { given: long expectedSize = 1048576 - MultipartUploader uploader = new MultipartUploader(1024) + MultipartUploader uploader = new MultipartUploader(metricRegistry, 1024) byte[] data = new byte[expectedSize] new Random().nextBytes(data) @@ -173,7 +177,7 @@ class MultipartUploaderIT } }, expectedSize) // default value of 5 MB per chunk - MultipartUploader uploader = new MultipartUploader(1024 * 1024 * 5) + MultipartUploader uploader = new MultipartUploader(metricRegistry, 1024 * 1024 * 5) when: Blob blob = uploader.upload(storage, bucketName, @@ -202,7 +206,7 @@ class MultipartUploaderIT } }, expectedSize) // with value of 5 MB per chunk, we'll upload 31 5 MB chunks and 1 45 MB chunk - MultipartUploader uploader = new MultipartUploader(1024 * 1024 * 5) + MultipartUploader uploader = new MultipartUploader(metricRegistry, 1024 * 1024 * 5) when: Blob blob = uploader.upload(storage, bucketName,