Skip to content

Commit

Permalink
Merge pull request #37 from william-wang/master
Browse files Browse the repository at this point in the history
improve scheduling performance on batch jobs
  • Loading branch information
volcano-sh-bot authored Jul 3, 2019
2 parents 0302d83 + b2baa8d commit 5749259
Show file tree
Hide file tree
Showing 3 changed files with 169 additions and 7 deletions.
15 changes: 11 additions & 4 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
glog.V(3).Infof("Try to allocate resource to %d tasks of Job <%v/%v>",
tasks.Len(), job.Namespace, job.Name)

stmt := ssn.Statement()

for !tasks.Empty() {
task := tasks.Pop().(*api.TaskInfo)

Expand Down Expand Up @@ -159,7 +161,7 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
if task.InitResreq.LessEqual(node.Idle) {
glog.V(3).Infof("Binding Task <%v/%v> to node <%v>",
task.Namespace, task.Name, node.Name)
if err := ssn.Allocate(task, node.Name); err != nil {
if err := stmt.Allocate(task, node.Name); err != nil {
glog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v",
task.UID, node.Name, ssn.UID, err)
}
Expand All @@ -174,9 +176,9 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
if task.InitResreq.LessEqual(node.Releasing) {
glog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>",
task.Namespace, task.Name, node.Name, task.InitResreq, node.Releasing)
if err := ssn.Pipeline(task, node.Name); err != nil {
glog.Errorf("Failed to pipeline Task %v on %v in Session %v",
task.UID, node.Name, ssn.UID)
if err := stmt.Pipeline(task, node.Name); err != nil {
glog.Errorf("Failed to pipeline Task %v on %v",
task.UID, node.Name)
}
}
}
Expand All @@ -187,6 +189,11 @@ func (alloc *allocateAction) Execute(ssn *framework.Session) {
}
}

if ssn.JobReady(job) {
stmt.Commit()
} else {
stmt.Discard()
}
// Added Queue back until no job in Queue.
queues.Push(queue)
}
Expand Down
121 changes: 118 additions & 3 deletions pkg/scheduler/framework/statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@ limitations under the License.
package framework

import (
"fmt"

"github.com/golang/glog"

"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/api"
"github.com/kubernetes-sigs/kube-batch/pkg/scheduler/metrics"
)

