Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.6] Add 4xx aggregate metric and shard progress metric for dynamodb source #3921

Merged
merged 1 commit into from
Jan 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ public String submitExportJob(String tableArn, String bucket, String prefix, Str
return null;
} catch (SdkException e) {
LOG.error("Failed to submit an export job with error " + e.getMessage());
dynamoAggregateMetrics.getExport4xxErrors().increment();
return null;
}

Expand All @@ -82,6 +83,7 @@ public String getExportManifest(String exportArn) {
LOG.error("Unable to get manifest file for export {}: {}", exportArn, e.getMessage());
} catch (SdkException e) {
LOG.error("Unable to get manifest file for export {}: {}", exportArn, e.getMessage());
dynamoAggregateMetrics.getExport4xxErrors().increment();
}
return manifestKey;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse;
import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
Expand Down Expand Up @@ -176,6 +177,10 @@ private List<Shard> listShards(String streamArn, String lastEvaluatedShardId) {
LOG.error("Received an internal server exception from DynamoDB while listing shards: {}", e.getMessage());
dynamoDBSourceAggregateMetrics.getStream5xxErrors().increment();
return shards;
} catch (final SdkException e) {
LOG.error("Received an exception from DynamoDB while listing shards: {}", e.getMessage());
dynamoDBSourceAggregateMetrics.getStream4xxErrors().increment();
return shards;
}

long endTime = System.currentTimeMillis();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package org.opensearch.dataprepper.plugins.source.dynamodb.stream;

