diff --git a/.changelog/11596.txt b/.changelog/11596.txt new file mode 100644 index 00000000000..74b451c0278 --- /dev/null +++ b/.changelog/11596.txt @@ -0,0 +1,3 @@ +```release-note:improvement +cli: provide `-no-shutdown-delay` option to `job stop` and `alloc stop` commands to ignore `shutdown_delay` +``` diff --git a/api/jobs.go b/api/jobs.go index 146f65cf713..61d0c2892ce 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -299,6 +299,11 @@ type DeregisterOptions struct { // is useful when an operator wishes to push through a job deregistration // in busy clusters with a large evaluation backlog. EvalPriority int + + // NoShutdownDelay, if set to true, will override the group and + // task shutdown_delay configuration and ignore the delay for any + // allocations stopped as a result of this Deregister call. + NoShutdownDelay bool } // DeregisterOpts is used to remove an existing job. See DeregisterOptions @@ -312,8 +317,8 @@ func (j *Jobs) DeregisterOpts(jobID string, opts *DeregisterOptions, q *WriteOpt // Protect against nil opts. url.Values expects a string, and so using // fmt.Sprintf is the best way to do this. if opts != nil { - endpoint += fmt.Sprintf("?purge=%t&global=%t&eval_priority=%v", - opts.Purge, opts.Global, opts.EvalPriority) + endpoint += fmt.Sprintf("?purge=%t&global=%t&eval_priority=%v&no_shutdown_delay=%t", + opts.Purge, opts.Global, opts.EvalPriority, opts.NoShutdownDelay) } wm, err := j.client.delete(endpoint, &resp, q) diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 7dcf072a948..c846b091693 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -171,6 +171,9 @@ type allocRunner struct { taskHookCoordinator *taskHookCoordinator + shutdownDelayCtx context.Context + shutdownDelayCancelFn context.CancelFunc + // rpcClient is the RPC Client that should be used by the allocrunner and its // hooks to communicate with Nomad Servers. rpcClient RPCer @@ -230,6 +233,10 @@ func NewAllocRunner(config *Config) (*allocRunner, error) { ar.taskHookCoordinator = newTaskHookCoordinator(ar.logger, tg.Tasks) + shutdownDelayCtx, shutdownDelayCancel := context.WithCancel(context.Background()) + ar.shutdownDelayCtx = shutdownDelayCtx + ar.shutdownDelayCancelFn = shutdownDelayCancel + // Initialize the runners hooks. if err := ar.initRunnerHooks(config.ClientConfig); err != nil { return nil, err @@ -265,6 +272,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error { DriverManager: ar.driverManager, ServersContactedCh: ar.serversContactedCh, StartConditionMetCtx: ar.taskHookCoordinator.startConditionForTask(task), + ShutdownDelayCtx: ar.shutdownDelayCtx, } if ar.cpusetManager != nil { @@ -824,6 +832,10 @@ func (ar *allocRunner) Update(update *structs.Allocation) { default: } + if update.DesiredTransition.ShouldIgnoreShutdownDelay() { + ar.shutdownDelayCancelFn() + } + // Queue the new update ar.allocUpdatedCh <- update } diff --git a/client/allocrunner/alloc_runner_hooks.go b/client/allocrunner/alloc_runner_hooks.go index 9624e633c73..8b79da4d5cf 100644 --- a/client/allocrunner/alloc_runner_hooks.go +++ b/client/allocrunner/alloc_runner_hooks.go @@ -159,6 +159,7 @@ func (ar *allocRunner) initRunnerHooks(config *clientconfig.Config) error { taskEnvBuilder: envBuilder, networkStatusGetter: ar, logger: hookLogger, + shutdownDelayCtx: ar.shutdownDelayCtx, }), newConsulGRPCSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig), newConsulHTTPSocketHook(hookLogger, alloc, ar.allocDir, config.ConsulConfig), diff --git a/client/allocrunner/groupservice_hook.go b/client/allocrunner/groupservice_hook.go index 778109e65f4..69eae41e8bf 100644 --- a/client/allocrunner/groupservice_hook.go +++ b/client/allocrunner/groupservice_hook.go @@ -1,6 +1,7 @@ package allocrunner import ( + "context" "sync" "time" @@ -29,9 +30,9 @@ type groupServiceHook struct { consulClient consul.ConsulServiceAPI consulNamespace string prerun bool - delay time.Duration deregistered bool networkStatusGetter networkStatusGetter + shutdownDelayCtx context.Context logger log.Logger @@ -41,6 +42,7 @@ type groupServiceHook struct { networks structs.Networks ports structs.AllocatedPorts taskEnvBuilder *taskenv.Builder + delay time.Duration // Since Update() may be called concurrently with any other hook all // hook methods must be fully serialized @@ -54,6 +56,7 @@ type groupServiceHookConfig struct { restarter agentconsul.WorkloadRestarter taskEnvBuilder *taskenv.Builder networkStatusGetter networkStatusGetter + shutdownDelayCtx context.Context logger log.Logger } @@ -76,6 +79,7 @@ func newGroupServiceHook(cfg groupServiceHookConfig) *groupServiceHook { networkStatusGetter: cfg.networkStatusGetter, logger: cfg.logger.Named(groupServiceHookName), services: cfg.alloc.Job.LookupTaskGroup(cfg.alloc.TaskGroup).Services, + shutdownDelayCtx: cfg.shutdownDelayCtx, } if cfg.alloc.AllocatedResources != nil { @@ -187,9 +191,12 @@ func (h *groupServiceHook) preKillLocked() { h.logger.Debug("delay before killing tasks", "group", h.group, "shutdown_delay", h.delay) - // Wait for specified shutdown_delay + select { + // Wait for specified shutdown_delay unless ignored // This will block an agent from shutting down. - <-time.After(h.delay) + case <-time.After(h.delay): + case <-h.shutdownDelayCtx.Done(): + } } func (h *groupServiceHook) Postrun() error { diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index 68df0827f60..dba2f092f31 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -112,6 +112,11 @@ type TaskRunner struct { killErr error killErrLock sync.Mutex + // shutdownDelayCtx is a context from the alloc runner which will + // tell us to exit early from shutdown_delay + shutdownDelayCtx context.Context + shutdownDelayCancelFn context.CancelFunc + // Logger is the logger for the task runner. logger log.Logger @@ -287,6 +292,13 @@ type Config struct { // startConditionMetCtx is done when TR should start the task StartConditionMetCtx <-chan struct{} + + // ShutdownDelayCtx is a context from the alloc runner which will + // tell us to exit early from shutdown_delay + ShutdownDelayCtx context.Context + + // ShutdownDelayCancelFn should only be used in testing. + ShutdownDelayCancelFn context.CancelFunc } func NewTaskRunner(config *Config) (*TaskRunner, error) { @@ -342,6 +354,8 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { maxEvents: defaultMaxEvents, serversContactedCh: config.ServersContactedCh, startConditionMetCtx: config.StartConditionMetCtx, + shutdownDelayCtx: config.ShutdownDelayCtx, + shutdownDelayCancelFn: config.ShutdownDelayCancelFn, } // Create the logger based on the allocation ID @@ -895,6 +909,8 @@ func (tr *TaskRunner) handleKill(resultCh <-chan *drivers.ExitResult) *drivers.E select { case result := <-resultCh: return result + case <-tr.shutdownDelayCtx.Done(): + break case <-time.After(delay): } } @@ -1478,3 +1494,9 @@ func (tr *TaskRunner) DriverCapabilities() (*drivers.Capabilities, error) { func (tr *TaskRunner) SetAllocHookResources(res *cstructs.AllocHookResources) { tr.allocHookResources = res } + +// shutdownDelayCancel is used for testing only and cancels the +// shutdownDelayCtx +func (tr *TaskRunner) shutdownDelayCancel() { + tr.shutdownDelayCancelFn() +} diff --git a/client/allocrunner/taskrunner/task_runner_test.go b/client/allocrunner/taskrunner/task_runner_test.go index f3ef206cfd6..9b60458b4b0 100644 --- a/client/allocrunner/taskrunner/task_runner_test.go +++ b/client/allocrunner/taskrunner/task_runner_test.go @@ -14,6 +14,10 @@ import ( "time" "github.com/golang/snappy" + "github.com/kr/pretty" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/allocrunner/interfaces" "github.com/hashicorp/nomad/client/config" @@ -26,6 +30,7 @@ import ( agentconsul "github.com/hashicorp/nomad/command/agent/consul" mockdriver "github.com/hashicorp/nomad/drivers/mock" "github.com/hashicorp/nomad/drivers/rawexec" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" @@ -33,9 +38,6 @@ import ( "github.com/hashicorp/nomad/plugins/device" "github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/testutil" - "github.com/kr/pretty" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) type MockTaskStateUpdater struct { @@ -94,26 +96,30 @@ func testTaskRunnerConfig(t *testing.T, alloc *structs.Allocation, taskName stri cleanup() } + shutdownDelayCtx, shutdownDelayCancelFn := context.WithCancel(context.Background()) + // Create a closed channel to mock TaskHookCoordinator.startConditionForTask. // Closed channel indicates this task is not blocked on prestart hooks. closedCh := make(chan struct{}) close(closedCh) conf := &Config{ - Alloc: alloc, - ClientConfig: clientConf, - Task: thisTask, - TaskDir: taskDir, - Logger: clientConf.Logger, - Consul: consulapi.NewMockConsulServiceClient(t, logger), - ConsulSI: consulapi.NewMockServiceIdentitiesClient(), - Vault: vaultclient.NewMockVaultClient(), - StateDB: cstate.NoopDB{}, - StateUpdater: NewMockTaskStateUpdater(), - DeviceManager: devicemanager.NoopMockManager(), - DriverManager: drivermanager.TestDriverManager(t), - ServersContactedCh: make(chan struct{}), - StartConditionMetCtx: closedCh, + Alloc: alloc, + ClientConfig: clientConf, + Task: thisTask, + TaskDir: taskDir, + Logger: clientConf.Logger, + Consul: consulapi.NewMockConsulServiceClient(t, logger), + ConsulSI: consulapi.NewMockServiceIdentitiesClient(), + Vault: vaultclient.NewMockVaultClient(), + StateDB: cstate.NoopDB{}, + StateUpdater: NewMockTaskStateUpdater(), + DeviceManager: devicemanager.NoopMockManager(), + DriverManager: drivermanager.TestDriverManager(t), + ServersContactedCh: make(chan struct{}), + StartConditionMetCtx: closedCh, + ShutdownDelayCtx: shutdownDelayCtx, + ShutdownDelayCancelFn: shutdownDelayCancelFn, } return conf, trCleanup } @@ -996,6 +1002,82 @@ WAIT: } } +// TestTaskRunner_NoShutdownDelay asserts services are removed from +// Consul and tasks are killed without waiting for ${shutdown_delay} +// when the alloc has the NoShutdownDelay transition flag set. +func TestTaskRunner_NoShutdownDelay(t *testing.T) { + t.Parallel() + + // don't set this too high so that we don't block the test runner + // on shutting down the agent if the test fails + maxTestDuration := time.Duration(testutil.TestMultiplier()*10) * time.Second + maxTimeToFailDuration := time.Duration(testutil.TestMultiplier()) * time.Second + + alloc := mock.Alloc() + alloc.DesiredTransition = structs.DesiredTransition{NoShutdownDelay: helper.BoolToPtr(true)} + task := alloc.Job.TaskGroups[0].Tasks[0] + task.Services[0].Tags = []string{"tag1"} + task.Services = task.Services[:1] // only need 1 for this test + task.Driver = "mock_driver" + task.Config = map[string]interface{}{ + "run_for": "1000s", + } + task.ShutdownDelay = maxTestDuration + + tr, conf, cleanup := runTestTaskRunner(t, alloc, task.Name) + defer cleanup() + + mockConsul := conf.Consul.(*consulapi.MockConsulServiceClient) + + testWaitForTaskToStart(t, tr) + + testutil.WaitForResult(func() (bool, error) { + ops := mockConsul.GetOps() + if n := len(ops); n != 1 { + return false, fmt.Errorf("expected 1 consul operation. Found %d", n) + } + return ops[0].Op == "add", fmt.Errorf("consul operation was not a registration: %#v", ops[0]) + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + testCtx, cancel := context.WithTimeout(context.Background(), maxTimeToFailDuration) + defer cancel() + + killed := make(chan error) + go func() { + tr.shutdownDelayCancel() + err := tr.Kill(testCtx, structs.NewTaskEvent("test")) + killed <- err + }() + + // Wait for first de-registration call. Note that unlike + // TestTaskRunner_ShutdownDelay, we're racing with task exit + // and can't assert that we only get the first deregistration op + // (from serviceHook.PreKill). + testutil.WaitForResult(func() (bool, error) { + ops := mockConsul.GetOps() + if n := len(ops); n < 2 { + return false, fmt.Errorf("expected at least 2 consul operations.") + } + return ops[1].Op == "remove", fmt.Errorf( + "consul operation was not a deregistration: %#v", ops[1]) + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + // Wait for the task to exit + select { + case <-tr.WaitCh(): + case <-time.After(maxTimeToFailDuration): + t.Fatalf("task kill did not ignore shutdown delay") + return + } + + err := <-killed + require.NoError(t, err, "killing task returned unexpected error") +} + // TestTaskRunner_Dispatch_Payload asserts that a dispatch job runs and the // payload was written to disk. func TestTaskRunner_Dispatch_Payload(t *testing.T) { diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go index d1a7e210c5d..f6f724001ba 100644 --- a/command/agent/alloc_endpoint.go +++ b/command/agent/alloc_endpoint.go @@ -138,8 +138,18 @@ func (s *HTTPServer) allocStop(allocID string, resp http.ResponseWriter, req *ht return nil, CodedError(405, ErrInvalidMethod) } + noShutdownDelay := false + if noShutdownDelayQS := req.URL.Query().Get("no_shutdown_delay"); noShutdownDelayQS != "" { + var err error + noShutdownDelay, err = strconv.ParseBool(noShutdownDelayQS) + if err != nil { + return nil, fmt.Errorf("no_shutdown_delay value is not a boolean: %v", err) + } + } + sr := &structs.AllocStopRequest{ - AllocID: allocID, + AllocID: allocID, + NoShutdownDelay: noShutdownDelay, } s.parseWriteRequest(req, &sr.WriteRequest) diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 8a9da76feba..1ff8a7bde07 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -453,6 +453,18 @@ func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request, return nil, err } + // Identify the no_shutdown_delay query param and parse. + noShutdownDelayStr := req.URL.Query().Get("no_shutdown_delay") + var noShutdownDelay bool + if noShutdownDelayStr != "" { + var err error + noShutdownDelay, err = strconv.ParseBool(noShutdownDelayStr) + if err != nil { + return nil, fmt.Errorf("Failed to parse value of %qq (%v) as a bool: %v", "no_shutdown_delay", noShutdownDelayStr, err) + } + } + args.NoShutdownDelay = noShutdownDelay + // Validate the evaluation priority if the user supplied a non-default // value. It's more efficient to do it here, within the agent rather than // sending a bad request for the server to reject. diff --git a/command/alloc_stop.go b/command/alloc_stop.go index 3c8f9cbd93c..3b1d10218ab 100644 --- a/command/alloc_stop.go +++ b/command/alloc_stop.go @@ -38,6 +38,12 @@ Stop Specific Options: screen, which can be used to examine the rescheduling evaluation using the eval-status command. + -no-shutdown-delay + Ignore the the group and task shutdown_delay configuration so there is no + delay between service deregistration and task shutdown. Note that using + this flag will result in failed network connections to the allocation + being stopped. + -verbose Show full information. ` @@ -47,12 +53,13 @@ Stop Specific Options: func (c *AllocStopCommand) Name() string { return "alloc stop" } func (c *AllocStopCommand) Run(args []string) int { - var detach, verbose bool + var detach, verbose, noShutdownDelay bool flags := c.Meta.FlagSet(c.Name(), FlagSetClient) flags.Usage = func() { c.Ui.Output(c.Help()) } flags.BoolVar(&detach, "detach", false, "") flags.BoolVar(&verbose, "verbose", false, "") + flags.BoolVar(&noShutdownDelay, "no-shutdown-delay", false, "") if err := flags.Parse(args); err != nil { return 1 @@ -115,7 +122,12 @@ func (c *AllocStopCommand) Run(args []string) int { return 1 } - resp, err := client.Allocations().Stop(alloc, nil) + var opts *api.QueryOptions + if noShutdownDelay { + opts = &api.QueryOptions{Params: map[string]string{"no_shutdown_delay": "true"}} + } + + resp, err := client.Allocations().Stop(alloc, opts) if err != nil { c.Ui.Error(fmt.Sprintf("Error stopping allocation: %s", err)) return 1 diff --git a/command/job_stop.go b/command/job_stop.go index 8dd5d8a1197..1df4d0610ac 100644 --- a/command/job_stop.go +++ b/command/job_stop.go @@ -43,14 +43,20 @@ Stop Options: Override the priority of the evaluations produced as a result of this job deregistration. By default, this is set to the priority of the job. - -purge - Purge is used to stop the job and purge it from the system. If not set, the - job will still be queryable and will be purged by the garbage collector. - -global Stop a multi-region job in all its regions. By default job stop will stop only a single region at a time. Ignored for single-region jobs. + -no-shutdown-delay + Ignore the the group and task shutdown_delay configuration so that there is no + delay between service deregistration and task shutdown. Note that using + this flag will result in failed network connections to the allocations + being stopped. + + -purge + Purge is used to stop the job and purge it from the system. If not set, the + job will still be queryable and will be purged by the garbage collector. + -yes Automatic yes to prompts. @@ -67,12 +73,13 @@ func (c *JobStopCommand) Synopsis() string { func (c *JobStopCommand) AutocompleteFlags() complete.Flags { return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient), complete.Flags{ - "-detach": complete.PredictNothing, - "-eval-priority": complete.PredictNothing, - "-purge": complete.PredictNothing, - "-global": complete.PredictNothing, - "-yes": complete.PredictNothing, - "-verbose": complete.PredictNothing, + "-detach": complete.PredictNothing, + "-eval-priority": complete.PredictNothing, + "-purge": complete.PredictNothing, + "-global": complete.PredictNothing, + "-no-shutdown-delay": complete.PredictNothing, + "-yes": complete.PredictNothing, + "-verbose": complete.PredictNothing, }) } @@ -94,7 +101,7 @@ func (c *JobStopCommand) AutocompleteArgs() complete.Predictor { func (c *JobStopCommand) Name() string { return "job stop" } func (c *JobStopCommand) Run(args []string) int { - var detach, purge, verbose, global, autoYes bool + var detach, purge, verbose, global, autoYes, noShutdownDelay bool var evalPriority int flags := c.Meta.FlagSet(c.Name(), FlagSetClient) @@ -102,6 +109,7 @@ func (c *JobStopCommand) Run(args []string) int { flags.BoolVar(&detach, "detach", false, "") flags.BoolVar(&verbose, "verbose", false, "") flags.BoolVar(&global, "global", false, "") + flags.BoolVar(&noShutdownDelay, "no-shutdown-delay", false, "") flags.BoolVar(&autoYes, "yes", false, "") flags.BoolVar(&purge, "purge", false, "") flags.IntVar(&evalPriority, "eval-priority", 0, "") @@ -199,7 +207,7 @@ func (c *JobStopCommand) Run(args []string) int { } // Invoke the stop - opts := &api.DeregisterOptions{Purge: purge, Global: global, EvalPriority: evalPriority} + opts := &api.DeregisterOptions{Purge: purge, Global: global, EvalPriority: evalPriority, NoShutdownDelay: noShutdownDelay} wq := &api.WriteOptions{Namespace: jobs[0].JobSummary.Namespace} evalID, _, err := client.Jobs().DeregisterOpts(*job.ID, opts, wq) if err != nil { diff --git a/nomad/alloc_endpoint.go b/nomad/alloc_endpoint.go index 3a32b5f1964..0b44175adf6 100644 --- a/nomad/alloc_endpoint.go +++ b/nomad/alloc_endpoint.go @@ -320,7 +320,8 @@ func (a *Alloc) Stop(args *structs.AllocStopRequest, reply *structs.AllocStopRes Evals: []*structs.Evaluation{eval}, Allocs: map[string]*structs.DesiredTransition{ args.AllocID: { - Migrate: helper.BoolToPtr(true), + Migrate: helper.BoolToPtr(true), + NoShutdownDelay: helper.BoolToPtr(args.NoShutdownDelay), }, }, } diff --git a/nomad/fsm.go b/nomad/fsm.go index 84721014560..72757431211 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -11,6 +11,7 @@ import ( log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-msgpack/codec" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" @@ -605,7 +606,7 @@ func (n *nomadFSM) applyDeregisterJob(msgType structs.MessageType, buf []byte, i } err := n.state.WithWriteTransaction(msgType, index, func(tx state.Txn) error { - err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, tx) + err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, req.NoShutdownDelay, tx) if err != nil { n.logger.Error("deregistering job failed", @@ -645,7 +646,7 @@ func (n *nomadFSM) applyBatchDeregisterJob(msgType structs.MessageType, buf []by // evals for jobs whose deregistering didn't get committed yet. err := n.state.WithWriteTransaction(msgType, index, func(tx state.Txn) error { for jobNS, options := range req.Jobs { - if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, tx); err != nil { + if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge, false, tx); err != nil { n.logger.Error("deregistering job failed", "job", jobNS.ID, "error", err) return err } @@ -670,12 +671,31 @@ func (n *nomadFSM) applyBatchDeregisterJob(msgType structs.MessageType, buf []by // handleJobDeregister is used to deregister a job. Leaves error logging up to // caller. -func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool, tx state.Txn) error { +func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool, noShutdownDelay bool, tx state.Txn) error { // If it is periodic remove it from the dispatcher if err := n.periodicDispatcher.Remove(namespace, jobID); err != nil { return fmt.Errorf("periodicDispatcher.Remove failed: %w", err) } + if noShutdownDelay { + ws := memdb.NewWatchSet() + allocs, err := n.state.AllocsByJob(ws, namespace, jobID, false) + if err != nil { + return err + } + transition := &structs.DesiredTransition{NoShutdownDelay: helper.BoolToPtr(true)} + for _, alloc := range allocs { + err := n.state.UpdateAllocDesiredTransitionTxn(tx, index, alloc.ID, transition) + if err != nil { + return err + } + err = tx.Insert("index", &state.IndexEntry{Key: "allocs", Value: index}) + if err != nil { + return fmt.Errorf("index update failed: %v", err) + } + } + } + if purge { if err := n.state.DeleteJobTxn(index, namespace, jobID, tx); err != nil { return fmt.Errorf("DeleteJob failed: %w", err) diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 9fce33ca7e2..51da5d219c9 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -3831,6 +3831,97 @@ func TestJobEndpoint_Deregister_EvalCreation_Legacy(t *testing.T) { }) } +func TestJobEndpoint_Deregister_NoShutdownDelay(t *testing.T) { + t.Parallel() + require := require.New(t) + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register requests + job := mock.Job() + reg := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + // Fetch the response + var resp0 structs.JobRegisterResponse + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", reg, &resp0)) + + // Deregister but don't purge + dereg1 := &structs.JobDeregisterRequest{ + JobID: job.ID, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp1 structs.JobDeregisterResponse + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg1, &resp1)) + require.NotZero(resp1.Index) + + // Check for the job in the FSM + state := s1.fsm.State() + out, err := state.JobByID(nil, job.Namespace, job.ID) + require.NoError(err) + require.NotNil(out) + require.True(out.Stop) + + // Lookup the evaluation + eval, err := state.EvalByID(nil, resp1.EvalID) + require.NoError(err) + require.NotNil(eval) + require.EqualValues(resp1.EvalCreateIndex, eval.CreateIndex) + require.Equal(structs.EvalTriggerJobDeregister, eval.TriggeredBy) + + // Lookup allocation transitions + var ws memdb.WatchSet + allocs, err := state.AllocsByJob(ws, job.Namespace, job.ID, true) + require.NoError(err) + + for _, alloc := range allocs { + require.Nil(alloc.DesiredTransition) + } + + // Deregister with no shutdown delay + dereg2 := &structs.JobDeregisterRequest{ + JobID: job.ID, + NoShutdownDelay: true, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp2 structs.JobDeregisterResponse + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg2, &resp2)) + require.NotZero(resp2.Index) + + // Lookup the evaluation + eval, err = state.EvalByID(nil, resp2.EvalID) + require.NoError(err) + require.NotNil(eval) + require.EqualValues(resp2.EvalCreateIndex, eval.CreateIndex) + require.Equal(structs.EvalTriggerJobDeregister, eval.TriggeredBy) + + // Lookup allocation transitions + allocs, err = state.AllocsByJob(ws, job.Namespace, job.ID, true) + require.NoError(err) + + for _, alloc := range allocs { + require.NotNil(alloc.DesiredTransition) + require.True(*(alloc.DesiredTransition.NoShutdownDelay)) + } + +} + func TestJobEndpoint_BatchDeregister(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 3d08fdca768..81cbd7c633c 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1600,7 +1600,7 @@ func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion b } if err := s.updateJobCSIPlugins(index, job, existingJob, txn); err != nil { - return fmt.Errorf("unable to update job scaling policies: %v", err) + return fmt.Errorf("unable to update job csi plugins: %v", err) } // Insert the job @@ -3371,7 +3371,7 @@ func (s *StateStore) UpdateAllocsDesiredTransitions(msgType structs.MessageType, // Handle each of the updated allocations for id, transition := range allocs { - if err := s.nestedUpdateAllocDesiredTransition(txn, index, id, transition); err != nil { + if err := s.UpdateAllocDesiredTransitionTxn(txn, index, id, transition); err != nil { return err } } @@ -3390,9 +3390,9 @@ func (s *StateStore) UpdateAllocsDesiredTransitions(msgType structs.MessageType, return txn.Commit() } -// nestedUpdateAllocDesiredTransition is used to nest an update of an +// UpdateAllocDesiredTransitionTxn is used to nest an update of an // allocations desired transition -func (s *StateStore) nestedUpdateAllocDesiredTransition( +func (s *StateStore) UpdateAllocDesiredTransitionTxn( txn *txn, index uint64, allocID string, transition *structs.DesiredTransition) error { @@ -3414,8 +3414,9 @@ func (s *StateStore) nestedUpdateAllocDesiredTransition( // Merge the desired transitions copyAlloc.DesiredTransition.Merge(transition) - // Update the modify index + // Update the modify indexes copyAlloc.ModifyIndex = index + copyAlloc.AllocModifyIndex = index // Update the allocation if err := txn.Insert("allocs", copyAlloc); err != nil { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 48c9782289a..742fce06db6 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -628,6 +628,11 @@ type JobDeregisterRequest struct { // in busy clusters with a large evaluation backlog. EvalPriority int + // NoShutdownDelay, if set to true, will override the group and + // task shutdown_delay configuration and ignore the delay for any + // allocations stopped as a result of this Deregister call. + NoShutdownDelay bool + // Eval is the evaluation to create that's associated with job deregister Eval *Evaluation @@ -955,7 +960,8 @@ type AllocUpdateDesiredTransitionRequest struct { // AllocStopRequest is used to stop and reschedule a running Allocation. type AllocStopRequest struct { - AllocID string + AllocID string + NoShutdownDelay bool WriteRequest } @@ -9140,6 +9146,11 @@ type DesiredTransition struct { // This field is only used when operators want to force a placement even if // a failed allocation is not eligible to be rescheduled ForceReschedule *bool + + // NoShutdownDelay, if set to true, will override the group and + // task shutdown_delay configuration and ignore the delay for any + // allocations stopped as a result of this Deregister call. + NoShutdownDelay *bool } // Merge merges the two desired transitions, preferring the values from the @@ -9156,6 +9167,10 @@ func (d *DesiredTransition) Merge(o *DesiredTransition) { if o.ForceReschedule != nil { d.ForceReschedule = o.ForceReschedule } + + if o.NoShutdownDelay != nil { + d.NoShutdownDelay = o.NoShutdownDelay + } } // ShouldMigrate returns whether the transition object dictates a migration. @@ -9178,6 +9193,15 @@ func (d *DesiredTransition) ShouldForceReschedule() bool { return d.ForceReschedule != nil && *d.ForceReschedule } +// ShouldIgnoreShutdownDelay returns whether the transition object dictates +// that shutdown skip any shutdown delays. +func (d *DesiredTransition) ShouldIgnoreShutdownDelay() bool { + if d == nil { + return false + } + return d.NoShutdownDelay != nil && *d.NoShutdownDelay +} + const ( AllocDesiredStatusRun = "run" // Allocation should run AllocDesiredStatusStop = "stop" // Allocation should stop diff --git a/website/content/docs/commands/alloc/stop.mdx b/website/content/docs/commands/alloc/stop.mdx index b82195172a4..060d47ad4d3 100644 --- a/website/content/docs/commands/alloc/stop.mdx +++ b/website/content/docs/commands/alloc/stop.mdx @@ -42,6 +42,12 @@ allocation's namespace. - `-verbose`: Display verbose output. +- `-no-shutdown-delay` + Ignore the the group and task [`shutdown_delay`] configuration so that + there is no delay between service deregistration and task + shutdown. Note that using this flag will result in failed network + connections to the allocation being stopped. + ## Examples ```shell-session @@ -58,3 +64,4 @@ $ nomad alloc stop -detach eb17e557 ``` [eval status]: /docs/commands/eval-status +[`shutdown_delay`]: /docs/job-specification/group#shutdown_delay diff --git a/website/content/docs/commands/job/stop.mdx b/website/content/docs/commands/job/stop.mdx index 004520b978e..134715ffe65 100644 --- a/website/content/docs/commands/job/stop.mdx +++ b/website/content/docs/commands/job/stop.mdx @@ -55,6 +55,12 @@ When ACLs are enabled, this command requires a token with the `submit-job`, Stop a [multi-region] job in all its regions. By default, `job stop` will stop only a single region at a time. Ignored for single-region jobs. +- `-no-shutdown-delay` + Ignore the the group and task [`shutdown_delay`] configuration so that + there is no delay between service deregistration and task + shutdown. Note that using this flag will result in failed network + connections to the allocations being stopped. + ## Examples Stop the job with ID "job1": @@ -75,3 +81,4 @@ $ nomad job stop -detach job1 [eval status]: /docs/commands/eval-status [multi-region]: /docs/job-specification/multiregion +[`shutdown_delay`]: /docs/job-specification/group#shutdown_delay