Skip to content

Commit

Permalink
feat(storage): Add support for S3 acceleration mode (#2304)
Browse files Browse the repository at this point in the history
  • Loading branch information
sdhuka authored Mar 8, 2023
1 parent 48e294b commit 86dfdbe
Show file tree
Hide file tree
Showing 45 changed files with 787 additions and 363 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@
import com.amplifyframework.api.ApiException.ApiAuthException;
import com.amplifyframework.api.aws.sigv4.AWS4Signer;

import aws.smithy.kotlin.runtime.net.Url;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;

import aws.smithy.kotlin.runtime.auth.awscredentials.CredentialsProvider;
import aws.smithy.kotlin.runtime.http.DeferredHeaders;
import aws.smithy.kotlin.runtime.http.Headers;
import aws.smithy.kotlin.runtime.http.HttpMethod;
import aws.smithy.kotlin.runtime.http.Url;
import aws.smithy.kotlin.runtime.http.content.ByteArrayContent;
import aws.smithy.kotlin.runtime.http.request.HttpRequest;
import aws.smithy.kotlin.runtime.http.request.HttpRequestKt;
import okhttp3.MediaType;
import okhttp3.RequestBody;
import okio.Buffer;
Expand Down Expand Up @@ -79,7 +82,7 @@ public final okhttp3.Request decorate(okhttp3.Request req) throws ApiAuthExcepti
return null;
});

HttpRequest req2 = new HttpRequest(method, url, headers, body2);
HttpRequest req2 = HttpRequestKt.HttpRequest(method, url, headers, body2, DeferredHeaders.Companion.getEmpty());

HttpRequest request = v4Signer.signBlocking(req2, credentialsProvider, serviceName).getOutput();

