From febcf303711968d58e9ac8655e93fcf76eca4ecf Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Fri, 17 Jun 2022 15:00:36 -0400 Subject: [PATCH 01/14] core: server config for plan rejection tracking --- command/agent/agent.go | 11 ++++ command/agent/agent_test.go | 68 +++++++++++++++++++++ command/agent/config.go | 52 ++++++++++++++++ command/agent/config_parse.go | 8 ++- command/agent/config_parse_test.go | 18 ++++++ command/agent/config_test.go | 8 +++ command/agent/testdata/basic.hcl | 5 ++ command/agent/testdata/basic.json | 4 ++ command/agent/testdata/sample0.json | 4 ++ command/agent/testdata/sample1/sample0.json | 4 ++ nomad/config.go | 10 +++ 11 files changed, 190 insertions(+), 2 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index cdfb2135f91..55c5b0af0ea 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -436,6 +436,17 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) { return nil, fmt.Errorf("deploy_query_rate_limit must be greater than 0") } + // Set plan rejection tracker configuration. + if planRejectConf := agentConfig.Server.PlanRejectionTracker; planRejectConf != nil { + conf.NodePlanRejectionThreshold = planRejectConf.NodeThreshold + + if planRejectConf.NodeWindow == 0 { + return nil, fmt.Errorf("plan_rejection_tracker.node_window must be greater than 0") + } else { + conf.NodePlanRejectionWindow = planRejectConf.NodeWindow + } + } + // Add Enterprise license configs conf.LicenseEnv = agentConfig.Server.LicenseEnv conf.LicensePath = agentConfig.Server.LicensePath diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 6bea52f42d4..2611794b52e 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -365,6 +365,74 @@ func TestAgent_ServerConfig_Limits_OK(t *testing.T) { } } +func TestAgent_ServerConfig_PlanRejectionTracker(t *testing.T) { + ci.Parallel(t) + + cases := []struct { + name string + trackerConfig *PlanRejectionTracker + expectedConfig *PlanRejectionTracker + expectedErr string + }{ + { + name: "default", + trackerConfig: nil, + expectedConfig: &PlanRejectionTracker{ + NodeThreshold: 15, + NodeWindow: 10 * time.Minute, + }, + expectedErr: "", + }, + { + name: "valid config", + trackerConfig: &PlanRejectionTracker{ + NodeThreshold: 123, + NodeWindow: 17 * time.Minute, + }, + expectedConfig: &PlanRejectionTracker{ + NodeThreshold: 123, + NodeWindow: 17 * time.Minute, + }, + expectedErr: "", + }, + { + name: "invalid node window", + trackerConfig: &PlanRejectionTracker{ + NodeThreshold: 123, + }, + expectedErr: "node_window must be greater than 0", + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + config := DevConfig(nil) + require.NoError(t, config.normalizeAddrs()) + + if tc.trackerConfig != nil { + config.Server.PlanRejectionTracker = tc.trackerConfig + } + + serverConfig, err := convertServerConfig(config) + + if tc.expectedErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tc.expectedErr) + } else { + require.NoError(t, err) + require.Equal(t, + tc.expectedConfig.NodeThreshold, + serverConfig.NodePlanRejectionThreshold, + ) + require.Equal(t, + tc.expectedConfig.NodeWindow, + serverConfig.NodePlanRejectionWindow, + ) + } + }) + } +} + func TestAgent_ServerConfig_RaftMultiplier_Ok(t *testing.T) { ci.Parallel(t) diff --git a/command/agent/config.go b/command/agent/config.go index 50e5475922e..72f0f111452 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -503,6 +503,10 @@ type ServerConfig 
struct { // This value is ignored. DefaultSchedulerConfig *structs.SchedulerConfiguration `hcl:"default_scheduler_config"` + // PlanRejectionTracker configures the node plan rejection tracker that + // detects potentially bad nodes. + PlanRejectionTracker *PlanRejectionTracker `hcl:"plan_rejection_tracker"` + // EnableEventBroker configures whether this server's state store // will generate events for its event stream. EnableEventBroker *bool `hcl:"enable_event_broker"` @@ -548,6 +552,46 @@ type RaftBoltConfig struct { NoFreelistSync bool `hcl:"no_freelist_sync"` } +// PlanRejectionTracker is used in servers to configure the plan rejection +// tracker. +type PlanRejectionTracker struct { + // NodeThreshold is the number of times a node can have plan rejections + // before it is marked as ineligible. + NodeThreshold int `hcl:"node_threshold"` + + // NodeWindow is the time window used to track active plan rejections for + // nodes. + NodeWindow time.Duration + NodeWindowHCL string `hcl:"node_window" json:"-"` + + // ExtraKeysHCL is used by hcl to surface unexpected keys + ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"` +} + +func (p *PlanRejectionTracker) Merge(b *PlanRejectionTracker) *PlanRejectionTracker { + if p == nil { + return b + } + + result := *p + + if b == nil { + return &result + } + + if b.NodeThreshold != 0 { + result.NodeThreshold = b.NodeThreshold + } + + if b.NodeWindow != 0 { + result.NodeWindow = b.NodeWindow + } + if b.NodeWindowHCL != "" { + result.NodeWindowHCL = b.NodeWindowHCL + } + return &result +} + // Search is used in servers to configure search API options. type Search struct { // FuzzyEnabled toggles whether the FuzzySearch API is enabled. If not @@ -985,6 +1029,10 @@ func DefaultConfig() *Config { EventBufferSize: helper.IntToPtr(100), RaftProtocol: 3, StartJoin: []string{}, + PlanRejectionTracker: &PlanRejectionTracker{ + NodeThreshold: 15, + NodeWindow: 10 * time.Minute, + }, ServerJoin: &ServerJoin{ RetryJoin: []string{}, RetryInterval: 30 * time.Second, @@ -1586,6 +1634,10 @@ func (s *ServerConfig) Merge(b *ServerConfig) *ServerConfig { result.EventBufferSize = b.EventBufferSize } + if b.PlanRejectionTracker != nil { + result.PlanRejectionTracker = result.PlanRejectionTracker.Merge(b.PlanRejectionTracker) + } + if b.DefaultSchedulerConfig != nil { c := *b.DefaultSchedulerConfig result.DefaultSchedulerConfig = &c diff --git a/command/agent/config_parse.go b/command/agent/config_parse.go index 363db9e1e33..d5f5f8847d1 100644 --- a/command/agent/config_parse.go +++ b/command/agent/config_parse.go @@ -43,9 +43,12 @@ func ParseConfigFile(path string) (*Config, error) { VaultRetry: &client.RetryConfig{}, }, }, + Server: &ServerConfig{ + PlanRejectionTracker: &PlanRejectionTracker{}, + ServerJoin: &ServerJoin{}, + }, ACL: &ACLConfig{}, Audit: &config.AuditConfig{}, - Server: &ServerConfig{ServerJoin: &ServerJoin{}}, Consul: &config.ConsulConfig{}, Autopilot: &config.AutopilotConfig{}, Telemetry: &Telemetry{}, @@ -54,7 +57,7 @@ func ParseConfigFile(path string) (*Config, error) { err = hcl.Decode(c, buf.String()) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to decode HCL file %s: %v", path, err) } // convert strings to time.Durations @@ -66,6 +69,7 @@ func ParseConfigFile(path string) (*Config, error) { {"server.heartbeat_grace", &c.Server.HeartbeatGrace, &c.Server.HeartbeatGraceHCL, nil}, {"server.min_heartbeat_ttl", &c.Server.MinHeartbeatTTL, &c.Server.MinHeartbeatTTLHCL, nil}, {"server.failover_heartbeat_ttl", 
&c.Server.FailoverHeartbeatTTL, &c.Server.FailoverHeartbeatTTLHCL, nil}, + {"server.plan_rejection_tracker.node_window", &c.Server.PlanRejectionTracker.NodeWindow, &c.Server.PlanRejectionTracker.NodeWindowHCL, nil}, {"server.retry_interval", &c.Server.RetryInterval, &c.Server.RetryIntervalHCL, nil}, {"server.server_join.retry_interval", &c.Server.ServerJoin.RetryInterval, &c.Server.ServerJoin.RetryIntervalHCL, nil}, {"consul.timeout", &c.Consul.Timeout, &c.Consul.TimeoutHCL, nil}, diff --git a/command/agent/config_parse_test.go b/command/agent/config_parse_test.go index d2567c7fce2..8a280195b03 100644 --- a/command/agent/config_parse_test.go +++ b/command/agent/config_parse_test.go @@ -126,6 +126,11 @@ var basicConfig = &Config{ EncryptKey: "abc", EnableEventBroker: helper.BoolToPtr(false), EventBufferSize: helper.IntToPtr(200), + PlanRejectionTracker: &PlanRejectionTracker{ + NodeThreshold: 100, + NodeWindow: 41 * time.Minute, + NodeWindowHCL: "41m", + }, ServerJoin: &ServerJoin{ RetryJoin: []string{"1.1.1.1", "2.2.2.2"}, RetryInterval: time.Duration(15) * time.Second, @@ -540,6 +545,9 @@ func (c *Config) addDefaults() { if c.Server.ServerJoin == nil { c.Server.ServerJoin = &ServerJoin{} } + if c.Server.PlanRejectionTracker == nil { + c.Server.PlanRejectionTracker = &PlanRejectionTracker{} + } } // Tests for a panic parsing json with an object of exactly @@ -617,6 +625,11 @@ var sample0 = &Config{ RetryJoin: []string{"10.0.0.101", "10.0.0.102", "10.0.0.103"}, EncryptKey: "sHck3WL6cxuhuY7Mso9BHA==", ServerJoin: &ServerJoin{}, + PlanRejectionTracker: &PlanRejectionTracker{ + NodeThreshold: 100, + NodeWindow: 31 * time.Minute, + NodeWindowHCL: "31m", + }, }, ACL: &ACLConfig{ Enabled: true, @@ -707,6 +720,11 @@ var sample1 = &Config{ RetryJoin: []string{"10.0.0.101", "10.0.0.102", "10.0.0.103"}, EncryptKey: "sHck3WL6cxuhuY7Mso9BHA==", ServerJoin: &ServerJoin{}, + PlanRejectionTracker: &PlanRejectionTracker{ + NodeThreshold: 100, + NodeWindow: 31 * time.Minute, + NodeWindowHCL: "31m", + }, }, ACL: &ACLConfig{ Enabled: true, diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 36e263a3134..88eb4fd13f8 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -148,6 +148,10 @@ func TestConfig_Merge(t *testing.T) { UpgradeVersion: "foo", EnableEventBroker: helper.BoolToPtr(false), EventBufferSize: helper.IntToPtr(0), + PlanRejectionTracker: &PlanRejectionTracker{ + NodeThreshold: 100, + NodeWindow: 11 * time.Minute, + }, }, ACL: &ACLConfig{ Enabled: true, @@ -343,6 +347,10 @@ func TestConfig_Merge(t *testing.T) { UpgradeVersion: "bar", EnableEventBroker: helper.BoolToPtr(true), EventBufferSize: helper.IntToPtr(100), + PlanRejectionTracker: &PlanRejectionTracker{ + NodeThreshold: 100, + NodeWindow: 11 * time.Minute, + }, }, ACL: &ACLConfig{ Enabled: true, diff --git a/command/agent/testdata/basic.hcl b/command/agent/testdata/basic.hcl index 27754b95757..a49872d0f11 100644 --- a/command/agent/testdata/basic.hcl +++ b/command/agent/testdata/basic.hcl @@ -133,6 +133,11 @@ server { enable_event_broker = false event_buffer_size = 200 + plan_rejection_tracker { + node_threshold = 100 + node_window = "41m" + } + server_join { retry_join = ["1.1.1.1", "2.2.2.2"] retry_max = 3 diff --git a/command/agent/testdata/basic.json b/command/agent/testdata/basic.json index 406e314a960..4a767899d97 100644 --- a/command/agent/testdata/basic.json +++ b/command/agent/testdata/basic.json @@ -277,6 +277,10 @@ "node_gc_threshold": "12h", "non_voting_server": true, 
"num_schedulers": 2, + "plan_rejection_tracker": { + "node_threshold": 100, + "node_window": "41m" + }, "raft_protocol": 3, "raft_multiplier": 4, "redundancy_zone": "foo", diff --git a/command/agent/testdata/sample0.json b/command/agent/testdata/sample0.json index 1c450a32f20..a7cc60f4930 100644 --- a/command/agent/testdata/sample0.json +++ b/command/agent/testdata/sample0.json @@ -55,6 +55,10 @@ "bootstrap_expect": 3, "enabled": true, "encrypt": "sHck3WL6cxuhuY7Mso9BHA==", + "plan_rejection_tracker": { + "node_threshold": 100, + "node_window": "31m" + }, "retry_join": [ "10.0.0.101", "10.0.0.102", diff --git a/command/agent/testdata/sample1/sample0.json b/command/agent/testdata/sample1/sample0.json index 07796e913d9..a806ea9098a 100644 --- a/command/agent/testdata/sample1/sample0.json +++ b/command/agent/testdata/sample1/sample0.json @@ -20,6 +20,10 @@ "bootstrap_expect": 3, "enabled": true, "encrypt": "sHck3WL6cxuhuY7Mso9BHA==", + "plan_rejection_tracker": { + "node_threshold": 100, + "node_window": "31m" + }, "retry_join": [ "10.0.0.101", "10.0.0.102", diff --git a/nomad/config.go b/nomad/config.go index 6ea8cfd090d..40020532950 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -232,6 +232,14 @@ type Config struct { // additional delay is selected from this range randomly. EvalFailedFollowupDelayRange time.Duration + // NodePlanRejectionThreshold is the number of times a node must have a + // plan rejection before it is set as ineligible. + NodePlanRejectionThreshold int + + // NodePlanRejectionWindow is the time window used to track plan + // rejections for nodes. + NodePlanRejectionWindow time.Duration + // MinHeartbeatTTL is the minimum time between heartbeats. // This is used as a floor to prevent excessive updates. MinHeartbeatTTL time.Duration @@ -395,6 +403,8 @@ func DefaultConfig() *Config { MaxHeartbeatsPerSecond: 50.0, HeartbeatGrace: 10 * time.Second, FailoverHeartbeatTTL: 300 * time.Second, + NodePlanRejectionThreshold: 15, + NodePlanRejectionWindow: 10 * time.Minute, ConsulConfig: config.DefaultConsulConfig(), VaultConfig: config.DefaultVaultConfig(), RPCHoldTimeout: 5 * time.Second, From e343ca68f8d6fe1e0078ddca2bcebba0089ef429 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Fri, 17 Jun 2022 15:01:48 -0400 Subject: [PATCH 02/14] core: track and act on node plan rejections Plan rejections occur when the scheduler work and the leader plan applier disagree on the feasibility of a plan. This may happen for valid reasons: since Nomad does parallel scheduling, it is expected that different workers will have a different state when computing placements. As the final plan reaches the leader plan applier, it may no longer be valid due to a concurrent scheduling taking up intended resources. In these situations the plan applier will notify the worker that the plan was rejected and that they should refresh their state before trying again. In some rare and unexpected circumstances it has been observed that workers will repeatedly submit the same plan, even if they are always rejected. While the root cause is still unknown this mitigation has been put in place. The plan applier will now track the history of plan rejections per client and include in the plan result a list of node IDs that should be set as ineligible if the number of rejections in a given time window crosses a certain threshold. The window size and threshold value can be adjusted in the server configuration. 
--- nomad/plan_apply.go | 43 +++++++- nomad/plan_apply_node_tracker.go | 141 ++++++++++++++++++++++++++ nomad/plan_apply_node_tracker_test.go | 123 ++++++++++++++++++++++ nomad/server.go | 3 + nomad/state/state_store.go | 36 ++++++- nomad/structs/structs.go | 21 +++- 6 files changed, 359 insertions(+), 8 deletions(-) create mode 100644 nomad/plan_apply_node_tracker.go create mode 100644 nomad/plan_apply_node_tracker_test.go diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index 7f1fffc54f8..27ed1dc33f8 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -25,20 +25,38 @@ type planner struct { // planQueue is used to manage the submitted allocation // plans that are waiting to be assessed by the leader planQueue *PlanQueue + + // badNodeTracker keeps a score for nodes that have plan rejections. + // Plan rejections are somewhat expected given Nomad's optimistic + // scheduling, but repeated rejections for the same node may indicate an + // undetected issue, so we need to track rejection history. + badNodeTracker *BadNodeTracker } // newPlanner returns a new planner to be used for managing allocation plans. func newPlanner(s *Server) (*planner, error) { + log := s.logger.Named("planner") + // Create a plan queue planQueue, err := NewPlanQueue() if err != nil { return nil, err } + // Create the bad node tracker. + size := 50 + badNodeTracker, err := NewBadNodeTracker(log, size, + s.config.NodePlanRejectionWindow, + s.config.NodePlanRejectionThreshold) + if err != nil { + return nil, err + } + return &planner{ - Server: s, - log: s.logger.Named("planner"), - planQueue: planQueue, + Server: s, + log: log, + planQueue: planQueue, + badNodeTracker: badNodeTracker, }, nil } @@ -144,6 +162,14 @@ func (p *planner) planApply() { continue } + // Check if any of the rejected nodes should be made ineligible. + for _, nodeID := range result.RejectedNodes { + p.badNodeTracker.Add(nodeID) + if p.badNodeTracker.IsBad(nodeID) { + result.IneligibleNodes = append(result.IneligibleNodes, nodeID) + } + } + // Fast-path the response if there is nothing to do if result.IsNoOp() { pending.respond(result, nil) @@ -209,6 +235,7 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap }, Deployment: result.Deployment, DeploymentUpdates: result.DeploymentUpdates, + IneligibleNodes: result.IneligibleNodes, EvalID: plan.EvalID, } @@ -466,6 +493,7 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan // errors since we are processing in parallel. var mErr multierror.Error partialCommit := false + rejectedNodes := make(map[string]struct{}, 0) // handleResult is used to process the result of evaluateNodePlan handleResult := func(nodeID string, fit bool, reason string, err error) (cancel bool) { @@ -489,8 +517,11 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan "node_id", nodeID, "reason", reason, "eval_id", plan.EvalID, "namespace", plan.Job.Namespace) } - // Set that this is a partial commit + // Set that this is a partial commit and store the node that was + // rejected so the plan applier can detect repeated plan rejections + // for the same node. partialCommit = true + rejectedNodes[nodeID] = struct{}{} // If we require all-at-once scheduling, there is no point // to continue the evaluation, as we've already failed. 
@@ -595,6 +626,10 @@ OUTER: // placed but wasn't actually placed correctDeploymentCanaries(result) } + + for n := range rejectedNodes { + result.RejectedNodes = append(result.RejectedNodes, n) + } return result, mErr.ErrorOrNil() } diff --git a/nomad/plan_apply_node_tracker.go b/nomad/plan_apply_node_tracker.go new file mode 100644 index 00000000000..8877c2a01f3 --- /dev/null +++ b/nomad/plan_apply_node_tracker.go @@ -0,0 +1,141 @@ +package nomad + +import ( + "fmt" + "time" + + metrics "github.com/armon/go-metrics" + "github.com/hashicorp/go-hclog" + lru "github.com/hashicorp/golang-lru" + "github.com/hashicorp/nomad/helper" +) + +// BadNodeTracker keeps a record of nodes marked as bad by the plan applier. +// +// It takes a time window and a threshold value. Plan rejections for a node +// will be registered with its timestamp. If the number of rejections within +// the time window is greater than the threshold the node is reported as bad. +// +// The tracker uses a fixed size cache that evicts old entries based on access +// frequency and recency. +type BadNodeTracker struct { + logger hclog.Logger + cache *lru.TwoQueueCache + window time.Duration + threshold int +} + +// NewBadNodeTracker returns a new BadNodeTracker. +func NewBadNodeTracker(logger hclog.Logger, size int, window time.Duration, threshold int) (*BadNodeTracker, error) { + cache, err := lru.New2Q(size) + if err != nil { + return nil, fmt.Errorf("failed to create new bad node tracker: %v", err) + } + + return &BadNodeTracker{ + logger: logger.Named("bad_node_tracker"). + With("threshold", threshold). + With("window", window), + cache: cache, + window: window, + threshold: threshold, + }, nil +} + +// IsBad returns true if the node has more rejections than the threshold within +// the time window. +func (t *BadNodeTracker) IsBad(nodeID string) bool { + value, ok := t.cache.Get(nodeID) + if !ok { + return false + } + + stats := value.(*badNodeStats) + score := stats.score() + + t.logger.Debug("checking if node is bad", "node_id", nodeID, "score", score) + return score > t.threshold +} + +// Add records a new rejection for node. If it's the first time a node is added +// it will be included in the internal cache. If the cache is full the least +// recently updated or accessed node is evicted. +func (t *BadNodeTracker) Add(nodeID string) { + value, ok := t.cache.Get(nodeID) + if !ok { + value = newBadNodeStats(t.window) + t.cache.Add(nodeID, value) + } + + stats := value.(*badNodeStats) + score := stats.record() + t.logger.Debug("adding node plan rejection", "node_id", nodeID, "score", score) +} + +// EmitStats generates metrics for the bad nodes being currently tracked. Must +// be called in a goroutine. +func (t *BadNodeTracker) EmitStats(period time.Duration, stopCh <-chan struct{}) { + timer, stop := helper.NewSafeTimer(period) + defer stop() + + for { + timer.Reset(period) + + select { + case <-timer.C: + t.emitStats() + case <-stopCh: + return + } + } +} + +func (t *BadNodeTracker) emitStats() { + for _, k := range t.cache.Keys() { + value, _ := t.cache.Get(k) + stats := value.(*badNodeStats) + score := stats.score() + + labels := []metrics.Label{ + {Name: "node_id", Value: k.(string)}, + } + metrics.SetGaugeWithLabels([]string{"nomad", "plan", "rejection_tracker", "node_score"}, float32(score), labels) + } +} + +// badNodeStats represents a node being tracked by BadNodeTracker. +type badNodeStats struct { + history []time.Time + window time.Duration +} + +// newBadNodeStats returns an empty badNodeStats. 
+func newBadNodeStats(window time.Duration) *badNodeStats { + return &badNodeStats{ + window: window, + } +} + +// score returns the number of rejections within the past time window. +func (s *badNodeStats) score() int { + count := 0 + windowStart := time.Now().Add(-s.window) + + for i := len(s.history) - 1; i >= 0; i-- { + ts := s.history[i] + if ts.Before(windowStart) { + // Since we start from the end of the history list, anything past + // this point will have happened before the time window. + break + } + count += 1 + } + return count +} + +// record adds a new entry to the stats history and returns the new score. +func (s *badNodeStats) record() int { + now := time.Now() + s.history = append(s.history, now) + return s.score() +} diff --git a/nomad/plan_apply_node_tracker_test.go b/nomad/plan_apply_node_tracker_test.go new file mode 100644 index 00000000000..2e1202a4374 --- /dev/null +++ b/nomad/plan_apply_node_tracker_test.go @@ -0,0 +1,123 @@ +package nomad + +import ( + "fmt" + "testing" + "time" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/require" +) + +func TestBadNodeTracker(t *testing.T) { + ci.Parallel(t) + + cacheSize := 3 + tracker, err := NewBadNodeTracker( + hclog.NewNullLogger(), cacheSize, time.Second, 10) + require.NoError(t, err) + + for i := 0; i < 10; i++ { + tracker.Add(fmt.Sprintf("node-%d", i+1)) + } + + require.Equal(t, cacheSize, tracker.cache.Len()) + + // Only track the most recent values. + expected := []string{"node-8", "node-9", "node-10"} + require.ElementsMatch(t, expected, tracker.cache.Keys()) +} + +func TestBadNodeTracker_IsBad(t *testing.T) { + ci.Parallel(t) + + window := time.Duration(testutil.TestMultiplier()) * time.Second + tracker, err := NewBadNodeTracker(hclog.NewNullLogger(), 3, window, 4) + require.NoError(t, err) + + // Populate cache. + tracker.Add("node-1") + + tracker.Add("node-2") + tracker.Add("node-2") + + tracker.Add("node-3") + tracker.Add("node-3") + tracker.Add("node-3") + tracker.Add("node-3") + tracker.Add("node-3") + tracker.Add("node-3") + + testCases := []struct { + name string + nodeID string + bad bool + }{ + { + name: "node-1 is not bad", + nodeID: "node-1", + bad: false, + }, + { + name: "node-3 is bad", + nodeID: "node-3", + bad: true, + }, + { + name: "node not tracked is not bad", + nodeID: "node-1000", + bad: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + got := tracker.IsBad(tc.nodeID) + require.Equal(t, tc.bad, got) + }) + } + + t.Run("cache expires", func(t *testing.T) { + time.Sleep(window) + require.False(t, tracker.IsBad("node-1")) + require.False(t, tracker.IsBad("node-2")) + require.False(t, tracker.IsBad("node-3")) + }) + + t.Run("IsBad updates cache", func(t *testing.T) { + // Don't access node-3 so it should be evicted when a new value is + // added and the tracker size overflows. 
+ tracker.IsBad("node-1") + tracker.IsBad("node-2") + tracker.Add("node-4") + + expected := []string{"node-1", "node-2", "node-4"} + require.ElementsMatch(t, expected, tracker.cache.Keys()) + }) +} + +func TestBadNodeStats_score(t *testing.T) { + ci.Parallel(t) + + window := time.Duration(testutil.TestMultiplier()) * time.Second + stats := newBadNodeStats(window) + + require.Equal(t, 0, stats.score()) + + stats.record() + stats.record() + stats.record() + require.Equal(t, 3, stats.score()) + + time.Sleep(window / 2) + stats.record() + require.Equal(t, 4, stats.score()) + + time.Sleep(window / 2) + require.Equal(t, 1, stats.score()) + + time.Sleep(window / 2) + require.Equal(t, 0, stats.score()) +} diff --git a/nomad/server.go b/nomad/server.go index 9fb64d0a50f..8dee94967eb 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -446,6 +446,9 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntr // Emit metrics for the plan queue go s.planQueue.EmitStats(time.Second, s.shutdownCh) + // Emit metrics for the planner's bad node tracker. + go s.planner.badNodeTracker.EmitStats(time.Second, s.shutdownCh) + // Emit metrics for the blocked eval tracker. go s.blockedEvals.EmitStats(time.Second, s.shutdownCh) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 6c5fabdda05..910e2d695d9 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -34,6 +34,17 @@ const ( ) const ( + // NodeEligibilityEventPlanRejectThreshold is the message used when the node + // is set to ineligible due to multiple plan failures. + // This is a preventive measure to signal scheduler workers to not consider + // the node for future placements. + // Plan rejections for a node are expected due to the optimistic and + // concurrent nature of the scheduling process, but repeated failures for + // the same node may indicate an underlying issue not detected by Nomad. + // The plan applier keeps track of plan rejection history and will mark + // nodes as ineligible if they cross a given threshold. + NodeEligibilityEventPlanRejectThreshold = "Node marked as ineligible for scheduling due to multiple plan rejections" + // NodeRegisterEventRegistered is the message used when the node becomes // registered. NodeRegisterEventRegistered = "Node registered" @@ -359,6 +370,22 @@ func (s *StateStore) UpsertPlanResults(msgType structs.MessageType, index uint64 txn := s.db.WriteTxnMsgT(msgType, index) defer txn.Abort() + // Mark nodes as ineligible. + now := time.Now().Unix() + for _, nodeID := range results.IneligibleNodes { + s.logger.Warn("marking node as ineligible due to multiple plan rejections, refer to https://www.nomadproject.io/s/port-plan-failure for more information", "node_id", nodeID) + + nodeEvent := structs.NewNodeEvent(). + SetSubsystem(structs.NodeEventSubsystemScheduler). 
+ SetMessage(NodeEligibilityEventPlanRejectThreshold) + + err := s.updateNodeEligibilityImpl(index, nodeID, + structs.NodeSchedulingIneligible, now, nodeEvent, txn) + if err != nil { + return err + } + } + // Upsert the newly created or updated deployment if results.Deployment != nil { if err := s.upsertDeploymentImpl(index, results.Deployment, txn); err != nil { @@ -1136,10 +1163,15 @@ func (s *StateStore) updateNodeDrainImpl(txn *txn, index uint64, nodeID string, // UpdateNodeEligibility is used to update the scheduling eligibility of a node func (s *StateStore) UpdateNodeEligibility(msgType structs.MessageType, index uint64, nodeID string, eligibility string, updatedAt int64, event *structs.NodeEvent) error { - txn := s.db.WriteTxnMsgT(msgType, index) defer txn.Abort() + if err := s.updateNodeEligibilityImpl(index, nodeID, eligibility, updatedAt, event, txn); err != nil { + return err + } + return txn.Commit() +} +func (s *StateStore) updateNodeEligibilityImpl(index uint64, nodeID string, eligibility string, updatedAt int64, event *structs.NodeEvent, txn *txn) error { // Lookup the node existing, err := txn.First("nodes", "id", nodeID) if err != nil { @@ -1176,7 +1208,7 @@ func (s *StateStore) UpdateNodeEligibility(msgType structs.MessageType, index ui return fmt.Errorf("index update failed: %v", err) } - return txn.Commit() + return nil } // UpsertNodeEvents adds the node events to the nodes, rotating events as diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index b376ebfaa14..179b54a9eed 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -913,6 +913,11 @@ type ApplyPlanResultsRequest struct { // PreemptionEvals is a slice of follow up evals for jobs whose allocations // have been preempted to place allocs in this plan PreemptionEvals []*Evaluation + + // IneligibleNodes are nodes the plan applier has repeatedly rejected + // placements for and should therefore be considered ineligible by workers + // to avoid retrying them repeatedly. + IneligibleNodes []string } // AllocUpdateRequest is used to submit changes to allocations, either @@ -1632,6 +1637,7 @@ const ( NodeEventSubsystemDriver = "Driver" NodeEventSubsystemHeartbeat = "Heartbeat" NodeEventSubsystemCluster = "Cluster" + NodeEventSubsystemScheduler = "Scheduler" NodeEventSubsystemStorage = "Storage" ) @@ -11390,6 +11396,16 @@ type PlanResult struct { // as stopped. NodePreemptions map[string][]*Allocation + // RejectedNodes are nodes the scheduler worker has rejected placements for + // and should be considered for ineligibility by the plan applier to avoid + // retrying them repeatedly. + RejectedNodes []string + + // IneligibleNodes are nodes the plan applier has repeatedly rejected + // placements for and should therefore be considered ineligible by workers + // to avoid retrying them repeatedly. + IneligibleNodes []string + // RefreshIndex is the index the worker should refresh state up to. // This allows all evictions and allocations to be materialized. 
// If any allocations were rejected due to stale data (node state, @@ -11403,8 +11419,9 @@ type PlanResult struct { // IsNoOp checks if this plan result would do nothing func (p *PlanResult) IsNoOp() bool { - return len(p.NodeUpdate) == 0 && len(p.NodeAllocation) == 0 && - len(p.DeploymentUpdates) == 0 && p.Deployment == nil + return len(p.IneligibleNodes) == 0 && len(p.NodeUpdate) == 0 && + len(p.NodeAllocation) == 0 && len(p.DeploymentUpdates) == 0 && + p.Deployment == nil } // FullCommit is used to check if all the allocations in a plan From 5beb6a10f6a04c3f824648f6ec4c51b1ccee27d3 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Sat, 18 Jun 2022 00:36:58 -0400 Subject: [PATCH 03/14] docs: document plan rejection tracking --- website/content/docs/configuration/server.mdx | 27 +++++++++++++++++++ .../docs/operations/metrics-reference.mdx | 3 +-- .../docs/operations/monitoring-nomad.mdx | 22 ++++++++++++++- 3 files changed, 49 insertions(+), 3 deletions(-) diff --git a/website/content/docs/configuration/server.mdx b/website/content/docs/configuration/server.mdx index 7872913a064..f9e107b17b0 100644 --- a/website/content/docs/configuration/server.mdx +++ b/website/content/docs/configuration/server.mdx @@ -156,6 +156,10 @@ server { disallow this server from making any scheduling decisions. This defaults to the number of CPU cores. +- `plan_rejection_tracker` ([PlanRejectionTracker](#plan_rejection_tracker-parameters)) - + Configuration for the plan rejection tracker that the Nomad leader uses to + track the history of plan rejections. + - `raft_boltdb` - This is a nested object that allows configuring options for Raft's BoltDB based log store. - `no_freelist_sync` - Setting this to `true` will disable syncing the BoltDB @@ -238,6 +242,28 @@ server { section for more information on the format of the string. This field is deprecated in favor of the [server_join stanza][server-join]. +### `plan_rejection_tracker` Parameters + +The leader plan rejection tracker can be adjusted to prevent evaluations from +getting stuck due to always being scheduled to a client that may have an +unexpected and undetected issues. Refer to [Monitoring +Nomad][monitoring_nomad_progress] for more details. + +- `node_threshold` `(int: 15)` - The number of plan rejections for a node + within the `node_window` to trigger a client to be set as ineligible. + +- `node_window` `(int: "10m")` - The time window for when plan rejections for a + node should be considered. + +If you observe too many false positives (clients being marked as ineligible +even if they don't present any problem) you may want to increase +`node_threshold`. + +Or if you are noticing jobs not being scheduled due to plan rejections for the +same `node_id` and the client is not being set as ineligible you can try +increasing the `node_window` so more historical rejections are taken into +account. 
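+
+For example, a minimal sketch of a configuration that tolerates more
+rejections over a longer window than the defaults (the values below are
+illustrative only, not recommendations):
+
+```hcl
+server {
+  enabled = true
+
+  plan_rejection_tracker {
+    node_threshold = 30
+    node_window    = "15m"
+  }
+}
+```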
+ ## `server` Examples ### Common Setup @@ -331,5 +357,6 @@ server { [update-scheduler-config]: /api-docs/operator/scheduler#update-scheduler-configuration 'Scheduler Config' [bootstrapping a cluster]: /docs/faq#bootstrapping [rfc4648]: https://tools.ietf.org/html/rfc4648#section-5 +[monitoring_nomad_progress]: /docs/operations/monitoring-nomad#progress [`nomad operator keygen`]: /docs/commands/operator/keygen [search]: /docs/configuration/search diff --git a/website/content/docs/operations/metrics-reference.mdx b/website/content/docs/operations/metrics-reference.mdx index edc1286a857..8c29e159215 100644 --- a/website/content/docs/operations/metrics-reference.mdx +++ b/website/content/docs/operations/metrics-reference.mdx @@ -394,6 +394,7 @@ those listed in [Key Metrics](#key-metrics) above. | `nomad.nomad.plan.apply` | Time elapsed to apply a plan | Nanoseconds | Summary | host | | `nomad.nomad.plan.evaluate` | Time elapsed to evaluate a plan | Nanoseconds | Summary | host | | `nomad.nomad.plan.node_rejected` | Number of times a node has had a plan rejected | Integer | Counter | host, node_id | +| `nomad.nomad.plan.rejection_tracker.node_score` | Number of times a node has had a plan rejected within the tracker window | Integer | Gauge | host, node_id | | `nomad.nomad.plan.queue_depth` | Count of evals in the plan queue | Integer | Gauge | host | | `nomad.nomad.plan.submit` | Time elapsed for `Plan.Submit` RPC call | Nanoseconds | Summary | host | | `nomad.nomad.plan.wait_for_index` | Time elapsed that planner waits for the raft index of the plan to be processed | Nanoseconds | Summary | host | @@ -481,5 +482,3 @@ Raft database metrics are emitted by the `raft-boltdb` library. [tagged-metrics]: /docs/telemetry/metrics#tagged-metrics [s_port_plan_failure]: /s/port-plan-failure - - diff --git a/website/content/docs/operations/monitoring-nomad.mdx b/website/content/docs/operations/monitoring-nomad.mdx index 0dccbbefd49..dc053d7defb 100644 --- a/website/content/docs/operations/monitoring-nomad.mdx +++ b/website/content/docs/operations/monitoring-nomad.mdx @@ -149,10 +149,29 @@ While it is possible for these log lines to occur infrequently due to normal cluster conditions, they should not appear repeatedly and prevent the job from eventually running (look up the evaluation ID logged to find the job). -If this log *does* appear repeatedly with the same `node_id` referenced, try +Nomad tracks the history of plan rejections per client and will mark it as +ineligible for scheduling if the number of rejections goes above a given +threshold within a time window. When this happens, the following node event is +registered: + +``` +Node marked as ineligible for scheduling due to multiple plan rejections +``` + +Along with the log line: + +``` +[WARN] nomad.state_store: marking node as ineligible due to multiple plan rejections: node_id=67af2541-5e96-6f54-9095-11089d627626 +``` + +If a client is marked as ineligible due to repeated plan rejections, try [draining] the node and shutting it down. Misconfigurations not caught by validation can cause nodes to enter this state: [#11830][gh-11830]. +If the `plan for node rejected` log *does* appear repeatedly with the same +`node_id` referenced but the client is not being set as ineligible you can try +adjusting the [`plan_rejection_tracker`] configuration of servers. + ### Performance The following metrics allow observing changes in throughput at the various @@ -278,6 +297,7 @@ latency and packet loss for the [Serf] address. 
[metric-types]: /docs/telemetry/metrics#metric-types
 [metrics-api-endpoint]: /api-docs/metrics
 [prometheus-telem]: /docs/configuration/telemetry#prometheus
+[`plan_rejection_tracker`]: /docs/configuration/server#plan_rejection_tracker
 [serf]: /docs/configuration#serf-1
 [statsd-exporter]: https://github.com/prometheus/statsd_exporter
 [statsd-telem]: /docs/configuration/telemetry#statsd
From 2db5edc84b773cb6e562c3750825f1505414b6cf Mon Sep 17 00:00:00 2001
From: Luiz Aoqui
Date: Sat, 18 Jun 2022 01:01:45 -0400
Subject: [PATCH 04/14] changelog: add entry for #13421

---
 .changelog/13421.txt | 7 +++++++
 1 file changed, 7 insertions(+)
 create mode 100644 .changelog/13421.txt

diff --git a/.changelog/13421.txt b/.changelog/13421.txt
new file mode 100644
index 00000000000..d37bb2d58a2
--- /dev/null
+++ b/.changelog/13421.txt
@@ -0,0 +1,7 @@
+```release-note:improvement
+core: automatically mark clients with recurring plan rejections as ineligible
+```
+
+```release-note:improvement
+metrics: emit `nomad.nomad.plan.rejection_tracker.node_score` metric for the number of times a node had a plan rejection within the past time window
+```
From ff15de9acca8b80047a45e5dfcb7e495e1fcf794 Mon Sep 17 00:00:00 2001
From: Luiz Aoqui
Date: Wed, 6 Jul 2022 22:28:04 -0400
Subject: [PATCH 05/14] apply code review suggestions

---
 command/agent/config_parse.go                 | 2 +-
 website/content/docs/configuration/server.mdx | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/command/agent/config_parse.go b/command/agent/config_parse.go
index d5f5f8847d1..04135f78324 100644
--- a/command/agent/config_parse.go
+++ b/command/agent/config_parse.go
@@ -57,7 +57,7 @@ func ParseConfigFile(path string) (*Config, error) {
 
 	err = hcl.Decode(c, buf.String())
 	if err != nil {
-		return nil, fmt.Errorf("failed to decode HCL file %s: %v", path, err)
+		return nil, fmt.Errorf("failed to decode HCL file %s: %w", path, err)
 	}
 
 	// convert strings to time.Durations
diff --git a/website/content/docs/configuration/server.mdx b/website/content/docs/configuration/server.mdx
index f9e107b17b0..35ae39dcaa5 100644
--- a/website/content/docs/configuration/server.mdx
+++ b/website/content/docs/configuration/server.mdx
@@ -246,8 +246,8 @@
 
 The leader plan rejection tracker can be adjusted to prevent evaluations from
 getting stuck due to always being scheduled to a client that may have an
-unexpected and undetected issues. Refer to [Monitoring
-Nomad][monitoring_nomad_progress] for more details.
+unexpected issue. Refer to [Monitoring Nomad][monitoring_nomad_progress] for
+more details.
 
 - `node_threshold` `(int: 15)` - The number of plan rejections for a node
   within the `node_window` to trigger a client to be set as ineligible.
From 67f42b873c8a34d77243ec7fca40977f798ea807 Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Wed, 6 Jul 2022 22:30:14 -0400 Subject: [PATCH 06/14] allow enabling the node rejection tracker and limit the rate nodes are marked ineligible --- command/agent/agent.go | 1 + command/agent/agent_test.go | 6 ++ command/agent/config.go | 11 +- command/agent/config_parse_test.go | 1 + command/agent/config_test.go | 2 + command/agent/testdata/basic.hcl | 1 + command/agent/testdata/basic.json | 1 + nomad/config.go | 4 + nomad/plan_apply.go | 21 ++-- nomad/plan_apply_node_tracker.go | 102 +++++++++++++----- nomad/plan_apply_node_tracker_test.go | 46 ++++++-- website/content/docs/configuration/server.mdx | 6 +- 12 files changed, 157 insertions(+), 45 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index 55c5b0af0ea..04f0955a4f0 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -438,6 +438,7 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) { // Set plan rejection tracker configuration. if planRejectConf := agentConfig.Server.PlanRejectionTracker; planRejectConf != nil { + conf.NodePlanRejectionEnabled = planRejectConf.Enabled conf.NodePlanRejectionThreshold = planRejectConf.NodeThreshold if planRejectConf.NodeWindow == 0 { diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 2611794b52e..3f4cd1e2dbd 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -386,10 +386,12 @@ func TestAgent_ServerConfig_PlanRejectionTracker(t *testing.T) { { name: "valid config", trackerConfig: &PlanRejectionTracker{ + Enabled: true, NodeThreshold: 123, NodeWindow: 17 * time.Minute, }, expectedConfig: &PlanRejectionTracker{ + Enabled: true, NodeThreshold: 123, NodeWindow: 17 * time.Minute, }, @@ -420,6 +422,10 @@ func TestAgent_ServerConfig_PlanRejectionTracker(t *testing.T) { require.Contains(t, err.Error(), tc.expectedErr) } else { require.NoError(t, err) + require.Equal(t, + tc.expectedConfig.Enabled, + serverConfig.NodePlanRejectionEnabled, + ) require.Equal(t, tc.expectedConfig.NodeThreshold, serverConfig.NodePlanRejectionThreshold, diff --git a/command/agent/config.go b/command/agent/config.go index 72f0f111452..816f0ab4bde 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -555,6 +555,9 @@ type RaftBoltConfig struct { // PlanRejectionTracker is used in servers to configure the plan rejection // tracker. type PlanRejectionTracker struct { + // Enabled controls if the plan rejection tracker is active or not. + Enabled bool `hcl:"enabled"` + // NodeThreshold is the number of times a node can have plan rejections // before it is marked as ineligible. 
NodeThreshold int `hcl:"node_threshold"` @@ -579,6 +582,10 @@ func (p *PlanRejectionTracker) Merge(b *PlanRejectionTracker) *PlanRejectionTrac return &result } + if b.Enabled { + result.Enabled = true + } + if b.NodeThreshold != 0 { result.NodeThreshold = b.NodeThreshold } @@ -1030,8 +1037,8 @@ func DefaultConfig() *Config { RaftProtocol: 3, StartJoin: []string{}, PlanRejectionTracker: &PlanRejectionTracker{ - NodeThreshold: 15, - NodeWindow: 10 * time.Minute, + NodeThreshold: 100, + NodeWindow: 5 * time.Minute, }, ServerJoin: &ServerJoin{ RetryJoin: []string{}, diff --git a/command/agent/config_parse_test.go b/command/agent/config_parse_test.go index 8a280195b03..c203178a49c 100644 --- a/command/agent/config_parse_test.go +++ b/command/agent/config_parse_test.go @@ -127,6 +127,7 @@ var basicConfig = &Config{ EnableEventBroker: helper.BoolToPtr(false), EventBufferSize: helper.IntToPtr(200), PlanRejectionTracker: &PlanRejectionTracker{ + Enabled: true, NodeThreshold: 100, NodeWindow: 41 * time.Minute, NodeWindowHCL: "41m", diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 88eb4fd13f8..6710ac5c03f 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -149,6 +149,7 @@ func TestConfig_Merge(t *testing.T) { EnableEventBroker: helper.BoolToPtr(false), EventBufferSize: helper.IntToPtr(0), PlanRejectionTracker: &PlanRejectionTracker{ + Enabled: true, NodeThreshold: 100, NodeWindow: 11 * time.Minute, }, @@ -348,6 +349,7 @@ func TestConfig_Merge(t *testing.T) { EnableEventBroker: helper.BoolToPtr(true), EventBufferSize: helper.IntToPtr(100), PlanRejectionTracker: &PlanRejectionTracker{ + Enabled: true, NodeThreshold: 100, NodeWindow: 11 * time.Minute, }, diff --git a/command/agent/testdata/basic.hcl b/command/agent/testdata/basic.hcl index a49872d0f11..4c2119302e1 100644 --- a/command/agent/testdata/basic.hcl +++ b/command/agent/testdata/basic.hcl @@ -134,6 +134,7 @@ server { event_buffer_size = 200 plan_rejection_tracker { + enabled = true node_threshold = 100 node_window = "41m" } diff --git a/command/agent/testdata/basic.json b/command/agent/testdata/basic.json index 4a767899d97..15ba4328fa0 100644 --- a/command/agent/testdata/basic.json +++ b/command/agent/testdata/basic.json @@ -278,6 +278,7 @@ "non_voting_server": true, "num_schedulers": 2, "plan_rejection_tracker": { + "enabled": true, "node_threshold": 100, "node_window": "41m" }, diff --git a/nomad/config.go b/nomad/config.go index 40020532950..78b928e9aa3 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -232,6 +232,9 @@ type Config struct { // additional delay is selected from this range randomly. EvalFailedFollowupDelayRange time.Duration + // NodePlanRejectionEnabled controls if node rejection tracker is enabled. + NodePlanRejectionEnabled bool + // NodePlanRejectionThreshold is the number of times a node must have a // plan rejection before it is set as ineligible. 
NodePlanRejectionThreshold int @@ -403,6 +406,7 @@ func DefaultConfig() *Config { MaxHeartbeatsPerSecond: 50.0, HeartbeatGrace: 10 * time.Second, FailoverHeartbeatTTL: 300 * time.Second, + NodePlanRejectionEnabled: false, NodePlanRejectionThreshold: 15, NodePlanRejectionWindow: 10 * time.Minute, ConsulConfig: config.DefaultConsulConfig(), diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index 27ed1dc33f8..0716247dd30 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -30,7 +30,7 @@ type planner struct { // Plan rejections are somewhat expected given Nomad's optimistic // scheduling, but repeated rejections for the same node may indicate an // undetected issue, so we need to track rejection history. - badNodeTracker *BadNodeTracker + badNodeTracker BadNodeTracker } // newPlanner returns a new planner to be used for managing allocation plans. @@ -44,12 +44,19 @@ func newPlanner(s *Server) (*planner, error) { } // Create the bad node tracker. - size := 50 - badNodeTracker, err := NewBadNodeTracker(log, size, - s.config.NodePlanRejectionWindow, - s.config.NodePlanRejectionThreshold) - if err != nil { - return nil, err + var badNodeTracker BadNodeTracker + if s.config.NodePlanRejectionEnabled { + config := DefaultCachedBadNodeTrackerConfig() + + config.Window = s.config.NodePlanRejectionWindow + config.Threshold = s.config.NodePlanRejectionThreshold + + badNodeTracker, err = NewCachedBadNodeTracker(log, config) + if err != nil { + return nil, err + } + } else { + badNodeTracker = &NoopBadNodeTracker{} } return &planner{ diff --git a/nomad/plan_apply_node_tracker.go b/nomad/plan_apply_node_tracker.go index 8877c2a01f3..80092bd34fb 100644 --- a/nomad/plan_apply_node_tracker.go +++ b/nomad/plan_apply_node_tracker.go @@ -8,9 +8,27 @@ import ( "github.com/hashicorp/go-hclog" lru "github.com/hashicorp/golang-lru" "github.com/hashicorp/nomad/helper" + "golang.org/x/time/rate" ) -// BadNodeTracker keeps a record of nodes marked as bad by the plan applier. +type BadNodeTracker interface { + IsBad(string) bool + Add(string) + EmitStats(time.Duration, <-chan struct{}) +} + +// NoopBadNodeTracker is a no-op implementation of bad node tracker that is +// used when tracking is disabled. +type NoopBadNodeTracker struct{} + +func (n *NoopBadNodeTracker) Add(string) {} +func (n *NoopBadNodeTracker) EmitStats(time.Duration, <-chan struct{}) {} +func (n *NoopBadNodeTracker) IsBad(string) bool { + return false +} + +// CachedBadNodeTracker keeps a record of nodes marked as bad by the plan +// applier in a LRU cache. // // It takes a time window and a threshold value. Plan rejections for a node // will be registered with its timestamp. If the number of rejections within @@ -18,63 +36,93 @@ import ( // // The tracker uses a fixed size cache that evicts old entries based on access // frequency and recency. -type BadNodeTracker struct { +type CachedBadNodeTracker struct { logger hclog.Logger cache *lru.TwoQueueCache + limiter *rate.Limiter window time.Duration threshold int } -// NewBadNodeTracker returns a new BadNodeTracker. 
-func NewBadNodeTracker(logger hclog.Logger, size int, window time.Duration, threshold int) (*BadNodeTracker, error) { - cache, err := lru.New2Q(size) +type CachedBadNodeTrackerConfig struct { + CacheSize int + RateLimit float64 + BurstSize int + Window time.Duration + Threshold int +} + +func DefaultCachedBadNodeTrackerConfig() CachedBadNodeTrackerConfig { + return CachedBadNodeTrackerConfig{ + CacheSize: 50, + + // Limit marking 5 nodes per 30min as ineligible with an initial + // burst of 10 nodes. + RateLimit: 5 / (30 * 60), + BurstSize: 10, + + // Consider a node as bad if it is added more than 100 times in a 5min + // window period. + Window: 5 * time.Minute, + Threshold: 100, + } +} + +// NewCachedBadNodeTracker returns a new CachedBadNodeTracker. +func NewCachedBadNodeTracker(logger hclog.Logger, config CachedBadNodeTrackerConfig) (*CachedBadNodeTracker, error) { + log := logger.Named("bad_node_tracker"). + With("threshold", config.Threshold). + With("window", config.Window) + + cache, err := lru.New2Q(config.CacheSize) if err != nil { return nil, fmt.Errorf("failed to create new bad node tracker: %v", err) } - return &BadNodeTracker{ - logger: logger.Named("bad_node_tracker"). - With("threshold", threshold). - With("window", window), + return &CachedBadNodeTracker{ + logger: log, cache: cache, - window: window, - threshold: threshold, + limiter: rate.NewLimiter(rate.Limit(config.RateLimit), config.BurstSize), + window: config.Window, + threshold: config.Threshold, }, nil } // IsBad returns true if the node has more rejections than the threshold within // the time window. -func (t *BadNodeTracker) IsBad(nodeID string) bool { - value, ok := t.cache.Get(nodeID) - if !ok { +func (c *CachedBadNodeTracker) IsBad(nodeID string) bool { + // Limit the number of nodes we report as bad to avoid mass assigning nodes + // as ineligible, but still call Get to keep the cache entry fresh. + value, ok := c.cache.Get(nodeID) + if !ok || !c.limiter.Allow() { return false } stats := value.(*badNodeStats) score := stats.score() - t.logger.Debug("checking if node is bad", "node_id", nodeID, "score", score) - return score > t.threshold + c.logger.Debug("checking if node is bad", "node_id", nodeID, "score", score) + return score > c.threshold } // Add records a new rejection for node. If it's the first time a node is added // it will be included in the internal cache. If the cache is full the least // recently updated or accessed node is evicted. -func (t *BadNodeTracker) Add(nodeID string) { - value, ok := t.cache.Get(nodeID) +func (c *CachedBadNodeTracker) Add(nodeID string) { + value, ok := c.cache.Get(nodeID) if !ok { - value = newBadNodeStats(t.window) - t.cache.Add(nodeID, value) + value = newBadNodeStats(c.window) + c.cache.Add(nodeID, value) } stats := value.(*badNodeStats) score := stats.record() - t.logger.Debug("adding node plan rejection", "node_id", nodeID, "score", score) + c.logger.Debug("adding node plan rejection", "node_id", nodeID, "score", score) } // EmitStats generates metrics for the bad nodes being currently tracked. Must // be called in a goroutine. 
-func (t *BadNodeTracker) EmitStats(period time.Duration, stopCh <-chan struct{}) { +func (c *CachedBadNodeTracker) EmitStats(period time.Duration, stopCh <-chan struct{}) { timer, stop := helper.NewSafeTimer(period) defer stop() @@ -83,16 +131,16 @@ func (t *BadNodeTracker) EmitStats(period time.Duration, stopCh <-chan struct{}) select { case <-timer.C: - t.emitStats() + c.emitStats() case <-stopCh: return } } } -func (t *BadNodeTracker) emitStats() { - for _, k := range t.cache.Keys() { - value, _ := t.cache.Get(k) +func (c *CachedBadNodeTracker) emitStats() { + for _, k := range c.cache.Keys() { + value, _ := c.cache.Get(k) stats := value.(*badNodeStats) score := stats.score() @@ -103,7 +151,7 @@ func (t *BadNodeTracker) emitStats() { } } -// badNodeStats represents a node being tracked by BadNodeTracker. +// badNodeStats represents a node being tracked by a BadNodeTracker. type badNodeStats struct { history []time.Time window time.Duration diff --git a/nomad/plan_apply_node_tracker_test.go b/nomad/plan_apply_node_tracker_test.go index 2e1202a4374..76205c19d19 100644 --- a/nomad/plan_apply_node_tracker_test.go +++ b/nomad/plan_apply_node_tracker_test.go @@ -14,16 +14,17 @@ import ( func TestBadNodeTracker(t *testing.T) { ci.Parallel(t) - cacheSize := 3 - tracker, err := NewBadNodeTracker( - hclog.NewNullLogger(), cacheSize, time.Second, 10) + config := DefaultCachedBadNodeTrackerConfig() + config.CacheSize = 3 + + tracker, err := NewCachedBadNodeTracker(hclog.NewNullLogger(), config) require.NoError(t, err) for i := 0; i < 10; i++ { tracker.Add(fmt.Sprintf("node-%d", i+1)) } - require.Equal(t, cacheSize, tracker.cache.Len()) + require.Equal(t, config.CacheSize, tracker.cache.Len()) // Only track the most recent values. expected := []string{"node-8", "node-9", "node-10"} @@ -33,8 +34,12 @@ func TestBadNodeTracker(t *testing.T) { func TestBadNodeTracker_IsBad(t *testing.T) { ci.Parallel(t) - window := time.Duration(testutil.TestMultiplier()) * time.Second - tracker, err := NewBadNodeTracker(hclog.NewNullLogger(), 3, window, 4) + config := DefaultCachedBadNodeTrackerConfig() + config.CacheSize = 3 + config.Window = time.Duration(testutil.TestMultiplier()) * time.Second + config.Threshold = 4 + + tracker, err := NewCachedBadNodeTracker(hclog.NewNullLogger(), config) require.NoError(t, err) // Populate cache. @@ -80,7 +85,7 @@ func TestBadNodeTracker_IsBad(t *testing.T) { } t.Run("cache expires", func(t *testing.T) { - time.Sleep(window) + time.Sleep(config.Window) require.False(t, tracker.IsBad("node-1")) require.False(t, tracker.IsBad("node-2")) require.False(t, tracker.IsBad("node-3")) @@ -98,6 +103,33 @@ func TestBadNodeTracker_IsBad(t *testing.T) { }) } +func TestBadNodeTracker_RateLimit(t *testing.T) { + config := DefaultCachedBadNodeTrackerConfig() + config.Threshold = 3 + config.RateLimit = float64(testutil.TestMultiplier()) + config.BurstSize = 3 + + tracker, err := NewCachedBadNodeTracker(hclog.NewNullLogger(), config) + require.NoError(t, err) + + tracker.Add("node-1") + tracker.Add("node-1") + tracker.Add("node-1") + tracker.Add("node-1") + tracker.Add("node-1") + + // Burst allows for max 3 operations. + require.True(t, tracker.IsBad("node-1")) + require.True(t, tracker.IsBad("node-1")) + require.True(t, tracker.IsBad("node-1")) + require.False(t, tracker.IsBad("node-1")) + + // Wait for a new token. 
+ time.Sleep(time.Duration(testutil.TestMultiplier()) * time.Second) + require.True(t, tracker.IsBad("node-1")) + require.False(t, tracker.IsBad("node-1")) +} + func TestBadNodeStats_score(t *testing.T) { ci.Parallel(t) diff --git a/website/content/docs/configuration/server.mdx b/website/content/docs/configuration/server.mdx index 35ae39dcaa5..538a8449a4f 100644 --- a/website/content/docs/configuration/server.mdx +++ b/website/content/docs/configuration/server.mdx @@ -249,10 +249,12 @@ getting stuck due to always being scheduled to a client that may have an unexpected issue. Refer to [Monitoring Nomad][monitoring_nomad_progress] for more details. -- `node_threshold` `(int: 15)` - The number of plan rejections for a node +- `enabled` `(bool: false)` - Specifies if plan rejections should be tracked. + +- `node_threshold` `(int: 100)` - The number of plan rejections for a node within the `node_window` to trigger a client to be set as ineligible. -- `node_window` `(int: "10m")` - The time window for when plan rejections for a +- `node_window` `(int: "5m")` - The time window for when plan rejections for a node should be considered. If you observe too many false positives (clients being marked as ineligible From 98ad0d747eeac4f67c49a3231eae545580c0c2ab Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Thu, 7 Jul 2022 11:55:22 -0400 Subject: [PATCH 07/14] fix tests --- command/agent/agent_test.go | 4 ++-- nomad/plan_apply_node_tracker_test.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 3f4cd1e2dbd..7b75766ab80 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -378,8 +378,8 @@ func TestAgent_ServerConfig_PlanRejectionTracker(t *testing.T) { name: "default", trackerConfig: nil, expectedConfig: &PlanRejectionTracker{ - NodeThreshold: 15, - NodeWindow: 10 * time.Minute, + NodeThreshold: 100, + NodeWindow: 5 * time.Minute, }, expectedErr: "", }, diff --git a/nomad/plan_apply_node_tracker_test.go b/nomad/plan_apply_node_tracker_test.go index 76205c19d19..12b32953ebb 100644 --- a/nomad/plan_apply_node_tracker_test.go +++ b/nomad/plan_apply_node_tracker_test.go @@ -106,7 +106,7 @@ func TestBadNodeTracker_IsBad(t *testing.T) { func TestBadNodeTracker_RateLimit(t *testing.T) { config := DefaultCachedBadNodeTrackerConfig() config.Threshold = 3 - config.RateLimit = float64(testutil.TestMultiplier()) + config.RateLimit = float64(1) // Get a new token every second. config.BurstSize = 3 tracker, err := NewCachedBadNodeTracker(hclog.NewNullLogger(), config) @@ -125,7 +125,7 @@ func TestBadNodeTracker_RateLimit(t *testing.T) { require.False(t, tracker.IsBad("node-1")) // Wait for a new token. 
-	time.Sleep(time.Duration(testutil.TestMultiplier()) * time.Second)
+	time.Sleep(time.Second)
 	require.True(t, tracker.IsBad("node-1"))
 	require.False(t, tracker.IsBad("node-1"))
 }
From 620b41311827100f3ed4b8386edba6d800f2d712 Mon Sep 17 00:00:00 2001
From: Luiz Aoqui
Date: Thu, 7 Jul 2022 17:39:23 -0400
Subject: [PATCH 08/14] fix rate limit division and add more logs

---
 nomad/plan_apply_node_tracker.go | 30 ++++++++++++++++++++++--------
 1 file changed, 22 insertions(+), 8 deletions(-)

diff --git a/nomad/plan_apply_node_tracker.go b/nomad/plan_apply_node_tracker.go
index 80092bd34fb..2517e4ad129 100644
--- a/nomad/plan_apply_node_tracker.go
+++ b/nomad/plan_apply_node_tracker.go
@@ -58,7 +58,7 @@ func DefaultCachedBadNodeTrackerConfig() CachedBadNodeTrackerConfig {
 
 		// Limit marking 5 nodes per 30min as ineligible with an initial
 		// burst of 10 nodes.
-		RateLimit: 5 / (30 * 60),
+		RateLimit: 5.0 / (30 * 60),
 		BurstSize: 10,
 
 		// Consider a node as bad if it is added more than 100 times in a 5min
@@ -71,8 +71,8 @@ func DefaultCachedBadNodeTrackerConfig() CachedBadNodeTrackerConfig {
 // NewCachedBadNodeTracker returns a new CachedBadNodeTracker.
 func NewCachedBadNodeTracker(logger hclog.Logger, config CachedBadNodeTrackerConfig) (*CachedBadNodeTracker, error) {
 	log := logger.Named("bad_node_tracker").
-		With("threshold", config.Threshold).
-		With("window", config.Window)
+		With("window", config.Window).
+		With("threshold", config.Threshold)
 
 	cache, err := lru.New2Q(config.CacheSize)
 	if err != nil {
@@ -91,18 +91,32 @@ func NewCachedBadNodeTracker(logger hclog.Logger, config CachedBadNodeTrackerCon
 // IsBad returns true if the node has more rejections than the threshold within
 // the time window.
 func (c *CachedBadNodeTracker) IsBad(nodeID string) bool {
+	logger := c.logger.With("node_id", nodeID)
+	logger.Debug("checking if node is bad")
+
 	value, ok := c.cache.Get(nodeID)
-	if !ok || !c.limiter.Allow() {
+	if !ok {
+		logger.Debug("node not in cache")
+		return false
+	}
+
+	// Limit the number of nodes we report as bad to avoid mass assigning nodes
+	// as ineligible, but do it after Get to keep the cache entry fresh.
+	if !c.limiter.Allow() {
+		logger.Info("returning false due to rate limiting")
 		return false
 	}
 
 	stats := value.(*badNodeStats)
 	score := stats.score()
 
-	c.logger.Debug("checking if node is bad", "node_id", nodeID, "score", score)
-	return score > c.threshold
+	if score >= c.threshold {
+		logger.Debug("node is bad", "score", score)
+		return true
+	}
+
+	logger.Debug("node is not bad", "score", score)
+	return false
 }
From 27e767d3f202e9008ad4bbc5027d497d76c87c23 Mon Sep 17 00:00:00 2001
From: Luiz Aoqui
Date: Fri, 8 Jul 2022 14:39:05 -0400
Subject: [PATCH 09/14] website: update documentation for the plan rejection
 tracker

---
 website/content/docs/configuration/server.mdx    |  4 ++--
 .../content/docs/operations/monitoring-nomad.mdx | 13 +++++++++----
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/website/content/docs/configuration/server.mdx b/website/content/docs/configuration/server.mdx
index 538a8449a4f..7ad7494dac8 100644
--- a/website/content/docs/configuration/server.mdx
+++ b/website/content/docs/configuration/server.mdx
@@ -254,8 +254,8 @@
 - `node_threshold` `(int: 100)` - The number of plan rejections for a node
   within the `node_window` to trigger a client to be set as ineligible.
 
-- `node_window` `(int: "5m")` - The time window for when plan rejections for a
-  node should be considered.
+- `node_window` `(string: "5m")` - The time window for when plan rejections for
+  a node should be considered.
 
 If you observe too many false positives (clients being marked as ineligible
 even if they don't present any problem) you may want to increase
diff --git a/website/content/docs/operations/monitoring-nomad.mdx b/website/content/docs/operations/monitoring-nomad.mdx
index dc053d7defb..c4f16513783 100644
--- a/website/content/docs/operations/monitoring-nomad.mdx
+++ b/website/content/docs/operations/monitoring-nomad.mdx
@@ -149,10 +149,15 @@ While it is possible for these log lines to occur infrequently due to normal
 cluster conditions, they should not appear repeatedly and prevent the job from
 eventually running (look up the evaluation ID logged to find the job).
 
-Nomad tracks the history of plan rejections per client and will mark it as
-ineligible for scheduling if the number of rejections goes above a given
-threshold within a time window. When this happens, the following node event is
-registered:
+#### Plan rejection tracker
+
+Nomad provides a mechanism to track the history of plan rejections per client
+and mark them as ineligible if the number goes above a given threshold within a
+time window. This functionality can be enabled using the
+[`plan_rejection_tracker`] server configuration.
+
+When a node is marked as ineligible due to excessive plan rejections, the
+following node event is registered:
 
 ```
 Node marked as ineligible for scheduling due to multiple plan rejections

From ff8f670c4a1ac747b5375b32300e1d62ae5af555 Mon Sep 17 00:00:00 2001
From: Luiz Aoqui
Date: Fri, 8 Jul 2022 17:31:09 -0400
Subject: [PATCH 10/14] core: refactor plan rejection tracker

Simplify the interface for `BadNodeTracker` by merging the methods
`Add` and `IsBad` since they are always called in tandem, and reduce
the number and level of log messages generated.

Also clean up expired records to avoid infinite growth in case a cache
entry never expires.

Take an explicit timestamp to make tests faster and more reliable.
---
 nomad/plan_apply.go                   |   3 +-
 nomad/plan_apply_node_tracker.go      | 125 ++++++++++++++------------
 nomad/plan_apply_node_tracker_test.go | 105 ++++++++++++----------
 3 files changed, 125 insertions(+), 108 deletions(-)

diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go
index 0716247dd30..21e4651d25a 100644
--- a/nomad/plan_apply.go
+++ b/nomad/plan_apply.go
@@ -171,8 +171,7 @@ func (p *planner) planApply() {
 
 		// Check if any of the rejected nodes should be made ineligible.
 		for _, nodeID := range result.RejectedNodes {
-			p.badNodeTracker.Add(nodeID)
-			if p.badNodeTracker.IsBad(nodeID) {
+			if p.badNodeTracker.Add(nodeID) {
 				result.IneligibleNodes = append(result.IneligibleNodes, nodeID)
 			}
 		}
diff --git a/nomad/plan_apply_node_tracker.go b/nomad/plan_apply_node_tracker.go
index 2517e4ad129..48783fbc477 100644
--- a/nomad/plan_apply_node_tracker.go
+++ b/nomad/plan_apply_node_tracker.go
@@ -12,8 +12,7 @@ import (
 )
 
 type BadNodeTracker interface {
-	IsBad(string) bool
-	Add(string)
+	Add(string) bool
 	EmitStats(time.Duration, <-chan struct{})
 }
 
 // NoopBadNodeTracker is a no-op implementation of bad node tracker that is
 // used when tracking is disabled.
type NoopBadNodeTracker struct{} -func (n *NoopBadNodeTracker) Add(string) {} func (n *NoopBadNodeTracker) EmitStats(time.Duration, <-chan struct{}) {} -func (n *NoopBadNodeTracker) IsBad(string) bool { +func (n *NoopBadNodeTracker) Add(string) bool { return false } @@ -88,50 +86,24 @@ func NewCachedBadNodeTracker(logger hclog.Logger, config CachedBadNodeTrackerCon }, nil } -// IsBad returns true if the node has more rejections than the threshold within -// the time window. -func (c *CachedBadNodeTracker) IsBad(nodeID string) bool { - logger := c.logger.With("node_id", nodeID) - logger.Debug("checking if node is bad") - +// Add records a new rejection for a node and returns true if the number of +// rejections reaches the threshold. +// +// If it's the first time the node is added it will be included in the internal +// cache. If the cache is full the least recently updated or accessed node is +// evicted. +func (c *CachedBadNodeTracker) Add(nodeID string) bool { value, ok := c.cache.Get(nodeID) if !ok { - logger.Debug("node not in cache") - return false - } - - // Limit the number of nodes we report as bad to avoid mass assigning nodes - // as ineligible, but do it after Get to keep the cache entry fresh. - if !c.limiter.Allow() { - logger.Info("returning false due to rate limiting") - return false + value = newBadNodeStats(nodeID, c.window) + c.cache.Add(nodeID, value) } - stats := value.(*badNodeStats) - score := stats.score() - - if score >= c.threshold { - logger.Debug("node is bad", "score", score) - return true - } - logger.Debug("node is not bad", "score", score) - return false -} - -// Add records a new rejection for node. If it's the first time a node is added -// it will be included in the internal cache. If the cache is full the least -// recently updated or accessed node is evicted. -func (c *CachedBadNodeTracker) Add(nodeID string) { - value, ok := c.cache.Get(nodeID) - if !ok { - value = newBadNodeStats(c.window) - c.cache.Add(nodeID, value) - } + now := time.Now() + stats.record(now) - stats := value.(*badNodeStats) - score := stats.record() - c.logger.Debug("adding node plan rejection", "node_id", nodeID, "score", score) + return c.isBad(now, stats) } // EmitStats generates metrics for the bad nodes being currently tracked. Must @@ -152,11 +124,33 @@ func (c *CachedBadNodeTracker) EmitStats(period time.Duration, stopCh <-chan str } } +// isBad returns true if the node has more rejections than the threshold within +// the time window. +func (c *CachedBadNodeTracker) isBad(t time.Time, stats *badNodeStats) bool { + score := stats.score(t) + logger := c.logger.With("node_id", stats.id, "score", score) + + logger.Trace("checking if node is bad") + if score >= c.threshold { + // Limit the number of nodes we report as bad to avoid mass assigning + // nodes as ineligible, but do it after Get to keep the cache entry + // fresh. + if !c.limiter.Allow() { + logger.Trace("node is bad, but returning false due to rate limiting") + return false + } + return true + } + + return false +} + func (c *CachedBadNodeTracker) emitStats() { + now := time.Now() for _, k := range c.cache.Keys() { value, _ := c.cache.Get(k) stats := value.(*badNodeStats) - score := stats.score() + score := stats.score(now) labels := []metrics.Label{ {Name: "node_id", Value: k.(string)}, @@ -167,37 +161,52 @@ func (c *CachedBadNodeTracker) emitStats() { // badNodeStats represents a node being tracked by a BadNodeTracker. 
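+// Rejection timestamps accumulate in the history slice; score counts the
+// entries still inside the time window and prunes expired ones so the slice
+// stays bounded.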
 type badNodeStats struct {
+	id      string
 	history []time.Time
 	window  time.Duration
 }
 
 // newBadNodeStats returns an empty badNodeStats.
-func newBadNodeStats(window time.Duration) *badNodeStats {
+func newBadNodeStats(id string, window time.Duration) *badNodeStats {
 	return &badNodeStats{
+		id:     id,
 		window: window,
 	}
 }
 
 // score returns the number of rejections within the past time window.
-func (s *badNodeStats) score() int {
-	count := 0
-	windowStart := time.Now().Add(-s.window)
+func (s *badNodeStats) score(t time.Time) int {
+	active, expired := s.countActive(t)
 
-	for i := len(s.history) - 1; i >= 0; i-- {
+	// Remove expired records.
+	if expired > 0 {
+		s.history = s.history[expired:]
+	}
+
+	return active
+}
+
+// record adds a new entry to the stats history and returns the new score.
+func (s *badNodeStats) record(t time.Time) {
+	s.history = append(s.history, t)
+}
+
+// countActive returns the number of records that happened after the time
+// window started (active) and before (expired).
+func (s *badNodeStats) countActive(t time.Time) (int, int) {
+	windowStart := t.Add(-s.window)
+
+	// Assume all values are expired and move back from history until we find
+	// a record that actually happened before the window started.
+	expired := len(s.history)
+	for ; expired > 0; expired-- {
+		i := expired - 1
 		ts := s.history[i]
 		if ts.Before(windowStart) {
-			// Since we start from the end of the history list, anything past
-			// this point will have happened before the time window.
 			break
 		}
-		count += 1
 	}
-	return count
-}
 
-// record adds a new entry to the stats history and returns the new score.
-func (s *badNodeStats) record() int {
-	now := time.Now()
-	s.history = append(s.history, now)
-	return s.score()
+	active := len(s.history) - expired
+	return active, expired
 }
diff --git a/nomad/plan_apply_node_tracker_test.go b/nomad/plan_apply_node_tracker_test.go
index 12b32953ebb..018335c4f61 100644
--- a/nomad/plan_apply_node_tracker_test.go
+++ b/nomad/plan_apply_node_tracker_test.go
@@ -7,11 +7,10 @@ import (
 
 	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/nomad/ci"
-	"github.com/hashicorp/nomad/testutil"
 	"github.com/stretchr/testify/require"
 )
 
-func TestBadNodeTracker(t *testing.T) {
+func TestCachedBadNodeTracker(t *testing.T) {
 	ci.Parallel(t)
 
 	config := DefaultCachedBadNodeTrackerConfig()
@@ -31,12 +30,11 @@ func TestBadNodeTracker(t *testing.T) {
 	require.ElementsMatch(t, expected, tracker.cache.Keys())
 }
 
-func TestBadNodeTracker_IsBad(t *testing.T) {
+func TestCachedBadNodeTracker_isBad(t *testing.T) {
 	ci.Parallel(t)
 
 	config := DefaultCachedBadNodeTrackerConfig()
 	config.CacheSize = 3
-	config.Window = time.Duration(testutil.TestMultiplier()) * time.Second
 	config.Threshold = 4
 
 	tracker, err := NewCachedBadNodeTracker(hclog.NewNullLogger(), config)
@@ -70,40 +68,39 @@ func TestBadNodeTracker_IsBad(t *testing.T) {
 			nodeID: "node-3",
 			bad:    true,
 		},
-		{
-			name:   "node not tracked is not bad",
-			nodeID: "node-1000",
-			bad:    false,
-		},
 	}
 
+	now := time.Now()
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
-			got := tracker.IsBad(tc.nodeID)
+			// Read value from cache.
+			v, ok := tracker.cache.Get(tc.nodeID)
+			require.True(t, ok)
+
+			// Check if it's bad.
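+			// The 2Q cache stores interface{} values, so the entry must be
+			// asserted back to *badNodeStats before scoring.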
+			stats := v.(*badNodeStats)
+			got := tracker.isBad(now, stats)
 			require.Equal(t, tc.bad, got)
 		})
 	}
 
-	t.Run("cache expires", func(t *testing.T) {
-		time.Sleep(config.Window)
-		require.False(t, tracker.IsBad("node-1"))
-		require.False(t, tracker.IsBad("node-2"))
-		require.False(t, tracker.IsBad("node-3"))
-	})
-
-	t.Run("IsBad updates cache", func(t *testing.T) {
-		// Don't access node-3 so it should be evicted when a new value is
-		// added and the tracker size overflows.
-		tracker.IsBad("node-1")
-		tracker.IsBad("node-2")
-		tracker.Add("node-4")
-
-		expected := []string{"node-1", "node-2", "node-4"}
-		require.ElementsMatch(t, expected, tracker.cache.Keys())
-	})
+	future := time.Now().Add(2 * config.Window)
+	nodes := []string{"node-1", "node-2", "node-3"}
+	for _, n := range nodes {
+		t.Run(fmt.Sprintf("%s cache expires", n), func(t *testing.T) {
+			v, ok := tracker.cache.Get(n)
+			require.True(t, ok)
+
+			stats := v.(*badNodeStats)
+			bad := tracker.isBad(future, stats)
+			require.False(t, bad)
+		})
+	}
 }
 
-func TestBadNodeTracker_RateLimit(t *testing.T) {
+func TestCachedBadNodeTracker_rateLimit(t *testing.T) {
+	ci.Parallel(t)
+
 	config := DefaultCachedBadNodeTrackerConfig()
 	config.Threshold = 3
 	config.RateLimit = float64(1) // Get a new token every second.
 	config.BurstSize = 3
 
 	tracker, err := NewCachedBadNodeTracker(hclog.NewNullLogger(), config)
@@ -118,38 +115,50 @@ func TestBadNodeTracker_RateLimit(t *testing.T) {
 	tracker.Add("node-1")
 	tracker.Add("node-1")
 
+	v, ok := tracker.cache.Get("node-1")
+	require.True(t, ok)
+
+	stats := v.(*badNodeStats)
+
 	// Burst allows for max 3 operations.
-	require.True(t, tracker.IsBad("node-1"))
-	require.True(t, tracker.IsBad("node-1"))
-	require.True(t, tracker.IsBad("node-1"))
-	require.False(t, tracker.IsBad("node-1"))
+	now := time.Now()
+	require.True(t, tracker.isBad(now, stats))
+	require.True(t, tracker.isBad(now, stats))
+	require.True(t, tracker.isBad(now, stats))
+	require.False(t, tracker.isBad(now, stats))
 
 	// Wait for a new token.
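+	// RateLimit grants one token per second here and the burst is already
+	// spent, so a one-second sleep allows exactly one more positive result.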
 	time.Sleep(time.Second)
-	require.True(t, tracker.IsBad("node-1"))
-	require.False(t, tracker.IsBad("node-1"))
+	now = time.Now()
+	require.True(t, tracker.isBad(now, stats))
+	require.False(t, tracker.isBad(now, stats))
 }
 
 func TestBadNodeStats_score(t *testing.T) {
 	ci.Parallel(t)
 
-	window := time.Duration(testutil.TestMultiplier()) * time.Second
-	stats := newBadNodeStats(window)
+	window := time.Minute
+	stats := newBadNodeStats("node-1", window)
 
-	require.Equal(t, 0, stats.score())
+	now := time.Now()
+	require.Equal(t, 0, stats.score(now))
 
-	stats.record()
-	stats.record()
-	stats.record()
-	require.Equal(t, 3, stats.score())
+	stats.record(now)
+	stats.record(now)
+	stats.record(now)
+	require.Equal(t, 3, stats.score(now))
+	require.Len(t, stats.history, 3)
 
-	time.Sleep(window / 2)
-	stats.record()
-	require.Equal(t, 4, stats.score())
+	halfWindow := now.Add(window / 2)
+	stats.record(halfWindow)
+	require.Equal(t, 4, stats.score(halfWindow))
+	require.Len(t, stats.history, 4)
 
-	time.Sleep(window / 2)
-	require.Equal(t, 1, stats.score())
+	fullWindow := now.Add(window).Add(time.Second)
+	require.Equal(t, 1, stats.score(fullWindow))
+	require.Len(t, stats.history, 1)
 
-	time.Sleep(window / 2)
-	require.Equal(t, 0, stats.score())
+	afterWindow := now.Add(2 * window)
+	require.Equal(t, 0, stats.score(afterWindow))
+	require.Len(t, stats.history, 0)
 }

From bd833f933d59d9ef9175f3b3d2bbbed641f44633 Mon Sep 17 00:00:00 2001
From: Luiz Aoqui
Date: Fri, 8 Jul 2022 17:43:39 -0400
Subject: [PATCH 11/14] core: add plan rejection URL to node ineligibility event

---
 nomad/state/state_store.go                           | 2 +-
 website/content/docs/operations/monitoring-nomad.mdx | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index 910e2d695d9..35046b0fb74 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -43,7 +43,7 @@ const (
 	// the same node may indicate an underlying issue not detected by Nomad.
 	// The plan applier keeps track of plan rejection history and will mark
 	// nodes as ineligible if they cross a given threshold.
-	NodeEligibilityEventPlanRejectThreshold = "Node marked as ineligible for scheduling due to multiple plan rejections"
+	NodeEligibilityEventPlanRejectThreshold = "Node marked as ineligible for scheduling due to multiple plan rejections, refer to https://www.nomadproject.io/s/port-plan-failure for more information"
 
 	// NodeRegisterEventRegistered is the message used when the node becomes
 	// registered.
diff --git a/website/content/docs/operations/monitoring-nomad.mdx b/website/content/docs/operations/monitoring-nomad.mdx
index c4f16513783..0c21f71969a 100644
--- a/website/content/docs/operations/monitoring-nomad.mdx
+++ b/website/content/docs/operations/monitoring-nomad.mdx
@@ -160,7 +160,7 @@ When a node is marked as ineligible due to excessive plan rejections, the
 following node event is registered:
 
 ```
-Node marked as ineligible for scheduling due to multiple plan rejections
+Node marked as ineligible for scheduling due to multiple plan rejections, refer to https://www.nomadproject.io/s/port-plan-failure for more information
 ```
 
 Along with the log line:

From fb2e761cda83d847960df140a743537db0827d2d Mon Sep 17 00:00:00 2001
From: Luiz Aoqui
Date: Tue, 12 Jul 2022 15:33:12 -0400
Subject: [PATCH 12/14] core: use stable time in FSM operations

Set the timestamp for a plan apply operation at request time to avoid
non-deterministic operations in the FSM.
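To make the hazard concrete, here is a minimal, self-contained sketch (the names are illustrative only, not Nomad's actual code; `applyRequest` stands in for `structs.ApplyPlanResultsRequest`): if the FSM read the clock while applying a Raft log entry, each server replaying the log would record a slightly different timestamp, whereas stamping the time into the request once, before replication, keeps the apply step deterministic on every replica.

```go
package main

import (
	"fmt"
	"time"
)

// applyRequest is a hypothetical stand-in for the real request type: the
// timestamp travels with the request instead of being read at apply time.
type applyRequest struct {
	NodeIDs   []string
	UpdatedAt int64 // stamped once, before the entry is replicated
}

func newApplyRequest(nodeIDs []string) applyRequest {
	return applyRequest{
		NodeIDs:   nodeIDs,
		UpdatedAt: time.Now().UTC().UnixNano(),
	}
}

// apply is deterministic: it only uses data carried by the request and
// never consults the local clock, so every replica stores the same value.
func apply(req applyRequest) {
	for _, id := range req.NodeIDs {
		fmt.Printf("node %s marked ineligible at %d\n", id, req.UpdatedAt)
	}
}

func main() {
	apply(newApplyRequest([]string{"node-1"}))
}
```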
---
 nomad/plan_apply.go        | 4 +++-
 nomad/state/state_store.go | 3 +--
 nomad/structs/structs.go   | 3 +++
 3 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go
index 21e4651d25a..57bb0c0873a 100644
--- a/nomad/plan_apply.go
+++ b/nomad/plan_apply.go
@@ -234,6 +234,8 @@ func (p *planner) snapshotMinIndex(prevPlanResultIndex, planSnapshotIndex uint64
 
 // applyPlan is used to apply the plan result and to return the alloc index
 func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap *state.StateSnapshot) (raft.ApplyFuture, error) {
+	now := time.Now().UTC().UnixNano()
+
 	// Setup the update request
 	req := structs.ApplyPlanResultsRequest{
 		AllocUpdateRequest: structs.AllocUpdateRequest{
@@ -243,10 +245,10 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap
 		DeploymentUpdates: result.DeploymentUpdates,
 		IneligibleNodes:   result.IneligibleNodes,
 		EvalID:            plan.EvalID,
+		UpdatedAt:         now,
 	}
 
 	preemptedJobIDs := make(map[structs.NamespacedID]struct{})
-	now := time.Now().UTC().UnixNano()
 
 	if ServersMeetMinimumVersion(p.Members(), MinVersionPlanNormalization, true) {
 		// Initialize the allocs request using the new optimized log entry format.
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index 35046b0fb74..41998f81057 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -371,7 +371,6 @@ func (s *StateStore) UpsertPlanResults(msgType structs.MessageType, index uint64
 	defer txn.Abort()
 
 	// Mark nodes as ineligible.
-	now := time.Now().Unix()
 	for _, nodeID := range results.IneligibleNodes {
 		s.logger.Warn("marking node as ineligible due to multiple plan rejections, refer to https://www.nomadproject.io/s/port-plan-failure for more information", "node_id", nodeID)
 
@@ -380,7 +379,7 @@ func (s *StateStore) UpsertPlanResults(msgType structs.MessageType, index uint64
 			SetMessage(NodeEligibilityEventPlanRejectThreshold)
 
 		err := s.updateNodeEligibilityImpl(index, nodeID,
-			structs.NodeSchedulingIneligible, now, nodeEvent, txn)
+			structs.NodeSchedulingIneligible, results.UpdatedAt, nodeEvent, txn)
 		if err != nil {
 			return err
 		}
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 179b54a9eed..c8bbbcd5387 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -918,6 +918,9 @@ type ApplyPlanResultsRequest struct {
 	// placements for and should therefore be considered ineligible by workers
 	// to avoid retrying them repeatedly.
 	IneligibleNodes []string
+
+	// UpdatedAt is the server time at which the request was received.
+	UpdatedAt int64
 }
 
 // AllocUpdateRequest is used to submit changes to allocations, either

From f5936f0830e1b5a490a8a4f5f7d39f138cd628a3 Mon Sep 17 00:00:00 2001
From: Luiz Aoqui
Date: Tue, 12 Jul 2022 16:31:22 -0400
Subject: [PATCH 13/14] config: use pointer for plan_rejection_tracker.enabled

Using a pointer allows us to differentiate between a non-set value and
an explicit `false` if we decide to use `true` by default.
---
 command/agent/agent.go             |  4 +++-
 command/agent/agent_test.go        |  4 ++--
 command/agent/config.go            |  7 ++++---
 command/agent/config_parse_test.go |  2 +-
 command/agent/config_test.go       |  4 ++--
 5 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/command/agent/agent.go b/command/agent/agent.go
index 04f0955a4f0..e9cde06a82a 100644
--- a/command/agent/agent.go
+++ b/command/agent/agent.go
@@ -438,7 +438,9 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
 
 	// Set plan rejection tracker configuration.
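+	// Enabled is a *bool so an unset value can be distinguished from an
+	// explicit false; only override the server default when it is set.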
if planRejectConf := agentConfig.Server.PlanRejectionTracker; planRejectConf != nil { - conf.NodePlanRejectionEnabled = planRejectConf.Enabled + if planRejectConf.Enabled != nil { + conf.NodePlanRejectionEnabled = *planRejectConf.Enabled + } conf.NodePlanRejectionThreshold = planRejectConf.NodeThreshold if planRejectConf.NodeWindow == 0 { diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 7b75766ab80..d04ffeffd02 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -386,12 +386,12 @@ func TestAgent_ServerConfig_PlanRejectionTracker(t *testing.T) { { name: "valid config", trackerConfig: &PlanRejectionTracker{ - Enabled: true, + Enabled: helper.BoolToPtr(true), NodeThreshold: 123, NodeWindow: 17 * time.Minute, }, expectedConfig: &PlanRejectionTracker{ - Enabled: true, + Enabled: helper.BoolToPtr(true), NodeThreshold: 123, NodeWindow: 17 * time.Minute, }, diff --git a/command/agent/config.go b/command/agent/config.go index 816f0ab4bde..e84472bf709 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -556,7 +556,7 @@ type RaftBoltConfig struct { // tracker. type PlanRejectionTracker struct { // Enabled controls if the plan rejection tracker is active or not. - Enabled bool `hcl:"enabled"` + Enabled *bool `hcl:"enabled"` // NodeThreshold is the number of times a node can have plan rejections // before it is marked as ineligible. @@ -582,8 +582,8 @@ func (p *PlanRejectionTracker) Merge(b *PlanRejectionTracker) *PlanRejectionTrac return &result } - if b.Enabled { - result.Enabled = true + if b.Enabled != nil { + result.Enabled = b.Enabled } if b.NodeThreshold != 0 { @@ -1037,6 +1037,7 @@ func DefaultConfig() *Config { RaftProtocol: 3, StartJoin: []string{}, PlanRejectionTracker: &PlanRejectionTracker{ + Enabled: helper.BoolToPtr(false), NodeThreshold: 100, NodeWindow: 5 * time.Minute, }, diff --git a/command/agent/config_parse_test.go b/command/agent/config_parse_test.go index c203178a49c..5000aef76b2 100644 --- a/command/agent/config_parse_test.go +++ b/command/agent/config_parse_test.go @@ -127,7 +127,7 @@ var basicConfig = &Config{ EnableEventBroker: helper.BoolToPtr(false), EventBufferSize: helper.IntToPtr(200), PlanRejectionTracker: &PlanRejectionTracker{ - Enabled: true, + Enabled: helper.BoolToPtr(true), NodeThreshold: 100, NodeWindow: 41 * time.Minute, NodeWindowHCL: "41m", diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 6710ac5c03f..2e98438f382 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -149,7 +149,7 @@ func TestConfig_Merge(t *testing.T) { EnableEventBroker: helper.BoolToPtr(false), EventBufferSize: helper.IntToPtr(0), PlanRejectionTracker: &PlanRejectionTracker{ - Enabled: true, + Enabled: helper.BoolToPtr(true), NodeThreshold: 100, NodeWindow: 11 * time.Minute, }, @@ -349,7 +349,7 @@ func TestConfig_Merge(t *testing.T) { EnableEventBroker: helper.BoolToPtr(true), EventBufferSize: helper.IntToPtr(100), PlanRejectionTracker: &PlanRejectionTracker{ - Enabled: true, + Enabled: helper.BoolToPtr(true), NodeThreshold: 100, NodeWindow: 11 * time.Minute, }, From b243d7d011ffc031e9cf4ad08c42baf309269f8b Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Tue, 12 Jul 2022 17:50:50 -0400 Subject: [PATCH 14/14] test: fix pointer dereference --- command/agent/agent_test.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index d04ffeffd02..cd7aad9ae95 100644 --- 
a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -422,10 +422,12 @@ func TestAgent_ServerConfig_PlanRejectionTracker(t *testing.T) { require.Contains(t, err.Error(), tc.expectedErr) } else { require.NoError(t, err) - require.Equal(t, - tc.expectedConfig.Enabled, - serverConfig.NodePlanRejectionEnabled, - ) + if tc.expectedConfig.Enabled != nil { + require.Equal(t, + *tc.expectedConfig.Enabled, + serverConfig.NodePlanRejectionEnabled, + ) + } require.Equal(t, tc.expectedConfig.NodeThreshold, serverConfig.NodePlanRejectionThreshold,