Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sqs:large-message): Expose SDK v2 s3 client #602

Merged
merged 2 commits into from
Nov 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions docs/utilities/sqs_large_message_handling.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,4 +180,31 @@ processing.
return "ok";
}
}
```

## Overriding the default S3Client

If you require customisations to the default S3Client, you can create your own `S3Client` and pass it to be used by utility either for
**[SqsLargeMessage annotation](#lambda-handler)**, or **[SqsUtils Utility API](#utility)**.

=== "App.java"

```java hl_lines="4 5 11"
import software.amazon.lambda.powertools.sqs.SqsLargeMessage;

static {
SqsUtils.overrideS3Client(S3Client.builder()
.build());
}

public class SqsMessageHandler implements RequestHandler<SQSEvent, String> {

@Override
@SqsLargeMessage
public String handleRequest(SQSEvent sqsEvent, Context context) {
// process messages

return "ok";
}
}
```
6 changes: 3 additions & 3 deletions powertools-sqs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@
<artifactId>aws-lambda-java-events</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.payloadoffloading</groupId>
<artifactId>payloadoffloading-common</artifactId>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
pankajagrawal16 marked this conversation as resolved.
Show resolved Hide resolved
<artifactId>s3</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.lambda.powertools.sqs.internal.BatchContext;
import software.amazon.lambda.powertools.sqs.internal.SqsLargeMessageAspect;
import software.amazon.payloadoffloading.PayloadS3Pointer;
import software.amazon.lambda.powertools.sqs.internal.SqsLargeMessageAspect;
pankajagrawal16 marked this conversation as resolved.
Show resolved Hide resolved

import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
import static software.amazon.lambda.powertools.sqs.internal.SqsLargeMessageAspect.processMessages;
Expand All @@ -42,6 +41,7 @@ public final class SqsUtils {

private static final ObjectMapper objectMapper = new ObjectMapper();
private static SqsClient client;
private static S3Client s3Client;

private SqsUtils() {
}
Expand Down Expand Up @@ -98,6 +98,16 @@ public static void overrideSqsClient(SqsClient client) {
SqsUtils.client = client;
}

/**
* By default, the S3Client is instantiated via {@link S3Client#create()}.
* This method provides the ability to override the S3Client with your own custom version.
*
* @param s3Client {@link S3Client} to be used by utility
*/
public static void overrideS3Client(S3Client s3Client) {
SqsUtils.s3Client = s3Client;
}

/**
* This utility method is used to process each {@link SQSMessage} inside the received {@link SQSEvent}
*
Expand Down Expand Up @@ -524,4 +534,12 @@ private static SQSMessage clonedMessage(final SQSMessage sqsMessage) {
public static ObjectMapper objectMapper() {
return objectMapper;
}

public static S3Client s3Client() {
if(null == s3Client) {
SqsUtils.s3Client = S3Client.create();
}

return s3Client;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,33 @@
import java.util.List;
import java.util.function.Function;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.util.IOUtils;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.utils.IoUtils;
import software.amazon.lambda.powertools.sqs.SqsLargeMessage;
import software.amazon.payloadoffloading.PayloadS3Pointer;

import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
import static java.lang.String.format;
import static software.amazon.lambda.powertools.core.internal.LambdaHandlerProcessor.isHandlerMethod;
import static software.amazon.lambda.powertools.sqs.SqsUtils.s3Client;

@Aspect
public class SqsLargeMessageAspect {

private static final Logger LOG = LoggerFactory.getLogger(SqsLargeMessageAspect.class);
private static AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient();

@SuppressWarnings({"EmptyMethod"})
@Pointcut("@annotation(sqsLargeMessage)")
Expand All @@ -52,7 +50,7 @@ && placedOnSqsEventRequestHandler(pjp)) {
Object proceed = pjp.proceed(proceedArgs);

if (sqsLargeMessage.deletePayloads()) {
pointersToDelete.forEach(this::deleteMessageFromS3);
pointersToDelete.forEach(SqsLargeMessageAspect::deleteMessage);
}
return proceed;
}
Expand All @@ -69,15 +67,21 @@ public static List<PayloadS3Pointer> processMessages(final List<SQSMessage> reco
List<PayloadS3Pointer> s3Pointers = new ArrayList<>();
for (SQSMessage sqsMessage : records) {
if (isBodyLargeMessagePointer(sqsMessage.getBody())) {
PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(sqsMessage.getBody());

S3Object s3Object = callS3Gracefully(s3Pointer, pointer -> {
S3Object object = amazonS3.getObject(pointer.getS3BucketName(), pointer.getS3Key());
PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(sqsMessage.getBody())
.orElseThrow(() -> new FailedProcessingLargePayloadException(format("Failed processing SQS body to extract S3 details. [ %s ].", sqsMessage.getBody())));

ResponseInputStream<GetObjectResponse> s3Object = callS3Gracefully(s3Pointer, pointer -> {
ResponseInputStream<GetObjectResponse> response = s3Client().getObject(GetObjectRequest.builder()
.bucket(pointer.getS3BucketName())
.key(pointer.getS3Key())
.build());

LOG.debug("Object downloaded with key: " + s3Pointer.getS3Key());
return object;
return response;
});

sqsMessage.setBody(readStringFromS3Object(s3Object));
sqsMessage.setBody(readStringFromS3Object(s3Object, s3Pointer));
s3Pointers.add(s3Pointer);
}
}
Expand All @@ -89,26 +93,22 @@ private static boolean isBodyLargeMessagePointer(String record) {
return record.startsWith("[\"software.amazon.payloadoffloading.PayloadS3Pointer\"");
}

private static String readStringFromS3Object(S3Object object) {
try (S3ObjectInputStream is = object.getObjectContent()) {
return IOUtils.toString(is);
private static String readStringFromS3Object(ResponseInputStream<GetObjectResponse> response,
PayloadS3Pointer s3Pointer) {
try (ResponseInputStream<GetObjectResponse> content = response) {
return IoUtils.toUtf8String(content);
} catch (IOException e) {
LOG.error("Error converting S3 object to String", e);
throw new FailedProcessingLargePayloadException(format("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]", object.getBucketName(), object.getKey()), e);
throw new FailedProcessingLargePayloadException(format("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]", s3Pointer.getS3BucketName(), s3Pointer.getS3Key()), e);
}
}

private void deleteMessageFromS3(PayloadS3Pointer s3Pointer) {
callS3Gracefully(s3Pointer, pointer -> {
amazonS3.deleteObject(s3Pointer.getS3BucketName(), s3Pointer.getS3Key());
LOG.info("Message deleted from S3: " + s3Pointer.toJson());
return null;
});
}

public static void deleteMessage(PayloadS3Pointer s3Pointer) {
callS3Gracefully(s3Pointer, pointer -> {
amazonS3.deleteObject(s3Pointer.getS3BucketName(), s3Pointer.getS3Key());
s3Client().deleteObject(DeleteObjectRequest.builder()
.bucket(pointer.getS3BucketName())
.key(pointer.getS3Key())
.build());
LOG.info("Message deleted from S3: " + s3Pointer.toJson());
return null;
});
Expand All @@ -118,7 +118,7 @@ private static <R> R callS3Gracefully(final PayloadS3Pointer pointer,
final Function<PayloadS3Pointer, R> function) {
try {
return function.apply(pointer);
} catch (AmazonServiceException e) {
} catch (S3Exception e) {
LOG.error("A service exception", e);
throw new FailedProcessingLargePayloadException(format("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]", pointer.getS3BucketName(), pointer.getS3Key()), e);
} catch (SdkClientException e) {
Expand All @@ -137,5 +137,9 @@ public static class FailedProcessingLargePayloadException extends RuntimeExcepti
public FailedProcessingLargePayloadException(String message, Throwable cause) {
super(message, cause);
}

public FailedProcessingLargePayloadException(String message) {
super(message);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package software.amazon.payloadoffloading;
msailes marked this conversation as resolved.
Show resolved Hide resolved

import java.util.Optional;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.util.Optional.empty;
import static java.util.Optional.ofNullable;

public class PayloadS3Pointer {
private static final Logger LOG = LoggerFactory.getLogger(PayloadS3Pointer.class);
private static final ObjectMapper objectMapper = new ObjectMapper();

private String s3BucketName;
private String s3Key;

static {
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
objectMapper.activateDefaultTyping(objectMapper.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);
}

private PayloadS3Pointer() {

}

public String getS3BucketName() {
return this.s3BucketName;
}

public String getS3Key() {
return this.s3Key;
}

public static Optional<PayloadS3Pointer> fromJson(String s3PointerJson) {
try {
return ofNullable(objectMapper.readValue(s3PointerJson, PayloadS3Pointer.class));
} catch (Exception e) {
LOG.error("Failed to read the S3 object pointer from given string.", e);
return empty();
}
}

public Optional<String> toJson() {
try {
ObjectWriter objectWriter = objectMapper.writer();
return ofNullable(objectWriter.writeValueAsString(this));

} catch (Exception e) {
LOG.error("Failed to convert S3 object pointer to text.", e);
return empty();
}
}
}
Loading