🏃[KCP] Recover from a manual machine deletion #2841

Merged
merged 2 commits on Apr 14, 2020

34 changes: 34 additions & 0 deletions controlplane/kubeadm/controllers/controller.go
@@ -320,3 +320,37 @@ func (r *KubeadmControlPlaneReconciler) ClusterToKubeadmControlPlane(o handler.M

return nil
}

// reconcileHealth performs health checks for control plane components and etcd.
// It removes any etcd members that do not have a corresponding node.
func (r *KubeadmControlPlaneReconciler) reconcileHealth(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, controlPlane *internal.ControlPlane) error {
logger := controlPlane.Logger()

// Do a health check of the Control Plane components
if err := r.managementCluster.TargetClusterControlPlaneIsHealthy(ctx, util.ObjectKey(cluster), kcp.Name); err != nil {
logger.V(2).Info("Waiting for control plane to pass control plane health check to continue reconciliation", "cause", err)
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy",
"Waiting for control plane to pass control plane health check to continue reconciliation: %v", err)
return &capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter}
}

// Ensure etcd is healthy
if err := r.managementCluster.TargetClusterEtcdIsHealthy(ctx, util.ObjectKey(cluster), kcp.Name); err != nil {
// If there are any etcd members that do not have corresponding nodes, remove them from etcd and from the kubeadm configmap.
// This will solve issues related to manual control-plane machine deletion.
workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster))
if err != nil {
return err
}
if err := workloadCluster.ReconcileEtcdMembers(ctx); err != nil {
logger.V(2).Info("Failed attempt to remove potential hanging etcd members to pass etcd health check to continue reconciliation", "cause", err)
}

logger.V(2).Info("Waiting for control plane to pass etcd health check to continue reconciliation", "cause", err)
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy",
"Waiting for control plane to pass etcd health check to continue reconciliation: %v", err)
return &capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter}
}

return nil
}
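
Note (illustrative, not part of this PR's diff): reconcileHealth signals an unhealthy control plane or etcd by returning a *capierrors.RequeueAfterError carrying healthCheckFailedRequeueAfter. The sketch below shows how a caller could unwrap that error into a controller-runtime requeue; the helper name handleHealthError and the use of the standard library's errors.As (imported here as stderrors) are assumptions for illustration only, not code from this repository.

// Sketch only: unwrap the RequeueAfterError produced by reconcileHealth.
// stderrors is the standard library "errors" package.
func handleHealthError(err error) (ctrl.Result, error) {
	var requeueErr *capierrors.RequeueAfterError
	if stderrors.As(err, &requeueErr) {
		// Back off and retry once the health checks can pass again.
		return ctrl.Result{RequeueAfter: requeueErr.RequeueAfter}, nil
	}
	return ctrl.Result{}, err
}

In this PR the callers in scale.go below simply check the returned error and return their own RequeueAfterError, so nothing inside the package needs to unwrap it.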
4 changes: 4 additions & 0 deletions controlplane/kubeadm/controllers/fakes_test.go
@@ -70,6 +70,10 @@ func (f fakeWorkloadCluster) ForwardEtcdLeadership(_ context.Context, _ *cluster
return nil
}

func (f fakeWorkloadCluster) ReconcileEtcdMembers(ctx context.Context) error {
return nil
}

func (f fakeWorkloadCluster) ClusterStatus(_ context.Context) (internal.ClusterStatus, error) {
return f.Status, nil
}
30 changes: 5 additions & 25 deletions controlplane/kubeadm/controllers/scale.go
@@ -49,15 +49,7 @@ func (r *KubeadmControlPlaneReconciler) initializeControlPlane(ctx context.Conte
func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, _ internal.FilterableMachineCollection, controlPlane *internal.ControlPlane) (ctrl.Result, error) {
logger := controlPlane.Logger()

if err := r.managementCluster.TargetClusterControlPlaneIsHealthy(ctx, util.ObjectKey(cluster), kcp.Name); err != nil {
logger.V(2).Info("Waiting for control plane to pass control plane health check before adding an additional control plane machine", "cause", err)
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy", "Waiting for control plane to pass control plane health check before adding additional control plane machine: %v", err)
return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter}
}

if err := r.managementCluster.TargetClusterEtcdIsHealthy(ctx, util.ObjectKey(cluster), kcp.Name); err != nil {
logger.V(2).Info("Waiting for control plane to pass etcd health check before adding an additional control plane machine", "cause", err)
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy", "Waiting for control plane to pass etcd health check before adding additional control plane machine: %v", err)
if err := r.reconcileHealth(ctx, cluster, kcp, controlPlane); err != nil {
return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter}
}

@@ -90,13 +82,15 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(
return ctrl.Result{}, errors.Wrapf(err, "failed to create client to workload cluster")
}

// We don't want to health check at the beginning of this method to avoid blocking re-entrancy

// Wait for any delete in progress to complete before deleting another Machine
if controlPlane.HasDeletingMachine() {
return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: deleteRequeueAfter}
}

if err := r.reconcileHealth(ctx, cluster, kcp, controlPlane); err != nil {
return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter}
}

// If there is not already a Machine that is marked for scale down, find one and mark it
markedForDeletion := selectedMachines.Filter(machinefilters.HasAnnotationKey(controlplanev1.DeleteForScaleDownAnnotation))
if len(markedForDeletion) == 0 {
@@ -113,13 +107,6 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(
return ctrl.Result{}, errors.New("failed to pick control plane Machine to delete")
}

// Ensure etcd is healthy prior to attempting to remove the member
if err := r.managementCluster.TargetClusterEtcdIsHealthy(ctx, util.ObjectKey(cluster), kcp.Name); err != nil {
logger.V(2).Info("Waiting for control plane to pass etcd health check before removing a control plane machine", "cause", err)
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy",
"Waiting for control plane to pass etcd health check before removing a control plane machine: %v", err)
return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter}
}
// If etcd leadership is on machine that is about to be deleted, move it to the newest member available.
etcdLeaderCandidate := ownedMachines.Newest()
if err := workloadCluster.ForwardEtcdLeadership(ctx, machineToDelete, etcdLeaderCandidate); err != nil {
@@ -148,13 +135,6 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(
}
}

// Do a final health check of the Control Plane components prior to actually deleting the machine
if err := r.managementCluster.TargetClusterControlPlaneIsHealthy(ctx, util.ObjectKey(cluster), kcp.Name); err != nil {
logger.V(2).Info("Waiting for control plane to pass control plane health check before removing a control plane machine", "cause", err)
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy",
"Waiting for control plane to pass control plane health check before removing a control plane machine: %v", err)
return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter}
}
logger = logger.WithValues("machine", machineToDelete)
if err := r.Client.Delete(ctx, machineToDelete); err != nil && !apierrors.IsNotFound(err) {
logger.Error(err, "Failed to delete control plane machine")
1 change: 1 addition & 0 deletions controlplane/kubeadm/controllers/scale_test.go
@@ -164,6 +164,7 @@ func TestKubeadmControlPlaneReconciler_scaleUpControlPlane(t *testing.T) {
}

ownedMachines := fmc.Machines.DeepCopy()

_, err := r.scaleUpControlPlane(context.Background(), cluster.DeepCopy(), kcp.DeepCopy(), ownedMachines, controlPlane)
g.Expect(err).To(HaveOccurred())
g.Expect(err).To(MatchError(&capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter}))
11 changes: 10 additions & 1 deletion controlplane/kubeadm/internal/workload_cluster.go
@@ -71,7 +71,11 @@ type WorkloadCluster interface {
UpdateCoreDNS(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane) error
RemoveEtcdMemberForMachine(ctx context.Context, machine *clusterv1.Machine) error
RemoveMachineFromKubeadmConfigMap(ctx context.Context, machine *clusterv1.Machine) error
RemoveNodeFromKubeadmConfigMap(ctx context.Context, nodeName string) error
ForwardEtcdLeadership(ctx context.Context, machine *clusterv1.Machine, leaderCandidate *clusterv1.Machine) error

// State recovery tasks.
ReconcileEtcdMembers(ctx context.Context) error
}

// Workload defines operations on workload clusters.
@@ -227,13 +231,18 @@ func (w *Workload) RemoveMachineFromKubeadmConfigMap(ctx context.Context, machin
return nil
}

return w.RemoveNodeFromKubeadmConfigMap(ctx, machine.Status.NodeRef.Name)
}

// RemoveNodeFromKubeadmConfigMap removes the entry for the node from the kubeadm configmap.
func (w *Workload) RemoveNodeFromKubeadmConfigMap(ctx context.Context, name string) error {
configMapKey := ctrlclient.ObjectKey{Name: kubeadmConfigKey, Namespace: metav1.NamespaceSystem}
kubeadmConfigMap, err := w.getConfigMap(ctx, configMapKey)
if err != nil {
return err
}
config := &kubeadmConfig{ConfigMap: kubeadmConfigMap}
if err := config.RemoveAPIEndpoint(machine.Status.NodeRef.Name); err != nil {
if err := config.RemoveAPIEndpoint(name); err != nil {
return err
}
if err := w.Client.Update(ctx, config.ConfigMap); err != nil {
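
Note (illustrative, not part of this PR's diff): the refactor above keeps the Machine-based helper as a thin wrapper and moves the actual ConfigMap edit into the new node-name-based RemoveNodeFromKubeadmConfigMap, because ReconcileEtcdMembers (added in workload_cluster_etcd.go below) only has an etcd member name to work with once the Machine has been deleted manually. A hypothetical caller-side sketch, with the function name and the "orphaned-node" value invented for illustration:

// Sketch only: pick the appropriate helper depending on what the caller holds.
func cleanupKubeadmConfigEntry(ctx context.Context, w *Workload, machine *clusterv1.Machine) error {
	if machine != nil && machine.Status.NodeRef != nil {
		// A Machine with a NodeRef: delegate via the Machine-based wrapper.
		return w.RemoveMachineFromKubeadmConfigMap(ctx, machine)
	}
	// No usable Machine (e.g. after a manual deletion): remove by node name.
	return w.RemoveNodeFromKubeadmConfigMap(ctx, "orphaned-node") // placeholder name
}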
55 changes: 54 additions & 1 deletion controlplane/kubeadm/internal/workload_cluster_etcd.go
@@ -22,6 +22,7 @@ import (
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kerrors "k8s.io/apimachinery/pkg/util/errors"
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd"
etcdutil "sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/etcd/util"
@@ -89,6 +90,7 @@ func (w *Workload) EtcdIsHealthy(ctx context.Context) (HealthCheckResult, error)
response[name] = errors.Wrap(err, "failed to list etcd members using etcd client")
continue
}

member := etcdutil.MemberForName(members, name)

// Check that the member reports no alarms.
@@ -130,6 +132,54 @@ func (w *Workload) EtcdIsHealthy(ctx context.Context) (HealthCheckResult, error)
return response, nil
}

// ReconcileEtcdMembers iterates over all etcd members and finds members that do not have corresponding nodes.
// If there are any such members, it deletes them from etcd and removes their nodes from the kubeadm configmap so that kubeadm does not run etcd health checks on them.
func (w *Workload) ReconcileEtcdMembers(ctx context.Context) error {
controlPlaneNodes, err := w.getControlPlaneNodes(ctx)
if err != nil {
return err
}

errs := []error{}
for _, node := range controlPlaneNodes.Items {
name := node.Name

// Create the etcd Client for the etcd Pod scheduled on the Node
etcdClient, err := w.etcdClientGenerator.forNode(ctx, name)
if err != nil {
continue
}

members, err := etcdClient.Members(ctx)
if err != nil {
continue
}
// Check whether any etcd member's node is missing from the workload cluster.
// If so, remove that member on a best-effort basis.
for _, member := range members {
isFound := false
for _, node := range controlPlaneNodes.Items {
if member.Name == node.Name {
isFound = true
break
}
}
// Stop here if we found the member to be in the list of control plane nodes.
if isFound {
continue
}
if err := w.removeMemberForNode(ctx, member.Name); err != nil {
errs = append(errs, err)
}

if err := w.RemoveNodeFromKubeadmConfigMap(ctx, member.Name); err != nil {
errs = append(errs, err)
}
}
}
return kerrors.NewAggregate(errs)
}

// UpdateEtcdVersionInKubeadmConfigMap sets the imageRepository or the imageTag or both in the kubeadm config map.
func (w *Workload) UpdateEtcdVersionInKubeadmConfigMap(ctx context.Context, imageRepository, imageTag string) error {
configMapKey := ctrlclient.ObjectKey{Name: kubeadmConfigKey, Namespace: metav1.NamespaceSystem}
@@ -155,7 +205,10 @@ func (w *Workload) RemoveEtcdMemberForMachine(ctx context.Context, machine *clus
// Nothing to do, no node for Machine
return nil
}
return w.removeMemberForNode(ctx, machine.Status.NodeRef.Name)
}

func (w *Workload) removeMemberForNode(ctx context.Context, name string) error {
// Pick a different node to talk to etcd
controlPlaneNodes, err := w.getControlPlaneNodes(ctx)
if err != nil {
@@ -174,7 +227,7 @@ func (w *Workload) RemoveEtcdMemberForMachine(ctx context.Context, machine *clus
if err != nil {
return errors.Wrap(err, "failed to list etcd members using etcd client")
}
member := etcdutil.MemberForName(members, machine.Status.NodeRef.Name)
member := etcdutil.MemberForName(members, name)

// The member has already been removed, return immediately
if member == nil {
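
Note (illustrative, not part of this PR's diff): ReconcileEtcdMembers is deliberately best-effort: failures to remove an individual member or its kubeadm ConfigMap entry are collected and returned as one aggregate via kerrors.NewAggregate, which yields nil for an empty slice, and reconcileHealth in controller.go only logs that aggregate before requeuing on the etcd health check, so a failed cleanup is simply retried on the next reconciliation. A condensed sketch of the aggregation pattern, with hypothetical names:

// Sketch only: run independent cleanup steps, collect failures, report one aggregate.
// kerrors.NewAggregate returns nil when errs is empty, i.e. when every step succeeded.
func runBestEffort(steps []func() error) error {
	errs := []error{}
	for _, step := range steps {
		if err := step(); err != nil {
			errs = append(errs, err)
		}
	}
	return kerrors.NewAggregate(errs)
}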
40 changes: 38 additions & 2 deletions test/infrastructure/docker/e2e/docker_test.go
@@ -46,7 +46,7 @@ var _ = Describe("Docker Create", func() {
mgmtClient ctrlclient.Client
cluster *clusterv1.Cluster
)
SetDefaultEventuallyTimeout(10 * time.Minute)
SetDefaultEventuallyTimeout(15 * time.Minute)
SetDefaultEventuallyPollingInterval(10 * time.Second)

AfterEach(func() {
@@ -137,7 +137,7 @@ var _ = Describe("Docker Create", func() {
Cluster: cluster,
ControlPlane: controlPlane,
}
framework.WaitForOneKubeadmControlPlaneMachineToExist(ctx, waitForOneKubeadmControlPlaneMachineToExistInput, "5m")
framework.WaitForOneKubeadmControlPlaneMachineToExist(ctx, waitForOneKubeadmControlPlaneMachineToExistInput, "15m")

// Install a networking solution on the workload cluster
workloadClient, err := mgmt.GetWorkloadClient(ctx, cluster.Namespace, cluster.Name)
@@ -194,6 +194,42 @@ var _ = Describe("Docker Create", func() {
},
}
framework.AssertControlPlaneFailureDomains(ctx, assertControlPlaneFailureDomainInput)

Describe("Docker recover from manual workload machine deletion", func() {
By("cleaning up etcd members and kubeadm configMap")
inClustersNamespaceListOption := ctrlclient.InNamespace(cluster.Namespace)
// ControlPlane labels
matchClusterListOption := ctrlclient.MatchingLabels{
clusterv1.MachineControlPlaneLabelName: "",
clusterv1.ClusterLabelName: cluster.Name,
}

machineList := &clusterv1.MachineList{}
err = mgmtClient.List(ctx, machineList, inClustersNamespaceListOption, matchClusterListOption)
Expect(err).ToNot(HaveOccurred())
Expect(machineList.Items).To(HaveLen(int(*controlPlane.Spec.Replicas)))

Expect(mgmtClient.Delete(ctx, &machineList.Items[0])).To(Succeed())

Eventually(func() (int, error) {
machineList := &clusterv1.MachineList{}
if err := mgmtClient.List(ctx, machineList, inClustersNamespaceListOption, matchClusterListOption); err != nil {
fmt.Println(err)
return 0, err
}
return len(machineList.Items), nil
}, "15m", "5s").Should(Equal(int(*controlPlane.Spec.Replicas) - 1))

By("ensuring a replacement machine is created")
Eventually(func() (int, error) {
machineList := &clusterv1.MachineList{}
if err := mgmtClient.List(ctx, machineList, inClustersNamespaceListOption, matchClusterListOption); err != nil {
fmt.Println(err)
return 0, err
}
return len(machineList.Items), nil
}, "15m", "30s").Should(Equal(int(*controlPlane.Spec.Replicas)))
})
})

})
12 changes: 6 additions & 6 deletions test/infrastructure/docker/e2e/docker_upgrade_test.go
@@ -51,7 +51,7 @@ var _ = Describe("Docker Upgrade", func() {
cluster *clusterv1.Cluster
controlPlane *controlplanev1.KubeadmControlPlane
)
SetDefaultEventuallyTimeout(10 * time.Minute)
SetDefaultEventuallyTimeout(15 * time.Minute)
SetDefaultEventuallyPollingInterval(10 * time.Second)

BeforeEach(func() {
@@ -97,7 +97,7 @@ var _ = Describe("Docker Upgrade", func() {
Cluster: cluster,
ControlPlane: controlPlane,
}
framework.WaitForOneKubeadmControlPlaneMachineToExist(ctx, waitForOneKubeadmControlPlaneMachineToExistInput, "5m")
framework.WaitForOneKubeadmControlPlaneMachineToExist(ctx, waitForOneKubeadmControlPlaneMachineToExistInput, "15m")

// Install a networking solution on the workload cluster
workloadClient, err := mgmt.GetWorkloadClient(ctx, cluster.Namespace, cluster.Name)
@@ -116,7 +116,7 @@ var _ = Describe("Docker Upgrade", func() {
Cluster: cluster,
ControlPlane: controlPlane,
}
framework.WaitForKubeadmControlPlaneMachinesToExist(ctx, assertKubeadmControlPlaneNodesExistInput, "10m", "10s")
framework.WaitForKubeadmControlPlaneMachinesToExist(ctx, assertKubeadmControlPlaneNodesExistInput, "15m", "10s")

// Create the workload nodes
createMachineDeploymentinput := framework.CreateMachineDeploymentInput{
@@ -228,7 +228,7 @@ var _ = Describe("Docker Upgrade", func() {
return 0, errors.New("old nodes remain")
}
return upgraded, nil
}, "10m", "30s").Should(Equal(int(*controlPlane.Spec.Replicas)))
}, "15m", "30s").Should(Equal(int(*controlPlane.Spec.Replicas)))

workloadClient, err := mgmt.GetWorkloadClient(ctx, cluster.Namespace, cluster.Name)
Expect(err).ToNot(HaveOccurred())
@@ -245,7 +245,7 @@ var _ = Describe("Docker Upgrade", func() {
}

return false, nil
}, "10m", "30s").Should(BeTrue())
}, "15m", "30s").Should(BeTrue())

By("ensuring CoreDNS has the correct image")
Eventually(func() (bool, error) {
@@ -259,7 +259,7 @@ var _ = Describe("Docker Upgrade", func() {
}

return false, nil
}, "10m", "30s").Should(BeTrue())
}, "15m", "30s").Should(BeTrue())

// Before patching ensure all pods are ready in workload cluster
// Might not need this step any more.