From 17ef915077edf379e598c2e3c34b608e92b4519c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kuba=20Tu=C5=BCnik?= Date: Mon, 30 Sep 2024 20:49:05 +0200 Subject: [PATCH] DRA: remove AddNode from ClusterSnapshot AddNodeInfo already provides the same functionality, and has to be used in production code in order to propagate DRA objects correctly. Uses in production are replaced with Initialize(), which will later take DRA objects into account. Uses in the test code are replaced with AddNodeInfo(). --- .../filter_out_schedulable_test.go | 8 ++---- .../core/scaledown/actuation/actuator.go | 19 +++---------- cluster-autoscaler/core/static_autoscaler.go | 18 ++----------- .../estimator/binpacking_estimator_test.go | 6 +++-- .../pod_injection_processor_test.go | 7 +++-- .../simulator/clustersnapshot/basic.go | 9 ++----- .../clustersnapshot/clustersnapshot.go | 3 +-- .../clustersnapshot_benchmark_test.go | 13 ++++----- .../clustersnapshot/clustersnapshot_test.go | 27 +++++++++---------- .../simulator/clustersnapshot/delta.go | 9 ++----- .../simulator/clustersnapshot/test_utils.go | 3 ++- .../predicatechecker/schedulerbased_test.go | 6 ++--- 12 files changed, 43 insertions(+), 85 deletions(-) diff --git a/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go b/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go index 7ac2372a1771..6400e9f60359 100644 --- a/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go +++ b/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go @@ -183,16 +183,12 @@ func TestFilterOutSchedulable(t *testing.T) { allExpectedScheduledPods = append(allExpectedScheduledPods, tc.expectedScheduledPods...) for node, pods := range tc.nodesWithPods { - err := clusterSnapshot.AddNode(node) - assert.NoError(t, err) - for _, pod := range pods { pod.Spec.NodeName = node.Name - err = clusterSnapshot.AddPod(pod, node.Name) - assert.NoError(t, err) - allExpectedScheduledPods = append(allExpectedScheduledPods, pod) } + err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pods...)) + assert.NoError(t, err) } clusterSnapshot.Fork() diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator.go b/cluster-autoscaler/core/scaledown/actuation/actuator.go index b02e2016aaab..e5e7a9928fb1 100644 --- a/cluster-autoscaler/core/scaledown/actuation/actuator.go +++ b/cluster-autoscaler/core/scaledown/actuation/actuator.go @@ -356,7 +356,6 @@ func (a *Actuator) taintNode(node *apiv1.Node) error { } func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterSnapshot, error) { - knownNodes := make(map[string]bool) snapshot := clustersnapshot.NewBasicClusterSnapshot() pods, err := a.ctx.AllPodLister().List() if err != nil { @@ -366,22 +365,10 @@ func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterS scheduledPods := kube_util.ScheduledPods(pods) nonExpendableScheduledPods := utils.FilterOutExpendablePods(scheduledPods, a.ctx.ExpendablePodsPriorityCutoff) - for _, node := range nodes { - if err := snapshot.AddNode(node); err != nil { - return nil, err - } - - knownNodes[node.Name] = true - } - - for _, pod := range nonExpendableScheduledPods { - if knownNodes[pod.Spec.NodeName] { - if err := snapshot.AddPod(pod, pod.Spec.NodeName); err != nil { - return nil, err - } - } + err = snapshot.Initialize(nodes, nonExpendableScheduledPods) + if err != nil { + return nil, err } - return snapshot, nil } diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index c9f9c6310ceb..b413d26093b7 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -244,22 +244,8 @@ func (a *StaticAutoscaler) cleanUpIfRequired() { func (a *StaticAutoscaler) initializeClusterSnapshot(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) caerrors.AutoscalerError { a.ClusterSnapshot.Clear() - - knownNodes := make(map[string]bool) - for _, node := range nodes { - if err := a.ClusterSnapshot.AddNode(node); err != nil { - klog.Errorf("Failed to add node %s to cluster snapshot: %v", node.Name, err) - return caerrors.ToAutoscalerError(caerrors.InternalError, err) - } - knownNodes[node.Name] = true - } - for _, pod := range scheduledPods { - if knownNodes[pod.Spec.NodeName] { - if err := a.ClusterSnapshot.AddPod(pod, pod.Spec.NodeName); err != nil { - klog.Errorf("Failed to add pod %s scheduled to node %s to cluster snapshot: %v", pod.Name, pod.Spec.NodeName, err) - return caerrors.ToAutoscalerError(caerrors.InternalError, err) - } - } + if err := a.ClusterSnapshot.Initialize(nodes, scheduledPods); err != nil { + return caerrors.ToAutoscalerError(caerrors.InternalError, err) } return nil } diff --git a/cluster-autoscaler/estimator/binpacking_estimator_test.go b/cluster-autoscaler/estimator/binpacking_estimator_test.go index baeebb1f1eb0..0c7a785e82a2 100644 --- a/cluster-autoscaler/estimator/binpacking_estimator_test.go +++ b/cluster-autoscaler/estimator/binpacking_estimator_test.go @@ -214,7 +214,8 @@ func TestBinpackingEstimate(t *testing.T) { t.Run(tc.name, func(t *testing.T) { clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot() // Add one node in different zone to trigger topology spread constraints - clusterSnapshot.AddNode(makeNode(100, 100, 10, "oldnode", "zone-jupiter")) + err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(makeNode(100, 100, 10, "oldnode", "zone-jupiter"))) + assert.NoError(t, err) predicateChecker, err := predicatechecker.NewTestPredicateChecker() assert.NoError(t, err) @@ -268,7 +269,8 @@ func BenchmarkBinpackingEstimate(b *testing.B) { for i := 0; i < b.N; i++ { clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot() - clusterSnapshot.AddNode(makeNode(100, 100, 10, "oldnode", "zone-jupiter")) + err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(makeNode(100, 100, 10, "oldnode", "zone-jupiter"))) + assert.NoError(b, err) predicateChecker, err := predicatechecker.NewTestPredicateChecker() assert.NoError(b, err) diff --git a/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go b/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go index d2f96b244585..13a98c8d78c8 100644 --- a/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go +++ b/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/context" podinjectionbackoff "k8s.io/autoscaler/cluster-autoscaler/processors/podinjection/backoff" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" ) @@ -112,10 +113,8 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) { t.Run(tc.name, func(t *testing.T) { p := NewPodInjectionPodListProcessor(podinjectionbackoff.NewFakePodControllerRegistry()) clusterSnapshot := clustersnapshot.NewDeltaClusterSnapshot() - clusterSnapshot.AddNode(node) - for _, pod := range tc.scheduledPods { - clusterSnapshot.AddPod(pod, node.Name) - } + err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.scheduledPods...)) + assert.NoError(t, err) ctx := context.AutoscalingContext{ AutoscalingKubeClients: context.AutoscalingKubeClients{ ListerRegistry: kubernetes.NewListerRegistry(nil, nil, nil, nil, nil, nil, jobLister, replicaSetLister, statefulsetLister), diff --git a/cluster-autoscaler/simulator/clustersnapshot/basic.go b/cluster-autoscaler/simulator/clustersnapshot/basic.go index b0b96c2e3108..af5b12c9972d 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/basic.go +++ b/cluster-autoscaler/simulator/clustersnapshot/basic.go @@ -238,14 +238,14 @@ func (snapshot *BasicClusterSnapshot) Initialize(nodes []*apiv1.Node, scheduledP knownNodes := make(map[string]bool) for _, node := range nodes { - if err := snapshot.AddNode(node); err != nil { + if err := snapshot.getInternalData().addNode(node); err != nil { return err } knownNodes[node.Name] = true } for _, pod := range scheduledPods { if knownNodes[pod.Spec.NodeName] { - if err := snapshot.AddPod(pod, pod.Spec.NodeName); err != nil { + if err := snapshot.getInternalData().addPod(pod, pod.Spec.NodeName); err != nil { return err } } @@ -253,11 +253,6 @@ func (snapshot *BasicClusterSnapshot) Initialize(nodes []*apiv1.Node, scheduledP return nil } -// AddNode adds node to the snapshot. -func (snapshot *BasicClusterSnapshot) AddNode(node *apiv1.Node) error { - return snapshot.getInternalData().addNode(node) -} - // RemoveNode removes nodes (and pods scheduled to it) from the snapshot. func (snapshot *BasicClusterSnapshot) RemoveNode(nodeName string) error { return snapshot.getInternalData().removeNode(nodeName) diff --git a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go index a69f840e047f..762534a241fc 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go +++ b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go @@ -33,8 +33,7 @@ type ClusterSnapshot interface { // Initialize clears the snapshot and initializes it with real objects from the cluster - Nodes, // scheduled pods. Initialize(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error - // AddNode adds node to the snapshot. - AddNode(node *apiv1.Node) error + // RemoveNode removes nodes (and pods scheduled to it) from the snapshot. RemoveNode(nodeName string) error // AddPod adds pod to the snapshot and schedules it to given node. diff --git a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_benchmark_test.go b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_benchmark_test.go index ec2f73db6e6e..cd681ae0287a 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_benchmark_test.go +++ b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_benchmark_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" apiv1 "k8s.io/api/core/v1" @@ -67,7 +68,7 @@ func assignPodsToNodes(pods []*apiv1.Pod, nodes []*apiv1.Node) { } } -func BenchmarkAddNode(b *testing.B) { +func BenchmarkAddNodeInfo(b *testing.B) { testCases := []int{1, 10, 100, 1000, 5000, 15000, 100000} for snapshotName, snapshotFactory := range snapshots { @@ -75,13 +76,13 @@ func BenchmarkAddNode(b *testing.B) { nodes := createTestNodes(tc) clusterSnapshot := snapshotFactory() b.ResetTimer() - b.Run(fmt.Sprintf("%s: AddNode() %d", snapshotName, tc), func(b *testing.B) { + b.Run(fmt.Sprintf("%s: AddNodeInfo() %d", snapshotName, tc), func(b *testing.B) { for i := 0; i < b.N; i++ { b.StopTimer() clusterSnapshot.Clear() b.StartTimer() for _, node := range nodes { - err := clusterSnapshot.AddNode(node) + err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) if err != nil { assert.NoError(b, err) } @@ -171,12 +172,12 @@ func BenchmarkForkAddRevert(b *testing.B) { b.Run(fmt.Sprintf("%s: ForkAddRevert (%d nodes, %d pods)", snapshotName, ntc, ptc), func(b *testing.B) { for i := 0; i < b.N; i++ { clusterSnapshot.Fork() - err = clusterSnapshot.AddNode(tmpNode1) + err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(tmpNode1)) if err != nil { assert.NoError(b, err) } clusterSnapshot.Fork() - err = clusterSnapshot.AddNode(tmpNode2) + err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(tmpNode2)) if err != nil { assert.NoError(b, err) } @@ -216,7 +217,7 @@ func BenchmarkBuildNodeInfoList(b *testing.B) { } snapshot.Fork() for _, node := range nodes[tc.nodeCount:] { - if err := snapshot.AddNode(node); err != nil { + if err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)); err != nil { assert.NoError(b, err) } } diff --git a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_test.go b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_test.go index ab1c362cb4ee..22adf2965d10 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_test.go +++ b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_test.go @@ -94,9 +94,9 @@ func validTestCases(t *testing.T) []modificationTestCase { testCases := []modificationTestCase{ { - name: "add node", + name: "add empty nodeInfo", op: func(snapshot ClusterSnapshot) { - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) }, modifiedState: snapshotState{ @@ -133,7 +133,7 @@ func validTestCases(t *testing.T) []modificationTestCase { err := snapshot.RemoveNode(node.Name) assert.NoError(t, err) - err = snapshot.AddNode(node) + err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) }, modifiedState: snapshotState{ @@ -199,7 +199,7 @@ func TestForking(t *testing.T) { tc.op(snapshot) snapshot.Fork() - snapshot.AddNode(node) + snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) snapshot.Revert() snapshot.Revert() @@ -243,7 +243,7 @@ func TestForking(t *testing.T) { snapshot.Fork() tc.op(snapshot) snapshot.Fork() - snapshot.AddNode(node) + snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) snapshot.Revert() err := snapshot.Commit() assert.NoError(t, err) @@ -321,7 +321,7 @@ func TestClear(t *testing.T) { snapshot.Fork() for _, node := range extraNodes { - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) } @@ -379,7 +379,7 @@ func TestNode404(t *testing.T) { snapshot := snapshotFactory() node := BuildTestNode("node", 10, 100) - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) snapshot.Fork() @@ -405,7 +405,7 @@ func TestNode404(t *testing.T) { snapshot := snapshotFactory() node := BuildTestNode("node", 10, 100) - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) err = snapshot.RemoveNode("node") @@ -428,9 +428,6 @@ func TestNodeAlreadyExists(t *testing.T) { name string op func(ClusterSnapshot) error }{ - {"add node", func(snapshot ClusterSnapshot) error { - return snapshot.AddNode(node) - }}, {"add nodeInfo", func(snapshot ClusterSnapshot) error { return snapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pod)) }}, @@ -442,7 +439,7 @@ func TestNodeAlreadyExists(t *testing.T) { func(t *testing.T) { snapshot := snapshotFactory() - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) // Node already in base. @@ -454,7 +451,7 @@ func TestNodeAlreadyExists(t *testing.T) { func(t *testing.T) { snapshot := snapshotFactory() - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) snapshot.Fork() @@ -471,7 +468,7 @@ func TestNodeAlreadyExists(t *testing.T) { snapshot.Fork() - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) // Node already in fork. @@ -484,7 +481,7 @@ func TestNodeAlreadyExists(t *testing.T) { snapshot.Fork() - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) err = snapshot.Commit() diff --git a/cluster-autoscaler/simulator/clustersnapshot/delta.go b/cluster-autoscaler/simulator/clustersnapshot/delta.go index 553a3fb556f7..7aed12a1aea7 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/delta.go +++ b/cluster-autoscaler/simulator/clustersnapshot/delta.go @@ -427,14 +427,14 @@ func (snapshot *DeltaClusterSnapshot) Initialize(nodes []*apiv1.Node, scheduledP knownNodes := make(map[string]bool) for _, node := range nodes { - if err := snapshot.AddNode(node); err != nil { + if err := snapshot.data.addNode(node); err != nil { return err } knownNodes[node.Name] = true } for _, pod := range scheduledPods { if knownNodes[pod.Spec.NodeName] { - if err := snapshot.AddPod(pod, pod.Spec.NodeName); err != nil { + if err := snapshot.data.addPod(pod, pod.Spec.NodeName); err != nil { return err } } @@ -442,11 +442,6 @@ func (snapshot *DeltaClusterSnapshot) Initialize(nodes []*apiv1.Node, scheduledP return nil } -// AddNode adds node to the snapshot. -func (snapshot *DeltaClusterSnapshot) AddNode(node *apiv1.Node) error { - return snapshot.data.addNode(node) -} - // RemoveNode removes nodes (and pods scheduled to it) from the snapshot. func (snapshot *DeltaClusterSnapshot) RemoveNode(nodeName string) error { return snapshot.data.removeNode(nodeName) diff --git a/cluster-autoscaler/simulator/clustersnapshot/test_utils.go b/cluster-autoscaler/simulator/clustersnapshot/test_utils.go index 501756fe2438..0307ae78bca9 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/test_utils.go +++ b/cluster-autoscaler/simulator/clustersnapshot/test_utils.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/assert" apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" ) // InitializeClusterSnapshotOrDie clears cluster snapshot and then initializes it with given set of nodes and pods. @@ -35,7 +36,7 @@ func InitializeClusterSnapshotOrDie( snapshot.Clear() for _, node := range nodes { - err = snapshot.AddNode(node) + err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err, "error while adding node %s", node.Name) } diff --git a/cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go b/cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go index c92e4d211e96..d5423777f711 100644 --- a/cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go +++ b/cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go @@ -248,9 +248,9 @@ func TestFitsAnyNode(t *testing.T) { } clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot() - err = clusterSnapshot.AddNode(n1000) + err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(n1000)) assert.NoError(t, err) - err = clusterSnapshot.AddNode(n2000) + err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(n2000)) assert.NoError(t, err) for _, tc := range testCases { @@ -286,7 +286,7 @@ func TestDebugInfo(t *testing.T) { clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot() - err := clusterSnapshot.AddNode(node1) + err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node1)) assert.NoError(t, err) // with default predicate checker