Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable Multiple Preemptions within Cohort in a single Scheduling Cycle #2641

Merged
merged 2 commits into from
Jul 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions pkg/cache/clusterqueue_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,19 @@ func (c *ClusterQueueSnapshot) RGByResource(resource corev1.ResourceName) *Resou
return nil
}

// AddUsage adds the given flavor/resource quantities to this snapshot's
// tracked usage. It delegates to addOrRemoveUsage with a positive sign
// (+1) so the quantities are accumulated rather than released.
func (c *ClusterQueueSnapshot) AddUsage(frq resources.FlavorResourceQuantitiesFlat) {
	c.addOrRemoveUsage(frq, 1)
}

// Fits reports whether every requested flavor/resource quantity can be
// satisfied by the capacity currently available in this snapshot. It
// returns false as soon as any single flavor/resource request exceeds
// what Available reports; an empty request trivially fits.
func (c *ClusterQueueSnapshot) Fits(frq resources.FlavorResourceQuantitiesFlat) bool {
	for flavorResource, requested := range frq {
		if requested > c.Available(flavorResource) {
			return false
		}
	}
	return true
}

// QuotaFor returns the quota configured for the given flavor/resource
// pair, or nil when no quota is defined for that pair in this snapshot.
func (c *ClusterQueueSnapshot) QuotaFor(fr resources.FlavorResource) *ResourceQuota {
	return c.Quotas[fr]
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/features/kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,13 @@ const (
//
// Enable the usage of batch.Job spec.managedBy field its MultiKueue integration.
MultiKueueBatchJobWithManagedBy featuregate.Feature = "MultiKueueBatchJobWithManagedBy"

// owner: @gabesaba
// alpha: v0.8
//
// Enable more than one workload sharing flavors to preempt within a Cohort,
// as long as the preemption targets don't overlap.
MultiplePreemptions featuregate.Feature = "MultiplePreemptions"
)

func init() {
Expand All @@ -112,6 +119,7 @@ var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
MultiKueue: {Default: false, PreRelease: featuregate.Alpha},
LendingLimit: {Default: false, PreRelease: featuregate.Alpha},
MultiKueueBatchJobWithManagedBy: {Default: false, PreRelease: featuregate.Alpha},
MultiplePreemptions: {Default: false, PreRelease: featuregate.Alpha},
}

func SetFeatureGateDuringTest(tb testing.TB, f featuregate.Feature, value bool) func() {
Expand Down
85 changes: 72 additions & 13 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package scheduler
import (
"context"
"fmt"
"maps"
"sort"
"strings"
"time"
Expand Down Expand Up @@ -172,6 +173,16 @@ func (cu cohortsUsage) hasCommonFlavorResources(cohort string, assignment resour
return false
}

// setSkipped marks the entry as skipped for this scheduling cycle and
// records the human-readable reason in its inadmissible message.
// LastAssignment is cleared so the next attempt retries all flavors:
// a skip can mean the workload no longer Fits after another admission,
// or that its preemption targets overlapped with an earlier workload,
// and in either case the previous flavor assignment is stale.
func setSkipped(e *entry, inadmissibleMsg string) {
	e.LastAssignment = nil
	e.inadmissibleMsg = inadmissibleMsg
	e.status = skipped
}

func (s *Scheduler) schedule(ctx context.Context) wait.SpeedSignal {
s.attemptCount++
log := ctrl.LoggerFrom(ctx).WithValues("attemptCount", s.attemptCount)
Expand Down Expand Up @@ -207,6 +218,7 @@ func (s *Scheduler) schedule(ctx context.Context) wait.SpeedSignal {
// of other clusterQueues.
cycleCohortsUsage := cohortsUsage{}
cycleCohortsSkipPreemption := sets.New[string]()
preemptedWorkloads := sets.New[string]()
for i := range entries {
e := &entries[i]
mode := e.assignment.RepresentativeMode()
Expand All @@ -215,28 +227,55 @@ func (s *Scheduler) schedule(ctx context.Context) wait.SpeedSignal {
}

cq := snapshot.ClusterQueues[e.ClusterQueue]
if cq.Cohort != nil {
log := log.WithValues("workload", klog.KObj(e.Obj), "clusterQueue", klog.KRef("", e.ClusterQueue))
ctx := ctrl.LoggerInto(ctx, log)

if features.Enabled(features.MultiplePreemptions) {
if mode == flavorassigner.Preempt && len(e.preemptionTargets) == 0 {
log.V(2).Info("Workload requires preemption, but there are no candidate workloads allowed for preemption", "preemption", cq.Preemption)
// we use resourcesToReserve to block capacity up to either the nominal capacity,
// or the borrowing limit when borrowing, so that a lower priority workload cannot
// admit before us.
cq.AddUsage(resourcesToReserve(e, cq))
continue
}

// We skip multiple-preemptions per cohort if any of the targets are overlapping
pendingPreemptions := make([]string, 0, len(e.preemptionTargets))
for _, target := range e.preemptionTargets {
pendingPreemptions = append(pendingPreemptions, workload.Key(target.WorkloadInfo.Obj))
}
if preemptedWorkloads.HasAny(pendingPreemptions...) {
setSkipped(e, "Workload has overlapping preemption targets with another workload")
continue
}

usage := e.netUsage()
if !cq.Fits(usage) {
setSkipped(e, "Workload no longer fits after processing another workload")
continue
}
preemptedWorkloads.Insert(pendingPreemptions...)
cq.AddUsage(usage)
} else if cq.Cohort != nil {
sum := cycleCohortsUsage.totalUsageForCommonFlavorResources(cq.Cohort.Name, e.assignment.Usage)
// Check whether there was an assignment in this cycle that could render the next assignments invalid:
// - If the workload no longer fits in the cohort.
// - If there was another assignment in the cohort, then the preemption calculation is no longer valid.
if cycleCohortsUsage.hasCommonFlavorResources(cq.Cohort.Name, e.assignment.Usage) &&
((mode == flavorassigner.Fit && !cq.FitInCohort(sum)) ||
(mode == flavorassigner.Preempt && cycleCohortsSkipPreemption.Has(cq.Cohort.Name))) {
e.status = skipped
e.inadmissibleMsg = "other workloads in the cohort were prioritized"
// When the workload needs borrowing and there is another workload in cohort doesn't
// need borrowing, the workload needborrowing will come again. In this case we should
// not skip the previous flavors.
e.LastAssignment = nil
continue
if cycleCohortsUsage.hasCommonFlavorResources(cq.Cohort.Name, e.assignment.Usage) {
if mode == flavorassigner.Fit && !cq.FitInCohort(sum) {
setSkipped(e, "Workload no longer fits after processing another workload")
continue
}
if mode == flavorassigner.Preempt && cycleCohortsSkipPreemption.Has(cq.Cohort.Name) {
setSkipped(e, "Workload skipped because its premption calculations were invalidated by another workload")
continue
}
}
// Even if the workload will not be admitted after this point, due to preemption pending or other failures,
// we should still account for its usage.
cycleCohortsUsage.add(cq.Cohort.Name, resourcesToReserve(e, cq))
}
log := log.WithValues("workload", klog.KObj(e.Obj), "clusterQueue", klog.KRef("", e.ClusterQueue))
ctx := ctrl.LoggerInto(ctx, log)
if e.assignment.RepresentativeMode() != flavorassigner.Fit {
if len(e.preemptionTargets) != 0 {
// If preemptions are issued, the next attempt should try all the flavors.
Expand Down Expand Up @@ -322,6 +361,26 @@ type entry struct {
preemptionTargets []*preemption.Target
}

// netUsage returns how much capacity this entry will require from the ClusterQueue/Cohort.
// When a workload is preempting, it subtracts the preempted resources from the resources
// required, as the remaining quota is all we need from the CQ/Cohort.
// netUsage returns how much capacity this entry will require from the
// ClusterQueue/Cohort. When the workload fits outright, that is simply
// the assignment's usage. When it is preempting, the resources freed by
// each preemption target are subtracted (clamped at zero per resource),
// since only the remaining quota must come from the CQ/Cohort.
func (e *entry) netUsage() resources.FlavorResourceQuantitiesFlat {
	if e.assignment.RepresentativeMode() == flavorassigner.Fit {
		return e.assignment.Usage
	}

	// Clone so the assignment's own usage map is left untouched.
	usage := maps.Clone(e.assignment.Usage)
	// Idiomatic value range: the original ranged over indices but named
	// the index variable "target", then re-indexed the slice.
	for _, target := range e.preemptionTargets {
		for fr, v := range target.WorkloadInfo.FlavorResourceUsage() {
			if _, uses := usage[fr]; !uses {
				// Freed resources this entry does not request are irrelevant.
				continue
			}
			usage[fr] = max(0, usage[fr]-v)
		}
	}
	return usage
}

// nominate returns the workloads with their requirements (resource flavors, borrowing) if
// they were admitted by the clusterQueues in the snapshot.
func (s *Scheduler) nominate(ctx context.Context, workloads []workload.Info, snap cache.Snapshot) []entry {
Expand Down
Loading