Skip to content

Commit

Permalink
🌱 refactor failure domains logic out of controlplane internal package
Browse files Browse the repository at this point in the history
  • Loading branch information
Cecile Robert-Michon committed Feb 10, 2021
1 parent 51a6d64 commit 401a4ab
Show file tree
Hide file tree
Showing 23 changed files with 622 additions and 577 deletions.
16 changes: 8 additions & 8 deletions controlplane/kubeadm/controllers/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controllers
import (
"context"
"fmt"
"sigs.k8s.io/cluster-api/util/collections"
"time"

"github.com/blang/semver"
Expand All @@ -36,7 +37,6 @@ import (
"sigs.k8s.io/cluster-api/controllers/remote"
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1alpha4"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/machinefilters"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/annotations"
"sigs.k8s.io/cluster-api/util/conditions"
Expand Down Expand Up @@ -270,20 +270,20 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
return result, err
}

controlPlaneMachines, err := r.managementClusterUncached.GetMachinesForCluster(ctx, util.ObjectKey(cluster), machinefilters.ControlPlaneMachines(cluster.Name))
controlPlaneMachines, err := r.managementClusterUncached.GetMachinesForCluster(ctx, util.ObjectKey(cluster), collections.ControlPlaneMachines(cluster.Name))
if err != nil {
log.Error(err, "failed to retrieve control plane machines for cluster")
return ctrl.Result{}, err
}

adoptableMachines := controlPlaneMachines.Filter(machinefilters.AdoptableControlPlaneMachines(cluster.Name))
adoptableMachines := controlPlaneMachines.Filter(collections.AdoptableControlPlaneMachines(cluster.Name))
if len(adoptableMachines) > 0 {
// We adopt the Machines and then wait for the update event for the ownership reference to re-queue them so the cache is up-to-date
err = r.adoptMachines(ctx, kcp, adoptableMachines, cluster)
return ctrl.Result{}, err
}

ownedMachines := controlPlaneMachines.Filter(machinefilters.OwnedMachines(kcp))
ownedMachines := controlPlaneMachines.Filter(collections.OwnedMachines(kcp))
if len(ownedMachines) != len(controlPlaneMachines) {
log.Info("Not all control plane machines are owned by this KubeadmControlPlane, refusing to operate in mixed management mode")
return ctrl.Result{}, nil
Expand Down Expand Up @@ -353,7 +353,7 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
case numMachines > desiredReplicas:
log.Info("Scaling down control plane", "Desired", desiredReplicas, "Existing", numMachines)
// The last parameter (i.e. machines needing to be rolled out) should always be empty here.
return r.scaleDownControlPlane(ctx, cluster, kcp, controlPlane, internal.FilterableMachineCollection{})
return r.scaleDownControlPlane(ctx, cluster, kcp, controlPlane, collections.Machines{})
}

// Get the workload cluster client.
Expand Down Expand Up @@ -394,7 +394,7 @@ func (r *KubeadmControlPlaneReconciler) reconcileDelete(ctx context.Context, clu
if err != nil {
return ctrl.Result{}, err
}
ownedMachines := allMachines.Filter(machinefilters.OwnedMachines(kcp))
ownedMachines := allMachines.Filter(collections.OwnedMachines(kcp))

// If no control plane machines remain, remove the finalizer
if len(ownedMachines) == 0 {
Expand Down Expand Up @@ -428,7 +428,7 @@ func (r *KubeadmControlPlaneReconciler) reconcileDelete(ctx context.Context, clu
}

// Delete control plane machines in parallel
machinesToDelete := ownedMachines.Filter(machinefilters.Not(machinefilters.HasDeletionTimestamp))
machinesToDelete := ownedMachines.Filter(collections.Not(collections.HasDeletionTimestamp))
var errs []error
for i := range machinesToDelete {
m := machinesToDelete[i]
Expand Down Expand Up @@ -542,7 +542,7 @@ func (r *KubeadmControlPlaneReconciler) reconcileEtcdMembers(ctx context.Context
return ctrl.Result{}, nil
}

func (r *KubeadmControlPlaneReconciler) adoptMachines(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane, machines internal.FilterableMachineCollection, cluster *clusterv1.Cluster) error {
func (r *KubeadmControlPlaneReconciler) adoptMachines(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane, machines collections.Machines, cluster *clusterv1.Cluster) error {
// We do an uncached full quorum read against the KCP to avoid re-adopting Machines the garbage collector just intentionally orphaned
// See https://github.com/kubernetes/kubernetes/issues/42639
uncached := controlplanev1.KubeadmControlPlane{}
Expand Down
9 changes: 5 additions & 4 deletions controlplane/kubeadm/controllers/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package controllers
import (
"context"
"fmt"
"sigs.k8s.io/cluster-api/util/collections"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -360,7 +361,7 @@ func TestKubeadmControlPlaneReconciler_adoption(t *testing.T) {
kcp.Spec.Version = version

fmc := &fakeManagementCluster{
Machines: internal.FilterableMachineCollection{},
Machines: collections.Machines{},
Workload: fakeWorkloadCluster{},
}
objs := []client.Object{cluster.DeepCopy(), kcp.DeepCopy(), tmpl.DeepCopy()}
Expand Down Expand Up @@ -424,7 +425,7 @@ func TestKubeadmControlPlaneReconciler_adoption(t *testing.T) {
kcp.Spec.Version = version

fmc := &fakeManagementCluster{
Machines: internal.FilterableMachineCollection{},
Machines: collections.Machines{},
Workload: fakeWorkloadCluster{},
}
objs := []client.Object{cluster.DeepCopy(), kcp.DeepCopy(), tmpl.DeepCopy()}
Expand Down Expand Up @@ -534,7 +535,7 @@ func TestKubeadmControlPlaneReconciler_adoption(t *testing.T) {
kcp.DeletionTimestamp = &now

fmc := &fakeManagementCluster{
Machines: internal.FilterableMachineCollection{},
Machines: collections.Machines{},
Workload: fakeWorkloadCluster{},
}
objs := []client.Object{cluster.DeepCopy(), kcp.DeepCopy(), tmpl.DeepCopy()}
Expand Down Expand Up @@ -596,7 +597,7 @@ func TestKubeadmControlPlaneReconciler_adoption(t *testing.T) {
kcp.Spec.Version = "v1.17.0"

fmc := &fakeManagementCluster{
Machines: internal.FilterableMachineCollection{
Machines: collections.Machines{
"test0": &clusterv1.Machine{
ObjectMeta: metav1.ObjectMeta{
Namespace: cluster.Namespace,
Expand Down
7 changes: 3 additions & 4 deletions controlplane/kubeadm/controllers/fakes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,17 @@ package controllers

import (
"context"

"github.com/blang/semver"
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha4"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/machinefilters"
"sigs.k8s.io/cluster-api/util/collections"
"sigs.k8s.io/controller-runtime/pkg/client"
)

type fakeManagementCluster struct {
// TODO: once all client interactions are moved to the Management cluster this can go away
Management *internal.Management
Machines internal.FilterableMachineCollection
Machines collections.Machines
Workload fakeWorkloadCluster
Reader client.Reader
}
Expand All @@ -46,7 +45,7 @@ func (f *fakeManagementCluster) GetWorkloadCluster(_ context.Context, _ client.O
return f.Workload, nil
}

func (f *fakeManagementCluster) GetMachinesForCluster(c context.Context, n client.ObjectKey, filters ...machinefilters.Func) (internal.FilterableMachineCollection, error) {
func (f *fakeManagementCluster) GetMachinesForCluster(c context.Context, n client.ObjectKey, filters ...collections.Func) (collections.Machines, error) {
if f.Management != nil {
return f.Management.GetMachinesForCluster(c, n, filters...)
}
Expand Down
33 changes: 17 additions & 16 deletions controlplane/kubeadm/controllers/remediation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controllers

import (
"context"
"sigs.k8s.io/cluster-api/util/collections"
"testing"

. "github.com/onsi/gomega"
Expand Down Expand Up @@ -50,7 +51,7 @@ func TestReconcileUnhealthyMachines(t *testing.T) {
controlPlane := &internal.ControlPlane{
KCP: &controlplanev1.KubeadmControlPlane{},
Cluster: &clusterv1.Cluster{},
Machines: internal.NewFilterableMachineCollection(),
Machines: collections.New(),
}
ret, err := r.reconcileUnhealthyMachines(context.TODO(), controlPlane)

Expand All @@ -66,7 +67,7 @@ func TestReconcileUnhealthyMachines(t *testing.T) {
controlPlane := &internal.ControlPlane{
KCP: &controlplanev1.KubeadmControlPlane{},
Cluster: &clusterv1.Cluster{},
Machines: internal.NewFilterableMachineCollection(m),
Machines: collections.FromMachines(m),
}
ret, err := r.reconcileUnhealthyMachines(context.TODO(), controlPlane)

Expand All @@ -82,7 +83,7 @@ func TestReconcileUnhealthyMachines(t *testing.T) {
Replicas: utilpointer.Int32Ptr(1),
}},
Cluster: &clusterv1.Cluster{},
Machines: internal.NewFilterableMachineCollection(m),
Machines: collections.FromMachines(m),
}
ret, err := r.reconcileUnhealthyMachines(context.TODO(), controlPlane)

Expand All @@ -101,7 +102,7 @@ func TestReconcileUnhealthyMachines(t *testing.T) {
Replicas: utilpointer.Int32Ptr(3),
}},
Cluster: &clusterv1.Cluster{},
Machines: internal.NewFilterableMachineCollection(m),
Machines: collections.FromMachines(m),
}
ret, err := r.reconcileUnhealthyMachines(context.TODO(), controlPlane)

Expand All @@ -122,7 +123,7 @@ func TestReconcileUnhealthyMachines(t *testing.T) {
Replicas: utilpointer.Int32Ptr(3),
}},
Cluster: &clusterv1.Cluster{},
Machines: internal.NewFilterableMachineCollection(m1, m2, m3),
Machines: collections.FromMachines(m1, m2, m3),
}
ret, err := r.reconcileUnhealthyMachines(context.TODO(), controlPlane)

Expand All @@ -144,7 +145,7 @@ func TestReconcileUnhealthyMachines(t *testing.T) {
Replicas: utilpointer.Int32Ptr(3),
}},
Cluster: &clusterv1.Cluster{},
Machines: internal.NewFilterableMachineCollection(m1, m2, m3),
Machines: collections.FromMachines(m1, m2, m3),
}

r := &KubeadmControlPlaneReconciler{
Expand Down Expand Up @@ -179,7 +180,7 @@ func TestReconcileUnhealthyMachines(t *testing.T) {
Replicas: utilpointer.Int32Ptr(5),
}},
Cluster: &clusterv1.Cluster{},
Machines: internal.NewFilterableMachineCollection(m1, m2, m3, m4, m5),
Machines: collections.FromMachines(m1, m2, m3, m4, m5),
}

r := &KubeadmControlPlaneReconciler{
Expand Down Expand Up @@ -217,7 +218,7 @@ func TestReconcileUnhealthyMachines(t *testing.T) {
Replicas: utilpointer.Int32Ptr(3),
}},
Cluster: &clusterv1.Cluster{},
Machines: internal.NewFilterableMachineCollection(m1, m2, m3),
Machines: collections.FromMachines(m1, m2, m3),
}

r := &KubeadmControlPlaneReconciler{
Expand Down Expand Up @@ -269,7 +270,7 @@ func TestCanSafelyRemoveEtcdMember(t *testing.T) {
Replicas: utilpointer.Int32Ptr(1),
}},
Cluster: &clusterv1.Cluster{},
Machines: internal.NewFilterableMachineCollection(m1),
Machines: collections.FromMachines(m1),
}

r := &KubeadmControlPlaneReconciler{
Expand Down Expand Up @@ -299,7 +300,7 @@ func TestCanSafelyRemoveEtcdMember(t *testing.T) {
Replicas: utilpointer.Int32Ptr(2),
}},
Cluster: &clusterv1.Cluster{},
Machines: internal.NewFilterableMachineCollection(m1, m2),
Machines: collections.FromMachines(m1, m2),
}

r := &KubeadmControlPlaneReconciler{
Expand Down Expand Up @@ -330,7 +331,7 @@ func TestCanSafelyRemoveEtcdMember(t *testing.T) {
Replicas: utilpointer.Int32Ptr(5),
}},
Cluster: &clusterv1.Cluster{},
Machines: internal.NewFilterableMachineCollection(m1, m2, m3),
Machines: collections.FromMachines(m1, m2, m3),
}

r := &KubeadmControlPlaneReconciler{
Expand Down Expand Up @@ -361,7 +362,7 @@ func TestCanSafelyRemoveEtcdMember(t *testing.T) {
Replicas: utilpointer.Int32Ptr(5),
}},
Cluster: &clusterv1.Cluster{},
Machines: internal.NewFilterableMachineCollection(m1, m2, m3),
Machines: collections.FromMachines(m1, m2, m3),
}

r := &KubeadmControlPlaneReconciler{
Expand Down Expand Up @@ -394,7 +395,7 @@ func TestCanSafelyRemoveEtcdMember(t *testing.T) {
Replicas: utilpointer.Int32Ptr(5),
}},
Cluster: &clusterv1.Cluster{},
Machines: internal.NewFilterableMachineCollection(m1, m2, m3, m4, m5),
Machines: collections.FromMachines(m1, m2, m3, m4, m5),
}

r := &KubeadmControlPlaneReconciler{
Expand Down Expand Up @@ -427,7 +428,7 @@ func TestCanSafelyRemoveEtcdMember(t *testing.T) {
Replicas: utilpointer.Int32Ptr(5),
}},
Cluster: &clusterv1.Cluster{},
Machines: internal.NewFilterableMachineCollection(m1, m2, m3, m4, m5),
Machines: collections.FromMachines(m1, m2, m3, m4, m5),
}

r := &KubeadmControlPlaneReconciler{
Expand Down Expand Up @@ -462,7 +463,7 @@ func TestCanSafelyRemoveEtcdMember(t *testing.T) {
Replicas: utilpointer.Int32Ptr(5),
}},
Cluster: &clusterv1.Cluster{},
Machines: internal.NewFilterableMachineCollection(m1, m2, m3, m4, m5, m6, m7),
Machines: collections.FromMachines(m1, m2, m3, m4, m5, m6, m7),
}

r := &KubeadmControlPlaneReconciler{
Expand Down Expand Up @@ -497,7 +498,7 @@ func TestCanSafelyRemoveEtcdMember(t *testing.T) {
Replicas: utilpointer.Int32Ptr(5),
}},
Cluster: &clusterv1.Cluster{},
Machines: internal.NewFilterableMachineCollection(m1, m2, m3, m4, m5, m6, m7),
Machines: collections.FromMachines(m1, m2, m3, m4, m5, m6, m7),
}

r := &KubeadmControlPlaneReconciler{
Expand Down
10 changes: 5 additions & 5 deletions controlplane/kubeadm/controllers/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controllers

import (
"context"
"sigs.k8s.io/cluster-api/util/collections"
"strings"

"github.com/pkg/errors"
Expand All @@ -27,7 +28,6 @@ import (
clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha4"
controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1alpha4"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal/machinefilters"
"sigs.k8s.io/cluster-api/util"
"sigs.k8s.io/cluster-api/util/conditions"
ctrl "sigs.k8s.io/controller-runtime"
Expand All @@ -38,7 +38,7 @@ func (r *KubeadmControlPlaneReconciler) initializeControlPlane(ctx context.Conte

// Perform an uncached read of all the owned machines. This check is in place to make sure
// that the controller cache is not misbehaving and we end up initializing the cluster more than once.
ownedMachines, err := r.managementClusterUncached.GetMachinesForCluster(ctx, util.ObjectKey(cluster), machinefilters.OwnedMachines(kcp))
ownedMachines, err := r.managementClusterUncached.GetMachinesForCluster(ctx, util.ObjectKey(cluster), collections.OwnedMachines(kcp))
if err != nil {
logger.Error(err, "failed to perform an uncached read of control plane machines for cluster")
return ctrl.Result{}, err
Expand Down Expand Up @@ -88,7 +88,7 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(
cluster *clusterv1.Cluster,
kcp *controlplanev1.KubeadmControlPlane,
controlPlane *internal.ControlPlane,
outdatedMachines internal.FilterableMachineCollection,
outdatedMachines collections.Machines,
) (ctrl.Result, error) {
logger := controlPlane.Logger()

Expand Down Expand Up @@ -164,7 +164,7 @@ func (r *KubeadmControlPlaneReconciler) preflightChecks(_ context.Context, contr

// If there are deleting machines, wait for the operation to complete.
if controlPlane.HasDeletingMachine() {
logger.Info("Waiting for machines to be deleted", "Machines", strings.Join(controlPlane.Machines.Filter(machinefilters.HasDeletionTimestamp).Names(), ", "))
logger.Info("Waiting for machines to be deleted", "Machines", strings.Join(controlPlane.Machines.Filter(collections.HasDeletionTimestamp).Names(), ", "))
return ctrl.Result{RequeueAfter: deleteRequeueAfter}, nil
}

Expand Down Expand Up @@ -225,7 +225,7 @@ func preflightCheckCondition(kind string, obj conditions.Getter, condition clust
return nil
}

func selectMachineForScaleDown(controlPlane *internal.ControlPlane, outdatedMachines internal.FilterableMachineCollection) (*clusterv1.Machine, error) {
func selectMachineForScaleDown(controlPlane *internal.ControlPlane, outdatedMachines collections.Machines) (*clusterv1.Machine, error) {
machines := controlPlane.Machines
switch {
case controlPlane.MachineWithDeleteAnnotation(outdatedMachines).Len() > 0:
Expand Down
Loading

0 comments on commit 401a4ab

Please sign in to comment.