Skip to content

Commit

Permalink
Performance Enhancement :Enable Passing of requestPath to CRT, Monito…
Browse files Browse the repository at this point in the history
…r Progress Updates for putObject Operations
  • Loading branch information
joviegas committed Aug 31, 2023
1 parent 233668b commit 63065aa
Show file tree
Hide file tree
Showing 9 changed files with 168 additions and 20 deletions.
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-S3TransferManager-7ffc393.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "S3 Transfer Manager",
"contributor": "",
"description": "Fix for [Issue#4168](https://github.com/aws/aws-sdk-java-v2/issues/4168) by enabling the direct passing of requestPath to the AWS Common Runtime (CRT). This ensures that Java SDK Transfer Manager no longer performs file I/O when using CRT, regardless."
}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@
<rxjava.version>2.2.21</rxjava.version>
<commons-codec.verion>1.15</commons-codec.verion>
<jmh.version>1.29</jmh.version>
<awscrt.version>0.24.0</awscrt.version>
<awscrt.version>0.25.1</awscrt.version>

<!--Test dependencies -->
<junit5.version>5.8.1</junit5.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@

import static software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute.SDK_HTTP_EXECUTION_ATTRIBUTES;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PAUSE_RESUME_TOKEN;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PROGRESS_LISTENER;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE;
import static software.amazon.awssdk.transfer.s3.internal.GenericS3TransferManager.DEFAULT_FILE_UPLOAD_CHUNK_SIZE;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SOURCE_REQ_PATH;
import static software.amazon.awssdk.transfer.s3.internal.GenericS3TransferManager.assertNotUnsupportedArn;
import static software.amazon.awssdk.transfer.s3.internal.utils.FileUtils.fileNotModified;

Expand All @@ -27,7 +28,6 @@
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.core.internal.async.FileAsyncRequestBody;
import software.amazon.awssdk.crt.s3.ResumeToken;
import software.amazon.awssdk.http.SdkHttpExecutionAttributes;
import software.amazon.awssdk.services.s3.S3AsyncClient;
Expand Down Expand Up @@ -65,29 +65,25 @@ public FileUpload uploadFile(UploadFileRequest uploadFileRequest) {
Validate.paramNotNull(uploadFileRequest, "uploadFileRequest");
S3MetaRequestPauseObservable observable = new S3MetaRequestPauseObservable();

AsyncRequestBody requestBody =
FileAsyncRequestBody.builder()
.path(uploadFileRequest.source())
.chunkSizeInBytes(DEFAULT_FILE_UPLOAD_CHUNK_SIZE)
.build();
TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadFileRequest, null);

Consumer<SdkHttpExecutionAttributes.Builder> attachObservable =
b -> b.put(METAREQUEST_PAUSE_OBSERVABLE, observable);
b -> b.put(METAREQUEST_PAUSE_OBSERVABLE, observable)
.put(SOURCE_REQ_PATH, uploadFileRequest.source())
.put(CRT_PROGRESS_LISTENER, progressUpdater.crtProgressListener());

PutObjectRequest putObjectRequest = attachSdkAttribute(uploadFileRequest.putObjectRequest(), attachObservable);

CompletableFuture<CompletedFileUpload> returnFuture = new CompletableFuture<>();

TransferProgressUpdater progressUpdater = new TransferProgressUpdater(uploadFileRequest, requestBody);
progressUpdater.transferInitiated();
requestBody = progressUpdater.wrapRequestBody(requestBody);
progressUpdater.registerCompletion(returnFuture);

try {
assertNotUnsupportedArn(putObjectRequest.bucket(), "upload");

CompletableFuture<PutObjectResponse> crtFuture =
s3AsyncClient.putObject(putObjectRequest, requestBody);
s3AsyncClient.putObject(putObjectRequest, AsyncRequestBody.fromFile(uploadFileRequest.source()));

// Forward upload cancellation to CRT future
CompletableFutureUtils.forwardExceptionTo(returnFuture, crtFuture);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.listener.AsyncRequestBodyListener;
import software.amazon.awssdk.core.async.listener.AsyncResponseTransformerListener;
import software.amazon.awssdk.core.async.listener.PublisherListener;
import software.amazon.awssdk.crt.s3.S3MetaRequestProgress;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.transfer.s3.model.CompletedObjectTransfer;
import software.amazon.awssdk.transfer.s3.model.TransferObjectRequest;
Expand Down Expand Up @@ -95,6 +97,32 @@ public void subscriberOnComplete() {
});
}

public PublisherListener<S3MetaRequestProgress> crtProgressListener() {

return new PublisherListener<S3MetaRequestProgress>() {
@Override
public void publisherSubscribe(Subscriber<? super S3MetaRequestProgress> subscriber) {
resetBytesTransferred();
}

@Override
public void subscriberOnNext(S3MetaRequestProgress s3MetaRequestProgress) {
incrementBytesTransferred(Long.valueOf(s3MetaRequestProgress.getBytesTransferred()).intValue(),
s3MetaRequestProgress.getContentLength());
}

@Override
public void subscriberOnError(Throwable t) {
transferFailed(t);
}

@Override
public void subscriberOnComplete() {
endOfStreamFuture.complete(null);
}
};
}

