[workload] Add evicted condition
The new condition is set when a workload is preempted or its PodsReady timeout expires.

In the case of a PodsReady timeout, the condition's transition timestamp is used to order the workloads in the scheduling queues.
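
For illustration, a workload evicted under this change carries a status condition along these lines. This is a sketch with example values (the Message and namespaced name are made up), not output captured from a real cluster:

```go
package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

func main() {
	// Example of the new condition; Message and timestamp are illustrative.
	cond := metav1.Condition{
		Type:               kueue.WorkloadEvicted,                   // "Evicted"
		Status:             metav1.ConditionTrue,
		Reason:             kueue.WorkloadEvictedByPodsReadyTimeout, // or kueue.WorkloadEvictedByPreemption
		Message:            "Exceeded the PodsReady timeout default/sample-job",
		LastTransitionTime: metav1.Now(), // for PodsReady timeouts, this timestamp becomes the queue ordering key
	}
	fmt.Printf("%s=%s (%s)\n", cond.Type, cond.Status, cond.Reason)
}
```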
trasc committed Apr 26, 2023
1 parent b1af37a commit 80b1009
Showing 14 changed files with 345 additions and 62 deletions.
13 changes: 13 additions & 0 deletions apis/kueue/v1beta1/workload_types.go
@@ -137,6 +137,19 @@ const (
// WorkloadPodsReady means that at least `.spec.podSets[*].count` Pods are
// ready or have succeeded.
WorkloadPodsReady = "PodsReady"

// WorkloadEvicted means that the Workload was evicted by a ClusterQueue
WorkloadEvicted = "Evicted"
)

const (
// WorkloadEvictedByPreemption indicates that the workload was evicted
// in order to free resources for a workload with a higher priority.
WorkloadEvictedByPreemption = "Preempted"

// WorkloadEvictedByPodsReadyTimeout indicates that the eviction took
// place due to a PodsReady timeout.
WorkloadEvictedByPodsReadyTimeout = "PodsReadyTimeout"
)

// +kubebuilder:object:root=true
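The controller and preemptor changes below call workload.SetEvictedCondition, whose implementation lives in pkg/workload and is not among the files shown on this page. A minimal sketch of what such a helper could look like, with the name and behaviour inferred from the call sites rather than confirmed by this diff:

```go
// Hypothetical sketch of the pkg/workload helper used later in this commit;
// the real implementation is not shown here.
package workload

import (
	apimeta "k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

// SetEvictedCondition marks the workload as evicted for the given reason.
// apimeta.SetStatusCondition stamps LastTransitionTime when the condition
// flips, which is the timestamp later used for queue ordering.
func SetEvictedCondition(w *kueue.Workload, reason, message string) {
	apimeta.SetStatusCondition(&w.Status.Conditions, metav1.Condition{
		Type:    kueue.WorkloadEvicted,
		Status:  metav1.ConditionTrue,
		Reason:  reason,
		Message: message,
	})
}
```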
34 changes: 23 additions & 11 deletions pkg/controller/core/workload_controller.go
@@ -127,6 +127,17 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
ctx = ctrl.LoggerInto(ctx, log)
log.V(2).Info("Reconciling Workload")

// If a PodsReady timeout eviction is ongoing, cancel the workload's admission.
if evictionCond := apimeta.FindStatusCondition(wl.Status.Conditions, kueue.WorkloadEvicted); evictionCond != nil && evictionCond.Status == metav1.ConditionTrue &&
evictionCond.Reason == kueue.WorkloadEvictedByPodsReadyTimeout &&
apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadAdmitted) {

log.V(2).Info("Cancelling admission of the workload due to exceeding the PodsReady timeout")
workload.UnsetAdmissionWithCondition(&wl, "Evicted", evictionCond.Message)
err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
return ctrl.Result{}, client.IgnoreNotFound(err)
}

if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
return ctrl.Result{}, nil
}
@@ -136,40 +147,41 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c

if !r.queues.QueueForWorkloadExists(&wl) {
log.V(3).Info("Workload is inadmissible because of missing LocalQueue", "localQueue", klog.KRef(wl.Namespace, wl.Spec.QueueName))
err := workload.UnsetAdmissionWithCondition(ctx, r.client, &wl,
"Inadmissible", fmt.Sprintf("LocalQueue %s doesn't exist", wl.Spec.QueueName))
workload.UnsetAdmissionWithCondition(&wl, "Inadmissible", fmt.Sprintf("LocalQueue %s doesn't exist", wl.Spec.QueueName))
err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
return ctrl.Result{}, client.IgnoreNotFound(err)
}

cqName, cqOk := r.queues.ClusterQueueForWorkload(&wl)
if !cqOk {
log.V(3).Info("Workload is inadmissible because of missing ClusterQueue", "clusterQueue", klog.KRef("", cqName))
err := workload.UnsetAdmissionWithCondition(ctx, r.client, &wl,
"Inadmissible", fmt.Sprintf("ClusterQueue %s doesn't exist", cqName))
workload.UnsetAdmissionWithCondition(&wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s doesn't exist", cqName))
err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
return ctrl.Result{}, client.IgnoreNotFound(err)
}

if !r.cache.ClusterQueueActive(cqName) {
log.V(3).Info("Workload is inadmissible because ClusterQueue is inactive", "clusterQueue", klog.KRef("", cqName))
err := workload.UnsetAdmissionWithCondition(ctx, r.client, &wl,
"Inadmissible", fmt.Sprintf("ClusterQueue %s is inactive", cqName))
workload.UnsetAdmissionWithCondition(&wl, "Inadmissible", fmt.Sprintf("ClusterQueue %s is inactive", cqName))
err := workload.ApplyAdmissionStatus(ctx, r.client, &wl, true)
return ctrl.Result{}, client.IgnoreNotFound(err)
}
return ctrl.Result{}, nil
}

func (r *WorkloadReconciler) reconcileNotReadyTimeout(ctx context.Context, req ctrl.Request, wl *kueue.Workload) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)
countingTowardsTimeout, recheckAfter := r.admittedNotReadyWorkload(wl, realClock)
if !countingTowardsTimeout {
return ctrl.Result{}, nil
}
if recheckAfter > 0 {
klog.V(4).InfoS("Workload not yet ready and did not exceed its timeout", "workload", req.NamespacedName.String(), "recheckAfter", recheckAfter)
log.V(4).Info("Workload not yet ready and did not exceed its timeout", "recheckAfter", recheckAfter)
return ctrl.Result{RequeueAfter: recheckAfter}, nil
} else {
klog.V(2).InfoS("Cancelling admission of the workload due to exceeding the PodsReady timeout", "workload", req.NamespacedName.String())
err := workload.UnsetAdmissionWithCondition(ctx, r.client, wl,
"Evicted", fmt.Sprintf("Exceeded the PodsReady timeout %s", req.NamespacedName.String()))
log.V(2).Info("Start the eviction of the workload due to exceeding the PodsReady timeout")
workload.SetEvictedCondition(wl, kueue.WorkloadEvictedByPodsReadyTimeout, fmt.Sprintf("Exceeded the PodsReady timeout %s", req.NamespacedName.String()))
err := workload.ApplyAdmissionStatus(ctx, r.client, wl, false)
return ctrl.Result{}, client.IgnoreNotFound(err)
}
}
@@ -520,7 +532,7 @@ func (h *resourceUpdatesHandler) handle(ctx context.Context, obj client.Object,
}
}

func (h *resourceUpdatesHandler) queueReconcileForPending(ctx context.Context, q workqueue.RateLimitingInterface, opts ...client.ListOption) {
func (h *resourceUpdatesHandler) queueReconcileForPending(ctx context.Context, _ workqueue.RateLimitingInterface, opts ...client.ListOption) {
log := ctrl.LoggerFrom(ctx)
lst := kueue.WorkloadList{}
opts = append(opts, client.MatchingFields{indexer.WorkloadAdmittedKey: string(metav1.ConditionFalse)})
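Throughout this file the old single call workload.UnsetAdmissionWithCondition(ctx, r.client, ...) is split into an in-memory mutation followed by workload.ApplyAdmissionStatus. Neither helper's new body is visible on this page; the sketch below only guesses at their shape from the call sites, and the trailing bool passed to ApplyAdmissionStatus is assumed to control how strictly update conflicts are handled:

```go
// Speculative sketch of the refactored pkg/workload helpers; signatures are
// taken from the call sites above, bodies are assumptions.
package workload

import (
	"context"

	apimeta "k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"

	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

// UnsetAdmissionWithCondition now only mutates the object in memory:
// it clears the admission and records why the workload is not admitted.
func UnsetAdmissionWithCondition(w *kueue.Workload, reason, message string) {
	w.Status.Admission = nil
	apimeta.SetStatusCondition(&w.Status.Conditions, metav1.Condition{
		Type:    kueue.WorkloadAdmitted,
		Status:  metav1.ConditionFalse,
		Reason:  reason,
		Message: message,
	})
}

// ApplyAdmissionStatus pushes the modified status to the API server. The
// real helper likely uses a server-side-apply patch; a plain status update
// stands in for it here, and the strict flag is ignored in this sketch.
func ApplyAdmissionStatus(ctx context.Context, c client.Client, w *kueue.Workload, strict bool) error {
	_ = strict
	return c.Status().Update(ctx, w)
}
```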
2 changes: 1 addition & 1 deletion pkg/queue/cluster_queue_best_effort_fifo.go
@@ -30,7 +30,7 @@ type ClusterQueueBestEffortFIFO struct {
var _ ClusterQueue = &ClusterQueueBestEffortFIFO{}

func newClusterQueueBestEffortFIFO(cq *kueue.ClusterQueue) (ClusterQueue, error) {
cqImpl := newClusterQueueImpl(keyFunc, byCreationTime)
cqImpl := newClusterQueueImpl(keyFunc, queueOrdering)
cqBE := &ClusterQueueBestEffortFIFO{
clusterQueueBase: cqImpl,
}
8 changes: 6 additions & 2 deletions pkg/queue/cluster_queue_impl.go
@@ -21,6 +21,7 @@ import (

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
@@ -89,8 +90,11 @@ func (c *clusterQueueBase) PushOrUpdate(wInfo *workload.Info) {
oldInfo := c.inadmissibleWorkloads[key]
if oldInfo != nil {
// update in place if the workload was inadmissible and didn't change
// to potentially become admissible.
if equality.Semantic.DeepEqual(oldInfo.Obj.Spec, wInfo.Obj.Spec) {
// to potentially become admissible, unless the Eviction status changed
// which can affect the workloads order in the queue.
if equality.Semantic.DeepEqual(oldInfo.Obj.Spec, wInfo.Obj.Spec) &&
equality.Semantic.DeepEqual(apimeta.FindStatusCondition(oldInfo.Obj.Status.Conditions, kueue.WorkloadEvicted),
apimeta.FindStatusCondition(wInfo.Obj.Status.Conditions, kueue.WorkloadEvicted)) {
c.inadmissibleWorkloads[key] = wInfo
return
}
16 changes: 8 additions & 8 deletions pkg/queue/cluster_queue_impl_test.go
@@ -37,7 +37,7 @@ const (
)

func Test_PushOrUpdate(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, byCreationTime)
cq := newClusterQueueImpl(keyFunc, queueOrdering)
wl := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj()
if cq.Pending() != 0 {
t.Error("ClusterQueue should be empty")
@@ -57,7 +57,7 @@ func Test_PushOrUpdate(t *testing.T) {
}

func Test_Pop(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, byCreationTime)
cq := newClusterQueueImpl(keyFunc, queueOrdering)
now := time.Now()
wl1 := workload.NewInfo(utiltesting.MakeWorkload("workload-1", defaultNamespace).Creation(now).Obj())
wl2 := workload.NewInfo(utiltesting.MakeWorkload("workload-2", defaultNamespace).Creation(now.Add(time.Second)).Obj())
@@ -80,7 +80,7 @@ func Test_Pop(t *testing.T) {
}

func Test_Delete(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, byCreationTime)
cq := newClusterQueueImpl(keyFunc, queueOrdering)
wl1 := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj()
wl2 := utiltesting.MakeWorkload("workload-2", defaultNamespace).Obj()
cq.PushOrUpdate(workload.NewInfo(wl1))
@@ -101,7 +101,7 @@ func Test_Delete(t *testing.T) {
}

func Test_Info(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, byCreationTime)
cq := newClusterQueueImpl(keyFunc, queueOrdering)
wl := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj()
if info := cq.Info(keyFunc(workload.NewInfo(wl))); info != nil {
t.Error("workload doesn't exist")
@@ -113,7 +113,7 @@ func Test_Info(t *testing.T) {
}

func Test_AddFromLocalQueue(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, byCreationTime)
cq := newClusterQueueImpl(keyFunc, queueOrdering)
wl := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj()
queue := &LocalQueue{
items: map[string]*workload.Info{
@@ -131,7 +131,7 @@ func Test_AddFromLocalQueue(t *testing.T) {
}

func Test_DeleteFromLocalQueue(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, byCreationTime)
cq := newClusterQueueImpl(keyFunc, queueOrdering)
q := utiltesting.MakeLocalQueue("foo", "").ClusterQueue("cq").Obj()
qImpl := newLocalQueue(q)
wl1 := utiltesting.MakeWorkload("wl1", "").Queue(q.Name).Obj()
@@ -261,7 +261,7 @@ func TestClusterQueueImpl(t *testing.T) {

for name, test := range tests {
t.Run(name, func(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, byCreationTime)
cq := newClusterQueueImpl(keyFunc, queueOrdering)

err := cq.Update(utiltesting.MakeClusterQueue("cq").
NamespaceSelector(&metav1.LabelSelector{
@@ -315,7 +315,7 @@
}

func TestQueueInadmissibleWorkloadsDuringScheduling(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, byCreationTime)
cq := newClusterQueueImpl(keyFunc, queueOrdering)
cq.namespaceSelector = labels.Everything()
wl := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj()
cl := utiltesting.NewFakeClient(
16 changes: 10 additions & 6 deletions pkg/queue/cluster_queue_strict_fifo.go
@@ -31,7 +31,7 @@ type ClusterQueueStrictFIFO struct {
var _ ClusterQueue = &ClusterQueueStrictFIFO{}

func newClusterQueueStrictFIFO(cq *kueue.ClusterQueue) (ClusterQueue, error) {
cqImpl := newClusterQueueImpl(keyFunc, byCreationTime)
cqImpl := newClusterQueueImpl(keyFunc, queueOrdering)
cqStrict := &ClusterQueueStrictFIFO{
clusterQueueBase: cqImpl,
}
@@ -40,10 +40,11 @@ func newClusterQueueStrictFIFO(cq *kueue.ClusterQueue) (ClusterQueue, error) {
return cqStrict, err
}

// byCreationTime is the function used by the clusterQueue heap algorithm to sort
// workloads. It sorts workloads based on their priority.
// When priorities are equal, it uses workloads.creationTimestamp.
func byCreationTime(a, b interface{}) bool {
// queueOrdering is the function used by the clusterQueue heap algorithm
// to sort workloads. It sorts workloads based on their priority.
// When priorities are equal, it uses the workload's creation or eviction
// time.
func queueOrdering(a, b interface{}) bool {
objA := a.(*workload.Info)
objB := b.(*workload.Info)
p1 := utilpriority.Priority(objA.Obj)
@@ -52,7 +53,10 @@ func byCreationTime(a, b interface{}) bool {
if p1 != p2 {
return p1 > p2
}
return objA.Obj.CreationTimestamp.Before(&objB.Obj.CreationTimestamp)

tA := workload.GetQueueOrderTimestamp(objA.Obj)
tB := workload.GetQueueOrderTimestamp(objB.Obj)
return !tB.Before(tA)
}

// RequeueIfNotPresent requeues if the workload is not present.
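Both queueOrdering above and entryOrdering in the scheduler rely on workload.GetQueueOrderTimestamp, which is not among the files shown. Per the commit message, a workload evicted on a PodsReady timeout should be ordered by the eviction's transition time rather than its creation time; the sketch below assumes exactly that and is not the confirmed implementation:

```go
// Plausible sketch of the pkg/workload ordering-key helper; the actual
// implementation is not visible on this page.
package workload

import (
	apimeta "k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

// GetQueueOrderTimestamp returns the timestamp used to order a workload in
// the queues: the Evicted(PodsReadyTimeout) transition time when present,
// otherwise the creation timestamp. This keeps a re-queued, previously
// admitted workload from jumping ahead of workloads that arrived while it
// was running.
func GetQueueOrderTimestamp(w *kueue.Workload) *metav1.Time {
	if c := apimeta.FindStatusCondition(w.Status.Conditions, kueue.WorkloadEvicted); c != nil &&
		c.Status == metav1.ConditionTrue &&
		c.Reason == kueue.WorkloadEvictedByPodsReadyTimeout {
		return &c.LastTransitionTime
	}
	return &w.CreationTimestamp
}
```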
28 changes: 28 additions & 0 deletions pkg/queue/cluster_queue_strict_fifo_test.go
@@ -100,6 +100,7 @@ func TestFIFOClusterQueue(t *testing.T) {
func TestStrictFIFO(t *testing.T) {
t1 := time.Now()
t2 := t1.Add(time.Second)
t3 := t2.Add(time.Second)
for _, tt := range []struct {
name string
w1 *kueue.Workload
@@ -146,6 +147,33 @@ func TestStrictFIFO(t *testing.T) {
},
expected: "w1",
},
{
name: "w1.priority equals w2.priority and w1.create time is earlier than w2.create time but w1 was evicted",
w1: &kueue.Workload{
ObjectMeta: metav1.ObjectMeta{
Name: "w1",
CreationTimestamp: metav1.NewTime(t1),
},
Status: kueue.WorkloadStatus{
Conditions: []metav1.Condition{
{
Type: kueue.WorkloadEvicted,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.NewTime(t3),
Reason: kueue.WorkloadEvictedByPodsReadyTimeout,
Message: "by test",
},
},
},
},
w2: &kueue.Workload{
ObjectMeta: metav1.ObjectMeta{
Name: "w2",
CreationTimestamp: metav1.NewTime(t2),
},
},
expected: "w2",
},
{
name: "p1.priority is lower than p2.priority and w1.create time is earlier than w2.create time",
w1: &kueue.Workload{
8 changes: 8 additions & 0 deletions pkg/scheduler/preemption/preemption.go
@@ -141,6 +141,14 @@ func (p *Preemptor) issuePreemptions(ctx context.Context, targets []*workload.In
errCh.SendErrorWithCancel(err, cancel)
return
}

workload.SetEvictedCondition(patch, kueue.WorkloadEvictedByPreemption, "Preempted to accommodate a higher priority Workload")
err = workload.ApplyAdmissionStatus(ctx, p.client, patch, false)
if err != nil {
errCh.SendErrorWithCancel(err, cancel)
return
}

origin := "ClusterQueue"
if cq.Name != target.ClusterQueue {
origin = "cohort"
27 changes: 11 additions & 16 deletions pkg/scheduler/scheduler.go
@@ -26,7 +26,6 @@ import (
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
@@ -39,7 +38,6 @@ import (

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/metrics"
"sigs.k8s.io/kueue/pkg/queue"
"sigs.k8s.io/kueue/pkg/scheduler/flavorassigner"
@@ -180,7 +178,8 @@ func (s *Scheduler) schedule(ctx context.Context) {
log.V(5).Info("Waiting for all admitted workloads to be in the PodsReady condition")
// Block admission until all currently admitted workloads are in
// PodsReady condition if the waitForPodsReady is enabled
if err := workload.UnsetAdmissionWithCondition(ctx, s.client, e.Obj, "Waiting", "waiting for all admitted workloads to be in PodsReady condition"); err != nil {
workload.UnsetAdmissionWithCondition(e.Obj, "Waiting", "waiting for all admitted workloads to be in PodsReady condition")
if err := workload.ApplyAdmissionStatus(ctx, s.client, e.Obj, true); err != nil {
log.Error(err, "Could not update Workload status")
}
s.cache.WaitForPodsReady(ctx)
@@ -338,23 +337,16 @@ func (s *Scheduler) admit(ctx context.Context, e *entry) error {
ClusterQueue: kueue.ClusterQueueReference(e.ClusterQueue),
PodSetAssignments: e.assignment.ToAPI(),
}
newWorkload.Status.Admission = admission

workload.SetAdmission(newWorkload, admission)
if err := s.cache.AssumeWorkload(newWorkload); err != nil {
return err
}
e.status = assumed
log.V(2).Info("Workload assumed in the cache")

s.admissionRoutineWrapper.Run(func() {
patch := workload.AdmissionPatch(newWorkload)
patch.Status.Conditions = []metav1.Condition{{
Type: kueue.WorkloadAdmitted,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.Now(),
Reason: "Admitted",
Message: fmt.Sprintf("Admitted by ClusterQueue %s", newWorkload.Status.Admission.ClusterQueue),
}}
err := s.applyAdmission(ctx, patch)
err := s.applyAdmission(ctx, newWorkload)
if err == nil {
waitTime := time.Since(e.Obj.CreationTimestamp.Time)
s.recorder.Eventf(newWorkload, corev1.EventTypeNormal, "Admitted", "Admitted by ClusterQueue %v, wait time was %.0fs", admission.ClusterQueue, waitTime.Seconds())
@@ -378,7 +370,7 @@ func (s *Scheduler) admit(ctx context.Context, e *entry) error {
}

func (s *Scheduler) applyAdmissionWithSSA(ctx context.Context, w *kueue.Workload) error {
return s.client.Status().Patch(ctx, w, client.Apply, client.FieldOwner(constants.AdmissionName))
return workload.ApplyAdmissionStatus(ctx, s.client, w, false)
}

type entryOrdering []entry
@@ -404,7 +396,9 @@ func (e entryOrdering) Less(i, j int) bool {
return !aBorrows
}
// 2. FIFO.
return a.Obj.CreationTimestamp.Before(&b.Obj.CreationTimestamp)
aComparisonTimestamp := workload.GetQueueOrderTimestamp(a.Obj)
bComparisonTimestamp := workload.GetQueueOrderTimestamp(b.Obj)
return aComparisonTimestamp.Before(bComparisonTimestamp)
}

func (s *Scheduler) requeueAndUpdate(log logr.Logger, ctx context.Context, e entry) {
@@ -416,7 +410,8 @@ func (s *Scheduler) requeueAndUpdate(log logr.Logger, ctx context.Context, e ent
log.V(2).Info("Workload re-queued", "workload", klog.KObj(e.Obj), "clusterQueue", klog.KRef("", e.ClusterQueue), "queue", klog.KRef(e.Obj.Namespace, e.Obj.Spec.QueueName), "requeueReason", e.requeueReason, "added", added)

if e.status == notNominated {
err := workload.UnsetAdmissionWithCondition(ctx, s.client, e.Obj, "Pending", e.inadmissibleMsg)
workload.UnsetAdmissionWithCondition(e.Obj, "Pending", e.inadmissibleMsg)
err := workload.ApplyAdmissionStatus(ctx, s.client, e.Obj, true)
if err != nil {
log.Error(err, "Could not update Workload status")
}