From 27c1a571aa60b3169e7a5827bfd8118684999771 Mon Sep 17 00:00:00 2001 From: Kubernetes Prow Robot Date: Mon, 23 Aug 2021 03:48:00 -0700 Subject: [PATCH] Merge pull request #4236 from DataDog/autoscaling-options-gce implement GetOptions for GCE --- cluster-autoscaler/cloudprovider/gce/cache.go | 16 +++++ .../cloudprovider/gce/gce_cloud_provider.go | 2 +- .../gce/gce_cloud_provider_test.go | 6 ++ .../cloudprovider/gce/gce_manager.go | 57 ++++++++++++++++ .../cloudprovider/gce/gce_manager_test.go | 68 +++++++++++++++++++ .../cloudprovider/gce/templates.go | 47 +++++++++++++ .../cloudprovider/gce/templates_test.go | 59 ++++++++++++++++ cluster-autoscaler/config/const.go | 9 +++ 8 files changed, 263 insertions(+), 1 deletion(-) diff --git a/cluster-autoscaler/cloudprovider/gce/cache.go b/cluster-autoscaler/cloudprovider/gce/cache.go index 2b8a695c1360..fd2abec09c57 100644 --- a/cluster-autoscaler/cloudprovider/gce/cache.go +++ b/cluster-autoscaler/cloudprovider/gce/cache.go @@ -69,6 +69,7 @@ type GceCache struct { instanceRefToMigRef map[GceRef]GceRef instancesFromUnknownMigs map[GceRef]struct{} resourceLimiter *cloudprovider.ResourceLimiter + autoscalingOptionsCache map[GceRef]map[string]string machinesCache map[MachineTypeKey]machinesCacheValue migTargetSizeCache map[GceRef]int64 migBaseNameCache map[GceRef]string @@ -85,6 +86,7 @@ func NewGceCache(gceService AutoscalingGceClient, concurrentGceRefreshes int) *G migs: map[GceRef]Mig{}, instanceRefToMigRef: map[GceRef]GceRef{}, instancesFromUnknownMigs: map[GceRef]struct{}{}, + autoscalingOptionsCache: map[GceRef]map[string]string{}, machinesCache: map[MachineTypeKey]machinesCacheValue{}, migTargetSizeCache: map[GceRef]int64{}, migBaseNameCache: map[GceRef]string{}, @@ -290,6 +292,20 @@ func (gc *GceCache) RegenerateInstancesCache() error { return nil } +// SetAutoscalingOptions stores autoscaling options strings obtained from IT. +func (gc *GceCache) SetAutoscalingOptions(ref GceRef, options map[string]string) { + gc.cacheMutex.Lock() + defer gc.cacheMutex.Unlock() + gc.autoscalingOptionsCache[ref] = options +} + +// GetAutoscalingOptions return autoscaling options strings obtained from IT. +func (gc *GceCache) GetAutoscalingOptions(ref GceRef) map[string]string { + gc.cacheMutex.Lock() + defer gc.cacheMutex.Unlock() + return gc.autoscalingOptionsCache[ref] +} + // SetResourceLimiter sets resource limiter. func (gc *GceCache) SetResourceLimiter(resourceLimiter *cloudprovider.ResourceLimiter) { gc.cacheMutex.Lock() diff --git a/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go b/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go index 7018bcfe87dc..4e3c5024b839 100644 --- a/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go +++ b/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go @@ -330,7 +330,7 @@ func (mig *gceMig) Autoprovisioned() bool { // GetOptions returns NodeGroupAutoscalingOptions that should be used for this particular // NodeGroup. Returning a nil will result in using default options. func (mig *gceMig) GetOptions(defaults config.NodeGroupAutoscalingOptions) (*config.NodeGroupAutoscalingOptions, error) { - return nil, cloudprovider.ErrNotImplemented + return mig.gceManager.GetMigOptions(mig, defaults), nil } // TemplateNodeInfo returns a node template for this node group. diff --git a/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider_test.go index e6362a228c08..67b603cef632 100644 --- a/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/gce/gce_cloud_provider_test.go @@ -24,6 +24,7 @@ import ( "testing" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/config" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" apiv1 "k8s.io/api/core/v1" @@ -87,6 +88,11 @@ func (m *gceManagerMock) findMigsNamed(name *regexp.Regexp) ([]string, error) { return args.Get(0).([]string), args.Error(1) } +func (m *gceManagerMock) GetMigOptions(mig Mig, defaults config.NodeGroupAutoscalingOptions) *config.NodeGroupAutoscalingOptions { + args := m.Called(mig, defaults) + return args.Get(0).(*config.NodeGroupAutoscalingOptions) +} + func (m *gceManagerMock) GetMigTemplateNode(mig Mig) (*apiv1.Node, error) { args := m.Called(mig) return args.Get(0).(*apiv1.Node), args.Error(1) diff --git a/cluster-autoscaler/cloudprovider/gce/gce_manager.go b/cluster-autoscaler/cloudprovider/gce/gce_manager.go index dded5730085f..84219c8211b5 100644 --- a/cluster-autoscaler/cloudprovider/gce/gce_manager.go +++ b/cluster-autoscaler/cloudprovider/gce/gce_manager.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "os" + "reflect" "regexp" "strconv" "strings" @@ -30,6 +31,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/config" "k8s.io/autoscaler/cluster-autoscaler/config/dynamic" "k8s.io/autoscaler/cluster-autoscaler/utils/units" "k8s.io/client-go/util/workqueue" @@ -90,6 +92,8 @@ type GceManager interface { GetResourceLimiter() (*cloudprovider.ResourceLimiter, error) // GetMigSize gets MIG size. GetMigSize(mig Mig) (int64, error) + // GetMigOptions returns MIG's NodeGroupAutoscalingOptions + GetMigOptions(mig Mig, defaults config.NodeGroupAutoscalingOptions) *config.NodeGroupAutoscalingOptions // SetMigSize sets MIG size. SetMigSize(mig Mig, size int64) error @@ -313,11 +317,40 @@ func (m *gceManagerImpl) forceRefresh() error { klog.Errorf("Failed to fetch MIGs: %v", err) return err } + m.refreshAutoscalingOptions() m.lastRefresh = time.Now() klog.V(2).Infof("Refreshed GCE resources, next refresh after %v", m.lastRefresh.Add(refreshInterval)) return nil } +func (m *gceManagerImpl) refreshAutoscalingOptions() { + for _, mig := range m.cache.GetMigs() { + template, err := m.migInstanceTemplatesProvider.GetMigInstanceTemplate(mig.GceRef()) + if err != nil { + klog.Warningf("Not evaluating autoscaling options for %q MIG: failed to find corresponding instance template", mig.GceRef(), err) + continue + } + if template.Properties == nil { + klog.Warningf("Failed to extract autoscaling options from %q metadata: instance template is incomplete", template.Name) + continue + } + kubeEnvValue, err := getKubeEnvValueFromTemplateMetadata(template) + if err != nil { + klog.Warningf("Failed to extract autoscaling options from %q instance template's metadata: can't get KubeEnv: %v", template.Name, err) + continue + } + options, err := extractAutoscalingOptionsFromKubeEnv(kubeEnvValue) + if err != nil { + klog.Warningf("Failed to extract autoscaling options from %q instance template's metadata: %v", template.Name, err) + continue + } + if !reflect.DeepEqual(m.cache.GetAutoscalingOptions(mig.GceRef()), options) { + klog.V(4).Infof("Extracted autoscaling options from %q instance template KubeEnv: %v", template.Name, options) + } + m.cache.SetAutoscalingOptions(mig.GceRef(), options) + } +} + // Fetch explicitly configured MIGs. These MIGs should never be unregistered // during refreshes, even if they no longer exist in GCE. func (m *gceManagerImpl) fetchExplicitMigs(specs []string) error { @@ -505,6 +538,30 @@ func (m *gceManagerImpl) findMigsInRegion(region string, name *regexp.Regexp) ([ return links, nil } +// GetMigOptions merges default options with user-provided options as specified in the MIG's instance template metadata +func (m *gceManagerImpl) GetMigOptions(mig Mig, defaults config.NodeGroupAutoscalingOptions) *config.NodeGroupAutoscalingOptions { + migRef := mig.GceRef() + options := m.cache.GetAutoscalingOptions(migRef) + if options == nil { + return &defaults + } + + if opt, ok := getFloat64Option(options, migRef.Name, config.DefaultScaleDownUtilizationThresholdKey); ok { + defaults.ScaleDownUtilizationThreshold = opt + } + if opt, ok := getFloat64Option(options, migRef.Name, config.DefaultScaleDownGpuUtilizationThresholdKey); ok { + defaults.ScaleDownGpuUtilizationThreshold = opt + } + if opt, ok := getDurationOption(options, migRef.Name, config.DefaultScaleDownUnneededTimeKey); ok { + defaults.ScaleDownUnneededTime = opt + } + if opt, ok := getDurationOption(options, migRef.Name, config.DefaultScaleDownUnreadyTimeKey); ok { + defaults.ScaleDownUnreadyTime = opt + } + + return &defaults +} + // GetMigTemplateNode constructs a node from GCE instance template of the given MIG. func (m *gceManagerImpl) GetMigTemplateNode(mig Mig) (*apiv1.Node, error) { template, err := m.migInstanceTemplatesProvider.GetMigInstanceTemplate(mig.GceRef()) diff --git a/cluster-autoscaler/cloudprovider/gce/gce_manager_test.go b/cluster-autoscaler/cloudprovider/gce/gce_manager_test.go index 1d0c88b1e0a1..cd7a3918c785 100644 --- a/cluster-autoscaler/cloudprovider/gce/gce_manager_test.go +++ b/cluster-autoscaler/cloudprovider/gce/gce_manager_test.go @@ -24,6 +24,7 @@ import ( "time" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/config" "k8s.io/autoscaler/cluster-autoscaler/utils/units" . "k8s.io/autoscaler/cluster-autoscaler/utils/test" @@ -336,6 +337,7 @@ func newTestGceManager(t *testing.T, testServerURL string, regional bool) *gceMa GceService: gceService, instanceRefToMigRef: make(map[GceRef]GceRef), instancesFromUnknownMigs: make(map[GceRef]struct{}), + autoscalingOptionsCache: map[GceRef]map[string]string{}, machinesCache: map[MachineTypeKey]machinesCacheValue{ {"us-central1-b", "n1-standard-1"}: {&gce.MachineType{GuestCpus: 1, MemoryMb: 1}, nil}, {"us-central1-c", "n1-standard-1"}: {&gce.MachineType{GuestCpus: 1, MemoryMb: 1}, nil}, @@ -1576,3 +1578,69 @@ func TestAppendInstances(t *testing.T) { assert.NoError(t, err) mock.AssertExpectationsForObjects(t, server) } + +func TestGetMigOptions(t *testing.T) { + defaultOptions := &config.NodeGroupAutoscalingOptions{ + ScaleDownUtilizationThreshold: 0.1, + ScaleDownGpuUtilizationThreshold: 0.2, + ScaleDownUnneededTime: time.Second, + ScaleDownUnreadyTime: time.Minute, + } + + cases := []struct { + desc string + opts map[string]string + expected *config.NodeGroupAutoscalingOptions + }{ + { + desc: "return provided defaults on empty metadata", + opts: map[string]string{}, + expected: defaultOptions, + }, + { + desc: "return specified options", + opts: map[string]string{ + config.DefaultScaleDownGpuUtilizationThresholdKey: "0.6", + config.DefaultScaleDownUtilizationThresholdKey: "0.7", + config.DefaultScaleDownUnneededTimeKey: "1h", + config.DefaultScaleDownUnreadyTimeKey: "30m", + }, + expected: &config.NodeGroupAutoscalingOptions{ + ScaleDownGpuUtilizationThreshold: 0.6, + ScaleDownUtilizationThreshold: 0.7, + ScaleDownUnneededTime: time.Hour, + ScaleDownUnreadyTime: 30 * time.Minute, + }, + }, + { + desc: "complete partial options specs with defaults", + opts: map[string]string{ + config.DefaultScaleDownGpuUtilizationThresholdKey: "0.1", + config.DefaultScaleDownUnneededTimeKey: "1m", + }, + expected: &config.NodeGroupAutoscalingOptions{ + ScaleDownGpuUtilizationThreshold: 0.1, + ScaleDownUtilizationThreshold: defaultOptions.ScaleDownUtilizationThreshold, + ScaleDownUnneededTime: time.Minute, + ScaleDownUnreadyTime: defaultOptions.ScaleDownUnreadyTime, + }, + }, + { + desc: "keep defaults on unparsable options values", + opts: map[string]string{ + config.DefaultScaleDownGpuUtilizationThresholdKey: "foo", + config.DefaultScaleDownUnneededTimeKey: "bar", + }, + expected: defaultOptions, + }, + } + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + mgr := newTestGceManager(t, "", false) + mig := setupTestDefaultPool(mgr, true) + mgr.cache.SetAutoscalingOptions(mig.GceRef(), c.opts) + actual := mgr.GetMigOptions(mig, *defaultOptions) + assert.Equal(t, c.expected, actual) + }) + } +} diff --git a/cluster-autoscaler/cloudprovider/gce/templates.go b/cluster-autoscaler/cloudprovider/gce/templates.go index ff82c7dd7449..4732a0990e35 100644 --- a/cluster-autoscaler/cloudprovider/gce/templates.go +++ b/cluster-autoscaler/cloudprovider/gce/templates.go @@ -21,7 +21,9 @@ import ( "math" "math/rand" "regexp" + "strconv" "strings" + "time" gce "google.golang.org/api/compute/v1" apiv1 "k8s.io/api/core/v1" @@ -459,6 +461,51 @@ func extractOperatingSystemDistributionFromKubeEnv(kubeEnv string) OperatingSyst } } +func getFloat64Option(options map[string]string, templateName, name string) (float64, bool) { + raw, ok := options[name] + if !ok { + return 0, false + } + + option, err := strconv.ParseFloat(raw, 64) + if err != nil { + klog.Warningf("failed to convert autoscaling_options option %q (value %q) for MIG %q to float: %v", name, raw, templateName, err) + return 0, false + } + + return option, true +} + +func getDurationOption(options map[string]string, templateName, name string) (time.Duration, bool) { + raw, ok := options[name] + if !ok { + return 0, false + } + + option, err := time.ParseDuration(raw) + if err != nil { + klog.Warningf("failed to convert autoscaling_options option %q (value %q) for MIG %q to duration: %v", name, raw, templateName, err) + return 0, false + } + + return option, true +} + +func extractAutoscalingOptionsFromKubeEnv(kubeEnvValue string) (map[string]string, error) { + optionsAsString, found, err := extractAutoscalerVarFromKubeEnv(kubeEnvValue, "autoscaling_options") + if err != nil { + klog.Warningf("error while obtaining autoscaling_options from AUTOSCALER_ENV_VARS: %v", err) + return nil, err + } + + if !found { + klog.V(5).Info("no autoscaling_options defined in AUTOSCALER_ENV_VARS") + return make(map[string]string), nil + } + + return parseKeyValueListToMap(optionsAsString) +} + func extractEvictionHardFromKubeEnv(kubeEnvValue string) (map[string]string, error) { evictionHardAsString, found, err := extractAutoscalerVarFromKubeEnv(kubeEnvValue, "evictionHard") if err != nil { diff --git a/cluster-autoscaler/cloudprovider/gce/templates_test.go b/cluster-autoscaler/cloudprovider/gce/templates_test.go index 9a27937f47ff..7c5f0e83b1ee 100644 --- a/cluster-autoscaler/cloudprovider/gce/templates_test.go +++ b/cluster-autoscaler/cloudprovider/gce/templates_test.go @@ -23,6 +23,7 @@ import ( "testing" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/config" gpuUtils "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" "k8s.io/autoscaler/cluster-autoscaler/utils/units" @@ -489,6 +490,64 @@ func TestBuildCapacityMemory(t *testing.T) { } } +func TestExtractAutoscalingOptionsFromKubeEnv(t *testing.T) { + cases := []struct { + desc string + env string + expectedValue map[string]string + expectedErr bool + }{ + { + desc: "autoscaling_options not specified", + env: "AUTOSCALER_ENV_VARS: node_labels=a=b,c=d;node_taints=a=b:c,d=e:f\n", + expectedValue: map[string]string{}, + expectedErr: false, + }, + { + desc: "empty KubeEnv", + env: "", + expectedValue: map[string]string{}, + expectedErr: false, + }, + { + desc: "unparsable KubeEnv", + env: "AUTOSCALER_ENV_VARS", + expectedValue: nil, + expectedErr: true, + }, + { + desc: "partial option set", + env: "AUTOSCALER_ENV_VARS: node_labels=a=b;autoscaling_options=scaledownunreadytime=1h", + expectedValue: map[string]string{ + config.DefaultScaleDownUnreadyTimeKey: "1h", + }, + expectedErr: false, + }, + { + desc: "full option set", + env: "AUTOSCALER_ENV_VARS: node_labels=a,b;autoscaling_options=scaledownutilizationthreshold=0.4,scaledowngpuutilizationthreshold=0.5,scaledownunneededtime=30m,scaledownunreadytime=1h", + expectedValue: map[string]string{ + config.DefaultScaleDownUtilizationThresholdKey: "0.4", + config.DefaultScaleDownGpuUtilizationThresholdKey: "0.5", + config.DefaultScaleDownUnneededTimeKey: "30m", + config.DefaultScaleDownUnreadyTimeKey: "1h", + }, + expectedErr: false, + }, + } + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + value, err := extractAutoscalingOptionsFromKubeEnv(c.env) + assert.Equal(t, c.expectedValue, value) + if c.expectedErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + func TestExtractAutoscalerVarFromKubeEnv(t *testing.T) { cases := []struct { desc string diff --git a/cluster-autoscaler/config/const.go b/cluster-autoscaler/config/const.go index d81bde304ef7..5e4873d7e078 100644 --- a/cluster-autoscaler/config/const.go +++ b/cluster-autoscaler/config/const.go @@ -21,4 +21,13 @@ const ( DefaultMaxClusterCores = 5000 * 64 // DefaultMaxClusterMemory is the default maximum number of gigabytes of memory in cluster. DefaultMaxClusterMemory = 5000 * 64 * 20 + + // DefaultScaleDownUtilizationThresholdKey identifies ScaleDownUtilizationThreshold autoscaling option + DefaultScaleDownUtilizationThresholdKey = "scaledownutilizationthreshold" + // DefaultScaleDownGpuUtilizationThresholdKey identifies ScaleDownGpuUtilizationThreshold autoscaling option + DefaultScaleDownGpuUtilizationThresholdKey = "scaledowngpuutilizationthreshold" + // DefaultScaleDownUnneededTimeKey identifies ScaleDownUnneededTime autoscaling option + DefaultScaleDownUnneededTimeKey = "scaledownunneededtime" + // DefaultScaleDownUnreadyTimeKey identifies ScaleDownUnreadyTime autoscaling option + DefaultScaleDownUnreadyTimeKey = "scaledownunreadytime" )