From 3750549cc07c69c9833ce7110928d5bac1b5bc52 Mon Sep 17 00:00:00 2001 From: Ryan Hall Date: Thu, 29 Oct 2020 10:41:16 -0700 Subject: [PATCH] AddToReset config to change Add transforms to Reset transforms This is a temporary option to allow seamlessly rolling out Add -> Reset transforms. With this we can deploy the new aggregator with the option without experiencing another degradation in aggregated counters. Without the change we'd need to deploy the new code and then deploy the new rules. The deploy would result in degraded aggregated counters without this config. --- src/aggregator/aggregator/elem_base.go | 11 +++++++++- src/aggregator/aggregator/elem_base_test.go | 22 +++++++++++++++++++ src/aggregator/aggregator/options.go | 19 ++++++++++++++++ .../m3aggregator/config/aggregator.go | 6 ++++- 4 files changed, 56 insertions(+), 2 deletions(-) diff --git a/src/aggregator/aggregator/elem_base.go b/src/aggregator/aggregator/elem_base.go index eeff7cc5b2..973e223345 100644 --- a/src/aggregator/aggregator/elem_base.go +++ b/src/aggregator/aggregator/elem_base.go @@ -37,9 +37,9 @@ import ( "github.com/m3db/m3/src/metrics/policy" "github.com/m3db/m3/src/metrics/transformation" "github.com/m3db/m3/src/x/pool" - "go.uber.org/zap" "github.com/willf/bitset" + "go.uber.org/zap" ) const ( @@ -166,6 +166,7 @@ type elemBase struct { idPrefixSuffixType IDPrefixSuffixType writeForwardedMetricFn writeForwardedMetricFn onForwardedAggregationWrittenFn onForwardedAggregationDoneFn + addToReset bool // Mutable states. tombstoned bool @@ -179,6 +180,7 @@ func newElemBase(opts Options) elemBase { opts: opts, aggTypesOpts: opts.AggregationTypesOptions(), aggOpts: raggregation.NewOptions(opts.InstrumentOptions()), + addToReset: opts.AddToReset(), } } @@ -198,6 +200,13 @@ func (e *elemBase) resetSetData( l.Error("error parsing pipeline", zap.Error(err)) return err } + if e.addToReset { + for i := range parsed.Transformations { + if parsed.Transformations[i].Type() == transformation.Add { + parsed.Transformations[i], _ = transformation.Reset.NewOp() + } + } + } e.id = id e.sp = sp e.aggTypes = aggTypes diff --git a/src/aggregator/aggregator/elem_base_test.go b/src/aggregator/aggregator/elem_base_test.go index a929227a26..a277432a51 100644 --- a/src/aggregator/aggregator/elem_base_test.go +++ b/src/aggregator/aggregator/elem_base_test.go @@ -572,3 +572,25 @@ func TestParsePipelineTransformationDerivativeOrderTooHigh(t *testing.T) { require.Error(t, err) require.True(t, strings.Contains(err.Error(), "transformation derivative order is 2 higher than supported 1")) } + +func TestAddToRestOption(t *testing.T) { + p := applied.NewPipeline([]applied.OpUnion{ + { + Type: pipeline.TransformationOpType, + Transformation: pipeline.TransformationOp{Type: transformation.Add}, + }, + { + Type: pipeline.TransformationOpType, + Transformation: pipeline.TransformationOp{Type: transformation.Increase}, + }, + }) + e := newElemBase(NewOptions().SetAddToReset(false)) + e.resetSetData(testCounterID, testStoragePolicy, testAggregationTypesExpensive, false, p, 3, WithPrefixWithSuffix) + require.Equal(t, transformation.Add, e.parsedPipeline.Transformations[0].Type()) + require.Equal(t, transformation.Increase, e.parsedPipeline.Transformations[1].Type()) + + e = newElemBase(NewOptions().SetAddToReset(true)) + e.resetSetData(testCounterID, testStoragePolicy, testAggregationTypesExpensive, false, p, 3, WithPrefixWithSuffix) + require.Equal(t, transformation.Reset, e.parsedPipeline.Transformations[0].Type()) + require.Equal(t, transformation.Increase, e.parsedPipeline.Transformations[1].Type()) +} diff --git a/src/aggregator/aggregator/options.go b/src/aggregator/aggregator/options.go index 006801f365..b90d915e6c 100644 --- a/src/aggregator/aggregator/options.go +++ b/src/aggregator/aggregator/options.go @@ -314,6 +314,14 @@ type Options interface { // VerboseErrors returns whether to return verbose errors or not. VerboseErrors() bool + + // SetAddToReset sets the value for AddToReset. + SetAddToReset(value bool) Options + + // AddToReset changes Add transforms to Reset Transforms. + // This is a temporary option to help with the seamless rollout of changing Add transforms to Reset transforms for + // resetting aggregate counters. Once rollup rules have changed to use Reset explicitly, this can be removed. + AddToReset() bool } type options struct { @@ -354,6 +362,7 @@ type options struct { timerElemPool TimerElemPool gaugeElemPool GaugeElemPool verboseErrors bool + addToReset bool // Derived options. fullCounterPrefix []byte @@ -840,6 +849,16 @@ func (o *options) computeFullGaugePrefix() { o.fullGaugePrefix = fullGaugePrefix } +func (o *options) AddToReset() bool { + return o.addToReset +} + +func (o *options) SetAddToReset(value bool) Options { + opts := *o + opts.addToReset = value + return &opts +} + func defaultMaxAllowedForwardingDelayFn( resolution time.Duration, numForwardedTimes int, diff --git a/src/cmd/services/m3aggregator/config/aggregator.go b/src/cmd/services/m3aggregator/config/aggregator.go index 307fa69212..89da0b119e 100644 --- a/src/cmd/services/m3aggregator/config/aggregator.go +++ b/src/cmd/services/m3aggregator/config/aggregator.go @@ -173,6 +173,9 @@ type AggregatorConfiguration struct { // Pool of entries. EntryPool pool.ObjectPoolConfiguration `yaml:"entryPool"` + + // AddToReset is the yaml config for aggregator.Options.AddToReset + AddToReset bool `yaml:"addToReset"` } // InstanceIDType is the instance ID type that defines how the @@ -258,7 +261,8 @@ func (c *AggregatorConfiguration) NewAggregatorOptions( opts := aggregator.NewOptions(). SetInstrumentOptions(instrumentOpts). SetRuntimeOptionsManager(runtimeOptsManager). - SetVerboseErrors(c.VerboseErrors) + SetVerboseErrors(c.VerboseErrors). + SetAddToReset(c.AddToReset) rwOpts := serveOpts.RWOptions() if rwOpts == nil {