
Commit
apply comments
Cecile Robert-Michon committed Feb 10, 2021
1 parent 31fb44c commit c0696cc
Showing 21 changed files with 527 additions and 509 deletions.
15 changes: 7 additions & 8 deletions controlplane/kubeadm/controllers/controller.go
@@ -40,7 +40,6 @@ import (
 	"sigs.k8s.io/cluster-api/util"
 	"sigs.k8s.io/cluster-api/util/annotations"
 	"sigs.k8s.io/cluster-api/util/conditions"
-	"sigs.k8s.io/cluster-api/util/machinefilters"
 	"sigs.k8s.io/cluster-api/util/patch"
 	"sigs.k8s.io/cluster-api/util/predicates"
 	"sigs.k8s.io/cluster-api/util/secret"
@@ -271,20 +270,20 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
 		return result, err
 	}
 
-	controlPlaneMachines, err := r.managementClusterUncached.GetMachinesForCluster(ctx, util.ObjectKey(cluster), machinefilters.ControlPlaneMachines(cluster.Name))
+	controlPlaneMachines, err := r.managementClusterUncached.GetMachinesForCluster(ctx, util.ObjectKey(cluster), collections.ControlPlaneMachines(cluster.Name))
 	if err != nil {
 		log.Error(err, "failed to retrieve control plane machines for cluster")
 		return ctrl.Result{}, err
 	}
 
-	adoptableMachines := controlPlaneMachines.Filter(machinefilters.AdoptableControlPlaneMachines(cluster.Name))
+	adoptableMachines := controlPlaneMachines.Filter(collections.AdoptableControlPlaneMachines(cluster.Name))
 	if len(adoptableMachines) > 0 {
 		// We adopt the Machines and then wait for the update event for the ownership reference to re-queue them so the cache is up-to-date
 		err = r.adoptMachines(ctx, kcp, adoptableMachines, cluster)
 		return ctrl.Result{}, err
 	}
 
-	ownedMachines := controlPlaneMachines.Filter(machinefilters.OwnedMachines(kcp))
+	ownedMachines := controlPlaneMachines.Filter(collections.OwnedMachines(kcp))
 	if len(ownedMachines) != len(controlPlaneMachines) {
 		log.Info("Not all control plane machines are owned by this KubeadmControlPlane, refusing to operate in mixed management mode")
 		return ctrl.Result{}, nil
@@ -354,7 +353,7 @@ func (r *KubeadmControlPlaneReconciler) reconcile(ctx context.Context, cluster *
 	case numMachines > desiredReplicas:
 		log.Info("Scaling down control plane", "Desired", desiredReplicas, "Existing", numMachines)
 		// The last parameter (i.e. machines needing to be rolled out) should always be empty here.
-		return r.scaleDownControlPlane(ctx, cluster, kcp, controlPlane, collections.FilterableMachines{})
+		return r.scaleDownControlPlane(ctx, cluster, kcp, controlPlane, collections.Machines{})
 	}
 
 	// Get the workload cluster client.
@@ -395,7 +394,7 @@ func (r *KubeadmControlPlaneReconciler) reconcileDelete(ctx context.Context, clu
 	if err != nil {
 		return ctrl.Result{}, err
 	}
-	ownedMachines := allMachines.Filter(machinefilters.OwnedMachines(kcp))
+	ownedMachines := allMachines.Filter(collections.OwnedMachines(kcp))
 
 	// If no control plane machines remain, remove the finalizer
 	if len(ownedMachines) == 0 {
@@ -429,7 +428,7 @@ func (r *KubeadmControlPlaneReconciler) reconcileDelete(ctx context.Context, clu
 	}
 
 	// Delete control plane machines in parallel
-	machinesToDelete := ownedMachines.Filter(machinefilters.Not(machinefilters.HasDeletionTimestamp))
+	machinesToDelete := ownedMachines.Filter(collections.Not(collections.HasDeletionTimestamp))
 	var errs []error
 	for i := range machinesToDelete {
 		m := machinesToDelete[i]
@@ -543,7 +542,7 @@ func (r *KubeadmControlPlaneReconciler) reconcileEtcdMembers(ctx context.Context
 	return ctrl.Result{}, nil
 }
 
-func (r *KubeadmControlPlaneReconciler) adoptMachines(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane, machines collections.FilterableMachines, cluster *clusterv1.Cluster) error {
+func (r *KubeadmControlPlaneReconciler) adoptMachines(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane, machines collections.Machines, cluster *clusterv1.Cluster) error {
 	// We do an uncached full quorum read against the KCP to avoid re-adopting Machines the garbage collector just intentionally orphaned
 	// See https://github.com/kubernetes/kubernetes/issues/42639
 	uncached := controlplanev1.KubeadmControlPlane{}
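Taken together, the controller.go hunks swap every machinefilters call for its collections counterpart while keeping the flow identical: fetch the cluster's machines, then narrow the set with composable predicates. A minimal sketch of the post-rename API, assuming only the signatures visible in this diff (partitionControlPlaneMachines and its mgmt parameter are hypothetical names, not part of the commit):

package example

import (
	"context"

	clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha4"
	controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1alpha4"
	"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
	"sigs.k8s.io/cluster-api/util"
	"sigs.k8s.io/cluster-api/util/collections"
)

// partitionControlPlaneMachines fetches a cluster's control plane machines and
// splits them into the sets the reconciler cares about. Predicates now live in
// util/collections, and the set type is collections.Machines (formerly
// machinefilters.Func and collections.FilterableMachines).
func partitionControlPlaneMachines(
	ctx context.Context,
	mgmt internal.ManagementCluster,
	cluster *clusterv1.Cluster,
	kcp *controlplanev1.KubeadmControlPlane,
) (owned, adoptable collections.Machines, err error) {
	// Keep only machines labeled as control plane members of this cluster.
	all, err := mgmt.GetMachinesForCluster(ctx, util.ObjectKey(cluster),
		collections.ControlPlaneMachines(cluster.Name))
	if err != nil {
		return nil, nil, err
	}
	// Narrow the set further; Filter takes the same predicate type.
	owned = all.Filter(collections.OwnedMachines(kcp))
	adoptable = all.Filter(collections.AdoptableControlPlaneMachines(cluster.Name))
	return owned, adoptable, nil
}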
8 changes: 4 additions & 4 deletions controlplane/kubeadm/controllers/controller_test.go
@@ -361,7 +361,7 @@ func TestKubeadmControlPlaneReconciler_adoption(t *testing.T) {
 		kcp.Spec.Version = version
 
 		fmc := &fakeManagementCluster{
-			Machines: collections.FilterableMachines{},
+			Machines: collections.Machines{},
 			Workload: fakeWorkloadCluster{},
 		}
 		objs := []client.Object{cluster.DeepCopy(), kcp.DeepCopy(), tmpl.DeepCopy()}
@@ -425,7 +425,7 @@ func TestKubeadmControlPlaneReconciler_adoption(t *testing.T) {
 		kcp.Spec.Version = version
 
 		fmc := &fakeManagementCluster{
-			Machines: collections.FilterableMachines{},
+			Machines: collections.Machines{},
 			Workload: fakeWorkloadCluster{},
 		}
 		objs := []client.Object{cluster.DeepCopy(), kcp.DeepCopy(), tmpl.DeepCopy()}
@@ -535,7 +535,7 @@ func TestKubeadmControlPlaneReconciler_adoption(t *testing.T) {
 		kcp.DeletionTimestamp = &now
 
 		fmc := &fakeManagementCluster{
-			Machines: collections.FilterableMachines{},
+			Machines: collections.Machines{},
 			Workload: fakeWorkloadCluster{},
 		}
 		objs := []client.Object{cluster.DeepCopy(), kcp.DeepCopy(), tmpl.DeepCopy()}
@@ -597,7 +597,7 @@ func TestKubeadmControlPlaneReconciler_adoption(t *testing.T) {
 		kcp.Spec.Version = "v1.17.0"
 
 		fmc := &fakeManagementCluster{
-			Machines: collections.FilterableMachines{
+			Machines: collections.Machines{
 				"test0": &clusterv1.Machine{
 					ObjectMeta: metav1.ObjectMeta{
 						Namespace: cluster.Namespace,
5 changes: 2 additions & 3 deletions controlplane/kubeadm/controllers/fakes_test.go
@@ -22,14 +22,13 @@ import (
 	clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha4"
 	"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
 	"sigs.k8s.io/cluster-api/util/collections"
-	"sigs.k8s.io/cluster-api/util/machinefilters"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
 
 type fakeManagementCluster struct {
 	// TODO: once all client interactions are moved to the Management cluster this can go away
 	Management *internal.Management
-	Machines   collections.FilterableMachines
+	Machines   collections.Machines
 	Workload   fakeWorkloadCluster
 	Reader     client.Reader
 }
@@ -46,7 +45,7 @@ func (f *fakeManagementCluster) GetWorkloadCluster(_ context.Context, _ client.O
 	return f.Workload, nil
 }
 
-func (f *fakeManagementCluster) GetMachinesForCluster(c context.Context, n client.ObjectKey, filters ...machinefilters.Func) (collections.FilterableMachines, error) {
+func (f *fakeManagementCluster) GetMachinesForCluster(c context.Context, n client.ObjectKey, filters ...collections.Func) (collections.Machines, error) {
 	if f.Management != nil {
 		return f.Management.GetMachinesForCluster(c, n, filters...)
 	}
2 changes: 1 addition & 1 deletion controlplane/kubeadm/controllers/remediation_test.go
@@ -51,7 +51,7 @@ func TestReconcileUnhealthyMachines(t *testing.T) {
 	controlPlane := &internal.ControlPlane{
 		KCP:      &controlplanev1.KubeadmControlPlane{},
 		Cluster:  &clusterv1.Cluster{},
-		Machines: collections.FromMachines(),
+		Machines: collections.New(),
 	}
 	ret, err := r.reconcileUnhealthyMachines(context.TODO(), controlPlane)
 
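The constructor rename is equally mechanical: the old zero-argument collections.FromMachines() becomes collections.New(). A small sketch under that assumption, mirroring how the tests in this commit seed their fixtures with Insert (the machine name and namespace below are made up for illustration):

package example

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha4"
	"sigs.k8s.io/cluster-api/util/collections"
)

// emptyThenInsert builds an empty machine collection with the renamed
// constructor and adds one member.
func emptyThenInsert() collections.Machines {
	machines := collections.New() // was: collections.FromMachines()
	machines.Insert(&clusterv1.Machine{
		ObjectMeta: metav1.ObjectMeta{Name: "machine-0", Namespace: "default"}, // hypothetical fixture
	})
	return machines
}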
9 changes: 4 additions & 5 deletions controlplane/kubeadm/controllers/scale.go
@@ -30,7 +30,6 @@ import (
 	"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
 	"sigs.k8s.io/cluster-api/util"
 	"sigs.k8s.io/cluster-api/util/conditions"
-	"sigs.k8s.io/cluster-api/util/machinefilters"
 	ctrl "sigs.k8s.io/controller-runtime"
 )

@@ -39,7 +38,7 @@ func (r *KubeadmControlPlaneReconciler) initializeControlPlane(ctx context.Conte
 
 	// Perform an uncached read of all the owned machines. This check is in place to make sure
 	// that the controller cache is not misbehaving and we end up initializing the cluster more than once.
-	ownedMachines, err := r.managementClusterUncached.GetMachinesForCluster(ctx, util.ObjectKey(cluster), machinefilters.OwnedMachines(kcp))
+	ownedMachines, err := r.managementClusterUncached.GetMachinesForCluster(ctx, util.ObjectKey(cluster), collections.OwnedMachines(kcp))
 	if err != nil {
 		logger.Error(err, "failed to perform an uncached read of control plane machines for cluster")
 		return ctrl.Result{}, err
@@ -89,7 +88,7 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(
 	cluster *clusterv1.Cluster,
 	kcp *controlplanev1.KubeadmControlPlane,
 	controlPlane *internal.ControlPlane,
-	outdatedMachines collections.FilterableMachines,
+	outdatedMachines collections.Machines,
 ) (ctrl.Result, error) {
 	logger := controlPlane.Logger()
 
@@ -165,7 +164,7 @@ func (r *KubeadmControlPlaneReconciler) preflightChecks(_ context.Context, contr
 
 	// If there are deleting machines, wait for the operation to complete.
 	if controlPlane.HasDeletingMachine() {
-		logger.Info("Waiting for machines to be deleted", "Machines", strings.Join(controlPlane.Machines.Filter(machinefilters.HasDeletionTimestamp).Names(), ", "))
+		logger.Info("Waiting for machines to be deleted", "Machines", strings.Join(controlPlane.Machines.Filter(collections.HasDeletionTimestamp).Names(), ", "))
 		return ctrl.Result{RequeueAfter: deleteRequeueAfter}, nil
 	}
 
@@ -226,7 +225,7 @@ func preflightCheckCondition(kind string, obj conditions.Getter, condition clust
 	return nil
 }
 
-func selectMachineForScaleDown(controlPlane *internal.ControlPlane, outdatedMachines collections.FilterableMachines) (*clusterv1.Machine, error) {
+func selectMachineForScaleDown(controlPlane *internal.ControlPlane, outdatedMachines collections.Machines) (*clusterv1.Machine, error) {
 	machines := controlPlane.Machines
 	switch {
 	case controlPlane.MachineWithDeleteAnnotation(outdatedMachines).Len() > 0:
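The reconcileDelete and preflightChecks hunks above use the two halves of the same predicate: HasDeletionTimestamp selects machines already being deleted, and collections.Not inverts any collections.Func. A sketch under those assumptions (splitDeleting is a hypothetical helper, not part of the commit):

package example

import (
	"sigs.k8s.io/cluster-api/util/collections"
)

// splitDeleting partitions a set into machines still pending deletion and
// machines whose deletion is already in progress, using the composable
// predicates referenced in the diff above.
func splitDeleting(machines collections.Machines) (toDelete, deleting collections.Machines) {
	toDelete = machines.Filter(collections.Not(collections.HasDeletionTimestamp))
	deleting = machines.Filter(collections.HasDeletionTimestamp)
	return toDelete, deleting
}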
10 changes: 5 additions & 5 deletions controlplane/kubeadm/controllers/scale_test.go
@@ -88,7 +88,7 @@ func TestKubeadmControlPlaneReconciler_scaleUpControlPlane(t *testing.T) {
 	initObjs := []client.Object{cluster.DeepCopy(), kcp.DeepCopy(), genericMachineTemplate.DeepCopy()}
 
 	fmc := &fakeManagementCluster{
-		Machines: collections.FromMachines(),
+		Machines: collections.New(),
 		Workload: fakeWorkloadCluster{},
 	}
 
@@ -127,7 +127,7 @@ func TestKubeadmControlPlaneReconciler_scaleUpControlPlane(t *testing.T) {
 	cluster.Spec.ControlPlaneEndpoint.Host = "nodomain.example.com"
 	cluster.Spec.ControlPlaneEndpoint.Port = 6443
 
-	beforeMachines := collections.FromMachines()
+	beforeMachines := collections.New()
 	for i := 0; i < 2; i++ {
 		m, _ := createMachineNodePair(fmt.Sprintf("test-%d", i), cluster.DeepCopy(), kcp.DeepCopy(), true)
 		beforeMachines.Insert(m)
@@ -319,7 +319,7 @@ func TestSelectMachineForScaleDown(t *testing.T) {
 	testCases := []struct {
 		name             string
 		cp               *internal.ControlPlane
-		outDatedMachines collections.FilterableMachines
+		outDatedMachines collections.Machines
 		expectErr        bool
 		expectedMachine  clusterv1.Machine
 	}{
@@ -333,7 +333,7 @@ func TestSelectMachineForScaleDown(t *testing.T) {
 		{
 			name:             "when there are no outdated machines, it returns the oldest machine in the largest failure domain",
 			cp:               upToDateControlPlane,
-			outDatedMachines: collections.FromMachines(),
+			outDatedMachines: collections.New(),
 			expectErr:        false,
 			expectedMachine:  clusterv1.Machine{ObjectMeta: metav1.ObjectMeta{Name: "machine-3"}},
 		},
@@ -354,7 +354,7 @@ func TestSelectMachineForScaleDown(t *testing.T) {
 		{
 			name:             "when there are annotated machines which are part of the annotatedControlPlane but not in outdatedMachines, it returns the oldest marked machine first",
 			cp:               annotatedControlPlane,
-			outDatedMachines: collections.FromMachines(),
+			outDatedMachines: collections.New(),
 			expectErr:        false,
 			expectedMachine:  clusterv1.Machine{ObjectMeta: metav1.ObjectMeta{Name: "machine-8"}},
 		},
8 changes: 4 additions & 4 deletions controlplane/kubeadm/controllers/status.go
@@ -18,14 +18,14 @@ package controllers
 
 import (
 	"context"
+	"sigs.k8s.io/cluster-api/util/collections"
 
 	"github.com/pkg/errors"
 	clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha4"
 	controlplanev1 "sigs.k8s.io/cluster-api/controlplane/kubeadm/api/v1alpha4"
 	"sigs.k8s.io/cluster-api/controlplane/kubeadm/internal"
 	"sigs.k8s.io/cluster-api/util"
 	"sigs.k8s.io/cluster-api/util/conditions"
-	"sigs.k8s.io/cluster-api/util/machinefilters"
 	ctrl "sigs.k8s.io/controller-runtime"
 )

@@ -34,12 +34,12 @@ import (
 func (r *KubeadmControlPlaneReconciler) updateStatus(ctx context.Context, kcp *controlplanev1.KubeadmControlPlane, cluster *clusterv1.Cluster) error {
 	log := ctrl.LoggerFrom(ctx, "cluster", cluster.Name)
 
-	selector := machinefilters.ControlPlaneSelectorForCluster(cluster.Name)
+	selector := collections.ControlPlaneSelectorForCluster(cluster.Name)
 	// Copy label selector to its status counterpart in string format.
 	// This is necessary for CRDs including scale subresources.
 	kcp.Status.Selector = selector.String()
 
-	ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, util.ObjectKey(cluster), machinefilters.OwnedMachines(kcp))
+	ownedMachines, err := r.managementCluster.GetMachinesForCluster(ctx, util.ObjectKey(cluster), collections.OwnedMachines(kcp))
 	if err != nil {
 		return errors.Wrap(err, "failed to get list of owned machines")
 	}
@@ -79,7 +79,7 @@ func (r *KubeadmControlPlaneReconciler) updateStatus(ctx context.Context, kcp *c
 	// make sure last resize operation is marked as completed.
 	// NOTE: we are checking the number of machines ready so we report resize completed only when the machines
 	// are actually provisioned (vs reporting completed immediately after the last machine object is created).
-	readyMachines := ownedMachines.Filter(machinefilters.IsReady())
+	readyMachines := ownedMachines.Filter(collections.IsReady())
 	if int32(len(readyMachines)) == replicas {
 		conditions.MarkTrue(kcp, controlplanev1.ResizedCondition)
 	}
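updateStatus leans on two collections helpers: ControlPlaneSelectorForCluster produces the label selector copied into status for the scale subresource, and IsReady() is a predicate factory, so the ready-replica count is simply the size of the filtered set. A minimal sketch assuming those two signatures (readyReplicaStatus is a hypothetical helper):

package example

import (
	"sigs.k8s.io/cluster-api/util/collections"
)

// readyReplicaStatus returns the selector string for the scale subresource
// and the number of owned machines whose node is ready.
func readyReplicaStatus(clusterName string, owned collections.Machines) (selector string, ready int32) {
	selector = collections.ControlPlaneSelectorForCluster(clusterName).String()
	ready = int32(len(owned.Filter(collections.IsReady())))
	return selector, ready
}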
2 changes: 1 addition & 1 deletion controlplane/kubeadm/controllers/upgrade.go
@@ -33,7 +33,7 @@ func (r *KubeadmControlPlaneReconciler) upgradeControlPlane(
 	cluster *clusterv1.Cluster,
 	kcp *controlplanev1.KubeadmControlPlane,
 	controlPlane *internal.ControlPlane,
-	machinesRequireUpgrade collections.FilterableMachines,
+	machinesRequireUpgrade collections.Machines,
 ) (ctrl.Result, error) {
 	logger := controlPlane.Logger()
 
5 changes: 2 additions & 3 deletions controlplane/kubeadm/internal/cluster.go
@@ -28,7 +28,6 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha4"
 	"sigs.k8s.io/cluster-api/controllers/remote"
-	"sigs.k8s.io/cluster-api/util/machinefilters"
 	"sigs.k8s.io/cluster-api/util/secret"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
@@ -43,7 +42,7 @@ const (
 type ManagementCluster interface {
 	ctrlclient.Reader
 
-	GetMachinesForCluster(ctx context.Context, cluster client.ObjectKey, filters ...machinefilters.Func) (collections.FilterableMachines, error)
+	GetMachinesForCluster(ctx context.Context, cluster client.ObjectKey, filters ...collections.Func) (collections.Machines, error)
 	GetWorkloadCluster(ctx context.Context, clusterKey client.ObjectKey) (WorkloadCluster, error)
 }
 
@@ -74,7 +73,7 @@ func (m *Management) List(ctx context.Context, list client.ObjectList, opts ...c
 
 // GetMachinesForCluster returns a list of machines that can be filtered or not.
 // If no filter is supplied then all machines associated with the target cluster are returned.
-func (m *Management) GetMachinesForCluster(ctx context.Context, cluster client.ObjectKey, filters ...machinefilters.Func) (collections.FilterableMachines, error) {
+func (m *Management) GetMachinesForCluster(ctx context.Context, cluster client.ObjectKey, filters ...collections.Func) (collections.Machines, error) {
 	selector := map[string]string{
 		clusterv1.ClusterLabelName: cluster.Name,
 	}
6 changes: 3 additions & 3 deletions controlplane/kubeadm/internal/cluster_test.go
@@ -24,6 +24,7 @@ import (
 	"crypto/x509/pkix"
 	"fmt"
 	"math/big"
+	"sigs.k8s.io/cluster-api/util/collections"
 	"testing"
 	"time"
 
@@ -40,7 +41,6 @@ import (
 	"sigs.k8s.io/cluster-api/controllers/remote"
 	"sigs.k8s.io/cluster-api/util/certs"
 	"sigs.k8s.io/cluster-api/util/kubeconfig"
-	"sigs.k8s.io/cluster-api/util/machinefilters"
 	"sigs.k8s.io/cluster-api/util/secret"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 )
@@ -60,15 +60,15 @@ func TestGetMachinesForCluster(t *testing.T) {
 	g.Expect(machines).To(HaveLen(3))
 
 	// Test the ControlPlaneMachines works
-	machines, err = m.GetMachinesForCluster(ctx, clusterKey, machinefilters.ControlPlaneMachines("my-cluster"))
+	machines, err = m.GetMachinesForCluster(ctx, clusterKey, collections.ControlPlaneMachines("my-cluster"))
 	g.Expect(err).NotTo(HaveOccurred())
 	g.Expect(machines).To(HaveLen(1))
 
 	// Test that the filters use AND logic instead of OR logic
 	nameFilter := func(cluster *clusterv1.Machine) bool {
 		return cluster.Name == "first-machine"
 	}
-	machines, err = m.GetMachinesForCluster(ctx, clusterKey, machinefilters.ControlPlaneMachines("my-cluster"), nameFilter)
+	machines, err = m.GetMachinesForCluster(ctx, clusterKey, collections.ControlPlaneMachines("my-cluster"), nameFilter)
 	g.Expect(err).NotTo(HaveOccurred())
 	g.Expect(machines).To(HaveLen(1))
 }
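The test above pins down a useful property of the renamed API: multiple filters passed to GetMachinesForCluster combine with AND semantics, and any plain func(*clusterv1.Machine) bool satisfies collections.Func. A sketch of the same composition (firstControlPlaneMachine is a hypothetical helper, and it assumes Machines.Filter accepts multiple predicates the way GetMachinesForCluster does):

package example

import (
	clusterv1 "sigs.k8s.io/cluster-api/api/v1alpha4"
	"sigs.k8s.io/cluster-api/util/collections"
)

// firstControlPlaneMachine combines a built-in predicate with an ad-hoc one;
// a machine must satisfy both to survive the filter.
func firstControlPlaneMachine(machines collections.Machines, clusterName string) collections.Machines {
	byName := func(m *clusterv1.Machine) bool { return m.Name == "first-machine" }
	return machines.Filter(
		collections.ControlPlaneMachines(clusterName), // label-based predicate
		byName, // custom predicate; AND-ed with the one above
	)
}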
(Diffs for the remaining 11 changed files are not shown.)
