Skip to content

Commit

Permalink
keep the origin logic of calculate podgorup minresource when jobMinAv…
Browse files Browse the repository at this point in the history
…ailable < totalTask's

Signed-off-by: lowang-bh <[email protected]>
  • Loading branch information
lowang-bh committed Aug 26, 2023
1 parent f4c1244 commit 9e7a2a2
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 3 deletions.
15 changes: 15 additions & 0 deletions pkg/controllers/job/job_controller_actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,7 @@ func (cc *jobcontroller) deleteJobPod(jobName string, pod *v1.Pod) error {
func (cc *jobcontroller) calcPGMinResources(job *batch.Job) *v1.ResourceList {
// sort task by priorityClasses
var tasksPriority TasksPriority
totalMinAvailable := int32(0)
for _, task := range job.Spec.Tasks {
tp := TaskPriority{0, task}
pc := task.Template.Spec.PriorityClassName
Expand All @@ -779,8 +780,22 @@ func (cc *jobcontroller) calcPGMinResources(job *batch.Job) *v1.ResourceList {
}
}
tasksPriority = append(tasksPriority, tp)
if task.MinAvailable != nil { // actually, it can not be nil, because nil value will be patched in webhook
totalMinAvailable += *task.MinAvailable
} else {
totalMinAvailable += task.Replicas
}
}

// see docs https://github.com/volcano-sh/volcano/pull/2945
// 1. job.MinAvailable < sum(task.MinAvailable), regard podgroup's min resource as sum of the first minAvailable,
// according to https://github.com/volcano-sh/volcano/blob/c91eb07f2c300e4d5c826ff11a63b91781b3ac11/pkg/scheduler/api/job_info.go#L738-L740
if job.Spec.MinAvailable < totalMinAvailable {
minReq := tasksPriority.CalcFirstCountResources(job.Spec.MinAvailable)
return &minReq
}

// 2. job.MinAvailable >= sum(task.MinAvailable)
minReq := tasksPriority.CalcPGMinResources(job.Spec.MinAvailable)

return &minReq
Expand Down
17 changes: 17 additions & 0 deletions pkg/controllers/job/job_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,23 @@ func isControlledBy(obj metav1.Object, gvk schema.GroupVersionKind) bool {
return false
}

// CalcFirstCountResources return the first count tasks resource, sorted by priority
func (p TasksPriority) CalcFirstCountResources(count int32) v1.ResourceList {
sort.Sort(p)
minReq := v1.ResourceList{}

for _, task := range p {
if count <= task.Replicas {
minReq = quotav1.Add(minReq, calTaskRequests(&v1.Pod{Spec: task.Template.Spec}, count))
break
} else {
minReq = quotav1.Add(minReq, calTaskRequests(&v1.Pod{Spec: task.Template.Spec}, task.Replicas))
count -= task.Replicas
}
}
return minReq
}

// CalcPGMinResources sums up all task's min available; if not enough, then fill up to jobMinAvailable via task's replicas
func (p TasksPriority) CalcPGMinResources(jobMinAvailable int32) v1.ResourceList {
sort.Sort(p)
Expand Down
100 changes: 97 additions & 3 deletions pkg/controllers/job/job_controller_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"volcano.sh/apis/pkg/apis/batch/v1alpha1"
batch "volcano.sh/apis/pkg/apis/batch/v1alpha1"
busv1alpha1 "volcano.sh/apis/pkg/apis/bus/v1alpha1"
"volcano.sh/volcano/pkg/controllers/apis"
)
Expand Down Expand Up @@ -685,8 +686,8 @@ func TestTasksPriority_Swap(t *testing.T) {
}
}

