From 6f34947d37aecb2ee3c2590771b7441faa8d75ff Mon Sep 17 00:00:00 2001 From: Enxebre Date: Mon, 4 May 2020 12:10:15 +0200 Subject: [PATCH] UPSTREAM: : Get replicas always from API server When getting Replicas() the local struct in the scalable resource might be stale. To mitigate possible side effects, we want always get a fresh replicas. This is one in a series of PR to mitigate kubernetes#3104 Once we got all merge we'll put a PR upstream. --- .../clusterapi_machinedeployment.go | 15 +- .../clusterapi/clusterapi_machineset.go | 15 +- .../clusterapi/clusterapi_machineset_test.go | 150 ++++++++++++++++++ .../clusterapi/clusterapi_nodegroup.go | 31 +++- .../clusterapi/clusterapi_nodegroup_test.go | 20 ++- .../clusterapi/clusterapi_scalableresource.go | 2 +- 6 files changed, 215 insertions(+), 18 deletions(-) create mode 100644 cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machineset_test.go diff --git a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machinedeployment.go b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machinedeployment.go index d12123f56e5..c563b2c85ae 100644 --- a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machinedeployment.go +++ b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machinedeployment.go @@ -78,13 +78,22 @@ func (r machineDeploymentScalableResource) Nodes() ([]string, error) { return result, nil } -func (r machineDeploymentScalableResource) Replicas() int32 { - if r.machineDeployment.Spec.Replicas == nil { +func (r machineDeploymentScalableResource) Replicas() (int32, error) { + freshMachineDeployment, err := r.controller.getMachineDeployment(r.machineDeployment.Namespace, r.machineDeployment.Name, metav1.GetOptions{}) + if err != nil { + return 0, err + } + + if freshMachineDeployment == nil { + return 0, fmt.Errorf("unknown machineDeployment %s", r.machineDeployment.Name) + } + + if freshMachineDeployment.Spec.Replicas == nil { klog.Warningf("MachineDeployment %q has nil spec.replicas. This is unsupported behaviour. Falling back to status.replicas.", r.machineDeployment.Name) } // If no value for replicas on the MachineSet spec, fallback to the status // TODO: Remove this fallback once defaulting is implemented for MachineSet Replicas - return pointer.Int32PtrDerefOr(r.machineDeployment.Spec.Replicas, r.machineDeployment.Status.Replicas) + return pointer.Int32PtrDerefOr(freshMachineDeployment.Spec.Replicas, freshMachineDeployment.Status.Replicas), nil } func (r machineDeploymentScalableResource) SetSize(nreplicas int32) error { diff --git a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machineset.go b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machineset.go index ab1dee7e37b..09d7760d99d 100644 --- a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machineset.go +++ b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machineset.go @@ -63,14 +63,23 @@ func (r machineSetScalableResource) Nodes() ([]string, error) { return r.controller.machineSetProviderIDs(r.machineSet) } -func (r machineSetScalableResource) Replicas() int32 { - if r.machineSet.Spec.Replicas == nil { +func (r machineSetScalableResource) Replicas() (int32, error) { + freshMachineSet, err := r.controller.getMachineSet(r.machineSet.Namespace, r.machineSet.Name, metav1.GetOptions{}) + if err != nil { + return 0, err + } + + if freshMachineSet == nil { + return 0, fmt.Errorf("unknown machineSet %s", r.machineSet.Name) + } + + if freshMachineSet.Spec.Replicas == nil { klog.Warningf("MachineSet %q has nil spec.replicas. This is unsupported behaviour. Falling back to status.replicas.", r.machineSet.Name) } // If no value for replicas on the MachineSet spec, fallback to the status // TODO: Remove this fallback once defaulting is implemented for MachineSet Replicas - return pointer.Int32PtrDerefOr(r.machineSet.Spec.Replicas, r.machineSet.Status.Replicas) + return pointer.Int32PtrDerefOr(freshMachineSet.Spec.Replicas, freshMachineSet.Status.Replicas), nil } func (r machineSetScalableResource) SetSize(nreplicas int32) error { diff --git a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machineset_test.go b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machineset_test.go new file mode 100644 index 00000000000..92930ee01c6 --- /dev/null +++ b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_machineset_test.go @@ -0,0 +1,150 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clusterapi + +import ( + "context" + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +func TestSetSize(t *testing.T) { + initialReplicas := int32(1) + updatedReplicas := int32(5) + + testConfig := createMachineSetTestConfig(testNamespace, int(initialReplicas), nil) + controller, stop := mustCreateTestController(t, testConfig) + defer stop() + + sr, err := newMachineSetScalableResource(controller, testConfig.machineSet) + if err != nil { + t.Fatal(err) + } + + err = sr.SetSize(updatedReplicas) + if err != nil { + t.Fatal(err) + } + + // fetch machineSet + u, err := sr.controller.dynamicclient.Resource(*sr.controller.machineSetResource).Namespace(testConfig.machineSet.Namespace). + Get(context.TODO(), testConfig.machineSet.Name, metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + + replicas, found, err := unstructured.NestedInt64(u.Object, "spec", "replicas") + if err != nil { + t.Fatal(err) + } + if !found { + t.Fatal("spec.replicas not found") + } + + got := int32(replicas) + if got != updatedReplicas { + t.Errorf("expected %v, got: %v", updatedReplicas, got) + } +} + +func TestReplicas(t *testing.T) { + initialReplicas := int32(1) + updatedReplicas := int32(5) + + testConfig := createMachineSetTestConfig(testNamespace, int(initialReplicas), nil) + controller, stop := mustCreateTestController(t, testConfig) + defer stop() + + sr, err := newMachineSetScalableResource(controller, testConfig.machineSet) + if err != nil { + t.Fatal(err) + } + + i, err := sr.Replicas() + if err != nil { + t.Fatal(err) + } + + if i != initialReplicas { + t.Errorf("expected %v, got: %v", initialReplicas, i) + } + + // fetch and update machineSet + u, err := sr.controller.dynamicclient.Resource(*sr.controller.machineSetResource).Namespace(testConfig.machineSet.Namespace). + Get(context.TODO(), testConfig.machineSet.Name, metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + + if err := unstructured.SetNestedField(u.Object, int64(updatedReplicas), "spec", "replicas"); err != nil { + t.Fatal(err) + } + + _, err = sr.controller.dynamicclient.Resource(*sr.controller.machineSetResource).Namespace(u.GetNamespace()). + Update(context.TODO(), u, metav1.UpdateOptions{}) + if err != nil { + t.Fatal(err) + } + + i, err = sr.Replicas() + if err != nil { + t.Fatal(err) + } + + if i != updatedReplicas { + t.Errorf("expected %v, got: %v", updatedReplicas, i) + } +} + +func TestSetSizeAndReplicas(t *testing.T) { + initialReplicas := int32(1) + updatedReplicas := int32(5) + + testConfig := createMachineSetTestConfig(testNamespace, int(initialReplicas), nil) + controller, stop := mustCreateTestController(t, testConfig) + defer stop() + + sr, err := newMachineSetScalableResource(controller, testConfig.machineSet) + if err != nil { + t.Fatal(err) + } + + i, err := sr.Replicas() + if err != nil { + t.Fatal(err) + } + + if i != initialReplicas { + t.Errorf("expected %v, got: %v", initialReplicas, i) + } + + err = sr.SetSize(updatedReplicas) + if err != nil { + t.Fatal(err) + } + + i, err = sr.Replicas() + if err != nil { + t.Fatal(err) + } + + if i != updatedReplicas { + t.Errorf("expected %v, got: %v", updatedReplicas, i) + } +} diff --git a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_nodegroup.go b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_nodegroup.go index 8d0cf6d8c52..c41c619d064 100644 --- a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_nodegroup.go +++ b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_nodegroup.go @@ -68,7 +68,11 @@ func (ng *nodegroup) MaxSize() int { // (new nodes finish startup and registration or removed nodes are // deleted completely). Implementation required. func (ng *nodegroup) TargetSize() (int, error) { - return int(ng.scalableResource.Replicas()), nil + size, err := ng.scalableResource.Replicas() + if err != nil { + return 0, err + } + return int(size), nil } // IncreaseSize increases the size of the node group. To delete a node @@ -79,11 +83,17 @@ func (ng *nodegroup) IncreaseSize(delta int) error { if delta <= 0 { return fmt.Errorf("size increase must be positive") } - size := int(ng.scalableResource.Replicas()) - if size+delta > ng.MaxSize() { - return fmt.Errorf("size increase too large - desired:%d max:%d", size+delta, ng.MaxSize()) + + size, err := ng.scalableResource.Replicas() + if err != nil { + return err } - return ng.scalableResource.SetSize(int32(size + delta)) + intSize := int(size) + + if intSize+delta > ng.MaxSize() { + return fmt.Errorf("size increase too large - desired:%d max:%d", intSize+delta, ng.MaxSize()) + } + return ng.scalableResource.SetSize(int32(intSize + delta)) } // DeleteNodes deletes nodes from this node group. Error is returned @@ -110,7 +120,10 @@ func (ng *nodegroup) DeleteNodes(nodes []*corev1.Node) error { // Step 2: if deleting len(nodes) would make the replica count // <= 0, then the request to delete that many nodes is bogus // and we fail fast. - replicas := ng.scalableResource.Replicas() + replicas, err := ng.scalableResource.Replicas() + if err != nil { + return err + } if replicas-int32(len(nodes)) < 0 { return fmt.Errorf("unable to delete %d machines in %q, machine replicas are < 0 ", len(nodes), ng.Id()) @@ -196,7 +209,11 @@ func (ng *nodegroup) Id() string { // Debug returns a string containing all information regarding this node group. func (ng *nodegroup) Debug() string { - return fmt.Sprintf(debugFormat, ng.Id(), ng.MinSize(), ng.MaxSize(), ng.scalableResource.Replicas()) + replicas, err := ng.scalableResource.Replicas() + if err != nil { + return fmt.Sprintf("%s (min: %d, max: %d, replicas: %v)", ng.Id(), ng.MinSize(), ng.MaxSize(), err) + } + return fmt.Sprintf(debugFormat, ng.Id(), ng.MinSize(), ng.MaxSize(), replicas) } // Nodes returns a list of all nodes that belong to this node group. diff --git a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_nodegroup_test.go b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_nodegroup_test.go index bd7a0ae76e1..007720480d8 100644 --- a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_nodegroup_test.go +++ b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_nodegroup_test.go @@ -17,6 +17,7 @@ limitations under the License. package clusterapi import ( + "context" "fmt" "path" "sort" @@ -24,6 +25,7 @@ import ( "testing" "time" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/utils/pointer" @@ -108,7 +110,7 @@ func TestNodeGroupNewNodeGroupConstructor(t *testing.T) { } test := func(t *testing.T, tc testCase, testConfig *testConfig) { - controller, stop := mustCreateTestController(t) + controller, stop := mustCreateTestController(t, testConfig) defer stop() ng, err := newNodeGroup(t, controller, testConfig) @@ -457,12 +459,22 @@ func TestNodeGroupDecreaseTargetSize(t *testing.T) { switch v := (ng.scalableResource).(type) { case *machineSetScalableResource: testConfig.machineSet.Spec.Replicas = int32ptr(*testConfig.machineSet.Spec.Replicas + tc.targetSizeIncrement) - if err := controller.machineSetInformer.Informer().GetStore().Add(newUnstructuredFromMachineSet(testConfig.machineSet)); err != nil { + u := newUnstructuredFromMachineSet(testConfig.machineSet) + if err := controller.machineSetInformer.Informer().GetStore().Add(u); err != nil { t.Fatalf("failed to add new machine: %v", err) } + _, err := controller.dynamicclient.Resource(*controller.machineSetResource).Namespace(u.GetNamespace()).Update(context.TODO(), u, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("failed to updating machine: %v", err) + } case *machineDeploymentScalableResource: testConfig.machineDeployment.Spec.Replicas = int32ptr(*testConfig.machineDeployment.Spec.Replicas + tc.targetSizeIncrement) - if err := controller.machineDeploymentInformer.Informer().GetStore().Add(newUnstructuredFromMachineDeployment(testConfig.machineDeployment)); err != nil { + u := newUnstructuredFromMachineDeployment(testConfig.machineDeployment) + if err := controller.machineDeploymentInformer.Informer().GetStore().Add(u); err != nil { + } + _, err := controller.dynamicclient.Resource(*controller.machineDeploymentResource).Namespace(u.GetNamespace()).Update(context.TODO(), u, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("failed to updating machine: %v", err) } default: t.Errorf("unexpected type: %T", v) @@ -483,7 +495,7 @@ func TestNodeGroupDecreaseTargetSize(t *testing.T) { // TODO: Remove this fallback once defaulting is implemented for MachineSet Replicas if tc.initialSpec != nil { if currReplicas != int(*tc.initialSpec)+int(tc.targetSizeIncrement) { - t.Errorf("initially expected %v, got %v", tc.initialSpec, currReplicas) + t.Errorf("initially expected %v, got %v", *tc.initialSpec, currReplicas) } } else { if currReplicas != int(tc.initialStatus)+int(tc.targetSizeIncrement) { diff --git a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_scalableresource.go b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_scalableresource.go index 8ff8ae9cbfd..3c068e7a14b 100644 --- a/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_scalableresource.go +++ b/cluster-autoscaler/cloudprovider/clusterapi/clusterapi_scalableresource.go @@ -51,7 +51,7 @@ type scalableResource interface { SetSize(nreplicas int32) error // Replicas returns the current replica count of the resource - Replicas() int32 + Replicas() (int32, error) // MarkMachineForDeletion marks machine for deletion MarkMachineForDeletion(machine *Machine) error