From fe3fb7afadfd98dc5097f33dffdc8ddc5e6eb117 Mon Sep 17 00:00:00 2001
From: Seth Hoenig
Date: Fri, 9 Oct 2020 16:31:38 -0500
Subject: [PATCH] core: implement system batch scheduler

This PR implements a new "System Batch" scheduler type. Jobs can make use of
this new scheduler by setting their type to 'sysbatch'.

As the name implies, sysbatch can be thought of as a hybrid between system
and batch jobs - it runs short-lived jobs intended to execute on every
compatible node in the cluster.

As with batch jobs, sysbatch jobs can also be periodic and/or parameterized
dispatch jobs. A sysbatch job is considered complete when it has run on all
compatible nodes and every allocation has reached a terminal state (success,
or failure after exhausting restarts).

Feasibility, rolling updates, and preemption are governed the same way as for
system jobs.

Closes #2527
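As a sketch of how a job would opt in (not part of this change; the job,
group, task, and command names below are hypothetical), a periodic sysbatch
jobspec might look like:

    job "fstrim" {
      # Hypothetical example: runs a short-lived cleanup task once per day
      # on every compatible node, after which the allocations complete.
      datacenters = ["dc1"]
      type        = "sysbatch"

      periodic {
        cron             = "0 3 * * *"
        prohibit_overlap = true
      }

      group "fstrim" {
        task "fstrim" {
          driver = "raw_exec"

          config {
            command = "/usr/sbin/fstrim"
            args    = ["--all"]
          }
        }
      }
    }

Because sysbatch reuses the system scheduler's feasibility logic, the job is
placed once per eligible node; unlike a system job, a successful exit is
terminal and the task is not restarted.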
---
 .../taskrunner/restarts/restarts.go |  18 +++---
 nomad/core_sched.go                 |   5 +-
 nomad/state/schema.go               |  13 +++--
 nomad/state/state_store.go          |   2 +-
 nomad/structs/structs.go            |  27 +++++----
 scheduler/generic_sched.go          |   2 +-
 scheduler/rank.go                   |   4 +-
 scheduler/scheduler.go              |   7 ++-
 scheduler/stack.go                  |  13 +++--
 scheduler/system_sched.go           |  57 ++++++++++++-------
 10 files changed, 91 insertions(+), 57 deletions(-)

diff --git a/client/allocrunner/taskrunner/restarts/restarts.go b/client/allocrunner/taskrunner/restarts/restarts.go
index 6ee0056ccd8..429ee07a038 100644
--- a/client/allocrunner/taskrunner/restarts/restarts.go
+++ b/client/allocrunner/taskrunner/restarts/restarts.go
@@ -14,15 +14,19 @@ const (
     // jitter is the percent of jitter added to restart delays.
     jitter = 0.25

-    ReasonNoRestartsAllowed   = "Policy allows no restarts"
-    ReasonUnrecoverableErrror = "Error was unrecoverable"
-    ReasonWithinPolicy        = "Restart within policy"
-    ReasonDelay               = "Exceeded allowed attempts, applying a delay"
+    ReasonNoRestartsAllowed  = "Policy allows no restarts"
+    ReasonUnrecoverableError = "Error was unrecoverable"
+    ReasonWithinPolicy       = "Restart within policy"
+    ReasonDelay              = "Exceeded allowed attempts, applying a delay"
 )

 func NewRestartTracker(policy *structs.RestartPolicy, jobType string, tlc *structs.TaskLifecycleConfig) *RestartTracker {
-    // Batch jobs should not restart if they exit successfully
-    onSuccess := jobType != structs.JobTypeBatch
+    onSuccess := true
+
+    // Batch & SysBatch jobs should not restart if they exit successfully
+    if jobType == structs.JobTypeBatch || jobType == structs.JobTypeSysBatch {
+        onSuccess = false
+    }

     // Prestart sidecars should get restarted on success
     if tlc != nil && tlc.Hook == structs.TaskLifecycleHookPrestart {
@@ -196,7 +200,7 @@ func (r *RestartTracker) GetState() (string, time.Duration) {
     if r.startErr != nil {
         // If the error is not recoverable, do not restart.
         if !structs.IsRecoverable(r.startErr) {
-            r.reason = ReasonUnrecoverableErrror
+            r.reason = ReasonUnrecoverableError
             return structs.TaskNotRestarting, 0
         }
     } else if r.exitRes != nil {
diff --git a/nomad/core_sched.go b/nomad/core_sched.go
index 1ac135d0aae..eb796f66bca 100644
--- a/nomad/core_sched.go
+++ b/nomad/core_sched.go
@@ -136,9 +136,7 @@ OUTER:
         gc, allocs, err := c.gcEval(eval, oldThreshold, true)
         if err != nil {
             continue OUTER
-        }
-
-        if gc {
+        } else if gc {
             jobEval = append(jobEval, eval.ID)
             jobAlloc = append(jobAlloc, allocs...)
         } else {
@@ -160,6 +158,7 @@ OUTER:
     if len(gcEval) == 0 && len(gcAlloc) == 0 && len(gcJob) == 0 {
         return nil
     }
+
     c.logger.Debug("job GC found eligible objects",
         "jobs", len(gcJob), "evals", len(gcEval), "allocs", len(gcAlloc))
diff --git a/nomad/state/schema.go b/nomad/state/schema.go
index 3d1308859ae..041a1ec3bef 100644
--- a/nomad/state/schema.go
+++ b/nomad/state/schema.go
@@ -265,13 +265,16 @@ func jobIsGCable(obj interface{}) (bool, error) {
         return true, nil
     }

-    // Otherwise, only batch jobs are eligible because they complete on their
-    // own without a user stopping them.
-    if j.Type != structs.JobTypeBatch {
+    switch j.Type {
+    // Otherwise, batch and sysbatch jobs are eligible because they complete on
+    // their own without a user stopping them.
+    case structs.JobTypeBatch, structs.JobTypeSysBatch:
+        return true, nil
+
+    default:
+        // other job types may not be GC until stopped
         return false, nil
     }
-
-    return true, nil
 }

 // jobIsPeriodic satisfies the ConditionalIndexFunc interface and creates an index
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index d5b2f1d3ffa..ed4e94efd88 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -1935,7 +1935,7 @@ func (s *StateStore) JobsByScheduler(ws memdb.WatchSet, schedulerType string) (m
     return iter, nil
 }

-// JobsByGC returns an iterator over all jobs eligible or uneligible for garbage
+// JobsByGC returns an iterator over all jobs eligible or ineligible for garbage
 // collection.
 func (s *StateStore) JobsByGC(ws memdb.WatchSet, gc bool) (memdb.ResultIterator, error) {
     txn := s.db.ReadTxn()
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 44f9b1f9c41..932c40b4d08 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -3744,10 +3744,11 @@ func (c *ComparableResources) NetIndex(n *NetworkResource) int {
 const (
     // JobTypeNomad is reserved for internal system tasks and is
     // always handled by the CoreScheduler.
-    JobTypeCore    = "_core"
-    JobTypeService = "service"
-    JobTypeBatch   = "batch"
-    JobTypeSystem  = "system"
+    JobTypeCore     = "_core"
+    JobTypeService  = "service"
+    JobTypeBatch    = "batch"
+    JobTypeSystem   = "system"
+    JobTypeSysBatch = "sysbatch"
 )

 const (
@@ -4010,7 +4011,7 @@ func (j *Job) Validate() error {
         mErr.Errors = append(mErr.Errors, errors.New("Job must be in a namespace"))
     }
     switch j.Type {
-    case JobTypeCore, JobTypeService, JobTypeBatch, JobTypeSystem:
+    case JobTypeCore, JobTypeService, JobTypeBatch, JobTypeSystem, JobTypeSysBatch:
     case "":
         mErr.Errors = append(mErr.Errors, errors.New("Missing job type"))
     default:
@@ -4102,11 +4103,12 @@ func (j *Job) Validate() error {
         }
     }

-    // Validate periodic is only used with batch jobs.
+    // Validate periodic is only used with batch or sysbatch jobs.
     if j.IsPeriodic() && j.Periodic.Enabled {
-        if j.Type != JobTypeBatch {
-            mErr.Errors = append(mErr.Errors,
-                fmt.Errorf("Periodic can only be used with %q scheduler", JobTypeBatch))
+        if j.Type != JobTypeBatch && j.Type != JobTypeSysBatch {
+            mErr.Errors = append(mErr.Errors, fmt.Errorf(
+                "Periodic can only be used with %q or %q scheduler", JobTypeBatch, JobTypeSysBatch,
+            ))
         }

         if err := j.Periodic.Validate(); err != nil {
@@ -4115,9 +4117,10 @@ func (j *Job) Validate() error {
     }

     if j.IsParameterized() {
-        if j.Type != JobTypeBatch {
-            mErr.Errors = append(mErr.Errors,
-                fmt.Errorf("Parameterized job can only be used with %q scheduler", JobTypeBatch))
+        if j.Type != JobTypeBatch && j.Type != JobTypeSysBatch {
+            mErr.Errors = append(mErr.Errors, fmt.Errorf(
+                "Parameterized job can only be used with %q or %q scheduler", JobTypeBatch, JobTypeSysBatch,
+            ))
         }

         if err := j.ParameterizedJob.Validate(); err != nil {
diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go
index c67eafad870..b933deb1eb2 100644
--- a/scheduler/generic_sched.go
+++ b/scheduler/generic_sched.go
@@ -36,7 +36,7 @@ const (
     // allocInPlace is the status used when speculating on an in-place update
     allocInPlace = "alloc updating in-place"

-    // allocNodeTainted is the status used when stopping an alloc because it's
+    // allocNodeTainted is the status used when stopping an alloc because its
     // node is tainted.
     allocNodeTainted = "alloc not needed as node is tainted"

diff --git a/scheduler/rank.go b/scheduler/rank.go
index 1653d9cf906..ec4b2635d42 100644
--- a/scheduler/rank.go
+++ b/scheduler/rank.go
@@ -24,7 +24,7 @@ type RankedNode struct {
     TaskLifecycles map[string]*structs.TaskLifecycleConfig
     AllocResources *structs.AllocatedSharedResources

-    // Allocs is used to cache the proposed allocations on the
+    // Proposed is used to cache the proposed allocations on the
     // node. This can be shared between iterators that require it.
     Proposed []*structs.Allocation

@@ -60,7 +60,7 @@ func (r *RankedNode) SetTaskResources(task *structs.Task,
     r.TaskLifecycles[task.Name] = task.Lifecycle
 }

-// RankFeasibleIterator is used to iteratively yield nodes along
+// RankIterator is used to iteratively yield nodes along
 // with ranking metadata. The iterators may manage some state for
 // performance optimizations.
 type RankIterator interface {
diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go
index a950690db44..d1bbfa4c3e4 100644
--- a/scheduler/scheduler.go
+++ b/scheduler/scheduler.go
@@ -21,9 +21,10 @@ const (
 // BuiltinSchedulers contains the built in registered schedulers
 // which are available
 var BuiltinSchedulers = map[string]Factory{
-    "service": NewServiceScheduler,
-    "batch":   NewBatchScheduler,
-    "system":  NewSystemScheduler,
+    "service":  NewServiceScheduler,
+    "batch":    NewBatchScheduler,
+    "system":   NewSystemScheduler,
+    "sysbatch": NewSysBatchScheduler,
 }

 // NewScheduler is used to instantiate and return a new scheduler
diff --git a/scheduler/stack.go b/scheduler/stack.go
index bccabc7899a..96ce92713bf 100644
--- a/scheduler/stack.go
+++ b/scheduler/stack.go
@@ -237,10 +237,13 @@ func NewSystemStack(ctx Context) *SystemStack {
     // previously been marked as eligible or ineligible. Generally this will be
     // checks that only needs to examine the single node to determine feasibility.
     jobs := []FeasibilityChecker{s.jobConstraint}
-    tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint,
+    tgs := []FeasibilityChecker{
+        s.taskGroupDrivers,
+        s.taskGroupConstraint,
         s.taskGroupHostVolumes,
         s.taskGroupDevices,
-        s.taskGroupNetwork}
+        s.taskGroupNetwork,
+    }
     avail := []FeasibilityChecker{s.taskGroupCSIVolumes}
     s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs, avail)
@@ -360,11 +363,13 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack {
     // previously been marked as eligible or ineligible. Generally this will be
     // checks that only needs to examine the single node to determine feasibility.
     jobs := []FeasibilityChecker{s.jobConstraint}
-    tgs := []FeasibilityChecker{s.taskGroupDrivers,
+    tgs := []FeasibilityChecker{
+        s.taskGroupDrivers,
         s.taskGroupConstraint,
         s.taskGroupHostVolumes,
         s.taskGroupDevices,
-        s.taskGroupNetwork}
+        s.taskGroupNetwork,
+    }
     avail := []FeasibilityChecker{s.taskGroupCSIVolumes}
     s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs, avail)
diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go
index 4b1e5c8cbfa..70f5b6e3d1d 100644
--- a/scheduler/system_sched.go
+++ b/scheduler/system_sched.go
@@ -14,15 +14,21 @@ const (
     // we will attempt to schedule if we continue to hit conflicts for system
     // jobs.
     maxSystemScheduleAttempts = 5
+
+    // maxSysBatchScheduleAttempts is used to limit the number of times we will
+    // attempt to schedule if we continue to hit conflicts for sysbatch jobs.
+    maxSysBatchScheduleAttempts = 2
 )

-// SystemScheduler is used for 'system' jobs. This scheduler is
-// designed for services that should be run on every client.
-// One for each job, containing an allocation for each node
+// SystemScheduler is used for 'system' and 'sysbatch' jobs. This scheduler is
+// designed for jobs that should be run on every client. The 'system' mode
+// will ensure those jobs continuously run regardless of successful task exits,
+// whereas 'sysbatch' marks the task complete on success.
 type SystemScheduler struct {
-    logger  log.Logger
-    state   State
-    planner Planner
+    logger   log.Logger
+    state    State
+    planner  Planner
+    sysbatch bool

     eval       *structs.Evaluation
     job        *structs.Job
@@ -30,8 +36,9 @@ type SystemScheduler struct {
     planResult *structs.PlanResult
     ctx        *EvalContext
     stack      *SystemStack
-    nodes      []*structs.Node
-    nodesByDC  map[string]int
+
+    nodes     []*structs.Node
+    nodesByDC map[string]int

     limitReached bool
     nextEval     *structs.Evaluation
@@ -44,9 +51,19 @@ type SystemScheduler struct {
 // scheduler.
 func NewSystemScheduler(logger log.Logger, state State, planner Planner) Scheduler {
     return &SystemScheduler{
-        logger:  logger.Named("system_sched"),
-        state:   state,
-        planner: planner,
+        logger:   logger.Named("system_sched"),
+        state:    state,
+        planner:  planner,
+        sysbatch: false,
+    }
+}
+
+func NewSysBatchScheduler(logger log.Logger, state State, planner Planner) Scheduler {
+    return &SystemScheduler{
+        logger:   logger.Named("sysbatch_sched"),
+        state:    state,
+        planner:  planner,
+        sysbatch: true,
     }
 }

@@ -71,9 +88,14 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error {
             s.queuedAllocs, "")
     }

+    limit := maxSystemScheduleAttempts
+    if s.sysbatch {
+        limit = maxSysBatchScheduleAttempts
+    }
+
     // Retry up to the maxSystemScheduleAttempts and reset if progress is made.
     progress := func() bool { return progressMade(s.planResult) }
-    if err := retryMax(maxSystemScheduleAttempts, s.process, progress); err != nil {
+    if err := retryMax(limit, s.process, progress); err != nil {
         if statusErr, ok := err.(*SetStatusError); ok {
             return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, s.failedTGAllocs, statusErr.EvalStatus, err.Error(),
                 s.queuedAllocs, "")
@@ -94,8 +116,7 @@ func (s *SystemScheduler) process() (bool, error) {
     ws := memdb.NewWatchSet()
     s.job, err = s.state.JobByID(ws, s.eval.Namespace, s.eval.JobID)
     if err != nil {
-        return false, fmt.Errorf("failed to get job '%s': %v",
-            s.eval.JobID, err)
+        return false, fmt.Errorf("failed to get job '%s': %v", s.eval.JobID, err)
     }
     numTaskGroups := 0
     if !s.job.Stopped() {
@@ -185,19 +206,17 @@ func (s *SystemScheduler) computeJobAllocs() error {
     ws := memdb.NewWatchSet()
     allocs, err := s.state.AllocsByJob(ws, s.eval.Namespace, s.eval.JobID, true)
     if err != nil {
-        return fmt.Errorf("failed to get allocs for job '%s': %v",
-            s.eval.JobID, err)
+        return fmt.Errorf("failed to get allocs for job '%s': %v", s.eval.JobID, err)
     }

     // Determine the tainted nodes containing job allocs
     tainted, err := taintedNodes(s.state, allocs)
     if err != nil {
-        return fmt.Errorf("failed to get tainted nodes for job '%s': %v",
-            s.eval.JobID, err)
+        return fmt.Errorf("failed to get tainted nodes for job '%s': %v", s.eval.JobID, err)
     }

     // Update the allocations which are in pending/running state on tainted
-    // nodes to lost
+    // nodes to lost.
     updateNonTerminalAllocsToLost(s.plan, tainted, allocs)

     // Filter out the allocations in a terminal state