diff --git a/config/armada/config.yaml b/config/armada/config.yaml index cb9b8e56681..32875689747 100644 --- a/config/armada/config.yaml +++ b/config/armada/config.yaml @@ -67,6 +67,10 @@ scheduling: maximumResourceFractionToSchedule: memory: 1.0 cpu: 1.0 + maximumSchedulingRate: 100.0 + maximumSchedulingBurst: 1000 + maximumPerQueueSchedulingRate: 50.0 + maximumPerQueueSchedulingBurst: 1000 maxJobSchedulingContextsPerExecutor: 10000 lease: expireAfter: 15m diff --git a/config/scheduler/config.yaml b/config/scheduler/config.yaml index 61a7daa0adc..dab4f2780a6 100644 --- a/config/scheduler/config.yaml +++ b/config/scheduler/config.yaml @@ -97,7 +97,10 @@ scheduling: maximumResourceFractionToSchedule: memory: 1.0 cpu: 1.0 - maximumJobsToSchedule: 5000 + maximumSchedulingRate: 100.0 + maximumSchedulingBurst: 1000 + maximumPerQueueSchedulingRate: 50.0 + maximumPerQueueSchedulingBurst: 1000 maxUnacknowledgedJobsPerExecutor: 2500 maxJobSchedulingContextsPerExecutor: 10000 defaultJobLimits: diff --git a/go.mod b/go.mod index 5346d5c3ed2..e0d6c851eda 100644 --- a/go.mod +++ b/go.mod @@ -93,6 +93,7 @@ require ( github.com/prometheus/common v0.37.0 github.com/sanity-io/litter v1.5.5 github.com/segmentio/fasthash v1.0.3 + golang.org/x/time v0.3.0 google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9 ) @@ -195,7 +196,6 @@ require ( golang.org/x/sys v0.7.0 // indirect golang.org/x/term v0.7.0 // indirect golang.org/x/text v0.9.0 // indirect - golang.org/x/time v0.3.0 // indirect google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect google.golang.org/protobuf v1.30.0 // indirect diff --git a/internal/armada/configuration/types.go b/internal/armada/configuration/types.go index 77fc34a548c..c1b518d09a0 100644 --- a/internal/armada/configuration/types.go +++ b/internal/armada/configuration/types.go @@ -124,10 +124,33 @@ type SchedulingConfig struct { MaximumResourceFractionToSchedule map[string]float64 // Overrides MaximalClusterFractionToSchedule if set for the current pool. MaximumResourceFractionToScheduleByPool map[string]map[string]float64 - // Max number of jobs to schedule in each invocation of the scheduler. - MaximumJobsToSchedule uint - // Max number of gangs to schedule in each invocation of the scheduler. - MaximumGangsToSchedule uint + // The rate at which Armada schedules jobs is rate-limited using a token bucket approach. + // Specifically, there is a token bucket that persists between scheduling rounds. + // The bucket fills up at a rate of MaximumSchedulingRate tokens per second and has capacity MaximumSchedulingBurst. + // A token is removed from the bucket when a scheduling a job and scheduling stops while the bucket is empty. + // + // Hence, MaximumSchedulingRate controls the maximum number of jobs scheduled per second in steady-state, + // i.e., once the burst capacity has been exhausted. + // + // Rate-limiting is based on the number of tokens available at the start of each scheduling round, + // i.e., tokens accumulated while scheduling become available at the start of the next scheduling round. + // + // For more information about the rate-limiter, see: + // https://pkg.go.dev/golang.org/x/time/rate#Limiter + MaximumSchedulingRate float64 `validate:"gt=0"` + // MaximumSchedulingBurst controls the burst capacity of the rate-limiter. + // + // There are two important implications: + // - Armada will never schedule more than MaximumSchedulingBurst jobs per scheduling round. 
+ // - Gang jobs with cardinality greater than MaximumSchedulingBurst can never be scheduled. + MaximumSchedulingBurst int `validate:"gt=0"` + // In addition to the global rate-limiter, there is a separate rate-limiter for each queue. + // These work the same as the global rate-limiter, except they apply only to jobs scheduled from a specific queue. + // + // Per-queue version of MaximumSchedulingRate. + MaximumPerQueueSchedulingRate float64 `validate:"gt=0"` + // Per-queue version of MaximumSchedulingBurst. + MaximumPerQueueSchedulingBurst int `validate:"gt=0"` // Armada stores contexts associated with recent job scheduling attempts. // This setting limits the number of such contexts to store. // Contexts associated with the most recent scheduling attempt for each queue and cluster are always stored. diff --git a/internal/armada/server/lease.go b/internal/armada/server/lease.go index d2f8a063c5c..7d1d7c2abec 100644 --- a/internal/armada/server/lease.go +++ b/internal/armada/server/lease.go @@ -18,6 +18,7 @@ import ( log "github.com/sirupsen/logrus" "golang.org/x/exp/maps" "golang.org/x/sync/errgroup" + "golang.org/x/time/rate" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "k8s.io/utils/clock" @@ -59,6 +60,10 @@ type AggregatedQueueServer struct { schedulingInfoRepository repository.SchedulingInfoRepository decompressorPool *pool.ObjectPool clock clock.Clock + // Global job scheduling rate-limiter. + limiter *rate.Limiter + // Per-queue job scheduling rate-limiters. + limiterByQueue map[string]*rate.Limiter // For storing reports of scheduling attempts. SchedulingContextRepository *scheduler.SchedulingContextRepository // Stores the most recent NodeDb for each executor. @@ -92,18 +97,22 @@ func NewAggregatedQueueServer( TimeBetweenEvictionRuns: 0, NumTestsPerEvictionRun: 10, } - decompressorPool := pool.NewObjectPool(context.Background(), pool.NewPooledObjectFactorySimple( func(context.Context) (interface{}, error) { return compress.NewZlibDecompressor(), nil }), &poolConfig) return &AggregatedQueueServer{ - permissions: permissions, - schedulingConfig: schedulingConfig, - jobRepository: jobRepository, - queueRepository: queueRepository, - usageRepository: usageRepository, - eventStore: eventStore, + permissions: permissions, + schedulingConfig: schedulingConfig, + jobRepository: jobRepository, + queueRepository: queueRepository, + usageRepository: usageRepository, + eventStore: eventStore, + limiter: rate.NewLimiter( + rate.Limit(schedulingConfig.MaximumSchedulingRate), + schedulingConfig.MaximumSchedulingBurst, + ), + limiterByQueue: make(map[string]*rate.Limiter), schedulingInfoRepository: schedulingInfoRepository, decompressorPool: decompressorPool, executorRepository: executorRepository, @@ -491,6 +500,7 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL q.schedulingConfig.Preemption.PriorityClasses, q.schedulingConfig.Preemption.DefaultPriorityClass, fairnessCostProvider, + q.limiter, totalResources, ) for queue, priorityFactor := range priorityFactorByQueue { @@ -502,7 +512,16 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL if priorityFactor > 0 { weight = 1 / priorityFactor } - if err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClassForPool[queue]); err != nil { + queueLimiter, ok := q.limiterByQueue[queue] + if !ok { + // Create per-queue limiters lazily. 
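+				// They are kept in limiterByQueue so that each queue's token bucket persists across scheduling rounds.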
+ queueLimiter = rate.NewLimiter( + rate.Limit(q.schedulingConfig.MaximumPerQueueSchedulingRate), + q.schedulingConfig.MaximumPerQueueSchedulingBurst, + ) + q.limiterByQueue[queue] = queueLimiter + } + if err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClassForPool[queue], queueLimiter); err != nil { return nil, err } } diff --git a/internal/scheduler/constraints/constraints.go b/internal/scheduler/constraints/constraints.go index 477b121b2e4..b859d71bca1 100644 --- a/internal/scheduler/constraints/constraints.go +++ b/internal/scheduler/constraints/constraints.go @@ -1,6 +1,7 @@ package constraints import ( + "fmt" "math" "github.com/pkg/errors" @@ -12,32 +13,46 @@ import ( ) const ( - UnschedulableReasonMaximumResourcesScheduled = "maximum resources scheduled" - UnschedulableReasonMaximumNumberOfJobsScheduled = "maximum number of jobs scheduled" - UnschedulableReasonMaximumNumberOfGangsScheduled = "maximum number of gangs scheduled" - UnschedulableReasonMaximumResourcesPerQueueExceeded = "maximum total resources for this queue exceeded" + // Indicates that the limit on resources scheduled per round has been exceeded. + MaximumResourcesScheduledUnschedulableReason = "maximum resources scheduled" + + // Indicates that a queue has been assigned more than its allowed amount of resources. + MaximumResourcesPerQueueExceededUnschedulableReason = "maximum total resources for this queue exceeded" + + // Indicates that the scheduling rate limit has been exceeded. + GlobalRateLimitExceededUnschedulableReason = "global scheduling rate limit exceeded" + QueueRateLimitExceededUnschedulableReason = "queue scheduling rate limit exceeded" + + // Indicates that scheduling a gang would exceed the rate limit. + GlobalRateLimitExceededByGangUnschedulableReason = "gang would exceed global scheduling rate limit" + QueueRateLimitExceededByGangUnschedulableReason = "gang would exceed queue scheduling rate limit" + + // Indicates that the number of jobs in a gang exceeds the burst size. + // This means the gang can not be scheduled without first increasing the burst size. + GangExceedsGlobalBurstSizeUnschedulableReason = "gang cardinality too large: exceeds global max burst size" + GangExceedsQueueBurstSizeUnschedulableReason = "gang cardinality too large: exceeds queue max burst size" ) -// IsTerminalUnschedulableReason returns true if reason indicates it's not possible to schedule any more jobs in this round. +// IsTerminalUnschedulableReason returns true if reason indicates +// it's not possible to schedule any more jobs in this round. func IsTerminalUnschedulableReason(reason string) bool { - if reason == UnschedulableReasonMaximumResourcesScheduled { + if reason == MaximumResourcesScheduledUnschedulableReason { return true } - if reason == UnschedulableReasonMaximumNumberOfJobsScheduled { - return true - } - if reason == UnschedulableReasonMaximumNumberOfGangsScheduled { + if reason == GlobalRateLimitExceededUnschedulableReason { return true } return false } +// IsTerminalQueueUnschedulableReason returns true if reason indicates +// it's not possible to schedule any more jobs from this queue in this round. +func IsTerminalQueueUnschedulableReason(reason string) bool { + return reason == QueueRateLimitExceededUnschedulableReason +} + // SchedulingConstraints contains scheduling constraints, e.g., per-queue resource limits. type SchedulingConstraints struct { - // Max number of jobs to scheduler per lease jobs call. 
- MaximumJobsToSchedule uint - // Max number of jobs to scheduler per lease jobs call. - MaximumGangsToSchedule uint // Max number of jobs to consider for a queue before giving up. MaxQueueLookback uint // Jobs leased to this executor must be at least this large. @@ -82,8 +97,6 @@ func SchedulingConstraintsFromSchedulingConfig( maximumResourceFractionToSchedule = m } return SchedulingConstraints{ - MaximumJobsToSchedule: config.MaximumJobsToSchedule, - MaximumGangsToSchedule: config.MaximumGangsToSchedule, MaxQueueLookback: config.MaxQueueLookback, MinimumJobSize: minimumJobSize, MaximumResourcesToSchedule: absoluteFromRelativeLimits(totalResources, maximumResourceFractionToSchedule), @@ -99,47 +112,75 @@ func absoluteFromRelativeLimits(totalResources schedulerobjects.ResourceList, re return absoluteLimits } -func (constraints *SchedulingConstraints) CheckRoundConstraints(sctx *schedulercontext.SchedulingContext) (bool, string, error) { - // MaximumJobsToSchedule check. - if constraints.MaximumJobsToSchedule != 0 && sctx.NumScheduledJobs == int(constraints.MaximumJobsToSchedule) { - return false, UnschedulableReasonMaximumNumberOfJobsScheduled, nil - } - - // MaximumGangsToSchedule check. - if constraints.MaximumGangsToSchedule != 0 && sctx.NumScheduledGangs == int(constraints.MaximumGangsToSchedule) { - return false, UnschedulableReasonMaximumNumberOfGangsScheduled, nil - } +// ScaleQuantity scales q in-place by a factor f. +// This functions overflows for quantities the milli value of which can't be expressed as an int64. +// E.g., 1Pi is ok, but not 10Pi. +func ScaleQuantity(q resource.Quantity, f float64) resource.Quantity { + q.SetMilli(int64(math.Round(float64(q.MilliValue()) * f))) + return q +} +func (constraints *SchedulingConstraints) CheckRoundConstraints(sctx *schedulercontext.SchedulingContext, queue string) (bool, string, error) { // MaximumResourcesToSchedule check. if !sctx.ScheduledResources.IsStrictlyLessOrEqual(constraints.MaximumResourcesToSchedule) { - return false, UnschedulableReasonMaximumResourcesScheduled, nil + return false, MaximumResourcesScheduledUnschedulableReason, nil } return true, "", nil } -func (constraints *SchedulingConstraints) CheckPerQueueAndPriorityClassConstraints( +func (constraints *SchedulingConstraints) CheckConstraints( sctx *schedulercontext.SchedulingContext, - queue string, - priorityClassName string, + gctx *schedulercontext.GangSchedulingContext, ) (bool, string, error) { - qctx := sctx.QueueSchedulingContexts[queue] + qctx := sctx.QueueSchedulingContexts[gctx.Queue] if qctx == nil { - return false, "", errors.Errorf("no QueueSchedulingContext for queue %s", queue) + return false, "", errors.Errorf("no QueueSchedulingContext for queue %s", gctx.Queue) + } + + // Check that the job is large enough for this executor. + if ok, unschedulableReason := RequestsAreLargeEnough(gctx.TotalResourceRequests, constraints.MinimumJobSize); !ok { + return false, unschedulableReason, nil + } + + // Global rate limiter check. + tokens := sctx.Limiter.TokensAt(sctx.Started) + if tokens <= 0 { + return false, GlobalRateLimitExceededUnschedulableReason, nil + } + if sctx.Limiter.Burst() < gctx.Cardinality() { + return false, GangExceedsGlobalBurstSizeUnschedulableReason, nil + } + if tokens < float64(gctx.Cardinality()) { + return false, GlobalRateLimitExceededByGangUnschedulableReason, nil + } + + // Per-queue rate limiter check. 
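+	// Mirrors the global check above, using the limiter attached to this queue's QueueSchedulingContext.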
+ tokens = qctx.Limiter.TokensAt(sctx.Started) + if tokens <= 0 { + return false, QueueRateLimitExceededUnschedulableReason, nil + } + if qctx.Limiter.Burst() < gctx.Cardinality() { + return false, GangExceedsQueueBurstSizeUnschedulableReason, nil + } + if tokens < float64(gctx.Cardinality()) { + return false, QueueRateLimitExceededByGangUnschedulableReason, nil } // PriorityClassSchedulingConstraintsByPriorityClassName check. - if priorityClassConstraint, ok := constraints.PriorityClassSchedulingConstraintsByPriorityClassName[priorityClassName]; ok { - if !qctx.AllocatedByPriorityClass[priorityClassName].IsStrictlyLessOrEqual(priorityClassConstraint.MaximumResourcesPerQueue) { - return false, UnschedulableReasonMaximumResourcesPerQueueExceeded, nil + if priorityClassConstraint, ok := constraints.PriorityClassSchedulingConstraintsByPriorityClassName[gctx.PriorityClassName]; ok { + if !qctx.AllocatedByPriorityClass[gctx.PriorityClassName].IsStrictlyLessOrEqual(priorityClassConstraint.MaximumResourcesPerQueue) { + return false, MaximumResourcesPerQueueExceededUnschedulableReason, nil } } return true, "", nil } -// ScaleQuantity scales q in-place by a factor f. -// This functions overflows for quantities the milli value of which can't be expressed as an int64. -// E.g., 1Pi is ok, but not 10Pi. -func ScaleQuantity(q resource.Quantity, f float64) resource.Quantity { - q.SetMilli(int64(math.Round(float64(q.MilliValue()) * f))) - return q +func RequestsAreLargeEnough(totalResourceRequests, minRequest schedulerobjects.ResourceList) (bool, string) { + for t, minQuantity := range minRequest.Resources { + q := totalResourceRequests.Get(t) + if minQuantity.Cmp(q) == 1 { + return false, fmt.Sprintf("job requests %s %s, but the minimum is %s", q.String(), t, minQuantity.String()) + } + } + return true, "" } diff --git a/internal/scheduler/constraints/constraints_test.go b/internal/scheduler/constraints/constraints_test.go index e387bf60ea7..081058191dc 100644 --- a/internal/scheduler/constraints/constraints_test.go +++ b/internal/scheduler/constraints/constraints_test.go @@ -21,12 +21,12 @@ func TestConstraints(t *testing.T) { }{} // TODO: Add tests. for name, tc := range tests { t.Run(name, func(t *testing.T) { - ok, unschedulableReason, err := tc.constraints.CheckRoundConstraints(tc.sctx) + ok, unschedulableReason, err := tc.constraints.CheckRoundConstraints(tc.sctx, tc.queue) require.NoError(t, err) require.Equal(t, tc.globalUnschedulableReason == "", ok) require.Equal(t, tc.globalUnschedulableReason, unschedulableReason) - ok, unschedulableReason, err = tc.constraints.CheckPerQueueAndPriorityClassConstraints(tc.sctx, tc.queue, tc.priorityClassName) + ok, unschedulableReason, err = tc.constraints.CheckConstraints(tc.sctx, nil) require.NoError(t, err) require.Equal(t, tc.perQueueAndPriorityClassUnschedulableReason == "", ok) require.Equal(t, tc.perQueueAndPriorityClassUnschedulableReason, unschedulableReason) diff --git a/internal/scheduler/context/context.go b/internal/scheduler/context/context.go index 30f2c87e6ec..29c12bbc374 100644 --- a/internal/scheduler/context/context.go +++ b/internal/scheduler/context/context.go @@ -10,6 +10,7 @@ import ( "github.com/pkg/errors" "golang.org/x/exp/maps" "golang.org/x/exp/slices" + "golang.org/x/time/rate" "github.com/armadaproject/armada/internal/armada/configuration" "github.com/armadaproject/armada/internal/common/armadaerrors" @@ -38,6 +39,9 @@ type SchedulingContext struct { DefaultPriorityClass string // Determines how fairness is computed. 
FairnessCostProvider fairness.FairnessCostProvider + // Limits job scheduling rate globally across all queues. + // Use the "Started" time to ensure limiter state remains constant within each scheduling round. + Limiter *rate.Limiter // Sum of queue weights across all queues. WeightSum float64 // Per-queue scheduling contexts. @@ -73,6 +77,7 @@ func NewSchedulingContext( priorityClasses map[string]types.PriorityClass, defaultPriorityClass string, fairnessCostProvider fairness.FairnessCostProvider, + limiter *rate.Limiter, totalResources schedulerobjects.ResourceList, ) *SchedulingContext { return &SchedulingContext{ @@ -82,6 +87,7 @@ func NewSchedulingContext( PriorityClasses: priorityClasses, DefaultPriorityClass: defaultPriorityClass, FairnessCostProvider: fairnessCostProvider, + Limiter: limiter, QueueSchedulingContexts: make(map[string]*QueueSchedulingContext), TotalResources: totalResources.DeepCopy(), ScheduledResources: schedulerobjects.NewResourceListWithDefaultSize(), @@ -110,7 +116,11 @@ func (sctx *SchedulingContext) ClearUnfeasibleSchedulingKeys() { sctx.UnfeasibleSchedulingKeys = make(map[schedulerobjects.SchedulingKey]*JobSchedulingContext) } -func (sctx *SchedulingContext) AddQueueSchedulingContext(queue string, weight float64, initialAllocatedByPriorityClass schedulerobjects.QuantityByTAndResourceType[string]) error { +func (sctx *SchedulingContext) AddQueueSchedulingContext( + queue string, weight float64, + initialAllocatedByPriorityClass schedulerobjects.QuantityByTAndResourceType[string], + limiter *rate.Limiter, +) error { if _, ok := sctx.QueueSchedulingContexts[queue]; ok { return errors.WithStack(&armadaerrors.ErrInvalidArgument{ Name: "queue", @@ -134,6 +144,7 @@ func (sctx *SchedulingContext) AddQueueSchedulingContext(queue string, weight fl ExecutorId: sctx.ExecutorId, Queue: queue, Weight: weight, + Limiter: limiter, Allocated: allocated, AllocatedByPriorityClass: initialAllocatedByPriorityClass, ScheduledResourcesByPriorityClass: make(schedulerobjects.QuantityByTAndResourceType[string]), @@ -335,6 +346,9 @@ type QueueSchedulingContext struct { Queue string // Determines the fair share of this queue relative to other queues. Weight float64 + // Limits job scheduling rate for this queue. + // Use the "Started" time to ensure limiter state remains constant within each scheduling round. + Limiter *rate.Limiter // Total resources assigned to the queue across all clusters by priority class priority. // Includes jobs scheduled during this invocation of the scheduler. Allocated schedulerobjects.ResourceList @@ -560,6 +574,11 @@ func NewGangSchedulingContext(jctxs []*JobSchedulingContext) *GangSchedulingCont } } +// Cardinality returns the number of jobs in the gang. 
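+// It is compared against rate-limiter tokens and burst sizes when checking scheduling constraints.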
+func (gctx *GangSchedulingContext) Cardinality() int { + return len(gctx.JobSchedulingContexts) +} + func isEvictedJob(job interfaces.LegacySchedulerJob) bool { return job.GetAnnotations()[schedulerconfig.IsEvictedAnnotation] == "true" } diff --git a/internal/scheduler/context/context_test.go b/internal/scheduler/context/context_test.go index 3b932f30540..67fdabd483f 100644 --- a/internal/scheduler/context/context_test.go +++ b/internal/scheduler/context/context_test.go @@ -43,6 +43,7 @@ func TestSchedulingContextAccounting(t *testing.T) { testfixtures.TestPriorityClasses, testfixtures.TestDefaultPriorityClass, fairnessCostProvider, + nil, totalResources, ) priorityFactorByQueue := map[string]float64{"A": 1, "B": 1} @@ -52,7 +53,7 @@ func TestSchedulingContextAccounting(t *testing.T) { }, } for _, queue := range []string{"A", "B"} { - err := sctx.AddQueueSchedulingContext(queue, priorityFactorByQueue[queue], allocatedByQueueAndPriorityClass[queue]) + err := sctx.AddQueueSchedulingContext(queue, priorityFactorByQueue[queue], allocatedByQueueAndPriorityClass[queue], nil) require.NoError(t, err) } diff --git a/internal/scheduler/gang_scheduler.go b/internal/scheduler/gang_scheduler.go index f1a39e31ead..ffca7be9f8e 100644 --- a/internal/scheduler/gang_scheduler.go +++ b/internal/scheduler/gang_scheduler.go @@ -11,7 +11,6 @@ import ( schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/interfaces" "github.com/armadaproject/armada/internal/scheduler/nodedb" - "github.com/armadaproject/armada/internal/scheduler/schedulerobjects" ) // GangScheduler schedules one gang at a time. GangScheduler is not aware of queues. @@ -40,9 +39,12 @@ func (sch *GangScheduler) SkipUnsuccessfulSchedulingKeyCheck() { } func (sch *GangScheduler) Schedule(ctx context.Context, gctx *schedulercontext.GangSchedulingContext) (ok bool, unschedulableReason string, err error) { - // Exit immediately if this is a new gang and we've hit any round limits. + // Exit immediately if this is a new gang and we've exceeded any round limits. + // + // Because this check occurs before adding the gctx to the sctx, + // the round limits can be exceeded by one gang. if !gctx.AllJobsEvicted { - if ok, unschedulableReason, err = sch.constraints.CheckRoundConstraints(sch.schedulingContext); err != nil || !ok { + if ok, unschedulableReason, err = sch.constraints.CheckRoundConstraints(sch.schedulingContext, gctx.Queue); err != nil || !ok { return } } @@ -55,6 +57,16 @@ func (sch *GangScheduler) Schedule(ctx context.Context, gctx *schedulercontext.G if err != nil { return } + + // Update rate-limiters to account for new successfully scheduled jobs. + if ok && !gctx.AllJobsEvicted { + sch.schedulingContext.Limiter.ReserveN(sch.schedulingContext.Started, gctx.Cardinality()) + if qctx := sch.schedulingContext.QueueSchedulingContexts[gctx.Queue]; qctx != nil { + qctx.Limiter.ReserveN(sch.schedulingContext.Started, gctx.Cardinality()) + } + } + + // Process unschedulable jobs. if !ok { // Register the job as unschedulable. If the job was added to the context, remove it first. if gangAddedToSchedulingContext { @@ -74,7 +86,7 @@ func (sch *GangScheduler) Schedule(ctx context.Context, gctx *schedulercontext.G // // Only record unfeasible scheduling keys for single-job gangs. // Since a gang may be unschedulable even if all its members are individually schedulable. 
- if !sch.skipUnsuccessfulSchedulingKeyCheck && len(gctx.JobSchedulingContexts) == 1 { + if !sch.skipUnsuccessfulSchedulingKeyCheck && gctx.Cardinality() == 1 { jctx := gctx.JobSchedulingContexts[0] schedulingKey := sch.schedulingContext.SchedulingKeyFromLegacySchedulerJob(jctx.Job) if _, ok := sch.schedulingContext.UnfeasibleSchedulingKeys[schedulingKey]; !ok { @@ -84,24 +96,13 @@ func (sch *GangScheduler) Schedule(ctx context.Context, gctx *schedulercontext.G } } }() - - // Try scheduling the gang. if _, err = sch.schedulingContext.AddGangSchedulingContext(gctx); err != nil { return } gangAddedToSchedulingContext = true if !gctx.AllJobsEvicted { - // Check that the job is large enough for this executor. - // This check needs to be here, since it relates to a specific job. - // Only perform limit checks for new jobs to avoid preempting jobs if, e.g., MinimumJobSize changes. - if ok, unschedulableReason = requestsAreLargeEnough(gctx.TotalResourceRequests, sch.constraints.MinimumJobSize); !ok { - return - } - if ok, unschedulableReason, err = sch.constraints.CheckPerQueueAndPriorityClassConstraints( - sch.schedulingContext, - gctx.Queue, - gctx.PriorityClassName, - ); err != nil || !ok { + // Only perform these checks for new jobs to avoid preempting jobs if, e.g., MinimumJobSize changes. + if ok, unschedulableReason, err = sch.constraints.CheckConstraints(sch.schedulingContext, gctx); err != nil || !ok { return } } @@ -195,7 +196,7 @@ func (sch *GangScheduler) tryScheduleGangWithTxn(ctx context.Context, txn *memdb jctx.PodSchedulingContext.NodeId = "" } } - if len(gctx.JobSchedulingContexts) > 1 { + if gctx.Cardinality() > 1 { unschedulableReason = "at least one job in the gang does not fit on any node" } else { unschedulableReason = "job does not fit on any node" @@ -222,18 +223,5 @@ func meanScheduledAtPriorityFromGctx(gctx *schedulercontext.GangSchedulingContex } sum += jctx.PodSchedulingContext.ScheduledAtPriority } - return float64(sum) / float64(len(gctx.JobSchedulingContexts)), true -} - -func requestsAreLargeEnough(totalResourceRequests, minRequest schedulerobjects.ResourceList) (bool, string) { - if len(minRequest.Resources) == 0 { - return true, "" - } - for t, minQuantity := range minRequest.Resources { - q := totalResourceRequests.Get(t) - if minQuantity.Cmp(q) == 1 { - return false, fmt.Sprintf("job requests %s %s, but the minimum is %s", q.String(), t, minQuantity.String()) - } - } - return true, "" + return float64(sum) / float64(gctx.Cardinality()), true } diff --git a/internal/scheduler/gang_scheduler_test.go b/internal/scheduler/gang_scheduler_test.go index bab895a0421..e5fbafad703 100644 --- a/internal/scheduler/gang_scheduler_test.go +++ b/internal/scheduler/gang_scheduler_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/time/rate" "k8s.io/apimachinery/pkg/api/resource" "github.com/armadaproject/armada/internal/armada/configuration" @@ -340,10 +341,22 @@ func TestGangScheduler(t *testing.T) { tc.SchedulingConfig.Preemption.PriorityClasses, tc.SchedulingConfig.Preemption.DefaultPriorityClass, fairnessCostProvider, + rate.NewLimiter( + rate.Limit(tc.SchedulingConfig.MaximumSchedulingRate), + tc.SchedulingConfig.MaximumSchedulingBurst, + ), tc.TotalResources, ) for queue, priorityFactor := range priorityFactorByQueue { - err := sctx.AddQueueSchedulingContext(queue, priorityFactor, nil) + err := sctx.AddQueueSchedulingContext( + queue, + priorityFactor, + nil, + rate.NewLimiter( + 
rate.Limit(tc.SchedulingConfig.MaximumPerQueueSchedulingRate), + tc.SchedulingConfig.MaximumPerQueueSchedulingBurst, + ), + ) require.NoError(t, err) } constraints := schedulerconstraints.SchedulingConstraintsFromSchedulingConfig( diff --git a/internal/scheduler/pool_assigner.go b/internal/scheduler/pool_assigner.go index 66e2ff999f0..94aa07e4908 100644 --- a/internal/scheduler/pool_assigner.go +++ b/internal/scheduler/pool_assigner.go @@ -11,6 +11,7 @@ import ( "github.com/armadaproject/armada/internal/armada/configuration" "github.com/armadaproject/armada/internal/common/types" + "github.com/armadaproject/armada/internal/scheduler/constraints" schedulercontext "github.com/armadaproject/armada/internal/scheduler/context" "github.com/armadaproject/armada/internal/scheduler/database" "github.com/armadaproject/armada/internal/scheduler/jobdb" @@ -123,11 +124,12 @@ func (p *DefaultPoolAssigner) AssignPool(j *jobdb.Job) (string, error) { req := j.PodRequirements() req = p.clearAnnotations(req) - // Otherwise iterate through each pool and detect the first one the job is potentially schedulable on + // Otherwise iterate through each pool and detect the first one the job is potentially schedulable on. + // TODO: We should use the real scheduler instead since this check may go out of sync with the scheduler. for pool, executors := range p.executorsByPool { for _, e := range executors { requests := req.GetResourceRequirements().Requests - if ok, _ := requestsAreLargeEnough(schedulerobjects.ResourceListFromV1ResourceList(requests), e.minimumJobSize); !ok { + if ok, _ := constraints.RequestsAreLargeEnough(schedulerobjects.ResourceListFromV1ResourceList(requests), e.minimumJobSize); !ok { continue } nodeDb := e.nodeDb diff --git a/internal/scheduler/preempting_queue_scheduler_test.go b/internal/scheduler/preempting_queue_scheduler_test.go index 923d77296bc..dc84cd225ae 100644 --- a/internal/scheduler/preempting_queue_scheduler_test.go +++ b/internal/scheduler/preempting_queue_scheduler_test.go @@ -5,6 +5,7 @@ import ( "fmt" "math/rand" "testing" + "time" "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus" "github.com/sirupsen/logrus" @@ -12,6 +13,7 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/exp/maps" "golang.org/x/exp/slices" + "golang.org/x/time/rate" "k8s.io/apimachinery/pkg/api/resource" "github.com/armadaproject/armada/internal/armada/configuration" @@ -625,8 +627,8 @@ func TestPreemptingQueueScheduler(t *testing.T) { "B": 1, }, }, - "rescheduled jobs don't count towards maxJobsToSchedule": { - SchedulingConfig: testfixtures.WithMaxJobsToScheduleConfig(5, testfixtures.TestSchedulingConfig()), + "rescheduled jobs don't count towards global scheduling rate limit": { + SchedulingConfig: testfixtures.WithGlobalSchedulingRateLimiterConfig(2, 5, testfixtures.TestSchedulingConfig()), Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), Rounds: []SchedulingRound{ { @@ -642,7 +644,7 @@ func TestPreemptingQueueScheduler(t *testing.T) { "A": testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 10), }, ExpectedScheduledIndices: map[string][]int{ - "A": testfixtures.IntRange(0, 4), + "A": testfixtures.IntRange(0, 1), }, }, }, @@ -650,6 +652,42 @@ func TestPreemptingQueueScheduler(t *testing.T) { "A": 1, }, }, + "MaximumSchedulingRate": { + SchedulingConfig: testfixtures.WithGlobalSchedulingRateLimiterConfig(2, 4, testfixtures.TestSchedulingConfig()), + Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Rounds: []SchedulingRound{ + { + 
JobsByQueue: map[string][]*jobdb.Job{ + "A": testfixtures.WithGangAnnotationsJobs(testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 6)), + }, + }, + { + JobsByQueue: map[string][]*jobdb.Job{ + "A": testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 10), + }, + ExpectedScheduledIndices: map[string][]int{ + "A": testfixtures.IntRange(0, 3), + }, + }, + { + JobsByQueue: map[string][]*jobdb.Job{ + "A": testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 10), + }, + ExpectedScheduledIndices: map[string][]int{ + "A": testfixtures.IntRange(0, 1), + }, + }, + { + JobsByQueue: map[string][]*jobdb.Job{ + "A": testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 10), + }, + ExpectedScheduledIndices: map[string][]int{ + "A": testfixtures.IntRange(0, 1), + }, + }, + }, + PriorityFactorByQueue: map[string]float64{"A": 1}, + }, "rescheduled jobs don't count towards maxQueueLookback": { SchedulingConfig: testfixtures.WithMaxLookbackPerQueueConfig(5, testfixtures.TestSchedulingConfig()), Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), @@ -1300,6 +1338,22 @@ func TestPreemptingQueueScheduler(t *testing.T) { var jobIdsByGangId map[string]map[string]bool var gangIdByJobId map[string]string + // Scheduling rate-limiters persist between rounds. + // We control the rate at which time passes between scheduling rounds. + schedulingStarted := time.Now() + schedulingInterval := time.Second + limiter := rate.NewLimiter( + rate.Limit(tc.SchedulingConfig.MaximumSchedulingRate), + tc.SchedulingConfig.MaximumSchedulingBurst, + ) + limiterByQueue := make(map[string]*rate.Limiter) + for queue := range tc.PriorityFactorByQueue { + limiterByQueue[queue] = rate.NewLimiter( + rate.Limit(tc.SchedulingConfig.MaximumPerQueueSchedulingRate), + tc.SchedulingConfig.MaximumPerQueueSchedulingBurst, + ) + } + // Run the scheduler. 
log := logrus.NewEntry(logrus.New()) for i, round := range tc.Rounds { @@ -1367,12 +1421,21 @@ func TestPreemptingQueueScheduler(t *testing.T) { tc.SchedulingConfig.Preemption.PriorityClasses, tc.SchedulingConfig.Preemption.DefaultPriorityClass, fairnessCostProvider, + limiter, tc.TotalResources, ) + sctx.Started = schedulingStarted.Add(time.Duration(i) * schedulingInterval) + for queue, priorityFactor := range tc.PriorityFactorByQueue { weight := 1 / priorityFactor - err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClass[queue]) + err := sctx.AddQueueSchedulingContext( + queue, + weight, + allocatedByQueueAndPriorityClass[queue], + limiterByQueue[queue], + ) require.NoError(t, err) + } constraints := schedulerconstraints.SchedulingConstraintsFromSchedulingConfig( "pool", @@ -1645,11 +1708,12 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { tc.SchedulingConfig.Preemption.PriorityClasses, tc.SchedulingConfig.Preemption.DefaultPriorityClass, fairnessCostProvider, + nil, nodeDb.TotalResources(), ) for queue, priorityFactor := range priorityFactorByQueue { weight := 1 / priorityFactor - err := sctx.AddQueueSchedulingContext(queue, weight, make(schedulerobjects.QuantityByTAndResourceType[string])) + err := sctx.AddQueueSchedulingContext(queue, weight, make(schedulerobjects.QuantityByTAndResourceType[string]), nil) require.NoError(b, err) } constraints := schedulerconstraints.SchedulingConstraintsFromSchedulingConfig( @@ -1706,11 +1770,12 @@ func BenchmarkPreemptingQueueScheduler(b *testing.B) { tc.SchedulingConfig.Preemption.PriorityClasses, tc.SchedulingConfig.Preemption.DefaultPriorityClass, fairnessCostProvider, + nil, nodeDb.TotalResources(), ) for queue, priorityFactor := range priorityFactorByQueue { weight := 1 / priorityFactor - err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClass[queue]) + err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClass[queue], nil) require.NoError(b, err) } sch := NewPreemptingQueueScheduler( diff --git a/internal/scheduler/queue_scheduler.go b/internal/scheduler/queue_scheduler.go index f262f924b8b..825c9f26bfb 100644 --- a/internal/scheduler/queue_scheduler.go +++ b/internal/scheduler/queue_scheduler.go @@ -74,7 +74,7 @@ func (sch *QueueScheduler) Schedule(ctx context.Context) (*SchedulerResult, erro if gctx == nil { break } - if len(gctx.JobSchedulingContexts) == 0 { + if gctx.Cardinality() == 0 { if err := sch.candidateGangIterator.Clear(); err != nil { return nil, err } @@ -102,6 +102,10 @@ func (sch *QueueScheduler) Schedule(ctx context.Context) (*SchedulerResult, erro // If unschedulableReason indicates no more new jobs can be scheduled, // instruct the underlying iterator to only yield evicted jobs from now on. sch.candidateGangIterator.OnlyYieldEvicted() + } else if schedulerconstraints.IsTerminalQueueUnschedulableReason(unschedulableReason) { + // If unschedulableReason indicates no more new jobs can be scheduled for this queue, + // instruct the underlying iterator to only yield evicted jobs for this queue from now on. + sch.candidateGangIterator.OnlyYieldEvictedForQueue(gctx.Queue) } // Clear() to get the next gang in order of smallest fair share. // Calling clear here ensures the gang scheduled in this iteration is accounted for. 
@@ -197,6 +201,7 @@ func (it *QueuedGangIterator) Peek() (*schedulercontext.GangSchedulingContext, e if len(it.schedulingContext.UnfeasibleSchedulingKeys) > 0 { schedulingKey := it.schedulingContext.SchedulingKeyFromLegacySchedulerJob(job) if unsuccessfulJctx, ok := it.schedulingContext.UnfeasibleSchedulingKeys[schedulingKey]; ok { + // TODO: For performance, we should avoid creating new objects and instead reference the existing one. jctx := &schedulercontext.JobSchedulingContext{ Created: time.Now(), JobId: job.GetId(), @@ -254,10 +259,13 @@ func (it *QueuedGangIterator) hitLookbackLimit() bool { // Specifically, it yields the next gang in the queue with smallest fraction of its fair share, // where the fraction of fair share computation includes the yielded gang. type CandidateGangIterator struct { - queueProvier fairness.QueueRepository + queueRepository fairness.QueueRepository fairnessCostProvider fairness.FairnessCostProvider // If true, this iterator only yields gangs where all jobs are evicted. onlyYieldEvicted bool + // If, e.g., onlyYieldEvictedByQueue["A"] is true, + // this iterator only yields gangs where all jobs are evicted for queue A. + onlyYieldEvictedByQueue map[string]bool // Reusable buffer to avoid allocations. buffer schedulerobjects.ResourceList // Priority queue containing per-queue iterators. @@ -266,15 +274,16 @@ type CandidateGangIterator struct { } func NewCandidateGangIterator( - queueProvier fairness.QueueRepository, + queueRepository fairness.QueueRepository, fairnessCostProvider fairness.FairnessCostProvider, iteratorsByQueue map[string]*QueuedGangIterator, ) (*CandidateGangIterator, error) { it := &CandidateGangIterator{ - queueProvier: queueProvier, - fairnessCostProvider: fairnessCostProvider, - buffer: schedulerobjects.NewResourceListWithDefaultSize(), - pq: make(QueueCandidateGangIteratorPQ, 0, len(iteratorsByQueue)), + queueRepository: queueRepository, + fairnessCostProvider: fairnessCostProvider, + onlyYieldEvictedByQueue: make(map[string]bool), + buffer: schedulerobjects.NewResourceListWithDefaultSize(), + pq: make(QueueCandidateGangIteratorPQ, 0, len(iteratorsByQueue)), } for queue, queueIt := range iteratorsByQueue { if _, err := it.updateAndPushPQItem(it.newPQItem(queue, queueIt)); err != nil { @@ -288,6 +297,48 @@ func (it *CandidateGangIterator) OnlyYieldEvicted() { it.onlyYieldEvicted = true } +func (it *CandidateGangIterator) OnlyYieldEvictedForQueue(queue string) { + it.onlyYieldEvictedByQueue[queue] = true +} + +// Clear removes the first item in the iterator. +// If it.onlyYieldEvicted is true, any consecutive non-evicted jobs are also removed. +func (it *CandidateGangIterator) Clear() error { + if len(it.pq) == 0 { + return nil + } + item := heap.Pop(&it.pq).(*QueueCandidateGangIteratorItem) + if err := item.it.Clear(); err != nil { + return err + } + if _, err := it.updateAndPushPQItem(item); err != nil { + return err + } + + // If set to only yield evicted gangs, drop any queues for which the next gang is non-evicted here. + // We assume here that all evicted jobs appear before non-evicted jobs in the queue. + // Hence, it's safe to drop a queue if the first job is non-evicted. + if it.onlyYieldEvicted { + for len(it.pq) > 0 && !it.pq[0].gctx.AllJobsEvicted { + heap.Pop(&it.pq) + } + } else { + // Same check as above on a per-queue basis. 
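+		// Queues marked via OnlyYieldEvictedForQueue are dropped once their next gang contains non-evicted jobs.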
+ for len(it.pq) > 0 && it.onlyYieldEvictedByQueue[it.pq[0].gctx.Queue] && !it.pq[0].gctx.AllJobsEvicted { + heap.Pop(&it.pq) + } + } + return nil +} + +func (it *CandidateGangIterator) Peek() (*schedulercontext.GangSchedulingContext, error) { + if len(it.pq) == 0 { + // No queued jobs left. + return nil, nil + } + return it.pq[0].gctx, nil +} + func (it *CandidateGangIterator) newPQItem(queue string, queueIt *QueuedGangIterator) *QueueCandidateGangIteratorItem { return &QueueCandidateGangIteratorItem{ queue: queue, @@ -303,8 +354,9 @@ func (it *CandidateGangIterator) updateAndPushPQItem(item *QueueCandidateGangIte return false, nil } if it.onlyYieldEvicted && !item.gctx.AllJobsEvicted { - // We assume here that all evicted jobs appear before non-evicted jobs in the queue. - // Hence, it's safe to drop a queue once a non-evicted job has been seen. + return false, nil + } + if it.onlyYieldEvictedByQueue[item.gctx.Queue] && !item.gctx.AllJobsEvicted { return false, nil } heap.Push(&it.pq, item) @@ -335,7 +387,7 @@ func (it *CandidateGangIterator) updatePQItem(item *QueueCandidateGangIteratorIt // queueCostWithGctx returns the cost associated with a queue if gctx were to be scheduled. func (it *CandidateGangIterator) queueCostWithGctx(gctx *schedulercontext.GangSchedulingContext) (float64, error) { - queue, ok := it.queueProvier.GetQueue(gctx.Queue) + queue, ok := it.queueRepository.GetQueue(gctx.Queue) if !ok { return 0, errors.Errorf("unknown queue %s", gctx.Queue) } @@ -345,33 +397,6 @@ func (it *CandidateGangIterator) queueCostWithGctx(gctx *schedulercontext.GangSc return it.fairnessCostProvider.CostFromAllocationAndWeight(it.buffer, queue.GetWeight()), nil } -// Clear removes the first item in the iterator. -// If it.onlyYieldEvicted is true, any consecutive non-evicted jobs are also removed. -func (it *CandidateGangIterator) Clear() error { - if len(it.pq) == 0 { - return nil - } - item := heap.Pop(&it.pq).(*QueueCandidateGangIteratorItem) - if err := item.it.Clear(); err != nil { - return err - } - if _, err := it.updateAndPushPQItem(item); err != nil { - return err - } - for len(it.pq) > 0 && it.onlyYieldEvicted && !it.pq[0].gctx.AllJobsEvicted { - heap.Pop(&it.pq) - } - return nil -} - -func (it *CandidateGangIterator) Peek() (*schedulercontext.GangSchedulingContext, error) { - if len(it.pq) == 0 { - // No queued jobs left. - return nil, nil - } - return it.pq[0].gctx, nil -} - // Priority queue used by CandidateGangIterator to determine from which queue to schedule the next job. 
type QueueCandidateGangIteratorPQ []*QueueCandidateGangIteratorItem diff --git a/internal/scheduler/queue_scheduler_test.go b/internal/scheduler/queue_scheduler_test.go index 979ec21df38..cbbc537e495 100644 --- a/internal/scheduler/queue_scheduler_test.go +++ b/internal/scheduler/queue_scheduler_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/require" "golang.org/x/exp/maps" "golang.org/x/exp/slices" + "golang.org/x/time/rate" "k8s.io/apimachinery/pkg/api/resource" "github.com/armadaproject/armada/internal/armada/configuration" @@ -87,44 +88,56 @@ func TestQueueScheduler(t *testing.T) { PriorityFactorByQueue: map[string]float64{"A": 1}, ExpectedScheduledIndices: []int{0, 11}, }, - "MaximumJobsToSchedule": { - SchedulingConfig: testfixtures.WithMaxJobsToScheduleConfig(2, testfixtures.TestSchedulingConfig()), + "MaximumSchedulingBurst": { + SchedulingConfig: testfixtures.WithGlobalSchedulingRateLimiterConfig(10, 2, testfixtures.TestSchedulingConfig()), + Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Jobs: armadaslices.Concatenate( + testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), + testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 10), + testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 4), + ), + PriorityFactorByQueue: map[string]float64{"A": 1}, + ExpectedScheduledIndices: []int{0, 11}, + }, + "MaximumPerQueueSchedulingBurst": { + SchedulingConfig: testfixtures.WithPerQueueSchedulingLimiterConfig(10, 2, testfixtures.TestSchedulingConfig()), Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), Jobs: armadaslices.Concatenate( testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 10), testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 3), + testfixtures.N1Cpu4GiJobs("B", testfixtures.PriorityClass0, 1), ), - PriorityFactorByQueue: map[string]float64{"A": 1}, - ExpectedScheduledIndices: []int{0, 11}, - ExpectedNeverAttemptedIndices: []int{12, 13}, + PriorityFactorByQueue: map[string]float64{"A": 1, "B": 1}, + ExpectedScheduledIndices: []int{0, 11, 14}, }, - "MaximumGangsToSchedule": { - SchedulingConfig: testfixtures.WithMaxGangsToScheduleConfig(2, testfixtures.TestSchedulingConfig()), + "MaximumSchedulingBurst is not exceeded by gangs": { + SchedulingConfig: testfixtures.WithGlobalSchedulingRateLimiterConfig(10, 2, testfixtures.TestSchedulingConfig()), Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), Jobs: armadaslices.Concatenate( + testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), + testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 1), testfixtures.WithGangAnnotationsJobs( testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 2), ), - testfixtures.WithGangAnnotationsJobs( - testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 2), - ), - testfixtures.WithGangAnnotationsJobs( - testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 2), - ), - testfixtures.WithGangAnnotationsJobs( - testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 2), - ), - testfixtures.WithGangAnnotationsJobs( - testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 2), - ), + testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 2), + ), + PriorityFactorByQueue: map[string]float64{"A": 1}, + ExpectedScheduledIndices: []int{0, 4}, + }, + "MaximumPerQueueSchedulingBurst is not exceeded by gangs": { + SchedulingConfig: testfixtures.WithPerQueueSchedulingLimiterConfig(10, 2, 
testfixtures.TestSchedulingConfig()), + Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities), + Jobs: armadaslices.Concatenate( + testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 1), + testfixtures.N32Cpu256GiJobs("A", testfixtures.PriorityClass0, 1), testfixtures.WithGangAnnotationsJobs( testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 2), ), + testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass0, 2), ), - PriorityFactorByQueue: map[string]float64{"A": 1}, - ExpectedScheduledIndices: []int{0, 1, 6, 7}, - ExpectedNeverAttemptedIndices: []int{8, 9, 10, 11}, + PriorityFactorByQueue: map[string]float64{"A": 1}, + ExpectedScheduledIndices: []int{0, 4}, }, "MaximumResourceFractionToSchedule": { SchedulingConfig: testfixtures.WithRoundLimitsConfig( @@ -473,11 +486,22 @@ func TestQueueScheduler(t *testing.T) { tc.SchedulingConfig.Preemption.PriorityClasses, tc.SchedulingConfig.Preemption.DefaultPriorityClass, fairnessCostProvider, + rate.NewLimiter( + rate.Limit(tc.SchedulingConfig.MaximumSchedulingRate), + tc.SchedulingConfig.MaximumSchedulingBurst, + ), tc.TotalResources, ) for queue, priorityFactor := range tc.PriorityFactorByQueue { weight := 1 / priorityFactor - err := sctx.AddQueueSchedulingContext(queue, weight, tc.InitialAllocatedByQueueAndPriorityClass[queue]) + err := sctx.AddQueueSchedulingContext( + queue, weight, + tc.InitialAllocatedByQueueAndPriorityClass[queue], + rate.NewLimiter( + rate.Limit(tc.SchedulingConfig.MaximumPerQueueSchedulingRate), + tc.SchedulingConfig.MaximumPerQueueSchedulingBurst, + ), + ) require.NoError(t, err) } constraints := schedulerconstraints.SchedulingConstraintsFromSchedulingConfig( diff --git a/internal/scheduler/reports_test.go b/internal/scheduler/reports_test.go index fc96cc1b25e..b3c8f568d38 100644 --- a/internal/scheduler/reports_test.go +++ b/internal/scheduler/reports_test.go @@ -246,7 +246,7 @@ func withSuccessfulJobSchedulingContext(sctx *schedulercontext.SchedulingContext } qctx := sctx.QueueSchedulingContexts[queue] if qctx == nil { - if err := sctx.AddQueueSchedulingContext(queue, 1.0, make(schedulerobjects.QuantityByTAndResourceType[string])); err != nil { + if err := sctx.AddQueueSchedulingContext(queue, 1.0, make(schedulerobjects.QuantityByTAndResourceType[string]), nil); err != nil { panic(err) } qctx = sctx.QueueSchedulingContexts[queue] @@ -266,7 +266,7 @@ func withPreemptingJobSchedulingContext(sctx *schedulercontext.SchedulingContext } qctx := sctx.QueueSchedulingContexts[queue] if qctx == nil { - if err := sctx.AddQueueSchedulingContext(queue, 1.0, make(schedulerobjects.QuantityByTAndResourceType[string])); err != nil { + if err := sctx.AddQueueSchedulingContext(queue, 1.0, make(schedulerobjects.QuantityByTAndResourceType[string]), nil); err != nil { panic(err) } qctx = sctx.QueueSchedulingContexts[queue] @@ -286,7 +286,7 @@ func withUnsuccessfulJobSchedulingContext(sctx *schedulercontext.SchedulingConte } qctx := sctx.QueueSchedulingContexts[queue] if qctx == nil { - if err := sctx.AddQueueSchedulingContext(queue, 1.0, make(schedulerobjects.QuantityByTAndResourceType[string])); err != nil { + if err := sctx.AddQueueSchedulingContext(queue, 1.0, make(schedulerobjects.QuantityByTAndResourceType[string]), nil); err != nil { panic(err) } qctx = sctx.QueueSchedulingContexts[queue] @@ -304,6 +304,7 @@ func testSchedulingContext(executorId string) *schedulercontext.SchedulingContex nil, "", nil, + nil, schedulerobjects.ResourceList{}, ) sctx.Started = time.Time{} diff --git 
a/internal/scheduler/scheduling_algo.go b/internal/scheduler/scheduling_algo.go index 2c8f26fdd65..11f6b667f96 100644 --- a/internal/scheduler/scheduling_algo.go +++ b/internal/scheduler/scheduling_algo.go @@ -12,6 +12,7 @@ import ( "github.com/sirupsen/logrus" "golang.org/x/exp/maps" "golang.org/x/exp/slices" + "golang.org/x/time/rate" "k8s.io/apimachinery/pkg/util/clock" "github.com/armadaproject/armada/internal/armada/configuration" @@ -42,7 +43,12 @@ type FairSchedulingAlgo struct { executorRepository database.ExecutorRepository queueRepository database.QueueRepository schedulingContextRepository *SchedulingContextRepository - maxSchedulingDuration time.Duration + // Global job scheduling rate-limiter. + limiter *rate.Limiter + // Per-queue job scheduling rate-limiters. + limiterByQueue map[string]*rate.Limiter + // Max amount of time each scheduling round is allowed to take. + maxSchedulingDuration time.Duration // Order in which to schedule executor groups. // Executors are grouped by either id (i.e., individually) or by pool. executorGroupsToSchedule []string @@ -68,6 +74,8 @@ func NewFairSchedulingAlgo( executorRepository: executorRepository, queueRepository: queueRepository, schedulingContextRepository: schedulingContextRepository, + limiter: rate.NewLimiter(rate.Limit(config.MaximumSchedulingRate), config.MaximumSchedulingBurst), + limiterByQueue: make(map[string]*rate.Limiter), maxSchedulingDuration: maxSchedulingDuration, rand: util.NewThreadsafeRand(time.Now().UnixNano()), clock: clock.RealClock{}, @@ -372,6 +380,7 @@ func (l *FairSchedulingAlgo) scheduleOnExecutors( l.schedulingConfig.Preemption.PriorityClasses, l.schedulingConfig.Preemption.DefaultPriorityClass, fairnessCostProvider, + l.limiter, totalResources, ) for queue, priorityFactor := range fsctx.priorityFactorByQueue { @@ -387,7 +396,16 @@ func (l *FairSchedulingAlgo) scheduleOnExecutors( if priorityFactor > 0 { weight = 1 / priorityFactor } - if err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByPriorityClass); err != nil { + queueLimiter, ok := l.limiterByQueue[queue] + if !ok { + // Create per-queue limiters lazily. + queueLimiter = rate.NewLimiter( + rate.Limit(l.schedulingConfig.MaximumPerQueueSchedulingRate), + l.schedulingConfig.MaximumPerQueueSchedulingBurst, + ) + l.limiterByQueue[queue] = queueLimiter + } + if err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByPriorityClass, queueLimiter); err != nil { return nil, nil, err } } diff --git a/internal/scheduler/simulator/simulator.go b/internal/scheduler/simulator/simulator.go index 97666a7a9fc..639f617c275 100644 --- a/internal/scheduler/simulator/simulator.go +++ b/internal/scheduler/simulator/simulator.go @@ -20,6 +20,7 @@ import ( "github.com/spf13/viper" "golang.org/x/exp/maps" "golang.org/x/exp/slices" + "golang.org/x/time/rate" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/yaml" @@ -69,6 +70,11 @@ type Simulator struct { eventLog EventLog // Simulated events are emitted on this channel in order. c chan *armadaevents.EventSequence + + // Global job scheduling rate-limiter. + limiter *rate.Limiter + // Per-queue job scheduling rate-limiters. 
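+	// Created lazily in handleScheduleEvent the first time each queue is considered.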
+ limiterByQueue map[string]*rate.Limiter } func NewSimulator(testCase *TestCase, schedulingConfig configuration.SchedulingConfig) (*Simulator, error) { @@ -143,6 +149,11 @@ func NewSimulator(testCase *TestCase, schedulingConfig configuration.SchedulingC allocationByPoolAndQueueAndPriorityClass: make(map[string]map[string]schedulerobjects.QuantityByTAndResourceType[string]), totalResourcesByPool: totalResourcesByPool, c: make(chan *armadaevents.EventSequence), + limiter: rate.NewLimiter( + rate.Limit(schedulingConfig.MaximumSchedulingRate), + schedulingConfig.MaximumSchedulingBurst, + ), + limiterByQueue: make(map[string]*rate.Limiter), } // Mark all jobTemplates as active. @@ -415,13 +426,24 @@ func (s *Simulator) handleScheduleEvent() error { s.schedulingConfig.Preemption.PriorityClasses, s.schedulingConfig.Preemption.DefaultPriorityClass, fairnessCostProvider, + s.limiter, totalResources, ) + sctx.Started = s.time for _, queue := range s.testCase.Queues { + limiter, ok := s.limiterByQueue[queue.Name] + if !ok { + limiter = rate.NewLimiter( + rate.Limit(s.schedulingConfig.MaximumPerQueueSchedulingRate), + s.schedulingConfig.MaximumPerQueueSchedulingBurst, + ) + s.limiterByQueue[queue.Name] = limiter + } err := sctx.AddQueueSchedulingContext( queue.Name, queue.Weight, s.allocationByPoolAndQueueAndPriorityClass[pool.Name][queue.Name], + limiter, ) if err != nil { return err diff --git a/internal/scheduler/testfixtures/testfixtures.go b/internal/scheduler/testfixtures/testfixtures.go index 7c6e01f39c4..0acda7d60a6 100644 --- a/internal/scheduler/testfixtures/testfixtures.go +++ b/internal/scheduler/testfixtures/testfixtures.go @@ -95,12 +95,16 @@ func TestSchedulingConfig() configuration.SchedulingConfig { NodeEvictionProbability: 1.0, NodeOversubscriptionEvictionProbability: 1.0, }, - IndexedResources: TestResources, - IndexedNodeLabels: TestIndexedNodeLabels, + MaximumSchedulingRate: math.Inf(1), + MaximumSchedulingBurst: math.MaxInt, + MaximumPerQueueSchedulingRate: math.Inf(1), + MaximumPerQueueSchedulingBurst: math.MaxInt, + IndexedResources: TestResources, + IndexedNodeLabels: TestIndexedNodeLabels, DominantResourceFairnessResourcesToConsider: TestResourceNames, - ExecutorTimeout: 15 * time.Minute, - MaxUnacknowledgedJobsPerExecutor: math.MaxInt, - EnableNewPreemptionStrategy: true, + ExecutorTimeout: 15 * time.Minute, + MaxUnacknowledgedJobsPerExecutor: math.MaxInt, + EnableNewPreemptionStrategy: true, } } @@ -166,18 +170,19 @@ func WithIndexedResourcesConfig(indexResources []configuration.IndexedResource, return config } -func WithMaxJobsToScheduleConfig(n uint, config configuration.SchedulingConfig) configuration.SchedulingConfig { - config.MaximumJobsToSchedule = n +func WithGlobalSchedulingRateLimiterConfig(maximumSchedulingRate float64, maximumSchedulingBurst int, config configuration.SchedulingConfig) configuration.SchedulingConfig { + config.MaximumSchedulingRate = maximumSchedulingRate + config.MaximumSchedulingBurst = maximumSchedulingBurst return config } -func WithMaxGangsToScheduleConfig(n uint, config configuration.SchedulingConfig) configuration.SchedulingConfig { - config.MaximumGangsToSchedule = n +func WithPerQueueSchedulingLimiterConfig(maximumPerQueueSchedulingRate float64, maximumPerQueueSchedulingBurst int, config configuration.SchedulingConfig) configuration.SchedulingConfig { + config.MaximumPerQueueSchedulingRate = maximumPerQueueSchedulingRate + config.MaximumPerQueueSchedulingBurst = maximumPerQueueSchedulingBurst return config } func 
WithMaxLookbackPerQueueConfig(n uint, config configuration.SchedulingConfig) configuration.SchedulingConfig { - // For legacy reasons, it's called QueueLeaseBatchSize in config. config.MaxQueueLookback = n return config }
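
Illustrative sketch of the rate-limiting behaviour introduced above: a minimal standalone Go program exercising the same golang.org/x/time/rate calls the scheduler now uses (rate.NewLimiter, Limiter.TokensAt, Limiter.Burst, Limiter.ReserveN). The rate and burst mirror the new config defaults (100.0 and 1000); the gang size is a hypothetical value chosen for the example, and this is a sketch of the token-bucket checks rather than the scheduler's full logic.

package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Analogous to MaximumSchedulingRate and MaximumSchedulingBurst.
	limiter := rate.NewLimiter(rate.Limit(100.0), 1000)

	// The scheduler evaluates the limiter at a fixed time (SchedulingContext.Started)
	// so that limiter state remains constant within a scheduling round.
	started := time.Now()
	gangCardinality := 4 // hypothetical gang size

	tokens := limiter.TokensAt(started)
	switch {
	case tokens <= 0:
		fmt.Println("global scheduling rate limit exceeded")
	case limiter.Burst() < gangCardinality:
		fmt.Println("gang cardinality too large: exceeds global max burst size")
	case tokens < float64(gangCardinality):
		fmt.Println("gang would exceed global scheduling rate limit")
	default:
		// Remove one token per job in the gang, reserved at the round's start time,
		// mirroring the ReserveN call in GangScheduler.Schedule.
		limiter.ReserveN(started, gangCardinality)
		fmt.Printf("scheduled gang of %d jobs; %.0f tokens left\n", gangCardinality, limiter.TokensAt(started))
	}
}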