Merge pull request kubernetes#6273 from fische/fix-taint-unselected-node
Stop (un)tainting nodes from unselected node groups.
k8s-ci-robot authored Feb 6, 2024
2 parents 00fbbe1 + e8e3ad0 commit 3802594
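
For orientation, here is a minimal, hypothetical sketch (not part of the PR) of what "selected" means in this change: a node belongs to a selected node group exactly when the cloud provider's NodeGroupForNode returns a non-nil group. It reuses the TestCloudProvider setup from the new test below; the testprovider import path is assumed from the cluster-autoscaler repo layout.

package main

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	// Assumed import path for the test cloud provider used in the new test.
	testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
)

func main() {
	n1 := &apiv1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n1"}}
	n2 := &apiv1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n2"}}

	// Only ng1 is registered, and only n1 belongs to it; n2 has no node group,
	// so its lookup is expected to return a nil group.
	provider := testprovider.NewTestCloudProvider(nil, nil)
	provider.AddNodeGroup("ng1", 1, 10, 1)
	provider.AddNode("ng1", n1)

	for _, n := range []*apiv1.Node{n1, n2} {
		ng, err := provider.NodeGroupForNode(n)
		if err != nil {
			fmt.Printf("%s: error looking up node group: %v\n", n.Name, err)
			continue
		}
		if ng != nil {
			fmt.Printf("%s: selected (group %s), taints may be managed\n", n.Name, ng.Id())
		} else {
			fmt.Printf("%s: unselected, the autoscaler now leaves its taints alone\n", n.Name)
		}
	}
}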
Showing 2 changed files with 144 additions and 3 deletions.
44 changes: 42 additions & 2 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -229,7 +229,9 @@ func (a *StaticAutoscaler) cleanUpIfRequired() {
if allNodes, err := a.AllNodeLister().List(); err != nil {
klog.Errorf("Failed to list ready nodes, not cleaning up taints: %v", err)
} else {
taints.CleanAllToBeDeleted(allNodes,
// Make sure we are only cleaning taints from selected node groups.
selectedNodes := filterNodesFromSelectedGroups(a.CloudProvider, allNodes...)
taints.CleanAllToBeDeleted(selectedNodes,
a.AutoscalingContext.ClientSet, a.Recorder, a.CordonNodeBeforeTerminate)
if a.AutoscalingContext.AutoscalingOptions.MaxBulkSoftTaintCount == 0 {
// Clean old taints if soft taints handling is disabled
@@ -656,7 +658,14 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) caerrors.AutoscalerErr
scaleDownStatus.Result == scaledownstatus.ScaleDownNoUnneeded) &&
a.AutoscalingContext.AutoscalingOptions.MaxBulkSoftTaintCount != 0 {
taintableNodes := a.scaleDownPlanner.UnneededNodes()
untaintableNodes := subtractNodes(allNodes, taintableNodes)

// Make sure we only (un)taint nodes from selected node groups.
selectedNodes := filterNodesFromSelectedGroups(a.CloudProvider, allNodes...)

// This is a sanity check to make sure `taintableNodes` only includes
// nodes from selected node groups.
taintableNodes = intersectNodes(selectedNodes, taintableNodes)
untaintableNodes := subtractNodes(selectedNodes, taintableNodes)
actuation.UpdateSoftDeletionTaints(a.AutoscalingContext, taintableNodes, untaintableNodes)
}

@@ -972,6 +981,18 @@ func (a *StaticAutoscaler) obtainNodeLists() ([]*apiv1.Node, []*apiv1.Node, caer
return allNodes, readyNodes, nil
}

func filterNodesFromSelectedGroups(cp cloudprovider.CloudProvider, nodes ...*apiv1.Node) []*apiv1.Node {
filtered := make([]*apiv1.Node, 0, len(nodes))
for _, n := range nodes {
if ng, err := cp.NodeGroupForNode(n); err != nil {
klog.Errorf("Failed to get a node group for node %s: %v", n.Name, err)
} else if ng != nil {
filtered = append(filtered, n)
}
}
return filtered
}

func (a *StaticAutoscaler) updateClusterState(allNodes []*apiv1.Node, nodeInfosForGroups map[string]*schedulerframework.NodeInfo, currentTime time.Time) caerrors.AutoscalerError {
err := a.clusterStateRegistry.UpdateNodes(allNodes, nodeInfosForGroups, currentTime)
if err != nil {
@@ -1070,6 +1091,25 @@ func subtractNodes(a []*apiv1.Node, b []*apiv1.Node) []*apiv1.Node {
return subtractNodesByName(a, nodeNames(b))
}

func filterNodesByName(nodes []*apiv1.Node, names []string) []*apiv1.Node {
c := make([]*apiv1.Node, 0, len(names))
filterSet := make(map[string]bool, len(names))
for _, name := range names {
filterSet[name] = true
}
for _, n := range nodes {
if filterSet[n.Name] {
c = append(c, n)
}
}
return c
}

// intersectNodes returns the intersection of two node lists
func intersectNodes(a []*apiv1.Node, b []*apiv1.Node) []*apiv1.Node {
return filterNodesByName(a, nodeNames(b))
}

func nodeNames(ns []*apiv1.Node) []string {
names := make([]string, len(ns))
for i, node := range ns {
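To illustrate the set semantics of the new helpers, here is a small, self-contained sketch (the node helper and the sample names n1/n2/n3 are illustrative only); it mirrors filterNodesByName and intersectNodes from the hunk above rather than importing them.

package main

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// node builds a named node for the demo.
func node(name string) *apiv1.Node {
	return &apiv1.Node{ObjectMeta: metav1.ObjectMeta{Name: name}}
}

func nodeNames(ns []*apiv1.Node) []string {
	names := make([]string, len(ns))
	for i, n := range ns {
		names[i] = n.Name
	}
	return names
}

// filterNodesByName and intersectNodes mirror the helpers added in this PR.
func filterNodesByName(nodes []*apiv1.Node, names []string) []*apiv1.Node {
	c := make([]*apiv1.Node, 0, len(names))
	filterSet := make(map[string]bool, len(names))
	for _, name := range names {
		filterSet[name] = true
	}
	for _, n := range nodes {
		if filterSet[n.Name] {
			c = append(c, n)
		}
	}
	return c
}

func intersectNodes(a, b []*apiv1.Node) []*apiv1.Node {
	return filterNodesByName(a, nodeNames(b))
}

func main() {
	// n1 and n2 come from selected node groups; n3 does not.
	selected := []*apiv1.Node{node("n1"), node("n2")}
	// The planner considers n2 and n3 unneeded (soft-taint candidates).
	unneeded := []*apiv1.Node{node("n2"), node("n3")}

	// Only n2 is both unneeded and from a selected group, so only n2 may be
	// soft-tainted; n3 is never touched.
	fmt.Println(nodeNames(intersectNodes(selected, unneeded))) // [n2]
}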
103 changes: 102 additions & 1 deletion cluster-autoscaler/core/static_autoscaler_test.go
@@ -18,6 +18,7 @@ package core

import (
"bytes"
stdcontext "context"
"flag"
"fmt"
"os"
@@ -459,7 +460,7 @@ func TestStaticAutoscalerRunOnce(t *testing.T) {
mock.AssertExpectationsForObjects(t, allPodListerMock,
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)

// Scale up to node gorup min size.
// Scale up to node group min size.
readyNodeLister.SetNodes([]*apiv1.Node{n4})
allNodeLister.SetNodes([]*apiv1.Node{n4})
allPodListerMock.On("List").Return([]*apiv1.Pod{}, nil).Twice()
@@ -1337,6 +1338,106 @@ func TestStaticAutoscalerRunOnceWithFilteringOnUpcomingNodesEnabledNoScaleUp(t *
podDisruptionBudgetListerMock, daemonSetListerMock, onScaleUpMock, onScaleDownMock)
}

// We should not touch taints from unselected node groups.
func TestStaticAutoscalerRunOnceWithUnselectedNodeGroups(t *testing.T) {
n1 := BuildTestNode("n1", 1000, 1000)
n1.Spec.Taints = append(n1.Spec.Taints, apiv1.Taint{
Key: taints.DeletionCandidateTaint,
Value: fmt.Sprint(time.Now().Unix()),
Effect: apiv1.TaintEffectPreferNoSchedule,
})
SetNodeReadyState(n1, true, time.Now())
n2 := BuildTestNode("n2", 1000, 1000)
n2.Spec.Taints = append(n2.Spec.Taints, apiv1.Taint{
Key: taints.DeletionCandidateTaint,
Value: fmt.Sprint(time.Now().Unix()),
Effect: apiv1.TaintEffectPreferNoSchedule,
})
SetNodeReadyState(n2, true, time.Now())

p1 := BuildTestPod("p1", 600, 100)
p1.Spec.NodeName = n1.Name

// Set up a minimal cloud provider where only ng1 is defined as a selected node group.
provider := testprovider.NewTestCloudProvider(nil, nil)
provider.AddNodeGroup("ng1", 1, 10, 1)
provider.AddNode("ng1", n1)
assert.NotNil(t, provider)

tests := map[string]struct {
node *apiv1.Node
pods []*apiv1.Pod
expectedTaints []apiv1.Taint
}{
"Node from selected node groups can get their deletion candidate taints removed": {
node: n1,
pods: []*apiv1.Pod{p1},
expectedTaints: []apiv1.Taint{},
},
"Node from non-selected node groups should keep their deletion candidate taints": {
node: n2,
pods: nil,
expectedTaints: n2.Spec.Taints,
},
}

for name, test := range tests {
// Prevent issues with loop variable scoping; we should be able to get rid of this with Go 1.22.
test := test
t.Run(name, func(t *testing.T) {
t.Parallel()
// Create fake listers for the generated nodes; everything else returns nothing (but the listers used in the tested path have to be defined).
readyNodeLister := kubernetes.NewTestNodeLister([]*apiv1.Node{test.node})
allNodeLister := kubernetes.NewTestNodeLister([]*apiv1.Node{test.node})
allPodListerMock := kubernetes.NewTestPodLister(test.pods)
daemonSetLister, err := kubernetes.NewTestDaemonSetLister(nil)
assert.NoError(t, err)
listerRegistry := kube_util.NewListerRegistry(allNodeLister, readyNodeLister, allPodListerMock,
kubernetes.NewTestPodDisruptionBudgetLister(nil), daemonSetLister,
nil, nil, nil, nil)

// Create context with minimal autoscalingOptions that guarantee we reach the tested logic.
autoscalingOptions := config.AutoscalingOptions{
ScaleDownEnabled: true,
MaxBulkSoftTaintCount: 10,
MaxBulkSoftTaintTime: 3 * time.Second,
}
processorCallbacks := newStaticAutoscalerProcessorCallbacks()
clientset := fake.NewSimpleClientset(test.node)
context, err := NewScaleTestAutoscalingContext(autoscalingOptions, clientset, listerRegistry, provider, processorCallbacks, nil)
assert.NoError(t, err)

// Create CSR with unhealthy cluster protection effectively disabled, to guarantee we reach the tested logic.
clusterStateConfig := clusterstate.ClusterStateRegistryConfig{
OkTotalUnreadyCount: 1,
}
clusterState := clusterstate.NewClusterStateRegistry(provider, clusterStateConfig, context.LogRecorder, NewBackoff(), nodegroupconfig.NewDefaultNodeGroupConfigProcessor(autoscalingOptions.NodeGroupDefaults))

// Setting the Actuator is necessary for testing any scale-down logic; it shouldn't have anything to do in this test.
sdActuator := actuation.NewActuator(&context, clusterState, deletiontracker.NewNodeDeletionTracker(0*time.Second), options.NodeDeleteOptions{}, nil, NewTestProcessors(&context).NodeGroupConfigProcessor)
context.ScaleDownActuator = sdActuator

// Fake planner that keeps track of the scale-down candidates passed to UpdateClusterState.
sdPlanner := &candidateTrackingFakePlanner{}

autoscaler := &StaticAutoscaler{
AutoscalingContext: &context,
clusterStateRegistry: clusterState,
scaleDownPlanner: sdPlanner,
scaleDownActuator: sdActuator,
processors: NewTestProcessors(&context),
processorCallbacks: processorCallbacks,
}

err = autoscaler.RunOnce(time.Now().Add(5 * time.Hour))
assert.NoError(t, err)
newNode, err := clientset.CoreV1().Nodes().Get(stdcontext.TODO(), test.node.Name, metav1.GetOptions{})
assert.NoError(t, err)
assert.Equal(t, test.expectedTaints, newNode.Spec.Taints)
})
}
}

func TestStaticAutoscalerRunOnceWithBypassedSchedulers(t *testing.T) {
bypassedScheduler := "bypassed-scheduler"
nonBypassedScheduler := "non-bypassed-scheduler"
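As a final illustration of what the new test asserts, here is a hedged sketch (not the real taints package implementation) of what "cleaning" amounts to for a node from a selected group: dropping any taint whose key is taints.DeletionCandidateTaint, which is why expectedTaints is empty for n1 and unchanged for n2. The import path for the taints package is assumed from the repo layout, and the helper name is illustrative.

package main

import (
	"fmt"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	// Assumed import path for the taints utility package used by the test.
	"k8s.io/autoscaler/cluster-autoscaler/utils/taints"
)

// withoutDeletionCandidateTaint is an illustrative helper, not the real
// taints.CleanAllToBeDeleted: it drops the soft deletion-candidate taint
// that the autoscaler manages and keeps everything else.
func withoutDeletionCandidateTaint(ts []apiv1.Taint) []apiv1.Taint {
	kept := make([]apiv1.Taint, 0, len(ts))
	for _, t := range ts {
		if t.Key == taints.DeletionCandidateTaint {
			continue
		}
		kept = append(kept, t)
	}
	return kept
}

func main() {
	n1 := &apiv1.Node{ObjectMeta: metav1.ObjectMeta{Name: "n1"}}
	n1.Spec.Taints = []apiv1.Taint{{
		Key:    taints.DeletionCandidateTaint,
		Value:  "1707000000", // illustrative Unix timestamp value
		Effect: apiv1.TaintEffectPreferNoSchedule,
	}}

	// A node from a selected group ends up with no deletion-candidate taint;
	// a node from an unselected group is simply never passed to the cleanup.
	fmt.Println(withoutDeletionCandidateTaint(n1.Spec.Taints)) // []
}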
