diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go
index 5a0953009ad0..e38e63bcbb21 100644
--- a/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go
+++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner.go
@@ -44,10 +44,10 @@ func NewSchedulerPluginRunner(fwHandle *framework.Handle, snapshot clustersnapsh
 // function - until a Node where the Filters pass is found. Filters are only run for matching Nodes. If no matching Node with passing Filters is found, an error is returned.
 //
 // The node iteration always starts from the next Node from the last Node that was found by this method. TODO: Extract the iteration strategy out of SchedulerPluginRunner.
-func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (string, clustersnapshot.SchedulingError) {
+func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeMatches func(*framework.NodeInfo) bool) (*apiv1.Node, *schedulerframework.CycleState, clustersnapshot.SchedulingError) {
 	nodeInfosList, err := p.snapshot.ListNodeInfos()
 	if err != nil {
-		return "", clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error listing NodeInfos: %v", err))
+		return nil, nil, clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error listing NodeInfos: %v", err))
 	}
 
 	p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshot)
@@ -61,7 +61,7 @@ func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeM
 	if !preFilterStatus.IsSuccess() {
 		// If any of the plugin PreFilter methods isn't successful, the corresponding Filter method can't be run, so the whole scheduling cycle is aborted.
 		// Match that behavior here.
-		return "", clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
+		return nil, nil, clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
 	}
 
 	for i := range nodeInfosList {
@@ -92,18 +92,18 @@ func (p *SchedulerPluginRunner) RunFiltersUntilPassingNode(pod *apiv1.Pod, nodeM
 		if filterStatus.IsSuccess() {
 			// Filter passed for all plugins, so this pod can be scheduled on this Node.
 			p.lastIndex = (p.lastIndex + i + 1) % len(nodeInfosList)
-			return nodeInfo.Node().Name, nil
+			return nodeInfo.Node(), state, nil
 		}
 		// Filter didn't pass for some plugin, so this Node won't work - move on to the next one.
 	}
-	return "", clustersnapshot.NewNoNodesPassingPredicatesFoundError(pod)
+	return nil, nil, clustersnapshot.NewNoNodesPassingPredicatesFoundError(pod)
 }
 
 // RunFiltersOnNode runs the scheduler framework PreFilter and Filter phases to check if the given pod can be scheduled on the given node.
-func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
+func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string) (*apiv1.Node, *schedulerframework.CycleState, clustersnapshot.SchedulingError) {
 	nodeInfo, err := p.snapshot.GetNodeInfo(nodeName)
 	if err != nil {
-		return clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error obtaining NodeInfo for name %q: %v", nodeName, err))
+		return nil, nil, clustersnapshot.NewSchedulingInternalError(pod, fmt.Sprintf("error obtaining NodeInfo for name %q: %v", nodeName, err))
 	}
 
 	p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshot)
@@ -113,10 +113,10 @@ func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string
 	// Run the PreFilter phase of the framework for the Pod and check the results. See the corresponding comments in RunFiltersUntilPassingNode() for more info.
 	preFilterResult, preFilterStatus, _ := p.fwHandle.Framework.RunPreFilterPlugins(context.TODO(), state, pod)
 	if !preFilterStatus.IsSuccess() {
-		return clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
+		return nil, nil, clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter failed", "")
 	}
 	if !preFilterResult.AllNodes() && !preFilterResult.NodeNames.Has(nodeInfo.Node().Name) {
-		return clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter filtered the Node out", "")
+		return nil, nil, clustersnapshot.NewFailingPredicateError(pod, preFilterStatus.Plugin(), preFilterStatus.Reasons(), "PreFilter filtered the Node out", "")
 	}
 
 	// Run the Filter phase of the framework for the Pod and the Node and check the results. See the corresponding comments in RunFiltersUntilPassingNode() for more info.
@@ -128,10 +128,22 @@ func (p *SchedulerPluginRunner) RunFiltersOnNode(pod *apiv1.Pod, nodeName string
 		if !filterStatus.IsRejected() {
 			unexpectedErrMsg = fmt.Sprintf("unexpected filter status %q", filterStatus.Code().String())
 		}
-		return clustersnapshot.NewFailingPredicateError(pod, filterName, filterReasons, unexpectedErrMsg, p.failingFilterDebugInfo(filterName, nodeInfo))
+		return nil, nil, clustersnapshot.NewFailingPredicateError(pod, filterName, filterReasons, unexpectedErrMsg, p.failingFilterDebugInfo(filterName, nodeInfo))
 	}
 
 	// PreFilter and Filter phases checked, this Pod can be scheduled on this Node.
+	return nodeInfo.Node(), state, nil
+}
+
+// RunReserveOnNode runs the scheduler framework Reserve phase to update the scheduler plugins state to reflect the Pod being scheduled on the Node.
+func (p *SchedulerPluginRunner) RunReserveOnNode(pod *apiv1.Pod, nodeName string, postFilterState *schedulerframework.CycleState) error {
+	p.fwHandle.DelegatingLister.UpdateDelegate(p.snapshot)
+	defer p.fwHandle.DelegatingLister.ResetDelegate()
+
+	status := p.fwHandle.Framework.RunReservePluginsReserve(context.Background(), postFilterState, pod, nodeName)
+	if !status.IsSuccess() {
+		return fmt.Errorf("couldn't reserve node %s for pod %s/%s: %v", nodeName, pod.Namespace, pod.Name, status.Message())
+	}
 	return nil
 }
 
diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go
index 3f60c78158de..abebaa3aa7dc 100644
--- a/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go
+++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/plugin_runner_test.go
@@ -144,8 +144,10 @@ func TestRunFiltersOnNode(t *testing.T) {
 			err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(tt.node, tt.scheduledPods...))
 			assert.NoError(t, err)
 
-			predicateError := pluginRunner.RunFiltersOnNode(tt.testPod, tt.node.Name)
+			node, state, predicateError := pluginRunner.RunFiltersOnNode(tt.testPod, tt.node.Name)
 			if tt.expectError {
+				assert.Nil(t, node)
+				assert.Nil(t, state)
 				assert.NotNil(t, predicateError)
 				assert.Equal(t, clustersnapshot.FailingPredicateError, predicateError.Type())
 				assert.Equal(t, "NodeResourcesFit", predicateError.FailingPredicateName())
@@ -154,6 +156,8 @@ func TestRunFiltersOnNode(t *testing.T) {
 				assert.Contains(t, predicateError.Error(), "Insufficient cpu")
 			} else {
 				assert.Nil(t, predicateError)
+				assert.NotNil(t, state)
+				assert.Equal(t, tt.node, node)
 			}
 		})
 	}
@@ -243,12 +247,15 @@ func TestRunFilterUntilPassingNode(t *testing.T) {
 			err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(n2000))
 			assert.NoError(t, err)
 
-			nodeName, err := pluginRunner.RunFiltersUntilPassingNode(tc.pod, func(info *framework.NodeInfo) bool { return true })
+			node, state, err := pluginRunner.RunFiltersUntilPassingNode(tc.pod, func(info *framework.NodeInfo) bool { return true })
 			if tc.expectError {
+				assert.Nil(t, node)
+				assert.Nil(t, state)
 				assert.Error(t, err)
 			} else {
 				assert.NoError(t, err)
-				assert.Contains(t, tc.expectedNodes, nodeName)
+				assert.NotNil(t, state)
+				assert.Contains(t, tc.expectedNodes, node.Name)
 			}
 		})
 	}
@@ -278,7 +285,7 @@ func TestDebugInfo(t *testing.T) {
 	err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node1))
 	assert.NoError(t, err)
 
-	predicateErr := defaultPluginRunner.RunFiltersOnNode(p1, "n1")
+	_, _, predicateErr := defaultPluginRunner.RunFiltersOnNode(p1, "n1")
 	assert.NotNil(t, predicateErr)
 	assert.Contains(t, predicateErr.FailingPredicateReasons(), "node(s) had untolerated taint {SomeTaint: WhyNot?}")
 	assert.Contains(t, predicateErr.Error(), "node(s) had untolerated taint {SomeTaint: WhyNot?}")
@@ -308,7 +315,7 @@ func TestDebugInfo(t *testing.T) {
 	err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node1))
 	assert.NoError(t, err)
 
-	predicateErr = customPluginRunner.RunFiltersOnNode(p1, "n1")
+	_, _, predicateErr = customPluginRunner.RunFiltersOnNode(p1, "n1")
 	assert.Nil(t, predicateErr)
 }
 
diff --git a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go
index dd57b47ffa39..d59fc9bfa135 100644
--- a/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go
+++ b/cluster-autoscaler/simulator/clustersnapshot/predicate/predicate_snapshot.go
@@ -74,7 +74,7 @@ func (s *PredicateSnapshot) RemoveNodeInfo(nodeName string) error {
 
 // SchedulePod adds pod to the snapshot and schedules it to given node.
 func (s *PredicateSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
-	if schedErr := s.pluginRunner.RunFiltersOnNode(pod, nodeName); schedErr != nil {
+	if _, _, schedErr := s.pluginRunner.RunFiltersOnNode(pod, nodeName); schedErr != nil {
 		return schedErr
 	}
 	if err := s.ClusterSnapshotStore.ForceAddPod(pod, nodeName); err != nil {
@@ -85,14 +85,14 @@ func (s *PredicateSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) cluster
 
 // SchedulePodOnAnyNodeMatching adds pod to the snapshot and schedules it to any node matching the provided function.
 func (s *PredicateSnapshot) SchedulePodOnAnyNodeMatching(pod *apiv1.Pod, anyNodeMatching func(*framework.NodeInfo) bool) (string, clustersnapshot.SchedulingError) {
-	nodeName, schedErr := s.pluginRunner.RunFiltersUntilPassingNode(pod, anyNodeMatching)
+	node, _, schedErr := s.pluginRunner.RunFiltersUntilPassingNode(pod, anyNodeMatching)
 	if schedErr != nil {
 		return "", schedErr
 	}
-	if err := s.ClusterSnapshotStore.ForceAddPod(pod, nodeName); err != nil {
+	if err := s.ClusterSnapshotStore.ForceAddPod(pod, node.Name); err != nil {
 		return "", clustersnapshot.NewSchedulingInternalError(pod, err.Error())
 	}
-	return nodeName, nil
+	return node.Name, nil
 }
 
 // UnschedulePod removes the given Pod from the given Node inside the snapshot.
@@ -102,5 +102,6 @@ func (s *PredicateSnapshot) UnschedulePod(namespace string, podName string, node
 
 // CheckPredicates checks whether scheduler predicates pass for the given pod on the given node.
 func (s *PredicateSnapshot) CheckPredicates(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
-	return s.pluginRunner.RunFiltersOnNode(pod, nodeName)
+	_, _, err := s.pluginRunner.RunFiltersOnNode(pod, nodeName)
+	return err
}
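To show how the new return values are meant to fit together: RunFiltersOnNode now hands back the matched *apiv1.Node and the *schedulerframework.CycleState from the PreFilter/Filter run, and RunReserveOnNode takes that same post-Filter state. The sketch below is not part of this diff; it is a hypothetical helper in the predicate package (the method name SchedulePodWithReservation and its error handling are illustrative assumptions, only the signatures introduced above are taken as given) showing one way a caller could thread the state from the Filter phase into the Reserve phase before force-adding the pod.

```go
// Hypothetical sketch, not part of this change: a new file in the predicate package
// wiring RunFiltersOnNode -> RunReserveOnNode -> ForceAddPod.
package predicate

import (
	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
)

// SchedulePodWithReservation behaves like SchedulePod, but additionally runs the
// scheduler framework Reserve phase, reusing the CycleState produced by the Filter
// run so that plugins which stash per-cycle data in the state see the placement.
func (s *PredicateSnapshot) SchedulePodWithReservation(pod *apiv1.Pod, nodeName string) clustersnapshot.SchedulingError {
	// PreFilter + Filter; keep the returned Node and CycleState for the Reserve call.
	node, cycleState, schedErr := s.pluginRunner.RunFiltersOnNode(pod, nodeName)
	if schedErr != nil {
		return schedErr
	}
	// Reserve with the same CycleState, so plugin state reflects the simulated placement.
	if err := s.pluginRunner.RunReserveOnNode(pod, node.Name, cycleState); err != nil {
		return clustersnapshot.NewSchedulingInternalError(pod, err.Error())
	}
	// Finally record the pod on the node in the snapshot store, as SchedulePod does.
	if err := s.ClusterSnapshotStore.ForceAddPod(pod, node.Name); err != nil {
		return clustersnapshot.NewSchedulingInternalError(pod, err.Error())
	}
	return nil
}
```

Note that SchedulePod and SchedulePodOnAnyNodeMatching as changed in this diff still discard the returned CycleState, so where Reserve actually gets invoked is left to callers of RunReserveOnNode.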