func TestTaskPriority_CalcPGMin(t *testing.T) {
worker := v1alpha1.TaskSpec{
var (
worker = v1alpha1.TaskSpec{
Name: "worker",
Replicas: 2,
Template: v1.PodTemplateSpec{
Expand All @@ -704,7 +705,7 @@ func TestTaskPriority_CalcPGMin(t *testing.T) {
},
},
}
master := v1alpha1.TaskSpec{
master = v1alpha1.TaskSpec{
Name: "master",
Replicas: 2,
Template: v1.PodTemplateSpec{
Expand All @@ -722,6 +723,9 @@ func TestTaskPriority_CalcPGMin(t *testing.T) {
},
},
}
)

func TestTaskPriority_CalcPGMin(t *testing.T) {

oneMinAvailable := int32(1)
zeroMinAvailable := int32(0)
Expand Down Expand Up @@ -976,3 +980,93 @@ func TestTaskPriority_CalcPGMin(t *testing.T) {
}
}
}

func TestCalcPGMinResources(t *testing.T) {
jc := newFakeController()
job := &batch.Job{
TypeMeta: metav1.TypeMeta{},
Spec: batch.JobSpec{
Tasks: []batch.TaskSpec{
master, worker,
},
},
}

oneMinAvailable := int32(1)
//zeroMinAvailable := int32(0)

tests := []struct {
TasksMinAvailable []*int32
JobMinMember int32
ExpectValue v1.ResourceList
}{
// jobMinAvailable < sum(taskMinAvailable)
{
JobMinMember: 2,
TasksMinAvailable: []*int32{&oneMinAvailable, nil},
ExpectValue: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI), "requests.cpu": *resource.NewMilliQuantity(100, resource.DecimalSI),
"pods": *resource.NewQuantity(2, resource.DecimalSI), "count/pods": *resource.NewQuantity(2, resource.DecimalSI),
},
},
{
JobMinMember: 2,
TasksMinAvailable: []*int32{nil, &oneMinAvailable},
ExpectValue: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(100, resource.DecimalSI), "requests.cpu": *resource.NewMilliQuantity(100, resource.DecimalSI),
"pods": *resource.NewQuantity(2, resource.DecimalSI), "count/pods": *resource.NewQuantity(2, resource.DecimalSI),
},
},
{
JobMinMember: 3,
TasksMinAvailable: []*int32{nil, nil},
ExpectValue: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI), "requests.cpu": *resource.NewMilliQuantity(200, resource.DecimalSI),
"pods": *resource.NewQuantity(3, resource.DecimalSI), "count/pods": *resource.NewQuantity(3, resource.DecimalSI),
},
},
{ // jobMinAvailable > sum(taskMinAvailable)
JobMinMember: 3,
TasksMinAvailable: []*int32{&oneMinAvailable, &oneMinAvailable},
ExpectValue: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(200, resource.DecimalSI), "requests.cpu": *resource.NewMilliQuantity(200, resource.DecimalSI),
"pods": *resource.NewQuantity(3, resource.DecimalSI), "count/pods": *resource.NewQuantity(3, resource.DecimalSI),
},
},
// jobMinAvailable = sum(taskMinAvailable)
{
JobMinMember: 3,
TasksMinAvailable: []*int32{&oneMinAvailable, nil},
ExpectValue: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(250, resource.DecimalSI), "requests.cpu": *resource.NewMilliQuantity(250, resource.DecimalSI),
"pods": *resource.NewQuantity(3, resource.DecimalSI), "count/pods": *resource.NewQuantity(3, resource.DecimalSI),
},
},
{
JobMinMember: 2,
TasksMinAvailable: []*int32{&oneMinAvailable, &oneMinAvailable},
ExpectValue: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(150, resource.DecimalSI), "requests.cpu": *resource.NewMilliQuantity(150, resource.DecimalSI),
"pods": *resource.NewQuantity(2, resource.DecimalSI), "count/pods": *resource.NewQuantity(2, resource.DecimalSI),
},
},
}
for i, tt := range tests {
job.Spec.MinAvailable = tt.JobMinMember
for i := range tt.TasksMinAvailable {
job.Spec.Tasks[i].MinAvailable = tt.TasksMinAvailable[i]
}
// simulating patch in webhook
for i, task := range job.Spec.Tasks {
if task.MinAvailable == nil {
min := &task.Replicas
job.Spec.Tasks[i].MinAvailable = min
}
}
gotMin := jc.calcPGMinResources(job)
if !reflect.DeepEqual(gotMin, &tt.ExpectValue) {
t.Fatalf("case %d: expected %v got %v", i, tt.ExpectValue, gotMin)
}

}
}

0 comments on commit 9e7a2a2

Please sign in to comment.