Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-23.1: metrics: fix windowed histogram merging approach #104815

Merged
merged 1 commit into from
Jun 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 14 additions & 17 deletions pkg/util/metric/hdrhistogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,9 @@ import (
prometheusgo "github.com/prometheus/client_model/go"
)

const (
// HdrHistogramMaxLatency is the maximum value tracked in latency histograms. Higher
// values will be recorded as this value instead.
HdrHistogramMaxLatency = 10 * time.Second

// The number of histograms to keep in rolling window.
hdrHistogramHistWrapNum = 2 // TestSampleInterval is passed to histograms during tests which don't
)
// HdrHistogramMaxLatency is the maximum value tracked in latency histograms. Higher
// values will be recorded as this value instead.
const HdrHistogramMaxLatency = 10 * time.Second

// A HdrHistogram collects observed values by keeping bucketed counts. For
// convenience, internally two sets of buckets are kept: A cumulative set (i.e.
Expand Down Expand Up @@ -64,12 +59,12 @@ func NewHdrHistogram(
Metadata: metadata,
maxVal: maxVal,
}
wHist := hdrhistogram.NewWindowed(hdrHistogramHistWrapNum, 0, maxVal, sigFigs)
wHist := hdrhistogram.NewWindowed(WindowedHistogramWrapNum, 0, maxVal, sigFigs)
h.mu.cumulative = hdrhistogram.New(0, maxVal, sigFigs)
h.mu.sliding = wHist
h.mu.tickHelper = &tickHelper{
nextT: now(),
tickInterval: duration / hdrHistogramHistWrapNum,
tickInterval: duration / WindowedHistogramWrapNum,
onTick: func() {
wHist.Rotate()
},
Expand Down Expand Up @@ -171,15 +166,19 @@ func (h *HdrHistogram) ToPrometheusMetric() *prometheusgo.Metric {

// TotalWindowed implements the WindowedHistogram interface.
func (h *HdrHistogram) TotalWindowed() (int64, float64) {
pHist := h.ToPrometheusMetricWindowed().Histogram
return int64(pHist.GetSampleCount()), pHist.GetSampleSum()
h.mu.Lock()
defer h.mu.Unlock()
hist := h.mu.sliding.Merge()
totalSum := float64(hist.TotalCount()) * hist.Mean()
return hist.TotalCount(), totalSum
}

func (h *HdrHistogram) toPrometheusMetricWindowedLocked() *prometheusgo.Metric {
hist := &prometheusgo.Histogram{}

maybeTick(h.mu.tickHelper)
bars := h.mu.sliding.Current.Distribution()
mergedHist := h.mu.sliding.Merge()
bars := mergedHist.Distribution()
hist.Bucket = make([]*prometheusgo.Bucket, 0, len(bars))

var cumCount uint64
Expand All @@ -202,7 +201,6 @@ func (h *HdrHistogram) toPrometheusMetricWindowedLocked() *prometheusgo.Metric {
}
hist.SampleCount = &cumCount
hist.SampleSum = &sum // can do better here; we approximate in the loop

return &prometheusgo.Metric{
Histogram: hist,
}
Expand Down Expand Up @@ -233,13 +231,12 @@ func (h *HdrHistogram) ValueAtQuantileWindowed(q float64) float64 {
func (h *HdrHistogram) Mean() float64 {
h.mu.Lock()
defer h.mu.Unlock()

return h.mu.cumulative.Mean()
}

func (h *HdrHistogram) MeanWindowed() float64 {
h.mu.Lock()
defer h.mu.Unlock()

return h.mu.sliding.Current.Mean()
hist := h.mu.sliding.Merge()
return hist.Mean()
}
101 changes: 75 additions & 26 deletions pkg/util/metric/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@ import (
"github.com/rcrowley/go-metrics"
)

// TestSampleInterval is passed to histograms during tests which don't
// want to concern themselves with supplying a "correct" interval.
const TestSampleInterval = time.Duration(math.MaxInt64)
const (
// TestSampleInterval is passed to histograms during tests which don't
// want to concern themselves with supplying a "correct" interval.
TestSampleInterval = time.Duration(math.MaxInt64)
// WindowedHistogramWrapNum is the number of histograms to keep in rolling
// window.
WindowedHistogramWrapNum = 2
)

// Iterable provides a method for synchronized access to interior objects.
type Iterable interface {
Expand Down Expand Up @@ -97,10 +102,12 @@ type WindowedHistogram interface {
Total() (int64, float64)
// MeanWindowed returns the average of the samples in the current window.
MeanWindowed() float64
// Mean returns the average of the sample in teh cumulative histogram.
// Mean returns the average of the sample in the cumulative histogram.
Mean() float64
// ValueAtQuantileWindowed takes a quantile value [0,100] and returns the
// interpolated value at that quantile for the windowed histogram.
// Methods implementing this interface should the merge buckets, sums,
// and counts of previous and current windows.
ValueAtQuantileWindowed(q float64) float64
}

Expand Down Expand Up @@ -224,7 +231,10 @@ const (
type HistogramOptions struct {
// Metadata is the metric Metadata associated with the histogram.
Metadata Metadata
// Duration is the histogram's window duration.
// Duration is the total duration of all windows in the histogram.
// The individual window duration is equal to the
// Duration/WindowedHistogramWrapNum (i.e., the number of windows
// in the histogram).
Duration time.Duration
// MaxVal is only relevant to the HdrHistogram, and represents the
// highest trackable value in the resulting histogram buckets.
Expand Down Expand Up @@ -256,7 +266,7 @@ func NewHistogram(opt HistogramOptions) IHistogram {
// NewHistogram is a prometheus-backed histogram. Depending on the value of
// opts.Buckets, this is suitable for recording any kind of quantity. Common
// sensible choices are {IO,Network}LatencyBuckets.
func newHistogram(meta Metadata, windowDuration time.Duration, buckets []float64) *Histogram {
func newHistogram(meta Metadata, duration time.Duration, buckets []float64) *Histogram {
// TODO(obs-inf): prometheus supports labeled histograms but they require more
// plumbing and don't fit into the PrometheusObservable interface any more.
opts := prometheus.HistogramOpts{
Expand All @@ -268,8 +278,11 @@ func newHistogram(meta Metadata, windowDuration time.Duration, buckets []float64
cum: cum,
}
h.windowed.tickHelper = &tickHelper{
nextT: now(),
tickInterval: windowDuration,
nextT: now(),
// We want to divide the total window duration by the number of windows
// because we need to rotate the windows at uniformly distributed
// intervals within a histogram's total duration.
tickInterval: duration / WindowedHistogramWrapNum,
onTick: func() {
h.windowed.prev = h.windowed.cur
h.windowed.cur = prometheus.NewHistogram(opts)
Expand All @@ -293,16 +306,13 @@ type Histogram struct {
Metadata
cum prometheus.Histogram

// TODO(obs-inf): the way we implement windowed histograms is not great. If
// the windowed histogram is pulled right after a tick, it will be mostly
// empty. We could add a third bucket and represent the merged view of the two
// most recent buckets to avoid that. Or we could "just" double the rotation
// interval (so that the histogram really collects for 20s when we expect to
// persist the contents every 10s). Really it would make more sense to
// explicitly rotate the histogram atomically with collecting its contents,
// but that is now how we have set it up right now. It should be doable
// though, since there is only one consumer of windowed histograms - our
// internal timeseries system.
// TODO(obs-inf): the way we implement windowed histograms is not great.
// We could "just" double the rotation interval (so that the histogram really
// collects for 20s when we expect to persist the contents every 10s).
// Really it would make more sense to explicitly rotate the histogram
// atomically with collecting its contents, but that is now how we have set
// it up right now. It should be doable though, since there is only one
// consumer of windowed histograms - our internal timeseries system.
windowed struct {
// prometheus.Histogram is thread safe, so we only
// need an RLock to record into it. But write lock
Expand Down Expand Up @@ -368,15 +378,23 @@ func (h *Histogram) ToPrometheusMetric() *prometheusgo.Metric {
return m
}

// ToPrometheusMetricWindowed returns a filled-in prometheus metric of the right type.
// ToPrometheusMetricWindowed returns a filled-in prometheus metric of the
// right type.
func (h *Histogram) ToPrometheusMetricWindowed() *prometheusgo.Metric {
h.windowed.Lock()
defer h.windowed.Unlock()
m := &prometheusgo.Metric{}
if err := h.windowed.cur.Write(m); err != nil {
cur := &prometheusgo.Metric{}
prev := &prometheusgo.Metric{}
if err := h.windowed.cur.Write(cur); err != nil {
panic(err)
}
return m
if h.windowed.prev != nil {
if err := h.windowed.prev.Write(prev); err != nil {
panic(err)
}
MergeWindowedHistogram(cur.Histogram, prev.Histogram)
}
return cur
}

// GetMetadata returns the metric's metadata including the Prometheus
Expand Down Expand Up @@ -428,7 +446,8 @@ func (h *Histogram) MeanWindowed() float64 {
// 2. Since the prometheus client library ensures buckets are in a strictly
// increasing order at creation, we do not sort them.
func (h *Histogram) ValueAtQuantileWindowed(q float64) float64 {
return ValueAtQuantileWindowed(h.ToPrometheusMetricWindowed().Histogram, q)
return ValueAtQuantileWindowed(h.ToPrometheusMetricWindowed().Histogram,
q)
}

var _ PrometheusExportable = (*ManualWindowHistogram)(nil)
Expand Down Expand Up @@ -592,11 +611,25 @@ func (mwh *ManualWindowHistogram) ToPrometheusMetric() *prometheusgo.Metric {
return m
}

// ToPrometheusMetricWindowedLocked returns a filled-in prometheus metric of the
// right type.
func (mwh *ManualWindowHistogram) ToPrometheusMetricWindowedLocked() *prometheusgo.Metric {
cur := &prometheusgo.Metric{}
if err := mwh.mu.cum.Write(cur); err != nil {
panic(err)
}
if mwh.mu.prev != nil {
MergeWindowedHistogram(cur.Histogram, mwh.mu.prev)
}
return cur
}

// TotalWindowed implements the WindowedHistogram interface.
func (mwh *ManualWindowHistogram) TotalWindowed() (int64, float64) {
mwh.mu.RLock()
defer mwh.mu.RUnlock()
return int64(mwh.mu.cur.GetSampleCount()), mwh.mu.cur.GetSampleSum()
pHist := mwh.ToPrometheusMetricWindowedLocked().Histogram
return int64(pHist.GetSampleCount()), pHist.GetSampleSum()
}

// Total implements the WindowedHistogram interface.
Expand All @@ -608,7 +641,8 @@ func (mwh *ManualWindowHistogram) Total() (int64, float64) {
func (mwh *ManualWindowHistogram) MeanWindowed() float64 {
mwh.mu.RLock()
defer mwh.mu.RUnlock()
return mwh.mu.cur.GetSampleSum() / float64(mwh.mu.cur.GetSampleCount())
pHist := mwh.ToPrometheusMetricWindowedLocked().Histogram
return pHist.GetSampleSum() / float64(pHist.GetSampleCount())
}

func (mwh *ManualWindowHistogram) Mean() float64 {
Expand All @@ -626,7 +660,7 @@ func (mwh *ManualWindowHistogram) ValueAtQuantileWindowed(q float64) float64 {
if mwh.mu.cur == nil {
return 0
}
return ValueAtQuantileWindowed(mwh.mu.cur, q)
return ValueAtQuantileWindowed(mwh.ToPrometheusMetricWindowedLocked().Histogram, q)
}

// A Counter holds a single mutable atomic value.
Expand Down Expand Up @@ -881,6 +915,21 @@ func (g *GaugeFloat64) GetMetadata() Metadata {
return baseMetadata
}

// MergeWindowedHistogram adds the bucket counts, sample count, and sample sum
// from the previous windowed histogram to those of the current windowed
// histogram.
// NB: Buckets on each histogram must be the same
func MergeWindowedHistogram(cur *prometheusgo.Histogram, prev *prometheusgo.Histogram) {
for i, bucket := range cur.Bucket {
count := *bucket.CumulativeCount + *prev.Bucket[i].CumulativeCount
*bucket.CumulativeCount = count
}
sampleCount := *cur.SampleCount + *prev.SampleCount
*cur.SampleCount = sampleCount
sampleSum := *cur.SampleSum + *prev.SampleSum
*cur.SampleSum = sampleSum
}

// ValueAtQuantileWindowed takes a quantile value [0,100] and returns the
// interpolated value at that quantile for the given histogram.
func ValueAtQuantileWindowed(histogram *prometheusgo.Histogram, q float64) float64 {
Expand Down
Loading