diff --git a/pkg/util/metric/hdrhistogram.go b/pkg/util/metric/hdrhistogram.go index b6b562403e2a..2d8c9dbdd57b 100644 --- a/pkg/util/metric/hdrhistogram.go +++ b/pkg/util/metric/hdrhistogram.go @@ -171,15 +171,19 @@ func (h *HdrHistogram) ToPrometheusMetric() *prometheusgo.Metric { // TotalWindowed implements the WindowedHistogram interface. func (h *HdrHistogram) TotalWindowed() (int64, float64) { - pHist := h.ToPrometheusMetricWindowed().Histogram - return int64(pHist.GetSampleCount()), pHist.GetSampleSum() + h.mu.Lock() + defer h.mu.Unlock() + hist := h.mu.sliding.Merge() + totalSum := float64(hist.TotalCount()) * hist.Mean() + return hist.TotalCount(), totalSum } func (h *HdrHistogram) toPrometheusMetricWindowedLocked() *prometheusgo.Metric { hist := &prometheusgo.Histogram{} maybeTick(h.mu.tickHelper) - bars := h.mu.sliding.Current.Distribution() + mergedHist := h.mu.sliding.Merge() + bars := mergedHist.Distribution() hist.Bucket = make([]*prometheusgo.Bucket, 0, len(bars)) var cumCount uint64 @@ -202,7 +206,6 @@ func (h *HdrHistogram) toPrometheusMetricWindowedLocked() *prometheusgo.Metric { } hist.SampleCount = &cumCount hist.SampleSum = &sum // can do better here; we approximate in the loop - return &prometheusgo.Metric{ Histogram: hist, } @@ -233,13 +236,12 @@ func (h *HdrHistogram) ValueAtQuantileWindowed(q float64) float64 { func (h *HdrHistogram) Mean() float64 { h.mu.Lock() defer h.mu.Unlock() - return h.mu.cumulative.Mean() } func (h *HdrHistogram) MeanWindowed() float64 { h.mu.Lock() defer h.mu.Unlock() - - return h.mu.sliding.Current.Mean() + hist := h.mu.sliding.Merge() + return hist.Mean() } diff --git a/pkg/util/metric/metric.go b/pkg/util/metric/metric.go index 9674739c3109..ebd0af897268 100644 --- a/pkg/util/metric/metric.go +++ b/pkg/util/metric/metric.go @@ -97,10 +97,11 @@ type WindowedHistogram interface { Total() (int64, float64) // MeanWindowed returns the average of the samples in the current window. MeanWindowed() float64 - // Mean returns the average of the sample in teh cumulative histogram. + // Mean returns the average of the sample in the cumulative histogram. Mean() float64 // ValueAtQuantileWindowed takes a quantile value [0,100] and returns the // interpolated value at that quantile for the windowed histogram. + // This should merge by default. ValueAtQuantileWindowed(q float64) float64 } @@ -368,15 +369,23 @@ func (h *Histogram) ToPrometheusMetric() *prometheusgo.Metric { return m } -// ToPrometheusMetricWindowed returns a filled-in prometheus metric of the right type. +// ToPrometheusMetricWindowed returns a filled-in prometheus metric of the +// right type. func (h *Histogram) ToPrometheusMetricWindowed() *prometheusgo.Metric { h.windowed.Lock() defer h.windowed.Unlock() - m := &prometheusgo.Metric{} - if err := h.windowed.cur.Write(m); err != nil { + cur := &prometheusgo.Metric{} + prev := &prometheusgo.Metric{} + if err := h.windowed.cur.Write(cur); err != nil { panic(err) } - return m + if h.windowed.prev != nil { + if err := h.windowed.prev.Write(prev); err != nil { + panic(err) + } + MergeWindowedHistogram(cur.Histogram, prev.Histogram) + } + return cur } // GetMetadata returns the metric's metadata including the Prometheus @@ -428,7 +437,8 @@ func (h *Histogram) MeanWindowed() float64 { // 2. Since the prometheus client library ensures buckets are in a strictly // increasing order at creation, we do not sort them. func (h *Histogram) ValueAtQuantileWindowed(q float64) float64 { - return ValueAtQuantileWindowed(h.ToPrometheusMetricWindowed().Histogram, q) + return ValueAtQuantileWindowed(h.ToPrometheusMetricWindowed().Histogram, + q) } var _ PrometheusExportable = (*ManualWindowHistogram)(nil) @@ -592,15 +602,32 @@ func (mwh *ManualWindowHistogram) ToPrometheusMetric() *prometheusgo.Metric { return m } +// ToPrometheusMetricWindowed returns a filled-in prometheus metric of the +// right type. +func (mwh *ManualWindowHistogram) ToPrometheusMetricWindowed() *prometheusgo. + Metric { + cur := &prometheusgo.Metric{} + if err := mwh.mu.cum.Write(cur); err != nil { + panic(err) + } + if mwh.mu.prev != nil { + MergeWindowedHistogram(cur.Histogram, mwh.mu.prev) + } + return cur +} + // TotalWindowed implements the WindowedHistogram interface. func (mwh *ManualWindowHistogram) TotalWindowed() (int64, float64) { mwh.mu.RLock() defer mwh.mu.RUnlock() - return int64(mwh.mu.cur.GetSampleCount()), mwh.mu.cur.GetSampleSum() + pHist := mwh.ToPrometheusMetricWindowed().Histogram + return int64(pHist.GetSampleCount()), pHist.GetSampleSum() } // Total implements the WindowedHistogram interface. func (mwh *ManualWindowHistogram) Total() (int64, float64) { + mwh.mu.RLock() + defer mwh.mu.RUnlock() h := mwh.ToPrometheusMetric().Histogram return int64(h.GetSampleCount()), h.GetSampleSum() } @@ -608,10 +635,13 @@ func (mwh *ManualWindowHistogram) Total() (int64, float64) { func (mwh *ManualWindowHistogram) MeanWindowed() float64 { mwh.mu.RLock() defer mwh.mu.RUnlock() - return mwh.mu.cur.GetSampleSum() / float64(mwh.mu.cur.GetSampleCount()) + pHist := mwh.ToPrometheusMetricWindowed().Histogram + return pHist.GetSampleSum() / float64(pHist.GetSampleCount()) } func (mwh *ManualWindowHistogram) Mean() float64 { + mwh.mu.RLock() + defer mwh.mu.RUnlock() h := mwh.ToPrometheusMetric().Histogram return h.GetSampleSum() / float64(h.GetSampleCount()) } @@ -626,7 +656,7 @@ func (mwh *ManualWindowHistogram) ValueAtQuantileWindowed(q float64) float64 { if mwh.mu.cur == nil { return 0 } - return ValueAtQuantileWindowed(mwh.mu.cur, q) + return ValueAtQuantileWindowed(mwh.ToPrometheusMetricWindowed().Histogram, q) } // A Counter holds a single mutable atomic value. @@ -881,6 +911,21 @@ func (g *GaugeFloat64) GetMetadata() Metadata { return baseMetadata } +// MergeWindowedHistogram adds the bucket counts, sample count, and sample sum +// from the previous windowed histogram to those of the current windowed +// histogram. +// NB: Buckets on each histogram must be the same +func MergeWindowedHistogram(cur *prometheusgo.Histogram, prev *prometheusgo.Histogram) { + for i, bucket := range cur.Bucket { + count := *bucket.CumulativeCount + *prev.Bucket[i].CumulativeCount + *bucket.CumulativeCount = count + } + sampleCount := *cur.SampleCount + *prev.SampleCount + *cur.SampleCount = sampleCount + sampleSum := *cur.SampleSum + *prev.SampleSum + *cur.SampleSum = sampleSum +} + // ValueAtQuantileWindowed takes a quantile value [0,100] and returns the // interpolated value at that quantile for the given histogram. func ValueAtQuantileWindowed(histogram *prometheusgo.Histogram, q float64) float64 { diff --git a/pkg/util/metric/metric_test.go b/pkg/util/metric/metric_test.go index b69c87584e2d..0bc0ab708398 100644 --- a/pkg/util/metric/metric_test.go +++ b/pkg/util/metric/metric_test.go @@ -15,6 +15,7 @@ import ( "encoding/json" "math" "reflect" + "sort" "sync" "testing" "time" @@ -281,15 +282,19 @@ func TestNewHistogramRotate(t *testing.T) { // Windowed histogram is initially empty. h.Inspect(func(interface{}) {}) // triggers ticking _, sum := h.TotalWindowed() - require.Zero(t, sum) + if i == 0 { + require.Zero(t, sum) + } else { + // When there are multiple windows, start with previously recorded values. + require.Equal(t, sum, float64(12345)) + } // But cumulative histogram has history (if i > 0). count, _ := h.Total() require.EqualValues(t, i, count) - // Add a measurement and verify it's there. { h.RecordValue(12345) - f := float64(12345) + f := float64(12345) + sum _, wSum := h.TotalWindowed() require.Equal(t, wSum, f) } @@ -298,3 +303,71 @@ func TestNewHistogramRotate(t *testing.T) { // Go to beginning. } } + +func TestHistogramWindowMerge(t *testing.T) { + u := func(v int) *uint64 { + n := uint64(v) + return &n + } + + h := NewHistogram(HistogramOptions{ + Mode: HistogramModePrometheus, + Metadata: Metadata{}, + Duration: time.Minute, + Buckets: IOLatencyBuckets, + }) + + measurements := []int64{200000000, 0, 4000000, 5000000, 10000000, 20000000, + 25000000, 30000000, 40000000, 90000000} + w := 4 + var expSum float64 + + for i := 0; i < w; i++ { + h.Inspect(func(interface{}) {}) // triggers ticking + for j, m := range measurements { + h.RecordValue(m) + if j == 0 { + require.Equal(t, 0.0, h.ValueAtQuantileWindowed(0)) + require.Equal(t, 235983346.678219, h.ValueAtQuantileWindowed( + 99)) // value immediately greater than measurements[0] + } + expSum += float64(m) + } + // Tick. This rotates the histogram. + setNow(time.Duration(i+1) * 10 * time.Second) + // Go to beginning. + } + + act := *h.ToPrometheusMetric().Histogram + var buckets []*prometheusgo.Bucket + count := 0 + j := 0 + var expQuantileValues []float64 + sort.Slice(measurements, func(i, j int) bool { + return measurements[i] < measurements[j] + }) + for i := range IOLatencyBuckets { + if j < len(measurements) && IOLatencyBuckets[i] > float64(measurements[j]) { + count = count + 4 + j += 1 + expQuantileValues = append(expQuantileValues, IOLatencyBuckets[i]) + } + buckets = append(buckets, &prometheusgo.Bucket{CumulativeCount: u(count), + UpperBound: &IOLatencyBuckets[i]}) + } + exp := prometheusgo.Histogram{ + SampleCount: u(len(measurements) * w), + SampleSum: &expSum, + Bucket: buckets, + } + + if !reflect.DeepEqual(act, exp) { + t.Fatalf("expected differs from actual: %s", pretty.Diff(exp, act)) + } + + require.Equal(t, 0.0, h.ValueAtQuantileWindowed(0)) + require.Equal(t, expQuantileValues[0], h.ValueAtQuantileWindowed(10)) + require.Equal(t, expQuantileValues[4], h.ValueAtQuantileWindowed(50)) + require.Equal(t, expQuantileValues[7], h.ValueAtQuantileWindowed(80)) + require.Equal(t, expQuantileValues[9], h.ValueAtQuantileWindowed(99.99)) +}