diff --git a/src/aggregator/aggregator/list.go b/src/aggregator/aggregator/list.go index 29d1847267..ef0a9c2eb3 100644 --- a/src/aggregator/aggregator/list.go +++ b/src/aggregator/aggregator/list.go @@ -761,7 +761,10 @@ func (l *timedMetricList) ID() metricListID { } func (l *timedMetricList) FixedFlushOffset() (time.Duration, bool) { - return l.flushOffset, true + if l.opts.TimedMetricsFlushOffsetEnabled() { + return l.flushOffset, true + } + return 0, false } func (l *timedMetricList) Close() { diff --git a/src/aggregator/aggregator/list_test.go b/src/aggregator/aggregator/list_test.go index cdab066aa2..ce7aeb6ac1 100644 --- a/src/aggregator/aggregator/list_test.go +++ b/src/aggregator/aggregator/list_test.go @@ -485,7 +485,7 @@ func TestTimedMetricListID(t *testing.T) { require.Equal(t, expectedListID, l.ID()) } -func TestTimedMetricListFlushOffset(t *testing.T) { +func TestTimedMetricListFlushOffsetEnabled(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -493,7 +493,9 @@ func TestTimedMetricListFlushOffset(t *testing.T) { bufferForPastTimedMetricFn := func(resolution time.Duration) time.Duration { return resolution + 3*time.Second } - opts := testOptions(ctrl).SetBufferForPastTimedMetricFn(bufferForPastTimedMetricFn) + opts := testOptions(ctrl). + SetBufferForPastTimedMetricFn(bufferForPastTimedMetricFn). + SetTimedMetricsFlushOffsetEnabled(true) listID := timedMetricListID{resolution: resolution} l, err := newTimedMetricList(testShard, listID, opts) @@ -504,6 +506,23 @@ func TestTimedMetricListFlushOffset(t *testing.T) { require.Equal(t, 3*time.Second, offset) } +func TestTimedMetricListFlushOffsetDisabled(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + resolution := 10 * time.Second + opts := testOptions(ctrl). + SetTimedMetricsFlushOffsetEnabled(false) + listID := timedMetricListID{resolution: resolution} + + l, err := newTimedMetricList(testShard, listID, opts) + require.NoError(t, err) + + offset, ok := l.FixedFlushOffset() + require.False(t, ok) + require.Zero(t, offset) +} + func TestTimedMetricListFlushConsumingAndCollectingTimedMetrics(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/src/aggregator/aggregator/options.go b/src/aggregator/aggregator/options.go index 2957ae6143..b80d16b882 100644 --- a/src/aggregator/aggregator/options.go +++ b/src/aggregator/aggregator/options.go @@ -328,6 +328,12 @@ type Options interface { // This is a temporary option to help with the seamless rollout of changing Add transforms to Reset transforms for // resetting aggregate counters. Once rollup rules have changed to use Reset explicitly, this can be removed. AddToReset() bool + + // TimedMetricsFlushOffsetEnabled returns true if using of FlushOffset for timed metrics is enabled. + TimedMetricsFlushOffsetEnabled() bool + + // SetTimedMetricsFlushOffsetEnabled controls using of FlushOffset for timed metrics. + SetTimedMetricsFlushOffsetEnabled(bool) Options } type options struct { @@ -370,6 +376,7 @@ type options struct { gaugeElemPool GaugeElemPool verboseErrors bool addToReset bool + timedMetricsFlushOffsetEnabled bool // Derived options. fullCounterPrefix []byte @@ -878,6 +885,16 @@ func (o *options) SetAddToReset(value bool) Options { return &opts } +func (o *options) TimedMetricsFlushOffsetEnabled() bool { + return o.timedMetricsFlushOffsetEnabled +} + +func (o *options) SetTimedMetricsFlushOffsetEnabled(value bool) Options { + opts := *o + opts.timedMetricsFlushOffsetEnabled = value + return &opts +} + func defaultMaxAllowedForwardingDelayFn( resolution time.Duration, numForwardedTimes int, diff --git a/src/cmd/services/m3aggregator/config/aggregator.go b/src/cmd/services/m3aggregator/config/aggregator.go index cf18fe6080..0f62b11afa 100644 --- a/src/cmd/services/m3aggregator/config/aggregator.go +++ b/src/cmd/services/m3aggregator/config/aggregator.go @@ -174,6 +174,9 @@ type AggregatorConfiguration struct { // AddToReset is the yaml config for aggregator.Options.AddToReset AddToReset bool `yaml:"addToReset"` + + // TimedMetricsFlushOffsetEnabled enables using FlushOffset for timed metrics. + TimedMetricsFlushOffsetEnabled bool `yaml:"timedMetricsFlushOffsetEnabled"` } // InstanceIDType is the instance ID type that defines how the @@ -258,7 +261,8 @@ func (c *AggregatorConfiguration) NewAggregatorOptions( SetInstrumentOptions(instrumentOpts). SetRuntimeOptionsManager(runtimeOptsManager). SetVerboseErrors(c.VerboseErrors). - SetAddToReset(c.AddToReset) + SetAddToReset(c.AddToReset). + SetTimedMetricsFlushOffsetEnabled(c.TimedMetricsFlushOffsetEnabled) rwOpts := serveOpts.RWOptions() if rwOpts == nil {