Skip to content

Commit

Permalink
NIFI-13776 Updated CopyS3Object to Handle Files over 5 GB
Browse files Browse the repository at this point in the history
This closes #9418

- Added multipart copying for files over 5 GB

Co-authored-by: Mike Thomsen <[email protected]>
Co-authored-by: David Handermann <[email protected]>
Signed-off-by: Joseph Witt <[email protected]>
  • Loading branch information
2 people authored and joewitt committed Oct 18, 2024
1 parent 80e8893 commit e37cd2b
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@ The following binary components are provided under the Apache Software License v
Error Prone Annotations
Copyright 2015 The Error Prone Authors

The multi-part copy code in CopyS3Object was derived from code found in the Amazon S3 documentation. This is the specific file for reference:
https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/java/example_code/s3/src/main/java/aws/example/s3/LowLevelMultipartCopy.java#L18

This code was provided under the terms of the Apache Software License V2 per this:
https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/LICENSE

************************
Eclipse Distribution License 1.0
************************
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,21 @@
*/
package org.apache.nifi.processors.aws.s3;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
import com.amazonaws.services.s3.model.AccessControlList;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.CopyPartRequest;
import com.amazonaws.services.s3.model.CopyPartResult;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PartETag;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
Expand All @@ -30,8 +40,13 @@
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.StringUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static org.apache.nifi.processors.aws.util.RegionUtilV1.S3_REGION;

Expand All @@ -40,6 +55,7 @@
@CapabilityDescription("Copies a file from one bucket and key to another in AWS S3")
@SeeAlso({PutS3Object.class, DeleteS3Object.class, ListS3.class, TagS3Object.class, DeleteS3Object.class, FetchS3Object.class})
public class CopyS3Object extends AbstractS3Processor {
public static final long MULTIPART_THRESHOLD = 5L * 1024L * 1024L * 1024L;

static final PropertyDescriptor SOURCE_BUCKET = new PropertyDescriptor.Builder()
.fromPropertyDescriptor(BUCKET_WITH_DEFAULT_VALUE)
Expand Down Expand Up @@ -120,28 +136,129 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final String destinationBucket = context.getProperty(DESTINATION_BUCKET).evaluateAttributeExpressions(flowFile).getValue();
final String destinationKey = context.getProperty(DESTINATION_KEY).evaluateAttributeExpressions(flowFile).getValue();



final AtomicReference<String> multipartIdRef = new AtomicReference<>();
boolean multipartUploadRequired = false;

try {
final CopyObjectRequest request = new CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
final GetObjectMetadataRequest sourceMetadataRequest = new GetObjectMetadataRequest(sourceBucket, sourceKey);
final ObjectMetadata metadataResult = s3.getObjectMetadata(sourceMetadataRequest);
final long contentLength = metadataResult.getContentLength();
multipartUploadRequired = contentLength > MULTIPART_THRESHOLD;
final AccessControlList acl = createACL(context, flowFile);
if (acl != null) {
request.setAccessControlList(acl);
}

final CannedAccessControlList cannedAccessControlList = createCannedACL(context, flowFile);
if (cannedAccessControlList != null) {
request.setCannedAccessControlList(cannedAccessControlList);
}

s3.copyObject(request);
if (multipartUploadRequired) {
copyMultipart(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket,
destinationKey, multipartIdRef, contentLength);
} else {
copyObject(s3, acl, cannedAccessControlList, sourceBucket, sourceKey, destinationBucket, destinationKey);
}
session.getProvenanceReporter().send(flowFile, getTransitUrl(destinationBucket, destinationKey));
session.transfer(flowFile, REL_SUCCESS);
} catch (final IllegalArgumentException | AmazonClientException e) {
} catch (final Exception e) {
if (multipartUploadRequired && StringUtils.isNotEmpty(multipartIdRef.get())) {
try {
final AbortMultipartUploadRequest abortRequest = new AbortMultipartUploadRequest(destinationBucket, destinationKey, multipartIdRef.get());
s3.abortMultipartUpload(abortRequest);
} catch (final AmazonS3Exception s3e) {
getLogger().warn("Abort Multipart Upload failed for Bucket [{}] Key [{}]", destinationBucket, destinationKey, s3e);
}
}

flowFile = extractExceptionDetails(e, session, flowFile);
getLogger().error("Failed to copy S3 object from Bucket [{}] Key [{}]", sourceBucket, sourceKey, e);
session.transfer(flowFile, REL_FAILURE);
}
}

/*
 * Portions of this method are derived from the multipart copy example published in the
 * official AWS S3 documentation (provided under the Apache Software License V2):
 * https://github.com/awsdocs/aws-doc-sdk-examples/blob/df606a664bf2f7cfe3abc76c187e024451d0279c/java/example_code/s3/src/main/java/aws/example/s3/LowLevelMultipartCopy.java
 */
