From 4943db4cc0c2942c55a882a84132ec0cca6ba15e Mon Sep 17 00:00:00 2001
From: stswidwinski
Date: Thu, 3 Nov 2022 22:20:58 +0800
Subject: [PATCH] Ensure we do not reference unsafe memory, add threshold
 checks for GC of allocations that belong to batch jobs. Add Eval GC to batch
 job GC. Add tests.

---
 nomad/core_sched.go      |  37 +++----
 nomad/core_sched_test.go | 215 +++++++++++++++++++++++++++++++++++----
 2 files changed, 213 insertions(+), 39 deletions(-)

diff --git a/nomad/core_sched.go b/nomad/core_sched.go
index 77c1701836e..b23c54f52b4 100644
--- a/nomad/core_sched.go
+++ b/nomad/core_sched.go
@@ -297,16 +297,18 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
 	}
 
 	// If the eval is from a running "batch" job we don't want to garbage
-	// collect its allocations. If there is a long running batch job and its
-	// terminal allocations get GC'd the scheduler would re-run the
-	// allocations.
+	// collect its most current allocations. If there is a long-running batch job
+	// and its terminal allocations get GC'd, the scheduler would re-run them.
+	// However, we do want to GC old evals and allocs if newer ones exist due to an update.
 	if eval.Type == structs.JobTypeBatch {
 		// Check if the job is running
 
-		// Can collect if:
-		// Job doesn't exist
-		// Job is Stopped and dead
-		// allowBatch and the job is dead
+		// Can collect if any of the following holds:
+		// - Job doesn't exist
+		// - Job is Stopped and dead
+		// - allowBatch and the job is dead
+		//
+		// If we cannot collect outright, check whether a partial GC may occur.
 		collect := false
 		if job == nil {
 			collect = true
@@ -318,12 +320,9 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
 			collect = true
 		}
 
-		// We don't want to gc anything related to a job which is not dead
-		// If the batch job doesn't exist we can GC it regardless of allowBatch
 		if !collect {
-			// Find allocs associated with older (based on createindex) and GC them if terminal
-			oldAllocs := olderVersionTerminalAllocs(allocs)
-			return false, oldAllocs, nil
+			oldAllocs, gcEval := olderVersionTerminalAllocs(allocs, job, thresholdIndex)
+			return gcEval, oldAllocs, nil
 		}
 	}
 
@@ -344,16 +343,20 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
 	return gcEval, gcAllocIDs, nil
 }
 
-// olderVersionTerminalAllocs returns terminal allocations whose job create index
-// is older than the job's create index
-func olderVersionTerminalAllocs(allocs []*structs.Allocation) []string {
+// olderVersionTerminalAllocs returns a tuple ([]string, bool). The first element is the list of
+// terminal allocations which may be garbage collected for batch jobs. The second element indicates
+// whether or not the evaluation itself may be garbage collected.
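+//
+// A sketch of the contract, using the call site in gcEval above and
+// hypothetical index values (job.JobModifyIndex = 1000, thresholdIndex = 900):
+//
+//	oldAllocs, gcEval := olderVersionTerminalAllocs(allocs, job, thresholdIndex)
+//	// An alloc with CreateIndex 800, ModifyIndex 850 and a terminal status is
+//	// returned in oldAllocs. Any alloc failing one of those three checks
+//	// forces gcEval to false, which keeps the evaluation alive.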
+func olderVersionTerminalAllocs(allocs []*structs.Allocation, job *structs.Job, thresholdIndex uint64) ([]string, bool) { var ret []string + var mayGCEval = true for _, alloc := range allocs { - if alloc.Job != nil && alloc.CreateIndex < alloc.Job.ModifyIndex && alloc.TerminalStatus() { + if alloc.CreateIndex < job.JobModifyIndex && alloc.ModifyIndex < thresholdIndex && alloc.TerminalStatus() { ret = append(ret, alloc.ID) + } else { + mayGCEval = false } } - return ret + return ret, mayGCEval } // evalReap contacts the leader and issues a reap on the passed evals and diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index a4f1c0a02a2..4d3cbc6cfb0 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -393,7 +393,7 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) { } } -// An EvalGC should reap allocations from jobs with an older modify index +// An EvalGC should reap allocations from jobs with a newer modify index func TestCoreScheduler_EvalGC_Batch_OldVersion(t *testing.T) { ci.Parallel(t) @@ -404,12 +404,14 @@ func TestCoreScheduler_EvalGC_Batch_OldVersion(t *testing.T) { // COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0 s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10) + var jobModifyIdx uint64 = 1000 + // Insert a "dead" job store := s1.fsm.State() job := mock.Job() job.Type = structs.JobTypeBatch job.Status = structs.JobStatusDead - err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, job) + err := store.UpsertJob(structs.MsgTypeTestSetup, jobModifyIdx, job) if err != nil { t.Fatalf("err: %v", err) } @@ -419,7 +421,7 @@ func TestCoreScheduler_EvalGC_Batch_OldVersion(t *testing.T) { eval.Status = structs.EvalStatusComplete eval.Type = structs.JobTypeBatch eval.JobID = job.ID - err = store.UpsertEvals(structs.MsgTypeTestSetup, 1001, []*structs.Evaluation{eval}) + err = store.UpsertEvals(structs.MsgTypeTestSetup, jobModifyIdx+1, []*structs.Evaluation{eval}) if err != nil { t.Fatalf("err: %v", err) } @@ -439,28 +441,194 @@ func TestCoreScheduler_EvalGC_Batch_OldVersion(t *testing.T) { alloc2.DesiredStatus = structs.AllocDesiredStatusRun alloc2.ClientStatus = structs.AllocClientStatusLost - err = store.UpsertAllocs(structs.MsgTypeTestSetup, 1002, []*structs.Allocation{alloc, alloc2}) + err = store.UpsertAllocs(structs.MsgTypeTestSetup, jobModifyIdx+2, []*structs.Allocation{alloc, alloc2}) if err != nil { t.Fatalf("err: %v", err) } - // Insert alloc with indexes older than job.ModifyIndex + // Insert allocs with indexes older than job.ModifyIndex. Two cases: + // 1. Terminal state + // 2. 
Non-terminal state
 	alloc3 := mock.Alloc()
-	alloc3.Job = job
 	alloc3.JobID = job.ID
 	alloc3.EvalID = eval.ID
 	alloc3.DesiredStatus = structs.AllocDesiredStatusRun
 	alloc3.ClientStatus = structs.AllocClientStatusLost
-	err = store.UpsertAllocs(structs.MsgTypeTestSetup, job.ModifyIndex - 1, []*structs.Allocation{alloc3})
+	alloc4 := mock.Alloc()
+	alloc4.Job = job
+	alloc4.JobID = job.ID
+	alloc4.EvalID = eval.ID
+	alloc4.DesiredStatus = structs.AllocDesiredStatusRun
+	alloc4.ClientStatus = structs.AllocClientStatusRunning
+
+	err = store.UpsertAllocs(structs.MsgTypeTestSetup, jobModifyIdx-1, []*structs.Allocation{alloc3, alloc4})
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
 
-	// Update the time tables to make this work
+	// A little helper for assertions
+	assertCorrectEvalAlloc := func(
+		ws memdb.WatchSet,
+		eval *structs.Evaluation,
+		allocsShouldExist []*structs.Allocation,
+		allocsShouldNotExist []*structs.Allocation,
+	) {
+		out, err := store.EvalByID(ws, eval.ID)
+		if err != nil {
+			t.Fatalf("err: %v", err)
+		}
+		if out == nil {
+			t.Fatalf("bad: %v", out)
+		}
+
+		for _, alloc := range allocsShouldExist {
+			outA, err := store.AllocByID(ws, alloc.ID)
+			if err != nil {
+				t.Fatalf("err: %v", err)
+			}
+			if outA == nil {
+				t.Fatalf("bad: %v", outA)
+			}
+		}
+
+		for _, alloc := range allocsShouldNotExist {
+			outA, err := store.AllocByID(ws, alloc.ID)
+			if err != nil {
+				t.Fatalf("err: %v", err)
+			}
+			if outA != nil {
+				t.Fatalf("expected alloc to be nil: %v", outA)
+			}
+		}
+	}
+
+	// Create a core scheduler
+	snap, err := store.Snapshot()
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	core := NewCoreScheduler(s1, snap)
+
+	// Attempt the GC without moving time forward significantly.
+	gc := s1.coreJobEval(structs.CoreJobEvalGC, jobModifyIdx)
+	err = core.Process(gc)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// All allocations should remain in place.
+	assertCorrectEvalAlloc(
+		memdb.NewWatchSet(),
+		eval,
+		[]*structs.Allocation{alloc, alloc2, alloc3, alloc4},
+		[]*structs.Allocation{},
+	)
+
+	// Attempt the GC while moving time forward significantly.
 	tt := s1.fsm.TimeTable()
-	tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
+	tt.Witness(2*jobModifyIdx, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
+
+	gc = s1.coreJobEval(structs.CoreJobEvalGC, jobModifyIdx*2)
+	err = core.Process(gc)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// The only allocations that remain are those which are "current" or which
+	// the batch job is still using. To spell it out:
+	//
+	// Alloc 1 and Alloc 2 should remain because they are not older than the
+	// job's modify index.
+	// Alloc 3 should be GC'd due to its age.
+	// Alloc 4 should remain due to its non-terminal state.
+	assertCorrectEvalAlloc(
+		memdb.NewWatchSet(),
+		eval,
+		[]*structs.Allocation{alloc, alloc2, alloc4},
+		[]*structs.Allocation{alloc3},
+	)
+
+	// The job should still exist.
+	ws := memdb.NewWatchSet()
+	outB, err := store.JobByID(ws, job.Namespace, job.ID)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if outB == nil {
+		t.Fatalf("bad: %v", outB)
+	}
+}
+
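+// A sketch of the index layout the tests above and below rely on, using the
+// jobModifyIdx = 1000 from the test setup:
+//
+//	jobModifyIdx - 1 // allocs predating the current job version (GC candidates)
+//	jobModifyIdx     // the "dead" batch job itself
+//	jobModifyIdx + 1 // evals and allocs of the current version (must survive)
+//	jobModifyIdx * 2 // index at which the GC is triggered, past the threshold
+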
+// An EvalGC should reap allocations from jobs with a newer modify index, and
+// should reap the eval itself once all of its allocations have been reaped.
+func TestCoreScheduler_EvalGC_Batch_OldVersionReapsEval(t *testing.T) {
+	ci.Parallel(t)
+
+	s1, cleanupS1 := TestServer(t, nil)
+	defer cleanupS1()
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// COMPAT Remove in 0.6: Reset the FSM time table since we reconcile which sets index 0
+	s1.fsm.timetable.table = make([]TimeTableEntry, 1, 10)
+
+	var jobModifyIdx uint64 = 1000
+
+	// Insert a "dead" job
+	store := s1.fsm.State()
+	job := mock.Job()
+	job.Type = structs.JobTypeBatch
+	job.Status = structs.JobStatusDead
+	err := store.UpsertJob(structs.MsgTypeTestSetup, jobModifyIdx, job)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Insert "complete" eval
+	eval := mock.Eval()
+	eval.Status = structs.EvalStatusComplete
+	eval.Type = structs.JobTypeBatch
+	eval.JobID = job.ID
+	err = store.UpsertEvals(structs.MsgTypeTestSetup, jobModifyIdx+1, []*structs.Evaluation{eval})
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Insert an alloc with an index older than job.ModifyIndex.
+	alloc := mock.Alloc()
+	alloc.Job = job
+	alloc.JobID = job.ID
+	alloc.EvalID = eval.ID
+	alloc.DesiredStatus = structs.AllocDesiredStatusRun
+	alloc.ClientStatus = structs.AllocClientStatusLost
+
+	err = store.UpsertAllocs(structs.MsgTypeTestSetup, jobModifyIdx-1, []*structs.Allocation{alloc})
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Insert a new eval
+	eval2 := mock.Eval()
+	eval2.Status = structs.EvalStatusComplete
+	eval2.Type = structs.JobTypeBatch
+	eval2.JobID = job.ID
+	err = store.UpsertEvals(structs.MsgTypeTestSetup, jobModifyIdx+1, []*structs.Evaluation{eval2})
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Insert a terminal alloc belonging to the new eval, with an index newer
+	// than job.ModifyIndex.
+	alloc2 := mock.Alloc()
+	alloc2.Job = job
+	alloc2.JobID = job.ID
+	alloc2.EvalID = eval2.ID
+	alloc2.DesiredStatus = structs.AllocDesiredStatusRun
+	alloc2.ClientStatus = structs.AllocClientStatusLost
+
+	err = store.UpsertAllocs(structs.MsgTypeTestSetup, jobModifyIdx+1, []*structs.Allocation{alloc2})
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
 
 	// Create a core scheduler
 	snap, err := store.Snapshot()
@@ -469,45 +637,48 @@ func TestCoreScheduler_EvalGC_Batch_OldVersion(t *testing.T) {
 	}
 	core := NewCoreScheduler(s1, snap)
 
-	// Attempt the GC
-	gc := s1.coreJobEval(structs.CoreJobEvalGC, 2000)
+	// Attempt the GC while moving time forward significantly.
+	tt := s1.fsm.TimeTable()
+	tt.Witness(2*jobModifyIdx, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
+
+	gc := s1.coreJobEval(structs.CoreJobEvalGC, jobModifyIdx*2)
 	err = core.Process(gc)
 	if err != nil {
 		t.Fatalf("err: %v", err)
 	}
 
-	// Alloc1 and 2 should be there, and alloc3 should be gone
+	// The old alloc and the old eval should be gone; the new eval and the new alloc should remain.
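+	// (A note on why, grounded in olderVersionTerminalAllocs above: eval's only
+	// alloc was older than the job's modify index, older than the threshold
+	// index, and terminal, so gcEval came back true and the eval was reaped.
+	// eval2 survives because alloc2's CreateIndex is not older than the job's
+	// modify index.)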
ws := memdb.NewWatchSet() out, err := store.EvalByID(ws, eval.ID) if err != nil { t.Fatalf("err: %v", err) } - if out == nil { + if out != nil { t.Fatalf("bad: %v", out) } - outA, err := store.AllocByID(ws, alloc.ID) + out, err = store.EvalByID(ws, eval2.ID) if err != nil { t.Fatalf("err: %v", err) } - if outA == nil { - t.Fatalf("bad: %v", outA) + if out == nil { + t.Fatalf("bad: %v", out) } - outA2, err := store.AllocByID(ws, alloc2.ID) + outA, err := store.AllocByID(ws, alloc.ID) if err != nil { t.Fatalf("err: %v", err) } - if outA2 == nil { - t.Fatalf("bad: %v", outA2) + if outA != nil { + t.Fatalf("bad: %v", outA) } - outA3, err := store.AllocByID(ws, alloc3.ID) + outA, err = store.AllocByID(ws, alloc2.ID) if err != nil { t.Fatalf("err: %v", err) } - if outA3 != nil { - t.Fatalf("expected alloc to be nil:%v", outA2) + if outA == nil { + t.Fatalf("bad: %v", outA) } outB, err := store.JobByID(ws, job.Namespace, job.ID)