From 4b170e1805bf4b416af51e058fbceb1f04bd55c0 Mon Sep 17 00:00:00 2001 From: James Rasell Date: Fri, 1 Jul 2022 14:37:03 +0100 Subject: [PATCH 01/11] core: allow pause/un-pause of eval broker on region leader. --- helper/broker/notify.go | 106 ++++++++++++++++++++++++++++++++ helper/broker/notify_test.go | 55 +++++++++++++++++ nomad/eval_broker.go | 12 +++- nomad/eval_endpoint.go | 17 +++++ nomad/eval_endpoint_test.go | 27 ++++++++ nomad/leader.go | 97 ++++++++++++++++++++++++----- nomad/leader_test.go | 86 ++++++++++++++++++++++++++ nomad/operator_endpoint.go | 12 ++++ nomad/operator_endpoint_test.go | 29 +++++---- nomad/server.go | 20 ++++-- nomad/structs/operator.go | 6 ++ 11 files changed, 434 insertions(+), 33 deletions(-) create mode 100644 helper/broker/notify.go create mode 100644 helper/broker/notify_test.go diff --git a/helper/broker/notify.go b/helper/broker/notify.go new file mode 100644 index 00000000000..24320dce45f --- /dev/null +++ b/helper/broker/notify.go @@ -0,0 +1,106 @@ +package broker + +import ( + "time" + + "github.com/hashicorp/nomad/helper" +) + +// GenericNotifier allows a process to send updates to many subscribers in an +// easy manner. +type GenericNotifier struct { + + // publishCh is the channel used to receive the update which will be sent + // to all subscribers. + publishCh chan interface{} + + // subscribeCh and unsubscribeCh are the channels used to modify the + // subscription membership mapping. + subscribeCh chan chan interface{} + unsubscribeCh chan chan interface{} +} + +// NewGenericNotifier returns a generic notifier which can be used by a process +// to notify many subscribers when a specific update is triggered. +func NewGenericNotifier() *GenericNotifier { + return &GenericNotifier{ + publishCh: make(chan interface{}, 1), + subscribeCh: make(chan chan interface{}, 1), + unsubscribeCh: make(chan chan interface{}, 1), + } +} + +// Notify allows the implementer to notify all subscribers with a specific +// update. There is no guarantee the order in which subscribers receive the +// message which is sent linearly. +func (g *GenericNotifier) Notify(msg interface{}) { + select { + case g.publishCh <- msg: + default: + } +} + +// Run is a long-lived process which handles updating subscribers as well as +// ensuring any update is sent to them. The passed stopCh is used to coordinate +// shutdown. +func (g *GenericNotifier) Run(stopCh <-chan struct{}) { + + // Store our subscribers inline with a map. This map can only be accessed + // via a single channel update at a time, meaning we can manage without + // using a lock. + subscribers := map[chan interface{}]struct{}{} + + for { + select { + case <-stopCh: + return + case msgCh := <-g.subscribeCh: + subscribers[msgCh] = struct{}{} + case msgCh := <-g.unsubscribeCh: + delete(subscribers, msgCh) + case update := <-g.publishCh: + for subscriberCh := range subscribers { + + // The subscribers channels are buffered, but ensure we don't + // block the whole process on this. + select { + case subscriberCh <- update: + default: + } + } + } + } +} + +// WaitForChange allows a subscriber to wait until there is a notification +// change, or the timeout is reached. The function will block until one +// condition is met. +func (g *GenericNotifier) WaitForChange(timeout time.Duration) interface{} { + + // Create a channel and subscribe to any update. This channel is buffered + // to ensure we do not block the main broker process. + updateCh := make(chan interface{}, 1) + g.subscribeCh <- updateCh + + // Create a timeout timer and use the helper to ensure this routine doesn't + // panic and making the stop call clear. + timeoutTimer, timeoutStop := helper.NewSafeTimer(timeout) + + // Defer a function which performs all the required cleanup of the + // subscriber once it has been notified of a change, or reached its wait + // timeout. + defer func() { + g.unsubscribeCh <- updateCh + close(updateCh) + timeoutStop() + }() + + // Enter the main loop which listens for an update or timeout and returns + // this information to the subscriber. + select { + case <-timeoutTimer.C: + return "wait timed out after " + timeout.String() + case update := <-updateCh: + return update + } +} diff --git a/helper/broker/notify_test.go b/helper/broker/notify_test.go new file mode 100644 index 00000000000..51943987e3f --- /dev/null +++ b/helper/broker/notify_test.go @@ -0,0 +1,55 @@ +package broker + +import ( + "sync" + "testing" + "time" + + "github.com/hashicorp/nomad/ci" + "github.com/stretchr/testify/require" +) + +func TestGenericNotifier(t *testing.T) { + ci.Parallel(t) + + // Create the new notifier. + stopChan := make(chan struct{}) + defer close(stopChan) + + notifier := NewGenericNotifier() + go notifier.Run(stopChan) + + // Ensure we have buffered channels. + require.Equal(t, 1, cap(notifier.publishCh)) + require.Equal(t, 1, cap(notifier.subscribeCh)) + require.Equal(t, 1, cap(notifier.unsubscribeCh)) + + // Test that the timeout works. + var timeoutWG sync.WaitGroup + + for i := 0; i < 6; i++ { + go func(wg *sync.WaitGroup) { + wg.Add(1) + msg := notifier.WaitForChange(100 * time.Millisecond) + require.Equal(t, "wait timed out after 100ms", msg) + wg.Done() + }(&timeoutWG) + } + timeoutWG.Wait() + + // Test that all subscribers recieve an update when a single notification + // is sent. + var notifiedWG sync.WaitGroup + + for i := 0; i < 6; i++ { + go func(wg *sync.WaitGroup) { + wg.Add(1) + msg := notifier.WaitForChange(3 * time.Second) + require.Equal(t, "we got an update and not a timeout", msg) + wg.Done() + }(¬ifiedWG) + } + + notifier.Notify("we got an update and not a timeout") + notifiedWG.Wait() +} diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index 9aa7542dcfe..b0b0b87d547 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -6,11 +6,13 @@ import ( "errors" "fmt" "math/rand" + "strconv" "sync" "time" metrics "github.com/armon/go-metrics" "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/helper/broker" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/lib/delayheap" "github.com/hashicorp/nomad/nomad/structs" @@ -48,8 +50,10 @@ type EvalBroker struct { nackTimeout time.Duration deliveryLimit int - enabled bool - stats *BrokerStats + enabled bool + enabledNotifier *broker.GenericNotifier + + stats *BrokerStats // evals tracks queued evaluations by ID to de-duplicate enqueue. // The counter is the number of times we've attempted delivery, @@ -131,6 +135,7 @@ func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration, nackTimeout: timeout, deliveryLimit: deliveryLimit, enabled: false, + enabledNotifier: broker.NewGenericNotifier(), stats: new(BrokerStats), evals: make(map[string]int), jobEvals: make(map[structs.NamespacedID]string), @@ -176,6 +181,9 @@ func (b *EvalBroker) SetEnabled(enabled bool) { if !enabled { b.flush() } + + // Notify all subscribers to state changes of the broker enabled value. + b.enabledNotifier.Notify("eval broker enabled status changed to " + strconv.FormatBool(enabled)) } // Enqueue is used to enqueue a new evaluation diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index 2d6af727c32..43b4d8081e0 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -131,6 +131,23 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest, args.Timeout = DefaultDequeueTimeout } + // If the eval broker is paused, attempt to block and wait for a state + // change before returning. This avoids a tight loop and mimics the + // behaviour where there are no evals to process. + // + // The call can return because either the timeout is reached or the broker + // SetEnabled function was called to modify its state. It is possible this + // is because of leadership transition, therefore the RPC should exit to + // allow all safety checks and RPC forwarding to occur again. + // + // The log line is trace, because the default worker timeout is 500ms which + // produces a large amount of logging. + if !e.srv.evalBroker.Enabled() { + message := e.srv.evalBroker.enabledNotifier.WaitForChange(args.Timeout) + e.logger.Trace("eval broker wait for un-pause", "message", message) + return nil + } + // Attempt the dequeue eval, token, err := e.srv.evalBroker.Dequeue(args.Schedulers, args.Timeout) if err != nil { diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go index b23a9f2b285..1a67d0e6a47 100644 --- a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -454,6 +454,33 @@ func TestEvalEndpoint_Dequeue_Version_Mismatch(t *testing.T) { } } +func TestEvalEndpoint_Dequeue_BrokerDisabled(t *testing.T) { + ci.Parallel(t) + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue. + }) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register a request. + eval1 := mock.Eval() + s1.evalBroker.Enqueue(eval1) + + // Disable the eval broker and try to dequeue. + s1.evalBroker.SetEnabled(false) + + get := &structs.EvalDequeueRequest{ + Schedulers: defaultSched, + SchedulerVersion: scheduler.SchedulerVersion, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.EvalDequeueResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp)) + require.Empty(t, resp.Eval) +} + func TestEvalEndpoint_Ack(t *testing.T) { ci.Parallel(t) diff --git a/nomad/leader.go b/nomad/leader.go index d4b40f94380..8ad91bc4cbd 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -289,8 +289,8 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { s.getOrCreateAutopilotConfig() s.autopilot.Start() - // Initialize scheduler configuration - s.getOrCreateSchedulerConfig() + // Initialize scheduler configuration. + schedulerConfig := s.getOrCreateSchedulerConfig() // Initialize the ClusterID _, _ = s.ClusterID() @@ -302,12 +302,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Start the plan evaluator go s.planApply() - // Enable the eval broker, since we are now the leader - s.evalBroker.SetEnabled(true) - - // Enable the blocked eval tracker, since we are now the leader - s.blockedEvals.SetEnabled(true) - s.blockedEvals.SetTimetable(s.fsm.TimeTable()) + // Start the eval broker and blocked eval broker if these are not paused by + // the operator. + restoreEvals := s.handleEvalBrokerStateChange(schedulerConfig) // Enable the deployment watcher, since we are now the leader s.deploymentWatcher.SetEnabled(true, s.State()) @@ -318,9 +315,12 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Enable the volume watcher, since we are now the leader s.volumeWatcher.SetEnabled(true, s.State(), s.getLeaderAcl()) - // Restore the eval broker state - if err := s.restoreEvals(); err != nil { - return err + // Restore the eval broker state and blocked eval state. If these are + // currently paused, we do not need to do this. + if restoreEvals { + if err := s.restoreEvals(); err != nil { + return err + } } // Activate the vault client @@ -1110,11 +1110,13 @@ func (s *Server) revokeLeadership() error { // Disable the plan queue, since we are no longer leader s.planQueue.SetEnabled(false) - // Disable the eval broker, since it is only useful as a leader + // Disable the eval broker and blocked evals. We do not need to check the + // scheduler configuration paused eval broker value, as the brokers should + // always be paused on the non-leader. + s.brokerLock.Lock() s.evalBroker.SetEnabled(false) - - // Disable the blocked eval tracker, since it is only useful as a leader s.blockedEvals.SetEnabled(false) + s.brokerLock.Unlock() // Disable the periodic dispatcher, since it is only useful as a leader s.periodicDispatcher.SetEnabled(false) @@ -1693,3 +1695,70 @@ func (s *Server) generateClusterID() (string, error) { s.logger.Named("core").Info("established cluster id", "cluster_id", newMeta.ClusterID, "create_time", newMeta.CreateTime) return newMeta.ClusterID, nil } + +// handleEvalBrokerStateChange handles changing the evalBroker and blockedEvals +// enabled status based on the passed scheduler configuration. The boolean +// response indicates whether the caller needs to call restoreEvals() due to +// the brokers being enabled. It is for use when the change must take the +// scheduler configuration into account. This is not needed when calling +// revokeLeadership, as the configuration doesn't matter, and we need to ensure +// the brokers are stopped. +// +// The function checks the server is the leader and uses a mutex to avoid any +// potential timings problems. Consider the following timings: +// - operator updates the configuration via the API +// - the RPC handler applies the change via Raft +// - leadership transitions with write barrier +// - the RPC handler call this function to enact the change +// +// The mutex also protects against a situation where leadership is revoked +// while this function is being called. Ensuring the correct series of actions +// occurs so that state stays consistent. +func (s *Server) handleEvalBrokerStateChange(schedConfig *structs.SchedulerConfiguration) bool { + + // Grab the lock first. Once we have this we can be sure to run everything + // needed before any leader transition can attempt to modify the state. + s.brokerLock.Lock() + defer s.brokerLock.Unlock() + + // If we are no longer the leader, exit early. + if !s.IsLeader() { + return false + } + + // enableEvalBroker tracks whether the evalBroker and blockedEvals + // processes should be enabled or not. It allows us to answer this question + // whether using a persisted Raft configuration, or the default bootstrap + // config. + var enableBrokers, restoreEvals bool + + // The scheduler config can only be persisted to Raft once quorum has been + // established. If this is a fresh cluster, we need to use the default + // scheduler config, otherwise we can use the persisted object. + switch schedConfig { + case nil: + enableBrokers = !s.config.DefaultSchedulerConfig.PauseEvalBroker + default: + enableBrokers = !schedConfig.PauseEvalBroker + } + + // If the evalBroker status is changing, set the new state. + if enableBrokers != s.evalBroker.Enabled() { + s.logger.Info("eval broker status modified", "paused", !enableBrokers) + s.evalBroker.SetEnabled(enableBrokers) + restoreEvals = enableBrokers + } + + // If the blockedEvals status is changing, set the new state. + if enableBrokers != s.blockedEvals.Enabled() { + s.logger.Info("blocked evals status modified", "paused", !enableBrokers) + s.blockedEvals.SetEnabled(enableBrokers) + restoreEvals = enableBrokers + + if enableBrokers { + s.blockedEvals.SetTimetable(s.fsm.TimeTable()) + } + } + + return restoreEvals +} diff --git a/nomad/leader_test.go b/nomad/leader_test.go index c6d92160d3c..62864d115d0 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -1664,3 +1664,89 @@ func waitForStableLeadership(t *testing.T, servers []*Server) *Server { return leader } + +func TestServer_handleEvalBrokerStateChange(t *testing.T) { + ci.Parallel(t) + + testCases := []struct { + startValue bool + testServerCallBackConfig func(c *Config) + inputSchedulerConfiguration *structs.SchedulerConfiguration + expectedOutput bool + name string + }{ + { + startValue: false, + testServerCallBackConfig: func(c *Config) { c.DefaultSchedulerConfig.PauseEvalBroker = false }, + inputSchedulerConfiguration: nil, + expectedOutput: true, + name: "bootstrap un-paused", + }, + { + startValue: false, + testServerCallBackConfig: func(c *Config) { c.DefaultSchedulerConfig.PauseEvalBroker = true }, + inputSchedulerConfiguration: nil, + expectedOutput: false, + name: "bootstrap paused", + }, + { + startValue: true, + testServerCallBackConfig: nil, + inputSchedulerConfiguration: &structs.SchedulerConfiguration{PauseEvalBroker: true}, + expectedOutput: false, + name: "state change to paused", + }, + { + startValue: false, + testServerCallBackConfig: nil, + inputSchedulerConfiguration: &structs.SchedulerConfiguration{PauseEvalBroker: true}, + expectedOutput: false, + name: "no state change to paused", + }, + { + startValue: false, + testServerCallBackConfig: nil, + inputSchedulerConfiguration: &structs.SchedulerConfiguration{PauseEvalBroker: false}, + expectedOutput: true, + name: "state change to un-paused", + }, + { + startValue: false, + testServerCallBackConfig: nil, + inputSchedulerConfiguration: &structs.SchedulerConfiguration{PauseEvalBroker: true}, + expectedOutput: false, + name: "no state change to un-paused", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + + // Create a new server and wait for leadership to be established. + testServer, cleanupFn := TestServer(t, nil) + _ = waitForStableLeadership(t, []*Server{testServer}) + defer cleanupFn() + + // If we set a callback config, we are just testing the eventual + // state of the brokers. Otherwise, we set our starting value and + // then perform our state modification change and check. + if tc.testServerCallBackConfig == nil { + testServer.evalBroker.SetEnabled(tc.startValue) + testServer.blockedEvals.SetEnabled(tc.startValue) + actualOutput := testServer.handleEvalBrokerStateChange(tc.inputSchedulerConfiguration) + require.Equal(t, tc.expectedOutput, actualOutput) + } + + // Check the brokers are in the expected state. + var expectedEnabledVal bool + + if tc.inputSchedulerConfiguration == nil { + expectedEnabledVal = !testServer.config.DefaultSchedulerConfig.PauseEvalBroker + } else { + expectedEnabledVal = !tc.inputSchedulerConfiguration.PauseEvalBroker + } + require.Equal(t, expectedEnabledVal, testServer.evalBroker.Enabled()) + require.Equal(t, expectedEnabledVal, testServer.blockedEvals.Enabled()) + }) + } +} diff --git a/nomad/operator_endpoint.go b/nomad/operator_endpoint.go index aab11c539ae..40c27669eac 100644 --- a/nomad/operator_endpoint.go +++ b/nomad/operator_endpoint.go @@ -334,7 +334,19 @@ func (op *Operator) SchedulerSetConfiguration(args *structs.SchedulerSetConfigRe if respBool, ok := resp.(bool); ok { reply.Updated = respBool } + reply.Index = index + + // If we updated the configuration, handle any required state changes within + // the eval broker and blocked evals processes. The state change and + // restore functions have protections around leadership transitions and + // restoring into non-running brokers. + if reply.Updated { + if op.srv.handleEvalBrokerStateChange(&args.Config) { + return op.srv.restoreEvals() + } + } + return nil } diff --git a/nomad/operator_endpoint_test.go b/nomad/operator_endpoint_test.go index 1cac9b8029f..e96493e416d 100644 --- a/nomad/operator_endpoint_test.go +++ b/nomad/operator_endpoint_test.go @@ -402,39 +402,42 @@ func TestOperator_SchedulerSetConfiguration(t *testing.T) { c.Build = "0.9.0+unittest" }) defer cleanupS1() - codec := rpcClient(t, s1) + rpcCodec := rpcClient(t, s1) testutil.WaitForLeader(t, s1.RPC) - require := require.New(t) - - // Disable preemption + // Disable preemption and pause the eval broker. arg := structs.SchedulerSetConfigRequest{ Config: structs.SchedulerConfiguration{ PreemptionConfig: structs.PreemptionConfig{ SystemSchedulerEnabled: false, }, + PauseEvalBroker: true, }, } arg.Region = s1.config.Region var setResponse structs.SchedulerSetConfigurationResponse - err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerSetConfiguration", &arg, &setResponse) - require.Nil(err) - require.NotZero(setResponse.Index) + err := msgpackrpc.CallWithCodec(rpcCodec, "Operator.SchedulerSetConfiguration", &arg, &setResponse) + require.Nil(t, err) + require.NotZero(t, setResponse.Index) - // Read and verify that preemption is disabled + // Read and verify that preemption is disabled and the eval and blocked + // evals systems are disabled. readConfig := structs.GenericRequest{ QueryOptions: structs.QueryOptions{ Region: s1.config.Region, }, } var reply structs.SchedulerConfigurationResponse - if err := msgpackrpc.CallWithCodec(codec, "Operator.SchedulerGetConfiguration", &readConfig, &reply); err != nil { - t.Fatalf("err: %v", err) - } + err = msgpackrpc.CallWithCodec(rpcCodec, "Operator.SchedulerGetConfiguration", &readConfig, &reply) + require.NoError(t, err) - require.NotZero(reply.Index) - require.False(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled) + require.NotZero(t, reply.Index) + require.False(t, reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled) + require.True(t, reply.SchedulerConfig.PauseEvalBroker) + + require.False(t, s1.evalBroker.Enabled()) + require.False(t, s1.blockedEvals.Enabled()) } func TestOperator_SchedulerGetConfiguration_ACL(t *testing.T) { diff --git a/nomad/server.go b/nomad/server.go index 9fb64d0a50f..3c5351addb0 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -190,6 +190,18 @@ type Server struct { // capacity changes. blockedEvals *BlockedEvals + // evalBroker is used to manage the in-progress evaluations + // that are waiting to be brokered to a sub-scheduler + evalBroker *EvalBroker + + // brokerLock is used to synchronise the alteration of the blockedEvals and + // evalBroker enabled state. These two subsystems change state when + // leadership changes or when the user modifies the setting via the + // operator scheduler configuration. This lock allows these actions to be + // performed safely, without potential for user interactions and leadership + // transitions to collide and create inconsistent state. + brokerLock sync.Mutex + // deploymentWatcher is used to watch deployments and their allocations and // make the required calls to continue to transition the deployment. deploymentWatcher *deploymentwatcher.Watcher @@ -200,10 +212,6 @@ type Server struct { // volumeWatcher is used to release volume claims volumeWatcher *volumewatcher.Watcher - // evalBroker is used to manage the in-progress evaluations - // that are waiting to be brokered to a sub-scheduler - evalBroker *EvalBroker - // periodicDispatcher is used to track and create evaluations for periodic jobs. periodicDispatcher *PeriodicDispatch @@ -423,6 +431,10 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, consulConfigEntr return nil, fmt.Errorf("failed to create volume watcher: %v", err) } + // Start the eval broker notification system so any subscribers can get + // updates when the processes SetEnabled is triggered. + go s.evalBroker.enabledNotifier.Run(s.shutdownCh) + // Setup the node drainer. s.setupNodeDrainer() diff --git a/nomad/structs/operator.go b/nomad/structs/operator.go index 633afa6c33e..28452a312f5 100644 --- a/nomad/structs/operator.go +++ b/nomad/structs/operator.go @@ -156,6 +156,12 @@ type SchedulerConfiguration struct { // management ACL token RejectJobRegistration bool `hcl:"reject_job_registration"` + // PauseEvalBroker is a boolean to control whether the evaluation broker + // should be paused on the cluster leader. Only a single broker runs per + // region, and it must be persisted to state so the parameter is consistent + // during leadership transitions. + PauseEvalBroker bool `hcl:"pause_eval_broker"` + // CreateIndex/ModifyIndex store the create/modify indexes of this configuration. CreateIndex uint64 ModifyIndex uint64 From 9292cdc3478cfd89f82c0f74ea028e01a8e46464 Mon Sep 17 00:00:00 2001 From: James Rasell Date: Fri, 1 Jul 2022 14:37:24 +0100 Subject: [PATCH 02/11] agent: add ability to pause eval broker via scheduler config. --- command/agent/operator_endpoint.go | 1 + command/agent/operator_endpoint_test.go | 41 +++++++++++++------------ 2 files changed, 22 insertions(+), 20 deletions(-) diff --git a/command/agent/operator_endpoint.go b/command/agent/operator_endpoint.go index a0f7575f4c6..224ddabe27e 100644 --- a/command/agent/operator_endpoint.go +++ b/command/agent/operator_endpoint.go @@ -262,6 +262,7 @@ func (s *HTTPServer) schedulerUpdateConfig(resp http.ResponseWriter, req *http.R SchedulerAlgorithm: structs.SchedulerAlgorithm(conf.SchedulerAlgorithm), MemoryOversubscriptionEnabled: conf.MemoryOversubscriptionEnabled, RejectJobRegistration: conf.RejectJobRegistration, + PauseEvalBroker: conf.PauseEvalBroker, PreemptionConfig: structs.PreemptionConfig{ SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled, SysBatchSchedulerEnabled: conf.PreemptionConfig.SysBatchSchedulerEnabled, diff --git a/command/agent/operator_endpoint_test.go b/command/agent/operator_endpoint_test.go index 1e090c82dfb..b4166d73006 100644 --- a/command/agent/operator_endpoint_test.go +++ b/command/agent/operator_endpoint_test.go @@ -272,32 +272,32 @@ func TestOperator_ServerHealth_Unhealthy(t *testing.T) { func TestOperator_SchedulerGetConfiguration(t *testing.T) { ci.Parallel(t) httpTest(t, nil, func(s *TestAgent) { - require := require.New(t) body := bytes.NewBuffer(nil) req, _ := http.NewRequest("GET", "/v1/operator/scheduler/configuration", body) resp := httptest.NewRecorder() obj, err := s.Server.OperatorSchedulerConfiguration(resp, req) - require.Nil(err) - require.Equal(200, resp.Code) + require.Nil(t, err) + require.Equal(t, 200, resp.Code) out, ok := obj.(structs.SchedulerConfigurationResponse) - require.True(ok) + require.True(t, ok) // Only system jobs can preempt other jobs by default. - require.True(out.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled) - require.False(out.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled) - require.False(out.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled) - require.False(out.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled) - require.False(out.SchedulerConfig.MemoryOversubscriptionEnabled) + require.True(t, out.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled) + require.False(t, out.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled) + require.False(t, out.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled) + require.False(t, out.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled) + require.False(t, out.SchedulerConfig.MemoryOversubscriptionEnabled) + require.False(t, out.SchedulerConfig.PauseEvalBroker) }) } func TestOperator_SchedulerSetConfiguration(t *testing.T) { ci.Parallel(t) httpTest(t, nil, func(s *TestAgent) { - require := require.New(t) body := bytes.NewBuffer([]byte(` { "MemoryOversubscriptionEnabled": true, + "PauseEvalBroker": true, "PreemptionConfig": { "SystemSchedulerEnabled": true, "ServiceSchedulerEnabled": true @@ -306,11 +306,11 @@ func TestOperator_SchedulerSetConfiguration(t *testing.T) { req, _ := http.NewRequest("PUT", "/v1/operator/scheduler/configuration", body) resp := httptest.NewRecorder() setResp, err := s.Server.OperatorSchedulerConfiguration(resp, req) - require.Nil(err) - require.Equal(200, resp.Code) + require.Nil(t, err) + require.Equal(t, 200, resp.Code) schedSetResp, ok := setResp.(structs.SchedulerSetConfigurationResponse) - require.True(ok) - require.NotZero(schedSetResp.Index) + require.True(t, ok) + require.NotZero(t, schedSetResp.Index) args := structs.GenericRequest{ QueryOptions: structs.QueryOptions{ @@ -320,12 +320,13 @@ func TestOperator_SchedulerSetConfiguration(t *testing.T) { var reply structs.SchedulerConfigurationResponse err = s.RPC("Operator.SchedulerGetConfiguration", &args, &reply) - require.Nil(err) - require.True(reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled) - require.False(reply.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled) - require.False(reply.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled) - require.True(reply.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled) - require.True(reply.SchedulerConfig.MemoryOversubscriptionEnabled) + require.Nil(t, err) + require.True(t, reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled) + require.False(t, reply.SchedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled) + require.False(t, reply.SchedulerConfig.PreemptionConfig.BatchSchedulerEnabled) + require.True(t, reply.SchedulerConfig.PreemptionConfig.ServiceSchedulerEnabled) + require.True(t, reply.SchedulerConfig.MemoryOversubscriptionEnabled) + require.True(t, reply.SchedulerConfig.PauseEvalBroker) }) } From 169501272d0e46e9b3142b418b9d0e0ebeac5545 Mon Sep 17 00:00:00 2001 From: James Rasell Date: Fri, 1 Jul 2022 14:37:40 +0100 Subject: [PATCH 03/11] cli: add operator scheduler commands to interact with config. --- command/commands.go | 16 +- command/monitor.go | 6 +- command/operator_scheduler.go | 42 ++++ command/operator_scheduler_get_config.go | 118 ++++++++++ command/operator_scheduler_get_config_test.go | 47 ++++ command/operator_scheduler_set_config.go | 210 ++++++++++++++++++ command/operator_scheduler_set_config_test.go | 108 +++++++++ 7 files changed, 543 insertions(+), 4 deletions(-) create mode 100644 command/operator_scheduler.go create mode 100644 command/operator_scheduler_get_config.go create mode 100644 command/operator_scheduler_get_config_test.go create mode 100644 command/operator_scheduler_set_config.go create mode 100644 command/operator_scheduler_set_config_test.go diff --git a/command/commands.go b/command/commands.go index 3e697a66adf..9bdefe891cf 100644 --- a/command/commands.go +++ b/command/commands.go @@ -571,7 +571,21 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory { Meta: meta, }, nil }, - + "operator scheduler": func() (cli.Command, error) { + return &OperatorSchedulerCommand{ + Meta: meta, + }, nil + }, + "operator scheduler get-config": func() (cli.Command, error) { + return &OperatorSchedulerGetConfig{ + Meta: meta, + }, nil + }, + "operator scheduler set-config": func() (cli.Command, error) { + return &OperatorSchedulerSetConfig{ + Meta: meta, + }, nil + }, "operator snapshot": func() (cli.Command, error) { return &OperatorSnapshotCommand{ Meta: meta, diff --git a/command/monitor.go b/command/monitor.go index 631e2671755..2214523fde8 100644 --- a/command/monitor.go +++ b/command/monitor.go @@ -186,6 +186,9 @@ func (m *monitor) monitor(evalID string) int { // Add the initial pending state m.update(newEvalState()) + m.ui.Info(fmt.Sprintf("%s: Monitoring evaluation %q", + formatTime(time.Now()), limit(evalID, m.length))) + for { // Query the evaluation eval, _, err := m.client.Evaluations().Info(evalID, nil) @@ -194,9 +197,6 @@ func (m *monitor) monitor(evalID string) int { return 1 } - m.ui.Info(fmt.Sprintf("%s: Monitoring evaluation %q", - formatTime(time.Now()), limit(eval.ID, m.length))) - // Create the new eval state. state := newEvalState() state.status = eval.Status diff --git a/command/operator_scheduler.go b/command/operator_scheduler.go new file mode 100644 index 00000000000..3a555237438 --- /dev/null +++ b/command/operator_scheduler.go @@ -0,0 +1,42 @@ +package command + +import ( + "strings" + + "github.com/mitchellh/cli" +) + +// Ensure OperatorSchedulerCommand satisfies the cli.Command interface. +var _ cli.Command = &OperatorSchedulerCommand{} + +type OperatorSchedulerCommand struct { + Meta +} + +func (o *OperatorSchedulerCommand) Help() string { + helpText := ` +Usage: nomad operator scheduler [options] + + This command groups subcommands for interacting with Nomad's scheduler + subsystem. + + Get the scheduler configuration: + + $ nomad operator scheduler get-config + + Set the scheduler to use the spread algorithm: + + $ nomad operator scheduler set-config -scheduler-algorithm=spread + + Please see the individual subcommand help for detailed usage information. +` + return strings.TrimSpace(helpText) +} + +func (o *OperatorSchedulerCommand) Synopsis() string { + return "Provides access to the scheduler configuration" +} + +func (o *OperatorSchedulerCommand) Name() string { return "operator scheduler" } + +func (o *OperatorSchedulerCommand) Run(_ []string) int { return cli.RunResultHelp } diff --git a/command/operator_scheduler_get_config.go b/command/operator_scheduler_get_config.go new file mode 100644 index 00000000000..cc286729a58 --- /dev/null +++ b/command/operator_scheduler_get_config.go @@ -0,0 +1,118 @@ +package command + +import ( + "fmt" + "strings" + + "github.com/mitchellh/cli" + "github.com/posener/complete" +) + +// Ensure OperatorSchedulerGetConfig satisfies the cli.Command interface. +var _ cli.Command = &OperatorSchedulerGetConfig{} + +type OperatorSchedulerGetConfig struct { + Meta + + json bool + tmpl string +} + +func (o *OperatorSchedulerGetConfig) AutocompleteFlags() complete.Flags { + return mergeAutocompleteFlags(o.Meta.AutocompleteFlags(FlagSetClient), + complete.Flags{ + "-json": complete.PredictNothing, + "-t": complete.PredictAnything, + }, + ) +} + +func (o *OperatorSchedulerGetConfig) AutocompleteArgs() complete.Predictor { + return complete.PredictNothing +} + +func (o *OperatorSchedulerGetConfig) Name() string { return "operator scheduler get-config" } + +func (o *OperatorSchedulerGetConfig) Run(args []string) int { + + flags := o.Meta.FlagSet("get-config", FlagSetClient) + flags.BoolVar(&o.json, "json", false, "") + flags.StringVar(&o.tmpl, "t", "", "") + flags.Usage = func() { o.Ui.Output(o.Help()) } + + if err := flags.Parse(args); err != nil { + return 1 + } + + // Set up a client. + client, err := o.Meta.Client() + if err != nil { + o.Ui.Error(fmt.Sprintf("Error initializing client: %s", err)) + return 1 + } + + // Fetch the current configuration. + resp, _, err := client.Operator().SchedulerGetConfiguration(nil) + if err != nil { + o.Ui.Error(fmt.Sprintf("Error querying scheduler configuration: %s", err)) + return 1 + } + + // If the user has specified to output the scheduler config as JSON or + // using a template, perform this action for the entire object and exit the + // command. + if o.json || len(o.tmpl) > 0 { + out, err := Format(o.json, o.tmpl, resp) + if err != nil { + o.Ui.Error(err.Error()) + return 1 + } + o.Ui.Output(out) + return 0 + } + + schedConfig := resp.SchedulerConfig + + // Output the information. + o.Ui.Output(formatKV([]string{ + fmt.Sprintf("Scheduler Algorithm|%s", schedConfig.SchedulerAlgorithm), + fmt.Sprintf("Memory Oversubscription|%v", schedConfig.MemoryOversubscriptionEnabled), + fmt.Sprintf("Reject Job Registration|%v", schedConfig.RejectJobRegistration), + fmt.Sprintf("Pause Eval Broker|%v", schedConfig.PauseEvalBroker), + fmt.Sprintf("Preemption System Scheduler|%v", schedConfig.PreemptionConfig.SystemSchedulerEnabled), + fmt.Sprintf("Preemption Service Scheduler|%v", schedConfig.PreemptionConfig.ServiceSchedulerEnabled), + fmt.Sprintf("Preemption Batch Scheduler|%v", schedConfig.PreemptionConfig.BatchSchedulerEnabled), + fmt.Sprintf("Preemption SysBatch Scheduler|%v", schedConfig.PreemptionConfig.SysBatchSchedulerEnabled), + fmt.Sprintf("Modify Index|%v", resp.SchedulerConfig.ModifyIndex), + })) + return 0 +} + +func (o *OperatorSchedulerGetConfig) Synopsis() string { + return "Display the current scheduler configuration" +} + +func (o *OperatorSchedulerGetConfig) Help() string { + helpText := ` +Usage: nomad operator scheduler get-config [options] + + Displays the current scheduler configuration. + + If ACLs are enabled, this command requires a token with the 'operator:read' + capability. + +General Options: + + ` + generalOptionsUsage(usageOptsDefault|usageOptsNoNamespace) + ` + +Scheduler Get Config Options: + + -json + Output the scheduler config in its JSON format. + + -t + Format and display the scheduler config using a Go template. +` + + return strings.TrimSpace(helpText) +} diff --git a/command/operator_scheduler_get_config_test.go b/command/operator_scheduler_get_config_test.go new file mode 100644 index 00000000000..7ece5a28dc5 --- /dev/null +++ b/command/operator_scheduler_get_config_test.go @@ -0,0 +1,47 @@ +package command + +import ( + "encoding/json" + "testing" + + "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/ci" + "github.com/mitchellh/cli" + "github.com/stretchr/testify/require" +) + +func TestOperatorSchedulerGetConfig_Run(t *testing.T) { + ci.Parallel(t) + + srv, _, addr := testServer(t, false, nil) + defer srv.Shutdown() + + ui := cli.NewMockUi() + c := &OperatorSchedulerGetConfig{Meta: Meta{Ui: ui}} + + // Run the command, so we get the default output and test this. + require.EqualValues(t, 0, c.Run([]string{"-address=" + addr})) + s := ui.OutputWriter.String() + require.Contains(t, s, "Scheduler Algorithm = binpack") + require.Contains(t, s, "Preemption SysBatch Scheduler = false") + ui.ErrorWriter.Reset() + ui.OutputWriter.Reset() + + // Request JSON output and test. + require.EqualValues(t, 0, c.Run([]string{"-address=" + addr, "-json"})) + s = ui.OutputWriter.String() + var js api.SchedulerConfiguration + require.NoError(t, json.Unmarshal([]byte(s), &js)) + ui.ErrorWriter.Reset() + ui.OutputWriter.Reset() + + // Request a template output and test. + require.EqualValues(t, 0, c.Run([]string{"-address=" + addr, "-t='{{printf \"%s!!!\" .SchedulerConfig.SchedulerAlgorithm}}'"})) + require.Contains(t, ui.OutputWriter.String(), "binpack!!!") + ui.ErrorWriter.Reset() + ui.OutputWriter.Reset() + + // Test an unsupported flag. + require.EqualValues(t, 1, c.Run([]string{"-address=" + addr, "-yaml"})) + require.Contains(t, ui.OutputWriter.String(), "Usage: nomad operator scheduler get-config") +} diff --git a/command/operator_scheduler_set_config.go b/command/operator_scheduler_set_config.go new file mode 100644 index 00000000000..e7d03d39912 --- /dev/null +++ b/command/operator_scheduler_set_config.go @@ -0,0 +1,210 @@ +package command + +import ( + "fmt" + "strings" + + "github.com/hashicorp/nomad/api" + flagHelper "github.com/hashicorp/nomad/helper/flags" + "github.com/mitchellh/cli" + "github.com/posener/complete" +) + +// Ensure OperatorSchedulerSetConfig satisfies the cli.Command interface. +var _ cli.Command = &OperatorSchedulerSetConfig{} + +type OperatorSchedulerSetConfig struct { + Meta + + // The scheduler configuration flags allow us to tell whether the user set + // a value or not. This means we can safely merge the current configuration + // with user supplied, selective updates. + checkIndex string + schedulerAlgorithm string + memoryOversubscription flagHelper.BoolValue + rejectJobRegistration flagHelper.BoolValue + pauseEvalBroker flagHelper.BoolValue + preemptBatchScheduler flagHelper.BoolValue + preemptServiceScheduler flagHelper.BoolValue + preemptSysBatchScheduler flagHelper.BoolValue + preemptSystemScheduler flagHelper.BoolValue +} + +func (o *OperatorSchedulerSetConfig) AutocompleteFlags() complete.Flags { + return mergeAutocompleteFlags(o.Meta.AutocompleteFlags(FlagSetClient), + complete.Flags{ + "-check-index": complete.PredictAnything, + "-scheduler-algorithm": complete.PredictSet( + string(api.SchedulerAlgorithmBinpack), + string(api.SchedulerAlgorithmSpread), + ), + "-memory-oversubscription": complete.PredictSet("true", "false"), + "-reject-job-registration": complete.PredictSet("true", "false"), + "-pause-eval-broker": complete.PredictSet("true", "false"), + "-preempt-batch-scheduler": complete.PredictSet("true", "false"), + "-preempt-service-scheduler": complete.PredictSet("true", "false"), + "-preempt-sysbatch-scheduler": complete.PredictSet("true", "false"), + "-preempt-system-scheduler": complete.PredictSet("true", "false"), + }, + ) +} + +func (o *OperatorSchedulerSetConfig) AutocompleteArgs() complete.Predictor { + return complete.PredictNothing +} + +func (o *OperatorSchedulerSetConfig) Name() string { return "operator scheduler set-config" } + +func (o *OperatorSchedulerSetConfig) Run(args []string) int { + + flags := o.Meta.FlagSet("set-config", FlagSetClient) + flags.Usage = func() { o.Ui.Output(o.Help()) } + + flags.StringVar(&o.checkIndex, "check-index", "", "") + flags.StringVar(&o.schedulerAlgorithm, "scheduler-algorithm", "", "") + flags.Var(&o.memoryOversubscription, "memory-oversubscription", "") + flags.Var(&o.rejectJobRegistration, "reject-job-registration", "") + flags.Var(&o.pauseEvalBroker, "pause-eval-broker", "") + flags.Var(&o.preemptBatchScheduler, "preempt-batch-scheduler", "") + flags.Var(&o.preemptServiceScheduler, "preempt-service-scheduler", "") + flags.Var(&o.preemptSysBatchScheduler, "preempt-sysbatch-scheduler", "") + flags.Var(&o.preemptSystemScheduler, "preempt-system-scheduler", "") + + if err := flags.Parse(args); err != nil { + return 1 + } + + // Set up a client. + client, err := o.Meta.Client() + if err != nil { + o.Ui.Error(fmt.Sprintf("Error initializing client: %s", err)) + return 1 + } + + // Check that we got no arguments. + args = flags.Args() + if l := len(args); l != 0 { + o.Ui.Error("This command takes no arguments") + o.Ui.Error(commandErrorText(o)) + return 1 + } + + // Convert the check index string and handle any errors before adding this + // to our request. This parsing handles empty values correctly. + checkIndex, _, err := parseCheckIndex(o.checkIndex) + if err != nil { + o.Ui.Error(fmt.Sprintf("Error parsing check-index value %q: %v", o.checkIndex, err)) + return 1 + } + + // Fetch the current configuration. This will be used as a base to merge + // user configuration onto. + resp, _, err := client.Operator().SchedulerGetConfiguration(nil) + if err != nil { + o.Ui.Error(fmt.Sprintf("Error querying for scheduler configuration: %s", err)) + return 1 + } + + if checkIndex > 0 && resp.SchedulerConfig.ModifyIndex != checkIndex { + errMsg := fmt.Sprintf("check-index %v does not match does not match current state value %v", + checkIndex, resp.SchedulerConfig.ModifyIndex) + o.Ui.Error(fmt.Sprintf("Error performing check index set: %s", errMsg)) + return 1 + } + + schedulerConfig := resp.SchedulerConfig + + // Overwrite the modification index if the user supplied one, otherwise we + // use what was included within the read response. + if checkIndex > 0 { + schedulerConfig.ModifyIndex = checkIndex + } + + // Merge the current configuration with any values set by the operator. + if o.schedulerAlgorithm != "" { + schedulerConfig.SchedulerAlgorithm = api.SchedulerAlgorithm(o.schedulerAlgorithm) + } + o.memoryOversubscription.Merge(&schedulerConfig.MemoryOversubscriptionEnabled) + o.rejectJobRegistration.Merge(&schedulerConfig.RejectJobRegistration) + o.pauseEvalBroker.Merge(&schedulerConfig.PauseEvalBroker) + o.preemptBatchScheduler.Merge(&schedulerConfig.PreemptionConfig.BatchSchedulerEnabled) + o.preemptServiceScheduler.Merge(&schedulerConfig.PreemptionConfig.ServiceSchedulerEnabled) + o.preemptSysBatchScheduler.Merge(&schedulerConfig.PreemptionConfig.SysBatchSchedulerEnabled) + o.preemptSystemScheduler.Merge(&schedulerConfig.PreemptionConfig.SystemSchedulerEnabled) + + // Check-and-set the new configuration. + result, _, err := client.Operator().SchedulerCASConfiguration(schedulerConfig, nil) + if err != nil { + o.Ui.Error(fmt.Sprintf("Error setting scheduler configuration: %s", err)) + return 1 + } + if result.Updated { + o.Ui.Output("Scheduler configuration updated!") + return 0 + } + o.Ui.Output("Scheduler configuration could not be atomically updated, please try again") + return 1 +} + +func (o *OperatorSchedulerSetConfig) Synopsis() string { + return "Modify the current scheduler configuration" +} + +func (o *OperatorSchedulerSetConfig) Help() string { + helpText := ` +Usage: nomad operator scheduler set-config [options] + + Modifies the current scheduler configuration. + + If ACLs are enabled, this command requires a token with the 'operator:write' + capability. + +General Options: + + ` + generalOptionsUsage(usageOptsDefault|usageOptsNoNamespace) + ` + +Scheduler Set Config Options: + + -check-index + If set, the scheduler config is only updated if the passed modify index + matches the current server side version. If a non-zero value is passed, it + ensures that the scheduler config is being updated from a known state. + + -scheduler-algorithm=["binpack"|"spread"] + Specifies whether scheduler binpacks or spreads allocations on available + nodes. + + -memory-oversubscription=[true|false] + When true, tasks may exceed their reserved memory limit, if the client has + excess memory capacity. Tasks must specify memory_max to take advantage of + memory oversubscription. + + -reject-job-registration=[true|false] + When true, the server will return permission denied errors for job registration, + job dispatch, and job scale APIs, unless the ACL token for the request is a + management token. If ACLs are disabled, no user will be able to register jobs. + This allows operators to shed load from automated processes during incident + response. + + -pause-eval-broker=[true|false] + When set to true, the eval broker which usually runs on the leader will be + disabled. This will prevent the scheduler workers from receiving new work. + + -preempt-batch-scheduler=[true|false] + Specifies whether preemption for batch jobs is enabled. Note that if this + is set to true, then batch jobs can preempt any other jobs. + + -preempt-service-scheduler=[true|false] + Specifies whether preemption for service jobs is enabled. Note that if this + is set to true, then service jobs can preempt any other jobs. + + -preempt-sysbatch-scheduler=[true|false] + Specifies whether preemption for system batch jobs is enabled. Note that if + this is set to true, then system batch jobs can preempt any other jobs. + + -preempt-system-scheduler=[true|false] + Specifies whether preemption for system jobs is enabled. Note that if this + is set to true, then system jobs can preempt any other jobs. +` + return strings.TrimSpace(helpText) +} diff --git a/command/operator_scheduler_set_config_test.go b/command/operator_scheduler_set_config_test.go new file mode 100644 index 00000000000..7d4b2fdfca9 --- /dev/null +++ b/command/operator_scheduler_set_config_test.go @@ -0,0 +1,108 @@ +package command + +import ( + "strconv" + "testing" + + "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/ci" + "github.com/mitchellh/cli" + "github.com/stretchr/testify/require" +) + +func TestOperatorSchedulerSetConfig_Run(t *testing.T) { + ci.Parallel(t) + + srv, _, addr := testServer(t, false, nil) + defer srv.Shutdown() + + ui := cli.NewMockUi() + c := &OperatorSchedulerSetConfig{Meta: Meta{Ui: ui}} + + bootstrappedConfig, _, err := srv.Client().Operator().SchedulerGetConfiguration(nil) + require.NoError(t, err) + require.NotEmpty(t, bootstrappedConfig.SchedulerConfig) + + // Run the command with zero value and ensure the configuration does not + // change. + require.EqualValues(t, 0, c.Run([]string{"-address=" + addr})) + ui.ErrorWriter.Reset() + ui.OutputWriter.Reset() + + // Read the configuration again and test that nothing has changed which + // ensures our empty flags are working correctly. + nonModifiedConfig, _, err := srv.Client().Operator().SchedulerGetConfiguration(nil) + require.NoError(t, err) + schedulerConfigEquals(t, bootstrappedConfig.SchedulerConfig, nonModifiedConfig.SchedulerConfig) + + // Modify every configuration parameter using the flags. This ensures the + // merging is working correctly and that operators can control the entire + // object via the CLI. + modifyingArgs := []string{ + "-address=" + addr, + "-scheduler-algorithm=spread", + "-pause-eval-broker=true", + "-memory-oversubscription=true", + "-reject-job-registration=true", + "-preempt-batch-scheduler=true", + "-preempt-service-scheduler=true", + "-preempt-sysbatch-scheduler=true", + "-preempt-system-scheduler=false", + } + require.EqualValues(t, 0, c.Run(modifyingArgs)) + s := ui.OutputWriter.String() + require.Contains(t, s, "Scheduler configuration updated!") + + modifiedConfig, _, err := srv.Client().Operator().SchedulerGetConfiguration(nil) + require.NoError(t, err) + schedulerConfigEquals(t, &api.SchedulerConfiguration{ + SchedulerAlgorithm: "spread", + PreemptionConfig: api.PreemptionConfig{ + SystemSchedulerEnabled: false, + SysBatchSchedulerEnabled: true, + BatchSchedulerEnabled: true, + ServiceSchedulerEnabled: true, + }, + MemoryOversubscriptionEnabled: true, + RejectJobRegistration: true, + PauseEvalBroker: true, + }, modifiedConfig.SchedulerConfig) + + ui.ErrorWriter.Reset() + ui.OutputWriter.Reset() + + // Make a Freudian slip with one of the flags to ensure the usage is + // returned. + require.EqualValues(t, 1, c.Run([]string{"-address=" + addr, "-pause-evil-broker=true"})) + require.Contains(t, ui.OutputWriter.String(), "Usage: nomad operator scheduler set-config") + ui.ErrorWriter.Reset() + ui.OutputWriter.Reset() + + // Try updating the config using an incorrect check-index value. + require.EqualValues(t, 1, c.Run([]string{ + "-address=" + addr, + "-pause-eval-broker=false", + "-check-index=1000000", + })) + require.Contains(t, ui.ErrorWriter.String(), "check-index 1000000 does not match does not match current state") + ui.ErrorWriter.Reset() + ui.OutputWriter.Reset() + + // Try updating the config using a correct check-index value. + require.EqualValues(t, 0, c.Run([]string{ + "-address=" + addr, + "-pause-eval-broker=false", + "-check-index=" + strconv.FormatUint(modifiedConfig.SchedulerConfig.ModifyIndex, 10), + })) + require.Contains(t, ui.OutputWriter.String(), "Scheduler configuration updated!") + ui.ErrorWriter.Reset() + ui.OutputWriter.Reset() +} + +func schedulerConfigEquals(t *testing.T, expected, actual *api.SchedulerConfiguration) { + require.Equal(t, expected.SchedulerAlgorithm, actual.SchedulerAlgorithm) + require.Equal(t, expected.RejectJobRegistration, actual.RejectJobRegistration) + require.Equal(t, expected.MemoryOversubscriptionEnabled, actual.MemoryOversubscriptionEnabled) + require.Equal(t, expected.PauseEvalBroker, actual.PauseEvalBroker) + require.Equal(t, expected.PreemptionConfig, actual.PreemptionConfig) +} From 8caeb9427f4be25c573b5493a8258c60afab3324 Mon Sep 17 00:00:00 2001 From: James Rasell Date: Fri, 1 Jul 2022 14:39:00 +0100 Subject: [PATCH 04/11] api: add ability to pause eval broker via scheduler config --- api/operator.go | 4 ++++ api/operator_test.go | 44 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+) diff --git a/api/operator.go b/api/operator.go index c3343c5e4ce..8a3f74def46 100644 --- a/api/operator.go +++ b/api/operator.go @@ -134,6 +134,10 @@ type SchedulerConfiguration struct { // management ACL token RejectJobRegistration bool + // PauseEvalBroker stops the leader evaluation broker process from running + // until the configuration is updated and written to the Nomad servers. + PauseEvalBroker bool + // CreateIndex/ModifyIndex store the create/modify indexes of this configuration. CreateIndex uint64 ModifyIndex uint64 diff --git a/api/operator_test.go b/api/operator_test.go index 276aefb186c..bfefcacbe4d 100644 --- a/api/operator_test.go +++ b/api/operator_test.go @@ -5,6 +5,7 @@ import ( "testing" "github.com/hashicorp/nomad/api/internal/testutil" + "github.com/stretchr/testify/require" ) func TestOperator_RaftGetConfiguration(t *testing.T) { @@ -53,3 +54,46 @@ func TestOperator_RaftRemovePeerByID(t *testing.T) { t.Fatalf("err: %v", err) } } + +func TestOperator_SchedulerGetConfiguration(t *testing.T) { + testutil.Parallel(t) + c, s := makeClient(t, nil, nil) + defer s.Stop() + + schedulerConfig, _, err := c.Operator().SchedulerGetConfiguration(nil) + require.Nil(t, err) + require.NotEmpty(t, schedulerConfig) +} + +func TestOperator_SchedulerSetConfiguration(t *testing.T) { + testutil.Parallel(t) + c, s := makeClient(t, nil, nil) + defer s.Stop() + + newSchedulerConfig := SchedulerConfiguration{ + SchedulerAlgorithm: SchedulerAlgorithmSpread, + PreemptionConfig: PreemptionConfig{ + SystemSchedulerEnabled: true, + SysBatchSchedulerEnabled: true, + BatchSchedulerEnabled: true, + ServiceSchedulerEnabled: true, + }, + MemoryOversubscriptionEnabled: true, + RejectJobRegistration: true, + PauseEvalBroker: true, + } + + schedulerConfigUpdateResp, _, err := c.Operator().SchedulerSetConfiguration(&newSchedulerConfig, nil) + require.Nil(t, err) + require.True(t, schedulerConfigUpdateResp.Updated) + + // We can't exactly predict the query meta responses, so we test fields + // individually. + schedulerConfig, _, err := c.Operator().SchedulerGetConfiguration(nil) + require.Nil(t, err) + require.Equal(t, schedulerConfig.SchedulerConfig.SchedulerAlgorithm, SchedulerAlgorithmSpread) + require.True(t, schedulerConfig.SchedulerConfig.PauseEvalBroker) + require.True(t, schedulerConfig.SchedulerConfig.RejectJobRegistration) + require.True(t, schedulerConfig.SchedulerConfig.MemoryOversubscriptionEnabled) + require.Equal(t, newSchedulerConfig.PreemptionConfig, schedulerConfig.SchedulerConfig.PreemptionConfig) +} From df7eb6a07e451cfeeb2c70c140d5f4f8d96a10de Mon Sep 17 00:00:00 2001 From: James Rasell Date: Fri, 1 Jul 2022 14:39:32 +0100 Subject: [PATCH 05/11] e2e: add operator scheduler test for eval broker pause. --- e2e/operator_scheduler/doc.go | 6 + e2e/operator_scheduler/input/basic.nomad | 21 ++++ .../operator_scheduler_test.go | 117 ++++++++++++++++++ 3 files changed, 144 insertions(+) create mode 100644 e2e/operator_scheduler/doc.go create mode 100644 e2e/operator_scheduler/input/basic.nomad create mode 100644 e2e/operator_scheduler/operator_scheduler_test.go diff --git a/e2e/operator_scheduler/doc.go b/e2e/operator_scheduler/doc.go new file mode 100644 index 00000000000..b8f238c8d89 --- /dev/null +++ b/e2e/operator_scheduler/doc.go @@ -0,0 +1,6 @@ +// Package operator_scheduler provides end-to-end tests for the Nomad operator +// scheduler functionality and configuration options. +// +// In order to run this test suite only, from the e2e directory you can trigger +// go test -v -run '^TestOperatorScheduler' ./operator_scheduler +package operator_scheduler diff --git a/e2e/operator_scheduler/input/basic.nomad b/e2e/operator_scheduler/input/basic.nomad new file mode 100644 index 00000000000..96dd31627ea --- /dev/null +++ b/e2e/operator_scheduler/input/basic.nomad @@ -0,0 +1,21 @@ +job "operator_scheduler" { + datacenters = ["dc1"] + type = "batch" + + constraint { + attribute = "${attr.kernel.name}" + value = "linux" + } + + group "operator_scheduler" { + + task "test" { + driver = "raw_exec" + + config { + command = "bash" + args = ["-c", "sleep 1"] + } + } + } +} diff --git a/e2e/operator_scheduler/operator_scheduler_test.go b/e2e/operator_scheduler/operator_scheduler_test.go new file mode 100644 index 00000000000..1872a31d857 --- /dev/null +++ b/e2e/operator_scheduler/operator_scheduler_test.go @@ -0,0 +1,117 @@ +package operator_scheduler + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/hashicorp/nomad/e2e/e2eutil" + "github.com/hashicorp/nomad/helper/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +const jobBasic = "./input/basic.nomad" + +// TestOperatorScheduler runs the Nomad Operator Scheduler suit of tests which +// focus on the behaviour of the /v1/operator/scheduler API. +func TestOperatorScheduler(t *testing.T) { + + // Wait until we have a usable cluster before running the tests. + nomadClient := e2eutil.NomadClient(t) + e2eutil.WaitForLeader(t, nomadClient) + e2eutil.WaitForNodesReady(t, nomadClient, 1) + + // Run our test cases. + t.Run("TestOperatorScheduler_ConfigPauseEvalBroker", testConfigPauseEvalBroker) +} + +// testConfig tests pausing and un-pausing the eval broker and ensures the +// correct behaviour is observed at each stage. +func testConfigPauseEvalBroker(t *testing.T) { + + nomadClient := e2eutil.NomadClient(t) + + // Generate our job ID which will be used for the entire test. + jobID := "operator-scheduler-config-pause-eval-broker-" + uuid.Generate()[:8] + jobIDs := []string{jobID} + + // Defer a cleanup function to remove the job. This will trigger if the + // test fails, unless the cancel function is called. + ctx, cancel := context.WithCancel(context.Background()) + defer e2eutil.CleanupJobsAndGCWithContext(t, ctx, &jobIDs) + + // Register the job and ensure the alloc reaches the running state before + // moving safely on. + allocStubs := e2eutil.RegisterAndWaitForAllocs(t, nomadClient, jobBasic, jobID, "") + require.Len(t, allocStubs, 1) + e2eutil.WaitForAllocRunning(t, nomadClient, allocStubs[0].ID) + + // Get the current scheduler config object. + schedulerConfig, _, err := nomadClient.Operator().SchedulerGetConfiguration(nil) + require.NoError(t, err) + require.NotNil(t, schedulerConfig.SchedulerConfig) + + // Set the eval broker to be paused. + schedulerConfig.SchedulerConfig.PauseEvalBroker = true + + // Write the config back to Nomad. + schedulerConfigUpdate, _, err := nomadClient.Operator().SchedulerSetConfiguration( + schedulerConfig.SchedulerConfig, nil) + require.NoError(t, err) + require.True(t, schedulerConfigUpdate.Updated) + + // Perform a deregister call. The call will succeed and create an + // evaluation. Do not use purge, so we can check the job status when + // dereigster happens. + evalID, _, err := nomadClient.Jobs().Deregister(jobID, false, nil) + require.NoError(t, err) + require.NotEmpty(t, evalID) + + // Evaluation status is set to pending initially, so there isn't a great + // way to ensure it doesn't transition to another status other than polling + // for a long enough time to assume it won't change. + timedFn := func() error { + + // 5 seconds should be more than enough time for an eval to change + // status unless the broker is disabled. + timer := time.NewTimer(5 * time.Second) + defer timer.Stop() + + for { + select { + case <-timer.C: + return nil + default: + evalInfo, _, err := nomadClient.Evaluations().Info(evalID, nil) + if err != nil { + return err + } + if !assert.Equal(t, "pending", evalInfo.Status) { + return fmt.Errorf(`expected eval status "pending", got %q`, evalInfo.Status) + } + time.Sleep(1 * time.Second) + } + } + } + require.NoError(t, timedFn()) + + // Set the eval broker to be un-paused. + schedulerConfig.SchedulerConfig.PauseEvalBroker = false + + // Write the config back to Nomad. + schedulerConfigUpdate, _, err = nomadClient.Operator().SchedulerSetConfiguration( + schedulerConfig.SchedulerConfig, nil) + require.NoError(t, err) + require.True(t, schedulerConfigUpdate.Updated) + + // Ensure the job is stopped, then run the garbage collection to clear out + // all resources. + e2eutil.WaitForJobStopped(t, nomadClient, jobID) + _, err = e2eutil.Command("nomad", "system", "gc") + require.NoError(t, err) + + // If we have reached this far, we do not need to run the cleanup function. + cancel() +} From 5f8a5ea62d0fd935ac1df72f067b36df0e97f253 Mon Sep 17 00:00:00 2001 From: James Rasell Date: Fri, 1 Jul 2022 14:39:49 +0100 Subject: [PATCH 06/11] docs: include new opertor scheduler CLI and pause eval API info. --- .changelog/13045.txt | 7 ++ .../content/api-docs/operator/scheduler.mdx | 56 +++++++++---- .../content/docs/commands/operator/index.mdx | 8 ++ .../operator/scheduler-get-config.mdx | 47 +++++++++++ .../operator/scheduler-set-config.mdx | 82 +++++++++++++++++++ website/content/docs/configuration/server.mdx | 7 +- website/data/docs-nav-data.json | 8 ++ 7 files changed, 197 insertions(+), 18 deletions(-) create mode 100644 .changelog/13045.txt create mode 100644 website/content/docs/commands/operator/scheduler-get-config.mdx create mode 100644 website/content/docs/commands/operator/scheduler-set-config.mdx diff --git a/.changelog/13045.txt b/.changelog/13045.txt new file mode 100644 index 00000000000..cb930bcf867 --- /dev/null +++ b/.changelog/13045.txt @@ -0,0 +1,7 @@ +```release-note:improvement +cli: Added `scheduler get-config` and `scheduler set-config` commands to the operator CLI +``` + +```release-note:improvement +core: Added the ability to pause and un-pause the eval broker and blocked eval broker +``` diff --git a/website/content/api-docs/operator/scheduler.mdx b/website/content/api-docs/operator/scheduler.mdx index d007c5ce589..3cf9d81e6a3 100644 --- a/website/content/api-docs/operator/scheduler.mdx +++ b/website/content/api-docs/operator/scheduler.mdx @@ -40,18 +40,20 @@ $ curl \ "Index": 5, "KnownLeader": true, "LastContact": 0, + "NextToken": "", "SchedulerConfig": { "CreateIndex": 5, + "MemoryOversubscriptionEnabled": false, "ModifyIndex": 5, - "SchedulerAlgorithm": "spread", - "MemoryOversubscriptionEnabled": true, - "RejectJobRegistration": false, + "PauseEvalBroker": false, "PreemptionConfig": { - "SystemSchedulerEnabled": true, - "SysBatchSchedulerEnabled": false, "BatchSchedulerEnabled": false, - "ServiceSchedulerEnabled": false - } + "ServiceSchedulerEnabled": false, + "SysBatchSchedulerEnabled": false, + "SystemSchedulerEnabled": true + }, + "RejectJobRegistration": false, + "SchedulerAlgorithm": "binpack" } } ``` @@ -64,9 +66,23 @@ $ curl \ - `SchedulerConfig` `(SchedulerConfig)` - The returned `SchedulerConfig` object has configuration settings mentioned below. - - `SchedulerAlgorithm` `(string: "binpack")` - Specifies whether scheduler binpacks or spreads allocations on available nodes. + - `SchedulerAlgorithm` `(string: "binpack")` - Specifies whether scheduler + binpacks or spreads allocations on available nodes. - - `MemoryOversubscriptionEnabled` `(bool: false)` 1.1 Beta - When `true`, tasks may exceed their reserved memory limit, if the client has excess memory capacity. Tasks must specify [`memory_max`](/docs/job-specification/resources#memory_max) to take advantage of memory oversubscription. + - `MemoryOversubscriptionEnabled` `(bool: false)` 1.1 Beta - When + `true`, tasks may exceed their reserved memory limit, if the client has excess + memory capacity. Tasks must specify [`memory_max`](/docs/job-specification/resources#memory_max) + to take advantage of memory oversubscription. + + - `RejectJobRegistration` `(bool: false)` - When `true`, the server will return + permission denied errors for job registration, job dispatch, and job scale APIs, + unless the ACL token for the request is a management token. If ACLs are disabled, + no user will be able to register jobs. This allows operators to shed load from + automated processes during incident response. + + - `PauseEvalBroker` `(bool: false)` - When set to `true`, the eval broker which + usually runs on the leader will be disabled. This will prevent the scheduler + workers from receiving new work. - `PreemptionConfig` `(PreemptionConfig)` - Options to enable preemption for various schedulers. @@ -120,6 +136,7 @@ server state is authoritative. "SchedulerAlgorithm": "spread", "MemoryOversubscriptionEnabled": false, "RejectJobRegistration": false, + "PauseEvalBroker": false, "PreemptionConfig": { "SystemSchedulerEnabled": true, "SysBatchSchedulerEnabled": false, @@ -133,9 +150,20 @@ server state is authoritative. binpacks or spreads allocations on available nodes. Possible values are `"binpack"` and `"spread"` -- `MemoryOversubscriptionEnabled` `(bool: false)` 1.1 Beta - When `true`, tasks may exceed their reserved memory limit, if the client has excess memory capacity. Tasks must specify [`memory_max`](/docs/job-specification/resources#memory_max) to take advantage of memory oversubscription. +- `MemoryOversubscriptionEnabled` `(bool: false)` 1.1 Beta - When + `true`, tasks may exceed their reserved memory limit, if the client has excess + memory capacity. Tasks must specify [`memory_max`](/docs/job-specification/resources#memory_max) + to take advantage of memory oversubscription. -- `RejectJobRegistration` `(bool: false)` - When `true`, the server will return permission denied errors for job registration, job dispatch, and job scale APIs, unless the ACL token for the request is a management token. If ACLs are disabled, no user will be able to register jobs. This allows operators to shed load from automated proceses during incident response. +- `RejectJobRegistration` `(bool: false)` - When `true`, the server will return + permission denied errors for job registration, job dispatch, and job scale APIs, + unless the ACL token for the request is a management token. If ACLs are disabled, + no user will be able to register jobs. This allows operators to shed load from + automated processes during incident response. + +- `PauseEvalBroker` `(bool: false)` - When set to `true`, the eval broker which + usually runs on the leader will be disabled. This will prevent the scheduler + workers from receiving new work. - `PreemptionConfig` `(PreemptionConfig)` - Options to enable preemption for various schedulers. @@ -143,9 +171,9 @@ server state is authoritative. - `SystemSchedulerEnabled` `(bool: true)` - Specifies whether preemption for system jobs is enabled. Note that if this is set to true, then system jobs can preempt any other jobs. - - - `SysBatchSchedulerEnabled` `(bool: false)` 1.2 - Specifies - whether preemption for system batch jobs is enabled. Note that if this is + + - `SysBatchSchedulerEnabled` `(bool: false)` 1.2 - Specifies + whether preemption for system batch jobs is enabled. Note that if this is set to true, then system batch jobs can preempt any other jobs. - `BatchSchedulerEnabled` `(bool: false)` - Specifies diff --git a/website/content/docs/commands/operator/index.mdx b/website/content/docs/commands/operator/index.mdx index c800a964353..21043b3063d 100644 --- a/website/content/docs/commands/operator/index.mdx +++ b/website/content/docs/commands/operator/index.mdx @@ -42,6 +42,12 @@ The following subcommands are available: - [`operator raft remove-peer`][remove] - Remove a Nomad server from the Raft configuration +- [`operator scheduler get-config`][scheduler-get-config] - Display the current + scheduler configuration + +- [`operator scheduler set-config`][scheduler-set-config] - Modify the scheduler + configuration + - [`operator snapshot agent`][snapshot-agent] - Inspects a snapshot of the Nomad server state - [`operator snapshot save`][snapshot-save] - Saves a snapshot of the Nomad server state @@ -63,3 +69,5 @@ The following subcommands are available: [snapshot-restore]: /docs/commands/operator/snapshot-restore 'Snapshot Restore command' [snapshot-inspect]: /docs/commands/operator/snapshot-inspect 'Snapshot Inspect command' [snapshot-agent]: /docs/commands/operator/snapshot-agent 'Snapshot Agent command' +[scheduler-get-config]: /docs/commands/operator/scheduler-get-config 'Scheduler Get Config command' +[scheduler-set-config]: /docs/commands/operator/scheduler-set-config 'Scheduler Set Config command' diff --git a/website/content/docs/commands/operator/scheduler-get-config.mdx b/website/content/docs/commands/operator/scheduler-get-config.mdx new file mode 100644 index 00000000000..01a5d20499d --- /dev/null +++ b/website/content/docs/commands/operator/scheduler-get-config.mdx @@ -0,0 +1,47 @@ +--- +layout: docs +page_title: 'Commands: operator scheduler get-config' +description: | + Display the current scheduler configuration. +--- + +# Command: operator scheduler get-config + +The scheduler operator get-config command is used to view the current scheduler +configuration. + +## Usage + +```plaintext +nomad operator scheduler get-config [options] +``` + +If ACLs are enabled, this command requires a token with the `operator:read` +capability. + +## General Options + +@include 'general_options_no_namespace.mdx' + +## Get Config Options + +- `-json`: Output the scheduler config in its JSON format. + +- `-t`: Format and display the scheduler config using a Go template. + +## Examples + +Display the current scheduler configuration: + +```shell-session +$ nomad operator scheduler get-config +Scheduler Algorithm = binpack +Memory Oversubscription = false +Reject Job Registration = false +Pause Eval Broker = false +Preemption System Scheduler = true +Preemption Service Scheduler = false +Preemption Batch Scheduler = false +Preemption SysBatch Scheduler = false +Modify Index = 5 +``` diff --git a/website/content/docs/commands/operator/scheduler-set-config.mdx b/website/content/docs/commands/operator/scheduler-set-config.mdx new file mode 100644 index 00000000000..74b7f81750c --- /dev/null +++ b/website/content/docs/commands/operator/scheduler-set-config.mdx @@ -0,0 +1,82 @@ +--- +layout: docs +page_title: 'Commands: operator scheduler set-config' +description: | + Modify the scheduler configuration. +--- + +# Command: operator scheduler set-config + +The scheduler operator set-config command is used to modify the scheduler +configuration. + +## Usage + +```plaintext +nomad operator scheduler set-config [options] +``` + +If ACLs are enabled, this command requires a token with the `operator:write` +capability. + +## General Options + +@include 'general_options_no_namespace.mdx' + +## Set Config Options + +- `-check-index` - If set, the scheduler config is only updated if the passed + modify index matches the current server side version. If a non-zero value is + passed, it ensures that the scheduler config is being updated from a known + state. + +- `-scheduler-algorithm` - Specifies whether scheduler binpacks or spreads + allocations on available nodes. Must be one of `["binpack"|"spread"]`. + +- `-memory-oversubscription` - When true, tasks may exceed their reserved memory + limit, if the client has excess memory capacity. Tasks must specify [`memory_max`] + to take advantage of memory oversubscription. Must be one of `[true|false]`. + +- `-reject-job-registration` - When true, the server will return permission denied + errors for job registration, job dispatch, and job scale APIs, unless the ACL + token for the request is a management token. If ACLs are disabled, no user + will be able to register jobs. This allows operators to shed load from automated + processes during incident response. Must be one of `[true|false]`. + +- `-pause-eval-broker` - When set to true, the eval broker which usually runs on + the leader will be disabled. This will prevent the scheduler workers from + receiving new work. Must be one of `[true|false]`. + +- `-preempt-batch-scheduler` - Specifies whether preemption for batch jobs + is enabled. Note that if this is set to true, then batch jobs can preempt any + other jobs. Must be one of `[true|false]`. + +- `-preempt-service-scheduler` - Specifies whether preemption for service jobs + is enabled. Note that if this is set to true, then service jobs can preempt any + other jobs. Must be one of `[true|false]`. + +- `-preempt-sysbatch-scheduler` - Specifies whether preemption for system batch + jobs is enabled. Note that if this is set to true, then system batch jobs can + preempt any other jobs. Must be one of `[true|false]`. + +- `-preempt-system-scheduler` - Specifies whether preemption for system jobs + is enabled. Note that if this is set to true, then system jobs can preempt any + other jobs. Must be one of `[true|false]`. + +## Examples + +Modify the scheduler algorithm to spread: + +```shell-session +$ nomad operator scheduler set-config -scheduler-algorithm=spread +Scheduler configuration updated! +``` + +Modify the scheduler algorithm to spread using the check index flag: + +```shell-session +$ nomad operator scheduler set-config -scheduler-algorithm=spread -check-index=5 +Scheduler configuration updated! +``` + +[`memory_max`]: /docs/job-specification/resources#memory_max diff --git a/website/content/docs/configuration/server.mdx b/website/content/docs/configuration/server.mdx index 588aaa33bc5..a3a64ca4679 100644 --- a/website/content/docs/configuration/server.mdx +++ b/website/content/docs/configuration/server.mdx @@ -311,11 +311,10 @@ job-type schedulers. ```hcl server { default_scheduler_config { - scheduler_algorithm = "spread" - + scheduler_algorithm = "spread" memory_oversubscription_enabled = true - - reject_job_registration = false + reject_job_registration = false + pause_eval_broker = false # New in Nomad 1.3.2 preemption_config { batch_scheduler_enabled = true diff --git a/website/data/docs-nav-data.json b/website/data/docs-nav-data.json index 52e6d4f253e..50199de2a2e 100644 --- a/website/data/docs-nav-data.json +++ b/website/data/docs-nav-data.json @@ -585,6 +585,14 @@ "title": "raft state", "path": "commands/operator/raft-state" }, + { + "title": "scheduler get-config", + "path": "commands/operator/scheduler-get-config" + }, + { + "title": "scheduler set-config", + "path": "commands/operator/scheduler-set-config" + }, { "title": "snapshot agent", "path": "commands/operator/snapshot-agent" From 71939c7bd8b179dc48ae54f9330a32abbbedcb0c Mon Sep 17 00:00:00 2001 From: James Rasell Date: Fri, 1 Jul 2022 15:41:13 +0100 Subject: [PATCH 07/11] core: add eval delete RPC and core functionality. --- client/client_test.go | 2 +- nomad/core_sched.go | 23 +- nomad/core_sched_test.go | 13 +- nomad/eval_endpoint.go | 147 +++++++- nomad/eval_endpoint_test.go | 351 +++++++++++++++++++- nomad/fsm.go | 4 +- nomad/fsm_test.go | 2 +- nomad/node_endpoint_test.go | 2 +- nomad/rpc_test.go | 2 +- nomad/state/state_store.go | 21 +- nomad/state/state_store_test.go | 45 ++- nomad/structs/eval.go | 24 ++ nomad/structs/structs.go | 14 +- nomad/structs/uuid.go | 7 + nomad/volumewatcher/volumes_watcher_test.go | 2 +- 15 files changed, 621 insertions(+), 38 deletions(-) create mode 100644 nomad/structs/eval.go create mode 100644 nomad/structs/uuid.go diff --git a/client/client_test.go b/client/client_test.go index 55c057e0283..8d7fa6c435c 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -507,7 +507,7 @@ func TestClient_WatchAllocs(t *testing.T) { }) // Delete one allocation - if err := state.DeleteEval(103, nil, []string{alloc1.ID}); err != nil { + if err := state.DeleteEval(103, nil, []string{alloc1.ID}, false); err != nil { t.Fatalf("err: %v", err) } diff --git a/nomad/core_sched.go b/nomad/core_sched.go index 61e77b3ef5a..ff8605e02c1 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -14,13 +14,6 @@ import ( "github.com/hashicorp/nomad/scheduler" ) -var ( - // maxIdsPerReap is the maximum number of evals and allocations to reap in a - // single Raft transaction. This is to ensure that the Raft message does not - // become too large. - maxIdsPerReap = (1024 * 256) / 36 // 0.25 MB of ids. -) - // CoreScheduler is a special "scheduler" that is registered // as "_core". It is used to run various administrative work // across the cluster. @@ -193,7 +186,7 @@ func (c *CoreScheduler) partitionJobReap(jobs []*structs.Job, leaderACL string) }, } requests = append(requests, req) - available := maxIdsPerReap + available := structs.MaxUUIDsPerWriteRequest if remaining := len(jobs) - submittedJobs; remaining > 0 { if remaining <= available { @@ -359,20 +352,20 @@ func (c *CoreScheduler) evalReap(evals, allocs []string) error { return nil } -// partitionEvalReap returns a list of EvalDeleteRequest to make, ensuring a single +// partitionEvalReap returns a list of EvalReapRequest to make, ensuring a single // request does not contain too many allocations and evaluations. This is // necessary to ensure that the Raft transaction does not become too large. -func (c *CoreScheduler) partitionEvalReap(evals, allocs []string) []*structs.EvalDeleteRequest { - var requests []*structs.EvalDeleteRequest +func (c *CoreScheduler) partitionEvalReap(evals, allocs []string) []*structs.EvalReapRequest { + var requests []*structs.EvalReapRequest submittedEvals, submittedAllocs := 0, 0 for submittedEvals != len(evals) || submittedAllocs != len(allocs) { - req := &structs.EvalDeleteRequest{ + req := &structs.EvalReapRequest{ WriteRequest: structs.WriteRequest{ Region: c.srv.config.Region, }, } requests = append(requests, req) - available := maxIdsPerReap + available := structs.MaxUUIDsPerWriteRequest // Add the allocs first if remaining := len(allocs) - submittedAllocs; remaining > 0 { @@ -484,7 +477,7 @@ func (c *CoreScheduler) nodeReap(eval *structs.Evaluation, nodeIDs []string) err } // Call to the leader to issue the reap - for _, ids := range partitionAll(maxIdsPerReap, nodeIDs) { + for _, ids := range partitionAll(structs.MaxUUIDsPerWriteRequest, nodeIDs) { req := structs.NodeBatchDeregisterRequest{ NodeIDs: ids, WriteRequest: structs.WriteRequest{ @@ -584,7 +577,7 @@ func (c *CoreScheduler) partitionDeploymentReap(deployments []string) []*structs }, } requests = append(requests, req) - available := maxIdsPerReap + available := structs.MaxUUIDsPerWriteRequest if remaining := len(deployments) - submittedDeployments; remaining > 0 { if remaining <= available { diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index aa2d968fd1f..76aa6eb29b9 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -1852,7 +1852,7 @@ func TestCoreScheduler_PartitionEvalReap(t *testing.T) { core := NewCoreScheduler(s1, snap) // Set the max ids per reap to something lower. - maxIdsPerReap = 2 + structs.MaxUUIDsPerWriteRequest = 2 evals := []string{"a", "b", "c"} allocs := []string{"1", "2", "3"} @@ -1895,7 +1895,7 @@ func TestCoreScheduler_PartitionDeploymentReap(t *testing.T) { core := NewCoreScheduler(s1, snap) // Set the max ids per reap to something lower. - maxIdsPerReap = 2 + structs.MaxUUIDsPerWriteRequest = 2 deployments := []string{"a", "b", "c"} requests := core.(*CoreScheduler).partitionDeploymentReap(deployments) @@ -1915,7 +1915,6 @@ func TestCoreScheduler_PartitionDeploymentReap(t *testing.T) { } func TestCoreScheduler_PartitionJobReap(t *testing.T) { - ci.Parallel(t) s1, cleanupS1 := TestServer(t, nil) defer cleanupS1() @@ -1929,7 +1928,11 @@ func TestCoreScheduler_PartitionJobReap(t *testing.T) { core := NewCoreScheduler(s1, snap) // Set the max ids per reap to something lower. - maxIdsPerReap = 2 + originalMaxUUIDsPerWriteRequest := structs.MaxUUIDsPerWriteRequest + structs.MaxUUIDsPerWriteRequest = 2 + defer func() { + structs.MaxUUIDsPerWriteRequest = originalMaxUUIDsPerWriteRequest + }() jobs := []*structs.Job{mock.Job(), mock.Job(), mock.Job()} requests := core.(*CoreScheduler).partitionJobReap(jobs, "") @@ -2385,7 +2388,7 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) { require.NoError(t, err) index, _ = store.LatestIndex() index++ - err = store.DeleteEval(index, []string{eval.ID}, []string{alloc1.ID}) + err = store.DeleteEval(index, []string{eval.ID}, []string{alloc1.ID}, false) require.NoError(t, err) // Create a core scheduler and attempt the volume claim GC diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index 43b4d8081e0..111ccf81ba6 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -1,6 +1,7 @@ package nomad import ( + "errors" "fmt" "net/http" "time" @@ -391,7 +392,7 @@ func (e *Eval) Reblock(args *structs.EvalUpdateRequest, reply *structs.GenericRe } // Reap is used to cleanup dead evaluations and allocations -func (e *Eval) Reap(args *structs.EvalDeleteRequest, +func (e *Eval) Reap(args *structs.EvalReapRequest, reply *structs.GenericResponse) error { // Ensure the connection was initiated by another server if TLS is used. @@ -416,6 +417,150 @@ func (e *Eval) Reap(args *structs.EvalDeleteRequest, return nil } +// Delete is used by operators to delete evaluations during severe outages. It +// differs from Reap while duplicating some behavior to ensure we have the +// correct controls for user initiated deletions. +func (e *Eval) Delete( + args *structs.EvalDeleteRequest, + reply *structs.EvalDeleteResponse) error { + + if done, err := e.srv.forward(structs.EvalDeleteRPCMethod, args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "eval", "delete"}, time.Now()) + + // This RPC endpoint is very destructive and alters Nomad's core state, + // meaning only those with management tokens can call it. + if aclObj, err := e.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.IsManagement() { + return structs.ErrPermissionDenied + } + + // The eval broker must be disabled otherwise Nomad's state will likely get + // wild in a very un-fun way. + if e.srv.evalBroker.Enabled() { + return errors.New("eval broker is enabled; eval broker must be paused to delete evals") + } + + // Grab the state snapshot, so we can look up relevant eval information. + serverStateSnapshot, err := e.srv.State().Snapshot() + if err != nil { + return fmt.Errorf("failed to lookup state snapshot: %v", err) + } + ws := memdb.NewWatchSet() + + // Iterate the evaluations and ensure they are safe to delete. It is + // possible passed evals are not safe to delete and would make Nomads state + // a little wonky. The nature of the RPC return error, means a single + // unsafe eval ID fails the whole call. + for _, evalID := range args.EvalIDs { + + evalInfo, err := serverStateSnapshot.EvalByID(ws, evalID) + if err != nil { + return fmt.Errorf("failed to lookup eval: %v", err) + } + if evalInfo == nil { + return errors.New("eval not found") + } + + jobInfo, err := serverStateSnapshot.JobByID(ws, evalInfo.Namespace, evalInfo.JobID) + if err != nil { + return fmt.Errorf("failed to lookup eval job: %v", err) + } + + allocs, err := serverStateSnapshot.AllocsByEval(ws, evalInfo.ID) + if err != nil { + return fmt.Errorf("failed to lookup eval allocs: %v", err) + } + + if !evalDeleteSafe(allocs, jobInfo) { + return fmt.Errorf("eval %s is not safe to delete", evalID) + } + } + + // Generate the Raft request object using the reap request object. This + // avoids adding new Raft messages types and follows the existing reap + // flow. + raftReq := structs.EvalReapRequest{ + Evals: args.EvalIDs, + UserInitiated: true, + WriteRequest: args.WriteRequest, + } + + // Update via Raft. + _, index, err := e.srv.raftApply(structs.EvalDeleteRequestType, &raftReq) + if err != nil { + return err + } + + // Update the index and return. + reply.Index = index + return nil +} + +// evalDeleteSafe ensures an evaluation is safe to delete based on its related +// allocation and job information. This follows similar, but different rules to +// the eval reap checking, to ensure evaluations for running allocs or allocs +// which need the evaluation detail are not deleted. +func evalDeleteSafe(allocs []*structs.Allocation, job *structs.Job) bool { + + // If the job is deleted, stopped, or dead, all allocs are terminal and + // the eval can be deleted. + if job == nil || job.Stop || job.Status == structs.JobStatusDead { + return true + } + + // Iterate the allocations associated to the eval, if any, and check + // whether we can delete the eval. + for _, alloc := range allocs { + + // If the allocation is still classed as running on the client, or + // might be, we can't delete. + switch alloc.ClientStatus { + case structs.AllocClientStatusRunning, structs.AllocClientStatusUnknown: + return false + } + + // If the alloc hasn't failed then we don't need to consider it for + // rescheduling. Rescheduling needs to copy over information from the + // previous alloc so that it can enforce the reschedule policy. + if alloc.ClientStatus != structs.AllocClientStatusFailed { + continue + } + + var reschedulePolicy *structs.ReschedulePolicy + tg := job.LookupTaskGroup(alloc.TaskGroup) + + if tg != nil { + reschedulePolicy = tg.ReschedulePolicy + } + + // No reschedule policy or rescheduling is disabled + if reschedulePolicy == nil || (!reschedulePolicy.Unlimited && reschedulePolicy.Attempts == 0) { + continue + } + + // The restart tracking information has not been carried forward. + if alloc.NextAllocation == "" { + return false + } + + // This task has unlimited rescheduling and the alloc has not been + // replaced, so we can't delete the eval yet. + if reschedulePolicy.Unlimited { + return false + } + + // No restarts have been attempted yet. + if alloc.RescheduleTracker == nil || len(alloc.RescheduleTracker.Events) == 0 { + return false + } + } + + return true +} + // List is used to get a list of the evaluations in the system func (e *Eval) List(args *structs.EvalListRequest, reply *structs.EvalListResponse) error { if done, err := e.srv.forward("Eval.List", args, args, reply); done { diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go index 1a67d0e6a47..bf05ce4d91a 100644 --- a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -202,7 +202,7 @@ func TestEvalEndpoint_GetEval_Blocking(t *testing.T) { // Eval delete triggers watches time.AfterFunc(100*time.Millisecond, func() { - err := state.DeleteEval(300, []string{eval2.ID}, []string{}) + err := state.DeleteEval(300, []string{eval2.ID}, []string{}, false) if err != nil { t.Fatalf("err: %v", err) } @@ -691,7 +691,7 @@ func TestEvalEndpoint_Reap(t *testing.T) { s1.fsm.State().UpsertEvals(structs.MsgTypeTestSetup, 1000, []*structs.Evaluation{eval1}) // Reap the eval - get := &structs.EvalDeleteRequest{ + get := &structs.EvalReapRequest{ Evals: []string{eval1.ID}, WriteRequest: structs.WriteRequest{Region: "global"}, } @@ -714,6 +714,351 @@ func TestEvalEndpoint_Reap(t *testing.T) { } } +func TestEvalEndpoint_Delete(t *testing.T) { + ci.Parallel(t) + + testCases := []struct { + testFn func() + name string + }{ + { + testFn: func() { + + testServer, testServerCleanup := TestServer(t, nil) + defer testServerCleanup() + + codec := rpcClient(t, testServer) + testutil.WaitForLeader(t, testServer.RPC) + + // Create and upsert an evaluation. + mockEval := mock.Eval() + require.NoError(t, testServer.fsm.State().UpsertEvals( + structs.MsgTypeTestSetup, 10, []*structs.Evaluation{mockEval})) + + // Attempt to delete the eval, which should fail because the + // eval broker is not paused. + get := &structs.EvalDeleteRequest{ + EvalIDs: []string{mockEval.ID}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.EvalDeleteResponse + err := msgpackrpc.CallWithCodec(codec, structs.EvalDeleteRPCMethod, get, &resp) + require.Contains(t, err.Error(), "eval broker is enabled") + }, + name: "unsuccessful delete broker enabled", + }, + { + testFn: func() { + + testServer, testServerCleanup := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 + }) + defer testServerCleanup() + + codec := rpcClient(t, testServer) + testutil.WaitForLeader(t, testServer.RPC) + + // Pause the eval broker and update the scheduler config. + testServer.evalBroker.SetEnabled(false) + + _, schedulerConfig, err := testServer.fsm.State().SchedulerConfig() + require.NoError(t, err) + require.NotNil(t, schedulerConfig) + + schedulerConfig.PauseEvalBroker = true + require.NoError(t, testServer.fsm.State().SchedulerSetConfig(10, schedulerConfig)) + + // Create and upsert an evaluation. + mockEval := mock.Eval() + require.NoError(t, testServer.fsm.State().UpsertEvals( + structs.MsgTypeTestSetup, 10, []*structs.Evaluation{mockEval})) + + // Attempt to delete the eval, which should succeed as the eval + // broker is disabled. + get := &structs.EvalDeleteRequest{ + EvalIDs: []string{mockEval.ID}, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.EvalDeleteResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, structs.EvalDeleteRPCMethod, get, &resp)) + + // Attempt to read the eval from state; this should not be found. + ws := memdb.NewWatchSet() + respEval, err := testServer.fsm.State().EvalByID(ws, mockEval.ID) + require.Nil(t, err) + require.Nil(t, respEval) + }, + name: "successful delete without ACLs", + }, + { + testFn: func() { + + testServer, rootToken, testServerCleanup := TestACLServer(t, func(c *Config) { + c.NumSchedulers = 0 + }) + defer testServerCleanup() + + codec := rpcClient(t, testServer) + testutil.WaitForLeader(t, testServer.RPC) + + // Pause the eval broker and update the scheduler config. + testServer.evalBroker.SetEnabled(false) + + _, schedulerConfig, err := testServer.fsm.State().SchedulerConfig() + require.NoError(t, err) + require.NotNil(t, schedulerConfig) + + schedulerConfig.PauseEvalBroker = true + require.NoError(t, testServer.fsm.State().SchedulerSetConfig(10, schedulerConfig)) + + // Create and upsert an evaluation. + mockEval := mock.Eval() + require.NoError(t, testServer.fsm.State().UpsertEvals( + structs.MsgTypeTestSetup, 20, []*structs.Evaluation{mockEval})) + + // Attempt to delete the eval, which should succeed as the eval + // broker is disabled, and we are using a management token. + get := &structs.EvalDeleteRequest{ + EvalIDs: []string{mockEval.ID}, + WriteRequest: structs.WriteRequest{ + AuthToken: rootToken.SecretID, + Region: "global", + }, + } + var resp structs.EvalDeleteResponse + require.NoError(t, msgpackrpc.CallWithCodec(codec, structs.EvalDeleteRPCMethod, get, &resp)) + + // Attempt to read the eval from state; this should not be found. + ws := memdb.NewWatchSet() + respEval, err := testServer.fsm.State().EvalByID(ws, mockEval.ID) + require.Nil(t, err) + require.Nil(t, respEval) + }, + name: "successful delete with ACLs", + }, + { + testFn: func() { + + testServer, _, testServerCleanup := TestACLServer(t, func(c *Config) { + c.NumSchedulers = 0 + }) + defer testServerCleanup() + + codec := rpcClient(t, testServer) + testutil.WaitForLeader(t, testServer.RPC) + + // Pause the eval broker. + testServer.evalBroker.SetEnabled(false) + + // Create and upsert an evaluation. + mockEval := mock.Eval() + require.NoError(t, testServer.fsm.State().UpsertEvals( + structs.MsgTypeTestSetup, 10, []*structs.Evaluation{mockEval})) + + nonMgntToken := mock.CreatePolicyAndToken(t, testServer.State(), 20, "test-valid", + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilitySubmitJob})) + + // Attempt to delete the eval, which should not succeed as we + // are using a non-management token. + get := &structs.EvalDeleteRequest{ + EvalIDs: []string{mockEval.ID}, + WriteRequest: structs.WriteRequest{ + AuthToken: nonMgntToken.SecretID, + Region: "global", + }, + } + var resp structs.EvalDeleteResponse + err := msgpackrpc.CallWithCodec(codec, structs.EvalDeleteRPCMethod, get, &resp) + require.Contains(t, err.Error(), structs.ErrPermissionDenied.Error()) + }, + name: "unsuccessful delete with ACLs incorrect token permissions", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + tc.testFn() + }) + } +} + +func Test_evalDeleteSafe(t *testing.T) { + ci.Parallel(t) + + testCases := []struct { + inputAllocs []*structs.Allocation + inputJob *structs.Job + expectedResult bool + name string + }{ + { + inputAllocs: nil, + inputJob: nil, + expectedResult: true, + name: "job not in state", + }, + { + inputAllocs: nil, + inputJob: &structs.Job{Status: structs.JobStatusDead}, + expectedResult: true, + name: "job stopped", + }, + { + inputAllocs: nil, + inputJob: &structs.Job{Stop: true}, + expectedResult: true, + name: "job dead", + }, + { + inputAllocs: []*structs.Allocation{}, + inputJob: &structs.Job{Status: structs.JobStatusRunning}, + expectedResult: true, + name: "no allocs for eval", + }, + { + inputAllocs: []*structs.Allocation{ + {ClientStatus: structs.AllocClientStatusComplete}, + {ClientStatus: structs.AllocClientStatusRunning}, + }, + inputJob: &structs.Job{Status: structs.JobStatusRunning}, + expectedResult: false, + name: "running alloc for eval", + }, + { + inputAllocs: []*structs.Allocation{ + {ClientStatus: structs.AllocClientStatusComplete}, + {ClientStatus: structs.AllocClientStatusUnknown}, + }, + inputJob: &structs.Job{Status: structs.JobStatusRunning}, + expectedResult: false, + name: "unknown alloc for eval", + }, + { + inputAllocs: []*structs.Allocation{ + {ClientStatus: structs.AllocClientStatusComplete}, + {ClientStatus: structs.AllocClientStatusLost}, + }, + inputJob: &structs.Job{Status: structs.JobStatusRunning}, + expectedResult: true, + name: "complete and lost allocs for eval", + }, + { + inputAllocs: []*structs.Allocation{ + { + ClientStatus: structs.AllocClientStatusFailed, + TaskGroup: "test", + }, + }, + inputJob: &structs.Job{ + Status: structs.JobStatusPending, + TaskGroups: []*structs.TaskGroup{ + { + Name: "test", + ReschedulePolicy: nil, + }, + }, + }, + expectedResult: true, + name: "failed alloc job without reschedule", + }, + { + inputAllocs: []*structs.Allocation{ + { + ClientStatus: structs.AllocClientStatusFailed, + TaskGroup: "test", + }, + }, + inputJob: &structs.Job{ + Status: structs.JobStatusPending, + TaskGroups: []*structs.TaskGroup{ + { + Name: "test", + ReschedulePolicy: &structs.ReschedulePolicy{ + Unlimited: false, + Attempts: 0, + }, + }, + }, + }, + expectedResult: true, + name: "failed alloc job reschedule disabled", + }, + { + inputAllocs: []*structs.Allocation{ + { + ClientStatus: structs.AllocClientStatusFailed, + TaskGroup: "test", + }, + }, + inputJob: &structs.Job{ + Status: structs.JobStatusPending, + TaskGroups: []*structs.TaskGroup{ + { + Name: "test", + ReschedulePolicy: &structs.ReschedulePolicy{ + Unlimited: false, + Attempts: 3, + }, + }, + }, + }, + expectedResult: false, + name: "failed alloc next alloc not set", + }, + { + inputAllocs: []*structs.Allocation{ + { + ClientStatus: structs.AllocClientStatusFailed, + TaskGroup: "test", + NextAllocation: "4aa4930a-8749-c95b-9c67-5ef29b0fc653", + }, + }, + inputJob: &structs.Job{ + Status: structs.JobStatusPending, + TaskGroups: []*structs.TaskGroup{ + { + Name: "test", + ReschedulePolicy: &structs.ReschedulePolicy{ + Unlimited: false, + Attempts: 3, + }, + }, + }, + }, + expectedResult: false, + name: "failed alloc next alloc set", + }, + { + inputAllocs: []*structs.Allocation{ + { + ClientStatus: structs.AllocClientStatusFailed, + TaskGroup: "test", + }, + }, + inputJob: &structs.Job{ + Status: structs.JobStatusPending, + TaskGroups: []*structs.TaskGroup{ + { + Name: "test", + ReschedulePolicy: &structs.ReschedulePolicy{ + Unlimited: true, + }, + }, + }, + }, + expectedResult: false, + name: "failed alloc job reschedule unlimited", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actualResult := evalDeleteSafe(tc.inputAllocs, tc.inputJob) + require.Equal(t, tc.expectedResult, actualResult) + }) + } +} + func TestEvalEndpoint_List(t *testing.T) { ci.Parallel(t) @@ -1002,7 +1347,7 @@ func TestEvalEndpoint_List_Blocking(t *testing.T) { // Eval deletion triggers watches time.AfterFunc(100*time.Millisecond, func() { - if err := state.DeleteEval(3, []string{eval.ID}, nil); err != nil { + if err := state.DeleteEval(3, []string{eval.ID}, nil, false); err != nil { t.Fatalf("err: %v", err) } }) diff --git a/nomad/fsm.go b/nomad/fsm.go index 4bac03bef9e..e686e211b0a 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -782,12 +782,12 @@ func (n *nomadFSM) handleUpsertedEval(eval *structs.Evaluation) { func (n *nomadFSM) applyDeleteEval(buf []byte, index uint64) interface{} { defer metrics.MeasureSince([]string{"nomad", "fsm", "delete_eval"}, time.Now()) - var req structs.EvalDeleteRequest + var req structs.EvalReapRequest if err := structs.Decode(buf, &req); err != nil { panic(fmt.Errorf("failed to decode request: %v", err)) } - if err := n.state.DeleteEval(index, req.Evals, req.Allocs); err != nil { + if err := n.state.DeleteEval(index, req.Evals, req.Allocs, req.UserInitiated); err != nil { n.logger.Error("DeleteEval failed", "error", err) return err } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 153b9d400d7..68f2a383f4e 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -1160,7 +1160,7 @@ func TestFSM_DeleteEval(t *testing.T) { t.Fatalf("resp: %v", resp) } - req2 := structs.EvalDeleteRequest{ + req2 := structs.EvalReapRequest{ Evals: []string{eval.ID}, } buf, err = structs.Encode(structs.EvalDeleteRequestType, req2) diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 3232eb92333..b0e35ad283d 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -2265,7 +2265,7 @@ func TestClientEndpoint_GetClientAllocs_Blocking_GC(t *testing.T) { // Delete an allocation time.AfterFunc(100*time.Millisecond, func() { - assert.Nil(state.DeleteEval(200, nil, []string{alloc2.ID})) + assert.Nil(state.DeleteEval(200, nil, []string{alloc2.ID}, false)) }) req.QueryOptions.MinQueryIndex = 150 diff --git a/nomad/rpc_test.go b/nomad/rpc_test.go index 5105ac854eb..61397547cb4 100644 --- a/nomad/rpc_test.go +++ b/nomad/rpc_test.go @@ -1141,7 +1141,7 @@ func TestRPC_TLS_Enforcement_RPC(t *testing.T) { "Eval.Reblock": &structs.EvalUpdateRequest{ WriteRequest: structs.WriteRequest{Region: "global"}, }, - "Eval.Reap": &structs.EvalDeleteRequest{ + "Eval.Reap": &structs.EvalReapRequest{ WriteRequest: structs.WriteRequest{Region: "global"}, }, "Plan.Submit": &structs.PlanRequest{ diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 6c5fabdda05..661586f5289 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2,6 +2,7 @@ package state import ( "context" + "errors" "fmt" "reflect" "sort" @@ -3107,10 +3108,22 @@ func (s *StateStore) updateEvalModifyIndex(txn *txn, index uint64, evalID string } // DeleteEval is used to delete an evaluation -func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) error { +func (s *StateStore) DeleteEval(index uint64, evals, allocs []string, userInitiated bool) error { txn := s.db.WriteTxn(index) defer txn.Abort() + // If this deletion has been initiated by an operator, ensure the eval + // broker is paused. + if userInitiated { + _, schedConfig, err := s.schedulerConfigTxn(txn) + if err != nil { + return err + } + if schedConfig == nil || !schedConfig.PauseEvalBroker { + return errors.New("eval broker is enabled; eval broker must be paused to delete evals") + } + } + jobs := make(map[structs.NamespacedID]string, len(evals)) // evalsTableUpdated and allocsTableUpdated allow us to track whether each @@ -5890,9 +5903,13 @@ func expiredOneTimeTokenFilter(now time.Time) func(interface{}) bool { func (s *StateStore) SchedulerConfig() (uint64, *structs.SchedulerConfiguration, error) { tx := s.db.ReadTxn() defer tx.Abort() + return s.schedulerConfigTxn(tx) +} + +func (s *StateStore) schedulerConfigTxn(txn *txn) (uint64, *structs.SchedulerConfiguration, error) { // Get the scheduler config - c, err := tx.First("scheduler_config", "id") + c, err := txn.First("scheduler_config", "id") if err != nil { return 0, nil, fmt.Errorf("failed scheduler config lookup: %s", err) } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index bdf7af29355..c6743dd4fe2 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -4233,7 +4233,7 @@ func TestStateStore_DeleteEval_Eval(t *testing.T) { t.Fatalf("err: %v", err) } - err = state.DeleteEval(1002, []string{eval1.ID, eval2.ID}, []string{alloc1.ID, alloc2.ID}) + err = state.DeleteEval(1002, []string{eval1.ID, eval2.ID}, []string{alloc1.ID, alloc2.ID}, false) if err != nil { t.Fatalf("err: %v", err) } @@ -4304,7 +4304,7 @@ func TestStateStore_DeleteEval_Eval(t *testing.T) { // Call the eval delete function with zero length eval and alloc ID arrays. // This should result in the table indexes both staying the same, rather // than updating without cause. - require.NoError(t, state.DeleteEval(1010, []string{}, []string{})) + require.NoError(t, state.DeleteEval(1010, []string{}, []string{}, false)) allocsIndex, err := state.Index("allocs") require.NoError(t, err) @@ -4354,7 +4354,7 @@ func TestStateStore_DeleteEval_ChildJob(t *testing.T) { t.Fatalf("bad: %v", err) } - err = state.DeleteEval(1002, []string{eval1.ID}, []string{alloc1.ID}) + err = state.DeleteEval(1002, []string{eval1.ID}, []string{alloc1.ID}, false) if err != nil { t.Fatalf("err: %v", err) } @@ -4386,6 +4386,45 @@ func TestStateStore_DeleteEval_ChildJob(t *testing.T) { } } +func TestStateStore_DeleteEval_UserInitiated(t *testing.T) { + ci.Parallel(t) + + testState := testStateStore(t) + + // Upsert a scheduler config object, so we have something to check and + // modify. + schedulerConfig := structs.SchedulerConfiguration{PauseEvalBroker: false} + require.NoError(t, testState.SchedulerSetConfig(10, &schedulerConfig)) + + // Generate some mock evals and upsert these into state. + mockEval1 := mock.Eval() + mockEval2 := mock.Eval() + require.NoError(t, testState.UpsertEvals( + structs.MsgTypeTestSetup, 20, []*structs.Evaluation{mockEval1, mockEval2})) + + mockEvalIDs := []string{mockEval1.ID, mockEval2.ID} + + // Try and delete the evals without pausing the eval broker. + err := testState.DeleteEval(30, mockEvalIDs, []string{}, true) + require.ErrorContains(t, err, "eval broker is enabled") + + // Pause the eval broker on the scheduler config, and try deleting the + // evals again. + schedulerConfig.PauseEvalBroker = true + require.NoError(t, testState.SchedulerSetConfig(30, &schedulerConfig)) + + require.NoError(t, testState.DeleteEval(40, mockEvalIDs, []string{}, true)) + + ws := memdb.NewWatchSet() + mockEval1Lookup, err := testState.EvalByID(ws, mockEval1.ID) + require.NoError(t, err) + require.Nil(t, mockEval1Lookup) + + mockEval2Lookup, err := testState.EvalByID(ws, mockEval1.ID) + require.NoError(t, err) + require.Nil(t, mockEval2Lookup) +} + func TestStateStore_EvalsByJob(t *testing.T) { ci.Parallel(t) diff --git a/nomad/structs/eval.go b/nomad/structs/eval.go new file mode 100644 index 00000000000..cb74a3e3e5e --- /dev/null +++ b/nomad/structs/eval.go @@ -0,0 +1,24 @@ +package structs + +const ( + // EvalDeleteRPCMethod is the RPC method for batch deleting evaluations + // using their IDs. + // + // Args: EvalDeleteRequest + // Reply: EvalDeleteResponse + EvalDeleteRPCMethod = "Eval.Delete" +) + +// EvalDeleteRequest is the request object used when operators are manually +// deleting evaluations. The number of evaluation IDs within the request must +// not be greater than MaxEvalIDsPerDeleteRequest. +type EvalDeleteRequest struct { + EvalIDs []string + WriteRequest +} + +// EvalDeleteResponse is the response object when one or more evaluation are +// deleted manually by an operator. +type EvalDeleteResponse struct { + WriteMeta +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index b186584724c..1304ddd4396 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -822,10 +822,20 @@ type EvalUpdateRequest struct { WriteRequest } -// EvalDeleteRequest is used for deleting an evaluation. -type EvalDeleteRequest struct { +// EvalReapRequest is used for reaping evaluations and allocation. This struct +// is used by the Eval.Reap RPC endpoint as a request argument, and also when +// performing eval reap or deletes via Raft. This is because Eval.Reap and +// Eval.Delete use the same Raft message when performing deletes so we do not +// need more Raft message types. +type EvalReapRequest struct { Evals []string Allocs []string + + // UserInitiated tracks whether this reap request is the result of an + // operator request. If this is true, the FSM needs to ensure the eval + // broker is paused as the request can include non-terminal allocations. + UserInitiated bool + WriteRequest } diff --git a/nomad/structs/uuid.go b/nomad/structs/uuid.go new file mode 100644 index 00000000000..f983fd7b491 --- /dev/null +++ b/nomad/structs/uuid.go @@ -0,0 +1,7 @@ +package structs + +// MaxUUIDsPerWriteRequest is the maximum number of UUIDs that can be included +// within a single write request. This is to ensure that the Raft message does +// not become too large. The resulting value corresponds to 0.25MB of IDs or +// 7282 UUID strings. +var MaxUUIDsPerWriteRequest = (1024 * 256) / 36 diff --git a/nomad/volumewatcher/volumes_watcher_test.go b/nomad/volumewatcher/volumes_watcher_test.go index 97a83a9b538..dc4c60ed833 100644 --- a/nomad/volumewatcher/volumes_watcher_test.go +++ b/nomad/volumewatcher/volumes_watcher_test.go @@ -108,7 +108,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) { // allocation is now invalid index++ - err = srv.State().DeleteEval(index, []string{}, []string{alloc.ID}) + err = srv.State().DeleteEval(index, []string{}, []string{alloc.ID}, false) require.NoError(t, err) // emit a GC so that we have a volume change that's dropped From 5b2565c3c663498f40222151fb5af4bb11810939 Mon Sep 17 00:00:00 2001 From: James Rasell Date: Fri, 1 Jul 2022 15:41:31 +0100 Subject: [PATCH 08/11] agent: add eval delete HTTP endpoint. --- command/agent/eval_endpoint.go | 48 +++++++++- command/agent/eval_endpoint_test.go | 131 ++++++++++++++++++++++++++++ 2 files changed, 177 insertions(+), 2 deletions(-) diff --git a/command/agent/eval_endpoint.go b/command/agent/eval_endpoint.go index 1be0e24baa4..72e8ae8ba2b 100644 --- a/command/agent/eval_endpoint.go +++ b/command/agent/eval_endpoint.go @@ -1,16 +1,29 @@ package agent import ( + "fmt" "net/http" "strings" "github.com/hashicorp/nomad/nomad/structs" ) +// EvalsRequest is the entry point for /v1/evaluations and is responsible for +// handling both the listing of evaluations, and the bulk deletion of +// evaluations. The latter is a dangerous operation and should use the +// eval delete command to perform this. func (s *HTTPServer) EvalsRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { - if req.Method != "GET" { - return nil, CodedError(405, ErrInvalidMethod) + switch req.Method { + case http.MethodGet: + return s.evalsListRequest(resp, req) + case http.MethodDelete: + return s.evalsDeleteRequest(resp, req) + default: + return nil, CodedError(http.StatusMethodNotAllowed, ErrInvalidMethod) } +} + +func (s *HTTPServer) evalsListRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { args := structs.EvalListRequest{} if s.parse(resp, req, &args.Region, &args.QueryOptions) { @@ -33,6 +46,37 @@ func (s *HTTPServer) EvalsRequest(resp http.ResponseWriter, req *http.Request) ( return out.Evaluations, nil } +func (s *HTTPServer) evalsDeleteRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + + var args structs.EvalDeleteRequest + + if err := decodeBody(req, &args); err != nil { + return nil, CodedError(http.StatusBadRequest, err.Error()) + } + + numIDs := len(args.EvalIDs) + + // Ensure the number of evaluation IDs included in the request is within + // bounds. + if numIDs < 1 { + return nil, CodedError(http.StatusBadRequest, "request does not include any evaluation IDs") + } else if numIDs > structs.MaxUUIDsPerWriteRequest { + return nil, CodedError(http.StatusBadRequest, fmt.Sprintf( + "request includes %v evaluations IDs, must be %v or fewer", + numIDs, structs.MaxUUIDsPerWriteRequest)) + } + + // Pass the write request to populate all meta fields. + s.parseWriteRequest(req, &args.WriteRequest) + + var reply structs.EvalDeleteResponse + if err := s.agent.RPC(structs.EvalDeleteRPCMethod, &args, &reply); err != nil { + return nil, err + } + setIndex(resp, reply.Index) + return nil, nil +} + func (s *HTTPServer) EvalSpecificRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { path := strings.TrimPrefix(req.URL.Path, "/v1/evaluation/") switch { diff --git a/command/agent/eval_endpoint_test.go b/command/agent/eval_endpoint_test.go index 2c9665d9105..48eeaaeb758 100644 --- a/command/agent/eval_endpoint_test.go +++ b/command/agent/eval_endpoint_test.go @@ -6,7 +6,9 @@ import ( "net/http/httptest" "testing" + "github.com/hashicorp/nomad/api" "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/stretchr/testify/require" @@ -110,6 +112,135 @@ func TestHTTP_EvalPrefixList(t *testing.T) { }) } +func TestHTTP_EvalsDelete(t *testing.T) { + ci.Parallel(t) + + testCases := []struct { + testFn func() + name string + }{ + { + testFn: func() { + httpTest(t, nil, func(s *TestAgent) { + + // Create an empty request object which doesn't contain any + // eval IDs. + deleteReq := api.EvalDeleteRequest{} + buf := encodeReq(&deleteReq) + + // Generate the HTTP request. + req, err := http.NewRequest(http.MethodDelete, "/v1/evaluations", buf) + require.NoError(t, err) + respW := httptest.NewRecorder() + + // Make the request and check the response. + obj, err := s.Server.EvalsRequest(respW, req) + require.Equal(t, + CodedError(http.StatusBadRequest, "request does not include any evaluation IDs"), err) + require.Nil(t, obj) + }) + }, + name: "too few eval IDs", + }, + { + testFn: func() { + httpTest(t, nil, func(s *TestAgent) { + + deleteReq := api.EvalDeleteRequest{EvalIDs: make([]string, 8000)} + + // Generate a UUID and add it 8000 times to the eval ID + // request array. + evalID := uuid.Generate() + + for i := 0; i < 8000; i++ { + deleteReq.EvalIDs[i] = evalID + } + + buf := encodeReq(&deleteReq) + + // Generate the HTTP request. + req, err := http.NewRequest(http.MethodDelete, "/v1/evaluations", buf) + require.NoError(t, err) + respW := httptest.NewRecorder() + + // Make the request and check the response. + obj, err := s.Server.EvalsRequest(respW, req) + require.Equal(t, + CodedError(http.StatusBadRequest, + "request includes 8000 evaluations IDs, must be 7281 or fewer"), err) + require.Nil(t, obj) + }) + }, + name: "too many eval IDs", + }, + { + testFn: func() { + httpTest(t, func(c *Config) { + c.NomadConfig.DefaultSchedulerConfig.PauseEvalBroker = true + }, func(s *TestAgent) { + + // Generate a request with an eval ID that doesn't exist + // within state. + deleteReq := api.EvalDeleteRequest{EvalIDs: []string{uuid.Generate()}} + buf := encodeReq(&deleteReq) + + // Generate the HTTP request. + req, err := http.NewRequest(http.MethodDelete, "/v1/evaluations", buf) + require.NoError(t, err) + respW := httptest.NewRecorder() + + // Make the request and check the response. + obj, err := s.Server.EvalsRequest(respW, req) + require.Contains(t, err.Error(), "eval not found") + require.Nil(t, obj) + }) + }, + name: "eval doesn't exist", + }, + { + testFn: func() { + httpTest(t, func(c *Config) { + c.NomadConfig.DefaultSchedulerConfig.PauseEvalBroker = true + }, func(s *TestAgent) { + + // Upsert an eval into state. + mockEval := mock.Eval() + + err := s.Agent.server.State().UpsertEvals( + structs.MsgTypeTestSetup, 10, []*structs.Evaluation{mockEval}) + require.NoError(t, err) + + // Generate a request with the ID of the eval previously upserted. + deleteReq := api.EvalDeleteRequest{EvalIDs: []string{mockEval.ID}} + buf := encodeReq(&deleteReq) + + // Generate the HTTP request. + req, err := http.NewRequest(http.MethodDelete, "/v1/evaluations", buf) + require.NoError(t, err) + respW := httptest.NewRecorder() + + // Make the request and check the response. + obj, err := s.Server.EvalsRequest(respW, req) + require.Nil(t, err) + require.Nil(t, obj) + + // Ensure the eval is not found. + readEval, err := s.Agent.server.State().EvalByID(nil, mockEval.ID) + require.NoError(t, err) + require.Nil(t, readEval) + }) + }, + name: "successfully delete eval", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + tc.testFn() + }) + } +} + func TestHTTP_EvalAllocations(t *testing.T) { ci.Parallel(t) httpTest(t, nil, func(s *TestAgent) { From b9324b5eead1a98b5df56f18146cf53c0f410d08 Mon Sep 17 00:00:00 2001 From: James Rasell Date: Fri, 1 Jul 2022 15:41:46 +0100 Subject: [PATCH 09/11] api: add eval delete API functionality. --- api/acl.go | 4 ++-- api/api.go | 7 ++++--- api/api_test.go | 2 +- api/csi.go | 10 +++++----- api/evaluations.go | 17 +++++++++++++++++ api/evaluations_test.go | 27 +++++++++++++++++++++++++++ api/jobs.go | 4 ++-- api/namespace.go | 2 +- api/quota.go | 2 +- api/raw.go | 2 +- api/sentinel.go | 2 +- api/services.go | 2 +- 12 files changed, 63 insertions(+), 18 deletions(-) diff --git a/api/acl.go b/api/acl.go index a964b01e013..4a289c666c7 100644 --- a/api/acl.go +++ b/api/acl.go @@ -42,7 +42,7 @@ func (a *ACLPolicies) Delete(policyName string, q *WriteOptions) (*WriteMeta, er if policyName == "" { return nil, fmt.Errorf("missing policy name") } - wm, err := a.client.delete("/v1/acl/policy/"+policyName, nil, q) + wm, err := a.client.delete("/v1/acl/policy/"+policyName, nil, nil, q) if err != nil { return nil, err } @@ -142,7 +142,7 @@ func (a *ACLTokens) Delete(accessorID string, q *WriteOptions) (*WriteMeta, erro if accessorID == "" { return nil, fmt.Errorf("missing accessor ID") } - wm, err := a.client.delete("/v1/acl/token/"+accessorID, nil, q) + wm, err := a.client.delete("/v1/acl/token/"+accessorID, nil, nil, q) if err != nil { return nil, err } diff --git a/api/api.go b/api/api.go index 3d3dcf88d08..8d3fc46d87b 100644 --- a/api/api.go +++ b/api/api.go @@ -982,14 +982,15 @@ func (c *Client) write(endpoint string, in, out interface{}, q *WriteOptions) (* return wm, nil } -// delete is used to do a DELETE request against an endpoint -// and serialize/deserialized using the standard Nomad conventions. -func (c *Client) delete(endpoint string, out interface{}, q *WriteOptions) (*WriteMeta, error) { +// delete is used to do a DELETE request against an endpoint and +// serialize/deserialized using the standard Nomad conventions. +func (c *Client) delete(endpoint string, in, out interface{}, q *WriteOptions) (*WriteMeta, error) { r, err := c.newRequest("DELETE", endpoint) if err != nil { return nil, err } r.setWriteOptions(q) + r.obj = in rtt, resp, err := requireOK(c.doRequest(r)) if err != nil { return nil, err diff --git a/api/api_test.go b/api/api_test.go index 82d35acf192..ce845c10fbe 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -107,7 +107,7 @@ func TestRequestTime(t *testing.T) { t.Errorf("bad request time: %d", wm.RequestTime) } - wm, err = client.delete("/", &out, nil) + wm, err = client.delete("/", nil, &out, nil) if err != nil { t.Fatalf("delete err: %v", err) } diff --git a/api/csi.go b/api/csi.go index b42202b67c1..5eb5d1b71b3 100644 --- a/api/csi.go +++ b/api/csi.go @@ -82,7 +82,7 @@ func (v *CSIVolumes) Register(vol *CSIVolume, w *WriteOptions) (*WriteMeta, erro // Deregister deregisters a single CSIVolume from Nomad. The volume will not be deleted from the external storage provider. func (v *CSIVolumes) Deregister(id string, force bool, w *WriteOptions) error { - _, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v?force=%t", url.PathEscape(id), force), nil, w) + _, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v?force=%t", url.PathEscape(id), force), nil, nil, w) return err } @@ -104,7 +104,7 @@ func (v *CSIVolumes) Create(vol *CSIVolume, w *WriteOptions) ([]*CSIVolume, *Wri // passed as an argument here is for the storage provider's ID, so a volume // that's already been deregistered can be deleted. func (v *CSIVolumes) Delete(externalVolID string, w *WriteOptions) error { - _, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v/delete", url.PathEscape(externalVolID)), nil, w) + _, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v/delete", url.PathEscape(externalVolID)), nil, nil, w) return err } @@ -117,7 +117,7 @@ func (v *CSIVolumes) DeleteOpts(req *CSIVolumeDeleteRequest, w *WriteOptions) er w = &WriteOptions{} } w.SetHeadersFromCSISecrets(req.Secrets) - _, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v/delete", url.PathEscape(req.ExternalVolumeID)), nil, w) + _, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v/delete", url.PathEscape(req.ExternalVolumeID)), nil, nil, w) return err } @@ -125,7 +125,7 @@ func (v *CSIVolumes) DeleteOpts(req *CSIVolumeDeleteRequest, w *WriteOptions) er // node. This is used in the case that the node is temporarily lost and the // allocations are unable to drop their claims automatically. func (v *CSIVolumes) Detach(volID, nodeID string, w *WriteOptions) error { - _, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v/detach?node=%v", url.PathEscape(volID), nodeID), nil, w) + _, err := v.client.delete(fmt.Sprintf("/v1/volume/csi/%v/detach?node=%v", url.PathEscape(volID), nodeID), nil, nil, w) return err } @@ -152,7 +152,7 @@ func (v *CSIVolumes) DeleteSnapshot(snap *CSISnapshot, w *WriteOptions) error { w = &WriteOptions{} } w.SetHeadersFromCSISecrets(snap.Secrets) - _, err := v.client.delete("/v1/volumes/snapshot?"+qp.Encode(), nil, w) + _, err := v.client.delete("/v1/volumes/snapshot?"+qp.Encode(), nil, nil, w) return err } diff --git a/api/evaluations.go b/api/evaluations.go index 62d699ef320..1acf70db7d9 100644 --- a/api/evaluations.go +++ b/api/evaluations.go @@ -40,6 +40,18 @@ func (e *Evaluations) Info(evalID string, q *QueryOptions) (*Evaluation, *QueryM return &resp, qm, nil } +// Delete is used to batch delete evaluations using their IDs. +func (e *Evaluations) Delete(evalIDs []string, w *WriteOptions) (*WriteMeta, error) { + req := EvalDeleteRequest{ + EvalIDs: evalIDs, + } + wm, err := e.client.delete("/v1/evaluations", &req, nil, w) + if err != nil { + return nil, err + } + return wm, nil +} + // Allocations is used to retrieve a set of allocations given // an evaluation ID. func (e *Evaluations) Allocations(evalID string, q *QueryOptions) ([]*AllocationListStub, *QueryMeta, error) { @@ -108,6 +120,11 @@ type EvaluationStub struct { ModifyTime int64 } +type EvalDeleteRequest struct { + EvalIDs []string + WriteRequest +} + // EvalIndexSort is a wrapper to sort evaluations by CreateIndex. // We reverse the test so that we get the highest index first. type EvalIndexSort []*Evaluation diff --git a/api/evaluations_test.go b/api/evaluations_test.go index 0939f87e17e..2027f9083f2 100644 --- a/api/evaluations_test.go +++ b/api/evaluations_test.go @@ -147,6 +147,33 @@ func TestEvaluations_Info(t *testing.T) { } } +func TestEvaluations_Delete(t *testing.T) { + testutil.Parallel(t) + + testClient, testServer := makeClient(t, nil, nil) + defer testServer.Stop() + + // Attempting to delete an evaluation when the eval broker is not paused + // should return an error. + wm, err := testClient.Evaluations().Delete([]string{"8E231CF4-CA48-43FF-B694-5801E69E22FA"}, nil) + require.Nil(t, wm) + require.ErrorContains(t, err, "eval broker is enabled") + + // Pause the eval broker, and try to delete an evaluation that does not + // exist. + schedulerConfig, _, err := testClient.Operator().SchedulerGetConfiguration(nil) + require.NoError(t, err) + require.NotNil(t, schedulerConfig) + + schedulerConfig.SchedulerConfig.PauseEvalBroker = true + schedulerConfigUpdated, _, err := testClient.Operator().SchedulerCASConfiguration(schedulerConfig.SchedulerConfig, nil) + require.NoError(t, err) + require.True(t, schedulerConfigUpdated.Updated) + + wm, err = testClient.Evaluations().Delete([]string{"8E231CF4-CA48-43FF-B694-5801E69E22FA"}, nil) + require.ErrorContains(t, err, "eval not found") +} + func TestEvaluations_Allocations(t *testing.T) { testutil.Parallel(t) c, s := makeClient(t, nil, nil) diff --git a/api/jobs.go b/api/jobs.go index 656b3c92315..c781c628a05 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -288,7 +288,7 @@ func (j *Jobs) Evaluations(jobID string, q *QueryOptions) ([]*Evaluation, *Query // eventually GC'ed from the system. Most callers should not specify purge. func (j *Jobs) Deregister(jobID string, purge bool, q *WriteOptions) (string, *WriteMeta, error) { var resp JobDeregisterResponse - wm, err := j.client.delete(fmt.Sprintf("/v1/job/%v?purge=%t", url.PathEscape(jobID), purge), &resp, q) + wm, err := j.client.delete(fmt.Sprintf("/v1/job/%v?purge=%t", url.PathEscape(jobID), purge), nil, &resp, q) if err != nil { return "", nil, err } @@ -334,7 +334,7 @@ func (j *Jobs) DeregisterOpts(jobID string, opts *DeregisterOptions, q *WriteOpt opts.Purge, opts.Global, opts.EvalPriority, opts.NoShutdownDelay) } - wm, err := j.client.delete(endpoint, &resp, q) + wm, err := j.client.delete(endpoint, nil, &resp, q) if err != nil { return "", nil, err } diff --git a/api/namespace.go b/api/namespace.go index 7e53521263c..3a21e224753 100644 --- a/api/namespace.go +++ b/api/namespace.go @@ -58,7 +58,7 @@ func (n *Namespaces) Register(namespace *Namespace, q *WriteOptions) (*WriteMeta // Delete is used to delete a namespace func (n *Namespaces) Delete(namespace string, q *WriteOptions) (*WriteMeta, error) { - wm, err := n.client.delete(fmt.Sprintf("/v1/namespace/%s", namespace), nil, q) + wm, err := n.client.delete(fmt.Sprintf("/v1/namespace/%s", namespace), nil, nil, q) if err != nil { return nil, err } diff --git a/api/quota.go b/api/quota.go index 029f1f4a55e..be4e46c7e95 100644 --- a/api/quota.go +++ b/api/quota.go @@ -90,7 +90,7 @@ func (q *Quotas) Register(spec *QuotaSpec, qo *WriteOptions) (*WriteMeta, error) // Delete is used to delete a quota spec func (q *Quotas) Delete(quota string, qo *WriteOptions) (*WriteMeta, error) { - wm, err := q.client.delete(fmt.Sprintf("/v1/quota/%s", quota), nil, qo) + wm, err := q.client.delete(fmt.Sprintf("/v1/quota/%s", quota), nil, nil, qo) if err != nil { return nil, err } diff --git a/api/raw.go b/api/raw.go index 9369829c511..077f87dd064 100644 --- a/api/raw.go +++ b/api/raw.go @@ -34,5 +34,5 @@ func (raw *Raw) Write(endpoint string, in, out interface{}, q *WriteOptions) (*W // Delete is used to do a DELETE request against an endpoint // and serialize/deserialized using the standard Nomad conventions. func (raw *Raw) Delete(endpoint string, out interface{}, q *WriteOptions) (*WriteMeta, error) { - return raw.c.delete(endpoint, out, q) + return raw.c.delete(endpoint, nil, out, q) } diff --git a/api/sentinel.go b/api/sentinel.go index c1e52c7cb81..fdccd9f6b64 100644 --- a/api/sentinel.go +++ b/api/sentinel.go @@ -39,7 +39,7 @@ func (a *SentinelPolicies) Delete(policyName string, q *WriteOptions) (*WriteMet if policyName == "" { return nil, fmt.Errorf("missing policy name") } - wm, err := a.client.delete("/v1/sentinel/policy/"+policyName, nil, q) + wm, err := a.client.delete("/v1/sentinel/policy/"+policyName, nil, nil, q) if err != nil { return nil, err } diff --git a/api/services.go b/api/services.go index 9cd9d8c5f9d..55d2b01c278 100644 --- a/api/services.go +++ b/api/services.go @@ -122,7 +122,7 @@ func (s *Services) Get(serviceName string, q *QueryOptions) ([]*ServiceRegistrat // by its service name and service ID. func (s *Services) Delete(serviceName, serviceID string, q *WriteOptions) (*WriteMeta, error) { path := fmt.Sprintf("/v1/service/%s/%s", url.PathEscape(serviceName), url.PathEscape(serviceID)) - wm, err := s.client.delete(path, nil, q) + wm, err := s.client.delete(path, nil, nil, q) if err != nil { return nil, err } From 84eeae7193d2d31d68fe1792cf7114d163f73f44 Mon Sep 17 00:00:00 2001 From: James Rasell Date: Fri, 1 Jul 2022 15:41:56 +0100 Subject: [PATCH 10/11] cli: add eval delete command. --- command/commands.go | 5 + command/eval.go | 32 ++- command/eval_delete.go | 406 ++++++++++++++++++++++++++++++++++++ command/eval_delete_test.go | 184 ++++++++++++++++ command/eval_list.go | 15 +- 5 files changed, 626 insertions(+), 16 deletions(-) create mode 100644 command/eval_delete.go create mode 100644 command/eval_delete_test.go diff --git a/command/commands.go b/command/commands.go index 9bdefe891cf..2776298cf61 100644 --- a/command/commands.go +++ b/command/commands.go @@ -265,6 +265,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory { Meta: meta, }, nil }, + "eval delete": func() (cli.Command, error) { + return &EvalDeleteCommand{ + Meta: meta, + }, nil + }, "eval list": func() (cli.Command, error) { return &EvalListCommand{ Meta: meta, diff --git a/command/eval.go b/command/eval.go index af0ddaa5925..82311fb84f7 100644 --- a/command/eval.go +++ b/command/eval.go @@ -1,8 +1,10 @@ package command import ( + "fmt" "strings" + "github.com/hashicorp/nomad/api" "github.com/mitchellh/cli" ) @@ -19,10 +21,18 @@ Usage: nomad eval [options] [args] detail but can be useful for debugging placement failures when the cluster does not have the resources to run a given job. + List evaluations: + + $ nomad eval list + Examine an evaluations status: $ nomad eval status + Delete evaluations: + + $ nomad eval delete + Please see the individual subcommand help for detailed usage information. ` @@ -35,6 +45,24 @@ func (f *EvalCommand) Synopsis() string { func (f *EvalCommand) Name() string { return "eval" } -func (f *EvalCommand) Run(args []string) int { - return cli.RunResultHelp +func (f *EvalCommand) Run(_ []string) int { return cli.RunResultHelp } + +// outputEvalList is a helper which outputs an array of evaluations as a list +// to the UI with key information such as ID and status. +func outputEvalList(ui cli.Ui, evals []*api.Evaluation, length int) { + + out := make([]string, len(evals)+1) + out[0] = "ID|Priority|Triggered By|Job ID|Status|Placement Failures" + for i, eval := range evals { + failures, _ := evalFailureStatus(eval) + out[i+1] = fmt.Sprintf("%s|%d|%s|%s|%s|%s", + limit(eval.ID, length), + eval.Priority, + eval.TriggeredBy, + eval.JobID, + eval.Status, + failures, + ) + } + ui.Output(formatList(out)) } diff --git a/command/eval_delete.go b/command/eval_delete.go new file mode 100644 index 00000000000..381c161ae29 --- /dev/null +++ b/command/eval_delete.go @@ -0,0 +1,406 @@ +package command + +import ( + "errors" + "fmt" + "strings" + "time" + + "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/api/contexts" + "github.com/posener/complete" +) + +type EvalDeleteCommand struct { + Meta + + filter string + yes bool + + // deleteByArg is set when the command is deleting an evaluation that has + // been passed as an argument. This avoids need for confirmation. + deleteByArg bool + + // numDeleted tracks the total evaluations deleted in a single run of this + // command. It provides a way to output this information to the user at the + // command completion. + numDeleted int + + // client is the lazy-loaded API client and is stored here, so we don't + // need to pass it to multiple functions. + client *api.Client +} + +func (e *EvalDeleteCommand) Help() string { + helpText := ` +Usage: nomad eval delete [options] + + Delete an evaluation by ID. If the evaluation ID is omitted, this command + will use the filter flag to identify and delete a set of evaluations. If ACLs + are enabled, this command requires a management ACL token. + + This command should be used cautiously and only in outage situations where + there is a large backlog of evaluations not being processed. During most + normal and outage scenarios, Nomads reconciliation and state management will + handle evaluations as needed. + + The eval broker is expected to be paused prior to running this command and + un-paused after. This can be done using the following two commands: + - nomad operator scheduler set-config -pause-eval-broker=true + - nomad operator scheduler set-config -pause-eval-broker=false + +General Options: + + ` + generalOptionsUsage(usageOptsNoNamespace) + ` + +Eval Delete Options: + + -filter + Specifies an expression used to filter evaluations by for deletion. When + using this flag, it is advisable to ensure the syntax is correct using the + eval list command first. + + -yes + Bypass the confirmation prompt if an evaluation ID was not provided. +` + + return strings.TrimSpace(helpText) +} + +func (e *EvalDeleteCommand) Synopsis() string { + return "Delete evaluations by ID or using a filter" +} + +func (e *EvalDeleteCommand) AutocompleteFlags() complete.Flags { + return mergeAutocompleteFlags(e.Meta.AutocompleteFlags(FlagSetClient), + complete.Flags{ + "-filter": complete.PredictAnything, + "-yes": complete.PredictNothing, + }) +} + +func (e *EvalDeleteCommand) AutocompleteArgs() complete.Predictor { + return complete.PredictFunc(func(a complete.Args) []string { + client, err := e.Meta.Client() + if err != nil { + return nil + } + + resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Evals, nil) + if err != nil { + return []string{} + } + return resp.Matches[contexts.Evals] + }) +} + +func (e *EvalDeleteCommand) Name() string { return "eval delete" } + +func (e *EvalDeleteCommand) Run(args []string) int { + + flags := e.Meta.FlagSet(e.Name(), FlagSetClient) + flags.Usage = func() { e.Ui.Output(e.Help()) } + flags.StringVar(&e.filter, "filter", "", "") + flags.BoolVar(&e.yes, "yes", false, "") + if err := flags.Parse(args); err != nil { + return 1 + } + + args = flags.Args() + + if err := e.verifyArgsAndFlags(args); err != nil { + e.Ui.Error(fmt.Sprintf("Error validating command args and flags: %v", err)) + return 1 + } + + // Get the HTTP client and store this for use across multiple functions. + client, err := e.Meta.Client() + if err != nil { + e.Ui.Error(fmt.Sprintf("Error initializing client: %s", err)) + return 1 + } + e.client = client + + // Ensure the eval broker is paused. This check happens multiple times on + // the leader, but this check means we can provide quick and actionable + // feedback. + schedulerConfig, _, err := e.client.Operator().SchedulerGetConfiguration(nil) + if err != nil { + e.Ui.Error(fmt.Sprintf("Error querying scheduler configuration: %s", err)) + return 1 + } + + if !schedulerConfig.SchedulerConfig.PauseEvalBroker { + e.Ui.Error("Eval broker is not paused") + e.Ui.Output(`To delete evaluations you must first pause the eval broker by running "nomad operator scheduler set-config -pause-eval-broker=true"`) + e.Ui.Output(`After the deletion is complete, unpause the eval broker by running "nomad operator scheduler set-config -pause-eval-broker=false"`) + return 1 + } + + // Track the eventual exit code as there are a number of factors that + // influence this. + var exitCode int + + // Call the correct function in order to handle the operator input + // correctly. + switch len(args) { + case 1: + e.deleteByArg = true + exitCode, err = e.handleEvalArgDelete(args[0]) + default: + + // Track the next token, so we can iterate all pages that match the + // passed filter. + var nextToken string + + // It is possible the filter matches a large number of evaluations + // which means we need to run a number of batch deletes. Perform + // iteration here rather than recursion in later function, so we avoid + // any potential issues with stack size limits. + for { + exitCode, nextToken, err = e.handleFlagFilterDelete(nextToken) + + // If there is another page of evaluations matching the filter, + // iterate the loop and delete the next batch of evals. We pause + // for a 500ms rather than just run as fast as the code and machine + // possibly can. This means deleting 13million evals will take + // roughly 13-15 mins, which seems reasonable. It is worth noting, + // we do not expect operators to delete this many evals in a single + // run and expect more careful filtering options to be used. + if nextToken != "" { + time.Sleep(500 * time.Millisecond) + continue + } else { + break + } + } + } + + // Do not exit if we got an error as it's possible this was on the + // non-first iteration, and we have therefore deleted some evals. + if err != nil { + e.Ui.Error(fmt.Sprintf("Error deleting evaluations: %s", err)) + } + + // Depending on whether we deleted evaluations or not, output a message so + // this is clear. + if e.numDeleted > 0 { + e.Ui.Output(fmt.Sprintf("Successfully deleted %v %s", + e.numDeleted, correctGrammar("evaluation", e.numDeleted))) + } else if err == nil { + e.Ui.Output("No evaluations were deleted") + } + + return exitCode +} + +// verifyArgsAndFlags ensures the passed arguments and flags are valid for what +// this command accepts and can take action on. +func (e *EvalDeleteCommand) verifyArgsAndFlags(args []string) error { + + numArgs := len(args) + + // The command takes either an argument or filter, but not both. + if (e.filter == "" && numArgs < 1) || (e.filter != "" && numArgs > 0) { + return errors.New("evaluation ID or filter flag required") + } + + // If an argument is supplied, we only accept a single eval ID. + if numArgs > 1 { + return fmt.Errorf("expected 1 argument, got %v", numArgs) + } + + return nil +} + +// handleEvalArgDelete handles deletion and evaluation which was passed via +// it's ID as a command argument. This is the simplest route to take and +// doesn't require filtering or batching. +func (e *EvalDeleteCommand) handleEvalArgDelete(evalID string) (int, error) { + evalInfo, _, err := e.client.Evaluations().Info(evalID, nil) + if err != nil { + return 1, err + } + + // Supplying an eval to delete by its ID will always skip verification, so + // we don't need to understand the boolean response. + code, _, err := e.batchDelete([]*api.Evaluation{evalInfo}) + return code, err +} + +// handleFlagFilterDelete handles deletion of evaluations discovered using +// the filter. It is unknown how many will match the operator criteria so +// this function batches lookup and delete requests into sensible numbers. +func (e *EvalDeleteCommand) handleFlagFilterDelete(nt string) (int, string, error) { + + evalsToDelete, nextToken, err := e.batchLookupEvals(nt) + if err != nil { + return 1, "", err + } + + numEvalsToDelete := len(evalsToDelete) + + // The filter flags are operator controlled, therefore ensure we + // actually found some evals to delete. Otherwise, inform the operator + // their flags are potentially incorrect. + if numEvalsToDelete == 0 { + if e.numDeleted > 0 { + return 0, "", nil + } else { + return 1, "", errors.New("failed to find any evals that matched filter criteria") + } + } + + if code, actioned, err := e.batchDelete(evalsToDelete); err != nil { + return code, "", err + } else if !actioned { + return code, "", nil + } + + e.Ui.Info(fmt.Sprintf("Successfully deleted batch of %v %s", + numEvalsToDelete, correctGrammar("evaluation", numEvalsToDelete))) + + return 0, nextToken, nil +} + +// batchLookupEvals handles batched lookup of evaluations using the operator +// provided filter. The lookup is performed a maximum number of 3 times to +// ensure their size is limited and the number of evals to delete doesn't exceed +// the total allowable in a single call. +// +// The JSON serialized evaluation API object is 350-380B in size. +// 2426 * 380B (3.8e-4 MB) = 0.92MB. We may want to make this configurable +// in the future, but this is counteracted by the CLI logic which will loop +// until the user tells it to exit, or all evals matching the filter are +// deleted. 2426 * 3 falls below the maximum limit for eval IDs in a single +// delete request (set by MaxEvalIDsPerDeleteRequest). +func (e *EvalDeleteCommand) batchLookupEvals(nextToken string) ([]*api.Evaluation, string, error) { + + var evalsToDelete []*api.Evaluation + currentNextToken := nextToken + + // Call List 3 times to accumulate the maximum number if eval IDs supported + // in a single Delete request. See math above. + for i := 0; i < 3; i++ { + + // Generate the query options using the passed next token and filter. The + // per page value is less than the total number we can include in a single + // delete request. This keeps the maximum size of the return object at a + // reasonable size. + opts := api.QueryOptions{ + Filter: e.filter, + PerPage: 2426, + NextToken: currentNextToken, + } + + evalList, meta, err := e.client.Evaluations().List(&opts) + if err != nil { + return nil, "", err + } + + if len(evalList) > 0 { + evalsToDelete = append(evalsToDelete, evalList...) + } + + // Store the next token no matter if it is empty or populated. + currentNextToken = meta.NextToken + + // If there is no next token, ensure we exit and avoid any new loops + // which will result in duplicate IDs. + if currentNextToken == "" { + break + } + } + + return evalsToDelete, currentNextToken, nil +} + +// batchDelete is responsible for deleting the passed evaluations and asking +// any confirmation questions along the way. It will ask whether the operator +// want to list the evals before deletion, and optionally ask for confirmation +// before deleting based on input criteria. +func (e *EvalDeleteCommand) batchDelete(evals []*api.Evaluation) (int, bool, error) { + + // Ask whether the operator wants to see the list of evaluations before + // moving forward with deletion. This will only happen if filters are used + // and the confirmation step is not bypassed. + if !e.yes && !e.deleteByArg { + _, listEvals := e.askQuestion(fmt.Sprintf( + "Do you want to list evals (%v) before deletion? [y/N]", + len(evals)), "") + + // List the evals for deletion is the user has requested this. It can + // be useful when the list is small and targeted, but is maybe best + // avoided when deleting large quantities of evals. + if listEvals { + e.Ui.Output("") + outputEvalList(e.Ui, evals, shortId) + e.Ui.Output("") + } + } + + // Generate our list of eval IDs which is required for the API request. + ids := make([]string, len(evals)) + + for i, eval := range evals { + ids[i] = eval.ID + } + + // If the user did not wish to bypass the confirmation step, ask this now + // and handle the response. + if !e.yes && !e.deleteByArg { + code, deleteEvals := e.askQuestion(fmt.Sprintf( + "Are you sure you want to delete %v evals? [y/N]", + len(evals)), "Cancelling eval deletion") + e.Ui.Output("") + + if !deleteEvals { + return code, deleteEvals, nil + } + } + + _, err := e.client.Evaluations().Delete(ids, nil) + if err != nil { + return 1, false, err + } + + // Calculate how many total evaluations we have deleted, so we can output + // this at the end of the process. + curDeleted := e.numDeleted + e.numDeleted = curDeleted + len(ids) + + return 0, true, nil +} + +// askQuestion allows the command to ask the operator a question requiring a +// y/n response. The optional noResp is used when the operator responds no to +// a question. +func (e *EvalDeleteCommand) askQuestion(question, noResp string) (int, bool) { + + answer, err := e.Ui.Ask(question) + if err != nil { + e.Ui.Error(fmt.Sprintf("Failed to parse answer: %v", err)) + return 1, false + } + + if answer == "" || strings.ToLower(answer)[0] == 'n' { + if noResp != "" { + e.Ui.Output(noResp) + } + return 0, false + } else if strings.ToLower(answer)[0] == 'y' && len(answer) > 1 { + e.Ui.Output("For confirmation, an exact ‘y’ is required.") + return 0, false + } else if answer != "y" { + e.Ui.Output("No confirmation detected. For confirmation, an exact 'y' is required.") + return 1, false + } + return 0, true +} + +func correctGrammar(word string, num int) string { + if num > 1 { + return word + "s" + } + return word +} diff --git a/command/eval_delete_test.go b/command/eval_delete_test.go new file mode 100644 index 00000000000..bfdd3d8dd1f --- /dev/null +++ b/command/eval_delete_test.go @@ -0,0 +1,184 @@ +package command + +import ( + "errors" + "fmt" + "testing" + + "github.com/hashicorp/nomad/ci" + "github.com/mitchellh/cli" + "github.com/stretchr/testify/require" +) + +func TestEvalDeleteCommand_Run(t *testing.T) { + ci.Parallel(t) + + testCases := []struct { + testFn func() + name string + }{ + { + testFn: func() { + + testServer, client, url := testServer(t, false, nil) + defer testServer.Shutdown() + + // Create the UI and command. + ui := cli.NewMockUi() + cmd := &EvalDeleteCommand{ + Meta: Meta{ + Ui: ui, + flagAddress: url, + }, + } + + // Test basic command input validation. + require.Equal(t, 1, cmd.Run([]string{"-address=" + url})) + require.Contains(t, ui.ErrorWriter.String(), "Error validating command args and flags") + ui.ErrorWriter.Reset() + ui.OutputWriter.Reset() + + // Try running the command when the eval broker is not paused. + require.Equal(t, 1, cmd.Run([]string{"-address=" + url, "fa3a8c37-eac3-00c7-3410-5ba3f7318fd8"})) + require.Contains(t, ui.ErrorWriter.String(), "Eval broker is not paused") + ui.ErrorWriter.Reset() + ui.OutputWriter.Reset() + + // Paused the eval broker, then try deleting with an eval that + // does not exist. + schedulerConfig, _, err := client.Operator().SchedulerGetConfiguration(nil) + require.NoError(t, err) + require.False(t, schedulerConfig.SchedulerConfig.PauseEvalBroker) + + schedulerConfig.SchedulerConfig.PauseEvalBroker = true + _, _, err = client.Operator().SchedulerSetConfiguration(schedulerConfig.SchedulerConfig, nil) + require.NoError(t, err) + require.True(t, schedulerConfig.SchedulerConfig.PauseEvalBroker) + + require.Equal(t, 1, cmd.Run([]string{"-address=" + url, "fa3a8c37-eac3-00c7-3410-5ba3f7318fd8"})) + require.Contains(t, ui.ErrorWriter.String(), "eval not found") + ui.ErrorWriter.Reset() + ui.OutputWriter.Reset() + }, + name: "failure", + }, + { + testFn: func() { + + testServer, client, url := testServer(t, false, nil) + defer testServer.Shutdown() + + // Create the UI and command. + ui := cli.NewMockUi() + cmd := &EvalDeleteCommand{ + Meta: Meta{ + Ui: ui, + flagAddress: url, + }, + } + + // Paused the eval broker. + schedulerConfig, _, err := client.Operator().SchedulerGetConfiguration(nil) + require.NoError(t, err) + require.False(t, schedulerConfig.SchedulerConfig.PauseEvalBroker) + + schedulerConfig.SchedulerConfig.PauseEvalBroker = true + _, _, err = client.Operator().SchedulerSetConfiguration(schedulerConfig.SchedulerConfig, nil) + require.NoError(t, err) + require.True(t, schedulerConfig.SchedulerConfig.PauseEvalBroker) + + // With the eval broker paused, run a job register several times + // to generate evals that will not be acted on. + testJob := testJob("eval-delete") + + evalIDs := make([]string, 3) + for i := 0; i < 3; i++ { + regResp, _, err := client.Jobs().Register(testJob, nil) + require.NoError(t, err) + require.NotNil(t, regResp) + require.NotEmpty(t, regResp.EvalID) + evalIDs[i] = regResp.EvalID + } + + // Ensure we have three evaluations in state. + evalList, _, err := client.Evaluations().List(nil) + require.NoError(t, err) + require.Len(t, evalList, 3) + + // Attempted to delete one eval using the ID. + require.Equal(t, 0, cmd.Run([]string{"-address=" + url, evalIDs[0]})) + require.Contains(t, ui.OutputWriter.String(), "Successfully deleted 1 evaluation") + ui.ErrorWriter.Reset() + ui.OutputWriter.Reset() + + // We modify the number deleted on each command run, so we + // need to reset this in order to check the next command + // output. + cmd.numDeleted = 0 + + // Attempted to delete the remaining two evals using a filter + // expression. + expr := fmt.Sprintf("JobID == %q and Status == \"pending\" ", *testJob.Name) + require.Equal(t, 0, cmd.Run([]string{"-address=" + url, "-filter=" + expr})) + require.Contains(t, ui.OutputWriter.String(), "Successfully deleted 2 evaluations") + ui.ErrorWriter.Reset() + ui.OutputWriter.Reset() + + // Ensure we have zero evaluations in state. + evalList, _, err = client.Evaluations().List(nil) + require.NoError(t, err) + require.Len(t, evalList, 0) + }, + name: "successful", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + tc.testFn() + }) + } +} + +func TestEvalDeleteCommand_verifyArgsAndFlags(t *testing.T) { + ci.Parallel(t) + + testCases := []struct { + inputEvalDeleteCommand *EvalDeleteCommand + inputArgs []string + expectedError error + name string + }{ + { + inputEvalDeleteCommand: &EvalDeleteCommand{ + filter: `Status == "Pending"`, + }, + inputArgs: []string{"fa3a8c37-eac3-00c7-3410-5ba3f7318fd8"}, + expectedError: errors.New("evaluation ID or filter flag required"), + name: "arg and flags", + }, + { + inputEvalDeleteCommand: &EvalDeleteCommand{ + filter: "", + }, + inputArgs: []string{}, + expectedError: errors.New("evaluation ID or filter flag required"), + name: "no arg or flags", + }, + { + inputEvalDeleteCommand: &EvalDeleteCommand{ + filter: "", + }, + inputArgs: []string{"fa3a8c37-eac3-00c7-3410-5ba3f7318fd8", "fa3a8c37-eac3-00c7-3410-5ba3f7318fd9"}, + expectedError: errors.New("expected 1 argument, got 2"), + name: "multiple args", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + actualError := tc.inputEvalDeleteCommand.verifyArgsAndFlags(tc.inputArgs) + require.Equal(t, tc.expectedError, actualError) + }) + } +} diff --git a/command/eval_list.go b/command/eval_list.go index 86d2b361fff..f74d172e921 100644 --- a/command/eval_list.go +++ b/command/eval_list.go @@ -167,20 +167,7 @@ func (c *EvalListCommand) Run(args []string) int { length = fullId } - out := make([]string, len(evals)+1) - out[0] = "ID|Priority|Triggered By|Job ID|Status|Placement Failures" - for i, eval := range evals { - failures, _ := evalFailureStatus(eval) - out[i+1] = fmt.Sprintf("%s|%d|%s|%s|%s|%s", - limit(eval.ID, length), - eval.Priority, - eval.TriggeredBy, - eval.JobID, - eval.Status, - failures, - ) - } - c.Ui.Output(formatList(out)) + outputEvalList(c.Ui, evals, length) if qm.NextToken != "" { c.Ui.Output(fmt.Sprintf(` From c16352a67d026c6f34ad46431c1e7f6a370bda1c Mon Sep 17 00:00:00 2001 From: James Rasell Date: Fri, 1 Jul 2022 15:42:07 +0100 Subject: [PATCH 11/11] docs: add eval delete website doucmentation. --- .changelog/13492.txt | 7 ++ website/content/api-docs/evaluations.mdx | 51 +++++++++++++ website/content/docs/commands/eval/delete.mdx | 75 +++++++++++++++++++ website/content/docs/commands/eval/index.mdx | 3 +- website/data/docs-nav-data.json | 4 + 5 files changed, 139 insertions(+), 1 deletion(-) create mode 100644 .changelog/13492.txt create mode 100644 website/content/docs/commands/eval/delete.mdx diff --git a/.changelog/13492.txt b/.changelog/13492.txt new file mode 100644 index 00000000000..73e01837d2b --- /dev/null +++ b/.changelog/13492.txt @@ -0,0 +1,7 @@ +```release-note:improvement +cli: Added `delete` command to the eval CLI +``` + +```release-note:improvement +agent: Added delete support to the eval HTTP API +``` diff --git a/website/content/api-docs/evaluations.mdx b/website/content/api-docs/evaluations.mdx index bea826da652..0a36a6f5ef6 100644 --- a/website/content/api-docs/evaluations.mdx +++ b/website/content/api-docs/evaluations.mdx @@ -204,6 +204,55 @@ $ curl \ } ``` +## Delete Evaluations + +This endpoint deletes evaluations. In order to utilise this endpoint the +eval broker should be paused via the +[update_scheduler_configuration][operator scheduler update configuration] API +endpoint. + +This API endpoint should be used cautiously and only in outage situations where +there is a large backlog of evaluations not being processed. During most normal +and outage scenarios, Nomad's reconciliation and state management will handle +evaluations as needed. + +| Method | Path | Produces | +| --------- | ----------------- | ------------------ | +| `DELETE` | `/v1/evaluations` | `application/json` | + +The table below shows this endpoint's support for +[blocking queries](/api-docs#blocking-queries) and +[required ACLs](/api-docs#acls). + +| Blocking Queries | ACL Required | +| ---------------- | ------------ | +| `NO` | `management` | + +### Parameters + +- `EvalIDs` `(array: )`- An array of evaluation UUIDs to + delete. This must be a full length UUID and not a prefix. + +### Sample Payload + +```javascript +{ + "EvalIDs": [ + "167ec27d-2e36-979a-280a-a6b920d382db", + "6c193955-ac66-42e2-f4c7-f1fc707f1f5e" + ] +} +``` + +### Sample Request + +```shell-session +$ curl \ + --request DELETE \ + --data @payload.json \ + https://localhost:4646/v1/evaluations +``` + ## List Allocations for Evaluation This endpoint lists the allocations created or modified for the given @@ -332,3 +381,5 @@ $ curl \ } ] ``` + +[update_scheduler_configuration]: api-docs/operator/scheduler#update-scheduler-configuration diff --git a/website/content/docs/commands/eval/delete.mdx b/website/content/docs/commands/eval/delete.mdx new file mode 100644 index 00000000000..42cf5125eb4 --- /dev/null +++ b/website/content/docs/commands/eval/delete.mdx @@ -0,0 +1,75 @@ +--- +layout: docs +page_title: 'Commands: eval delete' +description: | + The eval delete command is used to delete evaluations. +--- + +# Command: eval delete + +The `eval delete` command is used to delete evaluations. It should be used +cautiously and only in outage situations where there is a large backlog of +evaluations not being processed. During most normal and outage scenarios, +Nomad's reconciliation and state management will handle evaluations as needed. + +The eval broker is expected to be paused prior to running this command and +un-paused after. These actions can be performed by the +[`operator scheduler get-config`][scheduler_get_config] +and [`operator scheduler set-config`][scheduler_set_config] commands respectively. + +## Usage + +```plaintext +nomad eval delete [options] [args] +``` + +It takes an optional argument which is the ID of the evaluation to delete. If +the evaluation ID is omitted, this command will use the filter flag to identify +and delete a set of evaluations. + +When ACLs are enabled, this command requires a `management` token. + +## General Options + +@include 'general_options.mdx' + +## Delete Options + +- `-filter`: Specifies an expression used to filter evaluations by for + deletion. + +- `-yes`: Bypass the confirmation prompt if an evaluation ID was not provided. + +## Examples + +Delete an evaluation using its ID: + +```shell-session +$ nomad eval delete 9ecffbba-73be-d909-5d7e-ac2694c10e0c +Successfuly deleted 1 evaluation +``` + +Delete all evaluations with status `pending` for the `example` job: +```shell-session +$ nomad eval delete -filter='Stauts == "pending" and JobID == "example"' +Do you want to list evals (3) before deletion? [y/N] y + +ID Priority Triggered By Job ID Status Placement Failures +cef92121 50 job-register example pending false +1c905ca0 50 job-register example pending false +b9e77692 50 job-register example pending false + +Are you sure you want to delete 3 evals? [y/N] y + +Successfuly deleted 3 evaluations +``` + +Delete all evaluations for the `system` and `service` whilst skipping all +prompts: +```shell-session +$ nomad eval delete -filter='Scheduler == "system" or Scheduler == "service"' -yes +Successfuly deleted 23 evaluations +``` + +[scheduler_get_config]: /docs/commands/operator/scheduler-get-config +[scheduler_set_config]: /docs/commands/operator/scheduler-set-config diff --git a/website/content/docs/commands/eval/index.mdx b/website/content/docs/commands/eval/index.mdx index 504cd8a1777..3b8e4b6c844 100644 --- a/website/content/docs/commands/eval/index.mdx +++ b/website/content/docs/commands/eval/index.mdx @@ -15,9 +15,10 @@ Usage: `nomad eval [options]` Run `nomad eval -h` for help on that subcommand. The following subcommands are available: - +- [`eval delete`][delete] - Delete evals - [`eval list`][list] - List all evals - [`eval status`][status] - Display the status of a eval +[delete]: /docs/commands/eval/delete 'Delete evals' [list]: /docs/commands/eval/list 'List all evals' [status]: /docs/commands/eval/status 'Display the status of a eval' diff --git a/website/data/docs-nav-data.json b/website/data/docs-nav-data.json index 50199de2a2e..bd116674692 100644 --- a/website/data/docs-nav-data.json +++ b/website/data/docs-nav-data.json @@ -372,6 +372,10 @@ "title": "Overview", "path": "commands/eval" }, + { + "title": "delete", + "path": "commands/eval/delete" + }, { "title": "list", "path": "commands/eval/list"