Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CA-1.18] #3177 cherry-pick: Fix stale replicas issue with cluster-autoscaler CAPI provider #3345

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"os"
"strings"
"sync"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -65,6 +66,7 @@ type machineController struct {
machineSetResource *schema.GroupVersionResource
machineResource *schema.GroupVersionResource
machineDeploymentResource *schema.GroupVersionResource
accessLock sync.Mutex
}

type machineSetFilterFunc func(machineSet *MachineSet) error
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/klog"
"k8s.io/utils/pointer"
)

Expand Down Expand Up @@ -75,8 +76,22 @@ func (r machineDeploymentScalableResource) Nodes() ([]string, error) {
return result, nil
}

func (r machineDeploymentScalableResource) Replicas() int32 {
return pointer.Int32PtrDerefOr(r.machineDeployment.Spec.Replicas, 0)
func (r machineDeploymentScalableResource) Replicas() (int32, error) {
freshMachineDeployment, err := r.controller.getMachineDeployment(r.machineDeployment.Namespace, r.machineDeployment.Name, metav1.GetOptions{})
if err != nil {
return 0, err
}

if freshMachineDeployment == nil {
return 0, fmt.Errorf("unknown machineDeployment %s", r.machineDeployment.Name)
}

if freshMachineDeployment.Spec.Replicas == nil {
klog.Warningf("MachineDeployment %q has nil spec.replicas. This is unsupported behaviour. Falling back to status.replicas.", r.machineDeployment.Name)
}
// If no value for replicas on the MachineSet spec, fallback to the status
// TODO: Remove this fallback once defaulting is implemented for MachineSet Replicas
return pointer.Int32PtrDerefOr(freshMachineDeployment.Spec.Replicas, freshMachineDeployment.Status.Replicas), nil
}

func (r machineDeploymentScalableResource) SetSize(nreplicas int32) error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/klog"
"k8s.io/utils/pointer"
)

Expand Down Expand Up @@ -60,8 +61,23 @@ func (r machineSetScalableResource) Nodes() ([]string, error) {
return r.controller.machineSetProviderIDs(r.machineSet)
}

