Skip to content

Commit

Permalink
Merge pull request #104815 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-23.1-104088

release-23.1: metrics: fix windowed histogram merging approach
  • Loading branch information
ericharmeling authored Jun 14, 2023
2 parents e19e43d + cecb53b commit 89d3f29
Show file tree
Hide file tree
Showing 3 changed files with 238 additions and 45 deletions.
31 changes: 14 additions & 17 deletions pkg/util/metric/hdrhistogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,9 @@ import (
prometheusgo "github.com/prometheus/client_model/go"
)

const (
// HdrHistogramMaxLatency is the maximum value tracked in latency histograms. Higher
// values will be recorded as this value instead.
HdrHistogramMaxLatency = 10 * time.Second

// The number of histograms to keep in rolling window.
hdrHistogramHistWrapNum = 2 // TestSampleInterval is passed to histograms during tests which don't
)
// HdrHistogramMaxLatency is the maximum value tracked in latency histograms. Higher
// values will be recorded as this value instead.
const HdrHistogramMaxLatency = 10 * time.Second

// A HdrHistogram collects observed values by keeping bucketed counts. For
// convenience, internally two sets of buckets are kept: A cumulative set (i.e.
Expand Down Expand Up @@ -64,12 +59,12 @@ func NewHdrHistogram(
Metadata: metadata,
maxVal: maxVal,
}
wHist := hdrhistogram.NewWindowed(hdrHistogramHistWrapNum, 0, maxVal, sigFigs)
wHist := hdrhistogram.NewWindowed(WindowedHistogramWrapNum, 0, maxVal, sigFigs)
h.mu.cumulative = hdrhistogram.New(0, maxVal, sigFigs)
h.mu.sliding = wHist
h.mu.tickHelper = &tickHelper{
nextT: now(),
tickInterval: duration / hdrHistogramHistWrapNum,
tickInterval: duration / WindowedHistogramWrapNum,
onTick: func() {
wHist.Rotate()
},
Expand Down Expand Up @@ -171,15 +166,19 @@ func (h *HdrHistogram) ToPrometheusMetric() *prometheusgo.Metric {

// TotalWindowed implements the WindowedHistogram interface.
func (h *HdrHistogram) TotalWindowed() (int64, float64) {
pHist := h.ToPrometheusMetricWindowed().Histogram
return int64(pHist.GetSampleCount()), pHist.GetSampleSum()
h.mu.Lock()
defer h.mu.Unlock()
hist := h.mu.sliding.Merge()
totalSum := float64(hist.TotalCount()) * hist.Mean()
return hist.TotalCount(), totalSum
}

func (h *HdrHistogram) toPrometheusMetricWindowedLocked() *prometheusgo.Metric {
hist := &prometheusgo.Histogram{}

maybeTick(h.mu.tickHelper)
bars := h.mu.sliding.Current.Distribution()
mergedHist := h.mu.sliding.Merge()
bars := mergedHist.Distribution()
hist.Bucket = make([]*prometheusgo.Bucket, 0, len(bars))

var cumCount uint64
Expand All @@ -202,7 +201,6 @@ func (h *HdrHistogram) toPrometheusMetricWindowedLocked() *prometheusgo.Metric {
}
hist.SampleCount = &cumCount
hist.SampleSum = &sum // can do better here; we approximate in the loop

return &prometheusgo.Metric{
Histogram: hist,
}
Expand Down Expand Up @@ -233,13 +231,12 @@ func (h *HdrHistogram) ValueAtQuantileWindowed(q float64) float64 {
func (h *HdrHistogram) Mean() float64 {
h.mu.Lock()
defer h.mu.Unlock()

return h.mu.cumulative.Mean()
}

func (h *HdrHistogram) MeanWindowed() float64 {
h.mu.Lock()
defer h.mu.Unlock()

return h.mu.sliding.Current.Mean()
hist := h.mu.sliding.Merge()
return hist.Mean()
}
101 changes: 75 additions & 26 deletions pkg/util/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@ import (
"github.com/rcrowley/go-metrics"
)

// TestSampleInterval is passed to histograms during tests which don't
// want to concern themselves with supplying a "correct" interval.
const TestSampleInterval = time.Duration(math.MaxInt64)
const (
// TestSampleInterval is passed to histograms during tests which don't
// want to concern themselves with supplying a "correct" interval.
TestSampleInterval = time.Duration(math.MaxInt64)
// WindowedHistogramWrapNum is the number of histograms to keep in rolling
// window.
WindowedHistogramWrapNum = 2
)

// Iterable provides a method for synchronized access to interior objects.
type Iterable interface {
Expand Down Expand Up @@ -97,10 +102,12 @@ type WindowedHistogram interface {
Total() (int64, float64)
// MeanWindowed returns the average of the samples in the current window.
MeanWindowed() float64
// Mean returns the average of the sample in teh cumulative histogram.
// Mean returns the average of the sample in the cumulative histogram.
Mean() float64
// ValueAtQuantileWindowed takes a quantile value [0,100] and returns the
// interpolated value at that quantile for the windowed histogram.
// Methods implementing this interface should the merge buckets, sums,
// and counts of previous and current windows.
ValueAtQuantileWindowed(q float64) float64
}

Expand Down Expand Up @@ -224,7 +231,10 @@ const (
type HistogramOptions struct {
// Metadata is the metric Metadata associated with the histogram.
Metadata Metadata
// Duration is the histogram's window duration.
// Duration is the total duration of all windows in the histogram.
// The individual window duration is equal to the
// Duration/WindowedHistogramWrapNum (i.e., the number of windows
// in the histogram).
Duration time.Duration
// MaxVal is only relevant to the HdrHistogram, and represents the
// highest trackable value in the resulting histogram buckets.
Expand Down Expand Up @@ -256,7 +266,7 @@ func NewHistogram(opt HistogramOptions) IHistogram {
// NewHistogram is a prometheus-backed histogram. Depending on the value of
// opts.Buckets, this is suitable for recording any kind of quantity. Common
// sensible choices are {IO,Network}LatencyBuckets.
func newHistogram(meta Metadata, windowDuration time.Duration, buckets []float64) *Histogram {
func newHistogram(meta Metadata, duration time.Duration, buckets []float64) *Histogram {
// TODO(obs-inf): prometheus supports labeled histograms but they require more
// plumbing and don't fit into the PrometheusObservable interface any more.
opts := prometheus.HistogramOpts{
Expand All @@ -268,8 +278,11 @@ func newHistogram(meta Metadata, windowDuration time.Duration, buckets []float64
cum: cum,
}
h.windowed.tickHelper = &tickHelper{
nextT: now(),
tickInterval: windowDuration,
nextT: now(),
// We want to divide the total window duration by the number of windows
// because we need to rotate the windows at uniformly distributed
// intervals within a histogram's total duration.
tickInterval: duration / WindowedHistogramWrapNum,
onTick: func() {
h.windowed.prev = h.windowed.cur
h.windowed.cur = prometheus.NewHistogram(opts)
Expand All @@ -293,16 +306,13 @@ type Histogram struct {
Metadata
cum prometheus.Histogram

// TODO(obs-inf): the way we implement windowed histograms is not great. If
// the windowed histogram is pulled right after a tick, it will be mostly
// empty. We could add a third bucket and represent the merged view of the two
// most recent buckets to avoid that. Or we could "just" double the rotation
// interval (so that the histogram really collects for 20s when we expect to
// persist the contents every 10s). Really it would make more sense to
// explicitly rotate the histogram atomically with collecting its contents,
// but that is now how we have set it up right now. It should be doable
// though, since there is only one consumer of windowed histograms - our
// internal timeseries system.
// TODO(obs-inf): the way we implement windowed histograms is not great.
// We could "just" double the rotation interval (so that the histogram really
// collects for 20s when we expect to persist the contents every 10s).
// Really it would make more sense to explicitly rotate the histogram
// atomically with collecting its contents, but that is now how we have set
// it up right now. It should be doable though, since there is only one
// consumer of windowed histograms - our internal timeseries system.
windowed struct {
// prometheus.Histogram is thread safe, so we only
// need an RLock to record into it. But write lock
Expand Down Expand Up @@ -368,15 +378,23 @@ func (h *Histogram) ToPrometheusMetric() *prometheusgo.Metric {
return m
}

// ToPrometheusMetricWindowed returns a filled-in prometheus metric of the right type.
// ToPrometheusMetricWindowed returns a filled-in prometheus metric of the
// right type.
func (h *Histogram) ToPrometheusMetricWindowed() *prometheusgo.Metric {
h.windowed.Lock()
defer h.windowed.Unlock()
m := &prometheusgo.Metric{}
if err := h.windowed.cur.Write(m); err != nil {
cur := &prometheusgo.Metric{}
prev := &prometheusgo.Metric{}
if err := h.windowed.cur.Write(cur); err != nil {
panic(err)
}
return m
if h.windowed.prev != nil {
if err := h.windowed.prev.Write(prev); err != nil {
panic(err)
}
MergeWindowedHistogram(cur.Histogram, prev.Histogram)
}
return cur
}

// GetMetadata returns the metric's metadata including the Prometheus
Expand Down Expand Up @@ -428,7 +446,8 @@ func (h *Histogram) MeanWindowed() float64 {
// 2. Since the prometheus client library ensures buckets are in a strictly
// increasing order at creation, we do not sort them.
func (h *Histogram) ValueAtQuantileWindowed(q float64) float64 {
return ValueAtQuantileWindowed(h.ToPrometheusMetricWindowed().Histogram, q)
return ValueAtQuantileWindowed(h.ToPrometheusMetricWindowed().Histogram,
q)
}

var _ PrometheusExportable = (*ManualWindowHistogram)(nil)
Expand Down Expand Up @@ -592,11 +611,25 @@ func (mwh *ManualWindowHistogram) ToPrometheusMetric() *prometheusgo.Metric {
return m
}

// ToPrometheusMetricWindowedLocked returns a filled-in prometheus metric of the
// right type.
func (mwh *ManualWindowHistogram) ToPrometheusMetricWindowedLocked() *prometheusgo.Metric {
cur := &prometheusgo.Metric{}
if err := mwh.mu.cum.Write(cur); err != nil {
panic(err)
}
if mwh.mu.prev != nil {
MergeWindowedHistogram(cur.Histogram, mwh.mu.prev)
}
return cur
}

// TotalWindowed implements the WindowedHistogram interface.
func (mwh *ManualWindowHistogram) TotalWindowed() (int64, float64) {
mwh.mu.RLock()
defer mwh.mu.RUnlock()
return int64(mwh.mu.cur.GetSampleCount()), mwh.mu.cur.GetSampleSum()
pHist := mwh.ToPrometheusMetricWindowedLocked().Histogram
return int64(pHist.GetSampleCount()), pHist.GetSampleSum()
}

// Total implements the WindowedHistogram interface.
Expand All @@ -608,7 +641,8 @@ func (mwh *ManualWindowHistogram) Total() (int64, float64) {
func (mwh *ManualWindowHistogram) MeanWindowed() float64 {
mwh.mu.RLock()
defer mwh.mu.RUnlock()
return mwh.mu.cur.GetSampleSum() / float64(mwh.mu.cur.GetSampleCount())
pHist := mwh.ToPrometheusMetricWindowedLocked().Histogram
return pHist.GetSampleSum() / float64(pHist.GetSampleCount())
}

func (mwh *ManualWindowHistogram) Mean() float64 {
Expand All @@ -626,7 +660,7 @@ func (mwh *ManualWindowHistogram) ValueAtQuantileWindowed(q float64) float64 {
if mwh.mu.cur == nil {
return 0
}
return ValueAtQuantileWindowed(mwh.mu.cur, q)
return ValueAtQuantileWindowed(mwh.ToPrometheusMetricWindowedLocked().Histogram, q)
}

// A Counter holds a single mutable atomic value.
Expand Down Expand Up @@ -881,6 +915,21 @@ func (g *GaugeFloat64) GetMetadata() Metadata {
return baseMetadata
}

// MergeWindowedHistogram adds the bucket counts, sample count, and sample sum
// from the previous windowed histogram to those of the current windowed
// histogram.
// NB: Buckets on each histogram must be the same
func MergeWindowedHistogram(cur *prometheusgo.Histogram, prev *prometheusgo.Histogram) {
for i, bucket := range cur.Bucket {
count := *bucket.CumulativeCount + *prev.Bucket[i].CumulativeCount
*bucket.CumulativeCount = count
}
sampleCount := *cur.SampleCount + *prev.SampleCount
*cur.SampleCount = sampleCount
sampleSum := *cur.SampleSum + *prev.SampleSum
*cur.SampleSum = sampleSum
}

// ValueAtQuantileWindowed takes a quantile value [0,100] and returns the
// interpolated value at that quantile for the given histogram.
func ValueAtQuantileWindowed(histogram *prometheusgo.Histogram, q float64) float64 {
Expand Down
Loading

0 comments on commit 89d3f29

Please sign in to comment.