From e71a12317d01c6e7a93c7775dd5da5005b3bbbdc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=92=B1=E7=A3=8A?= Date: Sat, 2 Dec 2023 22:11:19 +0800 Subject: [PATCH] fix(kwok): fix panic when scale down node group --- .../cloudprovider/builder/builder_kwok.go | 6 +- .../cloudprovider/kwok/kwok_helpers.go | 26 ++-- .../cloudprovider/kwok/kwok_nodegroups.go | 4 +- .../cloudprovider/kwok/kwok_provider.go | 43 ++--- .../cloudprovider/kwok/kwok_provider_test.go | 147 +++++++++++++++++- .../cloudprovider/kwok/kwok_types.go | 7 +- 6 files changed, 192 insertions(+), 41 deletions(-) diff --git a/cluster-autoscaler/cloudprovider/builder/builder_kwok.go b/cluster-autoscaler/cloudprovider/builder/builder_kwok.go index b79f7973b18d..a254ac4246bc 100644 --- a/cluster-autoscaler/cloudprovider/builder/builder_kwok.go +++ b/cluster-autoscaler/cloudprovider/builder/builder_kwok.go @@ -23,6 +23,8 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/kwok" "k8s.io/autoscaler/cluster-autoscaler/config" + + "k8s.io/client-go/informers" ) // AvailableCloudProviders supported by the cloud provider builder. @@ -33,10 +35,10 @@ var AvailableCloudProviders = []string{ // DefaultCloudProvider for Kwok-only build is Kwok. const DefaultCloudProvider = cloudprovider.KwokProviderName -func buildCloudProvider(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscoveryOptions, rl *cloudprovider.ResourceLimiter) cloudprovider.CloudProvider { +func buildCloudProvider(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscoveryOptions, rl *cloudprovider.ResourceLimiter, informerFactory informers.SharedInformerFactory) cloudprovider.CloudProvider { switch opts.CloudProviderName { case cloudprovider.KwokProviderName: - return kwok.BuildKwokCloudProvider(opts, do, rl)(opts, do, rl) + return kwok.BuildKwok(opts, do, rl, informerFactory) } return nil diff --git a/cluster-autoscaler/cloudprovider/kwok/kwok_helpers.go b/cluster-autoscaler/cloudprovider/kwok/kwok_helpers.go index f00400711e35..6c8ece4cffc6 100644 --- a/cluster-autoscaler/cloudprovider/kwok/kwok_helpers.go +++ b/cluster-autoscaler/cloudprovider/kwok/kwok_helpers.go @@ -25,7 +25,8 @@ import ( "log" "strconv" "strings" - "time" + + kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -33,11 +34,10 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/util/yaml" - kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" "k8s.io/client-go/kubernetes" clientscheme "k8s.io/client-go/kubernetes/scheme" v1lister "k8s.io/client-go/listers/core/v1" - klog "k8s.io/klog/v2" + "k8s.io/klog/v2" ) const ( @@ -155,13 +155,19 @@ func createNodegroups(nodes []*apiv1.Node, kubeClient kubernetes.Interface, kc * } ngName := getNGName(nodes[i], kc) + if ngName == "" { + klog.Fatalf("%s '%s' for node '%s' not present in the manifest", + kc.status.groupNodesBy, kc.status.key, + nodes[i].GetName()) + } + if ngs[ngName] != nil { ngs[ngName].targetSize += 1 continue } ng := parseAnnotations(nodes[i], kc) - ng.name = getNGName(nodes[i], kc) + ng.name = ngName sanitizeNode(nodes[i]) prepareNode(nodes[i], ng.name) ng.nodeTemplate = nodes[i] @@ -250,6 +256,8 @@ func parseAnnotations(no *apiv1.Node, kc *KwokProviderConfig) *NodeGroup { } } +// getNGName returns the node group name of the given k8s node object. +// Return empty string if no node group is found. func getNGName(no *apiv1.Node, kc *KwokProviderConfig) string { if no.GetAnnotations()[NGNameAnnotation] != "" { @@ -263,16 +271,8 @@ func getNGName(no *apiv1.Node, kc *KwokProviderConfig) string { case "label": ngName = no.GetLabels()[kc.status.key] default: - klog.Fatal("grouping criteria for nodes is not set (expected: 'annotation' or 'label')") - } - - if ngName == "" { - klog.Fatalf("%s '%s' for node '%s' not present in the manifest", - kc.status.groupNodesBy, kc.status.key, - no.GetName()) + klog.Warning("grouping criteria for nodes is not set (expected: 'annotation' or 'label')") } - ngName = fmt.Sprintf("%s-%v", ngName, time.Now().Unix()) - return ngName } diff --git a/cluster-autoscaler/cloudprovider/kwok/kwok_nodegroups.go b/cluster-autoscaler/cloudprovider/kwok/kwok_nodegroups.go index 32db81581cfd..8f2f64a9f03f 100644 --- a/cluster-autoscaler/cloudprovider/kwok/kwok_nodegroups.go +++ b/cluster-autoscaler/cloudprovider/kwok/kwok_nodegroups.go @@ -81,10 +81,9 @@ func (nodeGroup *NodeGroup) IncreaseSize(delta int) error { if err != nil { return fmt.Errorf("couldn't create new node '%s': %v", node.Name, err) } + nodeGroup.targetSize += 1 } - nodeGroup.targetSize = newSize - return nil } @@ -111,6 +110,7 @@ func (nodeGroup *NodeGroup) DeleteNodes(nodes []*apiv1.Node) error { if err != nil { return err } + nodeGroup.targetSize -= 1 } return nil } diff --git a/cluster-autoscaler/cloudprovider/kwok/kwok_provider.go b/cluster-autoscaler/cloudprovider/kwok/kwok_provider.go index 76be214f1314..a1691171bf0e 100644 --- a/cluster-autoscaler/cloudprovider/kwok/kwok_provider.go +++ b/cluster-autoscaler/cloudprovider/kwok/kwok_provider.go @@ -22,19 +22,21 @@ import ( "os" "strings" - apiv1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" - v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/config" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" + + apiv1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/client-go/informers" kubeclient "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" - klog "k8s.io/klog/v2" + "k8s.io/klog/v2" ) // Name returns name of the cloud provider. @@ -123,24 +125,28 @@ func (kwok *KwokCloudProvider) GetNodeGpuConfig(node *apiv1.Node) *cloudprovider // Refresh is called before every main loop and can be used to dynamically update cloud provider state. // In particular the list of node groups returned by NodeGroups can change as a result of CloudProvider.Refresh(). -// TODO(vadasambar): implement this func (kwok *KwokCloudProvider) Refresh() error { - // TODO(vadasambar): causes CA to not recognize kwok nodegroups - // needs better implementation - // nodeList, err := kwok.lister.List() - // if err != nil { - // return err - // } + allNodes, err := kwok.allNodesLister.List(labels.Everything()) + if err != nil { + klog.ErrorS(err, "failed to list all nodes from lister") + return err + } + + targetSizeInCluster := make(map[string]int) - // ngs := []*NodeGroup{} - // for _, no := range nodeList { - // ng := parseAnnotationsToNodegroup(no) - // ng.kubeClient = kwok.kubeClient - // ngs = append(ngs, ng) - // } + for _, node := range allNodes { + ngName := getNGName(node, kwok.config) + if ngName == "" { + continue + } - // kwok.nodeGroups = ngs + targetSizeInCluster[ngName] += 1 + } + + for _, ng := range kwok.nodeGroups { + ng.targetSize = targetSizeInCluster[ng.Id()] + } return nil } @@ -245,6 +251,7 @@ func BuildKwokProvider(ko *kwokOptions) (*KwokCloudProvider, error) { kubeClient: ko.kubeClient, resourceLimiter: ko.resourceLimiter, config: kwokConfig, + allNodesLister: ko.allNodesLister, }, nil } diff --git a/cluster-autoscaler/cloudprovider/kwok/kwok_provider_test.go b/cluster-autoscaler/cloudprovider/kwok/kwok_provider_test.go index 50e46a9d88a9..784121754bc0 100644 --- a/cluster-autoscaler/cloudprovider/kwok/kwok_provider_test.go +++ b/cluster-autoscaler/cloudprovider/kwok/kwok_provider_test.go @@ -20,6 +20,11 @@ import ( "os" "testing" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" + "k8s.io/autoscaler/cluster-autoscaler/config" + "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" + kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" + "github.com/stretchr/testify/assert" apiv1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -27,10 +32,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" - "k8s.io/autoscaler/cluster-autoscaler/config" - "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" - kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" "k8s.io/client-go/kubernetes/fake" v1lister "k8s.io/client-go/listers/core/v1" core "k8s.io/client-go/testing" @@ -156,6 +157,110 @@ func TestNodeGroups(t *testing.T) { }) } +func TestRefresh(t *testing.T) { + fakeClient := &fake.Clientset{} + var nodesFrom string + fakeClient.Fake.AddReactor("get", "configmaps", func(action core.Action) (bool, runtime.Object, error) { + getAction := action.(core.GetAction) + + if getAction == nil { + return false, nil, nil + } + + if getAction.GetName() == defaultConfigName { + if nodesFrom == "configmap" { + return true, &apiv1.ConfigMap{ + Data: map[string]string{ + configKey: testConfig, + }, + }, nil + } + + return true, &apiv1.ConfigMap{ + Data: map[string]string{ + configKey: testConfigDynamicTemplates, + }, + }, nil + + } + + if getAction.GetName() == defaultTemplatesConfigName { + if nodesFrom == "configmap" { + return true, &apiv1.ConfigMap{ + Data: map[string]string{ + templatesKey: testTemplates, + }, + }, nil + } + } + + return true, nil, errors.NewNotFound(apiv1.Resource("configmaps"), "whatever") + }) + + os.Setenv("POD_NAMESPACE", "kube-system") + + t.Run("refresh nodegroup target size", func(t *testing.T) { + nodesFrom = "configmap" + ngName := "kind-worker" + fakeNodeLister := newTestAllNodeLister(map[string]*apiv1.Node{ + "node1": { + ObjectMeta: metav1.ObjectMeta{ + Name: "node1", + Labels: map[string]string{ + "kwok-nodegroup": ngName, + }, + }, + }, + "node2": { + ObjectMeta: metav1.ObjectMeta{ + Name: "node2", + Labels: map[string]string{ + "kwok-nodegroup": ngName, + }, + }, + }, + "node3": { + ObjectMeta: metav1.ObjectMeta{ + Name: "node3", + Labels: map[string]string{ + "kwok-nodegroup": ngName, + }, + }, + }, + "node4": { + ObjectMeta: metav1.ObjectMeta{ + Name: "node4", + }, + }, + }) + + ko := &kwokOptions{ + kubeClient: fakeClient, + autoscalingOpts: &config.AutoscalingOptions{}, + discoveryOpts: &cloudprovider.NodeGroupDiscoveryOptions{}, + resourceLimiter: cloudprovider.NewResourceLimiter( + map[string]int64{cloudprovider.ResourceNameCores: 1, cloudprovider.ResourceNameMemory: 10000000}, + map[string]int64{cloudprovider.ResourceNameCores: 10, cloudprovider.ResourceNameMemory: 100000000}), + allNodesLister: fakeNodeLister, + ngNodeListerFn: testNodeLister, + } + + p, err := BuildKwokProvider(ko) + assert.NoError(t, err) + assert.NotNil(t, p) + + err = p.Refresh() + assert.Nil(t, err) + for _, ng := range p.NodeGroups() { + if ng.Id() == ngName { + targetSize, err := ng.TargetSize() + assert.Nil(t, err) + assert.Equal(t, 3, targetSize) + } + } + }) +} + func TestGetResourceLimiter(t *testing.T) { fakeClient := &fake.Clientset{} fakeClient.Fake.AddReactor("get", "configmaps", func(action core.Action) (bool, runtime.Object, error) { @@ -639,6 +744,40 @@ func TestNodeGroupForNode(t *testing.T) { assert.Contains(t, ng.Id(), "kind-worker") }) + t.Run("empty nodegroup name for node", func(t *testing.T) { + nodesFrom = "configmap" + fakeNodeLister := newTestAllNodeLister(map[string]*apiv1.Node{}) + + ko := &kwokOptions{ + kubeClient: fakeClient, + autoscalingOpts: &config.AutoscalingOptions{}, + discoveryOpts: &cloudprovider.NodeGroupDiscoveryOptions{}, + resourceLimiter: cloudprovider.NewResourceLimiter( + map[string]int64{cloudprovider.ResourceNameCores: 1, cloudprovider.ResourceNameMemory: 10000000}, + map[string]int64{cloudprovider.ResourceNameCores: 10, cloudprovider.ResourceNameMemory: 100000000}), + allNodesLister: fakeNodeLister, + ngNodeListerFn: testNodeLister, + } + + p, err := BuildKwokProvider(ko) + assert.NoError(t, err) + assert.NotNil(t, p) + assert.Len(t, p.nodeGroups, 1) + assert.Contains(t, p.nodeGroups[0].Id(), "kind-worker") + + testNode := &apiv1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-without-labels", + }, + Spec: apiv1.NodeSpec{ + ProviderID: "kwok:random-instance-id", + }, + } + ng, err := p.NodeGroupForNode(testNode) + assert.NoError(t, err) + assert.Nil(t, ng) + }) + } func TestBuildKwokProvider(t *testing.T) { diff --git a/cluster-autoscaler/cloudprovider/kwok/kwok_types.go b/cluster-autoscaler/cloudprovider/kwok/kwok_types.go index 25ed7dae6fbf..538162df57bf 100644 --- a/cluster-autoscaler/cloudprovider/kwok/kwok_types.go +++ b/cluster-autoscaler/cloudprovider/kwok/kwok_types.go @@ -18,11 +18,12 @@ package kwok import ( apiv1 "k8s.io/api/core/v1" + "k8s.io/client-go/kubernetes" + listersv1 "k8s.io/client-go/listers/core/v1" + "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/config" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" - "k8s.io/client-go/kubernetes" - listersv1 "k8s.io/client-go/listers/core/v1" ) // KwokCloudProvider implements CloudProvider interface for kwok @@ -32,6 +33,8 @@ type KwokCloudProvider struct { resourceLimiter *cloudprovider.ResourceLimiter // kubeClient is to be used only for create, delete and update kubeClient kubernetes.Interface + //allNodesLister is a lister to list all nodes in cluster + allNodesLister listersv1.NodeLister } type kwokOptions struct {