From 8f3ef1ba0483ede3a1ac5c7bc952eb0f4ebf844e Mon Sep 17 00:00:00 2001
From: Vicente Ferrara Brondo
Date: Tue, 24 Oct 2023 14:18:14 -0700
Subject: [PATCH] Suspendable jobs feature

Squashed commit history:

- removes the Evicted condition status update, removes job_controller unit tests, adds pkg job unit tests
- updated the workload_controller log
- added a StrictFIFO queueing strategy in the e2e test when creating a second job while the first one is suspended
- changed the reconciler behaviour to let Kueue suspend a job directly from the workload signal; updated integration and job tests (field name still to be updated)
- changed the implementation to use active instead of suspending the job manually
- updated the yaml file
- addressed comments: removed log lines, used step 6 to handle workload eviction, made the Active field a pointer, switched the e2e tests to the Consistently method (one unit test case still failing)
- updated unit tests and integration tests with the new implementation that evicts a workload based on the spec.active field
- added the CRD charts yaml
- added the workloadspec.go file
- updated e2e and unit tests (commits still to be rebased)
- added the zz_generated file
- addressed comments; moved the e2e test into the integration tests
- addressed nit comments
- addressed nit comments
- addressed final nit comments and renamed the eviction constant to WorkloadEvictedByDeactivation
- reverted a comment that was deleted by accident
- updated the workload types comment
---
 apis/kueue/v1beta1/workload_types.go          |  15 ++
 apis/kueue/v1beta1/zz_generated.deepcopy.go   |   5 +
 .../crd/kueue.x-k8s.io_workloads.yaml         |   9 +
 .../kueue/v1beta1/workloadspec.go             |   9 +
 .../crd/bases/kueue.x-k8s.io_workloads.yaml   |   9 +
 pkg/controller/core/workload_controller.go    |   7 +-
 pkg/controller/jobframework/reconciler.go     |  12 +-
 .../jobs/job/job_controller_test.go           | 198 ++++++++++++++++++
 pkg/util/testing/wrappers.go                  |   5 +
 .../jobs/job/job_controller_test.go           |  86 ++++++++
 10 files changed, 353 insertions(+), 2 deletions(-)

diff --git a/apis/kueue/v1beta1/workload_types.go b/apis/kueue/v1beta1/workload_types.go
index 798cb55b5c..fca411c02d 100644
--- a/apis/kueue/v1beta1/workload_types.go
+++ b/apis/kueue/v1beta1/workload_types.go
@@ -59,6 +59,17 @@
 	// +kubebuilder:default=""
 	// +kubebuilder:validation:Enum=kueue.x-k8s.io/workloadpriorityclass;scheduling.k8s.io/priorityclass;""
 	PriorityClassSource string `json:"priorityClassSource,omitempty"`
+
+	// Active determines if a workload can be admitted into a queue.
+	// Changing active from true to false will evict any running workloads.
+	// Possible values are:
+	//
+	//   - false: indicates that a workload should never be admitted and evicts running workloads
+	//   - true: indicates that a workload can be evaluated for admission into its respective queue.
+	//
+	// Defaults to true
+	// +kubebuilder:default=true
+	Active *bool `json:"active,omitempty"`
 }
 
 type Admission struct {
@@ -267,6 +278,10 @@ const (
 	// WorkloadEvictedByAdmissionCheck indicates that the workload was evicted
 	// beacuse at least one admission check transitioned to False.
 	WorkloadEvictedByAdmissionCheck = "AdmissionCheck"
+
+	// WorkloadEvictedByDeactivation indicates that the workload was evicted
+	// because spec.active is set to false.
+ WorkloadEvictedByDeactivation = "InactiveWorkload" ) // +genclient diff --git a/apis/kueue/v1beta1/zz_generated.deepcopy.go b/apis/kueue/v1beta1/zz_generated.deepcopy.go index 18eb9579fd..a0ad6a529a 100644 --- a/apis/kueue/v1beta1/zz_generated.deepcopy.go +++ b/apis/kueue/v1beta1/zz_generated.deepcopy.go @@ -1081,6 +1081,11 @@ func (in *WorkloadSpec) DeepCopyInto(out *WorkloadSpec) { *out = new(int32) **out = **in } + if in.Active != nil { + in, out := &in.Active, &out.Active + *out = new(bool) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkloadSpec. diff --git a/charts/kueue/templates/crd/kueue.x-k8s.io_workloads.yaml b/charts/kueue/templates/crd/kueue.x-k8s.io_workloads.yaml index 158f697309..25f791af83 100644 --- a/charts/kueue/templates/crd/kueue.x-k8s.io_workloads.yaml +++ b/charts/kueue/templates/crd/kueue.x-k8s.io_workloads.yaml @@ -62,6 +62,15 @@ spec: spec: description: WorkloadSpec defines the desired state of Workload properties: + active: + default: true + description: "Active determines if a workload can be admitted into + a queue. Changing active from true to false will evict any running + workloads. Possible values are: \n - false: indicates that a workload + should never be admitted and evicts running workloads - true: indicates + that a workload can be evaluated for admission into it's respective + queue. \n Defaults to true" + type: boolean podSets: description: podSets is a list of sets of homogeneous pods, each described by a Pod spec and a count. There must be at least one element and diff --git a/client-go/applyconfiguration/kueue/v1beta1/workloadspec.go b/client-go/applyconfiguration/kueue/v1beta1/workloadspec.go index 04a855b67c..685da6adc4 100644 --- a/client-go/applyconfiguration/kueue/v1beta1/workloadspec.go +++ b/client-go/applyconfiguration/kueue/v1beta1/workloadspec.go @@ -25,6 +25,7 @@ type WorkloadSpecApplyConfiguration struct { PriorityClassName *string `json:"priorityClassName,omitempty"` Priority *int32 `json:"priority,omitempty"` PriorityClassSource *string `json:"priorityClassSource,omitempty"` + Active *bool `json:"active,omitempty"` } // WorkloadSpecApplyConfiguration constructs an declarative configuration of the WorkloadSpec type for use with @@ -77,3 +78,11 @@ func (b *WorkloadSpecApplyConfiguration) WithPriorityClassSource(value string) * b.PriorityClassSource = &value return b } + +// WithActive sets the Active field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the Active field is set to the value of the last call. +func (b *WorkloadSpecApplyConfiguration) WithActive(value bool) *WorkloadSpecApplyConfiguration { + b.Active = &value + return b +} diff --git a/config/components/crd/bases/kueue.x-k8s.io_workloads.yaml b/config/components/crd/bases/kueue.x-k8s.io_workloads.yaml index ba84762856..0d8e04cbeb 100644 --- a/config/components/crd/bases/kueue.x-k8s.io_workloads.yaml +++ b/config/components/crd/bases/kueue.x-k8s.io_workloads.yaml @@ -49,6 +49,15 @@ spec: spec: description: WorkloadSpec defines the desired state of Workload properties: + active: + default: true + description: "Active determines if a workload can be admitted into + a queue. Changing active from true to false will evict any running + workloads. 
Possible values are: \n - false: indicates that a workload + should never be admitted and evicts running workloads - true: indicates + that a workload can be evaluated for admission into it's respective + queue. \n Defaults to true" + type: boolean podSets: description: podSets is a list of sets of homogeneous pods, each described by a Pod spec and a count. There must be at least one element and diff --git a/pkg/controller/core/workload_controller.go b/pkg/controller/core/workload_controller.go index 0ea87583ee..949d7d663a 100644 --- a/pkg/controller/core/workload_controller.go +++ b/pkg/controller/core/workload_controller.go @@ -32,6 +32,7 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "k8s.io/utils/clock" + "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" @@ -376,6 +377,7 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool { status := workloadStatus(wl) log := r.log.WithValues("workload", klog.KObj(wl), "queue", wl.Spec.QueueName, "status", status) ctx := ctrl.LoggerInto(context.Background(), log) + active := ptr.Deref(wl.Spec.Active, true) prevQueue := oldWl.Spec.QueueName if prevQueue != wl.Spec.QueueName { @@ -398,7 +400,10 @@ func (r *WorkloadReconciler) Update(e event.UpdateEvent) bool { workload.AdjustResources(ctrl.LoggerInto(ctx, log), r.client, wlCopy) switch { - case status == finished: + case status == finished || !active: + if !active { + log.V(2).Info("Workload will not be queued because the workload is not active", "workload", klog.KObj(wl)) + } // The workload could have been in the queues if we missed an event. r.queues.DeleteWorkload(wl) diff --git a/pkg/controller/jobframework/reconciler.go b/pkg/controller/jobframework/reconciler.go index c1ac22312b..4a6e71d308 100644 --- a/pkg/controller/jobframework/reconciler.go +++ b/pkg/controller/jobframework/reconciler.go @@ -363,7 +363,17 @@ func (r *JobReconciler) ReconcileGenericJob(ctx context.Context, req ctrl.Reques return ctrl.Result{}, nil } - // 8. handle job is unsuspended. + // 8. handle workload is deactivated. + if !ptr.Deref(wl.Spec.Active, true) { + workload.SetEvictedCondition(wl, kueue.WorkloadEvictedByDeactivation, "The workload is deactivated") + err := workload.ApplyAdmissionStatus(ctx, r.client, wl, true) + if err != nil { + return ctrl.Result{}, fmt.Errorf("setting eviction: %w", err) + } + return ctrl.Result{}, nil + } + + // 9. handle job is unsuspended. if !workload.IsAdmitted(wl) { // the job must be suspended if the workload is not yet admitted. log.V(2).Info("Running job is not admitted by a cluster queue, suspending") diff --git a/pkg/controller/jobs/job/job_controller_test.go b/pkg/controller/jobs/job/job_controller_test.go index ca4b02fd85..b10209e721 100644 --- a/pkg/controller/jobs/job/job_controller_test.go +++ b/pkg/controller/jobs/job/job_controller_test.go @@ -450,6 +450,204 @@ func TestReconciler(t *testing.T) { Obj(), }, }, + "when workload is admitted and spec.active is set to false, the workload's conditions is set to Evicted": { + job: *baseJobWrapper.Clone(). + Suspend(false). + Obj(), + wantJob: *baseJobWrapper.Clone(). + Suspend(false). + Obj(), + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("a", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 10).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(10).Obj()). + Admitted(true). 
+ Active(false). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "check", + State: kueue.CheckStateReady, + PodSetUpdates: []kueue.PodSetUpdate{ + { + Name: "main", + }, + }, + }). + Obj(), + }, + wantWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload("a", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 10).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(10).Obj()). + Admitted(true). + Active(false). + Condition(metav1.Condition{ + Type: kueue.WorkloadEvicted, + Status: metav1.ConditionTrue, + Reason: "InactiveWorkload", + Message: "The workload is deactivated", + }). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "check", + State: kueue.CheckStateReady, + PodSetUpdates: []kueue.PodSetUpdate{ + { + Name: "main", + }, + }, + }). + Obj(), + }, + }, + "when workload is evicted due to spec.active field being false, job gets suspended and quota is unset": { + job: *baseJobWrapper.Clone(). + Suspend(false). + Obj(), + wantJob: *baseJobWrapper.Clone(). + Suspend(true). + Obj(), + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("a", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 10).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(10).Obj()). + Admitted(true). + Active(false). + Condition(metav1.Condition{ + Type: kueue.WorkloadEvicted, + Status: metav1.ConditionTrue, + Reason: "InactiveWorkload", + Message: "The workload is deactivated", + }). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "check", + State: kueue.CheckStateReady, + PodSetUpdates: []kueue.PodSetUpdate{ + { + Name: "main", + Labels: map[string]string{ + "ac-key": "ac-value", + }, + }, + }, + }). + Obj(), + }, + wantWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload("a", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 10).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(10).Obj()). + Admitted(true). + Active(false). + Condition(metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionFalse, + Reason: "NoReservation", + Message: "The workload has no reservation", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadQuotaReserved, + Status: metav1.ConditionFalse, + Reason: "Pending", + Message: "The workload is deactivated", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadEvicted, + Status: metav1.ConditionTrue, + Reason: "InactiveWorkload", + Message: "The workload is deactivated", + }). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "check", + State: kueue.CheckStateReady, + PodSetUpdates: []kueue.PodSetUpdate{ + { + Name: "main", + Labels: map[string]string{ + "ac-key": "ac-value", + }, + }, + }, + }). + Obj(), + }, + }, + "when job is initially suspended, the Workload has active=false and it's not admitted, " + + "it should not get an evicted condition, but the job should remain suspended": { + job: *baseJobWrapper.Clone(). + Suspend(true). + Obj(), + wantJob: *baseJobWrapper.Clone(). + Suspend(true). + Obj(), + workloads: []kueue.Workload{ + *utiltesting.MakeWorkload("a", "ns").Finalizers(kueue.ResourceInUseFinalizerName). 
+ PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 10).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(10).Obj()). + Admitted(true). + Active(false). + Queue("foo"). + Condition(metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionFalse, + Reason: "NoReservation", + Message: "The workload has no reservation", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadQuotaReserved, + Status: metav1.ConditionFalse, + Reason: "Pending", + Message: "The workload is deactivated", + }). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "check", + State: kueue.CheckStateReady, + PodSetUpdates: []kueue.PodSetUpdate{ + { + Name: "main", + Labels: map[string]string{ + "ac-key": "ac-value", + }, + }, + }, + }). + Obj(), + }, + wantWorkloads: []kueue.Workload{ + *utiltesting.MakeWorkload("a", "ns").Finalizers(kueue.ResourceInUseFinalizerName). + Finalizers(kueue.ResourceInUseFinalizerName). + PodSets(*utiltesting.MakePodSet(kueue.DefaultPodSetName, 10).Request(corev1.ResourceCPU, "1").Obj()). + ReserveQuota(utiltesting.MakeAdmission("cq").AssignmentPodCount(10).Obj()). + Admitted(true). + Active(false). + Queue("foo"). + Condition(metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionFalse, + Reason: "NoReservation", + Message: "The workload has no reservation", + }). + Condition(metav1.Condition{ + Type: kueue.WorkloadQuotaReserved, + Status: metav1.ConditionFalse, + Reason: "Pending", + Message: "The workload is deactivated", + }). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "check", + State: kueue.CheckStateReady, + PodSetUpdates: []kueue.PodSetUpdate{ + { + Name: "main", + Labels: map[string]string{ + "ac-key": "ac-value", + }, + }, + }, + }). + Obj(), + }, + }, "when workload is admitted and PodSetUpdates conflict between admission checks on labels, the workload is finished with failure": { job: *baseJobWrapper.Clone(). 
Obj(), diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index e3589309d2..559fc4e52f 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -102,6 +102,11 @@ func (w *WorkloadWrapper) Queue(q string) *WorkloadWrapper { return w } +func (w *WorkloadWrapper) Active(a bool) *WorkloadWrapper { + w.Spec.Active = ptr.To(a) + return w +} + // ReserveQuota sets workload admission and adds a "QuotaReserved" status condition func (w *WorkloadWrapper) ReserveQuota(a *kueue.Admission) *WorkloadWrapper { w.Status.Admission = a diff --git a/test/integration/controller/jobs/job/job_controller_test.go b/test/integration/controller/jobs/job/job_controller_test.go index 287ddd9054..20cfbe437a 100644 --- a/test/integration/controller/jobs/job/job_controller_test.go +++ b/test/integration/controller/jobs/job/job_controller_test.go @@ -1596,6 +1596,92 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde gomega.Expect(createdWorkload.CreationTimestamp).Should(gomega.Equal(createdTime)) }) }) + + ginkgo.When("Suspend a running Job without requeueing through Workload's spec.active field", func() { + ginkgo.It("Should not readmit a job to the queue after Active is changed to false", func() { + ginkgo.By("creating localQueue") + localQueue := testing.MakeLocalQueue("queue", ns.Name).ClusterQueue(prodClusterQ.Name).Obj() + gomega.Expect(k8sClient.Create(ctx, localQueue)).Should(gomega.Succeed()) + + sampleJob := testingjob.MakeJob("job1", ns.Name).Queue(localQueue.Name).Request(corev1.ResourceCPU, "2").Obj() + lookupKey1 := types.NamespacedName{Name: sampleJob.Name, Namespace: sampleJob.Namespace} + wll := &kueue.Workload{} + + ginkgo.By("checking the job starts") + gomega.Expect(k8sClient.Create(ctx, sampleJob)).Should(gomega.Succeed()) + + createdJob := &batchv1.Job{} + wlKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(sampleJob.Name), Namespace: sampleJob.Namespace} + + gomega.Eventually(func(g gomega.Gomega) { + ginkgo.By("checking the job's suspend field is false") + g.Expect(k8sClient.Get(ctx, lookupKey1, sampleJob)).Should(gomega.Succeed()) + g.Expect(sampleJob.Spec.Suspend).To(gomega.Equal(ptr.To(false))) + ginkgo.By("checking the workload is admitted") + g.Expect(k8sClient.Get(ctx, wlKey, wll)).Should(gomega.Succeed()) + util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wll) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + ginkgo.By("Change the Active field to suspend the job and check the job remains suspended and the workload unadmitted") + // Changing Active to false + wll.Spec.Active = ptr.To(false) + gomega.Expect(k8sClient.Update(ctx, wll)).Should(gomega.Succeed()) + + ginkgo.By("checking a second job starts after first job is suspended") + sampleJob2 := testingjob.MakeJob("job2", ns.Name).Queue(localQueue.Name).Request(corev1.ResourceCPU, "2").Obj() + + lookupKey2 := types.NamespacedName{Name: sampleJob2.Name, Namespace: sampleJob2.Namespace} + wll2 := &kueue.Workload{} + + gomega.Expect(k8sClient.Create(ctx, sampleJob2)).Should(gomega.Succeed()) + wlKey2 := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(sampleJob2.Name), Namespace: sampleJob2.Namespace} + + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(k8sClient.Get(ctx, lookupKey2, sampleJob2)).Should(gomega.Succeed()) + g.Expect(sampleJob2.Spec.Suspend).To(gomega.Equal(ptr.To(false))) + g.Expect(k8sClient.Get(ctx, wlKey2, wll2)).Should(gomega.Succeed()) + util.ExpectWorkloadsToBeAdmitted(ctx, k8sClient, wll2) + }, 
util.Timeout, util.Interval).Should(gomega.Succeed())
+
+			// Checking job stays suspended
+			ginkgo.By("checking job is suspended")
+			gomega.Eventually(func() *bool {
+				gomega.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: sampleJob.Name, Namespace: sampleJob.Namespace}, createdJob)).
+					Should(gomega.Succeed())
+				return createdJob.Spec.Suspend
+			}, util.Timeout, util.Interval).Should(gomega.Equal(ptr.To(true)))
+
+			ginkgo.By("checking the first job and workload stay suspended and unadmitted")
+			gomega.Consistently(func(g gomega.Gomega) {
+				// Job should stay pending
+				g.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: sampleJob.Name, Namespace: sampleJob.Namespace}, createdJob)).
+					Should(gomega.Succeed())
+				g.Expect(createdJob.Spec.Suspend).To(gomega.Equal(ptr.To(true)))
+				// Workload should get unadmitted
+				g.Expect(k8sClient.Get(ctx, wlKey, wll)).Should(gomega.Succeed())
+				util.ExpectWorkloadsToBePending(ctx, k8sClient, wll)
+				// Workload should stay pending
+				g.Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(wll), wll)).Should(gomega.Succeed())
+				// Should have Evicted condition
+				isEvicting := apimeta.IsStatusConditionTrue(wll.Status.Conditions, kueue.WorkloadEvicted)
+				g.Expect(isEvicting).Should(gomega.BeTrue())
+			}, util.ConsistentDuration, util.Interval).Should(gomega.Succeed())
+
+			ginkgo.By("checking the first job becomes unsuspended after we update the Active field back to true")
+			gomega.Eventually(func() error {
+				gomega.Expect(k8sClient.Get(ctx, wlKey, wll)).Should(gomega.Succeed())
+				wll.Spec.Active = ptr.To(true)
+				return k8sClient.Update(ctx, wll)
+			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+
+			gomega.Eventually(func(g gomega.Gomega) {
+				g.Expect(k8sClient.Get(ctx, types.NamespacedName{Name: sampleJob.Name, Namespace: sampleJob.Namespace}, createdJob)).
+					Should(gomega.Succeed())
+				g.Expect(createdJob.Spec.Suspend).To(gomega.Equal(ptr.To(false)))
+			}, util.Timeout, util.Interval).Should(gomega.Succeed())
+
+		})
+	})
 })
 
 func expectJobUnsuspendedWithNodeSelectors(key types.NamespacedName, ns map[string]string) {
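
Not part of the patch itself, but as a usage sketch of the new field: with this change applied, suspending a running job without requeueing it only requires flipping spec.active on its Workload, mirroring what the integration test above does. The package and helper names below are illustrative, not from the Kueue codebase; only the spec.active field and the client calls come from the patch.

package example

import (
	"context"

	"k8s.io/utils/ptr"
	"sigs.k8s.io/controller-runtime/pkg/client"

	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

// DeactivateWorkload sets spec.active to false on an existing Workload. With this
// patch, Kueue then evicts the workload with reason "InactiveWorkload" and suspends
// the owning Job; setting the field back to true makes it eligible for admission again.
func DeactivateWorkload(ctx context.Context, c client.Client, key client.ObjectKey) error {
	wl := &kueue.Workload{}
	if err := c.Get(ctx, key, wl); err != nil {
		return err
	}
	wl.Spec.Active = ptr.To(false)
	return c.Update(ctx, wl)
}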