Skip to content

Commit

Permalink
GC: ensure no leakage of evaluations for batch jobs. (#15097) (#15988)
Browse files Browse the repository at this point in the history
Prior to 2409f72 the code compared the modification index of a job to
itself. Afterwards, the code compared the creation index of the job to
itself. In either case there should never be a case of re-parenting of allocs
causing the evaluation to trivially always result in false, which leads to
unreclaimable memory.

Prior to this change allocations and evaluations for batch jobs were never
garbage collected until the batch job was explicitly stopped. The new
`batch_eval_gc_threshold` server configuration controls how often they are
collected. The default threshold is `24h`.

Co-authored-by: stswidwinski <[email protected]>
  • Loading branch information
hc-github-team-nomad-core and stswidwinski authored Jan 31, 2023
1 parent aa73ece commit 691a904
Show file tree
Hide file tree
Showing 8 changed files with 381 additions and 340 deletions.
3 changes: 3 additions & 0 deletions .changelog/15097.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:breaking-change
core: Ensure no leakage of evaluations for batch jobs. Prior to this change allocations and evaluations for batch jobs were never garbage collected until the batch job was explicitly stopped. The new `batch_eval_gc_threshold` server configuration controls how often they are collected. The default threshold is `24h`.
```
7 changes: 7 additions & 0 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,13 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
}
conf.EvalGCThreshold = dur
}
if gcThreshold := agentConfig.Server.BatchEvalGCThreshold; gcThreshold != "" {
dur, err := time.ParseDuration(gcThreshold)
if err != nil {
return nil, err
}
conf.BatchEvalGCThreshold = dur
}
if gcThreshold := agentConfig.Server.DeploymentGCThreshold; gcThreshold != "" {
dur, err := time.ParseDuration(gcThreshold)
if err != nil {
Expand Down
10 changes: 9 additions & 1 deletion command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,9 +449,14 @@ type ServerConfig struct {

// EvalGCThreshold controls how "old" an eval must be to be collected by GC.
// Age is not the only requirement for a eval to be GCed but the threshold
// can be used to filter by age.
// can be used to filter by age. Please note that batch job evaluations are
// controlled by 'BatchEvalGCThreshold' instead.
EvalGCThreshold string `hcl:"eval_gc_threshold"`

// BatchEvalGCThreshold controls how "old" an evaluation must be to be eligible
// for GC if the eval belongs to a batch job.
BatchEvalGCThreshold string `hcl:"batch_eval_gc_threshold"`

// DeploymentGCThreshold controls how "old" a deployment must be to be
// collected by GC. Age is not the only requirement for a deployment to be
// GCed but the threshold can be used to filter by age.
Expand Down Expand Up @@ -1773,6 +1778,9 @@ func (s *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
if b.EvalGCThreshold != "" {
result.EvalGCThreshold = b.EvalGCThreshold
}
if b.BatchEvalGCThreshold != "" {
result.BatchEvalGCThreshold = b.BatchEvalGCThreshold
}
if b.DeploymentGCThreshold != "" {
result.DeploymentGCThreshold = b.DeploymentGCThreshold
}
Expand Down
2 changes: 2 additions & 0 deletions command/agent/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func TestConfig_Merge(t *testing.T) {
RaftMultiplier: pointer.Of(5),
NumSchedulers: pointer.Of(1),
NodeGCThreshold: "1h",
BatchEvalGCThreshold: "4h",
HeartbeatGrace: 30 * time.Second,
MinHeartbeatTTL: 30 * time.Second,
MaxHeartbeatsPerSecond: 30.0,
Expand Down Expand Up @@ -335,6 +336,7 @@ func TestConfig_Merge(t *testing.T) {
NumSchedulers: pointer.Of(2),
EnabledSchedulers: []string{structs.JobTypeBatch},
NodeGCThreshold: "12h",
BatchEvalGCThreshold: "4h",
HeartbeatGrace: 2 * time.Minute,
MinHeartbeatTTL: 2 * time.Minute,
MaxHeartbeatsPerSecond: 200.0,
Expand Down
8 changes: 8 additions & 0 deletions nomad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,15 @@ type Config struct {

// EvalGCThreshold is how "old" an evaluation must be to be eligible
// for GC. This gives users some time to debug a failed evaluation.
//
// Please note that the rules for GC of evaluations which belong to a batch
// job are separate and controlled by `BatchEvalGCThreshold`
EvalGCThreshold time.Duration

// BatchEvalGCThreshold is how "old" an evaluation must be to be eligible
// for GC if the eval belongs to a batch job.
BatchEvalGCThreshold time.Duration

// JobGCInterval is how often we dispatch a job to GC jobs that are
// available for garbage collection.
JobGCInterval time.Duration
Expand Down Expand Up @@ -417,6 +424,7 @@ func DefaultConfig() *Config {
ReconcileInterval: 60 * time.Second,
EvalGCInterval: 5 * time.Minute,
EvalGCThreshold: 1 * time.Hour,
BatchEvalGCThreshold: 24 * time.Hour,
JobGCInterval: 5 * time.Minute,
JobGCThreshold: 4 * time.Hour,
NodeGCInterval: 5 * time.Minute,
Expand Down
100 changes: 54 additions & 46 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,31 +229,22 @@ func (c *CoreScheduler) evalGC(eval *structs.Evaluation) error {
return err
}

var oldThreshold uint64
if eval.JobID == structs.CoreJobForceGC {
// The GC was forced, so set the threshold to its maximum so everything
// will GC.
oldThreshold = math.MaxUint64
c.logger.Debug("forced eval GC")
} else {
// Compute the old threshold limit for GC using the FSM
// time table. This is a rough mapping of a time to the
// Raft index it belongs to.
tt := c.srv.fsm.TimeTable()
cutoff := time.Now().UTC().Add(-1 * c.srv.config.EvalGCThreshold)
oldThreshold = tt.NearestIndex(cutoff)
c.logger.Debug("eval GC scanning before cutoff index",
"index", oldThreshold, "eval_gc_threshold", c.srv.config.EvalGCThreshold)
}
oldThreshold := c.getThreshold(eval, "eval",
"eval_gc_threshold", c.srv.config.EvalGCThreshold)
batchOldThreshold := c.getThreshold(eval, "eval",
"batch_eval_gc_threshold", c.srv.config.BatchEvalGCThreshold)

// Collect the allocations and evaluations to GC
var gcAlloc, gcEval []string
for raw := iter.Next(); raw != nil; raw = iter.Next() {
eval := raw.(*structs.Evaluation)

// The Evaluation GC should not handle batch jobs since those need to be
// garbage collected in one shot
gc, allocs, err := c.gcEval(eval, oldThreshold, false)
gcThreshold := oldThreshold
if eval.Type == structs.JobTypeBatch {
gcThreshold = batchOldThreshold
}

gc, allocs, err := c.gcEval(eval, gcThreshold, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -304,33 +295,26 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
}

// If the eval is from a running "batch" job we don't want to garbage
// collect its allocations. If there is a long running batch job and its
// terminal allocations get GC'd the scheduler would re-run the
// allocations.
// collect its most current allocations. If there is a long running batch job and its
// terminal allocations get GC'd the scheduler would re-run the allocations. However,
// we do want to GC old Evals and Allocs if there are newer ones due to update.
//
// The age of the evaluation must also reach the threshold configured to be GCed so that
// one may debug old evaluations and referenced allocations.
if eval.Type == structs.JobTypeBatch {
// Check if the job is running

// Can collect if:
// Job doesn't exist
// Job is Stopped and dead
// allowBatch and the job is dead
collect := false
if job == nil {
collect = true
} else if job.Status != structs.JobStatusDead {
collect = false
} else if job.Stop {
collect = true
} else if allowBatch {
collect = true
}

// We don't want to gc anything related to a job which is not dead
// If the batch job doesn't exist we can GC it regardless of allowBatch
// Can collect if either holds:
// - Job doesn't exist
// - Job is Stopped and dead
// - allowBatch and the job is dead
//
// If we cannot collect outright, check if a partial GC may occur
collect := job == nil || job.Status == structs.JobStatusDead && (job.Stop || allowBatch)
if !collect {
// Find allocs associated with older (based on createindex) and GC them if terminal
oldAllocs := olderVersionTerminalAllocs(allocs, job)
return false, oldAllocs, nil
oldAllocs := olderVersionTerminalAllocs(allocs, job, thresholdIndex)
gcEval := (len(oldAllocs) == len(allocs))
return gcEval, oldAllocs, nil
}
}

Expand All @@ -351,12 +335,12 @@ func (c *CoreScheduler) gcEval(eval *structs.Evaluation, thresholdIndex uint64,
return gcEval, gcAllocIDs, nil
}

// olderVersionTerminalAllocs returns terminal allocations whose job create index
// is older than the job's create index
func olderVersionTerminalAllocs(allocs []*structs.Allocation, job *structs.Job) []string {
// olderVersionTerminalAllocs returns a list of terminal allocations that belong to the evaluation and may be
// GCed.
func olderVersionTerminalAllocs(allocs []*structs.Allocation, job *structs.Job, thresholdIndex uint64) []string {
var ret []string
for _, alloc := range allocs {
if alloc.Job != nil && alloc.Job.CreateIndex < job.CreateIndex && alloc.TerminalStatus() {
if alloc.CreateIndex < job.JobModifyIndex && alloc.ModifyIndex < thresholdIndex && alloc.TerminalStatus() {
ret = append(ret, alloc.ID)
}
}
Expand Down Expand Up @@ -851,3 +835,27 @@ func (c *CoreScheduler) expiredOneTimeTokenGC(eval *structs.Evaluation) error {
}
return c.srv.RPC("ACL.ExpireOneTimeTokens", req, &structs.GenericResponse{})
}

// getThreshold returns the index threshold for determining whether an
// object is old enough to GC
func (c *CoreScheduler) getThreshold(eval *structs.Evaluation, objectName, configName string, configThreshold time.Duration) uint64 {
var oldThreshold uint64
if eval.JobID == structs.CoreJobForceGC {
// The GC was forced, so set the threshold to its maximum so
// everything will GC.
oldThreshold = math.MaxUint64
c.logger.Debug(fmt.Sprintf("forced %s GC", objectName))
} else {
// Compute the old threshold limit for GC using the FSM
// time table. This is a rough mapping of a time to the
// Raft index it belongs to.
tt := c.srv.fsm.TimeTable()
cutoff := time.Now().UTC().Add(-1 * configThreshold)
oldThreshold = tt.NearestIndex(cutoff)
c.logger.Debug(
fmt.Sprintf("%s GC scanning before cutoff index", objectName),
"index", oldThreshold,
configName, configThreshold)
}
return oldThreshold
}
Loading

0 comments on commit 691a904

Please sign in to comment.