Skip to content

Commit

Permalink
Only evict jobs if we know about their queue (armadaproject#4001)
Browse files Browse the repository at this point in the history
* Only evict jobs if we know about their queue

Signed-off-by: Chris Martin <[email protected]>

* fix tests

Signed-off-by: Chris Martin <[email protected]>

* add test

Signed-off-by: Chris Martin <[email protected]>

---------

Signed-off-by: Chris Martin <[email protected]>
  • Loading branch information
d80tb7 authored Oct 11, 2024
1 parent 6e481d4 commit 3cd72a2
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 22 deletions.
7 changes: 1 addition & 6 deletions internal/scheduler/scheduling/context/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,7 @@ type JobSchedulingContext struct {
}

func (jctx *JobSchedulingContext) IsHomeJob(currentPool string) bool {
// Away jobs can never have been scheduled in this round
// and therefore must have an active run
if jctx.Job.Queued() || jctx.Job.LatestRun() == nil {
return true
}
return jctx.Job.LatestRun().Pool() == currentPool
return IsHomeJob(jctx.Job, currentPool)
}

func (jctx *JobSchedulingContext) String() string {
Expand Down
22 changes: 18 additions & 4 deletions internal/scheduler/scheduling/context/scheduling.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/armadaproject/armada/internal/common/armadaerrors"
armadamaps "github.com/armadaproject/armada/internal/common/maps"
"github.com/armadaproject/armada/internal/scheduler/jobdb"
"github.com/armadaproject/armada/internal/scheduler/schedulerobjects"
"github.com/armadaproject/armada/internal/scheduler/scheduling/fairness"
)
Expand Down Expand Up @@ -319,11 +320,16 @@ func (sctx *SchedulingContext) EvictGang(gctx *GangSchedulingContext) (bool, err
return allJobsScheduledInThisRound, nil
}

// QueueContextExists returns true if we know about the queue associated with the job. An example of when this can
// return false is when a job is running on a node
func (sctx *SchedulingContext) QueueContextExists(job *jobdb.Job) bool {
queue := sctx.resolveQueueName(job)
_, ok := sctx.QueueSchedulingContexts[queue]
return ok
}

func (sctx *SchedulingContext) EvictJob(jctx *JobSchedulingContext) (bool, error) {
queue := jctx.Job.Queue()
if !jctx.IsHomeJob(sctx.Pool) {
queue = CalculateAwayQueueName(jctx.Job.Queue())
}
queue := sctx.resolveQueueName(jctx.Job)
qctx, ok := sctx.QueueSchedulingContexts[queue]
if !ok {
return false, errors.Errorf("failed adding job %s to scheduling context: no context for queue %s", jctx.JobId, queue)
Expand Down Expand Up @@ -391,3 +397,11 @@ func (sctx *SchedulingContext) FairnessError() float64 {
}
return fairnessError
}

func (sctx *SchedulingContext) resolveQueueName(job *jobdb.Job) string {
queue := job.Queue()
if !IsHomeJob(job, sctx.Pool) {
queue = CalculateAwayQueueName(job.Queue())
}
return queue
}
11 changes: 11 additions & 0 deletions internal/scheduler/scheduling/context/util.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
package context

import "github.com/armadaproject/armada/internal/scheduler/jobdb"

func CalculateAwayQueueName(queueName string) string {
return queueName + "-away"
}

func IsHomeJob(job *jobdb.Job, currentPool string) bool {
// Away jobs can never have been scheduled in this round
// and therefore must have an active run
if job.Queued() || job.LatestRun() == nil {
return true
}
return job.LatestRun().Pool() == currentPool
}
14 changes: 14 additions & 0 deletions internal/scheduler/scheduling/preempting_queue_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche
if job.LatestRun().Pool() != sch.schedulingContext.Pool {
return false
}
if !sch.schedulingContext.QueueContextExists(job) {
ctx.Warnf("No queue context found for job %s. This job cannot be evicted", job.Id())
return false
}
priorityClass := job.PriorityClass()
if !priorityClass.Preemptible {
return false
Expand Down Expand Up @@ -162,6 +166,7 @@ func (sch *PreemptingQueueScheduler) Schedule(ctx *armadacontext.Context) (*Sche
evictorResult, inMemoryJobRepo, err = sch.evict(
armadacontext.WithLogField(ctx, "stage", "evict oversubscribed"),
NewOversubscribedEvictor(
sch.schedulingContext,
sch.jobRepo,
sch.nodeDb,
),
Expand Down Expand Up @@ -660,9 +665,14 @@ func NewFilteredEvictor(
}
}

type queueChecker interface {
QueueContextExists(job *jobdb.Job) bool
}

// NewOversubscribedEvictor returns a new evictor that
// for each node evicts all preemptible jobs of a priority class for which at least one job could not be scheduled
func NewOversubscribedEvictor(
queueChecker queueChecker,
jobRepo JobRepository,
nodeDb *nodedb.NodeDb,
) *Evictor {
Expand All @@ -687,6 +697,10 @@ func NewOversubscribedEvictor(
return len(overSubscribedPriorities) > 0
},
jobFilter: func(ctx *armadacontext.Context, job *jobdb.Job) bool {
if !queueChecker.QueueContextExists(job) {
ctx.Warnf("No queue context found for job %s. This job cannot be evicted", job.Id())
return false
}
priorityClass := job.PriorityClass()
if !priorityClass.Preemptible {
return false
Expand Down
59 changes: 47 additions & 12 deletions internal/scheduler/scheduling/preempting_queue_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@ import (
"github.com/armadaproject/armada/internal/scheduler/testfixtures"
)

type testQueueContextChecker struct {
jobIds map[string]bool
}

func (t testQueueContextChecker) QueueContextExists(job *jobdb.Job) bool {
return t.jobIds[job.Id()]
}

func TestEvictOversubscribed(t *testing.T) {
config := testfixtures.TestSchedulingConfig()

Expand Down Expand Up @@ -58,6 +66,7 @@ func TestEvictOversubscribed(t *testing.T) {
require.NoError(t, err)

evictor := NewOversubscribedEvictor(
testQueueContextChecker{},
jobDbTxn,
nodeDb)
result, err := evictor.Evict(armadacontext.Background(), nodeDbTxn)
Expand Down Expand Up @@ -95,12 +104,8 @@ func TestPreemptingQueueScheduler(t *testing.T) {
Rounds []SchedulingRound
// Map from queue to the priority factor associated with that queue.
PriorityFactorByQueue map[string]float64
// Initial resource usage for all queues. This value is used across all rounds,
// i.e., we don't update it based on preempted/scheduled jobs.
InitialAllocationByQueueAndPriorityClass map[string]schedulerobjects.QuantityByTAndResourceType[string]
// Total resources across all clusters.
// If empty, it is computed as the total resources across the provided nodes.
TotalResources schedulerobjects.ResourceList
// Map of nodeId to jobs running on those nodes
InitialRunningJobs map[int][]*jobdb.Job
}{
"balancing three queues": {
SchedulingConfig: testfixtures.TestSchedulingConfig(),
Expand Down Expand Up @@ -236,6 +241,26 @@ func TestPreemptingQueueScheduler(t *testing.T) {
"D": 100,
},
},
"don't prempt jobs where we don't know the queue": {
SchedulingConfig: testfixtures.TestSchedulingConfig(),
Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities),
Rounds: []SchedulingRound{
{
JobsByQueue: map[string][]*jobdb.Job{
"A": testfixtures.N1Cpu4GiJobs("A", testfixtures.PriorityClass1, 32),
},
ExpectedScheduledIndices: map[string][]int{
"A": testfixtures.IntRange(0, 23),
},
},
},
InitialRunningJobs: map[int][]*jobdb.Job{
0: testfixtures.N1Cpu4GiJobs("B", testfixtures.PriorityClass1, 8),
},
PriorityFactorByQueue: map[string]float64{
"A": 1,
},
},
"avoid preemption when not improving fairness": {
SchedulingConfig: testfixtures.TestSchedulingConfig(),
Nodes: testfixtures.N32CpuNodes(1, testfixtures.TestPriorities),
Expand Down Expand Up @@ -1879,10 +1904,22 @@ func TestPreemptingQueueScheduler(t *testing.T) {
jobDb := jobdb.NewJobDb(tc.SchedulingConfig.PriorityClasses, tc.SchedulingConfig.DefaultPriorityClassName, stringinterner.New(1024), testfixtures.TestResourceListFactory)
jobDbTxn := jobDb.WriteTxn()

// Add all the initial jobs, creating runs for them
for nodeIdx, jobs := range tc.InitialRunningJobs {
node := tc.Nodes[nodeIdx]
for _, job := range jobs {
err := jobDbTxn.Upsert([]*jobdb.Job{
job.WithQueued(false).
WithNewRun(node.GetExecutor(), node.GetId(), node.GetName(), node.GetPool(), job.PriorityClass().Priority),
})
require.NoError(t, err)
}
}

// Accounting across scheduling rounds.
roundByJobId := make(map[string]int)
indexByJobId := make(map[string]int)
allocatedByQueueAndPriorityClass := armadamaps.DeepCopy(tc.InitialAllocationByQueueAndPriorityClass)
allocatedByQueueAndPriorityClass := make(map[string]schedulerobjects.QuantityByTAndResourceType[string])
nodeIdByJobId := make(map[string]string)
var jobIdsByGangId map[string]map[string]bool
var gangIdByJobId map[string]string
Expand Down Expand Up @@ -1994,9 +2031,7 @@ func TestPreemptingQueueScheduler(t *testing.T) {
}

// If not provided, set total resources equal to the aggregate over tc.Nodes.
if tc.TotalResources.Resources == nil {
tc.TotalResources = nodeDb.TotalKubernetesResources()
}
totalResources := nodeDb.TotalKubernetesResources()

fairnessCostProvider, err := fairness.NewDominantResourceFairness(
nodeDb.TotalKubernetesResources(),
Expand All @@ -2007,7 +2042,7 @@ func TestPreemptingQueueScheduler(t *testing.T) {
testfixtures.TestPool,
fairnessCostProvider,
limiter,
tc.TotalResources,
totalResources,
)
sctx.Started = schedulingStarted.Add(time.Duration(i) * schedulingInterval)

Expand All @@ -2023,7 +2058,7 @@ func TestPreemptingQueueScheduler(t *testing.T) {
)
require.NoError(t, err)
}
constraints := schedulerconstraints.NewSchedulingConstraints("pool", tc.TotalResources, tc.SchedulingConfig, nil)
constraints := schedulerconstraints.NewSchedulingConstraints("pool", totalResources, tc.SchedulingConfig, nil)
sctx.UpdateFairShares()
sch := NewPreemptingQueueScheduler(
sctx,
Expand Down

0 comments on commit 3cd72a2

Please sign in to comment.