Skip to content

Commit

Permalink
Moves machineset patching to defer call
Browse files Browse the repository at this point in the history
This aligns the machineset controller with the other controllers,
which defer patching of the object to the end of their reconcile loop.

Signed-off-by: Sagar Muchhal <[email protected]>
Co-authored-by: Vince Prignano <[email protected]>
  • Loading branch information
srm09 and vincepri committed Dec 4, 2020
1 parent d87a39c commit 2cd9823
Showing 1 changed file with 46 additions and 66 deletions.
112 changes: 46 additions & 66 deletions controllers/machineset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (r *MachineSetReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Ma
return nil
}

func (r *MachineSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
func (r *MachineSetReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Result, reterr error) {
log := ctrl.LoggerFrom(ctx)

machineSet := &clusterv1.MachineSet{}
Expand All @@ -135,6 +135,19 @@ func (r *MachineSetReconciler) Reconcile(ctx context.Context, req ctrl.Request)
return ctrl.Result{}, nil
}

// Initialize the patch helper
patchHelper, err := patch.NewHelper(machineSet, r.Client)
if err != nil {
return ctrl.Result{}, err
}

defer func() {
// Always attempt to patch the object and status after each reconciliation.
if err := patchHelper.Patch(ctx, machineSet); err != nil {
reterr = kerrors.NewAggregate([]error{reterr, err})
}
}()

// Ignore deleted MachineSets; this can happen when foregroundDeletion
// is enabled.
if !machineSet.DeletionTimestamp.IsZero() {
Expand All @@ -160,17 +173,12 @@ func (r *MachineSetReconciler) reconcile(ctx context.Context, cluster *clusterv1
machineSet.Labels[clusterv1.ClusterLabelName] = machineSet.Spec.ClusterName

if r.shouldAdopt(machineSet) {
patch := client.MergeFrom(machineSet.DeepCopy())
machineSet.OwnerReferences = util.EnsureOwnerRef(machineSet.OwnerReferences, metav1.OwnerReference{
APIVersion: clusterv1.GroupVersion.String(),
Kind: "Cluster",
Name: cluster.Name,
UID: cluster.UID,
})
// Patch using a deep copy to avoid overwriting any unexpected Status changes from the returned result
if err := r.Client.Patch(ctx, machineSet.DeepCopy(), patch); err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to add OwnerReference to MachineSet %s/%s", machineSet.Namespace, machineSet.Name)
}
}

// Make sure to reconcile the external infrastructure reference.
Expand Down Expand Up @@ -250,28 +258,18 @@ func (r *MachineSetReconciler) reconcile(ctx context.Context, cluster *clusterv1

syncErr := r.syncReplicas(ctx, machineSet, filteredMachines)

ms := machineSet.DeepCopy()
newStatus, err := r.calculateStatus(ctx, cluster, ms, filteredMachines)
if err != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to calculate MachineSet's Status")
}

// Always updates status as machines come up or die.
updatedMS, err := r.patchMachineSetStatus(ctx, machineSet, newStatus)
if err != nil {
if syncErr != nil {
return ctrl.Result{}, errors.Wrapf(err, "failed to sync machines: %v. failed to patch MachineSet's Status", syncErr)
}
return ctrl.Result{}, errors.Wrap(err, "failed to patch MachineSet's Status")
if err := r.updateStatus(ctx, cluster, machineSet, filteredMachines); err != nil {
return ctrl.Result{}, errors.Wrapf(kerrors.NewAggregate([]error{err, syncErr}), "failed to update MachineSet's Status")
}

if syncErr != nil {
return ctrl.Result{}, errors.Wrapf(syncErr, "failed to sync MachineSet replicas")
}

var replicas int32
if updatedMS.Spec.Replicas != nil {
replicas = *updatedMS.Spec.Replicas
if machineSet.Spec.Replicas != nil {
replicas = *machineSet.Spec.Replicas
}

// Resync the MachineSet after MinReadySeconds as a last line of defense to guard against clock-skew.
Expand All @@ -281,15 +279,15 @@ func (r *MachineSetReconciler) reconcile(ctx context.Context, cluster *clusterv1
// exceeds MinReadySeconds could be incorrect.
// To avoid an available replica stuck in the ready state, we force a reconcile after MinReadySeconds,
// at which point it should confirm any available replica to be available.
if updatedMS.Spec.MinReadySeconds > 0 &&
updatedMS.Status.ReadyReplicas == replicas &&
updatedMS.Status.AvailableReplicas != replicas {
if machineSet.Spec.MinReadySeconds > 0 &&
machineSet.Status.ReadyReplicas == replicas &&
machineSet.Status.AvailableReplicas != replicas {

return ctrl.Result{RequeueAfter: time.Duration(updatedMS.Spec.MinReadySeconds) * time.Second}, nil
return ctrl.Result{RequeueAfter: time.Duration(machineSet.Spec.MinReadySeconds) * time.Second}, nil
}

// Quickly rereconcile until the nodes become Ready.
if updatedMS.Status.ReadyReplicas != replicas {
// Quickly reconcile until the nodes become Ready.
if machineSet.Status.ReadyReplicas != replicas {
log.V(4).Info("Some nodes are not ready yet, requeuing until they are ready")
return ctrl.Result{RequeueAfter: 15 * time.Second}, nil
}
Expand Down Expand Up @@ -585,15 +583,17 @@ func (r *MachineSetReconciler) shouldAdopt(ms *clusterv1.MachineSet) bool {
return !util.HasOwner(ms.OwnerReferences, clusterv1.GroupVersion.String(), []string{"MachineDeployment", "Cluster"})
}

func (r *MachineSetReconciler) calculateStatus(ctx context.Context, cluster *clusterv1.Cluster, ms *clusterv1.MachineSet, filteredMachines []*clusterv1.Machine) (*clusterv1.MachineSetStatus, error) {
// updateStatus updates the Status field for the MachineSet.
// It checks the current state of the replicas and updates the Status of the MachineSet accordingly.
func (r *MachineSetReconciler) updateStatus(ctx context.Context, cluster *clusterv1.Cluster, ms *clusterv1.MachineSet, filteredMachines []*clusterv1.Machine) error {
log := ctrl.LoggerFrom(ctx)
newStatus := ms.Status.DeepCopy()

// Copy label selector to its status counterpart in string format.
// This is necessary for CRDs including scale subresources.
selector, err := metav1.LabelSelectorAsSelector(&ms.Spec.Selector)
if err != nil {
return nil, errors.Wrapf(err, "failed to calculate status for MachineSet %s/%s", ms.Namespace, ms.Name)
return errors.Wrapf(err, "failed to update status for MachineSet %s/%s", ms.Namespace, ms.Name)
}
newStatus.Selector = selector.String()

Expand Down Expand Up @@ -635,47 +635,27 @@ func (r *MachineSetReconciler) calculateStatus(ctx context.Context, cluster *clu
newStatus.FullyLabeledReplicas = int32(fullyLabeledReplicasCount)
newStatus.ReadyReplicas = int32(readyReplicasCount)
newStatus.AvailableReplicas = int32(availableReplicasCount)
return newStatus, nil
}

// patchMachineSetStatus attempts to update the Status.Replicas of the given MachineSet.
func (r *MachineSetReconciler) patchMachineSetStatus(ctx context.Context, ms *clusterv1.MachineSet, newStatus *clusterv1.MachineSetStatus) (*clusterv1.MachineSet, error) {
log := ctrl.LoggerFrom(ctx)

// This is the steady state. It happens when the MachineSet doesn't have any expectations, since
// we do a periodic relist every 10 minutes. If the generations differ but the replicas are
// the same, a caller might've resized to the same replica count.
if ms.Status.Replicas == newStatus.Replicas &&
ms.Status.FullyLabeledReplicas == newStatus.FullyLabeledReplicas &&
ms.Status.ReadyReplicas == newStatus.ReadyReplicas &&
ms.Status.AvailableReplicas == newStatus.AvailableReplicas &&
ms.Generation == ms.Status.ObservedGeneration {
return ms, nil
// Copy the newly calculated status into the machineset
if ms.Status.Replicas != newStatus.Replicas ||
ms.Status.FullyLabeledReplicas != newStatus.FullyLabeledReplicas ||
ms.Status.ReadyReplicas != newStatus.ReadyReplicas ||
ms.Status.AvailableReplicas != newStatus.AvailableReplicas ||
ms.Generation != ms.Status.ObservedGeneration {
// Save the generation number we acted on, otherwise we might wrongfully indicate
// that we've seen a spec update when we retry.
newStatus.ObservedGeneration = ms.Generation
newStatus.DeepCopyInto(&ms.Status)

log.V(4).Info(fmt.Sprintf("Updating status for %v: %s/%s, ", ms.Kind, ms.Namespace, ms.Name) +
fmt.Sprintf("replicas %d->%d (need %d), ", ms.Status.Replicas, newStatus.Replicas, *ms.Spec.Replicas) +
fmt.Sprintf("fullyLabeledReplicas %d->%d, ", ms.Status.FullyLabeledReplicas, newStatus.FullyLabeledReplicas) +
fmt.Sprintf("readyReplicas %d->%d, ", ms.Status.ReadyReplicas, newStatus.ReadyReplicas) +
fmt.Sprintf("availableReplicas %d->%d, ", ms.Status.AvailableReplicas, newStatus.AvailableReplicas) +
fmt.Sprintf("sequence No: %v->%v", ms.Status.ObservedGeneration, newStatus.ObservedGeneration))
}

patch := client.MergeFrom(ms.DeepCopyObject())

// Save the generation number we acted on, otherwise we might wrongfully indicate
// that we've seen a spec update when we retry.
newStatus.ObservedGeneration = ms.Generation

// Calculate the replicas for logging.
var replicas int32
if ms.Spec.Replicas != nil {
replicas = *ms.Spec.Replicas
}
log.V(4).Info(fmt.Sprintf("Updating status for %v: %s/%s, ", ms.Kind, ms.Namespace, ms.Name) +
fmt.Sprintf("replicas %d->%d (need %d), ", ms.Status.Replicas, newStatus.Replicas, replicas) +
fmt.Sprintf("fullyLabeledReplicas %d->%d, ", ms.Status.FullyLabeledReplicas, newStatus.FullyLabeledReplicas) +
fmt.Sprintf("readyReplicas %d->%d, ", ms.Status.ReadyReplicas, newStatus.ReadyReplicas) +
fmt.Sprintf("availableReplicas %d->%d, ", ms.Status.AvailableReplicas, newStatus.AvailableReplicas) +
fmt.Sprintf("sequence No: %v->%v", ms.Status.ObservedGeneration, newStatus.ObservedGeneration))

newStatus.DeepCopyInto(&ms.Status)
if err := r.Client.Status().Patch(ctx, ms, patch); err != nil {
return nil, err
}
return ms, nil
return nil
}

func (r *MachineSetReconciler) getMachineNode(ctx context.Context, cluster *clusterv1.Cluster, machine *clusterv1.Machine) (*corev1.Node, error) {
Expand Down

0 comments on commit 2cd9823

Please sign in to comment.