CA: refactor ClusterSnapshot methods #7466

Merged (6 commits) on Nov 21, 2024
Changes from all commits
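Read together, the hunks below retire the piecemeal snapshot mutators (AddNode, AddNodes, AddNodeWithPods, AddPod, RemovePod, RemoveNode) in favor of a smaller surface. The sketch below summarizes the post-refactor methods as the changed call sites use them; the exact signatures, parameter types, and the interface name are assumptions inferred from the diff, not copied from the real clustersnapshot package.

// Hypothetical summary of the ClusterSnapshot methods exercised by this PR's call sites.
package clustersnapshotsketch

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
)

type ClusterSnapshotSketch interface {
	// SetClusterState replaces the Clear + AddNode(s) + per-pod AddPod initialization loops.
	SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error
	// AddNodeInfo replaces AddNode/AddNodeWithPods; callers hand over a prebuilt NodeInfo.
	AddNodeInfo(nodeInfo *framework.NodeInfo) error
	// RemoveNodeInfo replaces RemoveNode.
	RemoveNodeInfo(nodeName string) error
	// ForceAddPod/ForceRemovePod replace AddPod/RemovePod; the Force prefix presumably
	// signals that the pod is placed or removed without re-running scheduling checks.
	ForceAddPod(pod *apiv1.Pod, nodeName string) error
	ForceRemovePod(namespace, name, nodeName string) error
}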
@@ -23,7 +23,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/context"
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
caerrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
klog "k8s.io/klog/v2"
"k8s.io/klog/v2"
)

type filterOutExpendable struct {
@@ -56,7 +56,7 @@ func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods
// CA logic from before migration to scheduler framework. So let's keep it for now
func (p *filterOutExpendable) addPreemptingPodsToSnapshot(pods []*apiv1.Pod, ctx *context.AutoscalingContext) error {
for _, p := range pods {
if err := ctx.ClusterSnapshot.AddPod(p, p.Status.NominatedNodeName); err != nil {
if err := ctx.ClusterSnapshot.ForceAddPod(p, p.Status.NominatedNodeName); err != nil {
klog.Errorf("Failed to update snapshot with pod %s/%s waiting for preemption: %v", p.Namespace, p.Name, err)
return caerrors.ToAutoscalerError(caerrors.InternalError, err)
}
@@ -21,6 +21,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"

apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
@@ -109,7 +110,8 @@ func TestFilterOutExpendable(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
processor := NewFilterOutExpendablePodListProcessor()
snapshot := clustersnapshot.NewBasicClusterSnapshot()
snapshot.AddNodes(tc.nodes)
err := snapshot.SetClusterState(tc.nodes, nil)
assert.NoError(t, err)

pods, err := processor.Process(&context.AutoscalingContext{
ClusterSnapshot: snapshot,
@@ -22,6 +22,7 @@ import (
"time"

"github.com/stretchr/testify/assert"

apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
@@ -183,16 +184,12 @@ func TestFilterOutSchedulable(t *testing.T) {
allExpectedScheduledPods = append(allExpectedScheduledPods, tc.expectedScheduledPods...)

for node, pods := range tc.nodesWithPods {
err := clusterSnapshot.AddNode(node)
assert.NoError(t, err)

for _, pod := range pods {
pod.Spec.NodeName = node.Name
err = clusterSnapshot.AddPod(pod, node.Name)
assert.NoError(t, err)

allExpectedScheduledPods = append(allExpectedScheduledPods, pod)
}
err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pods...))
assert.NoError(t, err)
}

clusterSnapshot.Fork()
@@ -286,15 +283,10 @@ func BenchmarkFilterOutSchedulable(b *testing.B) {
assert.NoError(b, err)

clusterSnapshot := snapshotFactory()
if err := clusterSnapshot.AddNodes(nodes); err != nil {
if err := clusterSnapshot.SetClusterState(nodes, scheduledPods); err != nil {
assert.NoError(b, err)
}

for _, pod := range scheduledPods {
if err := clusterSnapshot.AddPod(pod, pod.Spec.NodeName); err != nil {
assert.NoError(b, err)
}
}
b.ResetTimer()

for i := 0; i < b.N; i++ {
19 changes: 3 additions & 16 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -356,7 +356,7 @@ func (a *Actuator) taintNode(node *apiv1.Node) error {
}

func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterSnapshot, error) {
knownNodes := make(map[string]bool)
snapshot := clustersnapshot.NewBasicClusterSnapshot()
pods, err := a.ctx.AllPodLister().List()
if err != nil {
@@ -366,22 +365,10 @@ func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterS
scheduledPods := kube_util.ScheduledPods(pods)
nonExpendableScheduledPods := utils.FilterOutExpendablePods(scheduledPods, a.ctx.ExpendablePodsPriorityCutoff)

for _, node := range nodes {
if err := snapshot.AddNode(node); err != nil {
return nil, err
}

knownNodes[node.Name] = true
}

for _, pod := range nonExpendableScheduledPods {
if knownNodes[pod.Spec.NodeName] {
if err := snapshot.AddPod(pod, pod.Spec.NodeName); err != nil {
return nil, err
}
}
err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods)
if err != nil {
return nil, err
}

return snapshot, nil
}

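The code deleted here (and the near-identical initializeClusterSnapshot removed from static_autoscaler.go below) used to build a knownNodes set, add each node, and then add only the pods whose NodeName was in that set. SetClusterState presumably absorbs that behavior. The following is a purely illustrative stand-in for what the removed call sites relied on, not the actual implementation; NewTestNodeInfo is borrowed from the test hunks just to keep the sketch short.

// Hypothetical sketch of the behavior the removed loops delegate to SetClusterState.
package snapshotsketch

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
)

func setClusterStateSketch(s clustersnapshot.ClusterSnapshot, nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error {
	knownNodes := make(map[string]bool, len(nodes))
	for _, node := range nodes {
		if err := s.AddNodeInfo(framework.NewTestNodeInfo(node)); err != nil {
			return err
		}
		knownNodes[node.Name] = true
	}
	for _, pod := range scheduledPods {
		// Pods scheduled to nodes absent from the snapshot are skipped, mirroring
		// the knownNodes filter that createSnapshot performed before this PR.
		if knownNodes[pod.Spec.NodeName] {
			if err := s.ForceAddPod(pod, pod.Spec.NodeName); err != nil {
				return err
			}
		}
	}
	return nil
}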
5 changes: 3 additions & 2 deletions cluster-autoscaler/core/scaledown/actuation/actuator_test.go
@@ -43,6 +43,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups/asyncnodegroups"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
@@ -1159,7 +1160,7 @@ func TestStartDeletion(t *testing.T) {
csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
for _, bucket := range emptyNodeGroupViews {
for _, node := range bucket.Nodes {
err := ctx.ClusterSnapshot.AddNodeWithPods(node, tc.pods[node.Name])
err := ctx.ClusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.pods[node.Name]...))
if err != nil {
t.Fatalf("Couldn't add node %q to snapshot: %v", node.Name, err)
}
@@ -1171,7 +1172,7 @@ func TestStartDeletion(t *testing.T) {
if !found {
t.Fatalf("Drain node %q doesn't have pods defined in the test case.", node.Name)
}
err := ctx.ClusterSnapshot.AddNodeWithPods(node, pods)
err := ctx.ClusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pods...))
if err != nil {
t.Fatalf("Couldn't add node %q to snapshot: %v", node.Name, err)
}
3 changes: 2 additions & 1 deletion cluster-autoscaler/core/scaledown/actuation/drain_test.go
@@ -37,6 +37,7 @@ import (
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/utils/daemonset"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@@ -612,7 +613,7 @@ func TestPodsToEvict(t *testing.T) {
t.Run(tn, func(t *testing.T) {
snapshot := clustersnapshot.NewBasicClusterSnapshot()
node := BuildTestNode("test-node", 1000, 1000)
err := snapshot.AddNodeWithPods(node, tc.pods)
err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.pods...))
if err != nil {
t.Errorf("AddNodeWithPods unexpected error: %v", err)
}
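framework.NewTestNodeInfo, used throughout the updated tests, bundles a node and its pods into a single NodeInfo, so tests no longer call AddNode followed by AddPod per pod. A minimal, hypothetical usage sketch (the test name and resource values are made up; BuildTestNode/BuildTestPod are the helpers these test files already dot-import):

package snapshotsketch_test

import (
	"testing"

	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
	. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
)

func TestAddNodeInfoSketch(t *testing.T) {
	snapshot := clustersnapshot.NewBasicClusterSnapshot()
	node := BuildTestNode("sketch-node", 1000, 1000)
	pods := []*apiv1.Pod{BuildTestPod("sketch-pod", 100, 100)}
	// One AddNodeInfo call replaces the former AddNode + per-pod AddPod sequence.
	if err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pods...)); err != nil {
		t.Fatalf("AddNodeInfo unexpected error: %v", err)
	}
}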
6 changes: 1 addition & 5 deletions cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
@@ -569,11 +569,7 @@ func (o *ScaleUpOrchestrator) SchedulablePodGroups(
defer o.autoscalingContext.ClusterSnapshot.Revert()

// Add test node to snapshot.
var allPods []*apiv1.Pod
for _, podInfo := range nodeInfo.Pods() {
allPods = append(allPods, podInfo.Pod)
}
if err := o.autoscalingContext.ClusterSnapshot.AddNodeWithPods(nodeInfo.Node(), allPods); err != nil {
if err := o.autoscalingContext.ClusterSnapshot.AddNodeInfo(nodeInfo); err != nil {
klog.Errorf("Error while adding test Node: %v", err)
return []estimator.PodEquivalenceGroup{}
}
44 changes: 9 additions & 35 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -34,7 +34,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/planner"
scaledownstatus "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup"
orchestrator "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
@@ -58,7 +58,7 @@ import (

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
klog "k8s.io/klog/v2"
"k8s.io/klog/v2"
)

const (
@@ -242,28 +242,6 @@ func (a *StaticAutoscaler) cleanUpIfRequired() {
a.initialized = true
}

func (a *StaticAutoscaler) initializeClusterSnapshot(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) caerrors.AutoscalerError {
a.ClusterSnapshot.Clear()

knownNodes := make(map[string]bool)
for _, node := range nodes {
if err := a.ClusterSnapshot.AddNode(node); err != nil {
klog.Errorf("Failed to add node %s to cluster snapshot: %v", node.Name, err)
return caerrors.ToAutoscalerError(caerrors.InternalError, err)
}
knownNodes[node.Name] = true
}
for _, pod := range scheduledPods {
if knownNodes[pod.Spec.NodeName] {
if err := a.ClusterSnapshot.AddPod(pod, pod.Spec.NodeName); err != nil {
klog.Errorf("Failed to add pod %s scheduled to node %s to cluster snapshot: %v", pod.Name, pod.Spec.NodeName, err)
return caerrors.ToAutoscalerError(caerrors.InternalError, err)
}
}
}
return nil
}

func (a *StaticAutoscaler) initializeRemainingPdbTracker() caerrors.AutoscalerError {
a.RemainingPdbTracker.Clear()

@@ -361,8 +339,8 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
}
nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(originalScheduledPods, a.ExpendablePodsPriorityCutoff)
// Initialize cluster state to ClusterSnapshot
if typedErr := a.initializeClusterSnapshot(allNodes, nonExpendableScheduledPods); typedErr != nil {
return typedErr.AddPrefix("failed to initialize ClusterSnapshot: ")
if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods); err != nil {
return caerrors.ToAutoscalerError(caerrors.InternalError, err).AddPrefix("failed to initialize ClusterSnapshot: ")
}
// Initialize Pod Disruption Budget tracking
if typedErr := a.initializeRemainingPdbTracker(); typedErr != nil {
@@ -486,7 +464,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
allNodes = subtractNodesByName(allNodes, allRegisteredUpcoming)
// Remove the nodes from the snapshot as well so that the state is consistent.
for _, notStartedNodeName := range allRegisteredUpcoming {
err := a.ClusterSnapshot.RemoveNode(notStartedNodeName)
err := a.ClusterSnapshot.RemoveNodeInfo(notStartedNodeName)
if err != nil {
klog.Errorf("Failed to remove NotStarted node %s from cluster snapshot: %v", notStartedNodeName, err)
// ErrNodeNotFound shouldn't happen (so it needs to be logged above if it does), but what we care about here is that the
@@ -682,20 +660,16 @@ func (a *StaticAutoscaler) addUpcomingNodesToClusterSnapshot(upcomingCounts map[
nodeGroups := a.nodeGroupsById()
upcomingNodeGroups := make(map[string]int)
upcomingNodesFromUpcomingNodeGroups := 0
for nodeGroupName, upcomingNodes := range getUpcomingNodeInfos(upcomingCounts, nodeInfosForGroups) {
for nodeGroupName, upcomingNodeInfos := range getUpcomingNodeInfos(upcomingCounts, nodeInfosForGroups) {
nodeGroup := nodeGroups[nodeGroupName]
if nodeGroup == nil {
return fmt.Errorf("failed to find node group: %s", nodeGroupName)
}
isUpcomingNodeGroup := a.processors.AsyncNodeGroupStateChecker.IsUpcoming(nodeGroup)
for _, upcomingNode := range upcomingNodes {
var pods []*apiv1.Pod
for _, podInfo := range upcomingNode.Pods() {
pods = append(pods, podInfo.Pod)
}
err := a.ClusterSnapshot.AddNodeWithPods(upcomingNode.Node(), pods)
for _, upcomingNodeInfo := range upcomingNodeInfos {
err := a.ClusterSnapshot.AddNodeInfo(upcomingNodeInfo)
if err != nil {
return fmt.Errorf("Failed to add upcoming node %s to cluster snapshot: %w", upcomingNode.Node().Name, err)
return fmt.Errorf("failed to add upcoming node %s to cluster snapshot: %w", upcomingNodeInfo.Node().Name, err)
}
if isUpcomingNodeGroup {
upcomingNodesFromUpcomingNodeGroups++
10 changes: 3 additions & 7 deletions cluster-autoscaler/estimator/binpacking_estimator.go
@@ -25,7 +25,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
klog "k8s.io/klog/v2"
"k8s.io/klog/v2"
)

// BinpackingNodeEstimator estimates the number of needed nodes to handle the given amount of pods.
@@ -211,11 +211,7 @@ func (e *BinpackingNodeEstimator) addNewNodeToSnapshot(
template *framework.NodeInfo,
) error {
newNodeInfo := scheduler.DeepCopyTemplateNode(template, fmt.Sprintf("e-%d", estimationState.newNodeNameIndex))
var pods []*apiv1.Pod
for _, podInfo := range newNodeInfo.Pods() {
pods = append(pods, podInfo.Pod)
}
if err := e.clusterSnapshot.AddNodeWithPods(newNodeInfo.Node(), pods); err != nil {
if err := e.clusterSnapshot.AddNodeInfo(newNodeInfo); err != nil {
return err
}
estimationState.newNodeNameIndex++
@@ -229,7 +225,7 @@ func (e *BinpackingNodeEstimator) tryToAddNode(
pod *apiv1.Pod,
nodeName string,
) error {
if err := e.clusterSnapshot.AddPod(pod, nodeName); err != nil {
if err := e.clusterSnapshot.ForceAddPod(pod, nodeName); err != nil {
return fmt.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", pod.Namespace, pod.Name, nodeName, err)
}
estimationState.newNodesWithPods[nodeName] = true
6 changes: 4 additions & 2 deletions cluster-autoscaler/estimator/binpacking_estimator_test.go
@@ -214,7 +214,8 @@ func TestBinpackingEstimate(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot()
// Add one node in different zone to trigger topology spread constraints
clusterSnapshot.AddNode(makeNode(100, 100, 10, "oldnode", "zone-jupiter"))
err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(makeNode(100, 100, 10, "oldnode", "zone-jupiter")))
assert.NoError(t, err)

predicateChecker, err := predicatechecker.NewTestPredicateChecker()
assert.NoError(t, err)
@@ -268,7 +269,8 @@ func BenchmarkBinpackingEstimate(b *testing.B) {

for i := 0; i < b.N; i++ {
clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot()
clusterSnapshot.AddNode(makeNode(100, 100, 10, "oldnode", "zone-jupiter"))
err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(makeNode(100, 100, 10, "oldnode", "zone-jupiter")))
assert.NoError(b, err)

predicateChecker, err := predicatechecker.NewTestPredicateChecker()
assert.NoError(b, err)
@@ -29,6 +29,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/context"
podinjectionbackoff "k8s.io/autoscaler/cluster-autoscaler/processors/podinjection/backoff"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
)
@@ -112,10 +113,8 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
p := NewPodInjectionPodListProcessor(podinjectionbackoff.NewFakePodControllerRegistry())
clusterSnapshot := clustersnapshot.NewDeltaClusterSnapshot()
clusterSnapshot.AddNode(node)
for _, pod := range tc.scheduledPods {
clusterSnapshot.AddPod(pod, node.Name)
}
err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.scheduledPods...))
assert.NoError(t, err)
ctx := context.AutoscalingContext{
AutoscalingKubeClients: context.AutoscalingKubeClients{
ListerRegistry: kubernetes.NewListerRegistry(nil, nil, nil, nil, nil, nil, jobLister, replicaSetLister, statefulsetLister),
4 changes: 2 additions & 2 deletions cluster-autoscaler/simulator/cluster.go
@@ -32,7 +32,7 @@ import (
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/tpu"

klog "k8s.io/klog/v2"
"k8s.io/klog/v2"
)

// NodeToBeRemoved contain information about a node that can be removed.
@@ -223,7 +223,7 @@ func (r *RemovalSimulator) findPlaceFor(removedNode string, pods []*apiv1.Pod, n

// remove pods from clusterSnapshot first
for _, pod := range pods {
if err := r.clusterSnapshot.RemovePod(pod.Namespace, pod.Name, removedNode); err != nil {
if err := r.clusterSnapshot.ForceRemovePod(pod.Namespace, pod.Name, removedNode); err != nil {
// just log error
klog.Errorf("Simulating removal of %s/%s return error; %v", pod.Namespace, pod.Name, err)
}