
Merge pull request kubernetes#7466 from towca/jtuznik/dra-snapshot-cleanup

CA: refactor ClusterSnapshot methods
k8s-ci-robot authored Nov 21, 2024
2 parents 4c37ff3 + 473a1a8 commit 30e57c9
Showing 20 changed files with 199 additions and 287 deletions.
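
The diff below replaces the snapshot's piecemeal mutators (AddNode, AddNodes, AddPod, AddNodeWithPods, RemovePod, RemoveNode) with a smaller surface: SetClusterState for (re)initialization, AddNodeInfo/RemoveNodeInfo for whole-node changes, and ForceAddPod/ForceRemovePod for individual pods. As orientation before the hunks, here is a minimal sketch of the pattern callers converge on; the package and helper name are illustrative, only the ClusterSnapshot calls come from the diff.

package example

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
)

// buildSnapshot replaces the old pattern of looping over nodes with AddNode,
// tracking known node names, and then calling AddPod for every scheduled pod:
// the node and pod lists are handed to the snapshot in a single call.
func buildSnapshot(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) (clustersnapshot.ClusterSnapshot, error) {
	snapshot := clustersnapshot.NewBasicClusterSnapshot()
	if err := snapshot.SetClusterState(nodes, scheduledPods); err != nil {
		return nil, err
	}
	return snapshot, nil
}
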
@@ -23,7 +23,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/context"
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
caerrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
klog "k8s.io/klog/v2"
"k8s.io/klog/v2"
)

type filterOutExpendable struct {
@@ -56,7 +56,7 @@ func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods
// CA logic from before migration to scheduler framework. So let's keep it for now
func (p *filterOutExpendable) addPreemptingPodsToSnapshot(pods []*apiv1.Pod, ctx *context.AutoscalingContext) error {
for _, p := range pods {
- if err := ctx.ClusterSnapshot.AddPod(p, p.Status.NominatedNodeName); err != nil {
+ if err := ctx.ClusterSnapshot.ForceAddPod(p, p.Status.NominatedNodeName); err != nil {
klog.Errorf("Failed to update snapshot with pod %s/%s waiting for preemption: %v", p.Namespace, p.Name, err)
return caerrors.ToAutoscalerError(caerrors.InternalError, err)
}
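
AddPod becomes ForceAddPod with the same (pod, nodeName) arguments; the rename appears to mark call sites that place a pod onto a node without re-running scheduling checks — my reading of the prefix, not something stated in the diff. A minimal sketch of the call site above, with an illustrative helper name:

package example

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
)

// forceAddPreemptingPods mirrors addPreemptingPodsToSnapshot: each pod that is
// waiting for preemption is force-added to the node it nominated.
func forceAddPreemptingPods(snapshot clustersnapshot.ClusterSnapshot, pods []*apiv1.Pod) error {
	for _, p := range pods {
		if err := snapshot.ForceAddPod(p, p.Status.NominatedNodeName); err != nil {
			return err
		}
	}
	return nil
}
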
@@ -21,6 +21,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"

apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
@@ -109,7 +110,8 @@ func TestFilterOutExpendable(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
processor := NewFilterOutExpendablePodListProcessor()
snapshot := clustersnapshot.NewBasicClusterSnapshot()
- snapshot.AddNodes(tc.nodes)
+ err := snapshot.SetClusterState(tc.nodes, nil)
+ assert.NoError(t, err)

pods, err := processor.Process(&context.AutoscalingContext{
ClusterSnapshot: snapshot,
@@ -22,6 +22,7 @@ import (
"time"

"github.com/stretchr/testify/assert"

apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
@@ -183,16 +184,12 @@ func TestFilterOutSchedulable(t *testing.T) {
allExpectedScheduledPods = append(allExpectedScheduledPods, tc.expectedScheduledPods...)

for node, pods := range tc.nodesWithPods {
- err := clusterSnapshot.AddNode(node)
- assert.NoError(t, err)
-
for _, pod := range pods {
pod.Spec.NodeName = node.Name
- err = clusterSnapshot.AddPod(pod, node.Name)
- assert.NoError(t, err)
-
allExpectedScheduledPods = append(allExpectedScheduledPods, pod)
}
+ err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pods...))
+ assert.NoError(t, err)
}

clusterSnapshot.Fork()
@@ -286,15 +283,10 @@ func BenchmarkFilterOutSchedulable(b *testing.B) {
assert.NoError(b, err)

clusterSnapshot := snapshotFactory()
- if err := clusterSnapshot.AddNodes(nodes); err != nil {
+ if err := clusterSnapshot.SetClusterState(nodes, scheduledPods); err != nil {
assert.NoError(b, err)
}

- for _, pod := range scheduledPods {
- if err := clusterSnapshot.AddPod(pod, pod.Spec.NodeName); err != nil {
- assert.NoError(b, err)
- }
- }
b.ResetTimer()

for i := 0; i < b.N; i++ {
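
The test updates in this and the following files all use the same replacement: instead of AddNode followed by AddPod (or AddNodeWithPods), they build a framework.NodeInfo with NewTestNodeInfo and pass it to AddNodeInfo. A condensed sketch of that test pattern, with an illustrative helper name:

package example

import (
	"testing"

	"github.com/stretchr/testify/assert"

	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
)

// addNodeInfoForTest wraps a node and its pods in a test NodeInfo and adds
// them to the snapshot in one call, replacing the removed AddNode/AddPod loop.
func addNodeInfoForTest(t *testing.T, snapshot clustersnapshot.ClusterSnapshot, node *apiv1.Node, pods []*apiv1.Pod) {
	err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pods...))
	assert.NoError(t, err)
}
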
19 changes: 3 additions & 16 deletions cluster-autoscaler/core/scaledown/actuation/actuator.go
@@ -356,7 +356,6 @@ func (a *Actuator) taintNode(node *apiv1.Node) error {
}

func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterSnapshot, error) {
- knownNodes := make(map[string]bool)
snapshot := clustersnapshot.NewBasicClusterSnapshot()
pods, err := a.ctx.AllPodLister().List()
if err != nil {
@@ -366,22 +365,10 @@ func (a *Actuator) createSnapshot(nodes []*apiv1.Node) (clustersnapshot.ClusterS
scheduledPods := kube_util.ScheduledPods(pods)
nonExpendableScheduledPods := utils.FilterOutExpendablePods(scheduledPods, a.ctx.ExpendablePodsPriorityCutoff)

- for _, node := range nodes {
- if err := snapshot.AddNode(node); err != nil {
- return nil, err
- }
-
- knownNodes[node.Name] = true
- }
-
- for _, pod := range nonExpendableScheduledPods {
- if knownNodes[pod.Spec.NodeName] {
- if err := snapshot.AddPod(pod, pod.Spec.NodeName); err != nil {
- return nil, err
- }
- }
+ err = snapshot.SetClusterState(nodes, nonExpendableScheduledPods)
+ if err != nil {
return nil, err
}

return snapshot, nil
}

5 changes: 3 additions & 2 deletions cluster-autoscaler/core/scaledown/actuation/actuator_test.go
@@ -43,6 +43,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/observers/nodegroupchange"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig"
"k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups/asyncnodegroups"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/utilization"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
@@ -1159,7 +1160,7 @@ func TestStartDeletion(t *testing.T) {
csr := clusterstate.NewClusterStateRegistry(provider, clusterstate.ClusterStateRegistryConfig{}, ctx.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(config.NodeGroupAutoscalingOptions{MaxNodeProvisionTime: 15 * time.Minute}), asyncnodegroups.NewDefaultAsyncNodeGroupStateChecker())
for _, bucket := range emptyNodeGroupViews {
for _, node := range bucket.Nodes {
- err := ctx.ClusterSnapshot.AddNodeWithPods(node, tc.pods[node.Name])
+ err := ctx.ClusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.pods[node.Name]...))
if err != nil {
t.Fatalf("Couldn't add node %q to snapshot: %v", node.Name, err)
}
@@ -1171,7 +1172,7 @@
if !found {
t.Fatalf("Drain node %q doesn't have pods defined in the test case.", node.Name)
}
- err := ctx.ClusterSnapshot.AddNodeWithPods(node, pods)
+ err := ctx.ClusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, pods...))
if err != nil {
t.Fatalf("Couldn't add node %q to snapshot: %v", node.Name, err)
}
3 changes: 2 additions & 1 deletion cluster-autoscaler/core/scaledown/actuation/drain_test.go
@@ -37,6 +37,7 @@ import (
. "k8s.io/autoscaler/cluster-autoscaler/core/test"
"k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/utils/daemonset"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
@@ -612,7 +613,7 @@ func TestPodsToEvict(t *testing.T) {
t.Run(tn, func(t *testing.T) {
snapshot := clustersnapshot.NewBasicClusterSnapshot()
node := BuildTestNode("test-node", 1000, 1000)
- err := snapshot.AddNodeWithPods(node, tc.pods)
+ err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.pods...))
if err != nil {
t.Errorf("AddNodeWithPods unexpected error: %v", err)
}
6 changes: 1 addition & 5 deletions cluster-autoscaler/core/scaleup/orchestrator/orchestrator.go
@@ -569,11 +569,7 @@ func (o *ScaleUpOrchestrator) SchedulablePodGroups(
defer o.autoscalingContext.ClusterSnapshot.Revert()

// Add test node to snapshot.
- var allPods []*apiv1.Pod
- for _, podInfo := range nodeInfo.Pods() {
- allPods = append(allPods, podInfo.Pod)
- }
- if err := o.autoscalingContext.ClusterSnapshot.AddNodeWithPods(nodeInfo.Node(), allPods); err != nil {
+ if err := o.autoscalingContext.ClusterSnapshot.AddNodeInfo(nodeInfo); err != nil {
klog.Errorf("Error while adding test Node: %v", err)
return []estimator.PodEquivalenceGroup{}
}
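
Since the scale-up path already holds a framework.NodeInfo for the template node, the manual copy of its pods into an AddNodeWithPods call can be dropped and the NodeInfo handed to AddNodeInfo directly, inside the existing Fork/Revert window. A sketch of that pattern, assuming only the Fork, AddNodeInfo, and Revert calls shown in the diff; the helper name is illustrative:

package example

import (
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
)

// withTemplateNode forks the snapshot, adds the template NodeInfo, runs fn
// against the modified state, and reverts the fork when done.
func withTemplateNode(snapshot clustersnapshot.ClusterSnapshot, nodeInfo *framework.NodeInfo, fn func() error) error {
	snapshot.Fork()
	defer snapshot.Revert()
	if err := snapshot.AddNodeInfo(nodeInfo); err != nil {
		return err
	}
	return fn()
}
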
44 changes: 9 additions & 35 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -34,7 +34,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/core/scaledown/planner"
scaledownstatus "k8s.io/autoscaler/cluster-autoscaler/core/scaledown/status"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup"
orchestrator "k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
"k8s.io/autoscaler/cluster-autoscaler/core/scaleup/orchestrator"
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
"k8s.io/autoscaler/cluster-autoscaler/debuggingsnapshot"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
@@ -58,7 +58,7 @@ import (

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
klog "k8s.io/klog/v2"
"k8s.io/klog/v2"
)

const (
@@ -242,28 +242,6 @@ func (a *StaticAutoscaler) cleanUpIfRequired() {
a.initialized = true
}

- func (a *StaticAutoscaler) initializeClusterSnapshot(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) caerrors.AutoscalerError {
- a.ClusterSnapshot.Clear()
-
- knownNodes := make(map[string]bool)
- for _, node := range nodes {
- if err := a.ClusterSnapshot.AddNode(node); err != nil {
- klog.Errorf("Failed to add node %s to cluster snapshot: %v", node.Name, err)
- return caerrors.ToAutoscalerError(caerrors.InternalError, err)
- }
- knownNodes[node.Name] = true
- }
- for _, pod := range scheduledPods {
- if knownNodes[pod.Spec.NodeName] {
- if err := a.ClusterSnapshot.AddPod(pod, pod.Spec.NodeName); err != nil {
- klog.Errorf("Failed to add pod %s scheduled to node %s to cluster snapshot: %v", pod.Name, pod.Spec.NodeName, err)
- return caerrors.ToAutoscalerError(caerrors.InternalError, err)
- }
- }
- }
- return nil
- }
-
func (a *StaticAutoscaler) initializeRemainingPdbTracker() caerrors.AutoscalerError {
a.RemainingPdbTracker.Clear()

@@ -361,8 +339,8 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
}
nonExpendableScheduledPods := core_utils.FilterOutExpendablePods(originalScheduledPods, a.ExpendablePodsPriorityCutoff)
// Initialize cluster state to ClusterSnapshot
- if typedErr := a.initializeClusterSnapshot(allNodes, nonExpendableScheduledPods); typedErr != nil {
- return typedErr.AddPrefix("failed to initialize ClusterSnapshot: ")
+ if err := a.ClusterSnapshot.SetClusterState(allNodes, nonExpendableScheduledPods); err != nil {
+ return caerrors.ToAutoscalerError(caerrors.InternalError, err).AddPrefix("failed to initialize ClusterSnapshot: ")
}
// Initialize Pod Disruption Budget tracking
if typedErr := a.initializeRemainingPdbTracker(); typedErr != nil {
@@ -486,7 +464,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
allNodes = subtractNodesByName(allNodes, allRegisteredUpcoming)
// Remove the nodes from the snapshot as well so that the state is consistent.
for _, notStartedNodeName := range allRegisteredUpcoming {
- err := a.ClusterSnapshot.RemoveNode(notStartedNodeName)
+ err := a.ClusterSnapshot.RemoveNodeInfo(notStartedNodeName)
if err != nil {
klog.Errorf("Failed to remove NotStarted node %s from cluster snapshot: %v", notStartedNodeName, err)
// ErrNodeNotFound shouldn't happen (so it needs to be logged above if it does), but what we care about here is that the
@@ -682,20 +660,16 @@ func (a *StaticAutoscaler) addUpcomingNodesToClusterSnapshot(upcomingCounts map[
nodeGroups := a.nodeGroupsById()
upcomingNodeGroups := make(map[string]int)
upcomingNodesFromUpcomingNodeGroups := 0
- for nodeGroupName, upcomingNodes := range getUpcomingNodeInfos(upcomingCounts, nodeInfosForGroups) {
+ for nodeGroupName, upcomingNodeInfos := range getUpcomingNodeInfos(upcomingCounts, nodeInfosForGroups) {
nodeGroup := nodeGroups[nodeGroupName]
if nodeGroup == nil {
return fmt.Errorf("failed to find node group: %s", nodeGroupName)
}
isUpcomingNodeGroup := a.processors.AsyncNodeGroupStateChecker.IsUpcoming(nodeGroup)
- for _, upcomingNode := range upcomingNodes {
- var pods []*apiv1.Pod
- for _, podInfo := range upcomingNode.Pods() {
- pods = append(pods, podInfo.Pod)
- }
- err := a.ClusterSnapshot.AddNodeWithPods(upcomingNode.Node(), pods)
+ for _, upcomingNodeInfo := range upcomingNodeInfos {
+ err := a.ClusterSnapshot.AddNodeInfo(upcomingNodeInfo)
if err != nil {
return fmt.Errorf("Failed to add upcoming node %s to cluster snapshot: %w", upcomingNode.Node().Name, err)
return fmt.Errorf("failed to add upcoming node %s to cluster snapshot: %w", upcomingNodeInfo.Node().Name, err)
}
if isUpcomingNodeGroup {
upcomingNodesFromUpcomingNodeGroups++
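
Taken together, the static_autoscaler changes reduce snapshot handling in RunOnce to three calls: SetClusterState to load the live nodes and non-expendable scheduled pods, RemoveNodeInfo to drop registered-but-NotStarted nodes, and AddNodeInfo to inject upcoming nodes. A condensed sketch of that flow; the helper is illustrative, and error handling is simplified to returning the first failure, whereas the real code wraps and sometimes tolerates these errors:

package example

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
)

// syncSnapshot loads the cluster state, drops NotStarted nodes by name, and
// injects upcoming nodes as NodeInfos.
func syncSnapshot(snapshot clustersnapshot.ClusterSnapshot, nodes []*apiv1.Node, scheduledPods []*apiv1.Pod, notStartedNodeNames []string, upcomingNodeInfos []*framework.NodeInfo) error {
	if err := snapshot.SetClusterState(nodes, scheduledPods); err != nil {
		return err
	}
	for _, name := range notStartedNodeNames {
		if err := snapshot.RemoveNodeInfo(name); err != nil {
			return err
		}
	}
	for _, nodeInfo := range upcomingNodeInfos {
		if err := snapshot.AddNodeInfo(nodeInfo); err != nil {
			return err
		}
	}
	return nil
}
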
10 changes: 3 additions & 7 deletions cluster-autoscaler/estimator/binpacking_estimator.go
@@ -25,7 +25,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
klog "k8s.io/klog/v2"
"k8s.io/klog/v2"
)

// BinpackingNodeEstimator estimates the number of needed nodes to handle the given amount of pods.
@@ -211,11 +211,7 @@ func (e *BinpackingNodeEstimator) addNewNodeToSnapshot(
template *framework.NodeInfo,
) error {
newNodeInfo := scheduler.DeepCopyTemplateNode(template, fmt.Sprintf("e-%d", estimationState.newNodeNameIndex))
- var pods []*apiv1.Pod
- for _, podInfo := range newNodeInfo.Pods() {
- pods = append(pods, podInfo.Pod)
- }
- if err := e.clusterSnapshot.AddNodeWithPods(newNodeInfo.Node(), pods); err != nil {
+ if err := e.clusterSnapshot.AddNodeInfo(newNodeInfo); err != nil {
return err
}
estimationState.newNodeNameIndex++
@@ -229,7 +225,7 @@ func (e *BinpackingNodeEstimator) tryToAddNode(
pod *apiv1.Pod,
nodeName string,
) error {
- if err := e.clusterSnapshot.AddPod(pod, nodeName); err != nil {
+ if err := e.clusterSnapshot.ForceAddPod(pod, nodeName); err != nil {
return fmt.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", pod.Namespace, pod.Name, nodeName, err)
}
estimationState.newNodesWithPods[nodeName] = true
6 changes: 4 additions & 2 deletions cluster-autoscaler/estimator/binpacking_estimator_test.go
@@ -214,7 +214,8 @@ func TestBinpackingEstimate(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot()
// Add one node in different zone to trigger topology spread constraints
- clusterSnapshot.AddNode(makeNode(100, 100, 10, "oldnode", "zone-jupiter"))
+ err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(makeNode(100, 100, 10, "oldnode", "zone-jupiter")))
+ assert.NoError(t, err)

predicateChecker, err := predicatechecker.NewTestPredicateChecker()
assert.NoError(t, err)
@@ -268,7 +269,8 @@ func BenchmarkBinpackingEstimate(b *testing.B) {

for i := 0; i < b.N; i++ {
clusterSnapshot := clustersnapshot.NewBasicClusterSnapshot()
- clusterSnapshot.AddNode(makeNode(100, 100, 10, "oldnode", "zone-jupiter"))
+ err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(makeNode(100, 100, 10, "oldnode", "zone-jupiter")))
+ assert.NoError(b, err)

predicateChecker, err := predicatechecker.NewTestPredicateChecker()
assert.NoError(b, err)
@@ -29,6 +29,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/context"
podinjectionbackoff "k8s.io/autoscaler/cluster-autoscaler/processors/podinjection/backoff"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
)
@@ -112,10 +113,8 @@ func TestTargetCountInjectionPodListProcessor(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
p := NewPodInjectionPodListProcessor(podinjectionbackoff.NewFakePodControllerRegistry())
clusterSnapshot := clustersnapshot.NewDeltaClusterSnapshot()
- clusterSnapshot.AddNode(node)
- for _, pod := range tc.scheduledPods {
- clusterSnapshot.AddPod(pod, node.Name)
- }
+ err := clusterSnapshot.AddNodeInfo(framework.NewTestNodeInfo(node, tc.scheduledPods...))
+ assert.NoError(t, err)
ctx := context.AutoscalingContext{
AutoscalingKubeClients: context.AutoscalingKubeClients{
ListerRegistry: kubernetes.NewListerRegistry(nil, nil, nil, nil, nil, nil, jobLister, replicaSetLister, statefulsetLister),
4 changes: 2 additions & 2 deletions cluster-autoscaler/simulator/cluster.go
@@ -32,7 +32,7 @@ import (
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/tpu"

klog "k8s.io/klog/v2"
"k8s.io/klog/v2"
)

// NodeToBeRemoved contain information about a node that can be removed.
@@ -223,7 +223,7 @@ func (r *RemovalSimulator) findPlaceFor(removedNode string, pods []*apiv1.Pod, n

// remove pods from clusterSnapshot first
for _, pod := range pods {
- if err := r.clusterSnapshot.RemovePod(pod.Namespace, pod.Name, removedNode); err != nil {
+ if err := r.clusterSnapshot.ForceRemovePod(pod.Namespace, pod.Name, removedNode); err != nil {
// just log error
klog.Errorf("Simulating removal of %s/%s return error; %v", pod.Namespace, pod.Name, err)
}
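
RemovePod becomes ForceRemovePod, keeping the (namespace, name, nodeName) arguments. A minimal sketch of the simulation step above, which evicts a node's pods from the snapshot before the simulator tries to place them elsewhere; unlike the real code, which only logs the error, this version returns it, and the helper name is illustrative:

package example

import (
	apiv1 "k8s.io/api/core/v1"
	"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
)

// evictPodsFromSnapshot removes the given pods from the node whose removal is
// being simulated, freeing the simulator to schedule them onto other nodes.
func evictPodsFromSnapshot(snapshot clustersnapshot.ClusterSnapshot, removedNode string, pods []*apiv1.Pod) error {
	for _, pod := range pods {
		if err := snapshot.ForceRemovePod(pod.Namespace, pod.Name, removedNode); err != nil {
			return err
		}
	}
	return nil
}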