Merge pull request #4443 from codablock/fix-rate-limitting
[clusterapi] Rely on replica count found in unstructuredScalableResource
k8s-ci-robot authored Dec 14, 2021
2 parents 56f37ff + 897c208 commit 12efcce
Showing 3 changed files with 123 additions and 44 deletions.
@@ -19,6 +19,7 @@ package clusterapi
import (
"context"
"fmt"
"k8s.io/client-go/tools/cache"
"path"
"sort"
"strings"
@@ -425,42 +426,79 @@ func TestNodeGroupDecreaseTargetSize(t *testing.T) {
t.Fatalf("unexpected error: %v", err)
}

scalableResource.Spec.Replicas += tc.targetSizeIncrement
ch := make(chan error)
checkDone := func(obj interface{}) (bool, error) {
u, ok := obj.(*unstructured.Unstructured)
if !ok {
return false, nil
}
if u.GetResourceVersion() != scalableResource.GetResourceVersion() {
return false, nil
}
ng, err := newNodeGroupFromScalableResource(controller, u)
if err != nil {
return true, fmt.Errorf("unexpected error: %v", err)
}
if ng == nil {
return false, nil
}
currReplicas, err := ng.TargetSize()
if err != nil {
return true, fmt.Errorf("unexpected error: %v", err)
}

_, err = ng.machineController.managementScaleClient.Scales(ng.scalableResource.Namespace()).
Update(context.TODO(), gvr.GroupResource(), scalableResource, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if currReplicas != int(tc.initial)+int(tc.targetSizeIncrement) {
return true, fmt.Errorf("expected %v, got %v", tc.initial+tc.targetSizeIncrement, currReplicas)
}

// A nodegroup is immutable; get a fresh copy after adding targetSizeIncrement.
nodegroups, err = controller.nodeGroups()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
ng = nodegroups[0]
if err := ng.DecreaseTargetSize(tc.delta); (err != nil) != tc.expectedError {
return true, fmt.Errorf("expected error: %v, got: %v", tc.expectedError, err)
}

currReplicas, err := ng.TargetSize()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
scalableResource, err := controller.managementScaleClient.Scales(testConfig.spec.namespace).
Get(context.TODO(), gvr.GroupResource(), ng.scalableResource.Name(), metav1.GetOptions{})
if err != nil {
return true, fmt.Errorf("unexpected error: %v", err)
}

if currReplicas != int(tc.initial)+int(tc.targetSizeIncrement) {
t.Errorf("initially expected %v, got %v", tc.initial, currReplicas)
if scalableResource.Spec.Replicas != tc.expected {
return true, fmt.Errorf("expected %v, got %v", tc.expected, scalableResource.Spec.Replicas)
}
return true, nil
}

if err := ng.DecreaseTargetSize(tc.delta); (err != nil) != tc.expectedError {
t.Fatalf("expected error: %v, got: %v", tc.expectedError, err)
handler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
match, err := checkDone(obj)
if match {
ch <- err
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
match, err := checkDone(newObj)
if match {
ch <- err
}
},
}
controller.machineSetInformer.Informer().AddEventHandler(handler)
controller.machineDeploymentInformer.Informer().AddEventHandler(handler)

scalableResource, err = controller.managementScaleClient.Scales(testConfig.spec.namespace).
Get(context.TODO(), gvr.GroupResource(), ng.scalableResource.Name(), metav1.GetOptions{})
scalableResource.Spec.Replicas += tc.targetSizeIncrement

_, err = ng.machineController.managementScaleClient.Scales(ng.scalableResource.Namespace()).
Update(context.TODO(), gvr.GroupResource(), scalableResource, metav1.UpdateOptions{})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if scalableResource.Spec.Replicas != tc.expected {
t.Errorf("expected %v, got %v", tc.expected, scalableResource.Spec.Replicas)
lastErr := fmt.Errorf("no updates received yet")
for lastErr != nil {
select {
case err = <-ch:
lastErr = err
case <-time.After(1 * time.Second):
t.Fatal(fmt.Errorf("timeout while waiting for update. Last error was: %v", lastErr))
}
}
}
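
The rewritten test no longer asserts on the scale client right after the update; it registers an event handler on the MachineSet and MachineDeployment informers and waits on a channel until checkDone reports that the cached object has reached the expected state, failing after a one-second timeout. A condensed, standalone sketch of that wait pattern follows; the expected replica count and the synthetic update event are hypothetical stand-ins for the test's fixtures.

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/client-go/tools/cache"
)

func main() {
	// Hypothetical expected value, standing in for the test case's tc.expected.
	wantReplicas := int64(3)

	ch := make(chan error)

	// checkDone inspects an object delivered by an informer and reports
	// (matched, err); matched == false means "not the event we are waiting for".
	checkDone := func(obj interface{}) (bool, error) {
		u, ok := obj.(*unstructured.Unstructured)
		if !ok {
			return false, nil
		}
		replicas, found, err := unstructured.NestedInt64(u.UnstructuredContent(), "spec", "replicas")
		if err != nil || !found {
			return false, nil
		}
		if replicas != wantReplicas {
			return true, fmt.Errorf("expected %d replicas, got %d", wantReplicas, replicas)
		}
		return true, nil
	}

	// Same handler shape the test registers on the informers.
	handler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			if match, err := checkDone(obj); match {
				ch <- err
			}
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			if match, err := checkDone(newObj); match {
				ch <- err
			}
		},
	}

	// In the real test the handler receives events from informers; here we
	// feed it one synthetic update so the sketch runs on its own.
	go handler.OnUpdate(nil, &unstructured.Unstructured{Object: map[string]interface{}{
		"spec": map[string]interface{}{"replicas": int64(3)},
	}})

	// Block until a matching event arrives without error, or give up.
	lastErr := fmt.Errorf("no updates received yet")
	for lastErr != nil {
		select {
		case lastErr = <-ch:
		case <-time.After(1 * time.Second):
			fmt.Println("timeout while waiting for update, last error:", lastErr)
			return
		}
	}
	fmt.Println("observed the expected replica count")
}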

@@ -80,19 +80,14 @@ func (r unstructuredScalableResource) ProviderIDs() ([]string, error) {
}

func (r unstructuredScalableResource) Replicas() (int, error) {
gvr, err := r.GroupVersionResource()
replicas, found, err := unstructured.NestedInt64(r.unstructured.UnstructuredContent(), "spec", "replicas")
if err != nil {
return 0, err
return 0, errors.Wrap(err, "error getting replica count")
}

s, err := r.controller.managementScaleClient.Scales(r.Namespace()).Get(context.TODO(), gvr.GroupResource(), r.Name(), metav1.GetOptions{})
if err != nil {
return 0, err
if !found {
replicas = 0
}
if s == nil {
return 0, fmt.Errorf("unknown %s %s/%s", r.Kind(), r.Namespace(), r.Name())
}
return int(s.Spec.Replicas), nil
return int(replicas), nil
}
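
With this hunk, Replicas() stops issuing a Scales(...).Get(...) request against the management cluster for every size lookup and instead reads spec.replicas from the unstructured copy it already holds, which is presumably the API-call churn the fix-rate-limitting branch name refers to. A minimal standalone sketch of that lookup, on a hypothetical object:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

func main() {
	// Hypothetical MachineDeployment-like object; only the field that matters
	// for the replica lookup is populated.
	u := &unstructured.Unstructured{Object: map[string]interface{}{
		"kind": "MachineDeployment",
		"spec": map[string]interface{}{"replicas": int64(2)},
	}}

	// NestedInt64 walks the field path and reports whether it was present.
	replicas, found, err := unstructured.NestedInt64(u.UnstructuredContent(), "spec", "replicas")
	if err != nil {
		panic(err)
	}
	if !found {
		// Mirrors the patched Replicas(): a missing field counts as zero.
		replicas = 0
	}
	fmt.Println(int(replicas)) // prints 2
}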

func (r unstructuredScalableResource) SetSize(nreplicas int) error {
@@ -119,6 +114,11 @@ func (r unstructuredScalableResource) SetSize(nreplicas int) error {

s.Spec.Replicas = int32(nreplicas)
_, updateErr := r.controller.managementScaleClient.Scales(r.Namespace()).Update(context.TODO(), gvr.GroupResource(), s, metav1.UpdateOptions{})

if updateErr == nil {
updateErr = unstructured.SetNestedField(r.unstructured.UnstructuredContent(), int64(nreplicas), "spec", "replicas")
}

return updateErr
}
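
Because reads now come from the in-memory copy, SetSize() also writes the new count back into that copy once the scale update succeeds, so a later Replicas() call does not return a stale value. A small standalone sketch of that bookkeeping step, again on a hypothetical object:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)

func main() {
	// Hypothetical cached object whose spec.replicas should track the value
	// just sent to the scale subresource.
	u := &unstructured.Unstructured{Object: map[string]interface{}{
		"spec": map[string]interface{}{"replicas": int64(1)},
	}}

	nreplicas := 4 // the count that was just written via the scale client

	// SetNestedField stores JSON-compatible values only, so the int is
	// converted to int64 before being written back into the copy.
	if err := unstructured.SetNestedField(u.UnstructuredContent(), int64(nreplicas), "spec", "replicas"); err != nil {
		panic(err)
	}

	replicas, _, _ := unstructured.NestedInt64(u.UnstructuredContent(), "spec", "replicas")
	fmt.Println(replicas) // prints 4
}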

@@ -18,9 +18,12 @@ package clusterapi

import (
"context"
"testing"

"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/tools/cache"
"testing"
"time"
)

func TestSetSize(t *testing.T) {
@@ -125,19 +128,57 @@ func TestReplicas(t *testing.T) {

s.Spec.Replicas = int32(updatedReplicas)

_, err = sr.controller.managementScaleClient.Scales(testResource.GetNamespace()).
Update(context.TODO(), gvr.GroupResource(), s, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
ch := make(chan error)
checkDone := func(obj interface{}) (bool, error) {
u, ok := obj.(*unstructured.Unstructured)
if !ok {
return false, nil
}
sr, err := newUnstructuredScalableResource(controller, u)
if err != nil {
return true, err
}
i, err := sr.Replicas()
if err != nil {
return true, err
}
if i != updatedReplicas {
return true, fmt.Errorf("expected %v, got: %v", updatedReplicas, i)
}
return true, nil
}
handler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
match, err := checkDone(obj)
if match {
ch <- err
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
match, err := checkDone(newObj)
if match {
ch <- err
}
},
}

i, err = sr.Replicas()
controller.machineSetInformer.Informer().AddEventHandler(handler)
controller.machineDeploymentInformer.Informer().AddEventHandler(handler)

_, err = sr.controller.managementScaleClient.Scales(testResource.GetNamespace()).
Update(context.TODO(), gvr.GroupResource(), s, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
}

if i != updatedReplicas {
t.Errorf("expected %v, got: %v", updatedReplicas, i)
lastErr := fmt.Errorf("no updates received yet")
for lastErr != nil {
select {
case err = <-ch:
lastErr = err
case <-time.After(1 * time.Second):
t.Fatal(fmt.Errorf("timeout while waiting for update. Last error was: %v", lastErr))
}
}
}
