Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pods are preferentially scheduled to machines that meet the current session resources #2815

Merged
merged 1 commit into from
Aug 7, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 50 additions & 24 deletions pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,50 +195,76 @@ func (alloc *Action) Execute(ssn *framework.Session) {
break
}

var candidateNodes []*api.NodeInfo
// Candidate nodes are divided into two gradients:
// - the first gradient node: a list of free nodes that satisfy the task resource request;
// - The second gradient node: the node list whose sum of node idle resources and future idle meets the task resource request;
// Score the first gradient node first. If the first gradient node meets the requirements, ignore the second gradient node list,
// otherwise, score the second gradient node and select the appropriate node.
var candidateNodes [][]*api.NodeInfo
var idleCandidateNodes []*api.NodeInfo
var futureIdleCandidateNodes []*api.NodeInfo
for _, n := range predicateNodes {
if task.InitResreq.LessEqual(n.Idle, api.Zero) || task.InitResreq.LessEqual(n.FutureIdle(), api.Zero) {
candidateNodes = append(candidateNodes, n)
if task.InitResreq.LessEqual(n.Idle, api.Zero) {
idleCandidateNodes = append(idleCandidateNodes, n)
} else if task.InitResreq.LessEqual(n.FutureIdle(), api.Zero) {
futureIdleCandidateNodes = append(futureIdleCandidateNodes, n)
} else {
klog.V(5).Infof("Predicate filtered node %v, idle: %v and future idle: %v do not meet the requirements of task: %v",
n.Name, n.Idle, n.FutureIdle(), task.Name)
}
}
candidateNodes = append(candidateNodes, idleCandidateNodes)
candidateNodes = append(candidateNodes, futureIdleCandidateNodes)

var bestNode *api.NodeInfo
for index, nodes := range candidateNodes {
if klog.V(5).Enabled() {
for _, node := range nodes {
klog.V(5).Infof("node %v, idle: %v, future idle: %v", node.Name, node.Idle, node.FutureIdle())
}
}
switch {
case len(nodes) == 0:
klog.V(5).Infof("Task: %v, no matching node is found in the candidateNodes(index: %d) list.", task.Name, index)
case len(nodes) == 1: // If only one node after predicate, just use it.
bestNode = nodes[0]
case len(nodes) > 1: // If more than one node after predicate, using "the best" one
nodeScores := util.PrioritizeNodes(task, nodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)

bestNode = ssn.BestNodeFn(task, nodeScores)
if bestNode == nil {
bestNode = util.SelectBestNode(nodeScores)
}
}

var node *api.NodeInfo
switch {
case len(candidateNodes) == 0: // If not candidate nodes for this task, skip it.
continue
case len(candidateNodes) == 1: // If only one node after predicate, just use it.
node = candidateNodes[0]
case len(candidateNodes) > 1: // If more than one node after predicate, using "the best" one
nodeScores := util.PrioritizeNodes(task, candidateNodes, ssn.BatchNodeOrderFn, ssn.NodeOrderMapFn, ssn.NodeOrderReduceFn)

node = ssn.BestNodeFn(task, nodeScores)
if node == nil {
node = util.SelectBestNode(nodeScores)
// If a proper node is found in idleCandidateNodes, skip futureIdleCandidateNodes and directly return the node information.
if bestNode != nil {
break
}
}

// Allocate idle resource to the task.
if task.InitResreq.LessEqual(node.Idle, api.Zero) {
if task.InitResreq.LessEqual(bestNode.Idle, api.Zero) {
klog.V(3).Infof("Binding Task <%v/%v> to node <%v>",
task.Namespace, task.Name, node.Name)
if err := stmt.Allocate(task, node); err != nil {
task.Namespace, task.Name, bestNode.Name)
if err := stmt.Allocate(task, bestNode); err != nil {
klog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v",
task.UID, node.Name, ssn.UID, err)
task.UID, bestNode.Name, ssn.UID, err)
} else {
metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
}
} else {
klog.V(3).Infof("Predicates failed in allocate for task <%s/%s> on node <%s> with limited resources",
task.Namespace, task.Name, node.Name)
task.Namespace, task.Name, bestNode.Name)

// Allocate releasing resource to the task if any.
if task.InitResreq.LessEqual(node.FutureIdle(), api.Zero) {
if task.InitResreq.LessEqual(bestNode.FutureIdle(), api.Zero) {
klog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>",
task.Namespace, task.Name, node.Name, task.InitResreq, node.Releasing)
if err := stmt.Pipeline(task, node.Name); err != nil {
task.Namespace, task.Name, bestNode.Name, task.InitResreq, bestNode.Releasing)
if err := stmt.Pipeline(task, bestNode.Name); err != nil {
klog.Errorf("Failed to pipeline Task %v on %v in Session %v for %v.",
task.UID, node.Name, ssn.UID, err)
task.UID, bestNode.Name, ssn.UID, err)
} else {
metrics.UpdateE2eSchedulingDurationByJob(job.Name, string(job.Queue), job.Namespace, metrics.Duration(job.CreationTimestamp.Time))
metrics.UpdateE2eSchedulingLastTimeByJob(job.Name, string(job.Queue), job.Namespace, time.Now())
Expand Down