Skip to content

Commit

Permalink
Suspendable jobs feature
Browse files Browse the repository at this point in the history
removes evicted condition status update, removes job_controller unit tests, adds pkg job unit tests

updated workload_controller log

added StrictFIFO queueing strategy in e2e test while creating a second job when the first one is suspended

changed behaviour in reconciler to allow kueue to suspend a job directly from the workload signal, updated integration and job tests; the field name still needs to be updated

changing implementation to use active and not suspending the job manually

updated yaml file

addressed comments, removed log lines, used step 6 to handle workload eviction, fixed Active field to be a pointer, fixed e2e tests to use the Consistently method; one unit test case still needs to be made to work

updated unit tests and integration tests with new implementation to evict a workload based on spec.active field

added crd charts yaml

added workloadspec.go file

updated e2e and unit tests, missing to rebase commits

added zz generated file

addressed comments, moved e2e test to happen inside integration tests

nit comments addressed

nit comments addressed

final nit comments addressed and changed Eviction constant to be WorkloadEvictedByDeactivation

deleted by accident a comment, reverted

updated workload types comment
  • Loading branch information
vicentefb committed Dec 1, 2023
1 parent 5825d15 commit 8f3ef1b
Show file tree
Hide file tree
Showing 10 changed files with 353 additions and 2 deletions.
15 changes: 15 additions & 0 deletions apis/kueue/v1beta1/workload_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@ type WorkloadSpec struct {
// +kubebuilder:default=""
// +kubebuilder:validation:Enum=kueue.x-k8s.io/workloadpriorityclass;scheduling.k8s.io/priorityclass;""
PriorityClassSource string `json:"priorityClassSource,omitempty"`

// Active determines if a workload can be admitted into a queue.
// Changing active from true to false will evict any running workloads.
// Possible values are:
//
// - false: indicates that a workload should never be admitted and evicts running workloads
// - true: indicates that a workload can be evaluated for admission into its respective queue.
//
// Defaults to true
// +kubebuilder:default=true
Active *bool `json:"active,omitempty"`
}

