[KCP] Recover from a manual machine deletion
Sedef committed Apr 2, 2020
1 parent 96f0bed commit feade58
Showing 6 changed files with 112 additions and 27 deletions.
27 changes: 27 additions & 0 deletions controlplane/kubeadm/controllers/controller.go
@@ -320,3 +320,30 @@ func (r *KubeadmControlPlaneReconciler) ClusterToKubeadmControlPlane(o handler.M

return nil
}

func (r *KubeadmControlPlaneReconciler) generalHealthCheck(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, controlPlane *internal.ControlPlane) error {
logger := controlPlane.Logger()

// Do a health check of the Control Plane components
if err := r.managementCluster.TargetClusterControlPlaneIsHealthy(ctx, util.ObjectKey(cluster), kcp.Name); err != nil {
logger.V(2).Info("Waiting for control plane to pass control plane health check to continue reconciliation", "cause", err)
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy",
"Waiting for control plane to pass control plane health check to continue reconciliation: %v", err)
return &capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter}
}

// Ensure etcd is healthy
if err := r.managementCluster.TargetClusterEtcdIsHealthy(ctx, util.ObjectKey(cluster), kcp.Name); err != nil {
// If any etcd members have nodes that no longer exist, remove them from etcd and from the kubeadm config map.
// This will solve issues related to manual control-plane machine deletion.
if err := r.managementCluster.TargetClusterRemoveMissingNodes(ctx, util.ObjectKey(cluster)); err != nil {
logger.V(2).Info("Failed attempt to remove potential hanging etcd members to pass etcd health check to continue reconciliation", "cause", err)
}
logger.V(2).Info("Waiting for control plane to pass etcd health check to continue reconciliation", "cause", err)
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy",
"Waiting for control plane to pass etcd health check to continue reconciliation: %v", err)
return &capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter}
}

return nil
}
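The new generalHealthCheck helper folds the control plane and etcd health checks into one call and, when etcd is unhealthy, first attempts to clean up members whose nodes were deleted manually before asking the controller to requeue. The following self-contained Go sketch illustrates that remediate-then-requeue pattern in isolation; Checker, Remediator, and the 20-second requeue interval are made-up stand-ins, not the cluster-api API.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Checker and Remediator are hypothetical stand-ins for the managementCluster
// health-check and cleanup methods called by generalHealthCheck.
type Checker func(ctx context.Context) error
type Remediator func(ctx context.Context) error

// checkWithRemediation runs the health check; on failure it attempts a
// best-effort remediation and still reports the failure, so the caller
// requeues and re-evaluates health on the next reconciliation.
func checkWithRemediation(ctx context.Context, check Checker, remediate Remediator) (time.Duration, error) {
	if err := check(ctx); err != nil {
		if rerr := remediate(ctx); rerr != nil {
			fmt.Println("remediation failed:", rerr)
		}
		return 20 * time.Second, fmt.Errorf("health check failed: %w", err)
	}
	return 0, nil
}

func main() {
	failingEtcdCheck := func(ctx context.Context) error { return errors.New("etcd member unreachable") }
	removeStaleMembers := func(ctx context.Context) error { fmt.Println("removed stale etcd member"); return nil }

	if requeueAfter, err := checkWithRemediation(context.Background(), failingEtcdCheck, removeStaleMembers); err != nil {
		fmt.Printf("requeue after %s: %v\n", requeueAfter, err)
	}
}
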
4 changes: 4 additions & 0 deletions controlplane/kubeadm/controllers/fakes_test.go
@@ -61,6 +61,10 @@ func (f *fakeManagementCluster) TargetClusterEtcdIsHealthy(_ context.Context, _
return nil
}

func (f *fakeManagementCluster) TargetClusterRemoveMissingNodes(ctx context.Context, clusterKey client.ObjectKey) error {
return nil
}

type fakeWorkloadCluster struct {
*internal.Workload
Status internal.ClusterStatus
31 changes: 6 additions & 25 deletions controlplane/kubeadm/controllers/scale.go
@@ -49,15 +49,7 @@ func (r *KubeadmControlPlaneReconciler) initializeControlPlane(ctx context.Conte
func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, _ internal.FilterableMachineCollection, controlPlane *internal.ControlPlane) (ctrl.Result, error) {
logger := controlPlane.Logger()

if err := r.generalHealthCheck(ctx, cluster, kcp, controlPlane); err != nil {
return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter}
}

@@ -90,13 +82,16 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(
return ctrl.Result{}, errors.Wrapf(err, "failed to create client to workload cluster")
}

// Wait for any delete in progress to complete before deleting another Machine
if controlPlane.HasDeletingMachine() {
return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: deleteRequeueAfter}
}

// We intentionally run the health check here, not at the beginning of this method, to avoid blocking re-entrancy
if err := r.generalHealthCheck(ctx, cluster, kcp, controlPlane); err != nil {
return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter}
}

markedForDeletion := selectedMachines.Filter(machinefilters.HasAnnotationKey(controlplanev1.DeleteForScaleDownAnnotation))
if len(markedForDeletion) == 0 {
fd := controlPlane.FailureDomainWithMostMachines(selectedMachines)
@@ -117,13 +112,6 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(
return ctrl.Result{}, errors.New("failed to pick control plane Machine to delete")
}

// Ensure etcd is healthy prior to attempting to remove the member
if err := r.managementCluster.TargetClusterEtcdIsHealthy(ctx, util.ObjectKey(cluster), kcp.Name); err != nil {
logger.V(2).Info("Waiting for control plane to pass etcd health check before removing a control plane machine", "cause", err)
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy",
"Waiting for control plane to pass etcd health check before removing a control plane machine: %v", err)
return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter}
}
// If etcd leadership is on the machine that is about to be deleted, move it to the newest member available.
etcdLeaderCandidate := ownedMachines.Newest()
if err := workloadCluster.ForwardEtcdLeadership(ctx, machineToDelete, etcdLeaderCandidate); err != nil {
@@ -152,13 +140,6 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(
}
}

// Do a final health check of the Control Plane components prior to actually deleting the machine
if err := r.managementCluster.TargetClusterControlPlaneIsHealthy(ctx, util.ObjectKey(cluster), kcp.Name); err != nil {
logger.V(2).Info("Waiting for control plane to pass control plane health check before removing a control plane machine", "cause", err)
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy",
"Waiting for control plane to pass control plane health check before removing a control plane machine: %v", err)
return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter}
}
logger = logger.WithValues("machine", machineToDelete)
if err := r.Client.Delete(ctx, machineToDelete); err != nil && !apierrors.IsNotFound(err) {
logger.Error(err, "Failed to delete control plane machine")
11 changes: 11 additions & 0 deletions controlplane/kubeadm/internal/cluster.go
@@ -42,6 +42,7 @@ type ManagementCluster interface {
TargetClusterEtcdIsHealthy(ctx context.Context, clusterKey client.ObjectKey, controlPlaneName string) error
TargetClusterControlPlaneIsHealthy(ctx context.Context, clusterKey client.ObjectKey, controlPlaneName string) error
GetWorkloadCluster(ctx context.Context, clusterKey client.ObjectKey) (WorkloadCluster, error)
TargetClusterRemoveMissingNodes(ctx context.Context, clusterKey client.ObjectKey) error
}

// Management holds operations on the management cluster.
@@ -178,3 +179,13 @@ func (m *Management) TargetClusterEtcdIsHealthy(ctx context.Context, clusterKey
}
return m.healthCheck(ctx, cluster.EtcdIsHealthy, clusterKey, controlPlaneName)
}

// TargetClusterRemoveMissingNodes is meant to be called when the target cluster's etcd cluster is unhealthy.
// It checks whether any etcd members' nodes are missing from the target cluster and removes those members if so.
func (m *Management) TargetClusterRemoveMissingNodes(ctx context.Context, clusterKey client.ObjectKey) error {
cluster, err := m.GetWorkloadCluster(ctx, clusterKey)
if err != nil {
return err
}
return cluster.RemoveEtcdMissingNodes(ctx)
}
11 changes: 10 additions & 1 deletion controlplane/kubeadm/internal/workload_cluster.go
@@ -70,7 +70,11 @@ type WorkloadCluster interface {
UpdateCoreDNS(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane) error
RemoveEtcdMemberForMachine(ctx context.Context, machine *clusterv1.Machine) error
RemoveMachineFromKubeadmConfigMap(ctx context.Context, machine *clusterv1.Machine) error
RemoveNodeFromKubeadmConfigMap(ctx context.Context, nodeName string) error
ForwardEtcdLeadership(ctx context.Context, machine *clusterv1.Machine, leaderCandidate *clusterv1.Machine) error

// State recovery tasks.
RemoveEtcdMissingNodes(ctx context.Context) error
}

// Workload defines operations on workload clusters.
@@ -226,13 +230,18 @@ func (w *Workload) RemoveMachineFromKubeadmConfigMap(ctx context.Context, machin
return nil
}

return w.RemoveNodeFromKubeadmConfigMap(ctx, machine.Status.NodeRef.Name)
}

// RemoveNodeFromKubeadmConfigMap removes the entry for the node from the kubeadm configmap.
func (w *Workload) RemoveNodeFromKubeadmConfigMap(ctx context.Context, name string) error {
configMapKey := ctrlclient.ObjectKey{Name: kubeadmConfigKey, Namespace: metav1.NamespaceSystem}
kubeadmConfigMap, err := w.getConfigMap(ctx, configMapKey)
if err != nil {
return err
}
config := &kubeadmConfig{ConfigMap: kubeadmConfigMap}
if err := config.RemoveAPIEndpoint(name); err != nil {
return err
}
if err := w.Client.Update(ctx, config.ConfigMap); err != nil {
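For context on what RemoveNodeFromKubeadmConfigMap edits: the kubeadm-config ConfigMap in kube-system carries a ClusterStatus document whose apiEndpoints map is keyed by node name, and removing a node amounts to deleting its entry from that map. Below is a minimal standalone sketch of that edit, assuming the v1beta2-era ClusterStatus layout; removeAPIEndpoint and the sample node names are illustrative, not the actual cluster-api code.

package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// removeAPIEndpoint deletes a node's entry from a kubeadm ClusterStatus
// document and returns the re-serialized YAML.
func removeAPIEndpoint(clusterStatus, nodeName string) (string, error) {
	var status map[string]interface{}
	if err := yaml.Unmarshal([]byte(clusterStatus), &status); err != nil {
		return "", err
	}
	endpoints, ok := status["apiEndpoints"].(map[interface{}]interface{})
	if !ok {
		return "", fmt.Errorf("apiEndpoints not found in ClusterStatus")
	}
	delete(endpoints, nodeName)
	out, err := yaml.Marshal(status)
	return string(out), err
}

func main() {
	// Sample ClusterStatus with two made-up control plane node entries.
	clusterStatus := `apiVersion: kubeadm.k8s.io/v1beta2
kind: ClusterStatus
apiEndpoints:
  cp-node-0:
    advertiseAddress: 10.0.0.1
    bindPort: 6443
  cp-node-1:
    advertiseAddress: 10.0.0.2
    bindPort: 6443
`
	updated, err := removeAPIEndpoint(clusterStatus, "cp-node-1")
	if err != nil {
		panic(err)
	}
	fmt.Println(updated)
}
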
55 changes: 54 additions & 1 deletion controlplane/kubeadm/internal/workload_cluster_etcd.go
@@ -22,6 +22,7 @@ import (
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kerrors "k8s.io/apimachinery/pkg/util/errors"
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd"
etcdutil "sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd/util"
@@ -89,6 +90,7 @@ func (w *Workload) EtcdIsHealthy(ctx context.Context) (HealthCheckResult, error)
response[name] = errors.Wrap(err, "failed to list etcd members using etcd client")
continue
}

member := etcdutil.MemberForName(members, name)

// Check that the member reports no alarms.
@@ -130,6 +132,54 @@ func (w *Workload) EtcdIsHealthy(ctx context.Context) (HealthCheckResult, error)
return response, nil
}

// RemoveEtcdMissingNodes iterates over all etcd members and finds any whose nodes no longer belong to the workload cluster.
// If such members exist, it removes them from etcd and removes their node entries from the kubeadm config map so that kubeadm does not run etcd health checks against them.
func (w *Workload) RemoveEtcdMissingNodes(ctx context.Context) error {
controlPlaneNodes, err := w.getControlPlaneNodes(ctx)
if err != nil {
return err
}

errList := []error{}
for _, node := range controlPlaneNodes.Items {
name := node.Name

// Create the etcd Client for the etcd Pod scheduled on the Node
etcdClient, err := w.etcdClientGenerator.forNode(ctx, name)
if err != nil {
continue
}

// List etcd members. This checks that the member is healthy, because the request goes through consensus.
members, err := etcdClient.Members(ctx)
if err != nil {
continue
}
// Check whether any etcd member's node is missing from the workload cluster.
// If so, remove the member and its kubeadm config map entry on a best-effort basis.
for _, m := range members {
found := false
for _, n := range controlPlaneNodes.Items {
if m.Name == n.Name {
found = true
break
}
}
if found {
continue
}

if err := w.removeMemberForNode(ctx, m.Name); err != nil {
errList = append(errList, err)
}

if err := w.RemoveNodeFromKubeadmConfigMap(ctx, m.Name); err != nil {
errList = append(errList, err)
}
}
}
return kerrors.NewAggregate(errList)
}
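
The heart of RemoveEtcdMissingNodes is a set difference between etcd member names and control plane node names. Pulled out as a pure function, it is easy to exercise on its own; the helper and the sample names below are hypothetical, purely for illustration.

package main

import "fmt"

// staleMembers returns the etcd member names that have no matching
// control plane node in the workload cluster.
func staleMembers(memberNames, nodeNames []string) []string {
	nodes := make(map[string]struct{}, len(nodeNames))
	for _, n := range nodeNames {
		nodes[n] = struct{}{}
	}
	var stale []string
	for _, m := range memberNames {
		if _, ok := nodes[m]; !ok {
			stale = append(stale, m)
		}
	}
	return stale
}

func main() {
	members := []string{"cp-0", "cp-1", "cp-2"}
	nodes := []string{"cp-0", "cp-2"} // cp-1's machine was deleted manually
	fmt.Println(staleMembers(members, nodes)) // [cp-1]
}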

// UpdateEtcdVersionInKubeadmConfigMap sets the imageRepository or the imageTag or both in the kubeadm config map.
func (w *Workload) UpdateEtcdVersionInKubeadmConfigMap(ctx context.Context, imageRepository, imageTag string) error {
configMapKey := ctrlclient.ObjectKey{Name: kubeadmConfigKey, Namespace: metav1.NamespaceSystem}
@@ -155,7 +205,10 @@ func (w *Workload) RemoveEtcdMemberForMachine(ctx context.Context, machine *clus
// Nothing to do, no node for Machine
return nil
}
return w.removeMemberForNode(ctx, machine.Status.NodeRef.Name)
}

func (w *Workload) removeMemberForNode(ctx context.Context, name string) error {
// Pick a different node to talk to etcd
controlPlaneNodes, err := w.getControlPlaneNodes(ctx)
if err != nil {
@@ -174,7 +227,7 @@ func (w *Workload) RemoveEtcdMemberForMachine(ctx context.Context, machine *clus
if err != nil {
return errors.Wrap(err, "failed to list etcd members using etcd client")
}
member := etcdutil.MemberForName(members, name)

// The member has already been removed, return immediately
if member == nil {
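For reference, the refactored removeMemberForNode ultimately performs the standard etcd membership calls. Here is a hedged sketch using the upstream go.etcd.io/etcd/clientv3 client directly, whereas cluster-api goes through its own internal etcd wrapper; the endpoint is a placeholder and TLS configuration is omitted.

package main

import (
	"context"
	"fmt"
	"time"

	"go.etcd.io/etcd/clientv3"
)

// removeMemberByName looks up an etcd member by its name and removes it.
func removeMemberByName(ctx context.Context, cli *clientv3.Client, name string) error {
	resp, err := cli.MemberList(ctx)
	if err != nil {
		return fmt.Errorf("failed to list etcd members: %w", err)
	}
	for _, m := range resp.Members {
		if m.Name != name {
			continue
		}
		if _, err := cli.MemberRemove(ctx, m.ID); err != nil {
			return fmt.Errorf("failed to remove etcd member %q: %w", name, err)
		}
		return nil
	}
	// Member already gone; nothing to do.
	return nil
}

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"https://10.0.0.1:2379"}, // placeholder endpoint
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		panic(err)
	}
	defer cli.Close()

	if err := removeMemberByName(context.Background(), cli, "cp-1"); err != nil {
		fmt.Println(err)
	}
}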
