Adapt otel exponential histogram data #10449

Merged (14 commits) on Feb 28, 2023
Changes from 7 commits
1 change: 1 addition & 0 deletions docs/en/changes/changes.md
@@ -104,6 +104,7 @@
* `Scope` in the Entity of the Metrics query v1 protocol is no longer required and is corrected automatically. The scope is determined based on the metric itself.
* Add explicit `ReadTimeout` for ConsulConfigurationWatcher to avoid `IllegalArgumentException: Cache watchInterval=10sec >= networkClientReadTimeout=10000ms`.
* Fix `DurationUtils.getDurationPoints` exceed, when `startTimeBucket` equals `endTimeBucket`.
* Support processing OpenTelemetry ExponentialHistogram metrics.

#### UI

OpenTelemetryMetricRequestProcessor.java
@@ -24,12 +24,14 @@
import io.opentelemetry.proto.metrics.v1.Sum;
import io.opentelemetry.proto.metrics.v1.SummaryDataPoint;
import io.vavr.Function1;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Stream;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.meter.analyzer.MetricConvert;
@@ -161,6 +163,60 @@ private static Map<Double, Long> buildBuckets(
return result;
}

/**
 * ExponentialHistogram data points are an alternative representation of the Histogram data point in the
 * OpenTelemetry metric format (https://opentelemetry.io/docs/reference/specification/metrics/data-model/#exponentialhistogram).
 * They use scale, offset and bucket index to calculate the bucket bounds. First, the base is derived from the
 * scale: base = 2**(2**(-scale)). The upperBound of a positive bucket is then base**(offset+index+1).
 * Negative values are mapped by their absolute value into the negative range using the same scale as the
 * positive range, so the upperBound of a negative bucket is calculated as -base**(offset+index).
*
* @param positiveOffset corresponding to positive Buckets' offset in ExponentialHistogramDataPoint
* @param positiveBucketCounts corresponding to positive Buckets' bucket_counts in ExponentialHistogramDataPoint
* @param negativeOffset corresponding to negative Buckets' offset in ExponentialHistogramDataPoint
* @param negativeBucketCounts corresponding to negative Buckets' bucket_counts in ExponentialHistogramDataPoint
* @param scale corresponding to scale in ExponentialHistogramDataPoint
* @return a bucket map for the histogram; each key is a bucket's upperBound and each value is the number of
* items in that bucket, all of which are lower than or equal to the key (upperBound)
*/
private static Map<Double, Long> buildBucketsFromExponentialHistogram(
int positiveOffset, final List<Long> positiveBucketCounts,
int negativeOffset, final List<Long> negativeBucketCounts, int scale) {

final Map<Double, Long> result = new HashMap<>();
double base = Math.pow(2.0, Math.pow(2.0, -scale));
if (base == Double.POSITIVE_INFINITY) {
if (log.isWarnEnabled()) {
log.warn("Receive and reject out-of-range ExponentialHistogram data");
}
return result;
}
double upperBound;
for (int i = 0; i < negativeBucketCounts.size(); i++) {
upperBound = -Math.pow(base, negativeOffset + i);
if (upperBound == Double.NEGATIVE_INFINITY) {
if (log.isWarnEnabled()) {
log.warn("Receive and reject out-of-range ExponentialHistogram data");
}
break;
}
result.put(upperBound, negativeBucketCounts.get(i));
}
for (int i = 0; i < positiveBucketCounts.size() - 1; i++) {
upperBound = Math.pow(base, positiveOffset + i + 1);
if (upperBound == Double.POSITIVE_INFINITY) {
if (log.isWarnEnabled()) {
log.warn("Receive and reject out-of-range ExponentialHistogram data");
}
break;
}
result.put(upperBound, positiveBucketCounts.get(i));
}
result.put(Double.POSITIVE_INFINITY, positiveBucketCounts.get(positiveBucketCounts.size() - 1));
return result;
}
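    // Worked example (editorial comment, not part of the original patch): with scale = 2,
    // base = 2^(2^-2) = 2^0.25 ≈ 1.1892. A positive bucket at offset 10 and index 0 then has
    // upperBound = base^(10 + 0 + 1) ≈ 6.73, the next bucket has upperBound = base^12 = 8.0,
    // and a negative bucket at offset 15 and index 0 has upperBound = -base^15 ≈ -13.45.
    // These are the same bounds the unit test added in this PR asserts.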

// Adapt the OpenTelemetry metrics to SkyWalking metrics
private Stream<? extends Metric> adaptMetrics(
final Map<String, String> nodeLabels,
@@ -187,16 +243,16 @@ private Stream<? extends Metric> adaptMetrics(
if (sum
.getAggregationTemporality() == AGGREGATION_TEMPORALITY_DELTA) {
return sum.getDataPointsList().stream()
.map(point -> new Gauge(
metric.getName(),
mergeLabels(
nodeLabels,
buildLabels(point.getAttributesList())
),
point.hasAsDouble() ? point.getAsDouble()
: point.getAsInt(),
point.getTimeUnixNano() / 1000000
));
.map(point -> new Gauge(
metric.getName(),
mergeLabels(
nodeLabels,
buildLabels(point.getAttributesList())
),
point.hasAsDouble() ? point.getAsDouble()
: point.getAsInt(),
point.getTimeUnixNano() / 1000000
));
}
if (sum.getIsMonotonic()) {
return sum.getDataPointsList().stream()
@@ -241,6 +297,26 @@ private Stream<? extends Metric> adaptMetrics(
point.getTimeUnixNano() / 1000000
));
}
if (metric.hasExponentialHistogram()) {
return metric.getExponentialHistogram().getDataPointsList().stream()
.map(point -> new Histogram(
metric.getName(),
mergeLabels(
nodeLabels,
buildLabels(point.getAttributesList())
),
point.getCount(),
point.getSum(),
buildBucketsFromExponentialHistogram(
point.getPositive().getOffset(),
point.getPositive().getBucketCountsList(),
point.getNegative().getOffset(),
point.getNegative().getBucketCountsList(),
point.getScale()
),
point.getTimeUnixNano() / 1000000
));
}
if (metric.hasSummary()) {
return metric.getSummary().getDataPointsList().stream()
.map(point -> new Summary(
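For reviewers who want to sanity-check the bounds by hand, the inverse direction of the same formulas (from a raw sample to its bucket index) is sometimes easier to reason about. Below is a minimal standalone sketch, illustrative only and not part of this patch; the class and variable names are made up for the example, and it relies only on the base/upperBound formulas quoted in the Javadoc above.

public class ExponentialBucketIndexExample {
    public static void main(String[] args) {
        int scale = 2;
        // base = 2^(2^-scale); for scale = 2 this is 2^0.25 ≈ 1.1892
        double base = Math.pow(2.0, Math.pow(2.0, -scale));
        double value = 7.0;
        // A positive value v falls into the bucket (base^index, base^(index+1)]
        int index = (int) Math.ceil(Math.log(value) / Math.log(base)) - 1; // 11
        System.out.printf(
            "value=%.1f -> index=%d, bucket=(%.2f, %.2f]%n",
            value, index, Math.pow(base, index), Math.pow(base, index + 1)); // (6.73, 8.00]
    }
}

With the offset applied as in buildBucketsFromExponentialHistogram above, index 11 corresponds to offset 10 plus bucket position 1, i.e. the second positive bucket in the test below, whose comment reads (6.72, 8].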
OpenTelemetryMetricRequestProcessorTest.java (new file)
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.skywalking.oap.server.receiver.otel.otlp;

import io.opentelemetry.proto.metrics.v1.ExponentialHistogram;
import io.opentelemetry.proto.metrics.v1.ExponentialHistogramDataPoint;
import io.opentelemetry.proto.metrics.v1.Metric;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.prometheus.metrics.Histogram;
import org.apache.skywalking.oap.server.receiver.otel.OtelMetricReceiverConfig;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class OpenTelemetryMetricRequestProcessorTest {

private OtelMetricReceiverConfig config;

private ModuleManager manager;

private OpenTelemetryMetricRequestProcessor metricRequestProcessor;

private Map<String, String> nodeLabels;

@BeforeEach
public void setUp() {
manager = new ModuleManager();
config = new OtelMetricReceiverConfig();
metricRequestProcessor = new OpenTelemetryMetricRequestProcessor(manager, config);
nodeLabels = new HashMap<>();
}

@Test
public void testAdaptExponentialHistogram() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
Class<OpenTelemetryMetricRequestProcessor> clazz = OpenTelemetryMetricRequestProcessor.class;
Method adaptMetricsMethod = clazz.getDeclaredMethod("adaptMetrics", Map.class, Metric.class);
adaptMetricsMethod.setAccessible(true);

// sample values: 4, 7, 9
ExponentialHistogramDataPoint.Buckets positiveBuckets = ExponentialHistogramDataPoint.Buckets.newBuilder()
                .setOffset(10)
                .addBucketCounts(1) // (0, 6.72]
                .addBucketCounts(1) // (6.72, 8]
                .addBucketCounts(1) // (8, 9.51]
                .build();
// sample values: -14, -17, -18, -21
ExponentialHistogramDataPoint.Buckets negativeBuckets = ExponentialHistogramDataPoint.Buckets.newBuilder()
                .setOffset(15)
                .addBucketCounts(1) // (-16, -13.45]
                .addBucketCounts(2) // (-19.02, -16]
                .addBucketCounts(1) // (NEGATIVE_INFINITY, -19.02]
                .build();
ExponentialHistogramDataPoint dataPoint = ExponentialHistogramDataPoint.newBuilder()
.setCount(7)
.setSum(-50)
.setScale(2)
.setPositive(positiveBuckets)
.setNegative(negativeBuckets)
.setTimeUnixNano(1000000)
.build();
ExponentialHistogram exponentialHistogram = ExponentialHistogram.newBuilder()
.addDataPoints(dataPoint)
.build();
Metric metric = Metric.newBuilder()
.setName("test_metric")
.setExponentialHistogram(exponentialHistogram)
.build();

Stream<Histogram> stream = (Stream<Histogram>) adaptMetricsMethod.invoke(
metricRequestProcessor, nodeLabels, metric);
List<Histogram> list = stream.toList();
Histogram histogramMetric = list.get(0);
assertEquals("test_metric", histogramMetric.getName());
assertEquals(1, histogramMetric.getTimestamp());
assertEquals(7, histogramMetric.getSampleCount());
assertEquals(-50, histogramMetric.getSampleSum());

// validate the bucket keys and values
double base = Math.pow(2, Math.pow(2, -2));

assertTrue(histogramMetric.getBuckets().containsKey(Math.pow(base, 11)));
assertEquals(1, histogramMetric.getBuckets().get(Math.pow(base, 11)));

assertTrue(histogramMetric.getBuckets().containsKey(Math.pow(base, 12)));
assertEquals(1, histogramMetric.getBuckets().get(Math.pow(base, 12)));

assertTrue(histogramMetric.getBuckets().containsKey(Double.POSITIVE_INFINITY));
assertEquals(1, histogramMetric.getBuckets().get(Double.POSITIVE_INFINITY));

assertTrue(histogramMetric.getBuckets().containsKey(-Math.pow(base, 15)));
assertEquals(1, histogramMetric.getBuckets().get(-Math.pow(base, 15)));

assertTrue(histogramMetric.getBuckets().containsKey(-Math.pow(base, 16)));
assertEquals(2, histogramMetric.getBuckets().get(-Math.pow(base, 16)));

assertTrue(histogramMetric.getBuckets().containsKey(-Math.pow(base, 17)));
assertEquals(1, histogramMetric.getBuckets().get(-Math.pow(base, 17)));
}
}