🐛 KCP reconcileEtcdMembers should use its own NodeRefs
These changes bring more safety when we reconcile the etcd members for
the given workload cluster.

To support these changes, some modifications to the internal structs and
interfaces were needed. The etcd client generator now accepts node names
(as []string) instead of corev1.Node objects. This gives us more
flexibility in how we pass in the list of nodes that the etcd member
list is expected to contain.
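For illustration, only names are now needed to derive the etcd
endpoints. A minimal sketch (etcdEndpoints is a hypothetical helper
mirroring the forNodes change in the diff below; the existing
staticPodName helper formats static pod names as "<component>-<node>"):

import "fmt"

// etcdEndpoints shows the new contract: etcd static pod endpoints are
// derived from plain node names, so callers no longer need to fetch
// full corev1.Node objects first.
func etcdEndpoints(nodeNames []string) []string {
    endpoints := make([]string, len(nodeNames))
    for i, name := range nodeNames {
        // Mirrors staticPodName("etcd", name), e.g. "etcd-ip-10-0-0-1".
        endpoints[i] = fmt.Sprintf("etcd-%s", name)
    }
    return endpoints
}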

The reconcileEtcdMembers method already waits for all machines to have
NodeRefs set before proceeding. In addition to performing that check, we
now collect all the node names in a slice and pass it to the inner
Workload struct method, as sketched below.
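Condensed from the controller.go diff in this commit, the reconcile
path now reads:

// Collect all the node names, bailing out while any machine is still
// waiting for its NodeRef.
nodeNames := []string{}
for _, machine := range controlPlane.Machines {
    if machine.Status.NodeRef == nil {
        // If there are provisioning machines (machines without a node yet), return.
        return ctrl.Result{}, nil
    }
    nodeNames = append(nodeNames, machine.Status.NodeRef.Name)
}

removedMembers, err := workloadCluster.ReconcileEtcdMembers(ctx, nodeNames)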

A NodeRef is assigned by the Machine controller as soon as that
Machine's infrastructure provider exposes the ProviderID: the Machine
controller compares the ProviderID against the list of nodes available
in the workload cluster and, on a match, assigns the NodeRef under the
Machine's Status field.
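A minimal sketch of that matching step (setNodeRef is a hypothetical
helper, not the Machine controller's actual code, and the real
implementation compares parsed provider IDs rather than raw strings):

import (
    corev1 "k8s.io/api/core/v1"
    clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha4"
)

// setNodeRef finds the workload cluster Node whose spec.providerID
// matches the Machine's ProviderID and records it in the Machine's
// status. It reports whether a match was found.
func setNodeRef(machine *clusterv1.Machine, nodes []corev1.Node) bool {
    if machine.Spec.ProviderID == nil {
        // The infrastructure provider has not exposed the ProviderID yet.
        return false
    }
    for i := range nodes {
        if nodes[i].Spec.ProviderID == *machine.Spec.ProviderID {
            machine.Status.NodeRef = &corev1.ObjectReference{
                Kind:       "Node",
                APIVersion: corev1.SchemeGroupVersion.String(),
                Name:       nodes[i].Name,
                UID:        nodes[i].UID,
            }
            return true
        }
    }
    return false
}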

If a NodeRef is assigned to a Machine that KCP owns, we know it _should_
be a control plane machine even if kubeadm join hasn't set the label on
the Node object.

Signed-off-by: Vince Prignano <[email protected]>
vincepri committed Nov 30, 2020
1 parent 5803be3 commit 979197c
Showing 8 changed files with 66 additions and 62 deletions.
7 changes: 5 additions & 2 deletions controlplane/kubeadm/controllers/controller.go
@@ -506,11 +506,14 @@ func (r *KubeadmControlPlaneReconciler) reconcileEtcdMembers(ctx context.Context
return ctrl.Result{}, nil
}

// If there are provisioning machines (machines without a node yet), return.
// Collect all the node names.
nodeNames := []string{}
for _, machine := range controlPlane.Machines {
if machine.Status.NodeRef == nil {
// If there are provisioning machines (machines without a node yet), return.
return ctrl.Result{}, nil
}
nodeNames = append(nodeNames, machine.Status.NodeRef.Name)
}

// Potential inconsistencies between the list of members and the list of machines/nodes are
@@ -525,7 +528,7 @@ func (r *KubeadmControlPlaneReconciler) reconcileEtcdMembers(ctx context.Context
return ctrl.Result{}, errors.Wrap(err, "cannot get remote client to workload cluster")
}

removedMembers, err := workloadCluster.ReconcileEtcdMembers(ctx)
removedMembers, err := workloadCluster.ReconcileEtcdMembers(ctx, nodeNames)
if err != nil {
return ctrl.Result{}, errors.Wrap(err, "failed attempt to reconcile etcd members")
}
2 changes: 1 addition & 1 deletion controlplane/kubeadm/controllers/fakes_test.go
@@ -63,7 +63,7 @@ func (f fakeWorkloadCluster) ForwardEtcdLeadership(_ context.Context, _ *cluster
return nil
}

func (f fakeWorkloadCluster) ReconcileEtcdMembers(ctx context.Context) ([]string, error) {
func (f fakeWorkloadCluster) ReconcileEtcdMembers(ctx context.Context, nodeNames []string) ([]string, error) {
return nil, nil
}

22 changes: 10 additions & 12 deletions controlplane/kubeadm/internal/etcd_client_generator.go
@@ -21,10 +21,8 @@ import (
"crypto/tls"

"github.com/pkg/errors"
kerrors "k8s.io/apimachinery/pkg/util/errors"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/rest"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/proxy"
@@ -36,10 +34,10 @@ type etcdClientGenerator struct {
tlsConfig *tls.Config
}

func (c *etcdClientGenerator) forNodes(ctx context.Context, nodes []corev1.Node) (*etcd.Client, error) {
endpoints := make([]string, len(nodes))
for i, node := range nodes {
endpoints[i] = staticPodName("etcd", node.Name)
func (c *etcdClientGenerator) forNodes(ctx context.Context, nodeNames []string) (*etcd.Client, error) {
endpoints := make([]string, len(nodeNames))
for i, name := range nodeNames {
endpoints[i] = staticPodName("etcd", name)
}

p := proxy.Proxy{
@@ -53,11 +51,11 @@ func (c *etcdClientGenerator) forNodes(ctx context.Context, nodes []corev1.Node)
}

// forLeader takes a list of nodes and returns a client to the leader node
func (c *etcdClientGenerator) forLeader(ctx context.Context, nodes []corev1.Node) (*etcd.Client, error) {
func (c *etcdClientGenerator) forLeader(ctx context.Context, nodeNames []string) (*etcd.Client, error) {
var errs []error

for _, node := range nodes {
client, err := c.forNodes(ctx, []corev1.Node{node})
for _, nodeName := range nodeNames {
client, err := c.forNodes(ctx, []string{nodeName})
if err != nil {
errs = append(errs, err)
continue
@@ -69,8 +67,8 @@ func (c *etcdClientGenerator) forLeader(ctx context.Context, nodes []corev1.Node)
continue
}
for _, member := range members {
if member.Name == node.Name && member.ID == client.LeaderID {
return c.forNodes(ctx, []corev1.Node{node})
if member.Name == nodeName && member.ID == client.LeaderID {
return c.forNodes(ctx, []string{nodeName})
}
}
}
5 changes: 3 additions & 2 deletions controlplane/kubeadm/internal/workload_cluster.go
@@ -54,6 +54,8 @@ var (
)

// WorkloadCluster defines all behaviors necessary to upgrade kubernetes on a workload cluster
//
// TODO: Add a detailed description to each of these method definitions.
type WorkloadCluster interface {
// Basic health and status checks.
ClusterStatus(ctx context.Context) (ClusterStatus, error)
@@ -77,7 +79,7 @@ type WorkloadCluster interface {
AllowBootstrapTokensToGetNodes(ctx context.Context) error

// State recovery tasks.
ReconcileEtcdMembers(ctx context.Context) ([]string, error)
ReconcileEtcdMembers(ctx context.Context, nodeNames []string) ([]string, error)
}

// Workload defines operations on workload clusters.
@@ -92,7 +94,6 @@ func (w *Workload) getControlPlaneNodes(ctx context.Context) (*corev1.NodeList,
labels := map[string]string{
labelNodeRoleControlPlane: "",
}

if err := w.Client.List(ctx, nodes, ctrlclient.MatchingLabels(labels)); err != nil {
return nil, err
}
@@ -101,7 +101,7 @@ func (w *Workload) updateManagedEtcdConditions(ctx context.Context, controlPlane
}

// Create the etcd Client for the etcd Pod scheduled on the Node
etcdClient, err := w.etcdClientGenerator.forNodes(ctx, []corev1.Node{node})
etcdClient, err := w.etcdClientGenerator.forNodes(ctx, []string{node.Name})
if err != nil {
conditions.MarkUnknown(machine, controlplanev1.MachineEtcdMemberHealthyCondition, controlplanev1.EtcdMemberInspectionFailedReason, "Failed to connect to the etcd pod on the %s node", node.Name)
continue
@@ -206,8 +206,8 @@ func TestUpdateEtcdConditions(t *testing.T) {
},
},
injectEtcdClientGenerator: &fakeEtcdClientGenerator{
forNodesClientFunc: func(n []corev1.Node) (*etcd.Client, error) {
switch n[0].Name {
forNodesClientFunc: func(n []string) (*etcd.Client, error) {
switch n[0] {
case "n1":
return &etcd.Client{
EtcdClient: &fake2.FakeEtcdClient{
@@ -274,8 +274,8 @@ func TestUpdateEtcdConditions(t *testing.T) {
},
},
injectEtcdClientGenerator: &fakeEtcdClientGenerator{
forNodesClientFunc: func(n []corev1.Node) (*etcd.Client, error) {
switch n[0].Name {
forNodesClientFunc: func(n []string) (*etcd.Client, error) {
switch n[0] {
case "n1":
return &etcd.Client{
EtcdClient: &fake2.FakeEtcdClient{
@@ -342,8 +342,8 @@ func TestUpdateEtcdConditions(t *testing.T) {
},
},
injectEtcdClientGenerator: &fakeEtcdClientGenerator{
forNodesClientFunc: func(n []corev1.Node) (*etcd.Client, error) {
switch n[0].Name {
forNodesClientFunc: func(n []string) (*etcd.Client, error) {
switch n[0] {
case "n1":
return &etcd.Client{
EtcdClient: &fake2.FakeEtcdClient{
@@ -392,8 +392,8 @@ func TestUpdateEtcdConditions(t *testing.T) {
},
},
injectEtcdClientGenerator: &fakeEtcdClientGenerator{
forNodesClientFunc: func(n []corev1.Node) (*etcd.Client, error) {
switch n[0].Name {
forNodesClientFunc: func(n []string) (*etcd.Client, error) {
switch n[0] {
case "n1":
return &etcd.Client{
EtcdClient: &fake2.FakeEtcdClient{
55 changes: 27 additions & 28 deletions controlplane/kubeadm/internal/workload_cluster_etcd.go
@@ -18,10 +18,8 @@ package internal

import (
"context"
"fmt"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kerrors "k8s.io/apimachinery/pkg/util/errors"
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha4"
@@ -31,23 +29,18 @@ import (
)

type etcdClientFor interface {
forNodes(ctx context.Context, nodes []corev1.Node) (*etcd.Client, error)
forLeader(ctx context.Context, nodes []corev1.Node) (*etcd.Client, error)
forNodes(ctx context.Context, nodeNames []string) (*etcd.Client, error)
forLeader(ctx context.Context, nodeNames []string) (*etcd.Client, error)
}

// ReconcileEtcdMembers iterates over all etcd members and finds members that do not have corresponding nodes.
// If there are any such members, it deletes them from etcd and removes their nodes from the kubeadm configmap so that kubeadm does not run etcd health checks on them.
func (w *Workload) ReconcileEtcdMembers(ctx context.Context) ([]string, error) {
controlPlaneNodes, err := w.getControlPlaneNodes(ctx)
if err != nil {
return nil, err
}

func (w *Workload) ReconcileEtcdMembers(ctx context.Context, nodeNames []string) ([]string, error) {
removedMembers := []string{}
errs := []error{}
for _, node := range controlPlaneNodes.Items {
for _, nodeName := range nodeNames {
// Create the etcd Client for the etcd Pod scheduled on the Node
etcdClient, err := w.etcdClientGenerator.forNodes(ctx, []corev1.Node{node})
etcdClient, err := w.etcdClientGenerator.forNodes(ctx, []string{nodeName})
if err != nil {
continue
}
@@ -57,26 +50,25 @@ func (w *Workload) ReconcileEtcdMembers(ctx context.Context) ([]string, error) {
if err != nil {
continue
}

// Check if any member's node is missing from the workload cluster.
// If so, delete the member with best effort.
loopmembers:
for _, member := range members {
// If this member is just added, it has an empty name until the etcd pod starts. Ignore it.
if member.Name == "" {
continue
}

isFound := false
for _, node := range controlPlaneNodes.Items {
if member.Name == node.Name {
isFound = true
break
for _, nodeName := range nodeNames {
if member.Name == nodeName {
// We found the matching node, continue with the outer loop.
continue loopmembers
}
}
// Stop here if we found the member to be in the list of control plane nodes.
if isFound {
continue
}
removedMembers = append(removedMembers, fmt.Sprintf("%d (Name: %s)", member.ID, member.Name))

// If we're here, the node cannot be found.
removedMembers = append(removedMembers, member.Name)
if err := w.removeMemberForNode(ctx, member.Name); err != nil {
errs = append(errs, err)
}
@@ -86,6 +78,7 @@ func (w *Workload) ReconcileEtcdMembers(ctx context.Context) ([]string, error) {
}
}
}

return removedMembers, kerrors.NewAggregate(errs)
}

@@ -127,10 +120,10 @@ func (w *Workload) removeMemberForNode(ctx context.Context, name string) error {
}

// Exclude node being removed from etcd client node list
var remainingNodes []corev1.Node
var remainingNodes []string
for _, n := range controlPlaneNodes.Items {
if n.Name != name {
remainingNodes = append(remainingNodes, n)
remainingNodes = append(remainingNodes, n.Name)
}
}
etcdClient, err := w.etcdClientGenerator.forNodes(ctx, remainingNodes)
@@ -174,8 +167,8 @@ func (w *Workload) ForwardEtcdLeadership(ctx context.Context, machine *clusterv1
if err != nil {
return errors.Wrap(err, "failed to list control plane nodes")
}

etcdClient, err := w.etcdClientGenerator.forLeader(ctx, nodes.Items)
nodeNames := make([]string, 0, len(nodes.Items))
for _, node := range nodes.Items {
nodeNames = append(nodeNames, node.Name)
}
etcdClient, err := w.etcdClientGenerator.forLeader(ctx, nodeNames)
if err != nil {
return errors.Wrap(err, "failed to create etcd client")
}
@@ -217,8 +213,8 @@ func (w *Workload) EtcdMembers(ctx context.Context) ([]string, error) {
if err != nil {
return nil, errors.Wrap(err, "failed to list control plane nodes")
}

etcdClient, err := w.etcdClientGenerator.forLeader(ctx, nodes.Items)
nodeNames := make([]string, 0, len(nodes.Items))
for _, node := range nodes.Items {
nodeNames = append(nodeNames, node.Name)
}
etcdClient, err := w.etcdClientGenerator.forLeader(ctx, nodeNames)
if err != nil {
return nil, errors.Wrap(err, "failed to create etcd client")
}
19 changes: 11 additions & 8 deletions controlplane/kubeadm/internal/workload_cluster_etcd_test.go
@@ -504,15 +504,17 @@ kind: ClusterStatus`,
tests := []struct {
name string
objs []client.Object
nodes []string
etcdClientGenerator etcdClientFor
expectErr bool
assert func(*WithT)
}{
{
// the node to be removed is ip-10-0-0-3.ec2.internal since the
// other two have nodes
name: "successfully removes the etcd member without a node and removes the node from kubeadm config",
objs: []client.Object{node1.DeepCopy(), node2.DeepCopy(), kubeadmConfig.DeepCopy()},
name: "successfully removes the etcd member without a node and removes the node from kubeadm config",
objs: []client.Object{node1.DeepCopy(), node2.DeepCopy(), kubeadmConfig.DeepCopy()},
nodes: []string{node1.Name, node2.Name},
etcdClientGenerator: &fakeEtcdClientGenerator{
forNodesClient: &etcd.Client{
EtcdClient: fakeEtcdClient,
Expand All @@ -524,8 +526,9 @@ kind: ClusterStatus`,
},
},
{
name: "return error if there aren't enough control plane nodes",
objs: []client.Object{node1.DeepCopy(), kubeadmConfig.DeepCopy()},
name: "return error if there aren't enough control plane nodes",
objs: []client.Object{node1.DeepCopy(), kubeadmConfig.DeepCopy()},
nodes: []string{node1.Name},
etcdClientGenerator: &fakeEtcdClientGenerator{
forNodesClient: &etcd.Client{
EtcdClient: fakeEtcdClient,
@@ -551,7 +554,7 @@ kind: ClusterStatus`,
etcdClientGenerator: tt.etcdClientGenerator,
}
ctx := context.TODO()
_, err := w.ReconcileEtcdMembers(ctx)
_, err := w.ReconcileEtcdMembers(ctx, tt.nodes)
if tt.expectErr {
g.Expect(err).To(HaveOccurred())
return
@@ -568,20 +571,20 @@ kind: ClusterStatus`,

type fakeEtcdClientGenerator struct {
forNodesClient *etcd.Client
forNodesClientFunc func([]corev1.Node) (*etcd.Client, error)
forNodesClientFunc func([]string) (*etcd.Client, error)
forLeaderClient *etcd.Client
forNodesErr error
forLeaderErr error
}

func (c *fakeEtcdClientGenerator) forNodes(_ context.Context, n []corev1.Node) (*etcd.Client, error) {
func (c *fakeEtcdClientGenerator) forNodes(_ context.Context, n []string) (*etcd.Client, error) {
if c.forNodesClientFunc != nil {
return c.forNodesClientFunc(n)
}
return c.forNodesClient, c.forNodesErr
}

func (c *fakeEtcdClientGenerator) forLeader(_ context.Context, _ []corev1.Node) (*etcd.Client, error) {
func (c *fakeEtcdClientGenerator) forLeader(_ context.Context, _ []string) (*etcd.Client, error) {
return c.forLeaderClient, c.forLeaderErr
}

