From a854877112f410d2ccca76b0455bcfaccf980e20 Mon Sep 17 00:00:00 2001 From: Ben Moss Date: Fri, 26 Jun 2020 21:39:27 +0000 Subject: [PATCH 1/7] KCP handles MHC remediation Do not remediate unless: - we have at least 3 machines - etcd quorum will be preserved - we have sufficient replicas (don't need to scale up) --- controllers/machinehealthcheck_targets.go | 4 +- .../kubeadm/controllers/controller.go | 61 +++++++++++++++++ controlplane/kubeadm/controllers/scale.go | 9 --- .../kubeadm/controllers/scale_test.go | 67 +------------------ .../kubeadm/controllers/upgrade_test.go | 9 --- .../kubeadm/internal/control_plane.go | 18 +++++ .../kubeadm/internal/control_plane_test.go | 31 +++++++++ .../machinefilters/machine_filters.go | 17 +++++ .../kubeadm/internal/workload_cluster.go | 1 + .../kubeadm/internal/workload_cluster_etcd.go | 34 ++++++++++ .../internal/workload_cluster_etcd_test.go | 24 +++++++ 11 files changed, 189 insertions(+), 86 deletions(-) diff --git a/controllers/machinehealthcheck_targets.go b/controllers/machinehealthcheck_targets.go index dec495adafee..416b10f0315e 100644 --- a/controllers/machinehealthcheck_targets.go +++ b/controllers/machinehealthcheck_targets.go @@ -96,13 +96,13 @@ func (t *healthCheckTarget) needsRemediation(logger logr.Logger, timeoutForMachi now := time.Now() if t.Machine.Status.FailureReason != nil { - conditions.MarkFalse(t.Machine, clusterv1.MachineHealthCheckSuccededCondition, clusterv1.MachineHasFailure, clusterv1.ConditionSeverityWarning, "FailureReason: %v", t.Machine.Status.FailureReason) + conditions.MarkFalse(t.Machine, clusterv1.MachineHealthCheckSuccededCondition, clusterv1.MachineHasFailure, clusterv1.ConditionSeverityWarning, "FailureReason: %v", *t.Machine.Status.FailureReason) logger.V(3).Info("Target is unhealthy", "failureReason", t.Machine.Status.FailureReason) return true, time.Duration(0) } if t.Machine.Status.FailureMessage != nil { - conditions.MarkFalse(t.Machine, clusterv1.MachineHealthCheckSuccededCondition, clusterv1.MachineHasFailure, clusterv1.ConditionSeverityWarning, "FailureMessage: %v", t.Machine.Status.FailureMessage) + conditions.MarkFalse(t.Machine, clusterv1.MachineHealthCheckSuccededCondition, clusterv1.MachineHasFailure, clusterv1.ConditionSeverityWarning, "FailureMessage: %v", *t.Machine.Status.FailureMessage) logger.V(3).Info("Target is unhealthy", "failureMessage", t.Machine.Status.FailureMessage) return true, time.Duration(0) } diff --git a/controlplane/kubeadm/controllers/controller.go b/controlplane/kubeadm/controllers/controller.go index d8fb0ac10a71..ee757bbc0e2b 100644 --- a/controlplane/kubeadm/controllers/controller.go +++ b/controlplane/kubeadm/controllers/controller.go @@ -292,6 +292,14 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster * // source ref (reason@machine/name) so the problem can be easily tracked down to its source machine. conditions.SetAggregate(controlPlane.KCP, controlplanev1.MachinesReadyCondition, ownedMachines.ConditionGetters(), conditions.AddSourceRef()) + if result, err := r.remediateUnhealthy(ctx, logger, controlPlane); err != nil || !result.IsZero() { + return ctrl.Result{}, err + } + + if result, err := r.reconcileHealth(ctx, cluster, kcp, controlPlane); err != nil || !result.IsZero() { + return result, err + } + // Control plane machines rollout due to configuration changes (e.g. upgrades) takes precedence over other operations. 
needRollout := controlPlane.MachinesNeedingRollout() switch { @@ -425,6 +433,10 @@ func (r *KubeadmControlPlaneReconciler) ClusterToKubeadmControlPlane(o handler.M // It removes any etcd members that do not have a corresponding node. // Also, as a final step, checks if there is any machines that is being deleted. func (r *KubeadmControlPlaneReconciler) reconcileHealth(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, controlPlane *internal.ControlPlane) (ctrl.Result, error) { + if controlPlane.Machines.Len() == 0 { + return ctrl.Result{}, nil + } + logger := controlPlane.Logger() // Do a health check of the Control Plane components @@ -574,3 +586,52 @@ func (r *KubeadmControlPlaneReconciler) adoptOwnedSecrets(ctx context.Context, k return nil } + +func (r *KubeadmControlPlaneReconciler) remediateUnhealthy(ctx context.Context, logger logr.Logger, controlPlane *internal.ControlPlane) (ctrl.Result, error) { + if !controlPlane.RemediationAllowed() || !controlPlane.HasUnhealthyMachine() || controlPlane.Machines.Len() < int(*controlPlane.KCP.Spec.Replicas) { + return ctrl.Result{}, nil + } + + if controlPlane.HasDeletingMachine() { + return ctrl.Result{RequeueAfter: deleteRequeueAfter}, nil + } + + workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(controlPlane.Cluster)) + if err != nil { + return ctrl.Result{}, errors.Wrap(err, "failed to create workload cluster client") + } + + etcdStatus, err := workloadCluster.EtcdStatus(ctx) + if err != nil { + return ctrl.Result{}, errors.Wrap(err, "failed to fetch etcd status") + } + + if etcdStatus.FailureTolerance() == 0 { + logger.Info("refusing to remediate unhealthy machines, cluster has no failure tolerance") + return ctrl.Result{}, nil + } + + machine := controlPlane.UnhealthyMachines().Oldest() + + logger.Info("remediating unhealthy machine", + "machine", machine.Name, + "reason", conditions.GetReason(machine, clusterv1.MachineHealthCheckSuccededCondition), + "message", conditions.GetMessage(machine, clusterv1.MachineHealthCheckSuccededCondition), + ) + + if err := r.Client.Delete(ctx, machine); err != nil { + return ctrl.Result{}, errors.Wrap(err, "failed to delete unhealthy machine") + } + + patchHelper, err := patch.NewHelper(machine, r.Client) + if err != nil { + return ctrl.Result{}, errors.Wrap(err, "failed to initialize patch helper") + } + + conditions.MarkTrue(machine, clusterv1.MachineOwnerRemediatedCondition) + if err := patchHelper.Patch(ctx, machine); err != nil { + return ctrl.Result{}, errors.Wrap(err, "failed to patch unhealthy machine") + } + + return ctrl.Result{RequeueAfter: deleteRequeueAfter}, nil +} diff --git a/controlplane/kubeadm/controllers/scale.go b/controlplane/kubeadm/controllers/scale.go index 03b8184edc09..b54d3c25c40b 100644 --- a/controlplane/kubeadm/controllers/scale.go +++ b/controlplane/kubeadm/controllers/scale.go @@ -63,11 +63,6 @@ func (r *KubeadmControlPlaneReconciler) initializeControlPlane(ctx context.Conte func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, controlPlane *internal.ControlPlane) (ctrl.Result, error) { logger := controlPlane.Logger() - // reconcileHealth returns err if there is a machine being delete which is a required condition to check before scaling up - if result, err := r.reconcileHealth(ctx, cluster, kcp, controlPlane); err != nil || !result.IsZero() { - return result, err - } - // Create the bootstrap configuration 
bootstrapSpec := controlPlane.JoinControlPlaneConfig() fd := controlPlane.NextFailureDomainForScaleUp() @@ -90,10 +85,6 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane( ) (ctrl.Result, error) { logger := controlPlane.Logger() - if result, err := r.reconcileHealth(ctx, cluster, kcp, controlPlane); err != nil || !result.IsZero() { - return result, err - } - workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster)) if err != nil { logger.Error(err, "Failed to create client to workload cluster") diff --git a/controlplane/kubeadm/controllers/scale_test.go b/controlplane/kubeadm/controllers/scale_test.go index dd29817a1c08..cc6609d7addc 100644 --- a/controlplane/kubeadm/controllers/scale_test.go +++ b/controlplane/kubeadm/controllers/scale_test.go @@ -81,7 +81,7 @@ func TestKubeadmControlPlaneReconciler_initializeControlPlane(t *testing.T) { } func TestKubeadmControlPlaneReconciler_scaleUpControlPlane(t *testing.T) { - t.Run("creates a control plane Machine if health checks pass", func(t *testing.T) { + t.Run("creates a control plane Machine", func(t *testing.T) { g := NewWithT(t) cluster, kcp, genericMachineTemplate := createClusterWithControlPlane() @@ -122,71 +122,6 @@ func TestKubeadmControlPlaneReconciler_scaleUpControlPlane(t *testing.T) { g.Expect(fakeClient.List(context.Background(), &controlPlaneMachines)).To(Succeed()) g.Expect(controlPlaneMachines.Items).To(HaveLen(3)) }) - t.Run("does not create a control plane Machine if health checks fail", func(t *testing.T) { - cluster, kcp, genericMachineTemplate := createClusterWithControlPlane() - initObjs := []runtime.Object{cluster.DeepCopy(), kcp.DeepCopy(), genericMachineTemplate.DeepCopy()} - - beforeMachines := internal.NewFilterableMachineCollection() - for i := 0; i < 2; i++ { - m, _ := createMachineNodePair(fmt.Sprintf("test-%d", i), cluster.DeepCopy(), kcp.DeepCopy(), true) - beforeMachines.Insert(m) - initObjs = append(initObjs, m.DeepCopy()) - } - - testCases := []struct { - name string - etcdUnHealthy bool - controlPlaneUnHealthy bool - }{ - { - name: "etcd health check fails", - etcdUnHealthy: true, - }, - { - name: "controlplane component health check fails", - controlPlaneUnHealthy: true, - }, - } - for _, tc := range testCases { - g := NewWithT(t) - - fakeClient := newFakeClient(g, initObjs...) 
- fmc := &fakeManagementCluster{ - Machines: beforeMachines.DeepCopy(), - ControlPlaneHealthy: !tc.controlPlaneUnHealthy, - EtcdHealthy: !tc.etcdUnHealthy, - } - - r := &KubeadmControlPlaneReconciler{ - Client: fakeClient, - managementCluster: fmc, - managementClusterUncached: fmc, - Log: log.Log, - recorder: record.NewFakeRecorder(32), - } - controlPlane := &internal.ControlPlane{ - KCP: kcp, - Cluster: cluster, - Machines: beforeMachines, - } - - result, err := r.scaleUpControlPlane(context.Background(), cluster.DeepCopy(), kcp.DeepCopy(), controlPlane) - g.Expect(result).To(Equal(ctrl.Result{RequeueAfter: healthCheckFailedRequeueAfter})) - g.Expect(err).To(BeNil()) - - controlPlaneMachines := &clusterv1.MachineList{} - g.Expect(fakeClient.List(context.Background(), controlPlaneMachines)).To(Succeed()) - g.Expect(controlPlaneMachines.Items).To(HaveLen(len(beforeMachines))) - - endMachines := internal.NewFilterableMachineCollectionFromMachineList(controlPlaneMachines) - for _, m := range endMachines { - bm, ok := beforeMachines[m.Name] - bm.SetResourceVersion("1") - g.Expect(ok).To(BeTrue()) - g.Expect(m).To(Equal(bm)) - } - } - }) } func TestKubeadmControlPlaneReconciler_scaleDownControlPlane_NoError(t *testing.T) { diff --git a/controlplane/kubeadm/controllers/upgrade_test.go b/controlplane/kubeadm/controllers/upgrade_test.go index 53596c317188..1d9d21f43439 100644 --- a/controlplane/kubeadm/controllers/upgrade_test.go +++ b/controlplane/kubeadm/controllers/upgrade_test.go @@ -86,15 +86,6 @@ func TestKubeadmControlPlaneReconciler_upgradeControlPlane(t *testing.T) { bothMachines := &clusterv1.MachineList{} g.Expect(fakeClient.List(context.Background(), bothMachines, client.InNamespace(cluster.Namespace))).To(Succeed()) g.Expect(bothMachines.Items).To(HaveLen(2)) - - // run upgrade a second time, simulate that the node has not appeared yet but the machine exists - r.managementCluster.(*fakeManagementCluster).ControlPlaneHealthy = false - result, err = r.upgradeControlPlane(context.Background(), cluster, kcp, controlPlane, needingUpgrade) - g.Expect(result).To(Equal(ctrl.Result{RequeueAfter: healthCheckFailedRequeueAfter})) - g.Expect(err).To(BeNil()) - g.Expect(fakeClient.List(context.Background(), bothMachines, client.InNamespace(cluster.Namespace))).To(Succeed()) - g.Expect(bothMachines.Items).To(HaveLen(2)) - controlPlane.Machines = internal.NewFilterableMachineCollectionFromMachineList(bothMachines) // manually increase number of nodes, make control plane healthy again diff --git a/controlplane/kubeadm/internal/control_plane.go b/controlplane/kubeadm/internal/control_plane.go index 3c4cb1d73b6f..8023b0bce2ac 100644 --- a/controlplane/kubeadm/internal/control_plane.go +++ b/controlplane/kubeadm/internal/control_plane.go @@ -34,6 +34,9 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) +// MachineHealthCheck remediation is only supported on clusters with >= 3 machines to avoid disrupting etcd consensus +const minimumClusterSizeForRemediation = 3 + // ControlPlane holds business logic around control planes. // It should never need to connect to a service, that responsibility lies outside of this struct. // Going forward we should be trying to add more logic to here and reduce the amount of logic in the reconciler. @@ -244,6 +247,21 @@ func (c *ControlPlane) UpToDateMachines() FilterableMachineCollection { return c.Machines.Difference(c.MachinesNeedingRollout()) } +// RemediationAllowed returns whether the cluster is large enough to support MHC remediation. 
+// Clusters < minimumClusterSizeForRemediation do not have sufficient etcd failure tolerance. +func (c *ControlPlane) RemediationAllowed() bool { + return c.Machines.Len() >= minimumClusterSizeForRemediation +} + +// UnhealthyMachines returns the machines that need remediation. +func (c *ControlPlane) UnhealthyMachines() FilterableMachineCollection { + return c.Machines.Filter(machinefilters.NeedsRemediation) +} + +func (c *ControlPlane) HasUnhealthyMachine() bool { + return c.UnhealthyMachines().Len() > 0 +} + // getInfraResources fetches the external infrastructure resource for each machine in the collection and returns a map of machine.Name -> infraResource. func getInfraResources(ctx context.Context, cl client.Client, machines FilterableMachineCollection) (map[string]*unstructured.Unstructured, error) { result := map[string]*unstructured.Unstructured{} diff --git a/controlplane/kubeadm/internal/control_plane_test.go b/controlplane/kubeadm/internal/control_plane_test.go index 04915542a5f5..1edfce6a1a54 100644 --- a/controlplane/kubeadm/internal/control_plane_test.go +++ b/controlplane/kubeadm/internal/control_plane_test.go @@ -29,6 +29,7 @@ import ( clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3" bootstrapv1 "sigs.k8s.io/cluster-api/bootstrap/kubeadm/api/v1alpha3" controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1alpha3" + "sigs.k8s.io/cluster-api/util/conditions" ) func TestControlPlane(t *testing.T) { @@ -106,6 +107,18 @@ var _ = Describe("Control Plane", func() { }) }) +func TestUnhealthyMachines(t *testing.T) { + g := NewWithT(t) + kcp := &controlplanev1.KubeadmControlPlane{} + cp := &ControlPlane{KCP: kcp} + cp.Machines = NewFilterableMachineCollection( + machine("1", withNeedsRemediationCondition()), + machine("2", withNodeRef(), withInfrastructureReady()), + machine("3"), + ) + g.Expect(cp.UnhealthyMachines().Names()).To(ConsistOf("1")) +} + func failureDomain(controlPlane bool) clusterv1.FailureDomainSpec { return clusterv1.FailureDomainSpec{ ControlPlane: controlPlane, @@ -117,3 +130,21 @@ func withFailureDomain(fd string) machineOpt { m.Spec.FailureDomain = &fd } } + +func withNeedsRemediationCondition() machineOpt { + return func(m *clusterv1.Machine) { + conditions.MarkFalse(m, clusterv1.MachineOwnerRemediatedCondition, "some reason", "some severity", "") + } +} + +func withNodeRef() machineOpt { + return func(m *clusterv1.Machine) { + m.Status.NodeRef = &corev1.ObjectReference{} + } +} + +func withInfrastructureReady() machineOpt { + return func(m *clusterv1.Machine) { + m.Status.InfrastructureReady = true + } +} diff --git a/controlplane/kubeadm/internal/machinefilters/machine_filters.go b/controlplane/kubeadm/internal/machinefilters/machine_filters.go index a54fc3e9fda5..238e85056c90 100644 --- a/controlplane/kubeadm/internal/machinefilters/machine_filters.go +++ b/controlplane/kubeadm/internal/machinefilters/machine_filters.go @@ -387,3 +387,20 @@ func cleanupConfigFields(kcpConfig *bootstrapv1.KubeadmConfigSpec, machineConfig machineConfig.Spec.JoinConfiguration.TypeMeta = kcpConfig.JoinConfiguration.TypeMeta } } + +// NeedsRemediation returns whether the machine has the +// MachineOwnerRemediatedCondition set to false. +func NeedsRemediation(m *clusterv1.Machine) bool { + return conditions.IsFalse(m, clusterv1.MachineOwnerRemediatedCondition) +} + +// IsProvisioning returns whether the machine is missing its NodeRef or does +// not have InfrastructureReady set to true. 
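+// Such a machine is still coming up and may not yet be correlatable to a Node or an etcd member.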
+func IsProvisioning(m *clusterv1.Machine) bool { + return m.Status.NodeRef == nil || !m.Status.InfrastructureReady +} + +// IsFailed returns whether the machine has a FailureMessage or a FailureReason. +func IsFailed(m *clusterv1.Machine) bool { + return m.Status.FailureMessage != nil || m.Status.FailureReason != nil +} diff --git a/controlplane/kubeadm/internal/workload_cluster.go b/controlplane/kubeadm/internal/workload_cluster.go index 2a1a55a73af2..f35110fd183f 100644 --- a/controlplane/kubeadm/internal/workload_cluster.go +++ b/controlplane/kubeadm/internal/workload_cluster.go @@ -59,6 +59,7 @@ type WorkloadCluster interface { ClusterStatus(ctx context.Context) (ClusterStatus, error) ControlPlaneIsHealthy(ctx context.Context) (HealthCheckResult, error) EtcdIsHealthy(ctx context.Context) (HealthCheckResult, error) + EtcdStatus(ctx context.Context) (EtcdStatus, error) // Upgrade related tasks. ReconcileKubeletRBACBinding(ctx context.Context, version semver.Version) error diff --git a/controlplane/kubeadm/internal/workload_cluster_etcd.go b/controlplane/kubeadm/internal/workload_cluster_etcd.go index d76934c6c8e2..172372e2ce22 100644 --- a/controlplane/kubeadm/internal/workload_cluster_etcd.go +++ b/controlplane/kubeadm/internal/workload_cluster_etcd.go @@ -289,3 +289,37 @@ func (w *Workload) ForwardEtcdLeadership(ctx context.Context, machine *clusterv1 } return nil } + +// EtcdStatus returns the current status of the etcd cluster +func (w *Workload) EtcdStatus(ctx context.Context) (EtcdStatus, error) { + nodes, err := w.getControlPlaneNodes(ctx) + if err != nil { + return EtcdStatus{}, errors.Wrap(err, "failed to list control plane nodes") + } + + etcdClient, err := w.etcdClientGenerator.forNodes(ctx, nodes.Items) + if err != nil { + return EtcdStatus{}, errors.Wrap(err, "failed to create etcd client") + } + + members, err := etcdClient.Members(ctx) + if err != nil { + return EtcdStatus{}, errors.Wrap(err, "failed to list etcd members using etcd client") + } + + return EtcdStatus{ + Members: members, + }, nil +} + +// EtcdStatus is a snapshot of the etcd cluster's status +type EtcdStatus struct { + Members []*etcd.Member +} + +// FailureTolerance is the amount of members the etcd cluster can afford to +// lose without losing quorum. 
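+// For example, a 3-member cluster tolerates the loss of 1 member, a 5-member cluster 2, and a 7-member cluster 3.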
+// Ref: https://github.com/etcd-io/etcd/blob/master/Documentation/faq.md#what-is-failure-tolerance +func (e *EtcdStatus) FailureTolerance() int { + return len(e.Members) - (len(e.Members)/2.0 + 1) +} diff --git a/controlplane/kubeadm/internal/workload_cluster_etcd_test.go b/controlplane/kubeadm/internal/workload_cluster_etcd_test.go index e2b9b9409e90..49438238c763 100644 --- a/controlplane/kubeadm/internal/workload_cluster_etcd_test.go +++ b/controlplane/kubeadm/internal/workload_cluster_etcd_test.go @@ -563,3 +563,27 @@ func defaultMachine(transforms ...func(m *clusterv1.Machine)) *clusterv1.Machine } return m } + +func TestEtcdStatusFailureTolerance(t *testing.T) { + g := NewWithT(t) + status := EtcdStatus{ + Members: []*etcd.Member{ + { + Name: "1", + }, + }, + } + g.Expect(status.FailureTolerance()).To(Equal(0)) + status.Members = append(status.Members, &etcd.Member{Name: "2"}) + g.Expect(status.FailureTolerance()).To(Equal(0)) + status.Members = append(status.Members, &etcd.Member{Name: "3"}) + g.Expect(status.FailureTolerance()).To(Equal(1)) + status.Members = append(status.Members, &etcd.Member{Name: "4"}) + g.Expect(status.FailureTolerance()).To(Equal(1)) + status.Members = append(status.Members, &etcd.Member{Name: "5"}) + g.Expect(status.FailureTolerance()).To(Equal(2)) + status.Members = append(status.Members, &etcd.Member{Name: "6"}) + g.Expect(status.FailureTolerance()).To(Equal(2)) + status.Members = append(status.Members, &etcd.Member{Name: "7"}) + g.Expect(status.FailureTolerance()).To(Equal(3)) +} From 56be5ed66376eaec72139f9d0a31c3db8b4156d8 Mon Sep 17 00:00:00 2001 From: Ben Moss Date: Mon, 29 Jun 2020 19:18:14 +0000 Subject: [PATCH 2/7] Special-case remediating a machine with unresponsive etcd --- .../kubeadm/controllers/controller.go | 10 +-- controlplane/kubeadm/internal/cluster.go | 3 + .../kubeadm/internal/workload_cluster.go | 2 + .../kubeadm/internal/workload_cluster_etcd.go | 44 ++++++++++- .../internal/workload_cluster_etcd_test.go | 79 +++++++++++++++---- 5 files changed, 112 insertions(+), 26 deletions(-) diff --git a/controlplane/kubeadm/controllers/controller.go b/controlplane/kubeadm/controllers/controller.go index ee757bbc0e2b..655b087a741e 100644 --- a/controlplane/kubeadm/controllers/controller.go +++ b/controlplane/kubeadm/controllers/controller.go @@ -101,7 +101,7 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(mgr ctrl.Manager, optio r.recorder = mgr.GetEventRecorderFor("kubeadm-control-plane-controller") if r.managementCluster == nil { - r.managementCluster = &internal.Management{Client: r.Client} + r.managementCluster = &internal.Management{Client: r.Client, Log: r.Log} } if r.managementClusterUncached == nil { r.managementClusterUncached = &internal.Management{Client: mgr.GetAPIReader()} @@ -606,13 +606,13 @@ func (r *KubeadmControlPlaneReconciler) remediateUnhealthy(ctx context.Context, return ctrl.Result{}, errors.Wrap(err, "failed to fetch etcd status") } - if etcdStatus.FailureTolerance() == 0 { - logger.Info("refusing to remediate unhealthy machines, cluster has no failure tolerance") + machine := controlPlane.UnhealthyMachines().Oldest() + + if etcdStatus.FailureTolerance(machine) == 0 { + logger.Info("cluster has no failure tolerance, skipping remediation") return ctrl.Result{}, nil } - machine := controlPlane.UnhealthyMachines().Oldest() - logger.Info("remediating unhealthy machine", "machine", machine.Name, "reason", conditions.GetReason(machine, clusterv1.MachineHealthCheckSuccededCondition), diff --git 
a/controlplane/kubeadm/internal/cluster.go b/controlplane/kubeadm/internal/cluster.go index 4421c86526c1..b41fef9146f8 100644 --- a/controlplane/kubeadm/internal/cluster.go +++ b/controlplane/kubeadm/internal/cluster.go @@ -23,6 +23,7 @@ import ( "fmt" "time" + "github.com/go-logr/logr" "github.com/pkg/errors" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" @@ -50,6 +51,7 @@ type ManagementCluster interface { // Management holds operations on the management cluster. type Management struct { Client ctrlclient.Reader + Log logr.Logger } // RemoteClusterConnectionError represents a failure to connect to a remote cluster @@ -137,6 +139,7 @@ func (m *Management) GetWorkloadCluster(ctx context.Context, clusterKey client.O restConfig: restConfig, tlsConfig: cfg, }, + Log: m.Log, }, nil } diff --git a/controlplane/kubeadm/internal/workload_cluster.go b/controlplane/kubeadm/internal/workload_cluster.go index f35110fd183f..5e9bac70cf2a 100644 --- a/controlplane/kubeadm/internal/workload_cluster.go +++ b/controlplane/kubeadm/internal/workload_cluster.go @@ -29,6 +29,7 @@ import ( "time" "github.com/blang/semver" + "github.com/go-logr/logr" "github.com/pkg/errors" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -84,6 +85,7 @@ type WorkloadCluster interface { type Workload struct { Client ctrlclient.Client CoreDNSMigrator coreDNSMigrator + Log logr.Logger etcdClientGenerator etcdClientFor } diff --git a/controlplane/kubeadm/internal/workload_cluster_etcd.go b/controlplane/kubeadm/internal/workload_cluster_etcd.go index 172372e2ce22..a9692f8d78fd 100644 --- a/controlplane/kubeadm/internal/workload_cluster_etcd.go +++ b/controlplane/kubeadm/internal/workload_cluster_etcd.go @@ -307,19 +307,55 @@ func (w *Workload) EtcdStatus(ctx context.Context) (EtcdStatus, error) { return EtcdStatus{}, errors.Wrap(err, "failed to list etcd members using etcd client") } + statuses := map[string]EtcdMemberStatus{} + for _, member := range members { + status := EtcdMemberStatus{Member: member, Responsive: true} + + _, err := etcdClient.EtcdClient.Status(ctx, staticPodName("etcd", member.Name)) + if err != nil { + status.Responsive = false + w.Log.Error(err, "error fetching etcd status for member", "member", member.Name) + } + statuses[member.Name] = status + } + return EtcdStatus{ - Members: members, + Members: statuses, }, nil } // EtcdStatus is a snapshot of the etcd cluster's status type EtcdStatus struct { - Members []*etcd.Member + Members map[string]EtcdMemberStatus +} + +// EtcdMemberStatus is a snapshot of a etcd member's status +type EtcdMemberStatus struct { + *etcd.Member + Responsive bool } // FailureTolerance is the amount of members the etcd cluster can afford to // lose without losing quorum. 
// Ref: https://github.com/etcd-io/etcd/blob/master/Documentation/faq.md#what-is-failure-tolerance -func (e *EtcdStatus) FailureTolerance() int { - return len(e.Members) - (len(e.Members)/2.0 + 1) +func (e *EtcdStatus) FailureTolerance(machine *clusterv1.Machine) int { + var responsive int + for _, m := range e.Members { + if m.Responsive { + responsive++ + } + } + + defaultTolerance := responsive - (responsive/2.0 + 1) + + if machine.Status.NodeRef == nil { + return defaultTolerance + } + + member, found := e.Members[machine.Status.NodeRef.Name] + if !found || member.Responsive { + return defaultTolerance + } + + return defaultTolerance + 1 } diff --git a/controlplane/kubeadm/internal/workload_cluster_etcd_test.go b/controlplane/kubeadm/internal/workload_cluster_etcd_test.go index 49438238c763..159a24ac0fb7 100644 --- a/controlplane/kubeadm/internal/workload_cluster_etcd_test.go +++ b/controlplane/kubeadm/internal/workload_cluster_etcd_test.go @@ -564,26 +564,71 @@ func defaultMachine(transforms ...func(m *clusterv1.Machine)) *clusterv1.Machine return m } -func TestEtcdStatusFailureTolerance(t *testing.T) { +func TestEtcdStatusFailureTolerance_allResponsive(t *testing.T) { g := NewWithT(t) status := EtcdStatus{ - Members: []*etcd.Member{ - { - Name: "1", + Members: map[string]EtcdMemberStatus{ + "one": { + Responsive: true, }, }, } - g.Expect(status.FailureTolerance()).To(Equal(0)) - status.Members = append(status.Members, &etcd.Member{Name: "2"}) - g.Expect(status.FailureTolerance()).To(Equal(0)) - status.Members = append(status.Members, &etcd.Member{Name: "3"}) - g.Expect(status.FailureTolerance()).To(Equal(1)) - status.Members = append(status.Members, &etcd.Member{Name: "4"}) - g.Expect(status.FailureTolerance()).To(Equal(1)) - status.Members = append(status.Members, &etcd.Member{Name: "5"}) - g.Expect(status.FailureTolerance()).To(Equal(2)) - status.Members = append(status.Members, &etcd.Member{Name: "6"}) - g.Expect(status.FailureTolerance()).To(Equal(2)) - status.Members = append(status.Members, &etcd.Member{Name: "7"}) - g.Expect(status.FailureTolerance()).To(Equal(3)) + machine := &clusterv1.Machine{ObjectMeta: metav1.ObjectMeta{Name: "one"}} + g.Expect(status.FailureTolerance(machine)).To(Equal(0)) + + status.Members["two"] = EtcdMemberStatus{Responsive: true} + g.Expect(status.FailureTolerance(machine)).To(Equal(0)) + + status.Members["three"] = EtcdMemberStatus{Responsive: true} + g.Expect(status.FailureTolerance(machine)).To(Equal(1)) + + status.Members["four"] = EtcdMemberStatus{Responsive: true} + g.Expect(status.FailureTolerance(machine)).To(Equal(1)) + + status.Members["five"] = EtcdMemberStatus{Responsive: true} + g.Expect(status.FailureTolerance(machine)).To(Equal(2)) + + status.Members["six"] = EtcdMemberStatus{Responsive: true} + g.Expect(status.FailureTolerance(machine)).To(Equal(2)) + + status.Members["seven"] = EtcdMemberStatus{Responsive: true} + g.Expect(status.FailureTolerance(machine)).To(Equal(3)) +} + +func TestEtcdStatusFailureTolerance_unresponsive(t *testing.T) { + g := NewWithT(t) + status := EtcdStatus{ + Members: map[string]EtcdMemberStatus{ + "one": { + Responsive: true, + }, + "two": { + Responsive: true, + }, + "three": { + Responsive: true, + }, + }, + } + + // a machine without a noderef gets default tolerance + machine := &clusterv1.Machine{} + g.Expect(status.FailureTolerance(machine)).To(Equal(1)) + + // a machine that is not a member gets default tolerance + machine.Status = clusterv1.MachineStatus{NodeRef: &corev1.ObjectReference{Name: "foo"}} 
+ g.Expect(status.FailureTolerance(machine)).To(Equal(1)) + + // a responsive machine gets default tolerance calculation + machine.Status.NodeRef.Name = "one" + g.Expect(status.FailureTolerance(machine)).To(Equal(1)) + + // this member is not responsive, so removing it will not reduce the failure tolerance + status.Members["one"] = EtcdMemberStatus{Responsive: false} + g.Expect(status.FailureTolerance(machine)).To(Equal(1)) + + // this member is responsive but another member is not, removing it will reduce failure tolerance + status.Members["one"] = EtcdMemberStatus{Responsive: true} + status.Members["two"] = EtcdMemberStatus{Responsive: false} + g.Expect(status.FailureTolerance(machine)).To(Equal(0)) } From b2d6df66f1b0ee49cac51035cf92943dd5398d5b Mon Sep 17 00:00:00 2001 From: Ben Moss Date: Wed, 1 Jul 2020 14:05:18 +0000 Subject: [PATCH 3/7] Fix failure tolerance calculation --- .../kubeadm/internal/workload_cluster_etcd.go | 8 ++-- .../internal/workload_cluster_etcd_test.go | 47 ++++++++++++++----- 2 files changed, 39 insertions(+), 16 deletions(-) diff --git a/controlplane/kubeadm/internal/workload_cluster_etcd.go b/controlplane/kubeadm/internal/workload_cluster_etcd.go index a9692f8d78fd..bebc6c3f40b9 100644 --- a/controlplane/kubeadm/internal/workload_cluster_etcd.go +++ b/controlplane/kubeadm/internal/workload_cluster_etcd.go @@ -339,14 +339,14 @@ type EtcdMemberStatus struct { // lose without losing quorum. // Ref: https://github.com/etcd-io/etcd/blob/master/Documentation/faq.md#what-is-failure-tolerance func (e *EtcdStatus) FailureTolerance(machine *clusterv1.Machine) int { - var responsive int + var unresponsive int for _, m := range e.Members { - if m.Responsive { - responsive++ + if !m.Responsive { + unresponsive++ } } - defaultTolerance := responsive - (responsive/2.0 + 1) + defaultTolerance := len(e.Members) - (len(e.Members)/2.0 + 1) - unresponsive if machine.Status.NodeRef == nil { return defaultTolerance diff --git a/controlplane/kubeadm/internal/workload_cluster_etcd_test.go b/controlplane/kubeadm/internal/workload_cluster_etcd_test.go index 159a24ac0fb7..5f7c89d8a1c8 100644 --- a/controlplane/kubeadm/internal/workload_cluster_etcd_test.go +++ b/controlplane/kubeadm/internal/workload_cluster_etcd_test.go @@ -599,13 +599,31 @@ func TestEtcdStatusFailureTolerance_unresponsive(t *testing.T) { g := NewWithT(t) status := EtcdStatus{ Members: map[string]EtcdMemberStatus{ - "one": { + "1": { + Responsive: true, + }, + "2": { + Responsive: true, + }, + "3": { + Responsive: true, + }, + "4": { + Responsive: true, + }, + "5": { + Responsive: true, + }, + "6": { Responsive: true, }, - "two": { + "7": { Responsive: true, }, - "three": { + "8": { + Responsive: true, + }, + "9": { Responsive: true, }, }, @@ -613,22 +631,27 @@ func TestEtcdStatusFailureTolerance_unresponsive(t *testing.T) { // a machine without a noderef gets default tolerance machine := &clusterv1.Machine{} - g.Expect(status.FailureTolerance(machine)).To(Equal(1)) + g.Expect(status.FailureTolerance(machine)).To(Equal(4)) // a machine that is not a member gets default tolerance machine.Status = clusterv1.MachineStatus{NodeRef: &corev1.ObjectReference{Name: "foo"}} - g.Expect(status.FailureTolerance(machine)).To(Equal(1)) + g.Expect(status.FailureTolerance(machine)).To(Equal(4)) // a responsive machine gets default tolerance calculation - machine.Status.NodeRef.Name = "one" - g.Expect(status.FailureTolerance(machine)).To(Equal(1)) + machine.Status.NodeRef.Name = "1" + 
g.Expect(status.FailureTolerance(machine)).To(Equal(4)) // this member is not responsive, so removing it will not reduce the failure tolerance - status.Members["one"] = EtcdMemberStatus{Responsive: false} - g.Expect(status.FailureTolerance(machine)).To(Equal(1)) + status.Members["1"] = EtcdMemberStatus{Responsive: false} + g.Expect(status.FailureTolerance(machine)).To(Equal(4)) // this member is responsive but another member is not, removing it will reduce failure tolerance - status.Members["one"] = EtcdMemberStatus{Responsive: true} - status.Members["two"] = EtcdMemberStatus{Responsive: false} - g.Expect(status.FailureTolerance(machine)).To(Equal(0)) + status.Members["1"] = EtcdMemberStatus{Responsive: true} + status.Members["2"] = EtcdMemberStatus{Responsive: false} + g.Expect(status.FailureTolerance(machine)).To(Equal(3)) + + // 3 unresponsive members + status.Members["3"] = EtcdMemberStatus{Responsive: false} + status.Members["4"] = EtcdMemberStatus{Responsive: false} + g.Expect(status.FailureTolerance(machine)).To(Equal(1)) } From 8146ce575c29e994a9015da7a5eeeb8843c13ab6 Mon Sep 17 00:00:00 2001 From: Ben Moss Date: Mon, 6 Jul 2020 09:08:39 -0400 Subject: [PATCH 4/7] Update controlplane/kubeadm/controllers/controller.go Co-authored-by: Jason DeTiberus --- controlplane/kubeadm/controllers/controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/controlplane/kubeadm/controllers/controller.go b/controlplane/kubeadm/controllers/controller.go index 655b087a741e..f49f4e8acb77 100644 --- a/controlplane/kubeadm/controllers/controller.go +++ b/controlplane/kubeadm/controllers/controller.go @@ -633,5 +633,5 @@ func (r *KubeadmControlPlaneReconciler) remediateUnhealthy(ctx context.Context, return ctrl.Result{}, errors.Wrap(err, "failed to patch unhealthy machine") } - return ctrl.Result{RequeueAfter: deleteRequeueAfter}, nil + return ctrl.Result{}, nil } From ae95eb82df531166ff792dc58bcb81f558206add Mon Sep 17 00:00:00 2001 From: Ben Moss Date: Mon, 6 Jul 2020 16:39:25 +0000 Subject: [PATCH 5/7] comments, comments, comments --- controlplane/kubeadm/controllers/controller.go | 4 +++- .../kubeadm/internal/workload_cluster_etcd.go | 18 ++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/controlplane/kubeadm/controllers/controller.go b/controlplane/kubeadm/controllers/controller.go index f49f4e8acb77..7ec877748907 100644 --- a/controlplane/kubeadm/controllers/controller.go +++ b/controlplane/kubeadm/controllers/controller.go @@ -588,6 +588,8 @@ func (r *KubeadmControlPlaneReconciler) adoptOwnedSecrets(ctx context.Context, k } func (r *KubeadmControlPlaneReconciler) remediateUnhealthy(ctx context.Context, logger logr.Logger, controlPlane *internal.ControlPlane) (ctrl.Result, error) { + // No remediation while cluster is below remediation threshold, or if the cluster is scaling up + // This is to prevent multiple remediations from happening before replacements are created. 
if !controlPlane.RemediationAllowed() || !controlPlane.HasUnhealthyMachine() || controlPlane.Machines.Len() < int(*controlPlane.KCP.Spec.Replicas) { return ctrl.Result{}, nil } @@ -608,7 +610,7 @@ func (r *KubeadmControlPlaneReconciler) remediateUnhealthy(ctx context.Context, machine := controlPlane.UnhealthyMachines().Oldest() - if etcdStatus.FailureTolerance(machine) == 0 { + if etcdStatus.FailureTolerance(machine) <= 0 { logger.Info("cluster has no failure tolerance, skipping remediation") return ctrl.Result{}, nil } diff --git a/controlplane/kubeadm/internal/workload_cluster_etcd.go b/controlplane/kubeadm/internal/workload_cluster_etcd.go index bebc6c3f40b9..e4fde01bc6d9 100644 --- a/controlplane/kubeadm/internal/workload_cluster_etcd.go +++ b/controlplane/kubeadm/internal/workload_cluster_etcd.go @@ -337,6 +337,11 @@ type EtcdMemberStatus struct { // FailureTolerance is the amount of members the etcd cluster can afford to // lose without losing quorum. +// It accepts a machine as an argument in order to answer whether this specific +// machine can be safely deleted. For example, a 3-machine cluster has a +// failure tolerance of 1, but if member A is unresponsive and we are +// considering deleting member B, this method will give a failure tolerance of +// 0. Given member A, it would return 1. // Ref: https://github.com/etcd-io/etcd/blob/master/Documentation/faq.md#what-is-failure-tolerance func (e *EtcdStatus) FailureTolerance(machine *clusterv1.Machine) int { var unresponsive int @@ -346,16 +351,29 @@ func (e *EtcdStatus) FailureTolerance(machine *clusterv1.Machine) int { } } + // Calculate the default cluster failure tolerance as number-of-members / 2 + 1. + // Subtract unresponsive members to represent that they are already potentially failed + // and shouldn't be counted as contributing towards quorum. defaultTolerance := len(e.Members) - (len(e.Members)/2.0 + 1) - unresponsive + // If a machine does not have a NodeRef we have no way of correlating it to + // an etcd member. It is unlikely that it is an etcd member yet, but either + // way we have no way of determining if it is responsive or not. if machine.Status.NodeRef == nil { return defaultTolerance } member, found := e.Members[machine.Status.NodeRef.Name] + // If the nodeRef isn't in the map this member is not running etcd for some + // reason, this is an edge case. + // If the member is responsive, use the normal tolerance calculation to + // determine if it is deletable. if !found || member.Responsive { return defaultTolerance } + // If this machine is unresponsive, add 1 to the default tolerance since we + // have already subtracted unresponsive machines from the default tolerance but removing this + // machine will not further reduce quorum. 
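+	// For example, with 5 members of which only this machine is unresponsive:
+	// defaultTolerance = 5 - (5/2 + 1) - 1 = 1, and the returned tolerance is 1 + 1 = 2.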
return defaultTolerance + 1 } From 2590f2a7bf0c99230c6e6619e68c5b22fe3828af Mon Sep 17 00:00:00 2001 From: Ben Moss Date: Wed, 15 Jul 2020 14:26:39 +0000 Subject: [PATCH 6/7] Move HasDeletingMachine requeue up in the stack --- controlplane/kubeadm/controllers/controller.go | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/controlplane/kubeadm/controllers/controller.go b/controlplane/kubeadm/controllers/controller.go index 7ec877748907..cfd211aff4f2 100644 --- a/controlplane/kubeadm/controllers/controller.go +++ b/controlplane/kubeadm/controllers/controller.go @@ -292,8 +292,13 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster * // source ref (reason@machine/name) so the problem can be easily tracked down to its source machine. conditions.SetAggregate(controlPlane.KCP, controlplanev1.MachinesReadyCondition, ownedMachines.ConditionGetters(), conditions.AddSourceRef()) + // Wait for machines to finish deleting before performing reconciliation to simplify subsequent logic + if controlPlane.HasDeletingMachine() { + return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: deleteRequeueAfter} + } + if result, err := r.remediateUnhealthy(ctx, logger, controlPlane); err != nil || !result.IsZero() { - return ctrl.Result{}, err + return result, err } if result, err := r.reconcileHealth(ctx, cluster, kcp, controlPlane); err != nil || !result.IsZero() { @@ -465,13 +470,6 @@ func (r *KubeadmControlPlaneReconciler) reconcileHealth(ctx context.Context, clu return ctrl.Result{RequeueAfter: healthCheckFailedRequeueAfter}, nil } - // We need this check for scale up as well as down to avoid scaling up when there is a machine being deleted. - // This should be at the end of this method as no need to wait for machine to be completely deleted to reconcile etcd. - // TODO: Revisit during machine remediation implementation which may need to cover other machine phases. 
- if controlPlane.HasDeletingMachine() { - return ctrl.Result{RequeueAfter: deleteRequeueAfter}, nil - } - return ctrl.Result{}, nil } @@ -594,10 +592,6 @@ func (r *KubeadmControlPlaneReconciler) remediateUnhealthy(ctx context.Context, return ctrl.Result{}, nil } - if controlPlane.HasDeletingMachine() { - return ctrl.Result{RequeueAfter: deleteRequeueAfter}, nil - } - workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(controlPlane.Cluster)) if err != nil { return ctrl.Result{}, errors.Wrap(err, "failed to create workload cluster client") From 0733640ce32cf40eeecfc17ec2e2f33ad132ab34 Mon Sep 17 00:00:00 2001 From: Ben Moss Date: Mon, 3 Aug 2020 16:41:34 -0400 Subject: [PATCH 7/7] Add more comments about scale-up blocking remediation Move the logic to inside RemediationAllowed, it makes more sense to have all the logic be in the one method --- controlplane/kubeadm/controllers/controller.go | 3 +-- controlplane/kubeadm/internal/control_plane.go | 7 +++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/controlplane/kubeadm/controllers/controller.go b/controlplane/kubeadm/controllers/controller.go index cfd211aff4f2..ef9588ee7a27 100644 --- a/controlplane/kubeadm/controllers/controller.go +++ b/controlplane/kubeadm/controllers/controller.go @@ -587,8 +587,7 @@ func (r *KubeadmControlPlaneReconciler) adoptOwnedSecrets(ctx context.Context, k func (r *KubeadmControlPlaneReconciler) remediateUnhealthy(ctx context.Context, logger logr.Logger, controlPlane *internal.ControlPlane) (ctrl.Result, error) { // No remediation while cluster is below remediation threshold, or if the cluster is scaling up - // This is to prevent multiple remediations from happening before replacements are created. - if !controlPlane.RemediationAllowed() || !controlPlane.HasUnhealthyMachine() || controlPlane.Machines.Len() < int(*controlPlane.KCP.Spec.Replicas) { + if !controlPlane.RemediationAllowed() || !controlPlane.HasUnhealthyMachine() { return ctrl.Result{}, nil } diff --git a/controlplane/kubeadm/internal/control_plane.go b/controlplane/kubeadm/internal/control_plane.go index 8023b0bce2ac..176d890b05a4 100644 --- a/controlplane/kubeadm/internal/control_plane.go +++ b/controlplane/kubeadm/internal/control_plane.go @@ -247,10 +247,13 @@ func (c *ControlPlane) UpToDateMachines() FilterableMachineCollection { return c.Machines.Difference(c.MachinesNeedingRollout()) } -// RemediationAllowed returns whether the cluster is large enough to support MHC remediation. +// RemediationAllowed returns whether the cluster is large enough to support MHC remediation and has reached the desired number of replicas. // Clusters < minimumClusterSizeForRemediation do not have sufficient etcd failure tolerance. +// We check to ensure that we have finished scaling up to avoid multiple remediations from happening before replacement machines have been created. +// This has the downside that KCP may get stuck if it encounters a failed machine during a scale-up operation, +// since remediation will not be allowed and KCP's own health checks will prevent it from reaching the full number of replicas. func (c *ControlPlane) RemediationAllowed() bool { - return c.Machines.Len() >= minimumClusterSizeForRemediation + return c.Machines.Len() >= minimumClusterSizeForRemediation && c.Machines.Len() >= int(*c.KCP.Spec.Replicas) } // UnhealthyMachines returns the machines that need remediation.
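The failure-tolerance arithmetic above is easy to sanity-check in isolation. Below is a minimal standalone sketch (not part of the patch series; the names are illustrative stand-ins, not the real types) of the calculation EtcdStatus.FailureTolerance performs by the end of the series: quorum needs len/2+1 members, unresponsive members are treated as already lost, and an unresponsive deletion candidate is credited back because removing it cannot cost another healthy vote.

package main

import "fmt"

// memberStatus is an illustrative stand-in for EtcdMemberStatus; only the
// fields needed for the tolerance calculation are modeled.
type memberStatus struct {
	name       string
	responsive bool
}

// failureTolerance mirrors the series' final calculation: start from the
// len/2+1 quorum math, subtract unresponsive members, and add 1 back if the
// deletion candidate is itself unresponsive.
func failureTolerance(members []memberStatus, candidate string) int {
	unresponsive := 0
	candidateUnresponsive := false
	for _, m := range members {
		if !m.responsive {
			unresponsive++
			if m.name == candidate {
				candidateUnresponsive = true
			}
		}
	}
	tolerance := len(members) - (len(members)/2 + 1) - unresponsive
	if candidateUnresponsive {
		tolerance++
	}
	return tolerance
}

func main() {
	members := []memberStatus{
		{name: "a", responsive: false},
		{name: "b", responsive: true},
		{name: "c", responsive: true},
	}
	// Matches the doc-comment example: with "a" unresponsive, deleting a
	// healthy member is unsafe (tolerance 0), but remediating "a" is safe (tolerance 1).
	fmt.Println(failureTolerance(members, "b")) // 0
	fmt.Println(failureTolerance(members, "c")) // 0
	fmt.Println(failureTolerance(members, "a")) // 1
}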