Fix workload being deleted and re-created indefinitely
In #317, we default workload resource requests to limits
when requests are not specified. However, this introduces a bug in job controller
reconciling: the job is judged as not equivalent to its workload, so the workload is deleted and re-created indefinitely.

Signed-off-by: Kante Yin <[email protected]>
kerthcet committed Mar 9, 2023
1 parent e01a5a8 commit 89940d5
Showing 8 changed files with 235 additions and 29 deletions.
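
For context before the file diffs, a minimal, self-contained sketch of the failure mode (the defaultRequests helper below is hypothetical and only stands in for the webhook's defaulting; it is not an actual Kueue call site): the webhook copies limits into the Workload's unset requests, while the Job's pod template keeps only limits, so a strict semantic comparison between the two never matches and the controller keeps deleting and re-creating the workload.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/equality"
	"k8s.io/apimachinery/pkg/api/resource"
)

// defaultRequests mirrors what the workload webhook does on admission:
// copy limits into requests when requests are unset. (Hypothetical helper.)
func defaultRequests(c corev1.Container) corev1.Container {
	if c.Resources.Limits != nil && c.Resources.Requests == nil {
		c.Resources.Requests = corev1.ResourceList{}
		for k, v := range c.Resources.Limits {
			c.Resources.Requests[k] = v.DeepCopy()
		}
	}
	return c
}

func main() {
	jobContainer := corev1.Container{
		Name: "c",
		Resources: corev1.ResourceRequirements{
			Limits: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("2")},
		},
	}
	// The webhook defaulted the Workload's copy, but the Job spec was untouched.
	workloadContainer := defaultRequests(jobContainer)

	// Prints false: the Workload carries requests the Job lacks, so without
	// this fix the controller deletes and re-creates the Workload forever.
	fmt.Println(equality.Semantic.DeepEqual(jobContainer, workloadContainer))
}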
23 changes: 3 additions & 20 deletions apis/kueue/webhooks/workload_webhook.go
@@ -19,7 +19,6 @@ package webhooks
import (
"context"

corev1 "k8s.io/api/core/v1"
apivalidation "k8s.io/apimachinery/pkg/api/validation"
metav1validation "k8s.io/apimachinery/pkg/apis/meta/v1/validation"
"k8s.io/apimachinery/pkg/runtime"
@@ -31,6 +30,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/webhook"

kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
utilapi "sigs.k8s.io/kueue/pkg/util/api"
)

type WorkloadWebhook struct{}
@@ -62,29 +62,12 @@ func (w *WorkloadWebhook) Default(ctx context.Context, obj runtime.Object) error
}
}
for i := range wl.Spec.PodSets {
podSet := &wl.Spec.PodSets[i]
setContainersDefaults(podSet.Template.Spec.InitContainers)
setContainersDefaults(podSet.Template.Spec.Containers)
wl.Spec.PodSets[i].Template.Spec.Containers = utilapi.SetContainersDefaults(wl.Spec.PodSets[i].Template.Spec.Containers)
wl.Spec.PodSets[i].Template.Spec.InitContainers = utilapi.SetContainersDefaults(wl.Spec.PodSets[i].Template.Spec.InitContainers)
}
return nil
}

func setContainersDefaults(containers []corev1.Container) {
for i := range containers {
c := &containers[i]
if c.Resources.Limits != nil {
if c.Resources.Requests == nil {
c.Resources.Requests = make(corev1.ResourceList)
}
for k, v := range c.Resources.Limits {
if _, exists := c.Resources.Requests[k]; !exists {
c.Resources.Requests[k] = v.DeepCopy()
}
}
}
}
}

// +kubebuilder:webhook:path=/validate-kueue-x-k8s-io-v1beta1-workload,mutating=false,failurePolicy=fail,sideEffects=None,groups=kueue.x-k8s.io,resources=workloads;workloads/status,verbs=create;update,versions=v1beta1,name=vworkload.kb.io,admissionReviewVersions=v1

var _ webhook.CustomValidator = &WorkloadWebhook{}
9 changes: 5 additions & 4 deletions pkg/controller/workload/job/job_controller.go
@@ -40,6 +40,7 @@ import (
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/controller/workload/jobframework"
utilapi "sigs.k8s.io/kueue/pkg/util/api"
"sigs.k8s.io/kueue/pkg/workload"
)

@@ -278,12 +279,12 @@ func (b *BatchJob) EquivalentToWorkload(wl kueue.Workload) bool {

// nodeSelector may change, hence we are not checking for
// equality of the whole job.Spec.Template.Spec.
if !equality.Semantic.DeepEqual(b.Spec.Template.Spec.InitContainers,
wl.Spec.PodSets[0].Template.Spec.InitContainers) {
initContainersDefaulted := utilapi.SetContainersDefaults(b.Spec.Template.Spec.InitContainers)
if !equality.Semantic.DeepEqual(initContainersDefaulted, wl.Spec.PodSets[0].Template.Spec.InitContainers) {
return false
}
return equality.Semantic.DeepEqual(b.Spec.Template.Spec.Containers,
wl.Spec.PodSets[0].Template.Spec.Containers)
containersDefaulted := utilapi.SetContainersDefaults(b.Spec.Template.Spec.Containers)
return equality.Semantic.DeepEqual(containersDefaulted, wl.Spec.PodSets[0].Template.Spec.Containers)
}

func (b *BatchJob) PriorityClass() string {
1 change: 1 addition & 0 deletions pkg/controller/workload/jobframework/helpers.go
@@ -76,6 +76,7 @@ func EnsureOneWorkload(ctx context.Context, cli client.Client, req ctrl.Request,
if match == nil && job.EquivalentToWorkload(*w) {
match = w
} else {
log.V(2).Info("job is not equivalent to the workload; deleting the workload")
toDelete = append(toDelete, w)
}
}
36 changes: 36 additions & 0 deletions pkg/util/api/api.go
@@ -16,6 +16,10 @@ limitations under the License.

package api

import (
v1 "k8s.io/api/core/v1"
)

const (
maxEventMsgSize = 1024
maxConditionMsgSize = 32 * 1024
@@ -38,3 +42,35 @@ func truncateMessage(message string, limit int) string {
suffix := " ..."
return message[:limit-len(suffix)] + suffix
}

// SetContainersDefaults fills the containers with default values.
// Note that this is aimed at bridging the gap between job and workload:
// e.g. a job may have resource limits configured but no requests,
// yet Kueue depends on the workload's requests for its computations,
// so we need a function like this to close that gap.
//
// This is inspired by Kubernetes pod defaulting:
// https://github.com/kubernetes/kubernetes/blob/ab002db78835a94bd19ce4aaa46fb39b4d9c276f/pkg/apis/core/v1/defaults.go#L144
func SetContainersDefaults(containers []v1.Container) []v1.Container {
// This only happens in tests, as containers should always exist.
// We return containers itself here, rather than nil or an empty slice,
// to avoid failing the unit and integration tests.
if len(containers) == 0 {
return containers
}
res := make([]v1.Container, len(containers))
for i, c := range containers {
if c.Resources.Limits != nil {
if c.Resources.Requests == nil {
c.Resources.Requests = make(v1.ResourceList)
}
for k, v := range c.Resources.Limits {
if _, exists := c.Resources.Requests[k]; !exists {
c.Resources.Requests[k] = v.DeepCopy()
}
}
}
res[i] = c
}
return res
}
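
A minimal usage sketch of the new helper (assuming the utilapi import path added above): a container that declares only limits comes back with requests defaulted to the same values.

package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"

	utilapi "sigs.k8s.io/kueue/pkg/util/api"
)

func main() {
	in := []corev1.Container{{
		Name: "c",
		Resources: corev1.ResourceRequirements{
			Limits: corev1.ResourceList{corev1.ResourceCPU: resource.MustParse("2")},
		},
	}}
	out := utilapi.SetContainersDefaults(in)
	// Requests were defaulted from limits; prints "2".
	fmt.Println(out[0].Resources.Requests.Cpu().String())
}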
106 changes: 106 additions & 0 deletions pkg/util/api/api_test.go
@@ -0,0 +1,106 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package api

import (
"testing"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"

utiltesting "sigs.k8s.io/kueue/pkg/util/testing"
)

func TestSetContainersDefaults(t *testing.T) {
testCases := []struct {
name string
containers []v1.Container
wantContainers []v1.Container
}{
{
name: "container with no resources",
containers: []v1.Container{
*utiltesting.MakeContainer("no-resources").Obj(),
},
wantContainers: []v1.Container{
*utiltesting.MakeContainer("no-resources").Obj(),
},
},
{
name: "container with resource requests only",
containers: []v1.Container{
*utiltesting.MakeContainer("with-requests-only").Requests(map[v1.ResourceName]string{v1.ResourceCPU: "100m"}).Obj(),
},
wantContainers: []v1.Container{
*utiltesting.MakeContainer("with-requests-only").Requests(map[v1.ResourceName]string{v1.ResourceCPU: "100m"}).Obj(),
},
},
{
name: "container with resource limits only",
containers: []v1.Container{
*utiltesting.MakeContainer("with-limits-only").
Limit(map[v1.ResourceName]string{v1.ResourceCPU: "100m"}).
Obj(),
},
wantContainers: []v1.Container{
*utiltesting.MakeContainer("with-limits-only").
Requests(map[v1.ResourceName]string{v1.ResourceCPU: "100m"}).
Limit(map[v1.ResourceName]string{v1.ResourceCPU: "100m"}).
Obj(),
},
},
{
name: "container with both resource requests and limits, values equal",
containers: []v1.Container{
*utiltesting.MakeContainer("with-requests-and-limits").
Requests(map[v1.ResourceName]string{v1.ResourceCPU: "100m"}).
Limit(map[v1.ResourceName]string{v1.ResourceMemory: "200Mi"}).
Obj(),
},
wantContainers: []v1.Container{
*utiltesting.MakeContainer("with-requests-and-limits").
Requests(map[v1.ResourceName]string{v1.ResourceCPU: "100m", v1.ResourceMemory: "200Mi"}).
Limit(map[v1.ResourceName]string{v1.ResourceMemory: "200Mi"}).
Obj(),
},
},
{
name: "container with both resource requests and limits, values not equal",
containers: []v1.Container{
*utiltesting.MakeContainer("with-requests-and-limits").
Requests(map[v1.ResourceName]string{v1.ResourceCPU: "100m", v1.ResourceMemory: "100Mi"}).
Limit(map[v1.ResourceName]string{v1.ResourceMemory: "200Mi"}).
Obj(),
},
wantContainers: []v1.Container{
*utiltesting.MakeContainer("with-requests-and-limits").
Requests(map[v1.ResourceName]string{v1.ResourceCPU: "100m", v1.ResourceMemory: "100Mi"}).
Limit(map[v1.ResourceName]string{v1.ResourceMemory: "200Mi"}).
Obj(),
},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
containers := SetContainersDefaults(tc.containers)
if !equality.Semantic.DeepEqual(tc.wantContainers, containers) {
t.Errorf("containers are not semantically equal, expected: %v, got: %v", tc.wantContainers, containers)
}
})
}
}
54 changes: 49 additions & 5 deletions pkg/util/testing/wrappers.go
@@ -51,10 +51,13 @@ func MakeJob(name, ns string) *JobWrapper {
RestartPolicy: "Never",
Containers: []corev1.Container{
{
Name: "c",
Image: "pause",
Command: []string{},
Resources: corev1.ResourceRequirements{Requests: corev1.ResourceList{}},
Name: "c",
Image: "pause",
Command: []string{},
Resources: corev1.ResourceRequirements{
Requests: corev1.ResourceList{},
Limits: corev1.ResourceList{},
},
},
},
NodeSelector: map[string]string{},
@@ -111,12 +114,18 @@ func (j *JobWrapper) NodeSelector(k, v string) *JobWrapper {
return j
}

// Request adds a resource request to the default container.
func (j *JobWrapper) Request(r corev1.ResourceName, v string) *JobWrapper {
j.Spec.Template.Spec.Containers[0].Resources.Requests[r] = resource.MustParse(v)
return j
}

// Limit adds a resource limit to the default container.
func (j *JobWrapper) Limit(r corev1.ResourceName, v string) *JobWrapper {
j.Spec.Template.Spec.Containers[0].Resources.Limits[r] = resource.MustParse(v)
return j
}

func (j *JobWrapper) Image(name string, image string, args []string) *JobWrapper {
j.Spec.Template.Spec.Containers[0] = corev1.Container{
Name: name,
Expand Down Expand Up @@ -499,3 +508,38 @@ func (rc *RuntimeClassWrapper) PodOverhead(resources corev1.ResourceList) *Runti
func (rc *RuntimeClassWrapper) Obj() *nodev1.RuntimeClass {
return &rc.RuntimeClass
}

// ContainerWrapper wraps a container.
type ContainerWrapper struct{ corev1.Container }

// Obj returns the inner Container.
func (c *ContainerWrapper) Obj() *corev1.Container {
return &c.Container
}

// Requests sets the container resources requests to the given resource map of requests.
func (c *ContainerWrapper) Requests(reqMap map[corev1.ResourceName]string) *ContainerWrapper {
res := corev1.ResourceList{}
for k, v := range reqMap {
res[k] = resource.MustParse(v)
}
c.Container.Resources.Requests = res
return c
}

// Limit sets the container resource limits to the given resource map.
func (c *ContainerWrapper) Limit(limMap map[corev1.ResourceName]string) *ContainerWrapper {
res := corev1.ResourceList{}
for k, v := range limMap {
res[k] = resource.MustParse(v)
}
c.Container.Resources.Limits = res
return c
}

// MakeContainer creates a wrapper for a Container.
func MakeContainer(name string) *ContainerWrapper {
return &ContainerWrapper{corev1.Container{
Name: name,
}}
}
32 changes: 32 additions & 0 deletions test/integration/controller/job/job_controller_test.go
@@ -657,6 +657,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", func() {
ginkgo.BeforeEach(func() {
fwk = &framework.Framework{
ManagerSetup: managerAndSchedulerSetup(),
WebhookPath: webhookPath,
CRDPath: crdPath,
}
ctx, cfg, k8sClient = fwk.Setup()
@@ -812,4 +813,35 @@
return createdProdJob.Spec.Suspend
}, util.Timeout, util.Interval).Should(gomega.Equal(pointer.Bool(false)))
})

ginkgo.It("Should not re-create workloads even job has no resources requests", func() {
ginkgo.By("creating a job")
job := testing.MakeJob(jobName, jobNamespace).Limit(corev1.ResourceCPU, "2").Queue("queue").Obj()
gomega.Expect(k8sClient.Create(ctx, job)).Should(gomega.Succeed())

lookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(job.Name), Namespace: jobNamespace}
workload := &kueue.Workload{}
gomega.Eventually(func() error {
return k8sClient.Get(ctx, lookupKey, workload)
}, util.Timeout, util.Interval).Should(gomega.Succeed())

ginkgo.By("triggering job reconciling")
job.Annotations = map[string]string{"foo": "bar"}
gomega.Expect(k8sClient.Update(ctx, job)).Should(gomega.Succeed())

jobLookupKey := types.NamespacedName{Name: job.Name, Namespace: jobNamespace}
gomega.Eventually(func() string {
if err := k8sClient.Get(ctx, jobLookupKey, job); err != nil {
return ""
}
return job.Annotations["foo"]
}, util.Timeout, util.Interval).Should(gomega.Equal("bar"))

ginkgo.By("checking the workload not recreated")
newWorkload := &kueue.Workload{}
gomega.Eventually(func() error {
return k8sClient.Get(ctx, lookupKey, newWorkload)
}, util.Timeout, util.Interval).Should(gomega.Succeed())
gomega.Expect(workload.ObjectMeta.UID).Should(gomega.Equal(newWorkload.ObjectMeta.UID))
})
})
3 changes: 3 additions & 0 deletions test/integration/controller/job/suite_test.go
@@ -28,6 +28,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"

config "sigs.k8s.io/kueue/apis/config/v1alpha2"
"sigs.k8s.io/kueue/apis/kueue/webhooks"
"sigs.k8s.io/kueue/pkg/cache"
"sigs.k8s.io/kueue/pkg/constants"
"sigs.k8s.io/kueue/pkg/controller/core"
@@ -76,6 +77,8 @@ func managerAndSchedulerSetup(opts ...job.Option) framework.ManagerSetup {
return func(mgr manager.Manager, ctx context.Context) {
err := indexer.Setup(ctx, mgr.GetFieldIndexer())
gomega.Expect(err).NotTo(gomega.HaveOccurred())
failedWebhook, err := webhooks.Setup(mgr)
gomega.Expect(err).ToNot(gomega.HaveOccurred(), "webhook", failedWebhook)

cCache := cache.New(mgr.GetClient())
queues := queue.NewManager(mgr.GetClient(), cCache)
