Skip to content

Commit

Permalink
Add unit test coverage for machineDeployments. reconcileOldMachineSets
Browse files Browse the repository at this point in the history
  • Loading branch information
enxebre committed Apr 21, 2021
1 parent a0c20fe commit 7040d62
Show file tree
Hide file tree
Showing 3 changed files with 221 additions and 32 deletions.
2 changes: 1 addition & 1 deletion controllers/machine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package controllers
import (
"context"
"fmt"
"sigs.k8s.io/cluster-api/util/collections"
"time"

"github.com/pkg/errors"
Expand All @@ -40,6 +39,7 @@ import (
kubedrain "sigs.k8s.io/cluster-api/third_party/kubernetes-drain"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/collections"
"sigs.k8s.io/cluster-api/util/conditions"
"sigs.k8s.io/cluster-api/util/patch"
"sigs.k8s.io/cluster-api/util/predicates"
Expand Down
66 changes: 35 additions & 31 deletions controllers/machinedeployment_rolling.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

// rolloutRolling implements the logic for rolling a new machine set.
// rolloutRolling implements the logic for rolling a new MachineSet.
func (r *MachineDeploymentReconciler) rolloutRolling(ctx context.Context, d *clusterv1.MachineDeployment, msList []*clusterv1.MachineSet) error {
newMS, oldMSs, err := r.getAllMachineSetsAndSyncRevision(ctx, d, msList, true)
if err != nil {
Expand Down Expand Up @@ -101,13 +101,13 @@ func (r *MachineDeploymentReconciler) reconcileOldMachineSets(ctx context.Contex
log := ctrl.LoggerFrom(ctx)

if deployment.Spec.Replicas == nil {
return errors.Errorf("spec replicas for MachineDeployment %q/%q is nil, this is unexpected",
deployment.Namespace, deployment.Name)
return errors.Errorf("spec replicas for MachineDeployment %s is nil, this is unexpected",
client.ObjectKeyFromObject(deployment).String())
}

if newMS.Spec.Replicas == nil {
return errors.Errorf("spec replicas for MachineSet %q/%q is nil, this is unexpected",
newMS.Namespace, newMS.Name)
return errors.Errorf("spec replicas for MachineSet %s is nil, this is unexpected",
client.ObjectKeyFromObject(newMS).String())
}

oldMachinesCount := mdutil.GetReplicaCountForMachineSets(oldMSs)
Expand All @@ -117,18 +117,18 @@ func (r *MachineDeploymentReconciler) reconcileOldMachineSets(ctx context.Contex
}

allMachinesCount := mdutil.GetReplicaCountForMachineSets(allMSs)
log.V(4).Info("New machine set has available machines",
"machineset", newMS.Name, "count", newMS.Status.AvailableReplicas)
log.V(4).Info("New MachineSet has available machines",
"machineset", client.ObjectKeyFromObject(newMS).String(), "available-replicas", newMS.Status.AvailableReplicas)
maxUnavailable := mdutil.MaxUnavailable(*deployment)

// Check if we can scale down. We can scale down in the following 2 cases:
// * Some old machine sets have unhealthy replicas, we could safely scale down those unhealthy replicas since that won't further
// * Some old MachineSets have unhealthy replicas, we could safely scale down those unhealthy replicas since that won't further
// increase unavailability.
// * New machine set has scaled up and it's replicas becomes ready, then we can scale down old machine sets in a further step.
// * New MachineSet has scaled up and it's replicas becomes ready, then we can scale down old MachineSets in a further step.
//
// maxScaledDown := allMachinesCount - minAvailable - newMachineSetMachinesUnavailable
// take into account not only maxUnavailable and any surge machines that have been created, but also unavailable machines from
// the newMS, so that the unavailable machines from the newMS would not make us scale down old machine sets in a further
// the newMS, so that the unavailable machines from the newMS would not make us scale down old MachineSets in a further
// step(that will increase unavailability).
//
// Concrete example:
Expand All @@ -139,18 +139,18 @@ func (r *MachineDeploymentReconciler) reconcileOldMachineSets(ctx context.Contex
//
// case 1:
// * Deployment is updated, newMS is created with 3 replicas, oldMS is scaled down to 8, and newMS is scaled up to 5.
// * The new machine set machines crashloop and never become available.
// * The new MachineSet machines crashloop and never become available.
// * allMachinesCount is 13. minAvailable is 8. newMSMachinesUnavailable is 5.
// * A node fails and causes one of the oldMS machines to become unavailable. However, 13 - 8 - 5 = 0, so the oldMS won't be scaled down.
// * The user notices the crashloop and does kubectl rollout undo to rollback.
// * newMSMachinesUnavailable is 1, since we rolled back to the good machine set, so maxScaledDown = 13 - 8 - 1 = 4. 4 of the crashlooping machines will be scaled down.
// * newMSMachinesUnavailable is 1, since we rolled back to the good MachineSet, so maxScaledDown = 13 - 8 - 1 = 4. 4 of the crashlooping machines will be scaled down.
// * The total number of machines will then be 9 and the newMS can be scaled up to 10.
//
// case 2:
// Same example, but pushing a new machine template instead of rolling back (aka "roll over"):
// * The new machine set created must start with 0 replicas because allMachinesCount is already at 13.
// * However, newMSMachinesUnavailable would also be 0, so the 2 old machine sets could be scaled down by 5 (13 - 8 - 0), which would then
// allow the new machine set to be scaled up by 5.
// * The new MachineSet created must start with 0 replicas because allMachinesCount is already at 13.
// * However, newMSMachinesUnavailable would also be 0, so the 2 old MachineSets could be scaled down by 5 (13 - 8 - 0), which would then
// allow the new MachineSet to be scaled up by 5.
minAvailable := *(deployment.Spec.Replicas) - maxUnavailable
newMSUnavailableMachineCount := *(newMS.Spec.Replicas) - newMS.Status.AvailableReplicas
maxScaledDown := allMachinesCount - minAvailable - newMSUnavailableMachineCount
Expand All @@ -167,32 +167,33 @@ func (r *MachineDeploymentReconciler) reconcileOldMachineSets(ctx context.Contex

log.V(4).Info("Cleaned up unhealthy replicas from old MachineSets", "count", cleanupCount)

// Scale down old machine sets, need check maxUnavailable to ensure we can scale down
// Scale down old machineSets, need check maxUnavailable to ensure we can scale down
allMSs = oldMSs
allMSs = append(allMSs, newMS)
scaledDownCount, err := r.scaleDownOldMachineSetsForRollingUpdate(ctx, allMSs, oldMSs, deployment)
if err != nil {
return err
}

log.V(4).Info("Scaled down old MachineSets of deployment", "count", scaledDownCount)
log.V(4).Info("Scaled down old MachineSets of machineDeployment", "count", scaledDownCount)
return nil
}

// cleanupUnhealthyReplicas will scale down old machine sets with unhealthy replicas, so that all unhealthy replicas will be deleted.
// cleanupUnhealthyReplicas will scale down old MachineSets with unhealthy replicas, so that all unhealthy replicas will be deleted.
func (r *MachineDeploymentReconciler) cleanupUnhealthyReplicas(ctx context.Context, oldMSs []*clusterv1.MachineSet, deployment *clusterv1.MachineDeployment, maxCleanupCount int32) ([]*clusterv1.MachineSet, int32, error) {
log := ctrl.LoggerFrom(ctx)

sort.Sort(mdutil.MachineSetsByCreationTimestamp(oldMSs))

// Safely scale down all old machine sets with unhealthy replicas. Replica set will sort the machines in the order
// such that not-ready < ready, unscheduled < scheduled, and pending < running. This ensures that unhealthy replicas will
// been deleted first and won't increase unavailability.
// Scale down all old MachineSets with any unhealthy replicas. MachineSet will honour Spec.DeletePolicy
// for deleting Machines. Machines with a deletion timestamp, with a failure message or without a nodeRef
// are preferred for all strategies.
// This results in a best effort to remove machines backing unhealthy nodes.
totalScaledDown := int32(0)

for _, targetMS := range oldMSs {
if targetMS.Spec.Replicas == nil {
return nil, 0, errors.Errorf("spec replicas for machine set %v is nil, this is unexpected", targetMS.Name)
return nil, 0, errors.Errorf("spec replicas for MachineSet %v is nil, this is unexpected", client.ObjectKeyFromObject(targetMS).String())
}

if totalScaledDown >= maxCleanupCount {
Expand All @@ -201,12 +202,13 @@ func (r *MachineDeploymentReconciler) cleanupUnhealthyReplicas(ctx context.Conte

oldMSReplicas := *(targetMS.Spec.Replicas)
if oldMSReplicas == 0 {
// cannot scale down this machine set.
// cannot scale down this MachineSet.
continue
}

oldMSAvailableReplicas := targetMS.Status.AvailableReplicas
log.V(4).Info("Found available machines in old MS", "count", oldMSAvailableReplicas, "target-machineset", targetMS.Name)
log.V(4).Info("Found available Machines in old MachineSet",
"count", oldMSAvailableReplicas, "target-machineset", client.ObjectKeyFromObject(targetMS).String())
if oldMSReplicas == oldMSAvailableReplicas {
// no unhealthy replicas found, no scaling required.
continue
Expand All @@ -218,7 +220,8 @@ func (r *MachineDeploymentReconciler) cleanupUnhealthyReplicas(ctx context.Conte
newReplicasCount := oldMSReplicas - scaledDownCount

if newReplicasCount > oldMSReplicas {
return nil, 0, errors.Errorf("when cleaning up unhealthy replicas, got invalid request to scale down %s/%s %d -> %d", targetMS.Namespace, targetMS.Name, oldMSReplicas, newReplicasCount)
return nil, 0, errors.Errorf("when cleaning up unhealthy replicas, got invalid request to scale down %s %d -> %d",
client.ObjectKeyFromObject(targetMS).String(), oldMSReplicas, newReplicasCount)
}

if err := r.scaleMachineSet(ctx, targetMS, newReplicasCount, deployment); err != nil {
Expand All @@ -231,22 +234,22 @@ func (r *MachineDeploymentReconciler) cleanupUnhealthyReplicas(ctx context.Conte
return oldMSs, totalScaledDown, nil
}

// scaleDownOldMachineSetsForRollingUpdate scales down old machine sets when deployment strategy is "RollingUpdate".
// scaleDownOldMachineSetsForRollingUpdate scales down old MachineSets when deployment strategy is "RollingUpdate".
// Need check maxUnavailable to ensure availability.
func (r *MachineDeploymentReconciler) scaleDownOldMachineSetsForRollingUpdate(ctx context.Context, allMSs []*clusterv1.MachineSet, oldMSs []*clusterv1.MachineSet, deployment *clusterv1.MachineDeployment) (int32, error) {
log := ctrl.LoggerFrom(ctx)

if deployment.Spec.Replicas == nil {
return 0, errors.Errorf("spec replicas for deployment %v is nil, this is unexpected", deployment.Name)
return 0, errors.Errorf("spec replicas for MachineDeployment %s is nil, this is unexpected", client.ObjectKeyFromObject(deployment).String())
}

maxUnavailable := mdutil.MaxUnavailable(*deployment)

// Check if we can scale down.
minAvailable := *(deployment.Spec.Replicas) - maxUnavailable

// Find the number of available machines.
availableMachineCount := mdutil.GetAvailableReplicaCountForMachineSets(allMSs)

// Check if we can scale down.
if availableMachineCount <= minAvailable {
// Cannot scale down.
return 0, nil
Expand All @@ -260,7 +263,7 @@ func (r *MachineDeploymentReconciler) scaleDownOldMachineSetsForRollingUpdate(ct
totalScaleDownCount := availableMachineCount - minAvailable
for _, targetMS := range oldMSs {
if targetMS.Spec.Replicas == nil {
return 0, errors.Errorf("spec replicas for machine set %v is nil, this is unexpected", targetMS.Name)
return 0, errors.Errorf("spec replicas for MachineSet %s is nil, this is unexpected", client.ObjectKeyFromObject(targetMS).String())
}

if totalScaledDown >= totalScaleDownCount {
Expand All @@ -277,7 +280,8 @@ func (r *MachineDeploymentReconciler) scaleDownOldMachineSetsForRollingUpdate(ct
scaleDownCount := integer.Int32Min(*(targetMS.Spec.Replicas), totalScaleDownCount-totalScaledDown)
newReplicasCount := *(targetMS.Spec.Replicas) - scaleDownCount
if newReplicasCount > *(targetMS.Spec.Replicas) {
return totalScaledDown, errors.Errorf("when scaling down old MS, got invalid request to scale down %s/%s %d -> %d", targetMS.Namespace, targetMS.Name, *(targetMS.Spec.Replicas), newReplicasCount)
return totalScaledDown, errors.Errorf("when scaling down old MachineSet, got invalid request to scale down %s %d -> %d",
client.ObjectKeyFromObject(targetMS).String(), *(targetMS.Spec.Replicas), newReplicasCount)
}

if err := r.scaleMachineSet(ctx, targetMS, newReplicasCount, deployment); err != nil {
Expand Down
185 changes: 185 additions & 0 deletions controllers/machinedeployment_rolling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,3 +219,188 @@ func TestReconcileNewMachineSet(t *testing.T) {
})
}
}

func TestReconcileOldMachineSets(t *testing.T) {
testCases := []struct {
name string
machineDeployment *clusterv1.MachineDeployment
newMachineSet *clusterv1.MachineSet
oldMachineSets []*clusterv1.MachineSet
expectedOldMachineSetsReplicas int
error error
}{
{
name: "It fails when machineDeployment has no replicas",
machineDeployment: &clusterv1.MachineDeployment{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo",
Name: "bar",
},
},
newMachineSet: &clusterv1.MachineSet{
Spec: clusterv1.MachineSetSpec{
Replicas: pointer.Int32Ptr(2),
},
},
error: errors.Errorf("spec replicas for MachineDeployment foo/bar is nil, this is unexpected"),
},
{
name: "It fails when new machineSet has no replicas",
machineDeployment: &clusterv1.MachineDeployment{
Spec: clusterv1.MachineDeploymentSpec{
Replicas: pointer.Int32Ptr(2),
},
},
newMachineSet: &clusterv1.MachineSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo",
Name: "bar",
},
},
error: errors.Errorf("spec replicas for MachineSet foo/bar is nil, this is unexpected"),
},
{
name: "RollingUpdate strategy: Scale down old MachineSets when all new replicas are available",
machineDeployment: &clusterv1.MachineDeployment{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo",
Name: "bar",
},
Spec: clusterv1.MachineDeploymentSpec{
Strategy: &clusterv1.MachineDeploymentStrategy{
Type: clusterv1.RollingUpdateMachineDeploymentStrategyType,
RollingUpdate: &clusterv1.MachineRollingUpdateDeployment{
MaxUnavailable: intOrStrPtr(1),
MaxSurge: intOrStrPtr(3),
},
},
Replicas: pointer.Int32Ptr(2),
},
},
newMachineSet: &clusterv1.MachineSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo",
Name: "bar",
},
Spec: clusterv1.MachineSetSpec{
Replicas: pointer.Int32Ptr(0),
},
Status: clusterv1.MachineSetStatus{
AvailableReplicas: 2,
},
},
oldMachineSets: []*clusterv1.MachineSet{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo",
Name: "2replicas",
},
Spec: clusterv1.MachineSetSpec{
Replicas: pointer.Int32Ptr(2),
},
Status: clusterv1.MachineSetStatus{
AvailableReplicas: 2,
},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo",
Name: "1replicas",
},
Spec: clusterv1.MachineSetSpec{
Replicas: pointer.Int32Ptr(1),
},
Status: clusterv1.MachineSetStatus{
AvailableReplicas: 1,
},
},
},
expectedOldMachineSetsReplicas: 0,
},
{
name: "RollingUpdate strategy: It does not scale down old MachineSets when above maxUnavailable",
machineDeployment: &clusterv1.MachineDeployment{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo",
Name: "bar",
},
Spec: clusterv1.MachineDeploymentSpec{
Strategy: &clusterv1.MachineDeploymentStrategy{
Type: clusterv1.RollingUpdateMachineDeploymentStrategyType,
RollingUpdate: &clusterv1.MachineRollingUpdateDeployment{
MaxUnavailable: intOrStrPtr(2),
MaxSurge: intOrStrPtr(3),
},
},
Replicas: pointer.Int32Ptr(10),
},
},
newMachineSet: &clusterv1.MachineSet{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo",
Name: "bar",
},
Spec: clusterv1.MachineSetSpec{
Replicas: pointer.Int32Ptr(5),
},
Status: clusterv1.MachineSetStatus{
Replicas: 5,
ReadyReplicas: 0,
AvailableReplicas: 0,
},
},
oldMachineSets: []*clusterv1.MachineSet{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "foo",
Name: "8replicas",
},
Spec: clusterv1.MachineSetSpec{
Replicas: pointer.Int32Ptr(8),
},
Status: clusterv1.MachineSetStatus{
Replicas: 10,
ReadyReplicas: 8,
AvailableReplicas: 8,
},
},
},
expectedOldMachineSetsReplicas: 8,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
g := NewWithT(t)

g.Expect(clusterv1.AddToScheme(scheme.Scheme)).To(Succeed())

resources := []client.Object{
tc.machineDeployment,
}

allMachineSets := append(tc.oldMachineSets, tc.newMachineSet)
for key := range allMachineSets {
resources = append(resources, allMachineSets[key])
}

r := &MachineDeploymentReconciler{
Client: fake.NewClientBuilder().WithObjects(resources...).Build(),
recorder: record.NewFakeRecorder(32),
}

err := r.reconcileOldMachineSets(ctx, allMachineSets, tc.oldMachineSets, tc.newMachineSet, tc.machineDeployment)
if tc.error != nil {
g.Expect(err.Error()).To(BeEquivalentTo(tc.error.Error()))
return
}

g.Expect(err).ToNot(HaveOccurred())
for key := range tc.oldMachineSets {
freshOldMachineSet := &clusterv1.MachineSet{}
err = r.Client.Get(ctx, client.ObjectKeyFromObject(tc.oldMachineSets[key]), freshOldMachineSet)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(*freshOldMachineSet.Spec.Replicas).To(BeEquivalentTo(tc.expectedOldMachineSetsReplicas))
}
})
}
}

0 comments on commit 7040d62

Please sign in to comment.