From 14d1a8d7b5e969e698c33332d4daa1e1cbd3515d Mon Sep 17 00:00:00 2001 From: wangyang Date: Sat, 26 Aug 2023 16:07:21 +0800 Subject: [PATCH] Volcano supports resource preemption such as cpu, memory, and gpu, but does not support preemption by strategies such as antiAffinity and topologyspread Signed-off-by: wangyang --- pkg/scheduler/actions/backfill/backfill.go | 6 +- pkg/scheduler/actions/preempt/preempt.go | 6 +- pkg/scheduler/actions/reclaim/reclaim.go | 6 +- pkg/scheduler/framework/util.go | 13 ++-- .../plugins/predicates/predicates.go | 64 ++++++------------- .../plugins/predicates/proportional.go | 7 +- .../plugins/predicates/proportional_test.go | 2 +- pkg/scheduler/util/predicate_helper.go | 6 ++ 8 files changed, 40 insertions(+), 70 deletions(-) diff --git a/pkg/scheduler/actions/backfill/backfill.go b/pkg/scheduler/actions/backfill/backfill.go index 6f6847778b..e137aa2340 100644 --- a/pkg/scheduler/actions/backfill/backfill.go +++ b/pkg/scheduler/actions/backfill/backfill.go @@ -78,17 +78,13 @@ func (backfill *Action) Execute(ssn *framework.Session) { var statusSets util.StatusSets statusSets, err := ssn.PredicateFn(task, node) if err != nil { - klog.V(3).Infof("predicates failed in backfill for task <%s/%s> on node <%s>: %v", - task.Namespace, task.Name, node.Name, err) fe.SetNodeError(node.Name, err) continue } if statusSets.ContainsUnschedulable() || statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() { - err := fmt.Errorf("predicates failed in backfill for task <%s/%s> on node <%s>, status is not success", - task.Namespace, task.Name, node.Name) - klog.V(3).Infof("%v", err) + err := fmt.Errorf("%s", statusSets.Message()) fe.SetNodeError(node.Name, err) continue } diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index f3be6aa64b..88f1ea845b 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -213,13 +213,11 @@ func preempt( var statusSets util.StatusSets statusSets, err := ssn.PredicateFn(task, node) if err != nil { - return nil, fmt.Errorf("preempt predicates failed for task <%s/%s> on node <%s>: %v", - task.Namespace, task.Name, node.Name, err) + return nil, api.NewFitError(task, node, err.Error()) } if statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() { - return nil, fmt.Errorf("predicates failed in preempt for task <%s/%s> on node <%s>, status is not success or unschedulable", - task.Namespace, task.Name, node.Name) + return nil, api.NewFitError(task, node, statusSets.Message()) } return nil, nil } diff --git a/pkg/scheduler/actions/reclaim/reclaim.go b/pkg/scheduler/actions/reclaim/reclaim.go index 8bced3d9ec..430ec99a98 100644 --- a/pkg/scheduler/actions/reclaim/reclaim.go +++ b/pkg/scheduler/actions/reclaim/reclaim.go @@ -126,15 +126,15 @@ func (ra *Action) Execute(ssn *framework.Session) { var statusSets util.StatusSets statusSets, err := ssn.PredicateFn(task, n) if err != nil { - klog.V(3).Infof("reclaim predicates failed for task <%s/%s> on node <%s>: %v", + klog.V(5).Infof("reclaim predicates failed for task <%s/%s> on node <%s>: %v", task.Namespace, task.Name, n.Name, err) continue } // Allows scheduling to nodes that are in Success or Unschedulable state after filtering by predicate. if statusSets.ContainsUnschedulableAndUnresolvable() || statusSets.ContainsErrorSkipOrWait() { - klog.V(3).Infof("predicates failed in reclaim for task <%s/%s> on node <%s>, status is not success or unschedulable.", - task.Namespace, task.Name, n.Name) + klog.V(5).Infof("predicates failed in reclaim for task <%s/%s> on node <%s>, reason is %s.", + task.Namespace, task.Name, n.Name, statusSets.Message()) continue } klog.V(3).Infof("Considering Task <%s/%s> on Node <%s>.", diff --git a/pkg/scheduler/framework/util.go b/pkg/scheduler/framework/util.go index e43ef65e34..e89a4a7d95 100644 --- a/pkg/scheduler/framework/util.go +++ b/pkg/scheduler/framework/util.go @@ -17,8 +17,6 @@ limitations under the License. package framework import ( - "fmt" - v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/labels" @@ -267,23 +265,22 @@ func (nl *NodeLister) List() ([]*v1.Node, error) { } // ConvertPredicateStatus return predicate status from k8sframework status -func ConvertPredicateStatus(status *k8sframework.Status) (*api.Status, error) { +func ConvertPredicateStatus(status *k8sframework.Status) *api.Status { internalStatus := &api.Status{} if status.Code() == k8sframework.Success { internalStatus.Code = api.Success - return internalStatus, nil + return internalStatus } else if status.Code() == k8sframework.Unschedulable { internalStatus.Code = api.Unschedulable internalStatus.Reason = status.Message() - return internalStatus, nil + return internalStatus } else if status.Code() == k8sframework.UnschedulableAndUnresolvable { internalStatus.Code = api.UnschedulableAndUnresolvable internalStatus.Reason = status.Message() - return internalStatus, nil + return internalStatus } else { internalStatus.Code = api.Error internalStatus.Reason = status.Message() - return internalStatus, fmt.Errorf("Convert predicate status error, k8s status code is %d, Reason is %s", - status.Code(), status.Message()) + return internalStatus } } diff --git a/pkg/scheduler/plugins/predicates/predicates.go b/pkg/scheduler/plugins/predicates/predicates.go index c5c89208ae..e1ae67b90b 100644 --- a/pkg/scheduler/plugins/predicates/predicates.go +++ b/pkg/scheduler/plugins/predicates/predicates.go @@ -411,45 +411,35 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { Reason: api.NodePodNumberExceeded, } predicateStatus = append(predicateStatus, podsNumStatus) - return predicateStatus, nil } predicateByStablefilter := func(pod *v1.Pod, nodeInfo *k8sframework.NodeInfo) ([]*api.Status, bool, error) { // CheckNodeUnschedulable predicateStatus := make([]*api.Status, 0) status := nodeUnscheduleFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) - nodeUnscheduleStatus, err := framework.ConvertPredicateStatus(status) - if err != nil { - return predicateStatus, false, fmt.Errorf("plugin %s predicates failed %s", nodeunschedulable.Name, status.Message()) - } + nodeUnscheduleStatus := framework.ConvertPredicateStatus(status) if nodeUnscheduleStatus.Code != api.Success { predicateStatus = append(predicateStatus, nodeUnscheduleStatus) - return predicateStatus, false, nil + return predicateStatus, false, fmt.Errorf("plugin %s predicates failed %s", nodeUnscheduleFilter.Name(), status.Message()) } // Check NodeAffinity if predicate.nodeAffinityEnable { status := nodeAffinityFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) - nodeAffinityStatus, err := framework.ConvertPredicateStatus(status) - if err != nil { - return predicateStatus, false, fmt.Errorf("plugin %s predicates failed %s", nodeaffinity.Name, status.Message()) - } + nodeAffinityStatus := framework.ConvertPredicateStatus(status) if nodeAffinityStatus.Code != api.Success { predicateStatus = append(predicateStatus, nodeAffinityStatus) - return predicateStatus, false, nil + return predicateStatus, false, fmt.Errorf("plugin %s predicates failed %s", nodeAffinityFilter.Name(), status.Message()) } } // PodToleratesNodeTaints: TaintToleration if predicate.taintTolerationEnable { status := tolerationFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) - tolerationStatus, err := framework.ConvertPredicateStatus(status) - if err != nil { - return predicateStatus, false, fmt.Errorf("plugin %s predicates failed %s", tainttoleration.Name, status.Message()) - } + tolerationStatus := framework.ConvertPredicateStatus(status) if tolerationStatus.Code != api.Success { predicateStatus = append(predicateStatus, tolerationStatus) - return predicateStatus, false, nil + return predicateStatus, false, fmt.Errorf("plugin %s predicates failed %s", tolerationFilter.Name(), status.Message()) } } @@ -482,65 +472,50 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { // Check NodePort if predicate.nodePortEnable { status := nodePortFilter.Filter(context.TODO(), state, nil, nodeInfo) - nodePortStatus, err := framework.ConvertPredicateStatus(status) - if err != nil { - return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", nodeports.Name, status.Message()) - } + nodePortStatus := framework.ConvertPredicateStatus(status) if nodePortStatus.Code != api.Success { predicateStatus = append(predicateStatus, nodePortStatus) - return predicateStatus, nil + return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", nodePortFilter.Name(), status.Message()) } } // Check PodAffinity if predicate.podAffinityEnable { status := podAffinityFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) - podAffinityStatus, err := framework.ConvertPredicateStatus(status) - if err != nil { - return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", interpodaffinity.Name, status.Message()) - } + podAffinityStatus := framework.ConvertPredicateStatus(status) if podAffinityStatus.Code != api.Success { predicateStatus = append(predicateStatus, podAffinityStatus) - return predicateStatus, nil + return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", podAffinityFilter.Name(), status.Message()) } } // Check NodeVolumeLimits if predicate.nodeVolumeLimitsEnable { status := nodeVolumeLimitsCSIFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) - nodeVolumeStatus, err := framework.ConvertPredicateStatus(status) - if err != nil { - return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", nodeVolumeLimitsCSIFilter.Name(), status.Message()) - } + nodeVolumeStatus := framework.ConvertPredicateStatus(status) if nodeVolumeStatus.Code != api.Success { predicateStatus = append(predicateStatus, nodeVolumeStatus) - return predicateStatus, nil + return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", nodeVolumeLimitsCSIFilter.Name(), status.Message()) } } // Check VolumeZone if predicate.volumeZoneEnable { status := volumeZoneFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) - volumeZoneStatus, err := framework.ConvertPredicateStatus(status) - if err != nil { - return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", volumeZoneFilter.Name(), status.Message()) - } + volumeZoneStatus := framework.ConvertPredicateStatus(status) if volumeZoneStatus.Code != api.Success { predicateStatus = append(predicateStatus, volumeZoneStatus) - return predicateStatus, nil + return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", volumeZoneFilter.Name(), status.Message()) } } // Check PodTopologySpread if predicate.podTopologySpreadEnable { status := podTopologySpreadFilter.Filter(context.TODO(), state, task.Pod, nodeInfo) - podTopologyStatus, err := framework.ConvertPredicateStatus(status) - if err != nil { - return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", podTopologySpreadFilter.Name(), status.Message()) - } + podTopologyStatus := framework.ConvertPredicateStatus(status) if podTopologyStatus.Code != api.Success { predicateStatus = append(predicateStatus, podTopologyStatus) - return predicateStatus, nil + return predicateStatus, fmt.Errorf("plugin %s predicates failed %s", podTopologySpreadFilter.Name(), status.Message()) } } @@ -556,7 +531,7 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { } if filterNodeStatus.Code != api.Success { predicateStatus = append(predicateStatus, filterNodeStatus) - return predicateStatus, nil + return predicateStatus, fmt.Errorf("plugin device filternode predicates failed %s", msg) } } else { klog.Warningf("Devices %s assertion conversion failed, skip", val) @@ -569,12 +544,9 @@ func (pp *predicatesPlugin) OnSessionOpen(ssn *framework.Session) { if predicate.proportionalEnable { // Check ProportionalPredicate proportionalStatus, err := checkNodeResourceIsProportional(task, node, predicate.proportional) - if err != nil { - return predicateStatus, err - } if proportionalStatus.Code != api.Success { predicateStatus = append(predicateStatus, proportionalStatus) - return predicateStatus, nil + return predicateStatus, err } klog.V(4).Infof("checkNodeResourceIsProportional predicates Task <%s/%s> on Node <%s>: fit %v", task.Namespace, task.Name, node.Name, fit) diff --git a/pkg/scheduler/plugins/predicates/proportional.go b/pkg/scheduler/plugins/predicates/proportional.go index 469631e220..65322274e5 100644 --- a/pkg/scheduler/plugins/predicates/proportional.go +++ b/pkg/scheduler/plugins/predicates/proportional.go @@ -26,10 +26,11 @@ import ( // checkNodeResourceIsProportional checks if a gpu:cpu:memory is Proportional func checkNodeResourceIsProportional(task *api.TaskInfo, node *api.NodeInfo, proportional map[v1.ResourceName]baseResource) (*api.Status, error) { - status := &api.Status{} + status := &api.Status{ + Code: api.Success, + } for resourceName := range proportional { if value, found := task.Resreq.ScalarResources[resourceName]; found && value > 0 { - status.Code = api.Success return status, nil } } @@ -40,7 +41,7 @@ func checkNodeResourceIsProportional(task *api.TaskInfo, node *api.NodeInfo, pro memoryReserved := value * resourceRate.Memory * 1000 * 1000 if node.Idle.MilliCPU-task.Resreq.MilliCPU < cpuReserved || node.Idle.Memory-task.Resreq.Memory < memoryReserved { - status.Code = api.Unschedulable + status.Code = api.UnschedulableAndUnresolvable status.Reason = fmt.Sprintf("proportional of resource %s check failed", resourceName) return status, fmt.Errorf("proportional of resource %s check failed", resourceName) } diff --git a/pkg/scheduler/plugins/predicates/proportional_test.go b/pkg/scheduler/plugins/predicates/proportional_test.go index a893b2ac78..7500d714ee 100644 --- a/pkg/scheduler/plugins/predicates/proportional_test.go +++ b/pkg/scheduler/plugins/predicates/proportional_test.go @@ -65,7 +65,7 @@ func Test_checkNodeResourceIsProportional(t *testing.T) { node: n1, proportional: proportional, }, - api.Unschedulable, + api.UnschedulableAndUnresolvable, true, }, { diff --git a/pkg/scheduler/util/predicate_helper.go b/pkg/scheduler/util/predicate_helper.go index a9b2cff65b..ff3a3e1995 100644 --- a/pkg/scheduler/util/predicate_helper.go +++ b/pkg/scheduler/util/predicate_helper.go @@ -154,6 +154,9 @@ func (s StatusSets) Message() string { } all := make([]string, 0, len(s)) for _, status := range s { + if status.Reason == "" { + continue + } all = append(all, status.Reason) } return strings.Join(all, ",") @@ -166,6 +169,9 @@ func (s StatusSets) Reasons() []string { } all := make([]string, 0, len(s)) for _, status := range s { + if status.Reason == "" { + continue + } all = append(all, status.Reason) } return all