Skip to content

Commit

Permalink
Get replicas always from API server for cluster-autoscaler CAPI provider
Browse files Browse the repository at this point in the history
When getting Replicas() the local struct in the scalable resource might be stale. To mitigate possible side effects, we want always get a fresh replicas.

This is one in a series of PR to mitigate kubernetes#3104
  • Loading branch information
enxebre authored and Madan Gudi committed Jan 28, 2021
1 parent 8ab2db1 commit c8c0781
Show file tree
Hide file tree
Showing 8 changed files with 235 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/klog"
"k8s.io/utils/pointer"
)

Expand Down Expand Up @@ -75,8 +76,22 @@ func (r machineDeploymentScalableResource) Nodes() ([]string, error) {
return result, nil
}

func (r machineDeploymentScalableResource) Replicas() int32 {
return pointer.Int32PtrDerefOr(r.machineDeployment.Spec.Replicas, 0)
func (r machineDeploymentScalableResource) Replicas() (int32, error) {
freshMachineDeployment, err := r.controller.getMachineDeployment(r.machineDeployment.Namespace, r.machineDeployment.Name, metav1.GetOptions{})
if err != nil {
return 0, err
}

if freshMachineDeployment == nil {
return 0, fmt.Errorf("unknown machineDeployment %s", r.machineDeployment.Name)
}

if freshMachineDeployment.Spec.Replicas == nil {
klog.Warningf("MachineDeployment %q has nil spec.replicas. This is unsupported behaviour. Falling back to status.replicas.", r.machineDeployment.Name)
}
// If no value for replicas on the MachineSet spec, fallback to the status
// TODO: Remove this fallback once defaulting is implemented for MachineSet Replicas
return pointer.Int32PtrDerefOr(freshMachineDeployment.Spec.Replicas, freshMachineDeployment.Status.Replicas), nil
}

func (r machineDeploymentScalableResource) SetSize(nreplicas int32) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/klog"
"k8s.io/utils/pointer"
)

Expand Down Expand Up @@ -60,8 +61,23 @@ func (r machineSetScalableResource) Nodes() ([]string, error) {
return r.controller.machineSetProviderIDs(r.machineSet)
}

