diff --git a/cluster-autoscaler/cloudprovider/test/test_cloud_provider.go b/cluster-autoscaler/cloudprovider/test/test_cloud_provider.go index 234d6853e55..af58f9e639a 100644 --- a/cluster-autoscaler/cloudprovider/test/test_cloud_provider.go +++ b/cluster-autoscaler/cloudprovider/test/test_cloud_provider.go @@ -253,6 +253,14 @@ func (tcp *TestCloudProvider) AddNode(nodeGroupId string, node *apiv1.Node) { tcp.nodes[node.Name] = nodeGroupId } +// DeleteNode delete the given node from the provider. +func (tcp *TestCloudProvider) DeleteNode(node *apiv1.Node) { + tcp.Lock() + defer tcp.Unlock() + + delete(tcp.nodes, node.Name) +} + // GetResourceLimiter returns struct containing limits (max, min) for resources (cores, memory etc.). func (tcp *TestCloudProvider) GetResourceLimiter() (*cloudprovider.ResourceLimiter, error) { return tcp.resourceLimiter, nil diff --git a/cluster-autoscaler/clusterstate/clusterstate.go b/cluster-autoscaler/clusterstate/clusterstate.go index 12edc67ba90..7eacc9e7be5 100644 --- a/cluster-autoscaler/clusterstate/clusterstate.go +++ b/cluster-autoscaler/clusterstate/clusterstate.go @@ -28,7 +28,6 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils" "k8s.io/autoscaler/cluster-autoscaler/metrics" "k8s.io/autoscaler/cluster-autoscaler/utils/backoff" - "k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" apiv1 "k8s.io/api/core/v1" @@ -100,6 +99,15 @@ type UnregisteredNode struct { UnregisteredSince time.Time } +// DeletedNode contains information about nodes that have been removed from cluster provider side +// but are still registered in Kubernetes. +type DeletedNode struct { + // Node instance registered in Kubernetes. + Node *apiv1.Node + // DeletedSince is the time when the node was detected as deleted. + DeletedSince time.Time +} + // ScaleUpFailure contains information about a failure of a scale-up. type ScaleUpFailure struct { NodeGroup cloudprovider.NodeGroup @@ -121,6 +129,7 @@ type ClusterStateRegistry struct { acceptableRanges map[string]AcceptableRange incorrectNodeGroupSizes map[string]IncorrectNodeGroupSize unregisteredNodes map[string]UnregisteredNode + deletedNodes map[string]DeletedNode candidatesForScaleDown map[string][]string backoff backoff.Backoff lastStatus *api.ClusterAutoscalerStatus @@ -153,6 +162,7 @@ func NewClusterStateRegistry(cloudProvider cloudprovider.CloudProvider, config C acceptableRanges: make(map[string]AcceptableRange), incorrectNodeGroupSizes: make(map[string]IncorrectNodeGroupSize), unregisteredNodes: make(map[string]UnregisteredNode), + deletedNodes: make(map[string]DeletedNode), candidatesForScaleDown: make(map[string][]string), backoff: backoff, lastStatus: emptyStatus, @@ -296,7 +306,7 @@ func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, nodeInfosForGr return err } notRegistered := getNotRegisteredNodes(nodes, cloudProviderNodeInstances, currentTime) - + cloudProviderNodesRemoved := getCloudProviderDeletedNodes(nodes, cloudProviderNodeInstances, currentTime) csr.Lock() defer csr.Unlock() @@ -306,6 +316,7 @@ func (csr *ClusterStateRegistry) UpdateNodes(nodes []*apiv1.Node, nodeInfosForGr csr.cloudProviderNodeInstances = cloudProviderNodeInstances csr.updateUnregisteredNodes(notRegistered) + csr.updateCloudProviderDeletedNodes(cloudProviderNodesRemoved) csr.updateReadinessStats(currentTime) // update acceptable ranges based on requests from last loop and targetSizes @@ -541,7 +552,7 @@ func (csr *ClusterStateRegistry) updateReadinessStats(currentTime time.Time) { update := func(current Readiness, node *apiv1.Node, nr kube_util.NodeReadiness) Readiness { current.Registered++ - if deletetaint.HasToBeDeletedTaint(node) { + if _, exists := csr.deletedNodes[node.Name]; exists { current.Deleted++ } else if nr.Ready { current.Ready++ @@ -669,6 +680,30 @@ func (csr *ClusterStateRegistry) GetUnregisteredNodes() []UnregisteredNode { return result } +func (csr *ClusterStateRegistry) updateCloudProviderDeletedNodes(deletedNodes []DeletedNode) { + result := make(map[string]DeletedNode) + for _, deleted := range deletedNodes { + if prev, found := csr.deletedNodes[deleted.Node.Name]; found { + result[deleted.Node.Name] = prev + } else { + result[deleted.Node.Name] = deleted + } + } + csr.deletedNodes = result +} + +//GetCloudProviderDeletedNodes returns a list of all nodes removed from cloud provider but registered in Kubernetes. +func (csr *ClusterStateRegistry) GetCloudProviderDeletedNodes() []DeletedNode { + csr.Lock() + defer csr.Unlock() + + result := make([]DeletedNode, 0, len(csr.deletedNodes)) + for _, deleted := range csr.deletedNodes { + result = append(result, deleted) + } + return result +} + // UpdateScaleDownCandidates updates scale down candidates func (csr *ClusterStateRegistry) UpdateScaleDownCandidates(nodes []*apiv1.Node, now time.Time) { result := make(map[string][]string) @@ -958,6 +993,26 @@ func getNotRegisteredNodes(allNodes []*apiv1.Node, cloudProviderNodeInstances ma return notRegistered } +// Calculates which of the registered nodes in Kubernetes that do not exist in cloud provider. +func getCloudProviderDeletedNodes(allNodes []*apiv1.Node, cloudProviderNodeInstances map[string][]cloudprovider.Instance, time time.Time) []DeletedNode { + cloudRegistered := sets.NewString() + for _, instances := range cloudProviderNodeInstances { + for _, instance := range instances { + cloudRegistered.Insert(instance.Id) + } + } + nodesRemoved := make([]DeletedNode, 0) + for _, node := range allNodes { + if !cloudRegistered.Has(node.Spec.ProviderID) { + nodesRemoved = append(nodesRemoved, DeletedNode{ + Node: node, + DeletedSince: time, + }) + } + } + return nodesRemoved +} + // GetAutoscaledNodesCount calculates and returns the actual and the target number of nodes // belonging to autoscaled node groups in the cluster. func (csr *ClusterStateRegistry) GetAutoscaledNodesCount() (currentSize, targetSize int) { diff --git a/cluster-autoscaler/clusterstate/clusterstate_test.go b/cluster-autoscaler/clusterstate/clusterstate_test.go index 65d3bc43c5f..ed79112543d 100644 --- a/cluster-autoscaler/clusterstate/clusterstate_test.go +++ b/cluster-autoscaler/clusterstate/clusterstate_test.go @@ -17,6 +17,7 @@ limitations under the License. package clusterstate import ( + "fmt" "testing" "time" @@ -27,6 +28,7 @@ import ( testprovider "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/test" "k8s.io/autoscaler/cluster-autoscaler/clusterstate/api" "k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils" + "k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" "k8s.io/client-go/kubernetes/fake" kube_record "k8s.io/client-go/tools/record" @@ -481,6 +483,22 @@ func TestUpcomingNodes(t *testing.T) { provider.AddNodeGroup("ng4", 1, 10, 1) provider.AddNode("ng4", ng4_1) + // One node is already there, for a second nde deletion / draining was already started. + ng5_1 := BuildTestNode("ng5-1", 1000, 1000) + SetNodeReadyState(ng5_1, true, now.Add(-time.Minute)) + ng5_2 := BuildTestNode("ng5-2", 1000, 1000) + SetNodeReadyState(ng5_2, true, now.Add(-time.Minute)) + ng5_2.Spec.Taints = []apiv1.Taint{ + { + Key: deletetaint.ToBeDeletedTaint, + Value: fmt.Sprint(time.Now().Unix()), + Effect: apiv1.TaintEffectNoSchedule, + }, + } + provider.AddNodeGroup("ng5", 1, 10, 2) + provider.AddNode("ng5", ng5_1) + provider.AddNode("ng5", ng5_2) + assert.NotNil(t, provider) fakeClient := &fake.Clientset{} fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap") @@ -488,7 +506,7 @@ func TestUpcomingNodes(t *testing.T) { MaxTotalUnreadyPercentage: 10, OkTotalUnreadyCount: 1, }, fakeLogRecorder, newBackoff()) - err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1, ng3_1, ng4_1}, nil, now) + err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng2_1, ng3_1, ng4_1, ng5_1, ng5_2}, nil, now) assert.NoError(t, err) assert.Empty(t, clusterstate.GetScaleUpFailures()) @@ -497,6 +515,7 @@ func TestUpcomingNodes(t *testing.T) { assert.Equal(t, 1, upcomingNodes["ng2"]) assert.Equal(t, 2, upcomingNodes["ng3"]) assert.NotContains(t, upcomingNodes, "ng4") + assert.NotContains(t, upcomingNodes, "ng5") } func TestIncorrectSize(t *testing.T) { @@ -570,6 +589,53 @@ func TestUnregisteredNodes(t *testing.T) { assert.Equal(t, 0, len(clusterstate.GetUnregisteredNodes())) } +func TestCloudProviderDeletedNodes(t *testing.T) { + now := time.Now() + ng1_1 := BuildTestNode("ng1-1", 1000, 1000) + SetNodeReadyState(ng1_1, true, now.Add(-time.Minute)) + ng1_1.Spec.ProviderID = "ng1-1" + ng1_2 := BuildTestNode("ng1-2", 1000, 1000) + SetNodeReadyState(ng1_2, true, now.Add(-time.Minute)) + ng1_2.Spec.ProviderID = "ng1-2" + provider := testprovider.NewTestCloudProvider(nil, nil) + provider.AddNodeGroup("ng1", 1, 10, 2) + provider.AddNode("ng1", ng1_1) + provider.AddNode("ng1", ng1_2) + + fakeClient := &fake.Clientset{} + fakeLogRecorder, _ := utils.NewStatusMapRecorder(fakeClient, "kube-system", kube_record.NewFakeRecorder(5), false, "my-cool-configmap") + clusterstate := NewClusterStateRegistry(provider, ClusterStateRegistryConfig{ + MaxTotalUnreadyPercentage: 10, + OkTotalUnreadyCount: 1, + MaxNodeProvisionTime: 10 * time.Second, + }, fakeLogRecorder, newBackoff()) + now.Add(time.Minute) + err := clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2}, nil, now) + + // Nodes are registered correctly between Kubernetes and cloud provider. + assert.NoError(t, err) + assert.Equal(t, 0, len(clusterstate.GetCloudProviderDeletedNodes())) + + // The node was removed from Cloud Provider + // should be counted as Deleted by cluster state + nodeGroup, err := provider.NodeGroupForNode(ng1_2) + assert.NoError(t, err) + provider.DeleteNode(ng1_2) + clusterstate.InvalidateNodeInstancesCacheEntry(nodeGroup) + now.Add(time.Minute) + err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1, ng1_2}, nil, now) + assert.NoError(t, err) + assert.Equal(t, 1, len(clusterstate.GetCloudProviderDeletedNodes())) + assert.Equal(t, "ng1-2", clusterstate.GetCloudProviderDeletedNodes()[0].Node.Name) + assert.Equal(t, 1, clusterstate.GetClusterReadiness().Deleted) + + // The node is removed from Kubernetes + now.Add(time.Minute) + err = clusterstate.UpdateNodes([]*apiv1.Node{ng1_1}, nil, now) + assert.NoError(t, err) + assert.Equal(t, 0, len(clusterstate.GetCloudProviderDeletedNodes())) +} + func TestUpdateLastTransitionTimes(t *testing.T) { now := metav1.Time{Time: time.Now()} later := metav1.Time{Time: now.Time.Add(10 * time.Second)} diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index 84b09b65739..3a7bee5801a 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -191,14 +191,14 @@ func (a *StaticAutoscaler) cleanUpIfRequired() { } // CA can die at any time. Removing taints that might have been left from the previous run. - if readyNodes, err := a.ReadyNodeLister().List(); err != nil { - klog.Errorf("Failed to list ready nodes, not cleaning up taints: %v", err) + if nodes, err := a.AllNodeLister().List(); err != nil { + klog.Errorf("Failed to list nodes, not cleaning up taints: %v", err) } else { - deletetaint.CleanAllToBeDeleted(readyNodes, + deletetaint.CleanAllToBeDeleted(nodes, a.AutoscalingContext.ClientSet, a.Recorder, a.CordonNodeBeforeTerminate) if a.AutoscalingContext.AutoscalingOptions.MaxBulkSoftTaintCount == 0 { // Clean old taints if soft taints handling is disabled - deletetaint.CleanAllDeletionCandidates(readyNodes, + deletetaint.CleanAllDeletionCandidates(nodes, a.AutoscalingContext.ClientSet, a.Recorder) } }