diff --git a/data-prepper-plugins/dynamodb-source/build.gradle b/data-prepper-plugins/dynamodb-source/build.gradle index 8c25a0482b..8fdc037470 100644 --- a/data-prepper-plugins/dynamodb-source/build.gradle +++ b/data-prepper-plugins/dynamodb-source/build.gradle @@ -27,8 +27,4 @@ dependencies { testImplementation testLibs.mockito.inline testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' -} - -test { - useJUnitPlatform() } \ No newline at end of file diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java index 96376590bf..b8733a5577 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBService.java @@ -98,7 +98,7 @@ public void start(Buffer> buffer) { DataFileLoaderFactory loaderFactory = new DataFileLoaderFactory(coordinator, s3Client, pluginMetrics, buffer); Runnable fileLoaderScheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig); - ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer); + ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer, dynamoDBSourceConfig.getTableConfigs().get(0).getStreamConfig()); Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig, new BackoffCalculator(dynamoDBSourceConfig.getTableConfigs().get(0).getExportConfig() != null)); // leader scheduler will handle the initialization Runnable leaderScheduler = new LeaderScheduler(coordinator, dynamoDbClient, shardManager, tableConfigs); @@ -109,8 +109,10 @@ public void start(Buffer> buffer) { executor.submit(leaderScheduler); executor.submit(exportScheduler); executor.submit(fileLoaderScheduler); - executor.submit(streamScheduler); + if (tableConfigs.get(0).getStreamConfig() != null) { + executor.submit(streamScheduler); + } } /** diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/StreamConfig.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/StreamConfig.java index 903cd9ff00..83bc360be6 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/StreamConfig.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/configuration/StreamConfig.java @@ -6,14 +6,22 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.configuration; import com.fasterxml.jackson.annotation.JsonProperty; +import software.amazon.awssdk.services.dynamodb.model.StreamViewType; public class StreamConfig { @JsonProperty(value = "start_position") private StreamStartPosition startPosition = StreamStartPosition.LATEST; + @JsonProperty("view_on_remove") + private StreamViewType viewForRemoves = StreamViewType.NEW_IMAGE; + public StreamStartPosition getStartPosition() { return startPosition; } + public StreamViewType getStreamViewForRemoves() { + return viewForRemoves; + } + } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java index 067eaf8519..d3fadf7217 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverter.java @@ -14,12 +14,15 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamConfig; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.enhanced.dynamodb.document.EnhancedDocument; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.OperationType; import software.amazon.awssdk.services.dynamodb.model.Record; +import software.amazon.awssdk.services.dynamodb.model.StreamViewType; import java.time.Instant; import java.util.HashMap; @@ -40,6 +43,8 @@ public class StreamRecordConverter extends RecordConverter { private static final TypeReference> MAP_TYPE_REFERENCE = new TypeReference>() { }; + private final StreamConfig streamConfig; + private final PluginMetrics pluginMetrics; private final Counter changeEventSuccessCounter; @@ -50,13 +55,18 @@ public class StreamRecordConverter extends RecordConverter { private Instant currentSecond; private int recordsSeenThisSecond = 0; - public StreamRecordConverter(final BufferAccumulator> bufferAccumulator, TableInfo tableInfo, PluginMetrics pluginMetrics) { + public StreamRecordConverter(final BufferAccumulator> bufferAccumulator, + final TableInfo tableInfo, + final PluginMetrics pluginMetrics, + final StreamConfig streamConfig) { super(bufferAccumulator, tableInfo); this.pluginMetrics = pluginMetrics; this.changeEventSuccessCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSED_COUNT); this.changeEventErrorCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSING_ERROR_COUNT); this.bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED); this.bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED); + this.streamConfig = streamConfig; + } @Override @@ -73,8 +83,10 @@ public void writeToBuffer(final AcknowledgementSet acknowledgementSet, List data; Map keys; try { + final Map streamRecord = getStreamRecordFromImage(record); + // NewImage may be empty - data = convertData(record.dynamodb().newImage()); + data = convertData(streamRecord); // Always get keys from dynamodb().keys() keys = convertKeys(record.dynamodb().keys()); } catch (final Exception e) { @@ -150,4 +162,21 @@ private long calculateTieBreakingVersionFromTimestamp(final Instant eventTimeInS return eventTimeInSeconds.getEpochSecond() * 1_000_000 + recordsSeenThisSecond; } + + private Map getStreamRecordFromImage(final Record record) { + if (!OperationType.REMOVE.equals(record.eventName())) { + return record.dynamodb().newImage(); + } + + if (StreamViewType.OLD_IMAGE.equals(streamConfig.getStreamViewForRemoves())) { + if (!record.dynamodb().hasOldImage()) { + LOG.warn("view_on_remove with OLD_IMAGE is enabled, but no old image can be found on the stream record, using NEW_IMAGE"); + return record.dynamodb().newImage(); + } else { + return record.dynamodb().oldImage(); + } + } + + return record.dynamodb().newImage(); + } } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java index 9497a87479..894e668568 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java @@ -12,6 +12,7 @@ import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamConfig; import org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics; @@ -121,7 +122,7 @@ private ShardConsumer(Builder builder) { this.startTime = builder.startTime == null ? Instant.MIN : builder.startTime.minus(STREAM_EVENT_OVERLAP_TIME); this.waitForExport = builder.waitForExport; final BufferAccumulator> bufferAccumulator = BufferAccumulator.create(builder.buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT); - recordConverter = new StreamRecordConverter(bufferAccumulator, builder.tableInfo, builder.pluginMetrics); + recordConverter = new StreamRecordConverter(bufferAccumulator, builder.tableInfo, builder.pluginMetrics, builder.streamConfig); this.acknowledgementSet = builder.acknowledgementSet; this.shardAcknowledgmentTimeout = builder.dataFileAcknowledgmentTimeout; this.shardId = builder.shardId; @@ -132,8 +133,9 @@ private ShardConsumer(Builder builder) { public static Builder builder(final DynamoDbStreamsClient dynamoDbStreamsClient, final PluginMetrics pluginMetrics, final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics, - final Buffer> buffer) { - return new Builder(dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer); + final Buffer> buffer, + final StreamConfig streamConfig) { + return new Builder(dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer, streamConfig); } @@ -164,14 +166,18 @@ static class Builder { private AcknowledgementSet acknowledgementSet; private Duration dataFileAcknowledgmentTimeout; + private StreamConfig streamConfig; + public Builder(final DynamoDbStreamsClient dynamoDbStreamsClient, final PluginMetrics pluginMetrics, final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics, - final Buffer> buffer) { + final Buffer> buffer, + final StreamConfig streamConfig) { this.dynamoDbStreamsClient = dynamoDbStreamsClient; this.pluginMetrics = pluginMetrics; this.dynamoDBSourceAggregateMetrics = dynamoDBSourceAggregateMetrics; this.buffer = buffer; + this.streamConfig = streamConfig; } public Builder tableInfo(TableInfo tableInfo) { diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java index 4ca28ef8cc..0a72133032 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java @@ -11,6 +11,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamConfig; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.GlobalState; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; @@ -45,18 +46,21 @@ public class ShardConsumerFactory { private final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics; private final Buffer> buffer; + private final StreamConfig streamConfig; + public ShardConsumerFactory(final EnhancedSourceCoordinator enhancedSourceCoordinator, final DynamoDbStreamsClient streamsClient, final PluginMetrics pluginMetrics, final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics, - final Buffer> buffer) { + final Buffer> buffer, + final StreamConfig streamConfig) { this.streamsClient = streamsClient; this.enhancedSourceCoordinator = enhancedSourceCoordinator; this.pluginMetrics = pluginMetrics; this.dynamoDBSourceAggregateMetrics = dynamoDBSourceAggregateMetrics; this.buffer = buffer; - + this.streamConfig = streamConfig; } public Runnable createConsumer(final StreamPartition streamPartition, @@ -97,7 +101,7 @@ public Runnable createConsumer(final StreamPartition streamPartition, LOG.debug("Create shard consumer for {} with shardIter {}", streamPartition.getShardId(), shardIterator); LOG.debug("Create shard consumer for {} with lastShardIter {}", streamPartition.getShardId(), lastShardIterator); - ShardConsumer shardConsumer = ShardConsumer.builder(streamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer) + ShardConsumer shardConsumer = ShardConsumer.builder(streamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer, streamConfig) .tableInfo(tableInfo) .checkpointer(checkpointer) .shardIterator(shardIterator) diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBServiceTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBServiceTest.java index 6c9228878f..3015cc7ef0 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBServiceTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBServiceTest.java @@ -8,7 +8,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import org.mockito.Mock; +import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; @@ -16,16 +18,26 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamConfig; import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.TableConfig; import software.amazon.awssdk.services.dynamodb.DynamoDbClient; import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient; import software.amazon.awssdk.services.s3.S3Client; +import java.util.Collections; import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Mockito.eq; import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) class DynamoDBServiceTest { @@ -51,6 +63,9 @@ class DynamoDBServiceTest { @Mock private TableConfig tableConfig; + @Mock + private StreamConfig streamConfig; + @Mock private PluginMetrics pluginMetrics; @@ -60,6 +75,9 @@ class DynamoDBServiceTest { @Mock private AcknowledgementSetManager acknowledgementSetManager; + @Mock + private ExecutorService executorService; + private DynamoDBService dynamoDBService; @BeforeEach @@ -73,16 +91,45 @@ void setup() { } private DynamoDBService createObjectUnderTest() { - DynamoDBService objectUnderTest = new DynamoDBService(coordinator, clientFactory, sourceConfig, pluginMetrics, acknowledgementSetManager); - return objectUnderTest; + + try (final MockedStatic executorsMockedStatic = mockStatic(Executors.class)) { + executorsMockedStatic.when(() -> Executors.newFixedThreadPool(eq(4))).thenReturn(executorService); + + return new DynamoDBService(coordinator, clientFactory, sourceConfig, pluginMetrics, acknowledgementSetManager); + } } @Test - void test_normal_start() { + void test_normal_start_with_stream_config() { + when(tableConfig.getStreamConfig()).thenReturn(streamConfig); dynamoDBService = createObjectUnderTest(); assertThat(dynamoDBService, notNullValue()); + + final ArgumentCaptor runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class); + dynamoDBService.start(buffer); + verify(executorService, times(4)).submit(runnableArgumentCaptor.capture()); + + assertThat(runnableArgumentCaptor.getAllValues(), notNullValue()); + assertThat(runnableArgumentCaptor.getAllValues().size(), equalTo(4)); + } + + @Test + void test_normal_start_without_stream_config() { + when(tableConfig.getStreamConfig()).thenReturn(null); + + dynamoDBService = createObjectUnderTest(); + assertThat(dynamoDBService, notNullValue()); + + final ArgumentCaptor runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class); + + dynamoDBService.start(buffer); + + verify(executorService, times(3)).submit(runnableArgumentCaptor.capture()); + + assertThat(runnableArgumentCaptor.getAllValues(), notNullValue()); + assertThat(runnableArgumentCaptor.getAllValues().size(), equalTo(3)); } @@ -90,6 +137,8 @@ void test_normal_start() { void test_normal_shutdown() { dynamoDBService = createObjectUnderTest(); assertThat(dynamoDBService, notNullValue()); + + when(executorService.shutdownNow()).thenReturn(Collections.emptyList()); dynamoDBService.shutdown(); } diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java index 19b4e27b49..a01e569f36 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/converter/StreamRecordConverterTest.java @@ -21,11 +21,13 @@ import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.opensearch.OpenSearchBulkActions; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamConfig; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableMetadata; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; import software.amazon.awssdk.services.dynamodb.model.OperationType; import software.amazon.awssdk.services.dynamodb.model.StreamRecord; +import software.amazon.awssdk.services.dynamodb.model.StreamViewType; import java.time.Instant; import java.util.ArrayList; @@ -47,6 +49,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_VERSION_FROM_TIMESTAMP; import static org.opensearch.dataprepper.plugins.source.dynamodb.converter.MetadataKeyAttributes.EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE; @@ -83,6 +86,9 @@ class StreamRecordConverterTest { @Mock private DistributionSummary bytesProcessedSummary; + @Mock + private StreamConfig streamConfig; + private final String tableName = UUID.randomUUID().toString(); private final String tableArn = "arn:aws:dynamodb:us-west-2:123456789012:table/" + tableName; @@ -120,7 +126,7 @@ void test_writeToBuffer() throws Exception { List records = buildRecords(numberOfRecords, Instant.now()); - StreamRecordConverter recordConverter = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics); + StreamRecordConverter recordConverter = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics, streamConfig); recordConverter.writeToBuffer(null, records); verify(bufferAccumulator, times(numberOfRecords)).add(any(Record.class)); @@ -139,7 +145,7 @@ void test_writeSingleRecordToBuffer() throws Exception { List records = buildRecords(1, Instant.now()); final ArgumentCaptor recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); software.amazon.awssdk.services.dynamodb.model.Record record = records.get(0); - StreamRecordConverter recordConverter = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics); + StreamRecordConverter recordConverter = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics, streamConfig); doNothing().when(bufferAccumulator).add(recordArgumentCaptor.capture()); recordConverter.writeToBuffer(null, records); @@ -185,7 +191,7 @@ void test_writeSingleRecordToBuffer_with_other_data(final String additionalStrin List records = buildRecords(1, Instant.now(), additionalData); final ArgumentCaptor recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); software.amazon.awssdk.services.dynamodb.model.Record record = records.get(0); - final StreamRecordConverter objectUnderTest = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics); + final StreamRecordConverter objectUnderTest = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics, streamConfig); doNothing().when(bufferAccumulator).add(recordArgumentCaptor.capture()); objectUnderTest.writeToBuffer(null, records); @@ -221,7 +227,7 @@ void test_writeSingleRecordToBuffer_with_bad_input_does_not_write() throws Excep final Map badData = Map.of("otherData", AttributeValue.builder().build()); List badRecords = buildRecords(2, Instant.now(), badData); - final StreamRecordConverter objectUnderTest = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics); + final StreamRecordConverter objectUnderTest = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics, streamConfig); objectUnderTest.writeToBuffer(null, badRecords); @@ -235,7 +241,7 @@ void test_writeSingleRecordToBuffer_with_mixed_input_writes_good_records() throw List badRecords = buildRecords(2, Instant.now(), badData); List goodRecords = buildRecords(5, Instant.now()); - final StreamRecordConverter objectUnderTest = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics); + final StreamRecordConverter objectUnderTest = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics, streamConfig); List mixedRecords = new ArrayList<>(); mixedRecords.addAll(badRecords); @@ -258,7 +264,7 @@ void writingToBuffer_with_nth_event_in_that_second_returns_expected_that_timesta records.add(buildRecord(newerSecond)); final ArgumentCaptor recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); - StreamRecordConverter recordConverter = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics); + StreamRecordConverter recordConverter = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics, streamConfig); doNothing().when(bufferAccumulator).add(recordArgumentCaptor.capture()); recordConverter.writeToBuffer(null, records); @@ -337,6 +343,139 @@ void writingToBuffer_with_nth_event_in_that_second_returns_expected_that_timesta verify(bytesProcessedSummary, times(4)).record(anyDouble()); } + @Test + void remove_record_with_use_old_image_on_delete_uses_old_image() throws Exception { + when(streamConfig.getStreamViewForRemoves()).thenReturn(StreamViewType.OLD_IMAGE); + + final String newImageKey = UUID.randomUUID().toString(); + final String newImageValue = UUID.randomUUID().toString(); + + final String oldImageKey = UUID.randomUUID().toString(); + final String oldImageValue = UUID.randomUUID().toString(); + + final Map newImage = Map.of(newImageKey, AttributeValue.builder().s(newImageValue).build()); + final Map oldImage = Map.of(oldImageKey, AttributeValue.builder().s(oldImageValue).build()); + List records = Collections.singletonList(buildRecord(Instant.now(), newImage, oldImage, OperationType.REMOVE)); + final ArgumentCaptor recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); + software.amazon.awssdk.services.dynamodb.model.Record record = records.get(0); + final StreamRecordConverter objectUnderTest = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics, streamConfig); + doNothing().when(bufferAccumulator).add(recordArgumentCaptor.capture()); + + objectUnderTest.writeToBuffer(null, records); + + verify(bufferAccumulator).add(any(Record.class)); + verify(bufferAccumulator).flush(); + verify(changeEventSuccessCounter).increment(anyDouble()); + assertThat(recordArgumentCaptor.getValue().getData(), notNullValue()); + JacksonEvent event = (JacksonEvent) recordArgumentCaptor.getValue().getData(); + + assertThat(event.getMetadata(), notNullValue()); + String partitionKey = record.dynamodb().keys().get(partitionKeyAttrName).s(); + String sortKey = record.dynamodb().keys().get(sortKeyAttrName).s(); + assertThat(event.getMetadata().getAttribute(PARTITION_KEY_METADATA_ATTRIBUTE), equalTo(partitionKey)); + assertThat(event.getMetadata().getAttribute(SORT_KEY_METADATA_ATTRIBUTE), equalTo(sortKey)); + assertThat(event.getMetadata().getAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE), equalTo(partitionKey + "|" + sortKey)); + assertThat(event.getMetadata().getAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE), equalTo(OpenSearchBulkActions.DELETE.toString())); + assertThat(event.getMetadata().getAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE), equalTo(OperationType.REMOVE.toString())); + assertThat(event.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), equalTo(record.dynamodb().approximateCreationDateTime().toEpochMilli())); + + assertThat(event.get(partitionKeyAttrName, String.class), notNullValue()); + assertThat(event.get(sortKeyAttrName, String.class), notNullValue()); + assertThat(event.get(newImageKey, String.class), equalTo(null)); + assertThat(event.get(oldImageKey, String.class), equalTo(oldImageValue)); + + verifyNoInteractions(changeEventErrorCounter); + verify(bytesReceivedSummary).record(record.dynamodb().sizeBytes()); + verify(bytesProcessedSummary).record(record.dynamodb().sizeBytes()); + } + + @Test + void remove_record_with_use_old_image_on_delete_with_no_new_image_found_uses_new_image() throws Exception { + when(streamConfig.getStreamViewForRemoves()).thenReturn(StreamViewType.OLD_IMAGE); + + final String newImageKey = UUID.randomUUID().toString(); + final String newImageValue = UUID.randomUUID().toString(); + + final Map newImage = Map.of(newImageKey, AttributeValue.builder().s(newImageValue).build()); + List records = Collections.singletonList(buildRecord(Instant.now(), newImage, null, OperationType.REMOVE)); + final ArgumentCaptor recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); + software.amazon.awssdk.services.dynamodb.model.Record record = records.get(0); + final StreamRecordConverter objectUnderTest = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics, streamConfig); + doNothing().when(bufferAccumulator).add(recordArgumentCaptor.capture()); + + objectUnderTest.writeToBuffer(null, records); + + verify(bufferAccumulator).add(any(Record.class)); + verify(bufferAccumulator).flush(); + verify(changeEventSuccessCounter).increment(anyDouble()); + assertThat(recordArgumentCaptor.getValue().getData(), notNullValue()); + JacksonEvent event = (JacksonEvent) recordArgumentCaptor.getValue().getData(); + + assertThat(event.getMetadata(), notNullValue()); + String partitionKey = record.dynamodb().keys().get(partitionKeyAttrName).s(); + String sortKey = record.dynamodb().keys().get(sortKeyAttrName).s(); + assertThat(event.getMetadata().getAttribute(PARTITION_KEY_METADATA_ATTRIBUTE), equalTo(partitionKey)); + assertThat(event.getMetadata().getAttribute(SORT_KEY_METADATA_ATTRIBUTE), equalTo(sortKey)); + assertThat(event.getMetadata().getAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE), equalTo(partitionKey + "|" + sortKey)); + assertThat(event.getMetadata().getAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE), equalTo(OpenSearchBulkActions.DELETE.toString())); + assertThat(event.getMetadata().getAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE), equalTo(OperationType.REMOVE.toString())); + assertThat(event.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), equalTo(record.dynamodb().approximateCreationDateTime().toEpochMilli())); + + assertThat(event.get(partitionKeyAttrName, String.class), notNullValue()); + assertThat(event.get(sortKeyAttrName, String.class), notNullValue()); + assertThat(event.get(newImageKey, String.class), equalTo(newImageValue)); + + verifyNoInteractions(changeEventErrorCounter); + verify(bytesReceivedSummary).record(record.dynamodb().sizeBytes()); + verify(bytesProcessedSummary).record(record.dynamodb().sizeBytes()); + } + + @Test + void remove_record_without_use_old_image_on_delete_uses_new_image() throws Exception { + when(streamConfig.getStreamViewForRemoves()).thenReturn(StreamViewType.NEW_IMAGE); + + final String newImageKey = UUID.randomUUID().toString(); + final String newImageValue = UUID.randomUUID().toString(); + + final String oldImageKey = UUID.randomUUID().toString(); + final String oldImageValue = UUID.randomUUID().toString(); + + final Map newImage = Map.of(newImageKey, AttributeValue.builder().s(newImageValue).build()); + final Map oldImage = Map.of(oldImageKey, AttributeValue.builder().s(oldImageValue).build()); + List records = Collections.singletonList(buildRecord(Instant.now(), newImage, oldImage, OperationType.REMOVE)); + final ArgumentCaptor recordArgumentCaptor = ArgumentCaptor.forClass(Record.class); + software.amazon.awssdk.services.dynamodb.model.Record record = records.get(0); + final StreamRecordConverter objectUnderTest = new StreamRecordConverter(bufferAccumulator, tableInfo, pluginMetrics, streamConfig); + doNothing().when(bufferAccumulator).add(recordArgumentCaptor.capture()); + + objectUnderTest.writeToBuffer(null, records); + + verify(bufferAccumulator).add(any(Record.class)); + verify(bufferAccumulator).flush(); + verify(changeEventSuccessCounter).increment(anyDouble()); + assertThat(recordArgumentCaptor.getValue().getData(), notNullValue()); + JacksonEvent event = (JacksonEvent) recordArgumentCaptor.getValue().getData(); + + assertThat(event.getMetadata(), notNullValue()); + String partitionKey = record.dynamodb().keys().get(partitionKeyAttrName).s(); + String sortKey = record.dynamodb().keys().get(sortKeyAttrName).s(); + assertThat(event.getMetadata().getAttribute(PARTITION_KEY_METADATA_ATTRIBUTE), equalTo(partitionKey)); + assertThat(event.getMetadata().getAttribute(SORT_KEY_METADATA_ATTRIBUTE), equalTo(sortKey)); + assertThat(event.getMetadata().getAttribute(PRIMARY_KEY_DOCUMENT_ID_METADATA_ATTRIBUTE), equalTo(partitionKey + "|" + sortKey)); + assertThat(event.getMetadata().getAttribute(EVENT_NAME_BULK_ACTION_METADATA_ATTRIBUTE), equalTo(OpenSearchBulkActions.DELETE.toString())); + assertThat(event.getMetadata().getAttribute(DDB_STREAM_EVENT_NAME_METADATA_ATTRIBUTE), equalTo(OperationType.REMOVE.toString())); + assertThat(event.getMetadata().getAttribute(EVENT_TIMESTAMP_METADATA_ATTRIBUTE), equalTo(record.dynamodb().approximateCreationDateTime().toEpochMilli())); + + assertThat(event.get(partitionKeyAttrName, String.class), notNullValue()); + assertThat(event.get(sortKeyAttrName, String.class), notNullValue()); + assertThat(event.get(oldImageKey, String.class), equalTo(null)); + assertThat(event.get(newImageKey, String.class), equalTo(newImageValue)); + + verifyNoInteractions(changeEventErrorCounter); + verify(bytesReceivedSummary).record(record.dynamodb().sizeBytes()); + verify(bytesProcessedSummary).record(record.dynamodb().sizeBytes()); + } + private List buildRecords(int count, final Instant creationTime) { return buildRecords(count, creationTime, Collections.emptyMap()); } @@ -347,18 +486,20 @@ private List buildRecords final Map additionalData) { List records = new ArrayList<>(); for (int i = 0; i < count; i++) { - records.add(buildRecord(creationTime, additionalData)); + records.add(buildRecord(creationTime, additionalData, null, OperationType.INSERT)); } return records; } private software.amazon.awssdk.services.dynamodb.model.Record buildRecord(final Instant creationTime) { - return buildRecord(creationTime, Collections.emptyMap()); + return buildRecord(creationTime, Collections.emptyMap(), null, OperationType.INSERT); } private software.amazon.awssdk.services.dynamodb.model.Record buildRecord(final Instant creationTime, - Map additionalData) { + Map additionalData, + Map oldImage, + final OperationType operationType) { Map keysData = Map.of( partitionKeyAttrName, AttributeValue.builder().s(UUID.randomUUID().toString()).build(), sortKeyAttrName, AttributeValue.builder().s(UUID.randomUUID().toString()).build()); @@ -367,16 +508,24 @@ private software.amazon.awssdk.services.dynamodb.model.Record buildRecord(final data.putAll(keysData); data.putAll(additionalData); + Map oldImageData = new HashMap<>(); + if (oldImage != null) { + oldImageData.putAll(keysData); + oldImageData.putAll(oldImage); + } + + StreamRecord streamRecord = StreamRecord.builder() .sizeBytes(RANDOM.nextLong()) .newImage(data) + .oldImage(oldImageData.isEmpty() ? null : oldImageData) .keys(keysData) .sequenceNumber(UUID.randomUUID().toString()) .approximateCreationDateTime(creationTime) .build(); software.amazon.awssdk.services.dynamodb.model.Record record = software.amazon.awssdk.services.dynamodb.model.Record.builder() .dynamodb(streamRecord) - .eventName(OperationType.INSERT) + .eventName(operationType) .build(); return record; } diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java index 13f18977d6..6d503f7a19 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java @@ -16,6 +16,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; +import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamConfig; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.GlobalState; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; @@ -67,6 +68,9 @@ class ShardConsumerFactoryTest { @Mock private GlobalState tableInfoGlobalState; + @Mock + private StreamConfig streamConfig; + private final String tableName = UUID.randomUUID().toString(); private final String tableArn = "arn:aws:dynamodb:us-west-2:123456789012:table/" + tableName; @@ -111,7 +115,7 @@ public void test_create_shardConsumer_correctly() { state.setStartTime(Instant.now().toEpochMilli()); streamPartition = new StreamPartition(streamArn, shardId, Optional.of(state)); - ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer); + ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer, streamConfig); Runnable consumer = consumerFactory.createConsumer(streamPartition, null, null); assertThat(consumer, notNullValue()); verify(dynamoDbStreamsClient).getShardIterator(any(GetShardIteratorRequest.class)); @@ -128,7 +132,7 @@ public void test_create_shardConsumer_for_closedShards() { state.setEndingSequenceNumber(UUID.randomUUID().toString()); streamPartition = new StreamPartition(streamArn, shardId, Optional.of(state)); - ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer); + ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer, streamConfig); Runnable consumer = consumerFactory.createConsumer(streamPartition, null, null); assertThat(consumer, notNullValue()); // Should get iterators twice @@ -149,7 +153,7 @@ void stream5xxErrors_is_incremented_when_get_shard_iterator_throws_internal_exce final Counter stream5xxErrors = mock(Counter.class); when(dynamoDBSourceAggregateMetrics.getStream5xxErrors()).thenReturn(stream5xxErrors); - ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer); + ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer, streamConfig); Runnable consumer = consumerFactory.createConsumer(streamPartition, null, null); assertThat(consumer, nullValue()); verify(stream5xxErrors).increment(); @@ -167,7 +171,7 @@ void stream4xxErrors_is_incremented_when_get_shard_iterator_throws_dynamodb_exce final Counter stream4xxErrors = mock(Counter.class); when(dynamoDBSourceAggregateMetrics.getStream4xxErrors()).thenReturn(stream4xxErrors); - ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer); + ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer, streamConfig); Runnable consumer = consumerFactory.createConsumer(streamPartition, null, null); assertThat(consumer, nullValue()); verify(stream4xxErrors).increment(); diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java index f2192fef8f..175664d6ea 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java @@ -20,6 +20,7 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamConfig; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.GlobalState; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition; import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; @@ -103,6 +104,9 @@ class ShardConsumerTest { @Mock private DistributionSummary testSummary; + @Mock + private StreamConfig streamConfig; + private StreamCheckpointer checkpointer; @@ -181,7 +185,7 @@ void test_run_shardConsumer_correctly() throws Exception { final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class) ) { bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); - shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer) + shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer, streamConfig) .shardIterator(shardIterator) .checkpointer(checkpointer) .tableInfo(tableInfo) @@ -215,7 +219,7 @@ void test_run_shardConsumer_with_acknowledgments_correctly() throws Exception { final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class) ) { bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); - shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer) + shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer, streamConfig) .shardIterator(shardIterator) .checkpointer(checkpointer) .tableInfo(tableInfo) @@ -251,7 +255,7 @@ void test_run_shardConsumer_catches_5xx_exception_and_increments_metric() { try ( final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); - shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer) + shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer, streamConfig) .shardIterator(shardIterator) .checkpointer(checkpointer) .tableInfo(tableInfo) @@ -275,7 +279,7 @@ void test_run_shardConsumer_catches_4xx_exception_and_increments_metric() { try ( final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); - shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer) + shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer, streamConfig) .shardIterator(shardIterator) .checkpointer(checkpointer) .tableInfo(tableInfo) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/dlq/FailedBulkOperationConverter.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/dlq/FailedBulkOperationConverter.java index bb53963e7e..31b24fec8f 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/dlq/FailedBulkOperationConverter.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/dlq/FailedBulkOperationConverter.java @@ -61,6 +61,11 @@ public DlqObject convertToDlqObject(final FailedBulkOperation failedBulkOperatio private Object convertDocumentToGenericMap(final BulkOperationWrapper bulkOperation) { final SerializedJson document = (SerializedJson) bulkOperation.getDocument(); + + if (document == null) { + return ImmutableMap.of(); + } + final byte[] documentBytes = document.getSerializedJson(); final String jsonString = new String(documentBytes, StandardCharsets.UTF_8);