From c7d2f23436b1551a959cfed5e54b12c936b7f386 Mon Sep 17 00:00:00 2001 From: Maksim Paskal Date: Thu, 28 Jul 2022 13:26:45 +0300 Subject: [PATCH] Cache requests to Hetzner Cloud API Hetzner Cloud rate-limits its API (3600 requests per hour), which makes it difficult for cluster autoscaling in large Kubernetes clusters to stay within the limit. Signed-off-by: Maksim Paskal --- .../hetzner/hetzner_cloud_provider.go | 2 +- .../cloudprovider/hetzner/hetzner_manager.go | 58 ++++--- .../hetzner/hetzner_node_group.go | 23 +-- .../hetzner/hetzner_server_type_cache.go | 139 ++++++++++++++++ .../hetzner/hetzner_server_type_cache_test.go | 62 +++++++ .../hetzner/hetzner_servers_cache.go | 157 ++++++++++++++++++ .../hetzner/hetzner_servers_cache_test.go | 75 +++++++++ 7 files changed, 475 insertions(+), 41 deletions(-) create mode 100644 cluster-autoscaler/cloudprovider/hetzner/hetzner_server_type_cache.go create mode 100644 cluster-autoscaler/cloudprovider/hetzner/hetzner_server_type_cache_test.go create mode 100644 cluster-autoscaler/cloudprovider/hetzner/hetzner_servers_cache.go create mode 100644 cluster-autoscaler/cloudprovider/hetzner/hetzner_servers_cache_test.go diff --git a/cluster-autoscaler/cloudprovider/hetzner/hetzner_cloud_provider.go b/cluster-autoscaler/cloudprovider/hetzner/hetzner_cloud_provider.go index 9f81a85bfaa..7e7f35bf521 100644 --- a/cluster-autoscaler/cloudprovider/hetzner/hetzner_cloud_provider.go +++ b/cluster-autoscaler/cloudprovider/hetzner/hetzner_cloud_provider.go @@ -108,7 +108,7 @@ func (d *HetznerCloudProvider) Pricing() (cloudprovider.PricingModel, errors.Aut // GetAvailableMachineTypes get all machine types that can be requested from // the cloud provider. Implementation optional. 
func (d *HetznerCloudProvider) GetAvailableMachineTypes() ([]string, error) { - serverTypes, err := d.manager.client.ServerType.All(d.manager.apiCallContext) + serverTypes, err := d.manager.cachedServerType.getAllServerTypes() if err != nil { return nil, err } diff --git a/cluster-autoscaler/cloudprovider/hetzner/hetzner_manager.go b/cluster-autoscaler/cloudprovider/hetzner/hetzner_manager.go index f3d223c48c2..92b88dcfb78 100644 --- a/cluster-autoscaler/cloudprovider/hetzner/hetzner_manager.go +++ b/cluster-autoscaler/cloudprovider/hetzner/hetzner_manager.go @@ -38,17 +38,19 @@ var ( // hetznerManager handles Hetzner communication and data caching of // node groups type hetznerManager struct { - client *hcloud.Client - nodeGroups map[string]*hetznerNodeGroup - apiCallContext context.Context - cloudInit string - image *hcloud.Image - sshKey *hcloud.SSHKey - network *hcloud.Network - firewall *hcloud.Firewall - createTimeout time.Duration - publicIPv4 bool - publicIPv6 bool + client *hcloud.Client + nodeGroups map[string]*hetznerNodeGroup + apiCallContext context.Context + cloudInit string + image *hcloud.Image + sshKey *hcloud.SSHKey + network *hcloud.Network + firewall *hcloud.Firewall + createTimeout time.Duration + publicIPv4 bool + publicIPv6 bool + cachedServerType *serverTypeCache + cachedServers *serversCache } func newManager() (*hetznerManager, error) { @@ -152,17 +154,19 @@ func newManager() (*hetznerManager, error) { } m := &hetznerManager{ - client: client, - nodeGroups: make(map[string]*hetznerNodeGroup), - cloudInit: string(cloudInit), - image: image, - sshKey: sshKey, - network: network, - firewall: firewall, - createTimeout: createTimeout, - apiCallContext: ctx, - publicIPv4: publicIPv4, - publicIPv6: publicIPv6, + client: client, + nodeGroups: make(map[string]*hetznerNodeGroup), + cloudInit: string(cloudInit), + image: image, + sshKey: sshKey, + network: network, + firewall: firewall, + createTimeout: createTimeout, + apiCallContext: ctx, + 
publicIPv4: publicIPv4, + publicIPv6: publicIPv6, + cachedServerType: newServerTypeCache(ctx, client), + cachedServers: newServersCache(ctx, client), } m.nodeGroups[drainingNodePoolId] = &hetznerNodeGroup{ @@ -185,13 +189,7 @@ func (m *hetznerManager) Refresh() error { } func (m *hetznerManager) allServers(nodeGroup string) ([]*hcloud.Server, error) { - listOptions := hcloud.ListOpts{ - PerPage: 50, - LabelSelector: nodeGroupLabel + "=" + nodeGroup, - } - - requestOptions := hcloud.ServerListOpts{ListOpts: listOptions} - servers, err := m.client.Server.AllWithOpts(m.apiCallContext, requestOptions) + servers, err := m.cachedServers.getServersByNodeGroupName(nodeGroup) if err != nil { return nil, fmt.Errorf("failed to get servers for hcloud: %v", err) } @@ -230,7 +228,7 @@ func (m *hetznerManager) serverForNode(node *apiv1.Node) (*hcloud.Server, error) nodeIdOrName = node.Name } - server, _, err := m.client.Server.Get(m.apiCallContext, nodeIdOrName) + server, err := m.cachedServers.getServer(nodeIdOrName) if err != nil { return nil, fmt.Errorf("failed to get servers for node %s error: %v", node.Name, err) } diff --git a/cluster-autoscaler/cloudprovider/hetzner/hetzner_node_group.go b/cluster-autoscaler/cloudprovider/hetzner/hetzner_node_group.go index b7fe9e7691d..cb049f9473f 100644 --- a/cluster-autoscaler/cloudprovider/hetzner/hetzner_node_group.go +++ b/cluster-autoscaler/cloudprovider/hetzner/hetzner_node_group.go @@ -122,6 +122,11 @@ func (n *hetznerNodeGroup) IncreaseSize(delta int) error { n.targetSize = targetSize + // create new servers cache + if _, err := n.manager.cachedServers.servers(); err != nil { + klog.Errorf("failed to get servers: %v", err) + } + return nil } @@ -155,6 +160,11 @@ func (n *hetznerNodeGroup) DeleteNodes(nodes []*apiv1.Node) error { } waitGroup.Wait() + // create new servers cache + if _, err := n.manager.cachedServers.servers(); err != nil { + klog.Errorf("failed to get servers: %v", err) + } + n.resetTargetSize(-len(nodes)) return 
nil @@ -184,12 +194,7 @@ func (n *hetznerNodeGroup) Debug() string { // required that Instance objects returned by this method have Id field set. // Other fields are optional. func (n *hetznerNodeGroup) Nodes() ([]cloudprovider.Instance, error) { - listOptions := hcloud.ListOpts{ - PerPage: 50, - LabelSelector: nodeGroupLabel + "=" + n.id, - } - requestOptions := hcloud.ServerListOpts{ListOpts: listOptions} - servers, err := n.manager.client.Server.AllWithOpts(n.manager.apiCallContext, requestOptions) + servers, err := n.manager.cachedServers.getServersByNodeGroupName(n.id) if err != nil { return nil, fmt.Errorf("failed to get servers for hcloud: %v", err) } @@ -317,7 +322,7 @@ func buildNodeGroupLabels(n *hetznerNodeGroup) map[string]string { } func getMachineTypeResourceList(m *hetznerManager, instanceType string) (apiv1.ResourceList, error) { - typeInfo, _, err := m.client.ServerType.Get(m.apiCallContext, instanceType) + typeInfo, err := m.cachedServerType.getServerType(instanceType) if err != nil || typeInfo == nil { return nil, fmt.Errorf("failed to get machine type %s info error: %v", instanceType, err) } @@ -332,7 +337,7 @@ func getMachineTypeResourceList(m *hetznerManager, instanceType string) (apiv1.R } func serverTypeAvailable(manager *hetznerManager, instanceType string, region string) (bool, error) { - serverType, _, err := manager.client.ServerType.Get(manager.apiCallContext, instanceType) + serverType, err := manager.cachedServerType.getServerType(instanceType) if err != nil { return false, err } @@ -421,8 +426,6 @@ func waitForServerAction(m *hetznerManager, serverName string, action *hcloud.Ac case <-time.After(m.createTimeout): return fmt.Errorf("timeout waiting for server %s", serverName) } - - return nil } func (n *hetznerNodeGroup) resetTargetSize(expectedDelta int) { diff --git a/cluster-autoscaler/cloudprovider/hetzner/hetzner_server_type_cache.go b/cluster-autoscaler/cloudprovider/hetzner/hetzner_server_type_cache.go new file mode 100644 
index 00000000000..28ffc10439d --- /dev/null +++ b/cluster-autoscaler/cloudprovider/hetzner/hetzner_server_type_cache.go @@ -0,0 +1,139 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hetzner + +import ( + "context" + "errors" + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/hetzner/hcloud-go/hcloud" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + "k8s.io/utils/clock" +) + +const ( + serverTypeCacheKey = "hetzner-server-type-cache" + serverTypeCachedTTL = time.Minute * 10 + serverTypeCacheMinTTL = 5 + serverTypeCacheMaxTTL = 60 +) + +type serverTypeCache struct { + cache.Store + mngJitterClock clock.Clock + hcloudClient *hcloud.Client + hcloudClientContext context.Context +} + +type serverTypeClock struct { + clock.Clock + + jitter bool + sync.RWMutex +} + +func (c *serverTypeClock) Since(ts time.Time) time.Duration { + since := time.Since(ts) + c.RLock() + defer c.RUnlock() + if c.jitter { + return since + (time.Second * time.Duration(rand.IntnRange(serverTypeCacheMinTTL, serverTypeCacheMaxTTL))) + } + return since +} + +type serverTypeCachedObject struct { + name string + serverTypes []*hcloud.ServerType +} + +func newServerTypeCache(ctx context.Context, hcloudClient *hcloud.Client) *serverTypeCache { + jc := &serverTypeClock{} + return newServerTypeCacheWithClock( + ctx, + hcloudClient, + jc, + cache.NewExpirationStore(func(obj interface{}) (s string, e 
error) { + return obj.(serverTypeCachedObject).name, nil + }, &cache.TTLPolicy{ + TTL: serverTypeCachedTTL, + Clock: jc, + }), + ) +} + +func newServerTypeCacheWithClock(ctx context.Context, hcloudClient *hcloud.Client, jc clock.Clock, store cache.Store) *serverTypeCache { + return &serverTypeCache{ + store, + jc, + hcloudClient, + ctx, + } +} + +func (m *serverTypeCache) serverTypes() ([]*hcloud.ServerType, error) { + klog.Warning("Fetching server types from Hetzner API") + + serverTypes, err := m.hcloudClient.ServerType.All(m.hcloudClientContext) + if err != nil { + return nil, err + } + + cacheObject := serverTypeCachedObject{ + name: serverTypeCacheKey, + serverTypes: serverTypes, + } + + if err := m.Add(cacheObject); err != nil { + return nil, err + } + + return serverTypes, nil +} + +func (m *serverTypeCache) getAllServerTypes() ([]*hcloud.ServerType, error) { + // List expires old entries + cacheList := m.List() + klog.V(5).Infof("Current serverTypeCache len: %d\n", len(cacheList)) + + if obj, found, err := m.GetByKey(serverTypeCacheKey); err == nil && found { + foundServerTypes := obj.(serverTypeCachedObject) + + return foundServerTypes.serverTypes, nil + } + + return m.serverTypes() +} + +func (m *serverTypeCache) getServerType(name string) (*hcloud.ServerType, error) { + serverTypes, err := m.getAllServerTypes() + if err != nil { + return nil, err + } + + for _, serverType := range serverTypes { + if serverType.Name == name { + return serverType, nil + } + } + + return nil, errors.New("server type not found") +} diff --git a/cluster-autoscaler/cloudprovider/hetzner/hetzner_server_type_cache_test.go b/cluster-autoscaler/cloudprovider/hetzner/hetzner_server_type_cache_test.go new file mode 100644 index 00000000000..f978659f468 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/hetzner/hetzner_server_type_cache_test.go @@ -0,0 +1,62 @@ +/* +Copyright 2019 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hetzner + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/hetzner/hcloud-go/hcloud" +) + +func TestServerTypeCache(t *testing.T) { + c := newServerTypeCache(context.Background(), nil) + + serverTypes := []*hcloud.ServerType{ + { + Name: "test1", + }, + { + Name: "test2", + }, + } + + cacheObject := serverTypeCachedObject{ + name: serverTypeCacheKey, + serverTypes: serverTypes, + } + + err := c.Add(cacheObject) + + require.NoError(t, err) + obj, ok, err := c.GetByKey(serverTypeCacheKey) + require.NoError(t, err) + require.True(t, ok) + assert.Equal(t, serverTypeCacheKey, obj.(serverTypeCachedObject).name) + foundServerTypes := obj.(serverTypeCachedObject).serverTypes + assert.Equal(t, 2, len(foundServerTypes)) + assert.Equal(t, "test1", foundServerTypes[0].Name) + + foundServerType, err := c.getServerType("test2") + require.NoError(t, err) + assert.Equal(t, "test2", foundServerType.Name) + + _, err = c.getServerType("test3") + require.Error(t, err) +} diff --git a/cluster-autoscaler/cloudprovider/hetzner/hetzner_servers_cache.go b/cluster-autoscaler/cloudprovider/hetzner/hetzner_servers_cache.go new file mode 100644 index 00000000000..34172bcbe6e --- /dev/null +++ b/cluster-autoscaler/cloudprovider/hetzner/hetzner_servers_cache.go @@ -0,0 +1,157 @@ +/* +Copyright 2019 The Kubernetes Authors. 
+ +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hetzner + +import ( + "context" + "errors" + "strconv" + "sync" + "time" + + "k8s.io/apimachinery/pkg/util/rand" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/hetzner/hcloud-go/hcloud" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + "k8s.io/utils/clock" +) + +const ( + serversCacheKey = "hetzner-servers-cache" + serversCachedTTL = time.Minute * 1 + serversCacheMinTTL = 5 + serversCacheMaxTTL = 60 +) + +type serversCache struct { + cache.Store + mngJitterClock clock.Clock + hcloudClient *hcloud.Client + hcloudClientContext context.Context +} + +type serversClock struct { + clock.Clock + + jitter bool + sync.RWMutex +} + +func (c *serversClock) Since(ts time.Time) time.Duration { + since := time.Since(ts) + c.RLock() + defer c.RUnlock() + if c.jitter { + return since + (time.Second * time.Duration(rand.IntnRange(serversCacheMinTTL, serversCacheMaxTTL))) + } + return since +} + +type serversCachedObject struct { + name string + servers []*hcloud.Server +} + +func newServersCache(ctx context.Context, hcloudClient *hcloud.Client) *serversCache { + jc := &serversClock{} + return newServersCacheWithClock( + ctx, + hcloudClient, + jc, + cache.NewExpirationStore(func(obj interface{}) (s string, e error) { + return obj.(serversCachedObject).name, nil + }, &cache.TTLPolicy{ + TTL: serversCachedTTL, + Clock: jc, + }), + ) +} + +func newServersCacheWithClock(ctx context.Context, hcloudClient *hcloud.Client, jc clock.Clock, 
store cache.Store) *serversCache { + return &serversCache{ + store, + jc, + hcloudClient, + ctx, + } +} + +func (m *serversCache) servers() ([]*hcloud.Server, error) { + klog.Warning("Fetching servers from Hetzner API") + + servers, err := m.hcloudClient.Server.All(m.hcloudClientContext) + if err != nil { + return nil, err + } + + cacheObject := serversCachedObject{ + name: serversCacheKey, + servers: servers, + } + + if err := m.Add(cacheObject); err != nil { + return nil, err + } + + return servers, nil +} + +func (m *serversCache) getAllServers() ([]*hcloud.Server, error) { + // List expires old entries + cacheList := m.List() + klog.V(5).Infof("Current serversCache len: %d\n", len(cacheList)) + + if obj, found, err := m.GetByKey(serversCacheKey); err == nil && found { + foundServers := obj.(serversCachedObject) + + return foundServers.servers, nil + } + + return m.servers() +} + +func (m *serversCache) getServer(nodeIdOrName string) (*hcloud.Server, error) { + servers, err := m.getAllServers() + if err != nil { + return nil, err + } + + for _, server := range servers { + if server.Name == nodeIdOrName || strconv.Itoa(server.ID) == nodeIdOrName { + return server, nil + } + } + + return nil, errors.New("server not found") +} + +func (m *serversCache) getServersByNodeGroupName(nodeGroup string) ([]*hcloud.Server, error) { + servers, err := m.getAllServers() + if err != nil { + return nil, err + } + + foundServers := make([]*hcloud.Server, 0) + + for _, server := range servers { + if server.Labels[nodeGroupLabel] == nodeGroup { + foundServers = append(foundServers, server) + } + } + + return foundServers, nil +} diff --git a/cluster-autoscaler/cloudprovider/hetzner/hetzner_servers_cache_test.go b/cluster-autoscaler/cloudprovider/hetzner/hetzner_servers_cache_test.go new file mode 100644 index 00000000000..629365dd45c --- /dev/null +++ b/cluster-autoscaler/cloudprovider/hetzner/hetzner_servers_cache_test.go @@ -0,0 +1,75 @@ +/* +Copyright 2019 The Kubernetes 
Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package hetzner + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/hetzner/hcloud-go/hcloud" +) + +func TestServersCache(t *testing.T) { + c := newServersCache(context.Background(), nil) + + // add initial cache entry, to test that it will be replaced + serversOld := []*hcloud.Server{ + { + Name: "test-old", + }, + } + + err := c.Add(serversCachedObject{ + name: serversCacheKey, + servers: serversOld, + }) + require.NoError(t, err) + + servers := []*hcloud.Server{ + { + Name: "test1", + }, + { + Name: "test2", + }, + } + + cacheObject := serversCachedObject{ + name: serversCacheKey, + servers: servers, + } + + err = c.Add(cacheObject) + + require.NoError(t, err) + obj, ok, err := c.GetByKey(serversCacheKey) + require.NoError(t, err) + require.True(t, ok) + assert.Equal(t, serversCacheKey, obj.(serversCachedObject).name) + foundserverss := obj.(serversCachedObject).servers + assert.Equal(t, 2, len(foundserverss)) + assert.Equal(t, "test1", foundserverss[0].Name) + + foundservers, err := c.getServer("test2") + require.NoError(t, err) + assert.Equal(t, "test2", foundservers.Name) + + _, err = c.getServer("test3") + require.Error(t, err) +}