diff --git a/.changelog/13421.txt b/.changelog/13421.txt new file mode 100644 index 00000000000..d37bb2d58a2 --- /dev/null +++ b/.changelog/13421.txt @@ -0,0 +1,7 @@ +```release-note:improvement +core: automatically mark clients with recurring plan rejections as ineligible +``` + +```release-note:improvement +metrics: emit `nomad.nomad.plan.rejection_tracker.node_score` metric for the number of times a node had a plan rejection within the past time window +``` diff --git a/command/agent/agent.go b/command/agent/agent.go index cdfb2135f91..e9cde06a82a 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -436,6 +436,20 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) { return nil, fmt.Errorf("deploy_query_rate_limit must be greater than 0") } + // Set plan rejection tracker configuration. + if planRejectConf := agentConfig.Server.PlanRejectionTracker; planRejectConf != nil { + if planRejectConf.Enabled != nil { + conf.NodePlanRejectionEnabled = *planRejectConf.Enabled + } + conf.NodePlanRejectionThreshold = planRejectConf.NodeThreshold + + if planRejectConf.NodeWindow == 0 { + return nil, fmt.Errorf("plan_rejection_tracker.node_window must be greater than 0") + } else { + conf.NodePlanRejectionWindow = planRejectConf.NodeWindow + } + } + // Add Enterprise license configs conf.LicenseEnv = agentConfig.Server.LicenseEnv conf.LicensePath = agentConfig.Server.LicensePath diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 6bea52f42d4..cd7aad9ae95 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -365,6 +365,82 @@ func TestAgent_ServerConfig_Limits_OK(t *testing.T) { } } +func TestAgent_ServerConfig_PlanRejectionTracker(t *testing.T) { + ci.Parallel(t) + + cases := []struct { + name string + trackerConfig *PlanRejectionTracker + expectedConfig *PlanRejectionTracker + expectedErr string + }{ + { + name: "default", + trackerConfig: nil, + expectedConfig: &PlanRejectionTracker{ + NodeThreshold: 100, + NodeWindow: 5 * time.Minute, + }, + expectedErr: "", + }, + { + name: "valid config", + trackerConfig: &PlanRejectionTracker{ + Enabled: helper.BoolToPtr(true), + NodeThreshold: 123, + NodeWindow: 17 * time.Minute, + }, + expectedConfig: &PlanRejectionTracker{ + Enabled: helper.BoolToPtr(true), + NodeThreshold: 123, + NodeWindow: 17 * time.Minute, + }, + expectedErr: "", + }, + { + name: "invalid node window", + trackerConfig: &PlanRejectionTracker{ + NodeThreshold: 123, + }, + expectedErr: "node_window must be greater than 0", + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + config := DevConfig(nil) + require.NoError(t, config.normalizeAddrs()) + + if tc.trackerConfig != nil { + config.Server.PlanRejectionTracker = tc.trackerConfig + } + + serverConfig, err := convertServerConfig(config) + + if tc.expectedErr != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tc.expectedErr) + } else { + require.NoError(t, err) + if tc.expectedConfig.Enabled != nil { + require.Equal(t, + *tc.expectedConfig.Enabled, + serverConfig.NodePlanRejectionEnabled, + ) + } + require.Equal(t, + tc.expectedConfig.NodeThreshold, + serverConfig.NodePlanRejectionThreshold, + ) + require.Equal(t, + tc.expectedConfig.NodeWindow, + serverConfig.NodePlanRejectionWindow, + ) + } + }) + } +} + func TestAgent_ServerConfig_RaftMultiplier_Ok(t *testing.T) { ci.Parallel(t) diff --git a/command/agent/config.go b/command/agent/config.go index 50e5475922e..e84472bf709 100644 --- a/command/agent/config.go +++ 
b/command/agent/config.go @@ -503,6 +503,10 @@ type ServerConfig struct { // This value is ignored. DefaultSchedulerConfig *structs.SchedulerConfiguration `hcl:"default_scheduler_config"` + // PlanRejectionTracker configures the node plan rejection tracker that + // detects potentially bad nodes. + PlanRejectionTracker *PlanRejectionTracker `hcl:"plan_rejection_tracker"` + // EnableEventBroker configures whether this server's state store // will generate events for its event stream. EnableEventBroker *bool `hcl:"enable_event_broker"` @@ -548,6 +552,53 @@ type RaftBoltConfig struct { NoFreelistSync bool `hcl:"no_freelist_sync"` } +// PlanRejectionTracker is used in servers to configure the plan rejection +// tracker. +type PlanRejectionTracker struct { + // Enabled controls if the plan rejection tracker is active or not. + Enabled *bool `hcl:"enabled"` + + // NodeThreshold is the number of times a node can have plan rejections + // before it is marked as ineligible. + NodeThreshold int `hcl:"node_threshold"` + + // NodeWindow is the time window used to track active plan rejections for + // nodes. + NodeWindow time.Duration + NodeWindowHCL string `hcl:"node_window" json:"-"` + + // ExtraKeysHCL is used by hcl to surface unexpected keys + ExtraKeysHCL []string `hcl:",unusedKeys" json:"-"` +} + +func (p *PlanRejectionTracker) Merge(b *PlanRejectionTracker) *PlanRejectionTracker { + if p == nil { + return b + } + + result := *p + + if b == nil { + return &result + } + + if b.Enabled != nil { + result.Enabled = b.Enabled + } + + if b.NodeThreshold != 0 { + result.NodeThreshold = b.NodeThreshold + } + + if b.NodeWindow != 0 { + result.NodeWindow = b.NodeWindow + } + if b.NodeWindowHCL != "" { + result.NodeWindowHCL = b.NodeWindowHCL + } + return &result +} + // Search is used in servers to configure search API options. type Search struct { // FuzzyEnabled toggles whether the FuzzySearch API is enabled. 
If not @@ -985,6 +1036,11 @@ func DefaultConfig() *Config { EventBufferSize: helper.IntToPtr(100), RaftProtocol: 3, StartJoin: []string{}, + PlanRejectionTracker: &PlanRejectionTracker{ + Enabled: helper.BoolToPtr(false), + NodeThreshold: 100, + NodeWindow: 5 * time.Minute, + }, ServerJoin: &ServerJoin{ RetryJoin: []string{}, RetryInterval: 30 * time.Second, @@ -1586,6 +1642,10 @@ func (s *ServerConfig) Merge(b *ServerConfig) *ServerConfig { result.EventBufferSize = b.EventBufferSize } + if b.PlanRejectionTracker != nil { + result.PlanRejectionTracker = result.PlanRejectionTracker.Merge(b.PlanRejectionTracker) + } + if b.DefaultSchedulerConfig != nil { c := *b.DefaultSchedulerConfig result.DefaultSchedulerConfig = &c diff --git a/command/agent/config_parse.go b/command/agent/config_parse.go index 363db9e1e33..04135f78324 100644 --- a/command/agent/config_parse.go +++ b/command/agent/config_parse.go @@ -43,9 +43,12 @@ func ParseConfigFile(path string) (*Config, error) { VaultRetry: &client.RetryConfig{}, }, }, + Server: &ServerConfig{ + PlanRejectionTracker: &PlanRejectionTracker{}, + ServerJoin: &ServerJoin{}, + }, ACL: &ACLConfig{}, Audit: &config.AuditConfig{}, - Server: &ServerConfig{ServerJoin: &ServerJoin{}}, Consul: &config.ConsulConfig{}, Autopilot: &config.AutopilotConfig{}, Telemetry: &Telemetry{}, @@ -54,7 +57,7 @@ func ParseConfigFile(path string) (*Config, error) { err = hcl.Decode(c, buf.String()) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to decode HCL file %s: %w", path, err) } // convert strings to time.Durations @@ -66,6 +69,7 @@ func ParseConfigFile(path string) (*Config, error) { {"server.heartbeat_grace", &c.Server.HeartbeatGrace, &c.Server.HeartbeatGraceHCL, nil}, {"server.min_heartbeat_ttl", &c.Server.MinHeartbeatTTL, &c.Server.MinHeartbeatTTLHCL, nil}, {"server.failover_heartbeat_ttl", &c.Server.FailoverHeartbeatTTL, &c.Server.FailoverHeartbeatTTLHCL, nil}, + {"server.plan_rejection_tracker.node_window", &c.Server.PlanRejectionTracker.NodeWindow, &c.Server.PlanRejectionTracker.NodeWindowHCL, nil}, {"server.retry_interval", &c.Server.RetryInterval, &c.Server.RetryIntervalHCL, nil}, {"server.server_join.retry_interval", &c.Server.ServerJoin.RetryInterval, &c.Server.ServerJoin.RetryIntervalHCL, nil}, {"consul.timeout", &c.Consul.Timeout, &c.Consul.TimeoutHCL, nil}, diff --git a/command/agent/config_parse_test.go b/command/agent/config_parse_test.go index d2567c7fce2..5000aef76b2 100644 --- a/command/agent/config_parse_test.go +++ b/command/agent/config_parse_test.go @@ -126,6 +126,12 @@ var basicConfig = &Config{ EncryptKey: "abc", EnableEventBroker: helper.BoolToPtr(false), EventBufferSize: helper.IntToPtr(200), + PlanRejectionTracker: &PlanRejectionTracker{ + Enabled: helper.BoolToPtr(true), + NodeThreshold: 100, + NodeWindow: 41 * time.Minute, + NodeWindowHCL: "41m", + }, ServerJoin: &ServerJoin{ RetryJoin: []string{"1.1.1.1", "2.2.2.2"}, RetryInterval: time.Duration(15) * time.Second, @@ -540,6 +546,9 @@ func (c *Config) addDefaults() { if c.Server.ServerJoin == nil { c.Server.ServerJoin = &ServerJoin{} } + if c.Server.PlanRejectionTracker == nil { + c.Server.PlanRejectionTracker = &PlanRejectionTracker{} + } } // Tests for a panic parsing json with an object of exactly @@ -617,6 +626,11 @@ var sample0 = &Config{ RetryJoin: []string{"10.0.0.101", "10.0.0.102", "10.0.0.103"}, EncryptKey: "sHck3WL6cxuhuY7Mso9BHA==", ServerJoin: &ServerJoin{}, + PlanRejectionTracker: &PlanRejectionTracker{ + NodeThreshold: 100, + NodeWindow: 31 * time.Minute, + 
NodeWindowHCL: "31m", + }, }, ACL: &ACLConfig{ Enabled: true, @@ -707,6 +721,11 @@ var sample1 = &Config{ RetryJoin: []string{"10.0.0.101", "10.0.0.102", "10.0.0.103"}, EncryptKey: "sHck3WL6cxuhuY7Mso9BHA==", ServerJoin: &ServerJoin{}, + PlanRejectionTracker: &PlanRejectionTracker{ + NodeThreshold: 100, + NodeWindow: 31 * time.Minute, + NodeWindowHCL: "31m", + }, }, ACL: &ACLConfig{ Enabled: true, diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 36e263a3134..2e98438f382 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -148,6 +148,11 @@ func TestConfig_Merge(t *testing.T) { UpgradeVersion: "foo", EnableEventBroker: helper.BoolToPtr(false), EventBufferSize: helper.IntToPtr(0), + PlanRejectionTracker: &PlanRejectionTracker{ + Enabled: helper.BoolToPtr(true), + NodeThreshold: 100, + NodeWindow: 11 * time.Minute, + }, }, ACL: &ACLConfig{ Enabled: true, @@ -343,6 +348,11 @@ func TestConfig_Merge(t *testing.T) { UpgradeVersion: "bar", EnableEventBroker: helper.BoolToPtr(true), EventBufferSize: helper.IntToPtr(100), + PlanRejectionTracker: &PlanRejectionTracker{ + Enabled: helper.BoolToPtr(true), + NodeThreshold: 100, + NodeWindow: 11 * time.Minute, + }, }, ACL: &ACLConfig{ Enabled: true, diff --git a/command/agent/testdata/basic.hcl b/command/agent/testdata/basic.hcl index 27754b95757..4c2119302e1 100644 --- a/command/agent/testdata/basic.hcl +++ b/command/agent/testdata/basic.hcl @@ -133,6 +133,12 @@ server { enable_event_broker = false event_buffer_size = 200 + plan_rejection_tracker { + enabled = true + node_threshold = 100 + node_window = "41m" + } + server_join { retry_join = ["1.1.1.1", "2.2.2.2"] retry_max = 3 diff --git a/command/agent/testdata/basic.json b/command/agent/testdata/basic.json index 406e314a960..15ba4328fa0 100644 --- a/command/agent/testdata/basic.json +++ b/command/agent/testdata/basic.json @@ -277,6 +277,11 @@ "node_gc_threshold": "12h", "non_voting_server": true, "num_schedulers": 2, + "plan_rejection_tracker": { + "enabled": true, + "node_threshold": 100, + "node_window": "41m" + }, "raft_protocol": 3, "raft_multiplier": 4, "redundancy_zone": "foo", diff --git a/command/agent/testdata/sample0.json b/command/agent/testdata/sample0.json index 1c450a32f20..a7cc60f4930 100644 --- a/command/agent/testdata/sample0.json +++ b/command/agent/testdata/sample0.json @@ -55,6 +55,10 @@ "bootstrap_expect": 3, "enabled": true, "encrypt": "sHck3WL6cxuhuY7Mso9BHA==", + "plan_rejection_tracker": { + "node_threshold": 100, + "node_window": "31m" + }, "retry_join": [ "10.0.0.101", "10.0.0.102", diff --git a/command/agent/testdata/sample1/sample0.json b/command/agent/testdata/sample1/sample0.json index 07796e913d9..a806ea9098a 100644 --- a/command/agent/testdata/sample1/sample0.json +++ b/command/agent/testdata/sample1/sample0.json @@ -20,6 +20,10 @@ "bootstrap_expect": 3, "enabled": true, "encrypt": "sHck3WL6cxuhuY7Mso9BHA==", + "plan_rejection_tracker": { + "node_threshold": 100, + "node_window": "31m" + }, "retry_join": [ "10.0.0.101", "10.0.0.102", diff --git a/nomad/config.go b/nomad/config.go index 6ea8cfd090d..78b928e9aa3 100644 --- a/nomad/config.go +++ b/nomad/config.go @@ -232,6 +232,17 @@ type Config struct { // additional delay is selected from this range randomly. EvalFailedFollowupDelayRange time.Duration + // NodePlanRejectionEnabled controls if node rejection tracker is enabled. 
+ NodePlanRejectionEnabled bool + + // NodePlanRejectionThreshold is the number of times a node must have a + // plan rejection before it is set as ineligible. + NodePlanRejectionThreshold int + + // NodePlanRejectionWindow is the time window used to track plan + // rejections for nodes. + NodePlanRejectionWindow time.Duration + // MinHeartbeatTTL is the minimum time between heartbeats. // This is used as a floor to prevent excessive updates. MinHeartbeatTTL time.Duration @@ -395,6 +406,9 @@ func DefaultConfig() *Config { MaxHeartbeatsPerSecond: 50.0, HeartbeatGrace: 10 * time.Second, FailoverHeartbeatTTL: 300 * time.Second, + NodePlanRejectionEnabled: false, + NodePlanRejectionThreshold: 15, + NodePlanRejectionWindow: 10 * time.Minute, ConsulConfig: config.DefaultConsulConfig(), VaultConfig: config.DefaultVaultConfig(), RPCHoldTimeout: 5 * time.Second, diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index 7f1fffc54f8..57bb0c0873a 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -25,20 +25,45 @@ type planner struct { // planQueue is used to manage the submitted allocation // plans that are waiting to be assessed by the leader planQueue *PlanQueue + + // badNodeTracker keeps a score for nodes that have plan rejections. + // Plan rejections are somewhat expected given Nomad's optimistic + // scheduling, but repeated rejections for the same node may indicate an + // undetected issue, so we need to track rejection history. + badNodeTracker BadNodeTracker } // newPlanner returns a new planner to be used for managing allocation plans. func newPlanner(s *Server) (*planner, error) { + log := s.logger.Named("planner") + // Create a plan queue planQueue, err := NewPlanQueue() if err != nil { return nil, err } + // Create the bad node tracker. + var badNodeTracker BadNodeTracker + if s.config.NodePlanRejectionEnabled { + config := DefaultCachedBadNodeTrackerConfig() + + config.Window = s.config.NodePlanRejectionWindow + config.Threshold = s.config.NodePlanRejectionThreshold + + badNodeTracker, err = NewCachedBadNodeTracker(log, config) + if err != nil { + return nil, err + } + } else { + badNodeTracker = &NoopBadNodeTracker{} + } + return &planner{ - Server: s, - log: s.logger.Named("planner"), - planQueue: planQueue, + Server: s, + log: log, + planQueue: planQueue, + badNodeTracker: badNodeTracker, }, nil } @@ -144,6 +169,13 @@ func (p *planner) planApply() { continue } + // Check if any of the rejected nodes should be made ineligible. 
+ for _, nodeID := range result.RejectedNodes { + if p.badNodeTracker.Add(nodeID) { + result.IneligibleNodes = append(result.IneligibleNodes, nodeID) + } + } + // Fast-path the response if there is nothing to do if result.IsNoOp() { pending.respond(result, nil) @@ -202,6 +234,8 @@ func (p *planner) snapshotMinIndex(prevPlanResultIndex, planSnapshotIndex uint64 // applyPlan is used to apply the plan result and to return the alloc index func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap *state.StateSnapshot) (raft.ApplyFuture, error) { + now := time.Now().UTC().UnixNano() + // Setup the update request req := structs.ApplyPlanResultsRequest{ AllocUpdateRequest: structs.AllocUpdateRequest{ @@ -209,11 +243,12 @@ func (p *planner) applyPlan(plan *structs.Plan, result *structs.PlanResult, snap }, Deployment: result.Deployment, DeploymentUpdates: result.DeploymentUpdates, + IneligibleNodes: result.IneligibleNodes, EvalID: plan.EvalID, + UpdatedAt: now, } preemptedJobIDs := make(map[structs.NamespacedID]struct{}) - now := time.Now().UTC().UnixNano() if ServersMeetMinimumVersion(p.Members(), MinVersionPlanNormalization, true) { // Initialize the allocs request using the new optimized log entry format. @@ -466,6 +501,7 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan // errors since we are processing in parallel. var mErr multierror.Error partialCommit := false + rejectedNodes := make(map[string]struct{}, 0) // handleResult is used to process the result of evaluateNodePlan handleResult := func(nodeID string, fit bool, reason string, err error) (cancel bool) { @@ -489,8 +525,11 @@ func evaluatePlanPlacements(pool *EvaluatePool, snap *state.StateSnapshot, plan "node_id", nodeID, "reason", reason, "eval_id", plan.EvalID, "namespace", plan.Job.Namespace) } - // Set that this is a partial commit + // Set that this is a partial commit and store the node that was + // rejected so the plan applier can detect repeated plan rejections + // for the same node. partialCommit = true + rejectedNodes[nodeID] = struct{}{} // If we require all-at-once scheduling, there is no point // to continue the evaluation, as we've already failed. @@ -595,6 +634,10 @@ OUTER: // placed but wasn't actually placed correctDeploymentCanaries(result) } + + for n := range rejectedNodes { + result.RejectedNodes = append(result.RejectedNodes, n) + } return result, mErr.ErrorOrNil() } diff --git a/nomad/plan_apply_node_tracker.go b/nomad/plan_apply_node_tracker.go new file mode 100644 index 00000000000..48783fbc477 --- /dev/null +++ b/nomad/plan_apply_node_tracker.go @@ -0,0 +1,212 @@ +package nomad + +import ( + "fmt" + "time" + + metrics "github.com/armon/go-metrics" + "github.com/hashicorp/go-hclog" + lru "github.com/hashicorp/golang-lru" + "github.com/hashicorp/nomad/helper" + "golang.org/x/time/rate" +) + +type BadNodeTracker interface { + Add(string) bool + EmitStats(time.Duration, <-chan struct{}) +} + +// NoopBadNodeTracker is a no-op implementation of bad node tracker that is +// used when tracking is disabled. +type NoopBadNodeTracker struct{} + +func (n *NoopBadNodeTracker) EmitStats(time.Duration, <-chan struct{}) {} +func (n *NoopBadNodeTracker) Add(string) bool { + return false +} + +// CachedBadNodeTracker keeps a record of nodes marked as bad by the plan +// applier in a LRU cache. +// +// It takes a time window and a threshold value. Plan rejections for a node +// will be registered with its timestamp. 
If the number of rejections within +// the time window is greater than the threshold the node is reported as bad. +// +// The tracker uses a fixed size cache that evicts old entries based on access +// frequency and recency. +type CachedBadNodeTracker struct { + logger hclog.Logger + cache *lru.TwoQueueCache + limiter *rate.Limiter + window time.Duration + threshold int +} + +type CachedBadNodeTrackerConfig struct { + CacheSize int + RateLimit float64 + BurstSize int + Window time.Duration + Threshold int +} + +func DefaultCachedBadNodeTrackerConfig() CachedBadNodeTrackerConfig { + return CachedBadNodeTrackerConfig{ + CacheSize: 50, + + // Limit marking 5 nodes per 30min as ineligible with an initial + // burst of 10 nodes. + RateLimit: 5.0 / (30 * 60), + BurstSize: 10, + + // Consider a node as bad if it is added more than 100 times in a 5min + // window period. + Window: 5 * time.Minute, + Threshold: 100, + } +} + +// NewCachedBadNodeTracker returns a new CachedBadNodeTracker. +func NewCachedBadNodeTracker(logger hclog.Logger, config CachedBadNodeTrackerConfig) (*CachedBadNodeTracker, error) { + log := logger.Named("bad_node_tracker"). + With("window", config.Window). + With("threshold", config.Threshold) + + cache, err := lru.New2Q(config.CacheSize) + if err != nil { + return nil, fmt.Errorf("failed to create new bad node tracker: %v", err) + } + + return &CachedBadNodeTracker{ + logger: log, + cache: cache, + limiter: rate.NewLimiter(rate.Limit(config.RateLimit), config.BurstSize), + window: config.Window, + threshold: config.Threshold, + }, nil +} + +// Add records a new rejection for a node and returns true if the number of +// rejections reaches the threshold. +// +// If it's the first time the node is added it will be included in the internal +// cache. If the cache is full the least recently updated or accessed node is +// evicted. +func (c *CachedBadNodeTracker) Add(nodeID string) bool { + value, ok := c.cache.Get(nodeID) + if !ok { + value = newBadNodeStats(nodeID, c.window) + c.cache.Add(nodeID, value) + } + stats := value.(*badNodeStats) + + now := time.Now() + stats.record(now) + + return c.isBad(now, stats) +} + +// EmitStats generates metrics for the bad nodes being currently tracked. Must +// be called in a goroutine. +func (c *CachedBadNodeTracker) EmitStats(period time.Duration, stopCh <-chan struct{}) { + timer, stop := helper.NewSafeTimer(period) + defer stop() + + for { + timer.Reset(period) + + select { + case <-timer.C: + c.emitStats() + case <-stopCh: + return + } + } +} + +// isBad returns true if the node has more rejections than the threshold within +// the time window. +func (c *CachedBadNodeTracker) isBad(t time.Time, stats *badNodeStats) bool { + score := stats.score(t) + logger := c.logger.With("node_id", stats.id, "score", score) + + logger.Trace("checking if node is bad") + if score >= c.threshold { + // Limit the number of nodes we report as bad to avoid mass assigning + // nodes as ineligible, but do it after Get to keep the cache entry + // fresh. 
+ if !c.limiter.Allow() { + logger.Trace("node is bad, but returning false due to rate limiting") + return false + } + return true + } + + return false +} + +func (c *CachedBadNodeTracker) emitStats() { + now := time.Now() + for _, k := range c.cache.Keys() { + value, _ := c.cache.Get(k) + stats := value.(*badNodeStats) + score := stats.score(now) + + labels := []metrics.Label{ + {Name: "node_id", Value: k.(string)}, + } + metrics.SetGaugeWithLabels([]string{"nomad", "plan", "rejection_tracker", "node_score"}, float32(score), labels) + } +} + +// badNodeStats represents a node being tracked by a BadNodeTracker. +type badNodeStats struct { + id string + history []time.Time + window time.Duration +} + +// newBadNodeStats returns an empty badNodeStats. +func newBadNodeStats(id string, window time.Duration) *badNodeStats { + return &badNodeStats{ + id: id, + window: window, + } +} + +// score returns the number of rejections within the past time window. +func (s *badNodeStats) score(t time.Time) int { + active, expired := s.countActive(t) + + // Remove expired records. + if expired > 0 { + s.history = s.history[expired:] + } + + return active +} + +// record adds a new entry to the stats history and returns the new score. +func (s *badNodeStats) record(t time.Time) { + s.history = append(s.history, t) +} + +// countActive returns the number of records that happened after the time +// window started (active) and before (expired). +func (s *badNodeStats) countActive(t time.Time) (int, int) { + windowStart := t.Add(-s.window) + + // Assume all values are expired and move back from history until we find + // a record that actually happened before the window started. + expired := len(s.history) + for ; expired > 0; expired-- { + i := expired - 1 + ts := s.history[i] + if ts.Before(windowStart) { + break + } + } + + active := len(s.history) - expired + return active, expired +} diff --git a/nomad/plan_apply_node_tracker_test.go b/nomad/plan_apply_node_tracker_test.go new file mode 100644 index 00000000000..018335c4f61 --- /dev/null +++ b/nomad/plan_apply_node_tracker_test.go @@ -0,0 +1,164 @@ +package nomad + +import ( + "fmt" + "testing" + "time" + + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/nomad/ci" + "github.com/stretchr/testify/require" +) + +func TesCachedtBadNodeTracker(t *testing.T) { + ci.Parallel(t) + + config := DefaultCachedBadNodeTrackerConfig() + config.CacheSize = 3 + + tracker, err := NewCachedBadNodeTracker(hclog.NewNullLogger(), config) + require.NoError(t, err) + + for i := 0; i < 10; i++ { + tracker.Add(fmt.Sprintf("node-%d", i+1)) + } + + require.Equal(t, config.CacheSize, tracker.cache.Len()) + + // Only track the most recent values. + expected := []string{"node-8", "node-9", "node-10"} + require.ElementsMatch(t, expected, tracker.cache.Keys()) +} + +func TestCachedBadNodeTracker_isBad(t *testing.T) { + ci.Parallel(t) + + config := DefaultCachedBadNodeTrackerConfig() + config.CacheSize = 3 + config.Threshold = 4 + + tracker, err := NewCachedBadNodeTracker(hclog.NewNullLogger(), config) + require.NoError(t, err) + + // Populate cache. 
+ tracker.Add("node-1") + + tracker.Add("node-2") + tracker.Add("node-2") + + tracker.Add("node-3") + tracker.Add("node-3") + tracker.Add("node-3") + tracker.Add("node-3") + tracker.Add("node-3") + tracker.Add("node-3") + + testCases := []struct { + name string + nodeID string + bad bool + }{ + { + name: "node-1 is not bad", + nodeID: "node-1", + bad: false, + }, + { + name: "node-3 is bad", + nodeID: "node-3", + bad: true, + }, + } + + now := time.Now() + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Read value from cached. + v, ok := tracker.cache.Get(tc.nodeID) + require.True(t, ok) + + // Check if it's bad. + stats := v.(*badNodeStats) + got := tracker.isBad(now, stats) + require.Equal(t, tc.bad, got) + }) + } + + future := time.Now().Add(2 * config.Window) + nodes := []string{"node-1", "node-2", "node-3"} + for _, n := range nodes { + t.Run(fmt.Sprintf("%s cache expires", n), func(t *testing.T) { + v, ok := tracker.cache.Get(n) + require.True(t, ok) + + stats := v.(*badNodeStats) + bad := tracker.isBad(future, stats) + require.False(t, bad) + }) + } +} + +func TesCachedtBadNodeTracker_rateLimit(t *testing.T) { + ci.Parallel(t) + + config := DefaultCachedBadNodeTrackerConfig() + config.Threshold = 3 + config.RateLimit = float64(1) // Get a new token every second. + config.BurstSize = 3 + + tracker, err := NewCachedBadNodeTracker(hclog.NewNullLogger(), config) + require.NoError(t, err) + + tracker.Add("node-1") + tracker.Add("node-1") + tracker.Add("node-1") + tracker.Add("node-1") + tracker.Add("node-1") + + v, ok := tracker.cache.Get("node-1") + require.True(t, ok) + + stats := v.(*badNodeStats) + + // Burst allows for max 3 operations. + now := time.Now() + require.True(t, tracker.isBad(now, stats)) + require.True(t, tracker.isBad(now, stats)) + require.True(t, tracker.isBad(now, stats)) + require.False(t, tracker.isBad(now, stats)) + + // Wait for a new token. + time.Sleep(time.Second) + now = time.Now() + require.True(t, tracker.isBad(now, stats)) + require.False(t, tracker.isBad(now, stats)) +} + +func TestBadNodeStats_score(t *testing.T) { + ci.Parallel(t) + + window := time.Minute + stats := newBadNodeStats("node-1", window) + + now := time.Now() + require.Equal(t, 0, stats.score(now)) + + stats.record(now) + stats.record(now) + stats.record(now) + require.Equal(t, 3, stats.score(now)) + require.Len(t, stats.history, 3) + + halfWindow := now.Add(window / 2) + stats.record(halfWindow) + require.Equal(t, 4, stats.score(halfWindow)) + require.Len(t, stats.history, 4) + + fullWindow := now.Add(window).Add(time.Second) + require.Equal(t, 1, stats.score(fullWindow)) + require.Len(t, stats.history, 1) + + afterWindow := now.Add(2 * window) + require.Equal(t, 0, stats.score(afterWindow)) + require.Len(t, stats.history, 0) +} diff --git a/nomad/server.go b/nomad/server.go index 9fb64d0a50f..8dee94967eb 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -446,6 +446,9 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntr // Emit metrics for the plan queue go s.planQueue.EmitStats(time.Second, s.shutdownCh) + // Emit metrics for the planner's bad node tracker. + go s.planner.badNodeTracker.EmitStats(time.Second, s.shutdownCh) + // Emit metrics for the blocked eval tracker. 
go s.blockedEvals.EmitStats(time.Second, s.shutdownCh) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 6c5fabdda05..41998f81057 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -34,6 +34,17 @@ const ( ) const ( + // NodeEligibilityEventPlanRejectThreshold is the message used when the node + // is set to ineligible due to multiple plan failures. + // This is a preventive measure to signal scheduler workers to not consider + // the node for future placements. + // Plan rejections for a node are expected due to the optimistic and + // concurrent nature of the scheduling process, but repeated failures for + // the same node may indicate an underlying issue not detected by Nomad. + // The plan applier keeps track of plan rejection history and will mark + // nodes as ineligible if they cross a given threshold. + NodeEligibilityEventPlanRejectThreshold = "Node marked as ineligible for scheduling due to multiple plan rejections, refer to https://www.nomadproject.io/s/port-plan-failure for more information" + // NodeRegisterEventRegistered is the message used when the node becomes // registered. NodeRegisterEventRegistered = "Node registered" @@ -359,6 +370,21 @@ func (s *StateStore) UpsertPlanResults(msgType structs.MessageType, index uint64 txn := s.db.WriteTxnMsgT(msgType, index) defer txn.Abort() + // Mark nodes as ineligible. + for _, nodeID := range results.IneligibleNodes { + s.logger.Warn("marking node as ineligible due to multiple plan rejections, refer to https://www.nomadproject.io/s/port-plan-failure for more information", "node_id", nodeID) + + nodeEvent := structs.NewNodeEvent(). + SetSubsystem(structs.NodeEventSubsystemScheduler). + SetMessage(NodeEligibilityEventPlanRejectThreshold) + + err := s.updateNodeEligibilityImpl(index, nodeID, + structs.NodeSchedulingIneligible, results.UpdatedAt, nodeEvent, txn) + if err != nil { + return err + } + } + // Upsert the newly created or updated deployment if results.Deployment != nil { if err := s.upsertDeploymentImpl(index, results.Deployment, txn); err != nil { @@ -1136,10 +1162,15 @@ func (s *StateStore) updateNodeDrainImpl(txn *txn, index uint64, nodeID string, // UpdateNodeEligibility is used to update the scheduling eligibility of a node func (s *StateStore) UpdateNodeEligibility(msgType structs.MessageType, index uint64, nodeID string, eligibility string, updatedAt int64, event *structs.NodeEvent) error { - txn := s.db.WriteTxnMsgT(msgType, index) defer txn.Abort() + if err := s.updateNodeEligibilityImpl(index, nodeID, eligibility, updatedAt, event, txn); err != nil { + return err + } + return txn.Commit() +} +func (s *StateStore) updateNodeEligibilityImpl(index uint64, nodeID string, eligibility string, updatedAt int64, event *structs.NodeEvent, txn *txn) error { // Lookup the node existing, err := txn.First("nodes", "id", nodeID) if err != nil { @@ -1176,7 +1207,7 @@ func (s *StateStore) UpdateNodeEligibility(msgType structs.MessageType, index ui return fmt.Errorf("index update failed: %v", err) } - return txn.Commit() + return nil } // UpsertNodeEvents adds the node events to the nodes, rotating events as diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index b376ebfaa14..c8bbbcd5387 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -913,6 +913,14 @@ type ApplyPlanResultsRequest struct { // PreemptionEvals is a slice of follow up evals for jobs whose allocations // have been preempted to place allocs in this plan PreemptionEvals []*Evaluation 
+ + // IneligibleNodes are nodes the plan applier has repeatedly rejected + // placements for and should therefore be considered ineligible by workers + // to avoid retrying them repeatedly. + IneligibleNodes []string + + // UpdatedAt represents server time of receiving request. + UpdatedAt int64 } // AllocUpdateRequest is used to submit changes to allocations, either @@ -1632,6 +1640,7 @@ const ( NodeEventSubsystemDriver = "Driver" NodeEventSubsystemHeartbeat = "Heartbeat" NodeEventSubsystemCluster = "Cluster" + NodeEventSubsystemScheduler = "Scheduler" NodeEventSubsystemStorage = "Storage" ) @@ -11390,6 +11399,16 @@ type PlanResult struct { // as stopped. NodePreemptions map[string][]*Allocation + // RejectedNodes are nodes the scheduler worker has rejected placements for + // and should be considered for ineligibility by the plan applier to avoid + // retrying them repeatedly. + RejectedNodes []string + + // IneligibleNodes are nodes the plan applier has repeatedly rejected + // placements for and should therefore be considered ineligible by workers + // to avoid retrying them repeatedly. + IneligibleNodes []string + // RefreshIndex is the index the worker should refresh state up to. // This allows all evictions and allocations to be materialized. // If any allocations were rejected due to stale data (node state, @@ -11403,8 +11422,9 @@ type PlanResult struct { // IsNoOp checks if this plan result would do nothing func (p *PlanResult) IsNoOp() bool { - return len(p.NodeUpdate) == 0 && len(p.NodeAllocation) == 0 && - len(p.DeploymentUpdates) == 0 && p.Deployment == nil + return len(p.IneligibleNodes) == 0 && len(p.NodeUpdate) == 0 && + len(p.NodeAllocation) == 0 && len(p.DeploymentUpdates) == 0 && + p.Deployment == nil } // FullCommit is used to check if all the allocations in a plan diff --git a/website/content/docs/configuration/server.mdx b/website/content/docs/configuration/server.mdx index 7872913a064..7ad7494dac8 100644 --- a/website/content/docs/configuration/server.mdx +++ b/website/content/docs/configuration/server.mdx @@ -156,6 +156,10 @@ server { disallow this server from making any scheduling decisions. This defaults to the number of CPU cores. +- `plan_rejection_tracker` ([PlanRejectionTracker](#plan_rejection_tracker-parameters)) - + Configuration for the plan rejection tracker that the Nomad leader uses to + track the history of plan rejections. + - `raft_boltdb` - This is a nested object that allows configuring options for Raft's BoltDB based log store. - `no_freelist_sync` - Setting this to `true` will disable syncing the BoltDB @@ -238,6 +242,30 @@ server { section for more information on the format of the string. This field is deprecated in favor of the [server_join stanza][server-join]. +### `plan_rejection_tracker` Parameters + +The leader plan rejection tracker can be adjusted to prevent evaluations from +getting stuck due to always being scheduled to a client that may have an +unexpected issue. Refer to [Monitoring Nomad][monitoring_nomad_progress] for +more details. + +- `enabled` `(bool: false)` - Specifies if plan rejections should be tracked. + +- `node_threshold` `(int: 100)` - The number of plan rejections for a node + within the `node_window` to trigger a client to be set as ineligible. + +- `node_window` `(string: "5m")` - The time window for when plan rejections for + a node should be considered. 
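For illustration only (this block is not part of the patch), a server stanza that turns the tracker on with the defaults documented above might look like the following; adjust the values to your cluster:

```hcl
server {
  enabled = true

  plan_rejection_tracker {
    enabled        = true
    node_threshold = 100
    node_window    = "5m"
  }
}
```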
+ +If you observe too many false positives (clients being marked as ineligible +even if they don't present any problem) you may want to increase +`node_threshold`. + +Or if you are noticing jobs not being scheduled due to plan rejections for the +same `node_id` and the client is not being set as ineligible you can try +increasing the `node_window` so more historical rejections are taken into +account. + ## `server` Examples ### Common Setup @@ -331,5 +359,6 @@ server { [update-scheduler-config]: /api-docs/operator/scheduler#update-scheduler-configuration 'Scheduler Config' [bootstrapping a cluster]: /docs/faq#bootstrapping [rfc4648]: https://tools.ietf.org/html/rfc4648#section-5 +[monitoring_nomad_progress]: /docs/operations/monitoring-nomad#progress [`nomad operator keygen`]: /docs/commands/operator/keygen [search]: /docs/configuration/search diff --git a/website/content/docs/operations/metrics-reference.mdx b/website/content/docs/operations/metrics-reference.mdx index edc1286a857..8c29e159215 100644 --- a/website/content/docs/operations/metrics-reference.mdx +++ b/website/content/docs/operations/metrics-reference.mdx @@ -394,6 +394,7 @@ those listed in [Key Metrics](#key-metrics) above. | `nomad.nomad.plan.apply` | Time elapsed to apply a plan | Nanoseconds | Summary | host | | `nomad.nomad.plan.evaluate` | Time elapsed to evaluate a plan | Nanoseconds | Summary | host | | `nomad.nomad.plan.node_rejected` | Number of times a node has had a plan rejected | Integer | Counter | host, node_id | +| `nomad.nomad.plan.rejection_tracker.node_score` | Number of times a node has had a plan rejected within the tracker window | Integer | Gauge | host, node_id | | `nomad.nomad.plan.queue_depth` | Count of evals in the plan queue | Integer | Gauge | host | | `nomad.nomad.plan.submit` | Time elapsed for `Plan.Submit` RPC call | Nanoseconds | Summary | host | | `nomad.nomad.plan.wait_for_index` | Time elapsed that planner waits for the raft index of the plan to be processed | Nanoseconds | Summary | host | @@ -481,5 +482,3 @@ Raft database metrics are emitted by the `raft-boltdb` library. [tagged-metrics]: /docs/telemetry/metrics#tagged-metrics [s_port_plan_failure]: /s/port-plan-failure - - diff --git a/website/content/docs/operations/monitoring-nomad.mdx b/website/content/docs/operations/monitoring-nomad.mdx index 0dccbbefd49..0c21f71969a 100644 --- a/website/content/docs/operations/monitoring-nomad.mdx +++ b/website/content/docs/operations/monitoring-nomad.mdx @@ -149,10 +149,34 @@ While it is possible for these log lines to occur infrequently due to normal cluster conditions, they should not appear repeatedly and prevent the job from eventually running (look up the evaluation ID logged to find the job). -If this log *does* appear repeatedly with the same `node_id` referenced, try +#### Plan rejection tracker + +Nomad provides a mechanism to track the history of plan rejections per client +and mark them as ineligible if the number goes above a given threshold within a +time window. This functionality can be enabled using the +[`plan_rejection_tracker`] server configuration. 
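Once the tracker is enabled, the `nomad.nomad.plan.rejection_tracker.node_score` gauge introduced by this change reports the number of rejections currently counted against each tracked node. As a rough sketch only, with a Prometheus-style sink the sample could surface as something like the line below (the host label value is invented for the example, the node ID is reused from the log line shown later, and the exact name mangling depends on the telemetry sink in use):

```
nomad_nomad_plan_rejection_tracker_node_score{host="server-1",node_id="67af2541-5e96-6f54-9095-11089d627626"} 3
```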
+ +When a node is marked as ineligible due to excessive plan rejections, the +following node event is registered: + +``` +Node marked as ineligible for scheduling due to multiple plan rejections, refer to https://www.nomadproject.io/s/port-plan-failure for more information +``` + +Along with the log line: + +``` +[WARN] nomad.state_store: marking node as ineligible due to multiple plan rejections: node_id=67af2541-5e96-6f54-9095-11089d627626 +``` + +If a client is marked as ineligible due to repeated plan rejections, try [draining] the node and shutting it down. Misconfigurations not caught by validation can cause nodes to enter this state: [#11830][gh-11830]. +If the `plan for node rejected` log *does* appear repeatedly with the same +`node_id` referenced but the client is not being set as ineligible you can try +adjusting the [`plan_rejection_tracker`] configuration of servers. + ### Performance The following metrics allow observing changes in throughput at the various @@ -278,6 +302,7 @@ latency and packet loss for the [Serf] address. [metric-types]: /docs/telemetry/metrics#metric-types [metrics-api-endpoint]: /api-docs/metrics [prometheus-telem]: /docs/configuration/telemetry#prometheus +[`plan_rejection_tracker`]: /docs/configuration/server#plan_rejection_tracker [serf]: /docs/configuration#serf-1 [statsd-exporter]: https://github.com/prometheus/statsd_exporter [statsd-telem]: /docs/configuration/telemetry#statsd