Add MHC remediation to KCP
Ben Moss committed Jun 24, 2020
1 parent 3a4b104 commit c97f63a
Showing 9 changed files with 187 additions and 101 deletions.
14 changes: 12 additions & 2 deletions controlplane/kubeadm/controllers/controller.go
@@ -279,6 +279,10 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *

controlPlane := internal.NewControlPlane(cluster, kcp, ownedMachines)

if controlPlane.HasDeletingMachine() || controlPlane.ProvisioningMachines().Len() > 0 {
return ctrl.Result{}, nil
}

// Aggregate the operational state of all the machines; while aggregating we are adding the
// source ref (reason@machine/name) so the problem can be easily tracked down to its source machine.
conditions.SetAggregate(controlPlane.KCP, controlplanev1.MachinesReadyCondition, ownedMachines.ConditionGetters(), conditions.AddSourceRef())
@@ -305,6 +309,12 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
numMachines := len(ownedMachines)
desiredReplicas := int(*kcp.Spec.Replicas)

if numMachines > 0 && controlPlane.UnhealthyMachines().Len() == 0 {
if err := r.reconcileHealth(ctx, cluster, kcp, controlPlane); err != nil {
return ctrl.Result{}, err
}
}

switch {
// We are creating the first replica
case numMachines < desiredReplicas && numMachines == 0:
@@ -317,8 +327,8 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
// Create a new Machine w/ join
logger.Info("Scaling up control plane", "Desired", desiredReplicas, "Existing", numMachines)
return r.scaleUpControlPlane(ctx, cluster, kcp, controlPlane)
// We are scaling down
case numMachines > desiredReplicas:
// We are scaling down
case numMachines > desiredReplicas || controlPlane.UnhealthyMachines().Len() > 0:
logger.Info("Scaling down control plane", "Desired", desiredReplicas, "Existing", numMachines)
return r.scaleDownControlPlane(ctx, cluster, kcp, controlPlane)
}
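
Taken together, these changes route MachineHealthCheck remediation through the existing scale-down/scale-up machinery rather than a separate code path: an unhealthy machine forces the scale-down branch even when the replica count already matches the spec, and the following reconcile scales back up to replace it. The snippet below is an illustrative sketch only, not part of this commit, and uses plain ints in place of the real machine collections:

package main

import "fmt"

// decideScaleAction mirrors the switch above in simplified form: an unhealthy
// machine triggers the scale-down branch, and the next reconcile scales up to
// restore the desired replica count.
func decideScaleAction(numMachines, desiredReplicas, unhealthy int) string {
	switch {
	case numMachines < desiredReplicas && numMachines == 0:
		return "initialize control plane"
	case numMachines < desiredReplicas && numMachines > 0:
		return "scale up: join a new machine"
	case numMachines > desiredReplicas || unhealthy > 0:
		return "scale down: delete one machine"
	default:
		return "nothing to do"
	}
}

func main() {
	// Desired 3, existing 3, one unhealthy: remediation deletes the unhealthy machine.
	fmt.Println(decideScaleAction(3, 3, 1)) // scale down: delete one machine
	// Next reconcile: desired 3, existing 2, none unhealthy: a replacement joins.
	fmt.Println(decideScaleAction(2, 3, 0)) // scale up: join a new machine
}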
30 changes: 8 additions & 22 deletions controlplane/kubeadm/controllers/scale.go
@@ -26,7 +26,6 @@ import (
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1alpha3"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/machinefilters"
capierrors "sigs.k8s.io/cluster-api/errors"
"sigs.k8s.io/cluster-api/util"
ctrl "sigs.k8s.io/controller-runtime"
)
@@ -63,11 +62,6 @@ func (r *KubeadmControlPlaneReconciler) initializeControlPlane(ctx context.Conte
func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, controlPlane *internal.ControlPlane) (ctrl.Result, error) {
logger := controlPlane.Logger()

// reconcileHealth returns err if there is a machine being delete which is a required condition to check before scaling up
if err := r.reconcileHealth(ctx, cluster, kcp, controlPlane); err != nil {
return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter}
}

// Create the bootstrap configuration
bootstrapSpec := controlPlane.JoinControlPlaneConfig()
fd := controlPlane.FailureDomainWithFewestMachines()
@@ -89,10 +83,6 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(
) (ctrl.Result, error) {
logger := controlPlane.Logger()

if err := r.reconcileHealth(ctx, cluster, kcp, controlPlane); err != nil {
return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter}
}

workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster))
if err != nil {
logger.Error(err, "Failed to create client to workload cluster")
@@ -109,6 +99,11 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(
return ctrl.Result{}, errors.New("failed to pick control plane Machine to delete")
}

if err := workloadCluster.RemoveMachineFromKubeadmConfigMap(ctx, machineToDelete); err != nil {
logger.Error(err, "Failed to remove machine from kubeadm ConfigMap")
return ctrl.Result{}, err
}

// If etcd leadership is on machine that is about to be deleted, move it to the newest member available.
etcdLeaderCandidate := controlPlane.Machines.Newest()
if err := workloadCluster.ForwardEtcdLeadership(ctx, machineToDelete, etcdLeaderCandidate); err != nil {
@@ -120,18 +115,6 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(
return ctrl.Result{}, err
}

if err := r.managementCluster.TargetClusterControlPlaneIsHealthy(ctx, util.ObjectKey(cluster)); err != nil {
logger.V(2).Info("Waiting for control plane to pass control plane health check before removing a control plane machine", "cause", err)
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy",
"Waiting for control plane to pass control plane health check before removing a control plane machine: %v", err)
return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter}

}
if err := workloadCluster.RemoveMachineFromKubeadmConfigMap(ctx, machineToDelete); err != nil {
logger.Error(err, "Failed to remove machine from kubeadm ConfigMap")
return ctrl.Result{}, err
}

logger = logger.WithValues("machine", machineToDelete)
if err := r.Client.Delete(ctx, machineToDelete); err != nil && !apierrors.IsNotFound(err) {
logger.Error(err, "Failed to delete control plane machine")
@@ -149,5 +132,8 @@ func selectMachineForScaleDown(controlPlane *internal.ControlPlane) (*clusterv1.
if needingUpgrade := controlPlane.MachinesNeedingRollout(); needingUpgrade.Len() > 0 {
machines = needingUpgrade
}
if unhealthy := controlPlane.UnhealthyMachines(); unhealthy.Len() > 0 {
machines = unhealthy
}
return controlPlane.MachineInFailureDomainWithMostMachines(machines)
}
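
Because the unhealthy filter is applied last, it takes precedence over the rollout filter: a machine that failed its MachineHealthCheck is chosen for deletion before one that merely needs an upgrade. A rough test-style sketch of that precedence follows; the fixture helpers machineNeedingRollout, machineNeedingRemediation, and healthyMachine are assumptions for illustration, not helpers that exist in this repository:

func TestSelectMachineForScaleDown_PrefersUnhealthy(t *testing.T) {
	g := NewWithT(t)

	cluster, kcp, _ := createClusterWithControlPlane()

	// Hypothetical fixtures: one machine needs a rollout, one failed its
	// MachineHealthCheck, one is healthy and up to date.
	machines := internal.NewFilterableMachineCollection()
	machines = machines.Insert(machineNeedingRollout("m-old"))
	machines = machines.Insert(machineNeedingRemediation("m-bad"))
	machines = machines.Insert(healthyMachine("m-ok"))

	controlPlane := &internal.ControlPlane{
		KCP:      kcp,
		Cluster:  cluster,
		Machines: machines,
	}

	// The unhealthy set overrides the needs-rollout set when both are non-empty.
	machineToDelete, err := selectMachineForScaleDown(controlPlane)
	g.Expect(err).NotTo(HaveOccurred())
	g.Expect(machineToDelete.Name).To(Equal("m-bad"))
}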
68 changes: 1 addition & 67 deletions controlplane/kubeadm/controllers/scale_test.go
@@ -33,7 +33,6 @@ import (
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1alpha3"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/hash"
capierrors "sigs.k8s.io/cluster-api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
@@ -84,7 +83,7 @@ func TestKubeadmControlPlaneReconciler_initializeControlPlane(t *testing.T) {
}

func TestKubeadmControlPlaneReconciler_scaleUpControlPlane(t *testing.T) {
t.Run("creates a control plane Machine if health checks pass", func(t *testing.T) {
t.Run("creates a control plane Machine", func(t *testing.T) {
g := NewWithT(t)

cluster, kcp, genericMachineTemplate := createClusterWithControlPlane()
@@ -125,71 +124,6 @@ func TestKubeadmControlPlaneReconciler_scaleUpControlPlane(t *testing.T) {
g.Expect(fakeClient.List(context.Background(), &controlPlaneMachines)).To(Succeed())
g.Expect(controlPlaneMachines.Items).To(HaveLen(3))
})
t.Run("does not create a control plane Machine if health checks fail", func(t *testing.T) {
cluster, kcp, genericMachineTemplate := createClusterWithControlPlane()
initObjs := []runtime.Object{cluster.DeepCopy(), kcp.DeepCopy(), genericMachineTemplate.DeepCopy()}

beforeMachines := internal.NewFilterableMachineCollection()
for i := 0; i < 2; i++ {
m, _ := createMachineNodePair(fmt.Sprintf("test-%d", i), cluster.DeepCopy(), kcp.DeepCopy(), true)
beforeMachines = beforeMachines.Insert(m)
initObjs = append(initObjs, m.DeepCopy())
}

testCases := []struct {
name string
etcdUnHealthy bool
controlPlaneUnHealthy bool
}{
{
name: "etcd health check fails",
etcdUnHealthy: true,
},
{
name: "controlplane component health check fails",
controlPlaneUnHealthy: true,
},
}
for _, tc := range testCases {
g := NewWithT(t)

fakeClient := newFakeClient(g, initObjs...)
fmc := &fakeManagementCluster{
Machines: beforeMachines.DeepCopy(),
ControlPlaneHealthy: !tc.controlPlaneUnHealthy,
EtcdHealthy: !tc.etcdUnHealthy,
}

r := &KubeadmControlPlaneReconciler{
Client: fakeClient,
managementCluster: fmc,
managementClusterUncached: fmc,
Log: log.Log,
recorder: record.NewFakeRecorder(32),
}
controlPlane := &internal.ControlPlane{
KCP: kcp,
Cluster: cluster,
Machines: beforeMachines,
}

_, err := r.scaleUpControlPlane(context.Background(), cluster.DeepCopy(), kcp.DeepCopy(), controlPlane)
g.Expect(err).To(HaveOccurred())
g.Expect(err).To(MatchError(&capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter}))

controlPlaneMachines := &clusterv1.MachineList{}
g.Expect(fakeClient.List(context.Background(), controlPlaneMachines)).To(Succeed())
g.Expect(controlPlaneMachines.Items).To(HaveLen(len(beforeMachines)))

endMachines := internal.NewFilterableMachineCollectionFromMachineList(controlPlaneMachines)
for _, m := range endMachines {
bm, ok := beforeMachines[m.Name]
bm.SetResourceVersion("1")
g.Expect(ok).To(BeTrue())
g.Expect(m).To(Equal(bm))
}
}
})
}

func TestKubeadmControlPlaneReconciler_scaleDownControlPlane_NoError(t *testing.T) {
13 changes: 13 additions & 0 deletions controlplane/kubeadm/controllers/upgrade.go
@@ -18,6 +18,7 @@ package controllers

import (
"context"
"fmt"

"github.com/blang/semver"
"github.com/pkg/errors"
@@ -41,58 +42,70 @@ func (r *KubeadmControlPlaneReconciler) upgradeControlPlane(
workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster))
if err != nil {
logger.Error(err, "failed to get remote client for workload cluster", "cluster key", util.ObjectKey(cluster))
fmt.Println(0)
return ctrl.Result{}, err
}

parsedVersion, err := semver.ParseTolerant(kcp.Spec.Version)
if err != nil {
fmt.Println(1)
return ctrl.Result{}, errors.Wrapf(err, "failed to parse kubernetes version %q", kcp.Spec.Version)
}

if err := workloadCluster.ReconcileKubeletRBACRole(ctx, parsedVersion); err != nil {
fmt.Println(2)
return ctrl.Result{}, errors.Wrap(err, "failed to reconcile the remote kubelet RBAC role")
}

if err := workloadCluster.ReconcileKubeletRBACBinding(ctx, parsedVersion); err != nil {
fmt.Println(3)
return ctrl.Result{}, errors.Wrap(err, "failed to reconcile the remote kubelet RBAC binding")
}

// Ensure kubeadm cluster role & bindings for v1.18+
// as per https://github.com/kubernetes/kubernetes/commit/b117a928a6c3f650931bdac02a41fca6680548c4
if err := workloadCluster.AllowBootstrapTokensToGetNodes(ctx); err != nil {
fmt.Println(4)
return ctrl.Result{}, errors.Wrap(err, "failed to set role and role binding for kubeadm")
}

if err := workloadCluster.UpdateKubernetesVersionInKubeadmConfigMap(ctx, parsedVersion); err != nil {
fmt.Println(5)
return ctrl.Result{}, errors.Wrap(err, "failed to update the kubernetes version in the kubeadm config map")
}

if kcp.Spec.KubeadmConfigSpec.ClusterConfiguration != nil {
imageRepository := kcp.Spec.KubeadmConfigSpec.ClusterConfiguration.ImageRepository
if err := workloadCluster.UpdateImageRepositoryInKubeadmConfigMap(ctx, imageRepository); err != nil {
fmt.Println(6)
return ctrl.Result{}, errors.Wrap(err, "failed to update the image repository in the kubeadm config map")
}
}

if kcp.Spec.KubeadmConfigSpec.ClusterConfiguration != nil && kcp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.Local != nil {
meta := kcp.Spec.KubeadmConfigSpec.ClusterConfiguration.Etcd.Local.ImageMeta
if err := workloadCluster.UpdateEtcdVersionInKubeadmConfigMap(ctx, meta.ImageRepository, meta.ImageTag); err != nil {
fmt.Println(7)
return ctrl.Result{}, errors.Wrap(err, "failed to update the etcd version in the kubeadm config map")
}
}

if err := workloadCluster.UpdateKubeletConfigMap(ctx, parsedVersion); err != nil {
fmt.Println(8)
return ctrl.Result{}, errors.Wrap(err, "failed to upgrade kubelet config map")
}

status, err := workloadCluster.ClusterStatus(ctx)
if err != nil {
fmt.Println(9)
return ctrl.Result{}, err
}

if status.Nodes <= *kcp.Spec.Replicas {
// scaleUp ensures that we don't continue scaling up while waiting for Machines to have NodeRefs
fmt.Println(10)
return r.scaleUpControlPlane(ctx, cluster, kcp, controlPlane)
}
fmt.Println(11)
return r.scaleDownControlPlane(ctx, cluster, kcp, controlPlane)
}
9 changes: 0 additions & 9 deletions controlplane/kubeadm/controllers/upgrade_test.go
@@ -27,7 +27,6 @@ import (
"k8s.io/utils/pointer"
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
capierrors "sigs.k8s.io/cluster-api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
@@ -87,14 +86,6 @@ func TestKubeadmControlPlaneReconciler_upgradeControlPlane(t *testing.T) {
bothMachines := &clusterv1.MachineList{}
g.Expect(fakeClient.List(context.Background(), bothMachines, client.InNamespace(cluster.Namespace))).To(Succeed())
g.Expect(bothMachines.Items).To(HaveLen(2))

// run upgrade a second time, simulate that the node has not appeared yet but the machine exists
r.managementCluster.(*fakeManagementCluster).ControlPlaneHealthy = false
_, err = r.upgradeControlPlane(context.Background(), cluster, kcp, controlPlane)
g.Expect(err).To(Equal(&capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter}))
g.Expect(fakeClient.List(context.Background(), bothMachines, client.InNamespace(cluster.Namespace))).To(Succeed())
g.Expect(bothMachines.Items).To(HaveLen(2))

controlPlane.Machines = internal.NewFilterableMachineCollectionFromMachineList(bothMachines)

// manually increase number of nodes, make control plane healthy again
27 changes: 26 additions & 1 deletion controlplane/kubeadm/internal/control_plane.go
@@ -17,8 +17,9 @@ limitations under the License.
package internal

import (
"errors"

"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/storage/names"
@@ -29,6 +30,9 @@ import (
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/machinefilters"
)

// MachineHealthCheck remediation is only supported on clusters with >= 3 machines to avoid disrupting etcd consensus
const minimumClusterSizeForRemediation = 3

// ControlPlane holds business logic around control planes.
// It should never need to connect to a service, that responsibility lies outside of this struct.
type ControlPlane struct {
@@ -222,3 +226,24 @@ func (c *ControlPlane) NeedsReplacementNode() bool {
func (c *ControlPlane) HasDeletingMachine() bool {
return len(c.Machines.Filter(machinefilters.HasDeletionTimestamp)) > 0
}

// ProvisioningMachines returns machines that are still booting. In the case
// of 3 node or larger clusters, it excludes unhealthy machines.
func (c *ControlPlane) ProvisioningMachines() FilterableMachineCollection {
machines := c.Machines.Filter(machinefilters.IsProvisioning).
Filter(machinefilters.Not(machinefilters.IsFailed))

if c.Machines.Len() < minimumClusterSizeForRemediation {
return machines
}
return machines.Filter(machinefilters.Not(machinefilters.NeedsRemediation))
}

// UnhealthyMachines returns the machines that need remediation. If cluster
// size is less than 3, will return an empty list.
func (c *ControlPlane) UnhealthyMachines() FilterableMachineCollection {
if c.Machines.Len() < minimumClusterSizeForRemediation {
return nil
}
return c.Machines.Filter(machinefilters.NeedsRemediation)
}
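
The three-machine floor exists because removing an etcd member from a one- or two-node control plane would immediately cost quorum, so below that size unhealthy machines are neither reported for remediation nor excluded from the provisioning count. A minimal sketch of the resulting behaviour; the fixture helpers healthyMachine and machineNeedingRemediation are assumptions, and a real test would build Machines carrying whatever condition the MachineHealthCheck controller sets:

func TestUnhealthyMachines_MinimumClusterSize(t *testing.T) {
	g := NewWithT(t)

	// Two machines, one flagged for remediation: below
	// minimumClusterSizeForRemediation, so nothing is reported.
	machines := internal.NewFilterableMachineCollection()
	machines = machines.Insert(healthyMachine("m-0"))            // hypothetical fixture
	machines = machines.Insert(machineNeedingRemediation("m-1")) // hypothetical fixture
	controlPlane := &internal.ControlPlane{Machines: machines}
	g.Expect(controlPlane.UnhealthyMachines().Len()).To(Equal(0))

	// A third machine crosses the threshold, so the unhealthy machine now
	// shows up and the reconciler takes the scale-down branch for it.
	controlPlane.Machines = machines.Insert(healthyMachine("m-2"))
	g.Expect(controlPlane.UnhealthyMachines().Len()).To(Equal(1))
}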