From 54308d77cdf4bef21186e6d63b72411c5f20c12f Mon Sep 17 00:00:00 2001 From: Enxebre Date: Mon, 4 May 2020 12:10:15 +0200 Subject: [PATCH] Get replicas always from API server for cluster-autoscaler CAPI provider When getting Replicas() the local struct in the scalable resource might be stale. To mitigate possible side effects, we want always get a fresh replicas. This is one in a series of PR to mitigate kubernetes#3104 --- .../clusterapi_machinedeployment.go | 19 ++- .../clusterapi/clusterapi_machineset.go | 20 ++- .../clusterapi/clusterapi_machineset_test.go | 150 ++++++++++++++++++ .../clusterapi/clusterapi_nodegroup.go | 31 +++- .../clusterapi/clusterapi_nodegroup_test.go | 19 ++- .../clusterapi/clusterapi_scalableresource.go | 2 +- .../clusterapi/machinedeployment_types.go | 6 +- .../clusterapi/machineset_types.go | 5 +- 8 files changed, 235 insertions(+), 17 deletions(-) create mode 100644 cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machineset_test.go diff --git a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machinedeployment.go b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machinedeployment.go index 100c1ff6bbf8..2276b8ebb3a4 100644 --- a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machinedeployment.go +++ b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machinedeployment.go @@ -24,6 +24,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/klog" "k8s.io/utils/pointer" ) @@ -75,8 +76,22 @@ func (r machineDeploymentScalableResource) Nodes() ([]string, error) { return result, nil } -func (r machineDeploymentScalableResource) Replicas() int32 { - return pointer.Int32PtrDerefOr(r.machineDeployment.Spec.Replicas, 0) +func (r machineDeploymentScalableResource) Replicas() (int32, error) { + freshMachineDeployment, err := r.controller.getMachineDeployment(r.machineDeployment.Namespace, r.machineDeployment.Name, metav1.GetOptions{}) + if err != nil { + return 0, err + } + + if freshMachineDeployment == nil { + return 0, fmt.Errorf("unknown machineDeployment %s", r.machineDeployment.Name) + } + + if freshMachineDeployment.Spec.Replicas == nil { + klog.Warningf("MachineDeployment %q has nil spec.replicas. This is unsupported behaviour. Falling back to status.replicas.", r.machineDeployment.Name) + } + // If no value for replicas on the MachineSet spec, fallback to the status + // TODO: Remove this fallback once defaulting is implemented for MachineSet Replicas + return pointer.Int32PtrDerefOr(freshMachineDeployment.Spec.Replicas, freshMachineDeployment.Status.Replicas), nil } func (r machineDeploymentScalableResource) SetSize(nreplicas int32) error { diff --git a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machineset.go b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machineset.go index 98655bfaf3b5..18c968144347 100644 --- a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machineset.go +++ b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machineset.go @@ -24,6 +24,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/klog" "k8s.io/utils/pointer" ) @@ -60,8 +61,23 @@ func (r machineSetScalableResource) Nodes() ([]string, error) { return r.controller.machineSetProviderIDs(r.machineSet) } -func (r machineSetScalableResource) Replicas() int32 { - return pointer.Int32PtrDerefOr(r.machineSet.Spec.Replicas, 0) +func (r machineSetScalableResource) Replicas() (int32, error) { + freshMachineSet, err := r.controller.getMachineSet(r.machineSet.Namespace, r.machineSet.Name, metav1.GetOptions{}) + if err != nil { + return 0, err + } + + if freshMachineSet == nil { + return 0, fmt.Errorf("unknown machineSet %s", r.machineSet.Name) + } + + if freshMachineSet.Spec.Replicas == nil { + klog.Warningf("MachineSet %q has nil spec.replicas. This is unsupported behaviour. Falling back to status.replicas.", r.machineSet.Name) + } + + // If no value for replicas on the MachineSet spec, fallback to the status + // TODO: Remove this fallback once defaulting is implemented for MachineSet Replicas + return pointer.Int32PtrDerefOr(freshMachineSet.Spec.Replicas, freshMachineSet.Status.Replicas), nil } func (r machineSetScalableResource) SetSize(nreplicas int32) error { diff --git a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machineset_test.go b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machineset_test.go new file mode 100644 index 000000000000..92930ee01c6d --- /dev/null +++ b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machineset_test.go @@ -0,0 +1,150 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clusterapi + +import ( + "context" + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +func TestSetSize(t *testing.T) { + initialReplicas := int32(1) + updatedReplicas := int32(5) + + testConfig := createMachineSetTestConfig(testNamespace, int(initialReplicas), nil) + controller, stop := mustCreateTestController(t, testConfig) + defer stop() + + sr, err := newMachineSetScalableResource(controller, testConfig.machineSet) + if err != nil { + t.Fatal(err) + } + + err = sr.SetSize(updatedReplicas) + if err != nil { + t.Fatal(err) + } + + // fetch machineSet + u, err := sr.controller.dynamicclient.Resource(*sr.controller.machineSetResource).Namespace(testConfig.machineSet.Namespace). + Get(context.TODO(), testConfig.machineSet.Name, metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + + replicas, found, err := unstructured.NestedInt64(u.Object, "spec", "replicas") + if err != nil { + t.Fatal(err) + } + if !found { + t.Fatal("spec.replicas not found") + } + + got := int32(replicas) + if got != updatedReplicas { + t.Errorf("expected %v, got: %v", updatedReplicas, got) + } +} + +func TestReplicas(t *testing.T) { + initialReplicas := int32(1) + updatedReplicas := int32(5) + + testConfig := createMachineSetTestConfig(testNamespace, int(initialReplicas), nil) + controller, stop := mustCreateTestController(t, testConfig) + defer stop() + + sr, err := newMachineSetScalableResource(controller, testConfig.machineSet) + if err != nil { + t.Fatal(err) + } + + i, err := sr.Replicas() + if err != nil { + t.Fatal(err) + } + + if i != initialReplicas { + t.Errorf("expected %v, got: %v", initialReplicas, i) + } + + // fetch and update machineSet + u, err := sr.controller.dynamicclient.Resource(*sr.controller.machineSetResource).Namespace(testConfig.machineSet.Namespace). + Get(context.TODO(), testConfig.machineSet.Name, metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + + if err := unstructured.SetNestedField(u.Object, int64(updatedReplicas), "spec", "replicas"); err != nil { + t.Fatal(err) + } + + _, err = sr.controller.dynamicclient.Resource(*sr.controller.machineSetResource).Namespace(u.GetNamespace()). + Update(context.TODO(), u, metav1.UpdateOptions{}) + if err != nil { + t.Fatal(err) + } + + i, err = sr.Replicas() + if err != nil { + t.Fatal(err) + } + + if i != updatedReplicas { + t.Errorf("expected %v, got: %v", updatedReplicas, i) + } +} + +func TestSetSizeAndReplicas(t *testing.T) { + initialReplicas := int32(1) + updatedReplicas := int32(5) + + testConfig := createMachineSetTestConfig(testNamespace, int(initialReplicas), nil) + controller, stop := mustCreateTestController(t, testConfig) + defer stop() + + sr, err := newMachineSetScalableResource(controller, testConfig.machineSet) + if err != nil { + t.Fatal(err) + } + + i, err := sr.Replicas() + if err != nil { + t.Fatal(err) + } + + if i != initialReplicas { + t.Errorf("expected %v, got: %v", initialReplicas, i) + } + + err = sr.SetSize(updatedReplicas) + if err != nil { + t.Fatal(err) + } + + i, err = sr.Replicas() + if err != nil { + t.Fatal(err) + } + + if i != updatedReplicas { + t.Errorf("expected %v, got: %v", updatedReplicas, i) + } +} diff --git a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_nodegroup.go b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_nodegroup.go index a106c8a195b7..c73fd0e96b5f 100644 --- a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_nodegroup.go +++ b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_nodegroup.go @@ -59,7 +59,11 @@ func (ng *nodegroup) MaxSize() int { // (new nodes finish startup and registration or removed nodes are // deleted completely). Implementation required. func (ng *nodegroup) TargetSize() (int, error) { - return int(ng.scalableResource.Replicas()), nil + size, err := ng.scalableResource.Replicas() + if err != nil { + return 0, err + } + return int(size), nil } // IncreaseSize increases the size of the node group. To delete a node @@ -70,11 +74,17 @@ func (ng *nodegroup) IncreaseSize(delta int) error { if delta <= 0 { return fmt.Errorf("size increase must be positive") } - size := int(ng.scalableResource.Replicas()) - if size+delta > ng.MaxSize() { - return fmt.Errorf("size increase too large - desired:%d max:%d", size+delta, ng.MaxSize()) + + size, err := ng.scalableResource.Replicas() + if err != nil { + return err } - return ng.scalableResource.SetSize(int32(size + delta)) + intSize := int(size) + + if intSize+delta > ng.MaxSize() { + return fmt.Errorf("size increase too large - desired:%d max:%d", intSize+delta, ng.MaxSize()) + } + return ng.scalableResource.SetSize(int32(intSize + delta)) } // DeleteNodes deletes nodes from this node group. Error is returned @@ -104,7 +114,10 @@ func (ng *nodegroup) DeleteNodes(nodes []*corev1.Node) error { // Step 2: if deleting len(nodes) would make the replica count // <= 0, then the request to delete that many nodes is bogus // and we fail fast. - replicas := ng.scalableResource.Replicas() + replicas, err := ng.scalableResource.Replicas() + if err != nil { + return err + } if replicas-int32(len(nodes)) <= 0 { return fmt.Errorf("unable to delete %d machines in %q, machine replicas are <= 0 ", len(nodes), ng.Id()) @@ -187,7 +200,11 @@ func (ng *nodegroup) Id() string { // Debug returns a string containing all information regarding this node group. func (ng *nodegroup) Debug() string { - return fmt.Sprintf(debugFormat, ng.Id(), ng.MinSize(), ng.MaxSize(), ng.scalableResource.Replicas()) + replicas, err := ng.scalableResource.Replicas() + if err != nil { + return fmt.Sprintf("%s (min: %d, max: %d, replicas: %v)", ng.Id(), ng.MinSize(), ng.MaxSize(), err) + } + return fmt.Sprintf(debugFormat, ng.Id(), ng.MinSize(), ng.MaxSize(), replicas) } // Nodes returns a list of all nodes that belong to this node group. diff --git a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_nodegroup_test.go b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_nodegroup_test.go index 1d3d4ed7468e..e153bbe1a6c4 100644 --- a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_nodegroup_test.go +++ b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_nodegroup_test.go @@ -17,6 +17,7 @@ limitations under the License. package clusterapi import ( + "context" "fmt" "path" "sort" @@ -24,6 +25,7 @@ import ( "testing" "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/utils/pointer" @@ -108,7 +110,7 @@ func TestNodeGroupNewNodeGroupConstructor(t *testing.T) { } test := func(t *testing.T, tc testCase, testConfig *testConfig) { - controller, stop := mustCreateTestController(t) + controller, stop := mustCreateTestController(t, testConfig) defer stop() ng, err := newNodeGroup(t, controller, testConfig) @@ -429,12 +431,22 @@ func TestNodeGroupDecreaseTargetSize(t *testing.T) { switch v := (ng.scalableResource).(type) { case *machineSetScalableResource: testConfig.machineSet.Spec.Replicas = int32ptr(*testConfig.machineSet.Spec.Replicas + tc.targetSizeIncrement) - if err := controller.machineSetInformer.Informer().GetStore().Add(newUnstructuredFromMachineSet(testConfig.machineSet)); err != nil { + u := newUnstructuredFromMachineSet(testConfig.machineSet) + if err := controller.machineSetInformer.Informer().GetStore().Add(u); err != nil { t.Fatalf("failed to add new machine: %v", err) } + _, err := controller.dynamicclient.Resource(*controller.machineSetResource).Namespace(u.GetNamespace()).Update(context.TODO(), u, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("failed to updating machine: %v", err) + } case *machineDeploymentScalableResource: testConfig.machineDeployment.Spec.Replicas = int32ptr(*testConfig.machineDeployment.Spec.Replicas + tc.targetSizeIncrement) - if err := controller.machineDeploymentInformer.Informer().GetStore().Add(newUnstructuredFromMachineDeployment(testConfig.machineDeployment)); err != nil { + u := newUnstructuredFromMachineDeployment(testConfig.machineDeployment) + if err := controller.machineDeploymentInformer.Informer().GetStore().Add(u); err != nil { + } + _, err := controller.dynamicclient.Resource(*controller.machineDeploymentResource).Namespace(u.GetNamespace()).Update(context.TODO(), u, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("failed to updating machine: %v", err) } default: t.Errorf("unexpected type: %T", v) @@ -450,6 +462,7 @@ func TestNodeGroupDecreaseTargetSize(t *testing.T) { if err != nil { t.Fatalf("unexpected error: %v", err) } + if currReplicas != int(tc.initial)+int(tc.targetSizeIncrement) { t.Errorf("initially expected %v, got %v", tc.initial, currReplicas) } diff --git a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_scalableresource.go b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_scalableresource.go index 6c47da3004db..30f41eae993c 100644 --- a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_scalableresource.go +++ b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_scalableresource.go @@ -42,7 +42,7 @@ type scalableResource interface { SetSize(nreplicas int32) error // Replicas returns the current replica count of the resource - Replicas() int32 + Replicas() (int32, error) // MarkMachineForDeletion marks machine for deletion MarkMachineForDeletion(machine *Machine) error diff --git a/cluster-autoscaler/cloudprovider/clusterapi/machinedeployment_types.go b/cluster-autoscaler/cloudprovider/clusterapi/machinedeployment_types.go index 943cb3206923..d1525bf9d5a1 100644 --- a/cluster-autoscaler/cloudprovider/clusterapi/machinedeployment_types.go +++ b/cluster-autoscaler/cloudprovider/clusterapi/machinedeployment_types.go @@ -36,7 +36,11 @@ type MachineDeploymentSpec struct { } // MachineDeploymentStatus is the internal autoscaler Schema for MachineDeploymentStatus -type MachineDeploymentStatus struct{} +type MachineDeploymentStatus struct { + // Number of desired machines. Defaults to 1. + // This is a pointer to distinguish between explicit zero and not specified. + Replicas int32 `json:"replicas,omitempty"` +} // MachineDeployment is the internal autoscaler Schema for MachineDeployment type MachineDeployment struct { diff --git a/cluster-autoscaler/cloudprovider/clusterapi/machineset_types.go b/cluster-autoscaler/cloudprovider/clusterapi/machineset_types.go index 38232d7fa947..2f2d7ddb895c 100644 --- a/cluster-autoscaler/cloudprovider/clusterapi/machineset_types.go +++ b/cluster-autoscaler/cloudprovider/clusterapi/machineset_types.go @@ -68,7 +68,10 @@ type MachineTemplateSpec struct { } // MachineSetStatus is the internal autoscaler Schema for MachineSetStatus -type MachineSetStatus struct{} +type MachineSetStatus struct { + // Replicas is the most recently observed number of replicas. + Replicas int32 `json:"replicas"` +} // MachineSetList is the internal autoscaler Schema for MachineSetList type MachineSetList struct {