Skip to content

Commit

Permalink
Merge pull request #644 from mimowo/fix-suspend
Browse files Browse the repository at this point in the history
Allow to update JobSets on suspend
  • Loading branch information
k8s-ci-robot authored Aug 9, 2024
2 parents c67e998 + c89e112 commit 8bade1e
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 5 deletions.
4 changes: 2 additions & 2 deletions pkg/webhooks/jobset_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,9 +314,9 @@ func (j *jobSetWebhook) ValidateUpdate(ctx context.Context, old, newObj runtime.
}
mungedSpec := js.Spec.DeepCopy()

// Allow pod template to be mutated for suspended JobSets.
// Allow pod template to be mutated for suspended JobSets, or JobSets getting suspended.
// This is needed for integration with Kueue/DWS.
if ptr.Deref(oldJS.Spec.Suspend, false) {
if ptr.Deref(oldJS.Spec.Suspend, false) || ptr.Deref(js.Spec.Suspend, false) {
for index := range js.Spec.ReplicatedJobs {
// Pod values which must be mutable for Kueue are defined here: https://github.com/kubernetes-sigs/kueue/blob/a50d395c36a2cb3965be5232162cf1fded1bdb08/apis/kueue/v1beta1/workload_types.go#L256-L260
mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Annotations = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Annotations
Expand Down
43 changes: 43 additions & 0 deletions pkg/webhooks/jobset_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1640,6 +1640,49 @@ func TestValidateUpdate(t *testing.T) {
},
},
},
{
name: "replicated job pod template can be updated for jobset getting suspended",
js: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
Suspend: ptr.To(true),
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "test-jobset-replicated-job-0",
Replicas: 2,
Template: batchv1.JobTemplateSpec{
// Adding an annotation.
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{"key": "value"},
},
},
},
},
},
},
},
},
oldJs: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
Suspend: ptr.To(false),
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "test-jobset-replicated-job-0",
Replicas: 2,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
},
},
},
},
},
},
},
{
name: "replicated job pod template cannot be updated for running jobset",
js: &jobset.JobSet{
Expand Down
73 changes: 70 additions & 3 deletions test/e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"

jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
"sigs.k8s.io/jobset/pkg/util/testing"
Expand Down Expand Up @@ -112,7 +113,7 @@ var _ = ginkgo.Describe("JobSet", func() {
// Create JobSet.
testFinalizer := "fake.example.com/blockDeletion"
ginkgo.By("creating jobset with ttl seconds after finished")
js := sleepTestJobSet(ns).Finalizers([]string{testFinalizer}).TTLSecondsAfterFinished(5).Obj()
js := sleepTestJobSet(ns, 20).Finalizers([]string{testFinalizer}).TTLSecondsAfterFinished(5).Obj()

// Verify jobset created successfully.
ginkgo.By("checking that jobset creation succeeds")
Expand All @@ -131,6 +132,72 @@ var _ = ginkgo.Describe("JobSet", func() {
})
})

// This test exercises the JobSet transitions the way Kueue would drive them:
// resume in ResourceFlavor1 -> suspend -> resume in ResourceFlavor2.
// In particular, Kueue updates the PodTemplate both when suspending and when
// resuming the JobSet, so each of those updates is expected to succeed here.
ginkgo.When("JobSet is suspended and resumed", func() {

	ginkgo.It("should allow to resume JobSet after updating PodTemplate", func() {
		ctx := context.Background()
		js := sleepTestJobSet(ns, 1).Obj()
		jsKey := types.NamespacedName{Name: js.Name, Namespace: js.Namespace}

		ginkgo.By("Create a suspended JobSet", func() {
			js.Spec.Suspend = ptr.To(true)
			gomega.Expect(k8sClient.Create(ctx, js)).Should(gomega.Succeed())
		})

		ginkgo.By("Unsuspend the JobSet setting nodeSelectors that prevent pods from being scheduled", func() {
			gomega.Eventually(func() error {
				// Re-fetch the JobSet on every attempt so the update is
				// applied on top of the latest stored object.
				gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed())
				js.Spec.Suspend = ptr.To(false)
				podTemplate := &js.Spec.ReplicatedJobs[0].Template.Spec.Template
				// Writing to a nil map panics, so allocate it on first use.
				if podTemplate.Spec.NodeSelector == nil {
					podTemplate.Spec.NodeSelector = make(map[string]string)
				}
				podTemplate.Spec.NodeSelector["kubernetes.io/os"] = "non-existing-os"
				return k8sClient.Update(ctx, js)
			}, timeout, interval).Should(gomega.Succeed())
		})

		ginkgo.By("Await for all Jobs to be active", func() {
			// In this step the Pods remain Pending due to the nodeSelector
			// which does not match any nodes. Still, JobSet considers as
			// active any Jobs which have at least one Pending or Running Pod.
			gomega.Eventually(func() int32 {
				gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed())
				// Check the length rather than nil: an empty but non-nil
				// slice would otherwise panic on the index below.
				if len(js.Status.ReplicatedJobsStatus) == 0 {
					return 0
				}
				return js.Status.ReplicatedJobsStatus[0].Active
			}, timeout, interval).Should(gomega.Equal(js.Spec.ReplicatedJobs[0].Replicas))
		})

		ginkgo.By("Suspend the JobSet updating the PodTemplate properties", func() {
			gomega.Eventually(func() error {
				gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed())
				js.Spec.Suspend = ptr.To(true)
				podTemplate := &js.Spec.ReplicatedJobs[0].Template.Spec.Template
				// The NodeSelector map was populated by the unsuspend step
				// above; here only its value is switched to a real OS.
				podTemplate.Spec.NodeSelector["kubernetes.io/os"] = "linux"
				return k8sClient.Update(ctx, js)
			}, timeout, interval).Should(gomega.Succeed())
		})

		ginkgo.By("Unsuspending the JobSet again with PodTemplate allowing completion", func() {
			gomega.Eventually(func() error {
				gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed())
				js.Spec.Suspend = ptr.To(false)
				return k8sClient.Update(ctx, js)
			}, timeout, interval).Should(gomega.Succeed())
		})

		ginkgo.By("Await for the JobSet to complete successfully", func() {
			util.JobSetCompleted(ctx, k8sClient, js, timeout)
		})
	})
})

}) // end of Describe

// getPingCommand returns ping command for 4 hostnames
Expand Down Expand Up @@ -225,7 +292,7 @@ func pingTestJobSetSubdomain(ns *corev1.Namespace) *testing.JobSetWrapper {
Obj())
}

func sleepTestJobSet(ns *corev1.Namespace) *testing.JobSetWrapper {
func sleepTestJobSet(ns *corev1.Namespace, durationSeconds int32) *testing.JobSetWrapper {
jsName := "js"
rjobName := "rjob"
replicas := 4
Expand All @@ -239,7 +306,7 @@ func sleepTestJobSet(ns *corev1.Namespace) *testing.JobSetWrapper {
Name: "sleep-test-container",
Image: "bash:latest",
Command: []string{"bash", "-c"},
Args: []string{"sleep 20"},
Args: []string{fmt.Sprintf("sleep %d", durationSeconds)},
},
},
}).Obj()).
Expand Down

0 comments on commit 8bade1e

Please sign in to comment.