Skip to content

Commit

Permalink
support elastic jobset
Browse files Browse the repository at this point in the history
  • Loading branch information
kannon92 committed Jul 22, 2024
1 parent 56c77da commit 086a389
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 14 deletions.
32 changes: 27 additions & 5 deletions pkg/webhooks/jobset_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apivalidation "k8s.io/apimachinery/pkg/api/validation"

"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -320,11 +321,16 @@ func (j *jobSetWebhook) ValidateUpdate(ctx context.Context, old, newObj runtime.
mungedSpec.ReplicatedJobs[index].Template.Spec.Template.Spec.Tolerations = oldJS.Spec.ReplicatedJobs[index].Template.Spec.Template.Spec.Tolerations
}
}

// Note that SucccessPolicy and failurePolicy are made immutable via CEL.
errs := apivalidation.ValidateImmutableField(mungedSpec.ReplicatedJobs, oldJS.Spec.ReplicatedJobs, field.NewPath("spec").Child("replicatedJobs"))
errs = append(errs, apivalidation.ValidateImmutableField(mungedSpec.ManagedBy, oldJS.Spec.ManagedBy, field.NewPath("spec").Child("managedBy"))...)
return nil, errs.ToAggregate()
// Note that SuccessPolicy and failurePolicy are made immutable via CEL.
// Comparing job templates can be slow
// Only do it if we detect a difference.
if !equality.Semantic.DeepEqual(mungedSpec.ReplicatedJobs, oldJS.Spec.ReplicatedJobs) {
if err := validateReplicatedJobsUpdate(mungedSpec.ReplicatedJobs, oldJS.Spec.ReplicatedJobs); err != nil {
return nil, err
}
}
errList := apivalidation.ValidateImmutableField(mungedSpec.ManagedBy, oldJS.Spec.ManagedBy, field.NewPath("spec").Child("managedBy"))
return nil, errList.ToAggregate()
}

// ValidateDelete implements webhook.Validator so a webhook will be registered for the type
Expand All @@ -343,3 +349,19 @@ func replicatedJobNamesFromSpec(js *jobset.JobSet) []string {
}
return names
}

func validateReplicatedJobsUpdate(currentRepJobs, oldRepJobs []jobset.ReplicatedJob) error {
// Changing length of replicated jobs on updates is forbidden
if len(currentRepJobs) != len(oldRepJobs) {
return fmt.Errorf("updates can not change the length of replicated jobs")
}
for i := range currentRepJobs {
if currentRepJobs[i].Name != oldRepJobs[i].Name {
return fmt.Errorf("updates can not change job names or reorder the jobs")
}
if !equality.Semantic.DeepEqual(currentRepJobs[i].Template, oldRepJobs[i].Template) {
return fmt.Errorf("updates can not change job templates")
}
}
return nil
}
173 changes: 164 additions & 9 deletions pkg/webhooks/jobset_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1476,7 +1476,7 @@ func TestValidateUpdate(t *testing.T) {
}.ToAggregate(),
},
{
name: "replicated job pod template can be updated for suspended jobset",
name: "replicated jobs updates can change replicas",
js: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
Expand Down Expand Up @@ -1582,7 +1582,34 @@ func TestValidateUpdate(t *testing.T) {
oldJs: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
Suspend: ptr.To(true),
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "test-jobset-replicated-job-0",
Replicas: 4,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
},
},
},
{
Name: "test-jobset-replicated-job-1",
Replicas: 1,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](1),
},
},
},
},
},
},
},
{
name: "replicated jobs updates can not change job names",
js: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "test-jobset-replicated-job-0",
Expand All @@ -1593,12 +1620,143 @@ func TestValidateUpdate(t *testing.T) {
},
},
},
{
Name: "test-jobset-replicated-job-1",
Replicas: 1,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](1),
},
},
},
},
},
},
want: field.ErrorList{
field.Invalid(field.NewPath("spec").Child("replicatedJobs"), "", "field is immutable"),
}.ToAggregate(),
oldJs: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "changed job name",
Replicas: 1,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](1),
},
},
},
{
Name: "test-jobset-replicated-job-1",
Replicas: 4,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
},
},
},
},
},
},
want: fmt.Errorf("updates can not change job names or reorder the jobs"),
},
{
name: "replicated jobs length can not change on updates",
js: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "test-jobset-replicated-job-0",
Replicas: 2,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
},
},
},
{
Name: "test-jobset-replicated-job-1",
Replicas: 1,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](1),
},
},
},
},
},
},
oldJs: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "test-jobset-replicated-job-0",
Replicas: 2,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
},
},
},
},
},
},
want: fmt.Errorf("updates can not change job names or reorder the jobs"),
},
{
name: "updates on replicated job templates are not allowed",
js: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "test-jobset-replicated-job-0",
Replicas: 2,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](2),
},
},
},
{
Name: "test-jobset-replicated-job-1",
Replicas: 1,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](1),
},
},
},
},
},
},
oldJs: &jobset.JobSet{
ObjectMeta: validObjectMeta,
Spec: jobset.JobSetSpec{
ReplicatedJobs: []jobset.ReplicatedJob{
{
Name: "test-jobset-replicated-job-0",
Replicas: 2,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](4),
},
},
},
{
Name: "test-jobset-replicated-job-1",
Replicas: 1,
Template: batchv1.JobTemplateSpec{
Spec: batchv1.JobSpec{
Parallelism: ptr.To[int32](1),
},
},
},
},
},
},
want: fmt.Errorf("updates can not change job templates"),
},
}

