Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨Forward etcd leadership from machine that is being deleted #2525

Merged
merged 1 commit into from
Mar 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,11 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(ctx context.Contex
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy", "Waiting for control plane to pass etcd health check before removing a control plane machine: %v", err)
return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: HealthCheckFailedRequeueAfter}
}
// If etcd leadership is on machine that is about to be deleted, move it to first follower
if err := workloadCluster.ForwardEtcdLeadership(ctx, machineToDelete); err != nil {
logger.Error(err, "failed to move leadership to another machine")
return ctrl.Result{}, err
}
if err := workloadCluster.RemoveEtcdMemberForMachine(ctx, machineToDelete); err != nil {
logger.Error(err, "failed to remove etcd member for machine")
return ctrl.Result{}, err
Expand Down Expand Up @@ -743,6 +748,7 @@ func (r *KubeadmControlPlaneReconciler) generateMachine(ctx context.Context, kcp
// https://github.com/kubernetes-sigs/cluster-api/issues/2064
func (r *KubeadmControlPlaneReconciler) reconcileDelete(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane) (_ ctrl.Result, reterr error) {
logger := r.Log.WithValues("namespace", kcp.Namespace, "kubeadmControlPlane", kcp.Name, "cluster", cluster.Name)

allMachines, err := r.managementCluster.GetMachinesForCluster(ctx, util.ObjectKey(cluster))
if err != nil {
logger.Error(err, "failed to retrieve machines for cluster")
Expand Down
16 changes: 13 additions & 3 deletions controlplane/kubeadm/internal/etcd/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,14 @@ type etcd interface {
MemberRemove(ctx context.Context, id uint64) (*clientv3.MemberRemoveResponse, error)
MemberUpdate(ctx context.Context, id uint64, peerURLs []string) (*clientv3.MemberUpdateResponse, error)
MoveLeader(ctx context.Context, id uint64) (*clientv3.MoveLeaderResponse, error)
Status(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error)
}

// Client wraps an etcd client formatting its output to something more consumable.
type Client struct {
EtcdClient etcd
Endpoint string
LeaderID uint64
}

// MemberAlarm represents an alarm type association with a cluster member.
Expand Down Expand Up @@ -128,13 +130,21 @@ func NewEtcdClient(endpoint string, dialer GRPCDial, tlsConfig *tls.Config) (*cl
}

// NewClientWithEtcd configures our response formatter (Client) with an etcd client and endpoint.
func NewClientWithEtcd(etcdClient etcd) (*Client, error) {
if len(etcdClient.Endpoints()) == 0 {
func NewClientWithEtcd(ctx context.Context, etcdClient etcd) (*Client, error) {
endpoints := etcdClient.Endpoints()
if len(endpoints) == 0 {
return nil, errors.New("etcd client was not configured with any endpoints")
}

status, err := etcdClient.Status(ctx, endpoints[0])
if err != nil {
return nil, err
}

return &Client{
Endpoint: etcdClient.Endpoints()[0],
Endpoint: endpoints[0],
EtcdClient: etcdClient,
LeaderID: status.Leader,
}, nil
}

Expand Down
5 changes: 3 additions & 2 deletions controlplane/kubeadm/internal/etcd_client_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package internal

import (
"context"
"crypto/tls"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -31,7 +32,7 @@ type etcdClientGenerator struct {
tlsConfig *tls.Config
}

func (c *etcdClientGenerator) forNode(name string) (*etcd.Client, error) {
func (c *etcdClientGenerator) forNode(ctx context.Context, name string) (*etcd.Client, error) {
// This does not support external etcd.
p := proxy.Proxy{
Kind: "pods",
Expand All @@ -49,7 +50,7 @@ func (c *etcdClientGenerator) forNode(name string) (*etcd.Client, error) {
if err != nil {
return nil, err
}
customClient, err := etcd.NewClientWithEtcd(etcdclient)
customClient, err := etcd.NewClientWithEtcd(ctx, etcdclient)
if err != nil {
return nil, err
}
Expand Down
45 changes: 42 additions & 3 deletions controlplane/kubeadm/internal/workload_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ var (
)

type etcdClientFor interface {
forNode(name string) (*etcd.Client, error)
forNode(ctx context.Context, name string) (*etcd.Client, error)
}

// WorkloadCluster defines all behaviors necessary to upgrade kubernetes on a workload cluster
Expand All @@ -74,6 +74,7 @@ type WorkloadCluster interface {
UpdateCoreDNS(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane) error
RemoveEtcdMemberForMachine(ctx context.Context, machine *clusterv1.Machine) error
RemoveMachineFromKubeadmConfigMap(ctx context.Context, machine *clusterv1.Machine) error
ForwardEtcdLeadership(ctx context.Context, machine *clusterv1.Machine) error
}

// Workload defines operations on workload clusters.
Expand Down Expand Up @@ -169,7 +170,7 @@ func (w *Workload) removeMemberForNode(ctx context.Context, name string) error {
if anotherNode == nil {
return errors.Errorf("failed to find a control plane node whose name is not %s", name)
}
etcdClient, err := w.etcdClientGenerator.forNode(anotherNode.Name)
etcdClient, err := w.etcdClientGenerator.forNode(ctx, anotherNode.Name)
if err != nil {
return errors.Wrap(err, "failed to create etcd client")
}
Expand Down Expand Up @@ -216,7 +217,7 @@ func (w *Workload) EtcdIsHealthy(ctx context.Context) (HealthCheckResult, error)
}

// Create the etcd Client for the etcd Pod scheduled on the Node
etcdClient, err := w.etcdClientGenerator.forNode(name)
etcdClient, err := w.etcdClientGenerator.forNode(ctx, name)
if err != nil {
response[name] = errors.Wrap(err, "failed to create etcd client")
continue
Expand Down Expand Up @@ -488,6 +489,44 @@ func (w *Workload) ClusterStatus(ctx context.Context) (ClusterStatus, error) {
return status, nil
}

// ForwardEtcdLeadership forwards etcd leadership to the first follower
func (w *Workload) ForwardEtcdLeadership(ctx context.Context, machine *clusterv1.Machine) error {
if machine == nil || machine.Status.NodeRef == nil {
// Nothing to do, no node for Machine
return nil
}

etcdClient, err := w.etcdClientGenerator.forNode(ctx, machine.Status.NodeRef.Name)
if err != nil {
return errors.Wrap(err, "failed to create etcd Client")
}

// List etcd members. This checks that the member is healthy, because the request goes through consensus.
members, err := etcdClient.Members(ctx)
if err != nil {
return errors.Wrap(err, "failed to list etcd members using etcd client")
}

currentMember := etcdutil.MemberForName(members, machine.Status.NodeRef.Name)

if currentMember.ID != etcdClient.LeaderID {
return nil
}

// If current member is leader, move leadship to first follower
for _, member := range members {
if member.ID != currentMember.ID {
err := etcdClient.MoveLeader(ctx, member.ID)
Comment on lines +518 to +519
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should make sure that the new member isn't a machine that's going to be deleted, a (potential) simple solution would be to always pick the last created machine/etcd member

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alexander-demichev do you have time to tackle the above? or we can do it in a follow-up PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you are fine with this PR then feel free to merge. A follow-up sounds good, if that's not urgent I can try to make something during the week.

if err != nil {
return errors.Wrapf(err, "failed to move leader")
}
chuckha marked this conversation as resolved.
Show resolved Hide resolved
break
}
}

return nil
}

func generateClientCert(caCertEncoded, caKeyEncoded []byte) (tls.Certificate, error) {
privKey, err := certs.NewPrivateKey()
if err != nil {
Expand Down