Skip to content

Commit

Permalink
metrics: fix windowed histogram merging approach
Browse files Browse the repository at this point in the history
Fixes #103814.
Fixes #98266.

This commit updates the windowed histogram merging approach
to add the previous window's histogram bucket counts and sample
count to those of the current one. As a result of this change,
the histograms will no longer report under-sampled quantile values,
and timeseries metrics-derived charts (e.g., the quantile-based
SQL service latency charts on the DB console's Metrics page) will
more accurately display metrics.

Release note (bug fix): Updated the histogram window merge calculation
to more accurately interpolate quantile values. This change will result
in smoother, more accurate Metrics charts on the DB Console.

Co-authored-by: Aaditya Sondhi <[email protected]>
  • Loading branch information
ericharmeling and aadityasondhi committed Jun 5, 2023
1 parent 24a32f2 commit 13beab0
Show file tree
Hide file tree
Showing 3 changed files with 182 additions and 33 deletions.
31 changes: 14 additions & 17 deletions pkg/util/metric/hdrhistogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,9 @@ import (
prometheusgo "github.com/prometheus/client_model/go"
)

const (
// HdrHistogramMaxLatency is the maximum value tracked in latency histograms. Higher
// values will be recorded as this value instead.
HdrHistogramMaxLatency = 10 * time.Second

// hdrHistogramHistWrapNum is the number of histograms to keep in the
// rolling window.
hdrHistogramHistWrapNum = 2
)
// HdrHistogramMaxLatency is the maximum value tracked in latency histograms. Higher
// values will be recorded as this value instead.
const HdrHistogramMaxLatency = 10 * time.Second

// A HdrHistogram collects observed values by keeping bucketed counts. For
// convenience, internally two sets of buckets are kept: A cumulative set (i.e.
Expand Down Expand Up @@ -64,12 +59,12 @@ func NewHdrHistogram(
Metadata: metadata,
maxVal: maxVal,
}
wHist := hdrhistogram.NewWindowed(hdrHistogramHistWrapNum, 0, maxVal, sigFigs)
wHist := hdrhistogram.NewWindowed(WindowedHistogramWrapNum, 0, maxVal, sigFigs)
h.mu.cumulative = hdrhistogram.New(0, maxVal, sigFigs)
h.mu.sliding = wHist
h.mu.tickHelper = &tickHelper{
nextT: now(),
tickInterval: duration / hdrHistogramHistWrapNum,
tickInterval: duration / WindowedHistogramWrapNum,
onTick: func() {
wHist.Rotate()
},
Expand Down Expand Up @@ -171,15 +166,19 @@ func (h *HdrHistogram) ToPrometheusMetric() *prometheusgo.Metric {

// TotalWindowed implements the WindowedHistogram interface.
//
// It returns the sample count and the sample sum of the merged sliding
// window, i.e. of every histogram currently retained by h.mu.sliding rather
// than only the most recent one.
func (h *HdrHistogram) TotalWindowed() (int64, float64) {
	h.mu.Lock()
	defer h.mu.Unlock()
	merged := h.mu.sliding.Merge()
	count := merged.TotalCount()
	// The sum is derived as count * mean of the merged histogram.
	return count, float64(count) * merged.Mean()
}

func (h *HdrHistogram) toPrometheusMetricWindowedLocked() *prometheusgo.Metric {
hist := &prometheusgo.Histogram{}

maybeTick(h.mu.tickHelper)
bars := h.mu.sliding.Current.Distribution()
mergedHist := h.mu.sliding.Merge()
bars := mergedHist.Distribution()
hist.Bucket = make([]*prometheusgo.Bucket, 0, len(bars))

var cumCount uint64
Expand All @@ -202,7 +201,6 @@ func (h *HdrHistogram) toPrometheusMetricWindowedLocked() *prometheusgo.Metric {
}
hist.SampleCount = &cumCount
hist.SampleSum = &sum // can do better here; we approximate in the loop

return &prometheusgo.Metric{
Histogram: hist,
}
Expand Down Expand Up @@ -233,13 +231,12 @@ func (h *HdrHistogram) ValueAtQuantileWindowed(q float64) float64 {
// Mean returns the mean reported by the cumulative histogram. The value is
// read while holding h.mu.
func (h *HdrHistogram) Mean() float64 {
	h.mu.Lock()
	defer h.mu.Unlock()
	cumulative := h.mu.cumulative
	return cumulative.Mean()
}

// MeanWindowed returns the mean of the merged sliding window, i.e. of every
// histogram currently retained by h.mu.sliding rather than only the current
// one. The value is read while holding h.mu.
func (h *HdrHistogram) MeanWindowed() float64 {
	h.mu.Lock()
	defer h.mu.Unlock()
	return h.mu.sliding.Merge().Mean()
}
82 changes: 68 additions & 14 deletions pkg/util/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@ import (
"github.com/rcrowley/go-metrics"
)

const (
	// TestSampleInterval is passed to histograms during tests which don't
	// want to concern themselves with supplying a "correct" interval.
	TestSampleInterval = time.Duration(math.MaxInt64)
	// WindowedHistogramWrapNum is the number of histograms to keep in rolling
	// window.
	WindowedHistogramWrapNum = 2
)

// Iterable provides a method for synchronized access to interior objects.
type Iterable interface {
Expand Down Expand Up @@ -97,10 +102,12 @@ type WindowedHistogram interface {
Total() (int64, float64)
// MeanWindowed returns the average of the samples in the current window.
MeanWindowed() float64
// Mean returns the average of the sample in teh cumulative histogram.
// Mean returns the average of the sample in the cumulative histogram.
Mean() float64
// ValueAtQuantileWindowed takes a quantile value [0,100] and returns the
// interpolated value at that quantile for the windowed histogram.
// Methods implementing this interface should merge the buckets, sums,
// and counts of previous and current windows.
ValueAtQuantileWindowed(q float64) float64
}

Expand Down Expand Up @@ -224,7 +231,10 @@ const (
type HistogramOptions struct {
// Metadata is the metric Metadata associated with the histogram.
Metadata Metadata
// Duration is the histogram's window duration.
// Duration is the total duration of all windows in the histogram.
// The individual window duration is equal to the
// Duration/WindowedHistogramWrapNum (i.e., the number of windows
// in the histogram).
Duration time.Duration
// MaxVal is only relevant to the HdrHistogram, and represents the
// highest trackable value in the resulting histogram buckets.
Expand Down Expand Up @@ -269,7 +279,7 @@ func newHistogram(meta Metadata, windowDuration time.Duration, buckets []float64
}
h.windowed.tickHelper = &tickHelper{
nextT: now(),
tickInterval: windowDuration,
tickInterval: windowDuration / WindowedHistogramWrapNum,
onTick: func() {
h.windowed.prev = h.windowed.cur
h.windowed.cur = prometheus.NewHistogram(opts)
Expand Down Expand Up @@ -368,15 +378,23 @@ func (h *Histogram) ToPrometheusMetric() *prometheusgo.Metric {
return m
}

// ToPrometheusMetricWindowed returns a filled-in prometheus metric of the right type.
// ToPrometheusMetricWindowed returns a filled-in prometheus metric of the
// right type.
func (h *Histogram) ToPrometheusMetricWindowed() *prometheusgo.Metric {
h.windowed.Lock()
defer h.windowed.Unlock()
m := &prometheusgo.Metric{}
if err := h.windowed.cur.Write(m); err != nil {
cur := &prometheusgo.Metric{}
prev := &prometheusgo.Metric{}
if err := h.windowed.cur.Write(cur); err != nil {
panic(err)
}
return m
if h.windowed.prev != nil {
if err := h.windowed.prev.Write(prev); err != nil {
panic(err)
}
MergeWindowedHistogram(cur.Histogram, prev.Histogram)
}
return cur
}

// GetMetadata returns the metric's metadata including the Prometheus
Expand Down Expand Up @@ -428,7 +446,8 @@ func (h *Histogram) MeanWindowed() float64 {
// 2. Since the prometheus client library ensures buckets are in a strictly
// increasing order at creation, we do not sort them.
func (h *Histogram) ValueAtQuantileWindowed(q float64) float64 {
	// Delegate to the package-level helper, feeding it the histogram
	// produced by ToPrometheusMetricWindowed.
	return ValueAtQuantileWindowed(h.ToPrometheusMetricWindowed().Histogram, q)
}

var _ PrometheusExportable = (*ManualWindowHistogram)(nil)
Expand Down Expand Up @@ -592,26 +611,46 @@ func (mwh *ManualWindowHistogram) ToPrometheusMetric() *prometheusgo.Metric {
return m
}

// ToPrometheusMetricWindowed returns a prometheus metric written from
// mwh.mu.cum, with mwh.mu.prev merged into it when a previous window is
// present. A failed Write panics.
//
// This method does not acquire mwh.mu itself.
//
// NOTE(review): the base histogram is written from mwh.mu.cum rather than
// mwh.mu.cur; confirm that this is the intended base for the windowed view.
func (mwh *ManualWindowHistogram) ToPrometheusMetricWindowed() *prometheusgo.Metric {
	merged := &prometheusgo.Metric{}
	err := mwh.mu.cum.Write(merged)
	if err != nil {
		panic(err)
	}
	if prev := mwh.mu.prev; prev != nil {
		MergeWindowedHistogram(merged.Histogram, prev)
	}
	return merged
}

// TotalWindowed implements the WindowedHistogram interface.
//
// It returns the sample count and sample sum of the histogram produced by
// ToPrometheusMetricWindowed, read while holding mwh.mu's read lock.
func (mwh *ManualWindowHistogram) TotalWindowed() (int64, float64) {
	mwh.mu.RLock()
	defer mwh.mu.RUnlock()
	pHist := mwh.ToPrometheusMetricWindowed().Histogram
	return int64(pHist.GetSampleCount()), pHist.GetSampleSum()
}

// Total implements the WindowedHistogram interface.
//
// It reports the sample count and sample sum of the histogram returned by
// ToPrometheusMetric, read while holding mwh.mu's read lock.
func (mwh *ManualWindowHistogram) Total() (int64, float64) {
	mwh.mu.RLock()
	defer mwh.mu.RUnlock()
	snapshot := mwh.ToPrometheusMetric().Histogram
	count := int64(snapshot.GetSampleCount())
	sum := snapshot.GetSampleSum()
	return count, sum
}

// MeanWindowed returns sample sum divided by sample count for the histogram
// produced by ToPrometheusMetricWindowed, read while holding mwh.mu's read
// lock.
//
// A zero sample count is not special-cased; the floating-point division then
// yields NaN or an infinity instead of panicking.
func (mwh *ManualWindowHistogram) MeanWindowed() float64 {
	mwh.mu.RLock()
	defer mwh.mu.RUnlock()
	pHist := mwh.ToPrometheusMetricWindowed().Histogram
	return pHist.GetSampleSum() / float64(pHist.GetSampleCount())
}

// Mean returns sample sum divided by sample count for the histogram returned
// by ToPrometheusMetric, read while holding mwh.mu's read lock. A zero sample
// count is not special-cased.
func (mwh *ManualWindowHistogram) Mean() float64 {
	mwh.mu.RLock()
	defer mwh.mu.RUnlock()
	snapshot := mwh.ToPrometheusMetric().Histogram
	sum := snapshot.GetSampleSum()
	count := float64(snapshot.GetSampleCount())
	return sum / count
}
Expand All @@ -626,7 +665,7 @@ func (mwh *ManualWindowHistogram) ValueAtQuantileWindowed(q float64) float64 {
if mwh.mu.cur == nil {
return 0
}
return ValueAtQuantileWindowed(mwh.mu.cur, q)
return ValueAtQuantileWindowed(mwh.ToPrometheusMetricWindowed().Histogram, q)
}

// A Counter holds a single mutable atomic value.
Expand Down Expand Up @@ -881,6 +920,21 @@ func (g *GaugeFloat64) GetMetadata() Metadata {
return baseMetadata
}

// MergeWindowedHistogram folds prev into cur in place: every bucket's
// cumulative count, the sample count, and the sample sum of cur are each
// increased by the corresponding value from prev. prev is only read.
//
// NB: Buckets on each histogram must be the same. Buckets are paired by
// index, and the count/sum pointers on both histograms are dereferenced
// without nil checks.
func MergeWindowedHistogram(cur *prometheusgo.Histogram, prev *prometheusgo.Histogram) {
	for i := range cur.Bucket {
		*cur.Bucket[i].CumulativeCount += *prev.Bucket[i].CumulativeCount
	}
	*cur.SampleCount += *prev.SampleCount
	*cur.SampleSum += *prev.SampleSum
}

// ValueAtQuantileWindowed takes a quantile value [0,100] and returns the
// interpolated value at that quantile for the given histogram.
func ValueAtQuantileWindowed(histogram *prometheusgo.Histogram, q float64) float64 {
Expand Down
102 changes: 100 additions & 2 deletions pkg/util/metric/metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"encoding/json"
"math"
"reflect"
"sort"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -285,11 +286,10 @@ func TestNewHistogramRotate(t *testing.T) {
// But cumulative histogram has history (if i > 0).
count, _ := h.Total()
require.EqualValues(t, i, count)

// Add a measurement and verify it's there.
{
h.RecordValue(12345)
f := float64(12345)
f := float64(12345) + sum
_, wSum := h.TotalWindowed()
require.Equal(t, wSum, f)
}
Expand All @@ -298,3 +298,101 @@ func TestNewHistogramRotate(t *testing.T) {
// Go to beginning.
}
}

// TestHistogramWindowed checks that the windowed view of a prometheus-mode
// Histogram reflects the previous window merged with the current one: after
// a tick, the windowed mean and quantiles still include the observations
// recorded before the tick, and new observations shift them.
func TestHistogramWindowed(t *testing.T) {
	defer TestingSetNow(nil)()
	setNow(0)

	duration := 10 * time.Second

	h := NewHistogram(HistogramOptions{
		Mode:     HistogramModePrometheus,
		Metadata: Metadata{},
		Duration: duration,
		Buckets:  IOLatencyBuckets,
	})

	measurements := []int64{200000000, 0, 4000000, 5000000, 10000000, 20000000,
		25000000, 30000000, 40000000, 90000000}

	// Sort the measurements so we can calculate the expected quantile values
	// for the first windowed histogram after the measurements have been recorded.
	sortedMeasurements := make([]int64, len(measurements))
	copy(sortedMeasurements, measurements)
	sort.Slice(sortedMeasurements, func(i, j int) bool {
		return sortedMeasurements[i] < sortedMeasurements[j]
	})

	// Calculate the expected quantile values as the lowest bucket values that are
	// greater than each measurement. Each bucket is consumed by at most one
	// measurement.
	j := 0
	var expQuantileValues []float64
	for i := range IOLatencyBuckets {
		if j < len(sortedMeasurements) && IOLatencyBuckets[i] > float64(sortedMeasurements[j]) {
			j += 1
			expQuantileValues = append(expQuantileValues, IOLatencyBuckets[i])
		}
	}

	w := 2
	// expSum and expCount are deliberately declared outside the loop: the
	// windowed histogram merges the previous window into the current one, so
	// the expected totals carry over from one iteration to the next.
	var expSum float64
	var expCount uint64
	for i := 0; i < w; i++ {
		h.Inspect(func(interface{}) {}) // trigger ticking
		if i == 0 {
			// If there is no previous window, we should be unable to calculate mean
			// or quantile without any observations.
			require.Equal(t, 0.0, h.ValueAtQuantileWindowed(99.99))
			if !math.IsNaN(h.MeanWindowed()) {
				t.Fatalf("mean should be undefined with no observations")
			}
			// Record all measurements on first iteration.
			for _, m := range measurements {
				h.RecordValue(m)
				expCount += 1
				expSum += float64(m)
			}
			// Because we have 10 observations, we expect quantiles to correspond
			// to observation indices (e.g., the 8th expected quantile value is equal
			// to the value interpolated at the 80th percentile).
			require.Equal(t, 0.0, h.ValueAtQuantileWindowed(0))
			require.Equal(t, expQuantileValues[0], h.ValueAtQuantileWindowed(10))
			require.Equal(t, expQuantileValues[4], h.ValueAtQuantileWindowed(50))
			require.Equal(t, expQuantileValues[7], h.ValueAtQuantileWindowed(80))
			require.Equal(t, expQuantileValues[9], h.ValueAtQuantileWindowed(99.99))
		} else {
			// The SampleSum and SampleCount values in the current window before any
			// observations should be equal to those of the previous window, after all
			// observations (the quantile values will also be the same). expSum and
			// expCount already hold those values from the previous iteration.

			// After recording a few higher-value observations in the second window,
			// the quantile values will shift in the direction of the observations.
			for _, m := range sortedMeasurements[len(sortedMeasurements)-3 : len(sortedMeasurements)-1] {
				h.RecordValue(m)
				expCount += 1
				expSum += float64(m)
			}
			require.Less(t, expQuantileValues[4], h.ValueAtQuantileWindowed(50))
			require.Less(t, expQuantileValues[7], h.ValueAtQuantileWindowed(80))
			require.Equal(t, expQuantileValues[9], h.ValueAtQuantileWindowed(99.99))
		}

		// In all cases, the windowed mean should be equal to the expected sum/count
		require.Equal(t, expSum/float64(expCount), h.MeanWindowed())

		// Increment Now time to trigger tick on the following iteration.
		setNow(time.Duration(i+1) * (duration / 2))
	}
}

0 comments on commit 13beab0

Please sign in to comment.