From 70a3c11b7acac69e5e2196922aff3b5d6b2165db Mon Sep 17 00:00:00 2001 From: Taylor Gray Date: Fri, 5 Jan 2024 11:57:06 -0600 Subject: [PATCH] Add 4xx aggregate metric and shard progress metric for dynamodb source (#3913) Signed-off-by: Taylor Gray (cherry picked from commit e6df3eb2cd46ebd13dd1c7b808d288c2f3d6ee51) --- .../dynamodb/export/ExportTaskManager.java | 2 + .../source/dynamodb/leader/ShardManager.java | 5 +++ .../source/dynamodb/stream/ShardConsumer.java | 10 +++++ .../dynamodb/stream/ShardConsumerFactory.java | 1 + .../utils/DynamoDBSourceAggregateMetrics.java | 11 ++++++ .../dynamodb/leader/ShardManagerTest.java | 17 +++++++++ .../stream/ShardConsumerFactoryTest.java | 19 ++++++++++ .../dynamodb/stream/ShardConsumerTest.java | 38 ++++++++++++++++++- 8 files changed, 102 insertions(+), 1 deletion(-) diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportTaskManager.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportTaskManager.java index 2ecd2119c9..03289e912c 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportTaskManager.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/export/ExportTaskManager.java @@ -63,6 +63,7 @@ public String submitExportJob(String tableArn, String bucket, String prefix, Str return null; } catch (SdkException e) { LOG.error("Failed to submit an export job with error " + e.getMessage()); + dynamoAggregateMetrics.getExport4xxErrors().increment(); return null; } @@ -82,6 +83,7 @@ public String getExportManifest(String exportArn) { LOG.error("Unable to get manifest file for export {}: {}", exportArn, e.getMessage()); } catch (SdkException e) { LOG.error("Unable to get manifest file for export {}: {}", exportArn, e.getMessage()); + dynamoAggregateMetrics.getExport4xxErrors().increment(); } return manifestKey; } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/ShardManager.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/ShardManager.java index 0b29a7b1a0..7d4113f622 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/ShardManager.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/ShardManager.java @@ -3,6 +3,7 @@ import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import software.amazon.awssdk.core.exception.SdkException; import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse; import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException; @@ -176,6 +177,10 @@ private List listShards(String streamArn, String lastEvaluatedShardId) { LOG.error("Received an internal server exception from DynamoDB while listing shards: {}", e.getMessage()); dynamoDBSourceAggregateMetrics.getStream5xxErrors().increment(); return shards; + } catch (final SdkException e) { + LOG.error("Received an exception from DynamoDB while listing shards: {}", e.getMessage()); + dynamoDBSourceAggregateMetrics.getStream4xxErrors().increment(); + return shards; } long endTime = System.currentTimeMillis(); diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java index 0dda2a4f02..9497a87479 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumer.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.plugins.source.dynamodb.stream; +import io.micrometer.core.instrument.Counter; import org.opensearch.dataprepper.buffer.common.BufferAccumulator; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; @@ -81,6 +82,7 @@ public class ShardConsumer implements Runnable { static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60); static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000; + static final String SHARD_PROGRESS = "shardProgress"; private final DynamoDbStreamsClient dynamoDbStreamsClient; @@ -105,9 +107,12 @@ public class ShardConsumer implements Runnable { private final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics; + private final Counter shardProgress; + private long recordsWrittenToBuffer; private ShardConsumer(Builder builder) { + this.shardProgress = builder.pluginMetrics.counter(SHARD_PROGRESS); this.dynamoDbStreamsClient = builder.dynamoDbStreamsClient; this.checkpointer = builder.checkpointer; this.shardIterator = builder.shardIterator; @@ -226,6 +231,7 @@ public void run() { LOG.debug("Shard Consumer start to run..."); // Check should skip processing or not. if (shouldSkip()) { + shardProgress.increment(); if (acknowledgementSet != null) { checkpointer.updateShardForAcknowledgmentWait(shardAcknowledgmentTimeout); acknowledgementSet.complete(); @@ -272,12 +278,14 @@ public void run() { .filter(record -> record.dynamodb().approximateCreationDateTime().isAfter(startTime)) .collect(Collectors.toList()); recordConverter.writeToBuffer(acknowledgementSet, records); + shardProgress.increment(); recordsWrittenToBuffer += records.size(); long delay = System.currentTimeMillis() - lastEventTime.toEpochMilli(); interval = delay > GET_RECORD_DELAY_THRESHOLD_MILLS ? MINIMUM_GET_RECORD_INTERVAL_MILLS : GET_RECORD_INTERVAL_MILLS; } else { interval = GET_RECORD_INTERVAL_MILLS; + shardProgress.increment(); } try { @@ -324,6 +332,7 @@ private GetRecordsResponse callGetRecords(String shardIterator) { dynamoDBSourceAggregateMetrics.getStream5xxErrors().increment(); throw new RuntimeException(ex.getMessage()); } catch (final Exception e) { + dynamoDBSourceAggregateMetrics.getStream4xxErrors().increment(); throw new RuntimeException(e.getMessage()); } @@ -335,6 +344,7 @@ private void waitForExport() { while (!checkpointer.isExportDone()) { LOG.debug("Export is in progress, wait..."); try { + shardProgress.increment(); Thread.sleep(DEFAULT_WAIT_FOR_EXPORT_INTERVAL_MILLS); // The wait for export may take a long time // Need to extend the timeout of the ownership in the coordination store. diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java index c26e5143a0..4ca28ef8cc 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactory.java @@ -162,6 +162,7 @@ public String getShardIterator(String streamArn, String shardId, String sequence LOG.error("Received an internal server error from DynamoDB while getting a shard iterator: {}", e.getMessage()); return null; } catch (SdkException e) { + dynamoDBSourceAggregateMetrics.getStream4xxErrors().increment(); LOG.error("Exception when trying to get the shard iterator due to {}", e.getMessage()); return null; } diff --git a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/DynamoDBSourceAggregateMetrics.java b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/DynamoDBSourceAggregateMetrics.java index b63b1950bf..146deb1882 100644 --- a/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/DynamoDBSourceAggregateMetrics.java +++ b/data-prepper-plugins/dynamodb-source/src/main/java/org/opensearch/dataprepper/plugins/source/dynamodb/utils/DynamoDBSourceAggregateMetrics.java @@ -13,23 +13,30 @@ public class DynamoDBSourceAggregateMetrics { private static final String DYNAMO_DB = "dynamodb"; private static final String DDB_STREAM_5XX_EXCEPTIONS = "stream5xxErrors"; + private static final String DDB_STREAM_4XX_EXCEPTIONS = "stream4xxErrors"; private static final String DDB_STREAM_API_INVOCATIONS = "streamApiInvocations"; private static final String DDB_EXPORT_5XX_ERRORS = "export5xxErrors"; + private static final String DDB_EXPORT_4XX_ERRORS = "export4xxErrors"; private static final String DDB_EXPORT_API_INVOCATIONS = "exportApiInvocations"; + private final PluginMetrics pluginMetrics; private final Counter stream5xxErrors; + private final Counter stream4xxErrors; private final Counter streamApiInvocations; private final Counter export5xxErrors; + private final Counter export4xxErrors; private final Counter exportApiInvocations; public DynamoDBSourceAggregateMetrics() { this.pluginMetrics = PluginMetrics.fromPrefix(DYNAMO_DB); this.stream5xxErrors = pluginMetrics.counter(DDB_STREAM_5XX_EXCEPTIONS); + this.stream4xxErrors = pluginMetrics.counter(DDB_STREAM_4XX_EXCEPTIONS); this.streamApiInvocations = pluginMetrics.counter(DDB_STREAM_API_INVOCATIONS); this.export5xxErrors = pluginMetrics.counter(DDB_EXPORT_5XX_ERRORS); + this.export4xxErrors = pluginMetrics.counter(DDB_EXPORT_4XX_ERRORS); this.exportApiInvocations = pluginMetrics.counter(DDB_EXPORT_API_INVOCATIONS); } @@ -37,11 +44,15 @@ public Counter getStream5xxErrors() { return stream5xxErrors; } + public Counter getStream4xxErrors() { return stream4xxErrors; } + public Counter getStreamApiInvocations() { return streamApiInvocations; } public Counter getExport5xxErrors() { return export5xxErrors; } + public Counter getExport4xxErrors() { return export4xxErrors; } + public Counter getExportApiInvocations() { return exportApiInvocations; } } diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/ShardManagerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/ShardManagerTest.java index 24fcad623e..fd21bc7ec1 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/ShardManagerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/leader/ShardManagerTest.java @@ -14,6 +14,7 @@ import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics; import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest; import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse; +import software.amazon.awssdk.services.dynamodb.model.DynamoDbException; import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException; import software.amazon.awssdk.services.dynamodb.model.SequenceNumberRange; import software.amazon.awssdk.services.dynamodb.model.Shard; @@ -45,6 +46,9 @@ class ShardManagerTest { @Mock private Counter stream5xxErrors; + @Mock + private Counter stream4xxErrors; + @Mock private Counter streamApiInvocations; @@ -132,4 +136,17 @@ void stream5xxError_is_incremented_when_describe_stream_throws_internal_error() verify(streamApiInvocations).increment(); } + @Test + void stream4xxErrors_is_incremented_when_describe_stream_throws_DynamoDBException() { + when(dynamoDbStreamsClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(DynamoDbException.class); + when(dynamoDBSourceAggregateMetrics.getStream4xxErrors()).thenReturn(stream4xxErrors); + + final List shards = shardManager.runDiscovery(streamArn); + assertThat(shards, notNullValue()); + assertThat(shards.isEmpty(), equalTo(true)); + + verify(stream4xxErrors).increment(); + verify(streamApiInvocations).increment(); + } + } \ No newline at end of file diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java index 00331799f0..13f18977d6 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerFactoryTest.java @@ -21,6 +21,7 @@ import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState; import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableMetadata; import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics; +import software.amazon.awssdk.services.dynamodb.model.DynamoDbException; import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest; import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorResponse; import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException; @@ -155,4 +156,22 @@ void stream5xxErrors_is_incremented_when_get_shard_iterator_throws_internal_exce verify(streamApiInvocations).increment(); } + @Test + void stream4xxErrors_is_incremented_when_get_shard_iterator_throws_dynamodb_exception() { + StreamProgressState state = new StreamProgressState(); + state.setWaitForExport(false); + state.setStartTime(Instant.now().toEpochMilli()); + streamPartition = new StreamPartition(streamArn, shardId, Optional.of(state)); + + when(dynamoDbStreamsClient.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(DynamoDbException.class); + final Counter stream4xxErrors = mock(Counter.class); + when(dynamoDBSourceAggregateMetrics.getStream4xxErrors()).thenReturn(stream4xxErrors); + + ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer); + Runnable consumer = consumerFactory.createConsumer(streamPartition, null, null); + assertThat(consumer, nullValue()); + verify(stream4xxErrors).increment(); + verify(streamApiInvocations).increment(); + } + } \ No newline at end of file diff --git a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java index 34af1cf978..f2192fef8f 100644 --- a/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java +++ b/data-prepper-plugins/dynamodb-source/src/test/java/org/opensearch/dataprepper/plugins/source/dynamodb/stream/ShardConsumerTest.java @@ -27,6 +27,7 @@ import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableMetadata; import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics; import software.amazon.awssdk.services.dynamodb.model.AttributeValue; +import software.amazon.awssdk.services.dynamodb.model.DynamoDbException; import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest; import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse; import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException; @@ -57,6 +58,7 @@ import static org.mockito.Mockito.when; import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumer.BUFFER_TIMEOUT; import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumer.DEFAULT_BUFFER_BATCH_SIZE; +import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumer.SHARD_PROGRESS; import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.StreamCheckpointer.CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE; @ExtendWith(MockitoExtension.class) @@ -77,9 +79,15 @@ class ShardConsumerTest { @Mock private Counter stream5xxErrors; + @Mock + private Counter stream4xxErrors; + @Mock private Counter streamApiInvocations; + @Mock + private Counter shardProgress; + @Mock private Buffer> buffer; @@ -157,7 +165,9 @@ void setup() throws Exception { .build(); lenient().when(dynamoDbStreamsClient.getRecords(any(GetRecordsRequest.class))).thenReturn(response); - given(pluginMetrics.counter(anyString())).willReturn(testCounter); + given(pluginMetrics.counter(SHARD_PROGRESS)).willReturn(shardProgress); + given(pluginMetrics.counter("changeEventsProcessed")).willReturn(testCounter); + given(pluginMetrics.counter("changeEventsProcessingErrors")).willReturn(testCounter); given(pluginMetrics.summary(anyString())).willReturn(testSummary); when(aggregateMetrics.getStreamApiInvocations()).thenReturn(streamApiInvocations); @@ -192,6 +202,7 @@ void test_run_shardConsumer_correctly() throws Exception { verify(coordinator).saveProgressStateForPartition(any(StreamPartition.class), eq(CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE)); verify(streamApiInvocations).increment(); + verify(shardProgress).increment(); } @Test @@ -230,6 +241,7 @@ void test_run_shardConsumer_with_acknowledgments_correctly() throws Exception { verify(acknowledgementSet).complete(); verify(streamApiInvocations).increment(); + verify(shardProgress).increment(); } @Test @@ -256,6 +268,30 @@ void test_run_shardConsumer_catches_5xx_exception_and_increments_metric() { verify(streamApiInvocations).increment(); } + @Test + void test_run_shardConsumer_catches_4xx_exception_and_increments_metric() { + ShardConsumer shardConsumer; + when(aggregateMetrics.getStream4xxErrors()).thenReturn(stream4xxErrors); + try ( + final MockedStatic bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) { + bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator); + shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer) + .shardIterator(shardIterator) + .checkpointer(checkpointer) + .tableInfo(tableInfo) + .startTime(null) + .waitForExport(false) + .build(); + } + + when(dynamoDbStreamsClient.getRecords(any(GetRecordsRequest.class))).thenThrow(DynamoDbException.class); + + assertThrows(RuntimeException.class, shardConsumer::run); + + verify(stream4xxErrors).increment(); + verify(streamApiInvocations).increment(); + } + /** * Helper function to generate some data. */