Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Payload Offloading Java Common Library For AWS #52

Merged
merged 5 commits into from
Jul 22, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
target/
.idea/
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@
<version>${aws-java-sdk.version}</version>
</dependency>

<dependency>
aws-rizi marked this conversation as resolved.
Show resolved Hide resolved
<groupId>software.amazon.payloadoffloading</groupId>
<artifactId>payloadoffloading-common</artifactId>
<version>1.0.0</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
Expand Down Expand Up @@ -70,6 +68,10 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import software.amazon.payloadoffloading.PayloadS3Pointer;
import software.amazon.payloadoffloading.PayloadStorageConfiguration;

import static software.amazon.payloadoffloading.Util.getStringSizeInBytes;

/**
* Amazon SQS Extended Client extends the functionality of Amazon SQS client.
Expand All @@ -93,7 +95,7 @@
public class AmazonSQSExtendedClient extends AmazonSQSExtendedClientBase implements AmazonSQS {
private static final Log LOG = LogFactory.getLog(AmazonSQSExtendedClient.class);

private ExtendedClientConfiguration clientConfiguration;
private PayloadStorageConfiguration clientConfiguration;

/**
* Constructs a new Amazon SQS extended client to invoke service methods on
Expand All @@ -108,7 +110,7 @@ public class AmazonSQSExtendedClient extends AmazonSQSExtendedClientBase impleme
* The Amazon SQS client to use to connect to Amazon SQS.
*/
public AmazonSQSExtendedClient(AmazonSQS sqsClient) {
this(sqsClient, new ExtendedClientConfiguration());
this(sqsClient, new PayloadStorageConfiguration().withPayloadSizeThreshold(SQSExtendedClientConstants.DEFAULT_MESSAGE_SIZE_THRESHOLD));
}

/**
Expand All @@ -122,13 +124,16 @@ public AmazonSQSExtendedClient(AmazonSQS sqsClient) {
*
* @param sqsClient
* The Amazon SQS client to use to connect to Amazon SQS.
* @param extendedClientConfig
* The extended client configuration options controlling the
* @param payloadStorageConfiguration
* The payload storage configuration options controlling the
* functionality of this client.
*/
public AmazonSQSExtendedClient(AmazonSQS sqsClient, ExtendedClientConfiguration extendedClientConfig) {
public AmazonSQSExtendedClient(AmazonSQS sqsClient, PayloadStorageConfiguration payloadStorageConfiguration) {
super(sqsClient);
this.clientConfiguration = new ExtendedClientConfiguration(extendedClientConfig);
if (payloadStorageConfiguration.getPayloadSizeThreshold() == 0) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since PayloadStorageConfiguration variable payloadSizeThreshold by default is 0, setting it to SQS default value. Let me know if I should go for another approach.

payloadStorageConfiguration.setPayloadSizeThreshold(SQSExtendedClientConstants.DEFAULT_MESSAGE_SIZE_THRESHOLD);
}
this.clientConfiguration = new PayloadStorageConfiguration(payloadStorageConfiguration);
}

