Merge pull request #8922 from sbueringer/pr-cluster-topology-md-list

🌱 cluster/topology: use cached MD list in get current state

k8s-ci-robot authored Jun 27, 2023
2 parents aae328c + 6335950 commit c91e481
Showing 3 changed files with 117 additions and 8 deletions.
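
Before the per-file hunks, the shape of the change in one place: getCurrentMachineDeploymentState switches from the live APIReader to the cache-backed Client for listing MachineDeployments, and reconcile_state.go compensates for possible cache staleness by (1) double-checking against the API server before creating an MD and (2) waiting after each create/update until the cache reflects the write. A minimal sketch of that pattern, using a hypothetical ensureMachineDeployment helper and an illustrative Get-by-key existence check (this is not code from the PR):

package example

import (
	"context"
	"time"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/util/wait"
	clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// exampleReconciler mirrors the Client/APIReader pair used by the topology controller.
type exampleReconciler struct {
	Client    client.Client // cache-backed: cheap reads, possibly stale
	APIReader client.Reader // live: always hits the API server
}

// ensureMachineDeployment condenses the commit's pattern: do a live existence
// check before creating, then block until the cache has observed the write.
func (r *exampleReconciler) ensureMachineDeployment(ctx context.Context, desired *clusterv1.MachineDeployment) error {
	key := client.ObjectKeyFromObject(desired)

	// Live read: a cached read could miss an MD created in a recent reconcile.
	err := r.APIReader.Get(ctx, key, &clusterv1.MachineDeployment{})
	if err == nil {
		return nil // already exists; the cache was stale
	}
	if !apierrors.IsNotFound(err) {
		return err
	}

	if err := r.Client.Create(ctx, desired); err != nil {
		return err
	}

	// Block until the cache has observed our own write, so the next cached
	// List in current state cannot miss the new object.
	return wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, 5*time.Second, true,
		func(ctx context.Context) (bool, error) {
			if err := r.Client.Get(ctx, key, &clusterv1.MachineDeployment{}); err != nil {
				if apierrors.IsNotFound(err) {
					return false, nil
				}
				return false, err
			}
			return true, nil
		})
}

The real diff spreads these safeguards across reconcileMachineDeployments (a label-based live List) and createMachineDeployment (the poll loop), but the staleness reasoning is identical.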
5 changes: 4 additions & 1 deletion internal/controllers/topology/cluster/current_state.go
@@ -161,8 +161,11 @@ func (r *Reconciler) getCurrentMachineDeploymentState(ctx context.Context, bluep
 	state := make(scope.MachineDeploymentsStateMap)
 
 	// List all the machine deployments in the current cluster and in a managed topology.
+	// Note: This is a cached list call. We ensure in reconcile_state that the cache is up-to-date
+	// after we create/update a MachineDeployment and we double-check if an MD already exists before
+	// we create it.
 	md := &clusterv1.MachineDeploymentList{}
-	err := r.APIReader.List(ctx, md,
+	err := r.Client.List(ctx, md,
 		client.MatchingLabels{
 			clusterv1.ClusterNameLabel:          cluster.Name,
 			clusterv1.ClusterTopologyOwnedLabel: "",
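For orientation (not part of this diff): in controller-runtime, the two readers used above are typically wired from the manager, roughly as below; GetClient returns the cache-backed client, GetAPIReader a reader that bypasses the cache.

// Illustrative wiring of the cached and live readers from a controller-runtime manager.
r := &Reconciler{
	Client:    mgr.GetClient(),    // served from the manager's informer cache
	APIReader: mgr.GetAPIReader(), // reads go straight to the API server
}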
88 changes: 85 additions & 3 deletions internal/controllers/topology/cluster/reconcile_state.go
@@ -20,13 +20,16 @@ import (
 	"context"
 	"fmt"
 	"strings"
+	"time"
 
 	"github.com/pkg/errors"
 	corev1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/validation/field"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apiserver/pkg/storage/names"
 	"k8s.io/klog/v2"
 	ctrl "sigs.k8s.io/controller-runtime"
@@ -446,11 +449,29 @@ func (r *Reconciler) reconcileMachineDeployments(ctx context.Context, s *scope.S
 	diff := calculateMachineDeploymentDiff(s.Current.MachineDeployments, s.Desired.MachineDeployments)
 
 	// Create MachineDeployments.
-	for _, mdTopologyName := range diff.toCreate {
-		md := s.Desired.MachineDeployments[mdTopologyName]
-		if err := r.createMachineDeployment(ctx, s, md); err != nil {
+	if len(diff.toCreate) > 0 {
+		// In current state we only got the MD list via a cached call.
+		// As a consequence, in order to prevent the creation of duplicate MD due to stale reads,
+		// we are now using a live client to double-check here that the MachineDeployment
+		// to be created doesn't exist yet.
+		currentMDTopologyNames, err := r.getCurrentMachineDeployments(ctx, s)
+		if err != nil {
 			return err
 		}
+		for _, mdTopologyName := range diff.toCreate {
+			md := s.Desired.MachineDeployments[mdTopologyName]
+
+			// Skip the MD creation if the MD already exists.
+			if currentMDTopologyNames.Has(mdTopologyName) {
+				log := tlog.LoggerFrom(ctx).WithMachineDeployment(md.Object)
+				log.V(3).Infof(fmt.Sprintf("Skipping creation of MachineDeployment %s because MachineDeployment for topology %s already exists (only considered creation because of stale cache)", tlog.KObj{Obj: md.Object}, mdTopologyName))
+				continue
+			}
+
+			if err := r.createMachineDeployment(ctx, s, md); err != nil {
+				return err
+			}
+		}
 	}
 
 	// Update MachineDeployments.
@@ -472,6 +493,32 @@ func (r *Reconciler) reconcileMachineDeployments(ctx context.Context, s *scope.S
 	return nil
 }
 
+// getCurrentMachineDeployments gets the current list of MachineDeployments via the APIReader.
+func (r *Reconciler) getCurrentMachineDeployments(ctx context.Context, s *scope.Scope) (sets.Set[string], error) {
+	// TODO: We should consider using PartialObjectMetadataList here. Currently this doesn't work as our
+	// implementation for topology dryrun doesn't support PartialObjectMetadataList.
+	mdList := &clusterv1.MachineDeploymentList{}
+	err := r.APIReader.List(ctx, mdList,
+		client.MatchingLabels{
+			clusterv1.ClusterNameLabel:          s.Current.Cluster.Name,
+			clusterv1.ClusterTopologyOwnedLabel: "",
+		},
+		client.InNamespace(s.Current.Cluster.Namespace),
+	)
+	if err != nil {
+		return nil, errors.Wrap(err, "failed to read MachineDeployments for managed topology")
+	}
+
+	currentMDs := sets.Set[string]{}
+	for _, md := range mdList.Items {
+		mdTopologyName, ok := md.ObjectMeta.Labels[clusterv1.ClusterTopologyMachineDeploymentNameLabel]
+		if ok || mdTopologyName != "" {
+			currentMDs.Insert(mdTopologyName)
+		}
+	}
+	return currentMDs, nil
+}
+
 // createMachineDeployment creates a MachineDeployment and the corresponding Templates.
 func (r *Reconciler) createMachineDeployment(ctx context.Context, s *scope.Scope, md *scope.MachineDeploymentState) error {
 	// Do not create the MachineDeployment if it is marked as pending create.
@@ -517,6 +564,23 @@ func (r *Reconciler) createMachineDeployment(ctx context.Context, s *scope.Scope
 	}
 	r.recorder.Eventf(cluster, corev1.EventTypeNormal, createEventReason, "Created %q", tlog.KObj{Obj: md.Object})
 
+	// Wait until MachineDeployment is visible in the cache.
+	// Note: We have to do this because otherwise using a cached client in current state could
+	// miss a newly created MachineDeployment (because the cache might be stale).
+	err = wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
+		key := client.ObjectKey{Namespace: md.Object.Namespace, Name: md.Object.Name}
+		if err := r.Client.Get(ctx, key, &clusterv1.MachineDeployment{}); err != nil {
+			if apierrors.IsNotFound(err) {
+				return false, nil
+			}
+			return false, err
+		}
+		return true, nil
+	})
+	if err != nil {
+		return errors.Wrapf(err, "failed to create %s: failed waiting for MachineDeployment to be visible in cache", md.Object.Kind)
+	}
+
 	// If the MachineDeployment has defined a MachineHealthCheck reconcile it.
 	if md.MachineHealthCheck != nil {
 		if err := r.reconcileMachineHealthCheck(ctx, nil, md.MachineHealthCheck); err != nil {
@@ -588,6 +652,24 @@ func (r *Reconciler) updateMachineDeployment(ctx context.Context, s *scope.Scope
 	}
 	r.recorder.Eventf(cluster, corev1.EventTypeNormal, updateEventReason, "Updated %q%s", tlog.KObj{Obj: currentMD.Object}, logMachineDeploymentVersionChange(currentMD.Object, desiredMD.Object))
 
+	// Wait until MachineDeployment is updated in the cache.
+	// Note: We have to do this because otherwise using a cached client in current state could
+	// return a stale state of a MachineDeployment we just patched (because the cache might be stale).
+	// Note: It is good enough to check that the resource version changed. Other controllers might have updated the
+	// MachineDeployment as well, but the combination of the patch call above without a conflict and a changed resource
+	// version here guarantees that we see the changes of our own update.
+	err = wait.PollUntilContextTimeout(ctx, 5*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
+		key := client.ObjectKey{Namespace: currentMD.Object.GetNamespace(), Name: currentMD.Object.GetName()}
+		cachedMD := &clusterv1.MachineDeployment{}
+		if err := r.Client.Get(ctx, key, cachedMD); err != nil {
+			return false, err
+		}
+		return currentMD.Object.GetResourceVersion() != cachedMD.GetResourceVersion(), nil
+	})
+	if err != nil {
+		return errors.Wrapf(err, "failed to patch %s: failed waiting for MachineDeployment to be updated in cache", tlog.KObj{Obj: currentMD.Object})
+	}
+
 	// We want to call both cleanup functions even if one of them fails to clean up as much as possible.
 	return nil
 }
32 changes: 28 additions & 4 deletions internal/controllers/topology/cluster/reconcile_state_test.go
@@ -1778,6 +1778,7 @@ func TestReconcileMachineDeployments(t *testing.T) {
 	tests := []struct {
 		name                 string
 		current              []*scope.MachineDeploymentState
+		currentOnlyAPIServer []*scope.MachineDeploymentState
 		desired              []*scope.MachineDeploymentState
 		upgradeTracker       *scope.UpgradeTracker
 		want                 []*scope.MachineDeploymentState
@@ -1792,6 +1793,14 @@ func TestReconcileMachineDeployments(t *testing.T) {
 			want:    []*scope.MachineDeploymentState{md1},
 			wantErr: false,
 		},
+		{
+			name:                 "Should skip creating desired MachineDeployment if it already exists in the apiserver (even if it is not in current state)",
+			current:              nil,
+			currentOnlyAPIServer: []*scope.MachineDeploymentState{md1},
+			desired:              []*scope.MachineDeploymentState{md1},
+			want:                 []*scope.MachineDeploymentState{md1},
+			wantErr:              false,
+		},
 		{
 			name:    "Should not create desired MachineDeployment if the current does not exists yet and it marked as pending create",
 			current: nil,
@@ -1916,17 +1925,28 @@ func TestReconcileMachineDeployments(t *testing.T) {
 			}
 
 			currentMachineDeploymentStates := toMachineDeploymentTopologyStateMap(tt.current)
-			s := scope.New(builder.Cluster(metav1.NamespaceDefault, "cluster-1").Build())
+			s := scope.New(builder.Cluster(namespace.GetName(), "cluster-1").Build())
 			s.Current.MachineDeployments = currentMachineDeploymentStates
 
+			// currentOnlyAPIServer MDs only exist in the APIserver but are not part of s.Current.
+			// This simulates that getCurrentMachineDeploymentState in current_state.go read a stale MD list.
+			for _, s := range tt.currentOnlyAPIServer {
+				mdState := prepareMachineDeploymentState(s, namespace.GetName())
+
+				g.Expect(env.PatchAndWait(ctx, mdState.InfrastructureMachineTemplate, client.ForceOwnership, client.FieldOwner(structuredmerge.TopologyManagerName))).To(Succeed())
+				g.Expect(env.PatchAndWait(ctx, mdState.BootstrapTemplate, client.ForceOwnership, client.FieldOwner(structuredmerge.TopologyManagerName))).To(Succeed())
+				g.Expect(env.PatchAndWait(ctx, mdState.Object, client.ForceOwnership, client.FieldOwner(structuredmerge.TopologyManagerName))).To(Succeed())
+			}
+
 			s.Desired = &scope.ClusterState{MachineDeployments: toMachineDeploymentTopologyStateMap(tt.desired)}
 
 			if tt.upgradeTracker != nil {
 				s.UpgradeTracker = tt.upgradeTracker
 			}
 
 			r := Reconciler{
-				Client:             env,
+				Client:             env.GetClient(),
+				APIReader:          env.GetAPIReader(),
 				patchHelperFactory: serverSideApplyPatchHelperFactory(env, ssa.NewCache()),
 				recorder:           env.GetEventRecorderFor("test"),
 			}
@@ -2676,7 +2696,8 @@ func TestReconcileMachineDeploymentMachineHealthCheck(t *testing.T) {
 			s.Desired = &scope.ClusterState{MachineDeployments: toMachineDeploymentTopologyStateMap(tt.desired)}
 
 			r := Reconciler{
-				Client:             env,
+				Client:             env.GetClient(),
+				APIReader:          env.GetAPIReader(),
 				patchHelperFactory: serverSideApplyPatchHelperFactory(env, ssa.NewCache()),
 				recorder:           env.GetEventRecorderFor("test"),
 			}
@@ -2712,7 +2733,10 @@ func newFakeMachineDeploymentTopologyState(name string, infrastructureMachineTem
 		Object: builder.MachineDeployment(metav1.NamespaceDefault, name).
 			WithInfrastructureTemplate(infrastructureMachineTemplate).
 			WithBootstrapTemplate(bootstrapTemplate).
-			WithLabels(map[string]string{clusterv1.ClusterTopologyMachineDeploymentNameLabel: name + "-topology"}).
+			WithLabels(map[string]string{
+				clusterv1.ClusterTopologyMachineDeploymentNameLabel: name + "-topology",
+				clusterv1.ClusterTopologyOwnedLabel:                 "",
+			}).
 			WithClusterName("cluster-1").
 			WithReplicas(1).
 			WithDefaulter(true).
