Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🌱 Generate events for topology changes #5631

Merged
merged 1 commit into from
Dec 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion controllers/topology/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1"
"sigs.k8s.io/cluster-api/api/v1beta1/index"
"sigs.k8s.io/cluster-api/controllers/external"
Expand Down Expand Up @@ -61,6 +62,7 @@ type ClusterReconciler struct {
UnstructuredCachingClient client.Client

externalTracker external.ObjectTracker
recorder record.EventRecorder

// patchEngine is used to apply patches during computeDesiredState.
patchEngine patches.Engine
Expand Down Expand Up @@ -95,7 +97,7 @@ func (r *ClusterReconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manag
Controller: c,
}
r.patchEngine = patches.NewEngine()

r.recorder = mgr.GetEventRecorderFor("topology/cluster")
return nil
}

Expand All @@ -115,6 +117,7 @@ func (r *ClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (_
// Error reading the object - requeue the request.
return ctrl.Result{}, err
}
cluster.Kind = "Cluster"
fabriziopandini marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

curious why is this needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we actually use the scheme here to set the GVK?


// Return early, if the Cluster does not use a managed topology.
// NOTE: We're already filtering events, but this is a safeguard for cases like e.g. when
Expand Down
3 changes: 2 additions & 1 deletion controllers/topology/internal/mergepatch/mergepatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,5 +302,6 @@ func (h *Helper) Patch(ctx context.Context) error {
log := ctrl.LoggerFrom(ctx)
log.V(5).Info("Patching object", "Patch", string(h.patch))

return h.client.Patch(ctx, h.original, client.RawPatch(types.MergePatchType, h.patch))
// Note: deepcopy before patching in order to avoid modifications to the original object.
return h.client.Patch(ctx, h.original.DeepCopyObject().(client.Object), client.RawPatch(types.MergePatchType, h.patch))
fabriziopandini marked this conversation as resolved.
Show resolved Hide resolved
}
134 changes: 106 additions & 28 deletions controllers/topology/reconcile_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
createEventReason = "TopologyCreate"
updateEventReason = "TopologyUpdate"
deleteEventReason = "TopologyDelete"
)

