Refactor controlplane health check in KCP
Sedef committed Oct 15, 2020
1 parent af66309 commit 0d07944
Showing 12 changed files with 241 additions and 182 deletions.
105 changes: 84 additions & 21 deletions controlplane/kubeadm/controllers/controller.go
@@ -31,13 +31,6 @@ import (
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/tools/record"
"k8s.io/utils/pointer"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/source"

clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
bootstrapv1 "sigs.k8s.io/cluster-api/bootstrap/kubeadm/api/v1alpha3"
kubeadmv1 "sigs.k8s.io/cluster-api/bootstrap/kubeadm/types/v1beta1"
@@ -51,6 +44,12 @@ import (
"sigs.k8s.io/cluster-api/util/patch"
"sigs.k8s.io/cluster-api/util/predicates"
"sigs.k8s.io/cluster-api/util/secret"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/source"
)

// +kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;patch
@@ -289,21 +288,25 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
return ctrl.Result{}, err
}

ownedMachines := controlPlaneMachines.Filter(machinefilters.OwnedMachines(kcp))
if len(ownedMachines) != len(controlPlaneMachines) {
logger.Info("Not all control plane machines are owned by this KubeadmControlPlane, refusing to operate in mixed management mode")
return ctrl.Result{}, nil
}

controlPlane, err := internal.NewControlPlane(ctx, r.Client, cluster, kcp, ownedMachines)
controlPlane, err := r.createControlPlane(ctx, cluster, kcp)
if err != nil {
logger.Error(err, "failed to initialize control plane")
return ctrl.Result{}, err
}
if len(controlPlane.Machines) != len(controlPlaneMachines) {
logger.Info("Not all control plane machines are owned by this KubeadmControlPlane, refusing to operate in mixed management mode")
return ctrl.Result{}, nil
}

// Aggregate the operational state of all the machines; while aggregating we are adding the
// source ref (reason@machine/name) so the problem can be easily tracked down to its source machine.
conditions.SetAggregate(controlPlane.KCP, controlplanev1.MachinesReadyCondition, ownedMachines.ConditionGetters(), conditions.AddSourceRef())
conditions.SetAggregate(controlPlane.KCP, controlplanev1.MachinesReadyCondition, controlPlane.Machines.ConditionGetters(), conditions.AddSourceRef())

// reconcileControlPlaneHealth returns an error if there is a machine being deleted or if the control plane is unhealthy.
// If the control plane is not initialized, the control-plane machines will be empty and the health check will not fail.
if result, err := r.reconcileControlPlaneHealth(ctx, cluster, kcp, controlPlane); err != nil || !result.IsZero() {
return result, err
}

// Control plane machines rollout due to configuration changes (e.g. upgrades) takes precedence over other operations.
needRollout := controlPlane.MachinesNeedingRollout()
@@ -324,7 +327,7 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
}

// If we've made it this far, we can assume that all ownedMachines are up to date
numMachines := len(ownedMachines)
numMachines := len(controlPlane.Machines)
desiredReplicas := int(*kcp.Spec.Replicas)

switch {
@@ -372,13 +375,51 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
return ctrl.Result{}, nil
}

func (r *KubeadmControlPlaneReconciler) createControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane) (*internal.ControlPlane, error) {
logger := r.Log.WithValues("namespace", kcp.Namespace, "kubeadmControlPlane", kcp.Name, "cluster", cluster.Name)

controlPlaneMachines, err := r.managementCluster.GetMachinesForCluster(ctx, util.ObjectKey(cluster), machinefilters.ControlPlaneMachines(cluster.Name))
if err != nil {
logger.Error(err, "failed to retrieve control plane machines for cluster")
return nil, err
}
ownedMachines := controlPlaneMachines.Filter(machinefilters.OwnedMachines(kcp))

controlPlane, err := internal.NewControlPlane(ctx, r.Client, cluster, kcp, ownedMachines)
if err != nil {
logger.Error(err, "failed to initialize control plane")
return nil, err
}
return controlPlane, nil
}

// reconcileDelete handles KubeadmControlPlane deletion.
// The implementation does not take non-control plane workloads into consideration. This may or may not change in the future.
// Please see https://github.com/kubernetes-sigs/cluster-api/issues/2064.
func (r *KubeadmControlPlaneReconciler) reconcileDelete(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane) (ctrl.Result, error) {
logger := r.Log.WithValues("namespace", kcp.Namespace, "kubeadmControlPlane", kcp.Name, "cluster", cluster.Name)
logger.Info("Reconcile KubeadmControlPlane deletion")

controlPlane, err := r.createControlPlane(ctx, cluster, kcp)
if err != nil {
logger.Error(err, "failed to initialize control plane")
return ctrl.Result{}, err
}

// Ignore the health check results here, as well as the errors; the health check functions are only called to set health-related conditions on Machines.
// Errors may be due to not being able to get workload cluster nodes.
_, err = r.managementCluster.TargetClusterControlPlaneHealthCheck(ctx, controlPlane, util.ObjectKey(cluster))
if err != nil {
// Do nothing
r.Log.Info("Control plane did not pass control plane health check during delete reconciliation", "err", err.Error())
}
_, err = r.managementCluster.TargetClusterEtcdHealthCheck(ctx, controlPlane, util.ObjectKey(cluster))
if err != nil {
// Do nothing
r.Log.Info("Control plane did not pass etcd health check during delete reconciliation", "err", err.Error())
}

// Gets all machines, not just control plane machines.
allMachines, err := r.managementCluster.GetMachinesForCluster(ctx, util.ObjectKey(cluster))
if err != nil {
return ctrl.Result{}, err
@@ -442,21 +483,43 @@ func (r *KubeadmControlPlaneReconciler) ClusterToKubeadmControlPlane(o handler.M
return nil
}

// reconcileHealth performs health checks for control plane components and etcd
// reconcileControlPlaneHealth performs health checks for control plane components and etcd
// It removes any etcd members that do not have a corresponding node.
// Also, as a final step, it checks whether any machines are being deleted.
func (r *KubeadmControlPlaneReconciler) reconcileHealth(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, controlPlane *internal.ControlPlane) (ctrl.Result, error) {
func (r *KubeadmControlPlaneReconciler) reconcileControlPlaneHealth(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, controlPlane *internal.ControlPlane) (ctrl.Result, error) {
logger := r.Log.WithValues("namespace", kcp.Namespace, "kubeadmControlPlane", kcp.Name)

// If there are no KCP-owned control-plane machines, the control plane has not been initialized yet.
if controlPlane.Machines.Len() == 0 {
return ctrl.Result{}, nil
}

for i := range controlPlane.Machines {
m := controlPlane.Machines[i]
// Initialize the patch helper.
patchHelper, err := patch.NewHelper(m, r.Client)
if err != nil {
return ctrl.Result{}, err
}

defer func() {
// Always attempt to Patch the Machine conditions after each health reconciliation.
if err := patchHelper.Patch(ctx, m); err != nil {
logger.Error(err, "Failed to patch KubeadmControlPlane Machine", "machine", m.Name)
}
}()
}

// Do a health check of the Control Plane components
if err := r.managementCluster.TargetClusterControlPlaneIsHealthy(ctx, util.ObjectKey(cluster)); err != nil {
if err := r.managementCluster.TargetClusterAnalyseControlPlaneHealth(ctx, controlPlane, util.ObjectKey(cluster)); err != nil {
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy",
"Waiting for control plane to pass control plane health check to continue reconciliation: %v", err)
return ctrl.Result{RequeueAfter: healthCheckFailedRequeueAfter}, nil
return ctrl.Result{}, errors.Wrap(err, "failed to pass control-plane health check")
}

// If KCP should manage etcd, ensure etcd is healthy.
if controlPlane.IsEtcdManaged() {
if err := r.managementCluster.TargetClusterEtcdIsHealthy(ctx, util.ObjectKey(cluster)); err != nil {
if err := r.managementCluster.TargetClusterAnalyseEtcdHealth(ctx, controlPlane, util.ObjectKey(cluster)); err != nil {
errList := []error{errors.Wrap(err, "failed to pass etcd health check")}
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy",
"Waiting for control plane to pass etcd health check to continue reconciliation: %v", err)
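Note on the controller.go changes above: the control-plane and etcd health checks now run once, up front, in the main reconcile loop (reconcileControlPlaneHealth) instead of inside each scale operation, and a failed check is returned as a wrapped error rather than a RequeueAfter result; during deletion the health-check functions are still called, but only to set conditions, and their errors are merely logged. The condensed sketch below illustrates the resulting ordering; the types and values are hypothetical stand-ins, not the actual Cluster API code.

package main

import (
	"context"
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the real KCP types, for illustration only.
type controlPlane struct {
	machines int  // number of KCP-owned control-plane machines
	healthy  bool // result of the control-plane/etcd health checks
}

type reconciler struct{}

func (r *reconciler) createControlPlane(ctx context.Context) (*controlPlane, error) {
	// In the real code this wraps GetMachinesForCluster, the OwnedMachines filter, and internal.NewControlPlane.
	return &controlPlane{machines: 2, healthy: false}, nil
}

func (r *reconciler) reconcileControlPlaneHealth(ctx context.Context, cp *controlPlane) error {
	// A control plane with no owned machines is not initialized yet, so the check is skipped.
	if cp.machines == 0 {
		return nil
	}
	if !cp.healthy {
		// After the refactor the failure surfaces as an error, so rollout and scale steps are never reached.
		return errors.New("failed to pass control-plane health check")
	}
	return nil
}

func (r *reconciler) reconcile(ctx context.Context, desiredReplicas int) error {
	cp, err := r.createControlPlane(ctx)
	if err != nil {
		return err
	}
	if err := r.reconcileControlPlaneHealth(ctx, cp); err != nil {
		return err
	}
	// Only a healthy, fully owned control plane reaches the rollout and scale decisions.
	switch {
	case cp.machines < desiredReplicas:
		fmt.Println("scale up")
	case cp.machines > desiredReplicas:
		fmt.Println("scale down")
	}
	return nil
}

func main() {
	if err := (&reconciler{}).reconcile(context.Background(), 3); err != nil {
		fmt.Println("reconcile error:", err)
	}
}

Surfacing the failure as an error also means controller-runtime's rate-limited backoff drives the retry, instead of the fixed healthCheckFailedRequeueAfter interval used before.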
4 changes: 2 additions & 2 deletions controlplane/kubeadm/controllers/controller_test.go
@@ -572,7 +572,7 @@ func TestKubeadmControlPlaneReconciler_adoption(t *testing.T) {
g := NewWithT(t)

cluster, kcp, tmpl := createClusterWithControlPlane()
cluster.Spec.ControlPlaneEndpoint.Host = "nodomain.example.com"
cluster.Spec.ControlPlaneEndpoint.Host = "nodomain.example.com1"
cluster.Spec.ControlPlaneEndpoint.Port = 6443
kcp.Spec.Version = version

@@ -642,7 +642,7 @@ func TestKubeadmControlPlaneReconciler_adoption(t *testing.T) {
g := NewWithT(t)

cluster, kcp, tmpl := createClusterWithControlPlane()
cluster.Spec.ControlPlaneEndpoint.Host = "nodomain.example.com"
cluster.Spec.ControlPlaneEndpoint.Host = "nodomain.example.com2"
cluster.Spec.ControlPlaneEndpoint.Port = 6443
kcp.Spec.Version = "v1.17.0"

17 changes: 15 additions & 2 deletions controlplane/kubeadm/controllers/fakes_test.go
@@ -57,19 +57,32 @@ func (f *fakeManagementCluster) GetMachinesForCluster(c context.Context, n clien
return f.Machines, nil
}

func (f *fakeManagementCluster) TargetClusterControlPlaneIsHealthy(_ context.Context, _ client.ObjectKey) error {
func (f *fakeManagementCluster) TargetClusterAnalyseControlPlaneHealth(_ context.Context, _ *internal.ControlPlane, _ client.ObjectKey) error {
if !f.ControlPlaneHealthy {
return errors.New("control plane is not healthy")
}
return nil
}

func (f *fakeManagementCluster) TargetClusterEtcdIsHealthy(_ context.Context, _ client.ObjectKey) error {
func (f *fakeManagementCluster) TargetClusterAnalyseEtcdHealth(_ context.Context, _ *internal.ControlPlane, _ client.ObjectKey) error {
if !f.EtcdHealthy {
return errors.New("etcd is not healthy")
}
return nil
}
func (f *fakeManagementCluster) TargetClusterEtcdHealthCheck(_ context.Context, _ *internal.ControlPlane, _ client.ObjectKey) (internal.HealthCheckResult, error) {
if !f.EtcdHealthy {
return nil, errors.New("etcd is not healthy")
}
return nil, nil
}

func (f *fakeManagementCluster) TargetClusterControlPlaneHealthCheck(_ context.Context, _ *internal.ControlPlane, _ client.ObjectKey) (internal.HealthCheckResult, error) {
if !f.ControlPlaneHealthy {
return nil, errors.New("control plane is not healthy")
}
return nil, nil
}

type fakeWorkloadCluster struct {
*internal.Workload
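The four new fake methods above mirror the management-cluster surface the controller now depends on: the Analyse variants take the ControlPlane so per-machine conditions can be set and report failure as an error, while the HealthCheck variants return an internal.HealthCheckResult that the delete path ignores. The sketch below restates that surface as a small interface inferred from the fake's signatures; it is illustrative only, assumes the imports already present in fakes_test.go, and may differ from the real ManagementCluster interface in controlplane/kubeadm/internal.

// Illustrative subset, inferred from fakes_test.go rather than copied from the repo.
type targetClusterHealth interface {
	// Condition-setting checks used by the main reconcile; failure is reported as an error.
	TargetClusterAnalyseControlPlaneHealth(ctx context.Context, controlPlane *internal.ControlPlane, clusterKey client.ObjectKey) error
	TargetClusterAnalyseEtcdHealth(ctx context.Context, controlPlane *internal.ControlPlane, clusterKey client.ObjectKey) error

	// Raw health-check results used in the delete path, where errors are only logged.
	TargetClusterControlPlaneHealthCheck(ctx context.Context, controlPlane *internal.ControlPlane, clusterKey client.ObjectKey) (internal.HealthCheckResult, error)
	TargetClusterEtcdHealthCheck(ctx context.Context, controlPlane *internal.ControlPlane, clusterKey client.ObjectKey) (internal.HealthCheckResult, error)
}

// Compile-time check that the fake above satisfies this illustrative subset.
var _ targetClusterHealth = &fakeManagementCluster{}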
12 changes: 2 additions & 10 deletions controlplane/kubeadm/controllers/scale.go
@@ -63,11 +63,6 @@ func (r *KubeadmControlPlaneReconciler) initializeControlPlane(ctx context.Conte
func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, controlPlane *internal.ControlPlane) (ctrl.Result, error) {
logger := controlPlane.Logger()

// reconcileHealth returns err if there is a machine being delete which is a required condition to check before scaling up
if result, err := r.reconcileHealth(ctx, cluster, kcp, controlPlane); err != nil || !result.IsZero() {
return result, err
}

// Create the bootstrap configuration
bootstrapSpec := controlPlane.JoinControlPlaneConfig()
fd := controlPlane.NextFailureDomainForScaleUp()
@@ -90,10 +85,6 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(
) (ctrl.Result, error) {
logger := controlPlane.Logger()

if result, err := r.reconcileHealth(ctx, cluster, kcp, controlPlane); err != nil || !result.IsZero() {
return result, err
}

workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster))
if err != nil {
logger.Error(err, "Failed to create client to workload cluster")
@@ -123,7 +114,8 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(
}
}

if err := r.managementCluster.TargetClusterControlPlaneIsHealthy(ctx, util.ObjectKey(cluster)); err != nil {
// TODO: check if this is needed after moving the health check to the main reconcile
if err := r.managementCluster.TargetClusterAnalyseControlPlaneHealth(ctx, controlPlane, util.ObjectKey(cluster)); err != nil {
logger.V(2).Info("Waiting for control plane to pass control plane health check before removing a control plane machine", "cause", err)
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy",
"Waiting for control plane to pass control plane health check before removing a control plane machine: %v", err)
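As the scale.go changes above show, scaleUpControlPlane and scaleDownControlPlane no longer run reconcileHealth themselves; the main reconcile gates on health first, and scale-down keeps a single pre-removal guard (flagged with a TODO as possibly redundant) that records a ControlPlaneUnhealthy warning event and holds off on removing a machine. A minimal self-contained sketch of that guard pattern, with hypothetical names and without the real event-recorder or requeue plumbing:

package main

import "fmt"

// healthCheck stands in for TargetClusterAnalyseControlPlaneHealth.
type healthCheck func() error

// guardedScaleDown refuses to remove a machine while the control plane is unhealthy;
// the real controller records a warning event and retries on a later reconcile.
func guardedScaleDown(check healthCheck, removeMachine func() error) error {
	if err := check(); err != nil {
		fmt.Println("waiting for control plane to pass health check before removing a machine:", err)
		return nil // back off and let a later reconcile try again
	}
	return removeMachine()
}

func main() {
	unhealthy := func() error { return fmt.Errorf("control plane is not healthy") }
	remove := func() error { fmt.Println("removed one control plane machine"); return nil }
	_ = guardedScaleDown(unhealthy, remove)
}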
15 changes: 5 additions & 10 deletions controlplane/kubeadm/controllers/scale_test.go
@@ -124,6 +124,8 @@ func TestKubeadmControlPlaneReconciler_scaleUpControlPlane(t *testing.T) {
})
t.Run("does not create a control plane Machine if health checks fail", func(t *testing.T) {
cluster, kcp, genericMachineTemplate := createClusterWithControlPlane()
cluster.Spec.ControlPlaneEndpoint.Host = "nodomain.example.com"
cluster.Spec.ControlPlaneEndpoint.Port = 6443
initObjs := []runtime.Object{cluster.DeepCopy(), kcp.DeepCopy(), genericMachineTemplate.DeepCopy()}

beforeMachines := internal.NewFilterableMachineCollection()
@@ -170,18 +172,11 @@ func TestKubeadmControlPlaneReconciler_scaleUpControlPlane(t *testing.T) {
Log: log.Log,
recorder: record.NewFakeRecorder(32),
}
controlPlane := &internal.ControlPlane{
KCP: kcp,
Cluster: cluster,
Machines: beforeMachines,
}

result, err := r.scaleUpControlPlane(context.Background(), cluster.DeepCopy(), kcp.DeepCopy(), controlPlane)
if tc.expectErr {
g.Expect(err).To(HaveOccurred())
}
g.Expect(result).To(Equal(tc.expectResult))
_, err := r.reconcile(context.Background(), cluster, kcp)
g.Expect(err).To(HaveOccurred())

// scaleUpControlPlane is never called due to health check failure and new machine is not created to scale up.
controlPlaneMachines := &clusterv1.MachineList{}
g.Expect(fakeClient.List(context.Background(), controlPlaneMachines)).To(Succeed())
g.Expect(controlPlaneMachines.Items).To(HaveLen(len(beforeMachines)))
8 changes: 5 additions & 3 deletions controlplane/kubeadm/controllers/upgrade_test.go
@@ -36,6 +36,8 @@ func TestKubeadmControlPlaneReconciler_upgradeControlPlane(t *testing.T) {
g := NewWithT(t)

cluster, kcp, genericMachineTemplate := createClusterWithControlPlane()
cluster.Spec.ControlPlaneEndpoint.Host = "nodomain.example.com"
cluster.Spec.ControlPlaneEndpoint.Port = 6443
kcp.Spec.Version = "v1.17.3"
kcp.Spec.KubeadmConfigSpec.ClusterConfiguration = nil
kcp.Spec.Replicas = pointer.Int32Ptr(1)
@@ -89,9 +91,9 @@ func TestKubeadmControlPlaneReconciler_upgradeControlPlane(t *testing.T) {

// run upgrade a second time, simulate that the node has not appeared yet but the machine exists
r.managementCluster.(*fakeManagementCluster).ControlPlaneHealthy = false
result, err = r.upgradeControlPlane(context.Background(), cluster, kcp, controlPlane, needingUpgrade)
g.Expect(result).To(Equal(ctrl.Result{RequeueAfter: healthCheckFailedRequeueAfter}))
g.Expect(err).To(BeNil())
// Unhealthy control plane will be detected during reconcile loop and upgrade will never be called.
_, err = r.reconcile(context.Background(), cluster, kcp)
g.Expect(err).To(HaveOccurred())
g.Expect(fakeClient.List(context.Background(), bothMachines, client.InNamespace(cluster.Namespace))).To(Succeed())
g.Expect(bothMachines.Items).To(HaveLen(2))
