Skip to content

Commit

Permalink
ENH: use timer for latency (#4174)
Browse files Browse the repository at this point in the history
ENH: use timer for latency

Signed-off-by: George Chen <[email protected]>
  • Loading branch information
chenqi0805 authored Mar 6, 2024
1 parent 6f8203a commit 8c5bb54
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

package org.opensearch.dataprepper.model.sink;

import io.micrometer.core.instrument.Timer;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import io.micrometer.core.instrument.DistributionSummary;
import org.opensearch.dataprepper.model.event.EventHandle;

import java.time.Duration;
Expand All @@ -15,19 +15,19 @@
public class SinkLatencyMetrics {
public static final String INTERNAL_LATENCY = "PipelineLatency";
public static final String EXTERNAL_LATENCY = "EndToEndLatency";
private final DistributionSummary internalLatencySummary;
private final DistributionSummary externalLatencySummary;
private final Timer internalLatencyTimer;
private final Timer externalLatencyTimer;

public SinkLatencyMetrics(PluginMetrics pluginMetrics) {
internalLatencySummary = pluginMetrics.summary(INTERNAL_LATENCY);
externalLatencySummary = pluginMetrics.summary(EXTERNAL_LATENCY);
internalLatencyTimer = pluginMetrics.timer(INTERNAL_LATENCY);
externalLatencyTimer = pluginMetrics.timer(EXTERNAL_LATENCY);
}
public void update(final EventHandle eventHandle) {
Instant now = Instant.now();
internalLatencySummary.record(Duration.between(eventHandle.getInternalOriginationTime(), now).toMillis());
internalLatencyTimer.record(Duration.between(eventHandle.getInternalOriginationTime(), now));
if (eventHandle.getExternalOriginationTime() == null) {
return;
}
externalLatencySummary.record(Duration.between(eventHandle.getExternalOriginationTime(), now).toMillis());
externalLatencyTimer.record(Duration.between(eventHandle.getExternalOriginationTime(), now));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@

package org.opensearch.dataprepper.model.sink;

import io.micrometer.core.instrument.Timer;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.EventHandle;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
import io.micrometer.core.instrument.DistributionSummary;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -20,14 +20,15 @@
import static org.hamcrest.Matchers.greaterThanOrEqualTo;

import java.time.Instant;
import java.util.concurrent.TimeUnit;

class SinkLatencyMetricsTest {

private PluginMetrics pluginMetrics;
private EventHandle eventHandle;
private SinkLatencyMetrics latencyMetrics;
private DistributionSummary internalLatencySummary;
private DistributionSummary externalLatencySummary;
private Timer internalLatencyTimer;
private Timer externalLatencyTimer;

public SinkLatencyMetrics createObjectUnderTest() {
return new SinkLatencyMetrics(pluginMetrics);
Expand All @@ -37,16 +38,14 @@ public SinkLatencyMetrics createObjectUnderTest() {
void setup() {
pluginMetrics = mock(PluginMetrics.class);
SimpleMeterRegistry registry = new SimpleMeterRegistry();
internalLatencySummary = DistributionSummary
internalLatencyTimer = Timer
.builder("internalLatency")
.baseUnit("milliseconds")
.register(registry);
externalLatencySummary = DistributionSummary
externalLatencyTimer = Timer
.builder("externalLatency")
.baseUnit("milliseconds")
.register(registry);
when(pluginMetrics.summary(SinkLatencyMetrics.INTERNAL_LATENCY)).thenReturn(internalLatencySummary);
when(pluginMetrics.summary(SinkLatencyMetrics.EXTERNAL_LATENCY)).thenReturn(externalLatencySummary);
when(pluginMetrics.timer(SinkLatencyMetrics.INTERNAL_LATENCY)).thenReturn(internalLatencyTimer);
when(pluginMetrics.timer(SinkLatencyMetrics.EXTERNAL_LATENCY)).thenReturn(externalLatencyTimer);
eventHandle = mock(EventHandle.class);
when(eventHandle.getInternalOriginationTime()).thenReturn(Instant.now());
latencyMetrics = createObjectUnderTest();
Expand All @@ -55,16 +54,16 @@ void setup() {
@Test
public void testInternalOriginationTime() {
latencyMetrics.update(eventHandle);
assertThat(internalLatencySummary.count(), equalTo(1L));
assertThat(internalLatencyTimer.count(), equalTo(1L));
}

@Test
public void testExternalOriginationTime() {
when(eventHandle.getExternalOriginationTime()).thenReturn(Instant.now().minusMillis(10));
latencyMetrics.update(eventHandle);
assertThat(internalLatencySummary.count(), equalTo(1L));
assertThat(externalLatencySummary.count(), equalTo(1L));
assertThat(externalLatencySummary.max(), greaterThanOrEqualTo(10.0));
assertThat(internalLatencyTimer.count(), equalTo(1L));
assertThat(externalLatencyTimer.count(), equalTo(1L));
assertThat(externalLatencyTimer.max(TimeUnit.MILLISECONDS), greaterThanOrEqualTo(10.0));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ void setup() {

when(pluginMetrics.counter(MetricNames.RECORDS_IN)).thenReturn(mock(Counter.class));
when(pluginMetrics.timer(MetricNames.TIME_ELAPSED)).thenReturn(mock(Timer.class));
when(pluginMetrics.summary(INTERNAL_LATENCY)).thenReturn(mock(DistributionSummary.class));
when(pluginMetrics.summary(EXTERNAL_LATENCY)).thenReturn(mock(DistributionSummary.class));
when(pluginMetrics.timer(INTERNAL_LATENCY)).thenReturn(mock(Timer.class));
when(pluginMetrics.timer(EXTERNAL_LATENCY)).thenReturn(mock(Timer.class));
when(pluginMetrics.timer(BULKREQUEST_LATENCY)).thenReturn(bulkRequestTimer);
when(pluginMetrics.counter(BULKREQUEST_ERRORS)).thenReturn(bulkRequestErrorsCounter);
when(pluginMetrics.counter(INVALID_ACTION_ERRORS)).thenReturn(invalidActionErrorsCounter);
Expand Down

0 comments on commit 8c5bb54

Please sign in to comment.