Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ feat: accept resource mutators in Move operation #7966

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/v1beta1/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ const (
// ClusterFinalizer is the finalizer used by the cluster controller to
// cleanup the cluster resources when a Cluster is being deleted.
ClusterFinalizer = "cluster.cluster.x-k8s.io"

// ClusterKind represents the Kind of Cluster.
ClusterKind = "Cluster"
)

// ANCHOR: ClusterSpec
Expand Down
3 changes: 3 additions & 0 deletions api/v1beta1/clusterclass_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
)

// ClusterClassKind represents the Kind of ClusterClass.
const ClusterClassKind = "ClusterClass"

// +kubebuilder:object:root=true
// +kubebuilder:resource:path=clusterclasses,shortName=cc,scope=Namespaced,categories=cluster-api
// +kubebuilder:storageversion
Expand Down
87 changes: 59 additions & 28 deletions cmd/clusterctl/client/cluster/mover.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,13 @@ import (
"sigs.k8s.io/cluster-api/util/yaml"
)

// ResourceMutatorFunc holds the type for mutators to be applied on resources during a move operation.
type ResourceMutatorFunc func(u *unstructured.Unstructured) error

// ObjectMover defines methods for moving Cluster API objects to another management cluster.
type ObjectMover interface {
// Move moves all the Cluster API objects existing in a namespace (or from all the namespaces if empty) to a target management cluster.
Move(namespace string, toCluster Client, dryRun bool) error
Move(namespace string, toCluster Client, dryRun bool, mutators ...ResourceMutatorFunc) error

// ToDirectory writes all the Cluster API objects existing in a namespace (or from all the namespaces if empty) to a target directory.
ToDirectory(namespace string, directory string) error
Expand All @@ -64,7 +67,7 @@ type objectMover struct {
// ensure objectMover implements the ObjectMover interface.
var _ ObjectMover = &objectMover{}

func (o *objectMover) Move(namespace string, toCluster Client, dryRun bool) error {
func (o *objectMover) Move(namespace string, toCluster Client, dryRun bool, mutators ...ResourceMutatorFunc) error {
log := logf.Log
log.Info("Performing move...")
o.dryRun = dryRun
Expand Down Expand Up @@ -92,7 +95,7 @@ func (o *objectMover) Move(namespace string, toCluster Client, dryRun bool) erro
proxy = toCluster.Proxy()
}

return o.move(objectGraph, proxy)
return o.move(objectGraph, proxy, mutators...)
}

func (o *objectMover) ToDirectory(namespace string, directory string) error {
Expand Down Expand Up @@ -309,7 +312,7 @@ func getMachineObj(proxy Proxy, machine *node, machineObj *clusterv1.Machine) er
}

// Move moves all the Cluster API objects existing in a namespace (or from all the namespaces if empty) to a target management cluster.
func (o *objectMover) move(graph *objectGraph, toProxy Proxy) error {
func (o *objectMover) move(graph *objectGraph, toProxy Proxy, mutators ...ResourceMutatorFunc) error {
log := logf.Log

clusters := graph.getClusters()
Expand Down Expand Up @@ -345,7 +348,7 @@ func (o *objectMover) move(graph *objectGraph, toProxy Proxy) error {
// Create all objects group by group, ensuring all the ownerReferences are re-created.
log.Info("Creating objects in the target cluster")
for groupIndex := 0; groupIndex < len(moveSequence.groups); groupIndex++ {
if err := o.createGroup(moveSequence.getGroup(groupIndex), toProxy); err != nil {
if err := o.createGroup(moveSequence.getGroup(groupIndex), toProxy, mutators...); err != nil {
return err
}
}
Expand All @@ -360,13 +363,13 @@ func (o *objectMover) move(graph *objectGraph, toProxy Proxy) error {

// Resume the ClusterClasses in the target management cluster, so the controllers start reconciling it.
log.V(1).Info("Resuming the target ClusterClasses")
if err := setClusterClassPause(toProxy, clusterClasses, false, o.dryRun); err != nil {
if err := setClusterClassPause(toProxy, clusterClasses, false, o.dryRun, mutators...); err != nil {
return errors.Wrap(err, "error resuming ClusterClasses")
}

// Reset the pause field on the Cluster object in the target management cluster, so the controllers start reconciling it.
log.V(1).Info("Resuming the target cluster")
return setClusterPause(toProxy, clusters, false, o.dryRun)
return setClusterPause(toProxy, clusters, false, o.dryRun, mutators...)
}

func (o *objectMover) toDirectory(graph *objectGraph, directory string) error {
Expand Down Expand Up @@ -533,7 +536,7 @@ func getMoveSequence(graph *objectGraph) *moveSequence {
}

// setClusterPause sets the paused field on nodes referring to Cluster objects.
func setClusterPause(proxy Proxy, clusters []*node, value bool, dryRun bool) error {
func setClusterPause(proxy Proxy, clusters []*node, value bool, dryRun bool, mutators ...ResourceMutatorFunc) error {
if dryRun {
return nil
}
Expand All @@ -554,7 +557,7 @@ func setClusterPause(proxy Proxy, clusters []*node, value bool, dryRun bool) err

// Nb. The operation is wrapped in a retry loop to make setClusterPause more resilient to unexpected conditions.
if err := retryWithExponentialBackoff(setClusterPauseBackoff, func() error {
return patchCluster(proxy, cluster, patch)
return patchCluster(proxy, cluster, patch, mutators...)
}); err != nil {
return errors.Wrapf(err, "error setting Cluster.Spec.Paused=%t", value)
}
Expand All @@ -563,7 +566,7 @@ func setClusterPause(proxy Proxy, clusters []*node, value bool, dryRun bool) err
}

// setClusterClassPause sets the paused annotation on nodes referring to ClusterClass objects.
func setClusterClassPause(proxy Proxy, clusterclasses []*node, pause bool, dryRun bool) error {
func setClusterClassPause(proxy Proxy, clusterclasses []*node, pause bool, dryRun bool, mutators ...ResourceMutatorFunc) error {
if dryRun {
return nil
}
Expand All @@ -581,7 +584,7 @@ func setClusterClassPause(proxy Proxy, clusterclasses []*node, pause bool, dryRu

// Nb. The operation is wrapped in a retry loop to make setClusterClassPause more resilient to unexpected conditions.
if err := retryWithExponentialBackoff(setClusterClassPauseBackoff, func() error {
return pauseClusterClass(proxy, clusterclass, pause)
return pauseClusterClass(proxy, clusterclass, pause, mutators...)
}); err != nil {
return errors.Wrapf(err, "error updating ClusterClass %s/%s", clusterclass.identity.Namespace, clusterclass.identity.Name)
}
Expand All @@ -590,19 +593,26 @@ func setClusterClassPause(proxy Proxy, clusterclasses []*node, pause bool, dryRu
}

// patchCluster applies a patch to a node referring to a Cluster object.
func patchCluster(proxy Proxy, cluster *node, patch client.Patch) error {
func patchCluster(proxy Proxy, n *node, patch client.Patch, mutators ...ResourceMutatorFunc) error {
cFrom, err := proxy.NewClient()
if err != nil {
return err
}

clusterObj := &clusterv1.Cluster{}
clusterObjKey := client.ObjectKey{
Namespace: cluster.identity.Namespace,
Name: cluster.identity.Name,
// Get the ClusterClass from the server
takirala marked this conversation as resolved.
Show resolved Hide resolved
clusterObj := &unstructured.Unstructured{}
clusterObj.SetAPIVersion(clusterv1.GroupVersion.String())
clusterObj.SetKind(clusterv1.ClusterKind)
clusterObj.SetName(n.identity.Name)
clusterObj.SetNamespace(n.identity.Namespace)
for _, mutator := range mutators {
if err = mutator(clusterObj); err != nil {
return errors.Wrapf(err, "error applying resource mutator to %q %s/%s",
clusterObj.GroupVersionKind(), clusterObj.GetNamespace(), clusterObj.GetName())
}
takirala marked this conversation as resolved.
Show resolved Hide resolved
}

if err := cFrom.Get(ctx, clusterObjKey, clusterObj); err != nil {
if err := cFrom.Get(ctx, client.ObjectKeyFromObject(clusterObj), clusterObj); err != nil {
return errors.Wrapf(err, "error reading Cluster %s/%s",
clusterObj.GetNamespace(), clusterObj.GetName())
}
Expand All @@ -615,19 +625,25 @@ func patchCluster(proxy Proxy, cluster *node, patch client.Patch) error {
return nil
}

func pauseClusterClass(proxy Proxy, n *node, pause bool) error {
func pauseClusterClass(proxy Proxy, n *node, pause bool, mutators ...ResourceMutatorFunc) error {
takirala marked this conversation as resolved.
Show resolved Hide resolved
cFrom, err := proxy.NewClient()
if err != nil {
return errors.Wrap(err, "error creating client")
}

// Get the ClusterClass from the server
clusterClass := &clusterv1.ClusterClass{}
clusterClassObjKey := client.ObjectKey{
Name: n.identity.Name,
Namespace: n.identity.Namespace,
clusterClass := &unstructured.Unstructured{}
clusterClass.SetAPIVersion(clusterv1.GroupVersion.String())
clusterClass.SetKind(clusterv1.ClusterClassKind)
clusterClass.SetName(n.identity.Name)
clusterClass.SetNamespace(n.identity.Namespace)
for _, mutator := range mutators {
if err = mutator(clusterClass); err != nil {
return errors.Wrapf(err, "error applying resource mutator to %q %s/%s",
clusterClass.GroupVersionKind(), clusterClass.GetNamespace(), clusterClass.GetName())
}
}
if err := cFrom.Get(ctx, clusterClassObjKey, clusterClass); err != nil {
if err := cFrom.Get(ctx, client.ObjectKeyFromObject(clusterClass), clusterClass); err != nil {
return errors.Wrapf(err, "error reading ClusterClass %s/%s", n.identity.Namespace, n.identity.Name)
}

Expand Down Expand Up @@ -740,7 +756,7 @@ func (o *objectMover) ensureNamespace(toProxy Proxy, namespace string) error {
return err
}

// If the namespace does not exists, create it.
// If the namespace does not exist, create it.
ns = &corev1.Namespace{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Expand All @@ -758,15 +774,18 @@ func (o *objectMover) ensureNamespace(toProxy Proxy, namespace string) error {
}

// createGroup creates all the Kubernetes objects into the target management cluster corresponding to the object graph nodes in a moveGroup.
func (o *objectMover) createGroup(group moveGroup, toProxy Proxy) error {
func (o *objectMover) createGroup(group moveGroup, toProxy Proxy, mutators ...ResourceMutatorFunc) error {
createTargetObjectBackoff := newWriteBackoff()
errList := []error{}

// Maintain a cache of namespaces that have been verified to already exist.
// Nb. This prevents us from making repetitive (and expensive) calls in listing all namespaces to ensure a namespace exists before creating a resource.
existingNamespaces := sets.New[string]()
takirala marked this conversation as resolved.
Show resolved Hide resolved
for _, nodeToCreate := range group {
// Creates the Kubernetes object corresponding to the nodeToCreate.
// Nb. The operation is wrapped in a retry loop to make move more resilient to unexpected conditions.
err := retryWithExponentialBackoff(createTargetObjectBackoff, func() error {
return o.createTargetObject(nodeToCreate, toProxy)
return o.createTargetObject(nodeToCreate, toProxy, mutators, existingNamespaces)
})
if err != nil {
errList = append(errList, err)
Expand Down Expand Up @@ -825,7 +844,7 @@ func (o *objectMover) restoreGroup(group moveGroup, toProxy Proxy) error {
}

// createTargetObject creates the Kubernetes object in the target Management cluster corresponding to the object graph node, taking care of restoring the OwnerReference with the owner nodes, if any.
func (o *objectMover) createTargetObject(nodeToCreate *node, toProxy Proxy) error {
func (o *objectMover) createTargetObject(nodeToCreate *node, toProxy Proxy, mutators []ResourceMutatorFunc, existingNamespaces sets.Set[string]) error {
log := logf.Log
log.V(1).Info("Creating", nodeToCreate.identity.Kind, nodeToCreate.identity.Name, "Namespace", nodeToCreate.identity.Namespace)

Expand Down Expand Up @@ -858,7 +877,7 @@ func (o *objectMover) createTargetObject(nodeToCreate *node, toProxy Proxy) erro
// Removes current OwnerReferences
obj.SetOwnerReferences(nil)

// Rebuild the owne reference chain
// Rebuild the owner reference chain
o.buildOwnerChain(obj, nodeToCreate)

// FIXME Workaround for https://github.com/kubernetes/kubernetes/issues/32220. Remove when the issue is fixed.
Expand All @@ -873,6 +892,18 @@ func (o *objectMover) createTargetObject(nodeToCreate *node, toProxy Proxy) erro
return err
}

for _, mutator := range mutators {
if err = mutator(obj); err != nil {
return errors.Wrapf(err, "error applying resource mutator to %q %s/%s",
obj.GroupVersionKind(), obj.GetNamespace(), obj.GetName())
}
}
// Applying mutators MAY change the namespace, so ensure the namespace exists before creating the resource.
if !nodeToCreate.isGlobal && !existingNamespaces.Has(obj.GetNamespace()) {
if err = o.ensureNamespace(toProxy, obj.GetNamespace()); err != nil {
return err
}
}
oldManagedFields := obj.GetManagedFields()
if err := cTo.Create(ctx, obj); err != nil {
if !apierrors.IsAlreadyExists(err) {
Expand Down
72 changes: 69 additions & 3 deletions cmd/clusterctl/client/cluster/mover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cluster

import (
"fmt"
"math/rand"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -1112,7 +1113,7 @@ func Test_objectMover_move_dryRun(t *testing.T) {
dryRun: true,
}

err := mover.move(graph, toProxy)
err := mover.move(graph, toProxy, nil)
if tt.wantErr {
g.Expect(err).To(HaveOccurred())
return
Expand Down Expand Up @@ -1163,9 +1164,45 @@ func Test_objectMover_move_dryRun(t *testing.T) {

func Test_objectMover_move(t *testing.T) {
// NB. we are testing the move and move sequence using the same set of moveTests, but checking the results at different stages of the move process
// we use same mutator function for all tests and validate outcome based on input.
randSource := rand.NewSource(time.Now().Unix())
for _, tt := range moveTests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)
toNamespace := "foobar"
updateKnownKinds := map[string][][]string{
"Cluster": {
{"metadata", "namespace"},
{"spec", "controlPlaneRef", "namespace"},
{"spec", "infrastructureRef", "namespace"},
{"unknown", "field", "does", "not", "cause", "errors"},
},
"KubeadmControlPlane": {
{"spec", "machineTemplate", "infrastructureRef", "namespace"},
},
"Machine": {
{"spec", "bootstrap", "configRef", "namespace"},
{"spec", "infrastructureRef", "namespace"},
},
}
var namespaceMutator ResourceMutatorFunc = func(u *unstructured.Unstructured) error {
if u == nil || u.Object == nil {
return nil
}
if u.GetNamespace() != "" {
u.SetNamespace(toNamespace)
}
if fields, knownKind := updateKnownKinds[u.GetKind()]; knownKind {
for _, nsField := range fields {
_, exists, err := unstructured.NestedFieldNoCopy(u.Object, nsField...)
g.Expect(err).To(BeNil())
if exists {
g.Expect(unstructured.SetNestedField(u.Object, toNamespace, nsField...)).To(Succeed())
}
}
}
return nil
}

// Create an objectGraph bound a source cluster with all the CRDs for the types involved in the test.
graph := getObjectGraphWithObjs(tt.fields.objs)
Expand All @@ -1184,7 +1221,15 @@ func Test_objectMover_move(t *testing.T) {
fromProxy: graph.proxy,
}

err := mover.move(graph, toProxy)
// choose to include/exclude mutator randomly
includeMutator := randSource.Int63()%2 == 0
takirala marked this conversation as resolved.
Show resolved Hide resolved
var mutators []ResourceMutatorFunc
if includeMutator {
mutators = append(mutators, namespaceMutator)
}

err := mover.move(graph, toProxy, mutators...)

if tt.wantErr {
g.Expect(err).To(HaveOccurred())
return
Expand Down Expand Up @@ -1225,11 +1270,27 @@ func Test_objectMover_move(t *testing.T) {
oTo := &unstructured.Unstructured{}
oTo.SetAPIVersion(node.identity.APIVersion)
oTo.SetKind(node.identity.Kind)
if includeMutator {
if key.Namespace != "" {
key.Namespace = toNamespace
}
}

if err := csTo.Get(ctx, key, oTo); err != nil {
t.Errorf("error = %v when checking for %v created in target cluster", err, key)
continue
}
if includeMutator {
if fields, knownKind := updateKnownKinds[oTo.GetKind()]; knownKind {
for _, nsField := range fields {
value, exists, err := unstructured.NestedFieldNoCopy(oTo.Object, nsField...)
g.Expect(err).To(BeNil())
if exists {
g.Expect(value).To(Equal(toNamespace))
}
}
}
}
}
})
}
Expand Down Expand Up @@ -1797,6 +1858,11 @@ func Test_createTargetObject(t *testing.T) {
},
},
want: func(g *WithT, toClient client.Client) {
ns := &corev1.Namespace{}
nsKey := client.ObjectKey{
Name: "ns1",
}
g.Expect(toClient.Get(ctx, nsKey, ns)).To(Succeed())
c := &clusterv1.Cluster{}
key := client.ObjectKey{
Namespace: "ns1",
Expand Down Expand Up @@ -1932,7 +1998,7 @@ func Test_createTargetObject(t *testing.T) {
fromProxy: tt.args.fromProxy,
}

err := mover.createTargetObject(tt.args.node, tt.args.toProxy)
err := mover.createTargetObject(tt.args.node, tt.args.toProxy, nil, nil)
if tt.wantErr {
g.Expect(err).To(HaveOccurred())
return
Expand Down
Loading