Fix workload would be created and deleted indefinitely #597

Closed
23 changes: 3 additions & 20 deletions apis/kueue/webhooks/workload_webhook.go
@@ -19,7 +19,6 @@ package webhooks
import (
"context"

corev1 "k8s.io/api/core/v1"
apivalidation "k8s.io/apimachinery/pkg/api/validation"
metav1validation "k8s.io/apimachinery/pkg/apis/meta/v1/validation"
"k8s.io/apimachinery/pkg/runtime"
@@ -31,6 +30,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
utilapi "sigs.k8s.io/kueue/pkg/util/api"
)

type WorkloadWebhook struct{}
@@ -62,29 +62,12 @@ func (w *WorkloadWebhook) Default(ctx context.Context, obj runtime.Object) error
}
}
for i := range wl.Spec.PodSets {
podSet := &wl.Spec.PodSets[i]
setContainersDefaults(podSet.Template.Spec.InitContainers)
setContainersDefaults(podSet.Template.Spec.Containers)
wl.Spec.PodSets[i].Template.Spec.Containers = utilapi.SetContainersDefaults(wl.Spec.PodSets[i].Template.Spec.Containers)
wl.Spec.PodSets[i].Template.Spec.InitContainers = utilapi.SetContainersDefaults(wl.Spec.PodSets[i].Template.Spec.InitContainers)
}
return nil
}

func setContainersDefaults(containers []corev1.Container) {
for i := range containers {
c := &containers[i]
if c.Resources.Limits != nil {
if c.Resources.Requests == nil {
c.Resources.Requests = make(corev1.ResourceList)
}
for k, v := range c.Resources.Limits {
if _, exists := c.Resources.Requests[k]; !exists {
c.Resources.Requests[k] = v.DeepCopy()
}
}
}
}
}

// +kubebuilder:webhook:path=/validate-kueue-x-k8s-io-v1beta1-workload,mutating=false,failurePolicy=fail,sideEffects=None,groups=kueue.x-k8s.io,resources=workloads;workloads/status,verbs=create;update,versions=v1beta1,name=vworkload.kb.io,admissionReviewVersions=v1

var _ webhook.CustomValidator = &WorkloadWebhook{}
9 changes: 5 additions & 4 deletions pkg/controller/workload/job/job_controller.go
@@ -40,6 +40,7 @@ import (
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/controller/workload/jobframework"
utilapi "sigs.k8s.io/kueue/pkg/util/api"
"sigs.k8s.io/kueue/pkg/workload"
)

@@ -278,12 +279,12 @@ func (b *BatchJob) EquivalentToWorkload(wl kueue.Workload) bool {

// nodeSelector may change, hence we are not checking for
// equality of the whole job.Spec.Template.Spec.
if !equality.Semantic.DeepEqual(b.Spec.Template.Spec.InitContainers,
wl.Spec.PodSets[0].Template.Spec.InitContainers) {
initContainersDefaulted := utilapi.SetContainersDefaults(b.Spec.Template.Spec.InitContainers)
if !equality.Semantic.DeepEqual(initContainersDefaulted, wl.Spec.PodSets[0].Template.Spec.InitContainers) {
return false
}
return equality.Semantic.DeepEqual(b.Spec.Template.Spec.Containers,
wl.Spec.PodSets[0].Template.Spec.Containers)
containersDefaulted := utilapi.SetContainersDefaults(b.Spec.Template.Spec.Containers)
return equality.Semantic.DeepEqual(containersDefaulted, wl.Spec.PodSets[0].Template.Spec.Containers)
}

func (b *BatchJob) PriorityClass() string {
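To make the mechanism of the fix concrete, here is a small standalone sketch (not part of the diff; the resource values are invented, and it assumes the utilapi import path added by this PR) showing why defaulting the job's containers before the semantic comparison in EquivalentToWorkload prevents the spurious mismatch that drove the delete/recreate loop:

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/apimachinery/pkg/api/resource"

	utilapi "sigs.k8s.io/kueue/pkg/util/api"
)

func main() {
	// Containers as written on the Job: limits only, no requests.
	jobContainers := []corev1.Container{{
		Name: "c",
		Resources: corev1.ResourceRequirements{
			Limits: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("2")},
		},
	}}

	// Containers as stored on the Workload after the webhook defaulted them:
	// the CPU request has been copied from the limit.
	wlContainers := []corev1.Container{{
		Name: "c",
		Resources: corev1.ResourceRequirements{
			Limits:   corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("2")},
			Requests: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("2")},
		},
	}}

	// Before the fix: comparing the raw job containers against the defaulted
	// workload containers reports a mismatch, so the workload was deleted and
	// recreated on every reconcile.
	fmt.Println(equality.Semantic.DeepEqual(jobContainers, wlContainers)) // false

	// With the fix: default the job's containers first, then compare.
	fmt.Println(equality.Semantic.DeepEqual(
		utilapi.SetContainersDefaults(jobContainers), wlContainers)) // true
}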
1 change: 1 addition & 0 deletions pkg/controller/workload/jobframework/helpers.go
@@ -76,6 +76,7 @@ func EnsureOneWorkload(ctx context.Context, cli client.Client, req ctrl.Request,
if match == nil && job.EquivalentToWorkload(*w) {
match = w
} else {
log.V(2).Info("job is not equal to the workload; deleting the workload")
toDelete = append(toDelete, w)
}
}
36 changes: 36 additions & 0 deletions pkg/util/api/api.go
@@ -16,6 +16,10 @@ limitations under the License.

package api
Contributor: I'm not convinced about putting this function here. But we can revisit once we have the jobs library completed, which will probably use it.

Contributor Author: Deal. Didn't find where to place this.


import (
v1 "k8s.io/api/core/v1"
)

const (
maxEventMsgSize = 1024
maxConditionMsgSize = 32 * 1024
@@ -38,3 +42,35 @@ func truncateMessage(message string, limit int) string {
suffix := " ..."
return message[:limit-len(suffix)] + suffix
}

// SetContainersDefaults fills the containers with default values.
// It is meant to bridge the gap between the job and the workload:
// e.g. a job may have resource limits configured but no requests,
// while Kueue relies on the workload's resource requests for its computations,
// so this function fills in the missing requests from the limits.
//
// This is inspired by Kubernetes pod defaulting:
// https://github.com/kubernetes/kubernetes/blob/ab002db78835a94bd19ce4aaa46fb39b4d9c276f/pkg/apis/core/v1/defaults.go#L144
func SetContainersDefaults(containers []v1.Container) []v1.Container {
Contributor Author: If we support LimitRange later, we can also add it here (a rough sketch of that idea follows after this file's diff). If the LimitRange changes, the workload will become non-equal to the job.

// This only happens in tests, since containers should always exist.
// We return the containers slice itself rather than nil or an empty slice
// so that unit and integration tests do not fail.
if len(containers) == 0 {
return containers
}
res := make([]v1.Container, len(containers))
for i, c := range containers {
if c.Resources.Limits != nil {
if c.Resources.Requests == nil {
c.Resources.Requests = make(v1.ResourceList)
}
for k, v := range c.Resources.Limits {
if _, exists := c.Resources.Requests[k]; !exists {
c.Resources.Requests[k] = v.DeepCopy()
}
}
}
res[i] = c
}
return res
}
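Picking up the LimitRange note in the review thread above, the following is a purely hypothetical sketch (not part of this PR; the helper name, placement, and semantics are assumptions, and it only handles container-level DefaultRequest) of how LimitRange defaulting could slot in next to SetContainersDefaults so that job and workload keep comparing as equivalent:

package api

import (
	v1 "k8s.io/api/core/v1"
)

// ApplyLimitRangeDefaultRequests is a hypothetical helper sketching how
// DefaultRequest values from a container-level LimitRange item could be
// filled into containers that do not set their own requests, mirroring
// SetContainersDefaults. A real implementation would also need to honor
// Default (limits) and the namespace lookup of the LimitRange.
func ApplyLimitRangeDefaultRequests(containers []v1.Container, lr *v1.LimitRange) []v1.Container {
	if lr == nil || len(containers) == 0 {
		return containers
	}
	res := make([]v1.Container, len(containers))
	for i, c := range containers {
		for _, item := range lr.Spec.Limits {
			if item.Type != v1.LimitTypeContainer {
				continue
			}
			for k, v := range item.DefaultRequest {
				if c.Resources.Requests == nil {
					c.Resources.Requests = make(v1.ResourceList)
				}
				if _, exists := c.Resources.Requests[k]; !exists {
					c.Resources.Requests[k] = v.DeepCopy()
				}
			}
		}
		res[i] = c
	}
	return res
}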
106 changes: 106 additions & 0 deletions pkg/util/api/api_test.go
@@ -0,0 +1,106 @@
/*
Copyright 2023 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package api

import (
"testing"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"

utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
)

func TestSetContainersDefaults(t *testing.T) {
testCases := []struct {
name string
containers []v1.Container
wantContainers []v1.Container
}{
{
name: "container with no resources",
containers: []v1.Container{
*utiltesting.MakeContainer("no-resources").Obj(),
},
wantContainers: []v1.Container{
*utiltesting.MakeContainer("no-resources").Obj(),
},
},
{
name: "container with resource requests only",
containers: []v1.Container{
*utiltesting.MakeContainer("with-requests-only").Requests(map[v1.ResourceName]string{v1.ResourceCPU: "100m"}).Obj(),
},
wantContainers: []v1.Container{
*utiltesting.MakeContainer("with-requests-only").Requests(map[v1.ResourceName]string{v1.ResourceCPU: "100m"}).Obj(),
},
},
{
name: "container with resource limits only",
containers: []v1.Container{
*utiltesting.MakeContainer("with-limits-only").
Limit(map[v1.ResourceName]string{v1.ResourceCPU: "100m"}).
Obj(),
},
wantContainers: []v1.Container{
*utiltesting.MakeContainer("with-limits-only").
Requests(map[v1.ResourceName]string{v1.ResourceCPU: "100m"}).
Limit(map[v1.ResourceName]string{v1.ResourceCPU: "100m"}).
Obj(),
},
},
{
name: "container with both resource requests and limits, values equal",
containers: []v1.Container{
*utiltesting.MakeContainer("with-requests-and-limits").
Requests(map[v1.ResourceName]string{v1.ResourceCPU: "100m"}).
Limit(map[v1.ResourceName]string{v1.ResourceMemory: "200Mi"}).
Obj(),
},
wantContainers: []v1.Container{
*utiltesting.MakeContainer("with-requests-and-limits").
Requests(map[v1.ResourceName]string{v1.ResourceCPU: "100m", v1.ResourceMemory: "200Mi"}).
Limit(map[v1.ResourceName]string{v1.ResourceMemory: "200Mi"}).
Obj(),
},
},
{
name: "container with both resource requests and limits, values not equal",
containers: []v1.Container{
*utiltesting.MakeContainer("with-requests-and-limits").
Requests(map[v1.ResourceName]string{v1.ResourceCPU: "100m", v1.ResourceMemory: "100Mi"}).
Limit(map[v1.ResourceName]string{v1.ResourceMemory: "200Mi"}).
Obj(),
},
wantContainers: []v1.Container{
*utiltesting.MakeContainer("with-requests-and-limits").
Requests(map[v1.ResourceName]string{v1.ResourceCPU: "100m", v1.ResourceMemory: "100Mi"}).
Limit(map[v1.ResourceName]string{v1.ResourceMemory: "200Mi"}).
Obj(),
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
containers := SetContainersDefaults(tc.containers)
if !equality.Semantic.DeepEqual(tc.wantContainers, containers) {
t.Errorf("containers are not semantically equal, expected: %v, got: %v", tc.wantContainers, containers)
}
})
}
}
54 changes: 49 additions & 5 deletions pkg/util/testing/wrappers.go
@@ -51,10 +51,13 @@ func MakeJob(name, ns string) *JobWrapper {
RestartPolicy: "Never",
Containers: []corev1.Container{
{
Name: "c",
Image: "pause",
Command: []string{},
Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
Name: "c",
Image: "pause",
Command: []string{},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{},
Limits: corev1.ResourceList{},
},
},
},
NodeSelector: map[string]string{},
@@ -111,12 +114,18 @@ func (j *JobWrapper) NodeSelector(k, v string) *JobWrapper {
return j
}

// Request adds a resource request to the default container.
// Request adds resource requests to the default container.
func (j *JobWrapper) Request(r corev1.ResourceName, v string) *JobWrapper {
j.Spec.Template.Spec.Containers[0].Resources.Requests[r] = resource.MustParse(v)
return j
}

// Limit adds a resource limit to the default container.
func (j *JobWrapper) Limit(r corev1.ResourceName, v string) *JobWrapper {
j.Spec.Template.Spec.Containers[0].Resources.Limits[r] = resource.MustParse(v)
return j
}

func (j *JobWrapper) Image(name string, image string, args []string) *JobWrapper {
j.Spec.Template.Spec.Containers[0] = corev1.Container{
Name: name,
@@ -499,3 +508,38 @@ func (rc *RuntimeClassWrapper) PodOverhead(resources corev1.ResourceList) *RuntimeClassWrapper {
func (rc *RuntimeClassWrapper) Obj() *nodev1.RuntimeClass {
return &rc.RuntimeClass
}

// ContainerWrapper wraps a container.
type ContainerWrapper struct{ corev1.Container }

// Obj returns the inner Container.
func (c *ContainerWrapper) Obj() *corev1.Container {
return &c.Container
}

// Requests sets the container's resource requests from the given map.
func (c *ContainerWrapper) Requests(reqMap map[corev1.ResourceName]string) *ContainerWrapper {
res := corev1.ResourceList{}
for k, v := range reqMap {
res[k] = resource.MustParse(v)
}
c.Container.Resources.Requests = res
return c
}

// Limit sets the container resource limits to the given resource map.
func (c *ContainerWrapper) Limit(limMap map[corev1.ResourceName]string) *ContainerWrapper {
res := corev1.ResourceList{}
for k, v := range limMap {
res[k] = resource.MustParse(v)
}
c.Container.Resources.Limits = res
return c
}

// MakeContainer creates a wrapper for a Container.
func MakeContainer(name string) *ContainerWrapper {
return &ContainerWrapper{corev1.Container{
Name: name,
}}
}
32 changes: 32 additions & 0 deletions test/integration/controller/job/job_controller_test.go
@@ -657,6 +657,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", func() {
ginkgo.BeforeEach(func() {
fwk = &framework.Framework{
ManagerSetup: managerAndSchedulerSetup(),
WebhookPath: webhookPath,
CRDPath: crdPath,
}
ctx, cfg, k8sClient = fwk.Setup()
@@ -812,4 +813,35 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", func() {
return createdProdJob.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.Equal(pointer.Bool(false)))
})

ginkgo.It("Should not re-create workloads even job has no resources requests", func() {
ginkgo.By("creating a job")
job := testing.MakeJob(jobName, jobNamespace).Limit(corev1.ResourceCPU, "2").Queue("queue").Obj()
gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())

lookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(job.Name), Namespace: jobNamespace}
workload := &kueue.Workload{}
gomega.Eventually(func() error {
return k8sClient.Get(ctx, lookupKey, workload)
}, util.Timeout, util.Interval).Should(gomega.Succeed())

ginkgo.By("triggering job reconciling")
job.Annotations = map[string]string{"foo": "bar"}
gomega.Expect(k8sClient.Update(ctx, job)).Should(gomega.Succeed())

jobLookupKey := types.NamespacedName{Name: job.Name, Namespace: jobNamespace}
gomega.Eventually(func() string {
if err := k8sClient.Get(ctx, jobLookupKey, job); err != nil {
return ""
}
return job.Annotations["foo"]
}, util.Timeout, util.Interval).Should(gomega.Equal("bar"))

ginkgo.By("checking the workload not recreated")
newWorkload := &kueue.Workload{}
gomega.Eventually(func() error {
return k8sClient.Get(ctx, lookupKey, newWorkload)
}, util.Timeout, util.Interval).Should(gomega.Succeed())
gomega.Expect(workload.ObjectMeta.UID).Should(gomega.Equal(newWorkload.ObjectMeta.UID))
})
})
3 changes: 3 additions & 0 deletions test/integration/controller/job/suite_test.go
@@ -28,6 +28,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"

config "sigs.k8s.io/kueue/apis/config/v1alpha2"
"sigs.k8s.io/kueue/apis/kueue/webhooks"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/controller/core"
@@ -76,6 +77,8 @@ func managerAndSchedulerSetup(opts ...job.Option) framework.ManagerSetup {
return func(mgr manager.Manager, ctx context.Context) {
err := indexer.Setup(ctx, mgr.GetFieldIndexer())
gomega.Expect(err).NotTo(gomega.HaveOccurred())
failedWebhook, err := webhooks.Setup(mgr)
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "webhook", failedWebhook)

cCache := cache.New(mgr.GetClient())
queues := queue.NewManager(mgr.GetClient(), cCache)