Skip to content

Commit

Permalink
Merge pull request #4720 from hashicorp/b-jet-fixes
Browse files Browse the repository at this point in the history
Series of scheduler fixes / debugging enhancements
  • Loading branch information
dadgar committed Sep 25, 2018
1 parent 387ac3c commit e294b37
Show file tree
Hide file tree
Showing 11 changed files with 186 additions and 67 deletions.
112 changes: 51 additions & 61 deletions nomad/deploymentwatcher/deployment_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,6 @@ func (w *deploymentWatcher) SetAllocHealth(
if j != nil {
resp.RevertedJobVersion = helper.Uint64ToPtr(j.Version)
}
w.setLatestEval(index)
return nil
}

Expand All @@ -251,7 +250,7 @@ func (w *deploymentWatcher) PromoteDeployment(
// Create the request
areq := &structs.ApplyDeploymentPromoteRequest{
DeploymentPromoteRequest: *req,
Eval: w.getEval(),
Eval: w.getEval(),
}

index, err := w.upsertDeploymentPromotion(areq)
Expand All @@ -264,7 +263,6 @@ func (w *deploymentWatcher) PromoteDeployment(
resp.EvalCreateIndex = index
resp.DeploymentModifyIndex = index
resp.Index = index
w.setLatestEval(index)
return nil
}

Expand Down Expand Up @@ -296,7 +294,6 @@ func (w *deploymentWatcher) PauseDeployment(
}
resp.DeploymentModifyIndex = i
resp.Index = i
w.setLatestEval(i)
return nil
}

Expand Down Expand Up @@ -346,7 +343,6 @@ func (w *deploymentWatcher) FailDeployment(
if rollbackJob != nil {
resp.RevertedJobVersion = helper.Uint64ToPtr(rollbackJob.Version)
}
w.setLatestEval(i)
return nil
}

Expand Down Expand Up @@ -489,10 +485,8 @@ FAIL:
// Update the status of the deployment to failed and create an evaluation.
e := w.getEval()
u := w.getDeploymentStatusUpdate(structs.DeploymentStatusFailed, desc)
if index, err := w.upsertDeploymentStatusUpdate(u, e, j); err != nil {
if _, err := w.upsertDeploymentStatusUpdate(u, e, j); err != nil {
w.logger.Printf("[ERR] nomad.deployment_watcher: failed to update deployment %q status: %v", w.deploymentID, err)
} else {
w.setLatestEval(index)
}
}

