🐛 KCP reconcileEtcdMembers should use its own NodeRefs #3964

Merged
7 changes: 5 additions & 2 deletions controlplane/kubeadm/controllers/controller.go
@@ -506,11 +506,14 @@ func (r *KubeadmControlPlaneReconciler) reconcileEtcdMembers(ctx context.Context
return ctrl.Result{}, nil
}

// If there are provisioning machines (machines without a node yet), return.
// Collect all the node names.
nodeNames := []string{}
for _, machine := range controlPlane.Machines {
if machine.Status.NodeRef == nil {
// If there are provisioning machines (machines without a node yet), return.
return ctrl.Result{}, nil
}
nodeNames = append(nodeNames, machine.Status.NodeRef.Name)
}

// Potential inconsistencies between the list of members and the list of machines/nodes are
@@ -525,7 +528,7 @@ func (r *KubeadmControlPlaneReconciler) reconcileEtcdMembers(ctx context.Context
return ctrl.Result{}, errors.Wrap(err, "cannot get remote client to workload cluster")
}

removedMembers, err := workloadCluster.ReconcileEtcdMembers(ctx)
removedMembers, err := workloadCluster.ReconcileEtcdMembers(ctx, nodeNames)
if err != nil {
return ctrl.Result{}, errors.Wrap(err, "failed attempt to reconcile etcd members")
}
2 changes: 1 addition & 1 deletion controlplane/kubeadm/controllers/fakes_test.go
@@ -63,7 +63,7 @@ func (f fakeWorkloadCluster) ForwardEtcdLeadership(_ context.Context, _ *cluster
return nil
}

func (f fakeWorkloadCluster) ReconcileEtcdMembers(ctx context.Context) ([]string, error) {
func (f fakeWorkloadCluster) ReconcileEtcdMembers(ctx context.Context, nodeNames []string) ([]string, error) {
return nil, nil
}

22 changes: 10 additions & 12 deletions controlplane/kubeadm/internal/etcd_client_generator.go
@@ -21,10 +21,8 @@ import (
"crypto/tls"

"github.com/pkg/errors"
kerrors "k8s.io/apimachinery/pkg/util/errors"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/rest"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/proxy"
@@ -36,10 +34,10 @@ type etcdClientGenerator struct {
tlsConfig *tls.Config
}

func (c *etcdClientGenerator) forNodes(ctx context.Context, nodes []corev1.Node) (*etcd.Client, error) {
endpoints := make([]string, len(nodes))
for i, node := range nodes {
endpoints[i] = staticPodName("etcd", node.Name)
func (c *etcdClientGenerator) forNodes(ctx context.Context, nodeNames []string) (*etcd.Client, error) {
endpoints := make([]string, len(nodeNames))
for i, name := range nodeNames {
endpoints[i] = staticPodName("etcd", name)
}

p := proxy.Proxy{
@@ -53,11 +51,11 @@ func (c *etcdClientGenerator) forNodes(ctx context.Context, nodes []corev1.Node)
}

// forLeader takes a list of nodes and returns a client to the leader node
func (c *etcdClientGenerator) forLeader(ctx context.Context, nodes []corev1.Node) (*etcd.Client, error) {
func (c *etcdClientGenerator) forLeader(ctx context.Context, nodeNames []string) (*etcd.Client, error) {
var errs []error

for _, node := range nodes {
client, err := c.forNodes(ctx, []corev1.Node{node})
for _, nodeName := range nodeNames {
client, err := c.forNodes(ctx, []string{nodeName})
if err != nil {
errs = append(errs, err)
continue
@@ -69,8 +67,8 @@ func (c *etcdClientGenerator) forLeader(ctx context.Context, nodes []corev1.Node
continue
}
for _, member := range members {
if member.Name == node.Name && member.ID == client.LeaderID {
return c.forNodes(ctx, []corev1.Node{node})
if member.Name == nodeName && member.ID == client.LeaderID {
return c.forNodes(ctx, []string{nodeName})
}
}
}
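Not part of the diff: a minimal, self-contained sketch of the leader-selection strategy the generator now applies to plain node names. The fakeClient type and the dial callback are hypothetical stand-ins; the real code builds an etcd client through the pod proxy.

package main

import "fmt"

// fakeClient stands in for *etcd.Client in this sketch: it exposes the
// cluster's current leader ID and a name->ID view of the member list.
// (Hypothetical type; the real generator dials etcd through the pod proxy.)
type fakeClient struct {
	leaderID uint64
	members  map[string]uint64
}

// forLeader mirrors the generator's strategy over plain node names: dial each
// node in turn, look up the member carrying that node's name, and return the
// first node whose member ID matches the reported leader ID.
func forLeader(nodeNames []string, dial func(name string) (*fakeClient, error)) (string, error) {
	var lastErr error
	for _, name := range nodeNames {
		c, err := dial(name)
		if err != nil {
			lastErr = err
			continue // best effort: try the next node
		}
		if id, ok := c.members[name]; ok && id == c.leaderID {
			return name, nil
		}
	}
	return "", fmt.Errorf("leader not found among %v: %v", nodeNames, lastErr)
}

func main() {
	cluster := &fakeClient{leaderID: 2, members: map[string]uint64{"n1": 1, "n2": 2, "n3": 3}}
	dial := func(string) (*fakeClient, error) { return cluster, nil }
	leader, err := forLeader([]string{"n1", "n2", "n3"}, dial)
	fmt.Println(leader, err) // prints: n2 <nil>
}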
5 changes: 3 additions & 2 deletions controlplane/kubeadm/internal/workload_cluster.go
@@ -54,6 +54,8 @@ var (
)

// WorkloadCluster defines all behaviors necessary to upgrade kubernetes on a workload cluster
//
// TODO: Add a detailed description to each of these method definitions.
type WorkloadCluster interface {
// Basic health and status checks.
ClusterStatus(ctx context.Context) (ClusterStatus, error)
@@ -77,7 +79,7 @@ type WorkloadCluster interface {
AllowBootstrapTokensToGetNodes(ctx context.Context) error

// State recovery tasks.
ReconcileEtcdMembers(ctx context.Context) ([]string, error)
ReconcileEtcdMembers(ctx context.Context, nodeNames []string) ([]string, error)
}

// Workload defines operations on workload clusters.
@@ -92,7 +94,6 @@ func (w *Workload) getControlPlaneNodes(ctx context.Context) (*corev1.NodeList,
labels := map[string]string{
labelNodeRoleControlPlane: "",
}

if err := w.Client.List(ctx, nodes, ctrlclient.MatchingLabels(labels)); err != nil {
return nil, err
}
@@ -101,7 +101,7 @@ func (w *Workload) updateManagedEtcdConditions(ctx context.Context, controlPlane
}

// Create the etcd Client for the etcd Pod scheduled on the Node
etcdClient, err := w.etcdClientGenerator.forNodes(ctx, []corev1.Node{node})
etcdClient, err := w.etcdClientGenerator.forNodes(ctx, []string{node.Name})
if err != nil {
conditions.MarkUnknown(machine, controlplanev1.MachineEtcdMemberHealthyCondition, controlplanev1.EtcdMemberInspectionFailedReason, "Failed to connect to the etcd pod on the %s node", node.Name)
continue
@@ -206,8 +206,8 @@ func TestUpdateEtcdConditions(t *testing.T) {
},
},
injectEtcdClientGenerator: &fakeEtcdClientGenerator{
forNodesClientFunc: func(n []corev1.Node) (*etcd.Client, error) {
switch n[0].Name {
forNodesClientFunc: func(n []string) (*etcd.Client, error) {
switch n[0] {
case "n1":
return &etcd.Client{
EtcdClient: &fake2.FakeEtcdClient{
@@ -274,8 +274,8 @@ func TestUpdateEtcdConditions(t *testing.T) {
},
},
injectEtcdClientGenerator: &fakeEtcdClientGenerator{
forNodesClientFunc: func(n []corev1.Node) (*etcd.Client, error) {
switch n[0].Name {
forNodesClientFunc: func(n []string) (*etcd.Client, error) {
switch n[0] {
case "n1":
return &etcd.Client{
EtcdClient: &fake2.FakeEtcdClient{
@@ -342,8 +342,8 @@ func TestUpdateEtcdConditions(t *testing.T) {
},
},
injectEtcdClientGenerator: &fakeEtcdClientGenerator{
forNodesClientFunc: func(n []corev1.Node) (*etcd.Client, error) {
switch n[0].Name {
forNodesClientFunc: func(n []string) (*etcd.Client, error) {
switch n[0] {
case "n1":
return &etcd.Client{
EtcdClient: &fake2.FakeEtcdClient{
@@ -392,8 +392,8 @@ func TestUpdateEtcdConditions(t *testing.T) {
},
},
injectEtcdClientGenerator: &fakeEtcdClientGenerator{
forNodesClientFunc: func(n []corev1.Node) (*etcd.Client, error) {
switch n[0].Name {
forNodesClientFunc: func(n []string) (*etcd.Client, error) {
switch n[0] {
case "n1":
return &etcd.Client{
EtcdClient: &fake2.FakeEtcdClient{
55 changes: 27 additions & 28 deletions controlplane/kubeadm/internal/workload_cluster_etcd.go
@@ -18,10 +18,8 @@ package internal

import (
"context"
"fmt"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kerrors "k8s.io/apimachinery/pkg/util/errors"
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha4"
@@ -31,23 +29,18 @@ import (
)

type etcdClientFor interface {
forNodes(ctx context.Context, nodes []corev1.Node) (*etcd.Client, error)
forLeader(ctx context.Context, nodes []corev1.Node) (*etcd.Client, error)
forNodes(ctx context.Context, nodeNames []string) (*etcd.Client, error)
forLeader(ctx context.Context, nodeNames []string) (*etcd.Client, error)
}

// ReconcileEtcdMembers iterates over all etcd members and finds members that do not have corresponding nodes.
// If there are any such members, it deletes them from etcd and removes their nodes from the kubeadm configmap so that kubeadm does not run etcd health checks on them.
func (w *Workload) ReconcileEtcdMembers(ctx context.Context) ([]string, error) {
controlPlaneNodes, err := w.getControlPlaneNodes(ctx)
Member: So here we are moving away from getControlPlaneNodes because it can give false negatives while kubeadm join is still in progress. Wondering whether we should also reconsider the other places where getControlPlaneNodes is called.

Member Author: We could, but I didn't anticipate any issues in getting the nodes from the cluster for those, given that we only use them to connect to etcd and not as an authoritative list of members that we compare against.

We really need to work on the etcd client generator though; it's really confusing how it's structured today.

if err != nil {
return nil, err
}

func (w *Workload) ReconcileEtcdMembers(ctx context.Context, nodeNames []string) ([]string, error) {
removedMembers := []string{}
errs := []error{}
for _, node := range controlPlaneNodes.Items {
for _, nodeName := range nodeNames {
// Create the etcd Client for the etcd Pod scheduled on the Node
etcdClient, err := w.etcdClientGenerator.forNodes(ctx, []corev1.Node{node})
etcdClient, err := w.etcdClientGenerator.forNodes(ctx, []string{nodeName})
if err != nil {
continue
}
@@ -57,26 +50,25 @@ func (w *Workload) ReconcileEtcdMembers(ctx context.Context) ([]string, error) {
if err != nil {
continue
}

// Check if any member's node is missing from workload cluster
// If any, delete it with best effort
loopmembers:
for _, member := range members {
// If this member is just added, it has an empty name until the etcd pod starts. Ignore it.
if member.Name == "" {
continue
}

isFound := false
for _, node := range controlPlaneNodes.Items {
if member.Name == node.Name {
isFound = true
break
for _, nodeName := range nodeNames {
if member.Name == nodeName {
// We found the matching node, continue with the outer loop.
continue loopmembers
}
}
// Stop here if we found the member to be in the list of control plane nodes.
if isFound {
continue
}
removedMembers = append(removedMembers, fmt.Sprintf("%d (Name: %s)", member.ID, member.Name))

// If we're here, the node cannot be found.
removedMembers = append(removedMembers, member.Name)
if err := w.removeMemberForNode(ctx, member.Name); err != nil {
errs = append(errs, err)
}
@@ -86,6 +78,7 @@
}
}
}

return removedMembers, kerrors.NewAggregate(errs)
}

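Not part of the diff: a minimal sketch of the reconciliation rule in its new form, where the caller supplies the authoritative node names built from the control plane Machines' NodeRefs and any named member outside that list is treated as orphaned. The member type and reconcileEtcdMembers helper below are hypothetical stand-ins, not the actual Cluster API or etcd client code.

package main

import "fmt"

// member is a stand-in for an etcd member as returned by a member-list call
// (hypothetical type; the real code uses the etcd client's member struct).
type member struct {
	ID   uint64
	Name string
}

// reconcileEtcdMembers returns the names of members that should be removed:
// every member whose name is set but does not appear in nodeNames. nodeNames
// is the authoritative list KCP builds from its Machines' NodeRefs, so a
// member only counts as orphaned when no Machine owns a Node with that name.
func reconcileEtcdMembers(members []member, nodeNames []string) []string {
	known := make(map[string]struct{}, len(nodeNames))
	for _, n := range nodeNames {
		known[n] = struct{}{}
	}
	removed := []string{}
	for _, m := range members {
		// A freshly added member has an empty name until its etcd pod
		// starts; skip it rather than treating it as orphaned.
		if m.Name == "" {
			continue
		}
		if _, ok := known[m.Name]; !ok {
			removed = append(removed, m.Name)
		}
	}
	return removed
}

func main() {
	members := []member{{ID: 1, Name: "node-a"}, {ID: 2, Name: "node-b"}, {ID: 3, Name: ""}}
	nodeNames := []string{"node-a"} // only node-a still has a Machine with a NodeRef
	fmt.Println(reconcileEtcdMembers(members, nodeNames)) // prints: [node-b]
}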
@@ -127,10 +120,10 @@ func (w *Workload) removeMemberForNode(ctx context.Context, name string) error {
}

// Exclude node being removed from etcd client node list
var remainingNodes []corev1.Node
var remainingNodes []string
for _, n := range controlPlaneNodes.Items {
if n.Name != name {
remainingNodes = append(remainingNodes, n)
remainingNodes = append(remainingNodes, n.Name)
}
}
etcdClient, err := w.etcdClientGenerator.forNodes(ctx, remainingNodes)
@@ -174,8 +167,8 @@ func (w *Workload) ForwardEtcdLeadership(ctx context.Context, machine *clusterv1
if err != nil {
return errors.Wrap(err, "failed to list control plane nodes")
}

etcdClient, err := w.etcdClientGenerator.forLeader(ctx, nodes.Items)
nodeNames := make([]string, 0, len(nodes.Items))
for _, node := range nodes.Items {
nodeNames = append(nodeNames, node.Name)
}
etcdClient, err := w.etcdClientGenerator.forLeader(ctx, nodeNames)
if err != nil {
return errors.Wrap(err, "failed to create etcd client")
}
@@ -217,8 +213,8 @@ func (w *Workload) EtcdMembers(ctx context.Context) ([]string, error) {
if err != nil {
return nil, errors.Wrap(err, "failed to list control plane nodes")
}

etcdClient, err := w.etcdClientGenerator.forLeader(ctx, nodes.Items)
nodeNames := make([]string, 0, len(nodes.Items))
for _, node := range nodes.Items {
nodeNames = append(nodeNames, node.Name)
}
etcdClient, err := w.etcdClientGenerator.forLeader(ctx, nodeNames)
if err != nil {
return nil, errors.Wrap(err, "failed to create etcd client")
}
19 changes: 11 additions & 8 deletions controlplane/kubeadm/internal/workload_cluster_etcd_test.go
@@ -504,15 +504,17 @@ kind: ClusterStatus`,
tests := []struct {
name string
objs []client.Object
nodes []string
etcdClientGenerator etcdClientFor
expectErr bool
assert func(*WithT)
}{
{
// the node to be removed is ip-10-0-0-3.ec2.internal since the
// other two have nodes
name: "successfully removes the etcd member without a node and removes the node from kubeadm config",
objs: []client.Object{node1.DeepCopy(), node2.DeepCopy(), kubeadmConfig.DeepCopy()},
name: "successfully removes the etcd member without a node and removes the node from kubeadm config",
objs: []client.Object{node1.DeepCopy(), node2.DeepCopy(), kubeadmConfig.DeepCopy()},
nodes: []string{node1.Name, node2.Name},
etcdClientGenerator: &fakeEtcdClientGenerator{
forNodesClient: &etcd.Client{
EtcdClient: fakeEtcdClient,
@@ -524,8 +526,9 @@ kind: ClusterStatus`,
},
},
{
name: "return error if there aren't enough control plane nodes",
objs: []client.Object{node1.DeepCopy(), kubeadmConfig.DeepCopy()},
name: "return error if there aren't enough control plane nodes",
objs: []client.Object{node1.DeepCopy(), kubeadmConfig.DeepCopy()},
nodes: []string{node1.Name},
etcdClientGenerator: &fakeEtcdClientGenerator{
forNodesClient: &etcd.Client{
EtcdClient: fakeEtcdClient,
@@ -551,7 +554,7 @@ kind: ClusterStatus`,
etcdClientGenerator: tt.etcdClientGenerator,
}
ctx := context.TODO()
_, err := w.ReconcileEtcdMembers(ctx)
_, err := w.ReconcileEtcdMembers(ctx, tt.nodes)
if tt.expectErr {
g.Expect(err).To(HaveOccurred())
return
@@ -568,20 +571,20 @@ kind: ClusterStatus`,

type fakeEtcdClientGenerator struct {
forNodesClient *etcd.Client
forNodesClientFunc func([]corev1.Node) (*etcd.Client, error)
forNodesClientFunc func([]string) (*etcd.Client, error)
forLeaderClient *etcd.Client
forNodesErr error
forLeaderErr error
}

func (c *fakeEtcdClientGenerator) forNodes(_ context.Context, n []corev1.Node) (*etcd.Client, error) {
func (c *fakeEtcdClientGenerator) forNodes(_ context.Context, n []string) (*etcd.Client, error) {
if c.forNodesClientFunc != nil {
return c.forNodesClientFunc(n)
}
return c.forNodesClient, c.forNodesErr
}

func (c *fakeEtcdClientGenerator) forLeader(_ context.Context, _ []corev1.Node) (*etcd.Client, error) {
func (c *fakeEtcdClientGenerator) forLeader(_ context.Context, _ []string) (*etcd.Client, error) {
return c.forLeaderClient, c.forLeaderErr
}
