Skip to content

Commit

Permalink
OTLP metric exporter accepts aggregation temporality function (#4340)
Browse files Browse the repository at this point in the history
* OTLP metric exporter accepts aggregation temporality function

* Spotless
  • Loading branch information
jack-berg authored Apr 8, 2022
1 parent d78cada commit e7d5a97
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@
import static io.opentelemetry.api.internal.Utils.checkArgument;
import static java.util.Objects.requireNonNull;

import io.opentelemetry.exporter.internal.ExporterBuilderUtil;
import io.opentelemetry.exporter.internal.okhttp.OkHttpExporterBuilder;
import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand All @@ -23,7 +23,7 @@ public final class OtlpHttpMetricExporterBuilder {
private static final String DEFAULT_ENDPOINT = "http://localhost:4318/v1/metrics";

private static final Function<InstrumentType, AggregationTemporality>
DEFAULT_AGGREGATION_TEMPORALITY_FUNCTION = ExporterBuilderUtil::cumulativePreferred;
DEFAULT_AGGREGATION_TEMPORALITY_FUNCTION = MetricExporter::alwaysCumulative;

private final OkHttpExporterBuilder<MetricsRequestMarshaler> delegate;
private Function<InstrumentType, AggregationTemporality> aggregationTemporalityFunction =
Expand Down Expand Up @@ -101,21 +101,19 @@ public OtlpHttpMetricExporterBuilder setClientTls(byte[] privateKeyPem, byte[] c
}

/**
* Set the preferred aggregation temporality.
* Set the {@link Function} that determines the {@link AggregationTemporality} for each {@link
* InstrumentType}.
*
* <p>If unset, defaults to {@link AggregationTemporality#CUMULATIVE} and returns {@link
* AggregationTemporality#CUMULATIVE} for all instruments. If {@link
* AggregationTemporality#DELTA}, returns {@link AggregationTemporality#DELTA} for counter (sync
* and async) and histogram instruments, {@link AggregationTemporality#CUMULATIVE} for up down
* counter (sync and async) instruments.
* <p>Used to implement {@link MetricExporter#getAggregationTemporality(InstrumentType)}. If
* unset, defaults to {@link MetricExporter#alwaysCumulative(InstrumentType)}.
*
* <p>{@link MetricExporter#deltaPreferred(InstrumentType)} is a common configuration for delta
* backends.
*/
public OtlpHttpMetricExporterBuilder setPreferredTemporality(
AggregationTemporality preferredTemporality) {
requireNonNull(preferredTemporality, "preferredTemporality");
this.aggregationTemporalityFunction =
preferredTemporality == AggregationTemporality.CUMULATIVE
? ExporterBuilderUtil::cumulativePreferred
: ExporterBuilderUtil::deltaPreferred;
public OtlpHttpMetricExporterBuilder setAggregationTemporality(
Function<InstrumentType, AggregationTemporality> aggregationTemporalityFunction) {
requireNonNull(aggregationTemporalityFunction, "aggregationTemporalityFunction");
this.aggregationTemporalityFunction = aggregationTemporalityFunction;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableLongPointData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableSumData;
Expand Down Expand Up @@ -150,11 +151,11 @@ void validConfig() {
assertThatCode(
() ->
OtlpHttpMetricExporter.builder()
.setPreferredTemporality(AggregationTemporality.DELTA))
.setAggregationTemporality(MetricExporter::deltaPreferred))
.doesNotThrowAnyException();
assertThat(
OtlpHttpMetricExporter.builder()
.setPreferredTemporality(AggregationTemporality.DELTA)
.setAggregationTemporality(MetricExporter::deltaPreferred)
.build()
.getAggregationTemporality(InstrumentType.COUNTER))
.isEqualTo(AggregationTemporality.DELTA);
Expand Down Expand Up @@ -199,9 +200,9 @@ void invalidConfig() {
.hasMessage(
"Unsupported compression method. Supported compression methods include: gzip, none.");

assertThatThrownBy(() -> OtlpHttpMetricExporter.builder().setPreferredTemporality(null))
assertThatThrownBy(() -> OtlpHttpMetricExporter.builder().setAggregationTemporality(null))
.isInstanceOf(NullPointerException.class)
.hasMessage("preferredTemporality");
.hasMessage("aggregationTemporalityFunction");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@

package io.opentelemetry.exporter.internal;

import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.export.MetricReader;
import java.net.URI;
import java.net.URISyntaxException;

Expand Down Expand Up @@ -36,36 +33,5 @@ public static URI validateEndpoint(String endpoint) {
return uri;
}

/**
* A {@link MetricReader#getAggregationTemporality(InstrumentType)} function that indicates a
* cumulative preference.
*
* <p>{@link AggregationTemporality#CUMULATIVE} is returned for all instrument types.
*/
public static AggregationTemporality cumulativePreferred(InstrumentType unused) {
return AggregationTemporality.CUMULATIVE;
}

/**
* A {@link MetricReader#getAggregationTemporality(InstrumentType)} function that indicates a
* delta preference.
*
* <p>{@link AggregationTemporality#DELTA} is returned for counter (sync and async) and histogram
* instruments. {@link AggregationTemporality#CUMULATIVE} is returned for up down counter (sync
* and async) instruments.
*/
public static AggregationTemporality deltaPreferred(InstrumentType instrumentType) {
switch (instrumentType) {
case UP_DOWN_COUNTER:
case OBSERVABLE_UP_DOWN_COUNTER:
return AggregationTemporality.CUMULATIVE;
case COUNTER:
case OBSERVABLE_COUNTER:
case HISTOGRAM:
default:
return AggregationTemporality.DELTA;
}
}

private ExporterBuilderUtil() {}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
import static java.util.Objects.requireNonNull;

import io.grpc.ManagedChannel;
import io.opentelemetry.exporter.internal.ExporterBuilderUtil;
import io.opentelemetry.exporter.internal.grpc.GrpcExporter;
import io.opentelemetry.exporter.internal.grpc.GrpcExporterBuilder;
import io.opentelemetry.exporter.internal.otlp.metrics.MetricsRequestMarshaler;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.net.URI;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
Expand All @@ -32,7 +32,7 @@ public final class OtlpGrpcMetricExporterBuilder {
private static final URI DEFAULT_ENDPOINT = URI.create(DEFAULT_ENDPOINT_URL);
private static final long DEFAULT_TIMEOUT_SECS = 10;
private static final Function<InstrumentType, AggregationTemporality>
DEFAULT_AGGREGATION_TEMPORALITY_FUNCTION = ExporterBuilderUtil::cumulativePreferred;
DEFAULT_AGGREGATION_TEMPORALITY_FUNCTION = MetricExporter::alwaysCumulative;

// Visible for testing
final GrpcExporterBuilder<MetricsRequestMarshaler> delegate;
Expand Down Expand Up @@ -142,21 +142,19 @@ public OtlpGrpcMetricExporterBuilder addHeader(String key, String value) {
}

/**
* Set the preferred aggregation temporality.
* Set the {@link Function} that determines the {@link AggregationTemporality} for each {@link
* InstrumentType}.
*
* <p>If unset, defaults to {@link AggregationTemporality#CUMULATIVE} and returns {@link
* AggregationTemporality#CUMULATIVE} for all instruments. If {@link
* AggregationTemporality#DELTA}, returns {@link AggregationTemporality#DELTA} for counter (sync
* and async) and histogram instruments, {@link AggregationTemporality#CUMULATIVE} for up down
* counter (sync and async) instruments.
* <p>Used to implement {@link MetricExporter#getAggregationTemporality(InstrumentType)}. If
* unset, defaults to {@link MetricExporter#alwaysCumulative(InstrumentType)}.
*
* <p>{@link MetricExporter#deltaPreferred(InstrumentType)} is a common configuration for delta
* backends.
*/
public OtlpGrpcMetricExporterBuilder setPreferredTemporality(
AggregationTemporality preferredTemporality) {
requireNonNull(preferredTemporality, "preferredTemporality");
this.aggregationTemporalityFunction =
preferredTemporality == AggregationTemporality.CUMULATIVE
? ExporterBuilderUtil::cumulativePreferred
: ExporterBuilderUtil::deltaPreferred;
public OtlpGrpcMetricExporterBuilder setAggregationTemporality(
Function<InstrumentType, AggregationTemporality> aggregationTemporalityFunction) {
requireNonNull(aggregationTemporalityFunction, "aggregationTemporalityFunction");
this.aggregationTemporalityFunction = aggregationTemporalityFunction;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ static MetricExporter configureOtlpMetrics(ConfigProperties config) {
builder::setTrustedCertificates,
builder::setClientTls,
retryPolicy -> RetryUtil.setRetryPolicyOnDelegate(builder, retryPolicy));
OtlpConfigUtil.configureOtlpAggregationTemporality(config, builder::setPreferredTemporality);
OtlpConfigUtil.configureOtlpAggregationTemporality(
config, builder::setAggregationTemporality);

return builder.build();
} else if (protocol.equals(PROTOCOL_GRPC)) {
Expand All @@ -124,7 +125,8 @@ static MetricExporter configureOtlpMetrics(ConfigProperties config) {
builder::setTrustedCertificates,
builder::setClientTls,
retryPolicy -> RetryUtil.setRetryPolicyOnDelegate(builder, retryPolicy));
OtlpConfigUtil.configureOtlpAggregationTemporality(config, builder::setPreferredTemporality);
OtlpConfigUtil.configureOtlpAggregationTemporality(
config, builder::setAggregationTemporality);

return builder.build();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@
import io.opentelemetry.exporter.internal.retry.RetryPolicy;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigurationException;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.export.MetricExporter;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
Expand All @@ -19,6 +21,7 @@
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.annotation.Nullable;

final class OtlpConfigUtil {
Expand Down Expand Up @@ -129,7 +132,8 @@ static void configureOtlpExporterBuilder(
}

static void configureOtlpAggregationTemporality(
ConfigProperties config, Consumer<AggregationTemporality> setAggregationTemporality) {
ConfigProperties config,
Consumer<Function<InstrumentType, AggregationTemporality>> setAggregationTemporality) {
String temporalityStr = config.getString("otel.exporter.otlp.metrics.temporality.preference");
if (temporalityStr == null) {
// TODO(jack-berg): remove support after 1.13.0
Expand All @@ -145,7 +149,11 @@ static void configureOtlpAggregationTemporality(
throw new ConfigurationException(
"Unrecognized aggregation temporality: " + temporalityStr, e);
}
setAggregationTemporality.accept(temporality);
Function<InstrumentType, AggregationTemporality> temporalityFunction =
temporality == AggregationTemporality.CUMULATIVE
? MetricExporter::alwaysCumulative
: MetricExporter::deltaPreferred;
setAggregationTemporality.accept(temporalityFunction);
}

private static URL createUrl(URL context, String spec) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@

import com.google.common.collect.ImmutableMap;
import io.opentelemetry.sdk.autoconfigure.spi.ConfigurationException;
import io.opentelemetry.sdk.metrics.InstrumentType;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.assertj.core.api.ThrowableAssert.ThrowingCallable;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -367,9 +369,12 @@ void configureOtlpAggregationTemporality() {
/** Configure and return the aggregation temporality using the given properties. */
private static AggregationTemporality configureAggregationTemporality(
Map<String, String> properties) {
AtomicReference<AggregationTemporality> temporalityRef = new AtomicReference<>();
AtomicReference<Function<InstrumentType, AggregationTemporality>> temporalityRef =
new AtomicReference<>();
OtlpConfigUtil.configureOtlpAggregationTemporality(
DefaultConfigProperties.createForTest(properties), temporalityRef::set);
return temporalityRef.get();
// configureOtlpAggregationTemporality sets a function, but we can use the
// AggregationTemporality of HISTOGRAM instruments to simplify assertions
return temporalityRef.get().apply(InstrumentType.HISTOGRAM);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,36 @@
*/
public interface MetricExporter extends Closeable {

/**
* A common implementation of {@link #getAggregationTemporality(InstrumentType)} which returns
* {@link AggregationTemporality#CUMULATIVE} for all instruments.
*/
static AggregationTemporality alwaysCumulative(InstrumentType unused) {
return AggregationTemporality.CUMULATIVE;
}

/**
* A common implementation of {@link #getAggregationTemporality(InstrumentType)} which indicates
* delta preference.
*
* <p>{@link AggregationTemporality#DELTA} is returned for {@link InstrumentType#COUNTER}, {@link
* InstrumentType#OBSERVABLE_COUNTER}, and {@link InstrumentType#HISTOGRAM}. {@link
* AggregationTemporality#CUMULATIVE} is returned for {@link InstrumentType#UP_DOWN_COUNTER} and
* {@link InstrumentType#OBSERVABLE_UP_DOWN_COUNTER}.
*/
static AggregationTemporality deltaPreferred(InstrumentType instrumentType) {
switch (instrumentType) {
case UP_DOWN_COUNTER:
case OBSERVABLE_UP_DOWN_COUNTER:
return AggregationTemporality.CUMULATIVE;
case COUNTER:
case OBSERVABLE_COUNTER:
case HISTOGRAM:
default:
return AggregationTemporality.DELTA;
}
}

/** Return the default aggregation temporality for the {@link InstrumentType}. */
AggregationTemporality getAggregationTemporality(InstrumentType instrumentType);

Expand Down
Loading

0 comments on commit e7d5a97

Please sign in to comment.