Skip to content

Commit

Permalink
Support attribute-based instance selection for AWS
Browse files Browse the repository at this point in the history
  • Loading branch information
AustinSiu committed Jan 10, 2022
1 parent b3576e0 commit 7562b79
Show file tree
Hide file tree
Showing 118 changed files with 83,231 additions and 15,416 deletions.
27 changes: 19 additions & 8 deletions cluster-autoscaler/cloudprovider/aws/auto_scaling_groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ type launchTemplate struct {
}

type mixedInstancesPolicy struct {
launchTemplate *launchTemplate
instanceTypesOverrides []string
launchTemplate *launchTemplate
instanceTypesOverrides []string
instanceRequirementsOverrides *autoscaling.InstanceRequirements
}

type asg struct {
Expand Down Expand Up @@ -483,17 +484,27 @@ func (m *asgCache) buildAsgFromAWS(g *autoscaling.Group) (*asg, error) {
}

if g.MixedInstancesPolicy != nil {
getInstanceTypes := func(data []*autoscaling.LaunchTemplateOverrides) []string {
res := make([]string, len(data))
for i := 0; i < len(data); i++ {
res[i] = aws.StringValue(data[i].InstanceType)
getInstanceTypes := func(overrides []*autoscaling.LaunchTemplateOverrides) []string {
res := []string{}
for _, override := range overrides {
if override.InstanceType != nil {
res = append(res, *override.InstanceType)
}
}
return res
}

getInstanceTypeRequirements := func(overrides []*autoscaling.LaunchTemplateOverrides) *autoscaling.InstanceRequirements {
if len(overrides) == 1 && overrides[0].InstanceRequirements != nil {
return overrides[0].InstanceRequirements
}
return nil
}

asg.MixedInstancesPolicy = &mixedInstancesPolicy{
launchTemplate: buildLaunchTemplateFromSpec(g.MixedInstancesPolicy.LaunchTemplate.LaunchTemplateSpecification),
instanceTypesOverrides: getInstanceTypes(g.MixedInstancesPolicy.LaunchTemplate.Overrides),
launchTemplate: buildLaunchTemplateFromSpec(g.MixedInstancesPolicy.LaunchTemplate.LaunchTemplateSpecification),
instanceTypesOverrides: getInstanceTypes(g.MixedInstancesPolicy.LaunchTemplate.Overrides),
instanceRequirementsOverrides: getInstanceTypeRequirements(g.MixedInstancesPolicy.LaunchTemplate.Overrides),
}
}

Expand Down
28 changes: 28 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/aws_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ func (m *AwsManager) getAsgTemplate(asg *asg) (*asgTemplate, error) {
Tags: asg.Tags,
}, nil
}

return nil, fmt.Errorf("ASG %q uses the unknown EC2 instance type %q", asg.Name, instanceTypeName)
}

Expand Down Expand Up @@ -395,6 +396,33 @@ func (m *AwsManager) buildNodeFromTemplate(asg *asg, template *asgTemplate) (*ap
node.Status.Capacity[gpu.ResourceNvidiaGPU] = *resource.NewQuantity(template.InstanceType.GPU, resource.DecimalSI)
node.Status.Capacity[apiv1.ResourceMemory] = *resource.NewQuantity(template.InstanceType.MemoryMb*1024*1024, resource.DecimalSI)

if asg.MixedInstancesPolicy != nil && asg.MixedInstancesPolicy.instanceRequirementsOverrides != nil {
instanceReqirements := asg.MixedInstancesPolicy.instanceRequirementsOverrides

if instanceReqirements.VCpuCount != nil {
if instanceReqirements.VCpuCount.Min != nil {
node.Status.Capacity[apiv1.ResourceCPU] = *resource.NewQuantity(*instanceReqirements.VCpuCount.Min, resource.DecimalSI)
}
}

for _, manufacturer := range instanceReqirements.AcceleratorManufacturers {
if *manufacturer == autoscaling.AcceleratorManufacturerNvidia {
for _, acceleratorType := range instanceReqirements.AcceleratorTypes {
if *acceleratorType == autoscaling.AcceleratorTypeGpu {
node.Status.Capacity[gpu.ResourceNvidiaGPU] = *resource.NewQuantity(*instanceReqirements.AcceleratorCount.Min, resource.DecimalSI)
}
}
}
}

if instanceReqirements.MemoryMiB != nil {
if instanceReqirements.MemoryMiB.Min != nil {
node.Status.Capacity[apiv1.ResourceMemory] = *resource.NewQuantity(*instanceReqirements.MemoryMiB.Min*1024*1024, resource.DecimalSI)
}
}

}

resourcesFromTags := extractAllocatableResourcesFromAsg(template.Tags)
for resourceName, val := range resourcesFromTags {
node.Status.Capacity[apiv1.ResourceName(resourceName)] = *val
Expand Down
32 changes: 32 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/aws_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
provider_aws "k8s.io/legacy-cloud-providers/aws"
)

Expand Down Expand Up @@ -267,6 +268,37 @@ func TestBuildNodeFromTemplate(t *testing.T) {
observedTaints := observedNode.Spec.Taints
assert.Equal(t, 1, len(observedTaints))
assert.Equal(t, gpuTaint, observedTaints[0])

// Node with instance requirements
asg.MixedInstancesPolicy = &mixedInstancesPolicy{
instanceRequirementsOverrides: &autoscaling.InstanceRequirements{
VCpuCount: &autoscaling.VCpuCountRequest{
Min: aws.Int64(4),
Max: aws.Int64(8),
},
MemoryMiB: &autoscaling.MemoryMiBRequest{
Min: aws.Int64(4),
Max: aws.Int64(8),
},
AcceleratorTypes: []*string{aws.String(autoscaling.AcceleratorTypeGpu)},
AcceleratorManufacturers: []*string{aws.String(autoscaling.AcceleratorManufacturerNvidia)},
AcceleratorCount: &autoscaling.AcceleratorCountRequest{
Min: aws.Int64(4),
Max: aws.Int64(8),
},
},
}
observedNode, observedErr = awsManager.buildNodeFromTemplate(asg, &asgTemplate{
InstanceType: c5Instance,
})

assert.NoError(t, observedErr)
observedMemoryRequirement := observedNode.Status.Capacity[apiv1.ResourceMemory]
assert.Equal(t, int64(4*1024*1024), observedMemoryRequirement.Value())
observedVCpuRequirement := observedNode.Status.Capacity[apiv1.ResourceCPU]
assert.Equal(t, int64(4), observedVCpuRequirement.Value())
observedGpuRequirement := observedNode.Status.Capacity[gpu.ResourceNvidiaGPU]
assert.Equal(t, int64(4), observedGpuRequirement.Value())
}

func TestExtractLabelsFromAsg(t *testing.T) {
Expand Down
90 changes: 90 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/aws_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package aws

import (
"encoding/json"
"fmt"
"strconv"
"time"
Expand All @@ -40,7 +41,9 @@ type autoScalingI interface {

// ec2I is the interface abstracting specific API calls of the EC2 service provided by AWS SDK for use in CA
type ec2I interface {
DescribeImages(input *ec2.DescribeImagesInput) (*ec2.DescribeImagesOutput, error)
DescribeLaunchTemplateVersions(input *ec2.DescribeLaunchTemplateVersionsInput) (*ec2.DescribeLaunchTemplateVersionsOutput, error)
GetInstanceTypesFromInstanceRequirementsPages(input *ec2.GetInstanceTypesFromInstanceRequirementsInput, fn func(*ec2.GetInstanceTypesFromInstanceRequirementsOutput, bool) bool) error
}

// eksI is the interface that represents a specific aspect of EKS (Elastic Kubernetes Service) which is provided by AWS SDK for use in CA
Expand Down Expand Up @@ -255,10 +258,84 @@ func (m *awsWrapper) getInstanceTypeByLaunchTemplate(launchTemplate *launchTempl
return aws.StringValue(instanceType), nil
}

func (m *awsWrapper) getInstanceTypesFromInstanceRequirements(policy *mixedInstancesPolicy) ([]string, error) {
requirementsRequest, err := m.getInstanceRequirementsRequestInput(policy.instanceRequirementsOverrides)
if err != nil {
return nil, fmt.Errorf("unable to get build instance requirements request")
}

describeTemplateInput := &ec2.DescribeLaunchTemplateVersionsInput{
LaunchTemplateName: aws.String(policy.launchTemplate.name),
Versions: []*string{aws.String(policy.launchTemplate.version)},
}

start := time.Now()
describeData, err := m.DescribeLaunchTemplateVersions(describeTemplateInput)
observeAWSRequest("DescribeLaunchTemplateVersions", err, start)
if len(describeData.LaunchTemplateVersions) == 0 {
return nil, fmt.Errorf("unable to find template versions")
}

describeImagesInput := &ec2.DescribeImagesInput{
ImageIds: []*string{describeData.LaunchTemplateVersions[0].LaunchTemplateData.ImageId},
}

start = time.Now()
describeImagesOutput, err := m.DescribeImages(describeImagesInput)
observeAWSRequest("DescribeImages", err, start)
if err != nil {
return nil, fmt.Errorf("unable to find get image details")
}

imageArchitectures := []*string{}
imageVirtualizationTypes := []*string{}
for _, image := range describeImagesOutput.Images {
imageArchitectures = append(imageArchitectures, image.Architecture)
imageVirtualizationTypes = append(imageVirtualizationTypes, image.VirtualizationType)
}

requirementsInput := &ec2.GetInstanceTypesFromInstanceRequirementsInput{
ArchitectureTypes: imageArchitectures,
InstanceRequirements: requirementsRequest,
VirtualizationTypes: imageVirtualizationTypes,
}

start = time.Now()
instanceTypes := []string{}
err = m.GetInstanceTypesFromInstanceRequirementsPages(requirementsInput, func(page *ec2.GetInstanceTypesFromInstanceRequirementsOutput, isLastPage bool) bool {
for _, instanceType := range page.InstanceTypes {
instanceTypes = append(instanceTypes, *instanceType.InstanceType)
}
return !isLastPage
})
observeAWSRequest("GetInstanceTypesFromInstanceRequirements", err, start)
if err != nil {
return nil, fmt.Errorf("unable to get instance types from requirements")
}

return instanceTypes, nil
}

func (m *awsWrapper) getInstanceRequirementsRequestInput(requirements *autoscaling.InstanceRequirements) (*ec2.InstanceRequirementsRequest, error) {
requirementsJson, err := json.Marshal(*requirements)
if err != nil {
return nil, err
}

var requirementsRequest *ec2.InstanceRequirementsRequest
err = json.Unmarshal(requirementsJson, &requirementsRequest)
if err != nil {
return nil, err
}

return requirementsRequest, nil
}

func (m *awsWrapper) getInstanceTypesForAsgs(asgs []*asg) (map[string]string, error) {
results := map[string]string{}
launchConfigsToQuery := map[string]string{}
launchTemplatesToQuery := map[string]*launchTemplate{}
mixedInstancesPoliciesToQuery := map[string]*mixedInstancesPolicy{}

for _, asg := range asgs {
name := asg.AwsRef.Name
Expand All @@ -269,6 +346,8 @@ func (m *awsWrapper) getInstanceTypesForAsgs(asgs []*asg) (map[string]string, er
} else if asg.MixedInstancesPolicy != nil {
if len(asg.MixedInstancesPolicy.instanceTypesOverrides) > 0 {
results[name] = asg.MixedInstancesPolicy.instanceTypesOverrides[0]
} else if asg.MixedInstancesPolicy.instanceRequirementsOverrides != nil {
mixedInstancesPoliciesToQuery[name] = asg.MixedInstancesPolicy
} else {
launchTemplatesToQuery[name] = asg.MixedInstancesPolicy.launchTemplate
}
Expand Down Expand Up @@ -305,6 +384,17 @@ func (m *awsWrapper) getInstanceTypesForAsgs(asgs []*asg) (map[string]string, er
}
klog.V(4).Infof("Successfully queried %d launch templates", len(launchTemplatesToQuery))

// Have to match Instance Requirements one-at-a-time, since they are configured per asg and can't be queried in bulk
for asgName, policy := range mixedInstancesPoliciesToQuery {
instanceTypes, err := m.getInstanceTypesFromInstanceRequirements(policy)
if err != nil {
klog.Errorf("Failed to query instance requirements for ASG %s: %v", asgName, err)
continue
}
results[asgName] = instanceTypes[0]
}
klog.V(4).Infof("Successfully queried instance requirements for %d ASGs", len(mixedInstancesPoliciesToQuery))

return results, nil
}

Expand Down
91 changes: 91 additions & 0 deletions cluster-autoscaler/cloudprovider/aws/aws_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,21 @@ type ec2Mock struct {
mock.Mock
}

func (e *ec2Mock) DescribeImages(input *ec2.DescribeImagesInput) (*ec2.DescribeImagesOutput, error) {
args := e.Called(input)
return args.Get(0).(*ec2.DescribeImagesOutput), nil
}

func (e *ec2Mock) DescribeLaunchTemplateVersions(i *ec2.DescribeLaunchTemplateVersionsInput) (*ec2.DescribeLaunchTemplateVersionsOutput, error) {
args := e.Called(i)
return args.Get(0).(*ec2.DescribeLaunchTemplateVersionsOutput), nil
}

func (e *ec2Mock) GetInstanceTypesFromInstanceRequirementsPages(input *ec2.GetInstanceTypesFromInstanceRequirementsInput, fn func(*ec2.GetInstanceTypesFromInstanceRequirementsOutput, bool) bool) error {
args := e.Called(input, fn)
return args.Error(0)
}

type eksMock struct {
mock.Mock
}
Expand Down Expand Up @@ -377,6 +387,87 @@ func TestGetInstanceTypesForAsgs(t *testing.T) {
}
}

func TestGetInstanceTypesFromInstanceRequirements(t *testing.T) {
mixedInstancesPolicy := &mixedInstancesPolicy{
launchTemplate: &launchTemplate{
name: "launchTemplateName",
version: "1",
},
instanceRequirementsOverrides: &autoscaling.InstanceRequirements{
VCpuCount: &autoscaling.VCpuCountRequest{
Min: aws.Int64(4),
Max: aws.Int64(8),
},
MemoryMiB: &autoscaling.MemoryMiBRequest{
Min: aws.Int64(4),
Max: aws.Int64(8),
},
AcceleratorTypes: []*string{aws.String(autoscaling.AcceleratorTypeGpu)},
AcceleratorManufacturers: []*string{aws.String(autoscaling.AcceleratorManufacturerNvidia)},
AcceleratorCount: &autoscaling.AcceleratorCountRequest{
Min: aws.Int64(4),
Max: aws.Int64(8),
},
},
}

e := &ec2Mock{}
awsWrapper := &awsWrapper{
autoScalingI: nil,
ec2I: e,
eksI: nil,
}

e.On("DescribeLaunchTemplateVersions", &ec2.DescribeLaunchTemplateVersionsInput{
LaunchTemplateName: aws.String("launchTemplateName"),
Versions: []*string{aws.String("1")},
}).Return(&ec2.DescribeLaunchTemplateVersionsOutput{
LaunchTemplateVersions: []*ec2.LaunchTemplateVersion{
{
LaunchTemplateData: &ec2.ResponseLaunchTemplateData{
ImageId: aws.String("123"),
},
},
},
})

e.On("DescribeImages", &ec2.DescribeImagesInput{
ImageIds: []*string{aws.String("123")},
}).Return(&ec2.DescribeImagesOutput{
Images: []*ec2.Image{
{
Architecture: aws.String("x86_64"),
VirtualizationType: aws.String("xen"),
},
},
})

requirements, err := awsWrapper.getInstanceRequirementsRequestInput(mixedInstancesPolicy.instanceRequirementsOverrides)
assert.NoError(t, err)
e.On("GetInstanceTypesFromInstanceRequirementsPages",
&ec2.GetInstanceTypesFromInstanceRequirementsInput{
ArchitectureTypes: []*string{aws.String("x86_64")},
InstanceRequirements: requirements,
VirtualizationTypes: []*string{aws.String("xen")},
},
mock.AnythingOfType("func(*ec2.GetInstanceTypesFromInstanceRequirementsOutput, bool) bool"),
).Run(func(args mock.Arguments) {
fn := args.Get(1).(func(*ec2.GetInstanceTypesFromInstanceRequirementsOutput, bool) bool)
fn(&ec2.GetInstanceTypesFromInstanceRequirementsOutput{
InstanceTypes: []*ec2.InstanceTypeInfoFromInstanceRequirements{
{
InstanceType: aws.String("g4dn.xlarge"),
},
},
}, false)
}).Return(nil)

results, err := awsWrapper.getInstanceTypesFromInstanceRequirements(mixedInstancesPolicy)
assert.NoError(t, err)
assert.Equal(t, len(results), 1)
assert.Equal(t, results[0], "g4dn.xlarge")
}

func TestBuildLaunchTemplateFromSpec(t *testing.T) {
assert := assert.New(t)

Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/Azure/go-autorest/autorest/azure/auth v0.5.8
github.com/Azure/go-autorest/autorest/date v0.3.0
github.com/Azure/go-autorest/autorest/to v0.4.0
github.com/aws/aws-sdk-go v1.38.49
github.com/aws/aws-sdk-go v1.42.25
github.com/digitalocean/godo v1.27.0
github.com/ghodss/yaml v1.0.0
github.com/golang/mock v1.6.0
Expand Down
Loading

0 comments on commit 7562b79

Please sign in to comment.