Commit e394e05
instrument the uploader ExecutorService, review feedback
This change:
* Changes MultipartUploader#doStop to use shutdownNow rather than waiting for parts to finish uploading; there is no guarantee the other services we would need to complete the blob upload are still available, so we might as well stop quickly.
* Uses Dropwizard Metrics' InstrumentedExecutorService to let us peek at thread pool utilization in the MultipartUploader (a usage sketch follows the changed-files summary below).
* Corrects formatting
nblair committed Dec 18, 2019
1 parent a41e736 commit e394e05
Showing 4 changed files with 37 additions and 27 deletions.
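
For context on the instrumentation bullet above: Dropwizard's InstrumentedExecutorService decorates a delegate ExecutorService and registers its metrics with the supplied MetricRegistry under the prefix passed to its constructor. The following standalone sketch shows how the resulting metrics can be read; the "uploader.executor-service" name is illustrative rather than taken from this commit, and the exact set of registered metrics can vary by metrics-core version.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.Counter;
import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;

public class InstrumentedExecutorSketch {
  public static void main(String[] args) throws InterruptedException {
    MetricRegistry registry = new MetricRegistry();
    // wrap a plain cached thread pool, mirroring what this commit's constructor does
    ExecutorService executor = new InstrumentedExecutorService(
        Executors.newCachedThreadPool(), registry, "uploader.executor-service");

    executor.submit(() -> System.out.println("pretend this uploads a chunk"));

    // metrics-core registers meters/counters/timers under the given prefix,
    // e.g. "uploader.executor-service.submitted" and ".running"
    Meter submitted = registry.meter("uploader.executor-service.submitted");
    Counter running = registry.counter("uploader.executor-service.running");
    System.out.println("submitted=" + submitted.getCount() + " running=" + running.getCount());

    executor.shutdown();
    executor.awaitTermination(5, TimeUnit.SECONDS);
  }
}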
MultipartUploader.java
@@ -20,15 +20,17 @@
 import java.util.Optional;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.inject.Inject;
 import javax.inject.Named;
 
 import org.sonatype.nexus.blobstore.api.BlobStoreException;
 import org.sonatype.nexus.common.stateguard.StateGuardLifecycleSupport;
+import org.sonatype.nexus.thread.NexusThreadFactory;
 
+import com.codahale.metrics.InstrumentedExecutorService;
+import com.codahale.metrics.MetricRegistry;
 import com.google.cloud.storage.Blob;
 import com.google.cloud.storage.BlobInfo;
 import com.google.cloud.storage.Storage;
@@ -38,7 +40,8 @@
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import static java.lang.String.format;
+
 /**
  * Component that provides parallel multipart upload support for blob binary data (.bytes files).
@@ -75,23 +78,24 @@ public class MultipartUploader
 
   private static final byte[] EMPTY = new byte[0];
 
-  private final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(
-      Executors.newCachedThreadPool(
-          new ThreadFactoryBuilder().setNameFormat("nexus-google-cloud-storage-multipart-upload-%d")
-              .build()));
+  private final ListeningExecutorService executorService;
 
   private final int chunkSize;
 
   @Inject
-  public MultipartUploader(@Named("${"+CHUNK_SIZE_PROPERTY +":-5242880}") final int chunkSize) {
+  public MultipartUploader(final MetricRegistry metricRegistry,
+                           @Named("${"+CHUNK_SIZE_PROPERTY +":-5242880}") final int chunkSize) {
     this.chunkSize = chunkSize;
+    this.executorService = MoreExecutors.listeningDecorator(
+        new InstrumentedExecutorService(
+            Executors.newCachedThreadPool(
+                new NexusThreadFactory("multipart-upload", "nexus-blobstore-google-cloud")),
+            metricRegistry, format("%s.%s", MultipartUploader.class.getName(), "executor-service")));
   }
 
   @Override
   protected void doStop() throws Exception {
-    executorService.shutdown();
-    log.info("sent signal to shutdown multipart upload queue, waiting up to 3 minutes for termination...");
-    executorService.awaitTermination(3L, TimeUnit.MINUTES);
+    executorService.shutdownNow();
   }
 
   /**
@@ -131,18 +135,18 @@ public Blob upload(final Storage storage, final String bucket, final String dest
           chunk = readChunk(current);
         }
         else {
-          log.info("Upload for {} has hit Google Cloud Storage multipart-compose limits; " +
-              "consider increasing '{}' beyond current value of {}", destination, CHUNK_SIZE_PROPERTY, getChunkSize());
           // we've hit compose request limit read the rest of the stream
           composeLimitHit.incrementAndGet();
           chunk = EMPTY;
+          log.info("Upload for {} has hit Google Cloud Storage multipart-compose limit ({} total times limit hit); " +
+              "consider increasing '{}' beyond current value of {}", destination, composeLimitHit.get(),
+              CHUNK_SIZE_PROPERTY, getChunkSize());
 
           final String finalChunkName = toChunkName(destination, COMPOSE_REQUEST_LIMIT);
           chunkNames.add(finalChunkName);
           chunkFutures.add(executorService.submit(() -> {
             log.debug("Uploading final chunk {} for {} of unknown remaining bytes", COMPOSE_REQUEST_LIMIT, destination);
-            BlobInfo blobInfo = BlobInfo.newBuilder(
-                bucket, finalChunkName).build();
+            BlobInfo blobInfo = BlobInfo.newBuilder(bucket, finalChunkName).build();
             // read the rest of the current stream
             // downside here is that since we don't know the stream size, we can't chunk it.
             // the deprecated create method here does not allow us to disable GZIP compression on these PUTs
@@ -162,7 +166,8 @@ public Blob upload(final Storage storage, final String bucket, final String dest
           BlobInfo blobInfo = BlobInfo.newBuilder(bucket, chunkName).build();
           Blob blob = storage.create(blobInfo, chunk, BlobTargetOption.disableGzipContent());
           singleChunk = Optional.of(blob);
-        } else {
+        }
+        else {
           singleChunk = Optional.empty();
           // 2nd through N chunks will happen off current thread in parallel
           final int chunkIndex = partNumber;
Expand Down Expand Up @@ -198,7 +203,8 @@ public Blob upload(final Storage storage, final String bucket, final String dest
}
catch(Exception e) {
throw new BlobStoreException("Error uploading blob", e, null);
} finally {
}
finally {
// remove any .chunkN files off-thread
// make sure not to delete the first chunk (which has the desired destination name with no suffix)
deferredCleanup(storage, bucket, chunkNames);
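The doStop() change above trades a graceful drain for an immediate stop. A minimal standalone sketch of the difference (hypothetical example, not from the diff): shutdown() stops accepting new tasks but lets queued work run to completion, while shutdownNow() additionally interrupts in-flight tasks and hands back the ones that never started.

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ShutdownNowSketch {
  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(1);
    for (int i = 0; i < 3; i++) {
      pool.submit(() -> {
        try {
          Thread.sleep(60_000); // stand-in for a slow chunk upload
        }
        catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // interrupted by shutdownNow()
        }
      });
    }
    // interrupts the running task and abandons the (typically two) queued ones
    List<Runnable> neverStarted = pool.shutdownNow();
    System.out.println("abandoned " + neverStarted.size() + " queued tasks");
  }
}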
GoogleCloudBlobStoreIT.groovy
@@ -96,7 +96,7 @@ class GoogleCloudBlobStoreIT
   // chunkSize = 5 MB
   // the net effect of this is all tests except for "create large file" will be single thread
   // create large file will end up using max chunks
-  MultipartUploader uploader = new MultipartUploader(5242880)
+  MultipartUploader uploader = new MultipartUploader(metricRegistry, 5242880)
 
   def setup() {
     quotaService = new BlobStoreQuotaServiceImpl([
GoogleCloudBlobStoreTest.groovy
@@ -55,13 +55,13 @@ class GoogleCloudBlobStoreTest
 
   Bucket bucket = Mock()
 
-  MetricRegistry metricRegistry = Mock()
+  MetricRegistry metricRegistry = new MetricRegistry()
 
   GoogleCloudDatastoreFactory datastoreFactory = Mock()
 
   Datastore datastore = Mock()
 
-  MultipartUploader uploader = new MultipartUploader(1024)
+  MultipartUploader uploader = new MultipartUploader(metricRegistry, 1024)
 
   BlobStoreQuotaService quotaService = Mock()
 
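The switch above from Mock() to a real MetricRegistry follows from the constructor change: MultipartUploader now builds an InstrumentedExecutorService eagerly, and that wrapper asks the registry for its meters and timers at construction time. A default Spock Mock returns null from unstubbed calls, leaving the wrapper holding null metrics that would NPE as soon as a task is submitted. A small sketch of that eager registration (the "test.executor" name is illustrative):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.MetricRegistry;

public class EagerRegistrationSketch {
  public static void main(String[] args) {
    MetricRegistry registry = new MetricRegistry();
    ExecutorService executor = new InstrumentedExecutorService(
        Executors.newSingleThreadExecutor(), registry, "test.executor");
    // metrics exist immediately, before any task is submitted
    System.out.println(registry.getMetrics().keySet());
    executor.shutdown();
  }
}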
MultipartUploaderIT.groovy
@@ -16,12 +16,14 @@ import java.util.stream.StreamSupport
 
 import org.sonatype.nexus.blobstore.api.BlobStoreConfiguration
 
+import com.codahale.metrics.MetricRegistry
 import com.google.cloud.storage.Blob
 import com.google.cloud.storage.Blob.BlobSourceOption
 import com.google.cloud.storage.BucketInfo
 import com.google.cloud.storage.Storage
 import groovy.util.logging.Slf4j
 import org.apache.commons.io.input.BoundedInputStream
+import spock.lang.Ignore
 import spock.lang.Specification
 
@@ -37,6 +39,8 @@ class MultipartUploaderIT
 
   static Storage storage
 
+  MetricRegistry metricRegistry = new MetricRegistry()
+
   def setupSpec() {
     config.attributes = [
       'google cloud storage': [
@@ -67,7 +71,7 @@
   def "simple multipart"() {
     given:
       long expectedSize = (1048576 * 3) + 2
-      MultipartUploader uploader = new MultipartUploader(1048576)
+      MultipartUploader uploader = new MultipartUploader(metricRegistry, 1048576)
       byte[] data = new byte[expectedSize]
       new Random().nextBytes(data)
 
@@ -84,7 +88,7 @@
       // 5 each of abcdefg
       final String content = "aaaaabbbbbcccccdddddeeeeefffffggggg"
       byte[] data = content.bytes
-      MultipartUploader uploader = new MultipartUploader(5)
+      MultipartUploader uploader = new MultipartUploader(metricRegistry, 5)
 
     when:
       Blob blob = uploader.upload(storage, bucketName, 'vol-01/chap-01/control/in_order', new ByteArrayInputStream(data))
@@ -98,7 +102,7 @@
   def "single part"() {
     given:
      long expectedSize = 1048575
-      MultipartUploader uploader = new MultipartUploader(1048576)
+      MultipartUploader uploader = new MultipartUploader(metricRegistry, 1048576)
      byte[] data = new byte[expectedSize]
      new Random().nextBytes(data)
 
@@ -113,7 +117,7 @@
   def "zero byte file"() {
     given:
      long expectedSize = 0
-      MultipartUploader uploader = new MultipartUploader(1024)
+      MultipartUploader uploader = new MultipartUploader(metricRegistry, 1024)
      byte[] data = new byte[expectedSize]
      new Random().nextBytes(data)
 
@@ -128,7 +132,7 @@
   def "hit compose limit slightly and still successful"() {
     given:
      long expectedSize = (1024 * MultipartUploader.COMPOSE_REQUEST_LIMIT) + 10
-      MultipartUploader uploader = new MultipartUploader(1024)
+      MultipartUploader uploader = new MultipartUploader(metricRegistry, 1024)
      byte[] data = new byte[expectedSize]
      new Random().nextBytes(data)
 
@@ -145,7 +149,7 @@
   def "hit compose limit poorly tuned, still successful" () {
     given:
      long expectedSize = 1048576
-      MultipartUploader uploader = new MultipartUploader(1024)
+      MultipartUploader uploader = new MultipartUploader(metricRegistry, 1024)
      byte[] data = new byte[expectedSize]
      new Random().nextBytes(data)
 
@@ -173,7 +177,7 @@
        }
      }, expectedSize)
      // default value of 5 MB per chunk
-      MultipartUploader uploader = new MultipartUploader(1024 * 1024 * 5)
+      MultipartUploader uploader = new MultipartUploader(metricRegistry, 1024 * 1024 * 5)
 
    when:
      Blob blob = uploader.upload(storage, bucketName,
@@ -202,7 +206,7 @@
        }
      }, expectedSize)
      // with value of 5 MB per chunk, we'll upload 31 5 MB chunks and 1 45 MB chunk
-      MultipartUploader uploader = new MultipartUploader(1024 * 1024 * 5)
+      MultipartUploader uploader = new MultipartUploader(metricRegistry, 1024 * 1024 * 5)
 
    when:
      Blob blob = uploader.upload(storage, bucketName,