Skip to content

Commit

Permalink
daniel comments
Browse files Browse the repository at this point in the history
  • Loading branch information
kannon92 committed Oct 4, 2024
1 parent 1b89010 commit bf458c6
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 25 deletions.
58 changes: 47 additions & 11 deletions pkg/controllers/elastic_jobset.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,72 @@ package controllers

import (
"fmt"
"slices"
"strconv"

batchv1 "k8s.io/api/batch/v1"

jobset "sigs.k8s.io/jobset/api/jobset/v1alpha2"
)

// jobsToDeleteDownScale gathers the excess jobs during a downscale
func indexFunc(a, b batchv1.Job) int {
jobIndexA, errA := strconv.Atoi(a.Labels[jobset.JobIndexKey])
jobIndexB, errB := strconv.Atoi(b.Labels[jobset.JobIndexKey])
if errA != nil {
return 0
}
if errB != nil {
return 0
}
if jobIndexA > jobIndexB {
return 1
} else if jobIndexA < jobIndexB {
return -1
} else {
return 0
}
}

// jobsToDeleteForDownScale gathers the excess jobs during a downscale
// and deletes the jobs
func jobsToDeleteDownScale(replicatedJobs []jobset.ReplicatedJob, replicatedJobStatus []jobset.ReplicatedJobStatus, jobItems []batchv1.Job) ([]*batchv1.Job, error) {
func jobsToDeleteForDownScale(replicatedJobs []jobset.ReplicatedJob, replicatedJobStatuses []jobset.ReplicatedJobStatus, jobItems []batchv1.Job) ([]*batchv1.Job, error) {
jobsToDelete := []*batchv1.Job{}
type payload struct {
batchJobs []batchv1.Job
rjStatus jobset.ReplicatedJobStatus
replicas int32
}
replicatedJobToBatchJobMap := map[string]payload{}
for _, replicatedJob := range replicatedJobs {
status := findReplicatedJobStatus(replicatedJobStatus, replicatedJob.Name)
countOfJobsToDelete := status.Ready - replicatedJob.Replicas
status := findReplicatedJobStatus(replicatedJobStatuses, replicatedJob.Name)
newPayload := &payload{}
newPayload.rjStatus = status
newPayload.replicas = replicatedJob.Replicas
for _, val := range jobItems {
if val.Labels[jobset.ReplicatedJobNameKey] != replicatedJob.Name {
continue
}
newPayload.batchJobs = append(newPayload.batchJobs, val)
}
slices.SortFunc(newPayload.batchJobs, indexFunc)
replicatedJobToBatchJobMap[replicatedJob.Name] = *newPayload
}
for _, jobAndStatus := range replicatedJobToBatchJobMap {
countOfJobsToDelete := jobAndStatus.rjStatus.Ready - jobAndStatus.replicas
if countOfJobsToDelete > 0 {
jobsWeDeleted := 0
for _, val := range jobItems {
if val.Labels[jobset.ReplicatedJobNameKey] != replicatedJob.Name {
continue
}
jobIndex, err := strconv.Atoi(val.Labels[jobset.JobIndexKey])
for i := len(jobAndStatus.batchJobs) - 1; i >= 0; i-- {

jobIndex, err := strconv.Atoi(jobAndStatus.batchJobs[i].Labels[jobset.JobIndexKey])
if err != nil {
return nil, fmt.Errorf("unable get integer from job index key")
}
if jobIndex >= int(countOfJobsToDelete) {
jobsWeDeleted = jobsWeDeleted + 1
jobsToDelete = append(jobsToDelete, &val)
jobsToDelete = append(jobsToDelete, &jobAndStatus.batchJobs[i])
}
if jobsWeDeleted == int(countOfJobsToDelete) {
continue
break
}
}
}
Expand Down
66 changes: 53 additions & 13 deletions pkg/controllers/elastic_jobset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ import (
func TestJobsToDeleteDownScale(t *testing.T) {

tests := []struct {
name string
replicatedJobs []jobset.ReplicatedJob
replicatedJobStatus []jobset.ReplicatedJobStatus
jobs []batchv1.Job
expectedJobsToDelete int32
gotError error
name string
replicatedJobs []jobset.ReplicatedJob
replicatedJobStatus []jobset.ReplicatedJobStatus
jobs []batchv1.Job
expectedJobsThatWereDeleted []batchv1.Job
gotError error
}{
{
name: "no elastic downscale",
Expand Down Expand Up @@ -118,7 +118,16 @@ func TestJobsToDeleteDownScale(t *testing.T) {
},
},
},
expectedJobsToDelete: 1,
expectedJobsThatWereDeleted: []batchv1.Job{
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test",
jobset.JobIndexKey: "1",
},
},
},
},
},
{
name: "elastic downscale is needed for second replicated job",
Expand Down Expand Up @@ -161,6 +170,22 @@ func TestJobsToDeleteDownScale(t *testing.T) {
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test-2",
jobset.JobIndexKey: "2",
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test-2",
jobset.JobIndexKey: "3",
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
Expand All @@ -177,35 +202,50 @@ func TestJobsToDeleteDownScale(t *testing.T) {
},
},
},
},
expectedJobsThatWereDeleted: []batchv1.Job{
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test-2",
jobset.JobIndexKey: "2",
jobset.JobIndexKey: "3",
},
},
},
{
ObjectMeta: v1.ObjectMeta{
Labels: map[string]string{
jobset.ReplicatedJobNameKey: "test-2",
jobset.JobIndexKey: "3",
jobset.JobIndexKey: "2",
},
},
},
},
expectedJobsToDelete: 2,
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
actual, err := jobsToDeleteDownScale(tc.replicatedJobs, tc.replicatedJobStatus, tc.jobs)
actual, err := jobsToDeleteForDownScale(tc.replicatedJobs, tc.replicatedJobStatus, tc.jobs)
if diff := cmp.Diff(tc.gotError, err); diff != "" {
t.Errorf("unexpected finished value (+got/-want): %s", diff)
}
if diff := cmp.Diff(tc.expectedJobsToDelete, int32(len(actual))); diff != "" {
t.Errorf("unexpected finished value (+got/-want): %s", diff)
if len(actual) != len(tc.expectedJobsThatWereDeleted) {
t.Errorf("unexpected length mismatch for deleted jobs: got: %d want: %d", len(actual), len(tc.expectedJobsThatWereDeleted))
}
if tc.expectedJobsThatWereDeleted != nil {
for i := range actual {
actualReplicatedJobName := actual[i].ObjectMeta.Labels[jobset.ReplicatedJobNameKey]
actualJobIndexKey := actual[i].ObjectMeta.Labels[jobset.JobIndexKey]
expectedReplicatedJobName := tc.expectedJobsThatWereDeleted[i].ObjectMeta.Labels[jobset.ReplicatedJobNameKey]
expectedJobIndexKey := tc.expectedJobsThatWereDeleted[i].ObjectMeta.Labels[jobset.JobIndexKey]
if diff := cmp.Diff(actualReplicatedJobName, expectedReplicatedJobName); diff != "" {
t.Errorf("unexpected replicated job name (+got/-want): %s", diff)
}
if diff := cmp.Diff(actualJobIndexKey, expectedJobIndexKey); diff != "" {
t.Errorf("unexpected job index (+got/-want): %s", diff)
}
}
}
})
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/jobset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,7 +536,7 @@ func (r *JobSetReconciler) downscaleElasticJobs(ctx context.Context, js *jobset.
return err
}

jobsToDelete, err := jobsToDeleteDownScale(js.Spec.ReplicatedJobs, replicatedJobStatus, childJobList.Items)
jobsToDelete, err := jobsToDeleteForDownScale(js.Spec.ReplicatedJobs, replicatedJobStatus, childJobList.Items)
if err != nil {
return err
}
Expand Down

0 comments on commit bf458c6

Please sign in to comment.