Skip to content

Commit

Permalink
Merge pull request #2821 from benmoss/etcd-leader
Browse files Browse the repository at this point in the history
✨ Connect to the etcd leader when possible
  • Loading branch information
k8s-ci-robot authored Apr 2, 2020
2 parents a9c54ec + d823fc7 commit 96f0bed
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 207 deletions.
34 changes: 4 additions & 30 deletions controlplane/kubeadm/internal/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ type fakeClient struct {
getErr error
patchErr error
updateErr error
listErr error
}

func (f *fakeClient) Get(_ context.Context, key client.ObjectKey, obj runtime.Object) error {
Expand Down Expand Up @@ -251,6 +252,9 @@ func (f *fakeClient) Get(_ context.Context, key client.ObjectKey, obj runtime.Ob
}

func (f *fakeClient) List(_ context.Context, list runtime.Object, _ ...client.ListOption) error {
if f.listErr != nil {
return f.listErr
}
switch l := f.list.(type) {
case *clusterv1.MachineList:
l.DeepCopyInto(list.(*clusterv1.MachineList))
Expand Down Expand Up @@ -470,33 +474,3 @@ func nilNodeRef(machine clusterv1.Machine) clusterv1.Machine {
machine.Status.NodeRef = nil
return machine
}

func TestRemoveMemberForNode_ErrControlPlaneMinNodes(t *testing.T) {
t.Run("do not remove the etcd member if the cluster has fewer than 2 control plane nodes", func(t *testing.T) {
g := NewWithT(t)

expectedErr := ErrControlPlaneMinNodes

workloadCluster := &Workload{
Client: &fakeClient{
list: &corev1.NodeList{
Items: []corev1.Node{
nodeNamed("first-control-plane"),
},
},
},
}

err := workloadCluster.removeMemberForNode(context.Background(), "first-control-plane")
g.Expect(err).To(MatchError(expectedErr))
})
}

func TestPickFirstNodeNotMatching(t *testing.T) {
g := NewWithT(t)

name := "first-control-plane"
anotherNode := firstNodeNotMatchingName(name, nodeListForTestControlPlaneIsHealthy().Items)
g.Expect(anotherNode).NotTo(BeNil())
g.Expect(anotherNode.Name).NotTo(Equal(name))
}
29 changes: 29 additions & 0 deletions controlplane/kubeadm/internal/etcd_client_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ import (
"context"
"crypto/tls"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/rest"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/proxy"
Expand Down Expand Up @@ -56,3 +59,29 @@ func (c *etcdClientGenerator) forNode(ctx context.Context, name string) (*etcd.C
}
return customClient, nil
}

// forLeader takes a list of nodes and returns a client to the leader node
func (c *etcdClientGenerator) forLeader(ctx context.Context, nodes *corev1.NodeList) (*etcd.Client, error) {
var errs []error

for _, node := range nodes.Items {
client, err := c.forNode(ctx, node.Name)
if err != nil {
errs = append(errs, err)
continue
}
defer client.Close()
members, err := client.Members(ctx)
if err != nil {
errs = append(errs, err)
continue
}
for _, member := range members {
if member.ID == client.LeaderID {
return c.forNode(ctx, member.Name)
}
}
}

return nil, errors.Wrap(kerrors.NewAggregate(errs), "could not establish a connection to the etcd leader")
}
9 changes: 0 additions & 9 deletions controlplane/kubeadm/internal/workload_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,15 +435,6 @@ func checkNodeNoExecuteCondition(node corev1.Node) error {
return nil
}

func firstNodeNotMatchingName(name string, nodes []corev1.Node) *corev1.Node {
for _, n := range nodes {
if n.Name != name {
return &n
}
}
return nil
}

// UpdateKubeProxyImageInfo updates kube-proxy image in the kube-proxy DaemonSet.
func (w *Workload) UpdateKubeProxyImageInfo(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane) error {
ds := &appsv1.DaemonSet{}
Expand Down
103 changes: 37 additions & 66 deletions controlplane/kubeadm/internal/workload_cluster_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

type etcdClientFor interface {
forNode(ctx context.Context, name string) (*etcd.Client, error)
forLeader(ctx context.Context, nodes *corev1.NodeList) (*etcd.Client, error)
}

// EtcdIsHealthy runs checks for every etcd member in the cluster to satisfy our definition of healthy.
Expand Down Expand Up @@ -148,112 +149,82 @@ func (w *Workload) UpdateEtcdVersionInKubeadmConfigMap(ctx context.Context, imag
}

// RemoveEtcdMemberForMachine removes the etcd member from the target cluster's etcd cluster.
// Removing the last remaining member of the cluster is not supported.
func (w *Workload) RemoveEtcdMemberForMachine(ctx context.Context, machine *clusterv1.Machine) error {
if machine == nil || machine.Status.NodeRef == nil {
// Nothing to do, no node for Machine
return nil
}

return w.removeMemberForNode(ctx, machine.Status.NodeRef.Name)
}

// ForwardEtcdLeadership forwards etcd leadership to the first follower
func (w *Workload) ForwardEtcdLeadership(ctx context.Context, machine *clusterv1.Machine, leaderCandidate *clusterv1.Machine) error {
if machine == nil || machine.Status.NodeRef == nil {
// Nothing to do, no node for Machine
return nil
// Pick a different node to talk to etcd
controlPlaneNodes, err := w.getControlPlaneNodes(ctx)
if err != nil {
return err
}

// TODO we'd probably prefer to pass in all the known nodes and let grpc handle retrying connections across them
clientMachineName := machine.Status.NodeRef.Name
if leaderCandidate != nil && leaderCandidate.Status.NodeRef != nil {
// connect to the new leader candidate, in case machine's etcd membership has already been removed
clientMachineName = leaderCandidate.Status.NodeRef.Name
if len(controlPlaneNodes.Items) < 2 {
return ErrControlPlaneMinNodes
}

etcdClient, err := w.etcdClientGenerator.forNode(ctx, clientMachineName)
etcdClient, err := w.etcdClientGenerator.forLeader(ctx, controlPlaneNodes)
if err != nil {
return errors.Wrap(err, "failed to create etcd Client")
return errors.Wrap(err, "failed to create etcd client")
}

// List etcd members. This checks that the member is healthy, because the request goes through consensus.
members, err := etcdClient.Members(ctx)
if err != nil {
return errors.Wrap(err, "failed to list etcd members using etcd client")
}
member := etcdutil.MemberForName(members, machine.Status.NodeRef.Name)

currentMember := etcdutil.MemberForName(members, machine.Status.NodeRef.Name)
if currentMember == nil || currentMember.ID != etcdClient.LeaderID {
// The member has already been removed, return immediately
if member == nil {
return nil
}

// Move the etcd client to the current leader, which in this case is the machine we're about to delete.
etcdClient, err = w.etcdClientGenerator.forNode(ctx, machine.Status.NodeRef.Name)
if err != nil {
return errors.Wrap(err, "failed to create etcd Client")
}

// If we don't have a leader candidate, move the leader to the next available machine.
if leaderCandidate == nil || leaderCandidate.Status.NodeRef == nil {
for _, member := range members {
if member.ID != currentMember.ID {
if err := etcdClient.MoveLeader(ctx, member.ID); err != nil {
return errors.Wrapf(err, "failed to move leader")
}
break
}
}
return nil
if err := etcdClient.RemoveMember(ctx, member.ID); err != nil {
return errors.Wrap(err, "failed to remove member from etcd")
}

// Move the leader to the provided candidate.
nextLeader := etcdutil.MemberForName(members, leaderCandidate.Status.NodeRef.Name)
if nextLeader == nil {
return errors.Errorf("failed to get etcd member from node %q", leaderCandidate.Status.NodeRef.Name)
}
if err := etcdClient.MoveLeader(ctx, nextLeader.ID); err != nil {
return errors.Wrapf(err, "failed to move leader")
}
return nil
}

// removeMemberForNode removes the etcd member for the node. Removing the etcd
// member when the cluster has one control plane node is not supported. To allow
// the removal of a failed etcd member, the etcd API requests are sent to a
// different node.
func (w *Workload) removeMemberForNode(ctx context.Context, name string) error {
// Pick a different node to talk to etcd
controlPlaneNodes, err := w.getControlPlaneNodes(ctx)
if err != nil {
return err
// ForwardEtcdLeadership forwards etcd leadership to the first follower
func (w *Workload) ForwardEtcdLeadership(ctx context.Context, machine *clusterv1.Machine, leaderCandidate *clusterv1.Machine) error {
if machine == nil || machine.Status.NodeRef == nil {
return nil
}
if len(controlPlaneNodes.Items) < 2 {
return ErrControlPlaneMinNodes
if leaderCandidate == nil {
return errors.New("leader candidate cannot be nil")
}
anotherNode := firstNodeNotMatchingName(name, controlPlaneNodes.Items)
if anotherNode == nil {
return errors.Errorf("failed to find a control plane node whose name is not %s", name)

nodes, err := w.getControlPlaneNodes(ctx)
if err != nil {
return errors.Wrap(err, "failed to list control plane nodes")
}
etcdClient, err := w.etcdClientGenerator.forNode(ctx, anotherNode.Name)

etcdClient, err := w.etcdClientGenerator.forLeader(ctx, nodes)
if err != nil {
return errors.Wrap(err, "failed to create etcd client")
}

// List etcd members. This checks that the member is healthy, because the request goes through consensus.
members, err := etcdClient.Members(ctx)
if err != nil {
return errors.Wrap(err, "failed to list etcd members using etcd client")
}
member := etcdutil.MemberForName(members, name)

// The member has already been removed, return immediately
if member == nil {
currentMember := etcdutil.MemberForName(members, machine.Status.NodeRef.Name)
if currentMember == nil || currentMember.ID != etcdClient.LeaderID {
// nothing to do, this is not the etcd leader
return nil
}

if err := etcdClient.RemoveMember(ctx, member.ID); err != nil {
return errors.Wrap(err, "failed to remove member from etcd")
// Move the leader to the provided candidate.
nextLeader := etcdutil.MemberForName(members, leaderCandidate.Status.NodeRef.Name)
if nextLeader == nil {
return errors.Errorf("failed to get etcd member from node %q", leaderCandidate.Status.NodeRef.Name)
}
if err := etcdClient.MoveLeader(ctx, nextLeader.ID); err != nil {
return errors.Wrapf(err, "failed to move leader")
}

return nil
}
Loading

0 comments on commit 96f0bed

Please sign in to comment.