✨ Move etcd leadership to newest Machine
Signed-off-by: Vince Prignano <vincepri@vmware.com>
vincepri committed Mar 17, 2020

1 parent 4d87796 commit 0e35461
Showing 5 changed files with 42 additions and 21 deletions.
controlplane/kubeadm/controllers/fakes_test.go (1 addition & 1 deletion)
@@ -66,7 +66,7 @@ type fakeWorkloadCluster struct {
 	Status internal.ClusterStatus
 }
 
-func (f fakeWorkloadCluster) ForwardEtcdLeadership(_ context.Context, _ *clusterv1.Machine) error {
+func (f fakeWorkloadCluster) ForwardEtcdLeadership(_ context.Context, _ *clusterv1.Machine, _ *clusterv1.Machine) error {
 	return nil
 }

@@ -272,7 +272,7 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
 	// We are scaling down
 	case numMachines > desiredReplicas:
 		logger.Info("Scaling down control plane", "Desired", desiredReplicas, "Existing", numMachines)
-		return r.scaleDownControlPlane(ctx, cluster, kcp, ownedMachines)
+		return r.scaleDownControlPlane(ctx, cluster, kcp, ownedMachines, ownedMachines)
 	}
 
 	// Get the workload cluster client.
@@ -411,7 +411,7 @@ func (r *KubeadmControlPlaneReconciler) upgradeControlPlane(ctx context.Context,
 		return result, nil
 	}
 
-	return r.scaleDownControlPlane(ctx, cluster, kcp, replacementCreated)
+	return r.scaleDownControlPlane(ctx, cluster, kcp, ownedMachines, replacementCreated)
 }
 
 func (r *KubeadmControlPlaneReconciler) markWithAnnotationKey(ctx context.Context, machine *clusterv1.Machine, annotationKey string) error {
@@ -494,7 +494,7 @@ func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context,
 	return ctrl.Result{Requeue: true}, nil
 }
 
-func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, machines internal.FilterableMachineCollection) (ctrl.Result, error) {
+func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, ownedMachines internal.FilterableMachineCollection, selectedMachines internal.FilterableMachineCollection) (ctrl.Result, error) {
 	logger := r.Log.WithValues("namespace", kcp.Namespace, "kubeadmControlPlane", kcp.Name, "cluster", cluster.Name)
 
 	workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster))
@@ -506,14 +506,14 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(ctx context.Contex
 	// We don't want to health check at the beginning of this method to avoid blocking re-entrancy
 
 	// Wait for any delete in progress to complete before deleting another Machine
-	if len(machines.Filter(machinefilters.HasDeletionTimestamp)) > 0 {
+	if len(selectedMachines.Filter(machinefilters.HasDeletionTimestamp)) > 0 {
 		return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: DeleteRequeueAfter}
 	}
 
-	markedForDeletion := machines.Filter(machinefilters.HasAnnotationKey(controlplanev1.DeleteForScaleDownAnnotation))
+	markedForDeletion := selectedMachines.Filter(machinefilters.HasAnnotationKey(controlplanev1.DeleteForScaleDownAnnotation))
 	if len(markedForDeletion) == 0 {
-		fd := r.failureDomainForScaleDown(cluster, machines)
-		machinesInFailureDomain := machines.Filter(machinefilters.InFailureDomains(fd))
+		fd := r.failureDomainForScaleDown(cluster, selectedMachines)
+		machinesInFailureDomain := selectedMachines.Filter(machinefilters.InFailureDomains(fd))
 		machineToMark := machinesInFailureDomain.Oldest()
 		if machineToMark == nil {
 			logger.Info("failed to pick control plane Machine to mark for deletion")
@@ -538,8 +538,9 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(ctx context.Contex
 		r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy", "Waiting for control plane to pass etcd health check before removing a control plane machine: %v", err)
 		return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: HealthCheckFailedRequeueAfter}
 	}
-	// If etcd leadership is on machine that is about to be deleted, move it to first follower
-	if err := workloadCluster.ForwardEtcdLeadership(ctx, machineToDelete); err != nil {
+	// If etcd leadership is on machine that is about to be deleted, move it to the newest member available.
+	etcdLeaderCandidate := ownedMachines.Newest()
+	if err := workloadCluster.ForwardEtcdLeadership(ctx, machineToDelete, etcdLeaderCandidate); err != nil {
 		logger.Error(err, "failed to move leadership to another machine")
 		return ctrl.Result{}, err
 	}
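For context on the new pair of arguments: a plain scale down passes ownedMachines for both pools (see the reconcile hunk above), while an upgrade passes the outgoing machines as the deletion pool but still draws the etcd leader candidate from all owned machines, so leadership lands on the freshly created replacement rather than on another machine that is itself about to be replaced. Below is a minimal standalone sketch of that selection split, using simplified stand-in types rather than Cluster API's Machine and FilterableMachineCollection.

package main

import (
	"fmt"
	"sort"
	"time"
)

// machine stands in for clusterv1.Machine; only the fields the
// selection logic needs are modeled.
type machine struct {
	name    string
	created time.Time
}

func byAge(ms []machine) []machine {
	sorted := append([]machine(nil), ms...)
	sort.Slice(sorted, func(i, j int) bool {
		return sorted[i].created.Before(sorted[j].created)
	})
	return sorted
}

func oldest(ms []machine) machine { return byAge(ms)[0] }

func newest(ms []machine) machine {
	s := byAge(ms)
	return s[len(s)-1]
}

func main() {
	now := time.Now()
	// selectedMachines: the outgoing control plane machines in an upgrade.
	outgoing := []machine{
		{"cp-old-1", now.Add(-3 * time.Hour)},
		{"cp-old-2", now.Add(-2 * time.Hour)},
	}
	// ownedMachines: everything the KCP owns, including the replacement.
	owned := append([]machine{{"cp-new-1", now}}, outgoing...)

	toDelete := oldest(outgoing) // deletion victim comes from the selected pool
	candidate := newest(owned)   // etcd leadership goes to the newest owned machine
	fmt.Printf("delete %s, forward etcd leadership to %s\n", toDelete.name, candidate.name)
}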
@@ -1665,7 +1665,7 @@ func TestKubeadmControlPlaneReconciler_scaleDownControlPlane_NoError(t *testing.
 		},
 	}
 
-	_, err := r.scaleDownControlPlane(context.Background(), &clusterv1.Cluster{}, &controlplanev1.KubeadmControlPlane{}, machines)
+	_, err := r.scaleDownControlPlane(context.Background(), &clusterv1.Cluster{}, &controlplanev1.KubeadmControlPlane{}, machines, machines)
 	g.Expect(err).ToNot(HaveOccurred())
 }

controlplane/kubeadm/internal/machine_collection.go (8 additions & 0 deletions)
@@ -120,6 +120,14 @@ func (s FilterableMachineCollection) Oldest() *clusterv1.Machine {
 	return s.list()[0]
 }
 
+// Newest returns the Machine with the most recent CreationTimestamp
+func (s FilterableMachineCollection) Newest() *clusterv1.Machine {
+	if len(s) == 0 {
+		return nil
+	}
+	return s.list()[len(s)-1]
+}
+
 // DeepCopy returns a deep copy
 func (s FilterableMachineCollection) DeepCopy() FilterableMachineCollection {
 	result := make(FilterableMachineCollection, len(s))
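Newest mirrors the existing Oldest accessor: assuming list() returns the collection sorted by CreationTimestamp in ascending order (which Oldest's use of index 0 implies), the last element is the newest Machine, and an empty collection yields nil. A small self-contained sketch of that contract, with an illustrative type in place of FilterableMachineCollection:

package main

import (
	"fmt"
	"sort"
	"time"
)

type item struct {
	name    string
	created time.Time
}

// collection mimics the assumed FilterableMachineCollection contract:
// list() returns members sorted by creation time, ascending, which is
// why Newest can simply take the last element.
type collection []item

func (c collection) list() []item {
	sorted := append([]item(nil), c...)
	sort.Slice(sorted, func(i, j int) bool {
		return sorted[i].created.Before(sorted[j].created)
	})
	return sorted
}

// Newest returns nil for an empty collection, which is one way the
// nil leaderCandidate fallback in ForwardEtcdLeadership can be reached.
func (c collection) Newest() *item {
	if len(c) == 0 {
		return nil
	}
	n := c.list()[len(c)-1]
	return &n
}

func main() {
	now := time.Now()
	c := collection{{"old", now.Add(-time.Hour)}, {"new", now}}
	fmt.Println(c.Newest().name)              // new
	fmt.Println(collection{}.Newest() == nil) // true
}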
controlplane/kubeadm/internal/workload_cluster.go (22 additions & 10 deletions)
@@ -74,7 +74,7 @@ type WorkloadCluster interface {
 	UpdateCoreDNS(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane) error
 	RemoveEtcdMemberForMachine(ctx context.Context, machine *clusterv1.Machine) error
 	RemoveMachineFromKubeadmConfigMap(ctx context.Context, machine *clusterv1.Machine) error
-	ForwardEtcdLeadership(ctx context.Context, machine *clusterv1.Machine) error
+	ForwardEtcdLeadership(ctx context.Context, machine *clusterv1.Machine, leaderCandidate *clusterv1.Machine) error
 }
 
 // Workload defines operations on workload clusters.
@@ -490,7 +490,7 @@ func (w *Workload) ClusterStatus(ctx context.Context) (ClusterStatus, error) {
 }
 
 // ForwardEtcdLeadership forwards etcd leadership to the first follower
-func (w *Workload) ForwardEtcdLeadership(ctx context.Context, machine *clusterv1.Machine) error {
+func (w *Workload) ForwardEtcdLeadership(ctx context.Context, machine *clusterv1.Machine, leaderCandidate *clusterv1.Machine) error {
 	if machine == nil || machine.Status.NodeRef == nil {
 		// Nothing to do, no node for Machine
 		return nil
@@ -508,22 +508,34 @@ func (w *Workload) ForwardEtcdLeadership(ctx context.Context, machine *clusterv1
 	}
 
 	currentMember := etcdutil.MemberForName(members, machine.Status.NodeRef.Name)
+
 	if currentMember == nil {
 		return errors.Errorf("failed to get etcd member from node %q", machine.Status.NodeRef.Name)
 	}
 	if currentMember.ID != etcdClient.LeaderID {
 		return nil
 	}
 
-	// If current member is leader, move leadship to first follower
-	for _, member := range members {
-		if member.ID != currentMember.ID {
-			err := etcdClient.MoveLeader(ctx, member.ID)
-			if err != nil {
-				return errors.Wrapf(err, "failed to move leader")
-			}
-			break
-		}
-	}
+	// If we don't have a leader candidate, move the leader to the next available machine.
+	if leaderCandidate == nil || leaderCandidate.Status.NodeRef == nil {
+		for _, member := range members {
+			if member.ID != currentMember.ID {
+				if err := etcdClient.MoveLeader(ctx, member.ID); err != nil {
+					return errors.Wrapf(err, "failed to move leader")
+				}
+				break
+			}
+		}
+		return nil
+	}
+
+	// Move the leader to the provided candidate.
+	nextLeader := etcdutil.MemberForName(members, leaderCandidate.Status.NodeRef.Name)
+	if nextLeader == nil {
+		return errors.Errorf("failed to get etcd member from node %q", leaderCandidate.Status.NodeRef.Name)
+	}
+	if err := etcdClient.MoveLeader(ctx, nextLeader.ID); err != nil {
+		return errors.Wrapf(err, "failed to move leader")
+	}
 	return nil
 }
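Under the Workload wrapper, the move itself is a single etcd maintenance RPC. Below is a minimal sketch against go.etcd.io/etcd/clientv3 directly rather than Cluster API's internal etcd client; the endpoint and candidate name are placeholders. Note that etcd accepts MoveLeader only when the request reaches the current leader, which is why the method above first checks currentMember.ID against etcdClient.LeaderID.

package main

import (
	"context"
	"fmt"
	"time"

	"go.etcd.io/etcd/clientv3"
)

// moveLeaderTo transfers etcd leadership to the member whose name
// matches candidateName, resolving the member ID first, much like
// etcdutil.MemberForName does in the diff above.
func moveLeaderTo(ctx context.Context, cli *clientv3.Client, candidateName string) error {
	resp, err := cli.MemberList(ctx)
	if err != nil {
		return err
	}
	for _, m := range resp.Members {
		if m.Name == candidateName {
			// The client must be talking to the current leader for
			// MoveLeader to succeed.
			_, err := cli.MoveLeader(ctx, m.ID)
			return err
		}
	}
	return fmt.Errorf("no etcd member named %q", candidateName)
}

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"https://127.0.0.1:2379"}, // placeholder endpoint
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := moveLeaderTo(ctx, cli, "cp-new-1"); err != nil { // placeholder name
		fmt.Println("move leader failed:", err)
	}
}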
