Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ feat: accept resource mutators in Move operation #7966

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/v1beta1/cluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ const (
// ClusterFinalizer is the finalizer used by the cluster controller to
// cleanup the cluster resources when a Cluster is being deleted.
ClusterFinalizer = "cluster.cluster.x-k8s.io"

// KindCluster represents the Kind of Cluster.
KindCluster = "Cluster"
takirala marked this conversation as resolved.
Show resolved Hide resolved
)

// ANCHOR: ClusterSpec
Expand Down
3 changes: 3 additions & 0 deletions api/v1beta1/clusterclass_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
"k8s.io/apimachinery/pkg/util/intstr"
)

// KindClusterClass represents the Kind of ClusterClass.
const KindClusterClass = "ClusterClass"
takirala marked this conversation as resolved.
Show resolved Hide resolved

// +kubebuilder:object:root=true
// +kubebuilder:resource:path=clusterclasses,shortName=cc,scope=Namespaced,categories=cluster-api
// +kubebuilder:storageversion
Expand Down
96 changes: 59 additions & 37 deletions cmd/clusterctl/client/cluster/mover.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,13 @@ import (
"sigs.k8s.io/cluster-api/util/yaml"
)

// ResourceMutatorFunc holds the type for mutators to be applied on resources during a move operation.
type ResourceMutatorFunc func(u *unstructured.Unstructured)

