Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[aggregator] A flag to enable FlushOffset for timed metrics #3362

Merged
merged 3 commits into from
Mar 17, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/aggregator/aggregator/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,7 +761,10 @@ func (l *timedMetricList) ID() metricListID {
}

func (l *timedMetricList) FixedFlushOffset() (time.Duration, bool) {
return l.flushOffset, true
if l.opts.TimedMetricsFlushOffsetEnabled() {
return l.flushOffset, true
}
return 0, false
}

func (l *timedMetricList) Close() {
Expand Down
23 changes: 21 additions & 2 deletions src/aggregator/aggregator/list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,15 +485,17 @@ func TestTimedMetricListID(t *testing.T) {
require.Equal(t, expectedListID, l.ID())
}

func TestTimedMetricListFlushOffset(t *testing.T) {
func TestTimedMetricListFlushOffsetEnabled(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

resolution := 10 * time.Second
bufferForPastTimedMetricFn := func(resolution time.Duration) time.Duration {
return resolution + 3*time.Second
}
opts := testOptions(ctrl).SetBufferForPastTimedMetricFn(bufferForPastTimedMetricFn)
opts := testOptions(ctrl).
SetBufferForPastTimedMetricFn(bufferForPastTimedMetricFn).
SetTimedMetricsFlushOffsetEnabled(true)
listID := timedMetricListID{resolution: resolution}

l, err := newTimedMetricList(testShard, listID, opts)
Expand All @@ -504,6 +506,23 @@ func TestTimedMetricListFlushOffset(t *testing.T) {
require.Equal(t, 3*time.Second, offset)
}

func TestTimedMetricListFlushOffsetDisabled(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

resolution := 10 * time.Second
opts := testOptions(ctrl).
SetTimedMetricsFlushOffsetEnabled(false)
listID := timedMetricListID{resolution: resolution}

l, err := newTimedMetricList(testShard, listID, opts)
require.NoError(t, err)

offset, ok := l.FixedFlushOffset()
require.False(t, ok)
require.Zero(t, offset)
}

func TestTimedMetricListFlushConsumingAndCollectingTimedMetrics(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
17 changes: 17 additions & 0 deletions src/aggregator/aggregator/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,12 @@ type Options interface {
// This is a temporary option to help with the seamless rollout of changing Add transforms to Reset transforms for
// resetting aggregate counters. Once rollup rules have changed to use Reset explicitly, this can be removed.
AddToReset() bool

// TimedMetricsFlushOffsetEnabled returns true if using of FlushOffset for timed metrics is enabled.
TimedMetricsFlushOffsetEnabled() bool

// SetTimedMetricsFlushOffsetEnabled controls using of FlushOffset for timed metrics.
SetTimedMetricsFlushOffsetEnabled(bool) Options
}

type options struct {
Expand Down Expand Up @@ -370,6 +376,7 @@ type options struct {
gaugeElemPool GaugeElemPool
verboseErrors bool
addToReset bool
timedMetricsFlushOffsetEnabled bool

// Derived options.
fullCounterPrefix []byte
Expand Down Expand Up @@ -878,6 +885,16 @@ func (o *options) SetAddToReset(value bool) Options {
return &opts
}

func (o *options) TimedMetricsFlushOffsetEnabled() bool {
return o.timedMetricsFlushOffsetEnabled
}

func (o *options) SetTimedMetricsFlushOffsetEnabled(value bool) Options {
opts := *o
opts.timedMetricsFlushOffsetEnabled = value
return &opts
}

func defaultMaxAllowedForwardingDelayFn(
resolution time.Duration,
numForwardedTimes int,
Expand Down
6 changes: 5 additions & 1 deletion src/cmd/services/m3aggregator/config/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,9 @@ type AggregatorConfiguration struct {

// AddToReset is the yaml config for aggregator.Options.AddToReset
AddToReset bool `yaml:"addToReset"`

// TimedMetricsFlushOffsetEnabled enables using FlushOffset for timed metrics.
TimedMetricsFlushOffsetEnabled bool `yaml:"timedMetricsFlushOffsetEnabled"`
}

// InstanceIDType is the instance ID type that defines how the
Expand Down Expand Up @@ -258,7 +261,8 @@ func (c *AggregatorConfiguration) NewAggregatorOptions(
SetInstrumentOptions(instrumentOpts).
SetRuntimeOptionsManager(runtimeOptsManager).
SetVerboseErrors(c.VerboseErrors).
SetAddToReset(c.AddToReset)
SetAddToReset(c.AddToReset).
SetTimedMetricsFlushOffsetEnabled(c.TimedMetricsFlushOffsetEnabled)

rwOpts := serveOpts.RWOptions()
if rwOpts == nil {
Expand Down