Skip to content

Commit

Permalink
[workload] Add evicted condition
Browse files Browse the repository at this point in the history
The new condition is set when a workload is preempted or
its pods ready timeout has expired.

In case of pods ready timeout, the condition's transition
timestamp will be used in ordering the workloads in the
scheduling queues.
  • Loading branch information
trasc committed Apr 19, 2023
1 parent 54b0118 commit 3e4fa75
Show file tree
Hide file tree
Showing 13 changed files with 290 additions and 24 deletions.
13 changes: 13 additions & 0 deletions apis/kueue/v1beta1/workload_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,19 @@ const (
// WorkloadPodsReady means that at least `.spec.podSets[*].count` Pods are
// ready or have succeeded.
WorkloadPodsReady = "PodsReady"

// WorkloadEvicted means that the Workload was evicted by a ClusterQueue
WorkloadEvicted = "Evicted"
)

// Reasons set on the WorkloadEvicted condition to record why the workload
// was evicted.
const (
// WorkloadEvictedByPreemption indicates that the workload was evicted
// in order to free resources for a workload with a higher priority.
WorkloadEvictedByPreemption = "Preempted"

// WorkloadEvictedByPodsReadyTimeout indicates that the eviction took
// place due to a PodsReady timeout.
WorkloadEvictedByPodsReadyTimeout = "PodsReadyTimeout"
)

// +kubebuilder:object:root=true
Expand Down
24 changes: 19 additions & 5 deletions pkg/controller/core/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,19 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
ctx = ctrl.LoggerInto(ctx, log)
log.V(2).Info("Reconciling Workload")

if evictionCond, evictionOngoing := workload.GetEvictidCondition(&wl); evictionOngoing {
if evictionCond.Reason == kueue.WorkloadEvictedByPodsReadyTimeout && apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadAdmitted) {
log.V(2).Info("Cancelling admission of the workload due to exceeding the PodsReady timeout")
err := workload.UnsetAdmissionWithCondition(ctx, r.client, &wl, "Evicted", evictionCond.Message)
return ctrl.Result{}, client.IgnoreNotFound(err)
} else {
// finish up the eviction
log.V(2).Info("Reset eviction condition")
err := workload.UpdateStatus(ctx, r.client, &wl, kueue.WorkloadEvicted, metav1.ConditionFalse, evictionCond.Reason, evictionCond.Message, "evict")
return ctrl.Result{}, client.IgnoreNotFound(err)
}
}

if apimeta.IsStatusConditionTrue(wl.Status.Conditions, kueue.WorkloadFinished) {
return ctrl.Result{}, nil
}
Expand Down Expand Up @@ -159,17 +172,18 @@ func (r *WorkloadReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
}

func (r *WorkloadReconciler) reconcileNotReadyTimeout(ctx context.Context, req ctrl.Request, wl *kueue.Workload) (ctrl.Result, error) {
log := ctrl.LoggerFrom(ctx)
countingTowardsTimeout, recheckAfter := r.admittedNotReadyWorkload(wl, realClock)
if !countingTowardsTimeout {
return ctrl.Result{}, nil
}
if recheckAfter > 0 {
klog.V(4).InfoS("Workload not yet ready and did not exceed its timeout", "workload", req.NamespacedName.String(), "recheckAfter", recheckAfter)
log.V(4).Info("Workload not yet ready and did not exceed its timeout", "recheckAfter", recheckAfter)
return ctrl.Result{RequeueAfter: recheckAfter}, nil
} else {
klog.V(2).InfoS("Cancelling admission of the workload due to exceeding the PodsReady timeout", "workload", req.NamespacedName.String())
err := workload.UnsetAdmissionWithCondition(ctx, r.client, wl,
"Evicted", fmt.Sprintf("Exceeded the PodsReady timeout %s", req.NamespacedName.String()))
log.V(2).Info("Start the eviction of the workload due to exceeding the PodsReady timeout")
err := workload.UpdateStatus(ctx, r.client, wl, kueue.WorkloadEvicted, metav1.ConditionTrue,
kueue.WorkloadEvictedByPodsReadyTimeout, fmt.Sprintf("Exceeded the PodsReady timeout %s", req.NamespacedName.String()), "evict")
return ctrl.Result{}, client.IgnoreNotFound(err)
}
}
Expand Down Expand Up @@ -520,7 +534,7 @@ func (h *resourceUpdatesHandler) handle(ctx context.Context, obj client.Object,
}
}

func (h *resourceUpdatesHandler) queueReconcileForPending(ctx context.Context, q workqueue.RateLimitingInterface, opts ...client.ListOption) {
func (h *resourceUpdatesHandler) queueReconcileForPending(ctx context.Context, _ workqueue.RateLimitingInterface, opts ...client.ListOption) {
log := ctrl.LoggerFrom(ctx)
lst := kueue.WorkloadList{}
opts = append(opts, client.MatchingFields{indexer.WorkloadAdmittedKey: string(metav1.ConditionFalse)})
Expand Down
2 changes: 1 addition & 1 deletion pkg/queue/cluster_queue_best_effort_fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type ClusterQueueBestEffortFIFO struct {
var _ ClusterQueue = &ClusterQueueBestEffortFIFO{}

func newClusterQueueBestEffortFIFO(cq *kueue.ClusterQueue) (ClusterQueue, error) {
cqImpl := newClusterQueueImpl(keyFunc, byCreationTime)
cqImpl := newClusterQueueImpl(keyFunc, byCreationOrEvictionTime)
cqBE := &ClusterQueueBestEffortFIFO{
clusterQueueBase: cqImpl,
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/queue/cluster_queue_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -89,8 +90,10 @@ func (c *clusterQueueBase) PushOrUpdate(wInfo *workload.Info) {
oldInfo := c.inadmissibleWorkloads[key]
if oldInfo != nil {
// update in place if the workload was inadmissible and didn't change
// to potentially become admissible.
if equality.Semantic.DeepEqual(oldInfo.Obj.Spec, wInfo.Obj.Spec) {
// to potentially become admissible, unless the Eviction status changed.
if equality.Semantic.DeepEqual(oldInfo.Obj.Spec, wInfo.Obj.Spec) &&
equality.Semantic.DeepEqual(apimeta.FindStatusCondition(oldInfo.Obj.Status.Conditions, kueue.WorkloadEvicted),
apimeta.FindStatusCondition(wInfo.Obj.Status.Conditions, kueue.WorkloadEvicted)) {
c.inadmissibleWorkloads[key] = wInfo
return
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/queue/cluster_queue_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ const (
)

func Test_PushOrUpdate(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, byCreationTime)
cq := newClusterQueueImpl(keyFunc, byCreationOrEvictionTime)
wl := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj()
if cq.Pending() != 0 {
t.Error("ClusterQueue should be empty")
Expand All @@ -57,7 +57,7 @@ func Test_PushOrUpdate(t *testing.T) {
}

func Test_Pop(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, byCreationTime)
cq := newClusterQueueImpl(keyFunc, byCreationOrEvictionTime)
now := time.Now()
wl1 := workload.NewInfo(utiltesting.MakeWorkload("workload-1", defaultNamespace).Creation(now).Obj())
wl2 := workload.NewInfo(utiltesting.MakeWorkload("workload-2", defaultNamespace).Creation(now.Add(time.Second)).Obj())
Expand All @@ -80,7 +80,7 @@ func Test_Pop(t *testing.T) {
}

func Test_Delete(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, byCreationTime)
cq := newClusterQueueImpl(keyFunc, byCreationOrEvictionTime)
wl1 := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj()
wl2 := utiltesting.MakeWorkload("workload-2", defaultNamespace).Obj()
cq.PushOrUpdate(workload.NewInfo(wl1))
Expand All @@ -101,7 +101,7 @@ func Test_Delete(t *testing.T) {
}

func Test_Info(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, byCreationTime)
cq := newClusterQueueImpl(keyFunc, byCreationOrEvictionTime)
wl := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj()
if info := cq.Info(keyFunc(workload.NewInfo(wl))); info != nil {
t.Error("workload doesn't exist")
Expand All @@ -113,7 +113,7 @@ func Test_Info(t *testing.T) {
}

func Test_AddFromLocalQueue(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, byCreationTime)
cq := newClusterQueueImpl(keyFunc, byCreationOrEvictionTime)
wl := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj()
queue := &LocalQueue{
items: map[string]*workload.Info{
Expand All @@ -131,7 +131,7 @@ func Test_AddFromLocalQueue(t *testing.T) {
}

func Test_DeleteFromLocalQueue(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, byCreationTime)
cq := newClusterQueueImpl(keyFunc, byCreationOrEvictionTime)
q := utiltesting.MakeLocalQueue("foo", "").ClusterQueue("cq").Obj()
qImpl := newLocalQueue(q)
wl1 := utiltesting.MakeWorkload("wl1", "").Queue(q.Name).Obj()
Expand Down Expand Up @@ -261,7 +261,7 @@ func TestClusterQueueImpl(t *testing.T) {

for name, test := range tests {
t.Run(name, func(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, byCreationTime)
cq := newClusterQueueImpl(keyFunc, byCreationOrEvictionTime)

err := cq.Update(utiltesting.MakeClusterQueue("cq").
NamespaceSelector(&metav1.LabelSelector{
Expand Down Expand Up @@ -315,7 +315,7 @@ func TestClusterQueueImpl(t *testing.T) {
}

func TestQueueInadmissibleWorkloadsDuringScheduling(t *testing.T) {
cq := newClusterQueueImpl(keyFunc, byCreationTime)
cq := newClusterQueueImpl(keyFunc, byCreationOrEvictionTime)
cq.namespaceSelector = labels.Everything()
wl := utiltesting.MakeWorkload("workload-1", defaultNamespace).Obj()
cl := utiltesting.NewFakeClient(
Expand Down
11 changes: 7 additions & 4 deletions pkg/queue/cluster_queue_strict_fifo.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type ClusterQueueStrictFIFO struct {
var _ ClusterQueue = &ClusterQueueStrictFIFO{}

func newClusterQueueStrictFIFO(cq *kueue.ClusterQueue) (ClusterQueue, error) {
cqImpl := newClusterQueueImpl(keyFunc, byCreationTime)
cqImpl := newClusterQueueImpl(keyFunc, byCreationOrEvictionTime)
cqStrict := &ClusterQueueStrictFIFO{
clusterQueueBase: cqImpl,
}
Expand All @@ -40,10 +40,10 @@ func newClusterQueueStrictFIFO(cq *kueue.ClusterQueue) (ClusterQueue, error) {
return cqStrict, err
}

// byCreationTime is the function used by the clusterQueue heap algorithm to sort
// byCreationOrEvictionTime is the function used by the clusterQueue heap algorithm to sort
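// workloads. It sorts workloads based on their priority.
// When priorities are equal, it compares the timestamps returned by
// workload.GetSchedulingTimestamp: the creation time, or the transition
// time of the Evicted condition for workloads evicted by a PodsReady timeout.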
func byCreationTime(a, b interface{}) bool {
func byCreationOrEvictionTime(a, b interface{}) bool {
objA := a.(*workload.Info)
objB := b.(*workload.Info)
p1 := utilpriority.Priority(objA.Obj)
Expand All @@ -52,7 +52,10 @@ func byCreationTime(a, b interface{}) bool {
if p1 != p2 {
return p1 > p2
}
return objA.Obj.CreationTimestamp.Before(&objB.Obj.CreationTimestamp)

tA := workload.GetSchedulingTimestamp(objA.Obj)
tB := workload.GetSchedulingTimestamp(objB.Obj)
return !tB.Before(tA)
}

// RequeueIfNotPresent requeues if the workload is not present.
Expand Down
28 changes: 28 additions & 0 deletions pkg/queue/cluster_queue_strict_fifo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func TestFIFOClusterQueue(t *testing.T) {
func TestStrictFIFO(t *testing.T) {
t1 := time.Now()
t2 := t1.Add(time.Second)
t3 := t2.Add(time.Second)
for _, tt := range []struct {
name string
w1 *kueue.Workload
Expand Down Expand Up @@ -146,6 +147,33 @@ func TestStrictFIFO(t *testing.T) {
},
expected: "w1",
},
{
name: "w1.priority equals w2.priority and w1.create time is earlier than w2.create time but w1 was evicted",
w1: &kueue.Workload{
ObjectMeta: metav1.ObjectMeta{
Name: "w1",
CreationTimestamp: metav1.NewTime(t1),
},
Status: kueue.WorkloadStatus{
Conditions: []metav1.Condition{
{
Type: kueue.WorkloadEvicted,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.NewTime(t3),
Reason: kueue.WorkloadEvictedByPodsReadyTimeout,
Message: "by test",
},
},
},
},
w2: &kueue.Workload{
ObjectMeta: metav1.ObjectMeta{
Name: "w2",
CreationTimestamp: metav1.NewTime(t2),
},
},
expected: "w2",
},
{
name: "p1.priority is lower than p2.priority and w1.create time is earlier than w2.create time",
w1: &kueue.Workload{
Expand Down
7 changes: 7 additions & 0 deletions pkg/scheduler/preemption/preemption.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,13 @@ func (p *Preemptor) issuePreemptions(ctx context.Context, targets []*workload.In
errCh.SendErrorWithCancel(err, cancel)
return
}

err = workload.UpdateStatus(ctx, p.client, target.Obj, kueue.WorkloadEvicted, metav1.ConditionTrue, kueue.WorkloadEvictedByPreemption, "Preempted to accommodate a higher priority Workload", "evict")
if err != nil {
errCh.SendErrorWithCancel(err, cancel)
return
}

origin := "ClusterQueue"
if cq.Name != target.ClusterQueue {
origin = "cohort"
Expand Down
4 changes: 3 additions & 1 deletion pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,9 @@ func (e entryOrdering) Less(i, j int) bool {
return !aBorrows
}
// 2. FIFO.
return a.Obj.CreationTimestamp.Before(&b.Obj.CreationTimestamp)
aComparisonTimestamp := workload.GetSchedulingTimestamp(a.Obj)
bComparisonTimestamp := workload.GetSchedulingTimestamp(b.Obj)
return aComparisonTimestamp.Before(bComparisonTimestamp)
}

func (s *Scheduler) requeueAndUpdate(log logr.Logger, ctx context.Context, e entry) {
Expand Down
51 changes: 48 additions & 3 deletions pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -737,15 +737,15 @@ func TestEntryOrdering(t *testing.T) {
Info: workload.Info{
Obj: &kueue.Workload{ObjectMeta: metav1.ObjectMeta{
Name: "gamma",
CreationTimestamp: metav1.NewTime(now.Add(2 * time.Second)),
CreationTimestamp: metav1.NewTime(now.Add(3 * time.Second)),
}},
},
},
{
Info: workload.Info{
Obj: &kueue.Workload{ObjectMeta: metav1.ObjectMeta{
Name: "delta",
CreationTimestamp: metav1.NewTime(now.Add(time.Second)),
CreationTimestamp: metav1.NewTime(now.Add(3 * time.Second)),
}},
},
assignment: flavorassigner.Assignment{
Expand All @@ -754,13 +754,58 @@ func TestEntryOrdering(t *testing.T) {
},
},
},
{
Info: workload.Info{
Obj: &kueue.Workload{
ObjectMeta: metav1.ObjectMeta{
Name: "epsilon",
CreationTimestamp: metav1.NewTime(now),
},
Status: kueue.WorkloadStatus{
Conditions: []metav1.Condition{
{
Type: kueue.WorkloadEvicted,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.NewTime(now.Add(2 * time.Second)),
Reason: kueue.WorkloadEvictedByPodsReadyTimeout,
},
},
},
},
},
assignment: flavorassigner.Assignment{
TotalBorrow: cache.FlavorResourceQuantities{
"flavor": {},
},
},
},
{
Info: workload.Info{
Obj: &kueue.Workload{
ObjectMeta: metav1.ObjectMeta{
Name: "zeta",
CreationTimestamp: metav1.NewTime(now.Add(2 * time.Second)),
},
Status: kueue.WorkloadStatus{
Conditions: []metav1.Condition{
{
Type: kueue.WorkloadEvicted,
Status: metav1.ConditionTrue,
LastTransitionTime: metav1.NewTime(now.Add(2 * time.Second)),
Reason: kueue.WorkloadEvictedByPodsReadyTimeout,
},
},
},
},
},
},
}
sort.Sort(entryOrdering(input))
order := make([]string, len(input))
for i, e := range input {
order[i] = e.Obj.Name
}
wantOrder := []string{"beta", "gamma", "alpha", "delta"}
wantOrder := []string{"beta", "zeta", "gamma", "alpha", "epsilon", "delta"}
if diff := cmp.Diff(wantOrder, order); diff != "" {
t.Errorf("Unexpected order (-want,+got):\n%s", diff)
}
Expand Down
20 changes: 20 additions & 0 deletions pkg/workload/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strings"

corev1 "k8s.io/api/core/v1"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -273,3 +274,22 @@ func AdmissionPatch(w *kueue.Workload) *kueue.Workload {
wlCopy.Status.Admission = w.Status.Admission.DeepCopy()
return wlCopy
}

// GetEvictidCondition looks up the Evicted condition of the workload.
//
// It returns the condition (nil when the workload has no such condition)
// together with a bool that is true only when the condition is present and
// its status is True, i.e. the eviction is still ongoing.
//
// The spelling of the function name is kept as is because callers refer to
// it by this name.
func GetEvictidCondition(w *kueue.Workload) (*metav1.Condition, bool) {
	evicted := apimeta.FindStatusCondition(w.Status.Conditions, kueue.WorkloadEvicted)
	ongoing := evicted != nil && evicted.Status == metav1.ConditionTrue
	return evicted, ongoing
}

// GetSchedulingTimestamp returns the timestamp the scheduler should use when
// ordering the workload.
//
// When the workload carries an Evicted condition whose reason is
// WorkloadEvictedByPodsReadyTimeout, the last transition time of that
// condition is returned; the status of the condition is not considered.
// In every other case the creation timestamp of the workload is returned.
func GetSchedulingTimestamp(w *kueue.Workload) *metav1.Time {
	evicted, _ := GetEvictidCondition(w)
	if evicted == nil || evicted.Reason != kueue.WorkloadEvictedByPodsReadyTimeout {
		return &w.CreationTimestamp
	}
	return &evicted.LastTransitionTime
}
Loading

0 comments on commit 3e4fa75

Please sign in to comment.