Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse AggregatorHandle with cumulative temporality #5142

Merged
merged 3 commits into from
Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ static Aggregator<Object, DoubleExemplarData> drop() {
default T accumulateLongMeasurement(long value, Attributes attributes, Context context) {
AggregatorHandle<T, U> handle = createHandle();
handle.recordLong(value, attributes, context);
return handle.accumulateThenReset(attributes);
return handle.accumulateThenMaybeReset(attributes, /* reset= */ true);
}

/**
Expand All @@ -73,21 +73,9 @@ default T accumulateLongMeasurement(long value, Attributes attributes, Context c
default T accumulateDoubleMeasurement(double value, Attributes attributes, Context context) {
AggregatorHandle<T, U> handle = createHandle();
handle.recordDouble(value, attributes, context);
return handle.accumulateThenReset(attributes);
return handle.accumulateThenMaybeReset(attributes, /* reset= */ true);
}

/**
* Returns the result of the merge of the given accumulations.
*
* <p>This should always assume that the accumulations do not overlap and merge together for a new
* cumulative report.
*
* @param previousCumulative the previously captured accumulation
* @param delta the newly captured (delta) accumulation
* @return the result of the merge of the given accumulations.
*/
T merge(T previousCumulative, T delta);

/**
* Returns a new DELTA aggregation by comparing two cumulative measurements.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,20 +85,22 @@ public final boolean tryUnmap() {
}

/**
* Returns the current value into as {@link T} and resets the current value in this {@code
* Aggregator}.
* Returns the current value into as {@link T}. If {@code reset} is {@code true}, resets the
* current value in this {@code Aggregator}.
*/
@Nullable
public final T accumulateThenReset(Attributes attributes) {
public final T accumulateThenMaybeReset(Attributes attributes, boolean reset) {
if (!hasRecordings) {
return null;
}
hasRecordings = false;
return doAccumulateThenReset(exemplarReservoir.collectAndReset(attributes));
if (reset) {
hasRecordings = false;
}
return doAccumulateThenMaybeReset(exemplarReservoir.collectAndReset(attributes), reset);
}

/** Implementation of the {@code accumulateThenReset}. */
protected abstract T doAccumulateThenReset(List<U> exemplars);
/** Implementation of the {@link #accumulateThenMaybeReset(Attributes, boolean)}. */
protected abstract T doAccumulateThenMaybeReset(List<U> exemplars, boolean reset);

@Override
public final void recordLong(long value, Attributes attributes, Context context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,40 +62,6 @@ public AggregatorHandle<ExplicitBucketHistogramAccumulation, DoubleExemplarData>
return new Handle(this.boundaries, reservoirSupplier.get());
}

/**
* Return the result of the merge of two histogram accumulations. As long as one Aggregator
* instance produces all Accumulations with constant boundaries we don't need to worry about
* merging accumulations with different boundaries.
*/
@Override
public ExplicitBucketHistogramAccumulation merge(
ExplicitBucketHistogramAccumulation previous, ExplicitBucketHistogramAccumulation current) {
long[] previousCounts = previous.getCounts();
long[] mergedCounts = new long[previousCounts.length];
for (int i = 0; i < previousCounts.length; ++i) {
mergedCounts[i] = previousCounts[i] + current.getCounts()[i];
}
double min = -1;
double max = -1;
if (previous.hasMinMax() && current.hasMinMax()) {
min = Math.min(previous.getMin(), current.getMin());
max = Math.max(previous.getMax(), current.getMax());
} else if (previous.hasMinMax()) {
min = previous.getMin();
max = previous.getMax();
} else if (current.hasMinMax()) {
min = current.getMin();
max = current.getMax();
}
return ExplicitBucketHistogramAccumulation.create(
previous.getSum() + current.getSum(),
previous.hasMinMax() || current.hasMinMax(),
min,
max,
mergedCounts,
current.getExemplars());
}

@Override
public MetricData toMetricData(
Resource resource,
Expand Down Expand Up @@ -156,8 +122,8 @@ static final class Handle
}

@Override
protected ExplicitBucketHistogramAccumulation doAccumulateThenReset(
List<DoubleExemplarData> exemplars) {
protected ExplicitBucketHistogramAccumulation doAccumulateThenMaybeReset(
List<DoubleExemplarData> exemplars, boolean reset) {
lock.lock();
try {
ExplicitBucketHistogramAccumulation acc =
Expand All @@ -168,11 +134,13 @@ protected ExplicitBucketHistogramAccumulation doAccumulateThenReset(
this.count > 0 ? this.max : -1,
Arrays.copyOf(counts, counts.length),
exemplars);
this.sum = 0;
this.min = Double.MAX_VALUE;
this.max = -1;
this.count = 0;
Arrays.fill(this.counts, 0);
if (reset) {
this.sum = 0;
this.min = Double.MAX_VALUE;
this.max = -1;
this.count = 0;
Arrays.fill(this.counts, 0);
}
return acc;
} finally {
lock.unlock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,99 +56,6 @@ public AggregatorHandle<ExponentialHistogramAccumulation, DoubleExemplarData> cr
return new Handle(reservoirSupplier.get(), maxBuckets, maxScale);
}

/**
* Merge the exponential histogram accumulations. Mutates the {@link
* ExponentialHistogramAccumulation#getPositiveBuckets()} and {@link
* ExponentialHistogramAccumulation#getNegativeBuckets()} of {@code previous}. Mutating buckets is
* acceptable because copies are already made in {@link Handle#doAccumulateThenReset(List)}.
*/
@Override
public ExponentialHistogramAccumulation merge(
ExponentialHistogramAccumulation previous, ExponentialHistogramAccumulation current) {

// Create merged buckets
ExponentialHistogramBuckets posBuckets =
merge(previous.getPositiveBuckets(), current.getPositiveBuckets());
ExponentialHistogramBuckets negBuckets =
merge(previous.getNegativeBuckets(), current.getNegativeBuckets());

// resolve possible scale difference due to merge
int commonScale = Math.min(posBuckets.getScale(), negBuckets.getScale());
posBuckets = downscale(posBuckets, commonScale);
negBuckets = downscale(negBuckets, commonScale);
double min = -1;
double max = -1;
if (previous.hasMinMax() && current.hasMinMax()) {
min = Math.min(previous.getMin(), current.getMin());
max = Math.max(previous.getMax(), current.getMax());
} else if (previous.hasMinMax()) {
min = previous.getMin();
max = previous.getMax();
} else if (current.hasMinMax()) {
min = current.getMin();
max = current.getMax();
}
return ExponentialHistogramAccumulation.create(
commonScale,
previous.getSum() + current.getSum(),
previous.hasMinMax() || current.hasMinMax(),
min,
max,
posBuckets,
negBuckets,
previous.getZeroCount() + current.getZeroCount(),
current.getExemplars());
}

/**
* Merge the exponential histogram buckets. If {@code a} is empty, return {@code b}. If {@code b}
* is empty, return {@code a}. Else merge {@code b} into {@code a}.
*
* <p>Assumes {@code a} and {@code b} are either {@link DoubleExponentialHistogramBuckets} or
* {@link EmptyExponentialHistogramBuckets}.
*/
private static ExponentialHistogramBuckets merge(
ExponentialHistogramBuckets a, ExponentialHistogramBuckets b) {
if (a instanceof EmptyExponentialHistogramBuckets || a.getTotalCount() == 0) {
return b;
}
if (b instanceof EmptyExponentialHistogramBuckets || b.getTotalCount() == 0) {
return a;
}
if ((a instanceof DoubleExponentialHistogramBuckets)
&& (b instanceof DoubleExponentialHistogramBuckets)) {
DoubleExponentialHistogramBuckets a1 = (DoubleExponentialHistogramBuckets) a;
DoubleExponentialHistogramBuckets b2 = (DoubleExponentialHistogramBuckets) b;
a1.mergeInto(b2);
return a1;
}
throw new IllegalStateException(
"Unable to merge ExponentialHistogramBuckets. Unrecognized implementation.");
}

/**
* Downscale the {@code buckets} to the {@code targetScale}.
*
* <p>Assumes {@code a} and {@code b} are either {@link DoubleExponentialHistogramBuckets} or
* {@link EmptyExponentialHistogramBuckets}.
*/
private static ExponentialHistogramBuckets downscale(
ExponentialHistogramBuckets buckets, int targetScale) {
if (buckets.getScale() == targetScale) {
return buckets;
}
if (buckets instanceof EmptyExponentialHistogramBuckets) {
return EmptyExponentialHistogramBuckets.get(targetScale);
}
if (buckets instanceof DoubleExponentialHistogramBuckets) {
DoubleExponentialHistogramBuckets buckets1 = (DoubleExponentialHistogramBuckets) buckets;
buckets1.downscale(buckets1.getScale() - targetScale);
return buckets1;
}
throw new IllegalStateException(
"Unable to merge ExponentialHistogramBuckets. Unrecognized implementation");
}

@Override
public MetricData toMetricData(
Resource resource,
Expand Down Expand Up @@ -199,41 +106,41 @@ static final class Handle
}

@Override
protected synchronized ExponentialHistogramAccumulation doAccumulateThenReset(
List<DoubleExemplarData> exemplars) {
ExponentialHistogramBuckets positiveBuckets;
ExponentialHistogramBuckets negativeBuckets;
if (this.positiveBuckets != null) {
positiveBuckets = this.positiveBuckets.copy();
this.positiveBuckets.clear();
} else {
positiveBuckets = EmptyExponentialHistogramBuckets.get(scale);
}
if (this.negativeBuckets != null) {
negativeBuckets = this.negativeBuckets.copy();
this.negativeBuckets.clear();
} else {
negativeBuckets = EmptyExponentialHistogramBuckets.get(scale);
}
protected synchronized ExponentialHistogramAccumulation doAccumulateThenMaybeReset(
List<DoubleExemplarData> exemplars, boolean reset) {
ExponentialHistogramAccumulation acc =
ExponentialHistogramAccumulation.create(
scale,
sum,
this.count > 0,
this.count > 0 ? this.min : -1,
this.count > 0 ? this.max : -1,
positiveBuckets,
negativeBuckets,
resolveBuckets(this.positiveBuckets, scale, reset),
resolveBuckets(this.negativeBuckets, scale, reset),
zeroCount,
exemplars);
this.sum = 0;
this.zeroCount = 0;
this.min = Double.MAX_VALUE;
this.max = -1;
this.count = 0;
if (reset) {
this.sum = 0;
this.zeroCount = 0;
this.min = Double.MAX_VALUE;
this.max = -1;
this.count = 0;
}
return acc;
}

private static ExponentialHistogramBuckets resolveBuckets(
@Nullable DoubleExponentialHistogramBuckets buckets, int scale, boolean reset) {
if (buckets == null) {
return EmptyExponentialHistogramBuckets.get(scale);
}
ExponentialHistogramBuckets copy = buckets.copy();
if (reset) {
buckets.clear();
}
return copy;
}

@Override
protected synchronized void doRecordDouble(double value) {
// ignore NaN and infinity
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.opentelemetry.sdk.resources.Resource;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -48,11 +49,6 @@ public AggregatorHandle<DoubleAccumulation, DoubleExemplarData> createHandle() {
return new Handle(reservoirSupplier.get());
}

@Override
public DoubleAccumulation merge(DoubleAccumulation previous, DoubleAccumulation current) {
return current;
}

@Override
public DoubleAccumulation diff(DoubleAccumulation previous, DoubleAccumulation current) {
return current;
Expand Down Expand Up @@ -94,8 +90,12 @@ private Handle(ExemplarReservoir<DoubleExemplarData> reservoir) {
}

@Override
protected DoubleAccumulation doAccumulateThenReset(List<DoubleExemplarData> exemplars) {
return DoubleAccumulation.create(this.current.getAndSet(DEFAULT_VALUE), exemplars);
protected DoubleAccumulation doAccumulateThenMaybeReset(
List<DoubleExemplarData> exemplars, boolean reset) {
if (reset) {
return DoubleAccumulation.create(this.current.getAndSet(DEFAULT_VALUE), exemplars);
}
return DoubleAccumulation.create(Objects.requireNonNull(this.current.get()), exemplars);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,6 @@ public DoubleAccumulation accumulateDoubleMeasurement(
return DoubleAccumulation.create(value);
}

@Override
public DoubleAccumulation merge(
DoubleAccumulation previousAccumulation, DoubleAccumulation accumulation) {
return DoubleAccumulation.create(
previousAccumulation.getValue() + accumulation.getValue(), accumulation.getExemplars());
}

@Override
public DoubleAccumulation diff(
DoubleAccumulation previousAccumulation, DoubleAccumulation accumulation) {
Expand Down Expand Up @@ -107,8 +100,12 @@ static final class Handle extends AggregatorHandle<DoubleAccumulation, DoubleExe
}

@Override
protected DoubleAccumulation doAccumulateThenReset(List<DoubleExemplarData> exemplars) {
return DoubleAccumulation.create(this.current.sumThenReset(), exemplars);
protected DoubleAccumulation doAccumulateThenMaybeReset(
List<DoubleExemplarData> exemplars, boolean reset) {
if (reset) {
return DoubleAccumulation.create(this.current.sumThenReset(), exemplars);
}
return DoubleAccumulation.create(this.current.sum(), exemplars);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ protected void doRecordLong(long value) {}
protected void doRecordDouble(double value) {}

@Override
protected Object doAccumulateThenReset(List<DoubleExemplarData> exemplars) {
protected Object doAccumulateThenMaybeReset(
List<DoubleExemplarData> exemplars, boolean reset) {
return ACCUMULATION;
}
};
Expand All @@ -49,11 +50,6 @@ public AggregatorHandle<Object, DoubleExemplarData> createHandle() {
return HANDLE;
}

@Override
public Object merge(Object previousAccumulation, Object accumulation) {
return ACCUMULATION;
}

@Override
public MetricData toMetricData(
Resource resource,
Expand Down
Loading