DRA: remove AddNode from ClusterSnapshot
AddNodeInfo already provides the same functionality, and has to be used
in production code in order to propagate DRA objects correctly.

Uses in production are replaced with SetClusterState(), which will later
take DRA objects into account. Uses in the test code are replaced with
AddNodeInfo().
towca committed Nov 18, 2024
1 parent 0e9cb91 commit 7a71ee4
Showing 12 changed files with 44 additions and 95 deletions.
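In short, callers that used to add nodes and pods to the snapshot one by one now either hand the whole cluster state to SetClusterState() (production code) or build a NodeInfo per node and call AddNodeInfo() (test code). Below is a minimal Go sketch of the two replacement patterns, based only on the calls visible in the diff that follows; the package name and the helper names buildSnapshot and addTestNodeInfo are illustrative and not part of the commit.

package example // illustrative package name, not part of the commit

import (
	apiv1 "k8s.io/api/core/v1"

	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
)

// buildSnapshot mirrors the production-side pattern: a single SetClusterState
// call replaces the former AddNode/AddPod loops.
func buildSnapshot(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) (clustersnapshot.ClusterSnapshot, error) {
	snapshot := clustersnapshot.NewBasicClusterSnapshot()
	if err := snapshot.SetClusterState(nodes, scheduledPods); err != nil {
		return nil, err
	}
	return snapshot, nil
}

// addTestNodeInfo mirrors the test-side pattern: AddNodeInfo with a test
// NodeInfo replaces AddNode plus per-pod AddPod calls.
func addTestNodeInfo(snapshot clustersnapshot.ClusterSnapshot, node *apiv1.Node, pods ...*apiv1.Pod) error {
	return snapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pods...))
}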
@@ -184,16 +184,12 @@ func TestFilterOutSchedulable(t *testing.T) {
 			allExpectedScheduledPods = append(allExpectedScheduledPods, tc.expectedScheduledPods...)
 
 			for node, pods := range tc.nodesWithPods {
-				err := clusterSnapshot.AddNode(node)
-				assert.NoError(t, err)
-
 				for _, pod := range pods {
 					pod.Spec.NodeName = node.Name
-					err = clusterSnapshot.AddPod(pod, node.Name)
-					assert.NoError(t, err)
-
 					allExpectedScheduledPods = append(allExpectedScheduledPods, pod)
 				}
+				err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pods...))
+				assert.NoError(t, err)
 			}
 
 			clusterSnapshot.Fork()
19 changes: 3 additions & 16 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -356,7 +356,6 @@ func (a *Actuator) taintNode(node *apiv1.Node) error {
 }
 
 func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterSnapshot, error) {
-	knownNodes := make(map[string]bool)
 	snapshot := clustersnapshot.NewBasicClusterSnapshot()
 	pods, err := a.ctx.AllPodLister().List()
 	if err != nil {
@@ -366,22 +365,10 @@ func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterSnapshot, error) {
 	scheduledPods := kube_util.ScheduledPods(pods)
 	nonExpendableScheduledPods := utils.FilterOutExpendablePods(scheduledPods, a.ctx.ExpendablePodsPriorityCutoff)
 
-	for _, node := range nodes {
-		if err := snapshot.AddNode(node); err != nil {
-			return nil, err
-		}
-
-		knownNodes[node.Name] = true
-	}
-
-	for _, pod := range nonExpendableScheduledPods {
-		if knownNodes[pod.Spec.NodeName] {
-			if err := snapshot.AddPod(pod, pod.Spec.NodeName); err != nil {
-				return nil, err
-			}
-		}
-	}
+	err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods)
+	if err != nil {
+		return nil, err
+	}
 
 	return snapshot, nil
 }

30 changes: 4 additions & 26 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -34,7 +34,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/planner"
 	scaledownstatus "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
 	"k8s.io/autoscaler/cluster-autoscaler/core/scaleup"
-	orchestrator "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
+	"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
 	core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
 	"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
 	"k8s.io/autoscaler/cluster-autoscaler/estimator"
@@ -58,7 +58,7 @@ import (
 
 	apiv1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/labels"
-	klog "k8s.io/klog/v2"
+	"k8s.io/klog/v2"
 )
 
 const (
@@ -242,28 +242,6 @@ func (a *StaticAutoscaler) cleanUpIfRequired() {
 	a.initialized = true
 }
 
-func (a *StaticAutoscaler) initializeClusterSnapshot(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) caerrors.AutoscalerError {
-	a.ClusterSnapshot.Clear()
-
-	knownNodes := make(map[string]bool)
-	for _, node := range nodes {
-		if err := a.ClusterSnapshot.AddNode(node); err != nil {
-			klog.Errorf("Failed to add node %s to cluster snapshot: %v", node.Name, err)
-			return caerrors.ToAutoscalerError(caerrors.InternalError, err)
-		}
-		knownNodes[node.Name] = true
-	}
-	for _, pod := range scheduledPods {
-		if knownNodes[pod.Spec.NodeName] {
-			if err := a.ClusterSnapshot.AddPod(pod, pod.Spec.NodeName); err != nil {
-				klog.Errorf("Failed to add pod %s scheduled to node %s to cluster snapshot: %v", pod.Name, pod.Spec.NodeName, err)
-				return caerrors.ToAutoscalerError(caerrors.InternalError, err)
-			}
-		}
-	}
-	return nil
-}
-
 func (a *StaticAutoscaler) initializeRemainingPdbTracker() caerrors.AutoscalerError {
 	a.RemainingPdbTracker.Clear()
 
@@ -361,8 +339,8 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerError {
 	}
 	nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(originalScheduledPods, a.ExpendablePodsPriorityCutoff)
 	// Initialize cluster state to ClusterSnapshot
-	if typedErr := a.initializeClusterSnapshot(allNodes, nonExpendableScheduledPods); typedErr != nil {
-		return typedErr.AddPrefix("failed to initialize ClusterSnapshot: ")
+	if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods); err != nil {
+		return caerrors.ToAutoscalerError(caerrors.InternalError, err).AddPrefix("failed to initialize ClusterSnapshot: ")
 	}
 	// Initialize Pod Disruption Budget tracking
 	if typedErr := a.initializeRemainingPdbTracker(); typedErr != nil {
6 changes: 4 additions & 2 deletions cluster-autoscaler/estimator/binpacking_estimator_test.go
@@ -214,7 +214,8 @@ func TestBinpackingEstimate(t *testing.T) {
 		t.Run(tc.name, func(t *testing.T) {
 			clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot()
 			// Add one node in different zone to trigger topology spread constraints
-			clusterSnapshot.AddNode(makeNode(100, 100, 10, "oldnode", "zone-jupiter"))
+			err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(makeNode(100, 100, 10, "oldnode", "zone-jupiter")))
+			assert.NoError(t, err)
 
 			predicateChecker, err := predicatechecker.NewTestPredicateChecker()
 			assert.NoError(t, err)
@@ -268,7 +269,8 @@ func BenchmarkBinpackingEstimate(b *testing.B) {
 
 	for i := 0; i < b.N; i++ {
 		clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot()
-		clusterSnapshot.AddNode(makeNode(100, 100, 10, "oldnode", "zone-jupiter"))
+		err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(makeNode(100, 100, 10, "oldnode", "zone-jupiter")))
+		assert.NoError(b, err)
 
 		predicateChecker, err := predicatechecker.NewTestPredicateChecker()
 		assert.NoError(b, err)
@@ -29,6 +29,7 @@ import (
 	"k8s.io/autoscaler/cluster-autoscaler/context"
 	podinjectionbackoff "k8s.io/autoscaler/cluster-autoscaler/processors/podinjection/backoff"
 	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
+	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
 	"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
 	. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
 )
@@ -112,10 +113,8 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
 		t.Run(tc.name, func(t *testing.T) {
 			p := NewPodInjectionPodListProcessor(podinjectionbackoff.NewFakePodControllerRegistry())
 			clusterSnapshot := clustersnapshot.NewDeltaClusterSnapshot()
-			clusterSnapshot.AddNode(node)
-			for _, pod := range tc.scheduledPods {
-				clusterSnapshot.AddPod(pod, node.Name)
-			}
+			err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.scheduledPods...))
+			assert.NoError(t, err)
 			ctx := context.AutoscalingContext{
 				AutoscalingKubeClients: context.AutoscalingKubeClients{
 					ListerRegistry: kubernetes.NewListerRegistry(nil, nil, nil, nil, nil, nil, jobLister, replicaSetLister, statefulsetLister),
9 changes: 2 additions & 7 deletions cluster-autoscaler/simulator/clustersnapshot/basic.go
@@ -238,26 +238,21 @@ func (snapshot *BasicClusterSnapshot) SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error {
 
 	knownNodes := make(map[string]bool)
 	for _, node := range nodes {
-		if err := snapshot.AddNode(node); err != nil {
+		if err := snapshot.getInternalData().addNode(node); err != nil {
 			return err
 		}
 		knownNodes[node.Name] = true
 	}
 	for _, pod := range scheduledPods {
 		if knownNodes[pod.Spec.NodeName] {
-			if err := snapshot.AddPod(pod, pod.Spec.NodeName); err != nil {
+			if err := snapshot.getInternalData().addPod(pod, pod.Spec.NodeName); err != nil {
 				return err
 			}
 		}
 	}
 	return nil
 }
 
-// AddNode adds node to the snapshot.
-func (snapshot *BasicClusterSnapshot) AddNode(node *apiv1.Node) error {
-	return snapshot.getInternalData().addNode(node)
-}
-
 // RemoveNode removes nodes (and pods scheduled to it) from the snapshot.
 func (snapshot *BasicClusterSnapshot) RemoveNode(nodeName string) error {
 	return snapshot.getInternalData().removeNode(nodeName)
@@ -34,8 +34,6 @@ type ClusterSnapshot interface {
 	// with the provided data. scheduledPods are correlated to their Nodes based on spec.NodeName.
 	SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error
 
-	// AddNode adds node to the snapshot.
-	AddNode(node *apiv1.Node) error
 	// RemoveNode removes nodes (and pods scheduled to it) from the snapshot.
 	RemoveNode(nodeName string) error
 	// AddPod adds pod to the snapshot and schedules it to given node.
@@ -23,6 +23,7 @@ import (
 
 	"github.com/stretchr/testify/assert"
 
+	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
 	. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
 
 	apiv1 "k8s.io/api/core/v1"
@@ -68,21 +69,21 @@ func assignPodsToNodes(pods []*apiv1.Pod, nodes []*apiv1.Node) {
 	}
 }
 
-func BenchmarkAddNode(b *testing.B) {
+func BenchmarkAddNodeInfo(b *testing.B) {
 	testCases := []int{1, 10, 100, 1000, 5000, 15000, 100000}
 
 	for snapshotName, snapshotFactory := range snapshots {
 		for _, tc := range testCases {
 			nodes := createTestNodes(tc)
 			clusterSnapshot := snapshotFactory()
 			b.ResetTimer()
-			b.Run(fmt.Sprintf("%s: AddNode() %d", snapshotName, tc), func(b *testing.B) {
+			b.Run(fmt.Sprintf("%s: AddNodeInfo() %d", snapshotName, tc), func(b *testing.B) {
 				for i := 0; i < b.N; i++ {
 					b.StopTimer()
 					clusterSnapshot.Clear()
 					b.StartTimer()
 					for _, node := range nodes {
-						err := clusterSnapshot.AddNode(node)
+						err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
 						if err != nil {
 							assert.NoError(b, err)
 						}
@@ -172,12 +173,12 @@ func BenchmarkForkAddRevert(b *testing.B) {
 			b.Run(fmt.Sprintf("%s: ForkAddRevert (%d nodes, %d pods)", snapshotName, ntc, ptc), func(b *testing.B) {
 				for i := 0; i < b.N; i++ {
 					clusterSnapshot.Fork()
-					err = clusterSnapshot.AddNode(tmpNode1)
+					err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(tmpNode1))
 					if err != nil {
 						assert.NoError(b, err)
 					}
 					clusterSnapshot.Fork()
-					err = clusterSnapshot.AddNode(tmpNode2)
+					err = clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(tmpNode2))
 					if err != nil {
 						assert.NoError(b, err)
 					}
@@ -217,7 +218,7 @@ func BenchmarkBuildNodeInfoList(b *testing.B) {
 			}
 			snapshot.Fork()
 			for _, node := range nodes[tc.nodeCount:] {
-				if err := snapshot.AddNode(node); err != nil {
+				if err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node)); err != nil {
 					assert.NoError(b, err)
 				}
 			}
@@ -94,9 +94,9 @@ func validTestCases(t *testing.T) []modificationTestCase {
 
 	testCases := []modificationTestCase{
 		{
-			name: "add node",
+			name: "add empty nodeInfo",
 			op: func(snapshot ClusterSnapshot) {
-				err := snapshot.AddNode(node)
+				err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
 				assert.NoError(t, err)
 			},
 			modifiedState: snapshotState{
@@ -133,7 +133,7 @@ func validTestCases(t *testing.T) []modificationTestCase {
 				err := snapshot.RemoveNode(node.Name)
 				assert.NoError(t, err)
 
-				err = snapshot.AddNode(node)
+				err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
 				assert.NoError(t, err)
 			},
 			modifiedState: snapshotState{
@@ -199,7 +199,7 @@ func TestForking(t *testing.T) {
 			tc.op(snapshot)
 			snapshot.Fork()
 
-			snapshot.AddNode(node)
+			snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
 
 			snapshot.Revert()
 			snapshot.Revert()
@@ -243,7 +243,7 @@ func TestForking(t *testing.T) {
 			snapshot.Fork()
 			tc.op(snapshot)
 			snapshot.Fork()
-			snapshot.AddNode(node)
+			snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
 			snapshot.Revert()
 			err := snapshot.Commit()
 			assert.NoError(t, err)
@@ -321,7 +321,7 @@ func TestClear(t *testing.T) {
 			snapshot.Fork()
 
 			for _, node := range extraNodes {
-				err := snapshot.AddNode(node)
+				err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
 				assert.NoError(t, err)
 			}
 
@@ -379,7 +379,7 @@ func TestNode404(t *testing.T) {
 			snapshot := snapshotFactory()
 
 			node := BuildTestNode("node", 10, 100)
-			err := snapshot.AddNode(node)
+			err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
 			assert.NoError(t, err)
 
 			snapshot.Fork()
@@ -405,7 +405,7 @@ func TestNode404(t *testing.T) {
 			snapshot := snapshotFactory()
 
 			node := BuildTestNode("node", 10, 100)
-			err := snapshot.AddNode(node)
+			err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
 			assert.NoError(t, err)
 
 			err = snapshot.RemoveNode("node")
@@ -428,9 +428,6 @@ func TestNodeAlreadyExists(t *testing.T) {
 		name string
 		op   func(ClusterSnapshot) error
 	}{
-		{"add node", func(snapshot ClusterSnapshot) error {
-			return snapshot.AddNode(node)
-		}},
 		{"add nodeInfo", func(snapshot ClusterSnapshot) error {
 			return snapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pod))
 		}},
@@ -442,7 +439,7 @@ func TestNodeAlreadyExists(t *testing.T) {
 			func(t *testing.T) {
 				snapshot := snapshotFactory()
 
-				err := snapshot.AddNode(node)
+				err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
 				assert.NoError(t, err)
 
 				// Node already in base.
@@ -454,7 +451,7 @@ func TestNodeAlreadyExists(t *testing.T) {
 			func(t *testing.T) {
 				snapshot := snapshotFactory()
 
-				err := snapshot.AddNode(node)
+				err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
 				assert.NoError(t, err)
 
 				snapshot.Fork()
@@ -471,7 +468,7 @@ func TestNodeAlreadyExists(t *testing.T) {
 
 				snapshot.Fork()
 
-				err := snapshot.AddNode(node)
+				err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
 				assert.NoError(t, err)
 
 				// Node already in fork.
@@ -484,7 +481,7 @@ func TestNodeAlreadyExists(t *testing.T) {
 
 				snapshot.Fork()
 
-				err := snapshot.AddNode(node)
+				err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
 				assert.NoError(t, err)
 
 				err = snapshot.Commit()
9 changes: 2 additions & 7 deletions cluster-autoscaler/simulator/clustersnapshot/delta.go
@@ -427,26 +427,21 @@ func (snapshot *DeltaClusterSnapshot) SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error {
 
 	knownNodes := make(map[string]bool)
 	for _, node := range nodes {
-		if err := snapshot.AddNode(node); err != nil {
+		if err := snapshot.data.addNode(node); err != nil {
 			return err
 		}
 		knownNodes[node.Name] = true
 	}
 	for _, pod := range scheduledPods {
 		if knownNodes[pod.Spec.NodeName] {
-			if err := snapshot.AddPod(pod, pod.Spec.NodeName); err != nil {
+			if err := snapshot.data.addPod(pod, pod.Spec.NodeName); err != nil {
 				return err
 			}
 		}
 	}
 	return nil
 }
 
-// AddNode adds node to the snapshot.
-func (snapshot *DeltaClusterSnapshot) AddNode(node *apiv1.Node) error {
-	return snapshot.data.addNode(node)
-}
-
 // RemoveNode removes nodes (and pods scheduled to it) from the snapshot.
 func (snapshot *DeltaClusterSnapshot) RemoveNode(nodeName string) error {
 	return snapshot.data.removeNode(nodeName)