Lazily initialize exponential histogram buckets #5023

Merged 1 commit on Dec 14, 2022
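The core of this change is visible in the aggregator handle further down: the positive and negative DoubleExponentialHistogramBuckets fields become nullable and are only allocated on the first recorded value of that sign, with a cached EmptyExponentialHistogramBuckets standing in at collection time. A minimal sketch of that pattern, using hypothetical Buckets and LazyHistogramHandle classes rather than the SDK types:

final class LazyHistogramHandle {

  // Hypothetical bucket holder standing in for DoubleExponentialHistogramBuckets.
  static final class Buckets {
    final int scale;
    long totalCount;

    Buckets(int scale) {
      this.scale = scale;
    }

    void record(double value) {
      totalCount++;
    }
  }

  private final int startingScale;
  // No longer allocated in the constructor; each stays null until the first
  // value of that sign is recorded.
  private Buckets positiveBuckets;
  private Buckets negativeBuckets;
  private long zeroCount;

  LazyHistogramHandle(int startingScale) {
    this.startingScale = startingScale;
  }

  void recordDouble(double value) {
    int c = Double.compare(value, 0);
    if (c == 0) {
      zeroCount++;
      return;
    }
    Buckets buckets;
    if (c > 0) {
      if (positiveBuckets == null) {
        positiveBuckets = new Buckets(startingScale); // created on first positive value
      }
      buckets = positiveBuckets;
    } else {
      if (negativeBuckets == null) {
        negativeBuckets = new Buckets(startingScale); // created on first negative value
      }
      buckets = negativeBuckets;
    }
    buckets.record(value);
  }
}

At collection time (doAccumulateThenReset in the diff), a field that is still null is reported as EmptyExponentialHistogramBuckets.get(scale) rather than a freshly allocated bucket set.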
@@ -421,8 +421,8 @@ void exponentialHistogramDataPoints() {
1,
null,
null,
new TestExponentialHistogramBuckets(0, Collections.emptyList()),
new TestExponentialHistogramBuckets(0, Collections.emptyList()),
new TestExponentialHistogramBuckets(0, 0, Collections.emptyList()),
new TestExponentialHistogramBuckets(0, 0, Collections.emptyList()),
123,
456,
Attributes.empty(),
@@ -433,8 +433,8 @@ void exponentialHistogramDataPoints() {
1,
3.3,
80.1,
new TestExponentialHistogramBuckets(1, ImmutableList.of(1L, 0L, 2L)),
new TestExponentialHistogramBuckets(0, Collections.emptyList()),
new TestExponentialHistogramBuckets(0, 1, ImmutableList.of(1L, 0L, 2L)),
new TestExponentialHistogramBuckets(0, 0, Collections.emptyList()),
123,
456,
Attributes.of(stringKey("key"), "value"),
@@ -837,9 +837,9 @@ void toProtoMetric_exponentialHistogram() {
20.1,
44.3,
new TestExponentialHistogramBuckets(
-1, ImmutableList.of(0L, 128L, 1L << 32)),
20, -1, ImmutableList.of(0L, 128L, 1L << 32)),
new TestExponentialHistogramBuckets(
1, ImmutableList.of(0L, 128L, 1L << 32)),
20, 1, ImmutableList.of(0L, 128L, 1L << 32)),
123,
456,
KV_ATTR,
@@ -1155,14 +1155,21 @@ private static String toJson(Marshaler marshaler) {
*/
private static class TestExponentialHistogramBuckets implements ExponentialHistogramBuckets {

private final int scale;
private final int offset;
private final List<Long> bucketCounts;

TestExponentialHistogramBuckets(int offset, List<Long> bucketCounts) {
TestExponentialHistogramBuckets(int scale, int offset, List<Long> bucketCounts) {
this.scale = scale;
this.offset = offset;
this.bucketCounts = bucketCounts;
}

@Override
public int getScale() {
return scale;
}

@Override
public int getOffset() {
return offset;
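With this change the test helper carries its own scale as a new first constructor argument. A small usage sketch, assuming the TestExponentialHistogramBuckets class and ImmutableList shown in the hunks above:

ExponentialHistogramBuckets buckets =
    new TestExponentialHistogramBuckets(
        /* scale= */ 20, /* offset= */ -1, ImmutableList.of(0L, 128L, 1L << 32));
int scale = buckets.getScale();   // 20; previously the helper exposed no scale of its own
int offset = buckets.getOffset(); // -1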
@@ -5,20 +5,25 @@

package io.opentelemetry.sdk.metrics.internal.aggregator;

import com.google.auto.value.AutoValue;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.sdk.common.InstrumentationScopeInfo;
import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.data.MetricData;
import io.opentelemetry.sdk.metrics.internal.data.ImmutableMetricData;
import io.opentelemetry.sdk.metrics.internal.data.exponentialhistogram.ExponentialHistogramBuckets;
import io.opentelemetry.sdk.metrics.internal.data.exponentialhistogram.ExponentialHistogramData;
import io.opentelemetry.sdk.metrics.internal.descriptor.MetricDescriptor;
import io.opentelemetry.sdk.metrics.internal.exemplar.ExemplarReservoir;
import io.opentelemetry.sdk.metrics.internal.state.ExponentialCounterFactory;
import io.opentelemetry.sdk.resources.Resource;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import javax.annotation.Nullable;

/**
* Aggregator that generates exponential histograms.
@@ -68,15 +73,15 @@ public ExponentialHistogramAccumulation merge(
ExponentialHistogramAccumulation previous, ExponentialHistogramAccumulation current) {

// Create merged buckets
DoubleExponentialHistogramBuckets posBuckets =
ExponentialHistogramBuckets posBuckets =
merge(previous.getPositiveBuckets(), current.getPositiveBuckets());
DoubleExponentialHistogramBuckets negBuckets =
ExponentialHistogramBuckets negBuckets =
merge(previous.getNegativeBuckets(), current.getNegativeBuckets());

// resolve possible scale difference due to merge
int commonScale = Math.min(posBuckets.getScale(), negBuckets.getScale());
posBuckets.downscale(posBuckets.getScale() - commonScale);
negBuckets.downscale(negBuckets.getScale() - commonScale);
posBuckets = downscale(posBuckets, commonScale);
negBuckets = downscale(negBuckets, commonScale);
double min = -1;
double max = -1;
if (previous.hasMinMax() && current.hasMinMax()) {
@@ -104,16 +109,50 @@ public ExponentialHistogramAccumulation merge(
/**
* Merge the exponential histogram buckets. If {@code a} is empty, return {@code b}. If {@code b}
* is empty, return {@code a}. Else merge {@code b} into {@code a}.
*
* <p>Assumes {@code a} and {@code b} are either {@link DoubleExponentialHistogramBuckets} or
* {@link EmptyExponentialHistogramBuckets}.
*/
private static DoubleExponentialHistogramBuckets merge(
DoubleExponentialHistogramBuckets a, DoubleExponentialHistogramBuckets b) {
if (b.getTotalCount() == 0) {
return a;
} else if (a.getTotalCount() == 0) {
private static ExponentialHistogramBuckets merge(
ExponentialHistogramBuckets a, ExponentialHistogramBuckets b) {
if (a instanceof EmptyExponentialHistogramBuckets || a.getTotalCount() == 0) {
return b;
}
a.mergeInto(b);
return a;
if (b instanceof EmptyExponentialHistogramBuckets || b.getTotalCount() == 0) {
return a;
}
if ((a instanceof DoubleExponentialHistogramBuckets)
&& (b instanceof DoubleExponentialHistogramBuckets)) {
DoubleExponentialHistogramBuckets a1 = (DoubleExponentialHistogramBuckets) a;
DoubleExponentialHistogramBuckets b2 = (DoubleExponentialHistogramBuckets) b;
a1.mergeInto(b2);
return a1;
}
throw new IllegalStateException(
"Unable to merge ExponentialHistogramBuckets. Unrecognized implementation.");
}

/**
* Downscale the {@code buckets} to the {@code targetScale}.
*
* <p>Assumes {@code buckets} is either a {@link DoubleExponentialHistogramBuckets} or an
* {@link EmptyExponentialHistogramBuckets}.
*/
private static ExponentialHistogramBuckets downscale(
ExponentialHistogramBuckets buckets, int targetScale) {
if (buckets.getScale() == targetScale) {
return buckets;
}
if (buckets instanceof EmptyExponentialHistogramBuckets) {
return EmptyExponentialHistogramBuckets.get(targetScale);
}
if (buckets instanceof DoubleExponentialHistogramBuckets) {
DoubleExponentialHistogramBuckets buckets1 = (DoubleExponentialHistogramBuckets) buckets;
buckets1.downscale(buckets1.getScale() - targetScale);
return buckets1;
}
throw new IllegalStateException(
"Unable to merge ExponentialHistogramBuckets. Unrecognized implementation");
}

@Override
@@ -144,53 +183,66 @@ public MetricData toMetricData(

static final class Handle
extends AggregatorHandle<ExponentialHistogramAccumulation, DoubleExemplarData> {
private final DoubleExponentialHistogramBuckets positiveBuckets;
private final DoubleExponentialHistogramBuckets negativeBuckets;
private final ExponentialBucketStrategy bucketStrategy;
@Nullable private DoubleExponentialHistogramBuckets positiveBuckets;
@Nullable private DoubleExponentialHistogramBuckets negativeBuckets;
private long zeroCount;
private double sum;
private double min;
private double max;
private long count;
private int scale;

Handle(
ExemplarReservoir<DoubleExemplarData> reservoir, ExponentialBucketStrategy bucketStrategy) {
super(reservoir);
this.bucketStrategy = bucketStrategy;
this.sum = 0;
this.zeroCount = 0;
this.min = Double.MAX_VALUE;
this.max = -1;
this.count = 0;
this.positiveBuckets = bucketStrategy.newBuckets();
this.negativeBuckets = bucketStrategy.newBuckets();
this.scale = bucketStrategy.getStartingScale();
}

@Override
protected synchronized ExponentialHistogramAccumulation doAccumulateThenReset(
List<DoubleExemplarData> exemplars) {
ExponentialHistogramBuckets positiveBuckets;
ExponentialHistogramBuckets negativeBuckets;
if (this.positiveBuckets != null) {
positiveBuckets = this.positiveBuckets.copy();
this.positiveBuckets.clear();
} else {
positiveBuckets = EmptyExponentialHistogramBuckets.get(scale);
}
if (this.negativeBuckets != null) {
negativeBuckets = this.negativeBuckets.copy();
this.negativeBuckets.clear();
} else {
negativeBuckets = EmptyExponentialHistogramBuckets.get(scale);
}
ExponentialHistogramAccumulation acc =
ExponentialHistogramAccumulation.create(
this.positiveBuckets.getScale(),
scale,
sum,
this.count > 0,
this.count > 0 ? this.min : -1,
this.count > 0 ? this.max : -1,
positiveBuckets.copy(),
negativeBuckets.copy(),
positiveBuckets,
negativeBuckets,
zeroCount,
exemplars);
this.sum = 0;
this.zeroCount = 0;
this.min = Double.MAX_VALUE;
this.max = -1;
this.count = 0;
this.positiveBuckets.clear();
this.negativeBuckets.clear();
return acc;
}

@Override
protected synchronized void doRecordDouble(double value) {

// ignore NaN and infinity
if (!Double.isFinite(value)) {
return;
@@ -203,14 +255,28 @@ protected synchronized void doRecordDouble(double value) {
count++;

int c = Double.compare(value, 0);
DoubleExponentialHistogramBuckets buckets;
if (c == 0) {
zeroCount++;
return;
} else if (c > 0) {
// Initialize positive buckets if needed, adjusting to current scale
if (positiveBuckets == null) {
positiveBuckets = bucketStrategy.newBuckets();
positiveBuckets.downscale(positiveBuckets.getScale() - scale);
}
buckets = positiveBuckets;
} else {
// Initialize negative buckets if needed, adjusting to current scale
if (negativeBuckets == null) {
negativeBuckets = bucketStrategy.newBuckets();
negativeBuckets.downscale(negativeBuckets.getScale() - scale);
}
buckets = negativeBuckets;
}

// Record; If recording fails, calculate scale reduction and scale down to fit new value.
// 2nd attempt at recording should work with new scale
DoubleExponentialHistogramBuckets buckets = (c > 0) ? positiveBuckets : negativeBuckets;
// TODO: We should experiment with downscale on demand during sync execution and only
// unifying scale factor between positive/negative at collection time (doAccumulate).
if (!buckets.record(value)) {
@@ -227,8 +293,31 @@ protected void doRecordLong(long value) {
}

void downScale(int by) {
positiveBuckets.downscale(by);
negativeBuckets.downscale(by);
if (positiveBuckets != null) {
positiveBuckets.downscale(by);
scale = positiveBuckets.getScale();
}
if (negativeBuckets != null) {
negativeBuckets.downscale(by);
scale = negativeBuckets.getScale();
}
}
}

@AutoValue
abstract static class EmptyExponentialHistogramBuckets implements ExponentialHistogramBuckets {

private static final Map<Integer, ExponentialHistogramBuckets> ZERO_BUCKETS =
new ConcurrentHashMap<>();

EmptyExponentialHistogramBuckets() {}

static ExponentialHistogramBuckets get(int scale) {
return ZERO_BUCKETS.computeIfAbsent(
scale,
scale1 ->
new AutoValue_DoubleExponentialHistogramAggregator_EmptyExponentialHistogramBuckets(
scale1, 0, Collections.emptyList(), 0));
}
}
}
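In the merge path above, the positive and negative bucket sets can end up at different scales after merging, so the aggregator unifies them on the smaller (coarser) scale and downscales the other side by the difference. The following is a toy illustration of that arithmetic over plain arrays, not the SDK's DoubleExponentialHistogramBuckets; the mapping newIndex = oldIndex >> by is the standard base-2 downscale rule, and the sample counts are made up:

final class ScaleUnificationSketch {

  // Collapse bucket counts when the scale is reduced by `by`:
  // every old bucket index i folds into index (i >> by) at the coarser scale.
  static long[] downscale(long[] counts, int offsetIndex, int by) {
    int newStart = offsetIndex >> by;
    int newEnd = (offsetIndex + counts.length - 1) >> by;
    long[] merged = new long[newEnd - newStart + 1];
    for (int i = 0; i < counts.length; i++) {
      merged[((offsetIndex + i) >> by) - newStart] += counts[i];
    }
    return merged;
  }

  public static void main(String[] args) {
    int posScale = 3;
    int negScale = 1;
    // Same rule as the aggregator: pick the smaller scale as the common one...
    int commonScale = Math.min(posScale, negScale); // 1
    // ...and downscale the finer side by the difference (3 - 1 = 2).
    long[] positiveCounts = {1, 0, 2, 5}; // counts starting at bucket index 2, at scale 3
    long[] unified = downscale(positiveCounts, 2, posScale - commonScale);
    System.out.println(java.util.Arrays.toString(unified)); // prints [1, 7]
  }
}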
@@ -184,7 +184,8 @@ void mergeInto(DoubleExponentialHistogramBuckets other) {
this.totalCount += other.totalCount;
}

int getScale() {
@Override
public int getScale() {
return scale;
}

@@ -30,6 +30,10 @@ DoubleExponentialHistogramBuckets newBuckets() {
return new DoubleExponentialHistogramBuckets(startingScale, maxBuckets, counterFactory);
}

int getStartingScale() {
return startingScale;
}

/** Create a new strategy for generating Exponential Buckets. */
static ExponentialBucketStrategy newStrategy(
int maxBuckets, ExponentialCounterFactory counterFactory) {
@@ -7,6 +7,7 @@

import com.google.auto.value.AutoValue;
import io.opentelemetry.sdk.metrics.data.DoubleExemplarData;
import io.opentelemetry.sdk.metrics.internal.data.exponentialhistogram.ExponentialHistogramBuckets;
import java.util.List;

@AutoValue
@@ -20,8 +21,8 @@ static ExponentialHistogramAccumulation create(
boolean hasMinMax,
double min,
double max,
DoubleExponentialHistogramBuckets positiveBuckets,
DoubleExponentialHistogramBuckets negativeBuckets,
ExponentialHistogramBuckets positiveBuckets,
ExponentialHistogramBuckets negativeBuckets,
long zeroCount,
List<DoubleExemplarData> exemplars) {
return new AutoValue_ExponentialHistogramAccumulation(
@@ -38,9 +39,9 @@ static ExponentialHistogramAccumulation create(

abstract double getMax();

abstract DoubleExponentialHistogramBuckets getPositiveBuckets();
abstract ExponentialHistogramBuckets getPositiveBuckets();

abstract DoubleExponentialHistogramBuckets getNegativeBuckets();
abstract ExponentialHistogramBuckets getNegativeBuckets();

abstract long getZeroCount();

@@ -26,6 +26,9 @@
@Immutable
public interface ExponentialHistogramBuckets {

/** The scale of the buckets. Must align with {@link ExponentialHistogramPointData#getScale()}. */
int getScale();

/**
* The offset shifts the bucket boundaries according to <code>lower_bound = base^(offset+i).
* </code>.
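Exposing getScale() on the buckets interface is what allows the aggregator to hand out one shared, immutable empty-buckets instance per scale instead of allocating real buckets for a sign that was never recorded. A hedged sketch of that caching pattern, with a local Buckets interface that mirrors the shape suggested by the diff and a plain class in place of AutoValue:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Local stand-in for the SDK's ExponentialHistogramBuckets interface.
interface Buckets {
  int getScale();
  int getOffset();
  List<Long> getBucketCounts();
  long getTotalCount();
}

// One immutable empty instance per scale, created on demand and then reused.
final class EmptyBuckets implements Buckets {
  private static final Map<Integer, Buckets> CACHE = new ConcurrentHashMap<>();
  private final int scale;

  private EmptyBuckets(int scale) {
    this.scale = scale;
  }

  static Buckets get(int scale) {
    return CACHE.computeIfAbsent(scale, EmptyBuckets::new);
  }

  @Override public int getScale() { return scale; }
  @Override public int getOffset() { return 0; }
  @Override public List<Long> getBucketCounts() { return Collections.emptyList(); }
  @Override public long getTotalCount() { return 0; }
}

Because the empty buckets are immutable and carry no counts, caching a single instance per scale is safe to share across handles.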