Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add max measurements to Micrometer Timer & DistributionSummary #5303

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,35 @@

import static io.opentelemetry.instrumentation.micrometer.v1_5.Bridging.baseUnit;
import static io.opentelemetry.instrumentation.micrometer.v1_5.Bridging.description;
import static io.opentelemetry.instrumentation.micrometer.v1_5.Bridging.statisticInstrumentName;
import static io.opentelemetry.instrumentation.micrometer.v1_5.Bridging.tagsAsAttributes;

import io.micrometer.core.instrument.AbstractDistributionSummary;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Measurement;
import io.micrometer.core.instrument.Statistic;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.NoopHistogram;
import io.micrometer.core.instrument.distribution.TimeWindowMax;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.instrumentation.api.internal.AsyncInstrumentRegistry;
import io.opentelemetry.instrumentation.api.internal.AsyncInstrumentRegistry.AsyncMeasurementHandle;
import java.util.Collections;
import java.util.concurrent.atomic.DoubleAdder;
import java.util.concurrent.atomic.LongAdder;

final class OpenTelemetryDistributionSummary extends AbstractDistributionSummary
implements DistributionSummary, RemovableMeter {

private final Measurements measurements;
private final TimeWindowMax max;
// TODO: use bound instruments when they're available
private final DoubleHistogram otelHistogram;
private final Attributes attributes;
private final Measurements measurements;
private final AsyncMeasurementHandle maxHandle;

private volatile boolean removed = false;

Expand All @@ -38,22 +44,32 @@ final class OpenTelemetryDistributionSummary extends AbstractDistributionSummary
Clock clock,
DistributionStatisticConfig distributionStatisticConfig,
double scale,
Meter otelMeter) {
Meter otelMeter,
AsyncInstrumentRegistry asyncInstrumentRegistry) {
super(id, clock, distributionStatisticConfig, scale, false);

if (isUsingMicrometerHistograms()) {
measurements = new MicrometerHistogramMeasurements();
} else {
measurements = NoopMeasurements.INSTANCE;
}
max = new TimeWindowMax(clock, distributionStatisticConfig);

this.attributes = tagsAsAttributes(id);
this.otelHistogram =
otelMeter
.histogramBuilder(id.getName())
.setDescription(description(id))
.setUnit(baseUnit(id))
.build();
this.attributes = tagsAsAttributes(id);

if (isUsingMicrometerHistograms()) {
measurements = new MicrometerHistogramMeasurements(clock, distributionStatisticConfig);
} else {
measurements = NoopMeasurements.INSTANCE;
}
this.maxHandle =
asyncInstrumentRegistry.buildGauge(
statisticInstrumentName(id, Statistic.MAX),
description(id),
baseUnit(id),
attributes,
max,
TimeWindowMax::poll);
}