Expand Down Expand Up @@ -124,8 +127,8 @@ private byte[] getBytes(RequestBody body) throws ApiAuthException {
return output.toByteArray();
} catch (IOException exception) {
throw new ApiAuthException("Unable to calculate SigV4 signature for the request",
exception,
"Check your application logs for details.");
exception,
"Check your application logs for details.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ package com.amplifyframework.auth.cognito

import aws.sdk.kotlin.services.cognitoidentity.CognitoIdentityClient
import aws.sdk.kotlin.services.cognitoidentityprovider.CognitoIdentityProviderClient
import aws.smithy.kotlin.runtime.http.endpoints.Endpoint
import aws.smithy.kotlin.runtime.client.endpoints.Endpoint
import aws.smithy.kotlin.runtime.client.endpoints.EndpointProvider
import com.amplifyframework.statemachine.codegen.data.AuthConfiguration

interface AWSCognitoAuthService {
Expand All @@ -30,8 +31,8 @@ interface AWSCognitoAuthService {

CognitoIdentityProviderClient {
this.region = it.region
this.endpointResolver = it.endpoint?.let { endpoint ->
AWSEndpointResolver(Endpoint(endpoint))
this.endpointProvider = it.endpoint?.let { endpoint ->
EndpointProvider { Endpoint(endpoint) }
}
}
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package com.amplifyframework.predictions.aws.service
import androidx.annotation.WorkerThread
import aws.sdk.kotlin.services.polly.PollyClient
import aws.sdk.kotlin.services.polly.model.SynthesizeSpeechRequest
import aws.sdk.kotlin.services.polly.presigners.PollyPresignConfig
import aws.sdk.kotlin.services.polly.presigners.presign
import java.net.URL
import kotlin.time.Duration.Companion.seconds
Expand Down Expand Up @@ -51,12 +50,10 @@ class AmazonPollyPresigningClient(pollyClient: PollyClient) : PollyClient by pol
options: PresignedSynthesizeSpeechUrlOptions
): URL {
val presignCredentialsProvider = options.credentialsProvider ?: this.config.credentialsProvider
val presignConfig = PollyPresignConfig {
region = this@AmazonPollyPresigningClient.config.region
credentialsProvider = presignCredentialsProvider
}
val configBuilder = this@AmazonPollyPresigningClient.config.toBuilder()
configBuilder.credentialsProvider = presignCredentialsProvider
val presignedRequest = runBlocking {
synthesizeSpeechRequest.presign(presignConfig, options.expires.seconds)
synthesizeSpeechRequest.presign(configBuilder.build(), options.expires.seconds)
}
return URL(presignedRequest.url.toString())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.amplifyframework.storage.operation.StorageDownloadFileOperation;
import com.amplifyframework.storage.options.StorageDownloadFileOptions;
import com.amplifyframework.storage.options.StorageUploadFileOptions;
import com.amplifyframework.storage.s3.options.AWSS3StorageDownloadFileOptions;
import com.amplifyframework.storage.s3.test.R;
import com.amplifyframework.storage.s3.util.WorkmanagerTestUtils;
import com.amplifyframework.testutils.FileAssert;
Expand Down Expand Up @@ -96,8 +97,8 @@ public static void setUpOnce() throws Exception {
// Upload to PUBLIC for consistency
String key;
StorageUploadFileOptions uploadOptions = StorageUploadFileOptions.builder()
.accessLevel(TESTING_ACCESS_LEVEL)
.build();
.accessLevel(TESTING_ACCESS_LEVEL)
.build();

// Upload large test file
largeFile = new RandomTempFile(LARGE_FILE_NAME, LARGE_FILE_SIZE);
Expand All @@ -119,8 +120,8 @@ public static void setUpOnce() throws Exception {
public void setUp() throws Exception {
// Always interact with PUBLIC access for consistency
options = StorageDownloadFileOptions.builder()
.accessLevel(TESTING_ACCESS_LEVEL)
.build();
.accessLevel(TESTING_ACCESS_LEVEL)
.build();

// Create a set to remember all the subscriptions
subscriptions = new HashSet<>();
Expand Down Expand Up @@ -273,7 +274,7 @@ public void testGetTransferOnPause() throws Exception {
final AtomicReference<StorageDownloadFileOperation<?>> opContainer = new AtomicReference<>();
final AtomicReference<String> transferId = new AtomicReference<>();
final AtomicReference<Throwable> errorContainer = new AtomicReference<>();
// Listen to Hub events to resume when operation has been paused
// Listen to Hub events to resume when operation has been paused
SubscriptionToken resumeToken = Amplify.Hub.subscribe(HubChannel.STORAGE, hubEvent -> {
if (StorageChannelEventName.DOWNLOAD_STATE.toString().equals(hubEvent.getName())) {
HubEvent<String> stateEvent = (HubEvent<String>) hubEvent;
Expand Down Expand Up @@ -317,4 +318,16 @@ public void testGetTransferOnPause() throws Exception {
assertNull(errorContainer.get());
FileAssert.assertEquals(largeFile, downloadFile);
}

/**
* Tests download fails due to acceleration mode disabled.
*
* @throws Exception download fails because acceleration is not enabled on test bucket.
*/
@Test
public void testDownloadLargeFileWithAccelerationEnabled() throws Exception {
AWSS3StorageDownloadFileOptions awsS3Options =
AWSS3StorageDownloadFileOptions.builder().setUseAccelerateEndpoint(true).build();
synchronousStorage.downloadFile(LARGE_FILE_NAME, downloadFile, awsS3Options, EXTENDED_TIMEOUT_MS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.amplifyframework.storage.operation.StorageUploadInputStreamOperation;
import com.amplifyframework.storage.options.StorageUploadFileOptions;
import com.amplifyframework.storage.options.StorageUploadInputStreamOptions;
import com.amplifyframework.storage.s3.options.AWSS3StorageUploadFileOptions;
import com.amplifyframework.storage.s3.test.R;
import com.amplifyframework.storage.s3.util.WorkmanagerTestUtils;
import com.amplifyframework.testutils.random.RandomTempFile;
Expand Down Expand Up @@ -74,6 +75,7 @@ public final class AWSS3StorageUploadTest {

/**
* Initialize mobile client and configure the storage.
*
* @throws Exception if mobile client initialization fails
*/
@BeforeClass
Expand All @@ -95,8 +97,8 @@ public static void setUpOnce() throws Exception {
public void setUp() {
// Always interact with PUBLIC access for consistency
options = StorageUploadFileOptions.builder()
.accessLevel(TESTING_ACCESS_LEVEL)
.build();
.accessLevel(TESTING_ACCESS_LEVEL)
.build();

// Create a set to remember all the subscriptions
subscriptions = new HashSet<>();
Expand Down Expand Up @@ -132,7 +134,7 @@ public void testUploadSmallFile() throws Exception {
*/
@Test
public void testUploadSmallFileStream() throws Exception {
File uploadFile = new RandomTempFile(4 * 1024 * 1024);
File uploadFile = new RandomTempFile(SMALL_FILE_SIZE);
String fileName = uploadFile.getName();
StorageUploadInputStreamOptions options = StorageUploadInputStreamOptions.builder()
.accessLevel(TESTING_ACCESS_LEVEL)
Expand All @@ -149,6 +151,8 @@ public void testUploadSmallFileStream() throws Exception {
public void testUploadLargeFile() throws Exception {
File uploadFile = new RandomTempFile(LARGE_FILE_SIZE);
String fileName = uploadFile.getName();
AWSS3StorageUploadFileOptions options =
AWSS3StorageUploadFileOptions.builder().setUseAccelerateEndpoint(true).build();
synchronousStorage.uploadFile(fileName, uploadFile, options, EXTENDED_TIMEOUT_MS);
}

Expand All @@ -157,7 +161,7 @@ public void testUploadLargeFile() throws Exception {
* transfer hasn't completed yet.
*
* @throws Exception if upload is not canceled successfully
* before timeout
* before timeout
*/
@SuppressWarnings("unchecked")
@Test
Expand Down Expand Up @@ -206,7 +210,7 @@ public void testUploadFileIsCancelable() throws Exception {
* while the transfer hasn't completed yet.
*
* @throws Exception if upload is not paused, resumed, and
* completed successfully before timeout
* completed successfully before timeout
*/
@SuppressWarnings("unchecked")
@Test
Expand Down Expand Up @@ -258,7 +262,7 @@ public void testUploadFileIsResumable() throws Exception {
* using getTransfer API.
*
* @throws Exception if upload is not paused, resumed, and
* completed successfully before timeout
* completed successfully before timeout
*/
@SuppressWarnings("unchecked")
@Test
Expand Down Expand Up @@ -303,7 +307,8 @@ public void testUploadFileGetTransferOnPause() throws Exception {
opContainer.get().pause();
}
},
result -> { },
result -> {
},
errorContainer::set
);
opContainer.set(op);
Expand All @@ -320,7 +325,7 @@ public void testUploadFileGetTransferOnPause() throws Exception {
* using getTransfer API.
*
* @throws Exception if upload is not paused, resumed, and
* completed successfully before timeout
* completed successfully before timeout
*/
@SuppressWarnings("unchecked")
@Test
Expand Down Expand Up @@ -367,7 +372,8 @@ public void testUploadInputStreamGetTransferOnPause() throws Exception {
opContainer.get().pause();
}
},
result -> { },
result -> {
},
errorContainer::set
);
opContainer.set(op);
Expand All @@ -378,4 +384,35 @@ public void testUploadInputStreamGetTransferOnPause() throws Exception {
assertTrue(completed.await(EXTENDED_TIMEOUT_MS, TimeUnit.MILLISECONDS));
assertNull(errorContainer.get());
}

/**
* Tests that small file (single-part) uploads successfully.
*
* @throws Exception if upload fails
*/
@Test
public void testUploadSmallFileWithAccelerationEnabled() throws Exception {
File uploadFile = new RandomTempFile(SMALL_FILE_SIZE);
String fileName = uploadFile.getName();
AWSS3StorageUploadFileOptions awss3StorageUploadFileOptions =
AWSS3StorageUploadFileOptions.builder().setUseAccelerateEndpoint(true).build();
synchronousStorage.uploadFile(fileName, uploadFile,
awss3StorageUploadFileOptions);
}

/**
* Tests that large file (single-part) uploads successfully.
*
* @throws Exception if upload fails
*/
@Test
public void testUploadLargeFileWithAccelerationEnabled() throws Exception {
File uploadFile = new RandomTempFile(LARGE_FILE_SIZE);
String fileName = uploadFile.getName();
AWSS3StorageUploadFileOptions awss3StorageUploadFileOptions =
AWSS3StorageUploadFileOptions.builder().setUseAccelerateEndpoint(true).build();
synchronousStorage.uploadFile(fileName, uploadFile,
awss3StorageUploadFileOptions);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,8 @@ open class TransferDBTest {
ongoingRestore = false,
restoreExpirationTime = restoreExpirationTime
),
null
null,
useAccelerateEndpoint = false
)
val uri = transferDB.bulkInsertTransferRecords(contentValues)
transferDB.getTransferRecordById(uri).run {
Expand All @@ -144,7 +145,8 @@ open class TransferDBTest {
1L,
0,
null,
null
null,
false
)
contentValues[1] = transferDB.generateContentValuesForMultiPartUpload(
key,
Expand All @@ -157,7 +159,8 @@ open class TransferDBTest {
1L,
0,
null,
null
null,
false
)
contentValues[2] = transferDB.generateContentValuesForMultiPartUpload(
key,
Expand All @@ -170,7 +173,8 @@ open class TransferDBTest {
1L,
1,
null,
null
null,
false
)
val bulkInsertUri = transferDB.bulkInsertTransferRecords(contentValues)
transferDB.getTransferRecordById(bulkInsertUri)
Expand Down
Loading

0 comments on commit 86dfdbe

Please sign in to comment.