Skip to content

Commit

Permalink
Cleanup MachineDeployment controller (kubernetes-sigs#762)
Browse files Browse the repository at this point in the history
Signed-off-by: Vince Prignano <[email protected]>
  • Loading branch information
vincepri authored and k8s-ci-robot committed Feb 25, 2019
1 parent 3770c4b commit 07f69ff
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 50 deletions.
80 changes: 46 additions & 34 deletions pkg/controller/machinedeployment/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,18 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"
)

// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = v1alpha1.SchemeGroupVersion.WithKind("MachineDeployment")
var (
// controllerKind contains the schema.GroupVersionKind for this controller type.
controllerKind = v1alpha1.SchemeGroupVersion.WithKind("MachineDeployment")
)

// ReconcileMachineDeployment reconciles a MachineDeployment object
// ReconcileMachineDeployment reconciles a MachineDeployment object.
type ReconcileMachineDeployment struct {
client.Client
scheme *runtime.Scheme
}

// newReconciler returns a new reconcile.Reconciler
// newReconciler returns a new reconcile.Reconciler.
func newReconciler(mgr manager.Manager) *ReconcileMachineDeployment {
return &ReconcileMachineDeployment{Client: mgr.GetClient(), scheme: mgr.GetScheme()}
}
Expand All @@ -57,47 +59,49 @@ func Add(mgr manager.Manager) error {
return add(mgr, newReconciler(mgr), r.MachineSetToDeployments)
}

// add adds a new Controller to mgr with r as the reconcile.Reconciler
// add adds a new Controller to mgr with r as the reconcile.Reconciler.
func add(mgr manager.Manager, r reconcile.Reconciler, mapFn handler.ToRequestsFunc) error {
// Create a new controller
// Create a new controller.
c, err := controller.New("machinedeployment-controller", mgr, controller.Options{Reconciler: r})
if err != nil {
return err
}

// Watch for changes to MachineDeployment
err = c.Watch(&source.Kind{Type: &v1alpha1.MachineDeployment{}}, &handler.EnqueueRequestForObject{})
// Watch for changes to MachineDeployment.
err = c.Watch(&source.Kind{
Type: &v1alpha1.MachineDeployment{}},
&handler.EnqueueRequestForObject{},
)
if err != nil {
return err
}

// Watch for changes to MachineSet and reconcile the owner MachineDeployment
err = c.Watch(&source.Kind{Type: &v1alpha1.MachineSet{}},
&handler.EnqueueRequestForOwner{OwnerType: &v1alpha1.MachineDeployment{}, IsController: true})
// Watch for changes to MachineSet and reconcile the owner MachineDeployment.
err = c.Watch(
&source.Kind{Type: &v1alpha1.MachineSet{}},
&handler.EnqueueRequestForOwner{OwnerType: &v1alpha1.MachineDeployment{}, IsController: true},
)
if err != nil {
return err
}

// Map MachineSet changes to MachineDeployment
// Map MachineSet changes to MachineDeployment.
err = c.Watch(
&source.Kind{Type: &v1alpha1.MachineSet{}},
&handler.EnqueueRequestsFromMapFunc{ToRequests: mapFn})
&handler.EnqueueRequestsFromMapFunc{ToRequests: mapFn},
)
if err != nil {
return err
}

return nil
}

var _ reconcile.Reconciler = &ReconcileMachineDeployment{}

func (r *ReconcileMachineDeployment) getMachineSetsForDeployment(d *v1alpha1.MachineDeployment) ([]*v1alpha1.MachineSet, error) {
// List all MachineSets to find those we own but that no longer match our
// selector.
// List all MachineSets to find those we own but that no longer match our selector.
machineSets := &v1alpha1.MachineSetList{}
listOptions := &client.ListOptions{
Namespace: d.Namespace,
}
listOptions := &client.ListOptions{Namespace: d.Namespace}

if err := r.Client.List(context.Background(), listOptions, machineSets); err != nil {
return nil, err
}
Expand All @@ -111,22 +115,27 @@ func (r *ReconcileMachineDeployment) getMachineSetsForDeployment(d *v1alpha1.Mac
klog.V(4).Infof("%s not controlled by %v", ms.Name, d.Name)
continue
}

selector, err := metav1.LabelSelectorAsSelector(&d.Spec.Selector)
if err != nil {
klog.Errorf("Skipping machineset %v, failed to get label selector from spec selector.", ms.Name)
continue
}

// If a deployment with a nil or empty selector creeps in, it should match nothing, not everything.
if selector.Empty() {
klog.Warningf("Skipping machineset %v as the selector is empty.", ms.Name)
continue
}

if !selector.Matches(labels.Set(ms.Labels)) {
klog.V(4).Infof("Skipping machineset %v, label mismatch.", ms.Name)
continue
}

filteredMS = append(filteredMS, ms)
}