// ObjectMover defines methods for moving Cluster API objects to another management cluster.
type ObjectMover interface {
// Move moves all the Cluster API objects existing in a namespace (or from all the namespaces if empty) to a target management cluster.
Move(namespace string, toCluster Client, dryRun bool) error
Move(namespace string, mutators []ResourceMutatorFunc, toCluster Client, dryRun bool) error
fabriziopandini marked this conversation as resolved.
Show resolved Hide resolved
takirala marked this conversation as resolved.
Show resolved Hide resolved

// ToDirectory writes all the Cluster API objects existing in a namespace (or from all the namespaces if empty) to a target directory.
ToDirectory(namespace string, directory string) error
Expand All @@ -64,7 +67,7 @@ type objectMover struct {
// ensure objectMover implements the ObjectMover interface.
var _ ObjectMover = &objectMover{}

func (o *objectMover) Move(namespace string, toCluster Client, dryRun bool) error {
func (o *objectMover) Move(namespace string, mutators []ResourceMutatorFunc, toCluster Client, dryRun bool) error {
log := logf.Log
log.Info("Performing move...")
o.dryRun = dryRun
Expand Down Expand Up @@ -92,7 +95,7 @@ func (o *objectMover) Move(namespace string, toCluster Client, dryRun bool) erro
proxy = toCluster.Proxy()
}

return o.move(objectGraph, proxy)
return o.move(objectGraph, proxy, mutators)
}

func (o *objectMover) ToDirectory(namespace string, directory string) error {
Expand Down Expand Up @@ -309,7 +312,7 @@ func getMachineObj(proxy Proxy, machine *node, machineObj *clusterv1.Machine) er
}

// Move moves all the Cluster API objects existing in a namespace (or from all the namespaces if empty) to a target management cluster.
func (o *objectMover) move(graph *objectGraph, toProxy Proxy) error {
func (o *objectMover) move(graph *objectGraph, toProxy Proxy, mutators []ResourceMutatorFunc) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To keep the signature consistent

Suggested change
func (o *objectMover) move(graph *objectGraph, toProxy Proxy, mutators []ResourceMutatorFunc) error {
func (o *objectMover) move(graph *objectGraph, toProxy Proxy, mutators ...ResourceMutatorFunc) error {

log := logf.Log

clusters := graph.getClusters()
Expand All @@ -320,12 +323,12 @@ func (o *objectMover) move(graph *objectGraph, toProxy Proxy) error {

// Sets the pause field on the Cluster object in the source management cluster, so the controllers stop reconciling it.
log.V(1).Info("Pausing the source cluster")
if err := setClusterPause(o.fromProxy, clusters, true, o.dryRun); err != nil {
if err := setClusterPause(o.fromProxy, clusters, nil, true, o.dryRun); err != nil {
return err
}

log.V(1).Info("Pausing the source ClusterClasses")
if err := setClusterClassPause(o.fromProxy, clusterClasses, true, o.dryRun); err != nil {
if err := setClusterClassPause(o.fromProxy, clusterClasses, nil, true, o.dryRun); err != nil {
return errors.Wrap(err, "error pausing ClusterClasses")
}

Expand All @@ -345,7 +348,7 @@ func (o *objectMover) move(graph *objectGraph, toProxy Proxy) error {
// Create all objects group by group, ensuring all the ownerReferences are re-created.
log.Info("Creating objects in the target cluster")
for groupIndex := 0; groupIndex < len(moveSequence.groups); groupIndex++ {
if err := o.createGroup(moveSequence.getGroup(groupIndex), toProxy); err != nil {
if err := o.createGroup(moveSequence.getGroup(groupIndex), toProxy, mutators); err != nil {
return err
}
}
Expand All @@ -360,13 +363,13 @@ func (o *objectMover) move(graph *objectGraph, toProxy Proxy) error {

// Resume the ClusterClasses in the target management cluster, so the controllers start reconciling it.
log.V(1).Info("Resuming the target ClusterClasses")
if err := setClusterClassPause(toProxy, clusterClasses, false, o.dryRun); err != nil {
if err := setClusterClassPause(toProxy, clusterClasses, mutators, false, o.dryRun); err != nil {
return errors.Wrap(err, "error resuming ClusterClasses")
}

// Reset the pause field on the Cluster object in the target management cluster, so the controllers start reconciling it.
log.V(1).Info("Resuming the target cluster")
return setClusterPause(toProxy, clusters, false, o.dryRun)
return setClusterPause(toProxy, clusters, mutators, false, o.dryRun)
}

func (o *objectMover) toDirectory(graph *objectGraph, directory string) error {
Expand All @@ -380,12 +383,12 @@ func (o *objectMover) toDirectory(graph *objectGraph, directory string) error {

// Sets the pause field on the Cluster object in the source management cluster, so the controllers stop reconciling it.
log.V(1).Info("Pausing the source cluster")
if err := setClusterPause(o.fromProxy, clusters, true, o.dryRun); err != nil {
if err := setClusterPause(o.fromProxy, clusters, nil, true, o.dryRun); err != nil {
return err
}

log.V(1).Info("Pausing the source ClusterClasses")
if err := setClusterClassPause(o.fromProxy, clusterClasses, true, o.dryRun); err != nil {
if err := setClusterClassPause(o.fromProxy, clusterClasses, nil, true, o.dryRun); err != nil {
return errors.Wrap(err, "error pausing ClusterClasses")
}

Expand All @@ -406,13 +409,13 @@ func (o *objectMover) toDirectory(graph *objectGraph, directory string) error {

// Resume the ClusterClasses in the target management cluster, so the controllers start reconciling it.
log.V(1).Info("Resuming the target ClusterClasses")
if err := setClusterClassPause(o.fromProxy, clusterClasses, false, o.dryRun); err != nil {
if err := setClusterClassPause(o.fromProxy, clusterClasses, nil, false, o.dryRun); err != nil {
return errors.Wrap(err, "error resuming ClusterClasses")
}

// Reset the pause field on the Cluster object in the target management cluster, so the controllers start reconciling it.
log.V(1).Info("Resuming the source cluster")
return setClusterPause(o.fromProxy, clusters, false, o.dryRun)
return setClusterPause(o.fromProxy, clusters, nil, false, o.dryRun)
}

func (o *objectMover) fromDirectory(graph *objectGraph, toProxy Proxy) error {
Expand Down Expand Up @@ -447,14 +450,14 @@ func (o *objectMover) fromDirectory(graph *objectGraph, toProxy Proxy) error {
// Resume reconciling the ClusterClasses after being restored from a backup.
// By default, during backup, ClusterClasses are paused so they must be unpaused to be used again
log.V(1).Info("Resuming the target ClusterClasses")
if err := setClusterClassPause(toProxy, clusterClasses, false, o.dryRun); err != nil {
if err := setClusterClassPause(toProxy, clusterClasses, nil, false, o.dryRun); err != nil {
return errors.Wrap(err, "error resuming ClusterClasses")
}

// Resume reconciling the Clusters after being restored from a directory.
// By default, when moved to a directory, Clusters are paused, so they must be unpaused to be used again.
log.V(1).Info("Resuming the target cluster")
return setClusterPause(toProxy, clusters, false, o.dryRun)
return setClusterPause(toProxy, clusters, nil, false, o.dryRun)
}

// moveSequence defines a list of group of moveGroups.
Expand Down Expand Up @@ -533,7 +536,7 @@ func getMoveSequence(graph *objectGraph) *moveSequence {
}

// setClusterPause sets the paused field on nodes referring to Cluster objects.
func setClusterPause(proxy Proxy, clusters []*node, value bool, dryRun bool) error {
func setClusterPause(proxy Proxy, clusters []*node, mutators []ResourceMutatorFunc, value bool, dryRun bool) error {
takirala marked this conversation as resolved.
Show resolved Hide resolved
if dryRun {
return nil
}
Expand All @@ -554,7 +557,7 @@ func setClusterPause(proxy Proxy, clusters []*node, value bool, dryRun bool) err

// Nb. The operation is wrapped in a retry loop to make setClusterPause more resilient to unexpected conditions.
if err := retryWithExponentialBackoff(setClusterPauseBackoff, func() error {
return patchCluster(proxy, cluster, patch)
return patchCluster(proxy, cluster, patch, mutators)
}); err != nil {
return errors.Wrapf(err, "error setting Cluster.Spec.Paused=%t", value)
}
Expand All @@ -563,7 +566,7 @@ func setClusterPause(proxy Proxy, clusters []*node, value bool, dryRun bool) err
}

// setClusterClassPause sets the paused annotation on nodes referring to ClusterClass objects.
func setClusterClassPause(proxy Proxy, clusterclasses []*node, pause bool, dryRun bool) error {
func setClusterClassPause(proxy Proxy, clusterclasses []*node, mutators []ResourceMutatorFunc, pause bool, dryRun bool) error {
takirala marked this conversation as resolved.
Show resolved Hide resolved
if dryRun {
return nil
}
Expand All @@ -581,7 +584,7 @@ func setClusterClassPause(proxy Proxy, clusterclasses []*node, pause bool, dryRu

// Nb. The operation is wrapped in a retry loop to make setClusterClassPause more resilient to unexpected conditions.
if err := retryWithExponentialBackoff(setClusterClassPauseBackoff, func() error {
return pauseClusterClass(proxy, clusterclass, pause)
return pauseClusterClass(proxy, clusterclass, pause, mutators)
}); err != nil {
return errors.Wrapf(err, "error updating ClusterClass %s/%s", clusterclass.identity.Namespace, clusterclass.identity.Name)
}
Expand All @@ -590,19 +593,23 @@ func setClusterClassPause(proxy Proxy, clusterclasses []*node, pause bool, dryRu
}

// patchCluster applies a patch to a node referring to a Cluster object.
func patchCluster(proxy Proxy, cluster *node, patch client.Patch) error {
func patchCluster(proxy Proxy, n *node, patch client.Patch, mutators []ResourceMutatorFunc) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To keep the signature consistent

Suggested change
func patchCluster(proxy Proxy, n *node, patch client.Patch, mutators []ResourceMutatorFunc) error {
func patchCluster(proxy Proxy, n *node, patch client.Patch, mutators ...ResourceMutatorFunc) error {

cFrom, err := proxy.NewClient()
if err != nil {
return err
}

clusterObj := &clusterv1.Cluster{}
clusterObjKey := client.ObjectKey{
Namespace: cluster.identity.Namespace,
Name: cluster.identity.Name,
// Get the ClusterClass from the server
takirala marked this conversation as resolved.
Show resolved Hide resolved
clusterObj := &unstructured.Unstructured{}
clusterObj.SetAPIVersion(clusterv1.GroupVersion.String())
clusterObj.SetKind(clusterv1.KindCluster)
clusterObj.SetName(n.identity.Name)
clusterObj.SetNamespace(n.identity.Namespace)
for _, mutator := range mutators {
mutator(clusterObj)
}

if err := cFrom.Get(ctx, clusterObjKey, clusterObj); err != nil {
if err := cFrom.Get(ctx, client.ObjectKeyFromObject(clusterObj), clusterObj); err != nil {
return errors.Wrapf(err, "error reading Cluster %s/%s",
clusterObj.GetNamespace(), clusterObj.GetName())
}
Expand All @@ -615,19 +622,22 @@ func patchCluster(proxy Proxy, cluster *node, patch client.Patch) error {
return nil
}

func pauseClusterClass(proxy Proxy, n *node, pause bool) error {
func pauseClusterClass(proxy Proxy, n *node, pause bool, mutators []ResourceMutatorFunc) error {
takirala marked this conversation as resolved.
Show resolved Hide resolved
cFrom, err := proxy.NewClient()
if err != nil {
return errors.Wrap(err, "error creating client")
}

// Get the ClusterClass from the server
clusterClass := &clusterv1.ClusterClass{}
clusterClassObjKey := client.ObjectKey{
Name: n.identity.Name,
Namespace: n.identity.Namespace,
}
if err := cFrom.Get(ctx, clusterClassObjKey, clusterClass); err != nil {
clusterClass := &unstructured.Unstructured{}
clusterClass.SetAPIVersion(clusterv1.GroupVersion.String())
clusterClass.SetKind(clusterv1.KindClusterClass)
clusterClass.SetName(n.identity.Name)
clusterClass.SetNamespace(n.identity.Namespace)
for _, mutator := range mutators {
mutator(clusterClass)
}
if err := cFrom.Get(ctx, client.ObjectKeyFromObject(clusterClass), clusterClass); err != nil {
return errors.Wrapf(err, "error reading ClusterClass %s/%s", n.identity.Namespace, n.identity.Name)
}

Expand Down Expand Up @@ -740,7 +750,7 @@ func (o *objectMover) ensureNamespace(toProxy Proxy, namespace string) error {
return err
}

// If the namespace does not exists, create it.
// If the namespace does not exist, create it.
ns = &corev1.Namespace{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Expand All @@ -758,15 +768,18 @@ func (o *objectMover) ensureNamespace(toProxy Proxy, namespace string) error {
}

// createGroup creates all the Kubernetes objects into the target management cluster corresponding to the object graph nodes in a moveGroup.
func (o *objectMover) createGroup(group moveGroup, toProxy Proxy) error {
func (o *objectMover) createGroup(group moveGroup, toProxy Proxy, mutators []ResourceMutatorFunc) error {
takirala marked this conversation as resolved.
Show resolved Hide resolved
createTargetObjectBackoff := newWriteBackoff()
errList := []error{}

// Maintain a cache of namespaces that have been verified to already exist.
// Nb. This prevents us from making repetitive (and expensive) calls in listing all namespaces to ensure a namespace exists before creating a resource.
ensuredNamespaces := sets.New[string]()
takirala marked this conversation as resolved.
Show resolved Hide resolved
for _, nodeToCreate := range group {
// Creates the Kubernetes object corresponding to the nodeToCreate.
// Nb. The operation is wrapped in a retry loop to make move more resilient to unexpected conditions.
err := retryWithExponentialBackoff(createTargetObjectBackoff, func() error {
return o.createTargetObject(nodeToCreate, toProxy)
return o.createTargetObject(nodeToCreate, toProxy, mutators, ensuredNamespaces)
})
if err != nil {
errList = append(errList, err)
Expand Down Expand Up @@ -825,7 +838,7 @@ func (o *objectMover) restoreGroup(group moveGroup, toProxy Proxy) error {
}

// createTargetObject creates the Kubernetes object in the target Management cluster corresponding to the object graph node, taking care of restoring the OwnerReference with the owner nodes, if any.
func (o *objectMover) createTargetObject(nodeToCreate *node, toProxy Proxy) error {
func (o *objectMover) createTargetObject(nodeToCreate *node, toProxy Proxy, mutators []ResourceMutatorFunc, ensuredNamespaces sets.Set[string]) error {
log := logf.Log
log.V(1).Info("Creating", nodeToCreate.identity.Kind, nodeToCreate.identity.Name, "Namespace", nodeToCreate.identity.Namespace)

Expand Down Expand Up @@ -858,7 +871,7 @@ func (o *objectMover) createTargetObject(nodeToCreate *node, toProxy Proxy) erro
// Removes current OwnerReferences
obj.SetOwnerReferences(nil)

// Rebuild the owne reference chain
// Rebuild the owner reference chain
o.buildOwnerChain(obj, nodeToCreate)

// FIXME Workaround for https://github.com/kubernetes/kubernetes/issues/32220. Remove when the issue is fixed.
Expand All @@ -873,6 +886,15 @@ func (o *objectMover) createTargetObject(nodeToCreate *node, toProxy Proxy) erro
return err
}

for _, mutator := range mutators {
mutator(obj)
}
// Applying mutators MAY change the namespace, so ensure the namespace exists before creating the resource.
if !nodeToCreate.isGlobal && !ensuredNamespaces.Has(obj.GetNamespace()) {
if err = o.ensureNamespace(toProxy, obj.GetNamespace()); err != nil {
return err
}
}
oldManagedFields := obj.GetManagedFields()
if err := cTo.Create(ctx, obj); err != nil {
if !apierrors.IsAlreadyExists(err) {
Expand Down
Loading