From 7a71ee402b6dc103b814b7c647a7e2963e316145 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kuba=20Tu=C5=BCnik?= Date: Mon, 30 Sep 2024 20:49:05 +0200 Subject: [PATCH] DRA: remove AddNode from ClusterSnapshot AddNodeInfo already provides the same functionality, and has to be used in production code in order to propagate DRA objects correctly. Uses in production are replaced with SetClusterState(), which will later take DRA objects into account. Uses in the test code are replaced with AddNodeInfo(). --- .../filter_out_schedulable_test.go | 8 ++--- .../core/scaledown/actuation/actuator.go | 19 ++---------- cluster-autoscaler/core/static_autoscaler.go | 30 +++---------------- .../estimator/binpacking_estimator_test.go | 6 ++-- .../pod_injection_processor_test.go | 7 ++--- .../simulator/clustersnapshot/basic.go | 9 ++---- .../clustersnapshot/clustersnapshot.go | 2 -- .../clustersnapshot_benchmark_test.go | 13 ++++---- .../clustersnapshot/clustersnapshot_test.go | 27 ++++++++--------- .../simulator/clustersnapshot/delta.go | 9 ++---- .../simulator/clustersnapshot/test_utils.go | 3 +- .../predicatechecker/schedulerbased_test.go | 6 ++-- 12 files changed, 44 insertions(+), 95 deletions(-) diff --git a/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go b/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go index a912368ed4ea..7b0054f9a2f2 100644 --- a/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go +++ b/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go @@ -184,16 +184,12 @@ func TestFilterOutSchedulable(t *testing.T) { allExpectedScheduledPods = append(allExpectedScheduledPods, tc.expectedScheduledPods...) for node, pods := range tc.nodesWithPods { - err := clusterSnapshot.AddNode(node) - assert.NoError(t, err) - for _, pod := range pods { pod.Spec.NodeName = node.Name - err = clusterSnapshot.AddPod(pod, node.Name) - assert.NoError(t, err) - allExpectedScheduledPods = append(allExpectedScheduledPods, pod) } + err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pods...)) + assert.NoError(t, err) } clusterSnapshot.Fork() diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator.go b/cluster-autoscaler/core/scaledown/actuation/actuator.go index b02e2016aaab..a85410172684 100644 --- a/cluster-autoscaler/core/scaledown/actuation/actuator.go +++ b/cluster-autoscaler/core/scaledown/actuation/actuator.go @@ -356,7 +356,6 @@ func (a *Actuator) taintNode(node *apiv1.Node) error { } func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterSnapshot, error) { - knownNodes := make(map[string]bool) snapshot := clustersnapshot.NewBasicClusterSnapshot() pods, err := a.ctx.AllPodLister().List() if err != nil { @@ -366,22 +365,10 @@ func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterS scheduledPods := kube_util.ScheduledPods(pods) nonExpendableScheduledPods := utils.FilterOutExpendablePods(scheduledPods, a.ctx.ExpendablePodsPriorityCutoff) - for _, node := range nodes { - if err := snapshot.AddNode(node); err != nil { - return nil, err - } - - knownNodes[node.Name] = true - } - - for _, pod := range nonExpendableScheduledPods { - if knownNodes[pod.Spec.NodeName] { - if err := snapshot.AddPod(pod, pod.Spec.NodeName); err != nil { - return nil, err - } - } + err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods) + if err != nil { + return nil, err } - return snapshot, nil } diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index c9f9c6310ceb..703d4ce1518f 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -34,7 +34,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/planner" scaledownstatus "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status" "k8s.io/autoscaler/cluster-autoscaler/core/scaleup" - orchestrator "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator" + "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator" core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils" "k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot" "k8s.io/autoscaler/cluster-autoscaler/estimator" @@ -58,7 +58,7 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" - klog "k8s.io/klog/v2" + "k8s.io/klog/v2" ) const ( @@ -242,28 +242,6 @@ func (a *StaticAutoscaler) cleanUpIfRequired() { a.initialized = true } -func (a *StaticAutoscaler) initializeClusterSnapshot(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) caerrors.AutoscalerError { - a.ClusterSnapshot.Clear() - - knownNodes := make(map[string]bool) - for _, node := range nodes { - if err := a.ClusterSnapshot.AddNode(node); err != nil { - klog.Errorf("Failed to add node %s to cluster snapshot: %v", node.Name, err) - return caerrors.ToAutoscalerError(caerrors.InternalError, err) - } - knownNodes[node.Name] = true - } - for _, pod := range scheduledPods { - if knownNodes[pod.Spec.NodeName] { - if err := a.ClusterSnapshot.AddPod(pod, pod.Spec.NodeName); err != nil { - klog.Errorf("Failed to add pod %s scheduled to node %s to cluster snapshot: %v", pod.Name, pod.Spec.NodeName, err) - return caerrors.ToAutoscalerError(caerrors.InternalError, err) - } - } - } - return nil -} - func (a *StaticAutoscaler) initializeRemainingPdbTracker() caerrors.AutoscalerError { a.RemainingPdbTracker.Clear() @@ -361,8 +339,8 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr } nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(originalScheduledPods, a.ExpendablePodsPriorityCutoff) // Initialize cluster state to ClusterSnapshot - if typedErr := a.initializeClusterSnapshot(allNodes, nonExpendableScheduledPods); typedErr != nil { - return typedErr.AddPrefix("failed to initialize ClusterSnapshot: ") + if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods); err != nil { + return caerrors.ToAutoscalerError(caerrors.InternalError, err).AddPrefix("failed to initialize ClusterSnapshot: ") } // Initialize Pod Disruption Budget tracking if typedErr := a.initializeRemainingPdbTracker(); typedErr != nil { diff --git a/cluster-autoscaler/estimator/binpacking_estimator_test.go b/cluster-autoscaler/estimator/binpacking_estimator_test.go index e0fa48aeda10..e0205ffdc854 100644 --- a/cluster-autoscaler/estimator/binpacking_estimator_test.go +++ b/cluster-autoscaler/estimator/binpacking_estimator_test.go @@ -214,7 +214,8 @@ func TestBinpackingEstimate(t *testing.T) { t.Run(tc.name, func(t *testing.T) { clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot() // Add one node in different zone to trigger topology spread constraints - clusterSnapshot.AddNode(makeNode(100, 100, 10, "oldnode", "zone-jupiter")) + err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(makeNode(100, 100, 10, "oldnode", "zone-jupiter"))) + assert.NoError(t, err) predicateChecker, err := predicatechecker.NewTestPredicateChecker() assert.NoError(t, err) @@ -268,7 +269,8 @@ func BenchmarkBinpackingEstimate(b *testing.B) { for i := 0; i < b.N; i++ { clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot() - clusterSnapshot.AddNode(makeNode(100, 100, 10, "oldnode", "zone-jupiter")) + err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(makeNode(100, 100, 10, "oldnode", "zone-jupiter"))) + assert.NoError(b, err) predicateChecker, err := predicatechecker.NewTestPredicateChecker() assert.NoError(b, err) diff --git a/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go b/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go index d2f96b244585..13a98c8d78c8 100644 --- a/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go +++ b/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/context" podinjectionbackoff "k8s.io/autoscaler/cluster-autoscaler/processors/podinjection/backoff" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" ) @@ -112,10 +113,8 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) { t.Run(tc.name, func(t *testing.T) { p := NewPodInjectionPodListProcessor(podinjectionbackoff.NewFakePodControllerRegistry()) clusterSnapshot := clustersnapshot.NewDeltaClusterSnapshot() - clusterSnapshot.AddNode(node) - for _, pod := range tc.scheduledPods { - clusterSnapshot.AddPod(pod, node.Name) - } + err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.scheduledPods...)) + assert.NoError(t, err) ctx := context.AutoscalingContext{ AutoscalingKubeClients: context.AutoscalingKubeClients{ ListerRegistry: kubernetes.NewListerRegistry(nil, nil, nil, nil, nil, nil, jobLister, replicaSetLister, statefulsetLister), diff --git a/cluster-autoscaler/simulator/clustersnapshot/basic.go b/cluster-autoscaler/simulator/clustersnapshot/basic.go index 1728924be7cd..e85ea865b40b 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/basic.go +++ b/cluster-autoscaler/simulator/clustersnapshot/basic.go @@ -238,14 +238,14 @@ func (snapshot *BasicClusterSnapshot) SetClusterState(nodes []*apiv1.Node, sched knownNodes := make(map[string]bool) for _, node := range nodes { - if err := snapshot.AddNode(node); err != nil { + if err := snapshot.getInternalData().addNode(node); err != nil { return err } knownNodes[node.Name] = true } for _, pod := range scheduledPods { if knownNodes[pod.Spec.NodeName] { - if err := snapshot.AddPod(pod, pod.Spec.NodeName); err != nil { + if err := snapshot.getInternalData().addPod(pod, pod.Spec.NodeName); err != nil { return err } } @@ -253,11 +253,6 @@ func (snapshot *BasicClusterSnapshot) SetClusterState(nodes []*apiv1.Node, sched return nil } -// AddNode adds node to the snapshot. -func (snapshot *BasicClusterSnapshot) AddNode(node *apiv1.Node) error { - return snapshot.getInternalData().addNode(node) -} - // RemoveNode removes nodes (and pods scheduled to it) from the snapshot. func (snapshot *BasicClusterSnapshot) RemoveNode(nodeName string) error { return snapshot.getInternalData().removeNode(nodeName) diff --git a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go index 9696c1f1e0ba..199f1c74f116 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go +++ b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go @@ -34,8 +34,6 @@ type ClusterSnapshot interface { // with the provided data. scheduledPods are correlated to their Nodes based on spec.NodeName. SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error - // AddNode adds node to the snapshot. - AddNode(node *apiv1.Node) error // RemoveNode removes nodes (and pods scheduled to it) from the snapshot. RemoveNode(nodeName string) error // AddPod adds pod to the snapshot and schedules it to given node. diff --git a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_benchmark_test.go b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_benchmark_test.go index 108ce9b73c18..091b7f92f539 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_benchmark_test.go +++ b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_benchmark_test.go @@ -23,6 +23,7 @@ import ( "github.com/stretchr/testify/assert" + "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" apiv1 "k8s.io/api/core/v1" @@ -68,7 +69,7 @@ func assignPodsToNodes(pods []*apiv1.Pod, nodes []*apiv1.Node) { } } -func BenchmarkAddNode(b *testing.B) { +func BenchmarkAddNodeInfo(b *testing.B) { testCases := []int{1, 10, 100, 1000, 5000, 15000, 100000} for snapshotName, snapshotFactory := range snapshots { @@ -76,13 +77,13 @@ func BenchmarkAddNode(b *testing.B) { nodes := createTestNodes(tc) clusterSnapshot := snapshotFactory() b.ResetTimer() - b.Run(fmt.Sprintf("%s: AddNode() %d", snapshotName, tc), func(b *testing.B) { + b.Run(fmt.Sprintf("%s: AddNodeInfo() %d", snapshotName, tc), func(b *testing.B) { for i := 0; i < b.N; i++ { b.StopTimer() clusterSnapshot.Clear() b.StartTimer() for _, node := range nodes { - err := clusterSnapshot.AddNode(node) + err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) if err != nil { assert.NoError(b, err) } @@ -172,12 +173,12 @@ func BenchmarkForkAddRevert(b *testing.B) { b.Run(fmt.Sprintf("%s: ForkAddRevert (%d nodes, %d pods)", snapshotName, ntc, ptc), func(b *testing.B) { for i := 0; i < b.N; i++ { clusterSnapshot.Fork() - err = clusterSnapshot.AddNode(tmpNode1) + err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(tmpNode1)) if err != nil { assert.NoError(b, err) } clusterSnapshot.Fork() - err = clusterSnapshot.AddNode(tmpNode2) + err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(tmpNode2)) if err != nil { assert.NoError(b, err) } @@ -217,7 +218,7 @@ func BenchmarkBuildNodeInfoList(b *testing.B) { } snapshot.Fork() for _, node := range nodes[tc.nodeCount:] { - if err := snapshot.AddNode(node); err != nil { + if err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)); err != nil { assert.NoError(b, err) } } diff --git a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_test.go b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_test.go index 24034ef6ade6..f951cf1e4a94 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_test.go +++ b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_test.go @@ -94,9 +94,9 @@ func validTestCases(t *testing.T) []modificationTestCase { testCases := []modificationTestCase{ { - name: "add node", + name: "add empty nodeInfo", op: func(snapshot ClusterSnapshot) { - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) }, modifiedState: snapshotState{ @@ -133,7 +133,7 @@ func validTestCases(t *testing.T) []modificationTestCase { err := snapshot.RemoveNode(node.Name) assert.NoError(t, err) - err = snapshot.AddNode(node) + err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) }, modifiedState: snapshotState{ @@ -199,7 +199,7 @@ func TestForking(t *testing.T) { tc.op(snapshot) snapshot.Fork() - snapshot.AddNode(node) + snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) snapshot.Revert() snapshot.Revert() @@ -243,7 +243,7 @@ func TestForking(t *testing.T) { snapshot.Fork() tc.op(snapshot) snapshot.Fork() - snapshot.AddNode(node) + snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) snapshot.Revert() err := snapshot.Commit() assert.NoError(t, err) @@ -321,7 +321,7 @@ func TestClear(t *testing.T) { snapshot.Fork() for _, node := range extraNodes { - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) } @@ -379,7 +379,7 @@ func TestNode404(t *testing.T) { snapshot := snapshotFactory() node := BuildTestNode("node", 10, 100) - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) snapshot.Fork() @@ -405,7 +405,7 @@ func TestNode404(t *testing.T) { snapshot := snapshotFactory() node := BuildTestNode("node", 10, 100) - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) err = snapshot.RemoveNode("node") @@ -428,9 +428,6 @@ func TestNodeAlreadyExists(t *testing.T) { name string op func(ClusterSnapshot) error }{ - {"add node", func(snapshot ClusterSnapshot) error { - return snapshot.AddNode(node) - }}, {"add nodeInfo", func(snapshot ClusterSnapshot) error { return snapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pod)) }}, @@ -442,7 +439,7 @@ func TestNodeAlreadyExists(t *testing.T) { func(t *testing.T) { snapshot := snapshotFactory() - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) // Node already in base. @@ -454,7 +451,7 @@ func TestNodeAlreadyExists(t *testing.T) { func(t *testing.T) { snapshot := snapshotFactory() - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) snapshot.Fork() @@ -471,7 +468,7 @@ func TestNodeAlreadyExists(t *testing.T) { snapshot.Fork() - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) // Node already in fork. @@ -484,7 +481,7 @@ func TestNodeAlreadyExists(t *testing.T) { snapshot.Fork() - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) err = snapshot.Commit() diff --git a/cluster-autoscaler/simulator/clustersnapshot/delta.go b/cluster-autoscaler/simulator/clustersnapshot/delta.go index 1a7bea8c167b..c490d679db41 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/delta.go +++ b/cluster-autoscaler/simulator/clustersnapshot/delta.go @@ -427,14 +427,14 @@ func (snapshot *DeltaClusterSnapshot) SetClusterState(nodes []*apiv1.Node, sched knownNodes := make(map[string]bool) for _, node := range nodes { - if err := snapshot.AddNode(node); err != nil { + if err := snapshot.data.addNode(node); err != nil { return err } knownNodes[node.Name] = true } for _, pod := range scheduledPods { if knownNodes[pod.Spec.NodeName] { - if err := snapshot.AddPod(pod, pod.Spec.NodeName); err != nil { + if err := snapshot.data.addPod(pod, pod.Spec.NodeName); err != nil { return err } } @@ -442,11 +442,6 @@ func (snapshot *DeltaClusterSnapshot) SetClusterState(nodes []*apiv1.Node, sched return nil } -// AddNode adds node to the snapshot. -func (snapshot *DeltaClusterSnapshot) AddNode(node *apiv1.Node) error { - return snapshot.data.addNode(node) -} - // RemoveNode removes nodes (and pods scheduled to it) from the snapshot. func (snapshot *DeltaClusterSnapshot) RemoveNode(nodeName string) error { return snapshot.data.removeNode(nodeName) diff --git a/cluster-autoscaler/simulator/clustersnapshot/test_utils.go b/cluster-autoscaler/simulator/clustersnapshot/test_utils.go index 501756fe2438..0307ae78bca9 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/test_utils.go +++ b/cluster-autoscaler/simulator/clustersnapshot/test_utils.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/assert" apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" ) // InitializeClusterSnapshotOrDie clears cluster snapshot and then initializes it with given set of nodes and pods. @@ -35,7 +36,7 @@ func InitializeClusterSnapshotOrDie( snapshot.Clear() for _, node := range nodes { - err = snapshot.AddNode(node) + err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err, "error while adding node %s", node.Name) } diff --git a/cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go b/cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go index c92e4d211e96..d5423777f711 100644 --- a/cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go +++ b/cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go @@ -248,9 +248,9 @@ func TestFitsAnyNode(t *testing.T) { } clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot() - err = clusterSnapshot.AddNode(n1000) + err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(n1000)) assert.NoError(t, err) - err = clusterSnapshot.AddNode(n2000) + err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(n2000)) assert.NoError(t, err) for _, tc := range testCases { @@ -286,7 +286,7 @@ func TestDebugInfo(t *testing.T) { clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot() - err := clusterSnapshot.AddNode(node1) + err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node1)) assert.NoError(t, err) // with default predicate checker