Skip to content

Commit

Permalink
generate events for topology changes
Browse files Browse the repository at this point in the history
  • Loading branch information
fabriziopandini committed Nov 10, 2021
1 parent 7233fd7 commit 744b4b3
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 39 deletions.
5 changes: 4 additions & 1 deletion controllers/topology/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/api/v1beta1/index"
"sigs.k8s.io/cluster-api/controllers/external"
Expand Down Expand Up @@ -59,6 +60,7 @@ type ClusterReconciler struct {
UnstructuredCachingClient client.Client

externalTracker external.ObjectTracker
recorder record.EventRecorder

// patchEngine is used to apply patches during computeDesiredState.
patchEngine patches.Engine
Expand Down Expand Up @@ -88,7 +90,7 @@ func (r *ClusterReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag
Controller: c,
}
r.patchEngine = patches.NewEngine()

r.recorder = mgr.GetEventRecorderFor("topology/cluster")
return nil
}

Expand All @@ -108,6 +110,7 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_
// Error reading the object - requeue the request.
return ctrl.Result{}, err
}
cluster.Kind = "Cluster"

// Return early, if the Cluster does not use a managed topology.
// NOTE: This should be removed as soon as we start to support Clusters moving from managed <-> unmanaged.
Expand Down
2 changes: 1 addition & 1 deletion controllers/topology/internal/mergepatch/mergepatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,5 +293,5 @@ func (h *Helper) Patch(ctx context.Context) error {
log := ctrl.LoggerFrom(ctx)
log.V(5).Info("Patching object", "Patch", string(h.patch))

return h.client.Patch(ctx, h.original, client.RawPatch(types.MergePatchType, h.patch))
return h.client.Patch(ctx, h.original.DeepCopyObject().(client.Object), client.RawPatch(types.MergePatchType, h.patch))
}
129 changes: 101 additions & 28 deletions controllers/topology/reconcile_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apiserver/pkg/storage/names"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/controllers/topology/internal/contract"
tlog "sigs.k8s.io/cluster-api/controllers/topology/internal/log"
"sigs.k8s.io/cluster-api/controllers/topology/internal/mergepatch"
Expand Down Expand Up @@ -63,7 +64,14 @@ func (r *ClusterReconciler) reconcileState(ctx context.Context, s *scope.Scope)
// reconcileInfrastructureCluster reconciles the desired state of the InfrastructureCluster object.
func (r *ClusterReconciler) reconcileInfrastructureCluster(ctx context.Context, s *scope.Scope) error {
ctx, _ = tlog.LoggerFrom(ctx).WithObject(s.Desired.InfrastructureCluster).Into(ctx)
return r.reconcileReferencedObject(ctx, s.Current.InfrastructureCluster, s.Desired.InfrastructureCluster, mergepatch.IgnorePaths(contract.InfrastructureCluster().IgnorePaths()))
return r.reconcileReferencedObject(ctx, reconcileReferencedObjectInput{
cluster: s.Current.Cluster,
current: s.Current.InfrastructureCluster,
desired: s.Desired.InfrastructureCluster,
opts: []mergepatch.HelperOption{
mergepatch.IgnorePaths(contract.InfrastructureCluster().IgnorePaths()),
},
})
}

