CA: extend SchedulerPluginRunner with RunReserveOnNode
RunReserveOnNode runs the Reserve phase of the scheduler framework,
which is necessary to obtain the ResourceClaim allocations computed
by the DRA scheduler plugin.

RunReserveOnNode isn't used anywhere yet, so this should be a no-op.
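To show how the pieces fit together, here is a minimal sketch (not part of this commit) of how a caller might combine the updated filter methods with the new Reserve phase. The SchedulerPluginRunner methods and their signatures are the ones from the diff below; the wrapper function scheduleWithReserve is hypothetical and assumes the same package and imports as the runner itself.

func scheduleWithReserve(runner *SchedulerPluginRunner, pod *apiv1.Pod, nodeName string) error {
	// RunFiltersOnNode now returns the Node and the CycleState produced by the
	// PreFilter/Filter phases, so the same state can be passed on to Reserve.
	node, state, schedErr := runner.RunFiltersOnNode(pod, nodeName)
	if schedErr != nil {
		return schedErr
	}
	// Reserve lets plugins such as DRA record their per-pod state (e.g.
	// ResourceClaim allocations) for the chosen node.
	return runner.RunReserveOnNode(pod, node.Name, state)
}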
towca committed Nov 25, 2024
1 parent 55bc94b commit ed63ac6
Showing 3 changed files with 39 additions and 19 deletions.
@@ -44,10 +44,10 @@ func NewSchedulerPluginRunner(fwHandle *framework.Handle, snapshot clustersnapsh
// function - until a Node where the filters pass is found. Filters are only run for matching Nodes. If no matching node with passing filters is found, an error is returned.
//
// The node iteration always starts from the next Node from the last Node that was found by this method. TODO: Extract the iteration strategy out of SchedulerPluginRunner.
func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (string, clustersnapshot.SchedulingError) {
func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (*apiv1.Node, *schedulerframework.CycleState, clustersnapshot.SchedulingError) {
nodeInfosList, err := p.snapshot.ListNodeInfos()
if err != nil {
return "", clustersnapshot.NewSchedulingInternalError(pod, "ClusterSnapshot not provided")
return nil, nil, clustersnapshot.NewSchedulingInternalError(pod, "ClusterSnapshot not provided")
}

p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshot)
@@ -56,7 +56,7 @@ func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeM
state := schedulerframework.NewCycleState()
preFilterResult, preFilterStatus, _ := p.fwHandle.Framework.RunPreFilterPlugins(context.TODO(), state, pod)
if !preFilterStatus.IsSuccess() {
return "", clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
return nil, nil, clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
}

for i := range nodeInfosList {
@@ -77,17 +77,17 @@ func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeM
filterStatus := p.fwHandle.Framework.RunFilterPlugins(context.TODO(), state, pod, nodeInfo.ToScheduler())
if filterStatus.IsSuccess() {
p.lastIndex = (p.lastIndex + i + 1) % len(nodeInfosList)
return nodeInfo.Node().Name, nil
return nodeInfo.Node(), state, nil
}
}
return "", clustersnapshot.NewNoNodesPassingPredicatesFoundError(pod)
return nil, nil, clustersnapshot.NewNoNodesPassingPredicatesFoundError(pod)
}

// RunFiltersOnNode runs the scheduler framework PreFilter and Filter phases to check if the given pod can be scheduled on the given node.
func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string) (*apiv1.Node, *schedulerframework.CycleState, clustersnapshot.SchedulingError) {
nodeInfo, err := p.snapshot.GetNodeInfo(nodeName)
if err != nil {
return clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error obtaining NodeInfo for name %q: %v", nodeName, err))
return nil, nil, clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error obtaining NodeInfo for name %q: %v", nodeName, err))
}

p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshot)
@@ -96,7 +96,7 @@ func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string
state := schedulerframework.NewCycleState()
_, preFilterStatus, _ := p.fwHandle.Framework.RunPreFilterPlugins(context.TODO(), state, pod)
if !preFilterStatus.IsSuccess() {
return clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
return nil, nil, clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
}

filterStatus := p.fwHandle.Framework.RunFilterPlugins(context.TODO(), state, pod, nodeInfo.ToScheduler())
@@ -108,9 +108,21 @@ func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string
if !filterStatus.IsRejected() {
unexpectedErrMsg = fmt.Sprintf("unexpected filter status %q", filterStatus.Code().String())
}
return clustersnapshot.NewFailingPredicateError(pod, filterName, filterReasons, unexpectedErrMsg, p.failingFilterDebugInfo(filterName, nodeInfo))
return nil, nil, clustersnapshot.NewFailingPredicateError(pod, filterName, filterReasons, unexpectedErrMsg, p.failingFilterDebugInfo(filterName, nodeInfo))
}

return nodeInfo.Node(), state, nil
}

// RunReserveOnNode runs the scheduler framework Reserve phase to update the scheduler plugins state to reflect the Pod being scheduled on the Node.
func (p *SchedulerPluginRunner) RunReserveOnNode(pod *apiv1.Pod, nodeName string, postFilterState *schedulerframework.CycleState) error {
p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshot)
defer p.fwHandle.DelegatingLister.ResetDelegate()

status := p.fwHandle.Framework.RunReservePluginsReserve(context.Background(), postFilterState, pod, nodeName)
if !status.IsSuccess() {
return fmt.Errorf("couldn't reserve node %s for pod %s/%s: %v", nodeName, pod.Namespace, pod.Name, status.Message())
}
return nil
}

@@ -144,8 +144,10 @@ func TestRunFiltersOnNode(t *testing.T) {
err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(tt.node, tt.scheduledPods...))
assert.NoError(t, err)

predicateError := pluginRunner.RunFiltersOnNode(tt.testPod, tt.node.Name)
node, state, predicateError := pluginRunner.RunFiltersOnNode(tt.testPod, tt.node.Name)
if tt.expectError {
assert.Nil(t, node)
assert.Nil(t, state)
assert.NotNil(t, predicateError)
assert.Equal(t, clustersnapshot.FailingPredicateError, predicateError.Type())
assert.Equal(t, "NodeResourcesFit", predicateError.FailingPredicateName())
@@ -154,6 +156,8 @@ func TestRunFiltersOnNode(t *testing.T) {
assert.Contains(t, predicateError.Error(), "Insufficient cpu")
} else {
assert.Nil(t, predicateError)
assert.NotNil(t, state)
assert.Equal(t, tt.node, node)
}
})
}
@@ -243,12 +247,15 @@ func TestRunFilterUntilPassingNode(t *testing.T) {
err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(n2000))
assert.NoError(t, err)

nodeName, err := pluginRunner.RunFiltersUntilPassingNode(tc.pod, func(info *framework.NodeInfo) bool { return true })
node, state, err := pluginRunner.RunFiltersUntilPassingNode(tc.pod, func(info *framework.NodeInfo) bool { return true })
if tc.expectError {
assert.Nil(t, node)
assert.Nil(t, state)
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Contains(t, tc.expectedNodes, nodeName)
assert.NotNil(t, state)
assert.Contains(t, tc.expectedNodes, node.Name)
}
})
}
@@ -278,7 +285,7 @@ func TestDebugInfo(t *testing.T) {
err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node1))
assert.NoError(t, err)

predicateErr := defaultPluginnRunner.RunFiltersOnNode(p1, "n1")
_, _, predicateErr := defaultPluginnRunner.RunFiltersOnNode(p1, "n1")
assert.NotNil(t, predicateErr)
assert.Contains(t, predicateErr.FailingPredicateReasons(), "node(s) had untolerated taint {SomeTaint: WhyNot?}")
assert.Contains(t, predicateErr.Error(), "node(s) had untolerated taint {SomeTaint: WhyNot?}")
@@ -308,7 +315,7 @@ func TestDebugInfo(t *testing.T) {
err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node1))
assert.NoError(t, err)

predicateErr = customPluginnRunner.RunFiltersOnNode(p1, "n1")
_, _, predicateErr = customPluginnRunner.RunFiltersOnNode(p1, "n1")
assert.Nil(t, predicateErr)
}

@@ -74,7 +74,7 @@ func (s *PredicateSnapshot) RemoveNodeInfo(nodeName string) error {

// SchedulePod adds pod to the snapshot and schedules it to given node.
func (s *PredicateSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
if schedErr := s.pluginRunner.RunFiltersOnNode(pod, nodeName); schedErr != nil {
if _, _, schedErr := s.pluginRunner.RunFiltersOnNode(pod, nodeName); schedErr != nil {
return schedErr
}
if err := s.ForceAddPod(pod, nodeName); err != nil {
@@ -85,14 +85,14 @@ func (s *PredicateSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) cluster

// SchedulePodOnAnyNodeMatching adds pod to the snapshot and schedules it to any node matching the provided function.
func (s *PredicateSnapshot) SchedulePodOnAnyNodeMatching(pod *apiv1.Pod, anyNodeMatching func(*framework.NodeInfo) bool) (string, clustersnapshot.SchedulingError) {
nodeName, schedErr := s.pluginRunner.RunFiltersUntilPassingNode(pod, anyNodeMatching)
node, _, schedErr := s.pluginRunner.RunFiltersUntilPassingNode(pod, anyNodeMatching)
if schedErr != nil {
return "", schedErr
}
if err := s.ForceAddPod(pod, nodeName); err != nil {
if err := s.ForceAddPod(pod, node.Name); err != nil {
return "", clustersnapshot.NewSchedulingInternalError(pod, err.Error())
}
return nodeName, nil
return node.Name, nil
}

// UnschedulePod removes the given Pod from the given Node inside the snapshot.
@@ -102,5 +102,6 @@ func (s *PredicateSnapshot) UnschedulePod(namespace string, podName string, node

// CheckPredicates checks whether scheduler predicates pass for the given pod on the given node.
func (s *PredicateSnapshot) CheckPredicates(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
return s.pluginRunner.RunFiltersOnNode(pod, nodeName)
_, _, err := s.pluginRunner.RunFiltersOnNode(pod, nodeName)
return err
}
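Since the commit message notes that RunReserveOnNode isn't wired up anywhere yet, the following is a purely hypothetical sketch of what a follow-up in PredicateSnapshot could look like: a variant of SchedulePod that threads the CycleState from the filter run into the Reserve phase, so that DRA-computed ResourceClaim allocations end up reflected in the scheduler plugins' state. The method name schedulePodWithReserve is invented; the calls it makes are only the ones shown in the diffs above.

func (s *PredicateSnapshot) schedulePodWithReserve(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
	// Run PreFilter/Filter and keep the CycleState they produced.
	_, state, schedErr := s.pluginRunner.RunFiltersOnNode(pod, nodeName)
	if schedErr != nil {
		return schedErr
	}
	// Run Reserve with the same state so plugins like DRA can record
	// ResourceClaim allocations for this pod/node pair.
	if err := s.pluginRunner.RunReserveOnNode(pod, nodeName, state); err != nil {
		return clustersnapshot.NewSchedulingInternalError(pod, err.Error())
	}
	// Finally add the pod to the snapshot, as SchedulePod already does.
	if err := s.ForceAddPod(pod, nodeName); err != nil {
		return clustersnapshot.NewSchedulingInternalError(pod, err.Error())
	}
	return nil
}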
