From e2b4a93497f18e22cbc0a36eedaa9950cebb0384 Mon Sep 17 00:00:00 2001
From: Paolo Di Tommaso
Date: Thu, 8 Dec 2022 15:47:43 +0100
Subject: [PATCH] Fix math overflow when copying large AWS S3 files

Signed-off-by: Paolo Di Tommaso
---
 .../groovy/nextflow/trace/TraceHelper.groovy  |   2 +-
 .../nextflow/trace/TraceHelperTest.groovy     |   2 +-
 .../com/upplication/s3fs/AmazonS3Client.java  |  18 ++-
 .../s3fs/util/S3MultipartOptions.java         |  14 +-
 .../upplication/s3fs/util/S3UploadHelper.java | 101 ++++++++++++++
 .../com/upplication/s3fs/AwsS3NioTest.groovy  |   8 +-
 .../s3fs/util/S3UploadHelperTest.groovy       | 128 ++++++++++++++++++
 7 files changed, 256 insertions(+), 17 deletions(-)
 create mode 100644 plugins/nf-amazon/src/main/com/upplication/s3fs/util/S3UploadHelper.java
 create mode 100644 plugins/nf-amazon/src/test/com/upplication/s3fs/util/S3UploadHelperTest.groovy

diff --git a/modules/nextflow/src/main/groovy/nextflow/trace/TraceHelper.groovy b/modules/nextflow/src/main/groovy/nextflow/trace/TraceHelper.groovy
index 855040a063..3a50e6b453 100644
--- a/modules/nextflow/src/main/groovy/nextflow/trace/TraceHelper.groovy
+++ b/modules/nextflow/src/main/groovy/nextflow/trace/TraceHelper.groovy
@@ -63,7 +63,7 @@ class TraceHelper {
             Files.newBufferedWriter(path, Charset.defaultCharset(), openOptions(overwrite))
         }
         catch (FileAlreadyExistsException e) {
-            throw new AbortOperationException("$type file already exists: ${path.toUriString()} -- enable the relevant `overwrite` option in your config file to overwrite existing files", e)
+            throw new AbortOperationException("$type file already exists: ${path.toUriString()} -- enable the '${type.toLowerCase()}.overwrite' option in your config file to overwrite existing files", e)
         }
     }
 }
diff --git a/modules/nextflow/src/test/groovy/nextflow/trace/TraceHelperTest.groovy b/modules/nextflow/src/test/groovy/nextflow/trace/TraceHelperTest.groovy
index 70b629f354..25bb2611a4 100644
--- a/modules/nextflow/src/test/groovy/nextflow/trace/TraceHelperTest.groovy
+++ b/modules/nextflow/src/test/groovy/nextflow/trace/TraceHelperTest.groovy
@@ -56,7 +56,7 @@ class TraceHelperTest extends Specification {
         TraceHelper.newFileWriter(path, false, 'Test')
         then:
         def e = thrown(AbortOperationException)
-        e.message == "Test file already exists: $path -- enable the relevant `overwrite` option in your config file to overwrite existing files"
+        e.message == "Test file already exists: $path -- enable the 'test.overwrite' option in your config file to overwrite existing files"
 
         cleanup:
         folder?.deleteDir()
diff --git a/plugins/nf-amazon/src/main/com/upplication/s3fs/AmazonS3Client.java b/plugins/nf-amazon/src/main/com/upplication/s3fs/AmazonS3Client.java
index 683924fa1f..d343aa4591 100644
--- a/plugins/nf-amazon/src/main/com/upplication/s3fs/AmazonS3Client.java
+++ b/plugins/nf-amazon/src/main/com/upplication/s3fs/AmazonS3Client.java
@@ -105,6 +105,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static com.upplication.s3fs.util.S3UploadHelper.*;
+
 /**
  * Client Amazon S3
  * @see com.amazonaws.services.s3.AmazonS3Client
@@ -359,10 +361,12 @@ public ObjectListing listNextBatchOfObjects(ObjectListing objectListing) {
         return client.listNextBatchOfObjects(objectListing);
     }
 
+
     public void multipartCopyObject(S3Path s3Source, S3Path s3Target, Long objectSize, S3MultipartOptions opts, List<Tag> tags, String contentType ) {
 
         final String sourceBucketName = s3Source.getBucket();
         final String sourceObjectKey = s3Source.getKey();
"s3://"+sourceBucketName+'/'+sourceObjectKey; final String targetBucketName = s3Target.getBucket(); final String targetObjectKey = s3Target.getKey(); final ObjectMetadata meta = new ObjectMetadata(); @@ -394,15 +398,25 @@ public void multipartCopyObject(S3Path s3Source, S3Path s3Target, Long objectSiz // Step 3: Save upload Id. String uploadId = initResult.getUploadId(); - final int partSize = opts.getChunkSize(objectSize); + // Multipart upload and copy allows max 10_000 parts + // each part can be up to 5 GB + // Max file size is 5 TB + // See https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html + final int defChunkSize = opts.getChunkSize(); + final long partSize = computePartSize(objectSize, defChunkSize); ExecutorService executor = S3OutputStream.getOrCreateExecutor(opts.getMaxThreads()); List> copyPartRequests = new ArrayList<>(); + checkPartSize(partSize); // Step 4. create copy part requests long bytePosition = 0; for (int i = 1; bytePosition < objectSize; i++) { - long lastPosition = bytePosition + partSize -1 >= objectSize ? objectSize - 1 : bytePosition + partSize - 1; + checkPartIndex(i, sourceS3Path, objectSize, partSize); + + long lastPosition = bytePosition + partSize -1; + if( lastPosition >= objectSize ) + lastPosition = objectSize - 1; CopyPartRequest copyRequest = new CopyPartRequest() .withDestinationBucketName(targetBucketName) diff --git a/plugins/nf-amazon/src/main/com/upplication/s3fs/util/S3MultipartOptions.java b/plugins/nf-amazon/src/main/com/upplication/s3fs/util/S3MultipartOptions.java index 1d705063d6..aefa1b60a3 100644 --- a/plugins/nf-amazon/src/main/com/upplication/s3fs/util/S3MultipartOptions.java +++ b/plugins/nf-amazon/src/main/com/upplication/s3fs/util/S3MultipartOptions.java @@ -30,7 +30,7 @@ public class S3MultipartOptions { private static final Logger log = LoggerFactory.getLogger(S3MultipartOptions.class); - public static final int DEFAULT_CHUNK_SIZE = 100 << 20; // 100 MB + public static final int DEFAULT_CHUNK_SIZE = 100 << 20; // 100 MiB public static final int DEFAULT_BUFFER_SIZE = 10485760; @@ -71,7 +71,7 @@ public class S3MultipartOptions { private long retrySleep; - /** + /* * initialize default values */ { @@ -100,16 +100,6 @@ public int getChunkSize() { return chunkSize; } - public int getChunkSize( long objectSize ) { - final int MAX_PARTS = 10_000; - long numOfParts = objectSize / chunkSize; - if( numOfParts > MAX_PARTS ) { - chunkSize = (int) objectSize / MAX_PARTS; - } - - return chunkSize; - } - public int getMaxThreads() { return maxThreads; } diff --git a/plugins/nf-amazon/src/main/com/upplication/s3fs/util/S3UploadHelper.java b/plugins/nf-amazon/src/main/com/upplication/s3fs/util/S3UploadHelper.java new file mode 100644 index 0000000000..5075f17f81 --- /dev/null +++ b/plugins/nf-amazon/src/main/com/upplication/s3fs/util/S3UploadHelper.java @@ -0,0 +1,101 @@ +/* + * Copyright 2020-2022, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
diff --git a/plugins/nf-amazon/src/main/com/upplication/s3fs/util/S3UploadHelper.java b/plugins/nf-amazon/src/main/com/upplication/s3fs/util/S3UploadHelper.java
new file mode 100644
index 0000000000..5075f17f81
--- /dev/null
+++ b/plugins/nf-amazon/src/main/com/upplication/s3fs/util/S3UploadHelper.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2020-2022, Seqera Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.upplication.s3fs.util;
+
+/**
+ *
+ * @author Paolo Di Tommaso
+ */
+public class S3UploadHelper {
+
+    private static final long _1_KiB = 1024;
+    private static final long _1_MiB = _1_KiB * _1_KiB;
+    private static final long _1_GiB = _1_KiB * _1_KiB * _1_KiB;
+    private static final long _1_TiB = _1_KiB * _1_KiB * _1_KiB * _1_KiB;
+
+    /**
+     * AWS S3 min part size
+     * https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
+     */
+    public static final long MIN_PART_SIZE = 5 * _1_MiB;
+
+    /**
+     * AWS S3 max part size
+     * https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
+     */
+    public static final long MAX_PART_SIZE = 5 * _1_GiB;
+
+    /**
+     * AWS S3 max object size
+     * https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
+     */
+    public static final long MAX_OBJECT_SIZE = 5 * _1_TiB;
+
+    /**
+     * AWS S3 max parts in multi-part upload and copy request
+     */
+    public static final int MAX_PARTS_COUNT = 10_000;
+
+    static public long computePartSize( long objectSize, long chunkSize ) {
+        if( objectSize<0 ) throw new IllegalArgumentException("Argument 'objectSize' cannot be less than zero");
+        if( chunkSize<MIN_PART_SIZE ) throw new IllegalArgumentException("Argument 'chunkSize' cannot be less than " + MIN_PART_SIZE);
+        // Multipart upload and copy allows max 10_000 parts
+        long numOfParts = ceilDiv(objectSize, chunkSize);
+        if( numOfParts > MAX_PARTS_COUNT) {
+            final long x = ceilDiv(objectSize, MAX_PARTS_COUNT);
+            return ceilDiv(x, 10* _1_MiB) *10* _1_MiB;
+        }
+        return chunkSize;
+    }
+
+
+    private static long ceilDiv(long x, long y){
+        return -Math.floorDiv(-x,y);
+    }
+
+    private static long ceilDiv(long x, int y){
+        return -Math.floorDiv(-x,y);
+    }
+
+    static public void checkPartSize(long partSize) {
+        if( partSize<MIN_PART_SIZE ) {
+            String msg = String.format("The part size for S3 multipart copy and upload operations cannot be less than 5 MiB -- offending value: %d", partSize);
+            throw new IllegalArgumentException(msg);
+        }
+        if( partSize>MAX_PART_SIZE ) {
+            String msg = String.format("The part size for S3 multipart copy and upload operations cannot be bigger than 5 GiB -- offending value: %d", partSize);
+            throw new IllegalArgumentException(msg);
+        }
+    }
+
+    static public void checkPartIndex(int i, String path, long fileSize, long chunkSize) {
+        if( i < 1 ) {
+            String msg = String.format("S3 multipart copy request index cannot be less than 1 -- offending value: %d; file: '%s'; size: %d; part-size: %d", i, path, fileSize, chunkSize);
+            throw new IllegalArgumentException(msg);
+        }
+        if( i > MAX_PARTS_COUNT) {
+            String msg = String.format("S3 multipart copy request exceeds the maximum number of allowed parts -- offending value: %d; file: '%s'; size: %d; part-size: %d", i, path, fileSize, chunkSize);
+            throw new IllegalArgumentException(msg);
+        }
+    }
+
+}
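
The helper keeps all arithmetic in long and, whenever the requested chunking would exceed 10_000 parts, rounds the part size up to the next 10 MiB multiple that fits. A quick sketch of the resulting values, mirroring the test table further below (the class name is illustrative):

    import com.upplication.s3fs.util.S3UploadHelper;

    public class PartSizeDemo {
        public static void main(String[] args) {
            final long _1_MiB = 1024L * 1024;
            final long _1_TiB = _1_MiB * _1_MiB;  // 1024^4 bytes

            // Small objects keep the requested 10 MiB chunk size
            System.out.println(S3UploadHelper.computePartSize(_1_MiB, 10 * _1_MiB));      // 10485760

            // 1 TiB at 10 MiB would need ~104_858 parts, so the part size is
            // bumped to the next 10 MiB multiple that fits: 110 MiB
            System.out.println(S3UploadHelper.computePartSize(_1_TiB, 10 * _1_MiB));      // 115343360

            // 5 TiB (the S3 object size limit) -> 530 MiB parts
            System.out.println(S3UploadHelper.computePartSize(5 * _1_TiB, 10 * _1_MiB));  // 555745280
        }
    }
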
diff --git a/plugins/nf-amazon/src/test/com/upplication/s3fs/AwsS3NioTest.groovy b/plugins/nf-amazon/src/test/com/upplication/s3fs/AwsS3NioTest.groovy
index a165c620c4..409138a93d 100644
--- a/plugins/nf-amazon/src/test/com/upplication/s3fs/AwsS3NioTest.groovy
+++ b/plugins/nf-amazon/src/test/com/upplication/s3fs/AwsS3NioTest.groovy
@@ -17,6 +17,8 @@ import java.nio.file.attribute.BasicFileAttributes
 import com.amazonaws.services.s3.AmazonS3
 import com.amazonaws.services.s3.model.Tag
 import groovy.util.logging.Slf4j
+import nextflow.Global
+import nextflow.Session
 import nextflow.exception.AbortOperationException
 import nextflow.file.CopyMoveHelper
 import nextflow.file.FileHelper
@@ -51,6 +53,10 @@ class AwsS3NioTest extends Specification implements AwsS3BaseSpec {
         s3Client0 = fs.client.getClient()
     }
 
+    def setup() {
+        Global.session = Mock(Session) { getConfig() >> [:] }
+    }
+
     def 'should create a blob' () {
         given:
         def bucket = createBucket()
@@ -1332,7 +1338,7 @@ class AwsS3NioTest extends Specification implements AwsS3BaseSpec {
         TraceHelper.newFileWriter(path, false, 'Test')
         then:
         def e = thrown(AbortOperationException)
-        e.message == "Test file already exists: ${path.toUriString()}"
+        e.message == "Test file already exists: ${path.toUriString()} -- enable the 'test.overwrite' option in your config file to overwrite existing files"
 
         cleanup:
         deleteBucket(bucket1)
diff --git a/plugins/nf-amazon/src/test/com/upplication/s3fs/util/S3UploadHelperTest.groovy b/plugins/nf-amazon/src/test/com/upplication/s3fs/util/S3UploadHelperTest.groovy
new file mode 100644
index 0000000000..5da0f58c33
--- /dev/null
+++ b/plugins/nf-amazon/src/test/com/upplication/s3fs/util/S3UploadHelperTest.groovy
@@ -0,0 +1,128 @@
+/*
+ * Copyright 2020-2022, Seqera Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.upplication.s3fs.util
+
+import spock.lang.Shared
+import spock.lang.Specification
+import spock.lang.Unroll
+/**
+ *
+ * @author Paolo Di Tommaso
+ */
+class S3UploadHelperTest extends Specification {
+
+    @Shared final long _1_KiB = 1_024
+    @Shared final long _1_MiB = _1_KiB **2
+    @Shared final long _1_GiB = _1_KiB **3
+    @Shared final long _1_TiB = _1_KiB **4
+
+    @Shared final long _10_MiB = _1_MiB * 10
+    @Shared final long _100_MiB = _1_MiB * 100
+
+    @Unroll
+    def 'should compute s3 file chunk size' () {
+
+        expect:
+        S3UploadHelper.computePartSize(FILE_SIZE, CHUNK_SIZE) == EXPECTED_CHUNK_SIZE
+        and:
+        def parts = FILE_SIZE / EXPECTED_CHUNK_SIZE
+        parts <= S3UploadHelper.MAX_PARTS_COUNT
+        parts > 0
+
+        where:
+        FILE_SIZE       | EXPECTED_CHUNK_SIZE   | CHUNK_SIZE
+        _1_KiB          | _10_MiB               | _10_MiB
+        _1_MiB          | _10_MiB               | _10_MiB
+        _1_GiB          | _10_MiB               | _10_MiB
+        _1_TiB          | 110 * _1_MiB          | _10_MiB
+        5 * _1_TiB      | 530 * _1_MiB          | _10_MiB
+        10 * _1_TiB     | 1050 * _1_MiB         | _10_MiB
+        and:
+        _1_KiB          | _100_MiB              | _100_MiB
+        _1_MiB          | _100_MiB              | _100_MiB
+        _1_GiB          | _100_MiB              | _100_MiB
+        _1_TiB          | 110 * _1_MiB          | _100_MiB
+        5 * _1_TiB      | 530 * _1_MiB          | _100_MiB
+        10 * _1_TiB     | 1050 * _1_MiB         | _100_MiB
+
+    }
+
+
+    def 'should check s3 part size' () {
+        when:
+        S3UploadHelper.checkPartSize(S3UploadHelper.MIN_PART_SIZE)
+        then:
+        noExceptionThrown()
+
+        when:
+        S3UploadHelper.checkPartSize(S3UploadHelper.MIN_PART_SIZE+1)
+        then:
+        noExceptionThrown()
+
+        when:
+        S3UploadHelper.checkPartSize(S3UploadHelper.MAX_PART_SIZE-1)
+        then:
+        noExceptionThrown()
+
+        when:
+        S3UploadHelper.checkPartSize(S3UploadHelper.MAX_PART_SIZE)
+        then:
+        noExceptionThrown()
+
+        when:
+        S3UploadHelper.checkPartSize(S3UploadHelper.MAX_PART_SIZE+1)
+        then:
+        thrown(IllegalArgumentException)
+
+        when:
+        S3UploadHelper.checkPartSize(S3UploadHelper.MIN_PART_SIZE-1)
+        then:
+        thrown(IllegalArgumentException)
+    }
+
+    def 'should check part index' () {
+        when:
+        S3UploadHelper.checkPartIndex(1, 's3://foo', 1000, 100)
+        then:
+        noExceptionThrown()
+
+        when:
+        S3UploadHelper.checkPartIndex(S3UploadHelper.MAX_PARTS_COUNT, 's3://foo', 1000, 100)
+        then:
+        noExceptionThrown()
+
+        when:
+        S3UploadHelper.checkPartIndex(S3UploadHelper.MAX_PARTS_COUNT+1, 's3://foo', 1000, 100)
+        then:
+        def e1 = thrown(IllegalArgumentException)
+        e1.message == "S3 multipart copy request exceeds the maximum number of allowed parts -- offending value: 10001; file: 's3://foo'; size: 1000; part-size: 100"
+
+        when:
+        S3UploadHelper.checkPartIndex(0, 's3://foo', 1000, 100)
+        then:
+        def e2 = thrown(IllegalArgumentException)
+        e2.message == "S3 multipart copy request index cannot be less than 1 -- offending value: 0; file: 's3://foo'; size: 1000; part-size: 100"
+
+    }
+}
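
As a closing sanity check, the invariant the patch enforces can be exercised end to end: for an object at the 5 TiB S3 limit, the computed part size passes checkPartSize and yields at most 10_000 parts. A minimal sketch (the class name is illustrative):

    import com.upplication.s3fs.util.S3UploadHelper;

    public class PartCountCheck {
        public static void main(String[] args) {
            final long _1_MiB = 1024L * 1024;
            final long _5_TiB = 5L * _1_MiB * _1_MiB;  // 5 * 1024^4 bytes

            long partSize = S3UploadHelper.computePartSize(_5_TiB, 100 * _1_MiB);
            S3UploadHelper.checkPartSize(partSize);            // 5 MiB <= partSize <= 5 GiB

            long parts = (_5_TiB + partSize - 1) / partSize;   // ceiling division
            System.out.println(partSize / _1_MiB + " MiB x " + parts + " parts");
            // -> 530 MiB x 9893 parts, within the 10_000-part limit
        }
    }
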