Skip to content

Commit

Permalink
Add max measurements to Micrometer Timer & DistributionSummary (#5303)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mateusz Rzeszutek authored Feb 8, 2022
1 parent b9fac11 commit 99f8c8d
Show file tree
Hide file tree
Showing 5 changed files with 159 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,29 +7,35 @@

import static io.opentelemetry.instrumentation.micrometer.v1_5.Bridging.baseUnit;
import static io.opentelemetry.instrumentation.micrometer.v1_5.Bridging.description;
import static io.opentelemetry.instrumentation.micrometer.v1_5.Bridging.statisticInstrumentName;
import static io.opentelemetry.instrumentation.micrometer.v1_5.Bridging.tagsAsAttributes;

import io.micrometer.core.instrument.AbstractDistributionSummary;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Measurement;
import io.micrometer.core.instrument.Statistic;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.NoopHistogram;
import io.micrometer.core.instrument.distribution.TimeWindowMax;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.instrumentation.api.internal.AsyncInstrumentRegistry;
import io.opentelemetry.instrumentation.api.internal.AsyncInstrumentRegistry.AsyncMeasurementHandle;
import java.util.Collections;
import java.util.concurrent.atomic.DoubleAdder;
import java.util.concurrent.atomic.LongAdder;

final class OpenTelemetryDistributionSummary extends AbstractDistributionSummary
implements DistributionSummary, RemovableMeter {

private final Measurements measurements;
private final TimeWindowMax max;
// TODO: use bound instruments when they're available
private final DoubleHistogram otelHistogram;
private final Attributes attributes;
private final Measurements measurements;
private final AsyncMeasurementHandle maxHandle;

private volatile boolean removed = false;

Expand All @@ -38,22 +44,32 @@ final class OpenTelemetryDistributionSummary extends AbstractDistributionSummary
Clock clock,
DistributionStatisticConfig distributionStatisticConfig,
double scale,
Meter otelMeter) {
Meter otelMeter,
AsyncInstrumentRegistry asyncInstrumentRegistry) {
super(id, clock, distributionStatisticConfig, scale, false);

if (isUsingMicrometerHistograms()) {
measurements = new MicrometerHistogramMeasurements();
} else {
measurements = NoopMeasurements.INSTANCE;
}
max = new TimeWindowMax(clock, distributionStatisticConfig);

this.attributes = tagsAsAttributes(id);
this.otelHistogram =
otelMeter
.histogramBuilder(id.getName())
.setDescription(description(id))
.setUnit(baseUnit(id))
.build();
this.attributes = tagsAsAttributes(id);

if (isUsingMicrometerHistograms()) {
measurements = new MicrometerHistogramMeasurements(clock, distributionStatisticConfig);
} else {
measurements = NoopMeasurements.INSTANCE;
}
this.maxHandle =
asyncInstrumentRegistry.buildGauge(
statisticInstrumentName(id, Statistic.MAX),
description(id),
baseUnit(id),
attributes,
max,
TimeWindowMax::poll);
}

boolean isUsingMicrometerHistograms() {
Expand All @@ -65,6 +81,7 @@ protected void recordNonNegative(double amount) {
if (amount >= 0 && !removed) {
otelHistogram.record(amount, attributes);
measurements.record(amount);
max.record(amount);
}
}

Expand All @@ -80,7 +97,7 @@ public double totalAmount() {

@Override
public double max() {
return measurements.max();
return max.poll();
}

@Override
Expand All @@ -92,6 +109,7 @@ public Iterable<Measurement> measure() {
@Override
public void onRemove() {
removed = true;
maxHandle.remove();
}

private interface Measurements {
Expand All @@ -100,8 +118,6 @@ private interface Measurements {
long count();

double totalAmount();

double max();
}

// if micrometer histograms are not being used then there's no need to keep any local state
Expand All @@ -123,32 +139,19 @@ public double totalAmount() {
UnsupportedReadLogger.logWarning();
return Double.NaN;
}

@Override
public double max() {
UnsupportedReadLogger.logWarning();
return Double.NaN;
}
}

// calculate count, totalAmount and max value for the use of micrometer histograms
// calculate count and totalAmount value for the use of micrometer histograms
// kinda similar to how DropwizardDistributionSummary does that
private static final class MicrometerHistogramMeasurements implements Measurements {

private final LongAdder count = new LongAdder();
private final DoubleAdder totalAmount = new DoubleAdder();
private final TimeWindowMax max;

MicrometerHistogramMeasurements(
Clock clock, DistributionStatisticConfig distributionStatisticConfig) {
this.max = new TimeWindowMax(clock, distributionStatisticConfig);
}

@Override
public void record(double amount) {
count.increment();
totalAmount.add(amount);
max.record(amount);
}

@Override
Expand All @@ -160,10 +163,5 @@ public long count() {
public double totalAmount() {
return totalAmount.sum();
}

@Override
public double max() {
return max.poll();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,13 @@ protected Timer newTimer(
DistributionStatisticConfig distributionStatisticConfig,
PauseDetector pauseDetector) {
OpenTelemetryTimer timer =
new OpenTelemetryTimer(id, clock, distributionStatisticConfig, pauseDetector, otelMeter);
new OpenTelemetryTimer(
id,
clock,
distributionStatisticConfig,
pauseDetector,
otelMeter,
asyncInstrumentRegistry);
if (timer.isUsingMicrometerHistograms()) {
HistogramGauges.registerWithCommonFormat(timer, this);
}
Expand All @@ -98,7 +104,7 @@ protected DistributionSummary newDistributionSummary(
Meter.Id id, DistributionStatisticConfig distributionStatisticConfig, double scale) {
OpenTelemetryDistributionSummary distributionSummary =
new OpenTelemetryDistributionSummary(
id, clock, distributionStatisticConfig, scale, otelMeter);
id, clock, distributionStatisticConfig, scale, otelMeter, asyncInstrumentRegistry);
if (distributionSummary.isUsingMicrometerHistograms()) {
HistogramGauges.registerWithCommonFormat(distributionSummary, this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@
package io.opentelemetry.instrumentation.micrometer.v1_5;

import static io.opentelemetry.instrumentation.micrometer.v1_5.Bridging.description;
import static io.opentelemetry.instrumentation.micrometer.v1_5.Bridging.statisticInstrumentName;
import static io.opentelemetry.instrumentation.micrometer.v1_5.Bridging.tagsAsAttributes;

import io.micrometer.core.instrument.AbstractTimer;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.Measurement;
import io.micrometer.core.instrument.Statistic;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.NoopHistogram;
import io.micrometer.core.instrument.distribution.TimeWindowMax;
Expand All @@ -19,6 +21,8 @@
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.metrics.DoubleHistogram;
import io.opentelemetry.api.metrics.Meter;
import io.opentelemetry.instrumentation.api.internal.AsyncInstrumentRegistry;
import io.opentelemetry.instrumentation.api.internal.AsyncInstrumentRegistry.AsyncMeasurementHandle;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
Expand All @@ -27,10 +31,12 @@ final class OpenTelemetryTimer extends AbstractTimer implements RemovableMeter {

private static final double NANOS_PER_MS = TimeUnit.MILLISECONDS.toNanos(1);

private final Measurements measurements;
private final TimeWindowMax max;
// TODO: use bound instruments when they're available
private final DoubleHistogram otelHistogram;
private final Attributes attributes;
private final Measurements measurements;
private final AsyncMeasurementHandle maxHandle;

private volatile boolean removed = false;

Expand All @@ -39,22 +45,32 @@ final class OpenTelemetryTimer extends AbstractTimer implements RemovableMeter {
Clock clock,
DistributionStatisticConfig distributionStatisticConfig,
PauseDetector pauseDetector,
Meter otelMeter) {
Meter otelMeter,
AsyncInstrumentRegistry asyncInstrumentRegistry) {
super(id, clock, distributionStatisticConfig, pauseDetector, TimeUnit.MILLISECONDS, false);

if (isUsingMicrometerHistograms()) {
measurements = new MicrometerHistogramMeasurements();
} else {
measurements = NoopMeasurements.INSTANCE;
}
max = new TimeWindowMax(clock, distributionStatisticConfig);

this.attributes = tagsAsAttributes(id);
this.otelHistogram =
otelMeter
.histogramBuilder(id.getName())
.setDescription(description(id))
.setUnit("ms")
.build();
this.attributes = tagsAsAttributes(id);

if (isUsingMicrometerHistograms()) {
measurements = new MicrometerHistogramMeasurements(clock, distributionStatisticConfig);
} else {
measurements = NoopMeasurements.INSTANCE;
}
this.maxHandle =
asyncInstrumentRegistry.buildGauge(
statisticInstrumentName(id, Statistic.MAX),
description(id),
"ms",
attributes,
max,
m -> m.poll(TimeUnit.MILLISECONDS));
}

boolean isUsingMicrometerHistograms() {
Expand All @@ -68,6 +84,7 @@ protected void recordNonNegative(long amount, TimeUnit unit) {
double time = nanos / NANOS_PER_MS;
otelHistogram.record(time, attributes);
measurements.record(nanos);
max.record(nanos, TimeUnit.NANOSECONDS);
}
}

Expand All @@ -83,7 +100,7 @@ public double totalTime(TimeUnit unit) {

@Override
public double max(TimeUnit unit) {
return measurements.max(unit);
return max.poll(unit);
}

@Override
Expand All @@ -95,6 +112,7 @@ public Iterable<Measurement> measure() {
@Override
public void onRemove() {
removed = true;
maxHandle.remove();
}

private interface Measurements {
Expand All @@ -103,8 +121,6 @@ private interface Measurements {
long count();

double totalTime(TimeUnit unit);

double max(TimeUnit unit);
}

// if micrometer histograms are not being used then there's no need to keep any local state
Expand All @@ -126,32 +142,19 @@ public double totalTime(TimeUnit unit) {
UnsupportedReadLogger.logWarning();
return Double.NaN;
}

@Override
public double max(TimeUnit unit) {
UnsupportedReadLogger.logWarning();
return Double.NaN;
}
}

// calculate count, totalTime and max value for the use of micrometer histograms
// calculate count and totalTime value for the use of micrometer histograms
// kinda similar to how DropwizardTimer does that
private static final class MicrometerHistogramMeasurements implements Measurements {

private final LongAdder count = new LongAdder();
private final LongAdder totalTime = new LongAdder();
private final TimeWindowMax max;

MicrometerHistogramMeasurements(
Clock clock, DistributionStatisticConfig distributionStatisticConfig) {
this.max = new TimeWindowMax(clock, distributionStatisticConfig);
}

@Override
public void record(long nanos) {
count.increment();
totalTime.add(nanos);
max.record(nanos, TimeUnit.NANOSECONDS);
}

@Override
Expand All @@ -163,10 +166,5 @@ public long count() {
public double totalTime(TimeUnit unit) {
return TimeUtils.nanosToUnit(totalTime.sum(), unit);
}

@Override
public double max(TimeUnit unit) {
return max.poll(unit);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Metrics;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.assertj.core.api.AbstractIterableAssert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -169,4 +170,50 @@ void testMicrometerPercentiles() {
point ->
assertThat(point).attributes().containsEntry("phi", "0.99"))));
}

@Test
void testMicrometerMax() throws InterruptedException {
// given
DistributionSummary summary =
DistributionSummary.builder("testSummaryMax")
.description("This is a test distribution summary")
.baseUnit("things")
.tags("tag", "value")
.register(Metrics.globalRegistry);

// when
summary.record(1);
summary.record(2);
summary.record(4);

// then
testing()
.waitAndAssertMetrics(
INSTRUMENTATION_NAME,
"testSummaryMax.max",
metrics ->
metrics.anySatisfy(
metric ->
assertThat(metric)
.hasDescription("This is a test distribution summary")
.hasDoubleGauge()
.points()
.anySatisfy(
point ->
assertThat(point)
.hasValue(4)
.attributes()
.containsEntry("tag", "value"))));

// when
Metrics.globalRegistry.remove(summary);
Thread.sleep(10); // give time for any inflight metric export to be received
testing().clearData();

// then
Thread.sleep(100); // interval of the test metrics exporter
testing()
.waitAndAssertMetrics(
INSTRUMENTATION_NAME, "testSummaryMax.max", AbstractIterableAssert::isEmpty);
}
}
Loading

0 comments on commit 99f8c8d

Please sign in to comment.