private void copyMultipart(final AmazonS3Client s3, final AccessControlList acl, final CannedAccessControlList cannedAccessControlList,
                           final String sourceBucket, final String sourceKey,
                           final String destinationBucket, final String destinationKey, final AtomicReference<String> multipartIdRef,
                           final long contentLength) {
    // Start a multipart upload on the destination object, carrying over any configured ACL settings
    final InitiateMultipartUploadRequest initiateRequest = new InitiateMultipartUploadRequest(destinationBucket,
            destinationKey);
    if (acl != null) {
        initiateRequest.setAccessControlList(acl);
    }
    if (cannedAccessControlList != null) {
        initiateRequest.setCannedACL(cannedAccessControlList);
    }

    final InitiateMultipartUploadResult initiateResult = s3.initiateMultipartUpload(initiateRequest);
    final String uploadId = initiateResult.getUploadId();

    // Publish the upload id immediately so the caller can abort this upload on failure
    multipartIdRef.set(uploadId);

    final List<CopyPartResult> partResults = new ArrayList<>();
    int partNumber = 1;

    // Copy the source object server-side in byte ranges of at most MULTIPART_THRESHOLD bytes each
    for (long firstByte = 0; firstByte < contentLength; firstByte += MULTIPART_THRESHOLD) {
        final long lastByte = Math.min(firstByte + MULTIPART_THRESHOLD - 1, contentLength - 1);

        final CopyPartRequest partRequest = new CopyPartRequest()
                .withSourceBucketName(sourceBucket)
                .withSourceKey(sourceKey)
                .withDestinationBucketName(destinationBucket)
                .withDestinationKey(destinationKey)
                .withUploadId(uploadId)
                .withFirstByte(firstByte)
                .withLastByte(lastByte)
                .withPartNumber(partNumber++);

        doRetryLoop(request -> partResults.add(s3.copyPart((CopyPartRequest) request)), partRequest);
    }

    // Assemble the copied parts into the final destination object
    final List<PartETag> partTags = partResults.stream()
            .map(result -> new PartETag(result.getPartNumber(), result.getETag()))
            .collect(Collectors.toList());
    final CompleteMultipartUploadRequest completeRequest = new CompleteMultipartUploadRequest(
            destinationBucket, destinationKey, uploadId, partTags);
    doRetryLoop(ignored -> s3.completeMultipartUpload(completeRequest), completeRequest);
}

/**
 * Executes the given S3 call, retrying when the service responds with HTTP 503
 * (Slow Down). Retries wait with a short exponential backoff as recommended by
 * AWS, rather than immediately re-issuing the request. Any other failure, or a
 * 503 received after the retry budget is exhausted, is propagated to the caller.
 *
 * @param consumer the S3 operation to perform
 * @param request the request handed to the consumer
 * @throws AmazonS3Exception when the call fails with a non-retryable status, or 503 persists after all retries
 */
private void doRetryLoop(Consumer<AmazonWebServiceRequest> consumer, AmazonWebServiceRequest request) {
    final int maxRetries = 3;
    final int serviceUnavailable = 503; // S3 returns 503 Slow Down when the request rate is too high

    for (int attempt = 0; ; attempt++) {
        try {
            consumer.accept(request);
            return;
        } catch (AmazonS3Exception e) {
            if (e.getStatusCode() != serviceUnavailable || attempt >= maxRetries) {
                throw e;
            }
            try {
                // Exponential backoff before retrying: 100 ms, 200 ms, 400 ms
                Thread.sleep(100L << attempt);
            } catch (InterruptedException ie) {
                // Restore the interrupt flag and give up with the original failure
                Thread.currentThread().interrupt();
                throw e;
            }
        }
    }
}

/**
 * Copies an object below the multipart threshold using a single server-side
 * copy request, applying any configured ACL settings to the destination object.
 */
private void copyObject(final AmazonS3Client s3, final AccessControlList acl,
                        final CannedAccessControlList cannedAcl,
                        final String sourceBucket, final String sourceKey,
                        final String destinationBucket, final String destinationKey) {
    final CopyObjectRequest copyRequest = new CopyObjectRequest(sourceBucket, sourceKey, destinationBucket, destinationKey);
    if (acl != null) {
        copyRequest.setAccessControlList(acl);
    }
    if (cannedAcl != null) {
        copyRequest.setCannedAccessControlList(cannedAcl);
    }
    s3.copyObject(copyRequest);
}

private String getTransitUrl(final String destinationBucket, final String destinationKey) {
final String spacer = destinationKey.startsWith("/") ? "" : "/";
return String.format("s3://%s%s%s", destinationBucket, spacer, destinationKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.AmazonS3Exception;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.ObjectMetadata;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.aws.testutil.AuthUtils;
import org.apache.nifi.provenance.ProvenanceEventRecord;
Expand Down Expand Up @@ -56,6 +58,12 @@ void setUp() {
@Override
protected AmazonS3Client createClient(final ProcessContext context, final AWSCredentialsProvider credentialsProvider, final Region region, final ClientConfiguration config,
final AwsClientBuilder.EndpointConfiguration endpointConfiguration) {
ObjectMetadata metadata = mock(ObjectMetadata.class);

when(metadata.getContentLength()).thenReturn(1000L);
when(mockS3Client.getObjectMetadata(any(GetObjectMetadataRequest.class)))
.thenReturn(metadata);

return mockS3Client;
}
};
Expand Down

0 comments on commit e37cd2b

Please sign in to comment.