Skip to content

Commit

Permalink
refactor: optimizing processing of tenant resources per namespace
Browse files Browse the repository at this point in the history
  • Loading branch information
prometherion committed Feb 16, 2023
1 parent d63a9a0 commit 4e5c00f
Showing 1 changed file with 95 additions and 114 deletions.
209 changes: 95 additions & 114 deletions controllers/resources/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,153 +130,134 @@ func (r *Processor) HandleSection(ctx context.Context, tnt capsulev1beta2.Tenant

syncErr := new(multierror.Error)

for nsIndex, item := range spec.NamespacedItems {
keysAndValues := []interface{}{"index", nsIndex, "namespace", item.Namespace}
// A TenantResource is created by a TenantOwner, and potentially, they could point to a resource in a non-owned
// Namespace: this must be blocked by checking it this is the case.
if !allowCrossNamespaceSelection && !tntNamespaces.Has(item.Namespace) {
log.Info("skipping processing of namespacedItem, referring a Namespace that is not part of the given Global", keysAndValues...)
codecFactory := serializer.NewCodecFactory(r.client.Scheme())

continue
}
// Namespaced Items are relying on selecting resources, rather than specifying a specific name:
// creating it to get used by the client List action.
itemSelector, selectorErr := metav1.LabelSelectorAsSelector(&item.Selector)
if err != nil {
log.Error(selectorErr, "cannot create Selector for namespacedItem", keysAndValues...)
for _, ns := range namespaces.Items {
for nsIndex, item := range spec.NamespacedItems {
keysAndValues := []interface{}{"index", nsIndex, "namespace", item.Namespace}
// A TenantResource is created by a TenantOwner, and potentially, they could point to a resource in a non-owned
// Namespace: this must be blocked by checking it this is the case.
if !allowCrossNamespaceSelection && !tntNamespaces.Has(item.Namespace) {
log.Info("skipping processing of namespacedItem, referring a Namespace that is not part of the given Tenant", keysAndValues...)

continue
}
continue
}
// Namespaced Items are relying on selecting resources, rather than specifying a specific name:
// creating it to get used by the client List action.
itemSelector, selectorErr := metav1.LabelSelectorAsSelector(&item.Selector)
if selectorErr != nil {
log.Error(selectorErr, "cannot create Selector for namespacedItem", keysAndValues...)

objs := unstructured.UnstructuredList{}
objs.SetGroupVersionKind(schema.FromAPIVersionAndKind(item.APIVersion, fmt.Sprintf("%sList", item.Kind)))
syncErr = multierror.Append(syncErr, selectorErr)

if clientErr := r.client.List(ctx, &objs, client.InNamespace(item.Namespace), client.MatchingLabelsSelector{Selector: itemSelector}); clientErr != nil {
log.Error(clientErr, "cannot retrieve object for namespacedItem", keysAndValues...)
continue
}

syncErr = multierror.Append(syncErr, clientErr)
objs := unstructured.UnstructuredList{}
objs.SetGroupVersionKind(schema.FromAPIVersionAndKind(item.APIVersion, fmt.Sprintf("%sList", item.Kind)))

continue
}
if clientErr := r.client.List(ctx, &objs, client.InNamespace(item.Namespace), client.MatchingLabelsSelector{Selector: itemSelector}); clientErr != nil {
log.Error(clientErr, "cannot retrieve object for namespacedItem", keysAndValues...)

multiErr := new(multierror.Group)
// Iterating over all the retrieved objects from the resource spec to get replicated in all the selected Namespaces:
// in case of error during the create or update function, this will be appended to the list of errors.
for _, o := range objs.Items {
obj := o
syncErr = multierror.Append(syncErr, clientErr)

multiErr.Go(func() error {
nsItems, nsErr := r.createOrUpdate(ctx, &obj, objLabels, objAnnotations, namespaces)
if nsErr != nil {
log.Error(err, "unable to sync namespacedItems", keysAndValues...)
continue
}

return nsErr
}
multiErr := new(multierror.Group)
// Iterating over all the retrieved objects from the resource spec to get replicated in all the selected Namespaces:
// in case of error during the create or update function, this will be appended to the list of errors.
for _, o := range objs.Items {
obj := o
obj.SetNamespace(ns.Name)

processed.Insert(nsItems...)
multiErr.Go(func() error {
kv := keysAndValues
kv = append(kv, []interface{}{"resource", fmt.Sprintf("%s/%s", obj.GetNamespace(), obj.GetNamespace())})

return nil
})
}
if opErr := r.createOrUpdate(ctx, &obj, objLabels, objAnnotations); opErr != nil {
log.Error(opErr, "unable to sync namespacedItems", kv...)

if objsErr := multiErr.Wait(); objsErr != nil {
syncErr = multierror.Append(syncErr, objsErr)
}
}
return opErr
}

codecFactory := serializer.NewCodecFactory(r.client.Scheme())
log.Info("resource has been replicated", kv...)

for rawIndex, item := range spec.RawItems {
obj, keysAndValues := unstructured.Unstructured{}, []interface{}{"index", rawIndex}
replicatedItem := &capsulev1beta2.ObjectReferenceStatus{}
replicatedItem.Name = obj.GetName()
replicatedItem.Kind = obj.GetKind()
replicatedItem.Namespace = ns.Name
replicatedItem.APIVersion = obj.GetAPIVersion()

if _, _, decodeErr := codecFactory.UniversalDeserializer().Decode(item.Raw, nil, &obj); decodeErr != nil {
log.Error(decodeErr, "unable to deserialize rawItem", keysAndValues...)
processed.Insert(replicatedItem.String())

syncErr = multierror.Append(syncErr, decodeErr)
return nil
})
}

continue
if objsErr := multiErr.Wait(); objsErr != nil {
syncErr = multierror.Append(syncErr, objsErr)
}
}

syncedRaw, rawErr := r.createOrUpdate(ctx, &obj, objLabels, objAnnotations, namespaces)
if rawErr != nil {
log.Info("unable to sync rawItem", keysAndValues...)
// In case of error processing an item in one of any selected Namespaces, storing it to report it lately
// to the upper call to ensure a partial sync that will be fixed by a subsequent reconciliation.
syncErr = multierror.Append(syncErr, rawErr)
} else {
processed.Insert(syncedRaw...)
}
}
for rawIndex, item := range spec.RawItems {
obj, keysAndValues := unstructured.Unstructured{}, []interface{}{"index", rawIndex}

return processed.List(), syncErr.ErrorOrNil()
}

// createOrUpdate replicates the provided unstructured object to all the provided Namespaces:
// this function mimics the CreateOrUpdate, by retrieving the object to understand if it must be created or updated,
// along adding the additional metadata, if required.
func (r *Processor) createOrUpdate(ctx context.Context, obj *unstructured.Unstructured, labels map[string]string, annotations map[string]string, namespaces corev1.NamespaceList) ([]string, error) {
log := ctrllog.FromContext(ctx)
if _, _, decodeErr := codecFactory.UniversalDeserializer().Decode(item.Raw, nil, &obj); decodeErr != nil {
log.Error(decodeErr, "unable to deserialize rawItem", keysAndValues...)

errGroup := new(multierror.Group)
syncErr = multierror.Append(syncErr, decodeErr)

var items []string

for _, item := range namespaces.Items {
ns := item.GetName()

errGroup.Go(func() (err error) {
actual, desired := &unstructured.Unstructured{}, obj.DeepCopy()

actual.SetAPIVersion(desired.GetAPIVersion())
actual.SetKind(desired.GetKind())
actual.SetNamespace(desired.GetNamespace())
actual.SetName(desired.GetName())
// Using a deferred function to properly log the results, and adding the item to the processed set.
defer func() {
keysAndValues := []interface{}{"resource", fmt.Sprintf("%s/%s", ns, desired.GetName())}

if err != nil {
log.Error(err, "unable to replicate resource", keysAndValues...)
continue
}

return
}
obj.SetNamespace(ns.Name)

if rawErr := r.createOrUpdate(ctx, &obj, objLabels, objAnnotations); rawErr != nil {
log.Info("unable to sync rawItem", keysAndValues...)
// In case of error processing an item in one of any selected Namespaces, storing it to report it lately
// to the upper call to ensure a partial sync that will be fixed by a subsequent reconciliation.
syncErr = multierror.Append(syncErr, rawErr)
} else {
log.Info("resource has been replicated", keysAndValues...)

replicatedItem := &capsulev1beta2.ObjectReferenceStatus{
Name: obj.GetName(),
}
replicatedItem := &capsulev1beta2.ObjectReferenceStatus{}
replicatedItem.Name = obj.GetName()
replicatedItem.Kind = obj.GetKind()
replicatedItem.Namespace = ns
replicatedItem.Namespace = ns.Name
replicatedItem.APIVersion = obj.GetAPIVersion()

items = append(items, replicatedItem.String())
}()
processed.Insert(replicatedItem.String())
}
}
}

actual.SetNamespace(ns)
return processed.List(), syncErr.ErrorOrNil()
}

_, err = controllerutil.CreateOrUpdate(ctx, r.client, actual, func() error {
UID := actual.GetUID()
rv := actual.GetResourceVersion()
// createOrUpdate replicates the provided unstructured object to all the provided Namespaces:
// this function mimics the CreateOrUpdate, by retrieving the object to understand if it must be created or updated,
// along adding the additional metadata, if required.
func (r *Processor) createOrUpdate(ctx context.Context, obj *unstructured.Unstructured, labels map[string]string, annotations map[string]string) (err error) {
actual, desired := &unstructured.Unstructured{}, obj.DeepCopy()

actual.SetUnstructuredContent(desired.Object)
actual.SetNamespace(ns)
actual.SetLabels(labels)
actual.SetAnnotations(annotations)
actual.SetResourceVersion(rv)
actual.SetUID(UID)
actual.SetAPIVersion(desired.GetAPIVersion())
actual.SetKind(desired.GetKind())
actual.SetNamespace(desired.GetNamespace())
actual.SetName(desired.GetName())

return nil
})
_, err = controllerutil.CreateOrUpdate(ctx, r.client, actual, func() error {
UID := actual.GetUID()
rv := actual.GetResourceVersion()

return
})
}
// Wait returns *multierror.Error that implements stdlib error:
// the nil check must be performed down here rather than at the caller level to avoid wrong casting.
if err := errGroup.Wait(); err != nil {
return items, err
}
actual.SetUnstructuredContent(desired.Object)
actual.SetLabels(labels)
actual.SetAnnotations(annotations)
actual.SetResourceVersion(rv)
actual.SetUID(UID)

return nil
})

return items, nil
return err
}

0 comments on commit 4e5c00f

Please sign in to comment.