diff --git a/CHANGELOG.md b/CHANGELOG.md index e253c5737af..cb7f45e49f2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ IMPROVEMENTS: BUG FIXES: * core: *Fix restoration of stopped periodic jobs [GH-3201] + * core: Fix a race condition in which scheduling results from one invocation of + the scheduler wouldn't be considered by the next for the same job [GH-3206] * api: Sort /v1/agent/servers output so that output of Consul checks does not change [GH-3214] * api: Fix search handling of jobs with more than four hyphens and case were diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index 7b50c313a60..9b5cd3cdce9 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -6,6 +6,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/go-memdb" + multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/scheduler" @@ -92,8 +93,24 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest, // Provide the output if any if eval != nil { + // Get the index that the worker should wait until before scheduling. + waitIndex, err := e.getWaitIndex(eval.Namespace, eval.JobID) + if err != nil { + var mErr multierror.Error + multierror.Append(&mErr, err) + + // We have dequeued the evaluation but won't be returning it to the + // worker so Nack the eval. + if err := e.srv.evalBroker.Nack(eval.ID, token); err != nil { + multierror.Append(&mErr, err) + } + + return &mErr + } + reply.Eval = eval reply.Token = token + reply.WaitIndex = waitIndex } // Set the query response @@ -101,6 +118,31 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest, return nil } +// getWaitIndex returns the wait index that should be used by the worker before +// invoking the scheduler. The index should be the highest modify index of any +// evaluation for the job. This prevents scheduling races for the same job when +// there are blocked evaluations. +func (e *Eval) getWaitIndex(namespace, job string) (uint64, error) { + snap, err := e.srv.State().Snapshot() + if err != nil { + return 0, err + } + + evals, err := snap.EvalsByJob(nil, namespace, job) + if err != nil { + return 0, err + } + + var max uint64 + for _, eval := range evals { + if max < eval.ModifyIndex { + max = eval.ModifyIndex + } + } + + return max, nil +} + // Ack is used to acknowledge completion of a dequeued evaluation func (e *Eval) Ack(args *structs.EvalAckRequest, reply *structs.GenericResponse) error { diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go index 3330eda4355..9cf1b21a1df 100644 --- a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -170,6 +170,56 @@ func TestEvalEndpoint_Dequeue(t *testing.T) { if token != resp.Token { t.Fatalf("bad token: %#v %#v", token, resp.Token) } + + if resp.WaitIndex != eval1.ModifyIndex { + t.Fatalf("bad wait index; got %d; want %d", resp.WaitIndex, eval1.ModifyIndex) + } +} + +func TestEvalEndpoint_Dequeue_WaitIndex(t *testing.T) { + t.Parallel() + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + eval1 := mock.Eval() + eval2 := mock.Eval() + eval2.JobID = eval1.JobID + s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1}) + s1.evalBroker.Enqueue(eval1) + s1.fsm.State().UpsertEvals(1001, []*structs.Evaluation{eval2}) + + // Dequeue the eval + get := &structs.EvalDequeueRequest{ + Schedulers: defaultSched, + SchedulerVersion: scheduler.SchedulerVersion, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.EvalDequeueResponse + if err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + if !reflect.DeepEqual(eval1, resp.Eval) { + t.Fatalf("bad: %v %v", eval1, resp.Eval) + } + + // Ensure outstanding + token, ok := s1.evalBroker.Outstanding(eval1.ID) + if !ok { + t.Fatalf("should be outstanding") + } + if token != resp.Token { + t.Fatalf("bad token: %#v %#v", token, resp.Token) + } + + if resp.WaitIndex != 1001 { + t.Fatalf("bad wait index; got %d; want %d", resp.WaitIndex, 1001) + } } func TestEvalEndpoint_Dequeue_Version_Mismatch(t *testing.T) { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 832ee476440..578d93dbac8 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -932,9 +932,29 @@ type SingleEvalResponse struct { type EvalDequeueResponse struct { Eval *Evaluation Token string + + // WaitIndex is the Raft index the worker should wait until invoking the + // scheduler. + WaitIndex uint64 + QueryMeta } +// GetWaitIndex is used to retrieve the Raft index in which state should be at +// or beyond before invoking the scheduler. +func (e *EvalDequeueResponse) GetWaitIndex() uint64 { + // Prefer the wait index sent. This will be populated on all responses from + // 0.7.0 and above + if e.WaitIndex != 0 { + return e.WaitIndex + } else if e.Eval != nil { + return e.Eval.ModifyIndex + } + + // This should never happen + return 1 +} + // PlanResponse is used to return from a PlanRequest type PlanResponse struct { Result *PlanResult diff --git a/nomad/worker.go b/nomad/worker.go index 0a6b66cc328..76fb4eefacf 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -106,7 +106,7 @@ func (w *Worker) checkPaused() { func (w *Worker) run() { for { // Dequeue a pending evaluation - eval, token, shutdown := w.dequeueEvaluation(dequeueTimeout) + eval, token, waitIndex, shutdown := w.dequeueEvaluation(dequeueTimeout) if shutdown { return } @@ -118,7 +118,7 @@ func (w *Worker) run() { } // Wait for the raft log to catchup to the evaluation - if err := w.waitForIndex(eval.ModifyIndex, raftSyncLimit); err != nil { + if err := w.waitForIndex(waitIndex, raftSyncLimit); err != nil { w.sendAck(eval.ID, token, false) continue } @@ -136,7 +136,8 @@ func (w *Worker) run() { // dequeueEvaluation is used to fetch the next ready evaluation. // This blocks until an evaluation is available or a timeout is reached. -func (w *Worker) dequeueEvaluation(timeout time.Duration) (*structs.Evaluation, string, bool) { +func (w *Worker) dequeueEvaluation(timeout time.Duration) ( + eval *structs.Evaluation, token string, waitIndex uint64, shutdown bool) { // Setup the request req := structs.EvalDequeueRequest{ Schedulers: w.srv.config.EnabledSchedulers, @@ -170,7 +171,7 @@ REQ: } if w.backoffErr(base, limit) { - return nil, "", true + return nil, "", 0, true } goto REQ } @@ -179,12 +180,12 @@ REQ: // Check if we got a response if resp.Eval != nil { w.logger.Printf("[DEBUG] worker: dequeued evaluation %s", resp.Eval.ID) - return resp.Eval, resp.Token, false + return resp.Eval, resp.Token, resp.GetWaitIndex(), false } // Check for potential shutdown if w.srv.IsShutdown() { - return nil, "", true + return nil, "", 0, true } goto REQ } diff --git a/nomad/worker_test.go b/nomad/worker_test.go index 3e716f31dc0..567a3132153 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -60,13 +60,16 @@ func TestWorker_dequeueEvaluation(t *testing.T) { w := &Worker{srv: s1, logger: s1.logger} // Attempt dequeue - eval, token, shutdown := w.dequeueEvaluation(10 * time.Millisecond) + eval, token, waitIndex, shutdown := w.dequeueEvaluation(10 * time.Millisecond) if shutdown { t.Fatalf("should not shutdown") } if token == "" { t.Fatalf("should get token") } + if waitIndex != eval1.ModifyIndex { + t.Fatalf("bad wait index; got %d; want %d", eval1.ModifyIndex) + } // Ensure we get a sane eval if !reflect.DeepEqual(eval, eval1) { @@ -74,6 +77,76 @@ func TestWorker_dequeueEvaluation(t *testing.T) { } } +// Test that the worker picks up the correct wait index when there are multiple +// evals for the same job. +func TestWorker_dequeueEvaluation_SerialJobs(t *testing.T) { + t.Parallel() + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 + c.EnabledSchedulers = []string{structs.JobTypeService} + }) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Create the evaluation + eval1 := mock.Eval() + eval2 := mock.Eval() + eval2.JobID = eval1.JobID + + // Insert the evals into the state store + if err := s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1, eval2}); err != nil { + t.Fatal(err) + } + + s1.evalBroker.Enqueue(eval1) + s1.evalBroker.Enqueue(eval2) + + // Create a worker + w := &Worker{srv: s1, logger: s1.logger} + + // Attempt dequeue + eval, token, waitIndex, shutdown := w.dequeueEvaluation(10 * time.Millisecond) + if shutdown { + t.Fatalf("should not shutdown") + } + if token == "" { + t.Fatalf("should get token") + } + if waitIndex != eval1.ModifyIndex { + t.Fatalf("bad wait index; got %d; want %d", eval1.ModifyIndex) + } + + // Ensure we get a sane eval + if !reflect.DeepEqual(eval, eval1) { + t.Fatalf("bad: %#v %#v", eval, eval1) + } + + // Update the modify index of the first eval + if err := s1.fsm.State().UpsertEvals(2000, []*structs.Evaluation{eval1}); err != nil { + t.Fatal(err) + } + + // Send the Ack + w.sendAck(eval1.ID, token, true) + + // Attempt second dequeue + eval, token, waitIndex, shutdown = w.dequeueEvaluation(10 * time.Millisecond) + if shutdown { + t.Fatalf("should not shutdown") + } + if token == "" { + t.Fatalf("should get token") + } + if waitIndex != 2000 { + t.Fatalf("bad wait index; got %d; want 2000", eval2.ModifyIndex) + } + + // Ensure we get a sane eval + if !reflect.DeepEqual(eval, eval2) { + t.Fatalf("bad: %#v %#v", eval, eval2) + } +} + func TestWorker_dequeueEvaluation_paused(t *testing.T) { t.Parallel() s1 := testServer(t, func(c *Config) { @@ -101,7 +174,7 @@ func TestWorker_dequeueEvaluation_paused(t *testing.T) { // Attempt dequeue start := time.Now() - eval, token, shutdown := w.dequeueEvaluation(10 * time.Millisecond) + eval, token, waitIndex, shutdown := w.dequeueEvaluation(10 * time.Millisecond) if diff := time.Since(start); diff < 100*time.Millisecond { t.Fatalf("should have paused: %v", diff) } @@ -111,6 +184,9 @@ func TestWorker_dequeueEvaluation_paused(t *testing.T) { if token == "" { t.Fatalf("should get token") } + if waitIndex != eval1.ModifyIndex { + t.Fatalf("bad wait index; got %d; want %d", eval1.ModifyIndex) + } // Ensure we get a sane eval if !reflect.DeepEqual(eval, eval1) { @@ -136,7 +212,7 @@ func TestWorker_dequeueEvaluation_shutdown(t *testing.T) { }() // Attempt dequeue - eval, _, shutdown := w.dequeueEvaluation(10 * time.Millisecond) + eval, _, _, shutdown := w.dequeueEvaluation(10 * time.Millisecond) if !shutdown { t.Fatalf("should not shutdown") } @@ -164,7 +240,7 @@ func TestWorker_sendAck(t *testing.T) { w := &Worker{srv: s1, logger: s1.logger} // Attempt dequeue - eval, token, _ := w.dequeueEvaluation(10 * time.Millisecond) + eval, token, _, _ := w.dequeueEvaluation(10 * time.Millisecond) // Check the depth is 0, 1 unacked stats := s1.evalBroker.Stats() @@ -182,7 +258,7 @@ func TestWorker_sendAck(t *testing.T) { } // Attempt dequeue - eval, token, _ = w.dequeueEvaluation(10 * time.Millisecond) + eval, token, _, _ = w.dequeueEvaluation(10 * time.Millisecond) // Send the Ack w.sendAck(eval.ID, token, true)