import io.micrometer.core.instrument.Counter;
import org.opensearch.dataprepper.buffer.common.BufferAccumulator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet;
Expand Down Expand Up @@ -81,6 +82,7 @@ public class ShardConsumer implements Runnable {

static final Duration BUFFER_TIMEOUT = Duration.ofSeconds(60);
static final int DEFAULT_BUFFER_BATCH_SIZE = 1_000;
static final String SHARD_PROGRESS = "shardProgress";


private final DynamoDbStreamsClient dynamoDbStreamsClient;
Expand All @@ -105,9 +107,12 @@ public class ShardConsumer implements Runnable {

private final DynamoDBSourceAggregateMetrics dynamoDBSourceAggregateMetrics;

private final Counter shardProgress;

private long recordsWrittenToBuffer;

private ShardConsumer(Builder builder) {
this.shardProgress = builder.pluginMetrics.counter(SHARD_PROGRESS);
this.dynamoDbStreamsClient = builder.dynamoDbStreamsClient;
this.checkpointer = builder.checkpointer;
this.shardIterator = builder.shardIterator;
Expand Down Expand Up @@ -226,6 +231,7 @@ public void run() {
LOG.debug("Shard Consumer start to run...");
// Check should skip processing or not.
if (shouldSkip()) {
shardProgress.increment();
if (acknowledgementSet != null) {
checkpointer.updateShardForAcknowledgmentWait(shardAcknowledgmentTimeout);
acknowledgementSet.complete();
Expand Down Expand Up @@ -272,12 +278,14 @@ public void run() {
.filter(record -> record.dynamodb().approximateCreationDateTime().isAfter(startTime))
.collect(Collectors.toList());
recordConverter.writeToBuffer(acknowledgementSet, records);
shardProgress.increment();
recordsWrittenToBuffer += records.size();
long delay = System.currentTimeMillis() - lastEventTime.toEpochMilli();
interval = delay > GET_RECORD_DELAY_THRESHOLD_MILLS ? MINIMUM_GET_RECORD_INTERVAL_MILLS : GET_RECORD_INTERVAL_MILLS;

} else {
interval = GET_RECORD_INTERVAL_MILLS;
shardProgress.increment();
}

try {
Expand Down Expand Up @@ -324,6 +332,7 @@ private GetRecordsResponse callGetRecords(String shardIterator) {
dynamoDBSourceAggregateMetrics.getStream5xxErrors().increment();
throw new RuntimeException(ex.getMessage());
} catch (final Exception e) {
dynamoDBSourceAggregateMetrics.getStream4xxErrors().increment();
throw new RuntimeException(e.getMessage());
}

Expand All @@ -335,6 +344,7 @@ private void waitForExport() {
while (!checkpointer.isExportDone()) {
LOG.debug("Export is in progress, wait...");
try {
shardProgress.increment();
Thread.sleep(DEFAULT_WAIT_FOR_EXPORT_INTERVAL_MILLS);
// The wait for export may take a long time
// Need to extend the timeout of the ownership in the coordination store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ public String getShardIterator(String streamArn, String shardId, String sequence
LOG.error("Received an internal server error from DynamoDB while getting a shard iterator: {}", e.getMessage());
return null;
} catch (SdkException e) {
dynamoDBSourceAggregateMetrics.getStream4xxErrors().increment();
LOG.error("Exception when trying to get the shard iterator due to {}", e.getMessage());
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,46 @@ public class DynamoDBSourceAggregateMetrics {
private static final String DYNAMO_DB = "dynamodb";

private static final String DDB_STREAM_5XX_EXCEPTIONS = "stream5xxErrors";
private static final String DDB_STREAM_4XX_EXCEPTIONS = "stream4xxErrors";
private static final String DDB_STREAM_API_INVOCATIONS = "streamApiInvocations";
private static final String DDB_EXPORT_5XX_ERRORS = "export5xxErrors";
private static final String DDB_EXPORT_4XX_ERRORS = "export4xxErrors";
private static final String DDB_EXPORT_API_INVOCATIONS = "exportApiInvocations";



private final PluginMetrics pluginMetrics;

private final Counter stream5xxErrors;
private final Counter stream4xxErrors;
private final Counter streamApiInvocations;
private final Counter export5xxErrors;
private final Counter export4xxErrors;
private final Counter exportApiInvocations;

public DynamoDBSourceAggregateMetrics() {
this.pluginMetrics = PluginMetrics.fromPrefix(DYNAMO_DB);
this.stream5xxErrors = pluginMetrics.counter(DDB_STREAM_5XX_EXCEPTIONS);
this.stream4xxErrors = pluginMetrics.counter(DDB_STREAM_4XX_EXCEPTIONS);
this.streamApiInvocations = pluginMetrics.counter(DDB_STREAM_API_INVOCATIONS);
this.export5xxErrors = pluginMetrics.counter(DDB_EXPORT_5XX_ERRORS);
this.export4xxErrors = pluginMetrics.counter(DDB_EXPORT_4XX_ERRORS);
this.exportApiInvocations = pluginMetrics.counter(DDB_EXPORT_API_INVOCATIONS);
}

public Counter getStream5xxErrors() {
return stream5xxErrors;
}

public Counter getStream4xxErrors() { return stream4xxErrors; }

public Counter getStreamApiInvocations() { return streamApiInvocations; }

public Counter getExport5xxErrors() {
return export5xxErrors;
}

public Counter getExport4xxErrors() { return export4xxErrors; }

public Counter getExportApiInvocations() { return exportApiInvocations; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
import software.amazon.awssdk.services.dynamodb.model.SequenceNumberRange;
import software.amazon.awssdk.services.dynamodb.model.Shard;
Expand Down Expand Up @@ -45,6 +46,9 @@ class ShardManagerTest {
@Mock
private Counter stream5xxErrors;

@Mock
private Counter stream4xxErrors;

@Mock
private Counter streamApiInvocations;

Expand Down Expand Up @@ -132,4 +136,17 @@ void stream5xxError_is_incremented_when_describe_stream_throws_internal_error()
verify(streamApiInvocations).increment();
}

@Test
void stream4xxErrors_is_incremented_when_describe_stream_throws_DynamoDBException() {
when(dynamoDbStreamsClient.describeStream(any(DescribeStreamRequest.class))).thenThrow(DynamoDbException.class);
when(dynamoDBSourceAggregateMetrics.getStream4xxErrors()).thenReturn(stream4xxErrors);

final List<Shard> shards = shardManager.runDiscovery(streamArn);
assertThat(shards, notNullValue());
assertThat(shards.isEmpty(), equalTo(true));

verify(stream4xxErrors).increment();
verify(streamApiInvocations).increment();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.dataprepper.plugins.source.dynamodb.coordination.state.StreamProgressState;
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
Expand Down Expand Up @@ -155,4 +156,22 @@ void stream5xxErrors_is_incremented_when_get_shard_iterator_throws_internal_exce
verify(streamApiInvocations).increment();
}

@Test
void stream4xxErrors_is_incremented_when_get_shard_iterator_throws_dynamodb_exception() {
StreamProgressState state = new StreamProgressState();
state.setWaitForExport(false);
state.setStartTime(Instant.now().toEpochMilli());
streamPartition = new StreamPartition(streamArn, shardId, Optional.of(state));

when(dynamoDbStreamsClient.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(DynamoDbException.class);
final Counter stream4xxErrors = mock(Counter.class);
when(dynamoDBSourceAggregateMetrics.getStream4xxErrors()).thenReturn(stream4xxErrors);

ShardConsumerFactory consumerFactory = new ShardConsumerFactory(coordinator, dynamoDbStreamsClient, pluginMetrics, dynamoDBSourceAggregateMetrics, buffer);
Runnable consumer = consumerFactory.createConsumer(streamPartition, null, null);
assertThat(consumer, nullValue());
verify(stream4xxErrors).increment();
verify(streamApiInvocations).increment();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.opensearch.dataprepper.plugins.source.dynamodb.model.TableMetadata;
import org.opensearch.dataprepper.plugins.source.dynamodb.utils.DynamoDBSourceAggregateMetrics;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
import software.amazon.awssdk.services.dynamodb.model.InternalServerErrorException;
Expand Down Expand Up @@ -57,6 +58,7 @@
import static org.mockito.Mockito.when;
import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumer.BUFFER_TIMEOUT;
import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumer.DEFAULT_BUFFER_BATCH_SIZE;
import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.ShardConsumer.SHARD_PROGRESS;
import static org.opensearch.dataprepper.plugins.source.dynamodb.stream.StreamCheckpointer.CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE;

@ExtendWith(MockitoExtension.class)
Expand All @@ -77,9 +79,15 @@ class ShardConsumerTest {
@Mock
private Counter stream5xxErrors;

@Mock
private Counter stream4xxErrors;

@Mock
private Counter streamApiInvocations;

@Mock
private Counter shardProgress;

@Mock
private Buffer<org.opensearch.dataprepper.model.record.Record<Event>> buffer;

Expand Down Expand Up @@ -157,7 +165,9 @@ void setup() throws Exception {
.build();
lenient().when(dynamoDbStreamsClient.getRecords(any(GetRecordsRequest.class))).thenReturn(response);

given(pluginMetrics.counter(anyString())).willReturn(testCounter);
given(pluginMetrics.counter(SHARD_PROGRESS)).willReturn(shardProgress);
given(pluginMetrics.counter("changeEventsProcessed")).willReturn(testCounter);
given(pluginMetrics.counter("changeEventsProcessingErrors")).willReturn(testCounter);
given(pluginMetrics.summary(anyString())).willReturn(testSummary);

when(aggregateMetrics.getStreamApiInvocations()).thenReturn(streamApiInvocations);
Expand Down Expand Up @@ -192,6 +202,7 @@ void test_run_shardConsumer_correctly() throws Exception {
verify(coordinator).saveProgressStateForPartition(any(StreamPartition.class), eq(CHECKPOINT_OWNERSHIP_TIMEOUT_INCREASE));

verify(streamApiInvocations).increment();
verify(shardProgress).increment();
}

@Test
Expand Down Expand Up @@ -230,6 +241,7 @@ void test_run_shardConsumer_with_acknowledgments_correctly() throws Exception {
verify(acknowledgementSet).complete();

verify(streamApiInvocations).increment();
verify(shardProgress).increment();
}

@Test
Expand All @@ -256,6 +268,30 @@ void test_run_shardConsumer_catches_5xx_exception_and_increments_metric() {
verify(streamApiInvocations).increment();
}

@Test
void test_run_shardConsumer_catches_4xx_exception_and_increments_metric() {
ShardConsumer shardConsumer;
when(aggregateMetrics.getStream4xxErrors()).thenReturn(stream4xxErrors);
try (
final MockedStatic<BufferAccumulator> bufferAccumulatorMockedStatic = mockStatic(BufferAccumulator.class)) {
bufferAccumulatorMockedStatic.when(() -> BufferAccumulator.create(buffer, DEFAULT_BUFFER_BATCH_SIZE, BUFFER_TIMEOUT)).thenReturn(bufferAccumulator);
shardConsumer = ShardConsumer.builder(dynamoDbStreamsClient, pluginMetrics, aggregateMetrics, buffer)
.shardIterator(shardIterator)
.checkpointer(checkpointer)
.tableInfo(tableInfo)
.startTime(null)
.waitForExport(false)
.build();
}

when(dynamoDbStreamsClient.getRecords(any(GetRecordsRequest.class))).thenThrow(DynamoDbException.class);

assertThrows(RuntimeException.class, shardConsumer::run);

verify(stream4xxErrors).increment();
verify(streamApiInvocations).increment();
}

/**
* Helper function to generate some data.
*/
Expand Down
Loading