✨ Add MHC remediation to KCP #3185

Closed
wants to merge 7 commits
4 changes: 2 additions & 2 deletions controllers/machinehealthcheck_targets.go
@@ -96,13 +96,13 @@ func (t *healthCheckTarget) needsRemediation(logger logr.Logger, timeoutForMachi
now := time.Now()

if t.Machine.Status.FailureReason != nil {
conditions.MarkFalse(t.Machine, clusterv1.MachineHealthCheckSuccededCondition, clusterv1.MachineHasFailure, clusterv1.ConditionSeverityWarning, "FailureReason: %v", t.Machine.Status.FailureReason)
conditions.MarkFalse(t.Machine, clusterv1.MachineHealthCheckSuccededCondition, clusterv1.MachineHasFailure, clusterv1.ConditionSeverityWarning, "FailureReason: %v", *t.Machine.Status.FailureReason)
logger.V(3).Info("Target is unhealthy", "failureReason", t.Machine.Status.FailureReason)
return true, time.Duration(0)
}

if t.Machine.Status.FailureMessage != nil {
conditions.MarkFalse(t.Machine, clusterv1.MachineHealthCheckSuccededCondition, clusterv1.MachineHasFailure, clusterv1.ConditionSeverityWarning, "FailureMessage: %v", t.Machine.Status.FailureMessage)
conditions.MarkFalse(t.Machine, clusterv1.MachineHealthCheckSuccededCondition, clusterv1.MachineHasFailure, clusterv1.ConditionSeverityWarning, "FailureMessage: %v", *t.Machine.Status.FailureMessage)
logger.V(3).Info("Target is unhealthy", "failureMessage", t.Machine.Status.FailureMessage)
return true, time.Duration(0)
}
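Side note on this fix: `FailureReason` and `FailureMessage` are pointers, and formatting a pointer to a string-backed type with `%v` prints its address rather than its value, so the resulting condition message was unreadable before the dereference. A minimal, standalone sketch of the behavior (the type below is a stand-in, not the actual cluster-api definition):

```go
package main

import "fmt"

// MachineStatusError stands in for the string-backed type behind
// Machine.Status.FailureReason (illustration only).
type MachineStatusError string

func main() {
	reason := MachineStatusError("InvalidConfiguration")
	ptr := &reason

	// Before the fix: %v on the pointer prints its address, e.g. "FailureReason: 0xc000010250".
	fmt.Printf("FailureReason: %v\n", ptr)

	// After the fix: dereferencing prints the actual reason.
	fmt.Printf("FailureReason: %v\n", *ptr) // "FailureReason: InvalidConfiguration"
}
```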
72 changes: 64 additions & 8 deletions controlplane/kubeadm/controllers/controller.go
@@ -101,7 +101,7 @@ func (r *KubeadmControlPlaneReconciler) SetupWithManager(mgr ctrl.Manager, optio
r.recorder = mgr.GetEventRecorderFor("kubeadm-control-plane-controller")

if r.managementCluster == nil {
r.managementCluster = &internal.Management{Client: r.Client}
r.managementCluster = &internal.Management{Client: r.Client, Log: r.Log}
}
if r.managementClusterUncached == nil {
r.managementClusterUncached = &internal.Management{Client: mgr.GetAPIReader()}
@@ -292,6 +292,19 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
// source ref (reason@machine/name) so the problem can be easily tracked down to its source machine.
conditions.SetAggregate(controlPlane.KCP, controlplanev1.MachinesReadyCondition, ownedMachines.ConditionGetters(), conditions.AddSourceRef())

// Wait for machines to finish deleting before performing reconciliation to simplify subsequent logic
if controlPlane.HasDeletingMachine() {
return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: deleteRequeueAfter}
}

if result, err := r.remediateUnhealthy(ctx, logger, controlPlane); err != nil || !result.IsZero() {
return result, err
}

if result, err := r.reconcileHealth(ctx, cluster, kcp, controlPlane); err != nil || !result.IsZero() {
return result, err
}

// Control plane machines rollout due to configuration changes (e.g. upgrades) takes precedence over other operations.
needRollout := controlPlane.MachinesNeedingRollout()
switch {
Expand Down Expand Up @@ -425,6 +438,10 @@ func (r *KubeadmControlPlaneReconciler) ClusterToKubeadmControlPlane(o handler.M
// It removes any etcd members that do not have a corresponding node.
// Also, as a final step, it checks whether any machine is being deleted.
func (r *KubeadmControlPlaneReconciler) reconcileHealth(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, controlPlane *internal.ControlPlane) (ctrl.Result, error) {
if controlPlane.Machines.Len() == 0 {
return ctrl.Result{}, nil
}

logger := controlPlane.Logger()

// Do a health check of the Control Plane components
Expand Down Expand Up @@ -453,13 +470,6 @@ func (r *KubeadmControlPlaneReconciler) reconcileHealth(ctx context.Context, clu
return ctrl.Result{RequeueAfter: healthCheckFailedRequeueAfter}, nil
}

// We need this check for scale up as well as down to avoid scaling up when there is a machine being deleted.
// This should be at the end of this method as no need to wait for machine to be completely deleted to reconcile etcd.
// TODO: Revisit during machine remediation implementation which may need to cover other machine phases.
if controlPlane.HasDeletingMachine() {
return ctrl.Result{RequeueAfter: deleteRequeueAfter}, nil
}

return ctrl.Result{}, nil
}

@@ -574,3 +584,49 @@ func (r *KubeadmControlPlaneReconciler) adoptOwnedSecrets(ctx context.Context, k

return nil
}

func (r *KubeadmControlPlaneReconciler) remediateUnhealthy(ctx context.Context, logger logr.Logger, controlPlane *internal.ControlPlane) (ctrl.Result, error) {
// No remediation while cluster is below remediation threshold, or if the cluster is scaling up
if !controlPlane.RemediationAllowed() || !controlPlane.HasUnhealthyMachine() {
return ctrl.Result{}, nil
}

workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(controlPlane.Cluster))
if err != nil {
return ctrl.Result{}, errors.Wrap(err, "failed to create workload cluster client")
}

etcdStatus, err := workloadCluster.EtcdStatus(ctx)
if err != nil {
return ctrl.Result{}, errors.Wrap(err, "failed to fetch etcd status")
}

machine := controlPlane.UnhealthyMachines().Oldest()

if etcdStatus.FailureTolerance(machine) <= 0 {
logger.Info("cluster has no failure tolerance, skipping remediation")
return ctrl.Result{}, nil
}

logger.Info("remediating unhealthy machine",
"machine", machine.Name,
"reason", conditions.GetReason(machine, clusterv1.MachineHealthCheckSuccededCondition),
"message", conditions.GetMessage(machine, clusterv1.MachineHealthCheckSuccededCondition),
)

if err := r.Client.Delete(ctx, machine); err != nil {
return ctrl.Result{}, errors.Wrap(err, "failed to delete unhealthy machine")
}

patchHelper, err := patch.NewHelper(machine, r.Client)
if err != nil {
return ctrl.Result{}, errors.Wrap(err, "failed to initialize patch helper")
}

conditions.MarkTrue(machine, clusterv1.MachineOwnerRemediatedCondition)
if err := patchHelper.Patch(ctx, machine); err != nil {
return ctrl.Result{}, errors.Wrap(err, "failed to patch unhealthy machine")
}

return ctrl.Result{}, nil
}
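The gate in `remediateUnhealthy` depends on `EtcdStatus` and `FailureTolerance`, which this diff references but does not show. As a rough sketch of the arithmetic presumably involved (assumed shape, not the PR's actual implementation): etcd with n members needs a quorum of n/2 + 1, so deleting a member for remediation is only safe while the remaining healthy members still exceed quorum.

```go
package main

import "fmt"

// failureTolerance is an assumed sketch: how many members the cluster can
// still lose while keeping an etcd quorum of totalMembers/2 + 1.
func failureTolerance(totalMembers, healthyMembers int) int {
	quorum := totalMembers/2 + 1
	return healthyMembers - quorum
}

func main() {
	fmt.Println(failureTolerance(3, 3)) // 1: safe to remediate one machine
	fmt.Println(failureTolerance(3, 2)) // 0: remediation would risk quorum, so it is skipped
	fmt.Println(failureTolerance(5, 4)) // 1: still safe
}
```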
9 changes: 0 additions & 9 deletions controlplane/kubeadm/controllers/scale.go
@@ -63,11 +63,6 @@ func (r *KubeadmControlPlaneReconciler) initializeControlPlane(ctx context.Conte
func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, controlPlane *internal.ControlPlane) (ctrl.Result, error) {
logger := controlPlane.Logger()

// reconcileHealth returns err if there is a machine being delete which is a required condition to check before scaling up
if result, err := r.reconcileHealth(ctx, cluster, kcp, controlPlane); err != nil || !result.IsZero() {
return result, err
}

// Create the bootstrap configuration
bootstrapSpec := controlPlane.JoinControlPlaneConfig()
fd := controlPlane.NextFailureDomainForScaleUp()
@@ -90,10 +85,6 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(
) (ctrl.Result, error) {
logger := controlPlane.Logger()

if result, err := r.reconcileHealth(ctx, cluster, kcp, controlPlane); err != nil || !result.IsZero() {
return result, err
}

workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster))
if err != nil {
logger.Error(err, "Failed to create client to workload cluster")
67 changes: 1 addition & 66 deletions controlplane/kubeadm/controllers/scale_test.go
@@ -81,7 +81,7 @@ func TestKubeadmControlPlaneReconciler_initializeControlPlane(t *testing.T) {
}

func TestKubeadmControlPlaneReconciler_scaleUpControlPlane(t *testing.T) {
t.Run("creates a control plane Machine if health checks pass", func(t *testing.T) {
t.Run("creates a control plane Machine", func(t *testing.T) {
g := NewWithT(t)

cluster, kcp, genericMachineTemplate := createClusterWithControlPlane()
@@ -122,71 +122,6 @@ func TestKubeadmControlPlaneReconciler_scaleUpControlPlane(t *testing.T) {
g.Expect(fakeClient.List(context.Background(), &controlPlaneMachines)).To(Succeed())
g.Expect(controlPlaneMachines.Items).To(HaveLen(3))
})
t.Run("does not create a control plane Machine if health checks fail", func(t *testing.T) {
cluster, kcp, genericMachineTemplate := createClusterWithControlPlane()
initObjs := []runtime.Object{cluster.DeepCopy(), kcp.DeepCopy(), genericMachineTemplate.DeepCopy()}

beforeMachines := internal.NewFilterableMachineCollection()
for i := 0; i < 2; i++ {
m, _ := createMachineNodePair(fmt.Sprintf("test-%d", i), cluster.DeepCopy(), kcp.DeepCopy(), true)
beforeMachines.Insert(m)
initObjs = append(initObjs, m.DeepCopy())
}

testCases := []struct {
name string
etcdUnHealthy bool
controlPlaneUnHealthy bool
}{
{
name: "etcd health check fails",
etcdUnHealthy: true,
},
{
name: "controlplane component health check fails",
controlPlaneUnHealthy: true,
},
}
for _, tc := range testCases {
g := NewWithT(t)

fakeClient := newFakeClient(g, initObjs...)
fmc := &fakeManagementCluster{
Machines: beforeMachines.DeepCopy(),
ControlPlaneHealthy: !tc.controlPlaneUnHealthy,
EtcdHealthy: !tc.etcdUnHealthy,
}

r := &KubeadmControlPlaneReconciler{
Client: fakeClient,
managementCluster: fmc,
managementClusterUncached: fmc,
Log: log.Log,
recorder: record.NewFakeRecorder(32),
}
controlPlane := &internal.ControlPlane{
KCP: kcp,
Cluster: cluster,
Machines: beforeMachines,
}

result, err := r.scaleUpControlPlane(context.Background(), cluster.DeepCopy(), kcp.DeepCopy(), controlPlane)
g.Expect(result).To(Equal(ctrl.Result{RequeueAfter: healthCheckFailedRequeueAfter}))
g.Expect(err).To(BeNil())

controlPlaneMachines := &clusterv1.MachineList{}
g.Expect(fakeClient.List(context.Background(), controlPlaneMachines)).To(Succeed())
g.Expect(controlPlaneMachines.Items).To(HaveLen(len(beforeMachines)))

endMachines := internal.NewFilterableMachineCollectionFromMachineList(controlPlaneMachines)
for _, m := range endMachines {
bm, ok := beforeMachines[m.Name]
bm.SetResourceVersion("1")
g.Expect(ok).To(BeTrue())
g.Expect(m).To(Equal(bm))
}
}
})
}

func TestKubeadmControlPlaneReconciler_scaleDownControlPlane_NoError(t *testing.T) {
9 changes: 0 additions & 9 deletions controlplane/kubeadm/controllers/upgrade_test.go
@@ -86,15 +86,6 @@ func TestKubeadmControlPlaneReconciler_upgradeControlPlane(t *testing.T) {
bothMachines := &clusterv1.MachineList{}
g.Expect(fakeClient.List(context.Background(), bothMachines, client.InNamespace(cluster.Namespace))).To(Succeed())
g.Expect(bothMachines.Items).To(HaveLen(2))

// run upgrade a second time, simulate that the node has not appeared yet but the machine exists
r.managementCluster.(*fakeManagementCluster).ControlPlaneHealthy = false
result, err = r.upgradeControlPlane(context.Background(), cluster, kcp, controlPlane, needingUpgrade)
g.Expect(result).To(Equal(ctrl.Result{RequeueAfter: healthCheckFailedRequeueAfter}))
g.Expect(err).To(BeNil())
g.Expect(fakeClient.List(context.Background(), bothMachines, client.InNamespace(cluster.Namespace))).To(Succeed())
g.Expect(bothMachines.Items).To(HaveLen(2))

controlPlane.Machines = internal.NewFilterableMachineCollectionFromMachineList(bothMachines)

// manually increase number of nodes, make control plane healthy again
3 changes: 3 additions & 0 deletions controlplane/kubeadm/internal/cluster.go
@@ -23,6 +23,7 @@ import (
"fmt"
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -50,6 +51,7 @@ type ManagementCluster interface {
// Management holds operations on the management cluster.
type Management struct {
Client ctrlclient.Reader
Log logr.Logger
}

// RemoteClusterConnectionError represents a failure to connect to a remote cluster
@@ -137,6 +139,7 @@ func (m *Management) GetWorkloadCluster(ctx context.Context, clusterKey client.O
restConfig: restConfig,
tlsConfig: cfg,
},
Log: m.Log,
}, nil
}

21 changes: 21 additions & 0 deletions controlplane/kubeadm/internal/control_plane.go
@@ -34,6 +34,9 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

// MachineHealthCheck remediation is only supported on clusters with >= 3 machines to avoid disrupting etcd consensus
const minimumClusterSizeForRemediation = 3

// ControlPlane holds business logic around control planes.
// It should never need to connect to a service, that responsibility lies outside of this struct.
// Going forward we should be trying to add more logic to here and reduce the amount of logic in the reconciler.
@@ -244,6 +247,24 @@ func (c *ControlPlane) UpToDateMachines() FilterableMachineCollection {
return c.Machines.Difference(c.MachinesNeedingRollout())
}

// RemediationAllowed returns whether the cluster is large enough to support MHC remediation and has reached the desired number of replicas.
// Clusters < minimumClusterSizeForRemediation do not have sufficient etcd failure tolerance.
// We check to ensure that we have finished scaling up to avoid multiple remediations from happening before replacement machines have been created.
// This has the downside that KCP may get stuck if it encounters a failed machine during a scale-up operation,
// since remediation will not be allowed and KCP's own health checks will prevent it from reaching the full number of replicas.
func (c *ControlPlane) RemediationAllowed() bool {
return c.Machines.Len() >= minimumClusterSizeForRemediation && c.Machines.Len() >= int(*c.KCP.Spec.Replicas)
}

// UnhealthyMachines returns the machines that need remediation.
func (c *ControlPlane) UnhealthyMachines() FilterableMachineCollection {
return c.Machines.Filter(machinefilters.NeedsRemediation)
}

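// HasUnhealthyMachine returns true if the control plane has at least one machine marked for remediation.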
func (c *ControlPlane) HasUnhealthyMachine() bool {
return c.UnhealthyMachines().Len() > 0
}

// getInfraResources fetches the external infrastructure resource for each machine in the collection and returns a map of machine.Name -> infraResource.
func getInfraResources(ctx context.Context, cl client.Client, machines FilterableMachineCollection) (map[string]*unstructured.Unstructured, error) {
result := map[string]*unstructured.Unstructured{}
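To make the `RemediationAllowed` gate above concrete, here is an illustrative, table-driven check of how its two conditions interact (example values only; this is not part of the PR's test suite):

```go
package internal_test

import "testing"

// Mirrors the two conditions in RemediationAllowed:
// Machines.Len() >= minimumClusterSizeForRemediation && Machines.Len() >= *KCP.Spec.Replicas.
func TestRemediationAllowedExamples(t *testing.T) {
	cases := []struct {
		name               string
		machines, replicas int
		want               bool
	}{
		{"at desired size", 3, 3, true},
		{"still scaling up", 2, 3, false},     // wait for the replacement machine first
		{"scaling down", 4, 3, true},          // surplus machines do not block remediation
		{"single control plane", 1, 1, false}, // below etcd failure tolerance
	}
	for _, c := range cases {
		got := c.machines >= 3 && c.machines >= c.replicas
		if got != c.want {
			t.Errorf("%s: got %v, want %v", c.name, got, c.want)
		}
	}
}
```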
31 changes: 31 additions & 0 deletions controlplane/kubeadm/internal/control_plane_test.go
@@ -29,6 +29,7 @@ import (
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
bootstrapv1 "sigs.k8s.io/cluster-api/bootstrap/kubeadm/api/v1alpha3"
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1alpha3"
"sigs.k8s.io/cluster-api/util/conditions"
)

func TestControlPlane(t *testing.T) {
@@ -106,6 +107,18 @@ var _ = Describe("Control Plane", func() {
})
})

func TestUnhealthyMachines(t *testing.T) {
g := NewWithT(t)
kcp := &controlplanev1.KubeadmControlPlane{}
cp := &ControlPlane{KCP: kcp}
cp.Machines = NewFilterableMachineCollection(
machine("1", withNeedsRemediationCondition()),
machine("2", withNodeRef(), withInfrastructureReady()),
machine("3"),
)
g.Expect(cp.UnhealthyMachines().Names()).To(ConsistOf("1"))
}

func failureDomain(controlPlane bool) clusterv1.FailureDomainSpec {
return clusterv1.FailureDomainSpec{
ControlPlane: controlPlane,
@@ -117,3 +130,21 @@ func withFailureDomain(fd string) machineOpt {
m.Spec.FailureDomain = &fd
}
}

func withNeedsRemediationCondition() machineOpt {
return func(m *clusterv1.Machine) {
conditions.MarkFalse(m, clusterv1.MachineOwnerRemediatedCondition, "some reason", "some severity", "")
}
}

func withNodeRef() machineOpt {
return func(m *clusterv1.Machine) {
m.Status.NodeRef = &corev1.ObjectReference{}
}
}

func withInfrastructureReady() machineOpt {
return func(m *clusterv1.Machine) {
m.Status.InfrastructureReady = true
}
}
17 changes: 17 additions & 0 deletions controlplane/kubeadm/internal/machinefilters/machine_filters.go
@@ -387,3 +387,20 @@ func cleanupConfigFields(kcpConfig *bootstrapv1.KubeadmConfigSpec, machineConfig
machineConfig.Spec.JoinConfiguration.TypeMeta = kcpConfig.JoinConfiguration.TypeMeta
}
}

// NeedsRemediation returns whether the machine has the
// MachineOwnerRemediatedCondition set to false.
func NeedsRemediation(m *clusterv1.Machine) bool {
return conditions.IsFalse(m, clusterv1.MachineOwnerRemediatedCondition)
}

// IsProvisioning returns whether the machine is missing its NodeRef or does
// not have InfrastructureReady set to true.
func IsProvisioning(m *clusterv1.Machine) bool {
return m.Status.NodeRef == nil || !m.Status.InfrastructureReady
}

// IsFailed returns whether the machine has a FailureMessage or a FailureReason.
func IsFailed(m *clusterv1.Machine) bool {
return m.Status.FailureMessage != nil || m.Status.FailureReason != nil
}
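A usage sketch of how the new `NeedsRemediation` predicate ties into the MHC-to-KCP handshake: the MachineHealthCheck controller is expected to mark `MachineOwnerRemediatedCondition` False on an unhealthy machine, and the filter simply reads that condition back. (Illustrative only; `machinefilters` is an internal package, so this compiles only from within the repository, and the reason/message strings here are made up.)

```go
package main

import (
	"fmt"

	clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
	"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/machinefilters"
	"sigs.k8s.io/cluster-api/util/conditions"
)

func main() {
	m := &clusterv1.Machine{}

	// What the MHC controller would do when it delegates remediation to the owner.
	conditions.MarkFalse(m, clusterv1.MachineOwnerRemediatedCondition,
		"UnhealthyNode", clusterv1.ConditionSeverityWarning, "node failed health checks")

	fmt.Println(machinefilters.NeedsRemediation(m)) // true
	fmt.Println(machinefilters.IsFailed(m))         // false: no FailureReason/FailureMessage set
	fmt.Println(machinefilters.IsProvisioning(m))   // true: no NodeRef and infra not ready
}
```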
3 changes: 3 additions & 0 deletions controlplane/kubeadm/internal/workload_cluster.go
@@ -29,6 +29,7 @@ import (
"time"

"github.com/blang/semver"
"github.com/go-logr/logr"
"github.com/pkg/errors"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
@@ -59,6 +60,7 @@ type WorkloadCluster interface {
ClusterStatus(ctx context.Context) (ClusterStatus, error)
ControlPlaneIsHealthy(ctx context.Context) (HealthCheckResult, error)
EtcdIsHealthy(ctx context.Context) (HealthCheckResult, error)
EtcdStatus(ctx context.Context) (EtcdStatus, error)

// Upgrade related tasks.
ReconcileKubeletRBACBinding(ctx context.Context, version semver.Version) error
@@ -83,6 +85,7 @@ type WorkloadCluster interface {
type Workload struct {
Client ctrlclient.Client
CoreDNSMigrator coreDNSMigrator
Log logr.Logger
etcdClientGenerator etcdClientFor
}
