
CA: refactor ClusterSnapshot methods #7466

Merged (6 commits) on Nov 21, 2024

Changes from 1 commit

@@ -21,6 +21,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"

apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/context"
@@ -109,7 +110,8 @@ func TestFilterOutExpendable(t *testing.T) {
t.Run(tc.name, func(t *testing.T) {
processor := NewFilterOutExpendablePodListProcessor()
snapshot := clustersnapshot.NewBasicClusterSnapshot()
-snapshot.AddNodes(tc.nodes)
+err := snapshot.SetClusterState(tc.nodes, nil)
+assert.NoError(t, err)

pods, err := processor.Process(&context.AutoscalingContext{
ClusterSnapshot: snapshot,

@@ -22,6 +22,7 @@ import (
"time"

"github.com/stretchr/testify/assert"

apiv1 "k8s.io/api/core/v1"
"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
"k8s.io/autoscaler/cluster-autoscaler/simulator/framework"
@@ -286,15 +287,10 @@ func BenchmarkFilterOutSchedulable(b *testing.B) {
assert.NoError(b, err)

clusterSnapshot := snapshotFactory()
-if err := clusterSnapshot.AddNodes(nodes); err != nil {
+if err := clusterSnapshot.SetClusterState(nodes, scheduledPods); err != nil {
assert.NoError(b, err)
}

-for _, pod := range scheduledPods {
-	if err := clusterSnapshot.AddPod(pod, pod.Spec.NodeName); err != nil {
-		assert.NoError(b, err)
-	}
-}
b.ResetTimer()

for i := 0; i < b.N; i++ {

35 changes: 21 additions & 14 deletions cluster-autoscaler/simulator/clustersnapshot/basic.go
@@ -153,15 +153,6 @@ func (data *internalBasicSnapshotData) addNode(node *apiv1.Node) error {
return nil
}

-func (data *internalBasicSnapshotData) addNodes(nodes []*apiv1.Node) error {
-	for _, node := range nodes {
-		if err := data.addNode(node); err != nil {
-			return err
-		}
-	}
-	return nil
-}

func (data *internalBasicSnapshotData) removeNode(nodeName string) error {
if _, found := data.nodeInfoMap[nodeName]; !found {
return ErrNodeNotFound
@@ -241,16 +232,32 @@ func (snapshot *BasicClusterSnapshot) AddNodeInfo(nodeInfo *framework.NodeInfo)
return nil
}

// SetClusterState sets the cluster state.
func (snapshot *BasicClusterSnapshot) SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error {
snapshot.Clear()

[Review thread on snapshot.Clear()]

Contributor: Shouldn't we just remove Clear() from the interface at this point? It feels completely redundant. Initialize(nil, nil) should do exactly the same thing and, after renaming it, should also be a very obvious and natural way of doing that (not to mention that IIRC we never Clear() a snapshot without immediately re-initializing it, so the Clear() functionality is probably never needed).

Collaborator (author): Yup, full agreement here - removed Clear().

knownNodes := make(map[string]bool)
for _, node := range nodes {
if err := snapshot.AddNode(node); err != nil {

[Review thread on validating nodes and the knownNodes map]

Contributor: The only error condition for adding a node is if your []*apiv1.Node set has a duplicate. I wonder if there's a more efficient way of doing that targeted error handling earlier in the execution flow, so we don't have to do so much error handling at this point. It would also have the side benefit of allowing us to ditch the knownNodes accounting overhead in this function.

Collaborator (author): IMO this is the perfect place to validate this:

  • The alternative is for Initialize() to assume that some validation happened earlier and that its input is correct. This doesn't seem safe, as it relies on every Initialize() user properly validating the data first.
  • There are multiple places that call Initialize(), so ideally we'd want to extract the validation logic to remove redundancy anyway. If we're extracting it outside of Initialize(), we essentially have 2 functions that always need to be called in sequence.

Keep in mind that this should be called once per snapshot per loop, so the knownNodes overhead should be trivial compared to the rest of the loop.

Contributor: O.K. I think my initial skepticism here is about keeping the source of truth in two places:

  1. in the cluster snapshot data store
  2. in the local function (knownNodes)

I don't have any reason to suspect a race condition risk here, but would this be more "correct"?

	for _, pod := range scheduledPods {
		if err := snapshot.getInternalData().addPod(pod, pod.Spec.NodeName); err != nil {
			continue
		}
	}

The only error return condition for the addPod() method is:

	if _, found := data.nodeInfoMap[nodeName]; !found {
		return ErrNodeNotFound
	}

So rather than keeping a local accounting of "found nodes" (knownNodes) and conditionally calling addPod(), we could simply leverage the lookup in addPod(), which is equivalently expensive to checking knownNodes[pod.Spec.NodeName], and by doing so permit ourselves to drop the knownNodes local accounting.

return err
}
knownNodes[node.Name] = true
}
for _, pod := range scheduledPods {
if knownNodes[pod.Spec.NodeName] {

[Review thread on skipping pods whose node is not in the snapshot]

Contributor: This is a slight modification of the original AddNode -> AddPod usage. I think it's confusing that when a pod whose pod.Spec.NodeName is not present in nodes is provided, we skip it instead of returning an error. The original use cases in the codebase would return an error in such cases. What would you think about removing knownNodes from this method and letting snapshot.AddPod return ErrNodeNotFound?

Contributor: Ok, after additional review I see that it is meant to replace e.g. the static_autoscaler.go usage of AddNode, which ignores pods with unknown nodes. Do you think it's okay that we do it this way? I am worried that some places might depend on the error returned by AddPod.

Collaborator (author): Yeah, your second comment got it right - SetClusterState is intended to match/replace initializeClusterSnapshot from static_autoscaler.go. The direct replacement for AddNode is AddNodeInfo.

There are basically 2 usages in production code - StaticAutoscaler and Actuator. Both first filter the provided pods to only those which have nodeName set, so I'm pretty confident that this change is safe from the perspective of not changing behavior.

It's not ideal that each caller has to process the input pods in the same way, but I'd prefer to tackle this when we get to having unschedulable pods tracked in the snapshot. Then SetClusterState will just take a list of all Pods and split it between scheduled and unschedulable internally. If a pod doesn't have a nodeName set, it's just unschedulable - no error condition.

Contributor: That sounds good to me, especially given that we will be able to tackle it for unschedulable pods.

if err := snapshot.AddPod(pod, pod.Spec.NodeName); err != nil {
return err
}
}
}
return nil
}

// AddNode adds node to the snapshot.
func (snapshot *BasicClusterSnapshot) AddNode(node *apiv1.Node) error {
return snapshot.getInternalData().addNode(node)
}

-// AddNodes adds nodes in batch to the snapshot.
-func (snapshot *BasicClusterSnapshot) AddNodes(nodes []*apiv1.Node) error {
-	return snapshot.getInternalData().addNodes(nodes)
-}

// RemoveNode removes nodes (and pods scheduled to it) from the snapshot.
func (snapshot *BasicClusterSnapshot) RemoveNode(nodeName string) error {
return snapshot.getInternalData().removeNode(nodeName)
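The threads above explain the intended caller-side pattern: SetClusterState replaces the Clear() + AddNodes() + per-pod AddPod() sequence (e.g. initializeClusterSnapshot in static_autoscaler.go), and the production callers first filter the pod list down to pods already bound to a node. A minimal sketch of that pattern is below; the seedSnapshot helper, the package name, and the variable names are illustrative and not part of this PR.

	package example

	import (
		apiv1 "k8s.io/api/core/v1"
		"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
	)

	// seedSnapshot replaces the old Clear() + AddNodes() + AddPod() sequence with a
	// single SetClusterState call. Only pods already bound to a node are passed in,
	// since SetClusterState correlates pods to nodes via spec.NodeName.
	func seedSnapshot(snapshot clustersnapshot.ClusterSnapshot, nodes []*apiv1.Node, pods []*apiv1.Pod) error {
		scheduled := make([]*apiv1.Pod, 0, len(pods))
		for _, pod := range pods {
			if pod.Spec.NodeName != "" {
				scheduled = append(scheduled, pod)
			}
		}
		return snapshot.SetClusterState(nodes, scheduled)
	}

Pods without spec.NodeName are simply left out of the call, matching the behavior discussed above where an unscheduled pod is not an error condition for the snapshot.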

@@ -29,10 +29,13 @@ import (
// It exposes mutation methods and can be viewed as scheduler's SharedLister.
type ClusterSnapshot interface {
schedulerframework.SharedLister

// SetClusterState resets the snapshot to an unforked state and replaces the contents of the snapshot
// with the provided data. scheduledPods are correlated to their Nodes based on spec.NodeName.
SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error

// AddNode adds node to the snapshot.
AddNode(node *apiv1.Node) error
-// AddNodes adds nodes to the snapshot.
-AddNodes(nodes []*apiv1.Node) error
// RemoveNode removes nodes (and pods scheduled to it) from the snapshot.
RemoveNode(nodeName string) error
// AddPod adds pod to the snapshot and schedules it to given node.
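As a usage illustration of the new interface method (a sketch only; it reuses the BuildTestNode/BuildTestPod helpers from k8s.io/autoscaler/cluster-autoscaler/utils/test, and the names and resource values are arbitrary): pods are matched to nodes through spec.NodeName, and calling SetClusterState(nil, nil) leaves the snapshot empty and unforked, which is what makes the separate Clear() method redundant, as noted in the review thread above.

	package example

	import (
		apiv1 "k8s.io/api/core/v1"
		"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
		. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
	)

	func resetExample() error {
		snapshot := clustersnapshot.NewBasicClusterSnapshot()

		node := BuildTestNode("n1", 1000, 1000000)
		pod := BuildTestPod("p1", 100, 100000)
		pod.Spec.NodeName = node.Name // correlated to the node via spec.NodeName

		// One call replaces the whole snapshot contents.
		if err := snapshot.SetClusterState([]*apiv1.Node{node}, []*apiv1.Pod{pod}); err != nil {
			return err
		}
		// Re-seeding with empty inputs resets the snapshot to an empty, unforked state,
		// covering the role the removed Clear() method used to play.
		return snapshot.SetClusterState(nil, nil)
	}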

@@ -22,6 +22,7 @@ import (
"time"

"github.com/stretchr/testify/assert"

. "k8s.io/autoscaler/cluster-autoscaler/utils/test"

apiv1 "k8s.io/api/core/v1"
@@ -67,7 +68,7 @@ func assignPodsToNodes(pods []*apiv1.Pod, nodes []*apiv1.Node) {
}
}

-func BenchmarkAddNodes(b *testing.B) {
+func BenchmarkAddNode(b *testing.B) {
testCases := []int{1, 10, 100, 1000, 5000, 15000, 100000}

for snapshotName, snapshotFactory := range snapshots {
@@ -90,24 +91,6 @@
})
}
}
-	for snapshotName, snapshotFactory := range snapshots {
-		for _, tc := range testCases {
-			nodes := createTestNodes(tc)
-			clusterSnapshot := snapshotFactory()
-			b.ResetTimer()
-			b.Run(fmt.Sprintf("%s: AddNodes() %d", snapshotName, tc), func(b *testing.B) {
-				for i := 0; i < b.N; i++ {
-					b.StopTimer()
-					clusterSnapshot.Clear()
-					b.StartTimer()
-					err := clusterSnapshot.AddNodes(nodes)
-					if err != nil {
-						assert.NoError(b, err)
-					}
-				}
-			})
-		}
-	}
}

func BenchmarkListNodeInfos(b *testing.B) {
@@ -117,7 +100,7 @@ func BenchmarkListNodeInfos(b *testing.B) {
for _, tc := range testCases {
nodes := createTestNodes(tc)
clusterSnapshot := snapshotFactory()
-err := clusterSnapshot.AddNodes(nodes)
+err := clusterSnapshot.SetClusterState(nodes, nil)
if err != nil {
assert.NoError(b, err)
}
@@ -142,19 +125,18 @@ func BenchmarkAddPods(b *testing.B) {

for snapshotName, snapshotFactory := range snapshots {
for _, tc := range testCases {
-clusterSnapshot := snapshotFactory()
nodes := createTestNodes(tc)
-err := clusterSnapshot.AddNodes(nodes)
-assert.NoError(b, err)
pods := createTestPods(tc * 30)
assignPodsToNodes(pods, nodes)
+clusterSnapshot := snapshotFactory()
+err := clusterSnapshot.SetClusterState(nodes, nil)
+assert.NoError(b, err)
b.ResetTimer()
b.Run(fmt.Sprintf("%s: AddPod() 30*%d", snapshotName, tc), func(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
clusterSnapshot.Clear()

-err = clusterSnapshot.AddNodes(nodes)
+err = clusterSnapshot.SetClusterState(nodes, nil)
if err != nil {
assert.NoError(b, err)
}
@@ -182,12 +164,8 @@ func BenchmarkForkAddRevert(b *testing.B) {
pods := createTestPods(ntc * ptc)
assignPodsToNodes(pods, nodes)
clusterSnapshot := snapshotFactory()
-err := clusterSnapshot.AddNodes(nodes)
+err := clusterSnapshot.SetClusterState(nodes, pods)
assert.NoError(b, err)
-for _, pod := range pods {
-	err = clusterSnapshot.AddPod(pod, pod.Spec.NodeName)
-	assert.NoError(b, err)
-}
tmpNode1 := BuildTestNode("tmp-1", 2000, 2000000)
tmpNode2 := BuildTestNode("tmp-2", 2000, 2000000)
b.ResetTimer()
@@ -234,12 +212,14 @@ func BenchmarkBuildNodeInfoList(b *testing.B) {
b.Run(fmt.Sprintf("fork add 1000 to %d", tc.nodeCount), func(b *testing.B) {
nodes := createTestNodes(tc.nodeCount + 1000)
snapshot := NewDeltaClusterSnapshot()
-if err := snapshot.AddNodes(nodes[:tc.nodeCount]); err != nil {
+if err := snapshot.SetClusterState(nodes[:tc.nodeCount], nil); err != nil {
assert.NoError(b, err)
}
snapshot.Fork()
-if err := snapshot.AddNodes(nodes[tc.nodeCount:]); err != nil {
-	assert.NoError(b, err)
+for _, node := range nodes[tc.nodeCount:] {
+	if err := snapshot.AddNode(node); err != nil {
+		assert.NoError(b, err)
+	}
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
@@ -254,7 +234,7 @@
b.Run(fmt.Sprintf("base %d", tc.nodeCount), func(b *testing.B) {
nodes := createTestNodes(tc.nodeCount)
snapshot := NewDeltaClusterSnapshot()
-if err := snapshot.AddNodes(nodes); err != nil {
+if err := snapshot.SetClusterState(nodes, nil); err != nil {
assert.NoError(b, err)
}
b.ResetTimer()

@@ -75,12 +75,8 @@ func getSnapshotState(t *testing.T, snapshot ClusterSnapshot) snapshotState {

func startSnapshot(t *testing.T, snapshotFactory func() ClusterSnapshot, state snapshotState) ClusterSnapshot {
snapshot := snapshotFactory()
-err := snapshot.AddNodes(state.nodes)
+err := snapshot.SetClusterState(state.nodes, state.pods)
assert.NoError(t, err)
-for _, pod := range state.pods {
-	err := snapshot.AddPod(pod, pod.Spec.NodeName)
-	assert.NoError(t, err)
-}
return snapshot
}

@@ -324,8 +320,10 @@ func TestClear(t *testing.T) {

snapshot.Fork()

-err := snapshot.AddNodes(extraNodes)
-assert.NoError(t, err)
+for _, node := range extraNodes {
+	err := snapshot.AddNode(node)
+	assert.NoError(t, err)
+}

for _, pod := range extraPods {
err := snapshot.AddPod(pod, pod.Spec.NodeName)
@@ -340,7 +338,6 @@

// Clear() should break out of forked state.
snapshot.Fork()
-assert.NoError(t, err)
})
}
}

36 changes: 21 additions & 15 deletions cluster-autoscaler/simulator/clustersnapshot/delta.go
@@ -136,16 +136,6 @@ func (data *internalDeltaSnapshotData) buildNodeInfoList() []*schedulerframework
return nodeInfoList
}

-// Convenience method to avoid writing loop for adding nodes.
-func (data *internalDeltaSnapshotData) addNodes(nodes []*apiv1.Node) error {
-	for _, node := range nodes {
-		if err := data.addNode(node); err != nil {
-			return err
-		}
-	}
-	return nil
-}

func (data *internalDeltaSnapshotData) addNode(node *apiv1.Node) error {
nodeInfo := schedulerframework.NewNodeInfo()
nodeInfo.SetNode(node)
@@ -431,16 +421,32 @@ func (snapshot *DeltaClusterSnapshot) AddNodeInfo(nodeInfo *framework.NodeInfo)
return nil
}

// SetClusterState sets the cluster state.
func (snapshot *DeltaClusterSnapshot) SetClusterState(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error {
snapshot.Clear()

knownNodes := make(map[string]bool)
for _, node := range nodes {
if err := snapshot.AddNode(node); err != nil {
return err
}
knownNodes[node.Name] = true
}
for _, pod := range scheduledPods {
if knownNodes[pod.Spec.NodeName] {
if err := snapshot.AddPod(pod, pod.Spec.NodeName); err != nil {
return err
}
}
}
return nil
}

// AddNode adds node to the snapshot.
func (snapshot *DeltaClusterSnapshot) AddNode(node *apiv1.Node) error {
return snapshot.data.addNode(node)
}

-// AddNodes adds nodes in batch to the snapshot.
-func (snapshot *DeltaClusterSnapshot) AddNodes(nodes []*apiv1.Node) error {
-	return snapshot.data.addNodes(nodes)
-}

// RemoveNode removes nodes (and pods scheduled to it) from the snapshot.
func (snapshot *DeltaClusterSnapshot) RemoveNode(nodeName string) error {
return snapshot.data.removeNode(nodeName)
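DeltaClusterSnapshot gets the same SetClusterState behavior as BasicClusterSnapshot, so code written against the ClusterSnapshot interface can seed either implementation identically; this is the pattern the benchmarks above rely on when iterating over a map of snapshot factories. A rough sketch of that interface-level usage follows; the factory map and the seedAll helper are illustrative and not taken from the repository.

	package example

	import (
		"fmt"

		apiv1 "k8s.io/api/core/v1"
		"k8s.io/autoscaler/cluster-autoscaler/simulator/clustersnapshot"
	)

	// Both implementations satisfy clustersnapshot.ClusterSnapshot, so callers can be
	// written against the interface and seeded with a single SetClusterState call.
	var snapshotFactories = map[string]func() clustersnapshot.ClusterSnapshot{
		"basic": func() clustersnapshot.ClusterSnapshot { return clustersnapshot.NewBasicClusterSnapshot() },
		"delta": func() clustersnapshot.ClusterSnapshot { return clustersnapshot.NewDeltaClusterSnapshot() },
	}

	func seedAll(nodes []*apiv1.Node, scheduledPods []*apiv1.Pod) error {
		for name, factory := range snapshotFactories {
			snapshot := factory()
			if err := snapshot.SetClusterState(nodes, scheduledPods); err != nil {
				return fmt.Errorf("seeding %s snapshot: %w", name, err)
			}
		}
		return nil
	}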