Fix resuming Jobset after restoring PodTemplate (by deleting Jobs) #625

Closed · wants to merge 3 commits
23 changes: 8 additions & 15 deletions pkg/controllers/jobset_controller.go
@@ -206,10 +206,11 @@ func (r *JobSetReconciler) reconcile(ctx context.Context, js *jobset.JobSet, upd
// Handle suspending a jobset or resuming a suspended jobset.
jobsetSuspended := jobSetSuspended(js)
if jobsetSuspended {
if err := r.suspendJobs(ctx, js, ownedJobs.active, updateStatusOpts); err != nil {
log.Error(err, "suspending jobset")
if err := r.deleteJobs(ctx, ownedJobs.active); err != nil {
log.Error(err, "deleting jobs")
return ctrl.Result{}, err
}
setJobSetSuspendedCondition(js, updateStatusOpts)
} else {
if err := r.resumeJobsIfNecessary(ctx, js, ownedJobs.active, rjobStatuses, updateStatusOpts); err != nil {
log.Error(err, "resuming jobset")
@@ -379,19 +380,6 @@ func (r *JobSetReconciler) calculateReplicatedJobStatuses(ctx context.Context, j
return rjStatus
}

func (r *JobSetReconciler) suspendJobs(ctx context.Context, js *jobset.JobSet, activeJobs []*batchv1.Job, updateStatusOpts *statusUpdateOpts) error {
for _, job := range activeJobs {
if !jobSuspended(job) {
job.Spec.Suspend = ptr.To(true)
if err := r.Update(ctx, job); err != nil {
return err
}
}
}
setJobSetSuspendedCondition(js, updateStatusOpts)
return nil
}

// resumeJobsIfNecessary iterates through each replicatedJob, resuming any suspended jobs if the JobSet
// is not suspended.
func (r *JobSetReconciler) resumeJobsIfNecessary(ctx context.Context, js *jobset.JobSet, activeJobs []*batchv1.Job, replicatedJobStatuses []jobset.ReplicatedJobStatus, updateStatusOpts *statusUpdateOpts) error {
@@ -494,6 +482,11 @@ func (r *JobSetReconciler) reconcileReplicatedJobs(ctx context.Context, js *jobs
return err
}

// Don't create child Jobs if the JobSet is suspended
if jobSetSuspended(js) {
continue
}

status := findReplicatedJobStatus(replicatedJobStatus, replicatedJob.Name)

// For startup policy, if the replicatedJob is started we can skip this loop.
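The deleteJobs helper called in the suspend branch above is not shown in this diff. A minimal sketch of what such a helper could look like, assuming controller-runtime's client and background deletion propagation (the method body, option choices, and error handling here are assumptions, not the PR's actual implementation):

```go
// Hypothetical sketch of deleteJobs; assumes imports:
//   batchv1 "k8s.io/api/batch/v1"
//   metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
//   apierrors "k8s.io/apimachinery/pkg/api/errors"
//   "sigs.k8s.io/controller-runtime/pkg/client"
func (r *JobSetReconciler) deleteJobs(ctx context.Context, jobsForDeletion []*batchv1.Job) error {
	for _, job := range jobsForDeletion {
		// Background propagation returns immediately; the garbage collector
		// removes the Job's pods asynchronously, mimicking how a suspended
		// Job deletes its pods.
		if err := r.Delete(ctx, job, client.PropagationPolicy(metav1.DeletePropagationBackground)); err != nil && !apierrors.IsNotFound(err) {
			return err
		}
	}
	return nil
}
```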
48 changes: 38 additions & 10 deletions pkg/webhooks/jobset_webhook.go
@@ -314,18 +314,20 @@ func (j *jobSetWebhook) ValidateUpdate(ctx context.Context, old, newObj runtime.
}
mungedSpec := js.Spec.DeepCopy()

// Allow pod template to be mutated for suspended JobSets.
// Allow pod template to be mutated for suspended JobSets, or JobSets getting suspended.
// This is needed for integration with Kueue/DWS.
if ptr.Deref(oldJS.Spec.Suspend, false) {
if ptr.Deref(oldJS.Spec.Suspend, false) || ptr.Deref(js.Spec.Suspend, false) {
Member:

Suggested change:
if ptr.Deref(oldJS.Spec.Suspend, false) || ptr.Deref(js.Spec.Suspend, false) {
if ptr.Deref(oldJS.Spec.Suspend, false) || (ptr.Deref(js.Spec.Suspend, false) && js.Spec.ReplicatedJobs[*].status.startTime == nil) {

Wouldn't it be helpful to validate that the jobs don't have startTime? If any jobs have startTime, this operation should fail due to batch/job validation. But I'm not sure whether we should add validations that depend deeply on the batch/job specifications.

@danielvegamyhre @mimowo WDYT?
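The [*] in the suggested change above is pseudocode. A rough Go expansion of the intent, assuming the check would gate on the replicated job statuses visible to the webhook (child Jobs cannot be listed there); this variant was ultimately not adopted:

```go
// Hypothetical expansion of the suggestion; the status fields used to decide
// whether any ReplicatedJob has started are assumptions.
suspending := ptr.Deref(js.Spec.Suspend, false)
noneStarted := true
for _, rjStatus := range js.Status.ReplicatedJobsStatus {
	if rjStatus.Active > 0 || rjStatus.Succeeded > 0 || rjStatus.Failed > 0 {
		noneStarted = false
		break
	}
}
if ptr.Deref(oldJS.Spec.Suspend, false) || (suspending && noneStarted) {
	// allow mutating the pod template
}
```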

mimowo (Contributor Author), Jul 25, 2024:

I think the flow is that we first suspend the parent JobSet, and then it suspends the Jobs. So, if we added this condition, the startTime on running child jobs would prevent the update.

However, this is complex, and I think I should have added an e2e test in JobSet to verify that suspending works.

Contributor:

Yeah, I think integration and e2e tests solidifying the requirements that JobSet needs for working with Kueue would be a great idea.

We obviously don't want to bring in Kueue as a dependency, so I think just verifying that the updates/patches work in integration/e2e will be welcome.

Member:

> I think the flow is that we first suspend the parent JobSet, and then it suspends the Jobs. So, if we added this condition, the startTime on running child jobs would prevent the update.

Uhm, that makes sense. But I guess users fall into a rabbit hole when they accidentally modify the scheduling directives of a ReplicatedJob with startTime, because the jobset-controller only outputs errors to the controller logs; this is not direct user feedback.

Maybe we can improve these implied specifications for JobSet users once we introduce kubernetes/kubernetes#113221.

So, I'm ok with the current implementation for now.

mimowo (Contributor Author), Jul 26, 2024:

Actually, the e2e test would not work (see the one I already added in this PR), because there is a deeper issue - JobSet does not delete the Jobs once suspended: #535. The fact that we keep the old Jobs means we never create new Jobs on resume if the PodTemplate was updated in the JobSet.

I'm trying to implement it by deleting the Jobs on suspend, and it looks promising. There are some integration tests for startupPolicy to adjust that I still need to look into.

mimowo (Contributor Author), Jul 26, 2024:

Let me know if you have more context on traps I may encounter with this approach (I will probably continue next week), but essentially this is the only way I see, and it would mimic the Job behavior, where a suspended Job deletes its pods.

for index := range js.Spec.ReplicatedJobs {
// Pod values which must be mutable for Kueue are defined here: https://github.com/kubernetes-sigs/kueue/blob/a50d395c36a2cb3965be5232162cf1fded1bdb08/apis/kueue/v1beta1/workload_types.go#L256-L260
mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Annotations = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Annotations
mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Labels = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Labels
mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Spec.NodeSelector = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Spec.NodeSelector
mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Spec.Tolerations = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Spec.Tolerations

// Pod Scheduling Gates can be updated for batch/v1 Job: https://github.com/kubernetes/kubernetes/blob/ceb58a4dbc671b9d0a2de6d73a1616bc0c299863/pkg/apis/batch/validation/validation.go#L662
mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Spec.SchedulingGates = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Spec.SchedulingGates
if allowToMutatePodTemplate(js, index) {
// Pod values which must be mutable for Kueue are defined here: https://github.com/kubernetes-sigs/kueue/blob/a50d395c36a2cb3965be5232162cf1fded1bdb08/apis/kueue/v1beta1/workload_types.go#L256-L260
mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Annotations = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Annotations
mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Labels = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Labels
mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Spec.NodeSelector = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Spec.NodeSelector
mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Spec.Tolerations = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Spec.Tolerations

// Pod Scheduling Gates can be updated for batch/v1 Job: https://github.com/kubernetes/kubernetes/blob/ceb58a4dbc671b9d0a2de6d73a1616bc0c299863/pkg/apis/batch/validation/validation.go#L662
mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Spec.SchedulingGates = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Spec.SchedulingGates
}
}
}

@@ -335,6 +337,32 @@ func (j *jobSetWebhook) ValidateUpdate(ctx context.Context, old, newObj runtime.
return nil, errs.ToAggregate()
}

// allowToMutatePodTemplate checks whether a PodTemplate in a specific
// ReplicatedJob can be mutated.
func allowToMutatePodTemplate(js *jobset.JobSet, index int) bool {
// We allow mutating the PodTemplate when the JobSet is already suspended
// or is getting suspended.
if ptr.Deref(js.Spec.Suspend, false) {
return true
}
// We allow mutating the PodTemplate if the ReplicatedJob status is not
// initialized yet, because in that case there are no Jobs from the previous
// run.
if len(js.Status.ReplicatedJobsStatus) < len(js.Spec.ReplicatedJobs) {
return true
}

// We don't allow mutating the PodTemplate when the JobSet is resuming and
// the Jobs from the previous run still exist. This gives the JobSet
// controller time to delete the Jobs from the previous run, which could
// otherwise conflict with Job creation on resume.
rStatus := js.Status.ReplicatedJobsStatus
if rStatus[index].Active > 0 || rStatus[index].Suspended > 0 {
Contributor:

Do you think we should include terminating?

mimowo (Contributor Author):

Indeed, currently we only count as "active" Jobs which have at least one active pod. This means that some Jobs which have only terminating pods may still exist.

One idea for a fix would be to count Jobs as active if they have at least one active or terminating pod. Another is to introduce a new counter for "terminating" Jobs - Jobs for which all pods are terminating. WDYT?

I guess this is a corner case for now, so maybe it could be done in a follow-up?

mimowo (Contributor Author):

I think it is not too bad in this PR, because a terminating Job (all pods terminating) is going to be deleted eventually. Yes, if we resume the JobSet while it exists, it can block creating the replacement Job for a while, but IIUC the JobSet controller will create the replacement as soon as the old Job is fully gone, and the replacement Job will have a good PodTemplate.

In order to prevent resuming a JobSet while there is a terminating Job, we could:

  1. include such a Job as active, if job.Status.Active > 0 || ptr.Deref(job.Status.Terminating, 0) > 0 here
  2. introduce a new counter for Terminating Jobs. Terminating would be a fallback counter (so job.Status.Active == 0) if ptr.Deref(job.Status.Terminating, 0) > 0

I'm leaning towards option (2). I could implement it either in this PR or in a follow-up.

Contributor:

I think a follow-up is fine. Terminating on Jobs is currently a beta feature, so the logic may be somewhat complex.

mimowo (Contributor Author):

> Terminating on Jobs is currently a beta feature so logic may be somewhat complex.

I think we don't need any code in JobSet depending on the feature gate in k8s. If the feature gate is disabled in k8s, the field is not set. So the code in JobSet would see nil; we can prevent panics with ptr.Deref(job.Status.Terminating, 0), and this should be enough, IIUC.

return false
}
return true
}
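A minimal sketch of option (2) from the thread above - a fallback Terminating counter in the replicated job status computation. The Terminating counter on the JobSet side is an assumption; this follow-up is not implemented in this PR:

```go
// Hypothetical follow-up sketch: a Job counts as Terminating only when it
// has no active pods but still has terminating ones. status.Terminating is
// an assumed new counter, not part of this PR.
for _, job := range jobs {
	switch {
	case job.Status.Active > 0:
		status.Active++
	case ptr.Deref(job.Status.Terminating, 0) > 0:
		// job.Status.Terminating is nil when the JobPodReplacementPolicy
		// feature gate is disabled; ptr.Deref guards against that.
		status.Terminating++
	}
}
```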

// ValidateDelete implements webhook.Validator so a webhook will be registered for the type
func (j *jobSetWebhook) ValidateDelete(ctx context.Context, obj runtime.Object) (admission.Warnings, error) {
return nil, nil
43 changes: 43 additions & 0 deletions pkg/webhooks/jobset_webhook_test.go
@@ -1640,6 +1640,49 @@ func TestValidateUpdate(t *testing.T) {
},
},
},
{
name: "replicated job pod template can be updated for jobset getting suspended",
js: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
Suspend: ptr.To(true),
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "test-jobset-replicated-job-0",
Replicas: 2,
Template: batchv1.JobTemplateSpec{
// Restoring the template by removing the annotation
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
Template: corev1.PodTemplateSpec{},
},
},
},
},
},
},
oldJs: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "test-jobset-replicated-job-0",
Replicas: 2,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{"key": "value"},
},
},
},
},
},
},
},
},
},
{
name: "replicated job pod template cannot be updated for running jobset",
js: &jobset.JobSet{
183 changes: 180 additions & 3 deletions test/e2e/e2e_test.go
@@ -18,12 +18,14 @@ package e2e
import (
"context"
"fmt"
"time"

"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/ptr"

jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
"sigs.k8s.io/jobset/pkg/util/testing"
@@ -131,6 +133,159 @@ var _ = ginkgo.Describe("JobSet", func() {
})
})

ginkgo.When("jobset is unsuspended and suspended", func() {
ginkgo.It("should not create Jobs while suspended, and delete Jobs on suspend", func() {
ctx := context.Background()
js := shortSleepTestJobSet(ns).Obj()
jsKey := types.NamespacedName{Name: js.Name, Namespace: js.Namespace}

ginkgo.By("Create a suspended JobSet", func() {
js.Spec.Suspend = ptr.To(true)
js.Spec.TTLSecondsAfterFinished = ptr.To[int32](5)
gomega.Expect(k8sClient.Create(ctx, js)).Should(gomega.Succeed())
})

ginkgo.By("Verify Jobs aren't created", func() {
gomega.Consistently(func() int32 {
gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed())
if js.Status.ReplicatedJobsStatus == nil {
return 0
}
return js.Status.ReplicatedJobsStatus[0].Active
}).WithTimeout(time.Second).WithPolling(200 * time.Millisecond).Should(gomega.Equal(int32(0)))
})

ginkgo.By("Unsuspend the JobSet setting schedulingGates that prevent pods from being scheduled", func() {
gomega.Eventually(func() error {
gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed())
js.Spec.Suspend = ptr.To(false)
podTemplate := &js.Spec.ReplicatedJobs[0].Template.Spec.Template
podTemplate.Spec.SchedulingGates = []corev1.PodSchedulingGate{
{
Name: "example.com/gate",
},
}
return k8sClient.Update(ctx, js)
}, timeout, interval).Should(gomega.Succeed())
})

ginkgo.By("Await for all Jobs to be created", func() {
gomega.Eventually(func() int32 {
gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed())
if js.Status.ReplicatedJobsStatus == nil {
return 0
}
return js.Status.ReplicatedJobsStatus[0].Active
}, timeout, interval).Should(gomega.Equal(js.Spec.ReplicatedJobs[0].Replicas))
})

ginkgo.By("Suspend the JobSet restoring the PodTemplate properties", func() {
gomega.Eventually(func() error {
gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed())
js.Spec.Suspend = ptr.To(true)
podTemplate := &js.Spec.ReplicatedJobs[0].Template.Spec.Template
delete(podTemplate.Spec.NodeSelector, "kubernetes.io/hostname")
delete(podTemplate.Labels, "custom-label-key")
delete(podTemplate.Annotations, "custom-annotation-key")
podTemplate.Spec.SchedulingGates = nil
return k8sClient.Update(ctx, js)
}, timeout, interval).Should(gomega.Succeed())
})

ginkgo.By("Await for all Jobs to be deleted", func() {
gomega.Eventually(func() int32 {
gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed())
return js.Status.ReplicatedJobsStatus[0].Active
}, timeout, interval).Should(gomega.Equal(int32(0)))
})

ginkgo.By("Unsuspending the JobSet again with PodTemplate allowing completion", func() {
gomega.Eventually(func() error {
gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed())
js.Spec.Suspend = ptr.To(false)
return k8sClient.Update(ctx, js)
}, timeout, interval).Should(gomega.Succeed())
})

ginkgo.By("Await for the JobSet to complete successfully", func() {
util.JobSetCompleted(ctx, k8sClient, js, timeout)
})
})

ginkgo.It("should allow to quickly update PodTemplate on unsuspend and restore the PodTemplate on suspend", func() {
ctx := context.Background()
js := shortSleepTestJobSet(ns).Obj()
jsKey := types.NamespacedName{Name: js.Name, Namespace: js.Namespace}

ginkgo.By("Create a suspended JobSet", func() {
js.Spec.Suspend = ptr.To(true)
js.Spec.TTLSecondsAfterFinished = ptr.To[int32](5)
gomega.Expect(k8sClient.Create(ctx, js)).Should(gomega.Succeed())
})

ginkgo.By("Unsuspend the JobSet setting nodeSelectors that prevent pods from being scheduled", func() {
gomega.Eventually(func() error {
gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed())
js.Spec.Suspend = ptr.To(false)
podTemplate := &js.Spec.ReplicatedJobs[0].Template.Spec.Template
if podTemplate.Spec.NodeSelector == nil {
podTemplate.Spec.NodeSelector = make(map[string]string)
}
podTemplate.Spec.NodeSelector["kubernetes.io/hostname"] = "non-existing-node"
if podTemplate.Labels == nil {
podTemplate.Labels = make(map[string]string)
}
podTemplate.Labels["custom-label-key"] = "custom-label-value"
if podTemplate.Annotations == nil {
podTemplate.Annotations = make(map[string]string)
}
podTemplate.Annotations["custom-annotation-key"] = "custom-annotation-value"
podTemplate.Spec.SchedulingGates = []corev1.PodSchedulingGate{
{
Name: "example.com/gate",
},
}
return k8sClient.Update(ctx, js)
}, timeout, interval).Should(gomega.Succeed())
})

ginkgo.By("Await for at least one active Job to make sure there are some running Pods", func() {
gomega.Eventually(func() int32 {
gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed())
if js.Status.ReplicatedJobsStatus == nil {
return 0
}
return js.Status.ReplicatedJobsStatus[0].Active
}, timeout, interval).Should(gomega.BeNumerically(">=", 1))
})

ginkgo.By("Suspend the JobSet restoring the PodTemplate properties", func() {
gomega.Eventually(func() error {
gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed())
js.Spec.Suspend = ptr.To(true)
podTemplate := &js.Spec.ReplicatedJobs[0].Template.Spec.Template
delete(podTemplate.Spec.NodeSelector, "kubernetes.io/hostname")
delete(podTemplate.Labels, "custom-label-key")
delete(podTemplate.Annotations, "custom-annotation-key")
podTemplate.Spec.SchedulingGates = nil
return k8sClient.Update(ctx, js)
}, timeout, interval).Should(gomega.Succeed())
})

ginkgo.By("Unsuspending the JobSet again with PodTemplate allowing completion", func() {
gomega.Eventually(func() error {
gomega.Expect(k8sClient.Get(ctx, jsKey, js)).Should(gomega.Succeed())
js.Spec.Suspend = ptr.To(false)
return k8sClient.Update(ctx, js)
}, timeout, interval).Should(gomega.Succeed())
})

ginkgo.By("Await for the JobSet to complete successfully", func() {
util.JobSetCompleted(ctx, k8sClient, js, timeout)
})
})
})

}) // end of Describe

// getPingCommand returns ping command for 4 hostnames
@@ -144,14 +299,14 @@ do
gotStatus="-1"
wantStatus="0"
while [ $gotStatus -ne $wantStatus ]
do
ping -c 1 $pod > /dev/null 2>&1
gotStatus=$?
if [ $gotStatus -ne $wantStatus ]; then
echo "Failed to ping pod $pod, retrying in 1 second..."
sleep 1
fi
done
echo "Successfully pinged pod: $pod"
done
sleep 30`, hostnames[0], hostnames[1], hostnames[2], hostnames[3])
@@ -246,3 +401,25 @@ func sleepTestJobSet(ns *corev1.Namespace) *testing.JobSetWrapper {
Replicas(int32(replicas)).
Obj())
}

func shortSleepTestJobSet(ns *corev1.Namespace) *testing.JobSetWrapper {
jsName := "js"
rjobName := "rjob"
replicas := 3
return testing.MakeJobSet(jsName, ns.Name).
ReplicatedJob(testing.MakeReplicatedJob(rjobName).
Job(testing.MakeJobTemplate("job", ns.Name).
PodSpec(corev1.PodSpec{
RestartPolicy: "Never",
Containers: []corev1.Container{
{
Name: "short-sleep-test-container",
Image: "bash:latest",
Command: []string{"bash", "-c"},
Args: []string{"sleep 1"},
},
},
}).Obj()).
Replicas(int32(replicas)).
Obj())
}