From 37ebb36ac2ce3f69428ccd7784696c0748156d3b Mon Sep 17 00:00:00 2001 From: Danielle Lancashire Date: Mon, 1 Apr 2019 16:21:03 +0200 Subject: [PATCH] allocs: Add nomad alloc stop This adds a `nomad alloc stop` command that can be used to stop and force migrate an allocation to a different node. This is built on top of the AllocUpdateDesiredTransitionRequest and explicitly limits the scope of access to that transition to expose it under the alloc-lifecycle ACL. The API returns the follow up eval that can be used as part of monitoring in the CLI or parsed and used in an external tool. --- api/allocations.go | 14 +++ command/agent/alloc_endpoint.go | 40 ++++++++- command/agent/alloc_endpoint_test.go | 26 ++++++ command/alloc_stop.go | 128 +++++++++++++++++++++++++++ command/alloc_stop_test.go | 112 +++++++++++++++++++++++ command/commands.go | 5 ++ nomad/alloc_endpoint.go | 59 ++++++++++++ nomad/alloc_endpoint_test.go | 62 +++++++++++++ nomad/structs/structs.go | 16 ++++ scheduler/generic_sched.go | 1 + scheduler/system_sched.go | 2 +- 11 files changed, 462 insertions(+), 3 deletions(-) create mode 100644 command/alloc_stop.go create mode 100644 command/alloc_stop_test.go diff --git a/api/allocations.go b/api/allocations.go index 109174f0752..682cdec9430 100644 --- a/api/allocations.go +++ b/api/allocations.go @@ -89,6 +89,20 @@ func (a *Allocations) Restart(alloc *Allocation, taskName string, q *QueryOption return err } +func (a *Allocations) Stop(alloc *Allocation, q *QueryOptions) (*AllocStopResponse, error) { + var resp AllocStopResponse + _, err := a.client.putQuery("/v1/allocation/"+alloc.ID+"/stop", nil, &resp, q) + return &resp, err +} + +// AllocStopResponse is the response to an `AllocStopRequest` +type AllocStopResponse struct { + // EvalID is the id of the follow up evaluation for the rescheduled alloc. + EvalID string + + WriteMeta +} + // Allocation is used for serialization of allocations. 
type Allocation struct { ID string diff --git a/command/agent/alloc_endpoint.go b/command/agent/alloc_endpoint.go index aab13c54583..d6d624365ea 100644 --- a/command/agent/alloc_endpoint.go +++ b/command/agent/alloc_endpoint.go @@ -42,7 +42,29 @@ func (s *HTTPServer) AllocsRequest(resp http.ResponseWriter, req *http.Request) } func (s *HTTPServer) AllocSpecificRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { - allocID := strings.TrimPrefix(req.URL.Path, "/v1/allocation/") + reqSuffix := strings.TrimPrefix(req.URL.Path, "/v1/allocation/") + + // tokenize the suffix of the path to get the alloc id and find the action + // invoked on the alloc id + tokens := strings.Split(reqSuffix, "/") + if len(tokens) > 2 || len(tokens) < 1 { + return nil, CodedError(404, resourceNotFoundErr) + } + allocID := tokens[0] + + if len(tokens) == 1 { + return s.allocGet(allocID, resp, req) + } + + switch tokens[1] { + case "stop": + return s.allocStop(allocID, resp, req) + } + + return nil, CodedError(404, resourceNotFoundErr) +} + +func (s *HTTPServer) allocGet(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) { if req.Method != "GET" { return nil, CodedError(405, ErrInvalidMethod) } @@ -79,8 +101,22 @@ func (s *HTTPServer) AllocSpecificRequest(resp http.ResponseWriter, req *http.Re return alloc, nil } -func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Request) (interface{}, error) { +func (s *HTTPServer) allocStop(allocID string, resp http.ResponseWriter, req *http.Request) (interface{}, error) { + if !(req.Method == "POST" || req.Method == "PUT") { + return nil, CodedError(405, ErrInvalidMethod) + } + sr := &structs.AllocStopRequest{ + AllocID: allocID, + } + s.parseWriteRequest(req, &sr.WriteRequest) + + var out structs.AllocStopResponse + err := s.agent.RPC("Alloc.Stop", &sr, &out) + return &out, err +} + +func (s *HTTPServer) ClientAllocRequest(resp http.ResponseWriter, req *http.Request) 
(interface{}, error) { reqSuffix := strings.TrimPrefix(req.URL.Path, "/v1/client/allocation/") // tokenize the suffix of the path to get the alloc id and find the action diff --git a/command/agent/alloc_endpoint_test.go b/command/agent/alloc_endpoint_test.go index 457f6d9ee4a..2fed04a4291 100644 --- a/command/agent/alloc_endpoint_test.go +++ b/command/agent/alloc_endpoint_test.go @@ -394,6 +394,32 @@ func TestHTTP_AllocRestart_ACL(t *testing.T) { }) } +func TestHTTP_AllocStop(t *testing.T) { + t.Parallel() + httpTest(t, nil, func(s *TestAgent) { + // Directly manipulate the state + state := s.Agent.server.State() + alloc := mock.Alloc() + require := require.New(t) + require.NoError(state.UpsertJobSummary(999, mock.JobSummary(alloc.JobID))) + + require.NoError(state.UpsertAllocs(1000, []*structs.Allocation{alloc})) + + // Make the HTTP request + req, err := http.NewRequest("POST", "/v1/allocation/"+alloc.ID+"/stop", nil) + require.NoError(err) + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.AllocSpecificRequest(respW, req) + require.NoError(err) + + a := obj.(*structs.AllocStopResponse) + require.NotEmpty(a.EvalID, "missing eval") + require.NotEmpty(a.Index, "missing index") + }) +} + func TestHTTP_AllocStats(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/command/alloc_stop.go b/command/alloc_stop.go new file mode 100644 index 00000000000..a44842f69df --- /dev/null +++ b/command/alloc_stop.go @@ -0,0 +1,128 @@ +package command + +import ( + "fmt" + "strings" +) + +type AllocStopCommand struct { + Meta +} + +func (a *AllocStopCommand) Help() string { + helpText := ` +Usage: nomad alloc stop [options] +Alias: nomad stop + + stop an existing allocation. This command is used to signal a specific alloc + to shut down. When the allocation has been shut down, it will then be + rescheduled. An interactive monitoring session will display log lines as the + allocation completes shutting down. 
It is safe to exit the monitor early with + ctrl-c. + +General Options: + + ` + generalOptionsUsage() + ` + +Stop Specific Options: + + -detach + Return immediately instead of entering monitor mode. After the + stop command is submitted, a new evaluation ID is printed to the + screen, which can be used to examine the rescheduling evaluation using the + eval-status command. + + -verbose + Show full information. +` + return strings.TrimSpace(helpText) +} + +func (c *AllocStopCommand) Name() string { return "alloc stop" } + +func (c *AllocStopCommand) Run(args []string) int { + var detach, verbose bool + + flags := c.Meta.FlagSet(c.Name(), FlagSetClient) + flags.Usage = func() { c.Ui.Output(c.Help()) } + flags.BoolVar(&detach, "detach", false, "") + flags.BoolVar(&verbose, "verbose", false, "") + + if err := flags.Parse(args); err != nil { + return 1 + } + + // Check that we got exactly one alloc + args = flags.Args() + if len(args) != 1 { + c.Ui.Error("This command takes one argument: ") + c.Ui.Error(commandErrorText(c)) + return 1 + } + + allocID := args[0] + + // Truncate the id unless full length is requested + length := shortId + if verbose { + length = fullId + } + + // Query the allocation info + if len(allocID) == 1 { + c.Ui.Error(fmt.Sprintf("Alloc ID must contain at least two characters.")) + return 1 + } + + allocID = sanitizeUUIDPrefix(allocID) + + // Get the HTTP client + client, err := c.Meta.Client() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error initializing client: %s", err)) + return 1 + } + + allocs, _, err := client.Allocations().PrefixList(allocID) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying allocation: %v", err)) + return 1 + } + + if len(allocs) == 0 { + c.Ui.Error(fmt.Sprintf("No allocation(s) with prefix or id %q found", allocID)) + return 1 + } + + if len(allocs) > 1 { + // Format the allocs + out := formatAllocListStubs(allocs, verbose, length) + c.Ui.Error(fmt.Sprintf("Prefix matched multiple allocations\n\n%s", out)) + 
return 1 + } + + // Prefix lookup matched a single allocation + alloc, _, err := client.Allocations().Info(allocs[0].ID, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying allocation: %s", err)) + return 1 + } + + resp, err := client.Allocations().Stop(alloc, nil) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error stopping allocation: %s", err)) + return 1 + } + + if detach { + c.Ui.Output(resp.EvalID) + return 0 + } + + mon := newMonitor(c.Ui, client, length) + return mon.monitor(resp.EvalID, false) +} + +func (a *AllocStopCommand) Synopsis() string { + return "Stop and reschedule a running allocation" +} diff --git a/command/alloc_stop_test.go b/command/alloc_stop_test.go new file mode 100644 index 00000000000..f46532ef86f --- /dev/null +++ b/command/alloc_stop_test.go @@ -0,0 +1,112 @@ +package command + +import ( + "fmt" + "testing" + + "github.com/hashicorp/nomad/api" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" + "github.com/mitchellh/cli" + "github.com/stretchr/testify/require" +) + +func TestAllocStopCommand_Implements(t *testing.T) { + t.Parallel() + var _ cli.Command = &AllocStopCommand{} +} + +func TestAllocStop_Fails(t *testing.T) { + srv, _, url := testServer(t, false, nil) + defer srv.Shutdown() + + require := require.New(t) + ui := new(cli.MockUi) + cmd := &AllocStopCommand{Meta: Meta{Ui: ui}} + + // Fails on misuse + require.Equal(cmd.Run([]string{"some", "garbage", "args"}), 1, "Expected failure") + require.Contains(ui.ErrorWriter.String(), commandErrorText(cmd), "Expected help output") + ui.ErrorWriter.Reset() + + // Fails on connection failure + require.Equal(cmd.Run([]string{"-address=nope", "foobar"}), 1, "expected failure") + require.Contains(ui.ErrorWriter.String(), "Error querying allocation") + ui.ErrorWriter.Reset() + + // Fails on missing alloc + require.Equal(cmd.Run([]string{"-address=" + url, "26470238-5CF2-438F-8772-DC67CFB0705C"}), 1) + require.Contains(ui.ErrorWriter.String(), "No 
allocation(s) with prefix or id") + ui.ErrorWriter.Reset() + + // Fail on identifier with too few characters + require.Equal(cmd.Run([]string{"-address=" + url, "2"}), 1) + require.Contains(ui.ErrorWriter.String(), "must contain at least two characters") + ui.ErrorWriter.Reset() + + // Identifiers with uneven length should produce a query result + require.Equal(cmd.Run([]string{"-address=" + url, "123"}), 1) + require.Contains(ui.ErrorWriter.String(), "No allocation(s) with prefix or id") + ui.ErrorWriter.Reset() +} + +func TestAllocStop_Run(t *testing.T) { + srv, client, url := testServer(t, true, nil) + defer srv.Shutdown() + + require := require.New(t) + + // Wait for a node to be ready + testutil.WaitForResult(func() (bool, error) { + nodes, _, err := client.Nodes().List(nil) + if err != nil { + return false, err + } + for _, node := range nodes { + if _, ok := node.Drivers["mock_driver"]; ok && + node.Status == structs.NodeStatusReady { + return true, nil + } + } + return false, fmt.Errorf("no ready nodes") + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + ui := new(cli.MockUi) + cmd := &AllocStopCommand{Meta: Meta{Ui: ui}} + + jobID := "job1_sfx" + job1 := testJob(jobID) + resp, _, err := client.Jobs().Register(job1, nil) + require.NoError(err) + if code := waitForSuccess(ui, client, fullId, t, resp.EvalID); code != 0 { + t.Fatalf("status code non zero saw %d", code) + } + // get an alloc id + allocId1 := "" + if allocs, _, err := client.Jobs().Allocations(jobID, false, nil); err == nil { + if len(allocs) > 0 { + allocId1 = allocs[0].ID + } + } + require.NotEmpty(allocId1, "unable to find allocation") + + // Wait for alloc to be running + testutil.WaitForResult(func() (bool, error) { + alloc, _, err := client.Allocations().Info(allocId1, nil) + if err != nil { + return false, err + } + if alloc.ClientStatus == api.AllocClientStatusRunning { + return true, nil + } + return false, fmt.Errorf("alloc is not running, is: %s", alloc.ClientStatus) + }, 
func(err error) { + t.Fatalf("err: %v", err) + }) + + require.Equal(cmd.Run([]string{"-address=" + url, allocId1}), 0, "expected successful exit code") + + ui.OutputWriter.Reset() +} diff --git a/command/commands.go b/command/commands.go index 29be6200bdf..550302d49d0 100644 --- a/command/commands.go +++ b/command/commands.go @@ -140,6 +140,11 @@ func Commands(metaPtr *Meta, agentUi cli.Ui) map[string]cli.CommandFactory { Meta: meta, }, nil }, + "alloc stop": func() (cli.Command, error) { + return &AllocStopCommand{ + Meta: meta, + }, nil + }, "alloc fs": func() (cli.Command, error) { return &AllocFSCommand{ Meta: meta, diff --git a/nomad/alloc_endpoint.go b/nomad/alloc_endpoint.go index 643fa2bf878..491541b32c1 100644 --- a/nomad/alloc_endpoint.go +++ b/nomad/alloc_endpoint.go @@ -10,6 +10,8 @@ import ( multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/acl" + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" ) @@ -205,6 +207,63 @@ func (a *Alloc) GetAllocs(args *structs.AllocsGetRequest, return a.srv.blockingRPC(&opts) } +// Stop is used to stop an allocation and migrate it to another node. +func (a *Alloc) Stop(args *structs.AllocStopRequest, reply *structs.AllocStopResponse) error { + if done, err := a.srv.forward("Alloc.Stop", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "alloc", "stop"}, time.Now()) + + // Check that the token has the alloc-lifecycle namespace capability. 
+ if aclObj, err := a.srv.ResolveToken(args.AuthToken); err != nil { + return err + } else if aclObj != nil && !aclObj.AllowNsOp(args.Namespace, acl.NamespaceCapabilityAllocLifecycle) { + return structs.ErrPermissionDenied + } + + if args.AllocID == "" { + return fmt.Errorf("must provide an alloc id") + } + + ws := memdb.NewWatchSet() + alloc, err := a.srv.State().AllocByID(ws, args.AllocID) + if err != nil { + return err + } + + eval := &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: alloc.Namespace, + Priority: alloc.Job.Priority, + Type: alloc.Job.Type, + TriggeredBy: structs.EvalTriggerAllocStop, + JobID: alloc.Job.ID, + JobModifyIndex: alloc.Job.ModifyIndex, + Status: structs.EvalStatusPending, + } + + transitionReq := &structs.AllocUpdateDesiredTransitionRequest{ + Evals: []*structs.Evaluation{eval}, + Allocs: map[string]*structs.DesiredTransition{ + args.AllocID: { + Migrate: helper.BoolToPtr(true), + }, + }, + } + + // Commit this update via Raft + _, index, err := a.srv.raftApply(structs.AllocUpdateDesiredTransitionRequestType, transitionReq) + if err != nil { + a.logger.Error("AllocUpdateDesiredTransitionRequest failed", "error", err) + return err + } + + // Setup the response + reply.Index = index + reply.EvalID = eval.ID + return nil +} + // UpdateDesiredTransition is used to update the desired transitions of an // allocation. 
func (a *Alloc) UpdateDesiredTransition(args *structs.AllocUpdateDesiredTransitionRequest, reply *structs.GenericResponse) error { diff --git a/nomad/alloc_endpoint_test.go b/nomad/alloc_endpoint_test.go index b34dc7345a7..f26c6a20383 100644 --- a/nomad/alloc_endpoint_test.go +++ b/nomad/alloc_endpoint_test.go @@ -567,3 +567,65 @@ func TestAllocEndpoint_UpdateDesiredTransition(t *testing.T) { require.True(*out1.DesiredTransition.Migrate) require.True(*out2.DesiredTransition.Migrate) } + +func TestAllocEndpoint_Stop_ACL(t *testing.T) { + t.Parallel() + require := require.New(t) + + s1, _ := TestACLServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + alloc := mock.Alloc() + alloc2 := mock.Alloc() + state := s1.fsm.State() + require.Nil(state.UpsertJobSummary(998, mock.JobSummary(alloc.JobID))) + require.Nil(state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))) + require.Nil(state.UpsertAllocs(1000, []*structs.Allocation{alloc, alloc2})) + + req := &structs.AllocStopRequest{ + AllocID: alloc.ID, + } + req.Namespace = structs.DefaultNamespace + req.Region = alloc.Job.Region + + // Try without permissions + var resp structs.AllocStopResponse + err := msgpackrpc.CallWithCodec(codec, "Alloc.Stop", req, &resp) + require.True(structs.IsErrPermissionDenied(err), "expected permissions error, got: %v", err) + + // Try with management permissions + req.WriteRequest.AuthToken = s1.getLeaderAcl() + var resp2 structs.AllocStopResponse + require.Nil(msgpackrpc.CallWithCodec(codec, "Alloc.Stop", req, &resp2)) + require.NotZero(resp2.Index) + + // Try with alloc-lifecycle permissions + validToken := mock.CreatePolicyAndToken(t, state, 1002, "valid", + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityAllocLifecycle})) + req.WriteRequest.AuthToken = validToken.SecretID + req.AllocID = alloc2.ID + + var resp3 structs.AllocStopResponse + 
require.Nil(msgpackrpc.CallWithCodec(codec, "Alloc.Stop", req, &resp3)) + require.NotZero(resp3.Index) + + // Look up the allocations + out1, err := state.AllocByID(nil, alloc.ID) + require.Nil(err) + out2, err := state.AllocByID(nil, alloc2.ID) + require.Nil(err) + e1, err := state.EvalByID(nil, resp2.EvalID) + require.Nil(err) + e2, err := state.EvalByID(nil, resp3.EvalID) + require.Nil(err) + + require.NotNil(out1.DesiredTransition.Migrate) + require.NotNil(out2.DesiredTransition.Migrate) + require.NotNil(e1) + require.NotNil(e2) + require.True(*out1.DesiredTransition.Migrate) + require.True(*out2.DesiredTransition.Migrate) +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 43dd3c43822..feb6b568241 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -700,6 +700,21 @@ type AllocUpdateDesiredTransitionRequest struct { WriteRequest } +// AllocStopRequest is used to stop and reschedule a running Allocation. +type AllocStopRequest struct { + AllocID string + + WriteRequest +} + +// AllocStopResponse is the response to an `AllocStopRequest` +type AllocStopResponse struct { + // EvalID is the id of the follow up evaluation for the rescheduled alloc. 
+ EvalID string + + WriteMeta +} + // AllocListRequest is used to request a list of allocations type AllocListRequest struct { QueryOptions @@ -7974,6 +7989,7 @@ const ( EvalTriggerPeriodicJob = "periodic-job" EvalTriggerNodeDrain = "node-drain" EvalTriggerNodeUpdate = "node-update" + EvalTriggerAllocStop = "alloc-stop" EvalTriggerScheduled = "scheduled" EvalTriggerRollingUpdate = "rolling-update" EvalTriggerDeploymentWatcher = "deployment-watcher" diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index a9d6267a8a7..1f7d79e64b8 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -127,6 +127,7 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error { switch eval.TriggeredBy { case structs.EvalTriggerJobRegister, structs.EvalTriggerJobDeregister, structs.EvalTriggerNodeDrain, structs.EvalTriggerNodeUpdate, + structs.EvalTriggerAllocStop, structs.EvalTriggerRollingUpdate, structs.EvalTriggerQueuedAllocs, structs.EvalTriggerPeriodicJob, structs.EvalTriggerMaxPlans, structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerRetryFailedAlloc, diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index e00cea147ae..bcfd6e5be3d 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -61,7 +61,7 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error { switch eval.TriggeredBy { case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate, structs.EvalTriggerFailedFollowUp, structs.EvalTriggerJobDeregister, structs.EvalTriggerRollingUpdate, structs.EvalTriggerPreemption, - structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerNodeDrain: + structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerNodeDrain, structs.EvalTriggerAllocStop: default: desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", eval.TriggeredBy)