Skip to content

Commit

Permalink
As described in #15090 let us introduce a new knob which allows us to…
Browse files Browse the repository at this point in the history
… control the pace of GC of Batch Evals independent of other evals.
  • Loading branch information
stswidwinski committed Dec 5, 2022
1 parent 4943db4 commit 823b8bc
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 12 deletions.
10 changes: 9 additions & 1 deletion command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,9 +467,14 @@ type ServerConfig struct {

// EvalGCThreshold controls how "old" an eval must be to be collected by GC.
// Age is not the only requirement for an eval to be GCed but the threshold
// can be used to filter by age.
// can be used to filter by age. Please note that batch job evaluations are
// controlled by 'BatchEvalGCThreshold' instead.
EvalGCThreshold string `hcl:"eval_gc_threshold"`

// BatchEvalGCThreshold controls how "old" an evaluation must be to be eligible
// for GC if the eval belongs to a batch job.
BatchEvalGCThreshold string `hcl:"batch_eval_gc_threshold"`

// DeploymentGCThreshold controls how "old" a deployment must be to be
// collected by GC. Age is not the only requirement for a deployment to be
// GCed but the threshold can be used to filter by age.
Expand Down Expand Up @@ -1827,6 +1832,9 @@ func (s *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
if b.EvalGCThreshold != "" {
result.EvalGCThreshold = b.EvalGCThreshold
}
if b.BatchEvalGCThreshold != "" {
result.BatchEvalGCThreshold = b.BatchEvalGCThreshold
}
if b.DeploymentGCThreshold != "" {
result.DeploymentGCThreshold = b.DeploymentGCThreshold
}
Expand Down
2 changes: 2 additions & 0 deletions command/agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ func TestConfig_Merge(t *testing.T) {
RaftMultiplier: pointer.Of(5),
NumSchedulers: pointer.Of(1),
NodeGCThreshold: "1h",
BatchEvalGCThreshold: "4h",
HeartbeatGrace: 30 * time.Second,
MinHeartbeatTTL: 30 * time.Second,
MaxHeartbeatsPerSecond: 30.0,
Expand Down Expand Up @@ -339,6 +340,7 @@ func TestConfig_Merge(t *testing.T) {
NumSchedulers: pointer.Of(2),
EnabledSchedulers: []string{structs.JobTypeBatch},
NodeGCThreshold: "12h",
BatchEvalGCThreshold: "4h",
HeartbeatGrace: 2 * time.Minute,
MinHeartbeatTTL: 2 * time.Minute,
MaxHeartbeatsPerSecond: 200.0,
Expand Down
8 changes: 8 additions & 0 deletions nomad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,15 @@ type Config struct {

// EvalGCThreshold is how "old" an evaluation must be to be eligible
// for GC. This gives users some time to debug a failed evaluation.
//
// Please note that the rules for GC of evaluations which belong to a batch
// job are separate and controlled by `BatchEvalGCThreshold`.
EvalGCThreshold time.Duration

// BatchEvalGCThreshold is how "old" an evaluation must be to be eligible
// for GC if the eval belongs to a batch job.
BatchEvalGCThreshold time.Duration

// JobGCInterval is how often we dispatch a job to GC jobs that are
// available for garbage collection.
JobGCInterval time.Duration
Expand Down Expand Up @@ -444,6 +451,7 @@ func DefaultConfig() *Config {
ReconcileInterval: 60 * time.Second,
EvalGCInterval: 5 * time.Minute,
EvalGCThreshold: 1 * time.Hour,
BatchEvalGCThreshold: 168 * time.Hour,
JobGCInterval: 5 * time.Minute,
JobGCThreshold: 4 * time.Hour,
NodeGCInterval: 5 * time.Minute,
Expand Down
14 changes: 9 additions & 5 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ OUTER:
allEvalsGC := true
var jobAlloc, jobEval []string
for _, eval := range evals {
gc, allocs, err := c.gcEval(eval, oldThreshold, true)
gc, allocs, err := c.gcEval(eval, oldThreshold, oldThreshold, true)
if err != nil {
continue OUTER
} else if gc {
Expand Down Expand Up @@ -238,6 +238,8 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {

oldThreshold := c.getThreshold(eval, "eval",
"eval_gc_threshold", c.srv.config.EvalGCThreshold)
batchOldThreshold := c.getThreshold(eval, "eval",
"batch_eval_gc_threshold", c.srv.config.BatchEvalGCThreshold)

// Collect the allocations and evaluations to GC
var gcAlloc, gcEval []string
Expand All @@ -246,7 +248,7 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {

// The Evaluation GC should not handle batch jobs since those need to be
// garbage collected in one shot
gc, allocs, err := c.gcEval(eval, oldThreshold, false)
gc, allocs, err := c.gcEval(eval, oldThreshold, batchOldThreshold, false)
if err != nil {
return err
}
Expand All @@ -272,7 +274,7 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
// allocs are not older than the threshold. If the eval should be garbage
// collected, the associated alloc ids that should also be removed are also
// returned
func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64, allowBatch bool) (
func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64, batchThresholdIndex uint64, allowBatch bool) (
bool, []string, error) {
// Ignore non-terminal and new evaluations
if !eval.TerminalStatus() || eval.ModifyIndex > thresholdIndex {
Expand All @@ -299,7 +301,9 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
// If the eval is from a running "batch" job we don't want to garbage
// collect its most current allocations. If there is a long running batch job and its
// terminal allocations get GC'd the scheduler would re-run the allocations. However,
// we do want to GC old Evals and Allocs if there are newer ones due to an update.
// we do want to GC old Evals and Allocs if there are newer ones due to an update. The age
// of the evaluation must also reach the configured threshold before it is GCed so that one may
// debug old evaluations and referenced allocations.
if eval.Type == structs.JobTypeBatch {
// Check if the job is running

Expand All @@ -321,7 +325,7 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
}

if !collect {
oldAllocs, gcEval := olderVersionTerminalAllocs(allocs, job, thresholdIndex)
oldAllocs, gcEval := olderVersionTerminalAllocs(allocs, job, batchThresholdIndex)
return gcEval, oldAllocs, nil
}
}
Expand Down
12 changes: 6 additions & 6 deletions nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func TestCoreScheduler_EvalGC_Batch(t *testing.T) {

// Update the time tables to make this work
tt := s1.fsm.TimeTable()
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
tt.Witness(2000, time.Now().UTC().Add(-1*s1.config.BatchEvalGCThreshold))

// Create a core scheduler
snap, err := store.Snapshot()
Expand Down Expand Up @@ -503,15 +503,14 @@ func TestCoreScheduler_EvalGC_Batch_OldVersion(t *testing.T) {
}
}
}

// Create a core scheduler
snap, err := store.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)

// Attempt the GC without moving the time significantly.
// Attempt the GC
gc := s1.coreJobEval(structs.CoreJobEvalGC, jobModifyIdx)
err = core.Process(gc)
if err != nil {
Expand All @@ -526,9 +525,10 @@ func TestCoreScheduler_EvalGC_Batch_OldVersion(t *testing.T) {
[]*structs.Allocation{},
)

// Attempt the GC while moving the time forward significantly.
// Attempt the GC while moving the time forward significantly so that our threshold is
// breached.
tt := s1.fsm.TimeTable()
tt.Witness(2*jobModifyIdx, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
tt.Witness(2*jobModifyIdx, time.Now().UTC().Add(-1*s1.config.BatchEvalGCThreshold))

gc = s1.coreJobEval(structs.CoreJobEvalGC, jobModifyIdx*2)
err = core.Process(gc)
Expand Down Expand Up @@ -639,7 +639,7 @@ func TestCoreScheduler_EvalGC_Batch_OldVersionReapsEval(t *testing.T) {

// Attempt the GC while moving the time forward significantly.
tt := s1.fsm.TimeTable()
tt.Witness(2*jobModifyIdx, time.Now().UTC().Add(-1*s1.config.EvalGCThreshold))
tt.Witness(2*jobModifyIdx, time.Now().UTC().Add(-1*s1.config.BatchEvalGCThreshold))

gc := s1.coreJobEval(structs.CoreJobEvalGC, jobModifyIdx*2)
err = core.Process(gc)
Expand Down

0 comments on commit 823b8bc

Please sign in to comment.