Performance Enhancement: Enable passing of requestPath to CRT and monitor the progress using CRT's S3MetaRequestProgress #4379

Merged
merged 12 commits into from
Sep 12, 2023
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-S3TransferManager-7ffc393.json
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "S3 Transfer Manager",
"contributor": "",
"description": "Fix for [Issue#4168](https://github.com/aws/aws-sdk-java-v2/issues/4168) by enabling the direct passing of requestPath to the AWS Common Runtime (CRT). This ensures that Java SDK Transfer Manager no longer performs file I/O when using CRT, regardless."
}
2 changes: 1 addition & 1 deletion pom.xml
@@ -115,7 +115,7 @@
<rxjava.version>2.2.21</rxjava.version>
<commons-codec.verion>1.15</commons-codec.verion>
<jmh.version>1.29</jmh.version>
<awscrt.version>0.24.0</awscrt.version>
<awscrt.version>0.25.1</awscrt.version>

<!--Test dependencies -->
<junit5.version>5.8.1</junit5.version>
@@ -17,8 +17,9 @@

import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.SDK_HTTP_EXECUTION_ATTRIBUTES;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PAUSE_RESUME_TOKEN;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PROGRESS_LISTENER;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE;
import static software.amazon.awssdk.transfer.s3.internal.GenericS3TransferManager.DEFAULT_FILE_UPLOAD_CHUNK_SIZE;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SOURCE_REQ_PATH;
import static software.amazon.awssdk.transfer.s3.internal.GenericS3TransferManager.assertNotUnsupportedArn;
import static software.amazon.awssdk.transfer.s3.internal.utils.FileUtils.fileNotModified;

@@ -27,7 +28,6 @@
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
import software.amazon.awssdk.crt.s3.ResumeToken;
import software.amazon.awssdk.http.SdkHttpExecutionAttributes;
import software.amazon.awssdk.services.s3.S3AsyncClient;
@@ -65,29 +65,25 @@ public FileUpload uploadFile(UploadFileRequest uploadFileRequest) {
Validate.paramNotNull(uploadFileRequest, "uploadFileRequest");
S3MetaRequestPauseObservable observable = new S3MetaRequestPauseObservable();

AsyncRequestBody requestBody =
FileAsyncRequestBody.builder()
.path(uploadFileRequest.source())
.chunkSizeInBytes(DEFAULT_FILE_UPLOAD_CHUNK_SIZE)
.build();
TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadFileRequest, null);

Consumer<SdkHttpExecutionAttributes.Builder> attachObservable =
b -> b.put(METAREQUEST_PAUSE_OBSERVABLE, observable);
b -> b.put(METAREQUEST_PAUSE_OBSERVABLE, observable)
.put(SOURCE_REQ_PATH, uploadFileRequest.source())
.put(CRT_PROGRESS_LISTENER, progressUpdater.crtProgressListener());

PutObjectRequest putObjectRequest = attachSdkAttribute(uploadFileRequest.putObjectRequest(), attachObservable);

CompletableFuture<CompletedFileUpload> returnFuture = new CompletableFuture<>();

TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadFileRequest, requestBody);
progressUpdater.transferInitiated();
requestBody = progressUpdater.wrapRequestBody(requestBody);
progressUpdater.registerCompletion(returnFuture);

try {
assertNotUnsupportedArn(putObjectRequest.bucket(), "upload");

CompletableFuture<PutObjectResponse> crtFuture =
s3AsyncClient.putObject(putObjectRequest, requestBody);
s3AsyncClient.putObject(putObjectRequest, AsyncRequestBody.fromFile(uploadFileRequest.source()));
Contributor

If CRT is reading directly from disk, does this publisher get used?

Contributor Author

CRT reads this directly from the file. The publisher here is part of the S3Client interface, and a non-null value is required only to obtain the content length. I have updated the code to throw an exception if anything tries to read the Java SDK's publisher in such cases.
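
As an illustration of the reply above, here is a minimal sketch of a request body that reports a content length but refuses to be read. It is not the PR's implementation: the class names are hypothetical, and it uses the JDK's `java.util.concurrent.Flow` types as a stand-in for the Reactive Streams `Publisher` the SDK uses. It also assumes the failure is reported through `onError` (the route Reactive Streams prescribes) rather than thrown from `subscribe`; the PR may do it differently.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical request body for the "native client reads the file itself" case.
// It exposes a content length but never streams any bytes.
final class LengthOnlyBodySketch implements Flow.Publisher<ByteBuffer> {
    private final long contentLength;

    LengthOnlyBodySketch(long contentLength) {
        this.contentLength = contentLength;
    }

    long contentLength() {
        return contentLength;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
        // Hand over a do-nothing subscription first, then report the misuse.
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { }
        });
        subscriber.onError(new IllegalStateException(
            "Body is read from the file by the native client; the Java publisher must not be consumed"));
    }
}

final class LengthOnlyBodyDemo {
    // Subscribes once and returns the error the publisher reported, or null if none.
    static Throwable readAttempt(LengthOnlyBodySketch body) {
        AtomicReference<Throwable> error = new AtomicReference<>();
        body.subscribe(new Flow.Subscriber<ByteBuffer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
            @Override public void onNext(ByteBuffer item) { }
            @Override public void onError(Throwable t) { error.set(t); }
            @Override public void onComplete() { }
        });
        return error.get();
    }

    public static void main(String[] args) {
        LengthOnlyBodySketch body = new LengthOnlyBodySketch(1024L);
        System.out.println(body.contentLength());
        System.out.println(readAttempt(body).getClass().getSimpleName());
    }
}
```

The point of the design is that an accidental read fails loudly instead of silently duplicating the file I/O that the native client already performs.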


// Forward upload cancellation to CRT future
CompletableFutureUtils.forwardExceptionTo(returnFuture, crtFuture);
@@ -25,6 +25,8 @@
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.listener.AsyncRequestBodyListener;
import software.amazon.awssdk.core.async.listener.AsyncResponseTransformerListener;
import software.amazon.awssdk.core.async.listener.PublisherListener;
import software.amazon.awssdk.crt.s3.S3MetaRequestProgress;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.transfer.s3.model.CompletedObjectTransfer;
import software.amazon.awssdk.transfer.s3.model.TransferObjectRequest;
@@ -95,6 +97,32 @@ public void subscriberOnComplete() {
});
}

public PublisherListener<S3MetaRequestProgress> crtProgressListener() {

return new PublisherListener<S3MetaRequestProgress>() {
@Override
public void publisherSubscribe(Subscriber<? super S3MetaRequestProgress> subscriber) {
resetBytesTransferred();
}

@Override
public void subscriberOnNext(S3MetaRequestProgress s3MetaRequestProgress) {
incrementBytesTransferred(Math.toIntExact(s3MetaRequestProgress.getBytesTransferred()),
s3MetaRequestProgress.getContentLength());
}

@Override
public void subscriberOnError(Throwable t) {
transferFailed(t);
}

@Override
public void subscriberOnComplete() {
endOfStreamFuture.complete(null);
}
};
}

public <ResultT> AsyncResponseTransformer<GetObjectResponse, ResultT> wrapResponseTransformer(
AsyncResponseTransformer<GetObjectResponse, ResultT> responseTransformer) {
return AsyncResponseTransformerListener.wrap(
@@ -145,6 +173,13 @@ private void incrementBytesTransferred(int numBytes) {
listenerInvoker.bytesTransferred(context.copy(b -> b.progressSnapshot(snapshot)));
}

private void incrementBytesTransferred(int numBytes, long totalBytes) {
TransferProgressSnapshot snapshot = progress.updateAndGet(b -> {
b.transferredBytes(b.getTransferredBytes() + numBytes).totalBytes(totalBytes);
});
listenerInvoker.bytesTransferred(context.copy(b -> b.progressSnapshot(snapshot)));
}

public void registerCompletion(CompletableFuture<? extends CompletedObjectTransfer> future) {
future.whenComplete((r, t) -> {
if (t == null) {
@@ -17,13 +17,16 @@

import static software.amazon.awssdk.services.s3.internal.crt.CrtChecksumUtils.checksumConfig;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PAUSE_RESUME_TOKEN;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PROGRESS_LISTENER;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.HTTP_CHECKSUM;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.OPERATION_NAME;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SIGNING_REGION;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SOURCE_REQ_PATH;
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;

import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
@@ -115,14 +118,16 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
URI uri = asyncRequest.request().getUri();
HttpRequest httpRequest = toCrtRequest(asyncRequest);
S3CrtResponseHandlerAdapter responseHandler =
new S3CrtResponseHandlerAdapter(executeFuture, asyncRequest.responseHandler());
new S3CrtResponseHandlerAdapter(executeFuture,
asyncRequest.responseHandler(),
asyncRequest.httpExecutionAttributes().getAttribute(CRT_PROGRESS_LISTENER));

S3MetaRequestOptions.MetaRequestType requestType = requestType(asyncRequest);

HttpChecksum httpChecksum = asyncRequest.httpExecutionAttributes().getAttribute(HTTP_CHECKSUM);
ResumeToken resumeToken = asyncRequest.httpExecutionAttributes().getAttribute(CRT_PAUSE_RESUME_TOKEN);
Region signingRegion = asyncRequest.httpExecutionAttributes().getAttribute(SIGNING_REGION);

Path requestFilePath = asyncRequest.httpExecutionAttributes().getAttribute(SOURCE_REQ_PATH);
ChecksumConfig checksumConfig =
checksumConfig(httpChecksum, requestType, s3NativeClientConfiguration.checksumValidationEnabled());
URI endpoint = getEndpoint(uri);
@@ -133,7 +138,8 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
.withChecksumConfig(checksumConfig)
.withEndpoint(endpoint)
.withResponseHandler(responseHandler)
.withResumeToken(resumeToken);
.withResumeToken(resumeToken)
.withRequestFilePath(requestFilePath);

// Create a new SigningConfig object only if the signing region has changed from the previously configured region.
if (signingRegion != null && !s3ClientOptions.getRegion().equals(signingRegion.id())) {
@@ -196,6 +202,8 @@ private static void addCancelCallback(CompletableFuture<Void> executeFuture,
private static HttpRequest toCrtRequest(AsyncExecuteRequest asyncRequest) {
SdkHttpRequest sdkRequest = asyncRequest.request();

Path requestFilePath = asyncRequest.httpExecutionAttributes().getAttribute(SOURCE_REQ_PATH);

String method = sdkRequest.method().name();
String encodedPath = sdkRequest.encodedPath();
if (encodedPath == null || encodedPath.isEmpty()) {
@@ -208,8 +216,9 @@ private static HttpRequest toCrtRequest(AsyncExecuteRequest asyncRequest) {

HttpHeader[] crtHeaderArray = createHttpHeaderList(asyncRequest).toArray(new HttpHeader[0]);


S3CrtRequestBodyStreamAdapter sdkToCrtRequestPublisher =
new S3CrtRequestBodyStreamAdapter(asyncRequest.requestContentPublisher());
requestFilePath == null ? new S3CrtRequestBodyStreamAdapter(asyncRequest.requestContentPublisher()) : null;

return new HttpRequest(method, encodedPath + encodedQueryString, crtHeaderArray, sdkToCrtRequestPublisher);
}
@@ -0,0 +1,80 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.s3.internal.crt;

import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.async.listener.PublisherListener;
import software.amazon.awssdk.crt.s3.S3MetaRequestProgress;

/**
* S3CrtProgressListener delegates events to the underlying delegateListener if defined, avoiding null checks for API when using
* S3MetaRequestProgress for GET calls.
*/
Contributor

This feels like a weird application of polymorphism. Instead of S3CrtProgressListener, can we just have a NoOpPublisherListener that we create if delegateListener is null? That just means we need the null check in the setup code of S3CrtResponseHandlerAdapter instead of per-method here.

Contributor Author

Created NoOpPublisherListener
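
The suggestion above can be sketched as follows. This is only an illustration under stated assumptions: `ProgressListener`, `NoOpProgressListener`, and `ResponseHandlerSketch` are hypothetical stand-ins for `PublisherListener<S3MetaRequestProgress>`, the `NoOpPublisherListener` mentioned in the reply, and `S3CrtResponseHandlerAdapter`; the real classes are not shown in this diff and may look different.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for PublisherListener<S3MetaRequestProgress>;
// only the callbacks needed for this sketch are modeled.
interface ProgressListener {
    default void onNext(long bytesTransferred) { }
    default void onComplete() { }
    default void onError(Throwable t) { }
}

// A listener that ignores every event. It holds no state, so one shared instance suffices.
final class NoOpProgressListener implements ProgressListener {
    private static final NoOpProgressListener INSTANCE = new NoOpProgressListener();

    private NoOpProgressListener() { }

    static NoOpProgressListener getInstance() {
        return INSTANCE;
    }
}

// Hypothetical adapter: the null check happens once, in the constructor,
// so the per-event methods can call the listener unconditionally.
final class ResponseHandlerSketch {
    private final ProgressListener progressListener;

    ResponseHandlerSketch(ProgressListener progressListener) {
        this.progressListener = progressListener == null
                                ? NoOpProgressListener.getInstance()
                                : progressListener;
    }

    void onProgress(long bytesTransferred) {
        progressListener.onNext(bytesTransferred);
    }

    void onFinished() {
        progressListener.onComplete();
    }
}

final class NoOpListenerDemo {
    static List<String> run() {
        List<String> events = new ArrayList<>();
        ProgressListener recording = new ProgressListener() {
            @Override public void onNext(long n) { events.add("next:" + n); }
            @Override public void onComplete() { events.add("complete"); }
        };

        ResponseHandlerSketch withListener = new ResponseHandlerSketch(recording);
        withListener.onProgress(5);
        withListener.onFinished();

        // With a null listener the same calls are safe and simply ignored.
        ResponseHandlerSketch withoutListener = new ResponseHandlerSketch(null);
        withoutListener.onProgress(7);
        withoutListener.onFinished();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [next:5, complete]
    }
}
```

Compared with the delegating class below, this moves the single null check into setup code and removes the repeated `if (delegateListener != null)` guard from every callback.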

@SdkInternalApi
public final class S3CrtProgressListener implements PublisherListener<S3MetaRequestProgress> {

PublisherListener<S3MetaRequestProgress> delegateListener;

private S3CrtProgressListener(Builder builder) {
this.delegateListener = builder.delegateListener;
}

static Builder builder() {
return new Builder();
}

@Override
public void subscriberOnNext(S3MetaRequestProgress s3MetaRequestProgress) {
if (delegateListener != null) {
delegateListener.subscriberOnNext(s3MetaRequestProgress);
}
}

@Override
public void subscriberOnComplete() {
if (delegateListener != null) {
delegateListener.subscriberOnComplete();
}
}

@Override
public void subscriberOnError(Throwable t) {
if (delegateListener != null) {
delegateListener.subscriberOnError(t);
}
}

@Override
public void subscriptionCancel() {
if (delegateListener != null) {
delegateListener.subscriptionCancel();
}
}

static class Builder {

PublisherListener<S3MetaRequestProgress> delegateListener;

Builder delegateListener(PublisherListener<S3MetaRequestProgress> delegateListener) {
this.delegateListener = delegateListener;
return this;
}

public S3CrtProgressListener build() {
return new S3CrtProgressListener(this);
}
}
}
@@ -20,11 +20,13 @@
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.async.listener.PublisherListener;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.crt.CRT;
import software.amazon.awssdk.crt.http.HttpHeader;
import software.amazon.awssdk.crt.s3.S3FinishedResponseContext;
import software.amazon.awssdk.crt.s3.S3MetaRequest;
import software.amazon.awssdk.crt.s3.S3MetaRequestProgress;
import software.amazon.awssdk.crt.s3.S3MetaRequestResponseHandler;
import software.amazon.awssdk.http.SdkCancellationException;
import software.amazon.awssdk.http.SdkHttpResponse;
@@ -46,9 +48,14 @@ public final class S3CrtResponseHandlerAdapter implements S3MetaRequestResponseH
private final SdkHttpResponse.Builder respBuilder = SdkHttpResponse.builder();
private volatile S3MetaRequest metaRequest;

public S3CrtResponseHandlerAdapter(CompletableFuture<Void> executeFuture, SdkAsyncHttpResponseHandler responseHandler) {
private final PublisherListener<S3MetaRequestProgress> progressListener;
Contributor

Do we need to invoke this.progressListener.subscriberOnError() in onErrorResponseComplete?

Contributor Author

We are already calling this.progressListener.subscriberOnError(exception) from failResponseHandlerAndFuture, which is called both when errorPayload != null and when errorPayload == null for valid errorPayloads.

Contributor

failResponseHandlerAndFuture doesn't seem to be invoked from onErrorResponseComplete if it successfully marshals the error response, though. Can we add a test to verify it? This test may not suffice now that we are relying on the real CRT client to report progress and it is using a mock S3 client.

        responsePublisher.send(ByteBuffer.wrap(errorPayload))
                         .thenRun(responsePublisher::complete)
                         .handle((ignore, throwable) -> {
                             if (throwable != null) {
                                 failResponseHandlerAndFuture(throwable);
                                 return null;
                             }
                             completeFutureAndCloseRequest();
                             return null;
                         });

Contributor Author

Added new mock tests and removed explicit onError
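
The control flow the reviewer describes can be reproduced in isolation. The sketch below is only an illustration: it copies the shape of the quoted `thenRun(...).handle(...)` chain, replaces `responsePublisher.send(...)` with a plain `CompletableFuture<Void>` parameter, and records branch names as strings instead of calling the real methods. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class ErrorResponsePathSketch {

    // Same shape as the quoted chain; 'send' stands in for responsePublisher.send(...).
    // The returned list records which branch ran.
    static List<String> deliver(CompletableFuture<Void> send) {
        List<String> calls = new ArrayList<>();
        send.thenRun(() -> calls.add("responsePublisher.complete"))
            .handle((ignore, throwable) -> {
                if (throwable != null) {
                    calls.add("failResponseHandlerAndFuture");
                    return null;
                }
                calls.add("completeFutureAndCloseRequest");
                return null;
            });
        return calls;
    }

    // The error payload was delivered to the response publisher without problems.
    static List<String> whenSendSucceeds() {
        return deliver(CompletableFuture.completedFuture(null));
    }

    // Delivering the error payload itself failed.
    static List<String> whenSendFails() {
        CompletableFuture<Void> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("send failed"));
        return deliver(failed);
    }

    public static void main(String[] args) {
        System.out.println(whenSendSucceeds()); // [responsePublisher.complete, completeFutureAndCloseRequest]
        System.out.println(whenSendFails());    // [failResponseHandlerAndFuture]
    }
}
```

When the send succeeds, only the completion branch runs, so a progress listener that is notified solely from `failResponseHandlerAndFuture` would not see an error on that path, which is the case the reviewer asks to cover with a test.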


public S3CrtResponseHandlerAdapter(CompletableFuture<Void> executeFuture,
SdkAsyncHttpResponseHandler responseHandler,
PublisherListener<S3MetaRequestProgress> progressListener) {
this.resultFuture = executeFuture;
this.responseHandler = responseHandler;
this.progressListener = S3CrtProgressListener.builder().delegateListener(progressListener).build();
}

@Override
@@ -103,7 +110,7 @@ private void onSuccessfulResponseComplete() {
failResponseHandlerAndFuture(failure);
return;
}

this.progressListener.subscriberOnComplete();
completeFutureAndCloseRequest();
});
}
@@ -145,6 +152,7 @@ private void onErrorResponseComplete(byte[] errorPayload) {
}

private void failResponseHandlerAndFuture(Throwable exception) {
this.progressListener.subscriberOnError(exception);
resultFuture.completeExceptionally(exception);
runAndLogError(log.logger(), "Exception thrown in SdkAsyncHttpResponseHandler#onError, ignoring",
() -> responseHandler.onError(exception));
@@ -159,4 +167,9 @@ private static boolean isErrorResponse(int responseStatus) {
public void metaRequest(S3MetaRequest s3MetaRequest) {
metaRequest = s3MetaRequest;
}

@Override
public void onProgress(S3MetaRequestProgress progress) {
this.progressListener.subscriberOnNext(progress);
}
}
@@ -15,7 +15,9 @@

package software.amazon.awssdk.services.s3.internal.crt;

import java.nio.file.Path;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.async.listener.PublisherListener;
import software.amazon.awssdk.core.interceptor.trait.HttpChecksum;
import software.amazon.awssdk.crt.s3.ResumeToken;
import software.amazon.awssdk.http.SdkHttpExecutionAttribute;
@@ -41,6 +43,13 @@ public final class S3InternalSdkHttpExecutionAttribute<T> extends SdkHttpExecuti
public static final S3InternalSdkHttpExecutionAttribute<Region> SIGNING_REGION =
new S3InternalSdkHttpExecutionAttribute<>(Region.class);

public static final S3InternalSdkHttpExecutionAttribute<Path> SOURCE_REQ_PATH =
new S3InternalSdkHttpExecutionAttribute<>(Path.class);
Contributor

Naming nitpicks, since this actually seems like a protected API.

  • SOURCE - Do we ever expect to use this for something other than a PutObject - like a GetObject, where it's actually the DESTINATION?
  • REQ - We should almost always use the full word for something instead of shortening it, so REQUEST. It looks like a lot of the existing properties are request properties, though, so I don't think REQUEST is necessary.

If it's always the source, what about PUT_OBJECT_PATH? UPLOAD_PATH?
If it's potentially also destination, what about UPLOAD_DOWNLOAD_PATH?

Contributor Author

Thanks, Matt.
Renamed it to OBJECT_FILE_PATH so that it can be used for any file path, for either get or put.



public static final S3InternalSdkHttpExecutionAttribute<PublisherListener> CRT_PROGRESS_LISTENER =
new S3InternalSdkHttpExecutionAttribute<>(PublisherListener.class);
Contributor

This is marked as @SdkInternalApi, so it can't be used by the transfer manager module, like we're doing above. Do we need to move a subset to a @SdkProtectedApi?

Contributor Author

Done


private S3InternalSdkHttpExecutionAttribute(Class<T> valueClass) {
super(valueClass);
}
@@ -61,8 +61,8 @@ public void setup() {
future = new CompletableFuture<>();
sdkResponseHandler = new TestResponseHandler();
responseHandlerAdapter = new S3CrtResponseHandlerAdapter(future,
sdkResponseHandler);

sdkResponseHandler,
null);
responseHandlerAdapter.metaRequest(s3MetaRequest);
}
