From f96b21fecc9fc2f60cf861416270fb02d21e6d9d Mon Sep 17 00:00:00 2001 From: David Hartunian Date: Fri, 27 Jan 2023 13:11:30 -0500 Subject: [PATCH] metrics: introduce CounterFloat64 and use for tenant RUs Previously, we were prevented from using float64s directly in metric counters since counters were limited to ints. This led to the use of Gauges in situations where Counters would be preferable since we didn't have code to help manage a monotonically increasing float64 value. This commit introduces some helpers around atomically adding float64s together and increasing one monotonically. Those primitives are composed into a `CounterFloat64` type further used to construct `AggCounterFloat64` which can be used in place of `AggCounter` to track per-tenant metrics. The two `GaugeFloat64` types used for tenant `totalRU` and `totalKVRU` metrics are replaced with the new `CounterFloat64` type to properly reflect the fact that these are monotonically increasing values. This helps Prometheus when scraping these metrics to correctly account for missing data if necessary. 
Resolves #68291 Epic: CRDB0-14536 Release note: None --- .../tenantcostserver/metrics.go | 12 +- .../tenantcostserver/token_bucket.go | 4 +- pkg/util/metric/aggmetric/agg_metric.go | 5 + pkg/util/metric/aggmetric/agg_metric_test.go | 36 +++++- pkg/util/metric/aggmetric/counter.go | 115 ++++++++++++++++++ .../aggmetric/testdata/add_after_destroy.txt | 3 + pkg/util/metric/aggmetric/testdata/basic.txt | 3 + .../metric/aggmetric/testdata/destroy.txt | 2 + pkg/util/metric/metric.go | 57 +++++++++ pkg/util/metric/metric_test.go | 28 +++++ pkg/util/syncutil/atomic.go | 26 ++++ pkg/util/syncutil/atomic_test.go | 59 ++++++++- 12 files changed, 337 insertions(+), 13 deletions(-) diff --git a/pkg/ccl/multitenantccl/tenantcostserver/metrics.go b/pkg/ccl/multitenantccl/tenantcostserver/metrics.go index f0c28a45be84..edddd38986ea 100644 --- a/pkg/ccl/multitenantccl/tenantcostserver/metrics.go +++ b/pkg/ccl/multitenantccl/tenantcostserver/metrics.go @@ -25,8 +25,8 @@ import ( // aggregated value for a metric is not useful (it sums up the consumption for // each tenant, as last reported to this node). type Metrics struct { - TotalRU *aggmetric.AggGaugeFloat64 - TotalKVRU *aggmetric.AggGaugeFloat64 + TotalRU *aggmetric.AggCounterFloat64 + TotalKVRU *aggmetric.AggCounterFloat64 TotalReadBatches *aggmetric.AggGauge TotalReadRequests *aggmetric.AggGauge TotalReadBytes *aggmetric.AggGauge @@ -130,8 +130,8 @@ var ( func (m *Metrics) init() { b := aggmetric.MakeBuilder(multitenant.TenantIDLabel) *m = Metrics{ - TotalRU: b.GaugeFloat64(metaTotalRU), - TotalKVRU: b.GaugeFloat64(metaTotalKVRU), + TotalRU: b.CounterFloat64(metaTotalRU), + TotalKVRU: b.CounterFloat64(metaTotalKVRU), TotalReadBatches: b.Gauge(metaTotalReadBatches), TotalReadRequests: b.Gauge(metaTotalReadRequests), TotalReadBytes: b.Gauge(metaTotalReadBytes), @@ -148,8 +148,8 @@ func (m *Metrics) init() { // tenantMetrics represent metrics for an individual tenant. 
type tenantMetrics struct { - totalRU *aggmetric.GaugeFloat64 - totalKVRU *aggmetric.GaugeFloat64 + totalRU *aggmetric.CounterFloat64 + totalKVRU *aggmetric.CounterFloat64 totalReadBatches *aggmetric.Gauge totalReadRequests *aggmetric.Gauge totalReadBytes *aggmetric.Gauge diff --git a/pkg/ccl/multitenantccl/tenantcostserver/token_bucket.go b/pkg/ccl/multitenantccl/tenantcostserver/token_bucket.go index d0fb4c350328..925ed03553c0 100644 --- a/pkg/ccl/multitenantccl/tenantcostserver/token_bucket.go +++ b/pkg/ccl/multitenantccl/tenantcostserver/token_bucket.go @@ -122,8 +122,8 @@ func (s *instance) TokenBucketRequest( } // Report current consumption. - metrics.totalRU.Update(consumption.RU) - metrics.totalKVRU.Update(consumption.KVRU) + metrics.totalRU.UpdateIfHigher(consumption.RU) + metrics.totalKVRU.UpdateIfHigher(consumption.KVRU) metrics.totalReadBatches.Update(int64(consumption.ReadBatches)) metrics.totalReadRequests.Update(int64(consumption.ReadRequests)) metrics.totalReadBytes.Update(int64(consumption.ReadBytes)) diff --git a/pkg/util/metric/aggmetric/agg_metric.go b/pkg/util/metric/aggmetric/agg_metric.go index ab5ad03ce5b6..9a841864f713 100644 --- a/pkg/util/metric/aggmetric/agg_metric.go +++ b/pkg/util/metric/aggmetric/agg_metric.go @@ -48,6 +48,11 @@ func (b Builder) Counter(metadata metric.Metadata) *AggCounter { return NewCounter(metadata, b.labels...) } +// CounterFloat64 constructs a new AggCounterFloat64 with the Builder's labels. +func (b Builder) CounterFloat64(metadata metric.Metadata) *AggCounterFloat64 { + return NewCounterFloat64(metadata, b.labels...) +} + // Histogram constructs a new AggHistogram with the Builder's labels. func (b Builder) Histogram(opts metric.HistogramOptions) *AggHistogram { return NewHistogram(opts, b.labels...) 
diff --git a/pkg/util/metric/aggmetric/agg_metric_test.go b/pkg/util/metric/aggmetric/agg_metric_test.go index 605f7276b2f6..73652961ddea 100644 --- a/pkg/util/metric/aggmetric/agg_metric_test.go +++ b/pkg/util/metric/aggmetric/agg_metric_test.go @@ -54,6 +54,11 @@ func TestAggMetric(t *testing.T) { }, "tenant_id") r.AddMetric(c) + d := aggmetric.NewCounterFloat64(metric.Metadata{ + Name: "fob_counter", + }, "tenant_id") + r.AddMetric(d) + g := aggmetric.NewGauge(metric.Metadata{ Name: "bar_gauge", }, "tenant_id") @@ -78,6 +83,8 @@ func TestAggMetric(t *testing.T) { tenant3 := roachpb.MustMakeTenantID(3) c2 := c.AddChild(tenant2.String()) c3 := c.AddChild(tenant3.String()) + d2 := d.AddChild(tenant2.String()) + d3 := d.AddChild(tenant3.String()) g2 := g.AddChild(tenant2.String()) g3 := g.AddChild(tenant3.String()) f2 := f.AddChild(tenant2.String()) @@ -88,6 +95,8 @@ func TestAggMetric(t *testing.T) { t.Run("basic", func(t *testing.T) { c2.Inc(2) c3.Inc(4) + d2.Inc(123456.5) + d3.Inc(789089.5) g2.Inc(2) g3.Inc(3) g3.Dec(1) @@ -105,6 +114,7 @@ func TestAggMetric(t *testing.T) { t.Run("destroy", func(t *testing.T) { g3.Unlink() c2.Unlink() + d3.Unlink() f3.Unlink() h3.Unlink() testFile := "destroy.txt" @@ -119,6 +129,9 @@ func TestAggMetric(t *testing.T) { require.Panics(t, func() { c.AddChild(tenant3.String()) }) + require.Panics(t, func() { + d.AddChild(tenant2.String()) + }) require.Panics(t, func() { g.AddChild(tenant2.String()) }) @@ -130,6 +143,7 @@ func TestAggMetric(t *testing.T) { t.Run("add after destroy", func(t *testing.T) { g3 = g.AddChild(tenant3.String()) c2 = c.AddChild(tenant2.String()) + d3 = d.AddChild(tenant3.String()) f3 = f.AddChild(tenant3.String()) h3 = h.AddChild(tenant3.String()) testFile := "add_after_destroy.txt" @@ -141,15 +155,23 @@ func TestAggMetric(t *testing.T) { t.Run("panic on label length mismatch", func(t *testing.T) { require.Panics(t, func() { c.AddChild() }) + require.Panics(t, func() { d.AddChild() }) require.Panics(t, func() 
{ g.AddChild("", "") }) }) } +type Eacher interface { + Each( + labels []*prometheusgo.LabelPair, f func(metric *prometheusgo.Metric), + ) +} + func TestAggMetricBuilder(t *testing.T) { defer leaktest.AfterTest(t)() b := aggmetric.MakeBuilder("tenant_id") c := b.Counter(metric.Metadata{Name: "foo_counter"}) + d := b.CounterFloat64(metric.Metadata{Name: "fob_counter"}) g := b.Gauge(metric.Metadata{Name: "bar_gauge"}) f := b.GaugeFloat64(metric.Metadata{Name: "baz_gauge"}) h := b.Histogram(metric.HistogramOptions{ @@ -163,12 +185,20 @@ func TestAggMetricBuilder(t *testing.T) { for i := 5; i < 10; i++ { tenantLabel := roachpb.MustMakeTenantID(uint64(i)).String() c.AddChild(tenantLabel) + d.AddChild(tenantLabel) g.AddChild(tenantLabel) f.AddChild(tenantLabel) h.AddChild(tenantLabel) } - c.Each(nil, func(pm *prometheusgo.Metric) { - require.Equal(t, 1, len(pm.GetLabel())) - }) + for _, m := range [5]Eacher{ + c, d, g, f, h, + } { + numChildren := 0 + m.Each(nil, func(pm *prometheusgo.Metric) { + require.Equal(t, 1, len(pm.GetLabel())) + numChildren += 1 + }) + require.Equal(t, 5, numChildren) + } } diff --git a/pkg/util/metric/aggmetric/counter.go b/pkg/util/metric/aggmetric/counter.go index e36d4cb7a70a..a6b703ef1eb5 100644 --- a/pkg/util/metric/aggmetric/counter.go +++ b/pkg/util/metric/aggmetric/counter.go @@ -126,3 +126,118 @@ func (g *Counter) Inc(i int64) { g.parent.g.Inc(i) atomic.AddInt64(&g.value, i) } + +// AggCounterFloat64 maintains a value as the sum of its children. The counter will +// report to crdb-internal time series only the aggregate sum of all of its +// children, while its children are additionally exported to prometheus via the +// PrometheusIterable interface. 
+type AggCounterFloat64 struct { + g metric.CounterFloat64 + childSet +} + +var _ metric.Iterable = (*AggCounterFloat64)(nil) +var _ metric.PrometheusIterable = (*AggCounterFloat64)(nil) +var _ metric.PrometheusExportable = (*AggCounterFloat64)(nil) + +// NewCounterFloat64 constructs a new AggCounterFloat64. +func NewCounterFloat64(metadata metric.Metadata, childLabels ...string) *AggCounterFloat64 { + c := &AggCounterFloat64{g: *metric.NewCounterFloat64(metadata)} + c.init(childLabels) + return c +} + +// GetName is part of the metric.Iterable interface. +func (c *AggCounterFloat64) GetName() string { return c.g.GetName() } + +// GetHelp is part of the metric.Iterable interface. +func (c *AggCounterFloat64) GetHelp() string { return c.g.GetHelp() } + +// GetMeasurement is part of the metric.Iterable interface. +func (c *AggCounterFloat64) GetMeasurement() string { return c.g.GetMeasurement() } + +// GetUnit is part of the metric.Iterable interface. +func (c *AggCounterFloat64) GetUnit() metric.Unit { return c.g.GetUnit() } + +// GetMetadata is part of the metric.Iterable interface. +func (c *AggCounterFloat64) GetMetadata() metric.Metadata { return c.g.GetMetadata() } + +// Inspect is part of the metric.Iterable interface. +func (c *AggCounterFloat64) Inspect(f func(interface{})) { f(c) } + +// GetType is part of the metric.PrometheusExportable interface. +func (c *AggCounterFloat64) GetType() *io_prometheus_client.MetricType { + return c.g.GetType() +} + +// GetLabels is part of the metric.PrometheusExportable interface. +func (c *AggCounterFloat64) GetLabels() []*io_prometheus_client.LabelPair { + return c.g.GetLabels() +} + +// ToPrometheusMetric is part of the metric.PrometheusExportable interface. +func (c *AggCounterFloat64) ToPrometheusMetric() *io_prometheus_client.Metric { + return c.g.ToPrometheusMetric() +} + +// Count returns the aggregate count of all of its current and past children. 
+func (c *AggCounterFloat64) Count() float64 { + return c.g.Count() +} + +// AddChild adds a CounterFloat64 to this AggCounterFloat64. This method panics if a CounterFloat64 +// already exists for this set of labelVals. +func (c *AggCounterFloat64) AddChild(labelVals ...string) *CounterFloat64 { + child := &CounterFloat64{ + parent: c, + labelValuesSlice: labelValuesSlice(labelVals), + } + c.add(child) + return child +} + +// CounterFloat64 is a child of an AggCounterFloat64. When it is incremented, so too is the +// parent. When metrics are collected by prometheus, each of the children will +// appear with a distinct label, however, when cockroach internally collects +// metrics, only the parent is collected. +type CounterFloat64 struct { + parent *AggCounterFloat64 + labelValuesSlice + value metric.CounterFloat64 +} + +// ToPrometheusMetric constructs a prometheus metric for this CounterFloat64. +func (g *CounterFloat64) ToPrometheusMetric() *io_prometheus_client.Metric { + return &io_prometheus_client.Metric{ + Counter: &io_prometheus_client.Counter{ + Value: proto.Float64(g.Value()), + }, + } +} + +// Unlink unlinks this child from the parent, i.e. the parent will no longer +// track this child (i.e. won't generate labels for it, etc). However, the child +// will continue to be functional and reference the parent, meaning updates to +// it will be reflected in the aggregate stored in the parent. +// +// See tenantrate.TestUseAfterRelease. +func (g *CounterFloat64) Unlink() { + g.parent.remove(g) +} + +// Value returns the CounterFloat64's current value. +func (g *CounterFloat64) Value() float64 { + return g.value.Count() +} + +// Inc increments the CounterFloat64's value along with the parent's aggregate. +func (g *CounterFloat64) Inc(i float64) { + g.parent.g.Inc(i) + g.value.Inc(i) +} + +// UpdateIfHigher sets the counter's value only if it's higher +// than the currently set one.
Unlike Inc, this does not update the parent's aggregate. +func (g *CounterFloat64) UpdateIfHigher(i float64) { + g.value.UpdateIfHigher(i) +} diff --git a/pkg/util/metric/aggmetric/testdata/add_after_destroy.txt b/pkg/util/metric/aggmetric/testdata/add_after_destroy.txt index 45b67cb0a546..823da0aa06b8 100644 --- a/pkg/util/metric/aggmetric/testdata/add_after_destroy.txt +++ b/pkg/util/metric/aggmetric/testdata/add_after_destroy.txt @@ -6,6 +6,9 @@ bar_gauge{tenant_id="3"} 0 baz_gauge 4 baz_gauge{tenant_id="2"} 1.5 baz_gauge{tenant_id="3"} 0 +fob_counter 912546 +fob_counter{tenant_id="2"} 123456.5 +fob_counter{tenant_id="3"} 0 foo_counter 6 foo_counter{tenant_id="2"} 0 foo_counter{tenant_id="3"} 4 diff --git a/pkg/util/metric/aggmetric/testdata/basic.txt b/pkg/util/metric/aggmetric/testdata/basic.txt index c8c4533fbbb3..fe5a58971444 100644 --- a/pkg/util/metric/aggmetric/testdata/basic.txt +++ b/pkg/util/metric/aggmetric/testdata/basic.txt @@ -6,6 +6,9 @@ bar_gauge{tenant_id="3"} 2 baz_gauge 4 baz_gauge{tenant_id="2"} 1.5 baz_gauge{tenant_id="3"} 2.5 +fob_counter 912546 +fob_counter{tenant_id="2"} 123456.5 +fob_counter{tenant_id="3"} 789089.5 foo_counter 6 foo_counter{tenant_id="2"} 2 foo_counter{tenant_id="3"} 4 diff --git a/pkg/util/metric/aggmetric/testdata/destroy.txt b/pkg/util/metric/aggmetric/testdata/destroy.txt index c7ad402b85a4..d8b22e618dcf 100644 --- a/pkg/util/metric/aggmetric/testdata/destroy.txt +++ b/pkg/util/metric/aggmetric/testdata/destroy.txt @@ -4,6 +4,8 @@ bar_gauge 4 bar_gauge{tenant_id="2"} 2 baz_gauge 4 baz_gauge{tenant_id="2"} 1.5 +fob_counter 912546 +fob_counter{tenant_id="2"} 123456.5 foo_counter 6 foo_counter{tenant_id="3"} 4 histo_gram_bucket{le="+Inf"} 2 diff --git a/pkg/util/metric/metric.go b/pkg/util/metric/metric.go index afd19cf83638..3b3b806404fd 100644 --- a/pkg/util/metric/metric.go +++ b/pkg/util/metric/metric.go @@ -578,6 +578,63 @@ func (c *Counter) GetMetadata() Metadata { return baseMetadata } +type CounterFloat64 struct { + Metadata + count 
syncutil.AtomicFloat64 +} + +// GetMetadata returns the metric's metadata including the Prometheus +// MetricType. +func (c *CounterFloat64) GetMetadata() Metadata { + baseMetadata := c.Metadata + baseMetadata.MetricType = prometheusgo.MetricType_COUNTER + return baseMetadata +} + +func (c *CounterFloat64) Clear() { + syncutil.StoreFloat64(&c.count, 0) +} + +func (c *CounterFloat64) Count() float64 { + return syncutil.LoadFloat64(&c.count) +} + +func (c *CounterFloat64) Inc(i float64) { + syncutil.AddFloat64(&c.count, i) +} + +func (c *CounterFloat64) UpdateIfHigher(i float64) { + syncutil.StoreFloat64IfHigher(&c.count, i) +} + +func (c *CounterFloat64) Snapshot() *CounterFloat64 { + newCounter := NewCounterFloat64(c.Metadata) + syncutil.StoreFloat64(&newCounter.count, c.Count()) + return newCounter +} + +// GetType returns the prometheus type enum for this metric. +func (c *CounterFloat64) GetType() *prometheusgo.MetricType { + return prometheusgo.MetricType_COUNTER.Enum() +} + +// ToPrometheusMetric returns a filled-in prometheus metric of the right type. +func (c *CounterFloat64) ToPrometheusMetric() *prometheusgo.Metric { + return &prometheusgo.Metric{ + Counter: &prometheusgo.Counter{Value: proto.Float64(c.Count())}, + } +} + +// MarshalJSON marshals to JSON. +func (c *CounterFloat64) MarshalJSON() ([]byte, error) { + return json.Marshal(c.Count()) +} + +// NewCounterFloat64 creates a counter. +func NewCounterFloat64(metadata Metadata) *CounterFloat64 { + return &CounterFloat64{Metadata: metadata} +} + // A Gauge atomically stores a single integer value. 
type Gauge struct { Metadata diff --git a/pkg/util/metric/metric_test.go b/pkg/util/metric/metric_test.go index d8a451ec06ad..bee57d768b8a 100644 --- a/pkg/util/metric/metric_test.go +++ b/pkg/util/metric/metric_test.go @@ -95,6 +95,34 @@ func TestCounter(t *testing.T) { testMarshal(t, c, "90") } +func TestCounterFloat64(t *testing.T) { + g := NewCounterFloat64(emptyMetadata) + g.UpdateIfHigher(10) + if v := g.Count(); v != 10 { + t.Fatalf("unexpected value: %f", v) + } + testMarshal(t, g, "10") + + var wg sync.WaitGroup + for i := int64(0); i < 10; i++ { + wg.Add(1) + go func(i int64) { g.Inc(float64(i)); wg.Done() }(i) + } + wg.Wait() + if v := g.Count(); math.Abs(v-55.0) > 0.001 { + t.Fatalf("unexpected value: %g", v) + } + + for i := int64(55); i < 65; i++ { + wg.Add(1) + go func(i int64) { g.UpdateIfHigher(float64(i)); wg.Done() }(i) + } + wg.Wait() + if v := g.Count(); math.Abs(v-64.0) > 0.001 { + t.Fatalf("unexpected value: %g", v) + } +} + func setNow(d time.Duration) { now = func() time.Time { return time.Time{}.Add(d) diff --git a/pkg/util/syncutil/atomic.go b/pkg/util/syncutil/atomic.go index e0357331fc49..419d6b74f62d 100644 --- a/pkg/util/syncutil/atomic.go +++ b/pkg/util/syncutil/atomic.go @@ -31,6 +31,32 @@ func LoadFloat64(addr *AtomicFloat64) (val float64) { return math.Float64frombits(atomic.LoadUint64((*uint64)(addr))) } +func AddFloat64(addr *AtomicFloat64, add float64) (val float64) { + for { + oldFloat := LoadFloat64(addr) + oldInt := math.Float64bits(oldFloat) + newFloat := oldFloat + add + newInt := math.Float64bits(newFloat) + if atomic.CompareAndSwapUint64((*uint64)(addr), oldInt, newInt) { + return + } + } +} + +func StoreFloat64IfHigher(addr *AtomicFloat64, new float64) (val float64) { + for { + oldFloat := LoadFloat64(addr) + if oldFloat > new { + return + } + oldInt := math.Float64bits(oldFloat) + newInt := math.Float64bits(new) + if atomic.CompareAndSwapUint64((*uint64)(addr), oldInt, newInt) { + return + } + } +} + // AtomicBool 
mimics an atomic boolean. type AtomicBool uint32 diff --git a/pkg/util/syncutil/atomic_test.go b/pkg/util/syncutil/atomic_test.go index 0e68dbf0452c..b95a783d277e 100644 --- a/pkg/util/syncutil/atomic_test.go +++ b/pkg/util/syncutil/atomic_test.go @@ -11,6 +11,7 @@ package syncutil import ( + "math" "testing" "github.com/stretchr/testify/require" @@ -33,8 +34,8 @@ func TestAtomicFloat64(t *testing.T) { } x.before = magic64 x.after = magic64 - for delta := uint64(1); delta+delta > delta; delta += delta { - e := float64(delta) + for delta := float64(1); delta+delta > delta; delta += delta { + e := delta StoreFloat64(&x.i, e) a := LoadFloat64(&x.i) if a != e { @@ -46,6 +47,60 @@ func TestAtomicFloat64(t *testing.T) { } } +// TestAtomicStoreFloat64IfHigher is also adapted from https://golang.org/src/sync/atomic/atomic_test.go +func TestAtomicStoreFloat64IfHigher(t *testing.T) { + var x struct { + before AtomicFloat64 + i AtomicFloat64 + after AtomicFloat64 + } + x.before = magic64 + x.after = magic64 + + // Roughly half the time we will have to store a larger value. 
+ StoreFloat64(&x.i, math.MaxFloat64/math.Pow(2, 500)) + for delta := float64(1); delta+delta > delta; delta += delta { + e := delta + cur := LoadFloat64(&x.i) + shouldStore := e > cur + StoreFloat64IfHigher(&x.i, e) + afterStore := LoadFloat64(&x.i) + if shouldStore && e != afterStore { + t.Fatalf("should store: expected=%f got=%f", e, afterStore) + } + if !shouldStore && cur != afterStore { + t.Fatalf("should not store: expected=%f got=%f", cur, afterStore) + } + StoreFloat64(&x.i, math.MaxFloat64/math.Pow(2, 500)) + } + if x.before != magic64 || x.after != magic64 { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, uint64(magic64), uint64(magic64)) + } +} + +// TestAtomicAddFloat64 is also adapted from https://golang.org/src/sync/atomic/atomic_test.go +func TestAtomicAddFloat64(t *testing.T) { + var x struct { + before AtomicFloat64 + i AtomicFloat64 + after AtomicFloat64 + } + x.before = magic64 + x.after = magic64 + j := LoadFloat64(&x.i) + for delta := float64(1); delta+delta > delta; delta += delta { + AddFloat64(&x.i, delta) + j += delta + got := LoadFloat64(&x.i) + if j != LoadFloat64(&x.i) { + t.Fatalf("expected=%f got=%f", j, got) + } + } + if x.before != magic64 || x.after != magic64 { + t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, uint64(magic64), uint64(magic64)) + } +} + func TestAtomicBool(t *testing.T) { var x AtomicBool x.Set(true)