boolean isUsingMicrometerHistograms() {
Expand All @@ -65,6 +81,7 @@ protected void recordNonNegative(double amount) {
if (amount >= 0 && !removed) {
otelHistogram.record(amount, attributes);
measurements.record(amount);
max.record(amount);
}
}

Expand All @@ -80,7 +97,7 @@ public double totalAmount() {

@Override
public double max() {
return measurements.max();
return max.poll();
}

@Override
Expand All @@ -92,6 +109,7 @@ public Iterable<Measurement> measure() {
@Override
public void onRemove() {
removed = true;
maxHandle.remove();
}

private interface Measurements {
Expand All @@ -100,8 +118,6 @@ private interface Measurements {
long count();

double totalAmount();

double max();
}

// if micrometer histograms are not being used then there's no need to keep any local state
Expand All @@ -123,32 +139,19 @@ public double totalAmount() {
UnsupportedReadLogger.logWarning();
return Double.NaN;
}

@Override
public double max() {
UnsupportedReadLogger.logWarning();
return Double.NaN;
}
}

// calculate count, totalAmount and max value for the use of micrometer histograms
// calculate count and totalAmount value for the use of micrometer histograms
// kinda similar to how DropwizardDistributionSummary does that
private static final class MicrometerHistogramMeasurements implements Measurements {

private final LongAdder count = new LongAdder();
private final DoubleAdder totalAmount = new DoubleAdder();
private final TimeWindowMax max;

MicrometerHistogramMeasurements(
Clock clock, DistributionStatisticConfig distributionStatisticConfig) {
this.max = new TimeWindowMax(clock, distributionStatisticConfig);
}

@Override
public void record(double amount) {
count.increment();
totalAmount.add(amount);
max.record(amount);
}

@Override
Expand All @@ -160,10 +163,5 @@ public long count() {
public double totalAmount() {
return totalAmount.sum();
}

@Override
public double max() {
return max.poll();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,13 @@ protected Timer newTimer(
DistributionStatisticConfig distributionStatisticConfig,
PauseDetector pauseDetector) {
OpenTelemetryTimer timer =
new OpenTelemetryTimer(id, clock, distributionStatisticConfig, pauseDetector, otelMeter);
new OpenTelemetryTimer(
id,
clock,
distributionStatisticConfig,
pauseDetector,
otelMeter,
asyncInstrumentRegistry);
if (timer.isUsingMicrometerHistograms()) {
HistogramGauges.registerWithCommonFormat(timer, this);
}
Expand All @@ -98,7 +104,7 @@ protected DistributionSummary newDistributionSummary(
Meter.Id id, DistributionStatisticConfig distributionStatisticConfig, double scale) {
OpenTelemetryDistributionSummary distributionSummary =
new OpenTelemetryDistributionSummary(
id, clock, distributionStatisticConfig, scale, otelMeter);
id, clock, distributionStatisticConfig, scale, otelMeter, asyncInstrumentRegistry);
if (distributionSummary.isUsingMicrometerHistograms()) {
HistogramGauges.registerWithCommonFormat(distributionSummary, this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
package io.opentelemetry.instrumentation.micrometer.v1_5;

import static io.opentelemetry.instrumentation.micrometer.v1_5.Bridging.description;
import static io.opentelemetry.instrumentation.micrometer.v1_5.Bridging.statisticInstrumentName;
import static io.opentelemetry.instrumentation.micrometer.v1_5.Bridging.tagsAsAttributes;

import io.micrometer.core.instrument.AbstractTimer;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Measurement;
import io.micrometer.core.instrument.Statistic;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.NoopHistogram;
import io.micrometer.core.instrument.distribution.TimeWindowMax;
Expand All @@ -19,6 +21,8 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.instrumentation.api.internal.AsyncInstrumentRegistry;
import io.opentelemetry.instrumentation.api.internal.AsyncInstrumentRegistry.AsyncMeasurementHandle;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
Expand All @@ -27,10 +31,12 @@ final class OpenTelemetryTimer extends AbstractTimer implements RemovableMeter {

private static final double NANOS_PER_MS = TimeUnit.MILLISECONDS.toNanos(1);

private final Measurements measurements;
private final TimeWindowMax max;
// TODO: use bound instruments when they're available
private final DoubleHistogram otelHistogram;
private final Attributes attributes;
private final Measurements measurements;
private final AsyncMeasurementHandle maxHandle;

private volatile boolean removed = false;

Expand All @@ -39,22 +45,32 @@ final class OpenTelemetryTimer extends AbstractTimer implements RemovableMeter {
Clock clock,
DistributionStatisticConfig distributionStatisticConfig,
PauseDetector pauseDetector,
Meter otelMeter) {
Meter otelMeter,
AsyncInstrumentRegistry asyncInstrumentRegistry) {
super(id, clock, distributionStatisticConfig, pauseDetector, TimeUnit.MILLISECONDS, false);

if (isUsingMicrometerHistograms()) {
measurements = new MicrometerHistogramMeasurements();
} else {
measurements = NoopMeasurements.INSTANCE;
}
max = new TimeWindowMax(clock, distributionStatisticConfig);

this.attributes = tagsAsAttributes(id);
this.otelHistogram =
otelMeter
.histogramBuilder(id.getName())
.setDescription(description(id))
.setUnit("ms")
.build();
this.attributes = tagsAsAttributes(id);

if (isUsingMicrometerHistograms()) {
measurements = new MicrometerHistogramMeasurements(clock, distributionStatisticConfig);
} else {
measurements = NoopMeasurements.INSTANCE;
}
this.maxHandle =
asyncInstrumentRegistry.buildGauge(
statisticInstrumentName(id, Statistic.MAX),
description(id),
"ms",
attributes,
max,
m -> m.poll(TimeUnit.MILLISECONDS));
}

boolean isUsingMicrometerHistograms() {
Expand All @@ -68,6 +84,7 @@ protected void recordNonNegative(long amount, TimeUnit unit) {
double time = nanos / NANOS_PER_MS;
otelHistogram.record(time, attributes);
measurements.record(nanos);
max.record(nanos, TimeUnit.NANOSECONDS);
}
}

Expand All @@ -83,7 +100,7 @@ public double totalTime(TimeUnit unit) {

@Override
public double max(TimeUnit unit) {
return measurements.max(unit);
return max.poll(unit);
}

@Override
Expand All @@ -95,6 +112,7 @@ public Iterable<Measurement> measure() {
@Override
public void onRemove() {
removed = true;
maxHandle.remove();
}

private interface Measurements {
Expand All @@ -103,8 +121,6 @@ private interface Measurements {
long count();

double totalTime(TimeUnit unit);

double max(TimeUnit unit);
}

// if micrometer histograms are not being used then there's no need to keep any local state
Expand All @@ -126,32 +142,19 @@ public double totalTime(TimeUnit unit) {
UnsupportedReadLogger.logWarning();
return Double.NaN;
}

@Override
public double max(TimeUnit unit) {
UnsupportedReadLogger.logWarning();
return Double.NaN;
}
}

// calculate count, totalTime and max value for the use of micrometer histograms
// calculate count and totalTime value for the use of micrometer histograms
// kinda similar to how DropwizardTimer does that
private static final class MicrometerHistogramMeasurements implements Measurements {

private final LongAdder count = new LongAdder();
private final LongAdder totalTime = new LongAdder();
private final TimeWindowMax max;

MicrometerHistogramMeasurements(
Clock clock, DistributionStatisticConfig distributionStatisticConfig) {
this.max = new TimeWindowMax(clock, distributionStatisticConfig);
}

@Override
public void record(long nanos) {
count.increment();
totalTime.add(nanos);
max.record(nanos, TimeUnit.NANOSECONDS);
}

@Override
Expand All @@ -163,10 +166,5 @@ public long count() {
public double totalTime(TimeUnit unit) {
return TimeUtils.nanosToUnit(totalTime.sum(), unit);
}

@Override
public double max(TimeUnit unit) {
return max.poll(unit);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Metrics;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.assertj.core.api.AbstractIterableAssert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -169,4 +170,50 @@ void testMicrometerPercentiles() {
point ->
assertThat(point).attributes().containsEntry("phi", "0.99"))));
}

@Test
void testMicrometerMax() throws InterruptedException {
// given
DistributionSummary summary =
DistributionSummary.builder("testSummaryMax")
.description("This is a test distribution summary")
.baseUnit("things")
.tags("tag", "value")
.register(Metrics.globalRegistry);

// when
summary.record(1);
summary.record(2);
summary.record(4);

// then
testing()
.waitAndAssertMetrics(
INSTRUMENTATION_NAME,
"testSummaryMax.max",
metrics ->
metrics.anySatisfy(
metric ->
assertThat(metric)
.hasDescription("This is a test distribution summary")
.hasDoubleGauge()
.points()
.anySatisfy(
point ->
assertThat(point)
.hasValue(4)
.attributes()
.containsEntry("tag", "value"))));

// when
Metrics.globalRegistry.remove(summary);
Thread.sleep(10); // give time for any inflight metric export to be received
testing().clearData();

// then
Thread.sleep(100); // interval of the test metrics exporter
testing()
.waitAndAssertMetrics(
INSTRUMENTATION_NAME, "testSummaryMax.max", AbstractIterableAssert::isEmpty);
}
}
Loading