Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

correctly handle lack of capacity of AWS spot ASGs #2235

Merged
merged 1 commit into from
Aug 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 61 additions & 11 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package aws
import (
"fmt"
"reflect"
"regexp"
"strings"
"sync"

Expand All @@ -30,7 +31,10 @@ import (
"k8s.io/klog"
)

const scaleToZeroSupported = true
const (
scaleToZeroSupported = true
placeholderInstanceNamePrefix = "i-placeholder"
)

type asgCache struct {
registeredAsgs []*asg
Expand Down Expand Up @@ -195,6 +199,10 @@ func (m *asgCache) SetAsgSize(asg *asg, size int) error {
m.mutex.Lock()
defer m.mutex.Unlock()

return m.setAsgSizeNoLock(asg, size)
}

func (m *asgCache) setAsgSizeNoLock(asg *asg, size int) error {
params := &autoscaling.SetDesiredCapacityInput{
AutoScalingGroupName: aws.String(asg.Name),
DesiredCapacity: aws.Int64(int64(size)),
Expand All @@ -212,6 +220,10 @@ func (m *asgCache) SetAsgSize(asg *asg, size int) error {
return nil
}

func (m *asgCache) decreaseAsgSizeByOneNoLock(asg *asg) error {
return m.setAsgSizeNoLock(asg, asg.curSize-1)
}

// DeleteInstances deletes the given instances. All instances must be controlled by the same ASG.
func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
m.mutex.Lock()
Expand Down Expand Up @@ -239,24 +251,36 @@ func (m *asgCache) DeleteInstances(instances []*AwsInstanceRef) error {
}

for _, instance := range instances {
params := &autoscaling.TerminateInstanceInAutoScalingGroupInput{
InstanceId: aws.String(instance.Name),
ShouldDecrementDesiredCapacity: aws.Bool(true),
}
resp, err := m.service.TerminateInstanceInAutoScalingGroup(params)
if err != nil {
return err
// check if the instance is a placeholder - a requested instance that was never created by the node group
// if it is, just decrease the size of the node group, as there's no specific instance we can remove
if m.isPlaceholderInstance(instance) {
klog.V(4).Infof("instance %s is detected as a placeholder, decreasing ASG requested size instead "+
"of deleting instance", instance.Name)
m.decreaseAsgSizeByOneNoLock(commonAsg)
} else {
params := &autoscaling.TerminateInstanceInAutoScalingGroupInput{
InstanceId: aws.String(instance.Name),
ShouldDecrementDesiredCapacity: aws.Bool(true),
}
resp, err := m.service.TerminateInstanceInAutoScalingGroup(params)
if err != nil {
return err
}
klog.V(4).Infof(*resp.Activity.Description)
}

// Proactively decrement the size so autoscaler makes better decisions
commonAsg.curSize--

klog.V(4).Infof(*resp.Activity.Description)
}

return nil
}

// isPlaceholderInstance checks if the given instance is only a placeholder
func (m *asgCache) isPlaceholderInstance(instance *AwsInstanceRef) bool {
matched, _ := regexp.MatchString(fmt.Sprintf("^%s.*\\d+$", placeholderInstanceNamePrefix), instance.Name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A future improvement might be to compile this regex into the module scope and re-use the compiled regex here. Either that, or just use strings.HasPrefix() and ignore the need to verify a numeric component to the name.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This idea came up here on 29 May 2019.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, indeed it did! :)

return matched
}

// Fetch automatically discovered ASGs. These ASGs should be unregistered if
// they no longer exist in AWS.
func (m *asgCache) fetchAutoAsgNames() ([]string, error) {
Expand Down Expand Up @@ -323,6 +347,11 @@ func (m *asgCache) regenerate() error {
return err
}

// If currently any ASG has more Desired than running Instances, introduce placeholders
// for the instances to come up. This is required to track Desired instances that
// will never come up, like with Spot Request that can't be fulfilled
groups = m.createPlaceholdersForDesiredNonStartedInstances(groups)

// Register or update ASGs
exists := make(map[AwsRef]bool)
for _, group := range groups {
Expand Down Expand Up @@ -355,6 +384,27 @@ func (m *asgCache) regenerate() error {
return nil
}

func (m *asgCache) createPlaceholdersForDesiredNonStartedInstances(groups []*autoscaling.Group) []*autoscaling.Group {
for _, g := range groups {
desired := *g.DesiredCapacity
real := int64(len(g.Instances))
if desired <= real {
continue
}

for i := real; i < desired; i++ {
id := fmt.Sprintf("%s-%s-%d", placeholderInstanceNamePrefix, *g.AutoScalingGroupName, i)
klog.V(4).Infof("Instance group %s has only %d instances created while requested count is %d. "+
"Creating placeholder instance with ID %s.", *g.AutoScalingGroupName, real, desired, id)
g.Instances = append(g.Instances, &autoscaling.Instance{
InstanceId: &id,
AvailabilityZone: g.AvailabilityZones[0],
})
}
}
return groups
}

func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {
spec := dynamic.NodeGroupSpec{
Name: aws.StringValue(g.AutoScalingGroupName),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,7 @@ func TestDeleteNodes(t *testing.T) {
})

// Look up the current number of instances...
var expectedInstancesCount int64 = 2
service.On("DescribeAutoScalingGroupsPages",
&autoscaling.DescribeAutoScalingGroupsInput{
AutoScalingGroupNames: aws.StringSlice([]string{"test-asg"}),
Expand All @@ -406,7 +407,9 @@ func TestDeleteNodes(t *testing.T) {
mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"),
).Run(func(args mock.Arguments) {
fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool)
fn(testNamedDescribeAutoScalingGroupsOutput("test-asg", 2, "test-instance-id", "second-test-instance-id"), false)
fn(testNamedDescribeAutoScalingGroupsOutput("test-asg", expectedInstancesCount, "test-instance-id", "second-test-instance-id"), false)
// we expect the instance count to be 1 after the call to DeleteNodes
expectedInstancesCount = 1
}).Return(nil)

provider.Refresh()
Expand All @@ -423,7 +426,7 @@ func TestDeleteNodes(t *testing.T) {
err = asgs[0].DeleteNodes([]*apiv1.Node{node})
assert.NoError(t, err)
service.AssertNumberOfCalls(t, "TerminateInstanceInAutoScalingGroup", 1)
service.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 1)
service.AssertNumberOfCalls(t, "DescribeAutoScalingGroupsPages", 2)

newSize, err := asgs[0].TargetSize()
assert.NoError(t, err)
Expand Down
6 changes: 5 additions & 1 deletion cluster-autoscaler/cloudprovider/aws/aws_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,11 @@ func (m *AwsManager) SetAsgSize(asg *asg, size int) error {

// DeleteInstances deletes the given instances. All instances must be controlled by the same ASG.
func (m *AwsManager) DeleteInstances(instances []*AwsInstanceRef) error {
return m.asgCache.DeleteInstances(instances)
if err := m.asgCache.DeleteInstances(instances); err != nil {
return err
}
klog.V(2).Infof("Some ASG instances might have been deleted, forcing ASG list refresh")
return m.forceRefresh()
}

// GetAsgNodes returns Asg nodes.
Expand Down
12 changes: 11 additions & 1 deletion cluster-autoscaler/cloudprovider/aws/aws_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,9 +207,16 @@ func TestFetchExplicitAsgs(t *testing.T) {
mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"),
).Run(func(args mock.Arguments) {
fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool)
zone := "test-1a"
fn(&autoscaling.DescribeAutoScalingGroupsOutput{
AutoScalingGroups: []*autoscaling.Group{
{AutoScalingGroupName: aws.String(groupname)},
{
AvailabilityZones: []*string{&zone},
AutoScalingGroupName: aws.String(groupname),
MinSize: aws.Int64(int64(min)),
MaxSize: aws.Int64(int64(max)),
DesiredCapacity: aws.Int64(int64(min)),
},
}}, false)
}).Return(nil)

Expand Down Expand Up @@ -381,11 +388,14 @@ func TestFetchAutoAsgs(t *testing.T) {
mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"),
).Run(func(args mock.Arguments) {
fn := args.Get(1).(func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool)
zone := "test-1a"
fn(&autoscaling.DescribeAutoScalingGroupsOutput{
AutoScalingGroups: []*autoscaling.Group{{
AvailabilityZones: []*string{&zone},
AutoScalingGroupName: aws.String(groupname),
MinSize: aws.Int64(int64(min)),
MaxSize: aws.Int64(int64(max)),
DesiredCapacity: aws.Int64(int64(min)),
}}}, false)
}).Return(nil).Twice()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,7 @@ func (cache *CloudProviderNodeInstancesCache) GetCloudProviderNodeInstances() (m
wg.Add(1)
go func() {
defer wg.Done()
_, err := cache.fetchCloudProviderNodeInstancesForNodeGroup(nodeGroup)
if err != nil {
if _, err := cache.fetchCloudProviderNodeInstancesForNodeGroup(nodeGroup); err != nil {
klog.Errorf("Failed to fetch cloud provider node instances for %v, error %v", nodeGroup.Id(), err)
}
}()
Expand Down
4 changes: 3 additions & 1 deletion cluster-autoscaler/core/static_autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,9 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
unregisteredNodes := a.clusterStateRegistry.GetUnregisteredNodes()
if len(unregisteredNodes) > 0 {
klog.V(1).Infof("%d unregistered nodes present", len(unregisteredNodes))
removedAny, err := removeOldUnregisteredNodes(unregisteredNodes, autoscalingContext, currentTime, autoscalingContext.LogRecorder)
removedAny, err := removeOldUnregisteredNodes(unregisteredNodes, autoscalingContext,
currentTime, autoscalingContext.LogRecorder)
// There was a problem with removing unregistered nodes. Retry in the next loop.
if err != nil {
klog.Warningf("Failed to remove unregistered nodes: %v", err)
}
Expand Down