public <ResultT> AsyncResponseTransformer<GetObjectResponse, ResultT> wrapResponseTransformer(
AsyncResponseTransformer<GetObjectResponse, ResultT> responseTransformer) {
return AsyncResponseTransformerListener.wrap(
Expand Down Expand Up @@ -145,6 +173,13 @@ private void incrementBytesTransferred(int numBytes) {
listenerInvoker.bytesTransferred(context.copy(b -> b.progressSnapshot(snapshot)));
}

private void incrementBytesTransferred(int numBytes, long totalBytes) {
TransferProgressSnapshot snapshot = progress.updateAndGet(b -> {
b.transferredBytes(b.getTransferredBytes() + numBytes).totalBytes(totalBytes);
});
listenerInvoker.bytesTransferred(context.copy(b -> b.progressSnapshot(snapshot)));
}

public void registerCompletion(CompletableFuture<? extends CompletedObjectTransfer> future) {
future.whenComplete((r, t) -> {
if (t == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,16 @@

import static software.amazon.awssdk.services.s3.internal.crt.CrtChecksumUtils.checksumConfig;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PAUSE_RESUME_TOKEN;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.CRT_PROGRESS_LISTENER;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.HTTP_CHECKSUM;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.METAREQUEST_PAUSE_OBSERVABLE;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.OPERATION_NAME;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SIGNING_REGION;
import static software.amazon.awssdk.services.s3.internal.crt.S3InternalSdkHttpExecutionAttribute.SOURCE_REQ_PATH;
import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;

import java.net.URI;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -115,14 +118,16 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
URI uri = asyncRequest.request().getUri();
HttpRequest httpRequest = toCrtRequest(asyncRequest);
S3CrtResponseHandlerAdapter responseHandler =
new S3CrtResponseHandlerAdapter(executeFuture, asyncRequest.responseHandler());
new S3CrtResponseHandlerAdapter(executeFuture,
asyncRequest.responseHandler(),
asyncRequest.httpExecutionAttributes().getAttribute(CRT_PROGRESS_LISTENER));

S3MetaRequestOptions.MetaRequestType requestType = requestType(asyncRequest);

HttpChecksum httpChecksum = asyncRequest.httpExecutionAttributes().getAttribute(HTTP_CHECKSUM);
ResumeToken resumeToken = asyncRequest.httpExecutionAttributes().getAttribute(CRT_PAUSE_RESUME_TOKEN);
Region signingRegion = asyncRequest.httpExecutionAttributes().getAttribute(SIGNING_REGION);

Path requestFilePath = asyncRequest.httpExecutionAttributes().getAttribute(SOURCE_REQ_PATH);
ChecksumConfig checksumConfig =
checksumConfig(httpChecksum, requestType, s3NativeClientConfiguration.checksumValidationEnabled());
URI endpoint = getEndpoint(uri);
Expand All @@ -133,7 +138,8 @@ public CompletableFuture<Void> execute(AsyncExecuteRequest asyncRequest) {
.withChecksumConfig(checksumConfig)
.withEndpoint(endpoint)
.withResponseHandler(responseHandler)
.withResumeToken(resumeToken);
.withResumeToken(resumeToken)
.withRequestFilePath(requestFilePath);

// Create a new SigningConfig object only if the signing region has changed from the previously configured region.
if (signingRegion != null && !s3ClientOptions.getRegion().equals(signingRegion.id())) {
Expand Down Expand Up @@ -196,6 +202,8 @@ private static void addCancelCallback(CompletableFuture<Void> executeFuture,
private static HttpRequest toCrtRequest(AsyncExecuteRequest asyncRequest) {
SdkHttpRequest sdkRequest = asyncRequest.request();

Path requestFilePath = asyncRequest.httpExecutionAttributes().getAttribute(SOURCE_REQ_PATH);

String method = sdkRequest.method().name();
String encodedPath = sdkRequest.encodedPath();
if (encodedPath == null || encodedPath.isEmpty()) {
Expand All @@ -208,8 +216,9 @@ private static HttpRequest toCrtRequest(AsyncExecuteRequest asyncRequest) {

HttpHeader[] crtHeaderArray = createHttpHeaderList(asyncRequest).toArray(new HttpHeader[0]);


S3CrtRequestBodyStreamAdapter sdkToCrtRequestPublisher =
new S3CrtRequestBodyStreamAdapter(asyncRequest.requestContentPublisher());
requestFilePath == null ? new S3CrtRequestBodyStreamAdapter(asyncRequest.requestContentPublisher()) : null;

return new HttpRequest(method, encodedPath + encodedQueryString, crtHeaderArray, sdkToCrtRequestPublisher);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services.s3.internal.crt;

import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.async.listener.PublisherListener;
import software.amazon.awssdk.crt.s3.S3MetaRequestProgress;

/**
* S3CrtProgressListener delegates events to the underlying delegateListener if defined, avoiding null checks for API when using
* S3MetaRequestProgress for GET calls.
*/
@SdkInternalApi
public final class S3CrtProgressListener implements PublisherListener<S3MetaRequestProgress> {

PublisherListener<S3MetaRequestProgress> delegateListener;

private S3CrtProgressListener(Builder builder) {
this.delegateListener = builder.delegateListener;
}

static Builder builder() {
return new Builder();
}

@Override
public void subscriberOnNext(S3MetaRequestProgress s3MetaRequestProgress) {
if (delegateListener != null) {
delegateListener.subscriberOnNext(s3MetaRequestProgress);
}
}

@Override
public void subscriberOnComplete() {
if (delegateListener != null) {
delegateListener.subscriberOnComplete();
}
}

@Override
public void subscriberOnError(Throwable t) {
if (delegateListener != null) {
delegateListener.subscriberOnError(t);
}
}

@Override
public void subscriptionCancel() {
if (delegateListener != null) {
delegateListener.subscriptionCancel();
}
}

static class Builder {

PublisherListener<S3MetaRequestProgress> delegateListener;

Builder delegateListener(PublisherListener<S3MetaRequestProgress> delegateListener) {
this.delegateListener = delegateListener;
return this;
}

public S3CrtProgressListener build() {
return new S3CrtProgressListener(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.async.listener.PublisherListener;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.crt.CRT;
import software.amazon.awssdk.crt.http.HttpHeader;
import software.amazon.awssdk.crt.s3.S3FinishedResponseContext;
import software.amazon.awssdk.crt.s3.S3MetaRequest;
import software.amazon.awssdk.crt.s3.S3MetaRequestProgress;
import software.amazon.awssdk.crt.s3.S3MetaRequestResponseHandler;
import software.amazon.awssdk.http.SdkCancellationException;
import software.amazon.awssdk.http.SdkHttpResponse;
Expand All @@ -46,9 +48,14 @@ public final class S3CrtResponseHandlerAdapter implements S3MetaRequestResponseH
private final SdkHttpResponse.Builder respBuilder = SdkHttpResponse.builder();
private volatile S3MetaRequest metaRequest;

public S3CrtResponseHandlerAdapter(CompletableFuture<Void> executeFuture, SdkAsyncHttpResponseHandler responseHandler) {
private final PublisherListener<S3MetaRequestProgress> progressListener;

public S3CrtResponseHandlerAdapter(CompletableFuture<Void> executeFuture,
SdkAsyncHttpResponseHandler responseHandler,
PublisherListener<S3MetaRequestProgress> progressListener) {
this.resultFuture = executeFuture;
this.responseHandler = responseHandler;
this.progressListener = S3CrtProgressListener.builder().delegateListener(progressListener).build();
}

@Override
Expand Down Expand Up @@ -103,7 +110,7 @@ private void onSuccessfulResponseComplete() {
failResponseHandlerAndFuture(failure);
return;
}

this.progressListener.subscriberOnComplete();
completeFutureAndCloseRequest();
});
}
Expand Down Expand Up @@ -145,6 +152,7 @@ private void onErrorResponseComplete(byte[] errorPayload) {
}

private void failResponseHandlerAndFuture(Throwable exception) {
this.progressListener.subscriberOnError(exception);
resultFuture.completeExceptionally(exception);
runAndLogError(log.logger(), "Exception thrown in SdkAsyncHttpResponseHandler#onError, ignoring",
() -> responseHandler.onError(exception));
Expand All @@ -159,4 +167,9 @@ private static boolean isErrorResponse(int responseStatus) {
public void metaRequest(S3MetaRequest s3MetaRequest) {
metaRequest = s3MetaRequest;
}

@Override
public void onProgress(S3MetaRequestProgress progress) {
this.progressListener.subscriberOnNext(progress);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

package software.amazon.awssdk.services.s3.internal.crt;

import java.nio.file.Path;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.async.listener.PublisherListener;
import software.amazon.awssdk.core.interceptor.trait.HttpChecksum;
import software.amazon.awssdk.crt.s3.ResumeToken;
import software.amazon.awssdk.http.SdkHttpExecutionAttribute;
Expand All @@ -41,6 +43,13 @@ public final class S3InternalSdkHttpExecutionAttribute<T> extends SdkHttpExecuti
public static final S3InternalSdkHttpExecutionAttribute<Region> SIGNING_REGION =
new S3InternalSdkHttpExecutionAttribute<>(Region.class);

public static final S3InternalSdkHttpExecutionAttribute<Path> SOURCE_REQ_PATH =
new S3InternalSdkHttpExecutionAttribute<>(Path.class);


public static final S3InternalSdkHttpExecutionAttribute<PublisherListener> CRT_PROGRESS_LISTENER =
new S3InternalSdkHttpExecutionAttribute<>(PublisherListener.class);

private S3InternalSdkHttpExecutionAttribute(Class<T> valueClass) {
super(valueClass);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ public void setup() {
future = new CompletableFuture<>();
sdkResponseHandler = new TestResponseHandler();
responseHandlerAdapter = new S3CrtResponseHandlerAdapter(future,
sdkResponseHandler);

sdkResponseHandler,
null);
responseHandlerAdapter.metaRequest(s3MetaRequest);
}

Expand Down

0 comments on commit 63065aa

Please sign in to comment.