From 4a5d2813f49722f9ac598f57634e00a5fde67f20 Mon Sep 17 00:00:00 2001 From: Ravi Sinha Date: Mon, 10 Jun 2024 09:08:18 -0700 Subject: [PATCH 1/6] This merge resolves an issue in the Kubernetes Cluster Autoscaler where actual instances within AWS Auto Scaling Groups (ASGs) were incorrectly decommissioned instead of placeholders. The updates ensure that placeholders are exclusively targeted for scaling down under conditions where recent scaling activities have failed. This prevents the accidental termination of active nodes and enhances the reliability of the autoscaler in AWS environments. --- .../cloudprovider/aws/auto_scaling_groups.go | 127 ++++++++++++++++++ .../aws/aws_cloud_provider_test.go | 98 ++++++++++++++ 2 files changed, 225 insertions(+) diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go index 4667e285b15..a8d3fc48c29 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go @@ -308,6 +308,54 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error { } } + var isRecentScalingActivitySuccess = false + var err error + + placeHolderInstancesCount := m.GetPlaceHolderInstancesCount(instances) + // Check if there are any placeholder instances in the list. + if placeHolderInstancesCount > 0 { + // Log the check for placeholders in the ASG. + klog.V(4).Infof("Detected %d placeholder instance(s), checking recent scaling activity for ASG %s", + placeHolderInstancesCount, commonAsg.Name) + + // Retrieve the most recent scaling activity to determine its success state. + isRecentScalingActivitySuccess, err = m.getMostRecentScalingActivity(commonAsg) + + // Handle errors from retrieving scaling activity. + if err != nil { + // Log the error if the scaling activity check fails and return the error. + klog.Errorf("Error retrieving scaling activity for ASG %s: %v", commonAsg.Name, err) + return err // Return error to prevent further processing with uncertain state information. + } + + if !isRecentScalingActivitySuccess { + asgDetail, err := m.getDescribeAutoScalingGroupResults(commonAsg) + + if err != nil { + klog.Errorf("Error retrieving ASG details %s: %v", commonAsg.Name, err) + return err + } + + activeInstancesInAsg := len(asgDetail.Instances) + desiredCapacityInAsg := int(*asgDetail.DesiredCapacity) + klog.V(4).Infof("asg %s has placeholders instances with desired capacity = %d and active instances = %d ", + commonAsg.Name, desiredCapacityInAsg, activeInstancesInAsg) + + // If the difference between the active instances and the desired capacity is greater than 1, + // it means that the ASG is under-provisioned and the desired capacity is not being reached. + // In this case, we would reduce the size of ASG by the count of unprovisioned instances + // which is equal to the total count of active instances in ASG + + err = m.setAsgSizeNoLock(commonAsg, activeInstancesInAsg) + + if err != nil { + klog.Errorf("Error reducing ASG %s size to %d: %v", commonAsg.Name, activeInstancesInAsg, err) + return err + } + return nil + } + } + for _, instance := range instances { // check if the instance is a placeholder - a requested instance that was never created by the node group // if it is, just decrease the size of the node group, as there's no specific instance we can remove @@ -352,6 +400,33 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error { return nil } +func (m *asgCache) getDescribeAutoScalingGroupResults(commonAsg *asg) (*autoscaling.Group, error) { + asgs := make([]*autoscaling.Group, 0) + commonAsgNames := []string{commonAsg.Name} + input := &autoscaling.DescribeAutoScalingGroupsInput{ + AutoScalingGroupNames: aws.StringSlice(commonAsgNames), + MaxRecords: aws.Int64(100), + } + + err := m.awsService.DescribeAutoScalingGroupsPages(input, func(output *autoscaling.DescribeAutoScalingGroupsOutput, _ bool) bool { + asgs = append(asgs, output.AutoScalingGroups...) + // We return true while we want to be called with the next page of + // results, if any. + return false + }) + + if err != nil { + klog.Errorf("Failed while performing DescribeAutoScalingGroupsPages: %v", err) + return nil, err + } + + if len(asgs) == 0 { + return nil, fmt.Errorf("no ASGs found for %s", commonAsgNames) + } + + return asgs[0], nil +} + // isPlaceholderInstance checks if the given instance is only a placeholder func (m *asgCache) isPlaceholderInstance(instance *AwsInstanceRef) bool { return strings.HasPrefix(instance.Name, placeholderInstanceNamePrefix) @@ -624,3 +699,55 @@ func (m *asgCache) buildInstanceRefFromAWS(instance *autoscaling.Instance) AwsIn func (m *asgCache) Cleanup() { close(m.interrupt) } + +func (m *asgCache) getMostRecentScalingActivity(asg *asg) (bool, error) { + input := &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: aws.String(asg.Name), + MaxRecords: aws.Int64(1), + } + + var response *autoscaling.DescribeScalingActivitiesOutput + var err error + attempts := 3 + + for i := 0; i < attempts; i++ { + response, err = m.awsService.DescribeScalingActivities(input) + if err == nil { + break + } + klog.V(2).Infof("Failed to describe scaling activities, attempt %d/%d: %v", i+1, attempts, err) + time.Sleep(time.Second * 2) + } + + if err != nil { + klog.Errorf("All attempts failed for DescribeScalingActivities: %v", err) + return false, err + } + + if len(response.Activities) == 0 { + klog.Info("No scaling activities found for ASG:", asg.Name) + return false, nil + } + + lastActivity := response.Activities[0] + if *lastActivity.StatusCode == "Successful" { + klog.Infof("Most recent scaling activity for ASG %s was successful", asg.Name) + return true, nil + } else { + klog.Infof("Most recent scaling activity for ASG %s was not successful: %s", asg.Name, *lastActivity.StatusMessage) + return false, nil + } +} + +// GetPlaceHolderInstancesCount returns count of placeholder instances in the cache +func (m *asgCache) GetPlaceHolderInstancesCount(instances []*AwsInstanceRef) int { + + placeholderInstancesCount := 0 + for _, instance := range instances { + if strings.HasPrefix(instance.Name, placeholderInstanceNamePrefix) { + placeholderInstancesCount++ + + } + } + return placeholderInstancesCount +} diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go index 0033d27c68e..e2e425b068e 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go @@ -18,6 +18,7 @@ package aws import ( "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -568,6 +569,22 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) { HonorCooldown: aws.Bool(false), }).Return(&autoscaling.SetDesiredCapacityOutput{}) + a.On("DescribeScalingActivities", + &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: aws.String("test-asg"), + MaxRecords: aws.Int64(1), + }, + ).Return( + &autoscaling.DescribeScalingActivitiesOutput{ + Activities: []*autoscaling.Activity{ + { + StatusCode: aws.String("Successful"), + StatusMessage: aws.String("Successful"), + StartTime: aws.Time(time.Now().Add(-30 * time.Minute)), + }, + }, + }, nil) + // Look up the current number of instances... var expectedInstancesCount int64 = 2 a.On("DescribeAutoScalingGroupsPages", @@ -739,3 +756,84 @@ func TestHasInstance(t *testing.T) { assert.NoError(t, err) assert.False(t, present) } + +// write unit test for DeleteInstances function +func TestDeleteInstances_scalingActivityFailure(t *testing.T) { + + a := &autoScalingMock{} + provider := testProvider(t, newTestAwsManagerWithAsgs(t, a, nil, []string{"1:5:test-asg"})) + + asgs := provider.NodeGroups() + a.On("SetDesiredCapacity", &autoscaling.SetDesiredCapacityInput{ + AutoScalingGroupName: aws.String(asgs[0].Id()), + DesiredCapacity: aws.Int64(1), + HonorCooldown: aws.Bool(false), + }).Return(&autoscaling.SetDesiredCapacityOutput{}) + var expectedInstancesCount int64 = 5 + a.On("DescribeAutoScalingGroupsPages", + &autoscaling.DescribeAutoScalingGroupsInput{ + AutoScalingGroupNames: aws.StringSlice([]string{"test-asg"}), + MaxRecords: aws.Int64(100), + }, + mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"), + ).Run(func(args mock.Arguments) { + fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) + fn(testNamedDescribeAutoScalingGroupsOutput("test-asg", expectedInstancesCount, "i-0c257f8f05fd1c64b", "i-0c257f8f05fd1c64c", "i-0c257f8f05fd1c64d"), false) + // we expect the instance count to be 1 after the call to DeleteNodes + //expectedInstancesCount = + }).Return(nil) + + a.On("DescribeScalingActivities", + &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: aws.String("test-asg"), + MaxRecords: aws.Int64(1), + }, + ).Return( + &autoscaling.DescribeScalingActivitiesOutput{ + Activities: []*autoscaling.Activity{ + { + StatusCode: aws.String("Failed"), + StatusMessage: aws.String("Launching a new EC2 instance. Status Reason: We currently do not have sufficient p5.48xlarge capacity in zones with support for 'gp2' volumes. Our system will be working on provisioning additional capacity. Launching EC2 instance failed.\t"), + StartTime: aws.Time(time.Now().Add(-30 * time.Minute)), + }, + }, + }, nil) + + a.On("DescribeScalingActivities", + &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: aws.String("test-asg"), + }, + ).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil) + + a.On("SetDesiredCapacity", &autoscaling.SetDesiredCapacityInput{ + AutoScalingGroupName: aws.String(asgs[0].Id()), + DesiredCapacity: aws.Int64(3), + HonorCooldown: aws.Bool(false), + }).Return(&autoscaling.SetDesiredCapacityOutput{}) + + provider.Refresh() + + initialSize, err := asgs[0].TargetSize() + assert.NoError(t, err) + assert.Equal(t, 5, initialSize) + + nodes := []*apiv1.Node{} + asgToInstances := provider.awsManager.asgCache.asgToInstances[AwsRef{Name: "test-asg"}] + for _, instance := range asgToInstances { + nodes = append(nodes, &apiv1.Node{ + Spec: apiv1.NodeSpec{ + ProviderID: instance.ProviderID, + }, + }) + } + + err = asgs[0].DeleteNodes(nodes) + assert.NoError(t, err) + a.AssertNumberOfCalls(t, "SetDesiredCapacity", 1) + a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 2) + + newSize, err := asgs[0].TargetSize() + assert.NoError(t, err) + assert.Equal(t, 3, newSize) + +} From 9a0830eb01915a01bbcd6c74e95ccbb0a54e5be4 Mon Sep 17 00:00:00 2001 From: Ravi Sinha Date: Mon, 10 Jun 2024 11:36:20 -0700 Subject: [PATCH 2/6] handling placeholder instances with no scaling activities error --- .../cloudprovider/aws/auto_scaling_groups.go | 176 +++++------------- .../aws/aws_cloud_provider_test.go | 103 +--------- 2 files changed, 47 insertions(+), 232 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go index a8d3fc48c29..ea744de0621 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go @@ -308,9 +308,6 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error { } } - var isRecentScalingActivitySuccess = false - var err error - placeHolderInstancesCount := m.GetPlaceHolderInstancesCount(instances) // Check if there are any placeholder instances in the list. if placeHolderInstancesCount > 0 { @@ -318,113 +315,69 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error { klog.V(4).Infof("Detected %d placeholder instance(s), checking recent scaling activity for ASG %s", placeHolderInstancesCount, commonAsg.Name) - // Retrieve the most recent scaling activity to determine its success state. - isRecentScalingActivitySuccess, err = m.getMostRecentScalingActivity(commonAsg) + asgNames := []string{commonAsg.Name} + asgDetail, err := m.awsService.getAutoscalingGroupsByNames(asgNames) - // Handle errors from retrieving scaling activity. if err != nil { - // Log the error if the scaling activity check fails and return the error. - klog.Errorf("Error retrieving scaling activity for ASG %s: %v", commonAsg.Name, err) - return err // Return error to prevent further processing with uncertain state information. + klog.Errorf("Error retrieving ASG details %s: %v", commonAsg.Name, err) + return err } - if !isRecentScalingActivitySuccess { - asgDetail, err := m.getDescribeAutoScalingGroupResults(commonAsg) - - if err != nil { - klog.Errorf("Error retrieving ASG details %s: %v", commonAsg.Name, err) - return err - } + activeInstancesInAsg := len(asgDetail[0].Instances) + desiredCapacityInAsg := int(*asgDetail[0].DesiredCapacity) + klog.V(4).Infof("asg %s has placeholders instances with desired capacity = %d and active instances = %d. updating ASG to match active instances count", + commonAsg.Name, desiredCapacityInAsg, activeInstancesInAsg) - activeInstancesInAsg := len(asgDetail.Instances) - desiredCapacityInAsg := int(*asgDetail.DesiredCapacity) - klog.V(4).Infof("asg %s has placeholders instances with desired capacity = %d and active instances = %d ", - commonAsg.Name, desiredCapacityInAsg, activeInstancesInAsg) + // If the difference between the active instances and the desired capacity is greater than 1, + // it means that the ASG is under-provisioned and the desired capacity is not being reached. + // In this case, we would reduce the size of ASG by the count of unprovisioned instances + // which is equal to the total count of active instances in ASG - // If the difference between the active instances and the desired capacity is greater than 1, - // it means that the ASG is under-provisioned and the desired capacity is not being reached. - // In this case, we would reduce the size of ASG by the count of unprovisioned instances - // which is equal to the total count of active instances in ASG + err = m.setAsgSizeNoLock(commonAsg, activeInstancesInAsg) - err = m.setAsgSizeNoLock(commonAsg, activeInstancesInAsg) - - if err != nil { - klog.Errorf("Error reducing ASG %s size to %d: %v", commonAsg.Name, activeInstancesInAsg, err) - return err - } - return nil + if err != nil { + klog.Errorf("Error reducing ASG %s size to %d: %v", commonAsg.Name, activeInstancesInAsg, err) + return err } + return nil } for _, instance := range instances { - // check if the instance is a placeholder - a requested instance that was never created by the node group - // if it is, just decrease the size of the node group, as there's no specific instance we can remove - if m.isPlaceholderInstance(instance) { - klog.V(4).Infof("instance %s is detected as a placeholder, decreasing ASG requested size instead "+ - "of deleting instance", instance.Name) - m.decreaseAsgSizeByOneNoLock(commonAsg) - } else { - // check if the instance is already terminating - if it is, don't bother terminating again - // as doing so causes unnecessary API calls and can cause the curSize cached value to decrement - // unnecessarily. - lifecycle, err := m.findInstanceLifecycle(*instance) - if err != nil { - return err - } - if lifecycle != nil && - *lifecycle == autoscaling.LifecycleStateTerminated || - *lifecycle == autoscaling.LifecycleStateTerminating || - *lifecycle == autoscaling.LifecycleStateTerminatingWait || - *lifecycle == autoscaling.LifecycleStateTerminatingProceed { - klog.V(2).Infof("instance %s is already terminating in state %s, will skip instead", instance.Name, *lifecycle) - continue - } - - params := &autoscaling.TerminateInstanceInAutoScalingGroupInput{ - InstanceId: aws.String(instance.Name), - ShouldDecrementDesiredCapacity: aws.Bool(true), - } - start := time.Now() - resp, err := m.awsService.TerminateInstanceInAutoScalingGroup(params) - observeAWSRequest("TerminateInstanceInAutoScalingGroup", err, start) - if err != nil { - return err - } - klog.V(4).Infof(*resp.Activity.Description) - - // Proactively decrement the size so autoscaler makes better decisions - commonAsg.curSize-- + // check if the instance is already terminating - if it is, don't bother terminating again + // as doing so causes unnecessary API calls and can cause the curSize cached value to decrement + // unnecessarily. + lifecycle, err := m.findInstanceLifecycle(*instance) + if err != nil { + return err } - } - return nil -} -func (m *asgCache) getDescribeAutoScalingGroupResults(commonAsg *asg) (*autoscaling.Group, error) { - asgs := make([]*autoscaling.Group, 0) - commonAsgNames := []string{commonAsg.Name} - input := &autoscaling.DescribeAutoScalingGroupsInput{ - AutoScalingGroupNames: aws.StringSlice(commonAsgNames), - MaxRecords: aws.Int64(100), - } + if lifecycle != nil && + *lifecycle == autoscaling.LifecycleStateTerminated || + *lifecycle == autoscaling.LifecycleStateTerminating || + *lifecycle == autoscaling.LifecycleStateTerminatingWait || + *lifecycle == autoscaling.LifecycleStateTerminatingProceed { + klog.V(2).Infof("instance %s is already terminating in state %s, will skip instead", instance.Name, *lifecycle) + continue + } - err := m.awsService.DescribeAutoScalingGroupsPages(input, func(output *autoscaling.DescribeAutoScalingGroupsOutput, _ bool) bool { - asgs = append(asgs, output.AutoScalingGroups...) - // We return true while we want to be called with the next page of - // results, if any. - return false - }) + params := &autoscaling.TerminateInstanceInAutoScalingGroupInput{ + InstanceId: aws.String(instance.Name), + ShouldDecrementDesiredCapacity: aws.Bool(true), + } + start := time.Now() + resp, err := m.awsService.TerminateInstanceInAutoScalingGroup(params) + observeAWSRequest("TerminateInstanceInAutoScalingGroup", err, start) + if err != nil { + return err + } + klog.V(4).Infof(*resp.Activity.Description) - if err != nil { - klog.Errorf("Failed while performing DescribeAutoScalingGroupsPages: %v", err) - return nil, err - } + // Proactively decrement the size so autoscaler makes better decisions + commonAsg.curSize-- - if len(asgs) == 0 { - return nil, fmt.Errorf("no ASGs found for %s", commonAsgNames) } - - return asgs[0], nil + return nil } // isPlaceholderInstance checks if the given instance is only a placeholder @@ -700,45 +653,6 @@ func (m *asgCache) Cleanup() { close(m.interrupt) } -func (m *asgCache) getMostRecentScalingActivity(asg *asg) (bool, error) { - input := &autoscaling.DescribeScalingActivitiesInput{ - AutoScalingGroupName: aws.String(asg.Name), - MaxRecords: aws.Int64(1), - } - - var response *autoscaling.DescribeScalingActivitiesOutput - var err error - attempts := 3 - - for i := 0; i < attempts; i++ { - response, err = m.awsService.DescribeScalingActivities(input) - if err == nil { - break - } - klog.V(2).Infof("Failed to describe scaling activities, attempt %d/%d: %v", i+1, attempts, err) - time.Sleep(time.Second * 2) - } - - if err != nil { - klog.Errorf("All attempts failed for DescribeScalingActivities: %v", err) - return false, err - } - - if len(response.Activities) == 0 { - klog.Info("No scaling activities found for ASG:", asg.Name) - return false, nil - } - - lastActivity := response.Activities[0] - if *lastActivity.StatusCode == "Successful" { - klog.Infof("Most recent scaling activity for ASG %s was successful", asg.Name) - return true, nil - } else { - klog.Infof("Most recent scaling activity for ASG %s was not successful: %s", asg.Name, *lastActivity.StatusMessage) - return false, nil - } -} - // GetPlaceHolderInstancesCount returns count of placeholder instances in the cache func (m *asgCache) GetPlaceHolderInstancesCount(instances []*AwsInstanceRef) int { diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go index e2e425b068e..d375dd7aa23 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go @@ -17,9 +17,6 @@ limitations under the License. package aws import ( - "testing" - "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" apiv1 "k8s.io/api/core/v1" @@ -28,6 +25,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/autoscaling" "k8s.io/autoscaler/cluster-autoscaler/config" + "testing" ) var testAwsManager = &AwsManager{ @@ -569,22 +567,6 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) { HonorCooldown: aws.Bool(false), }).Return(&autoscaling.SetDesiredCapacityOutput{}) - a.On("DescribeScalingActivities", - &autoscaling.DescribeScalingActivitiesInput{ - AutoScalingGroupName: aws.String("test-asg"), - MaxRecords: aws.Int64(1), - }, - ).Return( - &autoscaling.DescribeScalingActivitiesOutput{ - Activities: []*autoscaling.Activity{ - { - StatusCode: aws.String("Successful"), - StatusMessage: aws.String("Successful"), - StartTime: aws.Time(time.Now().Add(-30 * time.Minute)), - }, - }, - }, nil) - // Look up the current number of instances... var expectedInstancesCount int64 = 2 a.On("DescribeAutoScalingGroupsPages", @@ -620,7 +602,7 @@ func TestDeleteNodesWithPlaceholder(t *testing.T) { err = asgs[0].DeleteNodes([]*apiv1.Node{node}) assert.NoError(t, err) a.AssertNumberOfCalls(t, "SetDesiredCapacity", 1) - a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1) + a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 2) newSize, err := asgs[0].TargetSize() assert.NoError(t, err) @@ -756,84 +738,3 @@ func TestHasInstance(t *testing.T) { assert.NoError(t, err) assert.False(t, present) } - -// write unit test for DeleteInstances function -func TestDeleteInstances_scalingActivityFailure(t *testing.T) { - - a := &autoScalingMock{} - provider := testProvider(t, newTestAwsManagerWithAsgs(t, a, nil, []string{"1:5:test-asg"})) - - asgs := provider.NodeGroups() - a.On("SetDesiredCapacity", &autoscaling.SetDesiredCapacityInput{ - AutoScalingGroupName: aws.String(asgs[0].Id()), - DesiredCapacity: aws.Int64(1), - HonorCooldown: aws.Bool(false), - }).Return(&autoscaling.SetDesiredCapacityOutput{}) - var expectedInstancesCount int64 = 5 - a.On("DescribeAutoScalingGroupsPages", - &autoscaling.DescribeAutoScalingGroupsInput{ - AutoScalingGroupNames: aws.StringSlice([]string{"test-asg"}), - MaxRecords: aws.Int64(100), - }, - mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"), - ).Run(func(args mock.Arguments) { - fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) - fn(testNamedDescribeAutoScalingGroupsOutput("test-asg", expectedInstancesCount, "i-0c257f8f05fd1c64b", "i-0c257f8f05fd1c64c", "i-0c257f8f05fd1c64d"), false) - // we expect the instance count to be 1 after the call to DeleteNodes - //expectedInstancesCount = - }).Return(nil) - - a.On("DescribeScalingActivities", - &autoscaling.DescribeScalingActivitiesInput{ - AutoScalingGroupName: aws.String("test-asg"), - MaxRecords: aws.Int64(1), - }, - ).Return( - &autoscaling.DescribeScalingActivitiesOutput{ - Activities: []*autoscaling.Activity{ - { - StatusCode: aws.String("Failed"), - StatusMessage: aws.String("Launching a new EC2 instance. Status Reason: We currently do not have sufficient p5.48xlarge capacity in zones with support for 'gp2' volumes. Our system will be working on provisioning additional capacity. Launching EC2 instance failed.\t"), - StartTime: aws.Time(time.Now().Add(-30 * time.Minute)), - }, - }, - }, nil) - - a.On("DescribeScalingActivities", - &autoscaling.DescribeScalingActivitiesInput{ - AutoScalingGroupName: aws.String("test-asg"), - }, - ).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil) - - a.On("SetDesiredCapacity", &autoscaling.SetDesiredCapacityInput{ - AutoScalingGroupName: aws.String(asgs[0].Id()), - DesiredCapacity: aws.Int64(3), - HonorCooldown: aws.Bool(false), - }).Return(&autoscaling.SetDesiredCapacityOutput{}) - - provider.Refresh() - - initialSize, err := asgs[0].TargetSize() - assert.NoError(t, err) - assert.Equal(t, 5, initialSize) - - nodes := []*apiv1.Node{} - asgToInstances := provider.awsManager.asgCache.asgToInstances[AwsRef{Name: "test-asg"}] - for _, instance := range asgToInstances { - nodes = append(nodes, &apiv1.Node{ - Spec: apiv1.NodeSpec{ - ProviderID: instance.ProviderID, - }, - }) - } - - err = asgs[0].DeleteNodes(nodes) - assert.NoError(t, err) - a.AssertNumberOfCalls(t, "SetDesiredCapacity", 1) - a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 2) - - newSize, err := asgs[0].TargetSize() - assert.NoError(t, err) - assert.Equal(t, 3, newSize) - -} From b7ba76feddddd845b095ca73d18cce7f1b5ae27e Mon Sep 17 00:00:00 2001 From: Ravi Sinha Date: Thu, 20 Jun 2024 15:10:33 -0700 Subject: [PATCH 3/6] handling deletion of actual instances along with placeholders --- .../cloudprovider/aws/auto_scaling_groups.go | 6 +- .../aws/aws_cloud_provider_test.go | 112 +++++++++++++++++- 2 files changed, 116 insertions(+), 2 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go index ea744de0621..77e7b0b5795 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go @@ -339,11 +339,15 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error { klog.Errorf("Error reducing ASG %s size to %d: %v", commonAsg.Name, activeInstancesInAsg, err) return err } - return nil } for _, instance := range instances { + if m.isPlaceholderInstance(instance) { + // skipping placeholder as placeholder instances don't exist + // and we have already reduced ASG size during placeholder check. + continue + } // check if the instance is already terminating - if it is, don't bother terminating again // as doing so causes unnecessary API calls and can cause the curSize cached value to decrement // unnecessarily. diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go index d375dd7aa23..09e4de361f0 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go @@ -17,6 +17,8 @@ limitations under the License. package aws import ( + "testing" + "fmt" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" apiv1 "k8s.io/api/core/v1" @@ -25,7 +27,6 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/autoscaling" "k8s.io/autoscaler/cluster-autoscaler/config" - "testing" ) var testAwsManager = &AwsManager{ @@ -738,3 +739,112 @@ func TestHasInstance(t *testing.T) { assert.NoError(t, err) assert.False(t, present) } + +func TestDeleteNodesWithPlaceholderAndIncorrectCache(t *testing.T) { + // This test validates the scenario where ASG cache is not in sync with Autoscaling configuration. + // we are taking an example where ASG size is 10, cache as 3 instances "i-0000", "i-0001" and "i-0002 + // But ASG has 6 instances i-0000 to i-10005. When DeleteInstances is called with 2 instances ("i-0000", "i-0001" ) + // and placeholders, CAS will terminate only these 2 instances after reducing ASG size by the count of placeholders + + a := &autoScalingMock{} + provider := testProvider(t, newTestAwsManagerWithAsgs(t, a, nil, []string{"1:10:test-asg"})) + asgs := provider.NodeGroups() + commonAsg := &asg{ + AwsRef: AwsRef{Name: asgs[0].Id()}, + minSize: asgs[0].MinSize(), + maxSize: asgs[0].MaxSize(), + } + + // desired capacity will be set as 6 as ASG has 4 placeholders + a.On("SetDesiredCapacity", &autoscaling.SetDesiredCapacityInput{ + AutoScalingGroupName: aws.String(asgs[0].Id()), + DesiredCapacity: aws.Int64(6), + HonorCooldown: aws.Bool(false), + }).Return(&autoscaling.SetDesiredCapacityOutput{}) + + // Look up the current number of instances... + var expectedInstancesCount int64 = 10 + a.On("DescribeAutoScalingGroupsPages", + &autoscaling.DescribeAutoScalingGroupsInput{ + AutoScalingGroupNames: aws.StringSlice([]string{"test-asg"}), + MaxRecords: aws.Int64(maxRecordsReturnedByAPI), + }, + mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"), + ).Run(func(args mock.Arguments) { + fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) + fn(testNamedDescribeAutoScalingGroupsOutput("test-asg", expectedInstancesCount, "i-0000", "i-0001", "i-0002", "i-0003", "i-0004", "i-0005"), false) + + expectedInstancesCount = 4 + }).Return(nil) + + a.On("DescribeScalingActivities", + &autoscaling.DescribeScalingActivitiesInput{ + AutoScalingGroupName: aws.String("test-asg"), + }, + ).Return(&autoscaling.DescribeScalingActivitiesOutput{}, nil) + + provider.Refresh() + + initialSize, err := asgs[0].TargetSize() + assert.NoError(t, err) + assert.Equal(t, 10, initialSize) + + var awsInstanceRefs []AwsInstanceRef + instanceToAsg := make(map[AwsInstanceRef]*asg) + + var nodes []*apiv1.Node + for i := 3; i <= 9; i++ { + providerId := fmt.Sprintf("aws:///us-east-1a/i-placeholder-test-asg-%d", i) + node := &apiv1.Node{ + Spec: apiv1.NodeSpec{ + ProviderID: providerId, + }, + } + nodes = append(nodes, node) + awsInstanceRef := AwsInstanceRef{ + ProviderID: providerId, + Name: fmt.Sprintf("i-placeholder-test-asg-%d", i), + } + awsInstanceRefs = append(awsInstanceRefs, awsInstanceRef) + instanceToAsg[awsInstanceRef] = commonAsg + } + + for i := 0; i <= 2; i++ { + providerId := fmt.Sprintf("aws:///us-east-1a/i-000%d", i) + node := &apiv1.Node{ + Spec: apiv1.NodeSpec{ + ProviderID: providerId, + }, + } + // only setting 2 instances to be terminated out of 3 active instances + if i < 2 { + nodes = append(nodes, node) + a.On("TerminateInstanceInAutoScalingGroup", &autoscaling.TerminateInstanceInAutoScalingGroupInput{ + InstanceId: aws.String(fmt.Sprintf("i-000%d", i)), + ShouldDecrementDesiredCapacity: aws.Bool(true), + }).Return(&autoscaling.TerminateInstanceInAutoScalingGroupOutput{ + Activity: &autoscaling.Activity{Description: aws.String("Deleted instance")}, + }) + } + awsInstanceRef := AwsInstanceRef{ + ProviderID: providerId, + Name: fmt.Sprintf("i-000%d", i), + } + awsInstanceRefs = append(awsInstanceRefs, awsInstanceRef) + instanceToAsg[awsInstanceRef] = commonAsg + } + + // modifying provider to have incorrect information than ASG current state + provider.awsManager.asgCache.asgToInstances[AwsRef{Name: "test-asg"}] = awsInstanceRefs + provider.awsManager.asgCache.instanceToAsg = instanceToAsg + + // calling delete nodes 2 nodes and remaining placeholders + err = asgs[0].DeleteNodes(nodes) + assert.NoError(t, err) + a.AssertNumberOfCalls(t, "SetDesiredCapacity", 1) + a.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 2) + + // This ensures only 2 instances are terminated which are mocked in this unit test + a.AssertNumberOfCalls(t, "TerminateInstanceInAutoScalingGroup", 2) + +} From d9f8217e54c7a7a97ab24b7d733d5c30fd88aa02 Mon Sep 17 00:00:00 2001 From: Ravi Sinha Date: Fri, 21 Jun 2024 12:21:01 -0700 Subject: [PATCH 4/6] updating unit test name --- .../cloudprovider/aws/aws_cloud_provider_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go index 09e4de361f0..80ae85b5325 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go @@ -740,7 +740,7 @@ func TestHasInstance(t *testing.T) { assert.False(t, present) } -func TestDeleteNodesWithPlaceholderAndIncorrectCache(t *testing.T) { +func TestDeleteNodesWithPlaceholderAndStaleCache(t *testing.T) { // This test validates the scenario where ASG cache is not in sync with Autoscaling configuration. // we are taking an example where ASG size is 10, cache as 3 instances "i-0000", "i-0001" and "i-0002 // But ASG has 6 instances i-0000 to i-10005. When DeleteInstances is called with 2 instances ("i-0000", "i-0001" ) @@ -834,7 +834,7 @@ func TestDeleteNodesWithPlaceholderAndIncorrectCache(t *testing.T) { instanceToAsg[awsInstanceRef] = commonAsg } - // modifying provider to have incorrect information than ASG current state + // modifying provider to bring disparity between ASG and cache provider.awsManager.asgCache.asgToInstances[AwsRef{Name: "test-asg"}] = awsInstanceRefs provider.awsManager.asgCache.instanceToAsg = instanceToAsg From f9c05fb19baf5b602675faac87e05fa4120a99ed Mon Sep 17 00:00:00 2001 From: Ravi Sinha Date: Fri, 21 Jun 2024 13:19:15 -0700 Subject: [PATCH 5/6] resolving gofmt issue --- cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go index 80ae85b5325..335cbeb3180 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_cloud_provider_test.go @@ -17,7 +17,6 @@ limitations under the License. package aws import ( - "testing" "fmt" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -27,6 +26,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/aws" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/aws/aws-sdk-go/service/autoscaling" "k8s.io/autoscaler/cluster-autoscaler/config" + "testing" ) var testAwsManager = &AwsManager{ From 3c5a97d7b9c6d8556df204301a4abbe6fd3b4c97 Mon Sep 17 00:00:00 2001 From: Ravi Sinha Date: Tue, 25 Jun 2024 17:31:31 -0700 Subject: [PATCH 6/6] fixing log comments --- cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go index 77e7b0b5795..6c34b7c726d 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go @@ -312,7 +312,7 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error { // Check if there are any placeholder instances in the list. if placeHolderInstancesCount > 0 { // Log the check for placeholders in the ASG. - klog.V(4).Infof("Detected %d placeholder instance(s), checking recent scaling activity for ASG %s", + klog.V(4).Infof("Detected %d placeholder instance(s) in ASG %s", placeHolderInstancesCount, commonAsg.Name) asgNames := []string{commonAsg.Name}