From 64d5067c333e6c106e54eb322b507816a113fc44 Mon Sep 17 00:00:00 2001
From: Alexander Demichev
Date: Thu, 27 Feb 2020 11:15:09 +0100
Subject: [PATCH] Forward etcd leadership from machine that is being deleted

---
 .../kubeadm_control_plane_controller.go       |  6 +++
 controlplane/kubeadm/internal/etcd/etcd.go    | 16 +++++--
 .../kubeadm/internal/etcd_client_generator.go |  5 ++-
 .../kubeadm/internal/workload_cluster.go      | 45 +++++++++++++++++--
 4 files changed, 64 insertions(+), 8 deletions(-)

diff --git a/controlplane/kubeadm/controllers/kubeadm_control_plane_controller.go b/controlplane/kubeadm/controllers/kubeadm_control_plane_controller.go
index ba73fbf83d0a..82fb5d2d7988 100644
--- a/controlplane/kubeadm/controllers/kubeadm_control_plane_controller.go
+++ b/controlplane/kubeadm/controllers/kubeadm_control_plane_controller.go
@@ -538,6 +538,11 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(ctx context.Contex
 		r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy", "Waiting for control plane to pass etcd health check before removing a control plane machine: %v", err)
 		return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: HealthCheckFailedRequeueAfter}
 	}
+	// If etcd leadership is on the machine that is about to be deleted, move it to the first follower.
+	if err := workloadCluster.ForwardEtcdLeadership(ctx, machineToDelete); err != nil {
+		logger.Error(err, "failed to move leadership to another machine")
+		return ctrl.Result{}, err
+	}
 	if err := workloadCluster.RemoveEtcdMemberForMachine(ctx, machineToDelete); err != nil {
 		logger.Error(err, "failed to remove etcd member for machine")
 		return ctrl.Result{}, err
@@ -743,6 +748,7 @@ func (r *KubeadmControlPlaneReconciler) generateMachine(ctx context.Context, kcp
 // https://github.com/kubernetes-sigs/cluster-api/issues/2064
 func (r *KubeadmControlPlaneReconciler) reconcileDelete(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane) (_ ctrl.Result, reterr error) {
 	logger := r.Log.WithValues("namespace", kcp.Namespace, "kubeadmControlPlane", kcp.Name, "cluster", cluster.Name)
+
 	allMachines, err := r.managementCluster.GetMachinesForCluster(ctx, util.ObjectKey(cluster))
 	if err != nil {
 		logger.Error(err, "failed to retrieve machines for cluster")
diff --git a/controlplane/kubeadm/internal/etcd/etcd.go b/controlplane/kubeadm/internal/etcd/etcd.go
index 27ee84c314ab..f91ce81c673d 100644
--- a/controlplane/kubeadm/internal/etcd/etcd.go
+++ b/controlplane/kubeadm/internal/etcd/etcd.go
@@ -40,12 +40,14 @@ type etcd interface {
 	MemberRemove(ctx context.Context, id uint64) (*clientv3.MemberRemoveResponse, error)
 	MemberUpdate(ctx context.Context, id uint64, peerURLs []string) (*clientv3.MemberUpdateResponse, error)
 	MoveLeader(ctx context.Context, id uint64) (*clientv3.MoveLeaderResponse, error)
+	Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error)
 }
 
 // Client wraps an etcd client formatting its output to something more consumable.
 type Client struct {
 	EtcdClient etcd
 	Endpoint   string
+	LeaderID   uint64
 }
 
 // MemberAlarm represents an alarm type association with a cluster member.
@@ -128,13 +130,21 @@ func NewEtcdClient(endpoint string, dialer GRPCDial, tlsConfig *tls.Config) (*cl
 }
 
 // NewClientWithEtcd configures our response formatter (Client) with an etcd client and endpoint.
-func NewClientWithEtcd(etcdClient etcd) (*Client, error) {
-	if len(etcdClient.Endpoints()) == 0 {
+func NewClientWithEtcd(ctx context.Context, etcdClient etcd) (*Client, error) {
+	endpoints := etcdClient.Endpoints()
+	if len(endpoints) == 0 {
 		return nil, errors.New("etcd client was not configured with any endpoints")
 	}
+
+	status, err := etcdClient.Status(ctx, endpoints[0])
+	if err != nil {
+		return nil, err
+	}
+
 	return &Client{
-		Endpoint:   etcdClient.Endpoints()[0],
+		Endpoint:   endpoints[0],
 		EtcdClient: etcdClient,
+		LeaderID:   status.Leader,
 	}, nil
 }
 
diff --git a/controlplane/kubeadm/internal/etcd_client_generator.go b/controlplane/kubeadm/internal/etcd_client_generator.go
index f715df6dfebf..18525b8b1f67 100644
--- a/controlplane/kubeadm/internal/etcd_client_generator.go
+++ b/controlplane/kubeadm/internal/etcd_client_generator.go
@@ -17,6 +17,7 @@ limitations under the License.
 package internal
 
 import (
+	"context"
 	"crypto/tls"
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -31,7 +32,7 @@ type etcdClientGenerator struct {
 	tlsConfig *tls.Config
 }
 
-func (c *etcdClientGenerator) forNode(name string) (*etcd.Client, error) {
+func (c *etcdClientGenerator) forNode(ctx context.Context, name string) (*etcd.Client, error) {
 	// This does not support external etcd.
 	p := proxy.Proxy{
 		Kind:      "pods",
@@ -49,7 +50,7 @@ func (c *etcdClientGenerator) forNode(name string) (*etcd.Client, error) {
 	if err != nil {
 		return nil, err
 	}
-	customClient, err := etcd.NewClientWithEtcd(etcdclient)
+	customClient, err := etcd.NewClientWithEtcd(ctx, etcdclient)
 	if err != nil {
 		return nil, err
 	}
diff --git a/controlplane/kubeadm/internal/workload_cluster.go b/controlplane/kubeadm/internal/workload_cluster.go
index b4638b93fc4f..b85396ac2c7d 100644
--- a/controlplane/kubeadm/internal/workload_cluster.go
+++ b/controlplane/kubeadm/internal/workload_cluster.go
@@ -53,7 +53,7 @@ var (
 )
 
 type etcdClientFor interface {
-	forNode(name string) (*etcd.Client, error)
+	forNode(ctx context.Context, name string) (*etcd.Client, error)
 }
 
 // WorkloadCluster defines all behaviors necessary to upgrade kubernetes on a workload cluster
@@ -74,6 +74,7 @@ type WorkloadCluster interface {
 	UpdateCoreDNS(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane) error
 	RemoveEtcdMemberForMachine(ctx context.Context, machine *clusterv1.Machine) error
 	RemoveMachineFromKubeadmConfigMap(ctx context.Context, machine *clusterv1.Machine) error
+	ForwardEtcdLeadership(ctx context.Context, machine *clusterv1.Machine) error
 }
 
 // Workload defines operations on workload clusters.
@@ -169,7 +170,7 @@ func (w *Workload) removeMemberForNode(ctx context.Context, name string) error {
 	if anotherNode == nil {
 		return errors.Errorf("failed to find a control plane node whose name is not %s", name)
 	}
-	etcdClient, err := w.etcdClientGenerator.forNode(anotherNode.Name)
+	etcdClient, err := w.etcdClientGenerator.forNode(ctx, anotherNode.Name)
 	if err != nil {
 		return errors.Wrap(err, "failed to create etcd client")
 	}
@@ -216,7 +217,7 @@ func (w *Workload) EtcdIsHealthy(ctx context.Context) (HealthCheckResult, error)
 	}
 
 	// Create the etcd Client for the etcd Pod scheduled on the Node
-	etcdClient, err := w.etcdClientGenerator.forNode(name)
+	etcdClient, err := w.etcdClientGenerator.forNode(ctx, name)
 	if err != nil {
 		response[name] = errors.Wrap(err, "failed to create etcd client")
 		continue
@@ -488,6 +489,44 @@ func (w *Workload) ClusterStatus(ctx context.Context) (ClusterStatus, error) {
 	return status, nil
 }
 
+// ForwardEtcdLeadership forwards etcd leadership to the first follower.
+func (w *Workload) ForwardEtcdLeadership(ctx context.Context, machine *clusterv1.Machine) error {
+	if machine == nil || machine.Status.NodeRef == nil {
+		// Nothing to do, there is no node for this machine.
+		return nil
+	}
+
+	etcdClient, err := w.etcdClientGenerator.forNode(ctx, machine.Status.NodeRef.Name)
+	if err != nil {
+		return errors.Wrap(err, "failed to create etcd client")
+	}
+
+	// List etcd members. This checks that the member is healthy, because the request goes through consensus.
+	members, err := etcdClient.Members(ctx)
+	if err != nil {
+		return errors.Wrap(err, "failed to list etcd members using etcd client")
+	}
+
+	currentMember := etcdutil.MemberForName(members, machine.Status.NodeRef.Name)
+	if currentMember == nil || currentMember.ID != etcdClient.LeaderID {
+		// Nothing to do, this machine's member is not the etcd leader.
+		return nil
+	}
+
+	// The current member is the leader; move leadership to the first follower.
+	for _, member := range members {
+		if member.ID != currentMember.ID {
+			err := etcdClient.MoveLeader(ctx, member.ID)
+			if err != nil {
+				return errors.Wrap(err, "failed to move leader")
+			}
+			break
+		}
+	}
+
+	return nil
+}
+
 func generateClientCert(caCertEncoded, caKeyEncoded []byte) (tls.Certificate, error) {
 	privKey, err := certs.NewPrivateKey()
 	if err != nil {
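
Note for reviewers: below is a minimal, self-contained sketch of the same sequence this patch implements (check who leads, move leadership to the first follower, then remove the member), written directly against etcd's clientv3 API rather than the wrappers in this repository. The endpoint, member name, and the helper forwardLeadershipAndRemove are placeholders invented for illustration. One assumption worth calling out: etcd serves MoveLeader only on the current leader, which holds in the patch because the per-node client is built against the node being deleted.

package main

import (
	"context"
	"fmt"
	"time"

	"go.etcd.io/etcd/clientv3"
)

// forwardLeadershipAndRemove mirrors the patch's scale-down flow: if the
// named member currently leads the cluster, hand leadership to the first
// other member before removing it.
func forwardLeadershipAndRemove(ctx context.Context, cli *clientv3.Client, name string) error {
	// Status of an endpoint reports the current leader's member ID.
	status, err := cli.Status(ctx, cli.Endpoints()[0])
	if err != nil {
		return fmt.Errorf("failed to get etcd status: %v", err)
	}

	// Listing members goes through consensus, so it doubles as a health check.
	members, err := cli.MemberList(ctx)
	if err != nil {
		return fmt.Errorf("failed to list etcd members: %v", err)
	}

	// Find the member being removed by its name.
	var removeID uint64
	found := false
	for _, m := range members.Members {
		if m.Name == name {
			removeID, found = m.ID, true
			break
		}
	}
	if !found {
		return fmt.Errorf("member %q not found", name)
	}

	// If it is the leader, transfer leadership to the first follower.
	if removeID == status.Leader {
		for _, m := range members.Members {
			if m.ID != removeID {
				if _, err := cli.MoveLeader(ctx, m.ID); err != nil {
					return fmt.Errorf("failed to move leader: %v", err)
				}
				break
			}
		}
	}

	_, err = cli.MemberRemove(ctx, removeID)
	return err
}

func main() {
	// Placeholder endpoint and member name, for illustration only.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"127.0.0.1:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := forwardLeadershipAndRemove(ctx, cli, "control-plane-1"); err != nil {
		panic(err)
	}
}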