Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Performance Enhancement :Enable Passing of requestPath to CRT and monitor the progress using CRTs S3MetaRequestProgress #4379

Merged
merged 12 commits into from
Sep 12, 2023
Merged
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-S3TransferManager-7ffc393.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "AWS CRT-based S3 Client",
"contributor": "",
"description": "Improved CRT client uploads by directly passing requestPath, eliminating unnecessary file I/O in the Java SDK Transfer Manager."
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@
<rxjava.version>2.2.21</rxjava.version>
<commons-codec.verion>1.15</commons-codec.verion>
<jmh.version>1.29</jmh.version>
<awscrt.version>0.24.0</awscrt.version>
<awscrt.version>0.26.0</awscrt.version>

<!--Test dependencies -->
<junit5.version>5.8.1</junit5.version>
Expand Down
5 changes: 5 additions & 0 deletions services-custom/s3-transfer-manager/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,11 @@
<version>${commons-codec.verion}</version>
<scope>test</scope>
</dependency>
<dependency>
<artifactId>wiremock-jre8</artifactId>
<groupId>com.github.tomakehurst</groupId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.transfer.s3;

import java.util.ArrayList;
import java.util.List;
import software.amazon.awssdk.transfer.s3.progress.TransferListener;

public class CaptureTransferListener implements TransferListener {
public Boolean isTransferInitiated() {
return transferInitiated;
}

public Boolean isTransferComplete() {
return transferComplete;
}

public List<Double> getRatioTransferredList() {
return ratioTransferredList;
}

public Throwable getExceptionCaught() {
return exceptionCaught;
}

private Boolean transferInitiated = false;
private Boolean transferComplete = false;

private List<Double> ratioTransferredList = new ArrayList<>();
private Throwable exceptionCaught;

@Override
public void transferInitiated(Context.TransferInitiated context) {
transferInitiated = true;
context.progressSnapshot().ratioTransferred().ifPresent(ratioTransferredList::add);

}

@Override
public void bytesTransferred(Context.BytesTransferred context) {
context.progressSnapshot().ratioTransferred().ifPresent(ratioTransferredList::add);
}

@Override
public void transferComplete(Context.TransferComplete context) {
context.progressSnapshot().ratioTransferred().ifPresent(ratioTransferredList::add);
transferComplete = true;
}

@Override
public void transferFailed(Context.TransferFailed context) {
exceptionCaught = context.exception();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package software.amazon.awssdk.transfer.s3;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName;

import java.io.IOException;
Expand All @@ -24,6 +25,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -64,11 +66,13 @@ public static void teardown() throws IOException {
@Test
void upload_file_SentCorrectly() throws IOException {
Map<String, String> metadata = new HashMap<>();
metadata.put("x-amz-meta-foobar", "FOO BAR");
CaptureTransferListener transferListener = new CaptureTransferListener();
metadata.put("x-amz-meta-foobar", "FOO BAR");
FileUpload fileUpload =
tm.uploadFile(u -> u.putObjectRequest(p -> p.bucket(TEST_BUCKET).key(TEST_KEY).metadata(metadata).checksumAlgorithm(ChecksumAlgorithm.CRC32))
.source(testFile.toPath())
.addTransferListener(LoggingTransferListener.create())
.addTransferListener(transferListener)
.build());

CompletedFileUpload completedFileUpload = fileUpload.completionFuture().join();
Expand All @@ -83,17 +87,29 @@ void upload_file_SentCorrectly() throws IOException {
assertThat(obj.response().responseMetadata().requestId()).isNotNull();
assertThat(obj.response().metadata()).containsEntry("foobar", "FOO BAR");
assertThat(fileUpload.progress().snapshot().sdkResponse()).isPresent();
assertListenerForSuccessfulTransferComplete(transferListener);
}

private static void assertListenerForSuccessfulTransferComplete(CaptureTransferListener transferListener) {
assertThat(transferListener.isTransferInitiated()).isTrue();
assertThat(transferListener.isTransferInitiated()).isTrue();
assertThat(transferListener.getRatioTransferredList()).isNotEmpty();
assertThat(transferListener.getRatioTransferredList().contains(0.0));
assertThat(transferListener.getRatioTransferredList().contains(100.0));
assertThat(transferListener.getExceptionCaught()).isNull();
}

@Test
void upload_asyncRequestBody_SentCorrectly() throws IOException {
String content = UUID.randomUUID().toString();
CaptureTransferListener transferListener = new CaptureTransferListener();

Upload upload =
tm.upload(UploadRequest.builder()
.putObjectRequest(b -> b.bucket(TEST_BUCKET).key(TEST_KEY))
.requestBody(AsyncRequestBody.fromString(content))
.addTransferListener(LoggingTransferListener.create())
.addTransferListener(transferListener)
.build());

CompletedUpload completedUpload = upload.completionFuture().join();
Expand All @@ -107,5 +123,27 @@ void upload_asyncRequestBody_SentCorrectly() throws IOException {
.isEqualTo(ChecksumUtils.computeCheckSum(obj));
assertThat(obj.response().responseMetadata().requestId()).isNotNull();
assertThat(upload.progress().snapshot().sdkResponse()).isPresent();
assertListenerForSuccessfulTransferComplete(transferListener);

}

@Test
void upload_file_Interupted_CancelsTheListener() throws IOException, InterruptedException {
Map<String, String> metadata = new HashMap<>();
CaptureTransferListener transferListener = new CaptureTransferListener();
metadata.put("x-amz-meta-foobar", "FOO BAR");
FileUpload fileUpload =
tm.uploadFile(u -> u.putObjectRequest(p -> p.bucket(TEST_BUCKET).key(TEST_KEY).metadata(metadata).checksumAlgorithm(ChecksumAlgorithm.CRC32))
.source(testFile.toPath())
.addTransferListener(LoggingTransferListener.create())
.addTransferListener(transferListener)
.build());

fileUpload.completionFuture().cancel(true);
assertThat(transferListener.isTransferInitiated()).isTrue();
assertThat(transferListener.isTransferComplete()).isFalse();
assertThat(transferListener.getExceptionCaught()).isInstanceOf(CancellationException.class);
assertThat(transferListener.getRatioTransferredList().get(transferListener.getRatioTransferredList().size() - 1))
.isNotEqualTo(100.0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
package software.amazon.awssdk.transfer.s3.internal;

import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.SDK_HTTP_EXECUTION_ATTRIBUTES;
import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.CRT_PROGRESS_LISTENER;
import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PAUSE_RESUME_TOKEN;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE;
import static software.amazon.awssdk.transfer.s3.internal.GenericS3TransferManager.DEFAULT_FILE_UPLOAD_CHUNK_SIZE;
import static software.amazon.awssdk.transfer.s3.internal.GenericS3TransferManager.assertNotUnsupportedArn;
import static software.amazon.awssdk.transfer.s3.internal.utils.FileUtils.fileNotModified;

Expand All @@ -27,7 +27,6 @@
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
import software.amazon.awssdk.crt.s3.ResumeToken;
import software.amazon.awssdk.http.SdkHttpExecutionAttributes;
import software.amazon.awssdk.services.s3.S3AsyncClient;
Expand Down Expand Up @@ -65,29 +64,25 @@ public FileUpload uploadFile(UploadFileRequest uploadFileRequest) {
Validate.paramNotNull(uploadFileRequest, "uploadFileRequest");
S3MetaRequestPauseObservable observable = new S3MetaRequestPauseObservable();

AsyncRequestBody requestBody =
FileAsyncRequestBody.builder()
.path(uploadFileRequest.source())
.chunkSizeInBytes(DEFAULT_FILE_UPLOAD_CHUNK_SIZE)
.build();
Long fileContentLength = AsyncRequestBody.fromFile(uploadFileRequest.source()).contentLength().orElse(null);
TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadFileRequest, fileContentLength);

Consumer<SdkHttpExecutionAttributes.Builder> attachObservable =
b -> b.put(METAREQUEST_PAUSE_OBSERVABLE, observable);
b -> b.put(METAREQUEST_PAUSE_OBSERVABLE, observable)
.put(CRT_PROGRESS_LISTENER, progressUpdater.crtProgressListener());

PutObjectRequest putObjectRequest = attachSdkAttribute(uploadFileRequest.putObjectRequest(), attachObservable);

CompletableFuture<CompletedFileUpload> returnFuture = new CompletableFuture<>();

TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadFileRequest, requestBody);
progressUpdater.transferInitiated();
requestBody = progressUpdater.wrapRequestBody(requestBody);
progressUpdater.registerCompletion(returnFuture);

try {
assertNotUnsupportedArn(putObjectRequest.bucket(), "upload");

CompletableFuture<PutObjectResponse> crtFuture =
s3AsyncClient.putObject(putObjectRequest, requestBody);
s3AsyncClient.putObject(putObjectRequest, uploadFileRequest.source());

// Forward upload cancellation to CRT future
CompletableFutureUtils.forwardExceptionTo(returnFuture, crtFuture);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ public Upload upload(UploadRequest uploadRequest) {

CompletableFuture<CompletedUpload> returnFuture = new CompletableFuture<>();

TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadRequest, requestBody);
TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadRequest,
requestBody.contentLength().orElse(null));
progressUpdater.transferInitiated();
requestBody = progressUpdater.wrapRequestBody(requestBody);
progressUpdater.registerCompletion(returnFuture);
Expand Down Expand Up @@ -159,7 +160,8 @@ public FileUpload uploadFile(UploadFileRequest uploadFileRequest) {

CompletableFuture<CompletedFileUpload> returnFuture = new CompletableFuture<>();

TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadFileRequest, requestBody);
TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadFileRequest,
requestBody.contentLength().orElse(null));
progressUpdater.transferInitiated();
requestBody = progressUpdater.wrapRequestBody(requestBody);
progressUpdater.registerCompletion(returnFuture);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.listener.AsyncRequestBodyListener;
import software.amazon.awssdk.core.async.listener.AsyncResponseTransformerListener;
import software.amazon.awssdk.core.async.listener.PublisherListener;
import software.amazon.awssdk.crt.s3.S3MetaRequestProgress;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.transfer.s3.model.CompletedObjectTransfer;
import software.amazon.awssdk.transfer.s3.model.TransferObjectRequest;
Expand All @@ -43,10 +45,10 @@ public class TransferProgressUpdater {
private final CompletableFuture<Void> endOfStreamFuture;

public TransferProgressUpdater(TransferObjectRequest request,
AsyncRequestBody requestBody) {
Long contentLength) {
DefaultTransferProgressSnapshot.Builder snapshotBuilder = DefaultTransferProgressSnapshot.builder();
snapshotBuilder.transferredBytes(0L);
getContentLengthSafe(requestBody).ifPresent(snapshotBuilder::totalBytes);
Optional.ofNullable(contentLength).ifPresent(snapshotBuilder::totalBytes);
TransferProgressSnapshot snapshot = snapshotBuilder.build();
progress = new DefaultTransferProgress(snapshot);
context = TransferListenerContext.builder()
Expand Down Expand Up @@ -95,6 +97,31 @@ public void subscriberOnComplete() {
});
}

public PublisherListener<S3MetaRequestProgress> crtProgressListener() {

return new PublisherListener<S3MetaRequestProgress>() {
@Override
public void publisherSubscribe(Subscriber<? super S3MetaRequestProgress> subscriber) {
resetBytesTransferred();
}

@Override
public void subscriberOnNext(S3MetaRequestProgress s3MetaRequestProgress) {
incrementBytesTransferred(s3MetaRequestProgress.getBytesTransferred());
}

@Override
public void subscriberOnError(Throwable t) {
transferFailed(t);
}

@Override
public void subscriberOnComplete() {
endOfStreamFuture.complete(null);
}
};
}

public <ResultT> AsyncResponseTransformer<GetObjectResponse, ResultT> wrapResponseTransformer(
AsyncResponseTransformer<GetObjectResponse, ResultT> responseTransformer) {
return AsyncResponseTransformerListener.wrap(
Expand Down Expand Up @@ -138,7 +165,7 @@ private void resetBytesTransferred() {
progress.updateAndGet(b -> b.transferredBytes(0L));
}

private void incrementBytesTransferred(int numBytes) {
private void incrementBytesTransferred(long numBytes) {
TransferProgressSnapshot snapshot = progress.updateAndGet(b -> {
b.transferredBytes(b.getTransferredBytes() + numBytes);
});
Expand Down Expand Up @@ -181,18 +208,4 @@ private void transferFailed(Throwable t) {
.exception(t)
.build());
}

private static Optional<Long> getContentLengthSafe(AsyncRequestBody requestBody) {
if (requestBody == null) {
return Optional.empty();
}
// requestBody.contentLength() may throw if the file does not exist.
// We ignore any potential exception here to defer failure
// to the s3CrtAsyncClient call and its associated future.
try {
return requestBody.contentLength();
} catch (Exception ignored) {
return Optional.empty();
}
}
}
Loading
Loading