func (r machineSetScalableResource) Replicas() int32 {
return pointer.Int32PtrDerefOr(r.machineSet.Spec.Replicas, 0)
func (r machineSetScalableResource) Replicas() (int32, error) {
freshMachineSet, err := r.controller.getMachineSet(r.machineSet.Namespace, r.machineSet.Name, metav1.GetOptions{})
if err != nil {
return 0, err
}

if freshMachineSet == nil {
return 0, fmt.Errorf("unknown machineSet %s", r.machineSet.Name)
}

if freshMachineSet.Spec.Replicas == nil {
klog.Warningf("MachineSet %q has nil spec.replicas. This is unsupported behaviour. Falling back to status.replicas.", r.machineSet.Name)
}

// If no value for replicas on the MachineSet spec, fallback to the status
// TODO: Remove this fallback once defaulting is implemented for MachineSet Replicas
return pointer.Int32PtrDerefOr(freshMachineSet.Spec.Replicas, freshMachineSet.Status.Replicas), nil
}

func (r machineSetScalableResource) SetSize(nreplicas int32) error {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package clusterapi

import (
"context"
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

func TestSetSize(t *testing.T) {
initialReplicas := int32(1)
updatedReplicas := int32(5)

testConfig := createMachineSetTestConfig(testNamespace, int(initialReplicas), nil)
controller, stop := mustCreateTestController(t, testConfig)
defer stop()

sr, err := newMachineSetScalableResource(controller, testConfig.machineSet)
if err != nil {
t.Fatal(err)
}

err = sr.SetSize(updatedReplicas)
if err != nil {
t.Fatal(err)
}

// fetch machineSet
u, err := sr.controller.dynamicclient.Resource(*sr.controller.machineSetResource).Namespace(testConfig.machineSet.Namespace).
Get(context.TODO(), testConfig.machineSet.Name, metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}

replicas, found, err := unstructured.NestedInt64(u.Object, "spec", "replicas")
if err != nil {
t.Fatal(err)
}
if !found {
t.Fatal("spec.replicas not found")
}

got := int32(replicas)
if got != updatedReplicas {
t.Errorf("expected %v, got: %v", updatedReplicas, got)
}
}

func TestReplicas(t *testing.T) {
initialReplicas := int32(1)
updatedReplicas := int32(5)

testConfig := createMachineSetTestConfig(testNamespace, int(initialReplicas), nil)
controller, stop := mustCreateTestController(t, testConfig)
defer stop()

sr, err := newMachineSetScalableResource(controller, testConfig.machineSet)
if err != nil {
t.Fatal(err)
}

i, err := sr.Replicas()
if err != nil {
t.Fatal(err)
}

if i != initialReplicas {
t.Errorf("expected %v, got: %v", initialReplicas, i)
}

// fetch and update machineSet
u, err := sr.controller.dynamicclient.Resource(*sr.controller.machineSetResource).Namespace(testConfig.machineSet.Namespace).
Get(context.TODO(), testConfig.machineSet.Name, metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}

if err := unstructured.SetNestedField(u.Object, int64(updatedReplicas), "spec", "replicas"); err != nil {
t.Fatal(err)
}

_, err = sr.controller.dynamicclient.Resource(*sr.controller.machineSetResource).Namespace(u.GetNamespace()).
Update(context.TODO(), u, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
}

i, err = sr.Replicas()
if err != nil {
t.Fatal(err)
}

if i != updatedReplicas {
t.Errorf("expected %v, got: %v", updatedReplicas, i)
}
}

func TestSetSizeAndReplicas(t *testing.T) {
initialReplicas := int32(1)
updatedReplicas := int32(5)

testConfig := createMachineSetTestConfig(testNamespace, int(initialReplicas), nil)
controller, stop := mustCreateTestController(t, testConfig)
defer stop()

sr, err := newMachineSetScalableResource(controller, testConfig.machineSet)
if err != nil {
t.Fatal(err)
}

i, err := sr.Replicas()
if err != nil {
t.Fatal(err)
}

if i != initialReplicas {
t.Errorf("expected %v, got: %v", initialReplicas, i)
}

err = sr.SetSize(updatedReplicas)
if err != nil {
t.Fatal(err)
}

i, err = sr.Replicas()
if err != nil {
t.Fatal(err)
}

if i != updatedReplicas {
t.Errorf("expected %v, got: %v", updatedReplicas, i)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ func (ng *nodegroup) MaxSize() int {
// (new nodes finish startup and registration or removed nodes are
// deleted completely). Implementation required.
func (ng *nodegroup) TargetSize() (int, error) {
return int(ng.scalableResource.Replicas()), nil
size, err := ng.scalableResource.Replicas()
if err != nil {
return 0, err
}
return int(size), nil
}

// IncreaseSize increases the size of the node group. To delete a node
Expand All @@ -70,11 +74,17 @@ func (ng *nodegroup) IncreaseSize(delta int) error {
if delta <= 0 {
return fmt.Errorf("size increase must be positive")
}
size := int(ng.scalableResource.Replicas())
if size+delta > ng.MaxSize() {
return fmt.Errorf("size increase too large - desired:%d max:%d", size+delta, ng.MaxSize())

size, err := ng.scalableResource.Replicas()
if err != nil {
return err
}
return ng.scalableResource.SetSize(int32(size + delta))
intSize := int(size)

if intSize+delta > ng.MaxSize() {
return fmt.Errorf("size increase too large - desired:%d max:%d", intSize+delta, ng.MaxSize())
}
return ng.scalableResource.SetSize(int32(intSize + delta))
}

// DeleteNodes deletes nodes from this node group. Error is returned
Expand Down Expand Up @@ -104,7 +114,10 @@ func (ng *nodegroup) DeleteNodes(nodes []*corev1.Node) error {
// Step 2: if deleting len(nodes) would make the replica count
// <= 0, then the request to delete that many nodes is bogus
// and we fail fast.
replicas := ng.scalableResource.Replicas()
replicas, err := ng.scalableResource.Replicas()
if err != nil {
return err
}

if replicas-int32(len(nodes)) <= 0 {
return fmt.Errorf("unable to delete %d machines in %q, machine replicas are <= 0 ", len(nodes), ng.Id())
Expand Down Expand Up @@ -187,7 +200,11 @@ func (ng *nodegroup) Id() string {

// Debug returns a string containing all information regarding this node group.
func (ng *nodegroup) Debug() string {
return fmt.Sprintf(debugFormat, ng.Id(), ng.MinSize(), ng.MaxSize(), ng.scalableResource.Replicas())
replicas, err := ng.scalableResource.Replicas()
if err != nil {
return fmt.Sprintf("%s (min: %d, max: %d, replicas: %v)", ng.Id(), ng.MinSize(), ng.MaxSize(), err)
}
return fmt.Sprintf(debugFormat, ng.Id(), ng.MinSize(), ng.MaxSize(), replicas)
}

// Nodes returns a list of all nodes that belong to this node group.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ limitations under the License.
package clusterapi

import (
"context"
"fmt"
"path"
"sort"
"strings"
"testing"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/utils/pointer"
Expand Down Expand Up @@ -108,7 +110,7 @@ func TestNodeGroupNewNodeGroupConstructor(t *testing.T) {
}

test := func(t *testing.T, tc testCase, testConfig *testConfig) {
controller, stop := mustCreateTestController(t)
controller, stop := mustCreateTestController(t, testConfig)
defer stop()

ng, err := newNodeGroup(t, controller, testConfig)
Expand Down Expand Up @@ -429,12 +431,22 @@ func TestNodeGroupDecreaseTargetSize(t *testing.T) {
switch v := (ng.scalableResource).(type) {
case *machineSetScalableResource:
testConfig.machineSet.Spec.Replicas = int32ptr(*testConfig.machineSet.Spec.Replicas + tc.targetSizeIncrement)
if err := controller.machineSetInformer.Informer().GetStore().Add(newUnstructuredFromMachineSet(testConfig.machineSet)); err != nil {
u := newUnstructuredFromMachineSet(testConfig.machineSet)
if err := controller.machineSetInformer.Informer().GetStore().Add(u); err != nil {
t.Fatalf("failed to add new machine: %v", err)
}
_, err := controller.dynamicclient.Resource(*controller.machineSetResource).Namespace(u.GetNamespace()).Update(context.TODO(), u, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("failed to updating machine: %v", err)
}
case *machineDeploymentScalableResource:
testConfig.machineDeployment.Spec.Replicas = int32ptr(*testConfig.machineDeployment.Spec.Replicas + tc.targetSizeIncrement)
if err := controller.machineDeploymentInformer.Informer().GetStore().Add(newUnstructuredFromMachineDeployment(testConfig.machineDeployment)); err != nil {
u := newUnstructuredFromMachineDeployment(testConfig.machineDeployment)
if err := controller.machineDeploymentInformer.Informer().GetStore().Add(u); err != nil {
}
_, err := controller.dynamicclient.Resource(*controller.machineDeploymentResource).Namespace(u.GetNamespace()).Update(context.TODO(), u, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("failed to updating machine: %v", err)
}
default:
t.Errorf("unexpected type: %T", v)
Expand All @@ -450,6 +462,7 @@ func TestNodeGroupDecreaseTargetSize(t *testing.T) {
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if currReplicas != int(tc.initial)+int(tc.targetSizeIncrement) {
t.Errorf("initially expected %v, got %v", tc.initial, currReplicas)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type scalableResource interface {
SetSize(nreplicas int32) error

// Replicas returns the current replica count of the resource
Replicas() int32
Replicas() (int32, error)

// MarkMachineForDeletion marks machine for deletion
MarkMachineForDeletion(machine *Machine) error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ type MachineDeploymentSpec struct {
}

// MachineDeploymentStatus is the internal autoscaler Schema for MachineDeploymentStatus
type MachineDeploymentStatus struct{}
type MachineDeploymentStatus struct {
// Number of desired machines. Defaults to 1.
// This is a pointer to distinguish between explicit zero and not specified.
Replicas int32 `json:"replicas,omitempty"`
}

// MachineDeployment is the internal autoscaler Schema for MachineDeployment
type MachineDeployment struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ type MachineTemplateSpec struct {
}

// MachineSetStatus is the internal autoscaler Schema for MachineSetStatus
type MachineSetStatus struct{}
type MachineSetStatus struct {
// Replicas is the most recently observed number of replicas.
Replicas int32 `json:"replicas"`
}

// MachineSetList is the internal autoscaler Schema for MachineSetList
type MachineSetList struct {
Expand Down

0 comments on commit c8c0781

Please sign in to comment.