Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cleanup MachineDeployment controller #762

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 46 additions & 34 deletions pkg/controller/machinedeployment/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,18 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"
)

// controllerKind contains the schema.GroupVersionKind for this controller type.
var controllerKind = v1alpha1.SchemeGroupVersion.WithKind("MachineDeployment")
var (
// controllerKind contains the schema.GroupVersionKind for this controller type.
controllerKind = v1alpha1.SchemeGroupVersion.WithKind("MachineDeployment")
)

// ReconcileMachineDeployment reconciles a MachineDeployment object
// ReconcileMachineDeployment reconciles a MachineDeployment object.
type ReconcileMachineDeployment struct {
client.Client
scheme *runtime.Scheme
}

// newReconciler returns a new reconcile.Reconciler
// newReconciler returns a new reconcile.Reconciler.
func newReconciler(mgr manager.Manager) *ReconcileMachineDeployment {
return &ReconcileMachineDeployment{Client: mgr.GetClient(), scheme: mgr.GetScheme()}
}
Expand All @@ -57,47 +59,49 @@ func Add(mgr manager.Manager) error {
return add(mgr, newReconciler(mgr), r.MachineSetToDeployments)
}

// add adds a new Controller to mgr with r as the reconcile.Reconciler
// add adds a new Controller to mgr with r as the reconcile.Reconciler.
func add(mgr manager.Manager, r reconcile.Reconciler, mapFn handler.ToRequestsFunc) error {
// Create a new controller
// Create a new controller.
c, err := controller.New("machinedeployment-controller", mgr, controller.Options{Reconciler: r})
if err != nil {
return err
}

// Watch for changes to MachineDeployment
err = c.Watch(&source.Kind{Type: &v1alpha1.MachineDeployment{}}, &handler.EnqueueRequestForObject{})
// Watch for changes to MachineDeployment.
err = c.Watch(&source.Kind{
Type: &v1alpha1.MachineDeployment{}},
&handler.EnqueueRequestForObject{},
)
if err != nil {
return err
}

// Watch for changes to MachineSet and reconcile the owner MachineDeployment
err = c.Watch(&source.Kind{Type: &v1alpha1.MachineSet{}},
&handler.EnqueueRequestForOwner{OwnerType: &v1alpha1.MachineDeployment{}, IsController: true})
// Watch for changes to MachineSet and reconcile the owner MachineDeployment.
err = c.Watch(
&source.Kind{Type: &v1alpha1.MachineSet{}},
&handler.EnqueueRequestForOwner{OwnerType: &v1alpha1.MachineDeployment{}, IsController: true},
)
if err != nil {
return err
}

// Map MachineSet changes to MachineDeployment
// Map MachineSet changes to MachineDeployment.
err = c.Watch(
&source.Kind{Type: &v1alpha1.MachineSet{}},
&handler.EnqueueRequestsFromMapFunc{ToRequests: mapFn})
&handler.EnqueueRequestsFromMapFunc{ToRequests: mapFn},
)
if err != nil {
return err
}

return nil
}

var _ reconcile.Reconciler = &ReconcileMachineDeployment{}

func (r *ReconcileMachineDeployment) getMachineSetsForDeployment(d *v1alpha1.MachineDeployment) ([]*v1alpha1.MachineSet, error) {
// List all MachineSets to find those we own but that no longer match our
// selector.
// List all MachineSets to find those we own but that no longer match our selector.
machineSets := &v1alpha1.MachineSetList{}
listOptions := &client.ListOptions{
Namespace: d.Namespace,
}
listOptions := &client.ListOptions{Namespace: d.Namespace}

if err := r.Client.List(context.Background(), listOptions, machineSets); err != nil {
return nil, err
}
Expand All @@ -111,22 +115,27 @@ func (r *ReconcileMachineDeployment) getMachineSetsForDeployment(d *v1alpha1.Mac
klog.V(4).Infof("%s not controlled by %v", ms.Name, d.Name)
continue
}

selector, err := metav1.LabelSelectorAsSelector(&d.Spec.Selector)
if err != nil {
klog.Errorf("Skipping machineset %v, failed to get label selector from spec selector.", ms.Name)
continue
}

// If a deployment with a nil or empty selector creeps in, it should match nothing, not everything.
if selector.Empty() {
klog.Warningf("Skipping machineset %v as the selector is empty.", ms.Name)
continue
}

if !selector.Matches(labels.Set(ms.Labels)) {
klog.V(4).Infof("Skipping machineset %v, label mismatch.", ms.Name)
continue
}

filteredMS = append(filteredMS, ms)
}

