From 0e354614364cdb30c46c339873c82c3811028715 Mon Sep 17 00:00:00 2001 From: Vince Prignano Date: Tue, 17 Mar 2020 10:59:17 -0700 Subject: [PATCH] :sparkles: Move etcd leadership to newest Machine Signed-off-by: Vince Prignano --- .../kubeadm/controllers/fakes_test.go | 2 +- .../kubeadm_control_plane_controller.go | 19 +++++------ .../kubeadm_control_plane_controller_test.go | 2 +- .../kubeadm/internal/machine_collection.go | 8 +++++ .../kubeadm/internal/workload_cluster.go | 32 +++++++++++++------ 5 files changed, 42 insertions(+), 21 deletions(-) diff --git a/controlplane/kubeadm/controllers/fakes_test.go b/controlplane/kubeadm/controllers/fakes_test.go index 58c658f32740..afc6846483d8 100644 --- a/controlplane/kubeadm/controllers/fakes_test.go +++ b/controlplane/kubeadm/controllers/fakes_test.go @@ -66,7 +66,7 @@ type fakeWorkloadCluster struct { Status internal.ClusterStatus } -func (f fakeWorkloadCluster) ForwardEtcdLeadership(_ context.Context, _ *clusterv1.Machine) error { +func (f fakeWorkloadCluster) ForwardEtcdLeadership(_ context.Context, _ *clusterv1.Machine, _ *clusterv1.Machine) error { return nil } diff --git a/controlplane/kubeadm/controllers/kubeadm_control_plane_controller.go b/controlplane/kubeadm/controllers/kubeadm_control_plane_controller.go index 82fb5d2d7988..ea440b23c4d4 100644 --- a/controlplane/kubeadm/controllers/kubeadm_control_plane_controller.go +++ b/controlplane/kubeadm/controllers/kubeadm_control_plane_controller.go @@ -272,7 +272,7 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster * // We are scaling down case numMachines > desiredReplicas: logger.Info("Scaling down control plane", "Desired", desiredReplicas, "Existing", numMachines) - return r.scaleDownControlPlane(ctx, cluster, kcp, ownedMachines) + return r.scaleDownControlPlane(ctx, cluster, kcp, ownedMachines, ownedMachines) } // Get the workload cluster client. @@ -411,7 +411,7 @@ func (r *KubeadmControlPlaneReconciler) upgradeControlPlane(ctx context.Context, return result, nil } - return r.scaleDownControlPlane(ctx, cluster, kcp, replacementCreated) + return r.scaleDownControlPlane(ctx, cluster, kcp, ownedMachines, replacementCreated) } func (r *KubeadmControlPlaneReconciler) markWithAnnotationKey(ctx context.Context, machine *clusterv1.Machine, annotationKey string) error { @@ -494,7 +494,7 @@ func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context, return ctrl.Result{Requeue: true}, nil } -func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, machines internal.FilterableMachineCollection) (ctrl.Result, error) { +func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, ownedMachines internal.FilterableMachineCollection, selectedMachines internal.FilterableMachineCollection) (ctrl.Result, error) { logger := r.Log.WithValues("namespace", kcp.Namespace, "kubeadmControlPlane", kcp.Name, "cluster", cluster.Name) workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster)) @@ -506,14 +506,14 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(ctx context.Contex // We don't want to health check at the beginning of this method to avoid blocking re-entrancy // Wait for any delete in progress to complete before deleting another Machine - if len(machines.Filter(machinefilters.HasDeletionTimestamp)) > 0 { + if len(selectedMachines.Filter(machinefilters.HasDeletionTimestamp)) > 0 { return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: DeleteRequeueAfter} } - markedForDeletion := machines.Filter(machinefilters.HasAnnotationKey(controlplanev1.DeleteForScaleDownAnnotation)) + markedForDeletion := selectedMachines.Filter(machinefilters.HasAnnotationKey(controlplanev1.DeleteForScaleDownAnnotation)) if len(markedForDeletion) == 0 { - fd := r.failureDomainForScaleDown(cluster, machines) - machinesInFailureDomain := machines.Filter(machinefilters.InFailureDomains(fd)) + fd := r.failureDomainForScaleDown(cluster, selectedMachines) + machinesInFailureDomain := selectedMachines.Filter(machinefilters.InFailureDomains(fd)) machineToMark := machinesInFailureDomain.Oldest() if machineToMark == nil { logger.Info("failed to pick control plane Machine to mark for deletion") @@ -538,8 +538,9 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(ctx context.Contex r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy", "Waiting for control plane to pass etcd health check before removing a control plane machine: %v", err) return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: HealthCheckFailedRequeueAfter} } - // If etcd leadership is on machine that is about to be deleted, move it to first follower - if err := workloadCluster.ForwardEtcdLeadership(ctx, machineToDelete); err != nil { + // If etcd leadership is on machine that is about to be deleted, move it to the newest member available. + etcdLeaderCandidate := ownedMachines.Newest() + if err := workloadCluster.ForwardEtcdLeadership(ctx, machineToDelete, etcdLeaderCandidate); err != nil { logger.Error(err, "failed to move leadership to another machine") return ctrl.Result{}, err } diff --git a/controlplane/kubeadm/controllers/kubeadm_control_plane_controller_test.go b/controlplane/kubeadm/controllers/kubeadm_control_plane_controller_test.go index fd092cfc4ca5..147255d138de 100644 --- a/controlplane/kubeadm/controllers/kubeadm_control_plane_controller_test.go +++ b/controlplane/kubeadm/controllers/kubeadm_control_plane_controller_test.go @@ -1665,7 +1665,7 @@ func TestKubeadmControlPlaneReconciler_scaleDownControlPlane_NoError(t *testing. }, } - _, err := r.scaleDownControlPlane(context.Background(), &clusterv1.Cluster{}, &controlplanev1.KubeadmControlPlane{}, machines) + _, err := r.scaleDownControlPlane(context.Background(), &clusterv1.Cluster{}, &controlplanev1.KubeadmControlPlane{}, machines, machines) g.Expect(err).ToNot(HaveOccurred()) } diff --git a/controlplane/kubeadm/internal/machine_collection.go b/controlplane/kubeadm/internal/machine_collection.go index d086dab85a42..8ea47b34d80f 100644 --- a/controlplane/kubeadm/internal/machine_collection.go +++ b/controlplane/kubeadm/internal/machine_collection.go @@ -120,6 +120,14 @@ func (s FilterableMachineCollection) Oldest() *clusterv1.Machine { return s.list()[0] } +// Newest returns the Machine with the most recent CreationTimestamp +func (s FilterableMachineCollection) Newest() *clusterv1.Machine { + if len(s) == 0 { + return nil + } + return s.list()[len(s)-1] +} + // DeepCopy returns a deep copy func (s FilterableMachineCollection) DeepCopy() FilterableMachineCollection { result := make(FilterableMachineCollection, len(s)) diff --git a/controlplane/kubeadm/internal/workload_cluster.go b/controlplane/kubeadm/internal/workload_cluster.go index 9c74b8387750..63fef1a5a52e 100644 --- a/controlplane/kubeadm/internal/workload_cluster.go +++ b/controlplane/kubeadm/internal/workload_cluster.go @@ -74,7 +74,7 @@ type WorkloadCluster interface { UpdateCoreDNS(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane) error RemoveEtcdMemberForMachine(ctx context.Context, machine *clusterv1.Machine) error RemoveMachineFromKubeadmConfigMap(ctx context.Context, machine *clusterv1.Machine) error - ForwardEtcdLeadership(ctx context.Context, machine *clusterv1.Machine) error + ForwardEtcdLeadership(ctx context.Context, machine *clusterv1.Machine, leaderCandidate *clusterv1.Machine) error } // Workload defines operations on workload clusters. @@ -490,7 +490,7 @@ func (w *Workload) ClusterStatus(ctx context.Context) (ClusterStatus, error) { } // ForwardEtcdLeadership forwards etcd leadership to the first follower -func (w *Workload) ForwardEtcdLeadership(ctx context.Context, machine *clusterv1.Machine) error { +func (w *Workload) ForwardEtcdLeadership(ctx context.Context, machine *clusterv1.Machine, leaderCandidate *clusterv1.Machine) error { if machine == nil || machine.Status.NodeRef == nil { // Nothing to do, no node for Machine return nil @@ -508,22 +508,34 @@ func (w *Workload) ForwardEtcdLeadership(ctx context.Context, machine *clusterv1 } currentMember := etcdutil.MemberForName(members, machine.Status.NodeRef.Name) - + if currentMember == nil { + return errors.Errorf("failed to get etcd member from node %q", machine.Status.NodeRef.Name) + } if currentMember.ID != etcdClient.LeaderID { return nil } - // If current member is leader, move leadship to first follower - for _, member := range members { - if member.ID != currentMember.ID { - err := etcdClient.MoveLeader(ctx, member.ID) - if err != nil { - return errors.Wrapf(err, "failed to move leader") + // If we don't have a leader candidate, move the leader to the next available machine. + if leaderCandidate == nil || leaderCandidate.Status.NodeRef == nil { + for _, member := range members { + if member.ID != currentMember.ID { + if err := etcdClient.MoveLeader(ctx, member.ID); err != nil { + return errors.Wrapf(err, "failed to move leader") + } + break } - break } + return nil } + // Move the leader to the provided candidate. + nextLeader := etcdutil.MemberForName(members, leaderCandidate.Status.NodeRef.Name) + if nextLeader == nil { + return errors.Errorf("failed to get etcd member from node %q", leaderCandidate.Status.NodeRef.Name) + } + if err := etcdClient.MoveLeader(ctx, nextLeader.ID); err != nil { + return errors.Wrapf(err, "failed to move leader") + } return nil }