diff --git a/.changelog/16278.txt b/.changelog/16278.txt
new file mode 100644
index 00000000000..7609e3ae1c0
--- /dev/null
+++ b/.changelog/16278.txt
@@ -0,0 +1,3 @@
+```release-note:improvement
+cli: Added new `nomad job restart` command to restart all allocations for a job
+```
diff --git a/api/tasks.go b/api/tasks.go
index ecf35727109..ecc14a8ea88 100644
--- a/api/tasks.go
+++ b/api/tasks.go
@@ -976,6 +976,12 @@ func (t *Task) SetLogConfig(l *LogConfig) *Task {
 	return t
 }
 
+// SetLifecycle is used to set the lifecycle config on a task.
+func (t *Task) SetLifecycle(l *TaskLifecycle) *Task {
+	t.Lifecycle = l
+	return t
+}
+
 // TaskState tracks the current state of a task and events that caused state
 // transitions.
 type TaskState struct {
diff --git a/command/commands.go b/command/commands.go
index 7cb9780062d..b7b041ca327 100644
--- a/command/commands.go
+++ b/command/commands.go
@@ -415,6 +415,14 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory {
 				Meta: meta,
 			}, nil
 		},
+		"job restart": func() (cli.Command, error) {
+			// Use a *cli.ConcurrentUi because this command spawns several
+			// goroutines that write to the terminal concurrently.
+			meta.Ui = &cli.ConcurrentUi{Ui: meta.Ui}
+			return &JobRestartCommand{
+				Meta: meta,
+			}, nil
+		},
 		"job deployments": func() (cli.Command, error) {
 			return &JobDeploymentsCommand{
 				Meta: meta,
diff --git a/command/helpers.go b/command/helpers.go
index 22ba4dd7f99..1d9de2467db 100644
--- a/command/helpers.go
+++ b/command/helpers.go
@@ -62,6 +62,13 @@ func limit(s string, length int) string {
 	return s[:length]
 }
 
+// indentString returns the string s padded with the given number of spaces
+// before each line except for the first one.
+func indentString(s string, pad int) string {
+	prefix := strings.Repeat(" ", pad)
+	return strings.Join(strings.Split(s, "\n"), fmt.Sprintf("\n%s", prefix))
+}
+
 // wrapAtLengthWithPadding wraps the given text at the maxLineLength, taking
 // into account any provided left padding.
 func wrapAtLengthWithPadding(s string, pad int) string {
diff --git a/command/job_restart.go b/command/job_restart.go
new file mode 100644
index 00000000000..55084bf7980
--- /dev/null
+++ b/command/job_restart.go
@@ -0,0 +1,1205 @@
+package command
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"math"
+	"os"
+	"os/signal"
+	"regexp"
+	"strconv"
+	"strings"
+	"time"
+
+	humanize "github.com/dustin/go-humanize"
+	"github.com/dustin/go-humanize/english"
+	multierror "github.com/hashicorp/go-multierror"
+	"github.com/hashicorp/go-set"
+	"github.com/hashicorp/nomad/api"
+	"github.com/hashicorp/nomad/api/contexts"
+	"github.com/hashicorp/nomad/helper"
+	"github.com/posener/complete"
+)
+
+const (
+	// jobRestartTimestampPrefixLength is the number of characters in the
+	// "==> [timestamp]: " string that prefixes most of the outputs of this
+	// command.
+	jobRestartTimestampPrefixLength = 31
+
+	// jobRestartBatchWaitAsk is the special token used to indicate that the
+	// command should ask the user for confirmation between batches.
+	jobRestartBatchWaitAsk = "ask"
+
+	// jobRestartOnErrorFail is the special token used to indicate that the
+	// command should exit when a batch has errors.
+	jobRestartOnErrorFail = "fail"
+
+	// jobRestartOnErrorAsk is the special token used to indicate that the
+	// command should ask the user for confirmation when a batch has errors.
+	jobRestartOnErrorAsk = "ask"
+)
+
+var (
+	// jobRestartBatchSizeValueRegex validates that the value passed to
+	// -batch-size is an integer optionally followed by a % sign.
+	//
+	// Use ^...$ to make sure we're matching over the entire input to avoid
+	// partial matches such as 10%20%.
+	jobRestartBatchSizeValueRegex = regexp.MustCompile(`^(\d+)%?$`)
+)
+
+// ErrJobRestartPlacementFailure is an error that indicates a placement
+// failure.
+type ErrJobRestartPlacementFailure struct {
+	EvalID    string
+	TaskGroup string
+	Failures  *api.AllocationMetric
+}
+
+func (e ErrJobRestartPlacementFailure) Error() string {
+	return fmt.Sprintf("Evaluation %q has placement failures for group %q:\n%s",
+		e.EvalID,
+		e.TaskGroup,
+		formatAllocMetrics(e.Failures, false, strings.Repeat(" ", 4)),
+	)
+}
+
+func (e ErrJobRestartPlacementFailure) Is(err error) bool {
+	_, ok := err.(ErrJobRestartPlacementFailure)
+	return ok
+}
+
+// JobRestartCommand is the implementation for the command that restarts a job.
+type JobRestartCommand struct {
+	Meta
+
+	// client is the Nomad API client shared by all functions in the command to
+	// reuse the same connection.
+	client *api.Client
+
+	// Configuration values read and parsed from command flags and args.
+	allTasks         bool
+	autoYes          bool
+	batchSize        int
+	batchSizePercent bool
+	batchWait        time.Duration
+	batchWaitAsk     bool
+	groups           *set.Set[string]
+	jobID            string
+	noShutdownDelay  bool
+	onError          string
+	reschedule       bool
+	tasks            *set.Set[string]
+	verbose          bool
+	length           int
+
+	// canceled is set to true when the user gives a negative answer to any of
+	// the questions.
+	canceled bool
+
+	// sigsCh is used to subscribe to signals from the operating system.
+	sigsCh chan os.Signal
+}
+
+func (c *JobRestartCommand) Help() string {
+	helpText := `
+Usage: nomad job restart [options] <job>
+
+  Restart or reschedule allocations for a particular job.
+
+  Restarting the job calls the 'Restart Allocation' API endpoint to restart the
+  tasks inside allocations, so the allocations themselves are not modified but
+  rather restarted in-place.
+
+  Rescheduling the job uses the 'Stop Allocation' API endpoint to stop the
+  allocations and trigger the Nomad scheduler to compute new placements. This
+  may cause the new allocations to be scheduled in different clients from the
+  originals.
+
+  This command can operate in batches and it waits until all restarted or
+  rescheduled allocations are running again before proceeding to the next
+  batch. It is also possible to specify additional time to wait between
+  batches.
+
+  Allocations can be restarted in-place or rescheduled. When restarting
+  in-place, the command may target specific tasks in the allocations, restart
+  only tasks that are currently running, or restart all tasks, even the ones
+  that have already run. Allocations can also be targeted by group. When both
+  groups and tasks are defined, only the tasks for the allocations of those
+  groups are restarted.
+
+  When rescheduling, the current allocations are stopped, triggering the Nomad
+  scheduler to create replacement allocations that may be placed in different
+  clients. The command waits until the new allocations have client status
+  'running' before proceeding with the remaining batches. Service health checks
+  are not taken into account.
+
+  By default the command restarts all running tasks in-place with one
+  allocation per batch.
+
+  When ACLs are enabled, this command requires a token with the
+  'alloc-lifecycle' and 'read-job' capabilities for the job's namespace. The
+  'list-jobs' capability is required to run the command with a job prefix
+  instead of the exact job ID.
+
+General Options:
+
+  ` + generalOptionsUsage(usageOptsDefault) + `
+
+Restart Options:
+
+  -all-tasks
+    If set, all tasks in the allocations are restarted, even the ones that
+    have already run, such as non-sidecar tasks. Tasks will restart following
+    their lifecycle order. This option cannot be used with '-task'.
+
+  -batch-size=<n|n%>
+    Number of allocations to restart at once. It may be defined as a percentage
+    value of the current number of running allocations. Percentage values are
+    rounded up to increase parallelism. Defaults to 1.
+
+  -batch-wait=<duration|'ask'>
+    Time to wait between restart batches. If set to 'ask' the command halts
+    between batches and waits for user input on how to proceed. If the answer
+    is a time duration, all remaining batches will use this new value. Defaults
+    to 0.
+
+  -group=<group-name>
+    Only restart allocations for the given group. Can be specified multiple
+    times. If no group is set, all allocations for the job are restarted.
+
+  -no-shutdown-delay
+    Ignore the group and task 'shutdown_delay' configuration so there is no
+    delay between service deregistration and task shutdown or restart. Note
+    that using this flag will result in failed network connections to the
+    allocation being restarted.
+
+  -on-error=<'ask'|'fail'>
+    Determines what action to take when an error happens during a restart
+    batch. If 'ask' the command stops and waits for user confirmation on how to
+    proceed. If 'fail' the command exits immediately. Defaults to 'ask'.
+
+  -reschedule
+    If set, allocations are stopped and rescheduled instead of restarted
+    in-place. Since the group is not modified, the restart does not create a
+    new deployment, and so values defined in 'update' blocks, such as
+    'max_parallel', are not taken into account. This option cannot be used
+    with '-task'.
+
+  -task=<task-name>
+    Specify the task to restart. Can be specified multiple times. If groups are
+    also specified, the task must exist in at least one of them. If no task is
+    set, only tasks that are currently running are restarted. For example,
+    non-sidecar tasks that already ran are not restarted unless '-all-tasks' is
+    used instead. This option cannot be used with '-all-tasks' or
+    '-reschedule'.
+
+  -yes
+    Automatic yes to prompts. If set, the command automatically restarts
+    multi-region jobs only in the region targeted by the command, ignores batch
+    errors, and automatically proceeds with the remaining batches without
+    waiting. Use '-on-error' and '-batch-wait' to adjust these behaviors.
+
+  -verbose
+    Display full information.
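+
+  For example, the command below restarts the running tasks of a job named
+  'example' two allocations at a time, waiting 10 seconds between batches:
+
+      nomad job restart -batch-size=2 -batch-wait=10s example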
+` + return strings.TrimSpace(helpText) +} + +func (c *JobRestartCommand) Synopsis() string { + return "Restart or reschedule allocations for a job" +} + +func (c *JobRestartCommand) AutocompleteFlags() complete.Flags { + return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient), + complete.Flags{ + "-all-tasks": complete.PredictNothing, + "-batch-size": complete.PredictAnything, + "-batch-wait": complete.PredictAnything, + "-no-shutdown-delay": complete.PredictNothing, + "-on-error": complete.PredictSet(jobRestartOnErrorAsk, jobRestartOnErrorFail), + "-reschedule": complete.PredictNothing, + "-task": complete.PredictAnything, + "-yes": complete.PredictNothing, + "-verbose": complete.PredictNothing, + }) +} + +func (c *JobRestartCommand) AutocompleteArgs() complete.Predictor { + return complete.PredictFunc(func(a complete.Args) []string { + client, err := c.Meta.Client() + if err != nil { + return nil + } + + resp, _, err := client.Search().PrefixSearch(a.Last, contexts.Jobs, nil) + if err != nil { + return []string{} + } + return resp.Matches[contexts.Jobs] + }) +} + +func (c *JobRestartCommand) Name() string { return "job restart" } + +func (c *JobRestartCommand) Run(args []string) int { + // Parse and validate command line arguments. + code, err := c.parseAndValidate(args) + if err != nil { + c.Ui.Error(err.Error()) + c.Ui.Error(commandErrorText(c)) + return code + } + if code != 0 { + return code + } + + c.client, err = c.Meta.Client() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error initializing client: %v", err)) + return 1 + } + + // Use prefix matching to find job. + job, err := c.JobByPrefix(c.client, c.jobID, nil) + if err != nil { + c.Ui.Error(err.Error()) + return 1 + } + + c.jobID = *job.ID + if job.Namespace != nil { + c.client.SetNamespace(*job.Namespace) + } + + // Confirm that we should restart a multi-region job in a single region. + if job.IsMultiregion() && !c.autoYes && !c.shouldRestartMultiregion() { + c.Ui.Output("\nJob restart canceled.") + return 0 + } + + // Retrieve the job history so we can properly determine if a group or task + // exists in the specific allocation job version. + jobVersions, _, _, err := c.client.Jobs().Versions(c.jobID, false, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error retrieving versions of job %q: %s", c.jobID, err)) + return 1 + } + + // Index jobs by version. + jobVersionIndex := make(map[uint64]*api.Job, len(jobVersions)) + for _, job := range jobVersions { + jobVersionIndex[*job.Version] = job + } + + // Fetch all allocations for the job and filter out the ones that are not + // eligible for restart. + allocStubs, _, err := c.client.Jobs().Allocations(c.jobID, true, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error retrieving allocations for job %q: %v", c.jobID, err)) + return 1 + } + allocStubsWithJob := make([]AllocationListStubWithJob, 0, len(allocStubs)) + for _, stub := range allocStubs { + allocStubsWithJob = append(allocStubsWithJob, AllocationListStubWithJob{ + AllocationListStub: stub, + Job: jobVersionIndex[stub.JobVersion], + }) + } + restartAllocs := c.filterAllocs(allocStubsWithJob) + + // Exit early if there's nothing to do. + if len(restartAllocs) == 0 { + c.Ui.Output("No allocations to restart") + return 0 + } + + // Calculate absolute batch size based on the number of eligible + // allocations. Round values up to increase parallelism. 
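+	// For example, with 5 eligible allocations and -batch-size=30% the batch
+	// size becomes ceil(5 * 30 / 100) = 2.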
+	if c.batchSizePercent {
+		c.batchSize = int(math.Ceil(float64(len(restartAllocs)*c.batchSize) / 100))
+	}
+
+	c.Ui.Output(c.Colorize().Color(fmt.Sprintf(
+		"[bold]==> %s: Restarting %s[reset]",
+		formatTime(time.Now()),
+		english.Plural(len(restartAllocs), "allocation", "allocations"),
+	)))
+
+	// Handle SIGINT to prevent accidental cancellations of the long-lived
+	// restart loop. activeCh is blocked while a signal is being handled to
+	// prevent new work from starting while the user is deciding if they want
+	// to cancel the command or not.
+	activeCh := make(chan any)
+	c.sigsCh = make(chan os.Signal, 1)
+	signal.Notify(c.sigsCh, os.Interrupt)
+	defer signal.Stop(c.sigsCh)
+
+	go c.handleSignal(c.sigsCh, activeCh)
+
+	// restartErr accumulates the errors that happen in each batch.
+	var restartErr *multierror.Error
+
+	// Restart allocations in batches.
+	batch := multierror.Group{}
+	for restartCount, alloc := range restartAllocs {
+		// Block and wait before each iteration if the command is handling an
+		// interrupt signal.
+		<-activeCh
+
+		// Make sure there is no active deployment to prevent the restart
+		// process from interfering with it.
+		err := c.ensureNoActiveDeployment()
+		if err != nil {
+			restartErr = multierror.Append(restartErr, err)
+			break
+		}
+
+		// Print a new batch header every time the restart count reaches a
+		// multiple of the batch size, which indicates that we're starting a
+		// new batch. Skip the batch header if the batch size is one because
+		// it's redundant.
+		if restartCount%c.batchSize == 0 && c.batchSize > 1 {
+			batchNumber := restartCount/c.batchSize + 1
+			remaining := len(restartAllocs) - restartCount
+
+			c.Ui.Output(c.Colorize().Color(fmt.Sprintf(
+				"[bold]==> %s: Restarting %s batch of %d allocations[reset]",
+				formatTime(time.Now()),
+				humanize.Ordinal(batchNumber),
+				helper.Min(c.batchSize, remaining),
+			)))
+		}
+
+		// Restart allocation. Wrap the callback function to capture the
+		// alloc loop variable and prevent it from changing inside the
+		// goroutine at each iteration.
+		batch.Go(func(allocStubWithJob AllocationListStubWithJob) func() error {
+			return func() error {
+				return c.handleAlloc(allocStubWithJob)
+			}
+		}(alloc))
+
+		// Check if we restarted enough allocations to complete a batch or if
+		// we restarted the last allocation.
+		batchComplete := (restartCount+1)%c.batchSize == 0
+		restartComplete := restartCount+1 == len(restartAllocs)
+		if batchComplete || restartComplete {
+
+			// Block and wait for the batch to finish. Handle the
+			// *multierror.Error response to add the custom formatting and to
+			// convert it to an error to avoid problems where an empty
+			// *multierror.Error is not considered a nil error.
+			var batchErr error
+			if batchMerr := batch.Wait(); batchMerr != nil {
+				restartErr = multierror.Append(restartErr, batchMerr)
+				batchMerr.ErrorFormat = c.errorFormat(jobRestartTimestampPrefixLength)
+				batchErr = batchMerr.ErrorOrNil()
+			}
+
+			// Block if the command is handling an interrupt signal.
+			<-activeCh
+
+			// Exit loop before sleeping or asking for user input if we just
+			// finished the last batch.
+			if restartComplete {
+				break
+			}
+
+			// Handle errors that happened in this batch.
+			if batchErr != nil {
+				// Exit early if -on-error is 'fail'.
+				if c.onError == jobRestartOnErrorFail {
+					c.Ui.Output(c.Colorize().Color(fmt.Sprintf(
+						"[bold]==> %s: Stopping job restart due to error[reset]",
+						formatTime(time.Now()),
+					)))
+					break
+				}
+
+				// Exit early if -yes but error is not recoverable.
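+				// Placement failures and ACL permission errors are considered
+				// unrecoverable (see isErrorRecoverable below).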
+				if c.autoYes && !c.isErrorRecoverable(batchErr) {
+					c.Ui.Output(c.Colorize().Color(fmt.Sprintf(
+						"[bold]==> %s: Stopping job restart due to unrecoverable error[reset]",
+						formatTime(time.Now()),
+					)))
+					break
+				}
+			}
+
+			// Check if we need to ask the user how to proceed. This is needed
+			// in case -yes is not set and -batch-wait is 'ask' or an error
+			// happened and -on-error is 'ask'.
+			askUser := !c.autoYes && (c.batchWaitAsk || c.onError == jobRestartOnErrorAsk && batchErr != nil)
+			if askUser {
+				if batchErr != nil {
+					// Print errors so the user can decide what to do below.
+					c.Ui.Warn(c.Colorize().Color(fmt.Sprintf(
+						"[bold]==> %s: %s[reset]", formatTime(time.Now()), batchErr,
+					)))
+				}
+
+				// Exit early if user provides a negative answer.
+				if !c.shouldProceed(batchErr) {
+					c.Ui.Output(c.Colorize().Color(fmt.Sprintf(
+						"[bold]==> %s: Job restart canceled[reset]",
+						formatTime(time.Now()),
+					)))
+					c.canceled = true
+					break
+				}
+			}
+
+			// Sleep if -batch-wait is set or if -batch-wait is 'ask' and user
+			// responded with a new interval above.
+			if c.batchWait > 0 {
+				c.Ui.Output(c.Colorize().Color(fmt.Sprintf(
+					"[bold]==> %s: Waiting %s before restarting the next batch[reset]",
+					formatTime(time.Now()),
+					c.batchWait,
+				)))
+				time.Sleep(c.batchWait)
+			}
+
+			// Start a new batch.
+			batch = multierror.Group{}
+		}
+	}
+
+	if restartErr != nil && len(restartErr.Errors) > 0 {
+		if !c.canceled {
+			c.Ui.Output(c.Colorize().Color(fmt.Sprintf(
+				"[bold]==> %s: Job restart finished with errors[reset]",
+				formatTime(time.Now()),
+			)))
+		}
+
+		restartErr.ErrorFormat = c.errorFormat(0)
+		c.Ui.Error(fmt.Sprintf("\n%s", restartErr))
+		return 1
+	}
+
+	if !c.canceled {
+		c.Ui.Output(c.Colorize().Color(fmt.Sprintf(
+			"[bold]==> %s: Job restart finished[reset]",
+			formatTime(time.Now()),
+		)))
+
+		c.Ui.Output("\nJob restarted successfully!")
+	}
+	return 0
+}
+
+// parseAndValidate parses and validates the arguments passed to the command.
+//
+// This function mutates the command and is not thread-safe so it must be
+// called only once and early in the command lifecycle.
+func (c *JobRestartCommand) parseAndValidate(args []string) (int, error) {
+	var batchSizeStr string
+	var batchWaitStr string
+	var groups []string
+	var tasks []string
+
+	flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
+	flags.Usage = func() { c.Ui.Output(c.Help()) }
+	flags.BoolVar(&c.allTasks, "all-tasks", false, "")
+	flags.BoolVar(&c.autoYes, "yes", false, "")
+	flags.StringVar(&batchSizeStr, "batch-size", "1", "")
+	flags.StringVar(&batchWaitStr, "batch-wait", "0s", "")
+	flags.StringVar(&c.onError, "on-error", jobRestartOnErrorAsk, "")
+	flags.BoolVar(&c.noShutdownDelay, "no-shutdown-delay", false, "")
+	flags.BoolVar(&c.reschedule, "reschedule", false, "")
+	flags.BoolVar(&c.verbose, "verbose", false, "")
+	flags.Var((funcVar)(func(s string) error {
+		groups = append(groups, s)
+		return nil
+	}), "group", "")
+	flags.Var((funcVar)(func(s string) error {
+		tasks = append(tasks, s)
+		return nil
+	}), "task", "")
+
+	err := flags.Parse(args)
+	if err != nil {
+		// Let the flags library handle and print the error message.
+		return 1, nil
+	}
+
+	// Truncate IDs unless full length is requested.
+	c.length = shortId
+	if c.verbose {
+		c.length = fullId
+	}
+
+	// Check that we got exactly one job.
+	args = flags.Args()
+	if len(args) != 1 {
+		return 1, fmt.Errorf("This command takes one argument: <job>")
+	}
+	c.jobID = strings.TrimSpace(args[0])
+
+	// Parse and validate -batch-size.
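+	// Accepted values are positive integers such as "5" or percentages such
+	// as "25%". Values like "0", "1.5", or "10%20%" are rejected.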
+	matches := jobRestartBatchSizeValueRegex.FindStringSubmatch(batchSizeStr)
+	if len(matches) != 2 {
+		return 1, fmt.Errorf(
+			"Invalid -batch-size value %q: batch size must be an integer or a percentage",
+			batchSizeStr,
+		)
+	}
+
+	c.batchSizePercent = strings.HasSuffix(batchSizeStr, "%")
+	c.batchSize, err = strconv.Atoi(matches[1])
+	if err != nil {
+		return 1, fmt.Errorf("Invalid -batch-size value %q: %w", batchSizeStr, err)
+	}
+	if c.batchSize == 0 {
+		return 1, fmt.Errorf(
+			"Invalid -batch-size value %q: value must be greater than zero",
+			batchSizeStr,
+		)
+	}
+
+	// Parse and validate -batch-wait.
+	if strings.ToLower(batchWaitStr) == jobRestartBatchWaitAsk {
+		if !isTty() && !c.autoYes {
+			return 1, fmt.Errorf(
+				"Invalid -batch-wait value %[1]q: %[1]q cannot be used when terminal is not interactive",
+				jobRestartBatchWaitAsk,
+			)
+		}
+		c.batchWaitAsk = true
+	} else {
+		c.batchWait, err = time.ParseDuration(batchWaitStr)
+		if err != nil {
+			return 1, fmt.Errorf("Invalid -batch-wait value %q: %w", batchWaitStr, err)
+		}
+	}
+
+	// Parse and validate -on-error.
+	switch c.onError {
+	case jobRestartOnErrorAsk:
+		if !isTty() && !c.autoYes {
+			return 1, fmt.Errorf(
+				"Invalid -on-error value %[1]q: %[1]q cannot be used when terminal is not interactive",
+				jobRestartOnErrorAsk,
+			)
+		}
+	case jobRestartOnErrorFail:
+	default:
+		return 1, fmt.Errorf(
+			"Invalid -on-error value %q: valid options are %q and %q",
+			c.onError,
+			jobRestartOnErrorAsk,
+			jobRestartOnErrorFail,
+		)
+	}
+
+	// -all-tasks conflicts with -task.
+	if c.allTasks && len(tasks) != 0 {
+		return 1, fmt.Errorf("The -all-tasks option cannot be used with -task")
+	}
+
+	// -reschedule conflicts with -task.
+	if c.reschedule && len(tasks) != 0 {
+		return 1, fmt.Errorf("The -reschedule option cannot be used with -task")
+	}
+
+	// Dedup tasks and groups.
+	c.groups = set.From(groups)
+	c.tasks = set.From(tasks)
+
+	return 0, nil
+}
+
+// filterAllocs returns a slice of the allocations that should be restarted.
+func (c *JobRestartCommand) filterAllocs(stubs []AllocationListStubWithJob) []AllocationListStubWithJob {
+	result := []AllocationListStubWithJob{}
+	for _, stub := range stubs {
+		shortAllocID := limit(stub.ID, c.length)
+
+		// Skip allocations that are not running.
+		if !stub.IsRunning() {
+			if c.verbose {
+				c.Ui.Output(c.Colorize().Color(fmt.Sprintf(
+					"[dark_gray]    %s: Skipping allocation %q because desired status is %q and client status is %q[reset]",
+					formatTime(time.Now()),
+					shortAllocID,
+					stub.DesiredStatus,
+					stub.ClientStatus,
+				)))
+			}
+			continue
+		}
+
+		// Skip allocations for groups that were not requested.
+		if c.groups.Size() > 0 {
+			if !c.groups.Contains(stub.TaskGroup) {
+				if c.verbose {
+					c.Ui.Output(c.Colorize().Color(fmt.Sprintf(
+						"[dark_gray]    %s: Skipping allocation %q because it doesn't belong to any of the requested groups[reset]",
+						formatTime(time.Now()),
+						shortAllocID,
+					)))
+				}
+				continue
+			}
+		}
+
+		// Skip allocations that don't have any of the requested tasks.
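+		// HasTask checks the allocation task states and falls back to the job
+		// version the allocation was created from when states are not set yet.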
+		if c.tasks.Size() > 0 {
+			hasTask := false
+			for _, taskName := range c.tasks.Slice() {
+				if stub.HasTask(taskName) {
+					hasTask = true
+					break
+				}
+			}
+
+			if !hasTask {
+				if c.verbose {
+					c.Ui.Output(c.Colorize().Color(fmt.Sprintf(
+						"[dark_gray]    %s: Skipping allocation %q because it doesn't have any of the requested tasks[reset]",
+						formatTime(time.Now()),
+						shortAllocID,
+					)))
+				}
+				continue
+			}
+		}
+
+		result = append(result, stub)
+	}
+
+	return result
+}
+
+// ensureNoActiveDeployment returns an error if the job has an active
+// deployment.
+func (c *JobRestartCommand) ensureNoActiveDeployment() error {
+	deployments, _, err := c.client.Jobs().Deployments(c.jobID, true, nil)
+	if err != nil {
+		return fmt.Errorf("Error retrieving deployments for job %q: %v", c.jobID, err)
+	}
+
+	for _, d := range deployments {
+		switch d.Status {
+		case api.DeploymentStatusFailed, api.DeploymentStatusSuccessful, api.DeploymentStatusCancelled:
+			// Deployment is terminal so it's safe to proceed.
+		default:
+			return fmt.Errorf("Deployment %q is %q", limit(d.ID, c.length), d.Status)
+		}
+	}
+	return nil
+}
+
+// shouldRestartMultiregion blocks and waits for the user to confirm if the
+// restart of a multi-region job should proceed. Returns true if the answer is
+// positive.
+func (c *JobRestartCommand) shouldRestartMultiregion() bool {
+	question := fmt.Sprintf(
+		"Are you sure you want to restart multi-region job %q in a single region? [y/N]",
+		c.jobID,
+	)
+
+	return c.askQuestion(
+		question,
+		false,
+		func(answer string) (bool, error) {
+			switch strings.TrimSpace(strings.ToLower(answer)) {
+			case "", "n", "no":
+				return false, nil
+			case "y", "yes":
+				return true, nil
+			default:
+				return false, fmt.Errorf("Invalid answer %q", answer)
+			}
+		})
+}
+
+// shouldProceed blocks and waits for the user to provide valid input on how
+// to proceed. Returns true if the answer is positive.
+//
+// The flags -batch-wait and -on-error have an 'ask' option. This function
+// handles both to prevent asking the user twice in case they are both set to
+// 'ask' and an error happens.
+func (c *JobRestartCommand) shouldProceed(err error) bool {
+	var question, options string
+
+	if err == nil {
+		question = "Proceed with the next batch?"
+		options = "Y/n"
+	} else {
+		question = "Ignore the errors above and proceed with the next batch?"
+		options = "y/N" // Defaults to 'no' if an error happens.
+
+		if !c.isErrorRecoverable(err) {
+			question = `The errors above are likely to happen again.
+Ignore them anyway and proceed with the next batch?`
+		}
+	}
+
+	// If -batch-wait is 'ask' the user can provide a new wait duration.
+	if c.batchWaitAsk {
+		options += "/<duration>"
+	}
+
+	return c.askQuestion(
+		fmt.Sprintf("%s [%s]", question, options),
+		false,
+		func(answer string) (bool, error) {
+			switch strings.ToLower(answer) {
+			case "":
+				// Proceed by default only if there is no error.
+				return err == nil, nil
+			case "y", "yes":
+				return true, nil
+			case "n", "no":
+				return false, nil
+			default:
+				if c.batchWaitAsk {
+					// Check if the user passed a time duration and adjust the
+					// command to use that moving forward.
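+					// For example, an answer of "30s" or "1m" becomes the new
+					// wait time applied before each remaining batch.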
+ batchWait, err := time.ParseDuration(answer) + if err != nil { + return false, fmt.Errorf("Invalid answer %q", answer) + } + + c.batchWaitAsk = false + c.batchWait = batchWait + c.Ui.Output(c.Colorize().Color(fmt.Sprintf( + "[bold]==> %s: Proceeding restarts with new wait time of %s[reset]", + formatTime(time.Now()), + c.batchWait, + ))) + return true, nil + } else { + return false, fmt.Errorf("Invalid answer %q", answer) + } + } + }) +} + +// shouldExit blocks and waits for the user for confirmation if they would like +// to interrupt the command. Returns true if the answer is positive. +func (c *JobRestartCommand) shouldExit() bool { + question := `Restart interrupted, no more allocations will be restarted. +Are you sure you want to stop the restart process? [y/N]` + + return c.askQuestion( + question, + true, + func(answer string) (bool, error) { + switch strings.ToLower(answer) { + case "n", "no", "": + return false, nil + case "y", "yes": + return true, nil + default: + return false, fmt.Errorf("Invalid answer %q", answer) + } + }) +} + +// askQuestion asks question to user until they provide a valid response. +func (c *JobRestartCommand) askQuestion(question string, onError bool, cb func(string) (bool, error)) bool { + prefixedQuestion := fmt.Sprintf( + "[bold]==> %s: %s[reset]", + formatTime(time.Now()), + indentString(question, jobRestartTimestampPrefixLength), + ) + + // Let ui.Ask() handle interrupt signals. + signal.Stop(c.sigsCh) + defer func() { + signal.Notify(c.sigsCh, os.Interrupt) + }() + + for { + answer, err := c.Ui.Ask(c.Colorize().Color(prefixedQuestion)) + if err != nil { + if err.Error() != "interrupted" { + c.Ui.Output(err.Error()) + } + return onError + } + + exit, err := cb(strings.TrimSpace(answer)) + if err != nil { + c.Ui.Output(fmt.Sprintf("%s%s", strings.Repeat(" ", jobRestartTimestampPrefixLength), err)) + continue + } + return exit + } +} + +// handleAlloc stops or restarts an allocation in-place. Blocks until the +// allocation is done restarting or the rescheduled allocation is running. +func (c *JobRestartCommand) handleAlloc(alloc AllocationListStubWithJob) error { + var err error + if c.reschedule { + // Stopping an allocation triggers a reschedule. + err = c.stopAlloc(alloc) + } else { + err = c.restartAlloc(alloc) + } + if err != nil { + msg := fmt.Sprintf("Error restarting allocation %q:", limit(alloc.ID, c.length)) + if mErr, ok := err.(*multierror.Error); ok { + // Unwrap the errors and prefix them with a common message to + // prevent deep nesting of errors. + return multierror.Prefix(mErr, msg) + } + return fmt.Errorf("%s %w", msg, err) + } + return nil +} + +// restartAlloc restarts an allocation in place and blocks until the tasks are +// done restarting. +func (c *JobRestartCommand) restartAlloc(alloc AllocationListStubWithJob) error { + shortAllocID := limit(alloc.ID, c.length) + + if c.allTasks { + c.Ui.Output(fmt.Sprintf( + " %s: Restarting all tasks in allocation %q for group %q", + formatTime(time.Now()), + shortAllocID, + alloc.TaskGroup, + )) + + return c.client.Allocations().RestartAllTasks(&api.Allocation{ID: alloc.ID}, nil) + } + + if c.tasks.Size() == 0 { + c.Ui.Output(fmt.Sprintf( + " %s: Restarting running tasks in allocation %q for group %q", + formatTime(time.Now()), + shortAllocID, + alloc.TaskGroup, + )) + + return c.client.Allocations().Restart(&api.Allocation{ID: alloc.ID}, "", nil) + } + + // Run restarts concurrently when specific tasks were requested. 
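+	// Each requested task is restarted in its own goroutine and any failures
+	// are collected into a single multierror.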
+	var restarts multierror.Group
+	for _, task := range c.tasks.Slice() {
+		if !alloc.HasTask(task) {
+			continue
+		}
+
+		c.Ui.Output(fmt.Sprintf(
+			"    %s: Restarting task %q in allocation %q for group %q",
+			formatTime(time.Now()),
+			task,
+			shortAllocID,
+			alloc.TaskGroup,
+		))
+
+		restarts.Go(func(taskName string) func() error {
+			return func() error {
+				err := c.client.Allocations().Restart(&api.Allocation{ID: alloc.ID}, taskName, nil)
+				if err != nil {
+					return fmt.Errorf("Failed to restart task %q: %w", taskName, err)
+				}
+				return nil
+			}
+		}(task))
+	}
+	return restarts.Wait().ErrorOrNil()
+}
+
+// stopAlloc stops an allocation and blocks until the replacement allocation is
+// running.
+func (c *JobRestartCommand) stopAlloc(alloc AllocationListStubWithJob) error {
+	shortAllocID := limit(alloc.ID, c.length)
+
+	c.Ui.Output(fmt.Sprintf(
+		"    %s: Rescheduling allocation %q for group %q",
+		formatTime(time.Now()),
+		shortAllocID,
+		alloc.TaskGroup,
+	))
+
+	var q *api.QueryOptions
+	if c.noShutdownDelay {
+		q = &api.QueryOptions{
+			Params: map[string]string{"no_shutdown_delay": "true"},
+		}
+	}
+
+	// Stop the allocation and wait for its replacement to be running or for a
+	// blocked evaluation that prevents placements for this task group.
+	resp, err := c.client.Allocations().Stop(&api.Allocation{ID: alloc.ID}, q)
+	if err != nil {
+		return fmt.Errorf("Failed to stop allocation: %w", err)
+	}
+
+	// errCh receives an error if anything goes wrong or nil when the
+	// replacement allocation is running.
+	// Use a buffered channel to prevent both goroutines from blocking when
+	// trying to send a result back.
+	errCh := make(chan error, 1)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	// Pass the LastIndex from the Stop() call to only monitor data that was
+	// created after the Stop() call.
+	go c.monitorPlacementFailures(ctx, alloc, resp.LastIndex, errCh)
+	go c.monitorReplacementAlloc(ctx, alloc, errCh)
+
+	// This process may take a while, so ping the user from time to time to
+	// indicate the command is still alive.
+	ticker := time.NewTicker(time.Minute)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ticker.C:
+			c.Ui.Output(fmt.Sprintf(
+				"    %s: Still waiting for allocation %q to be replaced",
+				formatTime(time.Now()),
+				shortAllocID,
+			))
+		case err := <-errCh:
+			return err
+		}
+	}
+}
+
+// monitorPlacementFailures searches for evaluations of the allocation's job
+// that have placement failures.
+//
+// Returns an error in errCh if anything goes wrong or if there are placement
+// failures for the allocation's task group.
+func (c *JobRestartCommand) monitorPlacementFailures(
+	ctx context.Context,
+	alloc AllocationListStubWithJob,
+	index uint64,
+	errCh chan<- error,
+) {
+	q := &api.QueryOptions{WaitIndex: index}
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+
+		evals, qm, err := c.client.Jobs().Evaluations(alloc.JobID, q)
+		if err != nil {
+			errCh <- fmt.Errorf("Failed to retrieve evaluations for job %q: %w", alloc.JobID, err)
+			return
+		}
+
+		for _, eval := range evals {
+			select {
+			case <-ctx.Done():
+				return
+			default:
+			}
+
+			// Skip evaluations created before the allocation was stopped or
+			// that are not blocked.
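+			// A blocked evaluation means the scheduler could not place the
+			// replacement allocation, so its FailedTGAllocs entry for this
+			// group carries the placement failure details.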
+			if eval.CreateIndex < index || eval.Status != api.EvalStatusBlocked {
+				continue
+			}
+
+			failures := eval.FailedTGAllocs[alloc.TaskGroup]
+			if failures != nil {
+				errCh <- ErrJobRestartPlacementFailure{
+					EvalID:    limit(eval.ID, c.length),
+					TaskGroup: alloc.TaskGroup,
+					Failures:  failures,
+				}
+				return
+			}
+		}
+		q.WaitIndex = qm.LastIndex
+	}
+}
+
+// monitorReplacementAlloc waits for the allocation to have a follow-up
+// placement and for the new allocation to be running.
+//
+// Returns an error in errCh if anything goes wrong or nil when the new
+// allocation is running.
+func (c *JobRestartCommand) monitorReplacementAlloc(
+	ctx context.Context,
+	allocStub AllocationListStubWithJob,
+	errCh chan<- error,
+) {
+	currentAllocID := allocStub.ID
+	q := &api.QueryOptions{WaitIndex: 1}
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+		}
+
+		alloc, qm, err := c.client.Allocations().Info(currentAllocID, q)
+		if err != nil {
+			errCh <- fmt.Errorf("Failed to retrieve allocation %q: %w", limit(currentAllocID, c.length), err)
+			return
+		}
+
+		// Follow replacement allocations. We expect the original allocation to
+		// be replaced, but the replacements may themselves be replaced in
+		// cases such as the allocation failing.
+		if alloc.NextAllocation != "" {
+			c.Ui.Output(fmt.Sprintf(
+				"    %s: Allocation %q replaced by %[3]q, waiting for %[3]q to start running",
+				formatTime(time.Now()),
+				limit(alloc.ID, c.length),
+				limit(alloc.NextAllocation, c.length),
+			))
+			currentAllocID = alloc.NextAllocation
+
+			// Reset the blocking query so the Info() API call returns the new
+			// allocation immediately.
+			q.WaitIndex = 1
+			continue
+		}
+
+		switch alloc.ClientStatus {
+		case api.AllocClientStatusRunning:
+			// Make sure the running allocation we found is a replacement, not
+			// the original one.
+			if alloc.ID != allocStub.ID {
+				c.Ui.Output(fmt.Sprintf(
+					"    %s: Allocation %q is %q",
+					formatTime(time.Now()),
+					limit(alloc.ID, c.length),
+					alloc.ClientStatus,
+				))
+				errCh <- nil
+				return
+			}
+
+		default:
+			if c.verbose {
+				c.Ui.Output(c.Colorize().Color(fmt.Sprintf(
+					"[dark_gray]    %s: Allocation %q is %q[reset]",
+					formatTime(time.Now()),
+					limit(alloc.ID, c.length),
+					alloc.ClientStatus,
+				)))
+			}
+		}
+
+		q.WaitIndex = qm.LastIndex
+	}
+}
+
+// handleSignal receives input signals and blocks the activeCh until the user
+// confirms how to proceed.
+//
+// Exit immediately if the user confirms the interrupt, otherwise resume the
+// command and feed activeCh to unblock it.
+func (c *JobRestartCommand) handleSignal(sigsCh chan os.Signal, activeCh chan any) {
+	for {
+		select {
+		case <-sigsCh:
+			// Consume activeCh to prevent the main loop from proceeding.
+			select {
+			case <-activeCh:
+			default:
+			}
+
+			if c.shouldExit() {
+				c.Ui.Output("\nCanceling job restart process")
+				os.Exit(0)
+			}
+		case activeCh <- struct{}{}:
+		}
+	}
+}
+
+// isErrorRecoverable returns false when the error is likely to affect all
+// remaining restarts, in which case there is no reason to keep going.
+func (c *JobRestartCommand) isErrorRecoverable(err error) bool {
+	if err == nil {
+		return true
+	}
+
+	if errors.Is(err, ErrJobRestartPlacementFailure{}) {
+		return false
+	}
+
+	if strings.Contains(err.Error(), api.PermissionDeniedErrorContent) {
+		return false
+	}
+
+	return true
+}
+
+// errorFormat returns a multierror.ErrorFormatFunc that indents each line,
+// except for the first one, of the resulting error string with the given
+// number of spaces.
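+//
+// For example, with an indent of 0, two errors render as:
+//
+//	2 errors occurred while restarting job:
+//	* first error message
+//	* second error message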
+func (c *JobRestartCommand) errorFormat(indent int) func([]error) string { + return func(es []error) string { + points := make([]string, len(es)) + for i, err := range es { + points[i] = fmt.Sprintf("* %s", strings.TrimSpace(err.Error())) + } + + out := fmt.Sprintf( + "%s occurred while restarting job:\n%s", + english.Plural(len(es), "error", "errors"), + strings.Join(points, "\n"), + ) + return indentString(out, indent) + } +} + +// AllocationListStubWithJob combines an AllocationListStub with its +// corresponding job at the right version. +type AllocationListStubWithJob struct { + *api.AllocationListStub + Job *api.Job +} + +// HasTask returns true if the allocation has the given task in the specific +// job version it was created. +func (a *AllocationListStubWithJob) HasTask(name string) bool { + // Check task state first as it's the fastest and most reliable source. + if _, ok := a.TaskStates[name]; ok { + return true + } + + // But task states are only set when the client updates its allocations + // with the server, so they may not be available yet. Lookup the task in + // the job version as a fallback. + if a.Job == nil { + return false + } + + var taskGroup *api.TaskGroup + for _, tg := range a.Job.TaskGroups { + if tg.Name == nil || *tg.Name != a.TaskGroup { + continue + } + taskGroup = tg + } + if taskGroup == nil { + return false + } + + for _, task := range taskGroup.Tasks { + if task.Name == name { + return true + } + } + + return false +} + +// IsRunning returns true if the allocation's ClientStatus or DesiredStatus is +// running. +func (a *AllocationListStubWithJob) IsRunning() bool { + return a.ClientStatus == api.AllocClientStatusRunning || + a.DesiredStatus == api.AllocDesiredStatusRun +} diff --git a/command/job_restart_test.go b/command/job_restart_test.go new file mode 100644 index 00000000000..d33bf879aca --- /dev/null +++ b/command/job_restart_test.go @@ -0,0 +1,1591 @@ +package command + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "net/http/httputil" + neturl "net/url" + "regexp" + "sort" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/hashicorp/go-set" + "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/command/agent" + "github.com/hashicorp/nomad/helper/pointer" + "github.com/hashicorp/nomad/testutil" + "github.com/mitchellh/cli" + + "github.com/shoenig/test/must" + "github.com/shoenig/test/wait" +) + +func TestJobRestartCommand_Implements(t *testing.T) { + ci.Parallel(t) + var _ cli.Command = &JobRestartCommand{} +} + +func TestJobRestartCommand_parseAndValidate(t *testing.T) { + ci.Parallel(t) + + testCases := []struct { + name string + args []string + expectedErr string + expectedCmd *JobRestartCommand + }{ + { + name: "missing job", + args: []string{}, + expectedErr: "This command takes one argument", + }, + { + name: "too many args", + args: []string{"one", "two", "three"}, + expectedErr: "This command takes one argument", + }, + { + name: "tasks and groups", + args: []string{ + "-task", "my-task-1", "-task", "my-task-2", + "-group", "my-group-1", "-group", "my-group-2", + "my-job", + }, + expectedCmd: &JobRestartCommand{ + jobID: "my-job", + groups: set.From([]string{"my-group-1", "my-group-2"}), + tasks: set.From([]string{"my-task-1", "my-task-2"}), + batchSize: 1, + }, + }, + { + name: "all tasks", + args: []string{"-all-tasks", "my-job"}, + expectedCmd: &JobRestartCommand{ + jobID: "my-job", + allTasks: true, + batchSize: 1, + 
}, + }, + { + name: "all tasks conflicts with task", + args: []string{"-all-tasks", "-task", "my-task", "-yes", "my-job"}, + expectedErr: "The -all-tasks option cannot be used with -task", + }, + { + name: "batch size as number", + args: []string{"-batch-size", "10", "my-job"}, + expectedCmd: &JobRestartCommand{ + jobID: "my-job", + batchSize: 10, + }, + }, + { + name: "batch size as percentage", + args: []string{"-batch-size", "10%", "my-job"}, + expectedCmd: &JobRestartCommand{ + jobID: "my-job", + batchSize: 10, + batchSizePercent: true, + }, + }, + { + name: "batch size not valid", + args: []string{"-batch-size", "not-valid", "my-job"}, + expectedErr: "Invalid -batch-size value", + }, + { + name: "batch size decimal not valid", + args: []string{"-batch-size", "1.5", "my-job"}, + expectedErr: "Invalid -batch-size value", + }, + { + name: "batch size zero", + args: []string{"-batch-size", "0", "my-job"}, + expectedErr: "Invalid -batch-size value", + }, + { + name: "batch size decimal percent not valid", + args: []string{"-batch-size", "1.5%", "my-job"}, + expectedErr: "Invalid -batch-size value", + }, + { + name: "batch size zero percentage", + args: []string{"-batch-size", "0%", "my-job"}, + expectedErr: "Invalid -batch-size value", + }, + { + name: "batch size with multiple numbers and percentages", + args: []string{"-batch-size", "15%10%", "my-job"}, + expectedErr: "Invalid -batch-size value", + }, + { + name: "batch wait ask", + args: []string{"-batch-wait", "ask", "my-job"}, + expectedErr: "terminal is not interactive", // Can't test non-interactive. + }, + { + name: "batch wait duration", + args: []string{"-batch-wait", "10s", "my-job"}, + expectedCmd: &JobRestartCommand{ + jobID: "my-job", + batchSize: 1, + batchWait: 10 * time.Second, + }, + }, + { + name: "batch wait invalid", + args: []string{"-batch-wait", "10", "my-job"}, + expectedErr: "Invalid -batch-wait value", + }, + { + name: "on error fail", + args: []string{"-on-error", "fail", "my-job"}, + expectedCmd: &JobRestartCommand{ + jobID: "my-job", + batchSize: 1, + onError: jobRestartOnErrorFail, + }, + }, + { + name: "on error invalid", + args: []string{"-on-error", "invalid", "my-job"}, + expectedErr: "Invalid -on-error value", + }, + { + name: "no shutdown delay", + args: []string{"-no-shutdown-delay", "my-job"}, + expectedCmd: &JobRestartCommand{ + jobID: "my-job", + batchSize: 1, + noShutdownDelay: true, + }, + }, + { + name: "reschedule", + args: []string{"-reschedule", "my-job"}, + expectedCmd: &JobRestartCommand{ + jobID: "my-job", + batchSize: 1, + reschedule: true, + }, + }, + { + name: "reschedule conflicts with task", + args: []string{"-reschedule", "-task", "my-task", "-yes", "my-job"}, + expectedErr: "The -reschedule option cannot be used with -task", + }, + { + name: "verbose", + args: []string{"-verbose", "my-job"}, + expectedCmd: &JobRestartCommand{ + jobID: "my-job", + batchSize: 1, + verbose: true, + length: fullId, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ui := &cli.ConcurrentUi{Ui: cli.NewMockUi()} + meta := Meta{Ui: ui} + + // Set some default values if not defined in test case. 
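+			// When a test case does not set -on-error, the default 'ask'
+			// requires an interactive terminal, so -yes is prepended to keep
+			// the case non-interactive.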
+ if tc.expectedCmd != nil { + tc.expectedCmd.Meta = meta + + if tc.expectedCmd.length == 0 { + tc.expectedCmd.length = shortId + } + if tc.expectedCmd.groups == nil { + tc.expectedCmd.groups = set.New[string](0) + } + if tc.expectedCmd.tasks == nil { + tc.expectedCmd.tasks = set.New[string](0) + } + if tc.expectedCmd.onError == "" { + tc.expectedCmd.onError = jobRestartOnErrorAsk + tc.expectedCmd.autoYes = true + tc.args = append([]string{"-yes"}, tc.args...) + } + } + + cmd := &JobRestartCommand{Meta: meta} + code, err := cmd.parseAndValidate(tc.args) + + if tc.expectedErr != "" { + must.NonZero(t, code) + must.ErrorContains(t, err, tc.expectedErr) + } else { + must.NoError(t, err) + must.Zero(t, code) + must.Eq(t, tc.expectedCmd, cmd, must.Cmp(cmpopts.IgnoreFields(JobRestartCommand{}, "Meta", "Meta.Ui"))) + } + }) + } +} + +func TestJobRestartCommand_Run(t *testing.T) { + ci.Parallel(t) + + // Create a job with multiple tasks, groups, and allocations. + prestartTask := api.NewTask("prestart", "mock_driver"). + SetConfig("run_for", "100ms"). + SetConfig("exit_code", 0). + SetLifecycle(&api.TaskLifecycle{ + Hook: api.TaskLifecycleHookPrestart, + Sidecar: false, + }) + sidecarTask := api.NewTask("sidecar", "mock_driver"). + SetConfig("run_for", "1m"). + SetConfig("exit_code", 0). + SetLifecycle(&api.TaskLifecycle{ + Hook: api.TaskLifecycleHookPoststart, + Sidecar: true, + }) + mainTask := api.NewTask("main", "mock_driver"). + SetConfig("run_for", "1m"). + SetConfig("exit_code", 0) + + jobID := "test_job_restart_cmd" + job := api.NewServiceJob(jobID, jobID, "global", 1). + AddDatacenter("dc1"). + AddTaskGroup( + api.NewTaskGroup("single_task", 3). + AddTask(mainTask), + ). + AddTaskGroup( + api.NewTaskGroup("multiple_tasks", 2). + AddTask(prestartTask). + AddTask(sidecarTask). + AddTask(mainTask), + ) + + testCases := []struct { + name string + args []string // Job arg is added automatically. + expectedCode int + validateFn func(*testing.T, *api.Client, []*api.AllocationListStub, string, string) + }{ + { + name: "restart only running tasks in all groups by default", + args: []string{"-batch-size", "100%"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ + "single_task": { + "main": true, + }, + "multiple_tasks": { + "prestart": false, + "sidecar": true, + "main": true, + }, + }) + + // Check that allocations restarted in a single batch. + batches := getRestartBatches(restarted, []string{"single_task", "multiple_tasks"}, "main") + must.Len(t, 5, batches[0]) + must.StrContains(t, stdout, "Restarting 1st batch") + must.StrNotContains(t, stdout, "restarting the next batch") + + }, + }, + { + name: "restart specific task in all groups", + args: []string{"-batch-size", "100%", "-task", "main"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ + "single_task": { + "main": true, + }, + "multiple_tasks": { + "prestart": false, + "sidecar": false, + "main": true, + }, + }) + + // Check that allocations restarted in a single batch. 
+ batches := getRestartBatches(restarted, []string{"single_task", "multiple_tasks"}, "main") + must.Len(t, 5, batches[0]) + must.StrContains(t, stdout, "Restarting 1st batch") + must.StrNotContains(t, stdout, "restarting the next batch") + }, + }, + { + name: "restart multiple tasks in all groups", + args: []string{"-batch-size", "100%", "-task", "main", "-task", "sidecar"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ + "single_task": { + "main": true, + }, + "multiple_tasks": { + "prestart": false, + "sidecar": true, + "main": true, + }, + }) + + // Check that allocations restarted in a single batch. + batches := getRestartBatches(restarted, []string{"single_task", "multiple_tasks"}, "main") + must.Len(t, 5, batches[0]) + must.StrContains(t, stdout, "Restarting 1st batch") + must.StrNotContains(t, stdout, "restarting the next batch") + }, + }, + { + name: "restart all tasks in all groups", + args: []string{"-batch-size", "100%", "-all-tasks"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ + "single_task": { + "main": true, + }, + "multiple_tasks": { + "prestart": true, + "sidecar": true, + "main": true, + }, + }) + + // Check that allocations restarted in a single batch. + batches := getRestartBatches(restarted, []string{"single_task", "multiple_tasks"}, "main") + must.Len(t, 5, batches[0]) + must.StrContains(t, stdout, "Restarting 1st batch") + must.StrNotContains(t, stdout, "restarting the next batch") + }, + }, + { + name: "restart running tasks in specific group", + args: []string{"-batch-size", "100%", "-group", "single_task"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ + "single_task": { + "main": true, + }, + "multiple_tasks": { + "prestart": false, + "sidecar": false, + "main": false, + }, + }) + + // Check that allocations restarted in a single batch. + batches := getRestartBatches(restarted, []string{"single_task"}, "main") + must.Len(t, 3, batches[0]) + must.StrContains(t, stdout, "Restarting 1st batch") + must.StrNotContains(t, stdout, "restarting the next batch") + + }, + }, + { + name: "restart specific task that is not running", + args: []string{"-batch-size", "100%", "-task", "prestart"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ + "single_task": { + "main": false, + }, + "multiple_tasks": { + "prestart": false, + "sidecar": false, + "main": false, + }, + }) + + // Check that allocations restarted in a single batch. + batches := getRestartBatches(restarted, []string{"single_task"}, "main") + must.Len(t, 3, batches[0]) + must.StrContains(t, stdout, "Restarting 1st batch") + must.StrNotContains(t, stdout, "restarting the next batch") + + // Check that we have an error message. 
+ must.StrContains(t, stderr, "Task not running") + }, + expectedCode: 1, + }, + { + name: "restart specific task in specific group", + args: []string{"-batch-size", "100%", "-task", "main", "-group", "single_task"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ + "single_task": { + "main": true, + }, + "multiple_tasks": { + "prestart": false, + "sidecar": false, + "main": false, + }, + }) + + // Check that allocations restarted in a single batch. + batches := getRestartBatches(restarted, []string{"single_task"}, "main") + must.Len(t, 3, batches[0]) + must.StrContains(t, stdout, "Restarting 1st batch") + must.StrNotContains(t, stdout, "restarting the next batch") + }, + }, + { + name: "restart multiple tasks in specific group", + args: []string{"-batch-size", "100%", "-task", "main", "-task", "sidecar", "-group", "multiple_tasks"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ + "single_task": { + "main": false, + }, + "multiple_tasks": { + "prestart": false, + "sidecar": true, + "main": true, + }, + }) + + // Check that allocations restarted in a single batch. + batches := getRestartBatches(restarted, []string{"multiple_tasks"}, "main") + must.Len(t, 2, batches[0]) + must.StrContains(t, stdout, "Restarting 1st batch") + must.StrNotContains(t, stdout, "restarting the next batch") + }, + }, + { + name: "restart all tasks in specific group", + args: []string{"-batch-size", "100%", "-all-tasks", "-group", "multiple_tasks"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ + "single_task": { + "main": false, + }, + "multiple_tasks": { + "prestart": true, + "sidecar": true, + "main": true, + }, + }) + + // Check that allocations restarted in a single batch. + batches := getRestartBatches(restarted, []string{"multiple_tasks"}, "main") + must.Len(t, 2, batches[0]) + must.StrContains(t, stdout, "Restarting 1st batch") + must.StrNotContains(t, stdout, "restarting the next batch") + }, + }, + { + name: "restart in batches", + args: []string{"-batch-size", "3", "-batch-wait", "3s", "-task", "main"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ + "single_task": { + "main": true, + }, + "multiple_tasks": { + "prestart": false, + "sidecar": false, + "main": true, + }, + }) + + // Check that allocations were properly batched. + batches := getRestartBatches(restarted, []string{"multiple_tasks", "single_task"}, "main") + + must.Len(t, 3, batches[0]) + must.StrContains(t, stdout, "Restarting 1st batch of 3 allocations") + + must.Len(t, 2, batches[1]) + must.StrContains(t, stdout, "Restarting 2nd batch of 2 allocations") + + // Check that we only waited between batches. + waitMsgCount := strings.Count(stdout, "Waiting 3s before restarting the next batch") + must.Eq(t, 1, waitMsgCount) + + // Check that batches waited the expected time. 
+ batch1Restart := batches[0][0].TaskStates["main"].LastRestart + batch2Restart := batches[1][0].TaskStates["main"].LastRestart + diff := batch2Restart.Sub(batch1Restart) + must.Between(t, 3*time.Second, diff, 4*time.Second) + }, + }, + { + name: "restart in percent batch", + args: []string{"-batch-size", "50%", "-batch-wait", "3s", "-task", "main"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ + "single_task": { + "main": true, + }, + "multiple_tasks": { + "prestart": false, + "sidecar": false, + "main": true, + }, + }) + + // Check that allocations were properly batched. + batches := getRestartBatches(restarted, []string{"multiple_tasks", "single_task"}, "main") + + must.Len(t, 3, batches[0]) + must.StrContains(t, stdout, "Restarting 1st batch of 3 allocations") + + must.Len(t, 2, batches[1]) + must.StrContains(t, stdout, "Restarting 2nd batch of 2 allocations") + + // Check that we only waited between batches. + waitMsgCount := strings.Count(stdout, "Waiting 3s before restarting the next batch") + must.Eq(t, 1, waitMsgCount) + + // Check that batches waited the expected time. + batch1Restart := batches[0][0].TaskStates["main"].LastRestart + batch2Restart := batches[1][0].TaskStates["main"].LastRestart + diff := batch2Restart.Sub(batch1Restart) + must.Between(t, 3*time.Second, diff, 4*time.Second) + }, + }, + { + name: "restart in batch ask with yes", + args: []string{"-batch-size", "100%", "-batch-wait", "ask", "-yes", "-group", "single_task"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + restarted := waitTasksRestarted(t, client, allocs, map[string]map[string]bool{ + "single_task": { + "main": true, + }, + "multiple_tasks": { + "prestart": false, + "sidecar": false, + "main": false, + }, + }) + + // Check that allocations restarted in a single batch. + batches := getRestartBatches(restarted, []string{"single_task"}, "main") + must.Len(t, 3, batches[0]) + must.StrContains(t, stdout, "Restarting 1st batch") + must.StrNotContains(t, stdout, "restarting the next batch") + }, + }, + { + name: "reschedule in batches", + args: []string{"-reschedule", "-batch-size", "3"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + // Expect all allocations were rescheduled. + reschedules := map[string]bool{} + for _, alloc := range allocs { + reschedules[alloc.ID] = true + } + waitAllocsRescheduled(t, client, reschedules) + + // Check that allocations were properly batched. + must.StrContains(t, stdout, "Restarting 1st batch of 3 allocations") + must.StrContains(t, stdout, "Restarting 2nd batch of 2 allocations") + must.StrNotContains(t, stdout, "Waiting") + }, + }, + { + name: "reschedule specific group", + args: []string{"-reschedule", "-batch-size", "100%", "-group", "single_task"}, + validateFn: func(t *testing.T, client *api.Client, allocs []*api.AllocationListStub, stdout string, stderr string) { + // Expect that only allocs for the single_task group were + // rescheduled. + reschedules := map[string]bool{} + for _, alloc := range allocs { + if alloc.TaskGroup == "single_task" { + reschedules[alloc.ID] = true + } + } + waitAllocsRescheduled(t, client, reschedules) + + // Check that allocations restarted in a single batch. 
+ must.StrContains(t, stdout, "Restarting 1st batch") + must.StrNotContains(t, stdout, "restarting the next batch") + }, + }, + } + + for _, tc := range testCases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + // Run each test case in parallel because they are fairly slow. + ci.Parallel(t) + + // Initialize UI and command. + ui := cli.NewMockUi() + cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} + + // Start client and server and wait for node to be ready. + // User separate cluster for each test case so they can run in + // parallel without affecting each other. + srv, client, url := testServer(t, true, nil) + defer srv.Shutdown() + + waitForNodes(t, client) + + // Register test job and wait for its allocs to be running. + resp, _, err := client.Jobs().Register(job, nil) + must.NoError(t, err) + + code := waitForSuccess(ui, client, fullId, t, resp.EvalID) + must.Zero(t, code) + + allocStubs, _, err := client.Jobs().Allocations(jobID, true, nil) + must.NoError(t, err) + for _, alloc := range allocStubs { + waitForAllocRunning(t, client, alloc.ID) + } + + // Fetch allocations before the restart so we know which ones are + // supposed to be affected in case the test reschedules allocs. + allocStubs, _, err = client.Jobs().Allocations(jobID, true, nil) + must.NoError(t, err) + + // Prepend server URL and append job ID to the test case command. + args := []string{"-address", url, "-yes"} + args = append(args, tc.args...) + args = append(args, jobID) + + // Run job restart command. + code = cmd.Run(args) + must.Eq(t, code, tc.expectedCode) + + // Run test case validation function. + if tc.validateFn != nil { + tc.validateFn(t, client, allocStubs, ui.OutputWriter.String(), ui.ErrorWriter.String()) + } + }) + } +} + +func TestJobRestartCommand_jobPrefixAndNamespace(t *testing.T) { + ci.Parallel(t) + + ui := cli.NewMockUi() + + // Start client and server and wait for node to be ready. + srv, client, url := testServer(t, true, nil) + defer srv.Shutdown() + + waitForNodes(t, client) + + // Create non-default namespace. + _, err := client.Namespaces().Register(&api.Namespace{Name: "prod"}, nil) + must.NoError(t, err) + + // Register job with same name in both namespaces. + evalIDs := []string{} + + jobDefault := testJob("test_job_restart") + resp, _, err := client.Jobs().Register(jobDefault, nil) + must.NoError(t, err) + evalIDs = append(evalIDs, resp.EvalID) + + jobProd := testJob("test_job_restart") + jobProd.Namespace = pointer.Of("prod") + resp, _, err = client.Jobs().Register(jobProd, nil) + must.NoError(t, err) + evalIDs = append(evalIDs, resp.EvalID) + + jobUniqueProd := testJob("test_job_restart_prod_ns") + jobUniqueProd.Namespace = pointer.Of("prod") + resp, _, err = client.Jobs().Register(jobUniqueProd, nil) + must.NoError(t, err) + evalIDs = append(evalIDs, resp.EvalID) + + // Wait for evals to be processed. 
+ for _, evalID := range evalIDs { + code := waitForSuccess(ui, client, fullId, t, evalID) + must.Eq(t, 0, code) + } + ui.OutputWriter.Reset() + + testCases := []struct { + name string + args []string + expectedErr string + }{ + { + name: "prefix match in default namespace", + args: []string{"test_job"}, + }, + { + name: "invalid job", + args: []string{"not-valid"}, + expectedErr: "No job(s) with prefix or ID", + }, + { + name: "prefix matches multiple jobs", + args: []string{"-namespace", "prod", "test_job"}, + expectedErr: "matched multiple jobs", + }, + { + name: "prefix matches multiple jobs across namespaces", + args: []string{"-namespace", "*", "test_job"}, + expectedErr: "matched multiple jobs", + }, + { + name: "unique prefix match across namespaces", + args: []string{"-namespace", "*", "test_job_restart_prod"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + defer func() { + ui.OutputWriter.Reset() + ui.ErrorWriter.Reset() + }() + + cmd := &JobRestartCommand{ + Meta: Meta{Ui: &cli.ConcurrentUi{Ui: ui}}, + } + args := append([]string{"-address", url, "-yes"}, tc.args...) + code := cmd.Run(args) + + if tc.expectedErr != "" { + must.NonZero(t, code) + must.StrContains(t, ui.ErrorWriter.String(), tc.expectedErr) + } else { + must.Zero(t, code) + } + }) + } +} + +func TestJobRestartCommand_noAllocs(t *testing.T) { + ci.Parallel(t) + + ui := cli.NewMockUi() + cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} + + // Start client and server and wait for node to be ready. + srv, client, url := testServer(t, true, nil) + defer srv.Shutdown() + + waitForNodes(t, client) + + // Register test job with impossible constraint so it doesn't get allocs. + jobID := "test_job_restart_no_allocs" + job := testJob(jobID) + job.Datacenters = []string{"invalid"} + + resp, _, err := client.Jobs().Register(job, nil) + must.NoError(t, err) + + code := waitForSuccess(ui, client, fullId, t, resp.EvalID) + must.Eq(t, 2, code) // Placement is expected to fail so exit code is not 0. + ui.OutputWriter.Reset() + + // Run job restart command and expect it to exit without restarts. + code = cmd.Run([]string{ + "-address", url, + "-yes", + jobID, + }) + must.Zero(t, code) + must.StrContains(t, ui.OutputWriter.String(), "No allocations to restart") +} + +func TestJobRestartCommand_rescheduleFail(t *testing.T) { + ci.Parallel(t) + + ui := cli.NewMockUi() + cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} + + // Start client and server and wait for node to be ready. + srv, client, url := testServer(t, true, nil) + defer srv.Shutdown() + + waitForNodes(t, client) + + // Register test job with 3 allocs. + jobID := "test_job_restart_reschedule_fail" + job := testJob(jobID) + job.TaskGroups[0].Count = pointer.Of(3) + + resp, _, err := client.Jobs().Register(job, nil) + must.NoError(t, err) + + code := waitForSuccess(ui, client, fullId, t, resp.EvalID) + must.Zero(t, code) + ui.OutputWriter.Reset() + + // Wait for allocs to be running. + allocs, _, err := client.Jobs().Allocations(jobID, true, nil) + must.NoError(t, err) + for _, alloc := range allocs { + waitForAllocRunning(t, client, alloc.ID) + } + + // Mark node as ineligible to prevent allocs from being replaced. + nodeID := srv.Agent.Client().NodeID() + client.Nodes().ToggleEligibility(nodeID, false, nil) + + // Run job restart command and expect it to fail. 
+ code = cmd.Run([]string{ + "-address", url, + "-batch-size", "2", + "-reschedule", + "-yes", + jobID, + }) + must.One(t, code) + must.StrContains(t, ui.ErrorWriter.String(), "No nodes were eligible for evaluation") +} + +func TestJobRestartCommand_monitorReplacementAlloc(t *testing.T) { + ci.Parallel(t) + + ui := cli.NewMockUi() + cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} + + srv, client, _ := testServer(t, true, nil) + defer srv.Shutdown() + waitForNodes(t, client) + + // Register test job and update it twice so we end up with three + // allocations, one replacing the next one. + jobID := "test_job_restart_monitor_replacement" + job := testJob(jobID) + + for i := 1; i <= 3; i++ { + job.TaskGroups[0].Tasks[0].Config["run_for"] = fmt.Sprintf("%ds", i) + resp, _, err := client.Jobs().Register(job, nil) + must.NoError(t, err) + + code := waitForSuccess(ui, client, fullId, t, resp.EvalID) + must.Zero(t, code) + } + ui.OutputWriter.Reset() + + // Prepare the command internals. We want to run a specific function and + // target a specific allocation, so we can't run the full command. + cmd.client = client + cmd.verbose = true + cmd.length = fullId + + // Fetch, sort, and monitor the oldest allocation. + allocs, _, err := client.Jobs().Allocations(jobID, true, nil) + must.NoError(t, err) + sort.Slice(allocs, func(i, j int) bool { + return allocs[i].CreateIndex < allocs[j].CreateIndex + }) + + errCh := make(chan error) + go cmd.monitorReplacementAlloc(context.Background(), AllocationListStubWithJob{ + AllocationListStub: allocs[0], + Job: job, + }, errCh) + + // Make sure the command doesn't get stuck and that we traverse the + // follow-up allocations properly. + must.Wait(t, wait.InitialSuccess( + wait.ErrorFunc(func() error { + select { + case err := <-errCh: + return err + default: + return fmt.Errorf("waiting for response") + } + }), + wait.Timeout(time.Duration(testutil.TestMultiplier()*3)*time.Second), + )) + must.StrContains(t, ui.OutputWriter.String(), fmt.Sprintf("%q replaced by %q", allocs[0].ID, allocs[1].ID)) + must.StrContains(t, ui.OutputWriter.String(), fmt.Sprintf("%q replaced by %q", allocs[1].ID, allocs[2].ID)) + must.StrContains(t, ui.OutputWriter.String(), fmt.Sprintf("%q is %q", allocs[2].ID, api.AllocClientStatusRunning)) +} + +func TestJobRestartCommand_activeDeployment(t *testing.T) { + ci.Parallel(t) + + srv, client, url := testServer(t, true, nil) + defer srv.Shutdown() + waitForNodes(t, client) + + // Register test job and update it once to trigger a deployment. + jobID := "test_job_restart_deployment" + job := testJob(jobID) + job.Type = pointer.Of(api.JobTypeService) + job.Update = &api.UpdateStrategy{ + Canary: pointer.Of(1), + AutoPromote: pointer.Of(false), + } + + _, _, err := client.Jobs().Register(job, nil) + must.NoError(t, err) + + _, _, err = client.Jobs().Register(job, nil) + must.NoError(t, err) + + // Wait for a deployment to be running. + must.Wait(t, wait.InitialSuccess( + wait.ErrorFunc(func() error { + deployments, _, err := client.Jobs().Deployments(jobID, true, nil) + if err != nil { + return err + } + for _, d := range deployments { + if d.Status == api.DeploymentStatusRunning { + return nil + } + } + return fmt.Errorf("no running deployments") + }), + wait.Timeout(time.Duration(testutil.TestMultiplier()*3)*time.Second), + )) + + // Run job restart command and expect it to fail. 
+ ui := cli.NewMockUi() + cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} + + code := cmd.Run([]string{ + "-address", url, + "-on-error", jobRestartOnErrorFail, + "-verbose", + jobID, + }) + must.One(t, code) + must.RegexMatch(t, regexp.MustCompile(`Deployment .+ is "running"`), ui.ErrorWriter.String()) +} + +func TestJobRestartCommand_ACL(t *testing.T) { + ci.Parallel(t) + + // Start server with ACL enabled. + srv, client, url := testServer(t, true, func(c *agent.Config) { + c.ACL.Enabled = true + }) + defer srv.Shutdown() + + rootTokenOpts := &api.WriteOptions{ + AuthToken: srv.RootToken.SecretID, + } + + // Register test job. + jobID := "test_job_restart_acl" + job := testJob(jobID) + _, _, err := client.Jobs().Register(job, rootTokenOpts) + must.NoError(t, err) + + // Wait for allocs to be running. + waitForJobAllocsStatus(t, client, jobID, api.AllocClientStatusRunning, srv.RootToken.SecretID) + + testCases := []struct { + name string + jobPrefix bool + aclPolicy string + expectedErr string + }{ + { + name: "no token", + aclPolicy: "", + expectedErr: api.PermissionDeniedErrorContent, + }, + { + name: "alloc-lifecycle not enough", + aclPolicy: ` +namespace "default" { + capabilities = ["alloc-lifecycle"] +} +`, + expectedErr: api.PermissionDeniedErrorContent, + }, + { + name: "read-job not enough", + aclPolicy: ` +namespace "default" { + capabilities = ["read-job"] +} +`, + expectedErr: api.PermissionDeniedErrorContent, + }, + { + name: "alloc-lifecycle and read-job allowed", + aclPolicy: ` +namespace "default" { + capabilities = ["alloc-lifecycle", "read-job"] +} +`, + }, + { + name: "job prefix requires list-jobs", + aclPolicy: ` +namespace "default" { + capabilities = ["alloc-lifecycle", "read-job"] +} +`, + jobPrefix: true, + expectedErr: "job not found", + }, + { + name: "job prefix works with list-jobs", + aclPolicy: ` +namespace "default" { + capabilities = ["list-jobs", "alloc-lifecycle", "read-job"] +} +`, + jobPrefix: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ui := cli.NewMockUi() + cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} + args := []string{ + "-address", url, + "-yes", + } + + if tc.aclPolicy != "" { + // Create ACL token with test case policy. + policy := &api.ACLPolicy{ + Name: nonAlphaNum.ReplaceAllString(tc.name, "-"), + Rules: tc.aclPolicy, + } + _, err := client.ACLPolicies().Upsert(policy, rootTokenOpts) + must.NoError(t, err) + + token := &api.ACLToken{ + Type: "client", + Policies: []string{policy.Name}, + } + token, _, err = client.ACLTokens().Create(token, rootTokenOpts) + must.NoError(t, err) + + // Set token in command args. + args = append(args, "-token", token.SecretID) + } + + // Add job ID or job ID prefix to the command. + if tc.jobPrefix { + args = append(args, jobID[0:3]) + } else { + args = append(args, jobID) + } + + // Run command. + code := cmd.Run(args) + if tc.expectedErr == "" { + must.Zero(t, code) + } else { + must.One(t, code) + must.StrContains(t, ui.ErrorWriter.String(), tc.expectedErr) + } + }) + } +} + +// TODO(luiz): update once alloc restart supports -no-shutdown-delay. +func TestJobRestartCommand_shutdownDelay_reschedule(t *testing.T) { + ci.Parallel(t) + + // Start client and server and wait for node to be ready. 
+ srv, client, url := testServer(t, true, nil) + defer srv.Shutdown() + + waitForNodes(t, client) + + testCases := []struct { + name string + args []string + shutdownDelay bool + }{ + { + name: "job reschedule with shutdown delay by default", + args: []string{"-reschedule"}, + shutdownDelay: true, + }, + { + name: "job reschedule no shutdown delay", + args: []string{"-reschedule", "-no-shutdown-delay"}, + shutdownDelay: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ui := cli.NewMockUi() + cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} + + // Register job with 2 allocations and shutdown_delay. + shutdownDelay := 3 * time.Second + jobID := nonAlphaNum.ReplaceAllString(tc.name, "-") + + job := testJob(jobID) + job.TaskGroups[0].Count = pointer.Of(2) + job.TaskGroups[0].Tasks[0].Config["run_for"] = "10m" + job.TaskGroups[0].Tasks[0].ShutdownDelay = shutdownDelay + job.TaskGroups[0].Tasks[0].Services = []*api.Service{{ + Name: "service", + Provider: "nomad", + }} + + resp, _, err := client.Jobs().Register(job, nil) + must.NoError(t, err) + + code := waitForSuccess(ui, client, fullId, t, resp.EvalID) + must.Zero(t, code) + ui.OutputWriter.Reset() + + // Wait for alloc to be running. + allocStubs, _, err := client.Jobs().Allocations(jobID, true, nil) + must.NoError(t, err) + for _, alloc := range allocStubs { + waitForAllocRunning(t, client, alloc.ID) + } + + // Add address and job ID to the command and run. + args := []string{ + "-address", url, + "-batch-size", "1", + "-batch-wait", "0", + "-yes", + } + args = append(args, tc.args...) + args = append(args, jobID) + + code = cmd.Run(args) + must.Zero(t, code) + + // Wait for all allocs to restart. + reschedules := map[string]bool{} + for _, alloc := range allocStubs { + reschedules[alloc.ID] = true + } + allocs := waitAllocsRescheduled(t, client, reschedules) + + // Check that allocs have shutdown delay event. + for _, alloc := range allocs { + for _, s := range alloc.TaskStates { + var killedEv *api.TaskEvent + var killingEv *api.TaskEvent + for _, ev := range s.Events { + if strings.Contains(ev.Type, "Killed") { + killedEv = ev + } + if strings.Contains(ev.Type, "Killing") { + killingEv = ev + } + } + + diff := killedEv.Time - killingEv.Time + if tc.shutdownDelay { + must.GreaterEq(t, shutdownDelay, time.Duration(diff)) + } else { + // Add a bit of slack to account for the actual + // shutdown time of the task. + must.Between(t, shutdownDelay, time.Duration(diff), shutdownDelay+time.Second) + } + } + } + }) + } +} + +func TestJobRestartCommand_filterAllocs(t *testing.T) { + ci.Parallel(t) + + task1 := api.NewTask("task_1", "mock_driver") + task2 := api.NewTask("task_2", "mock_driver") + task3 := api.NewTask("task_3", "mock_driver") + + jobV1 := api.NewServiceJob("example", "example", "global", 1). + AddTaskGroup( + api.NewTaskGroup("group_1", 1). + AddTask(task1), + ). + AddTaskGroup( + api.NewTaskGroup("group_2", 1). + AddTask(task1). + AddTask(task2), + ). + AddTaskGroup( + api.NewTaskGroup("group_3", 1). + AddTask(task3), + ) + jobV1.Version = pointer.Of(uint64(1)) + + jobV2 := api.NewServiceJob("example", "example", "global", 1). + AddTaskGroup( + api.NewTaskGroup("group_1", 1). + AddTask(task1), + ). + AddTaskGroup( + api.NewTaskGroup("group_2", 1). 
+ AddTask(task2), + ) + jobV2.Version = pointer.Of(uint64(2)) + + allAllocs := []AllocationListStubWithJob{} + allocs := map[string]AllocationListStubWithJob{} + for _, job := range []*api.Job{jobV1, jobV2} { + for _, tg := range job.TaskGroups { + for _, desired := range []string{api.AllocDesiredStatusRun, api.AllocDesiredStatusStop} { + for _, client := range []string{api.AllocClientStatusRunning, api.AllocClientStatusComplete} { + key := fmt.Sprintf("job_v%d_%s_%s_%s", *job.Version, *tg.Name, desired, client) + alloc := AllocationListStubWithJob{ + AllocationListStub: &api.AllocationListStub{ + ID: key, + JobVersion: *job.Version, + TaskGroup: *tg.Name, + DesiredStatus: desired, + ClientStatus: client, + }, + Job: job, + } + allocs[key] = alloc + allAllocs = append(allAllocs, alloc) + } + } + } + } + + testCases := []struct { + name string + args []string + expectedAllocs []AllocationListStubWithJob + }{ + { + name: "skip by group", + args: []string{"-group", "group_1"}, + expectedAllocs: []AllocationListStubWithJob{ + allocs["job_v1_group_1_run_running"], + allocs["job_v1_group_1_run_complete"], + allocs["job_v1_group_1_stop_running"], + allocs["job_v2_group_1_run_running"], + allocs["job_v2_group_1_run_complete"], + allocs["job_v2_group_1_stop_running"], + }, + }, + { + name: "skip by old group", + args: []string{"-group", "group_3"}, + expectedAllocs: []AllocationListStubWithJob{ + allocs["job_v1_group_3_run_running"], + allocs["job_v1_group_3_run_complete"], + allocs["job_v1_group_3_stop_running"], + }, + }, + { + name: "skip by task", + args: []string{"-task", "task_2"}, + expectedAllocs: []AllocationListStubWithJob{ + allocs["job_v1_group_2_run_running"], + allocs["job_v1_group_2_run_complete"], + allocs["job_v1_group_2_stop_running"], + allocs["job_v2_group_2_run_running"], + allocs["job_v2_group_2_run_complete"], + allocs["job_v2_group_2_stop_running"], + }, + }, + { + name: "skip by old task", + args: []string{"-task", "task_3"}, + expectedAllocs: []AllocationListStubWithJob{ + allocs["job_v1_group_3_run_running"], + allocs["job_v1_group_3_run_complete"], + allocs["job_v1_group_3_stop_running"], + }, + }, + { + name: "skip by group and task", + args: []string{ + "-group", "group_1", + "-group", "group_2", + "-task", "task_2", + }, + // Only group_2 has task_2 in all job versions. 
+ expectedAllocs: []AllocationListStubWithJob{ + allocs["job_v1_group_2_run_running"], + allocs["job_v1_group_2_run_complete"], + allocs["job_v1_group_2_stop_running"], + allocs["job_v2_group_2_run_running"], + allocs["job_v2_group_2_run_complete"], + allocs["job_v2_group_2_stop_running"], + }, + }, + { + name: "skip by status", + args: []string{}, + expectedAllocs: []AllocationListStubWithJob{ + allocs["job_v1_group_1_run_running"], + allocs["job_v1_group_1_run_complete"], + allocs["job_v1_group_1_stop_running"], + allocs["job_v1_group_2_run_running"], + allocs["job_v1_group_2_run_complete"], + allocs["job_v1_group_2_stop_running"], + allocs["job_v1_group_3_run_running"], + allocs["job_v1_group_3_run_complete"], + allocs["job_v1_group_3_stop_running"], + allocs["job_v2_group_1_run_running"], + allocs["job_v2_group_1_run_complete"], + allocs["job_v2_group_1_stop_running"], + allocs["job_v2_group_2_run_running"], + allocs["job_v2_group_2_run_complete"], + allocs["job_v2_group_2_stop_running"], + }, + }, + { + name: "no matches by group", + args: []string{"-group", "group_404"}, + expectedAllocs: []AllocationListStubWithJob{}, + }, + { + name: "no matches by task", + args: []string{"-task", "task_404"}, + expectedAllocs: []AllocationListStubWithJob{}, + }, + { + name: "no matches by task with group", + args: []string{ + "-group", "group_1", + "-task", "task_2", // group_1 never has task_2. + }, + expectedAllocs: []AllocationListStubWithJob{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ui := cli.NewMockUi() + cmd := &JobRestartCommand{ + Meta: Meta{Ui: &cli.ConcurrentUi{Ui: ui}}, + } + + args := append(tc.args, "-verbose", "-yes", "example") + code, err := cmd.parseAndValidate(args) + must.NoError(t, err) + must.Zero(t, code) + + got := cmd.filterAllocs(allAllocs) + must.SliceEqFunc(t, tc.expectedAllocs, got, func(a, b AllocationListStubWithJob) bool { + return a.ID == b.ID + }) + + expected := set.FromFunc(tc.expectedAllocs, func(a AllocationListStubWithJob) string { + return a.ID + }) + for _, a := range allAllocs { + if !expected.Contains(a.ID) { + must.StrContains(t, ui.OutputWriter.String(), fmt.Sprintf("Skipping allocation %q", a.ID)) + } + } + }) + } +} + +func TestJobRestartCommand_onErrorFail(t *testing.T) { + ci.Parallel(t) + + ui := cli.NewMockUi() + cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} + + // Start client and server and wait for node to be ready. + srv, client, url := testServer(t, true, nil) + defer srv.Shutdown() + + parsedURL, err := neturl.Parse(url) + must.NoError(t, err) + + waitForNodes(t, client) + + // Register a job with 3 allocations. + jobID := "test_job_restart_command_fail_on_error" + job := testJob(jobID) + job.TaskGroups[0].Count = pointer.Of(3) + + resp, _, err := client.Jobs().Register(job, nil) + must.NoError(t, err) + + code := waitForSuccess(ui, client, fullId, t, resp.EvalID) + must.Zero(t, code) + ui.OutputWriter.Reset() + + // Create a proxy to inject an error after 2 allocation restarts. + // Also counts how many restart requests are made so we can check that the + // command stops after the error happens. 
+	var allocRestarts int32
+	proxy := httptest.NewServer(&httputil.ReverseProxy{
+		ModifyResponse: func(resp *http.Response) error {
+			if strings.HasSuffix(resp.Request.URL.Path, "/restart") {
+				count := atomic.AddInt32(&allocRestarts, 1)
+				if count == 2 {
+					return fmt.Errorf("fail")
+				}
+			}
+			return nil
+		},
+		Rewrite: func(r *httputil.ProxyRequest) {
+			r.SetURL(parsedURL)
+		},
+	})
+	defer proxy.Close()
+
+	// Run command with -on-error=fail.
+	// Expect only 2 restart requests even though there are 3 allocations.
+	code = cmd.Run([]string{
+		"-address", proxy.URL,
+		"-on-error", jobRestartOnErrorFail,
+		jobID,
+	})
+	must.One(t, code)
+	must.Eq(t, 2, allocRestarts)
+}
+
+// waitTasksRestarted blocks until the tasks in the given allocations have
+// restarted (or not) as expected. Returns a list with the updated state of
+// the allocations.
+//
+// To determine if a restart happened the function looks for a "Restart
+// Signaled" event in the list of task events. Allocations that are reused
+// between tests may contain a restart event from a past test case, leading to
+// false positives.
+//
+// The restarts map is keyed by group and task name, with the value indicating
+// whether the task is expected to have restarted.
+func waitTasksRestarted(
+	t *testing.T,
+	client *api.Client,
+	allocs []*api.AllocationListStub,
+	restarts map[string]map[string]bool,
+) []*api.Allocation {
+	t.Helper()
+
+	var newAllocs []*api.Allocation
+	testutil.WaitForResult(func() (bool, error) {
+		newAllocs = make([]*api.Allocation, 0, len(allocs))
+
+		for _, alloc := range allocs {
+			if _, ok := restarts[alloc.TaskGroup]; !ok {
+				t.Fatalf("Missing group %q in restarts map", alloc.TaskGroup)
+			}
+
+			// Skip allocations that are not supposed to be running.
+			if alloc.DesiredStatus != api.AllocDesiredStatusRun {
+				continue
+			}
+
+			updated, _, err := client.Allocations().Info(alloc.ID, nil)
+			if err != nil {
+				return false, err
+			}
+			newAllocs = append(newAllocs, updated)
+
+			for task, state := range updated.TaskStates {
+				restarted := false
+				for _, ev := range state.Events {
+					if ev.Type == api.TaskRestartSignal {
+						restarted = true
+						break
+					}
+				}
+
+				if restarted && !restarts[updated.TaskGroup][task] {
+					return false, fmt.Errorf(
+						"task %q in alloc %s for group %q not expected to restart",
+						task, updated.ID, updated.TaskGroup,
+					)
+				}
+				if !restarted && restarts[updated.TaskGroup][task] {
+					return false, fmt.Errorf(
+						"task %q in alloc %s for group %q expected to restart but didn't",
+						task, updated.ID, updated.TaskGroup,
+					)
+				}
+			}
+		}
+		return true, nil
+	}, func(err error) {
+		must.NoError(t, err)
+	})
+
+	return newAllocs
+}
+
+// waitAllocsRescheduled blocks until the given allocations have been
+// rescheduled (or not) as expected. Returns a list with the updated state of
+// the allocations.
+//
+// To determine if an allocation has been rescheduled the function looks for
+// a non-empty NextAllocation field.
+//
+// The reschedules map maps allocation IDs to a boolean indicating if a
+// reschedule is expected for that allocation.
+func waitAllocsRescheduled(t *testing.T, client *api.Client, reschedules map[string]bool) []*api.Allocation { + t.Helper() + + var newAllocs []*api.Allocation + testutil.WaitForResult(func() (bool, error) { + newAllocs = make([]*api.Allocation, 0, len(reschedules)) + + for allocID, reschedule := range reschedules { + alloc, _, err := client.Allocations().Info(allocID, nil) + if err != nil { + return false, err + } + newAllocs = append(newAllocs, alloc) + + wasRescheduled := alloc.NextAllocation != "" + if wasRescheduled && !reschedule { + return false, fmt.Errorf("alloc %s not expected to be rescheduled", alloc.ID) + } + if !wasRescheduled && reschedule { + return false, fmt.Errorf("alloc %s expected to be rescheduled but wasn't", alloc.ID) + } + } + return true, nil + }, func(err error) { + must.NoError(t, err) + }) + + return newAllocs +} + +// getRestartBatches returns a list of allocations per batch of restarts. +// +// Since restarts are issued concurrently, it's expected that allocations in +// the same batch have fairly close LastRestart times, so a 1s delay between +// restarts may be enough to indicate a new batch. +func getRestartBatches(allocs []*api.Allocation, groups []string, task string) [][]*api.Allocation { + groupsSet := set.From(groups) + batches := [][]*api.Allocation{} + + type allocRestart struct { + alloc *api.Allocation + restart time.Time + } + + restarts := make([]allocRestart, 0, len(allocs)) + for _, alloc := range allocs { + if !groupsSet.Contains(alloc.TaskGroup) { + continue + } + + restarts = append(restarts, allocRestart{ + alloc: alloc, + restart: alloc.TaskStates[task].LastRestart, + }) + } + + sort.Slice(restarts, func(i, j int) bool { + return restarts[i].restart.Before(restarts[j].restart) + }) + + prev := restarts[0].restart + batch := []*api.Allocation{} + for _, r := range restarts { + if r.restart.Sub(prev) >= time.Second { + prev = r.restart + batches = append(batches, batch) + batch = []*api.Allocation{} + } + batch = append(batch, r.alloc) + } + batches = append(batches, batch) + + return batches +} diff --git a/command/meta.go b/command/meta.go index 06aa63e22a9..b8eac19000a 100644 --- a/command/meta.go +++ b/command/meta.go @@ -4,6 +4,7 @@ import ( "flag" "fmt" "os" + "reflect" "strings" "github.com/hashicorp/nomad/api" @@ -176,7 +177,35 @@ func (m *Meta) allNamespaces() bool { } func (m *Meta) Colorize() *colorstring.Colorize { - _, coloredUi := m.Ui.(*cli.ColoredUi) + ui := m.Ui + coloredUi := false + + // Meta.Ui may wrap other cli.Ui instances, so unwrap them until we find a + // *cli.ColoredUi or there is nothing left to unwrap. 
+ for { + if ui == nil { + break + } + + _, coloredUi = ui.(*cli.ColoredUi) + if coloredUi { + break + } + + v := reflect.ValueOf(ui) + if v.Kind() == reflect.Ptr { + v = v.Elem() + } + for i := 0; i < v.NumField(); i++ { + if !v.Field(i).CanInterface() { + continue + } + ui, _ = v.Field(i).Interface().(cli.Ui) + if ui != nil { + break + } + } + } return &colorstring.Colorize{ Colors: colorstring.DefaultColors, diff --git a/command/testing_test.go b/command/testing_test.go index b1ffc23fdf3..6f8ca07878e 100644 --- a/command/testing_test.go +++ b/command/testing_test.go @@ -152,21 +152,48 @@ func waitForNodes(t *testing.T, client *api.Client) { }) } -func waitForAllocRunning(t *testing.T, client *api.Client, allocID string) { +func waitForJobAllocsStatus(t *testing.T, client *api.Client, jobID string, status string, token string) { + testutil.WaitForResult(func() (bool, error) { + q := &api.QueryOptions{AuthToken: token} + + allocs, _, err := client.Jobs().Allocations(jobID, true, q) + if err != nil { + return false, fmt.Errorf("failed to query job allocs: %v", err) + } + if len(allocs) == 0 { + return false, fmt.Errorf("no allocs") + } + + for _, alloc := range allocs { + if alloc.ClientStatus != status { + return false, fmt.Errorf("alloc status is %q not %q", alloc.ClientStatus, status) + } + } + return true, nil + }, func(err error) { + must.NoError(t, err) + }) +} + +func waitForAllocStatus(t *testing.T, client *api.Client, allocID string, status string) { testutil.WaitForResult(func() (bool, error) { alloc, _, err := client.Allocations().Info(allocID, nil) if err != nil { return false, err } - if alloc.ClientStatus == api.AllocClientStatusRunning { + if alloc.ClientStatus == status { return true, nil } - return false, fmt.Errorf("alloc status: %s", alloc.ClientStatus) + return false, fmt.Errorf("alloc status is %q not %q", alloc.ClientStatus, status) }, func(err error) { - t.Fatalf("timed out waiting for alloc to be running: %v", err) + must.NoError(t, err) }) } +func waitForAllocRunning(t *testing.T, client *api.Client, allocID string) { + waitForAllocStatus(t, client, allocID, api.AllocClientStatusRunning) +} + func waitForCheckStatus(t *testing.T, client *api.Client, allocID, status string) { testutil.WaitForResult(func() (bool, error) { results, err := client.Allocations().Checks(allocID, nil) diff --git a/website/content/docs/commands/job/restart.mdx b/website/content/docs/commands/job/restart.mdx new file mode 100644 index 00000000000..601fce1bf8f --- /dev/null +++ b/website/content/docs/commands/job/restart.mdx @@ -0,0 +1,234 @@ +--- +layout: docs +page_title: 'Commands: job restart' +description: | + The job restart command is used to restart allocations for a job. +--- + +# Command: job restart + +The `job restart` command is used to restart or reschedule allocations for a +particular job. + +Restarting the job calls the [Restart Allocation][api_alloc_restart] API +endpoint to restart the tasks inside allocations, so the allocations themselves +are not modified but rather restarted in-place. + +Rescheduling the job uses the [Stop Allocation][api_alloc_stop] API endpoint to +stop the allocations and trigger the Nomad scheduler to compute new placements. +This may cause the new allocations to be scheduled in different clients from +the originals. + +## Usage + +```plaintext +nomad job restart [options] +``` + +The `job restart` command requires a single argument, specifying the job ID to +restart. 
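+
+For example, assuming the job `example` is running and includes a task named
+`sidecar`, the `-task` option described below can restart just that task in
+every allocation:
+
+```shell-session
+$ nomad job restart -task=sidecar example
+```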
+ +The command can operate in batches and wait until all restarted or +rescheduled allocations are running again before proceeding to the next batch. +It is also possible to specify additional time to wait between batches. + +Allocations can be restarted in-place or rescheduled. When restarting +in-place the command may target specific tasks in the allocations, restart +only tasks that are currently running, or restart all tasks, even the ones +that have already run. Allocations can also be targeted by groups and tasks. +When both groups and tasks are defined only the tasks for the allocations of +those groups are restarted. + +When rescheduling, the current allocations are stopped triggering the Nomad +scheduler to create replacement allocations that may be placed in different +clients. The command waits until the new allocations have client status `ready` +before proceeding with the remaining batches. Services health checks are not +taken into account. + +By default the command restarts all running tasks in-place with one allocation +per batch. + +When ACLs are enabled, this command requires a token with the +`alloc-lifecycle` and `read-job` capabilities for the job's namespace. The +`list-jobs` capability is required to run the command with a job prefix instead +of the exact job ID. + +## General Options + +@include 'general_options.mdx' + +## Restart Options + +- `-all-tasks`: If set, all tasks in the allocations are restarted, even the + ones that have already run, such as non-sidecar tasks. Tasks will restart + following their [`lifecycle`][] order. This option cannot be used with + `-task`. + +- `-batch-size=`: Number of allocations to restart at once. It may be + defined as a percentage value of the current number of running allocations. + Percentage values are rounded up to increase parallelism. Defaults to `1`. + +- `-batch-wait=`: Time to wait between restart batches. If set + to `ask` the command halts between batches and waits for user input on how to + proceed. If the answer is a time duration all remaining batches will use this + new value. Defaults to `0`. + +- `-group=`: Only restart allocations for the given group. Can be + specified multiple times. If no group is set all allocations for the job are + restarted. + +- `-no-shutdown-delay`: Ignore the group and task [`shutdown_delay`][] + configuration so there is no delay between service deregistration and task + shutdown or restart. Note that using this flag will result in failed network + connections to the allocation being restarted. + +- `-reschedule`: If set, allocations are stopped and rescheduled instead of + restarted in-place. Since the group is not modified the restart does not + create a new deployment, and so values defined in [`update`][] blocks, such + as [`max_parallel`][], are not taken into account. This option cannot be used + with `-task`. + +- `-on-error=`: Determines what action to take when an error happens + during a restart batch. If `ask` the command stops and waits for user + confirmation on how to proceed. If `fail` the command exits immediately. + Defaults to `ask`. + +- `-task=`: Specify the task to restart. Can be specified multiple + times. If groups are also specified the task must exist in at least one of + them. If no task is set only tasks that are currently running are restarted. + For example, non-sidecar tasks that already ran are not restarted unless + `-all-tasks` is used instead. This option cannot be used with `-all-tasks` or + `-reschedule`. + +- `-yes`: Automatic yes to prompts. 
If set, the command automatically restarts
+  multi-region jobs only in the region targeted by the command, ignores batch
+  errors, and automatically proceeds with the remaining batches without
+  waiting. Use `-on-error` and `-batch-wait` to adjust these behaviors.
+
+- `-verbose`: Display full information.
+
+## Examples
+
+Restart running tasks of all allocations.
+
+```shell-session
+$ nomad job restart example
+==> 2023-02-28T17:36:31-05:00: Restarting 5 allocations
+    2023-02-28T17:36:31-05:00: Restarting running tasks in allocation "32e143f8" for group "proxy"
+    2023-02-28T17:36:31-05:00: Restarting running tasks in allocation "388129e0" for group "web"
+    2023-02-28T17:36:31-05:00: Restarting running tasks in allocation "4fd581ee" for group "proxy"
+    2023-02-28T17:36:32-05:00: Restarting running tasks in allocation "77d5c4f6" for group "proxy"
+    2023-02-28T17:36:32-05:00: Restarting running tasks in allocation "d4303a30" for group "web"
+==> 2023-02-28T17:36:32-05:00: Finished job restart
+
+All allocations restarted successfully!
+```
+
+Target allocations of a specific group to restart.
+
+```shell-session
+$ nomad job restart -group=web example
+==> 2023-02-28T17:37:36-05:00: Restarting 2 allocations
+    2023-02-28T17:37:36-05:00: Restarting running tasks in allocation "388129e0" for group "web"
+    2023-02-28T17:37:37-05:00: Restarting running tasks in allocation "d4303a30" for group "web"
+==> 2023-02-28T17:37:37-05:00: Finished job restart
+
+All allocations restarted successfully!
+```
+
+Reschedule allocations instead of restarting them in-place.
+
+```shell-session
+$ nomad job restart -group=web -reschedule example
+==> 2023-02-28T17:39:14-05:00: Restarting 2 allocations
+    2023-02-28T17:39:14-05:00: Rescheduling allocation "388129e0" for group "web"
+    2023-02-28T17:39:45-05:00: Rescheduling allocation "d4303a30" for group "web"
+==> 2023-02-28T17:40:16-05:00: Finished job restart
+
+All allocations restarted successfully!
+```
+
+Batch allocations to restart them 2 at a time.
+
+```shell-session
+$ nomad job restart -batch-size=2 example
+==> 2023-02-28T17:40:58-05:00: Restarting 5 allocations
+==> 2023-02-28T17:40:58-05:00: Restarting 1st batch of 2 allocations
+    2023-02-28T17:40:58-05:00: Restarting running tasks in allocation "653f983e" for group "web"
+    2023-02-28T17:40:58-05:00: Restarting running tasks in allocation "4d18e545" for group "web"
+==> 2023-02-28T17:40:58-05:00: Restarting 2nd batch of 2 allocations
+    2023-02-28T17:40:58-05:00: Restarting running tasks in allocation "32e143f8" for group "proxy"
+    2023-02-28T17:40:58-05:00: Restarting running tasks in allocation "4fd581ee" for group "proxy"
+==> 2023-02-28T17:40:59-05:00: Restarting 3rd batch of 1 allocations
+    2023-02-28T17:40:59-05:00: Restarting running tasks in allocation "77d5c4f6" for group "proxy"
+==> 2023-02-28T17:40:59-05:00: Finished job restart
+
+All allocations restarted successfully!
+```
+
+Batch allocations as a percentage of total running allocations.
+ +```shell-session +$ nomad job restart -batch-size=50% example +==> 2023-02-28T18:52:47-05:00: Restarting 5 allocations +==> 2023-02-28T18:52:47-05:00: Restarting 1st batch of 3 allocations + 2023-02-28T18:52:47-05:00: Restarting running tasks in allocation "d28f6f60" for group "proxy" + 2023-02-28T18:52:47-05:00: Restarting running tasks in allocation "b931b496" for group "proxy" + 2023-02-28T18:52:47-05:00: Restarting running tasks in allocation "18673b40" for group "proxy" +==> 2023-02-28T18:52:48-05:00: Restarting 2nd batch of 2 allocations + 2023-02-28T18:52:48-05:00: Restarting running tasks in allocation "439b1632" for group "web" + 2023-02-28T18:52:48-05:00: Restarting running tasks in allocation "8fae60f6" for group "web" +==> 2023-02-28T18:52:48-05:00: Finished job restart + +All allocations restarted successfully! +``` + +Pause between batches of restart and wait for user input on how to proceed. + +```shell-session +$ nomad job restart -batch-size=2 -batch-wait=ask example +==> 2023-02-28T18:04:19-05:00: Restarting 5 allocations +==> 2023-02-28T18:04:19-05:00: Restarting 1st batch of 2 allocations + 2023-02-28T18:04:19-05:00: Restarting running tasks in allocation "4d18e545" for group "web" + 2023-02-28T18:04:19-05:00: Restarting running tasks in allocation "653f983e" for group "web" +==> 2023-02-28T18:04:19-05:00: Proceed with next batch? [Y/n/] y +==> 2023-02-28T18:04:20-05:00: Restarting 2nd batch of 2 allocations + 2023-02-28T18:04:20-05:00: Restarting running tasks in allocation "4fd581ee" for group "proxy" + 2023-02-28T18:04:20-05:00: Restarting running tasks in allocation "32e143f8" for group "proxy" +==> 2023-02-28T18:04:20-05:00: Proceed with next batch? [Y/n/] 10s +==> 2023-02-28T18:04:22-05:00: Proceeding restarts with new wait time of 10s +==> 2023-02-28T18:04:22-05:00: Waiting 10s before restarting the next batch +==> 2023-02-28T18:04:32-05:00: Restarting 3rd batch of 1 allocations + 2023-02-28T18:04:32-05:00: Restarting running tasks in allocation "77d5c4f6" for group "proxy" +==> 2023-02-28T18:04:32-05:00: Finished job restart + +All allocations restarted successfully! +``` + +Wait 10 seconds before each restart batch. + +```shell-session +$ nomad job restart -batch-size=2 -batch-wait=10s example +==> 2023-02-28T18:03:43-05:00: Restarting 5 allocations +==> 2023-02-28T18:03:43-05:00: Restarting 1st batch of 2 allocations + 2023-02-28T18:03:43-05:00: Restarting running tasks in allocation "653f983e" for group "web" + 2023-02-28T18:03:43-05:00: Restarting running tasks in allocation "4d18e545" for group "web" +==> 2023-02-28T18:03:43-05:00: Waiting 10s before restarting the next batch +==> 2023-02-28T18:03:53-05:00: Restarting 2nd batch of 2 allocations + 2023-02-28T18:03:53-05:00: Restarting running tasks in allocation "4fd581ee" for group "proxy" + 2023-02-28T18:03:53-05:00: Restarting running tasks in allocation "32e143f8" for group "proxy" +==> 2023-02-28T18:03:53-05:00: Waiting 10s before restarting the next batch +==> 2023-02-28T18:04:03-05:00: Restarting 3rd batch of 1 allocations + 2023-02-28T18:04:03-05:00: Restarting running tasks in allocation "77d5c4f6" for group "proxy" +==> 2023-02-28T18:04:03-05:00: Finished job restart + +All allocations restarted successfully! 
+``` + +[`lifecycle`]: /nomad/docs/job-specification/lifecycle +[`max_parallel`]: /nomad/docs/job-specification/update#max_parallel +[`shutdown_delay`]: /nomad/docs/job-specification/task#shutdown_delay +[`update`]: /nomad/docs/job-specification/update +[api_alloc_restart]: /nomad/api-docs/allocations#restart-allocation +[api_alloc_stop]: /nomad/api-docs/allocations#stop-allocation diff --git a/website/data/docs-nav-data.json b/website/data/docs-nav-data.json index e2d2a7e86f1..2b38a16c41f 100644 --- a/website/data/docs-nav-data.json +++ b/website/data/docs-nav-data.json @@ -542,6 +542,10 @@ "title": "promote", "path": "commands/job/promote" }, + { + "title": "restart", + "path": "commands/job/restart" + }, { "title": "revert", "path": "commands/job/revert"