// reconcileControlPlane works to bring the current state of a managed topology in line with the desired state. This involves
Expand All @@ -80,6 +88,7 @@ func (r *ClusterReconciler) reconcileControlPlane(ctx context.Context, s *scope.

// Create or update the MachineInfrastructureTemplate of the control plane.
err = r.reconcileReferencedTemplate(ctx, reconcileReferencedTemplateInput{
cluster: s.Current.Cluster,
ref: cpInfraRef,
current: s.Current.ControlPlane.InfrastructureMachineTemplate,
desired: s.Desired.ControlPlane.InfrastructureMachineTemplate,
Expand All @@ -100,14 +109,22 @@ func (r *ClusterReconciler) reconcileControlPlane(ctx context.Context, s *scope.

// Create or update the ControlPlaneObject for the ControlPlaneState.
ctx, _ = tlog.LoggerFrom(ctx).WithObject(s.Desired.ControlPlane.Object).Into(ctx)
if err := r.reconcileReferencedObject(ctx, s.Current.ControlPlane.Object, s.Desired.ControlPlane.Object, mergepatch.AuthoritativePaths{
// Note: we want to be authoritative WRT machine's metadata labels and annotations.
// This has the nice benefit that it greatly simplify the UX around ControlPlaneClass.Metadata and
// ControlPlaneTopology.Metadata, given that changes are reflected into generated objects without
// accounting for instance specific changes like we do for other maps into spec.
// Note: nested metadata have only labels and annotations, so it is possible to override the entire
// parent struct.
contract.ControlPlane().MachineTemplate().Metadata().Path(),
if err := r.reconcileReferencedObject(ctx, reconcileReferencedObjectInput{
cluster: s.Current.Cluster,
current: s.Current.ControlPlane.Object,
desired: s.Desired.ControlPlane.Object,
versionGetter: contract.ControlPlane().Version().Get,
opts: []mergepatch.HelperOption{
mergepatch.AuthoritativePaths{
// Note: we want to be authoritative WRT machine's metadata labels and annotations.
// This has the nice benefit that it greatly simplify the UX around ControlPlaneClass.Metadata and
// ControlPlaneTopology.Metadata, given that changes are reflected into generated objects without
// accounting for instance specific changes like we do for other maps into spec.
// Note: nested metadata have only labels and annotations, so it is possible to override the entire
// parent struct.
contract.ControlPlane().MachineTemplate().Metadata().Path(),
},
},
}); err != nil {
return errors.Wrapf(err, "failed to update %s", tlog.KObj{Obj: s.Desired.ControlPlane.Object})
}
Expand Down Expand Up @@ -136,6 +153,7 @@ func (r *ClusterReconciler) reconcileCluster(ctx context.Context, s *scope.Scope
if err := patchHelper.Patch(ctx); err != nil {
return errors.Wrapf(err, "failed to patch %s", tlog.KObj{Obj: s.Current.Cluster})
}
r.recorder.Eventf(s.Current.Cluster, corev1.EventTypeNormal, "TopologyUpdate", "Updated %q", tlog.KObj{Obj: s.Current.Cluster})
return nil
}

Expand All @@ -146,7 +164,7 @@ func (r *ClusterReconciler) reconcileMachineDeployments(ctx context.Context, s *
// Create MachineDeployments.
for _, mdTopologyName := range diff.toCreate {
md := s.Desired.MachineDeployments[mdTopologyName]
if err := r.createMachineDeployment(ctx, md); err != nil {
if err := r.createMachineDeployment(ctx, s.Current.Cluster, md); err != nil {
return err
}
}
Expand All @@ -155,15 +173,15 @@ func (r *ClusterReconciler) reconcileMachineDeployments(ctx context.Context, s *
for _, mdTopologyName := range diff.toUpdate {
currentMD := s.Current.MachineDeployments[mdTopologyName]
desiredMD := s.Desired.MachineDeployments[mdTopologyName]
if err := r.updateMachineDeployment(ctx, s.Current.Cluster.Name, mdTopologyName, currentMD, desiredMD); err != nil {
if err := r.updateMachineDeployment(ctx, s.Current.Cluster, mdTopologyName, currentMD, desiredMD); err != nil {
return err
}
}

// Delete MachineDeployments.
for _, mdTopologyName := range diff.toDelete {
md := s.Current.MachineDeployments[mdTopologyName]
if err := r.deleteMachineDeployment(ctx, md); err != nil {
if err := r.deleteMachineDeployment(ctx, s.Current.Cluster, md); err != nil {
return err
}
}
Expand All @@ -172,18 +190,20 @@ func (r *ClusterReconciler) reconcileMachineDeployments(ctx context.Context, s *
}

// createMachineDeployment creates a MachineDeployment and the corresponding Templates.
func (r *ClusterReconciler) createMachineDeployment(ctx context.Context, md *scope.MachineDeploymentState) error {
func (r *ClusterReconciler) createMachineDeployment(ctx context.Context, cluster *clusterv1.Cluster, md *scope.MachineDeploymentState) error {
log := tlog.LoggerFrom(ctx).WithMachineDeployment(md.Object)

ctx, _ = log.WithObject(md.InfrastructureMachineTemplate).Into(ctx)
if err := r.reconcileReferencedTemplate(ctx, reconcileReferencedTemplateInput{
cluster: cluster,
desired: md.InfrastructureMachineTemplate,
}); err != nil {
return errors.Wrapf(err, "failed to create %s", tlog.KObj{Obj: md.Object})
}

ctx, _ = log.WithObject(md.BootstrapTemplate).Into(ctx)
if err := r.reconcileReferencedTemplate(ctx, reconcileReferencedTemplateInput{
cluster: cluster,
desired: md.BootstrapTemplate,
}); err != nil {
return errors.Wrapf(err, "failed to create %s", tlog.KObj{Obj: md.Object})
Expand All @@ -194,30 +214,34 @@ func (r *ClusterReconciler) createMachineDeployment(ctx context.Context, md *sco
if err := r.Client.Create(ctx, md.Object.DeepCopy()); err != nil {
return errors.Wrapf(err, "failed to create %s", tlog.KObj{Obj: md.Object})
}
r.recorder.Eventf(cluster, corev1.EventTypeNormal, "TopologyCreate", "Created %q", tlog.KObj{Obj: md.Object})

return nil
}

// updateMachineDeployment updates a MachineDeployment. Also rotates the corresponding Templates if necessary.
func (r *ClusterReconciler) updateMachineDeployment(ctx context.Context, clusterName string, mdTopologyName string, currentMD, desiredMD *scope.MachineDeploymentState) error {
func (r *ClusterReconciler) updateMachineDeployment(ctx context.Context, cluster *clusterv1.Cluster, mdTopologyName string, currentMD, desiredMD *scope.MachineDeploymentState) error {
log := tlog.LoggerFrom(ctx).WithMachineDeployment(desiredMD.Object)

ctx, _ = log.WithObject(desiredMD.InfrastructureMachineTemplate).Into(ctx)
if err := r.reconcileReferencedTemplate(ctx, reconcileReferencedTemplateInput{
cluster: cluster,
ref: &desiredMD.Object.Spec.Template.Spec.InfrastructureRef,
current: currentMD.InfrastructureMachineTemplate,
desired: desiredMD.InfrastructureMachineTemplate,
templateNamePrefix: infrastructureMachineTemplateNamePrefix(clusterName, mdTopologyName),
templateNamePrefix: infrastructureMachineTemplateNamePrefix(cluster.Name, mdTopologyName),
compatibilityChecker: check.ReferencedObjectsAreCompatible,
}); err != nil {
return errors.Wrapf(err, "failed to update %s", tlog.KObj{Obj: currentMD.Object})
}

ctx, _ = log.WithObject(desiredMD.BootstrapTemplate).Into(ctx)
if err := r.reconcileReferencedTemplate(ctx, reconcileReferencedTemplateInput{
cluster: cluster,
ref: desiredMD.Object.Spec.Template.Spec.Bootstrap.ConfigRef,
current: currentMD.BootstrapTemplate,
desired: desiredMD.BootstrapTemplate,
templateNamePrefix: bootstrapTemplateNamePrefix(clusterName, mdTopologyName),
templateNamePrefix: bootstrapTemplateNamePrefix(cluster.Name, mdTopologyName),
compatibilityChecker: check.ObjectsAreInTheSameNamespace,
}); err != nil {
return errors.Wrapf(err, "failed to update %s", tlog.KObj{Obj: currentMD.Object})
Expand Down Expand Up @@ -246,19 +270,32 @@ func (r *ClusterReconciler) updateMachineDeployment(ctx context.Context, cluster
if err := patchHelper.Patch(ctx); err != nil {
return errors.Wrapf(err, "failed to patch %s", tlog.KObj{Obj: currentMD.Object})
}
r.recorder.Eventf(cluster, corev1.EventTypeNormal, "TopologyUpdate", "Updated %q%s", tlog.KObj{Obj: currentMD.Object}, logMachineDeploymentVersionChange(currentMD.Object, desiredMD.Object))

// We want to call both cleanup functions even if one of them fails to clean up as much as possible.
return nil
}

func logMachineDeploymentVersionChange(current, desired *clusterv1.MachineDeployment) string {
if current.Spec.Template.Spec.Version == nil || desired.Spec.Template.Spec.Version == nil {
return ""
}

if *current.Spec.Template.Spec.Version != *desired.Spec.Template.Spec.Version {
return fmt.Sprintf(" with version change from %s to %s", *current.Spec.Template.Spec.Version, *desired.Spec.Template.Spec.Version)
}
return ""
}

// deleteMachineDeployment deletes a MachineDeployment.
func (r *ClusterReconciler) deleteMachineDeployment(ctx context.Context, md *scope.MachineDeploymentState) error {
func (r *ClusterReconciler) deleteMachineDeployment(ctx context.Context, cluster *clusterv1.Cluster, md *scope.MachineDeploymentState) error {
log := tlog.LoggerFrom(ctx).WithMachineDeployment(md.Object).WithObject(md.Object)

log.Infof("Deleting %s", tlog.KObj{Obj: md.Object})
if err := r.Client.Delete(ctx, md.Object); err != nil && !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "failed to delete %s", tlog.KObj{Obj: md.Object})
}
r.recorder.Eventf(cluster, corev1.EventTypeNormal, "TopologyDelete", "Deleted %q", tlog.KObj{Obj: md.Object})
return nil
}

Expand Down Expand Up @@ -288,44 +325,77 @@ func calculateMachineDeploymentDiff(current, desired map[string]*scope.MachineDe
return diff
}

type unstructuredVersionGetter func(obj *unstructured.Unstructured) (*string, error)

type reconcileReferencedObjectInput struct {
cluster *clusterv1.Cluster
current *unstructured.Unstructured
desired *unstructured.Unstructured
versionGetter unstructuredVersionGetter
opts []mergepatch.HelperOption
}

// reconcileReferencedObject reconciles the desired state of the referenced object.
// NOTE: After a referenced object is created it is assumed that the reference should
// never change (only the content of the object can eventually change). Thus, we are checking for strict compatibility.
func (r *ClusterReconciler) reconcileReferencedObject(ctx context.Context, current, desired *unstructured.Unstructured, opts ...mergepatch.HelperOption) error {
func (r *ClusterReconciler) reconcileReferencedObject(ctx context.Context, in reconcileReferencedObjectInput) error {
log := tlog.LoggerFrom(ctx)

// If there is no current object, create it.
if current == nil {
log.Infof("Creating %s", tlog.KObj{Obj: desired})
if err := r.Client.Create(ctx, desired.DeepCopy()); err != nil {
return errors.Wrapf(err, "failed to create %s", tlog.KObj{Obj: desired})
if in.current == nil {
log.Infof("Creating %s", tlog.KObj{Obj: in.desired})
if err := r.Client.Create(ctx, in.desired.DeepCopy()); err != nil {
return errors.Wrapf(err, "failed to create %s", tlog.KObj{Obj: in.desired})
}
r.recorder.Eventf(in.cluster, corev1.EventTypeNormal, "TopologyCreate", "Created %q", tlog.KObj{Obj: in.desired})
return nil
}

// Check if the current and desired referenced object are compatible.
if err := check.ReferencedObjectsAreStrictlyCompatible(current, desired); err != nil {
if err := check.ReferencedObjectsAreStrictlyCompatible(in.current, in.desired); err != nil {
return err
}

// Check differences between current and desired state, and eventually patch the current object.
patchHelper, err := mergepatch.NewHelper(current, desired, r.Client, opts...)
patchHelper, err := mergepatch.NewHelper(in.current, in.desired, r.Client, in.opts...)
if err != nil {
return errors.Wrapf(err, "failed to create patch helper for %s", tlog.KObj{Obj: current})
return errors.Wrapf(err, "failed to create patch helper for %s", tlog.KObj{Obj: in.current})
}
if !patchHelper.HasChanges() {
log.V(3).Infof("No changes for %s", tlog.KObj{Obj: desired})
log.V(3).Infof("No changes for %s", tlog.KObj{Obj: in.desired})
return nil
}

log.Infof("Patching %s", tlog.KObj{Obj: desired})
log.Infof("Patching %s", tlog.KObj{Obj: in.desired})
if err := patchHelper.Patch(ctx); err != nil {
return errors.Wrapf(err, "failed to patch %s", tlog.KObj{Obj: current})
return errors.Wrapf(err, "failed to patch %s", tlog.KObj{Obj: in.current})
}
r.recorder.Eventf(in.cluster, corev1.EventTypeNormal, "TopologyUpdate", "Updated %q%s", tlog.KObj{Obj: in.desired}, logUnstructuredVersionChange(in.current, in.desired, in.versionGetter))
return nil
}

func logUnstructuredVersionChange(current, desired *unstructured.Unstructured, versionGetter unstructuredVersionGetter) string {
if versionGetter == nil {
return ""
}

currentVersion, err := versionGetter(current)
if err != nil || currentVersion == nil {
return ""
}
desiredVersion, err := versionGetter(desired)
if err != nil || desiredVersion == nil {
return ""
}

if *currentVersion != *desiredVersion {
return fmt.Sprintf(" with version change from %s to %s", *currentVersion, *desiredVersion)
}
return ""
}

type reconcileReferencedTemplateInput struct {
cluster *clusterv1.Cluster
ref *corev1.ObjectReference
current *unstructured.Unstructured
desired *unstructured.Unstructured
Expand All @@ -350,6 +420,7 @@ func (r *ClusterReconciler) reconcileReferencedTemplate(ctx context.Context, in
if err := r.Client.Create(ctx, in.desired.DeepCopy()); err != nil {
return errors.Wrapf(err, "failed to create %s", tlog.KObj{Obj: in.desired})
}
r.recorder.Eventf(in.cluster, corev1.EventTypeNormal, "TopologyCreate", "Created %q", tlog.KObj{Obj: in.desired})
return nil
}

Expand Down Expand Up @@ -381,6 +452,7 @@ func (r *ClusterReconciler) reconcileReferencedTemplate(ctx context.Context, in
if err := patchHelper.Patch(ctx); err != nil {
return errors.Wrapf(err, "failed to patch %s", tlog.KObj{Obj: in.desired})
}
r.recorder.Eventf(in.cluster, corev1.EventTypeNormal, "TopologyUpdate", "Updated %q (metadata changes)", tlog.KObj{Obj: in.desired})
return nil
}

Expand All @@ -396,6 +468,7 @@ func (r *ClusterReconciler) reconcileReferencedTemplate(ctx context.Context, in
if err := r.Client.Create(ctx, in.desired.DeepCopy()); err != nil {
return errors.Wrapf(err, "failed to create %s", tlog.KObj{Obj: in.desired})
}
r.recorder.Eventf(in.cluster, corev1.EventTypeNormal, "TopologyCreate", "Created %q as a replacement for %q (template rotation)", tlog.KObj{Obj: in.desired}, in.ref.Name)

// Update the reference with the new name.
// NOTE: Updating the object hosting reference to the template is executed outside this func.
Expand Down
Loading

0 comments on commit 744b4b3

Please sign in to comment.