Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[matcher] [coordinator] Add RequireNamespaceWatchOnInit option #3468

Merged
merged 2 commits into from
May 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/cmd/services/m3coordinator/downsample/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,9 @@ type MatcherConfiguration struct {
// NamespaceTag defines the namespace tag to use to select rules
// namespace to evaluate against. Default is "__m3_namespace__".
NamespaceTag string `yaml:"namespaceTag"`
// RequireNamespaceWatchOnInit, when true, requires the matcher to be initialized with a loaded namespace watch.
// This only makes sense to use if the corresponding namespace / ruleset values are properly seeded.
RequireNamespaceWatchOnInit bool `yaml:"requireNamespaceWatchOnInit"`
}

// MatcherCacheConfiguration is the configuration for the rule matcher cache.
Expand Down Expand Up @@ -710,7 +713,8 @@ func (cfg Configuration) newAggregator(o DownsamplerOptions) (agg, error) {
SetInstrumentOptions(instrumentOpts).
SetRuleSetOptions(ruleSetOpts).
SetKVStore(o.RulesKVStore).
SetNamespaceTag([]byte(namespaceTag))
SetNamespaceTag([]byte(namespaceTag)).
SetRequireNamespaceWatchOnInit(cfg.Matcher.RequireNamespaceWatchOnInit)

// NB(r): If rules are being explicitly set in config then we are
// going to use an in memory KV store for rules and explicitly set them up.
Expand Down
57 changes: 41 additions & 16 deletions src/metrics/matcher/namespaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/m3db/m3/src/metrics/metric"
"github.com/m3db/m3/src/metrics/rules"
"github.com/m3db/m3/src/x/clock"
xerrors "github.com/m3db/m3/src/x/errors"
"github.com/m3db/m3/src/x/watch"

"github.com/uber-go/tally"
Expand Down Expand Up @@ -112,27 +113,29 @@ type namespaces struct {
onNamespaceAddedFn OnNamespaceAddedFn
onNamespaceRemovedFn OnNamespaceRemovedFn

proto *rulepb.Namespaces
rules *namespaceRuleSetsMap
metrics namespacesMetrics
proto *rulepb.Namespaces
rules *namespaceRuleSetsMap
metrics namespacesMetrics
requireNamespaceWatchOnInit bool
}

// NewNamespaces creates a new namespaces object.
func NewNamespaces(key string, opts Options) Namespaces {
instrumentOpts := opts.InstrumentOptions()
n := &namespaces{
key: key,
store: opts.KVStore(),
opts: opts,
nowFn: opts.ClockOptions().NowFn(),
log: instrumentOpts.Logger(),
ruleSetKeyFn: opts.RuleSetKeyFn(),
matchRangePast: opts.MatchRangePast(),
onNamespaceAddedFn: opts.OnNamespaceAddedFn(),
onNamespaceRemovedFn: opts.OnNamespaceRemovedFn(),
proto: &rulepb.Namespaces{},
rules: newNamespaceRuleSetsMap(namespaceRuleSetsMapOptions{}),
metrics: newNamespacesMetrics(instrumentOpts.MetricsScope()),
key: key,
store: opts.KVStore(),
opts: opts,
nowFn: opts.ClockOptions().NowFn(),
log: instrumentOpts.Logger(),
ruleSetKeyFn: opts.RuleSetKeyFn(),
matchRangePast: opts.MatchRangePast(),
onNamespaceAddedFn: opts.OnNamespaceAddedFn(),
onNamespaceRemovedFn: opts.OnNamespaceRemovedFn(),
proto: &rulepb.Namespaces{},
rules: newNamespaceRuleSetsMap(namespaceRuleSetsMapOptions{}),
metrics: newNamespacesMetrics(instrumentOpts.MetricsScope()),
requireNamespaceWatchOnInit: opts.RequireNamespaceWatchOnInit(),
}
valueOpts := runtime.NewOptions().
SetInstrumentOptions(instrumentOpts).
Expand Down Expand Up @@ -160,10 +163,15 @@ func (n *namespaces) Open() error {
// to be more resilient to error conditions preventing process
// from starting up.
n.metrics.initWatchErrors.Inc(1)
if n.requireNamespaceWatchOnInit {
return err
}

n.opts.InstrumentOptions().Logger().With(
zap.String("key", n.key),
zap.Error(err),
).Error("error initializing namespaces values, retrying in the background")

return nil
}

Expand Down Expand Up @@ -255,7 +263,12 @@ func (n *namespaces) process(value interface{}) error {
n.Lock()
defer n.Unlock()

var watchWg sync.WaitGroup
var (
watchWg sync.WaitGroup
multiErr xerrors.MultiError
errLock sync.Mutex
)

for _, entry := range incoming.Iter() {
namespace, elem := entry.Key(), rules.Namespace(entry.Value())
nsName, snapshots := elem.Name(), elem.Snapshots()
Expand Down Expand Up @@ -302,6 +315,13 @@ func (n *namespaces) process(value interface{}) error {
n.log.Error("failed to watch ruleset updates",
zap.String("ruleSetKey", ruleSet.Key()),
zap.Error(err))

// Track errors if we explicitly want to ensure watches succeed.
if n.requireNamespaceWatchOnInit {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One awkward bit here is that this also covers the update case, not just the init case. However, even if we propagate an error in that case, it gets swallowed in the update path regardless.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah makes sense

errLock.Lock()
multiErr = multiErr.Add(err)
errLock.Unlock()
}
}
}()
}
Expand All @@ -312,6 +332,11 @@ func (n *namespaces) process(value interface{}) error {
}

watchWg.Wait()

if !multiErr.Empty() {
return multiErr.FinalError()
}

for _, entry := range n.rules.Iter() {
namespace, ruleSet := entry.Key(), entry.Value()
_, exists := incoming.Get(namespace)
Expand Down
96 changes: 79 additions & 17 deletions src/metrics/matcher/namespaces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ func TestNamespacesWatchAndClose(t *testing.T) {
store, _, nss, _ := testNamespaces()
proto := &rulepb.Namespaces{
Namespaces: []*rulepb.Namespace{
&rulepb.Namespace{
{
Name: "fooNs",
Snapshots: []*rulepb.NamespaceSnapshot{
&rulepb.NamespaceSnapshot{
{
ForRulesetVersion: 1,
Tombstoned: true,
},
Expand All @@ -64,6 +64,68 @@ func TestNamespacesWatchAndClose(t *testing.T) {
nss.Close()
}

// TestNamespacesWatchSoftErr checks that Open reports no error when the
// namespaces key has never been written, using the options that
// testNamespaces provides.
func TestNamespacesWatchSoftErr(t *testing.T) {
	_, _, ns, _ := testNamespaces()

	// The store is empty, so the initial watch cannot load a value; this is
	// expected to be a soft error that Open does not surface.
	err := ns.Open()
	require.NoError(t, err)
}

// TestNamespacesWatchRulesetSoftErr checks that Open reports no error when the
// namespaces value is seeded but no ruleset has been stored for the namespace
// it lists, using the options that testNamespaces provides.
func TestNamespacesWatchRulesetSoftErr(t *testing.T) {
	store, _, ns, _ := testNamespaces()

	// Seed a single namespace entry; no matching ruleset is ever written.
	snapshot := &rulepb.NamespaceSnapshot{
		ForRulesetVersion: 1,
		Tombstoned:        true,
	}
	fooNs := &rulepb.Namespace{
		Name:      "fooNs",
		Snapshots: []*rulepb.NamespaceSnapshot{snapshot},
	}
	seed := &rulepb.Namespaces{
		Namespaces: []*rulepb.Namespace{fooNs},
	}

	_, err := store.SetIfNotExists(testNamespacesKey, seed)
	require.NoError(t, err)

	// The missing ruleset is expected to be tolerated as a soft error too.
	err = ns.Open()
	require.NoError(t, err)
}

// TestNamespacesWatchHardErr checks that Open returns an error when
// RequireNamespaceWatchOnInit is enabled and the namespaces key has never
// been written.
func TestNamespacesWatchHardErr(t *testing.T) {
	_, _, _, baseOpts := testNamespaces()
	strictOpts := baseOpts.SetRequireNamespaceWatchOnInit(true)

	created := NewNamespaces(testNamespacesKey, strictOpts)
	ns := created.(*namespaces)

	// With the flag enabled the failed initial watch must be surfaced.
	err := ns.Open()
	require.Error(t, err)
}

// TestNamespacesWatchRulesetHardErr checks that Open returns an error when
// RequireNamespaceWatchOnInit is enabled and the namespaces value is seeded
// but no ruleset has been stored for the namespace it lists.
func TestNamespacesWatchRulesetHardErr(t *testing.T) {
	store, _, _, baseOpts := testNamespaces()
	strictOpts := baseOpts.SetRequireNamespaceWatchOnInit(true)

	created := NewNamespaces(testNamespacesKey, strictOpts)
	ns := created.(*namespaces)

	// Seed a single namespace entry; no matching ruleset is ever written.
	snapshot := &rulepb.NamespaceSnapshot{
		ForRulesetVersion: 1,
		Tombstoned:        true,
	}
	fooNs := &rulepb.Namespace{
		Name:      "fooNs",
		Snapshots: []*rulepb.NamespaceSnapshot{snapshot},
	}
	seed := &rulepb.Namespaces{
		Namespaces: []*rulepb.Namespace{fooNs},
	}

	_, err := store.SetIfNotExists(testNamespacesKey, seed)
	require.NoError(t, err)

	// With the flag enabled, the missing underlying ruleset must make Open
	// fail rather than being tolerated.
	err = ns.Open()
	require.Error(t, err)
}

func TestToNamespacesNilValue(t *testing.T) {
_, _, nss, _ := testNamespaces()
_, err := nss.toNamespaces(nil)
Expand All @@ -80,10 +142,10 @@ func TestToNamespacesSuccess(t *testing.T) {
store, _, nss, _ := testNamespaces()
proto := &rulepb.Namespaces{
Namespaces: []*rulepb.Namespace{
&rulepb.Namespace{
{
Name: "fooNs",
Snapshots: []*rulepb.NamespaceSnapshot{
&rulepb.NamespaceSnapshot{
{
ForRulesetVersion: 1,
Tombstoned: true,
},
Expand Down Expand Up @@ -128,55 +190,55 @@ func TestNamespacesProcess(t *testing.T) {

update := &rulepb.Namespaces{
Namespaces: []*rulepb.Namespace{
&rulepb.Namespace{
{
Name: "fooNs",
Snapshots: []*rulepb.NamespaceSnapshot{
&rulepb.NamespaceSnapshot{
{
ForRulesetVersion: 1,
Tombstoned: false,
},
&rulepb.NamespaceSnapshot{
{
ForRulesetVersion: 2,
Tombstoned: false,
},
},
},
&rulepb.Namespace{
{
Name: "barNs",
Snapshots: []*rulepb.NamespaceSnapshot{
&rulepb.NamespaceSnapshot{
{
ForRulesetVersion: 1,
Tombstoned: false,
},
&rulepb.NamespaceSnapshot{
{
ForRulesetVersion: 2,
Tombstoned: true,
},
},
},
&rulepb.Namespace{
{
Name: "bazNs",
Snapshots: []*rulepb.NamespaceSnapshot{
&rulepb.NamespaceSnapshot{
{
ForRulesetVersion: 1,
Tombstoned: false,
},
&rulepb.NamespaceSnapshot{
{
ForRulesetVersion: 2,
Tombstoned: false,
},
},
},
&rulepb.Namespace{
{
Name: "catNs",
Snapshots: []*rulepb.NamespaceSnapshot{
&rulepb.NamespaceSnapshot{
{
ForRulesetVersion: 3,
Tombstoned: true,
},
},
},
&rulepb.Namespace{
{
Name: "mehNs",
Snapshots: nil,
},
Expand Down Expand Up @@ -215,7 +277,7 @@ func TestNamespacesProcess(t *testing.T) {
}
}

func testNamespaces() (kv.Store, cache.Cache, *namespaces, Options) {
func testNamespaces() (kv.TxnStore, cache.Cache, *namespaces, Options) {
store := mem.NewStore()
cache := newMemCache()
opts := NewOptions().
Expand Down
45 changes: 32 additions & 13 deletions src/metrics/matcher/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,22 +135,29 @@ type Options interface {

// OnRuleSetUpdatedFn returns the function to be called when a ruleset is updated.
OnRuleSetUpdatedFn() OnRuleSetUpdatedFn

// SetRequireNamespaceWatchOnInit sets the flag to ensure matcher is initialized with a loaded namespace watch.
SetRequireNamespaceWatchOnInit(value bool) Options

// RequireNamespaceWatchOnInit returns the flag to ensure matcher is initialized with a loaded namespace watch.
RequireNamespaceWatchOnInit() bool
}

type options struct {
clockOpts clock.Options
instrumentOpts instrument.Options
ruleSetOpts rules.Options
initWatchTimeout time.Duration
kvStore kv.Store
namespacesKey string
ruleSetKeyFn RuleSetKeyFn
namespaceTag []byte
defaultNamespace []byte
matchRangePast time.Duration
onNamespaceAddedFn OnNamespaceAddedFn
onNamespaceRemovedFn OnNamespaceRemovedFn
onRuleSetUpdatedFn OnRuleSetUpdatedFn
clockOpts clock.Options
instrumentOpts instrument.Options
ruleSetOpts rules.Options
initWatchTimeout time.Duration
kvStore kv.Store
namespacesKey string
ruleSetKeyFn RuleSetKeyFn
namespaceTag []byte
defaultNamespace []byte
matchRangePast time.Duration
onNamespaceAddedFn OnNamespaceAddedFn
onNamespaceRemovedFn OnNamespaceRemovedFn
onRuleSetUpdatedFn OnRuleSetUpdatedFn
requireNamespaceWatchOnInit bool
}

// NewOptions creates a new set of options.
Expand Down Expand Up @@ -299,6 +306,18 @@ func (o *options) OnRuleSetUpdatedFn() OnRuleSetUpdatedFn {
return o.onRuleSetUpdatedFn
}

// SetRequireNamespaceWatchOnInit returns a copy of the options in which the
// flag requiring a loaded namespace watch at matcher initialization is set to
// value. The receiver itself is not modified.
func (o *options) SetRequireNamespaceWatchOnInit(value bool) Options {
	updated := *o
	updated.requireNamespaceWatchOnInit = value
	return &updated
}

// RequireNamespaceWatchOnInit reports whether the matcher must be initialized
// with a loaded namespace watch.
func (o *options) RequireNamespaceWatchOnInit() bool {
	required := o.requireNamespaceWatchOnInit
	return required
}

// defaultRuleSetKeyFn builds the ruleset key for a namespace by formatting the
// namespace into defaultRuleSetKeyFormat.
func defaultRuleSetKeyFn(namespace []byte) string {
	key := fmt.Sprintf(defaultRuleSetKeyFormat, namespace)
	return key
}