Skip to content

Commit

Permalink
Add per-queue scheduling rate-limiting (#2938)
Browse files Browse the repository at this point in the history
* Initial commit

* Add rate limiters

* go mod tidy

* Updates

* Add tests

* Update default config

* Update default scheduler config

* Whitespace

* Cleanup

* Docstring improvements

* Remove limiter nil checks

* Add Cardinality() function on gctx

* Fix test

* Fix test
  • Loading branch information
severinson authored Sep 8, 2023
1 parent 33e9377 commit 293c58c
Show file tree
Hide file tree
Showing 19 changed files with 448 additions and 175 deletions.
4 changes: 4 additions & 0 deletions config/armada/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ scheduling:
maximumResourceFractionToSchedule:
memory: 1.0
cpu: 1.0
maximumSchedulingRate: 100.0
maximumSchedulingBurst: 1000
maximumPerQueueSchedulingRate: 50.0
maximumPerQueueSchedulingBurst: 1000
maxJobSchedulingContextsPerExecutor: 10000
lease:
expireAfter: 15m
Expand Down
5 changes: 4 additions & 1 deletion config/scheduler/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ scheduling:
maximumResourceFractionToSchedule:
memory: 1.0
cpu: 1.0
maximumJobsToSchedule: 5000
maximumSchedulingRate: 100.0
maximumSchedulingBurst: 1000
maximumPerQueueSchedulingRate: 50.0
maximumPerQueueSchedulingBurst: 1000
maxUnacknowledgedJobsPerExecutor: 2500
maxJobSchedulingContextsPerExecutor: 10000
defaultJobLimits:
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ require (
github.com/prometheus/common v0.37.0
github.com/sanity-io/litter v1.5.5
github.com/segmentio/fasthash v1.0.3
golang.org/x/time v0.3.0
google.golang.org/genproto/googleapis/api v0.0.0-20230525234035-dd9d682886f9
)

Expand Down Expand Up @@ -195,7 +196,6 @@ require (
golang.org/x/sys v0.7.0 // indirect
golang.org/x/term v0.7.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230525234030-28d5490b6b19 // indirect
google.golang.org/protobuf v1.30.0 // indirect
Expand Down
31 changes: 27 additions & 4 deletions internal/armada/configuration/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,33 @@ type SchedulingConfig struct {
MaximumResourceFractionToSchedule map[string]float64
// Overrides MaximalClusterFractionToSchedule if set for the current pool.
MaximumResourceFractionToScheduleByPool map[string]map[string]float64
// Max number of jobs to schedule in each invocation of the scheduler.
MaximumJobsToSchedule uint
// Max number of gangs to schedule in each invocation of the scheduler.
MaximumGangsToSchedule uint
// The rate at which Armada schedules jobs is rate-limited using a token bucket approach.
// Specifically, there is a token bucket that persists between scheduling rounds.
// The bucket fills up at a rate of MaximumSchedulingRate tokens per second and has capacity MaximumSchedulingBurst.
// A token is removed from the bucket when scheduling a job, and scheduling stops while the bucket is empty.
//
// Hence, MaximumSchedulingRate controls the maximum number of jobs scheduled per second in steady-state,
// i.e., once the burst capacity has been exhausted.
//
// Rate-limiting is based on the number of tokens available at the start of each scheduling round,
// i.e., tokens accumulated while scheduling become available at the start of the next scheduling round.
//
// For more information about the rate-limiter, see:
// https://pkg.go.dev/golang.org/x/time/rate#Limiter
MaximumSchedulingRate float64 `validate:"gt=0"`
// MaximumSchedulingBurst controls the burst capacity of the rate-limiter.
//
// There are two important implications:
// - Armada will never schedule more than MaximumSchedulingBurst jobs per scheduling round.
// - Gang jobs with cardinality greater than MaximumSchedulingBurst can never be scheduled.
MaximumSchedulingBurst int `validate:"gt=0"`
// In addition to the global rate-limiter, there is a separate rate-limiter for each queue.
// These work the same as the global rate-limiter, except they apply only to jobs scheduled from a specific queue.
//
// Per-queue version of MaximumSchedulingRate.
MaximumPerQueueSchedulingRate float64 `validate:"gt=0"`
// Per-queue version of MaximumSchedulingBurst.
MaximumPerQueueSchedulingBurst int `validate:"gt=0"`
// Armada stores contexts associated with recent job scheduling attempts.
// This setting limits the number of such contexts to store.
// Contexts associated with the most recent scheduling attempt for each queue and cluster are always stored.
Expand Down
35 changes: 27 additions & 8 deletions internal/armada/server/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
log "github.com/sirupsen/logrus"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/utils/clock"
Expand Down Expand Up @@ -59,6 +60,10 @@ type AggregatedQueueServer struct {
schedulingInfoRepository repository.SchedulingInfoRepository
decompressorPool *pool.ObjectPool
clock clock.Clock
// Global job scheduling rate-limiter.
limiter *rate.Limiter
// Per-queue job scheduling rate-limiters.
limiterByQueue map[string]*rate.Limiter
// For storing reports of scheduling attempts.
SchedulingContextRepository *scheduler.SchedulingContextRepository
// Stores the most recent NodeDb for each executor.
Expand Down Expand Up @@ -92,18 +97,22 @@ func NewAggregatedQueueServer(
TimeBetweenEvictionRuns: 0,
NumTestsPerEvictionRun: 10,
}

decompressorPool := pool.NewObjectPool(context.Background(), pool.NewPooledObjectFactorySimple(
func(context.Context) (interface{}, error) {
return compress.NewZlibDecompressor(), nil
}), &poolConfig)
return &AggregatedQueueServer{
permissions: permissions,
schedulingConfig: schedulingConfig,
jobRepository: jobRepository,
queueRepository: queueRepository,
usageRepository: usageRepository,
eventStore: eventStore,
permissions: permissions,
schedulingConfig: schedulingConfig,
jobRepository: jobRepository,
queueRepository: queueRepository,
usageRepository: usageRepository,
eventStore: eventStore,
limiter: rate.NewLimiter(
rate.Limit(schedulingConfig.MaximumSchedulingRate),
schedulingConfig.MaximumSchedulingBurst,
),
limiterByQueue: make(map[string]*rate.Limiter),
schedulingInfoRepository: schedulingInfoRepository,
decompressorPool: decompressorPool,
executorRepository: executorRepository,
Expand Down Expand Up @@ -491,6 +500,7 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
q.schedulingConfig.Preemption.PriorityClasses,
q.schedulingConfig.Preemption.DefaultPriorityClass,
fairnessCostProvider,
q.limiter,
totalResources,
)
for queue, priorityFactor := range priorityFactorByQueue {
Expand All @@ -502,7 +512,16 @@ func (q *AggregatedQueueServer) getJobs(ctx context.Context, req *api.StreamingL
if priorityFactor > 0 {
weight = 1 / priorityFactor
}
if err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClassForPool[queue]); err != nil {
queueLimiter, ok := q.limiterByQueue[queue]
if !ok {
// Create per-queue limiters lazily.
queueLimiter = rate.NewLimiter(
rate.Limit(q.schedulingConfig.MaximumPerQueueSchedulingRate),
q.schedulingConfig.MaximumPerQueueSchedulingBurst,
)
q.limiterByQueue[queue] = queueLimiter
}
if err := sctx.AddQueueSchedulingContext(queue, weight, allocatedByQueueAndPriorityClassForPool[queue], queueLimiter); err != nil {
return nil, err
}
}
Expand Down
123 changes: 82 additions & 41 deletions internal/scheduler/constraints/constraints.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package constraints

import (
"fmt"
"math"

"github.com/pkg/errors"
Expand All @@ -12,32 +13,46 @@ import (
)

const (
UnschedulableReasonMaximumResourcesScheduled = "maximum resources scheduled"
UnschedulableReasonMaximumNumberOfJobsScheduled = "maximum number of jobs scheduled"
UnschedulableReasonMaximumNumberOfGangsScheduled = "maximum number of gangs scheduled"
UnschedulableReasonMaximumResourcesPerQueueExceeded = "maximum total resources for this queue exceeded"
// Indicates that the limit on resources scheduled per round has been exceeded.
MaximumResourcesScheduledUnschedulableReason = "maximum resources scheduled"

// Indicates that a queue has been assigned more than its allowed amount of resources.
MaximumResourcesPerQueueExceededUnschedulableReason = "maximum total resources for this queue exceeded"

// Indicates that the scheduling rate limit has been exceeded.
GlobalRateLimitExceededUnschedulableReason = "global scheduling rate limit exceeded"
QueueRateLimitExceededUnschedulableReason = "queue scheduling rate limit exceeded"

// Indicates that scheduling a gang would exceed the rate limit.
GlobalRateLimitExceededByGangUnschedulableReason = "gang would exceed global scheduling rate limit"
QueueRateLimitExceededByGangUnschedulableReason = "gang would exceed queue scheduling rate limit"

// Indicates that the number of jobs in a gang exceeds the burst size.
// This means the gang can not be scheduled without first increasing the burst size.
GangExceedsGlobalBurstSizeUnschedulableReason = "gang cardinality too large: exceeds global max burst size"
GangExceedsQueueBurstSizeUnschedulableReason = "gang cardinality too large: exceeds queue max burst size"
)

// IsTerminalUnschedulableReason returns true if reason indicates it's not possible to schedule any more jobs in this round.
// IsTerminalUnschedulableReason returns true if reason indicates
// it's not possible to schedule any more jobs in this round.
func IsTerminalUnschedulableReason(reason string) bool {
if reason == UnschedulableReasonMaximumResourcesScheduled {
if reason == MaximumResourcesScheduledUnschedulableReason {
return true
}
if reason == UnschedulableReasonMaximumNumberOfJobsScheduled {
return true
}
if reason == UnschedulableReasonMaximumNumberOfGangsScheduled {
if reason == GlobalRateLimitExceededUnschedulableReason {
return true
}
return false
}

// IsTerminalQueueUnschedulableReason reports whether reason means that
// no more jobs can be scheduled from the affected queue in this round.
// The only such reason is QueueRateLimitExceededUnschedulableReason.
func IsTerminalQueueUnschedulableReason(reason string) bool {
	switch reason {
	case QueueRateLimitExceededUnschedulableReason:
		return true
	default:
		return false
	}
}

// SchedulingConstraints contains scheduling constraints, e.g., per-queue resource limits.
type SchedulingConstraints struct {
// Max number of jobs to scheduler per lease jobs call.
MaximumJobsToSchedule uint
// Max number of jobs to scheduler per lease jobs call.
MaximumGangsToSchedule uint
// Max number of jobs to consider for a queue before giving up.
MaxQueueLookback uint
// Jobs leased to this executor must be at least this large.
Expand Down Expand Up @@ -82,8 +97,6 @@ func SchedulingConstraintsFromSchedulingConfig(
maximumResourceFractionToSchedule = m
}
return SchedulingConstraints{
MaximumJobsToSchedule: config.MaximumJobsToSchedule,
MaximumGangsToSchedule: config.MaximumGangsToSchedule,
MaxQueueLookback: config.MaxQueueLookback,
MinimumJobSize: minimumJobSize,
MaximumResourcesToSchedule: absoluteFromRelativeLimits(totalResources, maximumResourceFractionToSchedule),
Expand All @@ -99,47 +112,75 @@ func absoluteFromRelativeLimits(totalResources schedulerobjects.ResourceList, re
return absoluteLimits
}

func (constraints *SchedulingConstraints) CheckRoundConstraints(sctx *schedulercontext.SchedulingContext) (bool, string, error) {
// MaximumJobsToSchedule check.
if constraints.MaximumJobsToSchedule != 0 && sctx.NumScheduledJobs == int(constraints.MaximumJobsToSchedule) {
return false, UnschedulableReasonMaximumNumberOfJobsScheduled, nil
}

// MaximumGangsToSchedule check.
if constraints.MaximumGangsToSchedule != 0 && sctx.NumScheduledGangs == int(constraints.MaximumGangsToSchedule) {
return false, UnschedulableReasonMaximumNumberOfGangsScheduled, nil
}
// ScaleQuantity returns q with its milli value multiplied by f and rounded to the nearest integer.
// q is received by value, so callers must use the returned quantity.
//
// This function overflows for quantities the milli value of which can't be expressed as an int64.
// E.g., 1Pi is ok, but not 10Pi.
func ScaleQuantity(q resource.Quantity, f float64) resource.Quantity {
	scaledMilli := math.Round(float64(q.MilliValue()) * f)
	q.SetMilli(int64(scaledMilli))
	return q
}

func (constraints *SchedulingConstraints) CheckRoundConstraints(sctx *schedulercontext.SchedulingContext, queue string) (bool, string, error) {
// MaximumResourcesToSchedule check.
if !sctx.ScheduledResources.IsStrictlyLessOrEqual(constraints.MaximumResourcesToSchedule) {
return false, UnschedulableReasonMaximumResourcesScheduled, nil
return false, MaximumResourcesScheduledUnschedulableReason, nil
}
return true, "", nil
}

func (constraints *SchedulingConstraints) CheckPerQueueAndPriorityClassConstraints(
func (constraints *SchedulingConstraints) CheckConstraints(
sctx *schedulercontext.SchedulingContext,
queue string,
priorityClassName string,
gctx *schedulercontext.GangSchedulingContext,
) (bool, string, error) {
qctx := sctx.QueueSchedulingContexts[queue]
qctx := sctx.QueueSchedulingContexts[gctx.Queue]
if qctx == nil {
return false, "", errors.Errorf("no QueueSchedulingContext for queue %s", queue)
return false, "", errors.Errorf("no QueueSchedulingContext for queue %s", gctx.Queue)
}

// Check that the job is large enough for this executor.
if ok, unschedulableReason := RequestsAreLargeEnough(gctx.TotalResourceRequests, constraints.MinimumJobSize); !ok {
return false, unschedulableReason, nil
}

// Global rate limiter check.
tokens := sctx.Limiter.TokensAt(sctx.Started)
if tokens <= 0 {
return false, GlobalRateLimitExceededUnschedulableReason, nil
}
if sctx.Limiter.Burst() < gctx.Cardinality() {
return false, GangExceedsGlobalBurstSizeUnschedulableReason, nil
}
if tokens < float64(gctx.Cardinality()) {
return false, GlobalRateLimitExceededByGangUnschedulableReason, nil
}

// Per-queue rate limiter check.
tokens = qctx.Limiter.TokensAt(sctx.Started)
if tokens <= 0 {
return false, QueueRateLimitExceededUnschedulableReason, nil
}
if qctx.Limiter.Burst() < gctx.Cardinality() {
return false, GangExceedsQueueBurstSizeUnschedulableReason, nil
}
if tokens < float64(gctx.Cardinality()) {
return false, QueueRateLimitExceededByGangUnschedulableReason, nil
}

// PriorityClassSchedulingConstraintsByPriorityClassName check.
if priorityClassConstraint, ok := constraints.PriorityClassSchedulingConstraintsByPriorityClassName[priorityClassName]; ok {
if !qctx.AllocatedByPriorityClass[priorityClassName].IsStrictlyLessOrEqual(priorityClassConstraint.MaximumResourcesPerQueue) {
return false, UnschedulableReasonMaximumResourcesPerQueueExceeded, nil
if priorityClassConstraint, ok := constraints.PriorityClassSchedulingConstraintsByPriorityClassName[gctx.PriorityClassName]; ok {
if !qctx.AllocatedByPriorityClass[gctx.PriorityClassName].IsStrictlyLessOrEqual(priorityClassConstraint.MaximumResourcesPerQueue) {
return false, MaximumResourcesPerQueueExceededUnschedulableReason, nil
}
}
return true, "", nil
}

// ScaleQuantity scales q in-place by a factor f.
// This functions overflows for quantities the milli value of which can't be expressed as an int64.
// E.g., 1Pi is ok, but not 10Pi.
func ScaleQuantity(q resource.Quantity, f float64) resource.Quantity {
q.SetMilli(int64(math.Round(float64(q.MilliValue()) * f)))
return q
// RequestsAreLargeEnough reports whether totalResourceRequests meets every per-resource minimum in minRequest.
// If the requested quantity of some resource is below its minimum, it returns false
// together with a message naming that resource, the requested quantity and the minimum.
// Otherwise it returns true and an empty string.
func RequestsAreLargeEnough(totalResourceRequests, minRequest schedulerobjects.ResourceList) (bool, string) {
	for resourceType, minimum := range minRequest.Resources {
		requested := totalResourceRequests.Get(resourceType)
		if minimum.Cmp(requested) != 1 {
			// The request meets the minimum for this resource; check the next one.
			continue
		}
		return false, fmt.Sprintf("job requests %s %s, but the minimum is %s", requested.String(), resourceType, minimum.String())
	}
	return true, ""
}
4 changes: 2 additions & 2 deletions internal/scheduler/constraints/constraints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ func TestConstraints(t *testing.T) {
}{} // TODO: Add tests.
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
ok, unschedulableReason, err := tc.constraints.CheckRoundConstraints(tc.sctx)
ok, unschedulableReason, err := tc.constraints.CheckRoundConstraints(tc.sctx, tc.queue)
require.NoError(t, err)
require.Equal(t, tc.globalUnschedulableReason == "", ok)
require.Equal(t, tc.globalUnschedulableReason, unschedulableReason)

ok, unschedulableReason, err = tc.constraints.CheckPerQueueAndPriorityClassConstraints(tc.sctx, tc.queue, tc.priorityClassName)
ok, unschedulableReason, err = tc.constraints.CheckConstraints(tc.sctx, nil)
require.NoError(t, err)
require.Equal(t, tc.perQueueAndPriorityClassUnschedulableReason == "", ok)
require.Equal(t, tc.perQueueAndPriorityClassUnschedulableReason, unschedulableReason)
Expand Down
Loading

0 comments on commit 293c58c

Please sign in to comment.