Identifying cloud provider deleted nodes #5054

Merged · 9 commits · Dec 16, 2022
Adding support for identifying nodes that have been deleted from the cloud provider but are still registered within Kubernetes. Avoids misidentifying nodes that are not autoscaled as deleted. Simplified the implementation to use apiv1.Node instead of a new struct. Expanded test cases to include nodes that are not autoscaled and tracking deleted nodes over multiple updates.

Adding a check to the backfill loop to confirm the cloud provider node no longer exists before flagging the node as deleted. Modifying some comments to be more accurate. Restoring a line that was deleted erroneously.
fookenc committed Oct 17, 2022
commit 776d7311a1f6d51c619ff03cb8680ae14476caf6
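
To make the change easier to follow, here is a minimal, self-contained sketch of the detection idea (illustration only, not code from this commit): a node counts as deleted by the cloud provider when it is still registered in Kubernetes, appeared in the previous per-node-group instance snapshot, and is missing from the current one. The node and instancesByGroup types and the cloudProviderDeletedNodes function are invented for this sketch, node names are assumed to equal instance IDs, and the backfill step that keeps a node flagged until it disappears from Kubernetes is omitted; the real change works on apiv1.Node and cloudprovider.Instance inside ClusterStateRegistry.

package main

import "fmt"

// node is a simplified stand-in for apiv1.Node.
type node struct{ Name string }

// instancesByGroup maps a node group name to the instance IDs the cloud provider reports for it.
type instancesByGroup map[string][]string

// cloudProviderDeletedNodes returns the registered nodes whose instances were present in the
// previous snapshot of autoscaled node groups but are missing from the current one.
func cloudProviderDeletedNodes(registered []node, previous, current instancesByGroup) []node {
	currentIDs := make(map[string]bool)
	for _, ids := range current {
		for _, id := range ids {
			currentIDs[id] = true
		}
	}
	registeredByName := make(map[string]node)
	for _, n := range registered {
		registeredByName[n.Name] = n
	}
	var removed []node
	// Only instances that belonged to a node group snapshot are considered,
	// so nodes that are not autoscaled are never flagged as deleted.
	for _, ids := range previous {
		for _, id := range ids {
			if currentIDs[id] {
				continue
			}
			if n, ok := registeredByName[id]; ok {
				removed = append(removed, n)
			}
		}
	}
	return removed
}

func main() {
	registered := []node{{Name: "ng1-1"}, {Name: "ng1-2"}, {Name: "standalone"}} // "standalone" is not autoscaled
	previous := instancesByGroup{"ng1": {"ng1-1", "ng1-2"}}
	current := instancesByGroup{"ng1": {"ng1-1"}}
	fmt.Println(cloudProviderDeletedNodes(registered, previous, current)) // prints [{ng1-2}]
}
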
8 changes: 8 additions & 0 deletions cluster-autoscaler/cloudprovider/test/test_cloud_provider.go
@@ -253,6 +253,14 @@ func (tcp *TestCloudProvider) AddNode(nodeGroupId string, node *apiv1.Node) {
tcp.nodes[node.Name] = nodeGroupId
}

// DeleteNode deletes the given node from the provider.
func (tcp *TestCloudProvider) DeleteNode(node *apiv1.Node) {
tcp.Lock()
defer tcp.Unlock()

delete(tcp.nodes, node.Name)
}

// GetResourceLimiter returns struct containing limits (max, min) for resources (cores, memory etc.).
func (tcp *TestCloudProvider) GetResourceLimiter() (*cloudprovider.ResourceLimiter, error) {
return tcp.resourceLimiter, nil
72 changes: 70 additions & 2 deletions cluster-autoscaler/clusterstate/clusterstate.go
@@ -28,7 +28,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"

apiv1 "k8s.io/api/core/v1"
@@ -121,6 +120,7 @@ type ClusterStateRegistry struct {
acceptableRanges map[string]AcceptableRange
incorrectNodeGroupSizes map[string]IncorrectNodeGroupSize
unregisteredNodes map[string]UnregisteredNode
deletedNodes map[string]*apiv1.Node
candidatesForScaleDown map[string][]string
backoff backoff.Backoff
lastStatus *api.ClusterAutoscalerStatus
@@ -153,6 +153,7 @@ func NewClusterStateRegistry(cloudProvider cloudprovider.CloudProvider, config C
acceptableRanges: make(map[string]AcceptableRange),
incorrectNodeGroupSizes: make(map[string]IncorrectNodeGroupSize),
unregisteredNodes: make(map[string]UnregisteredNode),
deletedNodes: make(map[string]*apiv1.Node),
candidatesForScaleDown: make(map[string][]string),
backoff: backoff,
lastStatus: emptyStatus,
@@ -292,6 +293,7 @@ func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, nodeInfosForGr
}

cloudProviderNodeInstances, err := csr.getCloudProviderNodeInstances()
if err != nil {
return err
}
cloudProviderNodesRemoved := csr.getCloudProviderDeletedNodes(nodes, cloudProviderNodeInstances)
@@ -306,6 +308,7 @@ func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, nodeInfosForGr
csr.cloudProviderNodeInstances = cloudProviderNodeInstances

csr.updateUnregisteredNodes(notRegistered)
csr.updateCloudProviderDeletedNodes(cloudProviderNodesRemoved)
csr.updateReadinessStats(currentTime)

// update acceptable ranges based on requests from last loop and targetSizes
@@ -541,7 +544,7 @@ func (csr *ClusterStateRegistry) updateReadinessStats(currentTime time.Time) {

update := func(current Readiness, node *apiv1.Node, nr kube_util.NodeReadiness) Readiness {
current.Registered++
if deletetaint.HasToBeDeletedTaint(node) {
if _, exists := csr.deletedNodes[node.Name]; exists {
current.Deleted++
} else if nr.Ready {
current.Ready++
@@ -669,6 +672,30 @@ func (csr *ClusterStateRegistry) GetUnregisteredNodes() []UnregisteredNode {
return result
}

func (csr *ClusterStateRegistry) updateCloudProviderDeletedNodes(deletedNodes []*apiv1.Node) {
result := make(map[string]*apiv1.Node)
for _, deleted := range deletedNodes {
if prev, found := csr.deletedNodes[deleted.Name]; found {
result[deleted.Name] = prev
} else {
result[deleted.Name] = deleted
}
}
csr.deletedNodes = result
}

// GetCloudProviderDeletedNodes returns a list of all nodes removed from the cloud provider but still registered in Kubernetes.
func (csr *ClusterStateRegistry) GetCloudProviderDeletedNodes() []*apiv1.Node {
csr.Lock()
defer csr.Unlock()

result := make([]*apiv1.Node, 0, len(csr.deletedNodes))
for _, deleted := range csr.deletedNodes {
result = append(result, deleted)
}
return result
}

// UpdateScaleDownCandidates updates scale down candidates
func (csr *ClusterStateRegistry) UpdateScaleDownCandidates(nodes []*apiv1.Node, now time.Time) {
result := make(map[string][]string)
@@ -958,6 +985,47 @@ func getNotRegisteredNodes(allNodes []*apiv1.Node, cloudProviderNodeInstances ma
return notRegistered
}

// getCloudProviderDeletedNodes returns the nodes registered in Kubernetes that no longer exist in the cloud provider.
func (csr *ClusterStateRegistry) getCloudProviderDeletedNodes(allNodes []*apiv1.Node, cloudProviderNodeInstances map[string][]cloudprovider.Instance) []*apiv1.Node {
nodesRemoved := make([]*apiv1.Node, 0)
currentCloudInstances := make(map[string]string, 0)
registeredNodes := make(map[string]*apiv1.Node, 0)
for nodeGroupName, instances := range cloudProviderNodeInstances {
for _, instance := range instances {
currentCloudInstances[instance.Id] = nodeGroupName
}
}
for _, node := range allNodes {
registeredNodes[node.Name] = node
}

// Carry over previously deleted nodes that are still registered in Kubernetes
for nodeName, node := range csr.deletedNodes {
// Safety check to prevent flagging Kubernetes nodes as deleted
// if the Cloud Provider instance is re-discovered
_, cloudProviderFound := currentCloudInstances[node.Name]
if _, found := registeredNodes[nodeName]; found && !cloudProviderFound {
nodesRemoved = append(nodesRemoved, node)
}
}

// Seek nodes that may have been deleted since the last update.
// Because cloudProviderNodeInstances are retrieved per node group,
// nodes that are not autoscaled are excluded.
for _, instances := range csr.cloudProviderNodeInstances {
Member:
Won't relying on previous state cause issues in case of a CA restart? IIUC some nodes will be incorrectly considered as Ready/Unready/NotStarted, potentially leading to bad scaling decisions.

Contributor Author (fookenc):
That would definitely occur on a restart. Would it make sense to create a new taint and add it onto the nodes? There might be some additional overhead for the implementation, but it should prevent that scenario.

I also thought about adding a check for each node in the deletedNodes (lines 1001 - 1006) to confirm it doesn't appear in the cloud provider nodes. I don't have good insight to know if that is a case that could happen though. I wouldn't want to keep flagging a node as deleted if the cloud provider node exists.

Member:
I don't particularly like the idea of introducing more taints to carry over CA internal state between restarts. That feels brittle and complex. I think maybe it is ok to just leave it as is: if the instance was deleted right before a CA restart, there is a slight risk of not scaling up until the k8s API catches up with the deletion. This should be fine.

Re-checking deletedNodes would catch a case in which an instance disappears from the cloud provider due to some temporary problem and then comes back. Indeed, as written we would treat it as deleted forever, and it would be better to avoid code that cannot recover from such a scenario (even though it is unlikely).

Contributor Author (fookenc):
That makes sense to me. I agree that the API should eventually correct itself, and scaling should only be momentarily impacted. I've created a new commit that includes the backfill safety check mentioned above and some other minor cosmetic code changes.

for _, instance := range instances {
if _, found := currentCloudInstances[instance.Id]; !found {
// Check Kubernetes registered nodes for corresponding deleted
// Cloud Provider instance
if kubeNode, kubeNodeFound := registeredNodes[instance.Id]; kubeNodeFound {
nodesRemoved = append(nodesRemoved, kubeNode)
}
}
}
}
return nodesRemoved
}

// GetAutoscaledNodesCount calculates and returns the actual and the target number of nodes
// belonging to autoscaled node groups in the cluster.
func (csr *ClusterStateRegistry) GetAutoscaledNodesCount() (currentSize, targetSize int) {
111 changes: 110 additions & 1 deletion cluster-autoscaler/clusterstate/clusterstate_test.go
@@ -17,6 +17,8 @@ limitations under the License.
package clusterstate

import (
"fmt"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
"testing"
"time"

@@ -481,14 +483,30 @@ func TestUpcomingNodes(t *testing.T) {
provider.AddNodeGroup("ng4", 1, 10, 1)
provider.AddNode("ng4", ng4_1)

// One node is already there; for a second node, deletion / draining has already started.
ng5_1 := BuildTestNode("ng5-1", 1000, 1000)
SetNodeReadyState(ng5_1, true, now.Add(-time.Minute))
ng5_2 := BuildTestNode("ng5-2", 1000, 1000)
SetNodeReadyState(ng5_2, true, now.Add(-time.Minute))
ng5_2.Spec.Taints = []apiv1.Taint{
{
Key: deletetaint.ToBeDeletedTaint,
Value: fmt.Sprint(time.Now().Unix()),
Effect: apiv1.TaintEffectNoSchedule,
},
}
provider.AddNodeGroup("ng5", 1, 10, 2)
provider.AddNode("ng5", ng5_1)
provider.AddNode("ng5", ng5_2)

assert.NotNil(t, provider)
fakeClient := &fake.Clientset{}
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
}, fakeLogRecorder, newBackoff())
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1, ng3_1, ng4_1}, nil, now)
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1, ng3_1, ng4_1, ng5_1, ng5_2}, nil, now)
assert.NoError(t, err)
assert.Empty(t, clusterstate.GetScaleUpFailures())

@@ -497,6 +515,7 @@ func TestUpcomingNodes(t *testing.T) {
assert.Equal(t, 1, upcomingNodes["ng2"])
assert.Equal(t, 2, upcomingNodes["ng3"])
assert.NotContains(t, upcomingNodes, "ng4")
assert.NotContains(t, upcomingNodes, "ng5")
}

func TestIncorrectSize(t *testing.T) {
@@ -570,6 +589,96 @@ func TestUnregisteredNodes(t *testing.T) {
assert.Equal(t, 0, len(clusterstate.GetUnregisteredNodes()))
}

func TestCloudProviderDeletedNodes(t *testing.T) {
now := time.Now()
ng1_1 := BuildTestNode("ng1-1", 1000, 1000)
SetNodeReadyState(ng1_1, true, now.Add(-time.Minute))
ng1_1.Spec.ProviderID = "ng1-1"
ng1_2 := BuildTestNode("ng1-2", 1000, 1000)
SetNodeReadyState(ng1_2, true, now.Add(-time.Minute))
ng1_2.Spec.ProviderID = "ng1-2"
// No Node Group - Not Autoscaled Node
noNg := BuildTestNode("no-ng", 1000, 1000)
SetNodeReadyState(noNg, true, now.Add(-time.Minute))
noNg.Spec.ProviderID = "no-ng"
provider := testprovider.NewTestCloudProvider(nil, nil)
provider.AddNodeGroup("ng1", 1, 10, 2)
provider.AddNode("ng1", ng1_1)
provider.AddNode("ng1", ng1_2)

fakeClient := &fake.Clientset{}
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
MaxNodeProvisionTime: 10 * time.Second,
}, fakeLogRecorder, newBackoff())
now = now.Add(time.Minute)
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, noNg}, nil, now)

// Nodes are registered correctly between Kubernetes and cloud provider.
assert.NoError(t, err)
assert.Equal(t, 0, len(clusterstate.GetCloudProviderDeletedNodes()))

// The node was removed from the cloud provider
// and should be counted as Deleted by cluster state
nodeGroup, err := provider.NodeGroupForNode(ng1_2)
assert.NoError(t, err)
provider.DeleteNode(ng1_2)
clusterstate.InvalidateNodeInstancesCacheEntry(nodeGroup)
now = now.Add(time.Minute)
err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2, noNg}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 1, len(clusterstate.GetCloudProviderDeletedNodes()))
assert.Equal(t, "ng1-2", clusterstate.GetCloudProviderDeletedNodes()[0].Name)
assert.Equal(t, 1, clusterstate.GetClusterReadiness().Deleted)

// The node is removed from Kubernetes
now = now.Add(time.Minute)
err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, noNg}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 0, len(clusterstate.GetCloudProviderDeletedNodes()))

// New Node is added afterwards
ng1_3 := BuildTestNode("ng1-3", 1000, 1000)
SetNodeReadyState(ng1_3, true, now.Add(-time.Minute))
ng1_3.Spec.ProviderID = "ng1-3"
provider.AddNode("ng1", ng1_3)
clusterstate.InvalidateNodeInstancesCacheEntry(nodeGroup)
now = now.Add(time.Minute)
err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_3, noNg}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 0, len(clusterstate.GetCloudProviderDeletedNodes()))

// The newly added node is removed from the cloud provider
// and should be counted as Deleted by cluster state
nodeGroup, err = provider.NodeGroupForNode(ng1_3)
assert.NoError(t, err)
provider.DeleteNode(ng1_3)
clusterstate.InvalidateNodeInstancesCacheEntry(nodeGroup)
now = now.Add(time.Minute)
err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, noNg, ng1_3}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 1, len(clusterstate.GetCloudProviderDeletedNodes()))
assert.Equal(t, "ng1-3", clusterstate.GetCloudProviderDeletedNodes()[0].Name)
assert.Equal(t, 1, clusterstate.GetClusterReadiness().Deleted)

// Confirm that previously identified deleted cloud provider nodes are still included
// until they are removed from Kubernetes
now = now.Add(time.Minute)
err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, noNg, ng1_3}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 1, len(clusterstate.GetCloudProviderDeletedNodes()))
assert.Equal(t, "ng1-3", clusterstate.GetCloudProviderDeletedNodes()[0].Name)
assert.Equal(t, 1, clusterstate.GetClusterReadiness().Deleted)

// The node is removed from Kubernetes
now = now.Add(time.Minute)
err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, noNg}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 0, len(clusterstate.GetCloudProviderDeletedNodes()))
}
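
The review discussion above raised the case of an instance that disappears from the cloud provider temporarily and then comes back; the backfill safety check is meant to recover from that. The test cases in this commit do not exercise that path, so here is a hypothetical additional test, not part of this commit, sketching the re-discovery scenario with the same helpers and assumptions as TestCloudProviderDeletedNodes:

func TestCloudProviderNodeRediscovered(t *testing.T) {
	now := time.Now()
	ng1_1 := BuildTestNode("ng1-1", 1000, 1000)
	SetNodeReadyState(ng1_1, true, now.Add(-time.Minute))
	ng1_1.Spec.ProviderID = "ng1-1"
	provider := testprovider.NewTestCloudProvider(nil, nil)
	provider.AddNodeGroup("ng1", 1, 10, 1)
	provider.AddNode("ng1", ng1_1)

	fakeClient := &fake.Clientset{}
	fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
	clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
		MaxTotalUnreadyPercentage: 10,
		OkTotalUnreadyCount:       1,
		MaxNodeProvisionTime:      10 * time.Second,
	}, fakeLogRecorder, newBackoff())

	// First update: the node exists in both Kubernetes and the cloud provider.
	err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now)
	assert.NoError(t, err)
	assert.Equal(t, 0, len(clusterstate.GetCloudProviderDeletedNodes()))

	// The instance disappears from the cloud provider: the node is flagged as deleted.
	nodeGroup, err := provider.NodeGroupForNode(ng1_1)
	assert.NoError(t, err)
	provider.DeleteNode(ng1_1)
	clusterstate.InvalidateNodeInstancesCacheEntry(nodeGroup)
	now = now.Add(time.Minute)
	err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now)
	assert.NoError(t, err)
	assert.Equal(t, 1, len(clusterstate.GetCloudProviderDeletedNodes()))

	// The instance is re-discovered: the safety check in the backfill loop
	// should drop the node from the deleted set on the next update.
	provider.AddNode("ng1", ng1_1)
	clusterstate.InvalidateNodeInstancesCacheEntry(nodeGroup)
	now = now.Add(time.Minute)
	err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now)
	assert.NoError(t, err)
	assert.Equal(t, 0, len(clusterstate.GetCloudProviderDeletedNodes()))
}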

func TestUpdateLastTransitionTimes(t *testing.T) {
now := metav1.Time{Time: time.Now()}
later := metav1.Time{Time: now.Time.Add(10 * time.Second)}