Skip to content

Commit

Permalink
metrics: fix windowed histogram merging approach
Browse files Browse the repository at this point in the history
Fixes #103814.
Fixes #98266.

This commit updates the windowed histogram merging approach
to add the previous window's histogram bucket counts and sample
count to those of the current one. As a result of this change,
the histograms will no longer report under-sampled quantile values,
and timeseries metrics-derived charts (e.g., the quantile-based
SQL service latency charts on the DB console's Metrics page) will
more accurately display metrics.

Release note (bug fix): Updated the histogram window merge calculation
to more accurately interpolate quantile values. This change will result
in smoother, more accurate Metrics charts on the DB Console.

Co-authored-by: Aaditya Sondhi <[email protected]>
  • Loading branch information
ericharmeling and aadityasondhi committed Jun 1, 2023
1 parent 24a32f2 commit eb2872c
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 19 deletions.
16 changes: 9 additions & 7 deletions pkg/util/metric/hdrhistogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,19 @@ func (h *HdrHistogram) ToPrometheusMetric() *prometheusgo.Metric {

// TotalWindowed implements the WindowedHistogram interface.
func (h *HdrHistogram) TotalWindowed() (int64, float64) {
pHist := h.ToPrometheusMetricWindowed().Histogram
return int64(pHist.GetSampleCount()), pHist.GetSampleSum()
h.mu.Lock()
defer h.mu.Unlock()
hist := h.mu.sliding.Merge()
totalSum := float64(hist.TotalCount()) * hist.Mean()
return hist.TotalCount(), totalSum
}

func (h *HdrHistogram) toPrometheusMetricWindowedLocked() *prometheusgo.Metric {
hist := &prometheusgo.Histogram{}

maybeTick(h.mu.tickHelper)
bars := h.mu.sliding.Current.Distribution()
mergedHist := h.mu.sliding.Merge()
bars := mergedHist.Distribution()
hist.Bucket = make([]*prometheusgo.Bucket, 0, len(bars))

var cumCount uint64
Expand All @@ -202,7 +206,6 @@ func (h *HdrHistogram) toPrometheusMetricWindowedLocked() *prometheusgo.Metric {
}
hist.SampleCount = &cumCount
hist.SampleSum = &sum // can do better here; we approximate in the loop

return &prometheusgo.Metric{
Histogram: hist,
}
Expand Down Expand Up @@ -233,13 +236,12 @@ func (h *HdrHistogram) ValueAtQuantileWindowed(q float64) float64 {
func (h *HdrHistogram) Mean() float64 {
h.mu.Lock()
defer h.mu.Unlock()

return h.mu.cumulative.Mean()
}

func (h *HdrHistogram) MeanWindowed() float64 {
h.mu.Lock()
defer h.mu.Unlock()

return h.mu.sliding.Current.Mean()
hist := h.mu.sliding.Merge()
return hist.Mean()
}
64 changes: 55 additions & 9 deletions pkg/util/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,12 @@ type WindowedHistogram interface {
Total() (int64, float64)
// MeanWindowed returns the average of the samples in the current window.
MeanWindowed() float64
// Mean returns the average of the sample in teh cumulative histogram.
// Mean returns the average of the sample in the cumulative histogram.
Mean() float64
// ValueAtQuantileWindowed takes a quantile value [0,100] and returns the
// interpolated value at that quantile for the windowed histogram.
// Methods implementing this interface should the merge buckets, sums,
// and counts of previous and current windows.
ValueAtQuantileWindowed(q float64) float64
}

Expand Down Expand Up @@ -368,15 +370,23 @@ func (h *Histogram) ToPrometheusMetric() *prometheusgo.Metric {
return m
}

// ToPrometheusMetricWindowed returns a filled-in prometheus metric of the right type.
// ToPrometheusMetricWindowed returns a filled-in prometheus metric of the
// right type.
func (h *Histogram) ToPrometheusMetricWindowed() *prometheusgo.Metric {
h.windowed.Lock()
defer h.windowed.Unlock()
m := &prometheusgo.Metric{}
if err := h.windowed.cur.Write(m); err != nil {
cur := &prometheusgo.Metric{}
prev := &prometheusgo.Metric{}
if err := h.windowed.cur.Write(cur); err != nil {
panic(err)
}
return m
if h.windowed.prev != nil {
if err := h.windowed.prev.Write(prev); err != nil {
panic(err)
}
MergeWindowedHistogram(cur.Histogram, prev.Histogram)
}
return cur
}

// GetMetadata returns the metric's metadata including the Prometheus
Expand Down Expand Up @@ -428,7 +438,8 @@ func (h *Histogram) MeanWindowed() float64 {
// 2. Since the prometheus client library ensures buckets are in a strictly
// increasing order at creation, we do not sort them.
func (h *Histogram) ValueAtQuantileWindowed(q float64) float64 {
return ValueAtQuantileWindowed(h.ToPrometheusMetricWindowed().Histogram, q)
return ValueAtQuantileWindowed(h.ToPrometheusMetricWindowed().Histogram,
q)
}

var _ PrometheusExportable = (*ManualWindowHistogram)(nil)
Expand Down Expand Up @@ -592,26 +603,46 @@ func (mwh *ManualWindowHistogram) ToPrometheusMetric() *prometheusgo.Metric {
return m
}

// ToPrometheusMetricWindowed returns a filled-in prometheus metric of the
// right type.
func (mwh *ManualWindowHistogram) ToPrometheusMetricWindowed() *prometheusgo.
Metric {
cur := &prometheusgo.Metric{}
if err := mwh.mu.cum.Write(cur); err != nil {
panic(err)
}
if mwh.mu.prev != nil {
MergeWindowedHistogram(cur.Histogram, mwh.mu.prev)
}
return cur
}

// TotalWindowed implements the WindowedHistogram interface.
func (mwh *ManualWindowHistogram) TotalWindowed() (int64, float64) {
mwh.mu.RLock()
defer mwh.mu.RUnlock()
return int64(mwh.mu.cur.GetSampleCount()), mwh.mu.cur.GetSampleSum()
pHist := mwh.ToPrometheusMetricWindowed().Histogram
return int64(pHist.GetSampleCount()), pHist.GetSampleSum()
}

// Total implements the WindowedHistogram interface.
func (mwh *ManualWindowHistogram) Total() (int64, float64) {
mwh.mu.RLock()
defer mwh.mu.RUnlock()
h := mwh.ToPrometheusMetric().Histogram
return int64(h.GetSampleCount()), h.GetSampleSum()
}

func (mwh *ManualWindowHistogram) MeanWindowed() float64 {
mwh.mu.RLock()
defer mwh.mu.RUnlock()
return mwh.mu.cur.GetSampleSum() / float64(mwh.mu.cur.GetSampleCount())
pHist := mwh.ToPrometheusMetricWindowed().Histogram
return pHist.GetSampleSum() / float64(pHist.GetSampleCount())
}

func (mwh *ManualWindowHistogram) Mean() float64 {
mwh.mu.RLock()
defer mwh.mu.RUnlock()
h := mwh.ToPrometheusMetric().Histogram
return h.GetSampleSum() / float64(h.GetSampleCount())
}
Expand All @@ -626,7 +657,7 @@ func (mwh *ManualWindowHistogram) ValueAtQuantileWindowed(q float64) float64 {
if mwh.mu.cur == nil {
return 0
}
return ValueAtQuantileWindowed(mwh.mu.cur, q)
return ValueAtQuantileWindowed(mwh.ToPrometheusMetricWindowed().Histogram, q)
}

// A Counter holds a single mutable atomic value.
Expand Down Expand Up @@ -881,6 +912,21 @@ func (g *GaugeFloat64) GetMetadata() Metadata {
return baseMetadata
}

// MergeWindowedHistogram adds the bucket counts, sample count, and sample sum
// from the previous windowed histogram to those of the current windowed
// histogram.
// NB: Buckets on each histogram must be the same
func MergeWindowedHistogram(cur *prometheusgo.Histogram, prev *prometheusgo.Histogram) {
for i, bucket := range cur.Bucket {
count := *bucket.CumulativeCount + *prev.Bucket[i].CumulativeCount
*bucket.CumulativeCount = count
}
sampleCount := *cur.SampleCount + *prev.SampleCount
*cur.SampleCount = sampleCount
sampleSum := *cur.SampleSum + *prev.SampleSum
*cur.SampleSum = sampleSum
}

// ValueAtQuantileWindowed takes a quantile value [0,100] and returns the
// interpolated value at that quantile for the given histogram.
func ValueAtQuantileWindowed(histogram *prometheusgo.Histogram, q float64) float64 {
Expand Down
79 changes: 76 additions & 3 deletions pkg/util/metric/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"encoding/json"
"math"
"reflect"
"sort"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -281,15 +282,19 @@ func TestNewHistogramRotate(t *testing.T) {
// Windowed histogram is initially empty.
h.Inspect(func(interface{}) {}) // triggers ticking
_, sum := h.TotalWindowed()
require.Zero(t, sum)
if i == 0 {
require.Zero(t, sum)
} else {
// When there are multiple windows, start with previously recorded values.
require.Equal(t, sum, float64(12345))
}
// But cumulative histogram has history (if i > 0).
count, _ := h.Total()
require.EqualValues(t, i, count)

// Add a measurement and verify it's there.
{
h.RecordValue(12345)
f := float64(12345)
f := float64(12345) + sum
_, wSum := h.TotalWindowed()
require.Equal(t, wSum, f)
}
Expand All @@ -298,3 +303,71 @@ func TestNewHistogramRotate(t *testing.T) {
// Go to beginning.
}
}

func TestHistogramWindowMerge(t *testing.T) {
u := func(v int) *uint64 {
n := uint64(v)
return &n
}

h := NewHistogram(HistogramOptions{
Mode: HistogramModePrometheus,
Metadata: Metadata{},
Duration: time.Minute,
Buckets: IOLatencyBuckets,
})

measurements := []int64{200000000, 0, 4000000, 5000000, 10000000, 20000000,
25000000, 30000000, 40000000, 90000000}
w := 4
var expSum float64

for i := 0; i < w; i++ {
h.Inspect(func(interface{}) {}) // triggers ticking
for j, m := range measurements {
h.RecordValue(m)
if j == 0 {
require.Equal(t, 0.0, h.ValueAtQuantileWindowed(0))
require.Equal(t, 235983346.678219, h.ValueAtQuantileWindowed(
99)) // value immediately greater than measurements[0]
}
expSum += float64(m)
}
// Tick. This rotates the histogram.
setNow(time.Duration(i+1) * 10 * time.Second)
// Go to beginning.
}

act := *h.ToPrometheusMetric().Histogram
var buckets []*prometheusgo.Bucket
count := 0
j := 0
var expQuantileValues []float64
sort.Slice(measurements, func(i, j int) bool {
return measurements[i] < measurements[j]
})
for i := range IOLatencyBuckets {
if j < len(measurements) && IOLatencyBuckets[i] > float64(measurements[j]) {
count = count + 4
j += 1
expQuantileValues = append(expQuantileValues, IOLatencyBuckets[i])
}
buckets = append(buckets, &prometheusgo.Bucket{CumulativeCount: u(count),
UpperBound: &IOLatencyBuckets[i]})
}
exp := prometheusgo.Histogram{
SampleCount: u(len(measurements) * w),
SampleSum: &expSum,
Bucket: buckets,
}

if !reflect.DeepEqual(act, exp) {
t.Fatalf("expected differs from actual: %s", pretty.Diff(exp, act))
}

require.Equal(t, 0.0, h.ValueAtQuantileWindowed(0))
require.Equal(t, expQuantileValues[0], h.ValueAtQuantileWindowed(10))
require.Equal(t, expQuantileValues[4], h.ValueAtQuantileWindowed(50))
require.Equal(t, expQuantileValues[7], h.ValueAtQuantileWindowed(80))
require.Equal(t, expQuantileValues[9], h.ValueAtQuantileWindowed(99.99))
}

0 comments on commit eb2872c

Please sign in to comment.