Refactor scaleMachineSet
This drops the presumably unnecessary split between scaleMachineSet and scaleMachineSetOperation and covers scaleMachineSet with unit tests.
enxebre committed Apr 21, 2021
1 parent 46dc43a commit 69c00b8
Showing 2 changed files with 181 additions and 40 deletions.
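
At a glance, the refactor collapses the previous two-step call chain into a single entry point; a minimal sketch of the resulting call site (names taken from the diff below):

	// scaleMachineSet now decides whether work is needed, mutates replicas
	// and annotations, and patches the MachineSet in one method.
	if err := r.scaleMachineSet(ctx, ms, newScale, deployment); err != nil {
		return err
	}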
63 changes: 23 additions & 40 deletions controllers/machinedeployment_sync.go
@@ -293,14 +293,11 @@ func (r *MachineDeploymentReconciler) scale(ctx context.Context, deployment *clu
 	// drives what happens in case we are trying to scale machine sets of the same size.
 	// In such a case when scaling up, we should scale up newer machine sets first, and
 	// when scaling down, we should scale down older machine sets first.
-	var scalingOperation string
 	switch {
 	case deploymentReplicasToAdd > 0:
 		sort.Sort(mdutil.MachineSetsBySizeNewer(allMSs))
-		scalingOperation = "up"
 	case deploymentReplicasToAdd < 0:
 		sort.Sort(mdutil.MachineSetsBySizeOlder(allMSs))
-		scalingOperation = "down"
 	}
 
 	// Iterate over all active machine sets and estimate proportions for each of them.
@@ -340,7 +337,7 @@ func (r *MachineDeploymentReconciler) scale(ctx context.Context, deployment *clu
 	}
 
 	// TODO: Use transactions when we have them.
-	if err := r.scaleMachineSetOperation(ctx, ms, nameToSize[ms.Name], deployment, scalingOperation); err != nil {
+	if err := r.scaleMachineSet(ctx, ms, nameToSize[ms.Name], deployment); err != nil {
 		// Return as soon as we fail, the deployment is requeued
 		return err
 	}
@@ -406,55 +403,41 @@ func calculateStatus(allMSs []*clusterv1.MachineSet, newMS *clusterv1.MachineSet
 
 func (r *MachineDeploymentReconciler) scaleMachineSet(ctx context.Context, ms *clusterv1.MachineSet, newScale int32, deployment *clusterv1.MachineDeployment) error {
 	if ms.Spec.Replicas == nil {
-		return errors.Errorf("spec replicas for machine set %v is nil, this is unexpected", ms.Name)
+		return errors.Errorf("spec replicas for MachineSet %s is nil, this is unexpected", client.ObjectKeyFromObject(ms).String())
 	}
 
-	// No need to scale
-	if *(ms.Spec.Replicas) == newScale {
-		return nil
-	}
-
-	var scalingOperation string
-	if *(ms.Spec.Replicas) < newScale {
-		scalingOperation = "up"
-	} else {
-		scalingOperation = "down"
-	}
-
-	return r.scaleMachineSetOperation(ctx, ms, newScale, deployment, scalingOperation)
-}
-
-func (r *MachineDeploymentReconciler) scaleMachineSetOperation(ctx context.Context, ms *clusterv1.MachineSet, newScale int32, deployment *clusterv1.MachineDeployment, scaleOperation string) error {
-	if ms.Spec.Replicas == nil {
-		return errors.Errorf("spec replicas for machine set %v is nil, this is unexpected", ms.Name)
-	}
-
-	sizeNeedsUpdate := *(ms.Spec.Replicas) != newScale
-
 	annotationsNeedUpdate := mdutil.ReplicasAnnotationsNeedUpdate(
 		ms,
 		*(deployment.Spec.Replicas),
 		*(deployment.Spec.Replicas)+mdutil.MaxSurge(*deployment),
 	)
 
-	if sizeNeedsUpdate || annotationsNeedUpdate {
-		patchHelper, err := patch.NewHelper(ms, r.Client)
-		if err != nil {
-			return err
-		}
-
-		*(ms.Spec.Replicas) = newScale
-		mdutil.SetReplicasAnnotations(ms, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+mdutil.MaxSurge(*deployment))
-
-		err = patchHelper.Patch(ctx, ms)
-		if err != nil {
-			r.recorder.Eventf(deployment, corev1.EventTypeWarning, "FailedScale", "Failed to scale MachineSet %q: %v", ms.Name, err)
-		} else if sizeNeedsUpdate {
-			r.recorder.Eventf(deployment, corev1.EventTypeNormal, "SuccessfulScale", "Scaled %s MachineSet %q to %d", scaleOperation, ms.Name, newScale)
-		}
-	}
+	// No need to scale nor setting annotations.
+	if *(ms.Spec.Replicas) == newScale && !annotationsNeedUpdate {
+		return nil
+	}
+
+	patchHelper, err := patch.NewHelper(ms, r.Client)
+	if err != nil {
+		return err
+	}
+
+	// Save original replicas to log in event.
+	originalReplicas := *(ms.Spec.Replicas)
+
+	// Mutate.
+	ms.Spec.Replicas = &newScale
+	mdutil.SetReplicasAnnotations(ms, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+mdutil.MaxSurge(*deployment))
+
+	if err = patchHelper.Patch(ctx, ms); err != nil {
+		r.recorder.Eventf(deployment, corev1.EventTypeWarning, "FailedScale", "Failed to scale MachineSet %s: %v",
+			client.ObjectKeyFromObject(ms).String(), err)
+		return err
+	}
+
+	r.recorder.Eventf(deployment, corev1.EventTypeNormal, "SuccessfulScale", "Scaled MachineSet %s: %d -> %d",
+		client.ObjectKeyFromObject(ms).String(), originalReplicas, *ms.Spec.Replicas)
 
 	return nil
 }
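
One user-visible effect of the consolidation is the event wording. Assuming the usual namespace/name rendering of client.ObjectKeyFromObject and the plain "type reason message" formatting of the recorder, scaling MachineSet foo/bar from 0 to 2 replicas would now surface roughly as:

	Normal SuccessfulScale Scaled MachineSet foo/bar: 0 -> 2

whereas the dropped scaleMachineSetOperation would have emitted: Scaled up MachineSet "bar" to 2.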

158 changes: 158 additions & 0 deletions controllers/machinedeployment_sync_test.go
@@ -17,13 +17,21 @@ limitations under the License.
 package controllers
 
 import (
+	"context"
+	"fmt"
 	"testing"
 
 	. "github.com/onsi/gomega"
+	"github.com/pkg/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes/scheme"
+	"k8s.io/client-go/tools/record"
 	"k8s.io/utils/pointer"
 	clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha4"
+	"sigs.k8s.io/cluster-api/controllers/mdutil"
 	capierrors "sigs.k8s.io/cluster-api/errors"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+	"sigs.k8s.io/controller-runtime/pkg/client/fake"
 )
 
 func TestCalculateStatus(t *testing.T) {
@@ -219,3 +227,153 @@ func TestCalculateStatus(t *testing.T) {
 		})
 	}
 }

+func TestScaleMachineSet(t *testing.T) {
+	testCases := []struct {
+		name              string
+		machineDeployment *clusterv1.MachineDeployment
+		machineSet        *clusterv1.MachineSet
+		newScale          int32
+		error             error
+	}{
+		{
+			name: "It fails when new machineSet has no replicas",
+			machineDeployment: &clusterv1.MachineDeployment{
+				Spec: clusterv1.MachineDeploymentSpec{
+					Replicas: pointer.Int32Ptr(2),
+				},
+			},
+			machineSet: &clusterv1.MachineSet{
+				ObjectMeta: metav1.ObjectMeta{
+					Namespace: "foo",
+					Name:      "bar",
+				},
+			},
+			error: errors.Errorf("spec replicas for MachineSet foo/bar is nil, this is unexpected"),
+		},
+		{
+			name: "Scale up",
+			machineDeployment: &clusterv1.MachineDeployment{
+				ObjectMeta: metav1.ObjectMeta{
+					Namespace: "foo",
+					Name:      "bar",
+				},
+				Spec: clusterv1.MachineDeploymentSpec{
+					Strategy: &clusterv1.MachineDeploymentStrategy{
+						Type: clusterv1.RollingUpdateMachineDeploymentStrategyType,
+						RollingUpdate: &clusterv1.MachineRollingUpdateDeployment{
+							MaxUnavailable: intOrStrPtr(0),
+							MaxSurge:       intOrStrPtr(2),
+						},
+					},
+					Replicas: pointer.Int32Ptr(2),
+				},
+			},
+			machineSet: &clusterv1.MachineSet{
+				ObjectMeta: metav1.ObjectMeta{
+					Namespace: "foo",
+					Name:      "bar",
+				},
+				Spec: clusterv1.MachineSetSpec{
+					Replicas: pointer.Int32Ptr(0),
+				},
+			},
+			newScale: 2,
+		},
+		{
+			name: "Scale down",
+			machineDeployment: &clusterv1.MachineDeployment{
+				ObjectMeta: metav1.ObjectMeta{
+					Namespace: "foo",
+					Name:      "bar",
+				},
+				Spec: clusterv1.MachineDeploymentSpec{
+					Strategy: &clusterv1.MachineDeploymentStrategy{
+						Type: clusterv1.RollingUpdateMachineDeploymentStrategyType,
+						RollingUpdate: &clusterv1.MachineRollingUpdateDeployment{
+							MaxUnavailable: intOrStrPtr(0),
+							MaxSurge:       intOrStrPtr(2),
+						},
+					},
+					Replicas: pointer.Int32Ptr(2),
+				},
+			},
+			machineSet: &clusterv1.MachineSet{
+				ObjectMeta: metav1.ObjectMeta{
+					Namespace: "foo",
+					Name:      "bar",
+				},
+				Spec: clusterv1.MachineSetSpec{
+					Replicas: pointer.Int32Ptr(4),
+				},
+			},
+			newScale: 2,
+		},
+		{
+			name: "Same replicas does not scale",
+			machineDeployment: &clusterv1.MachineDeployment{
+				ObjectMeta: metav1.ObjectMeta{
+					Namespace: "foo",
+					Name:      "bar",
+				},
+				Spec: clusterv1.MachineDeploymentSpec{
+					Strategy: &clusterv1.MachineDeploymentStrategy{
+						Type: clusterv1.RollingUpdateMachineDeploymentStrategyType,
+						RollingUpdate: &clusterv1.MachineRollingUpdateDeployment{
+							MaxUnavailable: intOrStrPtr(0),
+							MaxSurge:       intOrStrPtr(2),
+						},
+					},
+					Replicas: pointer.Int32Ptr(2),
+				},
+			},
+			machineSet: &clusterv1.MachineSet{
+				ObjectMeta: metav1.ObjectMeta{
+					Namespace: "foo",
+					Name:      "bar",
+				},
+				Spec: clusterv1.MachineSetSpec{
+					Replicas: pointer.Int32Ptr(2),
+				},
+			},
+			newScale: 2,
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			g := NewWithT(t)
+
+			g.Expect(clusterv1.AddToScheme(scheme.Scheme)).To(Succeed())
+
+			resources := []client.Object{
+				tc.machineDeployment,
+				tc.machineSet,
+			}
+
+			r := &MachineDeploymentReconciler{
+				Client:   fake.NewClientBuilder().WithObjects(resources...).Build(),
+				recorder: record.NewFakeRecorder(32),
+			}
+
+			err := r.scaleMachineSet(context.Background(), tc.machineSet, tc.newScale, tc.machineDeployment)
+			if tc.error != nil {
+				g.Expect(err.Error()).To(BeEquivalentTo(tc.error.Error()))
+				return
+			}
+
+			g.Expect(err).ToNot(HaveOccurred())
+
+			freshMachineSet := &clusterv1.MachineSet{}
+			err = r.Client.Get(ctx, client.ObjectKeyFromObject(tc.machineSet), freshMachineSet)
+			g.Expect(err).ToNot(HaveOccurred())
+
+			g.Expect(*freshMachineSet.Spec.Replicas).To(BeEquivalentTo(tc.newScale))
+
+			expectedMachineSetAnnotations := map[string]string{
+				clusterv1.DesiredReplicasAnnotation: fmt.Sprintf("%d", *tc.machineDeployment.Spec.Replicas),
+				clusterv1.MaxReplicasAnnotation:     fmt.Sprintf("%d", (*tc.machineDeployment.Spec.Replicas)+mdutil.MaxSurge(*tc.machineDeployment)),
+			}
+			g.Expect(freshMachineSet.GetAnnotations()).To(BeEquivalentTo(expectedMachineSetAnnotations))
+		})
+	}
+}
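
To exercise only the new coverage locally, the standard Go tooling invocation from the repository root should work:

	go test ./controllers -run TestScaleMachineSet -v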
