Skip to content

Commit

Permalink
S3 Event Consistency via metadata and JSON changes (#1803)
Browse files Browse the repository at this point in the history
Support a configurable key for S3 metadata and make this base key `s3/` by default. Moved the JSON output of S3 objects out of the `message` key and into the root of the Event. Resolves #1687

Signed-off-by: David Venable <[email protected]>
  • Loading branch information
dlvenable authored Sep 26, 2022
1 parent 63307d9 commit 6d35560
Show file tree
Hide file tree
Showing 11 changed files with 160 additions and 24 deletions.
2 changes: 2 additions & 0 deletions data-prepper-plugins/s3-source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ All Duration values are a string that represents a duration. They support ISO_86

* `records_to_accumulate` (Optional) : The number of messages to accumulate before writing to the Buffer. Defaults to 100.

* `metadata_root_key` (Optional) : String - Sets the base key for adding S3 metadata to each Event. The metadata includes the `key` and `bucket` for each S3 object. Defaults to `s3/`.

* `disable_bucket_ownership_validation` (Optional) : Boolean - If set to true, then the S3 Source will not attempt to validate that the bucket is owned by the expected account. The only expected account is the same account which owns the SQS queue. Defaults to `false`.

### <a name="sqs_configuration">SQS Configuration</a>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public String getFileExtension() {
@Override
public void assertEventIsCorrect(final Event event) {

final Map<String, Object> messageMap = event.get("message", Map.class);
final Map<String, Object> messageMap = event.toMap();
assertThat(messageMap, notNullValue());
assertThat(messageMap.size(), greaterThanOrEqualTo(KNOWN_FIELD_COUNT_PER_EVENT));
assertThat(messageMap.get(EVENT_VERSION_FIELD), equalTo(EVENT_VERSION_VALUE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class S3ObjectWorkerIT {
private int recordsReceived;
private PluginMetrics pluginMetrics;
private BucketOwnerProvider bucketOwnerProvider;
private EventMetadataModifier eventMetadataModifier;

@BeforeEach
void setUp() {
Expand All @@ -69,6 +70,7 @@ void setUp() {
.build();
bucket = System.getProperty("tests.s3source.bucket");
s3ObjectGenerator = new S3ObjectGenerator(s3Client, bucket);
eventMetadataModifier = new EventMetadataModifier(S3SourceConfig.DEFAULT_METADATA_ROOT_KEY);

buffer = mock(Buffer.class);
recordsReceived = 0;
Expand All @@ -89,8 +91,8 @@ private void stubBufferWriter(final Consumer<Event> additionalEventAssertions, f
for (Record<Event> eventRecord : recordsCollection) {
assertThat(eventRecord, notNullValue());
assertThat(eventRecord.getData(), notNullValue());
assertThat(eventRecord.getData().get("bucket", String.class), equalTo(bucket));
assertThat(eventRecord.getData().get("key", String.class), equalTo(key));
assertThat(eventRecord.getData().get("s3/bucket", String.class), equalTo(bucket));
assertThat(eventRecord.getData().get("s3/key", String.class), equalTo(key));
additionalEventAssertions.accept(eventRecord.getData());

}
Expand All @@ -101,7 +103,7 @@ private void stubBufferWriter(final Consumer<Event> additionalEventAssertions, f
}

private S3ObjectWorker createObjectUnderTest(final Codec codec, final int numberOfRecordsToAccumulate, final CompressionEngine compressionEngine) {
return new S3ObjectWorker(s3Client, buffer, compressionEngine, codec, bucketOwnerProvider, Duration.ofMillis(TIMEOUT_IN_MILLIS), numberOfRecordsToAccumulate, pluginMetrics);
return new S3ObjectWorker(s3Client, buffer, compressionEngine, codec, bucketOwnerProvider, Duration.ofMillis(TIMEOUT_IN_MILLIS), numberOfRecordsToAccumulate, eventMetadataModifier, pluginMetrics);
}

@ParameterizedTest
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package com.amazon.dataprepper.plugins.source;

import com.amazon.dataprepper.model.event.Event;

import java.util.Objects;
import java.util.function.BiConsumer;

/**
 * Adds S3 object metadata (the bucket name and object key) to each {@link Event}
 * under a configurable root key. The configured root key is normalized once at
 * construction time so field names can be appended directly.
 */
class EventMetadataModifier implements BiConsumer<Event, S3ObjectReference> {
    private static final String BUCKET_FIELD_NAME = "bucket";
    private static final String KEY_FIELD_NAME = "key";
    private final String baseKey;

    EventMetadataModifier(final String metadataRootKey) {
        this.baseKey = generateBaseKey(metadataRootKey);
    }

    @Override
    public void accept(final Event event, final S3ObjectReference s3ObjectReference) {
        event.put(baseKey + BUCKET_FIELD_NAME, s3ObjectReference.getBucketName());
        event.put(baseKey + KEY_FIELD_NAME, s3ObjectReference.getKey());
    }

    /**
     * Normalizes the configured root key: strips a single leading slash and
     * guarantees that any non-empty result ends with exactly one trailing slash.
     */
    private static String generateBaseKey(final String metadataRootKey) {
        Objects.requireNonNull(metadataRootKey);

        final String withoutLeadingSlash = metadataRootKey.startsWith("/")
                ? metadataRootKey.substring(1)
                : metadataRootKey;

        if (withoutLeadingSlash.isEmpty() || withoutLeadingSlash.endsWith("/")) {
            return withoutLeadingSlash;
        }

        return withoutLeadingSlash + "/";
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.InputStream;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.function.BiConsumer;

/**
* Class responsible for taking an {@link S3ObjectReference} and creating all the necessary {@link Event}
Expand All @@ -35,8 +36,6 @@ class S3ObjectWorker {
static final String S3_OBJECTS_FAILED_METRIC_NAME = "s3ObjectsFailed";
static final String S3_OBJECTS_SUCCEEDED_METRIC_NAME = "s3ObjectsSucceeded";
static final String S3_OBJECTS_TIME_ELAPSED_METRIC_NAME = "s3ObjectReadTimeElapsed";
private static final String BUCKET_FIELD_NAME = "bucket";
private static final String KEY_FIELD_NAME = "key";

private final S3Client s3Client;
private final Buffer<Record<Event>> buffer;
Expand All @@ -45,6 +44,7 @@ class S3ObjectWorker {
private final BucketOwnerProvider bucketOwnerProvider;
private final Duration bufferTimeout;
private final int numberOfRecordsToAccumulate;
private final BiConsumer<Event, S3ObjectReference> eventConsumer;
private final Counter s3ObjectsFailedCounter;
private final Counter s3ObjectsSucceededCounter;
private final Timer s3ObjectReadTimer;
Expand All @@ -56,6 +56,7 @@ public S3ObjectWorker(final S3Client s3Client,
final BucketOwnerProvider bucketOwnerProvider,
final Duration bufferTimeout,
final int numberOfRecordsToAccumulate,
final BiConsumer<Event, S3ObjectReference> eventConsumer,
final PluginMetrics pluginMetrics) {
this.s3Client = s3Client;
this.buffer = buffer;
Expand All @@ -64,6 +65,7 @@ public S3ObjectWorker(final S3Client s3Client,
this.bucketOwnerProvider = bucketOwnerProvider;
this.bufferTimeout = bufferTimeout;
this.numberOfRecordsToAccumulate = numberOfRecordsToAccumulate;
this.eventConsumer = eventConsumer;

s3ObjectsFailedCounter = pluginMetrics.counter(S3_OBJECTS_FAILED_METRIC_NAME);
s3ObjectsSucceededCounter = pluginMetrics.counter(S3_OBJECTS_SUCCEEDED_METRIC_NAME);
Expand Down Expand Up @@ -101,8 +103,7 @@ private void doParseObject(final S3ObjectReference s3ObjectReference, final GetO
final InputStream inputStream = compressionEngine.createInputStream(getObjectRequest.key(), responseInputStream)) {
codec.parse(inputStream, record -> {
try {
record.getData().put(BUCKET_FIELD_NAME, s3ObjectReference.getBucketName());
record.getData().put(KEY_FIELD_NAME, s3ObjectReference.getKey());
eventConsumer.accept(record.getData(), s3ObjectReference);
bufferAccumulator.add(record);
} catch (final Exception e) {
LOG.error("Failed writing S3 objects to buffer.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import software.amazon.awssdk.services.s3.S3Client;

import java.io.IOException;
import java.util.function.BiConsumer;

public class S3Service {
private static final Logger LOG = LoggerFactory.getLogger(S3Service.class);
Expand All @@ -32,20 +33,21 @@ public class S3Service {
private final BucketOwnerProvider bucketOwnerProvider;
private final S3ObjectWorker s3ObjectWorker;

public S3Service(final S3SourceConfig s3SourceConfig,
final Buffer<Record<Event>> buffer,
final Codec codec,
final PluginMetrics pluginMetrics,
final BucketOwnerProvider bucketOwnerProvider) {
S3Service(final S3SourceConfig s3SourceConfig,
final Buffer<Record<Event>> buffer,
final Codec codec,
final PluginMetrics pluginMetrics,
final BucketOwnerProvider bucketOwnerProvider) {
this.s3SourceConfig = s3SourceConfig;
this.buffer = buffer;
this.codec = codec;
this.pluginMetrics = pluginMetrics;
this.bucketOwnerProvider = bucketOwnerProvider;
this.s3Client = createS3Client();
this.compressionEngine = s3SourceConfig.getCompression().getEngine();
final BiConsumer<Event, S3ObjectReference> eventMetadataModifier = new EventMetadataModifier(s3SourceConfig.getMetadataRootKey());
this.s3ObjectWorker = new S3ObjectWorker(s3Client, buffer, compressionEngine, codec, bucketOwnerProvider,
s3SourceConfig.getBufferTimeout(), s3SourceConfig.getNumberOfRecordsToAccumulate(), pluginMetrics);
s3SourceConfig.getBufferTimeout(), s3SourceConfig.getNumberOfRecordsToAccumulate(), eventMetadataModifier, pluginMetrics);
}

void addS3Object(final S3ObjectReference s3ObjectReference) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
public class S3SourceConfig {
static final Duration DEFAULT_BUFFER_TIMEOUT = Duration.ofSeconds(10);
static final int DEFAULT_NUMBER_OF_RECORDS_TO_ACCUMULATE = 100;
static final String DEFAULT_METADATA_ROOT_KEY = "s3/";

@JsonProperty("notification_type")
@NotNull
Expand Down Expand Up @@ -53,6 +54,9 @@ public class S3SourceConfig {
@JsonProperty("disable_bucket_ownership_validation")
private boolean disableBucketOwnershipValidation = false;

@JsonProperty("metadata_root_key")
private String metadataRootKey = DEFAULT_METADATA_ROOT_KEY;

public NotificationTypeOption getNotificationType() {
return notificationType;
}
Expand Down Expand Up @@ -88,4 +92,8 @@ public int getNumberOfRecordsToAccumulate() {
public boolean isDisableBucketOwnershipValidation() {
return disableBucketOwnershipValidation;
}

public String getMetadataRootKey() {
return metadataRootKey;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
Expand All @@ -27,7 +26,6 @@
*/
@DataPrepperPlugin(name = "json", pluginType = Codec.class)
public class JsonCodec implements Codec {
private static final String MESSAGE_FIELD_NAME = "message";
private final ObjectMapper objectMapper = new ObjectMapper();
private final JsonFactory jsonFactory = new JsonFactory();

Expand Down Expand Up @@ -57,7 +55,7 @@ private void parseRecordsArray(final JsonParser jsonParser, final Consumer<Recor

private Record<Event> createRecord(final Map<String, Object> json) {
final JacksonEvent event = JacksonLog.builder()
.withData(Collections.singletonMap(MESSAGE_FIELD_NAME, json))
.withData(json)
.build();

return new Record<>(event);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package com.amazon.dataprepper.plugins.source;

import com.amazon.dataprepper.model.event.Event;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;

import java.util.UUID;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class EventMetadataModifierTest {
    @Mock
    private Event event;

    @Mock
    private S3ObjectReference s3ObjectReference;
    private String expectedBucket;
    private String expectedKey;

    @BeforeEach
    void setUp() {
        expectedBucket = UUID.randomUUID().toString();
        expectedKey = UUID.randomUUID().toString();
    }

    private EventMetadataModifier createObjectUnderTest(final String metadataRootKey) {
        return new EventMetadataModifier(metadataRootKey);
    }

    @Test
    void constructor_throws_if_metadataRootKey_is_null() {
        assertThrows(NullPointerException.class, () -> createObjectUnderTest(null));
    }

    @ParameterizedTest
    @MethodSource("rootKeyPermutations")
    void accept_sets_correct_S3_bucket_and_key(final String metadataKey, final String expectedRootKey) {
        when(s3ObjectReference.getBucketName()).thenReturn(expectedBucket);
        when(s3ObjectReference.getKey()).thenReturn(expectedKey);

        createObjectUnderTest(metadataKey).accept(event, s3ObjectReference);

        verify(event).put(expectedRootKey + "bucket", expectedBucket);
        verify(event).put(expectedRootKey + "key", expectedKey);
    }

    // Pairs of (configured metadata_root_key, expected normalized base key).
    static Stream<Arguments> rootKeyPermutations() {
        return Stream.of(
                arguments("", ""),
                arguments("/", ""),
                arguments("s3", "s3/"),
                arguments("s3/", "s3/"),
                arguments("/s3", "s3/"),
                arguments("/s3/", "s3/"),
                arguments("s3/inner", "s3/inner/"),
                arguments("s3/inner/", "s3/inner/"),
                arguments("/s3/inner", "s3/inner/"),
                arguments("/s3/inner/", "s3/inner/")
        );
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import static org.hamcrest.CoreMatchers.equalTo;
Expand Down Expand Up @@ -84,6 +85,8 @@ class S3ObjectWorkerTest {
private Counter s3ObjectsSucceededCounter;
@Mock
private Timer s3ObjectReadTimer;
@Mock
private BiConsumer<Event, S3ObjectReference> eventConsumer;

private String bucketName;
private String key;
Expand Down Expand Up @@ -122,7 +125,7 @@ void setUp() throws Exception {
}

private S3ObjectWorker createObjectUnderTest() {
return new S3ObjectWorker(s3Client, buffer, compressionEngine, codec, bucketOwnerProvider, bufferTimeout, recordsToAccumulate, pluginMetrics);
return new S3ObjectWorker(s3Client, buffer, compressionEngine, codec, bucketOwnerProvider, bufferTimeout, recordsToAccumulate, eventConsumer, pluginMetrics);
}

@Test
Expand Down Expand Up @@ -200,9 +203,8 @@ void parseS3Object_calls_Codec_parse_with_Consumer_that_adds_to_BufferAccumulato

consumerUnderTest.accept(record);

final InOrder inOrder = inOrder(event, bufferAccumulator);
inOrder.verify(event).put("bucket", bucketName);
inOrder.verify(event).put("key", key);
final InOrder inOrder = inOrder(eventConsumer, bufferAccumulator);
inOrder.verify(eventConsumer).accept(event, s3ObjectReference);
inOrder.verify(bufferAccumulator).add(record);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ void parse_with_InputStream_calls_Consumer_with_Event(final int numberOfObjects)
assertThat(actualRecord.getData().getMetadata().getEventType(), equalTo(EventType.LOG.toString()));

final Map<String, Object> expectedMap = jsonObjects.get(i);
assertThat(actualRecord.getData().get("message", Map.class), equalTo(expectedMap));
assertThat(actualRecord.getData().toMap(), equalTo(expectedMap));
}
}

Expand All @@ -167,7 +167,7 @@ void parse_with_InputStream_calls_Consumer_for_arrays_in_Json_permutations(final
assertThat(actualRecord.getData().getMetadata().getEventType(), equalTo(EventType.LOG.toString()));

final Map<String, Object> expectedMap = jsonObjects.get(i);
assertThat(actualRecord.getData().get("message", Map.class), equalTo(expectedMap));
assertThat(actualRecord.getData().toMap(), equalTo(expectedMap));
}
}

Expand Down Expand Up @@ -200,7 +200,7 @@ void parse_with_InputStream_calls_Consumer_with_Event_for_two_parallel_arrays(fi
assertThat(actualRecord.getData().getMetadata().getEventType(), equalTo(EventType.LOG.toString()));

final Map<String, Object> expectedMap = expectedJsonObjects.get(i);
assertThat(actualRecord.getData().get("message", Map.class), equalTo(expectedMap));
assertThat(actualRecord.getData().toMap(), equalTo(expectedMap));
}
}

Expand Down

0 comments on commit 6d35560

Please sign in to comment.