Skip to content

Commit

Permalink
Merge pull request #5631 from fabriziopandini/topology-events
Browse files Browse the repository at this point in the history
🌱 Generate events for topology changes
  • Loading branch information
k8s-ci-robot authored Dec 3, 2021
2 parents 98caa4b + 2093eb5 commit 0e15762
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 38 deletions.
5 changes: 4 additions & 1 deletion controllers/topology/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/api/v1beta1/index"
"sigs.k8s.io/cluster-api/controllers/external"
Expand Down Expand Up @@ -61,6 +62,7 @@ type ClusterReconciler struct {
UnstructuredCachingClient client.Client

externalTracker external.ObjectTracker
recorder record.EventRecorder

// patchEngine is used to apply patches during computeDesiredState.
patchEngine patches.Engine
Expand Down Expand Up @@ -95,7 +97,7 @@ func (r *ClusterReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag
Controller: c,
}
r.patchEngine = patches.NewEngine()

r.recorder = mgr.GetEventRecorderFor("topology/cluster")
return nil
}

Expand All @@ -115,6 +117,7 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_
// Error reading the object - requeue the request.
return ctrl.Result{}, err
}
cluster.Kind = "Cluster"

// Return early, if the Cluster does not use a managed topology.
// NOTE: We're already filtering events, but this is a safeguard for cases like e.g. when
Expand Down
3 changes: 2 additions & 1 deletion controllers/topology/internal/mergepatch/mergepatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,5 +302,6 @@ func (h *Helper) Patch(ctx context.Context) error {
log := ctrl.LoggerFrom(ctx)
log.V(5).Info("Patching object", "Patch", string(h.patch))

return h.client.Patch(ctx, h.original, client.RawPatch(types.MergePatchType, h.patch))
// Note: deepcopy before patching in order to avoid modifications to the original object.
return h.client.Patch(ctx, h.original.DeepCopyObject().(client.Object), client.RawPatch(types.MergePatchType, h.patch))
}
134 changes: 106 additions & 28 deletions controllers/topology/reconcile_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
createEventReason = "TopologyCreate"
updateEventReason = "TopologyUpdate"
deleteEventReason = "TopologyDelete"
)

