Skip to content

Commit

Permalink
Merge pull request #2477 from chuckha/refactor-workload-cluster
Browse files Browse the repository at this point in the history
🏃 Refactor workload cluster out of cluster
  • Loading branch information
k8s-ci-robot authored Feb 28, 2020
2 parents d2953f9 + 4c4f45c commit 3d6efa3
Show file tree
Hide file tree
Showing 5 changed files with 452 additions and 644 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,9 @@ const (

type managementCluster interface {
GetMachinesForCluster(ctx context.Context, cluster types.NamespacedName, filters ...internal.MachineFilter) (internal.FilterableMachineCollection, error)
TargetClusterControlPlaneIsHealthy(ctx context.Context, clusterKey types.NamespacedName, controlPlaneName string) error
GetWorkloadCluster(ctx context.Context, cluster types.NamespacedName) (*internal.Cluster, error)
TargetClusterEtcdIsHealthy(ctx context.Context, clusterKey types.NamespacedName, controlPlaneName string) error
RemoveEtcdMemberForMachine(ctx context.Context, clusterKey types.NamespacedName, machine *clusterv1.Machine) error
RemoveMachineFromKubeadmConfigMap(ctx context.Context, clusterKey types.NamespacedName, machine *clusterv1.Machine) error
UpdateKubernetesVersionInKubeadmConfigMap(ctx context.Context, clusterKey types.NamespacedName, version string) error
UpdateKubeletConfigMap(ctx context.Context, clusterKey types.NamespacedName, version semver.Version) error
TargetClusterControlPlaneIsHealthy(ctx context.Context, clusterKey types.NamespacedName, controlPlaneName string) error
}

// +kubebuilder:rbac:groups=core,resources=events,verbs=get;list;watch;create;patch
Expand Down Expand Up @@ -348,8 +345,14 @@ func (r *KubeadmControlPlaneReconciler) upgradeControlPlane(ctx context.Context,

// TODO: handle reconciliation of etcd members and kubeadm config in case they get out of sync with cluster

workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster))
if err != nil {
logger.Error(err, "failed to get remote client for workload cluster", "cluster key", util.ObjectKey(cluster))
return ctrl.Result{}, err
}

