From 758b8f06c1c89c2e7250614855b893882f9e62d4 Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Wed, 5 May 2021 19:41:44 -0400 Subject: [PATCH 1/2] [matcher] [coordinator] Add RequireNamespaceWatchOnInit option This option on the matcher will require that namespace values and corresponding rulesets are loaded in the matcher as part of startup. This is useful in ensuring dynamic rules are loaded before we start ingesting metrics. --- src/metrics/matcher/config.go | 22 +++--- src/metrics/matcher/namespaces.go | 57 ++++++++++----- src/metrics/matcher/namespaces_test.go | 96 +++++++++++++++++++++----- src/metrics/matcher/options.go | 45 ++++++++---- 4 files changed, 164 insertions(+), 56 deletions(-) diff --git a/src/metrics/matcher/config.go b/src/metrics/matcher/config.go index 88cba241a9..f0b05278e5 100644 --- a/src/metrics/matcher/config.go +++ b/src/metrics/matcher/config.go @@ -38,15 +38,16 @@ import ( // Configuration is config used to create a Matcher. type Configuration struct { - InitWatchTimeout time.Duration `yaml:"initWatchTimeout"` - RulesKVConfig kv.OverrideConfiguration `yaml:"rulesKVConfig"` - NamespacesKey string `yaml:"namespacesKey" validate:"nonzero"` - RuleSetKeyFmt string `yaml:"ruleSetKeyFmt" validate:"nonzero"` - NamespaceTag string `yaml:"namespaceTag" validate:"nonzero"` - DefaultNamespace string `yaml:"defaultNamespace" validate:"nonzero"` - NameTagKey string `yaml:"nameTagKey" validate:"nonzero"` - MatchRangePast *time.Duration `yaml:"matchRangePast"` - SortedTagIteratorPool pool.ObjectPoolConfiguration `yaml:"sortedTagIteratorPool"` + InitWatchTimeout time.Duration `yaml:"initWatchTimeout"` + RulesKVConfig kv.OverrideConfiguration `yaml:"rulesKVConfig"` + NamespacesKey string `yaml:"namespacesKey" validate:"nonzero"` + RuleSetKeyFmt string `yaml:"ruleSetKeyFmt" validate:"nonzero"` + NamespaceTag string `yaml:"namespaceTag" validate:"nonzero"` + DefaultNamespace string `yaml:"defaultNamespace" validate:"nonzero"` + NameTagKey string 
`yaml:"nameTagKey" validate:"nonzero"` + MatchRangePast *time.Duration `yaml:"matchRangePast"` + SortedTagIteratorPool pool.ObjectPoolConfiguration `yaml:"sortedTagIteratorPool"` + RequireNamespaceWatchOnInit bool `yaml:"requireNamespaceWatchOnInit"` } // NewNamespaces creates a matcher.Namespaces. @@ -136,7 +137,8 @@ func (cfg *Configuration) NewOptions( SetNamespacesKey(cfg.NamespacesKey). SetRuleSetKeyFn(ruleSetKeyFn). SetNamespaceTag([]byte(cfg.NamespaceTag)). - SetDefaultNamespace([]byte(cfg.DefaultNamespace)) + SetDefaultNamespace([]byte(cfg.DefaultNamespace)). + SetRequireNamespaceWatchOnInit(cfg.RequireNamespaceWatchOnInit) if cfg.InitWatchTimeout != 0 { opts = opts.SetInitWatchTimeout(cfg.InitWatchTimeout) diff --git a/src/metrics/matcher/namespaces.go b/src/metrics/matcher/namespaces.go index 4b532d0ae2..fa3e8dbcdb 100644 --- a/src/metrics/matcher/namespaces.go +++ b/src/metrics/matcher/namespaces.go @@ -32,6 +32,7 @@ import ( "github.com/m3db/m3/src/metrics/metric" "github.com/m3db/m3/src/metrics/rules" "github.com/m3db/m3/src/x/clock" + xerrors "github.com/m3db/m3/src/x/errors" "github.com/m3db/m3/src/x/watch" "github.com/uber-go/tally" @@ -112,27 +113,29 @@ type namespaces struct { onNamespaceAddedFn OnNamespaceAddedFn onNamespaceRemovedFn OnNamespaceRemovedFn - proto *rulepb.Namespaces - rules *namespaceRuleSetsMap - metrics namespacesMetrics + proto *rulepb.Namespaces + rules *namespaceRuleSetsMap + metrics namespacesMetrics + requireNamespaceWatchOnInit bool } // NewNamespaces creates a new namespaces object. 
func NewNamespaces(key string, opts Options) Namespaces { instrumentOpts := opts.InstrumentOptions() n := &namespaces{ - key: key, - store: opts.KVStore(), - opts: opts, - nowFn: opts.ClockOptions().NowFn(), - log: instrumentOpts.Logger(), - ruleSetKeyFn: opts.RuleSetKeyFn(), - matchRangePast: opts.MatchRangePast(), - onNamespaceAddedFn: opts.OnNamespaceAddedFn(), - onNamespaceRemovedFn: opts.OnNamespaceRemovedFn(), - proto: &rulepb.Namespaces{}, - rules: newNamespaceRuleSetsMap(namespaceRuleSetsMapOptions{}), - metrics: newNamespacesMetrics(instrumentOpts.MetricsScope()), + key: key, + store: opts.KVStore(), + opts: opts, + nowFn: opts.ClockOptions().NowFn(), + log: instrumentOpts.Logger(), + ruleSetKeyFn: opts.RuleSetKeyFn(), + matchRangePast: opts.MatchRangePast(), + onNamespaceAddedFn: opts.OnNamespaceAddedFn(), + onNamespaceRemovedFn: opts.OnNamespaceRemovedFn(), + proto: &rulepb.Namespaces{}, + rules: newNamespaceRuleSetsMap(namespaceRuleSetsMapOptions{}), + metrics: newNamespacesMetrics(instrumentOpts.MetricsScope()), + requireNamespaceWatchOnInit: opts.RequireNamespaceWatchOnInit(), } valueOpts := runtime.NewOptions(). SetInstrumentOptions(instrumentOpts). @@ -160,10 +163,15 @@ func (n *namespaces) Open() error { // to be more resilient to error conditions preventing process // from starting up. 
n.metrics.initWatchErrors.Inc(1) + if n.requireNamespaceWatchOnInit { + return err + } + n.opts.InstrumentOptions().Logger().With( zap.String("key", n.key), zap.Error(err), ).Error("error initializing namespaces values, retrying in the background") + return nil } @@ -255,7 +263,12 @@ func (n *namespaces) process(value interface{}) error { n.Lock() defer n.Unlock() - var watchWg sync.WaitGroup + var ( + watchWg sync.WaitGroup + multiErr xerrors.MultiError + errLock sync.Mutex + ) + for _, entry := range incoming.Iter() { namespace, elem := entry.Key(), rules.Namespace(entry.Value()) nsName, snapshots := elem.Name(), elem.Snapshots() @@ -302,6 +315,13 @@ func (n *namespaces) process(value interface{}) error { n.log.Error("failed to watch ruleset updates", zap.String("ruleSetKey", ruleSet.Key()), zap.Error(err)) + + // Track errors if we explicitly want to ensure watches succeed. + if n.requireNamespaceWatchOnInit { + errLock.Lock() + multiErr = multiErr.Add(err) + errLock.Unlock() + } } }() } @@ -312,6 +332,11 @@ func (n *namespaces) process(value interface{}) error { } watchWg.Wait() + + if !multiErr.Empty() { + return multiErr.FinalError() + } + for _, entry := range n.rules.Iter() { namespace, ruleSet := entry.Key(), entry.Value() _, exists := incoming.Get(namespace) diff --git a/src/metrics/matcher/namespaces_test.go b/src/metrics/matcher/namespaces_test.go index 8a991f93f7..a61d3648db 100644 --- a/src/metrics/matcher/namespaces_test.go +++ b/src/metrics/matcher/namespaces_test.go @@ -46,10 +46,10 @@ func TestNamespacesWatchAndClose(t *testing.T) { store, _, nss, _ := testNamespaces() proto := &rulepb.Namespaces{ Namespaces: []*rulepb.Namespace{ - &rulepb.Namespace{ + { Name: "fooNs", Snapshots: []*rulepb.NamespaceSnapshot{ - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 1, Tombstoned: true, }, @@ -64,6 +64,68 @@ func TestNamespacesWatchAndClose(t *testing.T) { nss.Close() } +func TestNamespacesWatchSoftErr(t *testing.T) { + _, _, nss, _ := testNamespaces() + 
// No value set, so this will soft error + require.NoError(t, nss.Open()) +} + +func TestNamespacesWatchRulesetSoftErr(t *testing.T) { + store, _, nss, _ := testNamespaces() + proto := &rulepb.Namespaces{ + Namespaces: []*rulepb.Namespace{ + { + Name: "fooNs", + Snapshots: []*rulepb.NamespaceSnapshot{ + { + ForRulesetVersion: 1, + Tombstoned: true, + }, + }, + }, + }, + } + _, err := store.SetIfNotExists(testNamespacesKey, proto) + require.NoError(t, err) + + // This should also soft error even though the underlying ruleset does not exist + require.NoError(t, nss.Open()) +} + +func TestNamespacesWatchHardErr(t *testing.T) { + _, _, _, opts := testNamespaces() + opts = opts.SetRequireNamespaceWatchOnInit(true) + nss := NewNamespaces(testNamespacesKey, opts).(*namespaces) + // This should hard error with RequireNamespaceWatchOnInit enabled + require.Error(t, nss.Open()) +} + +func TestNamespacesWatchRulesetHardErr(t *testing.T) { + store, _, _, opts := testNamespaces() + opts = opts.SetRequireNamespaceWatchOnInit(true) + nss := NewNamespaces(testNamespacesKey, opts).(*namespaces) + + proto := &rulepb.Namespaces{ + Namespaces: []*rulepb.Namespace{ + { + Name: "fooNs", + Snapshots: []*rulepb.NamespaceSnapshot{ + { + ForRulesetVersion: 1, + Tombstoned: true, + }, + }, + }, + }, + } + _, err := store.SetIfNotExists(testNamespacesKey, proto) + require.NoError(t, err) + + // This should also hard error with RequireNamespaceWatchOnInit enabled, + // because the underlying ruleset does not exist + require.Error(t, nss.Open()) +} + func TestToNamespacesNilValue(t *testing.T) { _, _, nss, _ := testNamespaces() _, err := nss.toNamespaces(nil) @@ -80,10 +142,10 @@ func TestToNamespacesSuccess(t *testing.T) { store, _, nss, _ := testNamespaces() proto := &rulepb.Namespaces{ Namespaces: []*rulepb.Namespace{ - &rulepb.Namespace{ + { Name: "fooNs", Snapshots: []*rulepb.NamespaceSnapshot{ - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 1, Tombstoned: true, }, @@ -128,55 +190,55 
@@ func TestNamespacesProcess(t *testing.T) { update := &rulepb.Namespaces{ Namespaces: []*rulepb.Namespace{ - &rulepb.Namespace{ + { Name: "fooNs", Snapshots: []*rulepb.NamespaceSnapshot{ - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 1, Tombstoned: false, }, - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 2, Tombstoned: false, }, }, }, - &rulepb.Namespace{ + { Name: "barNs", Snapshots: []*rulepb.NamespaceSnapshot{ - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 1, Tombstoned: false, }, - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 2, Tombstoned: true, }, }, }, - &rulepb.Namespace{ + { Name: "bazNs", Snapshots: []*rulepb.NamespaceSnapshot{ - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 1, Tombstoned: false, }, - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 2, Tombstoned: false, }, }, }, - &rulepb.Namespace{ + { Name: "catNs", Snapshots: []*rulepb.NamespaceSnapshot{ - &rulepb.NamespaceSnapshot{ + { ForRulesetVersion: 3, Tombstoned: true, }, }, }, - &rulepb.Namespace{ + { Name: "mehNs", Snapshots: nil, }, @@ -215,7 +277,7 @@ func TestNamespacesProcess(t *testing.T) { } } -func testNamespaces() (kv.Store, cache.Cache, *namespaces, Options) { +func testNamespaces() (kv.TxnStore, cache.Cache, *namespaces, Options) { store := mem.NewStore() cache := newMemCache() opts := NewOptions(). diff --git a/src/metrics/matcher/options.go b/src/metrics/matcher/options.go index 2c90aea499..208e102cf9 100644 --- a/src/metrics/matcher/options.go +++ b/src/metrics/matcher/options.go @@ -135,22 +135,29 @@ type Options interface { // OnRuleSetUpdatedFn returns the function to be called when a ruleset is updated. OnRuleSetUpdatedFn() OnRuleSetUpdatedFn + + // SetRequireNamespaceWatchOnInit sets the flag to ensure matcher is initialized with a loaded namespace watch. + SetRequireNamespaceWatchOnInit(value bool) Options + + // RequireNamespaceWatchOnInit returns the flag to ensure matcher is initialized with a loaded namespace watch. 
+ RequireNamespaceWatchOnInit() bool } type options struct { - clockOpts clock.Options - instrumentOpts instrument.Options - ruleSetOpts rules.Options - initWatchTimeout time.Duration - kvStore kv.Store - namespacesKey string - ruleSetKeyFn RuleSetKeyFn - namespaceTag []byte - defaultNamespace []byte - matchRangePast time.Duration - onNamespaceAddedFn OnNamespaceAddedFn - onNamespaceRemovedFn OnNamespaceRemovedFn - onRuleSetUpdatedFn OnRuleSetUpdatedFn + clockOpts clock.Options + instrumentOpts instrument.Options + ruleSetOpts rules.Options + initWatchTimeout time.Duration + kvStore kv.Store + namespacesKey string + ruleSetKeyFn RuleSetKeyFn + namespaceTag []byte + defaultNamespace []byte + matchRangePast time.Duration + onNamespaceAddedFn OnNamespaceAddedFn + onNamespaceRemovedFn OnNamespaceRemovedFn + onRuleSetUpdatedFn OnRuleSetUpdatedFn + requireNamespaceWatchOnInit bool } // NewOptions creates a new set of options. @@ -299,6 +306,18 @@ func (o *options) OnRuleSetUpdatedFn() OnRuleSetUpdatedFn { return o.onRuleSetUpdatedFn } +// SetRequireNamespaceWatchOnInit sets the flag to ensure matcher is initialized with a loaded namespace watch. +func (o *options) SetRequireNamespaceWatchOnInit(value bool) Options { + opts := *o + opts.requireNamespaceWatchOnInit = value + return &opts +} + +// RequireNamespaceWatchOnInit returns the flag to ensure matcher is initialized with a loaded namespace watch. 
+func (o *options) RequireNamespaceWatchOnInit() bool { + return o.requireNamespaceWatchOnInit +} + func defaultRuleSetKeyFn(namespace []byte) string { return fmt.Sprintf(defaultRuleSetKeyFormat, namespace) } From 664a90a42d30adcd28db94b729f1f78b36711d03 Mon Sep 17 00:00:00 2001 From: Wesley Kim Date: Wed, 5 May 2021 23:01:10 -0400 Subject: [PATCH 2/2] Set RequireNamespaceWatchOnInit in downsampler config --- .../m3coordinator/downsample/options.go | 6 ++++- src/metrics/matcher/config.go | 22 +++++++++---------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/src/cmd/services/m3coordinator/downsample/options.go b/src/cmd/services/m3coordinator/downsample/options.go index 9db630ac34..fec28c40fe 100644 --- a/src/cmd/services/m3coordinator/downsample/options.go +++ b/src/cmd/services/m3coordinator/downsample/options.go @@ -284,6 +284,9 @@ type MatcherConfiguration struct { // NamespaceTag defines the namespace tag to use to select rules // namespace to evaluate against. Default is "__m3_namespace__". NamespaceTag string `yaml:"namespaceTag"` + // RequireNamespaceWatchOnInit, when true, makes opening the matcher namespaces return the watch error instead of logging it and retrying in the background. + // This only makes sense to use if the corresponding namespace / ruleset values are properly seeded. + RequireNamespaceWatchOnInit bool `yaml:"requireNamespaceWatchOnInit"` } // MatcherCacheConfiguration is the configuration for the rule matcher cache. @@ -710,7 +713,8 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) { SetInstrumentOptions(instrumentOpts). SetRuleSetOptions(ruleSetOpts). SetKVStore(o.RulesKVStore). - SetNamespaceTag([]byte(namespaceTag)) + SetNamespaceTag([]byte(namespaceTag)). + SetRequireNamespaceWatchOnInit(cfg.Matcher.RequireNamespaceWatchOnInit) // NB(r): If rules are being explicitly set in config then we are // going to use an in memory KV store for rules and explicitly set them up. 
diff --git a/src/metrics/matcher/config.go b/src/metrics/matcher/config.go index f0b05278e5..88cba241a9 100644 --- a/src/metrics/matcher/config.go +++ b/src/metrics/matcher/config.go @@ -38,16 +38,15 @@ import ( // Configuration is config used to create a Matcher. type Configuration struct { - InitWatchTimeout time.Duration `yaml:"initWatchTimeout"` - RulesKVConfig kv.OverrideConfiguration `yaml:"rulesKVConfig"` - NamespacesKey string `yaml:"namespacesKey" validate:"nonzero"` - RuleSetKeyFmt string `yaml:"ruleSetKeyFmt" validate:"nonzero"` - NamespaceTag string `yaml:"namespaceTag" validate:"nonzero"` - DefaultNamespace string `yaml:"defaultNamespace" validate:"nonzero"` - NameTagKey string `yaml:"nameTagKey" validate:"nonzero"` - MatchRangePast *time.Duration `yaml:"matchRangePast"` - SortedTagIteratorPool pool.ObjectPoolConfiguration `yaml:"sortedTagIteratorPool"` - RequireNamespaceWatchOnInit bool `yaml:"requireNamespaceWatchOnInit"` + InitWatchTimeout time.Duration `yaml:"initWatchTimeout"` + RulesKVConfig kv.OverrideConfiguration `yaml:"rulesKVConfig"` + NamespacesKey string `yaml:"namespacesKey" validate:"nonzero"` + RuleSetKeyFmt string `yaml:"ruleSetKeyFmt" validate:"nonzero"` + NamespaceTag string `yaml:"namespaceTag" validate:"nonzero"` + DefaultNamespace string `yaml:"defaultNamespace" validate:"nonzero"` + NameTagKey string `yaml:"nameTagKey" validate:"nonzero"` + MatchRangePast *time.Duration `yaml:"matchRangePast"` + SortedTagIteratorPool pool.ObjectPoolConfiguration `yaml:"sortedTagIteratorPool"` } // NewNamespaces creates a matcher.Namespaces. @@ -137,8 +136,7 @@ func (cfg *Configuration) NewOptions( SetNamespacesKey(cfg.NamespacesKey). SetRuleSetKeyFn(ruleSetKeyFn). SetNamespaceTag([]byte(cfg.NamespaceTag)). - SetDefaultNamespace([]byte(cfg.DefaultNamespace)). 
- SetRequireNamespaceWatchOnInit(cfg.RequireNamespaceWatchOnInit) + SetDefaultNamespace([]byte(cfg.DefaultNamespace)) if cfg.InitWatchTimeout != 0 { opts = opts.SetInitWatchTimeout(cfg.InitWatchTimeout)