// reconcileState reconciles the current and desired state of the managed Cluster topology.
// NOTE: We are assuming all the required objects are provided as input; also, in case of any error,
// the entire reconcile operation will fail. This might be improved in the future if support for reconciling
Expand Down Expand Up @@ -157,7 +163,14 @@ func hasOwnerReferenceFrom(obj, owner client.Object) bool {
// reconcileInfrastructureCluster reconciles the desired state of the InfrastructureCluster object.
func (r *ClusterReconciler) reconcileInfrastructureCluster(ctx context.Context, s *scope.Scope) error {
ctx, _ = tlog.LoggerFrom(ctx).WithObject(s.Desired.InfrastructureCluster).Into(ctx)
return r.reconcileReferencedObject(ctx, s.Current.InfrastructureCluster, s.Desired.InfrastructureCluster, mergepatch.IgnorePaths(contract.InfrastructureCluster().IgnorePaths()))
return r.reconcileReferencedObject(ctx, reconcileReferencedObjectInput{
cluster: s.Current.Cluster,
current: s.Current.InfrastructureCluster,
desired: s.Desired.InfrastructureCluster,
opts: []mergepatch.HelperOption{
mergepatch.IgnorePaths(contract.InfrastructureCluster().IgnorePaths()),
},
})
}

// reconcileControlPlane works to bring the current state of a managed topology in line with the desired state. This involves
Expand All @@ -174,6 +187,7 @@ func (r *ClusterReconciler) reconcileControlPlane(ctx context.Context, s *scope.

// Create or update the MachineInfrastructureTemplate of the control plane.
err = r.reconcileReferencedTemplate(ctx, reconcileReferencedTemplateInput{
cluster: s.Current.Cluster,
ref: cpInfraRef,
current: s.Current.ControlPlane.InfrastructureMachineTemplate,
desired: s.Desired.ControlPlane.InfrastructureMachineTemplate,
Expand All @@ -194,14 +208,22 @@ func (r *ClusterReconciler) reconcileControlPlane(ctx context.Context, s *scope.

// Create or update the ControlPlaneObject for the ControlPlaneState.
ctx, _ = tlog.LoggerFrom(ctx).WithObject(s.Desired.ControlPlane.Object).Into(ctx)
if err := r.reconcileReferencedObject(ctx, s.Current.ControlPlane.Object, s.Desired.ControlPlane.Object, mergepatch.AuthoritativePaths{
// Note: we want to be authoritative WRT machine's metadata labels and annotations.
// This has the nice benefit that it greatly simplify the UX around ControlPlaneClass.Metadata and
// ControlPlaneTopology.Metadata, given that changes are reflected into generated objects without
// accounting for instance specific changes like we do for other maps into spec.
// Note: nested metadata have only labels and annotations, so it is possible to override the entire
// parent struct.
contract.ControlPlane().MachineTemplate().Metadata().Path(),
if err := r.reconcileReferencedObject(ctx, reconcileReferencedObjectInput{
cluster: s.Current.Cluster,
current: s.Current.ControlPlane.Object,
desired: s.Desired.ControlPlane.Object,
versionGetter: contract.ControlPlane().Version().Get,
opts: []mergepatch.HelperOption{
mergepatch.AuthoritativePaths{
// Note: we want to be authoritative WRT machine's metadata labels and annotations.
// This has the nice benefit that it greatly simplify the UX around ControlPlaneClass.Metadata and
// ControlPlaneTopology.Metadata, given that changes are reflected into generated objects without
// accounting for instance specific changes like we do for other maps into spec.
// Note: nested metadata have only labels and annotations, so it is possible to override the entire
// parent struct.
contract.ControlPlane().MachineTemplate().Metadata().Path(),
},
},
}); err != nil {
return errors.Wrapf(err, "failed to update %s", tlog.KObj{Obj: s.Desired.ControlPlane.Object})
}
Expand Down Expand Up @@ -230,6 +252,7 @@ func (r *ClusterReconciler) reconcileCluster(ctx context.Context, s *scope.Scope
if err := patchHelper.Patch(ctx); err != nil {
return errors.Wrapf(err, "failed to patch %s", tlog.KObj{Obj: s.Current.Cluster})
}
r.recorder.Eventf(s.Current.Cluster, corev1.EventTypeNormal, updateEventReason, "Updated %q", tlog.KObj{Obj: s.Current.Cluster})
return nil
}

Expand All @@ -240,7 +263,7 @@ func (r *ClusterReconciler) reconcileMachineDeployments(ctx context.Context, s *
// Create MachineDeployments.
for _, mdTopologyName := range diff.toCreate {
md := s.Desired.MachineDeployments[mdTopologyName]
if err := r.createMachineDeployment(ctx, md); err != nil {
if err := r.createMachineDeployment(ctx, s.Current.Cluster, md); err != nil {
return err
}
}
Expand All @@ -249,15 +272,15 @@ func (r *ClusterReconciler) reconcileMachineDeployments(ctx context.Context, s *
for _, mdTopologyName := range diff.toUpdate {
currentMD := s.Current.MachineDeployments[mdTopologyName]
desiredMD := s.Desired.MachineDeployments[mdTopologyName]
if err := r.updateMachineDeployment(ctx, s.Current.Cluster.Name, mdTopologyName, currentMD, desiredMD); err != nil {
if err := r.updateMachineDeployment(ctx, s.Current.Cluster, mdTopologyName, currentMD, desiredMD); err != nil {
return err
}
}

// Delete MachineDeployments.
for _, mdTopologyName := range diff.toDelete {
md := s.Current.MachineDeployments[mdTopologyName]
if err := r.deleteMachineDeployment(ctx, md); err != nil {
if err := r.deleteMachineDeployment(ctx, s.Current.Cluster, md); err != nil {
return err
}
}
Expand All @@ -266,18 +289,20 @@ func (r *ClusterReconciler) reconcileMachineDeployments(ctx context.Context, s *
}

// createMachineDeployment creates a MachineDeployment and the corresponding Templates.
func (r *ClusterReconciler) createMachineDeployment(ctx context.Context, md *scope.MachineDeploymentState) error {
func (r *ClusterReconciler) createMachineDeployment(ctx context.Context, cluster *clusterv1.Cluster, md *scope.MachineDeploymentState) error {
log := tlog.LoggerFrom(ctx).WithMachineDeployment(md.Object)

infraCtx, _ := log.WithObject(md.InfrastructureMachineTemplate).Into(ctx)
if err := r.reconcileReferencedTemplate(infraCtx, reconcileReferencedTemplateInput{
cluster: cluster,
desired: md.InfrastructureMachineTemplate,
}); err != nil {
return errors.Wrapf(err, "failed to create %s", tlog.KObj{Obj: md.Object})
}

bootstrapCtx, _ := log.WithObject(md.BootstrapTemplate).Into(ctx)
if err := r.reconcileReferencedTemplate(bootstrapCtx, reconcileReferencedTemplateInput{
cluster: cluster,
desired: md.BootstrapTemplate,
}); err != nil {
return errors.Wrapf(err, "failed to create %s", tlog.KObj{Obj: md.Object})
Expand All @@ -288,30 +313,34 @@ func (r *ClusterReconciler) createMachineDeployment(ctx context.Context, md *sco
if err := r.Client.Create(ctx, md.Object.DeepCopy()); err != nil {
return errors.Wrapf(err, "failed to create %s", tlog.KObj{Obj: md.Object})
}
r.recorder.Eventf(cluster, corev1.EventTypeNormal, createEventReason, "Created %q", tlog.KObj{Obj: md.Object})

return nil
}

// updateMachineDeployment updates a MachineDeployment. Also rotates the corresponding Templates if necessary.
func (r *ClusterReconciler) updateMachineDeployment(ctx context.Context, clusterName string, mdTopologyName string, currentMD, desiredMD *scope.MachineDeploymentState) error {
func (r *ClusterReconciler) updateMachineDeployment(ctx context.Context, cluster *clusterv1.Cluster, mdTopologyName string, currentMD, desiredMD *scope.MachineDeploymentState) error {
log := tlog.LoggerFrom(ctx).WithMachineDeployment(desiredMD.Object)

infraCtx, _ := log.WithObject(desiredMD.InfrastructureMachineTemplate).Into(ctx)
if err := r.reconcileReferencedTemplate(infraCtx, reconcileReferencedTemplateInput{
cluster: cluster,
ref: &desiredMD.Object.Spec.Template.Spec.InfrastructureRef,
current: currentMD.InfrastructureMachineTemplate,
desired: desiredMD.InfrastructureMachineTemplate,
templateNamePrefix: infrastructureMachineTemplateNamePrefix(clusterName, mdTopologyName),
templateNamePrefix: infrastructureMachineTemplateNamePrefix(cluster.Name, mdTopologyName),
compatibilityChecker: check.ObjectsAreCompatible,
}); err != nil {
return errors.Wrapf(err, "failed to update %s", tlog.KObj{Obj: currentMD.Object})
}

bootstrapCtx, _ := log.WithObject(desiredMD.BootstrapTemplate).Into(ctx)
if err := r.reconcileReferencedTemplate(bootstrapCtx, reconcileReferencedTemplateInput{
cluster: cluster,
ref: desiredMD.Object.Spec.Template.Spec.Bootstrap.ConfigRef,
current: currentMD.BootstrapTemplate,
desired: desiredMD.BootstrapTemplate,
templateNamePrefix: bootstrapTemplateNamePrefix(clusterName, mdTopologyName),
templateNamePrefix: bootstrapTemplateNamePrefix(cluster.Name, mdTopologyName),
compatibilityChecker: check.ObjectsAreInTheSameNamespace,
}); err != nil {
return errors.Wrapf(err, "failed to update %s", tlog.KObj{Obj: currentMD.Object})
Expand Down Expand Up @@ -344,19 +373,32 @@ func (r *ClusterReconciler) updateMachineDeployment(ctx context.Context, cluster
if err := patchHelper.Patch(ctx); err != nil {
return errors.Wrapf(err, "failed to patch %s", tlog.KObj{Obj: currentMD.Object})
}
r.recorder.Eventf(cluster, corev1.EventTypeNormal, updateEventReason, "Updated %q%s", tlog.KObj{Obj: currentMD.Object}, logMachineDeploymentVersionChange(currentMD.Object, desiredMD.Object))

// We want to call both cleanup functions even if one of them fails to clean up as much as possible.
return nil
}

func logMachineDeploymentVersionChange(current, desired *clusterv1.MachineDeployment) string {
if current.Spec.Template.Spec.Version == nil || desired.Spec.Template.Spec.Version == nil {
return ""
}

if *current.Spec.Template.Spec.Version != *desired.Spec.Template.Spec.Version {
return fmt.Sprintf(" with version change from %s to %s", *current.Spec.Template.Spec.Version, *desired.Spec.Template.Spec.Version)
}
return ""
}

// deleteMachineDeployment deletes a MachineDeployment.
func (r *ClusterReconciler) deleteMachineDeployment(ctx context.Context, md *scope.MachineDeploymentState) error {
func (r *ClusterReconciler) deleteMachineDeployment(ctx context.Context, cluster *clusterv1.Cluster, md *scope.MachineDeploymentState) error {
log := tlog.LoggerFrom(ctx).WithMachineDeployment(md.Object).WithObject(md.Object)

log.Infof("Deleting %s", tlog.KObj{Obj: md.Object})
if err := r.Client.Delete(ctx, md.Object); err != nil && !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "failed to delete %s", tlog.KObj{Obj: md.Object})
}
r.recorder.Eventf(cluster, corev1.EventTypeNormal, deleteEventReason, "Deleted %q", tlog.KObj{Obj: md.Object})
return nil
}

Expand Down Expand Up @@ -386,44 +428,77 @@ func calculateMachineDeploymentDiff(current, desired map[string]*scope.MachineDe
return diff
}

type unstructuredVersionGetter func(obj *unstructured.Unstructured) (*string, error)

type reconcileReferencedObjectInput struct {
cluster *clusterv1.Cluster
current *unstructured.Unstructured
desired *unstructured.Unstructured
versionGetter unstructuredVersionGetter
opts []mergepatch.HelperOption
}

// reconcileReferencedObject reconciles the desired state of the referenced object.
// NOTE: After a referenced object is created it is assumed that the reference should
// never change (only the content of the object can eventually change). Thus, we are checking for strict compatibility.
func (r *ClusterReconciler) reconcileReferencedObject(ctx context.Context, current, desired *unstructured.Unstructured, opts ...mergepatch.HelperOption) error {
func (r *ClusterReconciler) reconcileReferencedObject(ctx context.Context, in reconcileReferencedObjectInput) error {
log := tlog.LoggerFrom(ctx)

// If there is no current object, create it.
if current == nil {
log.Infof("Creating %s", tlog.KObj{Obj: desired})
if err := r.Client.Create(ctx, desired.DeepCopy()); err != nil {
return errors.Wrapf(err, "failed to create %s", tlog.KObj{Obj: desired})
if in.current == nil {
log.Infof("Creating %s", tlog.KObj{Obj: in.desired})
if err := r.Client.Create(ctx, in.desired.DeepCopy()); err != nil {
return errors.Wrapf(err, "failed to create %s", tlog.KObj{Obj: in.desired})
}
r.recorder.Eventf(in.cluster, corev1.EventTypeNormal, createEventReason, "Created %q", tlog.KObj{Obj: in.desired})
return nil
}

// Check if the current and desired referenced object are compatible.
if allErrs := check.ObjectsAreStrictlyCompatible(current, desired); len(allErrs) > 0 {
if allErrs := check.ObjectsAreStrictlyCompatible(in.current, in.desired); len(allErrs) > 0 {
return allErrs.ToAggregate()
}

// Check differences between current and desired state, and eventually patch the current object.
patchHelper, err := mergepatch.NewHelper(current, desired, r.Client, opts...)
patchHelper, err := mergepatch.NewHelper(in.current, in.desired, r.Client, in.opts...)
if err != nil {
return errors.Wrapf(err, "failed to create patch helper for %s", tlog.KObj{Obj: current})
return errors.Wrapf(err, "failed to create patch helper for %s", tlog.KObj{Obj: in.current})
}
if !patchHelper.HasChanges() {
log.V(3).Infof("No changes for %s", tlog.KObj{Obj: desired})
log.V(3).Infof("No changes for %s", tlog.KObj{Obj: in.desired})
return nil
}

log.Infof("Patching %s", tlog.KObj{Obj: desired})
log.Infof("Patching %s", tlog.KObj{Obj: in.desired})
if err := patchHelper.Patch(ctx); err != nil {
return errors.Wrapf(err, "failed to patch %s", tlog.KObj{Obj: current})
return errors.Wrapf(err, "failed to patch %s", tlog.KObj{Obj: in.current})
}
r.recorder.Eventf(in.cluster, corev1.EventTypeNormal, updateEventReason, "Updated %q%s", tlog.KObj{Obj: in.desired}, logUnstructuredVersionChange(in.current, in.desired, in.versionGetter))
return nil
}

func logUnstructuredVersionChange(current, desired *unstructured.Unstructured, versionGetter unstructuredVersionGetter) string {
if versionGetter == nil {
return ""
}

currentVersion, err := versionGetter(current)
if err != nil || currentVersion == nil {
return ""
}
desiredVersion, err := versionGetter(desired)
if err != nil || desiredVersion == nil {
return ""
}

if *currentVersion != *desiredVersion {
return fmt.Sprintf(" with version change from %s to %s", *currentVersion, *desiredVersion)
}
return ""
}

type reconcileReferencedTemplateInput struct {
cluster *clusterv1.Cluster
ref *corev1.ObjectReference
current *unstructured.Unstructured
desired *unstructured.Unstructured
Expand All @@ -448,6 +523,7 @@ func (r *ClusterReconciler) reconcileReferencedTemplate(ctx context.Context, in
if err := r.Client.Create(ctx, in.desired.DeepCopy()); err != nil {
return errors.Wrapf(err, "failed to create %s", tlog.KObj{Obj: in.desired})
}
r.recorder.Eventf(in.cluster, corev1.EventTypeNormal, createEventReason, "Created %q", tlog.KObj{Obj: in.desired})
return nil
}

Expand Down Expand Up @@ -479,6 +555,7 @@ func (r *ClusterReconciler) reconcileReferencedTemplate(ctx context.Context, in
if err := patchHelper.Patch(ctx); err != nil {
return errors.Wrapf(err, "failed to patch %s", tlog.KObj{Obj: in.desired})
}
r.recorder.Eventf(in.cluster, corev1.EventTypeNormal, updateEventReason, "Updated %q (metadata changes)", tlog.KObj{Obj: in.desired})
return nil
}

Expand All @@ -494,6 +571,7 @@ func (r *ClusterReconciler) reconcileReferencedTemplate(ctx context.Context, in
if err := r.Client.Create(ctx, in.desired.DeepCopy()); err != nil {
return errors.Wrapf(err, "failed to create %s", tlog.KObj{Obj: in.desired})
}
r.recorder.Eventf(in.cluster, corev1.EventTypeNormal, createEventReason, "Created %q as a replacement for %q (template rotation)", tlog.KObj{Obj: in.desired}, in.ref.Name)

// Update the reference with the new name.
// NOTE: Updating the object hosting reference to the template is executed outside this func.
Expand Down
Loading

0 comments on commit 0e15762

Please sign in to comment.