Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AddToReset config to change Add transforms to Reset transforms #2817

Merged
merged 2 commits into from
Oct 29, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/aggregator/aggregator/elem_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ import (
"github.com/m3db/m3/src/metrics/policy"
"github.com/m3db/m3/src/metrics/transformation"
"github.com/m3db/m3/src/x/pool"
"go.uber.org/zap"

"github.com/willf/bitset"
"go.uber.org/zap"
)

const (
Expand Down Expand Up @@ -166,6 +166,7 @@ type elemBase struct {
idPrefixSuffixType IDPrefixSuffixType
writeForwardedMetricFn writeForwardedMetricFn
onForwardedAggregationWrittenFn onForwardedAggregationDoneFn
addToReset bool

// Mutable states.
tombstoned bool
Expand All @@ -179,6 +180,7 @@ func newElemBase(opts Options) elemBase {
opts: opts,
aggTypesOpts: opts.AggregationTypesOptions(),
aggOpts: raggregation.NewOptions(opts.InstrumentOptions()),
addToReset: opts.AddToReset(),
}
}

Expand All @@ -198,6 +200,13 @@ func (e *elemBase) resetSetData(
l.Error("error parsing pipeline", zap.Error(err))
return err
}
if e.addToReset {
for i := range parsed.Transformations {
if parsed.Transformations[i].Type() == transformation.Add {
parsed.Transformations[i], _ = transformation.Reset.NewOp()
}
}
}
e.id = id
e.sp = sp
e.aggTypes = aggTypes
Expand Down
22 changes: 22 additions & 0 deletions src/aggregator/aggregator/elem_base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,3 +572,25 @@ func TestParsePipelineTransformationDerivativeOrderTooHigh(t *testing.T) {
require.Error(t, err)
require.True(t, strings.Contains(err.Error(), "transformation derivative order is 2 higher than supported 1"))
}

func TestAddToRestOption(t *testing.T) {
p := applied.NewPipeline([]applied.OpUnion{
{
Type: pipeline.TransformationOpType,
Transformation: pipeline.TransformationOp{Type: transformation.Add},
},
{
Type: pipeline.TransformationOpType,
Transformation: pipeline.TransformationOp{Type: transformation.Increase},
},
})
e := newElemBase(NewOptions().SetAddToReset(false))
e.resetSetData(testCounterID, testStoragePolicy, testAggregationTypesExpensive, false, p, 3, WithPrefixWithSuffix)
require.Equal(t, transformation.Add, e.parsedPipeline.Transformations[0].Type())
require.Equal(t, transformation.Increase, e.parsedPipeline.Transformations[1].Type())

e = newElemBase(NewOptions().SetAddToReset(true))
e.resetSetData(testCounterID, testStoragePolicy, testAggregationTypesExpensive, false, p, 3, WithPrefixWithSuffix)
require.Equal(t, transformation.Reset, e.parsedPipeline.Transformations[0].Type())
require.Equal(t, transformation.Increase, e.parsedPipeline.Transformations[1].Type())
}
19 changes: 19 additions & 0 deletions src/aggregator/aggregator/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,14 @@ type Options interface {

// VerboseErrors returns whether to return verbose errors or not.
VerboseErrors() bool

// SetAddToReset sets the value for AddToReset.
SetAddToReset(value bool) Options

// AddToReset changes Add transforms to Reset Transforms.
// This is a temporary option to help with the seamless rollout of changing Add transforms to Reset transforms for
// resetting aggregate counters. Once rollup rules have changed to use Reset explicitly, this can be removed.
AddToReset() bool
}

type options struct {
Expand Down Expand Up @@ -354,6 +362,7 @@ type options struct {
timerElemPool TimerElemPool
gaugeElemPool GaugeElemPool
verboseErrors bool
addToReset bool

// Derived options.
fullCounterPrefix []byte
Expand Down Expand Up @@ -840,6 +849,16 @@ func (o *options) computeFullGaugePrefix() {
o.fullGaugePrefix = fullGaugePrefix
}

func (o *options) AddToReset() bool {
return o.addToReset
}

func (o *options) SetAddToReset(value bool) Options {
opts := *o
opts.addToReset = value
return &opts
}

func defaultMaxAllowedForwardingDelayFn(
resolution time.Duration,
numForwardedTimes int,
Expand Down
6 changes: 5 additions & 1 deletion src/cmd/services/m3aggregator/config/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,9 @@ type AggregatorConfiguration struct {

// Pool of entries.
EntryPool pool.ObjectPoolConfiguration `yaml:"entryPool"`

// AddToReset is the yaml config for aggregator.Options.AddToReset
AddToReset bool `yaml:"addToReset"`
}

// InstanceIDType is the instance ID type that defines how the
Expand Down Expand Up @@ -258,7 +261,8 @@ func (c *AggregatorConfiguration) NewAggregatorOptions(
opts := aggregator.NewOptions().
SetInstrumentOptions(instrumentOpts).
SetRuntimeOptionsManager(runtimeOptsManager).
SetVerboseErrors(c.VerboseErrors)
SetVerboseErrors(c.VerboseErrors).
SetAddToReset(c.AddToReset)

rwOpts := serveOpts.RWOptions()
if rwOpts == nil {
Expand Down