[clusterapi] Rely on replica count found in unstructuredScalableResource #4443

Merged · 2 commits · Dec 14, 2021
@@ -19,6 +19,7 @@ package clusterapi
import (
"context"
"fmt"
"k8s.io/client-go/tools/cache"
"path"
"sort"
"strings"
@@ -425,42 +426,79 @@ func TestNodeGroupDecreaseTargetSize(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}

scalableResource.Spec.Replicas += tc.targetSizeIncrement
ch := make(chan error)
checkDone := func(obj interface{}) (bool, error) {
u, ok := obj.(*unstructured.Unstructured)
if !ok {
return false, nil
}
if u.GetResourceVersion() != scalableResource.GetResourceVersion() {
return false, nil
}
ng, err := newNodeGroupFromScalableResource(controller, u)
if err != nil {
return true, fmt.Errorf("unexpected error: %v", err)
}
if ng == nil {
return false, nil
}
currReplicas, err := ng.TargetSize()
if err != nil {
return true, fmt.Errorf("unexpected error: %v", err)
}

_, err = ng.machineController.managementScaleClient.Scales(ng.scalableResource.Namespace()).
Update(context.TODO(), gvr.GroupResource(), scalableResource, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if currReplicas != int(tc.initial)+int(tc.targetSizeIncrement) {
return true, fmt.Errorf("expected %v, got %v", tc.initial+tc.targetSizeIncrement, currReplicas)
}

// A nodegroup is immutable; get a fresh copy after adding targetSizeIncrement.
nodegroups, err = controller.nodeGroups()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
ng = nodegroups[0]
if err := ng.DecreaseTargetSize(tc.delta); (err != nil) != tc.expectedError {
return true, fmt.Errorf("expected error: %v, got: %v", tc.expectedError, err)
}

currReplicas, err := ng.TargetSize()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
scalableResource, err := controller.managementScaleClient.Scales(testConfig.spec.namespace).
Get(context.TODO(), gvr.GroupResource(), ng.scalableResource.Name(), metav1.GetOptions{})
if err != nil {
return true, fmt.Errorf("unexpected error: %v", err)
}

if currReplicas != int(tc.initial)+int(tc.targetSizeIncrement) {
t.Errorf("initially expected %v, got %v", tc.initial, currReplicas)
if scalableResource.Spec.Replicas != tc.expected {
return true, fmt.Errorf("expected %v, got %v", tc.expected, scalableResource.Spec.Replicas)
}
return true, nil
}

if err := ng.DecreaseTargetSize(tc.delta); (err != nil) != tc.expectedError {
t.Fatalf("expected error: %v, got: %v", tc.expectedError, err)
handler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
match, err := checkDone(obj)
if match {
ch <- err
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
match, err := checkDone(newObj)
if match {
ch <- err
}
},
}
controller.machineSetInformer.Informer().AddEventHandler(handler)
controller.machineDeploymentInformer.Informer().AddEventHandler(handler)

scalableResource, err = controller.managementScaleClient.Scales(testConfig.spec.namespace).
Get(context.TODO(), gvr.GroupResource(), ng.scalableResource.Name(), metav1.GetOptions{})
scalableResource.Spec.Replicas += tc.targetSizeIncrement

_, err = ng.machineController.managementScaleClient.Scales(ng.scalableResource.Namespace()).
Update(context.TODO(), gvr.GroupResource(), scalableResource, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if scalableResource.Spec.Replicas != tc.expected {
t.Errorf("expected %v, got %v", tc.expected, scalableResource.Spec.Replicas)
lastErr := fmt.Errorf("no updates received yet")
for lastErr != nil {
select {
case err = <-ch:
lastErr = err
case <-time.After(1 * time.Second):
t.Fatal(fmt.Errorf("timeout while waiting for update. Last error was: %v", lastErr))
}
}
}
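Note on the test pattern above: the assertions now run from an informer event handler via checkDone, and the test body blocks on a channel with a one-second timeout (the lastErr loop) until the cache reflects the scale update. A minimal, self-contained sketch of that wait pattern, using client-go's fake clientset and a Pod informer purely for illustration (the clusterapi tests use their own machine informers and checkDone assertions; none of the names below come from this PR):

```go
package main

import (
	"context"
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/cache"
)

func main() {
	client := fake.NewSimpleClientset()
	factory := informers.NewSharedInformerFactory(client, 0)
	podInformer := factory.Core().V1().Pods().Informer()

	ch := make(chan error)
	checkDone := func(obj interface{}) (bool, error) {
		pod, ok := obj.(*corev1.Pod)
		if !ok {
			return false, nil
		}
		if pod.Name != "example" {
			return false, nil
		}
		return true, nil // expected state observed in the cache
	}
	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			if match, err := checkDone(obj); match {
				ch <- err
			}
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			if match, err := checkDone(newObj); match {
				ch <- err
			}
		},
	})

	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop)
	cache.WaitForCacheSync(stop, podInformer.HasSynced)

	// Mutate API state; the informer delivers the event asynchronously.
	if _, err := client.CoreV1().Pods("default").Create(context.TODO(),
		&corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "example"}},
		metav1.CreateOptions{}); err != nil {
		panic(err)
	}

	// Block until the handler reports success or a timeout fires.
	select {
	case err := <-ch:
		fmt.Println("observed update, err:", err)
	case <-time.After(1 * time.Second):
		fmt.Println("timeout while waiting for update")
	}
}
```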

@@ -80,19 +80,14 @@ func (r unstructuredScalableResource) ProviderIDs() ([]string, error) {
 }
 
 func (r unstructuredScalableResource) Replicas() (int, error) {
-	gvr, err := r.GroupVersionResource()
+	replicas, found, err := unstructured.NestedInt64(r.unstructured.UnstructuredContent(), "spec", "replicas")
 	if err != nil {
-		return 0, err
+		return 0, errors.Wrap(err, "error getting replica count")
 	}
-
-	s, err := r.controller.managementScaleClient.Scales(r.Namespace()).Get(context.TODO(), gvr.GroupResource(), r.Name(), metav1.GetOptions{})
-	if err != nil {
-		return 0, err
+	if !found {
+		replicas = 0
 	}
-	if s == nil {
-		return 0, fmt.Errorf("unknown %s %s/%s", r.Kind(), r.Namespace(), r.Name())
-	}
-	return int(s.Spec.Replicas), nil
+	return int(replicas), nil
 }
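For context on the new Replicas() above: unstructured.NestedInt64 reads the field from the locally cached object and reports whether it exists, so a MachineSet or MachineDeployment without spec.replicas is treated as zero replicas rather than an error. A small standalone sketch of that call (the object literal below is illustrative, not taken from this PR):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

func main() {
	// Illustrative MachineSet-like object; only the fields needed here are set.
	u := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "cluster.x-k8s.io/v1beta1",
		"kind":       "MachineSet",
		"spec": map[string]interface{}{
			"replicas": int64(3),
		},
	}}

	// The same helper the patched Replicas() relies on: read spec.replicas
	// straight from the cached object instead of calling the scale subresource.
	replicas, found, err := unstructured.NestedInt64(u.UnstructuredContent(), "spec", "replicas")
	if err != nil {
		panic(err)
	}
	if !found {
		replicas = 0 // a missing field is treated as zero, as the patch does
	}
	fmt.Println(int(replicas)) // 3
}
```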

func (r unstructuredScalableResource) SetSize(nreplicas int) error {
@@ -119,6 +114,11 @@ func (r unstructuredScalableResource) SetSize(nreplicas int) error {

s.Spec.Replicas = int32(nreplicas)
_, updateErr := r.controller.managementScaleClient.Scales(r.Namespace()).Update(context.TODO(), gvr.GroupResource(), s, metav1.UpdateOptions{})

if updateErr == nil {
Contributor commented:
So I see that we're taking an accounting of the replica count and storing it every time we change the size from within cluster-autoscaler.

What if an out-of-band (i.e., not cluster-autoscaler-originating) replica change occurs? Won't our local replica count be stale in that event?

Lemme know if I'm not understanding everything here.

cc @elmiko @marwanad

Contributor Author replied:
NodeGroups and the underlying objects are temporary and re-queried/re-built every time they are needed, e.g. by calling machineController.nodeGroups(). This means that the next run of the autoscaler loop will definitely pick up up-to-date objects. Internally, watches/caches/informers are used to keep the local cache up-to-date.

This is based on my understanding after a few hours of debugging cluster-autoscaler. I'm not very familiar with the code-base and am also far from being a full-blown Go coder, so take my analysis/understanding with a grain of salt :)

updateErr = unstructured.SetNestedField(r.unstructured.UnstructuredContent(), int64(nreplicas), "spec", "replicas")
}

return updateErr
}
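To make the thread above concrete: when the scale update succeeds, SetSize also writes the new value back into its in-memory unstructured copy via unstructured.SetNestedField, so a Replicas() call on that same short-lived object returns the just-requested count; as the reply above notes, the next autoscaler iteration rebuilds node groups from the informer-backed cache anyway. A minimal sketch of that write-back (syncLocalReplicas is an illustrative helper name, not part of this PR):

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

// syncLocalReplicas mirrors what SetSize does after a successful scale update:
// keep the in-memory unstructured object consistent with the value just
// written to the API server. (Illustrative helper, not the PR's code.)
func syncLocalReplicas(u *unstructured.Unstructured, nreplicas int) error {
	return unstructured.SetNestedField(u.UnstructuredContent(), int64(nreplicas), "spec", "replicas")
}

func main() {
	u := &unstructured.Unstructured{Object: map[string]interface{}{
		"kind": "MachineDeployment",
		"spec": map[string]interface{}{"replicas": int64(1)},
	}}

	if err := syncLocalReplicas(u, 4); err != nil {
		panic(err)
	}

	replicas, _, _ := unstructured.NestedInt64(u.UnstructuredContent(), "spec", "replicas")
	fmt.Println(replicas) // 4: the local copy now matches the requested size
}
```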

@@ -18,9 +18,12 @@ package clusterapi

 import (
 	"context"
-	"testing"
 
+	"fmt"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/client-go/tools/cache"
+	"testing"
+	"time"
 )

func TestSetSize(t *testing.T) {
@@ -125,19 +128,57 @@ func TestReplicas(t *testing.T) {

 	s.Spec.Replicas = int32(updatedReplicas)
 
-	_, err = sr.controller.managementScaleClient.Scales(testResource.GetNamespace()).
-		Update(context.TODO(), gvr.GroupResource(), s, metav1.UpdateOptions{})
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	i, err = sr.Replicas()
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	if i != updatedReplicas {
-		t.Errorf("expected %v, got: %v", updatedReplicas, i)
-	}
+	ch := make(chan error)
+	checkDone := func(obj interface{}) (bool, error) {
+		u, ok := obj.(*unstructured.Unstructured)
+		if !ok {
+			return false, nil
+		}
+		sr, err := newUnstructuredScalableResource(controller, u)
+		if err != nil {
+			return true, err
+		}
+		i, err := sr.Replicas()
+		if err != nil {
+			return true, err
+		}
+		if i != updatedReplicas {
+			return true, fmt.Errorf("expected %v, got: %v", updatedReplicas, i)
+		}
+		return true, nil
+	}
+	handler := cache.ResourceEventHandlerFuncs{
+		AddFunc: func(obj interface{}) {
+			match, err := checkDone(obj)
+			if match {
+				ch <- err
+			}
+		},
+		UpdateFunc: func(oldObj, newObj interface{}) {
+			match, err := checkDone(newObj)
+			if match {
+				ch <- err
+			}
+		},
+	}
+
+	controller.machineSetInformer.Informer().AddEventHandler(handler)
+	controller.machineDeploymentInformer.Informer().AddEventHandler(handler)
+
+	_, err = sr.controller.managementScaleClient.Scales(testResource.GetNamespace()).
+		Update(context.TODO(), gvr.GroupResource(), s, metav1.UpdateOptions{})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	lastErr := fmt.Errorf("no updates received yet")
+	for lastErr != nil {
+		select {
+		case err = <-ch:
+			lastErr = err
+		case <-time.After(1 * time.Second):
+			t.Fatal(fmt.Errorf("timeout while waiting for update. Last error was: %v", lastErr))
+		}
+	}
 }
