From 55b6398cf035388e6732fa1e87edc4e03522ad9d Mon Sep 17 00:00:00 2001 From: Richard Smith Date: Tue, 25 Oct 2022 11:34:30 -0700 Subject: [PATCH 1/3] Add S3AsyncClient support --- .../PayloadStorageConfiguration.java | 50 +++++ .../payloadoffloading/PayloadStoreAsync.java | 78 ++++++++ .../amazon/payloadoffloading/S3AsyncDao.java | 118 ++++++++++++ .../S3BackedPayloadStoreAsync.java | 77 ++++++++ .../amazon/payloadoffloading/Util.java | 9 + .../PayloadStorageConfigurationTest.java | 42 +++++ .../payloadoffloading/S3AsyncDaoTest.java | 117 ++++++++++++ .../S3BackedPayloadStoreAsyncTest.java | 176 ++++++++++++++++++ 8 files changed, 667 insertions(+) create mode 100644 src/main/java/software/amazon/payloadoffloading/PayloadStoreAsync.java create mode 100644 src/main/java/software/amazon/payloadoffloading/S3AsyncDao.java create mode 100644 src/main/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsync.java create mode 100644 src/test/java/software/amazon/payloadoffloading/S3AsyncDaoTest.java create mode 100644 src/test/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsyncTest.java diff --git a/src/main/java/software/amazon/payloadoffloading/PayloadStorageConfiguration.java b/src/main/java/software/amazon/payloadoffloading/PayloadStorageConfiguration.java index d57bcc2..1c16740 100644 --- a/src/main/java/software/amazon/payloadoffloading/PayloadStorageConfiguration.java +++ b/src/main/java/software/amazon/payloadoffloading/PayloadStorageConfiguration.java @@ -4,6 +4,7 @@ import org.slf4j.LoggerFactory; import software.amazon.awssdk.annotations.NotThreadSafe; import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.ObjectCannedACL; @@ -35,6 +36,7 @@ public class PayloadStorageConfiguration { private static final Logger LOG = LoggerFactory.getLogger(PayloadStorageConfiguration.class); private S3Client s3; + private S3AsyncClient s3Async; private String s3BucketName; private int payloadSizeThreshold = 0; private boolean alwaysThroughS3 = false; @@ -57,6 +59,7 @@ public PayloadStorageConfiguration() { public PayloadStorageConfiguration(PayloadStorageConfiguration other) { this.s3 = other.getS3Client(); + this.s3Async = other.getS3AsyncClient(); this.s3BucketName = other.getS3BucketName(); this.payloadSupport = other.isPayloadSupportEnabled(); this.alwaysThroughS3 = other.isAlwaysThroughS3(); @@ -82,6 +85,7 @@ public void setPayloadSupportEnabled(S3Client s3, String s3BucketName) { LOG.warn("Payload support is already enabled. Overwriting AmazonS3Client and S3BucketName."); } this.s3 = s3; + this.s3Async = null; this.s3BucketName = s3BucketName; this.payloadSupport = true; LOG.info("Payload support enabled."); @@ -100,11 +104,48 @@ public PayloadStorageConfiguration withPayloadSupportEnabled(S3Client s3, String return this; } + /** + * Enables support for payloads using asynchronous storage. + * + * @param s3Async Amazon S3 client which is going to be used for storing payload. + * @param s3BucketName Name of the bucket which is going to be used for storing payload. + * The bucket must be already created and configured in s3. + */ + public void setPayloadSupportEnabled(S3AsyncClient s3Async, String s3BucketName) { + if (s3Async == null || s3BucketName == null) { + String errorMessage = "S3 client and/or S3 bucket name cannot be null."; + LOG.error(errorMessage); + throw SdkClientException.create(errorMessage); + } + if (isPayloadSupportEnabled()) { + LOG.warn("Payload support is already enabled. Overwriting AmazonS3Client and S3BucketName."); + } + this.s3 = null; + this.s3Async = s3Async; + this.s3BucketName = s3BucketName; + this.payloadSupport = true; + LOG.info("Payload support enabled."); + } + + /** + * Enables support for payload. + * + * @param s3Async Amazon S3 client which is going to be used for storing payload. + * @param s3BucketName Name of the bucket which is going to be used for storing payloads. + * The bucket must be already created and configured in s3. + * @return the updated PayloadStorageConfiguration object. + */ + public PayloadStorageConfiguration withPayloadSupportEnabled(S3AsyncClient s3Async, String s3BucketName) { + setPayloadSupportEnabled(s3Async, s3BucketName); + return this; + } + /** * Disables support for payloads. */ public void setPayloadSupportDisabled() { s3 = null; + s3Async = null; s3BucketName = null; payloadSupport = false; LOG.info("Payload support disabled."); @@ -138,6 +179,15 @@ public S3Client getS3Client() { return s3; } + /** + * Gets the Amazon S3 async client which is being used for storing payloads. + * + * @return Reference to the Amazon S3 async client which is being used. + */ + public S3AsyncClient getS3AsyncClient() { + return s3Async; + } + /** * Gets the name of the S3 bucket which is being used for storing payload. * diff --git a/src/main/java/software/amazon/payloadoffloading/PayloadStoreAsync.java b/src/main/java/software/amazon/payloadoffloading/PayloadStoreAsync.java new file mode 100644 index 0000000..43590b4 --- /dev/null +++ b/src/main/java/software/amazon/payloadoffloading/PayloadStoreAsync.java @@ -0,0 +1,78 @@ +package software.amazon.payloadoffloading; + +import java.util.concurrent.CompletableFuture; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.services.s3.model.S3Exception; + +/** + * An AWS storage service that supports saving high payload sizes. + */ +public interface PayloadStoreAsync { + + /** + * Stores payload in a store that has higher payload size limit than that is supported by original payload store. + *

+ * This call is asynchronous, and so documented return values and exceptions are propagated through + * the returned {@link CompletableFuture}. + * + * @param payload + * @return future value of a pointer that must be used to retrieve the original payload later. + * @throws SdkClientException If any internal errors are encountered on the client side while + * attempting to make the request or handle the response. For example + * if a network connection is not available. + * @throws S3Exception If an error response is returned by actual PayloadStore indicating + * either a problem with the data in the request, or a server side issue. + */ + CompletableFuture storeOriginalPayload(String payload); + + /** + * Stores payload in a store that has higher payload size limit than that is supported by original payload store. + *

+ * This call is asynchronous, and so documented return values and exceptions are propagated through + * the returned {@link CompletableFuture}. + * + * @param payload + * @param s3Key + * @return future value of a pointer that must be used to retrieve the original payload later. + * @throws SdkClientException If any internal errors are encountered on the client side while + * attempting to make the request or handle the response. For example + * if a network connection is not available. + * @throws S3Exception If an error response is returned by actual PayloadStore indicating + * either a problem with the data in the request, or a server side issue. + */ + CompletableFuture storeOriginalPayload(String payload, String s3Key); + + /** + * Retrieves the original payload using the given payloadPointer. The pointer must + * have been obtained using {@link #storeOriginalPayload(String)} + *

+ * This call is asynchronous, and so documented return values and exceptions are propagated through + * the returned {@link CompletableFuture}. + * + * @param payloadPointer + * @return future value of the original payload + * @throws SdkClientException If any internal errors are encountered on the client side while + * attempting to make the request or handle the response. For example + * if payloadPointer is invalid or a network connection is not available. + * @throws S3Exception If an error response is returned by actual PayloadStore indicating + * a server side issue. + */ + CompletableFuture getOriginalPayload(String payloadPointer); + + /** + * Deletes the original payload using the given payloadPointer. The pointer must + * have been obtained using {@link #storeOriginalPayload(String)} + *

+ * This call is asynchronous, and so documented return values and exceptions are propagated through + * the returned {@link CompletableFuture}. + * + * @param payloadPointer + * @return future value that completes when the delete operation finishes + * @throws SdkClientException If any internal errors are encountered on the client side while + * attempting to make the request or handle the response to/from PayloadStore. + * For example, if payloadPointer is invalid or a network connection is not available. + * @throws S3Exception If an error response is returned by actual PayloadStore indicating + * a server side issue. + */ + CompletableFuture deleteOriginalPayload(String payloadPointer); +} diff --git a/src/main/java/software/amazon/payloadoffloading/S3AsyncDao.java b/src/main/java/software/amazon/payloadoffloading/S3AsyncDao.java new file mode 100644 index 0000000..a0dc868 --- /dev/null +++ b/src/main/java/software/amazon/payloadoffloading/S3AsyncDao.java @@ -0,0 +1,118 @@ +package software.amazon.payloadoffloading; + +import java.io.UncheckedIOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.ObjectCannedACL; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; + +/** + * Dao layer to access S3. + */ +public class S3AsyncDao { + private static final Logger LOG = LoggerFactory.getLogger(S3AsyncDao.class); + private final S3AsyncClient s3Client; + private final ServerSideEncryptionStrategy serverSideEncryptionStrategy; + private final ObjectCannedACL objectCannedACL; + + public S3AsyncDao(S3AsyncClient s3Client) { + this(s3Client, null, null); + } + + public S3AsyncDao( + S3AsyncClient s3Client, + ServerSideEncryptionStrategy serverSideEncryptionStrategy, + ObjectCannedACL objectCannedACL) { + this.s3Client = s3Client; + this.serverSideEncryptionStrategy = serverSideEncryptionStrategy; + this.objectCannedACL = objectCannedACL; + } + + public CompletableFuture getTextFromS3(String s3BucketName, String s3Key) { + GetObjectRequest getObjectRequest = GetObjectRequest.builder() + .bucket(s3BucketName) + .key(s3Key) + .build(); + + return s3Client.getObject(getObjectRequest, AsyncResponseTransformer.toBytes()) + .thenApply(ResponseBytes::asUtf8String) + .handle((v, tIn) -> { + if (tIn != null) { + Throwable t = Util.unwrapFutureException(tIn); + if (t instanceof SdkException) { + String errorMessage = "Failed to get the S3 object which contains the payload."; + LOG.error(errorMessage, t); + throw SdkException.create(errorMessage, t); + } + if (t instanceof UncheckedIOException) { + String errorMessage = "Failure when handling the message which was read from S3 object."; + LOG.error(errorMessage, t); + throw SdkClientException.create(errorMessage, t); + } + throw new CompletionException(t); + } + return v; + }); + } + + public CompletableFuture storeTextInS3(String s3BucketName, String s3Key, String payloadContentStr) { + PutObjectRequest.Builder putObjectRequestBuilder = PutObjectRequest.builder() + .bucket(s3BucketName) + .key(s3Key); + + if (objectCannedACL != null) { + putObjectRequestBuilder.acl(objectCannedACL); + } + + // https://docs.aws.amazon.com/AmazonS3/latest/dev/kms-using-sdks.html + if (serverSideEncryptionStrategy != null) { + serverSideEncryptionStrategy.decorate(putObjectRequestBuilder); + } + + return s3Client.putObject(putObjectRequestBuilder.build(), AsyncRequestBody.fromString(payloadContentStr)) + .handle((v, tIn) -> { + if (tIn != null) { + Throwable t = Util.unwrapFutureException(tIn); + if (t instanceof SdkException) { + String errorMessage = "Failed to store the message content in an S3 object."; + LOG.error(errorMessage, t); + throw SdkException.create(errorMessage, t); + } + throw new CompletionException(t); + } + return null; + }); + } + + public CompletableFuture deletePayloadFromS3(String s3BucketName, String s3Key) { + DeleteObjectRequest deleteObjectRequest = DeleteObjectRequest.builder() + .bucket(s3BucketName) + .key(s3Key) + .build(); + return s3Client.deleteObject(deleteObjectRequest) + .handle((v, tIn) -> { + if (tIn != null) { + Throwable t = Util.unwrapFutureException(tIn); + if (t instanceof SdkException) { + String errorMessage = "Failed to delete the S3 object which contains the payload"; + LOG.error(errorMessage, t); + throw SdkException.create(errorMessage, t); + } + throw new CompletionException(t); + } + + LOG.info("S3 object deleted, Bucket name: " + s3BucketName + ", Object key: " + s3Key + "."); + return null; + }); + } +} diff --git a/src/main/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsync.java b/src/main/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsync.java new file mode 100644 index 0000000..5b84ede --- /dev/null +++ b/src/main/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsync.java @@ -0,0 +1,77 @@ +package software.amazon.payloadoffloading; + +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.payloadoffloading.PayloadS3Pointer; + +/** + * S3 based implementation for PayloadStoreAsync. + */ +public class S3BackedPayloadStoreAsync implements PayloadStoreAsync { + private static final Logger LOG = LoggerFactory.getLogger(S3BackedPayloadStoreAsync.class); + + private final String s3BucketName; + private final S3AsyncDao s3Dao; + + public S3BackedPayloadStoreAsync(S3AsyncDao s3Dao, String s3BucketName) { + this.s3BucketName = s3BucketName; + this.s3Dao = s3Dao; + } + + @Override + public CompletableFuture storeOriginalPayload(String payload) { + String s3Key = UUID.randomUUID().toString(); + return storeOriginalPayload(payload, s3Key); + } + + @Override + public CompletableFuture storeOriginalPayload(String payload, String s3Key) { + return s3Dao.storeTextInS3(s3BucketName, s3Key, payload) + .thenApply(v -> { + LOG.info("S3 object created, Bucket name: " + s3BucketName + ", Object key: " + s3Key + "."); + + // Convert S3 pointer (bucket name, key, etc) to JSON string + PayloadS3Pointer s3Pointer = new PayloadS3Pointer(s3BucketName, s3Key); + + return s3Pointer.toJson(); + }); + } + + @Override + public CompletableFuture getOriginalPayload(String payloadPointer) { + try { + PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(payloadPointer); + + String s3BucketName = s3Pointer.getS3BucketName(); + String s3Key = s3Pointer.getS3Key(); + + return s3Dao.getTextFromS3(s3BucketName, s3Key) + .thenApply(originalPayload -> { + LOG.info("S3 object read, Bucket name: " + s3BucketName + ", Object key: " + s3Key + "."); + return originalPayload; + }); + } catch (Exception e) { + CompletableFuture futureEx = new CompletableFuture<>(); + futureEx.completeExceptionally((e instanceof RuntimeException) ? e : new CompletionException(e)); + return futureEx; + } + } + + @Override + public CompletableFuture deleteOriginalPayload(String payloadPointer) { + try { + PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(payloadPointer); + + String s3BucketName = s3Pointer.getS3BucketName(); + String s3Key = s3Pointer.getS3Key(); + return s3Dao.deletePayloadFromS3(s3BucketName, s3Key); + } catch (Exception e) { + CompletableFuture futureEx = new CompletableFuture<>(); + futureEx.completeExceptionally((e instanceof RuntimeException) ? e : new CompletionException(e)); + return futureEx; + } + } +} diff --git a/src/main/java/software/amazon/payloadoffloading/Util.java b/src/main/java/software/amazon/payloadoffloading/Util.java index bd3932f..28e3966 100644 --- a/src/main/java/software/amazon/payloadoffloading/Util.java +++ b/src/main/java/software/amazon/payloadoffloading/Util.java @@ -1,5 +1,6 @@ package software.amazon.payloadoffloading; +import java.util.concurrent.CompletionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.core.exception.SdkClientException; @@ -34,4 +35,12 @@ public static long getStringSizeInBytes(String str) { public static String getUserAgentHeader(String clientName) { return clientName + "/" + VersionInfo.SDK_VERSION; } + + public static Throwable unwrapFutureException(Throwable t) { + if ((t instanceof CompletionException) && t.getCause() != null) { + t = t.getCause(); + } + return t; + } + } diff --git a/src/test/java/software/amazon/payloadoffloading/PayloadStorageConfigurationTest.java b/src/test/java/software/amazon/payloadoffloading/PayloadStorageConfigurationTest.java index e9b473a..6d0c51f 100644 --- a/src/test/java/software/amazon/payloadoffloading/PayloadStorageConfigurationTest.java +++ b/src/test/java/software/amazon/payloadoffloading/PayloadStorageConfigurationTest.java @@ -1,6 +1,7 @@ package software.amazon.payloadoffloading; import org.junit.Test; +import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.ObjectCannedACL; @@ -43,6 +44,33 @@ public void testCopyConstructor() { assertNotSame(newPayloadStorageConfiguration, payloadStorageConfiguration); } + @Test + public void testCopyConstructorForAsync() { + S3AsyncClient s3Async = mock(S3AsyncClient.class); + + boolean alwaysThroughS3 = true; + int payloadSizeThreshold = 500; + + PayloadStorageConfiguration payloadStorageConfiguration = new PayloadStorageConfiguration(); + + payloadStorageConfiguration.withPayloadSupportEnabled(s3Async, s3BucketName) + .withAlwaysThroughS3(alwaysThroughS3) + .withPayloadSizeThreshold(payloadSizeThreshold) + .withServerSideEncryption(SERVER_SIDE_ENCRYPTION_STRATEGY) + .withObjectCannedACL(objectCannelACL); + + PayloadStorageConfiguration newPayloadStorageConfiguration = new PayloadStorageConfiguration(payloadStorageConfiguration); + + assertEquals(s3Async, newPayloadStorageConfiguration.getS3AsyncClient()); + assertEquals(s3BucketName, newPayloadStorageConfiguration.getS3BucketName()); + assertEquals(SERVER_SIDE_ENCRYPTION_STRATEGY, newPayloadStorageConfiguration.getServerSideEncryptionStrategy()); + assertTrue(newPayloadStorageConfiguration.isPayloadSupportEnabled()); + assertEquals(objectCannelACL, newPayloadStorageConfiguration.getObjectCannedACL()); + assertEquals(alwaysThroughS3, newPayloadStorageConfiguration.isAlwaysThroughS3()); + assertEquals(payloadSizeThreshold, newPayloadStorageConfiguration.getPayloadSizeThreshold()); + assertNotSame(newPayloadStorageConfiguration, payloadStorageConfiguration); + } + @Test public void testPayloadSupportEnabled() { S3Client s3 = mock(S3Client.class); @@ -51,6 +79,19 @@ public void testPayloadSupportEnabled() { assertTrue(payloadStorageConfiguration.isPayloadSupportEnabled()); assertNotNull(payloadStorageConfiguration.getS3Client()); + assertNull(payloadStorageConfiguration.getS3AsyncClient()); + assertEquals(s3BucketName, payloadStorageConfiguration.getS3BucketName()); + } + + @Test + public void testPayloadSupportEnabledForAsync() { + S3AsyncClient s3Async = mock(S3AsyncClient.class); + PayloadStorageConfiguration payloadStorageConfiguration = new PayloadStorageConfiguration(); + payloadStorageConfiguration.setPayloadSupportEnabled(s3Async, s3BucketName); + + assertTrue(payloadStorageConfiguration.isPayloadSupportEnabled()); + assertNull(payloadStorageConfiguration.getS3Client()); + assertNotNull(payloadStorageConfiguration.getS3AsyncClient()); assertEquals(s3BucketName, payloadStorageConfiguration.getS3BucketName()); } @@ -60,6 +101,7 @@ public void testDisablePayloadSupport() { payloadStorageConfiguration.setPayloadSupportDisabled(); assertNull(payloadStorageConfiguration.getS3Client()); + assertNull(payloadStorageConfiguration.getS3AsyncClient()); assertNull(payloadStorageConfiguration.getS3BucketName()); } diff --git a/src/test/java/software/amazon/payloadoffloading/S3AsyncDaoTest.java b/src/test/java/software/amazon/payloadoffloading/S3AsyncDaoTest.java new file mode 100644 index 0000000..4f926e4 --- /dev/null +++ b/src/test/java/software/amazon/payloadoffloading/S3AsyncDaoTest.java @@ -0,0 +1,117 @@ +package software.amazon.payloadoffloading; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CompletableFuture; +import junitparams.JUnitParamsRunner; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import software.amazon.awssdk.core.ResponseBytes; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.core.async.AsyncResponseTransformer; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.ObjectCannedACL; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.services.s3.model.ServerSideEncryption; + +@RunWith(JUnitParamsRunner.class) +public class S3AsyncDaoTest { + + private static String s3ServerSideEncryptionKMSKeyId = "test-customer-managed-kms-key-id"; + private static final String S3_BUCKET_NAME = "test-bucket-name"; + private static final String ANY_PAYLOAD = "AnyPayload"; + private static final String ANY_S3_KEY = "AnyS3key"; + private ServerSideEncryptionStrategy serverSideEncryptionStrategy = ServerSideEncryptionFactory.awsManagedCmk(); + private ObjectCannedACL objectCannedACL = ObjectCannedACL.PUBLIC_READ; + private S3AsyncClient s3AsyncClient; + private S3AsyncDao dao; + + @Before + public void setup() { + s3AsyncClient = mock(S3AsyncClient.class); + } + + @Test + public void storeTextInS3WithoutSSEOrCannedTest() { + dao = new S3AsyncDao(s3AsyncClient); + when(s3AsyncClient.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class))).thenReturn( + CompletableFuture.completedFuture(null)); + ArgumentCaptor argument = ArgumentCaptor.forClass(PutObjectRequest.class); + + dao.storeTextInS3(S3_BUCKET_NAME, ANY_S3_KEY, ANY_PAYLOAD).join(); + + verify(s3AsyncClient, times(1)).putObject(argument.capture(), any(AsyncRequestBody.class)); + + assertNull(argument.getValue().serverSideEncryption()); + assertNull(argument.getValue().acl()); + assertEquals(S3_BUCKET_NAME, argument.getValue().bucket()); + } + + @Test + public void storeTextInS3WithSSETest() { + dao = new S3AsyncDao(s3AsyncClient, serverSideEncryptionStrategy, null); + when(s3AsyncClient.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class))).thenReturn( + CompletableFuture.completedFuture(null)); + ArgumentCaptor argument = ArgumentCaptor.forClass(PutObjectRequest.class); + + dao.storeTextInS3(S3_BUCKET_NAME, ANY_S3_KEY, ANY_PAYLOAD).join(); + + verify(s3AsyncClient, times(1)).putObject(argument.capture(), any(AsyncRequestBody.class)); + + assertEquals(ServerSideEncryption.AWS_KMS, argument.getValue().serverSideEncryption()); + assertNull(argument.getValue().acl()); + assertEquals(S3_BUCKET_NAME, argument.getValue().bucket()); + } + + @Test + public void storeTextInS3WithBothTest() { + dao = new S3AsyncDao(s3AsyncClient, serverSideEncryptionStrategy, objectCannedACL); + when(s3AsyncClient.putObject(any(PutObjectRequest.class), any(AsyncRequestBody.class))).thenReturn( + CompletableFuture.completedFuture(null)); + ArgumentCaptor argument = ArgumentCaptor.forClass(PutObjectRequest.class); + + dao.storeTextInS3(S3_BUCKET_NAME, ANY_S3_KEY, ANY_PAYLOAD).join(); + + verify(s3AsyncClient, times(1)).putObject(argument.capture(), any(AsyncRequestBody.class)); + + assertEquals(ServerSideEncryption.AWS_KMS, argument.getValue().serverSideEncryption()); + assertEquals(objectCannedACL, argument.getValue().acl()); + assertEquals(S3_BUCKET_NAME, argument.getValue().bucket()); + } + + @Test + public void getTextTest() { + dao = new S3AsyncDao(s3AsyncClient); + when(s3AsyncClient.getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class))).thenReturn( + CompletableFuture.completedFuture(ResponseBytes.fromByteArray( + GetObjectRequest.builder().build(), ANY_PAYLOAD.getBytes(StandardCharsets.UTF_8)))); + + String payload = dao.getTextFromS3(S3_BUCKET_NAME, ANY_S3_KEY).join(); + + verify(s3AsyncClient, times(1)).getObject(any(GetObjectRequest.class), any(AsyncResponseTransformer.class)); + + assertEquals(payload, ANY_PAYLOAD); + } + + @Test + public void deleteTextTest() { + dao = new S3AsyncDao(s3AsyncClient); + when(s3AsyncClient.deleteObject(any(DeleteObjectRequest.class))).thenReturn( + CompletableFuture.completedFuture(null)); + + dao.deletePayloadFromS3(S3_BUCKET_NAME, ANY_S3_KEY).join(); + + verify(s3AsyncClient, times(1)).deleteObject(any(DeleteObjectRequest.class)); + } +} diff --git a/src/test/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsyncTest.java b/src/test/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsyncTest.java new file mode 100644 index 0000000..a0a31f2 --- /dev/null +++ b/src/test/java/software/amazon/payloadoffloading/S3BackedPayloadStoreAsyncTest.java @@ -0,0 +1,176 @@ +package software.amazon.payloadoffloading; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import junitparams.JUnitParamsRunner; +import org.hamcrest.Matchers; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.s3.model.ObjectCannedACL; + +@RunWith(JUnitParamsRunner.class) +public class S3BackedPayloadStoreAsyncTest { + private static final String S3_BUCKET_NAME = "test-bucket-name"; + private static final String ANY_PAYLOAD = "AnyPayload"; + private static final String ANY_S3_KEY = "AnyS3key"; + private static final String INCORRECT_POINTER_EXCEPTION_MSG = "Failed to read the S3 object pointer from given string"; + private PayloadStoreAsync payloadStore; + private S3AsyncDao s3AsyncDao; + + @Rule + public final ExpectedException exception = ExpectedException.none(); + + @Before + public void setup() { + s3AsyncDao = mock(S3AsyncDao.class); + payloadStore = new S3BackedPayloadStoreAsync(s3AsyncDao, S3_BUCKET_NAME); + } + + @Test + public void testStoreOriginalPayloadOnSuccess() { + when(s3AsyncDao.storeTextInS3(any(String.class), any(String.class), any(String.class))).thenReturn( + CompletableFuture.completedFuture(null)); + String actualPayloadPointer = payloadStore.storeOriginalPayload(ANY_PAYLOAD).join(); + + ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor sseArgsCaptor = ArgumentCaptor.forClass(ServerSideEncryptionStrategy.class); + ArgumentCaptor cannedArgsCaptor = ArgumentCaptor.forClass(ObjectCannedACL.class); + + verify(s3AsyncDao, times(1)).storeTextInS3(eq(S3_BUCKET_NAME), keyCaptor.capture(), + eq(ANY_PAYLOAD)); + + PayloadS3Pointer expectedPayloadPointer = new PayloadS3Pointer(S3_BUCKET_NAME, keyCaptor.getValue()); + assertEquals(expectedPayloadPointer.toJson(), actualPayloadPointer); + } + + @Test + public void testStoreOriginalPayloadWithS3KeyOnSuccess() { + when(s3AsyncDao.storeTextInS3(any(String.class), any(String.class), any(String.class))).thenReturn( + CompletableFuture.completedFuture(null)); + String actualPayloadPointer = payloadStore.storeOriginalPayload(ANY_PAYLOAD, ANY_S3_KEY).join(); + + verify(s3AsyncDao, times(1)).storeTextInS3(eq(S3_BUCKET_NAME), eq(ANY_S3_KEY), + eq(ANY_PAYLOAD)); + + PayloadS3Pointer expectedPayloadPointer = new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY); + assertEquals(expectedPayloadPointer.toJson(), actualPayloadPointer); + } + + @Test + public void testStoreOriginalPayloadDoesAlwaysCreateNewObjects() { + //Store any payload + when(s3AsyncDao.storeTextInS3(any(String.class), any(String.class), any(String.class))).thenReturn( + CompletableFuture.completedFuture(null)); + String anyActualPayloadPointer = payloadStore.storeOriginalPayload(ANY_PAYLOAD).join(); + + //Store any other payload and validate that the pointers are different + String anyOtherActualPayloadPointer = payloadStore.storeOriginalPayload(ANY_PAYLOAD).join(); + + ArgumentCaptor anyOtherKeyCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor sseArgsCaptor = ArgumentCaptor.forClass(ServerSideEncryptionStrategy.class); + ArgumentCaptor cannedArgsCaptor = ArgumentCaptor.forClass(ObjectCannedACL.class); + + verify(s3AsyncDao, times(2)).storeTextInS3(eq(S3_BUCKET_NAME), anyOtherKeyCaptor.capture(), + eq(ANY_PAYLOAD)); + + String anyS3Key = anyOtherKeyCaptor.getAllValues().get(0); + String anyOtherS3Key = anyOtherKeyCaptor.getAllValues().get(1); + + PayloadS3Pointer anyExpectedPayloadPointer = new PayloadS3Pointer(S3_BUCKET_NAME, anyS3Key); + assertEquals(anyExpectedPayloadPointer.toJson(), anyActualPayloadPointer); + + PayloadS3Pointer anyOtherExpectedPayloadPointer = new PayloadS3Pointer(S3_BUCKET_NAME, anyOtherS3Key); + assertEquals(anyOtherExpectedPayloadPointer.toJson(), anyOtherActualPayloadPointer); + + assertThat(anyS3Key, Matchers.not(anyOtherS3Key)); + assertThat(anyActualPayloadPointer, Matchers.not(anyOtherActualPayloadPointer)); + } + + @Test + public void testStoreOriginalPayloadOnS3Failure() { + CompletableFuture sdkEx = new CompletableFuture<>(); + sdkEx.completeExceptionally(SdkException.create("S3 Exception", new Throwable())); + when(s3AsyncDao.storeTextInS3(any(String.class), any(String.class), any(String.class))).thenReturn(sdkEx); + + exception.expect(CompletionException.class); + exception.expectMessage("S3 Exception"); + //Any S3 Dao exception is thrown back as-is to clients + payloadStore.storeOriginalPayload(ANY_PAYLOAD).join(); + } + + @Test + public void testGetOriginalPayloadOnSuccess() { + PayloadS3Pointer anyPointer = new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY); + when(s3AsyncDao.getTextFromS3(any(String.class), any(String.class))).thenReturn( + CompletableFuture.completedFuture(ANY_PAYLOAD)); + String actualPayload = payloadStore.getOriginalPayload(anyPointer.toJson()).join(); + + ArgumentCaptor bucketNameCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(String.class); + verify(s3AsyncDao, times(1)).getTextFromS3(bucketNameCaptor.capture(), keyCaptor.capture()); + + assertEquals(ANY_S3_KEY, keyCaptor.getValue()); + assertEquals(S3_BUCKET_NAME, bucketNameCaptor.getValue()); + assertEquals(ANY_PAYLOAD, actualPayload); + } + + @Test + public void testGetOriginalPayloadIncorrectPointer() { + exception.expect(CompletionException.class); + exception.expectMessage(INCORRECT_POINTER_EXCEPTION_MSG); + //Any S3 Dao exception is thrown back as-is to clients + payloadStore.getOriginalPayload("IncorrectPointer").join(); + verifyNoInteractions(s3AsyncDao); + } + + @Test + public void testGetOriginalPayloadOnS3Failure() { + CompletableFuture sdkEx = new CompletableFuture<>(); + sdkEx.completeExceptionally(SdkException.create("S3 Exception", new Throwable())); + when(s3AsyncDao.getTextFromS3(any(String.class), any(String.class))).thenReturn(sdkEx); + exception.expect(CompletionException.class); + exception.expectMessage("S3 Exception"); + //Any S3 Dao exception is thrown back as-is to clients + PayloadS3Pointer anyPointer = new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY); + payloadStore.getOriginalPayload(anyPointer.toJson()).join(); + } + + @Test + public void testDeleteOriginalPayloadOnSuccess() { + when(s3AsyncDao.deletePayloadFromS3(any(), any())).thenReturn(CompletableFuture.completedFuture(null)); + PayloadS3Pointer anyPointer = new PayloadS3Pointer(S3_BUCKET_NAME, ANY_S3_KEY); + payloadStore.deleteOriginalPayload(anyPointer.toJson()).join(); + + ArgumentCaptor bucketNameCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(String.class); + verify(s3AsyncDao, times(1)).deletePayloadFromS3(bucketNameCaptor.capture(), keyCaptor.capture()); + + assertEquals(ANY_S3_KEY, keyCaptor.getValue()); + assertEquals(S3_BUCKET_NAME, bucketNameCaptor.getValue()); + } + + @Test + public void testDeleteOriginalPayloadIncorrectPointer() { + exception.expect(CompletionException.class); + exception.expectMessage(INCORRECT_POINTER_EXCEPTION_MSG); + payloadStore.deleteOriginalPayload("IncorrectPointer").join(); + verifyNoInteractions(s3AsyncDao); + } +} From ea76600e7e8c4ab4885df4d1fed60cbf5f6b4585 Mon Sep 17 00:00:00 2001 From: Richard Smith Date: Tue, 25 Oct 2022 13:07:52 -0700 Subject: [PATCH 2/3] Update version to 2.2.0 to reflect new feature --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 472a930..2e7c60c 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ software.amazon.payloadoffloading payloadoffloading-common - 2.1.2 + 2.2.0 jar Payload offloading common library for AWS Common library between extended Amazon AWS clients to save payloads up to 2GB on Amazon S3. From 3db3044e7e4f369d338259f79a963ea8ece1fda4 Mon Sep 17 00:00:00 2001 From: Richard Smith Date: Tue, 25 Oct 2022 14:22:09 -0700 Subject: [PATCH 3/3] Separate sync and async configuration. --- .../PayloadStorageAsyncConfiguration.java | 153 ++++++++++++++ .../PayloadStorageConfiguration.java | 188 +----------------- .../PayloadStorageConfigurationBase.java | 178 +++++++++++++++++ .../PayloadStorageAsyncConfigurationTest.java | 102 ++++++++++ .../PayloadStorageConfigurationTest.java | 53 +---- 5 files changed, 446 insertions(+), 228 deletions(-) create mode 100644 src/main/java/software/amazon/payloadoffloading/PayloadStorageAsyncConfiguration.java create mode 100644 src/main/java/software/amazon/payloadoffloading/PayloadStorageConfigurationBase.java create mode 100644 src/test/java/software/amazon/payloadoffloading/PayloadStorageAsyncConfigurationTest.java diff --git a/src/main/java/software/amazon/payloadoffloading/PayloadStorageAsyncConfiguration.java b/src/main/java/software/amazon/payloadoffloading/PayloadStorageAsyncConfiguration.java new file mode 100644 index 0000000..3bd8d08 --- /dev/null +++ b/src/main/java/software/amazon/payloadoffloading/PayloadStorageAsyncConfiguration.java @@ -0,0 +1,153 @@ +package software.amazon.payloadoffloading; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.annotations.NotThreadSafe; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.ObjectCannedACL; + +/** + *

Amazon payload storage configuration options such as asynchronous Amazon S3 client, + * bucket name, and payload size threshold for payloads.

+ * + *

Server side encryption is optional and can be enabled using with {@link #withServerSideEncryption(ServerSideEncryptionStrategy)} + * or {@link #setServerSideEncryptionStrategy(ServerSideEncryptionStrategy)}

+ * + *

There are two possible options for server side encrption. This can be using a customer managed key or AWS managed CMK.

+ * + * Example usage: + * + *
+ *     withServerSideEncryption(ServerSideEncrptionFactory.awsManagedCmk())
+ * 
+ * + * or + * + *
+ *     withServerSideEncryption(ServerSideEncrptionFactory.customerKey(YOUR_CUSTOMER_ID))
+ * 
+ * + * @see software.amazon.payloadoffloading.ServerSideEncryptionFactory + */ +@NotThreadSafe +public class PayloadStorageAsyncConfiguration extends PayloadStorageConfigurationBase { + private static final Logger LOG = LoggerFactory.getLogger(PayloadStorageAsyncConfiguration.class); + + private S3AsyncClient s3Async; + + public PayloadStorageAsyncConfiguration() { + s3Async = null; + } + + public PayloadStorageAsyncConfiguration(PayloadStorageAsyncConfiguration other) { + super(other); + this.s3Async = other.getS3AsyncClient(); + } + + /** + * Enables support for payloads using asynchronous storage. + * + * @param s3Async Amazon S3 client which is going to be used for storing payload. + * @param s3BucketName Name of the bucket which is going to be used for storing payload. + * The bucket must be already created and configured in s3. + */ + public void setPayloadSupportEnabled(S3AsyncClient s3Async, String s3BucketName) { + if (s3Async == null || s3BucketName == null) { + String errorMessage = "S3 client and/or S3 bucket name cannot be null."; + LOG.error(errorMessage); + throw SdkClientException.create(errorMessage); + } + super.setPayloadSupportEnabled(s3BucketName); + this.s3Async = s3Async; + } + + /** + * Enables support for payload. + * + * @param s3Async Amazon S3 client which is going to be used for storing payload. + * @param s3BucketName Name of the bucket which is going to be used for storing payloads. + * The bucket must be already created and configured in s3. + * @return the updated PayloadStorageAsyncConfiguration object. + */ + public PayloadStorageAsyncConfiguration withPayloadSupportEnabled(S3AsyncClient s3Async, String s3BucketName) { + setPayloadSupportEnabled(s3Async, s3BucketName); + return this; + } + + /** + * Disables support for payloads. + */ + public void setPayloadSupportDisabled() { + super.setPayloadSupportDisabled(); + s3Async = null; + LOG.info("Payload support disabled."); + } + + /** + * Disables support for payload. + * + * @return the updated PayloadStorageAsyncConfiguration object. + */ + public PayloadStorageAsyncConfiguration withPayloadSupportDisabled() { + setPayloadSupportDisabled(); + return this; + } + + /** + * Gets the Amazon S3 async client which is being used for storing payloads. + * + * @return Reference to the Amazon S3 async client which is being used. + */ + public S3AsyncClient getS3AsyncClient() { + return s3Async; + } + + /** + * Sets the payload size threshold for storing payloads in Amazon S3. + * + * @param payloadSizeThreshold Payload size threshold to be used for storing in Amazon S3. + * Default: 256KB. + * @return the updated PayloadStorageAsyncConfiguration object. + */ + public PayloadStorageAsyncConfiguration withPayloadSizeThreshold(int payloadSizeThreshold) { + setPayloadSizeThreshold(payloadSizeThreshold); + return this; + } + + /** + * Sets whether or not all payloads regardless of their size should be stored in Amazon S3. + * + * @param alwaysThroughS3 Whether or not all payloads regardless of their size + * should be stored in Amazon S3. Default: false + * @return the updated PayloadStorageAsyncConfiguration object. + */ + public PayloadStorageAsyncConfiguration withAlwaysThroughS3(boolean alwaysThroughS3) { + setAlwaysThroughS3(alwaysThroughS3); + return this; + } + + /** + * Sets which method of server side encryption should be used, if required. + * + * This is optional, it is set only when you want to configure S3 server side encryption with KMS. + * + * @param serverSideEncryptionStrategy The method of encryption required for S3 server side encryption with KMS. + * @return the updated PayloadStorageAsyncConfiguration object. + */ + public PayloadStorageAsyncConfiguration withServerSideEncryption(ServerSideEncryptionStrategy serverSideEncryptionStrategy) { + setServerSideEncryptionStrategy(serverSideEncryptionStrategy); + return this; + } + + /** + * Configures the ACL to apply to the Amazon S3 putObject request. + * @param objectCannedACL + * The ACL to be used when storing objects in Amazon S3 + */ + public PayloadStorageAsyncConfiguration withObjectCannedACL(ObjectCannedACL objectCannedACL) { + setObjectCannedACL(objectCannedACL); + return this; + } +} diff --git a/src/main/java/software/amazon/payloadoffloading/PayloadStorageConfiguration.java b/src/main/java/software/amazon/payloadoffloading/PayloadStorageConfiguration.java index 1c16740..9ab3c10 100644 --- a/src/main/java/software/amazon/payloadoffloading/PayloadStorageConfiguration.java +++ b/src/main/java/software/amazon/payloadoffloading/PayloadStorageConfiguration.java @@ -4,12 +4,11 @@ import org.slf4j.LoggerFactory; import software.amazon.awssdk.annotations.NotThreadSafe; import software.amazon.awssdk.core.exception.SdkClientException; -import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.ObjectCannedACL; /** - *

Amazon payload storage configuration options such as Amazon S3 client, + *

Amazon payload storage configuration options such as synchronous Amazon S3 client, * bucket name, and payload size threshold for payloads.

* *

Server side encryption is optional and can be enabled using with {@link #withServerSideEncryption(ServerSideEncryptionStrategy)} @@ -32,40 +31,18 @@ * @see software.amazon.payloadoffloading.ServerSideEncryptionFactory */ @NotThreadSafe -public class PayloadStorageConfiguration { +public class PayloadStorageConfiguration extends PayloadStorageConfigurationBase { private static final Logger LOG = LoggerFactory.getLogger(PayloadStorageConfiguration.class); private S3Client s3; - private S3AsyncClient s3Async; - private String s3BucketName; - private int payloadSizeThreshold = 0; - private boolean alwaysThroughS3 = false; - private boolean payloadSupport = false; - /** - * This field is optional, it is set only when we want to configure S3 Server Side Encryption with KMS. - */ - private ServerSideEncryptionStrategy serverSideEncryptionStrategy; - /** - * This field is optional, it is set only when we want to add access control list to Amazon S3 buckets and objects - */ - private ObjectCannedACL objectCannedACL; public PayloadStorageConfiguration() { s3 = null; - s3BucketName = null; - serverSideEncryptionStrategy = null; - objectCannedACL = null; } public PayloadStorageConfiguration(PayloadStorageConfiguration other) { + super(other); this.s3 = other.getS3Client(); - this.s3Async = other.getS3AsyncClient(); - this.s3BucketName = other.getS3BucketName(); - this.payloadSupport = other.isPayloadSupportEnabled(); - this.alwaysThroughS3 = other.isAlwaysThroughS3(); - this.payloadSizeThreshold = other.getPayloadSizeThreshold(); - this.serverSideEncryptionStrategy = other.getServerSideEncryptionStrategy(); - this.objectCannedACL = other.getObjectCannedACL(); } /** @@ -81,14 +58,8 @@ public void setPayloadSupportEnabled(S3Client s3, String s3BucketName) { LOG.error(errorMessage); throw SdkClientException.create(errorMessage); } - if (isPayloadSupportEnabled()) { - LOG.warn("Payload support is already enabled. Overwriting AmazonS3Client and S3BucketName."); - } + super.setPayloadSupportEnabled(s3BucketName); this.s3 = s3; - this.s3Async = null; - this.s3BucketName = s3BucketName; - this.payloadSupport = true; - LOG.info("Payload support enabled."); } /** @@ -104,51 +75,12 @@ public PayloadStorageConfiguration withPayloadSupportEnabled(S3Client s3, String return this; } - /** - * Enables support for payloads using asynchronous storage. - * - * @param s3Async Amazon S3 client which is going to be used for storing payload. - * @param s3BucketName Name of the bucket which is going to be used for storing payload. - * The bucket must be already created and configured in s3. - */ - public void setPayloadSupportEnabled(S3AsyncClient s3Async, String s3BucketName) { - if (s3Async == null || s3BucketName == null) { - String errorMessage = "S3 client and/or S3 bucket name cannot be null."; - LOG.error(errorMessage); - throw SdkClientException.create(errorMessage); - } - if (isPayloadSupportEnabled()) { - LOG.warn("Payload support is already enabled. Overwriting AmazonS3Client and S3BucketName."); - } - this.s3 = null; - this.s3Async = s3Async; - this.s3BucketName = s3BucketName; - this.payloadSupport = true; - LOG.info("Payload support enabled."); - } - - /** - * Enables support for payload. - * - * @param s3Async Amazon S3 client which is going to be used for storing payload. - * @param s3BucketName Name of the bucket which is going to be used for storing payloads. - * The bucket must be already created and configured in s3. - * @return the updated PayloadStorageConfiguration object. - */ - public PayloadStorageConfiguration withPayloadSupportEnabled(S3AsyncClient s3Async, String s3BucketName) { - setPayloadSupportEnabled(s3Async, s3BucketName); - return this; - } - /** * Disables support for payloads. */ public void setPayloadSupportDisabled() { + super.setPayloadSupportDisabled(); s3 = null; - s3Async = null; - s3BucketName = null; - payloadSupport = false; - LOG.info("Payload support disabled."); } /** @@ -161,15 +93,6 @@ public PayloadStorageConfiguration withPayloadSupportDisabled() { return this; } - /** - * Check if the support for payloads if enabled. - * - * @return true if support for payloads is enabled. - */ - public boolean isPayloadSupportEnabled() { - return payloadSupport; - } - /** * Gets the Amazon S3 client which is being used for storing payloads. * @@ -179,24 +102,6 @@ public S3Client getS3Client() { return s3; } - /** - * Gets the Amazon S3 async client which is being used for storing payloads. - * - * @return Reference to the Amazon S3 async client which is being used. - */ - public S3AsyncClient getS3AsyncClient() { - return s3Async; - } - - /** - * Gets the name of the S3 bucket which is being used for storing payload. - * - * @return The name of the bucket which is being used. - */ - public String getS3BucketName() { - return s3BucketName; - } - /** * Sets the payload size threshold for storing payloads in Amazon S3. * @@ -209,25 +114,6 @@ public PayloadStorageConfiguration withPayloadSizeThreshold(int payloadSizeThres return this; } - /** - * Gets the payload size threshold for storing payloads in Amazon S3. - * - * @return payload size threshold which is being used for storing in Amazon S3. Default: 256KB. - */ - public int getPayloadSizeThreshold() { - return payloadSizeThreshold; - } - - /** - * Sets the payload size threshold for storing payloads in Amazon S3. - * - * @param payloadSizeThreshold Payload size threshold to be used for storing in Amazon S3. - * Default: 256KB. - */ - public void setPayloadSizeThreshold(int payloadSizeThreshold) { - this.payloadSizeThreshold = payloadSizeThreshold; - } - /** * Sets whether or not all payloads regardless of their size should be stored in Amazon S3. * @@ -240,25 +126,6 @@ public PayloadStorageConfiguration withAlwaysThroughS3(boolean alwaysThroughS3) return this; } - /** - * Checks whether or not all payloads regardless of their size are being stored in Amazon S3. - * - * @return True if all payloads regardless of their size are being stored in Amazon S3. Default: false - */ - public boolean isAlwaysThroughS3() { - return alwaysThroughS3; - } - - /** - * Sets whether or not all payloads regardless of their size should be stored in Amazon S3. - * - * @param alwaysThroughS3 Whether or not all payloads regardless of their size - * should be stored in Amazon S3. Default: false - */ - public void setAlwaysThroughS3(boolean alwaysThroughS3) { - this.alwaysThroughS3 = alwaysThroughS3; - } - /** * Sets which method of server side encryption should be used, if required. * @@ -272,35 +139,6 @@ public PayloadStorageConfiguration withServerSideEncryption(ServerSideEncryption return this; } - /** - * Sets which method of server side encryption should be use, if required. - * - * This is optional, it is set only when you want to configure S3 Server Side Encryption with KMS. - * - * @param serverSideEncryptionStrategy The method of encryption required for S3 server side encryption with KMS. - */ - public void setServerSideEncryptionStrategy(ServerSideEncryptionStrategy serverSideEncryptionStrategy) { - this.serverSideEncryptionStrategy = serverSideEncryptionStrategy; - } - - /** - * The method of service side encryption which should be used, if required. - * - * @return The server side encryption method required. Default null. - */ - public ServerSideEncryptionStrategy getServerSideEncryptionStrategy() { - return this.serverSideEncryptionStrategy; - } - - /** - * Configures the ACL to apply to the Amazon S3 putObject request. - * @param objectCannedACL - * The ACL to be used when storing objects in Amazon S3 - */ - public void setObjectCannedACL(ObjectCannedACL objectCannedACL) { - this.objectCannedACL = objectCannedACL; - } - /** * Configures the ACL to apply to the Amazon S3 putObject request. * @param objectCannedACL @@ -310,20 +148,4 @@ public PayloadStorageConfiguration withObjectCannedACL(ObjectCannedACL objectCan setObjectCannedACL(objectCannedACL); return this; } - - /** - * Checks whether an ACL have been configured for storing objects in Amazon S3. - * @return True if ACL is defined - */ - public boolean isObjectCannedACLDefined() { - return null != objectCannedACL; - } - - /** - * Gets the AWS ACL to apply to the Amazon S3 putObject request. - * @return Amazon S3 object ACL - */ - public ObjectCannedACL getObjectCannedACL() { - return objectCannedACL; - } } diff --git a/src/main/java/software/amazon/payloadoffloading/PayloadStorageConfigurationBase.java b/src/main/java/software/amazon/payloadoffloading/PayloadStorageConfigurationBase.java new file mode 100644 index 0000000..7d08746 --- /dev/null +++ b/src/main/java/software/amazon/payloadoffloading/PayloadStorageConfigurationBase.java @@ -0,0 +1,178 @@ +package software.amazon.payloadoffloading; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.annotations.NotThreadSafe; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.ObjectCannedACL; + +/** + *

Base class for Amazon payload storage configuration options such as Amazon S3 client, + * bucket name, and payload size threshold for payloads.

+ * + * @see PayloadStorageConfiguration + * @see PayloadStorageAsyncConfiguration + */ +@NotThreadSafe +public abstract class PayloadStorageConfigurationBase { + private static final Logger LOG = LoggerFactory.getLogger(PayloadStorageConfigurationBase.class); + + private String s3BucketName; + private int payloadSizeThreshold = 0; + private boolean alwaysThroughS3 = false; + private boolean payloadSupport = false; + /** + * This field is optional, it is set only when we want to configure S3 Server Side Encryption with KMS. + */ + private ServerSideEncryptionStrategy serverSideEncryptionStrategy; + /** + * This field is optional, it is set only when we want to add access control list to Amazon S3 buckets and objects + */ + private ObjectCannedACL objectCannedACL; + + public PayloadStorageConfigurationBase() { + s3BucketName = null; + serverSideEncryptionStrategy = null; + objectCannedACL = null; + } + + public PayloadStorageConfigurationBase(PayloadStorageConfigurationBase other) { + this.s3BucketName = other.getS3BucketName(); + this.payloadSupport = other.isPayloadSupportEnabled(); + this.alwaysThroughS3 = other.isAlwaysThroughS3(); + this.payloadSizeThreshold = other.getPayloadSizeThreshold(); + this.serverSideEncryptionStrategy = other.getServerSideEncryptionStrategy(); + this.objectCannedACL = other.getObjectCannedACL(); + } + + /** + * Enables support for payloads . + * + * @param s3BucketName Name of the bucket which is going to be used for storing payload. + * The bucket must be already created and configured in s3. + */ + protected void setPayloadSupportEnabled(String s3BucketName) { + if (s3BucketName == null) { + String errorMessage = "S3 bucket name cannot be null."; + LOG.error(errorMessage); + throw SdkClientException.create(errorMessage); + } + if (isPayloadSupportEnabled()) { + LOG.warn("Payload support is already enabled. Overwriting AmazonS3Client and S3BucketName."); + } + this.s3BucketName = s3BucketName; + this.payloadSupport = true; + LOG.info("Payload support enabled."); + } + + /** + * Disables support for payloads. + */ + public void setPayloadSupportDisabled() { + s3BucketName = null; + payloadSupport = false; + LOG.info("Payload support disabled."); + } + + /** + * Check if the support for payloads if enabled. + * + * @return true if support for payloads is enabled. + */ + public boolean isPayloadSupportEnabled() { + return payloadSupport; + } + + /** + * Gets the name of the S3 bucket which is being used for storing payload. + * + * @return The name of the bucket which is being used. + */ + public String getS3BucketName() { + return s3BucketName; + } + + /** + * Gets the payload size threshold for storing payloads in Amazon S3. + * + * @return payload size threshold which is being used for storing in Amazon S3. Default: 256KB. + */ + public int getPayloadSizeThreshold() { + return payloadSizeThreshold; + } + + /** + * Sets the payload size threshold for storing payloads in Amazon S3. + * + * @param payloadSizeThreshold Payload size threshold to be used for storing in Amazon S3. + * Default: 256KB. + */ + public void setPayloadSizeThreshold(int payloadSizeThreshold) { + this.payloadSizeThreshold = payloadSizeThreshold; + } + + /** + * Checks whether or not all payloads regardless of their size are being stored in Amazon S3. + * + * @return True if all payloads regardless of their size are being stored in Amazon S3. Default: false + */ + public boolean isAlwaysThroughS3() { + return alwaysThroughS3; + } + + /** + * Sets whether or not all payloads regardless of their size should be stored in Amazon S3. + * + * @param alwaysThroughS3 Whether or not all payloads regardless of their size + * should be stored in Amazon S3. Default: false + */ + public void setAlwaysThroughS3(boolean alwaysThroughS3) { + this.alwaysThroughS3 = alwaysThroughS3; + } + + /** + * Sets which method of server side encryption should be use, if required. + * + * This is optional, it is set only when you want to configure S3 Server Side Encryption with KMS. + * + * @param serverSideEncryptionStrategy The method of encryption required for S3 server side encryption with KMS. + */ + public void setServerSideEncryptionStrategy(ServerSideEncryptionStrategy serverSideEncryptionStrategy) { + this.serverSideEncryptionStrategy = serverSideEncryptionStrategy; + } + + /** + * The method of service side encryption which should be used, if required. + * + * @return The server side encryption method required. Default null. + */ + public ServerSideEncryptionStrategy getServerSideEncryptionStrategy() { + return this.serverSideEncryptionStrategy; + } + + /** + * Configures the ACL to apply to the Amazon S3 putObject request. + * @param objectCannedACL + * The ACL to be used when storing objects in Amazon S3 + */ + public void setObjectCannedACL(ObjectCannedACL objectCannedACL) { + this.objectCannedACL = objectCannedACL; + } + + /** + * Checks whether an ACL have been configured for storing objects in Amazon S3. + * @return True if ACL is defined + */ + public boolean isObjectCannedACLDefined() { + return null != objectCannedACL; + } + + /** + * Gets the AWS ACL to apply to the Amazon S3 putObject request. + * @return Amazon S3 object ACL + */ + public ObjectCannedACL getObjectCannedACL() { + return objectCannedACL; + } +} \ No newline at end of file diff --git a/src/test/java/software/amazon/payloadoffloading/PayloadStorageAsyncConfigurationTest.java b/src/test/java/software/amazon/payloadoffloading/PayloadStorageAsyncConfigurationTest.java new file mode 100644 index 0000000..4cc6984 --- /dev/null +++ b/src/test/java/software/amazon/payloadoffloading/PayloadStorageAsyncConfigurationTest.java @@ -0,0 +1,102 @@ +package software.amazon.payloadoffloading; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +import org.junit.Test; +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.ObjectCannedACL; + +/** + * Tests the PayloadStorageAsyncConfiguration class. + */ +public class PayloadStorageAsyncConfigurationTest { + + private static final String s3BucketName = "test-bucket-name"; + private static final ServerSideEncryptionStrategy SERVER_SIDE_ENCRYPTION_STRATEGY = ServerSideEncryptionFactory.awsManagedCmk(); + private final ObjectCannedACL objectCannelACL = ObjectCannedACL.BUCKET_OWNER_FULL_CONTROL; + + @Test + public void testCopyConstructor() { + S3AsyncClient s3Async = mock(S3AsyncClient.class); + + boolean alwaysThroughS3 = true; + int payloadSizeThreshold = 500; + + PayloadStorageAsyncConfiguration payloadStorageConfiguration = new PayloadStorageAsyncConfiguration(); + + payloadStorageConfiguration.withPayloadSupportEnabled(s3Async, s3BucketName) + .withAlwaysThroughS3(alwaysThroughS3) + .withPayloadSizeThreshold(payloadSizeThreshold) + .withServerSideEncryption(SERVER_SIDE_ENCRYPTION_STRATEGY) + .withObjectCannedACL(objectCannelACL); + + PayloadStorageAsyncConfiguration newPayloadStorageConfiguration = new PayloadStorageAsyncConfiguration(payloadStorageConfiguration); + + assertEquals(s3Async, newPayloadStorageConfiguration.getS3AsyncClient()); + assertEquals(s3BucketName, newPayloadStorageConfiguration.getS3BucketName()); + assertEquals(SERVER_SIDE_ENCRYPTION_STRATEGY, newPayloadStorageConfiguration.getServerSideEncryptionStrategy()); + assertTrue(newPayloadStorageConfiguration.isPayloadSupportEnabled()); + assertEquals(objectCannelACL, newPayloadStorageConfiguration.getObjectCannedACL()); + assertEquals(alwaysThroughS3, newPayloadStorageConfiguration.isAlwaysThroughS3()); + assertEquals(payloadSizeThreshold, newPayloadStorageConfiguration.getPayloadSizeThreshold()); + assertNotSame(newPayloadStorageConfiguration, payloadStorageConfiguration); + } + + @Test + public void testPayloadSupportEnabled() { + S3AsyncClient s3Async = mock(S3AsyncClient.class); + PayloadStorageAsyncConfiguration payloadStorageConfiguration = new PayloadStorageAsyncConfiguration(); + payloadStorageConfiguration.setPayloadSupportEnabled(s3Async, s3BucketName); + + assertTrue(payloadStorageConfiguration.isPayloadSupportEnabled()); + assertNotNull(payloadStorageConfiguration.getS3AsyncClient()); + assertEquals(s3BucketName, payloadStorageConfiguration.getS3BucketName()); + } + + @Test + public void testDisablePayloadSupport() { + PayloadStorageAsyncConfiguration payloadStorageConfiguration = new PayloadStorageAsyncConfiguration(); + payloadStorageConfiguration.setPayloadSupportDisabled(); + + assertNull(payloadStorageConfiguration.getS3AsyncClient()); + assertNull(payloadStorageConfiguration.getS3BucketName()); + } + + @Test + public void testAlwaysThroughS3() { + PayloadStorageAsyncConfiguration payloadStorageConfiguration = new PayloadStorageAsyncConfiguration(); + + payloadStorageConfiguration.setAlwaysThroughS3(true); + assertTrue(payloadStorageConfiguration.isAlwaysThroughS3()); + + payloadStorageConfiguration.setAlwaysThroughS3(false); + assertFalse(payloadStorageConfiguration.isAlwaysThroughS3()); + } + + @Test + public void testSseAwsKeyManagementParams() { + PayloadStorageAsyncConfiguration payloadStorageConfiguration = new PayloadStorageAsyncConfiguration(); + + assertNull(payloadStorageConfiguration.getServerSideEncryptionStrategy()); + + payloadStorageConfiguration.setServerSideEncryptionStrategy(SERVER_SIDE_ENCRYPTION_STRATEGY); + assertEquals(SERVER_SIDE_ENCRYPTION_STRATEGY, payloadStorageConfiguration.getServerSideEncryptionStrategy()); + } + + @Test + public void testCannedAccessControlList() { + PayloadStorageAsyncConfiguration payloadStorageConfiguration = new PayloadStorageAsyncConfiguration(); + + assertFalse(payloadStorageConfiguration.isObjectCannedACLDefined()); + + payloadStorageConfiguration.withObjectCannedACL(objectCannelACL); + assertTrue(payloadStorageConfiguration.isObjectCannedACLDefined()); + assertEquals(objectCannelACL, payloadStorageConfiguration.getObjectCannedACL()); + } +} \ No newline at end of file diff --git a/src/test/java/software/amazon/payloadoffloading/PayloadStorageConfigurationTest.java b/src/test/java/software/amazon/payloadoffloading/PayloadStorageConfigurationTest.java index 6d0c51f..2c05d47 100644 --- a/src/test/java/software/amazon/payloadoffloading/PayloadStorageConfigurationTest.java +++ b/src/test/java/software/amazon/payloadoffloading/PayloadStorageConfigurationTest.java @@ -1,13 +1,17 @@ package software.amazon.payloadoffloading; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + import org.junit.Test; -import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.model.ObjectCannedACL; -import static org.mockito.Mockito.mock; -import static org.junit.Assert.*; - /** * Tests the PayloadStorageConfiguration class. */ @@ -44,33 +48,6 @@ public void testCopyConstructor() { assertNotSame(newPayloadStorageConfiguration, payloadStorageConfiguration); } - @Test - public void testCopyConstructorForAsync() { - S3AsyncClient s3Async = mock(S3AsyncClient.class); - - boolean alwaysThroughS3 = true; - int payloadSizeThreshold = 500; - - PayloadStorageConfiguration payloadStorageConfiguration = new PayloadStorageConfiguration(); - - payloadStorageConfiguration.withPayloadSupportEnabled(s3Async, s3BucketName) - .withAlwaysThroughS3(alwaysThroughS3) - .withPayloadSizeThreshold(payloadSizeThreshold) - .withServerSideEncryption(SERVER_SIDE_ENCRYPTION_STRATEGY) - .withObjectCannedACL(objectCannelACL); - - PayloadStorageConfiguration newPayloadStorageConfiguration = new PayloadStorageConfiguration(payloadStorageConfiguration); - - assertEquals(s3Async, newPayloadStorageConfiguration.getS3AsyncClient()); - assertEquals(s3BucketName, newPayloadStorageConfiguration.getS3BucketName()); - assertEquals(SERVER_SIDE_ENCRYPTION_STRATEGY, newPayloadStorageConfiguration.getServerSideEncryptionStrategy()); - assertTrue(newPayloadStorageConfiguration.isPayloadSupportEnabled()); - assertEquals(objectCannelACL, newPayloadStorageConfiguration.getObjectCannedACL()); - assertEquals(alwaysThroughS3, newPayloadStorageConfiguration.isAlwaysThroughS3()); - assertEquals(payloadSizeThreshold, newPayloadStorageConfiguration.getPayloadSizeThreshold()); - assertNotSame(newPayloadStorageConfiguration, payloadStorageConfiguration); - } - @Test public void testPayloadSupportEnabled() { S3Client s3 = mock(S3Client.class); @@ -79,19 +56,6 @@ public void testPayloadSupportEnabled() { assertTrue(payloadStorageConfiguration.isPayloadSupportEnabled()); assertNotNull(payloadStorageConfiguration.getS3Client()); - assertNull(payloadStorageConfiguration.getS3AsyncClient()); - assertEquals(s3BucketName, payloadStorageConfiguration.getS3BucketName()); - } - - @Test - public void testPayloadSupportEnabledForAsync() { - S3AsyncClient s3Async = mock(S3AsyncClient.class); - PayloadStorageConfiguration payloadStorageConfiguration = new PayloadStorageConfiguration(); - payloadStorageConfiguration.setPayloadSupportEnabled(s3Async, s3BucketName); - - assertTrue(payloadStorageConfiguration.isPayloadSupportEnabled()); - assertNull(payloadStorageConfiguration.getS3Client()); - assertNotNull(payloadStorageConfiguration.getS3AsyncClient()); assertEquals(s3BucketName, payloadStorageConfiguration.getS3BucketName()); } @@ -101,7 +65,6 @@ public void testDisablePayloadSupport() { payloadStorageConfiguration.setPayloadSupportDisabled(); assertNull(payloadStorageConfiguration.getS3Client()); - assertNull(payloadStorageConfiguration.getS3AsyncClient()); assertNull(payloadStorageConfiguration.getS3BucketName()); }