Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cluster-autoscaler/aws: batch launch config query and ttl cache #2840

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 131 additions & 14 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,23 @@ package aws

import (
"fmt"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/autoscaling"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
)

const (
launchConfigurationCachedTTL = time.Minute * 20
cacheMinTTL = 120
cacheMaxTTL = 600
)

// autoScaling is the interface represents a specific aspect of the auto-scaling service provided by AWS SDK for use in CA
type autoScaling interface {
DescribeAutoScalingGroupsPages(input *autoscaling.DescribeAutoScalingGroupsInput, fn func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool) error
Expand All @@ -36,30 +47,94 @@ type autoScaling interface {
// autoScalingWrapper provides several utility methods over the auto-scaling service provided by AWS SDK
type autoScalingWrapper struct {
autoScaling
launchConfigurationInstanceTypeCache map[string]string
launchConfigurationInstanceTypeCache *expirationStore
}

func (m autoScalingWrapper) getInstanceTypeByLCName(name string) (string, error) {
if instanceType, found := m.launchConfigurationInstanceTypeCache[name]; found {
return instanceType, nil
// expirationStore cache the launch configuration with their instance type.
// The store expires its keys based on a TTL. This TTL can have a jitter applied to it.
// This allows to get a better repartition of the AWS queries.
type expirationStore struct {
cache.Store
jitterClock *jitterClock
}

type instanceTypeCachedObject struct {
name string
instanceType string
}

type jitterClock struct {
clock.Clock

jitter bool
sync.RWMutex
}

func newLaunchConfigurationInstanceTypeCache() *expirationStore {
jc := &jitterClock{}
return &expirationStore{
cache.NewExpirationStore(func(obj interface{}) (s string, e error) {
return obj.(instanceTypeCachedObject).name, nil
}, &cache.TTLPolicy{
TTL: launchConfigurationCachedTTL,
Clock: jc,
}),
jc,
}
}

func (c *jitterClock) Since(ts time.Time) time.Duration {
since := time.Since(ts)
c.RLock()
defer c.RUnlock()
if c.jitter {
return since + (time.Second * time.Duration(rand.IntnRange(cacheMinTTL, cacheMaxTTL)))
}
return since
}

func (m autoScalingWrapper) getInstanceTypeByLCNames(launchConfigToQuery []*string) ([]*autoscaling.LaunchConfiguration, error) {
var launchConfigurations []*autoscaling.LaunchConfiguration

for i := 0; i < len(launchConfigToQuery); i += 50 {
end := i + 50

if end > len(launchConfigToQuery) {
end = len(launchConfigToQuery)
}
params := &autoscaling.DescribeLaunchConfigurationsInput{
LaunchConfigurationNames: launchConfigToQuery[i:end],
MaxRecords: aws.Int64(50),
}
r, err := m.DescribeLaunchConfigurations(params)
if err != nil {
return nil, err
}
launchConfigurations = append(launchConfigurations, r.LaunchConfigurations...)
for _, lc := range r.LaunchConfigurations {
_ = m.launchConfigurationInstanceTypeCache.Add(instanceTypeCachedObject{
name: *lc.LaunchConfigurationName,
instanceType: *lc.InstanceType,
})
}
}
return launchConfigurations, nil
}

params := &autoscaling.DescribeLaunchConfigurationsInput{
LaunchConfigurationNames: []*string{aws.String(name)},
MaxRecords: aws.Int64(1),
func (m autoScalingWrapper) getInstanceTypeByLCName(name string) (string, error) {
if obj, found, _ := m.launchConfigurationInstanceTypeCache.GetByKey(name); found {
return obj.(instanceTypeCachedObject).instanceType, nil
}
launchConfigurations, err := m.DescribeLaunchConfigurations(params)

launchConfigs, err := m.getInstanceTypeByLCNames([]*string{aws.String(name)})
if err != nil {
klog.V(4).Infof("Failed LaunchConfiguration info request for %s: %v", name, err)
klog.Errorf("Failed to query the launch configuration %s to get the instance type: %v", name, err)
return "", err
}
if len(launchConfigurations.LaunchConfigurations) < 1 {
if len(launchConfigs) < 1 || launchConfigs[0].InstanceType == nil {
return "", fmt.Errorf("unable to get first LaunchConfiguration for %s", name)
}

instanceType := *launchConfigurations.LaunchConfigurations[0].InstanceType
m.launchConfigurationInstanceTypeCache[name] = instanceType
return instanceType, nil
return *launchConfigs[0].InstanceType, nil
}

func (m *autoScalingWrapper) getAutoscalingGroupsByNames(names []string) ([]*autoscaling.Group, error) {
Expand Down Expand Up @@ -94,6 +169,48 @@ func (m *autoScalingWrapper) getAutoscalingGroupsByNames(names []string) ([]*aut
return asgs, nil
}

func (m autoScalingWrapper) populateLaunchConfigurationInstanceTypeCache(autoscalingGroups []*autoscaling.Group) error {
var launchConfigToQuery []*string

m.launchConfigurationInstanceTypeCache.jitterClock.Lock()
m.launchConfigurationInstanceTypeCache.jitterClock.jitter = true
m.launchConfigurationInstanceTypeCache.jitterClock.Unlock()
for _, asg := range autoscalingGroups {
if asg == nil {
continue
}
if asg.LaunchConfigurationName == nil {
continue
}
_, found, _ := m.launchConfigurationInstanceTypeCache.GetByKey(*asg.LaunchConfigurationName)
if found {
continue
}
launchConfigToQuery = append(launchConfigToQuery, asg.LaunchConfigurationName)
}
m.launchConfigurationInstanceTypeCache.jitterClock.Lock()
m.launchConfigurationInstanceTypeCache.jitterClock.jitter = false
m.launchConfigurationInstanceTypeCache.jitterClock.Unlock()

// List expire old entries
_ = m.launchConfigurationInstanceTypeCache.List()

if len(launchConfigToQuery) == 0 {
klog.V(4).Infof("%d launch configurations already in cache", len(autoscalingGroups))
return nil
}
klog.V(4).Infof("%d launch configurations to query", len(launchConfigToQuery))

_, err := m.getInstanceTypeByLCNames(launchConfigToQuery)
if err != nil {
klog.Errorf("Failed to query %d launch configurations", len(launchConfigToQuery))
return err
}

klog.V(4).Infof("Successfully query %d launch configurations", len(launchConfigToQuery))
return nil
}

func (m *autoScalingWrapper) getAutoscalingGroupNamesByTags(kvs map[string]string) ([]string, error) {
// DescribeTags does an OR query when multiple filters on different tags are
// specified. In other words, DescribeTags returns [asg1, asg1] for keys
Expand Down
5 changes: 5 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,11 @@ func (m *asgCache) regenerate() error {
return err
}

err = m.service.populateLaunchConfigurationInstanceTypeCache(groups)
if err != nil {
klog.Warningf("Failed to fully populate all launchConfigurations: %v", err)
}

// If currently any ASG has more Desired than running Instances, introduce placeholders
// for the instances to come up. This is required to track Desired instances that
// will never come up, like with Spot Request that can't be fulfilled
Expand Down
14 changes: 14 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/aws/aws-sdk-go/service/autoscaling"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
)

func TestMoreThen50Groups(t *testing.T) {
Expand Down Expand Up @@ -68,3 +69,16 @@ func TestMoreThen50Groups(t *testing.T) {
assert.Equal(t, *asgs[0].AutoScalingGroupName, "asg-1")
assert.Equal(t, *asgs[1].AutoScalingGroupName, "asg-2")
}

func TestLaunchConfigurationCache(t *testing.T) {
c := newLaunchConfigurationInstanceTypeCache()
err := c.Add(instanceTypeCachedObject{
name: "123",
instanceType: "t2.medium",
})
require.NoError(t, err)
obj, ok, err := c.GetByKey("123")
require.NoError(t, err)
require.True(t, ok)
require.Equal(t, "t2.medium", obj.(instanceTypeCachedObject).instanceType)
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (e *EC2Mock) DescribeLaunchTemplateVersions(i *ec2.DescribeLaunchTemplateVe
return args.Get(0).(*ec2.DescribeLaunchTemplateVersionsOutput), nil
}

var testService = autoScalingWrapper{&AutoScalingMock{}, map[string]string{}}
var testService = autoScalingWrapper{&AutoScalingMock{}, newLaunchConfigurationInstanceTypeCache()}

var testAwsManager = &AwsManager{
asgCache: &asgCache{
Expand All @@ -80,7 +80,7 @@ var testAwsManager = &AwsManager{
}

func newTestAwsManagerWithService(service autoScaling, autoDiscoverySpecs []asgAutoDiscoveryConfig) *AwsManager {
wrapper := autoScalingWrapper{service, map[string]string{}}
wrapper := autoScalingWrapper{service, newLaunchConfigurationInstanceTypeCache()}
return &AwsManager{
autoScalingService: wrapper,
asgCache: &asgCache{
Expand Down
3 changes: 2 additions & 1 deletion cluster-autoscaler/cloudprovider/aws/aws_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,8 @@ func createAWSManagerInternal(
}

if autoScalingService == nil {
autoScalingService = &autoScalingWrapper{autoscaling.New(sess), map[string]string{}}
c := newLaunchConfigurationInstanceTypeCache()
autoScalingService = &autoScalingWrapper{autoscaling.New(sess), c}
}

if ec2Service == nil {
Expand Down
4 changes: 2 additions & 2 deletions cluster-autoscaler/cloudprovider/aws/aws_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ func TestFetchExplicitAsgs(t *testing.T) {
defer resetAWSRegion(os.LookupEnv("AWS_REGION"))
os.Setenv("AWS_REGION", "fanghorn")
// fetchExplicitASGs is called at manager creation time.
m, err := createAWSManagerInternal(nil, do, &autoScalingWrapper{s, map[string]string{}}, nil)
m, err := createAWSManagerInternal(nil, do, &autoScalingWrapper{s, newLaunchConfigurationInstanceTypeCache()}, nil)
assert.NoError(t, err)

asgs := m.asgCache.Get()
Expand Down Expand Up @@ -469,7 +469,7 @@ func TestFetchAutoAsgs(t *testing.T) {
defer resetAWSRegion(os.LookupEnv("AWS_REGION"))
os.Setenv("AWS_REGION", "fanghorn")
// fetchAutoASGs is called at manager creation time, via forceRefresh
m, err := createAWSManagerInternal(nil, do, &autoScalingWrapper{s, map[string]string{}}, nil)
m, err := createAWSManagerInternal(nil, do, &autoScalingWrapper{s, newLaunchConfigurationInstanceTypeCache()}, nil)
assert.NoError(t, err)

asgs := m.asgCache.Get()
Expand Down