/**
Expand Down Expand Up @@ -175,7 +180,7 @@ public SendMessageResult sendMessage(SendMessageRequest sendMessageRequest) {

sendMessageRequest.getRequestClientOptions().appendUserAgent(SQSExtendedClientConstants.USER_AGENT_HEADER);

if (!clientConfiguration.isLargePayloadSupportEnabled()) {
if (!clientConfiguration.isPayloadSupportEnabled()) {
return super.sendMessage(sendMessageRequest);
}

Expand Down Expand Up @@ -340,7 +345,7 @@ public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageR

receiveMessageRequest.getRequestClientOptions().appendUserAgent(SQSExtendedClientConstants.USER_AGENT_HEADER);

if (!clientConfiguration.isLargePayloadSupportEnabled()) {
if (!clientConfiguration.isPayloadSupportEnabled()) {
return super.receiveMessage(receiveMessageRequest);
}

Expand All @@ -360,7 +365,7 @@ public ReceiveMessageResult receiveMessage(ReceiveMessageRequest receiveMessageR
String messageBody = message.getBody();

// read the S3 pointer from the message body JSON string.
MessageS3Pointer s3Pointer = readMessageS3PointerFromJSON(messageBody);
PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(messageBody);

String s3MsgBucketName = s3Pointer.getS3BucketName();
String s3MsgKey = s3Pointer.getS3Key();
Expand Down Expand Up @@ -546,7 +551,7 @@ public DeleteMessageResult deleteMessage(DeleteMessageRequest deleteMessageReque

deleteMessageRequest.getRequestClientOptions().appendUserAgent(SQSExtendedClientConstants.USER_AGENT_HEADER);

if (!clientConfiguration.isLargePayloadSupportEnabled()) {
if (!clientConfiguration.isPayloadSupportEnabled()) {
return super.deleteMessage(deleteMessageRequest);
}

Expand Down Expand Up @@ -769,7 +774,7 @@ public SendMessageBatchResult sendMessageBatch(SendMessageBatchRequest sendMessa

sendMessageBatchRequest.getRequestClientOptions().appendUserAgent(SQSExtendedClientConstants.USER_AGENT_HEADER);

if (!clientConfiguration.isLargePayloadSupportEnabled()) {
if (!clientConfiguration.isPayloadSupportEnabled()) {
return super.sendMessageBatch(sendMessageBatchRequest);
}

Expand Down Expand Up @@ -907,7 +912,7 @@ public DeleteMessageBatchResult deleteMessageBatch(DeleteMessageBatchRequest del
deleteMessageBatchRequest.getRequestClientOptions().appendUserAgent(
SQSExtendedClientConstants.USER_AGENT_HEADER);

if (!clientConfiguration.isLargePayloadSupportEnabled()) {
if (!clientConfiguration.isPayloadSupportEnabled()) {
return super.deleteMessageBatch(deleteMessageBatchRequest);
}

Expand Down Expand Up @@ -1117,9 +1122,9 @@ private void deleteMessagePayloadFromS3(String receiptHandle) {

private void checkMessageAttributes(Map<String, MessageAttributeValue> messageAttributes) {
int msgAttributesSize = getMsgAttributesSize(messageAttributes);
if (msgAttributesSize > clientConfiguration.getMessageSizeThreshold()) {
if (msgAttributesSize > clientConfiguration.getPayloadSizeThreshold()) {
String errorMessage = "Total size of Message attributes is " + msgAttributesSize
+ " bytes which is larger than the threshold of " + clientConfiguration.getMessageSizeThreshold()
+ " bytes which is larger than the threshold of " + clientConfiguration.getPayloadSizeThreshold()
+ " Bytes. Consider including the payload in the message body instead of message attributes.";
LOG.error(errorMessage);
throw new AmazonClientException(errorMessage);
Expand Down Expand Up @@ -1152,20 +1157,6 @@ private String embedS3PointerInReceiptHandle(String receiptHandle, String s3MsgB
return modifiedReceiptHandle;
}

private MessageS3Pointer readMessageS3PointerFromJSON(String messageBody) {

MessageS3Pointer s3Pointer = null;
try {
JsonDataConverter jsonDataConverter = new JsonDataConverter();
s3Pointer = jsonDataConverter.deserializeFromJson(messageBody, MessageS3Pointer.class);
} catch (Exception e) {
String errorMessage = "Failed to read the S3 object pointer from an SQS message. Message was not received.";
LOG.error(errorMessage, e);
throw new AmazonClientException(errorMessage, e);
}
return s3Pointer;
}

private String getOrigReceiptHandle(String receiptHandle) {
int secondOccurence = receiptHandle.indexOf(SQSExtendedClientConstants.S3_KEY_MARKER,
receiptHandle.indexOf(SQSExtendedClientConstants.S3_KEY_MARKER) + 1);
Expand Down Expand Up @@ -1215,14 +1206,14 @@ private boolean isLarge(SendMessageRequest sendMessageRequest) {
int msgAttributesSize = getMsgAttributesSize(sendMessageRequest.getMessageAttributes());
long msgBodySize = getStringSizeInBytes(sendMessageRequest.getMessageBody());
long totalMsgSize = msgAttributesSize + msgBodySize;
return (totalMsgSize > clientConfiguration.getMessageSizeThreshold());
return (totalMsgSize > clientConfiguration.getPayloadSizeThreshold());
}

private boolean isLarge(SendMessageBatchRequestEntry batchEntry) {
int msgAttributesSize = getMsgAttributesSize(batchEntry.getMessageAttributes());
long msgBodySize = getStringSizeInBytes(batchEntry.getMessageBody());
long totalMsgSize = msgAttributesSize + msgBodySize;
return (totalMsgSize > clientConfiguration.getMessageSizeThreshold());
return (totalMsgSize > clientConfiguration.getPayloadSizeThreshold());
}

private int getMsgAttributesSize(Map<String, MessageAttributeValue> msgAttributes) {
Expand Down Expand Up @@ -1272,8 +1263,8 @@ private SendMessageBatchRequestEntry storeMessageInS3(SendMessageBatchRequestEnt
+ ".");

// Convert S3 pointer (bucket name, key, etc) to JSON string
MessageS3Pointer s3Pointer = new MessageS3Pointer(clientConfiguration.getS3BucketName(), s3Key);
String s3PointerStr = getJSONFromS3Pointer(s3Pointer);
PayloadS3Pointer s3Pointer = new PayloadS3Pointer(clientConfiguration.getS3BucketName(), s3Key);
String s3PointerStr = s3Pointer.toJson();

// Storing S3 pointer in the message body.
batchEntry.setMessageBody(s3PointerStr);
Expand Down Expand Up @@ -1305,29 +1296,16 @@ private SendMessageRequest storeMessageInS3(SendMessageRequest sendMessageReques
+ ".");

// Convert S3 pointer (bucket name, key, etc) to JSON string
MessageS3Pointer s3Pointer = new MessageS3Pointer(clientConfiguration.getS3BucketName(), s3Key);
PayloadS3Pointer s3Pointer = new PayloadS3Pointer(clientConfiguration.getS3BucketName(), s3Key);

String s3PointerStr = getJSONFromS3Pointer(s3Pointer);
String s3PointerStr = s3Pointer.toJson();

// Storing S3 pointer in the message body.
sendMessageRequest.setMessageBody(s3PointerStr);

return sendMessageRequest;
}

private String getJSONFromS3Pointer(MessageS3Pointer s3Pointer) {
String s3PointerStr = null;
try {
JsonDataConverter jsonDataConverter = new JsonDataConverter();
s3PointerStr = jsonDataConverter.serializeToJson(s3Pointer);
} catch (Exception e) {
String errorMessage = "Failed to convert S3 object pointer to text. Message was not sent.";
LOG.error(errorMessage, e);
throw new AmazonClientException(errorMessage, e);
}
return s3PointerStr;
}

private void storeTextInS3(String s3Key, String messageContentStr, Long messageContentSize) {
InputStream messageContentStream = new ByteArrayInputStream(messageContentStr.getBytes(StandardCharsets.UTF_8));
ObjectMetadata messageContentStreamMetadata = new ObjectMetadata();
Expand All @@ -1347,19 +1325,4 @@ private void storeTextInS3(String s3Key, String messageContentStr, Long messageC
}
}

private static long getStringSizeInBytes(String str) {
CountingOutputStream counterOutputStream = new CountingOutputStream();
try {
Writer writer = new OutputStreamWriter(counterOutputStream, "UTF-8");
writer.write(str);
writer.flush();
writer.close();
} catch (IOException e) {
String errorMessage = "Failed to calculate the size of message payload.";
LOG.error(errorMessage, e);
throw new AmazonClientException(errorMessage, e);
}
return counterOutputStream.getTotalSize();
}

}

This file was deleted.

Loading