From 2bb184de63cf6a0a865a6dab6ff3bc38277c4c54 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Mon, 9 Mar 2020 16:09:19 -0400 Subject: [PATCH 01/10] [aggregator] Take last value by wall clock timestamp not arrival time --- src/aggregator/aggregation/counter_test.go | 5 ++- src/aggregator/aggregation/gauge.go | 26 +++++++++---- src/aggregator/aggregation/gauge_test.go | 31 +++++++++++++-- src/aggregator/aggregation/options.go | 37 ++++++++++++++++-- src/aggregator/aggregation/options_test.go | 3 +- .../aggregation/timer_benchmark_test.go | 4 +- src/aggregator/aggregation/timer_test.go | 9 +++-- src/aggregator/aggregator/aggregation.go | 39 +++++++++++++++---- src/aggregator/aggregator/elem_base.go | 3 +- src/aggregator/aggregator/gauge_elem_gen.go | 6 +-- 10 files changed, 127 insertions(+), 36 deletions(-) diff --git a/src/aggregator/aggregation/counter_test.go b/src/aggregator/aggregation/counter_test.go index 7d8537812c..c91b9c7211 100644 --- a/src/aggregator/aggregation/counter_test.go +++ b/src/aggregator/aggregation/counter_test.go @@ -26,10 +26,11 @@ import ( "github.com/m3db/m3/src/metrics/aggregation" "github.com/stretchr/testify/require" + "github.com/uber-go/tally" ) func TestCounterDefaultAggregationType(t *testing.T) { - c := NewCounter(NewOptions()) + c := NewCounter(NewOptions(tally.NoopScope)) require.False(t, c.HasExpensiveAggregations) for i := 1; i <= 100; i++ { c.Update(int64(i)) @@ -41,7 +42,7 @@ func TestCounterDefaultAggregationType(t *testing.T) { } func TestCounterCustomAggregationType(t *testing.T) { - opts := NewOptions() + opts := NewOptions(tally.NoopScope) opts.HasExpensiveAggregations = true c := NewCounter(opts) diff --git a/src/aggregator/aggregation/gauge.go b/src/aggregator/aggregation/gauge.go index c67109c0fb..872f96fcf6 100644 --- a/src/aggregator/aggregation/gauge.go +++ b/src/aggregator/aggregation/gauge.go @@ -22,6 +22,7 @@ package aggregation import ( "math" + "time" "github.com/m3db/m3/src/metrics/aggregation" ) @@ -34,12 +35,13 @@ const ( type Gauge struct { Options - last float64 - sum float64 - sumSq float64 - count int64 - max float64 - min float64 + last float64 + lastAt time.Time + sum float64 + sumSq float64 + count int64 + max float64 + min float64 } // NewGauge creates a new gauge. @@ -52,8 +54,16 @@ func NewGauge(opts Options) Gauge { } // Update updates the gauge value. -func (g *Gauge) Update(value float64) { - g.last = value +func (g *Gauge) Update(timestamp time.Time, value float64) { + if g.lastAt.IsZero() || timestamp.After(g.lastAt) { + // NB(r): Only set the last value if this value arrives + // after the wall clock timestamp of previous values, not + // the arrival time (i.e. order received). + g.last = value + g.lastAt = timestamp + } else { + g.Options.Metrics.IncGaugeValuesOutOfOrder() + } g.sum += value g.count++ diff --git a/src/aggregator/aggregation/gauge_test.go b/src/aggregator/aggregation/gauge_test.go index 38e9fb3087..0d5dae7189 100644 --- a/src/aggregator/aggregation/gauge_test.go +++ b/src/aggregator/aggregation/gauge_test.go @@ -22,17 +22,19 @@ package aggregation import ( "testing" + "time" "github.com/m3db/m3/src/metrics/aggregation" "github.com/stretchr/testify/require" + "github.com/uber-go/tally" ) func TestGaugeDefaultAggregationType(t *testing.T) { - g := NewGauge(NewOptions()) + g := NewGauge(NewOptions(tally.NoopScope)) require.False(t, g.HasExpensiveAggregations) for i := 1.0; i <= 100.0; i++ { - g.Update(i) + g.Update(time.Now(), i) } require.Equal(t, 100.0, g.Last()) require.Equal(t, 100.0, g.ValueOf(aggregation.Last)) @@ -42,14 +44,14 @@ func TestGaugeDefaultAggregationType(t *testing.T) { } func TestGaugeCustomAggregationType(t *testing.T) { - opts := NewOptions() + opts := NewOptions(tally.NoopScope) opts.HasExpensiveAggregations = true g := NewGauge(opts) require.True(t, g.HasExpensiveAggregations) for i := 1; i <= 100; i++ { - g.Update(float64(i)) + g.Update(time.Now(), float64(i)) } require.Equal(t, 100.0, g.Last()) @@ -78,3 +80,24 @@ func TestGaugeCustomAggregationType(t *testing.T) { } } } + +func TestGaugeLastOutOfOrderValues(t *testing.T) { + scope := tally.NewTestScope("", nil) + g := NewGauge(NewOptions(scope)) + + timeMid := time.Now().Add(time.Minute) + timePre := timeMid.Add(-1 * time.Second) + timePrePre := timeMid.Add(-1 * time.Second) + timeAfter := timeMid.Add(time.Second) + + g.Update(timeMid, 42) + g.Update(timePre, 41) + g.Update(timeAfter, 43) + g.Update(timePrePre, 40) + + require.Equal(t, 43.0, g.Last()) + snap := scope.Snapshot().Counters() + counter, ok := snap["aggregation.gauges.values-out-of-order+"] + require.True(t, ok) + require.Equal(t, int64(2), counter.Value()) +} diff --git a/src/aggregator/aggregation/options.go b/src/aggregator/aggregation/options.go index 2b70ec8136..c56d2cb215 100644 --- a/src/aggregator/aggregation/options.go +++ b/src/aggregator/aggregation/options.go @@ -20,10 +20,13 @@ package aggregation -import "github.com/m3db/m3/src/metrics/aggregation" +import ( + "github.com/m3db/m3/src/metrics/aggregation" + "github.com/uber-go/tally" +) var ( - defaultOptions Options + defaultHasExpensiveAggregations = false ) // Options is the options for aggregations. @@ -31,11 +34,37 @@ type Options struct { // HasExpensiveAggregations means expensive (multiplication/division) // aggregation types are enabled. HasExpensiveAggregations bool + // Metrics is as set of aggregation metrics. + Metrics Metrics +} + +// Metrics is a set of metrics that can be used by elements. +type Metrics struct { + gaugeValuesOutOfOrder tally.Counter +} + +// NewMetrics is a set of aggregation metrics. +func NewMetrics(scope tally.Scope) Metrics { + scope = scope.SubScope("aggregation") + gaugeScope := scope.SubScope("gauges") + return Metrics{ + gaugeValuesOutOfOrder: gaugeScope.Counter("values-out-of-order"), + } +} + +// IncGaugeValuesOutOfOrder increments value or if not initialized is a no-op. +func (m Metrics) IncGaugeValuesOutOfOrder() { + if m.gaugeValuesOutOfOrder != nil { + m.gaugeValuesOutOfOrder.Inc(1) + } } // NewOptions creates a new aggregation options. -func NewOptions() Options { - return defaultOptions +func NewOptions(scope tally.Scope) Options { + return Options{ + HasExpensiveAggregations: defaultHasExpensiveAggregations, + Metrics: NewMetrics(scope), + } } // ResetSetData resets the aggregation options. diff --git a/src/aggregator/aggregation/options_test.go b/src/aggregator/aggregation/options_test.go index 00156ada84..c065b0db6a 100644 --- a/src/aggregator/aggregation/options_test.go +++ b/src/aggregator/aggregation/options_test.go @@ -26,10 +26,11 @@ import ( "github.com/m3db/m3/src/metrics/aggregation" "github.com/stretchr/testify/require" + "github.com/uber-go/tally" ) func TestOptions(t *testing.T) { - o := NewOptions() + o := NewOptions(tally.NoopScope) require.False(t, o.HasExpensiveAggregations) o.ResetSetData(nil) diff --git a/src/aggregator/aggregation/timer_benchmark_test.go b/src/aggregator/aggregation/timer_benchmark_test.go index 08886f97d7..8cff2cfbbd 100644 --- a/src/aggregator/aggregation/timer_benchmark_test.go +++ b/src/aggregator/aggregation/timer_benchmark_test.go @@ -24,10 +24,12 @@ import ( "testing" "github.com/m3db/m3/src/aggregator/aggregation/quantile/cm" + + "github.com/uber-go/tally" ) func getTimer() Timer { - opts := NewOptions() + opts := NewOptions(tally.NoopScope) opts.ResetSetData(testAggTypes) timer := NewTimer(testQuantiles, cm.NewOptions(), opts) diff --git a/src/aggregator/aggregation/timer_test.go b/src/aggregator/aggregation/timer_test.go index 01db582e35..92e12781cb 100644 --- a/src/aggregator/aggregation/timer_test.go +++ b/src/aggregator/aggregation/timer_test.go @@ -29,6 +29,7 @@ import ( "github.com/m3db/m3/src/x/pool" "github.com/stretchr/testify/require" + "github.com/uber-go/tally" ) var ( @@ -56,13 +57,13 @@ func TestCreateTimerResetStream(t *testing.T) { // Add a value to the timer and close the timer, which returns the // underlying stream to the pool. - timer := NewTimer(testQuantiles, streamOpts, NewOptions()) + timer := NewTimer(testQuantiles, streamOpts, NewOptions(tally.NoopScope)) timer.Add(1.0) require.Equal(t, 1.0, timer.Min()) timer.Close() // Create a new timer and assert the underlying stream has been closed. - timer = NewTimer(testQuantiles, streamOpts, NewOptions()) + timer = NewTimer(testQuantiles, streamOpts, NewOptions(tally.NoopScope)) timer.Add(1.0) require.Equal(t, 1.0, timer.Min()) timer.Close() @@ -70,7 +71,7 @@ func TestCreateTimerResetStream(t *testing.T) { } func TestTimerAggregations(t *testing.T) { - opts := NewOptions() + opts := NewOptions(tally.NoopScope) opts.ResetSetData(testAggTypes) timer := NewTimer(testQuantiles, cm.NewOptions(), opts) @@ -143,7 +144,7 @@ func TestTimerAggregations(t *testing.T) { } func TestTimerAggregationsNotExpensive(t *testing.T) { - opts := NewOptions() + opts := NewOptions(tally.NoopScope) opts.ResetSetData(aggregation.Types{aggregation.Sum}) timer := NewTimer(testQuantiles, cm.NewOptions(), opts) diff --git a/src/aggregator/aggregator/aggregation.go b/src/aggregator/aggregator/aggregation.go index b1d3f7a4ac..61354ed32b 100644 --- a/src/aggregator/aggregator/aggregation.go +++ b/src/aggregator/aggregator/aggregation.go @@ -21,6 +21,8 @@ package aggregator import ( + "time" + "github.com/m3db/m3/src/aggregator/aggregation" "github.com/m3db/m3/src/metrics/metric/unaggregated" ) @@ -34,23 +36,44 @@ func newCounterAggregation(c aggregation.Counter) counterAggregation { return counterAggregation{Counter: c} } -func (c *counterAggregation) Add(value float64) { c.Counter.Update(int64(value)) } -func (c *counterAggregation) AddUnion(mu unaggregated.MetricUnion) { c.Counter.Update(mu.CounterVal) } +func (c *counterAggregation) Add(value float64) { + c.Counter.Update(int64(value)) +} + +func (c *counterAggregation) AddUnion(mu unaggregated.MetricUnion) { + c.Counter.Update(mu.CounterVal) +} // timerAggregation is a timer aggregation. type timerAggregation struct { aggregation.Timer } -func newTimerAggregation(t aggregation.Timer) timerAggregation { return timerAggregation{Timer: t} } -func (t *timerAggregation) Add(value float64) { t.Timer.Add(value) } -func (t *timerAggregation) AddUnion(mu unaggregated.MetricUnion) { t.Timer.AddBatch(mu.BatchTimerVal) } +func newTimerAggregation(t aggregation.Timer) timerAggregation { + return timerAggregation{Timer: t} +} + +func (t *timerAggregation) Add(value float64) { + t.Timer.Add(value) +} + +func (t *timerAggregation) AddUnion(mu unaggregated.MetricUnion) { + t.Timer.AddBatch(mu.BatchTimerVal) +} // gaugeAggregation is a gauge aggregation. type gaugeAggregation struct { aggregation.Gauge } -func newGaugeAggregation(g aggregation.Gauge) gaugeAggregation { return gaugeAggregation{Gauge: g} } -func (g *gaugeAggregation) Add(value float64) { g.Gauge.Update(value) } -func (g *gaugeAggregation) AddUnion(mu unaggregated.MetricUnion) { g.Gauge.Update(mu.GaugeVal) } +func newGaugeAggregation(g aggregation.Gauge) gaugeAggregation { + return gaugeAggregation{Gauge: g} +} + +func (g *gaugeAggregation) Add(t time.Time, value float64) { + g.Gauge.Update(t, value) +} + +func (g *gaugeAggregation) AddUnion(t time.Time, mu unaggregated.MetricUnion) { + g.Gauge.Update(t, mu.GaugeVal) +} diff --git a/src/aggregator/aggregator/elem_base.go b/src/aggregator/aggregator/elem_base.go index 440b840620..a8e748a326 100644 --- a/src/aggregator/aggregator/elem_base.go +++ b/src/aggregator/aggregator/elem_base.go @@ -173,10 +173,11 @@ type elemBase struct { } func newElemBase(opts Options) elemBase { + scope := opts.InstrumentOptions().MetricsScope() return elemBase{ opts: opts, aggTypesOpts: opts.AggregationTypesOptions(), - aggOpts: raggregation.NewOptions(), + aggOpts: raggregation.NewOptions(scope), } } diff --git a/src/aggregator/aggregator/gauge_elem_gen.go b/src/aggregator/aggregator/gauge_elem_gen.go index 0ccbf80ca2..536ff03992 100644 --- a/src/aggregator/aggregator/gauge_elem_gen.go +++ b/src/aggregator/aggregator/gauge_elem_gen.go @@ -153,7 +153,7 @@ func (e *GaugeElem) AddUnion(timestamp time.Time, mu unaggregated.MetricUnion) e lockedAgg.Unlock() return errAggregationClosed } - lockedAgg.aggregation.AddUnion(mu) + lockedAgg.aggregation.AddUnion(timestamp, mu) lockedAgg.Unlock() return nil } @@ -170,7 +170,7 @@ func (e *GaugeElem) AddValue(timestamp time.Time, value float64) error { lockedAgg.Unlock() return errAggregationClosed } - lockedAgg.aggregation.Add(value) + lockedAgg.aggregation.Add(timestamp, value) lockedAgg.Unlock() return nil } @@ -196,7 +196,7 @@ func (e *GaugeElem) AddUnique(timestamp time.Time, values []float64, sourceID ui } lockedAgg.sourcesSeen.Set(source) for _, v := range values { - lockedAgg.aggregation.Add(v) + lockedAgg.aggregation.Add(timestamp, v) } lockedAgg.Unlock() return nil From 2268961d9fc745a5b0c642ca9b96020f99f71b9b Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Mon, 9 Mar 2020 18:02:50 -0400 Subject: [PATCH 02/10] Fix build --- src/aggregator/aggregator/aggregation_test.go | 18 ++++++++++-------- src/aggregator/aggregator/elem_base_test.go | 5 +++-- src/aggregator/aggregator/elem_test.go | 2 +- src/aggregator/integration/integration_data.go | 8 ++++---- .../multi_server_forwarding_pipeline_test.go | 5 +++-- 5 files changed, 21 insertions(+), 17 deletions(-) diff --git a/src/aggregator/aggregator/aggregation_test.go b/src/aggregator/aggregator/aggregation_test.go index b6ff036038..dab214d093 100644 --- a/src/aggregator/aggregator/aggregation_test.go +++ b/src/aggregator/aggregator/aggregation_test.go @@ -22,6 +22,7 @@ package aggregator import ( "testing" + "time" "github.com/m3db/m3/src/aggregator/aggregation" "github.com/m3db/m3/src/aggregator/aggregation/quantile/cm" @@ -29,6 +30,7 @@ import ( "github.com/m3db/m3/src/metrics/metric/unaggregated" "github.com/stretchr/testify/require" + "github.com/uber-go/tally" ) var ( @@ -53,7 +55,7 @@ var ( ) func TestCounterAggregationAdd(t *testing.T) { - c := newCounterAggregation(aggregation.NewCounter(aggregation.NewOptions())) + c := newCounterAggregation(aggregation.NewCounter(aggregation.NewOptions(tally.NoopScope))) for _, v := range testAggregationValues { c.Add(v) } @@ -62,7 +64,7 @@ func TestCounterAggregationAdd(t *testing.T) { } func TestCounterAggregationAddUnion(t *testing.T) { - c := newCounterAggregation(aggregation.NewCounter(aggregation.NewOptions())) + c := newCounterAggregation(aggregation.NewCounter(aggregation.NewOptions(tally.NoopScope))) for _, v := range testAggregationUnions { c.AddUnion(v) } @@ -71,7 +73,7 @@ func TestCounterAggregationAddUnion(t *testing.T) { } func TestTimerAggregationAdd(t *testing.T) { - tm := newTimerAggregation(aggregation.NewTimer([]float64{0.5}, cm.NewOptions(), aggregation.NewOptions())) + tm := newTimerAggregation(aggregation.NewTimer([]float64{0.5}, cm.NewOptions(), aggregation.NewOptions(tally.NoopScope))) for _, v := range testAggregationValues { tm.Add(v) } @@ -80,7 +82,7 @@ func TestTimerAggregationAdd(t *testing.T) { } func TestTimerAggregationAddUnion(t *testing.T) { - tm := newTimerAggregation(aggregation.NewTimer([]float64{0.5}, cm.NewOptions(), aggregation.NewOptions())) + tm := newTimerAggregation(aggregation.NewTimer([]float64{0.5}, cm.NewOptions(), aggregation.NewOptions(tally.NoopScope))) for _, v := range testAggregationUnions { tm.AddUnion(v) } @@ -89,18 +91,18 @@ func TestTimerAggregationAddUnion(t *testing.T) { } func TestGaugeAggregationAdd(t *testing.T) { - g := newGaugeAggregation(aggregation.NewGauge(aggregation.NewOptions())) + g := newGaugeAggregation(aggregation.NewGauge(aggregation.NewOptions(tally.NoopScope))) for _, v := range testAggregationValues { - g.Add(v) + g.Add(time.Now(), v) } require.Equal(t, int64(4), g.Count()) require.Equal(t, 799.2, g.Sum()) } func TestGaugeAggregationAddUnion(t *testing.T) { - g := newGaugeAggregation(aggregation.NewGauge(aggregation.NewOptions())) + g := newGaugeAggregation(aggregation.NewGauge(aggregation.NewOptions(tally.NoopScope))) for _, v := range testAggregationUnions { - g.AddUnion(v) + g.AddUnion(time.Now(), v) } require.Equal(t, int64(3), g.Count()) require.Equal(t, 123.456, g.Sum()) diff --git a/src/aggregator/aggregator/elem_base_test.go b/src/aggregator/aggregator/elem_base_test.go index e2350603b3..8dbac0777f 100644 --- a/src/aggregator/aggregator/elem_base_test.go +++ b/src/aggregator/aggregator/elem_base_test.go @@ -23,6 +23,7 @@ package aggregator import ( "strings" "testing" + "time" raggregation "github.com/m3db/m3/src/aggregator/aggregation" maggregation "github.com/m3db/m3/src/metrics/aggregation" @@ -276,11 +277,11 @@ func TestGaugeElemBase(t *testing.T) { func TestGaugeElemBaseNewLockedAggregation(t *testing.T) { e := gaugeElemBase{} la := e.NewAggregation(nil, raggregation.Options{}) - la.AddUnion(unaggregated.MetricUnion{ + la.AddUnion(time.Now(), unaggregated.MetricUnion{ Type: metric.GaugeType, GaugeVal: 100.0, }) - la.AddUnion(unaggregated.MetricUnion{ + la.AddUnion(time.Now(), unaggregated.MetricUnion{ Type: metric.GaugeType, GaugeVal: 200.0, }) diff --git a/src/aggregator/aggregator/elem_test.go b/src/aggregator/aggregator/elem_test.go index fb48068d1c..3e056c87dc 100644 --- a/src/aggregator/aggregator/elem_test.go +++ b/src/aggregator/aggregator/elem_test.go @@ -1843,7 +1843,7 @@ func testGaugeElem( e := MustNewGaugeElem(testGaugeID, testStoragePolicy, aggTypes, pipeline, testNumForwardedTimes, WithPrefixWithSuffix, opts) for i, aligned := range alignedstartAtNanos { gauge := &lockedGaugeAggregation{aggregation: newGaugeAggregation(raggregation.NewGauge(e.aggOpts))} - gauge.aggregation.Update(gaugeVals[i]) + gauge.aggregation.Update(time.Now(), gaugeVals[i]) e.values = append(e.values, timedGauge{ startAtNanos: aligned, lockedAgg: gauge, diff --git a/src/aggregator/integration/integration_data.go b/src/aggregator/integration/integration_data.go index 3bfa3763e5..247dab53b5 100644 --- a/src/aggregator/integration/integration_data.go +++ b/src/aggregator/integration/integration_data.go @@ -367,7 +367,7 @@ func computeExpectedAggregationBuckets( var ( aggTypeOpts = opts.AggregationTypesOptions() aggTypes = maggregation.NewIDDecompressor().MustDecompress(bucket.key.aggregationID) - aggregationOpts = aggregation.NewOptions() + aggregationOpts = aggregation.NewOptions(opts.InstrumentOptions().MetricsScope()) ) switch mu.Type() { case metric.CounterType: @@ -431,7 +431,7 @@ func addUntimedMetricToAggregation( return v, nil case metric.GaugeType: v := values.(aggregation.Gauge) - v.Update(mu.GaugeVal) + v.Update(time.Now(), mu.GaugeVal) return v, nil default: return nil, fmt.Errorf("unrecognized untimed metric type %v", mu.Type) @@ -453,7 +453,7 @@ func addTimedMetricToAggregation( return v, nil case metric.GaugeType: v := values.(aggregation.Gauge) - v.Update(mu.Value) + v.Update(time.Now(), mu.Value) return v, nil default: return nil, fmt.Errorf("unrecognized timed metric type %v", mu.Type) @@ -478,7 +478,7 @@ func addForwardedMetricToAggregation( case metric.GaugeType: v := values.(aggregation.Gauge) for _, val := range mu.Values { - v.Update(val) + v.Update(time.Now(), val) } return v, nil default: diff --git a/src/aggregator/integration/multi_server_forwarding_pipeline_test.go b/src/aggregator/integration/multi_server_forwarding_pipeline_test.go index 85221ce080..330537f6dc 100644 --- a/src/aggregator/integration/multi_server_forwarding_pipeline_test.go +++ b/src/aggregator/integration/multi_server_forwarding_pipeline_test.go @@ -397,8 +397,9 @@ func testMultiServerForwardingPipeline(t *testing.T, discardNaNAggregatedValues continue } currTime := start.Add(time.Duration(i+1) * storagePolicy.Resolution().Window) - agg := aggregation.NewGauge(aggregation.NewOptions()) - agg.Update(expectedValuesList[spIdx][i]) + scope := aggregatorOpts.InstrumentOptions().MetricsScope() + agg := aggregation.NewGauge(aggregation.NewOptions(scope)) + agg.Update(time.Now(), expectedValuesList[spIdx][i]) expectedValuesByTimeList[spIdx][currTime.UnixNano()] = agg } } From 747e8af4531b958f2c5e3fd409845be0e1dd6b6f Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Mon, 9 Mar 2020 18:15:38 -0400 Subject: [PATCH 03/10] Address feedback --- src/aggregator/aggregation/counter_test.go | 6 ++-- src/aggregator/aggregation/gauge.go | 2 +- src/aggregator/aggregation/gauge_test.go | 7 +++-- src/aggregator/aggregation/options.go | 30 +++++++++++++------ src/aggregator/aggregation/options_test.go | 4 +-- .../aggregation/timer_benchmark_test.go | 5 ++-- src/aggregator/aggregation/timer_test.go | 10 +++---- src/aggregator/aggregator/aggregation_test.go | 14 ++++----- src/aggregator/aggregator/elem_base.go | 3 +- .../integration/integration_data.go | 2 +- .../multi_server_forwarding_pipeline_test.go | 4 +-- 11 files changed, 49 insertions(+), 38 deletions(-) diff --git a/src/aggregator/aggregation/counter_test.go b/src/aggregator/aggregation/counter_test.go index c91b9c7211..8af2bd680d 100644 --- a/src/aggregator/aggregation/counter_test.go +++ b/src/aggregator/aggregation/counter_test.go @@ -24,13 +24,13 @@ import ( "testing" "github.com/m3db/m3/src/metrics/aggregation" + "github.com/m3db/m3/src/x/instrument" "github.com/stretchr/testify/require" - "github.com/uber-go/tally" ) func TestCounterDefaultAggregationType(t *testing.T) { - c := NewCounter(NewOptions(tally.NoopScope)) + c := NewCounter(NewOptions(instrument.NewOptions())) require.False(t, c.HasExpensiveAggregations) for i := 1; i <= 100; i++ { c.Update(int64(i)) @@ -42,7 +42,7 @@ func TestCounterDefaultAggregationType(t *testing.T) { } func TestCounterCustomAggregationType(t *testing.T) { - opts := NewOptions(tally.NoopScope) + opts := NewOptions(instrument.NewOptions()) opts.HasExpensiveAggregations = true c := NewCounter(opts) diff --git a/src/aggregator/aggregation/gauge.go b/src/aggregator/aggregation/gauge.go index 872f96fcf6..4dbd7efcb6 100644 --- a/src/aggregator/aggregation/gauge.go +++ b/src/aggregator/aggregation/gauge.go @@ -62,7 +62,7 @@ func (g *Gauge) Update(timestamp time.Time, value float64) { g.last = value g.lastAt = timestamp } else { - g.Options.Metrics.IncGaugeValuesOutOfOrder() + g.Options.Metrics.Gauge.IncValuesOutOfOrder() } g.sum += value diff --git a/src/aggregator/aggregation/gauge_test.go b/src/aggregator/aggregation/gauge_test.go index 0d5dae7189..bf3505e42b 100644 --- a/src/aggregator/aggregation/gauge_test.go +++ b/src/aggregator/aggregation/gauge_test.go @@ -25,13 +25,14 @@ import ( "time" "github.com/m3db/m3/src/metrics/aggregation" + "github.com/m3db/m3/src/x/instrument" "github.com/stretchr/testify/require" "github.com/uber-go/tally" ) func TestGaugeDefaultAggregationType(t *testing.T) { - g := NewGauge(NewOptions(tally.NoopScope)) + g := NewGauge(NewOptions(instrument.NewOptions())) require.False(t, g.HasExpensiveAggregations) for i := 1.0; i <= 100.0; i++ { g.Update(time.Now(), i) @@ -44,7 +45,7 @@ func TestGaugeDefaultAggregationType(t *testing.T) { } func TestGaugeCustomAggregationType(t *testing.T) { - opts := NewOptions(tally.NoopScope) + opts := NewOptions(instrument.NewOptions()) opts.HasExpensiveAggregations = true g := NewGauge(opts) @@ -83,7 +84,7 @@ func TestGaugeCustomAggregationType(t *testing.T) { func TestGaugeLastOutOfOrderValues(t *testing.T) { scope := tally.NewTestScope("", nil) - g := NewGauge(NewOptions(scope)) + g := NewGauge(NewOptions(instrument.NewOptions().SetMetricsScope(scope))) timeMid := time.Now().Add(time.Minute) timePre := timeMid.Add(-1 * time.Second) diff --git a/src/aggregator/aggregation/options.go b/src/aggregator/aggregation/options.go index c56d2cb215..d2c797454c 100644 --- a/src/aggregator/aggregation/options.go +++ b/src/aggregator/aggregation/options.go @@ -22,6 +22,8 @@ package aggregation import ( "github.com/m3db/m3/src/metrics/aggregation" + "github.com/m3db/m3/src/x/instrument" + "github.com/uber-go/tally" ) @@ -40,30 +42,40 @@ type Options struct { // Metrics is a set of metrics that can be used by elements. type Metrics struct { - gaugeValuesOutOfOrder tally.Counter + Gauge GaugeMetrics +} + +// GaugeMetrics is a set of gauge metrics can be used by all gauges. +type GaugeMetrics struct { + valuesOutOfOrder tally.Counter } // NewMetrics is a set of aggregation metrics. func NewMetrics(scope tally.Scope) Metrics { scope = scope.SubScope("aggregation") - gaugeScope := scope.SubScope("gauges") return Metrics{ - gaugeValuesOutOfOrder: gaugeScope.Counter("values-out-of-order"), + Gauge: newGaugeMetrics(scope.SubScope("gauges")), + } +} + +func newGaugeMetrics(scope tally.Scope) GaugeMetrics { + return GaugeMetrics{ + valuesOutOfOrder: scope.Counter("values-out-of-order"), } } -// IncGaugeValuesOutOfOrder increments value or if not initialized is a no-op. -func (m Metrics) IncGaugeValuesOutOfOrder() { - if m.gaugeValuesOutOfOrder != nil { - m.gaugeValuesOutOfOrder.Inc(1) +// IncValuesOutOfOrder increments value or if not initialized is a no-op. +func (m GaugeMetrics) IncValuesOutOfOrder() { + if m.valuesOutOfOrder != nil { + m.valuesOutOfOrder.Inc(1) } } // NewOptions creates a new aggregation options. -func NewOptions(scope tally.Scope) Options { +func NewOptions(instrumentOpts instrument.Options) Options { return Options{ HasExpensiveAggregations: defaultHasExpensiveAggregations, - Metrics: NewMetrics(scope), + Metrics: NewMetrics(instrumentOpts.MetricsScope()), } } diff --git a/src/aggregator/aggregation/options_test.go b/src/aggregator/aggregation/options_test.go index c065b0db6a..e216b045ef 100644 --- a/src/aggregator/aggregation/options_test.go +++ b/src/aggregator/aggregation/options_test.go @@ -24,13 +24,13 @@ import ( "testing" "github.com/m3db/m3/src/metrics/aggregation" + "github.com/m3db/m3/src/x/instrument" "github.com/stretchr/testify/require" - "github.com/uber-go/tally" ) func TestOptions(t *testing.T) { - o := NewOptions(tally.NoopScope) + o := NewOptions(instrument.NewOptions()) require.False(t, o.HasExpensiveAggregations) o.ResetSetData(nil) diff --git a/src/aggregator/aggregation/timer_benchmark_test.go b/src/aggregator/aggregation/timer_benchmark_test.go index 8cff2cfbbd..d494e90af1 100644 --- a/src/aggregator/aggregation/timer_benchmark_test.go +++ b/src/aggregator/aggregation/timer_benchmark_test.go @@ -24,12 +24,11 @@ import ( "testing" "github.com/m3db/m3/src/aggregator/aggregation/quantile/cm" - - "github.com/uber-go/tally" + "github.com/m3db/m3/src/x/instrument" ) func getTimer() Timer { - opts := NewOptions(tally.NoopScope) + opts := NewOptions(instrument.NewOptions()) opts.ResetSetData(testAggTypes) timer := NewTimer(testQuantiles, cm.NewOptions(), opts) diff --git a/src/aggregator/aggregation/timer_test.go b/src/aggregator/aggregation/timer_test.go index 92e12781cb..4fc0850558 100644 --- a/src/aggregator/aggregation/timer_test.go +++ b/src/aggregator/aggregation/timer_test.go @@ -26,10 +26,10 @@ import ( "github.com/m3db/m3/src/aggregator/aggregation/quantile/cm" "github.com/m3db/m3/src/metrics/aggregation" + "github.com/m3db/m3/src/x/instrument" "github.com/m3db/m3/src/x/pool" "github.com/stretchr/testify/require" - "github.com/uber-go/tally" ) var ( @@ -57,13 +57,13 @@ func TestCreateTimerResetStream(t *testing.T) { // Add a value to the timer and close the timer, which returns the // underlying stream to the pool. - timer := NewTimer(testQuantiles, streamOpts, NewOptions(tally.NoopScope)) + timer := NewTimer(testQuantiles, streamOpts, NewOptions(instrument.NewOptions())) timer.Add(1.0) require.Equal(t, 1.0, timer.Min()) timer.Close() // Create a new timer and assert the underlying stream has been closed. - timer = NewTimer(testQuantiles, streamOpts, NewOptions(tally.NoopScope)) + timer = NewTimer(testQuantiles, streamOpts, NewOptions(instrument.NewOptions())) timer.Add(1.0) require.Equal(t, 1.0, timer.Min()) timer.Close() @@ -71,7 +71,7 @@ func TestCreateTimerResetStream(t *testing.T) { } func TestTimerAggregations(t *testing.T) { - opts := NewOptions(tally.NoopScope) + opts := NewOptions(instrument.NewOptions()) opts.ResetSetData(testAggTypes) timer := NewTimer(testQuantiles, cm.NewOptions(), opts) @@ -144,7 +144,7 @@ func TestTimerAggregations(t *testing.T) { } func TestTimerAggregationsNotExpensive(t *testing.T) { - opts := NewOptions(tally.NoopScope) + opts := NewOptions(instrument.NewOptions()) opts.ResetSetData(aggregation.Types{aggregation.Sum}) timer := NewTimer(testQuantiles, cm.NewOptions(), opts) diff --git a/src/aggregator/aggregator/aggregation_test.go b/src/aggregator/aggregator/aggregation_test.go index dab214d093..70bf01b96f 100644 --- a/src/aggregator/aggregator/aggregation_test.go +++ b/src/aggregator/aggregator/aggregation_test.go @@ -28,9 +28,9 @@ import ( "github.com/m3db/m3/src/aggregator/aggregation/quantile/cm" "github.com/m3db/m3/src/metrics/metric" "github.com/m3db/m3/src/metrics/metric/unaggregated" + "github.com/m3db/m3/src/x/instrument" "github.com/stretchr/testify/require" - "github.com/uber-go/tally" ) var ( @@ -55,7 +55,7 @@ var ( ) func TestCounterAggregationAdd(t *testing.T) { - c := newCounterAggregation(aggregation.NewCounter(aggregation.NewOptions(tally.NoopScope))) + c := newCounterAggregation(aggregation.NewCounter(aggregation.NewOptions(instrument.NewOptions()))) for _, v := range testAggregationValues { c.Add(v) } @@ -64,7 +64,7 @@ func TestCounterAggregationAdd(t *testing.T) { } func TestCounterAggregationAddUnion(t *testing.T) { - c := newCounterAggregation(aggregation.NewCounter(aggregation.NewOptions(tally.NoopScope))) + c := newCounterAggregation(aggregation.NewCounter(aggregation.NewOptions(instrument.NewOptions()))) for _, v := range testAggregationUnions { c.AddUnion(v) } @@ -73,7 +73,7 @@ func TestCounterAggregationAddUnion(t *testing.T) { } func TestTimerAggregationAdd(t *testing.T) { - tm := newTimerAggregation(aggregation.NewTimer([]float64{0.5}, cm.NewOptions(), aggregation.NewOptions(tally.NoopScope))) + tm := newTimerAggregation(aggregation.NewTimer([]float64{0.5}, cm.NewOptions(), aggregation.NewOptions(instrument.NewOptions()))) for _, v := range testAggregationValues { tm.Add(v) } @@ -82,7 +82,7 @@ func TestTimerAggregationAdd(t *testing.T) { } func TestTimerAggregationAddUnion(t *testing.T) { - tm := newTimerAggregation(aggregation.NewTimer([]float64{0.5}, cm.NewOptions(), aggregation.NewOptions(tally.NoopScope))) + tm := newTimerAggregation(aggregation.NewTimer([]float64{0.5}, cm.NewOptions(), aggregation.NewOptions(instrument.NewOptions()))) for _, v := range testAggregationUnions { tm.AddUnion(v) } @@ -91,7 +91,7 @@ func TestTimerAggregationAddUnion(t *testing.T) { } func TestGaugeAggregationAdd(t *testing.T) { - g := newGaugeAggregation(aggregation.NewGauge(aggregation.NewOptions(tally.NoopScope))) + g := newGaugeAggregation(aggregation.NewGauge(aggregation.NewOptions(instrument.NewOptions()))) for _, v := range testAggregationValues { g.Add(time.Now(), v) } @@ -100,7 +100,7 @@ func TestGaugeAggregationAdd(t *testing.T) { } func TestGaugeAggregationAddUnion(t *testing.T) { - g := newGaugeAggregation(aggregation.NewGauge(aggregation.NewOptions(tally.NoopScope))) + g := newGaugeAggregation(aggregation.NewGauge(aggregation.NewOptions(instrument.NewOptions()))) for _, v := range testAggregationUnions { g.AddUnion(time.Now(), v) } diff --git a/src/aggregator/aggregator/elem_base.go b/src/aggregator/aggregator/elem_base.go index a8e748a326..af6228fd3a 100644 --- a/src/aggregator/aggregator/elem_base.go +++ b/src/aggregator/aggregator/elem_base.go @@ -173,11 +173,10 @@ type elemBase struct { } func newElemBase(opts Options) elemBase { - scope := opts.InstrumentOptions().MetricsScope() return elemBase{ opts: opts, aggTypesOpts: opts.AggregationTypesOptions(), - aggOpts: raggregation.NewOptions(scope), + aggOpts: raggregation.NewOptions(opts.InstrumentOptions()), } } diff --git a/src/aggregator/integration/integration_data.go b/src/aggregator/integration/integration_data.go index 247dab53b5..8a053f9726 100644 --- a/src/aggregator/integration/integration_data.go +++ b/src/aggregator/integration/integration_data.go @@ -367,7 +367,7 @@ func computeExpectedAggregationBuckets( var ( aggTypeOpts = opts.AggregationTypesOptions() aggTypes = maggregation.NewIDDecompressor().MustDecompress(bucket.key.aggregationID) - aggregationOpts = aggregation.NewOptions(opts.InstrumentOptions().MetricsScope()) + aggregationOpts = aggregation.NewOptions(opts.InstrumentOptions()) ) switch mu.Type() { case metric.CounterType: diff --git a/src/aggregator/integration/multi_server_forwarding_pipeline_test.go b/src/aggregator/integration/multi_server_forwarding_pipeline_test.go index 330537f6dc..d73c02b565 100644 --- a/src/aggregator/integration/multi_server_forwarding_pipeline_test.go +++ b/src/aggregator/integration/multi_server_forwarding_pipeline_test.go @@ -397,8 +397,8 @@ func testMultiServerForwardingPipeline(t *testing.T, discardNaNAggregatedValues continue } currTime := start.Add(time.Duration(i+1) * storagePolicy.Resolution().Window) - scope := aggregatorOpts.InstrumentOptions().MetricsScope() - agg := aggregation.NewGauge(aggregation.NewOptions(scope)) + instrumentOpts := aggregatorOpts.InstrumentOptions() + agg := aggregation.NewGauge(aggregation.NewOptions(instrumentOpts)) agg.Update(time.Now(), expectedValuesList[spIdx][i]) expectedValuesByTimeList[spIdx][currTime.UnixNano()] = agg } From edd8974b45df53b342c0f8703119ef74f910e773 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Mon, 9 Mar 2020 18:54:04 -0400 Subject: [PATCH 04/10] Refactor --- src/aggregator/aggregation/gauge_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/aggregator/aggregation/gauge_test.go b/src/aggregator/aggregation/gauge_test.go index bf3505e42b..bcf37e006f 100644 --- a/src/aggregator/aggregation/gauge_test.go +++ b/src/aggregator/aggregation/gauge_test.go @@ -97,8 +97,9 @@ func TestGaugeLastOutOfOrderValues(t *testing.T) { g.Update(timePrePre, 40) require.Equal(t, 43.0, g.Last()) - snap := scope.Snapshot().Counters() - counter, ok := snap["aggregation.gauges.values-out-of-order+"] + snap := scope.Snapshot() + counters := snap.Counters() + counter, ok := counters["aggregation.gauges.values-out-of-order+"] require.True(t, ok) require.Equal(t, int64(2), counter.Value()) } From 63d4a77dacd3c5e7a220f94653291482ffcba30d Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Mon, 9 Mar 2020 23:03:27 -0400 Subject: [PATCH 05/10] Fix build --- src/aggregator/aggregation/counter.go | 3 ++- src/aggregator/aggregation/counter_test.go | 5 +++-- src/aggregator/aggregator/aggregation.go | 8 ++++---- src/aggregator/aggregator/aggregation_test.go | 4 ++-- src/aggregator/aggregator/counter_elem_gen.go | 8 ++++---- src/aggregator/aggregator/elem_base_test.go | 4 ++-- src/aggregator/aggregator/elem_test.go | 2 +- src/aggregator/aggregator/generic_elem.go | 10 +++++----- src/aggregator/aggregator/timer_elem_gen.go | 2 +- 9 files changed, 24 insertions(+), 22 deletions(-) diff --git a/src/aggregator/aggregation/counter.go b/src/aggregator/aggregation/counter.go index 9044ad791f..4add082174 100644 --- a/src/aggregator/aggregation/counter.go +++ b/src/aggregator/aggregation/counter.go @@ -22,6 +22,7 @@ package aggregation import ( "math" + "time" "github.com/m3db/m3/src/metrics/aggregation" ) @@ -47,7 +48,7 @@ func NewCounter(opts Options) Counter { } // Update updates the counter value. -func (c *Counter) Update(value int64) { +func (c *Counter) Update(timestamp time.Time, value int64) { c.sum += value c.count++ diff --git a/src/aggregator/aggregation/counter_test.go b/src/aggregator/aggregation/counter_test.go index 8af2bd680d..47492459fe 100644 --- a/src/aggregator/aggregation/counter_test.go +++ b/src/aggregator/aggregation/counter_test.go @@ -22,6 +22,7 @@ package aggregation import ( "testing" + "time" "github.com/m3db/m3/src/metrics/aggregation" "github.com/m3db/m3/src/x/instrument" @@ -33,7 +34,7 @@ func TestCounterDefaultAggregationType(t *testing.T) { c := NewCounter(NewOptions(instrument.NewOptions())) require.False(t, c.HasExpensiveAggregations) for i := 1; i <= 100; i++ { - c.Update(int64(i)) + c.Update(time.Now(), int64(i)) } require.Equal(t, int64(5050), c.Sum()) require.Equal(t, 5050.0, c.ValueOf(aggregation.Sum)) @@ -49,7 +50,7 @@ func TestCounterCustomAggregationType(t *testing.T) { require.True(t, c.HasExpensiveAggregations) for i := 1; i <= 100; i++ { - c.Update(int64(i)) + c.Update(time.Now(), int64(i)) } require.Equal(t, int64(5050), c.Sum()) for aggType := range aggregation.ValidTypes { diff --git a/src/aggregator/aggregator/aggregation.go b/src/aggregator/aggregator/aggregation.go index 61354ed32b..7269537a07 100644 --- a/src/aggregator/aggregator/aggregation.go +++ b/src/aggregator/aggregator/aggregation.go @@ -36,12 +36,12 @@ func newCounterAggregation(c aggregation.Counter) counterAggregation { return counterAggregation{Counter: c} } -func (c *counterAggregation) Add(value float64) { - c.Counter.Update(int64(value)) +func (c *counterAggregation) Add(t time.Time, value float64) { + c.Counter.Update(t, int64(value)) } -func (c *counterAggregation) AddUnion(mu unaggregated.MetricUnion) { - c.Counter.Update(mu.CounterVal) +func (c *counterAggregation) AddUnion(t time.Time, mu unaggregated.MetricUnion) { + c.Counter.Update(t, mu.CounterVal) } // timerAggregation is a timer aggregation. diff --git a/src/aggregator/aggregator/aggregation_test.go b/src/aggregator/aggregator/aggregation_test.go index 70bf01b96f..37b23e5783 100644 --- a/src/aggregator/aggregator/aggregation_test.go +++ b/src/aggregator/aggregator/aggregation_test.go @@ -57,7 +57,7 @@ var ( func TestCounterAggregationAdd(t *testing.T) { c := newCounterAggregation(aggregation.NewCounter(aggregation.NewOptions(instrument.NewOptions()))) for _, v := range testAggregationValues { - c.Add(v) + c.Add(time.Now(), v) } require.Equal(t, int64(4), c.Count()) require.Equal(t, int64(799), c.Sum()) @@ -66,7 +66,7 @@ func TestCounterAggregationAdd(t *testing.T) { func TestCounterAggregationAddUnion(t *testing.T) { c := newCounterAggregation(aggregation.NewCounter(aggregation.NewOptions(instrument.NewOptions()))) for _, v := range testAggregationUnions { - c.AddUnion(v) + c.AddUnion(time.Now(), v) } require.Equal(t, int64(3), c.Count()) require.Equal(t, int64(1234), c.Sum()) diff --git a/src/aggregator/aggregator/counter_elem_gen.go b/src/aggregator/aggregator/counter_elem_gen.go index ff1566a5b8..6758ed22b4 100644 --- a/src/aggregator/aggregator/counter_elem_gen.go +++ b/src/aggregator/aggregator/counter_elem_gen.go @@ -1,4 +1,4 @@ -// Copyright (c) 2018 Uber Technologies, Inc. +// Copyright (c) 2020 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -153,7 +153,7 @@ func (e *CounterElem) AddUnion(timestamp time.Time, mu unaggregated.MetricUnion) lockedAgg.Unlock() return errAggregationClosed } - lockedAgg.aggregation.AddUnion(mu) + lockedAgg.aggregation.AddUnion(timestamp, mu) lockedAgg.Unlock() return nil } @@ -170,7 +170,7 @@ func (e *CounterElem) AddValue(timestamp time.Time, value float64) error { lockedAgg.Unlock() return errAggregationClosed } - lockedAgg.aggregation.Add(value) + lockedAgg.aggregation.Add(timestamp, value) lockedAgg.Unlock() return nil } @@ -196,7 +196,7 @@ func (e *CounterElem) AddUnique(timestamp time.Time, values []float64, sourceID } lockedAgg.sourcesSeen.Set(source) for _, v := range values { - lockedAgg.aggregation.Add(v) + lockedAgg.aggregation.Add(timestamp, v) } lockedAgg.Unlock() return nil diff --git a/src/aggregator/aggregator/elem_base_test.go b/src/aggregator/aggregator/elem_base_test.go index 8dbac0777f..228aabeee0 100644 --- a/src/aggregator/aggregator/elem_base_test.go +++ b/src/aggregator/aggregator/elem_base_test.go @@ -168,11 +168,11 @@ func TestCounterElemBase(t *testing.T) { func TestCounterElemBaseNewAggregation(t *testing.T) { e := counterElemBase{} la := e.NewAggregation(nil, raggregation.Options{}) - la.AddUnion(unaggregated.MetricUnion{ + la.AddUnion(time.Now(), unaggregated.MetricUnion{ Type: metric.CounterType, CounterVal: 100, }) - la.AddUnion(unaggregated.MetricUnion{ + la.AddUnion(time.Now(), unaggregated.MetricUnion{ Type: metric.CounterType, CounterVal: 200, }) diff --git a/src/aggregator/aggregator/elem_test.go b/src/aggregator/aggregator/elem_test.go index 3e056c87dc..9818b0cc83 100644 --- a/src/aggregator/aggregator/elem_test.go +++ b/src/aggregator/aggregator/elem_test.go @@ -1804,7 +1804,7 @@ func testCounterElem( e := MustNewCounterElem(testCounterID, testStoragePolicy, aggTypes, pipeline, testNumForwardedTimes, WithPrefixWithSuffix, opts) for i, aligned := range alignedstartAtNanos { counter := &lockedCounterAggregation{aggregation: newCounterAggregation(raggregation.NewCounter(e.aggOpts))} - counter.aggregation.Update(counterVals[i]) + counter.aggregation.Update(time.Unix(0, aligned), counterVals[i]) e.values = append(e.values, timedCounter{ startAtNanos: aligned, lockedAgg: counter, diff --git a/src/aggregator/aggregator/generic_elem.go b/src/aggregator/aggregator/generic_elem.go index a0f61ccab3..49ba630ca2 100644 --- a/src/aggregator/aggregator/generic_elem.go +++ b/src/aggregator/aggregator/generic_elem.go @@ -43,10 +43,10 @@ type typeSpecificAggregation interface { generic.Type // Add adds a new metric value. - Add(value float64) + Add(t time.Time, value float64) // AddUnion adds a new metric value union. - AddUnion(mu unaggregated.MetricUnion) + AddUnion(t time.Time, mu unaggregated.MetricUnion) // ValueOf returns the value for the given aggregation type. ValueOf(aggType maggregation.Type) float64 @@ -207,7 +207,7 @@ func (e *GenericElem) AddUnion(timestamp time.Time, mu unaggregated.MetricUnion) lockedAgg.Unlock() return errAggregationClosed } - lockedAgg.aggregation.AddUnion(mu) + lockedAgg.aggregation.AddUnion(timestamp, mu) lockedAgg.Unlock() return nil } @@ -224,7 +224,7 @@ func (e *GenericElem) AddValue(timestamp time.Time, value float64) error { lockedAgg.Unlock() return errAggregationClosed } - lockedAgg.aggregation.Add(value) + lockedAgg.aggregation.Add(timestamp, value) lockedAgg.Unlock() return nil } @@ -250,7 +250,7 @@ func (e *GenericElem) AddUnique(timestamp time.Time, values []float64, sourceID } lockedAgg.sourcesSeen.Set(source) for _, v := range values { - lockedAgg.aggregation.Add(v) + lockedAgg.aggregation.Add(timestamp, v) } lockedAgg.Unlock() return nil diff --git a/src/aggregator/aggregator/timer_elem_gen.go b/src/aggregator/aggregator/timer_elem_gen.go index e8e28014f5..d70dc0f1e7 100644 --- a/src/aggregator/aggregator/timer_elem_gen.go +++ b/src/aggregator/aggregator/timer_elem_gen.go @@ -1,4 +1,4 @@ -// Copyright (c) 2018 Uber Technologies, Inc. +// Copyright (c) 2020 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal From 49bdbc1c7b8cef34cdbfc08938792be03b0b494b Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Mon, 9 Mar 2020 23:04:55 -0400 Subject: [PATCH 06/10] Fix integration test build --- src/aggregator/integration/integration_data.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/aggregator/integration/integration_data.go b/src/aggregator/integration/integration_data.go index 8a053f9726..116a404346 100644 --- a/src/aggregator/integration/integration_data.go +++ b/src/aggregator/integration/integration_data.go @@ -423,7 +423,7 @@ func addUntimedMetricToAggregation( switch mu.Type { case metric.CounterType: v := values.(aggregation.Counter) - v.Update(mu.CounterVal) + v.Update(time.Now(), mu.CounterVal) return v, nil case metric.TimerType: v := values.(aggregation.Timer) @@ -445,7 +445,7 @@ func addTimedMetricToAggregation( switch mu.Type { case metric.CounterType: v := values.(aggregation.Counter) - v.Update(int64(mu.Value)) + v.Update(time.Now(), int64(mu.Value)) return v, nil case metric.TimerType: v := values.(aggregation.Timer) @@ -468,7 +468,7 @@ func addForwardedMetricToAggregation( case metric.CounterType: v := values.(aggregation.Counter) for _, val := range mu.Values { - v.Update(int64(val)) + v.Update(time.Now(), int64(val)) } return v, nil case metric.TimerType: From a56bf9bbb658dc60d52e3b9aa6ec079dd065541a Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Mon, 9 Mar 2020 23:33:33 -0400 Subject: [PATCH 07/10] Fix build for timer elem generate --- src/aggregator/aggregator/aggregation.go | 24 ++++++++++----------- src/aggregator/aggregator/timer_elem_gen.go | 6 +++--- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/aggregator/aggregator/aggregation.go b/src/aggregator/aggregator/aggregation.go index 7269537a07..352f496aa7 100644 --- a/src/aggregator/aggregator/aggregation.go +++ b/src/aggregator/aggregator/aggregation.go @@ -36,12 +36,12 @@ func newCounterAggregation(c aggregation.Counter) counterAggregation { return counterAggregation{Counter: c} } -func (c *counterAggregation) Add(t time.Time, value float64) { - c.Counter.Update(t, int64(value)) +func (a *counterAggregation) Add(t time.Time, value float64) { + a.Counter.Update(t, int64(value)) } -func (c *counterAggregation) AddUnion(t time.Time, mu unaggregated.MetricUnion) { - c.Counter.Update(t, mu.CounterVal) +func (a *counterAggregation) AddUnion(t time.Time, mu unaggregated.MetricUnion) { + a.Counter.Update(t, mu.CounterVal) } // timerAggregation is a timer aggregation. @@ -53,12 +53,12 @@ func newTimerAggregation(t aggregation.Timer) timerAggregation { return timerAggregation{Timer: t} } -func (t *timerAggregation) Add(value float64) { - t.Timer.Add(value) +func (a *timerAggregation) Add(_ time.Time, value float64) { + a.Timer.Add(value) } -func (t *timerAggregation) AddUnion(mu unaggregated.MetricUnion) { - t.Timer.AddBatch(mu.BatchTimerVal) +func (a *timerAggregation) AddUnion(_ time.Time, mu unaggregated.MetricUnion) { + a.Timer.AddBatch(mu.BatchTimerVal) } // gaugeAggregation is a gauge aggregation. @@ -70,10 +70,10 @@ func newGaugeAggregation(g aggregation.Gauge) gaugeAggregation { return gaugeAggregation{Gauge: g} } -func (g *gaugeAggregation) Add(t time.Time, value float64) { - g.Gauge.Update(t, value) +func (a *gaugeAggregation) Add(t time.Time, value float64) { + a.Gauge.Update(t, value) } -func (g *gaugeAggregation) AddUnion(t time.Time, mu unaggregated.MetricUnion) { - g.Gauge.Update(t, mu.GaugeVal) +func (a *gaugeAggregation) AddUnion(t time.Time, mu unaggregated.MetricUnion) { + a.Gauge.Update(t, mu.GaugeVal) } diff --git a/src/aggregator/aggregator/timer_elem_gen.go b/src/aggregator/aggregator/timer_elem_gen.go index d70dc0f1e7..40e7dd0df7 100644 --- a/src/aggregator/aggregator/timer_elem_gen.go +++ b/src/aggregator/aggregator/timer_elem_gen.go @@ -153,7 +153,7 @@ func (e *TimerElem) AddUnion(timestamp time.Time, mu unaggregated.MetricUnion) e lockedAgg.Unlock() return errAggregationClosed } - lockedAgg.aggregation.AddUnion(mu) + lockedAgg.aggregation.AddUnion(timestamp, mu) lockedAgg.Unlock() return nil } @@ -170,7 +170,7 @@ func (e *TimerElem) AddValue(timestamp time.Time, value float64) error { lockedAgg.Unlock() return errAggregationClosed } - lockedAgg.aggregation.Add(value) + lockedAgg.aggregation.Add(timestamp, value) lockedAgg.Unlock() return nil } @@ -196,7 +196,7 @@ func (e *TimerElem) AddUnique(timestamp time.Time, values []float64, sourceID ui } lockedAgg.sourcesSeen.Set(source) for _, v := range values { - lockedAgg.aggregation.Add(v) + lockedAgg.aggregation.Add(timestamp, v) } lockedAgg.Unlock() return nil From 34300735a89de7add58a96f671878e8f8c0d1b13 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Mon, 9 Mar 2020 23:41:03 -0400 Subject: [PATCH 08/10] Fix build for unit tests --- src/aggregator/aggregator/aggregation_test.go | 4 ++-- src/aggregator/aggregator/elem_base_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/aggregator/aggregator/aggregation_test.go b/src/aggregator/aggregator/aggregation_test.go index 37b23e5783..8c7b428268 100644 --- a/src/aggregator/aggregator/aggregation_test.go +++ b/src/aggregator/aggregator/aggregation_test.go @@ -75,7 +75,7 @@ func TestCounterAggregationAddUnion(t *testing.T) { func TestTimerAggregationAdd(t *testing.T) { tm := newTimerAggregation(aggregation.NewTimer([]float64{0.5}, cm.NewOptions(), aggregation.NewOptions(instrument.NewOptions()))) for _, v := range testAggregationValues { - tm.Add(v) + tm.Add(time.Now(), v) } require.Equal(t, int64(4), tm.Count()) require.Equal(t, 799.2, tm.Sum()) @@ -84,7 +84,7 @@ func TestTimerAggregationAdd(t *testing.T) { func TestTimerAggregationAddUnion(t *testing.T) { tm := newTimerAggregation(aggregation.NewTimer([]float64{0.5}, cm.NewOptions(), aggregation.NewOptions(instrument.NewOptions()))) for _, v := range testAggregationUnions { - tm.AddUnion(v) + tm.AddUnion(time.Now(), v) } require.Equal(t, int64(5), tm.Count()) require.Equal(t, 18.0, tm.Sum()) diff --git a/src/aggregator/aggregator/elem_base_test.go b/src/aggregator/aggregator/elem_base_test.go index 228aabeee0..1ab9948e21 100644 --- a/src/aggregator/aggregator/elem_base_test.go +++ b/src/aggregator/aggregator/elem_base_test.go @@ -218,11 +218,11 @@ func TestTimerElemBase(t *testing.T) { func TestTimerElemBaseNewAggregation(t *testing.T) { e := timerElemBase{} la := e.NewAggregation(NewOptions(), raggregation.Options{}) - la.AddUnion(unaggregated.MetricUnion{ + la.AddUnion(time.Now(), unaggregated.MetricUnion{ Type: metric.TimerType, BatchTimerVal: []float64{100.0, 200.0}, }) - la.AddUnion(unaggregated.MetricUnion{ + la.AddUnion(time.Now(), unaggregated.MetricUnion{ Type: metric.TimerType, BatchTimerVal: []float64{300.0, 400.0, 500.0}, }) From a512f8c37932687baf2e8406ed35ed0d70c2cdf1 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Tue, 10 Mar 2020 00:23:52 -0400 Subject: [PATCH 09/10] Print logs if test fails --- scripts/docker-integration-tests/prometheus/test.sh | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/scripts/docker-integration-tests/prometheus/test.sh b/scripts/docker-integration-tests/prometheus/test.sh index fc02f05d69..efc4da5665 100755 --- a/scripts/docker-integration-tests/prometheus/test.sh +++ b/scripts/docker-integration-tests/prometheus/test.sh @@ -21,7 +21,14 @@ echo "Run m3dbnode and m3coordinator containers" docker-compose -f ${COMPOSE_FILE} up -d dbnode01 docker-compose -f ${COMPOSE_FILE} up -d coordinator01 +TEST_SUCCESS=false + function defer { + if [[ "$TEST_SUCCESS" != "true" ]]; then + echo "Test failure, printing docker-compose logs" + docker-compose logs + fi + docker-compose -f ${COMPOSE_FILE} down || echo "unable to shutdown containers" # CI fails to stop all containers sometimes } trap defer EXIT @@ -191,3 +198,5 @@ test_query_restrict_metrics_type echo "Running function correctness tests" test_correctness + +TEST_SUCCESS=true From 6364c0c61b2eeb12cd9774cac56054fde7592fe2 Mon Sep 17 00:00:00 2001 From: Rob Skillington Date: Tue, 10 Mar 2020 01:10:02 -0400 Subject: [PATCH 10/10] Print logs with docker-compose file specified --- scripts/docker-integration-tests/prometheus/test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/docker-integration-tests/prometheus/test.sh b/scripts/docker-integration-tests/prometheus/test.sh index efc4da5665..450154cae4 100755 --- a/scripts/docker-integration-tests/prometheus/test.sh +++ b/scripts/docker-integration-tests/prometheus/test.sh @@ -26,7 +26,7 @@ TEST_SUCCESS=false function defer { if [[ "$TEST_SUCCESS" != "true" ]]; then echo "Test failure, printing docker-compose logs" - docker-compose logs + docker-compose -f ${COMPOSE_FILE} logs fi docker-compose -f ${COMPOSE_FILE} down || echo "unable to shutdown containers" # CI fails to stop all containers sometimes