Expand All @@ -511,7 +505,7 @@ func (w *deploymentWatcher) handleAllocUpdate(allocs []*structs.AllocListStub) (
var res allocUpdateResult

// Get the latest evaluation index
latestEval, err := w.latestEvalIndex()
latestEval, blocked, err := w.jobEvalStatus()
if err != nil {
if err == context.Canceled || w.ctx.Err() == context.Canceled {
return res, err
Expand All @@ -527,19 +521,20 @@ func (w *deploymentWatcher) handleAllocUpdate(allocs []*structs.AllocListStub) (
continue
}

// Nothing to do for this allocation
if alloc.DeploymentStatus == nil || alloc.DeploymentStatus.ModifyIndex <= latestEval {
continue
}

// Determine if the update stanza for this group is progress based
progressBased := dstate.ProgressDeadline != 0

// Check if the allocation has failed and we need to mark it to allow
// replacements
if progressBased && alloc.DeploymentStatus.IsUnhealthy() &&
deployment.Active() && !alloc.DesiredTransition.ShouldReschedule() {
res.allowReplacements = append(res.allowReplacements, alloc.ID)
continue
}

// We need to create an eval so the job can progress.
if alloc.DeploymentStatus.IsHealthy() {
if !blocked && alloc.DeploymentStatus.IsHealthy() && alloc.DeploymentStatus.ModifyIndex > latestEval {
res.createEval = true
} else if progressBased && alloc.DeploymentStatus.IsUnhealthy() && deployment.Active() && !alloc.DesiredTransition.ShouldReschedule() {
res.allowReplacements = append(res.allowReplacements, alloc.ID)
}

// If the group is using a progress deadline, we don't have to do anything.
Expand Down Expand Up @@ -684,10 +679,8 @@ func (w *deploymentWatcher) createBatchedUpdate(allowReplacements []string, forI
w.l.Unlock()

// Create the eval
if index, err := w.createUpdate(replacements, w.getEval()); err != nil {
if _, err := w.createUpdate(replacements, w.getEval()); err != nil {
w.logger.Printf("[ERR] nomad.deployment_watcher: failed to create evaluation for deployment %q: %v", w.deploymentID, err)
} else {
w.setLatestEval(index)
}
})
}
Expand Down Expand Up @@ -763,71 +756,68 @@ func (w *deploymentWatcher) getAllocsImpl(ws memdb.WatchSet, state *state.StateS
return nil, 0, err
}

maxIndex := uint64(0)
stubs := make([]*structs.AllocListStub, 0, len(allocs))
for _, alloc := range allocs {
stubs = append(stubs, alloc.Stub())

if maxIndex < alloc.ModifyIndex {
maxIndex = alloc.ModifyIndex
}
}

// Use the last index that affected the jobs table
index, err := state.Index("allocs")
if err != nil {
return nil, index, err
// Use the last index that affected the allocs table
if len(stubs) == 0 {
index, err := state.Index("allocs")
if err != nil {
return nil, index, err
}
maxIndex = index
}

return stubs, index, nil
return stubs, maxIndex, nil
}

// latestEvalIndex returns the index of the last evaluation created for
// the job. The index is used to determine if an allocation update requires an
// evaluation to be triggered.
func (w *deploymentWatcher) latestEvalIndex() (uint64, error) {
// jobEvalStatus returns the eval status for a job. It returns the index of the
// last evaluation created for the job, as well as whether there exists a
// blocked evaluation for the job. The index is used to determine if an
// allocation update requires an evaluation to be triggered. If a blocked
// evaluation already exists, no eval should be created.
func (w *deploymentWatcher) jobEvalStatus() (latestIndex uint64, blocked bool, err error) {
if err := w.queryLimiter.Wait(w.ctx); err != nil {
return 0, err
return 0, false, err
}

snap, err := w.state.Snapshot()
if err != nil {
return 0, err
return 0, false, err
}

evals, err := snap.EvalsByJob(nil, w.j.Namespace, w.j.ID)
if err != nil {
return 0, err
return 0, false, err
}

if len(evals) == 0 {
idx, err := snap.Index("evals")
if err != nil {
w.setLatestEval(idx)
}

return idx, err
index, err := snap.Index("evals")
return index, false, err
}

// Prefer using the snapshot index. Otherwise use the create index
e := evals[0]
if e.SnapshotIndex != 0 {
w.setLatestEval(e.SnapshotIndex)
return e.SnapshotIndex, nil
}

w.setLatestEval(e.CreateIndex)
return e.CreateIndex, nil
}
var max uint64
for _, eval := range evals {
// If we have a blocked eval, then we do not care what the index is
// since we will not need to make a new eval.
if eval.ShouldBlock() {
return 0, true, nil
}

// setLatestEval sets the given index as the latest eval unless the currently
// stored index is higher.
func (w *deploymentWatcher) setLatestEval(index uint64) {
w.l.Lock()
defer w.l.Unlock()
if index > w.latestEval {
w.latestEval = index
// Prefer using the snapshot index. Otherwise use the create index
if eval.SnapshotIndex != 0 && max < eval.SnapshotIndex {
max = eval.SnapshotIndex
} else if max < eval.CreateIndex {
max = eval.CreateIndex
}
}
}

// getLatestEval returns the latest eval index.
func (w *deploymentWatcher) getLatestEval() uint64 {
w.l.Lock()
defer w.l.Unlock()
return w.latestEval
return max, false, nil
}
12 changes: 12 additions & 0 deletions nomad/drainer/watch_jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ func (w *drainingJobWatcher) getJobAllocsImpl(ws memdb.WatchSet, state *state.St
}

// Capture the allocs for each draining job.
var maxIndex uint64 = 0
resp := make(map[structs.NamespacedID][]*structs.Allocation, l)
for jns := range draining {
allocs, err := state.AllocsByJob(ws, jns.Namespace, jns.ID, false)
Expand All @@ -454,6 +455,17 @@ func (w *drainingJobWatcher) getJobAllocsImpl(ws memdb.WatchSet, state *state.St
}

resp[jns] = allocs
for _, alloc := range allocs {
if maxIndex < alloc.ModifyIndex {
maxIndex = alloc.ModifyIndex
}
}
}

// Prefer using the actual max index of affected allocs since it means less
// unblocking
if maxIndex != 0 {
index = maxIndex
}

return resp, index, nil
Expand Down
10 changes: 10 additions & 0 deletions nomad/drainer/watch_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ func (w *nodeDrainWatcher) getNodesImpl(ws memdb.WatchSet, state *state.StateSto
return nil, 0, err
}

var maxIndex uint64 = 0
resp := make(map[string]*structs.Node, 64)
for {
raw := iter.Next()
Expand All @@ -245,6 +246,15 @@ func (w *nodeDrainWatcher) getNodesImpl(ws memdb.WatchSet, state *state.StateSto

node := raw.(*structs.Node)
resp[node.ID] = node
if maxIndex < node.ModifyIndex {
maxIndex = node.ModifyIndex
}
}

// Prefer using the actual max index of affected nodes since it means less
// unblocking
if maxIndex != 0 {
index = maxIndex
}

return resp, index, nil
Expand Down
1 change: 1 addition & 0 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2826,6 +2826,7 @@ func (s *StateStore) UpdateDeploymentAllocHealth(index uint64, req *structs.Appl
copy.DeploymentStatus.Healthy = helper.BoolToPtr(healthy)
copy.DeploymentStatus.Timestamp = ts
copy.DeploymentStatus.ModifyIndex = index
copy.ModifyIndex = index

if err := s.updateDeploymentWithAlloc(index, copy, old, txn); err != nil {
return fmt.Errorf("error updating deployment: %v", err)
Expand Down
5 changes: 5 additions & 0 deletions nomad/structs/funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,11 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex) (bool, st

// For each alloc, add the resources
for _, alloc := range allocs {
// Do not consider the resource impact of terminal allocations
if alloc.TerminalStatus() {
continue
}

if alloc.Resources != nil {
if err := used.Add(alloc.Resources); err != nil {
return false, "", nil, err
Expand Down
85 changes: 85 additions & 0 deletions nomad/structs/funcs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,91 @@ func TestAllocsFit(t *testing.T) {

}

func TestAllocsFit_TerminalAlloc(t *testing.T) {
n := &Node{
Resources: &Resources{
CPU: 2000,
MemoryMB: 2048,
DiskMB: 10000,
IOPS: 100,
Networks: []*NetworkResource{
{
Device: "eth0",
CIDR: "10.0.0.0/8",
MBits: 100,
},
},
},
Reserved: &Resources{
CPU: 1000,
MemoryMB: 1024,
DiskMB: 5000,
IOPS: 50,
Networks: []*NetworkResource{
{
Device: "eth0",
IP: "10.0.0.1",
MBits: 50,
ReservedPorts: []Port{{"main", 80}},
},
},
},
}

a1 := &Allocation{
Resources: &Resources{
CPU: 1000,
MemoryMB: 1024,
DiskMB: 5000,
IOPS: 50,
Networks: []*NetworkResource{
{
Device: "eth0",
IP: "10.0.0.1",
MBits: 50,
ReservedPorts: []Port{{"main", 8000}},
},
},
},
}

// Should fit one allocation
fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if !fit {
t.Fatalf("Bad")
}

// Sanity check the used resources
if used.CPU != 2000 {
t.Fatalf("bad: %#v", used)
}
if used.MemoryMB != 2048 {
t.Fatalf("bad: %#v", used)
}

// Should fit second allocation since it is terminal
a2 := a1.Copy()
a2.DesiredStatus = AllocDesiredStatusStop
fit, _, used, err = AllocsFit(n, []*Allocation{a1, a2}, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
if !fit {
t.Fatalf("Bad")
}

// Sanity check the used resources
if used.CPU != 2000 {
t.Fatalf("bad: %#v", used)
}
if used.MemoryMB != 2048 {
t.Fatalf("bad: %#v", used)
}
}

func TestScoreFit(t *testing.T) {
node := &Node{}
node.Resources = &Resources{
Expand Down
Loading

0 comments on commit e294b37

Please sign in to comment.