Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved scheduler retry logic under high contention #787

Merged
merged 2 commits into from
Feb 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 38 additions & 13 deletions scheduler/generic_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"log"

"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/structs"
)

Expand Down Expand Up @@ -50,11 +51,12 @@ type GenericScheduler struct {
planner Planner
batch bool

eval *structs.Evaluation
job *structs.Job
plan *structs.Plan
ctx *EvalContext
stack *GenericStack
eval *structs.Evaluation
job *structs.Job
plan *structs.Plan
planResult *structs.PlanResult
ctx *EvalContext
stack *GenericStack

limitReached bool
nextEval *structs.Evaluation
Expand Down Expand Up @@ -99,14 +101,24 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
return setStatus(s.logger, s.planner, s.eval, s.nextEval, structs.EvalStatusFailed, desc)
}

// Retry up to the maxScheduleAttempts
// Retry up to the maxScheduleAttempts and reset if progress is made.
progress := func() bool { return progressMade(s.planResult) }
limit := maxServiceScheduleAttempts
if s.batch {
limit = maxBatchScheduleAttempts
}
if err := retryMax(limit, s.process); err != nil {
if err := retryMax(limit, s.process, progress); err != nil {
if statusErr, ok := err.(*SetStatusError); ok {
return setStatus(s.logger, s.planner, s.eval, s.nextEval, statusErr.EvalStatus, err.Error())
// Scheduling was tried but made no forward progress so create a
// blocked eval to retry once resources become available.
var mErr multierror.Error
if err := s.createBlockedEval(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
if err := setStatus(s.logger, s.planner, s.eval, s.nextEval, statusErr.EvalStatus, err.Error()); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
return mErr.ErrorOrNil()
}
return err
}
Expand All @@ -115,6 +127,21 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
return setStatus(s.logger, s.planner, s.eval, s.nextEval, structs.EvalStatusComplete, "")
}

// createBlockedEval creates a blocked eval and stores it.
func (s *GenericScheduler) createBlockedEval() error {
e := s.ctx.Eligibility()
escaped := e.HasEscaped()

// Only store the eligible classes if the eval hasn't escaped.
var classEligibility map[string]bool
if !escaped {
classEligibility = e.GetClasses()
}

s.blocked = s.eval.BlockedEval(classEligibility, escaped)
return s.planner.CreateEval(s.blocked)
}

// process is wrapped in retryMax to iteratively run the handler until we have no
// further work or we've made the maximum number of attempts.
func (s *GenericScheduler) process() (bool, error) {
Expand Down Expand Up @@ -163,18 +190,16 @@ func (s *GenericScheduler) process() (bool, error) {
// If there are failed allocations, we need to create a blocked evaluation
// to place the failed allocations when resources become available.
if len(s.plan.FailedAllocs) != 0 && s.blocked == nil {
e := s.ctx.Eligibility()
classes := e.GetClasses()
s.blocked = s.eval.BlockedEval(classes, e.HasEscaped())
if err := s.planner.CreateEval(s.blocked); err != nil {
if err := s.createBlockedEval(); err != nil {
s.logger.Printf("[ERR] sched: %#v failed to make blocked eval: %v", s.eval, err)
return false, err
}
s.logger.Printf("[DEBUG] sched: %#v: failed to place all allocations, blocked eval '%s' created", s.eval, s.blocked.ID)
}

// Submit the plan
// Submit the plan and store the results.
result, newState, err := s.planner.SubmitPlan(s.plan)
s.planResult = result
if err != nil {
return false, err
}
Expand Down
21 changes: 12 additions & 9 deletions scheduler/system_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,14 @@ type SystemScheduler struct {
state State
planner Planner

eval *structs.Evaluation
job *structs.Job
plan *structs.Plan
ctx *EvalContext
stack *SystemStack
nodes []*structs.Node
nodesByDC map[string]int
eval *structs.Evaluation
job *structs.Job
plan *structs.Plan
planResult *structs.PlanResult
ctx *EvalContext
stack *SystemStack
nodes []*structs.Node
nodesByDC map[string]int

limitReached bool
nextEval *structs.Evaluation
Expand Down Expand Up @@ -62,8 +63,9 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error {
return setStatus(s.logger, s.planner, s.eval, s.nextEval, structs.EvalStatusFailed, desc)
}

// Retry up to the maxSystemScheduleAttempts
if err := retryMax(maxSystemScheduleAttempts, s.process); err != nil {
// Retry up to the maxSystemScheduleAttempts and reset if progress is made.
progress := func() bool { return progressMade(s.planResult) }
if err := retryMax(maxSystemScheduleAttempts, s.process, progress); err != nil {
if statusErr, ok := err.(*SetStatusError); ok {
return setStatus(s.logger, s.planner, s.eval, s.nextEval, statusErr.EvalStatus, err.Error())
}
Expand Down Expand Up @@ -129,6 +131,7 @@ func (s *SystemScheduler) process() (bool, error) {

// Submit the plan
result, newState, err := s.planner.SubmitPlan(s.plan)
s.planResult = result
if err != nil {
return false, err
}
Expand Down
21 changes: 18 additions & 3 deletions scheduler/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,10 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, map[string]int
}

// retryMax is used to retry a callback until it returns success or
// a maximum number of attempts is reached
func retryMax(max int, cb func() (bool, error)) error {
// a maximum number of attempts is reached. An optional reset function may be
// passed which is called after each failed iteration. If the reset function is
// set and returns true, the number of attempts is reset back to max.
func retryMax(max int, cb func() (bool, error), reset func() bool) error {
attempts := 0
for attempts < max {
done, err := cb()
Expand All @@ -219,14 +221,27 @@ func retryMax(max int, cb func() (bool, error)) error {
if done {
return nil
}
attempts += 1

// Check if we should reset the number attempts
if reset != nil && reset() {
attempts = 0
} else {
attempts += 1
}
}
return &SetStatusError{
Err: fmt.Errorf("maximum attempts reached (%d)", max),
EvalStatus: structs.EvalStatusFailed,
}
}

// progressMade checks to see if the plan result made allocations or updates.
// If the result is nil, false is returned.
func progressMade(result *structs.PlanResult) bool {
return result != nil && len(result.NodeUpdate) != 0 &&
len(result.NodeAllocation) != 0
}

// taintedNodes is used to scan the allocations and then check if the
// underlying nodes are tainted, and should force a migration of the allocation.
func taintedNodes(state State, allocs []*structs.Allocation) (map[string]bool, error) {
Expand Down
21 changes: 19 additions & 2 deletions scheduler/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,20 +229,37 @@ func TestRetryMax(t *testing.T) {
calls += 1
return false, nil
}
err := retryMax(3, bad)
err := retryMax(3, bad, nil)
if err == nil {
t.Fatalf("should fail")
}
if calls != 3 {
t.Fatalf("mis match")
}

calls = 0
first := true
reset := func() bool {
if calls == 3 && first {
first = false
return true
}
return false
}
err = retryMax(3, bad, reset)
if err == nil {
t.Fatalf("should fail")
}
if calls != 6 {
t.Fatalf("mis match")
}

calls = 0
good := func() (bool, error) {
calls += 1
return true, nil
}
err = retryMax(3, good)
err = retryMax(3, good, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
Expand Down