diff --git a/data-prepper-plugins/opensearch/README.md b/data-prepper-plugins/opensearch/README.md index 14fad00846..244307e466 100644 --- a/data-prepper-plugins/opensearch/README.md +++ b/data-prepper-plugins/opensearch/README.md @@ -157,6 +157,9 @@ Besides common metrics in [AbstractSink](https://github.com/opensearch-project/d - `documentsSuccessFirstAttempt`: measures number of documents successfully sent to ES by bulk requests on first attempt. - `documentErrors`: measures number of documents failed to be sent by bulk requests. +### Distribution Summary +- `bulkRequestSizeBytes`: measures the distribution of bulk request's payload sizes in bytes. + ## Developer Guide This plugin is compatible with Java 8. See diff --git a/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 869bc5bf27..518da0f7cc 100644 --- a/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/com/amazon/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -21,6 +21,7 @@ import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexManagerFactory; import com.amazon.dataprepper.plugins.sink.opensearch.index.IndexType; import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Timer; import org.opensearch.action.DocWriteRequest; import org.opensearch.action.bulk.BulkRequest; @@ -50,6 +51,7 @@ public class OpenSearchSink extends AbstractSink> { public static final String BULKREQUEST_LATENCY = "bulkRequestLatency"; public static final String BULKREQUEST_ERRORS = "bulkRequestErrors"; + public static final String BULKREQUEST_SIZE_BYTES = "bulkRequestSizeBytes"; private static final Logger LOG = LoggerFactory.getLogger(OpenSearchSink.class); // Pulled from BulkRequest to make estimation of bytes consistent @@ -68,10 +70,13 @@ public class OpenSearchSink extends AbstractSink> { private final Timer bulkRequestTimer; private final Counter bulkRequestErrorsCounter; + private final DistributionSummary bulkRequestSizeBytesSummary; + public OpenSearchSink(final PluginSetting pluginSetting) { super(pluginSetting); bulkRequestTimer = pluginMetrics.timer(BULKREQUEST_LATENCY); bulkRequestErrorsCounter = pluginMetrics.counter(BULKREQUEST_ERRORS); + bulkRequestSizeBytesSummary = pluginMetrics.summary(BULKREQUEST_SIZE_BYTES); this.openSearchSinkConfig = OpenSearchSinkConfiguration.readESConfig(pluginSetting); this.bulkSize = ByteSizeUnit.MB.toBytes(openSearchSinkConfig.getIndexConfiguration().getBulkSize()); @@ -165,6 +170,7 @@ private void flushBatch(final BulkRequest bulkRequest) { bulkRequestTimer.record(() -> { try { bulkRetryStrategy.execute(bulkRequest); + bulkRequestSizeBytesSummary.record(bulkRequest.estimatedSizeInBytes()); } catch (final InterruptedException e) { LOG.error("Unexpected Interrupt:", e); bulkRequestErrorsCounter.increment(); diff --git a/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index 91ee237761..d5bacfa3a9 100644 --- a/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/test/java/com/amazon/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -191,6 +191,17 @@ public void testOutputRawSpanDefault() throws IOException, InterruptedException .add(BulkRetryStrategy.DOCUMENT_ERRORS).toString()); assertEquals(1, documentErrorsMeasurements.size()); assertEquals(0.0, documentErrorsMeasurements.get(0).getValue(), 0); + + /** + * Metrics: Bulk Request Size in Bytes + */ + final List bulkRequestSizeBytesMetrics = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString()); + assertEquals(3, bulkRequestSizeBytesMetrics.size()); + assertEquals(1.0, bulkRequestSizeBytesMetrics.get(0).getValue(), 0); + assertEquals(2188.0, bulkRequestSizeBytesMetrics.get(1).getValue(), 0); + assertEquals(2188.0, bulkRequestSizeBytesMetrics.get(2).getValue(), 0); } public void testOutputRawSpanWithDLQ() throws IOException, InterruptedException { @@ -233,6 +244,18 @@ public void testOutputRawSpanWithDLQ() throws IOException, InterruptedException .add(BulkRetryStrategy.DOCUMENT_ERRORS).toString()); assertEquals(1, documentErrorsMeasurements.size()); assertEquals(1.0, documentErrorsMeasurements.get(0).getValue(), 0); + + /** + * Metrics: Bulk Request Size in Bytes + */ + final List bulkRequestSizeBytesMetrics = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString()); + assertEquals(3, bulkRequestSizeBytesMetrics.size()); + assertEquals(1.0, bulkRequestSizeBytesMetrics.get(0).getValue(), 0); + assertEquals(2181.0, bulkRequestSizeBytesMetrics.get(1).getValue(), 0); + assertEquals(2181.0, bulkRequestSizeBytesMetrics.get(2).getValue(), 0); + } public void testInstantiateSinkServiceMapDefault() throws IOException { @@ -277,6 +300,17 @@ public void testOutputServiceMapDefault() throws IOException, InterruptedExcepti // COUNT Assert.assertEquals(1.0, bulkRequestLatencies.get(0).getValue(), 0); + /** + * Metrics: Bulk Request Size in Bytes + */ + final List bulkRequestSizeBytesMetrics = MetricsTestUtil.getMeasurementList( + new StringJoiner(MetricNames.DELIMITER).add(PIPELINE_NAME).add(PLUGIN_NAME) + .add(OpenSearchSink.BULKREQUEST_SIZE_BYTES).toString()); + assertEquals(3, bulkRequestSizeBytesMetrics.size()); + assertEquals(1.0, bulkRequestSizeBytesMetrics.get(0).getValue(), 0); + assertEquals(309.0, bulkRequestSizeBytesMetrics.get(1).getValue(), 0); + assertEquals(309.0, bulkRequestSizeBytesMetrics.get(2).getValue(), 0); + // Check restart for index already exists sink = new OpenSearchSink(pluginSetting); sink.shutdown();