Skip to content

Commit

Permalink
Merge pull request #4346 from relyt0925/ondelete-rollout-strategy
Browse files Browse the repository at this point in the history
✨  OnDelete rollout strategy
  • Loading branch information
k8s-ci-robot authored Apr 14, 2021
2 parents d270d95 + 2469f74 commit 3bc596e
Show file tree
Hide file tree
Showing 9 changed files with 250 additions and 6 deletions.
5 changes: 5 additions & 0 deletions api/v1alpha4/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ const (
// on the reconciled object.
PausedAnnotation = "cluster.x-k8s.io/paused"

// DisableMachineCreate is an annotation that can be used to signal a MachineSet to stop creating new machines.
// It is utilized in the OnDelete MachineDeploymentStrategy to allow the MachineDeployment controller to scale down
// older MachineSets when Machines are deleted and add the new replicas to the latest MachineSet.
DisableMachineCreate = "cluster.x-k8s.io/disable-machine-create"

// WatchLabel is a label that can be applied to any Cluster API object.
//
// Controllers which allow for selective reconciliation may check this label and proceed
Expand Down
7 changes: 5 additions & 2 deletions api/v1alpha4/machinedeployment_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ const (
// i.e. gradually scale down the old MachineSet and scale up the new one.
RollingUpdateMachineDeploymentStrategyType MachineDeploymentStrategyType = "RollingUpdate"

// OnDeleteMachineDeploymentStrategyType replaces old MachineSets when the deletion of the associated machines is completed.
OnDeleteMachineDeploymentStrategyType MachineDeploymentStrategyType = "OnDelete"

// RevisionAnnotation is the revision annotation of a machine deployment's machine sets which records its rollout sequence.
RevisionAnnotation = "machinedeployment.clusters.x-k8s.io/revision"
// RevisionHistoryAnnotation maintains the history of all old revisions that a machine set has served for a machine deployment.
Expand Down Expand Up @@ -101,9 +104,9 @@ type MachineDeploymentSpec struct {
// MachineDeploymentStrategy describes how to replace existing machines
// with new ones.
type MachineDeploymentStrategy struct {
// Type of deployment. Currently the only supported strategy is
// "RollingUpdate".
// Type of deployment.
// Default is RollingUpdate.
// +kubebuilder:validation:Enum=RollingUpdate;OnDelete
// +optional
Type MachineDeploymentStrategyType `json:"type,omitempty"`

Expand Down
5 changes: 4 additions & 1 deletion config/crd/bases/cluster.x-k8s.io_machinedeployments.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,10 @@ spec:
x-kubernetes-int-or-string: true
type: object
type:
description: Type of deployment. Currently the only supported strategy is "RollingUpdate". Default is RollingUpdate.
description: Type of deployment. Default is RollingUpdate.
enum:
- RollingUpdate
- OnDelete
type: string
type: object
template:
Expand Down
4 changes: 4 additions & 0 deletions controllers/machinedeployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,10 @@ func (r *MachineDeploymentReconciler) reconcile(ctx context.Context, cluster *cl
return ctrl.Result{}, r.rolloutRolling(ctx, d, msList)
}

if d.Spec.Strategy.Type == clusterv1.OnDeleteMachineDeploymentStrategyType {
return ctrl.Result{}, r.rolloutOnDelete(ctx, d, msList)
}

return ctrl.Result{}, errors.Errorf("unexpected deployment strategy type: %s", d.Spec.Strategy.Type)
}

Expand Down
180 changes: 180 additions & 0 deletions controllers/machinedeployment_rollout_ondelete.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
"context"
"fmt"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha4"
"sigs.k8s.io/cluster-api/controllers/mdutil"
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
)

// rolloutOnDelete runs one rollout pass for a MachineDeployment that uses the
// OnDelete MachineDeploymentStrategyType.
//
// The pass reconciles the new MachineSet, syncs the deployment status, reconciles
// the old MachineSets, syncs the status again and, once the deployment is reported
// complete, cleans up the old MachineSets. The first error encountered is returned.
func (r *MachineDeploymentReconciler) rolloutOnDelete(ctx context.Context, d *clusterv1.MachineDeployment, msList []*clusterv1.MachineSet) error {
	newMS, oldMSs, err := r.getAllMachineSetsAndSyncRevision(ctx, d, msList, true)
	switch {
	case err != nil:
		return err
	case newMS == nil:
		// newMS can be nil when a MachineSet is already associated with this deployment
		// and the only changes are to annotations or MinReadySeconds, i.e. there are
		// changes but no existing machine needs to be replaced.
		return nil
	}

	allMSs := append(oldMSs, newMS)

	// Give the new MachineSet the chance to scale up first.
	if err = r.reconcileNewMachineSetOnDelete(ctx, allMSs, newMS, d); err != nil {
		return err
	}
	if err = r.syncDeploymentStatus(allMSs, newMS, d); err != nil {
		return err
	}

	// Then let the old MachineSets scale down.
	if err = r.reconcileOldMachineSetsOnDelete(ctx, oldMSs, allMSs, d); err != nil {
		return err
	}
	if err = r.syncDeploymentStatus(allMSs, newMS, d); err != nil {
		return err
	}

	if !mdutil.DeploymentComplete(d, &d.Status) {
		return nil
	}
	return r.cleanupDeployment(ctx, oldMSs, d)
}

// reconcileOldMachineSetsOnDelete reconciles the old MachineSets of a MachineDeployment that uses
// the OnDelete MachineDeploymentStrategyType. It makes two passes over oldMSs:
//
//  1. Every old MachineSet that still has replicas gets the clusterv1.DisableMachineCreate
//     annotation (if it is not already present) and is scaled to the number of Machines matching
//     its selector that are not being deleted.
//  2. Whatever is left of the surplus (replica count of allMSs minus the MachineDeployment's
//     desired replicas, less the amount already accounted for in the first pass) is removed from
//     the old MachineSets in slice order.
//
// An error is returned when the MachineDeployment has nil spec replicas, when the computed replica
// count of a MachineSet is negative, or when patching, listing Machines or scaling fails. A
// MachineSet whose label selector cannot be converted to a map is logged and skipped.
func (r *MachineDeploymentReconciler) reconcileOldMachineSetsOnDelete(ctx context.Context, oldMSs []*clusterv1.MachineSet, allMSs []*clusterv1.MachineSet, deployment *clusterv1.MachineDeployment) error {
	log := ctrl.LoggerFrom(ctx)
	if deployment.Spec.Replicas == nil {
		return errors.Errorf("spec replicas for MachineDeployment %q/%q is nil, this is unexpected",
			deployment.Namespace, deployment.Name)
	}
	log.V(4).Info("Checking to see if machines have been deleted or are in the process of deleting for old machine sets")
	totalReplicas := mdutil.GetReplicaCountForMachineSets(allMSs)
	// Number of replicas that exceed what the MachineDeployment asks for.
	scaleDownAmount := totalReplicas - *deployment.Spec.Replicas

	// First pass: follow Machine deletions and stop old MachineSets from re-creating Machines.
	for _, oldMS := range oldMSs {
		if oldMS.Spec.Replicas == nil || *oldMS.Spec.Replicas <= 0 {
			log.V(4).Info("fully scaled down", "MachineSet", oldMS.Name)
			continue
		}
		if oldMS.Annotations == nil {
			oldMS.Annotations = map[string]string{}
		}
		if _, ok := oldMS.Annotations[clusterv1.DisableMachineCreate]; !ok {
			log.V(4).Info("setting annotation on old MachineSet to disable machine creation", "MachineSet", oldMS.Name)
			// The helper must be created before the mutation so the patch contains the new annotation.
			patchHelper, err := patch.NewHelper(oldMS, r.Client)
			if err != nil {
				return err
			}
			oldMS.Annotations[clusterv1.DisableMachineCreate] = "true"
			if err := patchHelper.Patch(ctx, oldMS); err != nil {
				return err
			}
		}
		selectorMap, err := metav1.LabelSelectorAsMap(&oldMS.Spec.Selector)
		if err != nil {
			// Structured loggers do not interpolate format verbs; pass the name as a key/value pair.
			log.V(4).Error(err, "failed to convert MachineSet label selector to a map", "MachineSet", oldMS.Name)
			continue
		}
		log.V(4).Info("Fetching Machines associated with MachineSet", "MachineSet", oldMS.Name)
		// Get all Machines linked to this MachineSet.
		allMachinesInOldMS := &clusterv1.MachineList{}
		if err := r.Client.List(ctx,
			allMachinesInOldMS,
			client.InNamespace(oldMS.Namespace),
			client.MatchingLabels(selectorMap),
		); err != nil {
			return errors.Wrap(err, "failed to list machines")
		}
		totalMachineCount := int32(len(allMachinesInOldMS.Items))
		log.V(4).Info("Retrieved machines", "totalMachineCount", totalMachineCount)
		// Machines that are being deleted no longer count towards the MachineSet's replicas.
		updatedReplicaCount := totalMachineCount - mdutil.GetDeletingMachineCount(allMachinesInOldMS)
		if updatedReplicaCount < 0 {
			return errors.Errorf("negative updated replica count %d for MachineSet %q, this is unexpected", updatedReplicaCount, oldMS.Name)
		}
		machineSetScaleDownAmountDueToMachineDeletion := *oldMS.Spec.Replicas - updatedReplicaCount
		if machineSetScaleDownAmountDueToMachineDeletion < 0 {
			log.V(4).Error(errors.Errorf("unexpected negative scale down amount: %d", machineSetScaleDownAmountDueToMachineDeletion), fmt.Sprintf("Error reconciling MachineSet %s", oldMS.Name))
		}
		scaleDownAmount -= machineSetScaleDownAmountDueToMachineDeletion
		log.V(4).Info("Adjusting replica count for deleted machines", "MachineSet", oldMS.Name, "oldReplicas", *oldMS.Spec.Replicas, "newReplicas", updatedReplicaCount)
		log.V(4).Info("Scaling down", "MachineSet", oldMS.Name, "replicas", updatedReplicaCount)
		if err := r.scaleMachineSet(ctx, oldMS, updatedReplicaCount, deployment); err != nil {
			return err
		}
	}

	// Second pass: remove any surplus that is still left from the old MachineSets.
	log.V(4).Info("Finished reconcile of Old MachineSets to account for deleted machines. Now analyzing if there's more potential to scale down")
	for _, oldMS := range oldMSs {
		if scaleDownAmount <= 0 {
			break
		}
		if oldMS.Spec.Replicas == nil || *oldMS.Spec.Replicas <= 0 {
			log.V(4).Info("Fully scaled down", "MachineSet", oldMS.Name)
			continue
		}
		updatedReplicaCount := *oldMS.Spec.Replicas
		if updatedReplicaCount >= scaleDownAmount {
			// This MachineSet can absorb the whole remaining surplus.
			updatedReplicaCount -= scaleDownAmount
			scaleDownAmount = 0
		} else {
			// Drain this MachineSet completely and carry the rest over to the next one.
			scaleDownAmount -= updatedReplicaCount
			updatedReplicaCount = 0
		}
		log.V(4).Info("Scaling down", "MachineSet", oldMS.Name, "replicas", updatedReplicaCount)
		if err := r.scaleMachineSet(ctx, oldMS, updatedReplicaCount, deployment); err != nil {
			return err
		}
	}
	log.V(4).Info("Finished reconcile of all old MachineSets")
	return nil
}

// reconcileNewMachineSetOnDelete reconciles the latest MachineSet of a MachineDeployment that
// uses the OnDelete MachineDeploymentStrategyType.
//
// If newMS carries the clusterv1.DisableMachineCreate annotation, the annotation is removed and
// the change is patched; afterwards the call is delegated to reconcileNewMachineSet, the same
// routine the RollingUpdate strategy uses.
func (r *MachineDeploymentReconciler) reconcileNewMachineSetOnDelete(ctx context.Context, allMSs []*clusterv1.MachineSet, newMS *clusterv1.MachineSet, deployment *clusterv1.MachineDeployment) error {
	log := ctrl.LoggerFrom(ctx)

	// Looking up a key in a nil map is safe, so no separate nil check is required.
	if _, disabled := newMS.Annotations[clusterv1.DisableMachineCreate]; disabled {
		log.V(4).Info("removing annotation on latest MachineSet to enable machine creation", "MachineSet", newMS.Name)
		patchHelper, err := patch.NewHelper(newMS, r.Client)
		if err != nil {
			return err
		}
		delete(newMS.Annotations, clusterv1.DisableMachineCreate)
		if err := patchHelper.Patch(ctx, newMS); err != nil {
			return err
		}
	}

	return r.reconcileNewMachineSet(ctx, allMSs, newMS, deployment)
}
8 changes: 6 additions & 2 deletions controllers/machineset_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,13 +313,17 @@ func (r *MachineSetReconciler) syncReplicas(ctx context.Context, ms *clusterv1.M
if ms.Spec.Replicas == nil {
return errors.Errorf("the Replicas field in Spec for machineset %v is nil, this should not be allowed", ms.Name)
}

diff := len(machines) - int(*(ms.Spec.Replicas))
switch {
case diff < 0:
diff *= -1
log.Info("Too few replicas", "need", *(ms.Spec.Replicas), "creating", diff)

if ms.Annotations != nil {
if _, ok := ms.Annotations[clusterv1.DisableMachineCreate]; ok {
log.V(2).Info("Automatic creation of new machines disabled for machine set")
return nil
}
}
var (
machineList []*clusterv1.Machine
errs []error
Expand Down
26 changes: 25 additions & 1 deletion controllers/mdutil/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,8 @@ func DeploymentComplete(deployment *clusterv1.MachineDeployment, newStatus *clus
// NewMSNewReplicas calculates the number of replicas a deployment's new MS should have.
// When one of the following is true, we're rolling out the deployment; otherwise, we're scaling it.
// 1) The new MS is saturated: newMS's replicas == deployment's replicas
// 2) Max number of machines allowed is reached: deployment's replicas + maxSurge == all MSs' replicas.
// 2) For RollingUpdateStrategy: Max number of machines allowed is reached: deployment's replicas + maxSurge == all MSs' replicas.
// 3) For OnDeleteStrategy: Max number of machines allowed is reached: deployment's replicas == all MSs' replicas.
func NewMSNewReplicas(deployment *clusterv1.MachineDeployment, allMSs []*clusterv1.MachineSet, newMS *clusterv1.MachineSet) (int32, error) {
switch deployment.Spec.Strategy.Type {
case clusterv1.RollingUpdateMachineDeploymentStrategyType:
Expand All @@ -548,6 +549,17 @@ func NewMSNewReplicas(deployment *clusterv1.MachineDeployment, allMSs []*cluster
// Do not exceed the number of desired replicas.
scaleUpCount = integer.Int32Min(scaleUpCount, *(deployment.Spec.Replicas)-*(newMS.Spec.Replicas))
return *(newMS.Spec.Replicas) + scaleUpCount, nil
case clusterv1.OnDeleteMachineDeploymentStrategyType:
// Find the total number of machines
currentMachineCount := TotalMachineSetsReplicaSum(allMSs)
if currentMachineCount >= *(deployment.Spec.Replicas) {
// Cannot scale up as more replicas exist than desired number of replicas in the MachineDeployment.
return *(newMS.Spec.Replicas), nil
}
// Scale up the latest MachineSet so the total amount of replicas across all MachineSets match
// the desired number of replicas in the MachineDeployment
scaleUpCount := *(deployment.Spec.Replicas) - currentMachineCount
return *(newMS.Spec.Replicas) + scaleUpCount, nil
default:
return 0, fmt.Errorf("deployment strategy %v isn't supported", deployment.Spec.Strategy.Type)
}
Expand Down Expand Up @@ -697,3 +709,15 @@ func ComputeHash(template *clusterv1.MachineTemplateSpec) uint32 {
DeepHashObject(machineTemplateSpecHasher, *template)
return machineTemplateSpecHasher.Sum32()
}

// GetDeletingMachineCount returns how many machines in machineList have a non-zero
// deletion timestamp, i.e. are in the process of being deleted.
func GetDeletingMachineCount(machineList *clusterv1.MachineList) int32 {
	var deleting int32
	for i := range machineList.Items {
		if machineList.Items[i].GetDeletionTimestamp().IsZero() {
			continue
		}
		deleting++
	}
	return deleting
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,27 @@ repeat
elseif (New MachineSets Replicas available) then (yes)
#LightBlue:Scale MachineSet down;
endif
elseif (OnDelete Deployment Strategy) then (yes)
:Select newest MachineSet;
if (Too Many replicas) then (yes)
#LightBlue:Scale machineSet down;
elseif (Not Enough Replicas)
#LightBlue:Create new replicas;
endif
repeat
if (Old MachineSet Has Desired Replicas) then (yes)
if (Old MachineSet Has Actual Replicas Deleting) then (yes)
:Scale down DesiredReplicas to ActualReplicas - DeletingReplicas;
endif
endif
repeat while (More Old MachineSets need Processing) then (yes)
repeat
if (MachineDeployment Desired Replicas < Desired Replicas of All MachineSets) then (yes)
if (Old MachineSet Has Desired Replicas) then (yes)
:Remove replica;
endif
endif
repeat while (MachineDeployment Desired Replicas != Desired Replicas of All MachineSets) then (yes)
else (no)
#Pink:Unknown strategy;
endif
Expand Down
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit 3bc596e

Please sign in to comment.