Skip to content

Commit

Permalink
Wire Exemplars into the metrics.data package (#3353)
Browse files Browse the repository at this point in the history
* Add Exemplars into `metrics.data` pacakge

- Add Exemplars to match OTLP spec
- Add assertj helpers for exemplar extraction on points.

* Wire exemplar export to OTLP exporter

* Wire exemplar export to Prometheus exporter

* Add javadoc for AbstractSampledPointDataAssert

* Fixes from review.

* Fixes from review.

* Fixes from review.

* Fixes from spotless.

* Fixes froom review.

* Add clarification to javadoc from review.

* ONe last javadoc cleanup.

* Fixes to javadoc build.

* Update method name from review.

* Fixes from review.
  • Loading branch information
jsuereth authored Jul 10, 2021
1 parent 57bd952 commit ae4bb36
Show file tree
Hide file tree
Showing 14 changed files with 603 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@
import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_DELTA;
import static io.opentelemetry.proto.metrics.v1.AggregationTemporality.AGGREGATION_TEMPORALITY_UNSPECIFIED;

import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import io.opentelemetry.api.internal.OtelEncodingUtils;
import io.opentelemetry.api.trace.SpanId;
import io.opentelemetry.api.trace.TraceId;
import io.opentelemetry.proto.metrics.v1.AggregationTemporality;
import io.opentelemetry.proto.metrics.v1.Exemplar;
import io.opentelemetry.proto.metrics.v1.Gauge;
import io.opentelemetry.proto.metrics.v1.Histogram;
import io.opentelemetry.proto.metrics.v1.HistogramDataPoint;
Expand All @@ -21,13 +27,16 @@
import io.opentelemetry.proto.metrics.v1.Summary;
import io.opentelemetry.proto.metrics.v1.SummaryDataPoint;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.internal.ThrottlingLogger;
import io.opentelemetry.sdk.metrics.data.DoubleExemplar;
import io.opentelemetry.sdk.metrics.data.DoubleGaugeData;
import io.opentelemetry.sdk.metrics.data.DoubleHistogramData;
import io.opentelemetry.sdk.metrics.data.DoubleHistogramPointData;
import io.opentelemetry.sdk.metrics.data.DoublePointData;
import io.opentelemetry.sdk.metrics.data.DoubleSumData;
import io.opentelemetry.sdk.metrics.data.DoubleSummaryData;
import io.opentelemetry.sdk.metrics.data.DoubleSummaryPointData;
import io.opentelemetry.sdk.metrics.data.LongExemplar;
import io.opentelemetry.sdk.metrics.data.LongGaugeData;
import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.LongSumData;
Expand All @@ -39,10 +48,15 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Converter from SDK {@link MetricData} to OTLP {@link ResourceMetrics}. */
public final class MetricAdapter {

private static final ThrottlingLogger logger =
new ThrottlingLogger(Logger.getLogger(MetricAdapter.class.getName()));

/** Converts the provided {@link MetricData} to {@link ResourceMetrics}. */
public static List<ResourceMetrics> toProtoResourceMetrics(Collection<MetricData> metricData) {
Map<Resource, Map<InstrumentationLibraryInfo, List<Metric>>> resourceAndLibraryMap =
Expand Down Expand Up @@ -199,6 +213,7 @@ static List<NumberDataPoint> toIntDataPoints(Collection<LongPointData> points) {
.getAttributes()
.forEach(
(key, value) -> builder.addAttributes(CommonAdapter.toProtoAttribute(key, value)));
longPoint.getExemplars().forEach(e -> builder.addExemplars(toExemplar(e)));
result.add(builder.build());
}
return result;
Expand All @@ -216,6 +231,7 @@ static Collection<NumberDataPoint> toDoubleDataPoints(Collection<DoublePointData
.getAttributes()
.forEach(
(key, value) -> builder.addAttributes(CommonAdapter.toProtoAttribute(key, value)));
doublePoint.getExemplars().forEach(e -> builder.addExemplars(toExemplar(e)));
result.add(builder.build());
}
return result;
Expand Down Expand Up @@ -269,10 +285,48 @@ static Collection<HistogramDataPoint> toHistogramDataPoints(
.getAttributes()
.forEach(
(key, value) -> builder.addAttributes(CommonAdapter.toProtoAttribute(key, value)));
doubleHistogramPoint.getExemplars().forEach(e -> builder.addExemplars(toExemplar(e)));
result.add(builder.build());
}
return result;
}

static Exemplar toExemplar(io.opentelemetry.sdk.metrics.data.Exemplar exemplar) {
// TODO - Use a thread local cache for spanid/traceid -> byte conversion.
Exemplar.Builder builder = Exemplar.newBuilder();
builder.setTimeUnixNano(exemplar.getEpochNanos());
if (exemplar.getSpanId() != null) {
builder.setSpanId(convertSpanId(exemplar.getSpanId()));
}
if (exemplar.getTraceId() != null) {
builder.setTraceId(convertTraceId(exemplar.getTraceId()));
}
exemplar
.getFilteredAttributes()
.forEach(
(key, value) ->
builder.addFilteredAttributes(CommonAdapter.toProtoAttribute(key, value)));
if (exemplar instanceof LongExemplar) {
builder.setAsInt(((LongExemplar) exemplar).getValue());
} else if (exemplar instanceof DoubleExemplar) {
builder.setAsDouble(((DoubleExemplar) exemplar).getValue());
} else {
if (logger.isLoggable(Level.SEVERE)) {
logger.log(Level.SEVERE, "Unable to convert unknown exemplar type: " + exemplar);
}
}
return builder.build();
}

private static ByteString convertTraceId(String id) {
return UnsafeByteOperations.unsafeWrap(
OtelEncodingUtils.bytesFromBase16(id, TraceId.getLength()));
}

private static ByteString convertSpanId(String id) {
return UnsafeByteOperations.unsafeWrap(
OtelEncodingUtils.bytesFromBase16(id, SpanId.getLength()));
}

private MetricAdapter() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
import static org.assertj.core.api.Assertions.assertThat;

import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.proto.common.v1.AnyValue;
import io.opentelemetry.proto.common.v1.InstrumentationLibrary;
import io.opentelemetry.proto.common.v1.KeyValue;
import io.opentelemetry.proto.metrics.v1.Exemplar;
import io.opentelemetry.proto.metrics.v1.Gauge;
import io.opentelemetry.proto.metrics.v1.Histogram;
import io.opentelemetry.proto.metrics.v1.HistogramDataPoint;
Expand All @@ -28,19 +30,22 @@
import io.opentelemetry.proto.metrics.v1.SummaryDataPoint;
import io.opentelemetry.sdk.common.InstrumentationLibraryInfo;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoubleExemplar;
import io.opentelemetry.sdk.metrics.data.DoubleGaugeData;
import io.opentelemetry.sdk.metrics.data.DoubleHistogramData;
import io.opentelemetry.sdk.metrics.data.DoubleHistogramPointData;
import io.opentelemetry.sdk.metrics.data.DoublePointData;
import io.opentelemetry.sdk.metrics.data.DoubleSumData;
import io.opentelemetry.sdk.metrics.data.DoubleSummaryData;
import io.opentelemetry.sdk.metrics.data.DoubleSummaryPointData;
import io.opentelemetry.sdk.metrics.data.LongExemplar;
import io.opentelemetry.sdk.metrics.data.LongGaugeData;
import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.LongSumData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.data.ValueAtPercentile;
import io.opentelemetry.sdk.resources.Resource;
import java.util.Arrays;
import java.util.Collections;
import org.junit.jupiter.api.Test;

Expand All @@ -57,7 +62,19 @@ void toInt64DataPoints() {
assertThat(MetricAdapter.toIntDataPoints(Collections.emptyList())).isEmpty();
assertThat(
MetricAdapter.toIntDataPoints(
singletonList(LongPointData.create(123, 456, KV_ATTR, 5))))
singletonList(
LongPointData.create(
123,
456,
KV_ATTR,
5,
Arrays.asList(
LongExemplar.create(
Attributes.of(stringKey("test"), "value"),
2,
/*spanId=*/ "0000000000000002",
/*traceId=*/ "00000000000000000000000000000001",
1))))))
.containsExactly(
NumberDataPoint.newBuilder()
.setStartTimeUnixNano(123)
Expand All @@ -66,6 +83,20 @@ void toInt64DataPoints() {
singletonList(
KeyValue.newBuilder().setKey("k").setValue(stringValue("v")).build()))
.setAsInt(5)
.addExemplars(
Exemplar.newBuilder()
.setTimeUnixNano(2)
.addFilteredAttributes(
KeyValue.newBuilder()
.setKey("test")
.setValue(stringValue("value"))
.build())
.setSpanId(ByteString.copyFrom(new byte[] {0, 0, 0, 0, 0, 0, 0, 2}))
.setTraceId(
ByteString.copyFrom(
new byte[] {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}))
.setAsInt(1)
.build())
.build());
assertThat(
MetricAdapter.toIntDataPoints(
Expand Down Expand Up @@ -206,7 +237,14 @@ void toHistogramDataPoints() {
Attributes.empty(),
15.3,
ImmutableList.of(),
ImmutableList.of(7L)))))
ImmutableList.of(7L),
ImmutableList.of(
DoubleExemplar.create(
Attributes.of(stringKey("test"), "value"),
2,
/*spanId=*/ "0000000000000002",
/*traceId=*/ "00000000000000000000000000000001",
1.5))))))
.containsExactly(
HistogramDataPoint.newBuilder()
.setStartTimeUnixNano(123)
Expand All @@ -226,6 +264,20 @@ void toHistogramDataPoints() {
.setCount(7)
.setSum(15.3)
.addBucketCounts(7)
.addExemplars(
Exemplar.newBuilder()
.setTimeUnixNano(2)
.addFilteredAttributes(
KeyValue.newBuilder()
.setKey("test")
.setValue(stringValue("value"))
.build())
.setSpanId(ByteString.copyFrom(new byte[] {0, 0, 0, 0, 0, 0, 0, 2}))
.setTraceId(
ByteString.copyFrom(
new byte[] {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}))
.setAsDouble(1.5)
.build())
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.opentelemetry.sdk.metrics.data.DoublePointData;
import io.opentelemetry.sdk.metrics.data.DoubleSumData;
import io.opentelemetry.sdk.metrics.data.DoubleSummaryPointData;
import io.opentelemetry.sdk.metrics.data.Exemplar;
import io.opentelemetry.sdk.metrics.data.LongPointData;
import io.opentelemetry.sdk.metrics.data.LongSumData;
import io.opentelemetry.sdk.metrics.data.MetricData;
Expand All @@ -26,7 +27,9 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.annotation.Nullable;

/**
* Util methods to convert OpenTelemetry Metrics data models to Prometheus data models.
Expand Down Expand Up @@ -117,12 +120,24 @@ static List<Sample> toSamples(
case DOUBLE_SUM:
case DOUBLE_GAUGE:
DoublePointData doublePoint = (DoublePointData) pointData;
samples.add(new Sample(name, labelNames, labelValues, doublePoint.getValue()));
samples.add(
createSample(
name,
labelNames,
labelValues,
doublePoint.getValue(),
lastExemplarOrNull(doublePoint.getExemplars())));
break;
case LONG_SUM:
case LONG_GAUGE:
LongPointData longPoint = (LongPointData) pointData;
samples.add(new Sample(name, labelNames, labelValues, longPoint.getValue()));
samples.add(
createSample(
name,
labelNames,
labelValues,
longPoint.getValue(),
lastExemplarOrNull(longPoint.getExemplars())));
break;
case SUMMARY:
addSummarySamples(
Expand Down Expand Up @@ -183,21 +198,50 @@ private static void addHistogramSamples(
labelNamesWithLe.add(LABEL_NAME_LE);

long cumulativeCount = 0;
List<Double> boundaries = doubleHistogramPointData.getBoundaries();
List<Long> counts = doubleHistogramPointData.getCounts();
for (int i = 0; i < counts.size(); i++) {
List<String> labelValuesWithLe = new ArrayList<>(labelValues.size() + 1);
// This is the upper boundary (inclusive). I.e. all values should be < this value (LE -
// Less-then-or-Equal).
double boundary = doubleHistogramPointData.getBucketUpperBound(i);
labelValuesWithLe.addAll(labelValues);
labelValuesWithLe.add(
doubleToGoString(i < boundaries.size() ? boundaries.get(i) : Double.POSITIVE_INFINITY));
labelValuesWithLe.add(doubleToGoString(boundary));

cumulativeCount += counts.get(i);
samples.add(
new Sample(
name + SAMPLE_SUFFIX_BUCKET, labelNamesWithLe, labelValuesWithLe, cumulativeCount));
createSample(
name + SAMPLE_SUFFIX_BUCKET,
labelNamesWithLe,
labelValuesWithLe,
cumulativeCount,
filterExemplars(
doubleHistogramPointData.getExemplars(),
doubleHistogramPointData.getBucketLowerBound(i),
boundary)));
}
}

@Nullable
private static Exemplar lastExemplarOrNull(Collection<Exemplar> exemplars) {
Exemplar result = null;
for (Exemplar e : exemplars) {
result = e;
}
return result;
}

@Nullable
private static Exemplar filterExemplars(Collection<Exemplar> exemplars, double min, double max) {
Exemplar result = null;
for (Exemplar e : exemplars) {
double value = e.getValueAsDouble();
if (value <= max && value > min) {
result = e;
}
}
return result;
}

private static int estimateNumSamples(int numPoints, MetricDataType type) {
if (type == MetricDataType.SUMMARY) {
// count + sum + estimated 2 percentiles (default MinMaxSumCount aggregator).
Expand All @@ -224,5 +268,31 @@ private static Collection<? extends PointData> getPoints(MetricData metricData)
return Collections.emptyList();
}

private static Sample createSample(
String name,
List<String> labelNames,
List<String> labelValues,
double value,
@Nullable Exemplar exemplar) {
if (exemplar != null) {
return new Sample(name, labelNames, labelValues, value, toPrometheusExemplar(exemplar));
}
return new Sample(name, labelNames, labelValues, value);
}

private static io.prometheus.client.exemplars.Exemplar toPrometheusExemplar(Exemplar exemplar) {
if (exemplar.getSpanId() != null && exemplar.getTraceId() != null) {
return new io.prometheus.client.exemplars.Exemplar(
exemplar.getValueAsDouble(),
// Convert to ms for prometheus, truncate nanosecond precision.
TimeUnit.NANOSECONDS.toMillis(exemplar.getEpochNanos()),
"trace_id",
exemplar.getTraceId(),
"span_id",
exemplar.getSpanId());
}
return new io.prometheus.client.exemplars.Exemplar(exemplar.getValueAsDouble());
}

private MetricAdapter() {}
}
Loading

0 comments on commit ae4bb36

Please sign in to comment.