Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Align DeltaHistogram in SignalFx registry with count and total #3799

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,38 @@
*/
final class CumulativeHistogramConfigUtil {

static DistributionStatisticConfig updateConfig(DistributionStatisticConfig distributionStatisticConfig) {
private static final double[] EMPTY_SLO = new double[0];

static DistributionStatisticConfig updateConfig(DistributionStatisticConfig distributionStatisticConfig,
boolean isDelta) {
double[] sloBoundaries = distributionStatisticConfig.getServiceLevelObjectiveBoundaries();
if (sloBoundaries == null || sloBoundaries.length == 0) {
return distributionStatisticConfig;
}
double[] newSloBoundaries = sloBoundaries;
// Add the +Inf bucket since the "count" resets every export.
if (!isPositiveInf(sloBoundaries[sloBoundaries.length - 1])) {
newSloBoundaries = Arrays.copyOf(sloBoundaries, sloBoundaries.length + 1);
newSloBoundaries[newSloBoundaries.length - 1] = Double.MAX_VALUE;
}

return DistributionStatisticConfig.builder()
// Set the expiration duration for the histogram counts to be effectively
// infinite.
// Without this, the counts are reset every expiry duration.
.expiry(Duration.ofNanos(Long.MAX_VALUE)) // effectively infinite
.bufferLength(1)
.serviceLevelObjectives(newSloBoundaries)
// If delta Histograms are enabled, empty the slo's and use
// StepBucketHistogram.
.serviceLevelObjectives(isDelta ? EMPTY_SLO : addPositiveInfBucket(sloBoundaries))
.build()
.merge(distributionStatisticConfig);
}

static double[] addPositiveInfBucket(double[] sloBoundaries) {
double[] newSloBoundaries = sloBoundaries;
// Add the +Inf bucket since the "count" resets every export.
if (!isPositiveInf(sloBoundaries[sloBoundaries.length - 1])) {
newSloBoundaries = Arrays.copyOf(sloBoundaries, sloBoundaries.length + 1);
newSloBoundaries[newSloBoundaries.length - 1] = Double.MAX_VALUE;
}
return newSloBoundaries;
}

private static boolean isPositiveInf(double bucket) {
return bucket == Double.POSITIVE_INFINITY || bucket == Double.MAX_VALUE || (long) bucket == Long.MAX_VALUE;
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,88 +16,69 @@
package io.micrometer.signalfx;

import io.micrometer.common.lang.Nullable;
import io.micrometer.core.instrument.AbstractDistributionSummary;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.HistogramSnapshot;
import io.micrometer.core.instrument.distribution.TimeWindowMax;
import io.micrometer.core.instrument.step.StepTuple2;

import java.util.concurrent.atomic.DoubleAdder;
import java.util.concurrent.atomic.LongAdder;
import io.micrometer.core.instrument.distribution.StepBucketHistogram;
import io.micrometer.core.instrument.step.StepDistributionSummary;

/**
* This class is mostly the same as
* {@link io.micrometer.core.instrument.step.StepDistributionSummary}, with one notable
* difference: the {@link DistributionStatisticConfig} is modified before being passed to
* the super class constructor - that forces the histogram generated by this meter to be
* cumulative.
* A StepDistributionSummary which provides support for multiple flavours of Histograms to
* be recorded based on {@link SignalFxConfig#publishCumulativeHistogram()} and
* {@link SignalFxConfig#publishDeltaHistogram()}.
*
* @author Bogdan Drutu
* @author Mateusz Rzeszutek
* @author Lenin Jaganathan
*/
final class SignalfxDistributionSummary extends AbstractDistributionSummary {

private final LongAdder count = new LongAdder();

private final DoubleAdder total = new DoubleAdder();

private final StepTuple2<Long, Double> countTotal;

private final TimeWindowMax max;
final class SignalfxDistributionSummary extends StepDistributionSummary {
lenin-jaganathan marked this conversation as resolved.
Show resolved Hide resolved

@Nullable
private final DeltaHistogramCounts deltaHistogramCounts;
private final StepBucketHistogram stepBucketHistogram;

SignalfxDistributionSummary(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig,
double scale, long stepMillis, boolean isDelta) {
super(id, clock, CumulativeHistogramConfigUtil.updateConfig(distributionStatisticConfig), scale, false);
this.countTotal = new StepTuple2<>(clock, stepMillis, 0L, 0.0, count::sumThenReset, total::sumThenReset);
max = new TimeWindowMax(clock, distributionStatisticConfig);
if (distributionStatisticConfig.isPublishingHistogram() && isDelta) {
deltaHistogramCounts = new DeltaHistogramCounts();
super(id, clock, distributionStatisticConfig, scale, stepMillis, defaultHistogram(clock,
CumulativeHistogramConfigUtil.updateConfig(distributionStatisticConfig, isDelta), false));

double[] slo = distributionStatisticConfig.getServiceLevelObjectiveBoundaries();
if (slo != null && slo.length > 0 && isDelta) {
stepBucketHistogram = new StepBucketHistogram(clock, stepMillis,
DistributionStatisticConfig.builder()
.serviceLevelObjectives(CumulativeHistogramConfigUtil.addPositiveInfBucket(slo))
.build()
.merge(distributionStatisticConfig),
false, true);
}
else {
deltaHistogramCounts = null;
stepBucketHistogram = null;
}
}

@Override
protected void recordNonNegative(double amount) {
count.increment();
total.add(amount);
max.record(amount);
if (stepBucketHistogram != null) {
stepBucketHistogram.recordDouble(amount);
}
super.recordNonNegative(amount);
}

@Override
public long count() {
return countTotal.poll1();
}

@Override
public double totalAmount() {
return countTotal.poll2();
}

@Override
public double max() {
return max.poll();
if (stepBucketHistogram != null) {
stepBucketHistogram.poll();
lenin-jaganathan marked this conversation as resolved.
Show resolved Hide resolved
}
return super.count();
}

@Override
public HistogramSnapshot takeSnapshot() {
HistogramSnapshot currentSnapshot = super.takeSnapshot();
if (deltaHistogramCounts == null) {
if (stepBucketHistogram == null) {
return currentSnapshot;
}
return new HistogramSnapshot(currentSnapshot.count(), // Already delta in sfx
// implementation.
currentSnapshot.total(), // Already delta in sfx implementation.
currentSnapshot.max(), // Max cannot be calculated as delta, keep the
// current.
currentSnapshot.percentileValues(), // No changes to the percentile
// values.
deltaHistogramCounts.calculate(currentSnapshot.histogramCounts()), currentSnapshot::outputSummary);
return new HistogramSnapshot(currentSnapshot.count(), currentSnapshot.total(), currentSnapshot.max(),
currentSnapshot.percentileValues(), stepBucketHistogram.poll(), currentSnapshot::outputSummary);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,91 +16,72 @@
package io.micrometer.signalfx;

import io.micrometer.common.lang.Nullable;
import io.micrometer.core.instrument.AbstractTimer;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.distribution.DistributionStatisticConfig;
import io.micrometer.core.instrument.distribution.HistogramSnapshot;
import io.micrometer.core.instrument.distribution.TimeWindowMax;
import io.micrometer.core.instrument.distribution.StepBucketHistogram;
import io.micrometer.core.instrument.distribution.pause.PauseDetector;
import io.micrometer.core.instrument.step.StepTuple2;
import io.micrometer.core.instrument.util.TimeUtils;
import io.micrometer.core.instrument.step.StepTimer;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

/**
* This class is mostly the same as {@link io.micrometer.core.instrument.step.StepTimer},
* with one notable difference: the {@link DistributionStatisticConfig} is modified before
* being passed to the super class constructor - that forces the histogram generated by
* this meter to be cumulative.
* A StepTimer which provides support for multiple flavours of Histograms to be recorded
* based on {@link SignalFxConfig#publishCumulativeHistogram()} and
* {@link SignalFxConfig#publishDeltaHistogram()}.
*
* @author Bogdan Drutu
* @author Mateusz Rzeszutek
* @author Lenin Jaganathan
*/
final class SignalfxTimer extends AbstractTimer {

private final LongAdder count = new LongAdder();

private final LongAdder total = new LongAdder();

private final StepTuple2<Long, Long> countTotal;

private final TimeWindowMax max;
final class SignalfxTimer extends StepTimer {

@Nullable
private final DeltaHistogramCounts deltaHistogramCounts;
private final StepBucketHistogram stepBucketHistogram;

SignalfxTimer(Id id, Clock clock, DistributionStatisticConfig distributionStatisticConfig,
PauseDetector pauseDetector, TimeUnit baseTimeUnit, long stepMillis, boolean isDelta) {
super(id, clock, CumulativeHistogramConfigUtil.updateConfig(distributionStatisticConfig), pauseDetector,
baseTimeUnit, false);
countTotal = new StepTuple2<>(clock, stepMillis, 0L, 0L, count::sumThenReset, total::sumThenReset);
max = new TimeWindowMax(clock, distributionStatisticConfig);
if (distributionStatisticConfig.isPublishingHistogram() && isDelta) {
deltaHistogramCounts = new DeltaHistogramCounts();
super(id, clock, distributionStatisticConfig, pauseDetector, baseTimeUnit, stepMillis, defaultHistogram(clock,
CumulativeHistogramConfigUtil.updateConfig(distributionStatisticConfig, isDelta), false));

double[] slo = distributionStatisticConfig.getServiceLevelObjectiveBoundaries();
if (slo != null && slo.length > 0 && isDelta) {
stepBucketHistogram = new StepBucketHistogram(clock, stepMillis,
DistributionStatisticConfig.builder()
.serviceLevelObjectives(CumulativeHistogramConfigUtil.addPositiveInfBucket(slo))
.build()
.merge(distributionStatisticConfig),
false, true);
}
else {
deltaHistogramCounts = null;
stepBucketHistogram = null;
}
}

@Override
protected void recordNonNegative(long amount, TimeUnit unit) {
final long nanoAmount = (long) TimeUtils.convert(amount, unit, TimeUnit.NANOSECONDS);
count.increment();
total.add(nanoAmount);
max.record(amount, unit);
if (stepBucketHistogram != null) {
stepBucketHistogram.recordLong(TimeUnit.NANOSECONDS.convert(amount, unit));
}
super.recordNonNegative(amount, unit);
}

@Override
public long count() {
return countTotal.poll1();
}

@Override
public double totalTime(TimeUnit unit) {
return TimeUtils.nanosToUnit(countTotal.poll2(), unit);
}

@Override
public double max(TimeUnit unit) {
return max.poll(unit);
if (stepBucketHistogram != null) {
stepBucketHistogram.poll();
}
return super.count();
}

@Override
public HistogramSnapshot takeSnapshot() {
HistogramSnapshot currentSnapshot = super.takeSnapshot();
if (deltaHistogramCounts == null) {
if (stepBucketHistogram == null) {
return currentSnapshot;
}
return new HistogramSnapshot(currentSnapshot.count(), // Already delta in sfx
// implementation
currentSnapshot.total(), // Already delta in sfx implementation
currentSnapshot.max(), // Max cannot be calculated as delta, keep the
// current.
currentSnapshot.percentileValues(), // No changes to the percentile
// values.
deltaHistogramCounts.calculate(currentSnapshot.histogramCounts()), currentSnapshot::outputSummary);
return new HistogramSnapshot(currentSnapshot.count(), currentSnapshot.total(), currentSnapshot.max(),
currentSnapshot.percentileValues(), stepBucketHistogram.poll(), currentSnapshot::outputSummary);
}

}

This file was deleted.

Loading