Skip to content

Commit

Permalink
[aggregator] A flag to enable FlushOffset for timed metrics (#3362)
Browse files Browse the repository at this point in the history
  • Loading branch information
linasm authored Mar 17, 2021
1 parent 31d3bf8 commit 541aba8
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 4 deletions.
5 changes: 4 additions & 1 deletion src/aggregator/aggregator/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,10 @@ func (l *timedMetricList) ID() metricListID {
}

func (l *timedMetricList) FixedFlushOffset() (time.Duration, bool) {
return l.flushOffset, true
if l.opts.TimedMetricsFlushOffsetEnabled() {
return l.flushOffset, true
}
return 0, false
}

func (l *timedMetricList) Close() {
Expand Down
23 changes: 21 additions & 2 deletions src/aggregator/aggregator/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,15 +485,17 @@ func TestTimedMetricListID(t *testing.T) {
require.Equal(t, expectedListID, l.ID())
}

func TestTimedMetricListFlushOffset(t *testing.T) {
func TestTimedMetricListFlushOffsetEnabled(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

resolution := 10 * time.Second
bufferForPastTimedMetricFn := func(resolution time.Duration) time.Duration {
return resolution + 3*time.Second
}
opts := testOptions(ctrl).SetBufferForPastTimedMetricFn(bufferForPastTimedMetricFn)
opts := testOptions(ctrl).
SetBufferForPastTimedMetricFn(bufferForPastTimedMetricFn).
SetTimedMetricsFlushOffsetEnabled(true)
listID := timedMetricListID{resolution: resolution}

l, err := newTimedMetricList(testShard, listID, opts)
Expand All @@ -504,6 +506,23 @@ func TestTimedMetricListFlushOffset(t *testing.T) {
require.Equal(t, 3*time.Second, offset)
}

func TestTimedMetricListFlushOffsetDisabled(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

resolution := 10 * time.Second
opts := testOptions(ctrl).
SetTimedMetricsFlushOffsetEnabled(false)
listID := timedMetricListID{resolution: resolution}

l, err := newTimedMetricList(testShard, listID, opts)
require.NoError(t, err)

offset, ok := l.FixedFlushOffset()
require.False(t, ok)
require.Zero(t, offset)
}

func TestTimedMetricListFlushConsumingAndCollectingTimedMetrics(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
17 changes: 17 additions & 0 deletions src/aggregator/aggregator/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,12 @@ type Options interface {
// This is a temporary option to help with the seamless rollout of changing Add transforms to Reset transforms for
// resetting aggregate counters. Once rollup rules have changed to use Reset explicitly, this can be removed.
AddToReset() bool

// TimedMetricsFlushOffsetEnabled returns true if using of FlushOffset for timed metrics is enabled.
TimedMetricsFlushOffsetEnabled() bool

// SetTimedMetricsFlushOffsetEnabled controls using of FlushOffset for timed metrics.
SetTimedMetricsFlushOffsetEnabled(bool) Options
}

type options struct {
Expand Down Expand Up @@ -370,6 +376,7 @@ type options struct {
gaugeElemPool GaugeElemPool
verboseErrors bool
addToReset bool
timedMetricsFlushOffsetEnabled bool

// Derived options.
fullCounterPrefix []byte
Expand Down Expand Up @@ -878,6 +885,16 @@ func (o *options) SetAddToReset(value bool) Options {
return &opts
}

func (o *options) TimedMetricsFlushOffsetEnabled() bool {
return o.timedMetricsFlushOffsetEnabled
}

func (o *options) SetTimedMetricsFlushOffsetEnabled(value bool) Options {
opts := *o
opts.timedMetricsFlushOffsetEnabled = value
return &opts
}

func defaultMaxAllowedForwardingDelayFn(
resolution time.Duration,
numForwardedTimes int,
Expand Down
6 changes: 5 additions & 1 deletion src/cmd/services/m3aggregator/config/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ type AggregatorConfiguration struct {

// AddToReset is the yaml config for aggregator.Options.AddToReset
AddToReset bool `yaml:"addToReset"`

// TimedMetricsFlushOffsetEnabled enables using FlushOffset for timed metrics.
TimedMetricsFlushOffsetEnabled bool `yaml:"timedMetricsFlushOffsetEnabled"`
}

// InstanceIDType is the instance ID type that defines how the
Expand Down Expand Up @@ -258,7 +261,8 @@ func (c *AggregatorConfiguration) NewAggregatorOptions(
SetInstrumentOptions(instrumentOpts).
SetRuntimeOptionsManager(runtimeOptsManager).
SetVerboseErrors(c.VerboseErrors).
SetAddToReset(c.AddToReset)
SetAddToReset(c.AddToReset).
SetTimedMetricsFlushOffsetEnabled(c.TimedMetricsFlushOffsetEnabled)

rwOpts := serveOpts.RWOptions()
if rwOpts == nil {
Expand Down

0 comments on commit 541aba8

Please sign in to comment.