From fde22d65221dff71c5ac0a5dc1824691b73d03bf Mon Sep 17 00:00:00 2001 From: Luiz Aoqui Date: Fri, 17 Mar 2023 17:08:17 -0400 Subject: [PATCH] cli: fix follow-up alloc monitoring The `job restart` command monitors the allocation that was stopped and its follow-up until it is running. But in some scenarios it may be possible for the follow-up allocation itself to be replaced, such as in case the new allocation fails. In this scenario the command needs to keep following the `NextAllocation` ID until it finds one that is running. --- command/job_restart.go | 79 +++++++++++++++++-------------------- command/job_restart_test.go | 64 ++++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 43 deletions(-) diff --git a/command/job_restart.go b/command/job_restart.go index 1298787b253..a48495c46ed 100644 --- a/command/job_restart.go +++ b/command/job_restart.go @@ -920,7 +920,7 @@ func (c *JobRestartCommand) stopAlloc(alloc AllocationListStubWithJob) error { // Pass the LastIndex from the Stop() call to only monitor data that was // created after the Stop() call. go c.monitorPlacementFailures(ctx, alloc, resp.LastIndex, errCh) - go c.monitorReplacementAlloc(ctx, alloc, resp.LastIndex, errCh) + go c.monitorReplacementAlloc(ctx, alloc, errCh) // This process may take a while, so ping user from time to time to // indicate the command is still alive. @@ -1000,12 +1000,11 @@ func (c *JobRestartCommand) monitorPlacementFailures( // allocation is running. 
func (c *JobRestartCommand) monitorReplacementAlloc( ctx context.Context, - alloc AllocationListStubWithJob, - index uint64, + allocStub AllocationListStubWithJob, errCh chan<- error, ) { - q := &api.QueryOptions{WaitIndex: index} - var nextAllocID string + currentAllocID := allocStub.ID + q := &api.QueryOptions{WaitIndex: 1} for { select { case <-ctx.Done(): @@ -1013,62 +1012,56 @@ func (c *JobRestartCommand) monitorReplacementAlloc( default: } - oldAlloc, qm, err := c.client.Allocations().Info(alloc.ID, q) + alloc, qm, err := c.client.Allocations().Info(currentAllocID, q) if err != nil { errCh <- fmt.Errorf("Failed to retrieve allocation %q: %w", limit(alloc.ID, c.length), err) return } - if oldAlloc.NextAllocation != "" { - nextAllocID = oldAlloc.NextAllocation - break - } - q.WaitIndex = qm.LastIndex - } - c.Ui.Output(fmt.Sprintf( - " %s: Allocation %q replaced by %[3]q, waiting for %[3]q to start running", - formatTime(time.Now()), - limit(alloc.ID, c.length), - limit(nextAllocID, c.length), - )) - - // Reset the blocking query to the initial index because old allocation - // update may happen after the new allocation transitioned to "running". - q = &api.QueryOptions{WaitIndex: index} - for { - select { - case <-ctx.Done(): - return - default: - } + // Follow replacement allocations. We expect the original allocation to + // be replaced, but the replacements may be themselves replaced in + // cases such as the allocation failing. 
+ if alloc.NextAllocation != "" { + c.Ui.Output(fmt.Sprintf( + " %s: Allocation %q replaced by %[3]q, waiting for %[3]q to start running", + formatTime(time.Now()), + limit(alloc.ID, c.length), + limit(alloc.NextAllocation, c.length), + )) + currentAllocID = alloc.NextAllocation - newAlloc, qm, err := c.client.Allocations().Info(nextAllocID, q) - if err != nil { - errCh <- fmt.Errorf("Failed to retrieve replacement allocation %q: %w", limit(nextAllocID, c.length), err) - return + // Reset the blocking query so the Info() API call returns the new + // allocation immediately. + q.WaitIndex = 1 + continue } - switch newAlloc.ClientStatus { + switch alloc.ClientStatus { case api.AllocClientStatusRunning: - c.Ui.Output(fmt.Sprintf( - " %s: Allocation %q is %q", - formatTime(time.Now()), - limit(newAlloc.ID, c.length), - newAlloc.ClientStatus, - )) - errCh <- nil - return + // Make sure the running allocation we found is a replacement, not + // the original one. + if alloc.ID != allocStub.ID { + c.Ui.Output(fmt.Sprintf( + " %s: Allocation %q is %q", + formatTime(time.Now()), + limit(alloc.ID, c.length), + alloc.ClientStatus, + )) + errCh <- nil + return + } default: if c.verbose { c.Ui.Output(c.Colorize().Color(fmt.Sprintf( "[dark_gray] %s: Allocation %q is %q[reset]", formatTime(time.Now()), - limit(newAlloc.ID, c.length), - newAlloc.ClientStatus, + limit(alloc.ID, c.length), + alloc.ClientStatus, ))) } } + q.WaitIndex = qm.LastIndex } } diff --git a/command/job_restart_test.go b/command/job_restart_test.go index bafd10ca5fb..0087db9c3a0 100644 --- a/command/job_restart_test.go +++ b/command/job_restart_test.go @@ -1,6 +1,7 @@ package command import ( + "context" "fmt" "net/http" "net/http/httptest" @@ -22,6 +23,7 @@ import ( "github.com/mitchellh/cli" "github.com/shoenig/test/must" + "github.com/shoenig/test/wait" ) func TestJobRestartCommand_Implements(t *testing.T) { @@ -836,6 +838,68 @@ func TestJobRestartCommand_rescheduleFail(t *testing.T) { must.StrContains(t, 
ui.ErrorWriter.String(), "No nodes were eligible for evaluation") } +func TestJobRestartCommand_monitorReplacementAlloc(t *testing.T) { + ci.Parallel(t) + + ui := cli.NewMockUi() + cmd := &JobRestartCommand{Meta: Meta{Ui: ui}} + + srv, client, _ := testServer(t, true, nil) + defer srv.Shutdown() + waitForNodes(t, client) + + // Register test job and update it twice so we end up with three + // allocations, one replacing the next one. + jobID := "test_job_restart_monitor_replacement" + job := testNomadServiceJob(jobID) + + for i := 1; i <= 3; i++ { + job.TaskGroups[0].Tasks[0].Config["run_for"] = fmt.Sprintf("%ds", i) + resp, _, err := client.Jobs().Register(job, nil) + must.NoError(t, err) + + code := waitForSuccess(ui, client, fullId, t, resp.EvalID) + must.Zero(t, code) + } + ui.OutputWriter.Reset() + + // Prepare the command internals. We want to run a specific function and + // target a specific allocation, so we can't run the full command. + cmd.client = client + cmd.verbose = true + cmd.length = fullId + + // Fetch, sort, and monitor the oldest allocation. + allocs, _, err := client.Jobs().Allocations(jobID, true, nil) + must.NoError(t, err) + sort.Slice(allocs, func(i, j int) bool { + return allocs[i].CreateIndex < allocs[j].CreateIndex + }) + + errCh := make(chan error) + go cmd.monitorReplacementAlloc(context.Background(), AllocationListStubWithJob{ + AllocationListStub: allocs[0], + Job: job, + }, errCh) + + // Make sure the command doesn't get stuck and that we traverse the + // follow-up allocations properly. 
+ must.Wait(t, wait.InitialSuccess( + wait.ErrorFunc(func() error { + select { + case err := <-errCh: + return err + default: + return fmt.Errorf("waiting for response") + } + }), + wait.Timeout(time.Duration(testutil.TestMultiplier()*3)*time.Second), + )) + must.StrContains(t, ui.OutputWriter.String(), fmt.Sprintf("%q replaced by %q", allocs[0].ID, allocs[1].ID)) + must.StrContains(t, ui.OutputWriter.String(), fmt.Sprintf("%q replaced by %q", allocs[1].ID, allocs[2].ID)) + must.StrContains(t, ui.OutputWriter.String(), fmt.Sprintf("%q is %q", allocs[2].ID, api.AllocClientStatusRunning)) +} + func TestJobRestartCommand_ACL(t *testing.T) { ci.Parallel(t)