return filteredMS, nil
}

Expand All @@ -136,8 +145,7 @@ func (r *ReconcileMachineDeployment) getMachineSetsForDeployment(d *v1alpha1.Mac
func (r *ReconcileMachineDeployment) Reconcile(request reconcile.Request) (reconcile.Result, error) {
// Fetch the MachineDeployment instance
d := &v1alpha1.MachineDeployment{}
err := r.Get(context.TODO(), request.NamespacedName, d)
if err != nil {
if err := r.Get(context.TODO(), request.NamespacedName, d); err != nil {
if apierrors.IsNotFound(err) {
// Object not found, return. Created objects are automatically garbage collected.
// For additional cleanup logic use finalizers.
Expand Down Expand Up @@ -202,16 +210,14 @@ func (r *ReconcileMachineDeployment) Reconcile(request reconcile.Request) (recon
// match a MachineSet.
func (r *ReconcileMachineDeployment) getMachineDeploymentsForMachineSet(ms *v1alpha1.MachineSet) []*v1alpha1.MachineDeployment {
if len(ms.Labels) == 0 {
klog.Warningf("no machine deployments found for MachineSet %v because it has no labels", ms.Name)
klog.Warningf("No machine deployments found for MachineSet %v because it has no labels", ms.Name)
return nil
}

dList := &v1alpha1.MachineDeploymentList{}
listOptions := &client.ListOptions{
Namespace: ms.Namespace,
}
listOptions := &client.ListOptions{Namespace: ms.Namespace}
if err := r.Client.List(context.Background(), listOptions, dList); err != nil {
klog.Warningf("failed to list machine deployments, %v", err)
klog.Warningf("Failed to list machine deployments: %v", err)
return nil
}

Expand All @@ -221,10 +227,12 @@ func (r *ReconcileMachineDeployment) getMachineDeploymentsForMachineSet(ms *v1al
if err != nil {
continue
}

// If a deployment with a nil or empty selector creeps in, it should match nothing, not everything.
if selector.Empty() || !selector.Matches(labels.Set(ms.Labels)) {
continue
}

deployments = append(deployments, &dList.Items[idx])
}

Expand All @@ -244,40 +252,44 @@ func (r *ReconcileMachineDeployment) getMachineMapForDeployment(d *v1alpha1.Mach
if err != nil {
return nil, err
}

machines := &v1alpha1.MachineList{}
listOptions := &client.ListOptions{
Namespace: d.Namespace,
}
listOptions := &client.ListOptions{Namespace: d.Namespace}
if err = r.Client.List(context.Background(), listOptions.MatchingLabels(selector), machines); err != nil {
return nil, err
}

// Group Machines by their controller (if it's in msList).
machineMap := make(map[types.UID]*v1alpha1.MachineList, len(msList))
for _, ms := range msList {
machineMap[ms.UID] = &v1alpha1.MachineList{}
}

for idx := range machines.Items {
machine := &machines.Items[idx]

// Do not ignore inactive Machines because Recreate Deployments need to verify that no
// Machines from older versions are running before spinning up new Machines.
controllerRef := metav1.GetControllerOf(machine)
if controllerRef == nil {
continue
}

// Only append if we care about this UID.
if machineList, ok := machineMap[controllerRef.UID]; ok {
machineList.Items = append(machineList.Items, *machine)
}
}

return machineMap, nil
}

func (r *ReconcileMachineDeployment) MachineSetToDeployments(o handler.MapObject) []reconcile.Request {
result := []reconcile.Request{}

ms := &v1alpha1.MachineSet{}
key := client.ObjectKey{Namespace: o.Meta.GetNamespace(), Name: o.Meta.GetName()}
err := r.Client.Get(context.Background(), key, ms)
if err != nil {
if err := r.Client.Get(context.Background(), key, ms); err != nil {
klog.Errorf("Unable to retrieve Machineset %v from store: %v", key, err)
return nil
}
Expand All @@ -295,8 +307,8 @@ func (r *ReconcileMachineDeployment) MachineSetToDeployments(o handler.MapObject
}

for _, md := range mds {
result = append(result, reconcile.Request{
NamespacedName: client.ObjectKey{Namespace: md.Namespace, Name: md.Name}})
name := client.ObjectKey{Namespace: md.Namespace, Name: md.Name}
result = append(result, reconcile.Request{NamespacedName: name})
}

return result
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/machinedeployment/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

var (
_ reconcile.Reconciler = &ReconcileMachineDeployment{}
)

func TestMachineSetToDeployments(t *testing.T) {
machineDeployment := v1alpha1.MachineDeployment{
ObjectMeta: metav1.ObjectMeta{
Expand Down
30 changes: 24 additions & 6 deletions pkg/controller/machinedeployment/rolling.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,18 +33,21 @@ func (r *ReconcileMachineDeployment) rolloutRolling(d *v1alpha1.MachineDeploymen
if err != nil {
return err
}

// newMS can be nil in case there is already a MachineSet associated with this deployment,
// but there are only either changes in annotations or MinReadySeconds. Or in other words,
// this can be nil if there are changes, but no replacement of existing machines is needed.
if newMS == nil {
return nil
}

allMSs := append(oldMSs, newMS)

// Scale up, if we can.
if err := r.reconcileNewMachineSet(allMSs, newMS, d); err != nil {
return err
}

if err := r.syncDeploymentStatus(allMSs, newMS, d); err != nil {
return err
}
Expand All @@ -53,6 +56,7 @@ func (r *ReconcileMachineDeployment) rolloutRolling(d *v1alpha1.MachineDeploymen
if err := r.reconcileOldMachineSets(allMSs, oldMSs, newMS, d); err != nil {
return err
}

if err := r.syncDeploymentStatus(allMSs, newMS, d); err != nil {
return err
}
Expand All @@ -70,6 +74,7 @@ func (r *ReconcileMachineDeployment) reconcileNewMachineSet(allMSs []*v1alpha1.M
if deployment.Spec.Replicas == nil {
return errors.Errorf("spec replicas for deployment set %v is nil, this is unexpected", deployment.Name)
}

if newMS.Spec.Replicas == nil {
return errors.Errorf("spec replicas for machine set %v is nil, this is unexpected", newMS.Name)
}
Expand All @@ -78,11 +83,13 @@ func (r *ReconcileMachineDeployment) reconcileNewMachineSet(allMSs []*v1alpha1.M
// Scaling not required.
return nil
}

if *(newMS.Spec.Replicas) > *(deployment.Spec.Replicas) {
// Scale down.
_, err := r.scaleMachineSet(newMS, *(deployment.Spec.Replicas), deployment)
return err
}

newReplicasCount, err := dutil.NewMSNewReplicas(deployment, allMSs, newMS)
if err != nil {
return err
Expand All @@ -95,6 +102,7 @@ func (r *ReconcileMachineDeployment) reconcileOldMachineSets(allMSs []*v1alpha1.
if deployment.Spec.Replicas == nil {
return errors.Errorf("spec replicas for deployment set %v is nil, this is unexpected", deployment.Name)
}

if newMS.Spec.Replicas == nil {
return errors.Errorf("spec replicas for machine set %v is nil, this is unexpected", newMS.Name)
}
Expand Down Expand Up @@ -152,22 +160,24 @@ func (r *ReconcileMachineDeployment) reconcileOldMachineSets(allMSs []*v1alpha1.
if err != nil {
return nil
}
klog.V(4).Infof("Cleaned up unhealthy replicas from old MSes by %d", cleanupCount)

klog.V(4).Infof("Cleaned up unhealthy replicas from old MachineSets by %d", cleanupCount)

// Scale down old machine sets, need check maxUnavailable to ensure we can scale down
allMSs = append(oldMSs, newMS)
scaledDownCount, err := r.scaleDownOldMachineSetsForRollingUpdate(allMSs, oldMSs, deployment)
if err != nil {
return err
}
klog.V(4).Infof("Scaled down old MSes of deployment %s by %d", deployment.Name, scaledDownCount)

klog.V(4).Infof("Scaled down old MachineSets of deployment %s by %d", deployment.Name, scaledDownCount)
return nil
}

// cleanupUnhealthyReplicas will scale down old machine sets with unhealthy replicas, so that all unhealthy replicas will be deleted.
func (r *ReconcileMachineDeployment) cleanupUnhealthyReplicas(oldMSs []*v1alpha1.MachineSet, deployment *v1alpha1.MachineDeployment, maxCleanupCount int32) ([]*v1alpha1.MachineSet, int32, error) {
sort.Sort(dutil.MachineSetsByCreationTimestamp(oldMSs))

// Safely scale down all old machine sets with unhealthy replicas. Replica set will sort the machines in the order
// such that not-ready < ready, unscheduled < scheduled, and pending < running. This ensures that unhealthy replicas will
// been deleted first and won't increase unavailability.
Expand All @@ -180,11 +190,13 @@ func (r *ReconcileMachineDeployment) cleanupUnhealthyReplicas(oldMSs []*v1alpha1
if totalScaledDown >= maxCleanupCount {
break
}

oldMSReplicas := *(targetMS.Spec.Replicas)
if oldMSReplicas == 0 {
// cannot scale down this machine set.
continue
}

oldMSAvailableReplicas := targetMS.Status.AvailableReplicas
klog.V(4).Infof("Found %d available machines in old MS %s/%s", oldMSAvailableReplicas, targetMS.Namespace, targetMS.Name)
if oldMSReplicas == oldMSAvailableReplicas {
Expand All @@ -200,12 +212,14 @@ func (r *ReconcileMachineDeployment) cleanupUnhealthyReplicas(oldMSs []*v1alpha1
if newReplicasCount > oldMSReplicas {
return nil, 0, errors.Errorf("when cleaning up unhealthy replicas, got invalid request to scale down %s/%s %d -> %d", targetMS.Namespace, targetMS.Name, oldMSReplicas, newReplicasCount)
}
_, err := r.scaleMachineSet(targetMS, newReplicasCount, deployment)
if err != nil {

if _, err := r.scaleMachineSet(targetMS, newReplicasCount, deployment); err != nil {
return nil, totalScaledDown, err
}

totalScaledDown += scaledDownCount
}

return oldMSs, totalScaledDown, nil
}

Expand All @@ -220,12 +234,14 @@ func (r *ReconcileMachineDeployment) scaleDownOldMachineSetsForRollingUpdate(all

// Check if we can scale down.
minAvailable := *(deployment.Spec.Replicas) - maxUnavailable

// Find the number of available machines.
availableMachineCount := dutil.GetAvailableReplicaCountForMachineSets(allMSs)
if availableMachineCount <= minAvailable {
// Cannot scale down.
return 0, nil
}

klog.V(4).Infof("Found %d available machines in deployment %s, scaling down old MSes", availableMachineCount, deployment.Name)

sort.Sort(dutil.MachineSetsByCreationTimestamp(oldMSs))
Expand All @@ -241,18 +257,20 @@ func (r *ReconcileMachineDeployment) scaleDownOldMachineSetsForRollingUpdate(all
// No further scaling required.
break
}

if *(targetMS.Spec.Replicas) == 0 {
// cannot scale down this MachineSet.
continue
}

// Scale down.
scaleDownCount := integer.Int32Min(*(targetMS.Spec.Replicas), totalScaleDownCount-totalScaledDown)
newReplicasCount := *(targetMS.Spec.Replicas) - scaleDownCount
if newReplicasCount > *(targetMS.Spec.Replicas) {
return totalScaledDown, errors.Errorf("when scaling down old MS, got invalid request to scale down %s/%s %d -> %d", targetMS.Namespace, targetMS.Name, *(targetMS.Spec.Replicas), newReplicasCount)
}
_, err := r.scaleMachineSet(targetMS, newReplicasCount, deployment)
if err != nil {

if _, err := r.scaleMachineSet(targetMS, newReplicasCount, deployment); err != nil {
return totalScaledDown, err
}

Expand Down
Loading