Merge pull request #4896 from fookenc/master
Adding support for identifying nodes that have been deleted from the cloud provider but are still registered within Kubernetes
k8s-ci-robot authored Jul 4, 2022
2 parents de59b91 + ee80c93 commit af5fb07
Showing 4 changed files with 137 additions and 8 deletions.
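
In short: on every UpdateNodes pass the registry now diffs the nodes registered in Kubernetes against the instances reported by the cloud provider, tracks any registered node the provider no longer reports, and exposes them via a new GetCloudProviderDeletedNodes() accessor; readiness stats count these nodes as Deleted instead of relying on the ToBeDeleted taint. A minimal sketch of how a caller might consume the new accessor (the helper and the logging are illustrative, not part of this commit):

    package example

    import (
    	"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
    	klog "k8s.io/klog/v2"
    )

    // logCloudProviderDeletedNodes is an illustrative helper (not part of this
    // commit): after an UpdateNodes pass, report every node that the cloud
    // provider has deleted but that Kubernetes still registers.
    func logCloudProviderDeletedNodes(csr *clusterstate.ClusterStateRegistry) {
    	for _, deleted := range csr.GetCloudProviderDeletedNodes() {
    		klog.Warningf("node %s removed from cloud provider at %v but still registered in Kubernetes",
    			deleted.Node.Name, deleted.DeletedSince)
    	}
    }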
8 changes: 8 additions & 0 deletions cluster-autoscaler/cloudprovider/test/test_cloud_provider.go
@@ -253,6 +253,14 @@ func (tcp *TestCloudProvider) AddNode(nodeGroupId string, node *apiv1.Node) {
tcp.nodes[node.Name] = nodeGroupId
}

// DeleteNode deletes the given node from the provider.
func (tcp *TestCloudProvider) DeleteNode(node *apiv1.Node) {
tcp.Lock()
defer tcp.Unlock()

delete(tcp.nodes, node.Name)
}

// GetResourceLimiter returns struct containing limits (max, min) for resources (cores, memory etc.).
func (tcp *TestCloudProvider) GetResourceLimiter() (*cloudprovider.ResourceLimiter, error) {
return tcp.resourceLimiter, nil
61 changes: 58 additions & 3 deletions cluster-autoscaler/clusterstate/clusterstate.go
@@ -28,7 +28,6 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/utils/backoff"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"

apiv1 "k8s.io/api/core/v1"
@@ -100,6 +99,15 @@ type UnregisteredNode struct {
UnregisteredSince time.Time
}

// DeletedNode contains information about nodes that have been removed on the cloud provider side
// but are still registered in Kubernetes.
type DeletedNode struct {
// Node instance registered in Kubernetes.
Node *apiv1.Node
// DeletedSince is the time when the node was detected as deleted.
DeletedSince time.Time
}

// ScaleUpFailure contains information about a failure of a scale-up.
type ScaleUpFailure struct {
NodeGroup cloudprovider.NodeGroup
@@ -121,6 +129,7 @@ type ClusterStateRegistry struct {
acceptableRanges map[string]AcceptableRange
incorrectNodeGroupSizes map[string]IncorrectNodeGroupSize
unregisteredNodes map[string]UnregisteredNode
deletedNodes map[string]DeletedNode
candidatesForScaleDown map[string][]string
backoff backoff.Backoff
lastStatus *api.ClusterAutoscalerStatus
@@ -153,6 +162,7 @@ func NewClusterStateRegistry(cloudProvider cloudprovider.CloudProvider, config C
acceptableRanges: make(map[string]AcceptableRange),
incorrectNodeGroupSizes: make(map[string]IncorrectNodeGroupSize),
unregisteredNodes: make(map[string]UnregisteredNode),
deletedNodes: make(map[string]DeletedNode),
candidatesForScaleDown: make(map[string][]string),
backoff: backoff,
lastStatus: emptyStatus,
@@ -296,7 +306,7 @@ func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, nodeInfosForGr
return err
}
notRegistered := getNotRegisteredNodes(nodes, cloudProviderNodeInstances, currentTime)

cloudProviderNodesRemoved := getCloudProviderDeletedNodes(nodes, cloudProviderNodeInstances, currentTime)
csr.Lock()
defer csr.Unlock()

@@ -306,6 +316,7 @@
csr.cloudProviderNodeInstances = cloudProviderNodeInstances

csr.updateUnregisteredNodes(notRegistered)
csr.updateCloudProviderDeletedNodes(cloudProviderNodesRemoved)
csr.updateReadinessStats(currentTime)

// update acceptable ranges based on requests from last loop and targetSizes
@@ -541,7 +552,7 @@ func (csr *ClusterStateRegistry) updateReadinessStats(currentTime time.Time) {

update := func(current Readiness, node *apiv1.Node, nr kube_util.NodeReadiness) Readiness {
current.Registered++
if deletetaint.HasToBeDeletedTaint(node) {
if _, exists := csr.deletedNodes[node.Name]; exists {
current.Deleted++
} else if nr.Ready {
current.Ready++
@@ -669,6 +680,30 @@ func (csr *ClusterStateRegistry) GetUnregisteredNodes() []UnregisteredNode {
return result
}

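// updateCloudProviderDeletedNodes rebuilds the tracked set of cloud-provider-deleted
// nodes, preserving the earlier DeletedSince for nodes that were already tracked so
// the detection time is not reset on every UpdateNodes loop.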
func (csr *ClusterStateRegistry) updateCloudProviderDeletedNodes(deletedNodes []DeletedNode) {
result := make(map[string]DeletedNode)
for _, deleted := range deletedNodes {
if prev, found := csr.deletedNodes[deleted.Node.Name]; found {
result[deleted.Node.Name] = prev
} else {
result[deleted.Node.Name] = deleted
}
}
csr.deletedNodes = result
}

// GetCloudProviderDeletedNodes returns a list of all nodes removed from the cloud provider but still registered in Kubernetes.
func (csr *ClusterStateRegistry) GetCloudProviderDeletedNodes() []DeletedNode {
csr.Lock()
defer csr.Unlock()

result := make([]DeletedNode, 0, len(csr.deletedNodes))
for _, deleted := range csr.deletedNodes {
result = append(result, deleted)
}
return result
}

// UpdateScaleDownCandidates updates scale down candidates
func (csr *ClusterStateRegistry) UpdateScaleDownCandidates(nodes []*apiv1.Node, now time.Time) {
result := make(map[string][]string)
@@ -958,6 +993,26 @@ func getNotRegisteredNodes(allNodes []*apiv1.Node, cloudProviderNodeInstances ma
return notRegistered
}

// getCloudProviderDeletedNodes calculates which of the nodes registered in Kubernetes no longer exist in the cloud provider.
func getCloudProviderDeletedNodes(allNodes []*apiv1.Node, cloudProviderNodeInstances map[string][]cloudprovider.Instance, time time.Time) []DeletedNode {
cloudRegistered := sets.NewString()
for _, instances := range cloudProviderNodeInstances {
for _, instance := range instances {
cloudRegistered.Insert(instance.Id)
}
}
nodesRemoved := make([]DeletedNode, 0)
for _, node := range allNodes {
if !cloudRegistered.Has(node.Spec.ProviderID) {
nodesRemoved = append(nodesRemoved, DeletedNode{
Node: node,
DeletedSince: time,
})
}
}
return nodesRemoved
}

// GetAutoscaledNodesCount calculates and returns the actual and the target number of nodes
// belonging to autoscaled node groups in the cluster.
func (csr *ClusterStateRegistry) GetAutoscaledNodesCount() (currentSize, targetSize int) {
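Note the matching key above: a node counts as cloud-provider-deleted when its Spec.ProviderID no longer appears among the instance IDs the provider reports. A standalone, illustrative rephrasing of that set-difference logic (all names below are invented for this sketch, not code from this commit):

    package main

    import "fmt"

    // deletedProviderIDs returns the provider IDs of registered nodes that the
    // cloud provider no longer reports. Illustrative rephrasing of the commit's
    // getCloudProviderDeletedNodes; all names are invented for this sketch.
    func deletedProviderIDs(registered []string, cloudInstances map[string][]string) []string {
    	seen := make(map[string]bool)
    	for _, instances := range cloudInstances {
    		for _, id := range instances {
    			seen[id] = true
    		}
    	}
    	removed := []string{}
    	for _, id := range registered {
    		if !seen[id] {
    			removed = append(removed, id)
    		}
    	}
    	return removed
    }

    func main() {
    	// ng1-2 was deleted on the provider side but is still registered.
    	cloud := map[string][]string{"ng1": {"ng1-1"}}
    	fmt.Println(deletedProviderIDs([]string{"ng1-1", "ng1-2"}, cloud)) // [ng1-2]
    }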
68 changes: 67 additions & 1 deletion cluster-autoscaler/clusterstate/clusterstate_test.go
@@ -17,6 +17,7 @@ limitations under the License.
package clusterstate

import (
"fmt"
"testing"
"time"

@@ -27,6 +28,7 @@ import (
testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/api"
"k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
"k8s.io/client-go/kubernetes/fake"
kube_record "k8s.io/client-go/tools/record"
@@ -481,14 +483,30 @@ func TestUpcomingNodes(t *testing.T) {
provider.AddNodeGroup("ng4", 1, 10, 1)
provider.AddNode("ng4", ng4_1)

// One node is already there; for a second node, deletion / draining has already started.
ng5_1 := BuildTestNode("ng5-1", 1000, 1000)
SetNodeReadyState(ng5_1, true, now.Add(-time.Minute))
ng5_2 := BuildTestNode("ng5-2", 1000, 1000)
SetNodeReadyState(ng5_2, true, now.Add(-time.Minute))
ng5_2.Spec.Taints = []apiv1.Taint{
{
Key: deletetaint.ToBeDeletedTaint,
Value: fmt.Sprint(time.Now().Unix()),
Effect: apiv1.TaintEffectNoSchedule,
},
}
provider.AddNodeGroup("ng5", 1, 10, 2)
provider.AddNode("ng5", ng5_1)
provider.AddNode("ng5", ng5_2)

assert.NotNil(t, provider)
fakeClient := &fake.Clientset{}
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
}, fakeLogRecorder, newBackoff())
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1, ng3_1, ng4_1}, nil, now)
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1, ng3_1, ng4_1, ng5_1, ng5_2}, nil, now)
assert.NoError(t, err)
assert.Empty(t, clusterstate.GetScaleUpFailures())

@@ -497,6 +515,7 @@
assert.Equal(t, 1, upcomingNodes["ng2"])
assert.Equal(t, 2, upcomingNodes["ng3"])
assert.NotContains(t, upcomingNodes, "ng4")
assert.NotContains(t, upcomingNodes, "ng5")
}

func TestIncorrectSize(t *testing.T) {
@@ -570,6 +589,53 @@ func TestUnregisteredNodes(t *testing.T) {
assert.Equal(t, 0, len(clusterstate.GetUnregisteredNodes()))
}

func TestCloudProviderDeletedNodes(t *testing.T) {
now := time.Now()
ng1_1 := BuildTestNode("ng1-1", 1000, 1000)
SetNodeReadyState(ng1_1, true, now.Add(-time.Minute))
ng1_1.Spec.ProviderID = "ng1-1"
ng1_2 := BuildTestNode("ng1-2", 1000, 1000)
SetNodeReadyState(ng1_2, true, now.Add(-time.Minute))
ng1_2.Spec.ProviderID = "ng1-2"
provider := testprovider.NewTestCloudProvider(nil, nil)
provider.AddNodeGroup("ng1", 1, 10, 2)
provider.AddNode("ng1", ng1_1)
provider.AddNode("ng1", ng1_2)

fakeClient := &fake.Clientset{}
fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap")
clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{
MaxTotalUnreadyPercentage: 10,
OkTotalUnreadyCount: 1,
MaxNodeProvisionTime: 10 * time.Second,
}, fakeLogRecorder, newBackoff())
now = now.Add(time.Minute)
err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2}, nil, now)

// Nodes are registered correctly between Kubernetes and cloud provider.
assert.NoError(t, err)
assert.Equal(t, 0, len(clusterstate.GetCloudProviderDeletedNodes()))

// The node was removed from the cloud provider and
// should be counted as Deleted by cluster state.
nodeGroup, err := provider.NodeGroupForNode(ng1_2)
assert.NoError(t, err)
provider.DeleteNode(ng1_2)
clusterstate.InvalidateNodeInstancesCacheEntry(nodeGroup)
now = now.Add(time.Minute)
err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 1, len(clusterstate.GetCloudProviderDeletedNodes()))
assert.Equal(t, "ng1-2", clusterstate.GetCloudProviderDeletedNodes()[0].Node.Name)
assert.Equal(t, 1, clusterstate.GetClusterReadiness().Deleted)

// The node is removed from Kubernetes
now = now.Add(time.Minute)
err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now)
assert.NoError(t, err)
assert.Equal(t, 0, len(clusterstate.GetCloudProviderDeletedNodes()))
}

func TestUpdateLastTransitionTimes(t *testing.T) {
now := metav1.Time{Time: time.Now()}
later := metav1.Time{Time: now.Time.Add(10 * time.Second)}
8 changes: 4 additions & 4 deletions cluster-autoscaler/core/static_autoscaler.go
@@ -191,14 +191,14 @@ func (a *StaticAutoscaler) cleanUpIfRequired() {
}

// CA can die at any time. Removing taints that might have been left from the previous run.
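// Listing all nodes rather than only ready ones, so taints are also cleaned
// from NotReady nodes (e.g. nodes already deleted on the cloud provider side).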
if readyNodes, err := a.ReadyNodeLister().List(); err != nil {
klog.Errorf("Failed to list ready nodes, not cleaning up taints: %v", err)
if nodes, err := a.AllNodeLister().List(); err != nil {
klog.Errorf("Failed to list nodes, not cleaning up taints: %v", err)
} else {
deletetaint.CleanAllToBeDeleted(readyNodes,
deletetaint.CleanAllToBeDeleted(nodes,
a.AutoscalingContext.ClientSet, a.Recorder, a.CordonNodeBeforeTerminate)
if a.AutoscalingContext.AutoscalingOptions.MaxBulkSoftTaintCount == 0 {
// Clean old taints if soft taints handling is disabled
deletetaint.CleanAllDeletionCandidates(readyNodes,
deletetaint.CleanAllDeletionCandidates(nodes,
a.AutoscalingContext.ClientSet, a.Recorder)
}
}
