Skip to content

Commit

Permalink
cli: fix follow-up alloc monitoring
Browse files Browse the repository at this point in the history
The `job restart` command monitors the allocation that was stopped and
its follow-up until it is running. But in some scenarios is may be
possible for the follow-up allocation itself to be replaced, such as in
case the new allocation fails.

In this scenario the command needs to keep following the
`NextAllocation` ID until it finds one that is running.
  • Loading branch information
lgfa29 committed Mar 17, 2023
1 parent 810800b commit fde22d6
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 43 deletions.
79 changes: 36 additions & 43 deletions command/job_restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -920,7 +920,7 @@ func (c *JobRestartCommand) stopAlloc(alloc AllocationListStubWithJob) error {
// Pass the LastIndex from the Stop() call to only monitor data that was
// created after the Stop() call.
go c.monitorPlacementFailures(ctx, alloc, resp.LastIndex, errCh)
go c.monitorReplacementAlloc(ctx, alloc, resp.LastIndex, errCh)
go c.monitorReplacementAlloc(ctx, alloc, errCh)

// This process may take a while, so ping user from time to time to
// indicate the command is still alive.
Expand Down Expand Up @@ -1000,75 +1000,68 @@ func (c *JobRestartCommand) monitorPlacementFailures(
// allocation is running.
func (c *JobRestartCommand) monitorReplacementAlloc(
ctx context.Context,
alloc AllocationListStubWithJob,
index uint64,
allocStub AllocationListStubWithJob,
errCh chan<- error,
) {
q := &api.QueryOptions{WaitIndex: index}
var nextAllocID string
currentAllocID := allocStub.ID
q := &api.QueryOptions{WaitIndex: 1}
for {
select {
case <-ctx.Done():
return
default:
}

oldAlloc, qm, err := c.client.Allocations().Info(alloc.ID, q)
alloc, qm, err := c.client.Allocations().Info(currentAllocID, q)
if err != nil {
errCh <- fmt.Errorf("Failed to retrieve allocation %q: %w", limit(alloc.ID, c.length), err)
return
}
if oldAlloc.NextAllocation != "" {
nextAllocID = oldAlloc.NextAllocation
break
}
q.WaitIndex = qm.LastIndex
}

c.Ui.Output(fmt.Sprintf(
" %s: Allocation %q replaced by %[3]q, waiting for %[3]q to start running",
formatTime(time.Now()),
limit(alloc.ID, c.length),
limit(nextAllocID, c.length),
))

// Reset the blocking query to the initial index because old allocation
// update may happen after the new allocation transitioned to "running".
q = &api.QueryOptions{WaitIndex: index}
for {
select {
case <-ctx.Done():
return
default:
}
// Follow replacement allocations. We expect the original allocation to
// be replaced, but the replacements may be themselves replaced in
// cases such as the allocation failing.
if alloc.NextAllocation != "" {
c.Ui.Output(fmt.Sprintf(
" %s: Allocation %q replaced by %[3]q, waiting for %[3]q to start running",
formatTime(time.Now()),
limit(alloc.ID, c.length),
limit(alloc.NextAllocation, c.length),
))
currentAllocID = alloc.NextAllocation

newAlloc, qm, err := c.client.Allocations().Info(nextAllocID, q)
if err != nil {
errCh <- fmt.Errorf("Failed to retrieve replacement allocation %q: %w", limit(nextAllocID, c.length), err)
return
// Reset the blocking query so the Info() API call returns the new
// allocation immediately.
q.WaitIndex = 1
continue
}

switch newAlloc.ClientStatus {
switch alloc.ClientStatus {
case api.AllocClientStatusRunning:
c.Ui.Output(fmt.Sprintf(
" %s: Allocation %q is %q",
formatTime(time.Now()),
limit(newAlloc.ID, c.length),
newAlloc.ClientStatus,
))
errCh <- nil
return
// Make sure the running allocation we found is a replacement, not
// the original one.
if alloc.ID != allocStub.ID {
c.Ui.Output(fmt.Sprintf(
" %s: Allocation %q is %q",
formatTime(time.Now()),
limit(alloc.ID, c.length),
alloc.ClientStatus,
))
errCh <- nil
return
}

default:
if c.verbose {
c.Ui.Output(c.Colorize().Color(fmt.Sprintf(
"[dark_gray] %s: Allocation %q is %q[reset]",
formatTime(time.Now()),
limit(newAlloc.ID, c.length),
newAlloc.ClientStatus,
limit(alloc.ID, c.length),
alloc.ClientStatus,
)))
}
}

q.WaitIndex = qm.LastIndex
}
}
Expand Down
64 changes: 64 additions & 0 deletions command/job_restart_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package command

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
Expand All @@ -22,6 +23,7 @@ import (
"github.com/mitchellh/cli"

"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
)

func TestJobRestartCommand_Implements(t *testing.T) {
Expand Down Expand Up @@ -836,6 +838,68 @@ func TestJobRestartCommand_rescheduleFail(t *testing.T) {
must.StrContains(t, ui.ErrorWriter.String(), "No nodes were eligible for evaluation")
}

func TestJobRestartCommand_monitorReplacementAlloc(t *testing.T) {
ci.Parallel(t)

ui := cli.NewMockUi()
cmd := &JobRestartCommand{Meta: Meta{Ui: ui}}

srv, client, _ := testServer(t, true, nil)
defer srv.Shutdown()
waitForNodes(t, client)

// Register test job and update it twice so we end up with three
// allocations, one replacing the next one.
jobID := "test_job_restart_monitor_replacement"
job := testNomadServiceJob(jobID)

for i := 1; i <= 3; i++ {
job.TaskGroups[0].Tasks[0].Config["run_for"] = fmt.Sprintf("%ds", i)
resp, _, err := client.Jobs().Register(job, nil)
must.NoError(t, err)

code := waitForSuccess(ui, client, fullId, t, resp.EvalID)
must.Zero(t, code)
}
ui.OutputWriter.Reset()

// Prepare the command internals. We want to run a specific function and
// target a specific allocation, so we can't run the full command.
cmd.client = client
cmd.verbose = true
cmd.length = fullId

// Fetch, sort, and monitor the oldest allocation.
allocs, _, err := client.Jobs().Allocations(jobID, true, nil)
must.NoError(t, err)
sort.Slice(allocs, func(i, j int) bool {
return allocs[i].CreateIndex < allocs[j].CreateIndex
})

errCh := make(chan error)
go cmd.monitorReplacementAlloc(context.Background(), AllocationListStubWithJob{
AllocationListStub: allocs[0],
Job: job,
}, errCh)

// Make sure the command doesn't get stuck and that we traverse the
// follow-up allocations properly.
must.Wait(t, wait.InitialSuccess(
wait.ErrorFunc(func() error {
select {
case err := <-errCh:
return err
default:
return fmt.Errorf("waiting for response")
}
}),
wait.Timeout(time.Duration(testutil.TestMultiplier()*3)*time.Second),
))
must.StrContains(t, ui.OutputWriter.String(), fmt.Sprintf("%q replaced by %q", allocs[0].ID, allocs[1].ID))
must.StrContains(t, ui.OutputWriter.String(), fmt.Sprintf("%q replaced by %q", allocs[1].ID, allocs[2].ID))
must.StrContains(t, ui.OutputWriter.String(), fmt.Sprintf("%q is %q", allocs[2].ID, api.AllocClientStatusRunning))
}

func TestJobRestartCommand_ACL(t *testing.T) {
ci.Parallel(t)

Expand Down

0 comments on commit fde22d6

Please sign in to comment.