diff --git a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProviderTest.java b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProviderTest.java index f03287e7310e..0b76f02af29e 100644 --- a/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProviderTest.java +++ b/extensions-core/azure-extensions/src/test/java/org/apache/druid/storage/azure/output/AzureStorageConnectorProviderTest.java @@ -56,15 +56,14 @@ public void createAzureStorageFactoryWithRequiredProperties() properties.setProperty(CUSTOM_NAMESPACE + ".container", "container"); properties.setProperty(CUSTOM_NAMESPACE + ".prefix", "prefix"); properties.setProperty(CUSTOM_NAMESPACE + ".tempDir", "/tmp"); - StorageConnectorProvider s3StorageConnectorProvider = getStorageConnectorProvider(properties); + StorageConnectorProvider storageConnectorProvider = getStorageConnectorProvider(properties); - assertInstanceOf(AzureStorageConnectorProvider.class, s3StorageConnectorProvider); - assertInstanceOf(AzureStorageConnector.class, s3StorageConnectorProvider.get()); - assertEquals("container", ((AzureStorageConnectorProvider) s3StorageConnectorProvider).getContainer()); - assertEquals("prefix", ((AzureStorageConnectorProvider) s3StorageConnectorProvider).getPrefix()); + assertInstanceOf(AzureStorageConnectorProvider.class, storageConnectorProvider); + assertInstanceOf(AzureStorageConnector.class, storageConnectorProvider.get()); + assertEquals("container", ((AzureStorageConnectorProvider) storageConnectorProvider).getContainer()); + assertEquals("prefix", ((AzureStorageConnectorProvider) storageConnectorProvider).getPrefix()); assertEquals(new File("/tmp"), - ((AzureStorageConnectorProvider) s3StorageConnectorProvider).getTempDir()); - + ((AzureStorageConnectorProvider) storageConnectorProvider).getTempDir()); } @Test @@ -72,7 +71,7 @@ public void createAzureStorageFactoryWithMissingPrefix() { final Properties properties = new Properties(); - properties.setProperty(CUSTOM_NAMESPACE + ".type", "s3"); + properties.setProperty(CUSTOM_NAMESPACE + ".type", "azure"); properties.setProperty(CUSTOM_NAMESPACE + ".container", "container"); properties.setProperty(CUSTOM_NAMESPACE + ".tempDir", "/tmp"); assertThrows( diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java index c71bb4e788b7..d0e5d0ee3ff6 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/RetryableS3OutputStream.java @@ -19,16 +19,12 @@ package org.apache.druid.storage.s3.output; -import com.amazonaws.AmazonServiceException; import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; -import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.PartETag; -import com.amazonaws.services.s3.model.UploadPartRequest; import com.amazonaws.services.s3.model.UploadPartResult; -import com.google.common.annotations.VisibleForTesting; import 
com.google.common.base.Stopwatch; import com.google.common.io.CountingOutputStream; import it.unimi.dsi.fastutil.io.FastBufferedOutputStream; @@ -49,6 +45,7 @@ import java.util.List; import java.util.Objects; import java.util.UUID; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; /** @@ -81,7 +78,6 @@ public class RetryableS3OutputStream extends OutputStream private final File chunkStorePath; private final long chunkSize; - private final List pushResults = new ArrayList<>(); private final byte[] singularBuffer = new byte[1]; // metric @@ -89,12 +85,6 @@ public class RetryableS3OutputStream extends OutputStream private Chunk currentChunk; private int nextChunkId = 1; // multipart upload requires partNumber to be in the range between 1 and 10000 - private int numChunksPushed; - /** - * Total size of all chunks. This size is updated whenever the chunk is ready for push, - * not when {@link #write(byte[], int, int)} is called. - */ - private long resultsSize; /** * A flag indicating whether there was an upload error. @@ -103,27 +93,28 @@ public class RetryableS3OutputStream extends OutputStream private boolean error; private boolean closed; - public RetryableS3OutputStream( - S3OutputConfig config, - ServerSideEncryptingAmazonS3 s3, - String s3Key - ) throws IOException - { + /** + * Helper class for calculating maximum number of simultaneous chunks allowed on local disk. + */ + private final S3UploadManager uploadManager; - this(config, s3, s3Key, true); - } + /** + * A list of futures to allow us to wait for completion of all uploadPart() calls + * before hitting {@link ServerSideEncryptingAmazonS3#completeMultipartUpload(CompleteMultipartUploadRequest)}. + */ + private final List> futures = new ArrayList<>(); - @VisibleForTesting - protected RetryableS3OutputStream( + public RetryableS3OutputStream( S3OutputConfig config, ServerSideEncryptingAmazonS3 s3, String s3Key, - boolean chunkValidation + S3UploadManager uploadManager ) throws IOException { this.config = config; this.s3 = s3; this.s3Key = s3Key; + this.uploadManager = uploadManager; final InitiateMultipartUploadResult result; try { @@ -138,9 +129,7 @@ protected RetryableS3OutputStream( this.chunkStorePath = new File(config.getTempDir(), uploadId + UUID.randomUUID()); FileUtils.mkdirp(this.chunkStorePath); this.chunkSize = config.getChunkSize(); - this.pushStopwatch = Stopwatch.createUnstarted(); - this.pushStopwatch.reset(); - + this.pushStopwatch = Stopwatch.createStarted(); this.currentChunk = new Chunk(nextChunkId, new File(chunkStorePath, String.valueOf(nextChunkId++))); } @@ -172,7 +161,6 @@ public void write(byte[] b, int off, int len) throws IOException while (remainingBytesToWrite > 0) { final int writtenBytes = writeToCurrentChunk(b, offsetToWrite, remainingBytesToWrite); - if (currentChunk.length() >= chunkSize) { pushCurrentChunk(); currentChunk = new Chunk(nextChunkId, new File(chunkStorePath, String.valueOf(nextChunkId++))); @@ -199,62 +187,11 @@ private void pushCurrentChunk() throws IOException { currentChunk.close(); final Chunk chunk = currentChunk; - try { - if (chunk.length() > 0) { - resultsSize += chunk.length(); - - pushStopwatch.start(); - pushResults.add(push(chunk)); - pushStopwatch.stop(); - numChunksPushed++; - } - } - finally { - if (!chunk.delete()) { - LOG.warn("Failed to delete chunk [%s]", chunk.getAbsolutePath()); - } - } - } - - private PartETag push(Chunk chunk) throws IOException - { - try { - return RetryUtils.retry( - () -> uploadPartIfPossible(uploadId, 
config.getBucket(), s3Key, chunk), - S3Utils.S3RETRY, - config.getMaxRetry() + if (chunk.length() > 0) { + futures.add( + uploadManager.queueChunkForUpload(s3, s3Key, chunk.id, chunk.file, uploadId, config) ); } - catch (AmazonServiceException e) { - throw new IOException(e); - } - catch (Exception e) { - throw new RuntimeException(e); - } - } - - private PartETag uploadPartIfPossible( - String uploadId, - String bucket, - String key, - Chunk chunk - ) - { - final ObjectMetadata objectMetadata = new ObjectMetadata(); - objectMetadata.setContentLength(resultsSize); - final UploadPartRequest uploadPartRequest = new UploadPartRequest() - .withUploadId(uploadId) - .withBucketName(bucket) - .withKey(key) - .withFile(chunk.file) - .withPartNumber(chunk.id) - .withPartSize(chunk.length()); - - if (LOG.isDebugEnabled()) { - LOG.debug("Pushing chunk [%s] to bucket[%s] and key[%s].", chunk, bucket, key); - } - UploadPartResult uploadResult = s3.uploadPart(uploadPartRequest); - return uploadResult.getPartETag(); } @Override @@ -268,53 +205,68 @@ public void close() throws IOException // Closeables are closed in LIFO order closer.register(() -> { + org.apache.commons.io.FileUtils.forceDelete(chunkStorePath); + LOG.info("Deleted chunkStorePath[%s]", chunkStorePath); + // This should be emitted as a metric + long totalChunkSize = (currentChunk.id - 1) * chunkSize + currentChunk.length(); LOG.info( "Pushed total [%d] parts containing [%d] bytes in [%d]ms.", - numChunksPushed, - resultsSize, + futures.size(), + totalChunkSize, pushStopwatch.elapsed(TimeUnit.MILLISECONDS) ); }); - closer.register(() -> org.apache.commons.io.FileUtils.forceDelete(chunkStorePath)); - - closer.register(() -> { - try { - if (resultsSize > 0 && isAllPushSucceeded()) { - RetryUtils.retry( - () -> s3.completeMultipartUpload( - new CompleteMultipartUploadRequest(config.getBucket(), s3Key, uploadId, pushResults) - ), - S3Utils.S3RETRY, - config.getMaxRetry() - ); - } else { - RetryUtils.retry( - () -> { - s3.cancelMultiPartUpload(new AbortMultipartUploadRequest(config.getBucket(), s3Key, uploadId)); - return null; - }, - S3Utils.S3RETRY, - config.getMaxRetry() - ); - } - } - catch (Exception e) { - throw new IOException(e); - } - }); - try (Closer ignored = closer) { if (!error) { pushCurrentChunk(); + completeMultipartUpload(); } } } - private boolean isAllPushSucceeded() + private void completeMultipartUpload() { - return !error && !pushResults.isEmpty() && numChunksPushed == pushResults.size(); + final List pushResults = new ArrayList<>(); + for (Future future : futures) { + if (error) { + future.cancel(true); + } + try { + UploadPartResult result = future.get(1, TimeUnit.HOURS); + pushResults.add(result.getPartETag()); + } + catch (Exception e) { + error = true; + LOG.error(e, "Error in uploading part for upload ID [%s]", uploadId); + } + } + + try { + boolean isAllPushSucceeded = !error && !pushResults.isEmpty() && futures.size() == pushResults.size(); + if (isAllPushSucceeded) { + RetryUtils.retry( + () -> s3.completeMultipartUpload( + new CompleteMultipartUploadRequest(config.getBucket(), s3Key, uploadId, pushResults) + ), + S3Utils.S3RETRY, + config.getMaxRetry() + ); + } else { + RetryUtils.retry( + () -> { + s3.cancelMultiPartUpload(new AbortMultipartUploadRequest(config.getBucket(), s3Key, uploadId)); + return null; + }, + S3Utils.S3RETRY, + config.getMaxRetry() + ); + } + } + catch (Exception e) { + throw new RuntimeException(e); + } } private static class Chunk implements Closeable @@ -336,16 +288,6 @@ private long 
length() return outputStream.getCount(); } - private boolean delete() - { - return file.delete(); - } - - private String getAbsolutePath() - { - return file.getAbsolutePath(); - } - @Override public boolean equals(Object o) { diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java index 9b03a4f07c74..ca599fc9d495 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3ExportStorageProvider.java @@ -56,6 +56,9 @@ public class S3ExportStorageProvider implements ExportStorageProvider @JacksonInject ServerSideEncryptingAmazonS3 s3; + @JacksonInject + S3UploadManager s3UploadManager; + @JsonCreator public S3ExportStorageProvider( @JsonProperty(value = "bucket", required = true) String bucket, @@ -90,7 +93,7 @@ public StorageConnector get() s3ExportConfig.getChunkSize(), s3ExportConfig.getMaxRetry() ); - return new S3StorageConnector(s3OutputConfig, s3); + return new S3StorageConnector(s3OutputConfig, s3, s3UploadManager); } @VisibleForTesting diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java index a68ed9c1c00c..8eb391a24d39 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnector.java @@ -55,15 +55,17 @@ public class S3StorageConnector extends ChunkingStorageConnector getJacksonModules() public void configure(Binder binder) { JsonConfigProvider.bind(binder, "druid.export.storage.s3", S3ExportConfig.class); + JsonConfigProvider.bind(binder, "druid.msq.intermediate.storage", S3OutputConfig.class); } } diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnectorProvider.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnectorProvider.java index 7f4b43a0ede8..f86aee9a1aaf 100644 --- a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnectorProvider.java +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3StorageConnectorProvider.java @@ -38,6 +38,9 @@ public class S3StorageConnectorProvider extends S3OutputConfig implements Storag @JacksonInject ServerSideEncryptingAmazonS3 s3; + @JacksonInject + S3UploadManager s3UploadManager; + @JsonCreator public S3StorageConnectorProvider( @JsonProperty(value = "bucket", required = true) String bucket, @@ -53,6 +56,6 @@ public S3StorageConnectorProvider( @Override public StorageConnector get() { - return new S3StorageConnector(this, s3); + return new S3StorageConnector(this, s3, s3UploadManager); } } diff --git a/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3UploadManager.java b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3UploadManager.java new file mode 100644 index 000000000000..9caa2bcb2e31 --- /dev/null +++ b/extensions-core/s3-extensions/src/main/java/org/apache/druid/storage/s3/output/S3UploadManager.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.storage.s3.output; + +import com.amazonaws.services.s3.model.UploadPartRequest; +import com.amazonaws.services.s3.model.UploadPartResult; +import com.google.common.annotations.VisibleForTesting; +import com.google.inject.Inject; +import org.apache.druid.guice.ManageLifecycle; +import org.apache.druid.java.util.common.RetryUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.lifecycle.LifecycleStart; +import org.apache.druid.java.util.common.lifecycle.LifecycleStop; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.druid.storage.s3.S3Utils; +import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; +import org.apache.druid.utils.RuntimeInfo; + +import java.io.File; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +/** + * This class manages uploading files to S3 in chunks, while ensuring that the + * number of chunks currently present on local disk does not exceed a specific limit. + */ +@ManageLifecycle +public class S3UploadManager +{ + private final ExecutorService uploadExecutor; + + private static final Logger log = new Logger(S3UploadManager.class); + + @Inject + public S3UploadManager(S3OutputConfig s3OutputConfig, S3ExportConfig s3ExportConfig, RuntimeInfo runtimeInfo) + { + int poolSize = Math.max(4, runtimeInfo.getAvailableProcessors()); + int maxNumChunksOnDisk = computeMaxNumChunksOnDisk(s3OutputConfig, s3ExportConfig); + this.uploadExecutor = createExecutorService(poolSize, maxNumChunksOnDisk); + log.info("Initialized executor service for S3 multipart upload with pool size [%d] and work queue capacity [%d]", + poolSize, maxNumChunksOnDisk); + } + + /** + * Computes the maximum number of S3 upload chunks that can be kept on disk using the + * maximum chunk size specified in {@link S3OutputConfig} and {@link S3ExportConfig}. + */ + public static int computeMaxNumChunksOnDisk(S3OutputConfig s3OutputConfig, S3ExportConfig s3ExportConfig) + { + long maxChunkSize = S3OutputConfig.S3_MULTIPART_UPLOAD_MIN_PART_SIZE_BYTES; + if (s3OutputConfig != null && s3OutputConfig.getChunkSize() != null) { + maxChunkSize = Math.max(maxChunkSize, s3OutputConfig.getChunkSize()); + } + if (s3ExportConfig != null && s3ExportConfig.getChunkSize() != null) { + maxChunkSize = Math.max(maxChunkSize, s3ExportConfig.getChunkSize().getBytes()); + } + + return (int) (S3OutputConfig.S3_MULTIPART_UPLOAD_MAX_PART_SIZE_BYTES / maxChunkSize); + } + + /** + * Queues a chunk of a file for upload to S3 as part of a multipart upload. 
+ */ + public Future queueChunkForUpload( + ServerSideEncryptingAmazonS3 s3Client, + String key, + int chunkNumber, + File chunkFile, + String uploadId, + S3OutputConfig config + ) + { + return uploadExecutor.submit(() -> RetryUtils.retry( + () -> { + log.debug("Uploading chunk[%d] for uploadId[%s].", chunkNumber, uploadId); + UploadPartResult uploadPartResult = uploadPartIfPossible( + s3Client, + uploadId, + config.getBucket(), + key, + chunkNumber, + chunkFile + ); + if (!chunkFile.delete()) { + log.warn("Failed to delete chunk [%s]", chunkFile.getAbsolutePath()); + } + return uploadPartResult; + }, + S3Utils.S3RETRY, + config.getMaxRetry() + )); + } + + @VisibleForTesting + UploadPartResult uploadPartIfPossible( + ServerSideEncryptingAmazonS3 s3Client, + String uploadId, + String bucket, + String key, + int chunkNumber, + File chunkFile + ) + { + final UploadPartRequest uploadPartRequest = new UploadPartRequest() + .withUploadId(uploadId) + .withBucketName(bucket) + .withKey(key) + .withFile(chunkFile) + .withPartNumber(chunkNumber) + .withPartSize(chunkFile.length()); + + if (log.isDebugEnabled()) { + log.debug("Pushing chunk[%s] to bucket[%s] and key[%s].", chunkNumber, bucket, key); + } + return s3Client.uploadPart(uploadPartRequest); + } + + private ExecutorService createExecutorService(int poolSize, int maxNumConcurrentChunks) + { + return Execs.newBlockingThreaded("S3UploadThreadPool-%d", poolSize, maxNumConcurrentChunks); + } + + @LifecycleStart + public void start() + { + // No state startup required + } + + @LifecycleStop + public void stop() + { + uploadExecutor.shutdown(); + } + +} diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java index 9f9d632f6181..676352daf4f5 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageConnectorProviderTest.java @@ -31,12 +31,18 @@ import org.apache.druid.guice.JsonConfigProvider; import org.apache.druid.guice.LazySingleton; import org.apache.druid.guice.StartupInjectorBuilder; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.query.DruidProcessingConfigTest; import org.apache.druid.storage.StorageConnector; import org.apache.druid.storage.StorageConnectorModule; import org.apache.druid.storage.StorageConnectorProvider; +import org.apache.druid.storage.s3.output.S3ExportConfig; +import org.apache.druid.storage.s3.output.S3OutputConfig; import org.apache.druid.storage.s3.output.S3StorageConnector; import org.apache.druid.storage.s3.output.S3StorageConnectorModule; import org.apache.druid.storage.s3.output.S3StorageConnectorProvider; +import org.apache.druid.storage.s3.output.S3UploadManager; +import org.easymock.EasyMock; import org.junit.Assert; import org.junit.Test; @@ -146,7 +152,15 @@ public void configure(Binder binder) .addValue( ServerSideEncryptingAmazonS3.class, new ServerSideEncryptingAmazonS3(null, new NoopServerSideEncryption()) - )); + ) + .addValue( + S3UploadManager.class, + new S3UploadManager( + new S3OutputConfig("bucket", "prefix", EasyMock.mock(File.class), new HumanReadableBytes("5MiB"), 1), + new S3ExportConfig("tempDir", new HumanReadableBytes("5MiB"), 1, null), + new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0)) + ) + ); StorageConnectorProvider 
storageConnectorProvider = injector.getInstance(Key.get( diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageDruidModuleTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageDruidModuleTest.java index 4d9b4f2d6ee6..3932c147695b 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageDruidModuleTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/S3StorageDruidModuleTest.java @@ -23,6 +23,7 @@ import com.google.inject.Injector; import org.apache.druid.common.aws.AWSModule; import org.apache.druid.guice.GuiceInjectors; +import org.apache.druid.guice.ServerModule; import org.apache.druid.segment.loading.OmniDataSegmentArchiver; import org.apache.druid.segment.loading.OmniDataSegmentKiller; import org.apache.druid.segment.loading.OmniDataSegmentMover; @@ -72,7 +73,8 @@ private static Injector createInjector() return GuiceInjectors.makeStartupInjectorWithModules( ImmutableList.of( new AWSModule(), - new S3StorageDruidModule() + new S3StorageDruidModule(), + new ServerModule() ) ); } diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java index 1f8eac3bbae0..8e7a81eb48dd 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/RetryableS3OutputStreamTest.java @@ -33,10 +33,10 @@ import org.apache.druid.java.util.common.HumanReadableBytes; import org.apache.druid.java.util.common.IOE; import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.query.DruidProcessingConfigTest; import org.apache.druid.storage.s3.NoopServerSideEncryption; import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; import org.easymock.EasyMock; -import org.hamcrest.CoreMatchers; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -50,6 +50,7 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; public class RetryableS3OutputStreamTest @@ -63,10 +64,11 @@ public class RetryableS3OutputStreamTest private final TestAmazonS3 s3 = new TestAmazonS3(0); private final String path = "resultId"; - private S3OutputConfig config; private long chunkSize; + private S3UploadManager s3UploadManager; + @Before public void setup() throws IOException { @@ -99,6 +101,11 @@ public int getMaxRetry() return 2; } }; + + s3UploadManager = new S3UploadManager( + new S3OutputConfig("bucket", "prefix", EasyMock.mock(File.class), new HumanReadableBytes("5MiB"), 1), + new S3ExportConfig("tempDir", new HumanReadableBytes("5MiB"), 1, null), + new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0)); } @Test @@ -110,7 +117,7 @@ public void testWriteAndHappy() throws IOException config, s3, path, - false + s3UploadManager )) { for (int i = 0; i < 25; i++) { bb.clear(); @@ -132,7 +139,7 @@ public void testWriteSizeLargerThanConfiguredMaxChunkSizeShouldSucceed() throws config, s3, path, - false + s3UploadManager )) { bb.clear(); bb.putInt(1); @@ -153,7 +160,7 @@ public void testWriteSmallBufferShouldSucceed() throws IOException config, s3, path, - false + s3UploadManager )) { for (int i = 0; i < 600; i++) { out.write(i); @@ -175,7 +182,7 
@@ public void testSuccessToUploadAfterRetry() throws IOException config, s3, path, - false + s3UploadManager )) { for (int i = 0; i < 25; i++) { bb.clear(); @@ -198,7 +205,7 @@ public void testFailToUploadAfterRetries() throws IOException config, s3, path, - false + s3UploadManager )) { for (int i = 0; i < 2; i++) { bb.clear(); @@ -206,9 +213,6 @@ public void testFailToUploadAfterRetries() throws IOException out.write(bb.array()); } - expectedException.expect(RuntimeException.class); - expectedException.expectCause(CoreMatchers.instanceOf(AmazonClientException.class)); - expectedException.expectMessage("Upload failure test. Remaining failures [1]"); bb.clear(); bb.putInt(3); out.write(bb.array()); @@ -249,9 +253,11 @@ public UploadPartResult uploadPart(UploadPartRequest request) throws SdkClientEx new IOE("Upload failure test. Remaining failures [%s]", --uploadFailuresLeft) ); } - partRequests.add(request); + synchronized (partRequests) { + partRequests.add(request); + } UploadPartResult result = new UploadPartResult(); - result.setETag(StringUtils.format("%s", request.getPartNumber())); + result.setETag(StringUtils.format("etag-%s", request.getPartNumber())); result.setPartNumber(request.getPartNumber()); return result; } @@ -275,8 +281,10 @@ private void assertCompleted(long chunkSize, long expectedFileSize) Assert.assertNotNull(completeRequest); Assert.assertFalse(cancelled); + Set partNumbersFromRequest = partRequests.stream().map(UploadPartRequest::getPartNumber).collect(Collectors.toSet()); + Assert.assertEquals(partRequests.size(), partNumbersFromRequest.size()); + for (int i = 0; i < partRequests.size(); i++) { - Assert.assertEquals(i + 1, partRequests.get(i).getPartNumber()); if (i < partRequests.size() - 1) { Assert.assertEquals(chunkSize, partRequests.get(i).getPartSize()); } else { @@ -286,12 +294,12 @@ private void assertCompleted(long chunkSize, long expectedFileSize) final List eTags = completeRequest.getPartETags(); Assert.assertEquals(partRequests.size(), eTags.size()); Assert.assertEquals( - partRequests.stream().map(UploadPartRequest::getPartNumber).collect(Collectors.toList()), - eTags.stream().map(PartETag::getPartNumber).collect(Collectors.toList()) + partNumbersFromRequest, + eTags.stream().map(PartETag::getPartNumber).collect(Collectors.toSet()) ); Assert.assertEquals( - partRequests.stream().map(UploadPartRequest::getPartNumber).collect(Collectors.toList()), - eTags.stream().map(tag -> Integer.parseInt(tag.getETag())).collect(Collectors.toList()) + partNumbersFromRequest.stream().map(partNumber -> "etag-" + partNumber).collect(Collectors.toSet()), + eTags.stream().map(PartETag::getETag).collect(Collectors.toSet()) ); Assert.assertEquals( expectedFileSize, diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java index 380c5cb1e508..67dcb3b6db6c 100644 --- a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3StorageConnectorTest.java @@ -31,6 +31,8 @@ import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.query.DruidProcessingConfigTest; import 
org.apache.druid.storage.StorageConnector; import org.apache.druid.storage.s3.NoopServerSideEncryption; import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; @@ -77,14 +79,18 @@ public class S3StorageConnectorTest public void setup() { try { - storageConnector = new S3StorageConnector(new S3OutputConfig( + S3OutputConfig s3OutputConfig = new S3OutputConfig( BUCKET, PREFIX, temporaryFolder.newFolder(), null, null, true - ), service); + ); + storageConnector = new S3StorageConnector(s3OutputConfig, service, new S3UploadManager( + s3OutputConfig, + new S3ExportConfig("tempDir", new HumanReadableBytes("5MiB"), 1, null), + new DruidProcessingConfigTest.MockRuntimeInfo(10, 0, 0))); } catch (IOException e) { throw new RuntimeException(e); diff --git a/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3UploadManagerTest.java b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3UploadManagerTest.java new file mode 100644 index 000000000000..b79c392844d3 --- /dev/null +++ b/extensions-core/s3-extensions/src/test/java/org/apache/druid/storage/s3/output/S3UploadManagerTest.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.storage.s3.output; + +import com.amazonaws.services.s3.model.UploadPartRequest; +import com.amazonaws.services.s3.model.UploadPartResult; +import org.apache.druid.java.util.common.HumanReadableBytes; +import org.apache.druid.query.DruidProcessingConfigTest; +import org.apache.druid.storage.s3.ServerSideEncryptingAmazonS3; +import org.apache.druid.utils.RuntimeInfo; +import org.easymock.Capture; +import org.easymock.EasyMock; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.concurrent.Future; + +import static org.junit.Assert.assertEquals; + +public class S3UploadManagerTest +{ + + private S3UploadManager s3UploadManager; + private S3OutputConfig s3OutputConfig; + private S3ExportConfig s3ExportConfig; + + @Before + public void setUp() + { + s3OutputConfig = new S3OutputConfig("bucket", "prefix", EasyMock.mock(File.class), new HumanReadableBytes("100MiB"), 1); + s3ExportConfig = new S3ExportConfig("tempDir", new HumanReadableBytes("200MiB"), 1, null); + final RuntimeInfo runtimeInfo = new DruidProcessingConfigTest.MockRuntimeInfo(8, 0, 0); + s3UploadManager = new S3UploadManager(s3OutputConfig, s3ExportConfig, runtimeInfo); + } + + @Test + public void testQueueChunkForUpload() throws Exception + { + ServerSideEncryptingAmazonS3 s3Client = EasyMock.mock(ServerSideEncryptingAmazonS3.class); + + File chunkFile = EasyMock.mock(File.class); + EasyMock.expect(chunkFile.length()).andReturn(1024L).anyTimes(); + EasyMock.expect(chunkFile.delete()).andReturn(true).anyTimes(); + + int chunkId = 42; + UploadPartResult uploadPartResult = new UploadPartResult(); + uploadPartResult.setPartNumber(chunkId); + uploadPartResult.setETag("etag"); + EasyMock.expect(s3Client.uploadPart(EasyMock.anyObject(UploadPartRequest.class))).andReturn(uploadPartResult); + + EasyMock.replay(chunkFile, s3Client); + + Future result = s3UploadManager.queueChunkForUpload(s3Client, "test-key", chunkId, chunkFile, "upload-id", s3OutputConfig); + + UploadPartResult futureResult = result.get(); + Assert.assertEquals(chunkId, futureResult.getPartNumber()); + Assert.assertEquals("etag", futureResult.getETag()); + } + + @Test + public void testComputeMaxNumChunksOnDisk() + { + int maxNumConcurrentChunks = S3UploadManager.computeMaxNumChunksOnDisk(s3OutputConfig, s3ExportConfig); + int expectedMaxNumConcurrentChunks = 25; // maxChunkSizePossible/200 MB + assertEquals(expectedMaxNumConcurrentChunks, maxNumConcurrentChunks); + } + + @Test + public void testComputeMaxNumChunksOnDiskWithNullOutputConfig() + { + // Null S3OutputConfig + int maxNumConcurrentChunks = S3UploadManager.computeMaxNumChunksOnDisk(null, s3ExportConfig); + int expectedMaxNumConcurrentChunks = 25; // maxChunkSizePossible / s3ExportConfig's chunk size + assertEquals(expectedMaxNumConcurrentChunks, maxNumConcurrentChunks); + + // Null S3OutputConfig#getChunkSize() + maxNumConcurrentChunks = S3UploadManager.computeMaxNumChunksOnDisk(EasyMock.mock(S3OutputConfig.class), s3ExportConfig); + assertEquals(expectedMaxNumConcurrentChunks, maxNumConcurrentChunks); + } + + @Test + public void testComputeMaxNumChunksOnDiskWithNullExportConfig() + { + // Null S3ExportConfig + int maxNumConcurrentChunks = S3UploadManager.computeMaxNumChunksOnDisk(s3OutputConfig, null); + int expectedMaxNumConcurrentChunks = 51; // maxChunkSizePossible / s3OutputConfig's chunk size + assertEquals(expectedMaxNumConcurrentChunks, maxNumConcurrentChunks); + + // Null 
S3ExportConfig#getChunkSize() + maxNumConcurrentChunks = S3UploadManager.computeMaxNumChunksOnDisk(s3OutputConfig, EasyMock.mock(S3ExportConfig.class)); + assertEquals(expectedMaxNumConcurrentChunks, maxNumConcurrentChunks); + } + + @Test + public void testUploadPartIfPossible() + { + ServerSideEncryptingAmazonS3 s3Client = EasyMock.mock(ServerSideEncryptingAmazonS3.class); + + File chunkFile = EasyMock.mock(File.class); + EasyMock.expect(chunkFile.length()).andReturn(1024L).anyTimes(); + + UploadPartResult uploadPartResult = new UploadPartResult(); + Capture partRequestCapture = EasyMock.newCapture(); + EasyMock.expect(s3Client.uploadPart(EasyMock.capture(partRequestCapture))).andReturn(uploadPartResult); + EasyMock.replay(s3Client, chunkFile); + + UploadPartResult result = s3UploadManager.uploadPartIfPossible(s3Client, "upload-id", "bucket", "key", 1, chunkFile); + + UploadPartRequest capturedRequest = partRequestCapture.getValue(); + assertEquals("upload-id", capturedRequest.getUploadId()); + assertEquals("bucket", capturedRequest.getBucketName()); + assertEquals("key", capturedRequest.getKey()); + assertEquals(1, capturedRequest.getPartNumber()); + assertEquals(chunkFile, capturedRequest.getFile()); + assertEquals(1024L, capturedRequest.getPartSize()); + + assertEquals(uploadPartResult, result); + } + + @After + public void teardown() + { + s3UploadManager.stop(); + } +} diff --git a/processing/src/main/java/org/apache/druid/storage/StorageConnector.java b/processing/src/main/java/org/apache/druid/storage/StorageConnector.java index d99d9469f08f..65e5c626d959 100644 --- a/processing/src/main/java/org/apache/druid/storage/StorageConnector.java +++ b/processing/src/main/java/org/apache/druid/storage/StorageConnector.java @@ -42,7 +42,7 @@ *
 *   • {@code druid.extension.custom.type="s3"}
 *   • {@code druid.extension.custom.bucket="myBucket"}
- * The final state of this inteface would have
+ * The final state of this interface would have
 *
 *   1. Future Non blocking API's
 *   2.
 *   3. Offset based fetch
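
A brief worked sketch of the disk-usage bound that `S3UploadManager.computeMaxNumChunksOnDisk` enforces, as implied by the patch and its tests. This is illustrative only, not part of the patch; the 5 GiB value for `S3OutputConfig.S3_MULTIPART_UPLOAD_MAX_PART_SIZE_BYTES` is an assumption inferred from the S3 part-size ceiling and the expected values (25 and 51) in `S3UploadManagerTest`.

```java
// Illustrative sketch, not part of the patch: reproduces the sizing arithmetic
// exercised by S3UploadManagerTest, assuming a 5 GiB maximum S3 part size.
public class ChunkLimitSketch
{
  public static void main(String[] args)
  {
    long maxPartSizeBytes = 5L * 1024 * 1024 * 1024;  // assumed S3_MULTIPART_UPLOAD_MAX_PART_SIZE_BYTES
    long outputChunkBytes = 100L * 1024 * 1024;       // s3OutputConfig chunk size used in the test (100 MiB)
    long exportChunkBytes = 200L * 1024 * 1024;       // s3ExportConfig chunk size used in the test (200 MiB)

    // The manager sizes the capacity of its blocking work queue by the largest
    // configured chunk size, which bounds how many pending chunks can wait on local disk.
    long maxChunkSize = Math.max(outputChunkBytes, exportChunkBytes);
    int maxNumChunksOnDisk = (int) (maxPartSizeBytes / maxChunkSize);

    System.out.println(maxNumChunksOnDisk);  // 25, matching testComputeMaxNumChunksOnDisk
  }
}
```

With only the 100 MiB output config in play (null export config), the same arithmetic gives 5 GiB / 100 MiB = 51, matching `testComputeMaxNumChunksOnDiskWithNullExportConfig`.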