From 07b2f4f5a731fb28eaee2d9c45206f44214e43e2 Mon Sep 17 00:00:00 2001 From: Benjamin Pineau Date: Thu, 8 Apr 2021 13:34:34 +0200 Subject: [PATCH 1/3] Set maxAsgNamesPerDescribe to the new maximum value While this was previously effectively limited to 50, `DescribeAutoScalingGroups` now supports fetching 100 ASG per calls on all regions, matching what's documented: https://docs.aws.amazon.com/autoscaling/ec2/APIReference/API_DescribeAutoScalingGroups.html ``` AutoScalingGroupNames.member.N The names of the Auto Scaling groups. By default, you can only specify up to 50 names. You can optionally increase this limit using the MaxRecords parameter. MaxRecords The maximum number of items to return with this call. The default value is 50 and the maximum value is 100. ``` Doubling this halves API calls on large clusters, which should help to prevent throttling. --- .../cloudprovider/aws/auto_scaling_test.go | 14 +++++++------- .../cloudprovider/aws/aws_manager.go | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/aws/auto_scaling_test.go b/cluster-autoscaler/cloudprovider/aws/auto_scaling_test.go index 90e5dba7e261..8b85887574e8 100644 --- a/cluster-autoscaler/cloudprovider/aws/auto_scaling_test.go +++ b/cluster-autoscaler/cloudprovider/aws/auto_scaling_test.go @@ -27,22 +27,22 @@ import ( "github.com/stretchr/testify/require" ) -func TestMoreThen50Groups(t *testing.T) { +func TestMoreThen100Groups(t *testing.T) { service := &AutoScalingMock{} autoScalingWrapper := &autoScalingWrapper{ autoScaling: service, } - // Generate 51 ASG names - names := make([]string, 51) + // Generate 101 ASG names + names := make([]string, 101) for i := 0; i < len(names); i++ { names[i] = fmt.Sprintf("asg-%d", i) } - // First batch, first 50 elements + // First batch, first 100 elements service.On("DescribeAutoScalingGroupsPages", &autoscaling.DescribeAutoScalingGroupsInput{ - AutoScalingGroupNames: aws.StringSlice(names[:50]), + AutoScalingGroupNames: aws.StringSlice(names[:100]), MaxRecords: aws.Int64(maxRecordsReturnedByAPI), }, mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"), @@ -51,10 +51,10 @@ func TestMoreThen50Groups(t *testing.T) { fn(testNamedDescribeAutoScalingGroupsOutput("asg-1", 1, "test-instance-id"), false) }).Return(nil) - // Second batch, element 51 + // Second batch, element 101 service.On("DescribeAutoScalingGroupsPages", &autoscaling.DescribeAutoScalingGroupsInput{ - AutoScalingGroupNames: aws.StringSlice([]string{"asg-50"}), + AutoScalingGroupNames: aws.StringSlice([]string{"asg-100"}), MaxRecords: aws.Int64(maxRecordsReturnedByAPI), }, mock.AnythingOfType("func(*autoscaling.DescribeAutoScalingGroupsOutput, bool) bool"), diff --git a/cluster-autoscaler/cloudprovider/aws/aws_manager.go b/cluster-autoscaler/cloudprovider/aws/aws_manager.go index bf91d49b3792..51517f7cabb6 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_manager.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_manager.go @@ -48,7 +48,7 @@ const ( operationWaitTimeout = 5 * time.Second operationPollInterval = 100 * time.Millisecond maxRecordsReturnedByAPI = 100 - maxAsgNamesPerDescribe = 50 + maxAsgNamesPerDescribe = 100 refreshInterval = 1 * time.Minute autoDiscovererTypeASG = "asg" asgAutoDiscovererKeyTag = "tag" From 90433ace2093dca4b7d7ee28db43cd73274f2ae7 Mon Sep 17 00:00:00 2001 From: Adrian Lai Date: Wed, 7 Jul 2021 16:00:57 +0100 Subject: [PATCH 2/3] Break out unmarshal from GenerateEC2InstanceTypes Refactor to allow for optimisation --- .../cloudprovider/aws/aws_util.go | 39 +++--- .../cloudprovider/aws/aws_util_test.go | 119 +++++++++++++++++- 2 files changed, 143 insertions(+), 15 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/aws/aws_util.go b/cluster-autoscaler/cloudprovider/aws/aws_util.go index 5f1aa6c84681..ef53690ab28d 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_util.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_util.go @@ -20,17 +20,20 @@ import ( "encoding/json" "errors" "fmt" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/ec2metadata" - "github.com/aws/aws-sdk-go/aws/endpoints" - "github.com/aws/aws-sdk-go/aws/session" + "io" "io/ioutil" - klog "k8s.io/klog/v2" "net/http" "os" "regexp" "strconv" "strings" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/ec2metadata" + "github.com/aws/aws-sdk-go/aws/endpoints" + "github.com/aws/aws-sdk-go/aws/session" + + klog "k8s.io/klog/v2" ) var ( @@ -87,16 +90,9 @@ func GenerateEC2InstanceTypes(region string) (map[string]*InstanceType, error) { defer res.Body.Close() - body, err := ioutil.ReadAll(res.Body) + unmarshalled, err := unmarshalProductsResponse(res.Body) if err != nil { - klog.Warningf("Error parsing %s skipping...\n", url) - continue - } - - var unmarshalled = response{} - err = json.Unmarshal(body, &unmarshalled) - if err != nil { - klog.Warningf("Error unmarshalling %s, skip...\n", url) + klog.Warningf("Error parsing %s skipping...\n%s\n", url, err) continue } @@ -135,6 +131,21 @@ func GetStaticEC2InstanceTypes() (map[string]*InstanceType, string) { return InstanceTypes, staticListLastUpdateTime } +func unmarshalProductsResponse(r io.Reader) (*response, error) { + body, err := ioutil.ReadAll(r) + if err != nil { + return nil, err + } + + var unmarshalled = response{} + err = json.Unmarshal(body, &unmarshalled) + if err != nil { + return nil, err + } + + return &unmarshalled, nil +} + func parseMemory(memory string) int64 { reg, err := regexp.Compile("[^0-9\\.]+") if err != nil { diff --git a/cluster-autoscaler/cloudprovider/aws/aws_util_test.go b/cluster-autoscaler/cloudprovider/aws/aws_util_test.go index 55ef894a6197..243f96043a14 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_util_test.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_util_test.go @@ -17,12 +17,14 @@ limitations under the License. package aws import ( - "github.com/stretchr/testify/assert" "net/http" "net/http/httptest" "os" "strconv" + "strings" "testing" + + "github.com/stretchr/testify/assert" ) func TestGetStaticEC2InstanceTypes(t *testing.T) { @@ -136,3 +138,118 @@ func TestGetCurrentAwsRegionWithRegionEnv(t *testing.T) { assert.Nil(t, err) assert.Equal(t, region, result) } + +func TestUnmarshalProductsResponse(t *testing.T) { + body := ` +{ + "products": { + "VVD8BG8WWFD3DAZN" : { + "sku" : "VVD8BG8WWFD3DAZN", + "productFamily" : "Compute Instance", + "attributes" : { + "servicecode" : "AmazonEC2", + "location" : "US East (N. Virginia)", + "locationType" : "AWS Region", + "instanceType" : "r5b.4xlarge", + "currentGeneration" : "Yes", + "instanceFamily" : "Memory optimized", + "vcpu" : "16", + "physicalProcessor" : "Intel Xeon Platinum 8259 (Cascade Lake)", + "clockSpeed" : "3.1 GHz", + "memory" : "128 GiB", + "storage" : "EBS only", + "networkPerformance" : "Up to 10 Gigabit", + "processorArchitecture" : "64-bit", + "tenancy" : "Shared", + "operatingSystem" : "Linux", + "licenseModel" : "No License required", + "usagetype" : "UnusedBox:r5b.4xlarge", + "operation" : "RunInstances:0004", + "availabilityzone" : "NA", + "capacitystatus" : "UnusedCapacityReservation", + "classicnetworkingsupport" : "false", + "dedicatedEbsThroughput" : "10 Gbps", + "ecu" : "NA", + "enhancedNetworkingSupported" : "Yes", + "instancesku" : "G4NFAXD9TGJM3RY8", + "intelAvxAvailable" : "Yes", + "intelAvx2Available" : "No", + "intelTurboAvailable" : "No", + "marketoption" : "OnDemand", + "normalizationSizeFactor" : "32", + "preInstalledSw" : "SQL Std", + "servicename" : "Amazon Elastic Compute Cloud", + "vpcnetworkingsupport" : "true" + } + }, + "C36QEQQQJ8ZR7N32" : { + "sku" : "C36QEQQQJ8ZR7N32", + "productFamily" : "Compute Instance", + "attributes" : { + "servicecode" : "AmazonEC2", + "location" : "US East (N. Virginia)", + "locationType" : "AWS Region", + "instanceType" : "d3en.8xlarge", + "currentGeneration" : "Yes", + "instanceFamily" : "Storage optimized", + "vcpu" : "32", + "physicalProcessor" : "Intel Xeon Platinum 8259 (Cascade Lake)", + "clockSpeed" : "3.1 GHz", + "memory" : "128 GiB", + "storage" : "16 x 14000 HDD", + "networkPerformance" : "50 Gigabit", + "processorArchitecture" : "64-bit", + "tenancy" : "Dedicated", + "operatingSystem" : "SUSE", + "licenseModel" : "No License required", + "usagetype" : "DedicatedRes:d3en.8xlarge", + "operation" : "RunInstances:000g", + "availabilityzone" : "NA", + "capacitystatus" : "AllocatedCapacityReservation", + "classicnetworkingsupport" : "false", + "dedicatedEbsThroughput" : "5000 Mbps", + "ecu" : "NA", + "enhancedNetworkingSupported" : "Yes", + "instancesku" : "2XW3BCEZ83WMGFJY", + "intelAvxAvailable" : "Yes", + "intelAvx2Available" : "Yes", + "intelTurboAvailable" : "Yes", + "marketoption" : "OnDemand", + "normalizationSizeFactor" : "64", + "preInstalledSw" : "NA", + "processorFeatures" : "AVX; AVX2; Intel AVX; Intel AVX2; Intel AVX512; Intel Turbo", + "servicename" : "Amazon Elastic Compute Cloud", + "vpcnetworkingsupport" : "true" + } + } + } +} +` + r := strings.NewReader(body) + resp, err := unmarshalProductsResponse(r) + assert.Nil(t, err) + assert.Len(t, resp.Products, 2) + assert.NotNil(t, resp.Products["VVD8BG8WWFD3DAZN"]) + assert.NotNil(t, resp.Products["C36QEQQQJ8ZR7N32"]) + assert.Equal(t, resp.Products["VVD8BG8WWFD3DAZN"].Attributes.InstanceType, "r5b.4xlarge") + assert.Equal(t, resp.Products["C36QEQQQJ8ZR7N32"].Attributes.InstanceType, "d3en.8xlarge") + + invalidJsonTests := map[string]string{ + "[": "[", + "]": "]", + "}": "}", + "{": "{", + "Plain text": "invalid", + "List": "[]", + "Invalid products ([])": `{"products":[]}`, + "Invalid product ([])": `{"products":{"zz":[]}}`, + } + for name, body := range invalidJsonTests { + t.Run(name, func(t *testing.T) { + r := strings.NewReader(body) + resp, err := unmarshalProductsResponse(r) + assert.NotNil(t, err) + assert.Nil(t, resp) + }) + } +} From 41e2d4abf6f0de7eacf32e73fe6302d904842a97 Mon Sep 17 00:00:00 2001 From: Adrian Lai Date: Wed, 7 Jul 2021 16:31:28 +0100 Subject: [PATCH 3/3] Optimise GenerateEC2InstanceTypes unmarshal memory usage The pricing json for us-east-1 is currently 129MB. Currently fetching this into memory and parsing results in a large memory footprint on startup, and can lead to the autoscaler being OOMKilled. Change the ReadAll/Unmarshal logic to a stream decoder to significantly reduce the memory use. --- .../cloudprovider/aws/aws_util.go | 44 +++++++++++++++++-- 1 file changed, 40 insertions(+), 4 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/aws/aws_util.go b/cluster-autoscaler/cloudprovider/aws/aws_util.go index ef53690ab28d..9fec8ba8eb62 100644 --- a/cluster-autoscaler/cloudprovider/aws/aws_util.go +++ b/cluster-autoscaler/cloudprovider/aws/aws_util.go @@ -21,7 +21,6 @@ import ( "errors" "fmt" "io" - "io/ioutil" "net/http" "os" "regexp" @@ -132,16 +131,53 @@ func GetStaticEC2InstanceTypes() (map[string]*InstanceType, string) { } func unmarshalProductsResponse(r io.Reader) (*response, error) { - body, err := ioutil.ReadAll(r) + dec := json.NewDecoder(r) + t, err := dec.Token() if err != nil { return nil, err } + if delim, ok := t.(json.Delim); !ok || delim.String() != "{" { + return nil, errors.New("Invalid products json") + } + + unmarshalled := response{map[string]product{}} + + for dec.More() { + t, err = dec.Token() + if err != nil { + return nil, err + } + + if t == "products" { + tt, err := dec.Token() + if err != nil { + return nil, err + } + if delim, ok := tt.(json.Delim); !ok || delim.String() != "{" { + return nil, errors.New("Invalid products json") + } + for dec.More() { + productCode, err := dec.Token() + if err != nil { + return nil, err + } + + prod := product{} + if err = dec.Decode(&prod); err != nil { + return nil, err + } + unmarshalled.Products[productCode.(string)] = prod + } + } + } - var unmarshalled = response{} - err = json.Unmarshal(body, &unmarshalled) + t, err = dec.Token() if err != nil { return nil, err } + if delim, ok := t.(json.Delim); !ok || delim.String() != "}" { + return nil, errors.New("Invalid products json") + } return &unmarshalled, nil }