// reconcileState reconciles the current and desired state of the managed Cluster topology.
// NOTE: We are assuming all the required objects are provided as input; also, in case of any error,
// the entire reconcile operation will fail. This might be improved in the future if support for reconciling
Expand Down Expand Up @@ -157,7 +163,14 @@ func hasOwnerReferenceFrom(obj, owner client.Object) bool {
// reconcileInfrastructureCluster reconciles the desired state of the InfrastructureCluster object.
func (r *ClusterReconciler) reconcileInfrastructureCluster(ctx context.Context, s *scope.Scope) error {
ctx, _ = tlog.LoggerFrom(ctx).WithObject(s.Desired.InfrastructureCluster).Into(ctx)
return r.reconcileReferencedObject(ctx, s.Current.InfrastructureCluster, s.Desired.InfrastructureCluster, mergepatch.IgnorePaths(contract.InfrastructureCluster().IgnorePaths()))
return r.reconcileReferencedObject(ctx, reconcileReferencedObjectInput{
cluster: s.Current.Cluster,
current: s.Current.InfrastructureCluster,
desired: s.Desired.InfrastructureCluster,
opts: []mergepatch.HelperOption{
mergepatch.IgnorePaths(contract.InfrastructureCluster().IgnorePaths()),
},
})
}

// reconcileControlPlane works to bring the current state of a managed topology in line with the desired state. This involves
Expand All @@ -174,6 +187,7 @@ func (r *ClusterReconciler) reconcileControlPlane(ctx context.Context, s *scope.

// Create or update the MachineInfrastructureTemplate of the control plane.
err = r.reconcileReferencedTemplate(ctx, reconcileReferencedTemplateInput{
cluster: s.Current.Cluster,
ref: cpInfraRef,
current: s.Current.ControlPlane.InfrastructureMachineTemplate,
desired: s.Desired.ControlPlane.InfrastructureMachineTemplate,
Expand All @@ -194,14 +208,22 @@ func (r *ClusterReconciler) reconcileControlPlane(ctx context.Context, s *scope.

// Create or update the ControlPlaneObject for the ControlPlaneState.
ctx, _ = tlog.LoggerFrom(ctx).WithObject(s.Desired.ControlPlane.Object).Into(ctx)
if err := r.reconcileReferencedObject(ctx, s.Current.ControlPlane.Object, s.Desired.ControlPlane.Object, mergepatch.AuthoritativePaths{
// Note: we want to be authoritative WRT machine's metadata labels and annotations.
// This has the nice benefit that it greatly simplify the UX around ControlPlaneClass.Metadata and
// ControlPlaneTopology.Metadata, given that changes are reflected into generated objects without
// accounting for instance specific changes like we do for other maps into spec.
// Note: nested metadata have only labels and annotations, so it is possible to override the entire
// parent struct.
contract.ControlPlane().MachineTemplate().Metadata().Path(),
if err := r.reconcileReferencedObject(ctx, reconcileReferencedObjectInput{
cluster: s.Current.Cluster,
current: s.Current.ControlPlane.Object,
desired: s.Desired.ControlPlane.Object,
versionGetter: contract.ControlPlane().Version().Get,
opts: []mergepatch.HelperOption{
mergepatch.AuthoritativePaths{
// Note: we want to be authoritative WRT machine's metadata labels and annotations.
// This has the nice benefit that it greatly simplify the UX around ControlPlaneClass.Metadata and
// ControlPlaneTopology.Metadata, given that changes are reflected into generated objects without
// accounting for instance specific changes like we do for other maps into spec.
// Note: nested metadata have only labels and annotations, so it is possible to override the entire
// parent struct.
contract.ControlPlane().MachineTemplate().Metadata().Path(),
},
},
}); err != nil {
return errors.Wrapf(err, "failed to update %s", tlog.KObj{Obj: s.Desired.ControlPlane.Object})
}
Expand Down Expand Up @@ -230,6 +252,7 @@ func (r *ClusterReconciler) reconcileCluster(ctx context.Context, s *scope.Scope
if err := patchHelper.Patch(ctx); err != nil {
return errors.Wrapf(err, "failed to patch %s", tlog.KObj{Obj: s.Current.Cluster})
}
r.recorder.Eventf(s.Current.Cluster, corev1.EventTypeNormal, updateEventReason, "Updated %q", tlog.KObj{Obj: s.Current.Cluster})
return nil
}

Expand All @@ -240,7 +263,7 @@ func (r *ClusterReconciler) reconcileMachineDeployments(ctx context.Context, s *
// Create MachineDeployments.
for _, mdTopologyName := range diff.toCreate {
md := s.Desired.MachineDeployments[mdTopologyName]
if err := r.createMachineDeployment(ctx, md); err != nil {
if err := r.createMachineDeployment(ctx, s.Current.Cluster, md); err != nil {
return err
}
}
Expand All @@ -249,15 +272,15 @@ func (r *ClusterReconciler) reconcileMachineDeployments(ctx context.Context, s *
for _, mdTopologyName := range diff.toUpdate {
currentMD := s.Current.MachineDeployments[mdTopologyName]
desiredMD := s.Desired.MachineDeployments[mdTopologyName]
if err := r.updateMachineDeployment(ctx, s.Current.Cluster.Name, mdTopologyName, currentMD, desiredMD); err != nil {
if err := r.updateMachineDeployment(ctx, s.Current.Cluster, mdTopologyName, currentMD, desiredMD); err != nil {
return err
}
}

// Delete MachineDeployments.
for _, mdTopologyName := range diff.toDelete {
md := s.Current.MachineDeployments[mdTopologyName]
if err := r.deleteMachineDeployment(ctx, md); err != nil {
if err := r.deleteMachineDeployment(ctx, s.Current.Cluster, md); err != nil {
return err
}
}
Expand All @@ -266,18 +289,20 @@ func (r *ClusterReconciler) reconcileMachineDeployments(ctx context.Context, s *
}

// createMachineDeployment creates a MachineDeployment and the corresponding Templates.
func (r *ClusterReconciler) createMachineDeployment(ctx context.Context, md *scope.MachineDeploymentState) error {
func (r *ClusterReconciler) createMachineDeployment(ctx context.Context, cluster *clusterv1.Cluster, md *scope.MachineDeploymentState) error {
log := tlog.LoggerFrom(ctx).WithMachineDeployment(md.Object)

infraCtx, _ := log.WithObject(md.InfrastructureMachineTemplate).Into(ctx)
if err := r.reconcileReferencedTemplate(infraCtx, reconcileReferencedTemplateInput{
cluster: cluster,
desired: md.InfrastructureMachineTemplate,
}); err != nil {
return errors.Wrapf(err, "failed to create %s", tlog.KObj{Obj: md.Object})
}

bootstrapCtx, _ := log.WithObject(md.BootstrapTemplate).Into(ctx)
if err := r.reconcileReferencedTemplate(bootstrapCtx, reconcileReferencedTemplateInput{
cluster: cluster,
desired: md.BootstrapTemplate,
}); err != nil {
return errors.Wrapf(err, "failed to create %s", tlog.KObj{Obj: md.Object})
Expand All @@ -288,30 +313,34 @@ func (r *ClusterReconciler) createMachineDeployment(ctx context.Context, md *sco
if err := r.Client.Create(ctx, md.Object.DeepCopy()); err != nil {
return errors.Wrapf(err, "failed to create %s", tlog.KObj{Obj: md.Object})
}
r.recorder.Eventf(cluster, corev1.EventTypeNormal, createEventReason, "Created %q", tlog.KObj{Obj: md.Object})

return nil
}

// updateMachineDeployment updates a MachineDeployment. Also rotates the corresponding Templates if necessary.
func (r *ClusterReconciler) updateMachineDeployment(ctx context.Context, clusterName string, mdTopologyName string, currentMD, desiredMD *scope.MachineDeploymentState) error {
func (r *ClusterReconciler) updateMachineDeployment(ctx context.Context, cluster *clusterv1.Cluster, mdTopologyName string, currentMD, desiredMD *scope.MachineDeploymentState) error {
log := tlog.LoggerFrom(ctx).WithMachineDeployment(desiredMD.Object)

infraCtx, _ := log.WithObject(desiredMD.InfrastructureMachineTemplate).Into(ctx)
if err := r.reconcileReferencedTemplate(infraCtx, reconcileReferencedTemplateInput{
cluster: cluster,
ref: &desiredMD.Object.Spec.Template.Spec.InfrastructureRef,
current: currentMD.InfrastructureMachineTemplate,
desired: desiredMD.InfrastructureMachineTemplate,
templateNamePrefix: infrastructureMachineTemplateNamePrefix(clusterName, mdTopologyName),
templateNamePrefix: infrastructureMachineTemplateNamePrefix(cluster.Name, mdTopologyName),
compatibilityChecker: check.ObjectsAreCompatible,
}); err != nil {
return errors.Wrapf(err, "failed to update %s", tlog.KObj{Obj: currentMD.Object})
}

bootstrapCtx, _ := log.WithObject(desiredMD.BootstrapTemplate).Into(ctx)
if err := r.reconcileReferencedTemplate(bootstrapCtx, reconcileReferencedTemplateInput{
cluster: cluster,
ref: desiredMD.Object.Spec.Template.Spec.Bootstrap.ConfigRef,
current: currentMD.BootstrapTemplate,
desired: desiredMD.BootstrapTemplate,
templateNamePrefix: bootstrapTemplateNamePrefix(clusterName, mdTopologyName),
templateNamePrefix: bootstrapTemplateNamePrefix(cluster.Name, mdTopologyName),
compatibilityChecker: check.ObjectsAreInTheSameNamespace,
}); err != nil {
return errors.Wrapf(err, "failed to update %s", tlog.KObj{Obj: currentMD.Object})
Expand Down Expand Up @@ -344,19 +373,32 @@ func (r *ClusterReconciler) updateMachineDeployment(ctx context.Context, cluster
if err := patchHelper.Patch(ctx); err != nil {
return errors.Wrapf(err, "failed to patch %s", tlog.KObj{Obj: currentMD.Object})
}
r.recorder.Eventf(cluster, corev1.EventTypeNormal, updateEventReason, "Updated %q%s", tlog.KObj{Obj: currentMD.Object}, logMachineDeploymentVersionChange(currentMD.Object, desiredMD.Object))

// We want to call both cleanup functions even if one of them fails to clean up as much as possible.
return nil
}

func logMachineDeploymentVersionChange(current, desired *clusterv1.MachineDeployment) string {
if current.Spec.Template.Spec.Version == nil || desired.Spec.Template.Spec.Version == nil {
return ""
}

if *current.Spec.Template.Spec.Version != *desired.Spec.Template.Spec.Version {
return fmt.Sprintf(" with version change from %s to %s", *current.Spec.Template.Spec.Version, *desired.Spec.Template.Spec.Version)
}
return ""
}

// deleteMachineDeployment deletes a MachineDeployment.
func (r *ClusterReconciler) deleteMachineDeployment(ctx context.Context, md *scope.MachineDeploymentState) error {
func (r *ClusterReconciler) deleteMachineDeployment(ctx context.Context, cluster *clusterv1.Cluster, md *scope.MachineDeploymentState) error {
log := tlog.LoggerFrom(ctx).WithMachineDeployment(md.Object).WithObject(md.Object)

log.Infof("Deleting %s", tlog.KObj{Obj: md.Object})
if err := r.Client.Delete(ctx, md.Object); err != nil && !apierrors.IsNotFound(err) {
return errors.Wrapf(err, "failed to delete %s", tlog.KObj{Obj: md.Object})
}
r.recorder.Eventf(cluster, corev1.EventTypeNormal, deleteEventReason, "Deleted %q", tlog.KObj{Obj: md.Object})
return nil
}

Expand Down Expand Up @@ -386,44 +428,77 @@ func calculateMachineDeploymentDiff(current, desired map[string]*scope.MachineDe
return diff
}

type unstructuredVersionGetter func(obj *unstructured.Unstructured) (*string, error)

type reconcileReferencedObjectInput struct {
cluster *clusterv1.Cluster
current *unstructured.Unstructured
desired *unstructured.Unstructured
versionGetter unstructuredVersionGetter
opts []mergepatch.HelperOption
}

// reconcileReferencedObject reconciles the desired state of the referenced object.
// NOTE: After a referenced object is created it is assumed that the reference should
// never change (only the content of the object can eventually change). Thus, we are checking for strict compatibility.
func (r *ClusterReconciler) reconcileReferencedObject(ctx context.Context, current, desired *unstructured.Unstructured, opts ...mergepatch.HelperOption) error {
func (r *ClusterReconciler) reconcileReferencedObject(ctx context.Context, in reconcileReferencedObjectInput) error {
log := tlog.LoggerFrom(ctx)

// If there is no current object, create it.
if current == nil {
log.Infof("Creating %s", tlog.KObj{Obj: desired})
if err := r.Client.Create(ctx, desired.DeepCopy()); err != nil {
return errors.Wrapf(err, "failed to create %s", tlog.KObj{Obj: desired})
if in.current == nil {
log.Infof("Creating %s", tlog.KObj{Obj: in.desired})
if err := r.Client.Create(ctx, in.desired.DeepCopy()); err != nil {
return errors.Wrapf(err, "failed to create %s", tlog.KObj{Obj: in.desired})
}
r.recorder.Eventf(in.cluster, corev1.EventTypeNormal, createEventReason, "Created %q", tlog.KObj{Obj: in.desired})
return nil
}

// Check if the current and desired referenced object are compatible.
if allErrs := check.ObjectsAreStrictlyCompatible(current, desired); len(allErrs) > 0 {
if allErrs := check.ObjectsAreStrictlyCompatible(in.current, in.desired); len(allErrs) > 0 {
return allErrs.ToAggregate()
}

// Check differences between current and desired state, and eventually patch the current object.
patchHelper, err := mergepatch.NewHelper(current, desired, r.Client, opts...)
patchHelper, err := mergepatch.NewHelper(in.current, in.desired, r.Client, in.opts...)
if err != nil {
return errors.Wrapf(err, "failed to create patch helper for %s", tlog.KObj{Obj: current})
return errors.Wrapf(err, "failed to create patch helper for %s", tlog.KObj{Obj: in.current})
}
if !patchHelper.HasChanges() {
log.V(3).Infof("No changes for %s", tlog.KObj{Obj: desired})
log.V(3).Infof("No changes for %s", tlog.KObj{Obj: in.desired})
return nil
}

log.Infof("Patching %s", tlog.KObj{Obj: desired})
log.Infof("Patching %s", tlog.KObj{Obj: in.desired})
if err := patchHelper.Patch(ctx); err != nil {
return errors.Wrapf(err, "failed to patch %s", tlog.KObj{Obj: current})
return errors.Wrapf(err, "failed to patch %s", tlog.KObj{Obj: in.current})
}
r.recorder.Eventf(in.cluster, corev1.EventTypeNormal, updateEventReason, "Updated %q%s", tlog.KObj{Obj: in.desired}, logUnstructuredVersionChange(in.current, in.desired, in.versionGetter))
return nil
}

func logUnstructuredVersionChange(current, desired *unstructured.Unstructured, versionGetter unstructuredVersionGetter) string {
if versionGetter == nil {
return ""
}

currentVersion, err := versionGetter(current)
if err != nil || currentVersion == nil {
return ""
}
desiredVersion, err := versionGetter(desired)
if err != nil || desiredVersion == nil {
return ""
}

if *currentVersion != *desiredVersion {
return fmt.Sprintf(" with version change from %s to %s", *currentVersion, *desiredVersion)
}
return ""
}

type reconcileReferencedTemplateInput struct {
cluster *clusterv1.Cluster
ref *corev1.ObjectReference
current *unstructured.Unstructured
desired *unstructured.Unstructured
Expand All @@ -448,6 +523,7 @@ func (r *ClusterReconciler) reconcileReferencedTemplate(ctx context.Context, in
if err := r.Client.Create(ctx, in.desired.DeepCopy()); err != nil {
return errors.Wrapf(err, "failed to create %s", tlog.KObj{Obj: in.desired})
}
r.recorder.Eventf(in.cluster, corev1.EventTypeNormal, createEventReason, "Created %q", tlog.KObj{Obj: in.desired})
return nil
}

Expand Down Expand Up @@ -479,6 +555,7 @@ func (r *ClusterReconciler) reconcileReferencedTemplate(ctx context.Context, in
if err := patchHelper.Patch(ctx); err != nil {
return errors.Wrapf(err, "failed to patch %s", tlog.KObj{Obj: in.desired})
}
r.recorder.Eventf(in.cluster, corev1.EventTypeNormal, updateEventReason, "Updated %q (metadata changes)", tlog.KObj{Obj: in.desired})
return nil
}

Expand All @@ -494,6 +571,7 @@ func (r *ClusterReconciler) reconcileReferencedTemplate(ctx context.Context, in
if err := r.Client.Create(ctx, in.desired.DeepCopy()); err != nil {
return errors.Wrapf(err, "failed to create %s", tlog.KObj{Obj: in.desired})
}
r.recorder.Eventf(in.cluster, corev1.EventTypeNormal, createEventReason, "Created %q as a replacement for %q (template rotation)", tlog.KObj{Obj: in.desired}, in.ref.Name)

// Update the reference with the new name.
// NOTE: Updating the object hosting reference to the template is executed outside this func.
Expand Down
Loading