diff --git a/controlplane/kubeadm/controllers/controller.go b/controlplane/kubeadm/controllers/controller.go index 494212ef99ee..ea22e507b5d6 100644 --- a/controlplane/kubeadm/controllers/controller.go +++ b/controlplane/kubeadm/controllers/controller.go @@ -320,3 +320,37 @@ func (r *KubeadmControlPlaneReconciler) ClusterToKubeadmControlPlane(o handler.M return nil } + +// reconcileHealth checks the control plane components and then etcd, returning a RequeueAfterError as soon as one of the checks fails. +// Before requeueing on an etcd failure it tries, best effort, to remove etcd members that do not have a corresponding node. +func (r *KubeadmControlPlaneReconciler) reconcileHealth(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, controlPlane *internal.ControlPlane) error { + logger := controlPlane.Logger() + + // Do a health check of the Control Plane components + if err := r.managementCluster.TargetClusterControlPlaneIsHealthy(ctx, util.ObjectKey(cluster), kcp.Name); err != nil { + logger.V(2).Info("Waiting for control plane to pass control plane health check to continue reconciliation", "cause", err) + r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy", + "Waiting for control plane to pass control plane health check to continue reconciliation: %v", err) + return &capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter} + } + + // Ensure etcd is healthy + if err := r.managementCluster.TargetClusterEtcdIsHealthy(ctx, util.ObjectKey(cluster), kcp.Name); err != nil { + // If there are any etcd members that do not have corresponding nodes, remove them from etcd and from the kubeadm configmap. + // This will solve issues related to manual control-plane machine deletion. 
+ workloadCluster, wcErr := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster)) + if wcErr != nil { + return wcErr + } + if reconcileErr := workloadCluster.ReconcileEtcdMembers(ctx); reconcileErr != nil { + logger.V(2).Info("Failed attempt to remove potential hanging etcd members to pass etcd health check to continue reconciliation", "cause", reconcileErr) + } + // err below is the etcd health check failure; the recovery steps above use their own error names so they cannot shadow it. + logger.V(2).Info("Waiting for control plane to pass etcd health check to continue reconciliation", "cause", err) + r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy", + "Waiting for control plane to pass etcd health check to continue reconciliation: %v", err) + return &capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter} + } + + return nil +} diff --git a/controlplane/kubeadm/controllers/fakes_test.go b/controlplane/kubeadm/controllers/fakes_test.go index afc6846483d8..2a713c02af95 100644 --- a/controlplane/kubeadm/controllers/fakes_test.go +++ b/controlplane/kubeadm/controllers/fakes_test.go @@ -70,6 +70,10 @@ func (f fakeWorkloadCluster) ForwardEtcdLeadership(_ context.Context, _ *cluster return nil } +func (f fakeWorkloadCluster) ReconcileEtcdMembers(_ context.Context) error { + return nil +} + func (f fakeWorkloadCluster) ClusterStatus(_ context.Context) (internal.ClusterStatus, error) { return f.Status, nil } diff --git a/controlplane/kubeadm/controllers/scale.go b/controlplane/kubeadm/controllers/scale.go index d0ff0e081b02..2b31e14ab068 100644 --- a/controlplane/kubeadm/controllers/scale.go +++ b/controlplane/kubeadm/controllers/scale.go @@ -49,15 +49,7 @@ func (r *KubeadmControlPlaneReconciler) initializeControlPlane(ctx context.Conte func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, _ internal.FilterableMachineCollection, controlPlane *internal.ControlPlane) (ctrl.Result, error) { logger := controlPlane.Logger() - if err := 
r.managementCluster.TargetClusterControlPlaneIsHealthy(ctx, util.ObjectKey(cluster), kcp.Name); err != nil { - logger.V(2).Info("Waiting for control plane to pass control plane health check before adding an additional control plane machine", "cause", err) - r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy", "Waiting for control plane to pass control plane health check before adding additional control plane machine: %v", err) - return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter} - } - - if err := r.managementCluster.TargetClusterEtcdIsHealthy(ctx, util.ObjectKey(cluster), kcp.Name); err != nil { - logger.V(2).Info("Waiting for control plane to pass etcd health check before adding an additional control plane machine", "cause", err) - r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy", "Waiting for control plane to pass etcd health check before adding additional control plane machine: %v", err) + if err := r.reconcileHealth(ctx, cluster, kcp, controlPlane); err != nil { return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter} } @@ -90,13 +82,15 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane( return ctrl.Result{}, errors.Wrapf(err, "failed to create client to workload cluster") } - // We don't want to health check at the beginning of this method to avoid blocking re-entrancy - // Wait for any delete in progress to complete before deleting another Machine if controlPlane.HasDeletingMachine() { return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: deleteRequeueAfter} } + if err := r.reconcileHealth(ctx, cluster, kcp, controlPlane); err != nil { + return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter} + } + // If there is not already a Machine that is marked for scale down, find one and mark it markedForDeletion := 
selectedMachines.Filter(machinefilters.HasAnnotationKey(controlplanev1.DeleteForScaleDownAnnotation)) if len(markedForDeletion) == 0 { @@ -113,13 +107,6 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane( return ctrl.Result{}, errors.New("failed to pick control plane Machine to delete") } - // Ensure etcd is healthy prior to attempting to remove the member - if err := r.managementCluster.TargetClusterEtcdIsHealthy(ctx, util.ObjectKey(cluster), kcp.Name); err != nil { - logger.V(2).Info("Waiting for control plane to pass etcd health check before removing a control plane machine", "cause", err) - r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy", - "Waiting for control plane to pass etcd health check before removing a control plane machine: %v", err) - return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter} - } // If etcd leadership is on machine that is about to be deleted, move it to the newest member available. 
etcdLeaderCandidate := ownedMachines.Newest() if err := workloadCluster.ForwardEtcdLeadership(ctx, machineToDelete, etcdLeaderCandidate); err != nil { @@ -148,13 +135,6 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane( } } - // Do a final health check of the Control Plane components prior to actually deleting the machine - if err := r.managementCluster.TargetClusterControlPlaneIsHealthy(ctx, util.ObjectKey(cluster), kcp.Name); err != nil { - logger.V(2).Info("Waiting for control plane to pass control plane health check before removing a control plane machine", "cause", err) - r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy", - "Waiting for control plane to pass control plane health check before removing a control plane machine: %v", err) - return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter} - } logger = logger.WithValues("machine", machineToDelete) if err := r.Client.Delete(ctx, machineToDelete); err != nil && !apierrors.IsNotFound(err) { logger.Error(err, "Failed to delete control plane machine") diff --git a/controlplane/kubeadm/controllers/scale_test.go b/controlplane/kubeadm/controllers/scale_test.go index 9aa5bd3962ea..2fa053fbe49d 100644 --- a/controlplane/kubeadm/controllers/scale_test.go +++ b/controlplane/kubeadm/controllers/scale_test.go @@ -164,6 +164,7 @@ func TestKubeadmControlPlaneReconciler_scaleUpControlPlane(t *testing.T) { } ownedMachines := fmc.Machines.DeepCopy() + _, err := r.scaleUpControlPlane(context.Background(), cluster.DeepCopy(), kcp.DeepCopy(), ownedMachines, controlPlane) g.Expect(err).To(HaveOccurred()) g.Expect(err).To(MatchError(&capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter})) diff --git a/controlplane/kubeadm/internal/workload_cluster.go b/controlplane/kubeadm/internal/workload_cluster.go index 0dcbc846f962..e96967a6907d 100644 --- a/controlplane/kubeadm/internal/workload_cluster.go +++ 
b/controlplane/kubeadm/internal/workload_cluster.go @@ -71,7 +71,11 @@ type WorkloadCluster interface { UpdateCoreDNS(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane) error RemoveEtcdMemberForMachine(ctx context.Context, machine *clusterv1.Machine) error RemoveMachineFromKubeadmConfigMap(ctx context.Context, machine *clusterv1.Machine) error + RemoveNodeFromKubeadmConfigMap(ctx context.Context, nodeName string) error ForwardEtcdLeadership(ctx context.Context, machine *clusterv1.Machine, leaderCandidate *clusterv1.Machine) error + + // State recovery tasks. + ReconcileEtcdMembers(ctx context.Context) error } // Workload defines operations on workload clusters. @@ -227,13 +231,18 @@ func (w *Workload) RemoveMachineFromKubeadmConfigMap(ctx context.Context, machin return nil } + return w.RemoveNodeFromKubeadmConfigMap(ctx, machine.Status.NodeRef.Name) +} + +// RemoveNodeFromKubeadmConfigMap removes the entry for the node from the kubeadm configmap. +func (w *Workload) RemoveNodeFromKubeadmConfigMap(ctx context.Context, name string) error { configMapKey := ctrlclient.ObjectKey{Name: kubeadmConfigKey, Namespace: metav1.NamespaceSystem} kubeadmConfigMap, err := w.getConfigMap(ctx, configMapKey) if err != nil { return err } config := &kubeadmConfig{ConfigMap: kubeadmConfigMap} - if err := config.RemoveAPIEndpoint(machine.Status.NodeRef.Name); err != nil { + if err := config.RemoveAPIEndpoint(name); err != nil { return err } if err := w.Client.Update(ctx, config.ConfigMap); err != nil { diff --git a/controlplane/kubeadm/internal/workload_cluster_etcd.go b/controlplane/kubeadm/internal/workload_cluster_etcd.go index 4edec84adcf3..ec2029519bd9 100644 --- a/controlplane/kubeadm/internal/workload_cluster_etcd.go +++ b/controlplane/kubeadm/internal/workload_cluster_etcd.go @@ -22,6 +22,7 @@ import ( "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + kerrors "k8s.io/apimachinery/pkg/util/errors" clusterv1 
"sigs.k8s.io/cluster-api/api/v1alpha3" "sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd" etcdutil "sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd/util" @@ -89,6 +90,7 @@ func (w *Workload) EtcdIsHealthy(ctx context.Context) (HealthCheckResult, error) response[name] = errors.Wrap(err, "failed to list etcd members using etcd client") continue } + member := etcdutil.MemberForName(members, name) // Check that the member reports no alarms. @@ -130,6 +132,54 @@ func (w *Workload) EtcdIsHealthy(ctx context.Context) (HealthCheckResult, error) return response, nil } +// ReconcileEtcdMembers lists the etcd members through each control plane node and finds members whose name matches no control plane node. +// Such members are deleted from etcd and removed from the kubeadm configmap so that kubeadm does not run etcd health checks on them; removal errors are aggregated and returned. +func (w *Workload) ReconcileEtcdMembers(ctx context.Context) error { + controlPlaneNodes, err := w.getControlPlaneNodes(ctx) + if err != nil { + return err + } + + errs := []error{} + for _, node := range controlPlaneNodes.Items { + name := node.Name + + // Create the etcd Client for the etcd Pod scheduled on the Node; a node is skipped without recording an error if this or the member listing fails. + etcdClient, err := w.etcdClientGenerator.forNode(ctx, name) + if err != nil { + continue + } + + members, err := etcdClient.Members(ctx) + if err != nil { + continue + } + // Check if any member's node is missing from workload cluster + // If any, delete it with best effort: failures are collected in errs and do not stop the loop + for _, member := range members { + isFound := false + for _, node := range controlPlaneNodes.Items { + if member.Name == node.Name { + isFound = true + break + } + } + // Skip this member if it was found in the list of control plane nodes; only unmatched members are removed. 
+ if isFound { + continue + } + if err := w.removeMemberForNode(ctx, member.Name); err != nil { + errs = append(errs, err) + } + + if err := w.RemoveNodeFromKubeadmConfigMap(ctx, member.Name); err != nil { + errs = append(errs, err) + } + } + } + return kerrors.NewAggregate(errs) +} + // UpdateEtcdVersionInKubeadmConfigMap sets the imageRepository or the imageTag or both in the kubeadm config map. func (w *Workload) UpdateEtcdVersionInKubeadmConfigMap(ctx context.Context, imageRepository, imageTag string) error { configMapKey := ctrlclient.ObjectKey{Name: kubeadmConfigKey, Namespace: metav1.NamespaceSystem} @@ -155,7 +205,10 @@ func (w *Workload) RemoveEtcdMemberForMachine(ctx context.Context, machine *clus // Nothing to do, no node for Machine return nil } + return w.removeMemberForNode(ctx, machine.Status.NodeRef.Name) +} +func (w *Workload) removeMemberForNode(ctx context.Context, name string) error { // Pick a different node to talk to etcd controlPlaneNodes, err := w.getControlPlaneNodes(ctx) if err != nil { @@ -174,7 +227,7 @@ func (w *Workload) RemoveEtcdMemberForMachine(ctx context.Context, machine *clus if err != nil { return errors.Wrap(err, "failed to list etcd members using etcd client") } - member := etcdutil.MemberForName(members, machine.Status.NodeRef.Name) + member := etcdutil.MemberForName(members, name) // The member has already been removed, return immediately if member == nil { diff --git a/test/infrastructure/docker/e2e/docker_test.go b/test/infrastructure/docker/e2e/docker_test.go index 7cf27824cb08..8047ac326acc 100644 --- a/test/infrastructure/docker/e2e/docker_test.go +++ b/test/infrastructure/docker/e2e/docker_test.go @@ -46,7 +46,7 @@ var _ = Describe("Docker Create", func() { mgmtClient ctrlclient.Client cluster *clusterv1.Cluster ) - SetDefaultEventuallyTimeout(10 * time.Minute) + SetDefaultEventuallyTimeout(15 * time.Minute) SetDefaultEventuallyPollingInterval(10 * time.Second) AfterEach(func() { @@ -137,7 +137,7 @@ var _ = 
Describe("Docker Create", func() { Cluster: cluster, ControlPlane: controlPlane, } - framework.WaitForOneKubeadmControlPlaneMachineToExist(ctx, waitForOneKubeadmControlPlaneMachineToExistInput, "5m") + framework.WaitForOneKubeadmControlPlaneMachineToExist(ctx, waitForOneKubeadmControlPlaneMachineToExistInput, "15m") // Insatll a networking solution on the workload cluster workloadClient, err := mgmt.GetWorkloadClient(ctx, cluster.Namespace, cluster.Name) @@ -194,6 +194,42 @@ var _ = Describe("Docker Create", func() { }, } framework.AssertControlPlaneFailureDomains(ctx, assertControlPlaneFailureDomainInput) + + Describe("Docker recover from manual workload machine deletion", func() { + By("cleaning up etcd members and kubeadm configMap") + inClustersNamespaceListOption := ctrlclient.InNamespace(cluster.Namespace) + // ControlPlane labels + matchClusterListOption := ctrlclient.MatchingLabels{ + clusterv1.MachineControlPlaneLabelName: "", + clusterv1.ClusterLabelName: cluster.Name, + } + + machineList := &clusterv1.MachineList{} + err = mgmtClient.List(ctx, machineList, inClustersNamespaceListOption, matchClusterListOption) + Expect(err).ToNot(HaveOccurred()) + Expect(machineList.Items).To(HaveLen(int(*controlPlane.Spec.Replicas))) + + Expect(mgmtClient.Delete(ctx, &machineList.Items[0])).To(Succeed()) + + Eventually(func() (int, error) { + machineList := &clusterv1.MachineList{} + if err := mgmtClient.List(ctx, machineList, inClustersNamespaceListOption, matchClusterListOption); err != nil { + fmt.Println(err) + return 0, err + } + return len(machineList.Items), nil + }, "15m", "5s").Should(Equal(int(*controlPlane.Spec.Replicas) - 1)) + + By("ensuring a replacement machine is created") + Eventually(func() (int, error) { + machineList := &clusterv1.MachineList{} + if err := mgmtClient.List(ctx, machineList, inClustersNamespaceListOption, matchClusterListOption); err != nil { + fmt.Println(err) + return 0, err + } + return len(machineList.Items), nil + }, "15m", 
"30s").Should(Equal(int(*controlPlane.Spec.Replicas))) + }) }) }) diff --git a/test/infrastructure/docker/e2e/docker_upgrade_test.go b/test/infrastructure/docker/e2e/docker_upgrade_test.go index 21740bddd00f..2a8ed856028c 100644 --- a/test/infrastructure/docker/e2e/docker_upgrade_test.go +++ b/test/infrastructure/docker/e2e/docker_upgrade_test.go @@ -51,7 +51,7 @@ var _ = Describe("Docker Upgrade", func() { cluster *clusterv1.Cluster controlPlane *controlplanev1.KubeadmControlPlane ) - SetDefaultEventuallyTimeout(10 * time.Minute) + SetDefaultEventuallyTimeout(15 * time.Minute) SetDefaultEventuallyPollingInterval(10 * time.Second) BeforeEach(func() { @@ -97,7 +97,7 @@ var _ = Describe("Docker Upgrade", func() { Cluster: cluster, ControlPlane: controlPlane, } - framework.WaitForOneKubeadmControlPlaneMachineToExist(ctx, waitForOneKubeadmControlPlaneMachineToExistInput, "5m") + framework.WaitForOneKubeadmControlPlaneMachineToExist(ctx, waitForOneKubeadmControlPlaneMachineToExistInput, "15m") // Insatll a networking solution on the workload cluster workloadClient, err := mgmt.GetWorkloadClient(ctx, cluster.Namespace, cluster.Name) @@ -116,7 +116,7 @@ var _ = Describe("Docker Upgrade", func() { Cluster: cluster, ControlPlane: controlPlane, } - framework.WaitForKubeadmControlPlaneMachinesToExist(ctx, assertKubeadmControlPlaneNodesExistInput, "10m", "10s") + framework.WaitForKubeadmControlPlaneMachinesToExist(ctx, assertKubeadmControlPlaneNodesExistInput, "15m", "10s") // Create the workload nodes createMachineDeploymentinput := framework.CreateMachineDeploymentInput{ @@ -228,7 +228,7 @@ var _ = Describe("Docker Upgrade", func() { return 0, errors.New("old nodes remain") } return upgraded, nil - }, "10m", "30s").Should(Equal(int(*controlPlane.Spec.Replicas))) + }, "15m", "30s").Should(Equal(int(*controlPlane.Spec.Replicas))) workloadClient, err := mgmt.GetWorkloadClient(ctx, cluster.Namespace, cluster.Name) Expect(err).ToNot(HaveOccurred()) @@ -245,7 +245,7 @@ var _ = 
Describe("Docker Upgrade", func() { } return false, nil - }, "10m", "30s").Should(BeTrue()) + }, "15m", "30s").Should(BeTrue()) By("ensuring CoreDNS has the correct image") Eventually(func() (bool, error) { @@ -259,7 +259,7 @@ var _ = Describe("Docker Upgrade", func() { } return false, nil - }, "10m", "30s").Should(BeTrue()) + }, "15m", "30s").Should(BeTrue()) // Before patching ensure all pods are ready in workload cluster // Might not need this step any more.