diff --git a/apis/kueue/webhooks/workload_webhook.go b/apis/kueue/webhooks/workload_webhook.go index ff8137201d..b43c996f3d 100644 --- a/apis/kueue/webhooks/workload_webhook.go +++ b/apis/kueue/webhooks/workload_webhook.go @@ -19,7 +19,6 @@ package webhooks import ( "context" - corev1 "k8s.io/api/core/v1" apivalidation "k8s.io/apimachinery/pkg/api/validation" metav1validation "k8s.io/apimachinery/pkg/apis/meta/v1/validation" "k8s.io/apimachinery/pkg/runtime" @@ -31,6 +30,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/webhook" kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" + utilapi "sigs.k8s.io/kueue/pkg/util/api" ) type WorkloadWebhook struct{} @@ -62,29 +62,12 @@ func (w *WorkloadWebhook) Default(ctx context.Context, obj runtime.Object) error } } for i := range wl.Spec.PodSets { - podSet := &wl.Spec.PodSets[i] - setContainersDefaults(podSet.Template.Spec.InitContainers) - setContainersDefaults(podSet.Template.Spec.Containers) + wl.Spec.PodSets[i].Template.Spec.Containers = utilapi.SetContainersDefaults(wl.Spec.PodSets[i].Template.Spec.Containers) + wl.Spec.PodSets[i].Template.Spec.InitContainers = utilapi.SetContainersDefaults(wl.Spec.PodSets[i].Template.Spec.InitContainers) } return nil } -func setContainersDefaults(containers []corev1.Container) { - for i := range containers { - c := &containers[i] - if c.Resources.Limits != nil { - if c.Resources.Requests == nil { - c.Resources.Requests = make(corev1.ResourceList) - } - for k, v := range c.Resources.Limits { - if _, exists := c.Resources.Requests[k]; !exists { - c.Resources.Requests[k] = v.DeepCopy() - } - } - } - } -} - // +kubebuilder:webhook:path=/validate-kueue-x-k8s-io-v1beta1-workload,mutating=false,failurePolicy=fail,sideEffects=None,groups=kueue.x-k8s.io,resources=workloads;workloads/status,verbs=create;update,versions=v1beta1,name=vworkload.kb.io,admissionReviewVersions=v1 var _ webhook.CustomValidator = &WorkloadWebhook{} diff --git a/pkg/controller/workload/job/job_controller.go b/pkg/controller/workload/job/job_controller.go index 1d27a5f3ca..7ce801eec6 100644 --- a/pkg/controller/workload/job/job_controller.go +++ b/pkg/controller/workload/job/job_controller.go @@ -40,6 +40,7 @@ import ( kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1" "sigs.k8s.io/kueue/pkg/constants" "sigs.k8s.io/kueue/pkg/controller/workload/jobframework" + utilapi "sigs.k8s.io/kueue/pkg/util/api" "sigs.k8s.io/kueue/pkg/workload" ) @@ -278,12 +279,12 @@ func (b *BatchJob) EquivalentToWorkload(wl kueue.Workload) bool { // nodeSelector may change, hence we are not checking for // equality of the whole job.Spec.Template.Spec. - if !equality.Semantic.DeepEqual(b.Spec.Template.Spec.InitContainers, - wl.Spec.PodSets[0].Template.Spec.InitContainers) { + initContainersDefaulted := utilapi.SetContainersDefaults(b.Spec.Template.Spec.InitContainers) + if !equality.Semantic.DeepEqual(initContainersDefaulted, wl.Spec.PodSets[0].Template.Spec.InitContainers) { return false } - return equality.Semantic.DeepEqual(b.Spec.Template.Spec.Containers, - wl.Spec.PodSets[0].Template.Spec.Containers) + containersDefaulted := utilapi.SetContainersDefaults(b.Spec.Template.Spec.Containers) + return equality.Semantic.DeepEqual(containersDefaulted, wl.Spec.PodSets[0].Template.Spec.Containers) } func (b *BatchJob) PriorityClass() string { diff --git a/pkg/controller/workload/jobframework/helpers.go b/pkg/controller/workload/jobframework/helpers.go index 257cdec43d..5e09e23de1 100644 --- a/pkg/controller/workload/jobframework/helpers.go +++ b/pkg/controller/workload/jobframework/helpers.go @@ -76,6 +76,7 @@ func EnsureOneWorkload(ctx context.Context, cli client.Client, req ctrl.Request, if match == nil && job.EquivalentToWorkload(*w) { match = w } else { + log.V(2).Info("job not equal with the workload, will delete the workload") toDelete = append(toDelete, w) } } diff --git a/pkg/util/api/api.go b/pkg/util/api/api.go index 7fe3b8a57d..3ff1ebec28 100644 --- a/pkg/util/api/api.go +++ b/pkg/util/api/api.go @@ -16,6 +16,10 @@ limitations under the License. package api +import ( + v1 "k8s.io/api/core/v1" +) + const ( maxEventMsgSize = 1024 maxConditionMsgSize = 32 * 1024 @@ -38,3 +42,35 @@ func truncateMessage(message string, limit int) string { suffix := " ..." return message[:limit-len(suffix)] + suffix } + +// SetContainersDefaults will fill up the containers with default values. +// Note that this is aimed to bridge the gap between job and workload, +// E.g. job may have resource limits configured but no requests, +// however Kueue depends on the necessary workload requests for computing, +// so we need a function like this to make up for this. +// +// This is inspired by Kubernetes pod defaulting: +// https://github.com/kubernetes/kubernetes/blob/ab002db78835a94bd19ce4aaa46fb39b4d9c276f/pkg/apis/core/v1/defaults.go#L144 +func SetContainersDefaults(containers []v1.Container) []v1.Container { + // This only happens in tests for containers should always exist. + // We return containers itself here rather than nil or empty slice directly for + // not failing the unit tests or integration tests. + if len(containers) == 0 { + return containers + } + res := make([]v1.Container, len(containers)) + for i, c := range containers { + if c.Resources.Limits != nil { + if c.Resources.Requests == nil { + c.Resources.Requests = make(v1.ResourceList) + } + for k, v := range c.Resources.Limits { + if _, exists := c.Resources.Requests[k]; !exists { + c.Resources.Requests[k] = v.DeepCopy() + } + } + } + res[i] = c + } + return res +} diff --git a/pkg/util/api/api_test.go b/pkg/util/api/api_test.go new file mode 100644 index 0000000000..52f1ad9e9d --- /dev/null +++ b/pkg/util/api/api_test.go @@ -0,0 +1,106 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package api + +import ( + "testing" + + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" + + utiltesting "sigs.k8s.io/kueue/pkg/util/testing" +) + +func TestSetContainersDefaults(t *testing.T) { + testCases := []struct { + name string + containers []v1.Container + wantContainers []v1.Container + }{ + { + name: "container with no resources", + containers: []v1.Container{ + *utiltesting.MakeContainer("no-resources").Obj(), + }, + wantContainers: []v1.Container{ + *utiltesting.MakeContainer("no-resources").Obj(), + }, + }, + { + name: "container with resource requests only", + containers: []v1.Container{ + *utiltesting.MakeContainer("with-requests-only").Requests(map[v1.ResourceName]string{v1.ResourceCPU: "100m"}).Obj(), + }, + wantContainers: []v1.Container{ + *utiltesting.MakeContainer("with-requests-only").Requests(map[v1.ResourceName]string{v1.ResourceCPU: "100m"}).Obj(), + }, + }, + { + name: "container with resource limits only", + containers: []v1.Container{ + *utiltesting.MakeContainer("with-limits-only"). + Limit(map[v1.ResourceName]string{v1.ResourceCPU: "100m"}). + Obj(), + }, + wantContainers: []v1.Container{ + *utiltesting.MakeContainer("with-limits-only"). + Requests(map[v1.ResourceName]string{v1.ResourceCPU: "100m"}). + Limit(map[v1.ResourceName]string{v1.ResourceCPU: "100m"}). + Obj(), + }, + }, + { + name: "container with both resource requests and limits, values equal", + containers: []v1.Container{ + *utiltesting.MakeContainer("with-requests-and-limits"). + Requests(map[v1.ResourceName]string{v1.ResourceCPU: "100m"}). + Limit(map[v1.ResourceName]string{v1.ResourceMemory: "200Mi"}). + Obj(), + }, + wantContainers: []v1.Container{ + *utiltesting.MakeContainer("with-requests-and-limits"). + Requests(map[v1.ResourceName]string{v1.ResourceCPU: "100m", v1.ResourceMemory: "200Mi"}). + Limit(map[v1.ResourceName]string{v1.ResourceMemory: "200Mi"}). + Obj(), + }, + }, + { + name: "container with both resource requests and limits, values not equal", + containers: []v1.Container{ + *utiltesting.MakeContainer("with-requests-and-limits"). + Requests(map[v1.ResourceName]string{v1.ResourceCPU: "100m", v1.ResourceMemory: "100Mi"}). + Limit(map[v1.ResourceName]string{v1.ResourceMemory: "200Mi"}). + Obj(), + }, + wantContainers: []v1.Container{ + *utiltesting.MakeContainer("with-requests-and-limits"). + Requests(map[v1.ResourceName]string{v1.ResourceCPU: "100m", v1.ResourceMemory: "100Mi"}). + Limit(map[v1.ResourceName]string{v1.ResourceMemory: "200Mi"}). + Obj(), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + containers := SetContainersDefaults(tc.containers) + if !equality.Semantic.DeepEqual(tc.wantContainers, containers) { + t.Errorf("containers are not semantically equal, expected: %v, got: %v", tc.wantContainers, containers) + } + }) + } +} diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index bc31e81417..97b537a251 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -51,10 +51,13 @@ func MakeJob(name, ns string) *JobWrapper { RestartPolicy: "Never", Containers: []corev1.Container{ { - Name: "c", - Image: "pause", - Command: []string{}, - Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}}, + Name: "c", + Image: "pause", + Command: []string{}, + Resources: corev1.ResourceRequirements{ + Requests: corev1.ResourceList{}, + Limits: corev1.ResourceList{}, + }, }, }, NodeSelector: map[string]string{}, @@ -111,12 +114,18 @@ func (j *JobWrapper) NodeSelector(k, v string) *JobWrapper { return j } -// Request adds a resource request to the default container. +// Request adds a resource requests to the default container. func (j *JobWrapper) Request(r corev1.ResourceName, v string) *JobWrapper { j.Spec.Template.Spec.Containers[0].Resources.Requests[r] = resource.MustParse(v) return j } +// Limit adds a resource limits to the default container. +func (j *JobWrapper) Limit(r corev1.ResourceName, v string) *JobWrapper { + j.Spec.Template.Spec.Containers[0].Resources.Limits[r] = resource.MustParse(v) + return j +} + func (j *JobWrapper) Image(name string, image string, args []string) *JobWrapper { j.Spec.Template.Spec.Containers[0] = corev1.Container{ Name: name, @@ -499,3 +508,38 @@ func (rc *RuntimeClassWrapper) PodOverhead(resources corev1.ResourceList) *Runti func (rc *RuntimeClassWrapper) Obj() *nodev1.RuntimeClass { return &rc.RuntimeClass } + +// ContainerWrapper wraps a container. +type ContainerWrapper struct{ corev1.Container } + +// Obj returns the inner Container. +func (c *ContainerWrapper) Obj() *corev1.Container { + return &c.Container +} + +// Requests sets the container resources requests to the given resource map of requests. +func (c *ContainerWrapper) Requests(reqMap map[corev1.ResourceName]string) *ContainerWrapper { + res := corev1.ResourceList{} + for k, v := range reqMap { + res[k] = resource.MustParse(v) + } + c.Container.Resources.Requests = res + return c +} + +// Limit sets the container resource limits to the given resource map. +func (c *ContainerWrapper) Limit(limMap map[corev1.ResourceName]string) *ContainerWrapper { + res := corev1.ResourceList{} + for k, v := range limMap { + res[k] = resource.MustParse(v) + } + c.Container.Resources.Limits = res + return c +} + +// MakeContainer creates a wrapper for a Container. +func MakeContainer(name string) *ContainerWrapper { + return &ContainerWrapper{corev1.Container{ + Name: name, + }} +} diff --git a/test/integration/controller/job/job_controller_test.go b/test/integration/controller/job/job_controller_test.go index f298e69def..fdc8cbc562 100644 --- a/test/integration/controller/job/job_controller_test.go +++ b/test/integration/controller/job/job_controller_test.go @@ -657,6 +657,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", func() { ginkgo.BeforeEach(func() { fwk = &framework.Framework{ ManagerSetup: managerAndSchedulerSetup(), + WebhookPath: webhookPath, CRDPath: crdPath, } ctx, cfg, k8sClient = fwk.Setup() @@ -812,4 +813,35 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", func() { return createdProdJob.Spec.Suspend }, util.Timeout, util.Interval).Should(gomega.Equal(pointer.Bool(false))) }) + + ginkgo.It("Should not re-create workloads even job has no resources requests", func() { + ginkgo.By("creating a job") + job := testing.MakeJob(jobName, jobNamespace).Limit(corev1.ResourceCPU, "2").Queue("queue").Obj() + gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed()) + + lookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(job.Name), Namespace: jobNamespace} + workload := &kueue.Workload{} + gomega.Eventually(func() error { + return k8sClient.Get(ctx, lookupKey, workload) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + ginkgo.By("triggering job reconciling") + job.Annotations = map[string]string{"foo": "bar"} + gomega.Expect(k8sClient.Update(ctx, job)).Should(gomega.Succeed()) + + jobLookupKey := types.NamespacedName{Name: job.Name, Namespace: jobNamespace} + gomega.Eventually(func() string { + if err := k8sClient.Get(ctx, jobLookupKey, job); err != nil { + return "" + } + return job.Annotations["foo"] + }, util.Timeout, util.Interval).Should(gomega.Equal("bar")) + + ginkgo.By("checking the workload not recreated") + newWorkload := &kueue.Workload{} + gomega.Eventually(func() error { + return k8sClient.Get(ctx, lookupKey, newWorkload) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + gomega.Expect(workload.ObjectMeta.UID).Should(gomega.Equal(newWorkload.ObjectMeta.UID)) + }) }) diff --git a/test/integration/controller/job/suite_test.go b/test/integration/controller/job/suite_test.go index e3ac74b63f..8d9b3dc9ef 100644 --- a/test/integration/controller/job/suite_test.go +++ b/test/integration/controller/job/suite_test.go @@ -28,6 +28,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" config "sigs.k8s.io/kueue/apis/config/v1alpha2" + "sigs.k8s.io/kueue/apis/kueue/webhooks" "sigs.k8s.io/kueue/pkg/cache" "sigs.k8s.io/kueue/pkg/constants" "sigs.k8s.io/kueue/pkg/controller/core" @@ -76,6 +77,8 @@ func managerAndSchedulerSetup(opts ...job.Option) framework.ManagerSetup { return func(mgr manager.Manager, ctx context.Context) { err := indexer.Setup(ctx, mgr.GetFieldIndexer()) gomega.Expect(err).NotTo(gomega.HaveOccurred()) + failedWebhook, err := webhooks.Setup(mgr) + gomega.Expect(err).ToNot(gomega.HaveOccurred(), "webhook", failedWebhook) cCache := cache.New(mgr.GetClient()) queues := queue.NewManager(mgr.GetClient(), cCache)