Ensure we do not reference unsafe memory, add threshold checks for GC of allocations that belong to batch jobs. Add Eval GC to batch job GC. Add tests.
stswidwinski committed Nov 3, 2022
1 parent 439a190 commit 4943db4
Showing 2 changed files with 213 additions and 39 deletions.
37 changes: 20 additions & 17 deletions nomad/core_sched.go
@@ -297,16 +297,18 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
}

// If the eval is from a running "batch" job we don't want to garbage
// collect its allocations. If there is a long running batch job and its
// terminal allocations get GC'd the scheduler would re-run the
// allocations.
// collect its most current allocations. If there is a long running batch job and its
// terminal allocations get GC'd the scheduler would re-run the allocations. However,
// we do want to GC old Evals and Allocs if there are newer ones due to an update.
if eval.Type == structs.JobTypeBatch {
// Check if the job is running

// Can collect if:
// Job doesn't exist
// Job is Stopped and dead
// allowBatch and the job is dead
// Can collect if either holds:
// - Job doesn't exist
// - Job is Stopped and dead
// - allowBatch and the job is dead
//
// If we cannot collect outright, check if a partial GC may occur
collect := false
if job == nil {
collect = true
@@ -318,12 +320,9 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
collect = true
}

// We don't want to gc anything related to a job which is not dead
// If the batch job doesn't exist we can GC it regardless of allowBatch
if !collect {
// Find allocs associated with older job versions (based on create index) and GC them if terminal
oldAllocs := olderVersionTerminalAllocs(allocs)
return false, oldAllocs, nil
oldAllocs, gcEval := olderVersionTerminalAllocs(allocs, job, thresholdIndex)
return gcEval, oldAllocs, nil
}
}

@@ -344,16 +343,20 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
return gcEval, gcAllocIDs, nil
}

// olderVersionTerminalAllocs returns terminal allocations whose job create index
// is older than the job's create index
func olderVersionTerminalAllocs(allocs []*structs.Allocation) []string {
// olderVersionTerminalAllocs returns a tuple ([]string, bool). The first element is the list of
// terminal allocations which may be garbage collected for batch jobs. The second element indicates
// whether or not the evaluation itself may be garbage collected.
func olderVersionTerminalAllocs(allocs []*structs.Allocation, job *structs.Job, thresholdIndex uint64) ([]string, bool) {
var ret []string
var mayGCEval = true
for _, alloc := range allocs {
if alloc.Job != nil && alloc.CreateIndex < alloc.Job.ModifyIndex && alloc.TerminalStatus() {
if alloc.CreateIndex < job.JobModifyIndex && alloc.ModifyIndex < thresholdIndex && alloc.TerminalStatus() {
ret = append(ret, alloc.ID)
} else {
mayGCEval = false
}
}
return ret
return ret, mayGCEval
}

// evalReap contacts the leader and issues a reap on the passed evals and
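For orientation, here is a minimal, self-contained sketch of the partial-GC rule that olderVersionTerminalAllocs now implements. This is not the committed code: Job and Allocation below are stand-ins for the nomad/structs types, and the Terminal field stands in for Allocation.TerminalStatus().

package main

import "fmt"

// Stand-ins for the nomad/structs types; only the fields the rule uses.
type Job struct {
	JobModifyIndex uint64
}

type Allocation struct {
	ID          string
	CreateIndex uint64
	ModifyIndex uint64
	Terminal    bool // stand-in for Allocation.TerminalStatus()
}

// olderVersionTerminalAllocs mirrors the new helper: an alloc is collectable
// only if it predates the current job version (CreateIndex < JobModifyIndex),
// is older than the GC cutoff (ModifyIndex < thresholdIndex), and is terminal.
// The bool reports whether every alloc was collectable; only then may the
// evaluation itself be GC'd.
func olderVersionTerminalAllocs(allocs []*Allocation, job *Job, thresholdIndex uint64) ([]string, bool) {
	var gcIDs []string
	mayGCEval := true
	for _, alloc := range allocs {
		if alloc.CreateIndex < job.JobModifyIndex && alloc.ModifyIndex < thresholdIndex && alloc.Terminal {
			gcIDs = append(gcIDs, alloc.ID)
		} else {
			mayGCEval = false
		}
	}
	return gcIDs, mayGCEval
}

func main() {
	job := &Job{JobModifyIndex: 1000}
	allocs := []*Allocation{
		{ID: "old-terminal", CreateIndex: 999, ModifyIndex: 999, Terminal: true},   // old version, terminal: collected
		{ID: "old-running", CreateIndex: 999, ModifyIndex: 999, Terminal: false},   // old version, still running: kept
		{ID: "current-term", CreateIndex: 1002, ModifyIndex: 1002, Terminal: true}, // current version: kept
	}
	gcIDs, gcEval := olderVersionTerminalAllocs(allocs, job, 2000)
	fmt.Println(gcIDs, gcEval) // [old-terminal] false
}

Running it prints [old-terminal] false: the terminal, old-version alloc is collected, but because a non-collectable alloc remains, the eval must be kept. This is why the second test below can reap an eval only once all of its allocs are old, terminal, and past the threshold.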
215 changes: 193 additions & 22 deletions nomad/core_sched_test.go
@@ -393,7 +393,7 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) {
}
}

// An EvalGC should reap allocations from jobs with an older modify index
// An EvalGC should reap allocations from jobs with a newer modify index
func TestCoreScheduler_EvalGC_Batch_OldVersion(t *testing.T) {
ci.Parallel(t)

@@ -404,12 +404,14 @@ func TestCoreScheduler_EvalGC_Batch_OldVersion(t *testing.T) {
// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)

var jobModifyIdx uint64 = 1000

// Insert a "dead" job
store := s1.fsm.State()
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Status = structs.JobStatusDead
err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, job)
err := store.UpsertJob(structs.MsgTypeTestSetup, jobModifyIdx, job)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -419,7 +421,7 @@ func TestCoreScheduler_EvalGC_Batch_OldVersion(t *testing.T) {
eval.Status = structs.EvalStatusComplete
eval.Type = structs.JobTypeBatch
eval.JobID = job.ID
err = store.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval})
err = store.UpsertEvals(structs.MsgTypeTestSetup, jobModifyIdx+1, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -439,28 +441,194 @@ func TestCoreScheduler_EvalGC_Batch_OldVersion(t *testing.T) {
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
alloc2.ClientStatus = structs.AllocClientStatusLost

err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc, alloc2})
err = store.UpsertAllocs(structs.MsgTypeTestSetup, jobModifyIdx+2, []*structs.Allocation{alloc, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}

// Insert alloc with indexes older than job.ModifyIndex
// Insert allocs with indexes older than job.ModifyIndex. Two cases:
// 1. Terminal state
// 2. Non-terminal state
alloc3 := mock.Alloc()

alloc3.Job = job
alloc3.JobID = job.ID
alloc3.EvalID = eval.ID
alloc3.DesiredStatus = structs.AllocDesiredStatusRun
alloc3.ClientStatus = structs.AllocClientStatusLost

err = store.UpsertAllocs(structs.MsgTypeTestSetup, job.ModifyIndex - 1, []*structs.Allocation{alloc3})
alloc4 := mock.Alloc()
alloc4.Job = job
alloc4.JobID = job.ID
alloc4.EvalID = eval.ID
alloc4.DesiredStatus = structs.AllocDesiredStatusRun
alloc4.ClientStatus = structs.AllocClientStatusRunning

err = store.UpsertAllocs(structs.MsgTypeTestSetup, jobModifyIdx-1, []*structs.Allocation{alloc3, alloc4})
if err != nil {
t.Fatalf("err: %v", err)
}

// Update the time tables to make this work
// A little helper for assertions
assertCorrectEvalAlloc := func(
ws memdb.WatchSet,
eval *structs.Evaluation,
allocsShouldExist []*structs.Allocation,
allocsShouldNotExist []*structs.Allocation,
) {
out, err := store.EvalByID(ws, eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("bad: %v", out)
}

for _, alloc := range allocsShouldExist {
outA, err := store.AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA == nil {
t.Fatalf("bad: %v", outA)
}
}

for _, alloc := range allocsShouldNotExist {
outA, err := store.AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA != nil {
t.Fatalf("expected alloc to be nil:%v", outA)
}
}
}

// Create a core scheduler
snap, err := store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)

// Attempt the GC without moving the time significantly.
gc := s1.coreJobEval(structs.CoreJobEvalGC, jobModifyIdx)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}

// All allocations should remain in place.
assertCorrectEvalAlloc(
memdb.NewWatchSet(),
eval,
[]*structs.Allocation{alloc, alloc2, alloc3, alloc4},
[]*structs.Allocation{},
)

// Attempt the GC while moving the time forward significantly.
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
tt.Witness(2*jobModifyIdx, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))

gc = s1.coreJobEval(structs.CoreJobEvalGC, jobModifyIdx*2)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}

// The only remaining allocations are those which are "current" or still
// in use by the batch job. To spell it out:
//
// Allocs 1 and 2 should remain because they belong to the current job version.
// Alloc 3 should be GC'd due to age.
// Alloc 4 should remain due to its non-terminal state.
assertCorrectEvalAlloc(
memdb.NewWatchSet(),
eval,
[]*structs.Allocation{alloc, alloc2, alloc4},
[]*structs.Allocation{alloc3},
)

// The job should still exist.
ws := memdb.NewWatchSet()
outB, err := store.JobByID(ws, job.Namespace, job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outB == nil {
t.Fatalf("bad: %v", outB)
}
}
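The Witness calls in these tests work because the core scheduler derives thresholdIndex from the FSM time table: roughly, the highest raft index witnessed at or before now minus EvalGCThreshold. Below is a toy model of that mechanism — an assumption-laden sketch, not Nomad's actual nomad/timetable.go — showing why witnessing 2*jobModifyIdx at a timestamp already a full threshold in the past makes every lower index GC-eligible.

package main

import (
	"fmt"
	"time"
)

// A toy model of Nomad's FSM time table: it pairs raft indexes with the
// wall-clock time at which they were observed.
type entry struct {
	index uint64
	when  time.Time
}

type timeTable struct{ entries []entry }

// Witness records that raft index idx was observed at time t.
func (tt *timeTable) Witness(idx uint64, t time.Time) {
	tt.entries = append(tt.entries, entry{idx, t})
}

// NearestIndex returns the highest index witnessed at or before t.
func (tt *timeTable) NearestIndex(t time.Time) uint64 {
	var best uint64
	for _, e := range tt.entries {
		if !e.when.After(t) && e.index > best {
			best = e.index
		}
	}
	return best
}

func main() {
	const jobModifyIdx uint64 = 1000
	evalGCThreshold := time.Hour // stand-in for s1.config.EvalGCThreshold

	tt := &timeTable{}
	// As in the test: claim index 2*jobModifyIdx was seen a full GC
	// threshold ago, so it falls behind the cutoff.
	tt.Witness(2*jobModifyIdx, time.Now().UTC().Add(-1*evalGCThreshold))

	// The GC pass looks up the index nearest to "now minus the threshold";
	// everything at or below it is old enough to collect.
	cutoff := time.Now().UTC().Add(-1 * evalGCThreshold)
	fmt.Println(tt.NearestIndex(cutoff)) // 2000
}

Before the Witness call (the first GC attempt in the test above), no witnessed index is old enough, the threshold resolves to zero, and nothing is collected — which is what the first assertion verifies.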

// An EvalGC should reap allocations from jobs with a newer modify index and reap the eval itself
// if all allocs are reaped.
func TestCoreScheduler_EvalGC_Batch_OldVersionReapsEval(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)

// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)

var jobModifyIdx uint64 = 1000

// Insert a "dead" job
store := s1.fsm.State()
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Status = structs.JobStatusDead
err := store.UpsertJob(structs.MsgTypeTestSetup, jobModifyIdx, job)
if err != nil {
t.Fatalf("err: %v", err)
}

// Insert "complete" eval
eval := mock.Eval()
eval.Status = structs.EvalStatusComplete
eval.Type = structs.JobTypeBatch
eval.JobID = job.ID
err = store.UpsertEvals(structs.MsgTypeTestSetup, jobModifyIdx+1, []*structs.Evaluation{eval})
if err != nil {
t.Fatalf("err: %v", err)
}

// Insert an alloc with index older than job.ModifyIndex.
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.EvalID = eval.ID
alloc.DesiredStatus = structs.AllocDesiredStatusRun
alloc.ClientStatus = structs.AllocClientStatusLost

err = store.UpsertAllocs(structs.MsgTypeTestSetup, jobModifyIdx-1, []*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
}

// Insert a new eval
eval2 := mock.Eval()
eval2.Status = structs.EvalStatusComplete
eval2.Type = structs.JobTypeBatch
eval2.JobID = job.ID
err = store.UpsertEvals(structs.MsgTypeTestSetup, jobModifyIdx+1, []*structs.Evaluation{eval2})
if err != nil {
t.Fatalf("err: %v", err)
}

// Insert a newer, current-version alloc belonging to the above eval
alloc2 := mock.Alloc()
alloc2.Job = job
alloc2.JobID = job.ID
alloc2.EvalID = eval2.ID
alloc2.DesiredStatus = structs.AllocDesiredStatusRun
alloc2.ClientStatus = structs.AllocClientStatusLost

err = store.UpsertAllocs(structs.MsgTypeTestSetup, jobModifyIdx+1, []*structs.Allocation{alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}

// Create a core scheduler
snap, err := store.Snapshot()
@@ -469,45 +637,48 @@ func TestCoreScheduler_EvalGC_Batch_OldVersion(t *testing.T) {
}
core := NewCoreScheduler(s1, snap)

// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
// Attempt the GC while moving the time forward significantly.
tt := s1.fsm.TimeTable()
tt.Witness(2*jobModifyIdx, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))

gc := s1.coreJobEval(structs.CoreJobEvalGC, jobModifyIdx*2)
err = core.Process(gc)
if err != nil {
t.Fatalf("err: %v", err)
}

// Alloc1 and 2 should be there, and alloc3 should be gone
// The old alloc and the old eval are gone. The new eval and the new alloc are not.
ws := memdb.NewWatchSet()
out, err := store.EvalByID(ws, eval.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
if out != nil {
t.Fatalf("bad: %v", out)
}

outA, err := store.AllocByID(ws, alloc.ID)
out, err = store.EvalByID(ws, eval2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA == nil {
t.Fatalf("bad: %v", outA)
if out == nil {
t.Fatalf("bad: %v", out)
}

outA2, err := store.AllocByID(ws, alloc2.ID)
outA, err := store.AllocByID(ws, alloc.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA2 == nil {
t.Fatalf("bad: %v", outA2)
if outA != nil {
t.Fatalf("bad: %v", outA)
}

outA3, err := store.AllocByID(ws, alloc3.ID)
outA, err = store.AllocByID(ws, alloc2.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if outA3 != nil {
t.Fatalf("expected alloc to be nil:%v", outA2)
if outA == nil {
t.Fatalf("bad: %v", outA)
}

outB, err := store.JobByID(ws, job.Namespace, job.ID)