From 179b60b1c4669206089466c34b74328cb8aee5cc Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 30 Nov 2021 17:03:16 -0500 Subject: [PATCH 1/2] provide `-no-shutdown-delay` flag for job/alloc stop Some operators use very long group/task `shutdown_delay` settings to safely drain network connections to their workloads after service deregistration. But during incident response, they may want to cause that drain to be skipped so they can quickly shed load. Provide a `-no-shutdown-delay` flag on the `nomad alloc stop` and `nomad job stop` commands that bypasses the delay. This sets a new desired transition state on the affected allocations that the allocation/task runner will identify during pre-kill on the client. Note (as documented here) that using this flag will almost always result in failed inbound network connections for workloads as the tasks will exit before clients receive updated service discovery information and won't be gracefully drained. --- .changelog/11596.txt | 3 + api/jobs.go | 9 +- client/allocrunner/alloc_runner.go | 12 ++ client/allocrunner/alloc_runner_hooks.go | 1 + client/allocrunner/groupservice_hook.go | 13 +- client/allocrunner/taskrunner/task_runner.go | 22 ++++ .../taskrunner/task_runner_test.go | 116 +++++++++++++++--- command/agent/alloc_endpoint.go | 12 +- command/agent/job_endpoint.go | 12 ++ command/alloc_stop.go | 16 ++- command/job_stop.go | 32 +++-- nomad/alloc_endpoint.go | 3 +- nomad/fsm.go | 26 +++- nomad/job_endpoint_test.go | 91 ++++++++++++++ nomad/state/state_store.go | 11 +- nomad/structs/structs.go | 26 +++- website/content/docs/commands/alloc/stop.mdx | 7 ++ website/content/docs/commands/job/stop.mdx | 7 ++ 18 files changed, 372 insertions(+), 47 deletions(-) create mode 100644 .changelog/11596.txt diff --git a/.changelog/11596.txt b/.changelog/11596.txt new file mode 100644 index 00000000000..74b451c0278 --- /dev/null +++ b/.changelog/11596.txt @@ -0,0 +1,3 @@ +```release-note:improvement +cli: provide `-no-shutdown-delay` option to `job stop` and `alloc stop` commands to ignore `shutdown_delay` +``` diff --git a/api/jobs.go b/api/jobs.go index 146f65cf713..61d0c2892ce 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -299,6 +299,11 @@ type DeregisterOptions struct { // is useful when an operator wishes to push through a job deregistration // in busy clusters with a large evaluation backlog. EvalPriority int + + // NoShutdownDelay, if set to true, will override the group and + // task shutdown_delay configuration and ignore the delay for any + // allocations stopped as a result of this Deregister call. + NoShutdownDelay bool } // DeregisterOpts is used to remove an existing job. See DeregisterOptions @@ -312,8 +317,8 @@ func (j *Jobs) DeregisterOpts(jobID string, opts *DeregisterOptions, q *WriteOpt // Protect against nil opts. url.Values expects a string, and so using // fmt.Sprintf is the best way to do this. if opts != nil { - endpoint += fmt.Sprintf("?purge=%t&global=%t&eval_priority=%v", - opts.Purge, opts.Global, opts.EvalPriority) + endpoint += fmt.Sprintf("?purge=%t&global=%t&eval_priority=%v&no_shutdown_delay=%t", + opts.Purge, opts.Global, opts.EvalPriority, opts.NoShutdownDelay) } wm, err := j.client.delete(endpoint, &resp, q) diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 7dcf072a948..c846b091693 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -171,6 +171,9 @@ type allocRunner struct { taskHookCoordinator *taskHookCoordinator + shutdownDelayCtx context.Context + shutdownDelayCancelFn context.CancelFunc + // rpcClient is the RPC Client that should be used by the allocrunner and its // hooks to communicate with Nomad Servers. rpcClient RPCer @@ -230,6 +233,10 @@ func NewAllocRunner(config *Config) (*allocRunner, error) { ar.taskHookCoordinator = newTaskHookCoordinator(ar.logger, tg.Tasks) + shutdownDelayCtx, shutdownDelayCancel := context.WithCancel(context.Background()) + ar.shutdownDelayCtx = shutdownDelayCtx + ar.shutdownDelayCancelFn = shutdownDelayCancel + // Initialize the runners hooks. if err := ar.initRunnerHooks(config.ClientConfig); err != nil { return nil, err @@ -265,6 +272,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error { DriverManager: ar.driverManager, ServersContactedCh: ar.serversContactedCh, StartConditionMetCtx: ar.taskHookCoordinator.startConditionForTask(task), + ShutdownDelayCtx: ar.shutdownDelayCtx, } if ar.cpusetManager != nil { @@ -824,6 +832,10 @@ func (ar *allocRunner) Update(update *structs.Allocation) { default: } + if update.DesiredTransition.ShouldIgnoreShutdownDelay() { + ar.shutdownDelayCancelFn() + } + // Queue the new update ar.allocUpdatedCh <- update } diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go index 9624e633c73..8b79da4d5cf 100644 --- a/client/allocrunner/alloc_runner_hooks.go +++ b/client/allocrunner/alloc_runner_hooks.go @@ -159,6 +159,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error { taskEnvBuilder: envBuilder, networkStatusGetter: ar, logger: hookLogger, + shutdownDelayCtx: ar.shutdownDelayCtx, }), newConsulGRPCSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig), newConsulHTTPSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig), diff --git a/client/allocrunner/groupservice_hook.go b/client/allocrunner/groupservice_hook.go index 778109e65f4..69eae41e8bf 100644 --- a/client/allocrunner/groupservice_hook.go +++ b/client/allocrunner/groupservice_hook.go @@ -1,6 +1,7 @@ package allocrunner import ( + "context" "sync" "time" @@ -29,9 +30,9 @@ type groupServiceHook struct { consulClient consul.ConsulServiceAPI consulNamespace string prerun bool - delay time.Duration deregistered bool networkStatusGetter networkStatusGetter + shutdownDelayCtx context.Context logger log.Logger @@ -41,6 +42,7 @@ type groupServiceHook struct { networks structs.Networks ports structs.AllocatedPorts taskEnvBuilder *taskenv.Builder + delay time.Duration // Since Update() may be called concurrently with any other hook all // hook methods must be fully serialized @@ -54,6 +56,7 @@ type groupServiceHookConfig struct { restarter agentconsul.WorkloadRestarter taskEnvBuilder *taskenv.Builder networkStatusGetter networkStatusGetter + shutdownDelayCtx context.Context logger log.Logger } @@ -76,6 +79,7 @@ func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook { networkStatusGetter: cfg.networkStatusGetter, logger: cfg.logger.Named(groupServiceHookName), services: cfg.alloc.Job.LookupTaskGroup(cfg.alloc.TaskGroup).Services, + shutdownDelayCtx: cfg.shutdownDelayCtx, } if cfg.alloc.AllocatedResources != nil { @@ -187,9 +191,12 @@ func (h *groupServiceHook) preKillLocked() { h.logger.Debug("delay before killing tasks", "group", h.group, "shutdown_delay", h.delay) - // Wait for specified shutdown_delay + select { + // Wait for specified shutdown_delay unless ignored // This will block an agent from shutting down. - <-time.After(h.delay) + case <-time.After(h.delay): + case <-h.shutdownDelayCtx.Done(): + } } func (h *groupServiceHook) Postrun() error { diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 68df0827f60..dba2f092f31 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -112,6 +112,11 @@ type TaskRunner struct { killErr error killErrLock sync.Mutex + // shutdownDelayCtx is a context from the alloc runner which will + // tell us to exit early from shutdown_delay + shutdownDelayCtx context.Context + shutdownDelayCancelFn context.CancelFunc + // Logger is the logger for the task runner. logger log.Logger @@ -287,6 +292,13 @@ type Config struct { // startConditionMetCtx is done when TR should start the task StartConditionMetCtx <-chan struct{} + + // ShutdownDelayCtx is a context from the alloc runner which will + // tell us to exit early from shutdown_delay + ShutdownDelayCtx context.Context + + // ShutdownDelayCancelFn should only be used in testing. + ShutdownDelayCancelFn context.CancelFunc } func NewTaskRunner(config *Config) (*TaskRunner, error) { @@ -342,6 +354,8 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { maxEvents: defaultMaxEvents, serversContactedCh: config.ServersContactedCh, startConditionMetCtx: config.StartConditionMetCtx, + shutdownDelayCtx: config.ShutdownDelayCtx, + shutdownDelayCancelFn: config.ShutdownDelayCancelFn, } // Create the logger based on the allocation ID @@ -895,6 +909,8 @@ func (tr *TaskRunner) handleKill(resultCh <-chan *drivers.ExitResult) *drivers.E select { case result := <-resultCh: return result + case <-tr.shutdownDelayCtx.Done(): + break case <-time.After(delay): } } @@ -1478,3 +1494,9 @@ func (tr *TaskRunner) DriverCapabilities() (*drivers.Capabilities, error) { func (tr *TaskRunner) SetAllocHookResources(res *cstructs.AllocHookResources) { tr.allocHookResources = res } + +// shutdownDelayCancel is used for testing only and cancels the +// shutdownDelayCtx +func (tr *TaskRunner) shutdownDelayCancel() { + tr.shutdownDelayCancelFn() +} diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index f3ef206cfd6..9b60458b4b0 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -14,6 +14,10 @@ import ( "time" "github.com/golang/snappy" + "github.com/kr/pretty" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/client/config" @@ -26,6 +30,7 @@ import ( agentconsul "github.com/hashicorp/nomad/command/agent/consul" mockdriver "github.com/hashicorp/nomad/drivers/mock" "github.com/hashicorp/nomad/drivers/rawexec" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" @@ -33,9 +38,6 @@ import ( "github.com/hashicorp/nomad/plugins/device" "github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/testutil" - "github.com/kr/pretty" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) type MockTaskStateUpdater struct { @@ -94,26 +96,30 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri cleanup() } + shutdownDelayCtx, shutdownDelayCancelFn := context.WithCancel(context.Background()) + // Create a closed channel to mock TaskHookCoordinator.startConditionForTask. // Closed channel indicates this task is not blocked on prestart hooks. closedCh := make(chan struct{}) close(closedCh) conf := &Config{ - Alloc: alloc, - ClientConfig: clientConf, - Task: thisTask, - TaskDir: taskDir, - Logger: clientConf.Logger, - Consul: consulapi.NewMockConsulServiceClient(t, logger), - ConsulSI: consulapi.NewMockServiceIdentitiesClient(), - Vault: vaultclient.NewMockVaultClient(), - StateDB: cstate.NoopDB{}, - StateUpdater: NewMockTaskStateUpdater(), - DeviceManager: devicemanager.NoopMockManager(), - DriverManager: drivermanager.TestDriverManager(t), - ServersContactedCh: make(chan struct{}), - StartConditionMetCtx: closedCh, + Alloc: alloc, + ClientConfig: clientConf, + Task: thisTask, + TaskDir: taskDir, + Logger: clientConf.Logger, + Consul: consulapi.NewMockConsulServiceClient(t, logger), + ConsulSI: consulapi.NewMockServiceIdentitiesClient(), + Vault: vaultclient.NewMockVaultClient(), + StateDB: cstate.NoopDB{}, + StateUpdater: NewMockTaskStateUpdater(), + DeviceManager: devicemanager.NoopMockManager(), + DriverManager: drivermanager.TestDriverManager(t), + ServersContactedCh: make(chan struct{}), + StartConditionMetCtx: closedCh, + ShutdownDelayCtx: shutdownDelayCtx, + ShutdownDelayCancelFn: shutdownDelayCancelFn, } return conf, trCleanup } @@ -996,6 +1002,82 @@ WAIT: } } +// TestTaskRunner_NoShutdownDelay asserts services are removed from +// Consul and tasks are killed without waiting for ${shutdown_delay} +// when the alloc has the NoShutdownDelay transition flag set. +func TestTaskRunner_NoShutdownDelay(t *testing.T) { + t.Parallel() + + // don't set this too high so that we don't block the test runner + // on shutting down the agent if the test fails + maxTestDuration := time.Duration(testutil.TestMultiplier()*10) * time.Second + maxTimeToFailDuration := time.Duration(testutil.TestMultiplier()) * time.Second + + alloc := mock.Alloc() + alloc.DesiredTransition = structs.DesiredTransition{NoShutdownDelay: helper.BoolToPtr(true)} + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Services[0].Tags = []string{"tag1"} + task.Services = task.Services[:1] // only need 1 for this test + task.Driver = "mock_driver" + task.Config = map[string]interface{}{ + "run_for": "1000s", + } + task.ShutdownDelay = maxTestDuration + + tr, conf, cleanup := runTestTaskRunner(t, alloc, task.Name) + defer cleanup() + + mockConsul := conf.Consul.(*consulapi.MockConsulServiceClient) + + testWaitForTaskToStart(t, tr) + + testutil.WaitForResult(func() (bool, error) { + ops := mockConsul.GetOps() + if n := len(ops); n != 1 { + return false, fmt.Errorf("expected 1 consul operation. Found %d", n) + } + return ops[0].Op == "add", fmt.Errorf("consul operation was not a registration: %#v", ops[0]) + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + testCtx, cancel := context.WithTimeout(context.Background(), maxTimeToFailDuration) + defer cancel() + + killed := make(chan error) + go func() { + tr.shutdownDelayCancel() + err := tr.Kill(testCtx, structs.NewTaskEvent("test")) + killed <- err + }() + + // Wait for first de-registration call. Note that unlike + // TestTaskRunner_ShutdownDelay, we're racing with task exit + // and can't assert that we only get the first deregistration op + // (from serviceHook.PreKill). + testutil.WaitForResult(func() (bool, error) { + ops := mockConsul.GetOps() + if n := len(ops); n < 2 { + return false, fmt.Errorf("expected at least 2 consul operations.") + } + return ops[1].Op == "remove", fmt.Errorf( + "consul operation was not a deregistration: %#v", ops[1]) + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + // Wait for the task to exit + select { + case <-tr.WaitCh(): + case <-time.After(maxTimeToFailDuration): + t.Fatalf("task kill did not ignore shutdown delay") + return + } + + err := <-killed + require.NoError(t, err, "killing task returned unexpected error") +} + // TestTaskRunner_Dispatch_Payload asserts that a dispatch job runs and the // payload was written to disk. func TestTaskRunner_Dispatch_Payload(t *testing.T) { diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go index d1a7e210c5d..f6f724001ba 100644 --- a/command/agent/alloc_endpoint.go +++ b/command/agent/alloc_endpoint.go @@ -138,8 +138,18 @@ func (s *HTTPServer) allocStop(allocID string, resp http.ResponseWriter, req *ht return nil, CodedError(405, ErrInvalidMethod) } + noShutdownDelay := false + if noShutdownDelayQS := req.URL.Query().Get("no_shutdown_delay"); noShutdownDelayQS != "" { + var err error + noShutdownDelay, err = strconv.ParseBool(noShutdownDelayQS) + if err != nil { + return nil, fmt.Errorf("no_shutdown_delay value is not a boolean: %v", err) + } + } + sr := &structs.AllocStopRequest{ - AllocID: allocID, + AllocID: allocID, + NoShutdownDelay: noShutdownDelay, } s.parseWriteRequest(req, &sr.WriteRequest) diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 8a9da76feba..1ff8a7bde07 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -453,6 +453,18 @@ func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request, return nil, err } + // Identify the no_shutdown_delay query param and parse. + noShutdownDelayStr := req.URL.Query().Get("no_shutdown_delay") + var noShutdownDelay bool + if noShutdownDelayStr != "" { + var err error + noShutdownDelay, err = strconv.ParseBool(noShutdownDelayStr) + if err != nil { + return nil, fmt.Errorf("Failed to parse value of %qq (%v) as a bool: %v", "no_shutdown_delay", noShutdownDelayStr, err) + } + } + args.NoShutdownDelay = noShutdownDelay + // Validate the evaluation priority if the user supplied a non-default // value. It's more efficient to do it here, within the agent rather than // sending a bad request for the server to reject. diff --git a/command/alloc_stop.go b/command/alloc_stop.go index 3c8f9cbd93c..3b1d10218ab 100644 --- a/command/alloc_stop.go +++ b/command/alloc_stop.go @@ -38,6 +38,12 @@ Stop Specific Options: screen, which can be used to examine the rescheduling evaluation using the eval-status command. + -no-shutdown-delay + Ignore the the group and task shutdown_delay configuration so there is no + delay between service deregistration and task shutdown. Note that using + this flag will result in failed network connections to the allocation + being stopped. + -verbose Show full information. ` @@ -47,12 +53,13 @@ Stop Specific Options: func (c *AllocStopCommand) Name() string { return "alloc stop" } func (c *AllocStopCommand) Run(args []string) int { - var detach, verbose bool + var detach, verbose, noShutdownDelay bool flags := c.Meta.FlagSet(c.Name(), FlagSetClient) flags.Usage = func() { c.Ui.Output(c.Help()) } flags.BoolVar(&detach, "detach", false, "") flags.BoolVar(&verbose, "verbose", false, "") + flags.BoolVar(&noShutdownDelay, "no-shutdown-delay", false, "") if err := flags.Parse(args); err != nil { return 1 @@ -115,7 +122,12 @@ func (c *AllocStopCommand) Run(args []string) int { return 1 } - resp, err := client.Allocations().Stop(alloc, nil) + var opts *api.QueryOptions + if noShutdownDelay { + opts = &api.QueryOptions{Params: map[string]string{"no_shutdown_delay": "true"}} + } + + resp, err := client.Allocations().Stop(alloc, opts) if err != nil { c.Ui.Error(fmt.Sprintf("Error stopping allocation: %s", err)) return 1 diff --git a/command/job_stop.go b/command/job_stop.go index 8dd5d8a1197..f6a51d31d78 100644 --- a/command/job_stop.go +++ b/command/job_stop.go @@ -43,14 +43,20 @@ Stop Options: Override the priority of the evaluations produced as a result of this job deregistration. By default, this is set to the priority of the job. - -purge - Purge is used to stop the job and purge it from the system. If not set, the - job will still be queryable and will be purged by the garbage collector. - -global Stop a multi-region job in all its regions. By default job stop will stop only a single region at a time. Ignored for single-region jobs. + -no-shutdown-delay + Ignore the the group and task shutdown_delay configuration so there is no + delay between service deregistration and task shutdown. Note that using + this flag will result in failed network connections to the allocations + being stopped. + + -purge + Purge is used to stop the job and purge it from the system. If not set, the + job will still be queryable and will be purged by the garbage collector. + -yes Automatic yes to prompts. @@ -67,12 +73,13 @@ func (c *JobStopCommand) Synopsis() string { func (c *JobStopCommand) AutocompleteFlags() complete.Flags { return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient), complete.Flags{ - "-detach": complete.PredictNothing, - "-eval-priority": complete.PredictNothing, - "-purge": complete.PredictNothing, - "-global": complete.PredictNothing, - "-yes": complete.PredictNothing, - "-verbose": complete.PredictNothing, + "-detach": complete.PredictNothing, + "-eval-priority": complete.PredictNothing, + "-purge": complete.PredictNothing, + "-global": complete.PredictNothing, + "-no-shutdown-delay": complete.PredictNothing, + "-yes": complete.PredictNothing, + "-verbose": complete.PredictNothing, }) } @@ -94,7 +101,7 @@ func (c *JobStopCommand) AutocompleteArgs() complete.Predictor { func (c *JobStopCommand) Name() string { return "job stop" } func (c *JobStopCommand) Run(args []string) int { - var detach, purge, verbose, global, autoYes bool + var detach, purge, verbose, global, autoYes, noShutdownDelay bool var evalPriority int flags := c.Meta.FlagSet(c.Name(), FlagSetClient) @@ -102,6 +109,7 @@ func (c *JobStopCommand) Run(args []string) int { flags.BoolVar(&detach, "detach", false, "") flags.BoolVar(&verbose, "verbose", false, "") flags.BoolVar(&global, "global", false, "") + flags.BoolVar(&noShutdownDelay, "no-shutdown-delay", false, "") flags.BoolVar(&autoYes, "yes", false, "") flags.BoolVar(&purge, "purge", false, "") flags.IntVar(&evalPriority, "eval-priority", 0, "") @@ -199,7 +207,7 @@ func (c *JobStopCommand) Run(args []string) int { } // Invoke the stop - opts := &api.DeregisterOptions{Purge: purge, Global: global, EvalPriority: evalPriority} + opts := &api.DeregisterOptions{Purge: purge, Global: global, EvalPriority: evalPriority, NoShutdownDelay: noShutdownDelay} wq := &api.WriteOptions{Namespace: jobs[0].JobSummary.Namespace} evalID, _, err := client.Jobs().DeregisterOpts(*job.ID, opts, wq) if err != nil { diff --git a/nomad/alloc_endpoint.go b/nomad/alloc_endpoint.go index 3a32b5f1964..0b44175adf6 100644 --- a/nomad/alloc_endpoint.go +++ b/nomad/alloc_endpoint.go @@ -320,7 +320,8 @@ func (a *Alloc) Stop(args *structs.AllocStopRequest, reply *structs.AllocStopRes Evals: []*structs.Evaluation{eval}, Allocs: map[string]*structs.DesiredTransition{ args.AllocID: { - Migrate: helper.BoolToPtr(true), + Migrate: helper.BoolToPtr(true), + NoShutdownDelay: helper.BoolToPtr(args.NoShutdownDelay), }, }, } diff --git a/nomad/fsm.go b/nomad/fsm.go index 84721014560..72757431211 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -11,6 +11,7 @@ import ( log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" @@ -605,7 +606,7 @@ func (n *nomadFSM) applyDeregisterJob(msgType structs.MessageType, buf []byte, i } err := n.state.WithWriteTransaction(msgType, index, func(tx state.Txn) error { - err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, tx) + err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, req.NoShutdownDelay, tx) if err != nil { n.logger.Error("deregistering job failed", @@ -645,7 +646,7 @@ func (n *nomadFSM) applyBatchDeregisterJob(msgType structs.MessageType, buf []by // evals for jobs whose deregistering didn't get committed yet. err := n.state.WithWriteTransaction(msgType, index, func(tx state.Txn) error { for jobNS, options := range req.Jobs { - if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, tx); err != nil { + if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, false, tx); err != nil { n.logger.Error("deregistering job failed", "job", jobNS.ID, "error", err) return err } @@ -670,12 +671,31 @@ func (n *nomadFSM) applyBatchDeregisterJob(msgType structs.MessageType, buf []by // handleJobDeregister is used to deregister a job. Leaves error logging up to // caller. -func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool, tx state.Txn) error { +func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool, noShutdownDelay bool, tx state.Txn) error { // If it is periodic remove it from the dispatcher if err := n.periodicDispatcher.Remove(namespace, jobID); err != nil { return fmt.Errorf("periodicDispatcher.Remove failed: %w", err) } + if noShutdownDelay { + ws := memdb.NewWatchSet() + allocs, err := n.state.AllocsByJob(ws, namespace, jobID, false) + if err != nil { + return err + } + transition := &structs.DesiredTransition{NoShutdownDelay: helper.BoolToPtr(true)} + for _, alloc := range allocs { + err := n.state.UpdateAllocDesiredTransitionTxn(tx, index, alloc.ID, transition) + if err != nil { + return err + } + err = tx.Insert("index", &state.IndexEntry{Key: "allocs", Value: index}) + if err != nil { + return fmt.Errorf("index update failed: %v", err) + } + } + } + if purge { if err := n.state.DeleteJobTxn(index, namespace, jobID, tx); err != nil { return fmt.Errorf("DeleteJob failed: %w", err) diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index a3e12332f31..92351c86cee 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -3737,6 +3737,97 @@ func TestJobEndpoint_Deregister_EvalCreation_Legacy(t *testing.T) { }) } +func TestJobEndpoint_Deregister_NoShutdownDelay(t *testing.T) { + t.Parallel() + require := require.New(t) + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register requests + job := mock.Job() + reg := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + // Fetch the response + var resp0 structs.JobRegisterResponse + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", reg, &resp0)) + + // Deregister but don't purge + dereg1 := &structs.JobDeregisterRequest{ + JobID: job.ID, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp1 structs.JobDeregisterResponse + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg1, &resp1)) + require.NotZero(resp1.Index) + + // Check for the job in the FSM + state := s1.fsm.State() + out, err := state.JobByID(nil, job.Namespace, job.ID) + require.NoError(err) + require.NotNil(out) + require.True(out.Stop) + + // Lookup the evaluation + eval, err := state.EvalByID(nil, resp1.EvalID) + require.NoError(err) + require.NotNil(eval) + require.EqualValues(resp1.EvalCreateIndex, eval.CreateIndex) + require.Equal(structs.EvalTriggerJobDeregister, eval.TriggeredBy) + + // Lookup allocation transitions + var ws memdb.WatchSet + allocs, err := state.AllocsByJob(ws, job.Namespace, job.ID, true) + require.NoError(err) + + for _, alloc := range allocs { + require.Nil(alloc.DesiredTransition) + } + + // Deregister with no shutdown delay + dereg2 := &structs.JobDeregisterRequest{ + JobID: job.ID, + NoShutdownDelay: true, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp2 structs.JobDeregisterResponse + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg2, &resp2)) + require.NotZero(resp2.Index) + + // Lookup the evaluation + eval, err = state.EvalByID(nil, resp2.EvalID) + require.NoError(err) + require.NotNil(eval) + require.EqualValues(resp2.EvalCreateIndex, eval.CreateIndex) + require.Equal(structs.EvalTriggerJobDeregister, eval.TriggeredBy) + + // Lookup allocation transitions + allocs, err = state.AllocsByJob(ws, job.Namespace, job.ID, true) + require.NoError(err) + + for _, alloc := range allocs { + require.NotNil(alloc.DesiredTransition) + require.True(*(alloc.DesiredTransition.NoShutdownDelay)) + } + +} + func TestJobEndpoint_BatchDeregister(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 3d08fdca768..81cbd7c633c 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1600,7 +1600,7 @@ func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion b } if err := s.updateJobCSIPlugins(index, job, existingJob, txn); err != nil { - return fmt.Errorf("unable to update job scaling policies: %v", err) + return fmt.Errorf("unable to update job csi plugins: %v", err) } // Insert the job @@ -3371,7 +3371,7 @@ func (s *StateStore) UpdateAllocsDesiredTransitions(msgType structs.MessageType, // Handle each of the updated allocations for id, transition := range allocs { - if err := s.nestedUpdateAllocDesiredTransition(txn, index, id, transition); err != nil { + if err := s.UpdateAllocDesiredTransitionTxn(txn, index, id, transition); err != nil { return err } } @@ -3390,9 +3390,9 @@ func (s *StateStore) UpdateAllocsDesiredTransitions(msgType structs.MessageType, return txn.Commit() } -// nestedUpdateAllocDesiredTransition is used to nest an update of an +// UpdateAllocDesiredTransitionTxn is used to nest an update of an // allocations desired transition -func (s *StateStore) nestedUpdateAllocDesiredTransition( +func (s *StateStore) UpdateAllocDesiredTransitionTxn( txn *txn, index uint64, allocID string, transition *structs.DesiredTransition) error { @@ -3414,8 +3414,9 @@ func (s *StateStore) nestedUpdateAllocDesiredTransition( // Merge the desired transitions copyAlloc.DesiredTransition.Merge(transition) - // Update the modify index + // Update the modify indexes copyAlloc.ModifyIndex = index + copyAlloc.AllocModifyIndex = index // Update the allocation if err := txn.Insert("allocs", copyAlloc); err != nil { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 50f9f640b7c..fa77f017d47 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -621,6 +621,11 @@ type JobDeregisterRequest struct { // in busy clusters with a large evaluation backlog. EvalPriority int + // NoShutdownDelay, if set to true, will override the group and + // task shutdown_delay configuration and ignore the delay for any + // allocations stopped as a result of this Deregister call. + NoShutdownDelay bool + // Eval is the evaluation to create that's associated with job deregister Eval *Evaluation @@ -934,7 +939,8 @@ type AllocUpdateDesiredTransitionRequest struct { // AllocStopRequest is used to stop and reschedule a running Allocation. type AllocStopRequest struct { - AllocID string + AllocID string + NoShutdownDelay bool WriteRequest } @@ -9119,6 +9125,11 @@ type DesiredTransition struct { // This field is only used when operators want to force a placement even if // a failed allocation is not eligible to be rescheduled ForceReschedule *bool + + // NoShutdownDelay, if set to true, will override the group and + // task shutdown_delay configuration and ignore the delay for any + // allocations stopped as a result of this Deregister call. + NoShutdownDelay *bool } // Merge merges the two desired transitions, preferring the values from the @@ -9135,6 +9146,10 @@ func (d *DesiredTransition) Merge(o *DesiredTransition) { if o.ForceReschedule != nil { d.ForceReschedule = o.ForceReschedule } + + if o.NoShutdownDelay != nil { + d.NoShutdownDelay = o.NoShutdownDelay + } } // ShouldMigrate returns whether the transition object dictates a migration. @@ -9157,6 +9172,15 @@ func (d *DesiredTransition) ShouldForceReschedule() bool { return d.ForceReschedule != nil && *d.ForceReschedule } +// ShouldIgnoreShutdownDelay returns whether the transition object dictates +// that shutdown skip any shutdown delays. +func (d *DesiredTransition) ShouldIgnoreShutdownDelay() bool { + if d == nil { + return false + } + return d.NoShutdownDelay != nil && *d.NoShutdownDelay +} + const ( AllocDesiredStatusRun = "run" // Allocation should run AllocDesiredStatusStop = "stop" // Allocation should stop diff --git a/website/content/docs/commands/alloc/stop.mdx b/website/content/docs/commands/alloc/stop.mdx index b82195172a4..03e78e04d58 100644 --- a/website/content/docs/commands/alloc/stop.mdx +++ b/website/content/docs/commands/alloc/stop.mdx @@ -42,6 +42,12 @@ allocation's namespace. - `-verbose`: Display verbose output. +- `-no-shutdown-delay` + Ignore the the group and task [`shutdown_delay`] configuration so + there is no delay between service deregistration and task + shutdown. Note that using this flag will result in failed network + connections to the allocation being stopped. + ## Examples ```shell-session @@ -58,3 +64,4 @@ $ nomad alloc stop -detach eb17e557 ``` [eval status]: /docs/commands/eval-status +[`shutdown_delay`]: /docs/job-specification/group#shutdown_delay diff --git a/website/content/docs/commands/job/stop.mdx b/website/content/docs/commands/job/stop.mdx index 004520b978e..e231935d03b 100644 --- a/website/content/docs/commands/job/stop.mdx +++ b/website/content/docs/commands/job/stop.mdx @@ -55,6 +55,12 @@ When ACLs are enabled, this command requires a token with the `submit-job`, Stop a [multi-region] job in all its regions. By default, `job stop` will stop only a single region at a time. Ignored for single-region jobs. +- `-no-shutdown-delay` + Ignore the the group and task [`shutdown_delay`] configuration so + there is no delay between service deregistration and task + shutdown. Note that using this flag will result in failed network + connections to the allocations being stopped. + ## Examples Stop the job with ID "job1": @@ -75,3 +81,4 @@ $ nomad job stop -detach job1 [eval status]: /docs/commands/eval-status [multi-region]: /docs/job-specification/multiregion +[`shutdown_delay`]: /docs/job-specification/group#shutdown_delay From 824d40581d57211c25603eea3ecc08aa16d6f805 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Mon, 13 Dec 2021 13:42:55 -0500 Subject: [PATCH 2/2] Apply suggestions from code review Co-authored-by: Derek Strickland <1111455+DerekStrickland@users.noreply.github.com> --- command/job_stop.go | 2 +- website/content/docs/commands/alloc/stop.mdx | 2 +- website/content/docs/commands/job/stop.mdx | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/command/job_stop.go b/command/job_stop.go index f6a51d31d78..1df4d0610ac 100644 --- a/command/job_stop.go +++ b/command/job_stop.go @@ -48,7 +48,7 @@ Stop Options: only a single region at a time. Ignored for single-region jobs. -no-shutdown-delay - Ignore the the group and task shutdown_delay configuration so there is no + Ignore the the group and task shutdown_delay configuration so that there is no delay between service deregistration and task shutdown. Note that using this flag will result in failed network connections to the allocations being stopped. diff --git a/website/content/docs/commands/alloc/stop.mdx b/website/content/docs/commands/alloc/stop.mdx index 03e78e04d58..060d47ad4d3 100644 --- a/website/content/docs/commands/alloc/stop.mdx +++ b/website/content/docs/commands/alloc/stop.mdx @@ -43,7 +43,7 @@ allocation's namespace. - `-verbose`: Display verbose output. - `-no-shutdown-delay` - Ignore the the group and task [`shutdown_delay`] configuration so + Ignore the the group and task [`shutdown_delay`] configuration so that there is no delay between service deregistration and task shutdown. Note that using this flag will result in failed network connections to the allocation being stopped. diff --git a/website/content/docs/commands/job/stop.mdx b/website/content/docs/commands/job/stop.mdx index e231935d03b..134715ffe65 100644 --- a/website/content/docs/commands/job/stop.mdx +++ b/website/content/docs/commands/job/stop.mdx @@ -56,7 +56,7 @@ When ACLs are enabled, this command requires a token with the `submit-job`, stop only a single region at a time. Ignored for single-region jobs. - `-no-shutdown-delay` - Ignore the the group and task [`shutdown_delay`] configuration so + Ignore the the group and task [`shutdown_delay`] configuration so that there is no delay between service deregistration and task shutdown. Note that using this flag will result in failed network connections to the allocations being stopped.