Skip to content

Commit

Permalink
feat: Expose SDK v2 s3 client
Browse files Browse the repository at this point in the history
  • Loading branch information
Pankaj Agrawal committed Nov 1, 2021
1 parent 9f20b10 commit 5db7fb9
Show file tree
Hide file tree
Showing 8 changed files with 249 additions and 115 deletions.
6 changes: 3 additions & 3 deletions powertools-sqs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@
<artifactId>aws-lambda-java-events</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.payloadoffloading</groupId>
<artifactId>payloadoffloading-common</artifactId>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
<artifactId>s3</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.lambda.powertools.sqs.internal.BatchContext;
import software.amazon.lambda.powertools.sqs.internal.SqsLargeMessageAspect;
import software.amazon.payloadoffloading.PayloadS3Pointer;
import software.amazon.lambda.powertools.sqs.internal.SqsLargeMessageAspect;

import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
import static software.amazon.lambda.powertools.sqs.internal.SqsLargeMessageAspect.processMessages;
Expand All @@ -42,6 +41,7 @@ public final class SqsUtils {

private static final ObjectMapper objectMapper = new ObjectMapper();
private static SqsClient client;
private static S3Client s3Client;

private SqsUtils() {
}
Expand Down Expand Up @@ -98,6 +98,16 @@ public static void overrideSqsClient(SqsClient client) {
SqsUtils.client = client;
}

/**
* Provides ability to set default {@link S3Client} to be used by utility.
* If no default configuration is provided, client is instantiated via {@link S3Client#create()}
*
* @param s3Client {@link S3Client} to be used by utility
*/
public static void overrideS3Client(S3Client s3Client) {
SqsUtils.s3Client = s3Client;
}

/**
* This utility method is used to process each {@link SQSMessage} inside the received {@link SQSEvent}
*
Expand Down Expand Up @@ -524,4 +534,12 @@ private static SQSMessage clonedMessage(final SQSMessage sqsMessage) {
public static ObjectMapper objectMapper() {
return objectMapper;
}

public static S3Client s3Client() {
if(null == s3Client) {
SqsUtils.s3Client = S3Client.create();
}

return s3Client;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,33 @@
import java.util.List;
import java.util.function.Function;

import com.amazonaws.AmazonServiceException;
import com.amazonaws.SdkClientException;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.S3ObjectInputStream;
import com.amazonaws.util.IOUtils;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.GetObjectResponse;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.utils.IoUtils;
import software.amazon.lambda.powertools.sqs.SqsLargeMessage;
import software.amazon.payloadoffloading.PayloadS3Pointer;

import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
import static java.lang.String.format;
import static software.amazon.lambda.powertools.core.internal.LambdaHandlerProcessor.isHandlerMethod;
import static software.amazon.lambda.powertools.sqs.SqsUtils.s3Client;

@Aspect
public class SqsLargeMessageAspect {

private static final Logger LOG = LoggerFactory.getLogger(SqsLargeMessageAspect.class);
private static AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient();

@SuppressWarnings({"EmptyMethod"})
@Pointcut("@annotation(sqsLargeMessage)")
Expand All @@ -52,7 +50,7 @@ && placedOnSqsEventRequestHandler(pjp)) {
Object proceed = pjp.proceed(proceedArgs);

if (sqsLargeMessage.deletePayloads()) {
pointersToDelete.forEach(this::deleteMessageFromS3);
pointersToDelete.forEach(SqsLargeMessageAspect::deleteMessage);
}
return proceed;
}
Expand All @@ -69,15 +67,21 @@ public static List<PayloadS3Pointer> processMessages(final List<SQSMessage> reco
List<PayloadS3Pointer> s3Pointers = new ArrayList<>();
for (SQSMessage sqsMessage : records) {
if (isBodyLargeMessagePointer(sqsMessage.getBody())) {
PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(sqsMessage.getBody());

S3Object s3Object = callS3Gracefully(s3Pointer, pointer -> {
S3Object object = amazonS3.getObject(pointer.getS3BucketName(), pointer.getS3Key());
PayloadS3Pointer s3Pointer = PayloadS3Pointer.fromJson(sqsMessage.getBody())
.orElseThrow(() -> new FailedProcessingLargePayloadException(format("Failed processing SQS body to extract S3 details. [ %s ].", sqsMessage.getBody())));

ResponseInputStream<GetObjectResponse> s3Object = callS3Gracefully(s3Pointer, pointer -> {
ResponseInputStream<GetObjectResponse> response = s3Client().getObject(GetObjectRequest.builder()
.bucket(pointer.getS3BucketName())
.key(pointer.getS3Key())
.build());

LOG.debug("Object downloaded with key: " + s3Pointer.getS3Key());
return object;
return response;
});

sqsMessage.setBody(readStringFromS3Object(s3Object));
sqsMessage.setBody(readStringFromS3Object(s3Object, s3Pointer));
s3Pointers.add(s3Pointer);
}
}
Expand All @@ -89,26 +93,22 @@ private static boolean isBodyLargeMessagePointer(String record) {
return record.startsWith("[\"software.amazon.payloadoffloading.PayloadS3Pointer\"");
}

private static String readStringFromS3Object(S3Object object) {
try (S3ObjectInputStream is = object.getObjectContent()) {
return IOUtils.toString(is);
private static String readStringFromS3Object(ResponseInputStream<GetObjectResponse> response,
PayloadS3Pointer s3Pointer) {
try (ResponseInputStream<GetObjectResponse> content = response) {
return IoUtils.toUtf8String(content);
} catch (IOException e) {
LOG.error("Error converting S3 object to String", e);
throw new FailedProcessingLargePayloadException(format("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]", object.getBucketName(), object.getKey()), e);
throw new FailedProcessingLargePayloadException(format("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]", s3Pointer.getS3BucketName(), s3Pointer.getS3Key()), e);
}
}

private void deleteMessageFromS3(PayloadS3Pointer s3Pointer) {
callS3Gracefully(s3Pointer, pointer -> {
amazonS3.deleteObject(s3Pointer.getS3BucketName(), s3Pointer.getS3Key());
LOG.info("Message deleted from S3: " + s3Pointer.toJson());
return null;
});
}

public static void deleteMessage(PayloadS3Pointer s3Pointer) {
callS3Gracefully(s3Pointer, pointer -> {
amazonS3.deleteObject(s3Pointer.getS3BucketName(), s3Pointer.getS3Key());
s3Client().deleteObject(DeleteObjectRequest.builder()
.bucket(pointer.getS3BucketName())
.key(pointer.getS3Key())
.build());
LOG.info("Message deleted from S3: " + s3Pointer.toJson());
return null;
});
Expand All @@ -118,7 +118,7 @@ private static <R> R callS3Gracefully(final PayloadS3Pointer pointer,
final Function<PayloadS3Pointer, R> function) {
try {
return function.apply(pointer);
} catch (AmazonServiceException e) {
} catch (S3Exception e) {
LOG.error("A service exception", e);
throw new FailedProcessingLargePayloadException(format("Failed processing S3 record with [Bucket Name: %s Bucket Key: %s]", pointer.getS3BucketName(), pointer.getS3Key()), e);
} catch (SdkClientException e) {
Expand All @@ -137,5 +137,9 @@ public static class FailedProcessingLargePayloadException extends RuntimeExcepti
public FailedProcessingLargePayloadException(String message, Throwable cause) {
super(message, cause);
}

public FailedProcessingLargePayloadException(String message) {
super(message);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package software.amazon.payloadoffloading;

import java.util.Optional;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.SerializationFeature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static java.util.Optional.empty;
import static java.util.Optional.ofNullable;

public class PayloadS3Pointer {
private static final Logger LOG = LoggerFactory.getLogger(PayloadS3Pointer.class);
private static final ObjectMapper objectMapper = new ObjectMapper();

private String s3BucketName;
private String s3Key;

static {
objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, false);
objectMapper.activateDefaultTyping(objectMapper.getPolymorphicTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL);
}

private PayloadS3Pointer() {

}

public String getS3BucketName() {
return this.s3BucketName;
}

public String getS3Key() {
return this.s3Key;
}

public static Optional<PayloadS3Pointer> fromJson(String s3PointerJson) {
try {
return ofNullable(objectMapper.readValue(s3PointerJson, PayloadS3Pointer.class));
} catch (Exception e) {
LOG.error("Failed to read the S3 object pointer from given string.", e);
return empty();
}
}

public Optional<String> toJson() {
try {
ObjectWriter objectWriter = objectMapper.writer();
return ofNullable(objectWriter.writeValueAsString(this));

} catch (Exception e) {
LOG.error("Failed to convert S3 object pointer to text.", e);
return empty();
}
}
}
Loading

0 comments on commit 5db7fb9

Please sign in to comment.