From b770ecf33ad7a504167d0b40b5216db1f668125a Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Thu, 6 May 2021 17:02:46 -0400 Subject: [PATCH] [matcher] [coordinator] Add RequireNamespaceWatchOnInit option (#3468) * [matcher] [coordinator] Add RequireNamespaceWatchOnInit option This option on the matcher will require that namespace values and corresponding rulesets are loaded in the matcher as part of startup. This is useful in ensuring dynamic rules are loaded before we start ingesting metrics. * Set RequireNamespaceWatchOnInit in downsampler config --- .../m3coordinator/downsample/options.go | 6 +- src/metrics/matcher/namespaces.go | 57 +++++++---- src/metrics/matcher/namespaces_test.go | 96 +++++++++++++++---- src/metrics/matcher/options.go | 45 ++++++--- 4 files changed, 157 insertions(+), 47 deletions(-) diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go index 9db630ac34..fec28c40fe 100644 --- a/src/cmd/services/m3coordinator/downsample/options.go +++ b/src/cmd/services/m3coordinator/downsample/options.go @@ -284,6 +284,9 @@ type MatcherConfiguration struct { // NamespaceTag defines the namespace tag to use to select rules // namespace to evaluate against. Default is "__m3_namespace__". NamespaceTag string `yaml:"namespaceTag"` + // RequireNamespaceWatchOnInit is the flag to ensure matcher is initialized with a loaded namespace watch. + // This only makes sense to use if the corresponding namespace / ruleset values are properly seeded. + RequireNamespaceWatchOnInit bool `yaml:"requireNamespaceWatchOnInit"` } // MatcherCacheConfiguration is the configuration for the rule matcher cache. @@ -710,7 +713,8 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { SetInstrumentOptions(instrumentOpts). SetRuleSetOptions(ruleSetOpts). SetKVStore(o.RulesKVStore). - SetNamespaceTag([]byte(namespaceTag)) + SetNamespaceTag([]byte(namespaceTag)). 
+ SetRequireNamespaceWatchOnInit(cfg.Matcher.RequireNamespaceWatchOnInit) // NB(r): If rules are being explicitly set in config then we are // going to use an in memory KV store for rules and explicitly set them up. diff --git a/src/metrics/matcher/namespaces.go b/src/metrics/matcher/namespaces.go index 4b532d0ae2..fa3e8dbcdb 100644 --- a/src/metrics/matcher/namespaces.go +++ b/src/metrics/matcher/namespaces.go @@ -32,6 +32,7 @@ import ( "github.com/m3db/m3/src/metrics/metric" "github.com/m3db/m3/src/metrics/rules" "github.com/m3db/m3/src/x/clock" + xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/watch" "github.com/uber-go/tally" @@ -112,27 +113,29 @@ type namespaces struct { onNamespaceAddedFn OnNamespaceAddedFn onNamespaceRemovedFn OnNamespaceRemovedFn - proto *rulepb.Namespaces - rules *namespaceRuleSetsMap - metrics namespacesMetrics + proto *rulepb.Namespaces + rules *namespaceRuleSetsMap + metrics namespacesMetrics + requireNamespaceWatchOnInit bool } // NewNamespaces creates a new namespaces object. 
func NewNamespaces(key string, opts Options) Namespaces { instrumentOpts := opts.InstrumentOptions() n := &namespaces{ - key: key, - store: opts.KVStore(), - opts: opts, - nowFn: opts.ClockOptions().NowFn(), - log: instrumentOpts.Logger(), - ruleSetKeyFn: opts.RuleSetKeyFn(), - matchRangePast: opts.MatchRangePast(), - onNamespaceAddedFn: opts.OnNamespaceAddedFn(), - onNamespaceRemovedFn: opts.OnNamespaceRemovedFn(), - proto: &rulepb.Namespaces{}, - rules: newNamespaceRuleSetsMap(namespaceRuleSetsMapOptions{}), - metrics: newNamespacesMetrics(instrumentOpts.MetricsScope()), + key: key, + store: opts.KVStore(), + opts: opts, + nowFn: opts.ClockOptions().NowFn(), + log: instrumentOpts.Logger(), + ruleSetKeyFn: opts.RuleSetKeyFn(), + matchRangePast: opts.MatchRangePast(), + onNamespaceAddedFn: opts.OnNamespaceAddedFn(), + onNamespaceRemovedFn: opts.OnNamespaceRemovedFn(), + proto: &rulepb.Namespaces{}, + rules: newNamespaceRuleSetsMap(namespaceRuleSetsMapOptions{}), + metrics: newNamespacesMetrics(instrumentOpts.MetricsScope()), + requireNamespaceWatchOnInit: opts.RequireNamespaceWatchOnInit(), } valueOpts := runtime.NewOptions(). SetInstrumentOptions(instrumentOpts). @@ -160,10 +163,15 @@ func (n *namespaces) Open() error { // to be more resilient to error conditions preventing process // from starting up. 
n.metrics.initWatchErrors.Inc(1) + if n.requireNamespaceWatchOnInit { + return err + } + n.opts.InstrumentOptions().Logger().With( zap.String("key", n.key), zap.Error(err), ).Error("error initializing namespaces values, retrying in the background") + return nil } @@ -255,7 +263,12 @@ func (n *namespaces) process(value interface{}) error { n.Lock() defer n.Unlock() - var watchWg sync.WaitGroup + var ( + watchWg sync.WaitGroup + multiErr xerrors.MultiError + errLock sync.Mutex + ) + for _, entry := range incoming.Iter() { namespace, elem := entry.Key(), rules.Namespace(entry.Value()) nsName, snapshots := elem.Name(), elem.Snapshots() @@ -302,6 +315,13 @@ func (n *namespaces) process(value interface{}) error { n.log.Error("failed to watch ruleset updates", zap.String("ruleSetKey", ruleSet.Key()), zap.Error(err)) + + // Track errors if we explicitly want to ensure watches succeed. + if n.requireNamespaceWatchOnInit { + errLock.Lock() + multiErr = multiErr.Add(err) + errLock.Unlock() + } } }() } @@ -312,6 +332,11 @@ func (n *namespaces) process(value interface{}) error { } watchWg.Wait() + + if !multiErr.Empty() { + return multiErr.FinalError() + } + for _, entry := range n.rules.Iter() { namespace, ruleSet := entry.Key(), entry.Value() _, exists := incoming.Get(namespace) diff --git a/src/metrics/matcher/namespaces_test.go b/src/metrics/matcher/namespaces_test.go index 8a991f93f7..a61d3648db 100644 --- a/src/metrics/matcher/namespaces_test.go +++ b/src/metrics/matcher/namespaces_test.go @@ -46,10 +46,10 @@ func TestNamespacesWatchAndClose(t *testing.T) { store, _, nss, _ := testNamespaces() proto := &rulepb.Namespaces{ Namespaces: []*rulepb.Namespace{ - &rulepb.Namespace{ + { Name: "fooNs", Snapshots: []*rulepb.NamespaceSnapshot{ - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 1, Tombstoned: true, }, @@ -64,6 +64,68 @@ func TestNamespacesWatchAndClose(t *testing.T) { nss.Close() } +func TestNamespacesWatchSoftErr(t *testing.T) { + _, _, nss, _ := testNamespaces() + 
// No value set, so this will soft error + require.NoError(t, nss.Open()) +} + +func TestNamespacesWatchRulesetSoftErr(t *testing.T) { + store, _, nss, _ := testNamespaces() + proto := &rulepb.Namespaces{ + Namespaces: []*rulepb.Namespace{ + { + Name: "fooNs", + Snapshots: []*rulepb.NamespaceSnapshot{ + { + ForRulesetVersion: 1, + Tombstoned: true, + }, + }, + }, + }, + } + _, err := store.SetIfNotExists(testNamespacesKey, proto) + require.NoError(t, err) + + // This should also soft error even though the underlying ruleset does not exist + require.NoError(t, nss.Open()) +} + +func TestNamespacesWatchHardErr(t *testing.T) { + _, _, _, opts := testNamespaces() + opts = opts.SetRequireNamespaceWatchOnInit(true) + nss := NewNamespaces(testNamespacesKey, opts).(*namespaces) + // This should hard error with RequireNamespaceWatchOnInit enabled + require.Error(t, nss.Open()) +} + +func TestNamespacesWatchRulesetHardErr(t *testing.T) { + store, _, _, opts := testNamespaces() + opts = opts.SetRequireNamespaceWatchOnInit(true) + nss := NewNamespaces(testNamespacesKey, opts).(*namespaces) + + proto := &rulepb.Namespaces{ + Namespaces: []*rulepb.Namespace{ + { + Name: "fooNs", + Snapshots: []*rulepb.NamespaceSnapshot{ + { + ForRulesetVersion: 1, + Tombstoned: true, + }, + }, + }, + }, + } + _, err := store.SetIfNotExists(testNamespacesKey, proto) + require.NoError(t, err) + + // This should also hard error with RequireNamespaceWatchOnInit enabled, + // because the underlying ruleset does not exist + require.Error(t, nss.Open()) +} + func TestToNamespacesNilValue(t *testing.T) { _, _, nss, _ := testNamespaces() _, err := nss.toNamespaces(nil) @@ -80,10 +142,10 @@ func TestToNamespacesSuccess(t *testing.T) { store, _, nss, _ := testNamespaces() proto := &rulepb.Namespaces{ Namespaces: []*rulepb.Namespace{ - &rulepb.Namespace{ + { Name: "fooNs", Snapshots: []*rulepb.NamespaceSnapshot{ - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 1, Tombstoned: true, }, @@ -128,55 +190,55 
@@ func TestNamespacesProcess(t *testing.T) { update := &rulepb.Namespaces{ Namespaces: []*rulepb.Namespace{ - &rulepb.Namespace{ + { Name: "fooNs", Snapshots: []*rulepb.NamespaceSnapshot{ - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 1, Tombstoned: false, }, - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 2, Tombstoned: false, }, }, }, - &rulepb.Namespace{ + { Name: "barNs", Snapshots: []*rulepb.NamespaceSnapshot{ - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 1, Tombstoned: false, }, - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 2, Tombstoned: true, }, }, }, - &rulepb.Namespace{ + { Name: "bazNs", Snapshots: []*rulepb.NamespaceSnapshot{ - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 1, Tombstoned: false, }, - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 2, Tombstoned: false, }, }, }, - &rulepb.Namespace{ + { Name: "catNs", Snapshots: []*rulepb.NamespaceSnapshot{ - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 3, Tombstoned: true, }, }, }, - &rulepb.Namespace{ + { Name: "mehNs", Snapshots: nil, }, @@ -215,7 +277,7 @@ func TestNamespacesProcess(t *testing.T) { } } -func testNamespaces() (kv.Store, cache.Cache, *namespaces, Options) { +func testNamespaces() (kv.TxnStore, cache.Cache, *namespaces, Options) { store := mem.NewStore() cache := newMemCache() opts := NewOptions(). diff --git a/src/metrics/matcher/options.go b/src/metrics/matcher/options.go index 2c90aea499..208e102cf9 100644 --- a/src/metrics/matcher/options.go +++ b/src/metrics/matcher/options.go @@ -135,22 +135,29 @@ type Options interface { // OnRuleSetUpdatedFn returns the function to be called when a ruleset is updated. OnRuleSetUpdatedFn() OnRuleSetUpdatedFn + + // SetRequireNamespaceWatchOnInit sets the flag to ensure matcher is initialized with a loaded namespace watch. + SetRequireNamespaceWatchOnInit(value bool) Options + + // RequireNamespaceWatchOnInit returns the flag to ensure matcher is initialized with a loaded namespace watch. 
+ RequireNamespaceWatchOnInit() bool } type options struct { - clockOpts clock.Options - instrumentOpts instrument.Options - ruleSetOpts rules.Options - initWatchTimeout time.Duration - kvStore kv.Store - namespacesKey string - ruleSetKeyFn RuleSetKeyFn - namespaceTag []byte - defaultNamespace []byte - matchRangePast time.Duration - onNamespaceAddedFn OnNamespaceAddedFn - onNamespaceRemovedFn OnNamespaceRemovedFn - onRuleSetUpdatedFn OnRuleSetUpdatedFn + clockOpts clock.Options + instrumentOpts instrument.Options + ruleSetOpts rules.Options + initWatchTimeout time.Duration + kvStore kv.Store + namespacesKey string + ruleSetKeyFn RuleSetKeyFn + namespaceTag []byte + defaultNamespace []byte + matchRangePast time.Duration + onNamespaceAddedFn OnNamespaceAddedFn + onNamespaceRemovedFn OnNamespaceRemovedFn + onRuleSetUpdatedFn OnRuleSetUpdatedFn + requireNamespaceWatchOnInit bool } // NewOptions creates a new set of options. @@ -299,6 +306,18 @@ func (o *options) OnRuleSetUpdatedFn() OnRuleSetUpdatedFn { return o.onRuleSetUpdatedFn } +// SetRequireNamespaceWatchOnInit sets the flag to ensure matcher is initialized with a loaded namespace watch. +func (o *options) SetRequireNamespaceWatchOnInit(value bool) Options { + opts := *o + opts.requireNamespaceWatchOnInit = value + return &opts +} + +// RequireNamespaceWatchOnInit returns the flag to ensure matcher is initialized with a loaded namespace watch. +func (o *options) RequireNamespaceWatchOnInit() bool { + return o.requireNamespaceWatchOnInit +} + func defaultRuleSetKeyFn(namespace []byte) string { return fmt.Sprintf(defaultRuleSetKeyFormat, namespace) }