diff --git a/data-prepper-plugins/s3-source/README.md b/data-prepper-plugins/s3-source/README.md
index f34bd9b601..0da719066c 100644
--- a/data-prepper-plugins/s3-source/README.md
+++ b/data-prepper-plugins/s3-source/README.md
@@ -64,6 +64,8 @@ All Duration values are a string that represents a duration. They support ISO_86
* `records_to_accumulate` (Optional) : The number of messages to write to accumulate before writing to the Buffer. Defaults to 100.
+* `metadata_root_key` (Optional) : String - Sets the base key for adding S3 metadata to each Event. The metadata includes the `key` and `bucket` for each S3 object. Defaults to `s3/`.
+
* `disable_bucket_ownership_validation` (Optional) : Boolean - If set to true, then the S3 Source will not attempt to validate that the bucket is owned by the expected account. The only expected account is the same account which owns the SQS queue. Defaults to `false`.
### SQS Configuration
diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/com/amazon/dataprepper/plugins/source/JsonRecordsGenerator.java b/data-prepper-plugins/s3-source/src/integrationTest/java/com/amazon/dataprepper/plugins/source/JsonRecordsGenerator.java
index 453b5c8cb7..27bd3ee4f8 100644
--- a/data-prepper-plugins/s3-source/src/integrationTest/java/com/amazon/dataprepper/plugins/source/JsonRecordsGenerator.java
+++ b/data-prepper-plugins/s3-source/src/integrationTest/java/com/amazon/dataprepper/plugins/source/JsonRecordsGenerator.java
@@ -60,7 +60,7 @@ public String getFileExtension() {
@Override
public void assertEventIsCorrect(final Event event) {
- final Map messageMap = event.get("message", Map.class);
+ final Map messageMap = event.toMap();
assertThat(messageMap, notNullValue());
assertThat(messageMap.size(), greaterThanOrEqualTo(KNOWN_FIELD_COUNT_PER_EVENT));
assertThat(messageMap.get(EVENT_VERSION_FIELD), equalTo(EVENT_VERSION_VALUE));
diff --git a/data-prepper-plugins/s3-source/src/integrationTest/java/com/amazon/dataprepper/plugins/source/S3ObjectWorkerIT.java b/data-prepper-plugins/s3-source/src/integrationTest/java/com/amazon/dataprepper/plugins/source/S3ObjectWorkerIT.java
index 0668123ff7..e14588f21a 100644
--- a/data-prepper-plugins/s3-source/src/integrationTest/java/com/amazon/dataprepper/plugins/source/S3ObjectWorkerIT.java
+++ b/data-prepper-plugins/s3-source/src/integrationTest/java/com/amazon/dataprepper/plugins/source/S3ObjectWorkerIT.java
@@ -61,6 +61,7 @@ class S3ObjectWorkerIT {
private int recordsReceived;
private PluginMetrics pluginMetrics;
private BucketOwnerProvider bucketOwnerProvider;
+ private EventMetadataModifier eventMetadataModifier;
@BeforeEach
void setUp() {
@@ -69,6 +70,7 @@ void setUp() {
.build();
bucket = System.getProperty("tests.s3source.bucket");
s3ObjectGenerator = new S3ObjectGenerator(s3Client, bucket);
+ eventMetadataModifier = new EventMetadataModifier(S3SourceConfig.DEFAULT_METADATA_ROOT_KEY);
buffer = mock(Buffer.class);
recordsReceived = 0;
@@ -89,8 +91,8 @@ private void stubBufferWriter(final Consumer additionalEventAssertions, f
for (Record eventRecord : recordsCollection) {
assertThat(eventRecord, notNullValue());
assertThat(eventRecord.getData(), notNullValue());
- assertThat(eventRecord.getData().get("bucket", String.class), equalTo(bucket));
- assertThat(eventRecord.getData().get("key", String.class), equalTo(key));
+ assertThat(eventRecord.getData().get("s3/bucket", String.class), equalTo(bucket));
+ assertThat(eventRecord.getData().get("s3/key", String.class), equalTo(key));
additionalEventAssertions.accept(eventRecord.getData());
}
@@ -101,7 +103,7 @@ private void stubBufferWriter(final Consumer additionalEventAssertions, f
}
private S3ObjectWorker createObjectUnderTest(final Codec codec, final int numberOfRecordsToAccumulate, final CompressionEngine compressionEngine) {
- return new S3ObjectWorker(s3Client, buffer, compressionEngine, codec, bucketOwnerProvider, Duration.ofMillis(TIMEOUT_IN_MILLIS), numberOfRecordsToAccumulate, pluginMetrics);
+ return new S3ObjectWorker(s3Client, buffer, compressionEngine, codec, bucketOwnerProvider, Duration.ofMillis(TIMEOUT_IN_MILLIS), numberOfRecordsToAccumulate, eventMetadataModifier, pluginMetrics);
}
@ParameterizedTest
diff --git a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/EventMetadataModifier.java b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/EventMetadataModifier.java
new file mode 100644
index 0000000000..699f91a51d
--- /dev/null
+++ b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/EventMetadataModifier.java
@@ -0,0 +1,39 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package com.amazon.dataprepper.plugins.source;
+
+import com.amazon.dataprepper.model.event.Event;
+
+import java.util.Objects;
+import java.util.function.BiConsumer;
+
+class EventMetadataModifier implements BiConsumer {
+ private static final String BUCKET_FIELD_NAME = "bucket";
+ private static final String KEY_FIELD_NAME = "key";
+ private final String baseKey;
+
+ EventMetadataModifier(final String metadataRootKey) {
+ baseKey = generateBaseKey(metadataRootKey);
+ }
+
+ @Override
+ public void accept(final Event event, final S3ObjectReference s3ObjectReference) {
+ event.put(baseKey + BUCKET_FIELD_NAME, s3ObjectReference.getBucketName());
+ event.put(baseKey + KEY_FIELD_NAME, s3ObjectReference.getKey());
+ }
+
+ private static String generateBaseKey(String metadataRootKey) {
+ Objects.requireNonNull(metadataRootKey);
+
+ if(metadataRootKey.startsWith("/"))
+ metadataRootKey = metadataRootKey.substring(1);
+
+ if(metadataRootKey.isEmpty() || metadataRootKey.endsWith("/"))
+ return metadataRootKey;
+
+ return metadataRootKey + "/";
+ }
+}
diff --git a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3ObjectWorker.java b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3ObjectWorker.java
index 72ea9a0112..de8390db3d 100644
--- a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3ObjectWorker.java
+++ b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3ObjectWorker.java
@@ -25,6 +25,7 @@
import java.io.InputStream;
import java.time.Duration;
import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
/**
* Class responsible for taking an {@link S3ObjectReference} and creating all the necessary {@link Event}
@@ -35,8 +36,6 @@ class S3ObjectWorker {
static final String S3_OBJECTS_FAILED_METRIC_NAME = "s3ObjectsFailed";
static final String S3_OBJECTS_SUCCEEDED_METRIC_NAME = "s3ObjectsSucceeded";
static final String S3_OBJECTS_TIME_ELAPSED_METRIC_NAME = "s3ObjectReadTimeElapsed";
- private static final String BUCKET_FIELD_NAME = "bucket";
- private static final String KEY_FIELD_NAME = "key";
private final S3Client s3Client;
private final Buffer> buffer;
@@ -45,6 +44,7 @@ class S3ObjectWorker {
private final BucketOwnerProvider bucketOwnerProvider;
private final Duration bufferTimeout;
private final int numberOfRecordsToAccumulate;
+ private final BiConsumer eventConsumer;
private final Counter s3ObjectsFailedCounter;
private final Counter s3ObjectsSucceededCounter;
private final Timer s3ObjectReadTimer;
@@ -56,6 +56,7 @@ public S3ObjectWorker(final S3Client s3Client,
final BucketOwnerProvider bucketOwnerProvider,
final Duration bufferTimeout,
final int numberOfRecordsToAccumulate,
+ final BiConsumer eventConsumer,
final PluginMetrics pluginMetrics) {
this.s3Client = s3Client;
this.buffer = buffer;
@@ -64,6 +65,7 @@ public S3ObjectWorker(final S3Client s3Client,
this.bucketOwnerProvider = bucketOwnerProvider;
this.bufferTimeout = bufferTimeout;
this.numberOfRecordsToAccumulate = numberOfRecordsToAccumulate;
+ this.eventConsumer = eventConsumer;
s3ObjectsFailedCounter = pluginMetrics.counter(S3_OBJECTS_FAILED_METRIC_NAME);
s3ObjectsSucceededCounter = pluginMetrics.counter(S3_OBJECTS_SUCCEEDED_METRIC_NAME);
@@ -101,8 +103,7 @@ private void doParseObject(final S3ObjectReference s3ObjectReference, final GetO
final InputStream inputStream = compressionEngine.createInputStream(getObjectRequest.key(), responseInputStream)) {
codec.parse(inputStream, record -> {
try {
- record.getData().put(BUCKET_FIELD_NAME, s3ObjectReference.getBucketName());
- record.getData().put(KEY_FIELD_NAME, s3ObjectReference.getKey());
+ eventConsumer.accept(record.getData(), s3ObjectReference);
bufferAccumulator.add(record);
} catch (final Exception e) {
LOG.error("Failed writing S3 objects to buffer.", e);
diff --git a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3Service.java b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3Service.java
index 38d5aa2f4f..f90f1b9302 100644
--- a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3Service.java
+++ b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3Service.java
@@ -19,6 +19,7 @@
import software.amazon.awssdk.services.s3.S3Client;
import java.io.IOException;
+import java.util.function.BiConsumer;
public class S3Service {
private static final Logger LOG = LoggerFactory.getLogger(S3Service.class);
@@ -32,11 +33,11 @@ public class S3Service {
private final BucketOwnerProvider bucketOwnerProvider;
private final S3ObjectWorker s3ObjectWorker;
- public S3Service(final S3SourceConfig s3SourceConfig,
- final Buffer> buffer,
- final Codec codec,
- final PluginMetrics pluginMetrics,
- final BucketOwnerProvider bucketOwnerProvider) {
+ S3Service(final S3SourceConfig s3SourceConfig,
+ final Buffer> buffer,
+ final Codec codec,
+ final PluginMetrics pluginMetrics,
+ final BucketOwnerProvider bucketOwnerProvider) {
this.s3SourceConfig = s3SourceConfig;
this.buffer = buffer;
this.codec = codec;
@@ -44,8 +45,9 @@ public S3Service(final S3SourceConfig s3SourceConfig,
this.bucketOwnerProvider = bucketOwnerProvider;
this.s3Client = createS3Client();
this.compressionEngine = s3SourceConfig.getCompression().getEngine();
+ final BiConsumer eventMetadataModifier = new EventMetadataModifier(s3SourceConfig.getMetadataRootKey());
this.s3ObjectWorker = new S3ObjectWorker(s3Client, buffer, compressionEngine, codec, bucketOwnerProvider,
- s3SourceConfig.getBufferTimeout(), s3SourceConfig.getNumberOfRecordsToAccumulate(), pluginMetrics);
+ s3SourceConfig.getBufferTimeout(), s3SourceConfig.getNumberOfRecordsToAccumulate(), eventMetadataModifier, pluginMetrics);
}
void addS3Object(final S3ObjectReference s3ObjectReference) {
diff --git a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3SourceConfig.java b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3SourceConfig.java
index 4d210f6fc4..2574282d5b 100644
--- a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3SourceConfig.java
+++ b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/S3SourceConfig.java
@@ -20,6 +20,7 @@
public class S3SourceConfig {
static final Duration DEFAULT_BUFFER_TIMEOUT = Duration.ofSeconds(10);
static final int DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE = 100;
+ static final String DEFAULT_METADATA_ROOT_KEY = "s3/";
@JsonProperty("notification_type")
@NotNull
@@ -53,6 +54,9 @@ public class S3SourceConfig {
@JsonProperty("disable_bucket_ownership_validation")
private boolean disableBucketOwnershipValidation = false;
+ @JsonProperty("metadata_root_key")
+ private String metadataRootKey = DEFAULT_METADATA_ROOT_KEY;
+
public NotificationTypeOption getNotificationType() {
return notificationType;
}
@@ -88,4 +92,8 @@ public int getNumberOfRecordsToAccumulate() {
public boolean isDisableBucketOwnershipValidation() {
return disableBucketOwnershipValidation;
}
+
+ public String getMetadataRootKey() {
+ return metadataRootKey;
+ }
}
diff --git a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/codec/JsonCodec.java b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/codec/JsonCodec.java
index fa308db4ea..5b7e228f1e 100644
--- a/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/codec/JsonCodec.java
+++ b/data-prepper-plugins/s3-source/src/main/java/com/amazon/dataprepper/plugins/source/codec/JsonCodec.java
@@ -17,7 +17,6 @@
import java.io.IOException;
import java.io.InputStream;
-import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
@@ -27,7 +26,6 @@
*/
@DataPrepperPlugin(name = "json", pluginType = Codec.class)
public class JsonCodec implements Codec {
- private static final String MESSAGE_FIELD_NAME = "message";
private final ObjectMapper objectMapper = new ObjectMapper();
private final JsonFactory jsonFactory = new JsonFactory();
@@ -57,7 +55,7 @@ private void parseRecordsArray(final JsonParser jsonParser, final Consumer createRecord(final Map json) {
final JacksonEvent event = JacksonLog.builder()
- .withData(Collections.singletonMap(MESSAGE_FIELD_NAME, json))
+ .withData(json)
.build();
return new Record<>(event);
diff --git a/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/EventMetadataModifierTest.java b/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/EventMetadataModifierTest.java
new file mode 100644
index 0000000000..74ffa9ea75
--- /dev/null
+++ b/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/EventMetadataModifierTest.java
@@ -0,0 +1,82 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package com.amazon.dataprepper.plugins.source;
+
+import com.amazon.dataprepper.model.event.Event;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+import org.junit.jupiter.params.provider.ArgumentsSource;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.UUID;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.params.provider.Arguments.arguments;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@ExtendWith(MockitoExtension.class)
+class EventMetadataModifierTest {
+ @Mock
+ private Event event;
+
+ @Mock
+ private S3ObjectReference s3ObjectReference;
+ private String bucketName;
+ private String key;
+
+ @BeforeEach
+ void setUp() {
+ bucketName = UUID.randomUUID().toString();
+ key = UUID.randomUUID().toString();
+ }
+
+ private EventMetadataModifier createObjectUnderTest(final String metadataRootKey) {
+ return new EventMetadataModifier(metadataRootKey);
+ }
+
+ @Test
+ void constructor_throws_if_metadataRootKey_is_null() {
+ assertThrows(NullPointerException.class, () -> createObjectUnderTest(null));
+ }
+
+ @ParameterizedTest
+ @ArgumentsSource(KeysArgumentsProvider.class)
+ void accept_sets_correct_S3_bucket_and_key(final String metadataKey, final String expectedRootKey) {
+ when(s3ObjectReference.getBucketName()).thenReturn(bucketName);
+ when(s3ObjectReference.getKey()).thenReturn(key);
+
+ createObjectUnderTest(metadataKey).accept(event, s3ObjectReference);
+
+ verify(event).put(expectedRootKey + "bucket", bucketName);
+ verify(event).put(expectedRootKey + "key", key);
+ }
+
+ static class KeysArgumentsProvider implements ArgumentsProvider {
+ @Override
+ public Stream extends Arguments> provideArguments(final ExtensionContext context) {
+ return Stream.of(
+ arguments("", ""),
+ arguments("/", ""),
+ arguments("s3", "s3/"),
+ arguments("s3/", "s3/"),
+ arguments("/s3", "s3/"),
+ arguments("/s3/", "s3/"),
+ arguments("s3/inner", "s3/inner/"),
+ arguments("s3/inner/", "s3/inner/"),
+ arguments("/s3/inner", "s3/inner/"),
+ arguments("/s3/inner/", "s3/inner/")
+ );
+ }
+ }
+}
\ No newline at end of file
diff --git a/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/S3ObjectWorkerTest.java b/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/S3ObjectWorkerTest.java
index fa9ea22190..dc512228a1 100644
--- a/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/S3ObjectWorkerTest.java
+++ b/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/S3ObjectWorkerTest.java
@@ -34,6 +34,7 @@
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Callable;
+import java.util.function.BiConsumer;
import java.util.function.Consumer;
import static org.hamcrest.CoreMatchers.equalTo;
@@ -84,6 +85,8 @@ class S3ObjectWorkerTest {
private Counter s3ObjectsSucceededCounter;
@Mock
private Timer s3ObjectReadTimer;
+ @Mock
+ private BiConsumer eventConsumer;
private String bucketName;
private String key;
@@ -122,7 +125,7 @@ void setUp() throws Exception {
}
private S3ObjectWorker createObjectUnderTest() {
- return new S3ObjectWorker(s3Client, buffer, compressionEngine, codec, bucketOwnerProvider, bufferTimeout, recordsToAccumulate, pluginMetrics);
+ return new S3ObjectWorker(s3Client, buffer, compressionEngine, codec, bucketOwnerProvider, bufferTimeout, recordsToAccumulate, eventConsumer, pluginMetrics);
}
@Test
@@ -200,9 +203,8 @@ void parseS3Object_calls_Codec_parse_with_Consumer_that_adds_to_BufferAccumulato
consumerUnderTest.accept(record);
- final InOrder inOrder = inOrder(event, bufferAccumulator);
- inOrder.verify(event).put("bucket", bucketName);
- inOrder.verify(event).put("key", key);
+ final InOrder inOrder = inOrder(eventConsumer, bufferAccumulator);
+ inOrder.verify(eventConsumer).accept(event, s3ObjectReference);
inOrder.verify(bufferAccumulator).add(record);
}
diff --git a/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/codec/JsonCodecTest.java b/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/codec/JsonCodecTest.java
index adfc8bea06..e9e72803b8 100644
--- a/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/codec/JsonCodecTest.java
+++ b/data-prepper-plugins/s3-source/src/test/java/com/amazon/dataprepper/plugins/source/codec/JsonCodecTest.java
@@ -140,7 +140,7 @@ void parse_with_InputStream_calls_Consumer_with_Event(final int numberOfObjects)
assertThat(actualRecord.getData().getMetadata().getEventType(), equalTo(EventType.LOG.toString()));
final Map expectedMap = jsonObjects.get(i);
- assertThat(actualRecord.getData().get("message", Map.class), equalTo(expectedMap));
+ assertThat(actualRecord.getData().toMap(), equalTo(expectedMap));
}
}
@@ -167,7 +167,7 @@ void parse_with_InputStream_calls_Consumer_for_arrays_in_Json_permutations(final
assertThat(actualRecord.getData().getMetadata().getEventType(), equalTo(EventType.LOG.toString()));
final Map expectedMap = jsonObjects.get(i);
- assertThat(actualRecord.getData().get("message", Map.class), equalTo(expectedMap));
+ assertThat(actualRecord.getData().toMap(), equalTo(expectedMap));
}
}
@@ -200,7 +200,7 @@ void parse_with_InputStream_calls_Consumer_with_Event_for_two_parallel_arrays(fi
assertThat(actualRecord.getData().getMetadata().getEventType(), equalTo(EventType.LOG.toString()));
final Map expectedMap = expectedJsonObjects.get(i);
- assertThat(actualRecord.getData().get("message", Map.class), equalTo(expectedMap));
+ assertThat(actualRecord.getData().toMap(), equalTo(expectedMap));
}
}