
Remove machine annotations from KCP
Upgrade logic no longer uses machine annotations

Logic is now:
- Check number of nodes in workload cluster
- If node count <= replicas, create new upgrade machine
- If node count > replicas, scale down

Scale up logic ensures that we don't create additional machines if we
reconcile while waiting for an upgrade machine to appear in the node
list
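
In Go-flavored pseudocode (an illustrative sketch; the helper name and
the health-check guard as written here are assumptions, not code from
this commit):

    // upgradeAction condenses the new flow. nodeCount comes from the
    // workload cluster, replicas from the KCP spec, and
    // controlPlaneHealthy from the existing health check, which fails
    // while a just-created machine's node has not yet registered.
    func upgradeAction(nodeCount, replicas int, controlPlaneHealthy bool) string {
        if nodeCount <= replicas {
            if !controlPlaneHealthy {
                // A replacement machine may already exist; requeue
                // rather than create a second one.
                return "requeue and wait for the new node"
            }
            return "create one upgrade machine"
        }
        return "scale down one outdated machine"
    }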

Scale up should only consider machines needing upgrade

We never support upgrading only a subset of the cluster, but this
ensures that we pick the FD that has the most machines needing upgrade,
rather than just the FD with the most machines.
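
To illustrate (hypothetical helper, not code from this commit):
counting only the machines that still need upgrading can pick a
different failure domain than counting all machines.

    // pickFD returns the failure domain with the most machines among
    // the given candidates (ties broken arbitrarily in this sketch).
    func pickFD(machinesByFD map[string]int) string {
        best, max := "", -1
        for fd, n := range machinesByFD {
            if n > max {
                best, max = fd, n
            }
        }
        return best
    }

    // pickFD(map[string]int{"a": 3, "b": 2}) == "a", but if only one
    // machine in "b" still needs upgrade and none in "a" do, the
    // candidate counts become {"b": 1} and "b" is picked instead.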

Also add a comment to explain why scale up will not cause more than one
machine to be created

Scale down always scales down outdated machines first

This removes the need to pass outdated machines through the call chain
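
Concretely, the selection introduced in scale.go below boils down to:

    // Condensed from this commit's selectMachineForScaleDown: restrict
    // the candidates to machines still needing upgrade when any exist,
    // then take the failure domain with the most candidates.
    machines := controlPlane.Machines
    if needingUpgrade := controlPlane.MachinesNeedingUpgrade(); needingUpgrade.Len() > 0 {
        machines = needingUpgrade
    }
    return controlPlane.MachineInFailureDomainWithMostMachines(machines)

Per the new TestSelectMachineForScaleDown, ties inside the chosen
failure domain resolve to the oldest machine.
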
Ben Moss committed Apr 15, 2020
1 parent cb8f8e4 commit 8ff8401
Showing 9 changed files with 210 additions and 288 deletions.
@@ -25,12 +25,8 @@ import (
)

const (
- KubeadmControlPlaneFinalizer = "kubeadm.controlplane.cluster.x-k8s.io"
- KubeadmControlPlaneHashLabelKey = "kubeadm.controlplane.cluster.x-k8s.io/hash"
- SelectedForUpgradeAnnotation = "kubeadm.controlplane.cluster.x-k8s.io/selected-for-upgrade"
- UpgradeReplacementCreatedAnnotation = "kubeadm.controlplane.cluster.x-k8s.io/upgrade-replacement-created"
- DeleteForScaleDownAnnotation = "kubeadm.controlplane.cluster.x-k8s.io/delete-for-scale-down"
- ScaleDownConfigMapEntryRemovedAnnotation = "kubeadm.controlplane.cluster.x-k8s.io/scale-down-configmap-entry-removed"
+ KubeadmControlPlaneFinalizer = "kubeadm.controlplane.cluster.x-k8s.io"
+ KubeadmControlPlaneHashLabelKey = "kubeadm.controlplane.cluster.x-k8s.io/hash"
)

// KubeadmControlPlaneSpec defines the desired state of KubeadmControlPlane.
6 changes: 3 additions & 3 deletions controlplane/kubeadm/controllers/controller.go
@@ -214,7 +214,7 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
// Upgrade takes precedence over other operations
if len(requireUpgrade) > 0 {
logger.Info("Upgrading Control Plane")
- return r.upgradeControlPlane(ctx, cluster, kcp, ownedMachines, requireUpgrade, controlPlane)
+ return r.upgradeControlPlane(ctx, cluster, kcp, controlPlane)
}

// If we've made it this far, we can assume that all ownedMachines are up to date
@@ -231,11 +231,11 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
case numMachines < desiredReplicas && numMachines > 0:
// Create a new Machine w/ join
logger.Info("Scaling up control plane", "Desired", desiredReplicas, "Existing", numMachines)
- return r.scaleUpControlPlane(ctx, cluster, kcp, ownedMachines, controlPlane)
+ return r.scaleUpControlPlane(ctx, cluster, kcp, controlPlane)
// We are scaling down
case numMachines > desiredReplicas:
logger.Info("Scaling down control plane", "Desired", desiredReplicas, "Existing", numMachines)
- return r.scaleDownControlPlane(ctx, cluster, kcp, ownedMachines, ownedMachines, controlPlane)
+ return r.scaleDownControlPlane(ctx, cluster, kcp, controlPlane)
}

// Get the workload cluster client.
68 changes: 28 additions & 40 deletions controlplane/kubeadm/controllers/controller_test.go
@@ -19,7 +19,9 @@ package controllers
import (
"context"
"fmt"
"sync"
"testing"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
@@ -920,52 +922,38 @@ func TestKubeadmControlPlaneReconciler_reconcileDelete(t *testing.T) {

}

- func TestKubeadmControlPlaneReconciler_scaleDownControlPlane(t *testing.T) {
- g := NewWithT(t)
-
- machines := map[string]*clusterv1.Machine{
- "one": machine("one"),
- "two": machine("two"),
- "three": machine("three"),
- }
- fd1 := "a"
- fd2 := "b"
- machines["one"].Spec.FailureDomain = &fd2
- machines["two"].Spec.FailureDomain = &fd1
- machines["three"].Spec.FailureDomain = &fd2
-
- r := &KubeadmControlPlaneReconciler{
- Log: log.Log,
- recorder: record.NewFakeRecorder(32),
- Client: newFakeClient(g, machines["one"]),
- managementCluster: &fakeManagementCluster{
- EtcdHealthy: true,
- ControlPlaneHealthy: true,
- },
- }
- cluster := newCluster(&types.NamespacedName{Name: "foo", Namespace: "default"})
-
- kcp := &controlplanev1.KubeadmControlPlane{}
- controlPlane := &internal.ControlPlane{
- KCP: kcp,
- Cluster: cluster,
- Machines: machines,
- }
-
- ml := clusterv1.MachineList{}
- ml.Items = []clusterv1.Machine{*machines["two"]}
- mll := internal.NewFilterableMachineCollectionFromMachineList(&ml)
- _, err := r.scaleDownControlPlane(context.Background(), cluster, kcp, machines, mll, controlPlane)
- g.Expect(err).To(HaveOccurred())
- }

// test utils

func newFakeClient(g *WithT, initObjs ...runtime.Object) client.Client {
g.Expect(clusterv1.AddToScheme(scheme.Scheme)).To(Succeed())
g.Expect(bootstrapv1.AddToScheme(scheme.Scheme)).To(Succeed())
g.Expect(controlplanev1.AddToScheme(scheme.Scheme)).To(Succeed())
- return fake.NewFakeClientWithScheme(scheme.Scheme, initObjs...)
+ return &fakeClient{
+ startTime: time.Now(),
+ Client: fake.NewFakeClientWithScheme(scheme.Scheme, initObjs...),
+ }
}

+ type fakeClient struct {
+ startTime time.Time
+ mux sync.Mutex
+ client.Client
+ }
+
+ type fakeClientI interface {
+ SetCreationTimestamp(timestamp metav1.Time)
+ }
+
+ // controller-runtime's fake client doesn't set a CreationTimestamp
+ // this sets one that increments by a minute for each object created
+ func (c *fakeClient) Create(ctx context.Context, obj runtime.Object, opts ...client.CreateOption) error {
+ if f, ok := obj.(fakeClientI); ok {
+ c.mux.Lock()
+ c.startTime = c.startTime.Add(time.Minute)
+ f.SetCreationTimestamp(metav1.NewTime(c.startTime))
+ c.mux.Unlock()
+ }
+ return c.Client.Create(ctx, obj, opts...)
+ }

func createClusterWithControlPlane() (*clusterv1.Cluster, *controlplanev1.KubeadmControlPlane, *unstructured.Unstructured) {
20 changes: 0 additions & 20 deletions controlplane/kubeadm/controllers/helpers.go
@@ -229,23 +229,3 @@ func (r *KubeadmControlPlaneReconciler) generateMachine(ctx context.Context, kcp
}
return nil
}

- func (r *KubeadmControlPlaneReconciler) markWithAnnotationKey(ctx context.Context, machine *clusterv1.Machine, annotationKey string) error {
- if machine == nil {
- return errors.New("expected machine not nil")
- }
- patchHelper, err := patch.NewHelper(machine, r.Client)
- if err != nil {
- return errors.Wrapf(err, "failed to create patch helper for machine %s", machine.Name)
- }
-
- if machine.Annotations == nil {
- machine.Annotations = make(map[string]string)
- }
- machine.Annotations[annotationKey] = ""
-
- if err := patchHelper.Patch(ctx, machine); err != nil {
- return errors.Wrapf(err, "failed to patch machine %s selected for upgrade", machine.Name)
- }
- return nil
- }
50 changes: 22 additions & 28 deletions controlplane/kubeadm/controllers/scale.go
@@ -25,7 +25,6 @@ import (
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1alpha3"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/machinefilters"
capierrors "sigs.k8s.io/cluster-api/errors"
"sigs.k8s.io/cluster-api/util"
ctrl "sigs.k8s.io/controller-runtime"
@@ -46,7 +45,7 @@ func (r *KubeadmControlPlaneReconciler) initializeControlPlane(ctx context.Conte
return ctrl.Result{Requeue: true}, nil
}

- func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, _ internal.FilterableMachineCollection, controlPlane *internal.ControlPlane) (ctrl.Result, error) {
+ func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, controlPlane *internal.ControlPlane) (ctrl.Result, error) {
logger := controlPlane.Logger()

if err := r.managementCluster.TargetClusterControlPlaneIsHealthy(ctx, util.ObjectKey(cluster), kcp.Name); err != nil {
@@ -78,8 +77,6 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(
ctx context.Context,
cluster *clusterv1.Cluster,
kcp *controlplanev1.KubeadmControlPlane,
- ownedMachines internal.FilterableMachineCollection,
- selectedMachines internal.FilterableMachineCollection,
controlPlane *internal.ControlPlane,
) (ctrl.Result, error) {
logger := controlPlane.Logger()
@@ -97,17 +94,11 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(
return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: deleteRequeueAfter}
}

- // If there is not already a Machine that is marked for scale down, find one and mark it
- markedForDeletion := selectedMachines.Filter(machinefilters.HasAnnotationKey(controlplanev1.DeleteForScaleDownAnnotation))
- if len(markedForDeletion) == 0 {
- machineToMark, err := r.selectAndMarkMachine(ctx, selectedMachines, controlplanev1.DeleteForScaleDownAnnotation, controlPlane)
- if err != nil {
- return ctrl.Result{}, errors.Wrap(err, "failed to select and mark machine for scale down")
- }
- markedForDeletion.Insert(machineToMark)
- }
+ machineToDelete, err := selectMachineForScaleDown(controlPlane)
+ if err != nil {
+ return ctrl.Result{}, errors.Wrap(err, "failed to select machine for scale down")
+ }

- machineToDelete := markedForDeletion.Oldest()
if machineToDelete == nil {
logger.Info("Failed to pick control plane Machine to delete")
return ctrl.Result{}, errors.New("failed to pick control plane Machine to delete")
@@ -121,7 +112,7 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(
return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter}
}
// If etcd leadership is on machine that is about to be deleted, move it to the newest member available.
- etcdLeaderCandidate := ownedMachines.Newest()
+ etcdLeaderCandidate := controlPlane.Machines.Newest()
if err := workloadCluster.ForwardEtcdLeadership(ctx, machineToDelete, etcdLeaderCandidate); err != nil {
logger.Error(err, "Failed to move leadership to candidate machine", "candidate", etcdLeaderCandidate.Name)
return ctrl.Result{}, err
@@ -131,21 +122,16 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(
return ctrl.Result{}, err
}

- if !machinefilters.HasAnnotationKey(controlplanev1.ScaleDownConfigMapEntryRemovedAnnotation)(machineToDelete) {
- if err := r.managementCluster.TargetClusterControlPlaneIsHealthy(ctx, util.ObjectKey(cluster), kcp.Name); err != nil {
- logger.V(2).Info("Waiting for control plane to pass control plane health check before removing a control plane machine", "cause", err)
- r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy",
- "Waiting for control plane to pass control plane health check before removing a control plane machine: %v", err)
- return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter}
-
- }
- if err := workloadCluster.RemoveMachineFromKubeadmConfigMap(ctx, machineToDelete); err != nil {
- logger.Error(err, "Failed to remove machine from kubeadm ConfigMap")
- return ctrl.Result{}, err
- }
- if err := r.markWithAnnotationKey(ctx, machineToDelete, controlplanev1.ScaleDownConfigMapEntryRemovedAnnotation); err != nil {
- return ctrl.Result{}, errors.Wrapf(err, "failed to mark machine %s as having config map entry removed", machineToDelete.Name)
- }
- }
+ if err := r.managementCluster.TargetClusterControlPlaneIsHealthy(ctx, util.ObjectKey(cluster), kcp.Name); err != nil {
+ logger.V(2).Info("Waiting for control plane to pass control plane health check before removing a control plane machine", "cause", err)
+ r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy",
+ "Waiting for control plane to pass control plane health check before removing a control plane machine: %v", err)
+ return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter}
+
+ }
+ if err := workloadCluster.RemoveMachineFromKubeadmConfigMap(ctx, machineToDelete); err != nil {
+ logger.Error(err, "Failed to remove machine from kubeadm ConfigMap")
+ return ctrl.Result{}, err
+ }

// Do a final health check of the Control Plane components prior to actually deleting the machine
@@ -166,3 +152,11 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(
// Requeue the control plane, in case there are additional operations to perform
return ctrl.Result{Requeue: true}, nil
}

+ func selectMachineForScaleDown(controlPlane *internal.ControlPlane) (*clusterv1.Machine, error) {
+ machines := controlPlane.Machines
+ if needingUpgrade := controlPlane.MachinesNeedingUpgrade(); needingUpgrade.Len() > 0 {
+ machines = needingUpgrade
+ }
+ return controlPlane.MachineInFailureDomainWithMostMachines(machines)
+ }
110 changes: 106 additions & 4 deletions controlplane/kubeadm/controllers/scale_test.go
@@ -20,15 +20,19 @@ import (
"context"
"fmt"
"testing"
"time"

. "github.com/onsi/gomega"

+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha3"
bootstrapv1 "sigs.k8s.io/cluster-api/bootstrap/kubeadm/api/v1alpha3"
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1alpha3"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/hash"
capierrors "sigs.k8s.io/cluster-api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -108,7 +112,7 @@ func TestKubeadmControlPlaneReconciler_scaleUpControlPlane(t *testing.T) {
Machines: fmc.Machines,
}

- result, err := r.scaleUpControlPlane(context.Background(), cluster, kcp, fmc.Machines.DeepCopy(), controlPlane)
+ result, err := r.scaleUpControlPlane(context.Background(), cluster, kcp, controlPlane)
g.Expect(result).To(Equal(ctrl.Result{Requeue: true}))
g.Expect(err).ToNot(HaveOccurred())

@@ -163,8 +167,7 @@ func TestKubeadmControlPlaneReconciler_scaleUpControlPlane(t *testing.T) {
Machines: beforeMachines,
}

- ownedMachines := fmc.Machines.DeepCopy()
- _, err := r.scaleUpControlPlane(context.Background(), cluster.DeepCopy(), kcp.DeepCopy(), ownedMachines, controlPlane)
+ _, err := r.scaleUpControlPlane(context.Background(), cluster.DeepCopy(), kcp.DeepCopy(), controlPlane)
g.Expect(err).To(HaveOccurred())
g.Expect(err).To(MatchError(&capierrors.RequeueAfterError{RequeueAfter: healthCheckFailedRequeueAfter}))

@@ -206,6 +209,105 @@ func TestKubeadmControlPlaneReconciler_scaleDownControlPlane_NoError(t *testing.
Machines: machines,
}

- _, err := r.scaleDownControlPlane(context.Background(), cluster, kcp, machines, machines, controlPlane)
+ _, err := r.scaleDownControlPlane(context.Background(), cluster, kcp, controlPlane)
g.Expect(err).ToNot(HaveOccurred())
}

+ func TestSelectMachineForScaleDown(t *testing.T) {
+ kcp := controlplanev1.KubeadmControlPlane{
+ Spec: controlplanev1.KubeadmControlPlaneSpec{},
+ }
+ startDate := time.Date(2000, 1, 1, 1, 0, 0, 0, time.UTC)
+ m1 := machine("machine-1", withFailureDomain("one"), withTimestamp(startDate.Add(time.Hour)), withValidHash(kcp.Spec))
+ m2 := machine("machine-2", withFailureDomain("one"), withTimestamp(startDate.Add(-3*time.Hour)), withValidHash(kcp.Spec))
+ m3 := machine("machine-3", withFailureDomain("one"), withTimestamp(startDate.Add(-4*time.Hour)), withValidHash(kcp.Spec))
+ m4 := machine("machine-4", withFailureDomain("two"), withTimestamp(startDate.Add(-time.Hour)), withValidHash(kcp.Spec))
+ m5 := machine("machine-5", withFailureDomain("two"), withTimestamp(startDate.Add(-2*time.Hour)), withHash("shrug"))
+
+ mc3 := internal.NewFilterableMachineCollection(m1, m2, m3, m4, m5)
+ fd := clusterv1.FailureDomains{
+ "one": failureDomain(true),
+ "two": failureDomain(true),
+ }
+
+ needsUpgradeControlPlane := &internal.ControlPlane{
+ KCP: &kcp,
+ Cluster: &clusterv1.Cluster{Status: clusterv1.ClusterStatus{FailureDomains: fd}},
+ Machines: mc3,
+ }
+ upToDateControlPlane := &internal.ControlPlane{
+ KCP: &kcp,
+ Cluster: &clusterv1.Cluster{Status: clusterv1.ClusterStatus{FailureDomains: fd}},
+ Machines: mc3.Filter(func(m *clusterv1.Machine) bool {
+ return m.Name != "machine-5"
+ }),
+ }
+
+ testCases := []struct {
+ name string
+ cp *internal.ControlPlane
+ expectErr bool
+ expectedMachine clusterv1.Machine
+ }{
+ {
name: "when there are are machines needing upgrade, it returns the oldest machine in the failure domain with the most machines needing upgrade",
+ cp: needsUpgradeControlPlane,
+ expectErr: false,
+ expectedMachine: clusterv1.Machine{ObjectMeta: metav1.ObjectMeta{Name: "machine-5"}},
+ },
+ {
+ name: "when there are no outdated machines, it returns the oldest machine in the largest failure domain",
+ cp: upToDateControlPlane,
+ expectErr: false,
+ expectedMachine: clusterv1.Machine{ObjectMeta: metav1.ObjectMeta{Name: "machine-3"}},
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ g := NewWithT(t)
+
+ g.Expect(clusterv1.AddToScheme(scheme.Scheme)).To(Succeed())
+
+ selectedMachine, err := selectMachineForScaleDown(tc.cp)
+
+ if tc.expectErr {
+ g.Expect(err).To(HaveOccurred())
+ return
+ }
+
+ g.Expect(err).NotTo(HaveOccurred())
+ g.Expect(tc.expectedMachine.Name).To(Equal(selectedMachine.Name))
+ })
+ }
+ }
+
+ func failureDomain(controlPlane bool) clusterv1.FailureDomainSpec {
+ return clusterv1.FailureDomainSpec{
+ ControlPlane: controlPlane,
+ }
+ }
+
+ func withFailureDomain(fd string) machineOpt {
+ return func(m *clusterv1.Machine) {
+ m.Spec.FailureDomain = &fd
+ }
+ }
+
+ func withTimestamp(t time.Time) machineOpt {
+ return func(m *clusterv1.Machine) {
+ m.CreationTimestamp = metav1.NewTime(t)
+ }
+ }
+
+ func withValidHash(kcp controlplanev1.KubeadmControlPlaneSpec) machineOpt {
+ return func(m *clusterv1.Machine) {
+ withHash(hash.Compute(&kcp))(m)
+ }
+ }
+
+ func withHash(hash string) machineOpt {
+ return func(m *clusterv1.Machine) {
+ m.SetLabels(map[string]string{controlplanev1.KubeadmControlPlaneHashLabelKey: hash})
+ }
+ }
(3 of the 9 changed files are not shown here.)