return filteredMS, nil
}

Expand All @@ -136,8 +145,7 @@ func (r *ReconcileMachineDeployment) getMachineSetsForDeployment(d *v1alpha1.Mac
func (r *ReconcileMachineDeployment) Reconcile(request reconcile.Request) (reconcile.Result, error) {
// Fetch the MachineDeployment instance
d := &v1alpha1.MachineDeployment{}
err := r.Get(context.TODO(), request.NamespacedName, d)
if err != nil {
if err := r.Get(context.TODO(), request.NamespacedName, d); err != nil {
if apierrors.IsNotFound(err) {
// Object not found, return. Created objects are automatically garbage collected.
// For additional cleanup logic use finalizers.
Expand Down Expand Up @@ -202,16 +210,14 @@ func (r *ReconcileMachineDeployment) Reconcile(request reconcile.Request) (recon
// match a MachineSet.
func (r *ReconcileMachineDeployment) getMachineDeploymentsForMachineSet(ms *v1alpha1.MachineSet) []*v1alpha1.MachineDeployment {
if len(ms.Labels) == 0 {
klog.Warningf("no machine deployments found for MachineSet %v because it has no labels", ms.Name)
klog.Warningf("No machine deployments found for MachineSet %v because it has no labels", ms.Name)
return nil
}

dList := &v1alpha1.MachineDeploymentList{}
listOptions := &client.ListOptions{
Namespace: ms.Namespace,
}
listOptions := &client.ListOptions{Namespace: ms.Namespace}
if err := r.Client.List(context.Background(), listOptions, dList); err != nil {
klog.Warningf("failed to list machine deployments, %v", err)
klog.Warningf("Failed to list machine deployments: %v", err)
return nil
}

Expand All @@ -221,10 +227,12 @@ func (r *ReconcileMachineDeployment) getMachineDeploymentsForMachineSet(ms *v1al
if err != nil {
continue
}

// If a deployment with a nil or empty selector creeps in, it should match nothing, not everything.
if selector.Empty() || !selector.Matches(labels.Set(ms.Labels)) {
continue
}

deployments = append(deployments, &dList.Items[idx])
}

Expand All @@ -244,40 +252,44 @@ func (r *ReconcileMachineDeployment) getMachineMapForDeployment(d *v1alpha1.Mach
if err != nil {
return nil, err
}

machines := &v1alpha1.MachineList{}
listOptions := &client.ListOptions{
Namespace: d.Namespace,
}
listOptions := &client.ListOptions{Namespace: d.Namespace}
if err = r.Client.List(context.Background(), listOptions.MatchingLabels(selector), machines); err != nil {
return nil, err
}

// Group Machines by their controller (if it's in msList).
machineMap := make(map[types.UID]*v1alpha1.MachineList, len(msList))
for _, ms := range msList {
machineMap[ms.UID] = &v1alpha1.MachineList{}
}

for idx := range machines.Items {
machine := &machines.Items[idx]

// Do not ignore inactive Machines because Recreate Deployments need to verify that no
// Machines from older versions are running before spinning up new Machines.
controllerRef := metav1.GetControllerOf(machine)
if controllerRef == nil {
continue
}

// Only append if we care about this UID.
if machineList, ok := machineMap[controllerRef.UID]; ok {
machineList.Items = append(machineList.Items, *machine)
}
}

return machineMap, nil
}

func (r *ReconcileMachineDeployment) MachineSetToDeployments(o handler.MapObject) []reconcile.Request {
result := []reconcile.Request{}

ms := &v1alpha1.MachineSet{}
key := client.ObjectKey{Namespace: o.Meta.GetNamespace(), Name: o.Meta.GetName()}
err := r.Client.Get(context.Background(), key, ms)
if err != nil {
if err := r.Client.Get(context.Background(), key, ms); err != nil {
klog.Errorf("Unable to retrieve Machineset %v from store: %v", key, err)
return nil
}
Expand All @@ -295,8 +307,8 @@ func (r *ReconcileMachineDeployment) MachineSetToDeployments(o handler.MapObject
}

for _, md := range mds {
result = append(result, reconcile.Request{
NamespacedName: client.ObjectKey{Namespace: md.Namespace, Name: md.Name}})
name := client.ObjectKey{Namespace: md.Namespace, Name: md.Name}
result = append(result, reconcile.Request{NamespacedName: name})
}

return result
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/machinedeployment/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

var (
_ reconcile.Reconciler = &ReconcileMachineDeployment{}
)

func TestMachineSetToDeployments(t *testing.T) {
machineDeployment := v1alpha1.MachineDeployment{
ObjectMeta: metav1.ObjectMeta{
Expand Down
30 changes: 24 additions & 6 deletions pkg/controller/machinedeployment/rolling.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,21 @@ func (r *ReconcileMachineDeployment) rolloutRolling(d *v1alpha1.MachineDeploymen
if err != nil {
return err
}

// newMS can be nil in case there is already a MachineSet associated with this deployment,
// but there are only either changes in annotations or MinReadySeconds. Or in other words,
// this can be nil if there are changes, but no replacement of existing machines is needed.
if newMS == nil {
return nil
}

allMSs := append(oldMSs, newMS)

// Scale up, if we can.
if err := r.reconcileNewMachineSet(allMSs, newMS, d); err != nil {
return err
}

if err := r.syncDeploymentStatus(allMSs, newMS, d); err != nil {
return err
}
Expand All @@ -53,6 +56,7 @@ func (r *ReconcileMachineDeployment) rolloutRolling(d *v1alpha1.MachineDeploymen
if err := r.reconcileOldMachineSets(allMSs, oldMSs, newMS, d); err != nil {
return err
}

if err := r.syncDeploymentStatus(allMSs, newMS, d); err != nil {
return err
}
Expand All @@ -70,6 +74,7 @@ func (r *ReconcileMachineDeployment) reconcileNewMachineSet(allMSs []*v1alpha1.M
if deployment.Spec.Replicas == nil {
return errors.Errorf("spec replicas for deployment set %v is nil, this is unexpected", deployment.Name)
}

if newMS.Spec.Replicas == nil {
return errors.Errorf("spec replicas for machine set %v is nil, this is unexpected", newMS.Name)
}
Expand All @@ -78,11 +83,13 @@ func (r *ReconcileMachineDeployment) reconcileNewMachineSet(allMSs []*v1alpha1.M
// Scaling not required.
return nil
}

if *(newMS.Spec.Replicas) > *(deployment.Spec.Replicas) {
// Scale down.
_, err := r.scaleMachineSet(newMS, *(deployment.Spec.Replicas), deployment)
return err
}

newReplicasCount, err := dutil.NewMSNewReplicas(deployment, allMSs, newMS)
if err != nil {
return err
Expand All @@ -95,6 +102,7 @@ func (r *ReconcileMachineDeployment) reconcileOldMachineSets(allMSs []*v1alpha1.
if deployment.Spec.Replicas == nil {
return errors.Errorf("spec replicas for deployment set %v is nil, this is unexpected", deployment.Name)
}

if newMS.Spec.Replicas == nil {
return errors.Errorf("spec replicas for machine set %v is nil, this is unexpected", newMS.Name)
}
Expand Down Expand Up @@ -152,22 +160,24 @@ func (r *ReconcileMachineDeployment) reconcileOldMachineSets(allMSs []*v1alpha1.
if err != nil {
return nil
}
klog.V(4).Infof("Cleaned up unhealthy replicas from old MSes by %d", cleanupCount)

klog.V(4).Infof("Cleaned up unhealthy replicas from old MachineSets by %d", cleanupCount)

// Scale down old machine sets, need check maxUnavailable to ensure we can scale down
allMSs = append(oldMSs, newMS)
scaledDownCount, err := r.scaleDownOldMachineSetsForRollingUpdate(allMSs, oldMSs, deployment)
if err != nil {
return err
}
klog.V(4).Infof("Scaled down old MSes of deployment %s by %d", deployment.Name, scaledDownCount)

klog.V(4).Infof("Scaled down old MachineSets of deployment %s by %d", deployment.Name, scaledDownCount)
return nil
}

// cleanupUnhealthyReplicas will scale down old machine sets with unhealthy replicas, so that all unhealthy replicas will be deleted.
func (r *ReconcileMachineDeployment) cleanupUnhealthyReplicas(oldMSs []*v1alpha1.MachineSet, deployment *v1alpha1.MachineDeployment, maxCleanupCount int32) ([]*v1alpha1.MachineSet, int32, error) {
sort.Sort(dutil.MachineSetsByCreationTimestamp(oldMSs))

// Safely scale down all old machine sets with unhealthy replicas. Replica set will sort the machines in the order
// such that not-ready < ready, unscheduled < scheduled, and pending < running. This ensures that unhealthy replicas will
// been deleted first and won't increase unavailability.
Expand All @@ -180,11 +190,13 @@ func (r *ReconcileMachineDeployment) cleanupUnhealthyReplicas(oldMSs []*v1alpha1
if totalScaledDown >= maxCleanupCount {
break
}

oldMSReplicas := *(targetMS.Spec.Replicas)
if oldMSReplicas == 0 {
// cannot scale down this machine set.
continue
}

oldMSAvailableReplicas := targetMS.Status.AvailableReplicas
klog.V(4).Infof("Found %d available machines in old MS %s/%s", oldMSAvailableReplicas, targetMS.Namespace, targetMS.Name)
if oldMSReplicas == oldMSAvailableReplicas {
Expand All @@ -200,12 +212,14 @@ func (r *ReconcileMachineDeployment) cleanupUnhealthyReplicas(oldMSs []*v1alpha1
if newReplicasCount > oldMSReplicas {
return nil, 0, errors.Errorf("when cleaning up unhealthy replicas, got invalid request to scale down %s/%s %d -> %d", targetMS.Namespace, targetMS.Name, oldMSReplicas, newReplicasCount)
}
_, err := r.scaleMachineSet(targetMS, newReplicasCount, deployment)
if err != nil {

if _, err := r.scaleMachineSet(targetMS, newReplicasCount, deployment); err != nil {
return nil, totalScaledDown, err
}

totalScaledDown += scaledDownCount
}

return oldMSs, totalScaledDown, nil
}

Expand All @@ -220,12 +234,14 @@ func (r *ReconcileMachineDeployment) scaleDownOldMachineSetsForRollingUpdate(all

// Check if we can scale down.
minAvailable := *(deployment.Spec.Replicas) - maxUnavailable

// Find the number of available machines.
availableMachineCount := dutil.GetAvailableReplicaCountForMachineSets(allMSs)
if availableMachineCount <= minAvailable {
// Cannot scale down.
return 0, nil
}

klog.V(4).Infof("Found %d available machines in deployment %s, scaling down old MSes", availableMachineCount, deployment.Name)

sort.Sort(dutil.MachineSetsByCreationTimestamp(oldMSs))
Expand All @@ -241,18 +257,20 @@ func (r *ReconcileMachineDeployment) scaleDownOldMachineSetsForRollingUpdate(all
// No further scaling required.
break
}

if *(targetMS.Spec.Replicas) == 0 {
// cannot scale down this MachineSet.
continue
}

// Scale down.
scaleDownCount := integer.Int32Min(*(targetMS.Spec.Replicas), totalScaleDownCount-totalScaledDown)
newReplicasCount := *(targetMS.Spec.Replicas) - scaleDownCount
if newReplicasCount > *(targetMS.Spec.Replicas) {
return totalScaledDown, errors.Errorf("when scaling down old MS, got invalid request to scale down %s/%s %d -> %d", targetMS.Namespace, targetMS.Name, *(targetMS.Spec.Replicas), newReplicasCount)
}
_, err := r.scaleMachineSet(targetMS, newReplicasCount, deployment)
if err != nil {

if _, err := r.scaleMachineSet(targetMS, newReplicasCount, deployment); err != nil {
return totalScaledDown, err
}

Expand Down
Loading

0 comments on commit 07f69ff

Please sign in to comment.