Skip to content

Commit

Permalink
DRA: rename ClusterSnapshot AddPod, RemovePod, RemoveNode
Browse files Browse the repository at this point in the history
AddPod is renamed to SchedulePod, RemovePod to UnschedulePod. This makes
more sense in the DRA world as for DRA we're not only adding/removing
the pod, but also modifying its ResourceClaims - but not adding/removing
them (the ResourceClaims need to be tracked even for pods that aren't
scheduled).

RemoveNode is renamed to RemoveNodeInfo for consistency with other
NodeInfo methods.
  • Loading branch information
towca committed Nov 5, 2024
1 parent b1015ae commit c7d18df
Show file tree
Hide file tree
Showing 11 changed files with 65 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (p *filterOutExpendable) Process(context *context.AutoscalingContext, pods
// CA logic from before migration to scheduler framework. So let's keep it for now
func (p *filterOutExpendable) addPreemptingPodsToSnapshot(pods []*apiv1.Pod, ctx *context.AutoscalingContext) error {
for _, p := range pods {
if err := ctx.ClusterSnapshot.AddPod(p, p.Status.NominatedNodeName); err != nil {
if err := ctx.ClusterSnapshot.SchedulePod(p, p.Status.NominatedNodeName); err != nil {
klog.Errorf("Failed to update snapshot with pod %s/%s waiting for preemption: %v", p.Namespace, p.Name, err)
return caerrors.ToAutoscalerError(caerrors.InternalError, err)
}
Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
allNodes = subtractNodesByName(allNodes, allRegisteredUpcoming)
// Remove the nodes from the snapshot as well so that the state is consistent.
for _, notStartedNodeName := range allRegisteredUpcoming {
err := a.ClusterSnapshot.RemoveNode(notStartedNodeName)
err := a.ClusterSnapshot.RemoveNodeInfo(notStartedNodeName)
if err != nil {
klog.Errorf("Failed to remove NotStarted node %s from cluster snapshot: %v", notStartedNodeName, err)
// ErrNodeNotFound shouldn't happen (so it needs to be logged above if it does), but what we care about here is that the
Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/estimator/binpacking_estimator.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (e *BinpackingNodeEstimator) tryToAddNode(
pod *apiv1.Pod,
nodeName string,
) error {
if err := e.clusterSnapshot.AddPod(pod, nodeName); err != nil {
if err := e.clusterSnapshot.SchedulePod(pod, nodeName); err != nil {
return fmt.Errorf("Error adding pod %v.%v to node %v in ClusterSnapshot; %v", pod.Namespace, pod.Name, nodeName, err)
}
estimationState.newNodesWithPods[nodeName] = true
Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/simulator/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (r *RemovalSimulator) findPlaceFor(removedNode string, pods []*apiv1.Pod, n

// remove pods from clusterSnapshot first
for _, pod := range pods {
if err := r.clusterSnapshot.RemovePod(pod.Namespace, pod.Name, removedNode); err != nil {
if err := r.clusterSnapshot.UnschedulePod(pod.Namespace, pod.Name, removedNode); err != nil {
// just log error
klog.Errorf("Simulating removal of %s/%s return error; %v", pod.Namespace, pod.Name, err)
}
Expand Down
28 changes: 14 additions & 14 deletions cluster-autoscaler/simulator/clustersnapshot/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (data *internalBasicSnapshotData) addNode(node *apiv1.Node) error {
return nil
}

func (data *internalBasicSnapshotData) removeNode(nodeName string) error {
func (data *internalBasicSnapshotData) removeNodeInfo(nodeName string) error {
if _, found := data.nodeInfoMap[nodeName]; !found {
return ErrNodeNotFound
}
Expand All @@ -164,7 +164,7 @@ func (data *internalBasicSnapshotData) removeNode(nodeName string) error {
return nil
}

func (data *internalBasicSnapshotData) addPod(pod *apiv1.Pod, nodeName string) error {
func (data *internalBasicSnapshotData) schedulePod(pod *apiv1.Pod, nodeName string) error {
if _, found := data.nodeInfoMap[nodeName]; !found {
return ErrNodeNotFound
}
Expand All @@ -173,7 +173,7 @@ func (data *internalBasicSnapshotData) addPod(pod *apiv1.Pod, nodeName string) e
return nil
}

func (data *internalBasicSnapshotData) removePod(namespace, podName, nodeName string) error {
func (data *internalBasicSnapshotData) unschedulePod(namespace, podName, nodeName string) error {
nodeInfo, found := data.nodeInfoMap[nodeName]
if !found {
return ErrNodeNotFound
Expand Down Expand Up @@ -225,7 +225,7 @@ func (snapshot *BasicClusterSnapshot) AddNodeInfo(nodeInfo *framework.NodeInfo)
return err
}
for _, podInfo := range nodeInfo.Pods() {
if err := snapshot.getInternalData().addPod(podInfo.Pod, nodeInfo.Node().Name); err != nil {
if err := snapshot.getInternalData().schedulePod(podInfo.Pod, nodeInfo.Node().Name); err != nil {
return err
}
}
Expand All @@ -244,27 +244,27 @@ func (snapshot *BasicClusterSnapshot) Initialize(nodes []*apiv1.Node, scheduledP
}
for _, pod := range scheduledPods {
if knownNodes[pod.Spec.NodeName] {
if err := snapshot.getInternalData().addPod(pod, pod.Spec.NodeName); err != nil {
if err := snapshot.getInternalData().schedulePod(pod, pod.Spec.NodeName); err != nil {
return err
}
}
}
return nil
}

// RemoveNode removes nodes (and pods scheduled to it) from the snapshot.
func (snapshot *BasicClusterSnapshot) RemoveNode(nodeName string) error {
return snapshot.getInternalData().removeNode(nodeName)
// RemoveNodeInfo removes the node with the given name (and the pods scheduled to it) from the snapshot.
// It delegates to the current internal snapshot data and returns whatever error that removal reports;
// ErrNodeNotFound is returned when no node with that name is present.
func (snapshot *BasicClusterSnapshot) RemoveNodeInfo(nodeName string) error {
return snapshot.getInternalData().removeNodeInfo(nodeName)
}

// AddPod adds pod to the snapshot and schedules it to given node.
func (snapshot *BasicClusterSnapshot) AddPod(pod *apiv1.Pod, nodeName string) error {
return snapshot.getInternalData().addPod(pod, nodeName)
// SchedulePod adds the pod to the snapshot and schedules it to the node with the given name.
// It delegates to the current internal snapshot data; ErrNodeNotFound is returned when the
// target node is not present in the snapshot.
func (snapshot *BasicClusterSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) error {
return snapshot.getInternalData().schedulePod(pod, nodeName)
}

// RemovePod removes pod from the snapshot.
func (snapshot *BasicClusterSnapshot) RemovePod(namespace, podName, nodeName string) error {
return snapshot.getInternalData().removePod(namespace, podName, nodeName)
// UnschedulePod removes the pod identified by namespace and podName from the node with the
// given name inside the snapshot. It delegates to the current internal snapshot data;
// ErrNodeNotFound is returned when the node is not present in the snapshot.
func (snapshot *BasicClusterSnapshot) UnschedulePod(namespace, podName, nodeName string) error {
return snapshot.getInternalData().unschedulePod(namespace, podName, nodeName)
}

// IsPVCUsedByPods returns if the pvc is used by any pod
Expand Down
13 changes: 7 additions & 6 deletions cluster-autoscaler/simulator/clustersnapshot/clustersnapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,17 @@ type ClusterSnapshot interface {
// scheduled pods.
Initialize(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error

// RemoveNode removes nodes (and pods scheduled to it) from the snapshot.
RemoveNode(nodeName string) error
// AddPod adds pod to the snapshot and schedules it to given node.
AddPod(pod *apiv1.Pod, nodeName string) error
// RemovePod removes pod from the snapshot.
RemovePod(namespace string, podName string, nodeName string) error
// SchedulePod schedules the given Pod onto the Node with the given nodeName inside the snapshot.
SchedulePod(pod *apiv1.Pod, nodeName string) error
// UnschedulePod removes the given Pod from the given Node inside the snapshot.
UnschedulePod(namespace string, podName string, nodeName string) error

// AddNodeInfo adds the given NodeInfo to the snapshot. The Node and the Pods are added, as well as
// any DRA objects passed along them.
AddNodeInfo(nodeInfo *framework.NodeInfo) error
// RemoveNodeInfo removes the given NodeInfo from the snapshot. The Node and the Pods are removed, as well as
// any DRA objects owned by them.
RemoveNodeInfo(nodeName string) error
// GetNodeInfo returns an internal NodeInfo for a given Node - all information about the Node tracked in the snapshot.
// This means the Node itself, its scheduled Pods, as well as all relevant DRA objects. The internal NodeInfos
// obtained via this method should always be used in CA code instead of directly using *schedulerframework.NodeInfo.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func BenchmarkListNodeInfos(b *testing.B) {
}
}

func BenchmarkAddPods(b *testing.B) {
func BenchmarkSchedulePods(b *testing.B) {
testCases := []int{1, 10, 100, 1000, 5000, 15000}

for snapshotName, snapshotFactory := range snapshots {
Expand All @@ -132,7 +132,7 @@ func BenchmarkAddPods(b *testing.B) {
err := clusterSnapshot.Initialize(nodes, nil)
assert.NoError(b, err)
b.ResetTimer()
b.Run(fmt.Sprintf("%s: AddPod() 30*%d", snapshotName, tc), func(b *testing.B) {
b.Run(fmt.Sprintf("%s: SchedulePod() 30*%d", snapshotName, tc), func(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()

Expand All @@ -142,7 +142,7 @@ func BenchmarkAddPods(b *testing.B) {
}
b.StartTimer()
for _, pod := range pods {
err = clusterSnapshot.AddPod(pod, pod.Spec.NodeName)
err = clusterSnapshot.SchedulePod(pod, pod.Spec.NodeName)
if err != nil {
assert.NoError(b, err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,22 +115,22 @@ func validTestCases(t *testing.T) []modificationTestCase {
},
},
{
name: "remove node",
name: "remove nodeInfo",
state: snapshotState{
nodes: []*apiv1.Node{node},
},
op: func(snapshot ClusterSnapshot) {
err := snapshot.RemoveNode(node.Name)
err := snapshot.RemoveNodeInfo(node.Name)
assert.NoError(t, err)
},
},
{
name: "remove node, then add it back",
name: "remove nodeInfo, then add it back",
state: snapshotState{
nodes: []*apiv1.Node{node},
},
op: func(snapshot ClusterSnapshot) {
err := snapshot.RemoveNode(node.Name)
err := snapshot.RemoveNodeInfo(node.Name)
assert.NoError(t, err)

err = snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
Expand All @@ -141,14 +141,14 @@ func validTestCases(t *testing.T) []modificationTestCase {
},
},
{
name: "add pod, then remove node",
name: "add pod, then remove nodeInfo",
state: snapshotState{
nodes: []*apiv1.Node{node},
},
op: func(snapshot ClusterSnapshot) {
err := snapshot.AddPod(pod, node.Name)
err := snapshot.SchedulePod(pod, node.Name)
assert.NoError(t, err)
err = snapshot.RemoveNode(node.Name)
err = snapshot.RemoveNodeInfo(node.Name)
assert.NoError(t, err)
},
},
Expand Down Expand Up @@ -326,7 +326,7 @@ func TestClear(t *testing.T) {
}

for _, pod := range extraPods {
err := snapshot.AddPod(pod, pod.Spec.NodeName)
err := snapshot.SchedulePod(pod, pod.Spec.NodeName)
assert.NoError(t, err)
}

Expand All @@ -348,18 +348,18 @@ func TestNode404(t *testing.T) {
name string
op func(ClusterSnapshot) error
}{
{"add pod", func(snapshot ClusterSnapshot) error {
return snapshot.AddPod(BuildTestPod("p1", 0, 0), "node")
{"schedule pod", func(snapshot ClusterSnapshot) error {
return snapshot.SchedulePod(BuildTestPod("p1", 0, 0), "node")
}},
{"remove pod", func(snapshot ClusterSnapshot) error {
return snapshot.RemovePod("default", "p1", "node")
{"unschedule pod", func(snapshot ClusterSnapshot) error {
return snapshot.UnschedulePod("default", "p1", "node")
}},
{"get node", func(snapshot ClusterSnapshot) error {
_, err := snapshot.NodeInfos().Get("node")
return err
}},
{"remove node", func(snapshot ClusterSnapshot) error {
return snapshot.RemoveNode("node")
{"remove nodeInfo", func(snapshot ClusterSnapshot) error {
return snapshot.RemoveNodeInfo("node")
}},
}

Expand All @@ -385,7 +385,7 @@ func TestNode404(t *testing.T) {
snapshot.Fork()
assert.NoError(t, err)

err = snapshot.RemoveNode("node")
err = snapshot.RemoveNodeInfo("node")
assert.NoError(t, err)

// Node deleted after fork - shouldn't be able to operate on it.
Expand All @@ -408,7 +408,7 @@ func TestNode404(t *testing.T) {
err := snapshot.AddNodeInfo(framework.NewTestNodeInfo(node))
assert.NoError(t, err)

err = snapshot.RemoveNode("node")
err = snapshot.RemoveNodeInfo("node")
assert.NoError(t, err)

// Node deleted from base - shouldn't be able to operate on it.
Expand Down Expand Up @@ -625,7 +625,7 @@ func TestPVCUsedByPods(t *testing.T) {
assert.Equal(t, tc.exists, volumeExists)

if tc.removePod != "" {
err = snapshot.RemovePod("default", tc.removePod, "node")
err = snapshot.UnschedulePod("default", tc.removePod, "node")
assert.NoError(t, err)

volumeExists = snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", tc.claimName))
Expand Down Expand Up @@ -698,7 +698,7 @@ func TestPVCClearAndFork(t *testing.T) {
volumeExists = snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim1"))
assert.Equal(t, true, volumeExists)

err = snapshot.AddPod(pod2, "node")
err = snapshot.SchedulePod(pod2, "node")
assert.NoError(t, err)

volumeExists = snapshot.StorageInfos().IsPVCUsedByPods(schedulerframework.GetNamespacedName("default", "claim2"))
Expand Down
32 changes: 16 additions & 16 deletions cluster-autoscaler/simulator/clustersnapshot/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (data *internalDeltaSnapshotData) clearPodCaches() {
data.pvcNamespaceMap = nil
}

func (data *internalDeltaSnapshotData) removeNode(nodeName string) error {
func (data *internalDeltaSnapshotData) removeNodeInfo(nodeName string) error {
_, foundInDelta := data.addedNodeInfoMap[nodeName]
if foundInDelta {
// If node was added within this delta, delete this change.
Expand Down Expand Up @@ -227,7 +227,7 @@ func (data *internalDeltaSnapshotData) nodeInfoToModify(nodeName string) (*sched
return dni, true
}

func (data *internalDeltaSnapshotData) addPod(pod *apiv1.Pod, nodeName string) error {
func (data *internalDeltaSnapshotData) schedulePod(pod *apiv1.Pod, nodeName string) error {
ni, found := data.nodeInfoToModify(nodeName)
if !found {
return ErrNodeNotFound
Expand All @@ -240,7 +240,7 @@ func (data *internalDeltaSnapshotData) addPod(pod *apiv1.Pod, nodeName string) e
return nil
}

func (data *internalDeltaSnapshotData) removePod(namespace, name, nodeName string) error {
func (data *internalDeltaSnapshotData) unschedulePod(namespace, name, nodeName string) error {
// This always clones node info, even if the pod is actually missing.
// Not sure if we mind, since removing non-existent pod
// probably means things are very bad anyway.
Expand Down Expand Up @@ -296,12 +296,12 @@ func (data *internalDeltaSnapshotData) commit() (*internalDeltaSnapshotData, err
return data, nil
}
for node := range data.deletedNodeInfos {
if err := data.baseData.removeNode(node); err != nil {
if err := data.baseData.removeNodeInfo(node); err != nil {
return nil, err
}
}
for _, node := range data.modifiedNodeInfoMap {
if err := data.baseData.removeNode(node.Node().Name); err != nil {
if err := data.baseData.removeNodeInfo(node.Node().Name); err != nil {
return nil, err
}
if err := data.baseData.addNodeInfo(node); err != nil {
Expand Down Expand Up @@ -414,7 +414,7 @@ func (snapshot *DeltaClusterSnapshot) AddNodeInfo(nodeInfo *framework.NodeInfo)
return err
}
for _, podInfo := range nodeInfo.Pods() {
if err := snapshot.data.addPod(podInfo.Pod, nodeInfo.Node().Name); err != nil {
if err := snapshot.data.schedulePod(podInfo.Pod, nodeInfo.Node().Name); err != nil {
return err
}
}
Expand All @@ -433,27 +433,27 @@ func (snapshot *DeltaClusterSnapshot) Initialize(nodes []*apiv1.Node, scheduledP
}
for _, pod := range scheduledPods {
if knownNodes[pod.Spec.NodeName] {
if err := snapshot.data.addPod(pod, pod.Spec.NodeName); err != nil {
if err := snapshot.data.schedulePod(pod, pod.Spec.NodeName); err != nil {
return err
}
}
}
return nil
}

// RemoveNode removes nodes (and pods scheduled to it) from the snapshot.
func (snapshot *DeltaClusterSnapshot) RemoveNode(nodeName string) error {
return snapshot.data.removeNode(nodeName)
// RemoveNodeInfo removes the node with the given name (and the pods scheduled to it) from the snapshot.
// It delegates to the snapshot's current delta data and returns whatever error that removal reports.
func (snapshot *DeltaClusterSnapshot) RemoveNodeInfo(nodeName string) error {
return snapshot.data.removeNodeInfo(nodeName)
}

// AddPod adds pod to the snapshot and schedules it to given node.
func (snapshot *DeltaClusterSnapshot) AddPod(pod *apiv1.Pod, nodeName string) error {
return snapshot.data.addPod(pod, nodeName)
// SchedulePod adds the pod to the snapshot and schedules it to the node with the given name.
// It delegates to the snapshot's current delta data; ErrNodeNotFound is returned when the
// target node cannot be found for modification.
func (snapshot *DeltaClusterSnapshot) SchedulePod(pod *apiv1.Pod, nodeName string) error {
return snapshot.data.schedulePod(pod, nodeName)
}

// RemovePod removes pod from the snapshot.
func (snapshot *DeltaClusterSnapshot) RemovePod(namespace, podName, nodeName string) error {
return snapshot.data.removePod(namespace, podName, nodeName)
// UnschedulePod removes the pod identified by namespace and podName from the node with the
// given name inside the snapshot. It delegates to the snapshot's current delta data and
// returns whatever error that operation reports.
func (snapshot *DeltaClusterSnapshot) UnschedulePod(namespace, podName, nodeName string) error {
return snapshot.data.unschedulePod(namespace, podName, nodeName)
}

// IsPVCUsedByPods returns if the pvc is used by any pod
Expand Down
4 changes: 2 additions & 2 deletions cluster-autoscaler/simulator/clustersnapshot/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ func InitializeClusterSnapshotOrDie(

for _, pod := range pods {
if pod.Spec.NodeName != "" {
err = snapshot.AddPod(pod, pod.Spec.NodeName)
err = snapshot.SchedulePod(pod, pod.Spec.NodeName)
assert.NoError(t, err, "error while adding pod %s/%s to node %s", pod.Namespace, pod.Name, pod.Spec.NodeName)
} else if pod.Status.NominatedNodeName != "" {
err = snapshot.AddPod(pod, pod.Status.NominatedNodeName)
err = snapshot.SchedulePod(pod, pod.Status.NominatedNodeName)
assert.NoError(t, err, "error while adding pod %s/%s to nominated node %s", pod.Namespace, pod.Name, pod.Status.NominatedNodeName)
} else {
assert.Fail(t, "pod %s/%s does not have Spec.NodeName nor Status.NominatedNodeName set", pod.Namespace, pod.Name)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *HintingSimulator) TrySchedulePods(clusterSnapshot clustersnapshot.Clust

if nodeName != "" {
klogx.V(4).UpTo(loggingQuota).Infof("Pod %s/%s can be moved to %s", pod.Namespace, pod.Name, nodeName)
if err := clusterSnapshot.AddPod(pod, nodeName); err != nil {
if err := clusterSnapshot.SchedulePod(pod, nodeName); err != nil {
return nil, 0, fmt.Errorf("simulating scheduling of %s/%s to %s return error; %v", pod.Namespace, pod.Name, nodeName, err)
}
statuses = append(statuses, Status{Pod: pod, NodeName: nodeName})
Expand Down

0 comments on commit c7d18df

Please sign in to comment.