Skip to content

Commit

Permalink
DRA: rename ClusterSnapshot AddPod, RemovePod, RemoveNode
Browse files Browse the repository at this point in the history
RemoveNode is renamed to RemoveNodeInfo for consistency with other
NodeInfo methods.

For DRA, the snapshot will have to potentially allocate ResourceClaims
when adding a Pod to a Node, and deallocate them when removing a Pod
from a Node. This will happen in new methods added to ClusterSnapshot
in later commits - SchedulePod and UnschedulePod. These new methods
should be the "default" way of moving pods around the snapshot going
forward.

However, we'll still need to be able to add and remove pods from the
snapshot "forcefully" to handle some corner cases (e.g. expendable pods).
AddPod is renamed to ForceAddPod, and RemovePod to ForceRemovePod to
highlight that these are no longer the "default" methods of moving pods
around the snapshot, and are bypassing something important.
  • Loading branch information
towca committed Nov 18, 2024
1 parent 7a71ee4 commit e960edf
Show file tree
Hide file tree
Showing 11 changed files with 58 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/context"
core_utils "k8s.io/autoscaler/cluster-autoscaler/core/utils"
caerrors "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
klog "k8s.io/klog/v2"
"k8s.io/klog/v2"
)

type filterOutExpendable struct {
Expand Down Expand Up @@ -56,7 +56,7 @@ func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods
// CA logic from before migration to scheduler framework. So let's keep it for now
func (p *filterOutExpendable) addPreemptingPodsToSnapshot(pods []*apiv1.Pod, ctx *context.AutoscalingContext) error {
for _, p := range pods {
if err := ctx.ClusterSnapshot.AddPod(p, p.Status.NominatedNodeName); err != nil {
if err := ctx.ClusterSnapshot.ForceAddPod(p, p.Status.NominatedNodeName); err != nil {
klog.Errorf("Failed to update snapshot with pod %s/%s waiting for preemption: %v", p.Namespace, p.Name, err)
return caerrors.ToAutoscalerError(caerrors.InternalError, err)
}
Expand Down
10 changes: 5 additions & 5 deletions cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
allNodes = subtractNodesByName(allNodes, allRegisteredUpcoming)
// Remove the nodes from the snapshot as well so that the state is consistent.
for _, notStartedNodeName := range allRegisteredUpcoming {
err := a.ClusterSnapshot.RemoveNode(notStartedNodeName)
err := a.ClusterSnapshot.RemoveNodeInfo(notStartedNodeName)
if err != nil {
klog.Errorf("Failed to remove NotStarted node %s from cluster snapshot: %v", notStartedNodeName, err)
// ErrNodeNotFound shouldn't happen (so it needs to be logged above if it does), but what we care about here is that the
Expand Down Expand Up @@ -660,16 +660,16 @@ func (a *StaticAutoscaler) addUpcomingNodesToClusterSnapshot(upcomingCounts map[
nodeGroups := a.nodeGroupsById()
upcomingNodeGroups := make(map[string]int)
upcomingNodesFromUpcomingNodeGroups := 0
for nodeGroupName, upcomingNodes := range getUpcomingNodeInfos(upcomingCounts, nodeInfosForGroups) {
for nodeGroupName, upcomingNodeInfos := range getUpcomingNodeInfos(upcomingCounts, nodeInfosForGroups) {
nodeGroup := nodeGroups[nodeGroupName]
if nodeGroup == nil {
return fmt.Errorf("failed to find node group: %s", nodeGroupName)
}
isUpcomingNodeGroup := a.processors.AsyncNodeGroupStateChecker.IsUpcoming(nodeGroup)
for _, upcomingNode := range upcomingNodes {
err := a.ClusterSnapshot.AddNodeInfo(upcomingNode)
for _, upcomingNodeInfo := range upcomingNodeInfos {
err := a.ClusterSnapshot.AddNodeInfo(upcomingNodeInfo)
if err != nil {
return fmt.Errorf("failed to add upcoming node %s to cluster snapshot: %w", upcomingNode.Node().Name, err)
return fmt.Errorf("failed to add upcoming node %s to cluster snapshot: %w", upcomingNodeInfo.Node().Name, err)
}
if isUpcomingNodeGroup {
upcomingNodesFromUpcomingNodeGroups++
Expand Down
4 changes: 2 additions & 2 deletions cluster-autoscaler/estimator/binpacking_estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
"k8s.io/autoscaler/cluster-autoscaler/simulator/predicatechecker"
"k8s.io/autoscaler/cluster-autoscaler/utils/scheduler"
klog "k8s.io/klog/v2"
"k8s.io/klog/v2"
)

// BinpackingNodeEstimator estimates the number of needed nodes to handle the given amount of pods.
Expand Down Expand Up @@ -225,7 +225,7 @@ func (e *BinpackingNodeEstimator) tryToAddNode(
pod *apiv1.Pod,
nodeName string,
) error {
if err := e.clusterSnapshot.AddPod(pod, nodeName); err != nil {
if err := e.clusterSnapshot.ForceAddPod(pod, nodeName); err != nil {
return fmt.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", pod.Namespace, pod.Name, nodeName, err)
}
estimationState.newNodesWithPods[nodeName] = true
Expand Down
4 changes: 2 additions & 2 deletions cluster-autoscaler/simulator/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
"k8s.io/autoscaler/cluster-autoscaler/utils/tpu"

klog "k8s.io/klog/v2"
"k8s.io/klog/v2"
)

// NodeToBeRemoved contain information about a node that can be removed.
Expand Down Expand Up @@ -223,7 +223,7 @@ func (r *RemovalSimulator) findPlaceFor(removedNode string, pods []*apiv1.Pod, n

// remove pods from clusterSnapshot first
for _, pod := range pods {
if err := r.clusterSnapshot.RemovePod(pod.Namespace, pod.Name, removedNode); err != nil {
if err := r.clusterSnapshot.ForceRemovePod(pod.Namespace, pod.Name, removedNode); err != nil {
// just log error
klog.Errorf("Simulating removal of %s/%s return error; %v", pod.Namespace, pod.Name, err)
}
Expand Down
16 changes: 8 additions & 8 deletions cluster-autoscaler/simulator/clustersnapshot/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (data *internalBasicSnapshotData) addNode(node *apiv1.Node) error {
return nil
}

func (data *internalBasicSnapshotData) removeNode(nodeName string) error {
func (data *internalBasicSnapshotData) removeNodeInfo(nodeName string) error {
if _, found := data.nodeInfoMap[nodeName]; !found {
return ErrNodeNotFound
}
Expand Down Expand Up @@ -253,18 +253,18 @@ func (snapshot *BasicClusterSnapshot) SetClusterState(nodes []*apiv1.Node, sched
return nil
}

// RemoveNode removes nodes (and pods scheduled to it) from the snapshot.
func (snapshot *BasicClusterSnapshot) RemoveNode(nodeName string) error {
return snapshot.getInternalData().removeNode(nodeName)
// RemoveNodeInfo removes the node (and the pods scheduled to it) from the snapshot.
func (snapshot *BasicClusterSnapshot) RemoveNodeInfo(nodeName string) error {
return snapshot.getInternalData().removeNodeInfo(nodeName)
}

// AddPod adds pod to the snapshot and schedules it to given node.
func (snapshot *BasicClusterSnapshot) AddPod(pod *apiv1.Pod, nodeName string) error {
// ForceAddPod adds pod to the snapshot and schedules it to given node.
func (snapshot *BasicClusterSnapshot) ForceAddPod(pod *apiv1.Pod, nodeName string) error {
return snapshot.getInternalData().addPod(pod, nodeName)
}

// RemovePod removes pod from the snapshot.
func (snapshot *BasicClusterSnapshot) RemovePod(namespace, podName, nodeName string) error {
// ForceRemovePod removes pod from the snapshot.
func (snapshot *BasicClusterSnapshot) ForceRemovePod(namespace, podName, nodeName string) error {
return snapshot.getInternalData().removePod(namespace, podName, nodeName)
}

Expand Down
13 changes: 7 additions & 6 deletions cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,17 @@ type ClusterSnapshot interface {
// with the provided data. scheduledPods are correlated to their Nodes based on spec.NodeName.
SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error

// RemoveNode removes nodes (and pods scheduled to it) from the snapshot.
RemoveNode(nodeName string) error
// AddPod adds pod to the snapshot and schedules it to given node.
AddPod(pod *apiv1.Pod, nodeName string) error
// RemovePod removes pod from the snapshot.
RemovePod(namespace string, podName string, nodeName string) error
// ForceAddPod adds the given Pod to the Node with the given nodeName inside the snapshot.
ForceAddPod(pod *apiv1.Pod, nodeName string) error
// ForceRemovePod removes the given Pod (and all DRA objects it owns) from the snapshot.
ForceRemovePod(namespace string, podName string, nodeName string) error

// AddNodeInfo adds the given NodeInfo to the snapshot. The Node and the Pods are added, as well as
// any DRA objects passed along with them.
AddNodeInfo(nodeInfo *framework.NodeInfo) error
// RemoveNodeInfo removes the given NodeInfo from the snapshot. The Node and the Pods are removed, as well as
// any DRA objects owned by them.
RemoveNodeInfo(nodeName string) error
// GetNodeInfo returns an internal NodeInfo for a given Node - all information about the Node tracked in the snapshot.
// This means the Node itself, its scheduled Pods, as well as all relevant DRA objects. The internal NodeInfos
// obtained via this method should always be used in CA code instead of directly using *schedulerframework.NodeInfo.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func BenchmarkAddPods(b *testing.B) {
err := clusterSnapshot.SetClusterState(nodes, nil)
assert.NoError(b, err)
b.ResetTimer()
b.Run(fmt.Sprintf("%s: AddPod() 30*%d", snapshotName, tc), func(b *testing.B) {
b.Run(fmt.Sprintf("%s: ForceAddPod() 30*%d", snapshotName, tc), func(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()

Expand All @@ -143,7 +143,7 @@ func BenchmarkAddPods(b *testing.B) {
}
b.StartTimer()
for _, pod := range pods {
err = clusterSnapshot.AddPod(pod, pod.Spec.NodeName)
err = clusterSnapshot.ForceAddPod(pod, pod.Spec.NodeName)
if err != nil {
assert.NoError(b, err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,22 +115,22 @@ func validTestCases(t *testing.T) []modificationTestCase {
},
},
{
name: "remove node",
name: "remove nodeInfo",
state: snapshotState{
nodes: []*apiv1.Node{node},
},
op: func(snapshot ClusterSnapshot) {
err := snapshot.RemoveNode(node.Name)
err := snapshot.RemoveNodeInfo(node.Name)
assert.NoError(t, err)
},
},
{
name: "remove node, then add it back",
name: "remove nodeInfo, then add it back",
state: snapshotState{
nodes: []*apiv1.Node{node},
},
op: func(snapshot ClusterSnapshot) {
err := snapshot.RemoveNode(node.Name)
err := snapshot.RemoveNodeInfo(node.Name)
assert.NoError(t, err)

err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
Expand All @@ -141,14 +141,14 @@ func validTestCases(t *testing.T) []modificationTestCase {
},
},
{
name: "add pod, then remove node",
name: "add pod, then remove nodeInfo",
state: snapshotState{
nodes: []*apiv1.Node{node},
},
op: func(snapshot ClusterSnapshot) {
err := snapshot.AddPod(pod, node.Name)
err := snapshot.ForceAddPod(pod, node.Name)
assert.NoError(t, err)
err = snapshot.RemoveNode(node.Name)
err = snapshot.RemoveNodeInfo(node.Name)
assert.NoError(t, err)
},
},
Expand Down Expand Up @@ -326,7 +326,7 @@ func TestClear(t *testing.T) {
}

for _, pod := range extraPods {
err := snapshot.AddPod(pod, pod.Spec.NodeName)
err := snapshot.ForceAddPod(pod, pod.Spec.NodeName)
assert.NoError(t, err)
}

Expand All @@ -349,17 +349,17 @@ func TestNode404(t *testing.T) {
op func(ClusterSnapshot) error
}{
{"add pod", func(snapshot ClusterSnapshot) error {
return snapshot.AddPod(BuildTestPod("p1", 0, 0), "node")
return snapshot.ForceAddPod(BuildTestPod("p1", 0, 0), "node")
}},
{"remove pod", func(snapshot ClusterSnapshot) error {
return snapshot.RemovePod("default", "p1", "node")
return snapshot.ForceRemovePod("default", "p1", "node")
}},
{"get node", func(snapshot ClusterSnapshot) error {
_, err := snapshot.NodeInfos().Get("node")
return err
}},
{"remove node", func(snapshot ClusterSnapshot) error {
return snapshot.RemoveNode("node")
{"remove nodeInfo", func(snapshot ClusterSnapshot) error {
return snapshot.RemoveNodeInfo("node")
}},
}

Expand All @@ -385,7 +385,7 @@ func TestNode404(t *testing.T) {
snapshot.Fork()
assert.NoError(t, err)

err = snapshot.RemoveNode("node")
err = snapshot.RemoveNodeInfo("node")
assert.NoError(t, err)

// Node deleted after fork - shouldn't be able to operate on it.
Expand All @@ -408,7 +408,7 @@ func TestNode404(t *testing.T) {
err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
assert.NoError(t, err)

err = snapshot.RemoveNode("node")
err = snapshot.RemoveNodeInfo("node")
assert.NoError(t, err)

// Node deleted from base - shouldn't be able to operate on it.
Expand Down Expand Up @@ -625,7 +625,7 @@ func TestPVCUsedByPods(t *testing.T) {
assert.Equal(t, tc.exists, volumeExists)

if tc.removePod != "" {
err = snapshot.RemovePod("default", tc.removePod, "node")
err = snapshot.ForceRemovePod("default", tc.removePod, "node")
assert.NoError(t, err)

volumeExists = snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", tc.claimName))
Expand Down Expand Up @@ -698,7 +698,7 @@ func TestPVCClearAndFork(t *testing.T) {
volumeExists = snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim1"))
assert.Equal(t, true, volumeExists)

err = snapshot.AddPod(pod2, "node")
err = snapshot.ForceAddPod(pod2, "node")
assert.NoError(t, err)

volumeExists = snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim2"))
Expand Down
20 changes: 10 additions & 10 deletions cluster-autoscaler/simulator/clustersnapshot/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (data *internalDeltaSnapshotData) clearPodCaches() {
data.pvcNamespaceMap = nil
}

func (data *internalDeltaSnapshotData) removeNode(nodeName string) error {
func (data *internalDeltaSnapshotData) removeNodeInfo(nodeName string) error {
_, foundInDelta := data.addedNodeInfoMap[nodeName]
if foundInDelta {
// If node was added within this delta, delete this change.
Expand Down Expand Up @@ -296,12 +296,12 @@ func (data *internalDeltaSnapshotData) commit() (*internalDeltaSnapshotData, err
return data, nil
}
for node := range data.deletedNodeInfos {
if err := data.baseData.removeNode(node); err != nil {
if err := data.baseData.removeNodeInfo(node); err != nil {
return nil, err
}
}
for _, node := range data.modifiedNodeInfoMap {
if err := data.baseData.removeNode(node.Node().Name); err != nil {
if err := data.baseData.removeNodeInfo(node.Node().Name); err != nil {
return nil, err
}
if err := data.baseData.addNodeInfo(node); err != nil {
Expand Down Expand Up @@ -442,18 +442,18 @@ func (snapshot *DeltaClusterSnapshot) SetClusterState(nodes []*apiv1.Node, sched
return nil
}

// RemoveNode removes nodes (and pods scheduled to it) from the snapshot.
func (snapshot *DeltaClusterSnapshot) RemoveNode(nodeName string) error {
return snapshot.data.removeNode(nodeName)
// RemoveNodeInfo removes the node (and the pods scheduled to it) from the snapshot.
func (snapshot *DeltaClusterSnapshot) RemoveNodeInfo(nodeName string) error {
return snapshot.data.removeNodeInfo(nodeName)
}

// AddPod adds pod to the snapshot and schedules it to given node.
func (snapshot *DeltaClusterSnapshot) AddPod(pod *apiv1.Pod, nodeName string) error {
// ForceAddPod adds pod to the snapshot and schedules it to given node.
func (snapshot *DeltaClusterSnapshot) ForceAddPod(pod *apiv1.Pod, nodeName string) error {
return snapshot.data.addPod(pod, nodeName)
}

// RemovePod removes pod from the snapshot.
func (snapshot *DeltaClusterSnapshot) RemovePod(namespace, podName, nodeName string) error {
// ForceRemovePod removes pod from the snapshot.
func (snapshot *DeltaClusterSnapshot) ForceRemovePod(namespace, podName, nodeName string) error {
return snapshot.data.removePod(namespace, podName, nodeName)
}

Expand Down
5 changes: 3 additions & 2 deletions cluster-autoscaler/simulator/clustersnapshot/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"

apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
)
Expand All @@ -42,10 +43,10 @@ func InitializeClusterSnapshotOrDie(

for _, pod := range pods {
if pod.Spec.NodeName != "" {
err = snapshot.AddPod(pod, pod.Spec.NodeName)
err = snapshot.ForceAddPod(pod, pod.Spec.NodeName)
assert.NoError(t, err, "error while adding pod %s/%s to node %s", pod.Namespace, pod.Name, pod.Spec.NodeName)
} else if pod.Status.NominatedNodeName != "" {
err = snapshot.AddPod(pod, pod.Status.NominatedNodeName)
err = snapshot.ForceAddPod(pod, pod.Status.NominatedNodeName)
assert.NoError(t, err, "error while adding pod %s/%s to nominated node %s", pod.Namespace, pod.Name, pod.Status.NominatedNodeName)
} else {
assert.Fail(t, "pod %s/%s does not have Spec.NodeName nor Status.NominatedNodeName set", pod.Namespace, pod.Name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *HintingSimulator) TrySchedulePods(clusterSnapshot clustersnapshot.Clust

if nodeName != "" {
klogx.V(4).UpTo(loggingQuota).Infof("Pod %s/%s can be moved to %s", pod.Namespace, pod.Name, nodeName)
if err := clusterSnapshot.AddPod(pod, nodeName); err != nil {
if err := clusterSnapshot.ForceAddPod(pod, nodeName); err != nil {
return nil, 0, fmt.Errorf("simulating scheduling of %s/%s to %s return error; %v", pod.Namespace, pod.Name, nodeName, err)
}
statuses = append(statuses, Status{Pod: pod, NodeName: nodeName})
Expand Down

0 comments on commit e960edf

Please sign in to comment.