From e9b568586301706de370784dd67e2aab5512c9cc Mon Sep 17 00:00:00 2001 From: George Chen Date: Thu, 22 Feb 2024 14:22:35 -0600 Subject: [PATCH 1/3] ENH: use timer for latency Signed-off-by: George Chen --- .../model/sink/SinkLatencyMetrics.java | 13 +++++----- .../model/sink/SinkLatencyMetricsTest.java | 24 +++++++++---------- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkLatencyMetrics.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkLatencyMetrics.java index 3a39c75b96..27cd954da4 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkLatencyMetrics.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkLatencyMetrics.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.model.sink; +import io.micrometer.core.instrument.Timer; import org.opensearch.dataprepper.metrics.PluginMetrics; import io.micrometer.core.instrument.DistributionSummary; import org.opensearch.dataprepper.model.event.EventHandle; @@ -15,19 +16,19 @@ public class SinkLatencyMetrics { public static final String INTERNAL_LATENCY = "PipelineLatency"; public static final String EXTERNAL_LATENCY = "EndToEndLatency"; - private final DistributionSummary internalLatencySummary; - private final DistributionSummary externalLatencySummary; + private final Timer internalLatencyTimer; + private final Timer externalLatencyTimer; public SinkLatencyMetrics(PluginMetrics pluginMetrics) { - internalLatencySummary = pluginMetrics.summary(INTERNAL_LATENCY); - externalLatencySummary = pluginMetrics.summary(EXTERNAL_LATENCY); + internalLatencyTimer = pluginMetrics.timer(INTERNAL_LATENCY); + externalLatencyTimer = pluginMetrics.timer(EXTERNAL_LATENCY); } public void update(final EventHandle eventHandle) { Instant now = Instant.now(); - internalLatencySummary.record(Duration.between(eventHandle.getInternalOriginationTime(), now).toMillis()); + internalLatencyTimer.record(Duration.between(eventHandle.getInternalOriginationTime(), now)); if (eventHandle.getExternalOriginationTime() == null) { return; } - externalLatencySummary.record(Duration.between(eventHandle.getExternalOriginationTime(), now).toMillis()); + externalLatencyTimer.record(Duration.between(eventHandle.getExternalOriginationTime(), now)); } } diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkLatencyMetricsTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkLatencyMetricsTest.java index 4cf5043cae..8e0dfd2212 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkLatencyMetricsTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkLatencyMetricsTest.java @@ -5,6 +5,7 @@ package org.opensearch.dataprepper.model.sink; +import io.micrometer.core.instrument.Timer; import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.event.EventHandle; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; @@ -20,14 +21,15 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import java.time.Instant; +import java.util.concurrent.TimeUnit; class SinkLatencyMetricsTest { private PluginMetrics pluginMetrics; private EventHandle eventHandle; private SinkLatencyMetrics latencyMetrics; - private DistributionSummary internalLatencySummary; - private DistributionSummary externalLatencySummary; + private Timer internalLatencyTimer; + private Timer externalLatencyTimer; public SinkLatencyMetrics createObjectUnderTest() { return new SinkLatencyMetrics(pluginMetrics); @@ -37,16 +39,14 @@ public SinkLatencyMetrics createObjectUnderTest() { void setup() { pluginMetrics = mock(PluginMetrics.class); SimpleMeterRegistry registry = new SimpleMeterRegistry(); - internalLatencySummary = DistributionSummary + internalLatencyTimer = Timer .builder("internalLatency") - .baseUnit("milliseconds") .register(registry); - externalLatencySummary = DistributionSummary + externalLatencyTimer = Timer .builder("externalLatency") - .baseUnit("milliseconds") .register(registry); - when(pluginMetrics.summary(SinkLatencyMetrics.INTERNAL_LATENCY)).thenReturn(internalLatencySummary); - when(pluginMetrics.summary(SinkLatencyMetrics.EXTERNAL_LATENCY)).thenReturn(externalLatencySummary); + when(pluginMetrics.timer(SinkLatencyMetrics.INTERNAL_LATENCY)).thenReturn(internalLatencyTimer); + when(pluginMetrics.timer(SinkLatencyMetrics.EXTERNAL_LATENCY)).thenReturn(externalLatencyTimer); eventHandle = mock(EventHandle.class); when(eventHandle.getInternalOriginationTime()).thenReturn(Instant.now()); latencyMetrics = createObjectUnderTest(); @@ -55,16 +55,16 @@ void setup() { @Test public void testInternalOriginationTime() { latencyMetrics.update(eventHandle); - assertThat(internalLatencySummary.count(), equalTo(1L)); + assertThat(internalLatencyTimer.count(), equalTo(1L)); } @Test public void testExternalOriginationTime() { when(eventHandle.getExternalOriginationTime()).thenReturn(Instant.now().minusMillis(10)); latencyMetrics.update(eventHandle); - assertThat(internalLatencySummary.count(), equalTo(1L)); - assertThat(externalLatencySummary.count(), equalTo(1L)); - assertThat(externalLatencySummary.max(), greaterThanOrEqualTo(10.0)); + assertThat(internalLatencyTimer.count(), equalTo(1L)); + assertThat(externalLatencyTimer.count(), equalTo(1L)); + assertThat(externalLatencyTimer.max(TimeUnit.MILLISECONDS), greaterThanOrEqualTo(10.0)); } } From 29d97f76ef3059b52816a41d4feb2d57af2d1b6c Mon Sep 17 00:00:00 2001 From: George Chen Date: Thu, 22 Feb 2024 16:55:16 -0600 Subject: [PATCH 2/3] MAINT: remove unused import Signed-off-by: George Chen --- .../opensearch/dataprepper/model/sink/SinkLatencyMetrics.java | 1 - .../dataprepper/model/sink/SinkLatencyMetricsTest.java | 1 - 2 files changed, 2 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkLatencyMetrics.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkLatencyMetrics.java index 27cd954da4..9961b577ca 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkLatencyMetrics.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/sink/SinkLatencyMetrics.java @@ -7,7 +7,6 @@ import io.micrometer.core.instrument.Timer; import org.opensearch.dataprepper.metrics.PluginMetrics; -import io.micrometer.core.instrument.DistributionSummary; import org.opensearch.dataprepper.model.event.EventHandle; import java.time.Duration; diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkLatencyMetricsTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkLatencyMetricsTest.java index 8e0dfd2212..33d5dd9238 100644 --- a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkLatencyMetricsTest.java +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/sink/SinkLatencyMetricsTest.java @@ -9,7 +9,6 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.event.EventHandle; import io.micrometer.core.instrument.simple.SimpleMeterRegistry; -import io.micrometer.core.instrument.DistributionSummary; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; From 78696a932e26a8147cc672ed56645a33cfdb465e Mon Sep 17 00:00:00 2001 From: George Chen Date: Thu, 22 Feb 2024 17:14:13 -0600 Subject: [PATCH 3/3] MAINT: fix test case in opensearch sink Signed-off-by: George Chen --- .../plugins/sink/opensearch/OpenSearchSinkTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java index 8789929823..05996e936e 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java @@ -158,8 +158,8 @@ void setup() { when(pluginMetrics.counter(MetricNames.RECORDS_IN)).thenReturn(mock(Counter.class)); when(pluginMetrics.timer(MetricNames.TIME_ELAPSED)).thenReturn(mock(Timer.class)); - when(pluginMetrics.summary(INTERNAL_LATENCY)).thenReturn(mock(DistributionSummary.class)); - when(pluginMetrics.summary(EXTERNAL_LATENCY)).thenReturn(mock(DistributionSummary.class)); + when(pluginMetrics.timer(INTERNAL_LATENCY)).thenReturn(mock(Timer.class)); + when(pluginMetrics.timer(EXTERNAL_LATENCY)).thenReturn(mock(Timer.class)); when(pluginMetrics.timer(BULKREQUEST_LATENCY)).thenReturn(bulkRequestTimer); when(pluginMetrics.counter(BULKREQUEST_ERRORS)).thenReturn(bulkRequestErrorsCounter); when(pluginMetrics.counter(INVALID_ACTION_ERRORS)).thenReturn(invalidActionErrorsCounter);