From acc4c4c39a2cda8aae5b0e677e793e64487da8a5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kuba=20Tu=C5=BCnik?= Date: Mon, 30 Sep 2024 19:08:24 +0200 Subject: [PATCH] TMP squash: #7466, #7479 DONOTSUBMIT --- .../podlistprocessor/filter_out_expendable.go | 4 +- .../filter_out_expendable_test.go | 4 +- .../filter_out_schedulable_test.go | 16 +- .../core/scaledown/actuation/actuator.go | 19 +- .../core/scaledown/actuation/actuator_test.go | 5 +- .../core/scaledown/actuation/drain_test.go | 3 +- .../scaleup/orchestrator/async_initializer.go | 4 +- .../core/scaleup/orchestrator/orchestrator.go | 12 +- .../scaleup/orchestrator/orchestrator_test.go | 29 +- .../core/scaleup/resource/manager_test.go | 9 + cluster-autoscaler/core/static_autoscaler.go | 47 +- .../core/static_autoscaler_test.go | 42 +- cluster-autoscaler/core/utils/utils.go | 77 --- cluster-autoscaler/core/utils/utils_test.go | 29 - .../estimator/binpacking_estimator.go | 14 +- .../estimator/binpacking_estimator_test.go | 6 +- .../mixed_nodeinfos_processor.go | 42 +- .../mixed_nodeinfos_processor_test.go | 33 +- .../pod_injection_processor_test.go | 7 +- cluster-autoscaler/simulator/cluster.go | 4 +- .../simulator/clustersnapshot/basic.go | 61 +-- .../clustersnapshot/clustersnapshot.go | 28 +- .../clustersnapshot_benchmark_test.go | 63 +-- .../clustersnapshot/clustersnapshot_test.go | 121 +++-- .../simulator/clustersnapshot/delta.go | 64 +-- .../simulator/clustersnapshot/test_utils.go | 10 +- .../simulator/framework/infos.go | 17 + .../simulator/framework/infos_test.go | 58 ++ .../simulator/node_info_utils.go | 153 ++++++ .../simulator/node_info_utils_test.go | 510 ++++++++++++++++++ cluster-autoscaler/simulator/nodes.go | 71 --- cluster-autoscaler/simulator/nodes_test.go | 239 -------- .../predicatechecker/schedulerbased_test.go | 9 +- .../simulator/scheduling/hinting_simulator.go | 2 +- .../utils/daemonset/daemonset.go | 5 + .../utils/scheduler/scheduler.go | 22 - 36 files changed, 1035 insertions(+), 804 deletions(-) create mode 100644 cluster-autoscaler/simulator/node_info_utils.go create mode 100644 cluster-autoscaler/simulator/node_info_utils_test.go delete mode 100644 cluster-autoscaler/simulator/nodes.go delete mode 100644 cluster-autoscaler/simulator/nodes_test.go diff --git a/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go index 0ec929814a1a..550f8a10520f 100644 --- a/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go +++ b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable.go @@ -23,7 +23,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/context" core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils" caerrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors" - klog "k8s.io/klog/v2" + "k8s.io/klog/v2" ) type filterOutExpendable struct { @@ -56,7 +56,7 @@ func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods // CA logic from before migration to scheduler framework. 
So let's keep it for now func (p *filterOutExpendable) addPreemptingPodsToSnapshot(pods []*apiv1.Pod, ctx *context.AutoscalingContext) error { for _, p := range pods { - if err := ctx.ClusterSnapshot.AddPod(p, p.Status.NominatedNodeName); err != nil { + if err := ctx.ClusterSnapshot.ForceAddPod(p, p.Status.NominatedNodeName); err != nil { klog.Errorf("Failed to update snapshot with pod %s/%s waiting for preemption: %v", p.Namespace, p.Name, err) return caerrors.ToAutoscalerError(caerrors.InternalError, err) } diff --git a/cluster-autoscaler/core/podlistprocessor/filter_out_expendable_test.go b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable_test.go index 458f633c7152..94f6915e3028 100644 --- a/cluster-autoscaler/core/podlistprocessor/filter_out_expendable_test.go +++ b/cluster-autoscaler/core/podlistprocessor/filter_out_expendable_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/config" "k8s.io/autoscaler/cluster-autoscaler/context" @@ -109,7 +110,8 @@ func TestFilterOutExpendable(t *testing.T) { t.Run(tc.name, func(t *testing.T) { processor := NewFilterOutExpendablePodListProcessor() snapshot := clustersnapshot.NewBasicClusterSnapshot() - snapshot.AddNodes(tc.nodes) + err := snapshot.SetClusterState(tc.nodes, nil) + assert.NoError(t, err) pods, err := processor.Process(&context.AutoscalingContext{ ClusterSnapshot: snapshot, diff --git a/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go b/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go index e02e0b9c0bb6..7b0054f9a2f2 100644 --- a/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go +++ b/cluster-autoscaler/core/podlistprocessor/filter_out_schedulable_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/stretchr/testify/assert" + apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" @@ -183,16 +184,12 @@ func TestFilterOutSchedulable(t *testing.T) { allExpectedScheduledPods = append(allExpectedScheduledPods, tc.expectedScheduledPods...) 
for node, pods := range tc.nodesWithPods { - err := clusterSnapshot.AddNode(node) - assert.NoError(t, err) - for _, pod := range pods { pod.Spec.NodeName = node.Name - err = clusterSnapshot.AddPod(pod, node.Name) - assert.NoError(t, err) - allExpectedScheduledPods = append(allExpectedScheduledPods, pod) } + err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pods...)) + assert.NoError(t, err) } clusterSnapshot.Fork() @@ -286,15 +283,10 @@ func BenchmarkFilterOutSchedulable(b *testing.B) { assert.NoError(b, err) clusterSnapshot := snapshotFactory() - if err := clusterSnapshot.AddNodes(nodes); err != nil { + if err := clusterSnapshot.SetClusterState(nodes, scheduledPods); err != nil { assert.NoError(b, err) } - for _, pod := range scheduledPods { - if err := clusterSnapshot.AddPod(pod, pod.Spec.NodeName); err != nil { - assert.NoError(b, err) - } - } b.ResetTimer() for i := 0; i < b.N; i++ { diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator.go b/cluster-autoscaler/core/scaledown/actuation/actuator.go index b02e2016aaab..a85410172684 100644 --- a/cluster-autoscaler/core/scaledown/actuation/actuator.go +++ b/cluster-autoscaler/core/scaledown/actuation/actuator.go @@ -356,7 +356,6 @@ func (a *Actuator) taintNode(node *apiv1.Node) error { } func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterSnapshot, error) { - knownNodes := make(map[string]bool) snapshot := clustersnapshot.NewBasicClusterSnapshot() pods, err := a.ctx.AllPodLister().List() if err != nil { @@ -366,22 +365,10 @@ func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterS scheduledPods := kube_util.ScheduledPods(pods) nonExpendableScheduledPods := utils.FilterOutExpendablePods(scheduledPods, a.ctx.ExpendablePodsPriorityCutoff) - for _, node := range nodes { - if err := snapshot.AddNode(node); err != nil { - return nil, err - } - - knownNodes[node.Name] = true - } - - for _, pod := range nonExpendableScheduledPods { - if knownNodes[pod.Spec.NodeName] { - if err := snapshot.AddPod(pod, pod.Spec.NodeName); err != nil { - return nil, err - } - } + err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods) + if err != nil { + return nil, err } - return snapshot, nil } diff --git a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go index 2f48e498c8ee..6f44abaf06b6 100644 --- a/cluster-autoscaler/core/scaledown/actuation/actuator_test.go +++ b/cluster-autoscaler/core/scaledown/actuation/actuator_test.go @@ -43,6 +43,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups/asyncnodegroups" + "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/autoscaler/cluster-autoscaler/simulator/utilization" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" "k8s.io/autoscaler/cluster-autoscaler/utils/taints" @@ -1159,7 +1160,7 @@ func TestStartDeletion(t *testing.T) { csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) for _, bucket := range emptyNodeGroupViews { for _, node := range bucket.Nodes { - err := ctx.ClusterSnapshot.AddNodeWithPods(node, 
tc.pods[node.Name]) + err := ctx.ClusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.pods[node.Name]...)) if err != nil { t.Fatalf("Couldn't add node %q to snapshot: %v", node.Name, err) } @@ -1171,7 +1172,7 @@ func TestStartDeletion(t *testing.T) { if !found { t.Fatalf("Drain node %q doesn't have pods defined in the test case.", node.Name) } - err := ctx.ClusterSnapshot.AddNodeWithPods(node, pods) + err := ctx.ClusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pods...)) if err != nil { t.Fatalf("Couldn't add node %q to snapshot: %v", node.Name, err) } diff --git a/cluster-autoscaler/core/scaledown/actuation/drain_test.go b/cluster-autoscaler/core/scaledown/actuation/drain_test.go index 7205afe4a975..6ba905761db5 100644 --- a/cluster-autoscaler/core/scaledown/actuation/drain_test.go +++ b/cluster-autoscaler/core/scaledown/actuation/drain_test.go @@ -37,6 +37,7 @@ import ( . "k8s.io/autoscaler/cluster-autoscaler/core/test" "k8s.io/autoscaler/cluster-autoscaler/core/utils" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/autoscaler/cluster-autoscaler/utils/daemonset" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" @@ -612,7 +613,7 @@ func TestPodsToEvict(t *testing.T) { t.Run(tn, func(t *testing.T) { snapshot := clustersnapshot.NewBasicClusterSnapshot() node := BuildTestNode("test-node", 1000, 1000) - err := snapshot.AddNodeWithPods(node, tc.pods) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.pods...)) if err != nil { t.Errorf("AddNodeWithPods unexpected error: %v", err) } diff --git a/cluster-autoscaler/core/scaleup/orchestrator/async_initializer.go b/cluster-autoscaler/core/scaleup/orchestrator/async_initializer.go index de2dabf600bc..a8e82b87e7ba 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/async_initializer.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/async_initializer.go @@ -25,10 +25,10 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/context" - "k8s.io/autoscaler/cluster-autoscaler/core/utils" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset" "k8s.io/autoscaler/cluster-autoscaler/processors/status" + "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/taints" @@ -110,7 +110,7 @@ func (s *AsyncNodeGroupInitializer) InitializeNodeGroup(result nodegroups.AsyncN mainCreatedNodeGroup := result.CreationResult.MainCreatedNodeGroup // If possible replace candidate node-info with node info based on crated node group. The latter // one should be more in line with nodes which will be created by node group. - nodeInfo, aErr := utils.GetNodeInfoFromTemplate(mainCreatedNodeGroup, s.daemonSets, s.taintConfig) + nodeInfo, aErr := simulator.TemplateNodeInfoFromNodeGroupTemplate(mainCreatedNodeGroup, s.daemonSets, s.taintConfig) if aErr != nil { klog.Warningf("Cannot build node info for newly created main node group %s. Using fallback. 
Error: %v", mainCreatedNodeGroup.Id(), aErr) nodeInfo = s.nodeInfo diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go index dd4b53a241ac..80020cf319e5 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go @@ -27,7 +27,6 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/context" "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/equivalence" "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/resource" - "k8s.io/autoscaler/cluster-autoscaler/core/utils" "k8s.io/autoscaler/cluster-autoscaler/estimator" "k8s.io/autoscaler/cluster-autoscaler/expander" "k8s.io/autoscaler/cluster-autoscaler/metrics" @@ -35,6 +34,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset" "k8s.io/autoscaler/cluster-autoscaler/processors/status" + "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/klogx" @@ -527,7 +527,7 @@ func (o *ScaleUpOrchestrator) CreateNodeGroup( // If possible replace candidate node-info with node info based on crated node group. The latter // one should be more in line with nodes which will be created by node group. - mainCreatedNodeInfo, aErr := utils.GetNodeInfoFromTemplate(createNodeGroupResult.MainCreatedNodeGroup, daemonSets, o.taintConfig) + mainCreatedNodeInfo, aErr := simulator.TemplateNodeInfoFromNodeGroupTemplate(createNodeGroupResult.MainCreatedNodeGroup, daemonSets, o.taintConfig) if aErr == nil { nodeInfos[createNodeGroupResult.MainCreatedNodeGroup.Id()] = mainCreatedNodeInfo schedulablePodGroups[createNodeGroupResult.MainCreatedNodeGroup.Id()] = o.SchedulablePodGroups(podEquivalenceGroups, createNodeGroupResult.MainCreatedNodeGroup, mainCreatedNodeInfo) @@ -542,7 +542,7 @@ func (o *ScaleUpOrchestrator) CreateNodeGroup( delete(schedulablePodGroups, oldId) } for _, nodeGroup := range createNodeGroupResult.ExtraCreatedNodeGroups { - nodeInfo, aErr := utils.GetNodeInfoFromTemplate(nodeGroup, daemonSets, o.taintConfig) + nodeInfo, aErr := simulator.TemplateNodeInfoFromNodeGroupTemplate(nodeGroup, daemonSets, o.taintConfig) if aErr != nil { klog.Warningf("Cannot build node info for newly created extra node group %v; balancing similar node groups will not work; err=%v", nodeGroup.Id(), aErr) continue @@ -569,11 +569,7 @@ func (o *ScaleUpOrchestrator) SchedulablePodGroups( defer o.autoscalingContext.ClusterSnapshot.Revert() // Add test node to snapshot. 
- var allPods []*apiv1.Pod - for _, podInfo := range nodeInfo.Pods() { - allPods = append(allPods, podInfo.Pod) - } - if err := o.autoscalingContext.ClusterSnapshot.AddNodeWithPods(nodeInfo.Node(), allPods); err != nil { + if err := o.autoscalingContext.ClusterSnapshot.AddNodeInfo(nodeInfo); err != nil { klog.Errorf("Error while adding test Node: %v", err) return []estimator.PodEquivalenceGroup{} } diff --git a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go index a1cdd15eba91..bd270be6774a 100644 --- a/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go +++ b/cluster-autoscaler/core/scaleup/orchestrator/orchestrator_test.go @@ -1049,6 +1049,8 @@ func runSimpleScaleUpTest(t *testing.T, config *ScaleUpTestConfig) *ScaleUpTestR // build orchestrator context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil) assert.NoError(t, err) + err = context.ClusterSnapshot.SetClusterState(nodes, kube_util.ScheduledPods(pods)) + assert.NoError(t, err) nodeInfos, err := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false). Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now) assert.NoError(t, err) @@ -1130,13 +1132,15 @@ func TestScaleUpUnhealthy(t *testing.T) { SetNodeReadyState(n1, true, someTimeAgo) n2 := BuildTestNode("n2", 1000, 1000) SetNodeReadyState(n2, true, someTimeAgo) + nodes := []*apiv1.Node{n1, n2} p1 := BuildTestPod("p1", 80, 0) p2 := BuildTestPod("p2", 800, 0) p1.Spec.NodeName = "n1" p2.Spec.NodeName = "n2" + pods := []*apiv1.Pod{p1, p2} - podLister := kube_util.NewTestPodLister([]*apiv1.Pod{p1, p2}) + podLister := kube_util.NewTestPodLister(pods) listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil) provider := testprovider.NewTestCloudProvider(func(nodeGroup string, increase int) error { @@ -1155,8 +1159,8 @@ func TestScaleUpUnhealthy(t *testing.T) { } context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil) assert.NoError(t, err) - - nodes := []*apiv1.Node{n1, n2} + err = context.ClusterSnapshot.SetClusterState(nodes, pods) + assert.NoError(t, err) nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) @@ -1198,7 +1202,8 @@ func TestBinpackingLimiter(t *testing.T) { context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil) assert.NoError(t, err) - + err = context.ClusterSnapshot.SetClusterState(nodes, nil) + assert.NoError(t, err) nodeInfos, err := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false). 
Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now) assert.NoError(t, err) @@ -1233,11 +1238,13 @@ func TestScaleUpNoHelp(t *testing.T) { n1 := BuildTestNode("n1", 100, 1000) now := time.Now() SetNodeReadyState(n1, true, now.Add(-2*time.Minute)) + nodes := []*apiv1.Node{n1} p1 := BuildTestPod("p1", 80, 0) p1.Spec.NodeName = "n1" + pods := []*apiv1.Pod{p1} - podLister := kube_util.NewTestPodLister([]*apiv1.Pod{p1}) + podLister := kube_util.NewTestPodLister(pods) listers := kube_util.NewListerRegistry(nil, nil, podLister, nil, nil, nil, nil, nil, nil) provider := testprovider.NewTestCloudProvider(func(nodeGroup string, increase int) error { @@ -1255,8 +1262,8 @@ func TestScaleUpNoHelp(t *testing.T) { } context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil) assert.NoError(t, err) - - nodes := []*apiv1.Node{n1} + err = context.ClusterSnapshot.SetClusterState(nodes, pods) + assert.NoError(t, err) nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) @@ -1410,7 +1417,8 @@ func TestComputeSimilarNodeGroups(t *testing.T) { listers := kube_util.NewListerRegistry(nil, nil, kube_util.NewTestPodLister(nil), nil, nil, nil, nil, nil, nil) ctx, err := NewScaleTestAutoscalingContext(config.AutoscalingOptions{BalanceSimilarNodeGroups: tc.balancingEnabled}, &fake.Clientset{}, listers, provider, nil, nil) assert.NoError(t, err) - + err = ctx.ClusterSnapshot.SetClusterState(nodes, nil) + assert.NoError(t, err) nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) assert.NoError(t, clusterState.UpdateNodes(nodes, nodeInfos, time.Now())) @@ -1474,7 +1482,8 @@ func TestScaleUpBalanceGroups(t *testing.T) { } context, err := NewScaleTestAutoscalingContext(options, &fake.Clientset{}, listers, provider, nil, nil) assert.NoError(t, err) - + err = context.ClusterSnapshot.SetClusterState(nodes, podList) + assert.NoError(t, err) nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) clusterState.UpdateNodes(nodes, nodeInfos, time.Now()) @@ -1650,6 +1659,8 @@ func TestScaleUpToMeetNodeGroupMinSize(t *testing.T) { assert.NoError(t, err) nodes := []*apiv1.Node{n1, n2} + err = context.ClusterSnapshot.SetClusterState(nodes, nil) + assert.NoError(t, err) 
nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now()) processors := processorstest.NewTestProcessors(&context) clusterState := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker()) diff --git a/cluster-autoscaler/core/scaleup/resource/manager_test.go b/cluster-autoscaler/core/scaleup/resource/manager_test.go index ac1204be1b55..3824edb4dcbc 100644 --- a/cluster-autoscaler/core/scaleup/resource/manager_test.go +++ b/cluster-autoscaler/core/scaleup/resource/manager_test.go @@ -22,6 +22,7 @@ import ( "time" "github.com/stretchr/testify/assert" + appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" @@ -73,6 +74,8 @@ func TestDeltaForNode(t *testing.T) { ng := testCase.nodeGroupConfig group, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem) + err := ctx.ClusterSnapshot.SetClusterState(nodes, nil) + assert.NoError(t, err) nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now()) rm := NewManager(processors.CustomResourcesProcessor) @@ -114,6 +117,8 @@ func TestResourcesLeft(t *testing.T) { ng := testCase.nodeGroupConfig _, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem) + err := ctx.ClusterSnapshot.SetClusterState(nodes, nil) + assert.NoError(t, err) nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now()) rm := NewManager(processors.CustomResourcesProcessor) @@ -165,6 +170,8 @@ func TestApplyLimits(t *testing.T) { ng := testCase.nodeGroupConfig group, nodes := newNodeGroup(t, cp, ng.Name, ng.Min, ng.Max, ng.Size, ng.CPU, ng.Mem) + err := ctx.ClusterSnapshot.SetClusterState(nodes, nil) + assert.NoError(t, err) nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now()) rm := NewManager(processors.CustomResourcesProcessor) @@ -230,6 +237,8 @@ func TestResourceManagerWithGpuResource(t *testing.T) { assert.NoError(t, err) nodes := []*corev1.Node{n1} + err = context.ClusterSnapshot.SetClusterState(nodes, nil) + assert.NoError(t, err) nodeInfos, _ := nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(nil, false).Process(&context, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, time.Now()) rm := NewManager(processors.CustomResourcesProcessor) diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index f14e8db7f681..280a1f589a28 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -34,7 +34,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/planner" scaledownstatus "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status" "k8s.io/autoscaler/cluster-autoscaler/core/scaleup" - orchestrator "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator" + "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator" core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils" 
"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot" "k8s.io/autoscaler/cluster-autoscaler/estimator" @@ -52,13 +52,12 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/utils/backoff" caerrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" - scheduler_utils "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" "k8s.io/autoscaler/cluster-autoscaler/utils/taints" "k8s.io/utils/integer" apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/labels" - klog "k8s.io/klog/v2" + "k8s.io/klog/v2" ) const ( @@ -242,28 +241,6 @@ func (a *StaticAutoscaler) cleanUpIfRequired() { a.initialized = true } -func (a *StaticAutoscaler) initializeClusterSnapshot(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) caerrors.AutoscalerError { - a.ClusterSnapshot.Clear() - - knownNodes := make(map[string]bool) - for _, node := range nodes { - if err := a.ClusterSnapshot.AddNode(node); err != nil { - klog.Errorf("Failed to add node %s to cluster snapshot: %v", node.Name, err) - return caerrors.ToAutoscalerError(caerrors.InternalError, err) - } - knownNodes[node.Name] = true - } - for _, pod := range scheduledPods { - if knownNodes[pod.Spec.NodeName] { - if err := a.ClusterSnapshot.AddPod(pod, pod.Spec.NodeName); err != nil { - klog.Errorf("Failed to add pod %s scheduled to node %s to cluster snapshot: %v", pod.Name, pod.Spec.NodeName, err) - return caerrors.ToAutoscalerError(caerrors.InternalError, err) - } - } - } - return nil -} - func (a *StaticAutoscaler) initializeRemainingPdbTracker() caerrors.AutoscalerError { a.RemainingPdbTracker.Clear() @@ -361,8 +338,8 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr } nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(originalScheduledPods, a.ExpendablePodsPriorityCutoff) // Initialize cluster state to ClusterSnapshot - if typedErr := a.initializeClusterSnapshot(allNodes, nonExpendableScheduledPods); typedErr != nil { - return typedErr.AddPrefix("failed to initialize ClusterSnapshot: ") + if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods); err != nil { + return caerrors.ToAutoscalerError(caerrors.InternalError, err).AddPrefix("failed to initialize ClusterSnapshot: ") } // Initialize Pod Disruption Budget tracking if typedErr := a.initializeRemainingPdbTracker(); typedErr != nil { @@ -486,7 +463,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr allNodes = subtractNodesByName(allNodes, allRegisteredUpcoming) // Remove the nodes from the snapshot as well so that the state is consistent. 
for _, notStartedNodeName := range allRegisteredUpcoming { - err := a.ClusterSnapshot.RemoveNode(notStartedNodeName) + err := a.ClusterSnapshot.RemoveNodeInfo(notStartedNodeName) if err != nil { klog.Errorf("Failed to remove NotStarted node %s from cluster snapshot: %v", notStartedNodeName, err) // ErrNodeNotFound shouldn't happen (so it needs to be logged above if it does), but what we care about here is that the @@ -682,20 +659,16 @@ func (a *StaticAutoscaler) addUpcomingNodesToClusterSnapshot(upcomingCounts map[ nodeGroups := a.nodeGroupsById() upcomingNodeGroups := make(map[string]int) upcomingNodesFromUpcomingNodeGroups := 0 - for nodeGroupName, upcomingNodes := range getUpcomingNodeInfos(upcomingCounts, nodeInfosForGroups) { + for nodeGroupName, upcomingNodeInfos := range getUpcomingNodeInfos(upcomingCounts, nodeInfosForGroups) { nodeGroup := nodeGroups[nodeGroupName] if nodeGroup == nil { return fmt.Errorf("failed to find node group: %s", nodeGroupName) } isUpcomingNodeGroup := a.processors.AsyncNodeGroupStateChecker.IsUpcoming(nodeGroup) - for _, upcomingNode := range upcomingNodes { - var pods []*apiv1.Pod - for _, podInfo := range upcomingNode.Pods() { - pods = append(pods, podInfo.Pod) - } - err := a.ClusterSnapshot.AddNodeWithPods(upcomingNode.Node(), pods) + for _, upcomingNodeInfo := range upcomingNodeInfos { + err := a.ClusterSnapshot.AddNodeInfo(upcomingNodeInfo) if err != nil { - return fmt.Errorf("Failed to add upcoming node %s to cluster snapshot: %w", upcomingNode.Node().Name, err) + return fmt.Errorf("failed to add upcoming node %s to cluster snapshot: %w", upcomingNodeInfo.Node().Name, err) } if isUpcomingNodeGroup { upcomingNodesFromUpcomingNodeGroups++ @@ -1054,7 +1027,7 @@ func getUpcomingNodeInfos(upcomingCounts map[string]int, nodeInfos map[string]*f // Ensure new nodes have different names because nodeName // will be used as a map key. Also deep copy pods (daemonsets & // any pods added by cloud provider on template). - nodes = append(nodes, scheduler_utils.DeepCopyTemplateNode(nodeTemplate, fmt.Sprintf("upcoming-%d", i))) + nodes = append(nodes, simulator.FreshNodeInfoFromTemplateNodeInfo(nodeTemplate, fmt.Sprintf("upcoming-%d", i))) } upcomingNodes[nodeGroup] = nodes } diff --git a/cluster-autoscaler/core/static_autoscaler_test.go b/cluster-autoscaler/core/static_autoscaler_test.go index 2df10d4b7355..00ff2a8c6338 100644 --- a/cluster-autoscaler/core/static_autoscaler_test.go +++ b/cluster-autoscaler/core/static_autoscaler_test.go @@ -64,6 +64,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/utils/taints" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" kube_record "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" schedulermetrics "k8s.io/kubernetes/pkg/scheduler/metrics" appsv1 "k8s.io/api/apps/v1" @@ -78,7 +79,6 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" - klog "k8s.io/klog/v2" ) type podListerMock struct { @@ -406,7 +406,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { // MaxNodesTotal reached. 
readyNodeLister.SetNodes([]*apiv1.Node{n1}) allNodeLister.SetNodes([]*apiv1.Node{n1}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice() + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Once() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() @@ -417,7 +417,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { // Scale up. readyNodeLister.SetNodes([]*apiv1.Node{n1}) allNodeLister.SetNodes([]*apiv1.Node{n1}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice() + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Once() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() onScaleUpMock.On("ScaleUp", "ng1", 1).Return(nil).Once() @@ -431,7 +431,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { // Mark unneeded nodes. readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1}, nil).Twice() + allPodListerMock.On("List").Return([]*apiv1.Pod{p1}, nil).Once() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() @@ -446,7 +446,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { // Scale down. readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1}, nil).Times(3) + allPodListerMock.On("List").Return([]*apiv1.Pod{p1}, nil).Twice() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() onScaleDownMock.On("ScaleDown", "ng1", "n2").Return(nil).Once() @@ -460,7 +460,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { // Mark unregistered nodes. readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice() + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Once() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() @@ -475,7 +475,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { // Remove unregistered nodes. readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice() + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Once() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() onScaleDownMock.On("ScaleDown", "ng2", "n3").Return(nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() @@ -489,7 +489,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) { // Scale up to node group min size. 
readyNodeLister.SetNodes([]*apiv1.Node{n4}) allNodeLister.SetNodes([]*apiv1.Node{n4}) - allPodListerMock.On("List").Return([]*apiv1.Pod{}, nil).Twice() + allPodListerMock.On("List").Return([]*apiv1.Pod{}, nil).Once() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil) podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil) onScaleUpMock.On("ScaleUp", "ng3", 2).Return(nil).Once() // 2 new nodes are supposed to be scaled up. @@ -689,7 +689,7 @@ func TestStaticAutoscalerRunOnceWithScaleDownDelayPerNG(t *testing.T) { // Mark unneeded nodes. readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1}, nil).Twice() + allPodListerMock.On("List").Return([]*apiv1.Pod{p1}, nil).Once() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() @@ -701,7 +701,7 @@ func TestStaticAutoscalerRunOnceWithScaleDownDelayPerNG(t *testing.T) { // Scale down nodegroup readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1}, nil).Times(3) + allPodListerMock.On("List").Return([]*apiv1.Pod{p1}, nil).Twice() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil) onScaleDownMock.On("ScaleDown", tc.expectedScaleDownNG, tc.expectedScaleDownNode).Return(nil).Once() @@ -828,7 +828,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { // Scale up. readyNodeLister.SetNodes([]*apiv1.Node{n1}) allNodeLister.SetNodes([]*apiv1.Node{n1}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice() + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() onNodeGroupCreateMock.On("Create", "autoprovisioned-TN2").Return(nil).Once() @@ -845,7 +845,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { // Remove autoprovisioned node group and mark unneeded nodes. readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1}, nil).Twice() + allPodListerMock.On("List").Return([]*apiv1.Pod{p1}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() onNodeGroupDeleteMock.On("Delete", "autoprovisioned-TN1").Return(nil).Once() @@ -861,7 +861,7 @@ func TestStaticAutoscalerRunOnceWithAutoprovisionedEnabled(t *testing.T) { // Scale down. 
readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1}, nil).Times(3) + allPodListerMock.On("List").Return([]*apiv1.Pod{p1}, nil).Twice() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() onNodeGroupDeleteMock.On("Delete", "autoprovisioned-"+ @@ -984,7 +984,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { // Scale up. readyNodeLister.SetNodes(nodes) allNodeLister.SetNodes(nodes) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Twice() + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2}, nil).Once() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() onScaleUpMock.On("ScaleUp", "ng1", 1).Return(nil).Once() @@ -1002,7 +1002,7 @@ func TestStaticAutoscalerRunOnceWithALongUnregisteredNode(t *testing.T) { // Remove broken node readyNodeLister.SetNodes(nodes) allNodeLister.SetNodes(nodes) - allPodListerMock.On("List").Return([]*apiv1.Pod{}, nil).Twice() + allPodListerMock.On("List").Return([]*apiv1.Pod{}, nil).Once() onScaleDownMock.On("ScaleDown", "ng1", "broken").Return(nil).Once() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() @@ -1137,7 +1137,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) { // Scale up readyNodeLister.SetNodes([]*apiv1.Node{n1, n2, n3}) allNodeLister.SetNodes([]*apiv1.Node{n1, n2, n3}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3, p4, p5, p6}, nil).Twice() + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3, p4, p5, p6}, nil).Once() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() onScaleUpMock.On("ScaleUp", "ng2", 1).Return(nil).Once() @@ -1150,7 +1150,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) { // Mark unneeded nodes. readyNodeLister.SetNodes([]*apiv1.Node{n1, n2, n3}) allNodeLister.SetNodes([]*apiv1.Node{n1, n2, n3}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3, p4, p5}, nil).Twice() + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3, p4, p5}, nil).Once() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() @@ -1164,7 +1164,7 @@ func TestStaticAutoscalerRunOncePodsWithPriorities(t *testing.T) { // Scale down. 
readyNodeLister.SetNodes([]*apiv1.Node{n1, n2, n3}) allNodeLister.SetNodes([]*apiv1.Node{n1, n2, n3}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3, p4, p5}, nil).Times(3) + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3, p4, p5}, nil).Twice() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() onScaleDownMock.On("ScaleDown", "ng1", "n1").Return(nil).Once() @@ -1266,7 +1266,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnBinPackingEstimator(t *testing.T) // Scale up readyNodeLister.SetNodes([]*apiv1.Node{n1, n2}) allNodeLister.SetNodes([]*apiv1.Node{n1, n2}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p3, p4}, nil).Twice() + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p3, p4}, nil).Once() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() @@ -1365,7 +1365,7 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t * // Scale up readyNodeLister.SetNodes([]*apiv1.Node{n2, n3}) allNodeLister.SetNodes([]*apiv1.Node{n2, n3}) - allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Twice() + allPodListerMock.On("List").Return([]*apiv1.Pod{p1, p2, p3}, nil).Once() daemonSetListerMock.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() podDisruptionBudgetListerMock.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() @@ -1566,7 +1566,7 @@ func TestStaticAutoscalerRunOnceWithBypassedSchedulers(t *testing.T) { tc.setupConfig.mocks.readyNodeLister.SetNodes([]*apiv1.Node{n1}) tc.setupConfig.mocks.allNodeLister.SetNodes([]*apiv1.Node{n1}) - tc.setupConfig.mocks.allPodLister.On("List").Return(tc.pods, nil).Twice() + tc.setupConfig.mocks.allPodLister.On("List").Return(tc.pods, nil).Once() tc.setupConfig.mocks.daemonSetLister.On("List", labels.Everything()).Return([]*appsv1.DaemonSet{}, nil).Once() tc.setupConfig.mocks.podDisruptionBudgetLister.On("List").Return([]*policyv1.PodDisruptionBudget{}, nil).Once() if tc.expectedScaleUp != nil { diff --git a/cluster-autoscaler/core/utils/utils.go b/cluster-autoscaler/core/utils/utils.go index c25db2ef8453..1b493b783cfc 100644 --- a/cluster-autoscaler/core/utils/utils.go +++ b/cluster-autoscaler/core/utils/utils.go @@ -17,52 +17,17 @@ limitations under the License. package utils import ( - "fmt" - "math/rand" "reflect" "time" - appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/metrics" - "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" - "k8s.io/autoscaler/cluster-autoscaler/utils/daemonset" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" - "k8s.io/autoscaler/cluster-autoscaler/utils/labels" - "k8s.io/autoscaler/cluster-autoscaler/utils/taints" ) -// GetNodeInfoFromTemplate returns NodeInfo object built base on TemplateNodeInfo returned by NodeGroup.TemplateNodeInfo(). 
-func GetNodeInfoFromTemplate(nodeGroup cloudprovider.NodeGroup, daemonsets []*appsv1.DaemonSet, taintConfig taints.TaintConfig) (*framework.NodeInfo, errors.AutoscalerError) { - id := nodeGroup.Id() - baseNodeInfo, err := nodeGroup.TemplateNodeInfo() - if err != nil { - return nil, errors.ToAutoscalerError(errors.CloudProviderError, err) - } - - labels.UpdateDeprecatedLabels(baseNodeInfo.Node().ObjectMeta.Labels) - - sanitizedNode, typedErr := SanitizeNode(baseNodeInfo.Node(), id, taintConfig) - if err != nil { - return nil, typedErr - } - baseNodeInfo.SetNode(sanitizedNode) - - pods, err := daemonset.GetDaemonSetPodsForNode(baseNodeInfo, daemonsets) - if err != nil { - return nil, errors.ToAutoscalerError(errors.InternalError, err) - } - for _, podInfo := range baseNodeInfo.Pods() { - pods = append(pods, &framework.PodInfo{Pod: podInfo.Pod}) - } - - sanitizedNodeInfo := framework.NewNodeInfo(sanitizedNode, nil, SanitizePods(pods, sanitizedNode)...) - return sanitizedNodeInfo, nil -} - // isVirtualNode determines if the node is created by virtual kubelet func isVirtualNode(node *apiv1.Node) bool { return node.ObjectMeta.Labels["type"] == "virtual-kubelet" @@ -89,48 +54,6 @@ func FilterOutNodesFromNotAutoscaledGroups(nodes []*apiv1.Node, cloudProvider cl return result, nil } -// DeepCopyNodeInfo clones the provided nodeInfo -func DeepCopyNodeInfo(nodeInfo *framework.NodeInfo) *framework.NodeInfo { - newPods := make([]*framework.PodInfo, 0) - for _, podInfo := range nodeInfo.Pods() { - newPods = append(newPods, &framework.PodInfo{Pod: podInfo.Pod.DeepCopy()}) - } - - // Build a new node info. - newNodeInfo := framework.NewNodeInfo(nodeInfo.Node().DeepCopy(), nil, newPods...) - return newNodeInfo -} - -// SanitizeNode cleans up nodes used for node group templates -func SanitizeNode(node *apiv1.Node, nodeGroup string, taintConfig taints.TaintConfig) (*apiv1.Node, errors.AutoscalerError) { - newNode := node.DeepCopy() - nodeName := fmt.Sprintf("template-node-for-%s-%d", nodeGroup, rand.Int63()) - newNode.Labels = make(map[string]string, len(node.Labels)) - for k, v := range node.Labels { - if k != apiv1.LabelHostname { - newNode.Labels[k] = v - } else { - newNode.Labels[k] = nodeName - } - } - newNode.Name = nodeName - newNode.Spec.Taints = taints.SanitizeTaints(newNode.Spec.Taints, taintConfig) - return newNode, nil -} - -// SanitizePods cleans up pods used for node group templates -func SanitizePods(pods []*framework.PodInfo, sanitizedNode *apiv1.Node) []*framework.PodInfo { - // Update node name in pods. - sanitizedPods := make([]*framework.PodInfo, 0) - for _, pod := range pods { - sanitizedPod := pod.Pod.DeepCopy() - sanitizedPod.Spec.NodeName = sanitizedNode.Name - sanitizedPods = append(sanitizedPods, &framework.PodInfo{Pod: sanitizedPod}) - } - - return sanitizedPods -} - func hasHardInterPodAffinity(affinity *apiv1.Affinity) bool { if affinity == nil { return false diff --git a/cluster-autoscaler/core/utils/utils_test.go b/cluster-autoscaler/core/utils/utils_test.go index b63badbcc834..2613b0419a13 100644 --- a/cluster-autoscaler/core/utils/utils_test.go +++ b/cluster-autoscaler/core/utils/utils_test.go @@ -20,8 +20,6 @@ import ( "testing" "time" - "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" - "k8s.io/autoscaler/cluster-autoscaler/utils/taints" . 
"k8s.io/autoscaler/cluster-autoscaler/utils/test" "github.com/stretchr/testify/assert" @@ -29,33 +27,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func TestSanitizePods(t *testing.T) { - pod := BuildTestPod("p1", 80, 0) - pod.Spec.NodeName = "n1" - pods := []*framework.PodInfo{{Pod: pod}} - - node := BuildTestNode("node", 1000, 1000) - - resNode, err := SanitizeNode(node, "test-group", taints.TaintConfig{}) - assert.NoError(t, err) - res := SanitizePods(pods, resNode) - assert.Equal(t, 1, len(res)) -} - -func TestSanitizeLabels(t *testing.T) { - oldNode := BuildTestNode("ng1-1", 1000, 1000) - oldNode.Labels = map[string]string{ - apiv1.LabelHostname: "abc", - "x": "y", - } - node, err := SanitizeNode(oldNode, "bzium", taints.TaintConfig{}) - assert.NoError(t, err) - assert.NotEqual(t, node.Labels[apiv1.LabelHostname], "abc", nil) - assert.Equal(t, node.Labels["x"], "y") - assert.NotEqual(t, node.Name, oldNode.Name) - assert.Equal(t, node.Labels[apiv1.LabelHostname], node.Name) -} - func TestGetNodeResource(t *testing.T) { node := BuildTestNode("n1", 1000, 2*MiB) diff --git a/cluster-autoscaler/estimator/binpacking_estimator.go b/cluster-autoscaler/estimator/binpacking_estimator.go index 6ffad3800df6..1f602348df50 100644 --- a/cluster-autoscaler/estimator/binpacking_estimator.go +++ b/cluster-autoscaler/estimator/binpacking_estimator.go @@ -21,11 +21,11 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + core_utils "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker" - "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" - klog "k8s.io/klog/v2" + "k8s.io/klog/v2" ) // BinpackingNodeEstimator estimates the number of needed nodes to handle the given amount of pods. 
@@ -210,12 +210,8 @@ func (e *BinpackingNodeEstimator) addNewNodeToSnapshot( estimationState *estimationState, template *framework.NodeInfo, ) error { - newNodeInfo := scheduler.DeepCopyTemplateNode(template, fmt.Sprintf("e-%d", estimationState.newNodeNameIndex)) - var pods []*apiv1.Pod - for _, podInfo := range newNodeInfo.Pods() { - pods = append(pods, podInfo.Pod) - } - if err := e.clusterSnapshot.AddNodeWithPods(newNodeInfo.Node(), pods); err != nil { + newNodeInfo := core_utils.FreshNodeInfoFromTemplateNodeInfo(template, fmt.Sprintf("e-%d", estimationState.newNodeNameIndex)) + if err := e.clusterSnapshot.AddNodeInfo(newNodeInfo); err != nil { return err } estimationState.newNodeNameIndex++ @@ -229,7 +225,7 @@ func (e *BinpackingNodeEstimator) tryToAddNode( pod *apiv1.Pod, nodeName string, ) error { - if err := e.clusterSnapshot.AddPod(pod, nodeName); err != nil { + if err := e.clusterSnapshot.ForceAddPod(pod, nodeName); err != nil { return fmt.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", pod.Namespace, pod.Name, nodeName, err) } estimationState.newNodesWithPods[nodeName] = true diff --git a/cluster-autoscaler/estimator/binpacking_estimator_test.go b/cluster-autoscaler/estimator/binpacking_estimator_test.go index e0fa48aeda10..e0205ffdc854 100644 --- a/cluster-autoscaler/estimator/binpacking_estimator_test.go +++ b/cluster-autoscaler/estimator/binpacking_estimator_test.go @@ -214,7 +214,8 @@ func TestBinpackingEstimate(t *testing.T) { t.Run(tc.name, func(t *testing.T) { clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot() // Add one node in different zone to trigger topology spread constraints - clusterSnapshot.AddNode(makeNode(100, 100, 10, "oldnode", "zone-jupiter")) + err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(makeNode(100, 100, 10, "oldnode", "zone-jupiter"))) + assert.NoError(t, err) predicateChecker, err := predicatechecker.NewTestPredicateChecker() assert.NoError(t, err) @@ -268,7 +269,8 @@ func BenchmarkBinpackingEstimate(b *testing.B) { for i := 0; i < b.N; i++ { clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot() - clusterSnapshot.AddNode(makeNode(100, 100, 10, "oldnode", "zone-jupiter")) + err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(makeNode(100, 100, 10, "oldnode", "zone-jupiter"))) + assert.NoError(b, err) predicateChecker, err := predicatechecker.NewTestPredicateChecker() assert.NoError(b, err) diff --git a/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor.go b/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor.go index 8b0ebd58571a..34f486392099 100644 --- a/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor.go +++ b/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor.go @@ -24,7 +24,6 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/context" - "k8s.io/autoscaler/cluster-autoscaler/core/utils" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" @@ -78,11 +77,6 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext, result := make(map[string]*framework.NodeInfo) seenGroups := make(map[string]bool) - podsForNodes, err := getPodsForNodes(ctx.ListerRegistry) - if err != nil { - return map[string]*framework.NodeInfo{}, err - } - // processNode returns information whether the nodeTemplate was 
generated and if there was an error. processNode := func(node *apiv1.Node) (bool, string, errors.AutoscalerError) { nodeGroup, err := ctx.CloudProvider.NodeGroupForNode(node) @@ -94,22 +88,15 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext, } id := nodeGroup.Id() if _, found := result[id]; !found { - // Build nodeInfo. - sanitizedNode, err := utils.SanitizeNode(node, id, taintConfig) + nodeInfo, err := ctx.ClusterSnapshot.GetNodeInfo(node.Name) if err != nil { - return false, "", err + return false, "", errors.NewAutoscalerError(errors.InternalError, "error while retrieving node %s from cluster snapshot - this shouldn't happen: %v", node.Name, err) } - nodeInfo, err := simulator.BuildNodeInfoForNode(sanitizedNode, podsForNodes[node.Name], daemonsets, p.forceDaemonSets) + templateNodeInfo, caErr := simulator.TemplateNodeInfoFromExampleNodeInfo(nodeInfo, id, daemonsets, p.forceDaemonSets, taintConfig) if err != nil { - return false, "", err - } - - var pods []*apiv1.Pod - for _, podInfo := range nodeInfo.Pods() { - pods = append(pods, podInfo.Pod) + return false, "", caErr } - sanitizedNodeInfo := framework.NewNodeInfo(sanitizedNode, nil, utils.SanitizePods(nodeInfo.Pods(), sanitizedNode)...) - result[id] = sanitizedNodeInfo + result[id] = templateNodeInfo return true, id, nil } return false, "", nil @@ -125,7 +112,7 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext, return map[string]*framework.NodeInfo{}, typedErr } if added && p.nodeInfoCache != nil { - nodeInfoCopy := utils.DeepCopyNodeInfo(result[id]) + nodeInfoCopy := result[id].DeepCopy() p.nodeInfoCache[id] = cacheItem{NodeInfo: nodeInfoCopy, added: time.Now()} } } @@ -142,7 +129,7 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext, if p.isCacheItemExpired(cacheItem.added) { delete(p.nodeInfoCache, id) } else { - result[id] = utils.DeepCopyNodeInfo(cacheItem.NodeInfo) + result[id] = cacheItem.NodeInfo.DeepCopy() continue } } @@ -150,7 +137,7 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext, // No good template, trying to generate one. This is called only if there are no // working nodes in the node groups. By default CA tries to use a real-world example. 
- nodeInfo, err := utils.GetNodeInfoFromTemplate(nodeGroup, daemonsets, taintConfig) + nodeInfo, err := simulator.TemplateNodeInfoFromNodeGroupTemplate(nodeGroup, daemonsets, taintConfig) if err != nil { if err == cloudprovider.ErrNotImplemented { continue @@ -192,19 +179,6 @@ func (p *MixedTemplateNodeInfoProvider) Process(ctx *context.AutoscalingContext, return result, nil } -func getPodsForNodes(listers kube_util.ListerRegistry) (map[string][]*apiv1.Pod, errors.AutoscalerError) { - pods, err := listers.AllPodLister().List() - if err != nil { - return nil, errors.ToAutoscalerError(errors.ApiCallError, err) - } - scheduledPods := kube_util.ScheduledPods(pods) - podsForNodes := map[string][]*apiv1.Pod{} - for _, p := range scheduledPods { - podsForNodes[p.Spec.NodeName] = append(podsForNodes[p.Spec.NodeName], p) - } - return podsForNodes, nil -} - func isNodeGoodTemplateCandidate(node *apiv1.Node, now time.Time) bool { ready, lastTransitionTime, _ := kube_util.GetReadinessState(node) stable := lastTransitionTime.Add(stabilizationDelay).Before(now) diff --git a/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor_test.go b/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor_test.go index 68e04752a8dc..cd3eccc390af 100644 --- a/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor_test.go +++ b/cluster-autoscaler/processors/nodeinfosprovider/mixed_nodeinfos_processor_test.go @@ -22,6 +22,7 @@ import ( testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" @@ -30,6 +31,7 @@ import ( schedulermetrics "k8s.io/kubernetes/pkg/scheduler/metrics" "github.com/stretchr/testify/assert" + appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" ) @@ -81,14 +83,20 @@ func TestGetNodeInfosForGroups(t *testing.T) { predicateChecker, err := predicatechecker.NewTestPredicateChecker() assert.NoError(t, err) + nodes := []*apiv1.Node{justReady5, unready4, unready3, ready2, ready1} + snapshot := clustersnapshot.NewBasicClusterSnapshot() + err = snapshot.SetClusterState(nodes, nil) + assert.NoError(t, err) + ctx := context.AutoscalingContext{ CloudProvider: provider1, + ClusterSnapshot: snapshot, PredicateChecker: predicateChecker, AutoscalingKubeClients: context.AutoscalingKubeClients{ ListerRegistry: registry, }, } - res, err := NewMixedTemplateNodeInfoProvider(&cacheTtl, false).Process(&ctx, []*apiv1.Node{justReady5, unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now) + res, err := NewMixedTemplateNodeInfoProvider(&cacheTtl, false).Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now) assert.NoError(t, err) assert.Equal(t, 5, len(res)) info, found := res["ng1"] @@ -110,6 +118,7 @@ func TestGetNodeInfosForGroups(t *testing.T) { // Test for a nodegroup without nodes and TemplateNodeInfo not implemented by cloud proivder ctx = context.AutoscalingContext{ CloudProvider: provider2, + ClusterSnapshot: clustersnapshot.NewBasicClusterSnapshot(), PredicateChecker: predicateChecker, AutoscalingKubeClients: context.AutoscalingKubeClients{ ListerRegistry: registry, @@ -165,16 +174,22 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) { predicateChecker, err := 
predicatechecker.NewTestPredicateChecker() assert.NoError(t, err) + nodes := []*apiv1.Node{unready4, unready3, ready2, ready1} + snapshot := clustersnapshot.NewBasicClusterSnapshot() + err = snapshot.SetClusterState(nodes, nil) + assert.NoError(t, err) + // Fill cache ctx := context.AutoscalingContext{ CloudProvider: provider1, + ClusterSnapshot: snapshot, PredicateChecker: predicateChecker, AutoscalingKubeClients: context.AutoscalingKubeClients{ ListerRegistry: registry, }, } niProcessor := NewMixedTemplateNodeInfoProvider(&cacheTtl, false) - res, err := niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now) + res, err := niProcessor.Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now) assert.NoError(t, err) // Check results assert.Equal(t, 4, len(res)) @@ -208,7 +223,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) { assert.Equal(t, "ng3", lastDeletedGroup) // Check cache with all nodes removed - res, err = niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now) + res, err = niProcessor.Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now) assert.NoError(t, err) // Check results assert.Equal(t, 2, len(res)) @@ -229,7 +244,7 @@ func TestGetNodeInfosForGroupsCache(t *testing.T) { // Fill cache manually infoNg4Node6 := framework.NewTestNodeInfo(ready6.DeepCopy()) niProcessor.nodeInfoCache = map[string]cacheItem{"ng4": {NodeInfo: infoNg4Node6, added: now}} - res, err = niProcessor.Process(&ctx, []*apiv1.Node{unready4, unready3, ready2, ready1}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now) + res, err = niProcessor.Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now) // Check if cache was used assert.NoError(t, err) assert.Equal(t, 2, len(res)) @@ -253,8 +268,14 @@ func TestGetNodeInfosCacheExpired(t *testing.T) { predicateChecker, err := predicatechecker.NewTestPredicateChecker() assert.NoError(t, err) + nodes := []*apiv1.Node{ready1} + snapshot := clustersnapshot.NewBasicClusterSnapshot() + err = snapshot.SetClusterState(nodes, nil) + assert.NoError(t, err) + ctx := context.AutoscalingContext{ CloudProvider: provider, + ClusterSnapshot: snapshot, PredicateChecker: predicateChecker, AutoscalingKubeClients: context.AutoscalingKubeClients{ ListerRegistry: registry, @@ -272,7 +293,7 @@ func TestGetNodeInfosCacheExpired(t *testing.T) { provider.AddNode("ng1", ready1) assert.Equal(t, 2, len(niProcessor1.nodeInfoCache)) - _, err = niProcessor1.Process(&ctx, []*apiv1.Node{ready1}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now) + _, err = niProcessor1.Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now) assert.NoError(t, err) assert.Equal(t, 1, len(niProcessor1.nodeInfoCache)) @@ -283,7 +304,7 @@ func TestGetNodeInfosCacheExpired(t *testing.T) { "ng2": {NodeInfo: tni, added: now.Add(-2 * time.Second)}, } assert.Equal(t, 2, len(niProcessor2.nodeInfoCache)) - _, err = niProcessor1.Process(&ctx, []*apiv1.Node{ready1}, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now) + _, err = niProcessor1.Process(&ctx, nodes, []*appsv1.DaemonSet{}, taints.TaintConfig{}, now) assert.NoError(t, err) assert.Equal(t, 2, len(niProcessor2.nodeInfoCache)) diff --git a/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go b/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go index d2f96b244585..13a98c8d78c8 100644 --- 
a/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go +++ b/cluster-autoscaler/processors/podinjection/pod_injection_processor_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/context" podinjectionbackoff "k8s.io/autoscaler/cluster-autoscaler/processors/podinjection/backoff" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" ) @@ -112,10 +113,8 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) { t.Run(tc.name, func(t *testing.T) { p := NewPodInjectionPodListProcessor(podinjectionbackoff.NewFakePodControllerRegistry()) clusterSnapshot := clustersnapshot.NewDeltaClusterSnapshot() - clusterSnapshot.AddNode(node) - for _, pod := range tc.scheduledPods { - clusterSnapshot.AddPod(pod, node.Name) - } + err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.scheduledPods...)) + assert.NoError(t, err) ctx := context.AutoscalingContext{ AutoscalingKubeClients: context.AutoscalingKubeClients{ ListerRegistry: kubernetes.NewListerRegistry(nil, nil, nil, nil, nil, nil, jobLister, replicaSetLister, statefulsetLister), diff --git a/cluster-autoscaler/simulator/cluster.go b/cluster-autoscaler/simulator/cluster.go index 6855ae5efb95..e81d9ecea0ff 100644 --- a/cluster-autoscaler/simulator/cluster.go +++ b/cluster-autoscaler/simulator/cluster.go @@ -32,7 +32,7 @@ import ( kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" "k8s.io/autoscaler/cluster-autoscaler/utils/tpu" - klog "k8s.io/klog/v2" + "k8s.io/klog/v2" ) // NodeToBeRemoved contain information about a node that can be removed. @@ -223,7 +223,7 @@ func (r *RemovalSimulator) findPlaceFor(removedNode string, pods []*apiv1.Pod, n // remove pods from clusterSnapshot first for _, pod := range pods { - if err := r.clusterSnapshot.RemovePod(pod.Namespace, pod.Name, removedNode); err != nil { + if err := r.clusterSnapshot.ForceRemovePod(pod.Namespace, pod.Name, removedNode); err != nil { // just log error klog.Errorf("Simulating removal of %s/%s return error; %v", pod.Namespace, pod.Name, err) } diff --git a/cluster-autoscaler/simulator/clustersnapshot/basic.go b/cluster-autoscaler/simulator/clustersnapshot/basic.go index ce083894f64e..be8388c5ce8b 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/basic.go +++ b/cluster-autoscaler/simulator/clustersnapshot/basic.go @@ -153,16 +153,7 @@ func (data *internalBasicSnapshotData) addNode(node *apiv1.Node) error { return nil } -func (data *internalBasicSnapshotData) addNodes(nodes []*apiv1.Node) error { - for _, node := range nodes { - if err := data.addNode(node); err != nil { - return err - } - } - return nil -} - -func (data *internalBasicSnapshotData) removeNode(nodeName string) error { +func (data *internalBasicSnapshotData) removeNodeInfo(nodeName string) error { if _, found := data.nodeInfoMap[nodeName]; !found { return ErrNodeNotFound } @@ -205,7 +196,7 @@ func (data *internalBasicSnapshotData) removePod(namespace, podName, nodeName st // NewBasicClusterSnapshot creates instances of BasicClusterSnapshot. func NewBasicClusterSnapshot() *BasicClusterSnapshot { snapshot := &BasicClusterSnapshot{} - snapshot.Clear() + snapshot.clear() return snapshot } @@ -241,41 +232,39 @@ func (snapshot *BasicClusterSnapshot) AddNodeInfo(nodeInfo *framework.NodeInfo) return nil } -// AddNode adds node to the snapshot. 
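The same pattern recurs across the test changes in this patch: instead of AddNode() followed by a per-pod AddPod() loop, tests build a framework.NodeInfo up front and add it in a single call. A minimal self-contained sketch (package and test names are illustrative):

    package example

    import (
        "testing"

        "github.com/stretchr/testify/assert"
        "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
        "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
        . "k8s.io/autoscaler/cluster-autoscaler/utils/test"
    )

    func TestAddNodeInfoReplacesAddNodePlusAddPod(t *testing.T) {
        node := BuildTestNode("n1", 1000, 1000)
        p1 := BuildTestPod("p1", 100, 0, WithNodeName(node.Name))
        p2 := BuildTestPod("p2", 100, 0, WithNodeName(node.Name))

        snapshot := clustersnapshot.NewDeltaClusterSnapshot()
        // One call replaces the old AddNode(node) plus the AddPod(pod, node.Name) loop.
        err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node, p1, p2))
        assert.NoError(t, err)

        nodeInfo, err := snapshot.GetNodeInfo(node.Name)
        assert.NoError(t, err)
        assert.Equal(t, 2, len(nodeInfo.Pods()))
    }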
-func (snapshot *BasicClusterSnapshot) AddNode(node *apiv1.Node) error { - return snapshot.getInternalData().addNode(node) -} - -// AddNodes adds nodes in batch to the snapshot. -func (snapshot *BasicClusterSnapshot) AddNodes(nodes []*apiv1.Node) error { - return snapshot.getInternalData().addNodes(nodes) -} +// SetClusterState sets the cluster state. +func (snapshot *BasicClusterSnapshot) SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error { + snapshot.clear() -// AddNodeWithPods adds a node and set of pods to be scheduled to this node to the snapshot. -func (snapshot *BasicClusterSnapshot) AddNodeWithPods(node *apiv1.Node, pods []*apiv1.Pod) error { - if err := snapshot.AddNode(node); err != nil { - return err - } - for _, pod := range pods { - if err := snapshot.AddPod(pod, node.Name); err != nil { + knownNodes := make(map[string]bool) + for _, node := range nodes { + if err := snapshot.getInternalData().addNode(node); err != nil { return err } + knownNodes[node.Name] = true + } + for _, pod := range scheduledPods { + if knownNodes[pod.Spec.NodeName] { + if err := snapshot.getInternalData().addPod(pod, pod.Spec.NodeName); err != nil { + return err + } + } } return nil } -// RemoveNode removes nodes (and pods scheduled to it) from the snapshot. -func (snapshot *BasicClusterSnapshot) RemoveNode(nodeName string) error { - return snapshot.getInternalData().removeNode(nodeName) +// RemoveNodeInfo removes nodes (and pods scheduled to it) from the snapshot. +func (snapshot *BasicClusterSnapshot) RemoveNodeInfo(nodeName string) error { + return snapshot.getInternalData().removeNodeInfo(nodeName) } -// AddPod adds pod to the snapshot and schedules it to given node. -func (snapshot *BasicClusterSnapshot) AddPod(pod *apiv1.Pod, nodeName string) error { +// ForceAddPod adds pod to the snapshot and schedules it to given node. +func (snapshot *BasicClusterSnapshot) ForceAddPod(pod *apiv1.Pod, nodeName string) error { return snapshot.getInternalData().addPod(pod, nodeName) } -// RemovePod removes pod from the snapshot. -func (snapshot *BasicClusterSnapshot) RemovePod(namespace, podName, nodeName string) error { +// ForceRemovePod removes pod from the snapshot. +func (snapshot *BasicClusterSnapshot) ForceRemovePod(namespace, podName, nodeName string) error { return snapshot.getInternalData().removePod(namespace, podName, nodeName) } @@ -308,8 +297,8 @@ func (snapshot *BasicClusterSnapshot) Commit() error { return nil } -// Clear reset cluster snapshot to empty, unforked state -func (snapshot *BasicClusterSnapshot) Clear() { +// clear reset cluster snapshot to empty, unforked state +func (snapshot *BasicClusterSnapshot) clear() { baseData := newInternalBasicSnapshotData() snapshot.data = []*internalBasicSnapshotData{baseData} } diff --git a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go index a80c85c22d22..1c60fcc0b730 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go +++ b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go @@ -29,24 +29,22 @@ import ( // It exposes mutation methods and can be viewed as scheduler's SharedLister. type ClusterSnapshot interface { schedulerframework.SharedLister - // AddNode adds node to the snapshot. - AddNode(node *apiv1.Node) error - // AddNodes adds nodes to the snapshot. - AddNodes(nodes []*apiv1.Node) error - // RemoveNode removes nodes (and pods scheduled to it) from the snapshot. 
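Worth calling out from the SetClusterState() implementation above: scheduled pods are matched to nodes purely by Spec.NodeName, and pods referencing a node that is not part of the call are skipped rather than rejected. A small test-style sketch of that behavior (names are illustrative):

    package example

    import (
        "testing"

        "github.com/stretchr/testify/assert"
        apiv1 "k8s.io/api/core/v1"
        "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
        . "k8s.io/autoscaler/cluster-autoscaler/utils/test"
    )

    func TestSetClusterStateSkipsOrphanedPods(t *testing.T) {
        node := BuildTestNode("n1", 1000, 1000)
        onNode := BuildTestPod("p1", 100, 0, WithNodeName("n1"))
        orphan := BuildTestPod("p2", 100, 0, WithNodeName("missing-node"))

        snapshot := clustersnapshot.NewBasicClusterSnapshot()
        // The orphaned pod points at a node that is not in the nodes slice, so it is
        // silently dropped instead of causing an error.
        assert.NoError(t, snapshot.SetClusterState([]*apiv1.Node{node}, []*apiv1.Pod{onNode, orphan}))

        nodeInfo, err := snapshot.GetNodeInfo("n1")
        assert.NoError(t, err)
        assert.Equal(t, 1, len(nodeInfo.Pods()))
    }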
- RemoveNode(nodeName string) error - // AddPod adds pod to the snapshot and schedules it to given node. - AddPod(pod *apiv1.Pod, nodeName string) error - // RemovePod removes pod from the snapshot. - RemovePod(namespace string, podName string, nodeName string) error - // AddNodeWithPods adds a node and set of pods to be scheduled to this node to the snapshot. - AddNodeWithPods(node *apiv1.Node, pods []*apiv1.Pod) error - // IsPVCUsedByPods returns if the pvc is used by any pod, key = / - IsPVCUsedByPods(key string) bool + + // SetClusterState resets the snapshot to an unforked state and replaces the contents of the snapshot + // with the provided data. scheduledPods are correlated to their Nodes based on spec.NodeName. + SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error + + // ForceAddPod adds the given Pod to the Node with the given nodeName inside the snapshot. + ForceAddPod(pod *apiv1.Pod, nodeName string) error + // ForceRemovePod removes the given Pod (and all DRA objects it owns) from the snapshot. + ForceRemovePod(namespace string, podName string, nodeName string) error // AddNodeInfo adds the given NodeInfo to the snapshot. The Node and the Pods are added, as well as // any DRA objects passed along them. AddNodeInfo(nodeInfo *framework.NodeInfo) error + // RemoveNodeInfo removes the given NodeInfo from the snapshot The Node and the Pods are removed, as well as + // any DRA objects owned by them. + RemoveNodeInfo(nodeName string) error // GetNodeInfo returns an internal NodeInfo for a given Node - all information about the Node tracked in the snapshot. // This means the Node itself, its scheduled Pods, as well as all relevant DRA objects. The internal NodeInfos // obtained via this method should always be used in CA code instead of directly using *schedulerframework.NodeInfo. @@ -61,8 +59,6 @@ type ClusterSnapshot interface { Revert() // Commit commits changes done after forking. Commit() error - // Clear reset cluster snapshot to empty, unforked state. - Clear() } // ErrNodeNotFound means that a node wasn't found in the snapshot. diff --git a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_benchmark_test.go b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_benchmark_test.go index cf851773537e..fb6468adad6f 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_benchmark_test.go +++ b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_benchmark_test.go @@ -22,6 +22,8 @@ import ( "time" "github.com/stretchr/testify/assert" + + "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" . 
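Putting the reshaped interface together, a typical loop iteration is expected to look roughly like the sketch below. Function and variable names are illustrative; only the method names come from the interface above.

    package example

    import (
        apiv1 "k8s.io/api/core/v1"
        "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
        "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
    )

    // refreshAndSimulate shows the intended lifecycle of the narrowed interface:
    // SetClusterState() replaces the removed Clear()/AddNode()/AddNodes()/AddNodeWithPods()
    // at the start of an iteration, and the Force*/NodeInfo methods cover the remaining mutations.
    func refreshAndSimulate(snapshot clustersnapshot.ClusterSnapshot, nodes []*apiv1.Node,
        scheduledPods []*apiv1.Pod, candidate *framework.NodeInfo) error {
        // Pods are correlated to nodes via pod.Spec.NodeName.
        if err := snapshot.SetClusterState(nodes, scheduledPods); err != nil {
            return err
        }

        snapshot.Fork()
        // Simulate adding a new node (with any pods it should carry) in the forked view.
        if err := snapshot.AddNodeInfo(candidate); err != nil {
            snapshot.Revert()
            return err
        }
        // ForceAddPod()/ForceRemovePod() bypass scheduling checks, e.g. for preempting pods
        // or for simulating drains before node removal.
        // Keep the simulated changes, or call Revert() to drop them.
        return snapshot.Commit()
    }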
"k8s.io/autoscaler/cluster-autoscaler/utils/test" apiv1 "k8s.io/api/core/v1" @@ -67,7 +69,7 @@ func assignPodsToNodes(pods []*apiv1.Pod, nodes []*apiv1.Node) { } } -func BenchmarkAddNodes(b *testing.B) { +func BenchmarkAddNodeInfo(b *testing.B) { testCases := []int{1, 10, 100, 1000, 5000, 15000, 100000} for snapshotName, snapshotFactory := range snapshots { @@ -75,13 +77,13 @@ func BenchmarkAddNodes(b *testing.B) { nodes := createTestNodes(tc) clusterSnapshot := snapshotFactory() b.ResetTimer() - b.Run(fmt.Sprintf("%s: AddNode() %d", snapshotName, tc), func(b *testing.B) { + b.Run(fmt.Sprintf("%s: AddNodeInfo() %d", snapshotName, tc), func(b *testing.B) { for i := 0; i < b.N; i++ { b.StopTimer() - clusterSnapshot.Clear() + assert.NoError(b, clusterSnapshot.SetClusterState(nil, nil)) b.StartTimer() for _, node := range nodes { - err := clusterSnapshot.AddNode(node) + err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) if err != nil { assert.NoError(b, err) } @@ -90,24 +92,6 @@ func BenchmarkAddNodes(b *testing.B) { }) } } - for snapshotName, snapshotFactory := range snapshots { - for _, tc := range testCases { - nodes := createTestNodes(tc) - clusterSnapshot := snapshotFactory() - b.ResetTimer() - b.Run(fmt.Sprintf("%s: AddNodes() %d", snapshotName, tc), func(b *testing.B) { - for i := 0; i < b.N; i++ { - b.StopTimer() - clusterSnapshot.Clear() - b.StartTimer() - err := clusterSnapshot.AddNodes(nodes) - if err != nil { - assert.NoError(b, err) - } - } - }) - } - } } func BenchmarkListNodeInfos(b *testing.B) { @@ -117,7 +101,7 @@ func BenchmarkListNodeInfos(b *testing.B) { for _, tc := range testCases { nodes := createTestNodes(tc) clusterSnapshot := snapshotFactory() - err := clusterSnapshot.AddNodes(nodes) + err := clusterSnapshot.SetClusterState(nodes, nil) if err != nil { assert.NoError(b, err) } @@ -142,25 +126,24 @@ func BenchmarkAddPods(b *testing.B) { for snapshotName, snapshotFactory := range snapshots { for _, tc := range testCases { - clusterSnapshot := snapshotFactory() nodes := createTestNodes(tc) - err := clusterSnapshot.AddNodes(nodes) - assert.NoError(b, err) pods := createTestPods(tc * 30) assignPodsToNodes(pods, nodes) + clusterSnapshot := snapshotFactory() + err := clusterSnapshot.SetClusterState(nodes, nil) + assert.NoError(b, err) b.ResetTimer() - b.Run(fmt.Sprintf("%s: AddPod() 30*%d", snapshotName, tc), func(b *testing.B) { + b.Run(fmt.Sprintf("%s: ForceAddPod() 30*%d", snapshotName, tc), func(b *testing.B) { for i := 0; i < b.N; i++ { b.StopTimer() - clusterSnapshot.Clear() - err = clusterSnapshot.AddNodes(nodes) + err = clusterSnapshot.SetClusterState(nodes, nil) if err != nil { assert.NoError(b, err) } b.StartTimer() for _, pod := range pods { - err = clusterSnapshot.AddPod(pod, pod.Spec.NodeName) + err = clusterSnapshot.ForceAddPod(pod, pod.Spec.NodeName) if err != nil { assert.NoError(b, err) } @@ -182,24 +165,20 @@ func BenchmarkForkAddRevert(b *testing.B) { pods := createTestPods(ntc * ptc) assignPodsToNodes(pods, nodes) clusterSnapshot := snapshotFactory() - err := clusterSnapshot.AddNodes(nodes) + err := clusterSnapshot.SetClusterState(nodes, pods) assert.NoError(b, err) - for _, pod := range pods { - err = clusterSnapshot.AddPod(pod, pod.Spec.NodeName) - assert.NoError(b, err) - } tmpNode1 := BuildTestNode("tmp-1", 2000, 2000000) tmpNode2 := BuildTestNode("tmp-2", 2000, 2000000) b.ResetTimer() b.Run(fmt.Sprintf("%s: ForkAddRevert (%d nodes, %d pods)", snapshotName, ntc, ptc), func(b *testing.B) { for i := 0; i < b.N; i++ { 
clusterSnapshot.Fork() - err = clusterSnapshot.AddNode(tmpNode1) + err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(tmpNode1)) if err != nil { assert.NoError(b, err) } clusterSnapshot.Fork() - err = clusterSnapshot.AddNode(tmpNode2) + err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(tmpNode2)) if err != nil { assert.NoError(b, err) } @@ -234,12 +213,14 @@ func BenchmarkBuildNodeInfoList(b *testing.B) { b.Run(fmt.Sprintf("fork add 1000 to %d", tc.nodeCount), func(b *testing.B) { nodes := createTestNodes(tc.nodeCount + 1000) snapshot := NewDeltaClusterSnapshot() - if err := snapshot.AddNodes(nodes[:tc.nodeCount]); err != nil { + if err := snapshot.SetClusterState(nodes[:tc.nodeCount], nil); err != nil { assert.NoError(b, err) } snapshot.Fork() - if err := snapshot.AddNodes(nodes[tc.nodeCount:]); err != nil { - assert.NoError(b, err) + for _, node := range nodes[tc.nodeCount:] { + if err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)); err != nil { + assert.NoError(b, err) + } } b.ResetTimer() for i := 0; i < b.N; i++ { @@ -254,7 +235,7 @@ func BenchmarkBuildNodeInfoList(b *testing.B) { b.Run(fmt.Sprintf("base %d", tc.nodeCount), func(b *testing.B) { nodes := createTestNodes(tc.nodeCount) snapshot := NewDeltaClusterSnapshot() - if err := snapshot.AddNodes(nodes); err != nil { + if err := snapshot.SetClusterState(nodes, nil); err != nil { assert.NoError(b, err) } b.ResetTimer() diff --git a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_test.go b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_test.go index f9ce65162580..4eeb67253558 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_test.go +++ b/cluster-autoscaler/simulator/clustersnapshot/clustersnapshot_test.go @@ -75,12 +75,8 @@ func getSnapshotState(t *testing.T, snapshot ClusterSnapshot) snapshotState { func startSnapshot(t *testing.T, snapshotFactory func() ClusterSnapshot, state snapshotState) ClusterSnapshot { snapshot := snapshotFactory() - err := snapshot.AddNodes(state.nodes) + err := snapshot.SetClusterState(state.nodes, state.pods) assert.NoError(t, err) - for _, pod := range state.pods { - err := snapshot.AddPod(pod, pod.Spec.NodeName) - assert.NoError(t, err) - } return snapshot } @@ -98,9 +94,9 @@ func validTestCases(t *testing.T) []modificationTestCase { testCases := []modificationTestCase{ { - name: "add node", + name: "add empty nodeInfo", op: func(snapshot ClusterSnapshot) { - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) }, modifiedState: snapshotState{ @@ -108,9 +104,9 @@ func validTestCases(t *testing.T) []modificationTestCase { }, }, { - name: "add node with pods", + name: "add nodeInfo", op: func(snapshot ClusterSnapshot) { - err := snapshot.AddNodeWithPods(node, []*apiv1.Pod{pod}) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pod)) assert.NoError(t, err) }, modifiedState: snapshotState{ @@ -119,25 +115,25 @@ func validTestCases(t *testing.T) []modificationTestCase { }, }, { - name: "remove node", + name: "remove nodeInfo", state: snapshotState{ nodes: []*apiv1.Node{node}, }, op: func(snapshot ClusterSnapshot) { - err := snapshot.RemoveNode(node.Name) + err := snapshot.RemoveNodeInfo(node.Name) assert.NoError(t, err) }, }, { - name: "remove node, then add it back", + name: "remove nodeInfo, then add it back", state: snapshotState{ nodes: []*apiv1.Node{node}, }, op: func(snapshot ClusterSnapshot) { - err := snapshot.RemoveNode(node.Name) + err := 
snapshot.RemoveNodeInfo(node.Name) assert.NoError(t, err) - err = snapshot.AddNode(node) + err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) }, modifiedState: snapshotState{ @@ -145,14 +141,14 @@ func validTestCases(t *testing.T) []modificationTestCase { }, }, { - name: "add pod, then remove node", + name: "add pod, then remove nodeInfo", state: snapshotState{ nodes: []*apiv1.Node{node}, }, op: func(snapshot ClusterSnapshot) { - err := snapshot.AddPod(pod, node.Name) + err := snapshot.ForceAddPod(pod, node.Name) assert.NoError(t, err) - err = snapshot.RemoveNode(node.Name) + err = snapshot.RemoveNodeInfo(node.Name) assert.NoError(t, err) }, }, @@ -203,7 +199,7 @@ func TestForking(t *testing.T) { tc.op(snapshot) snapshot.Fork() - snapshot.AddNode(node) + snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) snapshot.Revert() snapshot.Revert() @@ -247,7 +243,7 @@ func TestForking(t *testing.T) { snapshot.Fork() tc.op(snapshot) snapshot.Fork() - snapshot.AddNode(node) + snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) snapshot.Revert() err := snapshot.Commit() assert.NoError(t, err) @@ -279,7 +275,7 @@ func TestForking(t *testing.T) { } } -func TestClear(t *testing.T) { +func TestSetClusterState(t *testing.T) { // Run with -count=1 to avoid caching. localRand := rand.New(rand.NewSource(time.Now().Unix())) @@ -313,10 +309,21 @@ func TestClear(t *testing.T) { snapshot := startSnapshot(t, snapshotFactory, state) compareStates(t, state, getSnapshotState(t, snapshot)) - snapshot.Clear() + assert.NoError(t, snapshot.SetClusterState(nil, nil)) compareStates(t, snapshotState{}, getSnapshotState(t, snapshot)) }) + t.Run(fmt.Sprintf("%s: clear base %d nodes %d pods and set a new state", name, nodeCount, podCount), + func(t *testing.T) { + snapshot := startSnapshot(t, snapshotFactory, state) + compareStates(t, state, getSnapshotState(t, snapshot)) + + newNodes, newPods := createTestNodes(13), createTestPods(37) + assignPodsToNodes(newPods, newNodes) + assert.NoError(t, snapshot.SetClusterState(newNodes, newPods)) + + compareStates(t, snapshotState{nodes: newNodes, pods: newPods}, getSnapshotState(t, snapshot)) + }) t.Run(fmt.Sprintf("%s: clear fork %d nodes %d pods %d extra nodes %d extra pods", name, nodeCount, podCount, extraNodeCount, extraPodCount), func(t *testing.T) { snapshot := startSnapshot(t, snapshotFactory, state) @@ -324,23 +331,24 @@ func TestClear(t *testing.T) { snapshot.Fork() - err := snapshot.AddNodes(extraNodes) - assert.NoError(t, err) + for _, node := range extraNodes { + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) + assert.NoError(t, err) + } for _, pod := range extraPods { - err := snapshot.AddPod(pod, pod.Spec.NodeName) + err := snapshot.ForceAddPod(pod, pod.Spec.NodeName) assert.NoError(t, err) } compareStates(t, snapshotState{allNodes, allPods}, getSnapshotState(t, snapshot)) - snapshot.Clear() + assert.NoError(t, snapshot.SetClusterState(nil, nil)) compareStates(t, snapshotState{}, getSnapshotState(t, snapshot)) - // Clear() should break out of forked state. + // SetClusterState() should break out of forked state. 
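The test above also relies on SetClusterState() discarding any forked layers, not just the stored objects. A compact standalone version of that expectation (illustrative names):

    package example

    import (
        "testing"

        "github.com/stretchr/testify/assert"
        "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
        "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
        . "k8s.io/autoscaler/cluster-autoscaler/utils/test"
    )

    func TestSetClusterStateDropsForks(t *testing.T) {
        snapshot := clustersnapshot.NewBasicClusterSnapshot()
        assert.NoError(t, snapshot.AddNodeInfo(framework.NewTestNodeInfo(BuildTestNode("base", 1000, 1000))))

        snapshot.Fork()
        assert.NoError(t, snapshot.AddNodeInfo(framework.NewTestNodeInfo(BuildTestNode("forked", 1000, 1000))))

        // SetClusterState wipes both the base and the forked data and leaves the snapshot unforked...
        assert.NoError(t, snapshot.SetClusterState(nil, nil))

        // ...so forking again right away must work without a prior Revert()/Commit().
        snapshot.Fork()
    }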
snapshot.Fork() - assert.NoError(t, err) }) } } @@ -352,17 +360,17 @@ func TestNode404(t *testing.T) { op func(ClusterSnapshot) error }{ {"add pod", func(snapshot ClusterSnapshot) error { - return snapshot.AddPod(BuildTestPod("p1", 0, 0), "node") + return snapshot.ForceAddPod(BuildTestPod("p1", 0, 0), "node") }}, {"remove pod", func(snapshot ClusterSnapshot) error { - return snapshot.RemovePod("default", "p1", "node") + return snapshot.ForceRemovePod("default", "p1", "node") }}, {"get node", func(snapshot ClusterSnapshot) error { _, err := snapshot.NodeInfos().Get("node") return err }}, - {"remove node", func(snapshot ClusterSnapshot) error { - return snapshot.RemoveNode("node") + {"remove nodeInfo", func(snapshot ClusterSnapshot) error { + return snapshot.RemoveNodeInfo("node") }}, } @@ -382,13 +390,13 @@ func TestNode404(t *testing.T) { snapshot := snapshotFactory() node := BuildTestNode("node", 10, 100) - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) snapshot.Fork() assert.NoError(t, err) - err = snapshot.RemoveNode("node") + err = snapshot.RemoveNodeInfo("node") assert.NoError(t, err) // Node deleted after fork - shouldn't be able to operate on it. @@ -408,10 +416,10 @@ func TestNode404(t *testing.T) { snapshot := snapshotFactory() node := BuildTestNode("node", 10, 100) - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) - err = snapshot.RemoveNode("node") + err = snapshot.RemoveNodeInfo("node") assert.NoError(t, err) // Node deleted from base - shouldn't be able to operate on it. @@ -431,11 +439,8 @@ func TestNodeAlreadyExists(t *testing.T) { name string op func(ClusterSnapshot) error }{ - {"add node", func(snapshot ClusterSnapshot) error { - return snapshot.AddNode(node) - }}, - {"add node with pod", func(snapshot ClusterSnapshot) error { - return snapshot.AddNodeWithPods(node, []*apiv1.Pod{pod}) + {"add nodeInfo", func(snapshot ClusterSnapshot) error { + return snapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pod)) }}, } @@ -445,7 +450,7 @@ func TestNodeAlreadyExists(t *testing.T) { func(t *testing.T) { snapshot := snapshotFactory() - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) // Node already in base. @@ -457,7 +462,7 @@ func TestNodeAlreadyExists(t *testing.T) { func(t *testing.T) { snapshot := snapshotFactory() - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) snapshot.Fork() @@ -474,7 +479,7 @@ func TestNodeAlreadyExists(t *testing.T) { snapshot.Fork() - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) // Node already in fork. 
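Also implied by these cases: AddNodeInfo() reports an error for a node name that is already present, whether the duplicate is attempted in the base state or after Fork(). Sketch (illustrative names):

    package example

    import (
        "testing"

        "github.com/stretchr/testify/assert"
        "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
        "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
        . "k8s.io/autoscaler/cluster-autoscaler/utils/test"
    )

    func TestAddNodeInfoRejectsDuplicates(t *testing.T) {
        node := BuildTestNode("node", 10, 100)
        snapshot := clustersnapshot.NewDeltaClusterSnapshot()

        assert.NoError(t, snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)))

        // Adding a NodeInfo for an already-known node name is an error, both in the base
        // state and in a fork, since the fork still sees the base node.
        assert.Error(t, snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)))
        snapshot.Fork()
        assert.Error(t, snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)))
    }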
@@ -487,7 +492,7 @@ func TestNodeAlreadyExists(t *testing.T) { snapshot.Fork() - err := snapshot.AddNode(node) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err) err = snapshot.Commit() @@ -624,17 +629,17 @@ func TestPVCUsedByPods(t *testing.T) { for _, tc := range testcase { t.Run(fmt.Sprintf("%s with snapshot (%s)", tc.desc, snapshotName), func(t *testing.T) { snapshot := snapshotFactory() - err := snapshot.AddNodeWithPods(tc.node, tc.pods) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(tc.node, tc.pods...)) assert.NoError(t, err) - volumeExists := snapshot.IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", tc.claimName)) + volumeExists := snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", tc.claimName)) assert.Equal(t, tc.exists, volumeExists) if tc.removePod != "" { - err = snapshot.RemovePod("default", tc.removePod, "node") + err = snapshot.ForceRemovePod("default", tc.removePod, "node") assert.NoError(t, err) - volumeExists = snapshot.IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", tc.claimName)) + volumeExists = snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", tc.claimName)) assert.Equal(t, tc.existsAfterRemove, volumeExists) } }) @@ -694,38 +699,38 @@ func TestPVCClearAndFork(t *testing.T) { for snapshotName, snapshotFactory := range snapshots { t.Run(fmt.Sprintf("fork and revert snapshot with pvc pods with snapshot: %s", snapshotName), func(t *testing.T) { snapshot := snapshotFactory() - err := snapshot.AddNodeWithPods(node, []*apiv1.Pod{pod1}) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pod1)) assert.NoError(t, err) - volumeExists := snapshot.IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim1")) + volumeExists := snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim1")) assert.Equal(t, true, volumeExists) snapshot.Fork() assert.NoError(t, err) - volumeExists = snapshot.IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim1")) + volumeExists = snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim1")) assert.Equal(t, true, volumeExists) - err = snapshot.AddPod(pod2, "node") + err = snapshot.ForceAddPod(pod2, "node") assert.NoError(t, err) - volumeExists = snapshot.IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim2")) + volumeExists = snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim2")) assert.Equal(t, true, volumeExists) snapshot.Revert() - volumeExists = snapshot.IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim2")) + volumeExists = snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim2")) assert.Equal(t, false, volumeExists) }) t.Run(fmt.Sprintf("clear snapshot with pvc pods with snapshot: %s", snapshotName), func(t *testing.T) { snapshot := snapshotFactory() - err := snapshot.AddNodeWithPods(node, []*apiv1.Pod{pod1}) + err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pod1)) assert.NoError(t, err) - volumeExists := snapshot.IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim1")) + volumeExists := snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim1")) assert.Equal(t, true, volumeExists) - snapshot.Clear() - volumeExists = snapshot.IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", 
"claim1")) + assert.NoError(t, snapshot.SetClusterState(nil, nil)) + volumeExists = snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim1")) assert.Equal(t, false, volumeExists) }) diff --git a/cluster-autoscaler/simulator/clustersnapshot/delta.go b/cluster-autoscaler/simulator/clustersnapshot/delta.go index 9559b43dbb78..869e494e0226 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/delta.go +++ b/cluster-autoscaler/simulator/clustersnapshot/delta.go @@ -136,16 +136,6 @@ func (data *internalDeltaSnapshotData) buildNodeInfoList() []*schedulerframework return nodeInfoList } -// Convenience method to avoid writing loop for adding nodes. -func (data *internalDeltaSnapshotData) addNodes(nodes []*apiv1.Node) error { - for _, node := range nodes { - if err := data.addNode(node); err != nil { - return err - } - } - return nil -} - func (data *internalDeltaSnapshotData) addNode(node *apiv1.Node) error { nodeInfo := schedulerframework.NewNodeInfo() nodeInfo.SetNode(node) @@ -187,7 +177,7 @@ func (data *internalDeltaSnapshotData) clearPodCaches() { data.pvcNamespaceMap = nil } -func (data *internalDeltaSnapshotData) removeNode(nodeName string) error { +func (data *internalDeltaSnapshotData) removeNodeInfo(nodeName string) error { _, foundInDelta := data.addedNodeInfoMap[nodeName] if foundInDelta { // If node was added within this delta, delete this change. @@ -306,12 +296,12 @@ func (data *internalDeltaSnapshotData) commit() (*internalDeltaSnapshotData, err return data, nil } for node := range data.deletedNodeInfos { - if err := data.baseData.removeNode(node); err != nil { + if err := data.baseData.removeNodeInfo(node); err != nil { return nil, err } } for _, node := range data.modifiedNodeInfoMap { - if err := data.baseData.removeNode(node.Node().Name); err != nil { + if err := data.baseData.removeNodeInfo(node.Node().Name); err != nil { return nil, err } if err := data.baseData.addNodeInfo(node); err != nil { @@ -399,7 +389,7 @@ func (snapshot *DeltaClusterSnapshot) StorageInfos() schedulerframework.StorageI // NewDeltaClusterSnapshot creates instances of DeltaClusterSnapshot. func NewDeltaClusterSnapshot() *DeltaClusterSnapshot { snapshot := &DeltaClusterSnapshot{} - snapshot.Clear() + snapshot.clear() return snapshot } @@ -431,41 +421,39 @@ func (snapshot *DeltaClusterSnapshot) AddNodeInfo(nodeInfo *framework.NodeInfo) return nil } -// AddNode adds node to the snapshot. -func (snapshot *DeltaClusterSnapshot) AddNode(node *apiv1.Node) error { - return snapshot.data.addNode(node) -} - -// AddNodes adds nodes in batch to the snapshot. -func (snapshot *DeltaClusterSnapshot) AddNodes(nodes []*apiv1.Node) error { - return snapshot.data.addNodes(nodes) -} +// SetClusterState sets the cluster state. +func (snapshot *DeltaClusterSnapshot) SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error { + snapshot.clear() -// AddNodeWithPods adds a node and set of pods to be scheduled to this node to the snapshot. 
-func (snapshot *DeltaClusterSnapshot) AddNodeWithPods(node *apiv1.Node, pods []*apiv1.Pod) error { - if err := snapshot.AddNode(node); err != nil { - return err - } - for _, pod := range pods { - if err := snapshot.AddPod(pod, node.Name); err != nil { + knownNodes := make(map[string]bool) + for _, node := range nodes { + if err := snapshot.data.addNode(node); err != nil { return err } + knownNodes[node.Name] = true + } + for _, pod := range scheduledPods { + if knownNodes[pod.Spec.NodeName] { + if err := snapshot.data.addPod(pod, pod.Spec.NodeName); err != nil { + return err + } + } } return nil } -// RemoveNode removes nodes (and pods scheduled to it) from the snapshot. -func (snapshot *DeltaClusterSnapshot) RemoveNode(nodeName string) error { - return snapshot.data.removeNode(nodeName) +// RemoveNodeInfo removes nodes (and pods scheduled to it) from the snapshot. +func (snapshot *DeltaClusterSnapshot) RemoveNodeInfo(nodeName string) error { + return snapshot.data.removeNodeInfo(nodeName) } -// AddPod adds pod to the snapshot and schedules it to given node. -func (snapshot *DeltaClusterSnapshot) AddPod(pod *apiv1.Pod, nodeName string) error { +// ForceAddPod adds pod to the snapshot and schedules it to given node. +func (snapshot *DeltaClusterSnapshot) ForceAddPod(pod *apiv1.Pod, nodeName string) error { return snapshot.data.addPod(pod, nodeName) } -// RemovePod removes pod from the snapshot. -func (snapshot *DeltaClusterSnapshot) RemovePod(namespace, podName, nodeName string) error { +// ForceRemovePod removes pod from the snapshot. +func (snapshot *DeltaClusterSnapshot) ForceRemovePod(namespace, podName, nodeName string) error { return snapshot.data.removePod(namespace, podName, nodeName) } @@ -501,6 +489,6 @@ func (snapshot *DeltaClusterSnapshot) Commit() error { // Clear reset cluster snapshot to empty, unforked state // Time: O(1) -func (snapshot *DeltaClusterSnapshot) Clear() { +func (snapshot *DeltaClusterSnapshot) clear() { snapshot.data = newInternalDeltaSnapshotData() } diff --git a/cluster-autoscaler/simulator/clustersnapshot/test_utils.go b/cluster-autoscaler/simulator/clustersnapshot/test_utils.go index 501756fe2438..f0cd8c67546e 100644 --- a/cluster-autoscaler/simulator/clustersnapshot/test_utils.go +++ b/cluster-autoscaler/simulator/clustersnapshot/test_utils.go @@ -20,7 +20,9 @@ import ( "testing" "github.com/stretchr/testify/assert" + apiv1 "k8s.io/api/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" ) // InitializeClusterSnapshotOrDie clears cluster snapshot and then initializes it with given set of nodes and pods. 
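The helper documented above is rewritten in the hunk below; stripped of test assertions, its new flow is roughly the following sketch (initSnapshot is an illustrative name, and the real helper fails the test instead of returning errors):

    package example

    import (
        apiv1 "k8s.io/api/core/v1"
        "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
        "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
    )

    // initSnapshot resets the snapshot, adds every node as an empty NodeInfo, then forces
    // each pod onto its Spec.NodeName, falling back to the nominated node for preempting pods.
    func initSnapshot(snapshot clustersnapshot.ClusterSnapshot, nodes []*apiv1.Node, pods []*apiv1.Pod) error {
        if err := snapshot.SetClusterState(nil, nil); err != nil {
            return err
        }
        for _, node := range nodes {
            if err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)); err != nil {
                return err
            }
        }
        for _, pod := range pods {
            target := pod.Spec.NodeName
            if target == "" {
                // The real helper fails the test when neither field is set.
                target = pod.Status.NominatedNodeName
            }
            if err := snapshot.ForceAddPod(pod, target); err != nil {
                return err
            }
        }
        return nil
    }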
@@ -32,19 +34,19 @@ func InitializeClusterSnapshotOrDie( pods []*apiv1.Pod) { var err error - snapshot.Clear() + assert.NoError(t, snapshot.SetClusterState(nil, nil)) for _, node := range nodes { - err = snapshot.AddNode(node) + err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)) assert.NoError(t, err, "error while adding node %s", node.Name) } for _, pod := range pods { if pod.Spec.NodeName != "" { - err = snapshot.AddPod(pod, pod.Spec.NodeName) + err = snapshot.ForceAddPod(pod, pod.Spec.NodeName) assert.NoError(t, err, "error while adding pod %s/%s to node %s", pod.Namespace, pod.Name, pod.Spec.NodeName) } else if pod.Status.NominatedNodeName != "" { - err = snapshot.AddPod(pod, pod.Status.NominatedNodeName) + err = snapshot.ForceAddPod(pod, pod.Status.NominatedNodeName) assert.NoError(t, err, "error while adding pod %s/%s to nominated node %s", pod.Namespace, pod.Name, pod.Status.NominatedNodeName) } else { assert.Fail(t, "pod %s/%s does not have Spec.NodeName nor Status.NominatedNodeName set", pod.Namespace, pod.Name) diff --git a/cluster-autoscaler/simulator/framework/infos.go b/cluster-autoscaler/simulator/framework/infos.go index c3af45d08258..c9062d170f8e 100644 --- a/cluster-autoscaler/simulator/framework/infos.go +++ b/cluster-autoscaler/simulator/framework/infos.go @@ -93,6 +93,23 @@ func (n *NodeInfo) ToScheduler() *schedulerframework.NodeInfo { return n.schedNodeInfo } +// DeepCopy clones the NodeInfo. +func (n *NodeInfo) DeepCopy() *NodeInfo { + var newPods []*PodInfo + for _, podInfo := range n.Pods() { + var newClaims []*resourceapi.ResourceClaim + for _, claim := range podInfo.NeededResourceClaims { + newClaims = append(newClaims, claim.DeepCopy()) + } + newPods = append(newPods, &PodInfo{Pod: podInfo.Pod.DeepCopy(), NeededResourceClaims: newClaims}) + } + var newSlices []*resourceapi.ResourceSlice + for _, slice := range n.LocalResourceSlices { + newSlices = append(newSlices, slice.DeepCopy()) + } + return NewNodeInfo(n.Node().DeepCopy(), newSlices, newPods...) +} + // NewNodeInfo returns a new internal NodeInfo from the provided data. func NewNodeInfo(node *apiv1.Node, slices []*resourceapi.ResourceSlice, pods ...*PodInfo) *NodeInfo { result := &NodeInfo{ diff --git a/cluster-autoscaler/simulator/framework/infos_test.go b/cluster-autoscaler/simulator/framework/infos_test.go index e6f997129253..be92e6f762ea 100644 --- a/cluster-autoscaler/simulator/framework/infos_test.go +++ b/cluster-autoscaler/simulator/framework/infos_test.go @@ -208,6 +208,64 @@ func TestNodeInfo(t *testing.T) { } } +func TestDeepCopyNodeInfo(t *testing.T) { + node := test.BuildTestNode("node", 1000, 1000) + pods := []*PodInfo{ + {Pod: test.BuildTestPod("p1", 80, 0, test.WithNodeName(node.Name))}, + { + Pod: test.BuildTestPod("p2", 80, 0, test.WithNodeName(node.Name)), + NeededResourceClaims: []*resourceapi.ResourceClaim{ + {ObjectMeta: v1.ObjectMeta{Name: "claim1"}, Spec: resourceapi.ResourceClaimSpec{Devices: resourceapi.DeviceClaim{Requests: []resourceapi.DeviceRequest{{Name: "req1"}}}}}, + {ObjectMeta: v1.ObjectMeta{Name: "claim2"}, Spec: resourceapi.ResourceClaimSpec{Devices: resourceapi.DeviceClaim{Requests: []resourceapi.DeviceRequest{{Name: "req2"}}}}}, + }, + }, + } + slices := []*resourceapi.ResourceSlice{ + {ObjectMeta: v1.ObjectMeta{Name: "slice1"}, Spec: resourceapi.ResourceSliceSpec{NodeName: "node"}}, + {ObjectMeta: v1.ObjectMeta{Name: "slice2"}, Spec: resourceapi.ResourceSliceSpec{NodeName: "node"}}, + } + nodeInfo := NewNodeInfo(node, slices, pods...) 
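A quick illustration of the new NodeInfo.DeepCopy() added above: the Node, the PodInfos (including their NeededResourceClaims) and the LocalResourceSlices are all cloned, so callers such as the nodeinfo cache can mutate the copy freely. Test name and objects below are illustrative:

    package example

    import (
        "testing"

        "github.com/stretchr/testify/assert"
        "k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
        . "k8s.io/autoscaler/cluster-autoscaler/utils/test"
    )

    func TestNodeInfoDeepCopyIsIndependent(t *testing.T) {
        node := BuildTestNode("n1", 1000, 1000)
        pod := BuildTestPod("p1", 100, 0, WithNodeName("n1"))
        original := framework.NewTestNodeInfo(node, pod)

        copied := original.DeepCopy()
        // Mutating the copy must leave the original untouched, e.g. when the copy is
        // later sanitized or cached.
        copied.Node().Name = "renamed-copy"
        copied.Pods()[0].Pod.Name = "renamed-pod"

        assert.Equal(t, "n1", original.Node().Name)
        assert.Equal(t, "p1", original.Pods()[0].Pod.Name)
    }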
+ + // Verify that the contents are identical after copying. + nodeInfoCopy := nodeInfo.DeepCopy() + if diff := cmp.Diff(nodeInfo, nodeInfoCopy, + cmp.AllowUnexported(schedulerframework.NodeInfo{}, NodeInfo{}, PodInfo{}, podExtraInfo{}), + // We don't care about this field staying the same, and it differs because it's a global counter bumped + // on every AddPod. + cmpopts.IgnoreFields(schedulerframework.NodeInfo{}, "Generation"), + ); diff != "" { + t.Errorf("nodeInfo differs after DeepCopyNodeInfo, diff (-want +got): %s", diff) + } + + // Verify that the object addresses changed in the copy. + if nodeInfo == nodeInfoCopy { + t.Error("nodeInfo address identical after DeepCopyNodeInfo") + } + if nodeInfo.ToScheduler() == nodeInfoCopy.ToScheduler() { + t.Error("schedulerframework.NodeInfo address identical after DeepCopyNodeInfo") + } + for i := range len(nodeInfo.LocalResourceSlices) { + if nodeInfo.LocalResourceSlices[i] == nodeInfoCopy.LocalResourceSlices[i] { + t.Errorf("%d-th LocalResourceSlice address identical after DeepCopyNodeInfo", i) + } + } + for podIndex := range len(pods) { + oldPodInfo := nodeInfo.Pods()[podIndex] + newPodInfo := nodeInfoCopy.Pods()[podIndex] + if oldPodInfo == newPodInfo { + t.Errorf("%d-th PodInfo address identical after DeepCopyNodeInfo", podIndex) + } + if oldPodInfo.Pod == newPodInfo.Pod { + t.Errorf("%d-th PodInfo.Pod address identical after DeepCopyNodeInfo", podIndex) + } + for claimIndex := range len(newPodInfo.NeededResourceClaims) { + if oldPodInfo.NeededResourceClaims[podIndex] == newPodInfo.NeededResourceClaims[podIndex] { + t.Errorf("%d-th PodInfo - %d-th NeededResourceClaim address identical after DeepCopyNodeInfo", podIndex, claimIndex) + } + } + } +} + func testPodInfos(pods []*apiv1.Pod, addClaims bool) []*PodInfo { var result []*PodInfo for _, pod := range pods { diff --git a/cluster-autoscaler/simulator/node_info_utils.go b/cluster-autoscaler/simulator/node_info_utils.go new file mode 100644 index 000000000000..943893db67a3 --- /dev/null +++ b/cluster-autoscaler/simulator/node_info_utils.go @@ -0,0 +1,153 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package simulator + +import ( + "fmt" + "math/rand" + + appsv1 "k8s.io/api/apps/v1" + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" + "k8s.io/autoscaler/cluster-autoscaler/utils/daemonset" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" + "k8s.io/autoscaler/cluster-autoscaler/utils/labels" + pod_util "k8s.io/autoscaler/cluster-autoscaler/utils/pod" + "k8s.io/autoscaler/cluster-autoscaler/utils/taints" +) + +type nodeGroupTemplateNodeInfoGetter interface { + Id() string + TemplateNodeInfo() (*framework.NodeInfo, error) +} + +// TemplateNodeInfoFromNodeGroupTemplate returns a template NodeInfo object based on NodeGroup.TemplateNodeInfo(). 
The template is sanitized, and only +// contains the pods that should appear on a new Node from the same node group (e.g. DaemonSet pods). +func TemplateNodeInfoFromNodeGroupTemplate(nodeGroup nodeGroupTemplateNodeInfoGetter, daemonsets []*appsv1.DaemonSet, taintConfig taints.TaintConfig) (*framework.NodeInfo, errors.AutoscalerError) { + id := nodeGroup.Id() + baseNodeInfo, err := nodeGroup.TemplateNodeInfo() + if err != nil { + return nil, errors.ToAutoscalerError(errors.CloudProviderError, err) + } + labels.UpdateDeprecatedLabels(baseNodeInfo.Node().ObjectMeta.Labels) + + return TemplateNodeInfoFromExampleNodeInfo(baseNodeInfo, id, daemonsets, true, taintConfig) +} + +// TemplateNodeInfoFromExampleNodeInfo returns a template NodeInfo object based on a real example NodeInfo from the cluster. The template is sanitized, and only +// contains the pods that should appear on a new Node from the same node group (e.g. DaemonSet pods). +func TemplateNodeInfoFromExampleNodeInfo(example *framework.NodeInfo, nodeGroupId string, daemonsets []*appsv1.DaemonSet, forceDaemonSets bool, taintConfig taints.TaintConfig) (*framework.NodeInfo, errors.AutoscalerError) { + randSuffix := fmt.Sprintf("%d", rand.Int63()) + newNodeNameBase := fmt.Sprintf("template-node-for-%s", nodeGroupId) + + // We need to sanitize the example before determining the DS pods, since taints are checked there, and + // we might need to filter some out during sanitization. + sanitizedExample := sanitizeNodeInfo(example, newNodeNameBase, randSuffix, &taintConfig) + expectedPods, err := podsExpectedOnFreshNode(sanitizedExample, daemonsets, forceDaemonSets, randSuffix) + if err != nil { + return nil, err + } + // No need to sanitize the expected pods again - they either come from sanitizedExample and were sanitized above, + // or were added by podsExpectedOnFreshNode and sanitized there. + return framework.NewNodeInfo(sanitizedExample.Node(), nil, expectedPods...), nil +} + +// FreshNodeInfoFromTemplateNodeInfo duplicates the provided template NodeInfo, returning a fresh NodeInfo that can be injected into the cluster snapshot. +// The NodeInfo is sanitized (names, UIDs are changed, etc.), so that it can be injected along other copies created from the same template. +func FreshNodeInfoFromTemplateNodeInfo(template *framework.NodeInfo, suffix string) *framework.NodeInfo { + // Template node infos should already have taints and pods filtered, so not setting these parameters. 
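Taken together, the helpers above are meant to be paired roughly like this: build one sanitized template per node group, then stamp out uniquely suffixed copies for injection into the snapshot. The function addUpcomingNodes and the suffix scheme are illustrative, not part of the patch:

    package example

    import (
        "fmt"

        appsv1 "k8s.io/api/apps/v1"
        "k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
        "k8s.io/autoscaler/cluster-autoscaler/simulator"
        "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
        "k8s.io/autoscaler/cluster-autoscaler/utils/taints"
    )

    // addUpcomingNodes injects count simulated nodes for a node group into the snapshot.
    func addUpcomingNodes(snapshot clustersnapshot.ClusterSnapshot, nodeGroup cloudprovider.NodeGroup,
        daemonsets []*appsv1.DaemonSet, taintConfig taints.TaintConfig, count int) error {
        // One sanitized template per node group, with DS pods already filled in.
        template, err := simulator.TemplateNodeInfoFromNodeGroupTemplate(nodeGroup, daemonsets, taintConfig)
        if err != nil {
            return err
        }
        for i := 0; i < count; i++ {
            // Each copy gets fresh names/UIDs, so several copies of the same template can coexist.
            fresh := simulator.FreshNodeInfoFromTemplateNodeInfo(template, fmt.Sprintf("upcoming-%d", i))
            if addErr := snapshot.AddNodeInfo(fresh); addErr != nil {
                return addErr
            }
        }
        return nil
    }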
+ return sanitizeNodeInfo(template, template.Node().Name, suffix, nil) +} + +func sanitizeNodeInfo(nodeInfo *framework.NodeInfo, newNodeNameBase string, namesSuffix string, taintConfig *taints.TaintConfig) *framework.NodeInfo { + freshNodeName := fmt.Sprintf("%s-%s", newNodeNameBase, namesSuffix) + freshNode := sanitizeNode(nodeInfo.Node(), freshNodeName, taintConfig) + result := framework.NewNodeInfo(freshNode, nil) + + for _, podInfo := range nodeInfo.Pods() { + freshPod := sanitizePod(podInfo.Pod, freshNode.Name, namesSuffix) + result.AddPod(&framework.PodInfo{Pod: freshPod}) + } + return result +} + +func sanitizeNode(node *apiv1.Node, newName string, taintConfig *taints.TaintConfig) *apiv1.Node { + newNode := node.DeepCopy() + newNode.UID = uuid.NewUUID() + + newNode.Name = newName + if newNode.Labels == nil { + newNode.Labels = make(map[string]string) + } + newNode.Labels[apiv1.LabelHostname] = newName + + if taintConfig != nil { + newNode.Spec.Taints = taints.SanitizeTaints(newNode.Spec.Taints, *taintConfig) + } + return newNode +} + +func sanitizePod(pod *apiv1.Pod, nodeName, nameSuffix string) *apiv1.Pod { + sanitizedPod := pod.DeepCopy() + sanitizedPod.UID = uuid.NewUUID() + sanitizedPod.Name = fmt.Sprintf("%s-%s", pod.Name, nameSuffix) + sanitizedPod.Spec.NodeName = nodeName + return sanitizedPod +} + +func podsExpectedOnFreshNode(sanitizedExampleNodeInfo *framework.NodeInfo, daemonsets []*appsv1.DaemonSet, forceDaemonSets bool, nameSuffix string) ([]*framework.PodInfo, errors.AutoscalerError) { + var result []*framework.PodInfo + runningDS := make(map[types.UID]bool) + for _, pod := range sanitizedExampleNodeInfo.Pods() { + // Ignore scheduled pods in deletion phase + if pod.DeletionTimestamp != nil { + continue + } + // Add scheduled mirror and DS pods + if pod_util.IsMirrorPod(pod.Pod) || pod_util.IsDaemonSetPod(pod.Pod) { + result = append(result, pod) + } + // Mark DS pods as running + controllerRef := metav1.GetControllerOf(pod) + if controllerRef != nil && controllerRef.Kind == "DaemonSet" { + runningDS[controllerRef.UID] = true + } + } + // Add all pending DS pods if force scheduling DS + if forceDaemonSets { + var pendingDS []*appsv1.DaemonSet + for _, ds := range daemonsets { + if !runningDS[ds.UID] { + pendingDS = append(pendingDS, ds) + } + } + // The provided nodeInfo has to have taints properly sanitized, or this won't work correctly. + daemonPods, err := daemonset.GetDaemonSetPodsForNode(sanitizedExampleNodeInfo, pendingDS) + if err != nil { + return nil, errors.ToAutoscalerError(errors.InternalError, err) + } + for _, pod := range daemonPods { + // There's technically no need to sanitize these pods since they're created from scratch, but + // it's nice to have the same suffix for all names in one sanitized NodeInfo when debugging. + result = append(result, &framework.PodInfo{Pod: sanitizePod(pod.Pod, sanitizedExampleNodeInfo.Node().Name, nameSuffix)}) + } + } + return result, nil +} diff --git a/cluster-autoscaler/simulator/node_info_utils_test.go b/cluster-autoscaler/simulator/node_info_utils_test.go new file mode 100644 index 000000000000..00350eb36947 --- /dev/null +++ b/cluster-autoscaler/simulator/node_info_utils_test.go @@ -0,0 +1,510 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package simulator + +import ( + "fmt" + "math/rand" + "strings" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + appsv1 "k8s.io/api/apps/v1" + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/autoscaler/cluster-autoscaler/config" + "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" + "k8s.io/autoscaler/cluster-autoscaler/utils/taints" + . "k8s.io/autoscaler/cluster-autoscaler/utils/test" + "k8s.io/kubernetes/pkg/controller/daemon" +) + +var ( + ds1 = &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ds1", + Namespace: "ds1-namespace", + UID: types.UID("ds1"), + }, + } + ds2 = &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ds2", + Namespace: "ds2-namespace", + UID: types.UID("ds2"), + }, + } + ds3 = &appsv1.DaemonSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "ds3", + Namespace: "ds3-namespace", + UID: types.UID("ds3"), + }, + Spec: appsv1.DaemonSetSpec{ + Template: apiv1.PodTemplateSpec{ + Spec: apiv1.PodSpec{ + NodeSelector: map[string]string{"key": "value"}, + }, + }, + }, + } + testDaemonSets = []*appsv1.DaemonSet{ds1, ds2, ds3} +) + +func TestTemplateNodeInfoFromNodeGroupTemplate(t *testing.T) { + exampleNode := BuildTestNode("n", 1000, 10) + exampleNode.Spec.Taints = []apiv1.Taint{ + {Key: taints.ToBeDeletedTaint, Value: "2312532423", Effect: apiv1.TaintEffectNoSchedule}, + } + + for _, tc := range []struct { + testName string + nodeGroup *fakeNodeGroup + + wantPods []*apiv1.Pod + wantCpError bool + }{ + { + testName: "node group error results in an error", + nodeGroup: &fakeNodeGroup{templateNodeInfoErr: fmt.Errorf("test error")}, + wantCpError: true, + }, + { + testName: "simple template with no pods", + nodeGroup: &fakeNodeGroup{ + templateNodeInfoResult: framework.NewNodeInfo(exampleNode, nil), + }, + wantPods: []*apiv1.Pod{ + buildDSPod(ds1, "n"), + buildDSPod(ds2, "n"), + }, + }, + { + testName: "template with all kinds of pods", + nodeGroup: &fakeNodeGroup{ + templateNodeInfoResult: framework.NewNodeInfo(exampleNode, nil, + &framework.PodInfo{Pod: BuildScheduledTestPod("p1", 100, 1, "n")}, + &framework.PodInfo{Pod: BuildScheduledTestPod("p2", 100, 1, "n")}, + &framework.PodInfo{Pod: SetMirrorPodSpec(BuildScheduledTestPod("p3", 100, 1, "n"))}, + &framework.PodInfo{Pod: setDeletionTimestamp(SetMirrorPodSpec(BuildScheduledTestPod("p4", 100, 1, "n")))}, + &framework.PodInfo{Pod: buildDSPod(ds1, "n")}, + &framework.PodInfo{Pod: setDeletionTimestamp(buildDSPod(ds2, "n"))}, + ), + }, + wantPods: []*apiv1.Pod{ + SetMirrorPodSpec(BuildScheduledTestPod("p3", 100, 1, "n")), + buildDSPod(ds1, "n"), + buildDSPod(ds2, "n"), + }, + }, + } { + t.Run(tc.testName, func(t *testing.T) { + templateNodeInfo, err := TemplateNodeInfoFromNodeGroupTemplate(tc.nodeGroup, testDaemonSets, taints.TaintConfig{}) + if tc.wantCpError { + if err == nil || err.Type() != errors.CloudProviderError { + t.Fatalf("TemplateNodeInfoFromNodeGroupTemplate(): want CloudProviderError, but got: %v (%T)", err, err) + } 
else { + return + } + } + if err != nil { + t.Fatalf("TemplateNodeInfoFromNodeGroupTemplate(): expected no error, but got %v", err) + } + + // Verify that the taints are correctly sanitized. + // Verify that the NodeInfo is sanitized using the node group id as base. + // Pass empty string as nameSuffix so that it's auto-determined from the sanitized templateNodeInfo, because + // TemplateNodeInfoFromNodeGroupTemplate randomizes the suffix. + // Pass non-empty expectedPods to verify that the set of pods is changed as expected (e.g. DS pods added, non-DS/deleted pods removed). + if err := verifyNodeInfoSanitization(tc.nodeGroup.templateNodeInfoResult, templateNodeInfo, tc.wantPods, "template-node-for-"+tc.nodeGroup.id, "", nil); err != nil { + t.Fatalf("TemplateNodeInfoFromExampleNodeInfo(): NodeInfo wasn't properly sanitized: %v", err) + } + }) + } +} + +func TestTemplateNodeInfoFromExampleNodeInfo(t *testing.T) { + exampleNode := BuildTestNode("n", 1000, 10) + exampleNode.Spec.Taints = []apiv1.Taint{ + {Key: taints.ToBeDeletedTaint, Value: "2312532423", Effect: apiv1.TaintEffectNoSchedule}, + } + + testCases := []struct { + name string + pods []*apiv1.Pod + daemonSets []*appsv1.DaemonSet + forceDS bool + + wantPods []*apiv1.Pod + wantError bool + }{ + { + name: "node without any pods", + }, + { + name: "node with non-DS/mirror pods", + pods: []*apiv1.Pod{ + BuildScheduledTestPod("p1", 100, 1, "n"), + BuildScheduledTestPod("p2", 100, 1, "n"), + }, + wantPods: []*apiv1.Pod{}, + }, + { + name: "node with a mirror pod", + pods: []*apiv1.Pod{ + SetMirrorPodSpec(BuildScheduledTestPod("p1", 100, 1, "n")), + }, + wantPods: []*apiv1.Pod{ + SetMirrorPodSpec(BuildScheduledTestPod("p1", 100, 1, "n")), + }, + }, + { + name: "node with a deleted mirror pod", + pods: []*apiv1.Pod{ + SetMirrorPodSpec(BuildScheduledTestPod("p1", 100, 1, "n")), + setDeletionTimestamp(SetMirrorPodSpec(BuildScheduledTestPod("p2", 100, 1, "n"))), + }, + wantPods: []*apiv1.Pod{ + SetMirrorPodSpec(BuildScheduledTestPod("p1", 100, 1, "n")), + }, + }, + { + name: "node with DS pods [forceDS=false, no daemon sets]", + pods: []*apiv1.Pod{ + buildDSPod(ds1, "n"), + setDeletionTimestamp(buildDSPod(ds2, "n")), + }, + wantPods: []*apiv1.Pod{ + buildDSPod(ds1, "n"), + }, + }, + { + name: "node with DS pods [forceDS=false, some daemon sets]", + pods: []*apiv1.Pod{ + buildDSPod(ds1, "n"), + setDeletionTimestamp(buildDSPod(ds2, "n")), + }, + daemonSets: testDaemonSets, + wantPods: []*apiv1.Pod{ + buildDSPod(ds1, "n"), + }, + }, + { + name: "node with a DS pod [forceDS=true, no daemon sets]", + pods: []*apiv1.Pod{ + buildDSPod(ds1, "n"), + setDeletionTimestamp(buildDSPod(ds2, "n")), + }, + wantPods: []*apiv1.Pod{ + buildDSPod(ds1, "n"), + }, + forceDS: true, + }, + { + name: "node with a DS pod [forceDS=true, some daemon sets]", + pods: []*apiv1.Pod{ + buildDSPod(ds1, "n"), + setDeletionTimestamp(buildDSPod(ds2, "n")), + }, + daemonSets: testDaemonSets, + forceDS: true, + wantPods: []*apiv1.Pod{ + buildDSPod(ds1, "n"), + buildDSPod(ds2, "n"), + }, + }, + { + name: "everything together [forceDS=false]", + pods: []*apiv1.Pod{ + BuildScheduledTestPod("p1", 100, 1, "n"), + BuildScheduledTestPod("p2", 100, 1, "n"), + SetMirrorPodSpec(BuildScheduledTestPod("p3", 100, 1, "n")), + setDeletionTimestamp(SetMirrorPodSpec(BuildScheduledTestPod("p4", 100, 1, "n"))), + buildDSPod(ds1, "n"), + setDeletionTimestamp(buildDSPod(ds2, "n")), + }, + daemonSets: testDaemonSets, + wantPods: []*apiv1.Pod{ + SetMirrorPodSpec(BuildScheduledTestPod("p3", 100, 1, 
"n")), + buildDSPod(ds1, "n"), + }, + }, + { + name: "everything together [forceDS=true]", + pods: []*apiv1.Pod{ + BuildScheduledTestPod("p1", 100, 1, "n"), + BuildScheduledTestPod("p2", 100, 1, "n"), + SetMirrorPodSpec(BuildScheduledTestPod("p3", 100, 1, "n")), + setDeletionTimestamp(SetMirrorPodSpec(BuildScheduledTestPod("p4", 100, 1, "n"))), + buildDSPod(ds1, "n"), + setDeletionTimestamp(buildDSPod(ds2, "n")), + }, + daemonSets: testDaemonSets, + forceDS: true, + wantPods: []*apiv1.Pod{ + SetMirrorPodSpec(BuildScheduledTestPod("p3", 100, 1, "n")), + buildDSPod(ds1, "n"), + buildDSPod(ds2, "n"), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + nodeGroupId := "nodeGroupId" + exampleNodeInfo := framework.NewNodeInfo(exampleNode, nil) + for _, pod := range tc.pods { + exampleNodeInfo.AddPod(&framework.PodInfo{Pod: pod}) + } + + templateNodeInfo, err := TemplateNodeInfoFromExampleNodeInfo(exampleNodeInfo, nodeGroupId, tc.daemonSets, tc.forceDS, taints.TaintConfig{}) + if tc.wantError { + if err == nil { + t.Fatal("TemplateNodeInfoFromExampleNodeInfo(): want error, but got nil") + } else { + return + } + } + if err != nil { + t.Fatalf("TemplateNodeInfoFromExampleNodeInfo(): expected no error, but got %v", err) + } + + // Verify that the taints are correctly sanitized. + // Verify that the NodeInfo is sanitized using the node group id as base. + // Pass empty string as nameSuffix so that it's auto-determined from the sanitized templateNodeInfo, because + // TemplateNodeInfoFromExampleNodeInfo randomizes the suffix. + // Pass non-empty expectedPods to verify that the set of pods is changed as expected (e.g. DS pods added, non-DS/deleted pods removed). + if err := verifyNodeInfoSanitization(exampleNodeInfo, templateNodeInfo, tc.wantPods, "template-node-for-"+nodeGroupId, "", nil); err != nil { + t.Fatalf("TemplateNodeInfoFromExampleNodeInfo(): NodeInfo wasn't properly sanitized: %v", err) + } + }) + } +} + +func TestFreshNodeInfoFromTemplateNodeInfo(t *testing.T) { + nodeName := "template-node" + templateNode := BuildTestNode(nodeName, 1000, 1000) + templateNode.Spec.Taints = []apiv1.Taint{ + {Key: "startup-taint", Value: "true", Effect: apiv1.TaintEffectNoSchedule}, + {Key: taints.ToBeDeletedTaint, Value: "2312532423", Effect: apiv1.TaintEffectNoSchedule}, + {Key: "a", Value: "b", Effect: apiv1.TaintEffectNoSchedule}, + } + pods := []*framework.PodInfo{ + {Pod: BuildTestPod("p1", 80, 0, WithNodeName(nodeName))}, + {Pod: BuildTestPod("p2", 80, 0, WithNodeName(nodeName))}, + } + templateNodeInfo := framework.NewNodeInfo(templateNode, nil, pods...) + + suffix := "abc" + freshNodeInfo := FreshNodeInfoFromTemplateNodeInfo(templateNodeInfo, suffix) + // Verify that the taints are not sanitized (they should be sanitized in the template already). + // Verify that the NodeInfo is sanitized using the template Node name as base. 
+	initialTaints := templateNodeInfo.Node().Spec.Taints
+	if err := verifyNodeInfoSanitization(templateNodeInfo, freshNodeInfo, nil, templateNodeInfo.Node().Name, suffix, initialTaints); err != nil {
+		t.Fatalf("FreshNodeInfoFromTemplateNodeInfo(): NodeInfo wasn't properly sanitized: %v", err)
+	}
+}
+
+func TestSanitizeNodeInfo(t *testing.T) {
+	oldNodeName := "old-node"
+	basicNode := BuildTestNode(oldNodeName, 1000, 1000)
+
+	labelsNode := basicNode.DeepCopy()
+	labelsNode.Labels = map[string]string{
+		apiv1.LabelHostname: oldNodeName,
+		"a":                 "b",
+		"x":                 "y",
+	}
+
+	taintsNode := basicNode.DeepCopy()
+	taintsNode.Spec.Taints = []apiv1.Taint{
+		{Key: "startup-taint", Value: "true", Effect: apiv1.TaintEffectNoSchedule},
+		{Key: taints.ToBeDeletedTaint, Value: "2312532423", Effect: apiv1.TaintEffectNoSchedule},
+		{Key: "a", Value: "b", Effect: apiv1.TaintEffectNoSchedule},
+	}
+	taintConfig := taints.NewTaintConfig(config.AutoscalingOptions{StartupTaints: []string{"startup-taint"}})
+
+	taintsLabelsNode := labelsNode.DeepCopy()
+	taintsLabelsNode.Spec.Taints = taintsNode.Spec.Taints
+
+	pods := []*framework.PodInfo{
+		{Pod: BuildTestPod("p1", 80, 0, WithNodeName(oldNodeName))},
+		{Pod: BuildTestPod("p2", 80, 0, WithNodeName(oldNodeName))},
+	}
+
+	for _, tc := range []struct {
+		testName string
+
+		nodeInfo    *framework.NodeInfo
+		taintConfig *taints.TaintConfig
+
+		wantTaints []apiv1.Taint
+	}{
+		{
+			testName: "sanitize node",
+			nodeInfo: framework.NewTestNodeInfo(basicNode),
+		},
+		{
+			testName: "sanitize node labels",
+			nodeInfo: framework.NewTestNodeInfo(labelsNode),
+		},
+		{
+			testName:    "sanitize node taints - disabled",
+			nodeInfo:    framework.NewTestNodeInfo(taintsNode),
+			taintConfig: nil,
+			wantTaints:  taintsNode.Spec.Taints,
+		},
+		{
+			testName:    "sanitize node taints - enabled",
+			nodeInfo:    framework.NewTestNodeInfo(taintsNode),
+			taintConfig: &taintConfig,
+			wantTaints:  []apiv1.Taint{{Key: "a", Value: "b", Effect: apiv1.TaintEffectNoSchedule}},
+		},
+		{
+			testName: "sanitize pods",
+			nodeInfo: framework.NewNodeInfo(basicNode, nil, pods...),
+		},
+		{
+			testName:    "sanitize everything",
+			nodeInfo:    framework.NewNodeInfo(taintsLabelsNode, nil, pods...),
+			taintConfig: &taintConfig,
+			wantTaints:  []apiv1.Taint{{Key: "a", Value: "b", Effect: apiv1.TaintEffectNoSchedule}},
+		},
+	} {
+		t.Run(tc.testName, func(t *testing.T) {
+			newNameBase := "node"
+			suffix := "abc"
+			sanitizedNodeInfo := sanitizeNodeInfo(tc.nodeInfo, newNameBase, suffix, tc.taintConfig)
+			if err := verifyNodeInfoSanitization(tc.nodeInfo, sanitizedNodeInfo, nil, newNameBase, suffix, tc.wantTaints); err != nil {
+				t.Fatalf("sanitizeNodeInfo(): NodeInfo wasn't properly sanitized: %v", err)
+			}
+		})
+	}
+}
+
+// verifyNodeInfoSanitization verifies whether sanitizedNodeInfo was correctly sanitized starting from initialNodeInfo, with the provided
+// nameBase and nameSuffix. The expected taints aren't auto-determined, so wantTaints should always be provided.
+//
+// If nameSuffix is an empty string, the suffix will be determined from sanitizedNodeInfo. This is useful if
+// the test doesn't know/control the name suffix (e.g. because it's randomized by the tested function).
+//
+// If expectedPods is nil, the set of pods is expected not to change between initialNodeInfo and sanitizedNodeInfo. If the sanitization is
+// expected to change the set of pods, the expected set should be passed to expectedPods.
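+//
+// A minimal usage sketch (illustrative only, mirroring the call sites above; "initial" and
+// "sanitized" are hypothetical NodeInfos):
+//
+//	// Expect an unchanged pod set, a sanitized name of "base-suffix", and no taints on the sanitized Node.
+//	if err := verifyNodeInfoSanitization(initial, sanitized, nil, "base", "suffix", nil); err != nil {
+//		t.Fatalf("NodeInfo wasn't properly sanitized: %v", err)
+//	}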
+func verifyNodeInfoSanitization(initialNodeInfo, sanitizedNodeInfo *framework.NodeInfo, expectedPods []*apiv1.Pod, nameBase, nameSuffix string, wantTaints []apiv1.Taint) error {
+	if nameSuffix == "" {
+		// Determine the suffix from the provided sanitized NodeInfo - it should be the last part of a dash-separated name.
+		nameParts := strings.Split(sanitizedNodeInfo.Node().Name, "-")
+		if len(nameParts) < 2 {
+			return fmt.Errorf("sanitized NodeInfo name unexpected: want format <nameBase>-<nameSuffix>, got %q", sanitizedNodeInfo.Node().Name)
+		}
+		nameSuffix = nameParts[len(nameParts)-1]
+	}
+	if expectedPods != nil {
+		// If the sanitization is expected to change the set of pods, hack the initial NodeInfo to have the expected pods.
+		// Then we can just compare things pod-by-pod as if the set didn't change.
+		initialNodeInfo = framework.NewNodeInfo(initialNodeInfo.Node(), nil)
+		for _, pod := range expectedPods {
+			initialNodeInfo.AddPod(&framework.PodInfo{Pod: pod})
+		}
+	}
+
+	// Verification below assumes the same set of pods between initialNodeInfo and sanitizedNodeInfo.
+	wantNodeName := fmt.Sprintf("%s-%s", nameBase, nameSuffix)
+	if gotName := sanitizedNodeInfo.Node().Name; gotName != wantNodeName {
+		return fmt.Errorf("want sanitized Node name %q, got %q", wantNodeName, gotName)
+	}
+	if gotUid, oldUid := sanitizedNodeInfo.Node().UID, initialNodeInfo.Node().UID; gotUid == "" || gotUid == oldUid {
+		return fmt.Errorf("sanitized Node UID wasn't randomized - got %q, old UID was %q", gotUid, oldUid)
+	}
+	wantLabels := make(map[string]string)
+	for k, v := range initialNodeInfo.Node().Labels {
+		wantLabels[k] = v
+	}
+	wantLabels[apiv1.LabelHostname] = wantNodeName
+	if diff := cmp.Diff(wantLabels, sanitizedNodeInfo.Node().Labels); diff != "" {
+		return fmt.Errorf("sanitized Node labels unexpected, diff (-want +got): %s", diff)
+	}
+	if diff := cmp.Diff(wantTaints, sanitizedNodeInfo.Node().Spec.Taints); diff != "" {
+		return fmt.Errorf("sanitized Node taints unexpected, diff (-want +got): %s", diff)
+	}
+	if diff := cmp.Diff(initialNodeInfo.Node(), sanitizedNodeInfo.Node(),
+		cmpopts.IgnoreFields(metav1.ObjectMeta{}, "Name", "Labels", "UID"),
+		cmpopts.IgnoreFields(apiv1.NodeSpec{}, "Taints"),
+	); diff != "" {
+		return fmt.Errorf("sanitized Node unexpected diff (-want +got): %s", diff)
+	}
+
+	oldPods := initialNodeInfo.Pods()
+	newPods := sanitizedNodeInfo.Pods()
+	if len(oldPods) != len(newPods) {
+		return fmt.Errorf("want %d pods in sanitized NodeInfo, got %d", len(oldPods), len(newPods))
+	}
+	for i, newPod := range newPods {
+		oldPod := oldPods[i]
+
+		if newPod.Name == oldPod.Name || !strings.HasSuffix(newPod.Name, nameSuffix) {
+			return fmt.Errorf("sanitized Pod name unexpected: want (different than %q, ending in %q), got %q", oldPod.Name, nameSuffix, newPod.Name)
+		}
+		if gotUid, oldUid := newPod.UID, oldPod.UID; gotUid == "" || gotUid == oldUid {
+			return fmt.Errorf("sanitized Pod UID wasn't randomized - got %q, old UID was %q", gotUid, oldUid)
+		}
+		if gotNodeName := newPod.Spec.NodeName; gotNodeName != wantNodeName {
+			return fmt.Errorf("want sanitized Pod.Spec.NodeName %q, got %q", wantNodeName, gotNodeName)
+		}
+		if diff := cmp.Diff(oldPod, newPod,
+			cmpopts.IgnoreFields(metav1.ObjectMeta{}, "Name", "UID"),
+			cmpopts.IgnoreFields(apiv1.PodSpec{}, "NodeName"),
+		); diff != "" {
+			return fmt.Errorf("sanitized Pod unexpected diff (-want +got): %s", diff)
+		}
+	}
+	return nil
+}
+
+func buildDSPod(ds *appsv1.DaemonSet, nodeName string) *apiv1.Pod {
+	pod := daemon.NewPod(ds, nodeName)
+	pod.Name = fmt.Sprintf("%s-pod-%d", ds.Name, rand.Int63())
+	ptrVal := true
+	pod.ObjectMeta.OwnerReferences = []metav1.OwnerReference{
+		{Kind: "DaemonSet", UID: ds.UID, Name: ds.Name, Controller: &ptrVal},
+	}
+	return pod
+}
+
+func setDeletionTimestamp(pod *apiv1.Pod) *apiv1.Pod {
+	now := metav1.NewTime(time.Now())
+	pod.DeletionTimestamp = &now
+	return pod
+}
+
+type fakeNodeGroup struct {
+	id                     string
+	templateNodeInfoResult *framework.NodeInfo
+	templateNodeInfoErr    error
+}
+
+func (f *fakeNodeGroup) Id() string {
+	return f.id
+}
+
+func (f *fakeNodeGroup) TemplateNodeInfo() (*framework.NodeInfo, error) {
+	return f.templateNodeInfoResult, f.templateNodeInfoErr
+}
diff --git a/cluster-autoscaler/simulator/nodes.go b/cluster-autoscaler/simulator/nodes.go
deleted file mode 100644
index c80fe0cbe092..000000000000
--- a/cluster-autoscaler/simulator/nodes.go
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
-Copyright 2016 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package simulator
-
-import (
-	appsv1 "k8s.io/api/apps/v1"
-	apiv1 "k8s.io/api/core/v1"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/types"
-	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
-	"k8s.io/autoscaler/cluster-autoscaler/utils/daemonset"
-	"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
-
-	pod_util "k8s.io/autoscaler/cluster-autoscaler/utils/pod"
-)
-
-// BuildNodeInfoForNode build a NodeInfo structure for the given node as if the node was just created.
-func BuildNodeInfoForNode(node *apiv1.Node, scheduledPods []*apiv1.Pod, daemonsets []*appsv1.DaemonSet, forceDaemonSets bool) (*framework.NodeInfo, errors.AutoscalerError) { - nodeInfo := framework.NewNodeInfo(node, nil) - return addExpectedPods(nodeInfo, scheduledPods, daemonsets, forceDaemonSets) -} - -func addExpectedPods(nodeInfo *framework.NodeInfo, scheduledPods []*apiv1.Pod, daemonsets []*appsv1.DaemonSet, forceDaemonSets bool) (*framework.NodeInfo, errors.AutoscalerError) { - runningDS := make(map[types.UID]bool) - for _, pod := range scheduledPods { - // Ignore scheduled pods in deletion phase - if pod.DeletionTimestamp != nil { - continue - } - // Add scheduled mirror and DS pods - if pod_util.IsMirrorPod(pod) || pod_util.IsDaemonSetPod(pod) { - nodeInfo.AddPod(&framework.PodInfo{Pod: pod}) - } - // Mark DS pods as running - controllerRef := metav1.GetControllerOf(pod) - if controllerRef != nil && controllerRef.Kind == "DaemonSet" { - runningDS[controllerRef.UID] = true - } - } - // Add all pending DS pods if force scheduling DS - if forceDaemonSets { - var pendingDS []*appsv1.DaemonSet - for _, ds := range daemonsets { - if !runningDS[ds.UID] { - pendingDS = append(pendingDS, ds) - } - } - daemonPods, err := daemonset.GetDaemonSetPodsForNode(nodeInfo, pendingDS) - if err != nil { - return nil, errors.ToAutoscalerError(errors.InternalError, err) - } - for _, pod := range daemonPods { - nodeInfo.AddPod(pod) - } - } - return nodeInfo, nil -} diff --git a/cluster-autoscaler/simulator/nodes_test.go b/cluster-autoscaler/simulator/nodes_test.go deleted file mode 100644 index b931979de6cb..000000000000 --- a/cluster-autoscaler/simulator/nodes_test.go +++ /dev/null @@ -1,239 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ - -package simulator - -import ( - "strings" - "testing" - "time" - - "github.com/stretchr/testify/assert" - appsv1 "k8s.io/api/apps/v1" - apiv1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/autoscaler/cluster-autoscaler/utils/test" - "k8s.io/kubernetes/pkg/controller/daemon" -) - -func TestBuildNodeInfoForNode(t *testing.T) { - ds1 := &appsv1.DaemonSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: "ds1", - Namespace: "ds1-namespace", - UID: types.UID("ds1"), - }, - } - - ds2 := &appsv1.DaemonSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: "ds2", - Namespace: "ds2-namespace", - UID: types.UID("ds2"), - }, - } - - ds3 := &appsv1.DaemonSet{ - ObjectMeta: metav1.ObjectMeta{ - Name: "ds3", - Namespace: "ds3-namespace", - UID: types.UID("ds3"), - }, - Spec: appsv1.DaemonSetSpec{ - Template: apiv1.PodTemplateSpec{ - Spec: apiv1.PodSpec{ - NodeSelector: map[string]string{"key": "value"}, - }, - }, - }, - } - - testCases := []struct { - name string - node *apiv1.Node - pods []*apiv1.Pod - daemonSets []*appsv1.DaemonSet - forceDS bool - - wantPods []*apiv1.Pod - wantError bool - }{ - { - name: "node without any pods", - node: test.BuildTestNode("n", 1000, 10), - }, - { - name: "node with non-DS/mirror pods", - node: test.BuildTestNode("n", 1000, 10), - pods: []*apiv1.Pod{ - test.BuildScheduledTestPod("p1", 100, 1, "n"), - test.BuildScheduledTestPod("p2", 100, 1, "n"), - }, - }, - { - name: "node with a mirror pod", - node: test.BuildTestNode("n", 1000, 10), - pods: []*apiv1.Pod{ - test.SetMirrorPodSpec(test.BuildScheduledTestPod("p1", 100, 1, "n")), - }, - wantPods: []*apiv1.Pod{ - test.SetMirrorPodSpec(test.BuildScheduledTestPod("p1", 100, 1, "n")), - }, - }, - { - name: "node with a deleted mirror pod", - node: test.BuildTestNode("n", 1000, 10), - pods: []*apiv1.Pod{ - test.SetMirrorPodSpec(test.BuildScheduledTestPod("p1", 100, 1, "n")), - setDeletionTimestamp(test.SetMirrorPodSpec(test.BuildScheduledTestPod("p2", 100, 1, "n"))), - }, - wantPods: []*apiv1.Pod{ - test.SetMirrorPodSpec(test.BuildScheduledTestPod("p1", 100, 1, "n")), - }, - }, - { - name: "node with DS pods [forceDS=false, no daemon sets]", - node: test.BuildTestNode("n", 1000, 10), - pods: []*apiv1.Pod{ - buildDSPod(ds1, "n"), - setDeletionTimestamp(buildDSPod(ds2, "n")), - }, - wantPods: []*apiv1.Pod{ - buildDSPod(ds1, "n"), - }, - }, - { - name: "node with DS pods [forceDS=false, some daemon sets]", - node: test.BuildTestNode("n", 1000, 10), - pods: []*apiv1.Pod{ - buildDSPod(ds1, "n"), - setDeletionTimestamp(buildDSPod(ds2, "n")), - }, - daemonSets: []*appsv1.DaemonSet{ds1, ds2, ds3}, - wantPods: []*apiv1.Pod{ - buildDSPod(ds1, "n"), - }, - }, - { - name: "node with a DS pod [forceDS=true, no daemon sets]", - node: test.BuildTestNode("n", 1000, 10), - pods: []*apiv1.Pod{ - buildDSPod(ds1, "n"), - setDeletionTimestamp(buildDSPod(ds2, "n")), - }, - wantPods: []*apiv1.Pod{ - buildDSPod(ds1, "n"), - }, - forceDS: true, - }, - { - name: "node with a DS pod [forceDS=true, some daemon sets]", - node: test.BuildTestNode("n", 1000, 10), - pods: []*apiv1.Pod{ - buildDSPod(ds1, "n"), - setDeletionTimestamp(buildDSPod(ds2, "n")), - }, - daemonSets: []*appsv1.DaemonSet{ds1, ds2, ds3}, - forceDS: true, - wantPods: []*apiv1.Pod{ - buildDSPod(ds1, "n"), - buildDSPod(ds2, "n"), - }, - }, - { - name: "everything together [forceDS=false]", - node: test.BuildTestNode("n", 1000, 10), - pods: []*apiv1.Pod{ - test.BuildScheduledTestPod("p1", 100, 1, "n"), - 
test.BuildScheduledTestPod("p2", 100, 1, "n"), - test.SetMirrorPodSpec(test.BuildScheduledTestPod("p3", 100, 1, "n")), - setDeletionTimestamp(test.SetMirrorPodSpec(test.BuildScheduledTestPod("p4", 100, 1, "n"))), - buildDSPod(ds1, "n"), - setDeletionTimestamp(buildDSPod(ds2, "n")), - }, - daemonSets: []*appsv1.DaemonSet{ds1, ds2, ds3}, - wantPods: []*apiv1.Pod{ - test.SetMirrorPodSpec(test.BuildScheduledTestPod("p3", 100, 1, "n")), - buildDSPod(ds1, "n"), - }, - }, - { - name: "everything together [forceDS=true]", - node: test.BuildTestNode("n", 1000, 10), - pods: []*apiv1.Pod{ - test.BuildScheduledTestPod("p1", 100, 1, "n"), - test.BuildScheduledTestPod("p2", 100, 1, "n"), - test.SetMirrorPodSpec(test.BuildScheduledTestPod("p3", 100, 1, "n")), - setDeletionTimestamp(test.SetMirrorPodSpec(test.BuildScheduledTestPod("p4", 100, 1, "n"))), - buildDSPod(ds1, "n"), - setDeletionTimestamp(buildDSPod(ds2, "n")), - }, - daemonSets: []*appsv1.DaemonSet{ds1, ds2, ds3}, - forceDS: true, - wantPods: []*apiv1.Pod{ - test.SetMirrorPodSpec(test.BuildScheduledTestPod("p3", 100, 1, "n")), - buildDSPod(ds1, "n"), - buildDSPod(ds2, "n"), - }, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - nodeInfo, err := BuildNodeInfoForNode(tc.node, tc.pods, tc.daemonSets, tc.forceDS) - - if tc.wantError { - assert.Error(t, err) - } else { - assert.NoError(t, err) - assert.Equal(t, nodeInfo.Node(), tc.node) - - // clean pod metadata for comparison purposes - var wantPods, pods []*apiv1.Pod - for _, pod := range tc.wantPods { - wantPods = append(wantPods, cleanPodMetadata(pod)) - } - for _, podInfo := range nodeInfo.Pods() { - pods = append(pods, cleanPodMetadata(podInfo.Pod)) - } - assert.ElementsMatch(t, tc.wantPods, pods) - } - }) - } -} - -func cleanPodMetadata(pod *apiv1.Pod) *apiv1.Pod { - pod.Name = strings.Split(pod.Name, "-")[0] - pod.OwnerReferences = nil - return pod -} - -func buildDSPod(ds *appsv1.DaemonSet, nodeName string) *apiv1.Pod { - pod := daemon.NewPod(ds, nodeName) - pod.Name = ds.Name - ptrVal := true - pod.ObjectMeta.OwnerReferences = []metav1.OwnerReference{ - {Kind: "DaemonSet", UID: ds.UID, Controller: &ptrVal}, - } - return pod -} - -func setDeletionTimestamp(pod *apiv1.Pod) *apiv1.Pod { - now := metav1.NewTime(time.Now()) - pod.DeletionTimestamp = &now - return pod -} diff --git a/cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go b/cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go index 44b7ebf60d00..d5423777f711 100644 --- a/cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go +++ b/cluster-autoscaler/simulator/predicatechecker/schedulerbased_test.go @@ -24,6 +24,7 @@ import ( testconfig "k8s.io/autoscaler/cluster-autoscaler/config/test" "k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot" + "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" scheduler "k8s.io/autoscaler/cluster-autoscaler/utils/scheduler" . 
"k8s.io/autoscaler/cluster-autoscaler/utils/test" schedulermetrics "k8s.io/kubernetes/pkg/scheduler/metrics" @@ -147,7 +148,7 @@ func TestCheckPredicate(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var err error clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot() - err = clusterSnapshot.AddNodeWithPods(tt.node, tt.scheduledPods) + err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(tt.node, tt.scheduledPods...)) assert.NoError(t, err) predicateError := tt.predicateChecker.CheckPredicates(clusterSnapshot, tt.testPod, tt.node.Name) @@ -247,9 +248,9 @@ func TestFitsAnyNode(t *testing.T) { } clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot() - err = clusterSnapshot.AddNode(n1000) + err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(n1000)) assert.NoError(t, err) - err = clusterSnapshot.AddNode(n2000) + err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(n2000)) assert.NoError(t, err) for _, tc := range testCases { @@ -285,7 +286,7 @@ func TestDebugInfo(t *testing.T) { clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot() - err := clusterSnapshot.AddNode(node1) + err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node1)) assert.NoError(t, err) // with default predicate checker diff --git a/cluster-autoscaler/simulator/scheduling/hinting_simulator.go b/cluster-autoscaler/simulator/scheduling/hinting_simulator.go index 2287d28810e4..2f24bb8bf4ba 100644 --- a/cluster-autoscaler/simulator/scheduling/hinting_simulator.go +++ b/cluster-autoscaler/simulator/scheduling/hinting_simulator.go @@ -73,7 +73,7 @@ func (s *HintingSimulator) TrySchedulePods(clusterSnapshot clustersnapshot.Clust if nodeName != "" { klogx.V(4).UpTo(loggingQuota).Infof("Pod %s/%s can be moved to %s", pod.Namespace, pod.Name, nodeName) - if err := clusterSnapshot.AddPod(pod, nodeName); err != nil { + if err := clusterSnapshot.ForceAddPod(pod, nodeName); err != nil { return nil, 0, fmt.Errorf("simulating scheduling of %s/%s to %s return error; %v", pod.Namespace, pod.Name, nodeName, err) } statuses = append(statuses, Status{Pod: pod, NodeName: nodeName}) diff --git a/cluster-autoscaler/utils/daemonset/daemonset.go b/cluster-autoscaler/utils/daemonset/daemonset.go index 06236ae2443c..dbeab83ad96b 100644 --- a/cluster-autoscaler/utils/daemonset/daemonset.go +++ b/cluster-autoscaler/utils/daemonset/daemonset.go @@ -22,6 +22,7 @@ import ( appsv1 "k8s.io/api/apps/v1" apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" "k8s.io/kubernetes/pkg/controller/daemon" ) @@ -40,6 +41,10 @@ func GetDaemonSetPodsForNode(nodeInfo *framework.NodeInfo, daemonsets []*appsv1. 
if shouldRun { pod := daemon.NewPod(ds, nodeInfo.Node().Name) pod.Name = fmt.Sprintf("%s-pod-%d", ds.Name, rand.Int63()) + ptrVal := true + pod.ObjectMeta.OwnerReferences = []metav1.OwnerReference{ + {Kind: "DaemonSet", UID: ds.UID, Name: ds.Name, Controller: &ptrVal}, + } result = append(result, &framework.PodInfo{Pod: pod}) } } diff --git a/cluster-autoscaler/utils/scheduler/scheduler.go b/cluster-autoscaler/utils/scheduler/scheduler.go index 04a6e99e7af7..c63da3cbf437 100644 --- a/cluster-autoscaler/utils/scheduler/scheduler.go +++ b/cluster-autoscaler/utils/scheduler/scheduler.go @@ -23,7 +23,6 @@ import ( apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/autoscaler/cluster-autoscaler/simulator/framework" scheduler_config "k8s.io/kubernetes/pkg/scheduler/apis/config" scheduler_scheme "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme" @@ -79,27 +78,6 @@ func isHugePageResourceName(name apiv1.ResourceName) bool { return strings.HasPrefix(string(name), apiv1.ResourceHugePagesPrefix) } -// DeepCopyTemplateNode copies NodeInfo object used as a template. It changes -// names of UIDs of both node and pods running on it, so that copies can be used -// to represent multiple nodes. -func DeepCopyTemplateNode(nodeTemplate *framework.NodeInfo, suffix string) *framework.NodeInfo { - node := nodeTemplate.Node().DeepCopy() - node.Name = fmt.Sprintf("%s-%s", node.Name, suffix) - node.UID = uuid.NewUUID() - if node.Labels == nil { - node.Labels = make(map[string]string) - } - node.Labels["kubernetes.io/hostname"] = node.Name - nodeInfo := framework.NewNodeInfo(node, nil) - for _, podInfo := range nodeTemplate.Pods() { - pod := podInfo.Pod.DeepCopy() - pod.Name = fmt.Sprintf("%s-%s", podInfo.Pod.Name, suffix) - pod.UID = uuid.NewUUID() - nodeInfo.AddPod(&framework.PodInfo{Pod: pod}) - } - return nodeInfo -} - // ResourceToResourceList returns a resource list of the resource. func ResourceToResourceList(r *schedulerframework.Resource) apiv1.ResourceList { result := apiv1.ResourceList{