// Statement structure
Expand Down Expand Up @@ -111,7 +114,6 @@ func (s *Statement) unevict(reclaimee *api.TaskInfo, reason string) error {

// Pipeline the task for the node
func (s *Statement) Pipeline(task *api.TaskInfo, hostname string) error {
// Only update status in session
job, found := s.ssn.Jobs[task.Job]
if found {
if err := job.UpdateTaskStatus(task, api.Pipelined); err != nil {
Expand Down Expand Up @@ -157,7 +159,6 @@ func (s *Statement) pipeline(task *api.TaskInfo) {
}

func (s *Statement) unpipeline(task *api.TaskInfo) error {
// Only update status in session
job, found := s.ssn.Jobs[task.Job]
if found {
if err := job.UpdateTaskStatus(task, api.Pending); err != nil {
Expand Down Expand Up @@ -194,7 +195,117 @@ func (s *Statement) unpipeline(task *api.TaskInfo) error {
return nil
}

// Discard operation for evict and pipeline
// Allocate the task to node
func (s *Statement) Allocate(task *api.TaskInfo, hostname string) error {
if err := s.ssn.cache.AllocateVolumes(task, hostname); err != nil {
return err
}

// Only update status in session
job, found := s.ssn.Jobs[task.Job]
if found {
if err := job.UpdateTaskStatus(task, api.Allocated); err != nil {
glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
task.Namespace, task.Name, api.Allocated, s.ssn.UID, err)
return err
}
} else {
glog.Errorf("Failed to found Job <%s> in Session <%s> index when binding.",
task.Job, s.ssn.UID)
return fmt.Errorf("failed to find job %s", task.Job)
}

task.NodeName = hostname

if node, found := s.ssn.Nodes[hostname]; found {
if err := node.AddTask(task); err != nil {
glog.Errorf("Failed to add task <%v/%v> to node <%v> in Session <%v>: %v",
task.Namespace, task.Name, hostname, s.ssn.UID, err)
return err
}
glog.V(3).Infof("After allocated Task <%v/%v> to Node <%v>: idle <%v>, used <%v>, releasing <%v>",
task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing)
} else {
glog.Errorf("Failed to found Node <%s> in Session <%s> index when binding.",
hostname, s.ssn.UID)
return fmt.Errorf("failed to find node %s", hostname)
}

// Callbacks
for _, eh := range s.ssn.eventHandlers {
if eh.AllocateFunc != nil {
eh.AllocateFunc(&Event{
Task: task,
})
}
}

// Update status in session
glog.V(3).Info("Allocating operations ...")
s.operations = append(s.operations, operation{
name: "allocate",
args: []interface{}{task, hostname},
})

return nil
}

func (s *Statement) allocate(task *api.TaskInfo, hostname string) error {
if err := s.ssn.cache.BindVolumes(task); err != nil {
return err
}

if err := s.ssn.cache.Bind(task, task.NodeName); err != nil {
return err
}

// Update status in session
if job, found := s.ssn.Jobs[task.Job]; found {
if err := job.UpdateTaskStatus(task, api.Binding); err != nil {
glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
task.Namespace, task.Name, api.Binding, s.ssn.UID, err)
return err
}
} else {
glog.Errorf("Failed to found Job <%s> in Session <%s> index when binding.",
task.Job, s.ssn.UID)
return fmt.Errorf("failed to find job %s", task.Job)
}

metrics.UpdateTaskScheduleDuration(metrics.Duration(task.Pod.CreationTimestamp.Time))
return nil
}

// unallocate the pod for task
func (s *Statement) unallocate(task *api.TaskInfo, reason string) error {
// Update status in session
job, found := s.ssn.Jobs[task.Job]
if found {
if err := job.UpdateTaskStatus(task, api.Pending); err != nil {
glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
task.Namespace, task.Name, api.Pending, s.ssn.UID, err)
}
} else {
glog.Errorf("Failed to find Job <%s> in Session <%s> index when unallocating.",
task.Job, s.ssn.UID)
}

if node, found := s.ssn.Nodes[task.NodeName]; found {
glog.V(3).Infof("Remove Task <%v> on node <%v>", task.Name, task.NodeName)
node.RemoveTask(task)
}

for _, eh := range s.ssn.eventHandlers {
if eh.DeallocateFunc != nil {
eh.DeallocateFunc(&Event{
Task: task,
})
}
}
return nil
}

// Discard operation for evict, pipeline and allocate
func (s *Statement) Discard() {
glog.V(3).Info("Discarding operations ...")
for i := len(s.operations) - 1; i >= 0; i-- {
Expand All @@ -204,6 +315,8 @@ func (s *Statement) Discard() {
s.unevict(op.args[0].(*api.TaskInfo), op.args[1].(string))
case "pipeline":
s.unpipeline(op.args[0].(*api.TaskInfo))
case "allocate":
s.unallocate(op.args[0].(*api.TaskInfo), op.args[1].(string))
}
}
}
Expand All @@ -217,6 +330,8 @@ func (s *Statement) Commit() {
s.evict(op.args[0].(*api.TaskInfo), op.args[1].(string))
case "pipeline":
s.pipeline(op.args[0].(*api.TaskInfo))
case "allocate":
s.allocate(op.args[0].(*api.TaskInfo), op.args[1].(string))
}
}
}
40 changes: 40 additions & 0 deletions test/e2e/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,46 @@ var _ = Describe("Job E2E Test", func() {
checkError(context, err)
})

It("Gang scheduling: Unsatisfied Job release owned res", func() {
context := initTestContext()
defer cleanupTestContext(context)
rep := clusterSize(context, oneCPU)
rep2 := rep * 2

job1 := &jobSpec{
name: "gang-qj-1",
namespace: "test",
tasks: []taskSpec{
{
img: "busybox",
req: oneCPU,
min: rep2,
rep: rep2,
},
},
}

job2 := &jobSpec{
name: "gang-qj-2",
namespace: "test",
tasks: []taskSpec{
{
img: "busybox",
req: oneCPU,
min: rep,
rep: rep,
},
},
}
_, pg1 := createJob(context, job1)
err := waitPodGroupPending(context, pg1)
checkError(context, err)

_, pg2 := createJob(context, job2)
err = waitPodGroupReady(context, pg2)
checkError(context, err)
})

It("Preemption", func() {
context := initTestContext()
defer cleanupTestContext(context)
Expand Down

0 comments on commit 5749259

Please sign in to comment.