This repository has been archived by the owner on Nov 8, 2024. It is now read-only.

Improve throughput by 50+% via parallel uploads and disabling gzip compression #53

Merged: 23 commits, Dec 19, 2019
Commits (23)
2ef9d19
feat: parallel multipart upload for blob content
nblair Oct 28, 2018
b8355be
first chunk happens on the calling thread, 2nd+ in the executor
nblair Oct 30, 2018
a32441c
add missing mock behavior for 'store a blob' test
nblair Oct 30, 2018
53e777a
wip ADR document
nblair Nov 15, 2018
f98f5a5
fix: no longer including first chunk in names passed to cleanup
nblair Jan 4, 2019
8734395
remove unused method
nblair Jan 10, 2019
f2803ab
remove unused code
nblair Jan 10, 2019
77a3232
feat: parallel multipart upload for blob content
nblair Oct 28, 2018
f8fe68b
first chunk happens on the calling thread, 2nd+ in the executor
nblair Oct 30, 2018
492fb36
add missing mock behavior for 'store a blob' test
nblair Oct 30, 2018
886c093
wip ADR document
nblair Nov 15, 2018
cc4b709
fix: no longer including first chunk in names passed to cleanup
nblair Jan 4, 2019
f897b70
remove unused method
nblair Jan 10, 2019
ba48231
remove unused code
nblair Jan 10, 2019
4869f24
wip
nblair Oct 9, 2019
5a894ae
Merge branch 'multipart-upload' of github.com:sonatype-nexus-communit…
nblair Dec 12, 2019
3376574
Merge branch 'master' into multipart-upload
nblair Dec 12, 2019
a1e270b
disable GZIP compression on storage writes
nblair Dec 13, 2019
e3c046f
cleanup formatting
nblair Dec 13, 2019
f17a17b
re-ignore large file test
nblair Dec 13, 2019
a41e736
update format and language in ADR
nblair Dec 13, 2019
e394e05
instrument the uploader ExecutorService, review feedback
nblair Dec 18, 2019
8b082b2
remove unused import
nblair Dec 18, 2019
38 changes: 38 additions & 0 deletions design/multipart_parallel_upload.md
@@ -0,0 +1,38 @@
# Multipart Parallel Upload Design

This document captures the context around the component within this blobstore that provides
parallel multipart upload:

[MultipartUploader.java](../src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java)

## Context and Idea

gsutil concept: [Parallel Composite Uploads](https://cloud.google.com/storage/docs/gsutil/commands/cp#parallel-composite-uploads)

The google-cloud-storage Java library this plugin is built with does not provide a mechanism for parallel
composite uploads.

In NXRM 3.19, the Amazon S3 blobstore switched to using parallel uploads, see

https://github.com/sonatype/nexus-public/blob/master/plugins/nexus-blobstore-s3/src/main/java/org/sonatype/nexus/blobstore/s3/internal/ParallelUploader.java

This switch resulted in higher overall throughput for S3 blobstores (see https://issues.sonatype.org/browse/NEXUS-19566).
The goal for this feature is to replicate that parallel upload approach for this blobstore.

## Implementation

* The Blobstore API does not provide a content length; we only have an `InputStream` that can be quite large.
* The GCS compose method has a hard limit of 32 components. Since we don't know the length, we can't
split into 32 equal parts. Instead we write 31 chunks of a configurable chunkSize, then 1 final chunk containing the rest.
* We should expose how many times the compose limit was hit, with tips on how to re-configure.
* For files smaller than one chunk size, we don't pay the cost of shipping the upload request to another thread.
* The first chunk is written at the expected destination path.
* If we've read chunkSize bytes and there is still more data on the stream, schedule the 2nd through final chunks off-thread.
* A blob write still waits until all parallel uploads complete. This is an important characteristic of all BlobStores:
Nexus Repository Manager expects consistency, not deferred, eventual consistency.
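The naming and split scheme described above can be sketched as follows (a minimal illustration, not the plugin's actual code; the `ChunkNaming` class name is invented for this example):

```java
// Sketch of the chunk-naming and split scheme: chunk 1 keeps the destination
// name, chunks 2..31 get a ".chunkN" suffix, and chunk 32 absorbs whatever
// data remains on the stream.
public class ChunkNaming {
  static final int COMPOSE_REQUEST_LIMIT = 32; // GCS hard limit on compose components

  static String toChunkName(String destination, int chunkNumber) {
    // the first chunk is written directly at the destination path
    if (chunkNumber == 1) {
      return destination;
    }
    return destination + ".chunk" + chunkNumber;
  }

  public static void main(String[] args) {
    String dest = "content/vol-01/chap-01/UUID.bytes";
    System.out.println(toChunkName(dest, 1));  // content/vol-01/chap-01/UUID.bytes
    System.out.println(toChunkName(dest, 2));  // content/vol-01/chap-01/UUID.bytes.chunk2
    System.out.println(toChunkName(dest, 32)); // content/vol-01/chap-01/UUID.bytes.chunk32
  }
}
```

Because chunk 1 already has the final destination name, the compose request only needs to coalesce the suffixed chunks into it, and cleanup only deletes names containing `.chunk`.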

## Tuning

* Observability via debug logging on `org.sonatype.nexus.blobstore.gcloud.internal.MultipartUploader`
* The 5 MB default chunk size and the 32-chunk compose limit mean optimal thread utilization for files between 10 MB
and 160 MB in size.
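The 10 MB to 160 MB range follows from simple arithmetic (a back-of-envelope sketch; `TuningMath` is an illustrative name, not part of the plugin):

```java
// Assuming the 5 MB default chunk size and the 32-component compose limit:
// parallelism needs at least two chunks (> 10 MB), and past 32 chunks' worth
// (160 MB) the final chunk is one serial upload of unknown size, so the
// benefit of additional threads tapers off.
public class TuningMath {
  public static void main(String[] args) {
    long chunkSize = 5L * 1024 * 1024; // 5 MB default
    int composeLimit = 32;             // GCS hard limit
    long lowerBound = 2 * chunkSize;   // smallest file that gets a parallel chunk
    long upperBound = composeLimit * chunkSize; // largest file fully chunked
    System.out.println(lowerBound / (1024 * 1024) + " MB"); // 10 MB
    System.out.println(upperBound / (1024 * 1024) + " MB"); // 160 MB
  }
}
```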
@@ -133,17 +133,20 @@ public class GoogleCloudBlobStore

private BlobStoreQuotaService quotaService;

private final MultipartUploader multipartUploader;

private PeriodicJob quotaCheckingJob;

private final int quotaCheckInterval;

@Inject
public GoogleCloudBlobStore(final GoogleCloudStorageFactory storageFactory,
final BlobIdLocationResolver blobIdLocationResolver,
final PeriodicJobService periodicJobService,
final ShardedCounterMetricsStore metricsStore,
final GoogleCloudDatastoreFactory datastoreFactory,
final DryRunPrefix dryRunPrefix,
final MultipartUploader multipartUploader,
final MetricRegistry metricRegistry,
final BlobStoreQuotaService quotaService,
@Named("${nexus.blobstore.quota.warnIntervalSeconds:-60}")
@@ -154,6 +157,7 @@ public GoogleCloudBlobStore(final GoogleCloudStorageFactory storageFactory,
this.storageFactory = checkNotNull(storageFactory);
this.metricsStore = metricsStore;
this.datastoreFactory = datastoreFactory;
this.multipartUploader = multipartUploader;
this.metricRegistry = metricRegistry;
this.quotaService = quotaService;
this.quotaCheckInterval = quotaCheckInterval;
@@ -174,7 +178,7 @@ protected void doStart() throws Exception {
metadata.store();
}
liveBlobs = CacheBuilder.newBuilder().weakValues().recordStats().build(from(GoogleCloudStorageBlob::new));

wrapWithGauge("liveBlobsCache.size", () -> liveBlobs.size());
wrapWithGauge("liveBlobsCache.hitCount", () -> liveBlobs.stats().hitCount());
wrapWithGauge("liveBlobsCache.missCount", () -> liveBlobs.stats().missCount());
@@ -210,7 +214,8 @@ protected Blob doCreate(final InputStream blobData,
return createInternal(headers, destination -> {
try (InputStream data = blobData) {
MetricsInputStream input = new MetricsInputStream(data);
bucket.create(destination, input);

multipartUploader.upload(storage, getConfiguredBucketName(), destination, input);
return input.getMetrics();
}
}, blobId);
@@ -229,7 +234,6 @@ public Blob copy(final BlobId blobId, final Map<String, String> headers) {

return createInternal(headers, destination -> {
sourceBlob.getBlob().copyTo(getConfiguredBucketName(), destination);

BlobMetrics metrics = sourceBlob.getMetrics();
return new StreamMetrics(metrics.getContentSize(), metrics.getSha1Hash());
}, null);
262 changes: 262 additions & 0 deletions src/main/java/org/sonatype/nexus/blobstore/gcloud/internal/MultipartUploader.java
@@ -0,0 +1,262 @@
/*
* Sonatype Nexus (TM) Open Source Version
* Copyright (c) 2017-present Sonatype, Inc.
* All rights reserved. Includes the third-party code listed at http://links.sonatype.com/products/nexus/oss/attributions.
*
* This program and the accompanying materials are made available under the terms of the Eclipse Public License Version 1.0,
* which accompanies this distribution and is available at http://www.eclipse.org/legal/epl-v10.html.
*
* Sonatype Nexus (TM) Professional Version is available from Sonatype, Inc. "Sonatype" and "Sonatype Nexus" are trademarks
* of Sonatype, Inc. Apache Maven is a trademark of the Apache Software Foundation. M2eclipse is a trademark of the
* Eclipse Foundation. All other trademarks are the property of their respective owners.
*/
package org.sonatype.nexus.blobstore.gcloud.internal;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

import javax.inject.Inject;
import javax.inject.Named;

import org.sonatype.nexus.blobstore.api.BlobStoreException;
import org.sonatype.nexus.common.stateguard.StateGuardLifecycleSupport;
import org.sonatype.nexus.thread.NexusThreadFactory;

import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.MetricRegistry;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.Storage.BlobTargetOption;
import com.google.cloud.storage.Storage.ComposeRequest;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import static java.lang.String.format;

/**
* Component that provides parallel multipart upload support for blob binary data (.bytes files).
*/
@Named
public class MultipartUploader
extends StateGuardLifecycleSupport
{

/**
* Use this property in 'nexus.properties' to control how large each multipart part is. Default is 5 MB.
* Smaller numbers increase the number of parallel workers used to upload a file. Match to your workload:
* if you are heavy in docker with large images, increase; if you are heavy in smaller components, decrease.
*/
public static final String CHUNK_SIZE_PROPERTY = "nexus.gcs.multipartupload.chunksize";

/**
* This is a hard limit on the number of components to a compose request enforced by Google Cloud Storage API.
*/
static final int COMPOSE_REQUEST_LIMIT = 32;

/**
* While an invocation of {@link #upload(Storage, String, String, InputStream)} is in-flight, the individual
* chunks of the file will have names like 'destination.chunkPartNumber', e.g.
* 'content/vol-01/chap-01/UUID.bytes.chunk1', 'content/vol-01/chap-01/UUID.bytes.chunk2', etc.
*/
private final String CHUNK_NAME_PART = ".chunk";

/**
* Used internally to count how many times we've hit the compose limit.
* Consider exposing this as a bean that can provide tuning feedback to deployers.
*/
private final AtomicLong composeLimitHit = new AtomicLong(0);

private static final byte[] EMPTY = new byte[0];

private final ListeningExecutorService executorService;

private final int chunkSize;

@Inject
public MultipartUploader(final MetricRegistry metricRegistry,
@Named("${"+CHUNK_SIZE_PROPERTY +":-5242880}") final int chunkSize) {
this.chunkSize = chunkSize;
this.executorService = MoreExecutors.listeningDecorator(
new InstrumentedExecutorService(
Executors.newCachedThreadPool(
new NexusThreadFactory("multipart-upload", "nexus-blobstore-google-cloud")),
metricRegistry, format("%s.%s", MultipartUploader.class.getName(), "executor-service")));
}

@Override
protected void doStop() throws Exception {
executorService.shutdownNow();
}

/**
* @return the value for the {@link #CHUNK_SIZE_PROPERTY}
*/
public int getChunkSize() {
return chunkSize;
}

/**
* @return the number of times {@link #upload(Storage, String, String, InputStream)} hit the multipart-compose limit
*/
public long getNumberOfTimesComposeLimitHit() {
return composeLimitHit.get();
}

/**
* @param storage an initialized {@link Storage} instance
* @param bucket the name of the bucket
* @param destination the destination (relative to the bucket)
* @param contents the stream of data to store
* @return the successfully stored {@link Blob}
* @throws BlobStoreException if any part of the upload failed
*/
public Blob upload(final Storage storage, final String bucket, final String destination, final InputStream contents) {
log.debug("Starting multipart upload for destination {} in bucket {}", destination, bucket);
// this must represent the bucket-relative paths to the chunks, in order of composition
List<String> chunkNames = new ArrayList<>();

Optional<Blob> singleChunk = Optional.empty();
try (InputStream current = contents) {
List<ListenableFuture<Blob>> chunkFutures = new ArrayList<>();
// MUST respect hard limit of 32 chunks per compose request
for (int partNumber = 1; partNumber <= COMPOSE_REQUEST_LIMIT; partNumber++) {
final byte[] chunk;
if (partNumber < COMPOSE_REQUEST_LIMIT) {
chunk = readChunk(current);
}
else {
// we've hit the compose request limit; read the rest of the stream
composeLimitHit.incrementAndGet();
chunk = EMPTY;
log.info("Upload for {} has hit Google Cloud Storage multipart-compose limit ({} total times limit hit); " +
"consider increasing '{}' beyond current value of {}", destination, composeLimitHit.get(),
CHUNK_SIZE_PROPERTY, getChunkSize());

final String finalChunkName = toChunkName(destination, COMPOSE_REQUEST_LIMIT);
chunkNames.add(finalChunkName);
chunkFutures.add(executorService.submit(() -> {
log.debug("Uploading final chunk {} for {} of unknown remaining bytes", COMPOSE_REQUEST_LIMIT, destination);
BlobInfo blobInfo = BlobInfo.newBuilder(bucket, finalChunkName).build();
// read the rest of the current stream
// downside here is that since we don't know the stream size, we can't chunk it.
// the deprecated create method here does not allow us to disable GZIP compression on these PUTs
return storage.create(blobInfo, current);
}));
}

if (chunk == EMPTY && partNumber > 1) {
break;
}

final String chunkName = toChunkName(destination, partNumber);
chunkNames.add(chunkName);

if (partNumber == 1) {
// upload the first part on the current thread
BlobInfo blobInfo = BlobInfo.newBuilder(bucket, chunkName).build();
Blob blob = storage.create(blobInfo, chunk, BlobTargetOption.disableGzipContent());
singleChunk = Optional.of(blob);
}
else {
singleChunk = Optional.empty();
// 2nd through N chunks will happen off current thread in parallel
final int chunkIndex = partNumber;
chunkFutures.add(executorService.submit(() -> {
log.debug("Uploading chunk {} for {} of {} bytes", chunkIndex, destination, chunk.length);
BlobInfo blobInfo = BlobInfo.newBuilder(
bucket, chunkName).build();
return storage.create(blobInfo, chunk, BlobTargetOption.disableGzipContent());
}));
}
}

// return the single result if it exists; otherwise finalize the parallel multipart workers
return singleChunk.orElseGet(() -> {
CountDownLatch block = new CountDownLatch(1);
Futures.whenAllComplete(chunkFutures).run(() -> block.countDown(), MoreExecutors.directExecutor());
// wait for all the futures to complete
log.debug("waiting for {} remaining chunks to complete", chunkFutures.size());
try {
block.await();
}
catch (InterruptedException e) {
log.error("caught InterruptedException waiting for multipart upload to complete on {}", destination);
throw new RuntimeException(e);
}
log.debug("chunk uploads completed, sending compose request");

// finalize with compose request to coalesce the chunks
Blob finalBlob = storage.compose(ComposeRequest.of(bucket, chunkNames, destination));
log.debug("Multipart upload of {} complete", destination);
return finalBlob;
});
}
catch (Exception e) {
throw new BlobStoreException("Error uploading blob", e, null);
}
finally {
// remove any .chunkN files off-thread
// make sure not to delete the first chunk (which has the desired destination name with no suffix)
deferredCleanup(storage, bucket, chunkNames);
}
}

private void deferredCleanup(final Storage storage, final String bucket, final List<String> chunkNames) {
executorService.submit(() -> chunkNames.stream()
.filter(part -> part.contains(CHUNK_NAME_PART))
.forEach(chunk -> storage.delete(bucket, chunk)));
}

/**
* The name of the first chunk should match the desired end destination.
* For any chunk index 2 or greater, this method will return the destination + the chunk name suffix.
*
* @param destination
* @param chunkNumber
* @return the name to store this chunk
*/
private String toChunkName(String destination, int chunkNumber) {
if (chunkNumber == 1) {
return destination;
}
return destination + CHUNK_NAME_PART + chunkNumber;
}

/**
* Read a chunk of the stream up to {@link #getChunkSize()} in length.
*
* @param input the stream to read
* @return the read data as a byte array
* @throws IOException if reading from the stream fails
*/
private byte[] readChunk(final InputStream input) throws IOException {
byte[] buffer = new byte[chunkSize];
int offset = 0;
int remain = chunkSize;
int bytesRead = 0;

while (remain > 0 && bytesRead >= 0) {
bytesRead = input.read(buffer, offset, remain);
if (bytesRead > 0) {
offset += bytesRead;
remain -= bytesRead;
}
}
if (offset > 0) {
return Arrays.copyOfRange(buffer, 0, offset);
}
else {
return EMPTY;
}
}
}