func (r machineSetScalableResource) Replicas() int32 {
return pointer.Int32PtrDerefOr(r.machineSet.Spec.Replicas, 0)
func (r machineSetScalableResource) Replicas() (int32, error) {
freshMachineSet, err := r.controller.getMachineSet(r.machineSet.Namespace, r.machineSet.Name, metav1.GetOptions{})
if err != nil {
return 0, err
}

if freshMachineSet == nil {
return 0, fmt.Errorf("unknown machineSet %s", r.machineSet.Name)
}

if freshMachineSet.Spec.Replicas == nil {
klog.Warningf("MachineSet %q has nil spec.replicas. This is unsupported behaviour. Falling back to status.replicas.", r.machineSet.Name)
}

// If no value for replicas on the MachineSet spec, fallback to the status
// TODO: Remove this fallback once defaulting is implemented for MachineSet Replicas
return pointer.Int32PtrDerefOr(freshMachineSet.Spec.Replicas, freshMachineSet.Status.Replicas), nil
}

func (r machineSetScalableResource) SetSize(nreplicas int32) error {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
Copyright 2020 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package clusterapi

import (
"context"
"testing"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

func TestSetSize(t *testing.T) {
initialReplicas := int32(1)
updatedReplicas := int32(5)

testConfig := createMachineSetTestConfig(testNamespace, int(initialReplicas), nil)
controller, stop := mustCreateTestController(t, testConfig)
defer stop()

sr, err := newMachineSetScalableResource(controller, testConfig.machineSet)
if err != nil {
t.Fatal(err)
}

err = sr.SetSize(updatedReplicas)
if err != nil {
t.Fatal(err)
}

// fetch machineSet
u, err := sr.controller.dynamicclient.Resource(*sr.controller.machineSetResource).Namespace(testConfig.machineSet.Namespace).
Get(context.TODO(), testConfig.machineSet.Name, metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}

replicas, found, err := unstructured.NestedInt64(u.Object, "spec", "replicas")
if err != nil {
t.Fatal(err)
}
if !found {
t.Fatal("spec.replicas not found")
}

got := int32(replicas)
if got != updatedReplicas {
t.Errorf("expected %v, got: %v", updatedReplicas, got)
}
}

func TestReplicas(t *testing.T) {
initialReplicas := int32(1)
updatedReplicas := int32(5)

testConfig := createMachineSetTestConfig(testNamespace, int(initialReplicas), nil)
controller, stop := mustCreateTestController(t, testConfig)
defer stop()

sr, err := newMachineSetScalableResource(controller, testConfig.machineSet)
if err != nil {
t.Fatal(err)
}

i, err := sr.Replicas()
if err != nil {
t.Fatal(err)
}

if i != initialReplicas {
t.Errorf("expected %v, got: %v", initialReplicas, i)
}

// fetch and update machineSet
u, err := sr.controller.dynamicclient.Resource(*sr.controller.machineSetResource).Namespace(testConfig.machineSet.Namespace).
Get(context.TODO(), testConfig.machineSet.Name, metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}

if err := unstructured.SetNestedField(u.Object, int64(updatedReplicas), "spec", "replicas"); err != nil {
t.Fatal(err)
}

_, err = sr.controller.dynamicclient.Resource(*sr.controller.machineSetResource).Namespace(u.GetNamespace()).
Update(context.TODO(), u, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
}

i, err = sr.Replicas()
if err != nil {
t.Fatal(err)
}

if i != updatedReplicas {
t.Errorf("expected %v, got: %v", updatedReplicas, i)
}
}

func TestSetSizeAndReplicas(t *testing.T) {
initialReplicas := int32(1)
updatedReplicas := int32(5)

testConfig := createMachineSetTestConfig(testNamespace, int(initialReplicas), nil)
controller, stop := mustCreateTestController(t, testConfig)
defer stop()

sr, err := newMachineSetScalableResource(controller, testConfig.machineSet)
if err != nil {
t.Fatal(err)
}

i, err := sr.Replicas()
if err != nil {
t.Fatal(err)
}

if i != initialReplicas {
t.Errorf("expected %v, got: %v", initialReplicas, i)
}

err = sr.SetSize(updatedReplicas)
if err != nil {
t.Fatal(err)
}

i, err = sr.Replicas()
if err != nil {
t.Fatal(err)
}

if i != updatedReplicas {
t.Errorf("expected %v, got: %v", updatedReplicas, i)
}
}
47 changes: 36 additions & 11 deletions cluster-autoscaler/cloudprovider/clusterapi/clusterapi_nodegroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ func (ng *nodegroup) MaxSize() int {
// (new nodes finish startup and registration or removed nodes are
// deleted completely). Implementation required.
func (ng *nodegroup) TargetSize() (int, error) {
return int(ng.scalableResource.Replicas()), nil
size, err := ng.scalableResource.Replicas()
if err != nil {
return 0, err
}
return int(size), nil
}

// IncreaseSize increases the size of the node group. To delete a node
Expand All @@ -70,18 +74,37 @@ func (ng *nodegroup) IncreaseSize(delta int) error {
if delta <= 0 {
return fmt.Errorf("size increase must be positive")
}
size := int(ng.scalableResource.Replicas())
if size+delta > ng.MaxSize() {
return fmt.Errorf("size increase too large - desired:%d max:%d", size+delta, ng.MaxSize())

size, err := ng.scalableResource.Replicas()
if err != nil {
return err
}
return ng.scalableResource.SetSize(int32(size + delta))
intSize := int(size)

if intSize+delta > ng.MaxSize() {
return fmt.Errorf("size increase too large - desired:%d max:%d", intSize+delta, ng.MaxSize())
}
return ng.scalableResource.SetSize(int32(intSize + delta))
}

// DeleteNodes deletes nodes from this node group. Error is returned
// either on failure or if the given node doesn't belong to this node
// group. This function should wait until node group size is updated.
// Implementation required.
func (ng *nodegroup) DeleteNodes(nodes []*corev1.Node) error {
ng.machineController.accessLock.Lock()
defer ng.machineController.accessLock.Unlock()

replicas, err := ng.scalableResource.Replicas()
if err != nil {
return err
}

// if we are at minSize already we wail early.
if int(replicas) <= ng.MinSize() {
return fmt.Errorf("min size reached, nodes will not be deleted")
}

// Step 1: Verify all nodes belong to this node group.
for _, node := range nodes {
actualNodeGroup, err := ng.machineController.nodeGroupForNode(node)
Expand All @@ -99,12 +122,10 @@ func (ng *nodegroup) DeleteNodes(nodes []*corev1.Node) error {
}

// Step 2: if deleting len(nodes) would make the replica count
// <= 0, then the request to delete that many nodes is bogus
// < minSize, then the request to delete that many nodes is bogus
// and we fail fast.
replicas := ng.scalableResource.Replicas()

if replicas-int32(len(nodes)) <= 0 {
return fmt.Errorf("unable to delete %d machines in %q, machine replicas are <= 0 ", len(nodes), ng.Id())
if replicas-int32(len(nodes)) < int32(ng.MinSize()) {
return fmt.Errorf("unable to delete %d machines in %q, machine replicas are %q, minSize is %q ", len(nodes), ng.Id(), replicas, ng.MinSize())
}

// Step 3: annotate the corresponding machine that it is a
Expand Down Expand Up @@ -184,7 +205,11 @@ func (ng *nodegroup) Id() string {

// Debug returns a string containing all information regarding this node group.
func (ng *nodegroup) Debug() string {
return fmt.Sprintf(debugFormat, ng.Id(), ng.MinSize(), ng.MaxSize(), ng.scalableResource.Replicas())
replicas, err := ng.scalableResource.Replicas()
if err != nil {
return fmt.Sprintf("%s (min: %d, max: %d, replicas: %v)", ng.Id(), ng.MinSize(), ng.MaxSize(), err)
}
return fmt.Sprintf(debugFormat, ng.Id(), ng.MinSize(), ng.MaxSize(), replicas)
}

// Nodes returns a list of all nodes that belong to this node group.
Expand Down
Loading