Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

command/job_stop: accept multiple jobs, stop concurrently #12582

Merged
merged 3 commits into from
Dec 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/12582.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
cli: `nomad job stop` can be used to stop multiple jobs concurrently.
```
238 changes: 145 additions & 93 deletions command/job_stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package command
import (
"fmt"
"strings"
"sync"

"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/api/contexts"
Expand Down Expand Up @@ -118,20 +119,18 @@ func (c *JobStopCommand) Run(args []string) int {
return 1
}

// Truncate the id unless full length is requested
length := shortId
if verbose {
length = fullId
}

// Check that we got exactly one job
args = flags.Args()
if len(args) != 1 {
c.Ui.Error("This command takes one argument: <job>")
if len(args) < 1 {
c.Ui.Error("This command takes at least one argument: <job>")
c.Ui.Error(commandErrorText(c))
return 1
}
jobID := strings.TrimSpace(args[0])

var jobIDs []string
for _, jobID := range flags.Args() {
jobIDs = append(jobIDs, strings.TrimSpace(jobID))
}

// Get the HTTP client
client, err := c.Meta.Client()
Expand All @@ -140,92 +139,145 @@ func (c *JobStopCommand) Run(args []string) int {
return 1
}

// Check if the job exists
jobs, _, err := client.Jobs().PrefixList(jobID)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error deregistering job: %s", err))
return 1
}
if len(jobs) == 0 {
c.Ui.Error(fmt.Sprintf("No job(s) with prefix or id %q found", jobID))
return 1
}
if len(jobs) > 1 {
if (jobID != jobs[0].ID) || (c.allNamespaces() && jobs[0].ID == jobs[1].ID) {
c.Ui.Error(fmt.Sprintf("Prefix matched multiple jobs\n\n%s", createStatusListOutput(jobs, c.allNamespaces())))
return 1
}
}

// Prefix lookup matched a single job
q := &api.QueryOptions{Namespace: jobs[0].JobSummary.Namespace}
job, _, err := client.Jobs().Info(jobs[0].ID, q)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error deregistering job: %s", err))
return 1
}

getConfirmation := func(question string) (int, bool) {
answer, err := c.Ui.Ask(question)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to parse answer: %v", err))
return 1, false
}

if answer == "" || strings.ToLower(answer)[0] == 'n' {
// No case
c.Ui.Output("Cancelling job stop")
return 0, false
} else if strings.ToLower(answer)[0] == 'y' && len(answer) > 1 {
// Non exact match yes
c.Ui.Output("For confirmation, an exact ‘y’ is required.")
return 0, false
} else if answer != "y" {
c.Ui.Output("No confirmation detected. For confirmation, an exact 'y' is required.")
return 1, false
}
return 0, true
statusCh := make(chan int, len(jobIDs))

var wg sync.WaitGroup
for _, jobID := range jobIDs {
jobID := jobID

wg.Add(1)
go func() {
danishprakash marked this conversation as resolved.
Show resolved Hide resolved
defer wg.Done()

// Truncate the id unless full length is requested
length := shortId
if verbose {
length = fullId
}

// Check if the job exists
jobs, _, err := client.Jobs().PrefixList(jobID)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error finding jobs with prefix: %s err: %s", jobID, err))
statusCh <- 1
return
}
if len(jobs) == 0 {
c.Ui.Error(fmt.Sprintf("No job(s) with prefix or id %q found", jobID))
danishprakash marked this conversation as resolved.
Show resolved Hide resolved
statusCh <- 1
return
danishprakash marked this conversation as resolved.
Show resolved Hide resolved
}
if len(jobs) > 1 {
if (jobID != jobs[0].ID) || (c.allNamespaces() && jobs[0].ID == jobs[1].ID) {
c.Ui.Error(fmt.Sprintf("Prefix %q matched multiple jobs\n\n%s", jobID, createStatusListOutput(jobs, c.allNamespaces())))
statusCh <- 1
return
}
}

// Prefix lookup matched a single job
q := &api.QueryOptions{Namespace: jobs[0].JobSummary.Namespace}
job, _, err := client.Jobs().Info(jobs[0].ID, q)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error deregistering job with id %s err: %s", jobID, err))
statusCh <- 1
return
}

getConfirmation := func(question string) (int, bool) {
answer, err := c.Ui.Ask(question)
if err != nil {
c.Ui.Error(fmt.Sprintf("Failed to parse answer: %v", err))
return 1, false
}

if answer == "" || strings.ToLower(answer)[0] == 'n' {
// No case
c.Ui.Output("Cancelling job stop")
return 0, false
} else if strings.ToLower(answer)[0] == 'y' && len(answer) > 1 {
// Non exact match yes
c.Ui.Output("For confirmation, an exact ‘y’ is required.")
return 0, false
} else if answer != "y" {
c.Ui.Output("No confirmation detected. For confirmation, an exact 'y' is required.")
return 1, false
}
return 0, true
}

// Confirm the stop if the job was a prefix match
// Ask for confirmation only when there's just one
// job that needs to be stopped. Since we're stopping
// jobs concurrently, we're going to skip confirmation
// for when multiple jobs need to be stopped.
if len(jobIDs) == 1 && jobID != *job.ID && !autoYes {
question := fmt.Sprintf("Are you sure you want to stop job %q? [y/N]", *job.ID)
code, confirmed := getConfirmation(question)
if !confirmed {
statusCh <- code
return
}
}

// Confirm we want to stop only a single region of a multiregion job
if len(jobIDs) == 1 && job.IsMultiregion() && !global && !autoYes {
question := fmt.Sprintf(
"Are you sure you want to stop multi-region job %q in a single region? [y/N]", *job.ID)
code, confirmed := getConfirmation(question)
if !confirmed {
statusCh <- code
return
}
}

// Invoke the stop
opts := &api.DeregisterOptions{Purge: purge, Global: global, EvalPriority: evalPriority, NoShutdownDelay: noShutdownDelay}
wq := &api.WriteOptions{Namespace: jobs[0].JobSummary.Namespace}
evalID, _, err := client.Jobs().DeregisterOpts(*job.ID, opts, wq)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error deregistering job with id %s err: %s", jobID, err))
statusCh <- 1
return
}

// If we are stopping a periodic job there won't be an evalID.
if evalID == "" {
statusCh <- 0
return
}

// Goroutine won't wait on monitor
if detach {
c.Ui.Output(evalID)
statusCh <- 0
return
}

// Start monitoring the stop eval
// and return result on status channel
mon := newMonitor(c.Ui, client, length)
statusCh <- mon.monitor(evalID)
}()
}

// Confirm the stop if the job was a prefix match
if jobID != *job.ID && !autoYes {
question := fmt.Sprintf("Are you sure you want to stop job %q? [y/N]", *job.ID)
code, confirmed := getConfirmation(question)
if !confirmed {
return code
}
}

// Confirm we want to stop only a single region of a multiregion job
if job.IsMultiregion() && !global {
question := fmt.Sprintf(
"Are you sure you want to stop multi-region job %q in a single region? [y/N]", *job.ID)
code, confirmed := getConfirmation(question)
if !confirmed {
return code
// users will still see
// errors if any while we
// wait for the goroutines
// to finish processing
wg.Wait()

// close the channel to ensure
// the range statement below
// doesn't go on indefinitely
close(statusCh)

// return a non-zero exit code
// if even a single job stop fails
for status := range statusCh {
danishprakash marked this conversation as resolved.
Show resolved Hide resolved
if status != 0 {
return status
}
}

// Invoke the stop
opts := &api.DeregisterOptions{Purge: purge, Global: global, EvalPriority: evalPriority, NoShutdownDelay: noShutdownDelay}
wq := &api.WriteOptions{Namespace: jobs[0].JobSummary.Namespace}
evalID, _, err := client.Jobs().DeregisterOpts(*job.ID, opts, wq)
if err != nil {
c.Ui.Error(fmt.Sprintf("Error deregistering job: %s", err))
return 1
}

// If we are stopping a periodic job there won't be an evalID.
if evalID == "" {
return 0
}

if detach {
c.Ui.Output(evalID)
return 0
}

// Start monitoring the stop eval
mon := newMonitor(c.Ui, client, length)
return mon.monitor(evalID)
return 0
}
65 changes: 65 additions & 0 deletions command/job_stop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,85 @@ package command
import (
"strings"
"testing"
"time"

"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/command/agent"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/mitchellh/cli"
"github.com/posener/complete"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestStopCommand_Implements(t *testing.T) {
ci.Parallel(t)
var _ cli.Command = &JobStopCommand{}
}

func TestStopCommand_JSON(t *testing.T) {
danishprakash marked this conversation as resolved.
Show resolved Hide resolved
ci.Parallel(t)
ui := cli.NewMockUi()
stop := func(args ...string) (stdout string, stderr string, code int) {
cmd := &JobStopCommand{
Meta: Meta{Ui: ui},
}
t.Logf("run: nomad job stop %s", strings.Join(args, " "))
code = cmd.Run(args)
return ui.OutputWriter.String(), ui.ErrorWriter.String(), code
}

// Agent startup is slow, do some work while we wait
agentReady := make(chan string)
var srv *agent.TestAgent
var client *api.Client
go func() {
var addr string
srv, client, addr = testServer(t, false, nil)
agentReady <- addr
}()
defer srv.Shutdown()

// Wait for agent to start and get its address
select {
case <-agentReady:
case <-time.After(20 * time.Second):
t.Fatalf("timed out waiting for agent to start")
}

// create and run 10 jobs
jobIDs := make([]string, 10)
for i := 0; i < 10; i++ {
jobID := uuid.Generate()
jobIDs = append(jobIDs, jobID)

job := testJob(jobID)
job.TaskGroups[0].Tasks[0].Config = map[string]interface{}{
"run_for": "30s",
}

resp, _, err := client.Jobs().Register(job, nil)
if code := waitForSuccess(ui, client, fullId, t, resp.EvalID); code != 0 {
t.Fatalf("[DEBUG] waiting for job to register; status code non zero saw %d", code)
}

require.NoError(t, err)
}

// stop all jobs
var args []string
args = append(args, "-detach")
args = append(args, jobIDs...)
stdout, stderr, code := stop(args...)
t.Logf("[DEBUG] run: nomad job stop stdout/stderr: %s/%s", stdout, stderr)
require.Zero(t, code)
require.Empty(t, stderr)

danishprakash marked this conversation as resolved.
Show resolved Hide resolved
}

func TestStopCommand_Fails(t *testing.T) {
ci.Parallel(t)
srv, _, url := testServer(t, false, nil)
Expand Down
Loading