From 04e1675f73fcb70f65d559b1ec74e003f36868cf Mon Sep 17 00:00:00 2001 From: Dario Tranchitella Date: Thu, 2 Feb 2023 18:48:57 +0100 Subject: [PATCH 1/4] fix: creation of namespaced resources backed by cache --- controllers/resources/processor.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/controllers/resources/processor.go b/controllers/resources/processor.go index 6e381c7c..bd86905f 100644 --- a/controllers/resources/processor.go +++ b/controllers/resources/processor.go @@ -225,7 +225,12 @@ func (r *Processor) createOrUpdate(ctx context.Context, obj *unstructured.Unstru ns := item.GetName() errGroup.Go(func() (err error) { - actual, desired := obj.DeepCopy(), obj.DeepCopy() + actual, desired := &unstructured.Unstructured{}, obj.DeepCopy() + + actual.SetAPIVersion(desired.GetAPIVersion()) + actual.SetKind(desired.GetKind()) + actual.SetNamespace(desired.GetNamespace()) + actual.SetName(desired.GetName()) // Using a deferred function to properly log the results, and adding the item to the processed set. defer func() { keysAndValues := []interface{}{"resource", fmt.Sprintf("%s/%s", ns, desired.GetName())} From beb8a1e3ead586cf6bca2e230b7455170ee8da6f Mon Sep 17 00:00:00 2001 From: Dario Tranchitella Date: Thu, 2 Feb 2023 19:56:52 +0100 Subject: [PATCH 2/4] refactor: optimizing processing of tenant resources per namespace --- controllers/resources/processor.go | 209 +++++++++++++---------------- 1 file changed, 95 insertions(+), 114 deletions(-) diff --git a/controllers/resources/processor.go b/controllers/resources/processor.go index bd86905f..1d7c4483 100644 --- a/controllers/resources/processor.go +++ b/controllers/resources/processor.go @@ -130,153 +130,134 @@ func (r *Processor) HandleSection(ctx context.Context, tnt capsulev1beta2.Tenant syncErr := new(multierror.Error) - for nsIndex, item := range spec.NamespacedItems { - keysAndValues := []interface{}{"index", nsIndex, "namespace", item.Namespace} - // A TenantResource is created by a TenantOwner, and potentially, they could point to a resource in a non-owned - // Namespace: this must be blocked by checking it this is the case. - if !allowCrossNamespaceSelection && !tntNamespaces.Has(item.Namespace) { - log.Info("skipping processing of namespacedItem, referring a Namespace that is not part of the given Global", keysAndValues...) + codecFactory := serializer.NewCodecFactory(r.client.Scheme()) - continue - } - // Namespaced Items are relying on selecting resources, rather than specifying a specific name: - // creating it to get used by the client List action. - itemSelector, selectorErr := metav1.LabelSelectorAsSelector(&item.Selector) - if err != nil { - log.Error(selectorErr, "cannot create Selector for namespacedItem", keysAndValues...) + for _, ns := range namespaces.Items { + for nsIndex, item := range spec.NamespacedItems { + keysAndValues := []interface{}{"index", nsIndex, "namespace", item.Namespace} + // A TenantResource is created by a TenantOwner, and potentially, they could point to a resource in a non-owned + // Namespace: this must be blocked by checking it this is the case. + if !allowCrossNamespaceSelection && !tntNamespaces.Has(item.Namespace) { + log.Info("skipping processing of namespacedItem, referring a Namespace that is not part of the given Tenant", keysAndValues...) - continue - } + continue + } + // Namespaced Items are relying on selecting resources, rather than specifying a specific name: + // creating it to get used by the client List action. + itemSelector, selectorErr := metav1.LabelSelectorAsSelector(&item.Selector) + if selectorErr != nil { + log.Error(selectorErr, "cannot create Selector for namespacedItem", keysAndValues...) - objs := unstructured.UnstructuredList{} - objs.SetGroupVersionKind(schema.FromAPIVersionAndKind(item.APIVersion, fmt.Sprintf("%sList", item.Kind))) + syncErr = multierror.Append(syncErr, selectorErr) - if clientErr := r.client.List(ctx, &objs, client.InNamespace(item.Namespace), client.MatchingLabelsSelector{Selector: itemSelector}); clientErr != nil { - log.Error(clientErr, "cannot retrieve object for namespacedItem", keysAndValues...) + continue + } - syncErr = multierror.Append(syncErr, clientErr) + objs := unstructured.UnstructuredList{} + objs.SetGroupVersionKind(schema.FromAPIVersionAndKind(item.APIVersion, fmt.Sprintf("%sList", item.Kind))) - continue - } + if clientErr := r.client.List(ctx, &objs, client.InNamespace(item.Namespace), client.MatchingLabelsSelector{Selector: itemSelector}); clientErr != nil { + log.Error(clientErr, "cannot retrieve object for namespacedItem", keysAndValues...) - multiErr := new(multierror.Group) - // Iterating over all the retrieved objects from the resource spec to get replicated in all the selected Namespaces: - // in case of error during the create or update function, this will be appended to the list of errors. - for _, o := range objs.Items { - obj := o + syncErr = multierror.Append(syncErr, clientErr) - multiErr.Go(func() error { - nsItems, nsErr := r.createOrUpdate(ctx, &obj, objLabels, objAnnotations, namespaces) - if nsErr != nil { - log.Error(err, "unable to sync namespacedItems", keysAndValues...) + continue + } - return nsErr - } + multiErr := new(multierror.Group) + // Iterating over all the retrieved objects from the resource spec to get replicated in all the selected Namespaces: + // in case of error during the create or update function, this will be appended to the list of errors. + for _, o := range objs.Items { + obj := o + obj.SetNamespace(ns.Name) - processed.Insert(nsItems...) + multiErr.Go(func() error { + kv := keysAndValues + kv = append(kv, []interface{}{"resource", fmt.Sprintf("%s/%s", obj.GetNamespace(), obj.GetNamespace())}) - return nil - }) - } + if opErr := r.createOrUpdate(ctx, &obj, objLabels, objAnnotations); opErr != nil { + log.Error(opErr, "unable to sync namespacedItems", kv...) - if objsErr := multiErr.Wait(); objsErr != nil { - syncErr = multierror.Append(syncErr, objsErr) - } - } + return opErr + } - codecFactory := serializer.NewCodecFactory(r.client.Scheme()) + log.Info("resource has been replicated", kv...) - for rawIndex, item := range spec.RawItems { - obj, keysAndValues := unstructured.Unstructured{}, []interface{}{"index", rawIndex} + replicatedItem := &capsulev1beta2.ObjectReferenceStatus{} + replicatedItem.Name = obj.GetName() + replicatedItem.Kind = obj.GetKind() + replicatedItem.Namespace = ns.Name + replicatedItem.APIVersion = obj.GetAPIVersion() - if _, _, decodeErr := codecFactory.UniversalDeserializer().Decode(item.Raw, nil, &obj); decodeErr != nil { - log.Error(decodeErr, "unable to deserialize rawItem", keysAndValues...) + processed.Insert(replicatedItem.String()) - syncErr = multierror.Append(syncErr, decodeErr) + return nil + }) + } - continue + if objsErr := multiErr.Wait(); objsErr != nil { + syncErr = multierror.Append(syncErr, objsErr) + } } - syncedRaw, rawErr := r.createOrUpdate(ctx, &obj, objLabels, objAnnotations, namespaces) - if rawErr != nil { - log.Info("unable to sync rawItem", keysAndValues...) - // In case of error processing an item in one of any selected Namespaces, storing it to report it lately - // to the upper call to ensure a partial sync that will be fixed by a subsequent reconciliation. - syncErr = multierror.Append(syncErr, rawErr) - } else { - processed.Insert(syncedRaw...) - } - } + for rawIndex, item := range spec.RawItems { + obj, keysAndValues := unstructured.Unstructured{}, []interface{}{"index", rawIndex} - return processed.List(), syncErr.ErrorOrNil() -} - -// createOrUpdate replicates the provided unstructured object to all the provided Namespaces: -// this function mimics the CreateOrUpdate, by retrieving the object to understand if it must be created or updated, -// along adding the additional metadata, if required. -func (r *Processor) createOrUpdate(ctx context.Context, obj *unstructured.Unstructured, labels map[string]string, annotations map[string]string, namespaces corev1.NamespaceList) ([]string, error) { - log := ctrllog.FromContext(ctx) + if _, _, decodeErr := codecFactory.UniversalDeserializer().Decode(item.Raw, nil, &obj); decodeErr != nil { + log.Error(decodeErr, "unable to deserialize rawItem", keysAndValues...) - errGroup := new(multierror.Group) + syncErr = multierror.Append(syncErr, decodeErr) - var items []string - - for _, item := range namespaces.Items { - ns := item.GetName() - - errGroup.Go(func() (err error) { - actual, desired := &unstructured.Unstructured{}, obj.DeepCopy() - - actual.SetAPIVersion(desired.GetAPIVersion()) - actual.SetKind(desired.GetKind()) - actual.SetNamespace(desired.GetNamespace()) - actual.SetName(desired.GetName()) - // Using a deferred function to properly log the results, and adding the item to the processed set. - defer func() { - keysAndValues := []interface{}{"resource", fmt.Sprintf("%s/%s", ns, desired.GetName())} - - if err != nil { - log.Error(err, "unable to replicate resource", keysAndValues...) + continue + } - return - } + obj.SetNamespace(ns.Name) + if rawErr := r.createOrUpdate(ctx, &obj, objLabels, objAnnotations); rawErr != nil { + log.Info("unable to sync rawItem", keysAndValues...) + // In case of error processing an item in one of any selected Namespaces, storing it to report it lately + // to the upper call to ensure a partial sync that will be fixed by a subsequent reconciliation. + syncErr = multierror.Append(syncErr, rawErr) + } else { log.Info("resource has been replicated", keysAndValues...) - replicatedItem := &capsulev1beta2.ObjectReferenceStatus{ - Name: obj.GetName(), - } + replicatedItem := &capsulev1beta2.ObjectReferenceStatus{} + replicatedItem.Name = obj.GetName() replicatedItem.Kind = obj.GetKind() - replicatedItem.Namespace = ns + replicatedItem.Namespace = ns.Name replicatedItem.APIVersion = obj.GetAPIVersion() - items = append(items, replicatedItem.String()) - }() + processed.Insert(replicatedItem.String()) + } + } + } - actual.SetNamespace(ns) + return processed.List(), syncErr.ErrorOrNil() +} - _, err = controllerutil.CreateOrUpdate(ctx, r.client, actual, func() error { - UID := actual.GetUID() - rv := actual.GetResourceVersion() +// createOrUpdate replicates the provided unstructured object to all the provided Namespaces: +// this function mimics the CreateOrUpdate, by retrieving the object to understand if it must be created or updated, +// along adding the additional metadata, if required. +func (r *Processor) createOrUpdate(ctx context.Context, obj *unstructured.Unstructured, labels map[string]string, annotations map[string]string) (err error) { + actual, desired := &unstructured.Unstructured{}, obj.DeepCopy() - actual.SetUnstructuredContent(desired.Object) - actual.SetNamespace(ns) - actual.SetLabels(labels) - actual.SetAnnotations(annotations) - actual.SetResourceVersion(rv) - actual.SetUID(UID) + actual.SetAPIVersion(desired.GetAPIVersion()) + actual.SetKind(desired.GetKind()) + actual.SetNamespace(desired.GetNamespace()) + actual.SetName(desired.GetName()) - return nil - }) + _, err = controllerutil.CreateOrUpdate(ctx, r.client, actual, func() error { + UID := actual.GetUID() + rv := actual.GetResourceVersion() - return - }) - } - // Wait returns *multierror.Error that implements stdlib error: - // the nil check must be performed down here rather than at the caller level to avoid wrong casting. - if err := errGroup.Wait(); err != nil { - return items, err - } + actual.SetUnstructuredContent(desired.Object) + actual.SetLabels(labels) + actual.SetAnnotations(annotations) + actual.SetResourceVersion(rv) + actual.SetUID(UID) + + return nil + }) - return items, nil + return err } From 033702c7a799280c90a0aba4bdaeed60af34f012 Mon Sep 17 00:00:00 2001 From: Dario Tranchitella Date: Thu, 2 Feb 2023 20:07:46 +0100 Subject: [PATCH 3/4] feat: template support for rawitems Allowed template values: - `{{ tenant.name }}` for the Tenant name managing the Namespace - `{{ namespace }}` for the Namespace where the resource is replicated --- controllers/resources/processor.go | 12 +++++++++++- go.mod | 2 ++ go.sum | 4 ++++ 3 files changed, 17 insertions(+), 1 deletion(-) diff --git a/controllers/resources/processor.go b/controllers/resources/processor.go index 1d7c4483..1db2b28a 100644 --- a/controllers/resources/processor.go +++ b/controllers/resources/processor.go @@ -8,6 +8,7 @@ import ( "fmt" "github.com/hashicorp/go-multierror" + "github.com/valyala/fasttemplate" corev1 "k8s.io/api/core/v1" apierr "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -201,9 +202,18 @@ func (r *Processor) HandleSection(ctx context.Context, tnt capsulev1beta2.Tenant } for rawIndex, item := range spec.RawItems { + template := string(item.Raw) + + t := fasttemplate.New(template, "{{ ", " }}") + + tmplString := t.ExecuteString(map[string]interface{}{ + "tenant.name": tnt.Name, + "namespace": ns.Name, + }) + obj, keysAndValues := unstructured.Unstructured{}, []interface{}{"index", rawIndex} - if _, _, decodeErr := codecFactory.UniversalDeserializer().Decode(item.Raw, nil, &obj); decodeErr != nil { + if _, _, decodeErr := codecFactory.UniversalDeserializer().Decode([]byte(tmplString), nil, &obj); decodeErr != nil { log.Error(decodeErr, "unable to deserialize rawItem", keysAndValues...) syncErr = multierror.Append(syncErr, decodeErr) diff --git a/go.mod b/go.mod index daf6c332..27e8635f 100644 --- a/go.mod +++ b/go.mod @@ -59,6 +59,8 @@ require ( github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + github.com/valyala/fasttemplate v1.2.2 // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect golang.org/x/net v0.0.0-20220617184016-355a448f1bc9 // indirect diff --git a/go.sum b/go.sum index 6888b749..794bc157 100644 --- a/go.sum +++ b/go.sum @@ -482,6 +482,10 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo= +github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= From f75201981b82a7de78c860e62a643fa39754cc87 Mon Sep 17 00:00:00 2001 From: Dario Tranchitella Date: Thu, 2 Feb 2023 20:09:20 +0100 Subject: [PATCH 4/4] test(e2e): template support for rawitems --- e2e/tenantresource_test.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/e2e/tenantresource_test.go b/e2e/tenantresource_test.go index a1d7cede..76847f82 100644 --- a/e2e/tenantresource_test.go +++ b/e2e/tenantresource_test.go @@ -104,6 +104,10 @@ var _ = Describe("Creating a TenantResource object", func() { Name: "raw-secret-1", }, Type: corev1.SecretTypeOpaque, + Data: map[string][]byte{ + "{{ tenant.name }}": []byte("Cg=="), + "{{ namespace }}": []byte("Cg=="), + }, }, }, }, @@ -118,6 +122,10 @@ var _ = Describe("Creating a TenantResource object", func() { Name: "raw-secret-2", }, Type: corev1.SecretTypeOpaque, + Data: map[string][]byte{ + "{{ tenant.name }}": []byte("Cg=="), + "{{ namespace }}": []byte("Cg=="), + }, }, }, }, @@ -132,6 +140,10 @@ var _ = Describe("Creating a TenantResource object", func() { Name: "raw-secret-3", }, Type: corev1.SecretTypeOpaque, + Data: map[string][]byte{ + "{{ tenant.name }}": []byte("Cg=="), + "{{ namespace }}": []byte("Cg=="), + }, }, }, }, @@ -220,6 +232,16 @@ var _ = Describe("Creating a TenantResource object", func() { return secrets.Items }, defaultTimeoutInterval, defaultPollInterval).Should(HaveLen(4)) }) + + By(fmt.Sprintf("ensuring raw items are templated in %s Namespace", ns), func() { + for _, name := range []string{"raw-secret-1", "raw-secret-2", "raw-secret-3"} { + secret := corev1.Secret{} + Expect(k8sClient.Get(context.TODO(), types.NamespacedName{Name: name, Namespace: ns}, &secret)).ToNot(HaveOccurred()) + + Expect(secret.Data).To(HaveKey(solar.Name)) + Expect(secret.Data).To(HaveKey(ns)) + } + }) } By("using a Namespace selector", func() {