type Admission struct {
Expand Down Expand Up @@ -267,6 +278,10 @@ const (
// WorkloadEvictedByAdmissionCheck indicates that the workload was evicted
// because at least one admission check transitioned to False.
WorkloadEvictedByAdmissionCheck = "AdmissionCheck"

// WorkloadEvictedByDeactivation indicates that the workload was evicted
// because spec.active is set to false.
WorkloadEvictedByDeactivation = "InactiveWorkload"
)

// +genclient
Expand Down
5 changes: 5 additions & 0 deletions apis/kueue/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions charts/kueue/templates/crd/kueue.x-k8s.io_workloads.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ spec:
spec:
description: WorkloadSpec defines the desired state of Workload
properties:
active:
default: true
description: "Active determines if a workload can be admitted into
a queue. Changing active from true to false will evict any running
workloads. Possible values are: \n - false: indicates that a workload
should never be admitted and evicts running workloads - true: indicates
that a workload can be evaluated for admission into its respective
queue. \n Defaults to true"
type: boolean
podSets:
description: podSets is a list of sets of homogeneous pods, each described
by a Pod spec and a count. There must be at least one element and
Expand Down
9 changes: 9 additions & 0 deletions client-go/applyconfiguration/kueue/v1beta1/workloadspec.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 9 additions & 0 deletions config/components/crd/bases/kueue.x-k8s.io_workloads.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ spec:
spec:
description: WorkloadSpec defines the desired state of Workload
properties:
active:
default: true
description: "Active determines if a workload can be admitted into
a queue. Changing active from true to false will evict any running
workloads. Possible values are: \n - false: indicates that a workload
should never be admitted and evicts running workloads - true: indicates
that a workload can be evaluated for admission into its respective
queue. \n Defaults to true"
type: boolean
podSets:
description: podSets is a list of sets of homogeneous pods, each described
by a Pod spec and a count. There must be at least one element and
Expand Down
7 changes: 6 additions & 1 deletion pkg/controller/core/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
Expand Down Expand Up @@ -376,6 +377,7 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
status := workloadStatus(wl)
log := r.log.WithValues("workload", klog.KObj(wl), "queue", wl.Spec.QueueName, "status", status)
ctx := ctrl.LoggerInto(context.Background(), log)
active := ptr.Deref(wl.Spec.Active, true)

prevQueue := oldWl.Spec.QueueName
if prevQueue != wl.Spec.QueueName {
Expand All @@ -398,7 +400,10 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool {
workload.AdjustResources(ctrl.LoggerInto(ctx, log), r.client, wlCopy)

switch {
case status == finished:
case status == finished || !active:
if !active {
log.V(2).Info("Workload will not be queued because the workload is not active", "workload", klog.KObj(wl))
}
// The workload could have been in the queues if we missed an event.
r.queues.DeleteWorkload(wl)

Expand Down
12 changes: 11 additions & 1 deletion pkg/controller/jobframework/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,17 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, nil
}

// 8. handle job is unsuspended.
// 8. handle workload is deactivated.
if !ptr.Deref(wl.Spec.Active, true) {
workload.SetEvictedCondition(wl, kueue.WorkloadEvictedByDeactivation, "The workload is deactivated")
err := workload.ApplyAdmissionStatus(ctx, r.client, wl, true)
if err != nil {
return ctrl.Result{}, fmt.Errorf("setting eviction: %w", err)
}
return ctrl.Result{}, nil
}

// 9. handle job is unsuspended.
if !workload.IsAdmitted(wl) {
// the job must be suspended if the workload is not yet admitted.
log.V(2).Info("Running job is not admitted by a cluster queue, suspending")
Expand Down
198 changes: 198 additions & 0 deletions pkg/controller/jobs/job/job_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,204 @@ func TestReconciler(t *testing.T) {
Obj(),
},
},
"when workload is admitted and spec.active is set to false, the workload's conditions is set to Evicted": {
job: *baseJobWrapper.Clone().
Suspend(false).
Obj(),
wantJob: *baseJobWrapper.Clone().
Suspend(false).
Obj(),
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("a", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 10).Request(corev1.ResourceCPU, "1").Obj()).
ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(10).Obj()).
Admitted(true).
Active(false).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateReady,
PodSetUpdates: []kueue.PodSetUpdate{
{
Name: "main",
},
},
}).
Obj(),
},
wantWorkloads: []kueue.Workload{
*utiltesting.MakeWorkload("a", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 10).Request(corev1.ResourceCPU, "1").Obj()).
ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(10).Obj()).
Admitted(true).
Active(false).
Condition(metav1.Condition{
Type: kueue.WorkloadEvicted,
Status: metav1.ConditionTrue,
Reason: "InactiveWorkload",
Message: "The workload is deactivated",
}).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateReady,
PodSetUpdates: []kueue.PodSetUpdate{
{
Name: "main",
},
},
}).
Obj(),
},
},
"when workload is evicted due to spec.active field being false, job gets suspended and quota is unset": {
job: *baseJobWrapper.Clone().
Suspend(false).
Obj(),
wantJob: *baseJobWrapper.Clone().
Suspend(true).
Obj(),
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("a", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 10).Request(corev1.ResourceCPU, "1").Obj()).
ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(10).Obj()).
Admitted(true).
Active(false).
Condition(metav1.Condition{
Type: kueue.WorkloadEvicted,
Status: metav1.ConditionTrue,
Reason: "InactiveWorkload",
Message: "The workload is deactivated",
}).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateReady,
PodSetUpdates: []kueue.PodSetUpdate{
{
Name: "main",
Labels: map[string]string{
"ac-key": "ac-value",
},
},
},
}).
Obj(),
},
wantWorkloads: []kueue.Workload{
*utiltesting.MakeWorkload("a", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 10).Request(corev1.ResourceCPU, "1").Obj()).
ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(10).Obj()).
Admitted(true).
Active(false).
Condition(metav1.Condition{
Type: kueue.WorkloadAdmitted,
Status: metav1.ConditionFalse,
Reason: "NoReservation",
Message: "The workload has no reservation",
}).
Condition(metav1.Condition{
Type: kueue.WorkloadQuotaReserved,
Status: metav1.ConditionFalse,
Reason: "Pending",
Message: "The workload is deactivated",
}).
Condition(metav1.Condition{
Type: kueue.WorkloadEvicted,
Status: metav1.ConditionTrue,
Reason: "InactiveWorkload",
Message: "The workload is deactivated",
}).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateReady,
PodSetUpdates: []kueue.PodSetUpdate{
{
Name: "main",
Labels: map[string]string{
"ac-key": "ac-value",
},
},
},
}).
Obj(),
},
},
"when job is initially suspended, the Workload has active=false and it's not admitted, " +
"it should not get an evicted condition, but the job should remain suspended": {
job: *baseJobWrapper.Clone().
Suspend(true).
Obj(),
wantJob: *baseJobWrapper.Clone().
Suspend(true).
Obj(),
workloads: []kueue.Workload{
*utiltesting.MakeWorkload("a", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 10).Request(corev1.ResourceCPU, "1").Obj()).
ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(10).Obj()).
Admitted(true).
Active(false).
Queue("foo").
Condition(metav1.Condition{
Type: kueue.WorkloadAdmitted,
Status: metav1.ConditionFalse,
Reason: "NoReservation",
Message: "The workload has no reservation",
}).
Condition(metav1.Condition{
Type: kueue.WorkloadQuotaReserved,
Status: metav1.ConditionFalse,
Reason: "Pending",
Message: "The workload is deactivated",
}).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateReady,
PodSetUpdates: []kueue.PodSetUpdate{
{
Name: "main",
Labels: map[string]string{
"ac-key": "ac-value",
},
},
},
}).
Obj(),
},
wantWorkloads: []kueue.Workload{
*utiltesting.MakeWorkload("a", "ns").Finalizers(kueue.ResourceInUseFinalizerName).
Finalizers(kueue.ResourceInUseFinalizerName).
PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 10).Request(corev1.ResourceCPU, "1").Obj()).
ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(10).Obj()).
Admitted(true).
Active(false).
Queue("foo").
Condition(metav1.Condition{
Type: kueue.WorkloadAdmitted,
Status: metav1.ConditionFalse,
Reason: "NoReservation",
Message: "The workload has no reservation",
}).
Condition(metav1.Condition{
Type: kueue.WorkloadQuotaReserved,
Status: metav1.ConditionFalse,
Reason: "Pending",
Message: "The workload is deactivated",
}).
AdmissionCheck(kueue.AdmissionCheckState{
Name: "check",
State: kueue.CheckStateReady,
PodSetUpdates: []kueue.PodSetUpdate{
{
Name: "main",
Labels: map[string]string{
"ac-key": "ac-value",
},
},
},
}).
Obj(),
},
},
"when workload is admitted and PodSetUpdates conflict between admission checks on labels, the workload is finished with failure": {
job: *baseJobWrapper.Clone().
Obj(),
Expand Down
5 changes: 5 additions & 0 deletions pkg/util/testing/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ func (w *WorkloadWrapper) Queue(q string) *WorkloadWrapper {
return w
}

// Active sets the wrapped workload's spec.active field to a pointer holding
// the given value and returns the wrapper so that calls can be chained.
func (w *WorkloadWrapper) Active(a bool) *WorkloadWrapper {
	// Take the address of a local copy so the stored pointer is owned by the
	// workload and is independent of the caller's argument.
	active := a
	w.Spec.Active = &active
	return w
}

// ReserveQuota sets workload admission and adds a "QuotaReserved" status condition
func (w *WorkloadWrapper) ReserveQuota(a *kueue.Admission) *WorkloadWrapper {
w.Status.Admission = a
Expand Down
Loading

0 comments on commit 8f3ef1b

Please sign in to comment.