From e3a5701974519c6cf0d9069f3d6a3d9d50fe5c32 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Mon, 15 Apr 2024 21:39:23 +0000 Subject: [PATCH 1/2] Fix acknowledgements in DynamoDB Signed-off-by: Krishna Kondaka --- .../plugins/source/dynamodb/DynamoDBSource.java | 7 +++++++ .../plugins/source/dynamodb/export/DataFileLoader.java | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSource.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSource.java index 08226eacfb..700be39da8 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSource.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSource.java @@ -46,6 +46,7 @@ public class DynamoDBSource implements Source>, UsesEnhancedSource private DynamoDBService dynamoDBService; + private final boolean acknowledgementsEnabled; @DataPrepperPluginConstructor public DynamoDBSource(final PluginMetrics pluginMetrics, @@ -58,10 +59,16 @@ public DynamoDBSource(final PluginMetrics pluginMetrics, this.sourceConfig = sourceConfig; this.pluginFactory = pluginFactory; this.acknowledgementSetManager = acknowledgementSetManager; + this.acknowledgementsEnabled = sourceConfig.isAcknowledgmentsEnabled(); clientFactory = new ClientFactory(awsCredentialsSupplier, sourceConfig.getAwsAuthenticationConfig(), sourceConfig.getTableConfigs().get(0).getExportConfig()); } + @Override + public boolean areAcknowledgementsEnabled() { + return acknowledgementsEnabled; + } + @Override public void start(Buffer> buffer) { Objects.requireNonNull(coordinator); diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoader.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoader.java index 650f5f33f4..3af1fc1fc7 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoader.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/DataFileLoader.java @@ -216,10 +216,10 @@ public void run() { recordConverter.writeToBuffer(acknowledgementSet, lines); checkpointer.checkpoint(lineCount); } + LOG.info("Completed loading {} lines from s3://{}/{} to buffer", lines.size(), bucketName, key); lines.clear(); - LOG.info("Completed loading s3://{}/{} to buffer", bucketName, key); if (acknowledgementSet != null) { checkpointer.updateDatafileForAcknowledgmentWait(dataFileAcknowledgmentTimeout); From 99ca0e8c03d6cdb36b93d48800b8b43109f2a2d9 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Mon, 15 Apr 2024 21:41:42 +0000 Subject: [PATCH 2/2] Added tests Signed-off-by: Krishna Kondaka --- .../source/dynamodb/DynamoDBSourceTest.java | 97 +++++++++++++++++++ 1 file changed, 97 insertions(+) create mode 100644 data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceTest.java diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceTest.java new file mode 100644 index 0000000000..a8a5a2d6fa --- /dev/null +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/DynamoDBSourceTest.java @@ -0,0 +1,97 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.source.dynamodb; + +import software.amazon.awssdk.regions.Region; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.AwsAuthenticationConfig; +import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.TableConfig; +import org.opensearch.dataprepper.plugins.source.dynamodb.configuration.ExportConfig; + +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import org.mockito.Mock; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.Map; +import java.util.UUID; + +class DynamoDBSourceTest { + @Mock + private PluginMetrics pluginMetrics; + + @Mock + private PluginFactory pluginFactory; + + @Mock + private AcknowledgementSetManager acknowledgementSetManager; + + @Mock + private AwsCredentialsSupplier awsCredentialsSupplier; + + @Mock + private DynamoDBSourceConfig dynamoDBSourceConfig; + + @Mock + private AwsAuthenticationConfig awsAuthenticationConfig; + + @Mock + private TableConfig tableConfig; + + @Mock + private ExportConfig exportConfig; + + private DynamoDBSource source; + + @BeforeEach + void setup() { + pluginMetrics = mock(PluginMetrics.class); + pluginFactory = mock(PluginFactory.class); + dynamoDBSourceConfig = mock(DynamoDBSourceConfig.class); + awsCredentialsSupplier = mock(AwsCredentialsSupplier.class); + acknowledgementSetManager = mock(AcknowledgementSetManager.class); + awsAuthenticationConfig = mock(AwsAuthenticationConfig.class); + when(awsAuthenticationConfig.getAwsRegion()).thenReturn(Region.of("us-west-2")); + when(awsAuthenticationConfig.getAwsStsRoleArn()).thenReturn(UUID.randomUUID().toString()); + when(awsAuthenticationConfig.getAwsStsExternalId()).thenReturn(UUID.randomUUID().toString()); + final Map stsHeaderOverrides = Map.of(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + when(awsAuthenticationConfig.getAwsStsHeaderOverrides()).thenReturn(stsHeaderOverrides); + when(dynamoDBSourceConfig.getAwsAuthenticationConfig()).thenReturn(awsAuthenticationConfig); + tableConfig = mock(TableConfig.class); + exportConfig = mock(ExportConfig.class); + when(tableConfig.getExportConfig()).thenReturn(exportConfig); + when(dynamoDBSourceConfig.getTableConfigs()).thenReturn(List.of(tableConfig)); + } + + public DynamoDBSource createObjectUnderTest() { + return new DynamoDBSource(pluginMetrics, dynamoDBSourceConfig, pluginFactory, awsCredentialsSupplier, acknowledgementSetManager); + } + + @Test + public void test_without_acknowledgements() { + when(dynamoDBSourceConfig.isAcknowledgmentsEnabled()).thenReturn(false); + source = createObjectUnderTest(); + assertThat(source.areAcknowledgementsEnabled(), equalTo(false)); + } + + @Test + public void test_with_acknowledgements() { + when(dynamoDBSourceConfig.isAcknowledgmentsEnabled()).thenReturn(true); + source = createObjectUnderTest(); + assertThat(source.areAcknowledgementsEnabled(), equalTo(true)); + + } + +} +