Skip to content

Commit

Permalink
Add support to use old ddb stream image for REMOVE events (#4275)
Browse files Browse the repository at this point in the history
Add suport to use old ddb stream image for REMOVE events

Signed-off-by: Taylor Gray <[email protected]>
  • Loading branch information
graytaylor0 authored Mar 19, 2024
1 parent 8b6059c commit d026f56
Show file tree
Hide file tree
Showing 11 changed files with 292 additions and 36 deletions.
4 changes: 0 additions & 4 deletions data-prepper-plugins/dynamodb-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,4 @@ dependencies {

testImplementation testLibs.mockito.inline
testImplementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
}

test {
useJUnitPlatform()
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public void start(Buffer<Record<Event>> buffer) {
DataFileLoaderFactory loaderFactory = new DataFileLoaderFactory(coordinator, s3Client, pluginMetrics, buffer);
Runnable fileLoaderScheduler = new DataFileScheduler(coordinator, loaderFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig);

ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer);
ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer, dynamoDBSourceConfig.getTableConfigs().get(0).getStreamConfig());
Runnable streamScheduler = new StreamScheduler(coordinator, consumerFactory, pluginMetrics, acknowledgementSetManager, dynamoDBSourceConfig, new BackoffCalculator(dynamoDBSourceConfig.getTableConfigs().get(0).getExportConfig() != null));
// leader scheduler will handle the initialization
Runnable leaderScheduler = new LeaderScheduler(coordinator, dynamoDbClient, shardManager, tableConfigs);
Expand All @@ -109,8 +109,10 @@ public void start(Buffer<Record<Event>> buffer) {
executor.submit(leaderScheduler);
executor.submit(exportScheduler);
executor.submit(fileLoaderScheduler);
executor.submit(streamScheduler);

if (tableConfigs.get(0).getStreamConfig() != null) {
executor.submit(streamScheduler);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,22 @@
package org.opensearch.dataprepper.plugins.source.dynamodb.configuration;

import com.fasterxml.jackson.annotation.JsonProperty;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;

public class StreamConfig {

@JsonProperty(value = "start_position")
private StreamStartPosition startPosition = StreamStartPosition.LATEST;

@JsonProperty("view_on_remove")
private StreamViewType viewForRemoves = StreamViewType.NEW_IMAGE;

public StreamStartPosition getStartPosition() {
return startPosition;
}

public StreamViewType getStreamViewForRemoves() {
return viewForRemoves;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,15 @@
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamConfig;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.enhanced.dynamodb.document.EnhancedDocument;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.OperationType;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;

import java.time.Instant;
import java.util.HashMap;
Expand All @@ -40,6 +43,8 @@ public class StreamRecordConverter extends RecordConverter {
private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<Map<String, Object>>() {
};

private final StreamConfig streamConfig;

private final PluginMetrics pluginMetrics;

private final Counter changeEventSuccessCounter;
Expand All @@ -50,13 +55,18 @@ public class StreamRecordConverter extends RecordConverter {
private Instant currentSecond;
private int recordsSeenThisSecond = 0;

public StreamRecordConverter(final BufferAccumulator<org.opensearch.dataprepper.model.record.Record<Event>> bufferAccumulator, TableInfo tableInfo, PluginMetrics pluginMetrics) {
public StreamRecordConverter(final BufferAccumulator<org.opensearch.dataprepper.model.record.Record<Event>> bufferAccumulator,
final TableInfo tableInfo,
final PluginMetrics pluginMetrics,
final StreamConfig streamConfig) {
super(bufferAccumulator, tableInfo);
this.pluginMetrics = pluginMetrics;
this.changeEventSuccessCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSED_COUNT);
this.changeEventErrorCounter = pluginMetrics.counter(CHANGE_EVENTS_PROCESSING_ERROR_COUNT);
this.bytesReceivedSummary = pluginMetrics.summary(BYTES_RECEIVED);
this.bytesProcessedSummary = pluginMetrics.summary(BYTES_PROCESSED);
this.streamConfig = streamConfig;

}

@Override
Expand All @@ -73,8 +83,10 @@ public void writeToBuffer(final AcknowledgementSet acknowledgementSet, List<Reco
Map<String, Object> data;
Map<String, Object> keys;
try {
final Map<String, AttributeValue> streamRecord = getStreamRecordFromImage(record);

// NewImage may be empty
data = convertData(record.dynamodb().newImage());
data = convertData(streamRecord);
// Always get keys from dynamodb().keys()
keys = convertKeys(record.dynamodb().keys());
} catch (final Exception e) {
Expand Down Expand Up @@ -150,4 +162,21 @@ private long calculateTieBreakingVersionFromTimestamp(final Instant eventTimeInS

return eventTimeInSeconds.getEpochSecond() * 1_000_000 + recordsSeenThisSecond;
}

private Map<String, AttributeValue> getStreamRecordFromImage(final Record record) {
if (!OperationType.REMOVE.equals(record.eventName())) {
return record.dynamodb().newImage();
}

if (StreamViewType.OLD_IMAGE.equals(streamConfig.getStreamViewForRemoves())) {
if (!record.dynamodb().hasOldImage()) {
LOG.warn("view_on_remove with OLD_IMAGE is enabled, but no old image can be found on the stream record, using NEW_IMAGE");
return record.dynamodb().newImage();
} else {
return record.dynamodb().oldImage();
}
}

return record.dynamodb().newImage();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamConfig;
import org.opensearch.dataprepper.plugins.source.dynamodb.converter.StreamRecordConverter;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableInfo;
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
Expand Down Expand Up @@ -121,7 +122,7 @@ private ShardConsumer(Builder builder) {
this.startTime = builder.startTime == null ? Instant.MIN : builder.startTime.minus(STREAM_EVENT_OVERLAP_TIME);
this.waitForExport = builder.waitForExport;
final BufferAccumulator<Record<Event>> bufferAccumulator = BufferAccumulator.create(builder.buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT);
recordConverter = new StreamRecordConverter(bufferAccumulator, builder.tableInfo, builder.pluginMetrics);
recordConverter = new StreamRecordConverter(bufferAccumulator, builder.tableInfo, builder.pluginMetrics, builder.streamConfig);
this.acknowledgementSet = builder.acknowledgementSet;
this.shardAcknowledgmentTimeout = builder.dataFileAcknowledgmentTimeout;
this.shardId = builder.shardId;
Expand All @@ -132,8 +133,9 @@ private ShardConsumer(Builder builder) {
public static Builder builder(final DynamoDbStreamsClient dynamoDbStreamsClient,
final PluginMetrics pluginMetrics,
final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics,
final Buffer<Record<Event>> buffer) {
return new Builder(dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer);
final Buffer<Record<Event>> buffer,
final StreamConfig streamConfig) {
return new Builder(dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer, streamConfig);
}


Expand Down Expand Up @@ -164,14 +166,18 @@ static class Builder {
private AcknowledgementSet acknowledgementSet;
private Duration dataFileAcknowledgmentTimeout;

private StreamConfig streamConfig;

public Builder(final DynamoDbStreamsClient dynamoDbStreamsClient,
final PluginMetrics pluginMetrics,
final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics,
final Buffer<Record<Event>> buffer) {
final Buffer<Record<Event>> buffer,
final StreamConfig streamConfig) {
this.dynamoDbStreamsClient = dynamoDbStreamsClient;
this.pluginMetrics = pluginMetrics;
this.dynamoDBSourceAggregateMetrics = dynamoDBSourceAggregateMetrics;
this.buffer = buffer;
this.streamConfig = streamConfig;
}

public Builder tableInfo(TableInfo tableInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamConfig;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.partition.StreamPartition;
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState;
Expand Down Expand Up @@ -45,18 +46,21 @@ public class ShardConsumerFactory {
private final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics;
private final Buffer<Record<Event>> buffer;

private final StreamConfig streamConfig;


public ShardConsumerFactory(final EnhancedSourceCoordinator enhancedSourceCoordinator,
final DynamoDbStreamsClient streamsClient,
final PluginMetrics pluginMetrics,
final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics,
final Buffer<Record<Event>> buffer) {
final Buffer<Record<Event>> buffer,
final StreamConfig streamConfig) {
this.streamsClient = streamsClient;
this.enhancedSourceCoordinator = enhancedSourceCoordinator;
this.pluginMetrics = pluginMetrics;
this.dynamoDBSourceAggregateMetrics = dynamoDBSourceAggregateMetrics;
this.buffer = buffer;

this.streamConfig = streamConfig;
}

public Runnable createConsumer(final StreamPartition streamPartition,
Expand Down Expand Up @@ -97,7 +101,7 @@ public Runnable createConsumer(final StreamPartition streamPartition,

LOG.debug("Create shard consumer for {} with shardIter {}", streamPartition.getShardId(), shardIterator);
LOG.debug("Create shard consumer for {} with lastShardIter {}", streamPartition.getShardId(), lastShardIterator);
ShardConsumer shardConsumer = ShardConsumer.builder(streamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer)
ShardConsumer shardConsumer = ShardConsumer.builder(streamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer, streamConfig)
.tableInfo(tableInfo)
.checkpointer(checkpointer)
.shardIterator(shardIterator)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,36 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager;
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.StreamConfig;
import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.TableConfig;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
import software.amazon.awssdk.services.s3.S3Client;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class DynamoDBServiceTest {
Expand All @@ -51,6 +63,9 @@ class DynamoDBServiceTest {
@Mock
private TableConfig tableConfig;

@Mock
private StreamConfig streamConfig;

@Mock
private PluginMetrics pluginMetrics;

Expand All @@ -60,6 +75,9 @@ class DynamoDBServiceTest {
@Mock
private AcknowledgementSetManager acknowledgementSetManager;

@Mock
private ExecutorService executorService;

private DynamoDBService dynamoDBService;

@BeforeEach
Expand All @@ -73,23 +91,54 @@ void setup() {
}

private DynamoDBService createObjectUnderTest() {
DynamoDBService objectUnderTest = new DynamoDBService(coordinator, clientFactory, sourceConfig, pluginMetrics, acknowledgementSetManager);
return objectUnderTest;

try (final MockedStatic<Executors> executorsMockedStatic = mockStatic(Executors.class)) {
executorsMockedStatic.when(() -> Executors.newFixedThreadPool(eq(4))).thenReturn(executorService);

return new DynamoDBService(coordinator, clientFactory, sourceConfig, pluginMetrics, acknowledgementSetManager);
}
}

@Test
void test_normal_start() {
void test_normal_start_with_stream_config() {
when(tableConfig.getStreamConfig()).thenReturn(streamConfig);
dynamoDBService = createObjectUnderTest();
assertThat(dynamoDBService, notNullValue());

final ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);

dynamoDBService.start(buffer);

verify(executorService, times(4)).submit(runnableArgumentCaptor.capture());

assertThat(runnableArgumentCaptor.getAllValues(), notNullValue());
assertThat(runnableArgumentCaptor.getAllValues().size(), equalTo(4));
}

@Test
void test_normal_start_without_stream_config() {
when(tableConfig.getStreamConfig()).thenReturn(null);

dynamoDBService = createObjectUnderTest();
assertThat(dynamoDBService, notNullValue());

final ArgumentCaptor<Runnable> runnableArgumentCaptor = ArgumentCaptor.forClass(Runnable.class);

dynamoDBService.start(buffer);

verify(executorService, times(3)).submit(runnableArgumentCaptor.capture());

assertThat(runnableArgumentCaptor.getAllValues(), notNullValue());
assertThat(runnableArgumentCaptor.getAllValues().size(), equalTo(3));
}


@Test
void test_normal_shutdown() {
dynamoDBService = createObjectUnderTest();
assertThat(dynamoDBService, notNullValue());

when(executorService.shutdownNow()).thenReturn(Collections.emptyList());
dynamoDBService.shutdown();
}

Expand Down
Loading

0 comments on commit d026f56

Please sign in to comment.