// Reconcile the remote cluster's configuration necessary for upgrade
if err := r.reconcileConfiguration(ctx, kcp.Spec.Version, util.ObjectKey(cluster)); err != nil {
if err := r.reconcileConfiguration(ctx, kcp.Spec.Version, util.ObjectKey(cluster), workloadCluster); err != nil {
logger.Error(err, "failed reconcile remote cluster configuration for upgrade")
return ctrl.Result{}, err
}
Expand Down Expand Up @@ -384,7 +387,7 @@ func (r *KubeadmControlPlaneReconciler) upgradeControlPlane(ctx context.Context,
}

// reconcileConfiguration will update the remote cluster's system configurations in preparation for an upgrade.
func (r *KubeadmControlPlaneReconciler) reconcileConfiguration(ctx context.Context, version string, clusterKey client.ObjectKey) error {
func (r *KubeadmControlPlaneReconciler) reconcileConfiguration(ctx context.Context, version string, clusterKey client.ObjectKey, workloadCluster *internal.Cluster) error {
parsedVersion, err := semver.ParseTolerant(version)
if err != nil {
return errors.Wrapf(err, "failed to parse kubernetes version %q", version)
Expand All @@ -404,11 +407,11 @@ func (r *KubeadmControlPlaneReconciler) reconcileConfiguration(ctx context.Conte
return errors.Wrap(err, "failed to reconcile the remote kubelet RBAC binding")
}

if err := r.managementCluster.UpdateKubernetesVersionInKubeadmConfigMap(ctx, clusterKey, version); err != nil {
if err := workloadCluster.UpdateKubernetesVersionInKubeadmConfigMap(ctx, version); err != nil {
return errors.Wrap(err, "failed to update the kubernetes version in the kubeadm config map")
}

if err := r.managementCluster.UpdateKubeletConfigMap(ctx, clusterKey, parsedVersion); err != nil {
if err := workloadCluster.UpdateKubeletConfigMap(ctx, parsedVersion); err != nil {
return errors.Wrap(err, "failed to upgrade kubelet config map")
}

Expand Down Expand Up @@ -570,6 +573,13 @@ func (r *KubeadmControlPlaneReconciler) scaleUpControlPlane(ctx context.Context,

func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(ctx context.Context, cluster *clusterv1.Cluster, kcp *controlplanev1.KubeadmControlPlane, machines internal.FilterableMachineCollection) (ctrl.Result, error) {
logger := r.Log.WithValues("namespace", kcp.Namespace, "kubeadmControlPlane", kcp.Name, "cluster", cluster.Name)

workloadCluster, err := r.managementCluster.GetWorkloadCluster(ctx, util.ObjectKey(cluster))
if err != nil {
logger.Error(err, "failed to create client to workload cluster")
return ctrl.Result{}, errors.New("failed to create client to workload cluster")
}

// We don't want to health check at the beginning of this method to avoid blocking re-entrancy

// Wait for any delete in progress to complete before deleting another Machine
Expand Down Expand Up @@ -604,9 +614,8 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(ctx context.Contex
logger.Error(err, "waiting for control plane to pass etcd health check before adding removing a control plane machine")
r.recorder.Eventf(kcp, corev1.EventTypeWarning, "ControlPlaneUnhealthy", "Waiting for control plane to pass etcd health check before removing a control plane machine: %v", err)
return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: HealthCheckFailedRequeueAfter}

}
if err := r.managementCluster.RemoveEtcdMemberForMachine(ctx, util.ObjectKey(cluster), machineToDelete); err != nil {
if err := workloadCluster.RemoveEtcdMemberForMachine(ctx, machineToDelete); err != nil {
logger.Error(err, "failed to remove etcd member for machine")
return ctrl.Result{}, err
}
Expand All @@ -622,7 +631,7 @@ func (r *KubeadmControlPlaneReconciler) scaleDownControlPlane(ctx context.Contex
return ctrl.Result{}, &capierrors.RequeueAfterError{RequeueAfter: HealthCheckFailedRequeueAfter}

}
if err := r.managementCluster.RemoveMachineFromKubeadmConfigMap(ctx, util.ObjectKey(cluster), machineToDelete); err != nil {
if err := workloadCluster.RemoveMachineFromKubeadmConfigMap(ctx, machineToDelete); err != nil {
logger.Error(err, "failed to remove machine from kubeadm ConfigMap")
return ctrl.Result{}, err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"testing"
"time"

"github.com/blang/semver"
. "github.com/onsi/gomega"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -1226,6 +1225,10 @@ type fakeManagementCluster struct {
Machines internal.FilterableMachineCollection
}

// GetWorkloadCluster satisfies the managementCluster interface for tests.
// The fake never produces a workload cluster and never reports an error.
func (f *fakeManagementCluster) GetWorkloadCluster(_ context.Context, _ types.NamespacedName) (*internal.Cluster, error) {
	var cluster *internal.Cluster
	return cluster, nil
}

// GetMachinesForCluster returns the machine collection pre-configured on the
// fake, ignoring the cluster key and any supplied filters.
func (f *fakeManagementCluster) GetMachinesForCluster(_ context.Context, _ types.NamespacedName, _ ...internal.MachineFilter) (internal.FilterableMachineCollection, error) {
	machines := f.Machines
	return machines, nil
}
Expand All @@ -1244,22 +1247,6 @@ func (f *fakeManagementCluster) TargetClusterEtcdIsHealthy(_ context.Context, _
return nil
}

// RemoveEtcdMemberForMachine is a no-op on the fake; it always reports success.
func (f *fakeManagementCluster) RemoveEtcdMemberForMachine(_ context.Context, _ types.NamespacedName, _ *clusterv1.Machine) error {
return nil
}

// RemoveMachineFromKubeadmConfigMap is a no-op on the fake; it always reports success.
func (f *fakeManagementCluster) RemoveMachineFromKubeadmConfigMap(_ context.Context, _ types.NamespacedName, _ *clusterv1.Machine) error {
return nil
}

// UpdateKubernetesVersionInKubeadmConfigMap is a no-op on the fake; it always reports success.
func (f *fakeManagementCluster) UpdateKubernetesVersionInKubeadmConfigMap(_ context.Context, _ types.NamespacedName, _ string) error {
return nil
}

// UpdateKubeletConfigMap is a no-op on the fake; it always reports success.
func (f *fakeManagementCluster) UpdateKubeletConfigMap(_ context.Context, _ types.NamespacedName, _ semver.Version) error {
return nil
}

func TestKubeadmControlPlaneReconciler_scaleUpControlPlane(t *testing.T) {
t.Run("creates a control plane Machine if health checks pass", func(t *testing.T) {
g := NewWithT(t)
Expand Down Expand Up @@ -1357,234 +1344,34 @@ func TestKubeadmControlPlaneReconciler_scaleUpControlPlane(t *testing.T) {
})
}

func TestKubeadmControlPlaneReconciler_scaleDownControlPlane(t *testing.T) {
t.Run("deletes a control plane Machine if health checks pass", func(t *testing.T) {
g := NewWithT(t)
func TestKubeadmControlPlaneReconciler_scaleDownControlPlane_NoError(t *testing.T) {
g := NewWithT(t)

cluster, kcp, genericMachineTemplate := createClusterWithControlPlane()
initObjs := []runtime.Object{cluster.DeepCopy(), kcp.DeepCopy(), genericMachineTemplate.DeepCopy()}
machines := map[string]*clusterv1.Machine{
"one": machine("one"),
}

fmc := &fakeManagementCluster{
Machines: internal.NewFilterableMachineCollection(),
ControlPlaneHealthy: true,
r := &KubeadmControlPlaneReconciler{
Log: log.Log,
recorder: record.NewFakeRecorder(32),
Client: newFakeClient(g, machines["one"]),
managementCluster: &fakeManagementCluster{
EtcdHealthy: true,
}

for i := 0; i < 2; i++ {
m, _ := createMachineNodePair(fmt.Sprintf("test-%d", i), cluster, kcp, true)
fmc.Machines = fmc.Machines.Insert(m)
initObjs = append(initObjs, m.DeepCopy())
}

fakeClient := newFakeClient(g, initObjs...)

r := &KubeadmControlPlaneReconciler{
Client: fakeClient,
managementCluster: fmc,
Log: log.Log,
recorder: record.NewFakeRecorder(32),
}

fmc.ControlPlaneHealthy = true
fmc.EtcdHealthy = true
result, err := r.scaleDownControlPlane(context.Background(), &clusterv1.Cluster{}, &controlplanev1.KubeadmControlPlane{}, fmc.Machines.DeepCopy())
g.Expect(err).ToNot(HaveOccurred())
g.Expect(result).To(Equal(ctrl.Result{Requeue: true}))

controlPlaneMachines := clusterv1.MachineList{}
g.Expect(fakeClient.List(context.Background(), &controlPlaneMachines)).To(Succeed())
g.Expect(controlPlaneMachines.Items).To(HaveLen(1))
})
t.Run("does not delete a control plane Machine if health checks fail", func(t *testing.T) {
cluster, kcp, genericMachineTemplate := createClusterWithControlPlane()
initObjs := []runtime.Object{cluster.DeepCopy(), kcp.DeepCopy(), genericMachineTemplate.DeepCopy()}

beforeMachines := internal.NewFilterableMachineCollection()
for i := 0; i < 3; i++ {
m, _ := createMachineNodePair(fmt.Sprintf("test-%d", i), cluster.DeepCopy(), kcp.DeepCopy(), true)
beforeMachines = beforeMachines.Insert(m)
initObjs = append(initObjs, m.DeepCopy())
}

testCases := []struct {
name string
etcdUnHealthy bool
controlPlaneUnHealthy bool
selectedAnnotationKeys []string
}{
{
name: "etcd health check fails",
etcdUnHealthy: true,
selectedAnnotationKeys: []string{
controlplanev1.DeleteForScaleDownAnnotation,
},
},
{
name: "controlplane component health check fails",
controlPlaneUnHealthy: true,
selectedAnnotationKeys: []string{
controlplanev1.DeleteForScaleDownAnnotation,
controlplanev1.ScaleDownEtcdMemberRemovedAnnotation,
},
},
{
name: "both health check fails",
etcdUnHealthy: true,
controlPlaneUnHealthy: true,
selectedAnnotationKeys: []string{
controlplanev1.DeleteForScaleDownAnnotation,
},
},
}
for _, tc := range testCases {
g := NewWithT(t)

fakeClient := newFakeClient(g, initObjs...)
fmc := &fakeManagementCluster{
Machines: beforeMachines.DeepCopy(),
ControlPlaneHealthy: !tc.controlPlaneUnHealthy,
EtcdHealthy: !tc.etcdUnHealthy,
}

r := &KubeadmControlPlaneReconciler{
Client: fakeClient,
managementCluster: fmc,
Log: log.Log,
recorder: record.NewFakeRecorder(32),
}

ownedMachines := fmc.Machines.DeepCopy()
_, err := r.scaleDownControlPlane(context.Background(), cluster.DeepCopy(), kcp.DeepCopy(), ownedMachines)
g.Expect(err).To(HaveOccurred())
g.Expect(err).To(MatchError(&capierrors.RequeueAfterError{RequeueAfter: HealthCheckFailedRequeueAfter}))

controlPlaneMachines := &clusterv1.MachineList{}
g.Expect(fakeClient.List(context.Background(), controlPlaneMachines)).To(Succeed())
g.Expect(controlPlaneMachines.Items).To(HaveLen(len(beforeMachines)))

// We expect that a machine has been marked for deletion, since that isn't blocked by health checks
endMachines := internal.NewFilterableMachineCollectionFromMachineList(controlPlaneMachines)
selected := endMachines.Filter(internal.HasAnnotationKey(controlplanev1.DeleteForScaleDownAnnotation))
g.Expect(selected).To(HaveLen(1))
for _, m := range selected {
cm := m.DeepCopy()

g.Expect(m.Annotations).To(HaveLen(len(tc.selectedAnnotationKeys)))
for _, key := range tc.selectedAnnotationKeys {
g.Expect(m.Annotations).To(HaveKey(key))
}

// Remove the annotations and resource version to compare against the before copy
cm.Annotations = nil
cm.ResourceVersion = ""
bm, ok := beforeMachines[cm.Name]
g.Expect(ok).To(BeTrue())
g.Expect(cm).To(Equal(bm))
}
ControlPlaneHealthy: true,
},
}

// Ensure the non-selected machine match the before machines exactly
notSelected := endMachines.Filter(internal.Not(internal.HasAnnotationKey(controlplanev1.DeleteForScaleDownAnnotation)))
for _, m := range notSelected {
bm, ok := beforeMachines[m.Name]
g.Expect(ok).To(BeTrue())
g.Expect(m).To(Equal(bm))
}
}
})
_, err := r.scaleDownControlPlane(context.Background(), &clusterv1.Cluster{}, &controlplanev1.KubeadmControlPlane{}, machines)
g.Expect(err).ToNot(HaveOccurred())
}

func TestKubeadmControlPlaneReconciler_upgradeControlPlane(t *testing.T) {
t.Run("upgrades control plane Machines if health checks pass", func(t *testing.T) {
// TODO: add tests for positive condition
})
t.Run("does not upgrade a control plane Machine if health checks fail", func(t *testing.T) {
cluster, kcp, genericMachineTemplate := createClusterWithControlPlane()
kubeletConfigMap := &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "kubelet-config-1.16",
Namespace: metav1.NamespaceSystem,
},
}
initObjs := []runtime.Object{cluster.DeepCopy(), kcp.DeepCopy(), genericMachineTemplate.DeepCopy(), kubeletConfigMap.DeepCopy()}
beforeMachines := internal.NewFilterableMachineCollection()
for i := 0; i < 3; i++ {
m, _ := createMachineNodePair(fmt.Sprintf("test-%d", i), cluster.DeepCopy(), kcp.DeepCopy(), true)
beforeMachines = beforeMachines.Insert(m)
initObjs = append(initObjs, m.DeepCopy())
}

// mutate the kcp resource
kcp.Spec.Version = "v1.16.9"

testCases := []struct {
name string
etcdUnHealthy bool
controlPlaneUnHealthy bool
}{
{
name: "etcd health check fails",
etcdUnHealthy: true,
},
{
name: "controlplane component health check fails",
controlPlaneUnHealthy: true,
},
}
for _, tc := range testCases {
g := NewWithT(t)

fakeClient := newFakeClient(g, initObjs...)
fmc := &fakeManagementCluster{
Machines: beforeMachines.DeepCopy(),
ControlPlaneHealthy: !tc.controlPlaneUnHealthy,
EtcdHealthy: !tc.etcdUnHealthy,
}

r := &KubeadmControlPlaneReconciler{
Client: fakeClient,
managementCluster: fmc,
Log: log.Log,
recorder: record.NewFakeRecorder(32),
remoteClientGetter: fakeremote.NewClusterClient,
}

ownedMachines := fmc.Machines.DeepCopy()
requireUpgrade := fmc.Machines.Filter(internal.OwnedControlPlaneMachines(kcp.Name)).DeepCopy()
_, err := r.upgradeControlPlane(context.Background(), cluster.DeepCopy(), kcp.DeepCopy(), ownedMachines, requireUpgrade)
g.Expect(err).To(HaveOccurred())
g.Expect(err).To(MatchError(&capierrors.RequeueAfterError{RequeueAfter: HealthCheckFailedRequeueAfter}))

controlPlaneMachines := &clusterv1.MachineList{}
g.Expect(fakeClient.List(context.Background(), controlPlaneMachines)).To(Succeed())
g.Expect(controlPlaneMachines.Items).To(HaveLen(len(beforeMachines)))

// We expect that a machine has been selected for upgrade, since that isn't blocked by health checks
endMachines := internal.NewFilterableMachineCollectionFromMachineList(controlPlaneMachines)
selected := endMachines.Filter(internal.HasAnnotationKey(controlplanev1.SelectedForUpgradeAnnotation))
g.Expect(selected).To(HaveLen(1))
for _, m := range selected {
g.Expect(m.Annotations).To(HaveLen(1))
cm := m.DeepCopy()
cm.Annotations = nil
cm.ResourceVersion = ""
bm, ok := beforeMachines[cm.Name]
g.Expect(ok).To(BeTrue())
g.Expect(cm).To(Equal(bm))
}

// We expect that a replacement is not created, since that is blocked by health checks
replacementCreated := endMachines.Filter(internal.HasAnnotationKey(controlplanev1.UpgradeReplacementCreatedAnnotation))
g.Expect(replacementCreated).To(BeEmpty())

// Ensure the non-selected machine match the before machines exactly
notSelected := endMachines.Filter(internal.Not(internal.HasAnnotationKey(controlplanev1.SelectedForUpgradeAnnotation)))
for _, m := range notSelected {
bm, ok := beforeMachines[m.Name]
g.Expect(ok).To(BeTrue())
g.Expect(m).To(Equal(bm))
}
}
})
// machine builds a minimal Machine test fixture in the "default" namespace
// carrying the given name.
func machine(name string) *clusterv1.Machine {
	meta := metav1.ObjectMeta{
		Namespace: "default",
		Name:      name,
	}
	return &clusterv1.Machine{ObjectMeta: meta}
}

func TestKubeadmControlPlaneReconciler_failureDomainForScaleDown(t *testing.T) {
Expand Down
Loading

0 comments on commit 3d6efa3

Please sign in to comment.