diff --git a/data-prepper-plugins/s3-source/build.gradle b/data-prepper-plugins/s3-source/build.gradle index bcc916bb12..ee09a35515 100644 --- a/data-prepper-plugins/s3-source/build.gradle +++ b/data-prepper-plugins/s3-source/build.gradle @@ -17,8 +17,10 @@ dependencies { implementation 'software.amazon.awssdk:s3' implementation 'software.amazon.awssdk:sts' implementation 'software.amazon.awssdk:sqs' - implementation 'com.amazonaws:aws-java-sdk-s3:1.12.257' + implementation 'com.fasterxml.jackson.core:jackson-core' + implementation 'com.fasterxml.jackson.core:jackson-databind:2.13.3' implementation 'org.apache.commons:commons-compress:1.21' + implementation 'joda-time:joda-time:2.10.14' implementation 'org.hibernate.validator:hibernate-validator:7.0.4.Final' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-csv' testImplementation 'org.apache.commons:commons-lang3:3.12.0' diff --git a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/DateTimeJsonSerializer.java b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/DateTimeJsonSerializer.java new file mode 100644 index 0000000000..e3eaf33eb0 --- /dev/null +++ b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/DateTimeJsonSerializer.java @@ -0,0 +1,27 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.source; + +import java.io.IOException; +import java.util.Date; + +import software.amazon.awssdk.utils.DateUtils; + +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonSerializer; +import com.fasterxml.jackson.databind.SerializerProvider; + +/** + * A Jackson serializer for Joda {@code DateTime}s. + */ +final class DateTimeJsonSerializer extends JsonSerializer { + + @Override + public void serialize(Date value, JsonGenerator jgen, SerializerProvider provider) throws IOException { + jgen.writeString(DateUtils.formatIso8601Date(value.toInstant())); + } + +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3EventNotification.java b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3EventNotification.java new file mode 100644 index 0000000000..9809b3545b --- /dev/null +++ b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3EventNotification.java @@ -0,0 +1,427 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package com.amazon.dataprepper.plugins.source; + +import java.util.List; + +import org.joda.time.DateTime; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import software.amazon.awssdk.utils.http.SdkHttpUtils; + +/** + * A helper class that represents a strongly typed S3 EventNotification item sent + * to SQS, SNS, or Lambda. + * + * This class is derived from S3EventNotification in the AWS SDKv1 for Java. + */ +public class S3EventNotification { + + private final List records; + + @JsonCreator + public S3EventNotification( + @JsonProperty(value = "Records") List records) + { + this.records = records; + } + + /** + *

+ * Parse the JSON string into a S3EventNotification object. + *

+ *

+ * The function will try its best to parse input JSON string as best as it can. + * It will not fail even if the JSON string contains unknown properties. + * The function will throw SdkClientException if the input JSON string is + * not valid JSON. + *

+ * @param json + * JSON string to parse. Typically this is the body of your SQS + * notification message body. + * + * @return The resulting S3EventNotification object. + */ + public static S3EventNotification parseJson(String json) throws JsonProcessingException { + if (json == null) { + return null; + } + final ObjectMapper objectMapper = new ObjectMapper(); + return objectMapper.readValue(json, S3EventNotification.class); + } + + /** + * @return the records in this notification + */ + @JsonProperty(value = "Records") + public List getRecords() { + return records; + } + + public static class UserIdentityEntity { + + private final String principalId; + + @JsonCreator + public UserIdentityEntity( + @JsonProperty(value = "principalId") String principalId) { + this.principalId = principalId; + } + + public String getPrincipalId() { + return principalId; + } + } + + public static class S3BucketEntity { + + private final String name; + private final UserIdentityEntity ownerIdentity; + private final String arn; + + @JsonCreator + public S3BucketEntity( + @JsonProperty(value = "name") String name, + @JsonProperty(value = "ownerIdentity") UserIdentityEntity ownerIdentity, + @JsonProperty(value = "arn") String arn) + { + this.name = name; + this.ownerIdentity = ownerIdentity; + this.arn = arn; + } + + public String getName() { + return name; + } + + public UserIdentityEntity getOwnerIdentity() { + return ownerIdentity; + } + + public String getArn() { + return arn; + } + } + + public static class S3ObjectEntity { + + private final String key; + private final Long size; + private final String eTag; + private final String versionId; + private final String sequencer; + + @Deprecated + public S3ObjectEntity( + String key, + Integer size, + String eTag, + String versionId) + { + this.key = key; + this.size = size == null ? null : size.longValue(); + this.eTag = eTag; + this.versionId = versionId; + this.sequencer = null; + } + + @Deprecated + public S3ObjectEntity( + String key, + Long size, + String eTag, + String versionId) + { + this(key, size, eTag, versionId, null); + } + + @JsonCreator + public S3ObjectEntity( + @JsonProperty(value = "key") String key, + @JsonProperty(value = "size") Long size, + @JsonProperty(value = "eTag") String eTag, + @JsonProperty(value = "versionId") String versionId, + @JsonProperty(value = "sequencer") String sequencer) + { + this.key = key; + this.size = size; + this.eTag = eTag; + this.versionId = versionId; + this.sequencer = sequencer; + } + + public String getKey() { + return key; + } + + /** + * S3 URL encodes the key of the object involved in the event. This is + * a convenience method to automatically URL decode the key. + * @return The URL decoded object key. + */ + public String getUrlDecodedKey() { + return SdkHttpUtils.urlDecode(getKey()); + } + + /** + * @deprecated use {@link #getSizeAsLong()} instead. + */ + @Deprecated + @JsonIgnore + public Integer getSize() { + return size == null ? null : size.intValue(); + } + + @JsonProperty(value = "size") + public Long getSizeAsLong() { + return size; + } + + public String geteTag() { + return eTag; + } + + public String getVersionId() { + return versionId; + } + + public String getSequencer() { + return sequencer; + } + } + + public static class S3Entity { + + private final String configurationId; + private final S3BucketEntity bucket; + private final S3ObjectEntity object; + private final String s3SchemaVersion; + + @JsonCreator + public S3Entity( + @JsonProperty(value = "configurationId") String configurationId, + @JsonProperty(value = "bucket") S3BucketEntity bucket, + @JsonProperty(value = "object") S3ObjectEntity object, + @JsonProperty(value = "s3SchemaVersion") String s3SchemaVersion) + { + this.configurationId = configurationId; + this.bucket = bucket; + this.object = object; + this.s3SchemaVersion = s3SchemaVersion; + } + + public String getConfigurationId() { + return configurationId; + } + + public S3BucketEntity getBucket() { + return bucket; + } + + public S3ObjectEntity getObject() { + return object; + } + + public String getS3SchemaVersion() { + return s3SchemaVersion; + } + } + + public static class RequestParametersEntity { + + private final String sourceIPAddress; + + @JsonCreator + public RequestParametersEntity( + @JsonProperty(value = "sourceIPAddress") String sourceIPAddress) + { + this.sourceIPAddress = sourceIPAddress; + } + + public String getSourceIPAddress() { + return sourceIPAddress; + } + } + + public static class ResponseElementsEntity { + + private final String xAmzId2; + private final String xAmzRequestId; + + @JsonCreator + public ResponseElementsEntity( + @JsonProperty(value = "x-amz-id-2") String xAmzId2, + @JsonProperty(value = "x-amz-request-id") String xAmzRequestId) + { + this.xAmzId2 = xAmzId2; + this.xAmzRequestId = xAmzRequestId; + } + + @JsonProperty("x-amz-id-2") + public String getxAmzId2() { + return xAmzId2; + } + + @JsonProperty("x-amz-request-id") + public String getxAmzRequestId() { + return xAmzRequestId; + } + } + + public static class GlacierEventDataEntity { + private final RestoreEventDataEntity restoreEventData; + + @JsonCreator + public GlacierEventDataEntity( + @JsonProperty(value = "restoreEventData") RestoreEventDataEntity restoreEventData) + { + this.restoreEventData = restoreEventData; + } + + public RestoreEventDataEntity getRestoreEventData() { + return restoreEventData; + } + } + + public static class RestoreEventDataEntity { + private DateTime lifecycleRestorationExpiryTime; + private final String lifecycleRestoreStorageClass; + + @JsonCreator + public RestoreEventDataEntity( + @JsonProperty("lifecycleRestorationExpiryTime") String lifecycleRestorationExpiryTime, + @JsonProperty("lifecycleRestoreStorageClass") String lifecycleRestoreStorageClass) + { + if (lifecycleRestorationExpiryTime != null) { + this.lifecycleRestorationExpiryTime = DateTime.parse(lifecycleRestorationExpiryTime); + } + this.lifecycleRestoreStorageClass = lifecycleRestoreStorageClass; + } + + @JsonSerialize(using=DateTimeJsonSerializer.class) + public DateTime getLifecycleRestorationExpiryTime() { + return lifecycleRestorationExpiryTime; + } + + public String getLifecycleRestoreStorageClass() { + return lifecycleRestoreStorageClass; + } + } + + public static class S3EventNotificationRecord { + + private final String awsRegion; + private final String eventName; + private final String eventSource; + private DateTime eventTime; + private final String eventVersion; + private final RequestParametersEntity requestParameters; + private final ResponseElementsEntity responseElements; + private final S3Entity s3; + private final UserIdentityEntity userIdentity; + private final GlacierEventDataEntity glacierEventData; + + @Deprecated + public S3EventNotificationRecord( + String awsRegion, + String eventName, + String eventSource, + String eventTime, + String eventVersion, + RequestParametersEntity requestParameters, + ResponseElementsEntity responseElements, + S3Entity s3, + UserIdentityEntity userIdentity) + { + this(awsRegion, + eventName, + eventSource, + eventTime, + eventVersion, + requestParameters, + responseElements, + s3, + userIdentity, + null); + } + + @JsonCreator + public S3EventNotificationRecord( + @JsonProperty(value = "awsRegion") String awsRegion, + @JsonProperty(value = "eventName") String eventName, + @JsonProperty(value = "eventSource") String eventSource, + @JsonProperty(value = "eventTime") String eventTime, + @JsonProperty(value = "eventVersion") String eventVersion, + @JsonProperty(value = "requestParameters") RequestParametersEntity requestParameters, + @JsonProperty(value = "responseElements") ResponseElementsEntity responseElements, + @JsonProperty(value = "s3") S3Entity s3, + @JsonProperty(value = "userIdentity") UserIdentityEntity userIdentity, + @JsonProperty(value = "glacierEventData") GlacierEventDataEntity glacierEventData) + { + this.awsRegion = awsRegion; + this.eventName = eventName; + this.eventSource = eventSource; + + if (eventTime != null) + { + this.eventTime = DateTime.parse(eventTime); + } + + this.eventVersion = eventVersion; + this.requestParameters = requestParameters; + this.responseElements = responseElements; + this.s3 = s3; + this.userIdentity = userIdentity; + this.glacierEventData = glacierEventData; + } + + public String getAwsRegion() { + return awsRegion; + } + + public String getEventName() { + return eventName; + } + + public String getEventSource() { + return eventSource; + } + + @JsonSerialize(using=DateTimeJsonSerializer.class) + public DateTime getEventTime() { + return eventTime; + } + + public String getEventVersion() { + return eventVersion; + } + + public RequestParametersEntity getRequestParameters() { + return requestParameters; + } + + public ResponseElementsEntity getResponseElements() { + return responseElements; + } + + public S3Entity getS3() { + return s3; + } + + public UserIdentityEntity getUserIdentity() { + return userIdentity; + } + + public GlacierEventDataEntity getGlacierEventData() { + return glacierEventData; + } + } +} \ No newline at end of file diff --git a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/SqsWorker.java b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/SqsWorker.java index 9bf41aa027..096acd8b99 100644 --- a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/SqsWorker.java +++ b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/SqsWorker.java @@ -10,12 +10,13 @@ import com.amazon.dataprepper.plugins.source.configuration.SqsOptions; import com.amazon.dataprepper.plugins.source.filter.ObjectCreatedFilter; import com.amazon.dataprepper.plugins.source.filter.S3EventFilter; -import com.amazonaws.SdkClientException; -import com.amazonaws.services.s3.event.S3EventNotification; +import com.fasterxml.jackson.core.JsonProcessingException; import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Timer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; @@ -141,7 +142,7 @@ private List convertS3EventMessag return Collections.emptyList(); } - } catch (SdkClientException e) { + } catch (SdkClientException | JsonProcessingException e) { LOG.error("Invalid JSON string in message body of {}", message.messageId(), e); } return Collections.emptyList(); diff --git a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/filter/ObjectCreatedFilter.java b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/filter/ObjectCreatedFilter.java index 8d672e80ba..fcafcbf963 100644 --- a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/filter/ObjectCreatedFilter.java +++ b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/filter/ObjectCreatedFilter.java @@ -5,7 +5,7 @@ package com.amazon.dataprepper.plugins.source.filter; -import com.amazonaws.services.s3.event.S3EventNotification; +import com.amazon.dataprepper.plugins.source.S3EventNotification; import java.util.Optional; diff --git a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/filter/S3EventFilter.java b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/filter/S3EventFilter.java index e9b56c20ed..d1e9ef3ba2 100644 --- a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/filter/S3EventFilter.java +++ b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/filter/S3EventFilter.java @@ -5,7 +5,7 @@ package com.amazon.dataprepper.plugins.source.filter; -import com.amazonaws.services.s3.event.S3EventNotification; +import com.amazon.dataprepper.plugins.source.S3EventNotification; import java.util.Optional; diff --git a/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/filter/ObjectCreatedFilterTest.java b/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/filter/ObjectCreatedFilterTest.java index 4b93dfd8e9..0a480e15b6 100644 --- a/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/filter/ObjectCreatedFilterTest.java +++ b/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/filter/ObjectCreatedFilterTest.java @@ -5,7 +5,6 @@ package com.amazon.dataprepper.plugins.source.filter; -import com.amazonaws.services.s3.event.S3EventNotification; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -19,6 +18,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import com.amazon.dataprepper.plugins.source.S3EventNotification; + class ObjectCreatedFilterTest { private ObjectCreatedFilter objectCreatedFilter;