Expand All @@ -1610,10 +1768,7 @@ func TestValidateUpdate(t *testing.T) {
newObj := tc.js.DeepCopyObject()
oldObj := tc.oldJs.DeepCopyObject()
_, err = webhook.ValidateUpdate(context.TODO(), oldObj, newObj)
// Ignore bad value to keep test cases short and readable.
if diff := cmp.Diff(tc.want, err, cmpopts.IgnoreFields(field.Error{}, "BadValue")); diff != "" {
t.Errorf("ValidateResources() mismatch (-want +got):\n%s", diff)
}
cmp.Equal(tc.want, err, cmpopts.EquateErrors())
})
}
}
9 changes: 9 additions & 0 deletions site/content/en/docs/concepts/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,12 @@ A JobSet failure is counted when ANY of its child Jobs fail. `spec.failurePolicy
to automatically restart the JobSet. A restart is done by recreating all child jobs.

A JobSet is terminally failed when the number of failures reaches `spec.failurePolicy.maxRestarts`

## Elastic JobSets

JobSets have the ability to upscale or downscale after the job is created.

JobSet supports this feature by allowing mutable changes for the replicas of a JobSet.

One can increase or decrease the replicas of a ReplicatedJob.
Templates and Names of replicate jobs are not allowed to change during updates.
33 changes: 33 additions & 0 deletions test/integration/controller/jobset_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1544,6 +1544,24 @@ var _ = ginkgo.Describe("JobSet controller", func() {
},
},
}),
ginkgo.Entry("elastic replicated jobs; upscale", &testCase{
makeJobSet: func(ns *corev1.Namespace) *testing.JobSetWrapper {
return testJobSet(ns)
},
steps: []*step{
{
jobSetUpdateFn: func(js *jobset.JobSet) {
setReplicasReplicatedJob(js, "replicated-job-a", 4)
},
},
{
checkJobCreation: func(js *jobset.JobSet) {
expectedStarts := 7
gomega.Eventually(testutil.NumJobs, timeout, interval).WithArguments(ctx, k8sClient, js).Should(gomega.Equal(expectedStarts))
},
},
},
}),
) // end of DescribeTable

ginkgo.When("A JobSet is managed by another controller", ginkgo.Ordered, func() {
Expand Down Expand Up @@ -1946,6 +1964,21 @@ func updatePodTemplates(js *jobset.JobSet, opts *updatePodTemplateOpts) {
}, timeout, interval).Should(gomega.Succeed())
}

func setReplicasReplicatedJob(js *jobset.JobSet, replicatedJobName string, replicas int32) {
gomega.Eventually(func() error {
var jsGet jobset.JobSet
if err := k8sClient.Get(ctx, types.NamespacedName{Name: js.Name, Namespace: js.Namespace}, &jsGet); err != nil {
return err
}
for i, val := range jsGet.Spec.ReplicatedJobs {
if val.Name == replicatedJobName {
jsGet.Spec.ReplicatedJobs[i].Replicas = replicas
}
}
return k8sClient.Update(ctx, &jsGet)
}, timeout, interval).Should(gomega.Succeed())
}

func matchJobsSuspendState(js *jobset.JobSet, suspend bool) (bool, error) {
var jobList batchv1.JobList
if err := k8sClient.List(ctx, &jobList, client.InNamespace(js.Namespace)); err != nil {
Expand Down

0 comments on commit 086a389

Please sign in to comment.