From 4e5c00fa658027860d5e1369db7fedc13f54fd2b Mon Sep 17 00:00:00 2001 From: Dario Tranchitella Date: Thu, 2 Feb 2023 19:56:52 +0100 Subject: [PATCH] refactor: optimizing processing of tenant resources per namespace --- controllers/resources/processor.go | 209 +++++++++++++---------------- 1 file changed, 95 insertions(+), 114 deletions(-) diff --git a/controllers/resources/processor.go b/controllers/resources/processor.go index bd86905f..1d7c4483 100644 --- a/controllers/resources/processor.go +++ b/controllers/resources/processor.go @@ -130,153 +130,134 @@ func (r *Processor) HandleSection(ctx context.Context, tnt capsulev1beta2.Tenant syncErr := new(multierror.Error) - for nsIndex, item := range spec.NamespacedItems { - keysAndValues := []interface{}{"index", nsIndex, "namespace", item.Namespace} - // A TenantResource is created by a TenantOwner, and potentially, they could point to a resource in a non-owned - // Namespace: this must be blocked by checking it this is the case. - if !allowCrossNamespaceSelection && !tntNamespaces.Has(item.Namespace) { - log.Info("skipping processing of namespacedItem, referring a Namespace that is not part of the given Global", keysAndValues...) + codecFactory := serializer.NewCodecFactory(r.client.Scheme()) - continue - } - // Namespaced Items are relying on selecting resources, rather than specifying a specific name: - // creating it to get used by the client List action. - itemSelector, selectorErr := metav1.LabelSelectorAsSelector(&item.Selector) - if err != nil { - log.Error(selectorErr, "cannot create Selector for namespacedItem", keysAndValues...) + for _, ns := range namespaces.Items { + for nsIndex, item := range spec.NamespacedItems { + keysAndValues := []interface{}{"index", nsIndex, "namespace", item.Namespace} + // A TenantResource is created by a TenantOwner, and potentially, they could point to a resource in a non-owned + // Namespace: this must be blocked by checking it this is the case. + if !allowCrossNamespaceSelection && !tntNamespaces.Has(item.Namespace) { + log.Info("skipping processing of namespacedItem, referring a Namespace that is not part of the given Tenant", keysAndValues...) - continue - } + continue + } + // Namespaced Items are relying on selecting resources, rather than specifying a specific name: + // creating it to get used by the client List action. + itemSelector, selectorErr := metav1.LabelSelectorAsSelector(&item.Selector) + if selectorErr != nil { + log.Error(selectorErr, "cannot create Selector for namespacedItem", keysAndValues...) - objs := unstructured.UnstructuredList{} - objs.SetGroupVersionKind(schema.FromAPIVersionAndKind(item.APIVersion, fmt.Sprintf("%sList", item.Kind))) + syncErr = multierror.Append(syncErr, selectorErr) - if clientErr := r.client.List(ctx, &objs, client.InNamespace(item.Namespace), client.MatchingLabelsSelector{Selector: itemSelector}); clientErr != nil { - log.Error(clientErr, "cannot retrieve object for namespacedItem", keysAndValues...) + continue + } - syncErr = multierror.Append(syncErr, clientErr) + objs := unstructured.UnstructuredList{} + objs.SetGroupVersionKind(schema.FromAPIVersionAndKind(item.APIVersion, fmt.Sprintf("%sList", item.Kind))) - continue - } + if clientErr := r.client.List(ctx, &objs, client.InNamespace(item.Namespace), client.MatchingLabelsSelector{Selector: itemSelector}); clientErr != nil { + log.Error(clientErr, "cannot retrieve object for namespacedItem", keysAndValues...) - multiErr := new(multierror.Group) - // Iterating over all the retrieved objects from the resource spec to get replicated in all the selected Namespaces: - // in case of error during the create or update function, this will be appended to the list of errors. - for _, o := range objs.Items { - obj := o + syncErr = multierror.Append(syncErr, clientErr) - multiErr.Go(func() error { - nsItems, nsErr := r.createOrUpdate(ctx, &obj, objLabels, objAnnotations, namespaces) - if nsErr != nil { - log.Error(err, "unable to sync namespacedItems", keysAndValues...) + continue + } - return nsErr - } + multiErr := new(multierror.Group) + // Iterating over all the retrieved objects from the resource spec to get replicated in all the selected Namespaces: + // in case of error during the create or update function, this will be appended to the list of errors. + for _, o := range objs.Items { + obj := o + obj.SetNamespace(ns.Name) - processed.Insert(nsItems...) + multiErr.Go(func() error { + kv := keysAndValues + kv = append(kv, []interface{}{"resource", fmt.Sprintf("%s/%s", obj.GetNamespace(), obj.GetNamespace())}) - return nil - }) - } + if opErr := r.createOrUpdate(ctx, &obj, objLabels, objAnnotations); opErr != nil { + log.Error(opErr, "unable to sync namespacedItems", kv...) - if objsErr := multiErr.Wait(); objsErr != nil { - syncErr = multierror.Append(syncErr, objsErr) - } - } + return opErr + } - codecFactory := serializer.NewCodecFactory(r.client.Scheme()) + log.Info("resource has been replicated", kv...) - for rawIndex, item := range spec.RawItems { - obj, keysAndValues := unstructured.Unstructured{}, []interface{}{"index", rawIndex} + replicatedItem := &capsulev1beta2.ObjectReferenceStatus{} + replicatedItem.Name = obj.GetName() + replicatedItem.Kind = obj.GetKind() + replicatedItem.Namespace = ns.Name + replicatedItem.APIVersion = obj.GetAPIVersion() - if _, _, decodeErr := codecFactory.UniversalDeserializer().Decode(item.Raw, nil, &obj); decodeErr != nil { - log.Error(decodeErr, "unable to deserialize rawItem", keysAndValues...) + processed.Insert(replicatedItem.String()) - syncErr = multierror.Append(syncErr, decodeErr) + return nil + }) + } - continue + if objsErr := multiErr.Wait(); objsErr != nil { + syncErr = multierror.Append(syncErr, objsErr) + } } - syncedRaw, rawErr := r.createOrUpdate(ctx, &obj, objLabels, objAnnotations, namespaces) - if rawErr != nil { - log.Info("unable to sync rawItem", keysAndValues...) - // In case of error processing an item in one of any selected Namespaces, storing it to report it lately - // to the upper call to ensure a partial sync that will be fixed by a subsequent reconciliation. - syncErr = multierror.Append(syncErr, rawErr) - } else { - processed.Insert(syncedRaw...) - } - } + for rawIndex, item := range spec.RawItems { + obj, keysAndValues := unstructured.Unstructured{}, []interface{}{"index", rawIndex} - return processed.List(), syncErr.ErrorOrNil() -} - -// createOrUpdate replicates the provided unstructured object to all the provided Namespaces: -// this function mimics the CreateOrUpdate, by retrieving the object to understand if it must be created or updated, -// along adding the additional metadata, if required. -func (r *Processor) createOrUpdate(ctx context.Context, obj *unstructured.Unstructured, labels map[string]string, annotations map[string]string, namespaces corev1.NamespaceList) ([]string, error) { - log := ctrllog.FromContext(ctx) + if _, _, decodeErr := codecFactory.UniversalDeserializer().Decode(item.Raw, nil, &obj); decodeErr != nil { + log.Error(decodeErr, "unable to deserialize rawItem", keysAndValues...) - errGroup := new(multierror.Group) + syncErr = multierror.Append(syncErr, decodeErr) - var items []string - - for _, item := range namespaces.Items { - ns := item.GetName() - - errGroup.Go(func() (err error) { - actual, desired := &unstructured.Unstructured{}, obj.DeepCopy() - - actual.SetAPIVersion(desired.GetAPIVersion()) - actual.SetKind(desired.GetKind()) - actual.SetNamespace(desired.GetNamespace()) - actual.SetName(desired.GetName()) - // Using a deferred function to properly log the results, and adding the item to the processed set. - defer func() { - keysAndValues := []interface{}{"resource", fmt.Sprintf("%s/%s", ns, desired.GetName())} - - if err != nil { - log.Error(err, "unable to replicate resource", keysAndValues...) + continue + } - return - } + obj.SetNamespace(ns.Name) + if rawErr := r.createOrUpdate(ctx, &obj, objLabels, objAnnotations); rawErr != nil { + log.Info("unable to sync rawItem", keysAndValues...) + // In case of error processing an item in one of any selected Namespaces, storing it to report it lately + // to the upper call to ensure a partial sync that will be fixed by a subsequent reconciliation. + syncErr = multierror.Append(syncErr, rawErr) + } else { log.Info("resource has been replicated", keysAndValues...) - replicatedItem := &capsulev1beta2.ObjectReferenceStatus{ - Name: obj.GetName(), - } + replicatedItem := &capsulev1beta2.ObjectReferenceStatus{} + replicatedItem.Name = obj.GetName() replicatedItem.Kind = obj.GetKind() - replicatedItem.Namespace = ns + replicatedItem.Namespace = ns.Name replicatedItem.APIVersion = obj.GetAPIVersion() - items = append(items, replicatedItem.String()) - }() + processed.Insert(replicatedItem.String()) + } + } + } - actual.SetNamespace(ns) + return processed.List(), syncErr.ErrorOrNil() +} - _, err = controllerutil.CreateOrUpdate(ctx, r.client, actual, func() error { - UID := actual.GetUID() - rv := actual.GetResourceVersion() +// createOrUpdate replicates the provided unstructured object to all the provided Namespaces: +// this function mimics the CreateOrUpdate, by retrieving the object to understand if it must be created or updated, +// along adding the additional metadata, if required. +func (r *Processor) createOrUpdate(ctx context.Context, obj *unstructured.Unstructured, labels map[string]string, annotations map[string]string) (err error) { + actual, desired := &unstructured.Unstructured{}, obj.DeepCopy() - actual.SetUnstructuredContent(desired.Object) - actual.SetNamespace(ns) - actual.SetLabels(labels) - actual.SetAnnotations(annotations) - actual.SetResourceVersion(rv) - actual.SetUID(UID) + actual.SetAPIVersion(desired.GetAPIVersion()) + actual.SetKind(desired.GetKind()) + actual.SetNamespace(desired.GetNamespace()) + actual.SetName(desired.GetName()) - return nil - }) + _, err = controllerutil.CreateOrUpdate(ctx, r.client, actual, func() error { + UID := actual.GetUID() + rv := actual.GetResourceVersion() - return - }) - } - // Wait returns *multierror.Error that implements stdlib error: - // the nil check must be performed down here rather than at the caller level to avoid wrong casting. - if err := errGroup.Wait(); err != nil { - return items, err - } + actual.SetUnstructuredContent(desired.Object) + actual.SetLabels(labels) + actual.SetAnnotations(annotations) + actual.SetResourceVersion(rv) + actual.SetUID(UID) + + return nil + }) - return items, nil + return err }