Skip to content

Commit

Permalink
implement GetOptions for GCE
Browse files Browse the repository at this point in the history
Support per-MIG (scaledown) settings as permited by the
cloudprovider's interface `GetOptions()` method.
  • Loading branch information
bpineau committed Aug 19, 2021
1 parent c563a40 commit a5183ad
Show file tree
Hide file tree
Showing 8 changed files with 263 additions and 1 deletion.
16 changes: 16 additions & 0 deletions cluster-autoscaler/cloudprovider/gce/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type GceCache struct {
instanceRefToMigRef map[GceRef]GceRef
instancesFromUnknownMigs map[GceRef]struct{}
resourceLimiter *cloudprovider.ResourceLimiter
autoscalingOptionsCache map[GceRef]map[string]string
machinesCache map[MachineTypeKey]machinesCacheValue
migTargetSizeCache map[GceRef]int64
migBaseNameCache map[GceRef]string
Expand All @@ -85,6 +86,7 @@ func NewGceCache(gceService AutoscalingGceClient, concurrentGceRefreshes int) *G
migs: map[GceRef]Mig{},
instanceRefToMigRef: map[GceRef]GceRef{},
instancesFromUnknownMigs: map[GceRef]struct{}{},
autoscalingOptionsCache: map[GceRef]map[string]string{},
machinesCache: map[MachineTypeKey]machinesCacheValue{},
migTargetSizeCache: map[GceRef]int64{},
migBaseNameCache: map[GceRef]string{},
Expand Down Expand Up @@ -290,6 +292,20 @@ func (gc *GceCache) RegenerateInstancesCache() error {
return nil
}

// SetAutoscalingOptions stores autoscaling options strings obtained from IT.
func (gc *GceCache) SetAutoscalingOptions(ref GceRef, options map[string]string) {
gc.cacheMutex.Lock()
defer gc.cacheMutex.Unlock()
gc.autoscalingOptionsCache[ref] = options
}

// GetAutoscalingOptions return autoscaling options strings obtained from IT.
func (gc *GceCache) GetAutoscalingOptions(ref GceRef) map[string]string {
gc.cacheMutex.Lock()
defer gc.cacheMutex.Unlock()
return gc.autoscalingOptionsCache[ref]
}

// SetResourceLimiter sets resource limiter.
func (gc *GceCache) SetResourceLimiter(resourceLimiter *cloudprovider.ResourceLimiter) {
gc.cacheMutex.Lock()
Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ func (mig *gceMig) Autoprovisioned() bool {
// GetOptions returns NodeGroupAutoscalingOptions that should be used for this particular
// NodeGroup. Returning a nil will result in using default options.
func (mig *gceMig) GetOptions(defaults config.NodeGroupAutoscalingOptions) (*config.NodeGroupAutoscalingOptions, error) {
return nil, cloudprovider.ErrNotImplemented
return mig.gceManager.GetMigOptions(mig, defaults), nil
}

// TemplateNodeInfo returns a node template for this node group.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"testing"

"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/config"
. "k8s.io/autoscaler/cluster-autoscaler/utils/test"

apiv1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -87,6 +88,11 @@ func (m *gceManagerMock) findMigsNamed(name *regexp.Regexp) ([]string, error) {
return args.Get(0).([]string), args.Error(1)
}

func (m *gceManagerMock) GetMigOptions(mig Mig, defaults config.NodeGroupAutoscalingOptions) *config.NodeGroupAutoscalingOptions {
args := m.Called(mig, defaults)
return args.Get(0).(*config.NodeGroupAutoscalingOptions)
}

func (m *gceManagerMock) GetMigTemplateNode(mig Mig) (*apiv1.Node, error) {
args := m.Called(mig)
return args.Get(0).(*apiv1.Node), args.Error(1)
Expand Down
57 changes: 57 additions & 0 deletions cluster-autoscaler/cloudprovider/gce/gce_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"io"
"os"
"reflect"
"regexp"
"strconv"
"strings"
Expand All @@ -30,6 +31,7 @@ import (

"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
"k8s.io/autoscaler/cluster-autoscaler/utils/units"
"k8s.io/client-go/util/workqueue"
Expand Down Expand Up @@ -90,6 +92,8 @@ type GceManager interface {
GetResourceLimiter() (*cloudprovider.ResourceLimiter, error)
// GetMigSize gets MIG size.
GetMigSize(mig Mig) (int64, error)
// GetMigOptions returns MIG's NodeGroupAutoscalingOptions
GetMigOptions(mig Mig, defaults config.NodeGroupAutoscalingOptions) *config.NodeGroupAutoscalingOptions

// SetMigSize sets MIG size.
SetMigSize(mig Mig, size int64) error
Expand Down Expand Up @@ -321,11 +325,40 @@ func (m *gceManagerImpl) forceRefresh() error {
klog.Errorf("Failed to fetch MIGs: %v", err)
return err
}
m.refreshAutoscalingOptions()
m.lastRefresh = time.Now()
klog.V(2).Infof("Refreshed GCE resources, next refresh after %v", m.lastRefresh.Add(refreshInterval))
return nil
}

func (m *gceManagerImpl) refreshAutoscalingOptions() {
for _, mig := range m.cache.GetMigs() {
template, err := m.migInstanceTemplatesProvider.GetMigInstanceTemplate(mig.GceRef())
if err != nil {
klog.Warningf("Not evaluating autoscaling options for %q MIG: failed to find corresponding instance template", mig.GceRef(), err)
continue
}
if template.Properties == nil {
klog.Warningf("Failed to extract autoscaling options from %q metadata: instance template is incomplete", template.Name)
continue
}
kubeEnvValue, err := getKubeEnvValueFromTemplateMetadata(template)
if err != nil {
klog.Warningf("Failed to extract KubeEnv from %q instance template's metadata: %v", template.Name, err)
continue
}
options, err := extractAutoscalingOptionsFromKubeEnv(kubeEnvValue)
if err != nil {
klog.Warningf("Failed to extract autoscaling options from %q instance template's metadata: %v", template.Name, err)
continue
}
if !reflect.DeepEqual(m.cache.GetAutoscalingOptions(mig.GceRef()), options) {
klog.V(4).Infof("Extracted autoscaling options from %q instance template KubeEnv: %v", template.Name, options)
}
m.cache.SetAutoscalingOptions(mig.GceRef(), options)
}
}

// Fetch explicitly configured MIGs. These MIGs should never be unregistered
// during refreshes, even if they no longer exist in GCE.
func (m *gceManagerImpl) fetchExplicitMigs(specs []string) error {
Expand Down Expand Up @@ -522,6 +555,30 @@ func (m *gceManagerImpl) findMigsInRegion(region string, name *regexp.Regexp) ([
return links, nil
}

// GetMigOptions merges default options with user-provided options as specified in the MIG's instance template metadata
func (m *gceManagerImpl) GetMigOptions(mig Mig, defaults config.NodeGroupAutoscalingOptions) *config.NodeGroupAutoscalingOptions {
migRef := mig.GceRef()
options := m.cache.GetAutoscalingOptions(migRef)
if options == nil {
return &defaults
}

if opt, ok := getFloat64Option(options, migRef.Name, config.DefaultScaleDownUtilizationThresholdKey); ok {
defaults.ScaleDownUtilizationThreshold = opt
}
if opt, ok := getFloat64Option(options, migRef.Name, config.DefaultScaleDownGpuUtilizationThresholdKey); ok {
defaults.ScaleDownGpuUtilizationThreshold = opt
}
if opt, ok := getDurationOption(options, migRef.Name, config.DefaultScaleDownUnneededTimeKey); ok {
defaults.ScaleDownUnneededTime = opt
}
if opt, ok := getDurationOption(options, migRef.Name, config.DefaultScaleDownUnreadyTimeKey); ok {
defaults.ScaleDownUnreadyTime = opt
}

return &defaults
}

// GetMigTemplateNode constructs a node from GCE instance template of the given MIG.
func (m *gceManagerImpl) GetMigTemplateNode(mig Mig) (*apiv1.Node, error) {
template, err := m.migInstanceTemplatesProvider.GetMigInstanceTemplate(mig.GceRef())
Expand Down
68 changes: 68 additions & 0 deletions cluster-autoscaler/cloudprovider/gce/gce_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/utils/units"

. "k8s.io/autoscaler/cluster-autoscaler/utils/test"
Expand Down Expand Up @@ -336,6 +337,7 @@ func newTestGceManager(t *testing.T, testServerURL string, regional bool) *gceMa
GceService: gceService,
instanceRefToMigRef: make(map[GceRef]GceRef),
instancesFromUnknownMigs: make(map[GceRef]struct{}),
autoscalingOptionsCache: map[GceRef]map[string]string{},
machinesCache: map[MachineTypeKey]machinesCacheValue{
{"us-central1-b", "n1-standard-1"}: {&gce.MachineType{GuestCpus: 1, MemoryMb: 1}, nil},
{"us-central1-c", "n1-standard-1"}: {&gce.MachineType{GuestCpus: 1, MemoryMb: 1}, nil},
Expand Down Expand Up @@ -1576,3 +1578,69 @@ func TestAppendInstances(t *testing.T) {
assert.NoError(t, err)
mock.AssertExpectationsForObjects(t, server)
}

func TestGetMigOptions(t *testing.T) {
defaultOptions := &config.NodeGroupAutoscalingOptions{
ScaleDownUtilizationThreshold: 0.1,
ScaleDownGpuUtilizationThreshold: 0.2,
ScaleDownUnneededTime: time.Second,
ScaleDownUnreadyTime: time.Minute,
}

cases := []struct {
desc string
opts map[string]string
expected *config.NodeGroupAutoscalingOptions
}{
{
desc: "return provided defaults on empty metadata",
opts: map[string]string{},
expected: defaultOptions,
},
{
desc: "return specified options",
opts: map[string]string{
config.DefaultScaleDownGpuUtilizationThresholdKey: "0.6",
config.DefaultScaleDownUtilizationThresholdKey: "0.7",
config.DefaultScaleDownUnneededTimeKey: "1h",
config.DefaultScaleDownUnreadyTimeKey: "30m",
},
expected: &config.NodeGroupAutoscalingOptions{
ScaleDownGpuUtilizationThreshold: 0.6,
ScaleDownUtilizationThreshold: 0.7,
ScaleDownUnneededTime: time.Hour,
ScaleDownUnreadyTime: 30 * time.Minute,
},
},
{
desc: "complete partial options specs with defaults",
opts: map[string]string{
config.DefaultScaleDownGpuUtilizationThresholdKey: "0.1",
config.DefaultScaleDownUnneededTimeKey: "1m",
},
expected: &config.NodeGroupAutoscalingOptions{
ScaleDownGpuUtilizationThreshold: 0.1,
ScaleDownUtilizationThreshold: defaultOptions.ScaleDownUtilizationThreshold,
ScaleDownUnneededTime: time.Minute,
ScaleDownUnreadyTime: defaultOptions.ScaleDownUnreadyTime,
},
},
{
desc: "keep defaults on unparsable options values",
opts: map[string]string{
config.DefaultScaleDownGpuUtilizationThresholdKey: "foo",
config.DefaultScaleDownUnneededTimeKey: "bar",
},
expected: defaultOptions,
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
mgr := newTestGceManager(t, "", false)
mig := setupTestDefaultPool(mgr, true)
mgr.cache.SetAutoscalingOptions(mig.GceRef(), c.opts)
actual := mgr.GetMigOptions(mig, *defaultOptions)
assert.Equal(t, c.expected, actual)
})
}
}
47 changes: 47 additions & 0 deletions cluster-autoscaler/cloudprovider/gce/templates.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"math"
"math/rand"
"regexp"
"strconv"
"strings"
"time"

"github.com/ghodss/yaml"
gce "google.golang.org/api/compute/v1"
Expand Down Expand Up @@ -452,6 +454,51 @@ func extractOperatingSystemDistributionFromKubeEnv(kubeEnv string) OperatingSyst
}
}

func getFloat64Option(options map[string]string, templateName, name string) (float64, bool) {
raw, ok := options[name]
if !ok {
return 0, false
}

option, err := strconv.ParseFloat(raw, 64)
if err != nil {
klog.Warningf("failed to convert autoscaling_options option %q (value %q) for MIG %q to float: %v", name, raw, templateName, err)
return 0, false
}

return option, true
}

func getDurationOption(options map[string]string, templateName, name string) (time.Duration, bool) {
raw, ok := options[name]
if !ok {
return 0, false
}

option, err := time.ParseDuration(raw)
if err != nil {
klog.Warningf("failed to convert autoscaling_options option %q (value %q) for MIG %q to duration: %v", name, raw, templateName, err)
return 0, false
}

return option, true
}

func extractAutoscalingOptionsFromKubeEnv(kubeEnvValue string) (map[string]string, error) {
optionsAsString, found, err := extractAutoscalerVarFromKubeEnv(kubeEnvValue, "autoscaling_options")
if err != nil {
klog.Warningf("error while obtaining autoscaling_options from AUTOSCALER_ENV_VARS: %v", err)
return nil, err
}

if !found {
klog.V(5).Info("no autoscaling_options defined in AUTOSCALER_ENV_VARS")
return make(map[string]string), nil
}

return parseKeyValueListToMap(optionsAsString)
}

func extractEvictionHardFromKubeEnv(kubeEnvValue string) (map[string]string, error) {
evictionHardAsString, found, err := extractAutoscalerVarFromKubeEnv(kubeEnvValue, "evictionHard")
if err != nil {
Expand Down
59 changes: 59 additions & 0 deletions cluster-autoscaler/cloudprovider/gce/templates_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"testing"

"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/config"
gpuUtils "k8s.io/autoscaler/cluster-autoscaler/utils/gpu"
"k8s.io/autoscaler/cluster-autoscaler/utils/units"

Expand Down Expand Up @@ -482,6 +483,64 @@ func TestBuildCapacityMemory(t *testing.T) {
}
}

func TestExtractAutoscalingOptionsFromKubeEnv(t *testing.T) {
cases := []struct {
desc string
env string
expectedValue map[string]string
expectedErr bool
}{
{
desc: "autoscaling_options not specified",
env: "AUTOSCALER_ENV_VARS: node_labels=a=b,c=d;node_taints=a=b:c,d=e:f\n",
expectedValue: map[string]string{},
expectedErr: false,
},
{
desc: "empty KubeEnv",
env: "",
expectedValue: map[string]string{},
expectedErr: false,
},
{
desc: "unparsable KubeEnv",
env: "AUTOSCALER_ENV_VARS",
expectedValue: nil,
expectedErr: true,
},
{
desc: "partial option set",
env: "AUTOSCALER_ENV_VARS: node_labels=a=b;autoscaling_options=scaledownunreadytime=1h",
expectedValue: map[string]string{
config.DefaultScaleDownUnreadyTimeKey: "1h",
},
expectedErr: false,
},
{
desc: "full option set",
env: "AUTOSCALER_ENV_VARS: node_labels=a,b;autoscaling_options=scaledownutilizationthreshold=0.4,scaledowngpuutilizationthreshold=0.5,scaledownunneededtime=30m,scaledownunreadytime=1h",
expectedValue: map[string]string{
config.DefaultScaleDownUtilizationThresholdKey: "0.4",
config.DefaultScaleDownGpuUtilizationThresholdKey: "0.5",
config.DefaultScaleDownUnneededTimeKey: "30m",
config.DefaultScaleDownUnreadyTimeKey: "1h",
},
expectedErr: false,
},
}
for _, c := range cases {
t.Run(c.desc, func(t *testing.T) {
value, err := extractAutoscalingOptionsFromKubeEnv(c.env)
assert.Equal(t, c.expectedValue, value)
if c.expectedErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}

func TestExtractAutoscalerVarFromKubeEnv(t *testing.T) {
cases := []struct {
desc string
Expand Down
9 changes: 9 additions & 0 deletions cluster-autoscaler/config/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,13 @@ const (
DefaultMaxClusterCores = 5000 * 64
// DefaultMaxClusterMemory is the default maximum number of gigabytes of memory in cluster.
DefaultMaxClusterMemory = 5000 * 64 * 20

// DefaultScaleDownUtilizationThresholdKey identifies ScaleDownUtilizationThreshold autoscaling option
DefaultScaleDownUtilizationThresholdKey = "scaledownutilizationthreshold"
// DefaultScaleDownGpuUtilizationThresholdKey identifies ScaleDownGpuUtilizationThreshold autoscaling option
DefaultScaleDownGpuUtilizationThresholdKey = "scaledowngpuutilizationthreshold"
// DefaultScaleDownUnneededTimeKey identifies ScaleDownUnneededTime autoscaling option
DefaultScaleDownUnneededTimeKey = "scaledownunneededtime"
// DefaultScaleDownUnreadyTimeKey identifies ScaleDownUnreadyTime autoscaling option
DefaultScaleDownUnreadyTimeKey = "scaledownunreadytime"
)

0 comments on commit a5183ad

Please sign in to comment.