From 0e284411e40fb546e13549d3c587bded51802f7e Mon Sep 17 00:00:00 2001 From: John Viegas Date: Thu, 31 Aug 2023 11:45:55 -0700 Subject: [PATCH 1/8] Performance Enhancement :Enable Passing of requestPath to CRT, Monitor Progress Updates for putObject Operations --- .../bugfix-S3TransferManager-7ffc393.json | 6 ++ pom.xml | 2 +- .../s3/internal/CrtS3TransferManager.java | 18 ++--- .../progress/TransferProgressUpdater.java | 35 ++++++++ .../s3/internal/crt/S3CrtAsyncHttpClient.java | 17 +++- .../internal/crt/S3CrtProgressListener.java | 80 +++++++++++++++++++ .../crt/S3CrtResponseHandlerAdapter.java | 17 +++- .../S3InternalSdkHttpExecutionAttribute.java | 9 +++ .../crt/S3CrtResponseHandlerAdapterTest.java | 4 +- 9 files changed, 168 insertions(+), 20 deletions(-) create mode 100644 .changes/next-release/bugfix-S3TransferManager-7ffc393.json create mode 100644 services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtProgressListener.java diff --git a/.changes/next-release/bugfix-S3TransferManager-7ffc393.json b/.changes/next-release/bugfix-S3TransferManager-7ffc393.json new file mode 100644 index 000000000000..8b7a29abf28a --- /dev/null +++ b/.changes/next-release/bugfix-S3TransferManager-7ffc393.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "S3 Transfer Manager", + "contributor": "", + "description": "Fix for [Issue#4168](https://github.com/aws/aws-sdk-java-v2/issues/4168) by enabling the direct passing of requestPath to the AWS Common Runtime (CRT). This ensures that Java SDK Transfer Manager no longer performs file I/O when using CRT, regardless." +} diff --git a/pom.xml b/pom.xml index d9809eea43f0..0e18e170629c 100644 --- a/pom.xml +++ b/pom.xml @@ -115,7 +115,7 @@ 2.2.21 1.15 1.29 - 0.24.0 + 0.25.1 5.8.1 diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManager.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManager.java index f118c84693f2..be6900ec5c89 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManager.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManager.java @@ -17,8 +17,9 @@ import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.SDK_HTTP_EXECUTION_ATTRIBUTES; import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PAUSE_RESUME_TOKEN; +import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PROGRESS_LISTENER; import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE; -import static software.amazon.awssdk.transfer.s3.internal.GenericS3TransferManager.DEFAULT_FILE_UPLOAD_CHUNK_SIZE; +import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SOURCE_REQ_PATH; import static software.amazon.awssdk.transfer.s3.internal.GenericS3TransferManager.assertNotUnsupportedArn; import static software.amazon.awssdk.transfer.s3.internal.utils.FileUtils.fileNotModified; @@ -27,7 +28,6 @@ import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration; import software.amazon.awssdk.core.async.AsyncRequestBody; -import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody; import software.amazon.awssdk.crt.s3.ResumeToken; import software.amazon.awssdk.http.SdkHttpExecutionAttributes; import software.amazon.awssdk.services.s3.S3AsyncClient; @@ -65,29 +65,25 @@ public FileUpload uploadFile(UploadFileRequest uploadFileRequest) { Validate.paramNotNull(uploadFileRequest, "uploadFileRequest"); S3MetaRequestPauseObservable observable = new S3MetaRequestPauseObservable(); - AsyncRequestBody requestBody = - FileAsyncRequestBody.builder() - .path(uploadFileRequest.source()) - .chunkSizeInBytes(DEFAULT_FILE_UPLOAD_CHUNK_SIZE) - .build(); + TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadFileRequest, null); Consumer attachObservable = - b -> b.put(METAREQUEST_PAUSE_OBSERVABLE, observable); + b -> b.put(METAREQUEST_PAUSE_OBSERVABLE, observable) + .put(SOURCE_REQ_PATH, uploadFileRequest.source()) + .put(CRT_PROGRESS_LISTENER, progressUpdater.crtProgressListener()); PutObjectRequest putObjectRequest = attachSdkAttribute(uploadFileRequest.putObjectRequest(), attachObservable); CompletableFuture returnFuture = new CompletableFuture<>(); - TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadFileRequest, requestBody); progressUpdater.transferInitiated(); - requestBody = progressUpdater.wrapRequestBody(requestBody); progressUpdater.registerCompletion(returnFuture); try { assertNotUnsupportedArn(putObjectRequest.bucket(), "upload"); CompletableFuture crtFuture = - s3AsyncClient.putObject(putObjectRequest, requestBody); + s3AsyncClient.putObject(putObjectRequest, AsyncRequestBody.fromFile(uploadFileRequest.source())); // Forward upload cancellation to CRT future CompletableFutureUtils.forwardExceptionTo(returnFuture, crtFuture); diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java index 64be49a92388..0019aaa7bb17 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java @@ -25,6 +25,8 @@ import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.core.async.listener.AsyncRequestBodyListener; import software.amazon.awssdk.core.async.listener.AsyncResponseTransformerListener; +import software.amazon.awssdk.core.async.listener.PublisherListener; +import software.amazon.awssdk.crt.s3.S3MetaRequestProgress; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.transfer.s3.model.CompletedObjectTransfer; import software.amazon.awssdk.transfer.s3.model.TransferObjectRequest; @@ -95,6 +97,32 @@ public void subscriberOnComplete() { }); } + public PublisherListener crtProgressListener() { + + return new PublisherListener() { + @Override + public void publisherSubscribe(Subscriber subscriber) { + resetBytesTransferred(); + } + + @Override + public void subscriberOnNext(S3MetaRequestProgress s3MetaRequestProgress) { + incrementBytesTransferred(Math.toIntExact(s3MetaRequestProgress.getBytesTransferred()), + s3MetaRequestProgress.getContentLength()); + } + + @Override + public void subscriberOnError(Throwable t) { + transferFailed(t); + } + + @Override + public void subscriberOnComplete() { + endOfStreamFuture.complete(null); + } + }; + } + public AsyncResponseTransformer wrapResponseTransformer( AsyncResponseTransformer responseTransformer) { return AsyncResponseTransformerListener.wrap( @@ -145,6 +173,13 @@ private void incrementBytesTransferred(int numBytes) { listenerInvoker.bytesTransferred(context.copy(b -> b.progressSnapshot(snapshot))); } + private void incrementBytesTransferred(int numBytes, long totalBytes) { + TransferProgressSnapshot snapshot = progress.updateAndGet(b -> { + b.transferredBytes(b.getTransferredBytes() + numBytes).totalBytes(totalBytes); + }); + listenerInvoker.bytesTransferred(context.copy(b -> b.progressSnapshot(snapshot))); + } + public void registerCompletion(CompletableFuture future) { future.whenComplete((r, t) -> { if (t == null) { diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtAsyncHttpClient.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtAsyncHttpClient.java index f8bf0d809ff1..f32c30c61ad0 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtAsyncHttpClient.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtAsyncHttpClient.java @@ -17,13 +17,16 @@ import static software.amazon.awssdk.services.s3.internal.crt.CrtChecksumUtils.checksumConfig; import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PAUSE_RESUME_TOKEN; +import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PROGRESS_LISTENER; import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.HTTP_CHECKSUM; import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE; import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.OPERATION_NAME; import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SIGNING_REGION; +import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SOURCE_REQ_PATH; import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely; import java.net.URI; +import java.nio.file.Path; import java.time.Duration; import java.util.ArrayList; import java.util.List; @@ -115,14 +118,16 @@ public CompletableFuture execute(AsyncExecuteRequest asyncRequest) { URI uri = asyncRequest.request().getUri(); HttpRequest httpRequest = toCrtRequest(asyncRequest); S3CrtResponseHandlerAdapter responseHandler = - new S3CrtResponseHandlerAdapter(executeFuture, asyncRequest.responseHandler()); + new S3CrtResponseHandlerAdapter(executeFuture, + asyncRequest.responseHandler(), + asyncRequest.httpExecutionAttributes().getAttribute(CRT_PROGRESS_LISTENER)); S3MetaRequestOptions.MetaRequestType requestType = requestType(asyncRequest); HttpChecksum httpChecksum = asyncRequest.httpExecutionAttributes().getAttribute(HTTP_CHECKSUM); ResumeToken resumeToken = asyncRequest.httpExecutionAttributes().getAttribute(CRT_PAUSE_RESUME_TOKEN); Region signingRegion = asyncRequest.httpExecutionAttributes().getAttribute(SIGNING_REGION); - + Path requestFilePath = asyncRequest.httpExecutionAttributes().getAttribute(SOURCE_REQ_PATH); ChecksumConfig checksumConfig = checksumConfig(httpChecksum, requestType, s3NativeClientConfiguration.checksumValidationEnabled()); URI endpoint = getEndpoint(uri); @@ -133,7 +138,8 @@ public CompletableFuture execute(AsyncExecuteRequest asyncRequest) { .withChecksumConfig(checksumConfig) .withEndpoint(endpoint) .withResponseHandler(responseHandler) - .withResumeToken(resumeToken); + .withResumeToken(resumeToken) + .withRequestFilePath(requestFilePath); // Create a new SigningConfig object only if the signing region has changed from the previously configured region. if (signingRegion != null && !s3ClientOptions.getRegion().equals(signingRegion.id())) { @@ -196,6 +202,8 @@ private static void addCancelCallback(CompletableFuture executeFuture, private static HttpRequest toCrtRequest(AsyncExecuteRequest asyncRequest) { SdkHttpRequest sdkRequest = asyncRequest.request(); + Path requestFilePath = asyncRequest.httpExecutionAttributes().getAttribute(SOURCE_REQ_PATH); + String method = sdkRequest.method().name(); String encodedPath = sdkRequest.encodedPath(); if (encodedPath == null || encodedPath.isEmpty()) { @@ -208,8 +216,9 @@ private static HttpRequest toCrtRequest(AsyncExecuteRequest asyncRequest) { HttpHeader[] crtHeaderArray = createHttpHeaderList(asyncRequest).toArray(new HttpHeader[0]); + S3CrtRequestBodyStreamAdapter sdkToCrtRequestPublisher = - new S3CrtRequestBodyStreamAdapter(asyncRequest.requestContentPublisher()); + requestFilePath == null ? new S3CrtRequestBodyStreamAdapter(asyncRequest.requestContentPublisher()) : null; return new HttpRequest(method, encodedPath + encodedQueryString, crtHeaderArray, sdkToCrtRequestPublisher); } diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtProgressListener.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtProgressListener.java new file mode 100644 index 000000000000..0b216b5b7642 --- /dev/null +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtProgressListener.java @@ -0,0 +1,80 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.s3.internal.crt; + +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.core.async.listener.PublisherListener; +import software.amazon.awssdk.crt.s3.S3MetaRequestProgress; + +/** + * S3CrtProgressListener delegates events to the underlying delegateListener if defined, avoiding null checks for API when using + * S3MetaRequestProgress for GET calls. + */ +@SdkInternalApi +public final class S3CrtProgressListener implements PublisherListener { + + PublisherListener delegateListener; + + private S3CrtProgressListener(Builder builder) { + this.delegateListener = builder.delegateListener; + } + + static Builder builder() { + return new Builder(); + } + + @Override + public void subscriberOnNext(S3MetaRequestProgress s3MetaRequestProgress) { + if (delegateListener != null) { + delegateListener.subscriberOnNext(s3MetaRequestProgress); + } + } + + @Override + public void subscriberOnComplete() { + if (delegateListener != null) { + delegateListener.subscriberOnComplete(); + } + } + + @Override + public void subscriberOnError(Throwable t) { + if (delegateListener != null) { + delegateListener.subscriberOnError(t); + } + } + + @Override + public void subscriptionCancel() { + if (delegateListener != null) { + delegateListener.subscriptionCancel(); + } + } + + static class Builder { + + PublisherListener delegateListener; + + Builder delegateListener(PublisherListener delegateListener) { + this.delegateListener = delegateListener; + return this; + } + + public S3CrtProgressListener build() { + return new S3CrtProgressListener(this); + } + } +} \ No newline at end of file diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapter.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapter.java index f743c0f3bc3c..e3c028e26fe4 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapter.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapter.java @@ -20,11 +20,13 @@ import java.nio.ByteBuffer; import java.util.concurrent.CompletableFuture; import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.core.async.listener.PublisherListener; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.crt.CRT; import software.amazon.awssdk.crt.http.HttpHeader; import software.amazon.awssdk.crt.s3.S3FinishedResponseContext; import software.amazon.awssdk.crt.s3.S3MetaRequest; +import software.amazon.awssdk.crt.s3.S3MetaRequestProgress; import software.amazon.awssdk.crt.s3.S3MetaRequestResponseHandler; import software.amazon.awssdk.http.SdkCancellationException; import software.amazon.awssdk.http.SdkHttpResponse; @@ -46,9 +48,14 @@ public final class S3CrtResponseHandlerAdapter implements S3MetaRequestResponseH private final SdkHttpResponse.Builder respBuilder = SdkHttpResponse.builder(); private volatile S3MetaRequest metaRequest; - public S3CrtResponseHandlerAdapter(CompletableFuture executeFuture, SdkAsyncHttpResponseHandler responseHandler) { + private final PublisherListener progressListener; + + public S3CrtResponseHandlerAdapter(CompletableFuture executeFuture, + SdkAsyncHttpResponseHandler responseHandler, + PublisherListener progressListener) { this.resultFuture = executeFuture; this.responseHandler = responseHandler; + this.progressListener = S3CrtProgressListener.builder().delegateListener(progressListener).build(); } @Override @@ -103,7 +110,7 @@ private void onSuccessfulResponseComplete() { failResponseHandlerAndFuture(failure); return; } - + this.progressListener.subscriberOnComplete(); completeFutureAndCloseRequest(); }); } @@ -145,6 +152,7 @@ private void onErrorResponseComplete(byte[] errorPayload) { } private void failResponseHandlerAndFuture(Throwable exception) { + this.progressListener.subscriberOnError(exception); resultFuture.completeExceptionally(exception); runAndLogError(log.logger(), "Exception thrown in SdkAsyncHttpResponseHandler#onError, ignoring", () -> responseHandler.onError(exception)); @@ -159,4 +167,9 @@ private static boolean isErrorResponse(int responseStatus) { public void metaRequest(S3MetaRequest s3MetaRequest) { metaRequest = s3MetaRequest; } + + @Override + public void onProgress(S3MetaRequestProgress progress) { + this.progressListener.subscriberOnNext(progress); + } } diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3InternalSdkHttpExecutionAttribute.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3InternalSdkHttpExecutionAttribute.java index f7b817ab9ad2..41f2151f439c 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3InternalSdkHttpExecutionAttribute.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3InternalSdkHttpExecutionAttribute.java @@ -15,7 +15,9 @@ package software.amazon.awssdk.services.s3.internal.crt; +import java.nio.file.Path; import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.core.async.listener.PublisherListener; import software.amazon.awssdk.core.interceptor.trait.HttpChecksum; import software.amazon.awssdk.crt.s3.ResumeToken; import software.amazon.awssdk.http.SdkHttpExecutionAttribute; @@ -41,6 +43,13 @@ public final class S3InternalSdkHttpExecutionAttribute extends SdkHttpExecuti public static final S3InternalSdkHttpExecutionAttribute SIGNING_REGION = new S3InternalSdkHttpExecutionAttribute<>(Region.class); + public static final S3InternalSdkHttpExecutionAttribute SOURCE_REQ_PATH = + new S3InternalSdkHttpExecutionAttribute<>(Path.class); + + + public static final S3InternalSdkHttpExecutionAttribute CRT_PROGRESS_LISTENER = + new S3InternalSdkHttpExecutionAttribute<>(PublisherListener.class); + private S3InternalSdkHttpExecutionAttribute(Class valueClass) { super(valueClass); } diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapterTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapterTest.java index d6f263886c05..681eef34b9fa 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapterTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapterTest.java @@ -61,8 +61,8 @@ public void setup() { future = new CompletableFuture<>(); sdkResponseHandler = new TestResponseHandler(); responseHandlerAdapter = new S3CrtResponseHandlerAdapter(future, - sdkResponseHandler); - + sdkResponseHandler, + null); responseHandlerAdapter.metaRequest(s3MetaRequest); } From 66467b5ffb42d0febeda3d4da9a07fc3cfc2c843 Mon Sep 17 00:00:00 2001 From: John Viegas Date: Wed, 6 Sep 2023 13:55:10 -0700 Subject: [PATCH 2/8] updated the change log --- .changes/next-release/bugfix-S3TransferManager-7ffc393.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changes/next-release/bugfix-S3TransferManager-7ffc393.json b/.changes/next-release/bugfix-S3TransferManager-7ffc393.json index 8b7a29abf28a..e96994be05c1 100644 --- a/.changes/next-release/bugfix-S3TransferManager-7ffc393.json +++ b/.changes/next-release/bugfix-S3TransferManager-7ffc393.json @@ -2,5 +2,5 @@ "type": "bugfix", "category": "S3 Transfer Manager", "contributor": "", - "description": "Fix for [Issue#4168](https://github.com/aws/aws-sdk-java-v2/issues/4168) by enabling the direct passing of requestPath to the AWS Common Runtime (CRT). This ensures that Java SDK Transfer Manager no longer performs file I/O when using CRT, regardless." + "description": "Improved CRT client uploads by directly passing requestPath, eliminating unnecessary file I/O in the Java SDK Transfer Manager." } From 6af33970af3c5b88ade8cc5c8cab275a75647277 Mon Sep 17 00:00:00 2001 From: John Viegas Date: Thu, 7 Sep 2023 09:30:06 -0700 Subject: [PATCH 3/8] updated with Matts review comments --- .../bugfix-S3TransferManager-7ffc393.json | 2 +- .../s3/internal/CrtS3TransferManager.java | 13 +-- .../s3/internal/GenericS3TransferManager.java | 6 +- .../progress/TransferProgressUpdater.java | 14 +--- ...ansferManagerUploadPauseAndResumeTest.java | 12 +-- .../S3CrtClientPutObjectIntegrationTest.java | 23 ++++++ .../crt/S3CrtSdkHttpExecutionAttribute.java | 38 +++++++++ ...ContentLengthOnlyAsyncFileRequestBody.java | 54 +++++++++++++ .../internal/crt/DefaultS3CrtAsyncClient.java | 19 +++++ .../s3/internal/crt/S3CrtAsyncHttpClient.java | 10 +-- .../internal/crt/S3CrtProgressListener.java | 80 ------------------- .../crt/S3CrtResponseHandlerAdapter.java | 5 +- .../S3InternalSdkHttpExecutionAttribute.java | 8 +- 13 files changed, 166 insertions(+), 118 deletions(-) create mode 100644 services/s3/src/main/java/software/amazon/awssdk/services/s3/crt/S3CrtSdkHttpExecutionAttribute.java create mode 100644 services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/CrtContentLengthOnlyAsyncFileRequestBody.java delete mode 100644 services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtProgressListener.java diff --git a/.changes/next-release/bugfix-S3TransferManager-7ffc393.json b/.changes/next-release/bugfix-S3TransferManager-7ffc393.json index e96994be05c1..b251e7394dc4 100644 --- a/.changes/next-release/bugfix-S3TransferManager-7ffc393.json +++ b/.changes/next-release/bugfix-S3TransferManager-7ffc393.json @@ -1,6 +1,6 @@ { "type": "bugfix", - "category": "S3 Transfer Manager", + "category": "AWS CRT-based S3 Client", "contributor": "", "description": "Improved CRT client uploads by directly passing requestPath, eliminating unnecessary file I/O in the Java SDK Transfer Manager." } diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManager.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManager.java index be6900ec5c89..2d7b809d8251 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManager.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/CrtS3TransferManager.java @@ -16,13 +16,13 @@ package software.amazon.awssdk.transfer.s3.internal; import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.SDK_HTTP_EXECUTION_ATTRIBUTES; +import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.CRT_PROGRESS_LISTENER; +import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE; import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PAUSE_RESUME_TOKEN; -import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PROGRESS_LISTENER; -import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE; -import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SOURCE_REQ_PATH; import static software.amazon.awssdk.transfer.s3.internal.GenericS3TransferManager.assertNotUnsupportedArn; import static software.amazon.awssdk.transfer.s3.internal.utils.FileUtils.fileNotModified; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Consumer; import software.amazon.awssdk.annotations.SdkInternalApi; @@ -31,6 +31,7 @@ import software.amazon.awssdk.crt.s3.ResumeToken; import software.amazon.awssdk.http.SdkHttpExecutionAttributes; import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.internal.crt.CrtContentLengthOnlyAsyncFileRequestBody; import software.amazon.awssdk.services.s3.internal.crt.S3MetaRequestPauseObservable; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.PutObjectRequest; @@ -65,11 +66,11 @@ public FileUpload uploadFile(UploadFileRequest uploadFileRequest) { Validate.paramNotNull(uploadFileRequest, "uploadFileRequest"); S3MetaRequestPauseObservable observable = new S3MetaRequestPauseObservable(); - TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadFileRequest, null); + Long fileContentLength = AsyncRequestBody.fromFile(uploadFileRequest.source()).contentLength().orElse(null); + TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadFileRequest, fileContentLength); Consumer attachObservable = b -> b.put(METAREQUEST_PAUSE_OBSERVABLE, observable) - .put(SOURCE_REQ_PATH, uploadFileRequest.source()) .put(CRT_PROGRESS_LISTENER, progressUpdater.crtProgressListener()); PutObjectRequest putObjectRequest = attachSdkAttribute(uploadFileRequest.putObjectRequest(), attachObservable); @@ -83,7 +84,7 @@ public FileUpload uploadFile(UploadFileRequest uploadFileRequest) { assertNotUnsupportedArn(putObjectRequest.bucket(), "upload"); CompletableFuture crtFuture = - s3AsyncClient.putObject(putObjectRequest, AsyncRequestBody.fromFile(uploadFileRequest.source())); + s3AsyncClient.putObject(putObjectRequest, uploadFileRequest.source()); // Forward upload cancellation to CRT future CompletableFutureUtils.forwardExceptionTo(returnFuture, crtFuture); diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java index 83d63a2a48ff..b06d0824b709 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/GenericS3TransferManager.java @@ -120,7 +120,8 @@ public Upload upload(UploadRequest uploadRequest) { CompletableFuture returnFuture = new CompletableFuture<>(); - TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadRequest, requestBody); + TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadRequest, + requestBody.contentLength().orElse(null)); progressUpdater.transferInitiated(); requestBody = progressUpdater.wrapRequestBody(requestBody); progressUpdater.registerCompletion(returnFuture); @@ -159,7 +160,8 @@ public FileUpload uploadFile(UploadFileRequest uploadFileRequest) { CompletableFuture returnFuture = new CompletableFuture<>(); - TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadFileRequest, requestBody); + TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadFileRequest, + requestBody.contentLength().orElse(null)); progressUpdater.transferInitiated(); requestBody = progressUpdater.wrapRequestBody(requestBody); progressUpdater.registerCompletion(returnFuture); diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java index 0019aaa7bb17..8b5f7cce741d 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java @@ -45,10 +45,10 @@ public class TransferProgressUpdater { private final CompletableFuture endOfStreamFuture; public TransferProgressUpdater(TransferObjectRequest request, - AsyncRequestBody requestBody) { + Long contentLength) { DefaultTransferProgressSnapshot.Builder snapshotBuilder = DefaultTransferProgressSnapshot.builder(); snapshotBuilder.transferredBytes(0L); - getContentLengthSafe(requestBody).ifPresent(snapshotBuilder::totalBytes); + Optional.ofNullable(contentLength).ifPresent(snapshotBuilder::totalBytes); TransferProgressSnapshot snapshot = snapshotBuilder.build(); progress = new DefaultTransferProgress(snapshot); context = TransferListenerContext.builder() @@ -107,8 +107,7 @@ public void publisherSubscribe(Subscriber subscri @Override public void subscriberOnNext(S3MetaRequestProgress s3MetaRequestProgress) { - incrementBytesTransferred(Math.toIntExact(s3MetaRequestProgress.getBytesTransferred()), - s3MetaRequestProgress.getContentLength()); + incrementBytesTransferred(Math.toIntExact(s3MetaRequestProgress.getBytesTransferred())); } @Override @@ -173,13 +172,6 @@ private void incrementBytesTransferred(int numBytes) { listenerInvoker.bytesTransferred(context.copy(b -> b.progressSnapshot(snapshot))); } - private void incrementBytesTransferred(int numBytes, long totalBytes) { - TransferProgressSnapshot snapshot = progress.updateAndGet(b -> { - b.transferredBytes(b.getTransferredBytes() + numBytes).totalBytes(totalBytes); - }); - listenerInvoker.bytesTransferred(context.copy(b -> b.progressSnapshot(snapshot))); - } - public void registerCompletion(CompletableFuture future) { future.whenComplete((r, t) -> { if (t == null) { diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3TransferManagerUploadPauseAndResumeTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3TransferManagerUploadPauseAndResumeTest.java index d1d998c055d8..351fd03f7495 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3TransferManagerUploadPauseAndResumeTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3TransferManagerUploadPauseAndResumeTest.java @@ -28,6 +28,7 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; import java.nio.file.Files; +import java.nio.file.Path; import java.time.Instant; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -87,7 +88,7 @@ void resumeUploadFile_noResumeToken_shouldUploadFromBeginning() { .build(); - when(mockS3Crt.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class))) + when(mockS3Crt.putObject(any(PutObjectRequest.class), any(Path.class))) .thenReturn(CompletableFuture.completedFuture(response)); CompletedFileUpload completedFileUpload = tm.resumeUploadFile(r -> r.fileLength(fileLength) @@ -112,7 +113,7 @@ void resumeUploadFile_fileModified_shouldAbortExistingAndUploadFromBeginning() { .build(); - when(mockS3Crt.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class))) + when(mockS3Crt.putObject(any(PutObjectRequest.class), any(Path.class))) .thenReturn(CompletableFuture.completedFuture(response)); when(mockS3Crt.abortMultipartUpload(any(AbortMultipartUploadRequest.class))) @@ -151,9 +152,10 @@ void resumeUploadFile_hasValidResumeToken_shouldResumeUpload() { .build(); - when(mockS3Crt.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class))) + when(mockS3Crt.putObject(any(PutObjectRequest.class), any(Path.class))) .thenReturn(CompletableFuture.completedFuture(response)); + String multipartId = "someId"; long totalParts = 10L; long partSizeInBytes = 8 * MB; @@ -169,7 +171,7 @@ void resumeUploadFile_hasValidResumeToken_shouldResumeUpload() { ArgumentCaptor putObjectRequestArgumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); - verify(mockS3Crt).putObject(putObjectRequestArgumentCaptor.capture(), any(AsyncRequestBody.class)); + verify(mockS3Crt).putObject(putObjectRequestArgumentCaptor.capture(), any(Path.class)); PutObjectRequest actualRequest = putObjectRequestArgumentCaptor.getValue(); AwsRequestOverrideConfiguration awsRequestOverrideConfiguration = actualRequest.overrideConfiguration().get(); SdkHttpExecutionAttributes attribute = @@ -185,7 +187,7 @@ void resumeUploadFile_hasValidResumeToken_shouldResumeUpload() { private void verifyActualPutObjectRequestNotResumed() { ArgumentCaptor putObjectRequestArgumentCaptor = ArgumentCaptor.forClass(PutObjectRequest.class); - verify(mockS3Crt).putObject(putObjectRequestArgumentCaptor.capture(), any(AsyncRequestBody.class)); + verify(mockS3Crt).putObject(putObjectRequestArgumentCaptor.capture(), any(Path.class)); PutObjectRequest actualRequest = putObjectRequestArgumentCaptor.getValue(); AwsRequestOverrideConfiguration awsRequestOverrideConfiguration = actualRequest.overrideConfiguration().get(); SdkHttpExecutionAttributes attribute = diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/crt/S3CrtClientPutObjectIntegrationTest.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/crt/S3CrtClientPutObjectIntegrationTest.java index f81e700395eb..133b9da0a858 100644 --- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/crt/S3CrtClientPutObjectIntegrationTest.java +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/crt/S3CrtClientPutObjectIntegrationTest.java @@ -15,6 +15,7 @@ package software.amazon.awssdk.services.s3.crt; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName; import io.reactivex.Flowable; @@ -24,6 +25,7 @@ import java.util.List; import java.util.Optional; import java.util.Random; +import java.util.concurrent.CompletionException; import java.util.stream.Collectors; import java.util.stream.Stream; import org.assertj.core.api.Assertions; @@ -34,10 +36,12 @@ import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.core.ResponseInputStream; import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.core.sync.ResponseTransformer; import software.amazon.awssdk.crt.CrtResource; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3IntegrationTestBase; +import software.amazon.awssdk.services.s3.internal.crt.CrtContentLengthOnlyAsyncFileRequestBody; import software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncClient; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.utils.ChecksumUtils; @@ -86,6 +90,25 @@ void putObject_fileRequestBody_objectSentCorrectly() throws Exception { Assertions.assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedSum); } + @Test + void putObject_file_objectSentCorrectly() throws Exception { + s3Crt.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), testFile.toPath()).join(); + ResponseInputStream objContent = S3IntegrationTestBase.s3.getObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), + ResponseTransformer.toInputStream()); + byte[] expectedSum = ChecksumUtils.computeCheckSum(Files.newInputStream(testFile.toPath())); + Assertions.assertThat(ChecksumUtils.computeCheckSum(objContent)).isEqualTo(expectedSum); + } + + + @Test + void putObject_failsFor_CrtContentLengthOnlyAsyncFileRequestBody() { + assertThatThrownBy(() -> + s3Crt.putObject(r -> r.bucket(TEST_BUCKET).key(TEST_KEY), + new CrtContentLengthOnlyAsyncFileRequestBody(testFile.toPath())).join()) + .isInstanceOf(CompletionException.class) + .hasCauseInstanceOf(SdkClientException.class); + } + @Test void putObject_byteBufferBody_objectSentCorrectly() { byte[] data = new byte[16384]; diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/crt/S3CrtSdkHttpExecutionAttribute.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/crt/S3CrtSdkHttpExecutionAttribute.java new file mode 100644 index 000000000000..5f1e33258e65 --- /dev/null +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/crt/S3CrtSdkHttpExecutionAttribute.java @@ -0,0 +1,38 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.s3.crt; + + +import software.amazon.awssdk.annotations.SdkProtectedApi; +import software.amazon.awssdk.core.async.listener.PublisherListener; +import software.amazon.awssdk.http.SdkHttpExecutionAttribute; +import software.amazon.awssdk.services.s3.internal.crt.S3MetaRequestPauseObservable; + +@SdkProtectedApi +public final class S3CrtSdkHttpExecutionAttribute extends SdkHttpExecutionAttribute { + + public static final S3CrtSdkHttpExecutionAttribute METAREQUEST_PAUSE_OBSERVABLE = + new S3CrtSdkHttpExecutionAttribute<>(S3MetaRequestPauseObservable.class); + + public static final S3CrtSdkHttpExecutionAttribute CRT_PROGRESS_LISTENER = + new S3CrtSdkHttpExecutionAttribute<>(PublisherListener.class); + + private S3CrtSdkHttpExecutionAttribute(Class valueClass) { + super(valueClass); + } + + +} diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/CrtContentLengthOnlyAsyncFileRequestBody.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/CrtContentLengthOnlyAsyncFileRequestBody.java new file mode 100644 index 000000000000..72e8b0dba363 --- /dev/null +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/CrtContentLengthOnlyAsyncFileRequestBody.java @@ -0,0 +1,54 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.services.s3.internal.crt; + +import java.nio.ByteBuffer; +import java.nio.file.Path; +import java.util.Optional; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.core.async.AsyncRequestBody; + +@SdkInternalApi +public final class CrtContentLengthOnlyAsyncFileRequestBody implements AsyncRequestBody { + private final AsyncRequestBody asyncRequestBody; + + public CrtContentLengthOnlyAsyncFileRequestBody(Path path) { + this.asyncRequestBody = AsyncRequestBody.fromFile(path); + } + + @Override + public Optional contentLength() { + return asyncRequestBody.contentLength(); + } + + @Override + public void subscribe(Subscriber subscriber) { + subscriber.onSubscribe(new Subscription() { + @Override + public void request(long l) { + subscriber.onError(new IllegalStateException("subscription not supported")); + } + + @Override + public void cancel() { + + } + }); + + } +} diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/DefaultS3CrtAsyncClient.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/DefaultS3CrtAsyncClient.java index 284748f163bd..0ebc219faf86 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/DefaultS3CrtAsyncClient.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/DefaultS3CrtAsyncClient.java @@ -22,6 +22,7 @@ import static software.amazon.awssdk.services.s3.internal.crt.S3NativeClientConfiguration.DEFAULT_PART_SIZE_IN_BYTES; import java.net.URI; +import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -36,6 +37,7 @@ import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration; import software.amazon.awssdk.core.client.config.SdkAdvancedClientOption; import software.amazon.awssdk.core.interceptor.Context; +import software.amazon.awssdk.core.interceptor.ExecutionAttribute; import software.amazon.awssdk.core.interceptor.ExecutionAttributes; import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; import software.amazon.awssdk.core.interceptor.SdkExecutionAttribute; @@ -58,11 +60,13 @@ import software.amazon.awssdk.services.s3.model.CopyObjectResponse; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectResponse; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.awssdk.utils.Validate; @SdkInternalApi public final class DefaultS3CrtAsyncClient extends DelegatingS3AsyncClient implements S3CrtAsyncClient { + public static final ExecutionAttribute OBJECT_FILE_PATH = new ExecutionAttribute<>("objectFilePath"); private static final String CRT_CLIENT_CLASSPATH = "software.amazon.awssdk.crt.s3.S3Client"; private final CopyObjectHelper copyObjectHelper; @@ -76,6 +80,19 @@ private DefaultS3CrtAsyncClient(DefaultS3CrtClientBuilder builder) { thresholdInBytes); } + @Override + public CompletableFuture putObject(PutObjectRequest putObjectRequest, Path sourcePath) { + AwsRequestOverrideConfiguration overrideConfig = + putObjectRequest.overrideConfiguration() + .map(config -> config.toBuilder().putExecutionAttribute(OBJECT_FILE_PATH, sourcePath)) + .orElseGet(() -> AwsRequestOverrideConfiguration.builder() + .putExecutionAttribute(OBJECT_FILE_PATH, sourcePath)) + .build(); + + return putObject(putObjectRequest.toBuilder().overrideConfiguration(overrideConfig).build(), + new CrtContentLengthOnlyAsyncFileRequestBody(sourcePath)); + } + @Override public CompletableFuture copyObject(CopyObjectRequest copyObjectRequest) { return copyObjectHelper.copyObject(copyObjectRequest); @@ -308,6 +325,8 @@ public void afterMarshalling(Context.AfterMarshalling context, executionAttributes.getAttribute(SdkExecutionAttribute.OPERATION_NAME)) .put(HTTP_CHECKSUM, executionAttributes.getAttribute(SdkInternalExecutionAttribute.HTTP_CHECKSUM)) .put(SIGNING_REGION, executionAttributes.getAttribute(AwsSignerExecutionAttribute.SIGNING_REGION)) + .put(S3InternalSdkHttpExecutionAttribute.OBJECT_FILE_PATH, + executionAttributes.getAttribute(OBJECT_FILE_PATH)) .build(); // For putObject and getObject, we rely on CRT to perform checksum validation diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtAsyncHttpClient.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtAsyncHttpClient.java index f32c30c61ad0..e8974d57d0af 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtAsyncHttpClient.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtAsyncHttpClient.java @@ -15,14 +15,14 @@ package software.amazon.awssdk.services.s3.internal.crt; +import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.CRT_PROGRESS_LISTENER; +import static software.amazon.awssdk.services.s3.crt.S3CrtSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE; import static software.amazon.awssdk.services.s3.internal.crt.CrtChecksumUtils.checksumConfig; import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PAUSE_RESUME_TOKEN; -import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PROGRESS_LISTENER; import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.HTTP_CHECKSUM; -import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE; +import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.OBJECT_FILE_PATH; import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.OPERATION_NAME; import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SIGNING_REGION; -import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SOURCE_REQ_PATH; import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely; import java.net.URI; @@ -127,7 +127,7 @@ public CompletableFuture execute(AsyncExecuteRequest asyncRequest) { HttpChecksum httpChecksum = asyncRequest.httpExecutionAttributes().getAttribute(HTTP_CHECKSUM); ResumeToken resumeToken = asyncRequest.httpExecutionAttributes().getAttribute(CRT_PAUSE_RESUME_TOKEN); Region signingRegion = asyncRequest.httpExecutionAttributes().getAttribute(SIGNING_REGION); - Path requestFilePath = asyncRequest.httpExecutionAttributes().getAttribute(SOURCE_REQ_PATH); + Path requestFilePath = asyncRequest.httpExecutionAttributes().getAttribute(OBJECT_FILE_PATH); ChecksumConfig checksumConfig = checksumConfig(httpChecksum, requestType, s3NativeClientConfiguration.checksumValidationEnabled()); URI endpoint = getEndpoint(uri); @@ -202,7 +202,7 @@ private static void addCancelCallback(CompletableFuture executeFuture, private static HttpRequest toCrtRequest(AsyncExecuteRequest asyncRequest) { SdkHttpRequest sdkRequest = asyncRequest.request(); - Path requestFilePath = asyncRequest.httpExecutionAttributes().getAttribute(SOURCE_REQ_PATH); + Path requestFilePath = asyncRequest.httpExecutionAttributes().getAttribute(OBJECT_FILE_PATH); String method = sdkRequest.method().name(); String encodedPath = sdkRequest.encodedPath(); diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtProgressListener.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtProgressListener.java deleted file mode 100644 index 0b216b5b7642..000000000000 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtProgressListener.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://aws.amazon.com/apache2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package software.amazon.awssdk.services.s3.internal.crt; - -import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.core.async.listener.PublisherListener; -import software.amazon.awssdk.crt.s3.S3MetaRequestProgress; - -/** - * S3CrtProgressListener delegates events to the underlying delegateListener if defined, avoiding null checks for API when using - * S3MetaRequestProgress for GET calls. - */ -@SdkInternalApi -public final class S3CrtProgressListener implements PublisherListener { - - PublisherListener delegateListener; - - private S3CrtProgressListener(Builder builder) { - this.delegateListener = builder.delegateListener; - } - - static Builder builder() { - return new Builder(); - } - - @Override - public void subscriberOnNext(S3MetaRequestProgress s3MetaRequestProgress) { - if (delegateListener != null) { - delegateListener.subscriberOnNext(s3MetaRequestProgress); - } - } - - @Override - public void subscriberOnComplete() { - if (delegateListener != null) { - delegateListener.subscriberOnComplete(); - } - } - - @Override - public void subscriberOnError(Throwable t) { - if (delegateListener != null) { - delegateListener.subscriberOnError(t); - } - } - - @Override - public void subscriptionCancel() { - if (delegateListener != null) { - delegateListener.subscriptionCancel(); - } - } - - static class Builder { - - PublisherListener delegateListener; - - Builder delegateListener(PublisherListener delegateListener) { - this.delegateListener = delegateListener; - return this; - } - - public S3CrtProgressListener build() { - return new S3CrtProgressListener(this); - } - } -} \ No newline at end of file diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapter.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapter.java index e3c028e26fe4..ec81d624351e 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapter.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapter.java @@ -55,7 +55,7 @@ public S3CrtResponseHandlerAdapter(CompletableFuture executeFuture, PublisherListener progressListener) { this.resultFuture = executeFuture; this.responseHandler = responseHandler; - this.progressListener = S3CrtProgressListener.builder().delegateListener(progressListener).build(); + this.progressListener = progressListener == null ? new NoOpPublisherListener() : progressListener; } @Override @@ -172,4 +172,7 @@ public void metaRequest(S3MetaRequest s3MetaRequest) { public void onProgress(S3MetaRequestProgress progress) { this.progressListener.subscriberOnNext(progress); } + + private static class NoOpPublisherListener implements PublisherListener { + } } diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3InternalSdkHttpExecutionAttribute.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3InternalSdkHttpExecutionAttribute.java index 41f2151f439c..5a0115222cd7 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3InternalSdkHttpExecutionAttribute.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3InternalSdkHttpExecutionAttribute.java @@ -17,7 +17,6 @@ import java.nio.file.Path; import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.core.async.listener.PublisherListener; import software.amazon.awssdk.core.interceptor.trait.HttpChecksum; import software.amazon.awssdk.crt.s3.ResumeToken; import software.amazon.awssdk.http.SdkHttpExecutionAttribute; @@ -34,8 +33,6 @@ public final class S3InternalSdkHttpExecutionAttribute extends SdkHttpExecuti public static final S3InternalSdkHttpExecutionAttribute HTTP_CHECKSUM = new S3InternalSdkHttpExecutionAttribute<>(HttpChecksum.class); - public static final S3InternalSdkHttpExecutionAttribute METAREQUEST_PAUSE_OBSERVABLE = - new S3InternalSdkHttpExecutionAttribute<>(S3MetaRequestPauseObservable.class); public static final S3InternalSdkHttpExecutionAttribute CRT_PAUSE_RESUME_TOKEN = new S3InternalSdkHttpExecutionAttribute<>(ResumeToken.class); @@ -43,13 +40,10 @@ public final class S3InternalSdkHttpExecutionAttribute extends SdkHttpExecuti public static final S3InternalSdkHttpExecutionAttribute SIGNING_REGION = new S3InternalSdkHttpExecutionAttribute<>(Region.class); - public static final S3InternalSdkHttpExecutionAttribute SOURCE_REQ_PATH = + public static final S3InternalSdkHttpExecutionAttribute OBJECT_FILE_PATH = new S3InternalSdkHttpExecutionAttribute<>(Path.class); - public static final S3InternalSdkHttpExecutionAttribute CRT_PROGRESS_LISTENER = - new S3InternalSdkHttpExecutionAttribute<>(PublisherListener.class); - private S3InternalSdkHttpExecutionAttribute(Class valueClass) { super(valueClass); } From 65ddbcab4585f2bb5e3612143d99711b560a4e48 Mon Sep 17 00:00:00 2001 From: John Viegas Date: Fri, 8 Sep 2023 09:35:14 -0700 Subject: [PATCH 4/8] Added Integ Test for CRT Progress listener updates --- .../transfer/s3/CaptureTransferListener.java | 67 +++++++++++++++++++ ...3TransferManagerUploadIntegrationTest.java | 58 +++++++++++++++- .../progress/TransferProgressUpdater.java | 4 +- 3 files changed, 126 insertions(+), 3 deletions(-) create mode 100644 services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/CaptureTransferListener.java diff --git a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/CaptureTransferListener.java b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/CaptureTransferListener.java new file mode 100644 index 000000000000..315a378ec11a --- /dev/null +++ b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/CaptureTransferListener.java @@ -0,0 +1,67 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3; + +import java.util.ArrayList; +import java.util.List; +import software.amazon.awssdk.transfer.s3.progress.TransferListener; + +public class CaptureTransferListener implements TransferListener { + public Boolean isTransferInitiated() { + return transferInitiated; + } + + public Boolean isTransferComplete() { + return transferComplete; + } + + public List getRatioTransferredList() { + return ratioTransferredList; + } + + public Throwable getExceptionCaught() { + return exceptionCaught; + } + + private Boolean transferInitiated = false; + private Boolean transferComplete = false; + + private List ratioTransferredList = new ArrayList<>(); + private Throwable exceptionCaught; + + @Override + public void transferInitiated(Context.TransferInitiated context) { + transferInitiated = true; + context.progressSnapshot().ratioTransferred().ifPresent(ratioTransferredList::add); + + } + + @Override + public void bytesTransferred(Context.BytesTransferred context) { + context.progressSnapshot().ratioTransferred().ifPresent(ratioTransferredList::add); + } + + @Override + public void transferComplete(Context.TransferComplete context) { + context.progressSnapshot().ratioTransferred().ifPresent(ratioTransferredList::add); + transferComplete = true; + } + + @Override + public void transferFailed(Context.TransferFailed context) { + exceptionCaught = context.exception(); + } +} diff --git a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java index a1211b2f9f0b..f87059596732 100644 --- a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java +++ b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java @@ -16,6 +16,7 @@ package software.amazon.awssdk.transfer.s3; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static software.amazon.awssdk.testutils.service.S3BucketUtils.temporaryBucketName; import java.io.IOException; @@ -24,6 +25,8 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletionException; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -32,6 +35,7 @@ import software.amazon.awssdk.core.sync.ResponseTransformer; import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm; import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.NoSuchBucketException; import software.amazon.awssdk.testutils.RandomTempFile; import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload; import software.amazon.awssdk.transfer.s3.model.CompletedUpload; @@ -64,11 +68,13 @@ public static void teardown() throws IOException { @Test void upload_file_SentCorrectly() throws IOException { Map metadata = new HashMap<>(); - metadata.put("x-amz-meta-foobar", "FOO BAR"); + CaptureTransferListener transferListener = new CaptureTransferListener(); + metadata.put("x-amz-meta-foobar", "FOO BAR"); FileUpload fileUpload = tm.uploadFile(u -> u.putObjectRequest(p -> p.bucket(TEST_BUCKET).key(TEST_KEY).metadata(metadata).checksumAlgorithm(ChecksumAlgorithm.CRC32)) .source(testFile.toPath()) .addTransferListener(LoggingTransferListener.create()) + .addTransferListener(transferListener) .build()); CompletedFileUpload completedFileUpload = fileUpload.completionFuture().join(); @@ -83,17 +89,29 @@ void upload_file_SentCorrectly() throws IOException { assertThat(obj.response().responseMetadata().requestId()).isNotNull(); assertThat(obj.response().metadata()).containsEntry("foobar", "FOO BAR"); assertThat(fileUpload.progress().snapshot().sdkResponse()).isPresent(); + assertListenerForSuccessfulTransferComplete(transferListener); + } + + private static void assertListenerForSuccessfulTransferComplete(CaptureTransferListener transferListener) { + assertThat(transferListener.isTransferInitiated()).isTrue(); + assertThat(transferListener.isTransferInitiated()).isTrue(); + assertThat(transferListener.getRatioTransferredList()).isNotEmpty(); + assertThat(transferListener.getRatioTransferredList().contains(0.0)); + assertThat(transferListener.getRatioTransferredList().contains(100.0)); + assertThat(transferListener.getExceptionCaught()).isNull(); } @Test void upload_asyncRequestBody_SentCorrectly() throws IOException { String content = UUID.randomUUID().toString(); + CaptureTransferListener transferListener = new CaptureTransferListener(); Upload upload = tm.upload(UploadRequest.builder() .putObjectRequest(b -> b.bucket(TEST_BUCKET).key(TEST_KEY)) .requestBody(AsyncRequestBody.fromString(content)) .addTransferListener(LoggingTransferListener.create()) + .addTransferListener(transferListener) .build()); CompletedUpload completedUpload = upload.completionFuture().join(); @@ -107,5 +125,43 @@ void upload_asyncRequestBody_SentCorrectly() throws IOException { .isEqualTo(ChecksumUtils.computeCheckSum(obj)); assertThat(obj.response().responseMetadata().requestId()).isNotNull(); assertThat(upload.progress().snapshot().sdkResponse()).isPresent(); + assertListenerForSuccessfulTransferComplete(transferListener); + + } + + @Test + void upload_file_Interupted_CancelsTheListener() throws IOException, InterruptedException { + Map metadata = new HashMap<>(); + CaptureTransferListener transferListener = new CaptureTransferListener(); + metadata.put("x-amz-meta-foobar", "FOO BAR"); + FileUpload fileUpload = + tm.uploadFile(u -> u.putObjectRequest(p -> p.bucket(TEST_BUCKET).key(TEST_KEY).metadata(metadata).checksumAlgorithm(ChecksumAlgorithm.CRC32)) + .source(testFile.toPath()) + .addTransferListener(LoggingTransferListener.create()) + .addTransferListener(transferListener) + .build()); + + fileUpload.completionFuture().cancel(true); + assertThat(transferListener.isTransferInitiated()).isTrue(); + assertThat(transferListener.isTransferComplete()).isFalse(); + assertThat(transferListener.getExceptionCaught()).isInstanceOf(CancellationException.class); + assertThat(transferListener.getRatioTransferredList().get(transferListener.getRatioTransferredList().size() - 1)) + .isNotEqualTo(100.0); + } + + @Test + void upload_file_errorPropagatedToListener() throws IOException, InterruptedException { + Map metadata = new HashMap<>(); + CaptureTransferListener transferListener = new CaptureTransferListener(); + metadata.put("x-amz-meta-foobar", "FOO BAR"); + FileUpload fileUpload = + tm.uploadFile(u -> u.putObjectRequest(p -> p.bucket(TEST_BUCKET+TEST_BUCKET).key(TEST_KEY).metadata(metadata).checksumAlgorithm(ChecksumAlgorithm.CRC32)) + .source(testFile.toPath()) + .addTransferListener(LoggingTransferListener.create()) + .addTransferListener(transferListener) + .build()); + assertThatExceptionOfType(CompletionException.class).isThrownBy( + () -> fileUpload.completionFuture().join()); + assertThat(transferListener.getExceptionCaught()).isInstanceOf(NoSuchBucketException.class); } } diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java index 8b5f7cce741d..c6c8bb056b10 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java @@ -107,7 +107,7 @@ public void publisherSubscribe(Subscriber subscri @Override public void subscriberOnNext(S3MetaRequestProgress s3MetaRequestProgress) { - incrementBytesTransferred(Math.toIntExact(s3MetaRequestProgress.getBytesTransferred())); + incrementBytesTransferred(s3MetaRequestProgress.getBytesTransferred()); } @Override @@ -165,7 +165,7 @@ private void resetBytesTransferred() { progress.updateAndGet(b -> b.transferredBytes(0L)); } - private void incrementBytesTransferred(int numBytes) { + private void incrementBytesTransferred(long numBytes) { TransferProgressSnapshot snapshot = progress.updateAndGet(b -> { b.transferredBytes(b.getTransferredBytes() + numBytes); }); From 137862e2f3f95e6343e51a5634a2543f0888be58 Mon Sep 17 00:00:00 2001 From: John Viegas Date: Fri, 8 Sep 2023 14:46:59 -0700 Subject: [PATCH 5/8] Test cases to check on Failure --- services-custom/s3-transfer-manager/pom.xml | 5 + ...3TransferManagerUploadIntegrationTest.java | 18 -- .../S3CrtTransferProgressListenerTest.java | 195 ++++++++++++++++++ .../crt/S3CrtResponseHandlerAdapter.java | 1 - 4 files changed, 200 insertions(+), 19 deletions(-) create mode 100644 services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3CrtTransferProgressListenerTest.java diff --git a/services-custom/s3-transfer-manager/pom.xml b/services-custom/s3-transfer-manager/pom.xml index 42ba838e9ed8..9968535f9df9 100644 --- a/services-custom/s3-transfer-manager/pom.xml +++ b/services-custom/s3-transfer-manager/pom.xml @@ -212,6 +212,11 @@ ${commons-codec.verion} test + + wiremock-jre8 + com.github.tomakehurst + test + diff --git a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java index f87059596732..026652ecef62 100644 --- a/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java +++ b/services-custom/s3-transfer-manager/src/it/java/software/amazon/awssdk/transfer/s3/S3TransferManagerUploadIntegrationTest.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.CancellationException; -import java.util.concurrent.CompletionException; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -35,7 +34,6 @@ import software.amazon.awssdk.core.sync.ResponseTransformer; import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm; import software.amazon.awssdk.services.s3.model.GetObjectResponse; -import software.amazon.awssdk.services.s3.model.NoSuchBucketException; import software.amazon.awssdk.testutils.RandomTempFile; import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload; import software.amazon.awssdk.transfer.s3.model.CompletedUpload; @@ -148,20 +146,4 @@ void upload_file_Interupted_CancelsTheListener() throws IOException, Interrupted assertThat(transferListener.getRatioTransferredList().get(transferListener.getRatioTransferredList().size() - 1)) .isNotEqualTo(100.0); } - - @Test - void upload_file_errorPropagatedToListener() throws IOException, InterruptedException { - Map metadata = new HashMap<>(); - CaptureTransferListener transferListener = new CaptureTransferListener(); - metadata.put("x-amz-meta-foobar", "FOO BAR"); - FileUpload fileUpload = - tm.uploadFile(u -> u.putObjectRequest(p -> p.bucket(TEST_BUCKET+TEST_BUCKET).key(TEST_KEY).metadata(metadata).checksumAlgorithm(ChecksumAlgorithm.CRC32)) - .source(testFile.toPath()) - .addTransferListener(LoggingTransferListener.create()) - .addTransferListener(transferListener) - .build()); - assertThatExceptionOfType(CompletionException.class).isThrownBy( - () -> fileUpload.completionFuture().join()); - assertThat(transferListener.getExceptionCaught()).isInstanceOf(NoSuchBucketException.class); - } } diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3CrtTransferProgressListenerTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3CrtTransferProgressListenerTest.java new file mode 100644 index 000000000000..99d59c4472da --- /dev/null +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3CrtTransferProgressListenerTest.java @@ -0,0 +1,195 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.transfer.s3.internal; + + +import static com.github.tomakehurst.wiremock.client.WireMock.aResponse; +import static com.github.tomakehurst.wiremock.client.WireMock.any; +import static com.github.tomakehurst.wiremock.client.WireMock.anyUrl; +import static com.github.tomakehurst.wiremock.client.WireMock.stubFor; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; + +import com.github.tomakehurst.wiremock.junit5.WireMockRuntimeInfo; +import com.github.tomakehurst.wiremock.junit5.WireMockTest; +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletionException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder; +import software.amazon.awssdk.services.s3.model.NoSuchBucketException; +import software.amazon.awssdk.services.s3.model.S3Exception; +import software.amazon.awssdk.testutils.RandomTempFile; +import software.amazon.awssdk.transfer.s3.CaptureTransferListener; +import software.amazon.awssdk.transfer.s3.S3TransferManager; +import software.amazon.awssdk.transfer.s3.model.FileUpload; +import software.amazon.awssdk.transfer.s3.progress.LoggingTransferListener; +import software.amazon.awssdk.transfer.s3.progress.TransferListener; + +@WireMockTest +public class S3CrtTransferProgressListenerTest { + public static final String ERROR_CODE = "NoSuchBucket"; + public static final String ERROR_MESSAGE = "We encountered an internal error. Please try again."; + public static final String ERROR_BODY = "\n" + + "\n" + + " " + ERROR_CODE + "\n" + + " " + ERROR_MESSAGE + "\n" + + ""; + private static final String EXAMPLE_BUCKET = "Example-Bucket"; + private static final String TEST_KEY = "16mib_file.dat"; + private static final int OBJ_SIZE = 16 * 1024; + private RandomTempFile testFile; + + + @BeforeEach + public void setUp() throws IOException { + testFile = new RandomTempFile(TEST_KEY, OBJ_SIZE); + } + + private static void assertMockOnFailure(TransferListener transferListenerMock) { + Mockito.verify(transferListenerMock, times(1)).bytesTransferred(ArgumentMatchers.any()); + Mockito.verify(transferListenerMock, times(1)).transferFailed(ArgumentMatchers.any()); + Mockito.verify(transferListenerMock, times(1)).transferInitiated(ArgumentMatchers.any()); + Mockito.verify(transferListenerMock, times(0)).transferComplete(ArgumentMatchers.any()); + } + + private S3CrtAsyncClientBuilder getAsyncClientBuilder(WireMockRuntimeInfo wm) { + return S3AsyncClient.crtBuilder() + .region(Region.US_EAST_1) + .endpointOverride(URI.create(wm.getHttpBaseUrl())) + .credentialsProvider( + StaticCredentialsProvider.create(AwsBasicCredentials.create("key", "secret"))); + + } + + @Test + void listeners_reports_ErrorsWithValidPayload(WireMockRuntimeInfo wm) { + TransferListener transferListenerMock = mock(TransferListener.class); + stubFor(any(anyUrl()).willReturn(aResponse().withStatus(404).withBody(ERROR_BODY))); + S3TransferManager tm = new GenericS3TransferManager(getAsyncClientBuilder(wm).build(), mock(UploadDirectoryHelper.class), + mock(TransferManagerConfiguration.class), + mock(DownloadDirectoryHelper.class)); + CaptureTransferListener transferListener = new CaptureTransferListener(); + FileUpload fileUpload = + tm.uploadFile(u -> u.putObjectRequest(p -> p.bucket(EXAMPLE_BUCKET).key("KEY")) + .source(testFile) + .addTransferListener(LoggingTransferListener.create()) + .addTransferListener(transferListener) + .addTransferListener(transferListenerMock) + .build()); + + assertThatExceptionOfType(CompletionException.class).isThrownBy(() -> fileUpload.completionFuture().join()); + assertThat(transferListener.getExceptionCaught()).isInstanceOf(NoSuchBucketException.class); + assertThat(transferListener.isTransferComplete()).isFalse(); + assertThat(transferListener.isTransferInitiated()).isTrue(); + + assertMockOnFailure(transferListenerMock); + } + + @Test + void listeners_reports_ErrorsWithValidInValidPayload(WireMockRuntimeInfo wm) throws InterruptedException { + TransferListener transferListenerMock = mock(TransferListener.class); + + stubFor(any(anyUrl()).willReturn(aResponse().withStatus(404).withBody("?"))); + S3TransferManager tm = new GenericS3TransferManager(getAsyncClientBuilder(wm).build(), mock(UploadDirectoryHelper.class), + mock(TransferManagerConfiguration.class), + mock(DownloadDirectoryHelper.class)); + CaptureTransferListener transferListener = new CaptureTransferListener(); + FileUpload fileUpload = + tm.uploadFile(u -> u.putObjectRequest(p -> p.bucket(EXAMPLE_BUCKET).key("KEY")) + .source(testFile) + .addTransferListener(LoggingTransferListener.create()) + .addTransferListener(transferListener) + .addTransferListener(transferListenerMock) + .build()); + + Thread.sleep(10); + + assertThatExceptionOfType(CompletionException.class).isThrownBy(() -> fileUpload.completionFuture().join()); + + assertThat(transferListener.getExceptionCaught()).isInstanceOf(S3Exception.class); + assertThat(transferListener.isTransferComplete()).isFalse(); + assertThat(transferListener.isTransferInitiated()).isTrue(); + assertMockOnFailure(transferListenerMock); + + } + + + @Test + void listeners_reports_ErrorsWhenCancelled(WireMockRuntimeInfo wm) { + TransferListener transferListenerMock = mock(TransferListener.class); + + stubFor(any(anyUrl()).willReturn(aResponse().withStatus(200).withBody("{}"))); + S3TransferManager tm = new GenericS3TransferManager(getAsyncClientBuilder(wm).build(), mock(UploadDirectoryHelper.class), + mock(TransferManagerConfiguration.class), + mock(DownloadDirectoryHelper.class)); + CaptureTransferListener transferListener = new CaptureTransferListener(); + + tm.uploadFile(u -> u.putObjectRequest(p -> p.bucket(EXAMPLE_BUCKET).key("KEY")) + .source(testFile) + .addTransferListener(LoggingTransferListener.create()) + .addTransferListener(transferListener) + .addTransferListener(transferListenerMock) + .build()).completionFuture().cancel(true); + + assertThat(transferListener.getExceptionCaught()).isInstanceOf(CancellationException.class); + assertThat(transferListener.isTransferComplete()).isFalse(); + assertThat(transferListener.isTransferInitiated()).isTrue(); + assertMockOnFailure(transferListenerMock); + + } + + @Test + void listeners_reports_ProgressWhenSuccess(WireMockRuntimeInfo wm) throws InterruptedException { + TransferListener transferListenerMock = mock(TransferListener.class); + + stubFor(any(anyUrl()).willReturn(aResponse().withStatus(200).withBody("{}"))); + S3TransferManager tm = new GenericS3TransferManager(getAsyncClientBuilder(wm).build(), mock(UploadDirectoryHelper.class), + mock(TransferManagerConfiguration.class), + mock(DownloadDirectoryHelper.class)); + + CaptureTransferListener transferListener = new CaptureTransferListener(); + + tm.uploadFile(u -> u.putObjectRequest(p -> p.bucket(EXAMPLE_BUCKET).key("KEY")) + .source(testFile) + .addTransferListener(LoggingTransferListener.create()) + .addTransferListener(transferListener) + .addTransferListener(transferListenerMock) + .build()).completionFuture().join(); + + Thread.sleep(20); + + assertThat(transferListener.getExceptionCaught()).isNull(); + assertThat(transferListener.isTransferComplete()).isTrue(); + assertThat(transferListener.isTransferInitiated()).isTrue(); + Mockito.verify(transferListenerMock, times(1)).bytesTransferred(ArgumentMatchers.any()); + Mockito.verify(transferListenerMock, times(0)).transferFailed(ArgumentMatchers.any()); + Mockito.verify(transferListenerMock, times(1)).transferInitiated(ArgumentMatchers.any()); + Mockito.verify(transferListenerMock, times(1)).transferComplete(ArgumentMatchers.any()); + } + + +} diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapter.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapter.java index ec81d624351e..cff1a6b6193a 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapter.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/S3CrtResponseHandlerAdapter.java @@ -152,7 +152,6 @@ private void onErrorResponseComplete(byte[] errorPayload) { } private void failResponseHandlerAndFuture(Throwable exception) { - this.progressListener.subscriberOnError(exception); resultFuture.completeExceptionally(exception); runAndLogError(log.logger(), "Exception thrown in SdkAsyncHttpResponseHandler#onError, ignoring", () -> responseHandler.onError(exception)); From 99b773668372da603fd2c50286aa9e5a4eccca8b Mon Sep 17 00:00:00 2001 From: John Viegas Date: Fri, 8 Sep 2023 16:09:46 -0700 Subject: [PATCH 6/8] Updated the crt version to lattest one --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 55ac618514a0..24cf140ce653 100644 --- a/pom.xml +++ b/pom.xml @@ -115,7 +115,7 @@ 2.2.21 1.15 1.29 - 0.25.1 + 0.26.0 5.8.1 From 0b88836bc49ccf364d158aedaaffd6d92d87a7fb Mon Sep 17 00:00:00 2001 From: John Viegas Date: Mon, 11 Sep 2023 12:31:23 -0700 Subject: [PATCH 7/8] removed the Findbug error --- .../internal/progress/TransferProgressUpdater.java | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java index c6c8bb056b10..75f21ade7214 100644 --- a/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java +++ b/services-custom/s3-transfer-manager/src/main/java/software/amazon/awssdk/transfer/s3/internal/progress/TransferProgressUpdater.java @@ -208,18 +208,4 @@ private void transferFailed(Throwable t) { .exception(t) .build()); } - - private static Optional getContentLengthSafe(AsyncRequestBody requestBody) { - if (requestBody == null) { - return Optional.empty(); - } - // requestBody.contentLength() may throw if the file does not exist. - // We ignore any potential exception here to defer failure - // to the s3CrtAsyncClient call and its associated future. - try { - return requestBody.contentLength(); - } catch (Exception ignored) { - return Optional.empty(); - } - } } From 152d896cab122b97661d1d716ccf61f36160915e Mon Sep 17 00:00:00 2001 From: John Viegas Date: Mon, 11 Sep 2023 13:44:39 -0700 Subject: [PATCH 8/8] Added delay in Test so that listeners are updated parallely --- .../internal/S3CrtTransferProgressListenerTest.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3CrtTransferProgressListenerTest.java b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3CrtTransferProgressListenerTest.java index 99d59c4472da..580cced1808a 100644 --- a/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3CrtTransferProgressListenerTest.java +++ b/services-custom/s3-transfer-manager/src/test/java/software/amazon/awssdk/transfer/s3/internal/S3CrtTransferProgressListenerTest.java @@ -86,7 +86,7 @@ private S3CrtAsyncClientBuilder getAsyncClientBuilder(WireMockRuntimeInfo wm) { } @Test - void listeners_reports_ErrorsWithValidPayload(WireMockRuntimeInfo wm) { + void listeners_reports_ErrorsWithValidPayload(WireMockRuntimeInfo wm) throws InterruptedException { TransferListener transferListenerMock = mock(TransferListener.class); stubFor(any(anyUrl()).willReturn(aResponse().withStatus(404).withBody(ERROR_BODY))); S3TransferManager tm = new GenericS3TransferManager(getAsyncClientBuilder(wm).build(), mock(UploadDirectoryHelper.class), @@ -102,6 +102,7 @@ void listeners_reports_ErrorsWithValidPayload(WireMockRuntimeInfo wm) { .build()); assertThatExceptionOfType(CompletionException.class).isThrownBy(() -> fileUpload.completionFuture().join()); + Thread.sleep(500); assertThat(transferListener.getExceptionCaught()).isInstanceOf(NoSuchBucketException.class); assertThat(transferListener.isTransferComplete()).isFalse(); assertThat(transferListener.isTransferInitiated()).isTrue(); @@ -126,9 +127,8 @@ void listeners_reports_ErrorsWithValidInValidPayload(WireMockRuntimeInfo wm) thr .addTransferListener(transferListenerMock) .build()); - Thread.sleep(10); - assertThatExceptionOfType(CompletionException.class).isThrownBy(() -> fileUpload.completionFuture().join()); + Thread.sleep(500); assertThat(transferListener.getExceptionCaught()).isInstanceOf(S3Exception.class); assertThat(transferListener.isTransferComplete()).isFalse(); @@ -139,7 +139,7 @@ void listeners_reports_ErrorsWithValidInValidPayload(WireMockRuntimeInfo wm) thr @Test - void listeners_reports_ErrorsWhenCancelled(WireMockRuntimeInfo wm) { + void listeners_reports_ErrorsWhenCancelled(WireMockRuntimeInfo wm) throws InterruptedException { TransferListener transferListenerMock = mock(TransferListener.class); stubFor(any(anyUrl()).willReturn(aResponse().withStatus(200).withBody("{}"))); @@ -155,6 +155,8 @@ void listeners_reports_ErrorsWhenCancelled(WireMockRuntimeInfo wm) { .addTransferListener(transferListenerMock) .build()).completionFuture().cancel(true); + Thread.sleep(500); + assertThat(transferListener.getExceptionCaught()).isInstanceOf(CancellationException.class); assertThat(transferListener.isTransferComplete()).isFalse(); assertThat(transferListener.isTransferInitiated()).isTrue(); @@ -180,8 +182,7 @@ void listeners_reports_ProgressWhenSuccess(WireMockRuntimeInfo wm) throws Interr .addTransferListener(transferListenerMock) .build()).completionFuture().join(); - Thread.sleep(20); - + Thread.sleep(500); assertThat(transferListener.getExceptionCaught()).isNull(); assertThat(transferListener.isTransferComplete()).isTrue(); assertThat(transferListener.isTransferInitiated()).isTrue();