Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[aggregator] Take last value by value timestamp not arrival time #2199

Merged
merged 11 commits into from
Mar 10, 2020
9 changes: 9 additions & 0 deletions scripts/docker-integration-tests/prometheus/test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,14 @@ echo "Run m3dbnode and m3coordinator containers"
docker-compose -f ${COMPOSE_FILE} up -d dbnode01
docker-compose -f ${COMPOSE_FILE} up -d coordinator01

TEST_SUCCESS=false

function defer {
if [[ "$TEST_SUCCESS" != "true" ]]; then
echo "Test failure, printing docker-compose logs"
docker-compose -f ${COMPOSE_FILE} logs
fi

docker-compose -f ${COMPOSE_FILE} down || echo "unable to shutdown containers" # CI fails to stop all containers sometimes
}
trap defer EXIT
Expand Down Expand Up @@ -191,3 +198,5 @@ test_query_restrict_metrics_type

echo "Running function correctness tests"
test_correctness

TEST_SUCCESS=true
3 changes: 2 additions & 1 deletion src/aggregator/aggregation/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package aggregation

import (
"math"
"time"

"github.com/m3db/m3/src/metrics/aggregation"
)
Expand All @@ -47,7 +48,7 @@ func NewCounter(opts Options) Counter {
}

// Update updates the counter value.
func (c *Counter) Update(value int64) {
func (c *Counter) Update(timestamp time.Time, value int64) {
c.sum += value

c.count++
Expand Down
10 changes: 6 additions & 4 deletions src/aggregator/aggregation/counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,19 @@ package aggregation

import (
"testing"
"time"

"github.com/m3db/m3/src/metrics/aggregation"
"github.com/m3db/m3/src/x/instrument"

"github.com/stretchr/testify/require"
)

func TestCounterDefaultAggregationType(t *testing.T) {
c := NewCounter(NewOptions())
c := NewCounter(NewOptions(instrument.NewOptions()))
require.False(t, c.HasExpensiveAggregations)
for i := 1; i <= 100; i++ {
c.Update(int64(i))
c.Update(time.Now(), int64(i))
}
require.Equal(t, int64(5050), c.Sum())
require.Equal(t, 5050.0, c.ValueOf(aggregation.Sum))
Expand All @@ -41,14 +43,14 @@ func TestCounterDefaultAggregationType(t *testing.T) {
}

func TestCounterCustomAggregationType(t *testing.T) {
opts := NewOptions()
opts := NewOptions(instrument.NewOptions())
opts.HasExpensiveAggregations = true

c := NewCounter(opts)
require.True(t, c.HasExpensiveAggregations)

for i := 1; i <= 100; i++ {
c.Update(int64(i))
c.Update(time.Now(), int64(i))
}
require.Equal(t, int64(5050), c.Sum())
for aggType := range aggregation.ValidTypes {
Expand Down
26 changes: 18 additions & 8 deletions src/aggregator/aggregation/gauge.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package aggregation

import (
"math"
"time"

"github.com/m3db/m3/src/metrics/aggregation"
)
Expand All @@ -34,12 +35,13 @@ const (
type Gauge struct {
Options

last float64
sum float64
sumSq float64
count int64
max float64
min float64
last float64
lastAt time.Time
sum float64
sumSq float64
count int64
max float64
min float64
}

// NewGauge creates a new gauge.
Expand All @@ -52,8 +54,16 @@ func NewGauge(opts Options) Gauge {
}

// Update updates the gauge value.
func (g *Gauge) Update(value float64) {
g.last = value
func (g *Gauge) Update(timestamp time.Time, value float64) {
if g.lastAt.IsZero() || timestamp.After(g.lastAt) {
robskillington marked this conversation as resolved.
Show resolved Hide resolved
// NB(r): Only set the last value if this value arrives
// after the wall clock timestamp of previous values, not
// the arrival time (i.e. order received).
g.last = value
g.lastAt = timestamp
} else {
g.Options.Metrics.Gauge.IncValuesOutOfOrder()
}

g.sum += value
g.count++
Expand Down
33 changes: 29 additions & 4 deletions src/aggregator/aggregation/gauge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,20 @@ package aggregation

import (
"testing"
"time"

"github.com/m3db/m3/src/metrics/aggregation"
"github.com/m3db/m3/src/x/instrument"

"github.com/stretchr/testify/require"
"github.com/uber-go/tally"
)

func TestGaugeDefaultAggregationType(t *testing.T) {
g := NewGauge(NewOptions())
g := NewGauge(NewOptions(instrument.NewOptions()))
require.False(t, g.HasExpensiveAggregations)
for i := 1.0; i <= 100.0; i++ {
g.Update(i)
g.Update(time.Now(), i)
}
require.Equal(t, 100.0, g.Last())
require.Equal(t, 100.0, g.ValueOf(aggregation.Last))
Expand All @@ -42,14 +45,14 @@ func TestGaugeDefaultAggregationType(t *testing.T) {
}

func TestGaugeCustomAggregationType(t *testing.T) {
opts := NewOptions()
opts := NewOptions(instrument.NewOptions())
opts.HasExpensiveAggregations = true

g := NewGauge(opts)
require.True(t, g.HasExpensiveAggregations)

for i := 1; i <= 100; i++ {
g.Update(float64(i))
g.Update(time.Now(), float64(i))
}

require.Equal(t, 100.0, g.Last())
Expand Down Expand Up @@ -78,3 +81,25 @@ func TestGaugeCustomAggregationType(t *testing.T) {
}
}
}

func TestGaugeLastOutOfOrderValues(t *testing.T) {
scope := tally.NewTestScope("", nil)
g := NewGauge(NewOptions(instrument.NewOptions().SetMetricsScope(scope)))

timeMid := time.Now().Add(time.Minute)
timePre := timeMid.Add(-1 * time.Second)
timePrePre := timeMid.Add(-1 * time.Second)
timeAfter := timeMid.Add(time.Second)

g.Update(timeMid, 42)
g.Update(timePre, 41)
g.Update(timeAfter, 43)
g.Update(timePrePre, 40)

require.Equal(t, 43.0, g.Last())
snap := scope.Snapshot()
counters := snap.Counters()
counter, ok := counters["aggregation.gauges.values-out-of-order+"]
require.True(t, ok)
require.Equal(t, int64(2), counter.Value())
}
49 changes: 45 additions & 4 deletions src/aggregator/aggregation/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,63 @@

package aggregation

import "github.com/m3db/m3/src/metrics/aggregation"
import (
"github.com/m3db/m3/src/metrics/aggregation"
"github.com/m3db/m3/src/x/instrument"

"github.com/uber-go/tally"
)

var (
defaultOptions Options
defaultHasExpensiveAggregations = false
)

// Options is the options for aggregations.
type Options struct {
// HasExpensiveAggregations means expensive (multiplication/division)
// aggregation types are enabled.
HasExpensiveAggregations bool
// Metrics is as set of aggregation metrics.
Metrics Metrics
}

// Metrics is a set of metrics that can be used by elements.
type Metrics struct {
Gauge GaugeMetrics
}

// GaugeMetrics is a set of gauge metrics can be used by all gauges.
type GaugeMetrics struct {
valuesOutOfOrder tally.Counter
}

// NewMetrics is a set of aggregation metrics.
func NewMetrics(scope tally.Scope) Metrics {
scope = scope.SubScope("aggregation")
return Metrics{
Gauge: newGaugeMetrics(scope.SubScope("gauges")),
}
}

func newGaugeMetrics(scope tally.Scope) GaugeMetrics {
return GaugeMetrics{
valuesOutOfOrder: scope.Counter("values-out-of-order"),
}
}

// IncValuesOutOfOrder increments value or if not initialized is a no-op.
func (m GaugeMetrics) IncValuesOutOfOrder() {
if m.valuesOutOfOrder != nil {
m.valuesOutOfOrder.Inc(1)
}
}

// NewOptions creates a new aggregation options.
func NewOptions() Options {
return defaultOptions
func NewOptions(instrumentOpts instrument.Options) Options {
return Options{
HasExpensiveAggregations: defaultHasExpensiveAggregations,
Metrics: NewMetrics(instrumentOpts.MetricsScope()),
}
}

// ResetSetData resets the aggregation options.
Expand Down
3 changes: 2 additions & 1 deletion src/aggregator/aggregation/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ import (
"testing"

"github.com/m3db/m3/src/metrics/aggregation"
"github.com/m3db/m3/src/x/instrument"

"github.com/stretchr/testify/require"
)

func TestOptions(t *testing.T) {
o := NewOptions()
o := NewOptions(instrument.NewOptions())
require.False(t, o.HasExpensiveAggregations)

o.ResetSetData(nil)
Expand Down
3 changes: 2 additions & 1 deletion src/aggregator/aggregation/timer_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ import (
"testing"

"github.com/m3db/m3/src/aggregator/aggregation/quantile/cm"
"github.com/m3db/m3/src/x/instrument"
)

func getTimer() Timer {
opts := NewOptions()
opts := NewOptions(instrument.NewOptions())
opts.ResetSetData(testAggTypes)

timer := NewTimer(testQuantiles, cm.NewOptions(), opts)
Expand Down
9 changes: 5 additions & 4 deletions src/aggregator/aggregation/timer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/m3db/m3/src/aggregator/aggregation/quantile/cm"
"github.com/m3db/m3/src/metrics/aggregation"
"github.com/m3db/m3/src/x/instrument"
"github.com/m3db/m3/src/x/pool"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -56,21 +57,21 @@ func TestCreateTimerResetStream(t *testing.T) {

// Add a value to the timer and close the timer, which returns the
// underlying stream to the pool.
timer := NewTimer(testQuantiles, streamOpts, NewOptions())
timer := NewTimer(testQuantiles, streamOpts, NewOptions(instrument.NewOptions()))
timer.Add(1.0)
require.Equal(t, 1.0, timer.Min())
timer.Close()

// Create a new timer and assert the underlying stream has been closed.
timer = NewTimer(testQuantiles, streamOpts, NewOptions())
timer = NewTimer(testQuantiles, streamOpts, NewOptions(instrument.NewOptions()))
timer.Add(1.0)
require.Equal(t, 1.0, timer.Min())
timer.Close()
require.Equal(t, 0.0, timer.stream.Min())
}

func TestTimerAggregations(t *testing.T) {
opts := NewOptions()
opts := NewOptions(instrument.NewOptions())
opts.ResetSetData(testAggTypes)

timer := NewTimer(testQuantiles, cm.NewOptions(), opts)
Expand Down Expand Up @@ -143,7 +144,7 @@ func TestTimerAggregations(t *testing.T) {
}

func TestTimerAggregationsNotExpensive(t *testing.T) {
opts := NewOptions()
opts := NewOptions(instrument.NewOptions())
opts.ResetSetData(aggregation.Types{aggregation.Sum})

timer := NewTimer(testQuantiles, cm.NewOptions(), opts)
Expand Down
39 changes: 31 additions & 8 deletions src/aggregator/aggregator/aggregation.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
package aggregator

import (
"time"

"github.com/m3db/m3/src/aggregator/aggregation"
"github.com/m3db/m3/src/metrics/metric/unaggregated"
)
Expand All @@ -34,23 +36,44 @@ func newCounterAggregation(c aggregation.Counter) counterAggregation {
return counterAggregation{Counter: c}
}

func (c *counterAggregation) Add(value float64) { c.Counter.Update(int64(value)) }
func (c *counterAggregation) AddUnion(mu unaggregated.MetricUnion) { c.Counter.Update(mu.CounterVal) }
func (a *counterAggregation) Add(t time.Time, value float64) {
a.Counter.Update(t, int64(value))
}

func (a *counterAggregation) AddUnion(t time.Time, mu unaggregated.MetricUnion) {
a.Counter.Update(t, mu.CounterVal)
}

// timerAggregation is a timer aggregation.
type timerAggregation struct {
aggregation.Timer
}

func newTimerAggregation(t aggregation.Timer) timerAggregation { return timerAggregation{Timer: t} }
func (t *timerAggregation) Add(value float64) { t.Timer.Add(value) }
func (t *timerAggregation) AddUnion(mu unaggregated.MetricUnion) { t.Timer.AddBatch(mu.BatchTimerVal) }
func newTimerAggregation(t aggregation.Timer) timerAggregation {
return timerAggregation{Timer: t}
}

func (a *timerAggregation) Add(_ time.Time, value float64) {
a.Timer.Add(value)
}

func (a *timerAggregation) AddUnion(_ time.Time, mu unaggregated.MetricUnion) {
a.Timer.AddBatch(mu.BatchTimerVal)
}

// gaugeAggregation is a gauge aggregation.
type gaugeAggregation struct {
aggregation.Gauge
}

func newGaugeAggregation(g aggregation.Gauge) gaugeAggregation { return gaugeAggregation{Gauge: g} }
func (g *gaugeAggregation) Add(value float64) { g.Gauge.Update(value) }
func (g *gaugeAggregation) AddUnion(mu unaggregated.MetricUnion) { g.Gauge.Update(mu.GaugeVal) }
func newGaugeAggregation(g aggregation.Gauge) gaugeAggregation {
return gaugeAggregation{Gauge: g}
}

func (a *gaugeAggregation) Add(t time.Time, value float64) {
a.Gauge.Update(t, value)
}

func (a *gaugeAggregation) AddUnion(t time.Time, mu unaggregated.MetricUnion) {
a.Gauge.Update(t, mu.GaugeVal)
}
Loading