Stop (un)tainting nodes from unselected node groups. #6273

Merged
merged 11 commits on Feb 6, 2024
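For illustration only, not part of the diff: a rough, self-contained sketch of the rule this change enforces — a node is only eligible for (un)tainting when the cloud provider maps it to one of the configured node groups. The types below are simplified stand-ins for cloudprovider.CloudProvider and *apiv1.Node.

package main

import "fmt"

// node is a stand-in for *apiv1.Node.
type node struct{ name string }

// groupLookup maps node names to node-group names; "" means the node is not
// part of any selected (configured) node group. It stands in for the
// cloud provider's NodeGroupForNode lookup.
type groupLookup map[string]string

// selectedOnly mirrors the idea behind filterNodesFromSelectedGroups: keep
// only nodes that belong to some selected node group, so taints on all other
// nodes are never added or removed by the autoscaler.
func selectedOnly(groups groupLookup, nodes ...node) []node {
	kept := make([]node, 0, len(nodes))
	for _, n := range nodes {
		if groups[n.name] != "" {
			kept = append(kept, n)
		}
	}
	return kept
}

func main() {
	groups := groupLookup{"n1": "ng1"} // n2 is not managed by any node group
	fmt.Println(selectedOnly(groups, node{"n1"}, node{"n2"})) // [{n1}]
}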
44 changes: 42 additions & 2 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -228,7 +228,9 @@ func (a *StaticAutoscaler) cleanUpIfRequired() {
if allNodes, err := a.AllNodeLister().List(); err != nil {
klog.Errorf("Failed to list ready nodes, not cleaning up taints: %v", err)
} else {
taints.CleanAllToBeDeleted(allNodes,
// Make sure we are only cleaning taints from selected node groups.
selectedNodes := filterNodesFromSelectedGroups(a.CloudProvider, allNodes...)
taints.CleanAllToBeDeleted(selectedNodes,
a.AutoscalingContext.ClientSet, a.Recorder, a.CordonNodeBeforeTerminate)
if a.AutoscalingContext.AutoscalingOptions.MaxBulkSoftTaintCount == 0 {
// Clean old taints if soft taints handling is disabled
@@ -660,7 +662,14 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
scaleDownStatus.Result == scaledownstatus.ScaleDownNoUnneeded) &&
a.AutoscalingContext.AutoscalingOptions.MaxBulkSoftTaintCount != 0 {
taintableNodes := a.scaleDownPlanner.UnneededNodes()
untaintableNodes := subtractNodes(allNodes, taintableNodes)

// Make sure we are only cleaning taints from selected node groups.
selectedNodes := filterNodesFromSelectedGroups(a.CloudProvider, allNodes...)

// This is a sanity check to make sure `taintableNodes` only includes
// nodes from selected node groups.
taintableNodes = intersectNodes(selectedNodes, taintableNodes)
untaintableNodes := subtractNodes(selectedNodes, taintableNodes)
actuation.UpdateSoftDeletionTaints(a.AutoscalingContext, taintableNodes, untaintableNodes)
}

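For illustration only, not part of the diff: the split above boils down to set arithmetic on node names — taintable = selected ∩ unneeded, untaintable = selected \ taintable — so a node outside every selected group lands in neither list. A stand-alone sketch of that arithmetic with invented names, using plain strings instead of *apiv1.Node:

package main

import "fmt"

// intersect and subtract mirror intersectNodes and subtractNodes from
// static_autoscaler.go, but operate on plain node names.
func intersect(a, b []string) []string {
	inB := make(map[string]bool, len(b))
	for _, name := range b {
		inB[name] = true
	}
	out := make([]string, 0, len(a))
	for _, name := range a {
		if inB[name] {
			out = append(out, name)
		}
	}
	return out
}

func subtract(a, b []string) []string {
	inB := make(map[string]bool, len(b))
	for _, name := range b {
		inB[name] = true
	}
	out := make([]string, 0, len(a))
	for _, name := range a {
		if !inB[name] {
			out = append(out, name)
		}
	}
	return out
}

func main() {
	selected := []string{"n1", "n2", "n3"} // nodes from selected node groups
	unneeded := []string{"n2", "n4"}       // scale-down candidates; n4 is from an unselected group
	taintable := intersect(selected, unneeded)
	untaintable := subtract(selected, taintable)
	fmt.Println(taintable, untaintable) // [n2] [n1 n3]
}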
@@ -964,6 +973,18 @@ func (a *StaticAutoscaler) obtainNodeLists(cp cloudprovider.CloudProvider) ([]*a
return allNodes, readyNodes, nil
}

func filterNodesFromSelectedGroups(cp cloudprovider.CloudProvider, nodes ...*apiv1.Node) []*apiv1.Node {
filtered := make([]*apiv1.Node, 0, len(nodes))
for _, n := range nodes {
if ng, err := cp.NodeGroupForNode(n); err != nil {
klog.Errorf("Failed to get a node group node node: %v", err)
} else if ng != nil {
filtered = append(filtered, n)
}
}
return filtered
}

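For illustration only, not part of the diff: a hypothetical test-style sketch exercising the new helper directly. It assumes it would sit next to static_autoscaler_test.go in the core package, reusing helpers that file already imports (testprovider, assert, BuildTestNode); the test name and the node/group names are made up.

// Hypothetical: only n1 is registered in a node group, so only n1 survives the filter.
func TestFilterNodesFromSelectedGroupsSketch(t *testing.T) {
	n1 := BuildTestNode("n1", 1000, 1000) // registered in ng1 below
	n2 := BuildTestNode("n2", 1000, 1000) // not registered in any node group

	provider := testprovider.NewTestCloudProvider(nil, nil)
	provider.AddNodeGroup("ng1", 1, 10, 1)
	provider.AddNode("ng1", n1)

	filtered := filterNodesFromSelectedGroups(provider, n1, n2)
	assert.Equal(t, []*apiv1.Node{n1}, filtered)
}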
func (a *StaticAutoscaler) updateClusterState(allNodes []*apiv1.Node, nodeInfosForGroups map[string]*schedulerframework.NodeInfo, currentTime time.Time) caerrors.AutoscalerError {
err := a.clusterStateRegistry.UpdateNodes(allNodes, nodeInfosForGroups, currentTime)
if err != nil {
@@ -1062,6 +1083,25 @@ func subtractNodes(a []*apiv1.Node, b []*apiv1.Node) []*apiv1.Node {
return subtractNodesByName(a, nodeNames(b))
}

func filterNodesByName(nodes []*apiv1.Node, names []string) []*apiv1.Node {
c := make([]*apiv1.Node, 0, len(names))
filterSet := make(map[string]bool, len(names))
for _, name := range names {
filterSet[name] = true
}
for _, n := range nodes {
if filterSet[n.Name] {
c = append(c, n)
}
}
return c
}

// intersectNodes returns the intersection of two node lists.
func intersectNodes(a []*apiv1.Node, b []*apiv1.Node) []*apiv1.Node {
return filterNodesByName(a, nodeNames(b))
}

func nodeNames(ns []*apiv1.Node) []string {
names := make([]string, len(ns))
for i, node := range ns {
103 changes: 102 additions & 1 deletion cluster-autoscaler/core/static_autoscaler_test.go
@@ -18,6 +18,7 @@ package core

import (
"bytes"
stdcontext "context"
"flag"
"fmt"
"os"
Expand Down Expand Up @@ -457,7 +458,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
mock.AssertExpectationsForObjects(t, allPodListerMock,
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)

// Scale up to node gorup min size.
// Scale up to node group min size.
readyNodeLister.SetNodes([]*apiv1.Node{n4})
allNodeLister.SetNodes([]*apiv1.Node{n4})
allPodListerMock.On("List").Return([]*apiv1.Pod{}, nil).Twice()
@@ -1120,6 +1121,106 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t *
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
}

// We should not touch taints from unselected node groups.
func TestStaticAutoscalerRunOnceWithUnselectedNodeGroups(t *testing.T) {
n1 := BuildTestNode("n1", 1000, 1000)
n1.Spec.Taints = append(n1.Spec.Taints, apiv1.Taint{
Key: taints.DeletionCandidateTaint,
Value: fmt.Sprint(time.Now().Unix()),
Effect: apiv1.TaintEffectPreferNoSchedule,
})
SetNodeReadyState(n1, true, time.Now())
n2 := BuildTestNode("n2", 1000, 1000)
n2.Spec.Taints = append(n2.Spec.Taints, apiv1.Taint{
Key: taints.DeletionCandidateTaint,
Value: fmt.Sprint(time.Now().Unix()),
Effect: apiv1.TaintEffectPreferNoSchedule,
})
SetNodeReadyState(n2, true, time.Now())

p1 := BuildTestPod("p1", 600, 100)
p1.Spec.NodeName = n1.Name

// Set up a minimal cloud provider where only ng1 is defined as a selected node group.
provider := testprovider.NewTestCloudProvider(nil, nil)
provider.AddNodeGroup("ng1", 1, 10, 1)
provider.AddNode("ng1", n1)
assert.NotNil(t, provider)

tests := map[string]struct {
node *apiv1.Node
pods []*apiv1.Pod
expectedTaints []apiv1.Taint
}{
"Node from selected node groups can get their deletion candidate taints removed": {
node: n1,
pods: []*apiv1.Pod{p1},
expectedTaints: []apiv1.Taint{},
},
"Node from non-selected node groups should keep their deletion candidate taints": {
node: n2,
pods: nil,
expectedTaints: n2.Spec.Taints,
},
}

for name, test := range tests {
// Prevent issues with loop-variable scoping; this can be dropped once Go 1.22 is required.
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()
// Create fake listers for the generated nodes, nothing returned by the rest (but the ones used in the tested path have to be defined).
readyNodeLister := kubernetes.NewTestNodeLister([]*apiv1.Node{test.node})
allNodeLister := kubernetes.NewTestNodeLister([]*apiv1.Node{test.node})
allPodListerMock := kubernetes.NewTestPodLister(test.pods)
daemonSetLister, err := kubernetes.NewTestDaemonSetLister(nil)
assert.NoError(t, err)
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, allPodListerMock,
kubernetes.NewTestPodDisruptionBudgetLister(nil), daemonSetLister,
nil, nil, nil, nil)

// Create context with minimal autoscalingOptions that guarantee we reach the tested logic.
autoscalingOptions := config.AutoscalingOptions{
ScaleDownEnabled: true,
MaxBulkSoftTaintCount: 10,
MaxBulkSoftTaintTime: 3 * time.Second,
}
processorCallbacks := newStaticAutoscalerProcessorCallbacks()
clientset := fake.NewSimpleClientset(test.node)
context, err := NewScaleTestAutoscalingContext(autoscalingOptions, clientset, listerRegistry, provider, processorCallbacks, nil)
assert.NoError(t, err)

// Create CSR with unhealthy cluster protection effectively disabled, to guarantee we reach the tested logic.
clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
OkTotalUnreadyCount: 1,
}
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(autoscalingOptions.NodeGroupDefaults))

// Setting the Actuator is necessary for testing any scale-down logic; it shouldn't have anything to do in this test.
sdActuator := actuation.NewActuator(&context, clusterState, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, NewTestProcessors(&context).NodeGroupConfigProcessor)
context.ScaleDownActuator = sdActuator

// Fake planner that keeps track of the scale-down candidates passed to UpdateClusterState.
sdPlanner := &candidateTrackingFakePlanner{}

autoscaler := &StaticAutoscaler{
AutoscalingContext: &context,
clusterStateRegistry: clusterState,
scaleDownPlanner: sdPlanner,
scaleDownActuator: sdActuator,
processors: NewTestProcessors(&context),
processorCallbacks: processorCallbacks,
}

err = autoscaler.RunOnce(time.Now().Add(5 * time.Hour))
assert.NoError(t, err)
newNode, err := clientset.CoreV1().Nodes().Get(stdcontext.TODO(), test.node.Name, metav1.GetOptions{})
assert.NoError(t, err)
assert.Equal(t, test.expectedTaints, newNode.Spec.Taints)
})
}
}

func TestStaticAutoscalerRunOnceWithBypassedSchedulers(t *testing.T) {
bypassedScheduler := "bypassed-scheduler"
nonBypassedScheduler := "non-bypassed-scheduler"