Skip to content

Commit

Permalink
Add a new metric for OpenSearch Sink plugin: bulkRequestSizeBytes (#572)
Browse files Browse the repository at this point in the history
* Add a new metric for OpenSearch Sink plugin: bulkRequestSizeBytes
Signed-off-by: Han Jiang <[email protected]>

* Using summary as the instrumenting mechanism for collecting bulkRequestSizeBytes metrics
Signed-off-by: Han Jiang <[email protected]>
  • Loading branch information
jianghancn authored Nov 16, 2021
1 parent b1c42a2 commit 54f8ce7
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 0 deletions.
3 changes: 3 additions & 0 deletions data-prepper-plugins/opensearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,9 @@ Besides common metrics in [AbstractSink](https://github.com/opensearch-project/d
- `documentsSuccessFirstAttempt`: measures number of documents successfully sent to ES by bulk requests on first attempt.
- `documentErrors`: measures number of documents failed to be sent by bulk requests.

### Distribution Summary
- `bulkRequestSizeBytes`: measures the distribution of bulk request's payload sizes in bytes.

## Developer Guide

This plugin is compatible with Java 8. See
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexManagerFactory;
import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexType;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Timer;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.bulk.BulkRequest;
Expand Down Expand Up @@ -50,6 +51,7 @@
public class OpenSearchSink extends AbstractSink<Record<Object>> {
public static final String BULKREQUEST_LATENCY = "bulkRequestLatency";
public static final String BULKREQUEST_ERRORS = "bulkRequestErrors";
public static final String BULKREQUEST_SIZE_BYTES = "bulkRequestSizeBytes";

private static final Logger LOG = LoggerFactory.getLogger(OpenSearchSink.class);
// Pulled from BulkRequest to make estimation of bytes consistent
Expand All @@ -68,10 +70,13 @@ public class OpenSearchSink extends AbstractSink<Record<Object>> {

private final Timer bulkRequestTimer;
private final Counter bulkRequestErrorsCounter;
private final DistributionSummary bulkRequestSizeBytesSummary;

public OpenSearchSink(final PluginSetting pluginSetting) {
super(pluginSetting);
bulkRequestTimer = pluginMetrics.timer(BULKREQUEST_LATENCY);
bulkRequestErrorsCounter = pluginMetrics.counter(BULKREQUEST_ERRORS);
bulkRequestSizeBytesSummary = pluginMetrics.summary(BULKREQUEST_SIZE_BYTES);

this.openSearchSinkConfig = OpenSearchSinkConfiguration.readESConfig(pluginSetting);
this.bulkSize = ByteSizeUnit.MB.toBytes(openSearchSinkConfig.getIndexConfiguration().getBulkSize());
Expand Down Expand Up @@ -165,6 +170,7 @@ private void flushBatch(final BulkRequest bulkRequest) {
bulkRequestTimer.record(() -> {
try {
bulkRetryStrategy.execute(bulkRequest);
bulkRequestSizeBytesSummary.record(bulkRequest.estimatedSizeInBytes());
} catch (final InterruptedException e) {
LOG.error("Unexpected Interrupt:", e);
bulkRequestErrorsCounter.increment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,17 @@ public void testOutputRawSpanDefault() throws IOException, InterruptedException
.add(BulkRetryStrategy.DOCUMENT_ERRORS).toString());
assertEquals(1, documentErrorsMeasurements.size());
assertEquals(0.0, documentErrorsMeasurements.get(0).getValue(), 0);

/**
* Metrics: Bulk Request Size in Bytes
*/
final List<Measurement> bulkRequestSizeBytesMetrics = MetricsTestUtil.getMeasurementList(
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
.add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
assertEquals(3, bulkRequestSizeBytesMetrics.size());
assertEquals(1.0, bulkRequestSizeBytesMetrics.get(0).getValue(), 0);
assertEquals(2188.0, bulkRequestSizeBytesMetrics.get(1).getValue(), 0);
assertEquals(2188.0, bulkRequestSizeBytesMetrics.get(2).getValue(), 0);
}

public void testOutputRawSpanWithDLQ() throws IOException, InterruptedException {
Expand Down Expand Up @@ -233,6 +244,18 @@ public void testOutputRawSpanWithDLQ() throws IOException, InterruptedException
.add(BulkRetryStrategy.DOCUMENT_ERRORS).toString());
assertEquals(1, documentErrorsMeasurements.size());
assertEquals(1.0, documentErrorsMeasurements.get(0).getValue(), 0);

/**
* Metrics: Bulk Request Size in Bytes
*/
final List<Measurement> bulkRequestSizeBytesMetrics = MetricsTestUtil.getMeasurementList(
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
.add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
assertEquals(3, bulkRequestSizeBytesMetrics.size());
assertEquals(1.0, bulkRequestSizeBytesMetrics.get(0).getValue(), 0);
assertEquals(2181.0, bulkRequestSizeBytesMetrics.get(1).getValue(), 0);
assertEquals(2181.0, bulkRequestSizeBytesMetrics.get(2).getValue(), 0);

}

public void testInstantiateSinkServiceMapDefault() throws IOException {
Expand Down Expand Up @@ -277,6 +300,17 @@ public void testOutputServiceMapDefault() throws IOException, InterruptedExcepti
// COUNT
Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0);

/**
* Metrics: Bulk Request Size in Bytes
*/
final List<Measurement> bulkRequestSizeBytesMetrics = MetricsTestUtil.getMeasurementList(
new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME)
.add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString());
assertEquals(3, bulkRequestSizeBytesMetrics.size());
assertEquals(1.0, bulkRequestSizeBytesMetrics.get(0).getValue(), 0);
assertEquals(309.0, bulkRequestSizeBytesMetrics.get(1).getValue(), 0);
assertEquals(309.0, bulkRequestSizeBytesMetrics.get(2).getValue(), 0);

// Check restart for index already exists
sink = new OpenSearchSink(pluginSetting);
sink.shutdown();
Expand Down

0 comments on commit 54f8ce7

Please sign in to comment.